|
@@ -8,6 +8,7 @@ import (
|
|
|
"encoding/json"
|
|
|
"flag"
|
|
|
"fmt"
|
|
|
+ "github.com/cron"
|
|
|
"log"
|
|
|
mu "mfw/util"
|
|
|
"net"
|
|
@@ -16,19 +17,20 @@ import (
|
|
|
"regexp"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+
|
|
|
+ "gopkg.in/mgo.v2/bson"
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- Sysconfig map[string]interface{} //配置文件
|
|
|
- mconf map[string]interface{} //mongodb配置信息
|
|
|
- mgo *MongodbSim //mongodb操作对象
|
|
|
- extract string
|
|
|
- udpclient mu.UdpClient //udp对象
|
|
|
- nextNode []map[string]interface{} //下节点数组
|
|
|
- dupdays = 5 //初始化判重范围
|
|
|
- DM *datamap //
|
|
|
- HM *historymap //判重数据
|
|
|
- lastid = ""
|
|
|
+ Sysconfig map[string]interface{} //配置文件
|
|
|
+ mconf map[string]interface{} //mongodb配置信息
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
|
+ extract string
|
|
|
+ extract_back string
|
|
|
+ udpclient mu.UdpClient //udp对象
|
|
|
+ nextNode []map[string]interface{} //下节点数组
|
|
|
+ dupdays = 5 //初始化判重范围
|
|
|
+ DM *datamap //
|
|
|
|
|
|
//正则筛选相关
|
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
@@ -36,15 +38,20 @@ var (
|
|
|
FilterRegTitle_1 = regexp.MustCompile("^_$")
|
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
|
|
|
|
- isMerger bool //是否合并
|
|
|
- Is_Sort bool //是否排序
|
|
|
- threadNum int //线程数量
|
|
|
- SiteMap map[string]map[string]interface{} //站点map
|
|
|
- LowHeavy bool //低质量数据判重
|
|
|
- sid, eid string //测试人员判重使用
|
|
|
+ isMerger bool //是否合并
|
|
|
+ Is_Sort bool //是否排序
|
|
|
+ threadNum int //线程数量
|
|
|
+ SiteMap map[string]map[string]interface{} //站点map
|
|
|
+ LowHeavy bool //低质量数据判重
|
|
|
+ TimingTask bool //是否定时任务
|
|
|
+ timingSpanDay int64 //时间跨度
|
|
|
+ timingPubScope int64 //发布时间周期
|
|
|
+ sid,eid,lastid string //测试人员判重使用
|
|
|
+ IdType bool //默认object类型
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
+ //5ea9a4800000000000000000
|
|
|
flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
|
|
|
flag.StringVar(&sid, "sid", "", "开始id")
|
|
|
flag.StringVar(&eid, "eid", "", "结束id")
|
|
@@ -60,6 +67,8 @@ func init() {
|
|
|
}
|
|
|
mgo.InitPool()
|
|
|
extract = mconf["extract"].(string)
|
|
|
+ extract_back = mconf["extract_back"].(string)
|
|
|
+
|
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
|
//加载数据
|
|
|
DM = NewDatamap(dupdays, lastid)
|
|
@@ -70,7 +79,11 @@ func init() {
|
|
|
isMerger = Sysconfig["isMerger"].(bool)
|
|
|
Is_Sort = Sysconfig["isSort"].(bool)
|
|
|
threadNum = util.IntAllDef(Sysconfig["threads"], 1)
|
|
|
- LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
+ LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
+ TimingTask = Sysconfig["timingTask"].(bool)
|
|
|
+ timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
|
|
|
+ timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
|
|
|
+
|
|
|
//站点配置
|
|
|
site := mconf["site"].(map[string]interface{})
|
|
|
SiteMap = make(map[string]map[string]interface{}, 0)
|
|
@@ -85,10 +98,11 @@ func init() {
|
|
|
"district": util.ObjToString(site_dict["district"]),
|
|
|
"sitetype": util.ObjToString(site_dict["sitetype"]),
|
|
|
"level": util.ObjToString(site_dict["level"]),
|
|
|
+ "weight": util.ObjToString(site_dict["weight"]),
|
|
|
}
|
|
|
SiteMap[util.ObjToString(site_dict["site"])] = data_map
|
|
|
}
|
|
|
- log.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
|
|
|
+ log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
|
|
|
}
|
|
|
|
|
|
func main() {
|
|
@@ -97,34 +111,38 @@ func main() {
|
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
|
udpclient.Listen(processUdpMsg)
|
|
|
log.Println("Udp服务监听", updport)
|
|
|
+ if TimingTask {
|
|
|
+ go timedTaskDay()
|
|
|
+ }
|
|
|
+
|
|
|
time.Sleep(99999 * time.Hour)
|
|
|
}
|
|
|
|
|
|
//测试组人员使用
|
|
|
func mainT() {
|
|
|
- /*
|
|
|
- ObjectId("5da3f31aa5cb26b9b798d3aa")
|
|
|
- ObjectId("5da418c4a5cb26b9b7e3e9a6")
|
|
|
-
|
|
|
- ObjectId("5da3f2c5a5cb26b9b79847fc")
|
|
|
- ObjectId("5db2735ba5cb26b9b7c99c6f")
|
|
|
- */
|
|
|
- log.Println("测试开始")
|
|
|
- sid = "5da3f2c5a5cb26b9b79847fc"
|
|
|
- eid = "5db2735ba5cb26b9b7c99c6f"
|
|
|
-
|
|
|
-
|
|
|
|
|
|
- mapinfo := map[string]interface{}{}
|
|
|
- if sid == "" || eid == "" {
|
|
|
- log.Println("sid,eid参数不能为空")
|
|
|
- os.Exit(0)
|
|
|
+ if TimingTask {
|
|
|
+ log.Println("定时任务测试开始")
|
|
|
+ go timedTaskDay()
|
|
|
+ time.Sleep(99999 * time.Hour)
|
|
|
+ } else {
|
|
|
+ //2019年8月1日-8月17日 712646
|
|
|
+ IdType = true
|
|
|
+ sid = "5d41607aa5cb26b9b734fe30"
|
|
|
+ eid = "5eb172e1f2c1a7850bad1c39"
|
|
|
+ log.Println("正常判重测试开始")
|
|
|
+ log.Println(sid, "---", eid)
|
|
|
+ mapinfo := map[string]interface{}{}
|
|
|
+ if sid == "" || eid == "" {
|
|
|
+ log.Println("sid,eid参数不能为空")
|
|
|
+ os.Exit(0)
|
|
|
+ }
|
|
|
+ mapinfo["gtid"] = sid
|
|
|
+ mapinfo["lteid"] = eid
|
|
|
+ //mapinfo["stop"] = "true"
|
|
|
+ task([]byte{}, mapinfo)
|
|
|
+ time.Sleep(99999 * time.Second)
|
|
|
}
|
|
|
- mapinfo["gtid"] = sid
|
|
|
- mapinfo["lteid"] = eid
|
|
|
- mapinfo["stop"] = "true"
|
|
|
- task([]byte{}, mapinfo)
|
|
|
- time.Sleep(10 * time.Second)
|
|
|
}
|
|
|
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
fmt.Println("接受的段数据")
|
|
@@ -138,17 +156,13 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
|
|
|
} else if mapInfo != nil {
|
|
|
taskType := util.ObjToString(mapInfo["stype"])
|
|
|
- if taskType == "historyTask" {
|
|
|
- //更新流程
|
|
|
- go historyTask(data, mapInfo)
|
|
|
- } else if taskType == "normalTask" {
|
|
|
+ if taskType == "normalTask" {
|
|
|
//判重流程
|
|
|
go task(data, mapInfo)
|
|
|
} else {
|
|
|
//其他
|
|
|
go task(data, mapInfo)
|
|
|
}
|
|
|
-
|
|
|
key, _ := mapInfo["key"].(string)
|
|
|
if key == "" {
|
|
|
key = "udpok"
|
|
@@ -175,6 +189,14 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
},
|
|
|
}
|
|
|
+ if IdType {
|
|
|
+ q = map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": mapInfo["gtid"].(string),
|
|
|
+ "$lte": mapInfo["lteid"].(string),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
log.Println(mgo.DbName, extract, q)
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
@@ -185,7 +207,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("排序:publishtime")
|
|
|
it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
}
|
|
|
- //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
updateExtract := [][]map[string]interface{}{}
|
|
|
log.Println("线程数:", threadNum)
|
|
|
pool := make(chan bool, threadNum)
|
|
@@ -196,7 +217,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
|
}
|
|
|
- if util.IntAll(tmp["repeat"]) == 1 {
|
|
|
+ if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
|
|
|
tmp = make(map[string]interface{})
|
|
|
repeateN++
|
|
|
continue
|
|
@@ -209,7 +230,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Done()
|
|
|
}()
|
|
|
info := NewInfo(tmp)
|
|
|
- if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
@@ -217,7 +238,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
},
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
|
- "repeat": -1,//无效数据标签
|
|
|
+ "repeat": -1, //无效数据标签
|
|
|
},
|
|
|
},
|
|
|
})
|
|
@@ -238,26 +259,38 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ if IdType {
|
|
|
+ repeat_idMap["_id"] = info.id
|
|
|
+ }
|
|
|
merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- repeat_id := source.id//初始化一个数据
|
|
|
+ repeat_id := source.id //初始化一个数据
|
|
|
|
|
|
- if isMerger {//合并相关
|
|
|
+ if isMerger { //合并相关
|
|
|
basic_bool := basicDataScore(source, info)
|
|
|
if basic_bool {
|
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
|
newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ DM.replaceSourceData(newData, source) //替换
|
|
|
//对比数据打重复标签的id,原始数据id的记录
|
|
|
repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+
|
|
|
+ if IdType {
|
|
|
+ repeat_idMap["_id"] = info.id
|
|
|
+ merge_idMap["_id"] = source.id
|
|
|
+ }
|
|
|
repeat_id = source.id
|
|
|
} else {
|
|
|
//已对比数据为标准 ,数据池的数据打判重标签
|
|
|
newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ DM.replaceSourceData(newData, source) //替换
|
|
|
//原始数据打重复标签的id, 对比数据id的记录
|
|
|
repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ if IdType {
|
|
|
+ repeat_idMap["_id"] = source.id
|
|
|
+ merge_idMap["_id"] = info.id
|
|
|
+ }
|
|
|
repeat_id = info.id
|
|
|
}
|
|
|
|
|
@@ -305,15 +338,22 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
merge_map,
|
|
|
})
|
|
|
}
|
|
|
- }else { //高质量数据
|
|
|
+ } else { //高质量数据
|
|
|
basic_bool := basicDataScore(source, info)
|
|
|
if !basic_bool {
|
|
|
- DM.replaceSourceData(info, source.id) //替换
|
|
|
+ DM.replaceSourceData(info, source) //替换
|
|
|
repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ if IdType {
|
|
|
+ repeat_idMap["_id"] = source.id
|
|
|
+ }
|
|
|
repeat_id = info.id
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ log.Println("最终结果","目标id:",repeat_idMap["_id"])
|
|
|
+
|
|
|
+
|
|
|
//重复数据打标签
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
repeat_idMap,
|
|
@@ -342,6 +382,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
|
|
|
//任务完成,开始发送广播通知下面节点
|
|
|
if n > repeateN && mapInfo["stop"] == nil {
|
|
|
+ log.Println("判重任务完成发送udp")
|
|
|
for _, to := range nextNode {
|
|
|
sid, _ := mapInfo["gtid"].(string)
|
|
|
eid, _ := mapInfo["lteid"].(string)
|
|
@@ -363,245 +404,201 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//支持历史更新
|
|
|
-func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
+//定时任务
|
|
|
+func timedTaskDay() {
|
|
|
+ log.Println("部署定时任务")
|
|
|
+ c := cron.New()
|
|
|
+ c.AddFunc("0 0 1 * * ?", func() { movedata() }) //每天凌晨1点执行一次
|
|
|
+ c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
|
|
|
+ c.Start()
|
|
|
+ //timedTaskOnce()
|
|
|
+}
|
|
|
+func timedTaskOnce() {
|
|
|
|
|
|
- fmt.Println("开始取历史时间段")
|
|
|
+ log.Println("开始一次定时任务")
|
|
|
defer util.Catch()
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
-
|
|
|
- q:= map[string]interface{}{
|
|
|
+ //当前时间-8 -4 小时
|
|
|
+ now := time.Now()
|
|
|
+ log.Println(now)
|
|
|
+ preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
|
|
|
+ curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
|
|
|
+ log.Println(preTime,curTime)
|
|
|
+ task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
|
|
|
+ task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
|
|
|
+ between_time := curTime.Unix() - (86400 * timingPubScope)
|
|
|
+ log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
|
|
|
+ //区间id
|
|
|
+ q_start := map[string]interface{}{
|
|
|
"_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ "$gte": StringTOBsonId(task_sid),
|
|
|
+ "$lte": StringTOBsonId(task_eid),
|
|
|
},
|
|
|
}
|
|
|
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
- minTime, maxTime := int64(0), int64(0)
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
- //取出最大最小时间
|
|
|
- info_time := tmp["comeintime"]
|
|
|
- if Is_Sort {
|
|
|
- info_time = tmp["publishtime"]
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
|
|
|
+ num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
|
|
|
+ updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
|
|
|
+ pendAllArr:=[][]map[string]interface{}{}//待处理数组
|
|
|
+ dayArr := []map[string]interface{}{}
|
|
|
+ for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
|
|
|
+ if num%10000 == 0 {
|
|
|
+ log.Println("正序遍历:", num)
|
|
|
}
|
|
|
- if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
|
|
|
- minTime = util.Int64All(info_time)
|
|
|
- maxTime = util.Int64All(info_time)
|
|
|
- } else {
|
|
|
- t := util.Int64All(info_time)
|
|
|
- if t < minTime && t != 0 {
|
|
|
- minTime = t
|
|
|
- }
|
|
|
- if t > maxTime && t != 0 {
|
|
|
- maxTime = t
|
|
|
+ //取-符合-发布时间半年内的数据
|
|
|
+ if util.IntAll(tmp["dataging"]) == 1 {
|
|
|
+ pubtime := util.Int64All(tmp["publishtime"])
|
|
|
+ if pubtime > 0 && pubtime >= between_time {
|
|
|
+ oknum++
|
|
|
+ if deterTime==0 {
|
|
|
+ log.Println("找到第一条符合条件的数据")
|
|
|
+ deterTime = util.Int64All(tmp["publishtime"])
|
|
|
+ dayArr = append(dayArr,tmp)
|
|
|
+ }else {
|
|
|
+ if pubtime-deterTime >timingSpanDay*86400 {
|
|
|
+ //新数组重新构建,当前组数据加到全部组数据
|
|
|
+ pendAllArr = append(pendAllArr,dayArr)
|
|
|
+ dayArr = []map[string]interface{}{}
|
|
|
+ deterTime = util.Int64All(tmp["publishtime"])
|
|
|
+ dayArr = append(dayArr,tmp)
|
|
|
+ }else {
|
|
|
+ dayArr = append(dayArr,tmp)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ //不在两年内的也清标记
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "dataging": 0,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) > 50 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
+ tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- //时间不正确时
|
|
|
- if minTime == 0 && maxTime == 0 {
|
|
|
- log.Println("段数据区间 不符合")
|
|
|
- return
|
|
|
+
|
|
|
+
|
|
|
+ //批量更新标记
|
|
|
+ if len(updateExtract) > 0 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
|
|
|
- gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
|
|
|
- fmt.Println(gtid, lteid)
|
|
|
- HM = NewHistorymap(gtid, lteid, minTime, maxTime)
|
|
|
|
|
|
- fmt.Println("开始历史数据判重")
|
|
|
+ if len(dayArr)>0 {
|
|
|
+ pendAllArr = append(pendAllArr,dayArr)
|
|
|
+ dayArr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
|
|
|
- defer util.Catch()
|
|
|
- //区间id
|
|
|
- sess_history := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess_history)
|
|
|
- q_history := map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
+ log.Println("查询数量:",num,"符合条件:",oknum)
|
|
|
+
|
|
|
+ if len(pendAllArr) <= 0 {
|
|
|
+ log.Println("没找到dataging==1的数据")
|
|
|
+ return
|
|
|
}
|
|
|
- log.Println(mgo.DbName, extract, q_history)
|
|
|
|
|
|
- //是否排序
|
|
|
- it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
|
|
|
- if Is_Sort {
|
|
|
- it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
|
|
|
+ //测试分组数量是否正确
|
|
|
+ testNum:=0
|
|
|
+ for k,v:=range pendAllArr {
|
|
|
+ log.Println("第",k,"组--","数量:",len(v))
|
|
|
+ testNum = testNum+len(v)
|
|
|
}
|
|
|
- updateExtract := [][]map[string]interface{}{}
|
|
|
- log.Println("线程数:", threadNum)
|
|
|
- pool := make(chan bool, threadNum)
|
|
|
- wg := &sync.WaitGroup{}
|
|
|
+ log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
|
|
|
+
|
|
|
n, repeateN := 0, 0
|
|
|
- for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
|
|
|
- if n%10000 == 0 {
|
|
|
- log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
|
- }
|
|
|
- pool <- true
|
|
|
- wg.Add(1)
|
|
|
- go func(tmp map[string]interface{}) {
|
|
|
- defer func() {
|
|
|
- <-pool
|
|
|
- wg.Done()
|
|
|
- }()
|
|
|
+ for k,v:=range pendAllArr {
|
|
|
+ //构建当前组的数据池
|
|
|
+ log.Println("构建第",k,"组---(数据池)")
|
|
|
+ DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
|
|
|
+ log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
|
|
|
+ n = n+len(v)
|
|
|
+ log.Println("统计目前总数量:",n,"重复数量:",repeateN)
|
|
|
+ for _,tmp:=range v {
|
|
|
info := NewInfo(tmp)
|
|
|
- if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
+ log.Println("无效数据")
|
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
map[string]interface{}{
|
|
|
"_id": tmp["_id"],
|
|
|
},
|
|
|
map[string]interface{}{
|
|
|
"$set": map[string]interface{}{
|
|
|
- "repeat": -1,//无效数据标签
|
|
|
+ "repeat": -1, //无效数据标签
|
|
|
+ "dataging": 0,
|
|
|
},
|
|
|
},
|
|
|
})
|
|
|
- if len(updateExtract) > 500 {
|
|
|
+ if len(updateExtract) > 50 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|
|
|
updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- return
|
|
|
+ continue
|
|
|
}
|
|
|
}
|
|
|
- b, source, reason := HM.checkHistory(info)
|
|
|
+ b, source, reason := DM.check(info)
|
|
|
if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
- if reason == "未判重记录" {
|
|
|
- fmt.Println("未判重记录")
|
|
|
- //把info的数据判重的标签更换,并新增字段
|
|
|
- HM.replaceSourceData(info, info.id) //替换即添加
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 0,
|
|
|
- "repeatid": -2,
|
|
|
- },
|
|
|
+ log.Println("判重结果", b, reason,"目标id",info.id)
|
|
|
+ repeateN++
|
|
|
+ //重复数据打标签
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": 1,
|
|
|
+ "repeat_reason": reason,
|
|
|
+ "repeat_id": source.id,
|
|
|
+ "dataging": 0,
|
|
|
},
|
|
|
- })
|
|
|
- } else {
|
|
|
- repeateN++
|
|
|
- var is_replace = false
|
|
|
- var mergeArr = []int64{} //更改合并数组记录
|
|
|
- var newData = &Info{} //更换新的数据池数据
|
|
|
- var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
- var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- repeat_id := source.id
|
|
|
- //以下合并相关
|
|
|
- if isMerger {
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if basic_bool {
|
|
|
- //已原始数据为标准 - 对比数据打判重标签-
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
- HM.replaceSourceData(newData, source.id) //替换
|
|
|
- //对比数据打重复标签的id,原始数据id的记录
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- repeat_id = source.id
|
|
|
- } else {
|
|
|
- //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
- HM.replaceSourceData(newData, source.id) //替换
|
|
|
- //原始数据打重复标签的id, 对比数据id的记录
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- repeat_id = info.id
|
|
|
- }
|
|
|
-
|
|
|
- merge_map := make(map[string]interface{}, 0)
|
|
|
- if is_replace { //有过合并-更新数据
|
|
|
- merge_map = map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "merge": newData.mergemap,
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
- //更新合并后的数据
|
|
|
- for _, value := range mergeArr {
|
|
|
- if value == 0 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 1 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 2 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
- } else if value == 3 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
- } else if value == 4 {
|
|
|
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
- } else if value == 5 {
|
|
|
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
- } else if value == 6 {
|
|
|
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
- } else if value == 7 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
- } else if value == 8 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
- } else if value == 9 {
|
|
|
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
- } else if value == 10 {
|
|
|
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
- } else if value == 11 {
|
|
|
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
- } else {
|
|
|
- }
|
|
|
- }
|
|
|
- //模板数据更新
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- merge_idMap,
|
|
|
- merge_map,
|
|
|
- })
|
|
|
- }
|
|
|
- }else { //高质量数据
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if !basic_bool {
|
|
|
- HM.replaceSourceData(info, source.id) //替换
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- repeat_id = info.id
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //重复数据打标签
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- repeat_idMap,
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 1,
|
|
|
- "repeat_reason": reason,
|
|
|
- "repeat_id": repeat_id,
|
|
|
- },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }else {
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "dataging": 0,//符合条件的都为dataging==0
|
|
|
},
|
|
|
- })
|
|
|
-
|
|
|
- }
|
|
|
+ },
|
|
|
+ })
|
|
|
+ }
|
|
|
+ if len(updateExtract) > 50 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- }(tmp)
|
|
|
- if len(updateExtract) > 500 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- tmp = make(map[string]interface{})
|
|
|
}
|
|
|
- wg.Wait()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
if len(updateExtract) > 0 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- //mgo.UpdateBulk(bidding, updateBidding...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
}
|
|
|
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
+ log.Println("this timeTask over.", n, "repeateN:", repeateN)
|
|
|
|
|
|
- //任务完成,开始发送广播通知下面节点
|
|
|
- if n > repeateN && mapInfo["stop"] == nil {
|
|
|
+ //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
|
|
|
+ if n > repeateN {
|
|
|
for _, to := range nextNode {
|
|
|
- sid, _ := mapInfo["gtid"].(string)
|
|
|
- eid, _ := mapInfo["lteid"].(string)
|
|
|
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
|
|
|
+ next_sid := util.BsonIdToSId(task_sid)
|
|
|
+ next_eid := util.BsonIdToSId(task_eid)
|
|
|
+ key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
|
|
|
by, _ := json.Marshal(map[string]interface{}{
|
|
|
- "gtid": sid,
|
|
|
- "lteid": eid,
|
|
|
+ "gtid": next_sid,
|
|
|
+ "lteid": next_eid,
|
|
|
"stype": util.ObjToString(to["stype"]),
|
|
|
"key": key,
|
|
|
})
|
|
@@ -747,7 +744,7 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
|
//先判断level
|
|
|
if dict_v != nil {
|
|
|
v_level := util.ObjToString(dict_v["level"])
|
|
|
- if v_level == "中央" {
|
|
|
+ if v_level == "国家" {
|
|
|
v_score = 4
|
|
|
} else if v_level == "省级" {
|
|
|
v_score = 3
|
|
@@ -763,7 +760,7 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
|
|
|
|
if dict_info != nil {
|
|
|
info_level := util.ObjToString(dict_info["level"])
|
|
|
- if info_level == "中央" {
|
|
|
+ if info_level == "国家" {
|
|
|
info_score = 4
|
|
|
} else if info_level == "省级" {
|
|
|
info_score = 3
|
|
@@ -788,11 +785,11 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
|
//判断sitetype
|
|
|
if dict_v != nil {
|
|
|
v_sitetype := util.ObjToString(dict_v["sitetype"])
|
|
|
- if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
|
|
|
+ if v_sitetype == "政府采购" {
|
|
|
v_score = 4
|
|
|
} else if v_sitetype == "公共资源" {
|
|
|
v_score = 3
|
|
|
- } else if v_sitetype == "官方网站" {
|
|
|
+ } else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
|
|
|
v_score = 2
|
|
|
} else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
|
|
|
v_score = 1
|
|
@@ -804,11 +801,11 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
|
|
|
|
if dict_info != nil {
|
|
|
info_sitetype := util.ObjToString(dict_info["sitetype"])
|
|
|
- if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
|
|
|
+ if info_sitetype == "政府采购" {
|
|
|
info_score = 4
|
|
|
} else if info_sitetype == "公共资源" {
|
|
|
info_score = 3
|
|
|
- } else if info_sitetype == "官方网站" {
|
|
|
+ } else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
|
|
|
info_score = 2
|
|
|
} else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
|
|
|
info_score = 1
|
|
@@ -825,6 +822,17 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
+ if v_score == info_score {//同sitetype 情况下 分析weight
|
|
|
+ v_weight := util.IntAll(dict_v["weight"])
|
|
|
+ info_weight := util.IntAll(dict_info["weight"])
|
|
|
+ if v_weight>info_weight {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ if info_weight>v_weight {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
//网站评估
|
|
|
m, n := 0, 0
|
|
|
if v.projectname != "" {
|
|
@@ -922,3 +930,28 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
|
|
|
}
|
|
|
return false
|
|
|
}
|
|
|
+
|
|
|
+//迁移数据dupdays+5之前的数据
|
|
|
+func movedata() {
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
+ year, month, day := time.Now().Date()
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
|
|
|
+ },
|
|
|
+ }
|
|
|
+ log.Println(q)
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
+ index := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
+ mgo.Save(extract_back, tmp)
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ if index%1000 == 0 {
|
|
|
+ log.Println("index", index)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("save to", extract_back, " ok index", index)
|
|
|
+ delnum := mgo.Delete(extract, q)
|
|
|
+ log.Println("remove from ", extract, delnum)
|
|
|
+}
|