123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- package main
- import (
- "fmt"
- "go.mongodb.org/mongo-driver/bson"
- "log"
- "strings"
- "sync"
- util "utils"
- "utils/mongodb"
- )
- //对字段处理 bidamount budget
- //招标数据表和抽取表一一对应开始更新
- func biddingMergeTask(data []byte, mapInfo map[string]interface{}) {
- defer util.Catch()
- thread := 40
- var mpool = make(chan bool, thread)
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //bidding库
- session := biddingMgo.GetMgoConn()
- defer biddingMgo.DestoryMongoConn(session)
- //extract库
- extractsession := extractMgo.GetMgoConn()
- defer extractMgo.DestoryMongoConn(extractsession)
- //连接信息
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- c, _ = bidding["collect"].(string)
- }
- extractc, _ := extract["collect"].(string)
- count, _ := session.DB(biddingMgo.DbName).C(c).Find(&q).Count()
- //线程池
- UpdatesLock := sync.Mutex{}
- log.Println("查询语句:", q, "同步总数:", count)
- //查询招标数据
- query := session.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- extractquery := extractsession.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
- n := 0
- //更新数组
- arr := [][]map[string]interface{}{}
- //对比两张表数据,减少查询次数
- var compare bson.M
- bnil := false
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- update := map[string]interface{}{}
- //对比方法----------------
- for {
- if compare == nil {
- compare = make(bson.M)
- if !extractquery.Next(compare) {
- break
- }
- }
- if compare != nil {
- //对比
- cid := mongodb.BsonIdToSId(compare["_id"])
- tid := mongodb.BsonIdToSId(tmp["_id"])
- if cid == tid {
- bnil = false
- //更新bidding表,生成索引
- for _, k := range biddingMgoFields {
- v1 := compare[k]
- v2 := tmp[k]
- if v2 == nil && v1 != nil {
- update[k] = v1
- } else if v2 != nil && v1 != nil {
- //update[k+"_b"] = v2
- update[k] = v1
- } else if v2 != nil && v1 == nil {
- //update[k+"_b"] = v2
- }
- }
- if util.IntAll(compare["repeat"]) == 1 {
- update["extracttype"] = -1
- //} else if qutil.IntAll(tmp["extracttype"]) == -1 {
- } else {
- update["extracttype"] = 1
- }
- break
- } else {
- if cid < tid {
- bnil = false
- compare = nil
- continue
- } else {
- bnil = true
- break
- }
- }
- } else {
- bnil = false
- break
- }
- }
- //下面可以多线程跑的--->
- //处理分类
- mpool <- true
- go func(tmp, update, compare map[string]interface{}, bnil bool) {
- defer func() {
- <-mpool
- }()
- if !bnil && compare != nil {
- subscopeclass, _ := compare["subscopeclass"].([]interface{})
- if subscopeclass != nil {
- //str := ","
- m1 := map[string]bool{}
- newclass := []string{}
- for _, sc := range subscopeclass {
- sclass, _ := sc.(string)
- if !m1[sclass] {
- m1[sclass] = true
- //str += sclass + ","
- newclass = append(newclass, sclass)
- }
- }
- update["s_subscopeclass"] = strings.Join(newclass, ",")
- update["subscopeclass"] = newclass
- }
- //处理中标企业
- winner, _ := compare["winner"].(string)
- m1 := map[string]bool{}
- if winner != "" {
- m1[winner] = true
- }
- package1 := compare["package"]
- if package1 != nil {
- packageM, _ := package1.(map[string]interface{})
- for _, p := range packageM {
- pm, _ := p.(map[string]interface{})
- pw, _ := pm["winner"].(string)
- if pw != "" {
- m1[pw] = true
- }
- }
- }
- compare = nil
- if len(m1) > 0 {
- //str := ","
- winnerarr := []string{}
- for k, _ := range m1 {
- //str += k + ","
- winnerarr = append(winnerarr, k)
- }
- update["s_winner"] = strings.Join(winnerarr, ",")
- }
- }
- //------------------对比结束
- //处理key descript
- // if bkey == "" {
- // DealInfo(&tmp, &update)
- // }
- //同时保存到elastic
- for tk, tv := range update {
- tmp[tk] = tv
- }
- if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- tmp["budget"] = nil
- } else if sbd, ok := tmp["budget"].(string); ok {
- tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- tmp["bidamount"] = nil
- } else if sbd, ok := tmp["bidamount"].(string); ok {
- tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- // for k1, _ := range tmp {
- // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
- // delete(tmp, k1)
- // }
- // }
- //go IS.Add("bidding")
- UpdatesLock.Lock()
- if len(update) > 0 {
- arr = append(arr, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": update,
- },
- })
- }
- //if len(arr) >= BulkSize-1 {
- // mgo.UpdateBulkAll(db, c, arr...)
- // arr = [][]map[string]interface{}{}
- //}
- UpdatesLock.Unlock()
- }(tmp, update, compare, bnil)
- if n%100 == 0 {
- log.Println("current:", n)
- }
- tmp = make(map[string]interface{})
- }
- for i := 0; i < thread; i++ {
- mpool <- true
- }
- //UpdatesLock.Lock()
- //if len(arr) > 0 {
- // mgo.UpdateBulkAll(db, c, arr...)
- //}
- //UpdatesLock.Unlock()
- log.Println(mapInfo, "merge bidding...over", n)
- }
|