|
@@ -1,6 +1,7 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "fmt"
|
|
|
"log"
|
|
|
qu "qfw/util"
|
|
|
"qfw/util/elastic"
|
|
@@ -10,258 +11,9 @@ import (
|
|
|
)
|
|
|
|
|
|
|
|
|
-func startTaskAddAddData(data []byte, mapInfo map[string]interface{}) {
|
|
|
-
|
|
|
- log.Println("开始全量融合流程")
|
|
|
- defer qu.Catch()
|
|
|
- //区间id
|
|
|
- q := map[string]interface{}{
|
|
|
- "_id": map[string]interface{}{
|
|
|
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
|
|
|
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
|
|
|
- },
|
|
|
- }
|
|
|
- log.Println("查询条件:",q)
|
|
|
- sess := mgo.GetMgoConn()
|
|
|
- defer mgo.DestoryMongoConn(sess)
|
|
|
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
- //编译不同的融合组,如何划分组
|
|
|
- fusionDataGroupMap := make(map[string][]string,0) //待融合组
|
|
|
-
|
|
|
- norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
|
|
|
-
|
|
|
- start := int(time.Now().Unix())
|
|
|
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
|
- if index%10000 == 0 {
|
|
|
- log.Println("current index",index,tmp["_id"])
|
|
|
- }
|
|
|
- tmpId:=BsonTOStringId(tmp["_id"])
|
|
|
- repeat:=qu.IntAll(tmp["repeat"])
|
|
|
- sourceid:=qu.ObjToString(tmp["repeat_id"])
|
|
|
- if repeat==1 {
|
|
|
- repeatArr = append(repeatArr,tmpId)
|
|
|
- sourceArr = append(sourceArr,sourceid)
|
|
|
- }else {
|
|
|
- norepeatArr = append(norepeatArr,tmpId)
|
|
|
- }
|
|
|
-
|
|
|
- tmp = make(map[string]interface{})
|
|
|
- }
|
|
|
-
|
|
|
- log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
|
|
|
- log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
|
|
|
-
|
|
|
- //根据重复组,重新划分新的组别
|
|
|
- start = int(time.Now().Unix())
|
|
|
-
|
|
|
- //多线程升索引
|
|
|
- pool_es := make(chan bool, es_pool)
|
|
|
- wg_es := &sync.WaitGroup{}
|
|
|
- tmpEsMap := make(map[string]string,0)
|
|
|
- isGroupNum := 1000
|
|
|
- for i:=0;i<len(repeatArr);i++ {
|
|
|
- if i%10000 == 0 {
|
|
|
- log.Println("curent index ",i)
|
|
|
- }
|
|
|
- if i%isGroupNum==0 && i!=0 {
|
|
|
- //新的一组执行上一组生索引
|
|
|
- for k,v:=range tmpEsMap {
|
|
|
- pool_es <- true
|
|
|
- wg_es.Add(1)
|
|
|
- go func(es_id string,cur_ids string) {
|
|
|
- defer func() {
|
|
|
- <-pool_es
|
|
|
- wg_es.Done()
|
|
|
- }()
|
|
|
- if es_id!="" && cur_ids!="" {
|
|
|
- dataArr := *elastic.GetById(esIndex,esType,es_id)
|
|
|
- if len(dataArr)>0 { //存在-更新
|
|
|
- allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
- allids = allids+","+cur_ids
|
|
|
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
|
|
|
- elastic.Update(esIndex,esType,es_id, updateStr)
|
|
|
- }else { //不存在-新增
|
|
|
- savetmp := make(map[string]interface{}, 0)
|
|
|
- savetmp["allids"] = cur_ids
|
|
|
- savetmp["_id"] = StringTOBsonId(es_id)
|
|
|
- savetmp["template_id"] = ""
|
|
|
- savetmp["fusion_id"] = ""
|
|
|
- elastic.Save(esIndex, esType, savetmp)
|
|
|
- }
|
|
|
- }else {
|
|
|
- log.Println("异常",es_id,cur_ids)
|
|
|
- }
|
|
|
- }(k,v)
|
|
|
-
|
|
|
- }
|
|
|
- wg_es.Wait()
|
|
|
-
|
|
|
- tmpEsMap = make(map[string]string,0)
|
|
|
- }
|
|
|
- //新增一条数据
|
|
|
- repeatid :=repeatArr[i]
|
|
|
- sourceid := sourceArr[i]
|
|
|
- if tmpEsMap[sourceid]!="" {
|
|
|
- ids := tmpEsMap[sourceid]
|
|
|
- ids = ids+","+repeatid
|
|
|
- tmpEsMap[sourceid] = ids
|
|
|
- }else {
|
|
|
- tmpEsMap[sourceid] = sourceid+","+repeatid
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //处理剩余数据
|
|
|
- if len(tmpEsMap)>0 {
|
|
|
- for k,v:=range tmpEsMap {
|
|
|
- pool_es <- true
|
|
|
- wg_es.Add(1)
|
|
|
- go func(es_id string,cur_ids string) {
|
|
|
- defer func() {
|
|
|
- <-pool_es
|
|
|
- wg_es.Done()
|
|
|
- }()
|
|
|
- dataArr := *elastic.GetById(esIndex,esType,es_id)
|
|
|
- if len(dataArr)>0 { //存在-更新
|
|
|
- allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
- allids = allids+","+cur_ids
|
|
|
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
|
|
|
- elastic.Update(esIndex,esType,es_id, updateStr)
|
|
|
- }else { //不存在-新增
|
|
|
- savetmp := make(map[string]interface{}, 0)
|
|
|
- savetmp["allids"] = cur_ids
|
|
|
- savetmp["_id"] = StringTOBsonId(es_id)
|
|
|
- savetmp["template_id"] = ""
|
|
|
- savetmp["fusion_id"] = ""
|
|
|
- elastic.Save(esIndex,esType, savetmp)
|
|
|
- }
|
|
|
- }(k,v)
|
|
|
- }
|
|
|
- wg_es.Wait()
|
|
|
- tmpEsMap = make(map[string]string,0)
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
|
|
|
-
|
|
|
- start = int(time.Now().Unix())
|
|
|
- log.Println("开始数据分组... ... ... ...")
|
|
|
- log.Println("开始数据分组... ... ... ...")
|
|
|
- log.Println("开始数据分组... ... ... ...")
|
|
|
-
|
|
|
- //查询分组-多线程
|
|
|
- for i:=0;i<len(norepeatArr);i++ {
|
|
|
- if i%10000==0 {
|
|
|
- log.Println("cur index ",i,norepeatArr[i])
|
|
|
- }
|
|
|
- sourceid:=norepeatArr[i]
|
|
|
- pool_es <- true
|
|
|
- wg_es.Add(1)
|
|
|
- go func(sourceid string) {
|
|
|
- defer func() {
|
|
|
- <-pool_es
|
|
|
- wg_es.Done()
|
|
|
- }()
|
|
|
- dataArr := *elastic.GetById(esIndex,esType,sourceid)
|
|
|
- if len(dataArr)>0 { //存在值
|
|
|
- allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
- arr := strings.Split(allids,",")
|
|
|
- updatelock.Lock()
|
|
|
- fusionDataGroupMap[sourceid] = arr
|
|
|
- updatelock.Unlock()
|
|
|
- }else {
|
|
|
- arr:=[]string{sourceid}
|
|
|
- updatelock.Lock()
|
|
|
- fusionDataGroupMap[sourceid] = arr
|
|
|
- updatelock.Unlock()
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- }(sourceid)
|
|
|
- }
|
|
|
- wg_es.Wait()
|
|
|
-
|
|
|
-
|
|
|
- log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
|
|
|
- log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
|
|
|
- log.Println("********************分割线********************")
|
|
|
- log.Println("********************分割线********************")
|
|
|
- log.Println("********************分割线********************")
|
|
|
-
|
|
|
-
|
|
|
- log.Println("开始进行正式分组融合......先睡秒30秒")
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
-
|
|
|
- start = int(time.Now().Unix())
|
|
|
- //多线程 - 处理数据
|
|
|
- pool_mgo := make(chan bool, mgo_pool)
|
|
|
- wg_mgo := &sync.WaitGroup{}
|
|
|
-
|
|
|
- fusionIndex:=0
|
|
|
- for k,v:=range fusionDataGroupMap {
|
|
|
- fusionIndex++
|
|
|
- pool_mgo <- true
|
|
|
- wg_mgo.Add(1)
|
|
|
- go func(sourceid string ,fusionArr []string,fusionIndex int) {
|
|
|
- defer func() {
|
|
|
- <-pool_mgo
|
|
|
- wg_mgo.Done()
|
|
|
- }()
|
|
|
- if fusionIndex % 10000==0 {
|
|
|
- log.Println("数据融合数量:",fusionIndex,sourceid)
|
|
|
- }
|
|
|
- weight :=NewWeightData(fusionArr)
|
|
|
- weight.analyzeBuildStandardData()
|
|
|
- if len(fusionArr)<=1 { //单组数据-需要新增Es
|
|
|
- saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
|
|
|
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
- saveRecordData["_id"] = saveid
|
|
|
- mgo.Save(record_coll_name,saveRecordData)
|
|
|
-
|
|
|
- //新增es
|
|
|
- savetmp := make(map[string]interface{}, 0)
|
|
|
- fusionid:=BsonTOStringId(saveid)
|
|
|
- savetmp["_id"] = StringTOBsonId(sourceid)
|
|
|
- savetmp["allids"] = sourceid
|
|
|
- savetmp["template_id"] = sourceid
|
|
|
- savetmp["fusion_id"] = fusionid
|
|
|
- elastic.Save(esIndex,esType,savetmp)
|
|
|
-
|
|
|
-
|
|
|
- }else {
|
|
|
- saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
|
|
|
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
- saveRecordData["_id"] = saveid
|
|
|
- mgo.Save(record_coll_name,saveRecordData)
|
|
|
-
|
|
|
- //更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
|
|
|
- fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
|
|
|
- updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
|
|
|
- updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
|
|
|
- elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
|
|
|
-
|
|
|
- }
|
|
|
- }(k,v,fusionIndex)
|
|
|
- }
|
|
|
-
|
|
|
- wg_mgo.Wait()
|
|
|
-
|
|
|
- log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
- log.Println("睡眠30秒,然后在发广播")
|
|
|
-
|
|
|
- time.Sleep(30 * time.Second)
|
|
|
-
|
|
|
- //任务完成,开始发送广播通知下面节点
|
|
|
-
|
|
|
- taskSendFusionUdp(mapInfo)
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-//增量-融合-一小段
|
|
|
//func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
|
|
|
-// log.Println("开始增量融合流程")
|
|
|
+//
|
|
|
+// log.Println("开始全量融合流程")
|
|
|
// defer qu.Catch()
|
|
|
// //区间id
|
|
|
// q := map[string]interface{}{
|
|
@@ -275,14 +27,9 @@ func startTaskAddAddData(data []byte, mapInfo map[string]interface{}) {
|
|
|
// defer mgo.DestoryMongoConn(sess)
|
|
|
// it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
|
|
|
// //编译不同的融合组,如何划分组
|
|
|
+// fusionDataGroupMap := make(map[string][]string,0) //待融合组
|
|
|
//
|
|
|
-// //待融合组
|
|
|
-// fusionDataGroupArr := make([][]string,0)
|
|
|
-// //需要更新组
|
|
|
-// updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
|
|
|
-// //重复数据组
|
|
|
-// norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
|
|
|
-//
|
|
|
+// norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
|
|
|
//
|
|
|
// start := int(time.Now().Unix())
|
|
|
// for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
|
|
@@ -296,224 +43,478 @@ func startTaskAddAddData(data []byte, mapInfo map[string]interface{}) {
|
|
|
// repeatArr = append(repeatArr,tmpId)
|
|
|
// sourceArr = append(sourceArr,sourceid)
|
|
|
// }else {
|
|
|
-// norepeatArr = append(repeatArr,tmpId)
|
|
|
+// norepeatArr = append(norepeatArr,tmpId)
|
|
|
// }
|
|
|
+//
|
|
|
// tmp = make(map[string]interface{})
|
|
|
// }
|
|
|
//
|
|
|
-// log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
|
|
|
+// log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
|
|
|
// log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
|
|
|
//
|
|
|
// //根据重复组,重新划分新的组别
|
|
|
// start = int(time.Now().Unix())
|
|
|
-// elastic.InitElasticSize("http://192.168.3.11:9800",10)
|
|
|
+//
|
|
|
+// //多线程升索引
|
|
|
+// pool_es := make(chan bool, es_pool)
|
|
|
+// wg_es := &sync.WaitGroup{}
|
|
|
+// tmpEsMap := make(map[string]string,0)
|
|
|
+// isGroupNum := 1000
|
|
|
// for i:=0;i<len(repeatArr);i++ {
|
|
|
-// //查询ES-升索引
|
|
|
-// repeatid := repeatArr[i]
|
|
|
-// sourceid := sourceArr[i]
|
|
|
-// key := fmt.Sprintf("%s",sourceid)
|
|
|
-// dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
|
|
|
-// if len(dataArr)>0 { //存在值
|
|
|
+// if i%10000 == 0 {
|
|
|
+// log.Println("curent index ",i)
|
|
|
+// }
|
|
|
+// if i%isGroupNum==0 && i!=0 {
|
|
|
+// //新的一组执行上一组生索引
|
|
|
+// for k,v:=range tmpEsMap {
|
|
|
+// pool_es <- true
|
|
|
+// wg_es.Add(1)
|
|
|
+// go func(es_id string,cur_ids string) {
|
|
|
+// defer func() {
|
|
|
+// <-pool_es
|
|
|
+// wg_es.Done()
|
|
|
+// }()
|
|
|
+// if es_id!="" && cur_ids!="" {
|
|
|
+// dataArr := *elastic.GetById(esIndex,esType,es_id)
|
|
|
+// if len(dataArr)>0 { //存在-更新
|
|
|
+// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
+// allids = allids+","+cur_ids
|
|
|
+// updateStr := `ctx._source.allids=`+ `"`+allids+`"`
|
|
|
+// elastic.Update(esIndex,esType,es_id, updateStr)
|
|
|
+// }else { //不存在-新增
|
|
|
+// savetmp := make(map[string]interface{}, 0)
|
|
|
+// savetmp["allids"] = cur_ids
|
|
|
+// savetmp["_id"] = StringTOBsonId(es_id)
|
|
|
+// savetmp["template_id"] = ""
|
|
|
+// savetmp["fusion_id"] = ""
|
|
|
+// elastic.Save(esIndex, esType, savetmp)
|
|
|
+// }
|
|
|
+// }else {
|
|
|
+// log.Println("异常",es_id,cur_ids)
|
|
|
+// }
|
|
|
+// }(k,v)
|
|
|
//
|
|
|
-// if curFusionKeyMap[key]==nil { //存在融合表-不在当前id段落内
|
|
|
-// updateFusionMap[key] = ""
|
|
|
-// }
|
|
|
-// //es 随时更新ids
|
|
|
-// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
-// allids = allids+","+repeatid
|
|
|
-// updateStr := `ctx._source.allids=`+ `"`+allids+`"`
|
|
|
-// b:=elastic.Update("allzktest","allzktest",sourceid, updateStr)
|
|
|
-// if !b {
|
|
|
-// log.Println("es更新异常",repeatid,sourceid)
|
|
|
// }
|
|
|
+// wg_es.Wait()
|
|
|
+//
|
|
|
+// tmpEsMap = make(map[string]string,0)
|
|
|
+// }
|
|
|
+// //新增一条数据
|
|
|
+// repeatid :=repeatArr[i]
|
|
|
+// sourceid := sourceArr[i]
|
|
|
+// if tmpEsMap[sourceid]!="" {
|
|
|
+// ids := tmpEsMap[sourceid]
|
|
|
+// ids = ids+","+repeatid
|
|
|
+// tmpEsMap[sourceid] = ids
|
|
|
// }else {
|
|
|
-// //索引查不到-确定新增- es 随时新增ids
|
|
|
-// savetmp := make(map[string]interface{}, 0)
|
|
|
-// savetmp["allids"] = repeatid
|
|
|
-// savetmp["_id"] = StringTOBsonId(sourceid)
|
|
|
-// b:=elastic.Save("allzktest", "allzktest", savetmp)
|
|
|
-// if !b {
|
|
|
-// log.Println("es保存异常",repeatid,sourceid)
|
|
|
-// }
|
|
|
-// curFusionKeyMap[key] = ""
|
|
|
+// tmpEsMap[sourceid] = sourceid+","+repeatid
|
|
|
// }
|
|
|
// }
|
|
|
//
|
|
|
-// log.Println("前置索引准备完毕... ...","耗时:",int(time.Now().Unix())-start,"秒")
|
|
|
+// //处理剩余数据
|
|
|
+// if len(tmpEsMap)>0 {
|
|
|
+// for k,v:=range tmpEsMap {
|
|
|
+// pool_es <- true
|
|
|
+// wg_es.Add(1)
|
|
|
+// go func(es_id string,cur_ids string) {
|
|
|
+// defer func() {
|
|
|
+// <-pool_es
|
|
|
+// wg_es.Done()
|
|
|
+// }()
|
|
|
+// dataArr := *elastic.GetById(esIndex,esType,es_id)
|
|
|
+// if len(dataArr)>0 { //存在-更新
|
|
|
+// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
+// allids = allids+","+cur_ids
|
|
|
+// updateStr := `ctx._source.allids=`+ `"`+allids+`"`
|
|
|
+// elastic.Update(esIndex,esType,es_id, updateStr)
|
|
|
+// }else { //不存在-新增
|
|
|
+// savetmp := make(map[string]interface{}, 0)
|
|
|
+// savetmp["allids"] = cur_ids
|
|
|
+// savetmp["_id"] = StringTOBsonId(es_id)
|
|
|
+// savetmp["template_id"] = ""
|
|
|
+// savetmp["fusion_id"] = ""
|
|
|
+// elastic.Save(esIndex,esType, savetmp)
|
|
|
+// }
|
|
|
+// }(k,v)
|
|
|
+// }
|
|
|
+// wg_es.Wait()
|
|
|
+// tmpEsMap = make(map[string]string,0)
|
|
|
+//
|
|
|
+// }
|
|
|
//
|
|
|
//
|
|
|
+// log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
|
|
|
+//
|
|
|
// start = int(time.Now().Unix())
|
|
|
// log.Println("开始数据分组... ... ... ...")
|
|
|
// log.Println("开始数据分组... ... ... ...")
|
|
|
// log.Println("开始数据分组... ... ... ...")
|
|
|
//
|
|
|
-// //当前段落组
|
|
|
+// //查询分组-多线程
|
|
|
// for i:=0;i<len(norepeatArr);i++ {
|
|
|
-// sourceid:=norepeatArr[i]
|
|
|
-// dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
|
|
|
-// if len(dataArr)>0 { //存在值
|
|
|
-// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
-// arr := strings.Split(allids,",")
|
|
|
-// arr = append(arr,sourceid)
|
|
|
-// fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
-// }else {
|
|
|
-// arr:=[]string{sourceid}
|
|
|
-// fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
-// }
|
|
|
-// }
|
|
|
-// //更新组
|
|
|
-// for k,_:=range updateFusionMap {
|
|
|
-// sourceid:=qu.ObjToString(k)
|
|
|
-// dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
|
|
|
-// if len(dataArr)>0 { //存在值
|
|
|
-// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
-// arr := strings.Split(allids,",")
|
|
|
-// arr = append(arr,sourceid)
|
|
|
-// fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
-// }else {
|
|
|
-// log.Println("融合表更新,查询Es异常:",sourceid)
|
|
|
+// if i%10000==0 {
|
|
|
+// log.Println("cur index ",i,norepeatArr[i])
|
|
|
// }
|
|
|
-// }
|
|
|
+// sourceid:=norepeatArr[i]
|
|
|
+// pool_es <- true
|
|
|
+// wg_es.Add(1)
|
|
|
+// go func(sourceid string) {
|
|
|
+// defer func() {
|
|
|
+// <-pool_es
|
|
|
+// wg_es.Done()
|
|
|
+// }()
|
|
|
+// dataArr := *elastic.GetById(esIndex,esType,sourceid)
|
|
|
+// if len(dataArr)>0 { //存在值
|
|
|
+// allids := qu.ObjToString(dataArr[0]["allids"])
|
|
|
+// arr := strings.Split(allids,",")
|
|
|
+// updatelock.Lock()
|
|
|
+// fusionDataGroupMap[sourceid] = arr
|
|
|
+// updatelock.Unlock()
|
|
|
+// }else {
|
|
|
+// arr:=[]string{sourceid}
|
|
|
+// updatelock.Lock()
|
|
|
+// fusionDataGroupMap[sourceid] = arr
|
|
|
+// updatelock.Unlock()
|
|
|
+//
|
|
|
//
|
|
|
+// }
|
|
|
+//
|
|
|
+// }(sourceid)
|
|
|
+// }
|
|
|
+// wg_es.Wait()
|
|
|
//
|
|
|
//
|
|
|
-// //isErrNum:=0
|
|
|
-// //for i:=0;i<len(repeatArr);i++ {
|
|
|
-// // sourceid := sourceArr[i]
|
|
|
-// // isAddExist,index := false,0
|
|
|
-// // //根据原sourceid 直接遍历组
|
|
|
-// //R: for k,v:=range fusionDataGroupArr{
|
|
|
-// // for _,v1:=range v{
|
|
|
-// // if v1==sourceid {
|
|
|
-// // index = k
|
|
|
-// // isAddExist = true
|
|
|
-// // break R
|
|
|
-// // }
|
|
|
-// // }
|
|
|
-// // }
|
|
|
-// // if i%1000 == 0 {
|
|
|
-// // log.Println("分组中...","current index",i,repeatArr[i])
|
|
|
-// // }
|
|
|
-// //
|
|
|
-// // if isAddExist { //数组截取替换-找到指定
|
|
|
-// // arr := make([]string,0)
|
|
|
-// // arr = fusionDataGroupArr[index]
|
|
|
-// // arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
-// // fusionDataGroupArr[index] = arr
|
|
|
-// // log.Println("... ... 正常单组新增",i)
|
|
|
-// //
|
|
|
-// // }else {//当前段落未找到-需要查询融合表,,遍历融合表
|
|
|
-// // arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
|
|
|
-// // arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
|
|
|
-// // arr = append(arr,repeatArr[i])//组拼接当前id
|
|
|
-// //
|
|
|
-// //
|
|
|
-// //
|
|
|
-// //
|
|
|
-// //
|
|
|
-// // if len(arr)==1 { //异常错误,新增
|
|
|
-// // isErrNum++
|
|
|
-// // log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
|
|
|
-// // arr_error := make([]string,0)
|
|
|
-// // arr_error = append(arr_error,repeatArr[i])//组拼接当前id
|
|
|
-// // fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
|
|
|
-// // addOrUpdateArr = append(addOrUpdateArr,false)
|
|
|
-// // infoFusionArr = append(infoFusionArr, map[string]interface{}{})
|
|
|
-// // }else { //正常更新
|
|
|
-// // log.Println("... ... 正常多组新增",i)
|
|
|
-// // fusionDataGroupArr = append(fusionDataGroupArr,arr)
|
|
|
-// // addOrUpdateArr = append(addOrUpdateArr,true)
|
|
|
-// // infoFusionArr = append(infoFusionArr,fusionTmpData)
|
|
|
-// // }
|
|
|
-// //
|
|
|
-// // }
|
|
|
-// // //不断改变中
|
|
|
-// // if i%1000 == 0 {
|
|
|
-// // log.Println("当前分组数量:",len(fusionDataGroupArr))
|
|
|
-// // }
|
|
|
-// //}
|
|
|
-//
|
|
|
-//
|
|
|
-//
|
|
|
-//
|
|
|
-// log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
|
|
|
+// log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
|
|
|
// log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
|
|
|
// log.Println("********************分割线********************")
|
|
|
// log.Println("********************分割线********************")
|
|
|
// log.Println("********************分割线********************")
|
|
|
//
|
|
|
//
|
|
|
-// log.Println("开始处理分组融合... ... ... ...")
|
|
|
-// log.Println("开始处理分组融合... ... ... ...")
|
|
|
-// log.Println("开始处理分组融合... ... ... ...")
|
|
|
+// log.Println("开始进行正式分组融合......先睡秒30秒")
|
|
|
+// time.Sleep(30 * time.Second)
|
|
|
//
|
|
|
// start = int(time.Now().Unix())
|
|
|
// //多线程 - 处理数据
|
|
|
-// pool := make(chan bool, 3)
|
|
|
-// wg := &sync.WaitGroup{}
|
|
|
-//
|
|
|
-// for i:=0;i<len(fusionDataGroupArr);i++ {
|
|
|
-// fusionArr := fusionDataGroupArr[i]
|
|
|
-// pool <- true
|
|
|
-// wg.Add(1)
|
|
|
-// go func(fusionArr []string,i int) {
|
|
|
+// pool_mgo := make(chan bool, mgo_pool)
|
|
|
+// wg_mgo := &sync.WaitGroup{}
|
|
|
+//
|
|
|
+// fusionIndex:=0
|
|
|
+// for k,v:=range fusionDataGroupMap {
|
|
|
+// fusionIndex++
|
|
|
+// pool_mgo <- true
|
|
|
+// wg_mgo.Add(1)
|
|
|
+// go func(sourceid string ,fusionArr []string,fusionIndex int) {
|
|
|
// defer func() {
|
|
|
-// <-pool
|
|
|
-// wg.Done()
|
|
|
+// <-pool_mgo
|
|
|
+// wg_mgo.Done()
|
|
|
// }()
|
|
|
-// //构建数据
|
|
|
-// if (i+1)%500 == 0 {
|
|
|
-// log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
|
|
|
+// if fusionIndex % 10000==0 {
|
|
|
+// log.Println("数据融合数量:",fusionIndex,sourceid)
|
|
|
// }
|
|
|
// weight :=NewWeightData(fusionArr)
|
|
|
-// ////整理数据-筛选排名,模板
|
|
|
// weight.analyzeBuildStandardData()
|
|
|
-//
|
|
|
-// if len(fusionArr)<=1 {
|
|
|
+// if len(fusionArr)<=1 { //单组数据-需要新增Es
|
|
|
// saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
|
|
|
// saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
// saveRecordData["_id"] = saveid
|
|
|
// mgo.Save(record_coll_name,saveRecordData)
|
|
|
-// }else {
|
|
|
-// //if addOrUpdateArr[i] {
|
|
|
-// // //log.Println("多组更新... ...")
|
|
|
-// // tmpdata:=infoFusionArr[i]
|
|
|
-// // updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
|
|
|
-// //
|
|
|
-// // UpdateFusion.updatePool <- []map[string]interface{}{
|
|
|
-// // map[string]interface{}{
|
|
|
-// // "_id": tmpdata["_id"],
|
|
|
-// // },
|
|
|
-// // updateFusionData,
|
|
|
-// // }
|
|
|
-// // UpdateRecord.updatePool <- []map[string]interface{}{
|
|
|
-// // map[string]interface{}{
|
|
|
-// // "_id": tmpdata["_id"],
|
|
|
-// // },
|
|
|
-// // updateRecordData,
|
|
|
-// // }
|
|
|
-// //}else {
|
|
|
-// // //log.Println("多组生成... ...")
|
|
|
-// // saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
|
|
|
-// // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
-// // saveRecordData["_id"] = saveid
|
|
|
-// // mgo.Save(record_coll_name,saveRecordData)
|
|
|
-// //}
|
|
|
-// }
|
|
|
//
|
|
|
+// //新增es
|
|
|
+// savetmp := make(map[string]interface{}, 0)
|
|
|
+// fusionid:=BsonTOStringId(saveid)
|
|
|
+// savetmp["_id"] = StringTOBsonId(sourceid)
|
|
|
+// savetmp["allids"] = sourceid
|
|
|
+// savetmp["template_id"] = sourceid
|
|
|
+// savetmp["fusion_id"] = fusionid
|
|
|
+// elastic.Save(esIndex,esType,savetmp)
|
|
|
//
|
|
|
-// }(fusionArr,i)
|
|
|
//
|
|
|
+// }else {
|
|
|
+// saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
|
|
|
+// saveid:=mgo.Save(fusion_coll_name,saveFusionData)
|
|
|
+// saveRecordData["_id"] = saveid
|
|
|
+// mgo.Save(record_coll_name,saveRecordData)
|
|
|
//
|
|
|
+// //更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
|
|
|
+// fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
|
|
|
+// updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
|
|
|
+// updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
|
|
|
+// elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
|
|
|
//
|
|
|
+// }
|
|
|
+// }(k,v,fusionIndex)
|
|
|
// }
|
|
|
//
|
|
|
-// wg.Wait()
|
|
|
+// wg_mgo.Wait()
|
|
|
//
|
|
|
-// log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
+// log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
|
|
|
// log.Println("睡眠30秒,然后在发广播")
|
|
|
+//
|
|
|
// time.Sleep(30 * time.Second)
|
|
|
+//
|
|
|
// //任务完成,开始发送广播通知下面节点
|
|
|
+//
|
|
|
// taskSendFusionUdp(mapInfo)
|
|
|
//
|
|
|
-//}
|
|
|
+//}
|
|
|
+
|
|
|
+
|
|
|
+// startTaskAddData runs the incremental fusion task for one _id interval.
+//
+// mapInfo must hold string values under "gtid" and "lteid"; they bound the
+// interval (gtid, lteid] that is read from coll_name. data is not referenced
+// in the body.
+//
+// The task runs in four phases:
+//  1. Scan the interval and split the ids into repeats (repeat == 1, each
+//     paired with its repeat_id) and non-repeats.
+//  2. For every repeat, append its id to the "allids" field of the
+//     Elasticsearch document keyed by its repeat_id, creating the document
+//     when it does not exist yet.
+//  3. Build the fusion groups from the "allids" of every non-repeat id and of
+//     every source id whose document existed before this run.
+//  4. Fuse the groups on at most three goroutines, sleep 30 seconds, then
+//     call taskSendFusionUdp(mapInfo).
+//
+// qu.Catch is deferred; presumably it recovers panics such as a failed type
+// assertion on mapInfo — TODO confirm.
+func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
+	log.Println("开始增量融合流程")
+	defer qu.Catch()
+
+	// NOTE(review): the Elasticsearch index/type names and the endpoint passed
+	// to InitElasticSize below are hard-coded; confirm they should not come
+	// from configuration.
+	const esIdx, esTyp = "allzktest", "allzktest"
+
+	// Select the _id interval (gtid, lteid].
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	log.Println("查询条件:", q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
+
+	// Groups of ids waiting to be fused.
+	fusionDataGroupArr := make([][]string, 0)
+	// updateFusionMap collects source ids whose Elasticsearch document already
+	// existed and was not created during this run; curFusionKeyMap collects
+	// the source ids whose document this run created.
+	updateFusionMap, curFusionKeyMap := make(map[string]interface{}, 0), make(map[string]interface{}, 0)
+	// repeatArr[i] is a repeat record id and sourceArr[i] its repeat_id;
+	// norepeatArr holds every other id of the interval.
+	norepeatArr, repeatArr, sourceArr, index := make([]string, 0), make([]string, 0), make([]string, 0), 0
+
+	start := int(time.Now().Unix())
+	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
+		if index%10000 == 0 {
+			log.Println("current index", index, tmp["_id"])
+		}
+		tmpId := BsonTOStringId(tmp["_id"])
+		repeat := qu.IntAll(tmp["repeat"])
+		sourceid := qu.ObjToString(tmp["repeat_id"])
+		if repeat == 1 {
+			repeatArr = append(repeatArr, tmpId)
+			sourceArr = append(sourceArr, sourceid)
+		} else {
+			// Fixed: this used to append to repeatArr, which dropped the
+			// non-repeat ids collected so far and filled norepeatArr with
+			// repeat ids.
+			norepeatArr = append(norepeatArr, tmpId)
+		}
+		// Fresh map per document so fields of the previous one cannot leak.
+		tmp = make(map[string]interface{})
+	}
+
+	// Fixed: report the non-repeat count; fusionDataGroupArr is still empty here.
+	log.Println("task first:", index, len(norepeatArr), "+", len(repeatArr))
+	log.Println("遍历数据用时:", int(time.Now().Unix())-start, "秒")
+
+	// Phase 2: record every repeat id under its source id in Elasticsearch.
+	start = int(time.Now().Unix())
+	elastic.InitElasticSize("http://192.168.3.11:9800", 10)
+	for i := 0; i < len(repeatArr); i++ {
+		repeatid := repeatArr[i]
+		sourceid := sourceArr[i]
+		// NOTE(review): sourceid is already a string, so Sprintf is a no-op;
+		// it is kept because the file imports "fmt" for this call — confirm
+		// before simplifying.
+		key := fmt.Sprintf("%s", sourceid)
+		dataArr := *elastic.GetById(esIdx, esTyp, sourceid)
+		if len(dataArr) > 0 {
+			// The document exists. If this run did not create it, it belongs
+			// to an earlier interval and its group must be rebuilt later.
+			if curFusionKeyMap[key] == nil {
+				updateFusionMap[key] = ""
+			}
+			// Append the repeat id to the stored comma-separated id list.
+			allids := qu.ObjToString(dataArr[0]["allids"])
+			allids = allids + "," + repeatid
+			updateStr := `ctx._source.allids=` + `"` + allids + `"`
+			b := elastic.Update(esIdx, esTyp, sourceid, updateStr)
+			if !b {
+				log.Println("es更新异常", repeatid, sourceid)
+			}
+		} else {
+			// No document yet: create one keyed by the source id.
+			savetmp := make(map[string]interface{}, 0)
+			savetmp["allids"] = repeatid
+			savetmp["_id"] = StringTOBsonId(sourceid)
+			b := elastic.Save(esIdx, esTyp, savetmp)
+			if !b {
+				log.Println("es保存异常", repeatid, sourceid)
+			}
+			curFusionKeyMap[key] = ""
+		}
+	}
+
+	log.Println("前置索引准备完毕... ...", "耗时:", int(time.Now().Unix())-start, "秒")
+
+	start = int(time.Now().Unix())
+	log.Println("开始数据分组... ... ... ...")
+	log.Println("开始数据分组... ... ... ...")
+	log.Println("开始数据分组... ... ... ...")
+
+	// Phase 3a: one group per non-repeat id of the current interval.
+	for i := 0; i < len(norepeatArr); i++ {
+		sourceid := norepeatArr[i]
+		dataArr := *elastic.GetById(esIdx, esTyp, sourceid)
+		if len(dataArr) > 0 {
+			// Group = ids stored in Elasticsearch plus the source id itself.
+			allids := qu.ObjToString(dataArr[0]["allids"])
+			arr := strings.Split(allids, ",")
+			arr = append(arr, sourceid)
+			fusionDataGroupArr = append(fusionDataGroupArr, arr)
+		} else {
+			// Nothing repeats this id: it forms a group on its own.
+			arr := []string{sourceid}
+			fusionDataGroupArr = append(fusionDataGroupArr, arr)
+		}
+	}
+	// Phase 3b: one group per source id whose document predates this run.
+	for k := range updateFusionMap {
+		sourceid := qu.ObjToString(k)
+		dataArr := *elastic.GetById(esIdx, esTyp, sourceid)
+		if len(dataArr) > 0 {
+			allids := qu.ObjToString(dataArr[0]["allids"])
+			arr := strings.Split(allids, ",")
+			arr = append(arr, sourceid)
+			fusionDataGroupArr = append(fusionDataGroupArr, arr)
+		} else {
+			log.Println("融合表更新,查询Es异常:", sourceid)
+		}
+	}
+
+	log.Println("最终待融合分组数量:", len(fusionDataGroupArr))
+	log.Println("分组完毕数据用时:", int(time.Now().Unix())-start, "秒")
+	log.Println("********************分割线********************")
+	log.Println("********************分割线********************")
+	log.Println("********************分割线********************")
+
+	log.Println("开始处理分组融合... ... ... ...")
+	log.Println("开始处理分组融合... ... ... ...")
+	log.Println("开始处理分组融合... ... ... ...")
+
+	// Phase 4: fuse the groups; the buffered channel caps the number of
+	// goroutines running at once at three.
+	start = int(time.Now().Unix())
+	pool := make(chan bool, 3)
+	wg := &sync.WaitGroup{}
+
+	for i := 0; i < len(fusionDataGroupArr); i++ {
+		fusionArr := fusionDataGroupArr[i]
+		pool <- true
+		wg.Add(1)
+		go func(fusionArr []string, i int) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			if (i+1)%500 == 0 {
+				log.Println("构建第", i+1, "组数据...", "数量:", len(fusionArr), fusionArr)
+			}
+			weight := NewWeightData(fusionArr)
+			// Per the original note: sorts out the data (ranking, template).
+			weight.analyzeBuildStandardData()
+
+			if len(fusionArr) <= 1 {
+				// Single-record group: store the fusion document, then the
+				// record document under the same _id.
+				saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
+				saveid := mgo.Save(fusion_coll_name, saveFusionData)
+				saveRecordData["_id"] = saveid
+				mgo.Save(record_coll_name, saveRecordData)
+			}
+			// NOTE(review): groups with more than one id are analysed but
+			// never persisted. The handling that used to live here was
+			// disabled; it either pushed updates through
+			// UpdateFusion.updatePool / UpdateRecord.updatePool or saved the
+			// result of weight.dealWithMultipleAddFusionStruct(). Confirm this
+			// is intended.
+		}(fusionArr, i)
+	}
+
+	wg.Wait()
+
+	log.Println("fusion is over :", len(fusionDataGroupArr), "用时:", int(time.Now().Unix())-start, "秒")
+	log.Println("睡眠30秒,然后在发广播")
+	time.Sleep(30 * time.Second)
+	// Task finished: broadcast so the downstream nodes can continue.
+	taskSendFusionUdp(mapInfo)
+}
|