12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261 |
- package main
- /**
- 招标信息判重
- **/
- import (
- "encoding/json"
- "flag"
- "fmt"
- "log"
- mu "mfw/util"
- "net"
- "os"
- "qfw/util"
- "regexp"
- "sync"
- "time"
- "github.com/cron"
- "gopkg.in/mgo.v2/bson"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- mconf map[string]interface{} //mongodb配置信息
- mgo *MongodbSim //mongodb操作对象
- extract string
- extract_back string
- udpclient mu.UdpClient //udp对象
- nextNode []map[string]interface{} //下节点数组
- dupdays = 5 //初始化判重范围
- DM *datamap //
- HM *historymap //判重数据
- lastid = ""
- //正则筛选相关
- FilterRegTitle = regexp.MustCompile("^_$")
- FilterRegTitle_0 = regexp.MustCompile("^_$")
- FilterRegTitle_1 = regexp.MustCompile("^_$")
- FilterRegTitle_2 = regexp.MustCompile("^_$")
- isMerger bool //是否合并
- Is_Sort bool //是否排序
- threadNum int //线程数量
- SiteMap map[string]map[string]interface{} //站点map
- LowHeavy bool //低质量数据判重
- TimingTask bool //是否定时任务
- timingSpanDay int64 //时间跨度
- timingPubScope int64 //发布时间周期
- sid, eid string //测试人员判重使用
- )
- func init() {
- flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
- flag.StringVar(&sid, "sid", "", "开始id")
- flag.StringVar(&eid, "eid", "", "结束id")
- flag.Parse()
- //172.17.145.163:27080
- util.ReadConfig(&Sysconfig)
- nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- mconf = Sysconfig["mongodb"].(map[string]interface{})
- mgo = &MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: util.IntAllDef(mconf["pool"], 10),
- }
- mgo.InitPool()
- extract = mconf["extract"].(string)
- extract_back = mconf["extract_back"].(string)
- dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
- //加载数据
- DM = NewDatamap(dupdays, lastid)
- FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
- FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
- FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
- FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
- isMerger = Sysconfig["isMerger"].(bool)
- Is_Sort = Sysconfig["isSort"].(bool)
- threadNum = util.IntAllDef(Sysconfig["threads"], 1)
- LowHeavy = Sysconfig["lowHeavy"].(bool)
- TimingTask = Sysconfig["timingTask"].(bool)
- timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
- timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
- //站点配置
- site := mconf["site"].(map[string]interface{})
- SiteMap = make(map[string]map[string]interface{}, 0)
- start := int(time.Now().Unix())
- sess_site := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess_site)
- res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
- for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
- data_map := map[string]interface{}{
- "area": util.ObjToString(site_dict["area"]),
- "city": util.ObjToString(site_dict["city"]),
- "district": util.ObjToString(site_dict["district"]),
- "sitetype": util.ObjToString(site_dict["sitetype"]),
- "level": util.ObjToString(site_dict["level"]),
- }
- SiteMap[util.ObjToString(site_dict["site"])] = data_map
- }
- log.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
- }
- func main() {
- go checkMapJob()
- updport := Sysconfig["udpport"].(string)
- udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
- if TimingTask {
- go timedTaskDay()
- } else {
- udpclient.Listen(processUdpMsg)
- log.Println("Udp服务监听", updport)
- }
- time.Sleep(99999 * time.Hour)
- }
- //测试组人员使用
- func mainT() {
- /*
- ObjectId("5da3f31aa5cb26b9b798d3aa")
- ObjectId("5da418c4a5cb26b9b7e3e9a6")
- ObjectId("5da3f2c5a5cb26b9b79847fc")
- ObjectId("5db2735ba5cb26b9b7c99c6f")
- */
- if TimingTask {
- log.Println("定时任务测试开始")
- go timedTaskDay()
- time.Sleep(99999 * time.Hour)
- } else {
- sid = "5c2c10fda5cb26b9b75e6f7f"
- eid = "5e976e4a50b5ea296ef376b9"
- log.Println("正常判重测试开始")
- log.Println(sid, "---", eid)
- mapinfo := map[string]interface{}{}
- if sid == "" || eid == "" {
- log.Println("sid,eid参数不能为空")
- os.Exit(0)
- }
- mapinfo["gtid"] = sid
- mapinfo["lteid"] = eid
- mapinfo["stop"] = "true"
- task([]byte{}, mapinfo)
- time.Sleep(10 * time.Second)
- }
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- fmt.Println("接受的段数据")
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- //从表中开始处理
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Println("err:", err, "mapInfo:", mapInfo)
- if err != nil {
- udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
- } else if mapInfo != nil {
- taskType := util.ObjToString(mapInfo["stype"])
- if taskType == "historyTask" {
- //历史更新流程
- go historyTask(data, mapInfo)
- } else if taskType == "normalTask" {
- //判重流程
- go task(data, mapInfo)
- } else {
- //其他
- go task(data, mapInfo)
- }
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Println("ok:", ok)
- udptaskmap.Delete(ok)
- }
- }
- }
- //开始判重程序
- func task(data []byte, mapInfo map[string]interface{}) {
- log.Println("开始数据判重")
- defer util.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println(mgo.DbName, extract, q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- //是否排序
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
- if Is_Sort {
- log.Println("排序:publishtime")
- it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- }
- //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
- updateExtract := [][]map[string]interface{}{}
- log.Println("线程数:", threadNum)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- n, repeateN := 0, 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- if n%10000 == 0 {
- log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
- }
- if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
- tmp = make(map[string]interface{})
- repeateN++
- continue
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- info := NewInfo(tmp)
- if !LowHeavy { //是否进行低质量数据判重
- if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": -1, //无效数据标签
- },
- },
- })
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- return
- }
- }
- b, source, reason := DM.check(info)
- if b { //有重复,生成更新语句,更新抽取和更新招标
- repeateN++
- var is_replace = false
- var mergeArr = []int64{} //更改合并数组记录
- var newData = &Info{} //更换新的数据池数据
- var repeat_idMap = map[string]interface{}{} //记录判重的
- var merge_idMap = map[string]interface{}{} //记录合并的
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id := source.id //初始化一个数据
- if isMerger { //合并相关
- basic_bool := basicDataScore(source, info)
- if basic_bool {
- //已原始数据为标准 - 对比数据打判重标签-
- newData, mergeArr, is_replace = mergeDataFields(source, info)
- DM.replaceSourceData(newData, source.id) //替换
- //对比数据打重复标签的id,原始数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = source.id
- } else {
- //已对比数据为标准 ,数据池的数据打判重标签
- newData, mergeArr, is_replace = mergeDataFields(info, source)
- DM.replaceSourceData(newData, source.id) //替换
- //原始数据打重复标签的id, 对比数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- merge_idMap["_id"] = StringTOBsonId(info.id)
- repeat_id = info.id
- }
- merge_map := make(map[string]interface{}, 0)
- if is_replace { //有过合并-更新数据
- merge_map = map[string]interface{}{
- "$set": map[string]interface{}{
- "merge": newData.mergemap,
- },
- }
- //更新合并后的数据
- for _, value := range mergeArr {
- if value == 0 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 1 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 2 {
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
- } else if value == 3 {
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
- } else if value == 4 {
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
- } else if value == 5 {
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
- } else if value == 6 {
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
- } else if value == 7 {
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
- } else if value == 8 {
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
- } else if value == 9 {
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
- } else if value == 10 {
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
- } else if value == 11 {
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
- } else {
- }
- }
- //模板数据更新
- updateExtract = append(updateExtract, []map[string]interface{}{
- merge_idMap,
- merge_map,
- })
- }
- } else { //高质量数据
- basic_bool := basicDataScore(source, info)
- if !basic_bool {
- DM.replaceSourceData(info, source.id) //替换
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = info.id
- }
- }
- //重复数据打标签
- updateExtract = append(updateExtract, []map[string]interface{}{
- repeat_idMap,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": repeat_id,
- },
- },
- })
- }
- }(tmp)
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- if len(updateExtract) > 0 {
- mgo.UpSertBulk(extract, updateExtract...)
- }
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
- //任务完成,开始发送广播通知下面节点
- if n > repeateN && mapInfo["stop"] == nil {
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
- //支持历史更新
- func historyTask(data []byte, mapInfo map[string]interface{}) {
- fmt.Println("开始取历史时间段")
- defer util.Catch()
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
- minTime, maxTime := int64(0), int64(0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- //取出最大最小时间
- info_time := tmp["comeintime"]
- if Is_Sort {
- info_time = tmp["publishtime"]
- }
- if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
- minTime = util.Int64All(info_time)
- maxTime = util.Int64All(info_time)
- } else {
- t := util.Int64All(info_time)
- if t < minTime && t != 0 {
- minTime = t
- }
- if t > maxTime && t != 0 {
- maxTime = t
- }
- }
- }
- //时间不正确时
- if minTime == 0 && maxTime == 0 {
- log.Println("段数据区间 不符合")
- return
- }
- fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
- gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
- fmt.Println(gtid, lteid)
- HM = NewHistorymap(gtid, lteid, minTime, maxTime)
- fmt.Println("开始历史数据判重")
- defer util.Catch()
- //区间id
- sess_history := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess_history)
- q_history := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println(mgo.DbName, extract, q_history)
- //是否排序
- it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
- if Is_Sort {
- it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
- }
- updateExtract := [][]map[string]interface{}{}
- log.Println("线程数:", threadNum)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- n, repeateN := 0, 0
- for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
- if n%10000 == 0 {
- log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
- }
- if util.IntAll(tmp["dataging"]) == 1 {
- tmp = make(map[string]interface{})
- continue
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- info := NewInfo(tmp)
- if !LowHeavy { //是否进行低质量数据判重
- if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": -1, //无效数据标签
- },
- },
- })
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- return
- }
- }
- b, source, reason := HM.checkHistory(info)
- if b { //有重复,生成更新语句,更新抽取和更新招标
- if reason == "未判重记录" {
- fmt.Println("未判重记录")
- //把info的数据判重的标签更换,并新增字段
- HM.replaceSourceData(info, info.id) //替换即添加
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 0,
- "repeatid": -2,
- },
- },
- })
- } else {
- repeateN++
- var is_replace = false
- var mergeArr = []int64{} //更改合并数组记录
- var newData = &Info{} //更换新的数据池数据
- var repeat_idMap = map[string]interface{}{} //记录判重的
- var merge_idMap = map[string]interface{}{} //记录合并的
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id := source.id
- //以下合并相关
- if isMerger {
- basic_bool := basicDataScore(source, info)
- if basic_bool {
- //已原始数据为标准 - 对比数据打判重标签-
- newData, mergeArr, is_replace = mergeDataFields(source, info)
- HM.replaceSourceData(newData, source.id) //替换
- //对比数据打重复标签的id,原始数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = source.id
- } else {
- //已对比数据为标准 ,数据池的数据打判重标签
- newData, mergeArr, is_replace = mergeDataFields(info, source)
- HM.replaceSourceData(newData, source.id) //替换
- //原始数据打重复标签的id, 对比数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- merge_idMap["_id"] = StringTOBsonId(info.id)
- repeat_id = info.id
- }
- merge_map := make(map[string]interface{}, 0)
- if is_replace { //有过合并-更新数据
- merge_map = map[string]interface{}{
- "$set": map[string]interface{}{
- "merge": newData.mergemap,
- },
- }
- //更新合并后的数据
- for _, value := range mergeArr {
- if value == 0 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 1 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 2 {
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
- } else if value == 3 {
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
- } else if value == 4 {
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
- } else if value == 5 {
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
- } else if value == 6 {
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
- } else if value == 7 {
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
- } else if value == 8 {
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
- } else if value == 9 {
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
- } else if value == 10 {
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
- } else if value == 11 {
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
- } else {
- }
- }
- //模板数据更新
- updateExtract = append(updateExtract, []map[string]interface{}{
- merge_idMap,
- merge_map,
- })
- }
- } else { //高质量数据
- basic_bool := basicDataScore(source, info)
- if !basic_bool {
- HM.replaceSourceData(info, source.id) //替换
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = info.id
- }
- }
- //重复数据打标签
- updateExtract = append(updateExtract, []map[string]interface{}{
- repeat_idMap,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": repeat_id,
- },
- },
- })
- }
- }
- }(tmp)
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- if len(updateExtract) > 0 {
- mgo.UpSertBulk(extract, updateExtract...)
- //mgo.UpdateBulk(bidding, updateBidding...)
- }
- log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
- //任务完成,开始发送广播通知下面节点
- if n > repeateN && mapInfo["stop"] == nil {
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
- //定时任务
- func timedTaskDay() {
- c := cron.New()
- c.AddFunc("0 0 1 * * ?", func() { movedata() }) //每天凌晨1点执行一次
- c.AddFunc("0 0 2 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
- c.Start()
- timedTaskOnce()
- }
- func timedTaskOnce() {
- log.Println("开始一次定时任务")
- defer util.Catch()
- now := time.Now()
- preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
- curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
- task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
- task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
- //发布时间间隔时间 半年
- //测试数据 6点每个间隔6个月
- //task_sid = "5e20965785a9271abf0ad6bd"
- //task_eid = "5e20968d85a9271abf0ad6c2"
- //between_time := int64(1565801997)
- //测试数据 180个点 每个隔1天
- //task_sid = "5e208f9b50b5ea296eccbb8a"
- //task_eid = "5e20968d85a9271abf0ad6c2"
- //between_time := int64(1563641997)
- between_time := curTime.Unix() - (86400 * timingPubScope)
- lasttime := int64(0)
- log.Println(task_sid, task_eid, curTime.Unix(), between_time)
- //区间id
- q_start := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": StringTOBsonId(task_sid),
- "$lte": StringTOBsonId(task_eid),
- },
- }
- sess_start := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess_start)
- it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
- startNum := 0
- for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
- if startNum%10000 == 0 {
- log.Println("正序遍历:", startNum)
- }
- //取-符合-发布时间半年内的数据
- if util.IntAll(tmp_start["dataging"]) == 1 {
- pubtime := util.Int64All(tmp_start["publishtime"])
- if pubtime > 0 && pubtime >= between_time {
- lasttime = pubtime
- log.Println("找到第一条符合条件的数据")
- break
- }
- }
- }
- log.Println("... ...", lasttime)
- if lasttime <= 0 {
- log.Println("没找到dataging==1的数据")
- return
- }
- //构建第一条需要判重的数据 (数据池)
- log.Println("开始构建第一条需要判重的数据 ---(数据池)")
- DM = TimedTaskDatamap(dupdays, lasttime)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gte": StringTOBsonId(task_sid),
- "$lte": StringTOBsonId(task_eid),
- },
- }
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- updateExtract := [][]map[string]interface{}{}
- log.Println("线程数只能为1")
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- n, repeateN := 0, 0
- pre_publishtime := int64(0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
- if n%10000 == 0 {
- log.Println("定时任务判重当前数量current:", n, tmp["_id"], "repeateN:", repeateN)
- }
- //log.Println("当前测试重复数量:",repeateN)
- if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
- tmp = make(map[string]interface{})
- continue
- }
- if util.IntAll(tmp["dataging"]) != 1 {
- tmp = make(map[string]interface{})
- continue
- }
- pool <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //log.Println("上个时间:",pre_publishtime,"当前时间--",util.Int64All(tmp["publishtime"]))
- if pre_publishtime == 0 {
- pre_publishtime = util.Int64All(tmp["publishtime"])
- } else {
- //时间跨度是否大于X天
- if (util.Int64All(tmp["publishtime"]) - pre_publishtime) >= (86400 * timingSpanDay) {
- //重新构建数据池
- //log.Println("超过跨度-重新构建:",util.Int64All(tmp["publishtime"]),"---",pre_publishtime)
- pre_publishtime = util.Int64All(tmp["publishtime"])
- DM = TimedTaskDatamap(dupdays, pre_publishtime)
- }
- }
- info := NewInfo(tmp)
- if !LowHeavy { //是否进行低质量数据判重
- if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
- log.Println("无效数据")
- updateExtract = append(updateExtract, []map[string]interface{}{
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": -1, //无效数据标签
- "dataging": 0,
- },
- },
- })
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- return
- }
- }
- b, source, reason := DM.check(info)
- //log.Println("判重结果", b, reason)
- if b { //有重复,生成更新语句,更新抽取和更新招标
- repeateN++
- var is_replace = false
- var mergeArr = []int64{} //更改合并数组记录
- var newData = &Info{} //更换新的数据池数据
- var repeat_idMap = map[string]interface{}{} //记录判重的
- var merge_idMap = map[string]interface{}{} //记录合并的
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id := source.id //初始化一个数据
- if isMerger { //合并相关
- basic_bool := basicDataScore(source, info)
- if basic_bool {
- //已原始数据为标准 - 对比数据打判重标签-
- newData, mergeArr, is_replace = mergeDataFields(source, info)
- DM.replaceSourceData(newData, source.id) //替换
- //对比数据打重复标签的id,原始数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(info.id)
- merge_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = source.id
- } else {
- //已对比数据为标准 ,数据池的数据打判重标签
- newData, mergeArr, is_replace = mergeDataFields(info, source)
- DM.replaceSourceData(newData, source.id) //替换
- //原始数据打重复标签的id, 对比数据id的记录
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- merge_idMap["_id"] = StringTOBsonId(info.id)
- repeat_id = info.id
- }
- merge_map := make(map[string]interface{}, 0)
- if is_replace { //有过合并-更新数据
- merge_map = map[string]interface{}{
- "$set": map[string]interface{}{
- "merge": newData.mergemap,
- "dataging": 0,
- },
- }
- //更新合并后的数据
- for _, value := range mergeArr {
- if value == 0 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 1 {
- merge_map["$set"].(map[string]interface{})["area"] = newData.area
- merge_map["$set"].(map[string]interface{})["city"] = newData.city
- } else if value == 2 {
- merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
- } else if value == 3 {
- merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
- } else if value == 4 {
- merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
- } else if value == 5 {
- merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
- } else if value == 6 {
- merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
- } else if value == 7 {
- merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
- } else if value == 8 {
- merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
- } else if value == 9 {
- merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
- } else if value == 10 {
- merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
- } else if value == 11 {
- merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
- } else {
- }
- }
- //模板数据更新
- updateExtract = append(updateExtract, []map[string]interface{}{
- merge_idMap,
- merge_map,
- })
- }
- } else { //高质量数据
- basic_bool := basicDataScore(source, info)
- if !basic_bool {
- DM.replaceSourceData(info, source.id) //替换
- repeat_idMap["_id"] = StringTOBsonId(source.id)
- repeat_id = info.id
- }
- }
- //重复数据打标签
- updateExtract = append(updateExtract, []map[string]interface{}{
- repeat_idMap,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": repeat_id,
- "dataging": 0,
- },
- },
- })
- }
- }(tmp)
- if len(updateExtract) > 500 {
- mgo.UpSertBulk(extract, updateExtract...)
- updateExtract = [][]map[string]interface{}{}
- }
- tmp = make(map[string]interface{})
- }
- wg.Wait()
- if len(updateExtract) > 0 {
- mgo.UpSertBulk(extract, updateExtract...)
- }
- log.Println("this timeTask over.", n, "repeateN:", repeateN)
- //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
- if n > repeateN {
- for _, to := range nextNode {
- next_sid := util.BsonIdToSId(task_sid)
- next_eid := util.BsonIdToSId(task_eid)
- key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": next_sid,
- "lteid": next_eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- }
- //合并字段-并更新merge字段的值
- func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
- merge_recordMap := make(map[string]interface{}, 0)
- mergeArr := make([]int64, 0)
- //是否替换数据了-记录原始的数据
- is_replace := false
- //1、城市
- if source.area == "" || source.area == "全国" {
- //为空
- if info.area != "全国" && info.area != "" {
- merge_recordMap["area"] = source.area
- merge_recordMap["city"] = source.city
- source.area = info.area
- source.city = info.city
- mergeArr = append(mergeArr, 1)
- is_replace = true
- }
- } else {
- //不为空-查看站点相关-有值必替换
- if source.is_site {
- //是站点替换的城市
- merge_recordMap["site_area"] = source.area
- merge_recordMap["site_city"] = source.city
- mergeArr = append(mergeArr, 0)
- is_replace = true
- source.is_site = false
- }
- }
- //2、项目名称
- if source.projectname == "" && info.projectname != "" {
- merge_recordMap["projectname"] = source.projectname
- source.projectname = info.projectname
- mergeArr = append(mergeArr, 2)
- is_replace = true
- }
- //3、项目编号
- if source.projectcode == "" && info.projectcode != "" {
- merge_recordMap["projectcode"] = source.projectcode
- source.projectcode = info.projectcode
- mergeArr = append(mergeArr, 3)
- is_replace = true
- }
- //4、采购单位
- if source.buyer == "" && info.buyer != "" {
- merge_recordMap["buyer"] = source.buyer
- source.buyer = info.buyer
- mergeArr = append(mergeArr, 4)
- is_replace = true
- }
- //5、预算
- if source.budget == 0 && info.budget != 0 {
- merge_recordMap["budget"] = source.budget
- source.budget = info.budget
- mergeArr = append(mergeArr, 5)
- is_replace = true
- }
- //6、中标单位
- if source.winner == "" && info.winner != "" {
- merge_recordMap["winner"] = source.winner
- source.winner = info.winner
- mergeArr = append(mergeArr, 6)
- is_replace = true
- }
- //7、中标金额
- if source.bidamount == 0 && info.bidamount != 0 {
- merge_recordMap["bidamount"] = source.bidamount
- source.bidamount = info.bidamount
- mergeArr = append(mergeArr, 7)
- is_replace = true
- }
- //8、开标时间-地点
- if source.bidopentime == 0 && info.bidopentime != 0 {
- merge_recordMap["bidopentime"] = source.bidopentime
- source.bidopentime = info.bidopentime
- mergeArr = append(mergeArr, 8)
- is_replace = true
- }
- //9、合同编号
- if source.contractnumber == "" && info.contractnumber != "" {
- merge_recordMap["contractnumber"] = source.contractnumber
- source.contractnumber = info.contractnumber
- mergeArr = append(mergeArr, 9)
- is_replace = true
- }
- //10、发布时间
- if source.publishtime == 0 && info.publishtime != 0 {
- merge_recordMap["publishtime"] = source.publishtime
- source.publishtime = info.publishtime
- mergeArr = append(mergeArr, 10)
- is_replace = true
- }
- //11、代理机构
- if source.agency == "" && info.agency != "" {
- merge_recordMap["agency"] = source.agency
- source.agency = info.agency
- mergeArr = append(mergeArr, 11)
- is_replace = true
- }
- if is_replace { //有过替换更新
- //总次数+1
- source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
- merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
- //和哪一个数据id进行非空替换的-记录
- key := info.id
- source.mergemap[key] = merge_recordMap
- }
- //待进一步优化
- return source, mergeArr, is_replace
- }
- //权重评估
- func basicDataScore(v *Info, info *Info) bool {
- /*
- 权重评估
- 网站优先级判定规则:
- 1、中央>省>市>县区
- 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
- */
- v_score, info_score := -1, -1
- dict_v := SiteMap[v.site]
- dict_info := SiteMap[info.site]
- //先判断level
- if dict_v != nil {
- v_level := util.ObjToString(dict_v["level"])
- if v_level == "中央" {
- v_score = 4
- } else if v_level == "省级" {
- v_score = 3
- } else if v_level == "市级" {
- v_score = 2
- } else if v_level == "县区" {
- v_score = 1
- } else if v_level == "" {
- } else {
- v_score = 0
- }
- }
- if dict_info != nil {
- info_level := util.ObjToString(dict_info["level"])
- if info_level == "中央" {
- info_score = 4
- } else if info_level == "省级" {
- info_score = 3
- } else if info_level == "市级" {
- info_score = 2
- } else if info_level == "县区" {
- info_score = 1
- } else if info_level == "" {
- } else {
- v_score = 0
- }
- }
- if v_score > info_score {
- return true
- }
- if v_score < info_score {
- return false
- }
- //判断sitetype
- if dict_v != nil {
- v_sitetype := util.ObjToString(dict_v["sitetype"])
- if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
- v_score = 4
- } else if v_sitetype == "公共资源" {
- v_score = 3
- } else if v_sitetype == "官方网站" {
- v_score = 2
- } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
- v_score = 1
- } else if v_sitetype == "" {
- } else {
- v_score = 0
- }
- }
- if dict_info != nil {
- info_sitetype := util.ObjToString(dict_info["sitetype"])
- if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
- info_score = 4
- } else if info_sitetype == "公共资源" {
- info_score = 3
- } else if info_sitetype == "官方网站" {
- info_score = 2
- } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
- info_score = 1
- } else if info_sitetype == "" {
- } else {
- info_score = 0
- }
- }
- if v_score > info_score {
- return true
- }
- if v_score < info_score {
- return false
- }
- //网站评估
- m, n := 0, 0
- if v.projectname != "" {
- m++
- }
- if v.buyer != "" {
- m++
- }
- if v.projectcode != "" || v.contractnumber != "" {
- m++
- }
- if v.budget != 0 {
- m++
- }
- if v.bidamount != 0 {
- m++
- }
- if v.winner != "" {
- m++
- }
- if v.bidopentime != 0 {
- m++
- }
- if v.bidopenaddress != "" {
- m++
- }
- if v.agency != "" {
- m = m + 2
- }
- if v.city != "" {
- m = m + 2
- }
- if info.projectname != "" {
- n++
- }
- if info.buyer != "" {
- n++
- }
- if info.projectcode != "" || info.contractnumber != "" {
- n++
- }
- if info.budget != 0 {
- n++
- }
- if info.bidamount != 0 {
- n++
- }
- if info.winner != "" {
- n++
- }
- if info.bidopentime != 0 {
- n++
- }
- if info.bidopenaddress != "" {
- n++
- }
- if info.agency != "" {
- n = n + 2
- }
- if info.city != "" {
- n = n + 2
- }
- if m > n {
- return true
- } else if m == n {
- if v.publishtime >= info.publishtime {
- return true
- } else {
- return false
- }
- } else {
- return false
- }
- }
// invalidData reports whether all four given strings are empty; such a
// record is treated as invalid data.
func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
	for _, field := range [...]string{d1, d2, d3, d4} {
		if field != "" {
			// At least one value is present, so the data is usable.
			return false
		}
	}
	return true
}
- //迁移数据dupdays+5之前的数据
- func movedata() {
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- year, month, day := time.Now().Date()
- q := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
- },
- }
- log.Println(q)
- it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
- index := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- mgo.Save(extract_back, tmp)
- tmp = map[string]interface{}{}
- if index%1000 == 0 {
- log.Println("index", index)
- }
- }
- log.Println("save to", extract_back, " ok index", index)
- delnum := mgo.Delete(extract, q)
- log.Println("remove from ", extract, delnum)
- }
|