소스 검색

证书添加

WH01243 1 주 전
부모
커밋
6d7ed6673b

+ 40 - 39
client/chat/chat.pb.go

@@ -2,7 +2,7 @@
 // versions:
 // 	protoc-gen-go v1.36.6
 // 	protoc        v3.15.1
-// source: proto/chat.proto
+// source: chat.proto
 
 package chat
 
@@ -29,7 +29,7 @@ type PingRequest struct {
 
 func (x *PingRequest) Reset() {
 	*x = PingRequest{}
-	mi := &file_proto_chat_proto_msgTypes[0]
+	mi := &file_chat_proto_msgTypes[0]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -41,7 +41,7 @@ func (x *PingRequest) String() string {
 func (*PingRequest) ProtoMessage() {}
 
 func (x *PingRequest) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[0]
+	mi := &file_chat_proto_msgTypes[0]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -54,7 +54,7 @@ func (x *PingRequest) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use PingRequest.ProtoReflect.Descriptor instead.
 func (*PingRequest) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{0}
+	return file_chat_proto_rawDescGZIP(), []int{0}
 }
 
 type PingResponse struct {
@@ -67,7 +67,7 @@ type PingResponse struct {
 
 func (x *PingResponse) Reset() {
 	*x = PingResponse{}
-	mi := &file_proto_chat_proto_msgTypes[1]
+	mi := &file_chat_proto_msgTypes[1]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -79,7 +79,7 @@ func (x *PingResponse) String() string {
 func (*PingResponse) ProtoMessage() {}
 
 func (x *PingResponse) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[1]
+	mi := &file_chat_proto_msgTypes[1]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -92,7 +92,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead.
 func (*PingResponse) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{1}
+	return file_chat_proto_rawDescGZIP(), []int{1}
 }
 
 func (x *PingResponse) GetStatus() string {
@@ -119,7 +119,7 @@ type JoinRequest struct {
 
 func (x *JoinRequest) Reset() {
 	*x = JoinRequest{}
-	mi := &file_proto_chat_proto_msgTypes[2]
+	mi := &file_chat_proto_msgTypes[2]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -131,7 +131,7 @@ func (x *JoinRequest) String() string {
 func (*JoinRequest) ProtoMessage() {}
 
 func (x *JoinRequest) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[2]
+	mi := &file_chat_proto_msgTypes[2]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -144,7 +144,7 @@ func (x *JoinRequest) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead.
 func (*JoinRequest) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{2}
+	return file_chat_proto_rawDescGZIP(), []int{2}
 }
 
 func (x *JoinRequest) GetUserId() string {
@@ -173,7 +173,7 @@ type Message struct {
 
 func (x *Message) Reset() {
 	*x = Message{}
-	mi := &file_proto_chat_proto_msgTypes[3]
+	mi := &file_chat_proto_msgTypes[3]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -185,7 +185,7 @@ func (x *Message) String() string {
 func (*Message) ProtoMessage() {}
 
 func (x *Message) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[3]
+	mi := &file_chat_proto_msgTypes[3]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -198,7 +198,7 @@ func (x *Message) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use Message.ProtoReflect.Descriptor instead.
 func (*Message) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{3}
+	return file_chat_proto_rawDescGZIP(), []int{3}
 }
 
 func (x *Message) GetUserId() string {
@@ -239,7 +239,7 @@ type MessageAck struct {
 
 func (x *MessageAck) Reset() {
 	*x = MessageAck{}
-	mi := &file_proto_chat_proto_msgTypes[4]
+	mi := &file_chat_proto_msgTypes[4]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -251,7 +251,7 @@ func (x *MessageAck) String() string {
 func (*MessageAck) ProtoMessage() {}
 
 func (x *MessageAck) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[4]
+	mi := &file_chat_proto_msgTypes[4]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -264,7 +264,7 @@ func (x *MessageAck) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use MessageAck.ProtoReflect.Descriptor instead.
 func (*MessageAck) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{4}
+	return file_chat_proto_rawDescGZIP(), []int{4}
 }
 
 func (x *MessageAck) GetSuccess() bool {
@@ -281,11 +281,12 @@ func (x *MessageAck) GetMessageId() string {
 	return ""
 }
 
-var File_proto_chat_proto protoreflect.FileDescriptor
+var File_chat_proto protoreflect.FileDescriptor
 
-const file_proto_chat_proto_rawDesc = "" +
+const file_chat_proto_rawDesc = "" +
 	"\n" +
-	"\x10proto/chat.proto\x12\x04chat\"\r\n" +
+	"\n" +
+	"chat.proto\x12\x04chat\"\r\n" +
 	"\vPingRequest\"D\n" +
 	"\fPingResponse\x12\x16\n" +
 	"\x06status\x18\x01 \x01(\tR\x06status\x12\x1c\n" +
@@ -306,29 +307,29 @@ const file_proto_chat_proto_rawDesc = "" +
 	"\vChatService\x12.\n" +
 	"\bJoinChat\x12\x11.chat.JoinRequest\x1a\r.chat.Message0\x01\x12.\n" +
 	"\vSendMessage\x12\r.chat.Message\x1a\x10.chat.MessageAck\x12-\n" +
-	"\x04Ping\x12\x11.chat.PingRequest\x1a\x12.chat.PingResponseB\rZ\v./chat;chatb\x06proto3"
+	"\x04Ping\x12\x11.chat.PingRequest\x1a\x12.chat.PingResponseB\x0eZ\f../chat;chatb\x06proto3"
 
 var (
-	file_proto_chat_proto_rawDescOnce sync.Once
-	file_proto_chat_proto_rawDescData []byte
+	file_chat_proto_rawDescOnce sync.Once
+	file_chat_proto_rawDescData []byte
 )
 
-func file_proto_chat_proto_rawDescGZIP() []byte {
-	file_proto_chat_proto_rawDescOnce.Do(func() {
-		file_proto_chat_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_chat_proto_rawDesc), len(file_proto_chat_proto_rawDesc)))
+func file_chat_proto_rawDescGZIP() []byte {
+	file_chat_proto_rawDescOnce.Do(func() {
+		file_chat_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_chat_proto_rawDesc), len(file_chat_proto_rawDesc)))
 	})
-	return file_proto_chat_proto_rawDescData
+	return file_chat_proto_rawDescData
 }
 
-var file_proto_chat_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
-var file_proto_chat_proto_goTypes = []any{
+var file_chat_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_chat_proto_goTypes = []any{
 	(*PingRequest)(nil),  // 0: chat.PingRequest
 	(*PingResponse)(nil), // 1: chat.PingResponse
 	(*JoinRequest)(nil),  // 2: chat.JoinRequest
 	(*Message)(nil),      // 3: chat.Message
 	(*MessageAck)(nil),   // 4: chat.MessageAck
 }
-var file_proto_chat_proto_depIdxs = []int32{
+var file_chat_proto_depIdxs = []int32{
 	2, // 0: chat.ChatService.JoinChat:input_type -> chat.JoinRequest
 	3, // 1: chat.ChatService.SendMessage:input_type -> chat.Message
 	0, // 2: chat.ChatService.Ping:input_type -> chat.PingRequest
@@ -342,26 +343,26 @@ var file_proto_chat_proto_depIdxs = []int32{
 	0, // [0:0] is the sub-list for field type_name
 }
 
-func init() { file_proto_chat_proto_init() }
-func file_proto_chat_proto_init() {
-	if File_proto_chat_proto != nil {
+func init() { file_chat_proto_init() }
+func file_chat_proto_init() {
+	if File_chat_proto != nil {
 		return
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
-			RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_chat_proto_rawDesc), len(file_proto_chat_proto_rawDesc)),
+			RawDescriptor: unsafe.Slice(unsafe.StringData(file_chat_proto_rawDesc), len(file_chat_proto_rawDesc)),
 			NumEnums:      0,
 			NumMessages:   5,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
-		GoTypes:           file_proto_chat_proto_goTypes,
-		DependencyIndexes: file_proto_chat_proto_depIdxs,
-		MessageInfos:      file_proto_chat_proto_msgTypes,
+		GoTypes:           file_chat_proto_goTypes,
+		DependencyIndexes: file_chat_proto_depIdxs,
+		MessageInfos:      file_chat_proto_msgTypes,
 	}.Build()
-	File_proto_chat_proto = out.File
-	file_proto_chat_proto_goTypes = nil
-	file_proto_chat_proto_depIdxs = nil
+	File_chat_proto = out.File
+	file_chat_proto_goTypes = nil
+	file_chat_proto_depIdxs = nil
 }

+ 2 - 2
client/chat/chat_grpc.pb.go

@@ -2,7 +2,7 @@
 // versions:
 // - protoc-gen-go-grpc v1.5.1
 // - protoc             v3.15.1
-// source: proto/chat.proto
+// source: chat.proto
 
 package chat
 
@@ -197,5 +197,5 @@ var ChatService_ServiceDesc = grpc.ServiceDesc{
 			ServerStreams: true,
 		},
 	},
-	Metadata: "proto/chat.proto",
+	Metadata: "chat.proto",
 }

+ 3 - 1
client/config.json

@@ -1,5 +1,7 @@
 {
   "serviceAddress1": "jybx3-webtest.jydev.jianyu360.com:50051",
   "serviceAddress": "127.0.0.1:50051",
-  "informationDelay": 20
+  "informationDelay": 20,
+  "personName": "jianyu",
+  "password": "jianyu@123"
 }

+ 2 - 0
client/config/config.go

@@ -8,6 +8,8 @@ import (
 type Config struct {
 	ServiceAddress   string `json:"serviceAddress"`
 	InformationDelay int    `json:"informationDelay"`
+	PersonName       string `json:"personName"`
+	Password         string `json:"password"`
 }
 
 var (

+ 1 - 1
client/proto/chat.proto

@@ -2,7 +2,7 @@ syntax = "proto3";
 
 package chat;
 
-option go_package = "./chat;chat";
+option go_package = "../chat;chat";
 
 service ChatService {
   rpc JoinChat(JoinRequest) returns (stream Message);

+ 179 - 76
client/service/chatClient.go

@@ -1,9 +1,11 @@
 package service
 
 import (
+	"client/config"
 	"context"
 	"fmt"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
 	"log"
 	"math"
 	"os"
@@ -15,18 +17,17 @@ import (
 	. "client/chat"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/connectivity"
-	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/status"
 )
 
 const (
-	initialReconnectInterval = 1 * time.Second
-	keepaliveTime            = 60 * time.Second
-	keepaliveTimeout         = 20 * time.Second
-	maxRetryCount            = 60
-	connectionTimeout        = 3 * time.Second
-	maxReconnectInterval     = 60 * time.Second
+	initialReconnectInterval = 2 * time.Second
+	keepaliveTime            = 30 * time.Second
+	keepaliveTimeout         = 10 * time.Second
+	maxRetryCount            = math.MaxInt32
+	connectionTimeout        = 5 * time.Second
+	maxReconnectInterval     = 120 * time.Second
 	healthCheckInterval      = 30 * time.Second
 )
 
@@ -61,21 +62,24 @@ func NewChatClient(userID, address string) *ChatClient {
 }
 
 // 连接服务器
-func (c *ChatClient) connect() error {
+func (c *ChatClient) connect(password string) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-
+	// 1. 检查现有连接是否可用
+	// 1. 检查现有连接是否可用
 	if c.isConnected && c.conn.GetState() == connectivity.Ready {
 		return nil
 	}
 
+	// 创建gRPC连接(明文传输,仅用于测试)
 	log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
 	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
 	defer cancel()
 
 	conn, err := grpc.DialContext(ctx, c.serviceAddress,
-		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithTransportCredentials(insecure.NewCredentials()), // 关键修改
 		grpc.WithBlock(),
+		grpc.WithPerRPCCredentials(&authCreds{password: password}),
 		grpc.WithDefaultCallOptions(
 			grpc.MaxCallRecvMsgSize(20*1024*1024),
 			grpc.MaxCallSendMsgSize(20*1024*1024),
@@ -189,55 +193,47 @@ func (c *ChatClient) reconnect() {
 	c.wg.Add(1)
 	defer c.wg.Done()
 
-	log.Printf("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
-
 	for {
 		select {
 		case <-c.ctx.Done():
-			log.Println("[重连] 上下文取消,停止重连")
 			return
 		default:
-			c.mu.Lock()
-			if c.retryCount >= maxRetryCount {
-				log.Printf("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
-				c.mu.Unlock()
+			// 彻底断开旧连接
+			c.disconnect()
+
+			// 尝试新连接
+			err := c.connect(config.Cfg.Password)
+			if err == nil {
+				log.Printf("重连成功")
+				go c.establishStream()
 				return
 			}
-			c.mu.Unlock()
-
-			log.Printf("[重连] 尝试第%d次连接...", currentRetry+1)
-			if err := c.connect(); err != nil {
-				log.Printf("[重连][用户:%s] 连接失败: %v", c.userID, err)
-
-				c.mu.Lock()
-				c.retryCount++
-				currentRetry = c.retryCount
-				c.mu.Unlock()
-
-				// 指数退避算法
-				backoff := initialReconnectInterval
-				if currentRetry > 0 {
-					backoff = time.Duration(math.Min(
-						float64(initialReconnectInterval)*math.Pow(1.5, float64(currentRetry)),
-						float64(maxReconnectInterval),
-					))
-				}
 
-				select {
-				case <-time.After(backoff):
-					continue
-				case <-c.ctx.Done():
-					log.Printf("[重连][用户:%s] 等待期间上下文取消", c.userID)
-					return
-				}
-			} else {
-				log.Printf("[重连][用户:%s] 连接成功!", c.userID)
-				go c.establishStream()
+			// 错误处理
+			if shouldStopRetry(err) {
+				log.Printf("不可恢复错误,停止重连")
+				return
+			}
+
+			// 智能退避
+			backoff := calculateBackoff(currentRetry)
+			log.Printf("等待 %v 后重试", backoff)
+
+			select {
+			case <-time.After(backoff):
+				currentRetry++
+			case <-c.ctx.Done():
 				return
 			}
 		}
 	}
 }
+func calculateBackoff(retryCount int) time.Duration {
+	base := float64(initialReconnectInterval)
+	max := float64(maxReconnectInterval)
+	backoff := base * math.Pow(1.5, float64(retryCount))
+	return time.Duration(math.Min(backoff, max))
+}
 
 // 建立流
 func (c *ChatClient) establishStream() {
@@ -287,13 +283,31 @@ func (c *ChatClient) establishStream() {
 func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
 	for {
 		msg, err := stream.Recv()
+		if msg == nil {
+			continue
+		}
+		// 添加空消息检查
 		if err != nil {
 			if status.Code(err) == codes.Canceled {
 				return nil // 正常关闭
 			}
-			return fmt.Errorf("接收消息错误: %v", err)
+
+			// 流错误时立即触发重连
+			log.Printf("[流] 接收错误: %v,触发重连", err)
+			go c.reconnect()
+			return err
 		}
 
+		if msg == nil || msg.Text == "" || msg.UserId == "" {
+			continue
+		}
+
+		if err != nil {
+			if status.Code(err) == codes.Canceled {
+				return nil // 正常关闭
+			}
+			return fmt.Errorf("接收消息错误: %v", err)
+		}
 		log.Printf("[接收] 收到消息: %+v", msg)
 		if msg.UserId == "系统" {
 			switch msg.Action {
@@ -319,7 +333,8 @@ func (c *ChatClient) SendMessage(text, action string) error {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
 	if !c.isReady() {
-		return fmt.Errorf("未连接服务器")
+		go c.reconnect() // 立即触发重连
+		return fmt.Errorf("未连接服务器,正在尝试重连...")
 	}
 
 	msg := &Message{
@@ -332,12 +347,19 @@ func (c *ChatClient) SendMessage(text, action string) error {
 	ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
 	defer cancel()
 
-	_, err := c.client.SendMessage(ctx, msg)
+	aaa, err := c.client.SendMessage(ctx, msg)
+	log.Println(aaa)
 	if err != nil {
-		log.Printf("[发送] 发送失败: %v", err)
-		c.disconnect()
-		go c.reconnect()
-		return err
+		st, ok := status.FromError(err)
+		if ok {
+			switch st.Code() {
+			case codes.Unavailable, codes.DeadlineExceeded:
+				go c.reconnect() // 网络问题触发重连
+			case codes.Unauthenticated, codes.PermissionDenied:
+				// 认证错误不需要重连
+			}
+		}
+		return fmt.Errorf("发送失败: %v", err)
 	}
 	return nil
 }
@@ -347,8 +369,8 @@ func ConnectGRPC(userId, address string) {
 	log.Println("[主程序] 启动GRPC连接")
 	client = NewChatClient(userId, address)
 	defer client.Shutdown()
-
-	if err := client.connect(); err != nil {
+	go client.startConnectionMonitor()
+	if err := client.connect(config.Cfg.Password); err != nil {
 		log.Printf("[主程序] 初始连接失败: %v", err)
 		go client.reconnect()
 	} else {
@@ -368,38 +390,41 @@ func (c *ChatClient) isReady() bool {
 	}
 
 	state := c.conn.GetState()
-	// 只有当状态是Ready时才返回true
-	return state == connectivity.Ready
-}
+	if state != connectivity.Ready {
+		return false
+	}
 
-// startHealthCheck 启动健康检查
-func (c *ChatClient) startHealthCheck() {
-	if c.healthCheckTicker != nil {
-		c.healthCheckTicker.Stop()
+	// 增加更严格的活跃时间检查
+	if time.Since(c.lastPingTime) > 3*keepaliveTime {
+		return false
 	}
 
+	return true
+}
+
+// 修改健康检查实现
+func (c *ChatClient) startHealthCheck() {
 	c.healthCheckTicker = time.NewTicker(healthCheckInterval)
 	c.wg.Add(1)
 
 	go func() {
 		defer c.wg.Done()
-		defer c.healthCheckTicker.Stop()
 		for {
 			select {
 			case <-c.healthCheckTicker.C:
 				if !c.isReady() {
-					log.Printf("[健康检查][用户:%s] 连接不可用,触发重连", c.userID)
+					log.Printf("连接不可用,触发重连")
 					go c.reconnect()
 					continue
 				}
 
-				// 执行Ping检查
-				ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
+				// 主动健康检查
+				ctx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
 				_, err := c.client.Ping(ctx, &PingRequest{})
 				cancel()
 
 				if err != nil {
-					log.Printf("[健康检查][用户:%s] Ping失败: %v", c.userID, err)
+					log.Printf("健康检查失败: %v", err)
 					c.disconnect()
 					go c.reconnect()
 				} else {
@@ -409,7 +434,6 @@ func (c *ChatClient) startHealthCheck() {
 				}
 
 			case <-c.ctx.Done():
-				log.Printf("[健康检查][用户:%s] 停止健康检查", c.userID)
 				return
 			}
 		}
@@ -421,18 +445,30 @@ func (c *ChatClient) checkHealth() error {
 	ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
 	defer cancel()
 
-	// 优先使用JoinChat作为健康检查
-	_, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: true})
+	// 方法1:使用已有方法检查
+	_, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false})
 	if err != nil {
-		st, ok := status.FromError(err)
-		if ok && st.Code() == codes.Unimplemented {
-			// 如果JoinChat未实现,尝试其他方法
-			return c.fallbackHealthCheck()
-		}
-		return err
+		return fmt.Errorf("健康检查失败: %w", err)
 	}
+
 	return nil
 }
+func (c *ChatClient) checkHealthWithRetry(maxRetry int) error {
+	var lastErr error
+	for i := 0; i < maxRetry; i++ {
+		ctx, cancel := context.WithTimeout(c.ctx, 3*time.Second)
+		_, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false})
+		cancel()
+
+		if err == nil {
+			return nil
+		}
+
+		lastErr = err
+		time.Sleep(time.Duration(i+1) * time.Second) // 指数退避
+	}
+	return lastErr
+}
 func (c *ChatClient) fallbackHealthCheck() error {
 	// 实现其他健康检查方式
 	return nil
@@ -450,3 +486,70 @@ func (c *ChatClient) Shutdown() {
 func (c *ChatClient) log() *log.Logger {
 	return log.New(os.Stdout, fmt.Sprintf("[用户:%s] ", c.userID), log.LstdFlags)
 }
+
+// 实现 credentials.PerRPCCredentials 接口
+type authCreds struct {
+	password string
+}
+
+func (c *authCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
+	return map[string]string{
+		"password": c.password,
+	}, nil
+}
+
+func (c *authCreds) RequireTransportSecurity() bool {
+	return false // 必须使用TLS
+}
+func (c *ChatClient) startConnectionMonitor() {
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+
+		ticker := time.NewTicker(1 * time.Minute) // 更频繁的检查
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				c.mu.RLock()
+				conn := c.conn
+				c.mu.RUnlock()
+
+				if conn == nil {
+					log.Printf("[监控] 连接不存在,触发重连")
+					go c.reconnect()
+					continue
+				}
+
+				state := conn.GetState()
+				log.Printf("[监控] 当前连接状态: %v", state)
+
+				// 更全面的状态检查
+				if state == connectivity.TransientFailure ||
+					state == connectivity.Shutdown ||
+					(state == connectivity.Ready && time.Since(c.lastPingTime) > 2*keepaliveTime) {
+					log.Printf("[监控] 连接异常,触发重连")
+					go c.reconnect()
+				}
+
+			case <-c.ctx.Done():
+				log.Printf("[监控] 监控停止")
+				return
+			}
+		}
+	}()
+}
+func shouldStopRetry(err error) bool {
+	if st, ok := status.FromError(err); ok {
+		switch st.Code() {
+		case codes.Unavailable, codes.DeadlineExceeded:
+			return false // 可恢复错误
+		case codes.Unauthenticated, codes.PermissionDenied:
+			return true // 认证错误
+		case codes.Unimplemented:
+			return true // 方法未实现
+		}
+	}
+	return false
+}

+ 7 - 7
client/service/wx.go

@@ -131,7 +131,6 @@ func WxHandle() {
 func GetContacts() {
 	// 创建联系人数据切片
 	returnData := []map[string]interface{}{}
-	log.Println(app.WxClient.GetContacts())
 	// 遍历所有联系人
 	for _, c := range app.WxClient.GetContacts() {
 		// 过滤不需要的联系人类型:
@@ -149,6 +148,7 @@ func GetContacts() {
 			"remark":      c.Remark, // 备注名
 			"phone":       phone,
 			"appellation": name,
+			"personName":  config.Cfg.PersonName,
 		})
 
 	}
@@ -303,7 +303,7 @@ func doSendTalk(dataStr string) error {
 	dataMap := gconv.Map(dataStr)
 	taskId := gconv.String(dataMap["taskId"])
 	userMap := gconv.Map(dataMap["user"])
-	replyLanguage := gconv.String(dataMap["replyLanguage"])
+	//replyLanguage := gconv.String(dataMap["replyLanguage"])
 	wxId := gconv.String(userMap["wxid"])
 	appellation := gconv.String(userMap["appellation"])
 	batchCode := gconv.String(dataMap["batchCode"])
@@ -339,14 +339,14 @@ func doSendTalk(dataStr string) error {
 			}
 		}
 	}
-	time.Sleep(3 * time.Second)
-	if replyLanguage != "" {
-		ok1, err := sendText(replyLanguage, wxId)
+	//time.Sleep(3 * time.Second)
+	/*if replyLanguage != "" {
+		ok1, err := sendText(replyLanguage, wxId, "")
 		if !ok1 {
 			ok = false
 			log.Println(err)
 		}
-	}
+	}*/
 
 	// 发送回执
 	returnData := map[string]interface{}{
@@ -388,7 +388,7 @@ func Reject(text string) {
 	}
 	switch contentType {
 	case ContentTypeText, ContentTypeLink:
-		sendText(content, wxId)
+		sendText(content, wxId, "")
 	case ContentTypeImage:
 		sendImage(content, wxId, "A101")
 	case ContentTypeVideo:

+ 52 - 42
rpc/chat/chat.pb.go

@@ -2,7 +2,7 @@
 // versions:
 // 	protoc-gen-go v1.36.6
 // 	protoc        v3.15.1
-// source: proto/chat.proto
+// source: chat.proto
 
 package chat
 
@@ -29,7 +29,7 @@ type PingRequest struct {
 
 func (x *PingRequest) Reset() {
 	*x = PingRequest{}
-	mi := &file_proto_chat_proto_msgTypes[0]
+	mi := &file_chat_proto_msgTypes[0]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -41,7 +41,7 @@ func (x *PingRequest) String() string {
 func (*PingRequest) ProtoMessage() {}
 
 func (x *PingRequest) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[0]
+	mi := &file_chat_proto_msgTypes[0]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -54,19 +54,20 @@ func (x *PingRequest) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use PingRequest.ProtoReflect.Descriptor instead.
 func (*PingRequest) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{0}
+	return file_chat_proto_rawDescGZIP(), []int{0}
 }
 
 type PingResponse struct {
 	state         protoimpl.MessageState `protogen:"open.v1"`
-	Status        string                 `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` // 可以返回服务器状态
+	Status        string                 `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
+	Timestamp     int64                  `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
 	unknownFields protoimpl.UnknownFields
 	sizeCache     protoimpl.SizeCache
 }
 
 func (x *PingResponse) Reset() {
 	*x = PingResponse{}
-	mi := &file_proto_chat_proto_msgTypes[1]
+	mi := &file_chat_proto_msgTypes[1]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -78,7 +79,7 @@ func (x *PingResponse) String() string {
 func (*PingResponse) ProtoMessage() {}
 
 func (x *PingResponse) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[1]
+	mi := &file_chat_proto_msgTypes[1]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -91,7 +92,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead.
 func (*PingResponse) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{1}
+	return file_chat_proto_rawDescGZIP(), []int{1}
 }
 
 func (x *PingResponse) GetStatus() string {
@@ -101,6 +102,13 @@ func (x *PingResponse) GetStatus() string {
 	return ""
 }
 
+func (x *PingResponse) GetTimestamp() int64 {
+	if x != nil {
+		return x.Timestamp
+	}
+	return 0
+}
+
 type JoinRequest struct {
 	state         protoimpl.MessageState `protogen:"open.v1"`
 	UserId        string                 `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"`
@@ -111,7 +119,7 @@ type JoinRequest struct {
 
 func (x *JoinRequest) Reset() {
 	*x = JoinRequest{}
-	mi := &file_proto_chat_proto_msgTypes[2]
+	mi := &file_chat_proto_msgTypes[2]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -123,7 +131,7 @@ func (x *JoinRequest) String() string {
 func (*JoinRequest) ProtoMessage() {}
 
 func (x *JoinRequest) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[2]
+	mi := &file_chat_proto_msgTypes[2]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -136,7 +144,7 @@ func (x *JoinRequest) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead.
 func (*JoinRequest) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{2}
+	return file_chat_proto_rawDescGZIP(), []int{2}
 }
 
 func (x *JoinRequest) GetUserId() string {
@@ -165,7 +173,7 @@ type Message struct {
 
 func (x *Message) Reset() {
 	*x = Message{}
-	mi := &file_proto_chat_proto_msgTypes[3]
+	mi := &file_chat_proto_msgTypes[3]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -177,7 +185,7 @@ func (x *Message) String() string {
 func (*Message) ProtoMessage() {}
 
 func (x *Message) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[3]
+	mi := &file_chat_proto_msgTypes[3]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -190,7 +198,7 @@ func (x *Message) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use Message.ProtoReflect.Descriptor instead.
 func (*Message) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{3}
+	return file_chat_proto_rawDescGZIP(), []int{3}
 }
 
 func (x *Message) GetUserId() string {
@@ -231,7 +239,7 @@ type MessageAck struct {
 
 func (x *MessageAck) Reset() {
 	*x = MessageAck{}
-	mi := &file_proto_chat_proto_msgTypes[4]
+	mi := &file_chat_proto_msgTypes[4]
 	ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 	ms.StoreMessageInfo(mi)
 }
@@ -243,7 +251,7 @@ func (x *MessageAck) String() string {
 func (*MessageAck) ProtoMessage() {}
 
 func (x *MessageAck) ProtoReflect() protoreflect.Message {
-	mi := &file_proto_chat_proto_msgTypes[4]
+	mi := &file_chat_proto_msgTypes[4]
 	if x != nil {
 		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
 		if ms.LoadMessageInfo() == nil {
@@ -256,7 +264,7 @@ func (x *MessageAck) ProtoReflect() protoreflect.Message {
 
 // Deprecated: Use MessageAck.ProtoReflect.Descriptor instead.
 func (*MessageAck) Descriptor() ([]byte, []int) {
-	return file_proto_chat_proto_rawDescGZIP(), []int{4}
+	return file_chat_proto_rawDescGZIP(), []int{4}
 }
 
 func (x *MessageAck) GetSuccess() bool {
@@ -273,14 +281,16 @@ func (x *MessageAck) GetMessageId() string {
 	return ""
 }
 
-var File_proto_chat_proto protoreflect.FileDescriptor
+var File_chat_proto protoreflect.FileDescriptor
 
-const file_proto_chat_proto_rawDesc = "" +
+const file_chat_proto_rawDesc = "" +
+	"\n" +
 	"\n" +
-	"\x10proto/chat.proto\x12\x04chat\"\r\n" +
-	"\vPingRequest\"&\n" +
+	"chat.proto\x12\x04chat\"\r\n" +
+	"\vPingRequest\"D\n" +
 	"\fPingResponse\x12\x16\n" +
-	"\x06status\x18\x01 \x01(\tR\x06status\"<\n" +
+	"\x06status\x18\x01 \x01(\tR\x06status\x12\x1c\n" +
+	"\ttimestamp\x18\x02 \x01(\x03R\ttimestamp\"<\n" +
 	"\vJoinRequest\x12\x17\n" +
 	"\auser_id\x18\x01 \x01(\tR\x06userId\x12\x14\n" +
 	"\x05force\x18\x02 \x01(\bR\x05force\"l\n" +
@@ -297,29 +307,29 @@ const file_proto_chat_proto_rawDesc = "" +
 	"\vChatService\x12.\n" +
 	"\bJoinChat\x12\x11.chat.JoinRequest\x1a\r.chat.Message0\x01\x12.\n" +
 	"\vSendMessage\x12\r.chat.Message\x1a\x10.chat.MessageAck\x12-\n" +
-	"\x04Ping\x12\x11.chat.PingRequest\x1a\x12.chat.PingResponseB\rZ\v./chat;chatb\x06proto3"
+	"\x04Ping\x12\x11.chat.PingRequest\x1a\x12.chat.PingResponseB\x0eZ\f../chat;chatb\x06proto3"
 
 var (
-	file_proto_chat_proto_rawDescOnce sync.Once
-	file_proto_chat_proto_rawDescData []byte
+	file_chat_proto_rawDescOnce sync.Once
+	file_chat_proto_rawDescData []byte
 )
 
-func file_proto_chat_proto_rawDescGZIP() []byte {
-	file_proto_chat_proto_rawDescOnce.Do(func() {
-		file_proto_chat_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_proto_chat_proto_rawDesc), len(file_proto_chat_proto_rawDesc)))
+func file_chat_proto_rawDescGZIP() []byte {
+	file_chat_proto_rawDescOnce.Do(func() {
+		file_chat_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_chat_proto_rawDesc), len(file_chat_proto_rawDesc)))
 	})
-	return file_proto_chat_proto_rawDescData
+	return file_chat_proto_rawDescData
 }
 
-var file_proto_chat_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
-var file_proto_chat_proto_goTypes = []any{
+var file_chat_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
+var file_chat_proto_goTypes = []any{
 	(*PingRequest)(nil),  // 0: chat.PingRequest
 	(*PingResponse)(nil), // 1: chat.PingResponse
 	(*JoinRequest)(nil),  // 2: chat.JoinRequest
 	(*Message)(nil),      // 3: chat.Message
 	(*MessageAck)(nil),   // 4: chat.MessageAck
 }
-var file_proto_chat_proto_depIdxs = []int32{
+var file_chat_proto_depIdxs = []int32{
 	2, // 0: chat.ChatService.JoinChat:input_type -> chat.JoinRequest
 	3, // 1: chat.ChatService.SendMessage:input_type -> chat.Message
 	0, // 2: chat.ChatService.Ping:input_type -> chat.PingRequest
@@ -333,26 +343,26 @@ var file_proto_chat_proto_depIdxs = []int32{
 	0, // [0:0] is the sub-list for field type_name
 }
 
-func init() { file_proto_chat_proto_init() }
-func file_proto_chat_proto_init() {
-	if File_proto_chat_proto != nil {
+func init() { file_chat_proto_init() }
+func file_chat_proto_init() {
+	if File_chat_proto != nil {
 		return
 	}
 	type x struct{}
 	out := protoimpl.TypeBuilder{
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
-			RawDescriptor: unsafe.Slice(unsafe.StringData(file_proto_chat_proto_rawDesc), len(file_proto_chat_proto_rawDesc)),
+			RawDescriptor: unsafe.Slice(unsafe.StringData(file_chat_proto_rawDesc), len(file_chat_proto_rawDesc)),
 			NumEnums:      0,
 			NumMessages:   5,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
-		GoTypes:           file_proto_chat_proto_goTypes,
-		DependencyIndexes: file_proto_chat_proto_depIdxs,
-		MessageInfos:      file_proto_chat_proto_msgTypes,
+		GoTypes:           file_chat_proto_goTypes,
+		DependencyIndexes: file_chat_proto_depIdxs,
+		MessageInfos:      file_chat_proto_msgTypes,
 	}.Build()
-	File_proto_chat_proto = out.File
-	file_proto_chat_proto_goTypes = nil
-	file_proto_chat_proto_depIdxs = nil
+	File_chat_proto = out.File
+	file_chat_proto_goTypes = nil
+	file_chat_proto_depIdxs = nil
 }

+ 2 - 2
rpc/chat/chat_grpc.pb.go

@@ -2,7 +2,7 @@
 // versions:
 // - protoc-gen-go-grpc v1.5.1
 // - protoc             v3.15.1
-// source: proto/chat.proto
+// source: chat.proto
 
 package chat
 
@@ -197,5 +197,5 @@ var ChatService_ServiceDesc = grpc.ServiceDesc{
 			ServerStreams: true,
 		},
 	},
-	Metadata: "proto/chat.proto",
+	Metadata: "chat.proto",
 }

+ 2 - 1
rpc/config.json

@@ -40,5 +40,6 @@
   "replyLanguage": "拒收请回复R",
   "replyAutomatically": "已收到您的拒收反馈,感谢您的配合。",
   "webPort": "50053",
-  "grpcWebPort": "50051"
+  "grpcWebPort": "50051",
+  "password": "jianyu@123"
 }

+ 1 - 0
rpc/config/config.go

@@ -26,6 +26,7 @@ type Conf struct {
 	ReplyLanguage      string
 	GrpcWebPort        string
 	WebPort            string
+	Password           string
 }
 type Mongo struct {
 	Address string `json:"address"`

+ 50 - 19
rpc/main.go

@@ -6,9 +6,13 @@ import (
 	"context"
 	"fmt"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/health"
 	"google.golang.org/grpc/health/grpc_health_v1"
 	"google.golang.org/grpc/keepalive"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
 	"log"
 	"net"
 	"net/http"
@@ -32,25 +36,21 @@ type server struct {
 }
 
 func main() {
-
 	// 初始化配置检查
 	if config.DbConf == nil {
 		log.Fatal("配置未初始化")
 	}
-
 	// 创建服务器实例
 	srv := &server{
 		startTime: time.Now(),
 	}
-
 	// 设置上下文
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-
 	go func() {
-		srv.initGRPCServer()
+		srv.initGRPCServer(config.DbConf.Password)
 		// 启动服务
-		srv.startServices(ctx)
+		srv.startServices()
 		// 设置定时任务
 		srv.setupTimedTasks(ctx)
 		// 等待终止信号
@@ -84,31 +84,30 @@ func (s *server) initHTTPServer() {
 	mux1 := http.NewServeMux()
 	xweb.RunBase(":"+config.DbConf.WebPort, mux1)
 }
-func (s *server) initGRPCServer() {
-	// 创建健康检查服务
-	s.healthServer = health.NewServer()
+func (s *server) initGRPCServer(password string) {
 	s.grpcServer = grpc.NewServer(
+		grpc.Creds(insecure.NewCredentials()),
+		grpc.ChainUnaryInterceptor(
+			PasswordAuthInterceptor(password),
+		),
 		grpc.KeepaliveParams(keepalive.ServerParameters{
-			Time:    60 * time.Second,
-			Timeout: 120 * time.Second,
+			Time:    3 * time.Minute,
+			Timeout: 30 * time.Second,
 		}),
 		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
 			MinTime:             30 * time.Second,
 			PermitWithoutStream: true,
 		}),
-		grpc.MaxRecvMsgSize(20*1024*1024),
-		grpc.MaxSendMsgSize(20*1024*1024),
 	)
-
-	// 注册服务
-
+	// 3. 注册服务
 	RegisterChatServiceServer(s.grpcServer, service.Chatserver)
+	s.healthServer = health.NewServer()
 	grpc_health_v1.RegisterHealthServer(s.grpcServer, s.healthServer)
 	s.healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
 }
 
-func (s *server) startServices(ctx context.Context) {
-	// 启动gRPC服务
+func (s *server) startServices() {
+	// 只保留服务启动逻辑
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%s", config.DbConf.GrpcWebPort))
 	if err != nil {
 		log.Fatalf("无法监听gRPC端口 %s: %v", config.DbConf.GrpcWebPort, err)
@@ -122,7 +121,6 @@ func (s *server) startServices(ctx context.Context) {
 		}
 	}()
 }
-
 func (s *server) setupTimedTasks(ctx context.Context) {
 	tasks := []struct {
 		interval time.Duration
@@ -186,3 +184,36 @@ func (s *server) gracefulShutdown() {
 
 	log.Println("服务已完全关闭")
 }
+
+// 认证拦截器
+func PasswordAuthInterceptor(password string) grpc.UnaryServerInterceptor {
+	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
+		// 从上下文中获取元数据
+		md, ok := metadata.FromIncomingContext(ctx)
+		if !ok {
+			log.Printf("认证失败: 缺少元数据 (请求方法: %s)", info.FullMethod)
+			return MessageAck{
+				MessageId: "缺少认证信息",
+			}, status.Error(codes.Unauthenticated, "缺少认证信息")
+		}
+
+		// 检查密码
+		pass := md.Get("password")
+		if len(pass) == 0 {
+			log.Printf("认证失败: 未提供密码 (请求方法: %s)", info.FullMethod)
+			return MessageAck{
+				MessageId: "未提供密码",
+			}, status.Error(codes.Unauthenticated, "未提供密码")
+		}
+
+		if pass[0] != password {
+			log.Printf("认证失败: 密码错误 (请求方法: %s)", info.FullMethod)
+			return MessageAck{
+				MessageId: "密码错误",
+			}, status.Error(codes.PermissionDenied, "密码错误")
+		}
+
+		log.Printf("认证成功 (请求方法: %s)", info.FullMethod)
+		return handler(ctx, req)
+	}
+}

+ 13 - 9
rpc/proto/chat.proto

@@ -2,31 +2,35 @@ syntax = "proto3";
 
 package chat;
 
-option go_package = "./chat;chat";
+option go_package = "../chat;chat";
 
 service ChatService {
-  rpc JoinChat(JoinRequest) returns (stream Message); // 服务端流
+  rpc JoinChat(JoinRequest) returns (stream Message);
   rpc SendMessage(Message) returns (MessageAck);
-  rpc Ping (PingRequest) returns (PingResponse);
-}
-message PingRequest {}  // 空请求
+  rpc Ping(PingRequest) returns (PingResponse);
 
+}
+message PingRequest {}
 message PingResponse {
-  string status = 1;  // 可以返回服务器状态
+  string status = 1;
+  int64 timestamp = 2;
 }
 message JoinRequest {
   string user_id = 1;
-  bool force = 2;
+  bool force=2;
 }
 
 message Message {
   string user_id = 1;
   string text = 2;
   int64 timestamp = 3;
-  string   action = 4;
+  string   action =4;
 }
 
 message MessageAck {
   bool success = 1;
   string message_id = 2;
-}
+}
+
+
+

+ 133 - 0
rpc/service/instructionOperation.go

@@ -0,0 +1,133 @@
+package service
+
+import (
+	util "app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/date"
+	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+	"log"
+	"rpc/config"
+	"time"
+)
+
+// 通讯录同步
+func SynchronousContacts(robotCode, contactsStr string) error {
+	// 1. 先更新所有联系人为待删除状态
+	err := config.WxRobot.Update("user_address_book",
+		map[string]interface{}{"robotCode": robotCode},
+		map[string]interface{}{"status": 1})
+
+	// 2. 转换输入数据并检查错误
+	contacts := gconv.Maps(contactsStr)
+	if contacts == nil || len(contacts) == 0 {
+		return fmt.Errorf("解析联系人数据失败: %v", err)
+	}
+
+	// 3. 批量处理联系人
+	for _, v := range contacts {
+		wxId := gconv.String(v["wxid"])
+		phone := gconv.String(v["phone"])
+
+		// 查询用户数据
+		datas, err := config.Mgo.Find("user", map[string]interface{}{
+			"i_appid": 2,
+			"$or": []map[string]interface{}{
+				{"s_phone": phone},
+				{"s_m_phone": phone},
+			},
+		}, `{"s_phone":-1}`, `{"s_phone":1,"s_m_phone":1,"base_user_id":1}`, false, 0, 1)
+
+		if datas == nil || len(*datas) == 0 {
+			// 没有匹配的用户,跳过处理(已标记为待删除)
+			continue
+		}
+		// 准备更新/插入数据
+		baseUserId := gconv.Int64((*datas)[0]["base_user_id"])
+		contactData := map[string]interface{}{
+			"remark":       gconv.String(v["remark"]),
+			"appellation":  gconv.String(v["appellation"]),
+			"phone":        phone,
+			"base_user_id": baseUserId,
+			"update_time":  time.Now().Format(date.Date_Full_Layout),
+			"status":       0,
+			"personName":   gconv.String(v["personName"]),
+		}
+
+		// 检查是否存在
+		count := config.WxRobot.Count("user_address_book",
+			map[string]interface{}{"wxid": wxId, "robotCode": robotCode})
+		ok := false
+		if count > 0 {
+			// 更新现有记录
+			ok = config.WxRobot.Update("user_address_book",
+				map[string]interface{}{"wxid": wxId, "robotCode": robotCode},
+				contactData)
+		} else {
+			data := config.WxRobot.FindOne("user_address_book", map[string]interface{}{
+				"wxid": wxId,
+			}, "", "")
+			is_refuse := 0
+			if data != nil && len(*datas) > 0 {
+				is_refuse = gconv.Int((*data)["is_refuse"])
+			}
+			// 插入新记录
+			contactData["wxid"] = wxId
+			contactData["code"] = gconv.String(v["code"])
+			contactData["name"] = gconv.String(v["name"])
+			contactData["is_refuse"] = is_refuse
+			contactData["create_time"] = time.Now().Format(date.Date_Full_Layout)
+			contactData["robotCode"] = robotCode
+			ok = config.WxRobot.Insert("user_address_book", contactData) > 0
+		}
+		if !ok {
+			// 记录错误但继续处理其他联系人
+			log.Printf("处理联系人 %s 失败: %v", wxId, err)
+		}
+	}
+	return nil
+}
+
+// 聊天记录保存
+func AddChatRecord(robotCode string, data string) {
+	chatData := gconv.Map(data)
+	wxId := gconv.String(chatData["roomid"])
+	content := gconv.String(chatData["content"])
+	userData := config.WxRobot.FindOne("user_address_book", map[string]interface{}{
+		"wxid":      wxId,
+		"robotCode": robotCode,
+	}, "", "")
+	if userData == nil {
+		return
+	}
+	if len(*userData) > 0 {
+		baseUserId := gconv.Int64((*userData)["base_user_id"])
+		if (content == "R" || content == "r") && !gconv.Bool(chatData["IsSelf"]) {
+			if config.WxRobot.Count("send_record", map[string]interface{}{
+				"base_user_id": baseUserId,
+				"send_status":  0,
+			}) > 0 {
+				config.WxRobot.Update("user_address_book", map[string]interface{}{
+					"wxid": wxId,
+				}, map[string]interface{}{
+					"is_refuse":   1,
+					"update_time": time.Now().Format(date.Date_Full_Layout),
+				})
+				//发送一条 拒绝自动回复
+				Chatserver.SpecifysystemMessage(robotCode, wxId, map[string]interface{}{
+					"content":      config.DbConf.ReplyAutomatically,
+					"content_type": 0,
+				}, "reject")
+			}
+
+		}
+		config.WxRobot.Insert("chat_history", map[string]interface{}{
+			"create_time":  time.Now().Format(date.Date_Full_Layout),
+			"robotCode":    robotCode,
+			"other_wxid":   wxId,
+			"content":      gconv.String(chatData["content"]),
+			"content_type": 1,
+			"is_own_send":  util.If(gconv.Bool(chatData["IsSelf"]), 1, 0),
+		})
+	}
+
+}

+ 0 - 131
rpc/service/service.go → rpc/service/scheduledTasks.go

@@ -2,23 +2,15 @@ package service
 
 import (
 	util "app.yhyue.com/moapp/jybase/common"
-	"app.yhyue.com/moapp/jybase/date"
 	"app.yhyue.com/moapp/jybase/redis"
 	"fmt"
 	"github.com/gogf/gf/v2/util/gconv"
 	"log"
 	"rpc/config"
 	"strings"
-	"sync"
 	"time"
 )
 
-var (
-	seq      int64
-	lastDay  string
-	seqMutex sync.Mutex
-)
-
 func Task() {
 	log.Println("定时任务查询")
 	allUserMap := InitUser()
@@ -175,129 +167,6 @@ func IsSameTime(timeStr1, timeStr2, layout string) bool {
 	return t1Str == timeStr2
 }
 
-// 通讯录同步
-func SynchronousContacts(robotCode, contactsStr string) error {
-	// 1. 先更新所有联系人为待删除状态
-	err := config.WxRobot.Update("user_address_book",
-		map[string]interface{}{"robotCode": robotCode},
-		map[string]interface{}{"status": 1})
-
-	// 2. 转换输入数据并检查错误
-	contacts := gconv.Maps(contactsStr)
-	if contacts == nil || len(contacts) == 0 {
-		return fmt.Errorf("解析联系人数据失败: %v", err)
-	}
-
-	// 3. 批量处理联系人
-	for _, v := range contacts {
-		wxId := gconv.String(v["wxid"])
-		phone := gconv.String(v["phone"])
-
-		// 查询用户数据
-		datas, err := config.Mgo.Find("user", map[string]interface{}{
-			"i_appid": 2,
-			"$or": []map[string]interface{}{
-				{"s_phone": phone},
-				{"s_m_phone": phone},
-			},
-		}, `{"s_phone":-1}`, `{"s_phone":1,"s_m_phone":1,"base_user_id":1}`, false, 0, 1)
-
-		if datas == nil || len(*datas) == 0 {
-			// 没有匹配的用户,跳过处理(已标记为待删除)
-			continue
-		}
-
-		// 准备更新/插入数据
-		baseUserId := gconv.Int64((*datas)[0]["base_user_id"])
-		contactData := map[string]interface{}{
-			"remark":       gconv.String(v["remark"]),
-			"appellation":  gconv.String(v["appellation"]),
-			"phone":        phone,
-			"base_user_id": baseUserId,
-			"update_time":  time.Now().Format(date.Date_Full_Layout),
-			"status":       0,
-		}
-
-		// 检查是否存在
-		count := config.WxRobot.Count("user_address_book",
-			map[string]interface{}{"wxid": wxId, "robotCode": robotCode})
-		ok := false
-		if count > 0 {
-			// 更新现有记录
-			ok = config.WxRobot.Update("user_address_book",
-				map[string]interface{}{"wxid": wxId, "robotCode": robotCode},
-				contactData)
-		} else {
-			data := config.WxRobot.FindOne("user_address_book", map[string]interface{}{
-				"wxid": wxId,
-			}, "", "")
-			is_refuse := 0
-			if data != nil && len(*datas) > 0 {
-				is_refuse = gconv.Int((*data)["is_refuse"])
-			}
-			// 插入新记录
-			contactData["wxid"] = wxId
-			contactData["code"] = gconv.String(v["code"])
-			contactData["name"] = gconv.String(v["name"])
-			contactData["is_refuse"] = is_refuse
-			contactData["create_time"] = time.Now().Format(date.Date_Full_Layout)
-			contactData["robotCode"] = robotCode
-			ok = config.WxRobot.Insert("user_address_book", contactData) > 0
-		}
-
-		if !ok {
-			// 记录错误但继续处理其他联系人
-			log.Printf("处理联系人 %s 失败: %v", wxId, err)
-		}
-	}
-	return nil
-}
-
-// 聊天记录保存
-func AddChatRecord(robotCode string, data string) {
-	chatData := gconv.Map(data)
-	wxId := gconv.String(chatData["roomid"])
-	content := gconv.String(chatData["content"])
-	userData := config.WxRobot.FindOne("user_address_book", map[string]interface{}{
-		"wxid":      wxId,
-		"robotCode": robotCode,
-	}, "", "")
-	if userData == nil {
-		return
-	}
-	if len(*userData) > 0 {
-		baseUserId := gconv.Int64((*userData)["base_user_id"])
-		if (content == "R" || content == "r") && !gconv.Bool(chatData["IsSelf"]) {
-			if config.WxRobot.Count("send_record", map[string]interface{}{
-				"base_user_id": baseUserId,
-				"send_status":  0,
-			}) > 0 {
-				config.WxRobot.Update("user_address_book", map[string]interface{}{
-					"wxid": wxId,
-				}, map[string]interface{}{
-					"is_refuse":   1,
-					"update_time": time.Now().Format(date.Date_Full_Layout),
-				})
-				//发送一条 拒绝自动回复
-				/*Chatserver.SpecifysystemMessage(robotCode, wxId, map[string]interface{}{
-					"content":      config.DbConf.ReplyAutomatically,
-					"content_type": 0,
-				}, "reject")*/
-			}
-
-		}
-		config.WxRobot.Insert("chat_history", map[string]interface{}{
-			"create_time":  time.Now().Format(date.Date_Full_Layout),
-			"robotCode":    robotCode,
-			"other_wxid":   wxId,
-			"content":      gconv.String(chatData["content"]),
-			"content_type": 1,
-			"is_own_send":  util.If(gconv.Bool(chatData["IsSelf"]), 1, 0),
-		})
-	}
-
-}
-
 // 用户信息初始化
 func InitUser() map[int64]map[string]interface{} {
 	allUserMap := make(map[int64]map[string]interface{})