123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package main
- import (
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- utils "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "os"
- )
- var (
- Sysconfig map[string]interface{} //配置文件
- Mgo *mongodb.MongodbSim
- Dbname string
- Dbcoll string
- Es *elastic.Elastic
- Index string
- //Itype string
- EsFields []string
- //Updatetime int64
- localPort string // 本地监听端口
- UdpClient udp.UdpClient
- )
- var EsSaveCache = make(chan map[string]interface{}, 5000)
- var SP = make(chan bool, 5)
- func init() {
- utils.ReadConfig(&Sysconfig)
- //utils.ReadConfig("test.json", &Sysconfig)
- Dbname = Sysconfig["dbname"].(string) //
- Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: Sysconfig["mgodb"].(string),
- Size: utils.IntAllDef(Sysconfig["dbsize"], 5),
- DbName: Dbname,
- UserName: Sysconfig["uname"].(string),
- Password: Sysconfig["upwd"].(string),
- }
- Mgo.InitPool()
- //es
- econf := Sysconfig["elastic"].(map[string]interface{})
- Index = econf["index"].(string)
- //Itype = econf["itype"].(string)
- Es = &elastic.Elastic{
- S_esurl: econf["addr"].(string),
- I_size: utils.IntAllDef(econf["pool"], 12),
- Username: econf["username"].(string),
- Password: econf["password"].(string),
- }
- Es.InitElasticSize()
- EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{}))
- //Updatetime = utils.Int64All(Sysconfig["updatetime"])
- localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
- UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
- InitLog()
- }
- func InitLog() {
- err := log.InitLog(
- log.Path("./logs/log.out"),
- log.Level("info"),
- log.Compress(true),
- log.MaxSize(10),
- log.MaxBackups(10),
- log.MaxAge(7),
- log.Format("json"),
- )
- if err != nil {
- fmt.Printf("InitLog failed: %v\n", err)
- os.Exit(1)
- }
- }
- func main() {
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听", localPort))
- //go StdAll()
- go SaveEs()
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- }
- log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo))
- if mapInfo != nil {
- //相应UDP回答
- key := utils.ObjToString(mapInfo["key"])
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- }
- if tasktype, ok := mapInfo["stype"].(string); ok {
- switch tasktype {
- case "stdall":
- go StdAll()
- default:
- fmt.Println("tasktype", tasktype)
- }
- } else {
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- var start_time, end_time int64
- if _, ok2 := mapInfo["start_time"]; ok2 {
- start_time = utils.Int64All(mapInfo["start_time"])
- end_time = utils.Int64All(mapInfo["end_time"])
- }
- var q map[string]interface{}
- if start_time > 0 {
- if end_time > 0 {
- q = map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gte": start_time,
- "$lte": end_time,
- },
- }
- } else {
- q = map[string]interface{}{
- "updatetime": map[string]interface{}{
- "$gte": start_time,
- },
- }
- }
- go StdAdd(q) //读取qyxy_std 数据,放入es 数组
- } else {
- fmt.Println("参数 start_time 为0")
- }
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("mapinfo", string(data)))
- }
- }
|