package main import ( "fmt" "log" mgoutil "mongodb" qu "qfw/util" "strings" "sync" "time" "github.com/donnie4w/go-logger/logger" "go.mongodb.org/mongo-driver/bson" //"go.mongodb.org/mongo-driver/bson/primitive" ) const ESMODEL = ` { "query": { "filtered": { "filter": { "bool": { "must": [ { "term": { "buyer": "%s" } } ] } }, "query": { "bool": { "should": [ { "multi_match": { "query": "%s", "type": "phrase", "fields": [ "purchasing", "s_projectname", "title" ] } } ] } } } }, "from": 0, "size": 100, "sort": [ { "publishtime": "desc" } ], "_source": [ "buyerperson", "buyertel", "projectname", "_id" ] } ` func SaveMgo() { log.Println("Mgo Save...") arru := make([]map[string]interface{}, 200) indexu := 0 for { select { case v := <-MgoSaveCache: arru[indexu] = v indexu++ if indexu == 200 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MixMgo.SaveBulk(CollSave, arru...) }(arru) arru = make([]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MixMgo.SaveBulk(CollSave, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, 200) indexu = 0 } } } } func GetProjectData(sid, eid string) { defer qu.Catch() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) query := bson.M{ "_id": bson.M{ "$gt": mgoutil.StringTOBsonId(sid), "$lte": mgoutil.StringTOBsonId(eid), }, "toptype": "拟建", } filed := map[string]interface{}{ "area": 1, "city": 1, "buyer": 1, "projectname": 1, "title": 1, "href": 1, "publishtime": 1, "main_project": 1, "nature": 1, "top_category": 1, "sub_category": 1, "stage": 1, "approvestatus": 1, "projectinfo": 1, "projectcode": 1, } logger.Debug("query:", query) count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count() log.Println("共查询:", count, "条") if count == 0 { return } it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter() pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} sum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ { if sum%100 == 0 { log.Println("current:", sum) } pool <- true wg.Add(1) go func(pro map[string]interface{}) { defer func() { <-pool wg.Done() }() //stage必须存在 stage := qu.ObjToString(pro["stage"]) if stage == "" { return } //top_category必须存在 top_category := qu.ObjToString(pro["top_category"]) if top_category == "" { return } //approvestatus审批通过 approvestatus := qu.ObjToString(pro["approvestatus"]) if approvestatus != "审批通过" { return } //projectinfo.projecttype 报建、核准类、核准、审批类、审批、null projectinfo := pro["projectinfo"].(map[string]interface{}) if projecttype, ok := projectinfo["projecttype"].(string); ok { if !ProjectTypeMap[projecttype] { //有值且不在范围内不进行项目预测 return } } delete(pro, "projectinfo") //nature nature := qu.ObjToString(pro["nature"]) if !NatureMap[nature] { return } //buyer存在且该buyer不仅有拟建数据 buyer := qu.ObjToString(pro["buyer"]) if buyer != "" { esqyery := `{"query": {"bool": {"must": [{"term": {"buyer": "` + buyer + `"}}],"must_not": [{"term": {"toptype": "拟建"}}]}},"from": 0,"size": 1}` list := Es.Get(Index, Itype, esqyery) if list == nil || len(*list) == 0 { //buyer仅有拟建数据不预测 return } } //id id := mgoutil.BsonIdToSId(pro["_id"]) pro["infoid"] = id pro["jyhref"] = `https://www.jianyu360.com/article/content/` + qu.CommonEncodeArticle("content", id) + `.html` delete(pro, "_id") //yucetime pro["yucetime"] = time.Now().Unix() //buyerclass ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer}) if len(*ent) > 0 && (*ent)["buyerclass"] != nil { arr := (*ent)["buyerclass"].([]interface{}) if len(arr) == 1 { pro["buyerclass"] = arr } else { var arrTmp []string for _, v := range arr { val := qu.ObjToString(v) if val != "其它" { arrTmp = append(arrTmp, val) } } pro["buyerclass"] = arrTmp } } maps := []map[string]interface{}{} sub_category := qu.ObjToString(pro["sub_category"]) arr := Forecast[stage] ForecastFlag := 0 //标记查询了几次标签表 //qu.Debug("top_category---", top_category, "sub_category---", sub_category, "stage---", stage) for { //查project_biaoqian tmpArr := arr q := bson.M{} //查询条件 if sub_category != "" { //sub_category存在优先用sub_category q = bson.M{ //"sub_category": sub_category, "sub_category": bson.M{"$elemMatch": bson.M{"$eq": sub_category}}, } } else { //top_category q = bson.M{ "top_category": top_category, } } if stage == "后期施工" || stage == "竣工验收" || stage == "运行维护" { //qu.Debug("ForecastFlag---", ForecastFlag) if ForecastFlag == 0 { //第一次增加main_project判断 main_project := qu.ObjToString(pro["main_project"]) //qu.Debug("main_project---", main_project) if main_project != "" { //main_project存在 tmpArr = append(tmpArr, "物品采购") //q["main_project"] = main_project q["main_project"] = bson.M{"$elemMatch": bson.M{"$eq": main_project}} q["stage"] = bson.M{"$in": tmpArr} } else { //main_project不存在 if stage == "运行维护" { //stage是运行维护且main_project是空不预测 //qu.Debug("---return---") return } else { q["stage"] = bson.M{"$in": tmpArr} ForecastFlag++ } } } else if ForecastFlag == 1 { q["stage"] = bson.M{"$in": tmpArr} } ForecastFlag++ } else { //规划可研、立项环评、勘察设计、建设准备、前期施工 q["stage"] = bson.M{"$in": tmpArr} ForecastFlag = 2 } //qu.Debug("ForecastFlag---", ForecastFlag, "q--------------", q) result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1) //qu.Debug("result---", len(*result)) if len(*result) == 0 && ForecastFlag <= 1 { continue } else if len(*result) >= 1 { //第一次查询有results就不再查询 ForecastFlag++ } for _, t := range *result { t["p_rate"] = Rate t["time"] = "" projects := GetProjects(qu.ObjToString(t["purchasing"]), buyer) if len(projects) > 0 { t["p_projects"] = projects } maps = append(maps, t) } //qu.Debug("ForecastFlag---", ForecastFlag) if ForecastFlag >= 2 { break } } if len(maps) > 0 { pro["results"] = maps } MgoSaveCache <- pro }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("Run Over...Count:", sum) } func GetProjects(purchasing, buyer string) (projects []map[string]interface{}) { if purchasing != "" { for _, text := range strings.Split(purchasing, ",") { latest_project := map[string]interface{}{} //存储最后一条数据信息 result_project := map[string]interface{}{} //存储每个purchasing所查询的招标信息 esquery := fmt.Sprintf(ESMODEL, buyer, text) list := Es.Get(Index, Itype, esquery) if list != nil && len(*list) > 0 { for i, l := range *list { p_phone := qu.ObjToString(l["buyertel"]) if p_phone != "" { //记录有联系电话的最新信息 result_project["p_purchasing"] = text result_project["p_phone"] = p_phone if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" { result_project["p_person"] = p_person } result_project["p_id"] = qu.ObjToString(l["_id"]) result_project["p_orther"] = qu.ObjToString(l["projectname"]) break } if i == 0 { //记录第一条数据信息 latest_project["p_purchasing"] = text // if p_phone != "" { // latest_project["p_phone"] = p_phone // } if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" { latest_project["p_person"] = p_person } latest_project["p_id"] = qu.ObjToString(l["_id"]) latest_project["p_orther"] = qu.ObjToString(l["projectname"]) } } } if len(result_project) > 0 { projects = append(projects, result_project) } else if len(latest_project) > 0 { projects = append(projects, latest_project) } } } return } /* func GetProjectData_back(t string) { defer qu.Catch() sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) uptime, err := strconv.ParseInt(t, 10, 64) if err != nil { qu.Debug("时间转换错误:", t) return } query := bson.M{ "updatetime": bson.M{"$gt": uptime}, "o_projectinfo.nature": bson.M{"$in": Nature}, "spidercode": bson.M{"$in": SpiderCodes}, "stage": bson.M{"$exists": true}, "$or": []bson.M{ {"category_buyer": bson.M{"$in": Category}}, {"category_purpose": bson.M{"$in": Category}}, }, } filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1} count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count() log.Println("共查询:", count, "条") if count == 0 { return } it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter() pool := make(chan bool, 10) //控制线程数 wg := &sync.WaitGroup{} //lock := &sync.Mutex{} //控制读写 sum := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ { if sum%100 == 0 { log.Println("current:", sum) } pool <- true wg.Add(1) go func(pro map[string]interface{}) { defer func() { <-pool wg.Done() }() stage := qu.ObjToString(pro["stage"]) if stage == "" { //log.Println("stage is null", pro["infoid"]) return } if id, ok := pro["_id"].(string); ok && id != "" { pro["infoid"] = id } else { pro["infoid"] = mgoutil.BsonIdToSId(pro["_id"]) } pro["yucetime"] = time.Now().Unix() pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"] buyer := (*qu.ObjToMap(pro["o_projectinfo"]))["buyer"] delete(pro, "_id") delete(pro, "o_projectinfo") //qu.Debug("buyer---", buyer) ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer}) if len(*ent) > 0 && (*ent)["buyerclass"] != nil { arr := (*ent)["buyerclass"].(primitive.A) if len(arr) == 1 { pro["buyerclass"] = arr } else { var arrTmp []string for _, v := range arr { val := qu.ObjToString(v) if val != "其它" { arrTmp = append(arrTmp, val) } } pro["buyerclass"] = arrTmp } } //qu.Debug("buyerclass---", pro["buyerclass"]) category := GetCategory(pro) //qu.Debug("category---", category) if category == "" { return } q := bson.M{ "category": category, "stage": bson.M{"$in": Forecast[stage]}, } //qu.Debug("q----", q) result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1) //qu.Debug("result---", *result) maps := []map[string]interface{}{} for _, t := range *result { // if len(t) == 0 { // continue // } r := make(map[string]interface{}) r["stage"] = t["stage"] r["purchase_classify"] = t["purchase_classify"] r["purchasing"] = t["purchasing"] r["p_rate"] = Rate r["time"] = "" //tmp["p_projects"] = "" 暂无该字段 maps = append(maps, r) } if len(maps) > 0 { pro["results"] = maps } //qu.Debug("pro---", pro) MgoSaveCache <- pro }(tmp) tmp = make(map[string]interface{}) } wg.Wait() log.Println("Run Over...Count:", sum) }*/