Ver código fonte

Merge branch 'master' of https://jygit.jydev.jianyu360.cn/data_processing/data_field_dispose

zhengkun 1 ano atrás
pai
commit
228953446e

BIN
data_clear_sync/data_sync


+ 1 - 0
data_mgo_to_tidb_project/common.toml

@@ -41,6 +41,7 @@ projectsetIdGt = "5a7986d540d2d9bbe8396c7f"
 projectsetIdLt = "59a4e2cf5d11e1c745ea7022"
 ch = 5
 tableName = "projectset"
+crontab = "0 19 14 ? * *"
 
 [alarm]
   isOpen = true                        # 异常通知开关。默认关闭

+ 1 - 0
data_mgo_to_tidb_project/config/conf.go

@@ -74,6 +74,7 @@ type info struct {
 	ProjectsetIdLt string
 	Ch             int    //并发
 	TableName      string //表名 projectset
+	Crontab        string
 }
 
 type alarm struct {

BIN
data_tidb/data_tidb → data_mgo_to_tidb_project/data_mgo_to_tidb_project.exe


BIN
data_tidb/data_tidb_linux → data_mgo_to_tidb_project/data_mgo_to_tidb_project_cuiliang


BIN
data_mgo_to_tidb_project/data_mgo_to_tidb_project → data_mgo_to_tidb_project/data_mgo_to_tidb_project_zengliang


+ 1 - 0
data_mgo_to_tidb_project/go.mod

@@ -7,6 +7,7 @@ require (
 	bp.jydev.jianyu360.cn/BP/jynsq v0.0.0-20220222052708-ebc43af90698
 	github.com/BurntSushi/toml v1.2.0
 	github.com/gogf/gf/v2 v2.5.6
+	github.com/robfig/cron v1.2.0
 	go.mongodb.org/mongo-driver v1.10.3
 	go.uber.org/zap v1.23.0
 	gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22

+ 2 - 0
data_mgo_to_tidb_project/go.sum

@@ -213,6 +213,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
 github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
 github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=

+ 13 - 1
data_mgo_to_tidb_project/main.go

@@ -3,6 +3,8 @@ package main
 import (
 	"data_mgo_to_tidb_project/config"
 
+	"github.com/robfig/cron"
+
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 )
@@ -24,8 +26,18 @@ func init() {
 }
 
 func main() {
+	//存量
+	// Projectset("", "")
 
-	Projectset()
+	// return
+	//增量
+	cr := cron.New()
+	cr.AddFunc(config.Conf.Info.Crontab, func() {
+		if gteid, ltid := GetCurTimePiInfo(); gteid != "" && ltid != "" {
+			Projectset(gteid, ltid)
+		}
+	})
+	cr.Start()
 
 	select {}
 }

+ 31 - 8
data_mgo_to_tidb_project/service.go

@@ -18,7 +18,7 @@ import (
 )
 
 //projectset同步
-func Projectset() {
+func Projectset(gteId, ltId string) {
 	sess := MongoP.GetMgoConn()
 	defer MongoP.DestoryMongoConn(sess)
 
@@ -26,19 +26,37 @@ func Projectset() {
 	wg := &sync.WaitGroup{}
 
 	query := map[string]interface{}{}
-	if config.Conf.Info.ProjectsetIdGt != "" {
+	if gteId != "" && ltId != "" {
+		query = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt": mongodb.StringTOBsonId(gteId),
+				"$lt": mongodb.StringTOBsonId(ltId),
+			},
+		}
+		log.Info("info", zap.Any("ids_zengliang", map[string]interface{}{
+			"gteId": gteId,
+			"lteId": ltId,
+		}))
+
+	} else if config.Conf.Info.ProjectsetIdGt != "" {
 		query = map[string]interface{}{
 			"_id": map[string]interface{}{
 				"$gt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdGt),
 				"$lt": mongodb.StringTOBsonId(config.Conf.Info.ProjectsetIdLt),
 			},
 		}
+
+		log.Info("info", zap.Any("ids_cunliang", map[string]interface{}{
+			"gteId": config.Conf.Info.ProjectsetIdGt,
+			"lteId": config.Conf.Info.ProjectsetIdLt,
+		}))
 	}
 	log.Info("info", zap.Any("query", query))
+
 	it := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.Info.TableName).Find(query).Sort("-_id").Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
-		if count%20000 == 0 {
+		if count%10000 == 0 {
 			log.Info(fmt.Sprintf("current --- %d ,%s", count, tmp["_id"]))
 		}
 		ch <- true
@@ -149,9 +167,13 @@ func dwd_f_bid_project_final_package(tx *sql.Tx, tmp map[string]interface{}) boo
 				saveMap := map[string]interface{}{}
 				//
 				saveMap["s_projectid"] = _id
-				saveMap["s_packagecode"] = gconv.String(vv["origin"]) //标(包)段编号
-				saveMap["s_packagename"] = gconv.String(vv["name"])   //标段名称
-				saveMap["s_detail"] = gconv.String(vv["text"])        //标段内容
+				packagecode := gconv.String(vv["origin"])
+				if VarcharCheck(packagecode, 255) {
+					continue
+				}
+				saveMap["s_packagecode"] = packagecode              //标(包)段编号
+				saveMap["s_packagename"] = gconv.String(vv["name"]) //标段名称
+				saveMap["s_detail"] = gconv.String(vv["text"])      //标段内容
 				//
 				TransferMoneyRateInfo(vv, &saveMap, []string{"budget", "bidamount"}, []float64{1000000000.0, 1000000000.0})
 				//
@@ -273,8 +295,9 @@ func InsertGlobalMysqlData(tx *sql.Tx, name string, data map[string]interface{},
 	id := MysqlTool.InsertByTx(tx, name, data)
 	if id == -1 {
 		log.Info("插入数据异常", zap.String(name, mark))
-		Alert(fmt.Sprintf("%s表 检测到异常数据同步,id:%s </br>", name, mark))
-
+		if config.Conf.Alarm.IsOpen {
+			Alert(fmt.Sprintf("%s表 检测到异常数据同步,id:%s </br>", name, mark))
+		}
 	}
 	return id
 }

+ 12 - 0
data_mgo_to_tidb_project/util.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	"reflect"
+	"strconv"
 	"strings"
 	"time"
 	"unicode/utf8"
@@ -175,3 +176,14 @@ func GetObjectId() {
 func VarcharCheck(str string, length int) bool {
 	return utf8.RuneCountInString(str) > length
 }
+
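+// GetCurTimePiInfo returns two hex id bounds (Unix seconds in hex followed by 16 zeros) for yesterday 00:00 and today 00:00 in local time, i.e. the previous day's window.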
+func GetCurTimePiInfo() (string, string) {
+	now := time.Now()
+	start := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local).Unix()
+	end := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
+	gteid := strconv.FormatInt(start, 16) + "0000000000000000"
+	ltid := strconv.FormatInt(end, 16) + "0000000000000000"
+	log.Println("curtime:", gteid, ltid, "==", start, end)
+	return gteid, ltid
+}

+ 1 - 0
data_tidb/go.mod

@@ -5,6 +5,7 @@ go 1.16
 require (
 	github.com/BurntSushi/toml v1.2.0
 	github.com/olivere/elastic/v7 v7.0.32
+	github.com/robfig/cron v1.2.0
 	github.com/shopspring/decimal v1.3.1
 	github.com/spf13/cobra v1.5.0
 	go.mongodb.org/mongo-driver v1.10.3

+ 2 - 0
data_tidb/go.sum

@@ -82,6 +82,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=

+ 15 - 5
data_tidb/main.go

@@ -4,6 +4,7 @@ import (
 	"data_tidb/config"
 	"encoding/json"
 	"fmt"
+	"github.com/robfig/cron"
 	"github.com/spf13/cobra"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
@@ -19,6 +20,8 @@ import (
 
 var (
 	UdpClient udp.UdpClient
+
+	Pici int64
 )
 
 func init() {
@@ -338,7 +341,6 @@ func relation() *cobra.Command {
 // @Description 项目数据(目前仅关系表数据)
 // @Author J 2022/9/20 17:52
 func projectAdd() *cobra.Command {
-	var pici int64
 	cmdClient := &cobra.Command{
 		Use:   "project",
 		Short: "Start processing project data",
@@ -347,10 +349,18 @@ func projectAdd() *cobra.Command {
 			//go SaveProTagFunc()
 			//go SaveProbFunc()
 			go SaveRelationFunc()
-			taskPAdd(pici)
+
+			taskPAdd()
+
+			crn := cron.New()
+			cronstr := "0 10 * * * *" // fires once per hour at minute 10, second 0 (6-field spec: sec min hour dom month dow), not every 30 minutes
+			_ = crn.AddFunc(cronstr, func() {
+				taskPAdd()
+			})
+			crn.Start()
 		},
 	}
-	cmdClient.Flags().Int64VarP(&pici, "pici", "p", 0, "")
+	cmdClient.Flags().Int64VarP(&Pici, "pici", "p", 0, "")
 	return cmdClient
 }
 
@@ -788,7 +798,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -800,7 +810,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0

+ 6 - 3
data_tidb/project.go

@@ -76,20 +76,23 @@ func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int
 	}
 }
 
-func taskPAdd(pici int64) {
+func taskPAdd() {
 	sess := MongoP.GetMgoConn()
 	defer MongoP.DestoryMongoConn(sess)
 
 	ch := make(chan bool, 20)
 	wg := &sync.WaitGroup{}
 
-	q := bson.M{"pici": bson.M{"$gt": pici}}
-	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_back").Find(q).Iter()
+	q := bson.M{"pici": bson.M{"$gt": Pici}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {
 			log.Info(fmt.Sprintf("current --- %d", count))
 		}
+		if t := util.Int64All(tmp["pici"]); t > Pici {
+			Pici = t
+		}
 		ch <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {

BIN
field_py/field_dispose_1786_linux


+ 18 - 3
field_sync/main.go

@@ -45,7 +45,7 @@ func init() {
 	InitFileInfo()
 	InitLog()
 	InitMgo()
-	//inits()
+	inits()
 	redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
 
 	log.Info("init success")
@@ -53,7 +53,7 @@ func init() {
 
 func main() {
 	go checkMapJob()
-	//go nsqMethod()
+	go nsqMethod()
 
 	go UpdateBidding()
 	go UpdateExtract()
@@ -62,6 +62,21 @@ func main() {
 	UdpClient.Listen(processUdpMsg)
 	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
 
+	info, _ := MgoB.Find("bidding_processing_ids", `{"dataprocess": 6}`, bson.M{"_id": 1}, nil, false, -1, -1)
+	util.Debug(len(*info))
+	log.Info("", zap.Int("size", len(*info)))
+	if len(*info) > 0 {
+		for i, m := range *info {
+			mapInfo := make(map[string]interface{})
+			mapInfo["gtid"] = util.ObjToString(m["gtid"])
+			mapInfo["lteid"] = util.ObjToString(m["lteid"])
+			mapInfo["stype"] = "bidding"
+			mapInfo["key"] = fmt.Sprintf("%s-%s-bidding", util.ObjToString(m["gtid"]), util.ObjToString(m["lteid"]))
+			log.Info(fmt.Sprint(i), zap.Any("--", mapInfo))
+			biddingTask(nil, mapInfo)
+		}
+	}
+
 	ch := make(chan bool, 1)
 	<-ch
 }
@@ -110,7 +125,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					biddingAllTask(data, mapInfo)
 				}()
 			case "monitor":
-				log.Info("monitor", zap.Any("mapInfo:", mapInfo))
+				//
 			default:
 				pool <- true
 				go func() {

+ 3 - 3
field_sync/task.go

@@ -328,9 +328,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//------------------对比结束
 
 		//处理key descript
-		//if bkey == "" {
-		//	DealInfo(&tmp, &update)
-		//}
+		if bkey == "" {
+			DealInfo(&tmp, &update)
+		}
 		// entidlist
 		extractMap := make(map[string]interface{})
 		if update["s_winner"] != "" {

BIN
monitor/monitor_linux