zhengkun 8 сар өмнө
parent
commit
9cba86b3c9

+ 72 - 0
flow_repeat/README.md

@@ -802,3 +802,75 @@ db := session.DB("zhengkun")
 return db
 }
 
+
+
+//关闭数据替换功能
+//if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
+//	datalock.Lock()
+//	temp_source_id := source.id
+//	temp_info_id := info.id
+//	temp_source := info
+//	temp_source.id = temp_source_id
+//	curTM.replacePoolData(temp_source)
+//	//替换抽取表数据
+//	is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
+//	is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
+//
+//	if is_log && is_bid {
+//		data_mgo.Save(extract_log, map[string]interface{}{
+//			"_id":        StringTOBsonId(temp_info_id),
+//			"replace_id": temp_source_id,
+//			"is_history": 1,
+//		})
+//		ext_s_data["repeat"] = 0
+//		ext_s_data["dataging"] = 0
+//		ext_i_data["repeat"] = 1
+//		ext_i_data["repeat_id"] = temp_source_id
+//		ext_i_data["repeat_reason"] = reason
+//		ext_i_data["dataging"] = 0
+//		ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
+//		if is_exists {
+//			data_mgo.DeleteById(extract, temp_source_id)
+//			data_mgo.Save(extract, ext_s_data)
+//		} else {
+//			data_mgo.DeleteById(extract_back, temp_source_id)
+//			data_mgo.Save(extract_back, ext_s_data)
+//			is_del := data_mgo.DeleteById(extract, temp_source_id)
+//			if is_del > 0 {
+//				data_mgo.Save(extract, ext_s_data)
+//			}
+//		}
+//		data_mgo.DeleteById(extract, temp_info_id)
+//		data_mgo.Save(extract, ext_i_data)
+//
+//		task_mgo.DeleteById(task_bidding, temp_source_id)
+//		task_mgo.Save(task_bidding, bid_s_data)
+//		task_mgo.DeleteById(task_bidding, temp_info_id)
+//		task_mgo.Save(task_bidding, bid_i_data)
+//
+//		//通道填充数据
+//		msg := "id=" + temp_source_id
+//		_ = nspdata_1.Publish(msg)
+//		_ = nspdata_2.Publish(msg)
+//
+//	} else {
+//		log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
+//	}
+//
+//	datalock.Unlock()
+//} else {
+//	Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+//		map[string]interface{}{
+//			"_id": tmp["_id"],
+//		},
+//		map[string]interface{}{
+//			"$set": map[string]interface{}{
+//				"repeat":             1,
+//				"repeat_reason":      reason,
+//				"repeat_id":          source.id,
+//				"dataging":           0,
+//				"history_updatetime": util.Int64All(time.Now().Unix()),
+//			},
+//		},
+//	}
+//}

+ 1 - 1
flow_repeat/datamap.go

@@ -313,7 +313,7 @@ L:
 					}
 					//相同发布时间-标题无包含关系 - 项目名称不等
 					if isTheSameDay(info.publishtime, v.publishtime) {
-						if !isTheSimilarName(info.title, v.title) {
+						if !isTheSimilarName(info.title, v.title){
 							continue
 						}
 					}

+ 241 - 0
flow_repeat/flow_historyRepeat.go

@@ -0,0 +1,241 @@
+package main
+
+import (
+	"encoding/json"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
+	"log"
+	"net"
+	"os"
+	"sync"
+	"time"
+)
+
+// 支持流式结构的历史判重...
+func historyFlowRepeat() {
+	defer qu.Catch()
+	//刚启动根据起始id...查询到至今的...往前推1小时...
+	for {
+		if gtid == "" {
+			log.Println("请传gtid,否则无法运行")
+			os.Exit(0)
+			return
+		}
+		if lteid != "" && !IsFull { //先进行数据迁移
+			log.Println("开启一次迁移任务", gtid, lteid)
+			moveHistoryData(gtid, lteid)
+			gtid = lteid //替换数据
+		}
+		//查询表最后一个id...
+		isRepeatStatus := false
+		lteid = FindOneLteid()
+		if lteid > gtid {
+			isRepeatStatus = true
+		}
+		if !isRepeatStatus {
+			log.Println("查询不到最新lteid数据...睡眠...")
+			time.Sleep(30 * time.Second)
+			continue
+		}
+		log.Println("查询找到最新的lteid...", gtid, lteid)
+		if isUpdateSite {
+			initSite()
+		}
+		sess := data_mgo.GetMgoConn() //连接器
+		defer data_mgo.DestoryMongoConn(sess)
+		between_time := time.Now().Unix() - (86400 * timingPubScope) //周期
+		//开始判重
+		q := map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  StringTOBsonId(gtid),
+				"$lte": StringTOBsonId(lteid),
+			},
+		}
+		log.Println("历史判重查询条件:", q, "时间:", between_time)
+		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
+		pendAllArr := [][]map[string]interface{}{}                              //待处理数组
+		dayArr := []map[string]interface{}{}
+		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
+			if num%10000 == 0 {
+				log.Println("正序遍历:", num)
+			}
+			//取-符合-发布时间X年内的数据
+			if qu.IntAll(tmp["dataging"]) == 1 {
+				pubtime := qu.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time && qu.ObjToString(tmp["subtype"]) != "拟建" && qu.ObjToString(tmp["subtype"]) != "产权" &&
+					qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
+					oknum++
+					if deterTime == 0 {
+						log.Println("找到第一条符合条件的数据")
+						deterTime = qu.Int64All(tmp["publishtime"])
+						dayArr = append(dayArr, tmp)
+					} else {
+						if pubtime-deterTime > timingSpanDay*86400 {
+							//新数组重新构建,当前组数据加到全部组数据
+							pendAllArr = append(pendAllArr, dayArr)
+							dayArr = []map[string]interface{}{}
+							deterTime = qu.Int64All(tmp["publishtime"])
+							dayArr = append(dayArr, tmp)
+						} else {
+							dayArr = append(dayArr, tmp)
+						}
+					}
+				} else {
+					outnum++
+					update := map[string]interface{}{
+						"dataging":           0,
+						"history_updatetime": qu.Int64All(time.Now().Unix()),
+					}
+					//不在两年内的也清标记
+					Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": update,
+						},
+					}
+					//发送消息告知...需要进行更新操作...
+					sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
+				}
+			}
+			tmp = make(map[string]interface{})
+		}
+
+		if len(dayArr) > 0 {
+			pendAllArr = append(pendAllArr, dayArr)
+			dayArr = []map[string]interface{}{}
+		}
+		log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
+		if len(pendAllArr) <= 0 {
+			log.Println("没找到dataging==1的数据")
+		}
+		//测试分组数量是否正确
+		testNum := 0
+		for k, v := range pendAllArr {
+			log.Println("第", k, "组--", "数量:", len(v))
+			testNum = testNum + len(v)
+		}
+		log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
+
+		n, repeateN := 0, 0
+		log.Println("线程数:", threadNum)
+		pool := make(chan bool, threadNum)
+		wg := &sync.WaitGroup{}
+		for k, v := range pendAllArr { //每组结束更新一波数据
+			pool <- true
+			wg.Add(1)
+			go func(k int, v []map[string]interface{}) {
+				defer func() {
+					<-pool
+					wg.Done()
+				}()
+				log.Println("构建第", k, "组---(数据池)")
+				//当前组的第一个发布时间
+				first_pt := qu.Int64All(v[len(v)-1]["publishtime"])
+				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
+				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
+				n = n + len(v)
+				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
+				for _, tmp := range v {
+					info := NewInfo(tmp)
+					b, source, reason := curTM.check(info)
+					if b { //有重复,更新
+						repeateN++
+						update := map[string]interface{}{
+							"repeat":             1,
+							"repeat_reason":      reason,
+							"repeat_id":          source.id,
+							"dataging":           0,
+							"history_updatetime": qu.Int64All(time.Now().Unix()),
+						}
+						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": update,
+							},
+						}
+
+						//发送消息告知...需要进行更新操作...
+						sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
+
+					} else {
+						update := map[string]interface{}{
+							"dataging":           0, //符合条件的都为dataging==0
+							"history_updatetime": qu.Int64All(time.Now().Unix()),
+						}
+						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
+							map[string]interface{}{
+								"_id": tmp["_id"],
+							},
+							map[string]interface{}{
+								"$set": update,
+							},
+						}
+						//发送消息告知...需要进行更新操作...
+						sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
+					}
+				}
+			}(k, v)
+		}
+		wg.Wait()
+		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
+		time.Sleep(10 * time.Second)
+		//发送upd支持第二阶段流程...
+		if gtid != lteid {
+			for _, to := range nextNode {
+				next_sid := qu.BsonIdToSId(gtid)
+				next_eid := qu.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + qu.ObjToString(to["stype"])
+				by, _ := json.Marshal(map[string]interface{}{
+					"gtid":  next_sid,
+					"lteid": next_eid,
+					"stype": qu.ObjToString(to["stype"]),
+					"key":   key,
+				})
+				addr := &net.UDPAddr{
+					IP:   net.ParseIP(to["addr"].(string)),
+					Port: qu.IntAll(to["port"]),
+				}
+				node := &udpNode{by, addr, time.Now().Unix(), 0}
+				udptaskmap.Store(key, node)
+				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+			}
+		}
+		log.Println("继续下一段的历史判重")
+	}
+}
+
+// FindOneLteid walks the task_bidding collection from the newest _id
+// backwards, skips the first 100 documents and stores the "lteid" field of the
+// next one in the package-level lteid, which it then returns. When the
+// collection holds 100 documents or fewer, lteid is returned unchanged.
+func FindOneLteid() string {
+	task_sess := task_mgo.GetMgoConn()
+	defer task_mgo.DestoryMongoConn(task_sess)
+	query := map[string]interface{}{}
+	iter := task_sess.DB(task_mgo.DbName).C(task_bidding).Find(&query).Sort("-_id").Iter()
+	doc := make(map[string]interface{})
+	for seen := 0; iter.Next(&doc); seen++ {
+		if seen >= 100 {
+			lteid = qu.ObjToString(doc["lteid"])
+			break
+		}
+		// Decode every document into a fresh map.
+		doc = make(map[string]interface{})
+	}
+	return lteid
+}
+
+// sendFlowRepeatInfo wraps the update document of record tmpid in a MsgInfo,
+// JSON-encodes it and publishes it on the "repeat_task" subject with a
+// one-second timeout. An encoding failure is only logged.
+func sendFlowRepeatInfo(update map[string]interface{}, tmpid string) {
+	payload, err := json.Marshal(MsgInfo{Id: tmpid, Data: update})
+	if err != nil {
+		log.Println("异常发送流数据...", tmpid)
+		// TODO: the failed tmpid should be persisted for a later retry.
+		return
+	}
+	jn.PubReqZip("repeat_task", payload, time.Second)
+}

