package main

import (
	"fmt"
	"log"
	"strings"
	"sync"

	qutil "qfw/util"
	elastic "qfw/util/elastic"

	"gopkg.in/mgo.v2/bson"
)

// indexfield lists the document keys that are copied into the Elasticsearch
// document built for every bidding record. "budget" and "bidamount" are
// special-cased by the builder: they are written as explicit nulls when the
// record has no value for them.
var indexfield = []string{
	"_id", "s_winner", "winner", "buyerclass", "title", "detail", "area",
	"site", "bidopendate", "bidopentime", "buyer", "city", "comeintime",
	"href", "infoformat", "projectcode", "projectname", "publishtime",
	"s_sha", "spidercode", "subtype", "toptype", "agency", "budget",
	"bidamount", "s_subscopeclass", "projectscope", "bidstatus",
	"projectinfo", "buyertel", "buyerperson", "projectid", "district",
	"topscopeclass",
}

// biddingDataTask walks the bidding collection and the extraction collection
// side by side (both sorted by _id), merges each bidding record with the
// extraction record that has the same _id, and bulk-saves the result into
// Elasticsearch.
//
// data is not used. mapInfo may carry:
//   - "query": a ready-made Mongo filter. When it is absent the filter is the
//     _id range ("gtid", "lteid"], and both values must then be strings.
//   - "coll": overrides the bidding collection name taken from the bidding
//     configuration ("collect").
//
// The remaining settings ("extractcollect", "db", "extractdb", "index",
// "type", "fields") are read from the package-level bidding configuration.
func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}

	// Connection to the bidding database.
	session := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(session)
	// Connection to the extraction database.
	extractsession := extractmgo.GetMgoConn()
	defer extractmgo.DestoryMongoConn(extractsession)

	// Collection / index settings.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	}
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)

	// The count is informational only, so a failure is logged and ignored.
	count, err := session.DB(db).C(c).Find(&q).Count()
	if err != nil {
		log.Println("bidding count error:", err)
	}
	fields := strings.Split(bidding["fields"].(string), ",")
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// Bidding records, without the heavy fields that are never indexed.
	query := session.DB(db).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Sort("_id").Iter()
	// Extraction results for the same filter.
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
	// Close both cursors and surface iteration errors, which Next alone hides.
	defer func() {
		if err := query.Close(); err != nil {
			log.Println("bidding iterator error:", err)
		}
		if err := extractquery.Close(); err != nil {
			log.Println("extract iterator error:", err)
		}
	}()

	var (
		wg      sync.WaitGroup
		esLock  sync.Mutex // guards pending and serialises bulk saves
		pending = []map[string]interface{}{}
	)
	// flush sends the buffered documents to Elasticsearch. The caller must
	// hold esLock.
	flush := func() {
		if len(pending) == 0 {
			return
		}
		batch := pending
		elastic.BulkSave(index, itype, &batch, true)
		if len(multiIndex) == 2 {
			elastic.BulkSave(multiIndex[0], multiIndex[1], &batch, true)
		}
		pending = []map[string]interface{}{}
	}

	// Merge-join state: compare is the extraction record currently under the
	// cursor; it is kept across iterations so each record is fetched once.
	var compare bson.M
	extractDone := false
	n := 0
	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
		update := map[string]interface{}{}

		// Advance the extraction cursor until it reaches or passes tmp's _id.
		// extract stays nil when tmp has no extraction counterpart.
		var extract bson.M
		tid := qutil.BsonIdToSId(tmp["_id"])
		for !extractDone {
			if compare == nil {
				compare = make(bson.M)
				if !extractquery.Next(compare) {
					compare = nil
					extractDone = true
					break
				}
			}
			cid := qutil.BsonIdToSId(compare["_id"])
			if cid < tid {
				// Extraction record without a bidding record: skip it.
				compare = nil
				continue
			}
			if cid == tid {
				extract = compare
			}
			// cid > tid: the extraction side is ahead; keep compare for the
			// following bidding records.
			break
		}

		if extract != nil {
			// The extracted value wins; the bidding value is the fallback.
			for _, k := range fields {
				if v := extract[k]; v != nil {
					update[k] = v
				} else if v := tmp[k]; v != nil {
					update[k] = v
				}
			}
			if qutil.IntAll(extract["repeat"]) == 1 {
				update["extracttype"] = -1
			} else if qutil.IntAll(tmp["extracttype"]) == -1 {
				update["extracttype"] = 1
			}
		}

		// Everything below is independent per record and runs concurrently.
		// mpool is used as a semaphore: a slot is taken here and released by
		// the worker when it finishes.
		mpool <- true
		wg.Add(1)
		go func(tmp, update, extract map[string]interface{}) {
			defer wg.Done()
			defer func() { <-mpool }()
			// Same guard the task itself installs; an unrecovered panic in a
			// goroutine would otherwise terminate the whole process.
			// NOTE(review): presumes qutil.Catch recovers and logs - confirm.
			defer qutil.Catch()

			if extract != nil {
				applyBiddingExtraction(update, extract)
			}
			// Overlay the merged values onto the bidding record.
			for k, v := range update {
				tmp[k] = v
			}
			normalizeBiddingDoc(tmp)

			esLock.Lock()
			defer esLock.Unlock()
			// Records flagged with extracttype -1 are not indexed.
			if qutil.IntAll(update["extracttype"]) != -1 {
				// Built while holding the lock: FilterDetail's goroutine
				// safety cannot be confirmed from this file.
				pending = append(pending, buildBiddingIndexDoc(tmp))
			}
			if len(pending) >= BulkSize-1 {
				flush()
			}
		}(tmp, update, extract)

		if n%1000 == 0 {
			log.Println("current:", n)
		}
		// The worker now owns the old map; decode the next record into a
		// fresh one.
		tmp = make(map[string]interface{})
	}

	// Wait for every worker before the final flush, otherwise records still
	// being processed would never reach Elasticsearch.
	wg.Wait()
	esLock.Lock()
	flush()
	esLock.Unlock()
	log.Println(mapInfo, "create bidding index...over", n)
}

