// main.go (36 KB)

  1. package main
/*
De-duplication of tender (bidding) announcements.
*/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "regexp"
  15. "sync"
  16. "time"
  17. "github.com/cron"
  18. "gopkg.in/mgo.v2/bson"
  19. )
  20. var (
  21. Sysconfig map[string]interface{} //配置文件
  22. mconf map[string]interface{} //mongodb配置信息
  23. mgo *MongodbSim //mongodb操作对象
  24. extract string
  25. udpclient mu.UdpClient //udp对象
  26. nextNode []map[string]interface{} //下节点数组
  27. dupdays = 5 //初始化判重范围
  28. DM *datamap //
  29. HM *historymap //判重数据
  30. lastid = ""
  31. //正则筛选相关
  32. FilterRegTitle = regexp.MustCompile("^_$")
  33. FilterRegTitle_0 = regexp.MustCompile("^_$")
  34. FilterRegTitle_1 = regexp.MustCompile("^_$")
  35. FilterRegTitle_2 = regexp.MustCompile("^_$")
  36. isMerger bool //是否合并
  37. Is_Sort bool //是否排序
  38. threadNum int //线程数量
  39. SiteMap map[string]map[string]interface{} //站点map
  40. LowHeavy bool //低质量数据判重
  41. TimingTask bool //是否定时任务
  42. sid, eid string //测试人员判重使用
  43. )
  44. func init() {
  45. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  46. flag.StringVar(&sid, "sid", "", "开始id")
  47. flag.StringVar(&eid, "eid", "", "结束id")
  48. flag.Parse()
  49. //172.17.145.163:27080
  50. util.ReadConfig(&Sysconfig)
  51. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  52. mconf = Sysconfig["mongodb"].(map[string]interface{})
  53. mgo = &MongodbSim{
  54. MongodbAddr: mconf["addr"].(string),
  55. DbName: mconf["db"].(string),
  56. Size: util.IntAllDef(mconf["pool"], 10),
  57. }
  58. mgo.InitPool()
  59. extract = mconf["extract"].(string)
  60. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  61. //加载数据
  62. DM = NewDatamap(dupdays, lastid)
  63. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  64. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  65. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  66. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  67. isMerger = Sysconfig["isMerger"].(bool)
  68. Is_Sort = Sysconfig["isSort"].(bool)
  69. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  70. LowHeavy = Sysconfig["lowHeavy"].(bool)
  71. TimingTask = Sysconfig["timingTask"].(bool)
  72. //站点配置
  73. site := mconf["site"].(map[string]interface{})
  74. SiteMap = make(map[string]map[string]interface{}, 0)
  75. start := int(time.Now().Unix())
  76. sess_site := mgo.GetMgoConn()
  77. defer mgo.DestoryMongoConn(sess_site)
  78. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  79. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  80. data_map := map[string]interface{}{
  81. "area": util.ObjToString(site_dict["area"]),
  82. "city": util.ObjToString(site_dict["city"]),
  83. "district": util.ObjToString(site_dict["district"]),
  84. "sitetype": util.ObjToString(site_dict["sitetype"]),
  85. "level": util.ObjToString(site_dict["level"]),
  86. }
  87. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  88. }
  89. log.Printf("站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  90. }
  91. func main() {
  92. go checkMapJob()
  93. updport := Sysconfig["udpport"].(string)
  94. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  95. if TimingTask {
  96. go timedTaskDay()
  97. } else {
  98. udpclient.Listen(processUdpMsg)
  99. log.Println("Udp服务监听", updport)
  100. }
  101. time.Sleep(99999 * time.Hour)
  102. }
  103. //测试组人员使用
  104. func mainT() {
  105. /*
  106. ObjectId("5da3f31aa5cb26b9b798d3aa")
  107. ObjectId("5da418c4a5cb26b9b7e3e9a6")
  108. ObjectId("5da3f2c5a5cb26b9b79847fc")
  109. ObjectId("5db2735ba5cb26b9b7c99c6f")
  110. */
  111. log.Println("测试开始")
  112. sid = "5da3f2c5a5cb26b9b79847fc"
  113. eid = "5db2735ba5cb26b9b7c99c6f"
  114. mapinfo := map[string]interface{}{}
  115. if sid == "" || eid == "" {
  116. log.Println("sid,eid参数不能为空")
  117. os.Exit(0)
  118. }
  119. mapinfo["gtid"] = sid
  120. mapinfo["lteid"] = eid
  121. mapinfo["stop"] = "true"
  122. task([]byte{}, mapinfo)
  123. time.Sleep(10 * time.Second)
  124. }
  125. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  126. fmt.Println("接受的段数据")
  127. switch act {
  128. case mu.OP_TYPE_DATA: //上个节点的数据
  129. //从表中开始处理
  130. var mapInfo map[string]interface{}
  131. err := json.Unmarshal(data, &mapInfo)
  132. log.Println("err:", err, "mapInfo:", mapInfo)
  133. if err != nil {
  134. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  135. } else if mapInfo != nil {
  136. taskType := util.ObjToString(mapInfo["stype"])
  137. if taskType == "historyTask" {
  138. //历史更新流程
  139. go historyTask(data, mapInfo)
  140. } else if taskType == "normalTask" {
  141. //判重流程
  142. go task(data, mapInfo)
  143. } else {
  144. //其他
  145. go task(data, mapInfo)
  146. }
  147. key, _ := mapInfo["key"].(string)
  148. if key == "" {
  149. key = "udpok"
  150. }
  151. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  152. }
  153. case mu.OP_NOOP: //下个节点回应
  154. ok := string(data)
  155. if ok != "" {
  156. log.Println("ok:", ok)
  157. udptaskmap.Delete(ok)
  158. }
  159. }
  160. }
  161. //开始判重程序
  162. func task(data []byte, mapInfo map[string]interface{}) {
  163. log.Println("开始数据判重")
  164. defer util.Catch()
  165. //区间id
  166. q := map[string]interface{}{
  167. "_id": map[string]interface{}{
  168. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  169. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  170. },
  171. }
  172. log.Println(mgo.DbName, extract, q)
  173. sess := mgo.GetMgoConn()
  174. defer mgo.DestoryMongoConn(sess)
  175. //是否排序
  176. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("_id").Iter()
  177. if Is_Sort {
  178. log.Println("排序:publishtime")
  179. it = sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  180. }
  181. //it = sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  182. updateExtract := [][]map[string]interface{}{}
  183. log.Println("线程数:", threadNum)
  184. pool := make(chan bool, threadNum)
  185. wg := &sync.WaitGroup{}
  186. n, repeateN := 0, 0
  187. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  188. if n%10000 == 0 {
  189. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  190. }
  191. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  192. tmp = make(map[string]interface{})
  193. repeateN++
  194. continue
  195. }
  196. pool <- true
  197. wg.Add(1)
  198. go func(tmp map[string]interface{}) {
  199. defer func() {
  200. <-pool
  201. wg.Done()
  202. }()
  203. info := NewInfo(tmp)
  204. if !LowHeavy { //是否进行低质量数据判重
  205. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  206. updateExtract = append(updateExtract, []map[string]interface{}{
  207. map[string]interface{}{
  208. "_id": tmp["_id"],
  209. },
  210. map[string]interface{}{
  211. "$set": map[string]interface{}{
  212. "repeat": -1, //无效数据标签
  213. },
  214. },
  215. })
  216. if len(updateExtract) > 500 {
  217. mgo.UpSertBulk(extract, updateExtract...)
  218. updateExtract = [][]map[string]interface{}{}
  219. }
  220. return
  221. }
  222. }
  223. b, source, reason := DM.check(info)
  224. if b { //有重复,生成更新语句,更新抽取和更新招标
  225. repeateN++
  226. var is_replace = false
  227. var mergeArr = []int64{} //更改合并数组记录
  228. var newData = &Info{} //更换新的数据池数据
  229. var repeat_idMap = map[string]interface{}{} //记录判重的
  230. var merge_idMap = map[string]interface{}{} //记录合并的
  231. repeat_idMap["_id"] = StringTOBsonId(info.id)
  232. merge_idMap["_id"] = StringTOBsonId(source.id)
  233. repeat_id := source.id //初始化一个数据
  234. if isMerger { //合并相关
  235. basic_bool := basicDataScore(source, info)
  236. if basic_bool {
  237. //已原始数据为标准 - 对比数据打判重标签-
  238. newData, mergeArr, is_replace = mergeDataFields(source, info)
  239. DM.replaceSourceData(newData, source.id) //替换
  240. //对比数据打重复标签的id,原始数据id的记录
  241. repeat_idMap["_id"] = StringTOBsonId(info.id)
  242. merge_idMap["_id"] = StringTOBsonId(source.id)
  243. repeat_id = source.id
  244. } else {
  245. //已对比数据为标准 ,数据池的数据打判重标签
  246. newData, mergeArr, is_replace = mergeDataFields(info, source)
  247. DM.replaceSourceData(newData, source.id) //替换
  248. //原始数据打重复标签的id, 对比数据id的记录
  249. repeat_idMap["_id"] = StringTOBsonId(source.id)
  250. merge_idMap["_id"] = StringTOBsonId(info.id)
  251. repeat_id = info.id
  252. }
  253. merge_map := make(map[string]interface{}, 0)
  254. if is_replace { //有过合并-更新数据
  255. merge_map = map[string]interface{}{
  256. "$set": map[string]interface{}{
  257. "merge": newData.mergemap,
  258. },
  259. }
  260. //更新合并后的数据
  261. for _, value := range mergeArr {
  262. if value == 0 {
  263. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  264. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  265. } else if value == 1 {
  266. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  267. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  268. } else if value == 2 {
  269. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  270. } else if value == 3 {
  271. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  272. } else if value == 4 {
  273. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  274. } else if value == 5 {
  275. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  276. } else if value == 6 {
  277. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  278. } else if value == 7 {
  279. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  280. } else if value == 8 {
  281. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  282. } else if value == 9 {
  283. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  284. } else if value == 10 {
  285. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  286. } else if value == 11 {
  287. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  288. } else {
  289. }
  290. }
  291. //模板数据更新
  292. updateExtract = append(updateExtract, []map[string]interface{}{
  293. merge_idMap,
  294. merge_map,
  295. })
  296. }
  297. } else { //高质量数据
  298. basic_bool := basicDataScore(source, info)
  299. if !basic_bool {
  300. DM.replaceSourceData(info, source.id) //替换
  301. repeat_idMap["_id"] = StringTOBsonId(source.id)
  302. repeat_id = info.id
  303. }
  304. }
  305. //重复数据打标签
  306. updateExtract = append(updateExtract, []map[string]interface{}{
  307. repeat_idMap,
  308. map[string]interface{}{
  309. "$set": map[string]interface{}{
  310. "repeat": 1,
  311. "repeat_reason": reason,
  312. "repeat_id": repeat_id,
  313. },
  314. },
  315. })
  316. }
  317. }(tmp)
  318. if len(updateExtract) > 500 {
  319. mgo.UpSertBulk(extract, updateExtract...)
  320. updateExtract = [][]map[string]interface{}{}
  321. }
  322. tmp = make(map[string]interface{})
  323. }
  324. wg.Wait()
  325. if len(updateExtract) > 0 {
  326. mgo.UpSertBulk(extract, updateExtract...)
  327. }
  328. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  329. //任务完成,开始发送广播通知下面节点
  330. if n > repeateN && mapInfo["stop"] == nil {
  331. for _, to := range nextNode {
  332. sid, _ := mapInfo["gtid"].(string)
  333. eid, _ := mapInfo["lteid"].(string)
  334. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  335. by, _ := json.Marshal(map[string]interface{}{
  336. "gtid": sid,
  337. "lteid": eid,
  338. "stype": util.ObjToString(to["stype"]),
  339. "key": key,
  340. })
  341. addr := &net.UDPAddr{
  342. IP: net.ParseIP(to["addr"].(string)),
  343. Port: util.IntAll(to["port"]),
  344. }
  345. node := &udpNode{by, addr, time.Now().Unix(), 0}
  346. udptaskmap.Store(key, node)
  347. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  348. }
  349. }
  350. }
  351. //支持历史更新
  352. func historyTask(data []byte, mapInfo map[string]interface{}) {
  353. fmt.Println("开始取历史时间段")
  354. defer util.Catch()
  355. sess := mgo.GetMgoConn()
  356. defer mgo.DestoryMongoConn(sess)
  357. q := map[string]interface{}{
  358. "_id": map[string]interface{}{
  359. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  360. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  361. },
  362. }
  363. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  364. minTime, maxTime := int64(0), int64(0)
  365. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  366. //取出最大最小时间
  367. info_time := tmp["comeintime"]
  368. if Is_Sort {
  369. info_time = tmp["publishtime"]
  370. }
  371. if minTime == 0 || maxTime == 0 && util.Int64All(info_time) != 0 {
  372. minTime = util.Int64All(info_time)
  373. maxTime = util.Int64All(info_time)
  374. } else {
  375. t := util.Int64All(info_time)
  376. if t < minTime && t != 0 {
  377. minTime = t
  378. }
  379. if t > maxTime && t != 0 {
  380. maxTime = t
  381. }
  382. }
  383. }
  384. //时间不正确时
  385. if minTime == 0 && maxTime == 0 {
  386. log.Println("段数据区间 不符合")
  387. return
  388. }
  389. fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
  390. gtid, lteid := util.BsonIdToSId(mapInfo["gtid"].(string)), util.BsonIdToSId(mapInfo["lteid"].(string))
  391. fmt.Println(gtid, lteid)
  392. HM = NewHistorymap(gtid, lteid, minTime, maxTime)
  393. fmt.Println("开始历史数据判重")
  394. defer util.Catch()
  395. //区间id
  396. sess_history := mgo.GetMgoConn()
  397. defer mgo.DestoryMongoConn(sess_history)
  398. q_history := map[string]interface{}{
  399. "_id": map[string]interface{}{
  400. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  401. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  402. },
  403. }
  404. log.Println(mgo.DbName, extract, q_history)
  405. //是否排序
  406. it_history := sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Iter()
  407. if Is_Sort {
  408. it_history = sess_history.DB(mgo.DbName).C(extract).Find(&q_history).Sort("publishtime").Iter()
  409. }
  410. updateExtract := [][]map[string]interface{}{}
  411. log.Println("线程数:", threadNum)
  412. pool := make(chan bool, threadNum)
  413. wg := &sync.WaitGroup{}
  414. n, repeateN := 0, 0
  415. for tmp := make(map[string]interface{}); it_history.Next(&tmp); n++ {
  416. if n%10000 == 0 {
  417. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  418. }
  419. if util.IntAll(tmp["dataging"]) == 1 {
  420. tmp = make(map[string]interface{})
  421. continue
  422. }
  423. pool <- true
  424. wg.Add(1)
  425. go func(tmp map[string]interface{}) {
  426. defer func() {
  427. <-pool
  428. wg.Done()
  429. }()
  430. info := NewInfo(tmp)
  431. if !LowHeavy { //是否进行低质量数据判重
  432. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  433. updateExtract = append(updateExtract, []map[string]interface{}{
  434. map[string]interface{}{
  435. "_id": tmp["_id"],
  436. },
  437. map[string]interface{}{
  438. "$set": map[string]interface{}{
  439. "repeat": -1, //无效数据标签
  440. },
  441. },
  442. })
  443. if len(updateExtract) > 500 {
  444. mgo.UpSertBulk(extract, updateExtract...)
  445. updateExtract = [][]map[string]interface{}{}
  446. }
  447. return
  448. }
  449. }
  450. b, source, reason := HM.checkHistory(info)
  451. if b { //有重复,生成更新语句,更新抽取和更新招标
  452. if reason == "未判重记录" {
  453. fmt.Println("未判重记录")
  454. //把info的数据判重的标签更换,并新增字段
  455. HM.replaceSourceData(info, info.id) //替换即添加
  456. updateExtract = append(updateExtract, []map[string]interface{}{
  457. map[string]interface{}{
  458. "_id": tmp["_id"],
  459. },
  460. map[string]interface{}{
  461. "$set": map[string]interface{}{
  462. "repeat": 0,
  463. "repeatid": -2,
  464. },
  465. },
  466. })
  467. } else {
  468. repeateN++
  469. var is_replace = false
  470. var mergeArr = []int64{} //更改合并数组记录
  471. var newData = &Info{} //更换新的数据池数据
  472. var repeat_idMap = map[string]interface{}{} //记录判重的
  473. var merge_idMap = map[string]interface{}{} //记录合并的
  474. repeat_idMap["_id"] = StringTOBsonId(info.id)
  475. merge_idMap["_id"] = StringTOBsonId(source.id)
  476. repeat_id := source.id
  477. //以下合并相关
  478. if isMerger {
  479. basic_bool := basicDataScore(source, info)
  480. if basic_bool {
  481. //已原始数据为标准 - 对比数据打判重标签-
  482. newData, mergeArr, is_replace = mergeDataFields(source, info)
  483. HM.replaceSourceData(newData, source.id) //替换
  484. //对比数据打重复标签的id,原始数据id的记录
  485. repeat_idMap["_id"] = StringTOBsonId(info.id)
  486. merge_idMap["_id"] = StringTOBsonId(source.id)
  487. repeat_id = source.id
  488. } else {
  489. //已对比数据为标准 ,数据池的数据打判重标签
  490. newData, mergeArr, is_replace = mergeDataFields(info, source)
  491. HM.replaceSourceData(newData, source.id) //替换
  492. //原始数据打重复标签的id, 对比数据id的记录
  493. repeat_idMap["_id"] = StringTOBsonId(source.id)
  494. merge_idMap["_id"] = StringTOBsonId(info.id)
  495. repeat_id = info.id
  496. }
  497. merge_map := make(map[string]interface{}, 0)
  498. if is_replace { //有过合并-更新数据
  499. merge_map = map[string]interface{}{
  500. "$set": map[string]interface{}{
  501. "merge": newData.mergemap,
  502. },
  503. }
  504. //更新合并后的数据
  505. for _, value := range mergeArr {
  506. if value == 0 {
  507. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  508. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  509. } else if value == 1 {
  510. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  511. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  512. } else if value == 2 {
  513. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  514. } else if value == 3 {
  515. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  516. } else if value == 4 {
  517. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  518. } else if value == 5 {
  519. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  520. } else if value == 6 {
  521. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  522. } else if value == 7 {
  523. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  524. } else if value == 8 {
  525. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  526. } else if value == 9 {
  527. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  528. } else if value == 10 {
  529. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  530. } else if value == 11 {
  531. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  532. } else {
  533. }
  534. }
  535. //模板数据更新
  536. updateExtract = append(updateExtract, []map[string]interface{}{
  537. merge_idMap,
  538. merge_map,
  539. })
  540. }
  541. } else { //高质量数据
  542. basic_bool := basicDataScore(source, info)
  543. if !basic_bool {
  544. HM.replaceSourceData(info, source.id) //替换
  545. repeat_idMap["_id"] = StringTOBsonId(source.id)
  546. repeat_id = info.id
  547. }
  548. }
  549. //重复数据打标签
  550. updateExtract = append(updateExtract, []map[string]interface{}{
  551. repeat_idMap,
  552. map[string]interface{}{
  553. "$set": map[string]interface{}{
  554. "repeat": 1,
  555. "repeat_reason": reason,
  556. "repeat_id": repeat_id,
  557. },
  558. },
  559. })
  560. }
  561. }
  562. }(tmp)
  563. if len(updateExtract) > 500 {
  564. mgo.UpSertBulk(extract, updateExtract...)
  565. updateExtract = [][]map[string]interface{}{}
  566. }
  567. tmp = make(map[string]interface{})
  568. }
  569. wg.Wait()
  570. if len(updateExtract) > 0 {
  571. mgo.UpSertBulk(extract, updateExtract...)
  572. //mgo.UpdateBulk(bidding, updateBidding...)
  573. }
  574. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  575. //任务完成,开始发送广播通知下面节点
  576. if n > repeateN && mapInfo["stop"] == nil {
  577. for _, to := range nextNode {
  578. sid, _ := mapInfo["gtid"].(string)
  579. eid, _ := mapInfo["lteid"].(string)
  580. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  581. by, _ := json.Marshal(map[string]interface{}{
  582. "gtid": sid,
  583. "lteid": eid,
  584. "stype": util.ObjToString(to["stype"]),
  585. "key": key,
  586. })
  587. addr := &net.UDPAddr{
  588. IP: net.ParseIP(to["addr"].(string)),
  589. Port: util.IntAll(to["port"]),
  590. }
  591. node := &udpNode{by, addr, time.Now().Unix(), 0}
  592. udptaskmap.Store(key, node)
  593. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  594. }
  595. }
  596. }
  597. //定时任务
  598. func timedTaskDay() {
  599. c := cron.New()
  600. c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
  601. c.AddFunc("0 1 0 * * ?", func() { movedata() }) //每天凌晨1点执行一次
  602. c.Start()
  603. timedTaskOnce()
  604. }
  605. func timedTaskOnce() {
  606. log.Println("开始一次定时任务")
  607. now := time.Now()
  608. preTime := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, time.Local)
  609. curTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  610. task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
  611. task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
  612. lastid := task_sid
  613. log.Println(task_sid, task_eid)
  614. //ObjectId("5da3f31aa5cb26b9b798d3aa")
  615. //ObjectId("5da418c4a5cb26b9b7e3e9a6")
  616. //task_sid = "5da3f31aa5cb26b9b798d3aa"
  617. //task_eid = "5da418c4a5cb26b9b7e3e9a6"
  618. defer util.Catch()
  619. //区间id
  620. q := map[string]interface{}{
  621. "_id": map[string]interface{}{
  622. "$gte": StringTOBsonId(task_sid),
  623. "$lte": StringTOBsonId(task_eid),
  624. },
  625. }
  626. sess_start := mgo.GetMgoConn()
  627. defer mgo.DestoryMongoConn(sess_start)
  628. it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  629. startNum := 0
  630. for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
  631. if startNum%10000 == 0 {
  632. log.Println("正序遍历:", startNum)
  633. }
  634. if util.IntAll(tmp_start["dataging"]) == 1 { //取起始id
  635. lastid = BsonTOStringId(tmp_start["_id"])
  636. break
  637. }
  638. }
  639. DM = NewDatamap(dupdays, lastid)
  640. log.Println("本地数据加载完成,定时任务数据判重开始")
  641. sess := mgo.GetMgoConn()
  642. defer mgo.DestoryMongoConn(sess)
  643. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  644. updateExtract := [][]map[string]interface{}{}
  645. log.Println("线程数:", threadNum)
  646. pool := make(chan bool, threadNum)
  647. wg := &sync.WaitGroup{}
  648. n, repeateN := 0, 0
  649. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  650. if n%10000 == 0 {
  651. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  652. }
  653. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 {
  654. tmp = make(map[string]interface{})
  655. continue
  656. }
  657. if util.IntAll(tmp["dataging"]) != 1 {
  658. tmp = make(map[string]interface{})
  659. continue
  660. }
  661. pool <- true
  662. wg.Add(1)
  663. go func(tmp map[string]interface{}) {
  664. defer func() {
  665. <-pool
  666. wg.Done()
  667. }()
  668. info := NewInfo(tmp)
  669. if !LowHeavy { //是否进行低质量数据判重
  670. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  671. updateExtract = append(updateExtract, []map[string]interface{}{
  672. map[string]interface{}{
  673. "_id": tmp["_id"],
  674. },
  675. map[string]interface{}{
  676. "$set": map[string]interface{}{
  677. "repeat": -1, //无效数据标签
  678. "dataging": 0,
  679. },
  680. },
  681. })
  682. if len(updateExtract) > 500 {
  683. mgo.UpSertBulk(extract, updateExtract...)
  684. updateExtract = [][]map[string]interface{}{}
  685. }
  686. return
  687. }
  688. }
  689. b, source, reason := DM.check(info)
  690. if b { //有重复,生成更新语句,更新抽取和更新招标
  691. repeateN++
  692. var is_replace = false
  693. var mergeArr = []int64{} //更改合并数组记录
  694. var newData = &Info{} //更换新的数据池数据
  695. var repeat_idMap = map[string]interface{}{} //记录判重的
  696. var merge_idMap = map[string]interface{}{} //记录合并的
  697. repeat_idMap["_id"] = StringTOBsonId(info.id)
  698. merge_idMap["_id"] = StringTOBsonId(source.id)
  699. repeat_id := source.id //初始化一个数据
  700. if isMerger { //合并相关
  701. basic_bool := basicDataScore(source, info)
  702. if basic_bool {
  703. //已原始数据为标准 - 对比数据打判重标签-
  704. newData, mergeArr, is_replace = mergeDataFields(source, info)
  705. DM.replaceSourceData(newData, source.id) //替换
  706. //对比数据打重复标签的id,原始数据id的记录
  707. repeat_idMap["_id"] = StringTOBsonId(info.id)
  708. merge_idMap["_id"] = StringTOBsonId(source.id)
  709. repeat_id = source.id
  710. } else {
  711. //已对比数据为标准 ,数据池的数据打判重标签
  712. newData, mergeArr, is_replace = mergeDataFields(info, source)
  713. DM.replaceSourceData(newData, source.id) //替换
  714. //原始数据打重复标签的id, 对比数据id的记录
  715. repeat_idMap["_id"] = StringTOBsonId(source.id)
  716. merge_idMap["_id"] = StringTOBsonId(info.id)
  717. repeat_id = info.id
  718. }
  719. merge_map := make(map[string]interface{}, 0)
  720. if is_replace { //有过合并-更新数据
  721. merge_map = map[string]interface{}{
  722. "$set": map[string]interface{}{
  723. "merge": newData.mergemap,
  724. "dataging": 0,
  725. },
  726. }
  727. //更新合并后的数据
  728. for _, value := range mergeArr {
  729. if value == 0 {
  730. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  731. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  732. } else if value == 1 {
  733. merge_map["$set"].(map[string]interface{})["area"] = newData.area
  734. merge_map["$set"].(map[string]interface{})["city"] = newData.city
  735. } else if value == 2 {
  736. merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
  737. } else if value == 3 {
  738. merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
  739. } else if value == 4 {
  740. merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
  741. } else if value == 5 {
  742. merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
  743. } else if value == 6 {
  744. merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
  745. } else if value == 7 {
  746. merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
  747. } else if value == 8 {
  748. merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
  749. } else if value == 9 {
  750. merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
  751. } else if value == 10 {
  752. merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
  753. } else if value == 11 {
  754. merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
  755. } else {
  756. }
  757. }
  758. //模板数据更新
  759. updateExtract = append(updateExtract, []map[string]interface{}{
  760. merge_idMap,
  761. merge_map,
  762. })
  763. }
  764. } else { //高质量数据
  765. basic_bool := basicDataScore(source, info)
  766. if !basic_bool {
  767. DM.replaceSourceData(info, source.id) //替换
  768. repeat_idMap["_id"] = StringTOBsonId(source.id)
  769. repeat_id = info.id
  770. }
  771. }
  772. //重复数据打标签
  773. updateExtract = append(updateExtract, []map[string]interface{}{
  774. repeat_idMap,
  775. map[string]interface{}{
  776. "$set": map[string]interface{}{
  777. "repeat": 1,
  778. "repeat_reason": reason,
  779. "repeat_id": repeat_id,
  780. "dataging": 0,
  781. },
  782. },
  783. })
  784. }
  785. }(tmp)
  786. if len(updateExtract) > 500 {
  787. mgo.UpSertBulk(extract, updateExtract...)
  788. updateExtract = [][]map[string]interface{}{}
  789. }
  790. tmp = make(map[string]interface{})
  791. }
  792. wg.Wait()
  793. if len(updateExtract) > 0 {
  794. mgo.UpSertBulk(extract, updateExtract...)
  795. }
  796. log.Println("this timeTask over.", n, "repeateN:", repeateN)
  797. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  798. if n > repeateN {
  799. for _, to := range nextNode {
  800. next_sid := util.BsonIdToSId(task_sid)
  801. next_eid := util.BsonIdToSId(task_eid)
  802. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  803. by, _ := json.Marshal(map[string]interface{}{
  804. "gtid": next_sid,
  805. "lteid": next_eid,
  806. "stype": util.ObjToString(to["stype"]),
  807. "key": key,
  808. })
  809. addr := &net.UDPAddr{
  810. IP: net.ParseIP(to["addr"].(string)),
  811. Port: util.IntAll(to["port"]),
  812. }
  813. node := &udpNode{by, addr, time.Now().Unix(), 0}
  814. udptaskmap.Store(key, node)
  815. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  816. }
  817. }
  818. }
  819. //合并字段-并更新merge字段的值
  820. func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
  821. merge_recordMap := make(map[string]interface{}, 0)
  822. mergeArr := make([]int64, 0)
  823. //是否替换数据了-记录原始的数据
  824. is_replace := false
  825. //1、城市
  826. if source.area == "" || source.area == "全国" {
  827. //为空
  828. if info.area != "全国" && info.area != "" {
  829. merge_recordMap["area"] = source.area
  830. merge_recordMap["city"] = source.city
  831. source.area = info.area
  832. source.city = info.city
  833. mergeArr = append(mergeArr, 1)
  834. is_replace = true
  835. }
  836. } else {
  837. //不为空-查看站点相关-有值必替换
  838. if source.is_site {
  839. //是站点替换的城市
  840. merge_recordMap["site_area"] = source.area
  841. merge_recordMap["site_city"] = source.city
  842. mergeArr = append(mergeArr, 0)
  843. is_replace = true
  844. source.is_site = false
  845. }
  846. }
  847. //2、项目名称
  848. if source.projectname == "" && info.projectname != "" {
  849. merge_recordMap["projectname"] = source.projectname
  850. source.projectname = info.projectname
  851. mergeArr = append(mergeArr, 2)
  852. is_replace = true
  853. }
  854. //3、项目编号
  855. if source.projectcode == "" && info.projectcode != "" {
  856. merge_recordMap["projectcode"] = source.projectcode
  857. source.projectcode = info.projectcode
  858. mergeArr = append(mergeArr, 3)
  859. is_replace = true
  860. }
  861. //4、采购单位
  862. if source.buyer == "" && info.buyer != "" {
  863. merge_recordMap["buyer"] = source.buyer
  864. source.buyer = info.buyer
  865. mergeArr = append(mergeArr, 4)
  866. is_replace = true
  867. }
  868. //5、预算
  869. if source.budget == 0 && info.budget != 0 {
  870. merge_recordMap["budget"] = source.budget
  871. source.budget = info.budget
  872. mergeArr = append(mergeArr, 5)
  873. is_replace = true
  874. }
  875. //6、中标单位
  876. if source.winner == "" && info.winner != "" {
  877. merge_recordMap["winner"] = source.winner
  878. source.winner = info.winner
  879. mergeArr = append(mergeArr, 6)
  880. is_replace = true
  881. }
  882. //7、中标金额
  883. if source.bidamount == 0 && info.bidamount != 0 {
  884. merge_recordMap["bidamount"] = source.bidamount
  885. source.bidamount = info.bidamount
  886. mergeArr = append(mergeArr, 7)
  887. is_replace = true
  888. }
  889. //8、开标时间-地点
  890. if source.bidopentime == 0 && info.bidopentime != 0 {
  891. merge_recordMap["bidopentime"] = source.bidopentime
  892. source.bidopentime = info.bidopentime
  893. mergeArr = append(mergeArr, 8)
  894. is_replace = true
  895. }
  896. //9、合同编号
  897. if source.contractnumber == "" && info.contractnumber != "" {
  898. merge_recordMap["contractnumber"] = source.contractnumber
  899. source.contractnumber = info.contractnumber
  900. mergeArr = append(mergeArr, 9)
  901. is_replace = true
  902. }
  903. //10、发布时间
  904. if source.publishtime == 0 && info.publishtime != 0 {
  905. merge_recordMap["publishtime"] = source.publishtime
  906. source.publishtime = info.publishtime
  907. mergeArr = append(mergeArr, 10)
  908. is_replace = true
  909. }
  910. //11、代理机构
  911. if source.agency == "" && info.agency != "" {
  912. merge_recordMap["agency"] = source.agency
  913. source.agency = info.agency
  914. mergeArr = append(mergeArr, 11)
  915. is_replace = true
  916. }
  917. if is_replace { //有过替换更新
  918. //总次数+1
  919. source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
  920. merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
  921. //和哪一个数据id进行非空替换的-记录
  922. key := info.id
  923. source.mergemap[key] = merge_recordMap
  924. }
  925. //待进一步优化
  926. return source, mergeArr, is_replace
  927. }
  928. //权重评估
  929. func basicDataScore(v *Info, info *Info) bool {
  930. /*
  931. 权重评估
  932. 网站优先级判定规则:
  933. 1、中央>省>市>县区
  934. 2、政府采购>公共资源>采购单位官网>招标代理公司/平台
  935. */
  936. v_score, info_score := -1, -1
  937. dict_v := SiteMap[v.site]
  938. dict_info := SiteMap[info.site]
  939. //先判断level
  940. if dict_v != nil {
  941. v_level := util.ObjToString(dict_v["level"])
  942. if v_level == "中央" {
  943. v_score = 4
  944. } else if v_level == "省级" {
  945. v_score = 3
  946. } else if v_level == "市级" {
  947. v_score = 2
  948. } else if v_level == "县区" {
  949. v_score = 1
  950. } else if v_level == "" {
  951. } else {
  952. v_score = 0
  953. }
  954. }
  955. if dict_info != nil {
  956. info_level := util.ObjToString(dict_info["level"])
  957. if info_level == "中央" {
  958. info_score = 4
  959. } else if info_level == "省级" {
  960. info_score = 3
  961. } else if info_level == "市级" {
  962. info_score = 2
  963. } else if info_level == "县区" {
  964. info_score = 1
  965. } else if info_level == "" {
  966. } else {
  967. v_score = 0
  968. }
  969. }
  970. if v_score > info_score {
  971. return true
  972. }
  973. if v_score < info_score {
  974. return false
  975. }
  976. //判断sitetype
  977. if dict_v != nil {
  978. v_sitetype := util.ObjToString(dict_v["sitetype"])
  979. if v_sitetype == "政府采购" || v_sitetype == "政府门户" {
  980. v_score = 4
  981. } else if v_sitetype == "公共资源" {
  982. v_score = 3
  983. } else if v_sitetype == "官方网站" {
  984. v_score = 2
  985. } else if v_sitetype == "社会公共招标平台" || v_sitetype == "企业招标平台" {
  986. v_score = 1
  987. } else if v_sitetype == "" {
  988. } else {
  989. v_score = 0
  990. }
  991. }
  992. if dict_info != nil {
  993. info_sitetype := util.ObjToString(dict_info["sitetype"])
  994. if info_sitetype == "政府采购" || info_sitetype == "政府门户" {
  995. info_score = 4
  996. } else if info_sitetype == "公共资源" {
  997. info_score = 3
  998. } else if info_sitetype == "官方网站" {
  999. info_score = 2
  1000. } else if info_sitetype == "社会公共招标平台" || info_sitetype == "企业招标平台" {
  1001. info_score = 1
  1002. } else if info_sitetype == "" {
  1003. } else {
  1004. info_score = 0
  1005. }
  1006. }
  1007. if v_score > info_score {
  1008. return true
  1009. }
  1010. if v_score < info_score {
  1011. return false
  1012. }
  1013. //网站评估
  1014. m, n := 0, 0
  1015. if v.projectname != "" {
  1016. m++
  1017. }
  1018. if v.buyer != "" {
  1019. m++
  1020. }
  1021. if v.projectcode != "" || v.contractnumber != "" {
  1022. m++
  1023. }
  1024. if v.budget != 0 {
  1025. m++
  1026. }
  1027. if v.bidamount != 0 {
  1028. m++
  1029. }
  1030. if v.winner != "" {
  1031. m++
  1032. }
  1033. if v.bidopentime != 0 {
  1034. m++
  1035. }
  1036. if v.bidopenaddress != "" {
  1037. m++
  1038. }
  1039. if v.agency != "" {
  1040. m = m + 2
  1041. }
  1042. if v.city != "" {
  1043. m = m + 2
  1044. }
  1045. if info.projectname != "" {
  1046. n++
  1047. }
  1048. if info.buyer != "" {
  1049. n++
  1050. }
  1051. if info.projectcode != "" || info.contractnumber != "" {
  1052. n++
  1053. }
  1054. if info.budget != 0 {
  1055. n++
  1056. }
  1057. if info.bidamount != 0 {
  1058. n++
  1059. }
  1060. if info.winner != "" {
  1061. n++
  1062. }
  1063. if info.bidopentime != 0 {
  1064. n++
  1065. }
  1066. if info.bidopenaddress != "" {
  1067. n++
  1068. }
  1069. if info.agency != "" {
  1070. n = n + 2
  1071. }
  1072. if info.city != "" {
  1073. n = n + 2
  1074. }
  1075. if m > n {
  1076. return true
  1077. } else if m == n {
  1078. if v.publishtime >= info.publishtime {
  1079. return true
  1080. } else {
  1081. return false
  1082. }
  1083. } else {
  1084. return false
  1085. }
  1086. }
  1087. //无效数据
  1088. func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
  1089. var n int
  1090. if d1 != "" {
  1091. n++
  1092. }
  1093. if d2 != "" {
  1094. n++
  1095. }
  1096. if d3 != "" {
  1097. n++
  1098. }
  1099. if d4 != "" {
  1100. n++
  1101. }
  1102. if n == 0 {
  1103. return true
  1104. }
  1105. return false
  1106. }
  1107. //迁移数据dupdays+5之前的数据
  1108. func movedata() {
  1109. sess := mgo.GetMgoConn()
  1110. defer mgo.DestoryMongoConn(sess)
  1111. year, month, day := time.Now().Date()
  1112. q := map[string]interface{}{
  1113. "comeintime": map[string]interface{}{
  1114. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+5) * 24 * time.Hour).Unix(),
  1115. },
  1116. }
  1117. log.Println(q)
  1118. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  1119. index := 0
  1120. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  1121. mgo.Save(extract+"_back", tmp)
  1122. tmp = map[string]interface{}{}
  1123. if index%1000 == 0 {
  1124. log.Println("index", index)
  1125. }
  1126. }
  1127. log.Println("save to", extract+"_back", " ok index", index)
  1128. delnum := mgo.Delete(extract, q)
  1129. log.Println("remove from ", extract, delnum)
  1130. }