+ 73 - 0
flow_repeat/flow_increaseRepeat.go

@@ -0,0 +1,73 @@
+package main
+
+import qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+
+// increaseFlowRepeat runs the incremental duplicate check for one flow
+// message and writes the verdict back into msgInfo.Data["ext"].
+//
+// Messages without a usable "ext" map, already-marked duplicates, the
+// excluded spider, pending history records (dataging == 1 outside full mode)
+// and the subtypes "拟建"/"产权" are left untouched. A duplicate coming from a
+// spider listed in jyfb_data only gets the pseudo marker repeat_jyfb; every
+// other duplicate gets repeat/repeat_reason/repeat_id and, when
+// judgeIsReplaceInfo says so, the stored source record is replaced first.
+func increaseFlowRepeat(msgInfo *MsgInfo) {
+	msg_info := msgInfo.Data
+	// ObjToMap hands back a pointer; check it before dereferencing so a
+	// missing or malformed "ext" cannot panic the consumer goroutine.
+	ext := qu.ObjToMap(msg_info["ext"])
+	if ext == nil || *ext == nil {
+		return
+	}
+	tmp := *ext
+	// Special categories are never checked.
+	if qu.IntAll(tmp["repeat"]) == 1 || qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" ||
+		(qu.IntAll(tmp["dataging"]) == 1 && !IsFull) ||
+		(qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权") {
+		return
+	}
+	info := NewInfo(tmp)
+	b, source, reason := DM.check(info)
+	if !b {
+		return
+	}
+	if jyfb_data[info.spidercode] != "" { // pseudo duplicate marker only
+		tmp["repeat_jyfb"] = 1
+		msg_info["ext"] = tmp
+		msgInfo.Data = msg_info
+		return
+	}
+	if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
+		// Run the replacement in a closure so the lock is released even if
+		// one of the storage calls panics.
+		func() {
+			datalock.Lock()
+			defer datalock.Unlock()
+			temp_source_id := source.id
+			temp_info_id := info.id
+			// NOTE(review): if info is a pointer this also rewrites info.id;
+			// confirm that aliasing is intended.
+			temp_source := info
+			temp_source.id = temp_source_id
+			DM.replacePoolData(temp_source) // swap the record held in the data pool
+			// Swap the records in the extract collections.
+			is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
+			if !is_log {
+				return
+			}
+			data_mgo.Save(extract_log, map[string]interface{}{
+				"_id":        tmp["_id"],
+				"replace_id": temp_source_id,
+				"is_history": 0,
+			})
+			ext_s_data["repeat"] = 0
+			ext_i_data["repeat"] = 1
+			ext_i_data["repeat_id"] = temp_source_id
+			ext_i_data["repeat_reason"] = reason
+
+			data_mgo.DeleteById(extract, temp_source_id)
+			data_mgo.Save(extract, ext_s_data)
+
+			// The backup collection is only rewritten when it held the record.
+			is_del := data_mgo.DeleteById(extract_back, temp_source_id)
+			if is_del > 0 {
+				data_mgo.Save(extract_back, ext_s_data)
+			}
+			data_mgo.DeleteById(extract, temp_info_id)
+			data_mgo.Save(extract, ext_i_data)
+
+			// Report the replacement to the flow as an extended message.
+			msgInfo.Extend = Extend{
+				Repeat: MsgRepeat{
+					SId: temp_source_id,
+					RId: temp_info_id,
+				},
+			}
+		}()
+	}
+	// Write the verdict into the returned message.
+	tmp["repeat"] = 1
+	tmp["repeat_reason"] = reason
+	tmp["repeat_id"] = source.id
+	msg_info["ext"] = tmp
+	msgInfo.Data = msg_info
+}

