package main import ( "go.mongodb.org/mongo-driver/bson" "log" util "utils" "utils/elastic" "utils/mongodb" ) func standardTask(stype string, mapInfo map[string]interface{}) { defer util.Catch() q, _ := mapInfo["query"].(map[string]interface{}) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)), }, } } switch stype { case "winnerent": winnerEnt(q) case "buyerent": buyerEnt(q) case "agencyent": agencyEnt(q) } } //winnerent func winnerEnt(q map[string]interface{}) { session := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(session) winnerent, _ := standard["winnerent"].(map[string]interface{}) c, _ := winnerent["collect"].(string) index, _ := winnerent["index"].(string) itype, _ := winnerent["type"].(string) count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(standardMgo.DbName).C(c).Find(q).Iter() arr := make([]map[string]interface{}, EsBulkSize) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { //不生索引字段 delete(tmp, "partners") delete(tmp, "wechat_accounts") delete(tmp, "tmp_id") tmp["company"] = tmp["company_name"] arr[i] = tmp n++ if i == EsBulkSize-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, EsBulkSize) } if n%EsBulkSize == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println("create winnerent index...over", n) } //buyerent func buyerEnt(q map[string]interface{}) { session := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(session) buyerent, _ := standard["buyerent"].(map[string]interface{}) c, _ := buyerent["collect"].(string) index, _ := buyerent["index"].(string) itype, _ := buyerent["type"].(string) count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(standardMgo.DbName).C(c).Find(q).Iter() arr := make([]map[string]interface{}, EsBulkSize) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { //不生索引字段 delete(tmp, "partners") delete(tmp, "wechat_accounts") delete(tmp, "tmp_id") tmp["buyer"] = tmp["buyer_name"] arr[i] = tmp n++ if i == EsBulkSize-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, EsBulkSize) } if n%EsBulkSize == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println("create buyerent index...over", n) } //agencyent func agencyEnt(q map[string]interface{}) { session := standardMgo.GetMgoConn() defer standardMgo.DestoryMongoConn(session) agencyent, _ := standard["agencyent"].(map[string]interface{}) c, _ := agencyent["collect"].(string) index, _ := agencyent["index"].(string) itype, _ := agencyent["type"].(string) count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count() savepool := make(chan bool, 10) log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index) query := session.DB(standardMgo.DbName).C(c).Find(q).Iter() arr := make([]map[string]interface{}, EsBulkSize) var n int i := 0 for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 { //不生索引字段 delete(tmp, "partners") delete(tmp, "wechat_accounts") delete(tmp, "tmp_id") tmp["agency"] = tmp["agency_name"] arr[i] = tmp n++ if i == EsBulkSize-1 { savepool <- true tmps := arr go func(tmpn *[]map[string]interface{}) { defer func() { <-savepool }() elastic.BulkSave(index, itype, tmpn, true) }(&tmps) i = 0 arr = make([]map[string]interface{}, EsBulkSize) } if n%EsBulkSize == 0 { log.Println("当前:", n) } tmp = make(map[string]interface{}) } if i > 0 { elastic.BulkSave(index, itype, &arr, true) } log.Println("create agencyent index...over", n) }