main.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package main
  2. /**
  3. 招标信息判重
  4. **/
  5. import (
  6. "encoding/json"
  7. "flag"
  8. "flow_repeat/nsqdata"
  9. "fmt"
  10. "log"
  11. "net"
  12. "regexp"
  13. "sync"
  14. "time"
  15. "jygit.jydev.jianyu360.cn/BP/jynats/jnats"
  16. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  17. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  18. )
  19. var (
  20. Sysconfig map[string]interface{} //配置文件
  21. data_mgo, task_mgo, spider_mgo *MongodbSim
  22. task_coll, task_bidding, spider_coll string
  23. extract, extract_back, extract_log string
  24. udpclient mu.UdpClient
  25. nextNode []map[string]interface{}
  26. dupdays = 7
  27. DM, FullDM *datamap
  28. Update *updateInfo
  29. AddGroupPool *addGroupInfo
  30. //正则筛选相关
  31. FilterRegTitle = regexp.MustCompile("^_$")
  32. FilterRegTitle_0 = regexp.MustCompile("^_$")
  33. FilterRegTitle_1 = regexp.MustCompile("^_$")
  34. FilterRegTitle_2 = regexp.MustCompile("^_$")
  35. threadNum int
  36. SiteMap map[string]map[string]interface{}
  37. LowHeavy, TimingTask, IsFull, isUpdateSite bool
  38. timingSpanDay, timingPubScope int64
  39. gtid, lastid, sec_gtid, sec_lteid, lteid string
  40. updatelock, datalock, numlock, cronlock sync.Mutex
  41. jyfb_data map[string]string
  42. taskList []map[string]interface{}
  43. nspdata_1, nspdata_2 *nsqdata.Producer
  44. responselock sync.Mutex
  45. lastNodeResponse int64
  46. jn *jnats.Jnats
  47. )
  48. // 初始化加载
  49. func init() {
  50. flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
  51. flag.StringVar(&gtid, "gtid", "", "历史增量的起始id") //历史
  52. flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
  53. flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
  54. flag.Parse()
  55. qu.ReadConfig(&Sysconfig)
  56. InitAllInfos() //加载所有信息...
  57. }
  58. func main() {
  59. if TimingTask {
  60. log.Println("正常历史部署...组装...")
  61. go historyRepeat()
  62. } else {
  63. log.Println("正常增量部署...流式...")
  64. jn = jnats.NewJnats("192.168.3.240:19092")
  65. //
  66. ////先消费,带zip压缩,用于跨网传输节省流量
  67. //jn.SubZip("test", func(msg *nats.Msg) {
  68. // log.Println(string(msg.Data))
  69. // //回执消息
  70. // msg.Respond([]byte("receive msg:" + string(msg.Data)))
  71. //})
  72. }
  73. time.Sleep(99999 * time.Hour)
  74. }
  75. func mainTest() {
  76. increaseRepeat(map[string]interface{}{
  77. "gtid": "12ec61170ae152a3c2310f02",
  78. "lteid": "92ec61170ae152a3c2310f02",
  79. })
  80. time.Sleep(99999 * time.Hour)
  81. }
  82. // 主函数
  83. func mainTestTest() {
  84. go checkMailJob()
  85. lastNodeResponse = time.Now().Unix()
  86. updport := Sysconfig["udpport"].(string)
  87. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  88. udpclient.Listen(processUdpMsg)
  89. log.Println("Udp服务监听", updport)
  90. if TimingTask {
  91. log.Println("正常历史部署...")
  92. go historyRepeat()
  93. } else {
  94. if !IsFull {
  95. log.Println("正常增量部署与监控机制...")
  96. go lastUdpJob()
  97. go getRepeatTask()
  98. }
  99. }
  100. time.Sleep(99999 * time.Hour)
  101. }
  102. // udp接收
  103. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  104. switch act {
  105. case mu.OP_TYPE_DATA:
  106. var mapInfo map[string]interface{}
  107. err := json.Unmarshal(data, &mapInfo)
  108. if err != nil {
  109. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  110. } else if mapInfo != nil {
  111. sid, eid := qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"])
  112. stype := qu.ObjToString(mapInfo["stype"])
  113. if stype == "monitor" {
  114. log.Println("收到监测......")
  115. key := qu.ObjToString(mapInfo["key"])
  116. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  117. return
  118. }
  119. if sid == "" || eid == "" {
  120. log.Println("接收id段异常-err ", "sid=", sid, ",eid=", eid)
  121. } else {
  122. lastNodeResponse = time.Now().Unix()
  123. key := sid + "-" + eid + "-" + qu.ObjToString(mapInfo["stype"])
  124. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  125. //计算是否需要加载站点~每天加载一次
  126. if isUpdateSite {
  127. initSite()
  128. }
  129. //插入任务-判断任务-是否存在
  130. updatelock.Lock()
  131. taskList = append(taskList, mapInfo)
  132. log.Println("udp收到任务...数量:", len(taskList), "具体任务:", taskList)
  133. updatelock.Unlock()
  134. }
  135. }
  136. case mu.OP_NOOP: //下个节点回应
  137. log.Println("下节点回应:", string(data))
  138. udptaskmap.Delete(string(data))
  139. }
  140. }
  141. // 监听-获取-分发判重任务
  142. func getRepeatTask() {
  143. for {
  144. if len(taskList) > 0 {
  145. updatelock.Lock()
  146. len_list := len(taskList)
  147. if len_list > 1 {
  148. first_id := taskList[0]["gtid"]
  149. end_id := taskList[len_list-1]["lteid"]
  150. if first_id != "" && end_id != "" {
  151. log.Println("合并段落~正常~", first_id, "~", end_id)
  152. increaseRepeat(map[string]interface{}{
  153. "gtid": first_id,
  154. "lteid": end_id,
  155. })
  156. taskList = taskList[len_list:]
  157. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  158. } else {
  159. log.Println("合并段落~错误~正常取段落~~~")
  160. mapInfo := taskList[0]
  161. if mapInfo != nil {
  162. increaseRepeat(mapInfo) //判重方法
  163. }
  164. taskList = taskList[1:]
  165. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  166. }
  167. } else {
  168. mapInfo := taskList[0]
  169. if mapInfo != nil {
  170. increaseRepeat(mapInfo) //判重方法
  171. }
  172. taskList = taskList[1:]
  173. log.Println("此段落结束当前任务池...", len(taskList), taskList)
  174. }
  175. updatelock.Unlock()
  176. } else {
  177. time.Sleep(15 * time.Second)
  178. }
  179. }
  180. }
  181. func lastUdpJob() {
  182. for {
  183. responselock.Lock()
  184. if time.Now().Unix()-lastNodeResponse >= 1800 {
  185. lastNodeResponse = time.Now().Unix() //重置时间
  186. sendErrMailApi("判重增量~发现处理流程超时~给予告警", fmt.Sprintf("半小时左右~无新段落数据进入判重增量流程...相关人员检查..."))
  187. }
  188. responselock.Unlock()
  189. time.Sleep(300 * time.Second)
  190. }
  191. }