package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

var (
	MongoTool *mongodb.MongodbSim
	//updatePool chan []map[string]interface{}
	//updateSp   chan bool
	//saveSize   int
	CurrentColl string                                // collection currently being imported
	collCount   int                                   // records accepted for CurrentColl
	saveLog     = make(map[string]interface{})        // per-collection statistics of the current run
	saveArr     [][]map[string]interface{}            // pending upsert operations, flushed in batches
	UdpClient   udp.UdpClient
	changeAddr  *net.UDPAddr
	readPath    string // root directory to import from
	//lastID int64

	// taskMu serializes task runs: every start signal spawns a goroutine, and
	// task mutates the package-level state above.
	taskMu sync.Mutex
)

// init sets up logging, configuration and the Mongo connection, then derives
// the import directory and the UDP address of the change-notification target
// from the loaded configuration.
func init() {
	InitLog()
	err := InitConfig()
	if err != nil {
		// NOTE(review): initialization continues after a config error, exactly
		// as before; confirm whether this should abort instead.
		log.Info("init", zap.Any("InitConfig", err))
	}

	InitMgo()

	readPath = GF.Env.Path
	changeAddr = &net.UDPAddr{
		Port: GF.Env.ChangePort,
		IP:   net.ParseIP(GF.Env.TargetIp),
	}
	log.Info("init", zap.Any("changeAddr", changeAddr))
}

// main starts the UDP listener and then blocks forever; all work is driven by
// incoming UDP messages handled in processUdpMsg.
func main() {
	UdpClient = udp.UdpClient{Local: GF.Env.LocalPort, BufSize: 1024}
	UdpClient.Listen(processUdpMsg)
	log.Info("main", zap.String("Udp服务监听======= port:", GF.Env.LocalPort))

	// Block forever without allocating a channel that is never written to.
	select {}
}

// processUdpMsg handles one UDP message. For udp.OP_TYPE_DATA it decodes the
// JSON payload, acknowledges it to the sender ra, and starts an import when
// the payload contains a "start" key. Any other act is only logged.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	switch act {
	case udp.OP_TYPE_DATA:
		var mapInfo map[string]interface{}
		if err := json.Unmarshal(data, &mapInfo); err != nil {
			log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
			return
		}
		if mapInfo == nil {
			return
		}
		log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))

		// Acknowledge with the message key, or "udpok" when none was given.
		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)

		// Only a message carrying "start" triggers a synchronization.
		if _, ok := mapInfo["start"]; !ok {
			return
		}
		// A non-empty "path" in the message replaces the current import
		// directory (and stays in effect for later messages).
		if p, ok := mapInfo["path"]; ok {
			if path := util.ObjToString(p); path != "" {
				readPath = path
			}
		}
		go task(readPath)
	default:
		log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "======"))
	}
}

// task imports every collection directory found under path. The expected
// layout is <path>/<collection>/<date>/split.json.gz, for example
// annual_report_base/20221122/split.json.gz. After the import it notifies
// changeAddr with the newest company_change _id seen before the import and
// stores the per-collection statistics in the "save_log" collection.
//
// Runs are serialized through taskMu because task and its helpers mutate
// package-level state (CurrentColl, collCount, saveArr, saveLog).
func task(path string) {
	taskMu.Lock()
	defer taskMu.Unlock()

	files, err := ioutil.ReadDir(path)
	if err != nil {
		// Nothing can be imported, so do not report a finished run.
		log.Info("task", zap.String("path", path), zap.Any("ReadDir err", err))
		return
	}

	// Remember the newest company_change _id before any data is written.
	// The second return value of Find is ignored, as in the original code.
	var lastID interface{}
	dats, _ := MongoTool.Find("company_change", nil, `{"_id":-1}`, nil, true, -1, 1)
	if dats != nil && len(*dats) > 0 {
		lastID = (*dats)[0]["_id"]
	} else {
		// Previously this case panicked on ds[0]; now start_id is sent empty.
		log.Info("task", zap.String("company_change", "no existing document, start_id is empty"))
	}

	// Start each run with fresh statistics instead of accumulating stale
	// entries from earlier runs.
	saveLog = make(map[string]interface{})

	if !strings.HasSuffix(path, "/") {
		path += "/"
	}

	for _, f := range files {
		if !f.IsDir() {
			continue
		}
		start := time.Now()
		CurrentColl = f.Name() // e.g. annual_report_base
		collCount = 0
		log.Info("task", zap.String("collection name", f.Name()))

		subPath := path + f.Name() + "/"
		subFiles, err := ioutil.ReadDir(subPath)
		if err != nil {
			log.Info("task", zap.String("path", subPath), zap.Any("ReadDir err", err))
		}
		for _, s := range subFiles {
			util.Debug(s.Name())
			if s.IsDir() {
				taskinfo(subPath + s.Name()) // e.g. annual_report_base/20221122
			}
		}

		// Write whatever is left over from the last, incomplete batch.
		flushSaveArr()

		saveLog[f.Name()] = map[string]interface{}{
			"count":    collCount,
			"duration": time.Since(start).Minutes(),
		}
	}

	// Import finished: tell the change service where to start from.
	data := map[string]interface{}{
		"start":    true,
		"start_id": lastID,
	}
	SendUdpMsg(data, changeAddr)
	log.Info("task", zap.String("执行完毕", path))
	MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
}

