fuwencai 4 年之前
当前提交
dc87ed5060

+ 7 - 0
entity/deduplication.go

@@ -0,0 +1,7 @@
+package entity
+
+type Deduplication struct {
+	InfoId   string `xorm:"info_id" form:"info_id" json:"info_id"`
+	PersonId string `xorm:"person_id" form:"person_id" json:"person_id"`
+	EntId    string `xorm:"ent_id" form:"ent_id" json:"ent_id"`
+}

+ 54 - 0
rpc/deduplication.go

@@ -0,0 +1,54 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: deduplication.proto
+
+package main
+
+import (
+	"dataDeduplication/service"
+	"flag"
+	"fmt"
+	_ "github.com/go-sql-driver/mysql"
+	"github.com/go-xorm/xorm"
+	"log"
+
+	"dataDeduplication/rpc/deduplication"
+	"dataDeduplication/rpc/internal/config"
+	"dataDeduplication/rpc/internal/server"
+	"dataDeduplication/rpc/internal/svc"
+
+	"github.com/tal-tech/go-zero/core/conf"
+	"github.com/tal-tech/go-zero/zrpc"
+	"google.golang.org/grpc"
+)
+
+var configFile = flag.String("f", "etc/deduplication.yaml", "the config file")
+
+func main() {
+	flag.Parse()
+
+	var c config.Config
+	conf.MustLoad(*configFile, &c)
+	ctx := svc.NewServiceContext(c)
+	srv := server.NewDeduplicationServer(ctx)
+
+	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
+		deduplication.RegisterDeduplicationServer(grpcServer, srv)
+	})
+	defer s.Stop()
+
+	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
+	s.Start()
+}
+//创建orm引擎
+func init() {
+	conf.MustLoad(*configFile, &config.ConfigJson)
+	var err error
+	log.Println(config.ConfigJson.DataSource)
+	service.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
+	log.Println(err)
+	service.Engine.ShowSQL(true)
+	if err != nil {
+		log.Fatal("数据库连接失败:", err)
+	}
+	fmt.Println(config.ConfigJson.DataSource+"链接成功")
+}

+ 30 - 0
rpc/deduplication.proto

@@ -0,0 +1,30 @@
+syntax = "proto3";
+
+package deduplication;
+
+message Request {
+  string personId = 1;     //人员id
+  string  infoId = 2;      //信息id  逗号分隔
+  string entId = 3;        // 企业id
+  bool isInsert = 4;        // 是否插入不重复的数据
+  bool  isEnt = 5 ;   // 是否按企业id判重
+}
+
+message Info {
+  int64  totalCount = 1; // 本次查询info_id总量
+  int64  existCount = 2; // 已存在的info_id的数量
+  int64  NewCount = 3;   // 不存在的info_id的数量
+  bool  isInsert = 4 ;   // 是否插入新数据
+
+}
+
+message Response {
+  int64  code = 1;    //响应代码
+  string message = 2;  //响应消息
+  Info data = 3;     //响应内容
+}
+
+service Deduplication {
+  // 数据判重
+  rpc dataDeduplication(Request) returns(Response);
+}

+ 453 - 0
rpc/deduplication/deduplication.pb.go

