package main import ( "go.mongodb.org/mongo-driver/bson" "log" "mongodb" qu "qfw/util" "time" ) var queryClose = make(chan bool) var queryCloseOver = make(chan bool) func SaveMgo() { log.Println("Mgo Save...") arru := make([]map[string]interface{}, 200) indexu := 0 for { select { case v := <-MgoSaveCache: arru[indexu] = v indexu++ if indexu == 200 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MongoTool.SaveBulk(CollSave, arru...) }(arru) arru = make([]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { SP <- true go func(arru []map[string]interface{}) { defer func() { <-SP }() MongoTool.SaveBulk(CollSave, arru...) }(arru[:indexu]) arru = make([]map[string]interface{}, 200) indexu = 0 } } } } //项目数据 func GetProjectData(t string) { defer qu.Catch() count, taskcount := 0, 0 sess := MongoTool.GetMgoConn() defer MongoTool.DestoryMongoConn(sess) dataPool := make(chan map[string]interface{}, 2000) over := make(chan bool) pool := make(chan bool, 4) go func() { L: for { select { case tmp := <-dataPool: pool <- true taskcount++ go func(tmp map[string]interface{}) { defer func() { <-pool }() ForecastMethod(tmp) }(tmp) case <-over: break L } } }() //uptime, err := strconv.ParseInt(t, 10, 64) //if err == nil { // qu.Debug(err) //} query := bson.M{ //"updatetime": bson.M{"$gt": uptime}, "o_projectinfo.nature": bson.M{"$in": Nature}, "spidercode": bson.M{"$in": SpiderCodes}, "$or": []bson.M{ {"category_buyer": bson.M{"$in": Category}}, {"category_purpose": bson.M{"$in": Category}}, }, } //qu.Debug("query-----", CollPro, query["updatetime"]) filed := map[string]interface{}{"area": 1, "city": 1, "buyer": 1, "projectname": 1, "nature": 1, "category_buyer": 1, "category_purpose": 1, "stage": 1, "o_projectinfo": 1, "title": 1} it := sess.DB(Dbname).C(CollPro).Select(filed).Find(query).Iter() var lastid interface{} L: for { select { case <-queryClose: log.Println("receive interrupt sign") log.Println("close iter..", lastid, it.Cursor.Close(nil)) queryCloseOver <- true break L default: tmp := make(map[string]interface{}) if it.Next(&tmp) { lastid = tmp["_id"] if count%1000 == 0 { log.Println("current", count, lastid) } dataPool <- tmp count++ } else { break L } } } time.Sleep(5 * time.Second) over <- true //阻塞 for n := 0; n < 4; n++ { pool <- true } } func ForecastMethod(pro map[string]interface{}) { pro["infoid"] = mongodb.BsonIdToSId(pro["_id"]) pro["yucetime"] = time.Now().Unix() pro["nature"] = (*qu.ObjToMap(pro["o_projectinfo"]))["nature"] delete(pro, "_id") delete(pro, "o_projectinfo") category := GetCategory(pro) stage := qu.ObjToString(pro["stage"]) q := bson.M{ "category": category, "stage": bson.M{"$in": Forecast[stage]}, } var maps []map[string]interface{} ent, _ := MongoTool.FindOne(CollEnt, bson.M{"buyer_name": pro["buyer"]}) if len(*ent) > 0 && (*ent)["buyerclass"] != nil { arr := qu.ObjArrToStringArr((*ent)["buyerclass"].([]interface{})) if len(arr) == 1 { pro["buyerclass"] = arr }else { var arrTmp []string for _, v := range arr { if v != "其它" { arrTmp = append(arrTmp, v) } } pro["buyerclass"] = arrTmp } } result, _ := MongoTool.Find(CollTag, q, nil, nil, false, -1, -1) for _, t := range *result { if len(t) == 0 { continue } tmp := make(map[string]interface{}) tmp["stage"] = t["stage"] tmp["purchase_classify"] = t["purchase_classify"] tmp["purchasing"] = t["purchasing"] tmp["p_rate"] = Rate tmp["time"] = "" //tmp["p_projects"] = "" 暂无该字段 maps = append(maps, tmp) } if len(maps) > 0 { pro["results"] = maps } MgoSaveCache <- pro } func GetCategory(tmp map[string]interface{}) string { categoryBuyerIndex := -1 categoryPurposeIndex := -1 for k, v := range Category { if tmp["category_buyer"] != nil { if qu.ObjToString(tmp["category_buyer"]) == qu.ObjToString(v) { categoryBuyerIndex = k } } if tmp["category_purpose"] != nil { categoryPurposeIndex = k } } if categoryBuyerIndex >= categoryPurposeIndex { return qu.ObjToString(Category[categoryBuyerIndex]) } else { return qu.ObjToString(Category[categoryPurposeIndex]) } }