package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"data_project_information/config"

	es "github.com/olivere/elastic/v7"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// task scans the "bidding" index for documents whose tag_topinformation
// matches "情报_物业", one comeintime window at a time, and hands every page of
// hits to taskInfoA.
//
// The windows are delimited by the timestamps in tf: the first window is
// everything up to tf[0], the middle windows are [tf[i-1], tf[i]], and the
// last window is everything from tf[len(tf)-2] onward.
func task() {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)

	wg := &sync.WaitGroup{}
	tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
	// Only these fields are fetched from each hit.
	fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set")

	for i, tm := range tf {
		query := es.NewBoolQuery().
			Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
		switch {
		case i == 0:
			query.Must(es.NewRangeQuery("comeintime").Lte(tm))
		case i < len(tf)-1:
			query.Must(es.NewRangeQuery("comeintime").Gte(tf[i-1]).Lte(tm))
		default:
			// The last window previously used Lte(tf[3]), which re-scanned
			// every earlier window and never reached documents newer than
			// tf[3]. It now starts at the previous boundary.
			// NOTE(review): left open-ended on purpose; add .Lte(tm) if the
			// final timestamp is meant to be a hard upper bound.
			query.Must(es.NewRangeQuery("comeintime").Gte(tf[i-1]))
		}
		util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))

		countDocs := 0
		// First request: opens the scroll context and returns the first page.
		res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background())
		if err != nil {
			// io.EOF only means the window has no matching documents.
			if !errors.Is(err, io.EOF) {
				util.Debug(err)
			}
			continue
		}
		taskInfoA(res, wg, &countDocs)

		scrollID := res.ScrollId
		for {
			// Client.Scroll takes index names, so the keep-alive has to be
			// set through ScrollService.Scroll. "5m" is the library default,
			// which is what the follow-up requests were effectively using.
			searchResult, err := client.Scroll().ScrollId(scrollID).Scroll("5m").Size(2000).Do(context.TODO())
			if err != nil {
				// io.EOF is the normal "no more pages" signal.
				if !errors.Is(err, io.EOF) {
					util.Debug("Es Search Data Error:", err.Error())
				}
				break
			}
			taskInfoA(searchResult, wg, &countDocs)
			scrollID = searchResult.ScrollId
		}
		wg.Wait()
		util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
		// Best-effort cleanup: a failure here is ignored because the scroll
		// context expires by itself once its keep-alive elapses.
		_, _ = client.ClearScroll().ScrollId(scrollID).Do(context.Background())
	}
}

// taskInfoA turns every hit of searchResult into an entry on updatePool.
//
// Each hit is decoded in its own goroutine; at most 5 goroutines started by a
// single call run at the same time. Every goroutine is registered on wg, and
// the caller is responsible for waiting on it. countDocs is incremented once
// per hit from the calling goroutine only.
func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
	if searchResult == nil || searchResult.Hits == nil {
		return
	}
	chd := make(chan bool, 5)
	for _, hit := range searchResult.Hits.Hits {
		wg.Add(1)
		chd <- true
		go func(tmpHit *es.SearchHit) {
			defer func() {
				<-chd
				wg.Done()
			}()
			tmp := make(map[string]interface{})
			if err := json.Unmarshal(tmpHit.Source, &tmp); err != nil {
				util.Debug("Unmarshal hit source error:", tmpHit.Id, err.Error())
				return
			}
			update := make(map[string]interface{})
			update["tag_topinformation"] = tmp["tag_topinformation"]
			update["tag_subinformation"] = tmp["tag_subinformation"]
			if tmp["tag_set"] != nil {
				update["tag_set"] = tmp["tag_set"]
			}
			// Alternative update that also records the hit id:
			//updatePool <- []map[string]interface{}{
			//	{"ids": tmpHit.Id},
			//	{"$set": update, "$push": bson.M{"tag_information_ids": tmpHit.Id}},
			//}
			updatePool <- []map[string]interface{}{
				{"ids": tmpHit.Id},
				{"$set": update},
			}
		}(hit)
		*countDocs += 1
		if *countDocs%10000 == 0 {
			util.Debug("Current:", *countDocs)
		}
	}
}

