Browse Source

融合批量更新

apple 4 years ago
parent
commit
abee5ac3bd

+ 4 - 3
udpfusion/src/config.json

@@ -3,9 +3,9 @@
   "mongodb": {
     "addrName": "192.168.3.207:27092",
     "dbName": "zhengkun",
-    "collName": "fusion_test",
-    "pool": 10,
-    "mgo_pool": 3,
+    "collName": "all_01_02_fusiontest",
+    "pool": 20,
+    "mgo_pool": 10,
     "site": {
       "dbname": "zhengkun",
       "coll": "site"
@@ -20,6 +20,7 @@
   },
   "fusion_coll_name":"zk_fusiondata",
   "record_coll_name":"zk_recorddata",
+  "isgroupfn": 1000,
   "jkmail": {
     "to": "zhengkun@topnet.net.cn",
     "api": "http://10.171.112.160:19281/_send/_mail"

+ 423 - 174
udpfusion/src/fusionAddData.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"fmt"
 	"log"
 	qu "qfw/util"
 	"qfw/util/elastic"
@@ -10,9 +9,10 @@ import (
 	"time"
 )
 
-//增量-融合-一小段
-func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
-	log.Println("开始增量融合流程")
+
+func startTaskAddAddData(data []byte, mapInfo map[string]interface{}) {
+
+	log.Println("开始全量融合流程")
 	defer qu.Catch()
 	//区间id
 	q := map[string]interface{}{
@@ -26,14 +26,9 @@ func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
 	//编译不同的融合组,如何划分组
+	fusionDataGroupMap := make(map[string][]string,0) //待融合组
 
-	//待融合组
-	fusionDataGroupArr := make([][]string,0)
-	//需要更新组
-	updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
-	//重复数据组
-	norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
-
+	norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
 
 	start := int(time.Now().Unix())
 	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
@@ -47,224 +42,478 @@ func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
 			repeatArr = append(repeatArr,tmpId)
 			sourceArr = append(sourceArr,sourceid)
 		}else {
-			norepeatArr = append(repeatArr,tmpId)
+			norepeatArr = append(norepeatArr,tmpId)
 		}
+
 		tmp = make(map[string]interface{})
 	}
 
-	log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
+	log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
 	log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
 
 	//根据重复组,重新划分新的组别
 	start = int(time.Now().Unix())
-	elastic.InitElasticSize("http://192.168.3.11:9800",10)
+
+	//多线程升索引
+	pool_es := make(chan bool, es_pool)
+	wg_es := &sync.WaitGroup{}
+	tmpEsMap := make(map[string]string,0)
+	isGroupNum := 1000
 	for i:=0;i<len(repeatArr);i++ {
-		//查询ES-升索引
-		repeatid := repeatArr[i]
-		sourceid := sourceArr[i]
-		key := fmt.Sprintf("%s",sourceid)
-		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
-		if len(dataArr)>0 { //存在值
+		if i%10000 == 0 {
+			log.Println("curent index ",i)
+		}
+		if i%isGroupNum==0 && i!=0 {
+			//新的一组执行上一组生索引
+			for k,v:=range tmpEsMap {
+				pool_es <- true
+				wg_es.Add(1)
+				go func(es_id string,cur_ids string) {
+					defer func() {
+						<-pool_es
+						wg_es.Done()
+					}()
+					if es_id!="" && cur_ids!="" {
+						dataArr := *elastic.GetById(esIndex,esType,es_id)
+						if len(dataArr)>0 { //存在-更新
+							allids := qu.ObjToString(dataArr[0]["allids"])
+							allids = allids+","+cur_ids
+							updateStr := `ctx._source.allids=`+ `"`+allids+`"`
+							elastic.Update(esIndex,esType,es_id, updateStr)
+						}else { //不存在-新增
+							savetmp := make(map[string]interface{}, 0)
+							savetmp["allids"] = cur_ids
+							savetmp["_id"] = StringTOBsonId(es_id)
+							savetmp["template_id"] = ""
+							savetmp["fusion_id"] = ""
+							elastic.Save(esIndex, esType, savetmp)
+						}
+					}else {
+						log.Println("异常",es_id,cur_ids)
+					}
+				}(k,v)
 
-			if curFusionKeyMap[key]==nil { //存在融合表-不在当前id段落内
-				updateFusionMap[key] = ""
-			}
-			//es 随时更新ids
-			allids := qu.ObjToString(dataArr[0]["allids"])
-			allids = allids+","+repeatid
-			updateStr := `ctx._source.allids=`+ `"`+allids+`"`
-			b:=elastic.Update("allzktest","allzktest",sourceid, updateStr)
-			if !b {
-				log.Println("es更新异常",repeatid,sourceid)
 			}
+			wg_es.Wait()
+
+			tmpEsMap = make(map[string]string,0)
+		}
+		//新增一条数据
+		repeatid :=repeatArr[i]
+		sourceid := sourceArr[i]
+		if tmpEsMap[sourceid]!="" {
+			ids := tmpEsMap[sourceid]
+			ids = ids+","+repeatid
+			tmpEsMap[sourceid] = ids
 		}else {
-			//索引查不到-确定新增- es 随时新增ids
-			savetmp := make(map[string]interface{}, 0)
-			savetmp["allids"] = repeatid
-			savetmp["_id"] = StringTOBsonId(sourceid)
-			b:=elastic.Save("allzktest", "allzktest", savetmp)
-			if !b {
-				log.Println("es保存异常",repeatid,sourceid)
-			}
-			curFusionKeyMap[key] = ""
+			tmpEsMap[sourceid] = sourceid+","+repeatid
+		}
+	}
+
+	//处理剩余数据
+	if len(tmpEsMap)>0 {
+		for k,v:=range tmpEsMap {
+			pool_es <- true
+			wg_es.Add(1)
+			go func(es_id string,cur_ids string) {
+				defer func() {
+					<-pool_es
+					wg_es.Done()
+				}()
+				dataArr := *elastic.GetById(esIndex,esType,es_id)
+				if len(dataArr)>0 { //存在-更新
+					allids := qu.ObjToString(dataArr[0]["allids"])
+					allids = allids+","+cur_ids
+					updateStr := `ctx._source.allids=`+ `"`+allids+`"`
+					elastic.Update(esIndex,esType,es_id, updateStr)
+				}else { //不存在-新增
+					savetmp := make(map[string]interface{}, 0)
+					savetmp["allids"] = cur_ids
+					savetmp["_id"] = StringTOBsonId(es_id)
+					savetmp["template_id"] = ""
+					savetmp["fusion_id"] = ""
+					elastic.Save(esIndex,esType, savetmp)
+				}
+			}(k,v)
 		}
+		wg_es.Wait()
+		tmpEsMap = make(map[string]string,0)
+
 	}
 
-	log.Println("前置索引准备完毕... ...","耗时:",int(time.Now().Unix())-start,"秒")
 
+	log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
 
 	start = int(time.Now().Unix())
 	log.Println("开始数据分组... ... ... ...")
 	log.Println("开始数据分组... ... ... ...")
 	log.Println("开始数据分组... ... ... ...")
 
-	//当前段落组
+	//查询分组-多线程
 	for i:=0;i<len(norepeatArr);i++ {
-		sourceid:=norepeatArr[i]
-		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
-		if len(dataArr)>0 { //存在值
-			allids := qu.ObjToString(dataArr[0]["allids"])
-			arr := strings.Split(allids,",")
-			arr = append(arr,sourceid)
-			fusionDataGroupArr = append(fusionDataGroupArr,arr)
-		}else {
-			arr:=[]string{sourceid}
-			fusionDataGroupArr = append(fusionDataGroupArr,arr)
+		if i%10000==0 {
+			log.Println("cur index ",i,norepeatArr[i])
 		}
-	}
-	//更新组
-	for k,_:=range updateFusionMap {
-		sourceid:=qu.ObjToString(k)
-		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
-		if len(dataArr)>0 { //存在值
-			allids := qu.ObjToString(dataArr[0]["allids"])
-			arr := strings.Split(allids,",")
-			arr = append(arr,sourceid)
-			fusionDataGroupArr = append(fusionDataGroupArr,arr)
-		}else {
-			log.Println("融合表更新,查询Es异常:",sourceid)
-		}
-	}
+		sourceid:=norepeatArr[i]
+		pool_es <- true
+		wg_es.Add(1)
+		go func(sourceid string) {
+			defer func() {
+				<-pool_es
+				wg_es.Done()
+			}()
+			dataArr := *elastic.GetById(esIndex,esType,sourceid)
+			if len(dataArr)>0 { //存在值
+				allids := qu.ObjToString(dataArr[0]["allids"])
+				arr := strings.Split(allids,",")
+				updatelock.Lock()
+				fusionDataGroupMap[sourceid] = arr
+				updatelock.Unlock()
+			}else {
+				arr:=[]string{sourceid}
+				updatelock.Lock()
+				fusionDataGroupMap[sourceid] = arr
+				updatelock.Unlock()
+
+
+			}
 
+		}(sourceid)
+	}
+	wg_es.Wait()
 
 
-	//isErrNum:=0
-	//for i:=0;i<len(repeatArr);i++ {
-	//	sourceid := sourceArr[i]
-	//	isAddExist,index := false,0
-	//	//根据原sourceid 直接遍历组
-	//R:	for k,v:=range fusionDataGroupArr{
-	//		for _,v1:=range v{
-	//			if v1==sourceid {
-	//				index = k
-	//				isAddExist = true
-	//				break R
-	//			}
-	//		}
-	//	}
-	//	if i%1000 == 0 {
-	//		log.Println("分组中...","current index",i,repeatArr[i])
-	//	}
-	//
-	//	if isAddExist { //数组截取替换-找到指定
-	//		arr := make([]string,0)
-	//		arr = fusionDataGroupArr[index]
-	//		arr = append(arr,repeatArr[i])//组拼接当前id
-	//		fusionDataGroupArr[index] = arr
-	//		log.Println("... ... 正常单组新增",i)
-	//
-	//	}else {//当前段落未找到-需要查询融合表,,遍历融合表
-	//		arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
-	//		arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
-	//		arr = append(arr,repeatArr[i])//组拼接当前id
-	//
-	//
-	//
-	//
-	//
-	//		if len(arr)==1 { //异常错误,新增
-	//			isErrNum++
-	//			log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
-	//			arr_error := make([]string,0)
-	//			arr_error = append(arr_error,repeatArr[i])//组拼接当前id
-	//			fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
-	//			addOrUpdateArr = append(addOrUpdateArr,false)
-	//			infoFusionArr = append(infoFusionArr, map[string]interface{}{})
-	//		}else { //正常更新
-	//			log.Println("... ... 正常多组新增",i)
-	//			fusionDataGroupArr = append(fusionDataGroupArr,arr)
-	//			addOrUpdateArr = append(addOrUpdateArr,true)
-	//			infoFusionArr = append(infoFusionArr,fusionTmpData)
-	//		}
-	//
-	//	}
-	//	//不断改变中
-	//	if i%1000 == 0 {
-	//		log.Println("当前分组数量:",len(fusionDataGroupArr))
-	//	}
-	//}
-
-
-
-
-	log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
+	log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
 	log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
 	log.Println("********************分割线********************")
 	log.Println("********************分割线********************")
 	log.Println("********************分割线********************")
 
 
-	log.Println("开始处理分组融合... ... ... ...")
-	log.Println("开始处理分组融合... ... ... ...")
-	log.Println("开始处理分组融合... ... ... ...")
+	log.Println("开始进行正式分组融合......先睡秒30秒")
+	time.Sleep(30 * time.Second)
 
 	start = int(time.Now().Unix())
 	//多线程 - 处理数据
-	pool := make(chan bool, 3)
-	wg := &sync.WaitGroup{}
-
-	for i:=0;i<len(fusionDataGroupArr);i++ {
-		fusionArr := fusionDataGroupArr[i]
-		pool <- true
-		wg.Add(1)
-		go func(fusionArr []string,i int) {
+	pool_mgo := make(chan bool, mgo_pool)
+	wg_mgo := &sync.WaitGroup{}
+
+	fusionIndex:=0
+	for k,v:=range fusionDataGroupMap {
+		fusionIndex++
+		pool_mgo <- true
+		wg_mgo.Add(1)
+		go func(sourceid string ,fusionArr []string,fusionIndex int) {
 			defer func() {
-				<-pool
-				wg.Done()
+				<-pool_mgo
+				wg_mgo.Done()
 			}()
-			//构建数据
-			if (i+1)%500 == 0 {
-				log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
+			if fusionIndex % 10000==0 {
+				log.Println("数据融合数量:",fusionIndex,sourceid)
 			}
 			weight :=NewWeightData(fusionArr)
-			////整理数据-筛选排名,模板
 			weight.analyzeBuildStandardData()
-
-			if len(fusionArr)<=1 {
+			if len(fusionArr)<=1 { //单组数据-需要新增Es
 				saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
 				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
 				saveRecordData["_id"] = saveid
 				mgo.Save(record_coll_name,saveRecordData)
-			}else {
-				//if addOrUpdateArr[i] {
-				//	//log.Println("多组更新... ...")
-				//	tmpdata:=infoFusionArr[i]
-				//	updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
-				//
-				//	UpdateFusion.updatePool <- []map[string]interface{}{
-				//		map[string]interface{}{
-				//			"_id": tmpdata["_id"],
-				//		},
-				//		updateFusionData,
-				//	}
-				//	UpdateRecord.updatePool <- []map[string]interface{}{
-				//		map[string]interface{}{
-				//			"_id": tmpdata["_id"],
-				//		},
-				//		updateRecordData,
-				//	}
-				//}else {
-				//	//log.Println("多组生成... ...")
-				//	saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
-				//	saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-				//	saveRecordData["_id"] = saveid
-				//	mgo.Save(record_coll_name,saveRecordData)
-				//}
-			}
 
+				//新增es
+				savetmp := make(map[string]interface{}, 0)
+				fusionid:=BsonTOStringId(saveid)
+				savetmp["_id"] = StringTOBsonId(sourceid)
+				savetmp["allids"] = sourceid
+				savetmp["template_id"] = sourceid
+				savetmp["fusion_id"] = fusionid
+				elastic.Save(esIndex,esType,savetmp)
 
-		}(fusionArr,i)
 
+			}else {
+				saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
+				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+				saveRecordData["_id"] = saveid
+				mgo.Save(record_coll_name,saveRecordData)
 
+				//更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
+				fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
+				updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
+				updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
+				elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
 
+			}
+		}(k,v,fusionIndex)
 	}
 
-	wg.Wait()
+	wg_mgo.Wait()
 
-	log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
+	log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
 	log.Println("睡眠30秒,然后在发广播")
+
 	time.Sleep(30 * time.Second)
+
 	//任务完成,开始发送广播通知下面节点
+
 	taskSendFusionUdp(mapInfo)
 
-}
+}
+
+
+//增量-融合-一小段
+//func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
+//	log.Println("开始增量融合流程")
+//	defer qu.Catch()
+//	//区间id
+//	q := map[string]interface{}{
+//		"_id": map[string]interface{}{
+//			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+//			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+//		},
+//	}
+//	log.Println("查询条件:",q)
+//	sess := mgo.GetMgoConn()
+//	defer mgo.DestoryMongoConn(sess)
+//	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
+//	//编译不同的融合组,如何划分组
+//
+//	//待融合组
+//	fusionDataGroupArr := make([][]string,0)
+//	//需要更新组
+//	updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
+//	//重复数据组
+//	norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
+//
+//
+//	start := int(time.Now().Unix())
+//	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+//		if index%10000 == 0 {
+//			log.Println("current index",index,tmp["_id"])
+//		}
+//		tmpId:=BsonTOStringId(tmp["_id"])
+//		repeat:=qu.IntAll(tmp["repeat"])
+//		sourceid:=qu.ObjToString(tmp["repeat_id"])
+//		if repeat==1 {
+//			repeatArr = append(repeatArr,tmpId)
+//			sourceArr = append(sourceArr,sourceid)
+//		}else {
+//			norepeatArr = append(repeatArr,tmpId)
+//		}
+//		tmp = make(map[string]interface{})
+//	}
+//
+//	log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
+//	log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
+//
+//	//根据重复组,重新划分新的组别
+//	start = int(time.Now().Unix())
+//	elastic.InitElasticSize("http://192.168.3.11:9800",10)
+//	for i:=0;i<len(repeatArr);i++ {
+//		//查询ES-升索引
+//		repeatid := repeatArr[i]
+//		sourceid := sourceArr[i]
+//		key := fmt.Sprintf("%s",sourceid)
+//		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
+//		if len(dataArr)>0 { //存在值
+//
+//			if curFusionKeyMap[key]==nil { //存在融合表-不在当前id段落内
+//				updateFusionMap[key] = ""
+//			}
+//			//es 随时更新ids
+//			allids := qu.ObjToString(dataArr[0]["allids"])
+//			allids = allids+","+repeatid
+//			updateStr := `ctx._source.allids=`+ `"`+allids+`"`
+//			b:=elastic.Update("allzktest","allzktest",sourceid, updateStr)
+//			if !b {
+//				log.Println("es更新异常",repeatid,sourceid)
+//			}
+//		}else {
+//			//索引查不到-确定新增- es 随时新增ids
+//			savetmp := make(map[string]interface{}, 0)
+//			savetmp["allids"] = repeatid
+//			savetmp["_id"] = StringTOBsonId(sourceid)
+//			b:=elastic.Save("allzktest", "allzktest", savetmp)
+//			if !b {
+//				log.Println("es保存异常",repeatid,sourceid)
+//			}
+//			curFusionKeyMap[key] = ""
+//		}
+//	}
+//
+//	log.Println("前置索引准备完毕... ...","耗时:",int(time.Now().Unix())-start,"秒")
+//
+//
+//	start = int(time.Now().Unix())
+//	log.Println("开始数据分组... ... ... ...")
+//	log.Println("开始数据分组... ... ... ...")
+//	log.Println("开始数据分组... ... ... ...")
+//
+//	//当前段落组
+//	for i:=0;i<len(norepeatArr);i++ {
+//		sourceid:=norepeatArr[i]
+//		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
+//		if len(dataArr)>0 { //存在值
+//			allids := qu.ObjToString(dataArr[0]["allids"])
+//			arr := strings.Split(allids,",")
+//			arr = append(arr,sourceid)
+//			fusionDataGroupArr = append(fusionDataGroupArr,arr)
+//		}else {
+//			arr:=[]string{sourceid}
+//			fusionDataGroupArr = append(fusionDataGroupArr,arr)
+//		}
+//	}
+//	//更新组
+//	for k,_:=range updateFusionMap {
+//		sourceid:=qu.ObjToString(k)
+//		dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
+//		if len(dataArr)>0 { //存在值
+//			allids := qu.ObjToString(dataArr[0]["allids"])
+//			arr := strings.Split(allids,",")
+//			arr = append(arr,sourceid)
+//			fusionDataGroupArr = append(fusionDataGroupArr,arr)
+//		}else {
+//			log.Println("融合表更新,查询Es异常:",sourceid)
+//		}
+//	}
+//
+//
+//
+//	//isErrNum:=0
+//	//for i:=0;i<len(repeatArr);i++ {
+//	//	sourceid := sourceArr[i]
+//	//	isAddExist,index := false,0
+//	//	//根据原sourceid 直接遍历组
+//	//R:	for k,v:=range fusionDataGroupArr{
+//	//		for _,v1:=range v{
+//	//			if v1==sourceid {
+//	//				index = k
+//	//				isAddExist = true
+//	//				break R
+//	//			}
+//	//		}
+//	//	}
+//	//	if i%1000 == 0 {
+//	//		log.Println("分组中...","current index",i,repeatArr[i])
+//	//	}
+//	//
+//	//	if isAddExist { //数组截取替换-找到指定
+//	//		arr := make([]string,0)
+//	//		arr = fusionDataGroupArr[index]
+//	//		arr = append(arr,repeatArr[i])//组拼接当前id
+//	//		fusionDataGroupArr[index] = arr
+//	//		log.Println("... ... 正常单组新增",i)
+//	//
+//	//	}else {//当前段落未找到-需要查询融合表,,遍历融合表
+//	//		arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
+//	//		arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
+//	//		arr = append(arr,repeatArr[i])//组拼接当前id
+//	//
+//	//
+//	//
+//	//
+//	//
+//	//		if len(arr)==1 { //异常错误,新增
+//	//			isErrNum++
+//	//			log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
+//	//			arr_error := make([]string,0)
+//	//			arr_error = append(arr_error,repeatArr[i])//组拼接当前id
+//	//			fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
+//	//			addOrUpdateArr = append(addOrUpdateArr,false)
+//	//			infoFusionArr = append(infoFusionArr, map[string]interface{}{})
+//	//		}else { //正常更新
+//	//			log.Println("... ... 正常多组新增",i)
+//	//			fusionDataGroupArr = append(fusionDataGroupArr,arr)
+//	//			addOrUpdateArr = append(addOrUpdateArr,true)
+//	//			infoFusionArr = append(infoFusionArr,fusionTmpData)
+//	//		}
+//	//
+//	//	}
+//	//	//不断改变中
+//	//	if i%1000 == 0 {
+//	//		log.Println("当前分组数量:",len(fusionDataGroupArr))
+//	//	}
+//	//}
+//
+//
+//
+//
+//	log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
+//	log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
+//	log.Println("********************分割线********************")
+//	log.Println("********************分割线********************")
+//	log.Println("********************分割线********************")
+//
+//
+//	log.Println("开始处理分组融合... ... ... ...")
+//	log.Println("开始处理分组融合... ... ... ...")
+//	log.Println("开始处理分组融合... ... ... ...")
+//
+//	start = int(time.Now().Unix())
+//	//多线程 - 处理数据
+//	pool := make(chan bool, 3)
+//	wg := &sync.WaitGroup{}
+//
+//	for i:=0;i<len(fusionDataGroupArr);i++ {
+//		fusionArr := fusionDataGroupArr[i]
+//		pool <- true
+//		wg.Add(1)
+//		go func(fusionArr []string,i int) {
+//			defer func() {
+//				<-pool
+//				wg.Done()
+//			}()
+//			//构建数据
+//			if (i+1)%500 == 0 {
+//				log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
+//			}
+//			weight :=NewWeightData(fusionArr)
+//			////整理数据-筛选排名,模板
+//			weight.analyzeBuildStandardData()
+//
+//			if len(fusionArr)<=1 {
+//				saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
+//				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+//				saveRecordData["_id"] = saveid
+//				mgo.Save(record_coll_name,saveRecordData)
+//			}else {
+//				//if addOrUpdateArr[i] {
+//				//	//log.Println("多组更新... ...")
+//				//	tmpdata:=infoFusionArr[i]
+//				//	updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
+//				//
+//				//	UpdateFusion.updatePool <- []map[string]interface{}{
+//				//		map[string]interface{}{
+//				//			"_id": tmpdata["_id"],
+//				//		},
+//				//		updateFusionData,
+//				//	}
+//				//	UpdateRecord.updatePool <- []map[string]interface{}{
+//				//		map[string]interface{}{
+//				//			"_id": tmpdata["_id"],
+//				//		},
+//				//		updateRecordData,
+//				//	}
+//				//}else {
+//				//	//log.Println("多组生成... ...")
+//				//	saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
+//				//	saveid:=mgo.Save(fusion_coll_name,saveFusionData)
+//				//	saveRecordData["_id"] = saveid
+//				//	mgo.Save(record_coll_name,saveRecordData)
+//				//}
+//			}
+//
+//
+//		}(fusionArr,i)
+//
+//
+//
+//	}
+//
+//	wg.Wait()
+//
+//	log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
+//	log.Println("睡眠30秒,然后在发广播")
+//	time.Sleep(30 * time.Second)
+//	//任务完成,开始发送广播通知下面节点
+//	taskSendFusionUdp(mapInfo)
+//
+//}

+ 163 - 159
udpfusion/src/fusionFullData.go

@@ -1,20 +1,142 @@
 package main
 
 import (
+	"encoding/json"
 	"log"
 	qu "qfw/util"
 	"qfw/util/elastic"
 	"strings"
 	"sync"
 	"time"
+	es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
 )
 
+func startFusionData()  {
+	start := int(time.Now().Unix())
+	log.Println("开始遍历索引-进行融合............")
+	//遍历索引
+	esclient := elastic.GetEsConn()
+	defer elastic.DestoryEsConn(esclient)
+
+	if esclient == nil {
+		log.Fatalln("连接池异常")
+	}
+	q :=es_elastic.NewBoolQuery()
+	cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
+		Size(200).Do()
+	if err != nil {
+		log.Fatal(err)
+	}
+	if cursor.Results == nil {
+		log.Fatalf("results != nil; got nil")
+	}
+	if cursor.Results.Hits == nil {
+		log.Fatalf("expected results.Hits != nil; got nil")
+	}
+	log.Println("查询总数:",cursor.TotalHits())
+
+
+	//多线程 - 处理数据
+	pool_mgo := make(chan bool, mgo_pool)
+	wg_mgo := &sync.WaitGroup{}
+	pages,numDocs := 0,0
+
+	for {
+		searchResult, err := cursor.Next()
+		if err == es_elastic.EOS {
+			break
+		}
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		pages++
+		isLog := false
+		for _, hit := range searchResult.Hits.Hits {
+			if hit.Index != esIndex {
+				log.Fatalf("expected SearchResult.Hits.Hit.Index = %q; got %q", esIndex, hit.Index)
+			}
+			tmp := make(map[string]interface{})
+			err := json.Unmarshal(*hit.Source, &tmp)
+			if err != nil {
+				log.Fatalln(err)
+			}
+			if !isLog && numDocs%10000==0 {
+				log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
+				isLog = true
+			}
+
+			numDocs++
+			fusion_ids := qu.ObjToString(tmp["allids"])
+			fusionArr := strings.Split(fusion_ids, ",")
+			sourceid := qu.ObjToString(tmp["_id"])
+			pool_mgo <- true
+			wg_mgo.Add(1)
+			go func(sourceid string, fusionArr []string) {
+				defer func() {
+					<-pool_mgo
+					wg_mgo.Done()
+				}()
+				weight := NewWeightData(fusionArr)
+				weight.analyzeBuildStandardData()
+
+				if len(fusionArr) <= 1 { //单组数据
+					saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
+					saveid := mgo.Save(fusion_coll_name, saveFusionData)
+					//新增-Record
+					saveRecordData["_id"] = saveid
+					UpdateRecord.add_pool <- saveRecordData
+					//批量更新Es
+					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					UpdateElastic.update_pool <- map[string]string{
+						"id":sourceid,
+						"updateStr":updateStr1+updateStr2,
+					}
+				}else { //多组数据
+					saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
+					saveid := mgo.Save(fusion_coll_name, saveFusionData)
+					//新增-Record
+					saveRecordData["_id"] = saveid
+					UpdateRecord.add_pool <- saveRecordData
+					//批量更新Es
+					fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
+					updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
+					updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
+					UpdateElastic.update_pool <- map[string]string{
+						"id":sourceid,
+						"updateStr":updateStr1+updateStr2,
+					}
+
+				}
+			}(sourceid, fusionArr)
+		}
+
+	}
+
+	wg_mgo.Wait()
+
+	if pages <= 0 {
+		log.Fatalf("expected to retrieve at least 1 page; got %d", pages)
+	}
+
+
+	log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
+
+}
 
 
 func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 
+
+	startFusionData()
+	return
+
+
 	log.Println("开始全量融合流程")
 	defer qu.Catch()
+
 	//区间id
 	q := map[string]interface{}{
 		"_id": map[string]interface{}{
@@ -26,45 +148,17 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
-	//编译不同的融合组,如何划分组
-	fusionDataGroupMap := make(map[string][]string,0) //待融合组
-
-	norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
-
+	index,isOK:=0,0
 	start := int(time.Now().Unix())
-	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
-		if index%10000 == 0 {
-			log.Println("current index",index,tmp["_id"])
-		}
-		tmpId:=BsonTOStringId(tmp["_id"])
-		repeat:=qu.IntAll(tmp["repeat"])
-		sourceid:=qu.ObjToString(tmp["repeat_id"])
-		if repeat==1 {
-			repeatArr = append(repeatArr,tmpId)
-			sourceArr = append(sourceArr,sourceid)
-		}else {
-			norepeatArr = append(norepeatArr,tmpId)
-		}
-
-		tmp = make(map[string]interface{})
-	}
-
-	log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
-	log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
-
-	//根据重复组,重新划分新的组别
-	start = int(time.Now().Unix())
-
 	//多线程升索引
 	pool_es := make(chan bool, es_pool)
 	wg_es := &sync.WaitGroup{}
 	tmpEsMap := make(map[string]string,0)
-	isGroupNum := 1000
-	for i:=0;i<len(repeatArr);i++ {
-		if i%10000 == 0 {
-			log.Println("curent index ",i)
-		}
-		if i%isGroupNum==0 && i!=0 {
+
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		//每遍历isgroupfn条 划分组别
+		if index%isgroupfn==0 && index!=0 {
+			log.Println("current index",index,tmp["_id"])
 			//新的一组执行上一组生索引
 			for k,v:=range tmpEsMap {
 				pool_es <- true
@@ -96,23 +190,33 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 
 			}
 			wg_es.Wait()
-
 			tmpEsMap = make(map[string]string,0)
 		}
-		//新增一条数据
-		repeatid :=repeatArr[i]
-		sourceid := sourceArr[i]
+
+		repeat := qu.IntAll(tmp["repeat"])
+		sourceid := BsonTOStringId(tmp["_id"])
+		repeatid := BsonTOStringId(tmp["_id"])
+		if repeat==1 {
+			sourceid = qu.ObjToString(tmp["repeat_id"])
+		}else {
+			isOK++
+		}
 		if tmpEsMap[sourceid]!="" {
 			ids := tmpEsMap[sourceid]
 			ids = ids+","+repeatid
 			tmpEsMap[sourceid] = ids
 		}else {
-			tmpEsMap[sourceid] = sourceid+","+repeatid
+			tmpEsMap[sourceid] = repeatid
 		}
+
+		tmp = make(map[string]interface{})
 	}
 
+	log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
+
 	//处理剩余数据
 	if len(tmpEsMap)>0 {
+		log.Println("处理剩余数据:",len(tmpEsMap))
 		for k,v:=range tmpEsMap {
 			pool_es <- true
 			wg_es.Add(1)
@@ -121,19 +225,23 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 					<-pool_es
 					wg_es.Done()
 				}()
-				dataArr := *elastic.GetById(esIndex,esType,es_id)
-				if len(dataArr)>0 { //存在-更新
-					allids := qu.ObjToString(dataArr[0]["allids"])
-					allids = allids+","+cur_ids
-					updateStr := `ctx._source.allids=`+ `"`+allids+`"`
-					elastic.Update(esIndex,esType,es_id, updateStr)
-				}else { //不存在-新增
-					savetmp := make(map[string]interface{}, 0)
-					savetmp["allids"] = cur_ids
-					savetmp["_id"] = StringTOBsonId(es_id)
-					savetmp["template_id"] = ""
-					savetmp["fusion_id"] = ""
-					elastic.Save(esIndex,esType, savetmp)
+				if es_id!="" && cur_ids!="" {
+					dataArr := *elastic.GetById(esIndex,esType,es_id)
+					if len(dataArr)>0 { //存在-更新
+						allids := qu.ObjToString(dataArr[0]["allids"])
+						allids = allids+","+cur_ids
+						updateStr := `ctx._source.allids=`+ `"`+allids+`"`
+						elastic.Update(esIndex,esType,es_id, updateStr)
+					}else { //不存在-新增
+						savetmp := make(map[string]interface{}, 0)
+						savetmp["allids"] = cur_ids
+						savetmp["_id"] = StringTOBsonId(es_id)
+						savetmp["template_id"] = ""
+						savetmp["fusion_id"] = ""
+						elastic.Save(esIndex, esType, savetmp)
+					}
+				}else {
+					log.Println("异常",es_id,cur_ids)
 				}
 			}(k,v)
 		}
@@ -141,121 +249,17 @@ func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
 		tmpEsMap = make(map[string]string,0)
 
 	}
+	log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
 
 
-	log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
-
-	start = int(time.Now().Unix())
-	log.Println("开始数据分组... ... ... ...")
-	log.Println("开始数据分组... ... ... ...")
-	log.Println("开始数据分组... ... ... ...")
-
-	//查询分组-多线程
-	for i:=0;i<len(norepeatArr);i++ {
-		if i%10000==0 {
-			log.Println("cur index ",i,norepeatArr[i])
-		}
-		sourceid:=norepeatArr[i]
-		pool_es <- true
-		wg_es.Add(1)
-		go func(sourceid string) {
-			defer func() {
-				<-pool_es
-				wg_es.Done()
-			}()
-			dataArr := *elastic.GetById(esIndex,esType,sourceid)
-			if len(dataArr)>0 { //存在值
-				allids := qu.ObjToString(dataArr[0]["allids"])
-				arr := strings.Split(allids,",")
-				updatelock.Lock()
-				fusionDataGroupMap[sourceid] = arr
-				updatelock.Unlock()
-			}else {
-				arr:=[]string{sourceid}
-				updatelock.Lock()
-				fusionDataGroupMap[sourceid] = arr
-				updatelock.Unlock()
-
-
-			}
-
-		}(sourceid)
-	}
-	wg_es.Wait()
-
-
-	log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
-	log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
-	log.Println("********************分割线********************")
-	log.Println("********************分割线********************")
-	log.Println("********************分割线********************")
-
-
-	log.Println("开始进行正式分组融合......先睡秒30秒")
 	time.Sleep(30 * time.Second)
 
-	start = int(time.Now().Unix())
-	//多线程 - 处理数据
-	pool_mgo := make(chan bool, mgo_pool)
-	wg_mgo := &sync.WaitGroup{}
-
-	fusionIndex:=0
-	for k,v:=range fusionDataGroupMap {
-		fusionIndex++
-		pool_mgo <- true
-		wg_mgo.Add(1)
-		go func(sourceid string ,fusionArr []string,fusionIndex int) {
-			defer func() {
-				<-pool_mgo
-				wg_mgo.Done()
-			}()
-			if fusionIndex % 10000==0 {
-				log.Println("数据融合数量:",fusionIndex,sourceid)
-			}
-			weight :=NewWeightData(fusionArr)
-			weight.analyzeBuildStandardData()
-			if len(fusionArr)<=1 { //单组数据-需要新增Es
-				log.Println("")
-				saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
-				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-				saveRecordData["_id"] = saveid
-				mgo.Save(record_coll_name,saveRecordData)
-
-				//新增es
-				savetmp := make(map[string]interface{}, 0)
-				fusionid:=BsonTOStringId(saveid)
-				savetmp["_id"] = StringTOBsonId(sourceid)
-				savetmp["allids"] = sourceid
-				savetmp["template_id"] = sourceid
-				savetmp["fusion_id"] = fusionid
-				elastic.Save(esIndex,esType,savetmp)
-
-
-			}else {
-				saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
-				saveid:=mgo.Save(fusion_coll_name,saveFusionData)
-				saveRecordData["_id"] = saveid
-				mgo.Save(record_coll_name,saveRecordData)
-
-				//更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
-				fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
-				updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
-				updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
-				elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
 
-			}
-		}(k,v,fusionIndex)
-	}
+	//具体融合数据的方法
+	startFusionData()
 
-	wg_mgo.Wait()
-
-	log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
 	log.Println("睡眠30秒,然后在发广播")
-
 	time.Sleep(30 * time.Second)
 
-	//任务完成,开始发送广播通知下面节点
-
 	taskSendFusionUdp(mapInfo)
-
 }

+ 13 - 9
udpfusion/src/main.go

@@ -21,11 +21,14 @@ var (
 	coll_name 	 						string
 	fusion_coll_name,record_coll_name	string						//新增表名
 	NoNeedFusionKey 					map[string]interface{}   	//不需要融合的key
-	UpdateFusion						*updateFusionInfo
-	UpdateRecord						*updateRecordInfo			//更新池
+
+	//UpdateFusion						*updateFusionInfo			//更新Fusion池
+	UpdateRecord						*updateRecordInfo			//更新Record池
+	UpdateElastic						*updateEsInfo				//更新Es池
+
 	siteJsonData						map[string]string			//站点池
 	esIndex,esType					    string						//索引-类型
-	mgo_pool,es_pool					int
+	mgo_pool,es_pool,isgroupfn			int
 	updatelock 							sync.Mutex
 )
 
@@ -42,9 +45,10 @@ func initMgoAndSite()  {
 
 
 	coll_name = mgoconf["collName"].(string)
-	mgo_pool = qu.IntAllDef(mgoconf["mgo_pool"], 3)
+	mgo_pool = qu.IntAllDef(mgoconf["mgo_pool"], 5)
 	fusion_coll_name = sysconfig["fusion_coll_name"].(string)
 	record_coll_name = sysconfig["record_coll_name"].(string)
+	isgroupfn = qu.IntAllDef(sysconfig["isgroupfn"], 10000)
 	NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
 
 
@@ -78,13 +82,13 @@ func init() {
 	initMgoAndSite()
 	initEs()
 
-	//更新池
-	UpdateFusion = newUpdateFusionPool()
-	go UpdateFusion.updateFusionData()
 
-	UpdateRecord = newUpdateRecordPool()
-	go UpdateRecord.updateRecordData()
+	//增量修改方式
+	UpdateRecord = newAddRecordPool()
+	go UpdateRecord.addRecordData()
 
+	UpdateElastic = newUpdateEsPool()
+	go UpdateElastic.updateEsData()
 
 
 	log.Println("采用udp模式")

+ 67 - 0
udpfusion/src/updateElastic.go

@@ -0,0 +1,67 @@
+package main
+
+import (
+	"qfw/util/elastic"
+	"log"
+	"time"
+)
+
+type updateEsInfo struct {
+
+	//更新或新增通道
+	update_pool chan map[string]string
+	//数量
+	saveSize   	int
+
+}
+
+
+
+
+
+var sp_es = make(chan bool, 10)
+
+//批量更新对象
+func newUpdateEsPool() *updateEsInfo {
+	update:=&updateEsInfo{make(chan map[string]string, 50000),200}
+	return update
+}
+
+//更新池
+func (update *updateEsInfo) updateEsData() {
+	log.Println("监听Es......更新数据")
+	tmpArr := make([]map[string]string, update.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-update.update_pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == update.saveSize {
+				sp_es <- true
+				go func(dataArr []map[string]string) {
+					defer func() {
+						<-sp_es
+					}()
+					//批量更新
+					elastic.BulkUpdateArr(esIndex,esType,dataArr)
+				}(tmpArr)
+				tmpArr = make([]map[string]string, update.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			if tmpIndex > 0 {
+				sp_es <- true
+				go func(dataArr []map[string]string) {
+					defer func() {
+						<-sp_es
+					}()
+					//批量更新
+					elastic.BulkUpdateArr(esIndex,esType,dataArr)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([]map[string]string, update.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}

+ 58 - 5
udpfusion/src/updateRecord.go

@@ -8,7 +8,8 @@ import (
 type updateRecordInfo struct {
 
 	//更新或新增通道
-	updatePool chan []map[string]interface{}
+	add_pool chan map[string]interface{}
+	update_pool chan []map[string]interface{}
 	//数量
 	saveSize   	int
 
@@ -17,21 +18,71 @@ type updateRecordInfo struct {
 
 
 
-var sp_r = make(chan bool, 5)
 
+var sp_r = make(chan bool,10)
+
+//批量更新对象
 func newUpdateRecordPool() *updateRecordInfo {
-	update:=&updateRecordInfo{make(chan []map[string]interface{}, 50000),500}
+	update:=&updateRecordInfo{nil,make(chan []map[string]interface{}, 50000),200}
+	return update
+}
+//批量新增对象
+func newAddRecordPool() *updateRecordInfo {
+	update:=&updateRecordInfo{make(chan map[string]interface{}, 50000),nil,200}
 	return update
 }
 
 
+//新增池
+func (update *updateRecordInfo) addRecordData() {
+	log.Println("监听日志......新增数据")
+	tmpArr := make([]map[string]interface{}, update.saveSize)
+	tmpIndex := 0
+	for {
+		select {
+		case value := <-update.add_pool:
+			tmpArr[tmpIndex] = value
+			tmpIndex++
+			if tmpIndex == update.saveSize {
+				sp_r <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp_r
+					}()
+					//批量新增
+					mgo.SaveBulk(record_coll_name, dataArr...)
+
+				}(tmpArr)
+				tmpArr = make([]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		case <-time.After(10 * time.Second)://无反应时每x秒检测一次
+			//log.Println("10秒检测",tmpIndex)
+			if tmpIndex > 0 {
+				sp_r <- true
+				go func(dataArr []map[string]interface{}) {
+					defer func() {
+						<-sp_r
+					}()
+					//批量新增
+
+					mgo.SaveBulk(record_coll_name, dataArr...)
+				}(tmpArr[:tmpIndex])
+				tmpArr = make([]map[string]interface{}, update.saveSize)
+				tmpIndex = 0
+			}
+		}
+	}
+}
+
+//更新池
 func (update *updateRecordInfo) updateRecordData() {
-	log.Println("监听日志--待更新数据")
+	log.Println("监听日志......更新数据")
 	tmpArr := make([][]map[string]interface{}, update.saveSize)
 	tmpIndex := 0
 	for {
 		select {
-		case value := <-update.updatePool:
+		case value := <-update.update_pool:
 			tmpArr[tmpIndex] = value
 			tmpIndex++
 			if tmpIndex == update.saveSize {
@@ -40,6 +91,7 @@ func (update *updateRecordInfo) updateRecordData() {
 					defer func() {
 						<-sp_r
 					}()
+					//批量更新
 					mgo.UpSertBulk(record_coll_name, dataArr...)
 				}(tmpArr)
 				tmpArr = make([][]map[string]interface{}, update.saveSize)
@@ -52,6 +104,7 @@ func (update *updateRecordInfo) updateRecordData() {
 					defer func() {
 						<-sp_r
 					}()
+					//批量更新
 					mgo.UpSertBulk(record_coll_name, dataArr...)
 				}(tmpArr[:tmpIndex])
 				tmpArr = make([][]map[string]interface{}, update.saveSize)

+ 79 - 15
udpfusion/src/weightFusion.go

@@ -115,20 +115,48 @@ func (weight *weightDataMap) dealWithMultipleAddFusionStruct ()(map[string]inter
                 "id" : "603bc7d036baf5b8f2bb159a",
                 "value" : 8005829.82
             }
+			"attach_text" : [
+                {
+                    "id" : "5ff1f441c2c0c99d52d74130",
+                    "value" : {
+                        "0" : {
+                            "file_name" : "技术参数.docx",
+                            "attach_url" : "93736308-4de4-11eb-9409-0242ac120005"
+                        }
+                    }
+                }
+            ],
+
 	}
 	*/
 
-	//
+	//attach_text需要单独处理
 	fieldCal := make(map[string]interface{},0)
 	for k,v:=range recordDict{
-		dict := *qu.ObjToMap(v)
-		tmp_id := qu.ObjToString(dict["id"])
-		if fieldCal[tmp_id]==nil {
-			fieldCal[tmp_id] = []interface{}{k}
+		if k=="attach_text" {
+			if attachArr,b := v.([]map[string]interface{});b {
+				for _,v1:=range attachArr {
+					dict := *qu.ObjToMap(v1)
+					tmp_id := qu.ObjToString(dict["id"])
+					if fieldCal[tmp_id]==nil {
+						fieldCal[tmp_id] = []interface{}{k}
+					}else {
+						arr := fieldCal[tmp_id].([]interface{})
+						arr = append(arr,k)
+						fieldCal[tmp_id] = arr
+					}
+				}
+			}
 		}else {
-			arr := fieldCal[tmp_id].([]interface{})
-			arr = append(arr,k)
-			fieldCal[tmp_id] = arr
+			dict := *qu.ObjToMap(v)
+			tmp_id := qu.ObjToString(dict["id"])
+			if fieldCal[tmp_id]==nil {
+				fieldCal[tmp_id] = []interface{}{k}
+			}else {
+				arr := fieldCal[tmp_id].([]interface{})
+				arr = append(arr,k)
+				fieldCal[tmp_id] = arr
+			}
 		}
 	}
 
@@ -197,17 +225,53 @@ func (weight *weightDataMap) dealWithMultipleUpdateFusionStruct (tmpData map[str
 	newRecordDict["number"] = number
 
 
+	/*
+	"data" : {
+            "attach_text" : [
+                {
+                    "id" : "5ff1f441c2c0c99d52d74130",
+                    "value" : {
+                        "0" : {
+                            "file_name" : "技术参数.docx",
+                            "attach_url" : "93736308-4de4-11eb-9409-0242ac120005"
+                        }
+                    }
+                }
+            ],
+            "isextFile" : {
+                "id" : "5ff1f441c2c0c99d52d74130",
+                "value" : true
+            }
+        }
+	*/
 
-
+	//attach_text需要单独处理
 	fieldCal := make(map[string]interface{},0)
 	for k,v:=range recordDict{
-		tmp_id := qu.ObjToString(v.(map[string]interface{})["id"])
-		if fieldCal[tmp_id]==nil {
-			fieldCal[tmp_id] = []interface{}{k}
+		if k=="attach_text" {
+			if attachArr,b := v.([]map[string]interface{});b {
+				for _,v1:=range attachArr {
+					dict := *qu.ObjToMap(v1)
+					tmp_id := qu.ObjToString(dict["id"])
+					if fieldCal[tmp_id]==nil {
+						fieldCal[tmp_id] = []interface{}{k}
+					}else {
+						arr := fieldCal[tmp_id].([]interface{})
+						arr = append(arr,k)
+						fieldCal[tmp_id] = arr
+					}
+				}
+			}
 		}else {
-			arr := fieldCal[tmp_id].([]interface{})
-			arr = append(arr,k)
-			fieldCal[tmp_id] = arr
+			dict := *qu.ObjToMap(v)
+			tmp_id := qu.ObjToString(dict["id"])
+			if fieldCal[tmp_id]==nil {
+				fieldCal[tmp_id] = []interface{}{k}
+			}else {
+				arr := fieldCal[tmp_id].([]interface{})
+				arr = append(arr,k)
+				fieldCal[tmp_id] = arr
+			}
 		}
 	}