|
@@ -13,7 +13,6 @@ import (
|
|
"net"
|
|
"net"
|
|
"os"
|
|
"os"
|
|
"qfw/util"
|
|
"qfw/util"
|
|
- "qfw/util/mongodb"
|
|
|
|
"regexp"
|
|
"regexp"
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
@@ -22,7 +21,7 @@ import (
|
|
var (
|
|
var (
|
|
Sysconfig map[string]interface{} //配置文件
|
|
Sysconfig map[string]interface{} //配置文件
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
mconf map[string]interface{} //mongodb配置信息
|
|
- mgo *mongodb.MongodbSim //mongodb操作对象
|
|
|
|
|
|
+ mgo *MongodbSim //mongodb操作对象
|
|
extract string
|
|
extract string
|
|
udpclient mu.UdpClient //udp对象
|
|
udpclient mu.UdpClient //udp对象
|
|
nextNode []map[string]interface{} //下节点数组
|
|
nextNode []map[string]interface{} //下节点数组
|
|
@@ -38,14 +37,13 @@ var (
|
|
FilterRegTitle_1 = regexp.MustCompile("^_$")
|
|
FilterRegTitle_1 = regexp.MustCompile("^_$")
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
FilterRegTitle_2 = regexp.MustCompile("^_$")
|
|
|
|
|
|
- isMerger bool //是否合并
|
|
|
|
- threadNum int //线程数量
|
|
|
|
- SiteMap map[string]map[string]interface{} //站点map
|
|
|
|
- idtype, sid, eid string //测试人员判重使用
|
|
|
|
|
|
+ isMerger bool //是否合并
|
|
|
|
+ threadNum int //线程数量
|
|
|
|
+ SiteMap map[string]map[string]interface{} //站点map
|
|
|
|
+ idtype, sid, eid string //测试人员判重使用
|
|
)
|
|
)
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
-
|
|
|
|
flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
|
|
flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
|
|
flag.StringVar(&sid, "sid", "", "开始id")
|
|
flag.StringVar(&sid, "sid", "", "开始id")
|
|
flag.StringVar(&eid, "eid", "", "结束id")
|
|
flag.StringVar(&eid, "eid", "", "结束id")
|
|
@@ -55,13 +53,13 @@ func init() {
|
|
util.ReadConfig(&Sysconfig)
|
|
util.ReadConfig(&Sysconfig)
|
|
nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
- mgo = &mongodb.MongodbSim{
|
|
|
|
|
|
+ mgo = &MongodbSim{
|
|
MongodbAddr: mconf["addr"].(string),
|
|
MongodbAddr: mconf["addr"].(string),
|
|
DbName: mconf["db"].(string),
|
|
DbName: mconf["db"].(string),
|
|
Size: util.IntAllDef(mconf["pool"], 10),
|
|
Size: util.IntAllDef(mconf["pool"], 10),
|
|
}
|
|
}
|
|
- extract = mconf["extract"].(string)
|
|
|
|
mgo.InitPool()
|
|
mgo.InitPool()
|
|
|
|
+ extract = mconf["extract"].(string)
|
|
|
|
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
|
|
//加载数据
|
|
//加载数据
|
|
@@ -77,8 +75,8 @@ func init() {
|
|
SiteMap = make(map[string]map[string]interface{}, 0)
|
|
SiteMap = make(map[string]map[string]interface{}, 0)
|
|
start := int(time.Now().Unix())
|
|
start := int(time.Now().Unix())
|
|
sess_site := mgo.GetMgoConn()
|
|
sess_site := mgo.GetMgoConn()
|
|
- defer sess_site.Close()
|
|
|
|
- res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
|
|
|
|
|
|
+ defer mgo.DestoryMongoConn(sess_site)
|
|
|
|
+ res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
|
|
for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
data_map := map[string]interface{}{
|
|
data_map := map[string]interface{}{
|
|
"area": util.ObjToString(site_dict["area"]),
|
|
"area": util.ObjToString(site_dict["area"]),
|
|
@@ -102,15 +100,15 @@ func main() {
|
|
}
|
|
}
|
|
|
|
|
|
//测试组人员使用
|
|
//测试组人员使用
|
|
-func mainTT() {
|
|
|
|
|
|
+func mainT() {
|
|
/*
|
|
/*
|
|
- ObjectId("5da3f31aa5cb26b9b798d3aa")
|
|
|
|
- ObjectId("5da418c4a5cb26b9b7e3e9a6")
|
|
|
|
- ObjectId("5df5071ce9d1f601e495fa54")
|
|
|
|
- ObjectId("5e09c05f0cf41612e0626abc")
|
|
|
|
|
|
+ ObjectId("5da3f31aa5cb26b9b798d3aa")
|
|
|
|
+ ObjectId("5da418c4a5cb26b9b7e3e9a6")
|
|
|
|
+ ObjectId("5df5071ce9d1f601e495fa54")
|
|
|
|
+ ObjectId("5e09c05f0cf41612e0626abc")
|
|
*/
|
|
*/
|
|
- //sid = "5da3f31aa5cb26b9b798d3aa"
|
|
|
|
- //eid = "5da418c4a5cb26b9b7e3e9a6"
|
|
|
|
|
|
+ sid = "5df5071ce9d1f601e495fa54"
|
|
|
|
+ eid = "5e09c05f0cf41612e0626abc"
|
|
|
|
|
|
mapinfo := map[string]interface{}{}
|
|
mapinfo := map[string]interface{}{}
|
|
if sid == "" || eid == "" {
|
|
if sid == "" || eid == "" {
|
|
@@ -166,9 +164,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
fmt.Println("开始数据判重")
|
|
fmt.Println("开始数据判重")
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
//区间id
|
|
//区间id
|
|
- sess := mgo.GetMgoConn()
|
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
|
- var q map[string]interface{}
|
|
|
|
|
|
+ q := map[string]interface{}{}
|
|
if idtype == "1" {
|
|
if idtype == "1" {
|
|
q = map[string]interface{}{
|
|
q = map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
@@ -179,15 +175,18 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
} else {
|
|
} else {
|
|
q = map[string]interface{}{
|
|
q = map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.Println(mgo.DbName,extract,q)
|
|
|
|
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
|
|
|
+ log.Println(mgo.DbName, extract, q)
|
|
|
|
+ sess := mgo.GetMgoConn()
|
|
|
|
+ defer mgo.DestoryMongoConn(sess)
|
|
|
|
+ //it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
|
|
|
|
+ it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
|
|
updateExtract := [][]map[string]interface{}{}
|
|
updateExtract := [][]map[string]interface{}{}
|
|
- log.Println("线程数:",threadNum)
|
|
|
|
|
|
+ log.Println("线程数:", threadNum)
|
|
pool := make(chan bool, threadNum)
|
|
pool := make(chan bool, threadNum)
|
|
|
|
|
|
wg := &sync.WaitGroup{}
|
|
wg := &sync.WaitGroup{}
|
|
@@ -206,7 +205,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
}()
|
|
}()
|
|
info := NewInfo(tmp)
|
|
info := NewInfo(tmp)
|
|
//是否为无效数据
|
|
//是否为无效数据
|
|
- if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
|
|
|
|
|
|
+ if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
"_id": tmp["_id"],
|
|
"_id": tmp["_id"],
|
|
@@ -218,45 +217,45 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
},
|
|
},
|
|
})
|
|
})
|
|
if len(updateExtract) > 500 {
|
|
if len(updateExtract) > 500 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
updateExtract = [][]map[string]interface{}{}
|
|
updateExtract = [][]map[string]interface{}{}
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
b, source, reason := DM.check(info)
|
|
b, source, reason := DM.check(info)
|
|
if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
if b { //有重复,生成更新语句,更新抽取和更新招标
|
|
repeateN++
|
|
repeateN++
|
|
- var is_replace = false
|
|
|
|
- var mergeArr = []int64{} //更改合并数组记录
|
|
|
|
- var newData = &Info{} //更换新的数据池数据
|
|
|
|
|
|
+ var is_replace = false
|
|
|
|
+ var mergeArr = []int64{} //更改合并数组记录
|
|
|
|
+ var newData = &Info{} //更换新的数据池数据
|
|
var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
- var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
|
- if idtype == "1" { //先临时决定一个id
|
|
|
|
|
|
+ var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
|
+ if idtype == "1" { //先临时决定一个id
|
|
repeat_idMap["_id"] = info.id
|
|
repeat_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = source.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
}
|
|
}
|
|
- repeat_id:=source.id
|
|
|
|
|
|
+ repeat_id := source.id
|
|
//以下合并相关
|
|
//以下合并相关
|
|
if isMerger {
|
|
if isMerger {
|
|
basic_bool := basicDataScore(source, info)
|
|
basic_bool := basicDataScore(source, info)
|
|
if basic_bool {
|
|
if basic_bool {
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
- newData, mergeArr,is_replace = mergeDataFields(source, info)
|
|
|
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
//对比数据打重复标签的id,原始数据id的记录
|
|
//对比数据打重复标签的id,原始数据id的记录
|
|
if idtype == "1" {
|
|
if idtype == "1" {
|
|
repeat_idMap["_id"] = info.id
|
|
repeat_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = source.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
}
|
|
}
|
|
repeat_id = source.id
|
|
repeat_id = source.id
|
|
} else {
|
|
} else {
|
|
//已对比数据为标准 ,数据池的数据打判重标签
|
|
//已对比数据为标准 ,数据池的数据打判重标签
|
|
- newData, mergeArr,is_replace = mergeDataFields(info, source)
|
|
|
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
|
|
|
|
//原始数据打重复标签的id, 对比数据id的记录
|
|
//原始数据打重复标签的id, 对比数据id的记录
|
|
@@ -264,19 +263,18 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
repeat_idMap["_id"] = source.id
|
|
repeat_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = info.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
}
|
|
}
|
|
repeat_id = info.id
|
|
repeat_id = info.id
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- merge_map := make(map[string]interface{},0)
|
|
|
|
- if is_replace {//有过合并-更新数据
|
|
|
|
|
|
+ merge_map := make(map[string]interface{}, 0)
|
|
|
|
+ if is_replace { //有过合并-更新数据
|
|
|
|
|
|
merge_map = map[string]interface{}{
|
|
merge_map = map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
- "merge":newData.mergemap,
|
|
|
|
|
|
+ "merge": newData.mergemap,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
@@ -304,11 +302,11 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
} else if value == 9 {
|
|
} else if value == 9 {
|
|
merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
- }else if value == 10 {
|
|
|
|
|
|
+ } else if value == 10 {
|
|
merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
- }else if value == 11 {
|
|
|
|
|
|
+ } else if value == 11 {
|
|
merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//模板数据更新
|
|
//模板数据更新
|
|
@@ -319,15 +317,14 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
//重复数据打标签
|
|
//重复数据打标签
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
repeat_idMap,
|
|
repeat_idMap,
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
- "repeat": 1,
|
|
|
|
|
|
+ "repeat": 1,
|
|
"repeat_reason": reason,
|
|
"repeat_reason": reason,
|
|
- "repeat_id":repeat_id,
|
|
|
|
|
|
+ "repeat_id": repeat_id,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
})
|
|
@@ -336,14 +333,14 @@ func task(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
}(tmp)
|
|
}(tmp)
|
|
if len(updateExtract) > 500 {
|
|
if len(updateExtract) > 500 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
updateExtract = [][]map[string]interface{}{}
|
|
updateExtract = [][]map[string]interface{}{}
|
|
}
|
|
}
|
|
tmp = make(map[string]interface{})
|
|
tmp = make(map[string]interface{})
|
|
}
|
|
}
|
|
wg.Wait()
|
|
wg.Wait()
|
|
if len(updateExtract) > 0 {
|
|
if len(updateExtract) > 0 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
}
|
|
}
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
@@ -390,8 +387,8 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
} else {
|
|
} else {
|
|
q = map[string]interface{}{
|
|
q = map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -400,7 +397,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
minTime, maxTime := int64(0), int64(0)
|
|
minTime, maxTime := int64(0), int64(0)
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
//取出最大最小时间
|
|
//取出最大最小时间
|
|
- if minTime == 0 || maxTime == 0 &&util.Int64All(tmp["publishtime"])!=0{
|
|
|
|
|
|
+ if minTime == 0 || maxTime == 0 && util.Int64All(tmp["publishtime"]) != 0 {
|
|
minTime = util.Int64All(tmp["publishtime"])
|
|
minTime = util.Int64All(tmp["publishtime"])
|
|
maxTime = util.Int64All(tmp["publishtime"])
|
|
maxTime = util.Int64All(tmp["publishtime"])
|
|
} else {
|
|
} else {
|
|
@@ -414,19 +411,19 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//时间不正确时
|
|
//时间不正确时
|
|
- if minTime==0&&maxTime==0 {
|
|
|
|
|
|
+ if minTime == 0 && maxTime == 0 {
|
|
log.Println("段数据区间 publishtime不符合")
|
|
log.Println("段数据区间 publishtime不符合")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
|
|
fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
|
|
- gtid,lteid:= util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
|
|
|
|
- fmt.Println(gtid,lteid)
|
|
|
|
- HM = NewHistorymap(gtid,lteid, minTime, maxTime)
|
|
|
|
|
|
+ gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
|
|
|
|
+ fmt.Println(gtid, lteid)
|
|
|
|
+ HM = NewHistorymap(gtid, lteid, minTime, maxTime)
|
|
fmt.Println("开始历史数据判重")
|
|
fmt.Println("开始历史数据判重")
|
|
|
|
|
|
defer util.Catch()
|
|
defer util.Catch()
|
|
//区间id
|
|
//区间id
|
|
- sess_history:= mgo.GetMgoConn()
|
|
|
|
|
|
+ sess_history := mgo.GetMgoConn()
|
|
defer mgo.DestoryMongoConn(sess_history)
|
|
defer mgo.DestoryMongoConn(sess_history)
|
|
var q_history map[string]interface{}
|
|
var q_history map[string]interface{}
|
|
if idtype == "1" {
|
|
if idtype == "1" {
|
|
@@ -439,15 +436,15 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
} else {
|
|
} else {
|
|
q_history = map[string]interface{}{
|
|
q_history = map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
"_id": map[string]interface{}{
|
|
- "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
- "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
|
|
|
+ "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
|
+ "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.Println(mgo.DbName,extract,q_history)
|
|
|
|
|
|
+ log.Println(mgo.DbName, extract, q_history)
|
|
it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
|
|
it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
|
|
updateExtract := [][]map[string]interface{}{}
|
|
updateExtract := [][]map[string]interface{}{}
|
|
- log.Println("线程数:",threadNum)
|
|
|
|
|
|
+ log.Println("线程数:", threadNum)
|
|
pool := make(chan bool, threadNum)
|
|
pool := make(chan bool, threadNum)
|
|
wg := &sync.WaitGroup{}
|
|
wg := &sync.WaitGroup{}
|
|
//mapLock := &sync.Mutex{}
|
|
//mapLock := &sync.Mutex{}
|
|
@@ -464,7 +461,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
wg.Done()
|
|
wg.Done()
|
|
}()
|
|
}()
|
|
info := NewInfo(tmp)
|
|
info := NewInfo(tmp)
|
|
- if invalidData(info.buyer, info.projectname, info.projectcode,info.contractnumber) {
|
|
|
|
|
|
+ if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
"_id": tmp["_id"],
|
|
"_id": tmp["_id"],
|
|
@@ -476,7 +473,7 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
},
|
|
},
|
|
})
|
|
})
|
|
if len(updateExtract) > 500 {
|
|
if len(updateExtract) > 500 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
updateExtract = [][]map[string]interface{}{}
|
|
updateExtract = [][]map[string]interface{}{}
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
@@ -499,38 +496,38 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
})
|
|
})
|
|
} else {
|
|
} else {
|
|
repeateN++
|
|
repeateN++
|
|
- var is_replace = false
|
|
|
|
- var mergeArr = []int64{} //更改合并数组记录
|
|
|
|
- var newData = &Info{} //更换新的数据池数据
|
|
|
|
|
|
+ var is_replace = false
|
|
|
|
+ var mergeArr = []int64{} //更改合并数组记录
|
|
|
|
+ var newData = &Info{} //更换新的数据池数据
|
|
var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
var repeat_idMap = map[string]interface{}{} //记录判重的
|
|
- var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
|
- if idtype == "1" { //先临时决定一个id
|
|
|
|
|
|
+ var merge_idMap = map[string]interface{}{} //记录合并的
|
|
|
|
+ if idtype == "1" { //先临时决定一个id
|
|
repeat_idMap["_id"] = info.id
|
|
repeat_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = source.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
}
|
|
}
|
|
- repeat_id:=source.id
|
|
|
|
|
|
+ repeat_id := source.id
|
|
//以下合并相关
|
|
//以下合并相关
|
|
if isMerger {
|
|
if isMerger {
|
|
basic_bool := basicDataScore(source, info)
|
|
basic_bool := basicDataScore(source, info)
|
|
if basic_bool {
|
|
if basic_bool {
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
//已原始数据为标准 - 对比数据打判重标签-
|
|
- newData, mergeArr,is_replace = mergeDataFields(source, info)
|
|
|
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(source, info)
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
//对比数据打重复标签的id,原始数据id的记录
|
|
//对比数据打重复标签的id,原始数据id的记录
|
|
if idtype == "1" {
|
|
if idtype == "1" {
|
|
repeat_idMap["_id"] = info.id
|
|
repeat_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = source.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(info.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(source.id)
|
|
}
|
|
}
|
|
repeat_id = source.id
|
|
repeat_id = source.id
|
|
} else {
|
|
} else {
|
|
//已对比数据为标准 ,数据池的数据打判重标签
|
|
//已对比数据为标准 ,数据池的数据打判重标签
|
|
- newData, mergeArr,is_replace = mergeDataFields(info, source)
|
|
|
|
|
|
+ newData, mergeArr, is_replace = mergeDataFields(info, source)
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
DM.replaceSourceData(newData, source.id) //替换
|
|
|
|
|
|
//原始数据打重复标签的id, 对比数据id的记录
|
|
//原始数据打重复标签的id, 对比数据id的记录
|
|
@@ -538,19 +535,18 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
repeat_idMap["_id"] = source.id
|
|
repeat_idMap["_id"] = source.id
|
|
merge_idMap["_id"] = info.id
|
|
merge_idMap["_id"] = info.id
|
|
} else {
|
|
} else {
|
|
- repeat_idMap["_id"] = util.StringTOBsonId(source.id)
|
|
|
|
- merge_idMap["_id"] = util.StringTOBsonId(info.id)
|
|
|
|
|
|
+ repeat_idMap["_id"] = StringTOBsonId(source.id)
|
|
|
|
+ merge_idMap["_id"] = StringTOBsonId(info.id)
|
|
}
|
|
}
|
|
repeat_id = info.id
|
|
repeat_id = info.id
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
- merge_map := make(map[string]interface{},0)
|
|
|
|
- if is_replace {//有过合并-更新数据
|
|
|
|
|
|
+ merge_map := make(map[string]interface{}, 0)
|
|
|
|
+ if is_replace { //有过合并-更新数据
|
|
|
|
|
|
merge_map = map[string]interface{}{
|
|
merge_map = map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
- "merge":newData.mergemap,
|
|
|
|
|
|
+ "merge": newData.mergemap,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
|
|
@@ -578,11 +574,11 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
|
|
} else if value == 9 {
|
|
} else if value == 9 {
|
|
merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
|
|
- }else if value == 10 {
|
|
|
|
|
|
+ } else if value == 10 {
|
|
merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
|
|
- }else if value == 11 {
|
|
|
|
|
|
+ } else if value == 11 {
|
|
merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
//模板数据更新
|
|
//模板数据更新
|
|
@@ -593,15 +589,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
//重复数据打标签
|
|
//重复数据打标签
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
updateExtract = append(updateExtract, []map[string]interface{}{
|
|
repeat_idMap,
|
|
repeat_idMap,
|
|
map[string]interface{}{
|
|
map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
"$set": map[string]interface{}{
|
|
- "repeat": 1,
|
|
|
|
|
|
+ "repeat": 1,
|
|
"repeat_reason": reason,
|
|
"repeat_reason": reason,
|
|
- "repeat_id":repeat_id,
|
|
|
|
|
|
+ "repeat_id": repeat_id,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
})
|
|
})
|
|
@@ -611,14 +606,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
}(tmp)
|
|
}(tmp)
|
|
if len(updateExtract) > 500 {
|
|
if len(updateExtract) > 500 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
updateExtract = [][]map[string]interface{}{}
|
|
updateExtract = [][]map[string]interface{}{}
|
|
}
|
|
}
|
|
tmp = make(map[string]interface{})
|
|
tmp = make(map[string]interface{})
|
|
}
|
|
}
|
|
wg.Wait()
|
|
wg.Wait()
|
|
if len(updateExtract) > 0 {
|
|
if len(updateExtract) > 0 {
|
|
- mgo.UpdateBulk(extract, updateExtract...)
|
|
|
|
|
|
+ mgo.UpSertBulk(extract, updateExtract...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
//mgo.UpdateBulk(bidding, updateBidding...)
|
|
}
|
|
}
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
|
|
@@ -647,14 +642,14 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
|
|
}
|
|
}
|
|
|
|
|
|
//合并字段-并更新merge字段的值
|
|
//合并字段-并更新merge字段的值
|
|
-func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
|
|
|
|
|
|
+func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
|
|
|
|
|
|
- merge_recordMap := make(map[string]interface{},0)
|
|
|
|
|
|
+ merge_recordMap := make(map[string]interface{}, 0)
|
|
mergeArr := make([]int64, 0)
|
|
mergeArr := make([]int64, 0)
|
|
//是否替换数据了-记录原始的数据
|
|
//是否替换数据了-记录原始的数据
|
|
- is_replace :=false
|
|
|
|
|
|
+ is_replace := false
|
|
//1、城市
|
|
//1、城市
|
|
- if source.area == "" || source.area == "全国"{
|
|
|
|
|
|
+ if source.area == "" || source.area == "全国" {
|
|
//为空
|
|
//为空
|
|
if info.area != "全国" && info.area != "" {
|
|
if info.area != "全国" && info.area != "" {
|
|
merge_recordMap["area"] = source.area
|
|
merge_recordMap["area"] = source.area
|
|
@@ -664,7 +659,7 @@ func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
|
|
mergeArr = append(mergeArr, 1)
|
|
mergeArr = append(mergeArr, 1)
|
|
is_replace = true
|
|
is_replace = true
|
|
}
|
|
}
|
|
- }else {
|
|
|
|
|
|
+ } else {
|
|
//不为空-查看站点相关-有值必替换
|
|
//不为空-查看站点相关-有值必替换
|
|
if source.is_site {
|
|
if source.is_site {
|
|
//是站点替换的城市
|
|
//是站点替换的城市
|
|
@@ -749,21 +744,17 @@ func mergeDataFields(source *Info, info *Info) (*Info, []int64,bool) {
|
|
is_replace = true
|
|
is_replace = true
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- if is_replace {//有过替换更新
|
|
|
|
|
|
+ if is_replace { //有过替换更新
|
|
//总次数+1
|
|
//总次数+1
|
|
- source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"])+1
|
|
|
|
|
|
+ source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
|
|
merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
|
|
merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
|
|
//和哪一个数据id进行非空替换的-记录
|
|
//和哪一个数据id进行非空替换的-记录
|
|
- key:=info.id
|
|
|
|
|
|
+ key := info.id
|
|
source.mergemap[key] = merge_recordMap
|
|
source.mergemap[key] = merge_recordMap
|
|
}
|
|
}
|
|
|
|
|
|
-
|
|
|
|
//以上合并过于简单,待进一步优化
|
|
//以上合并过于简单,待进一步优化
|
|
- return source, mergeArr,is_replace
|
|
|
|
|
|
+ return source, mergeArr, is_replace
|
|
}
|
|
}
|
|
|
|
|
|
//权重评估
|
|
//权重评估
|
|
@@ -867,7 +858,7 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
if v.buyer != "" {
|
|
if v.buyer != "" {
|
|
m++
|
|
m++
|
|
}
|
|
}
|
|
- if v.projectcode != ""||v.contractnumber != "" {
|
|
|
|
|
|
+ if v.projectcode != "" || v.contractnumber != "" {
|
|
m++
|
|
m++
|
|
}
|
|
}
|
|
if v.budget != 0 {
|
|
if v.budget != 0 {
|
|
@@ -898,7 +889,7 @@ func basicDataScore(v *Info, info *Info) bool {
|
|
if info.buyer != "" {
|
|
if info.buyer != "" {
|
|
n++
|
|
n++
|
|
}
|
|
}
|
|
- if info.projectcode != "" || info.contractnumber != ""{
|
|
|
|
|
|
+ if info.projectcode != "" || info.contractnumber != "" {
|
|
n++
|
|
n++
|
|
}
|
|
}
|
|
if info.budget != 0 {
|
|
if info.budget != 0 {
|
|
@@ -951,7 +942,7 @@ func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
|
|
if d4 != "" {
|
|
if d4 != "" {
|
|
n++
|
|
n++
|
|
}
|
|
}
|
|
- if n == 0 {
|
|
|
|
|
|
+ if n == 0 {
|
|
return true
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
return false
|