package main

import (
	"fmt"
	"log"
	"strings"
	"sync"

	"go.mongodb.org/mongo-driver/bson"

	util "utils"
	"utils/mongodb"
)

// biddingMergeTask walks the bidding collection and the extract collection in
// parallel, both sorted by _id, and builds a "$set" update for every bidding
// document that has an extract document with the same _id.
//
// mapInfo keys read by this function:
//   - "query":  optional filter (map[string]interface{}) applied to BOTH
//     collections; when absent the range ("gtid", "lteid"] on _id is used.
//   - "gtid", "lteid": _id bounds as strings, required when "query" is absent.
//   - "coll":   optional bidding collection name; falls back to
//     bidding["collect"].
//
// data is not used by this function.
//
// NOTE(review): the collected updates are only accumulated in memory. The bulk
// write that used to persist them is disabled (it was commented out):
//
//	mgo.UpdateBulkAll(db, c, arr...)
//
// so, as written, this function does not modify the database.
func biddingMergeTask(data []byte, mapInfo map[string]interface{}) {
	defer util.Catch()

	// Maximum number of documents post-processed concurrently.
	const thread = 40

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		gtid, okGt := mapInfo["gtid"].(string)
		lteid, okLte := mapInfo["lteid"].(string)
		if !okGt || !okLte {
			log.Println("biddingMergeTask: no query and no string gtid/lteid in", mapInfo)
			return
		}
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  mongodb.StringTOBsonId(gtid),
				"$lte": mongodb.StringTOBsonId(lteid),
			},
		}
	}

	// Bidding database connection.
	session := biddingMgo.GetMgoConn()
	defer biddingMgo.DestoryMongoConn(session)
	// Extract database connection.
	extractsession := extractMgo.GetMgoConn()
	defer extractMgo.DestoryMongoConn(extractsession)

	// Collection names.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	}
	extractc, _ := extract["collect"].(string)

	count, err := session.DB(biddingMgo.DbName).C(c).Find(q).Count()
	if err != nil {
		log.Println("biddingMergeTask: count failed:", err)
	}
	log.Println("查询语句:", q, "同步总数:", count)

	// Bidding documents, without the two large fields that are not needed here.
	query := session.DB(biddingMgo.DbName).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Sort("_id").Iter()
	// Extract results for the same filter, in the same order.
	extractquery := extractsession.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()

	var (
		mu  sync.Mutex                 // guards arr
		arr [][]map[string]interface{} // pending {selector, {"$set": update}} pairs
		wg  sync.WaitGroup             // tracks in-flight workers
		sem = make(chan struct{}, thread)

		// compare is the extract document the extract cursor currently points
		// at. It is carried across bidding documents so that each extract
		// document is fetched only once (merge-join over two sorted cursors).
		compare bson.M
		n       int
	)

	for {
		tmp := make(map[string]interface{})
		if !query.Next(tmp) {
			break
		}
		update := map[string]interface{}{}
		tid := mongodb.BsonIdToSId(tmp["_id"])

		// Advance the extract cursor until it reaches or passes tid.
		matched := false
		for !matched {
			if compare == nil {
				compare = make(bson.M)
				if !extractquery.Next(compare) {
					// Extract cursor exhausted: nothing left to match.
					compare = nil
					break
				}
			}
			cid := mongodb.BsonIdToSId(compare["_id"])
			if cid > tid {
				// Extract is ahead; keep it for a later bidding document.
				break
			}
			if cid < tid {
				// Extract is behind; drop it and fetch the next one.
				compare = nil
				continue
			}
			biddingMergeFields(update, compare)
			matched = true
		}

		// Only a matched extract document is handed to the worker.
		var extracted map[string]interface{}
		if matched {
			extracted = compare
		}

		// Per-document post-processing is independent and runs concurrently,
		// bounded by sem.
		sem <- struct{}{}
		wg.Add(1)
		go func(doc, update, extracted map[string]interface{}) {
			defer wg.Done()
			defer func() { <-sem }()
			// The Catch deferred by biddingMergeTask cannot recover a panic
			// raised in this goroutine, so the worker installs its own.
			// NOTE(review): assumes util.Catch recovers, as its use at the top
			// of this function suggests — confirm.
			defer util.Catch()

			if extracted != nil {
				biddingMergeEnrich(update, extracted)
			}

			// Mirror the update onto the in-memory document and normalise the
			// money fields (this was the document formerly sent to elastic).
			for k, v := range update {
				doc[k] = v
			}
			biddingMergeMoney(doc, "budget")
			biddingMergeMoney(doc, "bidamount")

			if len(update) == 0 {
				return
			}
			mu.Lock()
			arr = append(arr, []map[string]interface{}{
				{"_id": doc["_id"]},
				{"$set": update},
			})
			// Disabled incremental flush, kept for reference:
			//if len(arr) >= BulkSize-1 {
			//	mgo.UpdateBulkAll(db, c, arr...)
			//	arr = [][]map[string]interface{}{}
			//}
			mu.Unlock()
		}(tmp, update, extracted)

		if n%100 == 0 {
			log.Println("current:", n)
		}
		n++
	}

	// Wait for every worker before reporting completion.
	wg.Wait()

	// Disabled final flush, kept for reference:
	//if len(arr) > 0 {
	//	mgo.UpdateBulkAll(db, c, arr...)
	//}
	log.Println(mapInfo, "merge bidding...over", n)
}

