main.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package main
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "encoding/json"
  6. "fmt"
  7. "go.uber.org/zap"
  8. "io"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  14. "net"
  15. "os"
  16. "strings"
  17. "time"
  18. )
  // Package-level state shared by the UDP handler and the import task.
  var (
  	// MongoTool is the MongoDB handle that task and hookfn call for
  	// Find, UpSertBulk and Save.
  	MongoTool *mongodb.MongodbSim
  	//updatePool chan []map[string]interface{}
  	//updateSp chan bool
  	//saveSize int

  	// CurrentColl is the name of the directory task is currently importing;
  	// it is passed to UpSertBulk as the target collection.
  	CurrentColl string
  	// collCount counts records for the current collection; task resets it
  	// per directory and hookfn increments it as records are processed.
  	collCount int
  	// saveLog maps a collection name to its {"count", "duration"} result and
  	// is written to "save_log" at the end of task.
  	saveLog = make(map[string]interface{})
  	// saveArr buffers pending upserts as [filter, update] pairs until they
  	// are flushed with UpSertBulk.
  	saveArr [][]map[string]interface{}
  	// UdpClient is the UDP endpoint set up in main; it receives commands and
  	// sends acknowledgements and notifications.
  	UdpClient udp.UdpClient
  	// changeAddr is the UDP address notified when task finishes; init builds
  	// it from GF.Env.TargetIp and GF.Env.ChangePort.
  	changeAddr *net.UDPAddr
  	// readPath is the directory to import from: initialised from GF.Env.Path
  	// and overwritten when a UDP message carries a non-empty "path".
  	readPath string
  	//lastID int64
  )
  33. func init() {
  34. InitLog()
  35. err := InitConfig()
  36. if err != nil {
  37. log.Info("init", zap.Any("InitConfig", err))
  38. }
  39. InitMgo()
  40. readPath = GF.Env.Path
  41. changeAddr = &net.UDPAddr{
  42. Port: GF.Env.ChangePort,
  43. IP: net.ParseIP(GF.Env.TargetIp),
  44. }
  45. log.Info("init", zap.Any("changeAddr", changeAddr))
  46. }
  47. func main() {
  48. UdpClient = udp.UdpClient{Local: GF.Env.LocalPort, BufSize: 1024}
  49. UdpClient.Listen(processUdpMsg)
  50. log.Info("main", zap.String("Udp服务监听======= port:", GF.Env.LocalPort))
  51. ch := make(chan bool, 1)
  52. <-ch
  53. }
  54. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  55. switch act {
  56. case udp.OP_TYPE_DATA:
  57. var mapInfo map[string]interface{}
  58. err := json.Unmarshal(data, &mapInfo)
  59. if err != nil {
  60. log.Info("processUdpMsg", zap.Any("Unmarshal err", err))
  61. } else if mapInfo != nil {
  62. log.Info("processUdpMsg", zap.Any("mapInfo", mapInfo))
  63. key, _ := mapInfo["key"].(string)
  64. if key == "" {
  65. key = "udpok"
  66. }
  67. go UdpClient.WriteUdp([]byte(key), udp.OP_NOOP, ra)
  68. //拿到同步信号,开始同步数据
  69. if _, ok := mapInfo["start"]; ok {
  70. if _, okk := mapInfo["path"]; okk {
  71. path := util.ObjToString(mapInfo["path"]) //udp 传递的路径
  72. //没有指定配置文件的指定目录,就使用udp 传递目录
  73. if path != "" {
  74. readPath = path
  75. }
  76. }
  77. //开始同步
  78. go task(readPath)
  79. }
  80. }
  81. default:
  82. log.Info("processUdpMsg", zap.String("qyxy_listen_data_new", "======"))
  83. }
  84. }
  85. func task(path string) {
  86. files, _ := ioutil.ReadDir(path)
  87. //拿到company_change 插入之前的ID
  88. dats, _ := MongoTool.Find("company_change", nil, `{"_id":-1}`, nil, true, -1, 1)
  89. ds := *dats
  90. lastID := ds[0]["_id"]
  91. //annual_report_base/20221122/split.json.gz
  92. for _, f := range files {
  93. if f.IsDir() {
  94. start := time.Now()
  95. CurrentColl = f.Name() //annual_report_base
  96. collCount = 0 // 当前表的数据数量
  97. log.Info("task", zap.String("collection name", f.Name())) //annual_report_base
  98. //util.Debug("collection name:---", f.Name())
  99. if !strings.HasSuffix(path, "/") {
  100. path = path + "/"
  101. }
  102. subPath := path + f.Name() + "/"
  103. subFiles, _ := ioutil.ReadDir(subPath)
  104. for _, s := range subFiles {
  105. util.Debug(s.Name())
  106. if s.IsDir() {
  107. taskinfo(subPath + s.Name()) //annual_report_base/20221122
  108. }
  109. }
  110. // 判断最后的数据不足500条时 执行
  111. if len(saveArr) > 0 {
  112. tmps := saveArr
  113. MongoTool.UpSertBulk(CurrentColl, tmps...)
  114. saveArr = [][]map[string]interface{}{}
  115. }
  116. duration := time.Since(start)
  117. result := map[string]interface{}{
  118. "count": collCount,
  119. "duration": duration.Minutes(),
  120. }
  121. saveLog[f.Name()] = result
  122. //sendMsg += f.Name() + ":" + strconv.Itoa(collCount) + ";"
  123. }
  124. }
  125. //执行完毕,通知qyxy_change,更新企业变更信息
  126. data := map[string]interface{}{
  127. "start": true,
  128. "start_id": lastID,
  129. }
  130. SendUdpMsg(data, changeAddr)
  131. log.Info("task", zap.String("执行完毕", path))
  132. MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
  133. //SendMail(sendMsg)
  134. }
  135. //taskinfo 读取压缩包文件
  136. func taskinfo(path string) {
  137. count := 0 //读取的数量
  138. file := path + "/split.json.gz"
  139. log.Info("taskinfo", zap.Any("current date", file))
  140. // 打开本地gz格式压缩包
  141. fr, err := os.Open(file)
  142. if err != nil {
  143. panic(err)
  144. } else {
  145. println("open file success!")
  146. }
  147. // defer: 在函数退出时,执行关闭文件
  148. defer fr.Close()
  149. // 创建gzip文件读取对象
  150. gr, err := gzip.NewReader(fr)
  151. if err != nil {
  152. panic(err)
  153. }
  154. // defer: 在函数退出时,执行关闭gzip对象
  155. defer gr.Close()
  156. bfRd := bufio.NewReader(gr)
  157. for {
  158. line, err := bfRd.ReadBytes('\n')
  159. count = hookfn(line, count)
  160. if err != nil {
  161. if err == io.EOF {
  162. log.Info("taskinfo", zap.String("EOF", "read gzip data finish! "))
  163. break
  164. } else {
  165. log.Info("taskinfo", zap.Any("[read gzip data err]:", err))
  166. }
  167. }
  168. if count%5000 == 0 {
  169. log.Info("taskinfo", zap.Any("current exec", fmt.Sprintf("%s-%d", file, count)))
  170. //util.Debug("current exc---", file, count)
  171. }
  172. }
  173. }
  174. //hookfn 处理数据,500条处理一次
  175. func hookfn(line []byte, count int) int {
  176. tmp := make(map[string]interface{})
  177. err := json.Unmarshal(line, &tmp)
  178. if err != nil {
  179. log.Info("hookfn", zap.Any("Unmarshal err", err))
  180. //util.Debug("err---", err)
  181. }
  182. count++
  183. collCount++
  184. tmp["_id"] = util.IntAll(tmp["id"])
  185. tmp["id"] = fmt.Sprintf("%d", util.IntAll(tmp["id"]))
  186. saveInfo := []map[string]interface{}{
  187. {"_id": tmp["_id"]},
  188. {"$set": tmp},
  189. }
  190. saveArr = append(saveArr, saveInfo)
  191. //500 条处理一次,打印一次记录
  192. if len(saveArr) > 500 {
  193. tmps := saveArr
  194. MongoTool.UpSertBulk(CurrentColl, tmps...)
  195. saveArr = [][]map[string]interface{}{}
  196. }
  197. return count
  198. }
  199. //SendUdpMsg 通知处理企业新增数据
  200. func SendUdpMsg(data map[string]interface{}, target *net.UDPAddr) {
  201. bytes, _ := json.Marshal(data)
  202. UdpClient.WriteUdp(bytes, udp.OP_TYPE_DATA, target)
  203. log.Info("SendUdpMsg", zap.Any("data", data), zap.Any("target", target))
  204. }