wangchuanjin 2 سال پیش
والد
کامیت
a86572e592
1فایلهای تغییر یافته به همراه55 افزوده شده و 87 حذف شده
  1. 55 87
      jyBXSubscribe/rpc/model/push.go

+ 55 - 87
jyBXSubscribe/rpc/model/push.go

@@ -1,19 +1,9 @@
 package model
 
 import (
-	"app.yhyue.com/moapp/jybase/common"
-	"app.yhyue.com/moapp/jybase/date"
-	"app.yhyue.com/moapp/jybase/encrypt"
-	"app.yhyue.com/moapp/jybase/esv1"
-	"app.yhyue.com/moapp/jybase/mongodb"
-	"app.yhyue.com/moapp/jybase/mysql"
-	"app.yhyue.com/moapp/jybase/redis"
-	"bp.jydev.jianyu360.cn/BaseService/jyCodeService/rpc/codeservice/codeservice"
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/zeromicro/go-zero/core/logx"
-	"go.mongodb.org/mongo-driver/bson/primitive"
 	IC "jyBXSubscribe/rpc/init"
 	ms "jyBXSubscribe/rpc/model/service"
 	"jyBXSubscribe/rpc/type/bxsubscribe"
@@ -22,6 +12,17 @@ import (
 	"strings"
 	"sync"
 	"time"
+
+	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/date"
+	"app.yhyue.com/moapp/jybase/encrypt"
+	"app.yhyue.com/moapp/jybase/esv1"
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"app.yhyue.com/moapp/jybase/mysql"
+	"app.yhyue.com/moapp/jybase/redis"
+	"bp.jydev.jianyu360.cn/BaseService/jyCodeService/rpc/codeservice/codeservice"
+	"github.com/zeromicro/go-zero/core/logx"
+	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
 const (
@@ -805,16 +806,10 @@ func (s *subscribePush) DefaultDatas(spqp *SubPushQueryParam) (hasNextPage bool,
 		//logx.Info(time.Since(t1), "count:", len(*list))
 		if list != nil && len(*list) > 0 {
 			total = int64(len(*list))
-			//超过50条先处理50条 返回前50条
-			listOne := *list
-			if len(*list) > pageSize {
-				listOne = (*list)[:pageSize]
-			}
-			result = s.listManager(spqp, listOne, bsp.Keyword, (len(listOne)+pageSize)/pageSize)
-			if len(*list) > pageSize {
+			result = s.listManager(spqp, *list, bsp.Keyword)
+			if len(result) > pageSize {
+				result = result[:pageSize]
 				hasNextPage = true
-				listOther := (*list)[pageSize:]
-				go s.listManager(spqp, listOther, bsp.Keyword, (len(listOther)+pageSize)/pageSize)
 			}
 		}
 	}
@@ -823,76 +818,49 @@ func (s *subscribePush) DefaultDatas(spqp *SubPushQueryParam) (hasNextPage bool,
 }
 
 // 保存推送表
-func (s *subscribePush) listManager(spqp *SubPushQueryParam, list []map[string]interface{}, keyword []ViewKeyWord, ccount int) (resultList []*bxsubscribe.SubscribeInfo) {
+func (s *subscribePush) listManager(spqp *SubPushQueryParam, list []map[string]interface{}, keyword []ViewKeyWord) (resultList []*bxsubscribe.SubscribeInfo) {
 	t2 := time.Now()
-	//now := time.Now().Unix()
-	var (
-		wg = &sync.WaitGroup{}
-		wc = make(chan bool, ccount)
-	)
+	var pushInsert = []interface{}{}
 	for _, v := range list {
-		wg.Add(1)
-		wc <- true
-		go func(v map[string]interface{}) {
-			defer func() {
-				wg.Done()
-				<-wc
-			}()
-			//redis.Put("pushcache_2_a", redisKey, 1, 86400)
-			title := strings.Replace(common.ObjToString(v["title"]), "\n", "", -1)
-			infoid := common.InterfaceToStr(v["_id"])
-			matchkeys := getKeys(title, keyword)
-			resultList = append(resultList, s.InfoFormat(&PushCa{
-				InfoId:     infoid,
-				Date:       time.Now().Unix(),
-				Index:      1,
-				Keys:       matchkeys,
-				FileExists: v["filetext"] != nil,
-			}, &v))
-			/*redisKey := fmt.Sprintf("pushinfo_%s_%s", spqp.UserId, common.ObjToString(v["_id"]))
-			  title := strings.Replace(common.ObjToString(v["title"]), "\n", "", -1)
-			  var pushInsert = []interface{}{}
-			  infoid := common.InterfaceToStr(v["_id"])
-			  log.Println("infoid", infoid)
-			  spqp.BaseServiceMysql.ExecTx("推送记录保存", func(tx *sql.Tx) bool {
-			  	//推送记录
-			  	matchkeys := getKeys(title, keyword)
-			  	matchkey := strings.Join(matchkeys, " ")
-			  	entid := spqp.EntId
-			  	entUserId := spqp.EntUserId
-			  	deptid := spqp.DeptId
-			  	id := int64(0)
-			  	switch s.ModuleFlag {
-			  	case "eType":
-			  		pushInsert = append(pushInsert, entid, deptid, entUserId, common.InterfaceToStr(infoid), matchkey, now)
-			  		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, ennicheInsertCollKey, pushInsert)
-			  	case "mType":
-			  		pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now)
-			  		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, memberInsertCollKey, pushInsert)
-			  	case "vType":
-			  		pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now, 1)
-			  		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, subscribeInsertCollKey, pushInsert)
-			  	case "fType":
-			  		pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now, 0)
-			  		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, subscribeInsertCollKey, pushInsert)
-
-			  	}
-			  	if id > 0 {
-			  		redis.Put("pushcache_2_a", redisKey, 1, 86400)
-			  		resultList = append(resultList, s.InfoFormat(&PushCa{
-			  			InfoId:     infoid,
-			  			Date:       time.Now().Unix(),
-			  			Index:      id,
-			  			Keys:       matchkeys,
-			  			FileExists: v["filetext"] != nil,
-			  		}, &v))
-			  		return true
-			  	}
-			  	return false
-			  })*/
-		}(v)
-	}
-	wg.Wait()
+		title := strings.Replace(common.ObjToString(v["title"]), "\n", "", -1)
+		infoid := common.InterfaceToStr(v["_id"])
+		matchkeys := getKeys(title, keyword)
+		matchkey := strings.Join(matchkeys, " ")
+		redisKey := fmt.Sprintf("pushinfo_%s_%s", spqp.UserId, common.ObjToString(v["_id"]))
+		entid := spqp.EntId
+		entUserId := spqp.EntUserId
+		deptid := spqp.DeptId
+		switch s.ModuleFlag {
+		case "eType":
+			pushInsert = append(pushInsert, entid, deptid, entUserId, common.InterfaceToStr(infoid), matchkey, now)
+		case "mType":
+			pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now)
+		case "vType":
+			pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now, 1)
+		case "fType":
+			pushInsert = append(pushInsert, common.InterfaceToStr(spqp.NewUserId), common.InterfaceToStr(infoid), matchkey, now, 0)
+		}
+		redis.Put("pushcache_2_a", redisKey, 1, 86400)
+		resultList = append(resultList, s.InfoFormat(&PushCa{
+			InfoId:     infoid,
+			Date:       time.Now().Unix(),
+			Keys:       matchkeys,
+			FileExists: v["filetext"] != nil,
+		}, &v))
+	}
+	id := int64(0)
+	switch s.ModuleFlag {
+	case "eType":
+		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, ennicheInsertCollKey, pushInsert)
+	case "mType":
+		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, memberInsertCollKey, pushInsert)
+	default:
+		id, _ = spqp.BaseServiceMysql.InsertIgnoreBatch(aboutDbMsg[s.ModuleFlag].MysqlTable, subscribeInsertCollKey, pushInsert)
+	}
+	for i := 0; i < len(resultList); i++ {
+		resultList[i].Index = id
+		id--
+	}
 	log.Println("数据处理耗时:", time.Since(t2))
 	return
 }