// biddingMergeFields copies every field listed in biddingMgoFields that has a
// non-nil value in the extract document into update (the extract value wins
// over whatever the bidding document holds) and sets "extracttype" to -1 when
// the extract document's "repeat" is 1, otherwise to 1.
func biddingMergeFields(update, extracted map[string]interface{}) {
	for _, k := range biddingMgoFields {
		if v := extracted[k]; v != nil {
			update[k] = v
		}
	}
	if util.IntAll(extracted["repeat"]) == 1 {
		update["extracttype"] = -1
	} else {
		update["extracttype"] = 1
	}
}

// biddingMergeEnrich derives the classification and winner fields of update
// from the matched extract document:
//   - "subscopeclass" / "s_subscopeclass": de-duplicated class list, in first-seen
//     order, and its comma-joined form (only when the extract document carries
//     a []interface{} under "subscopeclass").
//   - "s_winner": comma-joined set of the top-level "winner" and every
//     package-level "winner". The order is unspecified because it comes from
//     map iteration.
func biddingMergeEnrich(update, extracted map[string]interface{}) {
	if classes, _ := extracted["subscopeclass"].([]interface{}); classes != nil {
		seen := make(map[string]bool, len(classes))
		uniq := make([]string, 0, len(classes))
		for _, c := range classes {
			// Non-string entries collapse to "" exactly as a failed assertion does.
			s, _ := c.(string)
			if !seen[s] {
				seen[s] = true
				uniq = append(uniq, s)
			}
		}
		update["s_subscopeclass"] = strings.Join(uniq, ",")
		update["subscopeclass"] = uniq
	}

	winners := map[string]bool{}
	if w, _ := extracted["winner"].(string); w != "" {
		winners[w] = true
	}
	// Ranging over a nil map is a no-op, so a missing or differently typed
	// "package" value simply contributes nothing.
	pkgs, _ := extracted["package"].(map[string]interface{})
	for _, p := range pkgs {
		pm, _ := p.(map[string]interface{})
		if pw, _ := pm["winner"].(string); pw != "" {
			winners[pw] = true
		}
	}
	if len(winners) == 0 {
		return
	}
	names := make([]string, 0, len(winners))
	for w := range winners {
		names = append(names, w)
	}
	update["s_winner"] = strings.Join(names, ",")
}

// biddingMergeMoney normalises the money field key of doc in place: an empty,
// nil ("<nil>" is what fmt.Sprint prints for nil) or "null" value becomes nil,
// and any other string value is converted with ObjToMoney. Non-string values
// are left untouched.
func biddingMergeMoney(doc map[string]interface{}, key string) {
	switch fmt.Sprint(doc[key]) {
	case "", "<nil>", "null":
		doc[key] = nil
		return
	}
	if s, ok := doc[key].(string); ok {
		doc[key] = ObjToMoney([]interface{}{s, s})[0]
	}
}