main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "log"
  10. mu "mfw/util"
  11. "net"
  12. "os"
  13. "qfw/util"
  14. "regexp"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var (
  20. Sysconfig map[string]interface{} //配置文件
  21. mconf map[string]interface{} //mongodb配置信息
  22. mgo *MongodbSim //mongodb操作对象
  23. extract string
  24. extract_back string
  25. udpclient mu.UdpClient //udp对象
  26. nextNode []map[string]interface{} //下节点数组
  27. dupdays = 5 //初始化判重范围
  28. DM *datamap //
  29. //正则筛选相关
  30. FilterRegTitle = regexp.MustCompile("^_$")
  31. FilterRegTitle_0 = regexp.MustCompile("^_$")
  32. FilterRegTitle_1 = regexp.MustCompile("^_$")
  33. FilterRegTitle_2 = regexp.MustCompile("^_$")
  34. isMerger bool //是否合并
  35. threadNum int //线程数量
  36. SiteMap map[string]map[string]interface{} //站点map
  37. LowHeavy bool //低质量数据判重
  38. TimingTask bool //是否定时任务
  39. timingSpanDay int64 //时间跨度
  40. timingPubScope int64 //发布时间周期
  41. gtid,lteid,lastid string
  42. IdType bool //默认object类型
  43. )
  44. func init() {
  45. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  46. flag.StringVar(&gtid, "gtid", "", "历史的起始id")
  47. flag.Parse()
  48. //172.17.145.163:27080
  49. util.ReadConfig(&Sysconfig)
  50. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  51. mconf = Sysconfig["mongodb"].(map[string]interface{})
  52. mgo = &MongodbSim{
  53. MongodbAddr: mconf["addr"].(string),
  54. DbName: mconf["db"].(string),
  55. Size: util.IntAllDef(mconf["pool"], 10),
  56. }
  57. mgo.InitPool()
  58. extract = mconf["extract"].(string)
  59. extract_back = mconf["extract_back"].(string)
  60. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  61. //加载数据
  62. DM = NewDatamap(dupdays, lastid)
  63. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  64. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  65. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  66. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  67. isMerger = Sysconfig["isMerger"].(bool)
  68. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  69. LowHeavy = Sysconfig["lowHeavy"].(bool)
  70. TimingTask = Sysconfig["timingTask"].(bool)
  71. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  72. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  73. //站点配置
  74. site := mconf["site"].(map[string]interface{})
  75. SiteMap = make(map[string]map[string]interface{}, 0)
  76. start := int(time.Now().Unix())
  77. sess_site := mgo.GetMgoConn()
  78. defer mgo.DestoryMongoConn(sess_site)
  79. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  80. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  81. data_map := map[string]interface{}{
  82. "area": util.ObjToString(site_dict["area"]),
  83. "city": util.ObjToString(site_dict["city"]),
  84. "district": util.ObjToString(site_dict["district"]),
  85. "sitetype": util.ObjToString(site_dict["sitetype"]),
  86. "level": util.ObjToString(site_dict["level"]),
  87. "weight": util.ObjToString(site_dict["weight"]),
  88. }
  89. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  90. }
  91. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  92. }
  93. func main() {
  94. go checkMapJob()
  95. updport := Sysconfig["udpport"].(string)
  96. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  97. udpclient.Listen(processUdpMsg)
  98. log.Println("Udp服务监听", updport)
  99. if TimingTask {
  100. go historyTaskDay()
  101. }
  102. time.Sleep(99999 * time.Hour)
  103. }
  104. //测试组人员使用
  105. func mainT() {
  106. testRepairData11()
  107. return
  108. if TimingTask {
  109. log.Println("新历史任务测试开始")
  110. go historyTaskDay()
  111. //go timedTaskDay()
  112. time.Sleep(99999 * time.Hour)
  113. } else {
  114. //IdType = true //打开id字符串模式
  115. sid := "4f16936d52c1d9fbf843c60e"
  116. eid := "6f16936d52c1d9fbf843c60e"
  117. log.Println("正常判重测试开始")
  118. log.Println(sid, "---", eid)
  119. mapinfo := map[string]interface{}{}
  120. if sid == "" || eid == "" {
  121. log.Println("sid,eid参数不能为空")
  122. os.Exit(0)
  123. }
  124. mapinfo["gtid"] = sid
  125. mapinfo["lteid"] = eid
  126. mapinfo["stop"] = "true"
  127. task([]byte{}, mapinfo)
  128. time.Sleep(99999 * time.Hour)
  129. }
  130. }
  131. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  132. fmt.Println("接受的段数据")
  133. switch act {
  134. case mu.OP_TYPE_DATA: //上个节点的数据
  135. //从表中开始处理
  136. var mapInfo map[string]interface{}
  137. err := json.Unmarshal(data, &mapInfo)
  138. log.Println("err:", err, "mapInfo:", mapInfo)
  139. if err != nil {
  140. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  141. } else if mapInfo != nil {
  142. taskType := util.ObjToString(mapInfo["stype"])
  143. if taskType == "normalTask" {
  144. //判重流程
  145. go task(data, mapInfo)
  146. } else {
  147. //其他
  148. go task(data, mapInfo)
  149. }
  150. key, _ := mapInfo["key"].(string)
  151. if key == "" {
  152. key = "udpok"
  153. }
  154. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  155. }
  156. case mu.OP_NOOP: //下个节点回应
  157. ok := string(data)
  158. if ok != "" {
  159. log.Println("ok:", ok)
  160. udptaskmap.Delete(ok)
  161. }
  162. }
  163. }
  164. //开始判重程序
  165. func task(data []byte, mapInfo map[string]interface{}) {
  166. log.Println("开始数据判重")
  167. defer util.Catch()
  168. //区间id
  169. q := map[string]interface{}{
  170. "_id": map[string]interface{}{
  171. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  172. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  173. },
  174. }
  175. if IdType {
  176. q = map[string]interface{}{
  177. "_id": map[string]interface{}{
  178. "$gt": mapInfo["gtid"].(string),
  179. "$lte": mapInfo["lteid"].(string),
  180. },
  181. }
  182. }
  183. log.Println(mgo.DbName, extract, q)
  184. sess := mgo.GetMgoConn()
  185. defer mgo.DestoryMongoConn(sess)
  186. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  187. updateExtract := [][]map[string]interface{}{}
  188. pool := make(chan bool, threadNum)
  189. wg := &sync.WaitGroup{}
  190. n, repeateN := 0, 0
  191. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  192. if n%10000 == 0 {
  193. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  194. }
  195. source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
  196. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  197. repeateN++
  198. updateExtract = append(updateExtract, []map[string]interface{}{
  199. map[string]interface{}{
  200. "_id": tmp["_id"],
  201. },
  202. map[string]interface{}{
  203. "$set": map[string]interface{}{
  204. "repeat": 1,
  205. "dataging":0,
  206. "repeat_reason": "sourcewebsite为1,重复",
  207. },
  208. },
  209. })
  210. if len(updateExtract) >= 200 {
  211. mgo.UpSertBulk(extract, updateExtract...)
  212. updateExtract = [][]map[string]interface{}{}
  213. }
  214. tmp = make(map[string]interface{})
  215. continue
  216. }
  217. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  218. util.IntAll(tmp["dataging"]) == 1 {
  219. if util.IntAll(tmp["repeat"]) == 1 {
  220. repeateN++
  221. }
  222. tmp = make(map[string]interface{})
  223. continue
  224. }
  225. pool <- true
  226. wg.Add(1)
  227. go func(tmp map[string]interface{}) {
  228. defer func() {
  229. <-pool
  230. wg.Done()
  231. }()
  232. info := NewInfo(tmp)
  233. if !LowHeavy { //是否进行低质量数据判重
  234. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  235. updateExtract = append(updateExtract, []map[string]interface{}{
  236. map[string]interface{}{
  237. "_id": tmp["_id"],
  238. },
  239. map[string]interface{}{
  240. "$set": map[string]interface{}{
  241. "repeat": -1, //无效数据标签
  242. },
  243. },
  244. })
  245. if len(updateExtract) >= 200 {
  246. mgo.UpSertBulk(extract, updateExtract...)
  247. updateExtract = [][]map[string]interface{}{}
  248. }
  249. return
  250. }
  251. }
  252. //正常判重
  253. b, source, reason := DM.check(info)
  254. if b { //有重复,生成更新语句,更新抽取和更新招标
  255. repeateN++
  256. var updateID = map[string]interface{}{} //记录更新判重的
  257. updateID["_id"] = StringTOBsonId(info.id)
  258. if IdType {
  259. updateID["_id"] = info.id
  260. }
  261. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  262. updateID,
  263. map[string]interface{}{
  264. "$set": map[string]interface{}{
  265. "repeat": 1,
  266. "repeat_reason": reason,
  267. "repeat_id": source.id,
  268. },
  269. },
  270. })
  271. //是否合并-低质量数据不合并
  272. if isMerger && !strings.Contains(reason,"低质量"){
  273. newData, update_map ,isReplace := mergeDataFields(source, info)
  274. if isReplace {//替换-数据池
  275. fmt.Println("合并更新的id:",source.id)
  276. //数据池 - 替换
  277. DM.replacePoolData(newData)
  278. //mongo更新 - 具体字段 - merge
  279. mgo.UpdateById(extract,source.id,update_map)
  280. //发udp 更新索引
  281. //for _, to := range nextNode {
  282. // key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
  283. // by, _ := json.Marshal(map[string]interface{}{
  284. // "gtid": source.id,
  285. // "lteid": source.id,
  286. // "stype": "biddingall",
  287. // "key": key,
  288. // })
  289. // addr := &net.UDPAddr{
  290. // IP: net.ParseIP(to["addr"].(string)),
  291. // Port: util.IntAll(to["port"]),
  292. // }
  293. // node := &udpNode{by, addr, time.Now().Unix(), 0}
  294. // udptaskmap.Store(key, node)
  295. // udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  296. //}
  297. }
  298. }
  299. }
  300. }(tmp)
  301. if len(updateExtract) >= 200 {
  302. mgo.UpSertBulk(extract, updateExtract...)
  303. updateExtract = [][]map[string]interface{}{}
  304. }
  305. tmp = make(map[string]interface{})
  306. }
  307. wg.Wait()
  308. if len(updateExtract) > 0 {
  309. mgo.UpSertBulk(extract, updateExtract...)
  310. }
  311. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  312. time.Sleep(60 * time.Second)
  313. //任务完成,开始发送广播通知下面节点
  314. if n > repeateN && mapInfo["stop"] == nil {
  315. log.Println("判重任务完成发送udp")
  316. for _, to := range nextNode {
  317. sid, _ := mapInfo["gtid"].(string)
  318. eid, _ := mapInfo["lteid"].(string)
  319. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  320. by, _ := json.Marshal(map[string]interface{}{
  321. "gtid": sid,
  322. "lteid": eid,
  323. "stype": util.ObjToString(to["stype"]),
  324. "key": key,
  325. })
  326. addr := &net.UDPAddr{
  327. IP: net.ParseIP(to["addr"].(string)),
  328. Port: util.IntAll(to["port"]),
  329. }
  330. node := &udpNode{by, addr, time.Now().Unix(), 0}
  331. udptaskmap.Store(key, node)
  332. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  333. }
  334. }
  335. }
  336. func historyTaskDay() {
  337. defer util.Catch()
  338. for {
  339. start:=time.Now().Unix()
  340. if gtid=="" {
  341. log.Println("请传gtid,否则无法运行")
  342. os.Exit(0)
  343. return
  344. }
  345. if lteid!="" {
  346. //先进行数据迁移
  347. log.Println("开启一次迁移任务",gtid,lteid)
  348. moveHistoryData(gtid,lteid)
  349. gtid = lteid //替换数据
  350. }
  351. //查询表最后一个id
  352. sess := mgo.GetMgoConn()
  353. defer mgo.DestoryMongoConn(sess)
  354. q:=map[string]interface{}{}
  355. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  356. it_last := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("-_id").Iter()
  357. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  358. lteid = BsonTOStringId(tmp["_id"])
  359. break
  360. }
  361. //开始判重
  362. q = map[string]interface{}{
  363. "_id": map[string]interface{}{
  364. "$gt": StringTOBsonId(gtid),
  365. "$lte": StringTOBsonId(lteid),
  366. },
  367. }
  368. log.Println("历史判重查询条件:",q,"时间:", between_time)
  369. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  370. num,oknum, deterTime:= int64(0),int64(0),int64(0) //计数
  371. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  372. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  373. dayArr := []map[string]interface{}{}
  374. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  375. if num%10000 == 0 {
  376. log.Println("正序遍历:", num)
  377. }
  378. source := util.ObjToMap(tmp["jsondata"])
  379. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  380. updateExtract = append(updateExtract, []map[string]interface{}{
  381. map[string]interface{}{
  382. "_id": tmp["_id"],
  383. },
  384. map[string]interface{}{
  385. "$set": map[string]interface{}{
  386. "repeat": 1,
  387. "dataging": 0,
  388. "repeat_reason": "sourcewebsite为1 重复",
  389. },
  390. },
  391. })
  392. if len(updateExtract) > 50 {
  393. mgo.UpSertBulk(extract, updateExtract...)
  394. updateExtract = [][]map[string]interface{}{}
  395. }
  396. tmp = make(map[string]interface{})
  397. continue
  398. }
  399. //取-符合-发布时间X年内的数据
  400. if util.IntAll(tmp["dataging"]) == 1 {
  401. pubtime := util.Int64All(tmp["publishtime"])
  402. if pubtime > 0 && pubtime >= between_time {
  403. oknum++
  404. if deterTime==0 {
  405. log.Println("找到第一条符合条件的数据")
  406. deterTime = util.Int64All(tmp["publishtime"])
  407. dayArr = append(dayArr,tmp)
  408. }else {
  409. if pubtime-deterTime >timingSpanDay*86400 {
  410. //新数组重新构建,当前组数据加到全部组数据
  411. pendAllArr = append(pendAllArr,dayArr)
  412. dayArr = []map[string]interface{}{}
  413. deterTime = util.Int64All(tmp["publishtime"])
  414. dayArr = append(dayArr,tmp)
  415. }else {
  416. dayArr = append(dayArr,tmp)
  417. }
  418. }
  419. }else {
  420. //不在两年内的也清标记
  421. updateExtract = append(updateExtract, []map[string]interface{}{
  422. map[string]interface{}{
  423. "_id": tmp["_id"],
  424. },
  425. map[string]interface{}{
  426. "$set": map[string]interface{}{
  427. "dataging": 0,
  428. },
  429. },
  430. })
  431. if len(updateExtract) > 50 {
  432. mgo.UpSertBulk(extract, updateExtract...)
  433. updateExtract = [][]map[string]interface{}{}
  434. }
  435. }
  436. }
  437. tmp = make(map[string]interface{})
  438. }
  439. //批量更新标记
  440. if len(updateExtract) > 0 {
  441. mgo.UpSertBulk(extract, updateExtract...)
  442. updateExtract = [][]map[string]interface{}{}
  443. }
  444. if len(dayArr)>0 {
  445. pendAllArr = append(pendAllArr,dayArr)
  446. dayArr = []map[string]interface{}{}
  447. }
  448. log.Println("查询数量:",num,"符合条件:",oknum)
  449. if len(pendAllArr) <= 0 {
  450. log.Println("没找到dataging==1的数据")
  451. }
  452. //测试分组数量是否正确
  453. testNum:=0
  454. for k,v:=range pendAllArr {
  455. log.Println("第",k,"组--","数量:",len(v))
  456. testNum = testNum+len(v)
  457. }
  458. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  459. n, repeateN := 0, 0
  460. log.Println("线程数:",threadNum)
  461. pool := make(chan bool, threadNum)
  462. wg := &sync.WaitGroup{}
  463. for k,v:=range pendAllArr { //每组结束更新一波数据
  464. pool <- true
  465. wg.Add(1)
  466. go func(k int, v []map[string]interface{}) {
  467. defer func() {
  468. <-pool
  469. wg.Done()
  470. }()
  471. //每组临时数组 - 互不干扰
  472. groupUpdateExtract := [][]map[string]interface{}{}
  473. //构建当前组的数据池
  474. log.Println("构建第", k, "组---(数据池)")
  475. //当前组的第一个发布时间
  476. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  477. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  478. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  479. n = n + len(v)
  480. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  481. for _, tmp := range v {
  482. info := NewInfo(tmp)
  483. if !LowHeavy { //是否进行低质量数据判重
  484. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  485. log.Println("无效数据")
  486. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  487. map[string]interface{}{
  488. "_id": tmp["_id"],
  489. },
  490. map[string]interface{}{
  491. "$set": map[string]interface{}{
  492. "repeat": -1, //无效数据标签
  493. "dataging": 0,
  494. },
  495. },
  496. })
  497. if len(groupUpdateExtract) > 50 {
  498. mgo.UpSertBulk(extract, groupUpdateExtract...)
  499. groupUpdateExtract = [][]map[string]interface{}{}
  500. }
  501. return
  502. }
  503. }
  504. b, source, reason := curTM.check(info)
  505. if b { //有重复,生成更新语句,更新抽取和更新招标
  506. repeateN++
  507. //重复数据打标签
  508. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  509. map[string]interface{}{
  510. "_id": tmp["_id"],
  511. },
  512. map[string]interface{}{
  513. "$set": map[string]interface{}{
  514. "repeat": 1,
  515. "repeat_reason": reason,
  516. "repeat_id": source.id,
  517. "dataging": 0,
  518. },
  519. },
  520. })
  521. } else {
  522. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  523. map[string]interface{}{
  524. "_id": tmp["_id"],
  525. },
  526. map[string]interface{}{
  527. "$set": map[string]interface{}{
  528. "dataging": 0, //符合条件的都为dataging==0
  529. },
  530. },
  531. })
  532. }
  533. if len(groupUpdateExtract) > 50 {
  534. mgo.UpSertBulk(extract, groupUpdateExtract...)
  535. groupUpdateExtract = [][]map[string]interface{}{}
  536. }
  537. }
  538. //每组数据结束-更新数据
  539. if len(groupUpdateExtract) > 0 {
  540. mgo.UpSertBulk(extract, groupUpdateExtract...)
  541. }
  542. }(k, v)
  543. }
  544. wg.Wait()
  545. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  546. if n > repeateN {
  547. for _, to := range nextNode {
  548. next_sid := util.BsonIdToSId(gtid)
  549. next_eid := util.BsonIdToSId(lteid)
  550. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  551. by, _ := json.Marshal(map[string]interface{}{
  552. "gtid": next_sid,
  553. "lteid": next_eid,
  554. "stype": util.ObjToString(to["stype"]),
  555. "key": key,
  556. })
  557. addr := &net.UDPAddr{
  558. IP: net.ParseIP(to["addr"].(string)),
  559. Port: util.IntAll(to["port"]),
  560. }
  561. node := &udpNode{by, addr, time.Now().Unix(), 0}
  562. udptaskmap.Store(key, node)
  563. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  564. }
  565. }
  566. end:=time.Now().Unix()
  567. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  568. log.Println(gtid,lteid)
  569. if end-start<60*5 {
  570. log.Println("睡眠.............")
  571. time.Sleep(5 * time.Minute)
  572. }
  573. log.Println("继续下一段的历史判重")
  574. }
  575. }
  576. //迁移上一段数据
  577. func moveHistoryData(startid string,endid string) {
  578. sess := mgo.GetMgoConn()
  579. defer mgo.DestoryMongoConn(sess)
  580. year, month, day := time.Now().Date()
  581. q := map[string]interface{}{
  582. "_id": map[string]interface{}{
  583. "$gt": StringTOBsonId(startid),
  584. "$lte": StringTOBsonId(endid),
  585. },
  586. }
  587. log.Println(q)
  588. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  589. index := 0
  590. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  591. mgo.Save(extract_back, tmp)
  592. tmp = map[string]interface{}{}
  593. if index%1000 == 0 {
  594. log.Println("index", index)
  595. }
  596. }
  597. log.Println("save to", extract_back, " ok index", index)
  598. qv := map[string]interface{}{
  599. "comeintime": map[string]interface{}{
  600. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour*2).Unix(),
  601. },
  602. }
  603. delnum := mgo.Delete(extract, qv)
  604. log.Println("remove from ", extract, delnum)
  605. }