// taskinfo reads <path>/split.json.gz line by line and feeds every non-blank
// line to hookfn. Reading stops at EOF or at the first read error.
//
// NOTE(review): failing to open the file or to create the gzip reader still
// panics, as before, so a broken input aborts the whole run rather than
// letting task report a partial import as complete.
func taskinfo(path string) {
	count := 0 // records accepted from this file
	file := path + "/split.json.gz"
	log.Info("taskinfo", zap.Any("current date", file))

	fr, err := os.Open(file)
	if err != nil {
		panic(err)
	}
	println("open file success!")
	defer fr.Close()

	gr, err := gzip.NewReader(fr)
	if err != nil {
		panic(fmt.Errorf("gzip reader %s: %w", file, err))
	}
	defer gr.Close()

	bfRd := bufio.NewReader(gr)
	for {
		line, err := bfRd.ReadBytes('\n')

		// ReadBytes may return data together with an error (a last line
		// without trailing newline), so handle the data first. Blank reads,
		// such as the empty slice returned at EOF, are skipped.
		if len(bytes.TrimSpace(line)) > 0 {
			if next := hookfn(line, count); next != count {
				count = next
				if count%5000 == 0 {
					log.Info("taskinfo", zap.Any("current exec", fmt.Sprintf("%s-%d", file, count)))
				}
			}
		}

		if err != nil {
			if err == io.EOF {
				log.Info("taskinfo", zap.String("EOF", "read gzip data finish! "))
			} else {
				// A read error is not retried: repeating the read could
				// return the same error forever.
				log.Info("taskinfo", zap.Any("[read gzip data err]:", err))
			}
			return
		}
	}
}

// hookfn decodes one JSON line, queues an upsert for it in saveArr and
// flushes the queue once it holds 500 operations. It returns count+1 when the
// record was accepted and count unchanged when the line was rejected (invalid
// JSON or no "id" field).
func hookfn(line []byte, count int) int {
	const batchSize = 500

	tmp := make(map[string]interface{})
	if err := json.Unmarshal(line, &tmp); err != nil {
		log.Info("hookfn", zap.Any("Unmarshal err", err))
		return count
	}
	// Without an "id" every such record would collapse onto the same _id.
	// This also rejects a JSON null line, which leaves tmp nil.
	if _, ok := tmp["id"]; !ok {
		log.Info("hookfn", zap.String("skip", "record without id"))
		return count
	}

	count++
	collCount++

	// NOTE(review): numbers are decoded as float64 here, so very large ids
	// may lose precision before util.IntAll sees them - confirm the id range.
	id := util.IntAll(tmp["id"])
	tmp["_id"] = id
	tmp["id"] = fmt.Sprintf("%d", id)

	saveArr = append(saveArr, []map[string]interface{}{
		{"_id": tmp["_id"]},
		{"$set": tmp},
	})
	if len(saveArr) >= batchSize {
		flushSaveArr()
	}

	return count
}

// flushSaveArr upserts all queued operations into CurrentColl and empties the
// queue. It does nothing when the queue is empty.
func flushSaveArr() {
	if len(saveArr) == 0 {
		return
	}
	MongoTool.UpSertBulk(CurrentColl, saveArr...)
	// Drop the old backing array instead of reusing it, so the slice handed
	// to UpSertBulk is never overwritten by later appends.
	saveArr = nil
}

// SendUdpMsg JSON-encodes data and sends it to target as an
// udp.OP_TYPE_DATA message. Nothing is sent when encoding fails.
func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
	payload, err := json.Marshal(data)
	if err != nil {
		log.Info("SendUdpMsg", zap.Any("Marshal err", err), zap.Any("target", target))
		return
	}
	UdpClient.WriteUdp(payload, udp.OP_TYPE_DATA, target)
	log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
}