package main import ( "data_tidb/config" "fmt" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "strings" "sync" "time" ) func taskP() { sess := MongoP.GetMgoConn() defer MongoP.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} //q := map[string]interface{}{"_id": mongodb.StringTOBsonId("64e5a63855d5406905c574e6")} query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(nil).Sort("-_id").Skip(100000).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //taskPro(tmp) //taskBusiness(tmp) //taskProTag(tmp) taskRelation(tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func taskPAdd(pici int64) { sess := MongoP.GetMgoConn() defer MongoP.DestoryMongoConn(sess) ch := make(chan bool, 20) wg := &sync.WaitGroup{} q := bson.M{"pici": bson.M{"$gt": pici}} query := sess.DB(config.Conf.DB.MongoP.Dbname).C("projectset_20230407").Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%20000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() //taskPro(tmp) //taskBusiness(tmp) //taskProTag(tmp) taskRelation2(tmp) }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } var BidStatus = map[string]int{ "预告": 0, "拟建": 1, "招标": 2, "中标": 3, "成交": 4, "废标": 5, "流标": 6, "合同": 7, "其它": 8, } var BidType = map[string]int{ "招标": 0, "邀标": 1, "单一": 2, "竞价": 3, "竞谈": 4, "询价": 5, } // @Description 基础信息 // @Author J 2022/9/22 18:32 func taskPro(tmp map[string]interface{}) { saveM := make(map[string]interface{}) for _, f := range ProField { if f == "projectid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "district_code" { if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM[f] = AreaCode[c] } } else if f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "buyerclass_code" { if obj := util.ObjToString(tmp["buyerclass"]); obj != "" { saveM[f] = BuyerCode[obj] } } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" { if tmp[f] != nil && util.Int64All(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "bidstatus" { if b := util.ObjToString(tmp[f]); b != "" { tmp[f] = BidStatus[b] } } else if f == "bidtype" { if b := util.ObjToString(tmp[f]); b != "" { tmp[f] = BidType[b] } } else if f == "multipackage" { if tmp[f] == nil { saveM[f] = 0 } else { saveM[f] = tmp[f] } } else if f == "buyer_id" { if b := util.ObjToString(tmp["buyer"]); b != "" { if code := redis.GetStr("qyxy_id", b); code != "" { saveM[f] = code } } } else if f == "agency_id" { if b := util.ObjToString(tmp["agency"]); b != "" { if code := redis.GetStr("qyxy_id", b); code != "" { saveM[f] = code } } } else { if tmp[f] != nil { if ProVMap[f] != nil { saveM[f], _ = verifyF(f, tmp[f], ProVMap[f]) } else { saveM[f] = tmp[f] } } } } saveProPool <- saveM } // @Description 项目业务表 // @Author J 2022/9/30 13:40 func taskBusiness(tmp map[string]interface{}) { warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",") if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 { warr = append(warr, util.ObjToString(tmp["winner"])) } for _, s := range warr { saveM := make(map[string]interface{}) for _, f := range ProBusField { if f == "projectid" { saveM[f] = mongodb.BsonIdToSId(tmp["_id"]) } else if f == "area_code" { if tmp["area"] != nil { saveM[f] = AreaCode[util.ObjToString(tmp["area"])] } } else if f == "city_code" { if tmp["area"] != nil && tmp["city"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) saveM[f] = AreaCode[c] } } else if f == "district_code" { if tmp["area"] != nil && tmp["city"] != nil && tmp["district"] != nil { c := util.ObjToString(tmp["area"]) + "," + util.ObjToString(tmp["city"]) + "," + util.ObjToString(tmp["district"]) saveM[f] = AreaCode[c] } } else if f == "updatetime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } else if f == "buyerclass_code" { if obj := util.ObjToString(tmp["buyerclass"]); obj != "" { saveM[f] = BuyerCode[obj] } } else if f == "firsttime" || f == "zbtime" || f == "jgtime" || f == "lasttime" || f == "bidopentime" || f == "createtime" { if tmp[f] != nil && util.Int64All(tmp[f]) > 0 { t := util.Int64All(tmp[f]) saveM[f] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } } else if f == "bidstatus" { if b := util.ObjToString(tmp[f]); b != "" { tmp[f] = BidStatus[b] } } else if f == "bidtype" { if b := util.ObjToString(tmp[f]); b != "" { tmp[f] = BidType[b] } } else if f == "buyer_id" { if b := util.ObjToString(tmp["buyer"]); b != "" { saveM["buyer"] = b if code := redis.GetStr("qyxy_id", b); code != "" { saveM[f] = code } } } else if f == "agency_id" { if b := util.ObjToString(tmp["agency"]); b != "" { saveM["agency"] = b if code := redis.GetStr("qyxy_id", b); code != "" { saveM[f] = code } } } else if f == "winner_id" { if s != "" { saveM["winner"] = s if code := redis.GetStr("qyxy_id", s); code != "" { saveM[f] = code } } } else { if tmp[f] != nil { if ProBusVMap[f] != nil { saveM[f], _ = verifyF(f, tmp[f], ProBusVMap[f]) } else { saveM[f] = tmp[f] } } } } saveProbPool <- saveM } } // @Description 项目信息标签 // @Author J 2022/9/30 13:54 func taskProTag(tmp map[string]interface{}) { id := mongodb.BsonIdToSId(tmp["_id"]) if topArr, ok := tmp["topscopeclass"].([]interface{}); ok { for _, i2 := range topArr { tclass := regLetter.ReplaceAllString(util.ObjToString(i2), "") // 去除字母 code := TopScopeCode[tclass] saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "1", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} } } if subArr, ok := tmp["subscopeclass"].([]interface{}); ok { for _, i2 := range subArr { sc := strings.Split(util.ObjToString(i2), "_") code := SubScopeCode[sc[1]] saveProTagPool <- map[string]interface{}{"projectid": id, "labelcode": "2", "labelvalues": code, "labelweight": 1, "createtime": time.Now().Format(util.Date_Full_Layout)} } } } // @Description 关系表 // @Author J 2022/9/30 13:56 func taskRelation(tmp map[string]interface{}) { pid := mongodb.BsonIdToSId(tmp["_id"]) if tmp["ids"] == nil { log.Info("taskRelation ids err", zap.Any("id", pid)) return } ids := util.ObjArrToStringArr(tmp["ids"].([]interface{})) lid := ids[len(ids)-1] //if b := util.ObjToString(tmp["buyer"]); b != "" { // saveM := make(map[string]interface{}) // // saveM["projectid"] = pid // saveM["infoid"] = lid // saveM["identity_type"] = 1 // saveM["createtime"] = time.Now().Format(util.Date_Full_Layout) // if code := redis.GetStr("qyxy_id", b); code != "" { // saveM["name_id"] = code // if util.ObjToString(tmp["buyertel"]) != "" { // q := make(map[string]interface{}) // q["name_id"] = code // q["identity_type"] = 1 // q["contact_tel"] = util.ObjToString(tmp["buyertel"]) // if util.ObjToString(tmp["buyerperson"]) != "" { // q["contact_name"] = util.ObjToString(tmp["buyerperson"]) // } // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") // if cinfo != nil && len(*cinfo) > 0 { // saveM["contact_id"] = (*cinfo)["id"] // saveRelationPool <- saveM // } // } // } //} //if a := util.ObjToString(tmp["agency"]); a != "" { // saveM := make(map[string]interface{}) // saveM["projectid"] = pid // saveM["infoid"] = lid // saveM["identity_type"] = 4 // saveM["createtime"] = time.Now().Format(util.Date_Full_Layout) // if code := redis.GetStr("qyxy_id", a); code != "" { // saveM["name_id"] = code // if util.ObjToString(tmp["agencytel"]) != "" { // q := make(map[string]interface{}) // q["name_id"] = code // q["identity_type"] = 4 // 100 // q["contact_tel"] = util.ObjToString(tmp["agencytel"]) // if util.ObjToString(tmp["agencyperson"]) != "" { // q["contact_name"] = util.ObjToString(tmp["agencyperson"]) // } // cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") // if cinfo != nil && len(*cinfo) > 0 { // saveM["contact_id"] = (*cinfo)["id"] // saveRelationPool <- saveM // } // } // } //} for _, item := range tmp["list"].([]interface{}) { item1 := item.(map[string]interface{}) sw := util.ObjToString(item1["s_winner"]) if !strings.Contains(sw, ",") { if code := redis.GetStr("qyxy_id", sw); code != "" { saveM := make(map[string]interface{}) saveM["projectid"] = pid saveM["infoid"] = lid saveM["identity_type"] = 2 saveM["createtime"] = time.Now().Format(util.Date_Full_Layout) saveM["name_id"] = code if util.ObjToString(item1["winnertel"]) != "" { q := make(map[string]interface{}) q["name_id"] = code q["identity_type"] = 2 // 010 q["contact_tel"] = util.ObjToString(item1["winnertel"]) if util.ObjToString(item1["winnerperson"]) != "" { q["contact_name"] = util.ObjToString(item1["winnerperson"]) } cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { saveM["contact_id"] = (*cinfo)["id"] saveRelationPool <- saveM } } } } } } func taskRelation2(tmp map[string]interface{}) { pid := mongodb.BsonIdToSId(tmp["_id"]) if tmp["ids"] == nil { log.Info("taskRelation ids err", zap.Any("id", pid)) return } info := MysqlTool.Find("dws_f_bpmc_relation", bson.M{"projectid": pid}, "", "", -1, -1) if len(*info) > 0 { } else { ids := util.ObjArrToStringArr(tmp["ids"].([]interface{})) lid := ids[len(ids)-1] if b := util.ObjToString(tmp["buyer"]); b != "" { saveM := make(map[string]interface{}) for _, f := range RelationField { if f == "projectid" { saveM[f] = pid } else if f == "infoid" { saveM[f] = lid } else if f == "name_id" { if code := redis.GetStr("qyxy_id", b); code != "" { saveM[f] = code if util.ObjToString(tmp["buyertel"]) != "" { q := make(map[string]interface{}) q["name_id"] = code q["identity_type"] = 1 q["contact_tel"] = util.ObjToString(tmp["buyertel"]) if util.ObjToString(tmp["buyerperson"]) != "" { q["contact_name"] = util.ObjToString(tmp["buyerperson"]) } cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { saveM["contact_id"] = (*cinfo)["id"] } } } } else if f == "identity_type" { saveM[f] = 1 // 001 } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } } saveRelationPool <- saveM } if a := util.ObjToString(tmp["agency"]); a != "" { saveM := make(map[string]interface{}) for _, f := range RelationField { if f == "projectid" { saveM[f] = pid } else if f == "infoid" { saveM[f] = lid } else if f == "name_id" { if code := redis.GetStr("qyxy_id", a); code != "" { saveM[f] = code if util.ObjToString(tmp["buyertel"]) != "" { q := make(map[string]interface{}) q["name_id"] = code q["identity_type"] = 4 q["contact_tel"] = util.ObjToString(tmp["agencytel"]) if util.ObjToString(tmp["agencyperson"]) != "" { q["contact_name"] = util.ObjToString(tmp["agencyperson"]) } cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { saveM["contact_id"] = (*cinfo)["id"] } } } } else if f == "identity_type" { saveM[f] = 4 // 100 } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } } saveRelationPool <- saveM } warr := strings.Split(util.ObjToString(tmp["s_winner"]), ",") if BinarySearch(warr, util.ObjToString(tmp["winner"])) == -1 { warr = append(warr, util.ObjToString(tmp["winner"])) } for _, ws := range warr { saveM := make(map[string]interface{}) for _, f := range RelationField { if f == "projectid" { saveM[f] = pid } else if f == "infoid" { saveM[f] = lid } else if f == "name_id" { if code := redis.GetStr("qyxy_id", ws); code != "" { saveM[f] = code if util.ObjToString(tmp["buyertel"]) != "" { q := make(map[string]interface{}) q["name_id"] = code q["identity_type"] = 2 q["contact_tel"] = util.ObjToString(tmp["winnertel"]) if util.ObjToString(tmp["winnerperson"]) != "" { q["contact_name"] = util.ObjToString(tmp["winnerperson"]) } cinfo := MysqlTool.FindOne("dws_f_ent_contact", q, "", "") if cinfo != nil && len(*cinfo) > 0 { saveM["contact_id"] = (*cinfo)["id"] } } } } else if f == "identity_type" { saveM[f] = 2 // 010 } else if f == "createtime" { saveM[f] = time.Now().Format(util.Date_Full_Layout) } } saveRelationPool <- saveM } } }