package main

import (
	"encoding/json"
	//"fmt"
	"log"
	"net"
	"reflect"
	"regexp"
	"strings"
	"sync"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	mu "mfw/util"
	"mongodb"
	qutil "qfw/util"
	elastic "qfw/util/elastic"
	"qfw/util/redis"
)

// date1 matches a single calendar date such as 2021年3月5日, 2021-03-05,
// 2021/3/5 or 2021.3.5. It has no capture groups.
var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?")

// biddingTask synchronises one slice of the bidding collection with the
// matching documents of the extract collection and indexes the result.
//
// The slice is selected by mapInfo["query"] when present, otherwise by the
// _id range (mapInfo["gtid"], mapInfo["lteid"]]. data is the raw task message;
// it is re-sent to the local UDP port when the number of loaded documents does
// not match the count, and forwarded to biddingDataTask for history tasks.
// Field handling covers bidamount / budget among others; bidding documents and
// extract documents are matched one-to-one by _id.
func biddingTask(data []byte, mapInfo map[string]interface{}, tasktype string) {
	defer qutil.Catch()
	q, _ := mapInfo["query"].(map[string]interface{})
	bkey, _ := mapInfo["bkey"].(string)
	if q == nil {
		// No explicit query: fall back to the id range carried by the task.
		// The unchecked assertions panic on a malformed task; the deferred
		// qutil.Catch above is relied on for that case.
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  mongodb.StringTOBsonId(mapInfo["gtid"].(string)),
				"$lte": mongodb.StringTOBsonId(mapInfo["lteid"].(string)),
			},
		}
	}
	//logger.SetRollingDaily("./logs", "id.log")
	// Connection settings.
	c, _ := bidding["collect"].(string)
	extractc, _ := bidding["extractcollect"].(string)
	db, _ := bidding["db"].(string)
	extractdb, _ := bidding["extractdb"].(string)
	index, _ := bidding["index"].(string)
	itype, _ := bidding["type"].(string)

	// Extract database: load every matching document into eMap, keyed by the
	// string form of its _id.
	extractsession := extractmgo.GetMgoConn()
	defer extractmgo.DestoryMongoConn(extractsession)
	extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
	eMap := map[string]map[string]interface{}{}
	extCount, repeatCount := 0, 0
	for tmp := make(map[string]interface{}); extractquery.Next(tmp); extCount++ {
		if qutil.IntAll(tmp["repeat"]) == 1 {
			repeatCount++
		}
		tid := mongodb.BsonIdToSId(tmp["_id"])
		eMap[tid] = tmp
		// A fresh map per document so stored entries are not overwritten.
		tmp = make(map[string]interface{})
	}

	// Bidding database.
	session := mgo.GetMgoConn()
	count, _ := session.DB(db).C(c).Find(&q).Count()
	log.Println("抽取表 重复数据量:", extCount, repeatCount)
	log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
	n1, n2 := 0, 0
	if count < 200000 {
		var res []map[string]interface{}
		//res := make([]map[string]interface{}, 1)
		result := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
			"projectinfo.attachment": 0,
			"contenthtml":            0,
		}).Iter()
		for tmp := make(map[string]interface{}); result.Next(tmp); {
			res = append(res, tmp)
			tmp = make(map[string]interface{})
		}
		mgo.DestoryMongoConn(session)
		log.Println("查询结果", "bidding:", count, "抽取:", extCount)
		if int64(len(res)) != count {
			// Loaded fewer/more documents than counted: wait, then re-send the
			// task message to the local UDP port. Indexing of what was loaded
			// still proceeds below.
			time.Sleep(20 * time.Second)
			toadd := &net.UDPAddr{
				IP:   net.ParseIP("127.0.0.1"),
				Port: qutil.IntAll(Sysconfig["udpport"]),
			}
			udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
		}
		n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey, tasktype)
		//if int64(n1 + n2) != count {
		//	log.Println("task error, results are inconsistent")
		//}
	} else {
		// Too many documents for one task: give up.
		log.Println("数据量太大,放弃!", count)
		mgo.DestoryMongoConn(session)
	}
	log.Println(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
	if tasktype == "bidding_history" {
		qutil.Debug(tasktype)
		// After the history de-duplication id range is finished, build the
		// full-data index.
		biddingDataTask(data, mapInfo)
	}
}

