maxiaoshan 5 年之前
父節點
當前提交
91fabca8d8
共有 4 個文件被更改,包括 89 次插入55 次删除
  1. 23 1
      customerdata/src/historytask.go
  2. 55 48
      customerdata/src/task.go
  3. 5 1
      tagservice/src/historytask.go
  4. 6 5
      tagservice/src/newtask.go

+ 23 - 1
customerdata/src/historytask.go

@@ -2,10 +2,32 @@ package main
 
 import (
 	"log"
+
+	bson "gopkg.in/mgo.v2/bson"
 )
 
 //qu "qfw/util"
 
-func HistoryTask(sid, eid, customer string) {
+func HistoryTask(sid, eid, name string) {
 	log.Println("Start HistoryTask...")
+	tmpRange := bson.M{
+		"range": bson.M{
+			"id": bson.M{
+				"gte": sid,
+				"lte": eid,
+			},
+		},
+	}
+	//加载客户
+	oneCustomer := MgoTag.FindOne("customer", map[string]interface{}{"s_customername": name, "i_use": 1})
+	if len(oneCustomer) > 0 {
+		esArr := GetCustomerEs(name, tmpRange) //获取es
+		if len(esArr) > 0 {
+			csr := &Customer{
+				EsQuery: esArr,
+			}
+			csr.GetData()
+		}
+	}
+
 }

+ 55 - 48
customerdata/src/task.go

@@ -6,8 +6,10 @@ import (
 	"log"
 	. "model"
 	qu "qfw/util"
-
 	"sync"
+	"sync/atomic"
+
+	//"sync"
 	"time"
 
 	"github.com/cron"
@@ -25,15 +27,15 @@ type Customer struct {
 }
 
 func TimeTask() {
-	//InitCustomer()
+	go StartTask()
 	c := cron.New()
 	cronstr := "0 */" + fmt.Sprint(TaskTime) + " * * * ?"
 	//cronstr := "0 0 */" + fmt.Sprint(TaskTime) + " * * ?"//每TaskTime小时执行一次
 	qu.Debug("cronstr:", cronstr)
-	c.AddFunc(cronstr, func() { Start() })
+	//c.AddFunc(cronstr, func() { StartTask() })
 	c.Start()
 }
-func Start() {
+func StartTask() {
 	InitCustomer()
 	GetCustomerData()
 }
@@ -43,54 +45,19 @@ func InitCustomer() {
 	qu.Debug("Init Customer...")
 	tmpRange := GetIdRange() //获取id区间
 	qu.Debug(tmpRange)
-	return
 	customers, _ := MgoTag.Find("customer", map[string]interface{}{"i_use": 1}, nil, nil)
 	for _, c := range customers {
-		csr := &Customer{}
-		esArr := []string{}
 		customer := qu.ObjToString(c["s_customername"])
-		tags, _ := MgoTag.Find("tags", map[string]interface{}{"s_customer": customer}, nil, map[string]interface{}{"s_esquery": 1})
-		for _, t := range tags {
-			esquery := qu.ObjToString(t["s_esquery"])
-			query := map[string]*QueryObjecct{}
-			if json.Unmarshal([]byte(esquery), &query) == nil {
-				qb := query["query"]
-				filter := qb.Filtered.Filter
-				if filter != nil && filter.Bool != nil { //有filter
-					index := 0 //记录range的位置
-					for i, m := range filter.Bool.Must {
-						mMap := m.(map[string]interface{})
-						if mMap["range"] != nil { //有range
-							index = i
-							break
-						}
-					}
-					if index != 0 {
-						filter.Bool.Must[index] = tmpRange
-					} else {
-						filter.Bool.Must = append(filter.Bool.Must, tmpRange)
-					}
-				} else { //无filter则添加
-					bo := &BoolObject{}
-					bo.Must = append(bo.Must, tmpRange)
-					tmpFilter := &Filter{
-						Bool: bo,
-					}
-					qb.Filtered.Filter = tmpFilter
-				}
-				strquery, _ := json.Marshal(query)
-				qu.Debug("strquery---", string(strquery))
-				esArr = append(esArr, string(strquery))
-			}
-		}
+		esArr := GetCustomerEs(customer, tmpRange)
 		if len(esArr) > 0 {
+			csr := &Customer{}
 			csr.EsQuery = esArr
 			CustomerArr = append(CustomerArr, csr)
 		}
 	}
-	qu.Debug(len(CustomerArr))
 }
 
+//获取数据
 func GetCustomerData() {
 	for _, customer := range CustomerArr {
 		customer.GetData()
@@ -136,7 +103,6 @@ func (c *Customer) GetData() {
 					}
 					break
 				}
-
 				for _, hit := range searchResult.Hits.Hits {
 					//开始处理数据
 					wg.Add(1)
@@ -148,23 +114,64 @@ func (c *Customer) GetData() {
 						}()
 						tmp := make(map[string]interface{})
 						if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
-							numTags++
+							atomic.AddInt64(&numTags, 1)
 						}
 					}(hit)
-					wg.Wait()
 					numDocs += 1
 					if numDocs%500 == 0 {
-						//log.Println("Current:", numDocs)
+						log.Println("Current:", numDocs)
 					}
 				}
 				scrollId = searchResult.ScrollId
 			}
+			wg.Wait()
 			client.ClearScroll().ScrollId(scrollId).Do() //清理游标
+			log.Println("Result Data Count:", numDocs)
 			log.Println("Result Data Count:", numDocs, "	Tags Data Count:", numTags)
 		}
 	}
 }
 
