zhangxinlei1996 преди 1 година
родител
ревизия
ad56ebf107

+ 1 - 0
data_mgo_to_tidb_project/common.toml

@@ -41,6 +41,7 @@ projectsetIdGt = "5a7986d540d2d9bbe8396c7f"
 projectsetIdLt = "59a4e2cf5d11e1c745ea7022"
 ch = 5
 tableName = "projectset"
+crontab = "0 19 14 ? * *"
 
 [alarm]
   isOpen = true                        # 异常通知开关。默认关闭

+ 1 - 0
data_mgo_to_tidb_project/config/conf.go

@@ -74,6 +74,7 @@ type info struct {
 	ProjectsetIdLt string
 	Ch             int    //并发
 	TableName      string //表名 projectset
+	Crontab        string
 }
 
 type alarm struct {

BIN
data_mgo_to_tidb_project/data_mgo_to_tidb_project.exe


BIN
data_mgo_to_tidb_project/data_mgo_to_tidb_project_cuiliang


BIN
data_mgo_to_tidb_project/data_mgo_to_tidb_project → data_mgo_to_tidb_project/data_mgo_to_tidb_project_zengliang


+ 1 - 0
data_mgo_to_tidb_project/go.mod

@@ -7,6 +7,7 @@ require (
 	bp.jydev.jianyu360.cn/BP/jynsq v0.0.0-20220222052708-ebc43af90698
 	github.com/BurntSushi/toml v1.2.0
 	github.com/gogf/gf/v2 v2.5.6
+	github.com/robfig/cron v1.2.0
 	go.mongodb.org/mongo-driver v1.10.3
 	go.uber.org/zap v1.23.0
 	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22

+ 2 - 0
data_mgo_to_tidb_project/go.sum

@@ -213,6 +213,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
 github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=

+ 13 - 1
data_mgo_to_tidb_project/main.go

@@ -3,6 +3,8 @@ package main
 import (
 	"data_mgo_to_tidb_project/config"
 
+	"github.com/robfig/cron"
+
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 )
@@ -24,8 +26,18 @@ func init() {
 }
 
 func main() {
+	//存量
+	// Projectset("", "")
 
-	Projectset()
+	// return
+	//增量
+	cr := cron.New()
+	cr.AddFunc(config.Conf.Info.Crontab, func() {
+		if gteid, ltid := GetCurTimePiInfo(); gteid != "" && ltid != "" {
+			Projectset(gteid, ltid)
+		}
+	})
+	cr.Start()
 
 	select {}
 }

+ 31 - 8
data_mgo_to_tidb_project/service.go

@@ -18,7 +18,7 @@ import (
 )
 
 //projectset同步
-func Projectset() {
+func Projectset(gteId, ltId string) {
 	sess := MongoP.GetMgoConn()
 	defer MongoP.DestoryMongoConn(sess)
 
@@ -26,19 +26,37 @@ func Projectset() {
 	wg := &sync.WaitGroup{}
 
 	query := map[string]interface{}{}
-	if config.Conf.Info.ProjectsetIdGt != "" {
+	if gteId != "" && ltId != "" {
+		query = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": mongodb.StringTOBsonId(gteId),
+				"$lt": mongodb.StringTOBsonId(ltId),
+			},
+		}
+		log.Info("info", zap.Any("ids_zengliang", map[string]interface{}{
+			"gteId": gteId,
+			"lteId": ltId,
+		}))
+
+	} else if config.Conf.Info.ProjectsetIdGt != "" {
 		query = map[string]interface{}{
 			"_id": map[string]interface{}{
 				"$gt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdGt),
 				"$lt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdLt),
 			},
 		}
+
+		log.Info("info", zap.Any("ids_cunliang", map[string]interface{}{
+			"gteId": config.Conf.Info.ProjectsetIdGt,
+			"lteId": config.Conf.Info.ProjectsetIdLt,
+		}))
 	}
 	log.Info("info", zap.Any("query", query))
+
 	it := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Info.TableName).Find(query).Sort("-_id").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
-		if count%20000 == 0 {
+		if count%10000 == 0 {
 			log.Info(fmt.Sprintf("current --- %d ,%s", count, tmp["_id"]))
 		}
 		ch <- true
@@ -149,9 +167,13 @@ func dwd_f_bid_project_final_package(tx *sql.Tx, tmp map[string]interface{}) boo
 				saveMap := map[string]interface{}{}
 				//
 				saveMap["s_projectid"] = _id
-				saveMap["s_packagecode"] = gconv.String(vv["origin"]) //标(包)段编号
-				saveMap["s_packagename"] = gconv.String(vv["name"])   //标段名称
-				saveMap["s_detail"] = gconv.String(vv["text"])        //标段内容
+				packagecode := gconv.String(vv["origin"])
+				if VarcharCheck(packagecode, 255) {
+					continue
+				}
+				saveMap["s_packagecode"] = packagecode              //标(包)段编号
+				saveMap["s_packagename"] = gconv.String(vv["name"]) //标段名称
+				saveMap["s_detail"] = gconv.String(vv["text"])      //标段内容
 				//
 				TransferMoneyRateInfo(vv, &saveMap, []string{"budget", "bidamount"}, []float64{1000000000.0, 1000000000.0})
 				//
@@ -273,8 +295,9 @@ func InsertGlobalMysqlData(tx *sql.Tx, name string, data map[string]interface{},
 	id := MysqlTool.InsertByTx(tx, name, data)
 	if id == -1 {
 		log.Info("插入数据异常", zap.String(name, mark))
-		Alert(fmt.Sprintf("%s表 检测到异常数据同步,id:%s </br>", name, mark))
-
+		if config.Conf.Alarm.IsOpen {
+			Alert(fmt.Sprintf("%s表 检测到异常数据同步,id:%s </br>", name, mark))
+		}
 	}
 	return id
 }

+ 12 - 0
data_mgo_to_tidb_project/util.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"reflect"
+	"strconv"
 	"strings"
 	"time"
 	"unicode/utf8"
@@ -175,3 +176,14 @@ func GetObjectId() {
 func VarcharCheck(str string, length int) bool {
 	return utf8.RuneCountInString(str) > length
 }
+
+//获取每天0点数据
+func GetCurTimePiInfo() (string, string) {
+	now := time.Now()
+	start := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
+	end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gteid := strconv.FormatInt(start, 16) + "0000000000000000"
+	ltid := strconv.FormatInt(end, 16) + "0000000000000000"
+	log.Println("curtime:", gteid, ltid, "==", start, end)
+	return gteid, ltid
+}