// doIndex merges each bidding document in infos with its extract counterpart
// from eMap, queues the resulting Mongo "$set" updates and the Elasticsearch
// documents, and flushes both in bulk.
//
// It returns the number of bidding documents iterated and the number of
// documents queued for Elasticsearch. Entries consumed from eMap are deleted
// from it for the "bidding" and "bidding_history" task types. When bkey is
// empty, keywords/description are generated through DealInfo.
func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey, tasktype string) (int, int) {
	n1, n2 := 0, 0 // bidding documents seen, documents sent to the index
	// Left over from a thread-pool version; the loop below is sequential, so
	// this lock is never contended here.
	var updatesLock sync.Mutex
	fields := strings.Split(bidding["fields"].(string), ",")
	// Pending bulk updates.
	arr := [][]map[string]interface{}{}
	arrEs := []map[string]interface{}{}
	log.Println("开始迭代..")
	for n, tmp := range infos {
		n1++
		// Documents flagged as sensitive in bidding are not indexed.
		if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" || sensitive == "异常" {
			continue
		}
		tid := mongodb.BsonIdToSId(tmp["_id"])
		//loginfo := make(map[string]interface{}) // log
		update := map[string]interface{}{} // fields to write back to Mongo

		// ---- compare bidding with extract (one lookup, no extra queries) ----
		compare := eMap[tid]
		if compare != nil {
			if tasktype == "bidding" {
				// Incremental id range, normal data: skip when extract has
				// dataging == 1.
				if qutil.IntAll(compare["dataging"]) == 1 {
					continue
				}
				delete(eMap, tid)
			}
			if tasktype == "bidding_history" {
				// Incremental id range, history data: skip when extract has no
				// history_updatetime.
				if compare["history_updatetime"] == nil {
					continue
				}
				delete(eMap, tid)
			}
			// Fields listed in the bidding document's modifyinfo must not be
			// overwritten by extract values.
			locked, _ := tmp["modifyinfo"].(map[string]interface{})
			for _, k := range fields {
				v1 := compare[k] // extract
				v2 := tmp[k]     // bidding
				_, isLocked := locked[k]
				switch {
				case v1 != nil:
					if !isLocked {
						//update[k+"_b"] = v2
						update[k] = v1
					}
				case v2 != nil:
					// Present in bidding but absent from extract: only the
					// location fields are cleared.
					//update[k+"_b"] = v2
					if k == "area" || k == "city" || k == "district" {
						update[k] = ""
					}
				}
			}
			if qutil.IntAll(compare["repeat"]) == 1 {
				update["extracttype"] = -1
			} else {
				update["extracttype"] = 1
			}
		} else if qutil.IntAll(tmp["dataging"]) == 1 {
			// Not extracted: reset dataging on the bidding document.
			update["dataging"] = 0
		}

		// The part below could be run concurrently --->
		// Classification handling.
		if compare != nil {
			// subscopeclass: de-duplicate, keep order.
			if subscopeclass, _ := compare["subscopeclass"].([]interface{}); subscopeclass != nil {
				seen := map[string]bool{}
				newclass := []string{}
				for _, sc := range subscopeclass {
					sclass, _ := sc.(string)
					if !seen[sclass] {
						seen[sclass] = true
						newclass = append(newclass, sclass)
					}
				}
				update["s_subscopeclass"] = strings.Join(newclass, ",")
				update["subscopeclass"] = newclass
			}
			// topscopeclass: strip lowercase letters, then de-duplicate.
			if topscopeclass, _ := compare["topscopeclass"].([]interface{}); topscopeclass != nil {
				seen := map[string]bool{}
				newclass := []string{}
				for _, tc := range topscopeclass {
					tclass, _ := tc.(string)
					tclass = reg_letter.ReplaceAllString(tclass, "")
					if !seen[tclass] {
						seen[tclass] = true
						newclass = append(newclass, tclass)
					}
				}
				update["s_topscopeclass"] = strings.Join(newclass, ",")
			}
			if package1 := compare["package"]; package1 != nil {
				packageM, _ := package1.(map[string]interface{})
				for _, p := range packageM {
					pm, _ := p.(map[string]interface{})
					if qutil.ObjToString(pm["winner"]) != "" || qutil.Float64All(pm["budget"]) > 0 || qutil.Float64All(pm["bidamount"]) > 0 {
						update["multipackage"] = 1
						break
					}
				}
			} else {
				update["multipackage"] = 0
			}
		} else {
			area := qutil.ObjToString(tmp["area"])
			city := qutil.ObjToString(tmp["city"])
			district := qutil.ObjToString(tmp["district"])
			for k, v := range standardCheckCity(area, city, district) {
				update[k] = v
			}
		}
		// ---- end of comparison ----

		// Keywords / description.
		if bkey == "" {
			DealInfo(&tmp, &update)
		}
		// Mirror the Mongo updates onto the document that goes to elastic.
		for tk, tv := range update {
			tmp[tk] = tv
		}

		// Resolve winner names to company ids (redis cache first, then the
		// standard company collection, then the history-name collection).
		if tmp["s_winner"] != "" {
			var cid []string
			for _, w := range strings.Split(qutil.ObjToString(tmp["s_winner"]), ",") {
				if w == "" {
					continue
				}
				id := redis.GetStr("qyxy_id", w)
				if id == "" {
					ents, _ := mgostandard.Find("qyxy_std", bson.M{"company_name": w}, bson.M{"updatetime": -1}, nil, false, -1, -1)
					if ents != nil && len(*ents) > 0 {
						id = qutil.ObjToString((*ents)[0]["_id"])
						redis.PutCKV("qyxy_id", w, id)
					} else {
						ent, _ := qyxydb.FindOne("company_history_name", bson.M{"history_name": w})
						if ent != nil && len(*ent) > 0 {
							id = qutil.ObjToString((*ent)["company_id"])
							redis.PutCKV("qyxy_id", w, id)
						}
					}
				}
				if id == "" {
					// Placeholder keeps cid aligned with the winner list.
					id = "-"
				}
				cid = append(cid, id)
				//ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
				//if len(*ent) > 0 {
				//	cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
				//}else {
				//	ent, _ = mgostandard.FindOne("qyxy_std", map[string]interface{}{"company_name": w})
				//	if len(*ent) > 0 {
				//		cid = append(cid, qutil.ObjToString((*ent)["_id"]))
				//	}
				//}
			}
			if len(cid) > 0 {
				tmp["entidlist"] = cid
				update["entidlist"] = cid
				UpdataMgoCache <- []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{"entidlist": cid}},
				}
			}
		}

		// Index handling for projectscope: oversized values are cut to 4000
		// runes. The rune-count guard prevents an out-of-range slice when the
		// byte length exceeds ESLEN but the text has fewer than 4000 runes.
		if ps, _ := tmp["projectscope"].(string); len(ps) > ESLEN {
			if psRunes := []rune(ps); len(psRunes) > 4000 {
				tmp["projectscope"] = string(psRunes[:4000])
			}
		}
		// Attachment text.
		if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
			// if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { // for this site: replace detail with filetext and mark filedetail=1
			// 	tmp["detail"] = filetext    // update detail in es
			// 	update["detail"] = filetext // update detail in mongo
			// 	update["filedetail"] = 1    // mark in mongo
			// }
			tmp["filetext"] = filetext
		}
		if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
			delete(tmp, "purchasing")
		}
		if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
			delete(tmp, "purchasinglist")
		}
		// Drop empty string values.
		for _, f := range []string{"bidstatus", "city", "district", "channel"} {
			if fVal, ok := tmp[f].(string); ok && fVal == "" {
				delete(tmp, f)
			}
		}

		updatesLock.Lock()
		// for k1, _ := range tmp {
		// 	if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
		// 		delete(tmp, k1)
		// 	}
		// }
		go IS.Add("bidding")
		if qutil.IntAll(update["extracttype"]) != -1 {
			n2++
			newTmp := map[string]interface{}{} // the document that is finally indexed
			for field, ftype := range biddingIndexFieldsMap {
				if tmp[field] != nil {
					if field == "projectinfo" {
						mp, _ := tmp[field].(map[string]interface{})
						if mp != nil {
							newmap := map[string]interface{}{}
							for k, ktype := range projectinfoFieldsMap {
								mpv := mp[k]
								if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
									newmap[k] = mpv
								}
							}
							if len(newmap) > 0 {
								newTmp[field] = newmap
							}
						}
					} else if field == "purchasinglist" { // purchased items
						purchasinglistNew := []map[string]interface{}{}
						pcl, _ := tmp[field].([]interface{})
						for _, ls := range pcl {
							// Elements that are not maps yield no fields and
							// are therefore dropped instead of panicking.
							lsm, _ := ls.(map[string]interface{})
							lsmNew := make(map[string]interface{})
							for pf, pftype := range purchasinglistFieldsMap {
								lsmv := lsm[pf]
								if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
									lsmNew[pf] = lsmv
								}
							}
							if len(lsmNew) > 0 {
								purchasinglistNew = append(purchasinglistNew, lsmNew)
							}
						}
						if len(purchasinglistNew) > 0 {
							newTmp[field] = purchasinglistNew
						}
					} else if field == "winnerorder" { // winning candidates
						winnerorderNew := []map[string]interface{}{}
						winnerorder, _ := tmp[field].([]interface{})
						for _, win := range winnerorder {
							winMap, _ := win.(map[string]interface{})
							winMapNew := make(map[string]interface{})
							for wf, wftype := range winnerorderlistFieldsMap {
								wfv := winMap[wf]
								if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
									if wf == "sort" && qutil.Int64All(wfv) > 100 {
										continue
									}
									winMapNew[wf] = wfv
								}
							}
							if len(winMapNew) > 0 {
								winnerorderNew = append(winnerorderNew, winMapNew)
							}
						}
						if len(winnerorderNew) > 0 {
							newTmp[field] = winnerorderNew
						}
					} else if field == "qualifies" { // project qualifications
						qs := []string{}
						qlist, _ := tmp[field].([]interface{})
						for _, v := range qlist {
							v1, ok := v.(map[string]interface{})
							if !ok {
								continue
							}
							qs = append(qs, qutil.ObjToString(v1["key"]))
						}
						if len(qs) > 0 {
							newTmp[field] = strings.Join(qs, ",")
						}
					} else if field == "detail" { // filtered
						detail, _ := tmp[field].(string)
						// Both the limit check and the cut are done on runes so
						// a multi-byte character is never split.
						if detailRunes := []rune(detail); len(detailRunes) > detailLength {
							detail = string(detailRunes[:detailLength])
						}
						newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
					} else if field == "_id" || field == "topscopeclass" { // copied untouched
						newTmp[field] = tmp[field]
					} else if field == "publishtime" || field == "comeintime" { // stored type is unreliable; normalise to int64
						if qutil.Int64All(tmp[field]) > 0 {
							newTmp[field] = qutil.Int64All(tmp[field])
						}
					} else if field == "review_experts" { // review experts
						if experts, ok := tmp["review_experts"].([]interface{}); ok && len(experts) > 0 {
							newTmp[field] = strings.Join(qutil.ObjArrToStringArr(experts), ",")
						}
					} else if field == "entidlist" {
						newTmp[field] = tmp[field]
					} else if field == "bidopentime" {
						// NOTE(review): because of the enclosing
						// `tmp[field] != nil` guard, the second branch
						// (bidopentime missing, bidendtime present) can never
						// run. Kept as-is; confirm whether the back-fill of
						// bidopentime from bidendtime is still wanted.
						if tmp[field] != nil && tmp["bidendtime"] == nil {
							newTmp["bidendtime"] = tmp[field]
							newTmp[field] = tmp[field]
						} else if tmp[field] == nil && tmp["bidendtime"] != nil {
							newTmp["bidendtime"] = tmp["bidendtime"]
							newTmp[field] = tmp["bidendtime"]
						} else {
							if tmp["bidopentime"] != nil {
								newTmp[field] = tmp["bidopentime"]
							}
						}
					} else {
						// Any other field: drop it when its dynamic type does
						// not match the configured one.
						fieldval := tmp[field]
						if reflect.TypeOf(fieldval).String() != ftype {
							continue
						}
						if fieldval != "" {
							newTmp[field] = fieldval
						}
					}
				}
			}
			YuceEndtime(newTmp)                      // predicted end time
			newTmp["createtime"] = time.Now().Unix() // creation time in es; only incremental data has it
			if qutil.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
				// Data published through the Jianyu publishing platform:
				// notify the publishing program over UDP.
				go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
			}
			arrEs = append(arrEs, newTmp)
		}
		if len(update) > 0 {
			delete(update, "winnerorder") // winnerorder is not written back to the bidding collection
			arr = append(arr, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": update},
			})
		}
		if len(arr) >= BulkSize-1 {
			mgo.UpdateBulkAll(db, c, arr...)
			arr = [][]map[string]interface{}{}
		}
		if len(arrEs) >= BulkSize-1 {
			tmps := arrEs
			if StopFlag {
				qutil.Debug("es队列紧张,暂停10s执行")
				time.Sleep(time.Second * 10)
			}
			elastic.BulkSave(index, itype, &tmps, true)
			if other_index != "" && other_itype != "" {
				// NOTE(review): the final flush below writes other_index via
				// bidding_other_es.BulkSave, this one via elastic.BulkSave.
				// Kept as-is; confirm which client is intended.
				elastic.BulkSave(other_index, other_itype, &tmps, true)
			}
			if len(multiIndex) == 2 {
				elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
			}
			arrEs = []map[string]interface{}{}
		}
		updatesLock.Unlock()
		if n%100 == 0 {
			log.Println("current:", n)
		}
	}

	// Flush whatever is left.
	updatesLock.Lock()
	if len(arr) > 0 {
		mgo.UpdateBulkAll(db, c, arr...)
	}
	if len(arrEs) > 0 {
		tmps := arrEs
		if StopFlag {
			qutil.Debug("es队列紧张,暂停10s执行")
			time.Sleep(time.Second * 10)
		}
		elastic.BulkSave(index, itype, &tmps, true)
		if other_index != "" && other_itype != "" {
			bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
		}
		if len(multiIndex) == 2 {
			elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
		}
	}
	updatesLock.Unlock()
	return n1, n2
}

