Jelajahi Sumber

Merge branch 'dev/v1.2.1.1_fuwencai' of moapp/dataDeduplication into hotfix/v1.2.1.1

fuwencai 1 tahun lalu
induk
melakukan
99bade767f
4 mengubah file dengan 74 tambahan dan 38 penghapusan
  1. 8 4
      rpc/deduplication.go
  2. 4 1
      rpc/etc/deduplication.yaml
  3. 5 2
      rpc/internal/config/config.go
  4. 57 31
      service/deduplication.go

+ 8 - 4
rpc/deduplication.go

@@ -15,8 +15,6 @@ import (
 	"github.com/go-xorm/xorm"
 	"github.com/go-xorm/xorm"
 	"log"
 	"log"
 
 
-
-
 	"github.com/tal-tech/go-zero/core/conf"
 	"github.com/tal-tech/go-zero/core/conf"
 	"github.com/tal-tech/go-zero/zrpc"
 	"github.com/tal-tech/go-zero/zrpc"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc"
@@ -40,9 +38,10 @@ func main() {
 	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
 	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
 	s.Start()
 	s.Start()
 }
 }
-//创建orm引擎
+
 func init() {
 func init() {
 	conf.MustLoad(*configFile, &config.ConfigJson)
 	conf.MustLoad(*configFile, &config.ConfigJson)
+	//创建orm引擎
 	var err error
 	var err error
 	log.Println(config.ConfigJson.DataSource)
 	log.Println(config.ConfigJson.DataSource)
 	service.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
 	service.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
@@ -51,5 +50,10 @@ func init() {
 	if err != nil {
 	if err != nil {
 		log.Fatal("数据库连接失败:", err)
 		log.Fatal("数据库连接失败:", err)
 	}
 	}
-	fmt.Println(config.ConfigJson.DataSource+"链接成功")
+	service.Engine.SetMaxOpenConns(config.ConfigJson.MaxOpenConns)
+	service.Engine.SetMaxIdleConns(config.ConfigJson.MaxIdleConns)
+	fmt.Println(config.ConfigJson.DataSource + "链接成功")
+	// 初始化
+	service.SaveNumChan = make(chan int, config.ConfigJson.SaveChanNum)
+	log.Println("初始化完成")
 }
 }

+ 4 - 1
rpc/etc/deduplication.yaml

@@ -4,7 +4,10 @@ Etcd:
   Hosts:
   Hosts:
   - 127.0.0.1:2379
   - 127.0.0.1:2379
   Key: deduplication.rpc
   Key: deduplication.rpc
-DataSource: root:root@tcp(127.0.0.1:3306)/quchong?charset=utf8mb4&parseTime=true&loc=Local
+DataSource: root:Topnet123@(192.168.3.149:3306)/jianyu?charset=utf8mb4&parseTime=true&loc=Local
+MaxIdleConns: 25
+MaxOpenConns: 50
+SaveChanNum: 4
 FileSystemConf:
 FileSystemConf:
   Etcd:
   Etcd:
     Hosts:
     Hosts:

+ 5 - 2
rpc/internal/config/config.go

@@ -2,12 +2,15 @@ package config
 
 
 import "github.com/tal-tech/go-zero/zrpc"
 import "github.com/tal-tech/go-zero/zrpc"
 
 
-
 type Config struct {
 type Config struct {
 	zrpc.RpcServerConf
 	zrpc.RpcServerConf
 	DataSource     string // 手动代码
 	DataSource     string // 手动代码
 	Node           int    // 节点
 	Node           int    // 节点
 	CalleeId       string // 服务名字
 	CalleeId       string // 服务名字
 	FileSystemConf zrpc.RpcClientConf
 	FileSystemConf zrpc.RpcClientConf
+	MaxIdleConns   int // 数据库连接最大空闲数
+	MaxOpenConns   int // 数据库连接数
+	SaveChanNum    int // 保存数据能用的数据库最大连接数
 }
 }
-var  ConfigJson Config
+
+var ConfigJson Config

+ 57 - 31
service/deduplication.go

@@ -19,6 +19,7 @@ var Engine *xorm.Engine
 type DeduplicationService struct{}
 type DeduplicationService struct{}
 
 
 var PREFIX = "qc"
 var PREFIX = "qc"
+var SaveNumChan chan int
 
 
 //数据判重
 //数据判重
 func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
 func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
@@ -67,6 +68,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 	log.Println(totalInfoCount, "本批次去重后总量")
 	log.Println(totalInfoCount, "本批次去重后总量")
 	log.Println(totalInfoCount-totalExist, "newCount")
 	log.Println(totalInfoCount-totalExist, "newCount")
 	if data.IsInsert {
 	if data.IsInsert {
+		insertSql := fmt.Sprintf("insert IGNORE  into %s (info_id,ent_id,person_id,account_id,data_desc) values  ", tableName)
+		parmList := []string{}
 		existIdMap := map[string]bool{}
 		existIdMap := map[string]bool{}
 		for _, v := range rs {
 		for _, v := range rs {
 			existIdMap[v.InfoId] = true
 			existIdMap[v.InfoId] = true
@@ -74,25 +77,25 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 		// 开启事务
 		// 开启事务
 		orm.Begin()
 		orm.Begin()
 		// 新增
 		// 新增
-		var insertList []entity.Deduplication
+		//var insertList []entity.Deduplication
+		var insertList []interface{}
 		for id, _ := range mapData {
 		for id, _ := range mapData {
 			if existIdMap[id] {
 			if existIdMap[id] {
 				log.Println("id已存在", id)
 				log.Println("id已存在", id)
 				continue
 				continue
 			}
 			}
 			log.Println("新增", id)
 			log.Println("新增", id)
-			temData := entity.Deduplication{
-				InfoId:   id,
-				EntId:    data.EntId,
-				PersonId: data.PersonId,
-			}
-			insertList = append(insertList, temData)
-			if len(insertList) > 100 {
-				_, err3 := orm.Table(tableName).Insert(insertList)
-				insertList = []entity.Deduplication{}
+			parmList = append(parmList, "(?,?,?,?,?)")
+			insertList = append(insertList, id, data.EntId, data.PersonId, "", 0)
+			if len(insertList) >= 100 {
+				insertSqlS := fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
+				tmp := []interface{}{insertSqlS}
+				tmp = append(tmp, insertList...)
+				_, err3 := orm.Exec(tmp...)
 				if err3 != nil {
 				if err3 != nil {
 					orm.Rollback()
 					orm.Rollback()
-					log.Println(err3, "新增数据失败")
+					log.Printf("DataDeduplicateInsert 执行失败已回滚,err:%s", err3)
+					log.Printf("DataDeduplicateInsert 执行失败sql:%s,该批次部分数据:%v", insertSqlS, insertList)
 					return &deduplication.Info{
 					return &deduplication.Info{
 						TotalCount: int64(totalInfoCount),
 						TotalCount: int64(totalInfoCount),
 						ExistCount: int64(totalExist),
 						ExistCount: int64(totalExist),
@@ -100,13 +103,19 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 						IsInsert:   false,
 						IsInsert:   false,
 					}, "新增数据失败"
 					}, "新增数据失败"
 				}
 				}
+				insertList = []interface{}{}
+				parmList = []string{}
 			}
 			}
 		}
 		}
 		if len(insertList) > 0 {
 		if len(insertList) > 0 {
-			_, err3 := orm.Table(tableName).Insert(insertList)
+			insertSqlS := fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
+			tmp := []interface{}{insertSqlS}
+			tmp = append(tmp, insertList...)
+			_, err3 := orm.Exec(tmp...)
 			if err3 != nil {
 			if err3 != nil {
 				orm.Rollback()
 				orm.Rollback()
-				log.Println(err3, "新增数据失败")
+				log.Printf("DataDeduplicateInsert 执行失败已回滚,err:%s", err3)
+				log.Printf("DataDeduplicateInsert 执行失败sql:%s,该批次部分数据:%v", insertSqlS, insertList)
 				return &deduplication.Info{
 				return &deduplication.Info{
 					TotalCount: int64(totalInfoCount),
 					TotalCount: int64(totalInfoCount),
 					ExistCount: int64(totalExist),
 					ExistCount: int64(totalExist),
@@ -117,7 +126,8 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 		}
 		}
 		err := orm.Commit()
 		err := orm.Commit()
 		if err != nil {
 		if err != nil {
-			log.Println("提交失败")
+			log.Printf("DataDeduplicateInsert commit err:%s  ", err)
+			log.Printf("DataDeduplicateInsert 提交失败该批次部分数据:%v", insertList...)
 			return &deduplication.Info{
 			return &deduplication.Info{
 				TotalCount: int64(totalInfoCount),
 				TotalCount: int64(totalInfoCount),
 				ExistCount: int64(totalExist),
 				ExistCount: int64(totalExist),
@@ -149,8 +159,7 @@ func (service *DeduplicationService) DataDeduplicateByAccountId(data *deduplicat
 	log.Println("开始=====")
 	log.Println("开始=====")
 	orm := Engine.NewSession()
 	orm := Engine.NewSession()
 	defer orm.Close()
 	defer orm.Close()
-	// 模运算取企业id  todo  需要区分是int类型还是mongodb objectid类型   看一下咋转
-	//var num01 int
+	// 模运算取企业id    需要区分是int类型还是mongodb objectid类型
 	number, errConv := strconv.Atoi(data.AccountId)
 	number, errConv := strconv.Atoi(data.AccountId)
 	log.Println("账户id转换:", number, errConv)
 	log.Println("账户id转换:", number, errConv)
 	if errConv != nil {
 	if errConv != nil {
@@ -212,7 +221,7 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
 	// 模运算取企业id  企业id是int 类型的直接对100取模
 	// 模运算取企业id  企业id是int 类型的直接对100取模
 	// objectid类型的哈希后取模  这里使用md5后取后两位数字转10进制 然后对100取模
 	// objectid类型的哈希后取模  这里使用md5后取后两位数字转10进制 然后对100取模
 	number, errConv := strconv.Atoi(data.AccountId)
 	number, errConv := strconv.Atoi(data.AccountId)
-	log.Println(55555, number, errConv)
+	log.Println("数据判重:判断是否int类型", number, errConv)
 	if errConv != nil {
 	if errConv != nil {
 		log.Println("不是int类型的,hash后再取模寻表")
 		log.Println("不是int类型的,hash后再取模寻表")
 		b := a(data.AccountId)
 		b := a(data.AccountId)
@@ -265,7 +274,7 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
 			log.Println("id已存在", id)
 			log.Println("id已存在", id)
 			continue
 			continue
 		}
 		}
-		log.Println("新增", id)
+		//log.Println("新增", id)
 		temData := entity.Deduplication{
 		temData := entity.Deduplication{
 			InfoId:    id,
 			InfoId:    id,
 			EntId:     "0",
 			EntId:     "0",
@@ -276,7 +285,6 @@ func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.
 		insertList = append(insertList, temData)
 		insertList = append(insertList, temData)
 
 
 	}
 	}
-	//log.Println(totalInfoCount, count, "88888888")
 	log.Println(len(insertList))
 	log.Println(len(insertList))
 	go SaveMysql(tableName, insertList)
 	go SaveMysql(tableName, insertList)
 	log.Println("结束=====", time.Now())
 	log.Println("结束=====", time.Now())
@@ -312,34 +320,52 @@ func a(data string) string {
 }
 }
 
 
 func SaveMysql(tableName string, saveList []entity.Deduplication) {
 func SaveMysql(tableName string, saveList []entity.Deduplication) {
-	//
+	SaveNumChan <- 1
 	log.Println("保存数据开始")
 	log.Println("保存数据开始")
 	orm := Engine.NewSession()
 	orm := Engine.NewSession()
-	defer orm.Close()
+	defer func() {
+		orm.Close()
+		<-SaveNumChan
+	}()
 	orm.Begin()
 	orm.Begin()
-	insertList := []entity.Deduplication{}
+	insertList := []interface{}{}
+	insertSql := fmt.Sprintf("insert IGNORE  into %s (info_id,ent_id,person_id,account_id,data_desc) values  ", tableName)
+	parmList := []string{}
 	for _, saveData := range saveList {
 	for _, saveData := range saveList {
-		insertList = append(insertList, saveData)
+		parmList = append(parmList, "(?,?,?,?,?)")
+		insertList = append(insertList, saveData.InfoId, saveData.EntId, saveData.PersonId, saveData.AccountId, saveData.DataDesc)
 		if len(insertList) > 500 {
 		if len(insertList) > 500 {
-			_, err3 := orm.Table(tableName).Insert(insertList)
-			insertList = []entity.Deduplication{}
+			insertSqlS := fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
+			tmp := []interface{}{insertSqlS}
+			tmp = append(tmp, insertList...)
+			_, err3 := orm.Exec(tmp...)
 			if err3 != nil {
 			if err3 != nil {
 				orm.Rollback()
 				orm.Rollback()
-				log.Println(err3, "新增数据失败")
+				log.Printf("DataDeduplicateAndSave执行失败err:%s", err3)
+				log.Printf("DataDeduplicateAndSave执行失败sql:%s,该批次部分数据:%v", insertSqlS, insertList)
+				return
 			}
 			}
+			insertList = []interface{}{}
+			parmList = []string{}
 		}
 		}
 	}
 	}
 	if len(insertList) > 0 {
 	if len(insertList) > 0 {
-		_, err3 := orm.Table(tableName).Insert(insertList)
+		insertSqlS := fmt.Sprintf("%s %s", insertSql, strings.Join(parmList, ","))
+		tmp := []interface{}{insertSqlS}
+		tmp = append(tmp, insertList...)
+		_, err3 := orm.Exec(tmp...)
 		if err3 != nil {
 		if err3 != nil {
 			orm.Rollback()
 			orm.Rollback()
-			log.Println(err3, "新增数据失败")
-
+			log.Printf("DataDeduplicateAndSave执行失败err:%s", err3)
+			log.Printf("DataDeduplicateAndSave执行失败sql:%s,该批次部分数据:%v", insertSqlS, insertList)
+			return
 		}
 		}
 
 
 	}
 	}
 	err2 := orm.Commit()
 	err2 := orm.Commit()
-	log.Println("err2", err2)
+	if err2 != nil {
+		log.Printf("DataDeduplicateAndSave commit失败:%s ", err2)
+		log.Printf("DataDeduplicateAndSave commit 该批次部分数据:%v", insertList)
+	}
 	log.Println("保存数据结束")
 	log.Println("保存数据结束")
-
 }
 }