12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package main
- import (
- "go.mongodb.org/mongo-driver/bson"
- "log"
- "mongodb"
- "qfw/util"
- elastic "qfw/util/elastic"
- )
- func defaultFunc(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- tasktype, _ := mapInfo["stype"].(string)
- if tasktype == "" {
- return
- }
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- var Mgo *mongodb.MongodbSim
- c, _ := mapInfo["c"].(string)
- db, _ := mapInfo["d"].(string)
- index, _ := mapInfo["index"].(string)
- itype, _ := mapInfo["type"].(string)
- if itype == "" {
- itype = index
- }
- if mapInfo["mgoaddr"] != nil {
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: mapInfo["mgoaddr"].(string),
- Size: 5,
- DbName: db,
- }
- Mgo.InitPool()
- } else {
- Mgo = mgo
- }
- session := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(session)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- go IS.Add(tasktype)
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println(mapInfo, "create "+tasktype+" index...over", n)
- }
|