Browse Source

融合数据-新增融合组表 ,进行融合业务逻辑

apple 4 years ago
parent
commit
9e4334ce89

+ 2 - 2
udpfilterdup/src/datamap.go

@@ -524,7 +524,7 @@ func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
 
 
 
-//替换原始数据池
+//替换原始数据池-更新
 func (d *datamap) replacePoolData(newData *Info) {
 	d.lock.Lock()
 	ct := newData.publishtime
@@ -543,7 +543,7 @@ func (d *datamap) replacePoolData(newData *Info) {
 
 
 
-//替换原始数据池
+//替换 - A-B - 原始数据池
 func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 	//删除数据池的老数据
 	ct_old := oldData.publishtime

+ 1 - 0
udpfusion/src/config.json

@@ -18,6 +18,7 @@
     "index": "zktest",
     "type": "zktest"
   },
+  "group_coll_name":"zk_fusion_group",
   "fusion_coll_name":"zk_fusiondata",
   "record_coll_name":"zk_recorddata",
   "isgroupfn": 1000,

+ 92 - 72
udpfusion/src/fusionFullData.go

@@ -11,45 +11,41 @@ import (
 	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
 )
 
-func startFusionData()  {
+func exportFusionMongoData()  {
+
 	start := int(time.Now().Unix())
-	log.Println("开始遍历索引-进行融合............")
+	log.Println("开始导出融合组数据......")
 	//遍历索引
 	esclient := elastic.GetEsConn()
 	defer elastic.DestoryEsConn(esclient)
 
 	if esclient == nil {
-		log.Fatalln("连接池异常")
+		log.Println("连接池异常")
 	}
 	q :=es_elastic.NewBoolQuery()
 	cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
 		Size(200).Do()
 	if err != nil {
-		log.Fatal("cursor",err)
+		log.Println("cursor",err)
 	}
 	if cursor.Results == nil {
-		log.Fatalf("results != nil; got nil")
+		log.Println("results != nil; got nil")
 	}
 	if cursor.Results.Hits == nil {
-		log.Fatalf("expected results.Hits != nil; got nil")
+		log.Println("expected results.Hits != nil; got nil")
 	}
-
-
 	log.Println("查询正常-总数:",cursor.TotalHits())
-
-
 	//多线程 - 处理数据
-	pool_mgo := make(chan bool, mgo_pool)
-	wg_mgo := &sync.WaitGroup{}
+	pool_es := make(chan bool, es_pool)
+	wg_es := &sync.WaitGroup{}
 	pages,numDocs := 0,0
-
 	for {
 		searchResult, err := cursor.Next()
 		if err != nil {
 			if err.Error() == "EOS" {
 				break
 			}else {
-				log.Fatal("cursor searchResult",err)
+				log.Println("cursor searchResult",err)
 			}
 		}
 		pages++
@@ -68,83 +64,105 @@ func startFusionData()  {
 
 			numDocs++
 			fusion_ids := qu.ObjToString(tmp["allids"])
-			fusionArr := strings.Split(fusion_ids, ",")
 			sourceid := qu.ObjToString(tmp["_id"])
-			pool_mgo <- true
-			wg_mgo.Add(1)
-			go func(sourceid string, fusionArr []string) {
+			pool_es <- true
+			wg_es.Add(1)
+			go func(sourceid string, fusionArr string) {
 				defer func() {
-					<-pool_mgo
-					wg_mgo.Done()
+					<-pool_es
+					wg_es.Done()
 				}()
-				weight := NewWeightData(fusionArr)
-				weight.analyzeBuildStandardData()
-
-				if len(fusionArr) <= 1 { //单组数据
-					saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
-					saveid := mgo.Save(fusion_coll_name, saveFusionData)
-					//新增-Record  批量新增-经测试-批量新增与多线程新增 速度306s-236s 相差20%的耗时
-					saveRecordData["_id"] = saveid
-					UpdateRecord.add_pool <- saveRecordData //批量新增
-
-					//批量更新Es -
-					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
-					//UpdateElastic.update_pool <- map[string]string{
-					//	"id":sourceid,
-					//	"updateStr":updateStr1+updateStr2,
-					//}
-
+				AddGroupPool.pool <- map[string]interface{}{
+					"_id":StringTOBsonId(sourceid),
+					"allids":fusion_ids,
+				}
+			}(sourceid, fusion_ids)
+		}
 
-					UpdateRecord.add_pool <- saveRecordData
-				}else {
-					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
-					saveid := mgo.Save(fusion_coll_name, saveFusionData)
-					//新增-Record
-					saveRecordData["_id"] = saveid
-					UpdateRecord.add_pool <- saveRecordData //批量新增
+	}
+	log.Println("遍历Es结束......")
+	wg_es.Wait()
 
-					//批量更新Es -
-					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
-					//UpdateElastic.update_pool <- map[string]string{
-					//	"id":sourceid,
-					//	"updateStr":updateStr1+updateStr2,
-					//}
+	log.Println("fusion group over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
+}
 
-				}
-			}(sourceid, fusionArr)
+func startFusionData()  {
+	log.Println("开始全量融合流程...")
+	defer qu.Catch()
+	//可以开多程序-不同id段执行融合
+	q := map[string]interface{}{}
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
+	index,start :=0, int(time.Now().Unix())
+	//多线程保存数据
+	pool_mgo := make(chan bool, mgo_pool)
+	wg_mgo := &sync.WaitGroup{}
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000==0 {
+			log.Println("current index",index,tmp["_id"])
 		}
+		fusion_ids := qu.ObjToString(tmp["allids"])
+		fusionArr := strings.Split(fusion_ids, ",")
+		sourceid := BsonTOStringId(tmp["_id"])
+		pool_mgo <- true
+		wg_mgo.Add(1)
+		go func(sourceid string, fusionArr []string) {
+			defer func() {
+				<-pool_mgo
+				wg_mgo.Done()
+			}()
+			weight := NewWeightData(fusionArr)
+			weight.analyzeBuildStandardData()
+
+			saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
+			if len(fusionArr) <= 1 {
+				saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
+			}else {
+				saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
+			}
+			//新增融合表
+			saveid := mgo.Save(fusion_coll_name, saveFusionData)
+			saveRecordData["_id"] = saveid
+			//批量新增日志表
+			AddRecordPool.pool <- saveRecordData
+			//批量更新分组表
+			UpdateGroupPool.pool <- []map[string]interface{}{
+				map[string]interface{}{
+					"_id": StringTOBsonId(sourceid),
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"fusion_id": BsonTOStringId(saveid),
+						"template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
+					},
+				},
+			}
+		}(sourceid, fusionArr)
 
+		tmp = make(map[string]interface{})
 	}
-	log.Println("遍历Es结束......")
 	wg_mgo.Wait()
 
-	log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
+	log.Println("fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
 
 }
 
-func goUpdateEs(sourceid string,updateStr string)  {
-	UpdateElastic.update_pool <- map[string]string{
-		"id":sourceid,
-		"updateStr":updateStr,
-	}
-}
 
 func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 
+	//先到处具体需要融合组数据-存mongo
+	exportFusionMongoData()
+	time.Sleep(30 * time.Second)
 
+	//具体融合数据的方法
 	startFusionData()
+	time.Sleep(30 * time.Second)
 	return
 
 
 	log.Println("开始全量融合流程")
 	defer qu.Catch()
-
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -258,16 +276,18 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 
 	}
 	log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
-
-
 	time.Sleep(30 * time.Second)
 
-
+	//先到处具体需要融合组数据-存mongo
+	exportFusionMongoData()
+	time.Sleep(30 * time.Second)
 	//具体融合数据的方法
 	startFusionData()
-
-	log.Println("睡眠30秒,然后在发广播")
 	time.Sleep(30 * time.Second)
 
 	taskSendFusionUdp(mapInfo)
 }
