package main import ( "encoding/json" "fmt" "log" mu "mfw/util" qutil "qfw/util" elastic "qfw/util/elastic" "regexp" "strings" "sync" "time" "qfw/util/redis" "gopkg.in/mgo.v2/bson" es "gopkg.in/olivere/elastic.v1" ) var BulkSize = 100 var ESLEN = 32766 //对字段处理 bidamount budget //招标数据表和抽取表一一对应开始更新 var esMap = map[string]bool{} var esLock = sync.Mutex{} func delEsTask() { time.AfterFunc(30*time.Second, delEsTask) esLock.Lock() delEsAndRedis() esLock.Unlock() } func delEsAndRedis() { if len(esMap) > 0 { client := elastic.GetEsConn() defer elastic.DestoryEsConn(client) if client != nil { defer qutil.Catch() req := client.Bulk() var keys = []interface{}{} str := "" for k, _ := range esMap { str += " " + k keys = append(keys, fmt.Sprintf("jypcdetail__rec%s", k)) req = req.Add(es.NewBulkDeleteRequest().Index(index).Type(itype).Id(k)) } res, err := req.Do() if err != nil { log.Println("批量删除es出错", err.Error()) } log.Println("del es", len(res.Succeeded())) log.Println(str) log.Println("del redis", redis.Del("other", keys...)) } else { log.Println("es client get failed") } esMap = map[string]bool{} } } func biddingTask(data []byte, mapInfo map[string]interface{}) { defer qutil.Catch() q, _ := mapInfo["query"].(map[string]interface{}) bkey, _ := mapInfo["bkey"].(string) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //连接信息 //线程池 UpdatesLock := sync.Mutex{} //bidding库 session := mgo.GetMgoConn(86400) defer mgo.DestoryMongoConn(session) count, _ := session.DB(db).C(c).Find(&q).Count() log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) //查询招标数据 query := session.DB(db).C(c).Find(q).Select(bson.M{ "projectinfo.attachment": 0, "contenthtml": 0, }).Sort("_id").Iter() //查询抽取结果 log.Println("查询抽取结果..") //extract库 n := 0 //更新数组 arr := [][]map[string]interface{}{} arrEs := []map[string]interface{}{} //对比两张表数据,减少查询次数 var compare map[string]interface{} log.Println("开始迭代..") con := 0 for tmp := make(map[string]interface{}); query.Next(tmp); n++ { if qutil.IntAll(tmp["extracttype"]) == -1 || ((tmp["buyerclass"] != nil || tmp["projectname"] != nil || tmp["keywords"] != nil) && force == 0) { tmp = make(map[string]interface{}) con++ if con%500 == 0 { log.Println("跳过:", con) } continue } //删除索引和缓存 tid := qutil.BsonIdToSId(tmp["_id"]) if tid != "" { esLock.Lock() esMap[tid] = true if len(esMap) > 100 { delEsAndRedis() } esLock.Unlock() //log.Println("del es ", tid, elastic.DelById(index, itype, tid)) //删除缓存 //redis.DelByCodePattern("other", fmt.Sprintf("*%s*", tid)) } update := map[string]interface{}{} //对比方法---------------- compare = nil obj, bres := mgo.FindById(extractc, tid, nil) if bres && obj != nil { compare = *obj } //extractsession.DB(extractdb).C(extractc).FindId(tmp["_id"]).One(&compare) //下面可以多线程跑的---> //处理分类 if compare != nil { for _, k := range fields { v1 := compare[k] v2 := tmp[k] if v2 == nil && v1 != nil { update[k] = v1 } else if v2 != nil && v1 != nil { update[k] = v1 } } subscopeclass, _ := compare["subscopeclass"].([]interface{}) if subscopeclass != nil { //str := "," m1 := map[string]bool{} newclass := []string{} for _, sc := range subscopeclass { sclass, _ := sc.(string) if !m1[sclass] { m1[sclass] = true //str += sclass + "," newclass = append(newclass, sclass) } } update["s_subscopeclass"] = strings.Join(newclass, ",") update["subscopeclass"] = newclass } //处理中标企业 winner, _ := compare["winner"].(string) m1 := map[string]bool{} if winner != "" { m1[winner] = true } package1 := compare["package"] if package1 != nil { packageM, _ := package1.(map[string]interface{}) for _, p := range packageM { pm, _ := p.(map[string]interface{}) pw, _ := pm["winner"].(string) if pw != "" { m1[pw] = true } } } compare = nil if len(m1) > 0 { //str := "," winnerarr := []string{} for k, _ := range m1 { //str += k + "," winnerarr = append(winnerarr, k) } update["s_winner"] = strings.Join(winnerarr, ",") } } //------------------对比结束 //处理key descript if bkey == "" { DealInfo(&tmp, &update) } //同时保存到elastic for tk, tv := range update { tmp[tk] = tv } //对projectscope字段的索引处理 ps, _ := tmp["projectscope"].(string) if ps == "" { tmp["projectscope"] = "" //= tmp["detail"] } if len(ps) > ESLEN { tmp["projectscope"] = string(([]rune(ps))[:4000]) } if s_budget := fmt.Sprint(tmp["budget"]); s_budget == "" || s_budget == "" || s_budget == "null" { tmp["budget"] = nil } else if sbd, ok := tmp["budget"].(string); ok { tmp["budget"] = ObjToMoney([]interface{}{sbd, sbd})[0] } if s_bidamount := fmt.Sprint(tmp["bidamount"]); s_bidamount == "" || s_bidamount == "" || s_bidamount == "null" { tmp["bidamount"] = nil } else if sbd, ok := tmp["bidamount"].(string); ok { tmp["bidamount"] = ObjToMoney([]interface{}{sbd, sbd})[0] } UpdatesLock.Lock() // for k1, _ := range tmp { // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" { // delete(tmp, k1) // } // } newTmp := map[string]interface{}{} for _, v := range biddingIndexFields { // if tmp[v] != nil { // newTmp[v] = tmp[v] // } if tmp[v] != nil { if "projectinfo" == v { mp, _ := tmp[v].(map[string]interface{}) if mp != nil { newmap := map[string]interface{}{} for _, v1 := range projectinfoFields { if mp[v1] != nil { newmap[v1] = mp[v1] } } newTmp[v] = newmap } } else { if v == "detail" { detail, _ := tmp[v].(string) newTmp[v] = FilterDetail(detail) } else { newTmp[v] = tmp[v] } } } } arrEs = append(arrEs, newTmp) if len(update) > 0 { arr = append(arr, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": update, }, }) } if len(arr) >= BulkSize-1 { mgo.UpdateBulkAll(db, c, arr...) arr = [][]map[string]interface{}{} } if len(arrEs) >= BulkSize-1 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) arrEs = []map[string]interface{}{} } UpdatesLock.Unlock() if n%100 == 0 { log.Println("current:", n) } tmp = make(map[string]interface{}) } UpdatesLock.Lock() if len(arr) > 0 { mgo.UpdateBulkAll(db, c, arr...) } if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) } UpdatesLock.Unlock() log.Println(mapInfo, "create bidding index...over", n, con) } var client *mu.Client var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$") var reg_space = regexp.MustCompile("(?ism)(.*?)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)") var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+") var reg_dh = regexp.MustCompile("[,]+") var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]") var reg_no = regexp.MustCompile("^[0-9]*$") var MSG_SERVER = "123.56.236.148:7070" var DesLen = 120 func inits() { ser := qutil.ObjToString(Sysconfig["msg_server"]) if ser != "" { MSG_SERVER = ser } cf := &mu.ClientConfig{ ClientName: "剑鱼抽关键词", EventHandler: func(p *mu.Packet) {}, MsgServerAddr: MSG_SERVER, CanHandleEvents: []int{}, OnConnectSuccess: func() { log.Println("c.") }, ReadBufferSize: 10, WriteBufferSize: 10, } client, _ = mu.NewClient(cf) } var keypool = make(chan bool, 1) func DealInfo(obj, update *map[string]interface{}) { defer qutil.Catch() if (*obj)["keywords"] != nil && (*obj)["description"] != nil { return } else { (*update)["keywords"] = "" (*update)["description"] = "" } title := qutil.ObjToString((*obj)["title"]) var m [][]string select { case <-func() <-chan bool { select { case keypool <- true: defer func() { <-keypool }() ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1) json.Unmarshal(ret, &m) case <-time.After(10 * time.Millisecond): } ch := make(chan bool, 1) ch <- true return ch }(): case <-time.After(50 * time.Millisecond): } arr := []string{} keyword := []string{} keywordnew := []string{} for _, tmp := range m { if reg.MatchString(tmp[0]) { arr = append(arr, tmp[0]) } else { if len(arr) > 0 { str := strings.Join(arr, "") keyword = append(keyword, str) arr = []string{} } if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) { keyword = append(keyword, tmp[0]) } } } for _, v := range keyword { v = reg_no.ReplaceAllString(v, "") if len(v) > 0 { keywordnew = append(keywordnew, v) } } keywords := strings.Join(keywordnew, ",") (*update)["keywords"] = keywords content := "" if (*obj)["detail_bak"] != nil { content = qutil.ObjToString((*obj)["detail_bak"]) } else { content = qutil.ObjToString((*obj)["detail"]) } //内容替换 content = strings.Replace(content, " ", "", -1) content = reg_space.ReplaceAllString(content, "") content = reg_row.ReplaceAllString(content, ",") content = reg_dh.ReplaceAllString(content, ",") content = reg_newdb.ReplaceAllString(content, "$1") if strings.HasPrefix(content, ",") { content = content[1:] } //log.Println(content) tc := []rune(content) ltc := len(tc) description := content if ltc > DesLen { description = string(tc[:DesLen]) } (*update)["description"] = description //保存到数据库 return } var filterReg = regexp.MustCompile("<[^>]+>") func FilterDetail(text string) string { return filterReg.ReplaceAllString(text, "") }