package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"regexp"
	"strings"
	"sync"
	"time"

	mu "mfw/util"
	qutil "qfw/util"
	elastic "qfw/util/elastic"

	"gopkg.in/mgo.v2/bson"
)

// Field handling for bidamount and budget.
// The bidding table and the extraction table correspond one-to-one; the
// update starts from here.

// biddingTask synchronises one batch of bidding documents.
//
// The batch is selected by mapInfo["query"] when present, otherwise by the
// _id range (mapInfo["gtid"], mapInfo["lteid"]]. The matching documents of the
// extraction collection are loaded into memory first, then the bidding
// documents are loaded and both are handed to doIndex. When the number of
// loaded documents differs from the counted number, the raw task (data) is
// re-sent to the local UDP port after a 20 second pause. Batches of 200000
// documents or more are abandoned.
//
// mapInfo["bkey"] is forwarded to doIndex unchanged.
func biddingTask(data []byte, mapInfo map[string]interface{}) {
	defer qutil.Catch()
	q, _ := mapInfo["query"].(map[string]interface{})
	bkey, _ := mapInfo["bkey"].(string)
	if q == nil {
		// Without an explicit query both range bounds are required; bail out
		// with a clear message instead of panicking on the type assertion.
		gtid, gtOK := mapInfo["gtid"].(string)
		lteid, lteOK := mapInfo["lteid"].(string)
		if !gtOK || !lteOK {
			log.Println("biddingTask: task has neither a query nor a gtid/lteid string pair:", mapInfo)
			return
		}
		q = map[string]interface{}{
			"_id": bson.M{
				"$gt":  qutil.StringTOBsonId(gtid),
				"$lte": qutil.StringTOBsonId(lteid),
			},
		}
	}

	// Connection settings.
	c, _ := bidding["collect"].(string)
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)

	// Extraction database: load every matching document keyed by its id.
	extractsession := extractmgo.GetMgoConn(86400)
	defer extractmgo.DestoryMongoConn(extractsession)
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
	eMap := map[string]map[string]interface{}{}
	for doc := make(map[string]interface{}); extractquery.Next(doc); doc = make(map[string]interface{}) {
		eMap[qutil.BsonIdToSId(doc["_id"])] = doc
	}
	// Next returns false on errors as well as on exhaustion; Close reports
	// which one it was, so a partially loaded eMap does not go unnoticed.
	if err := extractquery.Close(); err != nil {
		log.Println("biddingTask: reading extract collection failed:", err)
	}

	// Bidding database.
	// NOTE(review): the projection below excludes "projectinfo.attachment",
	// while doIndex reads the "attachments" key of projectinfo — confirm the
	// intended key.
	session := mgo.GetMgoConn(86400)
	count, err := session.DB(db).C(c).Find(q).Count()
	if err != nil {
		log.Println("biddingTask: counting bidding documents failed:", err)
	}
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
	n1, n2 := 0, 0
	if count >= 200000 {
		log.Println("数据量太大,放弃!", count)
		mgo.DestoryMongoConn(session)
	} else {
		var res []map[string]interface{}
		err := session.DB(db).C(c).Find(q).Select(bson.M{
			"projectinfo.attachment": 0,
			"contenthtml":            0,
		}).All(&res)
		// The session is released before indexing starts.
		mgo.DestoryMongoConn(session)
		if err != nil {
			log.Println("biddingTask: loading bidding documents failed:", err)
		}
		if len(res) != count {
			// Inconsistent read: wait, then re-queue the same task over UDP.
			log.Println("查询结果不一致", "count:", count, "res:", len(res))
			time.Sleep(20 * time.Second)
			toadd := &net.UDPAddr{
				IP:   net.ParseIP("127.0.0.1"),
				Port: qutil.IntAll(Sysconfig["udpport"]),
			}
			udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
		} else {
			n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey)
			if (n1 + n2) != count {
				log.Println("任务错误,结果不一致")
			}
		}
	}
	log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2)
}