@@ -0,0 +1,453 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.25.0
+// 	protoc        v3.15.1
+// source: deduplication.proto
+
+package deduplication
+
+import (
+	context "context"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+type Request struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	PersonId string `protobuf:"bytes,1,opt,name=personId,proto3" json:"personId,omitempty"`  //人员id
+	InfoId   string `protobuf:"bytes,2,opt,name=infoId,proto3" json:"infoId,omitempty"`      //信息id  逗号分隔
+	EntId    string `protobuf:"bytes,3,opt,name=entId,proto3" json:"entId,omitempty"`        // 企业id
+	IsInsert bool   `protobuf:"varint,4,opt,name=isInsert,proto3" json:"isInsert,omitempty"` // 是否插入不重复的数据
+	IsEnt    bool   `protobuf:"varint,5,opt,name=isEnt,proto3" json:"isEnt,omitempty"`       // 是否按企业id判重
+}
+
+func (x *Request) Reset() {
+	*x = Request{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_deduplication_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Request) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Request) ProtoMessage() {}
+
+func (x *Request) ProtoReflect() protoreflect.Message {
+	mi := &file_deduplication_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Request.ProtoReflect.Descriptor instead.
+func (*Request) Descriptor() ([]byte, []int) {
+	return file_deduplication_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *Request) GetPersonId() string {
+	if x != nil {
+		return x.PersonId
+	}
+	return ""
+}
+
+func (x *Request) GetInfoId() string {
+	if x != nil {
+		return x.InfoId
+	}
+	return ""
+}
+
+func (x *Request) GetEntId() string {
+	if x != nil {
+		return x.EntId
+	}
+	return ""
+}
+
+func (x *Request) GetIsInsert() bool {
+	if x != nil {
+		return x.IsInsert
+	}
+	return false
+}
+
+func (x *Request) GetIsEnt() bool {
+	if x != nil {
+		return x.IsEnt
+	}
+	return false
+}
+
+type Info struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	TotalCount int64 `protobuf:"varint,1,opt,name=totalCount,proto3" json:"totalCount,omitempty"` // 本次查询info_id总量
+	ExistCount int64 `protobuf:"varint,2,opt,name=existCount,proto3" json:"existCount,omitempty"` // 已存在的info_id的数量
+	NewCount   int64 `protobuf:"varint,3,opt,name=NewCount,proto3" json:"NewCount,omitempty"`     // 不存在的info_id的数量
+	IsInsert   bool  `protobuf:"varint,4,opt,name=isInsert,proto3" json:"isInsert,omitempty"`     // 是否插入新数据
+}
+
+func (x *Info) Reset() {
+	*x = Info{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_deduplication_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Info) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Info) ProtoMessage() {}
+
+func (x *Info) ProtoReflect() protoreflect.Message {
+	mi := &file_deduplication_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Info.ProtoReflect.Descriptor instead.
+func (*Info) Descriptor() ([]byte, []int) {
+	return file_deduplication_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *Info) GetTotalCount() int64 {
+	if x != nil {
+		return x.TotalCount
+	}
+	return 0
+}
+
+func (x *Info) GetExistCount() int64 {
+	if x != nil {
+		return x.ExistCount
+	}
+	return 0
+}
+
+func (x *Info) GetNewCount() int64 {
+	if x != nil {
+		return x.NewCount
+	}
+	return 0
+}
+
+func (x *Info) GetIsInsert() bool {
+	if x != nil {
+		return x.IsInsert
+	}
+	return false
+}
+
+type Response struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Code    int64  `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`      //响应代码
+	Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` //响应消息
+	Data    *Info  `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`       //响应内容
+}
+
+func (x *Response) Reset() {
+	*x = Response{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_deduplication_proto_msgTypes[2]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *Response) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*Response) ProtoMessage() {}
+
+func (x *Response) ProtoReflect() protoreflect.Message {
+	mi := &file_deduplication_proto_msgTypes[2]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use Response.ProtoReflect.Descriptor instead.
+func (*Response) Descriptor() ([]byte, []int) {
+	return file_deduplication_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *Response) GetCode() int64 {
+	if x != nil {
+		return x.Code
+	}
+	return 0
+}
+
+func (x *Response) GetMessage() string {
+	if x != nil {
+		return x.Message
+	}
+	return ""
+}
+
+func (x *Response) GetData() *Info {
+	if x != nil {
+		return x.Data
+	}
+	return nil
+}
+
+var File_deduplication_proto protoreflect.FileDescriptor
+
+var file_deduplication_proto_rawDesc = []byte{
+	0x0a, 0x13, 0x64, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x64, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61,
+	0x74, 0x69, 0x6f, 0x6e, 0x22, 0x85, 0x01, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
+	0x12, 0x1a, 0x0a, 0x08, 0x70, 0x65, 0x72, 0x73, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x08, 0x70, 0x65, 0x72, 0x73, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06,
+	0x69, 0x6e, 0x66, 0x6f, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x69, 0x6e,
+	0x66, 0x6f, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x03, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x69, 0x73,
+	0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73,
+	0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x73, 0x45, 0x6e, 0x74, 0x18,
+	0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x69, 0x73, 0x45, 0x6e, 0x74, 0x22, 0x7e, 0x0a, 0x04,
+	0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1e, 0x0a, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x75,
+	0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43,
+	0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x65, 0x78, 0x69, 0x73, 0x74, 0x43, 0x6f, 0x75,
+	0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x65, 0x78, 0x69, 0x73, 0x74, 0x43,
+	0x6f, 0x75, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x4e, 0x65, 0x77, 0x43, 0x6f, 0x75, 0x6e, 0x74,
+	0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x4e, 0x65, 0x77, 0x43, 0x6f, 0x75, 0x6e, 0x74,
+	0x12, 0x1a, 0x0a, 0x08, 0x69, 0x73, 0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x18, 0x04, 0x20, 0x01,
+	0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x22, 0x61, 0x0a, 0x08,
+	0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07,
+	0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d,
+	0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x27, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03,
+	0x20, 0x01, 0x28, 0x0b, 0x32, 0x13, 0x2e, 0x64, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61,
+	0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32,
+	0x55, 0x0a, 0x0d, 0x44, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e,
+	0x12, 0x44, 0x0a, 0x11, 0x64, 0x61, 0x74, 0x61, 0x44, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x16, 0x2e, 0x64, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63,
+	0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e,
+	0x64, 0x65, 0x64, 0x75, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x52, 0x65,
+	0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_deduplication_proto_rawDescOnce sync.Once
+	file_deduplication_proto_rawDescData = file_deduplication_proto_rawDesc
+)
+
+func file_deduplication_proto_rawDescGZIP() []byte {
+	file_deduplication_proto_rawDescOnce.Do(func() {
+		file_deduplication_proto_rawDescData = protoimpl.X.CompressGZIP(file_deduplication_proto_rawDescData)
+	})
+	return file_deduplication_proto_rawDescData
+}
+
+var file_deduplication_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_deduplication_proto_goTypes = []interface{}{
+	(*Request)(nil),  // 0: deduplication.Request
+	(*Info)(nil),     // 1: deduplication.Info
+	(*Response)(nil), // 2: deduplication.Response
+}
+var file_deduplication_proto_depIdxs = []int32{
+	1, // 0: deduplication.Response.data:type_name -> deduplication.Info
+	0, // 1: deduplication.Deduplication.dataDeduplication:input_type -> deduplication.Request
+	2, // 2: deduplication.Deduplication.dataDeduplication:output_type -> deduplication.Response
+	2, // [2:3] is the sub-list for method output_type
+	1, // [1:2] is the sub-list for method input_type
+	1, // [1:1] is the sub-list for extension type_name
+	1, // [1:1] is the sub-list for extension extendee
+	0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_deduplication_proto_init() }
+func file_deduplication_proto_init() {
+	if File_deduplication_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_deduplication_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Request); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_deduplication_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Info); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_deduplication_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*Response); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_deduplication_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   3,
+			NumExtensions: 0,
+			NumServices:   1,
+		},
+		GoTypes:           file_deduplication_proto_goTypes,
+		DependencyIndexes: file_deduplication_proto_depIdxs,
+		MessageInfos:      file_deduplication_proto_msgTypes,
+	}.Build()
+	File_deduplication_proto = out.File
+	file_deduplication_proto_rawDesc = nil
+	file_deduplication_proto_goTypes = nil
+	file_deduplication_proto_depIdxs = nil
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// DeduplicationClient is the client API for Deduplication service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type DeduplicationClient interface {
+	// 数据判重
+	DataDeduplication(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
+}
+
+type deduplicationClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewDeduplicationClient(cc grpc.ClientConnInterface) DeduplicationClient {
+	return &deduplicationClient{cc}
+}
+
+func (c *deduplicationClient) DataDeduplication(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
+	out := new(Response)
+	err := c.cc.Invoke(ctx, "/deduplication.Deduplication/dataDeduplication", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// DeduplicationServer is the server API for Deduplication service.
+type DeduplicationServer interface {
+	// 数据判重
+	DataDeduplication(context.Context, *Request) (*Response, error)
+}
+
+// UnimplementedDeduplicationServer can be embedded to have forward compatible implementations.
+type UnimplementedDeduplicationServer struct {
+}
+
+func (*UnimplementedDeduplicationServer) DataDeduplication(context.Context, *Request) (*Response, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method DataDeduplication not implemented")
+}
+
+func RegisterDeduplicationServer(s *grpc.Server, srv DeduplicationServer) {
+	s.RegisterService(&_Deduplication_serviceDesc, srv)
+}
+
+func _Deduplication_DataDeduplication_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(Request)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DeduplicationServer).DataDeduplication(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/deduplication.Deduplication/DataDeduplication",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DeduplicationServer).DataDeduplication(ctx, req.(*Request))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _Deduplication_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "deduplication.Deduplication",
+	HandlerType: (*DeduplicationServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "dataDeduplication",
+			Handler:    _Deduplication_DataDeduplication_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "deduplication.proto",
+}

+ 41 - 0
rpc/deduplicationclient/deduplication.go

@@ -0,0 +1,41 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: deduplication.proto
+
+//go:generate mockgen -destination ./deduplication_mock.go -package deduplicationclient -source $GOFILE
+
+package deduplicationclient
+
+import (
+	"context"
+
+	"dataDeduplication/rpc/deduplication"
+
+	"github.com/tal-tech/go-zero/zrpc"
+)
+
+type (
+	Request  = deduplication.Request
+	Info     = deduplication.Info
+	Response = deduplication.Response
+
+	Deduplication interface {
+		//  数据判重
+		DataDeduplication(ctx context.Context, in *Request) (*Response, error)
+	}
+
+	defaultDeduplication struct {
+		cli zrpc.Client
+	}
+)
+
+func NewDeduplication(cli zrpc.Client) Deduplication {
+	return &defaultDeduplication{
+		cli: cli,
+	}
+}
+
+//  数据判重
+func (m *defaultDeduplication) DataDeduplication(ctx context.Context, in *Request) (*Response, error) {
+	client := deduplication.NewDeduplicationClient(m.cli.Conn())
+	return client.DataDeduplication(ctx, in)
+}

+ 14 - 0
rpc/etc/deduplication.yaml

@@ -0,0 +1,14 @@
+Name: deduplication.rpc
+ListenOn: 127.0.0.1:8080
+Etcd:
+  Hosts:
+  - 127.0.0.1:2379
+  Key: deduplication.rpc
+DataSource: root:root@tcp(127.0.0.1:3306)/quchong?charset=utf8mb4&parseTime=true&loc=Local
+FileSystemConf:
+  Etcd:
+    Hosts:
+      - 127.0.0.1:2379
+    Key: deduplication.rpc
+CalleeId: deduplication.rpc
+Node: 1

+ 13 - 0
rpc/internal/config/config.go

@@ -0,0 +1,13 @@
+package config
+
+import "github.com/tal-tech/go-zero/zrpc"
+
+
+type Config struct {
+	zrpc.RpcServerConf
+	DataSource     string // 手动代码
+	Node           int    // 节点
+	CalleeId       string // 服务名字
+	FileSystemConf zrpc.RpcClientConf
+}
+var  ConfigJson Config

+ 41 - 0
rpc/internal/logic/datadeduplicationlogic.go

@@ -0,0 +1,41 @@
+package logic
+
+import (
+	"context"
+	"dataDeduplication/service"
+
+	"dataDeduplication/rpc/deduplication"
+	"dataDeduplication/rpc/internal/svc"
+
+	"github.com/tal-tech/go-zero/core/logx"
+)
+
+type DataDeduplicationLogic struct {
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewDataDeduplicationLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DataDeduplicationLogic {
+	return &DataDeduplicationLogic{
+		ctx:    ctx,
+		svcCtx: svcCtx,
+		Logger: logx.WithContext(ctx),
+	}
+}
+var deduplicationService service.DeduplicationService
+
+// 去重
+func (l *DataDeduplicationLogic) DataDeduplication(in *deduplication.Request) (*deduplication.Response, error) {
+	// todo: add your logic here and delete this line
+	info,err:=deduplicationService.DataDeduplicateInsert(in)
+	code := 0
+	if err!= ""{
+			code = -1
+	}
+	return &deduplication.Response{
+		Data: info,
+		Message: err,
+		Code: int64(code),
+	}, nil
+}

+ 28 - 0
rpc/internal/server/deduplicationserver.go

@@ -0,0 +1,28 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: deduplication.proto
+
+package server
+
+import (
+	"context"
+
+	"dataDeduplication/rpc/deduplication"
+	"dataDeduplication/rpc/internal/logic"
+	"dataDeduplication/rpc/internal/svc"
+)
+
+type DeduplicationServer struct {
+	svcCtx *svc.ServiceContext
+}
+
+func NewDeduplicationServer(svcCtx *svc.ServiceContext) *DeduplicationServer {
+	return &DeduplicationServer{
+		svcCtx: svcCtx,
+	}
+}
+
+//  数据判重
+func (s *DeduplicationServer) DataDeduplication(ctx context.Context, in *deduplication.Request) (*deduplication.Response, error) {
+	l := logic.NewDataDeduplicationLogic(ctx, s.svcCtx)
+	return l.DataDeduplication(in)
+}

+ 13 - 0
rpc/internal/svc/servicecontext.go

@@ -0,0 +1,13 @@
+package svc
+
+import "dataDeduplication/rpc/internal/config"
+
+type ServiceContext struct {
+	Config config.Config
+}
+
+func NewServiceContext(c config.Config) *ServiceContext {
+	return &ServiceContext{
+		Config: c,
+	}
+}

+ 14 - 0
rpc/test/deduplication.yaml

@@ -0,0 +1,14 @@
+Name: deduplication.rpc
+ListenOn: 127.0.0.1:8080
+Etcd:
+  Hosts:
+    - 127.0.0.1:2379
+  Key: deduplication.rpc
+DataSource: root:root@tcp(127.0.0.1:3306)/quchong?charset=utf8mb4&parseTime=true&loc=Local
+FileSystemConf:
+  Etcd:
+    Hosts:
+      - 127.0.0.1:2379
+    Key: deduplication.rpc
+CalleeId: deduplication.rpc
+Node: 1

+ 38 - 0
rpc/test/test1_test.go

@@ -0,0 +1,38 @@
+package test
+
+import (
+
+	"context"
+	"dataDeduplication/rpc/deduplication"
+	"dataDeduplication/rpc/deduplicationclient"
+	"dataDeduplication/rpc/internal/config"
+	"flag"
+	"github.com/tal-tech/go-zero/core/conf"
+	"github.com/tal-tech/go-zero/zrpc"
+	"log"
+	"testing"
+	"time"
+)
+
+var configFile = flag.String("f", "deduplication.yaml", "the config file")
+var c config.Config
+func init() {
+	conf.MustLoad(*configFile, &c)
+}
+
+
+func Test_centerUserCenter(t *testing.T) {
+	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	FileSystem := deduplicationclient.NewDeduplication(zrpc.MustNewClient(c.FileSystemConf))
+	req := &deduplication.Request{}
+	req.InfoId="1aaaa,2,5555,888,222,abc"
+	req.EntId="1"
+	req.PersonId="0"
+	req.IsEnt=true
+	req.IsInsert = true
+	res, err := FileSystem.DataDeduplication(ctx, req)
+
+	log.Println("err ", err)
+	log.Println("req ", res)
+}
+

+ 141 - 0
service/deduplication.go

@@ -0,0 +1,141 @@
+package service
+
+import (
+	"dataDeduplication/entity"
+	"dataDeduplication/rpc/deduplication"
+	"fmt"
+	"github.com/go-xorm/xorm"
+	"log"
+	"strconv"
+	"strings"
+)
+
+//定义orm引擎
+var Engine *xorm.Engine
+
+type DeduplicationService struct{}
+
+var PREFIX = "qc"
+
+//数据判重
+func (service *DeduplicationService) DataDeduplicateInsert(data *deduplication.Request) (*deduplication.Info, string) {
+	log.Println("开始=====")
+	orm := Engine.NewSession()
+	defer orm.Close()
+	// 模运算取企业id
+	number, _ := strconv.Atoi(data.EntId)
+	tableName := PREFIX + fmt.Sprintf("%03d", number%100)
+	// 查询
+	var rs []*entity.Deduplication
+	var tmpList []string
+	var valueList []interface{}
+	var  selectSql string
+	if data.IsEnt{
+		valueList = append(valueList, data.EntId)
+	}else {
+		valueList = append(valueList, data.PersonId, data.EntId)
+	}
+
+	for _, v := range strings.Split(data.InfoId, ",") {
+		tmpList = append(tmpList, "?")
+		valueList = append(valueList, v)
+	}
+	if data.IsEnt{
+		selectSql = fmt.Sprintf("ent_id=? and info_id in (%s)", strings.Join(tmpList, ","))
+	}else {
+		selectSql = fmt.Sprintf("person_id = ? and ent_id=? and info_id in (%s)", strings.Join(tmpList, ","))
+	}
+	log.Println(selectSql)
+	infoIdList := strings.Split(data.InfoId, ",")
+	totalInfoCount := len(infoIdList)
+	err := orm.Table(tableName).Cols("info_id").Where(selectSql, valueList...).Find(&rs)
+	totalExist := len(rs)
+	log.Println(totalExist,"已存在")
+	if err != nil {
+		log.Println(err,"判重查询失败")
+		return &deduplication.Info{
+			TotalCount: 0,
+			ExistCount: 0,
+			NewCount:   0,
+			IsInsert:   false,
+		}, "判重查询失败"
+	}
+
+	if data.IsInsert {
+
+		existIdMap :=map[string]bool{}
+		for _,v := range rs{
+			existIdMap[v.InfoId]=true
+		}
+		// 开启事务
+		orm.Begin()
+		// 新增
+		var insertList []entity.Deduplication
+		for _, id := range infoIdList {
+			if existIdMap[id]{
+				log.Println("id已存在",id,)
+				continue
+			}
+			log.Println("新增",id,)
+			temData := entity.Deduplication{
+				InfoId:   id,
+				EntId:    data.EntId,
+				PersonId: data.PersonId,
+			}
+			insertList = append(insertList, temData)
+			if len(insertList) > 100 {
+				_, err3 := orm.Table(tableName).Insert(insertList)
+				insertList = []entity.Deduplication{}
+				if err3 != nil {
+					orm.Rollback()
+					log.Println(err3,"新增数据失败")
+					return &deduplication.Info{
+						TotalCount: int64(totalInfoCount),
+						ExistCount: int64(totalExist),
+						NewCount:   int64(totalInfoCount - totalExist),
+						IsInsert:   false,
+					}, "新增数据失败"
+				}
+			}
+		}
+		if len(insertList) > 0 {
+			_, err3 := orm.Table(tableName).Insert(insertList)
+			if err3 != nil {
+				orm.Rollback()
+				log.Println(err3,"新增数据失败")
+				return &deduplication.Info{
+					TotalCount: int64(totalInfoCount),
+					ExistCount: int64(totalExist),
+					NewCount:   int64(totalInfoCount - totalExist),
+					IsInsert:   false,
+				}, "新增数据失败"
+			}
+		}
+		err := orm.Commit()
+		if err != nil {
+			log.Println("提交失败")
+			return &deduplication.Info{
+				TotalCount: int64(totalInfoCount),
+				ExistCount: int64(totalExist),
+				NewCount:   int64(totalInfoCount - totalExist),
+				IsInsert:   false,
+			}, "提交失败"
+		} else {
+			log.Println("提交成功")
+			return &deduplication.Info{
+				TotalCount: int64(totalInfoCount),
+				ExistCount: int64(totalExist),
+				NewCount:   int64(totalInfoCount - totalExist),
+				IsInsert:   true,
+			}, ""
+
+		}
+
+	}
+	return &deduplication.Info{
+		TotalCount: int64(totalInfoCount),
+		ExistCount: int64(totalExist),
+		NewCount:   int64(totalInfoCount - totalExist),
+		IsInsert:   false,
+	}, ""
+}