12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- package main
- import (
- "fmt"
- "log"
- "qfw/util"
- elastic "qfw/util/elastic"
- "gopkg.in/mgo.v2/bson"
- )
- func projectTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- session := extractmgo.GetMgoConn(3600)
- defer extractmgo.DestoryMongoConn(session)
- c, _ := project["collect"].(string)
- db, _ := project["db"].(string)
- index, _ := project["index"].(string)
- itype, _ := project["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- savepool := make(chan bool, 10)
- log.Println(db, c, "查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Iter()
- arr := make([]map[string]interface{}, savesizei)
- var n int
- i := 0
- for tmp := make(map[string]interface{}); query.Next(tmp); i = i + 1 {
- delete(tmp, "package")
- if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- tmp["budget"] = nil
- }
- if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- tmp["bidamount"] = nil
- }
- go IS.Add("project")
- arr[i] = tmp
- n++
- if i == savesizei-1 {
- savepool <- true
- tmps := arr
- go func(tmpn *[]map[string]interface{}) {
- defer func() {
- <-savepool
- }()
- elastic.BulkSave(index, itype, tmpn, true)
- }(&tmps)
- i = 0
- arr = make([]map[string]interface{}, savesizei)
- }
- if n%savesizei == 0 {
- log.Println("当前:", n)
- }
- tmp = make(map[string]interface{})
- }
- if i > 0 {
- elastic.BulkSave(index, itype, &arr, true)
- }
- log.Println(mapInfo, "create project index...over", n)
- }
|