package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
	"time"
	util "utils"
	"utils/mfw"
	"utils/mongodb"
	"utils/udp"
)

// biddingTask starts the one-to-one update of the bidding collection against
// the extract collection (records are matched by identical _id) for one range.
//
// mapInfo keys read here:
//   - "query": an explicit Mongo filter; when absent, an _id range is built
//     from "gtid" (exclusive) and "lteid" (inclusive) via
//     mongodb.StringTOBsonId (NOTE(review): assumed hex ObjectId strings).
//   - "bkey": when non-empty, DealInfo (keywords/description) is skipped.
//
// All extract records in the range are loaded into memory first. If the
// bidding count is below 200000, the bidding records (without contenthtml)
// are loaded and passed to doIndex; otherwise the range is logged and skipped.
// For the "bidding_history" task type, a full "biddingdata" pass
// (biddingDataTask, 30 threads) is started afterwards.
// A deferred util.Catch guards the function (presumably recovering panics,
// such as a missing gtid/lteid — confirm util.Catch semantics).
func (t *TaskInfo) biddingTask(data []byte, mapInfo map[string]interface{}) {
	defer util.Catch()
	q, _ := mapInfo["query"].(map[string]interface{})
	bkey, _ := mapInfo["bkey"].(string)
	if q == nil {
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}

	// Extract database: index every record in the range by its string _id.
	extractConn := extractMgo.GetMgoConn()
	defer extractMgo.DestoryMongoConn(extractConn)
	extractc, _ := extract["collect"].(string)
	extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
	eMap := map[string]map[string]interface{}{}
	extCount, repeatCount := 0, 0
	for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
		if util.IntAll(tmp["repeat"]) == 1 {
			repeatCount++
		}
		eMap[mongodb.BsonIdToSId(tmp["_id"])] = tmp
		// Decode the next document into a fresh map; this one is now owned by eMap.
		tmp = make(map[string]interface{})
	}
	util.Debug("抽取表 重复数据量:", extCount, repeatCount)

	// Bidding database: count first, and only load the range when it is small enough.
	biddingConn := biddingMgo.GetMgoConn()
	c, _ := bidding["collect"].(string)
	count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Count()
	util.Debug("查询语句:", q, "同步总数:", count)
	n1, n2 := 0, 0
	if count < 200000 {
		var res []map[string]interface{}
		result := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(map[string]interface{}{
			"contenthtml": 0,
		}).Iter()
		for tmp := make(map[string]interface{}); result.Next(tmp); {
			res = append(res, tmp)
			tmp = make(map[string]interface{})
		}
		// Release the bidding connection before the (long) indexing pass.
		biddingMgo.DestoryMongoConn(biddingConn)
		util.Debug("查询结果", "bidding:", count, "抽取:", extCount)
		//if int64(len(res)) != count {
		//	time.Sleep(20 * time.Second)
		//	toadd := &net.UDPAddr{
		//		IP:   net.ParseIP("127.0.0.1"),
		//		Port: util.IntAll(Sysconfig["udpport"]),
		//	}
		//	udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
		//}
		n1, n2 = t.doIndex(res, eMap, bkey)
	} else {
		util.Debug("数据量太大,放弃!", count)
		biddingMgo.DestoryMongoConn(biddingConn)
	}
	util.Debug(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
	if t.stype == "bidding_history" {
		// Once the historical dedup id range is done, build the full-data index.
		t.stype = "biddingdata"
		t.thread = 30
		t.biddingDataTask(data, mapInfo)
	}
}