// client is the message-server client created by inits and used by DealInfo.
var client *mu.Client

// Patterns used while building keywords and descriptions.
var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")

// NOTE(review): as written, the first alternative `(.*?)` can match the empty
// string at every position, which makes ReplaceAllString with this pattern a
// no-op. The pattern looks like it lost part of its text (two identical
// `( +)` alternatives as well) — verify against the original repository.
var reg_space = regexp.MustCompile("(?ism)(.*?)|([.#]?\\w{1,20}\\{.*?\\})|(<.*?>)|(\\\\t)+|\\t|( +)|( +)|(" + string(rune(160)) + "+)")
var reg_row = regexp.MustCompile("(?i)<(tr|div|p)[^>]*?>|(\\n)+")
var reg_dh = regexp.MustCompile("[,]+")
var reg_newdb = regexp.MustCompile("([:,、:,。.;])[,]")
var reg_no = regexp.MustCompile("^[0-9]*$")
var reg_letter = regexp.MustCompile("[a-z]*")

// MSG_SERVER is the default message-server address; inits overrides it with
// Sysconfig["msg_server"] when that is set.
var MSG_SERVER = "123.56.236.148:7070"

// DesLen is the maximum description length in runes.
var DesLen = 120

// inits creates the message-server client stored in the package-level client
// variable. The error returned by mu.NewClient is ignored.
func inits() {
	ser := qutil.ObjToString(Sysconfig["msg_server"])
	if ser != "" {
		MSG_SERVER = ser
	}
	cf := &mu.ClientConfig{
		ClientName:       "剑鱼抽关键词",
		EventHandler:     func(p *mu.Packet) {},
		MsgServerAddr:    MSG_SERVER,
		CanHandleEvents:  []int{},
		OnConnectSuccess: func() { log.Println("c.") },
		ReadBufferSize:   10,
		WriteBufferSize:  10,
	}
	client, _ = mu.NewClient(cf)
}

