소스 검색

优化代码

WH01243 2 주 전
부모
커밋
d61699ce6e
7개의 변경된 파일106개의 추가작업 그리고 126개의 파일을 삭제
  1. 1 1
      client/config.json
  2. 1 1
      client/log.go
  3. 36 20
      client/service/chatClient.go
  4. 1 52
      rpc/go.mod
  5. 2 6
      rpc/go.sum
  6. 65 45
      rpc/service/service.go
  7. 0 1
      rpc/service/wxRobot.go

+ 1 - 1
client/config.json

@@ -1,4 +1,4 @@
 {
-  "serviceAddress": "192.168.137.1:50051",
+  "serviceAddress": "jybx3-webtest.jydev.jianyu360.com:50051",
   "informationDelay": 20
 }

+ 1 - 1
client/log.go

@@ -37,7 +37,7 @@ func initLog(saveDay int) {
 	go logfile()
 	task := cron.New()
 	task.Start()
-	task.AddFunc("0 0 0/24 * * ?", func() {
+	task.AddFunc("0 0 0/12 * * ?", func() {
 		go logfile()
 		time.Sleep(50 * time.Second)
 		if saveDay > 0 {

+ 36 - 20
client/service/chatClient.go

@@ -119,7 +119,14 @@ func (c *ChatClient) connect() error {
 	c.retryCount = 0
 	c.lastPingTime = time.Now()
 	// 启动健康检查
-	c.startHealthCheck()
+	if c.healthCheckTicker != nil {
+		c.healthCheckTicker.Stop()
+		c.healthCheckTicker = nil
+	}
+	go func() {
+		time.Sleep(10 * time.Second) // 给连接稳定时间
+		c.startHealthCheck()
+	}()
 	log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
 	return nil
 }
@@ -237,37 +244,39 @@ func (c *ChatClient) establishStream() {
 	c.wg.Add(1)
 	defer c.wg.Done()
 
+	retryDelay := time.Second
 	for {
 		select {
 		case <-c.ctx.Done():
 			return
 		default:
-			c.mu.RLock()
-			connected := c.isConnected
-			client := c.client
-			c.mu.RUnlock()
-
-			if !connected {
-				time.Sleep(1 * time.Second)
+			// 添加连接状态检查
+			if !c.isReady() {
+				time.Sleep(retryDelay)
+				retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
 				continue
 			}
 
-			c.streamMutex.Lock()
-			stream, err := client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
+			// 尝试建立流
+			stream, err := func() (ChatService_JoinChatClient, error) {
+				c.streamMutex.Lock()
+				defer c.streamMutex.Unlock()
+				return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
+			}()
+
 			if err != nil {
-				c.streamMutex.Unlock()
-				log.Printf("[流] 建立流失败: %v", err)
-				c.disconnect()
-				go c.reconnect()
-				return
+				log.Printf("[流] 建立流失败: %v,等待重试...", err)
+				time.Sleep(retryDelay)
+				retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
+				continue
 			}
-			c.stream = stream
-			c.streamMutex.Unlock()
 
+			// 重置重试延迟
+			retryDelay = time.Second
+
+			// 处理消息
 			if err := c.receiveMessages(stream); err != nil {
 				log.Printf("[流] 接收消息错误: %v", err)
-				c.disconnect()
-				go c.reconnect()
 				return
 			}
 		}
@@ -353,7 +362,14 @@ func ConnectGRPC(userId, address string) {
 func (c *ChatClient) isReady() bool {
 	c.mu.RLock()
 	defer c.mu.RUnlock()
-	return c.isConnected && c.conn != nil && c.conn.GetState() == connectivity.Ready
+
+	if !c.isConnected || c.conn == nil {
+		return false
+	}
+
+	state := c.conn.GetState()
+	// 只有当状态是Ready时才返回true
+	return state == connectivity.Ready
 }
 
 // startHealthCheck 启动健康检查

+ 1 - 52
rpc/go.mod

@@ -1,55 +1,4 @@
 module rpc
 
-go 1.23.0
+go 1.20
 
-
-require (
-	app.yhyue.com/moapp/jybase v0.0.0-20250509080440-038d69d3ad3b
-	github.com/ClickHouse/clickhouse-go/v2 v2.2.0
-	github.com/go-xweb/xweb v0.2.1
-	github.com/gogf/gf/v2 v2.7.0
-	github.com/robfig/cron v1.2.0
-	google.golang.org/grpc v1.73.0
-	google.golang.org/protobuf v1.36.6
-)
-
-require (
-	filippo.io/edwards25519 v1.1.0 // indirect
-	github.com/fatih/color v1.16.0 // indirect
-	github.com/fsnotify/fsnotify v1.7.0 // indirect
-	github.com/garyburd/redigo v1.6.2 // indirect
-	github.com/go-sql-driver/mysql v1.8.1 // indirect
-	github.com/go-xweb/httpsession v0.0.0-20141220075701-356d3b4d38d6 // indirect
-	github.com/go-xweb/log v0.0.0-20140701090824-270d183ad77e // indirect
-	github.com/go-xweb/uuid v0.0.0-20140604020037-d7dce341f851 // indirect
-	github.com/golang/snappy v0.0.4 // indirect
-	github.com/gomodule/redigo v2.0.0+incompatible // indirect
-	github.com/google/uuid v1.6.0 // indirect
-	github.com/howeyc/fsnotify v0.9.0 // indirect
-	github.com/jinzhu/inflection v1.0.0 // indirect
-	github.com/jinzhu/now v1.1.1 // indirect
-	github.com/klauspost/compress v1.18.0 // indirect
-	github.com/lunny/csession v0.0.0-20130910075847-fe53c5de3dfd // indirect
-	github.com/magiconair/properties v1.8.7 // indirect
-	github.com/mattn/go-sqlite3 v1.14.28 // indirect
-	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
-	github.com/paulmach/orb v0.11.1 // indirect
-	github.com/pierrec/lz4/v4 v4.1.22 // indirect
-	github.com/shopspring/decimal v1.4.0 // indirect
-	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
-	github.com/xdg-go/scram v1.1.2 // indirect
-	github.com/xdg-go/stringprep v1.0.4 // indirect
-	github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
-	go.mongodb.org/mongo-driver v1.14.0 // indirect
-	go.opentelemetry.io/otel v1.36.0 // indirect
-	go.opentelemetry.io/otel/sdk v1.36.0 // indirect
-	go.opentelemetry.io/otel/trace v1.36.0 // indirect
-	golang.org/x/crypto v0.39.0 // indirect
-	golang.org/x/net v0.41.0 // indirect
-	golang.org/x/sync v0.15.0 // indirect
-	golang.org/x/sys v0.33.0 // indirect
-	golang.org/x/text v0.26.0 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
-	gorm.io/driver/mysql v1.0.5 // indirect
-	gorm.io/gorm v1.21.3 // indirect
-)

+ 2 - 6
rpc/go.sum

@@ -45,8 +45,6 @@ github.com/gogf/gf/v2 v2.7.0 h1:CjxhbMiE7oqf6K8ZtGuKt3dQEwK4vL6LhiI+dI7tJGU=
 github.com/gogf/gf/v2 v2.7.0/go.mod h1:Qu8nimKt9aupJQcdUL85tWF4Mfxocz97zUt8UC4abVI=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
-github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
-github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -158,8 +156,6 @@ go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCRE
 go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs=
 go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs=
 go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY=
-go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
-go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
 go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
 go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
 go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
@@ -224,8 +220,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g=
 google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
-google.golang.org/grpc v1.73.0 h1:VIWSmpI2MegBtTuFt5/JWy2oXxtjJ/e89Z70ImfD2ok=
-google.golang.org/grpc v1.73.0/go.mod h1:50sbHOUqWoCQGI8V2HQLJM0B+LMlIUjNSZmow7EVBQc=
+google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
+google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=

+ 65 - 45
rpc/service/service.go

@@ -176,63 +176,80 @@ func IsSameTime(timeStr1, timeStr2, layout string) bool {
 }
 
 // 通讯录同步
-func SynchronousContacts(robotCode, contactsStr string) {
-	config.WxRobot.Update("user_address_book", map[string]interface{}{
-		"robotCode": robotCode,
-	}, map[string]interface{}{
-		"status": 1,
-	})
-	for _, v := range gconv.Maps(contactsStr) {
+func SynchronousContacts(robotCode, contactsStr string) error {
+	// 1. 先更新所有联系人为待删除状态
+	err := config.WxRobot.Update("user_address_book",
+		map[string]interface{}{"robotCode": robotCode},
+		map[string]interface{}{"status": 1})
+
+	// 2. 转换输入数据并检查错误
+	contacts := gconv.Maps(contactsStr)
+	if contacts == nil || len(contacts) == 0 {
+		return fmt.Errorf("解析联系人数据失败: %v", err)
+	}
+
+	// 3. 批量处理联系人
+	for _, v := range contacts {
 		wxId := gconv.String(v["wxid"])
 		phone := gconv.String(v["phone"])
-		datas, _ := config.Mgo.Find("user", map[string]interface{}{
+
+		// 查询用户数据
+		datas, err := config.Mgo.Find("user", map[string]interface{}{
 			"i_appid": 2,
 			"$or": []map[string]interface{}{
 				{"s_phone": phone},
 				{"s_m_phone": phone},
 			},
 		}, `{"s_phone":-1}`, `{"s_phone":1,"s_m_phone":1,"base_user_id":1}`, false, 0, 1)
-		if datas != nil || len(*datas) == 0 {
-			//删除通讯录
-			config.WxRobot.Update("user_address_book", map[string]interface{}{
-				"wxid":      wxId,
-				"robotCode": robotCode,
-			}, map[string]interface{}{
-				"status": 1,
-			})
+
+		if datas == nil || len(*datas) == 0 {
+			// 没有匹配的用户,跳过处理(已标记为待删除)
+			continue
 		}
-		//做一个通讯录变更
+
+		// 准备更新/插入数据
 		baseUserId := gconv.Int64((*datas)[0]["base_user_id"])
-		if config.WxRobot.Count("user_address_book", map[string]interface{}{
-			"wxid": wxId, "robotCode": robotCode}) > 0 {
-			//修改数据
-			config.WxRobot.Update("user_address_book", map[string]interface{}{
-				"wxid":      wxId,
-				"robotCode": robotCode,
-			}, map[string]interface{}{
-				"remark":       gconv.String(v["remark"]),
-				"phone":        phone,
-				"base_user_id": baseUserId,
-				"update_time":  time.Now().Format(date.Date_Full_Layout),
-				"status":       0,
-			})
+		contactData := map[string]interface{}{
+			"remark":       gconv.String(v["remark"]),
+			"phone":        phone,
+			"base_user_id": baseUserId,
+			"update_time":  time.Now().Format(date.Date_Full_Layout),
+			"status":       0,
+		}
+
+		// 检查是否存在
+		count := config.WxRobot.Count("user_address_book",
+			map[string]interface{}{"wxid": wxId, "robotCode": robotCode})
+		ok := false
+		if count > 0 {
+			// 更新现有记录
+			ok = config.WxRobot.Update("user_address_book",
+				map[string]interface{}{"wxid": wxId, "robotCode": robotCode},
+				contactData)
 		} else {
-			//新增数据
-			config.WxRobot.Insert("user_address_book", map[string]interface{}{
-				"remark":       gconv.String(v["remark"]),
-				"phone":        phone,
-				"base_user_id": baseUserId,
-				"status":       0,
-				"wxid":         wxId,
-				"code":         gconv.String(v["code"]),
-				"name":         gconv.String(v["name"]),
-				"is_refuse":    0,
-				"create_time":  time.Now().Format(date.Date_Full_Layout),
-				"update_time":  time.Now().Format(date.Date_Full_Layout),
-				"robotCode":    robotCode,
-			})
+			data := config.WxRobot.FindOne("user_address_book", map[string]interface{}{
+				"wxid": wxId,
+			}, "", "")
+			is_refuse := 0
+			if data != nil && len(*datas) > 0 {
+				is_refuse = gconv.Int((*data)["is_refuse"])
+			}
+			// 插入新记录
+			contactData["wxid"] = wxId
+			contactData["code"] = gconv.String(v["code"])
+			contactData["name"] = gconv.String(v["name"])
+			contactData["is_refuse"] = is_refuse
+			contactData["create_time"] = time.Now().Format(date.Date_Full_Layout)
+			contactData["robotCode"] = robotCode
+			ok = config.WxRobot.Insert("user_address_book", contactData) > 0
+		}
+
+		if !ok {
+			// 记录错误但继续处理其他联系人
+			log.Printf("处理联系人 %s 失败: %v", wxId, err)
 		}
 	}
+	return nil
 }
 
 // 聊天记录保存
@@ -244,6 +261,9 @@ func AddChatRecord(robotCode string, data string) {
 		"wxid":      wxId,
 		"robotCode": robotCode,
 	}, "", "")
+	if userData == nil {
+		return
+	}
 	if len(*userData) > 0 {
 		baseUserId := gconv.Int64((*userData)["base_user_id"])
 		if (content == "R" || content == "r") && !gconv.Bool(chatData["IsSelf"]) {
@@ -252,7 +272,7 @@ func AddChatRecord(robotCode string, data string) {
 				"send_status":  0,
 			}) > 0 {
 				config.WxRobot.Update("user_address_book", map[string]interface{}{
-					"base_user_id": baseUserId,
+					"wxid": wxId,
 				}, map[string]interface{}{
 					"is_refuse":   1,
 					"update_time": time.Now().Format(date.Date_Full_Layout),

+ 0 - 1
rpc/service/wxRobot.go

@@ -98,7 +98,6 @@ func (a *WXroBot) UpdateTaskHandler() {
 			"task_type":       req.TaskType,
 			"send_time_type":  req.SendTimeType,
 			"send_time":       req.SendTime,
-			"task_status":     0,
 			"update_time":     time.Now().Format(time.DateTime),
 			"send_group_name": req.SendGroupName,
 		})