Forráskód Böngészése

消息中心发送消息去掉并发

jiaojiao7 3 éve
szülő
commit
fab6397a1e

+ 8 - 4
src/customerService/newsService.go

@@ -618,6 +618,7 @@ func SendMsg(param *Message, sendStatus int) (int, error) {
 	log.Println("*********", msgLogId)
 	log.Println("*********", msgLogId)
 	//立即发送
 	//立即发送
 	if param.SendMode == 2 {
 	if param.SendMode == 2 {
+		j := 0
 		projectIdMap := sync.Map{}
 		projectIdMap := sync.Map{}
 		orm := util.Tidb.NewSession()
 		orm := util.Tidb.NewSession()
 		err := orm.Begin()
 		err := orm.Begin()
@@ -644,24 +645,26 @@ func SendMsg(param *Message, sendStatus int) (int, error) {
 			UserId string `xorm:"userid"`
 			UserId string `xorm:"userid"`
 		})
 		})
 		rows, err := orm.Table("groupcustomers g").Select("u.userid").Join("LEFT", "customers_user u", "g.customerid = u.customerid").In("g.groupid ", strings.Split(param.UserGroupId, ",")).Rows(user)
 		rows, err := orm.Table("groupcustomers g").Select("u.userid").Join("LEFT", "customers_user u", "g.customerid = u.customerid").In("g.groupid ", strings.Split(param.UserGroupId, ",")).Rows(user)
-		if err != nil {
+		if err != nil || rows == nil {
 			log.Println("查询tidb中剑鱼用户出错:", err)
 			log.Println("查询tidb中剑鱼用户出错:", err)
 			return 0, err
 			return 0, err
 		}
 		}
 		log.Println("rows", rows)
 		log.Println("rows", rows)
 		defer rows.Close()
 		defer rows.Close()
 		for rows.Next() {
 		for rows.Next() {
+			j++
 			err = rows.Scan(user)
 			err = rows.Scan(user)
 			if err != nil {
 			if err != nil {
 				log.Println("迭代数据出错", err)
 				log.Println("迭代数据出错", err)
 			}
 			}
-			log.Println("单条数据:", user.UserId)
+			//log.Println("单条数据:", user.UserId)
 			userId := user.UserId
 			userId := user.UserId
 			i++
 			i++
 			if config.SysConfigs.UserIdMap[userId] != "" {
 			if config.SysConfigs.UserIdMap[userId] != "" {
 				userId = config.SysConfigs.UserIdMap[userId]
 				userId = config.SysConfigs.UserIdMap[userId]
 			}
 			}
 			if _, ok := projectIdMap.Load(userId); ok {
 			if _, ok := projectIdMap.Load(userId); ok {
+				log.Println("########################已发送,本次跳过。。。。。。。。。。。。。", userId)
 				continue
 				continue
 			} else {
 			} else {
 				projectIdMap.Store(userId, true)
 				projectIdMap.Store(userId, true)
@@ -711,7 +714,7 @@ func SendMsg(param *Message, sendStatus int) (int, error) {
 			}
 			}
 			if i == 100 {
 			if i == 100 {
 				//调用消息中台
 				//调用消息中台
-				go util.MultipleSaveMessage(msg, userIds, userNames)
+				util.MultipleSaveMessage(msg, userIds, userNames)
 				userNames = ""
 				userNames = ""
 				userIds = ""
 				userIds = ""
 				i = 0
 				i = 0
@@ -720,11 +723,12 @@ func SendMsg(param *Message, sendStatus int) (int, error) {
 		}
 		}
 		if i > 0 {
 		if i > 0 {
 			//调用中台接口
 			//调用中台接口
-			go util.MultipleSaveMessage(msg, userIds, userNames)
+			util.MultipleSaveMessage(msg, userIds, userNames)
 			userNames = ""
 			userNames = ""
 			userIds = ""
 			userIds = ""
 			i = 0
 			i = 0
 		}
 		}
+		log.Println("********************总数:", j)
 		return 1, nil
 		return 1, nil
 	}
 	}
 	return 0, errors.New("发送消息出错")
 	return 0, errors.New("发送消息出错")

+ 2 - 2
src/github.com/coreos/etcd/vendor/github.com/coreos/pkg/capnslog/init.go

@@ -38,12 +38,12 @@ func init() {
 }
 }
 
 
 func NewDefaultFormatter(out io.Writer) Formatter {
 func NewDefaultFormatter(out io.Writer) Formatter {
-	if syscall.Getppid() == 1 {
+	/*if syscall.Getppid() == 1 {
 		// We're running under init, which may be systemd.
 		// We're running under init, which may be systemd.
 		f, err := NewJournaldFormatter()
 		f, err := NewJournaldFormatter()
 		if err == nil {
 		if err == nil {
 			return f
 			return f
 		}
 		}
-	}
+	}*/
 	return NewPrettyFormatter(out, false)
 	return NewPrettyFormatter(out, false)
 }
 }

+ 2 - 2
src/task/task2.go

@@ -304,7 +304,7 @@ func TaskSaveMsg(msgId int, msg *map[string]interface{}, androidUrl, iosUrl stri
 		"msgType":    msgType,
 		"msgType":    msgType,
 		"link":       msgs["link"],
 		"link":       msgs["link"],
 		"appid":      util.AppId,
 		"appid":      util.AppId,
-		"msgLogId":   int64(msgId),
+		"msgLogId":   strconv.Itoa(msgId),
 	}
 	}
 	//本次次改不调用荟聚接口,改为直接查mysql
 	//本次次改不调用荟聚接口,改为直接查mysql
 	user := new(struct {
 	user := new(struct {
@@ -321,7 +321,7 @@ func TaskSaveMsg(msgId int, msg *map[string]interface{}, androidUrl, iosUrl stri
 		if err != nil {
 		if err != nil {
 			log.Println("迭代数据出错", err)
 			log.Println("迭代数据出错", err)
 		}
 		}
-		log.Println("单条数据:", user.UserId)
+		//log.Println("单条数据:", user.UserId)
 		userId := user.UserId
 		userId := user.UserId
 		i++
 		i++
 		if config.SysConfigs.UserIdMap[userId] != "" {
 		if config.SysConfigs.UserIdMap[userId] != "" {