Browse Source

添加 高质量库 定时任务

wcc 1 year ago
parent
commit
731c949ae2
7 changed files with 432 additions and 4 deletions
  1. 19 0
      highMark/config.json
  2. 203 0
      highMark/main.go
  3. 9 0
      src/config.json
  4. 159 0
      src/crons/high_mark.go
  5. 24 3
      src/main.go
  6. 17 0
      src/util/config.go
  7. 1 1
      src/util/elasticSim.go

+ 19 - 0
highMark/config.json

@@ -0,0 +1,19 @@
+{
+  "bidding": {
+    "addr": "127.0.0.1:27082",
+    "db": "jyqykhfw",
+    "coll": "f_task",
+    "size": 10,
+    "username": "",
+    "password": ""
+  },
+  "bidding_high": {
+    "addr": "192.168.3.166:27082",
+    "db": "qfw_high",
+    "coll": "wcc_high_bidding",
+    "size": 10,
+    "username": "",
+    "password": ""
+  },
+  "spec": "0 */1 * * * *"
+}

+ 203 - 0
highMark/main.go

@@ -0,0 +1,203 @@
+package highMark
+
+import (
+	"fmt"
+	"mongodb"
+	"os"
+	"qfw/util"
+	"time"
+)
+
+var (
+	Mgo, MgoH                             *mongodb.MongodbSim
+	Sysconfig, bidddingConf, biddingHConf map[string]interface{}
+)
+
+func Init() {
+	util.ReadConfig(&Sysconfig)
+	s := Sysconfig
+	fmt.Println(s)
+	bidddingConf = Sysconfig["bidding"].(map[string]interface{})
+	Mgo = &mongodb.MongodbSim{
+		MongodbAddr: bidddingConf["addr"].(string),
+		Size:        util.IntAllDef(bidddingConf["size"], 5),
+		DbName:      bidddingConf["db"].(string),
+		UserName:    bidddingConf["username"].(string),
+		Password:    bidddingConf["password"].(string),
+		//Direct:   true,
+	}
+	Mgo.InitPool()
+
+	biddingHConf = Sysconfig["bidding_high"].(map[string]interface{})
+	//高质量库
+	MgoH = &mongodb.MongodbSim{
+		MongodbAddr: biddingHConf["addr"].(string),
+		Size:        util.IntAllDef(biddingHConf["size"], 5),
+		DbName:      biddingHConf["db"].(string),
+		UserName:    biddingHConf["username"].(string),
+		Password:    biddingHConf["password"].(string),
+		//Direct:      true,
+	}
+
+	MgoH.InitPool()
+
+}
+
+//func main() {
+//	Init()
+//	c := cron.New()
+//	err := c.AddFunc(Sysconfig["spec"].(string), Mark)
+//	if err != nil {
+//		util.Debug("err", err)
+//	}
+//
+//	c.Start()
+//	defer c.Stop()
+//
+//	select {}
+//
+//	//highMark()
+//}
+
+func Mark() {
+	go highMark()
+}
+
+func highMark() {
+	defer util.Catch()
+	sess := Mgo.GetMgoConn()
+	defer Mgo.DestoryMongoConn(sess)
+
+	taskQuery := map[string]interface{}{
+		"s_stype":  "group",
+		"s_status": "已完成",
+		"is_return_highdata": map[string]interface{}{
+			"$exists": 0,
+		},
+	}
+
+	fields, _ := Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1)
+	if len(*fields) == 0 {
+		util.Debug("字段顺序配置为空")
+		os.Exit(1)
+	}
+
+	tasks, _ := Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1)
+	util.Debug("本次处理任务总数:", len(*tasks))
+
+	for _, task := range *tasks {
+		util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"])
+		taskID := mongodb.BsonIdToSId(task["_id"])
+		//任务对应的数据表
+		s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
+		q := map[string]interface{}{
+			"s_grouptaskid": map[string]interface{}{
+				"$exists": 1,
+			},
+		}
+
+		query := sess.DB(bidddingConf["db"].(string)).C(s_sourceinfo).Find(&q).Select(nil).Iter()
+		count := 0
+		for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+			infoID := mongodb.BsonIdToSId(tmp["_id"])
+			if count%1000 == 0 {
+				util.Debug(fmt.Sprintf(" %v  deal current --- %d", task["s_entname"], count))
+			}
+			//找到标注数据结果
+			marked, _ := Mgo.FindById("marked", infoID, nil)
+			markedData := *marked
+			//计算标注 结果
+			//标注结果,十进制数字
+			if markedData["v_taginfo"] == nil {
+				continue
+			}
+			taginfo := markedData["v_taginfo"].(map[string]interface{})
+			res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
+
+			if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
+				data["_id"] = tmp["_id"]
+				data["field_bitvalue"] = res
+				data["i_comeintime"] = time.Now().Unix()
+				data["i_updatetime"] = time.Now().Unix()
+
+				update := make(map[string]interface{})
+				update["$set"] = data
+				where := map[string]interface{}{
+					"_id": tmp["_id"],
+				}
+
+				if !MgoH.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
+					util.Debug("任务 ", task["s_groupname"], infoID, "入库错误,请检查")
+				} else {
+					//1、更新数据源信息
+					setResult := map[string]interface{}{ //更新字段集
+						"is_return_highdata":  1,
+						"return_highdatetime": time.Now().Unix(),
+					}
+					set := map[string]interface{}{
+						"$set": setResult,
+					}
+					Mgo.UpdateById(s_sourceinfo, infoID, set)
+				}
+			}
+
+		}
+		util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
+
+		if count > 0 {
+			//当前任务结束
+			//3.更新任务表,
+			taskSetResult := map[string]interface{}{ //更新字段集
+				"is_return_highdata": 1,
+			}
+			taskSet := map[string]interface{}{
+				"$set": taskSetResult,
+			}
+			Mgo.UpdateById("f_task", taskID, taskSet)
+			//4. 记录任务中入高质量库数据
+			taskInsert := map[string]interface{}{
+				"task_id":         taskID,            //任务ID
+				"high_mark_count": count,             // 标注入高质量数据
+				"given_count":     task["i_givenum"], //任务分配数量
+				"createtime":      time.Now().Unix(),
+				"updatetime":      time.Now().Unix(),
+			}
+			Mgo.Save("high_result", taskInsert)
+		} else {
+			util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"])
+
+		}
+
+		util.Debug(task["s_groupname"], "数据处理完毕")
+	}
+
+	util.Debug("所有任务处理完毕")
+}
+
+//calculateFlag 根据数据,返回被标注的字段数字
+func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
+	var result uint64
+	for _, item := range data {
+		name, ok := item["name"].(string)
+		if !ok {
+			continue
+		}
+
+		sort, ok := item["sort"].(int32)
+
+		if !ok {
+			continue
+		}
+
+		// 根据字段名称查找对应的标记值
+		_, ok = marked[name]
+		if !ok {
+			continue
+		}
+
+		// 通过位运算将标记值放置到正确的位置
+		result |= 1 << (sort - 1)
+	}
+
+	return result
+}

