123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- "log"
- util "utils"
- "utils/elastic"
- "utils/mongodb"
- )
- func standardTask(stype string, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- switch stype {
- case "winnerent":
- winnerEnt(q)
- case "buyerent":
- buyerEnt(q)
- case "agencyent":
- agencyEnt(q)
- }
- }
- //winnerent
- func winnerEnt(q map[string]interface{}) {
- session := standardMgo.GetMgoConn()
- defer standardMgo.DestoryMongoConn(session)
- winnerent, _ := standard["winnerent"].(map[string]interface{})
- c, _ := winnerent["collect"].(string)
- index, _ := winnerent["index"].(string)
- itype, _ := winnerent["type"].(string)
- count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, EsBulkSize)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- //不生索引字段
- delete(tmp, "partners")
- delete(tmp, "wechat_accounts")
- delete(tmp, "tmp_id")
- tmp["company"] = tmp["company_name"]
- arr[i] = tmp
- n++
- if i == EsBulkSize-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, EsBulkSize)
- }
- if n%EsBulkSize == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println("create winnerent index...over", n)
- }
- //buyerent
- func buyerEnt(q map[string]interface{}) {
- session := standardMgo.GetMgoConn()
- defer standardMgo.DestoryMongoConn(session)
- buyerent, _ := standard["buyerent"].(map[string]interface{})
- c, _ := buyerent["collect"].(string)
- index, _ := buyerent["index"].(string)
- itype, _ := buyerent["type"].(string)
- count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, EsBulkSize)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- //不生索引字段
- delete(tmp, "partners")
- delete(tmp, "wechat_accounts")
- delete(tmp, "tmp_id")
- tmp["buyer"] = tmp["buyer_name"]
- arr[i] = tmp
- n++
- if i == EsBulkSize-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, EsBulkSize)
- }
- if n%EsBulkSize == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println("create buyerent index...over", n)
- }
- //agencyent
- func agencyEnt(q map[string]interface{}) {
- session := standardMgo.GetMgoConn()
- defer standardMgo.DestoryMongoConn(session)
- agencyent, _ := standard["agencyent"].(map[string]interface{})
- c, _ := agencyent["collect"].(string)
- index, _ := agencyent["index"].(string)
- itype, _ := agencyent["type"].(string)
- count, _ := session.DB(standardMgo.DbName).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println(standardMgo.DbName, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(standardMgo.DbName).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, EsBulkSize)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- //不生索引字段
- delete(tmp, "partners")
- delete(tmp, "wechat_accounts")
- delete(tmp, "tmp_id")
- tmp["agency"] = tmp["agency_name"]
- arr[i] = tmp
- n++
- if i == EsBulkSize-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, EsBulkSize)
- }
- if n%EsBulkSize == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println("create agencyent index...over", n)
- }
|