| 5 years ago | |
---|---|---|
.. | ||
README.md | 5 years ago | |
config.json | 5 years ago | |
dataMethod.go | 5 years ago | |
dataMethodHeavy.go | 5 years ago | |
dataMethodMerge.go | 5 years ago | |
datamap.go | 5 years ago | |
main.go | 5 years ago | |
mgo.go | 5 years ago | |
udptaskmap.go | 6 years ago |
{
"udpport": ":1785",
"dupdays": 5,
"mongodb": {
"addr": "172.17.4.187:27083",
"pool": 5,
"db": "qfw",
"extract": "result_file_20200410",
"extract_back": "result_file_20200409",
"site": {
"dbname": "qfw",
"coll": "site"
}
},
"jkmail": {
"to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
"api": "http://10.171.112.160:19281/_send/_mail"
},
"nextNode": [
{
"addr": "127.0.0.1",
"port": 1783,
"stype": "bidding",
"memo": "创建招标数据索引new"
}
],
"threads": 1,
"isMerger": true,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 3,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[(（]?[0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９][)）]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[(（][0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９][)）]",
"beifen": "[(（]?[0-9一二三四五六七八九十零１２３４５６７８９再][)）]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
{
"udpport": ":17859",
"dupdays": 5,
"mongodb": {
"addr": "192.168.3.207:27092",
"pool": 5,
"db": "extract_kf",
"extract": "a_testbidding",
"extract_back": "a_testbidding",
"site": {
"dbname": "extract_kf",
"coll": "site"
}
},
"jkmail": {
"to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
"api": "http://10.171.112.160:19281/_send/_mail"
},
"nextNode": [
],
"threads": 1,
"isMerger": true,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 3,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[(（]?[0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９][)）]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[(（][0-9a-zA-Z一二三四五六七八九十零１２３４５６７８９][)）]",
"beifen": "[(（]?[0-9一二三四五六七八九十零１２３４５６７８９再][)）]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
"nextNode": [
{
"addr": "172.17.145.179",
"port": 1782,
"stype": "project",
"memo": "合并项目"
},
{
"addr": "127.0.0.1",
"port": 1783,
"stype": "bidding",
"memo": "创建招标数据索引new"
}
],
//定时任务--定时任务--定时任务--暂未用 func timedTaskDay() {
log.Println("部署定时任务")
c := cron.New()
c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
c.Start()
//timedTaskOnce()
} func timedTaskOnce() {
defer util.Catch()
log.Println("开始一次迁移任务")
movedata()
log.Println("开始一次任务判重")
//当前时间-8 -4 小时
now := time.Now()
log.Println(now)
preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
log.Println(preTime,curTime)
task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
between_time := curTime.Unix() - (86400 * timingPubScope)
log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
//区间id
q_start := map[string]interface{}{
"_id": map[string]interface{}{
"$gt": StringTOBsonId(task_sid),
"$lte": StringTOBsonId(task_eid),
},
}
//q_start = map[string]interface{}{
// "_id": map[string]interface{}{
// "$gte": StringTOBsonId("5f184cd552c1d9fbf84519d3"),
// "$lte": StringTOBsonId("5f184d3852c1d9fbf8451a2a"),
// },
//}
sess := mgo.GetMgoConn()
defer mgo.DestoryMongoConn(sess)
it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
pendAllArr:=[][]map[string]interface{}{}//待处理数组
dayArr := []map[string]interface{}{}
for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
if num%10000 == 0 {
log.Println("正序遍历:", num)
}
source := util.ObjToMap(tmp["jsondata"])
if util.IntAll((*source)["sourcewebsite"]) == 1 {
updateExtract = append(updateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"repeat": 1,
"dataging": 0,
"repeat_reason": "sourcewebsite为1 重复",
},
},
})
if len(updateExtract) > 50 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
tmp = make(map[string]interface{})
continue
}
//取-符合-发布时间X年内的数据
if util.IntAll(tmp["dataging"]) == 1 {
pubtime := util.Int64All(tmp["publishtime"])
if pubtime > 0 && pubtime >= between_time {
oknum++
if deterTime==0 {
log.Println("找到第一条符合条件的数据")
deterTime = util.Int64All(tmp["publishtime"])
dayArr = append(dayArr,tmp)
}else {
if pubtime-deterTime >timingSpanDay*86400 {
//新数组重新构建,当前组数据加到全部组数据
pendAllArr = append(pendAllArr,dayArr)
dayArr = []map[string]interface{}{}
deterTime = util.Int64All(tmp["publishtime"])
dayArr = append(dayArr,tmp)
}else {
dayArr = append(dayArr,tmp)
}
}
}else {
//不在两年内的也清标记
updateExtract = append(updateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"dataging": 0,
},
},
})
if len(updateExtract) > 50 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
}
}
tmp = make(map[string]interface{})
}
//批量更新标记
if len(updateExtract) > 0 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
if len(dayArr)>0 {
pendAllArr = append(pendAllArr,dayArr)
dayArr = []map[string]interface{}{}
}
log.Println("查询数量:",num,"符合条件:",oknum)
if len(pendAllArr) <= 0 {
log.Println("没找到dataging==1的数据")
return
}
//测试分组数量是否正确
testNum:=0
for k,v:=range pendAllArr {
log.Println("第",k,"组--","数量:",len(v))
testNum = testNum+len(v)
}
log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
n, repeateN := 0, 0
pool := make(chan bool, 4)
wg := &sync.WaitGroup{}
for k,v:=range pendAllArr { //每组结束更新一波数据
pool <- true
wg.Add(1)
go func(k int,v []map[string]interface{}) {
defer func() {
<-pool
wg.Done()
}()
//构建当前组的数据池
log.Println("构建第",k,"组---(数据池)")
//当前组的第一个发布时间
first_pt :=util.Int64All(v[0]["publishtime"])
curTM := TimedTaskDatamap(dupdays, first_pt,int(k))
log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
n = n+len(v)
log.Println("统计目前总数量:",n,"重复数量:",repeateN)
for _,tmp:=range v {
info := NewInfo(tmp)
if !LowHeavy { //是否进行低质量数据判重
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
log.Println("无效数据")
updateExtract = append(updateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"repeat": -1, //无效数据标签
"dataging": 0,
},
},
})
if len(updateExtract) > 50 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
return
}
}
b, source, reason := curTM.check(info)
if b { //有重复,生成更新语句,更新抽取和更新招标
log.Println("判重结果", b, reason,"目标id",info.id)
repeateN++
//重复数据打标签
updateExtract = append(updateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"repeat": 1,
"repeat_reason": reason,
"repeat_id": source.id,
"dataging": 0,
},
},
})
}else {
updateExtract = append(updateExtract, []map[string]interface{}{
map[string]interface{}{
"_id": tmp["_id"],
},
map[string]interface{}{
"$set": map[string]interface{}{
"dataging": 0,//符合条件的都为dataging==0
},
},
})
}
if len(updateExtract) > 50 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
}
}(k,v)
//每组数据结束-更新数据
if len(updateExtract) > 0 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
}
if len(updateExtract) > 0 {
mgo.UpSertBulk(extract, updateExtract...)
updateExtract = [][]map[string]interface{}{}
}
log.Println("this timeTask over.", n, "repeateN:", repeateN)
//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
if n > repeateN {
for _, to := range nextNode {
next_sid := util.BsonIdToSId(task_sid)
next_eid := util.BsonIdToSId(task_eid)
key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
by, _ := json.Marshal(map[string]interface{}{
"gtid": next_sid,
"lteid": next_eid,
"stype": util.ObjToString(to["stype"]),
"key": key,
})
addr := &net.UDPAddr{
IP: net.ParseIP(to["addr"].(string)),
Port: util.IntAll(to["port"]),
}
node := &udpNode{by, addr, time.Now().Unix(), 0}
udptaskmap.Store(key, node)
udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
}
}
}
//迁移数据dupdays+5之前的数据 func movedata() {
sess := mgo.GetMgoConn()
defer mgo.DestoryMongoConn(sess)
year, month, day := time.Now().Date()
now:=time.Now()
move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
q := map[string]interface{}{
"comeintime": map[string]interface{}{
"$lt": move_time,
},
}
log.Println(q)
it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
index := 0
for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
mgo.Save(extract_back, tmp)
tmp = map[string]interface{}{}
if index%1000 == 0 {
log.Println("index", index)
}
}
log.Println("save to", extract_back, " ok index", index)
qv := map[string]interface{}{
"comeintime": map[string]interface{}{
"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
},
}
delnum := mgo.Delete(extract, qv)
log.Println("remove from ", extract, delnum)
}