package main

import (
	"log"
	"strings"
	"sync"

	qutil "qfw/util"
	elastic "qfw/util/elastic"

	"gopkg.in/mgo.v2/bson"
)

// biddingAllTask synchronises extracted fields into the bidding collection
// and builds the matching elastic documents.
//
// Both the bidding collection and the extract collection are queried with
// the same filter and iterated in ascending _id order, so the two cursors
// can be merged in a single pass instead of issuing one lookup per record.
// For every bidding record:
//
//   - when an extract record with the same _id exists, each configured field
//     (bidding["fields"]) is copied from extract into a $set document, fields
//     present in bidding but missing in extract are collected into $unset,
//     and "extracttype" is set to -1 (extract "repeat" == 1) or 1;
//   - the merged record is post-processed in a worker goroutine and, unless
//     extracttype is -1, reduced to the biddingIndexFields whitelist and
//     queued for elastic;
//   - mongo updates and elastic documents are flushed in batches of BulkSize.
//
// Parameters:
//
//   - data is not used by this function.
//   - mapInfo may carry "query" (a ready-made mongo filter) or, when "query"
//     is absent, "gtid"/"lteid" strings delimiting an _id range; an optional
//     "coll" overrides the collection name from the bidding config.
//
// A missing "gtid"/"lteid" or bidding["fields"] causes a panic on the type
// assertion; the deferred qutil.Catch is presumably what recovers it
// (NOTE(review): confirm against qutil.Catch).
func biddingAllTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()

	// Upper bound on the number of runes kept in "projectscope" once the
	// field exceeds ESLEN bytes.
	const maxProjectScopeRunes = 4000

	// mpool is used as a counting semaphore limiting concurrent workers.
	thread := 40
	mpool := make(chan bool, thread)

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}

	// Bidding database connection.
	session := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(session)
	// Extract database connection.
	extractsession := extractmgo.GetMgoConn()
	defer extractmgo.DestoryMongoConn(extractsession)

	// Connection settings; the collection name may be overridden per task.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	}
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)

	// The count is only used for the log line below, so a failure here is
	// reported but does not abort the task.
	count, err := session.DB(db).C(c).Find(&q).Count()
	if err != nil {
		log.Println("bidding count failed:", err)
	}
	fields := strings.Split(bidding["fields"].(string), ",")

	// Guards arr and arrEs, which are appended to by the workers.
	var updatesLock sync.Mutex
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// Bidding records, without the two large fields excluded by the projection.
	query := session.DB(db).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Sort("_id").Iter()
	// Extract results for the same filter.
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()

	n := 0
	// Pending mongo updates ({selector, modifier} pairs) and elastic documents.
	arr := [][]map[string]interface{}{}
	arrEs := []map[string]interface{}{}

	// compare holds the current extract record; it is carried across
	// iterations so each extract record is read only once.
	var compare bson.M
	// bnil is true when the current extract record is ahead of the bidding
	// record, i.e. the bidding record has no extract counterpart.
	bnil := false

	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
		update := map[string]interface{}{}
		// Fields that have a value in bidding but none in extract.
		del := map[string]interface{}{}

		// Advance the extract cursor until it reaches or passes this _id.
		for {
			if compare == nil {
				compare = make(bson.M)
				if !extractquery.Next(compare) {
					// Extract cursor exhausted; compare stays an empty map.
					break
				}
			}
			cid := qutil.BsonIdToSId(compare["_id"])
			tid := qutil.BsonIdToSId(tmp["_id"])
			if cid == tid {
				bnil = false
				for _, k := range fields {
					v1 := compare[k] // extract value
					v2 := tmp[k]     // bidding value
					switch {
					case v1 != nil:
						update[k] = v1
					case v2 != nil:
						// s_subscopeclass is only unset together with
						// subscopeclass (depends on the order of fields).
						if k == "s_subscopeclass" && del["subscopeclass"] == nil {
							continue
						}
						del[k] = 1
					}
				}
				if qutil.IntAll(compare["repeat"]) == 1 {
					update["extracttype"] = -1
				} else {
					update["extracttype"] = 1
				}
				break
			}
			if cid < tid {
				// Extract record is behind: drop it and read the next one.
				bnil = false
				compare = nil
				continue
			}
			// Extract record is ahead: keep it for a later bidding record.
			bnil = true
			break
		}

		// Everything below is independent per record and runs in a worker.
		mpool <- true
		_id := tmp["_id"]
		go func(tmp, update, compare, del map[string]interface{}, bnil bool) {
			defer func() {
				<-mpool
			}()

			if !bnil && compare != nil {
				// De-duplicate subscopeclass, preserving first-seen order.
				if subscopeclass, _ := compare["subscopeclass"].([]interface{}); subscopeclass != nil {
					seen := map[string]bool{}
					newclass := []string{}
					for _, sc := range subscopeclass {
						sclass, _ := sc.(string)
						if !seen[sclass] {
							seen[sclass] = true
							newclass = append(newclass, sclass)
						}
					}
					update["s_subscopeclass"] = strings.Join(newclass, ",")
					update["subscopeclass"] = newclass
				}
			}

			// Apply the update to the in-memory record so the elastic
			// document reflects the merged values.
			for tk, tv := range update {
				tmp[tk] = tv
			}
			// Temporary handling: a string-typed supervisorrate is not indexed.
			if _, ok := tmp["supervisorrate"].(string); ok {
				delete(tmp, "supervisorrate")
			}
			// Truncate an oversized projectscope. ESLEN is compared against
			// the byte length while the cut is made in runes, so the rune
			// count must be checked before slicing to avoid an out-of-range
			// panic on multi-byte text.
			if ps, _ := tmp["projectscope"].(string); len(ps) > ESLEN {
				if runes := []rune(ps); len(runes) > maxProjectScopeRunes {
					tmp["projectscope"] = string(runes[:maxProjectScopeRunes])
				}
			}
			if filetext := getFileText(tmp); len(filetext) > 0 {
				tmp["filetext"] = filetext
			}
			// Drop empty purchasing / purchasinglist values.
			if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
				delete(tmp, "purchasing")
			}
			if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
				delete(tmp, "purchasinglist")
			}

			updatesLock.Lock()
			if qutil.IntAll(update["extracttype"]) != -1 {
				// Build the elastic document from the whitelisted fields,
				// skipping fields that are about to be unset in mongo.
				newTmp := map[string]interface{}{}
				for _, v := range biddingIndexFields {
					if tmp[v] == nil || del[v] != nil {
						continue
					}
					switch v {
					case "projectinfo":
						if mp, _ := tmp[v].(map[string]interface{}); mp != nil {
							newmap := map[string]interface{}{}
							for _, v1 := range projectinfoFields {
								if mp[v1] != nil {
									newmap[v1] = mp[v1]
								}
							}
							newTmp[v] = newmap
						}
					case "purchasinglist":
						purchasingNew := []map[string]interface{}{}
						pcl, _ := tmp[v].([]interface{})
						for _, ls := range pcl {
							// A non-map entry is skipped rather than
							// asserted: a failed assertion here would
							// panic in a goroutine with no recover.
							lsm, ok := ls.(map[string]interface{})
							if !ok {
								continue
							}
							item := make(map[string]interface{})
							for _, pf := range purchasinglistFields {
								if lsm[pf] != nil {
									item[pf] = lsm[pf]
								}
							}
							if len(item) > 0 {
								purchasingNew = append(purchasingNew, item)
							}
						}
						if len(purchasingNew) > 0 {
							newTmp[v] = purchasingNew
						}
					case "detail":
						detail, _ := tmp[v].(string)
						newTmp[v] = FilterDetail(detail)
					default:
						newTmp[v] = tmp[v]
					}
				}
				arrEs = append(arrEs, newTmp)
			}
			if len(update) > 0 {
				queryId := map[string]interface{}{"_id": tmp["_id"]}
				set := map[string]interface{}{"$set": update}
				if len(del) > 0 {
					set["$unset"] = del
				}
				arr = append(arr, []map[string]interface{}{queryId, set})
			}
			// Flush full batches; fresh slices are allocated afterwards
			// rather than reusing the ones handed to the bulk calls.
			if len(arr) >= BulkSize {
				mgo.UpdateBulkAll(db, c, arr...)
				arr = [][]map[string]interface{}{}
			}
			if len(arrEs) >= BulkSize {
				tmps := arrEs
				elastic.BulkSave(index, itype, &tmps, true)
				if len(multiIndex) == 2 {
					elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
				}
				arrEs = []map[string]interface{}{}
			}
			updatesLock.Unlock()
		}(tmp, update, compare, del, bnil)

		if n%1000 == 0 {
			log.Println("current:", n, _id)
		}
		// The worker owns the previous map now; decode into a fresh one.
		tmp = make(map[string]interface{})
	}

	// Release both cursors and surface any iteration error, which would
	// otherwise look like a normal end of data.
	if err := query.Close(); err != nil {
		log.Println("bidding iterator error:", err)
	}
	if err := extractquery.Close(); err != nil {
		log.Println("extract iterator error:", err)
	}

	// Filling the semaphore to capacity blocks until every worker has
	// released its slot, i.e. until all workers have finished.
	for i := 0; i < thread; i++ {
		mpool <- true
	}

	// Flush whatever is left in the partially filled batches.
	updatesLock.Lock()
	if len(arr) > 0 {
		mgo.UpdateBulkAll(db, c, arr...)
	}
	if len(arrEs) > 0 {
		tmps := arrEs
		elastic.BulkSave(index, itype, &tmps, true)
		if len(multiIndex) == 2 {
			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
		}
	}
	updatesLock.Unlock()
	log.Println(mapInfo, "create bidding index...over", n)
}