123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- package main
- import (
- "bufio"
- "compress/gzip"
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- "io"
- "io/ioutil"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "net"
- "os"
- "strings"
- "time"
- )
- var (
- MongoTool *mongodb.MongodbSim
- //updatePool chan []map[string]interface{}
- //updateSp chan bool
- //saveSize int
- CurrentColl string
- collCount int
- saveLog = make(map[string]interface{})
- saveArr [][]map[string]interface{}
- UdpClient udp.UdpClient
- changeAddr *net.UDPAddr
- readPath string //文件夹目录
- //lastID int64
- )
- func init() {
- InitLog()
- err := InitConfig()
- if err != nil {
- log.Info("init", zap.Any("InitConfig", err))
- }
- InitMgo()
- readPath = GF.Env.Path
- changeAddr = &net.UDPAddr{
- Port: GF.Env.ChangePort,
- IP: net.ParseIP(GF.Env.TargetIp),
- }
- log.Info("init", zap.Any("changeAddr", changeAddr))
- }
- func main() {
- UdpClient = udp.UdpClient{Local: GF.Env.LocalPort, BufSize: 1024}
- UdpClient.Listen(processUdpMsg)
- log.Info("main", zap.String("Udp服务监听======= port:", GF.Env.LocalPort))
- ch := make(chan bool, 1)
- <-ch
- }
- func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
- switch act {
- case udp.OP_TYPE_DATA:
- var mapInfo map[string]interface{}
- err := json.Unmarshal(data, &mapInfo)
- if err != nil {
- log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
- } else if mapInfo != nil {
- log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
- key, _ := mapInfo["key"].(string)
- if key == "" {
- key = "udpok"
- }
- go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
- //拿到同步信号,开始同步数据
- if _, ok := mapInfo["start"]; ok {
- if _, okk := mapInfo["path"]; okk {
- path := util.ObjToString(mapInfo["path"]) //udp 传递的路径
- //没有指定配置文件的指定目录,就使用udp 传递目录
- if path != "" {
- readPath = path
- }
- }
- //开始同步
- go task(readPath)
- }
- }
- default:
- log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "======"))
- }
- }
- func task(path string) {
- files, _ := ioutil.ReadDir(path)
- //拿到company_change 插入之前的ID
- dats, _ := MongoTool.Find("company_change", nil, `{"_id":-1}`, nil, true, -1, 1)
- ds := *dats
- lastID := ds[0]["_id"]
- //annual_report_base/20221122/split.json.gz
- for _, f := range files {
- if f.IsDir() {
- start := time.Now()
- CurrentColl = f.Name() //annual_report_base
- collCount = 0 // 当前表的数据数量
- log.Info("task", zap.String("collection name", f.Name())) //annual_report_base
- //util.Debug("collection name:---", f.Name())
- if !strings.HasSuffix(path, "/") {
- path = path + "/"
- }
- subPath := path + f.Name() + "/"
- subFiles, _ := ioutil.ReadDir(subPath)
- for _, s := range subFiles {
- util.Debug(s.Name())
- if s.IsDir() {
- taskinfo(subPath + s.Name()) //annual_report_base/20221122
- }
- }
- // 判断最后的数据不足500条时 执行
- if len(saveArr) > 0 {
- tmps := saveArr
- MongoTool.UpSertBulk(CurrentColl, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- duration := time.Since(start)
- result := map[string]interface{}{
- "count": collCount,
- "duration": duration.Minutes(),
- }
- saveLog[f.Name()] = result
- //sendMsg += f.Name() + ":" + strconv.Itoa(collCount) + ";"
- }
- }
- //执行完毕,通知qyxy_change,更新企业变更信息
- data := map[string]interface{}{
- "start": true,
- "start_id": lastID,
- }
- SendUdpMsg(data, changeAddr)
- log.Info("task", zap.String("执行完毕", path))
- MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
- //SendMail(sendMsg)
- }
- //taskinfo 读取压缩包文件
- func taskinfo(path string) {
- count := 0 //读取的数量
- file := path + "/split.json.gz"
- log.Info("taskinfo", zap.Any("current date", file))
- // 打开本地gz格式压缩包
- fr, err := os.Open(file)
- if err != nil {
- panic(err)
- } else {
- println("open file success!")
- }
- // defer: 在函数退出时,执行关闭文件
- defer fr.Close()
- // 创建gzip文件读取对象
- gr, err := gzip.NewReader(fr)
- if err != nil {
- panic(err)
- }
- // defer: 在函数退出时,执行关闭gzip对象
- defer gr.Close()
- bfRd := bufio.NewReader(gr)
- for {
- line, err := bfRd.ReadBytes('\n')
- count = hookfn(line, count)
- if err != nil {
- if err == io.EOF {
- log.Info("taskinfo", zap.String("EOF", "read gzip data finish! "))
- break
- } else {
- log.Info("taskinfo", zap.Any("[read gzip data err]:", err))
- }
- }
- if count%5000 == 0 {
- log.Info("taskinfo", zap.Any("current exec", fmt.Sprintf("%s-%d", file, count)))
- //util.Debug("current exc---", file, count)
- }
- }
- }
- //hookfn 处理数据,500条处理一次
- func hookfn(line []byte, count int) int {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(line, &tmp)
- if err != nil {
- log.Info("hookfn", zap.Any("Unmarshal err", err))
- //util.Debug("err---", err)
- }
- count++
- collCount++
- tmp["_id"] = util.IntAll(tmp["id"])
- tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"]))
- saveInfo := []map[string]interface{}{
- {"_id": tmp["_id"]},
- {"$set": tmp},
- }
- saveArr = append(saveArr, saveInfo)
- //500 条处理一次,打印一次记录
- if len(saveArr) > 500 {
- tmps := saveArr
- MongoTool.UpSertBulk(CurrentColl, tmps...)
- saveArr = [][]map[string]interface{}{}
- }
- return count
- }
- //SendUdpMsg 通知处理企业新增数据
- func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
- bytes, _ := json.Marshal(data)
- UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
- log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
- }
|