+ 9 - 0
src/config.json

@@ -398,5 +398,14 @@
                 "descript": "采购单位联系电话"
             }
         ]
+    },
+    "high_mark":{
+        "addr":"192.168.3.166:27082",
+        "db": "qfw_high",
+        "coll": "bidding",
+        "size": 10,
+        "username": "",
+        "password": "",
+        "spec": "0 38 14 * * *"
     }
 }

+ 159 - 0
src/crons/high_mark.go

@@ -0,0 +1,159 @@
+package crons
+
+import (
+	"fmt"
+	"mongodb"
+	"os"
+	"qfw/util"
+	"time"
+	util2 "util"
+)
+
+func Test() {
+	fmt.Println("aaaa")
+}
+
+func MarkTask() {
+	go highMark()
+}
+
+//highMark 高质量库数据
+func highMark() {
+	defer util.Catch()
+	sess := util2.Mgo.GetMgoConn()
+	defer util2.Mgo.DestoryMongoConn(sess)
+	biddingHConf := util2.Sysconfig["high_mark"].(map[string]interface{})
+
+	taskQuery := map[string]interface{}{
+		"s_stype":  "group",
+		"s_status": "已完成",
+		"is_return_highdata": map[string]interface{}{
+			"$exists": 0,
+		},
+	}
+
+	fields, _ := util2.Mgo.Find("high_fields", nil, `{"sort":1}`, nil, false, -1, -1)
+	if len(*fields) == 0 {
+		util.Debug("字段顺序配置为空")
+		os.Exit(1)
+	}
+
+	tasks, _ := util2.Mgo.Find("f_task", taskQuery, nil, nil, false, -1, -1)
+	util.Debug("本次处理任务总数:", len(*tasks))
+
+	for _, task := range *tasks {
+		util.Debug("开始处理任务数据:", task["s_groupname"], task["s_entname"])
+		taskID := mongodb.BsonIdToSId(task["_id"])
+		//任务对应的数据表
+		s_sourceinfo := util.ObjToString(task["s_sourceinfo"])
+		q := map[string]interface{}{
+			"s_grouptaskid": map[string]interface{}{
+				"$exists": 1,
+			},
+		}
+
+		query := sess.DB(util2.Mgo.DbName).C(s_sourceinfo).Find(&q).Select(nil).Iter()
+		count := 0
+		for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+			infoID := mongodb.BsonIdToSId(tmp["_id"])
+			if count%1000 == 0 {
+				util.Debug(fmt.Sprintf(" %v  deal current --- %d", task["s_entname"], count))
+			}
+			//找到标注数据结果
+			marked, _ := util2.Mgo.FindById("marked", infoID, nil)
+			markedData := *marked
+			//计算标注 结果
+			//标注结果,十进制数字
+			if markedData["v_taginfo"] == nil {
+				continue
+			}
+			taginfo := markedData["v_taginfo"].(map[string]interface{})
+			res := calculateFlag(taginfo, *fields) //返回标注的十进制数字
+
+			if data, ok := markedData["v_baseinfo"].(map[string]interface{}); ok {
+				data["_id"] = tmp["_id"]
+				data["field_bitvalue"] = res
+				data["i_comeintime"] = time.Now().Unix()
+				data["i_updatetime"] = time.Now().Unix()
+
+				update := make(map[string]interface{})
+				update["$set"] = data
+				where := map[string]interface{}{
+					"_id": tmp["_id"],
+				}
+
+				if !util2.MgoHM.Update(util.ObjToString(biddingHConf["coll"]), where, update, true, false) {
+					util.Debug("任务 ", task["s_groupname"], infoID, "入库错误,请检查")
+				} else {
+					//1、更新数据源信息
+					setResult := map[string]interface{}{ //更新字段集
+						"is_return_highdata":  1,
+						"return_highdatetime": time.Now().Unix(),
+					}
+					set := map[string]interface{}{
+						"$set": setResult,
+					}
+					util2.Mgo.UpdateById(s_sourceinfo, infoID, set)
+				}
+			}
+
+		}
+		util.Debug("任务: ", task["s_entname"], "数据表: ", s_sourceinfo, " 处理总数为: ", count, "分配的数据总量为: ", task["i_givenum"])
+
+		if count > 0 {
+			//当前任务结束
+			//3.更新任务表,
+			taskSetResult := map[string]interface{}{ //更新字段集
+				"is_return_highdata": 1,
+			}
+			taskSet := map[string]interface{}{
+				"$set": taskSetResult,
+			}
+			util2.Mgo.UpdateById("f_task", taskID, taskSet)
+			//4. 记录任务中入高质量库数据
+			taskInsert := map[string]interface{}{
+				"task_id":         taskID,            //任务ID
+				"high_mark_count": count,             // 标注入高质量数据
+				"given_count":     task["i_givenum"], //任务分配数量
+				"createtime":      time.Now().Unix(),
+				"updatetime":      time.Now().Unix(),
+			}
+			util2.Mgo.Save("high_result", taskInsert)
+		} else {
+			util.Debug(task["s_entname"], "数据表:", s_sourceinfo, "获取的数据总数为:", count, "分配的数据总量为:", task["i_givenum"])
+
+		}
+
+		util.Debug(task["s_groupname"], "数据处理完毕")
+	}
+
+	util.Debug("所有任务处理完毕")
+}
+
+//calculateFlag 根据数据,返回被标注的字段数字
+func calculateFlag(marked map[string]interface{}, data []map[string]interface{}) uint64 {
+	var result uint64
+	for _, item := range data {
+		name, ok := item["name"].(string)
+		if !ok {
+			continue
+		}
+
+		sort, ok := item["sort"].(int32)
+
+		if !ok {
+			continue
+		}
+
+		// 根据字段名称查找对应的标记值
+		_, ok = marked[name]
+		if !ok {
+			continue
+		}
+
+		// 通过位运算将标记值放置到正确的位置
+		result |= 1 << (sort - 1)
+	}
+
+	return result
+}

