xuzhiheng 2 жил өмнө
parent
commit
f97b2649c9

+ 12 - 1
customerdata/src/task.go

@@ -180,9 +180,20 @@ func (m *MySource) Source() (interface{}, error) {
 func (c *Customer) EsConGetDataV7(stype string, esCon *esv.EsV7) {
 	client := esCon.GetEsConn()
 	defer esCon.DestoryEsConn(client)
-	ctx, _ := context.WithTimeout(context.Background(), 10*time.Minute)
+	ctx, _ := context.WithTimeout(context.Background(), 30*time.Minute)
 	for _, dm := range c.Departments {
 		for _, sr := range dm.Rules {
+			for {
+				listLen := redis.GetInt("session", "es_status")
+				if listLen == 0 {
+					log.Println("es空闲!")
+					break
+				} else if listLen == 1 || listLen == 2 {
+					log.Println("系统繁忙,请稍后再试 ", listLen)
+				}
+				time.Sleep(5 * time.Second)
+			}
+
 			//测试
 			// MgoDataTest(sr, dm, c)
 			// return

+ 1 - 1
src/config.json

@@ -216,7 +216,7 @@
   "need_projectid_appid": [
     "jyFApXQQIEAw5TTUZOMBpD"
   ],
-  "redis_addrs": "datag=192.168.3.11:1712,other=192.168.3.11:1712",
+  "redis_addrs": "datag=192.168.3.11:1712,session=192.168.3.11:1712,other=192.168.3.11:1712",
   "jyPushMysql": {
     "username": "root",
     "password": "Topnet123",

+ 11 - 311
src/history/task.go

@@ -6,6 +6,7 @@ import (
 	"log"
 	mongoutil "qfw/mongodb"
 	qu "qfw/util"
+	"qfw/util/redis"
 	"regexp"
 	. "sqlmodel"
 	"strings"
@@ -19,7 +20,6 @@ import (
 	esv "es"
 
 	esV7 "github.com/olivere/elastic"
-	es "gopkg.in/olivere/elastic.v1"
 )
 
 // 获取客户打标签规则
@@ -161,328 +161,28 @@ func (c *Customer) GetData(stype string, dataSource int) {
 	esConfig := Sysconfig["es"].(map[string]interface{})
 	esversion := qu.ObjToString(esConfig["version"])
 	if esversion == "v1" {
-		esCon := esv.VarEs.(*esv.EsV1)
-		c.EsConGetDataV1(stype, dataSource, esCon)
 	} else {
 		esCon := esv.VarEs.(*esv.EsV7)
 		c.EsConGetDataV7(stype, dataSource, esCon)
 	}
 }
 
-func (c *Customer) EsConGetDataV1(stype string, dataSource int, esCon *esv.EsV1) {
-	client := esCon.GetEsConn()
-	defer esCon.DestoryEsConn(client)
-	for _, dm := range c.Departments {
-		for _, sr := range dm.Rules {
-			ch := make(chan bool, 10)
-			wg := &sync.WaitGroup{}
-			log.Println("------------------", dataSource, EsAllIndex)
-			esIndex := Index
-			if dataSource == 2 {
-				esIndex = EsAllIndex
-			}
-			escount := Es.Count(esIndex, Itype, sr.EsQuery)
-			log.Println("index", esIndex, "type", Itype)
-			log.Println("查询总数:", escount, "规则ID:", sr.ID, "EsQuery:", sr.EsQuery)
-			if escount == 0 {
-				continue
-			}
-			//查询条件类型转换
-			var q es.Query
-			tmpQuery := es.BoolQuery{
-				QueryStrings: sr.EsQuery,
-			}
-			q = tmpQuery
-
-			//游标查询,index不支持别名,只能写索引库的名称
-			res, err := client.Scroll(esIndex).Query(q).Size(200).Do() //查询一条获取游标
-			if err == nil {
-				numDocs := 0
-				scrollId := res.ScrollId
-				for {
-					if scrollId == "" {
-						log.Println("ScrollId Is Error")
-						break
-					}
-					searchResult, err := client.Scroll(Index).Size(200).ScrollId(scrollId).Do() //查询
-					if err != nil {
-						if err.Error() == "EOS" { //迭代完毕
-							log.Println("Es Search Data Over:", err)
-						} else {
-							log.Println("Es Search Data Error:", err)
-						}
-						break
-					}
-					for _, hit := range searchResult.Hits.Hits {
-						//开始处理数据
-						wg.Add(1)
-						ch <- true
-						go func(tmpHit *es.SearchHit) {
-							defer func() {
-								<-ch
-								wg.Done()
-							}()
-							tmp := make(map[string]interface{})
-							if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
-								if stype != "history" {
-									if !SkipData(tmp) {
-										qu.Debug("跳过该条数据,发布时间在入库时间7天之前,", qu.ObjToString(tmp["_id"]))
-										return
-									}
-								}
-								id := qu.ObjToString(tmp["_id"])
-								//亚信
-								if CheckBidOpenAppidMap[c.AppId] {
-									if tmp["bidopentime"] != nil {
-										bidopentime := qu.Int64All(tmp["bidopentime"])
-										comeintime := qu.Int64All(tmp["comeintime"])
-										if bidopentime-comeintime <= 7*24*60*60 {
-											qu.Debug("跳过该条数据,开标时间-入库时间<=7天,", id)
-											return
-										}
-									}
-								}
-								//河南移动,过滤掉中国移动采购网招标数据
-								if CheckBidHrefRuleIdMap[dm.ID] {
-									if strings.Contains(qu.ObjToString(tmp["href"]), "b2b.10086.cn") {
-										qu.Debug("跳过该条数据,公告原网址中包含 b2b.10086.cn,", id)
-										return
-									}
-								}
-								//
-								tmp["id"] = id //记录数据原有id
-								delete(tmp, "_id")
-								if sr.ExtFieldType == 2 {
-									findwinner := ""
-									s_winner := strings.Split(qu.ObjToString(tmp["s_winner"]), ",")
-									if len(s_winner) > 0 {
-										for i := 0; i < len(s_winner); i++ {
-											findwinners := strings.TrimSpace(s_winner[i])
-											if findwinners != "" {
-												for _, v := range Sysconfig["s_winner_filter"].([]interface{}) {
-													strings.ReplaceAll(findwinners, v.(string), "")
-												}
-												if findwinners != "" {
-													findwinner = findwinners
-													break
-												}
-											}
-										}
-									}
-									// findwinner := strings.TrimSpace(qu.ObjToString(tmp["winner"]))
-									if findwinner != "" {
-										finddata := MgoEnps.FindOne(EnpsColl, bson.M{"company_name": findwinner})
-										if finddata != nil {
-											if legal_person := qu.ObjToString(finddata["legal_person"]); legal_person != "" {
-												tmp["legal_person"] = legal_person
-											}
-											if email := qu.ObjToString(finddata["company_email"]); email != "" {
-												tmp["company_email"] = email
-											}
-											if phone := qu.ObjToString(finddata["company_phone"]); phone != "" {
-												tmp["company_phone"] = phone
-											}
-										}
-									}
-								}
-								matchKey := map[string]bool{}     //记录所有匹配上的关键词
-								matchKeyType := map[string]bool{} //记录关键词对应的匹配方式
-								//先获取用到的所有字段值
-								fieldText := map[string]interface{}{}
-								for field, _ := range sr.Fields {
-									text := qu.ObjToString(tmp[field])
-									text = ProcessData(text) //处理文本(字母转大写,删除一些符号)
-									fieldText[field] = text
-								}
-								//清理词清理
-								for _, cwm := range sr.GCW.MatchType {
-									if text := qu.ObjToString(fieldText[cwm]); text != "" {
-										for _, gcw_reg := range sr.GCW.KeyReg {
-											text = gcw_reg.ReplaceAllString(text, "")
-										}
-										fieldText[cwm] = text
-									}
-								}
-								//精准筛选规则2022-10-19
-								if c.Exact == 1 && sr.ExactRule != "" {
-									nameArr := []string{}
-									data, ok := Mgo.Find("groups", map[string]interface{}{"ruleId": sr.ID}, nil, nil, false, -1, -1)
-									if ok && data != nil && len(*data) > 0 {
-										for _, v := range *data {
-											nameArr = append(nameArr, qu.ObjToString(v["name"]))
-										}
-									}
-									exactResult := exactMatchs(sr.ExactRule, qu.ObjToString(tmp["title"]), qu.ObjToString(tmp["detail"]), sr.Maths, nameArr)
-									qu.Debug("-------------------精准匹配", id, exactResult)
-									if !exactResult {
-										return
-									}
-								}
-								//
-								/*
-									因为要记录所有匹配上的关键词,所有优先匹配附加词,在匹配关键词
-								*/
-								//1.附加词匹配
-								IsMatch := false
-								// qu.Debug("sr.AW---", len(sr.AW))
-								for i, aw := range sr.AW {
-									// qu.Debug("-------------------------开始附加词匹配--------------------------")
-									IsMatchAddKey := RegMatch(fieldText, aw.MatchType, aw.KeyReg, nil, nil, false, true)
-									// qu.Debug(IsMatchAddKey, "------------------------------------------------------------")
-
-									//2.关键词匹配
-									if IsMatchAddKey {
-										kw := sr.KW[i]
-										// qu.Debug("-------------------------开始关键词匹配--------------------------")
-										IsMatchKey := RegMatch(fieldText, kw.MatchType, kw.KeyReg, matchKey, matchKeyType, true, false)
-										// qu.Debug(IsMatchKey, "------------------------------------------------------------")
-										if IsMatchKey {
-											IsMatch = true
-										}
-									}
-								}
-								if len(sr.AW) == 0 {
-									IsMatch = true
-								}
-								/*
-									到此已经匹配完数据
-								*/
-								qu.Debug("---------------------", id, IsMatch)
-								if IsMatch { //匹配成功,数据上新增规则id,matchKey,item并临时保存数据
-									// tmpMatchKey := MapDataToArr(matchKey)
-									tmpMatchKeyType := MapDataToArr(matchKeyType)
-									tmp["matchkey"] = GetMactchKeys(sr.Maths, tmp)
-									tmp["matchtype"] = strings.Join(tmpMatchKeyType, ",")
-									tmp["ruleid"] = sr.ID
-									tmp["rulename"] = sr.Name
-									tmpBuyerClass := qu.ObjToString(tmp["buyerclass"])
-
-									//开始打标签
-									//qu.Debug("c.IsTagRule+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
-									if c.IsTagRule {
-										tagNameMap := map[string]bool{}
-										tagIdMap := map[string]bool{}
-										//qu.Debug("c.TagRules---", len(c.TagRules))
-										for _, tr := range c.TagRules {
-											if tr.DepartRuleIds[sr.ID] {
-												//先获取用到的所有字段值
-												for field, _ := range tr.Fields {
-													if fieldText[field] == nil { //补充fieldText
-														text := qu.ObjToString(tmp[field])
-														text = ProcessData(text) //处理文本(字母转大写,删除一些符号)
-														fieldText[field] = text
-													}
-												}
-												matchKeyTag := map[string]bool{}     //记录所有标签里的匹配上的关键词
-												matchKeyTypeTag := map[string]bool{} //记录标签里的关键词对应的匹配方式
-
-												//qu.Debug("-------------------------开始排除词匹配--------------------------")
-												//qu.Debug("tr.NW---", len(tr.NW))
-												for j, tag_nw := range tr.NW { //排除词匹配
-													IsMatchNotKey := RegMatch(fieldText, tag_nw.MatchType, tag_nw.KeyReg, nil, nil, false, false)
-													//qu.Debug(IsMatchNotKey, "------------------------------------------------------------")
-													if !IsMatchNotKey { //排除词未匹配,匹配附加词关键词
-														if RegMatch(fieldText, tr.AW[j].MatchType, tr.AW[j].KeyReg, nil, nil, false, true) && RegMatch(fieldText, tr.KW[j].MatchType, tr.KW[j].KeyReg, matchKeyTag, matchKeyTypeTag, true, false) {
-															tagname := tr.TagNames[j]
-															tagBuyerClass := tr.BuyerClass[j]
-															if tagBuyerClass != "" {
-																if strings.Contains(tagBuyerClass, tmpBuyerClass) {
-																	if tagname == "" {
-																		tempList := []string{}
-																		for k, _ := range matchKeyTag {
-																			tempList = append(tempList, k)
-																		}
-																		tagname = strings.Join(tempList, ",")
-																		log.Println("=====tagname为空取匹配词为标签名称", tagname)
-																	}
-																	//qu.Debug("tagname-----", tagname)
-																	tagNameMap[tagname] = true
-																	tagIdMap[tr.ID] = true
-																}
-															} else {
-																if tagname == "" {
-																	tempList := []string{}
-																	for k, _ := range matchKeyTag {
-																		tempList = append(tempList, k)
-																	}
-																	tagname = strings.Join(tempList, ",")
-																	log.Println("=====tagname为空取匹配词为标签名称", tagname)
-																}
-																//qu.Debug("tagname-----", tagname)
-																tagNameMap[tagname] = true
-																tagIdMap[tr.ID] = true
-															}
-														}
-													}
-												}
-											}
-										}
-										//tagname
-										tagNameArr := MapDataToArr(tagNameMap)
-										tagIdArr := MapDataToArr(tagIdMap)
-										if len(tagNameArr) > 0 {
-											tmp["tagname"] = strings.Join(tagNameArr, ",")
-											if DisPackageAppidMap[c.AppId] {
-												tmp["buyer_type"] = strings.Join(tagNameArr, ",")
-											}
-											if c.PushModel == 2 {
-												tmp["item"] = strings.Join(tagNameArr, ",")
-											}
-											tmp["tagid"] = strings.Join(tagIdArr, ",")
-										}
-									}
-									//item
-									switch c.PushModel {
-									case 0:
-										tmp["item"] = "数据"
-									case 1:
-										tmp["item"] = dm.Name
-									case 2:
-										//tmp["item"] = sr.Name
-									case 3:
-										tmp["item"] = dm.Name + "_" + sr.Name
-									case 4:
-										tmp["item"] = sr.Name
-									}
-									//appid
-									tmp["appid"] = c.AppId
-									//部门名称
-									tmp["departname"] = dm.Name
-									tmp["departid"] = dm.ID
-									//存储数据
-									dm.DataLock.Lock()
-									//qu.Debug("tmp---", tmp)
-									tmpMap := map[string]interface{}{id: tmp}
-									dm.DepartmentData[sr.ID] = append(dm.DepartmentData[sr.ID], tmpMap)
-									dm.DataLock.Unlock()
-								} else {
-									// qu.Debug("------------", id, IsMatch)
-								}
-							}
-						}(hit)
-						numDocs += 1
-						if numDocs%500 == 0 {
-							log.Println("Current:", numDocs)
-						}
-					}
-					scrollId = searchResult.ScrollId
-				}
-				wg.Wait()
-				client.ClearScroll().ScrollId(scrollId).Do() //清理游标
-				log.Println("SearchRule ID", sr.ID, "Result Data Count:", numDocs)
-			} else {
-				log.Println("Customer:", c.Name, "Departmnet", dm.Name, "TagName", sr.Name, "Es Search Data Error,Tag ID:", sr.ID)
-			}
-		}
-	}
-}
-
 func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *esv.EsV7) {
 	client := esCon.GetEsConn()
 	defer esCon.DestoryEsConn(client)
 	ctx, _ := context.WithTimeout(context.Background(), 3*time.Hour)
 	for _, dm := range c.Departments {
 		for _, sr := range dm.Rules {
+			for {
+				listLen := redis.GetInt("session", "es_status")
+				if listLen == 0 {
+					log.Println("es空闲!")
+					break
+				} else if listLen == 1 || listLen == 2 {
+					log.Println("系统繁忙,请稍后再试 ", listLen)
+				}
+				time.Sleep(5 * time.Second)
+			}
 			ch := make(chan bool, 10)
 			wg := &sync.WaitGroup{}
 			esIndex := Index

+ 11 - 14
src/util/utiltag.go

@@ -140,22 +140,19 @@ func searchDataArr(index, esquery, sdataid string, i_maxnum int64, tags map[stri
 	var (
 		err    error
 		counts = int64(0)
-		times  = 0
 	)
-	for {
-		listLen := int(redis.LLEN("datag", "jyqyfw_es_query"))
-		if listLen < 5 {
-			redis.RPUSH("datag", "jyqyfw_es_query", 1)
-			err, counts = searchData(index, esquery, sdataid, i_maxnum, tags, maths)
-			redis.LPOP("datag", "jyqyfw_es_query")
-			break
-		} else if listLen > 20 || times > 60 {
-			err = errors.New("系统繁忙,请稍后再试")
-			break
-		}
-		times += 2
-		time.Sleep(2 * time.Second)
+	// for {
+	listLen := redis.GetInt("session", "es_status")
+	if listLen == 0 {
+		log.Println("es空闲!")
+		err, counts = searchData(index, esquery, sdataid, i_maxnum, tags, maths)
+		// break
+	} else if listLen == 1 || listLen == 2 {
+		err = errors.New("系统繁忙,请稍后再试")
+		// break
 	}
+	// time.Sleep(5 * time.Second)
+	// }
 	return err, counts
 }