main.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. mu "mfw/util"
  7. "mongodb"
  8. "net"
  9. nsq "nsq"
  10. "os"
  11. "os/signal"
  12. "qfw/util"
  13. "qfw/util/elastic"
  14. "strings"
  15. "syscall"
  16. "time"
  17. )
  18. var (
  19. udpclient mu.UdpClient //udp对象
  20. SingleThread = make(chan bool, 1)
  21. SingleClear = 0
  22. toaddr = []*net.UDPAddr{} //下节点对象
  23. ChSign = make(chan os.Signal)
  24. Es *elastic.Elastic
  25. Index string
  26. Itype string
  27. Mcmer *nsq.Consumer
  28. sid, eid string //测试使用
  29. UdpChan = make(chan map[string]interface{}, 500)
  30. )
  31. func init() {
  32. signal.Notify(ChSign)
  33. go DealSign()
  34. nextNode := util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  35. for _, m := range nextNode {
  36. toaddr = append(toaddr, &net.UDPAddr{
  37. IP: net.ParseIP(m["addr"].(string)),
  38. Port: util.IntAll(m["port"]),
  39. })
  40. }
  41. es := Sysconfig["es"].(map[string]interface{})
  42. Es = &elastic.Elastic{
  43. S_esurl: util.ObjToString(es["addr"]),
  44. I_size: util.IntAllDef(es["pool"], 10),
  45. }
  46. Index = util.ObjToString(es["index"])
  47. Itype = util.ObjToString(es["itype"])
  48. Es.InitElasticSize()
  49. P_QL = NewPT()
  50. go P_QL.updateAllQueue()
  51. go P_QL.clearMem()
  52. }
  53. var queryClose = make(chan bool)
  54. var queryCloseOver = make(chan bool)
  55. func DealSign() {
  56. for {
  57. select {
  58. case sign := <-ChSign:
  59. //log.Println("receive:", sign)
  60. if v, ok := sign.(syscall.Signal); ok && v == os.Interrupt {
  61. log.Println("receice signal..,start close iter")
  62. if P_QL.Brun {
  63. queryClose <- true
  64. select {
  65. case <-queryCloseOver:
  66. case <-time.After(30 * time.Second):
  67. }
  68. }
  69. util.ReadConfig(&Sysconfig)
  70. log.Println("signal deal over")
  71. }
  72. }
  73. }
  74. }
  75. func main() {
  76. P_QL.loadSpiderCode()
  77. P_QL.loadSite()
  78. if Sysconfig["loadStart"] != nil {
  79. loadStart := util.Int64All(Sysconfig["loadStart"])
  80. if loadStart > -1 {
  81. P_QL.loadData(loadStart)
  82. }
  83. }
  84. //go checkMapJob()
  85. go P_QL.nsqMethod()
  86. for {
  87. mapinfo, ok := <-UdpChan
  88. if !ok {
  89. continue
  90. }
  91. SingleThread <- true
  92. tasktype := util.ObjToString(mapinfo["stype"])
  93. switch tasktype {
  94. case "ql": //全量合并
  95. go func() {
  96. defer func() {
  97. <-SingleThread
  98. }()
  99. P_QL.currentType = tasktype
  100. P_QL.pici = time.Now().Unix()
  101. P_QL.taskQl(mapinfo)
  102. }()
  103. case "project": //增量合并,
  104. go func() {
  105. defer func() {
  106. <-SingleThread
  107. }()
  108. P_QL.currentType = tasktype
  109. P_QL.pici = time.Now().Unix()
  110. P_QL.taskZl(mapinfo)
  111. }()
  112. case "project_history": //增量合并, id段历史数据
  113. go func() {
  114. defer func() {
  115. <-SingleThread
  116. }()
  117. P_QL.currentType = tasktype
  118. P_QL.pici = time.Now().Unix()
  119. P_QL.taskZl(mapinfo)
  120. }()
  121. case "updateInfo": //招标字段变更
  122. go func() {
  123. defer func() {
  124. <-SingleThread
  125. }()
  126. P_QL.currentType = tasktype
  127. P_QL.pici = time.Now().Unix()
  128. P_QL.taskUpdateInfo(mapinfo)
  129. }()
  130. case "updatePro": //修改项目外围字段(只修改外围字段值)
  131. go func() {
  132. defer func() {
  133. <-SingleThread
  134. }()
  135. P_QL.currentType = tasktype
  136. P_QL.pici = time.Now().Unix()
  137. P_QL.taskUpdatePro(mapinfo)
  138. }()
  139. case "deleteInfo": // 删除招标公告
  140. go func() {
  141. defer func() {
  142. <-SingleThread
  143. }()
  144. P_QL.currentType = tasktype
  145. P_QL.pici = time.Now().Unix()
  146. P_QL.delInfoPro(mapinfo)
  147. }()
  148. case "spider": // 爬虫代码code、isflow
  149. go func() {
  150. defer func() {
  151. <-SingleThread
  152. }()
  153. go P_QL.loadSpiderCode()
  154. }()
  155. case "history": //历史数据合并,暂时不写
  156. go func() {
  157. defer func() {
  158. <-SingleThread
  159. }()
  160. }()
  161. default:
  162. <-SingleThread
  163. }
  164. }
  165. }
  166. func mainT() {
  167. sid = "623d1ed4923488e17244bea7"
  168. eid = "6327653467a6b0a286122df9"
  169. //flag.StringVar(&sid, "sid", "", "开始id")
  170. //flag.StringVar(&eid, "eid", "", "结束id")
  171. //flag.Parse()
  172. mapinfo := map[string]interface{}{}
  173. if sid == "" || eid == "" {
  174. log.Println("sid, eid参数不能为空")
  175. os.Exit(0)
  176. }
  177. mapinfo["gtid"] = sid
  178. mapinfo["lteid"] = eid
  179. mapinfo["stype"] = "ql"
  180. mapinfo["ip"] = "127.0.0.1"
  181. mapinfo["port"] = Sysconfig["udpport"]
  182. P_QL.loadSpiderCode()
  183. P_QL.loadSite()
  184. if Sysconfig["loadStart"] != nil {
  185. loadStart := util.Int64All(Sysconfig["loadStart"])
  186. if loadStart > -1 {
  187. P_QL.loadData(loadStart)
  188. }
  189. }
  190. P_QL.loadSite()
  191. P_QL.currentType = mapinfo["stype"].(string)
  192. P_QL.pici = time.Now().Unix()
  193. P_QL.taskQl(mapinfo)
  194. time.Sleep(99999 * time.Hour)
  195. }
  196. //udp调用信号
  197. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  198. switch act {
  199. case mu.OP_TYPE_DATA: //上个节点的数据
  200. var mapInfo map[string]interface{}
  201. err := json.Unmarshal(data, &mapInfo)
  202. log.Println("err:", err, "mapInfo:", mapInfo)
  203. if err != nil {
  204. _ = udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  205. } else if mapInfo != nil {
  206. key, _ := mapInfo["key"].(string)
  207. if key == "" {
  208. key = "udpok"
  209. }
  210. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  211. UdpChan <- mapInfo
  212. }
  213. case mu.OP_NOOP: //下个节点回应
  214. ok := string(data)
  215. if ok != "" {
  216. udptaskmap.Delete(ok)
  217. log.Println("ok:", ok)
  218. }
  219. }
  220. }
  221. // @Description nsq处理id不变,内容替换的竞品数据
  222. // @Author J 2022/8/10 11:40
  223. func (p *ProjectTask) nsqMethod() {
  224. cof := Sysconfig["nsq_id"].(map[string]interface{})
  225. var err error
  226. Mcmer, err = nsq.NewConsumer(&nsq.Cconfig{
  227. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  228. Addr: util.ObjToString(cof["addr"]),
  229. ConnectType: 0, //默认连接nsqd
  230. Topic: util.ObjToString(cof["topic"]),
  231. Channel: util.ObjToString(cof["channel"]),
  232. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  233. })
  234. if err != nil {
  235. util.Debug("nsqMethod err: ", err.Error())
  236. }
  237. for {
  238. select {
  239. case obj := <-Mcmer.Ch: //从通道读取即可
  240. util.Debug("project nsq: " + fmt.Sprint(obj))
  241. id := strings.Split(util.ObjToString(obj), "=")
  242. if mongodb.IsObjectIdHex(id[1]) {
  243. p.taskinfo(id[1])
  244. } else {
  245. util.Debug("jy nsq id err: ", id[1])
  246. }
  247. }
  248. }
  249. }