Jianghan 2 anos atrás
pai
commit
b24ad63753

+ 2 - 0
.gitignore

@@ -0,0 +1,2 @@
+field_sync
+field_dispose

+ 14 - 5
README.md

@@ -7,12 +7,21 @@
 + rpc serve-name extract_expert_service
 
 
-后续修改
-id段内,只需要通过微服务中心获取到5个ip+端口(非重复)可以一直重复处理数据
-
-
 ## field_sync 数据处理流程-数据索引之前字段同步及处理
 + 抽取字段同步到bidding表
 + 剑鱼关键词处理  
 + isValidFile 附件有效字段  
-+ entidlist 中标单位id字段  
++ entidlist 中标单位id字段  
+
+## data_tidb 数据处理流程-数据同步到tidb库(bidding、proejctset)  
++ bidding数据  
+  + 基本信息
+  + 扩展信息
+  + 标的物信息
+  + 附件信息
+  + 拟建数据信息
+  + 采购意向信息
+  + 采购单位、代理机构唯一标识  
++ projectset数据
+  + 基础信息
+  + 

+ 493 - 0
data_tidb/bidding.go

@@ -0,0 +1,493 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"app.yhyue.com/data_processing/common_utils/redis"
+	"data_tidb/config"
+	"fmt"
+	"github.com/shopspring/decimal"
+	"reflect"
+	"regexp"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+var (
+	regLetter = regexp.MustCompile("[a-z]*")
+)
+
+func taskB() {
+	sess := MongoB.GetMgoConn()
+	defer MongoB.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 10)
+	wg := &sync.WaitGroup{}
+
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5f4c9672c014544073734928")}
+	//q := map[string]interface{}{"_id": map[string]interface{}{"$gt": mongodb.StringTOBsonId("5ad8c18fa5cb26b9b72ce098")}}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("bidding_back").Find(nil).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+
+			if util.IntAll(tmp["extracttype"]) != -1 {
+				taskBase(tmp)
+				//taskExpand(tmp)
+				//taskTags(tmp)
+				//taskAtts(tmp)
+				//taskInfoformat(tmp)
+				//taskIntent(tmp)
+				//taskWinner(tmp)
+				//taskPackage(tmp)
+				//taskPur(tmp)
+			}
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+// @Description 基本信息
+// @Author J 2022/9/22 11:12
+func taskBase(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	var errf []string // 异常字段
+	for _, f := range BaseField {
+		if f == "infoid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "area_code" {
+			if tmp["area"] != nil {
+				saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+			}
+		} else if f == "city_code" {
+			if tmp["area"] != nil && tmp["city"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "district_code" {
+			if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "toptype_code" {
+			if obj := util.ObjToString(tmp["toptype"]); obj != "" {
+				saveM[f] = TopTypeCode[obj]
+			}
+		} else if f == "subtype_code" {
+			if obj := util.ObjToString(tmp["subtype"]); obj != "" {
+				saveM[f] = SubTypeCode[obj]
+			}
+		} else if f == "buyerclass_code" {
+			if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+				saveM[f] = BuyerCode[obj]
+			}
+		} else if f == "createtime" || f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "comeintime" || f == "publishtime" || f == "bidopentime" {
+			if tmp[f] != nil && util.IntAll(tmp[f]) > 0 {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "multipackage" || f == "isValidFile" {
+			if tmp[f] == nil {
+				saveM[f] = 0
+			} else {
+				saveM[f] = tmp[f]
+			}
+		} else if f == "buyer_id" {
+			if b := util.ObjToString(tmp["buyer"]); b != "" {
+				saveM["buyer"] = b
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else if f == "agency_id" {
+			if b := util.ObjToString(tmp["agency"]); b != "" {
+				saveM["agency"] = b
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			} else {
+				if tmp[f] != nil {
+					saveM[f] = tmp[f]
+				}
+			}
+		} else {
+			if tmp[f] != nil {
+				if BaseVMap[f] != nil {
+					var b bool
+					saveM[f], b = verifyF(f, tmp[f], BaseVMap[f])
+					// 保存异常字段数据
+					if b {
+						errf = append(errf, f)
+					}
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveBasePool <- saveM
+	if len(errf) > 0 {
+		saveErrPool <- map[string]interface{}{"infoid": mongodb.BsonIdToSId(tmp["_id"]), "f": strings.Join(errf, ",")}
+	}
+}
+
+// @Description 扩展信息
+// @Author J 2022/9/22 11:13
+func taskExpand(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	for _, f := range ExpandField {
+		if f == "infoid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "project_startdate" || f == "project_completedate" || f == "signaturedate" || f == "bidendtime" || f == "bidstarttime" {
+			if tmp[f] != nil {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "createtime" || f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "bidway" {
+			if util.ObjToString(tmp[f]) == "电子投标" {
+				saveM[f] = 1
+			} else if util.ObjToString(tmp[f]) == "纸质投标" {
+				saveM[f] = 0
+			}
+		} else if f == "review_experts" {
+			if tmp[f] != nil {
+				if reflect.TypeOf(tmp[f]).String() == "string" {
+					saveM[f] = tmp[f]
+				} else if reflect.TypeOf(tmp[f]).String() == "[]interface {}" {
+					if arr, ok := tmp[f].([]interface{}); ok {
+						saveM[f] = strings.Join(util.ObjArrToStringArr(arr), ",")
+					}
+				}
+			}
+		} else if f == "bid_guarantee" || f == "contract_guarantee" {
+			if tmp[f] != nil {
+				if tmp[f].(bool) {
+					saveM[f] = 1
+				} else {
+					saveM[f] = 0
+				}
+			}
+		} else if f == "supervisorrate" || f == "agencyfee" {
+			if tmp[f] != nil {
+				if reflect.TypeOf(tmp[f]).String() == "string" {
+					v2, err := strconv.ParseFloat(strings.ReplaceAll(util.ObjToString(tmp[f]), "%", ""), 64)
+					if err != nil {
+						v, _ := decimal.NewFromFloat(v2).Div(decimal.NewFromFloat(float64(100))).Float64()
+						saveM[f] = v
+					}
+				} else {
+					saveM[f], _ = util.FormatFloat(util.Float64All(tmp[f]), 4)
+				}
+			}
+		} else if f == "project_duration" {
+			if tmp[f] != nil {
+				tmp[f] = util.IntAll(tmp[f])
+			}
+		} else {
+			if tmp[f] != nil {
+				if reflect.TypeOf(tmp[f]).String() == "string" {
+					if f == "projectperiod" || f == "payway" || f == "bidopenaddress" {
+						if len(util.ObjToString(tmp[f])) <= 500 {
+							saveM[f] = tmp[f]
+						}
+					} else if f == "funds" || f == "getdocmethod" || f == "project_scale" {
+						if len(util.ObjToString(tmp[f])) <= 5000 {
+							saveM[f] = tmp[f]
+						}
+					} else {
+						saveM[f] = tmp[f]
+					}
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveExpandPool <- saveM
+}
+
+// @Description 标签记录
+// @Author J 2022/9/22 11:13
+func taskTags(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
+		for _, i2 := range topArr {
+			tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
+			code := TopScopeCode[tclass]
+			saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+			//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+		}
+	}
+	if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
+		for _, i2 := range subArr {
+			sc := strings.Split(util.ObjToString(i2), "_")
+			if len(sc) > 1 {
+				code := SubScopeCode[sc[1]]
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+			}
+			//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+		}
+	}
+	if tArr, ok := tmp["certificate_class"].([]interface{}); ok {
+		for _, i2 := range tArr {
+			if util.ObjToString(i2) == "ISO" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "01", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			} else if util.ObjToString(i2) == "AAA" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "02", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			} else if util.ObjToString(i2) == "ISO,AAA" {
+				saveTagPool <- map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+				//MysqlTool.Insert("bid_tags", map[string]interface{}{"infoid": id, "labelcode": "3", "labelvalues": "03", "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)})
+			}
+		}
+	}
+}
+
+// @Description 附件
+// @Author J 2022/9/22 11:13
+func taskAtts(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["projectinfo"] != nil {
+		if pinfo, o := tmp["projectinfo"].(map[string]interface{}); o {
+			if attsMap, ok := pinfo["attachments"].(map[string]interface{}); ok {
+				for _, attr := range attsMap {
+					if at, ok := attr.(map[string]interface{}); ok {
+						if util.ObjToString(at["fid"]) != "" {
+							ftype := ""
+							for _, s := range FileTypeArr {
+								ft := strings.ToLower(util.ObjToString(tmp["ftype"]))
+								if strings.Contains(ft, s) {
+									ftype = s
+									break
+								}
+							}
+							saveAttrPool <- map[string]interface{}{"infoid": id, "org_url": at["org_url"], "size": at["size"], "fid": at["fid"],
+								"filename": at["filename"], "ftype": ftype, "file_type": 0, "createtime": time.Now().Format(util.Date_Full_Layout)}
+						}
+					}
+				}
+			}
+		}
+	}
+	if attachTxt, o := tmp["attach_text"].(map[string]interface{}); o {
+		if len(attachTxt) > 0 {
+			for _, at := range attachTxt {
+				at1 := at.(map[string]interface{})
+				if len(at1) > 0 {
+					for k, v := range at1 {
+						if reflect.TypeOf(v).String() == "string" {
+							if util.ObjToString(at1["attach_url"]) != "" {
+								saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at1["attach_url"], "filename": at1["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+							}
+							break
+						} else {
+							if at2, ok := at1[k].(map[string]interface{}); ok {
+								if util.ObjToString(at2["attach_url"]) != "" {
+									saveAttrPool <- map[string]interface{}{"infoid": id, "fid": at2["attach_url"], "filename": at2["file_name"], "file_type": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+								}
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+// @Description 拟建
+// @Author J 2022/9/22 15:56
+func taskInfoformat(tmp map[string]interface{}) {
+	if util.IntAll(tmp["infoformat"]) != 2 && tmp["projectinfo"] != nil {
+		return
+	}
+	if info, ok := tmp["projectinfo"].(map[string]interface{}); ok {
+		delete(info, "attachments")
+		if len(info) > 0 {
+			saveM := make(map[string]interface{})
+			for _, f := range IfmField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "createtime" || f == "updatetime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				} else if f == "approvetime" {
+					if info[f] != nil && util.IntAll(tmp[f]) > 0 {
+						saveM[f] = info[f]
+					}
+				} else {
+					if info[f] != nil {
+						saveM[f] = info[f]
+					}
+				}
+			}
+			saveIfmPool <- saveM
+		}
+	}
+}
+
+// @Description 采购意向
+// @Author J 2022/9/22 16:27
+func taskIntent(tmp map[string]interface{}) {
+	if arr, ok := tmp["procurementlist"].([]interface{}); ok {
+		for _, p := range arr {
+			p1 := p.(map[string]interface{})
+			saveM := make(map[string]interface{})
+			for _, f := range IntentField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "createtime" || f == "updatetime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				} else if f == "buyer_id" {
+					if b := util.ObjToString(tmp["buyer"]); b != "" {
+						if code := redis.GetStr("qyxy_id", b); code != "" {
+							saveM[f] = code
+						}
+					}
+				} else {
+					if p1[f] != nil {
+						saveM[f] = p1[f]
+					}
+				}
+			}
+			saveIntentPool <- saveM
+		}
+	}
+}
+
+// @Description 中标单位
+// @Author J 2022/9/27 10:58
+func taskWinner(tmp map[string]interface{}) {
+	if wod, ok := tmp["winnerorder"].([]interface{}); ok {
+		for _, w := range wod {
+			w1 := w.(map[string]interface{})
+			if w1["sort"] != nil {
+				saveM := make(map[string]interface{})
+				for _, f := range WinnerField {
+					if f == "infoid" {
+						saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+					} else if f == "createtime" || f == "updatetime" {
+						saveM[f] = time.Now().Format(util.Date_Full_Layout)
+					} else if f == "winnersort" {
+						saveM[f] = util.IntAll(w1["sort"])
+					} else if f == "winner_id" {
+						if b := util.ObjToString(w1["entname"]); b != "" {
+							saveM["winner"] = b
+							if code := redis.GetStr("qyxy_id", b); code != "" {
+								saveM[f] = code
+							}
+						}
+					} else if f == "package_id" {
+
+					}
+				}
+				saveWinnerPool <- saveM
+			}
+		}
+	}
+
+	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+		warr = append(warr, util.ObjToString(tmp["winner"]))
+	}
+	for _, s := range warr {
+		saveM := make(map[string]interface{})
+		for _, f := range WinnerField {
+			if f == "infoid" {
+				saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if f == "createtime" || f == "updatetime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			} else if f == "winnersort" {
+				saveM[f] = 0
+			} else if f == "winner_id" {
+				if s != "" {
+					saveM["winner"] = s
+					if code := redis.GetStr("qyxy_id", s); code != "" {
+						saveM[f] = code
+					}
+				}
+			}
+		}
+		saveWinnerPool <- saveM
+	}
+}
+
+func BinarySearch(s []string, k string) int {
+	sort.Strings(s)
+	lo, hi := 0, len(s)-1
+	for lo <= hi {
+		m := (lo + hi) >> 1
+		if s[m] < k {
+			lo = m + 1
+		} else if s[m] > k {
+			hi = m - 1
+		} else {
+			return m
+		}
+	}
+	return -1
+}
+
+func taskPackage(tmp map[string]interface{}) {
+
+}
+
+// @Description 标的物
+// @Author J 2022/9/29 16:48
+func taskPur(tmp map[string]interface{}) {
+	if plist, ok := tmp["purchasinglist"].([]interface{}); ok {
+		for _, p := range plist {
+			saveM := make(map[string]interface{})
+			p1 := p.(map[string]interface{})
+			for _, f := range PurField {
+				if f == "infoid" {
+					saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+				} else if f == "unitprice" || f == "totalprice" {
+					if p1[f] != nil {
+						if reflect.TypeOf(p1[f]).String() == "string" {
+
+						} else {
+							if util.Float64All(p1[f]) <= 10000000000 {
+								saveM[f], _ = util.FormatFloat(util.Float64All(p1[f]), 4)
+							}
+						}
+					}
+				} else {
+					if p1[f] != nil {
+						if reflect.TypeOf(p1[f]).String() == "string" {
+							if f == "item" || f == "itemname" || f == "brandname" {
+								if len(util.ObjToString(p1[f])) <= 500 {
+									saveM[f] = p1[f]
+								}
+							} else {
+								saveM[f] = p1[f]
+							}
+						} else {
+							saveM[f] = p1[f]
+						}
+					}
+				}
+			}
+			savePurPool <- saveM
+		}
+	}
+}

+ 43 - 0
data_tidb/common.toml

@@ -0,0 +1,43 @@
+
+[db]
+[db.mysql]
+addr = "192.168.3.14:4000"
+dbname = "global_common_data"
+size = 5
+user = "root"
+password = "=PDT49#80Z!RVv52_z"
+
+[db.mongob]
+addr = "192.168.3.207:27001"
+dbname = "qfw_data"
+size = 5
+user = "root"
+password = "root"
+[db.mongop]
+addr = "192.168.3.207:27001"
+dbname = "qfw_data"
+size = 5
+user = "root"
+password = "root"
+
+[mail]
+send = false
+to = "wangjianghan@topnet.net.cn"
+api = "http://172.17.145.179:19281/_send/_mail"
+
+# 日志
+[log]
+# 日志路径,为空将输出控制台
+logpath = ""
+# log size (M)
+maxsize = 10
+# compress log
+compress = true
+# log save  time (day)
+maxage =  7
+# save total log file total
+maxbackups = 10
+# log level
+loglevel  = "debug"
+# text or json output
+format = "text"

+ 85 - 0
data_tidb/config/conf.go

@@ -0,0 +1,85 @@
+package config
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/BurntSushi/toml"
+)
+
+var (
+	// Conf crocodile conf
+	Conf *conf
+)
+
+// Init Config
+func Init(conf string) {
+	_, err := toml.DecodeFile(conf, &Conf)
+	if err != nil {
+		fmt.Printf("Err %v", err)
+		os.Exit(1)
+	}
+}
+
+type conf struct {
+	DB   db
+	Mail mail
+	Log  log
+}
+
+type udpNext struct {
+	Addr  string
+	Port  int
+	Stype string
+}
+
+type mail struct {
+	Send bool
+	To   string
+	Api  string
+}
+
+// Log Config
+type log struct {
+	LogPath    string
+	MaxSize    int
+	Compress   bool
+	MaxAge     int
+	MaxBackups int
+	LogLevel   string
+	Format     string
+}
+
+type db struct {
+	MongoB mgo
+	MongoP mgo
+	Mysql  mysql
+}
+
+type mgo struct {
+	Addr     string
+	Dbname   string
+	Size     int
+	User     string
+	Password string
+}
+
+type mysql struct {
+	Addr     string
+	Dbname   string
+	Size     int
+	User     string
+	Password string
+}
+
+type duration struct {
+	time.Duration
+}
+
+// UnmarshalText parse 10s to time.Time
+func (d *duration) UnmarshalText(text []byte) error {
+	var err error
+	d.Duration, err = time.ParseDuration(string(text))
+	return err
+}

+ 53 - 0
data_tidb/config/conf_test.go

@@ -0,0 +1,53 @@
+package config
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+var confs = `# log
+[log]
+logpath = ""
+maxsize = 10
+compress = true
+maxage =  7
+maxbackups = 10
+loglevel  = "info"
+format = "text"
+
+[serve]
+grpcAddr = "192.168.3.12:10021"
+udpPort = "1782"
+
+[db]
+[db.mongo]
+addr = "192.168.3.207:27092"
+dbname = "qfw"
+size = 10
+user = ""
+password = ""
+[db.mongo1]
+addr = "192.168.3.207:27092"
+dbname = "wjh"
+size = 5
+user = ""
+password = ""
+
+[db.es]
+addr = "http://192.168.3.206:9800"
+size = 5
+indexm = "medical_institution_v1"
+typem = "medical_institution"
+indexs = "supplier_product_v1"
+types = "supplier_product"
+
+`
+
+func TestInit(t *testing.T) {
+	testfile := "/tmp/crocodile.toml"
+	ioutil.WriteFile(testfile, []byte(confs), 0644)
+	Init(testfile)
+	t.Logf("%+v", Conf.Serve.GrpcAddr)
+	os.Remove(testfile)
+}

BIN
data_tidb/data_tidb


BIN
data_tidb/data_tidb_linux


+ 98 - 0
data_tidb/field-criteria.json

@@ -0,0 +1,98 @@
+{
+  "dws_f_bid_baseinfo": {
+    "field_array": ["infoid", "area_code", "city_code", "district_code", "budget", "bidamount", "biddiscount", "title", "toptype_code",
+      "subtype_code", "projectname", "projectcode", "buyerclass_code", "publishtime", "bidopentime", "comeintime", "isValidFile", "multipackage",
+      "projectscope_id", "detail_id", "href", "purchasing", "site", "updatetime", "createtime", "buyer_id", "agency_id"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "biddiscount": {
+        "stype": "float",
+        "min": 0,
+        "max": 1000000,
+        "decimal": 4
+      },
+      "title": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectname": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      },
+      "projectcode": {
+        "stype": "string",
+        "length": 100,
+        "intercept": true
+      },
+      "href": {
+        "stype": "string",
+        "length": 5000,
+        "intercept": false
+      },
+      "purchasing": {
+        "stype": "string",
+        "length": 2000,
+        "intercept": true
+      },
+      "site": {
+        "stype": "string",
+        "length": 100,
+        "intercept": false
+      }
+    }
+  },
+  "dws_f_project_baseinfo": {
+    "field_array": ["projectid", "projectcode", "projectname", "area_code", "city_code", "district_code", "budget", "bidamount", "bidstatus", "bidtype",
+      "bidopentime", "createtime", "firsttime", "zbtime", "jgtime", "lasttime", "buyer_id", "agency_id", "updatetime"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "projectname": {
+        "stype": "string",
+        "length": 500,
+        "intercept": true
+      }
+    }
+  },
+  "dws_f_project_business": {
+    "field_array": ["projectid", "buyer_id", "agency_id", "winner_id", "area_code", "city_code", "district_code", "budget", "bidamount", "bidstatus", "bidtype",
+      "buyerclass_code", "firsttime", "zbtime", "jgtime", "lasttime", "bidopentime"],
+    "field_criteria": {
+      "budget": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      },
+      "bidamount": {
+        "stype": "float",
+        "min": 0,
+        "max": 100000000000,
+        "decimal": 4
+      }
+    }
+  }
+}

+ 12 - 0
data_tidb/go.mod

@@ -0,0 +1,12 @@
+module data_tidb
+
+go 1.16
+
+require (
+	app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d
+	github.com/BurntSushi/toml v1.2.0
+	github.com/shopspring/decimal v1.3.1
+	github.com/spf13/cobra v1.5.0
+	go.mongodb.org/mongo-driver v1.10.3
+	go.uber.org/zap v1.23.0
+)

+ 131 - 0
data_tidb/go.sum

@@ -0,0 +1,131 @@
+app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d h1:Nh2rC3LBqh0alvam2vr4is/vbUaPkl0rbZxVETx3nmk=
+app.yhyue.com/data_processing/common_utils v0.0.0-20220927054143-d9e97522625d/go.mod h1:9PlRUNzirlF/LL1W7fA7koCudxJe3uO5nshDWlCnGo8=
+github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
+github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
+github.com/PuerkitoBio/goquery v1.8.0 h1:PJTF7AmFCFKk1N6V6jmKfrNH9tV5pNE6lZMkG0gta/U=
+github.com/PuerkitoBio/goquery v1.8.0/go.mod h1:ypIiRMtY7COPGk+I/YbZLbxsxn9g5ejnI2HSMtkjZvI=
+github.com/andybalholm/cascadia v1.3.1 h1:nhxRkql1kdYCc8Snf7D5/D3spOX+dBgjA6u8x004T2c=
+github.com/andybalholm/cascadia v1.3.1/go.mod h1:R4bJ1UQfqADjvDa4P6HZHLh/3OxWWEqc0Sk8XGwHqvA=
+github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
+github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dchest/captcha v1.0.0 h1:vw+bm/qMFvTgcjQlYVTuQBJkarm5R0YSsDKhm1HZI2o=
+github.com/dchest/captcha v1.0.0/go.mod h1:7zoElIawLp7GUMLcj54K9kbw+jEyvz2K0FDdRRYhvWo=
+github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
+github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
+github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
+github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
+github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
+github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
+github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
+github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
+github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
+github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
+github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
+github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
+github.com/spf13/cobra v1.5.0 h1:X+jTBEBqF0bHN+9cSMgmfuvv2VHJ9ezmFNf9Y/XstYU=
+github.com/spf13/cobra v1.5.0/go.mod h1:dWXEIy2H428czQCjInthrTRUg7yKbok+2Qi/yBIJoUM=
+github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
+github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
+github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
+github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
+github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
+github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
+github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
+github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
+github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
+github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
+github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.mongodb.org/mongo-driver v1.10.3 h1:XDQEvmh6z1EUsXuIkXE9TaVeqHw6SwS1uf93jFs0HBA=
+go.mongodb.org/mongo-driver v1.10.3/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
+go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
+go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
+go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
+go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
+go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
+go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
+go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
+go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
+golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
+golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
+golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
+golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
+golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
+golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
+gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/olivere/elastic.v2 v2.0.61/go.mod h1:CTVyl1gckiFw1aLZYxC00g3f9jnHmhoOKcWF7W3c6n4=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

+ 204 - 0
data_tidb/init.go

@@ -0,0 +1,204 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"app.yhyue.com/data_processing/common_utils/mysqldb"
+	"data_tidb/config"
+	"fmt"
+	"go.uber.org/zap"
+	"os"
+)
+
+var (
+	MongoB, MongoP *mongodb.MongodbSim
+	MysqlTool      *mysqldb.Mysql
+
+	FCriteria map[string]interface{}
+
+	saveSize       = 200
+	saveBasePool   = make(chan map[string]interface{}, 5000)
+	saveBaseSp     = make(chan bool, 1)
+	saveExpandPool = make(chan map[string]interface{}, 5000)
+	saveExpandSp   = make(chan bool, 1)
+	saveTagPool    = make(chan map[string]interface{}, 5000)
+	saveTagSp      = make(chan bool, 1)
+	saveAttrPool   = make(chan map[string]interface{}, 5000)
+	saveAttrSp     = make(chan bool, 1)
+	saveIfmPool    = make(chan map[string]interface{}, 5000)
+	saveIfmSp      = make(chan bool, 1)
+	saveIntentPool = make(chan map[string]interface{}, 5000)
+	saveIntentSp   = make(chan bool, 1)
+	saveWinnerPool = make(chan map[string]interface{}, 5000)
+	saveWinnerSp   = make(chan bool, 1)
+	savePkgPool    = make(chan map[string]interface{}, 5000)
+	savePkgSp      = make(chan bool, 1)
+	savePurPool    = make(chan map[string]interface{}, 5000)
+	savePurSp      = make(chan bool, 1)
+
+	saveProPool    = make(chan map[string]interface{}, 5000)
+	saveProSp      = make(chan bool, 1)
+	saveProbPool   = make(chan map[string]interface{}, 5000)
+	saveProbSp     = make(chan bool, 1)
+	saveProTagPool = make(chan map[string]interface{}, 5000)
+	saveProTagSp   = make(chan bool, 1)
+
+	saveRelationPool = make(chan map[string]interface{}, 5000)
+	saveRelationSp   = make(chan bool, 1)
+
+	saveErrPool = make(chan map[string]interface{}, 5000)
+	saveErrSp   = make(chan bool, 1)
+
+	AreaCode     = make(map[string]string, 5000)
+	TopTypeCode  = make(map[string]string, 10)
+	SubTypeCode  = make(map[string]string, 40)
+	BuyerCode    = make(map[string]string, 100)
+	TopScopeCode = make(map[string]interface{}, 20)
+	SubScopeCode = make(map[string]interface{}, 70)
+
+	BaseField   []string
+	BaseVMap    map[string]interface{}
+	ProField    []string
+	ProVMap     map[string]interface{}
+	ProBusField []string
+	ProBusVMap  map[string]interface{}
+	ExpandField = []string{"infoid", "projectperiod", "project_duration", "project_timeunit", "project_startdate", "project_completedate", "signstarttime",
+		"signendtime", "docstarttime", "docendtime", "bidendtime", "bidstarttime", "signaturedate", "bidway", "docamount", "agencyrate", "agencyfee",
+		"currency", "funds", "payway", "bidamounttype", "review_experts", "bidmethod", "bid_bond", "bid_guarantee", "contract_bond", "contract_guarantee",
+		"bidopenaddress", "contractcode", "supervisorrate", "getdocmethod", "projectaddr", "project_scale", "purchasing_tag", "updatetime", "createtime"}
+	TagsField    = []string{"infoid", "labelcode", "labelvalues", "labelweight", "createtime"}
+	AttrField    = []string{"infoid", "filename", "fid", "ftype", "org_url", "size", "file_type", "createtime"}
+	FileTypeArr  = []string{"pdf", "doc", "docx", "xlsx", "xls", "jpg", "zip", "rar", "txt", "gif", "png", "bmp", "swf", "html"}
+	IfmField     = []string{"infoid", "approvecode", "approvedept", "approvestatus", "approvetime", "approvenumber", "approvecontent", "projecttype", "approvecity", "updatetime", "createtime"}
+	PurField     = []string{"infoid", "item", "itemname", "brandname", "specs", "model", "unitname", "number", "unitprice", "totalprice"}
+	IntentField  = []string{"infoid", "projectname", "projectscope", "item", "buyer_id", "totalprice", "expurasingtime", "reserved_amount", "updatetime", "createtime"}
+	WinnerField  = []string{"infoid", "winner_id", "winnersort", "package_id", "createtime", "updatetime"}
+	PackageField = []string{"infoid", "packagename", "packagename", "packageorgin", "packagedetail", "budget", "bidamount", "winner_id", "updatetime", "createtime"}
+
+	ProTagsField = []string{"projectid", "labelcode", "labelvalues", "labelweight", "createtime"}
+
+	RelationField = []string{"projectid", "infoid", "name_id", "contact_id", "identity_type", "createtime"}
+)
+
+// InitLog @Description
+// @Author J 2022/7/26 15:30
+func InitLog() {
+	logcfg := config.Conf.Log
+
+	err := log.InitLog(
+		log.Path(logcfg.LogPath),
+		log.Level(logcfg.LogLevel),
+		log.Compress(logcfg.Compress),
+		log.MaxSize(logcfg.MaxSize),
+		log.MaxBackups(logcfg.MaxBackups),
+		log.MaxAge(logcfg.MaxAge),
+		log.Format(logcfg.Format),
+	)
+	if err != nil {
+		fmt.Printf("InitLog failed: %v\n", err)
+		os.Exit(1)
+	}
+}
+
+func InitMgo() {
+	MongoB = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoB.Addr,
+		DbName:      config.Conf.DB.MongoB.Dbname,
+		Size:        config.Conf.DB.MongoB.Size,
+		UserName:    config.Conf.DB.MongoB.User,
+		Password:    config.Conf.DB.MongoB.Password,
+	}
+	MongoB.InitPool()
+	MongoP = &mongodb.MongodbSim{
+		MongodbAddr: config.Conf.DB.MongoP.Addr,
+		DbName:      config.Conf.DB.MongoP.Dbname,
+		Size:        config.Conf.DB.MongoP.Size,
+		UserName:    config.Conf.DB.MongoP.User,
+		Password:    config.Conf.DB.MongoP.Password,
+	}
+	MongoP.InitPool()
+}
+
+func InitMysql() {
+	dbcfg := config.Conf.DB.Mysql
+	MysqlTool = &mysqldb.Mysql{
+		Address:  dbcfg.Addr,
+		DBName:   dbcfg.Dbname,
+		UserName: dbcfg.User,
+		PassWord: dbcfg.Password,
+	}
+	MysqlTool.Init()
+}
+
+func InitField() {
+	info := MysqlTool.Find("code_area", nil, "", "", -1, -1)
+	for _, m := range *info {
+		var key string
+		for i, v := range []string{"area", "city", "district"} {
+			if i == 0 && util.ObjToString(m[v]) != "" {
+				key = util.ObjToString(m[v])
+			} else if util.ObjToString(m[v]) != "" {
+				key += "," + util.ObjToString(m[v])
+			}
+		}
+		AreaCode[key] = util.ObjToString(m["code"])
+	}
+	log.Info("InitField", zap.Int("AreaCode", len(AreaCode)))
+
+	info1 := MysqlTool.Find("code_bidtopsubtype", nil, "", "", -1, -1)
+	for _, m := range *info1 {
+		if util.IntAll(m["level"]) == 1 {
+			TopTypeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+		if util.IntAll(m["level"]) == 2 {
+			SubTypeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+	}
+	log.Info("InitField", zap.Int("TopTypeCode", len(TopTypeCode)), zap.Int("SubTypeCode", len(SubTypeCode)))
+
+	info2 := MysqlTool.Find("code_buyerclass", nil, "", "", -1, -1)
+	for _, m := range *info2 {
+		BuyerCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+	}
+	log.Info("InitField", zap.Int("BuyerCode", len(BuyerCode)))
+
+	info3 := MysqlTool.Find("code_bidscope", nil, "", "", -1, -1)
+	for _, m := range *info3 {
+		if util.IntAll(m["level"]) == 1 {
+			TopScopeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+		if util.IntAll(m["level"]) == 2 {
+			SubScopeCode[util.ObjToString(m["name"])] = util.ObjToString(m["code"])
+		}
+	}
+	log.Info("InitField", zap.Int("TopScopeCode", len(TopScopeCode)), zap.Int("SubScopeCode", len(SubScopeCode)))
+
+	util.ReadConfig("./field-criteria.json", &FCriteria)
+	if len(FCriteria) > 0 {
+		if m, o := FCriteria["dws_f_bid_baseinfo"].(map[string]interface{}); o {
+			BaseField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			BaseVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_bid_baseinfo"))
+			os.Exit(-1)
+		}
+		if m, o := FCriteria["dws_f_project_baseinfo"].(map[string]interface{}); o {
+			ProField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ProVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_project_baseinfo"))
+			os.Exit(-1)
+		}
+		if m, o := FCriteria["dws_f_project_business"].(map[string]interface{}); o {
+			ProBusField = util.ObjArrToStringArr(m["field_array"].([]interface{}))
+			ProBusVMap = m["field_criteria"].(map[string]interface{})
+		} else {
+			log.Error("InitField", zap.String("field_array", "dws_f_project_business"))
+			os.Exit(-1)
+		}
+	} else {
+		log.Error("InitField, 未找到field-criteria.json文件")
+		os.Exit(-1)
+	}
+}

+ 755 - 0
data_tidb/main.go

@@ -0,0 +1,755 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"app.yhyue.com/data_processing/common_utils/redis"
+	"data_tidb/config"
+	"fmt"
+	"github.com/spf13/cobra"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
+	"sync"
+	"time"
+)
+
+func init() {
+	config.Init("./common.toml")
+	InitLog()
+	InitMgo()
+	InitMysql()
+	InitField()
+
+	redis.InitRedis1("qyxy_id=127.0.0.1:4379", 1)
+	//redis.InitRedis1("qyxy_id=192.168.3.166:4379", 1)
+	log.Info("init success")
+}
+
+func main() {
+
+	rootCmd := &cobra.Command{Use: "my cmd"}
+	rootCmd.AddCommand(bidding())
+	rootCmd.AddCommand(project())
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Println("rootCmd.Execute failed", err.Error())
+	}
+
+	//taskMysql()
+	//taskMgo()
+
+	c := make(chan bool, 1)
+	<-c
+
+}
+
+func taskMysql() {
+	pool := make(chan bool, 5) //控制线程数
+	wg := &sync.WaitGroup{}
+
+	finalId := 0
+	//lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
+	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+	log.Info("查询最后id---", zap.Int("finally id: ", finalId))
+	lastid, count := 0, 0
+	for {
+		log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
+		q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation", lastid)
+		//q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
+		//q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
+		rows, err := MysqlTool.DB.Query(q)
+		if err != nil {
+			log.Error("mysql query err ", zap.Error(err))
+		}
+		columns, err := rows.Columns()
+		if finalId == lastid {
+			log.Info("----finish-----", zap.Int("count: ", count))
+			break
+		}
+		for rows.Next() {
+			scanArgs := make([]interface{}, len(columns))
+			values := make([]interface{}, len(columns))
+			ret := make(map[string]interface{})
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+			err = rows.Scan(scanArgs...)
+			if err != nil {
+				log.Error("mysql scan err ", zap.Error(err))
+				break
+			}
+			for i, col := range values {
+				if col == nil {
+					ret[columns[i]] = nil
+				} else {
+					switch val := (*scanArgs[i].(*interface{})).(type) {
+					case byte:
+						ret[columns[i]] = val
+						break
+					case []byte:
+						v := string(val)
+						switch v {
+						case "\x00": // 处理数据类型为bit的情况
+							ret[columns[i]] = 0
+						case "\x01": // 处理数据类型为bit的情况
+							ret[columns[i]] = 1
+						default:
+							ret[columns[i]] = v
+							break
+						}
+						break
+					case time.Time:
+						if val.IsZero() {
+							ret[columns[i]] = nil
+						} else {
+							ret[columns[i]] = val.Format("2006-01-02 15:04:05")
+						}
+						break
+					default:
+						ret[columns[i]] = val
+					}
+				}
+			}
+			lastid = util.IntAll(ret["id"])
+			count++
+			if count%20000 == 0 {
+				log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				cid := util.Int64All(tmp["id"])
+				pid := util.ObjToString(tmp["projectid"])
+				name_id := util.ObjToString(tmp["name_id"])
+				identity_type := util.Int64All(tmp["identity_type+0"])
+				if name_id != "" {
+					pinfo, _ := MongoP.FindById("projectset", pid, bson.M{"ids": 1})
+					if len(*pinfo) > 0 {
+						for _, id := range util.ObjArrToStringArr((*pinfo)["ids"].([]interface{})) {
+							coll := "bidding"
+							//if id > "5a862e7040d2d9bbe88e3b1f" {
+							//	coll = "bidding"
+							//} else {
+							//	coll = "bidding_back"
+							//}
+							info, _ := MongoB.FindById(coll, id, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
+							if len(*info) > 0 {
+								if identity_type == 1 {
+									if util.ObjToString((*info)["buyertel"]) != "" && util.ObjToString((*info)["buyerperson"]) != "" {
+										q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["buyerperson"]), "contact_tel": util.ObjToString((*info)["buyertel"])}
+										cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+										if cinfo != nil && len(*cinfo) > 0 {
+											MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+											break
+										}
+									}
+								} else if identity_type == 2 {
+									if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
+										if util.ObjToString((*info)["winnertel"]) != "" && util.ObjToString((*info)["winnerperson"]) != "" {
+											q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["winnerperson"]), "contact_tel": util.ObjToString((*info)["winnertel"])}
+											cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+											if cinfo != nil && len(*cinfo) > 0 {
+												MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+												break
+											}
+										}
+									}
+								} else if identity_type == 4 {
+									if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
+										if util.ObjToString((*info)["agencytel"]) != "" && util.ObjToString((*info)["agencyperson"]) != "" {
+											q := map[string]interface{}{"name_id": name_id, "identity_type": identity_type, "contact_name": util.ObjToString((*info)["agencyperson"]), "contact_tel": util.ObjToString((*info)["agencytel"])}
+											cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+											if cinfo != nil && len(*cinfo) > 0 {
+												MysqlTool.Update("dws_f_bpmc_relation", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+												break
+											}
+										}
+									}
+								}
+							}
+						}
+					}
+				}
+				//redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
+			}(ret)
+			ret = make(map[string]interface{})
+		}
+		_ = rows.Close()
+		wg.Wait()
+	}
+}
+
+func taskMgo() {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
+	q := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId("63411488911e1eb3459fb87e")}}
+	field := map[string]interface{}{"ids": 1}
+	query := sess.DB("qfw").C("projectset_20220721").Find(q).Select(field).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			util.Debug("current ---", count, tmp["_id"])
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			id := mongodb.BsonIdToSId(tmp["_id"])
+			for _, i := range util.ObjArrToStringArr(tmp["ids"].([]interface{})) {
+				redis.PutCKV("s_id", i, id)
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+
+	util.Debug("over ---", count)
+}
+
+// @Description 标讯数据
+// @Author J 2022/9/20 17:52
+func bidding() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "bidding",
+		Short: "Start processing bidding data",
+		Run: func(cmd *cobra.Command, args []string) {
+			go SaveFunc()
+			//go SaveTagFunc()
+			//go SaveExpandFunc()
+			//go SaveAttrFunc()
+			//go SaveImfFunc()
+			//go SaveIntentFunc()
+			//go SaveWinnerFunc()
+			//go SavePackageFunc()
+			//go SavePurFunc()
+
+			go saveErrMethod()
+			taskB()
+		},
+	}
+	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
+	return cmdClient
+}
+
+// @Description 项目数据
+// @Author J 2022/9/20 17:52
+func project() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "project",
+		Short: "Start processing project data",
+		Run: func(cmd *cobra.Command, args []string) {
+			go SaveProFunc()
+			go SaveProTagFunc()
+			go SaveProbFunc()
+			go SaveRelationFunc()
+			taskP()
+		},
+	}
+	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
+	return cmdClient
+}
+
+func SaveFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveBasePool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveBaseSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveBaseSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_baseinfo_new", BaseField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveExpandFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveExpandPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveExpandSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveExpandSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_expand_baseinfo", ExpandField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveTagFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveTagPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_tags", TagsField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveAttrFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveAttrPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveAttrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveAttrSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveAttrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveAttrSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_filetext_baseinfo", AttrField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveImfFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveIfmPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveIfmSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIfmSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveIfmSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIfmSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_infoformat_baseinfo", IfmField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SavePurFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-savePurPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				savePurSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePurSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				savePurSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePurSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_purchasing_baseinfo", PurField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveIntentFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveIntentPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveIntentSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIntentSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveIntentSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveIntentSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_intention_baseinfo", IntentField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveWinnerFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveWinnerPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveWinnerSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveWinnerSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveWinnerSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveWinnerSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_winner_baseinfo", WinnerField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SavePackageFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-savePkgPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				savePkgSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePkgSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				savePkgSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-savePkgSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bid_package_baseinfo", PackageField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_baseinfo", ProField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProbFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProbPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProbSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProbSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProbSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProbSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_business", ProBusField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveProTagFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveProTagPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveProTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveProTagSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveProTagSp
+					}()
+					MysqlTool.InsertBulk("dws_f_project_tags", ProTagsField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+func SaveRelationFunc() {
+	arru := make([]map[string]interface{}, saveSize)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveRelationPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveRelationSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveRelationSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveRelationSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveRelationSp
+					}()
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}
+
+// 字段错误数据
+func saveErrMethod() {
+	arru := make([]map[string]interface{}, 200)
+	indexu := 0
+	for {
+		select {
+		case v := <-saveErrPool:
+			arru[indexu] = v
+			indexu++
+			if indexu == saveSize {
+				saveErrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveErrSp
+					}()
+					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+				}(arru)
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		case <-time.After(1000 * time.Millisecond):
+			if indexu > 0 {
+				saveErrSp <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-saveErrSp
+					}()
+					MongoB.SaveBulk("bidding_tidb_f_err", arru...)
+				}(arru[:indexu])
+				arru = make([]map[string]interface{}, saveSize)
+				indexu = 0
+			}
+		}
+	}
+}

+ 322 - 0
data_tidb/project.go

@@ -0,0 +1,322 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"app.yhyue.com/data_processing/common_utils/log"
+	"app.yhyue.com/data_processing/common_utils/mongodb"
+	"app.yhyue.com/data_processing/common_utils/redis"
+	"data_tidb/config"
+	"fmt"
+	"go.uber.org/zap"
+	"strings"
+	"sync"
+	"time"
+)
+
+func taskP() {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5c4044d4a5cb26b9b7b963cc")}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset").Find(nil).Sort("_id").Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			taskPro(tmp)
+			taskBusiness(tmp)
+			taskProTag(tmp)
+
+			taskRelation(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
+var BidStatus = map[string]int{
+	"预告": 0,
+	"拟建": 1,
+	"招标": 2,
+	"中标": 3,
+	"成交": 4,
+	"废标": 5,
+	"流标": 6,
+	"合同": 7,
+	"其它": 8,
+}
+var BidType = map[string]int{
+	"招标": 0,
+	"邀标": 1,
+	"单一": 2,
+	"竞价": 3,
+	"竞谈": 4,
+	"询价": 5,
+}
+
+// @Description 基础信息
+// @Author J 2022/9/22 18:32
+func taskPro(tmp map[string]interface{}) {
+	saveM := make(map[string]interface{})
+	for _, f := range ProField {
+		if f == "projectid" {
+			saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+		} else if f == "area_code" {
+			if tmp["area"] != nil {
+				saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+			}
+		} else if f == "city_code" {
+			if tmp["area"] != nil && tmp["city"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "district_code" {
+			if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+				c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+				saveM[f] = AreaCode[c]
+			}
+		} else if f == "updatetime" {
+			saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		} else if f == "buyerclass_code" {
+			if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+				saveM[f] = BuyerCode[obj]
+			}
+		} else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
+			if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
+				t := util.Int64All(tmp[f])
+				saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+			}
+		} else if f == "bidstatus" {
+			if b := util.ObjToString(tmp[f]); b != "" {
+				tmp[f] = BidStatus[b]
+			}
+		} else if f == "bidtype" {
+			if b := util.ObjToString(tmp[f]); b != "" {
+				tmp[f] = BidType[b]
+			}
+		} else if f == "multipackage" {
+			if tmp[f] == nil {
+				saveM[f] = 0
+			} else {
+				saveM[f] = tmp[f]
+			}
+		} else if f == "buyer_id" {
+			if b := util.ObjToString(tmp["buyer"]); b != "" {
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else if f == "agency_id" {
+			if b := util.ObjToString(tmp["agency"]); b != "" {
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			}
+		} else {
+			if tmp[f] != nil {
+				if ProVMap[f] != nil {
+					saveM[f], _ = verifyF(f, tmp[f], ProVMap[f])
+				} else {
+					saveM[f] = tmp[f]
+				}
+			}
+		}
+	}
+	saveProPool <- saveM
+}
+
+// @Description 项目业务表
+// @Author J 2022/9/30 13:40
+func taskBusiness(tmp map[string]interface{}) {
+	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+		warr = append(warr, util.ObjToString(tmp["winner"]))
+	}
+	for _, s := range warr {
+		saveM := make(map[string]interface{})
+		for _, f := range ProBusField {
+			if f == "projectid" {
+				saveM[f] = mongodb.BsonIdToSId(tmp["_id"])
+			} else if f == "area_code" {
+				if tmp["area"] != nil {
+					saveM[f] = AreaCode[util.ObjToString(tmp["area"])]
+				}
+			} else if f == "city_code" {
+				if tmp["area"] != nil && tmp["city"] != nil {
+					c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"])
+					saveM[f] = AreaCode[c]
+				}
+			} else if f == "district_code" {
+				if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil {
+					c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"])
+					saveM[f] = AreaCode[c]
+				}
+			} else if f == "updatetime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			} else if f == "buyerclass_code" {
+				if obj := util.ObjToString(tmp["buyerclass"]); obj != "" {
+					saveM[f] = BuyerCode[obj]
+				}
+			} else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" {
+				if tmp[f] != nil && util.Int64All(tmp[f]) > 0 {
+					t := util.Int64All(tmp[f])
+					saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
+				}
+			} else if f == "bidstatus" {
+				if b := util.ObjToString(tmp[f]); b != "" {
+					tmp[f] = BidStatus[b]
+				}
+			} else if f == "bidtype" {
+				if b := util.ObjToString(tmp[f]); b != "" {
+					tmp[f] = BidType[b]
+				}
+			} else if f == "buyer_id" {
+				if b := util.ObjToString(tmp["buyer"]); b != "" {
+					saveM["buyer"] = b
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else if f == "agency_id" {
+				if b := util.ObjToString(tmp["agency"]); b != "" {
+					saveM["agency"] = b
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else if f == "winner_id" {
+				if s != "" {
+					saveM["winner"] = s
+					if code := redis.GetStr("qyxy_id", s); code != "" {
+						saveM[f] = code
+					}
+				}
+			} else {
+				if tmp[f] != nil {
+					if ProBusVMap[f] != nil {
+						saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f])
+					} else {
+						saveM[f] = tmp[f]
+					}
+				}
+			}
+		}
+		saveProbPool <- saveM
+	}
+
+}
+
+// @Description 项目信息标签
+// @Author J 2022/9/30 13:54
+func taskProTag(tmp map[string]interface{}) {
+	id := mongodb.BsonIdToSId(tmp["_id"])
+	if topArr, ok := tmp["topscopeclass"].([]interface{}); ok {
+		for _, i2 := range topArr {
+			tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母
+			code := TopScopeCode[tclass]
+			saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+		}
+	}
+	if subArr, ok := tmp["subscopeclass"].([]interface{}); ok {
+		for _, i2 := range subArr {
+			sc := strings.Split(util.ObjToString(i2), "_")
+			code := SubScopeCode[sc[1]]
+			saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)}
+		}
+	}
+}
+
+// @Description 关系表
+// @Author J 2022/9/30 13:56
+func taskRelation(tmp map[string]interface{}) {
+	pid := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["ids"] == nil {
+		log.Info("taskRelation ids err", zap.Any("id", pid))
+		return
+	}
+	ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
+	lid := ids[len(ids)-1]
+
+	if b := util.ObjToString(tmp["buyer"]); b != "" {
+		saveM := make(map[string]interface{})
+		for _, f := range RelationField {
+			if f == "projectid" {
+				saveM[f] = pid
+			} else if f == "infoid" {
+				saveM[f] = lid
+			} else if f == "name_id" {
+				if code := redis.GetStr("qyxy_id", b); code != "" {
+					saveM[f] = code
+				}
+			} else if f == "contract_id" {
+				// todo
+			} else if f == "identity_type" {
+				saveM[f] = 1 // 001
+			} else if f == "createtime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			}
+		}
+		saveRelationPool <- saveM
+	}
+
+	if a := util.ObjToString(tmp["agency"]); a != "" {
+		saveM := make(map[string]interface{})
+		for _, f := range RelationField {
+			if f == "projectid" {
+				saveM[f] = pid
+			} else if f == "infoid" {
+				saveM[f] = lid
+			} else if f == "name_id" {
+				if code := redis.GetStr("qyxy_id", a); code != "" {
+					saveM[f] = code
+				}
+			} else if f == "contract_id" {
+
+			} else if f == "identity_type" {
+				saveM[f] = 4 // 100
+			} else if f == "createtime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			}
+		}
+		saveRelationPool <- saveM
+	}
+
+	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+		warr = append(warr, util.ObjToString(tmp["winner"]))
+	}
+	for _, ws := range warr {
+		saveM := make(map[string]interface{})
+		for _, f := range RelationField {
+			if f == "projectid" {
+				saveM[f] = pid
+			} else if f == "infoid" {
+				saveM[f] = lid
+			} else if f == "name_id" {
+				if code := redis.GetStr("qyxy_id", ws); code != "" {
+					saveM[f] = code
+				}
+			} else if f == "contract_id" {
+
+			} else if f == "identity_type" {
+				saveM[f] = 2 // 010
+			} else if f == "createtime" {
+				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+			}
+		}
+		saveRelationPool <- saveM
+	}
+}

