main.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. package main
  import (
  	"encoding/json"
  	"fmt"
  	"io/ioutil"
  	"net"
  	"net/http"
  	"net/url"
  	"sync"
  	"time"

  	util "app.yhyue.com/data_processing/common_utils"
  	"app.yhyue.com/data_processing/common_utils/elastic"
  	"app.yhyue.com/data_processing/common_utils/log"
  	"app.yhyue.com/data_processing/common_utils/mongodb"
  	"app.yhyue.com/data_processing/common_utils/udp"
  	"github.com/robfig/cron"
  	"go.uber.org/zap"

  	"esindex/config"
  	"esindex/oss"
  )
  20. var (
  21. MgoB *mongodb.MongodbSim
  22. MgoP *mongodb.MongodbSim
  23. MgoQ *mongodb.MongodbSim
  24. Es, Es1 *elastic.Elastic
  25. UdpClient udp.UdpClient
  26. UdpTaskMap = &sync.Map{}
  27. JyUdpAddr *net.UDPAddr
  28. EsBulkSize = 100 // es批量保存大小
  29. updateBiddingPool = make(chan []map[string]interface{}, 5000)
  30. updateBiddingSp = make(chan bool, 5)
  31. saveEsPool = make(chan map[string]interface{}, 5000)
  32. saveEsSp = make(chan bool, 5)
  33. saveProjectEsPool = make(chan map[string]interface{}, 5000)
  34. saveProjectSp = make(chan bool, 5)
  35. saveEsAllPool = make(chan map[string]interface{}, 5000)
  36. saveEsAllSp = make(chan bool, 5)
  37. saveErrBidPool = make(chan map[string]interface{}, 5000)
  38. saveBidSp = make(chan bool, 5)
  39. detailLength = 50000 // es保存detail长度
  40. fileLength = 50000 // es保存附件文本长度
  41. pscopeLength = 32766 // projectscope长度
  42. )
  43. func init() {
  44. config.Init("./common.toml")
  45. InitLog()
  46. InitMgo()
  47. InitEs()
  48. oss.InitOss()
  49. JyUdpAddr = &net.UDPAddr{
  50. IP: net.ParseIP(config.Conf.Udp.JyAddr),
  51. Port: util.IntAll(config.Conf.Udp.JyPort),
  52. }
  53. log.Info("init success")
  54. }
  55. func main() {
  56. go checkMapJob()
  57. go task_index()
  58. go UpdateBidding()
  59. go SaveEsMethod()
  60. go SaveAllEsMethod()
  61. go SaveProjectEs()
  62. go SaveBidErr()
  63. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  64. UdpClient.Listen(processUdpMsg)
  65. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  66. ch := make(chan bool, 1)
  67. <-ch
  68. }
  69. var pool = make(chan bool, 20)
  70. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  71. switch act {
  72. case udp.OP_TYPE_DATA:
  73. var mapInfo map[string]interface{}
  74. err := json.Unmarshal(data, &mapInfo)
  75. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  76. if err != nil {
  77. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  78. } else if mapInfo != nil {
  79. key, _ := mapInfo["key"].(string)
  80. if key == "" {
  81. key = "udpok"
  82. }
  83. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  84. tasktype, _ := mapInfo["stype"].(string)
  85. switch tasktype {
  86. case "index-by-id":
  87. pool <- true
  88. go func() {
  89. defer func() {
  90. <-pool
  91. }()
  92. biddingTaskById(mapInfo)
  93. }()
  94. case "bidding":
  95. pool <- true
  96. go func() {
  97. defer func() {
  98. <-pool
  99. }()
  100. biddingTask(mapInfo)
  101. }()
  102. case "biddingall":
  103. pool <- true
  104. go func() {
  105. defer func() {
  106. <-pool
  107. }()
  108. biddingAllTask(mapInfo)
  109. }()
  110. case "bidding_history":
  111. pool <- true
  112. go func() {
  113. defer func() {
  114. <-pool
  115. }()
  116. biddingTask(mapInfo)
  117. }()
  118. case "project":
  119. pool <- true
  120. go func() {
  121. defer func() {
  122. <-pool
  123. }()
  124. projectTask(data, mapInfo)
  125. }()
  126. case "biddingdata": //bidding全量数据
  127. pool <- true
  128. go func() {
  129. defer func() {
  130. <-pool
  131. }()
  132. biddingDataTask(data, mapInfo)
  133. }()
  134. case "biddingdelbyextracttype": //根据extracttype删除es
  135. pool <- true
  136. go func() {
  137. defer func() {
  138. <-pool
  139. }()
  140. biddingDelByExtracttype(data, mapInfo)
  141. }()
  142. default:
  143. pool <- true
  144. go func() {
  145. defer func() {
  146. <-pool
  147. }()
  148. log.Info("err", zap.Any("mapInfo", mapInfo))
  149. }()
  150. }
  151. }
  152. case udp.OP_NOOP: //下个节点回应
  153. ok := string(data)
  154. if ok != "" {
  155. log.Info("udp re", zap.String("data:", ok))
  156. UdpTaskMap.Delete(ok)
  157. }
  158. }
  159. }
  160. func task_index() {
  161. c := cron.New()
  162. _ = c.AddFunc("0 0 0 * * ?", func() { task_winneres() }) //每天凌晨执行一次winner生索引
  163. _ = c.AddFunc("0 0 1 * * ?", func() { task_buyeres() }) //每天1点执行一次buyer生索引
  164. c.Start()
  165. }
  166. func task_winneres() {
  167. log.Info("定时任务,winneres")
  168. winnerEsTaskOnce()
  169. }
  170. func task_buyeres() {
  171. log.Info("定时任务,buyeres")
  172. buyerEsTaskOnce()
  173. }
  // UdpNode is the value type stored in UdpTaskMap for a UDP message that
  // checkMapJob monitors and may resend.
  type UdpNode struct {
  	// data is the payload checkMapJob resends with udp.OP_TYPE_DATA.
  	data []byte
  	// addr is the destination used for the resend.
  	addr *net.UDPAddr
  	// timestamp is in Unix seconds; checkMapJob compares it with
  	// time.Now().Unix().
  	timestamp int64
  	// retry is incremented by checkMapJob on each pass where the node is
  	// older than 120 seconds.
  	retry int
  }
  180. func UpdateBidding() {
  181. arru := make([][]map[string]interface{}, 200)
  182. indexu := 0
  183. for {
  184. select {
  185. case v := <-updateBiddingPool:
  186. arru[indexu] = v
  187. indexu++
  188. if indexu == 200 {
  189. updateBiddingSp <- true
  190. go func(arru [][]map[string]interface{}) {
  191. defer func() {
  192. <-updateBiddingSp
  193. }()
  194. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  195. }(arru)
  196. arru = make([][]map[string]interface{}, 200)
  197. indexu = 0
  198. }
  199. case <-time.After(1000 * time.Millisecond):
  200. if indexu > 0 {
  201. updateBiddingSp <- true
  202. go func(arru [][]map[string]interface{}) {
  203. defer func() {
  204. <-updateBiddingSp
  205. }()
  206. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  207. }(arru[:indexu])
  208. arru = make([][]map[string]interface{}, 200)
  209. indexu = 0
  210. }
  211. }
  212. }
  213. }
  214. func SaveBidErr() {
  215. arru := make([]map[string]interface{}, 200)
  216. indexu := 0
  217. for {
  218. select {
  219. case v := <-saveErrBidPool:
  220. arru[indexu] = v
  221. indexu++
  222. if indexu == 200 {
  223. saveBidSp <- true
  224. go func(arru []map[string]interface{}) {
  225. defer func() {
  226. <-saveBidSp
  227. }()
  228. MgoB.SaveBulk("bidding_es_err_record", arru...)
  229. }(arru)
  230. arru = make([]map[string]interface{}, 200)
  231. indexu = 0
  232. }
  233. case <-time.After(1000 * time.Millisecond):
  234. if indexu > 0 {
  235. saveBidSp <- true
  236. go func(arru []map[string]interface{}) {
  237. defer func() {
  238. <-saveBidSp
  239. }()
  240. MgoB.SaveBulk("bidding_es_err_record", arru...)
  241. }(arru[:indexu])
  242. arru = make([]map[string]interface{}, 200)
  243. indexu = 0
  244. }
  245. }
  246. }
  247. }
  248. func SaveEsMethod() {
  249. arru := make([]map[string]interface{}, EsBulkSize)
  250. indexu := 0
  251. for {
  252. select {
  253. case v := <-saveEsPool:
  254. arru[indexu] = v
  255. indexu++
  256. if indexu == EsBulkSize {
  257. saveEsSp <- true
  258. go func(arru []map[string]interface{}) {
  259. defer func() {
  260. <-saveEsSp
  261. }()
  262. Es.BulkSave(config.Conf.DB.Es.IndexB, config.Conf.DB.Es.TypeB, &arru, true)
  263. }(arru)
  264. arru = make([]map[string]interface{}, EsBulkSize)
  265. indexu = 0
  266. }
  267. case <-time.After(1000 * time.Millisecond):
  268. if indexu > 0 {
  269. saveEsSp <- true
  270. go func(arru []map[string]interface{}) {
  271. defer func() {
  272. <-saveEsSp
  273. }()
  274. Es.BulkSave(config.Conf.DB.Es.IndexB, config.Conf.DB.Es.TypeB, &arru, true)
  275. }(arru[:indexu])
  276. arru = make([]map[string]interface{}, EsBulkSize)
  277. indexu = 0
  278. }
  279. }
  280. }
  281. }
  282. func SaveAllEsMethod() {
  283. arru := make([]map[string]interface{}, EsBulkSize)
  284. indexu := 0
  285. for {
  286. select {
  287. case v := <-saveEsAllPool:
  288. arru[indexu] = v
  289. indexu++
  290. if indexu == EsBulkSize {
  291. saveEsAllSp <- true
  292. go func(arru []map[string]interface{}) {
  293. defer func() {
  294. <-saveEsAllSp
  295. }()
  296. Es1.BulkSave("biddingall", "bidding", &arru, true)
  297. }(arru)
  298. arru = make([]map[string]interface{}, EsBulkSize)
  299. indexu = 0
  300. }
  301. case <-time.After(1000 * time.Millisecond):
  302. if indexu > 0 {
  303. saveEsAllSp <- true
  304. go func(arru []map[string]interface{}) {
  305. defer func() {
  306. <-saveEsAllSp
  307. }()
  308. Es1.BulkSave("biddingall", "bidding", &arru, true)
  309. }(arru[:indexu])
  310. arru = make([]map[string]interface{}, EsBulkSize)
  311. indexu = 0
  312. }
  313. }
  314. }
  315. }
  316. func SaveProjectEs() {
  317. arru := make([]map[string]interface{}, EsBulkSize)
  318. indexu := 0
  319. for {
  320. select {
  321. case v := <-saveProjectEsPool:
  322. arru[indexu] = v
  323. indexu++
  324. if indexu == EsBulkSize {
  325. saveProjectSp <- true
  326. go func(arru []map[string]interface{}) {
  327. defer func() {
  328. <-saveProjectSp
  329. }()
  330. Es.BulkSave(config.Conf.DB.Es.IndexP, config.Conf.DB.Es.TypeP, &arru, true)
  331. }(arru)
  332. arru = make([]map[string]interface{}, EsBulkSize)
  333. indexu = 0
  334. }
  335. case <-time.After(1000 * time.Millisecond):
  336. if indexu > 0 {
  337. saveProjectSp <- true
  338. go func(arru []map[string]interface{}) {
  339. defer func() {
  340. <-saveProjectSp
  341. }()
  342. Es.BulkSave(config.Conf.DB.Es.IndexP, config.Conf.DB.Es.TypeP, &arru, true)
  343. }(arru[:indexu])
  344. arru = make([]map[string]interface{}, EsBulkSize)
  345. indexu = 0
  346. }
  347. }
  348. }
  349. }
  350. func checkMapJob() {
  351. if config.Conf.Mail.Send {
  352. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  353. for {
  354. UdpTaskMap.Range(func(k, v interface{}) bool {
  355. now := time.Now().Unix()
  356. node, _ := v.(*UdpNode)
  357. if now-node.timestamp > 120 {
  358. node.retry++
  359. if node.retry > 5 {
  360. UdpTaskMap.Delete(k)
  361. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", k.(string)))
  362. if err == nil {
  363. defer res.Body.Close()
  364. read, err := ioutil.ReadAll(res.Body)
  365. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  366. }
  367. } else {
  368. log.Info("udp重发", zap.Any("k:", k))
  369. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  370. }
  371. } else if now-node.timestamp > 10 {
  372. log.Info("udp任务超时中..", zap.Any("k:", k))
  373. }
  374. return true
  375. })
  376. time.Sleep(60 * time.Second)
  377. }
  378. }
  379. }
  380. func task() {
  381. sess := MgoB.GetMgoConn()
  382. defer MgoB.DestoryMongoConn(sess)
  383. ch := make(chan bool, 10)
  384. wg := &sync.WaitGroup{}
  385. query := sess.DB("qfw").C("bidding_0922").Find(nil).Iter()
  386. count := 0
  387. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  388. if count%1000 == 0 {
  389. util.Debug("current ---", count)
  390. }
  391. ch <- true
  392. wg.Add(1)
  393. go func(tmp map[string]interface{}) {
  394. defer func() {
  395. <-ch
  396. wg.Done()
  397. }()
  398. if id := util.ObjToString(tmp["infoid"]); mongodb.IsObjectIdHex(id) {
  399. biddingTaskById(map[string]interface{}{"infoid": id, "stype": "bidding"})
  400. }
  401. }(tmp)
  402. tmp = make(map[string]interface{})
  403. }
  404. wg.Wait()
  405. util.Debug("over ---", count)
  406. }