package main import ( "encoding/json" "fmt" "go.uber.org/zap" utils "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "os" ) var ( Sysconfig map[string]interface{} //配置文件 Mgo *mongodb.MongodbSim Dbname string Dbcoll string Es *elastic.Elastic Index string //Itype string EsFields []string //Updatetime int64 localPort string // 本地监听端口 UdpClient udp.UdpClient ) var EsSaveCache = make(chan map[string]interface{}, 5000) var SP = make(chan bool, 5) func init() { utils.ReadConfig(&Sysconfig) //utils.ReadConfig("test.json", &Sysconfig) Dbname = Sysconfig["dbname"].(string) // Dbcoll = Sysconfig["dbcoll"].(string) //qyxy_std Mgo = &mongodb.MongodbSim{ MongodbAddr: Sysconfig["mgodb"].(string), Size: utils.IntAllDef(Sysconfig["dbsize"], 5), DbName: Dbname, UserName: Sysconfig["uname"].(string), Password: Sysconfig["upwd"].(string), } Mgo.InitPool() //es econf := Sysconfig["elastic"].(map[string]interface{}) Index = econf["index"].(string) //Itype = econf["itype"].(string) Es = &elastic.Elastic{ S_esurl: econf["addr"].(string), I_size: utils.IntAllDef(econf["pool"], 12), Username: econf["username"].(string), Password: econf["password"].(string), } Es.InitElasticSize() EsFields = utils.ObjArrToStringArr(econf["esfields"].([]interface{})) //Updatetime = utils.Int64All(Sysconfig["updatetime"]) localPort = Sysconfig["local_port"].(string) //udp 本地监听地址 UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024} InitLog() } func InitLog() { err := log.InitLog( log.Path("./logs/log.out"), log.Level("info"), log.Compress(true), log.MaxSize(10), log.MaxBackups(10), log.MaxAge(7), log.Format("json"), ) if err != nil { fmt.Printf("InitLog failed: %v\n", err) os.Exit(1) } } func main() { UdpClient.Listen(processUdpMsg) log.Info("main", zap.String("Udp服务监听", localPort)) //go StdAll() go SaveEs() ch := make(chan bool, 1) <-ch } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { log.Info("processUdpMsg", zap.Any("Unmarshal err", err)) } log.Info("processUdpMsg", zap.Any("mapinfo", mapInfo)) if tasktype, ok := mapInfo["stype"].(string); ok { switch tasktype { case "stdall": go StdAll() default: fmt.Println("tasktype", tasktype) } } else { //拿到同步信号,开始同步数据 if _, ok := mapInfo["start"]; ok { var start_time, end_time int64 if _, ok2 := mapInfo["start_time"]; ok2 { start_time = utils.Int64All(mapInfo["start_time"]) end_time = utils.Int64All(mapInfo["end_time"]) } var q map[string]interface{} if start_time > 0 { if end_time > 0 { q = map[string]interface{}{ "updatetime": map[string]interface{}{ "$gte": start_time, "$lte": end_time, }, } } else { q = map[string]interface{}{ "updatetime": map[string]interface{}{ "$gte": start_time, }, } } go StdAdd(q) //读取qyxy_std 数据,放入es 数组 } else { fmt.Println("参数 start_time 为0") } } } default: log.Info("processUdpMsg", zap.String("mapinfo", string(data))) } }