Jianghan 11 月之前
父节点
当前提交
80592c6a77

+ 2 - 4
data_project/common.toml

@@ -2,7 +2,7 @@
 udp = ":1782"
 thread = 1
 loadStart = 1
-validdays = 150
+validdays = 300
 statusdays = 15
 backupFlag = true
 siteColl = "site"
@@ -30,9 +30,7 @@ size = 15
 user = ""
 password = ""
 [db.redis]
-addr = "project=192.168.3.173:8379"
-addrQb = "qyxy_buyer=192.168.3.173:8379"
-dbQb = 3
+addr = "project=192.168.3.149:1713,qyxy_buyer=192.168.3.149:1713"
 
 [db.es]
 addr = "http://192.168.3.241:9205"

+ 1 - 3
data_project/config/conf.go

@@ -87,9 +87,7 @@ type nsq struct {
 }
 
 type redis struct {
-	Addr   string
-	AddrQb string
-	DbQb   int
+	Addr string
 }
 
 type es struct {

+ 7 - 5
data_project/init.go

@@ -26,7 +26,8 @@ var (
 	SkipSiteList                                   []string
 	BlaskListMap                                   map[string]bool
 
-	RedisCode string
+	RedisProject, RedisBuyer string
+	P_KEY, B_KEY             string
 )
 
 var (
@@ -95,10 +96,11 @@ func init() {
 	udpclient.Listen(processUdpMsg)
 	log.Info("udp init port:" + udpclient.Local)
 
-	//"qyxy_buyer=172.17.4.189:8379"
-	RedisCode = "qyxy_buyer"
-	redis.InitRedis1(config.Conf.DB.Redis.AddrQb, config.Conf.DB.Redis.DbQb) // 采购单位与中标单位初次合作项目
-	redis.InitRedis1(config.Conf.DB.Redis.Addr, 0)
+	RedisProject = "project"
+	P_KEY = "project_detail_%s"
+	RedisBuyer = "qyxy_buyer"
+	B_KEY = "project_buyer_%s"
+	redis.InitRedis(config.Conf.DB.Redis.Addr) // 采购单位与中标单位初次合作项目
 
 	cof := make(map[string]interface{})
 	util.ReadConfig(&cof)

+ 3 - 3
data_project/load_data.go

@@ -102,7 +102,7 @@ func (p *ProjectTask) loadData(starttime int64) {
 			var pc *ProjectCache
 			_ = json.Unmarshal(bys, &pc)
 			saveFiled(p, tmp, pc)
-			redis.PutCKV("project", pc.Id.Hex(), tmp)
+			redis.PutCKV(RedisProject, fmt.Sprintf(P_KEY, pc.Id.Hex()), tmp)
 			pool <- pc
 		}(tmp)
 		tmp = make(map[string]interface{})
@@ -174,9 +174,9 @@ func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectCache) {
 			for i, eid := range elist {
 				if eid != "-" {
 					text := buyer + "," + wlist[i]
-					ex, _ := redis.Exists(RedisCode, text)
+					ex, _ := redis.Exists(RedisBuyer, text)
 					if !ex {
-						redis.PutCKV(RedisCode, text, tmp.Id.Hex())
+						redis.PutCKV(RedisBuyer, text, tmp.Id.Hex())
 					}
 				}
 			}

+ 4 - 4
data_project/main.go

@@ -79,7 +79,7 @@ func DealSign() {
 	}
 }
 
-func mainT() {
+func main() {
 
 	P_QL.loadSpiderCode()
 	P_QL.loadSite()
@@ -167,9 +167,9 @@ func mainT() {
 	}
 }
 
-func main() {
-	sid = "64f5d956513e65bb3f0e4d4f"
-	eid = "65e1a06866cf0db42adf5a87"
+func mainT() {
+	sid = "65e8263066cf0db42aecce50"
+	eid = "66867b8a66cf0db42a1227d6"
 	//flag.StringVar(&sid, "sid", "", "开始id")
 	//flag.StringVar(&eid, "eid", "", "结束id")
 	//flag.Parse()

+ 6 - 5
data_project/project.go

@@ -2,6 +2,7 @@ package main
 
 import (
 	"encoding/json"
+	"fmt"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
@@ -628,7 +629,7 @@ func (p *ProjectTask) NewProject(tmp map[string]interface{}, thisinfo *Info) (st
 	if tmp["bid_field"] != nil {
 		set["bid_field"] = tmp["bid_field"]
 	}
-	redis.PutCKV("project", thisinfo.Id, set)
+	redis.PutCKV(RedisProject, fmt.Sprintf(P_KEY, thisinfo.Id), set)
 	set["size"] = 1
 	push := p.PushListInfo(tmp, thisinfo.Id)
 	if len(thisinfo.Winners) > 0 {
@@ -699,7 +700,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 			return
 		}
 	}
-	pdata := redis.Get("project", pInfo.Id.Hex())
+	pdata := redis.Get(RedisProject, fmt.Sprintf(P_KEY, pInfo.Id.Hex()))
 	if pdata == nil {
 		log.Info("redis err, not find project " + pInfo.Id.Hex())
 		return
@@ -1155,7 +1156,7 @@ func (p *ProjectTask) UpdateProject(tmp map[string]interface{}, thisinfo *Info,
 	} else {
 		set["pici"] = tmp["comeintime"]
 	}
-	redis.PutCKV("project", project.Id.Hex(), set)
+	redis.PutCKV(RedisProject, fmt.Sprintf(P_KEY, project.Id.Hex()), set)
 	update := map[string]interface{}{}
 	if len(set) > 0 {
 		update["$set"] = set
@@ -1525,9 +1526,9 @@ func FirstCooperation(set map[string]interface{}, b string, winns, entidlist []s
 	for i, eid := range entidlist {
 		if eid != "-" {
 			text := b + "," + winns[i]
-			ex, _ := redis.Exists(RedisCode, text)
+			ex, _ := redis.Exists(RedisBuyer, text)
 			if !ex {
-				redis.PutCKV(RedisCode, text, pid)
+				redis.PutCKV(RedisBuyer, text, pid)
 				eids = append(eids, eid)
 			}
 		}

+ 1 - 1
data_project/task.go

@@ -196,7 +196,7 @@ func (p *ProjectTask) clearMem() {
 			for k, v := range p.AllIdsMap {
 				if p.currentTime-v.P.LastTime > p.validTime {
 					clearNum++
-					redis.Del("project", k)
+					redis.Del(RedisProject, k)
 					//删除id的map
 					delete(p.AllIdsMap, k)
 					//删除pb

+ 1 - 0
data_project_information/common.toml

@@ -1,6 +1,7 @@
 [serve]
 udp = ":1782"
 thread = 1
+pici = 0
 
 [db]
 

+ 1 - 0
data_project_information/config/conf.go

@@ -31,6 +31,7 @@ type serve struct {
 	Udp        string
 	LoadStart  int64
 	Thread     int
+	Pici       int64
 	SiteColl   string
 	ValidDays  int
 	StatusDays int

+ 4 - 1
data_project_information/go.mod

@@ -1,11 +1,14 @@
 module data_project_information
 
-go 1.20
+go 1.21
+
+//toolchain go1.22.4
 
 require (
 	github.com/BurntSushi/toml v1.3.2
 	github.com/ClickHouse/clickhouse-go/v2 v2.20.0
 	github.com/olivere/elastic/v7 v7.0.32
+	github.com/robfig/cron/v3 v3.0.1
 	github.com/spf13/cobra v1.8.0
 	go.mongodb.org/mongo-driver v1.11.4
 	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20230627091444-ee2add33ba67

+ 2 - 0
data_project_information/go.sum

@@ -104,6 +104,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=

+ 25 - 1
data_project_information/main.go

@@ -3,6 +3,7 @@ package main
 import (
 	"data_project_information/config"
 	"fmt"
+	"github.com/robfig/cron/v3"
 	"github.com/spf13/cobra"
 	"time"
 )
@@ -25,6 +26,7 @@ func main() {
 
 	rootCmd := &cobra.Command{Use: "my cmd"}
 	rootCmd.AddCommand(project())
+	rootCmd.AddCommand(projectAdd())
 	rootCmd.AddCommand(tidb())
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
@@ -46,7 +48,29 @@ func project() *cobra.Command {
 			task()
 		},
 	}
-	//cmdClient.Flags().StringVarP(&cfg, "conf", "c", "", "server config [toml]")
+	return cmdClient
+}
+
+// projectAdd builds the "project-add" sub-command. Its Run callback calls
+// InitMgo and InitEs, starts updateFuc in a goroutine, runs taskAdd once
+// right away, registers taskAdd with the cron scheduler and then blocks
+// forever so the scheduler keeps firing.
+func projectAdd() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use:   "project-add",
+		Short: "Start processing project data",
+		Run: func(cmd *cobra.Command, args []string) {
+			InitMgo()
+			InitEs()
+
+			// NOTE(review): updateFuc is defined elsewhere; presumably it drains updatePool — confirm.
+			go updateFuc()
+			taskAdd()
+			crn := cron.New()
+			// NOTE(review): with the default cron.New() parser this spec should mean
+			// "minute 10 of every hour" — confirm. The AddFunc error is discarded.
+			_, _ = crn.AddFunc("10 * * * *", func() {
+				taskAdd()
+			})
+			crn.Start()
+
+			// Nothing in this function sends on ch, so the receive parks this goroutine.
+			ch := make(chan bool, 1)
+			<-ch
+		},
+	}
 	return cmdClient
 }
 

+ 82 - 12
data_project_information/task.go

@@ -15,6 +15,8 @@ import (
 	"time"
 )
 
+var piciLock sync.Mutex
+
 func task() {
 
 	client := Es.GetEsConn()
@@ -23,10 +25,11 @@ func task() {
 	wg := &sync.WaitGroup{}
 
 	tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
-	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set") // 查询字段
+	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form") // 查询字段
 	for i, tm := range tf {
 		query := es.NewBoolQuery().
-			Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
+			//Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
+			Must(es.NewExistsQuery("property_form"))
 		if i == 0 {
 			query.Must(es.NewRangeQuery("comeintime").Lte(tm))
 		} else if i == 1 {
@@ -63,6 +66,47 @@ func task() {
 	}
 }
 
+// taskAdd scrolls the "bidding" index for documents whose tag_topinformation
+// or tag_topinformation_ai equals "情报_物业" and hands every result page to
+// taskInfoA. When config.Conf.Serve.Pici is positive, only documents with
+// pici >= that value are requested.
+func taskAdd() {
+	esConn := Es.GetEsConn()
+	defer Es.DestoryEsConn(esConn)
+
+	workers := &sync.WaitGroup{}
+
+	// Limit _source to the listed fields.
+	source := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici")
+	// At least one of the two tag clauses has to match. Earlier variants of
+	// this filter (match on tag_topinformation, exists on property_form) are
+	// no longer used.
+	filter := es.NewBoolQuery().
+		Should(es.NewTermQuery("tag_topinformation", "情报_物业")).
+		Should(es.NewTermQuery("tag_topinformation_ai", "情报_物业")).MinimumShouldMatch("1")
+	if config.Conf.Serve.Pici > 0 {
+		filter.Must(es.NewRangeQuery("pici").Gte(config.Conf.Serve.Pici))
+	}
+
+	util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding", filter)))
+	processed := 0
+	// The first request opens the scroll cursor.
+	first, err := esConn.Scroll().Index("bidding").Query(filter).FetchSourceContext(source).Scroll("5m").Size(2000).Do(context.Background())
+	if err != nil {
+		util.Debug(err)
+		return
+	}
+
+	taskInfoA(first, workers, &processed)
+	cursor := first.ScrollId
+	for {
+		page, pageErr := esConn.Scroll("1m").Index("bidding").ScrollId(cursor).Size(2000).Do(context.TODO())
+		if pageErr != nil {
+			// The loop only ends through an error; presumably that includes
+			// the end-of-scroll signal — confirm against the client library.
+			util.Debug("Es Search Data Error:", pageErr.Error())
+			break
+		}
+		taskInfoA(page, workers, &processed)
+		cursor = page.ScrollId
+	}
+	workers.Wait()
+	util.Debug(fmt.Sprintf("处理结束,处理文档%d条", processed))
+	util.Debug(config.Conf.Serve.Pici)
+	_, _ = esConn.ClearScroll().ScrollId(cursor).Do(context.Background()) // release the scroll cursor
+}
+
 func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
 	chd := make(chan bool, 5)
 	for _, hit := range searchResult.Hits.Hits {
@@ -76,20 +120,46 @@ func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int
 			}()
 			tmp := make(map[string]interface{})
 			if json.Unmarshal(tmpHit.Source, &tmp) == nil {
+				piciLock.Lock()
+				if util.Int64All(tmp["pici"]) > config.Conf.Serve.Pici {
+					config.Conf.Serve.Pici = util.Int64All(tmp["pici"])
+				}
+				piciLock.Unlock()
 				update := make(map[string]interface{})
-				update["tag_topinformation"] = tmp["tag_topinformation"]
-				update["tag_subinformation"] = tmp["tag_subinformation"]
+				if tmp["tag_topinformation"] != nil {
+					update["tag_topinformation"] = tmp["tag_topinformation"]
+				}
+				if tmp["tag_subinformation"] != nil {
+					update["tag_subinformation"] = tmp["tag_subinformation"]
+				}
+				if tmp["tag_topinformation_ai"] != nil {
+					update["tag_topinformation"] = tmp["tag_topinformation_ai"]
+				}
+				if tmp["tag_subinformation_ai"] != nil {
+					update["tag_subinformation_ai"] = tmp["tag_subinformation_ai"]
+				}
 				if tmp["tag_set"] != nil {
 					update["tag_set"] = tmp["tag_set"]
 				}
+				if tmp["property_form"] != nil {
+					update["property_form"] = tmp["property_form"]
+				}
 				//updatePool <- []map[string]interface{}{
 				//	{"ids": tmpHit.Id},
-				//	{"$set": update, "$push": bson.M{"tag_information_ids": tmpHit.Id}},
+				//	{"$set": update},
+				//}
+				//if tmp["tag_topinformation"] == nil {
+				//	updatePool <- []map[string]interface{}{
+				//		{"ids": tmpHit.Id},
+				//		{"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id},
+				//			"$unset": bson.M{"tag_topinformation": 1, "tag_subinformation": 1}},
+				//	}
+				//} else {
+				//	updatePool <- []map[string]interface{}{
+				//		{"ids": tmpHit.Id},
+				//		{"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id}},
+				//	}
 				//}
-				updatePool <- []map[string]interface{}{
-					{"ids": tmpHit.Id},
-					{"$set": update},
-				}
 
 			}
 		}(hit)
@@ -108,7 +178,7 @@ func taskT() {
 	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 
-	q := bson.M{"firsttime": bson.M{"$gte": 1577808000}}
+	q := bson.M{"firsttime": bson.M{"$gte": 1711360551}}
 	query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
@@ -307,10 +377,10 @@ func getStr(b string) string {
 	if b == "" {
 		return ""
 	}
-	a1 := "(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然\n资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)"
+	a1 := "(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)"
 	a2 := "(卫健委|医疗)"
 	a3 := "(教育|学校)"
-	a4 := "(人行l金融业)"
+	a4 := "(人行|金融业)"
 	a5 := "(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)"
 	if strings.Contains(a1, b) {
 		return "政府机构"

+ 32 - 5
data_tidb/main.go

@@ -21,7 +21,8 @@ import (
 var (
 	UdpClient udp.UdpClient
 
-	Pici int64
+	Pici     int64
+	piciLock sync.Mutex
 )
 
 func init() {
@@ -54,6 +55,8 @@ func main() {
 	rootCmd.AddCommand(bidding())
 	rootCmd.AddCommand(relation())
 	rootCmd.AddCommand(projectAdd())
+	rootCmd.AddCommand(redisPid())
+	rootCmd.AddCommand(repairRel())
 	if err := rootCmd.Execute(); err != nil {
 		fmt.Println("rootCmd.Execute failed", err.Error())
 	}
@@ -345,17 +348,19 @@ func projectAdd() *cobra.Command {
 		Use:   "project",
 		Short: "Start processing project data",
 		Run: func(cmd *cobra.Command, args []string) {
+			InitEs()
+
 			//go SaveProFunc()
 			//go SaveProTagFunc()
 			//go SaveProbFunc()
 			go SaveRelationFunc()
 
-			taskPAdd()
-
+			taskR()
 			crn := cron.New()
-			cronstr := "0 10 * * * *" // 每30min执行一次
+			cronstr := "0 30 * * * *" // 每30min执行一次
 			_ = crn.AddFunc(cronstr, func() {
-				taskPAdd()
+				//taskPAdd()
+				taskR()
 			})
 			crn.Start()
 		},
@@ -364,6 +369,28 @@ func projectAdd() *cobra.Command {
 	return cmdClient
 }
 
+// redisPid builds the "redis-pid" sub-command, whose only action is to run
+// taskRedisPid.
+func redisPid() *cobra.Command {
+	run := func(cmd *cobra.Command, args []string) {
+		taskRedisPid()
+	}
+	return &cobra.Command{
+		Use:   "redis-pid",
+		Short: "Start processing redis pid data",
+		Run:   run,
+	}
+}
+
+// repairRel builds the "repair-relation" sub-command, whose only action is to
+// run taskRepair.
+func repairRel() *cobra.Command {
+	cmdClient := &cobra.Command{
+		Use: "repair-relation",
+		// The description used to be a copy of the redis-pid one; this command
+		// runs the duplicate clean-up in taskRepair instead.
+		Short: "Start removing duplicate relation rows",
+		Run: func(cmd *cobra.Command, args []string) {
+			taskRepair()
+		},
+	}
+	return cmdClient
+}
+
 func SaveFunc() {
 	arru := make([]map[string]interface{}, saveSize)
 	indexu := 0

+ 49 - 9
data_tidb/project.go

@@ -23,14 +23,15 @@ func taskR() {
 
 	wg := &sync.WaitGroup{}
 
-	query := es.NewBoolQuery()
-	//Must(es.NewTermsQuery("id", "64e7685255d5406905c94a64"))
-	//Must(es.NewRangeQuery("comeintime").Gte(1688140800).Lte(1690444635)).
+	query := es.NewBoolQuery().
+		//Must(es.NewTermsQuery("id", "64e7685255d5406905c94a64"))
+		Must(es.NewRangeQuery("comeintime").Gt(Pici))
 	//Must(es.NewExistsQuery("yuceendtime"))
-
+	fsc := es.NewFetchSourceContext(true).Include("buyer", "buyertel", "buyerperson", "agency", "agencytel",
+		"agencyperson", "s_winner", "winnertel", "winnerperson", "id", "pici")
 	util.Debug(Es.Count("bidding", query))
 	countDocs := 0
-	res, err := client.Scroll().Index("bidding").Query(query).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
+	res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
 	if err == nil {
 		taskInfoA(res, wg, &countDocs)
 		scrollId := res.ScrollId
@@ -44,7 +45,7 @@ func taskR() {
 			scrollId = searchResult.ScrollId
 		}
 		wg.Wait()
-		util.Debug("over---", countDocs)
+		util.Debug("over---", countDocs, Pici)
 		_, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
 	} else {
 		util.Debug(err)
@@ -52,7 +53,7 @@ func taskR() {
 }
 
 func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
-	ch := make(chan bool, 1)
+	ch := make(chan bool, 2)
 	for _, hit := range searchResult.Hits.Hits {
 		//开始处理数据
 		wg.Add(1)
@@ -62,9 +63,13 @@ func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int
 				<-ch
 				wg.Done()
 			}()
-
 			tmp := make(map[string]interface{})
 			if json.Unmarshal(tmpHit.Source, &tmp) == nil {
+				if t := util.Int64All(tmp["pici"]); t > Pici {
+					piciLock.Lock()
+					Pici = t
+					piciLock.Unlock()
+				}
 				taskRelation(tmp)
 			}
 		}(hit)
@@ -311,7 +316,7 @@ func taskRelation(tmp map[string]interface{}) {
 	id := util.ObjToString(tmp["id"])
 	pid := ""
 	if str := redis.GetStr("project_ids", id); str == "" {
-		info, _ := MongoP.FindOneByField("projectset_20230407", bson.M{"ids": id}, bson.M{"ids": 1})
+		info, _ := MongoP.FindOneByField("projectset_20230904", bson.M{"ids": id}, bson.M{"ids": 1})
 		if len(*info) > 0 {
 			pid = mongodb.BsonIdToSId((*info)["_id"])
 			for _, s := range util.ObjArrToStringArr((*info)["ids"].([]interface{})) {
@@ -520,3 +525,38 @@ func taskRelation2(tmp map[string]interface{}) {
 		}
 	}
 }
+
+// taskRedisPid iterates over every document of the projectset_20230904
+// collection and, for each entry of the document's "ids" array, stores
+// entry -> string form of the document's _id under the "project_ids" redis
+// code. At most five documents are written to redis concurrently. While
+// iterating it also raises the package-level Pici to the largest "pici" value
+// it sees.
+func taskRedisPid() {
+	sess := MongoP.GetMgoConn()
+	defer MongoP.DestoryMongoConn(sess)
+
+	ch := make(chan bool, 5) // bounds the number of concurrent redis writers
+	wg := &sync.WaitGroup{}
+
+	// "pici" must be part of the projection; without it tmp["pici"] is never
+	// populated and the Pici update below can never fire.
+	f := bson.M{"ids": 1, "id": 1, "pici": 1}
+	query := sess.DB(config.Conf.DB.MongoB.Dbname).C("projectset_20230904").Find(nil).Select(f).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
+		if count%20000 == 0 {
+			log.Info(fmt.Sprintf("current --- %d", count))
+		}
+		if t := util.Int64All(tmp["pici"]); t > Pici {
+			Pici = t
+		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// A document without a usable "ids" array is skipped; an unchecked
+			// type assertion here would panic and take the whole process down.
+			ids, ok := tmp["ids"].([]interface{})
+			if !ok {
+				return
+			}
+			pid := mongodb.BsonIdToSId(tmp["_id"])
+			for _, i := range ids {
+				redis.PutCKV("project_ids", util.ObjToString(i), pid)
+			}
+		}(tmp)
+		// Hand the worker its own map; the next document gets a fresh one.
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	log.Info(fmt.Sprintf("over --- %d", count))
+}

+ 107 - 0
data_tidb/task.go

@@ -0,0 +1,107 @@
+package main
+
+import (
+	"fmt"
+	"go.uber.org/zap"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
+	"strings"
+	"sync"
+)
+
+// taskRepair walks global_common_data.dws_f_bpmc_relation from the highest id
+// downwards in batches of up to 1,000,000 rows and passes every row to task1,
+// with at most two rows in flight at a time. The walk stops when a batch
+// produces no row, or when the database reports a query error.
+func taskRepair() {
+	const table = "global_common_data.dws_f_bpmc_relation"
+
+	pool := make(chan bool, 2) // bounds the number of concurrent workers
+	wg := &sync.WaitGroup{}
+
+	// Only the newest row is needed to find the starting id, so ask for a
+	// single id instead of loading the whole table.
+	finalId := 0
+	lastInfo := MysqlTool.SelectBySql(fmt.Sprintf("SELECT id FROM %s ORDER BY id DESC LIMIT 1", table))
+	if lastInfo != nil && len(*lastInfo) > 0 {
+		finalId = util.IntAll((*lastInfo)[0]["id"])
+	}
+	log.Info("查询最后id---", zap.Int("finally id: ", finalId))
+	lastid, count := finalId, 0
+	for {
+		log.Info("重新查询,lastid---", zap.Int("lastid: ", lastid))
+		q := fmt.Sprintf("SELECT * FROM %s WHERE id < %d ORDER BY id DESC limit 1000000", table, lastid)
+		rows, err := MysqlTool.DB.Query(q)
+		if err != nil {
+			// rows is nil on error; touching it would panic.
+			log.Error("mysql query err ", zap.Error(err))
+			return
+		}
+		columns, err := rows.Columns()
+		if err != nil {
+			log.Error("mysql columns err ", zap.Error(err))
+			_ = rows.Close()
+			return
+		}
+
+		// advanced records whether this batch moved lastid; if it did not,
+		// another query would return the same rows again.
+		advanced := false
+		for rows.Next() {
+			values := make([]interface{}, len(columns))
+			scanArgs := make([]interface{}, len(columns))
+			for k := range values {
+				scanArgs[k] = &values[k]
+			}
+			if err = rows.Scan(scanArgs...); err != nil {
+				log.Error("mysql scan err ", zap.Error(err))
+				break
+			}
+			ret := make(map[string]interface{}, len(columns))
+			for i, col := range values {
+				// Byte-slice columns are stored as strings.
+				if v, ok := col.([]uint8); ok {
+					ret[columns[i]] = string(v)
+				} else {
+					ret[columns[i]] = col
+				}
+			}
+			lastid = util.IntAll(ret["id"])
+			advanced = true
+			count++
+			if count%2000 == 0 {
+				log.Info("current----", zap.Int("count: ", count), zap.Int("lastid: ", lastid))
+			}
+			pool <- true
+			wg.Add(1)
+			go func(tmp map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				task1(tmp)
+			}(ret)
+		}
+		if err = rows.Err(); err != nil {
+			log.Error("mysql rows err ", zap.Error(err))
+		}
+		_ = rows.Close()
+		wg.Wait()
+
+		if !advanced {
+			log.Info("----finish-----", zap.Int("count: ", count))
+			break
+		}
+	}
+}
+
+// task1 removes duplicates of one relation row. It reads "infoid",
+// "projectid" and "name_id" from tmp; when more than one row of
+// global_common_data.dws_f_bpmc_relation shares that triple, every matching
+// row except the one with the smallest id is deleted.
+func task1(tmp map[string]interface{}) {
+	const (
+		countSQL  = `SELECT count(*) FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ?`
+		listSQL   = `SELECT id FROM global_common_data.dws_f_bpmc_relation WHERE infoid = ? AND projectid = ? AND name_id = ?  ORDER BY id`
+		deleteSQL = `DELETE FROM global_common_data.dws_f_bpmc_relation WHERE id IN (%s)`
+	)
+	infoid, projectid, nameID := tmp["infoid"], tmp["projectid"], tmp["name_id"]
+
+	if MysqlTool.QueryCount(countSQL, infoid, projectid, nameID) <= 1 {
+		return
+	}
+	matches := MysqlTool.Query(listSQL, infoid, projectid, nameID)
+	if matches == nil || len(*matches) == 0 {
+		return
+	}
+
+	// Rows arrive ordered by id; the first one is kept, the rest are surplus.
+	var surplus []int64
+	for _, row := range (*matches)[1:] {
+		surplus = append(surplus, util.Int64All(row["id"]))
+	}
+	if len(surplus) == 0 {
+		return
+	}
+	placeholders, params := WhArgs(surplus)
+	log.Info("del---", zap.Any("ids---", surplus))
+	MysqlTool.ExecBySql(fmt.Sprintf(deleteSQL, placeholders), params...)
+}
+
+// WhArgs turns args into the two pieces needed for a parameterised SQL
+// "IN (...)" list: a comma-separated run of "?" placeholders, one per element,
+// and the elements themselves boxed as []interface{} in the same order.
+func WhArgs(args []int64) (string, []interface{}) {
+	boxed := make([]interface{}, 0, len(args))
+	for _, id := range args {
+		boxed = append(boxed, id)
+	}
+	// "?,?,?," with the trailing comma removed equals joining n "?" with ",".
+	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(args)), ",")
+	return placeholders, boxed
+}