123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/elastic"
- "app.yhyue.com/data_processing/common_utils/log"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "app.yhyue.com/data_processing/common_utils/udp"
- "encoding/json"
- "field-dispose/config"
- "fmt"
- "go.uber.org/zap"
- "io/ioutil"
- "net"
- "net/http"
- "sync"
- "time"
- )
- var (
- MgoB *mongodb.MongodbSim
- Es *elastic.Elastic
- UdpClient udp.UdpClient
- UdpTaskMap = &sync.Map{}
- updatePool chan []map[string]interface{}
- updateSp chan bool
- updateEsPool chan []map[string]interface{}
- updateEsSp chan bool
- )
- func init() {
- config.Init("./common.toml")
- InitLog()
- InitMgo()
- updatePool = make(chan []map[string]interface{}, 5000)
- updateSp = make(chan bool, 5)
- updateEsPool = make(chan []map[string]interface{}, 5000)
- updateEsSp = make(chan bool, 2)
- UdpClient = udp.UdpClient{Local: config.Conf.Udp.LocPort, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- log.Info("Udp服务监听", zap.String("port:", config.Conf.Udp.LocPort))
- }
// UdpNode is one forwarded UDP message that is still waiting for an
// acknowledgement. Field order matters: NextNode builds it with a positional
// literal.
type UdpNode struct {
	data      []byte       // encoded payload, resent as-is on retry
	addr      *net.UDPAddr // destination the payload was sent to
	timestamp int64        // Unix seconds at which the node was created
	retry     int          // number of timeout passes counted by checkMapJob
}
- func main() {
- go SaveErrorInfo() //保存异常信息
- //go CheckErrorNum()
- //go updateEsMethod()
- go checkMapJob()
- go updateMethod()
- ch := make(chan bool, 1)
- <-ch
- }
- func InitMgo() {
- MgoB = &mongodb.MongodbSim{
- MongodbAddr: config.Conf.DB.Mongo.Addr,
- DbName: config.Conf.DB.Mongo.Dbname,
- Size: config.Conf.DB.Mongo.Size,
- UserName: config.Conf.DB.Mongo.User,
- Password: config.Conf.DB.Mongo.Password,
- }
- MgoB.InitPool()
- Es = &elastic.Elastic{
- S_esurl: config.Conf.DB.Es.Addr,
- I_size: config.Conf.DB.Es.Size,
- }
- Es.InitElasticSize()
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer util.Catch()
- switch act {
- case udp.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- log.Info("processUdpMsg", zap.Any("mapInfo:", mapInfo))
- gtid, _ := mapInfo["gtid"].(string)
- lteid, _ := mapInfo["lteid"].(string)
- if err != nil || gtid == "" || lteid == "" {
- UdpClient.WriteUdp([]byte("cgyx udp error"), udp.OP_NOOP, ra) //udp失败回写
- } else {
- //udp成功回写
- if k := util.ObjToString(mapInfo["key"]); k != "" {
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- } else {
- k = fmt.Sprintf("%s-%s-%s", gtid, lteid, util.ObjToString(mapInfo["stype"]))
- UdpClient.WriteUdp([]byte(k), udp.OP_NOOP, ra)
- }
- log.Info("start dispose ...")
- getIntention(gtid, lteid, mapInfo)
- }
- case udp.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- log.Info("udp re", zap.String("data:", ok))
- UdpTaskMap.Delete(ok)
- }
- }
- }
- func NextNode(mapInfo map[string]interface{}) {
- var next = &net.UDPAddr{
- IP: net.ParseIP(config.Conf.Udp.Next.Addr),
- Port: util.IntAll(config.Conf.Udp.Next.Port),
- }
- mapInfo["stype"] = config.Conf.Udp.Next.Stype
- key := fmt.Sprintf("%s-%s-%s", util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), config.Conf.Udp.Next.Stype)
- mapInfo["key"] = key
- log.Info("udp next node", zap.Any("mapinfo:", mapInfo))
- datas, _ := json.Marshal(mapInfo)
- node := &UdpNode{datas, next, time.Now().Unix(), 0}
- UdpTaskMap.Store(key, node)
- _ = UdpClient.WriteUdp(datas, udp.OP_TYPE_DATA, next)
- }
- func checkMapJob() {
- if config.Conf.Mail.Send {
- log.Info("checkMapJob", zap.String("to:", config.Conf.Mail.To))
- for {
- UdpTaskMap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*UdpNode)
- if now-node.timestamp > 120 {
- node.retry++
- if node.retry > 5 {
- UdpTaskMap.Delete(k)
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", config.Conf.Mail.Api, config.Conf.Mail.To, "field-py-dispose-send-fail", k.(string)))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
- }
- } else {
- log.Info("udp重发", zap.Any("k:", k))
- UdpClient.WriteUdp(node.data, udp.OP_TYPE_DATA, node.addr)
- }
- } else if now-node.timestamp > 10 {
- log.Info("udp任务超时中..", zap.Any("k:", k))
- }
- return true
- })
- time.Sleep(60 * time.Second)
- }
- }
- }
- func updateMethod() {
- log.Info("updateMethod 保存...")
- arru := make([][]map[string]interface{}, 500)
- indexu := 0
- for {
- select {
- case v := <-updatePool:
- arru[indexu] = v
- indexu++
- if indexu == 500 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk("bidding", arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateSp
- }()
- MgoB.UpdateBulk("bidding", arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 500)
- indexu = 0
- }
- }
- }
- }
- func updateEsMethod() {
- arru := make([][]map[string]interface{}, 200)
- indexu := 0
- for {
- select {
- case v := <-updateEsPool:
- arru[indexu] = v
- indexu++
- if indexu == 200 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
- }(arru)
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- case <-time.After(1000 * time.Millisecond):
- if indexu > 0 {
- updateEsSp <- true
- go func(arru [][]map[string]interface{}) {
- defer func() {
- <-updateEsSp
- }()
- Es.UpdateBulk(config.Conf.DB.Es.IndexS, config.Conf.DB.Es.TypeS, arru...)
- }(arru[:indexu])
- arru = make([][]map[string]interface{}, 200)
- indexu = 0
- }
- }
- }
- }
|