|
4 жил өмнө | |
---|---|---|
.. | ||
README.md | 4 жил өмнө | |
config.json | 4 жил өмнө | |
dataMethod.go | 4 жил өмнө | |
dataMethodHeavy.go | 4 жил өмнө | |
dataMethodMerge.go | 4 жил өмнө | |
datamap.go | 4 жил өмнө | |
main.go | 4 жил өмнө | |
mgo.go | 4 жил өмнө | |
udptaskmap.go | 4 жил өмнө |
{
"udpport": ":19097",
"dupdays": 7,
"mongodb": {
"addr": "172.17.4.85:27080",
"pool": 10,
"db": "qfw",
"extract": "result_20200715",
"extract_back": "result_20200714",
"site": {
"dbname": "qfw",
"coll": "site"
}
},
"task_mongodb": {
"task_addrName": "172.17.4.187:27081",
"task_dbName": "qfw",
"task_collName": "ocr_flie_over",
"pool": 5
},
"jkmail": {
"to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
"api": "http://10.171.112.160:19281/_send/_mail"
},
"nextNode": [
{
"addr": "172.17.4.194",
"port": 1782,
"stype": "project",
"memo": "合并项目"
},
{
"addr": "127.0.0.1",
"port": 1783,
"stype": "bidding",
"memo": "创建招标数据索引new"
}
],
"threads": 1,
"isMerger": false,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 5,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
"beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
mgo = &MongodbSim{
MongodbAddr: "172.17.4.187:27083",
DbName: "qfw",
Size: 10,
}
mgo.InitPool()
return
// moveTimeoutData installs the recurring migration job: a cron entry that
// fires at midnight every day and performs one pass of moveOnceTimeOut.
func moveTimeoutData() {
	log.Println("部署迁移定时任务")
	scheduler := cron.New()
	// Six-field spec: second minute hour day month weekday → daily at 00:00:00.
	scheduler.AddFunc("0 0 0 * * ?", moveOnceTimeOut)
	scheduler.Start()
}
// moveOnceTimeOut performs a single migration pass: every document in
// result_20200714 whose comeintime is older than two years (relative to the
// current hour) is copied into result_20200713 and then deleted from
// result_20200714. Save happens before delete so a crash mid-loop cannot
// lose a document (it may leave a duplicate instead).
func moveOnceTimeOut() {
	log.Println("执行一次迁移超时数据")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	now:=time.Now()
	// Cut-off: exactly two years back, truncated to the current hour.
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": move_time,
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		// Progress heartbeat every 10k documents.
		if index%10000 == 0 {
			log.Println("index", index)
		}
		del_id:=BsonTOStringId(tmp["_id"])
		// Copy to the archive collection first, then remove the original.
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714",del_id)
		// Reset the reusable decode target so stale fields from the previous
		// document cannot leak into the next iteration.
		tmp = map[string]interface{}{}
	}
	log.Println("save and delete", " ok index", index)
}
{
"udpport": ":1785",
"dupdays": 7,
"mongodb": {
"addr": "172.17.4.85:27080",
"pool": 10,
"db": "qfw",
"extract": "result_20200715",
"extract_back": "result_20200714",
"site": {
"dbname": "qfw",
"coll": "site"
}
},
"jkmail": {
"to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
"api": "http://172.17.145.179:19281/_send/_mail"
},
"nextNode": [
{
"addr": "127.0.0.1",
"port": 1783,
"stype": "bidding",
"memo": "创建招标数据索引new"
}
],
"threads": 1,
"isMerger": false,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 5,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
"beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
// repairHistory re-runs duplicate detection over a fixed historical _id range
// of the extract collection. It scans the range in ascending publishtime
// order, splits the dataging==1 documents into groups whose publish times lie
// within timingSpanDay days of each other, then dedups each group on a pool
// of 2 worker goroutines, bulk-writing repeat/dataging flags back to mongo in
// batches of ~50.
//
// Fixes versus the previous revision:
//   - n and repeateN were mutated from multiple worker goroutines without any
//     synchronization (a data race under `go test -race`); they are now
//     guarded by a sync.Mutex.
//   - the invalid-data branch used `return`, which aborted the whole worker
//     goroutine on the first invalid document — silently skipping every
//     remaining document in the group and dropping up to 50 unflushed batch
//     updates. It now uses `continue`, consistent with the other branches.
func repairHistory() {
	defer util.Catch()
	log.Println("执行修复程序")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	q := map[string]interface{}{}
	// Oldest eligible publishtime; timingPubScope is a number of days.
	between_time := time.Now().Unix() - (86400 * timingPubScope)
	// Fixed historical _id window this repair run covers.
	q = map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId("5f15bf800000000000000000"),
			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
		},
	}
	log.Println("历史判重查询条件:", q, "时间:", between_time)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
	num, oknum, deterTime := int64(0), int64(0), int64(0) // scanned / eligible / current group's anchor publishtime
	updateExtract := [][]map[string]interface{}{}         // batched [filter, update] pairs for UpSertBulk
	pendAllArr := [][]map[string]interface{}{}            // sealed groups awaiting dedup
	dayArr := []map[string]interface{}{}                  // group currently being filled
	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
		if num%10000 == 0 {
			log.Println("正序遍历:", num)
		}
		source := util.ObjToMap(tmp["jsondata"])
		// sourcewebsite==1 documents are flagged duplicate outright.
		if util.IntAll((*source)["sourcewebsite"]) == 1 {
			updateExtract = append(updateExtract, []map[string]interface{}{
				map[string]interface{}{
					"_id": tmp["_id"],
				},
				map[string]interface{}{
					"$set": map[string]interface{}{
						"repeat":        1,
						"dataging":      0,
						"repeat_reason": "sourcewebsite为1 重复",
					},
				},
			})
			if len(updateExtract) > 50 {
				mgo.UpSertBulk(extract, updateExtract...)
				updateExtract = [][]map[string]interface{}{}
			}
			tmp = make(map[string]interface{})
			continue
		}
		// Only documents still marked dataging==1 with a publishtime inside
		// the scope window participate in dedup grouping.
		if util.IntAll(tmp["dataging"]) == 1 {
			pubtime := util.Int64All(tmp["publishtime"])
			if pubtime > 0 && pubtime >= between_time {
				oknum++
				if deterTime == 0 {
					log.Println("找到第一条符合条件的数据")
					deterTime = util.Int64All(tmp["publishtime"])
					dayArr = append(dayArr, tmp)
				} else {
					if pubtime-deterTime > timingSpanDay*86400 {
						// Span exceeded: seal the current group, start a new
						// one anchored at this document's publishtime.
						pendAllArr = append(pendAllArr, dayArr)
						dayArr = []map[string]interface{}{}
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						dayArr = append(dayArr, tmp)
					}
				}
			} else {
				// Out of scope: just clear the dataging marker.
				updateExtract = append(updateExtract, []map[string]interface{}{
					map[string]interface{}{
						"_id": tmp["_id"],
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"dataging": 0,
						},
					},
				})
				if len(updateExtract) > 50 {
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}
			}
		}
		tmp = make(map[string]interface{})
	}
	// Flush whatever remains of the scan-phase batch.
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
		updateExtract = [][]map[string]interface{}{}
	}
	if len(dayArr) > 0 {
		pendAllArr = append(pendAllArr, dayArr)
		dayArr = []map[string]interface{}{}
	}
	log.Println("查询数量:", num, "符合条件:", oknum)
	if len(pendAllArr) <= 0 {
		log.Println("没找到dataging==1的数据")
	}
	// Sanity-check that group sizes add up to the eligible count.
	testNum := 0
	for k, v := range pendAllArr {
		log.Println("第", k, "组--", "数量:", len(v))
		testNum = testNum + len(v)
	}
	log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
	n, repeateN := 0, 0
	var statMu sync.Mutex      // guards n and repeateN, written by every worker
	pool := make(chan bool, 2) // caps concurrency at 2 workers
	wg := &sync.WaitGroup{}
	for k, v := range pendAllArr { // one worker per group; flags flushed per group
		pool <- true
		wg.Add(1)
		go func(k int, v []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// Per-goroutine batch; never shared between workers.
			groupUpdateExtract := [][]map[string]interface{}{}
			log.Println("构建第", k, "组---(数据池)")
			// Last element carries the group's latest publishtime (the scan
			// iterated in ascending publishtime order); seed the comparison
			// pool one day past it.
			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
			log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
			statMu.Lock()
			n = n + len(v)
			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
			statMu.Unlock()
			for _, tmp := range v {
				info := NewInfo(tmp)
				if !LowHeavy { // optionally screen out low-quality documents
					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
						log.Println("无效数据")
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat":   -1, // invalid-data marker
									"dataging": 0,
								},
							},
						})
						if len(groupUpdateExtract) > 50 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						// Was `return`: that abandoned the rest of the group
						// and any unflushed updates. Move on to the next doc.
						continue
					}
				}
				b, source, reason := curTM.check(info)
				if b {
					// Duplicate found: record which document it duplicates.
					statMu.Lock()
					repeateN++
					statMu.Unlock()
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"repeat":        1,
								"repeat_reason": reason,
								"repeat_id":     source.id,
								"dataging":      0,
							},
						},
					})
				} else {
					// Unique: just clear the dataging marker.
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging": 0,
							},
						},
					})
				}
				if len(groupUpdateExtract) > 50 {
					mgo.UpSertBulk(extract, groupUpdateExtract...)
					groupUpdateExtract = [][]map[string]interface{}{}
				}
			}
			// Flush the group's remaining updates.
			if len(groupUpdateExtract) > 0 {
				mgo.UpSertBulk(extract, groupUpdateExtract...)
			}
		}(k, v)
	}
	wg.Wait()
	// NOTE(review): grace period, presumably for downstream consumers to
	// settle before the process reports completion — confirm it is needed.
	time.Sleep(30 * time.Second)
	log.Println("this repair over.", n, "repeateN:", repeateN, gtid, lteid)
	log.Println("修复结束")
}
//if !LowHeavy { //是否进行低质量数据判重
// if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
// updateExtract = append(updateExtract, []map[string]interface{}{
// map[string]interface{}{
// "_id": tmp["_id"],
// },
// map[string]interface{}{
// "$set": map[string]interface{}{
// "repeat": -1, //无效数据标签
// },
// },
// })
// if len(updateExtract) >= 200 {
// mgo.UpSertBulk(extract, updateExtract...)
// updateExtract = [][]map[string]interface{}{}
// }
// return
// }
//}
if !LowHeavy { //是否进行低质量数据判重
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
log.Println("无效数据")
updatelock.Lock()
groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"repeat": -1, //无效数据标签
"dataging": 0,
},
},
})
if len(groupUpdateExtract) > 200 {
mgo.UpSertBulk(extract, groupUpdateExtract...)
groupUpdateExtract = [][]map[string]interface{}{}
}
updatelock.Unlock()
return
}
}
//是否合并-低质量数据不合并
if isMerger && !strings.Contains(reason,"低质量"){
newData, update_map ,isReplace := mergeDataFields(source, info)
if isReplace {//替换-数据池
fmt.Println("合并更新的id:",source.id)
//数据池 - 替换
DM.replacePoolData(newData)
//mongo更新 - 具体字段 - merge
mgo.UpdateById(extract,source.id,update_map)
//发udp 更新索引
//for _, to := range nextNode {
// key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
// by, _ := json.Marshal(map[string]interface{}{
// "gtid": source.id,
// "lteid": source.id,
// "stype": "biddingall",
// "key": key,
// })
// addr := &net.UDPAddr{
// IP: net.ParseIP(to["addr"].(string)),
// Port: util.IntAll(to["port"]),
// }
// node := &udpNode{by, addr, time.Now().Unix(), 0}
// udptaskmap.Store(key, node)
// udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
//}
}
}