// doIndex merges each bidding record in infos with its extract record from
// eMap (keyed by string _id) and queues the resulting writes:
//   - bidding-table $set updates on updateBiddingPool,
//   - extract-table updates (entidlist, toptype/subtype fixes) on updateExtractPool,
//   - Elasticsearch documents on saveEsPool, unless extracttype is -1
//     (the extract record has repeat == 1).
//
// Records whose "sensitive" field is "测试" are skipped. For the "bidding"
// task type, records whose extract record has dataging == 1 are skipped; for
// "bidding_history", records whose extract record lacks history_updatetime
// are skipped. Otherwise matched entries are deleted from eMap for those two
// task types. Each bidding record in infos is mutated in place.
//
// It returns the number of bidding records visited and the number of ES
// documents queued.
func (t *TaskInfo) doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey string) (int, int) {
	n1, n2 := 0, 0 // bidding records visited, ES documents queued
	util.Debug("start ...")
	for n, tmp := range infos {
		n1++
		// Records flagged as sensitive/test data are never indexed.
		if util.ObjToString(tmp["sensitive"]) == "测试" {
			continue
		}
		tid := mongodb.BsonIdToSId(tmp["_id"])
		update := map[string]interface{}{} // fields to $set on the bidding table

		// Compare the two tables' records to avoid further queries.
		if compare := eMap[tid]; compare != nil {
			switch t.stype {
			case "bidding": // incremental id range, normal data
				if util.IntAll(compare["dataging"]) == 1 {
					continue // extract record has dataging=1: skip
				}
				delete(eMap, tid)
			case "bidding_history": // incremental id range, historical data
				if compare["history_updatetime"] == nil {
					continue // extract record has no history_updatetime: skip
				}
				delete(eMap, tid)
			}

			// Fields listed in the bidding record's modifyinfo are not
			// overwritten with extract values.
			modifyinfo := make(map[string]bool)
			if mi, ok := tmp["modifyinfo"].(map[string]interface{}); ok {
				for k := range mi {
					modifyinfo[k] = true
				}
			}
			for _, k := range biddingMgoFields {
				v1 := compare[k] // extract value
				v2 := tmp[k]     // bidding value
				switch {
				case v1 != nil && !modifyinfo[k]:
					update[k] = v1
				case v1 == nil && v2 != nil && (k == "area" || k == "city" || k == "district"):
					// Region set on bidding but absent from extract is cleared.
					update[k] = ""
				}
			}
			if util.IntAll(compare["repeat"]) == 1 {
				update["extracttype"] = -1
			} else {
				update["extracttype"] = 1
			}
			// Classification fields derived from the extract record.
			FieldMethod(compare, update)
		} else {
			// Not extracted yet: reset dataging and normalise the region
			// from the bidding record's own fields.
			if util.IntAll(tmp["dataging"]) == 1 {
				update["dataging"] = 0
			}
			rdata := standardCheckCity(util.ObjToString(tmp["area"]), util.ObjToString(tmp["city"]), util.ObjToString(tmp["district"]))
			for k, v := range rdata {
				update[k] = v
			}
		}

		// Keywords / description.
		if bkey == "" {
			DealInfo(&tmp, &update)
		}
		// Everything written to the bidding table also goes to Elasticsearch.
		for k, v := range update {
			tmp[k] = v
		}
		extractMap := make(map[string]interface{})
		// Compare the string value (as taskinfo does): a missing s_winner is
		// nil, which is != "" as an interface and used to slip through.
		if util.ObjToString(tmp["s_winner"]) != "" {
			if cid := FieldFun(tmp); len(cid) > 0 {
				tmp["entidlist"] = cid
				update["entidlist"] = cid
				extractMap["entidlist"] = cid
			}
		}
		// Publisher-declared category handling (added 6.10); placed here so
		// the extract table can be updated too.
		TypeMethod(tmp, update, extractMap)
		if len(extractMap) > 0 {
			set := map[string]interface{}{"$set": extractMap}
			if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
				set["$unset"] = map[string]interface{}{"subtype": ""}
			}
			updateExtractPool <- []map[string]interface{}{{"_id": tmp["_id"]}, set}
		}
		// Attachment validity flag (validFile: 0 = unknown, -1 = invalid).
		if i := validFile(tmp); i != 0 {
			valid := i != -1
			tmp["isValidFile"] = valid
			update["isValidFile"] = valid
		}
		clearMap(tmp)
		//go IS.Add("bidding")
		if util.IntAll(update["extracttype"]) != -1 {
			n2++
			newTmp := GetEsField(tmp, update, t.stype)
			newTmp["dataweight"] = 0 // new index field: jy pin-to-top weight
			if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
				// Jianyu self-published data: notify the publishing service over UDP.
				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
			}
			saveEsPool <- newTmp
		}
		if len(update) > 0 {
			delete(update, "winnerorder") // winnerorder is not written back to the bidding table
			updateBiddingPool <- []map[string]interface{}{{"_id": tmp["_id"]}, {"$set": update}}
		}
		if n%200 == 0 {
			util.Debug("current:", n)
		}
	}
	return n1, n2
}