+ 55 - 0
data_tidb/util.go

@@ -0,0 +1,55 @@
+package main
+
+import (
+	util "app.yhyue.com/data_processing/common_utils"
+	"reflect"
+	"strings"
+	"unicode/utf8"
+)
+
+// @Description 字段验证、处理
+// @Author J 2022/10/8 17:16
+func verifyF(f string, v interface{}, tmp1 interface{}) (interface{}, bool) {
+	tmp := tmp1.(map[string]interface{})
+	stype := util.ObjToString(tmp["stype"])
+	vt := reflect.TypeOf(v).String()
+	if strings.Contains(vt, stype) {
+		switch stype {
+		case "string":
+			v1 := util.ObjToString(v)
+			lens := util.IntAll(tmp["length"])
+			if tmp["intercept"].(bool) {
+				if utf8.RuneCountInString(v1) > lens {
+					return string(([]rune(v1))[:lens]), true
+				} else {
+					return v1, false
+				}
+			} else {
+				if utf8.RuneCountInString(v1) > lens {
+					return "", true
+				} else {
+					return v1, false
+				}
+			}
+		case "float":
+			v1 := util.Float64All(v)
+			min := util.Float64All(tmp["mix"])
+			max := util.Float64All(tmp["max"])
+			if min != 0 && v1 < min {
+				return nil, true
+			}
+			if max != 0 && v1 >= max {
+				return nil, true
+			}
+			dm := util.IntAll(tmp["decimal"]) //小数点位数
+			v2, err := util.FormatFloat(v1, dm)
+			if err != nil {
+				return nil, true
+			}
+			return v2, false
+		default:
+			break
+		}
+	}
+	return nil, true
+}

