package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"data_project_information/config"

	"github.com/gogf/gf/v2/util/gconv"
	es "github.com/olivere/elastic/v7"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

//var piciLock sync.Mutex

// scrollAndDispatch scrolls over every document in index that matches query
// and hands each page to taskInfoA. It waits for all workers registered on wg
// and then clears the scroll context.
//
// A single ScrollService is reused for all pages: olivere/elastic keeps the
// scroll id inside the service and reports the end of the result set as
// io.EOF, which is treated here as normal termination rather than an error.
// The returned error is the first non-EOF failure, or nil.
func scrollAndDispatch(client *es.Client, index string, query es.Query, fsc *es.FetchSourceContext, wg *sync.WaitGroup, countDocs *int) error {
	ctx := context.Background()
	svc := client.Scroll().Index(index).Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000)

	var (
		scrollID  string
		scrollErr error
	)
	for {
		res, err := svc.Do(ctx)
		if res != nil && res.ScrollId != "" {
			// Remember the latest id so the context can be released below,
			// even when the very first page is empty.
			scrollID = res.ScrollId
		}
		if err != nil {
			if !errors.Is(err, io.EOF) {
				scrollErr = err
			}
			break
		}
		taskInfoA(res, wg, countDocs)
	}
	wg.Wait()
	if scrollID != "" {
		// Best effort: an uncleared scroll context simply expires server-side.
		_, _ = client.ClearScroll().ScrollId(scrollID).Do(ctx)
	}
	return scrollErr
}

// task re-processes documents of the "bidding" index that carry a
// property_form field, one "comeintime" window at a time.
func task() {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)
	wg := &sync.WaitGroup{}
	// Upper bounds (unix seconds) of the consecutive comeintime windows.
	tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
	// Fields to fetch.
	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form")
	for i, tm := range tf {
		query := es.NewBoolQuery().
			//Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
			Must(es.NewExistsQuery("property_form"))
		// Window i covers (previous bound, tm]; the first window has no lower bound.
		// NOTE(review): the last window previously used Lte(tf[3]) without a
		// lower bound, which re-scanned everything handled by the earlier
		// windows. It now follows the same pattern as the others — confirm
		// that documents newer than tf[4] are intentionally excluded.
		rng := es.NewRangeQuery("comeintime").Lte(tm)
		if i > 0 {
			rng = rng.Gte(tf[i-1])
		}
		query.Must(rng)
		util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))

		countDocs := 0
		if err := scrollAndDispatch(client, "bidding", query, fsc, wg, &countDocs); err != nil {
			util.Debug("Es Search Data Error:", err.Error())
		}
		util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
	}
}

// taskOld re-processes every "bidding_ai" document that has a
// tag_topinformation or tag_topinformation_ai field and whose pici is at most
// 1732244400.
func taskOld() {
	query := es.NewBoolQuery().
		Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
		MinimumNumberShouldMatch(1)
	query.Must(es.NewRangeQuery("pici").Lte(1732244400))
	util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding_ai", query)))

	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)
	wg := &sync.WaitGroup{}
	// Fields to fetch.
	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici")

	countDocs := 0
	if err := scrollAndDispatch(client, "bidding_ai", query, fsc, wg, &countDocs); err != nil {
		util.Debug("Es Search Data Error:", err.Error())
	}
	util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
}

// taskAdd is the incremental variant of taskOld: it processes "bidding_ai"
// documents whose pici lies between two hours and one hour before now.
func taskAdd() {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)
	wg := &sync.WaitGroup{}
	// Fields to fetch.
	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set", "tag_topinformation_ai", "tag_subinformation_ai", "property_form", "pici")
	query := es.NewBoolQuery().
		// Should(es.NewExistsQuery("tag_topinformation")).
		// Should(es.NewExistsQuery("tag_topinformation_ai")).
		Should(es.NewExistsQuery("tag_topinformation"), es.NewExistsQuery("tag_topinformation_ai")).
		MinimumNumberShouldMatch(1)

	pici := time.Now().Unix()
	start := pici - 3600*2
	end := pici - 3600
	util.Debug(start, end)
	//if config.Conf.Serve.Pici > 0 {
	//query.Must(es.NewRangeQuery("pici").Gte(config.Conf.Serve.Pici).Lte(config.Conf.Serve.Pici))
	//}
	query.Must(es.NewRangeQuery("pici").Gte(start).Lte(end))
	//query.Must(es.NewTermQuery("_id", "673fc96ab25c3e1deb743862"))
	util.Debug(fmt.Sprintf("数据量为:%d", Es.Count("bidding_ai", query)))

	countDocs := 0
	if err := scrollAndDispatch(client, "bidding_ai", query, fsc, wg, &countDocs); err != nil {
		util.Debug("Es Search Data Error:", err.Error())
	}
	util.Debug(fmt.Sprintf("处理结束,处理文档%d条", countDocs))
	util.Debug(config.Conf.Serve.Pici)
}

