Forráskód Böngészése

Merge branch 'dev2.11' into dev2.1

* dev2.11:
  历史数据推送优化
Jianghan 1 éve
szülő
commit
78e9216682

+ 1 - 1
CMPlatform/config.json

@@ -225,7 +225,7 @@
   "need_projectid_appid": [
     "jyFApXQQIEAw5TTUZOMBpD"
   ],
-  "redis_addrs": "datag=192.168.3.11:1712,session=192.168.3.11:1712,other=192.168.3.11:1712,ent=192.168.3.11:1712",
+  "redis_addrs": "datag=192.168.3.149:1712,session=192.168.3.149:1712,other=192.168.3.11:1712,ent=192.168.3.149:1712",
   "jyPushMysql": {
     "username": "root",
     "password": "Topnet123",

+ 9 - 3
CMPlatform/history/datamodel.go

@@ -50,9 +50,15 @@ type Customer struct {
 	DataSave     string                            //临时存储
 	Exact        int                               //精准筛选服务开关
 
-	tempChan       chan *tempData
-	tempChanOver   chan bool
-	saveBeforeChan chan *tempData
+	tempChan     chan *tempData
+	tempChanOver chan bool
+
+	saveDatTp     map[string]map[string]interface{} //判重合并字段信息
+	saveDatTpLock sync.Mutex
+
+	saveTempArr  [][]map[string]interface{} // 临时表 批量保存
+	saveTempLock sync.Mutex
+	saveTempColl string // 临时表表名
 }
 
 type tempData struct {

+ 8 - 2
CMPlatform/history/historytask.go

@@ -2,10 +2,12 @@ package history
 
 import (
 	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/date"
 	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
 	"app.yhyue.com/moapp/jybase/log"
 	"app.yhyue.com/moapp/jybase/mongodb"
 	"cmplatform/util"
+	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/antonmedv/expr"
@@ -82,10 +84,11 @@ func (this *HistoryData) HistoryTask(history_id string) {
 		}
 		cus := &Customer{}
 		cus.SaveDataMap = map[string]map[string]interface{}{}
+		cus.saveDatTp = map[string]map[string]interface{}{}
 		cus.SaveDataArr = []map[string]interface{}{}
 		cus.tempChan = make(chan *tempData, 2000)
 		cus.tempChanOver = make(chan bool)
-		cus.saveBeforeChan = make(chan *tempData, 2000)
+		cus.saveTempColl = fmt.Sprintf("temp_history_%s_%d", date.NowFormat(date.Date_yyyyMMdd), time.Now().Unix())
 
 		cus.ID = customerId
 		cus.Name = customer_name
@@ -160,6 +163,9 @@ func (this *HistoryData) HistoryTask(history_id string) {
 		cus.GetData("history", i_dataSource) //获取数据
 		//cus.RemoveRepeatData()               //数据去重
 		cus.saveBeforeData(history_id, isFilter, noticeFilter, dataTable, entId, iContact, xlsxData)
+		if err := util.MgoSave.C.Database(util.MgoSave.DbName).Collection(cus.saveTempColl).Drop(context.Background()); err != nil { // 删除临时表
+			log.Error("临时表删除失败", zap.Error(err), zap.String("coll", cus.saveTempColl))
+		}
 		//cus.AssembelAndSaveData(history_id, isFilter, noticeFilter, dataTable, entId, iContact, xlsxData) //组装、保存数据
 		if dataTable == 2 {
 			//使用pc商机管理的企业,数据存入mysql表,扣除数据量
@@ -171,7 +177,7 @@ func (this *HistoryData) HistoryTask(history_id string) {
 		if i_pushtype == 0 {
 			time.Sleep(3 * time.Second)
 			if len(xlsxData.xlsxArr) != xlsxData.xlsxCount {
-				log.Error("excel数据量错误")
+				log.Error("excel数据量错误", zap.Int("xlsxArr", len(xlsxData.xlsxArr)), zap.Int("xlsxCount", xlsxData.xlsxCount))
 			}
 			GetXlsxs(xlsxData.xlsxArr, iContact, customer_name, email, history_id, isfile, isHenanMobile, isfilehref, appId)
 			UpdateHistoryState(2, history_id, xlsxData.xlsxCount)

+ 117 - 35
CMPlatform/history/task.go

@@ -22,8 +22,7 @@ import (
 
 type mergeData struct {
 	stype string // new/update
-
-	data map[string]interface{}
+	data  map[string]interface{}
 }
 
 // 获取客户打标签规则
@@ -127,7 +126,7 @@ func (c *Customer) GetDepartments(stype string, departments []map[string]interfa
 
 // 获取数据
 func (c *Customer) GetData(stype string, dataSource int) {
-	log.Debug("开始匹配数据...")
+	log.Debug("匹配es状态...")
 	defer common.Catch()
 	esConfig := util.Sysconfig["es"].(map[string]interface{})
 	esversion := common.ObjToString(esConfig["version"])
@@ -160,9 +159,8 @@ func (c *Customer) GetData(stype string, dataSource int) {
 }
 
 func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *elastic.EsV7) {
-
 	go c.processedData()
-
+	log.Debug("开始匹配数据...")
 	client := esCon.GetEsConn()
 	defer esCon.DestoryEsConn(client)
 	ctx, _ := context.WithTimeout(context.Background(), 3*time.Hour)
@@ -291,6 +289,12 @@ L:
 					processedData_A(c, tmp)
 				}(tmp)
 			case <-c.tempChanOver:
+				c.saveTempLock.Lock()
+				if len(c.saveTempArr) > 0 {
+					util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
+					c.saveTempArr = [][]map[string]interface{}{}
+				}
+				c.saveTempLock.Unlock()
 				break L
 			}
 		}
@@ -401,7 +405,7 @@ func processedData_A(c *Customer, data *tempData) {
 	/*
 		到此已经匹配完数据
 	*/
-	log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
+	//log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
 	if IsMatch {
 		//匹配成功,数据上新增规则id,matchKey,item并临时保存数据
 		// tmpMatchKey := MapDataToArr(matchKey)
@@ -443,9 +447,28 @@ func processedData_A(c *Customer, data *tempData) {
 		//存储数据
 		//dm.DataLock.Lock()
 		//tmpMap := map[string]interface{}{id: tmp}
-		//dm.DepartmentData[sr.ID] = append(dm.DepartmentData[sr.ID], tmpMap)
+		//dm.SaveDataMap[sr.ID] = append(dm.SaveDataMap[sr.ID], tmpMap)
 		//dm.DataLock.Unlock()
-		c.saveBeforeChan <- &tempData{dm: dm, rule: sr, data: tmp}
+		//c.saveBeforeChan <- &tempData{dm: dm, rule: sr, data: tmp}
+
+		m1 := c.RemoveRepeatData(tmp)
+		c.saveTempLock.Lock()
+		if m1 != nil {
+			c.saveTempArr = append(c.saveTempArr, []map[string]interface{}{
+				{"_id": mongodb.StringTOBsonId(id)},
+				{"$set": m1},
+			})
+		} else {
+			c.saveTempArr = append(c.saveTempArr, []map[string]interface{}{
+				{"_id": mongodb.StringTOBsonId(id)},
+				{"$set": tmp},
+			})
+		}
+		if len(c.saveTempArr) > 200 {
+			util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
+			c.saveTempArr = [][]map[string]interface{}{}
+		}
+		c.saveTempLock.Unlock()
 	} else {
 		// common.Debug("------------", id, IsMatch)
 	}
@@ -594,39 +617,35 @@ func TagRuleFuc(appid string, pushModel, tag int, rules []*TagRule, sr *SearchRu
 	}
 }
 
-func (c *Customer) saveBeforeData(historyId string, isFilter, noticeFilter, dataTable, entId, i_contact int, xlsxData *XlsxData) {
+func (c *Customer) saveBeforeData(historyId string, isFilter, noticeFilter, dataTable, entId, iContact int, xlsxData *XlsxData) {
 	log.Debug("保存数据前置处理...")
-	ch := make(chan bool, 10)
-L:
-	for {
-		select {
-		case tmp := <-c.saveBeforeChan:
-			ch <- true
-			go func(tmp *tempData) {
-				defer func() {
-					<-ch
-				}()
-				AssembelSave(tmp.data, c.IsSearchHosp, c.IsSearchEnps, historyId, c.AppId, isFilter, noticeFilter, dataTable, entId, i_contact, xlsxData, c.DataSave)
-			}(tmp)
-		default:
-			select {
-			case tmp := <-c.saveBeforeChan:
-				ch <- true
-				go func(tmp *tempData) {
-					defer func() {
-						<-ch
-					}()
-					AssembelSave(tmp.data, c.IsSearchHosp, c.IsSearchEnps, historyId, c.AppId, isFilter, noticeFilter, dataTable, entId, i_contact, xlsxData, c.DataSave)
-				}(tmp)
-			case <-c.tempChanOver:
-				break L
-			}
+	sess := util.MgoSave.GetMgoConn()
+	defer util.MgoSave.DestoryMongoConn(sess)
+	ch := make(chan bool, 5)
+	wg := &sync.WaitGroup{}
+	query := sess.DB(util.MgoSave.DbName).C(c.saveTempColl).Find(nil).Select(nil).Iter()
+	count := 0
+	for tmp := make(map[string]interface{}); query.Next(&tmp); count++ {
+		if count%2000 == 0 {
+			log.Debug("saveBeforeData current ---", zap.Int("count", count))
 		}
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			AssembelSave(tmp, c.IsSearchHosp, c.IsSearchEnps, historyId, c.AppId, isFilter, noticeFilter, dataTable, entId, iContact, xlsxData, c.DataSave)
+		}(tmp)
+		tmp = make(map[string]interface{})
 	}
+	wg.Wait()
+	log.Debug("saveBeforeData over ---", zap.Int("count", count))
 }
 
 // 数据去重
-func (c *Customer) RemoveRepeatData() {
+func (c *Customer) RemoveRepeatData1() {
 	log.Debug("开始数据去重...")
 	defer common.Catch()
 	for _, dm := range c.Departments {
@@ -675,6 +694,69 @@ func (c *Customer) RemoveRepeatData() {
 		//将部门数据清空
 		dm.DepartmentData = map[string][]map[string]interface{}{}
 	}
+
+}
+
+func (c *Customer) RemoveRepeatData(tmp map[string]interface{}) map[string]interface{} {
+	dataId := common.ObjToString(tmp["id"])
+	c.saveDatTpLock.Lock()
+	defer func() {
+		c.saveDatTpLock.Unlock()
+	}()
+	if c.PushModel == 0 { //全局模式所有数据去重
+		if c.saveDatTp[dataId] == nil {
+			c.saveDatTp[dataId] = buildM(tmp)
+		} else { //数据重复
+			cus_history := c.saveDatTp[dataId]
+			MergeData(cus_history, tmp, c.IsTagRule, true, c.PushModel) //合并字段
+			return buildM(cus_history)
+		}
+	} else if c.PushModel == 2 || c.PushModel == 3 { //部门内部去重
+		if c.saveDatTp[dataId] == nil {
+			c.saveDatTp[dataId] = buildM(tmp)
+		} else { //数据重复
+			dm_history := c.saveDatTp[dataId]
+			if common.ObjToString(dm_history["departid"]) == common.ObjToString(tmp["departid"]) {
+				MergeData(dm_history, tmp, c.IsTagRule, false, c.PushModel) //合并字段
+				return buildM(dm_history)
+			}
+		}
+	} else if c.PushModel == 4 { //规则模式数据合并
+		//c.SaveDataArr = append(c.SaveDataArr, tmp)
+		if c.saveDatTp[dataId] == nil {
+			tmp["itemdist"] = map[string]interface{}{common.ObjToString(tmp["item"]): common.ObjToString(tmp["matchkey"])}
+			c.saveDatTp[dataId] = buildM(tmp)
+		} else { //数据重复
+			cus_history := c.saveDatTp[dataId]
+			MergeDatas(cus_history, tmp, c.IsTagRule, false) //合并字段
+			return buildM(cus_history)
+		}
+	} else if c.PushModel == 1 { // 部门模式数据合并
+		if c.saveDatTp[dataId] == nil {
+			tmp["itemdist"] = map[string]interface{}{common.ObjToString(tmp["item"]): common.ObjToString(tmp["matchkey"])}
+			c.saveDatTp[dataId] = buildM(tmp)
+		} else { //数据重复
+			cus_history := c.saveDatTp[dataId]
+			MergeData(cus_history, tmp, c.IsTagRule, true, c.PushModel) //合并字段
+			return buildM(cus_history)
+		}
+	}
+	return nil
+}
+
+func buildM(tmp map[string]interface{}) map[string]interface{} {
+	return map[string]interface{}{
+		"matchkey":   tmp["matchkey"],
+		"matchtype":  tmp["matchtype"],
+		"ruleid":     tmp["ruleid"],
+		"rulename":   tmp["rulename"],
+		"tagname":    tmp["tagname"],
+		"tagid":      tmp["tagid"],
+		"departname": tmp["departname"],
+		"departid":   tmp["departid"],
+		"item":       tmp["item"],
+		"itemdist":   tmp["itemdist"],
+	}
 }
 
 // 组装保存数据

+ 4 - 3
CMPlatform/history/util_history.go

@@ -760,9 +760,10 @@ func AssembelSave(tmp map[string]interface{}, IsSearchHosp, IsSearchEnps bool, h
 		return
 	}
 	//匹配公告附件
+	f := bson.M{"projectinfo": 1, "detail": 1, "bidtype": 1, "owner": 1, "total_investment": 1, "approvestatus": 1, "approvetime": 1, "project_completedate": 1}
 	info, _ := util.MgoBidding.FindOne(util.BiddingColl, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
 	if info == nil || len(*info) == 0 {
-		info, _ = util.MgoBidding.FindOne("bidding_back", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
+		info, _ = util.MgoBidding.FindOneByField("bidding_back", map[string]interface{}{"_id": mongodb.StringTOBsonId(id)}, f)
 	}
 	if info != nil && len(*info) > 0 {
 		if (*info)["projectinfo"] != nil {
@@ -792,7 +793,7 @@ func AssembelSave(tmp map[string]interface{}, IsSearchHosp, IsSearchEnps bool, h
 							}
 						}
 					}
-					log.Debug("查询附件结果:" + id + ", 附件数量:" + fmt.Sprint(len(filesArr)))
+					//log.Debug("查询附件结果:" + id + ", 附件数量:" + fmt.Sprint(len(filesArr)))
 					if len(filesArr) > 0 {
 						if FilterFilehrefAppidMap[appid] {
 							filesArrs := []map[string]interface{}{}
@@ -2332,7 +2333,7 @@ func GetProjectId(id string) (string, int64, int64) {
 			}
 		}
 		projectId = strings.Join(projectIdArr, ",")
-		log.Debug("projectId: " + projectId)
+		//log.Debug("projectId: " + projectId)
 		//
 	} else {
 		log.Debug("ES未查到项目id: " + id)