|
@@ -0,0 +1,168 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ qutil "qfw/util"
|
|
|
+ elastic "qfw/util/elastic"
|
|
|
+ "strings"
|
|
|
+ //elastic "qfw/util/elastic_v5"
|
|
|
+ // "strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
+)
|
|
|
+
|
|
|
+func biddingBackTask2(data []byte, mapInfo map[string]interface{}) {
|
|
|
+ defer qutil.Catch()
|
|
|
+ q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
+ if q == nil {
|
|
|
+ q = map[string]interface{}{
|
|
|
+ "_id": bson.M{
|
|
|
+ "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ c, _ := mapInfo["coll"].(string)
|
|
|
+ if c == "" {
|
|
|
+ c, _ = biddingback["collect"].(string)
|
|
|
+ }
|
|
|
+ cs := strings.Split(c, ",")
|
|
|
+ for _, c := range cs {
|
|
|
+ //bidding库
|
|
|
+ session := mgo.GetMgoConn(86400)
|
|
|
+ defer mgo.DestoryMongoConn(session)
|
|
|
+ //连接信息
|
|
|
+
|
|
|
+ db, _ := biddingback["db"].(string)
|
|
|
+ index, _ := biddingback["index"].(string)
|
|
|
+ itype, _ := biddingback["type"].(string)
|
|
|
+ count, _ := session.DB(db).C(c).Find(&q).Count()
|
|
|
+ //线程池
|
|
|
+ UpdatesLock := sync.Mutex{}
|
|
|
+ log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
|
|
|
+ query := session.DB(db).C(c).Find(q).Select(bson.M{
|
|
|
+ "contenthtml": 0,
|
|
|
+ "s_sha": 0,
|
|
|
+ }).Sort("_id").Iter()
|
|
|
+ //查询抽取结果
|
|
|
+ n := 0
|
|
|
+ //更新数组
|
|
|
+ arrEs := []map[string]interface{}{}
|
|
|
+ //对比两张表数据,减少查询次数
|
|
|
+ thread := qutil.IntAll(mapInfo["thread"])
|
|
|
+ //不传为0只生成招标索引,1生成招标+预览,2只生成预览
|
|
|
+ if thread < 1 {
|
|
|
+ thread = 3
|
|
|
+ }
|
|
|
+ log.Println("es线程数:", thread)
|
|
|
+ espool := make(chan bool, thread)
|
|
|
+ now1 := time.Now().Unix()
|
|
|
+ for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
|
|
|
+ if qutil.IntAll(tmp["extracttype"]) == -1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ ct := qutil.Int64All(tmp["comeintime"])
|
|
|
+ pt := qutil.Int64All(tmp["publishtime"])
|
|
|
+ if pt > ct+86400 || pt > now1 { //时间问题,需要更新
|
|
|
+ if ct > now1 {
|
|
|
+ ct = now1
|
|
|
+ }
|
|
|
+ tmp["publishtime"] = ct
|
|
|
+ }
|
|
|
+
|
|
|
+ ps, _ := tmp["projectscope"].(string)
|
|
|
+ if ps == "" {
|
|
|
+ tmp["projectscope"] = "" //= tmp["detail"]
|
|
|
+ }
|
|
|
+ if len(ps) > ESLEN {
|
|
|
+ tmp["projectscope"] = string(([]rune(ps))[:4000])
|
|
|
+ }
|
|
|
+ if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
|
|
|
+ tmp["budget"] = nil
|
|
|
+ } else if sbd, ok := tmp["budget"].(string); ok {
|
|
|
+ tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
|
|
|
+ }
|
|
|
+ if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
|
|
|
+ tmp["bidamount"] = nil
|
|
|
+ } else if sbd, ok := tmp["bidamount"].(string); ok {
|
|
|
+ tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
|
|
|
+ }
|
|
|
+ UpdatesLock.Lock()
|
|
|
+ newTmp := map[string]interface{}{}
|
|
|
+ for _, v := range biddingIndexFields {
|
|
|
+ if tmp[v] != nil {
|
|
|
+ if "projectinfo" == v {
|
|
|
+ //处理附件 content
|
|
|
+ mp, _ := tmp[v].(map[string]interface{})
|
|
|
+ if mp != nil {
|
|
|
+ newmap := map[string]interface{}{}
|
|
|
+ for _, v1 := range projectinfoFields {
|
|
|
+ if mp[v1] != nil {
|
|
|
+ newmap[v1] = mp[v1]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if len(newmap) > 0 {
|
|
|
+ newTmp[v] = newmap
|
|
|
+ }
|
|
|
+ attachments := mp["attachments"]
|
|
|
+ con := ""
|
|
|
+ if attachments != nil {
|
|
|
+ am, _ := attachments.(map[string]interface{})
|
|
|
+ if am != nil {
|
|
|
+ for _, v1 := range am {
|
|
|
+ vm, _ := v1.(map[string]interface{})
|
|
|
+ if vm != nil {
|
|
|
+ c, _ := vm["content"].(string)
|
|
|
+ con += c
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if con != "" {
|
|
|
+ con = FilterDetailSpace(con)
|
|
|
+ newTmp["attachments"] = con
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if v == "detail" {
|
|
|
+ detail, _ := tmp[v].(string)
|
|
|
+ newTmp[v] = FilterDetail(detail)
|
|
|
+ } else {
|
|
|
+ newTmp[v] = tmp[v]
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else if v == "budget" || v == "bidamount" {
|
|
|
+ newTmp[v] = nil
|
|
|
+ }
|
|
|
+ }
|
|
|
+ arrEs = append(arrEs, newTmp)
|
|
|
+ if len(arrEs) >= BulkSizeBack {
|
|
|
+ tmps := arrEs
|
|
|
+ espool <- true
|
|
|
+ go func(tmps []map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-espool
|
|
|
+ }()
|
|
|
+ elastic.BulkSave(index, itype, &tmps, true)
|
|
|
+ }(tmps)
|
|
|
+ arrEs = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ UpdatesLock.Unlock()
|
|
|
+ if n%1000 == 0 {
|
|
|
+ log.Println("current:", n, qutil.BsonIdToSId(tmp["_id"]))
|
|
|
+ }
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ UpdatesLock.Lock()
|
|
|
+ if len(arrEs) > 0 {
|
|
|
+ tmps := arrEs
|
|
|
+ elastic.BulkSave(index, itype, &tmps, true)
|
|
|
+ }
|
|
|
+ UpdatesLock.Unlock()
|
|
|
+ log.Println(mapInfo, "create biddingback2 index...over", c, n)
|
|
|
+ }
|
|
|
+}
|