// taskT iterates over the configured MongoDB collection (documents with
// firsttime >= 1577808000) and passes every document that carries both
// tag_topinformation and tag_subinformation to taskinfotA, using at most 5
// concurrent goroutines.
func taskT() {
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	q := bson.M{"firsttime": bson.M{"$gte": 1577808000}}
	query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()

	count := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
		if count%20000 == 0 {
			log.Info(fmt.Sprintf("current --- %d", count))
		}
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
				taskinfotA(tmp)
			}
		}(tmp)
		// The goroutine owns the map it was given; decode the next document
		// into a fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	log.Info(fmt.Sprintf("over --- %d", count))
}

// YeTai maps a property-form code (as found in tag_set.wuye.property_form)
// to its display label.
var YeTai = map[string]string{
	"21": "住宅",
	"22": "政府办公楼",
	"23": "学校",
	"24": "医院",
	"25": "产业园区",
	"26": "旅游景区",
	"27": "交通运输",
	"28": "商务办公楼",
	"29": "酒店",
}

// ZhouQi maps a contract-period code (as found in tag_set.wuye.period) to its
// display label.
var ZhouQi = map[int]string{
	11: "1年以下",
	12: "1年",
	13: "2年",
	14: "3年",
	15: "5年",
	16: "其他",
}

// taskinfotA converts one project document (tmp) into a row for savePool and
// inserts the project's tag rows into the "property_project_tag" table
// (stype 1: industry tag, 2: property form, 3: winner).
//
// The caller is expected to pass a document whose tag_subinformation is set.
// Missing or wrongly typed "ids"/"tag_subinformation" values are treated as
// empty instead of panicking inside the worker goroutine.
func taskinfotA(tmp map[string]interface{}) {
	save := make(map[string]interface{})
	pid := mongodb.BsonIdToSId(tmp["_id"])

	var ids []string
	if raw, ok := tmp["ids"].([]interface{}); ok {
		ids = util.ObjArrToStringArr(raw)
	}

	save["source_id"] = pid
	save["project_name"] = util.ObjToString(tmp["projectname"])
	save["area"] = tmp["area"]
	save["city"] = tmp["city"]
	save["district"] = tmp["district"]
	save["budget"] = tmp["budget"]
	save["bidamount"] = tmp["bidamount"]
	if len(ids) > 0 {
		save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
	}
	save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))

	// bidstatus: 1 = awarded, 0 = failed/cancelled, 2 = anything else.
	bidStatus := 2
	switch util.ObjToString(tmp["bidstatus"]) {
	case "中标", "成交", "合同":
		bidStatus = 1
	case "废标", "流标":
		bidStatus = 0
	}
	save["bidstatus"] = bidStatus

	if tmp["firsttime"] != nil && util.IntAll(tmp["firsttime"]) > 0 {
		t := util.Int64All(tmp["firsttime"])
		save["publish_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
	}
	if bidStatus == 1 && util.IntAll(tmp["jgtime"]) > 0 {
		t := util.Int64All(tmp["jgtime"])
		save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
	}
	// Disabled: fall back to lasttime for the result time.
	//if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
	//	t := util.Int64All(tmp["lasttime"])
	//	if t > 0 {
	//		save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
	//	}
	//}
	save["create_time"] = time.Now().Format(util.Date_Full_Layout)
	save["update_time"] = time.Now().Format(util.Date_Full_Layout)

	// Contract cycle start/end: taken from the first id that has a non-empty
	// "information" row. datajson_expiredate wins over endtime for the end.
	for _, id := range ids {
		info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", "")
		if info == nil || len(*info) == 0 {
			continue
		}
		row := *info
		if row["starttime"] != nil && util.IntAll(row["starttime"]) > 0 {
			t := util.Int64All(row["starttime"])
			save["contract_cycle_start_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
		}
		if row["datajson_expiredate"] != nil && util.IntAll(row["datajson_expiredate"]) > 0 {
			t := util.Int64All(row["datajson_expiredate"])
			save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
		}
		if row["endtime"] != nil && util.IntAll(row["endtime"]) > 0 && save["contract_cycle_end_time"] == nil {
			t := util.Int64All(row["endtime"])
			save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
		}
		break
	}

	// Industry labels.
	var subTags []string
	if raw, ok := tmp["tag_subinformation"].([]interface{}); ok {
		subTags = util.ObjArrToStringArr(raw)
	}
	for _, sub := range subTags {
		MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
	}
	save["industry_label"] = strings.Join(subTags, ",")

	// Property form and contract period, both read from tag_set.wuye.
	if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
		if wuye, ok := tagset["wuye"].(map[string]interface{}); ok {
			if yt := util.ObjToString(wuye["property_form"]); yt != "" {
				var forms []string
				for _, code := range strings.Split(yt, ",") {
					if label := YeTai[code]; label != "" {
						forms = append(forms, label)
						MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": label, "stype": 2})
					}
				}
				save["activities_label"] = strings.Join(forms, ",")
			}
			if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
				save["contract_cycle"] = ZhouQi[p]
				if bidStatus == 1 {
					if util.IntAll(tmp["jgtime"]) > 0 {
						if et := getDate(p, util.Int64All(tmp["jgtime"])); et > 0 {
							save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
						}
					} else if lt := util.Int64All(tmp["lasttime"]); lt > 0 {
						// lasttime must be positive: getDate(p, 0) would
						// otherwise yield a date shortly after the epoch.
						if et := getDate(p, lt); et > 0 {
							save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
						}
					}
				}
			}
		}
	}

	// Awarded projects with jgtime after 1672502400 (2023 onward per the
	// original note) that still have no next bid time: add the contract
	// cycle length to jgtime, or one year when the cycle is unknown or
	// shorter than 90 days.
	if bidStatus == 1 && save["next_bid_time"] == nil {
		if jg := util.Int64All(tmp["jgtime"]); jg > 1672502400 {
			// The cycle only counts when both ends parse; a missing start
			// would otherwise be measured from the zero time.Time.
			var cycle int64
			st, errStart := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
			et, errEnd := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
			if errStart == nil && errEnd == nil {
				cycle = et.Unix() - st.Unix()
			}
			next := getDate(12, jg)
			if cycle >= 90*24*60*60 {
				next = jg + cycle
			}
			save["next_bid_time"] = util.FormatDateByInt64(&next, util.Date_Full_Layout)
		}
	}

	// Winning bidders.
	for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
		if w != "" {
			MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
		}
	}
	save["winner"] = util.ObjToString(tmp["s_winner"])
	savePool <- save
}