// client is the mfw RPC client used by DealInfo; it is set by inits.
var client *mfw.Client

// reg matches a token made only of ASCII letters, digits, '-' and '.'.
var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")

// reg_space is removed from the description text.
// NOTE(review): the first alternative "(.*?)" can match the empty string and
// Go's leftmost-first alternation prefers it, so as written this pattern only
// ever produces empty matches and removes nothing. The intended first
// alternative (perhaps a <style>…</style> block) may have been lost — confirm
// against the repository history before relying on it.
var reg_space = regexp.MustCompile("(?ism)(.*?)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")

// reg_row matches row-level opening tags (tr/div/p) and newline runs; replaced with ",".
var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")

// reg_dh collapses runs of commas.
var reg_dh = regexp.MustCompile("[,]+")

// reg_newdb drops a comma that directly follows other punctuation.
var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")

// reg_no matches an all-digit (or empty) string.
var reg_no = regexp.MustCompile("^[0-9]*$")

var reg_letter = regexp.MustCompile("[a-z]*")

// MSG_SERVER is the default mfw message-server address; Sysconfig["msg_server"] overrides it.
var MSG_SERVER = "123.56.236.148:7070"

// DesLen is the maximum description length in runes.
var DesLen = 120

// inits creates the mfw client used for keyword RPCs, honouring an optional
// Sysconfig["msg_server"] address override. The second return value of
// mfw.NewClient is discarded.
func inits() {
	ser := util.ObjToString(Sysconfig["msg_server"])
	if ser != "" {
		MSG_SERVER = ser
	}
	cf := &mfw.ClientConfig{
		ClientName:      "剑鱼抽关键词",
		EventHandler:    func(p *mfw.Packet) {},
		MsgServerAddr:   MSG_SERVER,
		CanHandleEvents: []int{},
		OnConnectSuccess: func() {
			util.Debug("剑鱼关键词 client")
		},
		ReadBufferSize:  10,
		WriteBufferSize: 10,
	}
	client, _ = mfw.NewClient(cf)
}

// keypool is a one-slot semaphore: at most one keyword RPC is in flight.
var keypool = make(chan bool, 1)