// taskInfoA processes one page of search hits. For every hit it builds the set
// of tag fields to propagate, queues a Mongo update on updatePool and, when a
// project id can be resolved through getPidByIds, an ES update on updateEsPool.
//
// At most five hits of the page are processed concurrently. Every started
// goroutine is registered on wg; the caller is responsible for wg.Wait().
// countDocs is incremented once per hit from the calling goroutine only.
func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
	if searchResult == nil || searchResult.Hits == nil {
		return
	}
	// Fields copied verbatim from the hit when present.
	// tag_topinformation_ai is deliberately not copied on its own; it is
	// merged into tag_topinformation below.
	copied := []string{"tag_topinformation", "tag_subinformation", "tag_subinformation_ai", "tag_set", "property_form"}

	chd := make(chan bool, 5)
	for _, hit := range searchResult.Hits.Hits {
		wg.Add(1)
		chd <- true
		go func(tmpHit *es.SearchHit) {
			defer func() {
				<-chd
				wg.Done()
			}()
			tmp := make(map[string]interface{})
			if json.Unmarshal(tmpHit.Source, &tmp) != nil {
				return
			}
			//piciLock.Lock()
			//if util.Int64All(tmp["pici"]) > config.Conf.Serve.Pici {
			//	config.Conf.Serve.Pici = util.Int64All(tmp["pici"])
			//}
			//piciLock.Unlock()
			update := make(map[string]interface{})
			for _, key := range copied {
				if tmp[key] != nil {
					update[key] = tmp[key]
				}
			}
			// Merge the AI-assigned and regular top-level tags, de-duplicated,
			// AI tags first.
			merged := uniqueMerge(gconv.Strings(tmp["tag_topinformation_ai"]), gconv.Strings(tmp["tag_topinformation"]))
			if len(merged) > 0 {
				update["tag_topinformation"] = merged
			}
			// Legacy handling for hits without tags (disabled):
			//if tmp["tag_topinformation"] == nil {
			//	updatePool <- []map[string]interface{}{
			//		{"ids": tmpHit.Id},
			//		{"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id},
			//			"$unset": bson.M{"tag_topinformation": 1, "tag_subinformation": 1}},
			//	}
			//	// The project dropped tag_topinformation/tag_subinformation,
			//	// so the index was cleared as well.
			//	pid := getPidByIds(tmpHit.Id)
			//	if pid != "" {
			//		update["tag_topinformation"] = []string{}
			//		update["tag_subinformation"] = []string{}
			//		updateEsPool <- []map[string]interface{}{
			//			{"_id": pid},
			//			update,
			//		}
			//	}
			//} else { ... }
			if len(update) == 0 {
				return
			}
			//fmt.Println(tmpHit.Id, update)
			updatePool <- []map[string]interface{}{
				{"ids": tmpHit.Id},
				{"$set": update, "$addToSet": bson.M{"tag_information_ids": tmpHit.Id}},
			}
			// Update ES.
			pid := getPidByIds(tmpHit.Id)
			if pid == "" {
				return
			}
			// Work on a copy: update has already been handed to updatePool, so
			// adding "id" to it here would race with its consumer and could
			// leak the field into the Mongo $set.
			esDoc := make(map[string]interface{}, len(update)+1)
			for k, v := range update {
				esDoc[k] = v
			}
			esDoc["id"] = pid
			updateEsPool <- []map[string]interface{}{
				{"_id": pid},
				esDoc,
			}
		}(hit)
		*countDocs += 1
		if *countDocs%10000 == 0 {
			util.Debug("Current:", *countDocs)
		}
	}
}

