123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- package main
import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	mgoutil "mongodb"
	qu "qfw/util"

	"github.com/donnie4w/go-logger/logger"
	"go.mongodb.org/mongo-driver/bson"
	//"go.mongodb.org/mongo-driver/bson/primitive"
)
// ESMODEL is the Elasticsearch query template that GetProjects fills in with
// fmt.Sprintf. It takes two %s arguments, in this order:
//
//  1. the buyer name, used in a "term" filter on the "buyer" field;
//  2. a purchasing keyword, phrase-matched ("multi_match", type "phrase")
//     against the "purchasing", "s_projectname" and "title" fields.
//
// The query returns at most 100 hits sorted by "publishtime" descending and
// restricts "_source" to "buyerperson", "buyertel", "projectname" and "_id".
//
// Both placeholders sit inside JSON string literals and are substituted
// verbatim, so the values passed in must already be safe to embed in a JSON
// string (no unescaped quotes or backslashes).
- const ESMODEL = `
- {
- "query": {
- "filtered": {
- "filter": {
- "bool": {
- "must": [
- {
- "term": {
- "buyer": "%s"
- }
- }
- ]
- }
- },
- "query": {
- "bool": {
- "should": [
- {
- "multi_match": {
- "query": "%s",
- "type": "phrase",
- "fields": [
- "purchasing",
- "s_projectname",
- "title"
- ]
- }
- }
- ]
- }
- }
- }
- },
- "from": 0,
- "size": 100,
- "sort": [
- {
- "publishtime": "desc"
- }
- ],
- "_source": [
- "buyerperson",
- "buyertel",
- "projectname",
- "_id"
- ]
- }
- `
- func SaveMgo() {
- log.Println("Mgo Save...")
- arru := make([]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-MgoSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MixMgo.SaveBulk(CollSave, arru...)
- }(arru)
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MixMgo.SaveBulk(CollSave, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- func GetProjectData(sid, eid string) {
- defer qu.Catch()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- query := bson.M{
- "_id": bson.M{
- "$gt": mgoutil.StringTOBsonId(sid),
- "$lte": mgoutil.StringTOBsonId(eid),
- },
- "toptype": "拟建",
- }
- filed := map[string]interface{}{
- "area": 1,
- "city": 1,
- "buyer": 1,
- "projectname": 1,
- "title": 1,
- "href": 1,
- "publishtime": 1,
- "main_project": 1,
- "nature": 1,
- "top_category": 1,
- "sub_category": 1,
- "stage": 1,
- "approvestatus": 1,
- "projectinfo": 1,
- "projectcode": 1,
- }
- logger.Debug("query:", query)
- count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count()
- log.Println("共查询:", count, "条")
- if count == 0 {
- return
- }
- it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- sum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
- if sum%100 == 0 {
- log.Println("current:", sum)
- }
- pool <- true
- wg.Add(1)
- go func(pro map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //stage必须存在
- stage := qu.ObjToString(pro["stage"])
- if stage == "" {
- return
- }
- //top_category必须存在
- top_category := qu.ObjToString(pro["top_category"])
- if top_category == "" {
- return
- }
- //approvestatus审批通过
- approvestatus := qu.ObjToString(pro["approvestatus"])
- if approvestatus != "审批通过" {
- return
- }
- //projectinfo.projecttype 报建、核准类、核准、审批类、审批、null
- projectinfo := pro["projectinfo"].(map[string]interface{})
- if projecttype, ok := projectinfo["projecttype"].(string); ok {
- if !ProjectTypeMap[projecttype] { //有值且不在范围内不进行项目预测
- return
- }
- }
- delete(pro, "projectinfo")
- //nature
- nature := qu.ObjToString(pro["nature"])
- if !NatureMap[nature] {
- return
- }
- //buyer存在且该buyer不仅有拟建数据
- buyer := qu.ObjToString(pro["buyer"])
- if buyer != "" {
- esqyery := `{"query": {"bool": {"must": [{"term": {"buyer": "` + buyer + `"}}],"must_not": [{"term": {"toptype": "拟建"}}]}},"from": 0,"size": 1}`
- list := Es.Get(Index, Itype, esqyery)
- if list == nil || len(*list) == 0 { //buyer仅有拟建数据不预测
- return
- }
- }
- //id
- id := mgoutil.BsonIdToSId(pro["_id"])
- pro["infoid"] = id
- pro["jyhref"] = `https://www.jianyu360.com/article/content/` + qu.CommonEncodeArticle("content", id) + `.html`
- delete(pro, "_id")
- //yucetime
- pro["yucetime"] = time.Now().Unix()
- //buyerclass
- ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer})
- if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
- arr := (*ent)["buyerclass"].([]interface{})
- if len(arr) == 1 {
- pro["buyerclass"] = arr
- } else {
- var arrTmp []string
- for _, v := range arr {
- val := qu.ObjToString(v)
- if val != "其它" {
- arrTmp = append(arrTmp, val)
- }
- }
- pro["buyerclass"] = arrTmp
- }
- }
- maps := []map[string]interface{}{}
- sub_category := qu.ObjToString(pro["sub_category"])
- arr := Forecast[stage]
- ForecastFlag := 0 //标记查询了几次标签表
- //qu.Debug("top_category---", top_category, "sub_category---", sub_category, "stage---", stage)
- for { //查project_biaoqian
- tmpArr := arr
- q := bson.M{} //查询条件
- if sub_category != "" { //sub_category存在优先用sub_category
- q = bson.M{
- //"sub_category": sub_category,
- "sub_category": bson.M{"$elemMatch": bson.M{"$eq": sub_category}},
- }
- } else { //top_category
- q = bson.M{
- "top_category": top_category,
- }
- }
- if stage == "后期施工" || stage == "竣工验收" || stage == "运行维护" {
- //qu.Debug("ForecastFlag---", ForecastFlag)
- if ForecastFlag == 0 { //第一次增加main_project判断
- main_project := qu.ObjToString(pro["main_project"])
- //qu.Debug("main_project---", main_project)
- if main_project != "" { //main_project存在
- tmpArr = append(tmpArr, "物品采购")
- //q["main_project"] = main_project
- q["main_project"] = bson.M{"$elemMatch": bson.M{"$eq": main_project}}
- q["stage"] = bson.M{"$in": tmpArr}
- } else { //main_project不存在
- if stage == "运行维护" { //stage是运行维护且main_project是空不预测
- //qu.Debug("---return---")
- return
- } else {
- q["stage"] = bson.M{"$in": tmpArr}
- ForecastFlag++
- }
- }
- } else if ForecastFlag == 1 {
- q["stage"] = bson.M{"$in": tmpArr}
- }
- ForecastFlag++
- } else { //规划可研、立项环评、勘察设计、建设准备、前期施工
- q["stage"] = bson.M{"$in": tmpArr}
- ForecastFlag = 2
- }
- //qu.Debug("ForecastFlag---", ForecastFlag, "q--------------", q)
- result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1)
- //qu.Debug("result---", len(*result))
- if len(*result) == 0 && ForecastFlag <= 1 {
- continue
- } else if len(*result) >= 1 { //第一次查询有results就不再查询
- ForecastFlag++
- }
- for _, t := range *result {
- t["p_rate"] = Rate
- t["time"] = ""
- projects := GetProjects(qu.ObjToString(t["purchasing"]), buyer)
- if len(projects) > 0 {
- t["p_projects"] = projects
- }
- maps = append(maps, t)
- }
- //qu.Debug("ForecastFlag---", ForecastFlag)
- if ForecastFlag >= 2 {
- break
- }
- }
- if len(maps) > 0 {
- pro["results"] = maps
- }
- MgoSaveCache <- pro
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("Run Over...Count:", sum)
- }
- func GetProjects(purchasing, buyer string) (projects []map[string]interface{}) {
- if purchasing != "" {
- for _, text := range strings.Split(purchasing, ",") {
- latest_project := map[string]interface{}{} //存储最后一条数据信息
- result_project := map[string]interface{}{} //存储每个purchasing所查询的招标信息
- esquery := fmt.Sprintf(ESMODEL, buyer, text)
- list := Es.Get(Index, Itype, esquery)
- if list != nil && len(*list) > 0 {
- for i, l := range *list {
- p_phone := qu.ObjToString(l["buyertel"])
- if p_phone != "" { //记录有联系电话的最新信息
- result_project["p_purchasing"] = text
- result_project["p_phone"] = p_phone
- if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" {
- result_project["p_person"] = p_person
- }
- result_project["p_id"] = qu.ObjToString(l["_id"])
- result_project["p_orther"] = qu.ObjToString(l["projectname"])
- break
- }
- if i == 0 { //记录第一条数据信息
- latest_project["p_purchasing"] = text
- // if p_phone != "" {
- // latest_project["p_phone"] = p_phone
- // }
- if p_person := qu.ObjToString(l["buyerperson"]); p_person != "" {
- latest_project["p_person"] = p_person
- }
- latest_project["p_id"] = qu.ObjToString(l["_id"])
- latest_project["p_orther"] = qu.ObjToString(l["projectname"])
- }
- }
- }
- if len(result_project) > 0 {
- projects = append(projects, result_project)
- } else if len(latest_project) > 0 {
- projects = append(projects, latest_project)
- }
- }
- }
- return
- }
- /*
- func GetProjectData_back(t string) {
- defer qu.Catch()
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- uptime, err := strconv.ParseInt(t, 10, 64)
- if err != nil {
- qu.Debug("时间转换错误:", t)
- return
- }
- query := bson.M{
- "updatetime": bson.M{"$gt": uptime},
- "o_projectinfo.nature": bson.M{"$in": Nature},
- "spidercode": bson.M{"$in": SpiderCodes},
- "stage": bson.M{"$exists": true},
- "$or": []bson.M{
- {"category_buyer": bson.M{"$in": Category}},
- {"category_purpose": bson.M{"$in": Category}},
- },
- }
- filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1}
- count, _ := sess.DB(Dbname).C(CollPro).Find(query).Count()
- log.Println("共查询:", count, "条")
- if count == 0 {
- return
- }
- it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
- pool := make(chan bool, 10) //控制线程数
- wg := &sync.WaitGroup{}
- //lock := &sync.Mutex{} //控制读写
- sum := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); sum++ {
- if sum%100 == 0 {
- log.Println("current:", sum)
- }
- pool <- true
- wg.Add(1)
- go func(pro map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- stage := qu.ObjToString(pro["stage"])
- if stage == "" {
- //log.Println("stage is null", pro["infoid"])
- return
- }
- if id, ok := pro["_id"].(string); ok && id != "" {
- pro["infoid"] = id
- } else {
- pro["infoid"] = mgoutil.BsonIdToSId(pro["_id"])
- }
- pro["yucetime"] = time.Now().Unix()
- pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
- buyer := (*qu.ObjToMap(pro["o_projectinfo"]))["buyer"]
- delete(pro, "_id")
- delete(pro, "o_projectinfo")
- //qu.Debug("buyer---", buyer)
- ent, _ := MixMgo.FindOne(CollEnt, bson.M{"buyer_name": buyer})
- if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
- arr := (*ent)["buyerclass"].(primitive.A)
- if len(arr) == 1 {
- pro["buyerclass"] = arr
- } else {
- var arrTmp []string
- for _, v := range arr {
- val := qu.ObjToString(v)
- if val != "其它" {
- arrTmp = append(arrTmp, val)
- }
- }
- pro["buyerclass"] = arrTmp
- }
- }
- //qu.Debug("buyerclass---", pro["buyerclass"])
- category := GetCategory(pro)
- //qu.Debug("category---", category)
- if category == "" {
- return
- }
- q := bson.M{
- "category": category,
- "stage": bson.M{"$in": Forecast[stage]},
- }
- //qu.Debug("q----", q)
- result, _ := MixMgo.Find(CollTag, q, nil, nil, false, -1, -1)
- //qu.Debug("result---", *result)
- maps := []map[string]interface{}{}
- for _, t := range *result {
- // if len(t) == 0 {
- // continue
- // }
- r := make(map[string]interface{})
- r["stage"] = t["stage"]
- r["purchase_classify"] = t["purchase_classify"]
- r["purchasing"] = t["purchasing"]
- r["p_rate"] = Rate
- r["time"] = ""
- //tmp["p_projects"] = "" 暂无该字段
- maps = append(maps, r)
- }
- if len(maps) > 0 {
- pro["results"] = maps
- }
- //qu.Debug("pro---", pro)
- MgoSaveCache <- pro
- }(tmp)
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- log.Println("Run Over...Count:", sum)
- }*/
|