main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package main
  2. import (
  3. "encoding/json"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  11. "net"
  12. "os"
  13. "os/signal"
  14. "project/config"
  15. "strings"
  16. "syscall"
  17. "time"
  18. )
  19. var (
  20. udpclient udp.UdpClient //udp对象
  21. SingleThread = make(chan bool, 1)
  22. SingleClear = 0
  23. toaddr = []*net.UDPAddr{} //下节点对象
  24. ChSign = make(chan os.Signal)
  25. Es *elastic.Elastic
  26. Mcmer *gonsq.Consumer
  27. sid, eid string //测试使用
  28. UdpChan = make(chan map[string]interface{}, 500)
  29. )
  30. func init() {
  31. signal.Notify(ChSign)
  32. go DealSign()
  33. for _, m := range config.Conf.UdpNode {
  34. toaddr = append(toaddr, &net.UDPAddr{
  35. IP: net.ParseIP(m.Addr),
  36. Port: util.IntAll(m.Port),
  37. })
  38. }
  39. Es = &elastic.Elastic{
  40. S_esurl: config.Conf.DB.Es.Addr,
  41. I_size: config.Conf.DB.Es.Size,
  42. Username: config.Conf.DB.Es.User,
  43. Password: config.Conf.DB.Es.Password,
  44. }
  45. Es.InitElasticSize()
  46. P_QL = NewPT()
  47. go P_QL.updateAllQueue()
  48. go P_QL.clearMem()
  49. }
  50. var queryClose = make(chan bool)
  51. var queryCloseOver = make(chan bool)
  52. func DealSign() {
  53. for {
  54. select {
  55. case sign := <-ChSign:
  56. //log.Println("receive:", sign)
  57. if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
  58. log.Info("receice signal..,start close iter")
  59. if P_QL.Brun {
  60. queryClose <- true
  61. select {
  62. case <-queryCloseOver:
  63. case <-time.After(30 * time.Second):
  64. }
  65. }
  66. log.Info("signal deal over")
  67. }
  68. }
  69. }
  70. }
  71. func mainT() {
  72. P_QL.loadSpiderCode()
  73. P_QL.loadSite()
  74. if config.Conf.Serve.LoadStart > 0 {
  75. P_QL.loadData(config.Conf.Serve.LoadStart)
  76. }
  77. //go checkMapJob()
  78. //go P_QL.nsqMethod()
  79. for {
  80. mapinfo, ok := <-UdpChan
  81. if !ok {
  82. continue
  83. }
  84. tasktype := util.ObjToString(mapinfo["stype"])
  85. switch tasktype {
  86. case "ql": //全量合并
  87. go func() {
  88. defer func() {
  89. <-SingleThread
  90. }()
  91. SingleThread <- true
  92. P_QL.currentType = tasktype
  93. P_QL.pici = time.Now().Unix()
  94. P_QL.taskQl(mapinfo)
  95. }()
  96. case "project": //增量合并,
  97. go func() {
  98. defer func() {
  99. <-SingleThread
  100. }()
  101. SingleThread <- true
  102. P_QL.currentType = tasktype
  103. P_QL.pici = time.Now().Unix()
  104. P_QL.taskZl(mapinfo)
  105. }()
  106. case "project_history": //增量合并, id段历史数据
  107. go func() {
  108. defer func() {
  109. <-SingleThread
  110. }()
  111. SingleThread <- true
  112. P_QL.currentType = tasktype
  113. P_QL.pici = time.Now().Unix()
  114. P_QL.taskZl(mapinfo)
  115. }()
  116. case "updateInfo": //招标字段变更
  117. go func() {
  118. defer func() {
  119. }()
  120. P_QL.currentType = tasktype
  121. P_QL.pici = time.Now().Unix()
  122. P_QL.taskUpdateInfo(mapinfo)
  123. }()
  124. case "updatePro": //修改项目外围字段(只修改外围字段值)
  125. go func() {
  126. defer func() {
  127. }()
  128. P_QL.currentType = tasktype
  129. P_QL.pici = time.Now().Unix()
  130. P_QL.taskUpdatePro(mapinfo)
  131. }()
  132. case "deleteInfo": // 删除招标公告
  133. go func() {
  134. defer func() {
  135. }()
  136. P_QL.currentType = tasktype
  137. P_QL.pici = time.Now().Unix()
  138. P_QL.delInfoPro(mapinfo)
  139. }()
  140. case "spider": // 爬虫代码code、isflow
  141. go func() {
  142. defer func() {
  143. }()
  144. go P_QL.loadSpiderCode()
  145. }()
  146. case "history": //历史数据合并,暂时不写
  147. go func() {
  148. defer func() {
  149. }()
  150. }()
  151. default:
  152. <-SingleThread
  153. }
  154. }
  155. }
  156. func main() {
  157. sid = "626cccaa631ff1ac3d29289e"
  158. eid = "640aa55d8aea8786d1cd0247"
  159. //flag.StringVar(&sid, "sid", "", "开始id")
  160. //flag.StringVar(&eid, "eid", "", "结束id")
  161. //flag.Parse()
  162. mapinfo := map[string]interface{}{}
  163. if sid == "" || eid == "" {
  164. log.Info("sid, eid参数不能为空")
  165. os.Exit(0)
  166. }
  167. mapinfo["gtid"] = sid
  168. mapinfo["lteid"] = eid
  169. mapinfo["stype"] = "project"
  170. mapinfo["ip"] = "127.0.0.1"
  171. mapinfo["port"] = "1782"
  172. P_QL.loadSpiderCode()
  173. P_QL.loadSite()
  174. if config.Conf.Serve.LoadStart > 0 {
  175. P_QL.loadData(config.Conf.Serve.LoadStart)
  176. }
  177. P_QL.loadSite()
  178. P_QL.currentType = mapinfo["stype"].(string)
  179. P_QL.pici = time.Now().Unix()
  180. P_QL.taskQl(mapinfo)
  181. time.Sleep(99999 * time.Hour)
  182. }
  183. // udp调用信号
  184. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  185. switch act {
  186. case udp.OP_TYPE_DATA: //上个节点的数据
  187. var mapInfo map[string]interface{}
  188. err := json.Unmarshal(data, &mapInfo)
  189. log.Info("udp---", zap.Any("mapInfo:", mapInfo))
  190. if err != nil {
  191. _ = udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  192. } else if mapInfo != nil {
  193. key, _ := mapInfo["key"].(string)
  194. if key == "" {
  195. key = "udpok"
  196. }
  197. go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  198. if util.ObjToString(mapInfo["stype"]) != "monitor" {
  199. UdpChan <- mapInfo
  200. }
  201. }
  202. case udp.OP_NOOP: //下个节点回应
  203. ok := string(data)
  204. if ok != "" {
  205. udptaskmap.Delete(ok)
  206. log.Info("ok:" + ok)
  207. }
  208. }
  209. }
  210. // @Description nsq处理id不变,内容替换的竞品数据
  211. // @Author J 2022/8/10 11:40
  212. func (p *ProjectTask) nsqMethod() {
  213. var err error
  214. Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
  215. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  216. Addr: config.Conf.Nsq.Addr,
  217. ConnectType: 0, //默认连接nsqd
  218. Topic: config.Conf.Nsq.Topic,
  219. Channel: config.Conf.Nsq.Channel,
  220. Concurrent: config.Conf.Nsq.Concurrent, //并发数
  221. })
  222. if err != nil {
  223. log.Info("nsqMethod err: " + err.Error())
  224. }
  225. for {
  226. select {
  227. case obj := <-Mcmer.Ch: //从通道读取即可
  228. id := strings.Split(util.ObjToString(obj), "=")
  229. if mongodb.IsObjectIdHex(id[1]) {
  230. p.taskinfo(id[1])
  231. } else {
  232. log.Info("jy nsq id err: " + id[1])
  233. }
  234. }
  235. }
  236. }