apple 7c1103fd31 局部变量-更新 5 years ago
..
README.md 0b706a3c3a 去除打印-历史 5 years ago
config.json 4799d43fcf 历史-任务-功能-备份 5 years ago
dataMethod.go d40d0f3d74 判重 合并 5 years ago
dataMethodHeavy.go d40d0f3d74 判重 合并 5 years ago
dataMethodMerge.go da2c483229 数据合并-相关功能 5 years ago
datamap.go 0b706a3c3a 去除打印-历史 5 years ago
main.go 7c1103fd31 局部变量-更新 5 years ago
mgo.go f60e2722b1 优化 5 years ago
udptaskmap.go 083ce1e3e3 项目迁移 6 years ago

README.md

{

"udpport": ":1785",
"dupdays": 5,
"mongodb": {
    "addr": "172.17.4.187:27083",
    "pool": 5,
    "db": "qfw",
    "extract": "result_file_20200410",
    "extract_back": "result_file_20200409",
    "site": {
        "dbname": "qfw",
        "coll": "site"
    }
},
"jkmail": {
    "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
    "api": "http://10.171.112.160:19281/_send/_mail"
},
"nextNode": [
    {
        "addr": "127.0.0.1",
        "port": 1783,
        "stype": "bidding",
        "memo": "创建招标数据索引new"
    }
],
"threads": 1,
"isMerger": true,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 3,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
"beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"

}

{

"udpport": ":17859",
"dupdays": 5,
"mongodb": {
    "addr": "192.168.3.207:27092",
    "pool": 5,
    "db": "extract_kf",
    "extract": "a_testbidding",
    "extract_back": "a_testbidding",
    "site": {
        "dbname": "extract_kf",
        "coll": "site"
    }
},
"jkmail": {
    "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
    "api": "http://10.171.112.160:19281/_send/_mail"
},
"nextNode": [
],
"threads": 1,
"isMerger": true,
"lowHeavy":true,
"timingTask":false,
"timingSpanDay": 3,
"timingPubScope": 720,
"specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
"specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
"specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
"beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"

}

"nextNode": [

                    {
                        "addr": "172.17.145.179",
                        "port": 1782,
                        "stype": "project",
                        "memo": "合并项目"
                    },
                    {
                        "addr": "127.0.0.1",
                        "port": 1783,
                        "stype": "bidding",
                        "memo": "创建招标数据索引new"
                    }
           ],

//定时任务--定时任务--定时任务--暂未用 func timedTaskDay() {

log.Println("部署定时任务")
c := cron.New()
c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() })
c.Start()
//timedTaskOnce()

} func timedTaskOnce() {

defer util.Catch()
log.Println("开始一次迁移任务")
movedata()
log.Println("开始一次任务判重")
//当前时间-8   -4 小时
now := time.Now()
log.Println(now)
preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
log.Println(preTime,curTime)
task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
between_time := curTime.Unix() - (86400 * timingPubScope)
log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
//区间id
q_start := map[string]interface{}{
    "_id": map[string]interface{}{
        "$gt": StringTOBsonId(task_sid),
        "$lte": StringTOBsonId(task_eid),
    },
}
//q_start = map[string]interface{}{
//  "_id": map[string]interface{}{
//      "$gte": StringTOBsonId("5f184cd552c1d9fbf84519d3"),
//      "$lte": StringTOBsonId("5f184d3852c1d9fbf8451a2a"),
//  },
//}

sess := mgo.GetMgoConn()
defer mgo.DestoryMongoConn(sess)
it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
pendAllArr:=[][]map[string]interface{}{}//待处理数组
dayArr := []map[string]interface{}{}
for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
    if num%10000 == 0 {
        log.Println("正序遍历:", num)
    }
    source := util.ObjToMap(tmp["jsondata"])
    if util.IntAll((*source)["sourcewebsite"]) == 1 {
        updateExtract = append(updateExtract, []map[string]interface{}{
            map[string]interface{}{
                "_id": tmp["_id"],
            },
            map[string]interface{}{
                "$set": map[string]interface{}{
                    "repeat": 1,
                    "dataging": 0,
                    "repeat_reason": "sourcewebsite为1 重复",
                },
            },
        })
        if len(updateExtract) > 50 {
            mgo.UpSertBulk(extract, updateExtract...)
            updateExtract = [][]map[string]interface{}{}
        }


        tmp = make(map[string]interface{})
        continue
    }

    //取-符合-发布时间X年内的数据
    if util.IntAll(tmp["dataging"]) == 1 {
        pubtime := util.Int64All(tmp["publishtime"])
        if pubtime > 0 && pubtime >= between_time {
            oknum++
            if deterTime==0 {
                log.Println("找到第一条符合条件的数据")
                deterTime = util.Int64All(tmp["publishtime"])
                dayArr = append(dayArr,tmp)
            }else {
                if pubtime-deterTime >timingSpanDay*86400 {
                    //新数组重新构建,当前组数据加到全部组数据
                    pendAllArr = append(pendAllArr,dayArr)
                    dayArr = []map[string]interface{}{}
                    deterTime = util.Int64All(tmp["publishtime"])
                    dayArr = append(dayArr,tmp)
                }else {
                    dayArr = append(dayArr,tmp)
                }
            }
        }else {
            //不在两年内的也清标记
            updateExtract = append(updateExtract, []map[string]interface{}{
                map[string]interface{}{
                    "_id": tmp["_id"],
                },
                map[string]interface{}{
                    "$set": map[string]interface{}{
                        "dataging": 0,
                    },
                },
            })
            if len(updateExtract) > 50 {
                mgo.UpSertBulk(extract, updateExtract...)
                updateExtract = [][]map[string]interface{}{}
            }

        }
    }
    tmp = make(map[string]interface{})
}


