main.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. package main
/*
De-duplication of tender/bidding announcements.
*/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "gopkg.in/mgo.v2/bson"
  11. "log"
  12. mu "mfw/util"
  13. "net"
  14. "os"
  15. "qfw/util"
  16. "regexp"
  17. "strconv"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. Sysconfig map[string]interface{} //配置文件
  23. mconf map[string]interface{} //mongodb配置信息
  24. mgo *MongodbSim //mongodb操作对象
  25. task_mgo *MongodbSim //mongodb操作对象
  26. task_collName string
  27. extract string
  28. extract_back string
  29. udpclient mu.UdpClient //udp对象
  30. nextNode []map[string]interface{} //下节点数组
  31. dupdays = 7 //初始化判重范围
  32. DM *datamap //
  33. Update *updateInfo
  34. //正则筛选相关
  35. FilterRegTitle = regexp.MustCompile("^_$")
  36. FilterRegTitle_0 = regexp.MustCompile("^_$")
  37. FilterRegTitle_1 = regexp.MustCompile("^_$")
  38. FilterRegTitle_2 = regexp.MustCompile("^_$")
  39. isMerger bool //是否合并
  40. threadNum int //线程数量
  41. SiteMap map[string]map[string]interface{} //站点map
  42. LowHeavy bool //低质量数据判重
  43. TimingTask bool //是否定时任务
  44. timingSpanDay int64 //时间跨度
  45. timingPubScope int64 //发布时间周期
  46. gtid,lastid,gtept,ltept string //命令输入
  47. lteid string //历史增量属性
  48. IsFull bool //是否全量
  49. updatelock sync.Mutex //锁4
  50. )
  51. func init() {
  52. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  53. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  54. flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
  55. flag.StringVar(&ltept, "ltept", "", "全量lte发布时间") //全量区间pt
  56. flag.Parse()
  57. util.ReadConfig(&Sysconfig)
  58. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  59. task_mgo = &MongodbSim{
  60. MongodbAddr: task_mconf["task_addrName"].(string),
  61. DbName: task_mconf["task_dbName"].(string),
  62. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  63. }
  64. task_mgo.InitPool()
  65. task_collName = task_mconf["task_collName"].(string)
  66. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  67. mconf = Sysconfig["mongodb"].(map[string]interface{})
  68. mgo = &MongodbSim{
  69. MongodbAddr: mconf["addr"].(string),
  70. DbName: mconf["db"].(string),
  71. Size: util.IntAllDef(mconf["pool"], 10),
  72. }
  73. mgo.InitPool()
  74. extract = mconf["extract"].(string)
  75. extract_back = mconf["extract_back"].(string)
  76. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  77. //加载数据
  78. DM = NewDatamap(dupdays, lastid)
  79. //更新池
  80. Update = newUpdatePool()
  81. go Update.updateData()
  82. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  83. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  84. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  85. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  86. isMerger = Sysconfig["isMerger"].(bool)
  87. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  88. LowHeavy = Sysconfig["lowHeavy"].(bool)
  89. TimingTask = Sysconfig["timingTask"].(bool)
  90. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  91. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  92. //站点配置
  93. site := mconf["site"].(map[string]interface{})
  94. SiteMap = make(map[string]map[string]interface{}, 0)
  95. start := int(time.Now().Unix())
  96. sess_site := mgo.GetMgoConn()
  97. defer mgo.DestoryMongoConn(sess_site)
  98. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  99. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  100. data_map := map[string]interface{}{
  101. "area": util.ObjToString(site_dict["area"]),
  102. "city": util.ObjToString(site_dict["city"]),
  103. "district": util.ObjToString(site_dict["district"]),
  104. "sitetype": util.ObjToString(site_dict["sitetype"]),
  105. "level": util.ObjToString(site_dict["level"]),
  106. "weight": util.ObjToString(site_dict["weight"]),
  107. }
  108. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  109. }
  110. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  111. }
  112. func main() {
  113. go checkMapJob()
  114. updport := Sysconfig["udpport"].(string)
  115. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  116. udpclient.Listen(processUdpMsg)
  117. log.Println("Udp服务监听", updport)
  118. if TimingTask {
  119. log.Println("正常历史部署")
  120. go historyTaskDay()
  121. }else {
  122. if gtept!=""&&ltept!="" {
  123. log.Println("全量判重-准备开始")
  124. IsFull = true //全量判重
  125. sid := "1fffffffffffffffffffffff"
  126. eid := "9fffffffffffffffffffffff"
  127. mapinfo := map[string]interface{}{}
  128. if sid == "" || eid == "" {
  129. log.Println("sid,eid参数不能为空")
  130. os.Exit(0)
  131. }
  132. mapinfo["gtid"] = sid
  133. mapinfo["lteid"] = eid
  134. mapinfo["stop"] = "true"
  135. task([]byte{}, mapinfo)
  136. time.Sleep(99999 * time.Hour)
  137. }else {
  138. //正常增量
  139. log.Println("正常增量部署")
  140. }
  141. }
  142. time.Sleep(99999 * time.Hour)
  143. }
  144. //测试组人员使用
  145. func mainT() {
  146. if TimingTask {
  147. go historyTaskDay()
  148. time.Sleep(99999 * time.Hour)
  149. } else {
  150. IsFull = true //全量判重
  151. sid := "1fffffffffffffffffffffff"
  152. eid := "9fffffffffffffffffffffff"
  153. mapinfo := map[string]interface{}{}
  154. if sid == "" || eid == "" {
  155. log.Println("sid,eid参数不能为空")
  156. os.Exit(0)
  157. }
  158. mapinfo["gtid"] = sid
  159. mapinfo["lteid"] = eid
  160. mapinfo["stop"] = "true"
  161. log.Println("测试:全量判重-准备开始")
  162. task([]byte{}, mapinfo)
  163. time.Sleep(99999 * time.Hour)
  164. }
  165. }
  166. //upd接收
  167. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  168. fmt.Println("接受的段数据")
  169. switch act {
  170. case mu.OP_TYPE_DATA: //上个节点的数据
  171. //从表中开始处理
  172. var mapInfo map[string]interface{}
  173. err := json.Unmarshal(data, &mapInfo)
  174. log.Println("err:", err, "mapInfo:", mapInfo)
  175. if err != nil {
  176. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  177. } else if mapInfo != nil {
  178. go task(data, mapInfo)
  179. key, _ := mapInfo["key"].(string)
  180. if key == "" {
  181. key = "udpok"
  182. }
  183. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  184. }
  185. case mu.OP_NOOP: //下个节点回应
  186. ok := string(data)
  187. if ok != "" {
  188. log.Println("ok:", ok)
  189. udptaskmap.Delete(ok)
  190. }
  191. }
  192. }
  193. //开始判重程序
  194. func task(data []byte, mapInfo map[string]interface{}) {
  195. log.Println("开始数据判重")
  196. defer util.Catch()
  197. //区间id
  198. q := map[string]interface{}{
  199. "_id": map[string]interface{}{
  200. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  201. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  202. },
  203. }
  204. //全量
  205. if IsFull && gtept!="" && ltept!=""{
  206. log.Println("执行全量分段模式")
  207. log.Println(gtept,"---",ltept)
  208. q = map[string]interface{}{
  209. "publishtime": map[string]interface{}{
  210. "$gte": util.Int64All(gtept),
  211. "$lte": util.Int64All(ltept),
  212. },
  213. }
  214. }
  215. log.Println("查询条件:",mgo.DbName, extract, q)
  216. sess := mgo.GetMgoConn()
  217. defer mgo.DestoryMongoConn(sess)
  218. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  219. pool := make(chan bool, threadNum)
  220. wg := &sync.WaitGroup{}
  221. n, repeateN := 0, 0
  222. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  223. if n%100 == 0 {
  224. log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
  225. }
  226. if util.IntAll(tmp["repeat"]) == 1 {
  227. repeateN++
  228. tmp = make(map[string]interface{})
  229. continue
  230. }
  231. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  232. tmp = make(map[string]interface{})
  233. continue
  234. }
  235. pool <- true
  236. wg.Add(1)
  237. go func(tmp map[string]interface{}) {
  238. defer func() {
  239. <-pool
  240. wg.Done()
  241. }()
  242. info := NewInfo(tmp)
  243. //正常判重
  244. b, source, reason := DM.check(info)
  245. if b {
  246. repeateN++
  247. var updateID = map[string]interface{}{} //记录更新判重的
  248. updateID["_id"] = StringTOBsonId(info.id)
  249. repeat_ids:=source.repeat_ids
  250. repeat_ids = append(repeat_ids,info.id)
  251. source.repeat_ids = repeat_ids
  252. //替换数据池-更新
  253. DM.replacePoolData(source)
  254. Update.updatePool <- []map[string]interface{}{//原始数据打标签
  255. map[string]interface{}{
  256. "_id": StringTOBsonId(source.id),
  257. },
  258. map[string]interface{}{
  259. "$set": map[string]interface{}{
  260. "repeat_ids": repeat_ids,
  261. },
  262. },
  263. }
  264. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  265. updateID,
  266. map[string]interface{}{
  267. "$set": map[string]interface{}{
  268. "repeat": 1,
  269. "repeat_reason": reason,
  270. "repeat_id": source.id,
  271. "dataging": 0,
  272. },
  273. },
  274. }
  275. }
  276. }(tmp)
  277. tmp = make(map[string]interface{})
  278. }
  279. wg.Wait()
  280. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  281. time.Sleep(30 * time.Second)
  282. //更新Ocr的标记
  283. updateOcrFileData(mapInfo["lteid"].(string))
  284. //任务完成,开始发送广播通知下面节点
  285. if n >= repeateN && mapInfo["stop"] == nil {
  286. log.Println("判重任务完成发送udp")
  287. for _, to := range nextNode {
  288. sid, _ := mapInfo["gtid"].(string)
  289. eid, _ := mapInfo["lteid"].(string)
  290. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  291. by, _ := json.Marshal(map[string]interface{}{
  292. "gtid": sid,
  293. "lteid": eid,
  294. "stype": util.ObjToString(to["stype"]),
  295. "key": key,
  296. })
  297. addr := &net.UDPAddr{
  298. IP: net.ParseIP(to["addr"].(string)),
  299. Port: util.IntAll(to["port"]),
  300. }
  301. node := &udpNode{by, addr, time.Now().Unix(), 0}
  302. udptaskmap.Store(key, node)
  303. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  304. }
  305. }
  306. }
  307. func updateOcrFileData(cur_lteid string) {
  308. //更新ocr 分类表-判重的状态
  309. log.Println("开始更新Ocr表-标记",cur_lteid)
  310. task_sess := task_mgo.GetMgoConn()
  311. defer task_mgo.DestoryMongoConn(task_sess)
  312. q_task:=map[string]interface{}{}
  313. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
  314. isUpdateOcr:=false
  315. updateOcrFile:=[][]map[string]interface{}{}
  316. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  317. cur_id := BsonTOStringId(tmp["_id"])
  318. lteid:=util.ObjToString(tmp["lteid"])
  319. if (lteid==cur_lteid) { //需要更新
  320. log.Println("找到该lteid数据",cur_lteid,cur_id)
  321. isUpdateOcr = true
  322. updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
  323. map[string]interface{}{
  324. "_id": tmp["_id"],
  325. },
  326. map[string]interface{}{
  327. "$set": map[string]interface{}{
  328. "is_repeat_status": 1,
  329. "is_repeat_time" : util.Int64All(time.Now().Unix()),
  330. },
  331. },
  332. })
  333. tmp = make(map[string]interface{})
  334. break
  335. }else {
  336. tmp = make(map[string]interface{})
  337. }
  338. }
  339. if !isUpdateOcr {
  340. log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
  341. }else {
  342. if len(updateOcrFile) > 0 {
  343. task_mgo.UpSertBulk(task_collName, updateOcrFile...)
  344. }
  345. }
  346. }
  347. //历史判重
  348. func historyTaskDay() {
  349. defer util.Catch()
  350. for {
  351. start:=time.Now().Unix()
  352. if gtid=="" {
  353. log.Println("请传gtid,否则无法运行")
  354. os.Exit(0)
  355. return
  356. }
  357. if lteid!="" {
  358. //先进行数据迁移
  359. log.Println("开启一次迁移任务",gtid,lteid)
  360. moveHistoryData(gtid,lteid)
  361. gtid = lteid //替换数据
  362. }
  363. //查询表最后一个id
  364. task_sess := task_mgo.GetMgoConn()
  365. defer task_mgo.DestoryMongoConn(task_sess)
  366. q:=map[string]interface{}{}
  367. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  368. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  369. isRepeatStatus:=false
  370. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  371. is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
  372. if is_repeat_status == 1 {
  373. lteid = util.ObjToString(tmp["lteid"])
  374. log.Println("查询的最后一个已标记的任务lteid:",lteid)
  375. isRepeatStatus = true
  376. tmp = make(map[string]interface{})
  377. break
  378. }else {
  379. tmp = make(map[string]interface{})
  380. }
  381. }
  382. if !isRepeatStatus {
  383. log.Println("查询不到有标记的lteid数据")
  384. log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
  385. time.Sleep(5 * time.Minute)
  386. continue
  387. }
  388. log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
  389. time.Sleep(5 * time.Minute)
  390. sess := mgo.GetMgoConn()//连接器
  391. defer mgo.DestoryMongoConn(sess)
  392. //开始判重
  393. q = map[string]interface{}{
  394. "_id": map[string]interface{}{
  395. "$gt": StringTOBsonId(gtid),
  396. "$lte": StringTOBsonId(lteid),
  397. },
  398. }
  399. log.Println("历史判重查询条件:",q,"时间:", between_time)
  400. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  401. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  402. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  403. dayArr := []map[string]interface{}{}
  404. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  405. if num%10000 == 0 {
  406. log.Println("正序遍历:", num)
  407. }
  408. //取-符合-发布时间X年内的数据
  409. if util.IntAll(tmp["dataging"]) == 1 {
  410. pubtime := util.Int64All(tmp["publishtime"])
  411. if pubtime > 0 && pubtime >= between_time {
  412. oknum++
  413. if deterTime==0 {
  414. log.Println("找到第一条符合条件的数据")
  415. deterTime = util.Int64All(tmp["publishtime"])
  416. dayArr = append(dayArr,tmp)
  417. }else {
  418. if pubtime-deterTime >timingSpanDay*86400 {
  419. //新数组重新构建,当前组数据加到全部组数据
  420. pendAllArr = append(pendAllArr,dayArr)
  421. dayArr = []map[string]interface{}{}
  422. deterTime = util.Int64All(tmp["publishtime"])
  423. dayArr = append(dayArr,tmp)
  424. }else {
  425. dayArr = append(dayArr,tmp)
  426. }
  427. }
  428. }else {
  429. outnum++
  430. //不在两年内的也清标记
  431. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  432. map[string]interface{}{
  433. "_id": tmp["_id"],
  434. },
  435. map[string]interface{}{
  436. "$set": map[string]interface{}{
  437. "dataging": 0,
  438. "history_updatetime":util.Int64All(time.Now().Unix()),
  439. },
  440. },
  441. }
  442. }
  443. }
  444. tmp = make(map[string]interface{})
  445. }
  446. if len(dayArr)>0 {
  447. pendAllArr = append(pendAllArr,dayArr)
  448. dayArr = []map[string]interface{}{}
  449. }
  450. log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
  451. if len(pendAllArr) <= 0 {
  452. log.Println("没找到dataging==1的数据")
  453. }
  454. //测试分组数量是否正确
  455. testNum:=0
  456. for k,v:=range pendAllArr {
  457. log.Println("第",k,"组--","数量:",len(v))
  458. testNum = testNum+len(v)
  459. }
  460. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  461. n, repeateN := 0, 0
  462. log.Println("线程数:",threadNum)
  463. pool := make(chan bool, threadNum)
  464. wg := &sync.WaitGroup{}
  465. for k,v:=range pendAllArr { //每组结束更新一波数据
  466. pool <- true
  467. wg.Add(1)
  468. go func(k int, v []map[string]interface{}) {
  469. defer func() {
  470. <-pool
  471. wg.Done()
  472. }()
  473. //相关ids 跨表
  474. groupOtherExtract := [][]map[string]interface{}{}
  475. //构建当前组的数据池
  476. log.Println("构建第", k, "组---(数据池)")
  477. //当前组的第一个发布时间
  478. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  479. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  480. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  481. n = n + len(v)
  482. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  483. for _, tmp := range v {
  484. info := NewInfo(tmp)
  485. b, source, reason := curTM.check(info)
  486. if b { //有重复,生成更新语句,更新抽取和更新招标
  487. repeateN++
  488. //重复数据打标签
  489. repeat_ids:=source.repeat_ids
  490. repeat_ids = append(repeat_ids,info.id)
  491. source.repeat_ids = repeat_ids
  492. updatelock.Lock()
  493. //替换数据池-更新
  494. DM.replacePoolData(source)
  495. //更新数据源
  496. //判断是否在当前段落
  497. if judgeIsCurIds(gtid,lteid,source.id) {
  498. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  499. map[string]interface{}{
  500. "_id": StringTOBsonId(source.id),
  501. },
  502. map[string]interface{}{
  503. "$set": map[string]interface{}{
  504. "repeat_ids": repeat_ids,
  505. },
  506. },
  507. }
  508. }else {
  509. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  510. map[string]interface{}{
  511. "_id": StringTOBsonId(source.id),
  512. },
  513. map[string]interface{}{
  514. "$set": map[string]interface{}{
  515. "repeat_ids": repeat_ids,
  516. },
  517. },
  518. })
  519. }
  520. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  521. map[string]interface{}{
  522. "_id": tmp["_id"],
  523. },
  524. map[string]interface{}{
  525. "$set": map[string]interface{}{
  526. "repeat": 1,
  527. "repeat_reason": reason,
  528. "repeat_id": source.id,
  529. "dataging": 0,
  530. "history_updatetime":util.Int64All(time.Now().Unix()),
  531. },
  532. },
  533. }
  534. if len(groupOtherExtract) >= 500 {
  535. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  536. groupOtherExtract = [][]map[string]interface{}{}
  537. }
  538. updatelock.Unlock()
  539. } else {
  540. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  541. map[string]interface{}{
  542. "_id": tmp["_id"],
  543. },
  544. map[string]interface{}{
  545. "$set": map[string]interface{}{
  546. "dataging": 0, //符合条件的都为dataging==0
  547. "history_updatetime":util.Int64All(time.Now().Unix()),
  548. },
  549. },
  550. }
  551. }
  552. }
  553. //每组数据结束-更新数据
  554. updatelock.Lock()
  555. if len(groupOtherExtract) > 0 {
  556. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  557. }
  558. updatelock.Unlock()
  559. }(k, v)
  560. }
  561. wg.Wait()
  562. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  563. time.Sleep(30 * time.Second)
  564. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  565. if n >= repeateN && gtid!=lteid{
  566. for _, to := range nextNode {
  567. next_sid := util.BsonIdToSId(gtid)
  568. next_eid := util.BsonIdToSId(lteid)
  569. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  570. by, _ := json.Marshal(map[string]interface{}{
  571. "gtid": next_sid,
  572. "lteid": next_eid,
  573. "stype": util.ObjToString(to["stype"]),
  574. "key": key,
  575. })
  576. addr := &net.UDPAddr{
  577. IP: net.ParseIP(to["addr"].(string)),
  578. Port: util.IntAll(to["port"]),
  579. }
  580. node := &udpNode{by, addr, time.Now().Unix(), 0}
  581. udptaskmap.Store(key, node)
  582. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  583. }
  584. }
  585. end:=time.Now().Unix()
  586. log.Println(gtid,lteid)
  587. if end-start<60*5 {
  588. log.Println("睡眠.............")
  589. time.Sleep(5 * time.Minute)
  590. }
  591. log.Println("继续下一段的历史判重")
  592. }
  593. }
  594. //判断是否在当前id段落
  595. func judgeIsCurIds (gtid string,lteid string,curid string) bool {
  596. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  597. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  598. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  599. if cur_time>=gt_time&&cur_time<=lte_time {
  600. return true
  601. }
  602. return false
  603. }
  604. //迁移上一段数据
  605. func moveHistoryData(startid string,endid string) {
  606. sess := mgo.GetMgoConn()
  607. defer mgo.DestoryMongoConn(sess)
  608. year, month, day := time.Now().Date()
  609. q := map[string]interface{}{
  610. "_id": map[string]interface{}{
  611. "$gt": StringTOBsonId(startid),
  612. "$lte": StringTOBsonId(endid),
  613. },
  614. }
  615. log.Println(q)
  616. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  617. index := 0
  618. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  619. mgo.Save(extract_back, tmp)
  620. tmp = map[string]interface{}{}
  621. if index%1000 == 0 {
  622. log.Println("index", index)
  623. }
  624. }
  625. log.Println("save to", extract_back, " ok index", index)
  626. qv := map[string]interface{}{
  627. "comeintime": map[string]interface{}{
  628. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  629. },
  630. }
  631. delnum := mgo.Delete(extract, qv)
  632. log.Println("remove from ", extract, delnum)
  633. }
  634. func moveTimeoutData() {
  635. log.Println("部署迁移定时任务")
  636. c := cron.New()
  637. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  638. c.Start()
  639. }
  640. func moveOnceTimeOut() {
  641. log.Println("执行一次迁移超时数据")
  642. sess := mgo.GetMgoConn()
  643. defer mgo.DestoryMongoConn(sess)
  644. now:=time.Now()
  645. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  646. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  647. q := map[string]interface{}{
  648. "_id": map[string]interface{}{
  649. "$lt": StringTOBsonId(task_id),
  650. },
  651. }
  652. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  653. index := 0
  654. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  655. if index%10000 == 0 {
  656. log.Println("index", index)
  657. }
  658. del_id:=BsonTOStringId(tmp["_id"])
  659. mgo.Save("result_20200713", tmp)
  660. mgo.DeleteById("result_20200714",del_id)
  661. tmp = map[string]interface{}{}
  662. }
  663. log.Println("save and delete", " ok index", index)
  664. }