123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- package main
- import (
- "fmt"
- "log"
- qutil "qfw/util"
- elastic "qfw/util/elastic"
- "strings"
- //elastic "qfw/util/elastic_v5"
- // "strings"
- "sync"
- "time"
- "gopkg.in/mgo.v2/bson"
- )
- func biddingBackTask2(data []byte, mapInfo map[string]interface{}) {
- defer qutil.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- c, _ = biddingback["collect"].(string)
- }
- cs := strings.Split(c, ",")
- for _, c := range cs {
- //bidding库
- session := mgo.GetMgoConn(86400)
- defer mgo.DestoryMongoConn(session)
- //连接信息
- db, _ := biddingback["db"].(string)
- index, _ := biddingback["index"].(string)
- itype, _ := biddingback["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- //线程池
- UpdatesLock := sync.Mutex{}
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "contenthtml": 0,
- "s_sha": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- n := 0
- //更新数组
- arrEs := []map[string]interface{}{}
- //对比两张表数据,减少查询次数
- thread := qutil.IntAll(mapInfo["thread"])
- //不传为0只生成招标索引,1生成招标+预览,2只生成预览
- if thread < 1 {
- thread = 3
- }
- log.Println("es线程数:", thread)
- espool := make(chan bool, thread)
- now1 := time.Now().Unix()
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if qutil.IntAll(tmp["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- ct := qutil.Int64All(tmp["comeintime"])
- pt := qutil.Int64All(tmp["publishtime"])
- if pt > ct+86400 || pt > now1 { //时间问题,需要更新
- if ct > now1 {
- ct = now1
- }
- tmp["publishtime"] = ct
- }
- ps, _ := tmp["projectscope"].(string)
- if ps == "" {
- tmp["projectscope"] = "" //= tmp["detail"]
- }
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- tmp["budget"] = nil
- } else if sbd, ok := tmp["budget"].(string); ok {
- tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- tmp["bidamount"] = nil
- } else if sbd, ok := tmp["bidamount"].(string); ok {
- tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- UpdatesLock.Lock()
- newTmp := map[string]interface{}{}
- for _, v := range biddingIndexFields {
- if tmp[v] != nil {
- if "projectinfo" == v {
- //处理附件 content
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- if len(newmap) > 0 {
- newTmp[v] = newmap
- }
- attachments := mp["attachments"]
- con := ""
- if attachments != nil {
- am, _ := attachments.(map[string]interface{})
- if am != nil {
- for _, v1 := range am {
- vm, _ := v1.(map[string]interface{})
- if vm != nil {
- c, _ := vm["content"].(string)
- con += c
- }
- }
- }
- }
- if con != "" {
- con = FilterDetailSpace(con)
- newTmp["attachments"] = con
- }
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- } else if v == "budget" || v == "bidamount" {
- newTmp[v] = nil
- }
- }
- arrEs = append(arrEs, newTmp)
- if len(arrEs) >= BulkSizeBack {
- tmps := arrEs
- espool <- true
- go func(tmps []map[string]interface{}) {
- defer func() {
- <-espool
- }()
- elastic.BulkSave(index, itype, &tmps, true)
- }(tmps)
- arrEs = []map[string]interface{}{}
- }
- UpdatesLock.Unlock()
- if n%1000 == 0 {
- log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
- }
- tmp = make(map[string]interface{})
- }
- UpdatesLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- }
- UpdatesLock.Unlock()
- log.Println(mapInfo, "create biddingback2 index...over", c, n)
- }
- }
|