|
@@ -2,29 +2,19 @@ package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
- "go.mongodb.org/mongo-driver/bson"
|
|
|
- "mongodb"
|
|
|
- "qfw/util/redis"
|
|
|
- "reflect"
|
|
|
- //"fmt"
|
|
|
- "log"
|
|
|
mu "mfw/util"
|
|
|
- "net"
|
|
|
- qutil "qfw/util"
|
|
|
- elastic "qfw/util/elastic"
|
|
|
+ "mongodb"
|
|
|
+ "qfw/util"
|
|
|
"regexp"
|
|
|
"strings"
|
|
|
- "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-var date1 = regexp.MustCompile("20[0-2][0-9][年|\\-\\/|.][0-9]{1,2}[月|\\-|\\/|.][0-9]{1,2}[日]?")
|
|
|
-
|
|
|
-//对字段处理 bidamount budget
|
|
|
//招标数据表和抽取表一一对应开始更新
|
|
|
|
|
|
-func biddingTask(data []byte, mapInfo map[string]interface{}, tasktype string) {
|
|
|
- defer qutil.Catch()
|
|
|
+func (t *TaskInfo) biddingTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+ defer util.Catch()
|
|
|
+
|
|
|
q, _ := mapInfo["query"].(map[string]interface{})
|
|
|
bkey, _ := mapInfo["bkey"].(string)
|
|
|
if q == nil {
|
|
@@ -35,107 +25,88 @@ func biddingTask(data []byte, mapInfo map[string]interface{}, tasktype string) {
|
|
|
},
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- //logger.SetRollingDaily("./logs", "id.log")
|
|
|
- //连接信息
|
|
|
- c, _ := bidding["collect"].(string)
|
|
|
- extractc, _ := bidding["extractcollect"].(string)
|
|
|
- db, _ := bidding["db"].(string)
|
|
|
- extractdb, _ := bidding["extractdb"].(string)
|
|
|
- index, _ := bidding["index"].(string)
|
|
|
- itype, _ := bidding["type"].(string)
|
|
|
-
|
|
|
//extract库
|
|
|
- extractsession := extractmgo.GetMgoConn()
|
|
|
- defer extractmgo.DestoryMongoConn(extractsession)
|
|
|
- extractquery := extractsession.DB(extractdb).C(extractc).Find(q).Sort("_id").Iter()
|
|
|
+ extractConn := extractMgo.GetMgoConn()
|
|
|
+ defer extractMgo.DestoryMongoConn(extractConn)
|
|
|
+ extractc, _ := extract["collect"].(string)
|
|
|
+ extractResult := extractConn.DB(extractMgo.DbName).C(extractc).Find(q).Sort("_id").Iter()
|
|
|
eMap := map[string]map[string]interface{}{}
|
|
|
extCount, repeatCount := 0, 0
|
|
|
- for tmp := make(map[string]interface{}); extractquery.Next(tmp); extCount++ {
|
|
|
- if qutil.IntAll(tmp["repeat"]) == 1 {
|
|
|
+ for tmp := make(map[string]interface{}); extractResult.Next(tmp); extCount++ {
|
|
|
+ if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
repeatCount++
|
|
|
}
|
|
|
tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
eMap[tid] = tmp
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
+ util.Debug("抽取表 重复数据量:", extCount, repeatCount)
|
|
|
|
|
|
//bidding库
|
|
|
- session := mgo.GetMgoConn()
|
|
|
- count, _ := session.DB(db).C(c).Find(&q).Count()
|
|
|
- log.Println("抽取表 重复数据量:", extCount, repeatCount)
|
|
|
- log.Println("查询语句:", q, "同步总数:", count, "elastic库:", index)
|
|
|
+ biddingConn := biddingMgo.GetMgoConn()
|
|
|
+ c, _ := bidding["collect"].(string)
|
|
|
+ count, _ := biddingConn.DB(biddingMgo.DbName).C(c).Find(&q).Count()
|
|
|
+ util.Debug("查询语句:", q, "同步总数:", count)
|
|
|
n1, n2 := 0, 0
|
|
|
if count < 200000 {
|
|
|
var res []map[string]interface{}
|
|
|
- //res := make([]map[string]interface{}, 1)
|
|
|
- result := session.DB(db).C(c).Find(q).Select(map[string]interface{}{
|
|
|
- "projectinfo.attachment": 0,
|
|
|
- "contenthtml": 0,
|
|
|
+ result := biddingConn.DB(biddingMgo.DbName).C(c).Find(q).Select(map[string]interface{}{
|
|
|
+ "contenthtml": 0,
|
|
|
}).Iter()
|
|
|
for tmp := make(map[string]interface{}); result.Next(tmp); {
|
|
|
res = append(res, tmp)
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- mgo.DestoryMongoConn(session)
|
|
|
- log.Println("查询结果", "bidding:", count, "抽取:", extCount)
|
|
|
- if int64(len(res)) != count {
|
|
|
- time.Sleep(20 * time.Second)
|
|
|
- toadd := &net.UDPAddr{
|
|
|
- IP: net.ParseIP("127.0.0.1"),
|
|
|
- Port: qutil.IntAll(Sysconfig["udpport"]),
|
|
|
- }
|
|
|
- udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
|
|
|
- }
|
|
|
- n1, n2 = doIndex(res, eMap, index, itype, db, c, bkey, tasktype)
|
|
|
- //if int64(n1 + n2) != count {
|
|
|
- // log.Println("任务错误,结果不一致")
|
|
|
+ biddingMgo.DestoryMongoConn(biddingConn)
|
|
|
+ util.Debug("查询结果", "bidding:", count, "抽取:", extCount)
|
|
|
+ //if int64(len(res)) != count {
|
|
|
+ // time.Sleep(20 * time.Second)
|
|
|
+ // toadd := &net.UDPAddr{
|
|
|
+ // IP: net.ParseIP("127.0.0.1"),
|
|
|
+ // Port: util.IntAll(Sysconfig["udpport"]),
|
|
|
+ // }
|
|
|
+ // udpclient.WriteUdp(data, mu.OP_TYPE_DATA, toadd)
|
|
|
//}
|
|
|
+ n1, n2 = t.doIndex(res, eMap, bkey)
|
|
|
} else {
|
|
|
- log.Println("数据量太大,放弃!", count)
|
|
|
- mgo.DestoryMongoConn(session)
|
|
|
+ util.Debug("数据量太大,放弃!", count)
|
|
|
+ biddingMgo.DestoryMongoConn(biddingConn)
|
|
|
}
|
|
|
- log.Println(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
|
|
|
- if tasktype == "bidding_history" {
|
|
|
- qutil.Debug(tasktype)
|
|
|
+ util.Debug(mapInfo, "create bidding index...over", "all:", count, "bidding size:", n1, ",es size:", n2)
|
|
|
+ if t.stype == "bidding_history" {
|
|
|
// 历史判重id段结束之后 生全量数据索引
|
|
|
- biddingDataTask(data, mapInfo)
|
|
|
+ t.stype = "biddingdata"
|
|
|
+ t.thread = 30
|
|
|
+ t.biddingDataTask(data, mapInfo)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, index, itype, db, c, bkey, tasktype string) (int, int) {
|
|
|
+func (t *TaskInfo) doIndex(infos []map[string]interface{}, eMap map[string]map[string]interface{}, bkey string) (int, int) {
|
|
|
n1, n2 := 0, 0 //bidding数量,索引数量
|
|
|
- //线程池
|
|
|
- UpdatesLock := sync.Mutex{}
|
|
|
- fields := strings.Split(bidding["fields"].(string), ",")
|
|
|
- //更新数组
|
|
|
- arr := [][]map[string]interface{}{}
|
|
|
- arrEs := []map[string]interface{}{}
|
|
|
//对比两张表数据,减少查询次数
|
|
|
var compare map[string]interface{}
|
|
|
- log.Println("开始迭代..")
|
|
|
+ util.Debug("start ...")
|
|
|
for n, tmp := range infos {
|
|
|
n1++
|
|
|
- if sensitive := qutil.ObjToString(tmp["sensitive"]); sensitive == "测试" || sensitive == "异常" { //bidding中有敏感词,不生索引
|
|
|
+ if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
|
|
|
tmp = make(map[string]interface{})
|
|
|
continue
|
|
|
}
|
|
|
tid := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
- //loginfo := make(map[string]interface{}) // 日志
|
|
|
update := map[string]interface{}{} //要更新的mongo数据
|
|
|
//对比方法----------------
|
|
|
if eMap[tid] != nil {
|
|
|
compare = eMap[tid]
|
|
|
- if tasktype == "bidding" {
|
|
|
+ if t.stype == "bidding" {
|
|
|
// 增量id段 正常数据
|
|
|
- if num := qutil.IntAll(compare["dataging"]); num == 1 { //extract中dataging=1跳过
|
|
|
+ if num := util.IntAll(compare["dataging"]); num == 1 { //extract中dataging=1跳过
|
|
|
tmp = make(map[string]interface{})
|
|
|
compare = nil
|
|
|
continue
|
|
|
}
|
|
|
delete(eMap, tid)
|
|
|
}
|
|
|
- if tasktype == "bidding_history" {
|
|
|
+ if t.stype == "bidding_history" {
|
|
|
//增量id段 历史数据
|
|
|
if compare["history_updatetime"] == nil { //extract中history_updatetime不存在跳过
|
|
|
tmp = make(map[string]interface{})
|
|
@@ -152,7 +123,7 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
}
|
|
|
}
|
|
|
//更新bidding表,生成索引
|
|
|
- for _, k := range fields {
|
|
|
+ for _, k := range biddingMgoFields {
|
|
|
v1 := compare[k] //extract
|
|
|
v2 := tmp[k] //bidding
|
|
|
if v2 == nil && v1 != nil && !modifyinfo[k] {
|
|
@@ -167,68 +138,26 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if qutil.IntAll(compare["repeat"]) == 1 {
|
|
|
+ if util.IntAll(compare["repeat"]) == 1 {
|
|
|
update["extracttype"] = -1
|
|
|
} else {
|
|
|
update["extracttype"] = 1
|
|
|
}
|
|
|
} else {
|
|
|
compare = nil
|
|
|
- if qutil.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
|
|
|
+ if util.IntAll(tmp["dataging"]) == 1 { //修改未抽取的bidding数据的dataging
|
|
|
update["dataging"] = 0
|
|
|
}
|
|
|
}
|
|
|
//下面可以多线程跑的--->
|
|
|
//处理分类
|
|
|
if compare != nil { //extract
|
|
|
- subscopeclass, _ := compare["subscopeclass"].([]interface{}) //subscopeclass
|
|
|
- if subscopeclass != nil {
|
|
|
- //str := ","
|
|
|
- m1 := map[string]bool{}
|
|
|
- newclass := []string{}
|
|
|
- for _, sc := range subscopeclass {
|
|
|
- sclass, _ := sc.(string)
|
|
|
- if !m1[sclass] {
|
|
|
- m1[sclass] = true
|
|
|
- //str += sclass + ","
|
|
|
- newclass = append(newclass, sclass)
|
|
|
- }
|
|
|
- }
|
|
|
- update["s_subscopeclass"] = strings.Join(newclass, ",")
|
|
|
- update["subscopeclass"] = newclass
|
|
|
- }
|
|
|
- topscopeclass, _ := compare["topscopeclass"].([]interface{}) //topscopeclass
|
|
|
- if topscopeclass != nil {
|
|
|
- m2 := map[string]bool{}
|
|
|
- newclass := []string{}
|
|
|
- for _, tc := range topscopeclass {
|
|
|
- tclass, _ := tc.(string)
|
|
|
- tclass = reg_letter.ReplaceAllString(tclass, "") // 去除字母
|
|
|
- if !m2[tclass] {
|
|
|
- m2[tclass] = true
|
|
|
- newclass = append(newclass, tclass)
|
|
|
- }
|
|
|
- }
|
|
|
- update["s_topscopeclass"] = strings.Join(newclass, ",")
|
|
|
- }
|
|
|
- if package1 := compare["package"]; package1 != nil {
|
|
|
- packageM, _ := package1.(map[string]interface{})
|
|
|
- for _, p := range packageM {
|
|
|
- pm, _ := p.(map[string]interface{})
|
|
|
- if qutil.ObjToString(pm["winner"]) != "" || qutil.Float64All(pm["budget"]) > 0 ||
|
|
|
- qutil.Float64All(pm["bidamount"]) > 0 {
|
|
|
- update["multipackage"] = 1
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- update["multipackage"] = 0
|
|
|
- }
|
|
|
+ FieldMethod(compare, update)
|
|
|
compare = nil
|
|
|
} else {
|
|
|
- area := qutil.ObjToString(tmp["area"])
|
|
|
- city := qutil.ObjToString(tmp["city"])
|
|
|
- district := qutil.ObjToString(tmp["district"])
|
|
|
+ area := util.ObjToString(tmp["area"])
|
|
|
+ city := util.ObjToString(tmp["city"])
|
|
|
+ district := util.ObjToString(tmp["district"])
|
|
|
rdata := standardCheckCity(area, city, district)
|
|
|
if len(rdata) > 0 {
|
|
|
for k, v := range rdata {
|
|
@@ -246,260 +175,57 @@ func doIndex(infos []map[string]interface{}, eMap map[string]map[string]interfac
|
|
|
for tk, tv := range update {
|
|
|
tmp[tk] = tv
|
|
|
}
|
|
|
+
|
|
|
+ extractMap := make(map[string]interface{})
|
|
|
if tmp["s_winner"] != "" {
|
|
|
- sWinnerarr := strings.Split(qutil.ObjToString(tmp["s_winner"]), ",")
|
|
|
- var cid []string
|
|
|
- for _, w := range sWinnerarr {
|
|
|
- if w != "" {
|
|
|
- id := redis.GetStr("qyxy_id", w)
|
|
|
- if id == "" {
|
|
|
- ents, _ := mgostandard.Find("qyxy_std", bson.M{"company_name": w}, bson.M{"updatetime": -1}, nil, false, -1, -1)
|
|
|
- if len(*ents) > 0 {
|
|
|
- id = qutil.ObjToString((*ents)[0]["_id"])
|
|
|
- redis.PutCKV("qyxy_id", w, id)
|
|
|
- } else {
|
|
|
- ent, _ := qyxydb.FindOne("company_history_name", bson.M{"history_name": w})
|
|
|
- if len(*ent) > 0 {
|
|
|
- id = qutil.ObjToString((*ent)["company_id"])
|
|
|
- redis.PutCKV("qyxy_id", w, id)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if id == "" {
|
|
|
- id = "-"
|
|
|
- }
|
|
|
- cid = append(cid, id)
|
|
|
- //ent, _ := mgostandard.FindOne("qyxy_historyname", map[string]interface{}{"company_name": w})
|
|
|
- //if len(*ent) > 0 {
|
|
|
- // cid = append(cid, qutil.ObjToString((*ent)["company_id"]))
|
|
|
- //}else {
|
|
|
- // ent, _ = mgostandard.FindOne("qyxy_std", map[string]interface{}{"company_name": w})
|
|
|
- // if len(*ent) > 0 {
|
|
|
- // cid = append(cid, qutil.ObjToString((*ent)["_id"]))
|
|
|
- // }
|
|
|
- //}
|
|
|
- }
|
|
|
- }
|
|
|
+ cid := FieldFun(tmp)
|
|
|
if len(cid) > 0 {
|
|
|
tmp["entidlist"] = cid
|
|
|
update["entidlist"] = cid
|
|
|
- tmp_up := []map[string]interface{}{}
|
|
|
- tmp_up = append(tmp_up, map[string]interface{}{"_id": tmp["_id"]})
|
|
|
- tmp_up = append(tmp_up, map[string]interface{}{"$set": map[string]interface{}{"entidlist": cid}})
|
|
|
- UpdataMgoCache <- tmp_up
|
|
|
+ extractMap["entidlist"] = cid
|
|
|
}
|
|
|
}
|
|
|
- //对projectscope字段的索引处理
|
|
|
- ps, _ := tmp["projectscope"].(string)
|
|
|
- if len(ps) > ESLEN {
|
|
|
- tmp["projectscope"] = string(([]rune(ps))[:4000])
|
|
|
- }
|
|
|
- //对标的物为空处理
|
|
|
- if filetext := getFileText(tmp); len(filetext) > 10 { //attach_text
|
|
|
- // if site, _ := tmp["site"].(string); site == "中国招标投标公共服务平台" { //site:中国招标投标公共服务平台 detail替换成filetext 并加入标记filedetail=1
|
|
|
- // tmp["detail"] = filetext //更新es中detail
|
|
|
- // update["detail"] = filetext //更新mongo中detail
|
|
|
- // update["filedetail"] = 1 //mongo中打标记
|
|
|
- // }
|
|
|
- tmp["filetext"] = filetext
|
|
|
- }
|
|
|
- if purchasing, ok := tmp["purchasing"].(string); ok && purchasing == "" {
|
|
|
- delete(tmp, "purchasing")
|
|
|
- }
|
|
|
- if purchasinglist, ok := tmp["purchasinglist"].([]interface{}); ok && len(purchasinglist) == 0 {
|
|
|
- delete(tmp, "purchasinglist")
|
|
|
- }
|
|
|
- //数据为空处理
|
|
|
- for _, f := range []string{"bidstatus", "city", "district", "channel"} {
|
|
|
- if fVal, ok := tmp[f].(string); ok && fVal == "" {
|
|
|
- delete(tmp, f)
|
|
|
+ // 6.10 剑鱼发布信息分类处理, 写在这里是为了修改抽取表
|
|
|
+ TypeMethod(tmp, update, extractMap)
|
|
|
+ if len(extractMap) > 0 {
|
|
|
+ if extractMap["toptype"] != nil && extractMap["subtype"] == nil {
|
|
|
+ updateExtractPool <- []map[string]interface{}{
|
|
|
+ {"_id": tmp["_id"]},
|
|
|
+ {"$set": extractMap, "$unset": map[string]interface{}{"subtype": ""}},
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ updateExtractPool <- []map[string]interface{}{
|
|
|
+ {"_id": tmp["_id"]},
|
|
|
+ {"$set": extractMap},
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- UpdatesLock.Lock()
|
|
|
- // for k1, _ := range tmp {
|
|
|
- // if strings.HasSuffix(k1, "_b") || k1 == "contenthtml" {
|
|
|
- // delete(tmp, k1)
|
|
|
- // }
|
|
|
- // }
|
|
|
- go IS.Add("bidding")
|
|
|
- if qutil.IntAll(update["extracttype"]) != -1 {
|
|
|
- n2++
|
|
|
- newTmp := map[string]interface{}{} //最终生索引的数据
|
|
|
- for field, ftype := range biddingIndexFieldsMap { //
|
|
|
- if tmp[field] != nil { //
|
|
|
- if field == "projectinfo" {
|
|
|
- mp, _ := tmp[field].(map[string]interface{})
|
|
|
- if mp != nil {
|
|
|
- newmap := map[string]interface{}{}
|
|
|
- for k, ktype := range projectinfoFieldsMap {
|
|
|
- mpv := mp[k]
|
|
|
- if mpv != nil && reflect.TypeOf(mpv).String() == ktype {
|
|
|
- newmap[k] = mp[k]
|
|
|
- }
|
|
|
- }
|
|
|
- if len(newmap) > 0 {
|
|
|
- newTmp[field] = newmap
|
|
|
- }
|
|
|
- }
|
|
|
- } else if field == "purchasinglist" { //标的物处理
|
|
|
- purchasinglist_new := []map[string]interface{}{}
|
|
|
- if pcl, _ := tmp[field].([]interface{}); len(pcl) > 0 {
|
|
|
- for _, ls := range pcl {
|
|
|
- lsm_new := make(map[string]interface{})
|
|
|
- lsm := ls.(map[string]interface{})
|
|
|
- for pf, pftype := range purchasinglistFieldsMap {
|
|
|
- lsmv := lsm[pf]
|
|
|
- if lsmv != nil && reflect.TypeOf(lsmv).String() == pftype {
|
|
|
- lsm_new[pf] = lsm[pf]
|
|
|
- }
|
|
|
- }
|
|
|
- if lsm_new != nil && len(lsm_new) > 0 {
|
|
|
- purchasinglist_new = append(purchasinglist_new, lsm_new)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if len(purchasinglist_new) > 0 {
|
|
|
- newTmp[field] = purchasinglist_new
|
|
|
- }
|
|
|
- } else if field == "winnerorder" { //中标候选
|
|
|
- winnerorder_new := []map[string]interface{}{}
|
|
|
- if winnerorder, _ := tmp[field].([]interface{}); len(winnerorder) > 0 {
|
|
|
- for _, win := range winnerorder {
|
|
|
- winMap_new := make(map[string]interface{})
|
|
|
- winMap := win.(map[string]interface{})
|
|
|
- for wf, wftype := range winnerorderlistFieldsMap {
|
|
|
- wfv := winMap[wf]
|
|
|
- if wfv != nil && reflect.TypeOf(wfv).String() == wftype {
|
|
|
- if wf == "sort" && qutil.Int64All(wfv) > 100 {
|
|
|
- continue
|
|
|
- }
|
|
|
- winMap_new[wf] = winMap[wf]
|
|
|
- }
|
|
|
- }
|
|
|
- if winMap_new != nil && len(winMap_new) > 0 {
|
|
|
- winnerorder_new = append(winnerorder_new, winMap_new)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if len(winnerorder_new) > 0 {
|
|
|
- newTmp[field] = winnerorder_new
|
|
|
- }
|
|
|
- } else if field == "qualifies" {
|
|
|
- //项目资质
|
|
|
- qs := []string{}
|
|
|
- if q, _ := tmp[field].([]interface{}); len(q) > 0 {
|
|
|
- for _, v := range q {
|
|
|
- v1 := v.(map[string]interface{})
|
|
|
- qs = append(qs, qutil.ObjToString(v1["key"]))
|
|
|
- }
|
|
|
- }
|
|
|
- if len(qs) > 0 {
|
|
|
- newTmp[field] = strings.Join(qs, ",")
|
|
|
- }
|
|
|
- } else if field == "detail" { //过滤
|
|
|
- detail, _ := tmp[field].(string)
|
|
|
- if len([]rune(detail)) > detailLength {
|
|
|
- detail = detail[:detailLength]
|
|
|
- }
|
|
|
- newTmp[field] = qutil.ObjToString(tmp["title"]) + " " + FilterDetail(detail)
|
|
|
- } else if field == "_id" || field == "topscopeclass" { //不做处理
|
|
|
- newTmp[field] = tmp[field]
|
|
|
- } else if field == "publishtime" || field == "comeintime" {
|
|
|
- //字段类型不正确,特别处理
|
|
|
- if tmp[field] != nil && qutil.Int64All(tmp[field]) > 0 {
|
|
|
- newTmp[field] = qutil.Int64All(tmp[field])
|
|
|
- }
|
|
|
|
|
|
- } else if field == "review_experts" {
|
|
|
- // 评审专家
|
|
|
- if arr, ok := tmp["review_experts"].([]interface{}); ok && len(arr) > 0 {
|
|
|
- arr1 := qutil.ObjArrToStringArr(arr)
|
|
|
- newTmp[field] = strings.Join(arr1, ",")
|
|
|
- }
|
|
|
- } else if field == "entidlist" {
|
|
|
- newTmp[field] = tmp[field]
|
|
|
- } else if field == "bidopentime" {
|
|
|
- if tmp[field] != nil && tmp["bidendtime"] == nil {
|
|
|
- newTmp["bidendtime"] = tmp[field]
|
|
|
- newTmp[field] = tmp[field]
|
|
|
- } else if tmp[field] == nil && tmp["bidendtime"] != nil {
|
|
|
- newTmp["bidendtime"] = tmp["bidendtime"]
|
|
|
- newTmp[field] = tmp["bidendtime"]
|
|
|
- } else {
|
|
|
- if tmp["bidopentime"] != nil {
|
|
|
- newTmp[field] = tmp["bidopentime"]
|
|
|
- }
|
|
|
- }
|
|
|
- } else { //其它字段判断数据类型,不正确舍弃
|
|
|
- if fieldval := tmp[field]; reflect.TypeOf(fieldval).String() != ftype {
|
|
|
- continue
|
|
|
- } else {
|
|
|
- if fieldval != "" {
|
|
|
- newTmp[field] = fieldval
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- YuceEndtime(newTmp) // 预测结果时间
|
|
|
- newTmp["createtime"] = time.Now().Unix() // es库数据创建时间,只有增量数据有
|
|
|
- if qutil.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
|
|
|
+ clearMap(tmp)
|
|
|
+ //go IS.Add("bidding")
|
|
|
+ if util.IntAll(update["extracttype"]) != -1 {
|
|
|
+ n2++
|
|
|
+ newTmp := t.GetEsField(tmp, update)
|
|
|
+ if util.ObjToString(newTmp["spidercode"]) == "a_jyxxfbpt_gg" {
|
|
|
// 剑鱼信息发布数据 通过udp通知信息发布程序
|
|
|
go UdpMethod(mongodb.BsonIdToSId(newTmp["_id"]))
|
|
|
}
|
|
|
- arrEs = append(arrEs, newTmp)
|
|
|
+ saveEsPool <- newTmp
|
|
|
+
|
|
|
}
|
|
|
if len(update) > 0 {
|
|
|
delete(update, "winnerorder") //winnerorder不需要更新到bindding表,删除
|
|
|
- arr = append(arr, []map[string]interface{}{
|
|
|
- {"_id": tmp["_id"]},
|
|
|
+ updateBiddingPool <- []map[string]interface{}{{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
{"$set": update},
|
|
|
- })
|
|
|
- }
|
|
|
- if len(arr) >= BulkSize-1 {
|
|
|
- mgo.UpdateBulkAll(db, c, arr...)
|
|
|
- arr = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- if len(arrEs) >= BulkSize-1 {
|
|
|
- tmps := arrEs
|
|
|
- if StopFlag {
|
|
|
- qutil.Debug("es队列紧张,暂停10s执行")
|
|
|
- time.Sleep(time.Second * 10)
|
|
|
}
|
|
|
- elastic.BulkSave(index, itype, &tmps, true)
|
|
|
- if other_index != "" && other_itype != "" {
|
|
|
- elastic.BulkSave(other_index, other_itype, &tmps, true)
|
|
|
- }
|
|
|
- if len(multiIndex) == 2 {
|
|
|
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
|
|
|
- }
|
|
|
- arrEs = []map[string]interface{}{}
|
|
|
}
|
|
|
- UpdatesLock.Unlock()
|
|
|
- if n%100 == 0 {
|
|
|
- log.Println("current:", n)
|
|
|
+ if n%200 == 0 {
|
|
|
+ util.Debug("current:", n)
|
|
|
}
|
|
|
tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- UpdatesLock.Lock()
|
|
|
- if len(arr) > 0 {
|
|
|
- mgo.UpdateBulkAll(db, c, arr...)
|
|
|
- }
|
|
|
- if len(arrEs) > 0 {
|
|
|
- tmps := arrEs
|
|
|
- if StopFlag {
|
|
|
- qutil.Debug("es队列紧张,暂停10s执行")
|
|
|
- time.Sleep(time.Second * 10)
|
|
|
- }
|
|
|
- elastic.BulkSave(index, itype, &tmps, true)
|
|
|
- if other_index != "" && other_itype != "" {
|
|
|
- bidding_other_es.BulkSave(other_index, other_itype, &tmps, true)
|
|
|
- }
|
|
|
- if len(multiIndex) == 2 {
|
|
|
- elastic.BulkSave(multiIndex[0], multiIndex[1], &tmps, true)
|
|
|
- }
|
|
|
- }
|
|
|
- UpdatesLock.Unlock()
|
|
|
return n1, n2
|
|
|
}
|
|
|
|
|
@@ -515,7 +241,7 @@ var MSG_SERVER = "123.56.236.148:7070"
|
|
|
var DesLen = 120
|
|
|
|
|
|
func inits() {
|
|
|
- ser := qutil.ObjToString(Sysconfig["msg_server"])
|
|
|
+ ser := util.ObjToString(Sysconfig["msg_server"])
|
|
|
if ser != "" {
|
|
|
MSG_SERVER = ser
|
|
|
}
|
|
@@ -525,7 +251,7 @@ func inits() {
|
|
|
MsgServerAddr: MSG_SERVER,
|
|
|
CanHandleEvents: []int{},
|
|
|
OnConnectSuccess: func() {
|
|
|
- log.Println("c.")
|
|
|
+ util.Debug("剑鱼关键词 client")
|
|
|
},
|
|
|
ReadBufferSize: 10,
|
|
|
WriteBufferSize: 10,
|
|
@@ -534,18 +260,17 @@ func inits() {
|
|
|
|
|
|
}
|
|
|
|
|
|
-//var clientlock = &sync.Mutex{}
|
|
|
var keypool = make(chan bool, 1)
|
|
|
|
|
|
func DealInfo(obj, update *map[string]interface{}) {
|
|
|
- defer qutil.Catch()
|
|
|
+ defer util.Catch()
|
|
|
if (*obj)["keywords"] != nil && (*obj)["description"] != nil {
|
|
|
return
|
|
|
} else {
|
|
|
(*update)["keywords"] = ""
|
|
|
(*update)["description"] = ""
|
|
|
}
|
|
|
- title := qutil.ObjToString((*obj)["title"])
|
|
|
+ title := util.ObjToString((*obj)["title"])
|
|
|
var m [][]string
|
|
|
select {
|
|
|
case <-func() <-chan bool {
|
|
@@ -593,9 +318,9 @@ func DealInfo(obj, update *map[string]interface{}) {
|
|
|
(*update)["keywords"] = keywords
|
|
|
content := ""
|
|
|
if (*obj)["detail_bak"] != nil {
|
|
|
- content = qutil.ObjToString((*obj)["detail_bak"])
|
|
|
+ content = util.ObjToString((*obj)["detail_bak"])
|
|
|
} else {
|
|
|
- content = qutil.ObjToString((*obj)["detail"])
|
|
|
+ content = util.ObjToString((*obj)["detail"])
|
|
|
}
|
|
|
//内容替换
|
|
|
content = strings.Replace(content, " ", "", -1)
|
|
@@ -606,7 +331,6 @@ func DealInfo(obj, update *map[string]interface{}) {
|
|
|
if strings.HasPrefix(content, ",") {
|
|
|
content = content[1:]
|
|
|
}
|
|
|
- //log.Println(content)
|
|
|
tc := []rune(content)
|
|
|
ltc := len(tc)
|
|
|
description := content
|
|
@@ -618,104 +342,51 @@ func DealInfo(obj, update *map[string]interface{}) {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-// 预测结果时间
|
|
|
-func YuceEndtime(tmp map[string]interface{}) {
|
|
|
- flag := true
|
|
|
- scope := []string{"服务采购_法律咨询", "服务采购_会计", "服务采购_物业", "服务采购_审计", "服务采购_安保", "服务采购_仓储物流",
|
|
|
- "服务采购_广告宣传印刷"}
|
|
|
- subscopeclass := qutil.ObjToString(tmp["s_subscopeclass"])
|
|
|
- for _, v := range scope {
|
|
|
- if strings.Contains(subscopeclass, v) {
|
|
|
- flag = false
|
|
|
- break
|
|
|
- }
|
|
|
- }
|
|
|
- if flag {
|
|
|
- return
|
|
|
- }
|
|
|
- subtype := qutil.ObjToString(tmp["subtype"])
|
|
|
- if subtype == "成交" || subtype == "合同" {
|
|
|
- // yucestarttime、yuceendtime
|
|
|
- yucestarttime, yuceendtime := int64(0), int64(0)
|
|
|
- // 项目周期中
|
|
|
- if qutil.ObjToString(tmp["projectperiod"]) != "" {
|
|
|
- dateStr := date1.FindStringSubmatch(qutil.ObjToString(tmp["projectperiod"]))
|
|
|
- if len(dateStr) == 2 {
|
|
|
- sdate := FormatDateStr(dateStr[0])
|
|
|
- edate := FormatDateStr(dateStr[1])
|
|
|
- if sdate < edate && sdate != 0 && edate != 0 {
|
|
|
- yucestarttime = sdate
|
|
|
- yuceendtime = edate
|
|
|
+// @Description tmp修改索引,update 修改bidding表,extractM修改抽取表
|
|
|
+// @Author J 2022/6/10 10:29 AM
|
|
|
+func TypeMethod(tmp, update, extractM map[string]interface{}) {
|
|
|
+ if jyData, ok := tmp["jyfb_data"].(map[string]interface{}); ok {
|
|
|
+ if t := util.ObjToString(jyData["type"]); t != "" {
|
|
|
+ switch t {
|
|
|
+ //case "采购信息":
|
|
|
+ case "招标公告":
|
|
|
+ if util.ObjToString(tmp["toptype"]) != "招标" {
|
|
|
+ tmp["toptype"] = "招标"
|
|
|
+ update["toptype"] = "招标"
|
|
|
+ extractM["toptype"] = "招标"
|
|
|
+ delete(tmp, "subtype")
|
|
|
+ delete(update, "subtype")
|
|
|
}
|
|
|
- }
|
|
|
- }
|
|
|
- if yucestarttime > 0 && yuceendtime > yucestarttime {
|
|
|
- tmp["yuceendtime"] = yuceendtime
|
|
|
- return
|
|
|
- }
|
|
|
- // 预测开始时间 合同签订日期
|
|
|
- if yucestarttime == 0 {
|
|
|
- if qutil.IntAll(tmp["signaturedate"]) <= 0 {
|
|
|
- if qutil.IntAll(tmp["publishtime"]) <= 0 {
|
|
|
- return
|
|
|
- } else {
|
|
|
- yucestarttime = qutil.Int64All(tmp["publishtime"])
|
|
|
+ case "采购意向":
|
|
|
+ if util.ObjToString(tmp["toptype"]) != "采购意向" {
|
|
|
+ tmp["toptype"] = "采购意向"
|
|
|
+ tmp["subtype"] = "采购意向"
|
|
|
+ update["toptype"] = "采购意向"
|
|
|
+ update["subtype"] = "采购意向"
|
|
|
+ extractM["toptype"] = "采购意向"
|
|
|
+ extractM["subtype"] = "采购意向"
|
|
|
+ }
|
|
|
+ case "招标预告":
|
|
|
+ if util.ObjToString(tmp["toptype"]) != "预告" {
|
|
|
+ tmp["toptype"] = "预告"
|
|
|
+ update["toptype"] = "预告"
|
|
|
+ extractM["toptype"] = "预告"
|
|
|
+ delete(tmp, "subtype")
|
|
|
+ delete(update, "subtype")
|
|
|
+ }
|
|
|
+ case "招标结果":
|
|
|
+ if util.ObjToString(tmp["toptype"]) != "结果" {
|
|
|
+ tmp["toptype"] = "结果"
|
|
|
+ update["toptype"] = "结果"
|
|
|
+ extractM["toptype"] = "结果"
|
|
|
+ delete(tmp, "subtype")
|
|
|
+ delete(update, "subtype")
|
|
|
}
|
|
|
- } else {
|
|
|
- yucestarttime = qutil.Int64All(tmp["signaturedate"])
|
|
|
- }
|
|
|
- }
|
|
|
- // 预测结束时间
|
|
|
- if yucestarttime > 0 && yuceendtime == 0 {
|
|
|
- if qutil.IntAll(tmp["project_duration"]) > 0 && qutil.ObjToString(tmp["project_timeunit"]) != "" {
|
|
|
- yuceendtime = YcEndTime(yucestarttime, qutil.IntAll(tmp["project_duration"]), qutil.ObjToString(tmp["project_timeunit"]))
|
|
|
- tmp["yuceendtime"] = yuceendtime
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func YcEndTime(starttime int64, num int, unit string) int64 {
|
|
|
- yuceendtime := int64(0)
|
|
|
- if unit == "日历天" || unit == "天" || unit == "日" {
|
|
|
- yuceendtime = starttime + int64(num*86400)
|
|
|
- } else if unit == "周" {
|
|
|
- yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num*7).Unix()
|
|
|
- } else if unit == "月" {
|
|
|
- yuceendtime = time.Unix(starttime, 0).AddDate(0, num, 0).Unix()
|
|
|
- } else if unit == "年" {
|
|
|
- yuceendtime = time.Unix(starttime, 0).AddDate(num, 0, 0).Unix()
|
|
|
- } else if unit == "工作日" {
|
|
|
- n := num / 7 * 2
|
|
|
- yuceendtime = time.Unix(starttime, 0).AddDate(0, 0, num+n).Unix()
|
|
|
- }
|
|
|
- return yuceendtime
|
|
|
-}
|
|
|
-
|
|
|
-func FormatDateStr(ds string) int64 {
|
|
|
-
|
|
|
- ds = strings.Replace(ds, "年", "-", -1)
|
|
|
- ds = strings.Replace(ds, "月", "-", -1)
|
|
|
- ds = strings.Replace(ds, "日", "", -1)
|
|
|
- ds = strings.Replace(ds, "/", "-", -1)
|
|
|
- ds = strings.Replace(ds, ".", "-", -1)
|
|
|
-
|
|
|
- location, err := time.ParseInLocation(qutil.Date_Short_Layout, ds, time.Local)
|
|
|
- if err != nil {
|
|
|
- qutil.Debug(err)
|
|
|
- return 0
|
|
|
- } else {
|
|
|
- return location.Unix()
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-type Request struct {
|
|
|
- InfoId string
|
|
|
-}
|
|
|
-type Response struct {
|
|
|
- Rep []map[string]interface{}
|
|
|
-}
|
|
|
-
|
|
|
// @Description rpc调用信息发布程序接口
|
|
|
// @Author J 2022/4/13 9:13 AM
|
|
|
func UdpMethod(id string) {
|
|
@@ -724,6 +395,6 @@ func UdpMethod(id string) {
|
|
|
"stype": "jyfb_data_over",
|
|
|
}
|
|
|
datas, _ := json.Marshal(mapinfo)
|
|
|
- qutil.Debug(JyUdpAddr, string(datas))
|
|
|
+ util.Debug(JyUdpAddr, string(datas))
|
|
|
_ = udpclient.WriteUdp(datas, mu.OP_TYPE_DATA, JyUdpAddr)
|
|
|
}
|