浏览代码

feat:xiugai

wangchuanjin 2 天之前
父节点
当前提交
5554e91e64
共有 4 个文件被更改,包括 111 次插入25 次删除
  1. 8 8
      config.yaml
  2. 1 0
      lastId.json
  3. 98 13
      main.go
  4. 4 4
      main_test.go

+ 8 - 8
config.yaml

@@ -1,21 +1,20 @@
 udpPort: ":1889"
 mongodb:
   main:
-    mongodbAddr: "172.20.45.128:27080,172.31.31.202:27081"
+    #mongodbAddr: "172.20.45.128:27080,172.31.31.202:27081"
     dbName: "qfw"
-    userName: "JS2Z_Rbid_ProG"
-    password: "JS2z@S1e3aR5Ch"
+    #userName: "JS2Z_Rbid_ProG"
+    #password: "JS2z@S1e3aR5Ch"
     size: 8
-    #mongodbAddr: "172.20.45.129:27002"
-    #dbName: "qfw"
+    mongodbAddr: "172.20.45.129:27002"
   extract:
-    mongodbAddr: "172.17.4.85:27080"
+    mongodbAddr: "172.20.47.168:27080"
     dbName: "qfw"
     size: 8
     #mongodbAddr: "172.20.45.129:27002"
     #dbName: "wcj"
   site:
-    mongodbAddr: "172.17.4.87:27080"
+    mongodbAddr: "172.20.47.168:27080"
     size: 8
     dbName: "editor"
 scoringPoolSize: 5
@@ -30,4 +29,5 @@ logger:
 nextNode:
   stype: "bidding"
   addr: "127.0.0.1"
-  port: 889
+  port: 889
+esDataProcess: 9

+ 1 - 0
lastId.json

@@ -0,0 +1 @@
+{"lastId":"67c276463309c0998b665edb","startTime":1755705600}

+ 98 - 13
main.go

@@ -6,8 +6,10 @@ import (
 	"dataPrefer/service"
 	"encoding/json"
 	"flag"
+	"github.com/gogf/gf/v2/encoding/gjson"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gctx"
+	"github.com/gogf/gf/v2/os/gfile"
 	"gopkg.in/natefinch/lumberjack.v2"
 	"io"
 	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
@@ -18,8 +20,15 @@ import (
 	"time"
 )
 
-var udpclient mu.UdpClient
-var lock = &sync.Mutex{}
+const (
+	Collection_Ids = "bidding_processing_ids"
+	LastIdPath     = "./lastId.json"
+)
+
+var (
+	udpclient mu.UdpClient
+	lock      = &sync.Mutex{}
+)
 
 func main() {
 	model := flag.Int("m", 0, "1:非定时任务")
@@ -59,17 +68,7 @@ func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 			log.Println("接收到udp消息", string(data))
 			sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
 			if qu.ObjToString(mapInfo["stype"]) == "bidding" {
-				lock.Lock()
-				service.IncDataById(sid, eid)
-				sendNextNode(nil, sid, eid)
-				for {
-					time.Sleep(10 * time.Second)
-					if db.Mgo_Main.Count("bidding_processing_ids", map[string]interface{}{"lteid": eid, "dataprocess": 9}) > 0 {
-						break
-					}
-					log.Println(string(data), "es索引未生完,等待中。。。")
-				}
-				lock.Unlock()
+				execute(nil, sid, eid)
 			} else {
 				sendNextNode(data, "", "")
 			}
@@ -79,6 +78,32 @@ func ProcessUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		log.Println("其他节点回应:", str)
 	}
 }
