main.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "gopkg.in/mgo.v2/bson"
  11. "log"
  12. mu "mfw/util"
  13. "net"
  14. "os"
  15. "qfw/util"
  16. "regexp"
  17. "strconv"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. Sysconfig map[string]interface{} //配置文件
  23. mconf map[string]interface{} //mongodb配置信息
  24. mgo *MongodbSim //mongodb操作对象
  25. task_mgo *MongodbSim //mongodb操作对象
  26. task_collName string
  27. extract string
  28. extract_back string
  29. udpclient mu.UdpClient //udp对象
  30. nextNode []map[string]interface{} //下节点数组
  31. dupdays = 7 //初始化判重范围
  32. DM *datamap //
  33. //正则筛选相关
  34. FilterRegTitle = regexp.MustCompile("^_$")
  35. FilterRegTitle_0 = regexp.MustCompile("^_$")
  36. FilterRegTitle_1 = regexp.MustCompile("^_$")
  37. FilterRegTitle_2 = regexp.MustCompile("^_$")
  38. isMerger bool //是否合并
  39. threadNum int //线程数量
  40. SiteMap map[string]map[string]interface{} //站点map
  41. LowHeavy bool //低质量数据判重
  42. TimingTask bool //是否定时任务
  43. timingSpanDay int64 //时间跨度
  44. timingPubScope int64 //发布时间周期
  45. gtid,lteid,lastid,gtept,ltept string //命令输入
  46. IdType bool //默认object类型
  47. IsFull bool
  48. updatelock sync.Mutex //锁
  49. )
  50. func init() {
  51. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //以小于等于此id开始加载最近几天的数据
  52. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id")
  53. flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")
  54. flag.StringVar(&gtept, "ltept", "", "全量lte发布时间")
  55. flag.Parse()
  56. util.ReadConfig(&Sysconfig)
  57. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  58. task_mgo = &MongodbSim{
  59. MongodbAddr: task_mconf["task_addrName"].(string),
  60. DbName: task_mconf["task_dbName"].(string),
  61. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  62. }
  63. task_mgo.InitPool()
  64. task_collName = task_mconf["task_collName"].(string)
  65. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  66. mconf = Sysconfig["mongodb"].(map[string]interface{})
  67. mgo = &MongodbSim{
  68. MongodbAddr: mconf["addr"].(string),
  69. DbName: mconf["db"].(string),
  70. Size: util.IntAllDef(mconf["pool"], 10),
  71. }
  72. mgo.InitPool()
  73. extract = mconf["extract"].(string)
  74. extract_back = mconf["extract_back"].(string)
  75. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  76. //加载数据
  77. DM = NewDatamap(dupdays, lastid)
  78. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  79. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  80. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  81. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  82. isMerger = Sysconfig["isMerger"].(bool)
  83. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  84. LowHeavy = Sysconfig["lowHeavy"].(bool)
  85. TimingTask = Sysconfig["timingTask"].(bool)
  86. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  87. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  88. //站点配置
  89. site := mconf["site"].(map[string]interface{})
  90. SiteMap = make(map[string]map[string]interface{}, 0)
  91. start := int(time.Now().Unix())
  92. sess_site := mgo.GetMgoConn()
  93. defer mgo.DestoryMongoConn(sess_site)
  94. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  95. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  96. data_map := map[string]interface{}{
  97. "area": util.ObjToString(site_dict["area"]),
  98. "city": util.ObjToString(site_dict["city"]),
  99. "district": util.ObjToString(site_dict["district"]),
  100. "sitetype": util.ObjToString(site_dict["sitetype"]),
  101. "level": util.ObjToString(site_dict["level"]),
  102. "weight": util.ObjToString(site_dict["weight"]),
  103. }
  104. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  105. }
  106. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  107. }
  108. func main() {
  109. go checkMapJob()
  110. updport := Sysconfig["udpport"].(string)
  111. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  112. udpclient.Listen(processUdpMsg)
  113. log.Println("Udp服务监听", updport)
  114. if TimingTask {
  115. go historyTaskDay()
  116. }
  117. time.Sleep(99999 * time.Hour)
  118. }
  119. //测试组人员使用
  120. func mainT() {
  121. if TimingTask {
  122. go historyTaskDay()
  123. time.Sleep(99999 * time.Hour)
  124. } else {
  125. //IdType = true //打开id字符串模式
  126. IsFull = true //全量判重
  127. sid := "1fffffffffffffffffffffff"
  128. eid := "9fffffffffffffffffffffff"
  129. log.Println("正常判重测试开始")
  130. log.Println(sid, "---", eid)
  131. mapinfo := map[string]interface{}{}
  132. if sid == "" || eid == "" {
  133. log.Println("sid,eid参数不能为空")
  134. os.Exit(0)
  135. }
  136. mapinfo["gtid"] = sid
  137. mapinfo["lteid"] = eid
  138. mapinfo["stop"] = "true"
  139. task([]byte{}, mapinfo)
  140. time.Sleep(99999 * time.Hour)
  141. }
  142. }
  143. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  144. fmt.Println("接受的段数据")
  145. switch act {
  146. case mu.OP_TYPE_DATA: //上个节点的数据
  147. //从表中开始处理
  148. var mapInfo map[string]interface{}
  149. err := json.Unmarshal(data, &mapInfo)
  150. log.Println("err:", err, "mapInfo:", mapInfo)
  151. if err != nil {
  152. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  153. } else if mapInfo != nil {
  154. go task(data, mapInfo)
  155. key, _ := mapInfo["key"].(string)
  156. if key == "" {
  157. key = "udpok"
  158. }
  159. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  160. }
  161. case mu.OP_NOOP: //下个节点回应
  162. ok := string(data)
  163. if ok != "" {
  164. log.Println("ok:", ok)
  165. udptaskmap.Delete(ok)
  166. }
  167. }
  168. }
  169. //开始判重程序
  170. func task(data []byte, mapInfo map[string]interface{}) {
  171. log.Println("开始数据判重")
  172. defer util.Catch()
  173. //区间id
  174. q := map[string]interface{}{
  175. "_id": map[string]interface{}{
  176. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  177. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  178. },
  179. }
  180. if IdType {
  181. q = map[string]interface{}{
  182. "_id": map[string]interface{}{
  183. "$gt": mapInfo["gtid"].(string),
  184. "$lte": mapInfo["lteid"].(string),
  185. },
  186. }
  187. }
  188. //增量
  189. if IsFull && gtept!="" && ltept!=""{
  190. log.Println("执行分段模式")
  191. q = map[string]interface{}{
  192. "publishtime": map[string]interface{}{
  193. "$gte": util.Int64All(gtept),
  194. "$lte": util.Int64All(ltept),
  195. },
  196. }
  197. }
  198. log.Println(mgo.DbName, extract, q)
  199. sess := mgo.GetMgoConn()
  200. defer mgo.DestoryMongoConn(sess)
  201. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  202. updateExtract := [][]map[string]interface{}{}
  203. pool := make(chan bool, threadNum)
  204. wg := &sync.WaitGroup{}
  205. n, repeateN := 0, 0
  206. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  207. if n%10000 == 0 {
  208. log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
  209. }
  210. source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
  211. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  212. repeateN++
  213. updatelock.Lock()
  214. updateExtract = append(updateExtract, []map[string]interface{}{
  215. map[string]interface{}{
  216. "_id": tmp["_id"],
  217. },
  218. map[string]interface{}{
  219. "$set": map[string]interface{}{
  220. "repeat": 1,
  221. "dataging":0,
  222. "repeat_reason": "sourcewebsite为1,重复",
  223. },
  224. },
  225. })
  226. if len(updateExtract) >= 500 {
  227. mgo.UpSertBulk(extract, updateExtract...)
  228. updateExtract = [][]map[string]interface{}{}
  229. }
  230. updatelock.Unlock()
  231. tmp = make(map[string]interface{})
  232. continue
  233. }
  234. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  235. util.IntAll(tmp["dataging"]) == 1 {
  236. if util.IntAll(tmp["repeat"]) == 1 {
  237. repeateN++
  238. }
  239. tmp = make(map[string]interface{})
  240. continue
  241. }
  242. pool <- true
  243. wg.Add(1)
  244. go func(tmp map[string]interface{}) {
  245. defer func() {
  246. <-pool
  247. wg.Done()
  248. }()
  249. info := NewInfo(tmp)
  250. //正常判重
  251. b, source, reason := DM.check(info)
  252. if b { //有重复,生成更新语句,更新抽取和更新招标
  253. repeateN++
  254. var updateID = map[string]interface{}{} //记录更新判重的
  255. updateID["_id"] = StringTOBsonId(info.id)
  256. if IdType {
  257. updateID["_id"] = info.id
  258. }
  259. repeat_ids:=source.repeat_ids
  260. repeat_ids = append(repeat_ids,info.id)
  261. source.repeat_ids = repeat_ids
  262. //替换数据池-更新
  263. DM.replacePoolData(source)
  264. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  265. map[string]interface{}{
  266. "_id": StringTOBsonId(source.id),
  267. },
  268. map[string]interface{}{
  269. "$set": map[string]interface{}{
  270. "repeat_ids": repeat_ids,
  271. },
  272. },
  273. })
  274. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  275. updateID,
  276. map[string]interface{}{
  277. "$set": map[string]interface{}{
  278. "repeat": 1,
  279. "repeat_reason": reason,
  280. "repeat_id": source.id,
  281. },
  282. },
  283. })
  284. }
  285. }(tmp)
  286. updatelock.Lock()
  287. if len(updateExtract) >=500 {
  288. mgo.UpSertBulk(extract, updateExtract...)
  289. updateExtract = [][]map[string]interface{}{}
  290. }
  291. updatelock.Unlock()
  292. tmp = make(map[string]interface{})
  293. }
  294. wg.Wait()
  295. if len(updateExtract) > 0 {
  296. mgo.UpSertBulk(extract, updateExtract...)
  297. }
  298. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  299. time.Sleep(30 * time.Second)
  300. //任务完成,开始发送广播通知下面节点
  301. if n >= repeateN && mapInfo["stop"] == nil {
  302. log.Println("判重任务完成发送udp")
  303. for _, to := range nextNode {
  304. sid, _ := mapInfo["gtid"].(string)
  305. eid, _ := mapInfo["lteid"].(string)
  306. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  307. by, _ := json.Marshal(map[string]interface{}{
  308. "gtid": sid,
  309. "lteid": eid,
  310. "stype": util.ObjToString(to["stype"]),
  311. "key": key,
  312. })
  313. addr := &net.UDPAddr{
  314. IP: net.ParseIP(to["addr"].(string)),
  315. Port: util.IntAll(to["port"]),
  316. }
  317. node := &udpNode{by, addr, time.Now().Unix(), 0}
  318. udptaskmap.Store(key, node)
  319. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  320. }
  321. }
  322. }
  323. func historyTaskDay() {
  324. defer util.Catch()
  325. for {
  326. start:=time.Now().Unix()
  327. if gtid=="" {
  328. log.Println("请传gtid,否则无法运行")
  329. os.Exit(0)
  330. return
  331. }
  332. if lteid!="" {
  333. //先进行数据迁移
  334. log.Println("开启一次迁移任务",gtid,lteid)
  335. moveHistoryData(gtid,lteid)
  336. gtid = lteid //替换数据
  337. }
  338. //查询表最后一个id
  339. task_sess := task_mgo.GetMgoConn()
  340. defer task_mgo.DestoryMongoConn(task_sess)
  341. q:=map[string]interface{}{
  342. "isused":true,
  343. }
  344. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  345. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  346. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  347. lteid = util.ObjToString(tmp["gtid"])
  348. log.Println("查询的最后一个任务Id:",lteid)
  349. break
  350. }
  351. //
  352. log.Println("查询完毕-先睡眠5分钟",gtid,lteid)
  353. time.Sleep(5 * time.Minute)
  354. sess := mgo.GetMgoConn()//连接器
  355. defer mgo.DestoryMongoConn(sess)
  356. //开始判重
  357. q = map[string]interface{}{
  358. "_id": map[string]interface{}{
  359. "$gt": StringTOBsonId(gtid),
  360. "$lte": StringTOBsonId(lteid),
  361. },
  362. }
  363. log.Println("历史判重查询条件:",q,"时间:", between_time)
  364. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  365. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  366. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  367. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  368. dayArr := []map[string]interface{}{}
  369. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  370. if num%10000 == 0 {
  371. log.Println("正序遍历:", num)
  372. }
  373. source := util.ObjToMap(tmp["jsondata"])
  374. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  375. outnum++
  376. updatelock.Lock()
  377. updateExtract = append(updateExtract, []map[string]interface{}{
  378. map[string]interface{}{
  379. "_id": tmp["_id"],
  380. },
  381. map[string]interface{}{
  382. "$set": map[string]interface{}{
  383. "repeat": 1,
  384. "dataging": 0,
  385. "repeat_reason": "sourcewebsite为1 重复",
  386. },
  387. },
  388. })
  389. if len(updateExtract) > 200 {
  390. log.Println("sourcewebsite,批量更新")
  391. mgo.UpSertBulk(extract, updateExtract...)
  392. updateExtract = [][]map[string]interface{}{}
  393. }
  394. updatelock.Unlock()
  395. tmp = make(map[string]interface{})
  396. continue
  397. }
  398. //取-符合-发布时间X年内的数据
  399. updatelock.Lock()
  400. if util.IntAll(tmp["dataging"]) == 1 {
  401. pubtime := util.Int64All(tmp["publishtime"])
  402. if pubtime > 0 && pubtime >= between_time {
  403. oknum++
  404. if deterTime==0 {
  405. log.Println("找到第一条符合条件的数据")
  406. deterTime = util.Int64All(tmp["publishtime"])
  407. dayArr = append(dayArr,tmp)
  408. }else {
  409. if pubtime-deterTime >timingSpanDay*86400 {
  410. //新数组重新构建,当前组数据加到全部组数据
  411. pendAllArr = append(pendAllArr,dayArr)
  412. dayArr = []map[string]interface{}{}
  413. deterTime = util.Int64All(tmp["publishtime"])
  414. dayArr = append(dayArr,tmp)
  415. }else {
  416. dayArr = append(dayArr,tmp)
  417. }
  418. }
  419. }else {
  420. outnum++
  421. //不在两年内的也清标记
  422. updateExtract = append(updateExtract, []map[string]interface{}{
  423. map[string]interface{}{
  424. "_id": tmp["_id"],
  425. },
  426. map[string]interface{}{
  427. "$set": map[string]interface{}{
  428. "dataging": 0,
  429. },
  430. },
  431. })
  432. if len(updateExtract) > 200 {
  433. log.Println("不在周期内符合dataging==1,批量更新")
  434. mgo.UpSertBulk(extract, updateExtract...)
  435. updateExtract = [][]map[string]interface{}{}
  436. }
  437. }
  438. }
  439. updatelock.Unlock()
  440. tmp = make(map[string]interface{})
  441. }
  442. //批量更新标记
  443. updatelock.Lock()
  444. if len(updateExtract) > 0 {
  445. log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
  446. mgo.UpSertBulk(extract, updateExtract...)
  447. updateExtract = [][]map[string]interface{}{}
  448. }
  449. updatelock.Unlock()
  450. if len(dayArr)>0 {
  451. pendAllArr = append(pendAllArr,dayArr)
  452. dayArr = []map[string]interface{}{}
  453. }
  454. log.Println("查询数量:",num,"符合条件:",oknum)
  455. if len(pendAllArr) <= 0 {
  456. log.Println("没找到dataging==1的数据")
  457. }
  458. //测试分组数量是否正确
  459. testNum:=0
  460. for k,v:=range pendAllArr {
  461. log.Println("第",k,"组--","数量:",len(v))
  462. testNum = testNum+len(v)
  463. }
  464. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  465. n, repeateN := 0, 0
  466. log.Println("线程数:",threadNum)
  467. pool := make(chan bool, threadNum)
  468. wg := &sync.WaitGroup{}
  469. for k,v:=range pendAllArr { //每组结束更新一波数据
  470. pool <- true
  471. wg.Add(1)
  472. go func(k int, v []map[string]interface{}) {
  473. defer func() {
  474. <-pool
  475. wg.Done()
  476. }()
  477. //每组临时数组 - 互不干扰
  478. groupUpdateExtract := [][]map[string]interface{}{}
  479. //
  480. groupOtherExtract := [][]map[string]interface{}{}
  481. //构建当前组的数据池
  482. log.Println("构建第", k, "组---(数据池)")
  483. //当前组的第一个发布时间
  484. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  485. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  486. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  487. n = n + len(v)
  488. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  489. for _, tmp := range v {
  490. info := NewInfo(tmp)
  491. b, source, reason := curTM.check(info)
  492. if b { //有重复,生成更新语句,更新抽取和更新招标
  493. repeateN++
  494. //重复数据打标签
  495. repeat_ids:=source.repeat_ids
  496. repeat_ids = append(repeat_ids,info.id)
  497. source.repeat_ids = repeat_ids
  498. //替换数据池-更新
  499. DM.replacePoolData(source)
  500. updatelock.Lock()
  501. //更新数据源- 14 或者 15
  502. //判断是否在当前段落
  503. if judgeIsCurIds(gtid,lteid,source.id) {
  504. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
  505. map[string]interface{}{
  506. "_id": StringTOBsonId(source.id),
  507. },
  508. map[string]interface{}{
  509. "$set": map[string]interface{}{
  510. "repeat_ids": repeat_ids,
  511. },
  512. },
  513. })
  514. }else {
  515. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  516. map[string]interface{}{
  517. "_id": StringTOBsonId(source.id),
  518. },
  519. map[string]interface{}{
  520. "$set": map[string]interface{}{
  521. "repeat_ids": repeat_ids,
  522. },
  523. },
  524. })
  525. }
  526. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  527. map[string]interface{}{
  528. "_id": tmp["_id"],
  529. },
  530. map[string]interface{}{
  531. "$set": map[string]interface{}{
  532. "repeat": 1,
  533. "repeat_reason": reason,
  534. "repeat_id": source.id,
  535. "dataging": 0,
  536. },
  537. },
  538. })
  539. if len(groupUpdateExtract) > 200 {
  540. mgo.UpSertBulk(extract, groupUpdateExtract...)
  541. groupUpdateExtract = [][]map[string]interface{}{}
  542. }
  543. if len(groupOtherExtract) > 200 {
  544. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  545. groupOtherExtract = [][]map[string]interface{}{}
  546. }
  547. updatelock.Unlock()
  548. } else {
  549. updatelock.Lock()
  550. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  551. map[string]interface{}{
  552. "_id": tmp["_id"],
  553. },
  554. map[string]interface{}{
  555. "$set": map[string]interface{}{
  556. "dataging": 0, //符合条件的都为dataging==0
  557. },
  558. },
  559. })
  560. if len(groupUpdateExtract) > 200 {
  561. mgo.UpSertBulk(extract, groupUpdateExtract...)
  562. groupUpdateExtract = [][]map[string]interface{}{}
  563. }
  564. updatelock.Unlock()
  565. }
  566. }
  567. //每组数据结束-更新数据
  568. updatelock.Lock()
  569. if len(groupUpdateExtract) > 0 {
  570. mgo.UpSertBulk(extract, groupUpdateExtract...)
  571. }
  572. if len(groupOtherExtract) > 0 {
  573. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  574. }
  575. updatelock.Unlock()
  576. }(k, v)
  577. }
  578. wg.Wait()
  579. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  580. if n >= repeateN && gtid!=lteid{
  581. for _, to := range nextNode {
  582. next_sid := util.BsonIdToSId(gtid)
  583. next_eid := util.BsonIdToSId(lteid)
  584. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  585. by, _ := json.Marshal(map[string]interface{}{
  586. "gtid": next_sid,
  587. "lteid": next_eid,
  588. "stype": util.ObjToString(to["stype"]),
  589. "key": key,
  590. })
  591. addr := &net.UDPAddr{
  592. IP: net.ParseIP(to["addr"].(string)),
  593. Port: util.IntAll(to["port"]),
  594. }
  595. node := &udpNode{by, addr, time.Now().Unix(), 0}
  596. udptaskmap.Store(key, node)
  597. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  598. }
  599. }
  600. end:=time.Now().Unix()
  601. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  602. log.Println(gtid,lteid)
  603. if end-start<60*5 {
  604. log.Println("睡眠.............")
  605. time.Sleep(5 * time.Minute)
  606. }
  607. log.Println("继续下一段的历史判重")
  608. }
  609. }
  // judgeIsCurIds reports whether curid belongs to the id segment
  // (gtid, lteid].
  //
  // Only the first 8 characters of each id are compared, parsed as a
  // hexadecimal number (the creation-time prefix of an ObjectId hex string),
  // so ids sharing that prefix with gtid are treated as outside the segment
  // and ids sharing it with lteid as inside.
  //
  // An id that is shorter than 8 characters or whose prefix is not valid
  // hexadecimal makes the function return false. Previously a short id
  // panicked on the slice expression and an unparsable prefix was silently
  // treated as 0.
  func judgeIsCurIds(gtid string, lteid string, curid string) bool {
  	prefix := func(id string) (int64, bool) {
  		if len(id) < 8 {
  			return 0, false
  		}
  		v, err := strconv.ParseInt(id[:8], 16, 64)
  		return v, err == nil
  	}

  	gtTime, ok := prefix(gtid)
  	if !ok {
  		return false
  	}
  	lteTime, ok := prefix(lteid)
  	if !ok {
  		return false
  	}
  	curTime, ok := prefix(curid)
  	if !ok {
  		return false
  	}
  	return curTime > gtTime && curTime <= lteTime
  }
  620. //迁移上一段数据
  621. func moveHistoryData(startid string,endid string) {
  622. sess := mgo.GetMgoConn()
  623. defer mgo.DestoryMongoConn(sess)
  624. year, month, day := time.Now().Date()
  625. q := map[string]interface{}{
  626. "_id": map[string]interface{}{
  627. "$gt": StringTOBsonId(startid),
  628. "$lte": StringTOBsonId(endid),
  629. },
  630. }
  631. log.Println(q)
  632. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  633. index := 0
  634. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  635. mgo.Save(extract_back, tmp)
  636. tmp = map[string]interface{}{}
  637. if index%1000 == 0 {
  638. log.Println("index", index)
  639. }
  640. }
  641. log.Println("save to", extract_back, " ok index", index)
  642. qv := map[string]interface{}{
  643. "comeintime": map[string]interface{}{
  644. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  645. },
  646. }
  647. delnum := mgo.Delete(extract, qv)
  648. log.Println("remove from ", extract, delnum)
  649. }
  650. func moveTimeoutData() {
  651. log.Println("部署迁移定时任务")
  652. c := cron.New()
  653. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  654. c.Start()
  655. }
  656. func moveOnceTimeOut() {
  657. log.Println("执行一次迁移超时数据")
  658. sess := mgo.GetMgoConn()
  659. defer mgo.DestoryMongoConn(sess)
  660. now:=time.Now()
  661. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  662. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  663. q := map[string]interface{}{
  664. "_id": map[string]interface{}{
  665. "$lt": StringTOBsonId(task_id),
  666. },
  667. }
  668. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  669. index := 0
  670. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  671. if index%10000 == 0 {
  672. log.Println("index", index)
  673. }
  674. del_id:=BsonTOStringId(tmp["_id"])
  675. mgo.Save("result_20200713", tmp)
  676. mgo.DeleteById("result_20200714",del_id)
  677. tmp = map[string]interface{}{}
  678. }
  679. log.Println("save and delete", " ok index", index)
  680. }