package main

import (
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"mongodb"
	"qfw/util"
	elastic "qfw/util/elastic"
)

// defaultFunc copies the documents matched by a MongoDB query into an elastic
// index, saving them in batches through elastic.BulkSave.
//
// data is not used by this function.
//
// mapInfo keys read here:
//   - "stype":   task type; the function returns immediately when it is empty
//     or not a string. It is also passed to IS.Add once per document.
//   - "query":   optional query map. When absent, an _id range query
//     ($gt "gtid", $lte "lteid") is built instead.
//   - "gtid", "lteid": id strings for the range query above; both are
//     required when "query" is absent.
//   - "c", "d":  collection name and database name.
//   - "index":   elastic index name.
//   - "type":    elastic type name; defaults to the index name when empty.
//   - "mgoaddr": optional MongoDB address. When present a dedicated pool is
//     created for it, otherwise the package-level mgo pool is used.
//
// Full batches are saved by background goroutines, at most cap(savepool) at a
// time. The function waits for all of them before logging completion.
func defaultFunc(data []byte, mapInfo map[string]interface{}) {
	defer util.Catch()

	tasktype, _ := mapInfo["stype"].(string)
	if tasktype == "" {
		return
	}

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		// No explicit query: fall back to an _id range. Validate the bounds
		// instead of panicking on a missing or non-string value.
		gtid, okGt := mapInfo["gtid"].(string)
		lteid, okLte := mapInfo["lteid"].(string)
		if !okGt || !okLte {
			log.Println("defaultFunc: query is missing and gtid/lteid are not both strings:", mapInfo)
			return
		}
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  mongodb.StringTOBsonId(gtid),
				"$lte": mongodb.StringTOBsonId(lteid),
			},
		}
	}

	c, _ := mapInfo["c"].(string)
	db, _ := mapInfo["d"].(string)
	index, _ := mapInfo["index"].(string)
	itype, _ := mapInfo["type"].(string)
	if itype == "" {
		itype = index
	}

	var pool *mongodb.MongodbSim
	if mapInfo["mgoaddr"] != nil {
		addr, ok := mapInfo["mgoaddr"].(string)
		if !ok {
			log.Println("defaultFunc: mgoaddr is not a string:", mapInfo["mgoaddr"])
			return
		}
		// NOTE(review): this dedicated pool is not released anywhere in this
		// function; confirm whether MongodbSim offers a way to close it.
		pool = &mongodb.MongodbSim{
			MongodbAddr: addr,
			Size:        5,
			DbName:      db,
		}
		pool.InitPool()
	} else {
		pool = mgo
	}

	session := pool.GetMgoConn()
	defer pool.DestoryMongoConn(session)

	count, err := session.DB(db).C(c).Find(&q).Count()
	if err != nil {
		// The count is only informational, so keep going.
		log.Println("defaultFunc: count failed:", err)
	}

	// savepool is a counting semaphore bounding concurrent bulk saves.
	savepool := make(chan bool, 10)
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// Guard against a non-positive batch size, which would otherwise cause a
	// modulo-by-zero or never flush.
	batchSize := savesizei
	if batchSize < 1 {
		batchSize = 1
	}

	query := session.DB(db).C(c).Find(q).Iter()
	batch := make([]map[string]interface{}, 0, batchSize)
	n := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); tmp = make(map[string]interface{}) {
		go IS.Add(tasktype)
		// Appending (rather than writing to a fixed index) keeps every batch
		// dense: no nil entries and exactly batchSize documents when full.
		batch = append(batch, tmp)
		n++
		if len(batch) >= batchSize {
			savepool <- true
			full := batch
			go func() {
				// Deferred calls run last-in-first-out: recover first, then
				// release the semaphore slot.
				defer func() { <-savepool }()
				defer util.Catch()
				elastic.BulkSave(index, itype, &full, true)
			}()
			batch = make([]map[string]interface{}, 0, batchSize)
		}
		if n%batchSize == 0 {
			log.Println("当前:", n)
		}
	}

	// Flush the remaining partial batch; it contains only real documents.
	if len(batch) > 0 {
		elastic.BulkSave(index, itype, &batch, true)
	}

	// Wait for in-flight background saves: once every semaphore slot can be
	// acquired here, every worker has released its slot and finished.
	for k := 0; k < cap(savepool); k++ {
		savepool <- true
	}

	log.Println(mapInfo, "create "+tasktype+" index...over", n)
}