package main import ( "encoding/json" "reflect" //"fmt" "log" mu "mfw/util" "net" qutil "qfw/util" elastic "qfw/util/elastic" "regexp" "strings" "sync" "time" "gopkg.in/mgo.v2/bson" ) //对字段处理 bidamount budget //招标数据表和抽取表一一对应开始更新 func biddingTask(data []byte, mapInfo map[string]interface{}) { defer qutil.Catch() q, _ := mapInfo["query"].(map[string]interface{}) bkey, _ := mapInfo["bkey"].(string) if q == nil { q = map[string]interface{}{ "_id": bson.M{ "$gt": qutil.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": qutil.StringTOBsonId(mapInfo["lteid"].(string)), }, } } //连接信息 c, _ := bidding["collect"].(string) extractc, _ := bidding["extractcollect"].(string) db, _ := bidding["db"].(string) extractdb, _ := bidding["extractdb"].(string) index, _ := bidding["index"].(string) itype, _ := bidding["type"].(string) //extract库 extractsession := extractmgo.GetMgoConn(86400) defer extractmgo.DestoryMongoConn(extractsession) extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter() eMap := map[string]map[string]interface{}{} for tmp := make(map[string]interface{}); extractquery.Next(tmp); { tid := qutil.BsonIdToSId(tmp["_id"]) eMap[tid] = tmp tmp = make(map[string]interface{}) } //bidding库 session := mgo.GetMgoConn(86400) count, _ := session.DB(db).C(c).Find(&q).Count() log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index) n1, n2 := 0, 0 if count < 200000 { res := make([]map[string]interface{}, 1) session.DB(db).C(c).Find(q).Select(bson.M{ "projectinfo.attachment": 0, "contenthtml": 0, }).All(&res) mgo.DestoryMongoConn(session) if len(res) != count { log.Println("查询结果不一致", "count:", count, "res:", len(res)) time.Sleep(20 * time.Second) toadd := &net.UDPAddr{ IP: net.ParseIP("127.0.0.1"), Port: qutil.IntAll(Sysconfig["udpport"]), } udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd) } else { n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey) if (n1 + n2) != count { log.Println("任务错误,结果不一致") } } } else { log.Println("数据量太大,放弃!", count) mgo.DestoryMongoConn(session) } log.Println(mapInfo, "create bidding index...over", "all:", count, "n1:", n1, "n2:", n2) //go delEs(mapInfo, index, itype, db, c) //删除索引 } //删除索引 // func delEs(mapInfo map[string]interface{}, index, itype, db, c string) { // defer qutil.Catch() // other_delete := false // if other_index != "" && other_itype != "" { // other_delete = true // } // ids := qutil.ObjToString(mapInfo["ids"]) // idsarr := strings.Split(ids, ",") // log.Println("delete ids count:", len(idsarr)) // n1 := 0 // update := [][]map[string]interface{}{} //将bidding表中的extracttype改为-1 // set := map[string]interface{}{ // "$set": map[string]interface{}{"extracttype": -1}, // } // for _, id := range idsarr { // if id != "" { // update = append(update, []map[string]interface{}{ //更新 // map[string]interface{}{ // "_id": qutil.StringTOBsonId(id), // }, // set, // }) // if elastic.DelById(index, itype, id) { // n1++ // } // if other_delete { // bidding_other_es.DelById(other_index, other_itype, id) // } // } // } // //更新 // if len(update) > 0 { // mgo.UpdateBulkAll(db, c, update...) // } // log.Println("result delete bidding index...over", "all:", n1) // } func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey string) (int, int) { qutil.Debug(len(infos)) n1, n2 := 0, 0 //线程池 UpdatesLock := sync.Mutex{} fields := strings.Split(bidding["fields"].(string), ",") //更新数组 arr := [][]map[string]interface{}{} arrEs := []map[string]interface{}{} //对比两张表数据,减少查询次数 var compare bson.M log.Println("开始迭代..") for n, tmp := range infos { n1++ // if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive != "" { //bidding中有敏感词,不生索引 // tmp = make(map[string]interface{}) // continue // } update := map[string]interface{}{} //要更新的mongo数据 //对比方法---------------- tid := qutil.BsonIdToSId(tmp["_id"]) if eMap[tid] != nil { compare = eMap[tid] if qutil.IntAll(compare["dataging"]) == 1 { //extract中dataging=1不生索引 tmp = make(map[string]interface{}) compare = nil continue } delete(eMap, tid) //更新bidding表,生成索引;bidding表modifyinfo中的字段不更新 modifyinfo := make(map[string]bool) if tmpmodifyinfo, ok := tmp["modifyinfo"].(map[string]interface{}); ok && tmpmodifyinfo != nil { for k, _ := range tmpmodifyinfo { modifyinfo[k] = true } } //更新bidding表,生成索引 for _, k := range fields { v1 := compare[k] //extract v2 := tmp[k] //bidding if v2 == nil && v1 != nil && !modifyinfo[k] { update[k] = v1 } else if v2 != nil && v1 != nil && !modifyinfo[k] { //update[k+"_b"] = v2 update[k] = v1 } else if v2 != nil && v1 == nil { //update[k+"_b"] = v2 } } if qutil.IntAll(compare["repeat"]) == 1 { update["extracttype"] = -1 } else { update["extracttype"] = 1 } } else { compare = nil if qutil.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging update["dataging"] = 0 } } //下面可以多线程跑的---> //处理分类 if compare != nil { //extract qutil.Debug("111111111111111111") subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass if subscopeclass != nil { //str := "," m1 := map[string]bool{} newclass := []string{} for _, sc := range subscopeclass { sclass, _ := sc.(string) if !m1[sclass] { m1[sclass] = true //str += sclass + "," newclass = append(newclass, sclass) } } update["s_subscopeclass"] = strings.Join(newclass, ",") update["subscopeclass"] = newclass } topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass if topscopeclass != nil { m2 := map[string]bool{} newclass := []string{} for _, tc := range topscopeclass { tclass, _ := tc.(string) tclass = reg_letter.ReplaceAllString(tclass, "") // 去除字母 if !m2[tclass] { m2[tclass] = true newclass = append(newclass, tclass) } } update["s_topscopeclass"] = strings.Join(newclass, ",") } //处理中标企业 // winner, _ := compare["winner"].(string) // m1 := map[string]bool{} // if winner != "" { // m1[winner] = true // } // package1 := compare["package"] // if package1 != nil { // packageM, _ := package1.(map[string]interface{}) // for _, p := range packageM { // pm, _ := p.(map[string]interface{}) // pw, _ := pm["winner"].(string) // if pw != "" { // m1[pw] = true // } // } // } compare = nil // if len(m1) > 0 { // //str := "," // winnerarr := []string{} // for k, _ := range m1 { // //str += k + "," // winnerarr = append(winnerarr, k) // } // update["s_winner"] = strings.Join(winnerarr, ",") // } } //------------------对比结束 //处理key descript if bkey == "" { DealInfo(&tmp, &update) } qutil.Debug(2222222222222222) //同时保存到elastic for tk, tv := range update { tmp[tk] = tv } if tmp["s_winner"] != "" { sWinnerarr := strings.Split(qutil.ObjToString(tmp["s_winner"]), ",") var cid []string for _, w := range sWinnerarr { if w != "" { ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"compnay_name": w}) if len(*ent) > 0 { cid = append(cid, qutil.ObjToString((*ent)["company_id"])) } } } qutil.Debug(5555555555555) if len(cid) > 0 { tmp["entidlist"] = cid update["entidlist"] = cid tmp_up := []map[string]interface{}{} tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]}) tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}}) UpdataMgoCache <- tmp_up } } //对projectscope字段的索引处理 ps, _ := tmp["projectscope"].(string) if len(ps) > ESLEN { tmp["projectscope"] = string(([]rune(ps))[:4000]) } //对标的物为空处理 if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text // if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1 // tmp["detail"] = filetext //更新es中detail // update["detail"] = filetext //更新mongo中detail // update["filedetail"] = 1 //mongo中打标记 // } tmp["filetext"] = filetext } if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" { delete(tmp, "purchasing") } if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 { delete(tmp, "purchasinglist") } //数据为空处理 for _, f := range []string{"bidstatus", "city", "district", "channel"} { if fVal, ok := tmp[f].(string); ok && fVal == "" { delete(tmp, f) } } UpdatesLock.Lock() // for k1, _ := range tmp { // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" { // delete(tmp, k1) // } // } go IS.Add("bidding") if qutil.IntAll(update["extracttype"]) != -1 { qutil.Debug(44444444444444) newTmp := map[string]interface{}{} //最终生索引的数据 for field, ftype := range biddingIndexFieldsMap { // if tmp[field] != nil { // if field == "projectinfo" { mp, _ := tmp[field].(map[string]interface{}) if mp != nil { newmap := map[string]interface{}{} for k, ktype := range projectinfoFieldsMap { mpv := mp[k] if mpv != nil && reflect.TypeOf(mpv).String() == ktype { newmap[k] = mp[k] } } if len(newmap) > 0 { newTmp[field] = newmap } } } else if field == "purchasinglist" { //标的物处理 purchasinglist_new := []map[string]interface{}{} if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 { for _, ls := range pcl { lsm_new := make(map[string]interface{}) lsm := ls.(map[string]interface{}) for pf, pftype := range purchasinglistFieldsMap { lsmv := lsm[pf] if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype { lsm_new[pf] = lsm[pf] } } if lsm_new != nil && len(lsm_new) > 0 { purchasinglist_new = append(purchasinglist_new, lsm_new) } } } if len(purchasinglist_new) > 0 { newTmp[field] = purchasinglist_new } } else if field == "winnerorder" { //中标候选 winnerorder_new := []map[string]interface{}{} if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 { for _, win := range winnerorder { winMap_new := make(map[string]interface{}) winMap := win.(map[string]interface{}) for wf, wftype := range winnerorderlistFieldsMap { wfv := winMap[wf] if wfv != nil && reflect.TypeOf(wfv).String() == wftype { if wf == "sort" && qutil.Int64All(wfv) > 100 { continue } winMap_new[wf] = winMap[wf] } } if winMap_new != nil && len(winMap_new) > 0 { winnerorder_new = append(winnerorder_new, winMap_new) } } } if len(winnerorder_new) > 0 { newTmp[field] = winnerorder_new } } else if field == "qualifies" { //项目资质 qs := []string{} if q, _ := tmp[field].([]interface{}); len(q) > 0 { for _, v := range q { v1 := v.(map[string]interface{}) qs = append(qs, qutil.ObjToString(v1["key"])) } } if len(qs) > 0 { newTmp[field] = strings.Join(qs, ",") } } else if field == "detail" { //过滤 detail, _ := tmp[field].(string) if len([]rune(detail)) > detailLength { detail = detail[:detailLength] } newTmp[field] = FilterDetail(detail) } else if field == "_id" || field == "topscopeclass" { //不做处理 newTmp[field] = tmp[field] } else if field == "publishtime" || field == "comeintime" { //字段类型不正确,特别处理 if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 { newTmp[field] = qutil.Int64All(tmp[field]) } } else { //其它字段判断数据类型,不正确舍弃 if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype { continue } else { newTmp[field] = fieldval } } } } // for _, v := range biddingIndexFields { //索引字段 // if tmp[v] != nil { // if "projectinfo" == v { // mp, _ := tmp[v].(map[string]interface{}) // if mp != nil { // newmap := map[string]interface{}{} // for _, v1 := range projectinfoFields { // if mp[v1] != nil { // newmap[v1] = fmt.Sprint(mp[v1]) // } // } // if len(newmap) > 0 { // newTmp[v] = newmap // } // // attachments := mp["attachments"] // // con := "" // // if attachments != nil { // // am, _ := attachments.(map[string]interface{}) // // if am != nil { // // for _, v1 := range am { // // vm, _ := v1.(map[string]interface{}) // // if vm != nil { // // c, _ := vm["content"].(string) // // con += c // // } // // } // // } // // } // // con = FilterDetailSpace(con) // // if con != "" { // // newTmp["attachments"] = con // // } // } // } else if v == "purchasinglist" { //标的物处理 // purchasinglist_new := []map[string]interface{}{} // if pcl, _ := tmp[v].([]interface{}); len(pcl) > 0 { // for _, ls := range pcl { // lsm_new := make(map[string]interface{}) // lsm := ls.(map[string]interface{}) // for _, pf := range purchasinglistFields { // if lsm[pf] != nil { // lsm_new[pf] = lsm[pf] // } // } // if lsm_new != nil && len(lsm_new) > 0 { // purchasinglist_new = append(purchasinglist_new, lsm_new) // } // } // } // if len(purchasinglist_new) > 0 { // newTmp[v] = purchasinglist_new // } // /*} else if v == "winnerorder" { //中标候选 // winnerorder_new := []map[string]interface{}{} // if winnerorder, _ := tmp[v].([]interface{}); len(winnerorder) > 0 { // for _, win := range winnerorder { // winMap_new := make(map[string]interface{}) // winMap := win.(map[string]interface{}) // for _, wf := range winnerorderlistFields { // if wfv := winMap[wf]; wfv != nil { // if wf == "sort" && qutil.Int64All(wfv) > 100 { // continue // } // winMap_new[wf] = winMap[wf] // } // } // if winMap_new != nil && len(winMap_new) > 0 { // winnerorder_new = append(winnerorder_new, winMap_new) // } // } // } // if len(winnerorder_new) > 0 { // newTmp[v] = winnerorder_new // } // */ // } else { // if v == "detail" { // detail, _ := tmp[v].(string) // if len([]rune(detail)) > detailLength { // detail = detail[:detailLength] // } // newTmp[v] = FilterDetail(detail) // } else { // newTmp[v] = tmp[v] // } // } // } // } arrEs = append(arrEs, newTmp) qutil.Debug(newTmp) } if len(update) > 0 { delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除 arr = append(arr, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": update, }, }) } if len(arr) >= BulkSize-1 { mgo.UpdateBulkAll(db, c, arr...) arr = [][]map[string]interface{}{} } if len(arrEs) >= BulkSize-1 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) if other_index != "" && other_itype != "" { bidding_other_es.BulkSave(other_index, other_itype, &tmps, true) } if len(multiIndex) == 2 { elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true) } arrEs = []map[string]interface{}{} } UpdatesLock.Unlock() if n%100 == 0 { log.Println("current:", n) } tmp = make(map[string]interface{}) } UpdatesLock.Lock() if len(arr) > 0 { mgo.UpdateBulkAll(db, c, arr...) } if len(arrEs) > 0 { tmps := arrEs elastic.BulkSave(index, itype, &tmps, true) if other_index != "" && other_itype != "" { bidding_other_es.BulkSave(other_index, other_itype, &tmps, true) } if len(multiIndex) == 2 { elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true) } } UpdatesLock.Unlock() return n1, n2 } var client *mu.Client var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$") var reg_space = regexp.MustCompile("(?ism)(.*?)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)") var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+") var reg_dh = regexp.MustCompile("[,]+") var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]") var reg_no = regexp.MustCompile("^[0-9]*$") var reg_letter = regexp.MustCompile("[a-z]*") var MSG_SERVER = "123.56.236.148:7070" var DesLen = 120 func inits() { ser := qutil.ObjToString(Sysconfig["msg_server"]) if ser != "" { MSG_SERVER = ser } cf := &mu.ClientConfig{ ClientName: "剑鱼抽关键词", EventHandler: func(p *mu.Packet) {}, MsgServerAddr: MSG_SERVER, CanHandleEvents: []int{}, OnConnectSuccess: func() { log.Println("c.") }, ReadBufferSize: 10, WriteBufferSize: 10, } client, _ = mu.NewClient(cf) } //var clientlock = &sync.Mutex{} var keypool = make(chan bool, 1) func DealInfo(obj, update *map[string]interface{}) { defer qutil.Catch() if (*obj)["keywords"] != nil && (*obj)["description"] != nil { return } else { (*update)["keywords"] = "" (*update)["description"] = "" } title := qutil.ObjToString((*obj)["title"]) var m [][]string select { case <-func() <-chan bool { ch := make(chan bool, 1) go func(chan bool) { select { case keypool <- true: defer func() { <-keypool }() ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1) json.Unmarshal(ret, &m) case <-time.After(10 * time.Millisecond): } ch <- true }(ch) return ch }(): case <-time.After(40 * time.Millisecond): } arr := []string{} keyword := []string{} keywordnew := []string{} for _, tmp := range m { if reg.MatchString(tmp[0]) { arr = append(arr, tmp[0]) } else { if len(arr) > 0 { str := strings.Join(arr, "") keyword = append(keyword, str) arr = []string{} } if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) { keyword = append(keyword, tmp[0]) } } } for _, v := range keyword { v = reg_no.ReplaceAllString(v, "") if len(v) > 0 { keywordnew = append(keywordnew, v) } } keywords := strings.Join(keywordnew, ",") (*update)["keywords"] = keywords content := "" if (*obj)["detail_bak"] != nil { content = qutil.ObjToString((*obj)["detail_bak"]) } else { content = qutil.ObjToString((*obj)["detail"]) } //内容替换 content = strings.Replace(content, " ", "", -1) content = reg_space.ReplaceAllString(content, "") content = reg_row.ReplaceAllString(content, ",") content = reg_dh.ReplaceAllString(content, ",") content = reg_newdb.ReplaceAllString(content, "$1") if strings.HasPrefix(content, ",") { content = content[1:] } //log.Println(content) tc := []rune(content) ltc := len(tc) description := content if ltc > DesLen { description = string(tc[:DesLen]) } (*update)["description"] = description //保存到数据库 return }