Przeglądaj źródła

更新std;多协程处理

wcc 2 lat temu
rodzic
commit
1e065cac4a
4 zmienionych plików z 130 dodań i 106 usunięć
  1. BIN
      qyxy_es_new/qyxy_es7_0715
  2. 1 1
      qyxy_es_new/task.go
  3. 5 4
      qyxy_std_new/config.toml
  4. 124 101
      qyxy_std_new/main.go

BIN
qyxy_es_new/qyxy_es7_0715


+ 1 - 1
qyxy_es_new/task.go

@@ -36,7 +36,7 @@ func StdAdd(q interface{}) {
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 
-	pool := make(chan bool, 5)
+	pool := make(chan bool, 10)
 	wg := &sync.WaitGroup{}
 	//q := bson.M{"_id": "affe29f8d061f3faa4170cafba41f316"}
 	//q := bson.M{"updatetime": bson.M{"$gt": Updatetime}}

+ 5 - 4
qyxy_std_new/config.toml

@@ -1,11 +1,12 @@
 [env]
-    addr = "172.17.145.163:27083"
+#    addr = "172.17.145.163:27083"
+    addr = "127.0.0.1:27017"
     dbname = "mixdata"
     dbsize =10
     dbsave = "qyxy_std"
-    username = "SJZY_RWMIX_Other"
-    password = "SJZY@M34I6x7D9ata"
-    lastid = 1212
+    username = ""
+    password = ""
+    lastid = 0
     localport =  ":18889"
     esport  =18890
     path = "" ## 路径默认为空,使用udp 传递的参数

+ 124 - 101
qyxy_std_new/main.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	util "app.yhyue.com/data_processing/common_utils"
 	"bufio"
 	"compress/gzip"
 	"encoding/json"
@@ -9,23 +8,26 @@ import (
 	"go.uber.org/zap"
 	"io"
 	"io/ioutil"
+	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
 	"jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
 	"net"
 	"os"
 	"strings"
+	"sync"
 	"time"
 )
 
 var (
-	CurrentColl string //当前表名
+	//CurrentColl string //当前表名
 	//SkipCollName string
 	//sendMsg      string
-	collCount   int // 当前表数据量
-	insertCount int // insert数据
-	updateCount int // update数据
-	saveLog     = make(map[string]interface{})
-	saveArr     [][]map[string]interface{}
+	//collCount   int // 当前表数据量
+	//insertCount int // insert数据
+	//updateCount int // update数据
+	//saveLog = make(map[string]interface{})
+	savelog sync.Map
+	//saveArr [][]map[string]interface{}
 
 	UdpClient  udp.UdpClient
 	qyxyEsAddr *net.UDPAddr
@@ -95,13 +97,9 @@ func dealPath(path string) {
 	if !strings.HasSuffix(path, "/") {
 		path = path + "/" ///Users/wangchengcheng/Desktop/jianyu/upload/20221119
 	}
+	var wg sync.WaitGroup
 	//std 程序只需要关注6个表
 	for _, c := range CollArr {
-		start := time.Now()
-		collCount = 0
-		insertCount = 0
-		updateCount = 0
-		CurrentColl = c
 		subPath := path + c + "/"
 		//判断文件夹存在
 		_, err := os.Stat(subPath) ///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/
@@ -109,45 +107,64 @@ func dealPath(path string) {
 			log.Info("dealPath", zap.Any("os.Stat err", subPath))
 			continue
 		}
-
-		subFiles, _ := ioutil.ReadDir(subPath)
-		for _, s := range subFiles { //七天的文件夹名
-			if s.IsDir() {
-				///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
-				dealinfo(subPath + s.Name())
-			}
-		}
-
-		// 判断最后的数据不足500条时 执行
-		if len(saveArr) > 0 {
-			tmps := saveArr
-			MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-			saveArr = [][]map[string]interface{}{}
-		}
-
-		duration := time.Since(start)
-		result := map[string]interface{}{
-			"count":    collCount,
-			"duration": duration.Minutes(), //运行时长
-			"insert":   insertCount,        //插入数量
-			"update":   updateCount,        //更新数量
+		wg.Add(1)
+		go dealSubPath(c, subPath, &wg)
+	}
+	wg.Wait()
+	//最终记录
+	var saveRes = make(map[string]interface{})
+	for _, c := range CollArr {
+		data, ok := savelog.Load(c)
+		if ok {
+			saveRes[c] = data
 		}
-
-		saveLog[c] = result
 	}
 
-	MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveLog})
+	MongoTool.Save("save_log", map[string]interface{}{"createtime": time.Now().String(), "result": saveRes})
 
 	//执行完毕通知es 程序
 	data := map[string]interface{}{
 		"start":      true,
 		"start_time": startTime,
+		"end_time":   time.Now().Unix(),
 	}
 
 	SendUdpMsg(data, qyxyEsAddr)
 }
 
-func dealinfo(path string) {
+//dealSubPath 处理最里面层级数据;Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
+//c 当前表;
+func dealSubPath(c, subPath string, wg *sync.WaitGroup) {
+	defer wg.Done()
+	log.Info("dealSubPath", zap.String("开始处理path:", subPath))
+	start := time.Now()
+	var fileWg sync.WaitGroup
+	var linesMap sync.Map
+
+	subFiles, _ := ioutil.ReadDir(subPath)
+	for _, s := range subFiles { //七天的文件夹名
+		if s.IsDir() {
+			fileWg.Add(1)
+			///Users/wangchengcheng/Desktop/jianyu/upload/20221119/company_base/20221224
+			go dealinfo(c, subPath+s.Name(), &fileWg, &linesMap)
+		}
+	}
+
+	fileWg.Wait()
+
+	cCount, _ := linesMap.Load(c)
+
+	duration := time.Since(start)
+	result := map[string]interface{}{
+		"count":    cCount,
+		"duration": fmt.Sprintf("%v min", duration.Minutes()), //运行时长
+	}
+
+	savelog.Store(c, result)
+}
+
+func dealinfo(c, path string, wg *sync.WaitGroup, linesMap *sync.Map) {
+	defer wg.Done()
 	count := 0
 	file := path + "/split.json.gz"
 	log.Info("dealinfo", zap.Any("current date", file))
@@ -175,14 +192,17 @@ func dealinfo(path string) {
 			line, err := bfRd.ReadBytes('\n')
 			if err != nil {
 				if err == io.EOF {
+					log.Info("dealinfo", zap.String(fmt.Sprintf("%v/split.json.gz", path), "read gzip data finish!"))
 					fmt.Println("read gzip data finish! ")
 					break
 				} else {
-					fmt.Println("[read gzip data err]: ", err)
+					log.Error("dealinfo", zap.Any(fmt.Sprintf("%v/split.json.gz;read gzip err", path), err))
 				}
 			}
 			if len(line) > 0 {
-				count = hookfn(line, count)
+				count++
+				hookfn(c, line)
+				linesMap.Store(c, count)
 			}
 
 			if count%1000 == 0 {
@@ -193,25 +213,16 @@ func dealinfo(path string) {
 }
 
 //hookfn 处理拿到的每行数据
-func hookfn(line []byte, count int) int {
+func hookfn(c string, line []byte) {
 	tmp := make(map[string]interface{})
 	err := json.Unmarshal(line, &tmp)
 	if err != nil {
 		log.Error("hookfn", zap.Any("Unmarshal", err))
 	}
-	count++
-	collCount++
 
 	if _, ok := tmp["company_id"]; ok {
-		//统计插入、更新数量
-		if util.ObjToString(tmp["_operation_type"]) == "insert" {
-			insertCount++
-		} else {
-			updateCount++
-		}
-
 		//针对数据表 不同处理
-		switch CurrentColl {
+		switch c {
 		case "company_base":
 			dealCompanyBase(tmp)
 
@@ -230,12 +241,11 @@ func hookfn(line []byte, count int) int {
 		case "annual_report_website":
 			dealAnnualReportWebsite(tmp)
 		default:
-			fmt.Println("CurrentColl =>", CurrentColl)
+			fmt.Println("CurrentColl =>", c)
 
 		}
 	}
 
-	return count
 }
 
 //dealCompanyBase company_base数据表
@@ -384,18 +394,19 @@ func dealCompanyBase(data map[string]interface{}) {
 		{"_id": save["_id"]},
 		update,
 	}
+	MongoTool.UpSertBulk(GF.Env.Dbsave, updataInfo)
 
-	saveArr = append(saveArr, updataInfo)
-
-	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		if !res {
-			log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
-		}
-		saveArr = [][]map[string]interface{}{}
-	}
+	//saveArr = append(saveArr, updataInfo)
+	//
+	////500 条处理一次,打印一次记录
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	res := MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	if !res {
+	//		log.Info("dealCompanyBase", zap.Any("UpSertBulk company_base err", res))
+	//	}
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 
 }
 
@@ -452,13 +463,15 @@ func dealCompanyEmployee(data map[string]interface{}) {
 		{"_id": data["company_id"]},
 		{"$set": save},
 	}
-	saveArr = append(saveArr, saveInfo)
-	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		saveArr = [][]map[string]interface{}{}
-	}
+
+	MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
+	//saveArr = append(saveArr, saveInfo)
+	////500 条处理一次,打印一次记录
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 }
 
 //dealCompanyPartner
@@ -530,13 +543,15 @@ func dealCompanyPartner(data map[string]interface{}) {
 		{"_id": data["company_id"]},
 		{"$set": save},
 	}
-	saveArr = append(saveArr, saveInfo)
-	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		saveArr = [][]map[string]interface{}{}
-	}
+
+	MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
+	//saveArr = append(saveArr, saveInfo)
+	////500 条处理一次,打印一次记录
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 }
 
 //dealAnnualReportBase annual_report_base
@@ -634,13 +649,15 @@ func dealAnnualReportBase(data map[string]interface{}) {
 		{"_id": data["company_id"]},
 		{"$set": save},
 	}
-	saveArr = append(saveArr, saveInfo)
-	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		saveArr = [][]map[string]interface{}{}
-	}
+	MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
+	//
+	//saveArr = append(saveArr, saveInfo)
+	////500 条处理一次,打印一次记录
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 }
 
 //dealHistoryName company_history_name
@@ -650,15 +667,17 @@ func dealHistoryName(data map[string]interface{}) {
 	save["updatetime"] = time.Now().Unix()
 
 	var names []string
-	name := data["history_name"].(string)
-	names = append(names, name)
-	save["history_name"] = strings.Join(names, ",")
+	if data["history_name"] != nil {
+		name := data["history_name"].(string)
+		names = append(names, name)
+		save["history_name"] = strings.Join(names, ",")
+	}
 
 	saveInfo := []map[string]interface{}{
 		{"_id": data["company_id"]},
 		{"$set": save},
 	}
-	saveArr = append(saveArr, saveInfo)
+	//saveArr = append(saveArr, saveInfo)
 
 	addSet := []map[string]interface{}{
 		{"_id": data["company_id"]},
@@ -668,13 +687,14 @@ func dealHistoryName(data map[string]interface{}) {
 	}
 	//单独对每条的历史名称追加数组
 	MongoTool.UpSertBulk(GF.Env.Dbsave, addSet)
+	MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
 
-	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		saveArr = [][]map[string]interface{}{}
-	}
+	////500 条处理一次,打印一次记录
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 
 }
 
@@ -699,12 +719,15 @@ func dealAnnualReportWebsite(data map[string]interface{}) {
 		{"_id": data["company_id"]},
 		{"$set": save},
 	}
-	saveArr = append(saveArr, saveInfo)
+
+	MongoTool.UpSertBulk(GF.Env.Dbsave, saveInfo)
+
+	//saveArr = append(saveArr, saveInfo)
 	//500 条处理一次,打印一次记录
-	if len(saveArr) >= 500 {
-		tmps := saveArr
-		MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
-		saveArr = [][]map[string]interface{}{}
-	}
+	//if len(saveArr) >= 500 {
+	//	tmps := saveArr
+	//	MongoTool.UpSertBulk(GF.Env.Dbsave, tmps...)
+	//	saveArr = [][]map[string]interface{}{}
+	//}
 
 }