// biddingAsMap returns v as a string-keyed map, or nil when v is not a
// document. Both plain maps and bson.M are accepted.
// NOTE(review): nested documents decoded into a bson.M target are expected to
// surface as bson.M rather than map[string]interface{} - confirm against the
// mgo bson decoder.
func biddingAsMap(v interface{}) map[string]interface{} {
	switch m := v.(type) {
	case map[string]interface{}:
		return m
	case bson.M:
		return m
	}
	return nil
}

// applyBiddingExtraction derives the classification and winner fields from
// the extraction record and stores them in update.
func applyBiddingExtraction(update, extract map[string]interface{}) {
	// De-duplicate the sub-classification list, keeping first-seen order.
	if classes, _ := extract["subscopeclass"].([]interface{}); classes != nil {
		seen := make(map[string]bool, len(classes))
		unique := make([]string, 0, len(classes))
		for _, c := range classes {
			name, _ := c.(string) // non-string entries collapse to ""
			if !seen[name] {
				seen[name] = true
				unique = append(unique, name)
			}
		}
		update["s_subscopeclass"] = strings.Join(unique, ",")
		update["subscopeclass"] = unique
	}

	// Collect the distinct winners: the record-level winner first, then the
	// winner of every package (package order is map order, i.e. unspecified).
	seen := map[string]bool{}
	var winners []string
	add := func(w string) {
		if w != "" && !seen[w] {
			seen[w] = true
			winners = append(winners, w)
		}
	}
	winner, _ := extract["winner"].(string)
	add(winner)
	for _, p := range biddingAsMap(extract["package"]) {
		pw, _ := biddingAsMap(p)["winner"].(string)
		add(pw)
	}
	if len(winners) > 0 {
		update["s_winner"] = strings.Join(winners, ",")
	}
}

// normalizeBiddingDoc prepares projectscope, budget and bidamount for
// indexing, modifying doc in place.
func normalizeBiddingDoc(doc map[string]interface{}) {
	const maxScopeRunes = 4000

	ps, _ := doc["projectscope"].(string)
	if ps == "" {
		// Missing, empty or non-string values are indexed as "".
		doc["projectscope"] = ""
	}
	if len(ps) > ESLEN {
		// len(ps) counts bytes, so check the rune count before slicing: a
		// multi-byte string can exceed ESLEN with fewer than 4000 runes.
		if r := []rune(ps); len(r) > maxScopeRunes {
			doc["projectscope"] = string(r[:maxScopeRunes])
		}
	}

	normalizeBiddingMoney(doc, "budget")
	normalizeBiddingMoney(doc, "bidamount")
}

// normalizeBiddingMoney clears doc[key] when it holds no usable amount and
// converts string amounts with ObjToMoney. Other value types are left as is.
func normalizeBiddingMoney(doc map[string]interface{}, key string) {
	switch fmt.Sprint(doc[key]) {
	case "", "<nil>", "null": // "<nil>" is what fmt.Sprint prints for nil
		doc[key] = nil
		return
	}
	if s, ok := doc[key].(string); ok {
		doc[key] = ObjToMoney([]interface{}{s, s})[0]
	}
}

// buildBiddingIndexDoc copies the keys listed in indexfield from doc into a
// new map: projectinfo is reduced to projectinfoFields, detail goes through
// FilterDetail, and budget/bidamount are kept as explicit nulls when absent.
func buildBiddingIndexDoc(doc map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(indexfield))
	for _, key := range indexfield {
		val := doc[key]
		switch {
		case val == nil:
			if key == "budget" || key == "bidamount" {
				out[key] = nil
			}
		case key == "projectinfo":
			if info := biddingAsMap(val); info != nil {
				trimmed := map[string]interface{}{}
				for _, f := range projectinfoFields {
					if info[f] != nil {
						trimmed[f] = info[f]
					}
				}
				out[key] = trimmed
			}
		case key == "detail":
			detail, _ := val.(string)
			out[key] = FilterDetail(detail)
		default:
			out[key] = val
		}
	}
	return out
}