+//获取用户所有es
+func GetCustomerEs(customer string, tmpRange bson.M) (esArr []string) {
+	tags, _ := MgoTag.Find("tags", map[string]interface{}{"s_customer": customer}, nil, map[string]interface{}{"s_esquery": 1})
+	for _, t := range tags {
+		esquery := qu.ObjToString(t["s_esquery"])
+		query := map[string]*QueryObjecct{}
+		if json.Unmarshal([]byte(esquery), &query) == nil {
+			qb := query["query"]
+			filter := qb.Filtered.Filter
+			if filter != nil && filter.Bool != nil { //有filter
+				index := 0 //记录range的位置
+				for i, m := range filter.Bool.Must {
+					mMap := m.(map[string]interface{})
+					if mMap["range"] != nil { //有range
+						index = i
+						break
+					}
+				}
+				if index != 0 {
+					filter.Bool.Must[index] = tmpRange
+				} else {
+					filter.Bool.Must = append(filter.Bool.Must, tmpRange)
+				}
+			} else { //无filter则添加
+				bo := &BoolObject{}
+				bo.Must = append(bo.Must, tmpRange)
+				tmpFilter := &Filter{
+					Bool: bo,
+				}
+				qb.Filtered.Filter = tmpFilter
+			}
+			strquery, _ := json.Marshal(query)
+			esArr = append(esArr, string(strquery))
+		} else {
+			log.Println(customer, "Es Error")
+		}
+	}
+	return
+}
+
 //根据时间获取起始和终止ID范围
 func GetIdRange() bson.M {
 	now := time.Now()
@@ -174,8 +181,8 @@ func GetIdRange() bson.M {
 	startTime := time.Unix(start, 0)
 	eid := bson.NewObjectIdWithTime(endTime).Hex()
 	sid := bson.NewObjectIdWithTime(startTime).Hex()
-	// sid = "5cf14f800000000000000000"
-	// eid = "5ed3d4800000000000000000"
+	sid = "5cf14f800000000000000000"
+	eid = "5ed3d4800000000000000000"
 	tmpRange := bson.M{
 		"range": bson.M{
 			"id": bson.M{

+ 5 - 1
tagservice/src/historytask.go

@@ -265,6 +265,7 @@ func EsHistoryData() {
 	if err == nil {
 		numDocs := 0
 		numTags := int64(0)
+		wg := &sync.WaitGroup{}
 		scrollId := res.ScrollId
 		for {
 			if scrollId == "" {
@@ -283,10 +284,12 @@ func EsHistoryData() {
 
 			for _, hit := range searchResult.Hits.Hits {
 				//开始处理数据
+				wg.Add(1)
 				data_channel <- true
 				go func(tmpHit *es.SearchHit) {
 					defer func() {
 						<-data_channel
+						wg.Done()
 					}()
 					tmp := make(map[string]interface{})
 					if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
@@ -383,8 +386,9 @@ func EsHistoryData() {
 			}
 			scrollId = searchResult.ScrollId
 		}
+		wg.Wait()
 		client.ClearScroll().ScrollId(scrollId).Do() //清理游标
-		time.Sleep(5 * time.Second)
+		//time.Sleep(5 * time.Second)
 		log.Println("Result Data Count:", numDocs, "	Tags Data Count:", numTags)
 	} else {
 		log.Println("Es Scroll Find Error:", err)

+ 6 - 5
tagservice/src/newtask.go

@@ -318,6 +318,7 @@ func (t *Task) RunEs() {
 		numDocs := 0
 		numTags := int64(0)
 		scrollId := res.ScrollId
+		t.Wg = &sync.WaitGroup{}
 		for {
 			if scrollId == "" {
 				log.Println("ScrollId Is Error")
@@ -335,12 +336,12 @@ func (t *Task) RunEs() {
 
 			for _, hit := range searchResult.Hits.Hits {
 				//开始处理数据
-				//wg.Add(1)
+				t.Wg.Add(1)
 				t.DataChan <- true
 				go func(tmpHit *es.SearchHit) {
 					defer func() {
 						<-t.DataChan
-						//wg.Done()
+						t.Wg.Done()
 					}()
 					tmp := make(map[string]interface{})
 					if json.Unmarshal(*tmpHit.Source, &tmp) == nil {
@@ -429,7 +430,7 @@ func (t *Task) RunEs() {
 						}
 						addMap := map[string]interface{}{}
 						if len(tmpTagNameMap) > 0 { //有新标签或者历史标签
-							atomic.AddInt64(&numTags, +1) //n++ 计数
+							atomic.AddInt64(&numTags, 1) //n++ 计数
 							AddData(addMap, tmpPreTagMap)
 							AddData(addMap, tmpTagNameMap)
 						}
@@ -452,7 +453,6 @@ func (t *Task) RunEs() {
 						}
 					}
 				}(hit)
-				//wg.Wait()
 				numDocs += 1
 				if numDocs%500 == 0 {
 					log.Println("Current:", numDocs)
@@ -460,8 +460,9 @@ func (t *Task) RunEs() {
 			}
 			scrollId = searchResult.ScrollId
 		}
+		t.Wg.Wait()
 		client.ClearScroll().ScrollId(scrollId).Do() //清理游标
-		time.Sleep(5 * time.Second)
+		//time.Sleep(5 * time.Second)
 		log.Println("Result Data Count:", numDocs, "	Tags Data Count:", numTags)
 		t.StartId = endId //替换id
 		setid := map[string]interface{}{