main.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "log"
  9. mu "mfw/util"
  10. "net"
  11. "nsqdata"
  12. qu "qfw/util"
  13. "regexp"
  14. "sync"
  15. "time"
  16. )
  17. var (
  18. Sysconfig map[string]interface{} //配置文件
  19. data_mgo, task_mgo, spider_mgo *MongodbSim
  20. task_coll, task_bidding, spider_coll string
  21. extract, extract_back, extract_log string
  22. udpclient mu.UdpClient
  23. nextNode []map[string]interface{}
  24. dupdays = 7
  25. DM, FullDM *datamap
  26. Update *updateInfo
  27. AddGroupPool *addGroupInfo
  28. //正则筛选相关
  29. FilterRegTitle = regexp.MustCompile("^_$")
  30. FilterRegTitle_0 = regexp.MustCompile("^_$")
  31. FilterRegTitle_1 = regexp.MustCompile("^_$")
  32. FilterRegTitle_2 = regexp.MustCompile("^_$")
  33. threadNum int
  34. SiteMap map[string]map[string]interface{}
  35. LowHeavy, TimingTask, IsFull, isUpdateSite bool
  36. timingSpanDay, timingPubScope int64
  37. gtid, lastid, sec_gtid, sec_lteid, lteid string
  38. updatelock, datalock, numlock, cronlock sync.Mutex
  39. jyfb_data map[string]string
  40. taskList []map[string]interface{}
  41. nspdata_1, nspdata_2 *nsqdata.Producer
  42. )
  43. //初始化加载
  44. func init() {
  45. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  46. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  47. flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
  48. flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
  49. flag.Parse()
  50. qu.ReadConfig(&Sysconfig)
  51. initMgo()
  52. initOther()
  53. initSite()
  54. }
  55. func mainT() {
  56. IsFull = true
  57. //AddGroupPool = newAddGroupPool()
  58. //go AddGroupPool.addGroupData()
  59. //fullDataRepeat() //全量判重
  60. increaseRepeat(map[string]interface{}{
  61. "gtid": "12ec61170ae152a3c2310f02",
  62. "lteid": "92ec61170ae152a3c2310f02",
  63. })
  64. //gtid = "62ec2dd00ae152a3c230c1a1"
  65. //lteid = "62ec2dd00ae152a3c230c1e1"
  66. //historyRepeat()
  67. time.Sleep(99999 * time.Hour)
  68. }
  69. //主函数
  70. func main() {
  71. go checkMapJob()
  72. updport := Sysconfig["udpport"].(string)
  73. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  74. udpclient.Listen(processUdpMsg)
  75. log.Println("Udp服务监听", updport)
  76. if TimingTask {
  77. log.Println("正常历史部署")
  78. go historyRepeat()
  79. } else {
  80. if !IsFull {
  81. log.Println("正常增量部署,监听任务")
  82. go getRepeatTask()
  83. }
  84. }
  85. time.Sleep(99999 * time.Hour)
  86. }
  87. //udp接收
  88. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  89. switch act {
  90. case mu.OP_TYPE_DATA:
  91. var mapInfo map[string]interface{}
  92. err := json.Unmarshal(data, &mapInfo)
  93. if err != nil {
  94. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  95. } else if mapInfo != nil {
  96. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  97. if sid == "" || eid == "" {
  98. log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  99. } else {
  100. key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
  101. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  102. //计算是否需要加载站点~每天加载一次
  103. if isUpdateSite {
  104. initSite()
  105. }
  106. //插入任务-判断任务-是否存在
  107. updatelock.Lock()
  108. taskList = append(taskList, mapInfo)
  109. log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
  110. updatelock.Unlock()
  111. }
  112. }
  113. case mu.OP_NOOP: //下个节点回应
  114. ok := string(data)
  115. if ok != "" {
  116. log.Println("ok:", ok)
  117. udptaskmap.Delete(ok)
  118. }
  119. }
  120. }
  121. //监听-获取-分发判重任务
  122. func getRepeatTask() {
  123. for {
  124. if len(taskList) > 0 {
  125. updatelock.Lock()
  126. len_list := len(taskList)
  127. if len_list > 1 {
  128. first_id := taskList[0]["gtid"]
  129. end_id := taskList[len_list-1]["lteid"]
  130. if first_id != "" && end_id != "" {
  131. log.Println("合并段落~正常~", first_id, "~", end_id)
  132. increaseRepeat(map[string]interface{}{
  133. "gtid": first_id,
  134. "lteid": end_id,
  135. })
  136. taskList = taskList[len_list:]
  137. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  138. } else {
  139. log.Println("合并段落~错误~正常取段落~~~")
  140. mapInfo := taskList[0]
  141. if mapInfo != nil {
  142. increaseRepeat(mapInfo) //判重方法
  143. }
  144. taskList = taskList[1:]
  145. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  146. }
  147. } else {
  148. mapInfo := taskList[0]
  149. if mapInfo != nil {
  150. increaseRepeat(mapInfo) //判重方法
  151. }
  152. taskList = taskList[1:]
  153. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  154. }
  155. updatelock.Unlock()
  156. } else {
  157. time.Sleep(15 * time.Second)
  158. }
  159. }
  160. }