123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- package main
- import (
- "fmt"
- "log"
- qutil "qfw/util"
- elastic "qfw/util/elastic"
- "strings"
- "sync"
- "gopkg.in/mgo.v2/bson"
- )
// indexfield lists the document keys that biddingDataTask copies into the
// document sent to elastic; keys not listed here are left out of the index
// document. "budget" and "bidamount" are additionally written as explicit
// nil values when they are missing from the source document.
//
// Each key appears exactly once: the list previously named "buyerclass"
// twice, which only caused the same value to be copied a second time.
var indexfield = []string{
	"_id",
	"s_winner",
	"winner",
	"buyerclass",
	"title",
	"detail",
	"area",
	"site",
	"bidopendate",
	"bidopentime",
	"buyer",
	"city",
	"comeintime",
	"href",
	"infoformat",
	"projectcode",
	"projectname",
	"publishtime",
	"s_sha",
	"spidercode",
	"subtype",
	"toptype",
	"agency",
	"budget",
	"bidamount",
	"s_subscopeclass",
	"projectscope",
	"bidstatus",
	"projectinfo",
	"buyertel",
	"buyerperson",
	"projectid",
	"district",
	"topscopeclass",
}
- //招标数据表和抽取表一一对应开始更新
- func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
- defer qutil.Catch()
- q, _ := mapInfo["query"].(map[string]interface{})
- if q == nil {
- q = map[string]interface{}{
- "_id": bson.M{
- "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- }
- //bidding库
- session := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(session)
- //extract库
- extractsession := extractmgo.GetMgoConn()
- defer extractmgo.DestoryMongoConn(extractsession)
- //连接信息
- c, _ := mapInfo["coll"].(string)
- if c == "" {
- c, _ = bidding["collect"].(string)
- }
- extractc, _ := bidding["extractcollect"].(string)
- db, _ := bidding["db"].(string)
- extractdb, _ := bidding["extractdb"].(string)
- index, _ := bidding["index"].(string)
- itype, _ := bidding["type"].(string)
- count, _ := session.DB(db).C(c).Find(&q).Count()
- fields := strings.Split(bidding["fields"].(string), ",")
- //线程池
- UpdatesLock := sync.Mutex{}
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
- //查询招标数据
- query := session.DB(db).C(c).Find(q).Select(bson.M{
- "projectinfo.attachment": 0,
- "contenthtml": 0,
- }).Sort("_id").Iter()
- //查询抽取结果
- extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
- n := 0
- //更新数组
- arrEs := []map[string]interface{}{}
- //对比两张表数据,减少查询次数
- var compare bson.M
- bnil := false
- for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
- update := map[string]interface{}{}
- //对比方法----------------
- for {
- if compare == nil {
- compare = make(bson.M)
- if !extractquery.Next(compare) {
- break
- }
- }
- if compare != nil {
- //对比
- cid := qutil.BsonIdToSId(compare["_id"])
- tid := qutil.BsonIdToSId(tmp["_id"])
- if cid == tid {
- bnil = false
- //更新bidding表,生成索引
- for _, k := range fields {
- v1 := compare[k]
- v2 := tmp[k]
- if v2 == nil && v1 != nil {
- update[k] = v1
- } else if v2 != nil && v1 != nil {
- //update[k+"_b"] = v2
- update[k] = v1
- } else if v2 != nil && v1 == nil {
- update[k] = v2
- }
- }
- if qutil.IntAll(compare["repeat"]) == 1 {
- update["extracttype"] = -1
- } else if qutil.IntAll(tmp["extracttype"]) == -1 {
- update["extracttype"] = 1
- }
- break
- } else {
- if cid < tid {
- bnil = false
- compare = nil
- continue
- } else {
- bnil = true
- break
- }
- }
- } else {
- bnil = false
- break
- }
- }
- //下面可以多线程跑的--->
- //处理分类
- mpool <- true
- go func(tmp, update, compare map[string]interface{}, bnil bool) {
- defer func() {
- <-mpool
- }()
- if !bnil && compare != nil {
- subscopeclass, _ := compare["subscopeclass"].([]interface{})
- if subscopeclass != nil {
- //str := ","
- m1 := map[string]bool{}
- newclass := []string{}
- for _, sc := range subscopeclass {
- sclass, _ := sc.(string)
- if !m1[sclass] {
- m1[sclass] = true
- //str += sclass + ","
- newclass = append(newclass, sclass)
- }
- }
- update["s_subscopeclass"] = strings.Join(newclass, ",")
- update["subscopeclass"] = newclass
- }
- //处理中标企业
- winner, _ := compare["winner"].(string)
- m1 := map[string]bool{}
- if winner != "" {
- m1[winner] = true
- }
- package1 := compare["package"]
- if package1 != nil {
- packageM, _ := package1.(map[string]interface{})
- for _, p := range packageM {
- pm, _ := p.(map[string]interface{})
- pw, _ := pm["winner"].(string)
- if pw != "" {
- m1[pw] = true
- }
- }
- }
- compare = nil
- if len(m1) > 0 {
- //str := ","
- winnerarr := []string{}
- for k, _ := range m1 {
- //str += k + ","
- winnerarr = append(winnerarr, k)
- }
- update["s_winner"] = strings.Join(winnerarr, ",")
- }
- }
- //------------------对比结束
- //处理key descript
- // if bkey == "" {
- // DealInfo(&tmp, &update)
- // }
- //同时保存到elastic
- for tk, tv := range update {
- tmp[tk] = tv
- }
- //对projectscope字段的索引处理
- ps, _ := tmp["projectscope"].(string)
- if ps == "" {
- tmp["projectscope"] = "" //= tmp["detail"]
- }
- if len(ps) > ESLEN {
- tmp["projectscope"] = string(([]rune(ps))[:4000])
- }
- if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "<nil>" || s_budget == "null" {
- tmp["budget"] = nil
- } else if sbd, ok := tmp["budget"].(string); ok {
- tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "<nil>" || s_bidamount == "null" {
- tmp["bidamount"] = nil
- } else if sbd, ok := tmp["bidamount"].(string); ok {
- tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0]
- }
- // for k1, _ := range tmp {
- // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
- // delete(tmp, k1)
- // }
- // }
- //go IS.Add("bidding")
- UpdatesLock.Lock()
- if qutil.IntAll(update["extracttype"]) != -1 {
- newTmp := map[string]interface{}{}
- for _, v := range indexfield {
- if tmp[v] != nil {
- if "projectinfo" == v {
- mp, _ := tmp[v].(map[string]interface{})
- if mp != nil {
- newmap := map[string]interface{}{}
- for _, v1 := range projectinfoFields {
- if mp[v1] != nil {
- newmap[v1] = mp[v1]
- }
- }
- newTmp[v] = newmap
- }
- } else {
- if v == "detail" {
- detail, _ := tmp[v].(string)
- newTmp[v] = FilterDetail(detail)
- } else {
- newTmp[v] = tmp[v]
- }
- }
- } else if v == "budget" || v == "bidamount" {
- newTmp[v] = nil
- }
- }
- arrEs = append(arrEs, newTmp)
- }
- if len(arrEs) >= BulkSize-1 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- if len(multiIndex) == 2 {
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- }
- arrEs = []map[string]interface{}{}
- }
- UpdatesLock.Unlock()
- }(tmp, update, compare, bnil)
- if n%1000 == 0 {
- log.Println("current:", n)
- }
- tmp = make(map[string]interface{})
- }
- UpdatesLock.Lock()
- if len(arrEs) > 0 {
- tmps := arrEs
- elastic.BulkSave(index, itype, &tmps, true)
- if len(multiIndex) == 2 {
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
- }
- }
- UpdatesLock.Unlock()
- log.Println(mapInfo, "create bidding index...over", n)
- }
|