// DealInfo sets (*update)["keywords"] and (*update)["description"] for a
// bidding record, unless *obj already carries both fields (then it returns
// without touching update).
//
// Keywords: the title is sent via client.Call (command 4010) and the reply
// is decoded as [][]string (NOTE(review): presumably [word, POS-tag] pairs —
// confirm). Consecutive words matching reg are joined into one keyword; other
// words longer than 3 bytes whose tag starts with "n" or "g", or equals "v"
// or "vn", are kept; all-digit keywords are dropped. The call is best-effort:
// if keypool cannot be acquired within 10ms or no reply arrives within 40ms,
// the keyword list is empty.
//
// Description: detail_bak (or detail when detail_bak is nil) with spaces
// removed, reg_* replacements applied, a leading "," trimmed, and truncated
// to DesLen runes.
func DealInfo(obj, update *map[string]interface{}) {
	defer util.Catch()
	if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
		return
	}
	(*update)["keywords"] = ""
	(*update)["description"] = ""

	title := util.ObjToString((*obj)["title"])
	// The RPC goroutine hands its result over a buffered channel rather than
	// writing a shared variable, so a reply arriving after the 40ms timeout
	// cannot race with the code below, and the send never blocks.
	resultCh := make(chan [][]string, 1)
	go func() {
		// A panic here would otherwise kill the process: recover() only
		// works in the goroutine that panicked.
		defer util.Catch()
		var words [][]string
		select {
		case keypool <- true:
			defer func() {
				<-keypool
			}()
			ret, _ := client.Call("", mfw.UUID(8), 4010, mfw.SENDTO_TYPE_RAND_RECIVER, title, 1)
			_ = json.Unmarshal(ret, &words) // best-effort: undecodable reply means no keywords
		case <-time.After(10 * time.Millisecond):
		}
		resultCh <- words
	}()
	var m [][]string
	select {
	case m = <-resultCh:
	case <-time.After(40 * time.Millisecond):
	}

	var keyword []string
	var run []string // consecutive reg-matching words, joined into one keyword
	flush := func() {
		if len(run) > 0 {
			keyword = append(keyword, strings.Join(run, ""))
			run = nil
		}
	}
	for _, seg := range m {
		if len(seg) == 0 {
			continue // malformed entry; indexing it would panic
		}
		word := seg[0]
		if reg.MatchString(word) {
			run = append(run, word)
			continue
		}
		flush()
		if len(seg) < 2 {
			continue // no tag to inspect
		}
		tag := seg[1]
		if len(word) > 3 && (strings.HasPrefix(tag, "n") || tag == "v" || tag == "vn" || strings.HasPrefix(tag, "g")) {
			keyword = append(keyword, word)
		}
	}
	// A run reaching the end of the title must be emitted too.
	flush()

	keywordnew := make([]string, 0, len(keyword))
	for _, v := range keyword {
		if v = reg_no.ReplaceAllString(v, ""); len(v) > 0 {
			keywordnew = append(keywordnew, v)
		}
	}
	(*update)["keywords"] = strings.Join(keywordnew, ",")

	var content string
	if (*obj)["detail_bak"] != nil {
		content = util.ObjToString((*obj)["detail_bak"])
	} else {
		content = util.ObjToString((*obj)["detail"])
	}
	// Content normalisation.
	content = strings.Replace(content, " ", "", -1)
	content = reg_space.ReplaceAllString(content, "")
	content = reg_row.ReplaceAllString(content, ",")
	content = reg_dh.ReplaceAllString(content, ",")
	content = reg_newdb.ReplaceAllString(content, "$1")
	// TrimPrefix keeps the string valid UTF-8 regardless of the separator's width.
	content = strings.TrimPrefix(content, ",")
	description := content
	if runes := []rune(content); len(runes) > DesLen {
		description = string(runes[:DesLen])
	}
	(*update)["description"] = description
}

// TypeMethod applies the category declared by the Jianyu publisher in
// tmp["jyfb_data"]["type"].
//
// tmp is the index document, update is the bidding-table $set map and
// extractM is the extract-table $set map. When the declared type disagrees
// with tmp["toptype"], toptype is rewritten in all three. For "采购意向",
// subtype is also set. For "招标公告", "招标预告" and "招标结果", subtype is
// deleted from tmp and update. NOTE(review): deleting from update does not
// unset subtype on the bidding table itself — confirm that is intended.
// Author: J 2022/6/10
func TypeMethod(tmp, update, extractM map[string]interface{}) {
	if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
		if t := util.ObjToString(jyData["type"]); t != "" {
			switch t {
			//case "采购信息":
			case "招标公告":
				if util.ObjToString(tmp["toptype"]) != "招标" {
					tmp["toptype"] = "招标"
					update["toptype"] = "招标"
					extractM["toptype"] = "招标"
					delete(tmp, "subtype")
					delete(update, "subtype")
				}
			case "采购意向":
				if util.ObjToString(tmp["toptype"]) != "采购意向" {
					tmp["toptype"] = "采购意向"
					tmp["subtype"] = "采购意向"
					update["toptype"] = "采购意向"
					update["subtype"] = "采购意向"
					extractM["toptype"] = "采购意向"
					extractM["subtype"] = "采购意向"
				}
			case "招标预告":
				if util.ObjToString(tmp["toptype"]) != "预告" {
					tmp["toptype"] = "预告"
					update["toptype"] = "预告"
					extractM["toptype"] = "预告"
					delete(tmp, "subtype")
					delete(update, "subtype")
				}
			case "招标结果":
				if util.ObjToString(tmp["toptype"]) != "结果" {
					tmp["toptype"] = "结果"
					update["toptype"] = "结果"
					extractM["toptype"] = "结果"
					delete(tmp, "subtype")
					delete(update, "subtype")
				}
			}
		}
	}
}

