xuzhiheng 1 gadu atpakaļ
vecāks
revīzija
d4db73ff9e
1 mainītis faili ar 28 papildinājumiem un 22 dzēšanām
  1. 28 22
      CMPlatform/history/task.go

+ 28 - 22
CMPlatform/history/task.go

@@ -257,7 +257,13 @@ func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *elastic.E
 				}
 				wg.Wait()
 				client.ClearScroll().ScrollId(scrollId).Do(ctx) //清理游标
-				c.tempChanOver <- true
+				// c.tempChanOver <- true
+				c.saveTempLock.Lock()
+				if len(c.saveTempArr) > 0 {
+					util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
+					c.saveTempArr = [][]map[string]interface{}{}
+				}
+				c.saveTempLock.Unlock()
 				log.Debug(fmt.Sprintf("SearchRule ID:%s, Result Data Count:%d", sr.ID, numDocs))
 			} else {
 				log.Debug(fmt.Sprintf("Customer: %s, Departmnet: %s, TagName: %s, Es Search Data Error,Tag ID: %s", c.Name, dm.Name, sr.Name, sr.ID))
@@ -268,7 +274,7 @@ func (c *Customer) EsConGetDataV7(stype string, dataSource int, esCon *elastic.E
 
 func (c *Customer) processedData() {
 	ch := make(chan bool, 10)
-L:
+	// L:
 	for {
 		select {
 		case tmp := <-c.tempChan:
@@ -279,25 +285,25 @@ L:
 				}()
 				processedData_A(c, tmp)
 			}(tmp)
-		default:
-			select {
-			case tmp := <-c.tempChan:
-				ch <- true
-				go func(tmp *tempData) {
-					defer func() {
-						<-ch
-					}()
-					processedData_A(c, tmp)
-				}(tmp)
-			case <-c.tempChanOver:
-				c.saveTempLock.Lock()
-				if len(c.saveTempArr) > 0 {
-					util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
-					c.saveTempArr = [][]map[string]interface{}{}
-				}
-				c.saveTempLock.Unlock()
-				break L
-			}
+			// default:
+			// 	select {
+			// 	case tmp := <-c.tempChan:
+			// 		ch <- true
+			// 		go func(tmp *tempData) {
+			// 			defer func() {
+			// 				<-ch
+			// 			}()
+			// 			processedData_A(c, tmp)
+			// 		}(tmp)
+			// 	case <-c.tempChanOver:
+			// 		c.saveTempLock.Lock()
+			// 		if len(c.saveTempArr) > 0 {
+			// 			util.MgoSave.UpSertBulk(c.saveTempColl, c.saveTempArr...)
+			// 			c.saveTempArr = [][]map[string]interface{}{}
+			// 		}
+			// 		c.saveTempLock.Unlock()
+			// 		break L
+			// 	}
 		}
 	}
 }
@@ -406,7 +412,7 @@ func processedData_A(c *Customer, data *tempData) {
 	/*
 		到此已经匹配完数据
 	*/
-	//log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
+	log.Debug("---------------------", zap.String("id", id), zap.Any("IsMatch", IsMatch))
 	if IsMatch {
 		//匹配成功,数据上新增规则id,matchKey,item并临时保存数据
 		// tmpMatchKey := MapDataToArr(matchKey)