+ 24 - 3
src/main.go

@@ -1,11 +1,13 @@
 package main
 
 import (
+	util "common_utils"
+	"crons"
 	_ "filter"
+	"front"
+	"github.com/cron"
 	qu "qfw/util"
 	"time"
-
-	"front"
 	. "util"
 
 	"github.com/go-xweb/xweb"
@@ -34,7 +36,26 @@ func init() {
 
 }
 
+func cronTask() {
+	defer util.Catch()
+	c := cron.New()
+	biddingHConf := Sysconfig["high_mark"].(map[string]interface{})
+	if biddingHConf["spec"] == nil || biddingHConf["spec"] == "" {
+		util.Debug("高质量库 定时任务 为空,请配置")
+	} else {
+		err := c.AddFunc(biddingHConf["spec"].(string), crons.MarkTask)
+		if err != nil {
+			util.Debug("cronTask err", err)
+		}
+
+		c.Start()
+		defer c.Stop()
+		select {}
+	}
+
+}
+
 func main() {
+	go cronTask()
 	xweb.Run(":" + qu.ObjToString(Sysconfig["port"]))
-
 }

+ 17 - 0
src/util/config.go

@@ -1,6 +1,7 @@
 package util
 
 import (
+	util "common_utils"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 	"mongodb"
@@ -37,6 +38,7 @@ var (
 	TopSubStypeArr      []string
 	TopSubStypeArr2     []string
 	FieldsArr           []map[string]interface{}
+	MgoHM               *mongodb.MongodbSim //高质量库
 
 	MgoBulkSize = 200
 )
@@ -98,6 +100,21 @@ func InitConfig() {
 	}
 	MgoB.InitPool()
 
+	biddingHConf := Sysconfig["high_mark"].(map[string]interface{})
+	if biddingHConf == nil || len(biddingHConf) == 0 {
+		util.Debug("high_mark 配置项为空,请配置高质量库参数")
+	}
+	//高质量库
+	MgoHM = &mongodb.MongodbSim{
+		MongodbAddr: biddingHConf["addr"].(string),
+		Size:        util.IntAllDef(biddingHConf["size"], 5),
+		DbName:      biddingHConf["db"].(string),
+		UserName:    biddingHConf["username"].(string),
+		Password:    biddingHConf["password"].(string),
+	}
+
+	MgoHM.InitPool()
+
 	esConf := Sysconfig["es"].(map[string]interface{})
 	Es = &Elastic{
 		S_esurl:  qu.ObjToString(esConf["addr"]), //http://172.17.145.170:9800

+ 1 - 1
src/util/elasticSim.go

@@ -5,7 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	es "gopkg.in/olivere/elastic.v7"
+	es "github.com/olivere/elastic/v7"
 	"log"
 	"qfw/util"
 	"runtime"