// getStr maps a buyer class b to its business label. It returns "" when b is
// empty or does not occur in any group. Matching is by substring against the
// group string, and groups are checked in the order listed.
func getStr(b string) string {
	if b == "" {
		return ""
	}
	groups := []struct {
		members string
		label   string
	}{
		{"(交通|运输物流|工信|农业|住建|城管|市政|出版广电|检察院|科技|民政|生态环境|市场监管|水利|应急管理|自然资源|财政|档案|党委办|组织|发改|宣传|政府办|政务中心|人大|政协|法院|公安|国资委|海关|机关事务|纪委|军队|人社|商务|审计税务|司法|体育|统计|统战|文旅|民宗|银保监|证监|气象|社会团体|公共资源交易)", "政府机构"},
		{"(卫健委|医疗)", "医疗单位"},
		{"(教育|学校)", "教育单位"},
		{"(人行|金融业)", "金融企业"},
		{"(信息技术|电信行业|农林牧渔|建筑业|传媒|制造业|住宿餐饮|采矿业|能源化工|批发零售)", "商业公司"},
	}
	for _, g := range groups {
		if strings.Contains(g.members, b) {
			return g.label
		}
	}
	return ""
}

// getDate returns stime advanced by the contract period identified by p
// (12: 1 year, 13: 2 years, 14: 3 years, 15: 5 years, using 365-day years).
// It returns 0 for every other code, including 11 and 16.
func getDate(p int, stime int64) int64 {
	const year = 365 * 24 * 60 * 60
	switch p {
	case 12:
		return stime + year
	case 13:
		return stime + 2*year
	case 14:
		return stime + 3*year
	case 15:
		return stime + 5*year
	}
	return 0
}