// doIndex merges extraction results into the bidding documents, queues the
// resulting "$set" updates for the bidding collection (db, c) and queues the
// trimmed documents for the elastic index (index, itype). Both queues are
// flushed whenever they reach BulkSize-1 entries and once more at the end.
//
// Entries of eMap that are matched to a bidding document are deleted from
// eMap. When bkey is empty, DealInfo is asked to add keywords/description.
// Documents whose extraction result has repeat == 1 get extracttype -1 and
// are not sent to elastic.
//
// It returns the number of processed documents and a second counter that is
// never incremented in this function.
func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) {
	// Upper bound, in runes, applied to projectscope before indexing.
	const projectScopeMaxRunes = 4000

	n1, n2 := 0, 0
	// The loop below runs on the calling goroutine; the lock is kept around
	// the queue handling as in the original implementation.
	var updatesLock sync.Mutex
	fields := strings.Split(bidding["fields"].(string), ",")
	// Pending updates.
	arr := [][]map[string]interface{}{}
	arrEs := []map[string]interface{}{}

	flushMongo := func() {
		mgo.UpdateBulkAll(db, c, arr...)
		arr = [][]map[string]interface{}{}
	}
	flushEs := func() {
		tmps := arrEs
		elastic.BulkSave(index, itype, &tmps, true)
		if len(multiIndex) == 2 {
			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
		}
		arrEs = []map[string]interface{}{}
	}

	log.Println("开始迭代..")
	for n, tmp := range infos {
		n1++
		update := map[string]interface{}{}

		// Compare the two tables in memory to avoid extra queries.
		tid := qutil.BsonIdToSId(tmp["_id"])
		if extracted := eMap[tid]; extracted != nil {
			delete(eMap, tid)
			biddingMergeExtracted(extracted, fields, update)
		}

		// Keywords and description.
		if bkey == "" {
			DealInfo(&tmp, &update)
		}

		// The elastic document sees the updated values as well.
		for tk, tv := range update {
			tmp[tk] = tv
		}

		// Index handling of the projectscope field.
		ps, _ := tmp["projectscope"].(string)
		if ps == "" {
			tmp["projectscope"] = ""
		}
		if len(ps) > ESLEN {
			// len(ps) counts bytes, the cut counts runes: only slice when
			// there really are more runes than the limit, otherwise the
			// slice expression panics (or pads with NULs) on multi-byte text.
			if runes := []rune(ps); len(runes) > projectScopeMaxRunes {
				tmp["projectscope"] = string(runes[:projectScopeMaxRunes])
			}
		}
		biddingNormalizeMoney(tmp, "budget")
		biddingNormalizeMoney(tmp, "bidamount")

		updatesLock.Lock()
		go IS.Add("bidding")
		if qutil.IntAll(update["extracttype"]) != -1 {
			arrEs = append(arrEs, biddingBuildEsDoc(tmp))
		}
		if len(update) > 0 {
			arr = append(arr, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": update},
			})
		}
		if len(arr) >= BulkSize-1 {
			flushMongo()
		}
		if len(arrEs) >= BulkSize-1 {
			flushEs()
		}
		updatesLock.Unlock()

		if n%100 == 0 {
			log.Println("current:", n)
		}
	}

	// Flush whatever is left in the queues.
	updatesLock.Lock()
	if len(arr) > 0 {
		flushMongo()
	}
	if len(arrEs) > 0 {
		flushEs()
	}
	updatesLock.Unlock()
	return n1, n2
}

