123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- package main
- import (
- "context"
- "data_project_information/config"
- "encoding/json"
- "fmt"
- es "github.com/olivere/elastic/v7"
- "go.mongodb.org/mongo-driver/bson"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "strings"
- "sync"
- "time"
- )
- func task() {
- client := Es.GetEsConn()
- defer Es.DestoryEsConn(client)
- wg := &sync.WaitGroup{}
- tf := []int64{1577808000, 1609430400, 1640966400, 1672502400, 1712851200}
- fsc := es.NewFetchSourceContext(true).Include("tag_topinformation", "tag_subinformation", "tag_set") // 查询字段
- for i, tm := range tf {
- query := es.NewBoolQuery().
- Must(es.NewMatchQuery("tag_topinformation", "情报_物业"))
- if i == 0 {
- query.Must(es.NewRangeQuery("comeintime").Lte(tm))
- } else if i == 1 {
- query.Must(es.NewRangeQuery("comeintime").Gte(tf[0]).Lte(tm))
- } else if i == 2 {
- query.Must(es.NewRangeQuery("comeintime").Gte(tf[1]).Lte(tm))
- } else if i == 3 {
- query.Must(es.NewRangeQuery("comeintime").Gte(tf[2]).Lte(tm))
- } else {
- query.Must(es.NewRangeQuery("comeintime").Lte(tf[3]))
- }
- util.Debug(fmt.Sprintf("第%d次查询,数据量为:%d", i+1, Es.Count("bidding", query)))
- countDocs := 0
- res, err := client.Scroll().Index("bidding").Query(query).FetchSourceContext(fsc).Scroll("5m").Size(2000).Do(context.Background()) //查询一条获取游标
- if err == nil {
- taskInfoA(res, wg, &countDocs)
- scrollId := res.ScrollId
- for {
- searchResult, err := client.Scroll("1m").Index("bidding").ScrollId(scrollId).Size(2000).Do(context.TODO()) //查询
- if err != nil {
- util.Debug("Es Search Data Error:", err.Error())
- break
- }
- taskInfoA(searchResult, wg, &countDocs)
- scrollId = searchResult.ScrollId
- }
- wg.Wait()
- util.Debug(fmt.Sprintf("第%d次处理结束,处理文档%d条", i+1, countDocs))
- _, _ = client.ClearScroll().ScrollId(scrollId).Do(context.Background()) //清理游标
- } else {
- util.Debug(err)
- }
- }
- }
- func taskInfoA(searchResult *es.SearchResult, wg *sync.WaitGroup, countDocs *int) {
- chd := make(chan bool, 5)
- for _, hit := range searchResult.Hits.Hits {
- //开始处理数据
- wg.Add(1)
- chd <- true
- go func(tmpHit *es.SearchHit) {
- defer func() {
- <-chd
- wg.Done()
- }()
- tmp := make(map[string]interface{})
- if json.Unmarshal(tmpHit.Source, &tmp) == nil {
- update := make(map[string]interface{})
- update["tag_topinformation"] = tmp["tag_topinformation"]
- update["tag_subinformation"] = tmp["tag_subinformation"]
- if tmp["tag_set"] != nil {
- update["tag_set"] = tmp["tag_set"]
- }
- //updatePool <- []map[string]interface{}{
- // {"ids": tmpHit.Id},
- // {"$set": update, "$push": bson.M{"tag_information_ids": tmpHit.Id}},
- //}
- updatePool <- []map[string]interface{}{
- {"ids": tmpHit.Id},
- {"$set": update},
- }
- }
- }(hit)
- *countDocs += 1
- if *countDocs%10000 == 0 {
- util.Debug("Current:", *countDocs)
- }
- }
- }
- func taskT() {
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- q := bson.M{"firsttime": bson.M{"$gte": 1577808000}}
- query := sess.DB(config.Conf.DB.Mongo.Dbname).C(config.Conf.DB.Mongo.Coll).Find(q).Iter()
- count := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
- if count%20000 == 0 {
- log.Info(fmt.Sprintf("current --- %d", count))
- }
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- if tmp["tag_topinformation"] != nil && tmp["tag_subinformation"] != nil {
- taskinfotA(tmp)
- }
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Info(fmt.Sprintf("over --- %d", count))
- }
// YeTai maps a property-form code to its display name. taskinfotA looks up
// each comma-separated code of tag_set.wuye.property_form in this table;
// codes that are not listed are skipped.
var YeTai = map[string]string{
	"21": "住宅",
	"22": "政府办公楼",
	"23": "学校",
	"24": "医院",
	"25": "产业园区",
	"26": "旅游景区",
	"27": "交通运输",
	"28": "商务办公楼",
	"29": "酒店",
}
// ZhouQi maps a contract-period code to its display name. taskinfotA uses it
// to fill contract_cycle from tag_set.wuye.period.
var ZhouQi = map[int]string{
	11: "1年以下",
	12: "1年",
	13: "2年",
	14: "3年",
	15: "5年",
	16: "其他",
}
- func taskinfotA(tmp map[string]interface{}) {
- save := make(map[string]interface{})
- pid := mongodb.BsonIdToSId(tmp["_id"])
- ids := util.ObjArrToStringArr(tmp["ids"].([]interface{}))
- save["source_id"] = pid
- save["project_name"] = util.ObjToString(tmp["projectname"])
- save["area"] = tmp["area"]
- save["city"] = tmp["city"]
- save["district"] = tmp["district"]
- save["budget"] = tmp["budget"]
- save["bidamount"] = tmp["bidamount"]
- save["first_url"] = fmt.Sprintf("https://www.jianyu360.cn/article/content/%s.html", util.CommonEncodeArticle("content", ids[0]))
- save["business_label"] = getStr(util.ObjToString(tmp["buyerclass"]))
- //bt := util.ObjToString(tmp["bidtype"])
- bs := util.ObjToString(tmp["bidstatus"])
- if bs == "中标" || bs == "成交" || bs == "合同" {
- save["bidstatus"] = 1
- } else if bs == "废标" || bs == "流标" {
- save["bidstatus"] = 0
- } else {
- save["bidstatus"] = 2
- }
- if tmp["firsttime"] != nil && util.IntAll(tmp["firsttime"]) > 0 {
- t := util.Int64All(tmp["firsttime"])
- save["publish_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- if util.IntAll(save["bidstatus"]) == 1 && util.IntAll(tmp["jgtime"]) > 0 {
- t := util.Int64All(tmp["jgtime"])
- save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- // 补充结果时间
- //if util.Int64All(save["result_time"]) <= 0 && util.IntAll(save["bidstatus"]) == 1 {
- // t := util.Int64All(tmp["lasttime"])
- // if t > 0 {
- // save["result_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- // }
- //}
- save["create_time"] = time.Now().Format(util.Date_Full_Layout)
- save["update_time"] = time.Now().Format(util.Date_Full_Layout)
- for _, id := range ids {
- if info := CkhTool.FindOne("information", bson.M{"datajson_id": id}, "starttime,endtime,datajson_expiredate", ""); info != nil && len(*info) > 0 {
- if (*info)["starttime"] != nil && util.IntAll((*info)["starttime"]) > 0 {
- t := util.Int64All((*info)["starttime"])
- save["contract_cycle_start_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- if (*info)["datajson_expiredate"] != nil && util.IntAll((*info)["datajson_expiredate"]) > 0 {
- t := util.Int64All((*info)["datajson_expiredate"])
- save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- if (*info)["endtime"] != nil && util.IntAll((*info)["endtime"]) > 0 && save["contract_cycle_end_time"] == nil {
- t := util.Int64All((*info)["endtime"])
- save["contract_cycle_end_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- break
- }
- }
- // 行业标签
- v1 := util.ObjArrToStringArr(tmp["tag_subinformation"].([]interface{}))
- for _, sub := range v1 {
- MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": sub, "stype": 1})
- }
- save["industry_label"] = strings.Join(v1, ",")
- // 物业业态
- if tagset, ok := tmp["tag_set"].(map[string]interface{}); ok {
- if wuye, o1 := tagset["wuye"].(map[string]interface{}); o1 {
- if yt := util.ObjToString(wuye["property_form"]); yt != "" {
- var v2 []string
- for _, s := range strings.Split(yt, ",") {
- if s1 := YeTai[s]; s1 != "" {
- v2 = append(v2, s1)
- MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": s1, "stype": 2})
- }
- }
- save["activities_label"] = strings.Join(v2, ",")
- }
- if p := util.IntAll(wuye["period"]); p != 0 && ZhouQi[p] != "" {
- save["contract_cycle"] = ZhouQi[p]
- if util.IntAll(save["bidstatus"]) == 1 {
- if util.IntAll(tmp["jgtime"]) > 0 {
- if et := getDate(p, util.Int64All(tmp["jgtime"])); et > 0 {
- save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
- }
- } else if t := getDate(p, util.Int64All(tmp["lasttime"])); t > 0 {
- if t > 0 {
- save["next_bid_time"] = util.FormatDateByInt64(&t, util.Date_Full_Layout)
- }
- }
- }
- }
- }
- }
- // 2023年之后的数据
- if util.IntAll(save["bidstatus"]) == 1 {
- if save["next_bid_time"] == nil && util.Int64All(tmp["jgtime"]) > 1672502400 {
- st, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_start_time"]))
- et, _ := time.Parse(time.DateTime, util.ObjToString(save["contract_cycle_end_time"]))
- if t := et.Unix() - st.Unix(); t < 90*24*60*60 {
- if tmp["jgtime"] != nil && util.IntAll(tmp["jgtime"]) > 0 {
- if et1 := getDate(12, util.Int64All(tmp["jgtime"])); et1 > 0 {
- save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
- }
- } else {
- if t1 := util.Int64All(tmp["lasttime"]); t1 > 0 {
- if et1 := getDate(12, util.Int64All(tmp["lasttime"])); et1 > 0 {
- save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
- }
- }
- }
- } else {
- if t1 := util.Int64All(tmp["jgtime"]); t1 > 0 {
- et1 := t1 + t
- save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
- } else {
- if t2 := util.Int64All(tmp["lasttime"]); t2 > 0 {
- et1 := t2 + t
- save["next_bid_time"] = util.FormatDateByInt64(&et1, util.Date_Full_Layout)
- }
- }
- }
- }
- // 2023年之后的数据,无合约周期默认 1年
- if save["next_bid_time"] == nil && util.Int64All(save["jgtime"]) > 1672502400 {
- if et := getDate(12, util.Int64All(tmp["jgtime"])); et > 0 {
- save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
- }
- //else {
- // if t := util.Int64All(tmp["lasttime"]); t > 0 {
- // if et := getDate(12, util.Int64All(tmp["lasttime"])); et > 0 {
- // save["next_bid_time"] = util.FormatDateByInt64(&et, util.Date_Full_Layout)
- // }
- // }
- //}
- }
- }
- // 中标单位
- for _, w := range strings.Split(util.ObjToString(tmp["s_winner"]), ",") {
- if w != "" {
- MysqlTool.Insert("property_project_tag", bson.M{"source_id": pid, "value": w, "stype": 3})
- }
- }
- save["winner"] = util.ObjToString(tmp["s_winner"])
- savePool <- save
- }
// getStr maps a buyer class b to its business label:
// 政府机构, 医疗单位, 教育单位, 金融企业 or 商业公司.
// It returns "" for an empty or unrecognised class.
//
// b must equal one of the listed classes exactly; fragments and separator
// characters such as "|" no longer match.
func getStr(b string) string {
	switch b {
	case "交通", "运输物流", "工信", "农业", "住建", "城管", "市政", "出版广电",
		"检察院", "科技", "民政", "生态环境", "市场监管", "水利", "应急管理",
		"自然资源", "财政", "档案", "党委办", "组织", "发改", "宣传", "政府办",
		"政务中心", "人大", "政协", "法院", "公安", "国资委", "海关", "机关事务",
		"纪委", "军队", "人社", "商务",
		// The original list fused these two without a separator; both the
		// individual classes and the fused form are accepted.
		"审计", "税务", "审计税务",
		"司法", "体育", "统计", "统战", "文旅", "民宗", "银保监", "证监", "气象",
		"社会团体", "公共资源交易":
		return "政府机构"
	case "卫健委", "医疗":
		return "医疗单位"
	case "教育", "学校":
		return "教育单位"
	case "人行", "金融业":
		return "金融企业"
	case "信息技术", "电信行业", "农林牧渔", "建筑业", "传媒", "制造业",
		"住宿餐饮", "采矿业", "能源化工", "批发零售":
		return "商业公司"
	}
	return ""
}
// getDate returns stime (Unix seconds) shifted forward by the contract length
// that period code p stands for: 12 → 1 year, 13 → 2 years, 14 → 3 years,
// 15 → 5 years, counting a year as 365 days. Every other code, including 11,
// yields 0.
func getDate(p int, stime int64) int64 {
	const secondsPerYear = 365 * 24 * 60 * 60

	var years int64
	switch p {
	case 12:
		years = 1
	case 13:
		years = 2
	case 14:
		years = 3
	case 15:
		years = 5
	default:
		return 0
	}
	return stime + years*secondsPerYear
}
|