BIN
field_py/field_dispose_linux


+ 12 - 3
field_py/task.go

@@ -46,7 +46,7 @@ func getIntention(gtid, lteid string, mapinfo map[string]interface{}) {
 		"site":        1,
 		"detail":      1,
 	}
-	log.Info(fmt.Sprintf("%d", MgoB.Count("bidding", query)))
+	log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
 	it := sess.DB(config.Conf.DB.Mongo.Dbname).C("bidding").Find(&query).Select(&field).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
@@ -199,7 +199,8 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
-	log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
+	start := time.Now()
+	log.Info("rpc", zap.String("addr", addr))
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -219,6 +220,10 @@ func rpcGetFieldP(reqStr string) (map[string]interface{}, error) {
 	if json.Unmarshal([]byte(resp.Goods), &result) != nil {
 		return nil, errors.New("Json Unmarshal Error")
 	}
+	if time.Since(start).Minutes() > 5 {
+		// py接口字段识别超过5分钟
+		log.Info("rpcGetFieldP 字段识别超过5min", zap.Any("serve", "goods_service"), zap.Any("reqStr", reqStr), zap.Any("ip+port", addr))
+	}
 	return result, nil
 }
 
@@ -253,7 +258,7 @@ func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
 	atomic.StoreInt64(&IpGetErrNum, 0) //异常次数重置
 	//处理数据
 	addr := ip + ":" + fmt.Sprint(port)