// taskT iterates over the configured Mongo collection (firsttime >=
// 1711360551) and passes every document that has both tag_topinformation and
// tag_subinformation to taskinfotA.
func taskT() {
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	q := bson.M{"firsttime": bson.M{"$gte": 1711360551}}
	query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%20000 == 0 {
			log.Info(fmt.Sprintf("current --- %d", count))
		}
		ch <- true
		wg.Add(1)
		// The "go" keyword is intentionally left out: documents are handled
		// one at a time. ch and wg are kept so concurrency can be re-enabled
		// by putting "go" back in front of the call.
		func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
				taskinfotA(tmp)
			}
		}(tmp)
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Info(fmt.Sprintf("over --- %d", count))
}

// YeTai maps a property_form code to its display label.
var YeTai = map[string]string{
	"21": "住宅",
	"22": "政府办公楼",
	"23": "学校",
	"24": "医院",
	"25": "产业园区",
	"26": "旅游景区",
	"27": "交通运输",
	"28": "商务办公楼",
	"29": "酒店",
}

// ZhouQi maps a contract period code to its display label.
var ZhouQi = map[int]string{
	11: "1年以下",
	12: "1年",
	13: "2年",
	14: "3年",
	15: "5年",
	16: "其他",
}

// taskinfotA converts one project document into a row for savePool and writes
// its industry, property-form and winner tags to the property_project_tag
// table through MysqlTool.
//
// tmp is the raw project document; its "ids" and "tag_subinformation" values
// are expected to be []interface{} of strings. Missing or differently typed
// values are treated as empty instead of panicking.
func taskinfotA(tmp map[string]interface{}) {
	const (
		year2023      = 1672502400       // 2023-01-01 00:00:00 +08:00, unix seconds
		minCycleSecs  = 90 * 24 * 60 * 60 // contract spans shorter than this are ignored
		oneYearPeriod = 12                // ZhouQi code for "1年"
	)
	// fmtTime renders a unix timestamp with the project-wide full layout.
	fmtTime := func(ts int64) interface{} {
		return util.FormatDateByInt64(&ts, util.Date_Full_Layout)
	}

	save := make(map[string]interface{})
	pid := mongodb.BsonIdToSId(tmp["_id"])
	rawIDs, _ := tmp["ids"].([]interface{})
	ids := util.ObjArrToStringArr(rawIDs)
	jgtime := util.Int64All(tmp["jgtime"])
	lasttime := util.Int64All(tmp["lasttime"])

	save["source_id"] = pid
	save["project_name"] = util.ObjToString(tmp["projectname"])
	save["area"] = tmp["area"]
	save["city"] = tmp["city"]
	save["district"] = tmp["district"]
	save["budget"] = tmp["budget"]
	save["bidamount"] = tmp["bidamount"]
	if len(ids) > 0 {
		save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
	}
	save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))

	//bt := util.ObjToString(tmp["bidtype"])
	// bidstatus: 1 = awarded, 0 = failed/cancelled, 2 = anything else.
	status := 2
	switch util.ObjToString(tmp["bidstatus"]) {
	case "中标", "成交", "合同":
		status = 1
	case "废标", "流标":
		status = 0
	}
	save["bidstatus"] = status

	if ft := util.Int64All(tmp["firsttime"]); ft > 0 {
		save["publish_time"] = fmtTime(ft)
	}
	if status == 1 && jgtime > 0 {
		save["result_time"] = fmtTime(jgtime)
	}
	// Supplement the result time (disabled):
	//if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
	//	t := util.Int64All(tmp["lasttime"])
	//	if t > 0 {
	//		save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
	//	}
	//}
	now := time.Now().Format(util.Date_Full_Layout)
	save["create_time"] = now
	save["update_time"] = now

	// Contract dates come from the first announcement that has an
	// "information" record; datajson_expiredate wins over endtime.
	for _, id := range ids {
		info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", "")
		if info == nil || len(*info) == 0 {
			continue
		}
		row := *info
		if t := util.Int64All(row["starttime"]); t > 0 {
			save["contract_cycle_start_time"] = fmtTime(t)
		}
		if t := util.Int64All(row["datajson_expiredate"]); t > 0 {
			save["contract_cycle_end_time"] = fmtTime(t)
		}
		if t := util.Int64All(row["endtime"]); t > 0 && save["contract_cycle_end_time"] == nil {
			save["contract_cycle_end_time"] = fmtTime(t)
		}
		break
	}

	// Industry labels.
	rawSubs, _ := tmp["tag_subinformation"].([]interface{})
	subs := util.ObjArrToStringArr(rawSubs)
	for _, sub := range subs {
		MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
	}
	save["industry_label"] = strings.Join(subs, ",")

	// Property form and contract cycle, both stored under tag_set.wuye.
	if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
		if wuye, ok := tagset["wuye"].(map[string]interface{}); ok {
			if yt := util.ObjToString(wuye["property_form"]); yt != "" {
				var forms []string
				for _, code := range strings.Split(yt, ",") {
					if label := YeTai[code]; label != "" {
						forms = append(forms, label)
						MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": label, "stype": 2})
					}
				}
				save["activities_label"] = strings.Join(forms, ",")
			}
			if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
				save["contract_cycle"] = ZhouQi[p]
				if status == 1 {
					// Prefer the result time as the base; fall back to lasttime.
					base := jgtime
					if base <= 0 {
						base = lasttime
					}
					if next := getDate(p, base); next > 0 {
						save["next_bid_time"] = fmtTime(next)
					}
				}
			}
		}
	}

	// Awarded projects with a result time after 2023-01-01 that still have no
	// next bid time: use the contract span when both contract dates are known
	// and the span is at least 90 days, otherwise default to one year.
	//
	// The span is only trusted when both dates parse; computing it against a
	// zero time would yield a date thousands of years in the future. The
	// former trailing "default 1 year" check read save["jgtime"], which is
	// never set; it is folded into the default branch here.
	if status == 1 && save["next_bid_time"] == nil && jgtime > year2023 {
		st, errStart := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
		et, errEnd := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
		span := et.Unix() - st.Unix()
		if errStart == nil && errEnd == nil && span >= minCycleSecs {
			save["next_bid_time"] = fmtTime(jgtime + span)
		} else if next := getDate(oneYearPeriod, jgtime); next > 0 {
			save["next_bid_time"] = fmtTime(next)
		}
	}

	// Winning bidders.
	winners := util.ObjToString(tmp["s_winner"])
	for _, w := range strings.Split(winners, ",") {
		if w != "" {
			MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
		}
	}
	save["winner"] = winners
	savePool <- save
}

