package main import ( "fmt" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "sync" "time" ) var ( Mgo *mongodb.MongodbSim Es *elastic.Elastic EsNew *elastic.Elastic Es3 *elastic.Elastic MatchArr = make([]TagMatching, 0) // 存放标签规则 globalRegs = make([]TagMatching, 0) //关键词规则,是标签规则大前提 //更新es updateEsPool = make(chan []map[string]interface{}, 5000) updateEsSp = make(chan bool, 5) //保存协程 // 更新mongo updatePool = make(chan []map[string]interface{}, 5000) updateSp = make(chan bool, 5) ) func main() { go updateEsMethod() // 更新es go updateMethod() Init() //oss.InitOss() InitRule() taskRun() log.Println("over") c := make(chan bool, 1) <-c } // taskRun 执行 func taskRun() { defer util.Catch() sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) ch := make(chan bool, 15) wg := &sync.WaitGroup{} //查询条件 q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": mongodb.StringTOBsonId("65b7ebfc66cf0db42a996a65"), "$lte": mongodb.StringTOBsonId("6612323266cf0db42a75789f"), //"$lte": mongodb.StringTOBsonId("5f00e67e52c1d9fbf8367996"), //测试环境 }, } count := 0 selected := map[string]interface{}{"title": 1, "detail": 1, "projectname": 1, "purchasing": 1, "buyer": 1, "attach_text": 1} it := sess.DB("qfw").C("bidding").Find(q).Select(selected).Iter() for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%5000 == 0 { log.Println("current:", count, tmp["_id"]) } ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() // 存在附件时 if atta, ok := tmp["attach_text"]; ok && atta != nil { id := mongodb.BsonIdToSId(tmp["_id"]) if id == "" { return } err, doc := Es.GetById("bidding", id) if err != nil { return } else { tmp["filetext"] = doc["filetext"] } } gs, _, _ := TaskTags(tmp, globalRegs) if len(gs) > 0 { tags, match, add := TaskTags(tmp, MatchArr) if len(tags) > 0 { update := map[string]interface{}{ "mobile_tag": tags, } log.Println("id--", mongodb.BsonIdToSId(tmp["_id"]), match, add) //更新MongoDB updatePool <- []map[string]interface{}{ {"_id": tmp["_id"]}, {"$set": update}, } //Mgo.UpdateById("bidding", mongodb.BsonIdToSId(tmp["_id"]), map[string]interface{}{"$set": update}) //更新es updateEsPool <- []map[string]interface{}{ {"_id": mongodb.BsonIdToSId(tmp["_id"])}, update, } } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() fmt.Println("over ---> ", count) } // updateMethod 更新MongoDB func updateMethod() { arru := make([][]map[string]interface{}, 50) indexu := 0 for { select { case v := <-updatePool: arru[indexu] = v indexu++ if indexu == 50 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 50) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateSp }() Mgo.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 50) indexu = 0 } } } } // updateEsMethod 更新es func updateEsMethod() { arru := make([][]map[string]interface{}, 200) indexu := 0 for { select { case v := <-updateEsPool: arru[indexu] = v indexu++ if indexu == 200 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) EsNew.UpdateBulk("bidding", arru...) }(arru) arru = make([][]map[string]interface{}, 200) indexu = 0 } case <-time.After(1000 * time.Millisecond): if indexu > 0 { updateEsSp <- true go func(arru [][]map[string]interface{}) { defer func() { <-updateEsSp }() Es.UpdateBulk("bidding", arru...) EsNew.UpdateBulk("bidding", arru...) }(arru[:indexu]) arru = make([][]map[string]interface{}, 200) indexu = 0 } } } }