jianghan7 1 年之前
父节点
当前提交
ccab592db8
共有 7 个文件被更改,包括 45 次插入14 次删除
  1. 二进制
      data_tidb/data_tidb
  2. 1 0
      data_tidb/go.mod
  3. 2 0
      data_tidb/go.sum
  4. 15 5
      data_tidb/main.go
  5. 6 3
      data_tidb/project.go
  6. 18 3
      field_sync/main.go
  7. 3 3
      field_sync/task.go

二进制
data_tidb/data_tidb


+ 1 - 0
data_tidb/go.mod

@@ -5,6 +5,7 @@ go 1.16
 require (
 	github.com/BurntSushi/toml v1.2.0
 	github.com/olivere/elastic/v7 v7.0.32
+	github.com/robfig/cron v1.2.0
 	github.com/shopspring/decimal v1.3.1
 	github.com/spf13/cobra v1.5.0
 	go.mongodb.org/mongo-driver v1.10.3

+ 2 - 0
data_tidb/go.sum

@@ -82,6 +82,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
+github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=

+ 15 - 5
data_tidb/main.go

@@ -4,6 +4,7 @@ import (
 	"data_tidb/config"
 	"encoding/json"
 	"fmt"
+	"github.com/robfig/cron"
 	"github.com/spf13/cobra"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
@@ -19,6 +20,8 @@ import (
 
 var (
 	UdpClient udp.UdpClient
+
+	Pici int64
 )
 
 func init() {
@@ -338,7 +341,6 @@ func relation() *cobra.Command {
 // @Description 项目数据(目前仅关系表数据)
 // @Author J 2022/9/20 17:52
 func projectAdd() *cobra.Command {
-	var pici int64
 	cmdClient := &cobra.Command{
 		Use:   "project",
 		Short: "Start processing project data",
@@ -347,10 +349,18 @@ func projectAdd() *cobra.Command {
 			//go SaveProTagFunc()
 			//go SaveProbFunc()
 			go SaveRelationFunc()
-			taskPAdd(pici)
+
+			taskPAdd()
+
+			crn := cron.New()
+			cronstr := "0 10 * * * *" // 每30min执行一次
+			_ = crn.AddFunc(cronstr, func() {
+				taskPAdd()
+			})
+			crn.Start()
 		},
 	}
-	cmdClient.Flags().Int64VarP(&pici, "pici", "p", 0, "")
+	cmdClient.Flags().Int64VarP(&Pici, "pici", "p", 0, "")
 	return cmdClient
 }
 
@@ -788,7 +798,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -800,7 +810,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0

+ 6 - 3
data_tidb/project.go

@@ -76,20 +76,23 @@ func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int
 	}
 }
 
-func taskPAdd(pici int64) {
+func taskPAdd() {
 	sess := MongoP.GetMgoConn()
 	defer MongoP.DestoryMongoConn(sess)
 
 	ch := make(chan bool, 20)
 	wg := &sync.WaitGroup{}
 
-	q := bson.M{"pici": bson.M{"$gt": pici}}
-	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_back").Find(q).Iter()
+	q := bson.M{"pici": bson.M{"$gt": Pici}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {
 			log.Info(fmt.Sprintf("current --- %d", count))
 		}
+		if t := util.Int64All(tmp["pici"]); t > Pici {
+			Pici = t
+		}
 		ch <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {

+ 18 - 3
field_sync/main.go

@@ -45,7 +45,7 @@ func init() {
 	InitFileInfo()
 	InitLog()
 	InitMgo()
-	//inits()
+	inits()
 	redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
 
 	log.Info("init success")
@@ -53,7 +53,7 @@ func init() {
 
 func main() {
 	go checkMapJob()
-	//go nsqMethod()
+	go nsqMethod()
 
 	go UpdateBidding()
 	go UpdateExtract()
@@ -62,6 +62,21 @@ func main() {
 	UdpClient.Listen(processUdpMsg)
 	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
 
+	info, _ := MgoB.Find("bidding_processing_ids", `{"dataprocess": 6}`, bson.M{"_id": 1}, nil, false, -1, -1)
+	util.Debug(len(*info))
+	log.Info("", zap.Int("size", len(*info)))
+	if len(*info) > 0 {
+		for i, m := range *info {
+			mapInfo := make(map[string]interface{})
+			mapInfo["gtid"] = util.ObjToString(m["gtid"])
+			mapInfo["lteid"] = util.ObjToString(m["lteid"])
+			mapInfo["stype"] = "bidding"
+			mapInfo["key"] = fmt.Sprintf("%s-%s-bidding", util.ObjToString(m["gtid"]), util.ObjToString(m["lteid"]))
+			log.Info(fmt.Sprint(i), zap.Any("--", mapInfo))
+			biddingTask(nil, mapInfo)
+		}
+	}
+
 	ch := make(chan bool, 1)
 	<-ch
 }
@@ -110,7 +125,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 					biddingAllTask(data, mapInfo)
 				}()
 			case "monitor":
-				log.Info("monitor", zap.Any("mapInfo:", mapInfo))
+				//
 			default:
 				pool <- true
 				go func() {

+ 3 - 3
field_sync/task.go

@@ -328,9 +328,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		//------------------对比结束
 
 		//处理key descript
-		//if bkey == "" {
-		//	DealInfo(&tmp, &update)
-		//}
+		if bkey == "" {
+			DealInfo(&tmp, &update)
+		}
 		// entidlist
 		extractMap := make(map[string]interface{})
 		if update["s_winner"] != "" {