main.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "fmt"
  9. "github.com/cron"
  10. "gopkg.in/mgo.v2/bson"
  11. "log"
  12. mu "mfw/util"
  13. "net"
  14. "os"
  15. "qfw/util"
  16. "regexp"
  17. "strings"
  18. "sync"
  19. "time"
  20. )
  21. var (
  22. Sysconfig map[string]interface{} //配置文件
  23. mconf map[string]interface{} //mongodb配置信息
  24. mgo *MongodbSim //mongodb操作对象
  25. extract string
  26. extract_back string
  27. udpclient mu.UdpClient //udp对象
  28. nextNode []map[string]interface{} //下节点数组
  29. dupdays = 5 //初始化判重范围
  30. DM *datamap //
  31. //正则筛选相关
  32. FilterRegTitle = regexp.MustCompile("^_$")
  33. FilterRegTitle_0 = regexp.MustCompile("^_$")
  34. FilterRegTitle_1 = regexp.MustCompile("^_$")
  35. FilterRegTitle_2 = regexp.MustCompile("^_$")
  36. isMerger bool //是否合并
  37. threadNum int //线程数量
  38. SiteMap map[string]map[string]interface{} //站点map
  39. LowHeavy bool //低质量数据判重
  40. TimingTask bool //是否定时任务
  41. timingSpanDay int64 //时间跨度
  42. timingPubScope int64 //发布时间周期
  43. gtid,lteid,lastid string
  44. IdType bool //默认object类型
  45. updatelock sync.Mutex //锁
  46. )
  47. func init() {
  48. flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据
  49. flag.StringVar(&gtid, "gtid", "", "历史的起始id")
  50. flag.Parse()
  51. //172.17.145.163:27080
  52. util.ReadConfig(&Sysconfig)
  53. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  54. mconf = Sysconfig["mongodb"].(map[string]interface{})
  55. mgo = &MongodbSim{
  56. MongodbAddr: mconf["addr"].(string),
  57. DbName: mconf["db"].(string),
  58. Size: util.IntAllDef(mconf["pool"], 10),
  59. }
  60. mgo.InitPool()
  61. extract = mconf["extract"].(string)
  62. extract_back = mconf["extract_back"].(string)
  63. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  64. //加载数据
  65. DM = NewDatamap(dupdays, lastid)
  66. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  67. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  68. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  69. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  70. isMerger = Sysconfig["isMerger"].(bool)
  71. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  72. LowHeavy = Sysconfig["lowHeavy"].(bool)
  73. TimingTask = Sysconfig["timingTask"].(bool)
  74. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  75. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  76. //站点配置
  77. site := mconf["site"].(map[string]interface{})
  78. SiteMap = make(map[string]map[string]interface{}, 0)
  79. start := int(time.Now().Unix())
  80. sess_site := mgo.GetMgoConn()
  81. defer mgo.DestoryMongoConn(sess_site)
  82. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  83. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  84. data_map := map[string]interface{}{
  85. "area": util.ObjToString(site_dict["area"]),
  86. "city": util.ObjToString(site_dict["city"]),
  87. "district": util.ObjToString(site_dict["district"]),
  88. "sitetype": util.ObjToString(site_dict["sitetype"]),
  89. "level": util.ObjToString(site_dict["level"]),
  90. "weight": util.ObjToString(site_dict["weight"]),
  91. }
  92. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  93. }
  94. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  95. }
  96. func main() {
  97. go checkMapJob()
  98. updport := Sysconfig["udpport"].(string)
  99. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  100. udpclient.Listen(processUdpMsg)
  101. log.Println("Udp服务监听", updport)
  102. if TimingTask {
  103. go historyTaskDay()
  104. }
  105. time.Sleep(99999 * time.Hour)
  106. }
  107. //测试组人员使用
  108. func mainT() {
  109. //testRepairData11()
  110. //return
  111. if TimingTask {
  112. log.Println("新历史任务测试开始")
  113. go historyTaskDay()
  114. time.Sleep(99999 * time.Hour)
  115. } else {
  116. //IdType = true //打开id字符串模式
  117. sid := "4f16936d52c1d9fbf843c60e"
  118. eid := "6f16936d52c1d9fbf843c60e"
  119. log.Println("正常判重测试开始")
  120. log.Println(sid, "---", eid)
  121. mapinfo := map[string]interface{}{}
  122. if sid == "" || eid == "" {
  123. log.Println("sid,eid参数不能为空")
  124. os.Exit(0)
  125. }
  126. mapinfo["gtid"] = sid
  127. mapinfo["lteid"] = eid
  128. mapinfo["stop"] = "true"
  129. task([]byte{}, mapinfo)
  130. time.Sleep(99999 * time.Hour)
  131. }
  132. }
  133. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  134. fmt.Println("接受的段数据")
  135. switch act {
  136. case mu.OP_TYPE_DATA: //上个节点的数据
  137. //从表中开始处理
  138. var mapInfo map[string]interface{}
  139. err := json.Unmarshal(data, &mapInfo)
  140. log.Println("err:", err, "mapInfo:", mapInfo)
  141. if err != nil {
  142. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  143. } else if mapInfo != nil {
  144. taskType := util.ObjToString(mapInfo["stype"])
  145. if taskType == "normalTask" {
  146. //判重流程
  147. go task(data, mapInfo)
  148. } else {
  149. //其他
  150. go task(data, mapInfo)
  151. }
  152. key, _ := mapInfo["key"].(string)
  153. if key == "" {
  154. key = "udpok"
  155. }
  156. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  157. }
  158. case mu.OP_NOOP: //下个节点回应
  159. ok := string(data)
  160. if ok != "" {
  161. log.Println("ok:", ok)
  162. udptaskmap.Delete(ok)
  163. }
  164. }
  165. }
  166. //开始判重程序
  167. func task(data []byte, mapInfo map[string]interface{}) {
  168. log.Println("开始数据判重")
  169. defer util.Catch()
  170. //区间id
  171. q := map[string]interface{}{
  172. "_id": map[string]interface{}{
  173. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  174. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  175. },
  176. }
  177. if IdType {
  178. q = map[string]interface{}{
  179. "_id": map[string]interface{}{
  180. "$gt": mapInfo["gtid"].(string),
  181. "$lte": mapInfo["lteid"].(string),
  182. },
  183. }
  184. }
  185. log.Println(mgo.DbName, extract, q)
  186. sess := mgo.GetMgoConn()
  187. defer mgo.DestoryMongoConn(sess)
  188. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  189. updateExtract := [][]map[string]interface{}{}
  190. pool := make(chan bool, threadNum)
  191. wg := &sync.WaitGroup{}
  192. n, repeateN := 0, 0
  193. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  194. if n%10000 == 0 {
  195. log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
  196. }
  197. source := util.ObjToMap(tmp["jsondata"]) //前置-jsondata判重
  198. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  199. repeateN++
  200. updateExtract = append(updateExtract, []map[string]interface{}{
  201. map[string]interface{}{
  202. "_id": tmp["_id"],
  203. },
  204. map[string]interface{}{
  205. "$set": map[string]interface{}{
  206. "repeat": 1,
  207. "dataging":0,
  208. "repeat_reason": "sourcewebsite为1,重复",
  209. },
  210. },
  211. })
  212. if len(updateExtract) >= 200 {
  213. mgo.UpSertBulk(extract, updateExtract...)
  214. updateExtract = [][]map[string]interface{}{}
  215. }
  216. tmp = make(map[string]interface{})
  217. continue
  218. }
  219. if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1||
  220. util.IntAll(tmp["dataging"]) == 1 {
  221. if util.IntAll(tmp["repeat"]) == 1 {
  222. repeateN++
  223. }
  224. tmp = make(map[string]interface{})
  225. continue
  226. }
  227. pool <- true
  228. wg.Add(1)
  229. go func(tmp map[string]interface{}) {
  230. defer func() {
  231. <-pool
  232. wg.Done()
  233. }()
  234. info := NewInfo(tmp)
  235. if !LowHeavy { //是否进行低质量数据判重
  236. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  237. updateExtract = append(updateExtract, []map[string]interface{}{
  238. map[string]interface{}{
  239. "_id": tmp["_id"],
  240. },
  241. map[string]interface{}{
  242. "$set": map[string]interface{}{
  243. "repeat": -1, //无效数据标签
  244. },
  245. },
  246. })
  247. if len(updateExtract) >= 200 {
  248. mgo.UpSertBulk(extract, updateExtract...)
  249. updateExtract = [][]map[string]interface{}{}
  250. }
  251. return
  252. }
  253. }
  254. //正常判重
  255. b, source, reason := DM.check(info)
  256. if b { //有重复,生成更新语句,更新抽取和更新招标
  257. repeateN++
  258. var updateID = map[string]interface{}{} //记录更新判重的
  259. updateID["_id"] = StringTOBsonId(info.id)
  260. if IdType {
  261. updateID["_id"] = info.id
  262. }
  263. repeat_ids:=source.repeat_ids
  264. repeat_ids = append(repeat_ids,info.id)
  265. source.repeat_ids = repeat_ids
  266. //替换数据池-更新
  267. DM.replacePoolData(source)
  268. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  269. map[string]interface{}{
  270. "_id": StringTOBsonId(source.id),
  271. },
  272. map[string]interface{}{
  273. "$set": map[string]interface{}{
  274. "repeat_ids": repeat_ids,
  275. },
  276. },
  277. })
  278. updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
  279. updateID,
  280. map[string]interface{}{
  281. "$set": map[string]interface{}{
  282. "repeat": 1,
  283. "repeat_reason": reason,
  284. "repeat_id": source.id,
  285. },
  286. },
  287. })
  288. //是否合并-低质量数据不合并
  289. if isMerger && !strings.Contains(reason,"低质量"){
  290. newData, update_map ,isReplace := mergeDataFields(source, info)
  291. if isReplace {//替换-数据池
  292. fmt.Println("合并更新的id:",source.id)
  293. //数据池 - 替换
  294. DM.replacePoolData(newData)
  295. //mongo更新 - 具体字段 - merge
  296. mgo.UpdateById(extract,source.id,update_map)
  297. //发udp 更新索引
  298. //for _, to := range nextNode {
  299. // key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
  300. // by, _ := json.Marshal(map[string]interface{}{
  301. // "gtid": source.id,
  302. // "lteid": source.id,
  303. // "stype": "biddingall",
  304. // "key": key,
  305. // })
  306. // addr := &net.UDPAddr{
  307. // IP: net.ParseIP(to["addr"].(string)),
  308. // Port: util.IntAll(to["port"]),
  309. // }
  310. // node := &udpNode{by, addr, time.Now().Unix(), 0}
  311. // udptaskmap.Store(key, node)
  312. // udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  313. //}
  314. }
  315. }
  316. }
  317. }(tmp)
  318. if len(updateExtract) >= 200 {
  319. mgo.UpSertBulk(extract, updateExtract...)
  320. updateExtract = [][]map[string]interface{}{}
  321. }
  322. tmp = make(map[string]interface{})
  323. }
  324. wg.Wait()
  325. if len(updateExtract) > 0 {
  326. mgo.UpSertBulk(extract, updateExtract...)
  327. }
  328. log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  329. time.Sleep(60 * time.Second)
  330. //任务完成,开始发送广播通知下面节点
  331. if n >= repeateN && mapInfo["stop"] == nil {
  332. log.Println("判重任务完成发送udp")
  333. for _, to := range nextNode {
  334. sid, _ := mapInfo["gtid"].(string)
  335. eid, _ := mapInfo["lteid"].(string)
  336. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  337. by, _ := json.Marshal(map[string]interface{}{
  338. "gtid": sid,
  339. "lteid": eid,
  340. "stype": util.ObjToString(to["stype"]),
  341. "key": key,
  342. })
  343. addr := &net.UDPAddr{
  344. IP: net.ParseIP(to["addr"].(string)),
  345. Port: util.IntAll(to["port"]),
  346. }
  347. node := &udpNode{by, addr, time.Now().Unix(), 0}
  348. udptaskmap.Store(key, node)
  349. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  350. }
  351. }
  352. }
  353. func historyTaskDay() {
  354. defer util.Catch()
  355. for {
  356. start:=time.Now().Unix()
  357. if gtid=="" {
  358. log.Println("请传gtid,否则无法运行")
  359. os.Exit(0)
  360. return
  361. }
  362. if lteid!="" {
  363. //先进行数据迁移
  364. log.Println("开启一次迁移任务",gtid,lteid)
  365. moveHistoryData(gtid,lteid)
  366. gtid = lteid //替换数据
  367. }
  368. //查询表最后一个id
  369. sess := mgo.GetMgoConn()
  370. defer mgo.DestoryMongoConn(sess)
  371. q:=map[string]interface{}{}
  372. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  373. it_last := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("-_id").Iter()
  374. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  375. lteid = BsonTOStringId(tmp["_id"])
  376. break
  377. }
  378. time.Sleep(5 * time.Minute)
  379. //开始判重
  380. q = map[string]interface{}{
  381. "_id": map[string]interface{}{
  382. "$gt": StringTOBsonId(gtid),
  383. "$lte": StringTOBsonId(lteid),
  384. },
  385. }
  386. log.Println("历史判重查询条件:",q,"时间:", between_time)
  387. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  388. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  389. updateExtract := [][]map[string]interface{}{}//批量更新mongo数组
  390. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  391. dayArr := []map[string]interface{}{}
  392. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  393. if num%10000 == 0 {
  394. log.Println("正序遍历:", num)
  395. }
  396. source := util.ObjToMap(tmp["jsondata"])
  397. if util.IntAll((*source)["sourcewebsite"]) == 1 {
  398. outnum++
  399. updatelock.Lock()
  400. updateExtract = append(updateExtract, []map[string]interface{}{
  401. map[string]interface{}{
  402. "_id": tmp["_id"],
  403. },
  404. map[string]interface{}{
  405. "$set": map[string]interface{}{
  406. "repeat": 1,
  407. "dataging": 0,
  408. "repeat_reason": "sourcewebsite为1 重复",
  409. },
  410. },
  411. })
  412. if len(updateExtract) > 200 {
  413. log.Println("sourcewebsite,批量更新")
  414. mgo.UpSertBulk(extract, updateExtract...)
  415. updateExtract = [][]map[string]interface{}{}
  416. }
  417. updatelock.Unlock()
  418. tmp = make(map[string]interface{})
  419. continue
  420. }
  421. //取-符合-发布时间X年内的数据
  422. updatelock.Lock()
  423. if util.IntAll(tmp["dataging"]) == 1 {
  424. pubtime := util.Int64All(tmp["publishtime"])
  425. if pubtime > 0 && pubtime >= between_time {
  426. oknum++
  427. if deterTime==0 {
  428. log.Println("找到第一条符合条件的数据")
  429. deterTime = util.Int64All(tmp["publishtime"])
  430. dayArr = append(dayArr,tmp)
  431. }else {
  432. if pubtime-deterTime >timingSpanDay*86400 {
  433. //新数组重新构建,当前组数据加到全部组数据
  434. pendAllArr = append(pendAllArr,dayArr)
  435. dayArr = []map[string]interface{}{}
  436. deterTime = util.Int64All(tmp["publishtime"])
  437. dayArr = append(dayArr,tmp)
  438. }else {
  439. dayArr = append(dayArr,tmp)
  440. }
  441. }
  442. }else {
  443. outnum++
  444. //不在两年内的也清标记
  445. updateExtract = append(updateExtract, []map[string]interface{}{
  446. map[string]interface{}{
  447. "_id": tmp["_id"],
  448. },
  449. map[string]interface{}{
  450. "$set": map[string]interface{}{
  451. "dataging": 0,
  452. },
  453. },
  454. })
  455. if len(updateExtract) > 200 {
  456. log.Println("不在周期内符合dataging==1,批量更新")
  457. mgo.UpSertBulk(extract, updateExtract...)
  458. updateExtract = [][]map[string]interface{}{}
  459. }
  460. }
  461. }
  462. updatelock.Unlock()
  463. tmp = make(map[string]interface{})
  464. }
  465. //批量更新标记
  466. updatelock.Lock()
  467. if len(updateExtract) > 0 {
  468. log.Println("分组后,最后更新不进行判重的数据:",len(updateExtract),oknum+outnum)
  469. mgo.UpSertBulk(extract, updateExtract...)
  470. updateExtract = [][]map[string]interface{}{}
  471. }
  472. updatelock.Unlock()
  473. if len(dayArr)>0 {
  474. pendAllArr = append(pendAllArr,dayArr)
  475. dayArr = []map[string]interface{}{}
  476. }
  477. log.Println("查询数量:",num,"符合条件:",oknum)
  478. if len(pendAllArr) <= 0 {
  479. log.Println("没找到dataging==1的数据")
  480. }
  481. //测试分组数量是否正确
  482. testNum:=0
  483. for k,v:=range pendAllArr {
  484. log.Println("第",k,"组--","数量:",len(v))
  485. testNum = testNum+len(v)
  486. }
  487. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  488. n, repeateN := 0, 0
  489. log.Println("线程数:",threadNum)
  490. pool := make(chan bool, threadNum)
  491. wg := &sync.WaitGroup{}
  492. for k,v:=range pendAllArr { //每组结束更新一波数据
  493. pool <- true
  494. wg.Add(1)
  495. go func(k int, v []map[string]interface{}) {
  496. defer func() {
  497. <-pool
  498. wg.Done()
  499. }()
  500. //每组临时数组 - 互不干扰
  501. groupUpdateExtract := [][]map[string]interface{}{}
  502. //
  503. groupOtherExtract := [][]map[string]interface{}{}
  504. //构建当前组的数据池
  505. log.Println("构建第", k, "组---(数据池)")
  506. //当前组的第一个发布时间
  507. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  508. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  509. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  510. n = n + len(v)
  511. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  512. for _, tmp := range v {
  513. info := NewInfo(tmp)
  514. if !LowHeavy { //是否进行低质量数据判重
  515. if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
  516. log.Println("无效数据")
  517. updatelock.Lock()
  518. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  519. map[string]interface{}{
  520. "_id": tmp["_id"],
  521. },
  522. map[string]interface{}{
  523. "$set": map[string]interface{}{
  524. "repeat": -1, //无效数据标签
  525. "dataging": 0,
  526. },
  527. },
  528. })
  529. if len(groupUpdateExtract) > 200 {
  530. mgo.UpSertBulk(extract, groupUpdateExtract...)
  531. groupUpdateExtract = [][]map[string]interface{}{}
  532. }
  533. updatelock.Unlock()
  534. return
  535. }
  536. }
  537. b, source, reason := curTM.check(info)
  538. if b { //有重复,生成更新语句,更新抽取和更新招标
  539. repeateN++
  540. //重复数据打标签
  541. repeat_ids:=source.repeat_ids
  542. repeat_ids = append(repeat_ids,info.id)
  543. source.repeat_ids = repeat_ids
  544. //替换数据池-更新
  545. DM.replacePoolData(source)
  546. updatelock.Lock()
  547. //更新数据源- 14 或者 15
  548. //判断是否在当前段落
  549. if judgeIsCurIds(gtid,lteid,source.id) {
  550. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{//重复数据打标签
  551. map[string]interface{}{
  552. "_id": StringTOBsonId(source.id),
  553. },
  554. map[string]interface{}{
  555. "$set": map[string]interface{}{
  556. "repeat_ids": repeat_ids,
  557. },
  558. },
  559. })
  560. }else {
  561. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  562. map[string]interface{}{
  563. "_id": StringTOBsonId(source.id),
  564. },
  565. map[string]interface{}{
  566. "$set": map[string]interface{}{
  567. "repeat_ids": repeat_ids,
  568. },
  569. },
  570. })
  571. }
  572. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  573. map[string]interface{}{
  574. "_id": tmp["_id"],
  575. },
  576. map[string]interface{}{
  577. "$set": map[string]interface{}{
  578. "repeat": 1,
  579. "repeat_reason": reason,
  580. "repeat_id": source.id,
  581. "dataging": 0,
  582. },
  583. },
  584. })
  585. if len(groupUpdateExtract) > 200 {
  586. mgo.UpSertBulk(extract, groupUpdateExtract...)
  587. groupUpdateExtract = [][]map[string]interface{}{}
  588. }
  589. if len(groupOtherExtract) > 200 {
  590. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  591. groupOtherExtract = [][]map[string]interface{}{}
  592. }
  593. updatelock.Unlock()
  594. } else {
  595. updatelock.Lock()
  596. groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
  597. map[string]interface{}{
  598. "_id": tmp["_id"],
  599. },
  600. map[string]interface{}{
  601. "$set": map[string]interface{}{
  602. "dataging": 0, //符合条件的都为dataging==0
  603. },
  604. },
  605. })
  606. if len(groupUpdateExtract) > 200 {
  607. mgo.UpSertBulk(extract, groupUpdateExtract...)
  608. groupUpdateExtract = [][]map[string]interface{}{}
  609. }
  610. updatelock.Unlock()
  611. }
  612. }
  613. //每组数据结束-更新数据
  614. updatelock.Lock()
  615. if len(groupUpdateExtract) > 0 {
  616. mgo.UpSertBulk(extract, groupUpdateExtract...)
  617. }
  618. if len(groupOtherExtract) > 0 {
  619. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  620. }
  621. updatelock.Unlock()
  622. }(k, v)
  623. }
  624. wg.Wait()
  625. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  626. if n >= repeateN && gtid!=lteid{
  627. for _, to := range nextNode {
  628. next_sid := util.BsonIdToSId(gtid)
  629. next_eid := util.BsonIdToSId(lteid)
  630. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  631. by, _ := json.Marshal(map[string]interface{}{
  632. "gtid": next_sid,
  633. "lteid": next_eid,
  634. "stype": util.ObjToString(to["stype"]),
  635. "key": key,
  636. })
  637. addr := &net.UDPAddr{
  638. IP: net.ParseIP(to["addr"].(string)),
  639. Port: util.IntAll(to["port"]),
  640. }
  641. node := &udpNode{by, addr, time.Now().Unix(), 0}
  642. udptaskmap.Store(key, node)
  643. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  644. }
  645. }
  646. end:=time.Now().Unix()
  647. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  648. log.Println(gtid,lteid)
  649. if end-start<60*5 {
  650. log.Println("睡眠.............")
  651. time.Sleep(5 * time.Minute)
  652. }
  653. log.Println("继续下一段的历史判重")
  654. }
  655. }
  // judgeIsCurIds reports whether curid lies in the current history segment
  // (gtid, lteid] — the same half-open range historyTaskDay queries.
  //
  // The ids are 24-character ObjectId hex strings. After lower-casing,
  // lexicographic order of such strings equals the ObjectIds' byte order. Any
  // id of another length yields false, which was this function's previous
  // unconditional result.
  func judgeIsCurIds(gtid string, lteid string, curid string) bool {
  	const objectIdHexLen = 24
  	if len(gtid) != objectIdHexLen || len(lteid) != objectIdHexLen || len(curid) != objectIdHexLen {
  		return false
  	}
  	lo, hi, id := strings.ToLower(gtid), strings.ToLower(lteid), strings.ToLower(curid)
  	return id > lo && id <= hi
  }
  660. //迁移上一段数据
  661. func moveHistoryData(startid string,endid string) {
  662. sess := mgo.GetMgoConn()
  663. defer mgo.DestoryMongoConn(sess)
  664. year, month, day := time.Now().Date()
  665. q := map[string]interface{}{
  666. "_id": map[string]interface{}{
  667. "$gt": StringTOBsonId(startid),
  668. "$lte": StringTOBsonId(endid),
  669. },
  670. }
  671. log.Println(q)
  672. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  673. index := 0
  674. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  675. mgo.Save(extract_back, tmp)
  676. tmp = map[string]interface{}{}
  677. if index%1000 == 0 {
  678. log.Println("index", index)
  679. }
  680. }
  681. log.Println("save to", extract_back, " ok index", index)
  682. qv := map[string]interface{}{
  683. "comeintime": map[string]interface{}{
  684. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  685. },
  686. }
  687. delnum := mgo.Delete(extract, qv)
  688. log.Println("remove from ", extract, delnum)
  689. }
  690. func moveTimeoutData() {
  691. log.Println("部署迁移定时任务")
  692. c := cron.New()
  693. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  694. c.Start()
  695. }
  696. func moveOnceTimeOut() {
  697. log.Println("执行一次迁移超时数据")
  698. sess := mgo.GetMgoConn()
  699. defer mgo.DestoryMongoConn(sess)
  700. now:=time.Now()
  701. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  702. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  703. q := map[string]interface{}{
  704. "_id": map[string]interface{}{
  705. "$lt": StringTOBsonId(task_id),
  706. },
  707. }
  708. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  709. index := 0
  710. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  711. if index%10000 == 0 {
  712. log.Println("index", index)
  713. }
  714. del_id:=BsonTOStringId(tmp["_id"])
  715. mgo.Save("result_20200713", tmp)
  716. mgo.DeleteById("result_20200714",del_id)
  717. tmp = map[string]interface{}{}
  718. }
  719. log.Println("save and delete", " ok index", index)
  720. }