123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package main
import (
	"bufio"
	"compress/gzip"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"net/url"
	"os"
	"runtime"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)
- var (
- MongoTool *mongodb.MongodbSim
- //updatePool chan []map[string]interface{}
- //updateSp chan bool
- //saveSize int
- CurrentColl string
- //collCount int
- saveLog = make(map[string]interface{})
- UdpClient udp.UdpClient
- changeAddr *net.UDPAddr
- readPath string //文件夹目录
- jyUpdatetime int64
- )
- func init() {
- InitLog()
- err := InitConfig()
- if err != nil {
- log.Info("init", zap.Any("InitConfig", err))
- }
- InitMgo()
- readPath = GF.Env.Path
- changeAddr = &net.UDPAddr{
- Port: GF.Env.ChangePort,
- IP: net.ParseIP(GF.Env.TargetIp),
- }
- log.Info("init", zap.Any("changeAddr", changeAddr))
- }
- func main() {
- UdpClient = udp.UdpClient{Local: GF.Env.LocalPort, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听======= port:", GF.Env.LocalPort))
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- } else if mapInfo != nil {
- log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- if _, okk := mapInfo["path"]; okk {
- path := util.ObjToString(mapInfo["path"]) //udp 传递的路径
- //没有指定配置文件的指定目录,就使用udp 传递目录
- if path != "" {
- readPath = path
- }
- }
- //开始同步
- go task(readPath)
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "======"))
- }
- }
- func task(path string) {
- files, _ := ioutil.ReadDir(path)
- jyUpdatetime = time.Now().Unix() //数据更新时间,更新quxy_change 使用
- //annual_report_base/20221122/split.json.gz
- for _, f := range files {
- if f.IsDir() {
- start := time.Now()
- CurrentColl = f.Name() //annual_report_base
- collCount := 0 // 当前表的数据数量
- log.Info("task", zap.String("collection name", f.Name())) //annual_report_base
- //util.Debug("collection name:---", f.Name())
- if !strings.HasSuffix(path, "/") {
- path = path + "/"
- }
- subPath := path + f.Name() + "/"
- subFiles, _ := ioutil.ReadDir(subPath)
- for _, s := range subFiles {
- log.Info("task ", zap.String("当前文件:", s.Name()))
- if s.IsDir() {
- collCount = taskinfo(subPath+s.Name(), collCount) //annual_report_base/20221122
- //// 增加WaitGroup计数
- //go func(dirPath string, collCount int) {
- // collCount = taskinfo(dirPath, collCount)
- //}(subPath+s.Name(), collCount)
- //taskinfo(subPath + s.Name()) //annual_report_base/20221122
- }
- }
- // 判断最后的数据不足500条时 执行
- //if len(saveArr) > 0 {
- // tmps := saveArr
- // MongoTool.UpSertBulk(CurrentColl, tmps...)
- // saveArr = [][]map[string]interface{}{}
- //}
- duration := time.Since(start)
- result := map[string]interface{}{
- "count": collCount,
- "duration": duration.Minutes(),
- }
- saveLog[f.Name()] = result
- //sendMsg += f.Name() + ":" + strconv.Itoa(collCount) + ";"
- }
- }
- //执行完毕,通知qyxy_change,更新企业变更信息
- data := map[string]interface{}{
- "start": GF.Env.ChangeUdp,
- "jy_updatetime": jyUpdatetime,
- }
- if GF.Env.ChangeUdp {
- SendUdpMsg(data, changeAddr)
- }
- log.Info("task", zap.String("执行完毕", path))
- MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
- //邮件通知
- title := fmt.Sprintf("%s 数据处理完毕", path)
- // 将 map 转换为 JSON 字符串
- jsonBytes, err := json.Marshal(saveLog)
- if err != nil {
- log.Info("task", zap.Any("json 转换失败", saveLog))
- }
- // 转换为字符串并输出
- jsonStr := string(jsonBytes)
- SendMail(title, jsonStr)
- }
- // taskinfo 读取压缩包文件
- func taskinfo(path string, collCount int) int {
- count := 0 //读取的数量
- file := path + "/split.json.gz"
- log.Info("taskinfo", zap.Any("current file", file))
- // 检查文件是否存在
- fileInfo, err := os.Stat(file)
- if err != nil {
- log.Error("Error opening file:", zap.Error(err))
- return collCount
- }
- // 检查文件大小是否为0
- if fileInfo.Size() == 0 {
- log.Warn(file, zap.Error(errors.New("文件大小为0")))
- return collCount
- }
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- log.Info("taskinfo", zap.Any("err", err))
- } else {
- fmt.Println("open file success!", file)
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- log.Info("taskinfo", zap.Any("err", err))
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- bfRd := bufio.NewReader(gr)
- wg := sync.WaitGroup{}
- ch := make(chan bool, 5)
- for {
- line, err := bfRd.ReadBytes('\n')
- if err != nil {
- if err == io.EOF {
- log.Info("taskinfo", zap.String("EOF", "read gzip data finish! "))
- break
- } else {
- log.Info("taskinfo", zap.Any("[read gzip data err]:", err))
- }
- } else {
- count++
- if count%5000 == 0 {
- printMemoryUsage()
- log.Info("taskinfo", zap.Int("current count:"+file, count))
- }
- ch <- true
- wg.Add(1)
- go func(line []byte) {
- defer func() {
- <-ch
- wg.Done()
- }()
- hookfn(line)
- }(line)
- }
- }
- wg.Wait()
- collCount += count
- return collCount
- }
- // hookfn 处理数据,500条处理一次
- func hookfn(line []byte) {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- log.Info("hookfn", zap.Any("Unmarshal err", err))
- }
- if len(tmp) == 0 {
- return
- }
- if util.IntAll(tmp["id"]) == 0 {
- MongoTool.Save("wcc"+CurrentColl, tmp)
- } else {
- tmp["_id"] = util.IntAll(tmp["id"])
- tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"]))
- tmp["jy_updatetime"] = time.Now().Unix()
- //if CurrentColl == "company_change" {
- // tmp["jy_updatetime"] = jyUpdatetime
- //}
- saveInfo := []map[string]interface{}{map[string]interface{}{"_id": tmp["_id"]}, map[string]interface{}{"$set": tmp}}
- tmpArr := [][]map[string]interface{}{saveInfo}
- MongoTool.UpSertBulk(CurrentColl, tmpArr...)
- }
- }
- // SendUdpMsg 通知处理企业新增数据
- func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
- bytes, _ := json.Marshal(data)
- UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
- }
- func printMemoryUsage() {
- var memStats runtime.MemStats
- runtime.ReadMemStats(&memStats)
- // 将字节转换为兆字节(MB)
- allocatedMB := float64(memStats.Alloc) / 1024 / 1024
- totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024
- heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024
- log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB))
- log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB))
- log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB))
- log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects))
- }
- // SendMail 发送邮件
- func SendMail(title, content string) {
- url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", GF.Email.Api, GF.Email.To, title, content)
- fmt.Println("url=>", url)
- res, err := http.Get(url)
- if err != nil {
- log.Info("SendMail", zap.Any("err", err))
- } else {
- log.Info("SendMail", zap.Any("res", res))
- }
- }
|