main.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "sync"
  17. "time"
  18. "gopkg.in/mgo.v2/bson"
  19. )
// Package-level state shared by the de-duplication service. The values are
// assigned in init from the configuration file and the command-line flags.
var (
	Sysconfig    map[string]interface{}   // contents of the configuration file
	mconf        map[string]interface{}   // "mongodb" section of the configuration
	mgo          *MongodbSim              // MongoDB access object (connection pool)
	extract      string                   // collection holding the records to de-duplicate
	extract_back string                   // collection that movedata copies old records into
	udpclient    mu.UdpClient             // UDP client used to receive tasks and notify next nodes
	nextNode     []map[string]interface{} // downstream nodes to notify when a task finishes
	dupdays      = 5                      // initial de-duplication window; overwritten in init (presumably days — confirm)
	DM           *datamap                 // data pool that records are checked against (see DM.check)
	// Regular expressions used for filtering; the "^_$" values are placeholders
	// that init replaces with the patterns from the configuration.
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")
	isMerger         bool                              // whether duplicate records are merged
	Is_Sort          bool                              // whether task iterates sorted by publishtime instead of _id
	threadNum        int                               // number of concurrent workers used by task
	SiteMap          map[string]map[string]interface{} // site name -> site attributes (area, city, level, sitetype, weight, ...)
	LowHeavy         bool                              // when true, low-quality records are de-duplicated too (the invalidData filter is skipped)
	TimingTask       bool                              // whether the scheduled jobs are enabled
	timingSpanDay    int64                             // time span, in days, covered by one group in timedTaskOnce
	timingPubScope   int64                             // how many days back a publish time may lie to be processed by timedTaskOnce
	sid, eid, lastid string                            // id arguments used by testers for de-duplication runs
	IdType           bool                              // false (default): _id is an ObjectId; true: _id is a plain string
)
  46. func init() {
  47. return
  48. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  49. flag.StringVar(&sid, "sid", "", "开始id")
  50. flag.StringVar(&eid, "eid", "", "结束id")
  51. flag.Parse()
  52. //172.17.145.163:27080
  53. util.ReadConfig(&Sysconfig)
  54. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  55. mconf = Sysconfig["mongodb"].(map[string]interface{})
  56. mgo = &MongodbSim{
  57. MongodbAddr: mconf["addr"].(string),
  58. DbName: mconf["db"].(string),
  59. Size: util.IntAllDef(mconf["pool"], 10),
  60. }
  61. mgo.InitPool()
  62. extract = mconf["extract"].(string)
  63. extract_back = mconf["extract_back"].(string)
  64. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  65. //加载数据
  66. DM = NewDatamap(dupdays, lastid)
  67. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  68. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  69. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  70. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  71. isMerger = Sysconfig["isMerger"].(bool)
  72. Is_Sort = Sysconfig["isSort"].(bool)
  73. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  74. LowHeavy = Sysconfig["lowHeavy"].(bool)
  75. TimingTask = Sysconfig["timingTask"].(bool)
  76. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  77. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  78. //站点配置
  79. site := mconf["site"].(map[string]interface{})
  80. SiteMap = make(map[string]map[string]interface{}, 0)
  81. start := int(time.Now().Unix())
  82. sess_site := mgo.GetMgoConn()
  83. defer mgo.DestoryMongoConn(sess_site)
  84. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  85. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  86. data_map := map[string]interface{}{
  87. "area": util.ObjToString(site_dict["area"]),
  88. "city": util.ObjToString(site_dict["city"]),
  89. "district": util.ObjToString(site_dict["district"]),
  90. "sitetype": util.ObjToString(site_dict["sitetype"]),
  91. "level": util.ObjToString(site_dict["level"]),
  92. "weight": util.ObjToString(site_dict["weight"]),
  93. }
  94. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  95. }
  96. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  97. }
  98. func mainT() {
  99. go checkMapJob()
  100. updport := Sysconfig["udpport"].(string)
  101. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  102. udpclient.Listen(processUdpMsg)
  103. log.Println("Udp服务监听", updport)
  104. if TimingTask {
  105. go timedTaskDay()
  106. }
  107. time.Sleep(99999 * time.Hour)
  108. }
  109. //测试组人员使用
  110. func main() {
  111. if TimingTask {
  112. log.Println("定时任务测试开始")
  113. go timedTaskDay()
  114. time.Sleep(99999 * time.Hour)
  115. } else {
  116. //2019年8月1日-8月17日 712646
  117. IdType = true
  118. sid = "5d41607aa5cb26b9b734fe30"
  119. eid = "5eb172e1f2c1a7850bad1c39"
  120. log.Println("正常判重测试开始")
  121. log.Println(sid, "---", eid)
  122. mapinfo := map[string]interface{}{}
  123. if sid == "" || eid == "" {
  124. log.Println("sid,eid参数不能为空")
  125. os.Exit(0)
  126. }
  127. mapinfo["gtid"] = sid
  128. mapinfo["lteid"] = eid
  129. //mapinfo["stop"] = "true"
  130. task([]byte{}, mapinfo)
  131. time.Sleep(99999 * time.Second)
  132. }
  133. }
  134. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  135. fmt.Println("接受的段数据")
  136. switch act {
  137. case mu.OP_TYPE_DATA: //上个节点的数据
  138. //从表中开始处理
  139. var mapInfo map[string]interface{}
  140. err := json.Unmarshal(data, &mapInfo)
  141. log.Println("err:", err, "mapInfo:", mapInfo)
  142. if err != nil {
  143. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  144. } else if mapInfo != nil {
  145. taskType := util.ObjToString(mapInfo["stype"])
  146. if taskType == "normalTask" {
  147. //判重流程
  148. go task(data, mapInfo)
  149. } else {
  150. //其他
  151. go task(data, mapInfo)
  152. }
  153. key, _ := mapInfo["key"].(string)
  154. if key == "" {
  155. key = "udpok"
  156. }
  157. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  158. }
  159. case mu.OP_NOOP: //下个节点回应
  160. ok := string(data)
  161. if ok != "" {
  162. log.Println("ok:", ok)
  163. udptaskmap.Delete(ok)
  164. }
  165. }
  166. }
  167. //开始判重程序
  168. func task(data []byte, mapInfo map[string]interface{}) {
  169. log.Println("开始数据判重")
  170. defer util.Catch()
  171. //区间id
  172. q := map[string]interface{}{
  173. "_id": map[string]interface{}{
  174. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  175. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  176. },
  177. }
  178. if IdType {
  179. q = map[string]interface{}{
  180. "_id": map[string]interface{}{
  181. "$gt": mapInfo["gtid"].(string),
  182. "$lte": mapInfo["lteid"].(string),
  183. },
  184. }
  185. }
  186. log.Println(mgo.DbName, extract, q)
  187. sess := mgo.GetMgoConn()
  188. defer mgo.DestoryMongoConn(sess)
  189. //是否排序
  190. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
  191. if Is_Sort {
  192. log.Println("排序:publishtime")
  193. it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  194. }
  195. updateExtract := [][]map[string]interface{}{}
  196. log.Println("线程数:", threadNum)
  197. pool := make(chan bool, threadNum)
  198. wg := &sync.WaitGroup{}
  199. n, repeateN := 0, 0
  200. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  201. if n%10000 == 0 {
  202. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  203. }
  204. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  205. tmp = make(map[string]interface{})
  206. repeateN++
  207. continue
  208. }
  209. pool <- true
  210. wg.Add(1)
  211. go func(tmp map[string]interface{}) {
  212. defer func() {
  213. <-pool
  214. wg.Done()
  215. }()
  216. info := NewInfo(tmp)
  217. if !LowHeavy { //是否进行低质量数据判重
  218. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  219. updateExtract = append(updateExtract, []map[string]interface{}{
  220. map[string]interface{}{
  221. "_id": tmp["_id"],
  222. },
  223. map[string]interface{}{
  224. "$set": map[string]interface{}{
  225. "repeat": -1, //无效数据标签
  226. },
  227. },
  228. })
  229. if len(updateExtract) > 500 {
  230. mgo.UpSertBulk(extract, updateExtract...)
  231. updateExtract = [][]map[string]interface{}{}
  232. }
  233. return
  234. }
  235. }
  236. b, source, reason := DM.check(info)
  237. if b { //有重复,生成更新语句,更新抽取和更新招标
  238. repeateN++
  239. var is_replace = false
  240. var mergeArr = []int64{} //更改合并数组记录
  241. var newData = &Info{} //更换新的数据池数据
  242. var repeat_idMap = map[string]interface{}{} //记录判重的
  243. var merge_idMap = map[string]interface{}{} //记录合并的
  244. repeat_idMap["_id"] = StringTOBsonId(info.id)
  245. if IdType {
  246. repeat_idMap["_id"] = info.id
  247. }
  248. merge_idMap["_id"] = StringTOBsonId(source.id)
  249. repeat_id := source.id //初始化一个数据
  250. if isMerger { //合并相关
  251. basic_bool := basicDataScore(source, info)
  252. if basic_bool {
  253. //已原始数据为标准 - 对比数据打判重标签-
  254. newData, mergeArr, is_replace = mergeDataFields(source, info)
  255. DM.replaceSourceData(newData, source) //替换
  256. //对比数据打重复标签的id,原始数据id的记录
  257. repeat_idMap["_id"] = StringTOBsonId(info.id)
  258. merge_idMap["_id"] = StringTOBsonId(source.id)
  259. if IdType {
  260. repeat_idMap["_id"] = info.id
  261. merge_idMap["_id"] = source.id
  262. }
  263. repeat_id = source.id
  264. } else {
  265. //已对比数据为标准 ,数据池的数据打判重标签
  266. newData, mergeArr, is_replace = mergeDataFields(info, source)
  267. DM.replaceSourceData(newData, source) //替换
  268. //原始数据打重复标签的id, 对比数据id的记录
  269. repeat_idMap["_id"] = StringTOBsonId(source.id)
  270. merge_idMap["_id"] = StringTOBsonId(info.id)
  271. if IdType {
  272. repeat_idMap["_id"] = source.id
  273. merge_idMap["_id"] = info.id
  274. }
  275. repeat_id = info.id
  276. }
  277. merge_map := make(map[string]interface{}, 0)
  278. if is_replace { //有过合并-更新数据
  279. merge_map = map[string]interface{}{
  280. "$set": map[string]interface{}{
  281. "merge": newData.mergemap,
  282. },
  283. }
  284. //更新合并后的数据
  285. for _, value := range mergeArr {
  286. if value == 0 {
  287. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  288. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  289. } else if value == 1 {
  290. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  291. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  292. } else if value == 2 {
  293. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  294. } else if value == 3 {
  295. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  296. } else if value == 4 {
  297. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  298. } else if value == 5 {
  299. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  300. } else if value == 6 {
  301. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  302. } else if value == 7 {
  303. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  304. } else if value == 8 {
  305. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  306. } else if value == 9 {
  307. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  308. } else if value == 10 {
  309. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  310. } else if value == 11 {
  311. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  312. } else {
  313. }
  314. }
  315. //模板数据更新
  316. updateExtract = append(updateExtract, []map[string]interface{}{
  317. merge_idMap,
  318. merge_map,
  319. })
  320. }
  321. } else { //高质量数据
  322. basic_bool := basicDataScore(source, info)
  323. if !basic_bool {
  324. DM.replaceSourceData(info, source) //替换
  325. repeat_idMap["_id"] = StringTOBsonId(source.id)
  326. if IdType {
  327. repeat_idMap["_id"] = source.id
  328. }
  329. repeat_id = info.id
  330. }
  331. }
  332. log.Println("最终结果","目标id:",repeat_idMap["_id"])
  333. //重复数据打标签
  334. updateExtract = append(updateExtract, []map[string]interface{}{
  335. repeat_idMap,
  336. map[string]interface{}{
  337. "$set": map[string]interface{}{
  338. "repeat": 1,
  339. "repeat_reason": reason,
  340. "repeat_id": repeat_id,
  341. },
  342. },
  343. })
  344. }
  345. }(tmp)
  346. if len(updateExtract) > 500 {
  347. mgo.UpSertBulk(extract, updateExtract...)
  348. updateExtract = [][]map[string]interface{}{}
  349. }
  350. tmp = make(map[string]interface{})
  351. }
  352. wg.Wait()
  353. if len(updateExtract) > 0 {
  354. mgo.UpSertBulk(extract, updateExtract...)
  355. }
  356. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  357. //任务完成,开始发送广播通知下面节点
  358. if n > repeateN && mapInfo["stop"] == nil {
  359. log.Println("判重任务完成发送udp")
  360. for _, to := range nextNode {
  361. sid, _ := mapInfo["gtid"].(string)
  362. eid, _ := mapInfo["lteid"].(string)
  363. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  364. by, _ := json.Marshal(map[string]interface{}{
  365. "gtid": sid,
  366. "lteid": eid,
  367. "stype": util.ObjToString(to["stype"]),
  368. "key": key,
  369. })
  370. addr := &net.UDPAddr{
  371. IP: net.ParseIP(to["addr"].(string)),
  372. Port: util.IntAll(to["port"]),
  373. }
  374. node := &udpNode{by, addr, time.Now().Unix(), 0}
  375. udptaskmap.Store(key, node)
  376. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  377. }
  378. }
  379. }
  380. //定时任务
  381. func timedTaskDay() {
  382. log.Println("部署定时任务")
  383. c := cron.New()
  384. c.AddFunc("0 0 1 * * ?", func() { movedata() }) //每天凌晨1点执行一次
  385. c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() }) //每天凌晨2点执行一次
  386. c.Start()
  387. //timedTaskOnce()
  388. }
  389. func timedTaskOnce() {
  390. log.Println("开始一次定时任务")
  391. defer util.Catch()
  392. //当前时间-8 -4 小时
  393. now := time.Now()
  394. log.Println(now)
  395. preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
  396. curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
  397. log.Println(preTime,curTime)
  398. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  399. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  400. between_time := curTime.Unix() - (86400 * timingPubScope)
  401. log.Println("id区间:",task_sid, task_eid,"时间:", between_time)
  402. //区间id
  403. q_start := map[string]interface{}{
  404. "_id": map[string]interface{}{
  405. "$gte": StringTOBsonId(task_sid),
  406. "$lte": StringTOBsonId(task_eid),
  407. },
  408. }
  409. sess := mgo.GetMgoConn()
  410. defer mgo.DestoryMongoConn(sess)
  411. it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
  412. num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
  413. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  414. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  415. dayArr := []map[string]interface{}{}
  416. for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
  417. if num%10000 == 0 {
  418. log.Println("正序遍历:", num)
  419. }
  420. //取-符合-发布时间半年内的数据
  421. if util.IntAll(tmp["dataging"]) == 1 {
  422. pubtime := util.Int64All(tmp["publishtime"])
  423. if pubtime > 0 && pubtime >= between_time {
  424. oknum++
  425. if deterTime==0 {
  426. log.Println("找到第一条符合条件的数据")
  427. deterTime = util.Int64All(tmp["publishtime"])
  428. dayArr = append(dayArr,tmp)
  429. }else {
  430. if pubtime-deterTime >timingSpanDay*86400 {
  431. //新数组重新构建,当前组数据加到全部组数据
  432. pendAllArr = append(pendAllArr,dayArr)
  433. dayArr = []map[string]interface{}{}
  434. deterTime = util.Int64All(tmp["publishtime"])
  435. dayArr = append(dayArr,tmp)
  436. }else {
  437. dayArr = append(dayArr,tmp)
  438. }
  439. }
  440. }else {
  441. //不在两年内的也清标记
  442. updateExtract = append(updateExtract, []map[string]interface{}{
  443. map[string]interface{}{
  444. "_id": tmp["_id"],
  445. },
  446. map[string]interface{}{
  447. "$set": map[string]interface{}{
  448. "dataging": 0,
  449. },
  450. },
  451. })
  452. if len(updateExtract) > 50 {
  453. mgo.UpSertBulk(extract, updateExtract...)
  454. updateExtract = [][]map[string]interface{}{}
  455. }
  456. }
  457. }
  458. tmp = make(map[string]interface{})
  459. }
  460. //批量更新标记
  461. if len(updateExtract) > 0 {
  462. mgo.UpSertBulk(extract, updateExtract...)
  463. updateExtract = [][]map[string]interface{}{}
  464. }
  465. if len(dayArr)>0 {
  466. pendAllArr = append(pendAllArr,dayArr)
  467. dayArr = []map[string]interface{}{}
  468. }
  469. log.Println("查询数量:",num,"符合条件:",oknum)
  470. if len(pendAllArr) <= 0 {
  471. log.Println("没找到dataging==1的数据")
  472. return
  473. }
  474. //测试分组数量是否正确
  475. testNum:=0
  476. for k,v:=range pendAllArr {
  477. log.Println("第",k,"组--","数量:",len(v))
  478. testNum = testNum+len(v)
  479. }
  480. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  481. n, repeateN := 0, 0
  482. for k,v:=range pendAllArr {
  483. //构建当前组的数据池
  484. log.Println("构建第",k,"组---(数据池)")
  485. DM = TimedTaskDatamap(dupdays, util.Int64All(v[0]["publishtime"]))
  486. log.Println("开始遍历判重第",k,"组 共计数量:",len(v))
  487. n = n+len(v)
  488. log.Println("统计目前总数量:",n,"重复数量:",repeateN)
  489. for _,tmp:=range v {
  490. info := NewInfo(tmp)
  491. if !LowHeavy { //是否进行低质量数据判重
  492. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  493. log.Println("无效数据")
  494. updateExtract = append(updateExtract, []map[string]interface{}{
  495. map[string]interface{}{
  496. "_id": tmp["_id"],
  497. },
  498. map[string]interface{}{
  499. "$set": map[string]interface{}{
  500. "repeat": -1, //无效数据标签
  501. "dataging": 0,
  502. },
  503. },
  504. })
  505. if len(updateExtract) > 50 {
  506. mgo.UpSertBulk(extract, updateExtract...)
  507. updateExtract = [][]map[string]interface{}{}
  508. }
  509. continue
  510. }
  511. }
  512. b, source, reason := DM.check(info)
  513. if b { //有重复,生成更新语句,更新抽取和更新招标
  514. log.Println("判重结果", b, reason,"目标id",info.id)
  515. repeateN++
  516. //重复数据打标签
  517. updateExtract = append(updateExtract, []map[string]interface{}{
  518. map[string]interface{}{
  519. "_id": tmp["_id"],
  520. },
  521. map[string]interface{}{
  522. "$set": map[string]interface{}{
  523. "repeat": 1,
  524. "repeat_reason": reason,
  525. "repeat_id": source.id,
  526. "dataging": 0,
  527. },
  528. },
  529. })
  530. }else {
  531. updateExtract = append(updateExtract, []map[string]interface{}{
  532. map[string]interface{}{
  533. "_id": tmp["_id"],
  534. },
  535. map[string]interface{}{
  536. "$set": map[string]interface{}{
  537. "dataging": 0,//符合条件的都为dataging==0
  538. },
  539. },
  540. })
  541. }
  542. if len(updateExtract) > 50 {
  543. mgo.UpSertBulk(extract, updateExtract...)
  544. updateExtract = [][]map[string]interface{}{}
  545. }
  546. }
  547. }
  548. if len(updateExtract) > 0 {
  549. mgo.UpSertBulk(extract, updateExtract...)
  550. updateExtract = [][]map[string]interface{}{}
  551. }
  552. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  553. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  554. if n > repeateN {
  555. for _, to := range nextNode {
  556. next_sid := util.BsonIdToSId(task_sid)
  557. next_eid := util.BsonIdToSId(task_eid)
  558. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  559. by, _ := json.Marshal(map[string]interface{}{
  560. "gtid": next_sid,
  561. "lteid": next_eid,
  562. "stype": util.ObjToString(to["stype"]),
  563. "key": key,
  564. })
  565. addr := &net.UDPAddr{
  566. IP: net.ParseIP(to["addr"].(string)),
  567. Port: util.IntAll(to["port"]),
  568. }
  569. node := &udpNode{by, addr, time.Now().Unix(), 0}
  570. udptaskmap.Store(key, node)
  571. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  572. }
  573. }
  574. }
  575. //合并字段-并更新merge字段的值
  576. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  577. merge_recordMap := make(map[string]interface{}, 0)
  578. mergeArr := make([]int64, 0)
  579. //是否替换数据了-记录原始的数据
  580. is_replace := false
  581. //1、城市
  582. if source.area == "" || source.area == "全国" {
  583. //为空
  584. if info.area != "全国" && info.area != "" {
  585. merge_recordMap["area"] = source.area
  586. merge_recordMap["city"] = source.city
  587. source.area = info.area
  588. source.city = info.city
  589. mergeArr = append(mergeArr, 1)
  590. is_replace = true
  591. }
  592. } else {
  593. //不为空-查看站点相关-有值必替换
  594. if source.is_site {
  595. //是站点替换的城市
  596. merge_recordMap["site_area"] = source.area
  597. merge_recordMap["site_city"] = source.city
  598. mergeArr = append(mergeArr, 0)
  599. is_replace = true
  600. source.is_site = false
  601. }
  602. }
  603. //2、项目名称
  604. if source.projectname == "" && info.projectname != "" {
  605. merge_recordMap["projectname"] = source.projectname
  606. source.projectname = info.projectname
  607. mergeArr = append(mergeArr, 2)
  608. is_replace = true
  609. }
  610. //3、项目编号
  611. if source.projectcode == "" && info.projectcode != "" {
  612. merge_recordMap["projectcode"] = source.projectcode
  613. source.projectcode = info.projectcode
  614. mergeArr = append(mergeArr, 3)
  615. is_replace = true
  616. }
  617. //4、采购单位
  618. if source.buyer == "" && info.buyer != "" {
  619. merge_recordMap["buyer"] = source.buyer
  620. source.buyer = info.buyer
  621. mergeArr = append(mergeArr, 4)
  622. is_replace = true
  623. }
  624. //5、预算
  625. if source.budget == 0 && info.budget != 0 {
  626. merge_recordMap["budget"] = source.budget
  627. source.budget = info.budget
  628. mergeArr = append(mergeArr, 5)
  629. is_replace = true
  630. }
  631. //6、中标单位
  632. if source.winner == "" && info.winner != "" {
  633. merge_recordMap["winner"] = source.winner
  634. source.winner = info.winner
  635. mergeArr = append(mergeArr, 6)
  636. is_replace = true
  637. }
  638. //7、中标金额
  639. if source.bidamount == 0 && info.bidamount != 0 {
  640. merge_recordMap["bidamount"] = source.bidamount
  641. source.bidamount = info.bidamount
  642. mergeArr = append(mergeArr, 7)
  643. is_replace = true
  644. }
  645. //8、开标时间-地点
  646. if source.bidopentime == 0 && info.bidopentime != 0 {
  647. merge_recordMap["bidopentime"] = source.bidopentime
  648. source.bidopentime = info.bidopentime
  649. mergeArr = append(mergeArr, 8)
  650. is_replace = true
  651. }
  652. //9、合同编号
  653. if source.contractnumber == "" && info.contractnumber != "" {
  654. merge_recordMap["contractnumber"] = source.contractnumber
  655. source.contractnumber = info.contractnumber
  656. mergeArr = append(mergeArr, 9)
  657. is_replace = true
  658. }
  659. //10、发布时间
  660. if source.publishtime == 0 && info.publishtime != 0 {
  661. merge_recordMap["publishtime"] = source.publishtime
  662. source.publishtime = info.publishtime
  663. mergeArr = append(mergeArr, 10)
  664. is_replace = true
  665. }
  666. //11、代理机构
  667. if source.agency == "" && info.agency != "" {
  668. merge_recordMap["agency"] = source.agency
  669. source.agency = info.agency
  670. mergeArr = append(mergeArr, 11)
  671. is_replace = true
  672. }
  673. if is_replace { //有过替换更新
  674. //总次数+1
  675. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  676. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  677. //和哪一个数据id进行非空替换的-记录
  678. key := info.id
  679. source.mergemap[key] = merge_recordMap
  680. }
  681. //待进一步优化
  682. return source, mergeArr, is_replace
  683. }
  684. //权重评估
  685. func basicDataScore(v *Info, info *Info) bool {
  686. /*
  687. 权重评估
  688. 网站优先级判定规则:
  689. 1、中央>省>市>县区
  690. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  691. */
  692. v_score, info_score := -1, -1
  693. dict_v := SiteMap[v.site]
  694. dict_info := SiteMap[info.site]
  695. //先判断level
  696. if dict_v != nil {
  697. v_level := util.ObjToString(dict_v["level"])
  698. if v_level == "国家" {
  699. v_score = 4
  700. } else if v_level == "省级" {
  701. v_score = 3
  702. } else if v_level == "市级" {
  703. v_score = 2
  704. } else if v_level == "县区" {
  705. v_score = 1
  706. } else if v_level == "" {
  707. } else {
  708. v_score = 0
  709. }
  710. }
  711. if dict_info != nil {
  712. info_level := util.ObjToString(dict_info["level"])
  713. if info_level == "国家" {
  714. info_score = 4
  715. } else if info_level == "省级" {
  716. info_score = 3
  717. } else if info_level == "市级" {
  718. info_score = 2
  719. } else if info_level == "县区" {
  720. info_score = 1
  721. } else if info_level == "" {
  722. } else {
  723. v_score = 0
  724. }
  725. }
  726. if v_score > info_score {
  727. return true
  728. }
  729. if v_score < info_score {
  730. return false
  731. }
  732. //判断sitetype
  733. if dict_v != nil {
  734. v_sitetype := util.ObjToString(dict_v["sitetype"])
  735. if v_sitetype == "政府采购" {
  736. v_score = 4
  737. } else if v_sitetype == "公共资源" {
  738. v_score = 3
  739. } else if v_sitetype == "官方网站"|| v_sitetype == "政府门户" {
  740. v_score = 2
  741. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  742. v_score = 1
  743. } else if v_sitetype == "" {
  744. } else {
  745. v_score = 0
  746. }
  747. }
  748. if dict_info != nil {
  749. info_sitetype := util.ObjToString(dict_info["sitetype"])
  750. if info_sitetype == "政府采购" {
  751. info_score = 4
  752. } else if info_sitetype == "公共资源" {
  753. info_score = 3
  754. } else if info_sitetype == "官方网站"|| info_sitetype == "政府门户" {
  755. info_score = 2
  756. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  757. info_score = 1
  758. } else if info_sitetype == "" {
  759. } else {
  760. info_score = 0
  761. }
  762. }
  763. if v_score > info_score {
  764. return true
  765. }
  766. if v_score < info_score {
  767. return false
  768. }
  769. if v_score == info_score {//同sitetype 情况下 分析weight
  770. v_weight := util.IntAll(dict_v["weight"])
  771. info_weight := util.IntAll(dict_info["weight"])
  772. if v_weight>info_weight {
  773. return true
  774. }
  775. if info_weight>v_weight {
  776. return false
  777. }
  778. }
  779. //网站评估
  780. m, n := 0, 0
  781. if v.projectname != "" {
  782. m++
  783. }
  784. if v.buyer != "" {
  785. m++
  786. }
  787. if v.projectcode != "" || v.contractnumber != "" {
  788. m++
  789. }
  790. if v.budget != 0 {
  791. m++
  792. }
  793. if v.bidamount != 0 {
  794. m++
  795. }
  796. if v.winner != "" {
  797. m++
  798. }
  799. if v.bidopentime != 0 {
  800. m++
  801. }
  802. if v.bidopenaddress != "" {
  803. m++
  804. }
  805. if v.agency != "" {
  806. m = m + 2
  807. }
  808. if v.city != "" {
  809. m = m + 2
  810. }
  811. if info.projectname != "" {
  812. n++
  813. }
  814. if info.buyer != "" {
  815. n++
  816. }
  817. if info.projectcode != "" || info.contractnumber != "" {
  818. n++
  819. }
  820. if info.budget != 0 {
  821. n++
  822. }
  823. if info.bidamount != 0 {
  824. n++
  825. }
  826. if info.winner != "" {
  827. n++
  828. }
  829. if info.bidopentime != 0 {
  830. n++
  831. }
  832. if info.bidopenaddress != "" {
  833. n++
  834. }
  835. if info.agency != "" {
  836. n = n + 2
  837. }
  838. if info.city != "" {
  839. n = n + 2
  840. }
  841. if m > n {
  842. return true
  843. } else if m == n {
  844. if v.publishtime >= info.publishtime {
  845. return true
  846. } else {
  847. return false
  848. }
  849. } else {
  850. return false
  851. }
  852. }
  853. //无效数据
  854. func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  855. var n int
  856. if d1 != "" {
  857. n++
  858. }
  859. if d2 != "" {
  860. n++
  861. }
  862. if d3 != "" {
  863. n++
  864. }
  865. if d4 != "" {
  866. n++
  867. }
  868. if n == 0 {
  869. return true
  870. }
  871. return false
  872. }
  873. //迁移数据dupdays+5之前的数据
  874. func movedata() {
  875. sess := mgo.GetMgoConn()
  876. defer mgo.DestoryMongoConn(sess)
  877. year, month, day := time.Now().Date()
  878. q := map[string]interface{}{
  879. "comeintime": map[string]interface{}{
  880. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
  881. },
  882. }
  883. log.Println(q)
  884. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  885. index := 0
  886. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  887. mgo.Save(extract_back, tmp)
  888. tmp = map[string]interface{}{}
  889. if index%1000 == 0 {
  890. log.Println("index", index)
  891. }
  892. }
  893. log.Println("save to", extract_back, " ok index", index)
  894. delnum := mgo.Delete(extract, q)
  895. log.Println("remove from ", extract, delnum)
  896. }