main.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package main
  2. import (
  3. "encoding/json"
  4. "io/ioutil"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. "net/http"
  9. "qfw/util"
  10. "qfw/util/redis"
  11. "time"
  12. )
  13. func main() {
  14. // company_id
  15. redis.InitRedis1("qyxy_id=172.17.4.189:8379", 4)
  16. inits()
  17. //go inspectQuery()
  18. go checkMapJob()
  19. go task_index()
  20. go UpdateBidding()
  21. go UpdateExtract()
  22. go SaveEsMethod()
  23. go SaveAllEsMethod()
  24. go SaveElseEsMethod()
  25. updport := Sysconfig["udpport"].(string)
  26. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  27. udpclient.Listen(processUdpMsg)
  28. util.Debug("Udp服务监听", updport)
  29. ch := make(chan bool, 1)
  30. <-ch
  31. }
  32. /**
  33. 检查es查询队列 10s查询一次
  34. */
  35. func inspectQuery() {
  36. ticker := time.NewTicker(time.Second * 10)
  37. url := esAddr + "/_nodes/stats/thread_pool"
  38. for range ticker.C {
  39. resp, _ := http.Get(url)
  40. if resp != nil && resp.Body != nil {
  41. defer resp.Body.Close()
  42. }
  43. body, _ := ioutil.ReadAll(resp.Body)
  44. respMap := make(map[string]interface{})
  45. err := json.Unmarshal(body, &respMap)
  46. if err == nil {
  47. if data, o1 := respMap["nodes"].(map[string]interface{}); o1 {
  48. if nodes, o2 := data[esNode].(map[string]interface{}); o2 {
  49. if pool, o3 := nodes["thread_pool"].(map[string]interface{}); o3 {
  50. index, _ := pool["index"].(map[string]interface{})
  51. search, _ := pool["search"].(map[string]interface{})
  52. bulk, _ := pool["bulk"].(map[string]interface{})
  53. if util.IntAll(index["queue"]) > 0 || util.IntAll(search["queue"]) > 0 || util.IntAll(bulk["queue"]) > 0 {
  54. util.Debug("es thread_pool index queue---", index["queue"])
  55. util.Debug("es thread_pool search queue---", search["queue"])
  56. util.Debug("es thread_pool bulk queue---", bulk["queue"])
  57. StopFlag = true
  58. } else {
  59. StopFlag = false
  60. }
  61. }
  62. }
  63. }
  64. }
  65. }
  66. }
  67. var pool = make(chan bool, 20)
  68. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  69. switch act {
  70. case mu.OP_TYPE_DATA: //上个节点的数据
  71. //从表中开始处理生成企业数据
  72. var mapInfo map[string]interface{}
  73. err := json.Unmarshal(data, &mapInfo)
  74. log.Println("err:", err, "mapInfo:", mapInfo, string(data))
  75. if err != nil {
  76. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  77. } else if mapInfo != nil {
  78. key, _ := mapInfo["key"].(string)
  79. if key == "" {
  80. key = "udpok"
  81. }
  82. go udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  83. tasktype, _ := mapInfo["stype"].(string)
  84. t := NewTk(mapInfo)
  85. switch tasktype {
  86. case "winner":
  87. pool <- true
  88. go func() {
  89. defer func() {
  90. <-pool
  91. }()
  92. winnerTask(data, mapInfo)
  93. }()
  94. case "bidding": //实时+udp调用,可选择是否生成关键词, 一次性最大20万
  95. pool <- true
  96. go func() {
  97. defer func() {
  98. <-pool
  99. }()
  100. t.thread = 1
  101. t.biddingTask(data, mapInfo)
  102. }()
  103. case "bidding_history": //增量id段历史数据
  104. pool <- true
  105. go func() {
  106. defer func() {
  107. <-pool
  108. }()
  109. t.thread = 1
  110. t.biddingTask(data, mapInfo)
  111. }()
  112. case "project":
  113. pool <- true
  114. go func() {
  115. defer func() {
  116. <-pool
  117. }()
  118. projectTask(data, project, mapInfo)
  119. }()
  120. case "biddingback": //不联表,使用bidding表直接调用mongo库生成索引
  121. pool <- true
  122. go func() {
  123. defer func() {
  124. <-pool
  125. }()
  126. t.thread = 30
  127. t.biddingBackTask(data, mapInfo)
  128. }()
  129. case "biddingall": //合并并重新生成索引,不生成关键词
  130. pool <- true
  131. go func() {
  132. defer func() {
  133. <-pool
  134. }()
  135. t.thread = 30
  136. t.biddingAllTask(data, mapInfo)
  137. }()
  138. case "biddingdata": //bidding全量数据
  139. pool <- true
  140. go func() {
  141. defer func() {
  142. <-pool
  143. }()
  144. t.thread = 30
  145. t.biddingDataTask(data, mapInfo)
  146. }()
  147. case "biddingmerge": //重新合并但不生成索引,不生成关键词
  148. pool <- true
  149. go func() {
  150. defer func() {
  151. <-pool
  152. }()
  153. biddingMergeTask(data, mapInfo)
  154. }()
  155. case "buyer":
  156. pool <- true
  157. go func() {
  158. defer func() {
  159. <-pool
  160. }()
  161. buyerTask(data, mapInfo)
  162. }()
  163. case "winnerent": //标准库
  164. pool <- true
  165. go func() {
  166. defer func() {
  167. <-pool
  168. }()
  169. standardTask("winnerent", mapInfo)
  170. }()
  171. case "buyerent": //标准库
  172. pool <- true
  173. go func() {
  174. defer func() {
  175. <-pool
  176. }()
  177. standardTask("buyerent", mapInfo)
  178. }()
  179. case "agencyent": //标准库
  180. pool <- true
  181. go func() {
  182. defer func() {
  183. <-pool
  184. }()
  185. standardTask("agencyent", mapInfo)
  186. }()
  187. case "biddingdelbyextract": //根据repeat删除es
  188. pool <- true
  189. go func() {
  190. defer func() {
  191. <-pool
  192. }()
  193. biddingDelByExtract(data, mapInfo)
  194. }()
  195. case "biddingdelbyextracttype": //根据extracttype删除es
  196. pool <- true
  197. go func() {
  198. defer func() {
  199. <-pool
  200. }()
  201. biddingDelByExtracttype(data, mapInfo)
  202. }()
  203. default:
  204. pool <- true
  205. go func() {
  206. defer func() {
  207. <-pool
  208. }()
  209. util.Debug("err ---", mapInfo)
  210. }()
  211. }
  212. }
  213. case mu.OP_NOOP: //下个节点回应
  214. log.Println("发送成功", string(data))
  215. }
  216. }
  217. func SaveEsMethod() {
  218. arru := make([]map[string]interface{}, EsBulkSize)
  219. indexu := 0
  220. for {
  221. select {
  222. case v := <-saveEsPool:
  223. arru[indexu] = v
  224. indexu++
  225. if indexu == 200 {
  226. saveEsSp <- true
  227. go func(arru []map[string]interface{}) {
  228. defer func() {
  229. <-saveEsSp
  230. }()
  231. Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  232. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  233. if len(multiIndex) == 2 {
  234. Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  235. }
  236. }(arru)
  237. arru = make([]map[string]interface{}, EsBulkSize)
  238. indexu = 0
  239. }
  240. case <-time.After(1000 * time.Millisecond):
  241. if indexu > 0 {
  242. saveEsSp <- true
  243. go func(arru []map[string]interface{}) {
  244. defer func() {
  245. <-saveEsSp
  246. }()
  247. Es1.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  248. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  249. if len(multiIndex) == 2 {
  250. Es1.BulkSave(multiIndex[0], multiIndex[1], &arru, true)
  251. }
  252. }(arru[:indexu])
  253. arru = make([]map[string]interface{}, EsBulkSize)
  254. indexu = 0
  255. }
  256. }
  257. }
  258. }
  259. func SaveElseEsMethod() {
  260. arru := make([]map[string]interface{}, EsBulkSize)
  261. indexu := 0
  262. for {
  263. select {
  264. case v := <-saveEsElsePool:
  265. arru[indexu] = v
  266. indexu++
  267. if indexu == 200 {
  268. saveEsElseSp <- true
  269. go func(arru []map[string]interface{}) {
  270. defer func() {
  271. <-saveEsElseSp
  272. }()
  273. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  274. }(arru)
  275. arru = make([]map[string]interface{}, EsBulkSize)
  276. indexu = 0
  277. }
  278. case <-time.After(1000 * time.Millisecond):
  279. if indexu > 0 {
  280. saveEsElseSp <- true
  281. go func(arru []map[string]interface{}) {
  282. defer func() {
  283. <-saveEsElseSp
  284. }()
  285. Es2.BulkSave(util.ObjToString(biddingIndex["index"]), util.ObjToString(biddingIndex["type"]), &arru, true)
  286. }(arru[:indexu])
  287. arru = make([]map[string]interface{}, EsBulkSize)
  288. indexu = 0
  289. }
  290. }
  291. }
  292. }
  293. func SaveAllEsMethod() {
  294. arru := make([]map[string]interface{}, EsBulkSize)
  295. indexu := 0
  296. for {
  297. select {
  298. case v := <-saveEsAllPool:
  299. arru[indexu] = v
  300. indexu++
  301. if indexu == 200 {
  302. saveEsAllSp <- true
  303. go func(arru []map[string]interface{}) {
  304. defer func() {
  305. <-saveEsAllSp
  306. }()
  307. Es1.BulkSave("bidding_all", "bidding", &arru, true)
  308. }(arru)
  309. arru = make([]map[string]interface{}, EsBulkSize)
  310. indexu = 0
  311. }
  312. case <-time.After(1000 * time.Millisecond):
  313. if indexu > 0 {
  314. saveEsAllSp <- true
  315. go func(arru []map[string]interface{}) {
  316. defer func() {
  317. <-saveEsAllSp
  318. }()
  319. Es1.BulkSave("bidding_all", "bidding", &arru, true)
  320. }(arru[:indexu])
  321. arru = make([]map[string]interface{}, EsBulkSize)
  322. indexu = 0
  323. }
  324. }
  325. }
  326. }
  327. func UpdateBidding() {
  328. arru := make([][]map[string]interface{}, MgoBulkSize)
  329. indexu := 0
  330. for {
  331. select {
  332. case v := <-updateBiddingPool:
  333. arru[indexu] = v
  334. indexu++
  335. if indexu == MgoBulkSize {
  336. updateBiddingSp <- true
  337. go func(arru [][]map[string]interface{}) {
  338. defer func() {
  339. <-updateBiddingSp
  340. }()
  341. biddingMgo.UpdateBulk(currentColl, arru...)
  342. }(arru)
  343. arru = make([][]map[string]interface{}, MgoBulkSize)
  344. indexu = 0
  345. }
  346. case <-time.After(1000 * time.Millisecond):
  347. if indexu > 0 {
  348. updateBiddingSp <- true
  349. go func(arru [][]map[string]interface{}) {
  350. defer func() {
  351. <-updateBiddingSp
  352. }()
  353. biddingMgo.UpdateBulk(currentColl, arru...)
  354. }(arru[:indexu])
  355. arru = make([][]map[string]interface{}, MgoBulkSize)
  356. indexu = 0
  357. }
  358. }
  359. }
  360. }
  361. func UpdateExtract() {
  362. extract := util.ObjToString(extract["collect"])
  363. arru := make([][]map[string]interface{}, MgoBulkSize)
  364. indexu := 0
  365. for {
  366. select {
  367. case v := <-updateExtractPool:
  368. arru[indexu] = v
  369. indexu++
  370. if indexu == MgoBulkSize {
  371. updateExtractSp <- true
  372. go func(arru [][]map[string]interface{}) {
  373. defer func() {
  374. <-updateExtractSp
  375. }()
  376. extractMgo.UpdateBulk(extract, arru...)
  377. }(arru)
  378. arru = make([][]map[string]interface{}, MgoBulkSize)
  379. indexu = 0
  380. }
  381. case <-time.After(1000 * time.Millisecond):
  382. if indexu > 0 {
  383. updateExtractSp <- true
  384. go func(arru [][]map[string]interface{}) {
  385. defer func() {
  386. <-updateExtractSp
  387. }()
  388. extractMgo.UpdateBulk(extract, arru...)
  389. }(arru[:indexu])
  390. arru = make([][]map[string]interface{}, MgoBulkSize)
  391. indexu = 0
  392. }
  393. }
  394. }
  395. }