|
@@ -13,6 +13,7 @@ import (
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
|
|
|
"net"
|
|
|
"os"
|
|
|
+ "runtime"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -47,7 +48,7 @@ func main() {
|
|
|
IP: net.ParseIP(GF.Env.Targetip),
|
|
|
}
|
|
|
log.Info("main", zap.Any("qyxyEsAddr", qyxyEsAddr))
|
|
|
- startTime = time.Now().Unix()
|
|
|
+
|
|
|
UdpClient.Listen(processUdpMsg)
|
|
|
log.Info("main", zap.String("Udp服务监听本地端口", localPort))
|
|
|
|
|
@@ -83,6 +84,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
|
|
|
// 开始执行
|
|
|
log.Info("processUdpMsg", zap.String("readPath", readPath))
|
|
|
if readPath != "" {
|
|
|
+ startTime = time.Now().Unix()
|
|
|
go dealPath(readPath)
|
|
|
}
|
|
|
}
|
|
@@ -97,7 +99,7 @@ func dealPath(path string) {
|
|
|
if !strings.HasSuffix(path, "/") {
|
|
|
path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
|
|
|
}
|
|
|
- var wg sync.WaitGroup
|
|
|
+ //var wg sync.WaitGroup
|
|
|
//std 程序只需要关注6个表
|
|
|
for _, c := range CollArr {
|
|
|
subPath := path + c + "/"
|
|
@@ -107,10 +109,11 @@ func dealPath(path string) {
|
|
|
log.Info("dealPath", zap.Any("os.Stat err", subPath))
|
|
|
continue
|
|
|
}
|
|
|
- wg.Add(1)
|
|
|
- go dealSubPath(c, subPath, &wg)
|
|
|
+ //wg.Add(1)
|
|
|
+ dealSubPath(c, subPath)
|
|
|
+ //go dealSubPath(c, subPath, &wg)
|
|
|
}
|
|
|
- wg.Wait()
|
|
|
+ //wg.Wait()
|
|
|
|
|
|
//更新 nseo_id
|
|
|
updateStd()
|
|
@@ -137,25 +140,26 @@ func dealPath(path string) {
|
|
|
SendUdpMsg(data, qyxyEsAddr)
|
|
|
}
|
|
|
|
|
|
-//dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
|
|
|
-//c 当前表;
|
|
|
-func dealSubPath(c, subPath string, wg *sync.WaitGroup) {
|
|
|
- defer wg.Done()
|
|
|
+// dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
|
|
|
+// c 当前表;
|
|
|
+func dealSubPath(c, subPath string) {
|
|
|
+ //defer wg.Done()
|
|
|
log.Info("dealSubPath", zap.String("开始处理path:", subPath))
|
|
|
start := time.Now()
|
|
|
- var fileWg sync.WaitGroup
|
|
|
+ //var fileWg sync.WaitGroup
|
|
|
var linesMap sync.Map
|
|
|
|
|
|
subFiles, _ := ioutil.ReadDir(subPath)
|
|
|
for _, s := range subFiles { //七天的文件夹名
|
|
|
if s.IsDir() {
|
|
|
- fileWg.Add(1)
|
|
|
+ //fileWg.Add(1)
|
|
|
+ dealinfo(c, subPath+s.Name(), &linesMap)
|
|
|
///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
|
|
|
- go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
|
|
|
+ //go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- fileWg.Wait()
|
|
|
+ //fileWg.Wait()
|
|
|
|
|
|
cCount, _ := linesMap.Load(c)
|
|
|
|
|
@@ -168,30 +172,40 @@ func dealSubPath(c, subPath string, wg *sync.WaitGroup) {
|
|
|
savelog.Store(c, result)
|
|
|
}
|
|
|
|
|
|
-func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) {
|
|
|
- defer wg.Done()
|
|
|
+func dealinfo(c, path string, linesMap *sync.Map) {
|
|
|
+ //defer wg.Done()
|
|
|
count := 0
|
|
|
file := path + "/split.json.gz"
|
|
|
log.Info("dealinfo", zap.Any("current date", file))
|
|
|
- _, err := os.Stat(file)
|
|
|
+ fileInfo, err := os.Stat(file)
|
|
|
+
|
|
|
+ if fileInfo.Size() == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
if err == nil {
|
|
|
// 打开本地gz格式压缩包
|
|
|
fr, err := os.Open(file)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Info("dealinfo", zap.Error(err))
|
|
|
+ return
|
|
|
} else {
|
|
|
- println("open file success!")
|
|
|
+ fmt.Println("open file success!", file)
|
|
|
}
|
|
|
// defer: 在函数退出时,执行关闭文件
|
|
|
defer fr.Close()
|
|
|
// 创建gzip文件读取对象
|
|
|
gr, err := gzip.NewReader(fr)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Info("reader "+file, zap.Error(err))
|
|
|
+ return
|
|
|
}
|
|
|
// defer: 在函数退出时,执行关闭gzip对象
|
|
|
defer gr.Close()
|
|
|
|
|
|
+ wg := sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 3)
|
|
|
+
|
|
|
bfRd := bufio.NewReader(gr)
|
|
|
for {
|
|
|
line, err := bfRd.ReadBytes('\n')
|
|
@@ -206,18 +220,28 @@ func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) {
|
|
|
}
|
|
|
if len(line) > 0 {
|
|
|
count++
|
|
|
- hookfn(c, line)
|
|
|
linesMap.Store(c, count)
|
|
|
+ ch <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(c string, line []byte) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ hookfn(c, line)
|
|
|
+ }(c, line)
|
|
|
}
|
|
|
|
|
|
- if count%1000 == 0 {
|
|
|
+ if count%5000 == 0 {
|
|
|
+ printMemoryUsage()
|
|
|
log.Info("dealinfo", zap.Any("current exc---", fmt.Sprintf("%s-%d", file, count)))
|
|
|
}
|
|
|
}
|
|
|
+ wg.Wait()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-//hookfn 处理拿到的每行数据
|
|
|
+// hookfn 处理拿到的每行数据
|
|
|
func hookfn(c string, line []byte) {
|
|
|
tmp := make(map[string]interface{})
|
|
|
err := json.Unmarshal(line, &tmp)
|
|
@@ -253,7 +277,7 @@ func hookfn(c string, line []byte) {
|
|
|
|
|
|
}
|
|
|
|
|
|
-//dealCompanyBase company_base数据表
|
|
|
+// dealCompanyBase company_base数据表
|
|
|
func dealCompanyBase(data map[string]interface{}) {
|
|
|
update := make(map[string]interface{})
|
|
|
save := make(map[string]interface{})
|
|
@@ -416,7 +440,7 @@ func dealCompanyBase(data map[string]interface{}) {
|
|
|
|
|
|
}
|
|
|
|
|
|
-//dealCompanyEmployee company_employee
|
|
|
+// dealCompanyEmployee company_employee
|
|
|
func dealCompanyEmployee(data map[string]interface{}) {
|
|
|
save := make(map[string]interface{})
|
|
|
save["_id"] = data["company_id"]
|
|
@@ -480,7 +504,7 @@ func dealCompanyEmployee(data map[string]interface{}) {
|
|
|
//}
|
|
|
}
|
|
|
|
|
|
-//dealCompanyPartner
|
|
|
+// dealCompanyPartner
|
|
|
func dealCompanyPartner(data map[string]interface{}) {
|
|
|
save := make(map[string]interface{})
|
|
|
save["_id"] = data["company_id"]
|
|
@@ -560,7 +584,7 @@ func dealCompanyPartner(data map[string]interface{}) {
|
|
|
//}
|
|
|
}
|
|
|
|
|
|
-//dealAnnualReportBase annual_report_base
|
|
|
+// dealAnnualReportBase annual_report_base
|
|
|
func dealAnnualReportBase(data map[string]interface{}) {
|
|
|
save := make(map[string]interface{})
|
|
|
save["_id"] = data["company_id"]
|
|
@@ -666,7 +690,7 @@ func dealAnnualReportBase(data map[string]interface{}) {
|
|
|
//}
|
|
|
}
|
|
|
|
|
|
-//dealHistoryName company_history_name
|
|
|
+// dealHistoryName company_history_name
|
|
|
func dealHistoryName(data map[string]interface{}) {
|
|
|
save := make(map[string]interface{})
|
|
|
save["_id"] = data["company_id"]
|
|
@@ -704,7 +728,7 @@ func dealHistoryName(data map[string]interface{}) {
|
|
|
|
|
|
}
|
|
|
|
|
|
-//dealAnnualReportWebsite annual_report_website
|
|
|
+// dealAnnualReportWebsite annual_report_website
|
|
|
func dealAnnualReportWebsite(data map[string]interface{}) {
|
|
|
save := make(map[string]interface{})
|
|
|
save["_id"] = data["company_id"]
|
|
@@ -737,3 +761,19 @@ func dealAnnualReportWebsite(data map[string]interface{}) {
|
|
|
//}
|
|
|
|
|
|
}
|
|
|
+// printMemoryUsage reads runtime.MemStats and logs Alloc, TotalAlloc and HeapAlloc (converted to MB) plus the HeapObjects count.
|
|
|
+func printMemoryUsage() {
|
|
|
+ var memStats runtime.MemStats
|
|
|
+ runtime.ReadMemStats(&memStats)
|
|
|
+
|
|
|
+ // Convert bytes to megabytes (MB) by dividing by 1024 twice.
|
|
|
+ allocatedMB := float64(memStats.Alloc) / 1024 / 1024
|
|
|
+ totalAllocatedMB := float64(memStats.TotalAlloc) / 1024 / 1024
|
|
|
+ heapAllocMB := float64(memStats.HeapAlloc) / 1024 / 1024 // NOTE(review): the runtime.MemStats docs describe Alloc as the same value as HeapAlloc, so this likely duplicates allocatedMB
|
|
|
+
|
|
|
+ log.Info("printMemoryUsage", zap.Any("当前程序已分配的内存大小", allocatedMB)) // label: memory currently allocated by the program
|
|
|
+ log.Info("printMemoryUsage", zap.Any("程序自启动以来总共分配的内存大小", totalAllocatedMB)) // label: total memory allocated since program start
|
|
|
+ log.Info("printMemoryUsage", zap.Any("堆上当前已分配但尚未释放的内存", heapAllocMB)) // label: heap memory allocated and not yet freed
|
|
|
+ log.Info("printMemoryUsage", zap.Any("堆上分配的对象数", memStats.HeapObjects)) // label: number of objects allocated on the heap
|
|
|
+
|
|
|
+}
|