main.go 12 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "net/http"
  10. "strings"
  11. "time"
  12. util "utils"
  13. "utils/nsq"
  14. "utils/redis"
  15. "utils/udp"
  16. )
  17. var (
  18. Mcmer *gonsq.Consumer
  19. )
  20. func main() {
  21. // company_id
  22. redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
  23. inits()
  24. //go inspectQuery()
  25. go checkMapJob()
  26. go task_index()
  27. go nsqMethod()
  28. go UpdateBidding()
  29. go UpdateExtract()
  30. go SaveEsMethod()
  31. go SaveAllEsMethod()
  32. go SaveElseEsMethod()
  33. go SaveProjectEs()
  34. updport := Sysconfig["udpport"].(string)
  35. udpclient = udp.UdpClient{Local: updport, BufSize: 1024}
  36. udpclient.Listen(processUdpMsg)
  37. util.Debug("Udp服务监听", updport)
  38. ch := make(chan bool, 1)
  39. <-ch
  40. }
  41. /**
  42. 检查es查询队列 10s查询一次
  43. */
  44. func inspectQuery() {
  45. ticker := time.NewTicker(time.Second * 10)
  46. url := esAddr + "/_nodes/stats/thread_pool"
  47. for range ticker.C {
  48. resp, _ := http.Get(url)
  49. if resp != nil && resp.Body != nil {
  50. defer resp.Body.Close()
  51. }
  52. body, _ := ioutil.ReadAll(resp.Body)
  53. respMap := make(map[string]interface{})
  54. err := json.Unmarshal(body, &respMap)
  55. if err == nil {
  56. if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
  57. if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
  58. if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
  59. index, _ := pool["index"].(map[string]interface{})
  60. search, _ := pool["search"].(map[string]interface{})
  61. bulk, _ := pool["bulk"].(map[string]interface{})
  62. if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
  63. util.Debug("es thread_pool index queue---", index["queue"])
  64. util.Debug("es thread_pool search queue---", search["queue"])
  65. util.Debug("es thread_pool bulk queue---", bulk["queue"])
  66. StopFlag = true
  67. } else {
  68. StopFlag = false
  69. }
  70. }
  71. }
  72. }
  73. }
  74. }
  75. }
  76. var pool = make(chan bool, 20)
  77. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  78. switch act {
  79. case udp.OP_TYPE_DATA: //上个节点的数据
  80. //从表中开始处理生成企业数据
  81. var mapInfo map[string]interface{}
  82. err := json.Unmarshal(data, &mapInfo)
  83. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  84. if err != nil {
  85. udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  86. } else if mapInfo != nil {
  87. key, _ := mapInfo["key"].(string)
  88. if key == "" {
  89. key = "udpok"
  90. }
  91. go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  92. tasktype, _ := mapInfo["stype"].(string)
  93. t := NewTk(mapInfo)
  94. switch tasktype {
  95. case "winner":
  96. pool <- true
  97. go func() {
  98. defer func() {
  99. <-pool
  100. }()
  101. winnerTask(data, mapInfo)
  102. }()
  103. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  104. pool <- true
  105. go func() {
  106. defer func() {
  107. <-pool
  108. }()
  109. t.thread = 1
  110. t.biddingTask(data, mapInfo)
  111. }()
  112. case "bidding_history": //增量id段历史数据
  113. pool <- true
  114. go func() {
  115. defer func() {
  116. <-pool
  117. }()
  118. t.thread = 1
  119. t.biddingTask(data, mapInfo)
  120. }()
  121. case "project":
  122. pool <- true
  123. go func() {
  124. defer func() {
  125. <-pool
  126. }()
  127. projectTask(data, project, mapInfo)
  128. }()
  129. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  130. pool <- true
  131. go func() {
  132. defer func() {
  133. <-pool
  134. }()
  135. t.thread = 30
  136. t.biddingBackTask(data, mapInfo)
  137. }()
  138. case "biddingall": //合并并重新生成索引,不生成关键词
  139. pool <- true
  140. go func() {
  141. defer func() {
  142. <-pool
  143. }()
  144. t.thread = 30
  145. t.biddingAllTask(data, mapInfo)
  146. }()
  147. case "biddingdata": //bidding全量数据
  148. pool <- true
  149. go func() {
  150. defer func() {
  151. <-pool
  152. }()
  153. t.thread = 30
  154. t.biddingDataTask(data, mapInfo)
  155. }()
  156. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  157. pool <- true
  158. go func() {
  159. defer func() {
  160. <-pool
  161. }()
  162. biddingMergeTask(data, mapInfo)
  163. }()
  164. case "buyer":
  165. pool <- true
  166. go func() {
  167. defer func() {
  168. <-pool
  169. }()
  170. buyerTask(data, mapInfo)
  171. }()
  172. case "winnerent": //标准库
  173. pool <- true
  174. go func() {
  175. defer func() {
  176. <-pool
  177. }()
  178. standardTask("winnerent", mapInfo)
  179. }()
  180. case "buyerent": //标准库
  181. pool <- true
  182. go func() {
  183. defer func() {
  184. <-pool
  185. }()
  186. standardTask("buyerent", mapInfo)
  187. }()
  188. case "agencyent": //标准库
  189. pool <- true
  190. go func() {
  191. defer func() {
  192. <-pool
  193. }()
  194. standardTask("agencyent", mapInfo)
  195. }()
  196. case "biddingdelbyextract": //根据repeat删除es
  197. pool <- true
  198. go func() {
  199. defer func() {
  200. <-pool
  201. }()
  202. biddingDelByExtract(data, mapInfo)
  203. }()
  204. case "biddingdelbyextracttype": //根据extracttype删除es
  205. pool <- true
  206. go func() {
  207. defer func() {
  208. <-pool
  209. }()
  210. biddingDelByExtracttype(data, mapInfo)
  211. }()
  212. default:
  213. pool <- true
  214. go func() {
  215. defer func() {
  216. <-pool
  217. }()
  218. util.Debug("err ---", mapInfo)
  219. }()
  220. }
  221. }
  222. case udp.OP_NOOP: //下个节点回应
  223. log.Println("发送成功", string(data))
  224. }
  225. }
  226. func SaveEsMethod() {
  227. arru := make([]map[string]interface{}, EsBulkSize)
  228. indexu := 0
  229. for {
  230. select {
  231. case v := <-saveEsPool:
  232. arru[indexu] = v
  233. indexu++
  234. if indexu == EsBulkSize {
  235. saveEsSp <- true
  236. go func(arru []map[string]interface{}) {
  237. defer func() {
  238. <-saveEsSp
  239. }()
  240. //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  241. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  242. //if len(multiIndex) == 2 {
  243. // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  244. //}
  245. }(arru)
  246. arru = make([]map[string]interface{}, EsBulkSize)
  247. indexu = 0
  248. }
  249. case <-time.After(1000 * time.Millisecond):
  250. if indexu > 0 {
  251. saveEsSp <- true
  252. go func(arru []map[string]interface{}) {
  253. defer func() {
  254. <-saveEsSp
  255. }()
  256. //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  257. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  258. //if len(multiIndex) == 2 {
  259. // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  260. //}
  261. }(arru[:indexu])
  262. arru = make([]map[string]interface{}, EsBulkSize)
  263. indexu = 0
  264. }
  265. }
  266. }
  267. }
  268. func SaveElseEsMethod() {
  269. arru := make([]map[string]interface{}, EsBulkSize)
  270. indexu := 0
  271. for {
  272. select {
  273. case v := <-saveEsElsePool:
  274. arru[indexu] = v
  275. indexu++
  276. if indexu == EsBulkSize {
  277. saveEsElseSp <- true
  278. go func(arru []map[string]interface{}) {
  279. defer func() {
  280. <-saveEsElseSp
  281. }()
  282. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  283. }(arru)
  284. arru = make([]map[string]interface{}, EsBulkSize)
  285. indexu = 0
  286. }
  287. case <-time.After(1000 * time.Millisecond):
  288. if indexu > 0 {
  289. saveEsElseSp <- true
  290. go func(arru []map[string]interface{}) {
  291. defer func() {
  292. <-saveEsElseSp
  293. }()
  294. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  295. }(arru[:indexu])
  296. arru = make([]map[string]interface{}, EsBulkSize)
  297. indexu = 0
  298. }
  299. }
  300. }
  301. }
  302. func SaveAllEsMethod() {
  303. arru := make([]map[string]interface{}, EsBulkSize)
  304. indexu := 0
  305. for {
  306. select {
  307. case v := <-saveEsAllPool:
  308. arru[indexu] = v
  309. indexu++
  310. if indexu == EsBulkSize {
  311. saveEsAllSp <- true
  312. go func(arru []map[string]interface{}) {
  313. defer func() {
  314. <-saveEsAllSp
  315. }()
  316. Es1.BulkSave("biddingall", "bidding", &arru, true)
  317. }(arru)
  318. arru = make([]map[string]interface{}, EsBulkSize)
  319. indexu = 0
  320. }
  321. case <-time.After(1000 * time.Millisecond):
  322. if indexu > 0 {
  323. saveEsAllSp <- true
  324. go func(arru []map[string]interface{}) {
  325. defer func() {
  326. <-saveEsAllSp
  327. }()
  328. Es1.BulkSave("biddingall", "bidding", &arru, true)
  329. }(arru[:indexu])
  330. arru = make([]map[string]interface{}, EsBulkSize)
  331. indexu = 0
  332. }
  333. }
  334. }
  335. }
  336. func SaveProjectEs() {
  337. arru := make([]map[string]interface{}, EsBulkSize)
  338. indexu := 0
  339. for {
  340. select {
  341. case v := <-saveProjectEsPool:
  342. arru[indexu] = v
  343. indexu++
  344. if indexu == EsBulkSize {
  345. saveProjectSp <- true
  346. go func(arru []map[string]interface{}) {
  347. defer func() {
  348. <-saveProjectSp
  349. }()
  350. //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  351. Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  352. }(arru)
  353. arru = make([]map[string]interface{}, EsBulkSize)
  354. indexu = 0
  355. }
  356. case <-time.After(1000 * time.Millisecond):
  357. if indexu > 0 {
  358. saveProjectSp <- true
  359. go func(arru []map[string]interface{}) {
  360. defer func() {
  361. <-saveProjectSp
  362. }()
  363. //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  364. Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  365. }(arru[:indexu])
  366. arru = make([]map[string]interface{}, EsBulkSize)
  367. indexu = 0
  368. }
  369. }
  370. }
  371. }
  372. func UpdateBidding() {
  373. arru := make([][]map[string]interface{}, MgoBulkSize)
  374. indexu := 0
  375. for {
  376. select {
  377. case v := <-updateBiddingPool:
  378. arru[indexu] = v
  379. indexu++
  380. if indexu == MgoBulkSize {
  381. updateBiddingSp <- true
  382. go func(arru [][]map[string]interface{}) {
  383. defer func() {
  384. <-updateBiddingSp
  385. }()
  386. biddingMgo.UpdateBulk(currentColl, arru...)
  387. }(arru)
  388. arru = make([][]map[string]interface{}, MgoBulkSize)
  389. indexu = 0
  390. }
  391. case <-time.After(1000 * time.Millisecond):
  392. if indexu > 0 {
  393. updateBiddingSp <- true
  394. go func(arru [][]map[string]interface{}) {
  395. defer func() {
  396. <-updateBiddingSp
  397. }()
  398. biddingMgo.UpdateBulk(currentColl, arru...)
  399. }(arru[:indexu])
  400. arru = make([][]map[string]interface{}, MgoBulkSize)
  401. indexu = 0
  402. }
  403. }
  404. }
  405. }
  406. func UpdateExtract() {
  407. extract := util.ObjToString(extract["collect"])
  408. arru := make([][]map[string]interface{}, MgoBulkSize)
  409. indexu := 0
  410. for {
  411. select {
  412. case v := <-updateExtractPool:
  413. arru[indexu] = v
  414. indexu++
  415. if indexu == MgoBulkSize {
  416. updateExtractSp <- true
  417. go func(arru [][]map[string]interface{}) {
  418. defer func() {
  419. <-updateExtractSp
  420. }()
  421. extractMgo.UpdateBulk(extract, arru...)
  422. }(arru)
  423. arru = make([][]map[string]interface{}, MgoBulkSize)
  424. indexu = 0
  425. }
  426. case <-time.After(1000 * time.Millisecond):
  427. if indexu > 0 {
  428. updateExtractSp <- true
  429. go func(arru [][]map[string]interface{}) {
  430. defer func() {
  431. <-updateExtractSp
  432. }()
  433. extractMgo.UpdateBulk(extract, arru...)
  434. }(arru[:indexu])
  435. arru = make([][]map[string]interface{}, MgoBulkSize)
  436. indexu = 0
  437. }
  438. }
  439. }
  440. }
  441. // @Description nsq处理id不变,内容替换的竞品数据
  442. // @Author J 2022/8/10 11:40
  443. func nsqMethod() {
  444. cof := Sysconfig["nsq_id"].(map[string]interface{})
  445. var err error
  446. Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
  447. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  448. Addr: util.ObjToString(cof["addr"]),
  449. ConnectType: 0, //默认连接nsqd
  450. Topic: util.ObjToString(cof["topic"]),
  451. Channel: util.ObjToString(cof["channel"]),
  452. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  453. })
  454. if err != nil {
  455. util.Debug("nsqMethod err: ", err.Error())
  456. }
  457. for {
  458. select {
  459. case obj := <-Mcmer.Ch: //从通道读取即可
  460. util.Debug("index nsq: " + fmt.Sprint(obj))
  461. id := strings.Split(util.ObjToString(obj), "=")
  462. if bson.IsObjectIdHex(id[1]) {
  463. taskinfo(id[1])
  464. } else {
  465. util.Debug("jy nsq id err: ", id[1])
  466. }
  467. }
  468. }
  469. }