package main

import (
	//	"fmt"
	"log"
	qutil "qfw/util"
	elastic "qfw/util/elastic"
	"strings"
	"sync"

	"gopkg.in/mgo.v2/bson"
)

// indexfield lists the document fields that are copied into the elastic
// index document. "budget" and "bidamount" get special handling in
// biddingDataTask: they are written as explicit nil when missing.
// NOTE(review): "buyerclass" appears twice; the duplicate is harmless in
// biddingDataTask (the same key is assigned twice) and is kept because this
// slice may be read elsewhere.
var indexfield = []string{
	"_id",
	"s_winner",
	"winner",
	"buyerclass",
	"title",
	"detail",
	"area",
	"site",
	"bidopendate",
	"bidopentime",
	"buyer",
	"city",
	"comeintime",
	"href",
	"infoformat",
	"projectcode",
	"projectname",
	"publishtime",
	"s_sha",
	"spidercode",
	"subtype",
	"toptype",
	"agency",
	"budget",
	"bidamount",
	"s_subscopeclass",
	"projectscope",
	"bidstatus",
	"projectinfo",
	"buyertel",
	"buyerperson",
	"projectid",
	"buyerclass",
	"district",
	"topscopeclass",
	"attachments",
}

// biddingDataTask walks the bidding collection and the extraction-result
// collection side by side (both sorted by _id), merges every bidding document
// with the extraction result that has the same _id, and bulk-saves the merged
// documents to the elastic index.
//
// data is not used. mapInfo may carry:
//   - "query": a ready-made query document. When it is absent, an _id range
//     query is built from "gtid" (exclusive lower bound) and "lteid"
//     (inclusive upper bound), which must then both be strings.
//   - "coll": the bidding collection name, overriding bidding["collect"].
//
// Documents whose extraction result has repeat == 1 are marked with
// extracttype -1 and are not sent to elastic.
func biddingDataTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()

	const (
		// workers bounds the number of documents processed concurrently.
		workers = 40
		// projectScopeMaxRunes is the length projectscope is truncated to
		// once its byte length exceeds ESLEN.
		projectScopeMaxRunes = 4000
	)
	// mpool is used as a counting semaphore: one slot per running worker.
	mpool := make(chan bool, workers)

	q, _ := mapInfo["query"].(map[string]interface{})
	if q == nil {
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}

	// Connection to the bidding database.
	session := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(session)
	// Connection to the extraction-result database.
	extractsession := extractmgo.GetMgoConn()
	defer extractmgo.DestoryMongoConn(extractsession)

	// Collection, database and index names.
	c, _ := mapInfo["coll"].(string)
	if c == "" {
		c, _ = bidding["collect"].(string)
	}
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)

	// The count is only used for the log line below, so a failure is
	// reported but does not abort the task.
	count, err := session.DB(db).C(c).Find(&q).Count()
	if err != nil {
		log.Println("count bidding documents:", err)
	}
	fields := strings.Split(bidding["fields"].(string), ",")

	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)

	// Iterator over the bidding documents; the two large fields excluded
	// here are not needed for indexing.
	query := session.DB(db).C(c).Find(q).Select(bson.M{
		"projectinfo.attachment": 0,
		"contenthtml":            0,
	}).Sort("_id").Iter()
	defer func() {
		if err := query.Close(); err != nil {
			log.Println("bidding iterator:", err)
		}
	}()

	// Iterator over the extraction results, using the same query and order.
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
	defer func() {
		if err := extractquery.Close(); err != nil {
			log.Println("extract iterator:", err)
		}
	}()

	// mu guards arrEs, the batch of documents waiting to be bulk-saved.
	var mu sync.Mutex
	arrEs := []map[string]interface{}{}

	// enqueue appends doc to the pending batch (when doc is non-nil) and
	// flushes the batch once it holds at least BulkSize-1 documents. The
	// flush check runs even when nothing was appended. The lock is released
	// with defer so that a panic handled further up the worker cannot leave
	// it held.
	enqueue := func(doc map[string]interface{}) {
		mu.Lock()
		defer mu.Unlock()
		if doc != nil {
			arrEs = append(arrEs, doc)
		}
		if len(arrEs) >= BulkSize-1 {
			tmps := arrEs
			elastic.BulkSave(index, itype, &tmps, false)
			arrEs = []map[string]interface{}{}
		}
	}

	// indexDoc builds the document sent to elastic from the merged bidding
	// document, keeping only the fields named in indexfield.
	indexDoc := func(src map[string]interface{}) map[string]interface{} {
		doc := map[string]interface{}{}
		for _, f := range indexfield {
			val := src[f]
			switch {
			case val == nil:
				// budget and bidamount are always present in the index
				// document, as nil when the source has no value.
				if f == "budget" || f == "bidamount" {
					doc[f] = nil
				}
			case f == "projectinfo":
				// Only the whitelisted projectinfo sub-fields are indexed.
				// (Extraction of attachment content into "attachments" is
				// disabled.)
				info, _ := val.(map[string]interface{})
				if info == nil {
					continue
				}
				kept := map[string]interface{}{}
				for _, sub := range projectinfoFields {
					if info[sub] != nil {
						kept[sub] = info[sub]
					}
				}
				doc[f] = kept
			case f == "detail":
				detail, _ := val.(string)
				doc[f] = FilterDetail(detail)
			default:
				doc[f] = val
			}
		}
		return doc
	}

	n := 0
	// compare holds the extraction result currently under comparison; it is
	// carried across iterations so each result is fetched only once.
	var compare bson.M
	// bnil is true when the current extraction result is ahead of the
	// bidding document, i.e. the bidding document has no extraction result.
	bnil := false

	for tmp := make(map[string]interface{}); query.Next(tmp); n++ {
		update := map[string]interface{}{}

		// Merge-join step: advance the extraction iterator until its _id is
		// equal to or greater than the bidding document's _id.
		for {
			if compare == nil {
				compare = make(bson.M)
				if !extractquery.Next(compare) {
					// No more extraction results; compare stays an empty,
					// non-nil map.
					break
				}
			}
			cid := qutil.BsonIdToSId(compare["_id"])
			tid := qutil.BsonIdToSId(tmp["_id"])
			if cid == tid {
				bnil = false
				// For every configured field the extraction value wins;
				// otherwise the bidding value is kept.
				for _, k := range fields {
					if v := compare[k]; v != nil {
						update[k] = v
					} else if v := tmp[k]; v != nil {
						update[k] = v
					}
				}
				if qutil.IntAll(compare["repeat"]) == 1 {
					update["extracttype"] = -1
				} else {
					update["extracttype"] = 1
				}
				break
			}
			if cid < tid {
				// The extraction result is behind; drop it and fetch the
				// next one.
				bnil = false
				compare = nil
				continue
			}
			// The extraction result is ahead; keep it for a later bidding
			// document.
			bnil = true
			break
		}

		// Everything below is independent per document and runs in a
		// bounded worker goroutine.
		mpool <- true
		go func(tmp, update, compare map[string]interface{}, bnil bool) {
			defer func() {
				<-mpool
			}()
			// recover only works on the panicking goroutine, so the
			// function-level Catch does not cover workers.
			// NOTE(review): assumes qutil.Catch recovers and reports the
			// panic, as its use at the top of this function suggests.
			defer qutil.Catch()

			if !bnil && compare != nil {
				// De-duplicate subscopeclass, preserving first-seen order.
				// (Aggregating winners into s_winner is disabled.)
				subscopeclass, _ := compare["subscopeclass"].([]interface{})
				if subscopeclass != nil {
					seen := map[string]bool{}
					newclass := []string{}
					for _, sc := range subscopeclass {
						sclass, _ := sc.(string)
						if !seen[sclass] {
							seen[sclass] = true
							newclass = append(newclass, sclass)
						}
					}
					update["s_subscopeclass"] = strings.Join(newclass, ",")
					update["subscopeclass"] = newclass
				}
			}

			// Apply the merged values to the bidding document.
			for tk, tv := range update {
				tmp[tk] = tv
			}

			// Normalise projectscope: a missing or non-string value becomes
			// "", and over-long text is truncated.
			ps, _ := tmp["projectscope"].(string)
			if ps == "" {
				tmp["projectscope"] = ""
			}
			if len(ps) > ESLEN {
				// len(ps) counts bytes while the cut is made in runes, so
				// the rune count must be checked before slicing; multi-byte
				// text can exceed ESLEN bytes with fewer than
				// projectScopeMaxRunes runes.
				if runes := []rune(ps); len(runes) > projectScopeMaxRunes {
					tmp["projectscope"] = string(runes[:projectScopeMaxRunes])
				}
			}
			// (Normalisation of budget/bidamount strings into money values
			// is disabled.)

			// Documents flagged as repeats (extracttype -1) are not indexed.
			var doc map[string]interface{}
			if qutil.IntAll(update["extracttype"]) != -1 {
				doc = indexDoc(tmp)
			}
			enqueue(doc)
		}(tmp, update, compare, bnil)

		if n%1000 == 0 {
			log.Println("current:", n)
		}
		// The worker now owns tmp; use a fresh map for the next document.
		tmp = make(map[string]interface{})
	}

	// Filling every semaphore slot blocks until all workers have finished.
	for i := 0; i < workers; i++ {
		mpool <- true
	}

	// Flush whatever is left in the batch.
	mu.Lock()
	if len(arrEs) > 0 {
		tmps := arrEs
		log.Println(tmps[0])
		elastic.BulkSave(index, itype, &tmps, false)
	}
	mu.Unlock()

	log.Println(mapInfo, "create bidding index...over", n)
}