Ver código fonte

重启,redis判重

xuzhiheng 5 anos atrás
pai
commit
78c74743e8

Diferenças do arquivo suprimidas por serem muito extensas
+ 0 - 47
customerdata/src/config.json


+ 3 - 0
customerdata/src/main.go

@@ -5,6 +5,7 @@ import (
 	"log"
 	qu "qfw/util"
 	"qfw/util/elastic"
+	"qfw/util/redis"
 	"sync"
 	"util/mgodb"
 
@@ -21,6 +22,7 @@ type sysconfig struct {
 	Enterprise    map[string]interface{} `json:"enterprise"`
 	Save          map[string]interface{} `json:"save"`
 	Es            map[string]interface{} `json:"es"`
+	RedisAddrs    string                 `json:"redis_addrs"`
 	ProjectAppid  []string               `json:"project_appid"`
 	SWinnerFilter []string               `json:"s_winner_filter"`
 }
@@ -98,6 +100,7 @@ func init() {
 		I_size:  qu.IntAllDef(es["pool"], 15),
 	}
 	Es.InitElasticSize()
+	redis.InitRedis(Sysconfig.RedisAddrs)
 	Index = qu.ObjToString(es["index"])
 	Itype = qu.ObjToString(es["itype"])
 	//

+ 12 - 3
customerdata/src/task.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	qu "qfw/util"
+	"qfw/util/redis"
 	"regexp"
 	. "sqlmodel"
 	"strings"
@@ -37,14 +38,13 @@ func StartTask() {
 func GetCustomerData() {
 	defer qu.Catch()
 	log.Println("Init Customer...")
-	idRange, ok := GetIdRange() //获取id区间
+	idRange, ok, endId := GetIdRange() //获取id区间
 	if !ok {
 		return
 	}
 	logger.Debug("此次任务ID区间:", idRange)
 	//查询企业库开启推送的客户
 	customers, _ := MgoTag.Find("euser", map[string]interface{}{"i_push": 1, "b_delete": false}, nil, nil)
-
 	for _, c := range customers {
 		customerId := mgoutil.BsonTOStringId(c["_id"])
 		customer := qu.ObjToString(c["s_name"])   //客户名称
@@ -86,6 +86,9 @@ func GetCustomerData() {
 		cus.RemoveRepeatData()    //数据去重
 		cus.AssembelAndSaveData() //组装、保存数据
 	}
+	Sysconfig.LatestId = endId
+	qu.WriteSysConfig(Sysconfig)
+	logger.Debug("定时任务结束-endId-Sysconfig.LatestId ", endId)
 }
 
 //获取客户打标签规则
@@ -206,6 +209,13 @@ func (c *Customer) GetData(stype string) {
 									}
 								}
 								id := qu.ObjToString(tmp["_id"])
+								isExists, err := redis.Exists("datag", c.AppId+"_"+id)
+								if err != nil {
+									log.Println("redis信息id判重出错 ", err)
+								} else if isExists {
+									log.Println("信息id重复 ", id)
+									return
+								}
 								tmp["id"] = id //记录数据原有id
 								delete(tmp, "_id")
 								if sr.ExtFieldType == 2 {
@@ -507,7 +517,6 @@ func (c *Customer) AssembelAndSaveData() {
 
 //获取用户所有规则
 func (d *Department) GetSearchRules(cid, stype string, idRange bson.M) {
-
 	defer qu.Catch()
 	searchRules, _ := MgoTag.Find("euserdepartrule", map[string]interface{}{"s_userid": cid, "s_departid": d.ID, "i_isuse": 1, "b_delete": false}, nil, nil)
 	if len(searchRules) > 0 {

+ 49 - 28
customerdata/src/util.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	qu "qfw/util"
+	"qfw/util/redis"
 	"regexp"
 	"strings"
 	"time"
@@ -187,36 +188,55 @@ func GetNotkeyAndKeyAddWord(list primitive.A, fieldMap map[string]interface{}, h
 }
 
 //根据时间获取起始和终止ID范围
-func GetIdRange() (bson.M, bool) {
+func GetIdRange() (bson.M, bool, string) {
 	defer qu.Catch()
-	now := time.Now().Unix()
-	for { //当前时间一直向前推半小时,直到取到数据
-		now = now - 600 //10分钟前
-		endTime := time.Unix(now, 0)
-		endId := bson.NewObjectIdWithTime(endTime).Hex()
-		if endId > LatestId {
-			esquery := `{"query": {"bool": {"must": [{"range": {"id": {"gt": "` + LatestId + `" , "lte": "` + endId + `"}}}]}}, "sort": [{"comeintime": "desc"}]}`
-			if Es.Count(Index, Itype, esquery) > 0 { //有数据返回id区间
-				list := Es.Get(Index, Itype, esquery)
-				tmpRange := bson.M{
-					"range": bson.M{
-						"id": bson.M{
-							"lte": endId,
-							"gt":  LatestId,
-						},
-					},
-				}
-				LatestId = qu.ObjToString((*list)[0]["_id"])
-				Sysconfig.LatestId = LatestId
-				qu.WriteSysConfig(Sysconfig)
-				return tmpRange, true
-			}
-		} else { //结束id不大于起始id 退出
-			logger.Debug("Search End ID Range Error. Sid:", LatestId, "Eid:", endId)
-			break
-		}
+	// now := time.Now().Unix()
+	// for { //当前时间一直向前推半小时,直到取到数据
+	// now = now - 600 //10分钟前
+	// endTime := time.Unix(now, 0)
+	// endId := bson.NewObjectIdWithTime(endTime).Hex()
+	// if (now1 - now) > 3*600 {
+	// 	return bson.M{"range": bson.M{
+	// 		"id": bson.M{
+	// 			"lte": endId,
+	// 			"gt":  LatestId,
+	// 		},
+	// 	}}, true
+	// }
+	idQuery, endId := "", ""
+	esquery := `{"query":{"filtered":{"filter":{"bool":{"must":{"range":{"id":{"gt":"%s"}}}}}}},"_source":["_id","comeintime"],"sort":{"id":"desc"},"from":0,"size":1}`
+	if LatestId == "" {
+		idQuery = strings.Replace(fmt.Sprintf(esquery, LatestId), `"gt"`, `"gte"`, -1)
+	} else {
+		idQuery = fmt.Sprintf(esquery, LatestId)
 	}
-	return bson.M{}, false
+	resId := Es.Get(Index, Itype, idQuery)
+	if resId != nil && *resId != nil && len(*resId) == 1 {
+		endId = qu.ObjToString((*resId)[0]["_id"])
+	} else {
+		logger.Debug("获取本次查询的最大id的时候,未查找到数据...", idQuery)
+		return bson.M{}, false, ""
+	}
+	// if endId > LatestId {
+	// if Es.Count(Index, Itype, esquery) > 0 { //有数据返回id区间
+	// list := Es.Get(Index, Itype, esquery)
+	tmpRange := bson.M{
+		"range": bson.M{
+			"id": bson.M{
+				"lte": endId,
+				"gt":  LatestId,
+			},
+		},
+	}
+	LatestId = endId
+	return tmpRange, true, endId
+	// }
+	// } else { //结束id不大于起始id 退出
+	// logger.Debug("Search End ID Range Error. Sid:", LatestId, "Eid:", endId)
+	// break
+	// }
+	// }
+	// return bson.M{}, false
 	// now := time.Now()
 	// end := now.Unix() - int64(60*now.Minute()) - int64(now.Second())
 	// start := end - TaskTime*3600
@@ -478,6 +498,7 @@ func AssembelSave(tmp map[string]interface{}, IsSearchHosp, IsSearchEnps bool, a
 			}
 		}
 	}
+	redis.Put("datag", appid+"_"+id, 1, 3*24*60*60)
 	//
 	MgoSaveCache <- tmp
 	return true

+ 5 - 0
src/config.json

@@ -80,5 +80,10 @@
 		"(实施方)", "(无分包)", "(小微企业)", "(一)", "(一包)", "(一标段)", "(乙方)", "(元)", "(中型企业)", "(重点部分看控)",
 		"(主):",
 		"(主)", "(主)", "(成)", "(主办方)", "(主办方)", "(主体)", "(主体)", "(成员)", "(成员方)", "包1:", "包2:", "/"
+	],
+	"filetext_appid": [
+		"111",
+		"222",
+		"jyPB1XQgsGBQNbQElICQNW"
 	]
 }

+ 1 - 0
src/history/datamodel.go

@@ -75,6 +75,7 @@ var FielsArr = map[string]interface{}{
 }
 var ProjectAppidMap = map[string]bool{}
 var projectIdMap = sync.Map{}
+var FileTextAppidMap = map[string]bool{}
 
 //客户模型
 type Customer struct {

+ 5 - 1
src/history/historytask.go

@@ -36,6 +36,10 @@ func (this *HistoryData) HistoryTask(history_id string) {
 		pushModel := qu.IntAll(c["i_pushmodel"])     //推送模式
 		email := qu.ObjToString(c["sendMail"])       //推送邮箱
 		isDup := qu.IntAll(c["isDup"])               //是否去重
+		isfile := false
+		if FileTextAppidMap[appId] {
+			isfile = true
+		}
 
 		cus := &Customer{}
 		cus.SaveDataMap = map[string]map[string]interface{}{}
@@ -86,7 +90,7 @@ func (this *HistoryData) HistoryTask(history_id string) {
 			if len(xlsxArr) != xlsxCount {
 				log.Println("excel数据量错误")
 			}
-			GetXlsxs(xlsxArr, customer_name, email, history_id)
+			GetXlsxs(xlsxArr, customer_name, email, history_id, isfile)
 			go UpdateHistoryState(2, history_id, xlsxCount)
 			xlsxArr = []map[string]interface{}{}
 			xlsxCount = 0

+ 34 - 4
src/history/util_history.go

@@ -816,7 +816,7 @@ func SkipData(tmp map[string]interface{}) bool {
 	return false
 }
 
-func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
+func GetXlsxs(mMap []map[string]interface{}, fn, email, id string, isfile bool) {
 	if id != "" {
 		query := bson.M{
 			"_id": bson.ObjectIdHex(id),
@@ -828,8 +828,18 @@ func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
 			if err != nil {
 				log.Println("fields file not foud", err.Error())
 			}
+			style := xlsx.NewStyle()
+			style.Font.Size = 12
+			style.Font.Bold = true
+			style.Alignment.Vertical = "center"
+			style.Alignment.Horizontal = "center"
 			if dataType == 1 {
 				sh := xf.Sheets[0]
+				if isfile {
+					cell := sh.Rows[0].AddCell()
+					cell.SetValue("附件")
+					cell.SetStyle(style)
+				}
 				for i, v := range mMap {
 					row := sh.AddRow()
 					row.AddCell().SetInt(i + 1)
@@ -852,11 +862,19 @@ func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
 					}
 					row.AddCell().SetValue(v["projectname"])
 					row.AddCell().SetValue(v["detail"])
-					row.AddCell().SetValue(v["s_jyhref"])
+					row.AddCell().SetValue(v["jybxhref"])
+					if isfile {
+						row.AddCell().SetValue(v["filetext"])
+					}
 				}
 				xf.Sheets = xf.Sheets[0:1]
 			} else if dataType == 2 {
 				sh := xf.Sheets[1]
+				if isfile {
+					cell := sh.Rows[0].AddCell()
+					cell.SetValue("附件")
+					cell.SetStyle(style)
+				}
 				for i, v := range mMap {
 					row := sh.AddRow()
 					row.AddCell().SetInt(i + 1)
@@ -872,7 +890,7 @@ func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
 						row.AddCell()
 					}
 					row.AddCell().SetValue(v["href"])
-					row.AddCell().SetValue(v["s_jyhref"])
+					row.AddCell().SetValue(v["jybxhref"])
 					row.AddCell().SetValue(v["projectcode"])
 					row.AddCell().SetValue(v["projectname"])
 					row.AddCell().SetValue(v["projectscope"])
@@ -895,19 +913,28 @@ func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
 					row.AddCell().SetValue(v["buyerperson"])
 					row.AddCell().SetValue(v["buyertel"])
 					row.AddCell().SetValue(v["agency"])
-					row.AddCell().SetValue(v["winner"])
+					row.AddCell().SetValue(v["s_winner"])
 					row.AddCell().SetValue(v["winnerperson"])
 					row.AddCell().SetValue(v["winnertel"])
 					row.AddCell().SetValue(v["legal_person"])
 					row.AddCell().SetValue(v["company_phone"])
 					row.AddCell().SetValue(v["company_email"])
+					if isfile {
+						row.AddCell().SetValue(v["filetext"])
+					}
 				}
 				xf.Sheets = xf.Sheets[1:2]
 			} else {
 				sh := xf.Sheets[2]
+				if isfile {
+					cell := sh.Rows[0].AddCell()
+					cell.SetValue("附件")
+					cell.SetStyle(style)
+				}
 				for _, v := range mMap {
 					row := sh.AddRow()
 					// row.AddCell().SetInt(i + 1)
+					row.AddCell().SetValue(v["rulename"])
 					row.AddCell().SetValue(v["matchkey"])
 					row.AddCell().SetValue(v["area"])
 					row.AddCell().SetValue(v["city"])
@@ -942,6 +969,9 @@ func GetXlsxs(mMap []map[string]interface{}, fn, email, id string) {
 					row.AddCell().SetValue(v["stock_name"])
 					ids := SE.EncodeString(qu.ObjToString(v["id"]))
 					row.AddCell().SetValue(ids)
+					if isfile {
+						row.AddCell().SetValue(v["filetext"])
+					}
 				}
 				xf.Sheets = xf.Sheets[2:3]
 			}

+ 3 - 0
src/main.go

@@ -57,6 +57,9 @@ func init() {
 	for _, v := range util.Sysconfig["project_appid"].([]interface{}) {
 		history.ProjectAppidMap[v.(string)] = true
 	}
+	for _, s := range util.Sysconfig["filetext_appid"].([]interface{}) {
+		history.FileTextAppidMap[s.(string)] = true
+	}
 }
 
 func main() {

+ 2 - 2
src/util/parsxlsx.go

@@ -22,7 +22,7 @@ const (
 	Globalnotkey   = "s_globalnotkey"   //全局排除词
 	Globalclearkey = "s_globalclearkey" //全局排除词
 
-	Tagname			= "s_name"		//标签的关键词规则名称
+	Tagname = "s_name" //标签的关键词规则名称
 )
 
 //通用标签相关字段
@@ -330,7 +330,7 @@ func ResponseXlsx_Data(id string) string {
 					row.AddCell().SetValue(v["buyerperson"])
 					row.AddCell().SetValue(v["buyertel"])
 					row.AddCell().SetValue(v["agency"])
-					row.AddCell().SetValue(v["winner"])
+					row.AddCell().SetValue(v["s_winner"])
 					row.AddCell().SetValue(v["winnerperson"])
 					row.AddCell().SetValue(v["winnertel"])
 					row.AddCell().SetValue(v["legal_person"])

BIN
src/web/res/fields.xlsx


Alguns arquivos não foram mostrados porque muitos arquivos mudaram nesse diff