fuwencai 4 년 전
부모
커밋
bf7e732a74
4개의 변경된 파일237개의 추가작업 그리고 5개의 파일을 삭제
  1. 1 1
      rpc/internal/logic/datadeduplicateandsavelogic.go
  2. 1 1
      rpc/internal/logic/datadeduplicatebyaccountlogic.go
  3. 53 2
      rpc/test/test1_test.go
  4. 182 1
      service/deduplication.go

+ 1 - 1
rpc/internal/logic/datadeduplicateandsavelogic.go

@@ -25,7 +25,7 @@ func NewDataDeduplicateAndSaveLogic(ctx context.Context, svcCtx *svc.ServiceCont
 
 //   根据账户id判重并存入数据
 func (l *DataDeduplicateAndSaveLogic) DataDeduplicateAndSave(in *deduplication.ByAccountRequest) (*deduplication.Response, error) {
-	info,err:=deduplicationService.DataDeduplicateByAccountId(in)
+	info,err:=deduplicationService.DataDeduplicateAndSave(in)
 	code := 0
 	if err!= ""{
 		code = -1

+ 1 - 1
rpc/internal/logic/datadeduplicatebyaccountlogic.go

@@ -25,7 +25,7 @@ func NewDataDeduplicateByAccountLogic(ctx context.Context, svcCtx *svc.ServiceCo
 
 //   根据账户id进行判重
 func (l *DataDeduplicateByAccountLogic) DataDeduplicateByAccount(in *deduplication.ByAccountRequest) (*deduplication.Response, error) {
-	info,err:=deduplicationService.DataDeduplicateAndSave(in)
+	info,err:=deduplicationService.DataDeduplicateByAccountId(in)
 	code := 0
 	if err!= ""{
 		code = -1

+ 53 - 2
rpc/test/test1_test.go

@@ -5,6 +5,10 @@ import (
 	"app.yhyue.com/moapp/dataDeduplication/rpc/deduplicationclient"
 	"app.yhyue.com/moapp/dataDeduplication/rpc/internal/config"
 	"context"
+	"crypto/md5"
+	"fmt"
+	"io"
+	"strconv"
 
 	"flag"
 	"github.com/tal-tech/go-zero/core/conf"
@@ -25,14 +29,61 @@ func Test_centerUserCenter(t *testing.T) {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	FileSystem := deduplicationclient.NewDeduplication(zrpc.MustNewClient(c.FileSystemConf))
 	req := &deduplication.Request{}
-	req.InfoId="1aaaa,2,5555,888,222,abc"
+	req.InfoId="4477889985"
 	req.EntId="1"
 	req.PersonId="0"
 	req.IsEnt=true
-	req.IsInsert = true
+	req.IsInsert = false
 	res, err := FileSystem.DataDeduplication(ctx, req)
 
 	log.Println("err ", err)
 	log.Println("req ", res)
 }
 
+func Test_AccountDedup(t *testing.T) {
+	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	FileSystem := deduplicationclient.NewDeduplication(zrpc.MustNewClient(c.FileSystemConf))
+	req := &deduplication.ByAccountRequest{}
+	req.InfoId="2255"
+	req.AccountId="5b023122c9ebc25f793e14a7"
+	req.PersonId="5b023122c9ebc25f793e14a7"
+
+	//res, err := FileSystem.DataDeduplicateByAccount(ctx, req)
+	res, err := FileSystem.DataDeduplicateAndSave(ctx, req)
+
+	log.Println("err ", err)
+	log.Println("req ", res)
+	//ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	//FileSystem := deduplicationclient.NewDeduplication(zrpc.MustNewClient(c.FileSystemConf))
+	//req := &deduplication.ByAccountRequest{}
+	//userid := "5b023122c9ebc25f793e14a7"
+	//b := a(userid)
+	//ss:=b[len(b)-2:]
+	//bt,_:=strconv.ParseInt(ss,16,64)
+	//log.Println(bt)
+}
+
+func a(data string) string {
+	t := md5.New()
+	io.WriteString(t, data)
+	return fmt.Sprintf("%x", t.Sum(nil))
+}
+func Hextob(str string)([]byte){
+	slen:=len(str)
+	bHex:=make([]byte,len(str)/2)
+	ii:=0
+	for i:=0;i<len(str);i=i+2 {
+		if slen!=1{
+			ss:=string(str[i])+string(str[i+1])
+			bt,_:=strconv.ParseInt(ss,16,64)
+			bHex[ii]=byte(bt)
+			ii=ii+1;
+			slen=slen-2;}
+	}
+	return bHex;
+}
+/*字节数组转16进制可以直接使用 用fmt就能转换*/
+func BytetoH(b []byte)(H string){
+	H=fmt.Sprintf("%x",b)
+	return;
+}

+ 182 - 1
service/deduplication.go

@@ -3,8 +3,10 @@ package service
 import (
 	"app.yhyue.com/moapp/dataDeduplication/entity"
 	"app.yhyue.com/moapp/dataDeduplication/rpc/deduplication"
+	"crypto/md5"
 	"fmt"
 	"github.com/go-xorm/xorm"
+	"io"
 	"log"
 	"strconv"
 	"strings"
@@ -138,6 +140,180 @@ func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.R
 		NewCount:   int64(totalInfoCount - totalExist),
 		IsInsert:   false,
 	}, ""
+}
+
+// 根据账户id进行判重
+func (service *DeduplicationService) DataDeduplicateByAccountId(data *deduplication.ByAccountRequest) (*deduplication.Info, string) {
+	log.Println("开始=====")
+	orm := Engine.NewSession()
+	defer orm.Close()
+	// 模运算取企业id  todo  需要区分是int类型还是mongodb objectid类型   看一下咋转
+	//var num01 int
+	number, errConv := strconv.Atoi(data.AccountId)
+	log.Println(55555,number,errConv)
+	if errConv!=nil{
+		log.Println("不是int类型的,hash后再取模寻表")
+		b := a(data.AccountId)
+		ss:=b[len(b)-2:]
+		bt,_:=strconv.ParseInt(ss,16,64)
+		number =int(bt)
+		log.Println("哈希取模后的表序号",number)
+	}
+	tableName := PREFIX + fmt.Sprintf("%03d", number%100)
+
+	// 查询
+	var rs entity.Deduplication
+	var tmpList []string
+	var valueList []interface{}
+	var selectSql string
+	valueList = append(valueList, data.AccountId)
+
+	for _, v := range strings.Split(data.InfoId, ",") {
+		if strings.TrimSpace(v)!=""{
+			tmpList = append(tmpList, "?")
+			valueList = append(valueList, v)
+		}
+
+	}
+
+	selectSql = fmt.Sprintf("account_id=? and info_id in (%s)", strings.Join(tmpList, ","))
+	infoIdList := strings.Split(data.InfoId, ",")
+	totalInfoCount := len(infoIdList)
+	count, err := orm.Table(tableName).Where(selectSql, valueList...).Count(rs)
+	log.Println(count, "已存在数据量")
+	if err != nil {
+		log.Println(err, "判重查询失败")
+		return &deduplication.Info{
+			TotalCount: 0,
+			ExistCount: 0,
+			NewCount:   0,
+			IsInsert:   false,
+		}, "判重查询失败"
+	}
+
+	return &deduplication.Info{
+		TotalCount: int64(totalInfoCount),
+		ExistCount: count,
+		NewCount:   int64(totalInfoCount) - count,
+		IsInsert:   false,
+	}, ""
+}
+// 根据账户id进行判重并存入数据
+func (service *DeduplicationService) DataDeduplicateAndSave(data *deduplication.ByAccountRequest) (*deduplication.Info, string) {
+	log.Println("开始=====")
+	orm := Engine.NewSession()
+	defer orm.Close()
+	// 模运算取企业id  企业id是int 类型的直接对100取模
+	// objectid类型的哈希后取模  这里使用md5后取后两位数字转10进制 然后对100取模
+	number, errConv := strconv.Atoi(data.AccountId)
+	log.Println(55555,number,errConv)
+	if errConv!=nil{
+		log.Println("不是int类型的,hash后再取模寻表")
+		b := a(data.AccountId)
+		ss:=b[len(b)-2:]
+		bt,_:=strconv.ParseInt(ss,16,64)
+		number =int(bt)
+		log.Println(bt)
+	}
+	tableName := PREFIX + fmt.Sprintf("%03d", number%100)
+	// 查询
+	var rs []*entity.Deduplication
+	var tmpList []string
+	var valueList []interface{}
+	var selectSql string
+	valueList = append(valueList, data.AccountId)
+
+	for _, v := range strings.Split(data.InfoId, ",") {
+		if strings.TrimSpace(v)!=""{
+			tmpList = append(tmpList, "?")
+			valueList = append(valueList, v)
+		}
+
+	}
+
+	selectSql = fmt.Sprintf("account_id=? and info_id in (%s)", strings.Join(tmpList, ","))
+	infoIdList := strings.Split(data.InfoId, ",")
+	totalInfoCount := len(infoIdList)
+	err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs)
+	totalExist := len(rs)
+	log.Println(totalExist, "已存在数据量")
+	if err != nil {
+		log.Println(err, "判重查询失败")
+		return &deduplication.Info{
+			TotalCount: 0,
+			ExistCount: 0,
+			NewCount:   0,
+		}, "判重查询失败"
+	}
+
+
+	existIdMap := map[string]bool{}
+	for _, v := range rs {
+		existIdMap[v.InfoId] = true
+	}
+	// 开启事务
+	orm.Begin()
+	// 新增
+	var insertList []entity.Deduplication
+	for _, id := range infoIdList {
+		if existIdMap[id] {
+			log.Println("id已存在", id)
+			continue
+		}
+		log.Println("新增", id)
+		temData := entity.Deduplication{
+			InfoId:   id,
+			EntId:    "0",
+			PersonId: data.PersonId,
+			AccountId: data.AccountId,
+		}
+		insertList = append(insertList, temData)
+		if len(insertList) > 500 {
+			_, err3 := orm.Table(tableName).Insert(insertList)
+			insertList = []entity.Deduplication{}
+			if err3 != nil {
+				orm.Rollback()
+				log.Println(err3, "新增数据失败")
+				return &deduplication.Info{
+					TotalCount: int64(totalInfoCount),
+					ExistCount: int64(totalExist),
+					NewCount:   int64(totalInfoCount - totalExist),
+				}, "新增数据失败"
+			}
+		}
+	}
+	if len(insertList) > 0 {
+		_, err3 := orm.Table(tableName).Insert(insertList)
+		if err3 != nil {
+			orm.Rollback()
+			log.Println(err3, "新增数据失败")
+			return &deduplication.Info{
+				TotalCount: int64(totalInfoCount),
+				ExistCount: int64(totalExist),
+				NewCount:   int64(totalInfoCount - totalExist),
+			}, "新增数据失败"
+		}
+	}
+	err2 := orm.Commit()
+	if err2 != nil {
+		log.Println("提交失败")
+		return &deduplication.Info{
+			TotalCount: int64(totalInfoCount),
+			ExistCount: int64(totalExist),
+			NewCount:   int64(totalInfoCount - totalExist),
+		}, "提交失败"
+	} else {
+		log.Println("提交成功")
+		return &deduplication.Info{
+			TotalCount: int64(totalInfoCount),
+			ExistCount: int64(totalExist),
+			NewCount:   int64(totalInfoCount - totalExist),
+		}, ""
+
+	}
+
+
+
 }
 func (service *DeduplicationService) EntCount(data *deduplication.GetEntCountRequest) (int64, string) {
 	log.Println("开始=====")
@@ -152,7 +328,12 @@ func (service *DeduplicationService) EntCount(data *deduplication.GetEntCountReq
 	if err != nil {
 		return 0, "查询失败"
 	}
-	log.Println("count=====",count)
+	log.Println("count=====", count)
 	return count, ""
 
 }
+func a(data string) string {
+	t := md5.New()
+	io.WriteString(t, data)
+	return fmt.Sprintf("%x", t.Sum(nil))
+}