123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package main
- import (
- util "app.yhyue.com/data_processing/common_utils"
- "app.yhyue.com/data_processing/common_utils/mongodb"
- "app.yhyue.com/data_processing/common_utils/udp"
- "encoding/json"
- "fmt"
- "net"
- "regexp"
- )
- var (
- Sysconfig map[string]interface{}
- MgoMix, Mgo *mongodb.MongodbSim
- dbname string
- CollQy, CollSave string
- lastId int64 // company_change 开始读取的ID
- ChangeMap []map[string]interface{}
- timeReg, _ = regexp.Compile(`^[\d]{4}-[\d]{1,2}-[\d]{1,2}`)
- localPort string // 本地监听端口
- UdpClient udp.UdpClient
- )
- func init() {
- util.ReadConfig(&Sysconfig)
- localPort = Sysconfig["local_port"].(string) //udp 本地监听地址
- UdpClient = udp.UdpClient{Local: localPort, BufSize: 1024}
- dbname = util.ObjToString(Sysconfig["dbName"])
- MgoMix = &mongodb.MongodbSim{
- MongodbAddr: util.ObjToString(Sysconfig["dbServer"]),
- Size: util.IntAll(Sysconfig["dbSize"]),
- DbName: dbname,
- UserName: util.ObjToString(Sysconfig["uname"]),
- Password: util.ObjToString(Sysconfig["upwd"]),
- }
- MgoMix.InitPool()
- Mgo = &mongodb.MongodbSim{
- MongodbAddr: util.ObjToString(Sysconfig["company_server"]), // 172.17.4.181:27001
- Size: 10,
- DbName: util.ObjToString(Sysconfig["company_db"]),
- }
- Mgo.InitPool()
- CollQy = Sysconfig["coll_qy"].(string) //qyxy_std,全量同步的时候用到
- CollSave = Sysconfig["coll_change"].(string) //qyxy_change,
- lastId = util.Int64All(Sysconfig["lastId"])
- ChangeMap = util.ObjArrToMapArr(Sysconfig["changeType"].([]interface{}))
- initChangeMap()
- }
- func initChangeMap() {
- for _, v := range ChangeMap {
- list := v["change_keyword"].([]interface{})
- var regList []string
- if len(list) > 0 {
- for _, v1 := range list {
- reg := ".*" + util.ObjToString(v1) + ".*"
- regList = append(regList, reg)
- }
- v["change_key_reg"] = regList
- } else {
- v["change_key_reg"] = []string{".*"}
- }
- }
- }
- func main() {
- UdpClient.Listen(processUdpMsg)
- util.Debug("Udp服务监听======= port:", localPort)
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- util.Debug("processUdpMsg mapinfo :=>", mapInfo)
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- go IncData() //增量数据
- }
- if err != nil {
- util.Debug("Unmarshal err :=>", err)
- }
- default:
- fmt.Println("qyxy_listen_data_new :=====")
- }
- }
|