// biddingMergeExtracted copies the extraction result of one document into
// update: the configured fields that are non-nil, the extracttype flag, the
// de-duplicated subscopeclass list and the joined winner names.
func biddingMergeExtracted(extracted map[string]interface{}, fields []string, update map[string]interface{}) {
	// A non-nil extracted value always wins over the bidding value.
	for _, k := range fields {
		if v := extracted[k]; v != nil {
			update[k] = v
		}
	}
	if qutil.IntAll(extracted["repeat"]) == 1 {
		update["extracttype"] = -1
	} else {
		update["extracttype"] = 1
	}

	// Classification: keep the first occurrence of every class.
	if classes, _ := extracted["subscopeclass"].([]interface{}); classes != nil {
		seen := map[string]bool{}
		uniq := []string{}
		for _, raw := range classes {
			name, _ := raw.(string)
			if !seen[name] {
				seen[name] = true
				uniq = append(uniq, name)
			}
		}
		update["s_subscopeclass"] = strings.Join(uniq, ",")
		update["subscopeclass"] = uniq
	}

	// Winning companies: the top-level winner plus the winner of each package.
	winners := map[string]bool{}
	if w, _ := extracted["winner"].(string); w != "" {
		winners[w] = true
	}
	packages, _ := extracted["package"].(map[string]interface{})
	for _, p := range packages {
		pm, _ := p.(map[string]interface{})
		if pw, _ := pm["winner"].(string); pw != "" {
			winners[pw] = true
		}
	}
	if len(winners) > 0 {
		// Map iteration order is random, so the order of names is not fixed.
		names := make([]string, 0, len(winners))
		for name := range winners {
			names = append(names, name)
		}
		update["s_winner"] = strings.Join(names, ",")
	}
}

// biddingNormalizeMoney rewrites doc[key] for indexing: a value that prints
// as "" or "null" becomes nil, any other string is passed through ObjToMoney,
// and non-string values are left untouched.
func biddingNormalizeMoney(doc map[string]interface{}, key string) {
	raw := doc[key]
	if s := fmt.Sprint(raw); s == "" || s == "null" {
		doc[key] = nil
	} else if str, ok := raw.(string); ok {
		doc[key] = ObjToMoney([]interface{}{str, str})[0]
	}
}

// biddingBuildEsDoc builds the elastic document for doc. Only the fields in
// biddingIndexFields are copied; projectinfo is reduced to projectinfoFields,
// the content of its attachments is concatenated into "attachments", detail
// goes through FilterDetail, and budget/bidamount are always present (nil
// when the document has no value).
func biddingBuildEsDoc(doc map[string]interface{}) map[string]interface{} {
	esDoc := map[string]interface{}{}
	for _, field := range biddingIndexFields {
		value := doc[field]
		switch {
		case value == nil:
			if field == "budget" || field == "bidamount" {
				esDoc[field] = nil
			}
		case field == "projectinfo":
			info, _ := value.(map[string]interface{})
			if info == nil {
				continue
			}
			kept := map[string]interface{}{}
			for _, name := range projectinfoFields {
				if info[name] != nil {
					kept[name] = info[name]
				}
			}
			esDoc[field] = kept

			con := ""
			attachments, _ := info["attachments"].(map[string]interface{})
			for _, a := range attachments {
				am, _ := a.(map[string]interface{})
				text, _ := am["content"].(string)
				con += text
			}
			if con != "" {
				esDoc["attachments"] = FilterDetailSpace(con)
			}
		case field == "detail":
			detail, _ := value.(string)
			esDoc[field] = FilterDetail(detail)
		default:
			esDoc[field] = value
		}
	}
	return esDoc
}

