123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- package main
- import (
- "fmt"
- "log"
- qutil "qfw/util"
- elastic "qfw/util/elastic"
- //elastic "qfw/util/elastic_v5"
- "regexp"
- // "strings"
- "sync"
- "time"
- "gopkg.in/mgo.v2/bson"
- )
// BulkSizeBack is the number of documents biddingBackTask buffers before it
// issues one Elasticsearch bulk write.
var BulkSizeBack = 400

// ESLEN is the byte length of projectscope above which biddingBackTask
// truncates that field. NOTE(review): 32766 presumably mirrors Lucene's
// maximum indexed term size in bytes — confirm.
var ESLEN = 32766
- func biddingBackTask(data []byte, mapInfo map[string]interface{}) {
- defer qutil.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //bidding库
- session := mgo.GetMgoConn(86400)
- defer mgo.DestoryMongoConn(session)
- //连接信息
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- // c, _ = bidding["collect"].(string)
- c, _ = biddingback["collect"].(string)
- }
- db, _ := biddingback["db"].(string)
- index, _ := biddingback["index"].(string)
- itype, _ := biddingback["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- //线程池
- UpdatesLock := sync.Mutex{}
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- //查询招标数据
- // query := session.DB(db).C(c).Find(q).Select(bson.M{
- // //"projectinfo.attachment": 0,
- // "contenthtml": 0,
- // }).Sort("_id").Iter()
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "contenthtml": 0,
- "s_sha": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- n := 0
- //更新数组
- arrEs := []map[string]interface{}{}
- //对比两张表数据,减少查询次数
- thread := qutil.IntAll(mapInfo["thread"])
- //不传为0只生成招标索引,1生成招标+预览,2只生成预览
- _multiIndex := qutil.IntAll(mapInfo["multiIndex"])
- if thread < 1 {
- thread = 3
- }
- log.Println("es线程数:", thread)
- espool := make(chan bool, thread)
- now1 := time.Now().Unix()
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- if qutil.IntAll(tmp["extracttype"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- } else {
- tmp["extracttype"] = 1
- }
- ct := qutil.Int64All(tmp["comeintime"])
- pt := qutil.Int64All(tmp["publishtime"])
- if pt > ct+86400 || pt > now1 { //时间问题,需要更新
- if ct > now1 {
- ct = now1
- }
- tmp["publishtime"] = ct
- }
- ps, _ := tmp["projectscope"].(string)
- if ps == "" {
- tmp["projectscope"] = "" //= tmp["detail"]
- }
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- tmp["budget"] = nil
- } else if sbd, ok := tmp["budget"].(string); ok {
- tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- tmp["bidamount"] = nil
- } else if sbd, ok := tmp["bidamount"].(string); ok {
- tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- UpdatesLock.Lock()
- newTmp := map[string]interface{}{}
- for _, v := range biddingIndexFields {
- if tmp[v] != nil {
- if "projectinfo" == v {
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- newTmp[v] = newmap
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- } else if v == "budget" || v == "bidamount" {
- newTmp[v] = nil
- }
- }
- arrEs = append(arrEs, newTmp)
- if len(arrEs) >= BulkSizeBack {
- tmps := arrEs
- espool <- true
- go func(tmps []map[string]interface{}) {
- defer func() {
- <-espool
- }()
- if _multiIndex == 0 {
- elastic.BulkSave(index, itype, &tmps, true)
- } else if _multiIndex == 1 && len(multiIndex) == 2 {
- elastic.BulkSave(index, itype, &tmps, true)
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- } else if _multiIndex == 2 && len(multiIndex) == 2 {
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- }
- }(tmps)
- arrEs = []map[string]interface{}{}
- }
- UpdatesLock.Unlock()
- if n%1000 == 0 {
- log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
- }
- tmp = make(map[string]interface{})
- }
- UpdatesLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- if _multiIndex == 0 {
- elastic.BulkSave(index, itype, &tmps, true)
- } else if _multiIndex == 1 && len(multiIndex) == 2 {
- elastic.BulkSave(index, itype, &tmps, true)
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- } else if _multiIndex == 2 && len(multiIndex) == 2 {
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- }
- }
- UpdatesLock.Unlock()
- log.Println(mapInfo, "create biddingback index...over", n)
- }
var (
	// filterReg matches one angle-bracket span such as <p> or </div>: '<',
	// at least one non-'>' character, then '>'. An empty "<>" is not matched.
	filterReg = regexp.MustCompile("<[^>]+>")
	// filterSpace matches one angle-bracket span (including the empty "<>")
	// or a single whitespace character: RE2 \s (ASCII), U+3000 ideographic
	// space, U+2003 em space, or U+00A0 no-break space.
	filterSpace = regexp.MustCompile("<[^>]*?>|[\\s\u3000\u2003\u00a0]")
)

// FilterDetail returns text with every angle-bracket tag removed; all other
// characters, whitespace included, are kept as-is.
func FilterDetail(text string) string {
	return filterReg.ReplaceAllLiteralString(text, "")
}

// FilterDetailSpace returns text with every angle-bracket tag and every
// whitespace character matched by filterSpace removed.
func FilterDetailSpace(text string) string {
	return filterSpace.ReplaceAllLiteralString(text, "")
}
|