// main.go (22 KB)
  1. package main
/*
Duplicate detection for tender (bidding) announcements.
*/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "github.com/cron"
  9. "gopkg.in/mgo.v2/bson"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. Sysconfig map[string]interface{} //配置文件
  22. mconf map[string]interface{} //mongodb配置信息
  23. mgo *MongodbSim //mongodb操作对象
  24. task_mgo *MongodbSim //mongodb操作对象
  25. task_collName string
  26. extract string
  27. extract_back string
  28. udpclient mu.UdpClient //udp对象
  29. nextNode []map[string]interface{} //下节点数组
  30. dupdays = 7 //初始化判重范围
  31. DM *datamap //
  32. Update *updateInfo
  33. //正则筛选相关
  34. FilterRegTitle = regexp.MustCompile("^_$")
  35. FilterRegTitle_0 = regexp.MustCompile("^_$")
  36. FilterRegTitle_1 = regexp.MustCompile("^_$")
  37. FilterRegTitle_2 = regexp.MustCompile("^_$")
  38. isMerger bool //是否合并
  39. threadNum int //线程数量
  40. SiteMap map[string]map[string]interface{} //站点map
  41. LowHeavy bool //低质量数据判重
  42. TimingTask bool //是否定时任务
  43. timingSpanDay int64 //时间跨度
  44. timingPubScope int64 //发布时间周期
  45. gtid,lastid,gtept,ltept string //命令输入
  46. lteid string //历史增量属性
  47. IsFull bool //是否全量
  48. updatelock sync.Mutex //锁4
  49. userName,passWord string //mongo -用户密码
  50. )
  51. var udptask chan struct{} = make(chan struct{}, 1)
  52. func init() {
  53. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  54. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  55. flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
  56. flag.StringVar(&ltept, "ltept", "", "全量lte发布时间") //全量区间pt
  57. flag.Parse()
  58. util.ReadConfig(&Sysconfig)
  59. userName = util.ObjToString(Sysconfig["userName"])
  60. passWord = util.ObjToString(Sysconfig["passWord"])
  61. log.Println("集群用户密码:",userName,passWord)
  62. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  63. task_mgo = &MongodbSim{
  64. MongodbAddr: task_mconf["task_addrName"].(string),
  65. DbName: task_mconf["task_dbName"].(string),
  66. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  67. UserName: userName,
  68. Password: passWord,
  69. }
  70. task_mgo.InitPool()
  71. task_collName = task_mconf["task_collName"].(string)
  72. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  73. mconf = Sysconfig["mongodb"].(map[string]interface{})
  74. mgo = &MongodbSim{
  75. MongodbAddr: mconf["addr"].(string),
  76. DbName: mconf["db"].(string),
  77. Size: util.IntAllDef(mconf["pool"], 10),
  78. }
  79. mgo.InitPool()
  80. extract = mconf["extract"].(string)
  81. extract_back = mconf["extract_back"].(string)
  82. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  83. //加载数据
  84. DM = NewDatamap(dupdays, lastid)
  85. //更新池
  86. Update = newUpdatePool()
  87. go Update.updateData()
  88. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  89. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  90. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  91. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  92. isMerger = Sysconfig["isMerger"].(bool)
  93. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  94. LowHeavy = Sysconfig["lowHeavy"].(bool)
  95. TimingTask = Sysconfig["timingTask"].(bool)
  96. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  97. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  98. //站点配置
  99. site := mconf["site"].(map[string]interface{})
  100. SiteMap = make(map[string]map[string]interface{}, 0)
  101. start := int(time.Now().Unix())
  102. sess_site := mgo.GetMgoConn()
  103. defer mgo.DestoryMongoConn(sess_site)
  104. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  105. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  106. data_map := map[string]interface{}{
  107. "area": util.ObjToString(site_dict["area"]),
  108. "city": util.ObjToString(site_dict["city"]),
  109. "district": util.ObjToString(site_dict["district"]),
  110. "sitetype": util.ObjToString(site_dict["sitetype"]),
  111. "level": util.ObjToString(site_dict["level"]),
  112. "weight": util.ObjToString(site_dict["weight"]),
  113. }
  114. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  115. }
  116. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  117. }
  118. func main() {
  119. go checkMapJob()
  120. updport := Sysconfig["udpport"].(string)
  121. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  122. udpclient.Listen(processUdpMsg)
  123. log.Println("Udp服务监听", updport)
  124. if TimingTask {
  125. log.Println("正常历史部署")
  126. go historyTaskDay()
  127. }else {
  128. if gtept!=""&&ltept!="" {
  129. log.Println("全量判重-准备开始")
  130. IsFull = true //全量判重
  131. sid := "1fffffffffffffffffffffff"
  132. eid := "9fffffffffffffffffffffff"
  133. mapinfo := map[string]interface{}{}
  134. if sid == "" || eid == "" {
  135. log.Println("sid,eid参数不能为空")
  136. os.Exit(0)
  137. }
  138. mapinfo["gtid"] = sid
  139. mapinfo["lteid"] = eid
  140. mapinfo["stop"] = "true"
  141. task([]byte{}, mapinfo)
  142. time.Sleep(99999 * time.Hour)
  143. }else {
  144. //正常增量
  145. log.Println("正常增量部署")
  146. }
  147. }
  148. time.Sleep(99999 * time.Hour)
  149. }
  150. //测试组人员使用
  151. func mainT() {
  152. if TimingTask {
  153. go historyTaskDay()
  154. time.Sleep(99999 * time.Hour)
  155. } else {
  156. IsFull = true //全量判重
  157. sid := "1fffffffffffffffffffffff"
  158. eid := "9fffffffffffffffffffffff"
  159. mapinfo := map[string]interface{}{}
  160. if sid == "" || eid == "" {
  161. log.Println("sid,eid参数不能为空")
  162. os.Exit(0)
  163. }
  164. mapinfo["gtid"] = sid
  165. mapinfo["lteid"] = eid
  166. mapinfo["stop"] = "true"
  167. log.Println("测试:全量判重-准备开始")
  168. task([]byte{}, mapinfo)
  169. time.Sleep(99999 * time.Hour)
  170. }
  171. }
  172. //upd接收
  173. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  174. select {
  175. case udptask <- struct{}{}:
  176. log.Println("...接收段落,通道正常...")
  177. switch act {
  178. case mu.OP_TYPE_DATA: //上个节点的数据
  179. var mapInfo map[string]interface{}
  180. err := json.Unmarshal(data, &mapInfo)
  181. if err != nil {
  182. log.Println("error data:", err)
  183. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  184. } else if mapInfo != nil {
  185. key, _ := mapInfo["key"].(string)
  186. if key == "" {
  187. key = "udpok"
  188. }
  189. log.Println("当前段落,需要判重...",mapInfo)
  190. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  191. task(data, mapInfo)
  192. }
  193. log.Println("此段任务结束...",err,mapInfo)
  194. <-udptask
  195. case mu.OP_NOOP: //下个节点回应
  196. ok := string(data)
  197. if ok != "" {
  198. log.Println("下节点回应-ok:", ok)
  199. udptaskmap.Delete(ok)
  200. }
  201. <-udptask
  202. }
  203. case <-time.After(2 * time.Second):
  204. switch act {
  205. case mu.OP_TYPE_DATA: //上个节点的数据
  206. log.Println("通道堵塞中...上节点")
  207. udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
  208. case mu.OP_NOOP: //下个节点回应
  209. log.Println("通道堵塞中...下节点")
  210. ok := string(data)
  211. if ok != "" {
  212. log.Println("下节点回应-ok:", ok)
  213. udptaskmap.Delete(ok)
  214. }
  215. }
  216. }
  217. //udptask <- struct{}{}
  218. //defer func() {
  219. // <-udptask
  220. //}()
  221. }
  222. //开始判重程序
  223. func task(data []byte, mapInfo map[string]interface{}) {
  224. log.Println("开始数据判重")
  225. defer util.Catch()
  226. //区间id
  227. q := map[string]interface{}{
  228. "_id": map[string]interface{}{
  229. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  230. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  231. },
  232. }
  233. //全量
  234. if IsFull && gtept!="" && ltept!=""{
  235. log.Println("执行全量分段模式")
  236. log.Println(gtept,"---",ltept)
  237. q = map[string]interface{}{
  238. "publishtime": map[string]interface{}{
  239. "$gte": util.Int64All(gtept),
  240. "$lte": util.Int64All(ltept),
  241. },
  242. }
  243. }
  244. log.Println("查询条件:",mgo.DbName, extract, q)
  245. sess := mgo.GetMgoConn()
  246. defer mgo.DestoryMongoConn(sess)
  247. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  248. pool := make(chan bool, threadNum)
  249. wg := &sync.WaitGroup{}
  250. n, repeateN := 0, 0
  251. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  252. if n%1000 == 0 {
  253. log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
  254. }
  255. if util.IntAll(tmp["repeat"]) == 1 {
  256. repeateN++
  257. tmp = make(map[string]interface{})
  258. continue
  259. }
  260. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  261. tmp = make(map[string]interface{})
  262. continue
  263. }
  264. pool <- true
  265. wg.Add(1)
  266. go func(tmp map[string]interface{}) {
  267. defer func() {
  268. <-pool
  269. wg.Done()
  270. }()
  271. info := NewInfo(tmp)
  272. //正常判重
  273. b, source, reason := DM.check(info)
  274. if b {
  275. repeateN++
  276. var updateID = map[string]interface{}{} //记录更新判重的
  277. updateID["_id"] = StringTOBsonId(info.id)
  278. repeat_ids:=source.repeat_ids
  279. repeat_ids = append(repeat_ids,info.id)
  280. source.repeat_ids = repeat_ids
  281. //替换数据池-更新
  282. DM.replacePoolData(source)
  283. Update.updatePool <- []map[string]interface{}{//原始数据打标签
  284. map[string]interface{}{
  285. "_id": StringTOBsonId(source.id),
  286. },
  287. map[string]interface{}{
  288. "$set": map[string]interface{}{
  289. "repeat_ids": repeat_ids,
  290. },
  291. },
  292. }
  293. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  294. updateID,
  295. map[string]interface{}{
  296. "$set": map[string]interface{}{
  297. "repeat": 1,
  298. "repeat_reason": reason,
  299. "repeat_id": source.id,
  300. "dataging": 0,
  301. "updatetime_repeat" :util.Int64All(time.Now().Unix()),
  302. },
  303. },
  304. }
  305. }
  306. }(tmp)
  307. tmp = make(map[string]interface{})
  308. }
  309. wg.Wait()
  310. log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  311. //log.Println("当前数据池的数量:",DM.currentTotalCount())
  312. //睡眠时间30s 目的是让数据池更新所有数据...
  313. time.Sleep(15 * time.Second)
  314. //更新Ocr的标记
  315. updateOcrFileData(mapInfo["lteid"].(string))
  316. //任务完成,开始发送广播通知下面节点
  317. if n >= repeateN && mapInfo["stop"] == nil {
  318. log.Println("判重任务完成发送udp")
  319. for _, to := range nextNode {
  320. sid, _ := mapInfo["gtid"].(string)
  321. eid, _ := mapInfo["lteid"].(string)
  322. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  323. by, _ := json.Marshal(map[string]interface{}{
  324. "gtid": sid,
  325. "lteid": eid,
  326. "stype": util.ObjToString(to["stype"]),
  327. "key": key,
  328. })
  329. addr := &net.UDPAddr{
  330. IP: net.ParseIP(to["addr"].(string)),
  331. Port: util.IntAll(to["port"]),
  332. }
  333. node := &udpNode{by, addr, time.Now().Unix(), 0}
  334. udptaskmap.Store(key, node)
  335. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  336. }
  337. }
  338. }
  339. func updateOcrFileData(cur_lteid string) {
  340. //更新ocr 分类表-判重的状态
  341. log.Println("开始更新Ocr表-标记",cur_lteid)
  342. task_sess := task_mgo.GetMgoConn()
  343. defer task_mgo.DestoryMongoConn(task_sess)
  344. q_task:=map[string]interface{}{}
  345. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
  346. isUpdateOcr:=false
  347. updateOcrFile:=[][]map[string]interface{}{}
  348. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  349. cur_id := BsonTOStringId(tmp["_id"])
  350. lteid:=util.ObjToString(tmp["lteid"])
  351. if (lteid==cur_lteid) { //需要更新
  352. log.Println("找到该lteid数据",cur_lteid,cur_id)
  353. isUpdateOcr = true
  354. updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
  355. map[string]interface{}{
  356. "_id": tmp["_id"],
  357. },
  358. map[string]interface{}{
  359. "$set": map[string]interface{}{
  360. "is_repeat_status": 1,
  361. "is_repeat_time" : util.Int64All(time.Now().Unix()),
  362. },
  363. },
  364. })
  365. tmp = make(map[string]interface{})
  366. break
  367. }else {
  368. tmp = make(map[string]interface{})
  369. }
  370. }
  371. if !isUpdateOcr {
  372. log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
  373. }else {
  374. if len(updateOcrFile) > 0 {
  375. task_mgo.UpSertBulk(task_collName, updateOcrFile...)
  376. }
  377. }
  378. }
  379. //历史判重
  380. func historyTaskDay() {
  381. defer util.Catch()
  382. for {
  383. start:=time.Now().Unix()
  384. if gtid=="" {
  385. log.Println("请传gtid,否则无法运行")
  386. os.Exit(0)
  387. return
  388. }
  389. if lteid!="" {
  390. //先进行数据迁移
  391. log.Println("开启一次迁移任务",gtid,lteid)
  392. moveHistoryData(gtid,lteid)
  393. gtid = lteid //替换数据
  394. }
  395. //查询表最后一个id
  396. task_sess := task_mgo.GetMgoConn()
  397. defer task_mgo.DestoryMongoConn(task_sess)
  398. q:=map[string]interface{}{}
  399. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  400. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  401. isRepeatStatus:=false
  402. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  403. is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
  404. if is_repeat_status == 1 {
  405. lteid = util.ObjToString(tmp["lteid"])
  406. log.Println("查询的最后一个已标记的任务lteid:",lteid)
  407. isRepeatStatus = true
  408. tmp = make(map[string]interface{})
  409. break
  410. }else {
  411. tmp = make(map[string]interface{})
  412. }
  413. }
  414. if !isRepeatStatus {
  415. log.Println("查询不到有标记的lteid数据")
  416. log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
  417. time.Sleep(5 * time.Minute)
  418. continue
  419. }
  420. log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
  421. time.Sleep(5 * time.Minute)
  422. sess := mgo.GetMgoConn()//连接器
  423. defer mgo.DestoryMongoConn(sess)
  424. //开始判重
  425. q = map[string]interface{}{
  426. "_id": map[string]interface{}{
  427. "$gt": StringTOBsonId(gtid),
  428. "$lte": StringTOBsonId(lteid),
  429. },
  430. }
  431. log.Println("历史判重查询条件:",q,"时间:", between_time)
  432. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  433. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  434. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  435. dayArr := []map[string]interface{}{}
  436. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  437. if num%10000 == 0 {
  438. log.Println("正序遍历:", num)
  439. }
  440. //取-符合-发布时间X年内的数据
  441. if util.IntAll(tmp["dataging"]) == 1 {
  442. pubtime := util.Int64All(tmp["publishtime"])
  443. if pubtime > 0 && pubtime >= between_time {
  444. oknum++
  445. if deterTime==0 {
  446. log.Println("找到第一条符合条件的数据")
  447. deterTime = util.Int64All(tmp["publishtime"])
  448. dayArr = append(dayArr,tmp)
  449. }else {
  450. if pubtime-deterTime >timingSpanDay*86400 {
  451. //新数组重新构建,当前组数据加到全部组数据
  452. pendAllArr = append(pendAllArr,dayArr)
  453. dayArr = []map[string]interface{}{}
  454. deterTime = util.Int64All(tmp["publishtime"])
  455. dayArr = append(dayArr,tmp)
  456. }else {
  457. dayArr = append(dayArr,tmp)
  458. }
  459. }
  460. }else {
  461. outnum++
  462. //不在两年内的也清标记
  463. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  464. map[string]interface{}{
  465. "_id": tmp["_id"],
  466. },
  467. map[string]interface{}{
  468. "$set": map[string]interface{}{
  469. "dataging": 0,
  470. "history_updatetime":util.Int64All(time.Now().Unix()),
  471. },
  472. },
  473. }
  474. }
  475. }
  476. tmp = make(map[string]interface{})
  477. }
  478. if len(dayArr)>0 {
  479. pendAllArr = append(pendAllArr,dayArr)
  480. dayArr = []map[string]interface{}{}
  481. }
  482. log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
  483. if len(pendAllArr) <= 0 {
  484. log.Println("没找到dataging==1的数据")
  485. }
  486. //测试分组数量是否正确
  487. testNum:=0
  488. for k,v:=range pendAllArr {
  489. log.Println("第",k,"组--","数量:",len(v))
  490. testNum = testNum+len(v)
  491. }
  492. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  493. n, repeateN := 0, 0
  494. log.Println("线程数:",threadNum)
  495. pool := make(chan bool, threadNum)
  496. wg := &sync.WaitGroup{}
  497. for k,v:=range pendAllArr { //每组结束更新一波数据
  498. pool <- true
  499. wg.Add(1)
  500. go func(k int, v []map[string]interface{}) {
  501. defer func() {
  502. <-pool
  503. wg.Done()
  504. }()
  505. //相关ids 跨表
  506. groupOtherExtract := [][]map[string]interface{}{}
  507. //构建当前组的数据池
  508. log.Println("构建第", k, "组---(数据池)")
  509. //当前组的第一个发布时间
  510. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  511. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  512. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  513. n = n + len(v)
  514. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  515. for _, tmp := range v {
  516. info := NewInfo(tmp)
  517. b, source, reason := curTM.check(info)
  518. if b { //有重复,生成更新语句,更新抽取和更新招标
  519. repeateN++
  520. //重复数据打标签
  521. repeat_ids:=source.repeat_ids
  522. repeat_ids = append(repeat_ids,info.id)
  523. source.repeat_ids = repeat_ids
  524. updatelock.Lock()
  525. //替换数据池-更新
  526. DM.replacePoolData(source)
  527. //更新数据源
  528. //判断是否在当前段落
  529. if judgeIsCurIds(gtid,lteid,source.id) {
  530. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  531. map[string]interface{}{
  532. "_id": StringTOBsonId(source.id),
  533. },
  534. map[string]interface{}{
  535. "$set": map[string]interface{}{
  536. "repeat_ids": repeat_ids,
  537. },
  538. },
  539. }
  540. }else {
  541. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  542. map[string]interface{}{
  543. "_id": StringTOBsonId(source.id),
  544. },
  545. map[string]interface{}{
  546. "$set": map[string]interface{}{
  547. "repeat_ids": repeat_ids,
  548. },
  549. },
  550. })
  551. }
  552. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  553. map[string]interface{}{
  554. "_id": tmp["_id"],
  555. },
  556. map[string]interface{}{
  557. "$set": map[string]interface{}{
  558. "repeat": 1,
  559. "repeat_reason": reason,
  560. "repeat_id": source.id,
  561. "dataging": 0,
  562. "history_updatetime":util.Int64All(time.Now().Unix()),
  563. },
  564. },
  565. }
  566. if len(groupOtherExtract) >= 500 {
  567. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  568. groupOtherExtract = [][]map[string]interface{}{}
  569. }
  570. updatelock.Unlock()
  571. } else {
  572. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  573. map[string]interface{}{
  574. "_id": tmp["_id"],
  575. },
  576. map[string]interface{}{
  577. "$set": map[string]interface{}{
  578. "dataging": 0, //符合条件的都为dataging==0
  579. "history_updatetime":util.Int64All(time.Now().Unix()),
  580. },
  581. },
  582. }
  583. }
  584. }
  585. //每组数据结束-更新数据
  586. updatelock.Lock()
  587. if len(groupOtherExtract) > 0 {
  588. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  589. }
  590. updatelock.Unlock()
  591. }(k, v)
  592. }
  593. wg.Wait()
  594. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  595. time.Sleep(30 * time.Second)
  596. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  597. if n >= repeateN && gtid!=lteid{
  598. for _, to := range nextNode {
  599. next_sid := util.BsonIdToSId(gtid)
  600. next_eid := util.BsonIdToSId(lteid)
  601. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  602. by, _ := json.Marshal(map[string]interface{}{
  603. "gtid": next_sid,
  604. "lteid": next_eid,
  605. "stype": util.ObjToString(to["stype"]),
  606. "key": key,
  607. })
  608. addr := &net.UDPAddr{
  609. IP: net.ParseIP(to["addr"].(string)),
  610. Port: util.IntAll(to["port"]),
  611. }
  612. node := &udpNode{by, addr, time.Now().Unix(), 0}
  613. udptaskmap.Store(key, node)
  614. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  615. }
  616. }
  617. end:=time.Now().Unix()
  618. log.Println(gtid,lteid)
  619. if end-start<60*5 {
  620. log.Println("睡眠.............")
  621. time.Sleep(5 * time.Minute)
  622. }
  623. log.Println("继续下一段的历史判重")
  624. }
  625. }
  626. //判断是否在当前id段落
  627. func judgeIsCurIds (gtid string,lteid string,curid string) bool {
  628. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  629. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  630. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  631. if cur_time>=gt_time&&cur_time<=lte_time {
  632. return true
  633. }
  634. return false
  635. }
  636. //迁移上一段数据
  637. func moveHistoryData(startid string,endid string) {
  638. sess := mgo.GetMgoConn()
  639. defer mgo.DestoryMongoConn(sess)
  640. year, month, day := time.Now().Date()
  641. q := map[string]interface{}{
  642. "_id": map[string]interface{}{
  643. "$gt": StringTOBsonId(startid),
  644. "$lte": StringTOBsonId(endid),
  645. },
  646. }
  647. log.Println(q)
  648. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  649. index := 0
  650. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  651. mgo.Save(extract_back, tmp)
  652. tmp = map[string]interface{}{}
  653. if index%1000 == 0 {
  654. log.Println("index", index)
  655. }
  656. }
  657. log.Println("save to", extract_back, " ok index", index)
  658. qv := map[string]interface{}{
  659. "comeintime": map[string]interface{}{
  660. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  661. },
  662. }
  663. delnum := mgo.Delete(extract, qv)
  664. log.Println("remove from ", extract, delnum)
  665. }
  666. func moveTimeoutData() {
  667. log.Println("部署迁移定时任务")
  668. c := cron.New()
  669. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  670. c.Start()
  671. }
  672. func moveOnceTimeOut() {
  673. log.Println("执行一次迁移超时数据")
  674. sess := mgo.GetMgoConn()
  675. defer mgo.DestoryMongoConn(sess)
  676. now:=time.Now()
  677. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  678. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  679. q := map[string]interface{}{
  680. "_id": map[string]interface{}{
  681. "$lt": StringTOBsonId(task_id),
  682. },
  683. }
  684. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  685. index := 0
  686. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  687. if index%10000 == 0 {
  688. log.Println("index", index)
  689. }
  690. del_id:=BsonTOStringId(tmp["_id"])
  691. mgo.Save("result_20200713", tmp)
  692. mgo.DeleteById("result_20200714",del_id)
  693. tmp = map[string]interface{}{}
  694. }
  695. log.Println("save and delete", " ok index", index)
  696. }