123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- "log"
- "mongodb"
- qu "qfw/util"
- "strconv"
- "time"
- )
- var queryClose = make(chan bool)
- var queryCloseOver = make(chan bool)
- func SaveMgo() {
- log.Println("Mgo Save...")
- arru := make([]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-MgoSaveCache:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MongoTool.SaveBulk(CollSave, arru...)
- }(arru)
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- SP <- true
- go func(arru []map[string]interface{}) {
- defer func() {
- <-SP
- }()
- MongoTool.SaveBulk(CollSave, arru...)
- }(arru[:indexu])
- arru = make([]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
- //项目数据
- func GetProjectData(t string) {
- defer qu.Catch()
- count, taskcount := 0, 0
- sess := MongoTool.GetMgoConn()
- defer MongoTool.DestoryMongoConn(sess)
- dataPool := make(chan map[string]interface{}, 2000)
- over := make(chan bool)
- pool := make(chan bool, 4)
- go func() {
- L:
- for {
- select {
- case tmp := <-dataPool:
- pool <- true
- taskcount++
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- }()
- ForecastMethod(tmp)
- }(tmp)
- case <-over:
- break L
- }
- }
- }()
- uptime, err := strconv.ParseInt(t, 10, 64)
- if err == nil {
- qu.Debug(err)
- }
- query := bson.M{
- "updatetime": bson.M{"$gt": uptime},
- "o_projectinfo.nature": bson.M{"$in": Nature},
- "spidercode": bson.M{"$in": SpiderCodes},
- "$or": []bson.M{
- {"category_buyer": bson.M{"$in": Category}},
- {"category_purpose": bson.M{"$in": Category}},
- },
- }
- qu.Debug("query-----", CollPro, query["updatetime"])
- filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "category": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1}
- it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter()
- var lastid interface{}
- L:
- for {
- select {
- case <-queryClose:
- log.Println("receive interrupt sign")
- log.Println("close iter..", lastid, it.Cursor.Close(nil))
- queryCloseOver <- true
- break L
- default:
- tmp := make(map[string]interface{})
- if it.Next(&tmp) {
- lastid = tmp["_id"]
- if count%1000 == 0 {
- log.Println("current", count, lastid)
- }
- dataPool <- tmp
- count++
- } else {
- break L
- }
- }
- }
- time.Sleep(5 * time.Second)
- over <- true
- //阻塞
- for n := 0; n < 4; n++ {
- pool <- true
- }
- }
- func ForecastMethod(pro map[string]interface{}) {
- pro["infoid"] = mongodb.BsonIdToSId(pro["_id"])
- pro["yucetime"] = time.Now().Unix()
- pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"]
- delete(pro, "_id")
- delete(pro, "o_projectinfo")
- category := GetCategory(pro)
- stage := qu.ObjToString(pro["stage"])
- q := bson.M{
- "category": category,
- "stage": bson.M{"$in": Forecast[stage]},
- }
- var maps []map[string]interface{}
- ent, _ := MongoTool.FindOne(CollEnt, bson.M{"buyer_name": pro["buyer"]})
- if len(*ent) > 0 && (*ent)["buyerclass"] != nil {
- arr := qu.ObjArrToStringArr((*ent)["buyerclass"].([]interface{}))
- if len(arr) == 1 {
- pro["buyerclass"] = arr
- }else {
- var arrTmp []string
- for _, v := range arr {
- if v != "其它" {
- arrTmp = append(arrTmp, v)
- }
- }
- pro["buyerclass"] = arrTmp
- }
- }
- result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1)
- for _, t := range *result {
- if len(t) == 0 {
- continue
- }
- tmp := make(map[string]interface{})
- tmp["stage"] = t["stage"]
- tmp["purchase_classify"] = t["purchase_classify"]
- tmp["purchasing"] = t["purchasing"]
- tmp["p_rate"] = Rate
- tmp["time"] = ""
- //tmp["p_projects"] = "" 暂无该字段
- maps = append(maps, tmp)
- }
- if len(maps) > 0 {
- pro["results"] = maps
- }
- MgoSaveCache <- pro
- }
- func GetCategory(tmp map[string]interface{}) string {
- categoryBuyerIndex := -1
- categoryPurposeIndex := -1
- for k, v := range Category {
- if tmp["category_buyer"] != nil {
- if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) {
- categoryBuyerIndex = k
- }
- }
- if tmp["category_purpose"] != nil {
- categoryPurposeIndex = k
- }
- }
- if categoryBuyerIndex >= categoryPurposeIndex {
- return qu.ObjToString(Category[categoryBuyerIndex])
- } else {
- return qu.ObjToString(Category[categoryPurposeIndex])
- }
- }
|