123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io/ioutil"
- "log"
- mu "mfw/util"
- "net"
- "net/http"
- qu "qfw/util"
- "time"
- . "util"
- )
- //存储udp,防止多个udp同时执行
- //var DataChannel = make(chan map[string]string, 50)
- type UdpNode struct {
- Data []byte
- Addr *net.UDPAddr
- Timestamp int64
- Retry int
- }
- //初始化udp
- func InitUdp() {
- defer qu.Catch()
- Udpclient = mu.UdpClient{Local: UdpPort, BufSize: 1024}
- Udpclient.Listen(processUdpMsg)
- qu.Debug("Udp服务监听", UdpPort)
- }
- //udp调用信号
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- defer qu.Catch()
- switch act {
- case mu.OP_TYPE_DATA: //上个节点的数据
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- qu.Debug("err:", err, "mapInfo:", mapInfo)
- stype := qu.ObjToString(mapInfo["stype"])
- if err != nil || stype == "" {
- Udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) //回执
- } else if mapInfo != nil { //接收到udp,回执
- gtid := qu.ObjToString(mapInfo["gtid"])
- lteid := qu.ObjToString(mapInfo["lteid"])
- Udpclient.WriteUdp([]byte(gtid+"-"+lteid+"-"+stype), mu.OP_NOOP, ra)
- if gtid == "" || lteid == "" {
- qu.Debug("id段错误")
- } else if stype == "update" { //更新bidding
- UpdateBiddingData(gtid, lteid)
- }
- }
- case mu.OP_NOOP: //下个节点回应
- ok := string(data)
- if ok != "" {
- qu.Debug("ok:", ok)
- UdptaskMap.Delete(ok)
- }
- }
- }
- //发送udp的时候一个key参数
- func SendUdp(gtid, lteid, stype, udpaddr string, udpport int) {
- defer qu.Catch()
- key := gtid + "-" + lteid + "-" + stype
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": gtid,
- "lteid": lteid,
- "stype": stype,
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(udpaddr),
- Port: udpport,
- }
- node := &UdpNode{by, addr, time.Now().Unix(), 0}
- UdptaskMap.Store(key, node)
- qu.Debug("send:", stype, key)
- Udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- func checkMapJob() {
- //阿里云内网无法发送邮件
- log.Println("start checkMapJob")
- for {
- UdptaskMap.Range(func(k, v interface{}) bool {
- now := time.Now().Unix()
- node, _ := v.(*UdpNode)
- if now-node.Timestamp > 120 {
- node.Retry++
- if node.Retry > 10 {
- log.Println("udp重试失败", k)
- UdptaskMap.Delete(k)
- res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", Api, Tomail, "downloadfile-send-fail", k.(string)))
- if err == nil {
- defer res.Body.Close()
- read, err := ioutil.ReadAll(res.Body)
- log.Println("邮件发发送:", string(read), err)
- }
- } else {
- log.Println("udp重发", k)
- Udpclient.WriteUdp(node.Data, mu.OP_TYPE_DATA, node.Addr)
- }
- } else if now-node.Timestamp > 10 {
- log.Println("udp任务超时中..", k)
- }
- return true
- })
- time.Sleep(60 * time.Second)
- }
- }
- //func sendMail(key string) {
- // for i := 1; i <= 3; i++ {
- // res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, tomail, "redownload-receive-oss-fail", "发送附件识别失败:"+key))
- // if err == nil {
- // res.Body.Close()
- // read, err := ioutil.ReadAll(res.Body)
- // log.Println("邮件发送:", string(read), err)
- // break
- // }
- // }
- //}
|