main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "gopkg.in/mgo.v2/bson"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "net/http"
  10. "strings"
  11. "time"
  12. util "utils"
  13. "utils/nsq"
  14. "utils/udp"
  15. )
  16. var (
  17. Mcmer *gonsq.Consumer
  18. )
  19. func main() {
  20. // company_id
  21. //redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
  22. //inits()
  23. //go inspectQuery()
  24. //go checkMapJob()
  25. //go task_index()
  26. //go nsqMethod()
  27. go UpdateBidding()
  28. go UpdateExtract()
  29. go SaveEsMethod()
  30. go SaveAllEsMethod()
  31. go SaveElseEsMethod()
  32. go SaveProjectEs()
  33. updport := Sysconfig["udpport"].(string)
  34. udpclient = udp.UdpClient{Local: updport, BufSize: 1024}
  35. udpclient.Listen(processUdpMsg)
  36. util.Debug("Udp服务监听", updport)
  37. ch := make(chan bool, 1)
  38. <-ch
  39. }
  40. /**
  41. 检查es查询队列 10s查询一次
  42. */
  43. func inspectQuery() {
  44. ticker := time.NewTicker(time.Second * 10)
  45. url := esAddr + "/_nodes/stats/thread_pool"
  46. for range ticker.C {
  47. resp, _ := http.Get(url)
  48. if resp != nil && resp.Body != nil {
  49. defer resp.Body.Close()
  50. }
  51. body, _ := ioutil.ReadAll(resp.Body)
  52. respMap := make(map[string]interface{})
  53. err := json.Unmarshal(body, &respMap)
  54. if err == nil {
  55. if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
  56. if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
  57. if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
  58. index, _ := pool["index"].(map[string]interface{})
  59. search, _ := pool["search"].(map[string]interface{})
  60. bulk, _ := pool["bulk"].(map[string]interface{})
  61. if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
  62. util.Debug("es thread_pool index queue---", index["queue"])
  63. util.Debug("es thread_pool search queue---", search["queue"])
  64. util.Debug("es thread_pool bulk queue---", bulk["queue"])
  65. StopFlag = true
  66. } else {
  67. StopFlag = false
  68. }
  69. }
  70. }
  71. }
  72. }
  73. }
  74. }
  75. var pool = make(chan bool, 20)
  76. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  77. switch act {
  78. case udp.OP_TYPE_DATA: //上个节点的数据
  79. //从表中开始处理生成企业数据
  80. var mapInfo map[string]interface{}
  81. err := json.Unmarshal(data, &mapInfo)
  82. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  83. if err != nil {
  84. udpclient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  85. } else if mapInfo != nil {
  86. key, _ := mapInfo["key"].(string)
  87. if key == "" {
  88. key = "udpok"
  89. }
  90. go udpclient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  91. tasktype, _ := mapInfo["stype"].(string)
  92. t := NewTk(mapInfo)
  93. switch tasktype {
  94. case "winner":
  95. pool <- true
  96. go func() {
  97. defer func() {
  98. <-pool
  99. }()
  100. winnerTask(data, mapInfo)
  101. }()
  102. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  103. pool <- true
  104. go func() {
  105. defer func() {
  106. <-pool
  107. }()
  108. t.thread = 1
  109. t.biddingTask(data, mapInfo)
  110. }()
  111. case "bidding_history": //增量id段历史数据
  112. pool <- true
  113. go func() {
  114. defer func() {
  115. <-pool
  116. }()
  117. t.thread = 1
  118. t.biddingTask(data, mapInfo)
  119. }()
  120. case "project":
  121. pool <- true
  122. go func() {
  123. defer func() {
  124. <-pool
  125. }()
  126. projectTask(data, project, mapInfo)
  127. }()
  128. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  129. pool <- true
  130. go func() {
  131. defer func() {
  132. <-pool
  133. }()
  134. t.thread = 30
  135. t.biddingBackTask(data, mapInfo)
  136. }()
  137. case "biddingall": //合并并重新生成索引,不生成关键词
  138. pool <- true
  139. go func() {
  140. defer func() {
  141. <-pool
  142. }()
  143. t.thread = 30
  144. t.biddingAllTask(data, mapInfo)
  145. }()
  146. case "biddingdata": //bidding全量数据
  147. pool <- true
  148. go func() {
  149. defer func() {
  150. <-pool
  151. }()
  152. t.thread = 30
  153. t.biddingDataTask(data, mapInfo)
  154. }()
  155. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  156. pool <- true
  157. go func() {
  158. defer func() {
  159. <-pool
  160. }()
  161. biddingMergeTask(data, mapInfo)
  162. }()
  163. case "buyer":
  164. pool <- true
  165. go func() {
  166. defer func() {
  167. <-pool
  168. }()
  169. buyerTask(data, mapInfo)
  170. }()
  171. case "winnerent": //标准库
  172. pool <- true
  173. go func() {
  174. defer func() {
  175. <-pool
  176. }()
  177. standardTask("winnerent", mapInfo)
  178. }()
  179. case "buyerent": //标准库
  180. pool <- true
  181. go func() {
  182. defer func() {
  183. <-pool
  184. }()
  185. standardTask("buyerent", mapInfo)
  186. }()
  187. case "agencyent": //标准库
  188. pool <- true
  189. go func() {
  190. defer func() {
  191. <-pool
  192. }()
  193. standardTask("agencyent", mapInfo)
  194. }()
  195. case "biddingdelbyextract": //根据repeat删除es
  196. pool <- true
  197. go func() {
  198. defer func() {
  199. <-pool
  200. }()
  201. biddingDelByExtract(data, mapInfo)
  202. }()
  203. case "biddingdelbyextracttype": //根据extracttype删除es
  204. pool <- true
  205. go func() {
  206. defer func() {
  207. <-pool
  208. }()
  209. biddingDelByExtracttype(data, mapInfo)
  210. }()
  211. default:
  212. pool <- true
  213. go func() {
  214. defer func() {
  215. <-pool
  216. }()
  217. util.Debug("err ---", mapInfo)
  218. }()
  219. }
  220. }
  221. case udp.OP_NOOP: //下个节点回应
  222. log.Println("发送成功", string(data))
  223. }
  224. }
  225. func SaveEsMethod() {
  226. arru := make([]map[string]interface{}, EsBulkSize)
  227. indexu := 0
  228. for {
  229. select {
  230. case v := <-saveEsPool:
  231. arru[indexu] = v
  232. indexu++
  233. if indexu == EsBulkSize {
  234. saveEsSp <- true
  235. go func(arru []map[string]interface{}) {
  236. defer func() {
  237. <-saveEsSp
  238. }()
  239. //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  240. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  241. //if len(multiIndex) == 2 {
  242. // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  243. //}
  244. }(arru)
  245. arru = make([]map[string]interface{}, EsBulkSize)
  246. indexu = 0
  247. }
  248. case <-time.After(1000 * time.Millisecond):
  249. if indexu > 0 {
  250. saveEsSp <- true
  251. go func(arru []map[string]interface{}) {
  252. defer func() {
  253. <-saveEsSp
  254. }()
  255. //Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  256. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  257. //if len(multiIndex) == 2 {
  258. // Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  259. //}
  260. }(arru[:indexu])
  261. arru = make([]map[string]interface{}, EsBulkSize)
  262. indexu = 0
  263. }
  264. }
  265. }
  266. }
  267. func SaveElseEsMethod() {
  268. arru := make([]map[string]interface{}, EsBulkSize)
  269. indexu := 0
  270. for {
  271. select {
  272. case v := <-saveEsElsePool:
  273. arru[indexu] = v
  274. indexu++
  275. if indexu == EsBulkSize {
  276. saveEsElseSp <- true
  277. go func(arru []map[string]interface{}) {
  278. defer func() {
  279. <-saveEsElseSp
  280. }()
  281. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  282. }(arru)
  283. arru = make([]map[string]interface{}, EsBulkSize)
  284. indexu = 0
  285. }
  286. case <-time.After(1000 * time.Millisecond):
  287. if indexu > 0 {
  288. saveEsElseSp <- true
  289. go func(arru []map[string]interface{}) {
  290. defer func() {
  291. <-saveEsElseSp
  292. }()
  293. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  294. }(arru[:indexu])
  295. arru = make([]map[string]interface{}, EsBulkSize)
  296. indexu = 0
  297. }
  298. }
  299. }
  300. }
  301. func SaveAllEsMethod() {
  302. arru := make([]map[string]interface{}, EsBulkSize)
  303. indexu := 0
  304. for {
  305. select {
  306. case v := <-saveEsAllPool:
  307. arru[indexu] = v
  308. indexu++
  309. if indexu == EsBulkSize {
  310. saveEsAllSp <- true
  311. go func(arru []map[string]interface{}) {
  312. defer func() {
  313. <-saveEsAllSp
  314. }()
  315. Es1.BulkSave("biddingall", "bidding", &arru, true)
  316. }(arru)
  317. arru = make([]map[string]interface{}, EsBulkSize)
  318. indexu = 0
  319. }
  320. case <-time.After(1000 * time.Millisecond):
  321. if indexu > 0 {
  322. saveEsAllSp <- true
  323. go func(arru []map[string]interface{}) {
  324. defer func() {
  325. <-saveEsAllSp
  326. }()
  327. Es1.BulkSave("biddingall", "bidding", &arru, true)
  328. }(arru[:indexu])
  329. arru = make([]map[string]interface{}, EsBulkSize)
  330. indexu = 0
  331. }
  332. }
  333. }
  334. }
  335. func SaveProjectEs() {
  336. arru := make([]map[string]interface{}, EsBulkSize)
  337. indexu := 0
  338. for {
  339. select {
  340. case v := <-saveProjectEsPool:
  341. arru[indexu] = v
  342. indexu++
  343. if indexu == EsBulkSize {
  344. saveProjectSp <- true
  345. go func(arru []map[string]interface{}) {
  346. defer func() {
  347. <-saveProjectSp
  348. }()
  349. //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  350. Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  351. }(arru)
  352. arru = make([]map[string]interface{}, EsBulkSize)
  353. indexu = 0
  354. }
  355. case <-time.After(1000 * time.Millisecond):
  356. if indexu > 0 {
  357. saveProjectSp <- true
  358. go func(arru []map[string]interface{}) {
  359. defer func() {
  360. <-saveProjectSp
  361. }()
  362. //Es1.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  363. Es2.BulkSave(util.ObjToString(project["index"]), util.ObjToString(project["type"]), &arru, true)
  364. }(arru[:indexu])
  365. arru = make([]map[string]interface{}, EsBulkSize)
  366. indexu = 0
  367. }
  368. }
  369. }
  370. }
  371. func UpdateBidding() {
  372. arru := make([][]map[string]interface{}, MgoBulkSize)
  373. indexu := 0
  374. for {
  375. select {
  376. case v := <-updateBiddingPool:
  377. arru[indexu] = v
  378. indexu++
  379. if indexu == MgoBulkSize {
  380. updateBiddingSp <- true
  381. go func(arru [][]map[string]interface{}) {
  382. defer func() {
  383. <-updateBiddingSp
  384. }()
  385. biddingMgo.UpdateBulk(currentColl, arru...)
  386. }(arru)
  387. arru = make([][]map[string]interface{}, MgoBulkSize)
  388. indexu = 0
  389. }
  390. case <-time.After(1000 * time.Millisecond):
  391. if indexu > 0 {
  392. updateBiddingSp <- true
  393. go func(arru [][]map[string]interface{}) {
  394. defer func() {
  395. <-updateBiddingSp
  396. }()
  397. biddingMgo.UpdateBulk(currentColl, arru...)
  398. }(arru[:indexu])
  399. arru = make([][]map[string]interface{}, MgoBulkSize)
  400. indexu = 0
  401. }
  402. }
  403. }
  404. }
  405. func UpdateExtract() {
  406. extract := util.ObjToString(extract["collect"])
  407. arru := make([][]map[string]interface{}, MgoBulkSize)
  408. indexu := 0
  409. for {
  410. select {
  411. case v := <-updateExtractPool:
  412. arru[indexu] = v
  413. indexu++
  414. if indexu == MgoBulkSize {
  415. updateExtractSp <- true
  416. go func(arru [][]map[string]interface{}) {
  417. defer func() {
  418. <-updateExtractSp
  419. }()
  420. extractMgo.UpdateBulk(extract, arru...)
  421. }(arru)
  422. arru = make([][]map[string]interface{}, MgoBulkSize)
  423. indexu = 0
  424. }
  425. case <-time.After(1000 * time.Millisecond):
  426. if indexu > 0 {
  427. updateExtractSp <- true
  428. go func(arru [][]map[string]interface{}) {
  429. defer func() {
  430. <-updateExtractSp
  431. }()
  432. extractMgo.UpdateBulk(extract, arru...)
  433. }(arru[:indexu])
  434. arru = make([][]map[string]interface{}, MgoBulkSize)
  435. indexu = 0
  436. }
  437. }
  438. }
  439. }
  440. // @Description nsq处理id不变,内容替换的竞品数据
  441. // @Author J 2022/8/10 11:40
  442. func nsqMethod() {
  443. cof := Sysconfig["nsq_id"].(map[string]interface{})
  444. var err error
  445. Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
  446. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  447. Addr: util.ObjToString(cof["addr"]),
  448. ConnectType: 0, //默认连接nsqd
  449. Topic: util.ObjToString(cof["topic"]),
  450. Channel: util.ObjToString(cof["channel"]),
  451. Concurrent: util.IntAllDef(cof["concurrent"], 1), //并发数
  452. })
  453. if err != nil {
  454. util.Debug("nsqMethod err: ", err.Error())
  455. }
  456. for {
  457. select {
  458. case obj := <-Mcmer.Ch: //从通道读取即可
  459. util.Debug("index nsq: " + fmt.Sprint(obj))
  460. id := strings.Split(util.ObjToString(obj), "=")
  461. if bson.IsObjectIdHex(id[1]) {
  462. taskinfo(id[1])
  463. } else {
  464. util.Debug("jy nsq id err: ", id[1])
  465. }
  466. }
  467. }
  468. }