jianghan7 1 жил өмнө
parent
commit
9f3f948013

+ 3 - 3
data_project/common.toml

@@ -1,6 +1,6 @@
 [serve]
 udp = ":1782"
-thread = 3
+thread = 1
 loadStart = 0
 validdays = 150
 statusdays = 15
@@ -30,8 +30,8 @@ size = 15
 user = ""
 password = ""
 [db.redis]
-addr = "project=192.168.3.207:1679"
-addrQb = "qyxy_buyer=192.168.3.207:1679"
+addr = "project=192.168.3.173:8379"
+addrQb = "qyxy_buyer=192.168.3.173:8379"
 dbQb = 3
 
 [db.es]

+ 1 - 0
data_project/init.go

@@ -319,6 +319,7 @@ type Info struct {
 type ProjectCache struct {
 	Id            primitive.ObjectID `json:"_id"`
 	Ids           []string           `json:"ids,omitempty"`
+	Size          int                `json:"size"`
 	FirstTime     int64              `json:"firsttime,omitempty"`   //项目的最早时间
 	LastTime      int64              `json:"lasttime,omitempty"`    //项目的最后时间
 	ProjectName   string             `json:"projectname,omitempty"` //项目名称

+ 3 - 3
data_project/main.go

@@ -79,7 +79,7 @@ func DealSign() {
 	}
 }
 
-func main() {
+func mainT() {
 
 	P_QL.loadSpiderCode()
 	P_QL.loadSite()
@@ -167,9 +167,9 @@ func main() {
 	}
 }
 
-func mainT() {
+func main() {
 	sid = "626cccaa631ff1ac3d29289e"
-	eid = "646cd2abc994e88c251e5dae"
+	eid = "640aa55d8aea8786d1cd0247"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()

+ 9 - 7
data_project/merge_select.go

@@ -1,7 +1,7 @@
 package main
 
-//根据字符特征打分
-//3为最高分,pj为评级 A AD A  AA AA AB
+// 根据字符特征打分
+// 3为最高分,pj为评级 A AD A  AA AA AB
 func Select(compareStr string, info *Info, compareInfo *ProjectCache) (res, pj int) {
 	//没有可对比的项目名称、或项目编号 //评级
 	if compareNoPass[compareStr] {
@@ -152,12 +152,14 @@ func Select(compareStr string, info *Info, compareInfo *ProjectCache) (res, pj i
 				pj = 2
 			}
 		case "BDD":
-			if info.PNBH > 10 {
-				res = 2
-			} else {
-				res = 1
+			if CheckContain(info.ProjectName, compareInfo.ProjectName) != 3 {
+				if info.PNBH > 10 {
+					res = 2
+				} else {
+					res = 1
+				}
+				pj = 1
 			}
-			pj = 1
 		case "DAA":
 			if info.LenPTC > 8 || info.LenPC > 8 {
 				res = 3

+ 14 - 12
data_project/project.go

@@ -143,12 +143,10 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 			if ComparePlace(compareProject, info) {
 				continue
 			}
-
 			info.PNBH = 0
 			info.PCBH = 0
 			info.PTCBH = 0
 			compareStr, score := comparePNC(info, compareProject)
-
 			resVal, pjVal := Select(compareStr, info, compareProject)
 			//---------------------------------------
 			if resVal > 0 {
@@ -184,12 +182,12 @@ func (p *ProjectTask) startProjectMerge(info *Info, tmp map[string]interface{})
 
 			ex := 0
 			resArr := []*ProjectCache{}
-			for i, res := range resN {
-				choose, e := p.CompareStatus(resN[i], info)
-				if !choose {
-					ex = e
-					resArr = append(resArr, res)
-				}
+			for _, res := range resN {
+				//choose, e := p.CompareStatus(resN[i], info)
+				//if !choose {
+				//	ex = e
+				resArr = append(resArr, res)
+				//}
 			}
 			if len(resArr) > 0 {
 				bFindProject = true
@@ -295,9 +293,10 @@ func (p *ProjectTask) compareBCTABB(info *Info, cp *ProjectCache, diffTime int64
 	}
 	score2 = 0
 	if compareCity == "AA" {
-		if info.District != "" && info.District == cp.District {
-			score2 = 1
-		}
+		score2 = 1
+		//if info.District != "" && info.District == cp.District {
+		//	score2 = 1
+		//}
 	}
 
 	compareTime = "D"
@@ -683,6 +682,7 @@ func (p *ProjectTask) NewCachePinfo(id primitive.ObjectID, thisinfo *Info, bidty
 		Bidamount:   thisinfo.Bidamount,
 		Bidstatus:   bidstatus,
 		Bidtype:     bidtype,
+		Size:        1,
 	}
 	if thisinfo.LenPTC > 5 {
 		p1.MPC = append(p1.MPC, thisinfo.PTC)
@@ -1148,7 +1148,9 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 
 	set["mpn"] = pInfo.MPN
 	set["mpc"] = pInfo.MPC
-	set["size"] = len(pInfo.Ids) + 1
+	pInfo.Size = pInfo.Size + 1
+	set["size"] = pInfo.Size
+
 	if p.currentType == "project" || p.currentType == "project_history" {
 		set["pici"] = p.pici
 	} else {

BIN
data_project/projectset


+ 6 - 6
data_project/update.go

@@ -393,12 +393,12 @@ func (p *ProjectTask) ReMerge(info *Info, tmp map[string]interface{}, tmpPro map
 
 			ex := 0
 			resArr := []*ProjectCache{}
-			for i, res := range resN {
-				choose, e := p.CompareStatus(resN[i], info)
-				if !choose {
-					ex = e
-					resArr = append(resArr, res)
-				}
+			for _, res := range resN {
+				//choose, e := p.CompareStatus(resN[i], info)
+				//if !choose {
+				//	ex = e
+				resArr = append(resArr, res)
+				//}
 			}
 			if len(resArr) > 0 {
 				bFindProject = true

+ 125 - 87
data_tidb/main.go

@@ -34,30 +34,32 @@ func init() {
 }
 
 func main() {
+	//
+	//go SaveFunc()
+	//go SaveTagFunc()
+	//go SaveExpandFunc()
+	//go SaveAttrFunc()
+	//go SaveImfFunc()
+	//go SaveIntentFunc()
+	//go SaveWinnerFunc()
+	//go SavePackageFunc()
+	//go SavePurFunc()
+	//go saveErrMethod()
+
+	rootCmd := &cobra.Command{Use: "my cmd"}
+	rootCmd.AddCommand(bidding())
+	rootCmd.AddCommand(project())
+	rootCmd.AddCommand(projectAdd())
+	if err := rootCmd.Execute(); err != nil {
+		fmt.Println("rootCmd.Execute failed", err.Error())
+	}
 
-	go SaveFunc()
-	go SaveTagFunc()
-	go SaveExpandFunc()
-	go SaveAttrFunc()
-	go SaveImfFunc()
-	go SaveIntentFunc()
-	go SaveWinnerFunc()
-	go SavePackageFunc()
-	go SavePurFunc()
-	go saveErrMethod()
-
-	//rootCmd := &cobra.Command{Use: "my cmd"}
-	//rootCmd.AddCommand(bidding())
-	//rootCmd.AddCommand(project())
-	//if err := rootCmd.Execute(); err != nil {
-	//	fmt.Println("rootCmd.Execute failed", err.Error())
-	//}
-
+	//go SaveRelationFunc()
 	//taskMysql()
 
-	UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
-	UdpClient.Listen(processUdpMsg)
-	log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
+	//UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
+	//UdpClient.Listen(processUdpMsg)
+	//log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
 
 	c := make(chan bool, 1)
 	<-c
@@ -93,17 +95,17 @@ func taskMysql() {
 	pool := make(chan bool, 5) //控制线程数
 	wg := &sync.WaitGroup{}
 
-	finalId := 6500183
-	//lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_ent_baseinfo"))
+	finalId := 0
+	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT * FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation"))
 	//lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s ORDER BY id DESC LIMIT 1", "dws_f_bpmc_relation_new"))
-	//if len(*lastInfo) > 0 {
-	//	finalId = util.IntAll((*lastInfo)[0]["id"])
-	//}
+	if len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
 	log.Info("查询最后id---", zap.Int("finally id: ", finalId))
 	lastid, count := 0, 0
 	for {
 		log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
-		q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id > %d ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation_new", lastid)
+		q := fmt.Sprintf("SELECT * FROM %s WHERE id > %d ORDER BY id ASC limit 100000", "dws_f_bpmc_relation", lastid)
 		//q := fmt.Sprintf("SELECT id, projectid, infoid, name_id, identity_type+0 FROM %s WHERE id=61771536 ORDER BY id ASC limit 1000000", "dws_f_bpmc_relation")
 		//q := fmt.Sprintf("SELECT id, name, name_id FROM %s WHERE id>%d ORDER BY id ASC limit 1000000", "dws_f_ent_baseinfo", lastid)
 		rows, err := MysqlTool.DB.Query(q)
@@ -171,65 +173,82 @@ func taskMysql() {
 					<-pool
 					wg.Done()
 				}()
-				cid := util.Int64All(tmp["id"])
-				iid := util.ObjToString(tmp["infoid"])
-				name_id := util.ObjToString(tmp["name_id"])
-				identity_type := util.Int64All(tmp["identity_type+0"])
-				if name_id != "" {
-					coll := "bidding"
-					if iid > "5a862e7040d2d9bbe88e3b1f" {
-						coll = "bidding"
-					} else {
-						coll = "bidding_back"
-					}
-					info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
-					if len(*info) > 0 {
-						if identity_type == 1 {
-							if util.ObjToString((*info)["buyertel"]) != "" {
-								q := make(map[string]interface{})
-								q["name_id"] = name_id
-								q["identity_type"] = identity_type
-								q["contact_tel"] = util.ObjToString((*info)["buyertel"])
-								if util.ObjToString((*info)["buyerperson"]) != "" {
-									q["contact_name"] = util.ObjToString((*info)["buyerperson"])
-								}
-								cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
-								if cinfo != nil && len(*cinfo) > 0 {
-									MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
-								}
-							}
-						} else if identity_type == 2 {
-							if util.ObjToString((*info)["winnertel"]) != "" {
-								q := make(map[string]interface{})
-								q["name_id"] = name_id
-								q["identity_type"] = identity_type
-								q["contact_tel"] = util.ObjToString((*info)["winnertel"])
-								if util.ObjToString((*info)["winnerperson"]) != "" {
-									q["contact_name"] = util.ObjToString((*info)["winnerperson"])
-								}
-								cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
-								if cinfo != nil && len(*cinfo) > 0 {
-									MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
-								}
-							}
-						} else if identity_type == 4 {
-							if util.ObjToString((*info)["agencytel"]) != "" {
-								q := make(map[string]interface{})
-								q["name_id"] = name_id
-								q["identity_type"] = identity_type
-								q["contact_tel"] = util.ObjToString((*info)["agencytel"])
-								if util.ObjToString((*info)["agencyperson"]) != "" {
-									q["contact_name"] = util.ObjToString((*info)["agencyperson"])
-								}
-								cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
-								if cinfo != nil && len(*cinfo) > 0 {
-									MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
-								}
-							}
-						}
-					}
-				}
+				//cid := util.Int64All(tmp["id"])
+				//iid := util.ObjToString(tmp["infoid"])
+				//name_id := util.ObjToString(tmp["name_id"])
+				//identity_type := util.Int64All(tmp["identity_type+0"])
+				//if name_id != "" {
+				//	coll := "bidding"
+				//	if iid > "5a862e7040d2d9bbe88e3b1f" {
+				//		coll = "bidding"
+				//	} else {
+				//		coll = "bidding_back"
+				//	}
+				//	info, _ := MongoB.FindById(coll, iid, bson.M{"agencytel": 1, "agencyperson": 1, "buyertel": 1, "buyerperson": 1, "winnertel": 1, "winnerperson": 1})
+				//	if len(*info) > 0 {
+				//		if identity_type == 1 {
+				//			if util.ObjToString((*info)["buyertel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["buyertel"])
+				//				if util.ObjToString((*info)["buyerperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["buyerperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		} else if identity_type == 2 {
+				//			if util.ObjToString((*info)["winnertel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["winnertel"])
+				//				if util.ObjToString((*info)["winnerperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["winnerperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		} else if identity_type == 4 {
+				//			if util.ObjToString((*info)["agencytel"]) != "" {
+				//				q := make(map[string]interface{})
+				//				q["name_id"] = name_id
+				//				q["identity_type"] = identity_type
+				//				q["contact_tel"] = util.ObjToString((*info)["agencytel"])
+				//				if util.ObjToString((*info)["agencyperson"]) != "" {
+				//					q["contact_name"] = util.ObjToString((*info)["agencyperson"])
+				//				}
+				//				cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+				//				if cinfo != nil && len(*cinfo) > 0 {
+				//					MysqlTool.Update("dws_f_bpmc_relation_new", bson.M{"id": cid}, bson.M{"contact_id": (*cinfo)["id"]})
+				//				}
+				//			}
+				//		}
+				//	}
+				//}
+
 				//redis.PutCKV("qyxy_id", util.ObjToString(tmp["name"]), util.ObjToString(tmp["name_id"]))
+
+				saveM := make(map[string]interface{})
+				if util.ObjToString(tmp["name_id"]) != "" {
+					saveM["name_id"] = util.ObjToString(tmp["name_id"])
+				} else {
+					return
+				}
+				if util.ObjToString(tmp["contact_id"]) != "" {
+					saveM["contact_id"] = util.IntAll(tmp["contact_id"])
+				} else {
+					return
+				}
+				saveM["projectid"] = util.ObjToString(tmp["projectid"])
+				saveM["infoid"] = util.ObjToString(tmp["infoid"])
+				saveM["identity_type"] = tmp["identity_type"]
+				saveRelationPool <- saveM
 			}(ret)
 			ret = make(map[string]interface{})
 		}
@@ -316,6 +335,25 @@ func project() *cobra.Command {
 	return cmdClient
 }
 
+// @Description 项目数据
+// @Author J 2022/9/20 17:52
+func projectAdd() *cobra.Command {
+	var pici int64
+	cmdClient := &cobra.Command{
+		Use:   "project",
+		Short: "Start processing project data",
+		Run: func(cmd *cobra.Command, args []string) {
+			//go SaveProFunc()
+			//go SaveProTagFunc()
+			//go SaveProbFunc()
+			go SaveRelationFunc()
+			taskPAdd(pici)
+		},
+	}
+	cmdClient.Flags().Int64VarP(&pici, "pici", "p", 0, "")
+	return cmdClient
+}
+
 func SaveFunc() {
 	arru := make([]map[string]interface{}, saveSize)
 	indexu := 0
@@ -750,7 +788,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru)
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0
@@ -762,7 +800,7 @@ func SaveRelationFunc() {
 					defer func() {
 						<-saveRelationSp
 					}()
-					MysqlTool.InsertBulk("dws_f_bpmc_relation_new", RelationField, arru...)
+					MysqlTool.InsertBulk("dws_f_bpmc_relation", RelationField, arru...)
 				}(arru[:indexu])
 				arru = make([]map[string]interface{}, saveSize)
 				indexu = 0

+ 221 - 57
data_tidb/project.go

@@ -3,6 +3,7 @@ package main
 import (
 	"data_tidb/config"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
@@ -20,8 +21,8 @@ func taskP() {
 	ch := make(chan bool, 20)
 	wg := &sync.WaitGroup{}
 
-	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("5c4044d4a5cb26b9b7b963cc")}
-	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(nil).Sort("_id").Iter()
+	//q := map[string]interface{}{"_id": mongodb.StringTOBsonId("64e5a63855d5406905c574e6")}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(nil).Sort("-_id").Skip(100000).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
 		if count%20000 == 0 {
@@ -47,6 +48,40 @@ func taskP() {
 	log.Info(fmt.Sprintf("over --- %d", count))
 }
 
+func taskPAdd(pici int64) {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 20)
+	wg := &sync.WaitGroup{}
+
+	q := bson.M{"pici": bson.M{"$gt": pici}}
+	query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(q).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//taskPro(tmp)
+			//taskBusiness(tmp)
+			//taskProTag(tmp)
+
+			taskRelation2(tmp)
+
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}
+
 var BidStatus = map[string]int{
 	"预告": 0,
 	"拟建": 1,
@@ -250,73 +285,202 @@ func taskRelation(tmp map[string]interface{}) {
 	ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
 	lid := ids[len(ids)-1]
 
-	if b := util.ObjToString(tmp["buyer"]); b != "" {
-		saveM := make(map[string]interface{})
-		for _, f := range RelationField {
-			if f == "projectid" {
-				saveM[f] = pid
-			} else if f == "infoid" {
-				saveM[f] = lid
-			} else if f == "name_id" {
-				if code := redis.GetStr("qyxy_id", b); code != "" {
-					saveM[f] = code
+	//if b := util.ObjToString(tmp["buyer"]); b != "" {
+	//	saveM := make(map[string]interface{})
+	//
+	//	saveM["projectid"] = pid
+	//	saveM["infoid"] = lid
+	//	saveM["identity_type"] = 1
+	//	saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+	//	if code := redis.GetStr("qyxy_id", b); code != "" {
+	//		saveM["name_id"] = code
+	//		if util.ObjToString(tmp["buyertel"]) != "" {
+	//			q := make(map[string]interface{})
+	//			q["name_id"] = code
+	//			q["identity_type"] = 1
+	//			q["contact_tel"] = util.ObjToString(tmp["buyertel"])
+	//			if util.ObjToString(tmp["buyerperson"]) != "" {
+	//				q["contact_name"] = util.ObjToString(tmp["buyerperson"])
+	//			}
+	//			cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+	//			if cinfo != nil && len(*cinfo) > 0 {
+	//				saveM["contact_id"] = (*cinfo)["id"]
+	//				saveRelationPool <- saveM
+	//			}
+	//		}
+	//	}
+	//}
+
+	//if a := util.ObjToString(tmp["agency"]); a != "" {
+	//	saveM := make(map[string]interface{})
+	//	saveM["projectid"] = pid
+	//	saveM["infoid"] = lid
+	//	saveM["identity_type"] = 4
+	//	saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+	//	if code := redis.GetStr("qyxy_id", a); code != "" {
+	//		saveM["name_id"] = code
+	//		if util.ObjToString(tmp["agencytel"]) != "" {
+	//			q := make(map[string]interface{})
+	//			q["name_id"] = code
+	//			q["identity_type"] = 4 // 100
+	//			q["contact_tel"] = util.ObjToString(tmp["agencytel"])
+	//			if util.ObjToString(tmp["agencyperson"]) != "" {
+	//				q["contact_name"] = util.ObjToString(tmp["agencyperson"])
+	//			}
+	//			cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+	//			if cinfo != nil && len(*cinfo) > 0 {
+	//				saveM["contact_id"] = (*cinfo)["id"]
+	//				saveRelationPool <- saveM
+	//			}
+	//		}
+	//	}
+	//}
+
+	for _, item := range tmp["list"].([]interface{}) {
+		item1 := item.(map[string]interface{})
+		sw := util.ObjToString(item1["s_winner"])
+		if !strings.Contains(sw, ",") {
+			if code := redis.GetStr("qyxy_id", sw); code != "" {
+				saveM := make(map[string]interface{})
+				saveM["projectid"] = pid
+				saveM["infoid"] = lid
+				saveM["identity_type"] = 2
+				saveM["createtime"] = time.Now().Format(util.Date_Full_Layout)
+				saveM["name_id"] = code
+				if util.ObjToString(item1["winnertel"]) != "" {
+					q := make(map[string]interface{})
+					q["name_id"] = code
+					q["identity_type"] = 2 // 010
+					q["contact_tel"] = util.ObjToString(item1["winnertel"])
+					if util.ObjToString(item1["winnerperson"]) != "" {
+						q["contact_name"] = util.ObjToString(item1["winnerperson"])
+					}
+					cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+					if cinfo != nil && len(*cinfo) > 0 {
+						saveM["contact_id"] = (*cinfo)["id"]
+						saveRelationPool <- saveM
+					}
 				}
-			} else if f == "contact_id" {
-				// todo
-			} else if f == "identity_type" {
-				saveM[f] = 1 // 001
-			} else if f == "createtime" {
-				saveM[f] = time.Now().Format(util.Date_Full_Layout)
 			}
 		}
-		saveRelationPool <- saveM
 	}
+}
 
-	if a := util.ObjToString(tmp["agency"]); a != "" {
-		saveM := make(map[string]interface{})
-		for _, f := range RelationField {
-			if f == "projectid" {
-				saveM[f] = pid
-			} else if f == "infoid" {
-				saveM[f] = lid
-			} else if f == "name_id" {
-				if code := redis.GetStr("qyxy_id", a); code != "" {
-					saveM[f] = code
-				}
-			} else if f == "contact_id" {
+func taskRelation2(tmp map[string]interface{}) {
 
-			} else if f == "identity_type" {
-				saveM[f] = 4 // 100
-			} else if f == "createtime" {
-				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+	pid := mongodb.BsonIdToSId(tmp["_id"])
+	if tmp["ids"] == nil {
+		log.Info("taskRelation ids err", zap.Any("id", pid))
+		return
+	}
+	info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1)
+	if len(*info) > 0 {
+
+	} else {
+		ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
+		lid := ids[len(ids)-1]
+
+		if b := util.ObjToString(tmp["buyer"]); b != "" {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", b); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 1
+							q["contact_tel"] = util.ObjToString(tmp["buyertel"])
+							if util.ObjToString(tmp["buyerperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["buyerperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 1 // 001
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				}
 			}
+			saveRelationPool <- saveM
 		}
-		saveRelationPool <- saveM
-	}
 
-	warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
-	if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
-		warr = append(warr, util.ObjToString(tmp["winner"]))
-	}
-	for _, ws := range warr {
-		saveM := make(map[string]interface{})
-		for _, f := range RelationField {
-			if f == "projectid" {
-				saveM[f] = pid
-			} else if f == "infoid" {
-				saveM[f] = lid
-			} else if f == "name_id" {
-				if code := redis.GetStr("qyxy_id", ws); code != "" {
-					saveM[f] = code
+		if a := util.ObjToString(tmp["agency"]); a != "" {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", a); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 4
+							q["contact_tel"] = util.ObjToString(tmp["agencytel"])
+							if util.ObjToString(tmp["agencyperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["agencyperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 4 // 100
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
 				}
-			} else if f == "contact_id" {
+			}
+			saveRelationPool <- saveM
+		}
 
-			} else if f == "identity_type" {
-				saveM[f] = 2 // 010
-			} else if f == "createtime" {
-				saveM[f] = time.Now().Format(util.Date_Full_Layout)
+		warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",")
+		if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 {
+			warr = append(warr, util.ObjToString(tmp["winner"]))
+		}
+		for _, ws := range warr {
+			saveM := make(map[string]interface{})
+			for _, f := range RelationField {
+				if f == "projectid" {
+					saveM[f] = pid
+				} else if f == "infoid" {
+					saveM[f] = lid
+				} else if f == "name_id" {
+					if code := redis.GetStr("qyxy_id", ws); code != "" {
+						saveM[f] = code
+						if util.ObjToString(tmp["buyertel"]) != "" {
+							q := make(map[string]interface{})
+							q["name_id"] = code
+							q["identity_type"] = 2
+							q["contact_tel"] = util.ObjToString(tmp["winnertel"])
+							if util.ObjToString(tmp["winnerperson"]) != "" {
+								q["contact_name"] = util.ObjToString(tmp["winnerperson"])
+							}
+							cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "")
+							if cinfo != nil && len(*cinfo) > 0 {
+								saveM["contact_id"] = (*cinfo)["id"]
+							}
+						}
+					}
+				} else if f == "identity_type" {
+					saveM[f] = 2 // 010
+				} else if f == "createtime" {
+					saveM[f] = time.Now().Format(util.Date_Full_Layout)
+				}
 			}
+			saveRelationPool <- saveM
 		}
-		saveRelationPool <- saveM
 	}
 }

BIN
field_py/field_dispose_log


+ 2 - 2
field_py/main.go

@@ -5,7 +5,7 @@ import (
 	"field-dispose/config"
 	"fmt"
 	"go.uber.org/zap"
-	"io/ioutil"
+	"io"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
@@ -168,7 +168,7 @@ func checkMapJob() {
 						res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "python字段识别-send-fail", k.(string)))
 						if err == nil {
 							defer res.Body.Close()
-							read, err := ioutil.ReadAll(res.Body)
+							read, err := io.ReadAll(res.Body)
 							log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
 						}
 					} else {

+ 17 - 7
field_py/task.go

@@ -8,6 +8,7 @@ import (
 	"field-dispose/config"
 	"field-dispose/proto"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
@@ -32,8 +33,6 @@ func getIntention(mapinfo map[string]interface{}) {
 	gtid, _ := mapinfo["gtid"].(string)
 	lteid, _ := mapinfo["lteid"].(string)
 
-	//MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix()}}, false, false)
-
 	sess := MgoB.GetMgoConn()
 	defer MgoB.DestoryMongoConn(sess)
 	ch := make(chan bool, config.Conf.Serve.Thread)
@@ -46,14 +45,17 @@ func getIntention(mapinfo map[string]interface{}) {
 	}
 	field := map[string]interface{}{
 		"toptype":     1,
+		"title":       1,
 		"attach_text": 1,
 		"contenthtml": 1,
 		"site":        1,
 		"detail":      1,
 	}
 	log.Info(fmt.Sprintf("count --- %d", MgoB.Count("bidding", query)))
-	it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(&query).Select(&field).Iter()
+	it := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(query).Select(&field).Iter()
 	count := 0
+	clock := sync.RWMutex{}
+	c1, c2 := 0, 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
 		if count%200 == 0 {
 			log.Info("getIntention", zap.Int("current:", count))
@@ -75,13 +77,20 @@ func getIntention(mapinfo map[string]interface{}) {
 				}
 			}
 			if result1 != nil && len(result1) > 0 {
+				clock.Lock()
 				if result1["purchasinglist"] != nil && len(result1["purchasinglist"].([]interface{})) > 0 {
+					c1++
 					update["purchasinglist"] = result1["purchasinglist"]
 					update["purchasing"] = result1["purchasing"]
 				}
 				if result1["procurementlist"] != nil {
+					c2++
 					update["procurementlist"] = result1["procurementlist"]
 				}
+				if result1["purchasingsource"] != nil {
+					update["purchasingsource"] = result1["purchasingsource"]
+				}
+				clock.Unlock()
 			}
 			if len(update) > 0 {
 				updatePool <- []map[string]interface{}{
@@ -99,6 +108,7 @@ func getIntention(mapinfo map[string]interface{}) {
 	}
 	wg.Wait()
 	log.Info("dispose over...", zap.Int("count:", count), zap.String("gtid:", gtid), zap.String("lteid:", lteid))
+	MgoB.Update("bidding_processing_ids", bson.M{"gtid": gtid}, bson.M{"$set": bson.M{"dataprocess": 2, "updatetime": time.Now().Unix(), "pu_size": c1, "pr_size": c2}}, false, false)
 	NextNode(mapinfo)
 }
 
@@ -186,7 +196,7 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 	//重试获取ip、port
 	if Skipping {
 		for i := 1; i <= 3; i++ {
-			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
 			if err != nil {
 				continue
 			} else {
@@ -197,7 +207,7 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 		}
 	} else {
 		for {
-			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 2})
+			repl, err := ipClient.Apply(context.Background(), &service.ApplyReqData{Name: "goods_service", Balance: 3})
 			if err != nil {
 				continue
 			} else {
@@ -230,7 +240,7 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 	go func(ctx context.Context) {
 		select {
 		case <-ctx.Done():
-			//_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
+			_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
 		case <-time.After(time.Second * 30):
 			// 超时处理
 			//log.Info("rpcGetFieldP 字段识别超过2min", zap.Any("serve", "goods_service"), zap.String("id", id), zap.Any("ip+port", addr))
@@ -258,7 +268,7 @@ func rpcGetFieldP(reqStr, id string) (map[string]interface{}, error) {
 		return nil, errors.New("Json Unmarshal Error")
 	}
 	// 服务中心释放服务
-	//_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
+	_, _ = ipClient.Release(context.Background(), &service.ApplyRepData{Ip: ip, Port: int32(port)})
 	return result, nil
 }
 

BIN
field_sync/field_sync


BIN
field_sync/field_sync_1787_1


+ 2 - 2
field_sync/main.go

@@ -52,8 +52,8 @@ func init() {
 }
 
 func main() {
-	//go checkMapJob()
-	//go nsqMethod()
+	go checkMapJob()
+	go nsqMethod()
 
 	go UpdateBidding()
 	go UpdateExtract()

+ 26 - 17
field_sync/task.go

@@ -5,6 +5,7 @@ import (
 	"field_sync/config"
 	"field_sync/oss"
 	"fmt"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
@@ -28,15 +29,15 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 	defer util.Catch()
 
 	stype := util.ObjToString(mapInfo["stype"])
-	//if stype == "bidding" {
-	//	uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
-	//		"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
-	//	MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
-	//}
-	// 领域标签处理的数据 id段
-	//if stype == "bidding_history" {
-	//	MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
-	//}
+	if stype == "bidding" {
+		uq := bson.M{"gtid": bson.M{"$gte": util.ObjToString(mapInfo["gtid"])},
+			"lteid": bson.M{"$lte": util.ObjToString(mapInfo["lteid"])}}
+		MgoB.Update("bidding_processing_ids", uq, bson.M{"$set": bson.M{"dataprocess": 7, "updatetime": time.Now().Unix()}}, false, true)
+	}
+	//领域标签处理的数据 id段
+	if stype == "bidding_history" {
+		MgoB.Save("field_data_record", map[string]interface{}{"gtid": mapInfo["gtid"], "lteid": mapInfo["lteid"], "status": 0})
+	}
 
 	q, _ := mapInfo["query"].(map[string]interface{})
 	bkey, _ := mapInfo["bkey"].(string)
@@ -89,14 +90,14 @@ func biddingTask(data []byte, mapInfo map[string]interface{}) {
 		MgoB.DestoryMongoConn(biddingConn)
 	}
 	log.Info("bidding sync...over", zap.Int64("all", count), zap.Int("extract sync", c))
-	//NextNode(mapInfo, stype)
-	//NextNodePro(mapInfo, stype)
-	//NextNodeTidb(mapInfo, stype)
-	//if stype == "bidding_history" {
-	//	NextNodeBidData(mapInfo)  // bidding-data数据
-	//	NextNodeTidbQyxy(mapInfo) // tidb-企业数据
-	//	NextNodeHn(mapInfo)
-	//}
+	NextNode(mapInfo, stype)
+	NextNodePro(mapInfo, stype)
+	NextNodeTidb(mapInfo, stype)
+	if stype == "bidding_history" {
+		NextNodeBidData(mapInfo)  // bidding-data数据
+		NextNodeTidbQyxy(mapInfo) // tidb-企业数据
+		NextNodeHn(mapInfo)
+	}
 }
 
 func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
@@ -281,6 +282,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			for _, k := range config.Conf.Serve.FieldS {
 				v1 := compare[k] //extract
 				v2 := tmp[k]     //bidding
+				if k == "project_startdate" || k == "project_completedate" {
+					util.Debug(v1, v2)
+				}
 				if v2 == nil && v1 != nil {
 					update[k] = v1
 				} else if v2 != nil && v1 != nil {
@@ -293,6 +297,9 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 					} else if k == "city" || k == "district" {
 						update[k] = ""
 					} else {
+						if k == "project_startdate" || k == "project_completedate" {
+							util.Debug(v1, v2)
+						}
 						del[k] = 1
 					}
 				}
@@ -372,6 +379,8 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
 			}
 		}
 		if len(update) > 0 {
+			util.Debug(update)
+			util.Debug(del)
 			if len(del) > 0 {
 				bidUpdate = append(bidUpdate, []map[string]interface{}{{
 					"_id": tmp["_id"],

+ 6 - 7
monitor/main.go

@@ -8,7 +8,7 @@ import (
 	"github.com/spf13/cobra"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.uber.org/zap"
-	"io/ioutil"
+	"io"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"monitor/config"
@@ -45,18 +45,17 @@ func biddingFile() *cobra.Command {
 		Run: func(cmd *cobra.Command, args []string) {
 			InitMgo()
 
+			log.Info("bidding_warn start")
 			crn := cron.New()
 			_ = crn.AddFunc("@hourly", func() {
 				taskFile()
+				taskProject()
 			})
 
 			_ = crn.AddFunc("0 */30 * * * ?", func() {
 				taskPy()
 			})
 
-			_ = crn.AddFunc("@hourly", func() {
-				taskProject()
-			})
 			crn.Start()
 
 			c := make(chan bool, 1)
@@ -76,7 +75,7 @@ func taskFile() {
 }
 
 func taskPy() {
-	info, _ := MgoB.Find("bidding_processing_ids", bson.M{"dataprocess": bson.M{"$in": []int{1, 2}}}, nil, bson.M{"count": 1, "dataprocess": 1}, false, -1, -1)
+	info, _ := MgoB.Find("bidding_processing_ids", bson.M{"dataprocess": 1}, nil, bson.M{"count": 1, "dataprocess": 1}, false, -1, -1)
 	count := 0
 	for _, m := range *info {
 		count += util.IntAll(m["count"])
@@ -106,7 +105,7 @@ func SendMsg(content string) {
 	bytesData, _ := json.Marshal(data)
 	req, _ := http.NewRequest("POST", WebUrl, bytes.NewReader(bytesData))
 	resp, _ := client.Do(req)
-	body, _ := ioutil.ReadAll(resp.Body)
+	body, _ := io.ReadAll(resp.Body)
 	log.Info("SendMsg", zap.String("resp", string(body)))
 }
 
@@ -118,6 +117,6 @@ func SendMsg1(content string) {
 	bytesData, _ := json.Marshal(data)
 	req, _ := http.NewRequest("POST", WebUrl1, bytes.NewReader(bytesData))
 	resp, _ := client.Do(req)
-	body, _ := ioutil.ReadAll(resp.Body)
+	body, _ := io.ReadAll(resp.Body)
 	log.Info("SendMsg", zap.String("resp", string(body)))
 }