package main import ( "bufio" "compress/gzip" "encoding/json" "errors" "fmt" "go.uber.org/zap" "io" "io/ioutil" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/log" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp" "net" "net/http" "os" "runtime" "strings" "sync" "time" ) var ( MongoTool *mongodb.MongodbSim //updatePool chan []map[string]interface{} //updateSp chan bool //saveSize int CurrentColl string //collCount int saveLog = make(map[string]interface{}) UdpClient udp.UdpClient changeAddr *net.UDPAddr readPath string //文件夹目录 jyUpdatetime int64 ) func init() { InitLog() err := InitConfig() if err != nil { log.Info("init", zap.Any("InitConfig", err)) } InitMgo() readPath = GF.Env.Path changeAddr = &net.UDPAddr{ Port: GF.Env.ChangePort, IP: net.ParseIP(GF.Env.TargetIp), } log.Info("init", zap.Any("changeAddr", changeAddr)) } func main() { UdpClient = udp.UdpClient{Local: GF.Env.LocalPort, BufSize: 1024} UdpClient.Listen(processUdpMsg) log.Info("main", zap.String("Udp服务监听======= port:", GF.Env.LocalPort)) ch := make(chan bool, 1) <-ch } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { switch act { case udp.OP_TYPE_DATA: var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) if err != nil { log.Info("processUdpMsg", zap.Any("Unmarshal err", err)) } else if mapInfo != nil { log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo)) key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra) //拿到同步信号,开始同步数据 if _, ok := mapInfo["start"]; ok { if _, okk := mapInfo["path"]; okk { path := util.ObjToString(mapInfo["path"]) //udp 传递的路径 //没有指定配置文件的指定目录,就使用udp 传递目录 if path != "" { readPath = path } } //开始同步 go task(readPath) } } default: log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "======")) } } func task(path string) { files, _ := ioutil.ReadDir(path) jyUpdatetime = time.Now().Unix() //数据更新时间,更新quxy_change 使用 //annual_report_base/20221122/split.json.gz for _, f := range files { if f.IsDir() { start := time.Now() CurrentColl = f.Name() //annual_report_base collCount := 0 // 当前表的数据数量 log.Info("task", zap.String("collection name", f.Name())) //annual_report_base //util.Debug("collection name:---", f.Name()) if !strings.HasSuffix(path, "/") { path = path + "/" } subPath := path + f.Name() + "/" subFiles, _ := ioutil.ReadDir(subPath) for _, s := range subFiles { log.Info("task ", zap.String("当前文件:", s.Name())) if s.IsDir() { collCount = taskinfo(subPath+s.Name(), collCount) //annual_report_base/20221122 //// 增加WaitGroup计数 //go func(dirPath string, collCount int) { // collCount = taskinfo(dirPath, collCount) //}(subPath+s.Name(), collCount) //taskinfo(subPath + s.Name()) //annual_report_base/20221122 } } // 判断最后的数据不足500条时 执行 //if len(saveArr) > 0 { // tmps := saveArr // MongoTool.UpSertBulk(CurrentColl, tmps...) // saveArr = [][]map[string]interface{}{} //} duration := time.Since(start) result := map[string]interface{}{ "count": collCount, "duration": duration.Minutes(), } saveLog[f.Name()] = result //sendMsg += f.Name() + ":" + strconv.Itoa(collCount) + ";" } } //执行完毕,通知qyxy_change,更新企业变更信息 data := map[string]interface{}{ "start": GF.Env.ChangeUdp, "jy_updatetime": jyUpdatetime, } if GF.Env.ChangeUdp { SendUdpMsg(data, changeAddr) } log.Info("task", zap.String("执行完毕", path)) MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog}) //邮件通知 title := fmt.Sprintf("%s 数据处理完毕", path) // 将 map 转换为 JSON 字符串 jsonBytes, err := json.Marshal(saveLog) if err != nil { log.Info("task", zap.Any("json 转换失败", saveLog)) } // 转换为字符串并输出 jsonStr := string(jsonBytes) SendMail(title, jsonStr) } // taskinfo 读取压缩包文件 func taskinfo(path string, collCount int) int { count := 0 //读取的数量 file := path + "/split.json.gz" log.Info("taskinfo", zap.Any("current file", file)) // 检查文件是否存在 fileInfo, err := os.Stat(file) if err != nil { log.Error("Error opening file:", zap.Error(err)) return collCount } // 检查文件大小是否为0 if fileInfo.Size() == 0 { log.Warn(file, zap.Error(errors.New("文件大小为0"))) return collCount } // 打开本地gz格式压缩包 fr, err := os.Open(file) if err != nil { log.Info("taskinfo", zap.Any("err", err)) } else { fmt.Println("open file success!", file) } // defer: 在函数退出时,执行关闭文件 defer fr.Close() // 创建gzip文件读取对象 gr, err := gzip.NewReader(fr) if err != nil { log.Info("taskinfo", zap.Any("err", err)) } // defer: 在函数退出时,执行关闭gzip对象 defer gr.Close() bfRd := bufio.NewReader(gr) wg := sync.WaitGroup{} ch := make(chan bool, 5) for { line, err := bfRd.ReadBytes('\n') if err != nil { if err == io.EOF { log.Info("taskinfo", zap.String("EOF", "read gzip data finish! ")) break } else { log.Info("taskinfo", zap.Any("[read gzip data err]:", err)) } } else { count++ if count%5000 == 0 { printMemoryUsage() log.Info("taskinfo", zap.Int("current count:"+file, count)) } ch <- true wg.Add(1) go func(line []byte) { defer func() { <-ch wg.Done() }() hookfn(line) }(line) } } wg.Wait() collCount += count return collCount } // hookfn 处理数据,500条处理一次 func hookfn(line []byte) { tmp := make(map[string]interface{}) err := json.Unmarshal(line, &tmp) if err != nil { log.Info("hookfn", zap.Any("Unmarshal err", err)) } if len(tmp) == 0 { return } if util.IntAll(tmp["id"]) == 0 { MongoTool.Save("wcc"+CurrentColl, tmp) } else { tmp["_id"] = util.IntAll(tmp["id"]) tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"])) tmp["jy_updatetime"] = time.Now().Unix() //if CurrentColl == "company_change" { // tmp["jy_updatetime"] = jyUpdatetime //} saveInfo := []map[string]interface{}{map[string]interface{}{"_id": tmp["_id"]}, map[string]interface{}{"$set": tmp}} tmpArr := [][]map[string]interface{}{saveInfo} MongoTool.UpSertBulk(CurrentColl, tmpArr...) } } // SendUdpMsg 通知处理企业新增数据 func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) { bytes, _ := json.Marshal(data) UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target) log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target)) } func printMemoryUsage() { var memStats runtime.MemStats runtime.ReadMemStats(&memStats) // 将字节转换为兆字节(MB) allocatedMB := float64(memStats.Alloc) / 1024 / 1024 totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024 heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024 log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB)) log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB)) log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB)) log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects)) } // SendMail 发送邮件 func SendMail(title, content string) { url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", GF.Email.Api, GF.Email.To, title, content) fmt.Println("url=>", url) res, err := http.Get(url) if err != nil { log.Info("SendMail", zap.Any("err", err)) } else { log.Info("SendMail", zap.Any("res", res)) } }