|
@@ -124,20 +124,23 @@ func main() {
|
|
|
|
|
|
// mainT is used by the test team.
|
|
|
func mainT() {
|
|
|
+
|
|
|
+ //analysNoRepeatDataTest()
|
|
|
+ //return
|
|
|
+
|
|
|
if TimingTask {
|
|
|
log.Println("定时任务测试开始")
|
|
|
go timedTaskDay()
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
} else {
|
|
|
- //2019年8月1日-8月17日 712646
|
|
|
+
|
|
|
/*
|
|
|
- sid = "5d55031fa5cb26b9b7f57570"
|
|
|
- eid = "5e8c02b150b5ea296eed4509"
|
|
|
- 5e933b1a50b5ea296ef0e839
|
|
|
+ 5ef01220801f744d045f51f1
|
|
|
+ 5ef61eb3801f744d046402dd
|
|
|
*/
|
|
|
//IdType = true
|
|
|
- sid = "5ee1d3d59e628c599167adf1"
|
|
|
- eid = "5eea4291801f744d045c3169"
|
|
|
+ sid = "5ef01220801f744d045f51f1"
|
|
|
+ eid = "5ef61eb3801f744d046402dd"
|
|
|
log.Println("正常判重测试开始")
|
|
|
log.Println(sid, "---", eid)
|
|
|
mapinfo := map[string]interface{}{}
|
|
@@ -240,6 +243,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
|
"repeat": 1,
|
|
|
+ "dataging":0,
|
|
|
"repeat_reason": "sourcewebsite为1,重复",
|
|
|
},
|
|
|
},
|
|
@@ -309,33 +313,44 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
repeat_id := source.id //初始化一个数据
|
|
|
|
|
|
if isMerger { //合并相关
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if basic_bool {
|
|
|
- //已原始数据为标准 - 对比数据打判重标签-
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
- DM.replaceSourceData(newData, source) //替换
|
|
|
- //对比数据打重复标签的id,原始数据id的记录
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
-
|
|
|
- if IdType {
|
|
|
- repeat_idMap["_id"] = info.id
|
|
|
- merge_idMap["_id"] = source.id
|
|
|
- }
|
|
|
- repeat_id = source.id
|
|
|
- } else {
|
|
|
- //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
- DM.replaceSourceData(newData, source) //替换
|
|
|
- //原始数据打重复标签的id, 对比数据id的记录
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- if IdType {
|
|
|
- repeat_idMap["_id"] = source.id
|
|
|
- merge_idMap["_id"] = info.id
|
|
|
- }
|
|
|
- repeat_id = info.id
|
|
|
+
|
|
|
+ //已原始数据为标准 - 对比数据打判重标签-
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
+ //对比数据打重复标签的id,原始数据id的记录
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+
|
|
|
+ if IdType {
|
|
|
+ repeat_idMap["_id"] = info.id
|
|
|
+ merge_idMap["_id"] = source.id
|
|
|
}
|
|
|
+ repeat_id = source.id
|
|
|
+ //basic_bool := basicDataScore(source, info)
|
|
|
+ //if basic_bool {
|
|
|
+ // //已原始数据为标准 - 对比数据打判重标签-
|
|
|
+ // newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
+ // //对比数据打重复标签的id,原始数据id的记录
|
|
|
+ // repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ // merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ //
|
|
|
+ // if IdType {
|
|
|
+ // repeat_idMap["_id"] = info.id
|
|
|
+ // merge_idMap["_id"] = source.id
|
|
|
+ // }
|
|
|
+ // repeat_id = source.id
|
|
|
+ //} else {
|
|
|
+ // //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
+ // newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
+ // DM.replaceSourceData(newData, source) //替换
|
|
|
+ // //原始数据打重复标签的id, 对比数据id的记录
|
|
|
+ // repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ // merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ // if IdType {
|
|
|
+ // repeat_idMap["_id"] = source.id
|
|
|
+ // merge_idMap["_id"] = info.id
|
|
|
+ // }
|
|
|
+ // repeat_id = info.id
|
|
|
+ //}
|
|
|
|
|
|
merge_map := make(map[string]interface{}, 0)
|
|
|
if is_replace { //有过合并-更新数据
|
|
@@ -381,51 +396,47 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
merge_map,
|
|
|
})
|
|
|
}
|
|
|
- } else { //高质量数据
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if !basic_bool {
|
|
|
- DM.replaceSourceData(info, source) //替换
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- if IdType {
|
|
|
- repeat_idMap["_id"] = source.id
|
|
|
- }
|
|
|
- repeat_id = info.id
|
|
|
- if len(ids)>=9 {
|
|
|
- ids=append(ids,source.id)
|
|
|
-
|
|
|
-
|
|
|
- for _, to := range nextNode {
|
|
|
-
|
|
|
- key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
|
|
|
- by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": source.id,
|
|
|
- "lteid": source.id,
|
|
|
- "stype": util.ObjToString(to["stype"]),
|
|
|
- "key": key,
|
|
|
- "ids": strings.Join(ids, ","),
|
|
|
- })
|
|
|
- addr := &net.UDPAddr{
|
|
|
- IP: net.ParseIP(to["addr"].(string)),
|
|
|
- Port: util.IntAll(to["port"]),
|
|
|
- }
|
|
|
- node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
- udptaskmap.Store(key, node)
|
|
|
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
- }
|
|
|
-
|
|
|
- //
|
|
|
- ids = []string{}
|
|
|
- }else {
|
|
|
- ids=append(ids,source.id)
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
+ } else { //高质量数据-备份
|
|
|
+
|
|
|
+ //basic_bool := basicDataScore(source, info)
|
|
|
+ //if !basic_bool {
|
|
|
+ // DM.replaceSourceData(info, source) //替换
|
|
|
+ // repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ // if IdType {
|
|
|
+ // repeat_idMap["_id"] = source.id
|
|
|
+ // }
|
|
|
+ // repeat_id = info.id
|
|
|
+ // if len(ids)>=9 {
|
|
|
+ // ids=append(ids,source.id)
|
|
|
+ //
|
|
|
+ //
|
|
|
+ // for _, to := range nextNode {
|
|
|
+ //
|
|
|
+ // key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
|
|
|
+ // by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ // "gtid": source.id,
|
|
|
+ // "lteid": source.id,
|
|
|
+ // "stype": util.ObjToString(to["stype"]),
|
|
|
+ // "key": key,
|
|
|
+ // "ids": strings.Join(ids, ","),
|
|
|
+ // })
|
|
|
+ // addr := &net.UDPAddr{
|
|
|
+ // IP: net.ParseIP(to["addr"].(string)),
|
|
|
+ // Port: util.IntAll(to["port"]),
|
|
|
+ // }
|
|
|
+ // node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
+ // udptaskmap.Store(key, node)
|
|
|
+ // udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ // //
|
|
|
+ // ids = []string{}
|
|
|
+ // }else {
|
|
|
+ // ids=append(ids,source.id)
|
|
|
+ // }
|
|
|
+ //
|
|
|
+ //}
|
|
|
}
|
|
|
- //if repeateN%150==0&&repeateN>0 {
|
|
|
- // fmt.Println("最终结果","目标id:",repeat_idMap["_id"])
|
|
|
- //}
|
|
|
-
|
|
|
-
|
|
|
|
|
|
//重复数据打标签
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|