浏览代码

feat:提交

fuwencai 1 年之前
父节点
当前提交
3d63b1eb1e
共有 4 个文件被更改,包括 40 次插入14 次删除
  1. 4 1
      rpc/deduplication.go
  2. 1 0
      rpc/etc/deduplication.yaml
  3. 1 0
      rpc/internal/config/config.go
  4. 34 13
      service/deduplication.go

+ 4 - 1
rpc/deduplication.go

@@ -39,9 +39,9 @@ func main() {
 	s.Start()
 }
 
-//创建orm引擎
 func init() {
 	conf.MustLoad(*configFile, &config.ConfigJson)
+	//创建orm引擎
 	var err error
 	log.Println(config.ConfigJson.DataSource)
 	service.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
@@ -53,4 +53,7 @@ func init() {
 	service.Engine.SetMaxOpenConns(config.ConfigJson.MaxOpenConns)
 	service.Engine.SetMaxIdleConns(config.ConfigJson.MaxIdleConns)
 	fmt.Println(config.ConfigJson.DataSource + "链接成功")
+	// 初始化
+	service.SaveNumChan = make(chan int, config.ConfigJson.SaveChanNum)
+	log.Println("初始化完成")
 }

+ 1 - 0
rpc/etc/deduplication.yaml

@@ -7,6 +7,7 @@ Etcd:
 DataSource: root:Topnet123@(192.168.3.149:3306)/jianyu?charset=utf8mb4&parseTime=true&loc=Local
 MaxIdleConns: 25
 MaxOpenConns: 50
+SaveChanNum: 4
 FileSystemConf:
   Etcd:
     Hosts:

+ 1 - 0
rpc/internal/config/config.go

@@ -10,6 +10,7 @@ type Config struct {
 	FileSystemConf zrpc.RpcClientConf
 	MaxIdleConns   int // 数据库连接最大空闲数
 	MaxOpenConns   int // 数据库连接数
+	SaveChanNum    int // 保存数据能用的数据库最大连接数
 }
 
 var ConfigJson Config

+ 34 - 13
service/deduplication.go

@@ -19,6 +19,7 @@ var Engine *xorm.Engine
 type DeduplicationService struct{}
 
 var PREFIX = "qc"
+var SaveNumChan chan int
 
 //数据判重
 func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
@@ -91,11 +92,10 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 				tmp := []interface{}{insertSql}
 				tmp = append(tmp, insertList...)
 				_, err3 := orm.Exec(tmp...)
-				insertList = []interface{}{}
-				parmList = []string{}
 				if err3 != nil {
 					orm.Rollback()
-					log.Println(err3, "新增数据失败")
+					log.Printf("DataDeduplicateInsert 执行失败已回滚,err:%s", err3)
+					log.Printf("DataDeduplicateInsert 执行失败sql:%s,该批次部分数据:%v", insertSql, insertList)
 					return &deduplication.Info{
 						TotalCount: int64(totalInfoCount),
 						ExistCount: int64(totalExist),
@@ -103,6 +103,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 						IsInsert:   false,
 					}, "新增数据失败"
 				}
+				insertList = []interface{}{}
+				parmList = []string{}
 			}
 		}
 		if len(insertList) > 0 {
@@ -112,7 +114,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 			_, err3 := orm.Exec(tmp...)
 			if err3 != nil {
 				orm.Rollback()
-				log.Println(err3, "新增数据失败")
+				log.Printf("DataDeduplicateInsert 执行失败已回滚,err:%s", err3)
+				log.Printf("DataDeduplicateInsert 执行失败sql:%s,该批次部分数据:%v", insertSql, insertList)
 				return &deduplication.Info{
 					TotalCount: int64(totalInfoCount),
 					ExistCount: int64(totalExist),
@@ -124,6 +127,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 		err := orm.Commit()
 		if err != nil {
 			log.Println("提交失败")
+			log.Printf("DataDeduplicateInsert commit err:%s  最后一个sql:%s", err, insertSql)
+			log.Printf("DataDeduplicateInsert 提交失败该批次部分数据:%v", insertList...)
 			return &deduplication.Info{
 				TotalCount: int64(totalInfoCount),
 				ExistCount: int64(totalExist),
@@ -270,7 +275,7 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
 			log.Println("id已存在", id)
 			continue
 		}
-		log.Println("新增", id)
+		//log.Println("新增", id)
 		temData := entity.Deduplication{
 			InfoId:    id,
 			EntId:     "0",
@@ -316,10 +321,13 @@ func a(data string) string {
 }
 
 func SaveMysql(tableName string, saveList []entity.Deduplication) {
-	//
+	SaveNumChan <- 1
 	log.Println("保存数据开始")
 	orm := Engine.NewSession()
-	defer orm.Close()
+	defer func() {
+		orm.Close()
+		<-SaveNumChan
+	}()
 	orm.Begin()
 	insertList := []interface{}{}
 	insertSql := fmt.Sprintf("insert IGNORE  into %s (info_id,ent_id,person_id,account_id,data_desc) values  ", tableName)
@@ -332,12 +340,14 @@ func SaveMysql(tableName string, saveList []entity.Deduplication) {
 			tmp := []interface{}{insertSql}
 			tmp = append(tmp, insertList...)
 			_, err3 := orm.Exec(tmp...)
-			insertList = []interface{}{}
-			parmList = []string{}
 			if err3 != nil {
 				orm.Rollback()
-				log.Println(err3, "新增数据失败")
+				log.Printf("DataDeduplicateAndSave执行失败err:%s", err3)
+				log.Printf("DataDeduplicateAndSave执行失败sql:%s,该批次部分数据:%v", insertSql, insertList)
+				return
 			}
+			insertList = []interface{}{}
+			parmList = []string{}
 		}
 	}
 	if len(insertList) > 0 {
@@ -347,12 +357,23 @@ func SaveMysql(tableName string, saveList []entity.Deduplication) {
 		_, err3 := orm.Exec(tmp...)
 		if err3 != nil {
 			orm.Rollback()
-			log.Println(err3, "新增数据失败")
+			log.Printf("DataDeduplicateAndSave执行失败err:%s", err3)
+			log.Printf("DataDeduplicateAndSave执行失败sql:%s,该批次部分数据:%v", insertSql, insertList)
+			return
 		}
 
 	}
 	err2 := orm.Commit()
-	log.Println("err2", err2)
+	if err2 != nil {
+		log.Printf("DataDeduplicateAndSave commit失败:%s 最后一次sql:%s", err2, insertSql)
+		log.Printf("DataDeduplicateAndSave commit 该批次部分数据:%v", insertList)
+	}
 	log.Println("保存数据结束")
-
 }
+
+//type saveChanStruct struct {
+//	TableName string
+//	Data      []entity.Deduplication
+//}
+//
+//var SaveChan = make(chan saveChanStruct, 10000)