//var clientlock = &sync.Mutex{}

// keypool is a one-slot semaphore: at most one keyword RPC is in flight.
var keypool = make(chan bool, 1)

// callKeywordService sends title to the message server (event 4010) and
// returns the decoded [][]string reply, or nil when the semaphore could not be
// taken within 10ms or no reply arrived within 40ms.
//
// The reply is handed over through a channel so a late RPC can never write to
// a variable the caller is already reading.
func callKeywordService(title string) [][]string {
	done := make(chan [][]string, 1) // buffered: a late sender never blocks
	go func() {
		var words [][]string
		select {
		case keypool <- true:
			defer func() { <-keypool }()
			ret, _ := client.Call("", mu.UUID(8), 4010, mu.SENDTO_TYPE_RAND_RECIVER, title, 1)
			// Best effort: an undecodable reply simply yields no keywords.
			_ = json.Unmarshal(ret, &words)
		case <-time.After(10 * time.Millisecond):
		}
		done <- words
	}()
	select {
	case words := <-done:
		return words
	case <-time.After(40 * time.Millisecond):
		return nil
	}
}

// DealInfo fills (*update)["keywords"] and (*update)["description"] for a
// document that lacks either of them. Keywords come from the keyword service
// applied to the title; the description is the cleaned detail text (detail_bak
// preferred) truncated to DesLen runes. Documents that already carry both
// fields are left untouched.
func DealInfo(obj, update *map[string]interface{}) {
	defer qutil.Catch()
	if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
		return
	}
	(*update)["keywords"] = ""
	(*update)["description"] = ""

	title := qutil.ObjToString((*obj)["title"])
	m := callKeywordService(title)

	arr := []string{}
	keyword := []string{}
	keywordnew := []string{}
	for _, tmp := range m {
		// Each row is expected to hold at least a token and a tag
		// (presumably a part-of-speech tag — confirm with the service).
		if len(tmp) < 2 {
			continue
		}
		if reg.MatchString(tmp[0]) {
			// Consecutive alphanumeric tokens are glued together.
			arr = append(arr, tmp[0])
		} else {
			if len(arr) > 0 {
				keyword = append(keyword, strings.Join(arr, ""))
				arr = []string{}
			}
			if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
				keyword = append(keyword, tmp[0])
			}
		}
	}
	for _, v := range keyword {
		// Purely numeric keywords are discarded.
		v = reg_no.ReplaceAllString(v, "")
		if len(v) > 0 {
			keywordnew = append(keywordnew, v)
		}
	}
	(*update)["keywords"] = strings.Join(keywordnew, ",")

	var content string
	if (*obj)["detail_bak"] != nil {
		content = qutil.ObjToString((*obj)["detail_bak"])
	} else {
		content = qutil.ObjToString((*obj)["detail"])
	}
	// Content clean-up.
	content = strings.Replace(content, " ", "", -1)
	content = reg_space.ReplaceAllString(content, "")
	content = reg_row.ReplaceAllString(content, ",")
	content = reg_dh.ReplaceAllString(content, ",")
	content = reg_newdb.ReplaceAllString(content, "$1")
	content = strings.TrimPrefix(content, ",")
	//log.Println(content)
	description := content
	if tc := []rune(content); len(tc) > DesLen {
		description = string(tc[:DesLen])
	}
	(*update)["description"] = description // written to the database by the caller
}