-	log.Info("rpc", zap.String("addr", addr), zap.String("str", reqStr))
+	start := time.Now()
 	conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	if err != nil {
 		atomic.AddInt64(&ExtractDialErrNum, 1) //异常次数+1
@@ -273,6 +278,10 @@ func rpcGetFieldR(reqStr string) (map[string]interface{}, error) {
 	if json.Unmarshal([]byte(resp.Results), &result) != nil {
 		return nil, errors.New("Json Unmarshal Error")
 	}
+	if time.Since(start).Minutes() > 5 {
+		// py接口字段识别超过5分钟
+		log.Info("rpcGetFieldR 字段识别超过5min", zap.Any("serve", "extract_expert_service"), zap.Any("reqStr", reqStr), zap.Any("ip+port", addr))
+	}
 	return result, nil
 }
 

+ 9 - 1
field_sync/common.toml

@@ -7,7 +7,7 @@ fields = ["buyerzipcode", "winnertel", "winnerperson", "contractcode", "winnerad
     "area", "city", "district", "s_winner", "toptype", "subtype", "subscopeclass", "s_subscopeclass", "dataging", "winnerorder", "project_scale",
     "project_duration", "project_timeunit", "project_startdate", "project_completedate", "payway", "contract_guarantee", "bid_guarantee", "qualifies",
     "funds", "review_experts", "bidmethod", "bidendtime", "bidopenaddress", "docamount", "bidway", "agencyrate", "agencyfee", "getdocmethod", "purchasing_tag",
-    "package"
+    "package", "history_updatetime"
 ]
 
 [udp]
@@ -16,6 +16,14 @@ locport = ":1787"
 addr = "127.0.0.1"
 port = 1783
 stype = "bidding"
+[udp.project]
+addr = "127.0.0.1"
+port = 1782
+stype = ""
+[udp.tidb]
+addr = "127.0.0.1"
+port = 1680
+stype = "subject"
 
 [nsq]
 addr = "192.168.3.166:4150"

+ 2 - 0
field_sync/config/conf.go

@@ -39,6 +39,8 @@ type serve struct {
 type udp struct {
 	LocPort string
 	Next    udpNext
+	Project udpNext
+	Tidb    udpNext
 }
 
 type nsq struct {

+ 49 - 6
field_sync/main.go

@@ -55,8 +55,8 @@ func main() {
 	go checkMapJob()
 	go nsqMethod()
 
-	go UpdateBidding()
-	go UpdateExtract()
+	//go UpdateBidding()
+	//go UpdateExtract()
 
 	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
 	UdpClient.Listen(processUdpMsg)
@@ -127,13 +127,13 @@ type UdpNode struct {
 	retry     int
 }
 
-func NextNode(mapInfo map[string]interface{}) {
+func NextNode(mapInfo map[string]interface{}, stype string) {
 	var next = &net.UDPAddr{
 		IP:   net.ParseIP(config.Conf.Udp.Next.Addr),
 		Port: util.IntAll(config.Conf.Udp.Next.Port),
 	}
-	mapInfo["stype"] = config.Conf.Udp.Next.Stype
-	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
+	mapInfo["stype"] = stype
+	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), stype)
 	mapInfo["key"] = key
 	log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
 	datas, _ := json.Marshal(mapInfo)
@@ -142,6 +142,49 @@ func NextNode(mapInfo map[string]interface{}) {
 	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
 }
 
+func NextNodePro(mapInfo map[string]interface{}, stype string) {
+	var next = &net.UDPAddr{
+		IP:   net.ParseIP(config.Conf.Udp.Project.Addr),
+		Port: util.IntAll(config.Conf.Udp.Project.Port),
+	}
+	if stype == "bidding_history" {
+		mapInfo["stype"] = "project_history"
+	} else {
+		mapInfo["stype"] = "project"
+	}
+	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	mapInfo["key"] = key
+	log.Info("udp project node", zap.Any("mapinfo:", mapInfo))
+	datas, _ := json.Marshal(mapInfo)
+	node := &UdpNode{datas, next, time.Now().Unix(), 0}
+	UdpTaskMap.Store(key, node)
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+}
+
+func NextNodeBidData(mapInfo map[string]interface{}) {
+	next := &net.UDPAddr{
+		IP:   net.ParseIP(config.Conf.Udp.Next.Addr),
+		Port: util.IntAll(config.Conf.Udp.Next.Port),
+	}
+	mapInfo["stype"] = "biddingdata"
+	mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	datas, _ := json.Marshal(mapInfo)
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+}
+
+func NextNodeTidb(mapInfo map[string]interface{}) {
+	next := &net.UDPAddr{
+		IP:   net.ParseIP(config.Conf.Udp.Tidb.Addr),
+		Port: util.IntAll(config.Conf.Udp.Tidb.Port),
+	}
+	mapInfo["stype"] = "subject"
+	key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
+	datas, _ := json.Marshal(mapInfo)
+	node := &UdpNode{datas, next, time.Now().Unix(), 0}
+	UdpTaskMap.Store(key, node)
+	_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+}
+
 func UpdateBidding() {
 	arru := make([][]map[string]interface{}, MgoBulkSize)
 	indexu := 0
@@ -231,7 +274,7 @@ func checkMapJob() {
 						}
 					} else {
 						log.Info("udp重发", zap.Any("k:", k))
-						UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
+						//UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
 					}
 				} else if now-node.timestamp > 10 {
 					log.Info("udp任务超时中..", zap.Any("k:", k))

+ 64 - 32
field_sync/task.go

@@ -81,15 +81,11 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 		MgoB.DestoryMongoConn(biddingConn)
 	}
 	log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
-	NextNode(mapInfo)
+	NextNode(mapInfo, stype)
+	NextNodePro(mapInfo, stype)
 	if stype == "bidding_history" {
-		var next = &net.UDPAddr{
-			IP:   net.ParseIP(config.Conf.Udp.Next.Addr),
-			Port: util.IntAll(config.Conf.Udp.Next.Port),
-		}
-		mapInfo["stype"] = "biddingdata"
-		datas, _ := json.Marshal(mapInfo)
-		_ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
+		NextNodeBidData(mapInfo) // bidding-data数据
+		NextNodeTidb(mapInfo)    // tidb-企业数据
 	}
 }
 
@@ -97,6 +93,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 	syncNo := 0 //抽取表数据同步数量
 	//对比两张表数据,减少查询次数
 	var compare map[string]interface{}
+	var bidUpdate [][]map[string]interface{}
+	var extUpdate [][]map[string]interface{}
+	//SaveEsLock := &sync.Mutex{}
 	log.Info("start ...")
 	for n, tmp := range infos {
 		tid := mongodb.BsonIdToSId(tmp["_id"])
@@ -164,8 +163,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		}
 		// entidlist
 		extractMap := make(map[string]interface{})
-		if tmp["s_winner"] != "" {
-			cid := companyFun(tmp)
+		if update["s_winner"] != "" {
+			cid := companyFun(update)
 			if len(cid) > 0 {
 				tmp["entidlist"] = cid
 				update["entidlist"] = cid
@@ -176,15 +175,28 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 		typeFunc(tmp, update, extractMap)
 		if len(extractMap) > 0 {
 			if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
-				updateExtPool <- []map[string]interface{}{
+				//updateExtPool <- []map[string]interface{}{
+				//	{"_id": tmp["_id"]},
+				//	{"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
+				//}
+				extUpdate = append(extUpdate, []map[string]interface{}{
 					{"_id": tmp["_id"]},
 					{"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
-				}
+				})
 			} else {
-				updateExtPool <- []map[string]interface{}{
+				//updateExtPool <- []map[string]interface{}{
+				//	{"_id": tmp["_id"]},
+				//	{"$set": extractMap},
+				//}
+				extUpdate = append(extUpdate, []map[string]interface{}{
 					{"_id": tmp["_id"]},
 					{"$set": extractMap},
-				}
+				})
+			}
+			if len(extUpdate) >= MgoBulkSize {
+				tmps := extUpdate
+				MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
+				extUpdate = [][]map[string]interface{}{}
 			}
 		}
 		// 附件有效字段
@@ -198,17 +210,42 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			}
 		}
 		if len(update) > 0 {
-			updateBidPool <- []map[string]interface{}{{
+
+			//SaveEsLock.Lock()
+			bidUpdate = append(bidUpdate, []map[string]interface{}{{
 				"_id": tmp["_id"],
 			},
 				{"$set": update},
+			})
+			if len(bidUpdate) >= MgoBulkSize {
+				tmps := bidUpdate
+				MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
+				bidUpdate = [][]map[string]interface{}{}
 			}
+			//SaveEsLock.Unlock()
+			//updateBidPool <- []map[string]interface{}{{
+			//	"_id": tmp["_id"],
+			//},
+			//	{"$set": update},
+			//}
 		}
 		if n%500 == 0 {
 			log.Info("biddingTask", zap.Int("current", n))
 		}
 		tmp = make(map[string]interface{})
 	}
+	//SaveEsLock.Lock()
+	if len(bidUpdate) > 0 {
+		tmps := bidUpdate
+		MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, tmps...)
+		bidUpdate = [][]map[string]interface{}{}
+	}
+	if len(extUpdate) > 0 {
+		tmps := extUpdate
+		MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, tmps...)
+		extUpdate = [][]map[string]interface{}{}
+	}
+	//SaveEsLock.Unlock()
 	return syncNo
 }
 
@@ -441,41 +478,36 @@ func taskinfo(id string) {
 	//处理分类
 	fieldFun(*extractM, update)
 
-	//同时保存到elastic
-	for tk, tv := range update {
-		(*tmp)[tk] = tv
-	}
-
 	extractMap := make(map[string]interface{})
 	if util.ObjToString((*tmp)["s_winner"]) != "" {
 		cid := companyFun(*tmp)
 		if len(cid) > 0 {
-			(*tmp)["entidlist"] = cid
 			update["entidlist"] = cid
 			extractMap["entidlist"] = cid
 		}
-		updateExtPool <- []map[string]interface{}{
-			{"_id": mongodb.StringTOBsonId(id)},
-			{"$set": extractMap},
-		}
+		MgoE.UpdateById(config.Conf.DB.MongoE.Coll, id, map[string]interface{}{"$set": extractMap})
+		//updateExtPool <- []map[string]interface{}{
+		//	{"_id": mongodb.StringTOBsonId(id)},
+		//	{"$set": extractMap},
+		//}
 	}
 
 	// 附件有效字段
 	if i := validFile(*tmp); i != 0 {
 		if i == -1 {
-			(*tmp)["isValidFile"] = false
 			update["isValidFile"] = false
 		} else {
-			(*tmp)["isValidFile"] = true
 			update["isValidFile"] = true
 		}
 	}
+	util.Debug(update)
 	if len(update) > 0 {
-		updateBidPool <- []map[string]interface{}{{
-			"_id": mongodb.StringTOBsonId(id),
-		},
-			{"$set": update},
-		}
+		MgoB.UpdateById(config.Conf.DB.MongoB.Coll, id, map[string]interface{}{"$set": update})
+		//updateBidPool <- []map[string]interface{}{{
+		//	"_id": mongodb.StringTOBsonId(id),
+		//},
+		//	{"$set": update},
+		//}
 	}
 
 	mapinfo := map[string]interface{}{