+
+
+
+

+ 41 - 20
udpfusion/src/main.go

@@ -20,16 +20,20 @@ var (
 	nextNode     						[]map[string]interface{} 	//下节点数组
 	coll_name 	 						string
 	fusion_coll_name,record_coll_name	string						//新增表名
+	group_coll_name						string						//融合组表记录
 	NoNeedFusionKey 					map[string]interface{}   	//不需要融合的key
-
-	//UpdateFusion						*updateFusionInfo			//更新Fusion池
-	UpdateRecord						*updateRecordInfo			//更新Record池
-	UpdateElastic						*updateEsInfo				//更新Es池
-
 	siteJsonData						map[string]string			//站点池
 	esIndex,esType					    string						//索引-类型
 	mgo_pool,es_pool,isgroupfn			int
 	updatelock 							sync.Mutex
+	IsFull								bool
+
+	AddGroupPool						*addGroupInfo				//更新池相关
+	UpdateGroupPool						*updateGroupInfo
+	AddFusionPool						*addFusionInfo
+	UpdateFusionPool					*updateFusionInfo
+	AddRecordPool						*addRecordInfo
+	UpdateRecordPool					*updateRecordInfo
 )
 
 
@@ -46,6 +50,7 @@ func initMgoAndSite()  {
 
 	coll_name = mgoconf["collName"].(string)
 	mgo_pool = qu.IntAllDef(mgoconf["mgo_pool"], 5)
+	group_coll_name = sysconfig["group_coll_name"].(string)
 	fusion_coll_name = sysconfig["fusion_coll_name"].(string)
 	record_coll_name = sysconfig["record_coll_name"].(string)
 	isgroupfn = qu.IntAllDef(sysconfig["isgroupfn"], 10000)
@@ -76,24 +81,43 @@ func initEs()  {
 
 }
 
-func init() {
-	//加载配置文件
-	qu.ReadConfig(&sysconfig)
-	initMgoAndSite()
-	initEs()
+func initAddUpdatePool()  {
+	IsFull = true //全量数据
+	if IsFull {
+		AddGroupPool = newAddGroupPool()
+		go AddGroupPool.addGroupData()
+
+		UpdateGroupPool = newUpdateGroupPool()
+		go UpdateGroupPool.updateGroupData()
 
+		AddFusionPool = newAddFusionPool()
+		go AddFusionPool.addFusionData()
 
-	//增量修改方式
-	UpdateRecord = newAddRecordPool()
-	go UpdateRecord.addRecordData()
+		AddRecordPool = newaddRecordPool()
+		go AddRecordPool.addRecordData()
 
-	UpdateElastic = newUpdateEsPool()
-	go UpdateElastic.updateEsData()
+	}else {
+		AddFusionPool = newAddFusionPool()
+		go AddFusionPool.addFusionData()
 
+		UpdateFusionPool = newupdateFusionPool()
+		go UpdateFusionPool.updateFusionData()
 
-	log.Println("采用udp模式")
+		AddRecordPool = newaddRecordPool()
+		go AddRecordPool.addRecordData()
+
+		UpdateRecordPool = newupdateRecordPool()
+		go UpdateRecordPool.updateRecordData()
+	}
 }
 
+func init() {
+	//加载配置文件
+	qu.ReadConfig(&sysconfig)
+	initMgoAndSite()
+	initEs()
+	initAddUpdatePool()
+}
 
 func mainT() {
 	go checkMapJob()
@@ -106,12 +130,9 @@ func mainT() {
 
 //快速测试使用
 func main() {
-	//0101-0301
-	//sid := "5fedf5800000000000000000"
-	//eid := "603bbe000000000000000000"
+	log.Println("开始测试... ...")
 	sid := "1fedf5800000000000000000"
 	eid := "903bbe000000000000000000"
-	//log.Println(sid, "---", eid)
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
 		log.Println("sid,eid参数不能为空")

+ 0 - 67
udpfusion/src/updateElastic.go

@@ -1,67 +0,0 @@
-package main
-
-import (
-	"qfw/util/elastic"
-	"log"
-	"time"
-)
-
-type updateEsInfo struct {
-
-	//更新通道
-	update_pool chan map[string]string
-	//数量
-	saveSize   	int
-
-}
-
-
-
-
-
-var sp_es = make(chan bool, 10)
-
-//批量更新对象
-func newUpdateEsPool() *updateEsInfo {
-	update:=&updateEsInfo{make(chan map[string]string, 50000),200}
-	return update
-}
-
-//更新池
-func (update *updateEsInfo) updateEsData() {
-	log.Println("监听Es......更新数据")
-	tmpArr := make([]map[string]string, update.saveSize)
-	tmpIndex := 0
-	for {
-		select {
-		case value := <-update.update_pool:
-			tmpArr[tmpIndex] = value
-			tmpIndex++
-			if tmpIndex == update.saveSize {
-				sp_es <- true
-				go func(dataArr []map[string]string) {
-					defer func() {
-						<-sp_es
-					}()
-					//批量更新
-					elastic.BulkUpdateArr(esIndex,esType,dataArr)
-				}(tmpArr)
-				tmpArr = make([]map[string]string, update.saveSize)
-				tmpIndex = 0
-			}
-		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
-			if tmpIndex > 0 {
-				sp_es <- true
-				go func(dataArr []map[string]string) {
-					defer func() {
-						<-sp_es
-					}()
-					//批量更新
-					elastic.BulkUpdateArr(esIndex,esType,dataArr)
-				}(tmpArr[:tmpIndex])
-				tmpArr = make([]map[string]string, update.saveSize)
-				tmpIndex = 0
-			}
-		}
-	}
-}

+ 0 - 63
udpfusion/src/updateFusion.go

@@ -1,63 +0,0 @@
-package main
-
-import (
-	"log"
-	"time"
-)
-
-type updateFusionInfo struct {
-
-	//更新或新增通道
-	updatePool chan []map[string]interface{}
-	//数量
-	saveSize   	int
-
-}
-
-
-
-
-var sp_f = make(chan bool, 5)
-
-func newUpdateFusionPool() *updateFusionInfo {
-	update:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),100}
-	return update
-}
-
-
-func (update *updateFusionInfo) updateFusionData() {
-	log.Println("监听--融合更新数据")
-
-	tmpArr := make([][]map[string]interface{}, update.saveSize)
-	tmpIndex := 0
-	for {
-		select {
-		case value := <-update.updatePool:
-			tmpArr[tmpIndex] = value
-			tmpIndex++
-			if tmpIndex == update.saveSize {
-				sp_f <- true
-				go func(dataArr [][]map[string]interface{}) {
-					defer func() {
-						<-sp_f
-					}()
-					mgo.UpSertBulk(fusion_coll_name, dataArr...)
-				}(tmpArr)
-				tmpArr = make([][]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
-			if tmpIndex > 0 {
-				sp_f <- true
-				go func(dataArr [][]map[string]interface{}) {
-					defer func() {
-						<-sp_f
-					}()
-					mgo.UpSertBulk(fusion_coll_name, dataArr...)
-				}(tmpArr[:tmpIndex])
-				tmpArr = make([][]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		}
-	}
-}

+ 296 - 0
udpfusion/src/updateMethod.go

@@ -0,0 +1,296 @@
+package main
+
+
+import (
+	"time"
+)
+
+//新增组
+type addGroupInfo struct {
+	pool chan map[string]interface{}
+	saveSize   	int
+}
+//更新组
+type updateGroupInfo struct {
+	pool chan []map[string]interface{}
+	saveSize   	int
+}
+
+//新增融合
+type addFusionInfo struct {
+	pool chan map[string]interface{}
+	saveSize   	int
+}
+//更新融合
+type updateFusionInfo struct {
+	pool chan []map[string]interface{}
+	saveSize   	int
+}
+//新增日志
+type addRecordInfo struct {
+	pool chan map[string]interface{}
+	saveSize   	int
+}
+//更新融合
+type updateRecordInfo struct {
+	pool chan []map[string]interface{}
+	saveSize   	int
+}
+
+var sp = make(chan bool,5)
+
+
+func newAddGroupPool() *addGroupInfo {
+	info:=&addGroupInfo{make(chan map[string]interface{}, 50000),200}
+	return info
+}
+func newUpdateGroupPool() *updateGroupInfo {
+	info:=&updateGroupInfo{make(chan []map[string]interface{}, 50000),200}
+	return info
+}
+func newAddFusionPool() *addFusionInfo {
+	info:=&addFusionInfo{make(chan map[string]interface{}, 50000),200}
+	return info
+}
+func newupdateFusionPool() *updateFusionInfo {
+	info:=&updateFusionInfo{make(chan []map[string]interface{}, 50000),200}
+	return info
+}
+func newaddRecordPool() *addRecordInfo {
+	info:=&addRecordInfo{make(chan map[string]interface{}, 50000),200}
+	return info
+}
+func newupdateRecordPool() *updateRecordInfo {
+	info:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),200}
+	return info
+}
+
+
+//新增-组数据
+func (info *addGroupInfo) addGroupData() {
+	tmpArr := make([]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(group_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(group_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+//更新-组数据
+func (info *updateGroupInfo) updateGroupData() {
+	tmpArr := make([][]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(group_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(group_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+
+
+
+//新增融合数据
+func (info *addFusionInfo) addFusionData() {
+	tmpArr := make([]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(fusion_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(fusion_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+
+
+//更新融合数据
+func (info *updateFusionInfo) updateFusionData() {
+	tmpArr := make([][]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(fusion_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(fusion_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+//新增日志数据
+func (info *addRecordInfo) addRecordData() {
+	tmpArr := make([]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(record_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					mgo.SaveBulk(record_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+
+
+//更新日志数据
+func (info *updateRecordInfo) updateRecordData() {
+	tmpArr := make([][]map[string]interface{}, info.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-info.pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == info.saveSize {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(record_coll_name, dataArr...)
+				}(tmpArr)
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp <- true
+				go func(dataArr [][]map[string]interface{}) {
+					defer func() {
+						<-sp
+					}()
+					//批量更新
+					mgo.UpSertBulk(record_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([][]map[string]interface{}, info.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}

+ 0 - 132
udpfusion/src/updateRecord.go

@@ -1,132 +0,0 @@
-package main
-
-import (
-	"log"
-	"time"
-)
-
-type updateRecordInfo struct {
-
-	//新增通道
-	add_pool chan map[string]interface{}
-	//更新通道
-	update_pool chan []map[string]interface{}
-	//数量
-	saveSize   	int
-
-}
-
-
-
-
-
-var sp_r = make(chan bool,10)
-
-//批量更新对象
-func newUpdateRecordPool() *updateRecordInfo {
-	update:=&updateRecordInfo{nil,make(chan []map[string]interface{}, 50000),200}
-	return update
-}
-//批量新增对象
-func newAddRecordPool() *updateRecordInfo {
-	update:=&updateRecordInfo{make(chan map[string]interface{}, 50000),nil,200}
-	return update
-}
-
-
-//新增池
-func (update *updateRecordInfo) addRecordData() {
-	log.Println("监听日志......新增数据")
-	tmpArr := make([]map[string]interface{}, update.saveSize)
-	tmpIndex := 0
-	for {
-		select {
-		case value := <-update.add_pool:
-			tmpArr[tmpIndex] = value
-			tmpIndex++
-			if tmpIndex == update.saveSize {
-				sp_r <- true
-				go func(dataArr []map[string]interface{}) {
-					defer func() {
-						<-sp_r
-					}()
-					//批量新增
-					mgo.SaveBulk(record_coll_name, dataArr...)
-
-					/*
-
-
-					*/
-
-					//批量更新Es -问题耗时
-					//fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
-					//updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
-					//updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
-					//elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
-					//UpdateElastic.update_pool <- map[string]string{
-					//	"id":sourceid,
-					//	"updateStr":updateStr1+updateStr2,
-					//}
-
-
-
-				}(tmpArr)
-				tmpArr = make([]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
-			//log.Println("10秒检测",tmpIndex)
-			if tmpIndex > 0 {
-				sp_r <- true
-				go func(dataArr []map[string]interface{}) {
-					defer func() {
-						<-sp_r
-					}()
-					//批量新增
-					mgo.SaveBulk(record_coll_name, dataArr...)
-				}(tmpArr[:tmpIndex])
-				tmpArr = make([]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		}
-	}
-}
-
-//更新池
-func (update *updateRecordInfo) updateRecordData() {
-	log.Println("监听日志......更新数据")
-	tmpArr := make([][]map[string]interface{}, update.saveSize)
-	tmpIndex := 0
-	for {
-		select {
-		case value := <-update.update_pool:
-			tmpArr[tmpIndex] = value
-			tmpIndex++
-			if tmpIndex == update.saveSize {
-				sp_r <- true
-				go func(dataArr [][]map[string]interface{}) {
-					defer func() {
-						<-sp_r
-					}()
-					//批量更新
-					mgo.UpSertBulk(record_coll_name, dataArr...)
-				}(tmpArr)
-				tmpArr = make([][]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
-			if tmpIndex > 0 {
-				sp_r <- true
-				go func(dataArr [][]map[string]interface{}) {
-					defer func() {
-						<-sp_r
-					}()
-					//批量更新
-					mgo.UpSertBulk(record_coll_name, dataArr...)
-				}(tmpArr[:tmpIndex])
-				tmpArr = make([][]map[string]interface{}, update.saveSize)
-				tmpIndex = 0
-			}
-		}
-	}
-}