소스 검색

邮件修改

Jianghan 1 년 전
부모
커밋
eb3d9fb4db

+ 1 - 0
CMPlatform/history/datamodel.go

@@ -189,6 +189,7 @@ type GlobalAddWord struct {
 */
 
 type XlsxData struct {
+	send      bool
 	xlsxArr   []map[string]interface{}
 	xlsxCount int
 }

+ 9 - 8
CMPlatform/history/historytask.go

@@ -13,7 +13,6 @@ import (
 	"time"
 
 	"app.yhyue.com/moapp/jybase/common"
-	"app.yhyue.com/moapp/jybase/date"
 	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
 	"app.yhyue.com/moapp/jybase/log"
 	"app.yhyue.com/moapp/jybase/mongodb"
@@ -52,7 +51,7 @@ func (this *HistoryData) HistoryTask(history_id string) {
 	//加载一个客户
 	log.Debug("history_id", zap.String("history_id", history_id))
 	//是否根据项目id去重
-	xlsxData := &XlsxData{}
+	xlsxData := &XlsxData{send: true}
 	customer, _ := util.Mgo.Find("historylog", map[string]interface{}{"_id": mongodb.StringTOBsonId(history_id)}, nil, nil, false, -1, -1)
 	if len(*customer) == 1 {
 		c := (*customer)[0]
@@ -89,7 +88,8 @@ func (this *HistoryData) HistoryTask(history_id string) {
 		cus.SaveDataArr = []map[string]interface{}{}
 		cus.tempChan = make(chan *tempData, 2000)
 		cus.tempChanOver = make(chan bool)
-		cus.saveTempColl = fmt.Sprintf("temp_history_%s_%d", date.NowFormat(date.Date_yyyyMMdd), time.Now().Unix())
+		//cus.saveTempColl = fmt.Sprintf("temp_history_%s_%d", date.NowFormat(date.Date_yyyyMMdd), time.Now().Unix())
+		cus.saveTempColl = fmt.Sprintf("temp_history_%s", history_id)
 
 		cus.ID = customerId
 		cus.Name = customer_name
@@ -174,18 +174,19 @@ func (this *HistoryData) HistoryTask(history_id string) {
 				UpdateJyqyfwDatacount(appId, xlsxData.xlsxCount)
 			}
 		}
+		UpdateHistoryState(2, history_id, xlsxData.xlsxCount)
 		//发邮件
 		if i_pushtype == 0 {
 			time.Sleep(3 * time.Second)
-			if len(xlsxData.xlsxArr) != xlsxData.xlsxCount {
-				log.Error("excel数据量错误", zap.Int("xlsxArr", len(xlsxData.xlsxArr)), zap.Int("xlsxCount", xlsxData.xlsxCount))
+			if xlsxData.send {
+				if len(xlsxData.xlsxArr) != xlsxData.xlsxCount {
+					log.Error("excel数据量错误", zap.Int("xlsxArr", len(xlsxData.xlsxArr)), zap.Int("xlsxCount", xlsxData.xlsxCount))
+				}
+				GetXlsxs(xlsxData.xlsxArr, iContact, customer_name, email, history_id, isfile, isHenanMobile, isfilehref, appId)
 			}
-			GetXlsxs(xlsxData.xlsxArr, iContact, customer_name, email, history_id, isfile, isHenanMobile, isfilehref, appId)
-			UpdateHistoryState(2, history_id, xlsxData.xlsxCount)
 			xlsxData.xlsxArr = []map[string]interface{}{}
 			xlsxData.xlsxCount = 0
 		} else {
-			UpdateHistoryState(2, history_id, xlsxData.xlsxCount)
 			xlsxData.xlsxArr = []map[string]interface{}{}
 			xlsxData.xlsxCount = 0
 		}

+ 29 - 23
CMPlatform/history/task.go

@@ -274,7 +274,7 @@ func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *elastic.E
 
 func (c *Customer) processedData() {
 	ch := make(chan bool, 10)
-	// L:
+L:
 	for {
 		select {
 		case tmp := <-c.tempChan:
@@ -285,25 +285,25 @@ func (c *Customer) processedData() {
 				}()
 				processedData_A(c, tmp)
 			}(tmp)
-			// default:
-			// 	select {
-			// 	case tmp := <-c.tempChan:
-			// 		ch <- true
-			// 		go func(tmp *tempData) {
-			// 			defer func() {
-			// 				<-ch
-			// 			}()
-			// 			processedData_A(c, tmp)
-			// 		}(tmp)
-			// 	case <-c.tempChanOver:
-			// 		c.saveTempLock.Lock()
-			// 		if len(c.saveTempArr) > 0 {
-			// 			util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
-			// 			c.saveTempArr = [][]map[string]interface{}{}
-			// 		}
-			// 		c.saveTempLock.Unlock()
-			// 		break L
-			// 	}
+		default:
+			select {
+			case tmp := <-c.tempChan:
+				ch <- true
+				go func(tmp *tempData) {
+					defer func() {
+						<-ch
+					}()
+					processedData_A(c, tmp)
+				}(tmp)
+			case <-c.tempChanOver:
+				c.saveTempLock.Lock()
+				if len(c.saveTempArr) > 0 {
+					util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
+					c.saveTempArr = [][]map[string]interface{}{}
+				}
+				c.saveTempLock.Unlock()
+				break L
+			}
 		}
 	}
 }
@@ -412,7 +412,9 @@ func processedData_A(c *Customer, data *tempData) {
 	/*
 		到此已经匹配完数据
 	*/
-	log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
+	if !IsMatch {
+		log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
+	}
 	if IsMatch {
 		//匹配成功,数据上新增规则id,matchKey,item并临时保存数据
 		// tmpMatchKey := MapDataToArr(matchKey)
@@ -625,11 +627,15 @@ func TagRuleFuc(appid string, pushModel, tag int, rules []*TagRule, sr *SearchRu
 }
 
 func (c *Customer) saveBeforeData(historyId string, isFilter, noticeFilter, dataTable, entId, iContact int, xlsxData *XlsxData) {
-	log.Debug("保存数据前置处理...")
+	log.Debug("保存数据前置处理..." + historyId)
 	sess := util.MgoSave.GetMgoConn()
 	defer util.MgoSave.DestoryMongoConn(sess)
 	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
+	// 数据量大于2w 不发邮件
+	if util.MgoSave.Count(c.saveTempColl, nil) > 20000 {
+		xlsxData.send = false
+	}
 	query := sess.DB(util.MgoSave.DbName).C(c.saveTempColl).Find(nil).Select(nil).Iter()
 	count := 0
 	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
@@ -649,7 +655,7 @@ func (c *Customer) saveBeforeData(historyId string, isFilter, noticeFilter, data
 		tmp = make(map[string]interface{})
 	}
 	wg.Wait()
-	log.Debug("saveBeforeData over ---", zap.Int("count", count))
+	log.Debug("saveBeforeData over ---", zap.Int("count", count), zap.String("historyId", historyId))
 }
 
 // 数据去重

+ 6 - 2
CMPlatform/history/util_history.go

@@ -740,7 +740,9 @@ func AssembelSave(tmp map[string]interface{}, IsSearchHosp, IsSearchEnps bool, h
 			if len(indexdata) > 0 {
 				for _, v := range indexdata {
 					MgoSaveCache <- v
-					xlsxData.xlsxArr = append(xlsxData.xlsxArr, v)
+					if xlsxData.send {
+						xlsxData.xlsxArr = append(xlsxData.xlsxArr, v)
+					}
 					xlsxData.xlsxCount++
 				}
 			}
@@ -998,7 +1000,9 @@ func AssembelSave(tmp map[string]interface{}, IsSearchHosp, IsSearchEnps bool, h
 			}
 		}
 		//MgoSaveCache <- tmp
-		xlsxData.xlsxArr = append(xlsxData.xlsxArr, tmp)
+		if xlsxData.send {
+			xlsxData.xlsxArr = append(xlsxData.xlsxArr, tmp)
+		}
 		xlsxData.xlsxCount++
 	}
 }

+ 1 - 1
CMPlatform/main.go

@@ -48,7 +48,7 @@ func init() {
 	xweb.AddAction(&push.Push{})
 	xweb.AddAction(&service.Groups{})
 	xweb.RootApp().AppConfig.SessionTimeout = 24 * time.Hour
-	xweb.RootApp().Logger.SetOutputLevel(1)
+	xweb.RootApp().Logger.SetOutputLevel(3)
 	go history.SaveMgo()
 	go history.SaveUsermailMgo()
 	mails, _ := util.Sysconfig["mail"].([]interface{})

+ 2 - 2
CMPlatform/util/config.go

@@ -409,9 +409,9 @@ func InitLog() {
 		log.Path(common.ObjToString(Sysconfig["logpath"])),
 		log.Level("debug"),
 		log.Compress(true),
-		log.MaxSize(20), // M
+		log.MaxSize(100), // M
 		log.MaxBackups(10),
-		log.MaxAge(7),
+		log.MaxAge(5),
 		log.Format("text"), // text or json output
 	)
 	if err != nil {

+ 23 - 23
CMPlatform/util/utiltag.go

@@ -1,21 +1,21 @@
 package util
 
 import (
+	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/encrypt"
+	elastic "app.yhyue.com/moapp/jybase/es"
+	"app.yhyue.com/moapp/jybase/log"
 	"cmplatform/models"
 	sql "cmplatform/util/sqlmodel"
 	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
-	"log"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.uber.org/zap"
 	"regexp"
 	"strings"
 	"time"
-
-	"app.yhyue.com/moapp/jybase/common"
-	"app.yhyue.com/moapp/jybase/encrypt"
-	elastic "app.yhyue.com/moapp/jybase/es"
-	"go.mongodb.org/mongo-driver/bson"
 )
 
 const (
@@ -205,8 +205,8 @@ func searchData(index, esquery, sdataid string, i_maxnum int64, tags map[string]
 	searchResult, err := client.Search(index).Query(cc).Size(int(i_maxnum)).Do(ctx)
 	if err == nil && searchResult.Hits != nil {
 		datas := make([]map[string]interface{}, 0)
-		log.Println("es查询到的数量:", searchResult.Hits.TotalHits.Value)
-		log.Println("es查询数据数量:", len(searchResult.Hits.Hits))
+		log.Info(fmt.Sprintf("es查询到的数量:%d", searchResult.Hits.TotalHits.Value))
+		log.Info(fmt.Sprintf("es查询数据数量:%d", len(searchResult.Hits.Hits)))
 		for _, v := range searchResult.Hits.Hits {
 			item := make(map[string]interface{})
 			if json.Unmarshal(v.Source, &item) == nil {
@@ -298,14 +298,14 @@ func searchData(index, esquery, sdataid string, i_maxnum int64, tags map[string]
 			}
 		}
 		counts := Es.Count(index, Itype, esquery)
-		log.Println("esCount查询的数据量 ", counts)
+		log.Info(fmt.Sprintf("esCount查询的数据量 %d", counts))
 		Mgo.Update("cuserdepartrule", bson.M{"_id": tags["_id"]}, bson.M{
 			"$set": bson.M{
 				"i_estotal": counts,
 			}}, false, false)
 		return UtilEsSaveData(sdataid, index, &datas), counts
 	} else {
-		log.Println("err", err)
+		log.Error("err", zap.Error(err))
 		return err, 0
 	}
 }
@@ -346,7 +346,7 @@ func FilterTimeSql(esquery string, startTime, endTime int64) (string, error) {
 				if err == nil {
 					return string(strQuery), nil
 				} else {
-					log.Println("失败", err)
+					log.Error("失败", zap.Error(err))
 					return esquery, err
 				}
 			} else {
@@ -356,7 +356,7 @@ func FilterTimeSql(esquery string, startTime, endTime int64) (string, error) {
 			}
 		}
 	} else {
-		log.Println(json.Unmarshal([]byte(esquery), &query), "err")
+		log.Error("err", zap.Error(json.Unmarshal([]byte(esquery), &query)))
 		return esquery, errors.New("err")
 	}
 	return esquery, nil
@@ -403,7 +403,7 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 	//}
 	// 获取分隔区间
 	timeList := GetTimeInterval(time.Unix(common.Int64All(tags["i_starttime"]), 0), time.Unix(common.Int64All(tags["i_endtime"]), 0), n)
-	log.Println(timeList, "分隔后的时间===================")
+	log.Info("分隔后的时间===================", zap.Any("", timeList))
 	esQueryList := []string{}
 	//esquery = esquery[:len(esquery)-1] + `,"size":` + fmt.Sprintf("%d", i_maxnum) + `}`
 	// log.Println(esquery)
@@ -423,7 +423,7 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 					}
 				}
 			}
-			log.Println("timeList================", timeList)
+			log.Info("timeList================", zap.Any("", timeList))
 			// 生成新的es语句
 			for _, v := range timeList {
 				tmpRange_ := bson.M{
@@ -445,36 +445,36 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 					esQuery := string(strQuery)
 					esQueryList = append(esQueryList, esQuery)
 				} else {
-					log.Println("失败", err)
+					log.Error("失败", zap.Error(err))
 					return err, 0
 				}
 			}
 		}
 	} else {
-		log.Println(json.Unmarshal([]byte(esquery), &query), "err")
+		log.Error("err", zap.Error(json.Unmarshal([]byte(esquery), &query)))
 	}
 	// for _, v := range esQueryList {
 	// log.Println("es语句", v)
 	// }
-	log.Println("重新生成的es语句", esQueryList)
+	log.Info("重新生成的es语句", zap.Any("", esQueryList))
 	var totalCount int64
 	datas := make([]map[string]interface{}, 0)
 	for _, esSql := range esQueryList {
 		// 查询
 		esSql = esSql[:len(esSql)-1] + `,"size":` + fmt.Sprintf("%d", i_maxnum) + `}`
-		log.Println("本次查询==", esSql)
+		log.Info("本次查询==" + esSql)
 		index := EsIndex
 		if strings.Contains(esSql, "site") { //选择网站名称,使用全量数据索引
 			index = EsAllIndex
 		}
-		log.Println("es索引-------------------- ", index)
+		log.Info("es索引-------------------- " + index)
 		cc := &MySource{
 			Querys: esSql,
 		}
 		searchResult, err := client.Search(index).Query(cc).Do(ctx)
 		if err == nil && searchResult.Hits != nil {
-			log.Println("es查询到的数量:", searchResult.Hits.TotalHits)
-			log.Println("开始处理")
+			log.Info("es查询到的数量:", zap.Int64("", searchResult.Hits.TotalHits.Value))
+			log.Info("开始处理")
 
 			for _, v := range searchResult.Hits.Hits {
 				item := make(map[string]interface{})
@@ -579,12 +579,12 @@ func UtilEsFind2(tags map[string]interface{}, n int) (error, int64) {
 
 				}
 			}
-			log.Println("处理完成", len(datas))
+			log.Info("处理完成", zap.Int("count", len(datas)))
 
 			count := searchResult.Hits.TotalHits.Value
 			totalCount += count
 		} else {
-			log.Println(err)
+			log.Error("err", zap.Error(err))
 			return err, 0
 		}
 	}