main.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package main
  2. import (
  3. util "app.yhyue.com/data_processing/common_utils"
  4. "app.yhyue.com/data_processing/common_utils/elastic"
  5. "app.yhyue.com/data_processing/common_utils/log"
  6. "app.yhyue.com/data_processing/common_utils/mongodb"
  7. "app.yhyue.com/data_processing/common_utils/udp"
  8. "encoding/json"
  9. "esindex/config"
  10. "esindex/oss"
  11. "fmt"
  12. "github.com/robfig/cron"
  13. "go.uber.org/zap"
  14. "io/ioutil"
  15. "net"
  16. "net/http"
  17. "sync"
  18. "time"
  19. )
  20. var (
  21. MgoB *mongodb.MongodbSim
  22. MgoP *mongodb.MongodbSim
  23. MgoQ *mongodb.MongodbSim
  24. Es, Es1 *elastic.Elastic
  25. UdpClient udp.UdpClient
  26. UdpTaskMap = &sync.Map{}
  27. JyUdpAddr *net.UDPAddr
  28. EsBulkSize = 100 // es批量保存大小
  29. updateBiddingPool = make(chan []map[string]interface{}, 5000) //更新bingding数据
  30. updateBiddingSp = make(chan bool, 5)
  31. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  32. saveEsSp = make(chan bool, 5)
  33. saveProjectEsPool = make(chan map[string]interface{}, 5000) //保存project数据到es
  34. saveProjectSp = make(chan bool, 5)
  35. saveEsAllPool = make(chan map[string]interface{}, 5000)
  36. saveEsAllSp = make(chan bool, 5)
  37. //saveErrBidPool = make(chan map[string]interface{}, 5000)
  38. //saveBidSp = make(chan bool, 5)
  39. //detailLength = 50000 // es保存detail长度
  40. fileLength = 50000 // es保存附件文本长度
  41. //pscopeLength = 32766 // projectscope长度
  42. )
  43. func init() {
  44. config.Init("./common.toml")
  45. InitLog()
  46. InitMgo()
  47. InitEs()
  48. InitField()
  49. InitEsBiddingField()
  50. oss.InitOss()
  51. JyUdpAddr = &net.UDPAddr{
  52. IP: net.ParseIP(config.Conf.Udp.JyAddr),
  53. Port: util.IntAll(config.Conf.Udp.JyPort),
  54. }
  55. log.Info("init success")
  56. }
  57. func main() {
  58. go checkMapJob() //udp 发送邮件
  59. go task_index() //定时同步更新winner_enterprise、buyer_enterprise ES索引;这个功能很少变动,几乎不需要维护
  60. go UpdateBidding() //更新bidding表数据
  61. go SaveEsMethod()
  62. go SaveAllEsMethod()
  63. go SaveProjectEs()
  64. //go SaveBidErr()
  65. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  66. UdpClient.Listen(processUdpMsg)
  67. log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
  68. ch := make(chan bool, 1)
  69. <-ch
  70. }
  71. var pool = make(chan bool, 20)
  72. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  73. switch act {
  74. case udp.OP_TYPE_DATA:
  75. var mapInfo map[string]interface{}
  76. err := json.Unmarshal(data, &mapInfo)
  77. log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
  78. if err != nil {
  79. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  80. } else if mapInfo != nil {
  81. key, _ := mapInfo["key"].(string)
  82. if key == "" {
  83. key = "udpok"
  84. }
  85. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  86. tasktype, _ := mapInfo["stype"].(string)
  87. switch tasktype {
  88. case "index-by-id": //单个索引
  89. pool <- true
  90. go func() {
  91. defer func() {
  92. <-pool
  93. }()
  94. biddingTaskById(mapInfo)
  95. }()
  96. case "bidding":
  97. pool <- true
  98. go func() {
  99. defer func() {
  100. <-pool
  101. }()
  102. biddingTask(mapInfo)
  103. }()
  104. case "biddingall":
  105. pool <- true
  106. go func() {
  107. defer func() {
  108. <-pool
  109. }()
  110. biddingAllTask(mapInfo)
  111. }()
  112. case "bidding_all_data": //根据biddingall配置文件,存量迁移数据
  113. pool <- true
  114. go func() {
  115. defer func() {
  116. <-pool
  117. }()
  118. biddingAllDataTask()
  119. }()
  120. case "bidding_history":
  121. pool <- true
  122. go func() {
  123. defer func() {
  124. <-pool
  125. }()
  126. biddingTask(mapInfo)
  127. }()
  128. case "project":
  129. pool <- true
  130. go func() {
  131. defer func() {
  132. <-pool
  133. }()
  134. projectTask(data, mapInfo)
  135. }()
  136. case "biddingdata": //bidding全量数据
  137. pool <- true
  138. go func() {
  139. defer func() {
  140. <-pool
  141. }()
  142. biddingDataTask(data, mapInfo)
  143. }()
  144. case "biddingdelbyextracttype": //根据bidding表extracttype=-1,删除es中重复数据
  145. pool <- true
  146. go func() {
  147. defer func() {
  148. <-pool
  149. }()
  150. biddingDelByExtracttype(data, mapInfo)
  151. }()
  152. default:
  153. pool <- true
  154. go func() {
  155. defer func() {
  156. <-pool
  157. }()
  158. log.Info("err", zap.Any("mapInfo", mapInfo))
  159. }()
  160. }
  161. }
  162. case udp.OP_NOOP: //下个节点回应
  163. ok := string(data)
  164. if ok != "" {
  165. log.Info("udp re", zap.String("data:", ok))
  166. UdpTaskMap.Delete(ok)
  167. }
  168. }
  169. }
  170. func task_index() {
  171. c := cron.New()
  172. _ = c.AddFunc("0 0 0 * * ?", func() { task_winneres() }) //每天凌晨执行一次winner生索引
  173. _ = c.AddFunc("0 0 1 * * ?", func() { task_buyeres() }) //每天1点执行一次buyer生索引
  174. c.Start()
  175. }
  176. func task_winneres() {
  177. log.Info("定时任务,winneres")
  178. winnerEsTaskOnce()
  179. }
  180. func task_buyeres() {
  181. log.Info("定时任务,buyeres")
  182. buyerEsTaskOnce()
  183. }
  184. type UdpNode struct {
  185. data []byte
  186. addr *net.UDPAddr
  187. timestamp int64
  188. retry int
  189. }
  190. //UpdateBidding 更新bidding表数据
  191. func UpdateBidding() {
  192. arru := make([][]map[string]interface{}, 200)
  193. indexu := 0
  194. for {
  195. select {
  196. case v := <-updateBiddingPool:
  197. arru[indexu] = v
  198. indexu++
  199. if indexu == 200 {
  200. updateBiddingSp <- true
  201. go func(arru [][]map[string]interface{}) {
  202. defer func() {
  203. <-updateBiddingSp
  204. }()
  205. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  206. }(arru)
  207. arru = make([][]map[string]interface{}, 200)
  208. indexu = 0
  209. }
  210. case <-time.After(1000 * time.Millisecond):
  211. if indexu > 0 {
  212. updateBiddingSp <- true
  213. go func(arru [][]map[string]interface{}) {
  214. defer func() {
  215. <-updateBiddingSp
  216. }()
  217. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  218. }(arru[:indexu])
  219. arru = make([][]map[string]interface{}, 200)
  220. indexu = 0
  221. }
  222. }
  223. }
  224. }
  225. //func SaveBidErr() {
  226. // arru := make([]map[string]interface{}, 200)
  227. // indexu := 0
  228. // for {
  229. // select {
  230. // case v := <-saveErrBidPool:
  231. // arru[indexu] = v
  232. // indexu++
  233. // if indexu == 200 {
  234. // saveBidSp <- true
  235. // go func(arru []map[string]interface{}) {
  236. // defer func() {
  237. // <-saveBidSp
  238. // }()
  239. // MgoB.SaveBulk("bidding_es_err_record", arru...)
  240. // }(arru)
  241. // arru = make([]map[string]interface{}, 200)
  242. // indexu = 0
  243. // }
  244. // case <-time.After(1000 * time.Millisecond):
  245. // if indexu > 0 {
  246. // saveBidSp <- true
  247. // go func(arru []map[string]interface{}) {
  248. // defer func() {
  249. // <-saveBidSp
  250. // }()
  251. // MgoB.SaveBulk("bidding_es_err_record", arru...)
  252. // }(arru[:indexu])
  253. // arru = make([]map[string]interface{}, 200)
  254. // indexu = 0
  255. // }
  256. // }
  257. // }
  258. //}
  259. //SaveEsMethod 保存到es
  260. func SaveEsMethod() {
  261. arru := make([]map[string]interface{}, EsBulkSize)
  262. indexu := 0
  263. for {
  264. select {
  265. case v := <-saveEsPool:
  266. arru[indexu] = v
  267. indexu++
  268. if indexu == EsBulkSize {
  269. saveEsSp <- true
  270. go func(arru []map[string]interface{}) {
  271. defer func() {
  272. <-saveEsSp
  273. }()
  274. Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
  275. }(arru)
  276. arru = make([]map[string]interface{}, EsBulkSize)
  277. indexu = 0
  278. }
  279. case <-time.After(1000 * time.Millisecond):
  280. if indexu > 0 {
  281. saveEsSp <- true
  282. go func(arru []map[string]interface{}) {
  283. defer func() {
  284. <-saveEsSp
  285. }()
  286. Es.BulkSave(config.Conf.DB.Es.IndexB, arru)
  287. }(arru[:indexu])
  288. arru = make([]map[string]interface{}, EsBulkSize)
  289. indexu = 0
  290. }
  291. }
  292. }
  293. }
  294. func SaveAllEsMethod() {
  295. arru := make([]map[string]interface{}, EsBulkSize)
  296. indexu := 0
  297. for {
  298. select {
  299. case v := <-saveEsAllPool:
  300. arru[indexu] = v
  301. indexu++
  302. if indexu == EsBulkSize {
  303. saveEsAllSp <- true
  304. go func(arru []map[string]interface{}) {
  305. defer func() {
  306. <-saveEsAllSp
  307. }()
  308. Es1.BulkSave("biddingall", arru)
  309. }(arru)
  310. arru = make([]map[string]interface{}, EsBulkSize)
  311. indexu = 0
  312. }
  313. case <-time.After(1000 * time.Millisecond):
  314. if indexu > 0 {
  315. saveEsAllSp <- true
  316. go func(arru []map[string]interface{}) {
  317. defer func() {
  318. <-saveEsAllSp
  319. }()
  320. Es1.BulkSave("biddingall", arru)
  321. }(arru[:indexu])
  322. arru = make([]map[string]interface{}, EsBulkSize)
  323. indexu = 0
  324. }
  325. }
  326. }
  327. }
  328. func SaveProjectEs() {
  329. arru := make([]map[string]interface{}, EsBulkSize)
  330. indexu := 0
  331. for {
  332. select {
  333. case v := <-saveProjectEsPool:
  334. arru[indexu] = v
  335. indexu++
  336. if indexu == EsBulkSize {
  337. saveProjectSp <- true
  338. go func(arru []map[string]interface{}) {
  339. defer func() {
  340. <-saveProjectSp
  341. }()
  342. Es.BulkSave(config.Conf.DB.Es.IndexP, arru)
  343. }(arru)
  344. arru = make([]map[string]interface{}, EsBulkSize)
  345. indexu = 0
  346. }
  347. case <-time.After(1000 * time.Millisecond):
  348. if indexu > 0 {
  349. saveProjectSp <- true
  350. go func(arru []map[string]interface{}) {
  351. defer func() {
  352. <-saveProjectSp
  353. }()
  354. Es.BulkSave(config.Conf.DB.Es.IndexP, arru)
  355. }(arru[:indexu])
  356. arru = make([]map[string]interface{}, EsBulkSize)
  357. indexu = 0
  358. }
  359. }
  360. }
  361. }
  362. func checkMapJob() {
  363. if config.Conf.Mail.Send {
  364. log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
  365. for {
  366. UdpTaskMap.Range(func(k, v interface{}) bool {
  367. now := time.Now().Unix()
  368. node, _ := v.(*UdpNode)
  369. if now-node.timestamp > 120 {
  370. node.retry++
  371. if node.retry > 5 {
  372. UdpTaskMap.Delete(k)
  373. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", k.(string)))
  374. if err == nil {
  375. defer res.Body.Close()
  376. read, err := ioutil.ReadAll(res.Body)
  377. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  378. }
  379. } else {
  380. log.Info("udp重发", zap.Any("k:", k))
  381. UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  382. }
  383. } else if now-node.timestamp > 10 {
  384. log.Info("udp任务超时中..", zap.Any("k:", k))
  385. }
  386. return true
  387. })
  388. time.Sleep(60 * time.Second)
  389. }
  390. }
  391. }
  392. func task() {
  393. sess := MgoB.GetMgoConn()
  394. defer MgoB.DestoryMongoConn(sess)
  395. ch := make(chan bool, 10)
  396. wg := &sync.WaitGroup{}
  397. query := sess.DB("qfw").C("result_replace_repair_log").Find(nil).Iter()
  398. count := 0
  399. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  400. if count%1000 == 0 {
  401. util.Debug("current ---", count)
  402. }
  403. ch <- true
  404. wg.Add(1)
  405. go func(tmp map[string]interface{}) {
  406. defer func() {
  407. <-ch
  408. wg.Done()
  409. }()
  410. if id := util.ObjToString(tmp["replace_id"]); mongodb.IsObjectIdHex(id) {
  411. biddingTaskById(map[string]interface{}{"infoid": id, "stype": "bidding"})
  412. }
  413. }(tmp)
  414. tmp = make(map[string]interface{})
  415. }
  416. wg.Wait()
  417. util.Debug("over ---", count)
  418. }