//批量更新标记
if len(updateExtract) > 0 {
    mgo.UpSertBulk(extract, updateExtract...)
    updateExtract = [][]map[string]interface{}{}
}

if len(dayArr)>0 {
    pendAllArr = append(pendAllArr,dayArr)
    dayArr = []map[string]interface{}{}
}

log.Println("查询数量:",num,"符合条件:",oknum)

if len(pendAllArr) <= 0 {
    log.Println("没找到dataging==1的数据")
    return
}

//测试分组数量是否正确
testNum:=0
for k,v:=range pendAllArr {
    log.Println("第",k,"组--","数量:",len(v))
    testNum = testNum+len(v)
}
log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)

n, repeateN := 0, 0
pool := make(chan bool, 4)
wg := &sync.WaitGroup{}
for k,v:=range pendAllArr { //每组结束更新一波数据
    pool <- true
    wg.Add(1)
    go func(k int,v []map[string]interface{}) {
        defer func() {
            <-pool
            wg.Done()
        }()
        //构建当前组的数据池
        log.Println("构建第",k,"组---(数据池)")
        //当前组的第一个发布时间
        first_pt :=util.Int64All(v[0]["publishtime"])
        curTM := TimedTaskDatamap(dupdays, first_pt,int(k))
        log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
        n = n+len(v)
        log.Println("统计目前总数量:",n,"重复数量:",repeateN)
        for _,tmp:=range v {
            info := NewInfo(tmp)
            if !LowHeavy { //是否进行低质量数据判重
                if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
                    log.Println("无效数据")
                    updateExtract = append(updateExtract, []map[string]interface{}{
                        map[string]interface{}{
                            "_id": tmp["_id"],
                        },
                        map[string]interface{}{
                            "$set": map[string]interface{}{
                                "repeat":   -1, //无效数据标签
                                "dataging": 0,
                            },
                        },
                    })
                    if len(updateExtract) > 50 {
                        mgo.UpSertBulk(extract, updateExtract...)
                        updateExtract = [][]map[string]interface{}{}
                    }
                    return
                }
            }
            b, source, reason := curTM.check(info)
            if b { //有重复,生成更新语句,更新抽取和更新招标
                log.Println("判重结果", b, reason,"目标id",info.id)
                repeateN++
                //重复数据打标签
                updateExtract = append(updateExtract, []map[string]interface{}{
                    map[string]interface{}{
                        "_id": tmp["_id"],
                    },
                    map[string]interface{}{
                        "$set": map[string]interface{}{
                            "repeat":        1,
                            "repeat_reason": reason,
                            "repeat_id":     source.id,
                            "dataging":      0,
                        },
                    },
                })
            }else {
                updateExtract = append(updateExtract, []map[string]interface{}{
                    map[string]interface{}{
                        "_id": tmp["_id"],
                    },
                    map[string]interface{}{
                        "$set": map[string]interface{}{
                            "dataging": 0,//符合条件的都为dataging==0
                        },
                    },
                })
            }
            if len(updateExtract) > 50 {
                mgo.UpSertBulk(extract, updateExtract...)
                updateExtract = [][]map[string]interface{}{}
            }
        }
    }(k,v)

    //每组数据结束-更新数据
    if len(updateExtract) > 0 {
        mgo.UpSertBulk(extract, updateExtract...)
        updateExtract = [][]map[string]interface{}{}
    }
}



if len(updateExtract) > 0 {
    mgo.UpSertBulk(extract, updateExtract...)
    updateExtract = [][]map[string]interface{}{}
}
log.Println("this timeTask over.", n, "repeateN:", repeateN)

//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
if n > repeateN {
    for _, to := range nextNode {
        next_sid := util.BsonIdToSId(task_sid)
        next_eid := util.BsonIdToSId(task_eid)
        key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
        by, _ := json.Marshal(map[string]interface{}{
            "gtid":  next_sid,
            "lteid": next_eid,
            "stype": util.ObjToString(to["stype"]),
            "key":   key,
        })
        addr := &net.UDPAddr{
            IP:   net.ParseIP(to["addr"].(string)),
            Port: util.IntAll(to["port"]),
        }
        node := &udpNode{by, addr, time.Now().Unix(), 0}
        udptaskmap.Store(key, node)
        udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
    }
}

}

//迁移数据dupdays+5之前的数据 func movedata() {

sess := mgo.GetMgoConn()
defer mgo.DestoryMongoConn(sess)
year, month, day := time.Now().Date()
now:=time.Now()
move_time := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local).Unix()
q := map[string]interface{}{
    "comeintime": map[string]interface{}{
        "$lt": move_time,
    },
}
log.Println(q)
it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
index := 0
for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
    mgo.Save(extract_back, tmp)
    tmp = map[string]interface{}{}
    if index%1000 == 0 {
        log.Println("index", index)
    }
}
log.Println("save to", extract_back, " ok index", index)
qv := map[string]interface{}{
    "comeintime": map[string]interface{}{
        "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
    },
}
delnum := mgo.Delete(extract, qv)
log.Println("remove from ", extract, delnum)

}