var (
	// client is the message-server client created by inits.
	client *mu.Client

	// reg matches tokens made only of ASCII letters, digits, '-' and '.'.
	reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
	// reg_space is applied to the detail text before the description is cut.
	// NOTE(review): the first alternative, (.*?), can match the empty string
	// and Go's regexp prefers the leftmost alternative, so as written this
	// pattern appears to match only empty strings — verify against the
	// intended pattern.
	reg_space = regexp.MustCompile("(?ism)(.*?)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
	// reg_row matches opening tr/div/p tags and runs of newlines.
	reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
	// reg_dh matches runs of commas.
	reg_dh = regexp.MustCompile("[,]+")
	// reg_newdb matches a punctuation mark followed by a comma.
	reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
	// reg_no matches strings consisting of digits only.
	reg_no = regexp.MustCompile("^[0-9]*$")

	// MSG_SERVER is the message-server address; inits replaces it with
	// Sysconfig["msg_server"] when that is set.
	MSG_SERVER = "123.56.236.148:7070"
	// DesLen is the maximum description length in runes.
	DesLen = 120
)

// inits creates the message-server client used by DealInfo. It is a regular
// function (not init), so it only runs when called explicitly. The second
// result of mu.NewClient is discarded.
func inits() {
	if ser := qutil.ObjToString(Sysconfig["msg_server"]); ser != "" {
		MSG_SERVER = ser
	}
	cf := &mu.ClientConfig{
		ClientName:       "剑鱼抽关键词",
		EventHandler:     func(p *mu.Packet) {},
		MsgServerAddr:    MSG_SERVER,
		CanHandleEvents:  []int{},
		OnConnectSuccess: func() { log.Println("c.") },
		ReadBufferSize:   10,
		WriteBufferSize:  10,
	}
	client, _ = mu.NewClient(cf)
}

// keypool holds a single token, so at most one segmentation call is in
// flight at a time.
var keypool = make(chan bool, 1)

// DealInfo fills (*update)["keywords"] and (*update)["description"] for a
// document that lacks either of them; documents that already have both are
// left alone.
//
// Keywords come from a segmentation call on the title, which is given at most
// 40ms (and at most 10ms to obtain the keypool token); on timeout keywords
// stay empty. The description is the cleaned detail_bak (or detail) text, cut
// to DesLen runes.
func DealInfo(obj, update *map[string]interface{}) {
	defer qutil.Catch()
	if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
		return
	}
	(*update)["keywords"] = ""
	(*update)["description"] = ""

	title := qutil.ObjToString((*obj)["title"])
	var m [][]string
	select {
	case m = <-biddingSegmentTitle(title):
	case <-time.After(40 * time.Millisecond):
	}

	// Consecutive alphanumeric tokens are glued into one keyword; other
	// tokens qualify when longer than 3 bytes and tagged n*, v, vn or g*.
	// NOTE(review): a run of alphanumeric tokens at the very end of the title
	// is never flushed into the keyword list — confirm that is intended.
	run := []string{}
	keyword := []string{}
	for _, tok := range m {
		if len(tok) == 0 {
			// Malformed entry; skip instead of panicking on tok[0].
			continue
		}
		word := tok[0]
		if reg.MatchString(word) {
			run = append(run, word)
			continue
		}
		if len(run) > 0 {
			keyword = append(keyword, strings.Join(run, ""))
			run = run[:0]
		}
		if len(word) <= 3 || len(tok) < 2 {
			continue
		}
		pos := tok[1]
		if strings.HasPrefix(pos, "n") || pos == "v" || pos == "vn" || strings.HasPrefix(pos, "g") {
			keyword = append(keyword, word)
		}
	}
	// Drop keywords that consist of digits only.
	keywordnew := []string{}
	for _, v := range keyword {
		if v = reg_no.ReplaceAllString(v, ""); len(v) > 0 {
			keywordnew = append(keywordnew, v)
		}
	}
	(*update)["keywords"] = strings.Join(keywordnew, ",")

	var content string
	if (*obj)["detail_bak"] != nil {
		content = qutil.ObjToString((*obj)["detail_bak"])
	} else {
		content = qutil.ObjToString((*obj)["detail"])
	}
	// Content clean-up.
	content = strings.Replace(content, " ", "", -1)
	content = reg_space.ReplaceAllString(content, "")
	content = reg_row.ReplaceAllString(content, ",")
	content = reg_dh.ReplaceAllString(content, ",")
	content = reg_newdb.ReplaceAllString(content, "$1")
	content = strings.TrimPrefix(content, ",")

	description := content
	if tc := []rune(content); len(tc) > DesLen {
		description = string(tc[:DesLen])
	}
	(*update)["description"] = description
}

// biddingSegmentTitle asks the message server to segment title and delivers
// the decoded result on the returned channel. The result is nil when the
// keypool token could not be obtained within 10ms or nothing could be decoded.
//
// The channel is buffered, so the goroutine finishes even when the caller has
// stopped waiting, and the result is only ever handed over through the
// channel — the caller never shares a variable with the goroutine.
func biddingSegmentTitle(title string) <-chan [][]string {
	out := make(chan [][]string, 1)
	go func() {
		var words [][]string
		select {
		case keypool <- true:
			defer func() {
				<-keypool
			}()
			ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1)
			// Best effort: an undecodable reply simply yields no keywords.
			json.Unmarshal(ret, &words)
		case <-time.After(10 * time.Millisecond):
		}
		out <- words
	}()
	return out
}