main.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package main
  2. import (
  3. "encoding/json"
  4. "field_sync/config"
  5. "field_sync/oss"
  6. "fmt"
  7. "io/ioutil"
  8. "net"
  9. "net/http"
  10. "strings"
  11. "sync"
  12. "time"
  13. "log"
  14. elastic "app.yhyue.com/moapp/jybase/es"
  15. "go.uber.org/zap"
  16. "gopkg.in/mgo.v2/bson"
  17. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  18. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  19. gonsq "jygit.jydev.jianyu360.cn/data_processing/common_utils/nsq"
  20. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  21. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  22. )
  23. var (
  24. MgoB, MgoBP *mongodb.MongodbSim
  25. MgoE *mongodb.MongodbSim
  26. MgoQ *mongodb.MongodbSim // 企业
  27. MgoP *mongodb.MongodbSim // 凭安企业
  28. Es elastic.Es
  29. UdpClient udp.UdpClient
  30. UdpTaskMap = &sync.Map{}
  31. Mcmer *gonsq.Consumer
  32. MgoBulkSize = 200 // mgo批量保存大小
  33. updateBidPool = make(chan []map[string]interface{}, 5000)
  34. updateBidSp = make(chan bool, 5)
  35. updateExtPool = make(chan []map[string]interface{}, 5000)
  36. updateExtSp = make(chan bool, 5)
  37. )
  38. func init() {
  39. config.Init("./common.toml")
  40. oss.InitOss()
  41. InitFileInfo()
  42. // InitLog()
  43. InitMgo()
  44. InitEs()
  45. inits()
  46. redis.InitRedis1(config.Conf.DB.Redis.Addr, config.Conf.DB.Redis.DbIndex)
  47. log.Println("init success")
  48. }
  49. func main() {
  50. go checkMapJob()
  51. go nsqMethod()
  52. go UpdateBidding()
  53. go UpdateExtract()
  54. UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
  55. UdpClient.Listen(processUdpMsg)
  56. log.Println("Udp服务监听 port:", config.Conf.Udp.LocPort)
  57. info, _ := MgoBP.Find("bidding_processing_ids", `{"dataprocess_ai": 5}`, bson.M{"_id": 1}, nil, false, -1, -1)
  58. log.Println(len(*info))
  59. log.Println("size", len(*info))
  60. if len(*info) > 0 {
  61. for _, m := range *info {
  62. mapInfo := make(map[string]interface{})
  63. mapInfo["gtid"] = util.ObjToString(m["gtid"])
  64. mapInfo["lteid"] = util.ObjToString(m["lteid"])
  65. mapInfo["stype"] = "bidding"
  66. mapInfo["key"] = fmt.Sprintf("%s-%s-bidding", util.ObjToString(m["gtid"]), util.ObjToString(m["lteid"]))
  67. log.Println("--", mapInfo)
  68. biddingTask(nil, mapInfo)
  69. }
  70. }
  71. ch := make(chan bool, 1)
  72. <-ch
  73. }
  74. var pool = make(chan bool, 20)
  75. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  76. defer util.Catch()
  77. switch act {
  78. case udp.OP_TYPE_DATA: //上个节点的数据
  79. var mapInfo map[string]interface{}
  80. err := json.Unmarshal(data, &mapInfo)
  81. log.Println("processUdpMsg mapInfo:", mapInfo)
  82. if err != nil {
  83. UdpClient.WriteUdp([]byte("err:"+err.Error()), udp.OP_NOOP, ra)
  84. } else if mapInfo != nil {
  85. key, _ := mapInfo["key"].(string)
  86. if key == "" {
  87. key = "udpok"
  88. }
  89. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  90. tasktype, _ := mapInfo["stype"].(string)
  91. switch tasktype {
  92. case "bidding":
  93. pool <- true
  94. go func() {
  95. defer func() {
  96. <-pool
  97. }()
  98. biddingTask(data, mapInfo)
  99. }()
  100. case "bidding_history": //增量id段历史数据
  101. pool <- true
  102. go func() {
  103. defer func() {
  104. <-pool
  105. }()
  106. biddingTask(data, mapInfo)
  107. }()
  108. case "bidding_all": //id段存量数据
  109. pool <- true
  110. go func() {
  111. defer func() {
  112. <-pool
  113. }()
  114. biddingAllTask(data, mapInfo)
  115. }()
  116. case "monitor":
  117. //
  118. default:
  119. pool <- true
  120. go func() {
  121. defer func() {
  122. <-pool
  123. }()
  124. log.Println("err mapinfo ", mapInfo)
  125. }()
  126. }
  127. }
  128. case udp.OP_NOOP:
  129. ok := string(data)
  130. if ok != "" {
  131. log.Println("udp re data:", ok)
  132. UdpTaskMap.Delete(ok)
  133. }
  134. }
  135. }
  136. type UdpNode struct {
  137. data []byte
  138. addr *net.UDPAddr
  139. timestamp int64
  140. retry int
  141. }
  142. func NextNode(mapInfo map[string]interface{}, stype string) {
  143. var next = &net.UDPAddr{
  144. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  145. Port: util.IntAll(config.Conf.Udp.Next.Port),
  146. }
  147. mapInfo["stype"] = stype
  148. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), stype)
  149. mapInfo["key"] = key
  150. log.Println("udp es node mapinfo:", mapInfo)
  151. datas, _ := json.Marshal(mapInfo)
  152. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  153. UdpTaskMap.Store(key, node)
  154. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  155. }
  156. func NextNodePro(mapInfo map[string]interface{}, stype string) {
  157. var next = &net.UDPAddr{
  158. IP: net.ParseIP(config.Conf.Udp.Project.Addr),
  159. Port: util.IntAll(config.Conf.Udp.Project.Port),
  160. }
  161. if stype == "bidding_history" {
  162. mapInfo["stype"] = "project_history"
  163. } else {
  164. mapInfo["stype"] = "project"
  165. }
  166. key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
  167. mapInfo["key"] = key
  168. log.Println("udp project node mapinfo:", mapInfo)
  169. datas, _ := json.Marshal(mapInfo)
  170. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  171. UdpTaskMap.Store(key, node)
  172. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  173. }
  174. func NextNodeBidData(mapInfo map[string]interface{}) {
  175. next := &net.UDPAddr{
  176. IP: net.ParseIP(config.Conf.Udp.Next.Addr),
  177. Port: util.IntAll(config.Conf.Udp.Next.Port),
  178. }
  179. mapInfo["stype"] = "biddingdata"
  180. mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
  181. log.Println("udp es node mapinfo:", mapInfo)
  182. datas, _ := json.Marshal(mapInfo)
  183. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  184. }
  185. func NextNodeTidbQyxy(mapInfo map[string]interface{}) {
  186. next := &net.UDPAddr{
  187. IP: net.ParseIP(config.Conf.Udp.Tidb.Addr),
  188. Port: util.IntAll(config.Conf.Udp.Tidb.Port),
  189. }
  190. mapInfo["stype"] = config.Conf.Udp.Tidb.Stype
  191. mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
  192. log.Println("udp tidb-qyxy node mapinfo:", mapInfo)
  193. datas, _ := json.Marshal(mapInfo)
  194. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  195. UdpTaskMap.Store(mapInfo["key"], node)
  196. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  197. }
  198. func NextNodeTidb(mapInfo map[string]interface{}, stype string) {
  199. next := &net.UDPAddr{
  200. IP: net.ParseIP(config.Conf.Udp.Tidb1.Addr),
  201. Port: util.IntAll(config.Conf.Udp.Tidb1.Port),
  202. }
  203. mapInfo["stype"] = stype
  204. mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
  205. log.Println("udp tidb-bidding node mapinfo:", mapInfo)
  206. datas, _ := json.Marshal(mapInfo)
  207. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  208. UdpTaskMap.Store(mapInfo["key"], node)
  209. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  210. }
  211. // NextNodeHn @Description 郑坤 海南数据处理
  212. // @Author J 2022/10/28 09:26
  213. func NextNodeHn(mapInfo map[string]interface{}) {
  214. next := &net.UDPAddr{
  215. IP: net.ParseIP(config.Conf.Udp.Tidb2.Addr),
  216. Port: util.IntAll(config.Conf.Udp.Tidb2.Port),
  217. }
  218. mapInfo["stype"] = "hainan"
  219. mapInfo["key"] = fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), util.ObjToString(mapInfo["stype"]))
  220. log.Println("NextNodeTidb mapinfo:", mapInfo)
  221. datas, _ := json.Marshal(mapInfo)
  222. node := &UdpNode{datas, next, time.Now().Unix(), 0}
  223. UdpTaskMap.Store(mapInfo["key"], node)
  224. _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
  225. }
  226. func UpdateBidding() {
  227. arru := make([][]map[string]interface{}, MgoBulkSize)
  228. indexu := 0
  229. for {
  230. select {
  231. case v := <-updateBidPool:
  232. arru[indexu] = v
  233. indexu++
  234. if indexu == MgoBulkSize {
  235. updateBidSp <- true
  236. go func(arru [][]map[string]interface{}) {
  237. defer func() {
  238. <-updateBidSp
  239. }()
  240. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  241. }(arru)
  242. arru = make([][]map[string]interface{}, MgoBulkSize)
  243. indexu = 0
  244. }
  245. case <-time.After(1000 * time.Millisecond):
  246. if indexu > 0 {
  247. updateBidSp <- true
  248. go func(arru [][]map[string]interface{}) {
  249. defer func() {
  250. <-updateBidSp
  251. }()
  252. MgoB.UpdateBulk(config.Conf.DB.MongoB.Coll, arru...)
  253. }(arru[:indexu])
  254. arru = make([][]map[string]interface{}, MgoBulkSize)
  255. indexu = 0
  256. }
  257. }
  258. }
  259. }
  260. func UpdateExtract() {
  261. arru := make([][]map[string]interface{}, MgoBulkSize)
  262. indexu := 0
  263. for {
  264. select {
  265. case v := <-updateExtPool:
  266. arru[indexu] = v
  267. indexu++
  268. if indexu == MgoBulkSize {
  269. updateExtSp <- true
  270. go func(arru [][]map[string]interface{}) {
  271. defer func() {
  272. <-updateExtSp
  273. }()
  274. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...)
  275. }(arru)
  276. arru = make([][]map[string]interface{}, MgoBulkSize)
  277. indexu = 0
  278. }
  279. case <-time.After(1000 * time.Millisecond):
  280. if indexu > 0 {
  281. updateExtSp <- true
  282. go func(arru [][]map[string]interface{}) {
  283. defer func() {
  284. <-updateExtSp
  285. }()
  286. MgoE.UpdateBulk(config.Conf.DB.MongoE.Coll, arru...)
  287. }(arru[:indexu])
  288. arru = make([][]map[string]interface{}, MgoBulkSize)
  289. indexu = 0
  290. }
  291. }
  292. }
  293. }
  294. func checkMapJob() {
  295. if config.Conf.Mail.Send {
  296. log.Println("checkMapJob to:", config.Conf.Mail.To)
  297. for {
  298. UdpTaskMap.Range(func(k, v interface{}) bool {
  299. now := time.Now().Unix()
  300. node, _ := v.(*UdpNode)
  301. if now-node.timestamp > 120 {
  302. node.retry++
  303. if node.retry > 5 {
  304. UdpTaskMap.Delete(k)
  305. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-sync-send-fail", k.(string)))
  306. if err == nil {
  307. defer res.Body.Close()
  308. read, err := ioutil.ReadAll(res.Body)
  309. log.Println("send mail ... r:", string(read), "err:", err)
  310. }
  311. } else {
  312. log.Println("udp重发", zap.Any("k:", k))
  313. //UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
  314. }
  315. } else if now-node.timestamp > 10 {
  316. log.Println("udp任务超时中.. k:", k)
  317. }
  318. return true
  319. })
  320. time.Sleep(60 * time.Second)
  321. }
  322. }
  323. }
  324. // @Description nsq处理id不变,内容替换的竞品数据
  325. // @Author J 2022/8/10 11:40
  326. func nsqMethod() {
  327. var err error
  328. Mcmer, err = gonsq.NewConsumer(&gonsq.Cconfig{
  329. IsJsonEncode: true, //与生产者配置对应,设为true会取第1个字节进行类型判断
  330. Addr: config.Conf.Nsq.Addr,
  331. ConnectType: 0, //默认连接nsqd
  332. Topic: config.Conf.Nsq.Topic,
  333. Channel: config.Conf.Nsq.Channel,
  334. Concurrent: config.Conf.Nsq.Concurrent, //并发数
  335. })
  336. if err != nil {
  337. log.Println("nsqMethod err", err)
  338. }
  339. for {
  340. select {
  341. case obj := <-Mcmer.Ch: //从通道读取即可
  342. objstr := util.ObjToString(obj)
  343. log.Println("obj ", obj, objstr)
  344. id := strings.Split(objstr, "=")
  345. if len(id) > 1 {
  346. if bson.IsObjectIdHex(id[1]) {
  347. taskinfo(id[1])
  348. } else {
  349. log.Println("jy nsq id err id", objstr)
  350. }
  351. } else {
  352. log.Println("jy nsq id err id", objstr)
  353. }
  354. }
  355. }
  356. }