// getStr maps a buyer class to one of five business labels, or "" when b is
// empty or matches no group.
//
// NOTE(review): membership is a substring test against the whole group
// string, so any fragment of a listed name (or of the "|" and parenthesis
// delimiters) also matches. Kept as-is because callers may rely on it —
// e.g. "审计税务" looks like two classes fused together; confirm before
// switching to exact matching.
func getStr(b string) string {
	if b == "" {
		return ""
	}
	const (
		government = "(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)"
		medical    = "(卫健委|医疗)"
		education  = "(教育|学校)"
		finance    = "(人行|金融业)"
		commercial = "(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)"
	)
	switch {
	case strings.Contains(government, b):
		return "政府机构"
	case strings.Contains(medical, b):
		return "医疗单位"
	case strings.Contains(education, b):
		return "教育单位"
	case strings.Contains(finance, b):
		return "金融企业"
	case strings.Contains(commercial, b):
		return "商业公司"
	}
	return ""
}

// getDate returns stime shifted forward by the contract period identified by
// the ZhouQi code p, counting a year as 365 days. It returns 0 for codes
// without a fixed length (11 "under one year", 16 "other", unknown codes).
func getDate(p int, stime int64) int64 {
	const year = 365 * 24 * 60 * 60
	switch p {
	case 12:
		return stime + year
	case 13:
		return stime + year*2
	case 14:
		return stime + year*3
	case 15:
		return stime + year*5
	}
	return 0
}

// getPidByIds looks up the project whose "ids" field contains the given
// announcement id and returns the project id, or "" when none is found.
func getPidByIds(ids string) string {
	data, ok := Mgo.FindOne(config.Conf.DB.Mongo.Coll, map[string]interface{}{"ids": ids})
	if ok && data != nil && len(*data) > 0 {
		return mongodb.BsonIdToSId((*data)["_id"])
	}
	return ""
}

// uniqueMerge concatenates arr1 and arr2, keeping only the first occurrence of
// each string and preserving order. The result is never nil.
func uniqueMerge(arr1, arr2 []string) []string {
	seen := make(map[string]struct{}, len(arr1)+len(arr2))
	result := make([]string, 0, len(arr1)+len(arr2))
	for _, arr := range [][]string{arr1, arr2} {
		for _, item := range arr {
			if _, dup := seen[item]; dup {
				continue
			}
			seen[item] = struct{}{}
			result = append(result, item)
		}
	}
	return result
}