{
    "udpport": ":1785",
    "dupdays": 7,
    "mongodb": {
        "addr": "172.17.4.85:27080",
        "pool": 10,
        "db": "qfw",
        "extract": "result_20220219",
        "extract_back": "result_20220218",
        "extract_log": "result_replace_log"
    },
    "task_mongodb": {
        "task_addr": "172.17.4.187:27082,172.17.145.163:27083",
        "task_db": "qfw",
        "task_coll": "bidding_processing_ids",
        "task_bidding": "bidding",
        "task_pool": 5
    },
    "spider_mongodb": {
        "spider_addr": "172.17.4.87:27080",
        "spider_db": "editor",
        "spider_coll": "site",
        "spider_pool": 5
    },
    "userName": "zhengkun",
    "passWord": "zk@123123",
    "jkmail": {
        "to": "zhengkun@topnet.net.cn,wangjianghan@topnet.net.cn",
        "api": "http://172.17.145.179:19281/_send/_mail"
    },
    "nextNode": [
        {
            "addr": "172.17.4.196",
            "port": 1787,
            "stype": "bidding",
            "memo": "UDP target for the sync program's id segment"
        }
    ],
    "jyfb_data": [
        "a_jyxxfbpt_gg"
    ],
    "threads": 4,
    "lowHeavy": true,
    "timingTask": false,
    "timingSpanDay": 5,
    "timingPubScope": 1440,
    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
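
For reference, a minimal sketch of how a config file with this shape could be loaded at startup. The file name (config.json), the struct, and its field names are assumptions for illustration only; only a handful of the keys above are modelled, and this is not the project's actual loader.

// Hypothetical config loader; type and field names are assumptions.
package main

import (
	"encoding/json"
	"log"
	"os"
)

// Config models only a few of the keys shown above.
type Config struct {
	UDPPort        string `json:"udpport"`
	DupDays        int    `json:"dupdays"`
	Threads        int    `json:"threads"`
	TimingSpanDay  int64  `json:"timingSpanDay"`
	TimingPubScope int64  `json:"timingPubScope"`
	Mongodb        struct {
		Addr    string `json:"addr"`
		Pool    int    `json:"pool"`
		DB      string `json:"db"`
		Extract string `json:"extract"`
	} `json:"mongodb"`
}

func loadConfig(path string) (*Config, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	c := &Config{}
	if err := json.Unmarshal(b, c); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	cfg, err := loadConfig("config.json")
	if err != nil {
		log.Fatal(err)
	}
	log.Println("udp port:", cfg.UDPPort, "extract coll:", cfg.Mongodb.Extract)
}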

// Connection-pool setup (fragment): MongodbSim appears to be the project's
// internal MongoDB wrapper, initialized once before the tasks below run.
mgo = &MongodbSim{
	MongodbAddr: "172.17.4.187:27083",
	DbName:      "qfw",
	Size:        10,
}
mgo.InitPool()
return

func moveTimeoutData() {
	log.Println("deploying the data-migration cron task")
	c := cron.New()
	// Six-field spec: run moveOnceTimeOut every day at 00:00:00.
	c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
	c.Start()
}

func moveOnceTimeOut() {
	log.Println("running one pass of the timed-out data migration")

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	now := time.Now()
	// Cutoff: anything whose comeintime is more than two years old gets moved.
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": move_time,
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		// Copy the document into the archive collection, then delete the original.
		del_id := BsonTOStringId(tmp["_id"])
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714", del_id)
		tmp = map[string]interface{}{}
	}
	log.Println("save and delete", " ok index", index)
}
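
The six-field spec above ("0 0 0 * * ?") includes a seconds field, which matches the robfig/cron scheduler; that library is an assumption here, since the import list is not shown. A minimal sketch of wiring the daily run with cron v3, where the seconds-aware parser has to be enabled explicitly:

// Sketch only: assumes robfig/cron (v3 shown; v1 parses 6-field specs by
// default) and that moveOnceTimeOut lives in the same package.
import cron "github.com/robfig/cron/v3"

func startTimeoutMigration() {
	c := cron.New(cron.WithSeconds())         // accept "sec min hour dom month dow"
	c.AddFunc("0 0 0 * * ?", moveOnceTimeOut) // every day at 00:00:00
	c.Start()                                 // the scheduler runs in its own goroutine
}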

{
    "udpport": ":1785",
    "dupdays": 7,
    "mongodb": {
        "addr": "172.17.4.85:27080",
        "pool": 10,
        "db": "qfw",
        "extract": "result_20200715",
        "extract_back": "result_20200714",
        "site": {
            "dbname": "qfw",
            "coll": "site"
        }
    },
    "jkmail": {
        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
        "api": "http://172.17.145.179:19281/_send/_mail"
    },
    "nextNode": [
        {
            "addr": "127.0.0.1",
            "port": 1783,
            "stype": "bidding",
            "memo": "create the bidding data index (new)"
        }
    ],
    "threads": 1,
    "lowHeavy": true,
    "timingTask": false,
    "timingSpanDay": 5,
    "timingPubScope": 720,
    "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
}
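
This second config differs from the first mainly in the extract collections, the thread count, and timingPubScope (720 days here versus 1440 above). A rough illustration of how those two knobs drive the dedup pass in the historyTaskDay code later in this dump (values taken from this config; the function name is illustrative):

// Illustration only: timingPubScope=720, timingSpanDay=5 as configured above.
func windowExample() {
	timingPubScope := int64(720) // days of publish time considered, roughly two years
	timingSpanDay := int64(5)    // width of each publish-time group, in days

	cutoff := time.Now().Unix() - 86400*timingPubScope // publishtime must be >= cutoff to be deduped
	groupSpan := timingSpanDay * 86400                 // items within this many seconds fall into one group
	log.Println("cutoff:", cutoff, "group span (s):", groupSpan)
}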

func historyTaskDay() {
	defer util.Catch()

	for {
		start := time.Now().Unix()

		if gtid == "" {
			log.Println("please pass gtid, otherwise this cannot run")
			os.Exit(0)
			return
		}
		if lteid != "" {
			// Migrate data first.
			log.Println("starting one migration task", gtid, lteid)
			moveHistoryData(gtid, lteid)
			gtid = lteid // advance the segment start
		}

		// Look up the last task id in the task collection.
		task_sess := task_mgo.GetMgoConn()
		defer task_mgo.DestoryMongoConn(task_sess) // NOTE: this defer sits inside an endless loop, so it only fires when the process exits
		q := map[string]interface{}{
			"isused": true,
		}
		between_time := time.Now().Unix() - (86400 * timingPubScope) // two-year window
		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
			lteid = util.ObjToString(tmp["gtid"])
			log.Println("last task id found:", lteid)
			break
		}

		log.Println("query done - sleeping 5 minutes first", gtid, lteid)
		time.Sleep(5 * time.Minute)

		sess := mgo.GetMgoConn() // connection
		defer mgo.DestoryMongoConn(sess)
		// Start the duplicate check over the id range (gtid, lteid].
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  StringTOBsonId(gtid),
				"$lte": StringTOBsonId(lteid),
			},
		}
		log.Println("history dedup query:", q, "time:", between_time)
		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters
		updateExtract := [][]map[string]interface{}{}                           // batched mongo updates
		pendAllArr := [][]map[string]interface{}{}                              // groups waiting to be processed
		dayArr := []map[string]interface{}{}
		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
			if num%10000 == 0 {
				log.Println("forward scan:", num)
			}
			source := util.ObjToMap(tmp["jsondata"])
			if util.IntAll((*source)["sourcewebsite"]) == 1 {
				outnum++
				updatelock.Lock()
				updateExtract = append(updateExtract, []map[string]interface{}{
					map[string]interface{}{
						"_id": tmp["_id"],
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"repeat":        1,
							"dataging":      0,
							"repeat_reason": "sourcewebsite is 1, duplicate",
						},
					},
				})
				if len(updateExtract) >= 200 {
					log.Println("sourcewebsite, bulk update")
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}

				updatelock.Unlock()

				tmp = make(map[string]interface{})
				continue
			}

			// Keep only data whose publish time falls inside the configured window.
			updatelock.Lock()
			if util.IntAll(tmp["dataging"]) == 1 {
				pubtime := util.Int64All(tmp["publishtime"])
				if pubtime > 0 && pubtime >= between_time {
					oknum++
					if deterTime == 0 {
						log.Println("found the first matching record")
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						if pubtime-deterTime > timingSpanDay*86400 {
							// Start a new group: push the current group onto the full list.
							pendAllArr = append(pendAllArr, dayArr)
							dayArr = []map[string]interface{}{}
							deterTime = util.Int64All(tmp["publishtime"])
							dayArr = append(dayArr, tmp)
						} else {
							dayArr = append(dayArr, tmp)
						}
					}
				} else {
					outnum++
					// Outside the window: clear the flag as well.
					updateExtract = append(updateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging": 0,
							},
						},
					})
					if len(updateExtract) >= 200 {
						log.Println("dataging==1 but outside the window, bulk update")
						mgo.UpSertBulk(extract, updateExtract...)
						updateExtract = [][]map[string]interface{}{}
					}

				}
			}

			updatelock.Unlock()

			tmp = make(map[string]interface{})
		}

		// Flush the remaining batched updates.
		updatelock.Lock()

		if len(updateExtract) > 0 {
			log.Println("after grouping, final update of records skipped by dedup:", len(updateExtract), oknum+outnum)
			mgo.UpSertBulk(extract, updateExtract...)
			updateExtract = [][]map[string]interface{}{}
		}

		updatelock.Unlock()

		if len(dayArr) > 0 {
			pendAllArr = append(pendAllArr, dayArr)
			dayArr = []map[string]interface{}{}
		}

		log.Println("scanned:", num, "matched:", oknum)

		if len(pendAllArr) <= 0 {
			log.Println("no dataging==1 data found")
		}

		// Sanity-check the group sizes.
		testNum := 0
		for k, v := range pendAllArr {
			log.Println("group", k, "--", "size:", len(v))
			testNum = testNum + len(v)
		}
		log.Println("local grouping done:", len(pendAllArr), "groups", "check - total count:", testNum)

		n, repeateN := 0, 0
		log.Println("threads:", threadNum)
		pool := make(chan bool, threadNum)
		wg := &sync.WaitGroup{}
		for k, v := range pendAllArr { // each group flushes its own updates when it finishes
			pool <- true
			wg.Add(1)
			go func(k int, v []map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				// Per-group temporary batches - independent of each other.
				groupUpdateExtract := [][]map[string]interface{}{}
				groupOtherExtract := [][]map[string]interface{}{}

				// Build the current group's data pool.
				log.Println("building group", k, "--- (data pool)")
				// First publish time of the current group.
				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
				log.Println("starting dedup scan of group", k, "total:", len(v))
				n = n + len(v)
				log.Println("running total:", n, "duplicates:", repeateN)
				for _, tmp := range v {
					info := NewInfo(tmp)
					b, source, reason := curTM.check(info)
					if b { // duplicate found: build updates for the extract and bidding collections
						repeateN++
						// Tag the duplicate record.
						repeat_ids := source.repeat_ids
						repeat_ids = append(repeat_ids, info.id)
						source.repeat_ids = repeat_ids
						// Replace the entry in the data pool.
						DM.replacePoolData(source)
						updatelock.Lock()

						// Update the source record - collection 14 or 15,
						// depending on whether it falls in the current id segment.
						if judgeIsCurIds(gtid, lteid, source.id) {
							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ // tag the kept record
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						} else {
							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{ // tag the kept record
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						}
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat":        1,
									"repeat_reason": reason,
									"repeat_id":     source.id,
									"dataging":      0,
								},
							},
						})

						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}

						if len(groupOtherExtract) >= 500 {
							mgo.UpSertBulk(extract_back, groupOtherExtract...)
							groupOtherExtract = [][]map[string]interface{}{}
						}

						updatelock.Unlock()

					} else {
						updatelock.Lock()

						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"dataging": 0, // every processed record ends up with dataging==0
								},
							},
						})

						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						updatelock.Unlock()
					}
				}
				// End of the group - flush the remaining updates.
				updatelock.Lock()
				if len(groupUpdateExtract) > 0 {
					mgo.UpSertBulk(extract, groupUpdateExtract...)
				}

				if len(groupOtherExtract) > 0 {
					mgo.UpSertBulk(extract_back, groupOtherExtract...)
				}
				updatelock.Unlock()

			}(k, v)

		}

		wg.Wait()

		// Work done: broadcast to downstream nodes over UDP so they can
		// rebuild indexes / merge for this id segment.
		if n >= repeateN && gtid != lteid {
			for _, to := range nextNode {
				next_sid := util.BsonIdToSId(gtid)
				next_eid := util.BsonIdToSId(lteid)
				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
				by, _ := json.Marshal(map[string]interface{}{
					"gtid":  next_sid,
					"lteid": next_eid,
					"stype": util.ObjToString(to["stype"]),
					"key":   key,
				})
				addr := &net.UDPAddr{
					IP:   net.ParseIP(to["addr"].(string)),
					Port: util.IntAll(to["port"]),
				}
				node := &udpNode{by, addr, time.Now().Unix(), 0}
				udptaskmap.Store(key, node)
				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
			}
		}

		end := time.Now().Unix()

		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
		log.Println(gtid, lteid)
		if end-start < 60*5 {
			log.Println("sleeping.............")
			time.Sleep(5 * time.Minute)
		}
		log.Println("moving on to the next segment of history dedup")
	}
}
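
judgeIsCurIds and moveHistoryData are referenced above but not included in this dump. For the id-segment check, a plausible minimal implementation (an assumption, not the project's code) relies on the fact that MongoDB ObjectId hex strings of equal length compare lexicographically in roughly chronological order:

// Hypothetical sketch of the segment check used above: the id belongs to the
// current segment when it falls in (gtid, lteid]. Assumes all three values are
// 24-character ObjectId hex strings.
func judgeIsCurIds(gtid, lteid, id string) bool {
	return id > gtid && id <= lteid
}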

// getDB opens a session with the classic mgo driver (gopkg.in/mgo.v2 or a
// compatible fork) and returns a handle to the "zhengkun" database. The
// underlying session stays open so the returned *mgo.Database remains usable;
// note that here "mgo" is the driver package, not the MongodbSim pool variable
// used in the snippets above.
func getDB() *mgo.Database {
	session, err := mgo.Dial("127.0.0.1:27017")
	if err != nil {
		panic(err)
	}
	session.SetMode(mgo.Monotonic, true)
	db := session.DB("zhengkun")
	return db
}
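
A short usage sketch for getDB, assuming the gopkg.in/mgo.v2 driver and a local mongod; the collection and field names are illustrative only.

// Requires gopkg.in/mgo.v2/bson and time in the import list.
func exampleUse() error {
	db := getDB()
	defer db.Session.Close() // close the session opened inside getDB

	coll := db.C("result_test") // illustrative collection name
	if err := coll.Insert(bson.M{"title": "demo", "comeintime": time.Now().Unix()}); err != nil {
		return err
	}
	var doc bson.M
	return coll.Find(bson.M{"title": "demo"}).One(&doc)
}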