apple 5 жил өмнө
parent
commit
dfbb1cf368

+ 2 - 2
udpfilterdup/src/datamap.go

@@ -515,8 +515,8 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 	if TimingTask {
 		d.keymap = d.GetLatelyFiveDay(t)
 		d.keymap = d.GetLatelyFiveDay(t)
 	}else {
 	}else {
-		d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
-		//d.keymap = d.GetLatelyFiveDayDouble(t)
+		//d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
+		d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	}
 	m := map[string]bool{}
 	m := map[string]bool{}
 	for _, v := range d.keymap {
 	for _, v := range d.keymap {

+ 29 - 5
udpfilterdup/src/main.go

@@ -15,6 +15,7 @@ import (
 	"os"
 	"os"
 	"qfw/util"
 	"qfw/util"
 	"regexp"
 	"regexp"
+	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
@@ -210,7 +211,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	}
 	}
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort(sortName).Iter()
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort(sortName).Iter()
 	updateExtract := [][]map[string]interface{}{}
 	updateExtract := [][]map[string]interface{}{}
-	ids:=""
+	ids:=[]string{}
 	log.Println("线程数:", threadNum)
 	log.Println("线程数:", threadNum)
 	pool := make(chan bool, threadNum)
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	wg := &sync.WaitGroup{}
@@ -353,12 +354,35 @@ func task(data []byte, mapInfo map[string]interface{}) {
 							repeat_idMap["_id"] = source.id
 							repeat_idMap["_id"] = source.id
 						}
 						}
 						repeat_id = info.id
 						repeat_id = info.id
+						if len(ids)>=9 {
+							ids=append(ids,source.id)
+
+
+							for _, to := range nextNode {
+
+								key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+								by, _ := json.Marshal(map[string]interface{}{
+									"gtid":  source.id,
+									"lteid": source.id,
+									"stype": util.ObjToString(to["stype"]),
+									"key":   key,
+									"ids":   strings.Join(ids, ","),
+								})
+								addr := &net.UDPAddr{
+									IP:   net.ParseIP(to["addr"].(string)),
+									Port: util.IntAll(to["port"]),
+								}
+								node := &udpNode{by, addr, time.Now().Unix(), 0}
+								udptaskmap.Store(key, node)
+								udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+							}
 
 
-						if ids=="" {
-							ids=source.id
+							//
+							ids = []string{}
 						}else {
 						}else {
-							ids=ids+","+source.id
+							ids=append(ids,source.id)
 						}
 						}
+
 					}
 					}
 				}
 				}
 				//if repeateN%150==0&&repeateN>0 {
 				//if repeateN%150==0&&repeateN>0 {
@@ -405,7 +429,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				"lteid": eid,
 				"lteid": eid,
 				"stype": util.ObjToString(to["stype"]),
 				"stype": util.ObjToString(to["stype"]),
 				"key":   key,
 				"key":   key,
-				"ids":   ids,
+				"ids":   strings.Join(ids, ","),
 			})
 			})
 			addr := &net.UDPAddr{
 			addr := &net.UDPAddr{
 				IP:   net.ParseIP(to["addr"].(string)),
 				IP:   net.ParseIP(to["addr"].(string)),