+ 73 - 0
flow_repeat/flows.go

@@ -0,0 +1,73 @@
+package main
+
+import (
+	"github.com/nats-io/nats.go"
+	"go.mongodb.org/mongo-driver/bson"
+	"jygit.jydev.jianyu360.cn/BP/jynats/jnats"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"sync"
+)
+
+type MsgInfo struct {
+	Id       string                 // unique message id
+	CurrSetp string                 // current step
+	NextSetp string                 // next step, set for special flows
+	IsEnd    int                    // 1 = the flow ends after the current step
+	Data     map[string]interface{} // message payload
+	Err      string                 // error text; an error raises an alert and stops the flow
+	Stime    int64
+	Etime    int64
+	Extend   Extend
+}
+
+type Extend struct {
+	Repeat MsgRepeat
+}
+type MsgRepeat struct {
+	SId string // original id
+	RId string // id that was replaced
+}
+
+var FlowTask = &sync.Map{} // subtype -> chan FlowInfo, filled by initRepeatNats
+
+type FlowInfo struct {
+	msg     *nats.Msg // received message, used to send the reply
+	msgInfo *MsgInfo  // decoded payload of msg
+}
+
+// initRepeatNats connects to NATS and subscribes to "dataprocess.repeat" for
+// the incremental duplicate check. Every decoded message is queued on a
+// per-subtype channel kept in FlowTask; the channel and its consumer are
+// created when a subtype is seen for the first time. Messages that cannot be
+// decoded are answered right away with the error text in Err.
+func initRepeatNats() {
+	jn = jnats.NewJnats("192.168.3.240:19090")
+	jn.SubZip("dataprocess.repeat", func(msg *nats.Msg) {
+		msgInfo := &MsgInfo{}
+		if err := bson.Unmarshal(msg.Data, &msgInfo); err != nil {
+			msgInfo.Err = err.Error()
+			reply, _ := bson.Marshal(msgInfo)
+			msg.Respond(reply)
+			return
+		}
+		subtype := qu.ObjToString(msgInfo.Data["subtype"])
+		if existing, found := FlowTask.Load(subtype); found && existing != nil {
+			existing.(chan FlowInfo) <- FlowInfo{msg, msgInfo}
+			return
+		}
+		// First message of this subtype: register the queue, start its
+		// consumer, then hand over the message.
+		queue := make(chan FlowInfo, 3000)
+		FlowTask.Store(subtype, queue)
+		flowsChanRepeat(queue)
+		queue <- FlowInfo{msg, msgInfo}
+	})
+}
+
+// flowsChanRepeat starts a goroutine that consumes ch forever: each queued
+// message is run through increaseFlowRepeat and the resulting MsgInfo is sent
+// back, BSON-encoded, as the reply to the originating message.
+func flowsChanRepeat(ch chan FlowInfo) {
+	consume := func() {
+		for {
+			task := <-ch
+			increaseFlowRepeat(task.msgInfo)
+			reply, _ := bson.Marshal(task.msgInfo)
+			task.msg.Respond(reply)
+		}
+	}
+	go consume()
+}

+ 2 - 7
flow_repeat/go.sum

@@ -58,8 +58,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
 github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -118,8 +118,8 @@ github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+z
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
 github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
@@ -150,8 +150,6 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
-golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
 golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
 golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -171,8 +169,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
 golang.org/x/net v0.0.0-20210916014120-12bc252f5db8/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
-golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
 golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -213,7 +209,6 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=

+ 0 - 70
flow_repeat/historyRepeat.go

@@ -176,76 +176,6 @@ func historyRepeat() {
 								},
 							},
 						}