// YuceEndtime predicts the end time of a contract and stores it in
// tmp["yuceendtime"].
//
// It only acts on documents whose s_subscopeclass contains one of the listed
// service categories and whose subtype is "成交" or "合同". The end time is
// taken from the two dates found in projectperiod when available; otherwise it
// is derived from signaturedate (or publishtime) plus project_duration in
// project_timeunit units.
func YuceEndtime(tmp map[string]interface{}) {
	scope := []string{"服务采购_法律咨询", "服务采购_会计", "服务采购_物业", "服务采购_审计", "服务采购_安保", "服务采购_仓储物流", "服务采购_广告宣传印刷"}
	subscopeclass := qutil.ObjToString(tmp["s_subscopeclass"])
	inScope := false
	for _, v := range scope {
		if strings.Contains(subscopeclass, v) {
			inScope = true
			break
		}
	}
	if !inScope {
		return
	}
	subtype := qutil.ObjToString(tmp["subtype"])
	if subtype != "成交" && subtype != "合同" {
		return
	}

	yucestarttime, yuceendtime := int64(0), int64(0)
	// From the project period: exactly two dates are read as start and end.
	// date1 has no capture groups, so all matches must be collected with
	// FindAllString (FindStringSubmatch would return a single element).
	if period := qutil.ObjToString(tmp["projectperiod"]); period != "" {
		if dates := date1.FindAllString(period, -1); len(dates) == 2 {
			sdate := FormatDateStr(dates[0])
			edate := FormatDateStr(dates[1])
			if sdate < edate && sdate != 0 && edate != 0 {
				yucestarttime = sdate
				yuceendtime = edate
			}
		}
	}
	if yucestarttime > 0 && yuceendtime > yucestarttime {
		tmp["yuceendtime"] = yuceendtime
		return
	}
	// Predicted start time: contract signing date, else publish time.
	if yucestarttime == 0 {
		switch {
		case qutil.IntAll(tmp["signaturedate"]) > 0:
			yucestarttime = qutil.Int64All(tmp["signaturedate"])
		case qutil.IntAll(tmp["publishtime"]) > 0:
			yucestarttime = qutil.Int64All(tmp["publishtime"])
		default:
			return
		}
	}
	// Predicted end time.
	if yucestarttime > 0 && yuceendtime == 0 {
		duration := qutil.IntAll(tmp["project_duration"])
		unit := qutil.ObjToString(tmp["project_timeunit"])
		if duration > 0 && unit != "" {
			tmp["yuceendtime"] = YcEndTime(yucestarttime, duration, unit)
		}
	}
}

