package main import ( "fmt" "log" "qfw/util" elastic "qfw/util/elastic" "gopkg.in/mgo.v2/bson" ) func projectTask(data []byte, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } } session := extractmgo.GetMgoConn(3600) defer extractmgo.DestoryMongoConn(session) c, _ := project["collect"].(string) db, _ := project["db"].(string) index, _ := project["index"].(string) itype, _ := project["type"].(string) count, _ := session.DB(db).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(db).C(c).Find(q).Iter() arr := make([]map[string]interface{}, savesizei) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { delete(tmp, "package") if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "" || s_budget == "null" { tmp["budget"] = nil } if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "" || s_bidamount == "null" { tmp["bidamount"] = nil } go IS.Add("project") arr[i] = tmp n++ if i == savesizei-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, savesizei) } if n%savesizei == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println(mapInfo, "create project index...over", n) }