main.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "github.com/cron"
  9. "gopkg.in/mgo.v2/bson"
  10. "log"
  11. mu "mfw/util"
  12. "net"
  13. "os"
  14. "qfw/util"
  15. "regexp"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. Sysconfig map[string]interface{} //配置文件
  22. mconf map[string]interface{} //mongodb配置信息
  23. mgo *MongodbSim //mongodb操作对象
  24. task_mgo *MongodbSim //mongodb操作对象
  25. task_collName string
  26. extract string
  27. extract_back string
  28. udpclient mu.UdpClient //udp对象
  29. nextNode []map[string]interface{} //下节点数组
  30. dupdays = 7 //初始化判重范围
  31. DM *datamap //
  32. Update *updateInfo
  33. //正则筛选相关
  34. FilterRegTitle = regexp.MustCompile("^_$")
  35. FilterRegTitle_0 = regexp.MustCompile("^_$")
  36. FilterRegTitle_1 = regexp.MustCompile("^_$")
  37. FilterRegTitle_2 = regexp.MustCompile("^_$")
  38. isMerger bool //是否合并
  39. threadNum int //线程数量
  40. SiteMap map[string]map[string]interface{} //站点map
  41. LowHeavy bool //低质量数据判重
  42. TimingTask bool //是否定时任务
  43. timingSpanDay int64 //时间跨度
  44. timingPubScope int64 //发布时间周期
  45. gtid,lastid,gtept,ltept string //命令输入
  46. lteid string //历史增量属性
  47. IsFull bool //是否全量
  48. updatelock sync.Mutex //锁4
  49. userName,passWord string //mongo -用户密码
  50. taskList []map[string]interface{} //任务池
  51. )
  52. //udp通道
  53. var udptask chan struct{} = make(chan struct{}, 1)
  54. func init() {
  55. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  56. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  57. flag.StringVar(&gtept, "gtept", "", "全量gte发布时间")//全量区间pt
  58. flag.StringVar(&ltept, "ltept", "", "全量lte发布时间") //全量区间pt
  59. flag.Parse()
  60. util.ReadConfig(&Sysconfig)
  61. userName = util.ObjToString(Sysconfig["userName"])
  62. passWord = util.ObjToString(Sysconfig["passWord"])
  63. log.Println("集群用户密码:",userName,passWord)
  64. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  65. task_mgo = &MongodbSim{
  66. MongodbAddr: task_mconf["task_addrName"].(string),
  67. DbName: task_mconf["task_dbName"].(string),
  68. Size: util.IntAllDef(task_mconf["task_pool"], 10),
  69. UserName: userName,
  70. Password: passWord,
  71. }
  72. task_mgo.InitPool()
  73. task_collName = task_mconf["task_collName"].(string)
  74. nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  75. mconf = Sysconfig["mongodb"].(map[string]interface{})
  76. mgo = &MongodbSim{
  77. MongodbAddr: mconf["addr"].(string),
  78. DbName: mconf["db"].(string),
  79. Size: util.IntAllDef(mconf["pool"], 10),
  80. }
  81. mgo.InitPool()
  82. extract = mconf["extract"].(string)
  83. extract_back = mconf["extract_back"].(string)
  84. dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
  85. //加载数据
  86. DM = NewDatamap(dupdays, lastid)
  87. //更新池
  88. Update = newUpdatePool()
  89. go Update.updateData()
  90. FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
  91. FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
  92. FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
  93. FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
  94. isMerger = Sysconfig["isMerger"].(bool)
  95. threadNum = util.IntAllDef(Sysconfig["threads"], 1)
  96. LowHeavy = Sysconfig["lowHeavy"].(bool)
  97. TimingTask = Sysconfig["timingTask"].(bool)
  98. timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
  99. timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
  100. //站点配置
  101. site := mconf["site"].(map[string]interface{})
  102. SiteMap = make(map[string]map[string]interface{}, 0)
  103. start := int(time.Now().Unix())
  104. sess_site := mgo.GetMgoConn()
  105. defer mgo.DestoryMongoConn(sess_site)
  106. res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
  107. for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
  108. data_map := map[string]interface{}{
  109. "area": util.ObjToString(site_dict["area"]),
  110. "city": util.ObjToString(site_dict["city"]),
  111. "district": util.ObjToString(site_dict["district"]),
  112. "sitetype": util.ObjToString(site_dict["sitetype"]),
  113. "level": util.ObjToString(site_dict["level"]),
  114. "weight": util.ObjToString(site_dict["weight"]),
  115. }
  116. SiteMap[util.ObjToString(site_dict["site"])] = data_map
  117. }
  118. log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
  119. }
  120. func mainT() {
  121. go checkMapJob()
  122. updport := Sysconfig["udpport"].(string)
  123. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  124. udpclient.Listen(processUdpMsg)
  125. log.Println("Udp服务监听", updport)
  126. if TimingTask {
  127. log.Println("正常历史部署")
  128. go historyTaskDay()
  129. }else {
  130. if gtept!=""&&ltept!="" {
  131. log.Println("全量判重-准备开始")
  132. IsFull = true //全量判重
  133. sid := "1fffffffffffffffffffffff"
  134. eid := "9fffffffffffffffffffffff"
  135. mapinfo := map[string]interface{}{}
  136. if sid == "" || eid == "" {
  137. log.Println("sid,eid参数不能为空")
  138. os.Exit(0)
  139. }
  140. mapinfo["gtid"] = sid
  141. mapinfo["lteid"] = eid
  142. mapinfo["stop"] = "true"
  143. taskRepeat(mapinfo)
  144. time.Sleep(99999 * time.Hour)
  145. }else {
  146. //正常增量
  147. log.Println("正常增量部署,监听任务")
  148. go getRepeatTask()
  149. }
  150. }
  151. time.Sleep(99999 * time.Hour)
  152. }
  153. //测试组人员使用
  154. func main() {
  155. if TimingTask {
  156. go historyTaskDay()
  157. time.Sleep(99999 * time.Hour)
  158. } else {
  159. IsFull = true //全量判重
  160. sid := "1fffffffffffffffffffffff"
  161. eid := "9fffffffffffffffffffffff"
  162. mapinfo := map[string]interface{}{}
  163. if sid == "" || eid == "" {
  164. log.Println("sid,eid参数不能为空")
  165. os.Exit(0)
  166. }
  167. mapinfo["gtid"] = sid
  168. mapinfo["lteid"] = eid
  169. mapinfo["stop"] = "true"
  170. log.Println("测试:全量判重-准备开始")
  171. taskRepeat(mapinfo)
  172. time.Sleep(99999 * time.Hour)
  173. }
  174. }
  175. //udp接收
  176. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  177. switch act {
  178. case mu.OP_TYPE_DATA:
  179. var mapInfo map[string]interface{}
  180. err := json.Unmarshal(data, &mapInfo)
  181. if err != nil {
  182. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  183. } else if mapInfo != nil {
  184. key, _ := mapInfo["key"].(string)
  185. if key == "" {
  186. key = "udpok"
  187. }
  188. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  189. //插入任务-判断任务-是否存在
  190. updatelock.Lock()
  191. taskList = append(taskList,mapInfo)
  192. log.Println("udp收到任务...数量:",len(taskList),"具体任务:",taskList)
  193. updatelock.Unlock()
  194. }
  195. case mu.OP_NOOP: //下个节点回应
  196. ok := string(data)
  197. if ok != "" {
  198. log.Println("ok:", ok)
  199. udptaskmap.Delete(ok)
  200. }
  201. }
  202. }
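// Legacy UDP receive handler, kept for reference and disabled: it ran each task inline, used udptask as a one-slot busy flag, and answered "repeat_busy" when the slot stayed taken for 2 seconds.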
  204. //func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  205. // select {
  206. // case udptask <- struct{}{}:
  207. // log.Println("...接收段落,通道正常...")
  208. // switch act {
  209. // case mu.OP_TYPE_DATA: //上个节点的数据
  210. // var mapInfo map[string]interface{}
  211. // err := json.Unmarshal(data, &mapInfo)
  212. // if err != nil {
  213. // log.Println("error data:", err)
  214. // udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  215. // } else if mapInfo != nil {
  216. // key, _ := mapInfo["key"].(string)
  217. // if key == "" {
  218. // key = "udpok"
  219. // }
  220. // log.Println("当前段落,需要判重...",mapInfo)
  221. // udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  222. // task(data, mapInfo)
  223. // }
  224. // log.Println("此段任务结束...",err,mapInfo)
  225. // <-udptask
  226. // case mu.OP_NOOP: //下个节点回应
  227. // ok := string(data)
  228. // if ok != "" {
  229. // log.Println("下节点回应-ok:", ok)
  230. // udptaskmap.Delete(ok)
  231. // }
  232. // <-udptask
  233. // }
  234. // case <-time.After(2 * time.Second):
  235. // switch act {
  236. // case mu.OP_TYPE_DATA: //上个节点的数据
  237. // log.Println("通道堵塞中...上节点")
  238. // udpclient.WriteUdp([]byte("repeat_busy"), mu.OP_NOOP, ra)
  239. // case mu.OP_NOOP: //下个节点回应
  240. // log.Println("通道堵塞中...下节点")
  241. // ok := string(data)
  242. // if ok != "" {
  243. // log.Println("下节点回应-ok:", ok)
  244. // udptaskmap.Delete(ok)
  245. // }
  246. // }
  247. // }
  248. //
  249. // //udptask <- struct{}{}
  250. // //defer func() {
  251. // // <-udptask
  252. // //}()
  253. //}
  254. //监听-获取-分发判重任务
  255. func getRepeatTask() {
  256. for {
  257. if len(taskList)>0 {
  258. updatelock.Lock()
  259. //log.Println("准备执行判重任务...")
  260. mapInfo := taskList[0]
  261. if mapInfo != nil {
  262. taskRepeat(mapInfo) //判重方法
  263. }
  264. taskList = taskList[1:]
  265. log.Println("此段落结束当前任务池...",len(taskList),taskList)
  266. updatelock.Unlock()
  267. }else {
  268. //log.Println("无任务...睡眠15s")
  269. time.Sleep(15 * time.Second)
  270. }
  271. }
  272. }
  273. //开始判重程序
  274. func taskRepeat(mapInfo map[string]interface{}) {
  275. defer util.Catch()
  276. //区间id
  277. q := map[string]interface{}{
  278. "_id": map[string]interface{}{
  279. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  280. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  281. },
  282. }
  283. //全量
  284. if IsFull && gtept!="" && ltept!=""{
  285. log.Println("执行全量分段模式")
  286. log.Println(gtept,"---",ltept)
  287. q = map[string]interface{}{
  288. "publishtime": map[string]interface{}{
  289. "$gte": util.Int64All(gtept),
  290. "$lte": util.Int64All(ltept),
  291. },
  292. }
  293. }
  294. //临时赋值
  295. log.Println("开始数据判重~查询条件:",mgo.DbName, extract, q)
  296. sess := mgo.GetMgoConn()
  297. defer mgo.DestoryMongoConn(sess)
  298. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  299. pool := make(chan bool, threadNum)
  300. wg := &sync.WaitGroup{}
  301. n, repeateN := 0, 0
  302. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  303. if n%1000 == 0 {
  304. log.Println("current:", n, tmp["_id"],tmp["publishtime"], "repeateN:", repeateN)
  305. }
  306. if util.IntAll(tmp["repeat"]) == 1 {
  307. repeateN++
  308. tmp = make(map[string]interface{})
  309. continue
  310. }
  311. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  312. tmp = make(map[string]interface{})
  313. continue
  314. }
  315. pool <- true
  316. wg.Add(1)
  317. go func(tmp map[string]interface{}) {
  318. defer func() {
  319. <-pool
  320. wg.Done()
  321. }()
  322. info := NewInfo(tmp)
  323. //正常判重
  324. b, source, reason := DM.check(info)
  325. if b {
  326. repeateN++
  327. var updateID = map[string]interface{}{} //记录更新判重的
  328. updateID["_id"] = StringTOBsonId(info.id)
  329. repeat_ids:=source.repeat_ids
  330. repeat_ids = append(repeat_ids,info.id)
  331. source.repeat_ids = repeat_ids
  332. //替换数据池-更新
  333. DM.replacePoolData(source)
  334. //Update.updatePool <- []map[string]interface{}{//原始数据打标签
  335. // map[string]interface{}{
  336. // "_id": StringTOBsonId(source.id),
  337. // },
  338. // map[string]interface{}{
  339. // "$set": map[string]interface{}{
  340. // "repeat_ids": repeat_ids,
  341. // },
  342. // },
  343. //}
  344. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  345. updateID,
  346. map[string]interface{}{
  347. "$set": map[string]interface{}{
  348. "repeat": 1,
  349. "repeat_reason": reason,
  350. "repeat_id": source.id,
  351. "dataging": 0,
  352. "updatetime_repeat" :util.Int64All(time.Now().Unix()),
  353. },
  354. },
  355. }
  356. }
  357. }(tmp)
  358. tmp = make(map[string]interface{})
  359. }
  360. wg.Wait()
  361. log.Println("this current task over.", n, "repeateN:", repeateN, mapInfo["stop"])
  362. //log.Println("当前数据池的数量:",DM.currentTotalCount())
  363. //睡眠时间30s 目的是让数据池更新所有数据...
  364. time.Sleep(15 * time.Second)
  365. //更新Ocr的标记
  366. if !IsFull {
  367. updateOcrFileData(mapInfo["lteid"].(string))
  368. //任务完成,开始发送广播通知下面节点
  369. if n >= repeateN && mapInfo["stop"] == nil {
  370. log.Println("判重任务完成发送udp")
  371. for _, to := range nextNode {
  372. sid, _ := mapInfo["gtid"].(string)
  373. eid, _ := mapInfo["lteid"].(string)
  374. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  375. by, _ := json.Marshal(map[string]interface{}{
  376. "gtid": sid,
  377. "lteid": eid,
  378. "stype": util.ObjToString(to["stype"]),
  379. "key": key,
  380. })
  381. addr := &net.UDPAddr{
  382. IP: net.ParseIP(to["addr"].(string)),
  383. Port: util.IntAll(to["port"]),
  384. }
  385. node := &udpNode{by, addr, time.Now().Unix(), 0}
  386. udptaskmap.Store(key, node)
  387. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  388. }
  389. }
  390. }
  391. }
  392. func updateOcrFileData(cur_lteid string) {
  393. //更新ocr 分类表-判重的状态
  394. log.Println("开始更新Ocr表-标记",cur_lteid)
  395. task_sess := task_mgo.GetMgoConn()
  396. defer task_mgo.DestoryMongoConn(task_sess)
  397. q_task:=map[string]interface{}{}
  398. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
  399. isUpdateOcr:=false
  400. updateOcrFile:=[][]map[string]interface{}{}
  401. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  402. cur_id := BsonTOStringId(tmp["_id"])
  403. lteid:=util.ObjToString(tmp["lteid"])
  404. if (lteid==cur_lteid) { //需要更新
  405. log.Println("找到该lteid数据",cur_lteid,cur_id)
  406. isUpdateOcr = true
  407. updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
  408. map[string]interface{}{
  409. "_id": tmp["_id"],
  410. },
  411. map[string]interface{}{
  412. "$set": map[string]interface{}{
  413. "is_repeat_status": 1,
  414. "is_repeat_time" : util.Int64All(time.Now().Unix()),
  415. },
  416. },
  417. })
  418. tmp = make(map[string]interface{})
  419. break
  420. }else {
  421. tmp = make(map[string]interface{})
  422. }
  423. }
  424. if !isUpdateOcr {
  425. log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
  426. }else {
  427. if len(updateOcrFile) > 0 {
  428. task_mgo.UpSertBulk(task_collName, updateOcrFile...)
  429. }
  430. }
  431. }
  432. //历史判重
  433. func historyTaskDay() {
  434. defer util.Catch()
  435. for {
  436. start:=time.Now().Unix()
  437. if gtid=="" {
  438. log.Println("请传gtid,否则无法运行")
  439. os.Exit(0)
  440. return
  441. }
  442. if lteid!="" {
  443. //先进行数据迁移
  444. log.Println("开启一次迁移任务",gtid,lteid)
  445. moveHistoryData(gtid,lteid)
  446. gtid = lteid //替换数据
  447. }
  448. //查询表最后一个id
  449. task_sess := task_mgo.GetMgoConn()
  450. defer task_mgo.DestoryMongoConn(task_sess)
  451. q:=map[string]interface{}{}
  452. between_time := time.Now().Unix() - (86400 * timingPubScope)//两年周期
  453. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  454. isRepeatStatus:=false
  455. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  456. is_repeat_status:=util.IntAll(tmp["is_repeat_status"])
  457. if is_repeat_status == 1 {
  458. lteid = util.ObjToString(tmp["lteid"])
  459. log.Println("查询的最后一个已标记的任务lteid:",lteid)
  460. isRepeatStatus = true
  461. tmp = make(map[string]interface{})
  462. break
  463. }else {
  464. tmp = make(map[string]interface{})
  465. }
  466. }
  467. if !isRepeatStatus {
  468. log.Println("查询不到有标记的lteid数据")
  469. log.Println("睡眠5分钟 gtid:",gtid,"lteid:",lteid)
  470. time.Sleep(5 * time.Minute)
  471. continue
  472. }
  473. log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟",gtid,lteid)
  474. time.Sleep(5 * time.Minute)
  475. sess := mgo.GetMgoConn()//连接器
  476. defer mgo.DestoryMongoConn(sess)
  477. //开始判重
  478. q = map[string]interface{}{
  479. "_id": map[string]interface{}{
  480. "$gt": StringTOBsonId(gtid),
  481. "$lte": StringTOBsonId(lteid),
  482. },
  483. }
  484. log.Println("历史判重查询条件:",q,"时间:", between_time)
  485. it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  486. num,oknum,outnum, deterTime:= int64(0),int64(0),int64(0),int64(0) //计数
  487. pendAllArr:=[][]map[string]interface{}{}//待处理数组
  488. dayArr := []map[string]interface{}{}
  489. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  490. if num%10000 == 0 {
  491. log.Println("正序遍历:", num)
  492. }
  493. //取-符合-发布时间X年内的数据
  494. if util.IntAll(tmp["dataging"]) == 1 {
  495. pubtime := util.Int64All(tmp["publishtime"])
  496. if pubtime > 0 && pubtime >= between_time {
  497. oknum++
  498. if deterTime==0 {
  499. log.Println("找到第一条符合条件的数据")
  500. deterTime = util.Int64All(tmp["publishtime"])
  501. dayArr = append(dayArr,tmp)
  502. }else {
  503. if pubtime-deterTime >timingSpanDay*86400 {
  504. //新数组重新构建,当前组数据加到全部组数据
  505. pendAllArr = append(pendAllArr,dayArr)
  506. dayArr = []map[string]interface{}{}
  507. deterTime = util.Int64All(tmp["publishtime"])
  508. dayArr = append(dayArr,tmp)
  509. }else {
  510. dayArr = append(dayArr,tmp)
  511. }
  512. }
  513. }else {
  514. outnum++
  515. //不在两年内的也清标记
  516. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  517. map[string]interface{}{
  518. "_id": tmp["_id"],
  519. },
  520. map[string]interface{}{
  521. "$set": map[string]interface{}{
  522. "dataging": 0,
  523. "history_updatetime":util.Int64All(time.Now().Unix()),
  524. },
  525. },
  526. }
  527. }
  528. }
  529. tmp = make(map[string]interface{})
  530. }
  531. if len(dayArr)>0 {
  532. pendAllArr = append(pendAllArr,dayArr)
  533. dayArr = []map[string]interface{}{}
  534. }
  535. log.Println("查询数量:",num,"符合条件:",oknum,"未在两年内:",outnum)
  536. if len(pendAllArr) <= 0 {
  537. log.Println("没找到dataging==1的数据")
  538. }
  539. //测试分组数量是否正确
  540. testNum:=0
  541. for k,v:=range pendAllArr {
  542. log.Println("第",k,"组--","数量:",len(v))
  543. testNum = testNum+len(v)
  544. }
  545. log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
  546. n, repeateN := 0, 0
  547. log.Println("线程数:",threadNum)
  548. pool := make(chan bool, threadNum)
  549. wg := &sync.WaitGroup{}
  550. for k,v:=range pendAllArr { //每组结束更新一波数据
  551. pool <- true
  552. wg.Add(1)
  553. go func(k int, v []map[string]interface{}) {
  554. defer func() {
  555. <-pool
  556. wg.Done()
  557. }()
  558. //相关ids 跨表
  559. groupOtherExtract := [][]map[string]interface{}{}
  560. //构建当前组的数据池
  561. log.Println("构建第", k, "组---(数据池)")
  562. //当前组的第一个发布时间
  563. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  564. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  565. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  566. n = n + len(v)
  567. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  568. for _, tmp := range v {
  569. info := NewInfo(tmp)
  570. b, source, reason := curTM.check(info)
  571. if b { //有重复,生成更新语句,更新抽取和更新招标
  572. repeateN++
  573. //重复数据打标签
  574. repeat_ids:=source.repeat_ids
  575. repeat_ids = append(repeat_ids,info.id)
  576. source.repeat_ids = repeat_ids
  577. updatelock.Lock()
  578. //替换数据池-更新
  579. DM.replacePoolData(source)
  580. //更新数据源
  581. //判断是否在当前段落
  582. if judgeIsCurIds(gtid,lteid,source.id) {
  583. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  584. map[string]interface{}{
  585. "_id": StringTOBsonId(source.id),
  586. },
  587. map[string]interface{}{
  588. "$set": map[string]interface{}{
  589. "repeat_ids": repeat_ids,
  590. },
  591. },
  592. }
  593. }else {
  594. groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{//重复数据打标签
  595. map[string]interface{}{
  596. "_id": StringTOBsonId(source.id),
  597. },
  598. map[string]interface{}{
  599. "$set": map[string]interface{}{
  600. "repeat_ids": repeat_ids,
  601. },
  602. },
  603. })
  604. }
  605. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  606. map[string]interface{}{
  607. "_id": tmp["_id"],
  608. },
  609. map[string]interface{}{
  610. "$set": map[string]interface{}{
  611. "repeat": 1,
  612. "repeat_reason": reason,
  613. "repeat_id": source.id,
  614. "dataging": 0,
  615. "history_updatetime":util.Int64All(time.Now().Unix()),
  616. },
  617. },
  618. }
  619. if len(groupOtherExtract) >= 500 {
  620. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  621. groupOtherExtract = [][]map[string]interface{}{}
  622. }
  623. updatelock.Unlock()
  624. } else {
  625. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  626. map[string]interface{}{
  627. "_id": tmp["_id"],
  628. },
  629. map[string]interface{}{
  630. "$set": map[string]interface{}{
  631. "dataging": 0, //符合条件的都为dataging==0
  632. "history_updatetime":util.Int64All(time.Now().Unix()),
  633. },
  634. },
  635. }
  636. }
  637. }
  638. //每组数据结束-更新数据
  639. updatelock.Lock()
  640. if len(groupOtherExtract) > 0 {
  641. mgo.UpSertBulk(extract_back, groupOtherExtract...)
  642. }
  643. updatelock.Unlock()
  644. }(k, v)
  645. }
  646. wg.Wait()
  647. log.Println("this timeTask over.", n, "repeateN:", repeateN,gtid,lteid)
  648. time.Sleep(30 * time.Second)
  649. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  650. if n >= repeateN && gtid!=lteid{
  651. for _, to := range nextNode {
  652. next_sid := util.BsonIdToSId(gtid)
  653. next_eid := util.BsonIdToSId(lteid)
  654. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  655. by, _ := json.Marshal(map[string]interface{}{
  656. "gtid": next_sid,
  657. "lteid": next_eid,
  658. "stype": util.ObjToString(to["stype"]),
  659. "key": key,
  660. })
  661. addr := &net.UDPAddr{
  662. IP: net.ParseIP(to["addr"].(string)),
  663. Port: util.IntAll(to["port"]),
  664. }
  665. node := &udpNode{by, addr, time.Now().Unix(), 0}
  666. udptaskmap.Store(key, node)
  667. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  668. }
  669. }
  670. end:=time.Now().Unix()
  671. log.Println(gtid,lteid)
  672. if end-start<60*5 {
  673. log.Println("睡眠.............")
  674. time.Sleep(5 * time.Minute)
  675. }
  676. log.Println("继续下一段的历史判重")
  677. }
  678. }
  679. //判断是否在当前id段落
  680. func judgeIsCurIds (gtid string,lteid string,curid string) bool {
  681. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  682. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  683. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  684. if cur_time>=gt_time&&cur_time<=lte_time {
  685. return true
  686. }
  687. return false
  688. }
  689. //迁移上一段数据
  690. func moveHistoryData(startid string,endid string) {
  691. sess := mgo.GetMgoConn()
  692. defer mgo.DestoryMongoConn(sess)
  693. year, month, day := time.Now().Date()
  694. q := map[string]interface{}{
  695. "_id": map[string]interface{}{
  696. "$gt": StringTOBsonId(startid),
  697. "$lte": StringTOBsonId(endid),
  698. },
  699. }
  700. log.Println(q)
  701. it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
  702. index := 0
  703. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  704. mgo.Save(extract_back, tmp)
  705. tmp = map[string]interface{}{}
  706. if index%1000 == 0 {
  707. log.Println("index", index)
  708. }
  709. }
  710. log.Println("save to", extract_back, " ok index", index)
  711. qv := map[string]interface{}{
  712. "comeintime": map[string]interface{}{
  713. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour*2).Unix(),
  714. },
  715. }
  716. delnum := mgo.Delete(extract, qv)
  717. log.Println("remove from ", extract, delnum)
  718. }
  719. func moveTimeoutData() {
  720. log.Println("部署迁移定时任务")
  721. c := cron.New()
  722. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  723. c.Start()
  724. }
  725. func moveOnceTimeOut() {
  726. log.Println("执行一次迁移超时数据")
  727. sess := mgo.GetMgoConn()
  728. defer mgo.DestoryMongoConn(sess)
  729. now:=time.Now()
  730. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  731. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  732. q := map[string]interface{}{
  733. "_id": map[string]interface{}{
  734. "$lt": StringTOBsonId(task_id),
  735. },
  736. }
  737. it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
  738. index := 0
  739. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  740. if index%10000 == 0 {
  741. log.Println("index", index)
  742. }
  743. del_id:=BsonTOStringId(tmp["_id"])
  744. mgo.Save("result_20200713", tmp)
  745. mgo.DeleteById("result_20200714",del_id)
  746. tmp = map[string]interface{}{}
  747. }
  748. log.Println("save and delete", " ok index", index)
  749. }