|
@@ -16,6 +16,9 @@ import (
|
|
|
"regexp"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+
|
|
|
+ "github.com/cron"
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -23,12 +26,14 @@ var (
|
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
|
mgo *MongodbSim //mongodb操作对象
|
|
|
extract string
|
|
|
+ extract_back string
|
|
|
udpclient mu.UdpClient //udp对象
|
|
|
nextNode []map[string]interface{} //下节点数组
|
|
|
dupdays = 5 //初始化判重范围
|
|
|
DM *datamap //
|
|
|
HM *historymap //判重数据
|
|
|
- lastid = ""
|
|
|
+
|
|
|
+ lastid = ""
|
|
|
|
|
|
//正则筛选相关
|
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
@@ -36,12 +41,15 @@ var (
|
|
|
FilterRegTitle_1 = regexp.MustCompile("^_$")
|
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
|
|
|
|
- isMerger bool //是否合并
|
|
|
- Is_Sort bool //是否排序
|
|
|
- threadNum int //线程数量
|
|
|
- SiteMap map[string]map[string]interface{} //站点map
|
|
|
- LowHeavy bool //低质量数据判重
|
|
|
- sid, eid string //测试人员判重使用
|
|
|
+ isMerger bool //是否合并
|
|
|
+ Is_Sort bool //是否排序
|
|
|
+ threadNum int //线程数量
|
|
|
+ SiteMap map[string]map[string]interface{} //站点map
|
|
|
+ LowHeavy bool //低质量数据判重
|
|
|
+ TimingTask bool //是否定时任务
|
|
|
+ timingSpanDay int64 //时间跨度
|
|
|
+ timingPubScope int64 //发布时间周期
|
|
|
+ sid, eid string //测试人员判重使用
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
@@ -60,6 +68,8 @@ func init() {
|
|
|
}
|
|
|
mgo.InitPool()
|
|
|
extract = mconf["extract"].(string)
|
|
|
+ extract_back = mconf["extract_back"].(string)
|
|
|
+
|
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
|
//加载数据
|
|
|
DM = NewDatamap(dupdays, lastid)
|
|
@@ -70,7 +80,11 @@ func init() {
|
|
|
isMerger = Sysconfig["isMerger"].(bool)
|
|
|
Is_Sort = Sysconfig["isSort"].(bool)
|
|
|
threadNum = util.IntAllDef(Sysconfig["threads"], 1)
|
|
|
- LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
+ LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
+ TimingTask = Sysconfig["timingTask"].(bool)
|
|
|
+ timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
|
|
|
+ timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
|
|
|
+
|
|
|
//站点配置
|
|
|
site := mconf["site"].(map[string]interface{})
|
|
|
SiteMap = make(map[string]map[string]interface{}, 0)
|
|
@@ -95,8 +109,13 @@ func main() {
|
|
|
go checkMapJob()
|
|
|
updport := Sysconfig["udpport"].(string)
|
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
- udpclient.Listen(processUdpMsg)
|
|
|
- log.Println("Udp服务监听", updport)
|
|
|
+ if TimingTask {
|
|
|
+ go timedTaskDay()
|
|
|
+ } else {
|
|
|
+ udpclient.Listen(processUdpMsg)
|
|
|
+ log.Println("Udp服务监听", updport)
|
|
|
+ }
|
|
|
+
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
}
|
|
|
|
|
@@ -110,21 +129,24 @@ func mainT() {
|
|
|
ObjectId("5db2735ba5cb26b9b7c99c6f")
|
|
|
*/
|
|
|
log.Println("测试开始")
|
|
|
- sid = "5da3f2c5a5cb26b9b79847fc"
|
|
|
- eid = "5db2735ba5cb26b9b7c99c6f"
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- mapinfo := map[string]interface{}{}
|
|
|
- if sid == "" || eid == "" {
|
|
|
- log.Println("sid,eid参数不能为空")
|
|
|
- os.Exit(0)
|
|
|
+ if TimingTask {
|
|
|
+ go timedTaskDay()
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+ }else {
|
|
|
+ //sid = "5da3f2c5a5cb26b9b79847fc"
|
|
|
+ //eid = "5db2735ba5cb26b9b7c99c6f"
|
|
|
+ log.Println(sid,"---",eid)
|
|
|
+ mapinfo := map[string]interface{}{}
|
|
|
+ if sid == "" || eid == "" {
|
|
|
+ log.Println("sid,eid参数不能为空")
|
|
|
+ os.Exit(0)
|
|
|
+ }
|
|
|
+ mapinfo["gtid"] = sid
|
|
|
+ mapinfo["lteid"] = eid
|
|
|
+ mapinfo["stop"] = "true"
|
|
|
+ task([]byte{}, mapinfo)
|
|
|
+ time.Sleep(10 * time.Second)
|
|
|
}
|
|
|
- mapinfo["gtid"] = sid
|
|
|
- mapinfo["lteid"] = eid
|
|
|
- mapinfo["stop"] = "true"
|
|
|
- task([]byte{}, mapinfo)
|
|
|
- time.Sleep(10 * time.Second)
|
|
|
}
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
fmt.Println("接受的段数据")
|
|
@@ -139,7 +161,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
} else if mapInfo != nil {
|
|
|
taskType := util.ObjToString(mapInfo["stype"])
|
|
|
if taskType == "historyTask" {
|
|
|
- //更新流程
|
|
|
+ //历史更新流程
|
|
|
go historyTask(data, mapInfo)
|
|
|
} else if taskType == "normalTask" {
|
|
|
//判重流程
|
|
@@ -196,7 +218,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
|
}
|
|
|
- if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
+ if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
|
|
|
tmp = make(map[string]interface{})
|
|
|
repeateN++
|
|
|
continue
|
|
@@ -209,7 +231,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Done()
|
|
|
}()
|
|
|
info := NewInfo(tmp)
|
|
|
- if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
@@ -217,7 +239,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
},
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
|
- "repeat": -1,//无效数据标签
|
|
|
+ "repeat": -1, //无效数据标签
|
|
|
},
|
|
|
},
|
|
|
})
|
|
@@ -239,9 +261,9 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- repeat_id := source.id//初始化一个数据
|
|
|
+ repeat_id := source.id //初始化一个数据
|
|
|
|
|
|
- if isMerger {//合并相关
|
|
|
+ if isMerger { //合并相关
|
|
|
basic_bool := basicDataScore(source, info)
|
|
|
if basic_bool {
|
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
@@ -305,7 +327,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
merge_map,
|
|
|
})
|
|
|
}
|
|
|
- }else { //高质量数据
|
|
|
+ } else { //高质量数据
|
|
|
basic_bool := basicDataScore(source, info)
|
|
|
if !basic_bool {
|
|
|
DM.replaceSourceData(info, source.id) //替换
|
|
@@ -371,7 +393,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
- q:= map[string]interface{}{
|
|
|
+ q := map[string]interface{}{
|
|
|
"_id": map[string]interface{}{
|
|
|
"$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
@@ -436,6 +458,11 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
|
}
|
|
|
+ if util.IntAll(tmp["dataging"]) == 1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
pool <- true
|
|
|
wg.Add(1)
|
|
|
go func(tmp map[string]interface{}) {
|
|
@@ -444,7 +471,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Done()
|
|
|
}()
|
|
|
info := NewInfo(tmp)
|
|
|
- if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
@@ -452,7 +479,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
},
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
|
- "repeat": -1,//无效数据标签
|
|
|
+ "repeat": -1, //无效数据标签
|
|
|
},
|
|
|
},
|
|
|
})
|
|
@@ -556,7 +583,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
merge_map,
|
|
|
})
|
|
|
}
|
|
|
- }else { //高质量数据
|
|
|
+ } else { //高质量数据
|
|
|
basic_bool := basicDataScore(source, info)
|
|
|
if !basic_bool {
|
|
|
HM.replaceSourceData(info, source.id) //替换
|
|
@@ -616,6 +643,290 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+//定时任务
|
|
|
+func timedTaskDay() {
|
|
|
+ c := cron.New()
|
|
|
+ c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
|
|
|
+ c.AddFunc("0 0 2 * * ?", func() { movedata() }) //每天凌晨1点执行一次
|
|
|
+ c.Start()
|
|
|
+ timedTaskOnce()
|
|
|
+}
|
|
|
+func timedTaskOnce() {
|
|
|
+
|
|
|
+ log.Println("开始一次定时任务")
|
|
|
+ defer util.Catch()
|
|
|
+
|
|
|
+
|
|
|
+ now := time.Now()
|
|
|
+ preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
|
|
|
+ curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
|
|
|
+ task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
|
|
|
+ task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
|
|
|
+
|
|
|
+ //发布时间间隔时间 半年
|
|
|
+ //测试数据 6点每个间隔6个月
|
|
|
+ //task_sid = "5e20965785a9271abf0ad6bd"
|
|
|
+ //task_eid = "5e20968d85a9271abf0ad6c2"
|
|
|
+ //between_time := int64(1565801997)
|
|
|
+
|
|
|
+ //测试数据 180个点 每个隔1天
|
|
|
+ //task_sid = "5e208f9b50b5ea296eccbb8a"
|
|
|
+ //task_eid = "5e20968d85a9271abf0ad6c2"
|
|
|
+ //between_time := int64(1563641997)
|
|
|
+
|
|
|
+ between_time := curTime.Unix()-(86400*timingPubScope)
|
|
|
+ lasttime := int64(0)
|
|
|
+ log.Println(task_sid, task_eid,curTime.Unix(),between_time)
|
|
|
+ //区间id
|
|
|
+ q_start := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gte": StringTOBsonId(task_sid),
|
|
|
+ "$lte": StringTOBsonId(task_eid),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ sess_start := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess_start)
|
|
|
+ it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
|
|
|
+ startNum := 0
|
|
|
+ for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
|
|
|
+
|
|
|
+ if startNum%10000 == 0 {
|
|
|
+ log.Println("正序遍历:", startNum)
|
|
|
+ }
|
|
|
+ //取-符合-发布时间半年内的数据
|
|
|
+ if util.IntAll(tmp_start["dataging"]) == 1 {
|
|
|
+ pubtime := util.Int64All(tmp_start["publishtime"])
|
|
|
+ //log.Println(startNum,"--",pubtime,"--",between_time)
|
|
|
+ if pubtime>0 && pubtime>=between_time {
|
|
|
+ lasttime = pubtime
|
|
|
+ log.Println("找到第一条符合条件的数据")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Println("... ...",lasttime,)
|
|
|
+ if lasttime <=0 {
|
|
|
+ log.Println("没找到dataging==1的数据")
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ //构建第一条需要判重的数据 (数据池)
|
|
|
+ log.Println("开始构建第一条需要判重的数据 ---(数据池)")
|
|
|
+ DM = TimedTaskDatamap(dupdays,lasttime)
|
|
|
+
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gte": StringTOBsonId(task_sid),
|
|
|
+ "$lte": StringTOBsonId(task_eid),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
+ updateExtract := [][]map[string]interface{}{}
|
|
|
+ log.Println("线程数只能为1")
|
|
|
+ pool := make(chan bool, threadNum)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ n, repeateN := 0, 0
|
|
|
+ pre_publishtime :=int64(0)
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
+ if n%10000 == 0 {
|
|
|
+ log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
|
+ }
|
|
|
+
|
|
|
+ //log.Println("当前测试重复数量:",repeateN)
|
|
|
+ if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if util.IntAll(tmp["dataging"]) != 1 {
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+
|
|
|
+ //log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
|
|
|
+
|
|
|
+ if pre_publishtime==0 {
|
|
|
+ pre_publishtime = util.Int64All(tmp["publishtime"])
|
|
|
+ }else {
|
|
|
+ //时间跨度是否大于X天
|
|
|
+ if (util.Int64All(tmp["publishtime"])-pre_publishtime) >=(86400*timingSpanDay) {
|
|
|
+ //重新构建数据池
|
|
|
+ //log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
|
|
|
+ pre_publishtime = util.Int64All(tmp["publishtime"])
|
|
|
+ DM = TimedTaskDatamap(dupdays,pre_publishtime)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ info := NewInfo(tmp)
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
+ log.Println("测试-无效数据")
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": -1, //无效数据标签
|
|
|
+ "dataging": 0,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) > 500 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ b, source, reason := DM.check(info)
|
|
|
+ log.Println("判重结果",b,reason)
|
|
|
+ if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
+ repeateN++
|
|
|
+ var is_replace = false
|
|
|
+ var mergeArr = []int64{} //更改合并数组记录
|
|
|
+ var newData = &Info{} //更换新的数据池数据
|
|
|
+ var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
+ var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id := source.id //初始化一个数据
|
|
|
+
|
|
|
+ if isMerger { //合并相关
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if basic_bool {
|
|
|
+ //已原始数据为标准 - 对比数据打判重标签-
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
+ DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //对比数据打重复标签的id,原始数据id的记录
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = source.id
|
|
|
+ } else {
|
|
|
+ //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
+ DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //原始数据打重复标签的id, 对比数据id的记录
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ repeat_id = info.id
|
|
|
+ }
|
|
|
+
|
|
|
+ merge_map := make(map[string]interface{}, 0)
|
|
|
+ if is_replace { //有过合并-更新数据
|
|
|
+ merge_map = map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "merge": newData.mergemap,
|
|
|
+ "dataging": 0,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ //更新合并后的数据
|
|
|
+ for _, value := range mergeArr {
|
|
|
+ if value == 0 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 1 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 2 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
+ } else if value == 3 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
+ } else if value == 4 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
+ } else if value == 5 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
+ } else if value == 6 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
+ } else if value == 7 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
+ } else if value == 8 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
+ } else if value == 9 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
+ } else if value == 10 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
+ } else if value == 11 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
+ } else {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //模板数据更新
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ merge_idMap,
|
|
|
+ merge_map,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ } else { //高质量数据
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if !basic_bool {
|
|
|
+ DM.replaceSourceData(info, source.id) //替换
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = info.id
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //重复数据打标签
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ repeat_idMap,
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": 1,
|
|
|
+ "repeat_reason": reason,
|
|
|
+ "repeat_id": repeat_id,
|
|
|
+ "dataging": 0,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+
|
|
|
+ }
|
|
|
+ }(tmp)
|
|
|
+ if len(updateExtract) > 500 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ if len(updateExtract) > 0 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ }
|
|
|
+ log.Println("this timeTask over.", n, "repeateN:", repeateN)
|
|
|
+
|
|
|
+ //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
|
|
|
+ if n > repeateN {
|
|
|
+ for _, to := range nextNode {
|
|
|
+ next_sid := util.BsonIdToSId(task_sid)
|
|
|
+ next_eid := util.BsonIdToSId(task_eid)
|
|
|
+ key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
|
|
|
+ by, _ := json.Marshal(map[string]interface{}{
|
|
|
+ "gtid": next_sid,
|
|
|
+ "lteid": next_eid,
|
|
|
+ "stype": util.ObjToString(to["stype"]),
|
|
|
+ "key": key,
|
|
|
+ })
|
|
|
+ addr := &net.UDPAddr{
|
|
|
+ IP: net.ParseIP(to["addr"].(string)),
|
|
|
+ Port: util.IntAll(to["port"]),
|
|
|
+ }
|
|
|
+ node := &udpNode{by, addr, time.Now().Unix(), 0}
|
|
|
+ udptaskmap.Store(key, node)
|
|
|
+ udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
//合并字段-并更新merge字段的值
|
|
|
func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
|
|
|
|
|
@@ -922,3 +1233,28 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
|
|
|
}
|
|
|
return false
|
|
|
}
|
|
|
+
|
|
|
+//迁移数据dupdays+5之前的数据
|
|
|
+func movedata() {
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ year, month, day := time.Now().Date()
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ log.Println(q)
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
+ index := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
+ mgo.Save(extract+"_back", tmp)
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ if index%1000 == 0 {
|
|
|
+ log.Println("index", index)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("save to", extract+"_back", " ok index", index)
|
|
|
+ delnum := mgo.Delete(extract, q)
|
|
|
+ log.Println("remove from ", extract, delnum)
|
|
|
+}
|