123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package main
- import (
- "encoding/json"
- "fmt"
- "github.com/robfig/cron/v3"
- "go.mongodb.org/mongo-driver/bson/primitive"
- "go.uber.org/zap"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "time"
- )
- var (
- MgoB *mongodb.MongodbSim
- UdpClient udp.UdpClient
- nextAddr *net.UDPAddr
- Repeat = false //抽取是否回复
- )
- //processUdpMsg 处理udp
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- //
- //var mapInfo map[string]interface{}
- //err := json.Unmarshal(data, &mapInfo)
- //log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
- //if err != nil {
- // fmt.Println(err)
- //}
- //if mapInfo != nil {
- // key, _ := mapInfo["key"].(string)
- // if key == "" {
- // key = "udpok"
- // }
- // go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- //}
- case udp.OP_NOOP:
- da := string(data)
- log.Info("收到回复数据", zap.String("data", da))
- if da == "ok" {
- Repeat = true
- }
- default:
- fmt.Println("current_listen : processUdpMsg =====", act)
- }
- }
- func main() {
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- if GF.Env.SpecType == "day" {
- _, err := c.AddFunc(GF.Env.Spec, dealIndexByDay)
- if err != nil {
- log.Info("main", zap.Any("AddFunc err", err))
- }
- } else if GF.Env.SpecType == "month" {
- _, err := c.AddFunc(GF.Env.Spec, dealIndexByMonth)
- if err != nil {
- log.Info("main", zap.Any("AddFunc err", err))
- }
- } else if GF.Env.SpecType == "hour" {
- _, err := c.AddFunc(GF.Env.Spec, dealIndexByHour)
- if err != nil {
- log.Info("main", zap.Any("AddFunc err", err))
- }
- }
- c.Start()
- defer c.Stop()
- //发送数据给抽取
- if GF.Env.Send {
- go SendPreData()
- }
- log.Info("main", zap.String("监听端口:", GF.Env.LocalPort))
- select {}
- }
- //SendPreData 发送预处理数据给 抽取程序
- func SendPreData() {
- f_sid := ""
- n_sid := ""
- f_lid := "" //file 最后一个分类结束ID
- n_lid := ""
- where := map[string]interface{}{
- "extracttype": 9,
- "biddingid": map[string]interface{}{
- "$exists": 1,
- },
- }
- fileDataa, _ := MgoB.Find("bidding_file", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
- fileDaa := *fileDataa
- f_sid = BsonIdToSId(fileDaa[0]["_id"])
- nomalDataa, _ := MgoB.Find("bidding_nomal", where, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
- nomalDaa := *nomalDataa
- n_sid = BsonIdToSId(nomalDaa[0]["_id"])
- for {
- fileData, _ := MgoB.Find("bidding_file", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
- fileDa := *fileData
- fid := BsonIdToSId(fileDa[0]["_id"])
- if fid != "" {
- f_lid = fid
- }
- nomalData, _ := MgoB.Find("bidding_nomal", map[string]interface{}{"extracttype": 9}, `{"_id":-1}`, map[string]interface{}{"_id": 1}, true, -1, 1)
- nomalDa := *nomalData
- nid := BsonIdToSId(nomalDa[0]["_id"])
- if nid != "" {
- n_lid = nid
- }
- //起始ID 等于 结束ID
- if f_lid == f_sid && n_lid == n_sid {
- time.Sleep(time.Second * 5)
- } else if f_lid == "" && n_lid == "" {
- time.Sleep(time.Second * 5)
- } else {
- //log.Info("main", zap.String("f_lid", f_lid), zap.String("n_lid", n_lid))
- data := map[string]interface{}{
- "file": fmt.Sprintf("%s-%s", f_sid, f_lid),
- "nomal": fmt.Sprintf("%s-%s", n_sid, n_lid),
- }
- SendUdpMsg(data, nextAddr)
- time.Sleep(time.Second * 3)
- if !Repeat {
- time.Sleep(time.Second * 10)
- SendUdpMsg(data, nextAddr)
- time.Sleep(time.Second * 10)
- SendUdpMsg(data, nextAddr)
- log.Error("没有收到回复", zap.Any("data", data))
- }
- f_sid = f_lid
- n_sid = n_lid
- }
- }
- }
- //BsonIdToSId 根据bsonID转string
- func BsonIdToSId(uid interface{}) string {
- if uid == nil {
- return ""
- } else if u, ok := uid.(string); ok {
- return u
- } else if u, ok := uid.(primitive.ObjectID); ok {
- return u.Hex()
- } else {
- return ""
- }
- }
- //SendUdpMsg 通知处理企业新增数据
- func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
- bytes, _ := json.Marshal(data)
- UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- log.Info("SendUdpMsg", zap.Any("data", data))
- log.Info("SendUdpMsg", zap.Any("target", target))
- }
|