main.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "gopkg.in/mgo.v2/bson"
  11. "log"
  12. mu "mfw/util"
  13. "net"
  14. "os"
  15. "qfw/util"
  16. "regexp"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
// Process-wide configuration and shared state. Most values are filled in by
// init() from the configuration file and the command-line flags.
var (
	Sysconfig map[string]interface{} // configuration file contents
	mconf map[string]interface{} // "mongodb" section of the configuration
	mgo *MongodbSim // mongodb access object built from the "mongodb" section
	task_mgo *MongodbSim // mongodb access object built from the "task_mongodb" section
	task_collName string
	extract string
	extract_back string
	udpclient mu.UdpClient // udp client
	nextNode []map[string]interface{} // downstream nodes
	dupdays = 7 // initial de-duplication range in days; init() overrides it from the config
	DM *datamap // shared data pool used by task() for duplicate checks
	// Regular-expression filters; the placeholders are replaced in init().
	FilterRegTitle = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")
	isMerger bool // whether duplicate records are merged
	threadNum int // number of concurrent worker goroutines
	SiteMap map[string]map[string]interface{} // site name -> site attributes
	LowHeavy bool // low-quality data switch: when false, records rejected by invalidData are tagged instead of checked
	TimingTask bool // whether the timed (history) task is enabled
	timingSpanDay int64 // time span of one history group, in days
	timingPubScope int64 // publish-time window for the history task, in days
	gtid,lteid,lastid string
	IdType bool // false (default) means ObjectId ids; true means plain string ids
	updatelock sync.Mutex // lock guarding batched updates in historyTaskDay
)
  50. func init() {
  51. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  52. flag.StringVar(&gtid, "gtid", "", "历史的起始id")
  53. flag.Parse()
  54. //172.17.145.163:27080
  55. util.ReadConfig(&Sysconfig)
  56. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  57. task_mgo = &MongodbSim{
  58. MongodbAddr: task_mconf["task_addrName"].(string),
  59. DbName: task_mconf["task_dbName"].(string),
  60. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  61. }
  62. task_mgo.InitPool()
  63. task_collName = task_mconf["task_collName"].(string)
  64. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  65. mconf = Sysconfig["mongodb"].(map[string]interface{})
  66. mgo = &MongodbSim{
  67. MongodbAddr: mconf["addr"].(string),
  68. DbName: mconf["db"].(string),
  69. Size: util.IntAllDef(mconf["pool"], 10),
  70. }
  71. mgo.InitPool()
  72. extract = mconf["extract"].(string)
  73. extract_back = mconf["extract_back"].(string)
  74. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  75. //加载数据
  76. DM = NewDatamap(dupdays, lastid)
  77. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  78. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  79. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  80. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  81. isMerger = Sysconfig["isMerger"].(bool)
  82. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  83. LowHeavy = Sysconfig["lowHeavy"].(bool)
  84. TimingTask = Sysconfig["timingTask"].(bool)
  85. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  86. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  87. //站点配置
  88. site := mconf["site"].(map[string]interface{})
  89. SiteMap = make(map[string]map[string]interface{}, 0)
  90. start := int(time.Now().Unix())
  91. sess_site := mgo.GetMgoConn()
  92. defer mgo.DestoryMongoConn(sess_site)
  93. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  94. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  95. data_map := map[string]interface{}{
  96. "area": util.ObjToString(site_dict["area"]),
  97. "city": util.ObjToString(site_dict["city"]),
  98. "district": util.ObjToString(site_dict["district"]),
  99. "sitetype": util.ObjToString(site_dict["sitetype"]),
  100. "level": util.ObjToString(site_dict["level"]),
  101. "weight": util.ObjToString(site_dict["weight"]),
  102. }
  103. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  104. }
  105. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  106. }
  107. func main() {
  108. go checkMapJob()
  109. updport := Sysconfig["udpport"].(string)
  110. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  111. udpclient.Listen(processUdpMsg)
  112. log.Println("Udp服务监听", updport)
  113. if TimingTask {
  114. go historyTaskDay()
  115. }
  116. time.Sleep(99999 * time.Hour)
  117. }
  118. //测试组人员使用
  119. func mainT() {
  120. if TimingTask {
  121. go historyTaskDay()
  122. time.Sleep(99999 * time.Hour)
  123. } else {
  124. //IdType = true //打开id字符串模式
  125. sid := "1f16936d52c1d9fbf843c60e"
  126. eid := "9f16936d52c1d9fbf843c60e"
  127. log.Println("正常判重测试开始")
  128. log.Println(sid, "---", eid)
  129. mapinfo := map[string]interface{}{}
  130. if sid == "" || eid == "" {
  131. log.Println("sid,eid参数不能为空")
  132. os.Exit(0)
  133. }
  134. mapinfo["gtid"] = sid
  135. mapinfo["lteid"] = eid
  136. mapinfo["stop"] = "true"
  137. task([]byte{}, mapinfo)
  138. time.Sleep(99999 * time.Hour)
  139. }
  140. }
  141. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  142. fmt.Println("接受的段数据")
  143. switch act {
  144. case mu.OP_TYPE_DATA: //上个节点的数据
  145. //从表中开始处理
  146. var mapInfo map[string]interface{}
  147. err := json.Unmarshal(data, &mapInfo)
  148. log.Println("err:", err, "mapInfo:", mapInfo)
  149. if err != nil {
  150. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  151. } else if mapInfo != nil {
  152. taskType := util.ObjToString(mapInfo["stype"])
  153. if taskType == "normalTask" {
  154. //判重流程
  155. go task(data, mapInfo)
  156. } else {
  157. //其他
  158. go task(data, mapInfo)
  159. }
  160. key, _ := mapInfo["key"].(string)
  161. if key == "" {
  162. key = "udpok"
  163. }
  164. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  165. }
  166. case mu.OP_NOOP: //下个节点回应
  167. ok := string(data)
  168. if ok != "" {
  169. log.Println("ok:", ok)
  170. udptaskmap.Delete(ok)
  171. }
  172. }
  173. }
  174. //开始判重程序
  175. func task(data []byte, mapInfo map[string]interface{}) {
  176. log.Println("开始数据判重")
  177. defer util.Catch()
  178. //区间id
  179. q := map[string]interface{}{
  180. "_id": map[string]interface{}{
  181. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  182. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  183. },
  184. }
  185. if IdType {
  186. q = map[string]interface{}{
  187. "_id": map[string]interface{}{
  188. "$gt": mapInfo["gtid"].(string),
  189. "$lte": mapInfo["lteid"].(string),
  190. },
  191. }
  192. }
  193. log.Println(mgo.DbName, extract, q)
  194. sess := mgo.GetMgoConn()
  195. defer mgo.DestoryMongoConn(sess)
  196. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  197. updateExtract := [][]map[string]interface{}{}
  198. pool := make(chan bool, threadNum)
  199. wg := &sync.WaitGroup{}
  200. n, repeateN := 0, 0
  201. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  202. if n%10000 == 0 {
  203. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  204. }
  205. source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
  206. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  207. repeateN++
  208. updateExtract = append(updateExtract, []map[string]interface{}{
  209. map[string]interface{}{
  210. "_id": tmp["_id"],
  211. },
  212. map[string]interface{}{
  213. "$set": map[string]interface{}{
  214. "repeat": 1,
  215. "dataging":0,
  216. "repeat_reason": "sourcewebsite为1,重复",
  217. },
  218. },
  219. })
  220. if len(updateExtract) >= 200 {
  221. mgo.UpSertBulk(extract, updateExtract...)
  222. updateExtract = [][]map[string]interface{}{}
  223. }
  224. tmp = make(map[string]interface{})
  225. continue
  226. }
  227. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  228. util.IntAll(tmp["dataging"]) == 1 {
  229. if util.IntAll(tmp["repeat"]) == 1 {
  230. repeateN++
  231. }
  232. tmp = make(map[string]interface{})
  233. continue
  234. }
  235. pool <- true
  236. wg.Add(1)
  237. go func(tmp map[string]interface{}) {
  238. defer func() {
  239. <-pool
  240. wg.Done()
  241. }()
  242. info := NewInfo(tmp)
  243. if !LowHeavy { //是否进行低质量数据判重
  244. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  245. updateExtract = append(updateExtract, []map[string]interface{}{
  246. map[string]interface{}{
  247. "_id": tmp["_id"],
  248. },
  249. map[string]interface{}{
  250. "$set": map[string]interface{}{
  251. "repeat": -1, //无效数据标签
  252. },
  253. },
  254. })
  255. if len(updateExtract) >= 200 {
  256. mgo.UpSertBulk(extract, updateExtract...)
  257. updateExtract = [][]map[string]interface{}{}
  258. }
  259. return
  260. }
  261. }
  262. //正常判重
  263. b, source, reason := DM.check(info)
  264. if b { //有重复,生成更新语句,更新抽取和更新招标
  265. repeateN++
  266. var updateID = map[string]interface{}{} //记录更新判重的
  267. updateID["_id"] = StringTOBsonId(info.id)
  268. if IdType {
  269. updateID["_id"] = info.id
  270. }
  271. repeat_ids:=source.repeat_ids
  272. repeat_ids = append(repeat_ids,info.id)
  273. source.repeat_ids = repeat_ids
  274. //替换数据池-更新
  275. DM.replacePoolData(source)
  276. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  277. map[string]interface{}{
  278. "_id": StringTOBsonId(source.id),
  279. },
  280. map[string]interface{}{
  281. "$set": map[string]interface{}{
  282. "repeat_ids": repeat_ids,
  283. },
  284. },
  285. })
  286. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  287. updateID,
  288. map[string]interface{}{
  289. "$set": map[string]interface{}{
  290. "repeat": 1,
  291. "repeat_reason": reason,
  292. "repeat_id": source.id,
  293. },
  294. },
  295. })
  296. //是否合并-低质量数据不合并
  297. if isMerger && !strings.Contains(reason,"低质量"){
  298. newData, update_map ,isReplace := mergeDataFields(source, info)
  299. if isReplace {//替换-数据池
  300. fmt.Println("合并更新的id:",source.id)
  301. //数据池 - 替换
  302. DM.replacePoolData(newData)
  303. //mongo更新 - 具体字段 - merge
  304. mgo.UpdateById(extract,source.id,update_map)
  305. //发udp 更新索引
  306. //for _, to := range nextNode {
  307. // key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
  308. // by, _ := json.Marshal(map[string]interface{}{
  309. // "gtid": source.id,
  310. // "lteid": source.id,
  311. // "stype": "biddingall",
  312. // "key": key,
  313. // })
  314. // addr := &net.UDPAddr{
  315. // IP: net.ParseIP(to["addr"].(string)),
  316. // Port: util.IntAll(to["port"]),
  317. // }
  318. // node := &udpNode{by, addr, time.Now().Unix(), 0}
  319. // udptaskmap.Store(key, node)
  320. // udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  321. //}
  322. }
  323. }
  324. }
  325. }(tmp)
  326. if len(updateExtract) >= 200 {
  327. mgo.UpSertBulk(extract, updateExtract...)
  328. updateExtract = [][]map[string]interface{}{}
  329. }
  330. tmp = make(map[string]interface{})
  331. }
  332. wg.Wait()
  333. if len(updateExtract) > 0 {
  334. mgo.UpSertBulk(extract, updateExtract...)
  335. }
  336. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  337. time.Sleep(30 * time.Second)
  338. //任务完成,开始发送广播通知下面节点
  339. if n >= repeateN && mapInfo["stop"] == nil {
  340. log.Println("判重任务完成发送udp")
  341. for _, to := range nextNode {
  342. sid, _ := mapInfo["gtid"].(string)
  343. eid, _ := mapInfo["lteid"].(string)
  344. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  345. by, _ := json.Marshal(map[string]interface{}{
  346. "gtid": sid,
  347. "lteid": eid,
  348. "stype": util.ObjToString(to["stype"]),
  349. "key": key,
  350. })
  351. addr := &net.UDPAddr{
  352. IP: net.ParseIP(to["addr"].(string)),
  353. Port: util.IntAll(to["port"]),
  354. }
  355. node := &udpNode{by, addr, time.Now().Unix(), 0}
  356. udptaskmap.Store(key, node)
  357. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  358. }
  359. }
  360. }
  361. func historyTaskDay() {
  362. defer util.Catch()
  363. for {
  364. start:=time.Now().Unix()
  365. if gtid=="" {
  366. log.Println("请传gtid,否则无法运行")
  367. os.Exit(0)
  368. return
  369. }
  370. if lteid!="" {
  371. //先进行数据迁移
  372. log.Println("开启一次迁移任务",gtid,lteid)
  373. moveHistoryData(gtid,lteid)
  374. gtid = lteid //替换数据
  375. }
  376. //查询表最后一个id
  377. task_sess := task_mgo.GetMgoConn()
  378. defer task_mgo.DestoryMongoConn(task_sess)
  379. q:=map[string]interface{}{
  380. "isused":true,
  381. }
  382. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  383. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  384. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  385. lteid = util.ObjToString(tmp["gtid"])
  386. log.Println("查询的最后一个任务Id:",lteid)
  387. break
  388. }
  389. //
  390. log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
  391. time.Sleep(5 * time.Minute)
  392. sess := mgo.GetMgoConn()//连接器
  393. defer mgo.DestoryMongoConn(sess)
  394. //开始判重
  395. q = map[string]interface{}{
  396. "_id": map[string]interface{}{
  397. "$gt": StringTOBsonId(gtid),
  398. "$lte": StringTOBsonId(lteid),
  399. },
  400. }
  401. log.Println("历史判重查询条件:",q,"时间:", between_time)
  402. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  403. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  404. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  405. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  406. dayArr := []map[string]interface{}{}
  407. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  408. if num%10000 == 0 {
  409. log.Println("正序遍历:", num)
  410. }
  411. source := util.ObjToMap(tmp["jsondata"])
  412. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  413. outnum++
  414. updatelock.Lock()
  415. updateExtract = append(updateExtract, []map[string]interface{}{
  416. map[string]interface{}{
  417. "_id": tmp["_id"],
  418. },
  419. map[string]interface{}{
  420. "$set": map[string]interface{}{
  421. "repeat": 1,
  422. "dataging": 0,
  423. "repeat_reason": "sourcewebsite为1 重复",
  424. },
  425. },
  426. })
  427. if len(updateExtract) > 200 {
  428. log.Println("sourcewebsite,批量更新")
  429. mgo.UpSertBulk(extract, updateExtract...)
  430. updateExtract = [][]map[string]interface{}{}
  431. }
  432. updatelock.Unlock()
  433. tmp = make(map[string]interface{})
  434. continue
  435. }
  436. //取-符合-发布时间X年内的数据
  437. updatelock.Lock()
  438. if util.IntAll(tmp["dataging"]) == 1 {
  439. pubtime := util.Int64All(tmp["publishtime"])
  440. if pubtime > 0 && pubtime >= between_time {
  441. oknum++
  442. if deterTime==0 {
  443. log.Println("找到第一条符合条件的数据")
  444. deterTime = util.Int64All(tmp["publishtime"])
  445. dayArr = append(dayArr,tmp)
  446. }else {
  447. if pubtime-deterTime >timingSpanDay*86400 {
  448. //新数组重新构建,当前组数据加到全部组数据
  449. pendAllArr = append(pendAllArr,dayArr)
  450. dayArr = []map[string]interface{}{}
  451. deterTime = util.Int64All(tmp["publishtime"])
  452. dayArr = append(dayArr,tmp)
  453. }else {
  454. dayArr = append(dayArr,tmp)
  455. }
  456. }
  457. }else {
  458. outnum++
  459. //不在两年内的也清标记
  460. updateExtract = append(updateExtract, []map[string]interface{}{
  461. map[string]interface{}{
  462. "_id": tmp["_id"],
  463. },
  464. map[string]interface{}{
  465. "$set": map[string]interface{}{
  466. "dataging": 0,
  467. },
  468. },
  469. })
  470. if len(updateExtract) > 200 {
  471. log.Println("不在周期内符合dataging==1,批量更新")
  472. mgo.UpSertBulk(extract, updateExtract...)
  473. updateExtract = [][]map[string]interface{}{}
  474. }
  475. }
  476. }
  477. updatelock.Unlock()
  478. tmp = make(map[string]interface{})
  479. }
  480. //批量更新标记
  481. updatelock.Lock()
  482. if len(updateExtract) > 0 {
  483. log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
  484. mgo.UpSertBulk(extract, updateExtract...)
  485. updateExtract = [][]map[string]interface{}{}
  486. }
  487. updatelock.Unlock()
  488. if len(dayArr)>0 {
  489. pendAllArr = append(pendAllArr,dayArr)
  490. dayArr = []map[string]interface{}{}
  491. }
  492. log.Println("查询数量:",num,"符合条件:",oknum)
  493. if len(pendAllArr) <= 0 {
  494. log.Println("没找到dataging==1的数据")
  495. }
  496. //测试分组数量是否正确
  497. testNum:=0
  498. for k,v:=range pendAllArr {
  499. log.Println("第",k,"组--","数量:",len(v))
  500. testNum = testNum+len(v)
  501. }
  502. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  503. n, repeateN := 0, 0
  504. log.Println("线程数:",threadNum)
  505. pool := make(chan bool, threadNum)
  506. wg := &sync.WaitGroup{}
  507. for k,v:=range pendAllArr { //每组结束更新一波数据
  508. pool <- true
  509. wg.Add(1)
  510. go func(k int, v []map[string]interface{}) {
  511. defer func() {
  512. <-pool
  513. wg.Done()
  514. }()
  515. //每组临时数组 - 互不干扰
  516. groupUpdateExtract := [][]map[string]interface{}{}
  517. //
  518. groupOtherExtract := [][]map[string]interface{}{}
  519. //构建当前组的数据池
  520. log.Println("构建第", k, "组---(数据池)")
  521. //当前组的第一个发布时间
  522. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  523. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  524. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  525. n = n + len(v)
  526. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  527. for _, tmp := range v {
  528. info := NewInfo(tmp)
  529. if !LowHeavy { //是否进行低质量数据判重
  530. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  531. log.Println("无效数据")
  532. updatelock.Lock()
  533. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  534. map[string]interface{}{
  535. "_id": tmp["_id"],
  536. },
  537. map[string]interface{}{
  538. "$set": map[string]interface{}{
  539. "repeat": -1, //无效数据标签
  540. "dataging": 0,
  541. },
  542. },
  543. })
  544. if len(groupUpdateExtract) > 200 {
  545. mgo.UpSertBulk(extract, groupUpdateExtract...)
  546. groupUpdateExtract = [][]map[string]interface{}{}
  547. }
  548. updatelock.Unlock()
  549. return
  550. }
  551. }
  552. b, source, reason := curTM.check(info)
  553. if b { //有重复,生成更新语句,更新抽取和更新招标
  554. repeateN++
  555. //重复数据打标签
  556. repeat_ids:=source.repeat_ids
  557. repeat_ids = append(repeat_ids,info.id)
  558. source.repeat_ids = repeat_ids
  559. //替换数据池-更新
  560. DM.replacePoolData(source)
  561. updatelock.Lock()
  562. //更新数据源- 14 或者 15
  563. //判断是否在当前段落
  564. if judgeIsCurIds(gtid,lteid,source.id) {
  565. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
  566. map[string]interface{}{
  567. "_id": StringTOBsonId(source.id),
  568. },
  569. map[string]interface{}{
  570. "$set": map[string]interface{}{
  571. "repeat_ids": repeat_ids,
  572. },
  573. },
  574. })
  575. }else {
  576. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  577. map[string]interface{}{
  578. "_id": StringTOBsonId(source.id),
  579. },
  580. map[string]interface{}{
  581. "$set": map[string]interface{}{
  582. "repeat_ids": repeat_ids,
  583. },
  584. },
  585. })
  586. }
  587. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  588. map[string]interface{}{
  589. "_id": tmp["_id"],
  590. },
  591. map[string]interface{}{
  592. "$set": map[string]interface{}{
  593. "repeat": 1,
  594. "repeat_reason": reason,
  595. "repeat_id": source.id,
  596. "dataging": 0,
  597. },
  598. },
  599. })
  600. if len(groupUpdateExtract) > 200 {
  601. mgo.UpSertBulk(extract, groupUpdateExtract...)
  602. groupUpdateExtract = [][]map[string]interface{}{}
  603. }
  604. if len(groupOtherExtract) > 200 {
  605. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  606. groupOtherExtract = [][]map[string]interface{}{}
  607. }
  608. updatelock.Unlock()
  609. } else {
  610. updatelock.Lock()
  611. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  612. map[string]interface{}{
  613. "_id": tmp["_id"],
  614. },
  615. map[string]interface{}{
  616. "$set": map[string]interface{}{
  617. "dataging": 0, //符合条件的都为dataging==0
  618. },
  619. },
  620. })
  621. if len(groupUpdateExtract) > 200 {
  622. mgo.UpSertBulk(extract, groupUpdateExtract...)
  623. groupUpdateExtract = [][]map[string]interface{}{}
  624. }
  625. updatelock.Unlock()
  626. }
  627. }
  628. //每组数据结束-更新数据
  629. updatelock.Lock()
  630. if len(groupUpdateExtract) > 0 {
  631. mgo.UpSertBulk(extract, groupUpdateExtract...)
  632. }
  633. if len(groupOtherExtract) > 0 {
  634. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  635. }
  636. updatelock.Unlock()
  637. }(k, v)
  638. }
  639. wg.Wait()
  640. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  641. if n >= repeateN && gtid!=lteid{
  642. for _, to := range nextNode {
  643. next_sid := util.BsonIdToSId(gtid)
  644. next_eid := util.BsonIdToSId(lteid)
  645. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  646. by, _ := json.Marshal(map[string]interface{}{
  647. "gtid": next_sid,
  648. "lteid": next_eid,
  649. "stype": util.ObjToString(to["stype"]),
  650. "key": key,
  651. })
  652. addr := &net.UDPAddr{
  653. IP: net.ParseIP(to["addr"].(string)),
  654. Port: util.IntAll(to["port"]),
  655. }
  656. node := &udpNode{by, addr, time.Now().Unix(), 0}
  657. udptaskmap.Store(key, node)
  658. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  659. }
  660. }
  661. end:=time.Now().Unix()
  662. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  663. log.Println(gtid,lteid)
  664. if end-start<60*5 {
  665. log.Println("睡眠.............")
  666. time.Sleep(5 * time.Minute)
  667. }
  668. log.Println("继续下一段的历史判重")
  669. }
  670. }
  671. //判断是否在当前id段落
  672. func judgeIsCurIds (gtid string,lteid string,curid string) bool {
  673. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  674. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  675. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  676. if cur_time>gt_time&&cur_time<=lte_time {
  677. return true
  678. }
  679. return false
  680. }
  681. //迁移上一段数据
  682. func moveHistoryData(startid string,endid string) {
  683. sess := mgo.GetMgoConn()
  684. defer mgo.DestoryMongoConn(sess)
  685. year, month, day := time.Now().Date()
  686. q := map[string]interface{}{
  687. "_id": map[string]interface{}{
  688. "$gt": StringTOBsonId(startid),
  689. "$lte": StringTOBsonId(endid),
  690. },
  691. }
  692. log.Println(q)
  693. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  694. index := 0
  695. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  696. mgo.Save(extract_back, tmp)
  697. tmp = map[string]interface{}{}
  698. if index%1000 == 0 {
  699. log.Println("index", index)
  700. }
  701. }
  702. log.Println("save to", extract_back, " ok index", index)
  703. qv := map[string]interface{}{
  704. "comeintime": map[string]interface{}{
  705. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  706. },
  707. }
  708. delnum := mgo.Delete(extract, qv)
  709. log.Println("remove from ", extract, delnum)
  710. }
  711. func moveTimeoutData() {
  712. log.Println("部署迁移定时任务")
  713. c := cron.New()
  714. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  715. c.Start()
  716. }
  717. func moveOnceTimeOut() {
  718. log.Println("执行一次迁移超时数据")
  719. sess := mgo.GetMgoConn()
  720. defer mgo.DestoryMongoConn(sess)
  721. now:=time.Now()
  722. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  723. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  724. q := map[string]interface{}{
  725. "_id": map[string]interface{}{
  726. "$lt": StringTOBsonId(task_id),
  727. },
  728. }
  729. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  730. index := 0
  731. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  732. if index%10000 == 0 {
  733. log.Println("index", index)
  734. }
  735. del_id:=BsonTOStringId(tmp["_id"])
  736. mgo.Save("result_20200713", tmp)
  737. mgo.DeleteById("result_20200714",del_id)
  738. tmp = map[string]interface{}{}
  739. }
  740. log.Println("save and delete", " ok index", index)
  741. }