main.go 5.8 KB


  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/nsq"
  8. "app.yhyue.com/data_processing/common_utils/udp"
  9. "encoding/json"
  10. "fmt"
  11. "go.uber.org/zap"
  12. "net"
  13. "os"
  14. "os/signal"
  15. "project/config"
  16. "strings"
  17. "syscall"
  18. "time"
  19. )
  20. var (
  21. udpclient udp.UdpClient //udp对象
  22. SingleThread = make(chan bool, 1)
  23. SingleClear = 0
  24. toaddr = []*net.UDPAddr{} //下节点对象
  25. ChSign = make(chan os.Signal)
  26. Es *elastic.Elastic
  27. Mcmer *gonsq.Consumer
  28. sid, eid string //测试使用
  29. UdpChan = make(chan map[string]interface{}, 500)
  30. )
  31. func init() {
  32. signal.Notify(ChSign)
  33. go DealSign()
  34. for _, m := range config.Conf.UdpNode {
  35. toaddr = append(toaddr, &net.UDPAddr{
  36. IP: net.ParseIP(m.Addr),
  37. Port: util.IntAll(m.Port),
  38. })
  39. }
  40. Es = &elastic.Elastic{
  41. S_esurl: config.Conf.DB.Es.Addr,
  42. I_size: config.Conf.DB.Es.Size,
  43. Username: config.Conf.DB.Es.User,
  44. Password: config.Conf.DB.Es.Password,
  45. }
  46. Es.InitElasticSize()
  47. P_QL = NewPT()
  48. go P_QL.updateAllQueue()
  49. go P_QL.clearMem()
  50. }
  // Handshake channels used by DealSign when an interrupt arrives.
  var (
  	// queryClose: DealSign sends true on it while P_QL.Brun is set.
  	queryClose = make(chan bool)
  	// queryCloseOver: DealSign then waits up to 30 seconds for a value here.
  	queryCloseOver = make(chan bool)
  )
  53. func DealSign() {
  54. for {
  55. select {
  56. case sign := <-ChSign:
  57. //log.Println("receive:", sign)
  58. if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
  59. log.Info("receice signal..,start close iter")
  60. if P_QL.Brun {
  61. queryClose <- true
  62. select {
  63. case <-queryCloseOver:
  64. case <-time.After(30 * time.Second):
  65. }
  66. }
  67. log.Info("signal deal over")
  68. }
  69. }
  70. }
  71. }
  72. func main() {
  73. P_QL.loadSpiderCode()
  74. P_QL.loadSite()
  75. if config.Conf.Serve.LoadStart > 0 {
  76. P_QL.loadData(config.Conf.Serve.LoadStart)
  77. }
  78. go checkMapJob()
  79. go P_QL.nsqMethod()
  80. for {
  81. mapinfo, ok := <-UdpChan
  82. if !ok {
  83. continue
  84. }
  85. SingleThread <- true
  86. tasktype := util.ObjToString(mapinfo["stype"])
  87. switch tasktype {
  88. case "ql": //全量合并
  89. go func() {
  90. defer func() {
  91. <-SingleThread
  92. }()
  93. P_QL.currentType = tasktype
  94. P_QL.pici = time.Now().Unix()
  95. P_QL.taskQl(mapinfo)
  96. }()
  97. case "project": //增量合并,
  98. go func() {
  99. defer func() {
  100. <-SingleThread
  101. }()
  102. P_QL.currentType = tasktype
  103. P_QL.pici = time.Now().Unix()
  104. P_QL.taskZl(mapinfo)
  105. }()
  106. case "project_history": //增量合并, id段历史数据
  107. go func() {
  108. defer func() {
  109. <-SingleThread
  110. }()
  111. P_QL.currentType = tasktype
  112. P_QL.pici = time.Now().Unix()
  113. P_QL.taskZl(mapinfo)
  114. }()
  115. case "updateInfo": //招标字段变更
  116. go func() {
  117. defer func() {
  118. <-SingleThread
  119. }()
  120. P_QL.currentType = tasktype
  121. P_QL.pici = time.Now().Unix()
  122. P_QL.taskUpdateInfo(mapinfo)
  123. }()
  124. case "updatePro": //修改项目外围字段(只修改外围字段值)
  125. go func() {
  126. defer func() {
  127. <-SingleThread
  128. }()
  129. P_QL.currentType = tasktype
  130. P_QL.pici = time.Now().Unix()
  131. P_QL.taskUpdatePro(mapinfo)
  132. }()
  133. case "deleteInfo": // 删除招标公告
  134. go func() {
  135. defer func() {
  136. <-SingleThread
  137. }()
  138. P_QL.currentType = tasktype
  139. P_QL.pici = time.Now().Unix()
  140. P_QL.delInfoPro(mapinfo)
  141. }()
  142. case "spider": // 爬虫代码code、isflow
  143. go func() {
  144. defer func() {
  145. <-SingleThread
  146. }()
  147. go P_QL.loadSpiderCode()
  148. }()
  149. case "history": //历史数据合并,暂时不写
  150. go func() {
  151. defer func() {
  152. <-SingleThread
  153. }()
  154. }()
  155. default:
  156. <-SingleThread
  157. }
  158. }
  159. }
  160. func mainT() {
  161. sid = "623d1ed4923488e17244bea7"
  162. eid = "6327653467a6b0a286122df9"
  163. //flag.StringVar(&sid, "sid", "", "开始id")
  164. //flag.StringVar(&eid, "eid", "", "结束id")
  165. //flag.Parse()
  166. mapinfo := map[string]interface{}{}
  167. if sid == "" || eid == "" {
  168. log.Info("sid, eid参数不能为空")
  169. os.Exit(0)
  170. }
  171. mapinfo["gtid"] = sid
  172. mapinfo["lteid"] = eid
  173. mapinfo["stype"] = "ql"
  174. mapinfo["ip"] = "127.0.0.1"
  175. mapinfo["port"] = "1782"
  176. P_QL.loadSpiderCode()
  177. P_QL.loadSite()
  178. if config.Conf.Serve.LoadStart > 0 {
  179. P_QL.loadData(config.Conf.Serve.LoadStart)
  180. }
  181. P_QL.loadSite()
  182. P_QL.currentType = mapinfo["stype"].(string)
  183. P_QL.pici = time.Now().Unix()
  184. P_QL.taskQl(mapinfo)
  185. time.Sleep(99999 * time.Hour)
  186. }
  187. //udp调用信号
  188. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  189. switch act {
  190. case udp.OP_TYPE_DATA: //上个节点的数据
  191. var mapInfo map[string]interface{}
  192. err := json.Unmarshal(data, &mapInfo)
  193. log.Info("udp---", zap.Any("mapInfo:", mapInfo))
  194. if err != nil {
  195. _ = udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  196. } else if mapInfo != nil {
  197. key, _ := mapInfo["key"].(string)
  198. if key == "" {
  199. key = "udpok"
  200. }
  201. go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  202. UdpChan <- mapInfo
  203. }
  204. case udp.OP_NOOP: //下个节点回应
  205. ok := string(data)
  206. if ok != "" {
  207. udptaskmap.Delete(ok)
  208. log.Info("ok:" + ok)
  209. }
  210. }
  211. }
  212. // @Description nsq处理id不变,内容替换的竞品数据
  213. // @Author J 2022/8/10 11:40
  214. func (p *ProjectTask) nsqMethod() {
  215. var err error
  216. Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
  217. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  218. Addr: config.Conf.Nsq.Addr,
  219. ConnectType: 0, //默认连接nsqd
  220. Topic: config.Conf.Nsq.Topic,
  221. Channel: config.Conf.Nsq.Channel,
  222. Concurrent: config.Conf.Nsq.Concurrent, //并发数
  223. })
  224. if err != nil {
  225. log.Info("nsqMethod err: " + err.Error())
  226. }
  227. for {
  228. select {
  229. case obj := <-Mcmer.Ch: //从通道读取即可
  230. log.Info("project nsq: " + fmt.Sprint(obj))
  231. id := strings.Split(util.ObjToString(obj), "=")
  232. if mongodb.IsObjectIdHex(id[1]) {
  233. p.taskinfo(id[1])
  234. } else {
  235. log.Info("jy nsq id err: " + id[1])
  236. }
  237. }
  238. }
  239. }