|
@@ -32,6 +32,7 @@ var (
|
|
|
|
|
|
//正则筛选相关
|
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
|
+ FilterRegTitle_0 = regexp.MustCompile("^_$")
|
|
|
FilterRegTitle_1 = regexp.MustCompile("^_$")
|
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
|
|
|
@@ -39,14 +40,14 @@ var (
|
|
|
Is_Sort bool //是否排序
|
|
|
threadNum int //线程数量
|
|
|
SiteMap map[string]map[string]interface{} //站点map
|
|
|
- idtype, sid, eid string //测试人员判重使用
|
|
|
+ LowHeavy bool //低质量数据判重
|
|
|
+ sid, eid string //测试人员判重使用
|
|
|
)
|
|
|
|
|
|
func init() {
|
|
|
flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
|
|
|
flag.StringVar(&sid, "sid", "", "开始id")
|
|
|
flag.StringVar(&eid, "eid", "", "结束id")
|
|
|
- flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
|
|
|
flag.Parse()
|
|
|
//172.17.145.163:27080
|
|
|
util.ReadConfig(&Sysconfig)
|
|
@@ -63,12 +64,13 @@ func init() {
|
|
|
//加载数据
|
|
|
DM = NewDatamap(dupdays, lastid)
|
|
|
FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
|
|
|
+ FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
|
|
|
FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
|
|
|
FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
|
|
|
isMerger = Sysconfig["isMerger"].(bool)
|
|
|
Is_Sort = Sysconfig["isSort"].(bool)
|
|
|
threadNum = util.IntAllDef(Sysconfig["threads"], 1)
|
|
|
-
|
|
|
+ LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
//站点配置
|
|
|
site := mconf["site"].(map[string]interface{})
|
|
|
SiteMap = make(map[string]map[string]interface{}, 0)
|
|
@@ -103,12 +105,14 @@ func mainT() {
|
|
|
/*
|
|
|
ObjectId("5da3f31aa5cb26b9b798d3aa")
|
|
|
ObjectId("5da418c4a5cb26b9b7e3e9a6")
|
|
|
- ObjectId("5df5071ce9d1f601e495fa54")
|
|
|
- ObjectId("5e09c05f0cf41612e0626abc")
|
|
|
+
|
|
|
+ ObjectId("5da3f2c5a5cb26b9b79847fc")
|
|
|
+ ObjectId("5db2735ba5cb26b9b7c99c6f")
|
|
|
*/
|
|
|
- log.Println("测试开始")
|
|
|
- sid = "5da3f31aa5cb26b9b798d3aa"
|
|
|
- eid = "5da418c4a5cb26b9b7e3e9a6"
|
|
|
+
|
|
|
+ //
|
|
|
+ sid = "5da3f2c5a5cb26b9b79847fc"
|
|
|
+ eid = "5db2735ba5cb26b9b7c99c6f"
|
|
|
mapinfo := map[string]interface{}{}
|
|
|
if sid == "" || eid == "" {
|
|
|
log.Println("sid,eid参数不能为空")
|
|
@@ -160,32 +164,23 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
|
|
|
//开始判重程序
|
|
|
func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
- fmt.Println("开始数据判重")
|
|
|
+ log.Println("开始数据判重")
|
|
|
defer util.Catch()
|
|
|
//区间id
|
|
|
- q := map[string]interface{}{}
|
|
|
- if idtype == "1" {
|
|
|
- q = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": mapInfo["gtid"].(string),
|
|
|
- "$lte": mapInfo["lteid"].(string),
|
|
|
- },
|
|
|
- }
|
|
|
- } else {
|
|
|
- q = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
}
|
|
|
log.Println(mgo.DbName, extract, q)
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
//是否排序
|
|
|
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
|
|
|
if Is_Sort {
|
|
|
+ log.Println("排序:publishtime")
|
|
|
it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
}
|
|
|
//it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
@@ -193,8 +188,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("线程数:", threadNum)
|
|
|
pool := make(chan bool, threadNum)
|
|
|
wg := &sync.WaitGroup{}
|
|
|
- //mapLock := &sync.Mutex{}
|
|
|
n, repeateN := 0, 0
|
|
|
+
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
|
|
|
if n%10000 == 0 {
|
|
|
log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
|
|
@@ -207,132 +202,123 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Done()
|
|
|
}()
|
|
|
info := NewInfo(tmp)
|
|
|
- //是否为无效数据
|
|
|
- if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": -1,
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
},
|
|
|
- },
|
|
|
- })
|
|
|
- if len(updateExtract) > 500 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- updateExtract = [][]map[string]interface{}{}
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": -1,//无效数据标签
|
|
|
+ },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) > 500 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ return
|
|
|
}
|
|
|
- } else {
|
|
|
- b, source, reason := DM.check(info)
|
|
|
- if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
- repeateN++
|
|
|
- var is_replace = false
|
|
|
- var mergeArr = []int64{} //更改合并数组记录
|
|
|
- var newData = &Info{} //更换新的数据池数据
|
|
|
- var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
- var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
- if idtype == "1" { //先临时决定一个id
|
|
|
- repeat_idMap["_id"] = info.id
|
|
|
- merge_idMap["_id"] = source.id
|
|
|
- } else {
|
|
|
+ }
|
|
|
+
|
|
|
+ b, source, reason := DM.check(info)
|
|
|
+ if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
+ repeateN++
|
|
|
+ var is_replace = false
|
|
|
+ var mergeArr = []int64{} //更改合并数组记录
|
|
|
+ var newData = &Info{} //更换新的数据池数据
|
|
|
+ var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
+ var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id := source.id//初始化一个数据
|
|
|
+
|
|
|
+ if isMerger {//合并相关
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if basic_bool {
|
|
|
+ //已原始数据为标准 - 对比数据打判重标签-
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
+ DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //对比数据打重复标签的id,原始数据id的记录
|
|
|
repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = source.id
|
|
|
+ } else {
|
|
|
+ //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
+ DM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //原始数据打重复标签的id, 对比数据id的记录
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ repeat_id = info.id
|
|
|
}
|
|
|
- repeat_id := source.id
|
|
|
- //以下合并相关
|
|
|
- if isMerger {
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if basic_bool {
|
|
|
- //已原始数据为标准 - 对比数据打判重标签-
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
- //对比数据打重复标签的id,原始数据id的记录
|
|
|
- if idtype == "1" {
|
|
|
- repeat_idMap["_id"] = info.id
|
|
|
- merge_idMap["_id"] = source.id
|
|
|
- } else {
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- }
|
|
|
- repeat_id = source.id
|
|
|
- } else {
|
|
|
- //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
|
|
|
- //原始数据打重复标签的id, 对比数据id的记录
|
|
|
- if idtype == "1" {
|
|
|
- repeat_idMap["_id"] = source.id
|
|
|
- merge_idMap["_id"] = info.id
|
|
|
- } else {
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- }
|
|
|
- repeat_id = info.id
|
|
|
+ merge_map := make(map[string]interface{}, 0)
|
|
|
+ if is_replace { //有过合并-更新数据
|
|
|
+ merge_map = map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "merge": newData.mergemap,
|
|
|
+ },
|
|
|
}
|
|
|
-
|
|
|
- merge_map := make(map[string]interface{}, 0)
|
|
|
- if is_replace { //有过合并-更新数据
|
|
|
-
|
|
|
- merge_map = map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "merge": newData.mergemap,
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
- //更新合并后的数据
|
|
|
- for _, value := range mergeArr {
|
|
|
- if value == 0 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 1 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 2 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
- } else if value == 3 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
- } else if value == 4 {
|
|
|
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
- } else if value == 5 {
|
|
|
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
- } else if value == 6 {
|
|
|
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
- } else if value == 7 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
- } else if value == 8 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
- } else if value == 9 {
|
|
|
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
- } else if value == 10 {
|
|
|
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
- } else if value == 11 {
|
|
|
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
- } else {
|
|
|
- }
|
|
|
+ //更新合并后的数据
|
|
|
+ for _, value := range mergeArr {
|
|
|
+ if value == 0 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 1 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 2 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
+ } else if value == 3 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
+ } else if value == 4 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
+ } else if value == 5 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
+ } else if value == 6 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
+ } else if value == 7 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
+ } else if value == 8 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
+ } else if value == 9 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
+ } else if value == 10 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
+ } else if value == 11 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
+ } else {
|
|
|
}
|
|
|
- //模板数据更新
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- merge_idMap,
|
|
|
- merge_map,
|
|
|
- })
|
|
|
}
|
|
|
+ //模板数据更新
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ merge_idMap,
|
|
|
+ merge_map,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }else { //高质量数据
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if !basic_bool {
|
|
|
+ DM.replaceSourceData(info, source.id) //替换
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = info.id
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- //重复数据打标签
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- repeat_idMap,
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 1,
|
|
|
- "repeat_reason": reason,
|
|
|
- "repeat_id": repeat_id,
|
|
|
- },
|
|
|
+ //重复数据打标签
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ repeat_idMap,
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": 1,
|
|
|
+ "repeat_reason": reason,
|
|
|
+ "repeat_id": repeat_id,
|
|
|
},
|
|
|
- })
|
|
|
+ },
|
|
|
+ })
|
|
|
|
|
|
- }
|
|
|
}
|
|
|
}(tmp)
|
|
|
if len(updateExtract) > 500 {
|
|
@@ -344,7 +330,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Wait()
|
|
|
if len(updateExtract) > 0 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- //mgo.UpdateBulk(bidding, updateBidding...)
|
|
|
}
|
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
|
|
|
@@ -379,23 +364,12 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
sess := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess)
|
|
|
|
|
|
- var q map[string]interface{}
|
|
|
- if idtype == "1" {
|
|
|
- q = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": mapInfo["gtid"].(string),
|
|
|
- "$lte": mapInfo["lteid"].(string),
|
|
|
- },
|
|
|
- }
|
|
|
- } else {
|
|
|
- q = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
+ q:= map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
}
|
|
|
-
|
|
|
it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
|
minTime, maxTime := int64(0), int64(0)
|
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
@@ -426,27 +400,18 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
|
|
|
fmt.Println(gtid, lteid)
|
|
|
HM = NewHistorymap(gtid, lteid, minTime, maxTime)
|
|
|
+
|
|
|
fmt.Println("开始历史数据判重")
|
|
|
|
|
|
defer util.Catch()
|
|
|
//区间id
|
|
|
sess_history := mgo.GetMgoConn()
|
|
|
defer mgo.DestoryMongoConn(sess_history)
|
|
|
- var q_history map[string]interface{}
|
|
|
- if idtype == "1" {
|
|
|
- q_history = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": mapInfo["gtid"].(string),
|
|
|
- "$lte": mapInfo["lteid"].(string),
|
|
|
- },
|
|
|
- }
|
|
|
- } else {
|
|
|
- q_history = map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
+ q_history := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
+ },
|
|
|
}
|
|
|
log.Println(mgo.DbName, extract, q_history)
|
|
|
|
|
@@ -459,7 +424,6 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
log.Println("线程数:", threadNum)
|
|
|
pool := make(chan bool, threadNum)
|
|
|
wg := &sync.WaitGroup{}
|
|
|
- //mapLock := &sync.Mutex{}
|
|
|
n, repeateN := 0, 0
|
|
|
for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
|
|
|
if n%10000 == 0 {
|
|
@@ -473,149 +437,162 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
|
wg.Done()
|
|
|
}()
|
|
|
info := NewInfo(tmp)
|
|
|
- if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
- },
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": -1,
|
|
|
+ if !LowHeavy { //是否进行低质量数据判重
|
|
|
+ if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
},
|
|
|
- },
|
|
|
- })
|
|
|
- if len(updateExtract) > 500 {
|
|
|
- mgo.UpSertBulk(extract, updateExtract...)
|
|
|
- updateExtract = [][]map[string]interface{}{}
|
|
|
- }
|
|
|
- } else {
|
|
|
- b, source, reason := HM.checkHistory(info)
|
|
|
- if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
- if reason == "未判重记录" {
|
|
|
- fmt.Println("未判重记录")
|
|
|
- //把info的数据判重的标签更换,并新增字段
|
|
|
- DM.replaceSourceData(info, info.id) //替换即添加
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- map[string]interface{}{
|
|
|
- "_id": tmp["_id"],
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": -1,//无效数据标签
|
|
|
},
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 0,
|
|
|
- "repeatid": -2,
|
|
|
- },
|
|
|
+ },
|
|
|
+ })
|
|
|
+ if len(updateExtract) > 500 {
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ updateExtract = [][]map[string]interface{}{}
|
|
|
+ }
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+ b, source, reason := HM.checkHistory(info)
|
|
|
+ if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
|
+ if reason == "未判重记录" {
|
|
|
+ fmt.Println("未判重记录")
|
|
|
+ //把info的数据判重的标签更换,并新增字段
|
|
|
+ HM.replaceSourceData(info, info.id) //替换即添加
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ map[string]interface{}{
|
|
|
+ "_id": tmp["_id"],
|
|
|
+ },
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": 0,
|
|
|
+ "repeatid": -2,
|
|
|
},
|
|
|
- })
|
|
|
- } else {
|
|
|
- repeateN++
|
|
|
- var is_replace = false
|
|
|
- var mergeArr = []int64{} //更改合并数组记录
|
|
|
- var newData = &Info{} //更换新的数据池数据
|
|
|
- var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
- var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
- if idtype == "1" { //先临时决定一个id
|
|
|
- repeat_idMap["_id"] = info.id
|
|
|
- merge_idMap["_id"] = source.id
|
|
|
- } else {
|
|
|
+ },
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ repeateN++
|
|
|
+ var is_replace = false
|
|
|
+ var mergeArr = []int64{} //更改合并数组记录
|
|
|
+ var newData = &Info{} //更换新的数据池数据
|
|
|
+ var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
|
+ var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id := source.id
|
|
|
+ //以下合并相关
|
|
|
+ if isMerger {
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if basic_bool {
|
|
|
+ //已原始数据为标准 - 对比数据打判重标签-
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
+ HM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //对比数据打重复标签的id,原始数据id的记录
|
|
|
repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = source.id
|
|
|
+ } else {
|
|
|
+ //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
+ HM.replaceSourceData(newData, source.id) //替换
|
|
|
+ //原始数据打重复标签的id, 对比数据id的记录
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
+ repeat_id = info.id
|
|
|
}
|
|
|
- repeat_id := source.id
|
|
|
- //以下合并相关
|
|
|
- if isMerger {
|
|
|
- basic_bool := basicDataScore(source, info)
|
|
|
- if basic_bool {
|
|
|
- //已原始数据为标准 - 对比数据打判重标签-
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
- //对比数据打重复标签的id,原始数据id的记录
|
|
|
- if idtype == "1" {
|
|
|
- repeat_idMap["_id"] = info.id
|
|
|
- merge_idMap["_id"] = source.id
|
|
|
- } else {
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- }
|
|
|
- repeat_id = source.id
|
|
|
- } else {
|
|
|
- //已对比数据为标准 ,数据池的数据打判重标签
|
|
|
- newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
|
- DM.replaceSourceData(newData, source.id) //替换
|
|
|
-
|
|
|
- //原始数据打重复标签的id, 对比数据id的记录
|
|
|
- if idtype == "1" {
|
|
|
- repeat_idMap["_id"] = source.id
|
|
|
- merge_idMap["_id"] = info.id
|
|
|
- } else {
|
|
|
- repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
- merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
- }
|
|
|
- repeat_id = info.id
|
|
|
- }
|
|
|
|
|
|
- merge_map := make(map[string]interface{}, 0)
|
|
|
- if is_replace { //有过合并-更新数据
|
|
|
-
|
|
|
- merge_map = map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "merge": newData.mergemap,
|
|
|
- },
|
|
|
- }
|
|
|
+ merge_map := make(map[string]interface{}, 0)
|
|
|
+ if is_replace { //有过合并-更新数据
|
|
|
+ merge_map = map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "merge": newData.mergemap,
|
|
|
+ },
|
|
|
+ }
|
|
|
|
|
|
- //更新合并后的数据
|
|
|
- for _, value := range mergeArr {
|
|
|
- if value == 0 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 1 {
|
|
|
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
- } else if value == 2 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
- } else if value == 3 {
|
|
|
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
- } else if value == 4 {
|
|
|
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
- } else if value == 5 {
|
|
|
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
- } else if value == 6 {
|
|
|
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
- } else if value == 7 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
- } else if value == 8 {
|
|
|
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
- } else if value == 9 {
|
|
|
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
- } else if value == 10 {
|
|
|
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
- } else if value == 11 {
|
|
|
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
- } else {
|
|
|
- }
|
|
|
+ //更新合并后的数据
|
|
|
+ for _, value := range mergeArr {
|
|
|
+ if value == 0 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 1 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["area"] = newData.area
|
|
|
+ merge_map["$set"].(map[string]interface{})["city"] = newData.city
|
|
|
+ } else if value == 2 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
|
|
|
+ } else if value == 3 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
|
|
|
+ } else if value == 4 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
|
|
|
+ } else if value == 5 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
|
|
|
+ } else if value == 6 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
|
|
|
+ } else if value == 7 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
|
|
|
+ } else if value == 8 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
|
+ } else if value == 9 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
|
+ } else if value == 10 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
|
+ } else if value == 11 {
|
|
|
+ merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
|
+ } else {
|
|
|
}
|
|
|
- //模板数据更新
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- merge_idMap,
|
|
|
- merge_map,
|
|
|
- })
|
|
|
}
|
|
|
+ //模板数据更新
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ merge_idMap,
|
|
|
+ merge_map,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }else { //高质量数据
|
|
|
+ basic_bool := basicDataScore(source, info)
|
|
|
+ if !basic_bool {
|
|
|
+ HM.replaceSourceData(info, source.id) //替换
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
+ repeat_id = info.id
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- //重复数据打标签
|
|
|
- updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
- repeat_idMap,
|
|
|
- map[string]interface{}{
|
|
|
- "$set": map[string]interface{}{
|
|
|
- "repeat": 1,
|
|
|
- "repeat_reason": reason,
|
|
|
- "repeat_id": repeat_id,
|
|
|
- },
|
|
|
+ //重复数据打标签
|
|
|
+ updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ repeat_idMap,
|
|
|
+ map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "repeat": 1,
|
|
|
+ "repeat_reason": reason,
|
|
|
+ "repeat_id": repeat_id,
|
|
|
},
|
|
|
- })
|
|
|
+ },
|
|
|
+ })
|
|
|
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ //if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
|
+ // updateExtract = append(updateExtract, []map[string]interface{}{
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "_id": tmp["_id"],
|
|
|
+ // },
|
|
|
+ // map[string]interface{}{
|
|
|
+ // "$set": map[string]interface{}{
|
|
|
+ // "repeat": -1,
|
|
|
+ // },
|
|
|
+ // },
|
|
|
+ // })
|
|
|
+ // if len(updateExtract) > 500 {
|
|
|
+ // mgo.UpSertBulk(extract, updateExtract...)
|
|
|
+ // updateExtract = [][]map[string]interface{}{}
|
|
|
+ // }
|
|
|
+ //} else {
|
|
|
+ //
|
|
|
+ //}
|
|
|
}(tmp)
|
|
|
if len(updateExtract) > 500 {
|
|
|
mgo.UpSertBulk(extract, updateExtract...)
|