zhangjinkun 5 vuotta sitten
vanhempi
commit
baab2586bf
1 muutettua tiedostoa jossa 23 lisäystä ja 13 poistoa
  1. 23 13
      fullproject/src_v1/task.go

+ 23 - 13
fullproject/src_v1/task.go

@@ -39,9 +39,9 @@ type ProjectTask struct {
 	mapPb, mapPn, mapPc map[string]*Key
 	//	mapPbLock, mapPnLock, mapPcLock sync.Mutex
 	//更新或新增通道
-	updatePool           chan []map[string]interface{}
-	savePool             chan map[string]interface{}
-	saveSign, updateSign chan bool
+	updatePool chan []map[string]interface{}
+	//savePool   chan map[string]interface{}
+	//saveSign, updateSign chan bool
 	//表名
 	coll string
 	//当前状态是全量还是增量
@@ -67,16 +67,17 @@ func NewPT() *ProjectTask {
 		thread:      4,
 		updatePool:  make(chan []map[string]interface{}, 5000),
 		//savePool:    make(chan map[string]interface{}, 2000),
-		wg:         sync.WaitGroup{},
-		AllIdsMap:  make(map[string]*ID, 5000000),
-		mapPb:      make(map[string]*Key, 1500000),
-		mapPn:      make(map[string]*Key, 5000000),
-		mapPc:      make(map[string]*Key, 5000000),
-		saveSize:   400,
-		saveSign:   make(chan bool, 1),
-		updateSign: make(chan bool, 1),
-		coll:       ProjectColl,
-		validTime:  int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
+		wg:        sync.WaitGroup{},
+		AllIdsMap: make(map[string]*ID, 5000000),
+		mapPb:     make(map[string]*Key, 1500000),
+		mapPn:     make(map[string]*Key, 5000000),
+		mapPc:     make(map[string]*Key, 5000000),
+		saveSize:  400,
+
+		//saveSign:   make(chan bool, 1),
+		//updateSign: make(chan bool, 1),
+		coll:      ProjectColl,
+		validTime: int64(util.IntAllDef(Sysconfig["validdays"], 150) * 86400),
 	}
 	return p
 }
@@ -86,6 +87,7 @@ var P_QL *ProjectTask
 //初始化全量合并对象
 func init() {
 	P_QL = NewPT()
+	log.Println(len(P_QL.updatePool))
 	go P_QL.updateAllQueue()
 	go P_QL.clearMem()
 }
@@ -280,6 +282,14 @@ func (p *ProjectTask) taskZl(udpInfo map[string]interface{}) {
 		//生成查询语句执行
 		p.enter(db, coll, q)
 	}
+	for {
+		if len(P_QL.updatePool) > 0 {
+			log.Println("等待调用udp", len(P_QL.updatePool))
+			time.Sleep(1 * time.Second)
+		} else {
+			break
+		}
+	}
 	if udpInfo["stop"] == nil {
 		nextNode(udpInfo, p.pici)
 	}