xuzhiheng vor 1 Jahr
Ursprung
Commit
e4bac1ca9d

+ 558 - 0
data_mgo_to_tidb/bidding.go

@@ -0,0 +1,558 @@
+package main
+
+import (
+	"data_tidb/config"
+	"fmt"
+	"github.com/shopspring/decimal"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"reflect"
+	"regexp"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+var (
+	regLetter = regexp.MustCompile("[a-z]*")
+)
+
+func doBiddingTask(gtid, lteid string, mapInfo map[string]interface{}) {
+	sess := MongoB.GetMgoConn()
+	defer MongoB.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	stype := util.ObjToString(mapInfo["stype"])
+	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId(gtid),
+		"$lte": mongodb.StringTOBsonId(lteid)}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%1000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if util.IntAll(tmp["dataprocess"]) != 8 {
+				return
+			}
+			if stype == "bidding_history" && tmp["history_updatetime"] == nil {
+				return
+			}
+			taskBase(tmp)
+			taskTags(tmp)
+			taskExpand(tmp)
+			taskAtts(tmp)
+			taskInfoformat(tmp)
+			taskIntent(tmp)
+			taskWinner(tmp)
+			taskPackage(tmp)
+			taskPur(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+func taskB() {
+	sess := MongoB.GetMgoConn()
+	defer MongoB.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("634eac71911e1eb345b2d861")}
+	q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("632d42d667a6b0a2861eef92")}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding").Find(q).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+
+			if util.IntAll(tmp["extracttype"]) != -1 {
+				taskBase(tmp)
+				taskTags(tmp)
+				taskExpand(tmp)
+				taskAtts(tmp)
+				taskInfoformat(tmp)
+				taskIntent(tmp)
+				taskWinner(tmp)
+				//taskPackage(tmp)
+				taskPur(tmp)
+			}
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+// @Description 基本信息
+// @Author J 2022/9/22 11:12
+func taskBase(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	var errf []string // 异常字段
+	for _, f := range BaseField {
+		if f == "infoid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "area_code" {
+			if tmp["area"] != nil {
+				saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+			}
+		} else if f == "city_code" {
+			if tmp["area"] != nil && tmp["city"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "district_code" {
+			if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "toptype_code" {
+			if obj := util.ObjToString(tmp["toptype"]); obj != "" {
+				saveM[f] = TopTypeCode[obj]
+			}
+		} else if f == "subtype_code" {
+			if obj := util.ObjToString(tmp["subtype"]); obj != "" {
+				saveM[f] = SubTypeCode[obj]
+			}
+		} else if f == "buyerclass_code" {
+			if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+				saveM[f] = BuyerCode[obj]
+			}
+		} else if f == "createtime" || f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "comeintime" || f == "publishtime" || f == "bidopentime" {
+			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "multipackage" || f == "isValidFile" {
+			if tmp[f] == nil {
+				saveM[f] = 0
+			} else {
+				saveM[f] = tmp[f]
+			}
+		} else if f == "buyer_id" {
+			if b := util.ObjToString(tmp["buyer"]); b != "" {
+				saveM["buyer"] = b
+				//if code := redis.GetStr("qyxy_id", b); code != "" {
+				if code := getNameId(b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else if f == "agency_id" {
+			if b := util.ObjToString(tmp["agency"]); b != "" {
+				saveM["agency"] = b
+				//if code := redis.GetStr("qyxy_id", b); code != "" {
+				if code := getNameId(b); code != "" {
+					saveM[f] = code
+				}
+			} else {
+				if tmp[f] != nil {
+					saveM[f] = tmp[f]
+				}
+			}
+		} else {
+			if tmp[f] != nil {
+				if BaseVMap[f] != nil {
+					var b bool
+					saveM[f], b = verifyF(f, tmp[f], BaseVMap[f])
+					// 保存异常字段数据
+					if b {
+						errf = append(errf, f)
+					}
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveBasePool <- saveM
+	if len(errf) > 0 {
+		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
+	}
+}
+
+func getNameId(name string) string {
+	info := MysqlTool.FindOne("dws_f_ent_baseinfo", map[string]interface{}{"name": name}, "name_id", "")
+	if info != nil && (*info)["name_Id"] != nil {
+		return util.ObjToString((*info)["name_Id"])
+	} else {
+		return ""
+	}
+}
+
+// @Description 扩展信息
+// @Author J 2022/9/22 11:13
+func taskExpand(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	var errf []string // 异常字段
+	for _, f := range ExpandField {
+		if f == "infoid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "project_startdate" || f == "project_completedate" || f == "signstarttime" || f == "bidendtime" || f == "bidstarttime" || f == "docstarttime" ||
+			f == "docendtime" || f == "signaturedate" || f == "signendtime" {
+			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "createtime" || f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "bidway" {
+			if util.ObjToString(tmp[f]) == "电子投标" {
+				saveM[f] = 1
+			} else if util.ObjToString(tmp[f]) == "纸质投标" {
+				saveM[f] = 0
+			}
+		} else if f == "review_experts" {
+			if tmp[f] != nil {
+				if reflect.TypeOf(tmp[f]).String() == "string" {
+					saveM[f] = tmp[f]
+				} else if reflect.TypeOf(tmp[f]).String() == "[]interface {}" {
+					if arr, ok := tmp[f].([]interface{}); ok {
+						saveM[f] = strings.Join(util.ObjArrToStringArr(arr), ",")
+					}
+				}
+			}
+		} else if f == "bid_guarantee" || f == "contract_guarantee" {
+			if tmp[f] != nil {
+				if tmp[f].(bool) {
+					saveM[f] = 1
+				} else {
+					saveM[f] = 0
+				}
+			}
+		} else if f == "supervisorrate" || f == "agencyfee" {
+			if tmp[f] != nil {
+				if reflect.TypeOf(tmp[f]).String() == "string" {
+					v2, err := strconv.ParseFloat(strings.ReplaceAll(util.ObjToString(tmp[f]), "%", ""), 64)
+					if err != nil {
+						v, _ := decimal.NewFromFloat(v2).Div(decimal.NewFromFloat(float64(100))).Float64()
+						saveM[f] = v
+					}
+				} else {
+					saveM[f], _ = util.FormatFloat(util.Float64All(tmp[f]), 4)
+				}
+			}
+		} else if f == "project_duration" {
+			if tmp[f] != nil {
+				tmp[f] = util.IntAll(tmp[f])
+			}
+		} else {
+			if tmp[f] != nil {
+				if ExpandVMap[f] != nil {
+					var b bool
+					saveM[f], b = verifyF(f, tmp[f], ExpandVMap[f])
+					// 保存异常字段数据
+					if b {
+						errf = append(errf, f)
+					}
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveExpandPool <- saveM
+	if len(errf) > 0 {
+		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
+	}
+}
+
+// @Description 标签记录
+// @Author J 2022/9/22 11:13
+func taskTags(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
+		for _, i2 := range topArr {
+			tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
+			code := TopScopeCode[tclass]
+			saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+			//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+		}
+	}
+	if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
+		for _, i2 := range subArr {
+			sc := strings.Split(util.ObjToString(i2), "_")
+			if len(sc) > 1 {
+				code := SubScopeCode[sc[1]]
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+			}
+			//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+		}
+	}
+	if tArr, ok := tmp["certificate_class"].([]interface{}); ok {
+		for _, i2 := range tArr {
+			if util.ObjToString(i2) == "ISO" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			} else if util.ObjToString(i2) == "AAA" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			} else if util.ObjToString(i2) == "ISO,AAA" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			}
+		}
+	}
+}
+
+// @Description 附件
+// @Author J 2022/9/22 11:13
+func taskAtts(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["projectinfo"] != nil {
+		if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o {
+			if attsMap, ok := pinfo["attachments"].(map[string]interface{}); ok {
+				for _, attr := range attsMap {
+					if at, ok := attr.(map[string]interface{}); ok {
+						if util.ObjToString(at["fid"]) != "" {
+							ftype := ""
+							for _, s := range FileTypeArr {
+								ft := strings.ToLower(util.ObjToString(tmp["ftype"]))
+								if strings.Contains(ft, s) {
+									ftype = s
+									break
+								}
+							}
+							saveAttrPool <- map[string]interface{}{"infoid": id, "org_url": at["org_url"], "size": at["size"], "fid": at["fid"],
+								"filename": at["filename"], "ftype": ftype, "file_type": 0, "createtime": time.Now().Format(util.Date_Full_Layout)}
+						}
+					}
+				}
+			}
+		}
+	}
+	if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o {
+		if len(attachTxt) > 0 {
+			for _, at := range attachTxt {
+				at1 := at.(map[string]interface{})
+				if len(at1) > 0 {
+					for k, v := range at1 {
+						if reflect.TypeOf(v).String() == "string" {
+							if util.ObjToString(at1["attach_url"]) != "" {
+								saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at1["attach_url"], "filename": at1["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+							}
+							break
+						} else {
+							if at2, ok := at1[k].(map[string]interface{}); ok {
+								if util.ObjToString(at2["attach_url"]) != "" {
+									saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at2["attach_url"], "filename": at2["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+// @Description 拟建
+// @Author J 2022/9/22 15:56
+func taskInfoformat(tmp map[string]interface{}) {
+	if util.IntAll(tmp["infoformat"]) != 2 && tmp["projectinfo"] != nil {
+		return
+	}
+	if info, ok := tmp["projectinfo"].(map[string]interface{}); ok {
+		delete(info, "attachments")
+		if len(info) > 0 {
+			saveM := make(map[string]interface{})
+			for _, f := range IfmField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "createtime" || f == "updatetime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				} else if f == "approvetime" {
+					if info[f] != nil && util.IntAll(tmp[f]) > 0 {
+						saveM[f] = info[f]
+					}
+				} else {
+					if info[f] != nil {
+						saveM[f] = info[f]
+					}
+				}
+			}
+			saveIfmPool <- saveM
+		}
+	}
+}
+
+// @Description 采购意向
+// @Author J 2022/9/22 16:27
+func taskIntent(tmp map[string]interface{}) {
+	if arr, ok := tmp["procurementlist"].([]interface{}); ok {
+		for _, p := range arr {
+			p1 := p.(map[string]interface{})
+			saveM := make(map[string]interface{})
+			for _, f := range IntentField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "createtime" || f == "updatetime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				} else if f == "buyer_id" {
+					if b := util.ObjToString(tmp["buyer"]); b != "" {
+						//if code := redis.GetStr("qyxy_id", b); code != "" {
+						//	saveM[f] = code
+						//}
+						if code := getNameId(b); code != "" {
+							saveM[f] = code
+						}
+					}
+				} else {
+					if p1[f] != nil {
+						saveM[f] = p1[f]
+					}
+				}
+			}
+			saveIntentPool <- saveM
+		}
+	}
+}
+
+// @Description 中标单位
+// @Author J 2022/9/27 10:58
+func taskWinner(tmp map[string]interface{}) {
+	if wod, ok := tmp["winnerorder"].([]interface{}); ok {
+		for _, w := range wod {
+			w1 := w.(map[string]interface{})
+			if w1["sort"] != nil {
+				saveM := make(map[string]interface{})
+				for _, f := range WinnerField {
+					if f == "infoid" {
+						saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+					} else if f == "createtime" || f == "updatetime" {
+						saveM[f] = time.Now().Format(util.Date_Full_Layout)
+					} else if f == "winnersort" {
+						saveM[f] = util.IntAll(w1["sort"])
+					} else if f == "winner_id" {
+						if b := util.ObjToString(w1["entname"]); b != "" {
+							saveM["winner"] = b
+							//if code := redis.GetStr("qyxy_id", b); code != "" {
+							//	saveM[f] = code
+							//}
+							if code := getNameId(b); code != "" {
+								saveM[f] = code
+							}
+						}
+					} else if f == "package_id" {
+
+					}
+				}
+				saveWinnerPool <- saveM
+			}
+		}
+	}
+
+	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+		warr = append(warr, util.ObjToString(tmp["winner"]))
+	}
+	for _, s := range warr {
+		saveM := make(map[string]interface{})
+		for _, f := range WinnerField {
+			if f == "infoid" {
+				saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if f == "createtime" || f == "updatetime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			} else if f == "winnersort" {
+				saveM[f] = 0
+			} else if f == "winner_id" {
+				if s != "" {
+					saveM["winner"] = s
+					//if code := redis.GetStr("qyxy_id", s); code != "" {
+					//	saveM[f] = code
+					//}
+					if code := getNameId(s); code != "" {
+						saveM[f] = code
+					}
+				}
+			}
+		}
+		saveWinnerPool <- saveM
+	}
+}
+
+func BinarySearch(s []string, k string) int {
+	sort.Strings(s)
+	lo, hi := 0, len(s)-1
+	for lo <= hi {
+		m := (lo + hi) >> 1
+		if s[m] < k {
+			lo = m + 1
+		} else if s[m] > k {
+			hi = m - 1
+		} else {
+			return m
+		}
+	}
+	return -1
+}
+
+func taskPackage(tmp map[string]interface{}) {
+
+}
+
+// @Description 标的物
+// @Author J 2022/9/29 16:48
+func taskPur(tmp map[string]interface{}) {
+	if plist, ok := tmp["purchasinglist"].([]interface{}); ok {
+		for _, p := range plist {
+			saveM := make(map[string]interface{})
+			p1 := p.(map[string]interface{})
+			for _, f := range PurField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "unitprice" || f == "totalprice" {
+					if p1[f] != nil {
+						if reflect.TypeOf(p1[f]).String() == "string" {
+
+						} else {
+							if util.Float64All(p1[f]) <= 10000000000 {
+								saveM[f], _ = util.FormatFloat(util.Float64All(p1[f]), 4)
+							}
+						}
+					}
+				} else {
+					if p1[f] != nil {
+						if reflect.TypeOf(p1[f]).String() == "string" {
+							if f == "item" || f == "itemname" || f == "brandname" {
+								if len(util.ObjToString(p1[f])) <= 500 {
+									saveM[f] = p1[f]
+								}
+							} else {
+								saveM[f] = p1[f]
+							}
+						} else {
+							saveM[f] = p1[f]
+						}
+					}
+				}
+			}
+			savePurPool <- saveM
+		}
+	}
+}

+ 46 - 0
data_mgo_to_tidb/common.toml

@@ -0,0 +1,46 @@
+
+[udp]
+locport = ":1681"
+[db]
+
+[db.mysql]
+addr = "192.168.3.14:4000"
+dbname = "global_common_data"
+size = 5
+user = "root"
+password = "=PDT49#80Z!RVv52_z"
+
+[db.mongob]
+addr = "192.168.3.207:27001"
+dbname = "qfw_data"
+size = 5
+user = "root"
+password = "root"
+[db.mongop]
+addr = "192.168.3.207:27001"
+dbname = "qfw_data"
+size = 5
+user = "root"
+password = "root"
+
+[mail]
+send = false
+to = "wangjianghan@topnet.net.cn"
+api = "http://172.17.145.179:19281/_send/_mail"
+
+# 日志
+[log]
+# 日志路径,为空将输出控制台
+logpath = ""
+# log size (M)
+maxsize = 10
+# compress log
+compress = true
+# log save  time (day)
+maxage =  7
+# save total log file total
+maxbackups = 10
+# log level
+loglevel  = "debug"
+# text or json output
+format = "text"

+ 84 - 0
data_mgo_to_tidb/config/conf.go

@@ -0,0 +1,84 @@
+package config
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/BurntSushi/toml"
+)
+
+var (
+	// Conf crocodile conf
+	Conf *conf
+)
+
+// Init Config
+func Init(conf string) {
+	_, err := toml.DecodeFile(conf, &Conf)
+	if err != nil {
+		fmt.Printf("Err %v", err)
+		os.Exit(1)
+	}
+}
+
+type conf struct {
+	Udp  udp
+	DB   db
+	Mail mail
+	Log  log
+}
+
+type udp struct {
+	LocPort string
+}
+
+type mail struct {
+	Send bool
+	To   string
+	Api  string
+}
+
+// Log Config
+type log struct {
+	LogPath    string
+	MaxSize    int
+	Compress   bool
+	MaxAge     int
+	MaxBackups int
+	LogLevel   string
+	Format     string
+}
+
+type db struct {
+	MongoB mgo
+	MongoP mgo
+	Mysql  mysql
+}
+
+type mgo struct {
+	Addr     string
+	Dbname   string
+	Size     int
+	User     string
+	Password string
+}
+
+type mysql struct {
+	Addr     string
+	Dbname   string
+	Size     int
+	User     string
+	Password string
+}
+
+type duration struct {
+	time.Duration
+}
+
+// UnmarshalText parse 10s to time.Time
+func (d *duration) UnmarshalText(text []byte) error {
+	var err error
+	d.Duration, err = time.ParseDuration(string(text))
+	return err
+}

+ 53 - 0
data_mgo_to_tidb/config/conf_test.go

@@ -0,0 +1,53 @@
+package config
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+var confs = `# log
+[log]
+logpath = ""
+maxsize = 10
+compress = true
+maxage =  7
+maxbackups = 10
+loglevel  = "info"
+format = "text"
+
+[serve]
+grpcAddr = "192.168.3.12:10021"
+udpPort = "1782"
+
+[db]
+[db.mongo]
+addr = "192.168.3.207:27092"
+dbname = "qfw"
+size = 10
+user = ""
+password = ""
+[db.mongo1]
+addr = "192.168.3.207:27092"
+dbname = "wjh"
+size = 5
+user = ""
+password = ""
+
+[db.es]
+addr = "http://192.168.3.206:9800"
+size = 5
+indexm = "medical_institution_v1"
+typem = "medical_institution"
+indexs = "supplier_product_v1"
+types = "supplier_product"
+
+`
+
+func TestInit(t *testing.T) {
+	testfile := "/tmp/crocodile.toml"
+	ioutil.WriteFile(testfile, []byte(confs), 0644)
+	Init(testfile)
+	t.Logf("%+v", Conf.Serve.GrpcAddr)
+	os.Remove(testfile)
+}

BIN
data_mgo_to_tidb/data_tidb


+ 206 - 0
data_mgo_to_tidb/field-criteria.json

@@ -0,0 +1,206 @@
+{
+  "dws_f_bid_baseinfo": {
+    "field_array": ["infoid", "area_code", "city_code", "district_code", "budget", "bidamount", "biddiscount", "title", "toptype_code",
+      "subtype_code", "projectname", "projectcode", "buyerclass_code", "publishtime", "bidopentime", "comeintime", "isValidFile", "multipackage",
+      "projectscope_id", "detail_id", "href", "purchasing", "site", "updatetime", "createtime", "buyer_id", "agency_id"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "biddiscount": {
+        "stype": "float",
+        "min": 0,
+        "max": 1000000,
+        "decimal": 4
+      },
+      "title": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectname": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectcode": {
+        "stype": "string",
+        "length": 100,
+        "intercept": true
+      },
+      "href": {
+        "stype": "string",
+        "length": 5000,
+        "intercept": false
+      },
+      "purchasing": {
+        "stype": "string",
+        "length": 2000,
+        "intercept": true
+      },
+      "site": {
+        "stype": "string",
+        "length": 100,
+        "intercept": false
+      }
+    }
+  },
+  "dws_f_bid_expand_baseinfo": {
+    "field_array": ["infoid", "projectperiod", "project_duration", "project_timeunit", "project_startdate", "project_completedate", "signstarttime",
+      "signendtime", "docstarttime", "docendtime", "bidendtime", "bidstarttime", "signaturedate", "bidway", "docamount", "agencyrate", "agencyfee",
+      "currency", "funds", "payway", "bidamounttype", "review_experts", "bidmethod", "bid_bond", "bid_guarantee", "contract_bond", "contract_guarantee",
+      "bidopenaddress", "contractcode", "supervisorrate", "getdocmethod", "projectaddr", "project_scale", "purchasing_tag", "updatetime", "createtime"],
+    "field_criteria": {
+      "projectperiod": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "project_timeunit": {
+        "stype": "string",
+        "length": 50,
+        "intercept": true
+      },
+      "funds": {
+        "stype": "string",
+        "length": 100,
+        "intercept": true
+      },
+      "currency": {
+        "stype": "string",
+        "length": 20,
+        "intercept": true
+      },
+      "payway": {
+        "stype": "string",
+        "length": 255,
+        "intercept": true
+      },
+      "bidmethod": {
+        "stype": "string",
+        "length": 255,
+        "intercept": true
+      },
+      "bid_bond": {
+        "stype": "string",
+        "length": 1000,
+        "intercept": true
+      },
+      "contract_bond": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "bidopenaddress": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "contractcode": {
+        "stype": "string",
+        "length": 100,
+        "intercept": false
+      },
+      "getdocmethod": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectaddr": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "project_scale": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "purchasing_tag": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "biddiscount": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      },
+      "docamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 99999999,
+        "decimal": 2
+      },
+      "agencyrate": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      },
+      "agencyfee": {
+        "stype": "float",
+        "min": 0,
+        "max": 99999999,
+        "decimal": 2
+      },
+      "supervisorrate": {
+        "stype": "float",
+        "min": 0,
+        "max": 999999,
+        "decimal": 4
+      }
+    }
+  },
+  "dws_f_project_baseinfo": {
+    "field_array": ["projectid", "projectcode", "projectname", "area_code", "city_code", "district_code", "budget", "bidamount", "bidstatus", "bidtype",
+      "bidopentime", "createtime", "firsttime", "zbtime", "jgtime", "lasttime", "buyer_id", "agency_id", "updatetime"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "projectname": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      }
+    }
+  },
+  "dws_f_project_business": {
+    "field_array": ["projectid", "buyer_id", "agency_id", "winner_id", "area_code", "city_code", "district_code", "budget", "bidamount", "bidstatus", "bidtype",
+      "buyerclass_code", "firsttime", "zbtime", "jgtime", "lasttime", "bidopentime"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      }
+    }
+  }
+}

+ 12 - 0
data_mgo_to_tidb/go.mod

@@ -0,0 +1,12 @@
+module data_tidb
+
+go 1.16
+
+require (
+	github.com/BurntSushi/toml v1.2.0
+	github.com/shopspring/decimal v1.3.1
+	github.com/spf13/cobra v1.5.0
+	go.mongodb.org/mongo-driver v1.10.3
+	go.uber.org/zap v1.23.0
+	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230620072956-7ec055be2061
+)

+ 218 - 0
data_mgo_to_tidb/go.sum

@@ -0,0 +1,218 @@
+cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
+github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
+github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
+github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/aws/aws-sdk-go v1.43.21/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
+github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
+github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
+github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
+github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
+github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
+github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
+github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
+github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
+github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
+github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
+github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
+github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
+github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
+github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
+github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
+github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
+github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
+github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM=
+github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak=
+github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
+github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
+github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.10.3 h1:XDQEvmh6z1EUsXuIkXE9TaVeqHw6SwS1uf93jFs0HBA=
+go.mongodb.org/mongo-driver v1.10.3/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
+go.opentelemetry.io/otel v1.5.0/go.mod h1:Jm/m+rNp/z0eqJc74H7LPwQ3G87qkU/AnnAydAjSAHk=
+go.opentelemetry.io/otel/trace v1.5.0/go.mod h1:sq55kfhjXYr1zVSyexg0w1mpa03AYXR5eyTkB9NPPdE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
+go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
+go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
+golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
+golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
+golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
+golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
+golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
+google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
+google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
+google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
+google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
+google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230620072956-7ec055be2061 h1:xfmoVsDEqVv1XzAVxgGEDi+W9ojHBJc6OmTbg3b1tP0=
+jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230620072956-7ec055be2061/go.mod h1:1Rp0ioZBhikjXHYYXmnzL6RNfvTDM/2XvRB+vuPLurI=

+ 211 - 0
data_mgo_to_tidb/init.go

@@ -0,0 +1,211 @@
+package main
+
+import (
+	"data_tidb/config"
+	"fmt"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mysqldb"
+	"os"
+)
+
+var (
+	MongoB, MongoP *mongodb.MongodbSim
+	MysqlTool      *mysqldb.Mysql
+
+	FCriteria map[string]interface{}
+
+	saveSize       = 200
+	saveBasePool   = make(chan map[string]interface{}, 5000)
+	saveBaseSp     = make(chan bool, 1)
+	saveExpandPool = make(chan map[string]interface{}, 5000)
+	saveExpandSp   = make(chan bool, 1)
+	saveTagPool    = make(chan map[string]interface{}, 5000)
+	saveTagSp      = make(chan bool, 1)
+	saveAttrPool   = make(chan map[string]interface{}, 5000)
+	saveAttrSp     = make(chan bool, 1)
+	saveIfmPool    = make(chan map[string]interface{}, 5000)
+	saveIfmSp      = make(chan bool, 1)
+	saveIntentPool = make(chan map[string]interface{}, 5000)
+	saveIntentSp   = make(chan bool, 1)
+	saveWinnerPool = make(chan map[string]interface{}, 5000)
+	saveWinnerSp   = make(chan bool, 1)
+	savePkgPool    = make(chan map[string]interface{}, 5000)
+	savePkgSp      = make(chan bool, 1)
+	savePurPool    = make(chan map[string]interface{}, 5000)
+	savePurSp      = make(chan bool, 1)
+
+	saveProPool    = make(chan map[string]interface{}, 5000)
+	saveProSp      = make(chan bool, 1)
+	saveProbPool   = make(chan map[string]interface{}, 5000)
+	saveProbSp     = make(chan bool, 1)
+	saveProTagPool = make(chan map[string]interface{}, 5000)
+	saveProTagSp   = make(chan bool, 1)
+
+	saveRelationPool = make(chan map[string]interface{}, 5000)
+	saveRelationSp   = make(chan bool, 1)
+
+	saveErrPool = make(chan map[string]interface{}, 5000)
+	saveErrSp   = make(chan bool, 1)
+
+	AreaCode     = make(map[string]string, 5000)
+	TopTypeCode  = make(map[string]string, 10)
+	SubTypeCode  = make(map[string]string, 40)
+	BuyerCode    = make(map[string]string, 100)
+	TopScopeCode = make(map[string]interface{}, 20)
+	SubScopeCode = make(map[string]interface{}, 70)
+
+	BaseField   []string
+	BaseVMap    map[string]interface{}
+	ExpandField []string
+	ExpandVMap  map[string]interface{}
+
+	ProField    []string
+	ProVMap     map[string]interface{}
+	ProBusField []string
+	ProBusVMap  map[string]interface{}
+
+	TagsField    = []string{"infoid", "labelcode", "labelvalues", "labelweight", "createtime"}
+	AttrField    = []string{"infoid", "filename", "fid", "ftype", "org_url", "size", "file_type", "createtime"}
+	FileTypeArr  = []string{"pdf", "doc", "docx", "xlsx", "xls", "jpg", "zip", "rar", "txt", "gif", "png", "bmp", "swf", "html"}
+	IfmField     = []string{"infoid", "approvecode", "approvedept", "approvestatus", "approvetime", "approvenumber", "approvecontent", "projecttype", "approvecity", "updatetime", "createtime"}
+	PurField     = []string{"infoid", "item", "itemname", "brandname", "specs", "model", "unitname", "number", "unitprice", "totalprice"}
+	IntentField  = []string{"infoid", "projectname", "projectscope", "item", "buyer_id", "totalprice", "expurasingtime", "reserved_amount", "updatetime", "createtime"}
+	WinnerField  = []string{"infoid", "winner_id", "winnersort", "package_id", "createtime", "updatetime"}
+	PackageField = []string{"infoid", "packagename", "packagename", "packageorgin", "packagedetail", "budget", "bidamount", "winner_id", "updatetime", "createtime"}
+
+	ProTagsField = []string{"projectid", "labelcode", "labelvalues", "labelweight", "createtime"}
+
+	RelationField = []string{"projectid", "infoid", "name_id", "contact_id", "identity_type", "createtime"}
+)
+
+// InitLog @Description
+// @Author J 2022/7/26 15:30
+func InitLog() {
+	logcfg := config.Conf.Log
+
+	err := log.InitLog(
+		log.Path(logcfg.LogPath),
+		log.Level(logcfg.LogLevel),
+		log.Compress(logcfg.Compress),
+		log.MaxSize(logcfg.MaxSize),
+		log.MaxBackups(logcfg.MaxBackups),
+		log.MaxAge(logcfg.MaxAge),
+		log.Format(logcfg.Format),
+	)
+	if err != nil {
+		fmt.Printf("InitLog failed: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+func InitMgo() {
+	MongoB = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoB.Addr,
+		DbName:      config.Conf.DB.MongoB.Dbname,
+		Size:        config.Conf.DB.MongoB.Size,
+		UserName:    config.Conf.DB.MongoB.User,
+		Password:    config.Conf.DB.MongoB.Password,
+	}
+	MongoB.InitPool()
+	MongoP = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoP.Addr,
+		DbName:      config.Conf.DB.MongoP.Dbname,
+		Size:        config.Conf.DB.MongoP.Size,
+		UserName:    config.Conf.DB.MongoP.User,
+		Password:    config.Conf.DB.MongoP.Password,
+	}
+	MongoP.InitPool()
+}
+
+func InitMysql() {
+	dbcfg := config.Conf.DB.Mysql
+	MysqlTool = &mysqldb.Mysql{
+		Address:  dbcfg.Addr,
+		DBName:   dbcfg.Dbname,
+		UserName: dbcfg.User,
+		PassWord: dbcfg.Password,
+	}
+	MysqlTool.Init()
+}
+
+func InitField() {
+	info := MysqlTool.Find("code_area", nil, "", "", -1, -1)
+	for _, m := range *info {
+		var key string
+		for i, v := range []string{"area", "city", "district"} {
+			if i == 0 && util.ObjToString(m[v]) != "" {
+				key = util.ObjToString(m[v])
+			} else if util.ObjToString(m[v]) != "" {
+				key += "," + util.ObjToString(m[v])
+			}
+		}
+		AreaCode[key] = util.ObjToString(m["code"])
+	}
+	log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
+
+	info1 := MysqlTool.Find("code_bidtopsubtype", nil, "", "", -1, -1)
+	for _, m := range *info1 {
+		if util.IntAll(m["level"]) == 1 {
+			TopTypeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+		if util.IntAll(m["level"]) == 2 {
+			SubTypeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+	}
+	log.Info("InitField", zap.Int("TopTypeCode", len(TopTypeCode)), zap.Int("SubTypeCode", len(SubTypeCode)))
+
+	info2 := MysqlTool.Find("code_buyerclass", nil, "", "", -1, -1)
+	for _, m := range *info2 {
+		BuyerCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+	}
+	log.Info("InitField", zap.Int("BuyerCode", len(BuyerCode)))
+
+	info3 := MysqlTool.Find("code_bidscope", nil, "", "", -1, -1)
+	for _, m := range *info3 {
+		if util.IntAll(m["level"]) == 1 {
+			TopScopeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+		if util.IntAll(m["level"]) == 2 {
+			SubScopeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+	}
+	log.Info("InitField", zap.Int("TopScopeCode", len(TopScopeCode)), zap.Int("SubScopeCode", len(SubScopeCode)))
+
+	util.ReadConfig("./field-criteria.json", &FCriteria)
+	if len(FCriteria) > 0 {
+		if m, o := FCriteria["dws_f_bid_baseinfo"].(map[string]interface{}); o {
+			BaseField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			BaseVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_bid_baseinfo"))
+			panic("dws_f_bid_baseinfo")
+		}
+		if m, o := FCriteria["dws_f_bid_expand_baseinfo"].(map[string]interface{}); o {
+			ExpandField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ExpandVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_bid_expand_baseinfo"))
+			panic("dws_f_bid_expand_baseinfo")
+		}
+		if m, o := FCriteria["dws_f_project_baseinfo"].(map[string]interface{}); o {
+			ProField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ProVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_project_baseinfo"))
+			panic("dws_f_project_baseinfo")
+		}
+		if m, o := FCriteria["dws_f_project_business"].(map[string]interface{}); o {
+			ProBusField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ProBusVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_project_business"))
+			panic("dws_f_project_business")
+		}
+	} else {
+		log.Error("InitField, 未找到field-criteria.json文件")
+		panic("InitField, 未找到field-criteria.json文件")
+	}
+}

+ 846 - 0
data_mgo_to_tidb/main.go

@@ -0,0 +1,846 @@
+package main
+
+import (
+	"data_tidb/config"
+	"encoding/json"
+	"fmt"
+	"github.com/spf13/cobra"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
+	"net"
+	"sync"
+	"time"
+)
+
+var (
+	UdpClient udp.UdpClient
+)
+
+func init() {
+	config.Init("./common.toml")
+	InitLog()
+	InitMgo()
+	InitMysql()
+	InitField()
+
+	redis.InitRedis1("qyxy_id=127.0.0.1:8379", 1)
+	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
+	log.Info("init success")
+}
+
+func main() {
+	//
+	//go SaveFunc()
+	//go SaveTagFunc()
+	//go SaveExpandFunc()
+	//go SaveAttrFunc()
+	//go SaveImfFunc()
+	//go SaveIntentFunc()
+	//go SaveWinnerFunc()
+	//go SavePackageFunc()
+	//go SavePurFunc()
+	//go saveErrMethod()
+
+	rootCmd := &cobra.Command{Use: "my cmd"}
+	rootCmd.AddCommand(bidding())
+	rootCmd.AddCommand(project())
+	rootCmd.AddCommand(projectAdd())
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Println("rootCmd.Execute failed", err.Error())
+	}
+
+	//go SaveRelationFunc()
+	//taskMysql()
+
+	//UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
+	//UdpClient.Listen(processUdpMsg)
+	//log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
+
+	c := make(chan bool, 1)
+	<-c
+
+}
+
+func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
+	defer util.Catch()
+	switch act {
+	case udp.OP_TYPE_DATA: //上个节点的数据
+		var mapInfo map[string]interface{}
+		err := json.Unmarshal(data, &mapInfo)
+		log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
+		gtid, _ := mapInfo["gtid"].(string)
+		lteid, _ := mapInfo["lteid"].(string)
+		if err != nil {
+			UdpClient.WriteUdp([]byte("tidb udp error"), udp.OP_NOOP, ra) //udp失败回写
+		} else {
+			//udp成功回写
+			if k := util.ObjToString(mapInfo["key"]); k != "" {
+				UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+			} else {
+				k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
+				UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
+			}
+			log.Info("start sync ...")
+			doBiddingTask(gtid, lteid, mapInfo)
+		}
+	}
+}
+
+func taskMysql() {
+	pool := make(chan bool, 5) //控制线程数
+	wg := &sync.WaitGroup{}
+
+	finalId := 0
+	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
+	//lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+	log.Info("查询最后id---", zap.Int("finally id: ", finalId))
+	lastid, count := 0, 0
+	for {
+		log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
+		q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_bpmc_relation", lastid)
+		//q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
+		//q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
+		rows, err := MysqlTool.DB.Query(q)
+		if err != nil {
+			log.Error("mysql query err ", zap.Error(err))
+		}
+		columns, err := rows.Columns()
+		if finalId == lastid {
+			log.Info("----finish-----", zap.Int("count: ", count))
+			break
+		}
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Error("mysql scan err ", zap.Error(err))
+				break
+			}
+			for i, col := range values {
+				if col == nil {
+					ret[columns[i]] = nil
+				} else {
+					switch val := (*scanArgs[i].(*interface{})).(type) {
+					case byte:
+						ret[columns[i]] = val
+						break
+					case []byte:
+						v := string(val)
+						switch v {
+						case "\x00": // 处理数据类型为bit的情况
+							ret[columns[i]] = 0
+						case "\x01": // 处理数据类型为bit的情况
+							ret[columns[i]] = 1
+						default:
+							ret[columns[i]] = v
+							break
+						}
+						break
+					case time.Time:
+						if val.IsZero() {
+							ret[columns[i]] = nil
+						} else {
+							ret[columns[i]] = val.Format("2006-01-02 15:04:05")
+						}
+						break
+					default:
+						ret[columns[i]] = val
+					}
+				}
+			}
+			lastid = util.IntAll(ret["id"])
+			count++
+			if count%20000 == 0 {
+				log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				//cid := util.Int64All(tmp["id"])
+				//iid := util.ObjToString(tmp["infoid"])
+				//name_id := util.ObjToString(tmp["name_id"])
+				//identity_type := util.Int64All(tmp["identity_type+0"])
+				//if name_id != "" {
+				//	coll := "bidding"
+				//	if iid > "5a862e7040d2d9bbe88e3b1f" {
+				//		coll = "bidding"
+				//	} else {
+				//		coll = "bidding_back"
+				//	}
+				//	info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
+				//	if len(*info) > 0 {
+				//		if identity_type == 1 {
+				//			if util.ObjToString((*info)["buyertel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["buyertel"])
+				//				if util.ObjToString((*info)["buyerperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["buyerperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		} else if identity_type == 2 {
+				//			if util.ObjToString((*info)["winnertel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["winnertel"])
+				//				if util.ObjToString((*info)["winnerperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["winnerperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		} else if identity_type == 4 {
+				//			if util.ObjToString((*info)["agencytel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["agencytel"])
+				//				if util.ObjToString((*info)["agencyperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["agencyperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		}
+				//	}
+				//}
+
+				//redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
+
+				saveM := make(map[string]interface{})
+				if util.ObjToString(tmp["name_id"]) != "" {
+					saveM["name_id"] = util.ObjToString(tmp["name_id"])
+				} else {
+					return
+				}
+				if util.ObjToString(tmp["contact_id"]) != "" {
+					saveM["contact_id"] = util.IntAll(tmp["contact_id"])
+				} else {
+					return
+				}
+				saveM["projectid"] = util.ObjToString(tmp["projectid"])
+				saveM["infoid"] = util.ObjToString(tmp["infoid"])
+				saveM["identity_type"] = tmp["identity_type"]
+				saveRelationPool <- saveM
+			}(ret)
+			ret = make(map[string]interface{})
+		}
+		_ = rows.Close()
+		wg.Wait()
+	}
+}
+
+func taskMgo() {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
+	q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
+	field := map[string]interface{}{"ids": 1}
+	query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			util.Debug("current ---", count, tmp["_id"])
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
+				redis.PutCKV("s_id", i, id)
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+
+	util.Debug("over ---", count)
+}
+
+// @Description 标讯数据
+// @Author J 2022/9/20 17:52
+func bidding() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "bidding",
+		Short: "Start processing bidding data",
+		Run: func(cmd *cobra.Command, args []string) {
+
+			go SaveFunc()
+			go SaveTagFunc()
+			go SaveExpandFunc()
+			go SaveAttrFunc()
+			go SaveImfFunc()
+			go SaveIntentFunc()
+			go SaveWinnerFunc()
+			go SavePackageFunc()
+			go SavePurFunc()
+
+			go saveErrMethod()
+			taskB()
+		},
+	}
+	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
+	return cmdClient
+}
+
+// @Description 项目数据
+// @Author J 2022/9/20 17:52
+func project() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "project",
+		Short: "Start processing project data",
+		Run: func(cmd *cobra.Command, args []string) {
+			//go SaveProFunc()
+			//go SaveProTagFunc()
+			//go SaveProbFunc()
+			go SaveRelationFunc()
+			taskP()
+		},
+	}
+	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
+	return cmdClient
+}
+
+// @Description 项目数据
+// @Author J 2022/9/20 17:52
+func projectAdd() *cobra.Command {
+	var pici int64
+	cmdClient := &cobra.Command{
+		Use:   "project",
+		Short: "Start processing project data",
+		Run: func(cmd *cobra.Command, args []string) {
+			//go SaveProFunc()
+			//go SaveProTagFunc()
+			//go SaveProbFunc()
+			go SaveRelationFunc()
+			taskPAdd(pici)
+		},
+	}
+	cmdClient.Flags().Int64VarP(&pici, "pici", "p", 0, "")
+	return cmdClient
+}
+
+func SaveFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveBasePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo", BaseField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveExpandFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveExpandPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveTagFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveTagPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveAttrFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveAttrPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveAttrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveAttrSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveAttrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveAttrSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveImfFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveIfmPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveIfmSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIfmSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveIfmSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIfmSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SavePurFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-savePurPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				savePurSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePurSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				savePurSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePurSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveIntentFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveIntentPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveIntentSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIntentSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveIntentSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIntentSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveWinnerFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveWinnerPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveWinnerSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveWinnerSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveWinnerSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveWinnerSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SavePackageFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-savePkgPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				savePkgSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePkgSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				savePkgSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePkgSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProbFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProbPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProbSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProbSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProbSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProbSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProTagFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProTagPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveRelationFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveRelationPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveRelationSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveRelationSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveRelationSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveRelationSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+// 字段错误数据
+func saveErrMethod() {
+	arru := make([]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveErrPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveErrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveErrSp
+					}()
+					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveErrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveErrSp
+					}()
+					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}

+ 486 - 0
data_mgo_to_tidb/project.go

@@ -0,0 +1,486 @@
+package main
+
+import (
+	"data_tidb/config"
+	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
+	"strings"
+	"sync"
+	"time"
+)
+
+func taskP() {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("64e5a63855d5406905c574e6")}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(nil).Sort("-_id").Skip(100000).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//taskPro(tmp)
+			//taskBusiness(tmp)
+			//taskProTag(tmp)
+
+			taskRelation(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+func taskPAdd(pici int64) {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+
+	q := bson.M{"pici": bson.M{"$gt": pici}}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(q).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//taskPro(tmp)
+			//taskBusiness(tmp)
+			//taskProTag(tmp)
+
+			taskRelation2(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+var BidStatus = map[string]int{
+	"预告": 0,
+	"拟建": 1,
+	"招标": 2,
+	"中标": 3,
+	"成交": 4,
+	"废标": 5,
+	"流标": 6,
+	"合同": 7,
+	"其它": 8,
+}
+var BidType = map[string]int{
+	"招标": 0,
+	"邀标": 1,
+	"单一": 2,
+	"竞价": 3,
+	"竞谈": 4,
+	"询价": 5,
+}
+
+// @Description 基础信息
+// @Author J 2022/9/22 18:32
+func taskPro(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	for _, f := range ProField {
+		if f == "projectid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "area_code" {
+			if tmp["area"] != nil {
+				saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+			}
+		} else if f == "city_code" {
+			if tmp["area"] != nil && tmp["city"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "district_code" {
+			if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "buyerclass_code" {
+			if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+				saveM[f] = BuyerCode[obj]
+			}
+		} else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
+			if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "bidstatus" {
+			if b := util.ObjToString(tmp[f]); b != "" {
+				tmp[f] = BidStatus[b]
+			}
+		} else if f == "bidtype" {
+			if b := util.ObjToString(tmp[f]); b != "" {
+				tmp[f] = BidType[b]
+			}
+		} else if f == "multipackage" {
+			if tmp[f] == nil {
+				saveM[f] = 0
+			} else {
+				saveM[f] = tmp[f]
+			}
+		} else if f == "buyer_id" {
+			if b := util.ObjToString(tmp["buyer"]); b != "" {
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else if f == "agency_id" {
+			if b := util.ObjToString(tmp["agency"]); b != "" {
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else {
+			if tmp[f] != nil {
+				if ProVMap[f] != nil {
+					saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveProPool <- saveM
+}
+
+// @Description 项目业务表
+// @Author J 2022/9/30 13:40
+func taskBusiness(tmp map[string]interface{}) {
+	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+		warr = append(warr, util.ObjToString(tmp["winner"]))
+	}
+	for _, s := range warr {
+		saveM := make(map[string]interface{})
+		for _, f := range ProBusField {
+			if f == "projectid" {
+				saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if f == "area_code" {
+				if tmp["area"] != nil {
+					saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+				}
+			} else if f == "city_code" {
+				if tmp["area"] != nil && tmp["city"] != nil {
+					c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+					saveM[f] = AreaCode[c]
+				}
+			} else if f == "district_code" {
+				if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+					c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+					saveM[f] = AreaCode[c]
+				}
+			} else if f == "updatetime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			} else if f == "buyerclass_code" {
+				if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+					saveM[f] = BuyerCode[obj]
+				}
+			} else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
+				if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
+					t := util.Int64All(tmp[f])
+					saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+				}
+			} else if f == "bidstatus" {
+				if b := util.ObjToString(tmp[f]); b != "" {
+					tmp[f] = BidStatus[b]
+				}
+			} else if f == "bidtype" {
+				if b := util.ObjToString(tmp[f]); b != "" {
+					tmp[f] = BidType[b]
+				}
+			} else if f == "buyer_id" {
+				if b := util.ObjToString(tmp["buyer"]); b != "" {
+					saveM["buyer"] = b
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else if f == "agency_id" {
+				if b := util.ObjToString(tmp["agency"]); b != "" {
+					saveM["agency"] = b
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else if f == "winner_id" {
+				if s != "" {
+					saveM["winner"] = s
+					if code := redis.GetStr("qyxy_id", s); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else {
+				if tmp[f] != nil {
+					if ProBusVMap[f] != nil {
+						saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
+					} else {
+						saveM[f] = tmp[f]
+					}
+				}
+			}
+		}
+		saveProbPool <- saveM
+	}
+
+}
+
+// @Description 项目信息标签
+// @Author J 2022/9/30 13:54
+func taskProTag(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
+		for _, i2 := range topArr {
+			tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
+			code := TopScopeCode[tclass]
+			saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+		}
+	}
+	if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
+		for _, i2 := range subArr {
+			sc := strings.Split(util.ObjToString(i2), "_")
+			code := SubScopeCode[sc[1]]
+			saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+		}
+	}
+}
+
+// @Description 关系表
+// @Author J 2022/9/30 13:56
+func taskRelation(tmp map[string]interface{}) {
+	pid := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["ids"] == nil {
+		log.Info("taskRelation ids err", zap.Any("id", pid))
+		return
+	}
+	ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
+	lid := ids[len(ids)-1]
+
+	//if b := util.ObjToString(tmp["buyer"]); b != "" {
+	//	saveM := make(map[string]interface{})
+	//
+	//	saveM["projectid"] = pid
+	//	saveM["infoid"] = lid
+	//	saveM["identity_type"] = 1
+	//	saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+	//	if code := redis.GetStr("qyxy_id", b); code != "" {
+	//		saveM["name_id"] = code
+	//		if util.ObjToString(tmp["buyertel"]) != "" {
+	//			q := make(map[string]interface{})
+	//			q["name_id"] = code
+	//			q["identity_type"] = 1
+	//			q["contact_tel"] = util.ObjToString(tmp["buyertel"])
+	//			if util.ObjToString(tmp["buyerperson"]) != "" {
+	//				q["contact_name"] = util.ObjToString(tmp["buyerperson"])
+	//			}
+	//			cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+	//			if cinfo != nil && len(*cinfo) > 0 {
+	//				saveM["contact_id"] = (*cinfo)["id"]
+	//				saveRelationPool <- saveM
+	//			}
+	//		}
+	//	}
+	//}
+
+	//if a := util.ObjToString(tmp["agency"]); a != "" {
+	//	saveM := make(map[string]interface{})
+	//	saveM["projectid"] = pid
+	//	saveM["infoid"] = lid
+	//	saveM["identity_type"] = 4
+	//	saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+	//	if code := redis.GetStr("qyxy_id", a); code != "" {
+	//		saveM["name_id"] = code
+	//		if util.ObjToString(tmp["agencytel"]) != "" {
+	//			q := make(map[string]interface{})
+	//			q["name_id"] = code
+	//			q["identity_type"] = 4 // 100
+	//			q["contact_tel"] = util.ObjToString(tmp["agencytel"])
+	//			if util.ObjToString(tmp["agencyperson"]) != "" {
+	//				q["contact_name"] = util.ObjToString(tmp["agencyperson"])
+	//			}
+	//			cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+	//			if cinfo != nil && len(*cinfo) > 0 {
+	//				saveM["contact_id"] = (*cinfo)["id"]
+	//				saveRelationPool <- saveM
+	//			}
+	//		}
+	//	}
+	//}
+
+	for _, item := range tmp["list"].([]interface{}) {
+		item1 := item.(map[string]interface{})
+		sw := util.ObjToString(item1["s_winner"])
+		if !strings.Contains(sw, ",") {
+			if code := redis.GetStr("qyxy_id", sw); code != "" {
+				saveM := make(map[string]interface{})
+				saveM["projectid"] = pid
+				saveM["infoid"] = lid
+				saveM["identity_type"] = 2
+				saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+				saveM["name_id"] = code
+				if util.ObjToString(item1["winnertel"]) != "" {
+					q := make(map[string]interface{})
+					q["name_id"] = code
+					q["identity_type"] = 2 // 010
+					q["contact_tel"] = util.ObjToString(item1["winnertel"])
+					if util.ObjToString(item1["winnerperson"]) != "" {
+						q["contact_name"] = util.ObjToString(item1["winnerperson"])
+					}
+					cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+					if cinfo != nil && len(*cinfo) > 0 {
+						saveM["contact_id"] = (*cinfo)["id"]
+						saveRelationPool <- saveM
+					}
+				}
+			}
+		}
+	}
+}
+
+func taskRelation2(tmp map[string]interface{}) {
+
+	pid := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["ids"] == nil {
+		log.Info("taskRelation ids err", zap.Any("id", pid))
+		return
+	}
+	info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
+	if len(*info) > 0 {
+
+	} else {
+		ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
+		lid := ids[len(ids)-1]
+
+		if b := util.ObjToString(tmp["buyer"]); b != "" {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 1
+							q["contact_tel"] = util.ObjToString(tmp["buyertel"])
+							if util.ObjToString(tmp["buyerperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["buyerperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 1 // 001
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				}
+			}
+			saveRelationPool <- saveM
+		}
+
+		if a := util.ObjToString(tmp["agency"]); a != "" {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", a); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 4
+							q["contact_tel"] = util.ObjToString(tmp["agencytel"])
+							if util.ObjToString(tmp["agencyperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["agencyperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 4 // 100
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				}
+			}
+			saveRelationPool <- saveM
+		}
+
+		warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+		if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+			warr = append(warr, util.ObjToString(tmp["winner"]))
+		}
+		for _, ws := range warr {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", ws); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 2
+							q["contact_tel"] = util.ObjToString(tmp["winnertel"])
+							if util.ObjToString(tmp["winnerperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["winnerperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 2 // 010
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				}
+			}
+			saveRelationPool <- saveM
+		}
+	}
+}

+ 56 - 0
data_mgo_to_tidb/util.go

@@ -0,0 +1,56 @@
+package main
+
+import (
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"reflect"
+	"strings"
+	"unicode/utf8"
+)
+
+// @Description 字段验证、处理
+// @Author J 2022/10/8 17:16
+func verifyF(f string, v interface{}, tmp1 interface{}) (interface{}, bool) {
+	tmp := tmp1.(map[string]interface{})
+	stype := util.ObjToString(tmp["stype"])
+	vt := reflect.TypeOf(v).String()
+	if strings.Contains(vt, stype) {
+		switch stype {
+		case "string":
+			v1 := util.ObjToString(v)
+			lens := util.IntAll(tmp["length"])
+			if tmp["intercept"].(bool) {
+				if utf8.RuneCountInString(v1) > lens {
+					return string(([]rune(v1))[:lens]), true
+				} else {
+					return v1, false
+				}
+			} else {
+				if utf8.RuneCountInString(v1) > lens {
+					return "", true
+				} else {
+					return v1, false
+				}
+			}
+		case "float":
+			v1 := util.Float64All(v)
+			min := util.Float64All(tmp["mix"])
+			max := util.Float64All(tmp["max"])
+			if min != 0 && v1 < min {
+				return nil, true
+			}
+			if max != 0 && v1 >= max {
+				return nil, true
+			}
+			dm := util.IntAll(tmp["decimal"]) //小数点位数
+			v2, err := util.FormatFloat(v1, dm)
+			if err != nil {
+				return nil, true
+			}
+			return v2, false
+
+		default:
+			break
+		}
+	}
+	return nil, true
+}

+ 20 - 0
data_monitor/config.go

@@ -0,0 +1,20 @@
+package main
+
+type (
+	Config struct {
+		CornExp string `json:"cornexp"`
+		Es      struct {
+			Address      string `json:"address"`
+			DbSize       int    `json:"dbSize"`
+			BiddingIndex string `json:"biddingIndex"`
+			ProjectIndex string `json:"projectIndex"`
+			Version      string `json:"version"`
+			UserName     string `json:"userName"`
+			Password     string `json:"password"`
+		} `json:"es"`
+		WxKey           string `json:"wxKey"`
+		WxApi           string `json:"wxApi"`
+		LastProjectTime int64  `json:"lastProjectTime"`
+		LastBiddingTime int64  `json:"lastBiddingTime"`
+	}
+)

+ 16 - 0
data_monitor/config.json

@@ -0,0 +1,16 @@
+{
+	"cornexp": "0 0 */2 * * ?",
+	"es": {
+		"address": "http://172.17.4.184:19800",
+		"dbSize": 10,
+		"biddingIndex": "bidding",
+		"projectIndex": "projectset",
+		"version": "v7",
+		"userName": "",
+  		"password":
+	},
+	"wxKey": "45962efc-ca87-4996-9ffa-08bf6608ab7a",
+	"wxApi": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=%s",
+	"lastProjectTime": 1660438800,
+	"lastBiddingTime": 1660438800,
+}

BIN
data_monitor/data_monitor


+ 93 - 0
data_monitor/main.go

@@ -0,0 +1,93 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	elastic "es"
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	common "qfw/util"
+
+	"github.com/robfig/cron"
+)
+
+var (
+	Es                       elastic.Es
+	cfg                      = new(Config)
+	projectTime, biddingTime = 0, 0
+)
+
+func main() {
+	common.ReadConfig(&cfg)
+	log.Println("cfg ", cfg)
+	Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
+	runJob()
+	c := cron.New()
+	c.AddFunc(cfg.CornExp, func() {
+		runJob()
+	})
+	c.Start()
+	select {}
+}
+
+func runJob() {
+	log.Println("增量数据查询开始")
+	lastProjectTime, isOk := getData(cfg.LastProjectTime, cfg.Es.ProjectIndex)
+	if isOk {
+		cfg.LastProjectTime = lastProjectTime
+	}
+	lastBiddingTime, isOks := getData(cfg.LastBiddingTime, cfg.Es.BiddingIndex)
+	if isOks {
+		cfg.LastBiddingTime = lastBiddingTime
+	}
+	common.WriteSysConfig(cfg)
+	log.Println("增量数据查询结束")
+}
+
+// Send("cbs告警:超过"+fmt.Sprint(SysConfig.TimeExpire)+"个小时未更新数据", WxKey)
+func getData(LastTime int64, index string) (int64, bool) {
+	endTime, isOk := int64(0), true
+	esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
+	idQuery := fmt.Sprintf(esquery, LastTime)
+	res := Es.Get(index, index, idQuery)
+	if res != nil && *res != nil && len(*res) == 1 {
+		endTime = common.Int64All((*res)[0]["pici"])
+		if index == cfg.Es.BiddingIndex {
+			biddingTime = 0
+		} else if index == cfg.Es.ProjectIndex {
+			projectTime = 0
+		}
+		log.Println("本次任务查找到数据...", endTime)
+	} else {
+		endTime = LastTime
+		isOk = false
+		log.Println("本次任务未查找到数据...", idQuery)
+		if index == cfg.Es.BiddingIndex {
+			biddingTime += 2
+		} else if index == cfg.Es.ProjectIndex {
+			projectTime += 2
+		}
+		Send(index+fmt.Sprint(projectTime)+"个小时未更新数据", cfg.WxKey)
+	}
+	return endTime, isOk
+}
+
+func Send(msg, key string) {
+	m := map[string]interface{}{
+		"msgtype": "text",
+		"text": map[string]string{
+			"content": msg,
+		},
+	}
+	b, _ := json.Marshal(m)
+	res, err := http.Post(fmt.Sprintf(cfg.WxApi, key), "application/json", bytes.NewReader(b))
+	if err != nil {
+		log.Println("发送出错", err)
+	} else {
+		defer res.Body.Close()
+		resByte, _ := ioutil.ReadAll(res.Body)
+		log.Println("发送结果", string(resByte))
+	}
+}

+ 5 - 4
monitor/main.go

@@ -4,16 +4,17 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"io"
+	"monitor/config"
+	"net/http"
+	"time"
+
 	"github.com/robfig/cron"
 	"github.com/spf13/cobra"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
-	"io"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
-	"monitor/config"
-	"net/http"
-	"time"
 )
 
 var (