package main import ( "encoding/json" "fmt" "github.com/go-ego/gse" "go.mongodb.org/mongo-driver/bson" "go.uber.org/zap" "io/ioutil" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis" "net/http" "net/url" "os" "proposed_project/config" "regexp" "strings" "sync" "time" "unicode/utf8" ) var ( seg gse.Segmenter stopWords []string regNum = regexp.MustCompile(`[\dA-Za-z]{6,30}`) regSymb = regexp.MustCompile("[•、,,.。??'\"“”‘’·~!@#¥$%…&*()()\\-—+=【】\\[\\]{}{}<>《》|\\/\\s]+") regDel = regexp.MustCompile("项目|工程|生产|中心") reg1 = regexp.MustCompile("分布式光伏发电|自然人|出让|国有建设用地使用权") field = []string{"projectname.pname"} sField = []string{"projectname", "bidstatus", "firsttime", "_id", "area", "city", "district", "ids"} esQ = `{"query": {"bool": {"must": [{"multi_match": {"query": "%s","type": "phrase","fields": ["projectname"]}}]}},"size": 100}` savePpPool = make(chan map[string]interface{}, 5000) savePpSp = make(chan bool, 3) ) func initSeg() { _ = seg.LoadDict("./t_1.txt") //_ = seg.LoadDict() seg.AddToken("渼陂", 3, "") seg.LoadStop("./stopwords.txt") //f, _ := os.Open("./stopwords.txt") //defer f.Close() //scanner := bufio.NewScanner(f) //for scanner.Scan() { // stopWords = append(stopWords, scanner.Text()) //} //sort.Strings(stopWords) } // @Description 关联 // @Author J 2023/4/25 10:26 func taskComb() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} f := map[string]interface{}{ "projectname": 1, "approvecode": 1, "approvenumber": 1, "approvestatus": 1, "area": 1, "city": 1, "district": 1, "ids": 1, } query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(nil).Select(f).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if filterMethod(tmp) { return } var mArr []map[string]interface{} var eArr []map[string]interface{} n1, n2 := 0, 0 // approvecode、approvenumber //q := Method2(util.ObjToString(tmp["approvecode"]), util.ObjToString(tmp["approvenumber"])) //if q != "" { // binfo := Es.Get("projectset", q) // if binfo != nil && len(*binfo) > 0 { // for _, m := range *binfo { // n1 = len(*binfo) // mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 1}) // } // } //} wds, q := Method1(util.ObjToString(tmp["projectname"])) if q != "" { binfo := Es.Get("projectset", q) if binfo != nil && len(*binfo) > 0 { n2 = len(*binfo) for _, m := range *binfo { if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b { continue } if util.ObjToString(m["bidstatus"]) == "拟建" { eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]}) continue } if judgeArea(tmp, m, wds) { eArr = append(eArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "bidstatus": m["bidstatus"]}) continue } size := len(m["ids"].([]interface{})) redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size)) redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{}))) mArr = append(mArr, map[string]interface{}{"pid": util.ObjToString(m["_id"]), "projectname": m["projectname"], "source": 2}) } } } //if mArr != nil && len(mArr) > 0 { save := make(map[string]interface{}) save["_id"] = tmp["_id"] save["ids"] = mArr save["wds"] = wds save["err"] = eArr save["projectname"] = tmp["projectname"] save["esearch"] = q save["size_1"] = n1 save["size_2"] = n2 save["createtime"] = time.Now().Unix() savePpPool <- save //} }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } func Method(pname string) string { pname = regNum.ReplaceAllString(pname, "") pname = regSymb.ReplaceAllString(pname, "") //wds := seg.CutStop(pname, true) util.Debug(pname) surl := config.Conf.DB.Es.Addr + "/projectset_v2/_analyze" URL, _ := url.Parse(surl) Q := URL.Query() Q.Add("pretty", "1") Q.Add("analyzer", "my_ngram_title") Q.Add("text", pname) URL.RawQuery = Q.Encode() resp, err := http.Get(URL.String()) if err != nil { log.Info("") } result, err := ioutil.ReadAll(resp.Body) if err != nil { log.Info("") } var resMap map[string]interface{} json.Unmarshal(result, &resMap) if resMap == nil || len(resMap["tokens"].([]interface{})) == 0 { log.Info("") } tokens := util.ObjArrToMapArr(resMap["tokens"].([]interface{})) var should []interface{} for _, t := range tokens { wd := util.ObjToString(t["token"]) if in(wd) { // 删除词 跳过 continue } mm := &ShouldObject{MultiMatch: &MultiMatch{ Query: wd, Type: "phrase", Fields: field, }} should = append(should, mm) } query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Must: should}}} jsr, err := json.Marshal(query) if err != nil { fmt.Printf("Err %v", err) os.Exit(1) } return string(jsr) } // @Description jieba分词查询 // @Desc projectname // @Desc minimum_should_match 当should分支总数小于等于指定的数量时,则必须匹配所有should分支,当should分支总数大于指定的数量时,则应用指定的说明符 // @Author J 2023/3/7 17:13 func Method1(pname string) ([]string, string) { if pname == "" { return nil, "" } var wds []string p1 := regSymb.ReplaceAllString(pname, "") p1 = regNum.ReplaceAllString(p1, "") if utf8.RuneCountInString(p1) < 12 { wds = append(wds, pname) } else { wds = seg.CutStop(pname, true) } if len(wds) > 0 { wds = combArr(wds) var should []interface{} for _, t := range wds { //if utf8.RuneCountInString(t) == 1 { // continue //} mm := &ShouldObject{MultiMatch: &MultiMatch{ Query: t, Type: "phrase", Fields: field, }} should = append(should, mm) } if should == nil || len(should) <= 0 { return nil, "" } query := &QueryObject{Query: &BoolObject{Bool: &MMSMObject{Should: should, MinSdMatch: config.Conf.DB.Es.MinSdMh}}, Source: sField, Size: 1000} jsr, err := json.Marshal(query) if err != nil { fmt.Printf("Err %v", err) os.Exit(1) } return wds, strings.Replace(string(jsr), "\\u003c", "<", -1) } else { return nil, "" } } // @Description // @Author J 2023/3/20 14:39 func Method2(acode, anumb string) string { if acode == "" && anumb == "" { return "" } var should []interface{} if acode != "" { should = append(should, &ShouldObject{MultiMatch: &MultiMatch{ Query: acode, Type: "phrase", Fields: []string{"detail"}, }}) } if anumb != "" { should = append(should, &ShouldObject{MultiMatch: &MultiMatch{ Query: anumb, Type: "phrase", Fields: []string{"detail"}, }}) } query := &QueryObject1{Query: &BoolObject1{Bool: &MMSObject{Should: should}}, Source: sField, Size: 500} jsr, err := json.Marshal(query) if err != nil { fmt.Printf("Err %v", err) os.Exit(1) } return string(jsr) } func judgeArea(tmp, btmp map[string]interface{}, wds []string) bool { if util.ObjToString(tmp["area"]) == "全国" || util.ObjToString(btmp["area"]) == "全国" { pname := util.ObjToString(btmp["projectname"]) for _, wd := range wds { if !strings.Contains(strings.ToLower(pname), strings.ToLower(wd)) { return true } } return false } if tmp["district"] != nil && btmp["district"] != nil { if util.ObjToString(tmp["district"]) == util.ObjToString(btmp["district"]) { return false } else { return true } } else if tmp["city"] != nil && btmp["city"] != nil { if util.ObjToString(tmp["city"]) == util.ObjToString(btmp["city"]) { return false } else { return true } } else if tmp["area"] != nil && btmp["area"] != nil { if util.ObjToString(tmp["area"]) == util.ObjToString(btmp["area"]) { return false } else { return true } } return false } var regs = []*regexp.Regexp{ regexp.MustCompile("[\\\\u4e00-\\\\u9fa5]{0,10}私宅"), regexp.MustCompile("自然人|分布式光伏发电"), } // @Description 过滤数据 // @Author J 2023/3/24 09:30 func filterMethod(tmp map[string]interface{}) bool { pname := regSymb.ReplaceAllString(util.ObjToString(tmp["projectname"]), "") pname = regNum.ReplaceAllString(pname, "") p1 := regDel.ReplaceAllString(pname, "") p1 = regSymb.ReplaceAllString(p1, "") if utf8.RuneCountInString(p1) <= 5 { return true } if len(reg1.FindAllString(pname, -1)) > 1 { return true } for _, reg := range regs { if reg.MatchString(util.ObjToString(tmp["projectname"])) { return true } } return false } func combArr(arr []string) []string { var nArr []string for i := 0; i < len(arr); i++ { if i == len(arr)-1 { if utf8.RuneCountInString(arr[i]) == 1 { if len(nArr) > 0 { nArr[len(nArr)-1] += arr[i] } } else { nArr = append(nArr, arr[i]) } } else { if utf8.RuneCountInString(arr[i]) == 1 { if i == 0 { nArr = append(nArr, arr[i]) } else { nArr[len(nArr)-1] += arr[i] //if utf8.RuneCountInString(arr[i+1]) == 1 { // if utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 { // nArr[len(nArr)-1] += arr[i] // } else { // nArr = append(nArr, arr[i]) // } //} else { // nArr[len(nArr)-1] += arr[i] //} } } else { if len(nArr) > 0 && utf8.RuneCountInString(nArr[len(nArr)-1]) == 1 { nArr[len(nArr)-1] += arr[i] } else { nArr = append(nArr, arr[i]) } } } } return nArr } func SavePpMethod() { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-savePpPool: arru[indexu] = v indexu++ if indexu == saveSize { savePpSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePpSp }() MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { savePpSp <- true go func(arru []map[string]interface{}) { defer func() { <-savePpSp }() MgoPro.SaveBulk(config.Conf.DB.MongoP.CombColl, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } } var StageCode []TagMatching func initStage() { info, _ := MgoBid.Find(config.Conf.Serve.TagRule, bson.M{"label_name": "project_stage"}, `{"_id": 1}`, nil, false, -1, -1) for _, m := range *info { tag := TagMatching{} tag.tagName = util.ObjToString(m["label_name"]) tag.tagCode = util.ObjToString(m["code"]) // 关键词 tag.matchField = []string{"title", "project"} if v := util.ObjToString(m["keyword"]); v != "" { tag.matchKey = util.ObjToString(m["keyword"]) tag.matchKeyReg = GetRegex(util.ObjToString(m["keyword"])) } // 附件词 if f := util.ObjToString(m["match_fjword"]); f != "" { tag.addField = strings.Split(f, ",") for _, s := range tag.addField { SelectF[s] = 1 } if v := util.ObjToString(m["fjword"]); v != "" { tag.addKey = util.ObjToString(m["fjword"]) tag.addKeyReg = GetRegex(util.ObjToString(m["fjword"])) } } // 排除词 if f := util.ObjToString(m["match_pcword"]); f != "" { tag.excludeField = strings.Split(f, ",") for _, s := range tag.excludeField { SelectF[s] = 1 } if v := util.ObjToString(m["pcword"]); v != "" { tag.excludeKey = util.ObjToString(m["pcword"]) tag.excludeKeyReg = GetRegex(util.ObjToString(m["pcword"])) } } // 清理词 if v := util.ObjToString(m["qlword"]); v != "" { tag.clearKey = strings.Split(util.ObjToString(m["qlword"]), ",") } StageCode = append(StageCode, tag) } log.Info("initStage", zap.Int("StageCode", len(StageCode))) } func taskD() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} //q := bson.M{"_id": mongodb.StringTOBsonId("60a2995b8a2adb30a57172ec")} query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.CombColl).Find(nil).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("current --- %d", count)) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() if ids, ok := tmp["ids"].([]interface{}); ok { id := mongodb.BsonIdToSId(tmp["_id"]) for _, p := range ids { p1 := p.(map[string]interface{}) info, _ := MgoPro.FindById(config.Conf.DB.MongoP.ProjectColl, util.ObjToString(p1["pid"]), nil) if list, ok1 := (*info)["list"].([]interface{}); ok1 { for _, l := range list { l1 := l.(map[string]interface{}) m := make(map[string]interface{}) m["project_stage_code"] = tagFunc(l1) m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) m["title"] = util.ObjToString(l1["title"]) if t := util.Int64All(l1["publishtime"]); t > 0 { m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } m["infoid"] = util.ObjToString(l1["infoid"]) m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"]))) m["createtime"] = time.Now().Format(util.Date_Full_Layout) //MgoPro.Save("projectset_comb_temp1", m) MysqlTool.Insert("dwd_f_nzj_follw_record", m) } } if buyer := util.ObjToString((*info)["buyer"]); buyer != "" { s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer}) if s <= 0 { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = id saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = buyer if eid := redis.GetStr("ent_id", buyer); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 2 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] saveEntPool1 <- saveEnt } } if winner := util.ObjToString((*info)["buyer"]); winner != "" { for _, w := range strings.Split(winner, ",") { s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w}) if s <= 0 { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = id saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = w if eid := redis.GetStr("ent_id", w); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 3 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] saveEntPool1 <- saveEnt } } } } size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}) info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc") MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)}) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("over --- %d", count)) } // @Description 施工准备(06)、施工(07)、设计(05) // @Author J 2023/4/21 14:45 func tagFunc(info map[string]interface{}) string { tag := taskFuc1(info) if tag["project_stage"] != "" { return util.ObjToString(tag["project_stage"]) } if util.ObjToString(info["toptype"]) == "招标" || util.ObjToString(info["toptype"]) == "预告" { return "06" } return "00" } // @Description 在建项目增量 // @Author J 2023/4/24 13:58 func taskAA() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}} query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProjectColl).Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("project current --- %d", count)) } if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici { config.Conf.Serve.Pici = pc } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() bUpdate := false id := mongodb.BsonIdToSId(tmp["_id"]) if str := redis.GetStr(config.Conf.DB.Redis.Project, id); str != "" { strs := strings.Split(str, "-") size := len(tmp["list"].([]interface{})) if size != util.IntAll(strs[1]) { list := tmp["list"].([]interface{}) for k := range list { info1 := list[size-k-1].(map[string]interface{}) //倒序 infoid := util.ObjToString(info1["infoid"]) binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "") if binfo != nil && len(*binfo) > 0 { break } else { m := make(map[string]interface{}) m["project_stage_code"] = tagFunc(info1) m["proposed_id"] = strs[0] m["title"] = util.ObjToString(info1["title"]) if t := util.Int64All(info1["publishtime"]); t > 0 { m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } m["infoid"] = util.ObjToString(info1["infoid"]) m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"]))) m["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_follw_record", m) bUpdate = true } } redis.PutCKV(config.Conf.DB.Redis.Project, id, fmt.Sprintf("%s-%d", strs[0], size)) } } else { // 新项目 // 新项目是否都有必要进行关联(拟建项目) bUpdate = Method3(tmp) } // 更新拟在建基本信息表 if bUpdate { size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}) info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc") if info != nil && len(*info) > 0 { MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)}) } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("project over --- %d, pici ---%d", count, config.Conf.Serve.Pici)) } // @Description 拟建项目增量 // @Author J 2023/4/24 13:59 func taskBB() { sess := MgoPro.GetMgoConn() defer MgoPro.DestoryMongoConn(sess) ch := make(chan bool, config.Conf.Serve.Thread) wg := &sync.WaitGroup{} q := bson.M{"pici": bson.M{"$gte": config.Conf.Serve.Pici}} query := sess.DB(config.Conf.DB.MongoP.Dbname).C(config.Conf.DB.MongoP.ProposedColl).Find(q).Iter() count := 0 for tmp := make(map[string]interface{}); query.Next(tmp); count++ { if count%2000 == 0 { log.Info(fmt.Sprintf("proposed current --- %d", count)) } if pc := util.Int64All(tmp["pici"]); pc > config.Conf.Serve.Pici { config.Conf.Serve.Pici = pc } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() bUpdate := false id := mongodb.BsonIdToSId(tmp["_id"]) if num := redis.GetInt(config.Conf.DB.Redis.Proposed, id); num > 0 { size := len(tmp["list"].([]interface{})) list := tmp["list"].([]interface{}) if num != size { for k := range list { info1 := list[size-k-1].(map[string]interface{}) infoid := util.ObjToString(info1["infoid"]) binfo := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"infoid": infoid}, "", "") if binfo != nil && len(*binfo) > 0 { break } else { m := make(map[string]interface{}) m["project_stage_code"] = tagFunc(info1) m["proposed_id"] = id m["title"] = util.ObjToString(info1["title"]) if t := util.Int64All(info1["publishtime"]); t > 0 { m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } m["infoid"] = util.ObjToString(info1["infoid"]) m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(info1["infoid"]))) m["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_follw_record", m) bUpdate = true } } redis.PutCKV(config.Conf.DB.Redis.Proposed, id, size) } } else { // 新项目 // 1、关联 wds, q := Method1(util.ObjToString(tmp["projectname"])) if q != "" { binfo := Es.Get("projectset", q) if binfo != nil && len(*binfo) > 0 { for _, m := range *binfo { if b, _ := redis.Exists(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"])); b { continue } if util.ObjToString(m["bidstatus"]) == "拟建" { continue } if judgeArea(tmp, m, wds) { continue } size := len(m["ids"].([]interface{})) redis.PutCKV(config.Conf.DB.Redis.Project, util.ObjToString(m["_id"]), fmt.Sprintf("%s-%d", mongodb.BsonIdToSId(tmp["_id"]), size)) redis.PutCKV(config.Conf.DB.Redis.Proposed, mongodb.BsonIdToSId(tmp["_id"]), len(tmp["ids"].([]interface{}))) bUpdate = true // 2、保存信息到tidb if list, ok1 := m["list"].([]interface{}); ok1 { for _, l := range list { l1 := l.(map[string]interface{}) m := make(map[string]interface{}) m["project_stage_code"] = tagFunc(l1) m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) m["title"] = util.ObjToString(l1["title"]) if t := util.Int64All(l1["publishtime"]); t > 0 { m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } m["infoid"] = util.ObjToString(l1["infoid"]) m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"]))) m["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_follw_record", m) } } if buyer := util.ObjToString(m["buyer"]); buyer != "" { s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": buyer}) if s <= 0 { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = id saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = buyer if eid := redis.GetStr("ent_id", buyer); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 2 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] saveEntPool1 <- saveEnt } } if winner := util.ObjToString(m["buyer"]); winner != "" { for _, w := range strings.Split(winner, ",") { s := MysqlTool.Count("dwd_f_nzj_ent", bson.M{"proposed_id": id, "name": w}) if s <= 0 { saveEnt := make(map[string]interface{}) saveEnt["proposed_id"] = id saveEnt["createtime"] = time.Now().Format(util.Date_Full_Layout) saveEnt["name"] = w if eid := redis.GetStr("ent_id", w); eid != "" { arr := strings.Split(eid, "_") saveEnt["name_id"] = arr[0] if len(arr) == 2 { saveEnt["area_code"] = arr[1] } else if len(arr) == 3 { saveEnt["city_code"] = arr[2] } info := MysqlTool1.Find("dws_f_ent_baseinfo", bson.M{"name_id": arr[0]}, "address", "", -1, -1) if info != nil && len(*info) > 0 { saveEnt["address"] = (*info)[0]["address"] } } saveEnt["identity_type"] = 3 saveEnt["area_code"] = AreaCode[util.ObjToString(tmp["area"])] saveEntPool1 <- saveEnt } } } } } } } // 更新拟在建基本信息表 if bUpdate { size := MysqlTool.Count("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}) info := MysqlTool.FindOne("dwd_f_nzj_follw_record", bson.M{"proposed_id": id}, "project_stage_code", "publishtime desc") MysqlTool.Update("dwd_f_nzj_baseinfo", bson.M{"proposed_id": id}, bson.M{"follow_num": size, "project_stage_code": (*info)["project_stage_code"], "updatetime": time.Now().Format(util.Date_Full_Layout)}) } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Info(fmt.Sprintf("proposed over --- %d, pici ---%d", count, config.Conf.Serve.Pici)) } func Method3(tmp map[string]interface{}) bool { pname := util.ObjToString(tmp["projectname"]) pname = strings.ReplaceAll(pname, "\"", "'") binfo := Es.Get("proposed", fmt.Sprintf(esQ, pname)) if binfo != nil && len(*binfo) > 0 { if list, ok1 := (*binfo)[0]["list"].([]interface{}); ok1 { for _, l := range list { l1 := l.(map[string]interface{}) m := make(map[string]interface{}) m["project_stage_code"] = tagFunc(l1) m["proposed_id"] = mongodb.BsonIdToSId(tmp["_id"]) m["title"] = util.ObjToString(l1["title"]) if t := util.Int64All(l1["publishtime"]); t > 0 { m["publishtime"] = util.FormatDateByInt64(&t, util.Date_Full_Layout) } m["infoid"] = util.ObjToString(l1["infoid"]) m["jybxhref"] = fmt.Sprintf(config.Conf.Serve.JyHref, util.CommonEncodeArticle("content", util.ObjToString(l1["infoid"]))) m["createtime"] = time.Now().Format(util.Date_Full_Layout) MysqlTool.Insert("dwd_f_nzj_follw_record", m) } redis.PutCKV(config.Conf.DB.Redis.Project, mongodb.BsonIdToSId(tmp["_id"]), fmt.Sprintf("%s-%d", util.ObjToString((*binfo)[0]["_id"]), len(list))) return true } } return false } var saveEntPool1 = make(chan map[string]interface{}, 5000) var saveEntSp1 = make(chan bool, 1) func SaveEntFunc1(table string, arr []string) { arru := make([]map[string]interface{}, saveSize) indexu := 0 for { select { case v := <-saveEntPool1: arru[indexu] = v indexu++ if indexu == saveSize { saveEntSp1 <- true go func(arru []map[string]interface{}) { defer func() { <-saveEntSp1 }() MysqlTool.InsertBulk(table, arr, arru...) }(arru) arru = make([]map[string]interface{}, saveSize) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { saveEntSp1 <- true go func(arru []map[string]interface{}) { defer func() { <-saveEntSp1 }() MysqlTool.InsertBulk(table, arr, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, saveSize) indexu = 0 } } } }