-						//关闭数据替换功能
-						//if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
-						//	datalock.Lock()
-						//	temp_source_id := source.id
-						//	temp_info_id := info.id
-						//	temp_source := info
-						//	temp_source.id = temp_source_id
-						//	curTM.replacePoolData(temp_source)
-						//	//替换抽取表数据
-						//	is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
-						//	is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
-						//
-						//	if is_log && is_bid {
-						//		data_mgo.Save(extract_log, map[string]interface{}{
-						//			"_id":        StringTOBsonId(temp_info_id),
-						//			"replace_id": temp_source_id,
-						//			"is_history": 1,
-						//		})
-						//		ext_s_data["repeat"] = 0
-						//		ext_s_data["dataging"] = 0
-						//		ext_i_data["repeat"] = 1
-						//		ext_i_data["repeat_id"] = temp_source_id
-						//		ext_i_data["repeat_reason"] = reason
-						//		ext_i_data["dataging"] = 0
-						//		ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
-						//		if is_exists {
-						//			data_mgo.DeleteById(extract, temp_source_id)
-						//			data_mgo.Save(extract, ext_s_data)
-						//		} else {
-						//			data_mgo.DeleteById(extract_back, temp_source_id)
-						//			data_mgo.Save(extract_back, ext_s_data)
-						//			is_del := data_mgo.DeleteById(extract, temp_source_id)
-						//			if is_del > 0 {
-						//				data_mgo.Save(extract, ext_s_data)
-						//			}
-						//		}
-						//		data_mgo.DeleteById(extract, temp_info_id)
-						//		data_mgo.Save(extract, ext_i_data)
-						//
-						//		task_mgo.DeleteById(task_bidding, temp_source_id)
-						//		task_mgo.Save(task_bidding, bid_s_data)
-						//		task_mgo.DeleteById(task_bidding, temp_info_id)
-						//		task_mgo.Save(task_bidding, bid_i_data)
-						//
-						//		//通道填充数据
-						//		msg := "id=" + temp_source_id
-						//		_ = nspdata_1.Publish(msg)
-						//		_ = nspdata_2.Publish(msg)
-						//
-						//	} else {
-						//		log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
-						//	}
-						//
-						//	datalock.Unlock()
-						//} else {
-						//	Update.updatePool <- []map[string]interface{}{ //重复数据打标签
-						//		map[string]interface{}{
-						//			"_id": tmp["_id"],
-						//		},
-						//		map[string]interface{}{
-						//			"$set": map[string]interface{}{
-						//				"repeat":             1,
-						//				"repeat_reason":      reason,
-						//				"repeat_id":          source.id,
-						//				"dataging":           0,
-						//				"history_updatetime": util.Int64All(time.Now().Unix()),
-						//			},
-						//		},
-						//	}
-						//}
 					} else {
 						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 							map[string]interface{}{

+ 3 - 4
flow_repeat/initData.go

@@ -2,17 +2,16 @@ package main
 
 import (
 	"flow_repeat/nsqdata"
-	"log"
-	"regexp"
-
 	"github.com/robfig/cron/v3"
 	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	"log"
+	"regexp"
 )
 
 func InitAllInfos() {
 	initMgo()
 	initVar()
-	initNsq()
+	//initNsq() //支持数据替换
 	initSite()
 	initData()
 }

+ 9 - 15
flow_repeat/main.go

@@ -9,15 +9,14 @@ import (
 	"flag"
 	"flow_repeat/nsqdata"
 	"fmt"
+	"jygit.jydev.jianyu360.cn/BP/jynats/jnats"
+	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
+	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"log"
 	"net"
 	"regexp"
 	"sync"
 	"time"
-
-	"jygit.jydev.jianyu360.cn/BP/jynats/jnats"
-	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
-	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 )
 
 var (
@@ -61,25 +60,20 @@ func init() {
 	InitAllInfos() //加载所有信息...
 }
 
-func main() {
+func mainT() {
+
 	if TimingTask {
 		log.Println("正常历史部署...组装...")
-		go historyRepeat()
+		go historyFlowRepeat()
 	} else {
 		log.Println("正常增量部署...流式...")
-		jn = jnats.NewJnats("192.168.3.240:19092")
-		//
-		////先消费,带zip压缩,用于跨网传输节省流量
-		//jn.SubZip("test", func(msg *nats.Msg) {
-		//	log.Println(string(msg.Data))
-		//	//回执消息
-		//	msg.Respond([]byte("receive msg:" + string(msg.Data)))
-		//})
+		go initRepeatNats()
 	}
 	time.Sleep(99999 * time.Hour)
 }
 
-func mainTest() {
+func main() {
+	IsFull = true
 	increaseRepeat(map[string]interface{}{
 		"gtid":  "12ec61170ae152a3c2310f02",
 		"lteid": "92ec61170ae152a3c2310f02",

+ 6 - 52
udp_repeat/src/README.md

@@ -1,55 +1,9 @@
-{
-"udpport": ":1785",
-"dupdays": 7,
-"mongodb": {
-"addr": "172.17.4.85:27080",
-"pool": 10,
-"db": "qfw",
-"extract": "result_20220219",
-"extract_back": "result_20220218",
-"extract_log": "result_replace_log"
-},
-"task_mongodb": {
-"task_addr": "172.17.4.187:27082,172.17.145.163:27083",
-"task_db": "qfw",
-"task_coll": "bidding_processing_ids",
-"task_bidding": "bidding",
-"task_pool": 5
-},
-"spider_mongodb": {
-"spider_addr": "172.17.4.87:27080",
-"spider_db": "editor",
-"spider_coll": "site",
-"spider_pool": 5
-},
-"userName": "zhengkun",
-"passWord": "zk@123123",
-"jkmail": {
-"to": "zhengkun@topnet.net.cn,wangjianghan@topnet.net.cn",
-"api": "http://172.17.145.179:19281/_send/_mail"
-},
-"nextNode": [
-{
-"addr": "172.17.4.196",
-"port": 1787,
-"stype": "bidding",
-"memo": "同步程序id段udp"
-}
-],
-"jyfb_data": [
-"a_jyxxfbpt_gg"
-],
-"threads": 4,
-"lowHeavy":true,
-"timingTask":false,
-"timingSpanDay": 5,
-"timingPubScope": 1440,
-"specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
-"specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
-"specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
-"specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
-"beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-}
+
+
+
+
+
+
 
 
 

+ 22 - 11
udp_repeat/src/config.json

@@ -1,6 +1,8 @@
 {
     "udpport": ":1785",
     "dupdays": 7,
+    "update_ai": true,
+    "nsq_addr" :"172.17.162.36:4150",
     "mongodb": {
         "addr": "127.0.0.1:27017",
         "db": "zhengkun",
@@ -8,42 +10,51 @@
         "password": "",
         "extract": "repeat_test",
         "extract_back": "repeat_test",
-        "extract_log": "result_replace_log",
-        "pool": 5
+        "extract_log": "repeat_test",
+        "pool":10
     },
     "task_mongodb": {
         "task_addr": "127.0.0.1:27017",
         "task_db": "zhengkun",
         "username": "",
         "password": "",
-        "task_coll": "bidding_processing_ids",
-        "task_bidding": "bidding",
-        "task_pool": 5
+        "task_coll": "127.0.0.1:27017",
+        "task_pool": 10
     },
+    "qfw_mongodb": {
+        "qfw_addr": "127.0.0.1:27017",
+        "qfw_db": "qfw",
+        "username": "",
+        "password": "",
+        "qfw_coll": "repeat_test",
+        "task_pool": 10
+    },
+
     "spider_mongodb": {
         "spider_addr": "127.0.0.1:27017",
         "spider_db": "zhengkun",
         "username": "",
         "password": "",
         "spider_coll": "site",
-        "spider_pool": 5
+        "spider_pool": 10
     },
     "jkmail": {
-        "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn",
-        "api": "http://172.17.145.179:19281/_send/_mail"
+        "to": "zhengkun@topnet.net.cn,xuzhiheng@topnet.net.cn,wangchengcheng@topnet.net.cn",
+        "api": "http://172.17.162.36:19281/_send/_mail"
     },
     "nextNode": [],
     "jyfb_data": [
         "a_jyxxfbpt_gg"
     ],
-    "threads": 1,
+    "threads": 4,
     "lowHeavy":true,
     "timingTask":false,
-    "timingSpanDay": 4,
-    "timingPubScope": 720,
+    "timingSpanDay": 5,
+    "timingPubScope": 1440,
     "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
 }
+

+ 45 - 2
udp_repeat/src/dataMethod.go

@@ -171,6 +171,11 @@ func deleteExtraSpaceName(s string) string {
 	return string(s2)
 }
 
+// deleteExtraSuffixName normalizes a company name by rewriting every
+// occurrence of "股份有限公司" to "有限公司", so both spellings compare equal.
+func deleteExtraSuffixName(s string) string {
+	return strings.ReplaceAll(s, "股份有限公司", "有限公司")
+}
+
 // 中标金额倍率:10000
 func isBidWinningAmount(f1 float64, f2 float64) bool {
 	if f1 == f2 || f1*10000 == f2 || f2*10000 == f1 {
@@ -288,6 +293,44 @@ func leadingElementSame(v *Info, info *Info) bool {
 	return false
 }
 
+// leadingSpecElementSame reports whether v and info are duplicates under the
+// "special combination" rule.
+//
+// Required: both were published on the same day, both have a project name and
+// one name contains the other, and buyer and city are non-empty and equal.
+// Veto: when a field is set on both sides and disagrees - project code,
+// budget, bid amount (compared with isBidWinningAmount) or winner - the pair
+// is not a duplicate.
+func leadingSpecElementSame(v *Info, info *Info) bool {
+	if !isTheSameDay(info.publishtime, v.publishtime) {
+		return false
+	}
+	// Both names must be set: an empty name is "contained" in every string
+	// and would otherwise match anything.
+	if info.projectname == "" || v.projectname == "" {
+		return false
+	}
+	if !strings.Contains(v.projectname, info.projectname) && !strings.Contains(info.projectname, v.projectname) {
+		return false
+	}
+	if info.buyer == "" || v.buyer != info.buyer {
+		return false
+	}
+	if info.city == "" || v.city != info.city {
+		return false
+	}
+
+	// Extra veto checks: conflicting values on both sides rule out a duplicate.
+	if info.projectcode != "" && v.projectcode != "" && v.projectcode != info.projectcode {
+		return false
+	}
+	if v.budget != 0 && info.budget != 0 && v.budget != info.budget {
+		return false
+	}
+	// Veto only when the amounts do NOT match; rejecting on a match would
+	// contradict every other check in this list.
+	if v.bidamount != 0.0 && info.bidamount != 0.0 && !isBidWinningAmount(v.bidamount, info.bidamount) {
+		return false
+	}
+	if v.winner != "" && info.winner != "" && v.winner != info.winner {
+		return false
+	}
+	return true
+}
+
 // 前置0 竞品要素简易计算
 func jingPinElementSame(v *Info, info *Info) bool {
 	if info.projectname != "" && v.projectname != info.projectname {
@@ -381,8 +424,8 @@ func confrimBiddingData(source_id string, info_id string) (bool, map[string]inte
 	source_data := map[string]interface{}{}
 	info_data := map[string]interface{}{}
 	isvalid := false
-	source_data = task_mgo.FindById(task_bidding, source_id)
-	info_data = task_mgo.FindById(task_bidding, info_id)
+	source_data = task_mgo.FindById(task_coll, source_id)
+	info_data = task_mgo.FindById(task_coll, info_id)
 	if len(source_data) > 2 && len(info_data) > 2 {
 		isvalid = true
 		ts_id := source_data["_id"]

+ 13 - 3
udp_repeat/src/datamap.go

@@ -217,6 +217,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 	info.city = qutil.ObjToString(tmp["city"])
 	info.agency = qutil.ObjToString(tmp["agency"])
 	info.winner = deleteExtraSpaceName(qutil.ObjToString(tmp["winner"]))
+	info.winner = deleteExtraSuffixName(info.winner)
 	info.budget = qutil.Float64All(tmp["budget"])
 	info.bidamount = qutil.Float64All(tmp["bidamount"])
 	info.publishtime = qutil.Int64All(tmp["publishtime"])
@@ -387,7 +388,16 @@ L:
 					}
 				}
 
-				//代理机构相同-非空相等
+				//新增特殊组合数据判重···
+				if leadingSpecElementSame(v, info) {
+					reason = "特别要素-相同-满足-同城-发布时间-采购单位等"
+					b = true
+					source = v
+					reasons = reason
+					break L
+				}
+
+				//要素判重
 				if v.agency != "" && info.agency != "" && v.agency == info.agency {
 					reason = reason + "同机构-"
 					repeat := false
@@ -466,7 +476,7 @@ func (d *datamap) update(t int64) {
 
 	} else {
 		if IsFull {
-			d.keymap = d.GetLatelyFiveDay(t) //全量
+			d.keymap = d.GetLatelyFiveFullDay(t) //全量
 		} else {
 			d.keymap = d.GetLatelyFiveDayDouble(t) //增量
 		}
@@ -488,7 +498,7 @@ func (d *datamap) update(t int64) {
 
 }
 
-func (d *datamap) GetLatelyFiveDay(t int64) []string {
+func (d *datamap) GetLatelyFiveFullDay(t int64) []string {
 	array := make([]string, d.days)
 	now := time.Unix(t, 0)
 	for i := 0; i < d.days; i++ {

+ 54 - 107
udp_repeat/src/historyRepeat.go

@@ -8,16 +8,37 @@ import (
 	mu "mfw/util"
 	"net"
 	"os"
-	qu "qfw/common/src/qfw/util"
-	"qfw/util"
+	qutil "qfw/util"
 	"strconv"
 	"sync"
 	"time"
 )
 
+// getHistoryInfoId scans qfw_coll from the newest _id backwards for the first
+// document whose keystatus field equals 1. When one is found its "lteid"
+// field is stored in the package-level lteid and true is returned; otherwise
+// lteid is left untouched and the result is false.
+func getHistoryInfoId(keystatus string) bool {
+	qfw_sess := qfw_mgo.GetMgoConn()
+	defer qfw_mgo.DestoryMongoConn(qfw_sess)
+	query := map[string]interface{}{}
+	iter := qfw_sess.DB(qfw_mgo.DbName).C(qfw_coll).Find(&query).Sort("-_id").Iter()
+	// A fresh map is used for every document that gets decoded.
+	for doc := make(map[string]interface{}); iter.Next(&doc); doc = make(map[string]interface{}) {
+		if qutil.IntAll(doc[keystatus]) != 1 {
+			continue
+		}
+		lteid = qutil.ObjToString(doc["lteid"])
+		log.Println("查询的最后一个已标记的任务lteid:", lteid)
+		return true
+	}
+	return false
+}
+
 // 历史判重
 func historyRepeat() {
-	defer util.Catch()
+	defer qutil.Catch()
 	for {
 		start := time.Now().Unix()
 		if gtid == "" {
@@ -30,23 +51,12 @@ func historyRepeat() {
 			moveHistoryData(gtid, lteid)
 			gtid = lteid //替换数据
 		}
-		//查询表最后一个id
-		task_sess := task_mgo.GetMgoConn()
-		defer task_mgo.DestoryMongoConn(task_sess)
-		q := map[string]interface{}{}
-		it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q).Sort("-_id").Iter()
+		//查询历史数据id
 		isRepeatStatus := false
-		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-			is_repeat_status := util.IntAll(tmp["repeat_status"])
-			if is_repeat_status == 1 {
-				lteid = util.ObjToString(tmp["lteid"])
-				log.Println("查询的最后一个已标记的任务lteid:", lteid)
-				isRepeatStatus = true
-				tmp = make(map[string]interface{})
-				break
-			} else {
-				tmp = make(map[string]interface{})
-			}
+		if !update_ai {
+			isRepeatStatus = getHistoryInfoId("repeat_status")
+		} else {
+			isRepeatStatus = getHistoryInfoId("repeat_status_ai")
 		}
 		if !isRepeatStatus {
 			log.Println("查询不到有标记的lteid数据......睡眠......")
@@ -64,8 +74,12 @@ func historyRepeat() {
 		defer data_mgo.DestoryMongoConn(sess)
 		between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
 
+		////临时-补齐差额
+		//log.Println("临时···66c58769b25c3e1deb79107c,66f617bdb25c3e1deb3ee999")
+		//gtid = "66c58769b25c3e1deb79107c"
+		//lteid = "66f617bdb25c3e1deb3ee999"
 		//开始判重
-		q = map[string]interface{}{
+		q := map[string]interface{}{
 			"_id": map[string]interface{}{
 				"$gt":  StringTOBsonId(gtid),
 				"$lte": StringTOBsonId(lteid),
@@ -78,24 +92,27 @@ func historyRepeat() {
 		dayArr := []map[string]interface{}{}
 		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
 			if num%10000 == 0 {
-				log.Println("正序遍历:", num)
+				log.Println("正序遍历:", num, "~", oknum)
 			}
+			delete(tmp, "field_source")
+			delete(tmp, "regions_log")
+			delete(tmp, "kvtext")
 			//取-符合-发布时间X年内的数据
-			if util.IntAll(tmp["dataging"]) == 1 {
-				pubtime := util.Int64All(tmp["publishtime"])
-				if pubtime > 0 && pubtime >= between_time && qu.ObjToString(tmp["subtype"]) != "拟建" && qu.ObjToString(tmp["subtype"]) != "产权" &&
-					qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
+			if qutil.IntAll(tmp["dataging"]) == 1 {
+				pubtime := qutil.Int64All(tmp["publishtime"])
+				if pubtime > 0 && pubtime >= between_time && qutil.ObjToString(tmp["subtype"]) != "拟建" && qutil.ObjToString(tmp["subtype"]) != "产权" &&
+					qutil.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
 					oknum++
 					if deterTime == 0 {
 						log.Println("找到第一条符合条件的数据")
-						deterTime = util.Int64All(tmp["publishtime"])
+						deterTime = qutil.Int64All(tmp["publishtime"])
 						dayArr = append(dayArr, tmp)
 					} else {
 						if pubtime-deterTime > timingSpanDay*86400 {
 							//新数组重新构建,当前组数据加到全部组数据
 							pendAllArr = append(pendAllArr, dayArr)
 							dayArr = []map[string]interface{}{}
-							deterTime = util.Int64All(tmp["publishtime"])
+							deterTime = qutil.Int64All(tmp["publishtime"])
 							dayArr = append(dayArr, tmp)
 						} else {
 							dayArr = append(dayArr, tmp)
@@ -111,7 +128,7 @@ func historyRepeat() {
 						map[string]interface{}{
 							"$set": map[string]interface{}{
 								"dataging":           0,
-								"history_updatetime": util.Int64All(time.Now().Unix()),
+								"history_updatetime": qutil.Int64All(time.Now().Unix()),
 							},
 						},
 					}
@@ -153,7 +170,7 @@ func historyRepeat() {
 				}()
 				log.Println("构建第", k, "组---(数据池)")
 				//当前组的第一个发布时间
-				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
+				first_pt := qutil.Int64All(v[len(v)-1]["publishtime"])
 				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
 				log.Println("开始遍历判重第", k, "组  共计数量:", len(v))
 				n = n + len(v)
@@ -173,80 +190,10 @@ func historyRepeat() {
 									"repeat_reason":      reason,
 									"repeat_id":          source.id,
 									"dataging":           0,
-									"history_updatetime": util.Int64All(time.Now().Unix()),
+									"history_updatetime": qutil.Int64All(time.Now().Unix()),
 								},
 							},
 						}
-						//关闭数据替换功能
-						//if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
-						//	datalock.Lock()
-						//	temp_source_id := source.id
-						//	temp_info_id := info.id
-						//	temp_source := info
-						//	temp_source.id = temp_source_id
-						//	curTM.replacePoolData(temp_source)
-						//	//替换抽取表数据
-						//	is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
-						//	is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
-						//
-						//	if is_log && is_bid {
-						//		data_mgo.Save(extract_log, map[string]interface{}{
-						//			"_id":        StringTOBsonId(temp_info_id),
-						//			"replace_id": temp_source_id,
-						//			"is_history": 1,
-						//		})
-						//		ext_s_data["repeat"] = 0
-						//		ext_s_data["dataging"] = 0
-						//		ext_i_data["repeat"] = 1
-						//		ext_i_data["repeat_id"] = temp_source_id
-						//		ext_i_data["repeat_reason"] = reason
-						//		ext_i_data["dataging"] = 0
-						//		ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
-						//		if is_exists {
-						//			data_mgo.DeleteById(extract, temp_source_id)
-						//			data_mgo.Save(extract, ext_s_data)
-						//		} else {
-						//			data_mgo.DeleteById(extract_back, temp_source_id)
-						//			data_mgo.Save(extract_back, ext_s_data)
-						//			is_del := data_mgo.DeleteById(extract, temp_source_id)
-						//			if is_del > 0 {
-						//				data_mgo.Save(extract, ext_s_data)
-						//			}
-						//		}
-						//		data_mgo.DeleteById(extract, temp_info_id)
-						//		data_mgo.Save(extract, ext_i_data)
-						//
-						//		task_mgo.DeleteById(task_bidding, temp_source_id)
-						//		task_mgo.Save(task_bidding, bid_s_data)
-						//		task_mgo.DeleteById(task_bidding, temp_info_id)
-						//		task_mgo.Save(task_bidding, bid_i_data)
-						//
-						//		//通道填充数据
-						//		msg := "id=" + temp_source_id
-						//		_ = nspdata_1.Publish(msg)
-						//		_ = nspdata_2.Publish(msg)
-						//
-						//	} else {
-						//		log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
-						//	}
-						//
-						//	datalock.Unlock()
-						//} else {
-						//	Update.updatePool <- []map[string]interface{}{ //重复数据打标签
-						//		map[string]interface{}{
-						//			"_id": tmp["_id"],
-						//		},
-						//		map[string]interface{}{
-						//			"$set": map[string]interface{}{
-						//				"repeat":             1,
-						//				"repeat_reason":      reason,
-						//				"repeat_id":          source.id,
-						//				"dataging":           0,
-						//				"history_updatetime": util.Int64All(time.Now().Unix()),
-						//			},
-						//		},
-						//	}
-						//}
 					} else {
 						Update.updatePool <- []map[string]interface{}{ //重复数据打标签
 							map[string]interface{}{
@@ -255,7 +202,7 @@ func historyRepeat() {
 							map[string]interface{}{
 								"$set": map[string]interface{}{
 									"dataging":           0, //符合条件的都为dataging==0
-									"history_updatetime": util.Int64All(time.Now().Unix()),
+									"history_updatetime": qutil.Int64All(time.Now().Unix()),
 								},
 							},
 						}
@@ -271,18 +218,18 @@ func historyRepeat() {
 		//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
 		if gtid != lteid {
 			for _, to := range nextNode {
-				next_sid := util.BsonIdToSId(gtid)
-				next_eid := util.BsonIdToSId(lteid)
-				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+				next_sid := qutil.BsonIdToSId(gtid)
+				next_eid := qutil.BsonIdToSId(lteid)
+				key := next_sid + "-" + next_eid + "-" + qutil.ObjToString(to["stype"])
 				by, _ := json.Marshal(map[string]interface{}{
 					"gtid":  next_sid,
 					"lteid": next_eid,
-					"stype": util.ObjToString(to["stype"]),
+					"stype": qutil.ObjToString(to["stype"]),
 					"key":   key,
 				})
 				addr := &net.UDPAddr{
 					IP:   net.ParseIP(to["addr"].(string)),
-					Port: util.IntAll(to["port"]),
+					Port: qutil.IntAll(to["port"]),
 				}
 				node := &udpNode{by, addr, time.Now().Unix(), 0}
 				udptaskmap.Store(key, node)
@@ -361,7 +308,7 @@ func moveOnceTimeOut() {
 	now := time.Now()
 
 	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
-	task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
+	task_id := qutil.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
 			"$lt": StringTOBsonId(task_id),

+ 22 - 61
udp_repeat/src/increaseRepeat.go

@@ -97,7 +97,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 					} else {
 						num++
 						//判断是否为~替换数据~模式
-						if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
+						if judgeIsReplaceInfo(source.href, info.href) && !IsFull && !update_ai {
 							datalock.Lock()
 							temp_source_id := source.id
 							temp_info_id := info.id
@@ -120,24 +120,26 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 
 								data_mgo.DeleteById(extract, temp_source_id)
 								data_mgo.Save(extract, ext_s_data)
-
 								is_del := data_mgo.DeleteById(extract_back, temp_source_id)
 								if is_del > 0 {
 									data_mgo.Save(extract_back, ext_s_data)
 								}
-
 								data_mgo.DeleteById(extract, temp_info_id)
 								data_mgo.Save(extract, ext_i_data)
 
-								task_mgo.DeleteById(task_bidding, temp_source_id)
-								task_mgo.Save(task_bidding, bid_s_data)
-								task_mgo.DeleteById(task_bidding, temp_info_id)
-								task_mgo.Save(task_bidding, bid_i_data)
-
+								task_mgo.DeleteById(task_coll, temp_source_id)
+								task_mgo.Save(task_coll, bid_s_data)
+								task_mgo.DeleteById(task_coll, temp_info_id)
+								task_mgo.Save(task_coll, bid_i_data)
 								//通道填充数据
-								msg := "id=" + temp_source_id
-								_ = nspdata_1.Publish(msg)
-								_ = nspdata_2.Publish(msg)
+								if update_ai {
+									msg := "id=" + temp_source_id
+									_ = nsqdata_1.Publish(msg)
+									_ = nsqdata_2.Publish(msg)
+								} else {
+									msg := "id=" + temp_source_id
+									_ = nsqdata_1.Publish(msg)
+								}
 							} else {
 								log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
 							}
@@ -169,7 +171,7 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 	log.Println("当前~判重~结束~", total, "重复~", repeatN)
 	//更新流程记录表
 	updateProcessUdpIdsInfo(qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]))
-	time.Sleep(10 * time.Second)
+	time.Sleep(5 * time.Second)
 	log.Println("判重任务完成...发送下节点udp...")
 	for _, to := range nextNode {
 		sid, _ := mapInfo["gtid"].(string)
@@ -193,6 +195,9 @@ func increaseRepeat(mapInfo map[string]interface{}) {
 
 // 更新流程记录id段落
 func updateProcessUdpIdsInfo(sid string, eid string) {
+	if IsFull {
+		return
+	}
 	//判重有合并操作~所以要联合查询
 	query := map[string]interface{}{
 		"gtid": map[string]interface{}{
@@ -202,20 +207,18 @@ func updateProcessUdpIdsInfo(sid string, eid string) {
 			"$lte": eid,
 		},
 	}
-	datas, _ := task_mgo.Find(task_coll, query, nil, nil)
+	dataprocess["updatetime"] = time.Now().Unix()
+
+	datas, _ := qfw_mgo.Find(qfw_coll, query, nil, nil)
 	if len(datas) > 0 {
 		log.Println("开始更新流程段落记录~~", len(datas), "段")
 		for _, v := range datas {
 			up_id := BsonTOStringId(v["_id"])
 			if up_id != "" {
 				update := map[string]interface{}{
-					"$set": map[string]interface{}{
-						"dataprocess":   6,
-						"repeat_status": 1,
-						"updatetime":    time.Now().Unix(),
-					},
+					"$set": dataprocess,
 				}
-				task_mgo.UpdateById(task_coll, up_id, update)
+				qfw_mgo.UpdateById(qfw_coll, up_id, update)
 				log.Println("流程段落记录~~更新完毕~", update)
 			}
 		}
@@ -223,45 +226,3 @@ func updateProcessUdpIdsInfo(sid string, eid string) {
 		log.Println("未查询到记录id段落~", query)
 	}
 }
-
-// 更新ocr表~弃用
-func updateOcrFileData(cur_lteid string) {
-	//更新ocr 分类表-判重的状态
-	log.Println("开始更新Ocr表-标记", cur_lteid)
-	task_sess := task_mgo.GetMgoConn()
-	defer task_mgo.DestoryMongoConn(task_sess)
-	q_task := map[string]interface{}{}
-	it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q_task).Sort("-_id").Iter()
-	isUpdateOcr := false
-	updateOcrFile := [][]map[string]interface{}{}
-	for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
-		cur_id := BsonTOStringId(tmp["_id"])
-		lte_id := qu.ObjToString(tmp["lteid"])
-		if lte_id == cur_lteid { //需要更新
-			log.Println("找到该lteid数据", cur_lteid, cur_id)
-			isUpdateOcr = true
-			updateOcrFile = append(updateOcrFile, []map[string]interface{}{ //重复数据打标签
-				map[string]interface{}{
-					"_id": tmp["_id"],
-				},
-				map[string]interface{}{
-					"$set": map[string]interface{}{
-						"is_repeat_status": 1,
-						"is_repeat_time":   qu.Int64All(time.Now().Unix()),
-					},
-				},
-			})
-			tmp = make(map[string]interface{})
-			break
-		} else {
-			tmp = make(map[string]interface{})
-		}
-	}
-	if !isUpdateOcr {
-		log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
-	} else {
-		if len(updateOcrFile) > 0 {
-			task_mgo.UpSertBulk(task_coll, updateOcrFile...)
-		}
-	}
-}

+ 80 - 14
udp_repeat/src/initData.go

@@ -3,9 +3,43 @@ package main
 import (
 	"github.com/cron"
 	"github.com/go-xweb/log"
+	mu "mfw/util"
 	"nsqdata"
 	qu "qfw/util"
 	"regexp"
+	"sync"
+)
+
+var (
+	Sysconfig                               map[string]interface{} //配置文件
+	data_mgo, task_mgo, qfw_mgo, spider_mgo *MongodbSim
+	task_coll, qfw_coll, spider_coll        string
+	extract, extract_back, extract_log      string
+	udpclient                               mu.UdpClient
+	nextNode                                []map[string]interface{}
+	dupdays                                 = 7
+	DM, FullDM                              *datamap
+	Update                                  *updateInfo
+	AddGroupPool                            *addGroupInfo
+	//正则筛选相关
+	FilterRegTitle                             = regexp.MustCompile("^_$")
+	FilterRegTitle_0                           = regexp.MustCompile("^_$")
+	FilterRegTitle_1                           = regexp.MustCompile("^_$")
+	FilterRegTitle_2                           = regexp.MustCompile("^_$")
+	threadNum                                  int
+	SiteMap                                    map[string]map[string]interface{}
+	LowHeavy, TimingTask, IsFull, isUpdateSite bool
+	timingSpanDay, timingPubScope              int64
+	gtid, lastid, sec_gtid, sec_lteid, lteid   string
+	updatelock, datalock, numlock, cronlock    sync.Mutex
+	jyfb_data                                  map[string]string
+	taskList                                   []map[string]interface{}
+	nsqdata_1, nsqdata_2                       *nsqdata.Producer
+	responselock                               sync.Mutex
+	lastNodeResponse                           int64
+	update_ai                                  bool
+	dataprocess                                map[string]interface{}
+	nsq_addr                                   string
 )
 
 func initMgo() {
@@ -36,7 +70,17 @@ func initMgo() {
 	}
 	task_mgo.InitPool()
 	task_coll = task_mconf["task_coll"].(string)
-	task_bidding = task_mconf["task_bidding"].(string)
+
+	qfw_mconf := Sysconfig["qfw_mongodb"].(map[string]interface{})
+	qfw_mgo = &MongodbSim{
+		MongodbAddr: qfw_mconf["qfw_addr"].(string),
+		DbName:      qfw_mconf["qfw_db"].(string),
+		Size:        qu.IntAllDef(qfw_mconf["qfw_pool"], 10),
+		UserName:    qfw_mconf["username"].(string),
+		Password:    qfw_mconf["password"].(string),
+	}
+	qfw_mgo.InitPool()
+	qfw_coll = qfw_mconf["qfw_coll"].(string)
 
 	nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
 	mconf := Sysconfig["mongodb"].(map[string]interface{})
@@ -52,7 +96,6 @@ func initMgo() {
 	extract = mconf["extract"].(string)
 	extract_back = mconf["extract_back"].(string)
 	extract_log = mconf["extract_log"].(string)
-
 	FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"]))
 	FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"]))
 	FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"]))
@@ -62,26 +105,49 @@ func initMgo() {
 	TimingTask = Sysconfig["timingTask"].(bool)
 	timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
 	timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
+
+	nsq_addr = Sysconfig["nsq_addr"].(string)
+	update_ai = Sysconfig["update_ai"].(bool)
+	if !update_ai {
+		dataprocess = map[string]interface{}{
+			"dataprocess":   6,
+			"repeat_status": 1,
+		}
+	} else {
+		dataprocess = map[string]interface{}{
+			"dataprocess_ai":   5,
+			"repeat_status_ai": 1,
+		}
+	}
 }
 func initOther() {
 	dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
 	DM = NewDatamap(dupdays, lastid)
 	Update = newUpdatePool()
 	go Update.updateData()
-	nsqAddr := "172.17.162.36:4150"
 	if !IsFull {
-		var err error
-		nspdata_1, err = nsqdata.NewProducer(nsqAddr, "bidding_id", true)
-		if err != nil {
-			log.Fatal("通道配置异常~", err)
-		} else {
-			log.Println("通道配置正常")
-		}
-		nspdata_2, err = nsqdata.NewProducer(nsqAddr, "project_id", true)
-		if err != nil {
-			log.Fatal("通道配置异常~", err)
+		if update_ai {
+			var err error
+			nsqdata_1, err = nsqdata.NewProducer(nsq_addr, "bidding_id_ai", true)
+			if err != nil {
+				log.Fatal("通道配置异常~bidding_id_ai", err)
+			} else {
+				log.Println("通道配置正常~bidding_id_ai")
+			}
+			nsqdata_2, err = nsqdata.NewProducer(nsq_addr, "project_id_ai", true)
+			if err != nil {
+				log.Fatal("通道配置异常~project_id_ai", err)
+			} else {
+				log.Println("通道配置正常~project_id_ai")
+			}
 		} else {
-			log.Println("通道配置正常~")
+			var err error
+			nsqdata_1, err = nsqdata.NewProducer(nsq_addr, "bidding_id", true)
+			if err != nil {
+				log.Fatal("通道配置异常~bidding_id_ai", err)
+			} else {
+				log.Println("通道配置正常~bidding_id_ai")
+			}
 		}
 	}
 	c := cron.New()

+ 15 - 47
udp_repeat/src/main.go

@@ -11,42 +11,10 @@ import (
 	"log"
 	mu "mfw/util"
 	"net"
-	"nsqdata"
 	qu "qfw/util"
-	"regexp"
-	"sync"
 	"time"
 )
 
-var (
-	Sysconfig                            map[string]interface{} //配置文件
-	data_mgo, task_mgo, spider_mgo       *MongodbSim
-	task_coll, task_bidding, spider_coll string
-	extract, extract_back, extract_log   string
-	udpclient                            mu.UdpClient
-	nextNode                             []map[string]interface{}
-	dupdays                              = 7
-	DM, FullDM                           *datamap
-	Update                               *updateInfo
-	AddGroupPool                         *addGroupInfo
-	//正则筛选相关
-	FilterRegTitle                             = regexp.MustCompile("^_$")
-	FilterRegTitle_0                           = regexp.MustCompile("^_$")
-	FilterRegTitle_1                           = regexp.MustCompile("^_$")
-	FilterRegTitle_2                           = regexp.MustCompile("^_$")
-	threadNum                                  int
-	SiteMap                                    map[string]map[string]interface{}
-	LowHeavy, TimingTask, IsFull, isUpdateSite bool
-	timingSpanDay, timingPubScope              int64
-	gtid, lastid, sec_gtid, sec_lteid, lteid   string
-	updatelock, datalock, numlock, cronlock    sync.Mutex
-	jyfb_data                                  map[string]string
-	taskList                                   []map[string]interface{}
-	nspdata_1, nspdata_2                       *nsqdata.Producer
-	responselock                               sync.Mutex
-	lastNodeResponse                           int64
-)
-
 // 初始化加载
 func init() {
 	flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
@@ -60,7 +28,8 @@ func init() {
 	initSite()
 }
 
-func mainT() {
+func main() {
+	IsFull = true //当前时间15天
 	increaseRepeat(map[string]interface{}{
 		"gtid":  "12ec61170ae152a3c2310f02",
 		"lteid": "92ec61170ae152a3c2310f02",
@@ -68,21 +37,8 @@ func mainT() {
 	time.Sleep(99999 * time.Hour)
 }
 
-func lastUdpJob() {
-	for {
-		responselock.Lock()
-		if time.Now().Unix()-lastNodeResponse >= 1800 {
-			lastNodeResponse = time.Now().Unix() //重置时间
-			sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查..."))
-		}
-		responselock.Unlock()
-		time.Sleep(300 * time.Second)
-	}
-}
-
 // 主函数
-func main() {
-
+func mainT() {
 	go checkMailJob()
 	lastNodeResponse = time.Now().Unix()
 	updport := Sysconfig["udpport"].(string)
@@ -182,3 +138,15 @@ func getRepeatTask() {
 		}
 	}
 }
+
+func lastUdpJob() {
+	for {
+		responselock.Lock()
+		if time.Now().Unix()-lastNodeResponse >= 1800 {
+			lastNodeResponse = time.Now().Unix() //重置时间
+			sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查..."))
+		}
+		responselock.Unlock()
+		time.Sleep(300 * time.Second)
+	}
+}