// UdpMethod notifies the information-publishing service, over UDP at
// JyUdpAddr, that record id has been processed ("jyfb_data_over").
// Errors from json.Marshal and WriteUdp are ignored (best-effort notification).
// Author: J 2022/4/13
func UdpMethod(id string) {
	mapinfo := map[string]interface{}{
		"infoid": id,
		"stype":  "jyfb_data_over",
	}
	datas, _ := json.Marshal(mapinfo)
	util.Debug(JyUdpAddr, string(datas))
	_ = udpclient.WriteUdp(datas, udp.OP_TYPE_DATA, JyUdpAddr)
}

// taskinfo re-indexes one record whose _id is unchanged but whose content
// changed. It loads the record from the "bidding" collection and its extract
// record from extract["collect"] (falling back to extract["collect1"]),
// merges them like doIndex does, and queues the bidding, extract and ES
// writes. It logs and returns when either record is missing.
// NOTE(review): unlike doIndex, this path ignores the bidding record's
// modifyinfo and does not call DealInfo or TypeMethod — confirm intended.
// Author: J 2022/8/10
func taskinfo(id string) {
	tmp, _ := biddingMgo.FindById("bidding", id, nil)
	if tmp == nil || len(*tmp) == 0 {
		util.Debug(fmt.Sprintf("taskinfo bidding id=%s 未查询到数据", id))
		return
	}
	extractM, _ := extractMgo.FindById(util.ObjToString(extract["collect"]), id, nil)
	if extractM == nil || len(*extractM) == 0 {
		extractM, _ = extractMgo.FindById(util.ObjToString(extract["collect1"]), id, nil)
		if extractM == nil || len(*extractM) == 0 {
			util.Debug(fmt.Sprintf("taskinfo extract id=%s 未查询到数据", id))
			return
		}
	}
	update := map[string]interface{}{} // fields to $set on the bidding table
	// Copy extract values onto the bidding record.
	for _, k := range biddingMgoFields {
		v1 := (*extractM)[k] // extract value
		v2 := (*tmp)[k]      // bidding value
		switch {
		case v1 != nil:
			update[k] = v1
		case v2 != nil && (k == "area" || k == "city" || k == "district"):
			// Region set on bidding but absent from extract is cleared.
			update[k] = ""
		}
	}
	if util.IntAll((*extractM)["repeat"]) == 1 {
		update["extracttype"] = -1
	} else {
		update["extracttype"] = 1
	}
	if util.IntAll((*tmp)["dataging"]) == 1 {
		update["dataging"] = 0
	}
	// Classification fields derived from the extract record.
	FieldMethod(*extractM, update)
	// Everything written to the bidding table also goes to Elasticsearch.
	for k, v := range update {
		(*tmp)[k] = v
	}
	if util.ObjToString((*tmp)["s_winner"]) != "" {
		if cid := FieldFun(*tmp); len(cid) > 0 {
			(*tmp)["entidlist"] = cid
			update["entidlist"] = cid
			// Only queue the extract update when there is something to set;
			// an empty $set document is rejected by MongoDB before 5.0.
			updateExtractPool <- []map[string]interface{}{
				{"_id": mongodb.StringTOBsonId(id)},
				{"$set": map[string]interface{}{"entidlist": cid}},
			}
		}
	}
	// Attachment validity flag (validFile: 0 = unknown, -1 = invalid).
	if i := validFile(*tmp); i != 0 {
		valid := i != -1
		(*tmp)["isValidFile"] = valid
		update["isValidFile"] = valid
	}
	clearMap(*tmp)
	if util.IntAll(update["extracttype"]) != -1 {
		newTmp := GetEsField(*tmp, update, "")
		newTmp["dataweight"] = 0 // new index field: jy pin-to-top weight
		saveEsPool <- newTmp
	}
	if len(update) > 0 {
		delete(update, "winnerorder") // winnerorder is not written back to the bidding table
		updateBiddingPool <- []map[string]interface{}{{"_id": mongodb.StringTOBsonId(id)}, {"$set": update}}
	}
}