+func execute(ids_id interface{}, sid, eid string) {
+	lock.Lock()
+	defer lock.Unlock()
+	service.IncDataById(sid, eid)
+	sendNextNode(nil, sid, eid)
+	for {
+		time.Sleep(10 * time.Second)
+		if db.Mgo_Main.Count(Collection_Ids, map[string]interface{}{"lteid": eid, "dataprocess": g.Config().MustGet(gctx.New(), "esDataProcess").Int()}) > 0 {
+			break
+		}
+		log.Println("sid", sid, "eid", eid, "es索引未生完,等待中。。。")
+	}
+	if jn, err := gjson.Load(LastIdPath); err != nil {
+		log.Println("加载", LastIdPath, "出错", err)
+	} else if eid > jn.Get("lastId").String() {
+		jn.Set("lastId", eid)
+		if err := gfile.PutContents(LastIdPath, jn.String()); err != nil {
+			log.Println("覆盖", LastIdPath, "出错", err)
+		}
+	}
+	if ids_id == nil {
+		idsProcOver(sid, eid)
+	} else {
+		updateIdsById(ids_id, sid, eid)
+	}
+}
 
 // 下节点发送
 func sendNextNode(by []byte, sid string, eid string) {
@@ -100,5 +125,65 @@ func sendNextNode(by []byte, sid string, eid string) {
 }
 
 func repair() {
+	jn, err := gjson.Load(LastIdPath)
+	if err != nil {
+		log.Println("程序启动,修补数据加载", LastIdPath, "出错", err)
+		return
+	}
+	lastId := jn.Get("lastId").String()
+	if lastId == "" {
+		log.Println("程序启动,修补数据lastId异常为空!")
+		return
+	}
+	startTime := jn.Get("startTime").Int64()
+	if startTime == 0 {
+		log.Println("程序启动后,修补数据startTime异常为0!")
+		return
+	}
+	sess := db.Mgo_Main.GetMgoConn()
+	defer db.Mgo_Main.DestoryMongoConn(sess)
+	it := sess.DB(db.Mgo_Main.DbName).C(Collection_Ids).Find(map[string]interface{}{"gtid": map[string]interface{}{"$gte": lastId}, "dataprocess": 8, "createtime": map[string]interface{}{"$gte": startTime}}).Select(map[string]interface{}{"_id": 1, "gtid": 1, "lteid": 1}).Sort().Iter()
+	index := 0
+	for m := make(map[string]interface{}); it.Next(&m); {
+		sid, _ := m["gtid"].(string)
+		eid, _ := m["lteid"].(string)
+		if sid == "" || eid == "" {
+			log.Println("sid", sid, "eid", eid, "异常,过滤掉!")
+			continue
+		}
+		index++
+		execute(m["_id"], sid, eid)
+		m = make(map[string]interface{})
+	}
+	log.Println("程序启动后,修补id段数据", index)
+}
+
+// id段处理完,更新状态
+func idsProcOver(sid, eid string) {
+	query := map[string]interface{}{
+		"gtid": map[string]interface{}{
+			"$gte": sid,
+		},
+		"lteid": map[string]interface{}{
+			"$lte": eid,
+		},
+	}
+	datas, _ := db.Mgo_Main.Find(Collection_Ids, query, nil, `{"_id":1}`, false, 0, -1)
+	if datas == nil || len(*datas) == 0 {
+		log.Println("未查询到记录id段落~", query)
+	}
+	log.Println("开始更新流程段落记录", len(*datas))
+	for _, v := range *datas {
+		updateIdsById(v["_id"], sid, eid)
+	}
+}
 
+func updateIdsById(_id interface{}, sid, eid string) {
+	ok := db.Mgo_Main.UpdateById(Collection_Ids, _id, map[string]interface{}{
+		"$set": map[string]interface{}{
+			"dataprocess": 10,
+			"updatetime":  time.Now().Unix(),
+		},
+	})
+	log.Println(_id, "流程段落记录更新完毕", sid, eid, ok)
 }

+ 4 - 4
main_test.go

@@ -16,14 +16,14 @@ func TestSendNextNode(t *testing.T) {
 	uc := mu.UdpClient{Local: ":880", BufSize: 1024}
 	uc.Listen(ProcessUdpMsg)
 	by, _ := json.Marshal(map[string]interface{}{
-		"gtid":  "1",
-		"lteid": "2",
+		"gtid":  "67c11e863309c0998b6172ac",
+		"lteid": "67c276463309c0998b665edb",
 		"stype": "bidding",
-		"key":   "1-2-bidding",
+		"key":   "67c11e863309c0998b6172ac-67c276463309c0998b665edb-bidding",
 	})
 	addr := &net.UDPAddr{
 		IP:   net.ParseIP("127.0.0.1"),
-		Port: 889,
+		Port: 1889,
 	}
 	log.Println(uc.WriteUdp(by, mu.OP_TYPE_DATA, addr)) //发送下节点
 }