// YcEndTime returns starttime (Unix seconds) advanced by num units. Supported
// units are 日历天/天/日 (days), 周 (weeks), 月 (months), 年 (years) and
// 工作日 (working days, padded with num/7*2 extra days). An unknown unit
// yields 0.
func YcEndTime(starttime int64, num int, unit string) int64 {
	start := time.Unix(starttime, 0)
	switch unit {
	case "日历天", "天", "日":
		// Widen before multiplying so the product is computed in int64.
		return starttime + int64(num)*86400
	case "周":
		return start.AddDate(0, 0, num*7).Unix()
	case "月":
		return start.AddDate(0, num, 0).Unix()
	case "年":
		return start.AddDate(num, 0, 0).Unix()
	case "工作日":
		return start.AddDate(0, 0, num+num/7*2).Unix()
	}
	return 0
}

// FormatDateStr converts a date such as 2021年3月5日, 2021/03/05 or 2021.3.5
// to Unix seconds in the local time zone. It returns 0 when the text cannot be
// parsed.
func FormatDateStr(ds string) int64 {
	ds = strings.NewReplacer("年", "-", "月", "-", "日", "", "/", "-", ".", "-").Replace(ds)
	parsed, err := time.ParseInLocation(qutil.Date_Short_Layout, ds, time.Local)
	if err != nil {
		// date1 accepts one-digit months and days; presumably
		// qutil.Date_Short_Layout is zero-padded (TODO confirm), so retry with
		// a layout that accepts both forms.
		var err2 error
		parsed, err2 = time.ParseInLocation("2006-1-2", ds, time.Local)
		if err2 != nil {
			qutil.Debug(err)
			return 0
		}
	}
	return parsed.Unix()
}

// Request carries the id of a published info record.
type Request struct {
	InfoId string
}

// Response carries a list of result documents.
type Response struct {
	Rep []map[string]interface{}
}

// UdpMethod notifies the info-publishing program over UDP (JyUdpAddr) that the
// document with the given id has been processed (stype "jyfb_data_over").
// Author: J, 2022/4/13.
func UdpMethod(id string) {
	mapinfo := map[string]interface{}{
		"infoid": id,
		"stype":  "jyfb_data_over",
	}
	datas, err := json.Marshal(mapinfo)
	if err != nil {
		qutil.Debug(err)
		return
	}
	qutil.Debug(JyUdpAddr, string(datas))
	// Best-effort notification; the send result is intentionally ignored.
	_ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, JyUdpAddr)
}