123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520 |
- package main
- import (
- "fmt"
- "log"
- qu "qfw/util"
- "qfw/util/elastic"
- "strings"
- "sync"
- "time"
- )
- //func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
- //
- // log.Println("开始全量融合流程")
- // defer qu.Catch()
- // //区间id
- // q := map[string]interface{}{
- // "_id": map[string]interface{}{
- // "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- // "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- // },
- // }
- // log.Println("查询条件:",q)
- // sess := mgo.GetMgoConn()
- // defer mgo.DestoryMongoConn(sess)
- // it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- // //编译不同的融合组,如何划分组
- // fusionDataGroupMap := make(map[string][]string,0) //待融合组
- //
- // norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
- //
- // start := int(time.Now().Unix())
- // for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- // if index%10000 == 0 {
- // log.Println("current index",index,tmp["_id"])
- // }
- // tmpId:=BsonTOStringId(tmp["_id"])
- // repeat:=qu.IntAll(tmp["repeat"])
- // sourceid:=qu.ObjToString(tmp["repeat_id"])
- // if repeat==1 {
- // repeatArr = append(repeatArr,tmpId)
- // sourceArr = append(sourceArr,sourceid)
- // }else {
- // norepeatArr = append(norepeatArr,tmpId)
- // }
- //
- // tmp = make(map[string]interface{})
- // }
- //
- // log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
- // log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
- //
- // //根据重复组,重新划分新的组别
- // start = int(time.Now().Unix())
- //
- // //多线程升索引
- // pool_es := make(chan bool, es_pool)
- // wg_es := &sync.WaitGroup{}
- // tmpEsMap := make(map[string]string,0)
- // isGroupNum := 1000
- // for i:=0;i<len(repeatArr);i++ {
- // if i%10000 == 0 {
- // log.Println("curent index ",i)
- // }
- // if i%isGroupNum==0 && i!=0 {
- // //新的一组执行上一组生索引
- // for k,v:=range tmpEsMap {
- // pool_es <- true
- // wg_es.Add(1)
- // go func(es_id string,cur_ids string) {
- // defer func() {
- // <-pool_es
- // wg_es.Done()
- // }()
- // if es_id!="" && cur_ids!="" {
- // dataArr := *elastic.GetById(esIndex,esType,es_id)
- // if len(dataArr)>0 { //存在-更新
- // allids := qu.ObjToString(dataArr[0]["allids"])
- // allids = allids+","+cur_ids
- // updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- // elastic.Update(esIndex,esType,es_id, updateStr)
- // }else { //不存在-新增
- // savetmp := make(map[string]interface{}, 0)
- // savetmp["allids"] = cur_ids
- // savetmp["_id"] = StringTOBsonId(es_id)
- // savetmp["template_id"] = ""
- // savetmp["fusion_id"] = ""
- // elastic.Save(esIndex, esType, savetmp)
- // }
- // }else {
- // log.Println("异常",es_id,cur_ids)
- // }
- // }(k,v)
- //
- // }
- // wg_es.Wait()
- //
- // tmpEsMap = make(map[string]string,0)
- // }
- // //新增一条数据
- // repeatid :=repeatArr[i]
- // sourceid := sourceArr[i]
- // if tmpEsMap[sourceid]!="" {
- // ids := tmpEsMap[sourceid]
- // ids = ids+","+repeatid
- // tmpEsMap[sourceid] = ids
- // }else {
- // tmpEsMap[sourceid] = sourceid+","+repeatid
- // }
- // }
- //
- // //处理剩余数据
- // if len(tmpEsMap)>0 {
- // for k,v:=range tmpEsMap {
- // pool_es <- true
- // wg_es.Add(1)
- // go func(es_id string,cur_ids string) {
- // defer func() {
- // <-pool_es
- // wg_es.Done()
- // }()
- // dataArr := *elastic.GetById(esIndex,esType,es_id)
- // if len(dataArr)>0 { //存在-更新
- // allids := qu.ObjToString(dataArr[0]["allids"])
- // allids = allids+","+cur_ids
- // updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- // elastic.Update(esIndex,esType,es_id, updateStr)
- // }else { //不存在-新增
- // savetmp := make(map[string]interface{}, 0)
- // savetmp["allids"] = cur_ids
- // savetmp["_id"] = StringTOBsonId(es_id)
- // savetmp["template_id"] = ""
- // savetmp["fusion_id"] = ""
- // elastic.Save(esIndex,esType, savetmp)
- // }
- // }(k,v)
- // }
- // wg_es.Wait()
- // tmpEsMap = make(map[string]string,0)
- //
- // }
- //
- //
- // log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
- //
- // start = int(time.Now().Unix())
- // log.Println("开始数据分组... ... ... ...")
- // log.Println("开始数据分组... ... ... ...")
- // log.Println("开始数据分组... ... ... ...")
- //
- // //查询分组-多线程
- // for i:=0;i<len(norepeatArr);i++ {
- // if i%10000==0 {
- // log.Println("cur index ",i,norepeatArr[i])
- // }
- // sourceid:=norepeatArr[i]
- // pool_es <- true
- // wg_es.Add(1)
- // go func(sourceid string) {
- // defer func() {
- // <-pool_es
- // wg_es.Done()
- // }()
- // dataArr := *elastic.GetById(esIndex,esType,sourceid)
- // if len(dataArr)>0 { //存在值
- // allids := qu.ObjToString(dataArr[0]["allids"])
- // arr := strings.Split(allids,",")
- // updatelock.Lock()
- // fusionDataGroupMap[sourceid] = arr
- // updatelock.Unlock()
- // }else {
- // arr:=[]string{sourceid}
- // updatelock.Lock()
- // fusionDataGroupMap[sourceid] = arr
- // updatelock.Unlock()
- //
- //
- // }
- //
- // }(sourceid)
- // }
- // wg_es.Wait()
- //
- //
- // log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
- // log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
- // log.Println("********************分割线********************")
- // log.Println("********************分割线********************")
- // log.Println("********************分割线********************")
- //
- //
- // log.Println("开始进行正式分组融合......先睡秒30秒")
- // time.Sleep(30 * time.Second)
- //
- // start = int(time.Now().Unix())
- // //多线程 - 处理数据
- // pool_mgo := make(chan bool, mgo_pool)
- // wg_mgo := &sync.WaitGroup{}
- //
- // fusionIndex:=0
- // for k,v:=range fusionDataGroupMap {
- // fusionIndex++
- // pool_mgo <- true
- // wg_mgo.Add(1)
- // go func(sourceid string ,fusionArr []string,fusionIndex int) {
- // defer func() {
- // <-pool_mgo
- // wg_mgo.Done()
- // }()
- // if fusionIndex % 10000==0 {
- // log.Println("数据融合数量:",fusionIndex,sourceid)
- // }
- // weight :=NewWeightData(fusionArr)
- // weight.analyzeBuildStandardData()
- // if len(fusionArr)<=1 { //单组数据-需要新增Es
- // saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
- // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- // saveRecordData["_id"] = saveid
- // mgo.Save(record_coll_name,saveRecordData)
- //
- // //新增es
- // savetmp := make(map[string]interface{}, 0)
- // fusionid:=BsonTOStringId(saveid)
- // savetmp["_id"] = StringTOBsonId(sourceid)
- // savetmp["allids"] = sourceid
- // savetmp["template_id"] = sourceid
- // savetmp["fusion_id"] = fusionid
- // elastic.Save(esIndex,esType,savetmp)
- //
- //
- // }else {
- // saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
- // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- // saveRecordData["_id"] = saveid
- // mgo.Save(record_coll_name,saveRecordData)
- //
- // //更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
- // fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
- // updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
- // updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
- // elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
- //
- // }
- // }(k,v,fusionIndex)
- // }
- //
- // wg_mgo.Wait()
- //
- // log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
- // log.Println("睡眠30秒,然后在发广播")
- //
- // time.Sleep(30 * time.Second)
- //
- // //任务完成,开始发送广播通知下面节点
- //
- // taskSendFusionUdp(mapInfo)
- //
- //}
- //增量-融合一小段
- func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
- log.Println("开始增量融合流程")
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- //编译不同的融合组,如何划分组
- //待融合组
- fusionDataGroupArr := make([][]string,0)
- //需要更新组
- updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
- //重复数据组
- norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
- start := int(time.Now().Unix())
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%10000 == 0 {
- log.Println("current index",index,tmp["_id"])
- }
- tmpId:=BsonTOStringId(tmp["_id"])
- repeat:=qu.IntAll(tmp["repeat"])
- sourceid:=qu.ObjToString(tmp["repeat_id"])
- if repeat==1 {
- repeatArr = append(repeatArr,tmpId)
- sourceArr = append(sourceArr,sourceid)
- }else {
- norepeatArr = append(repeatArr,tmpId)
- }
- tmp = make(map[string]interface{})
- }
- log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
- log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
- //根据重复组,重新划分新的组别
- start = int(time.Now().Unix())
- elastic.InitElasticSize("http://192.168.3.11:9800",10)
- for i:=0;i<len(repeatArr);i++ {
- //查询ES-升索引
- repeatid := repeatArr[i]
- sourceid := sourceArr[i]
- key := fmt.Sprintf("%s",sourceid)
- dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
- if len(dataArr)>0 { //存在值
- if curFusionKeyMap[key]==nil { //存在融合表-不在当前id段落内
- updateFusionMap[key] = ""
- }
- //es 随时更新ids
- allids := qu.ObjToString(dataArr[0]["allids"])
- allids = allids+","+repeatid
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- b:=elastic.Update("allzktest","allzktest",sourceid, updateStr)
- if !b {
- log.Println("es更新异常",repeatid,sourceid)
- }
- }else {
- //索引查不到-确定新增- es 随时新增ids
- savetmp := make(map[string]interface{}, 0)
- savetmp["allids"] = repeatid
- savetmp["_id"] = StringTOBsonId(sourceid)
- b:=elastic.Save("allzktest", "allzktest", savetmp)
- if !b {
- log.Println("es保存异常",repeatid,sourceid)
- }
- curFusionKeyMap[key] = ""
- }
- }
- log.Println("前置索引准备完毕... ...","耗时:",int(time.Now().Unix())-start,"秒")
- start = int(time.Now().Unix())
- log.Println("开始数据分组... ... ... ...")
- log.Println("开始数据分组... ... ... ...")
- log.Println("开始数据分组... ... ... ...")
- //当前段落组
- for i:=0;i<len(norepeatArr);i++ {
- sourceid:=norepeatArr[i]
- dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
- if len(dataArr)>0 { //存在值
- allids := qu.ObjToString(dataArr[0]["allids"])
- arr := strings.Split(allids,",")
- arr = append(arr,sourceid)
- fusionDataGroupArr = append(fusionDataGroupArr,arr)
- }else {
- arr:=[]string{sourceid}
- fusionDataGroupArr = append(fusionDataGroupArr,arr)
- }
- }
- //更新组
- for k,_:=range updateFusionMap {
- sourceid:=qu.ObjToString(k)
- dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
- if len(dataArr)>0 { //存在值
- allids := qu.ObjToString(dataArr[0]["allids"])
- arr := strings.Split(allids,",")
- arr = append(arr,sourceid)
- fusionDataGroupArr = append(fusionDataGroupArr,arr)
- }else {
- log.Println("融合表更新,查询Es异常:",sourceid)
- }
- }
- //isErrNum:=0
- //for i:=0;i<len(repeatArr);i++ {
- // sourceid := sourceArr[i]
- // isAddExist,index := false,0
- // //根据原sourceid 直接遍历组
- //R: for k,v:=range fusionDataGroupArr{
- // for _,v1:=range v{
- // if v1==sourceid {
- // index = k
- // isAddExist = true
- // break R
- // }
- // }
- // }
- // if i%1000 == 0 {
- // log.Println("分组中...","current index",i,repeatArr[i])
- // }
- //
- // if isAddExist { //数组截取替换-找到指定
- // arr := make([]string,0)
- // arr = fusionDataGroupArr[index]
- // arr = append(arr,repeatArr[i])//组拼接当前id
- // fusionDataGroupArr[index] = arr
- // log.Println("... ... 正常单组新增",i)
- //
- // }else {//当前段落未找到-需要查询融合表,,遍历融合表
- // arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
- // arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
- // arr = append(arr,repeatArr[i])//组拼接当前id
- //
- //
- //
- //
- //
- // if len(arr)==1 { //异常错误,新增
- // isErrNum++
- // log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
- // arr_error := make([]string,0)
- // arr_error = append(arr_error,repeatArr[i])//组拼接当前id
- // fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
- // addOrUpdateArr = append(addOrUpdateArr,false)
- // infoFusionArr = append(infoFusionArr, map[string]interface{}{})
- // }else { //正常更新
- // log.Println("... ... 正常多组新增",i)
- // fusionDataGroupArr = append(fusionDataGroupArr,arr)
- // addOrUpdateArr = append(addOrUpdateArr,true)
- // infoFusionArr = append(infoFusionArr,fusionTmpData)
- // }
- //
- // }
- // //不断改变中
- // if i%1000 == 0 {
- // log.Println("当前分组数量:",len(fusionDataGroupArr))
- // }
- //}
- log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
- log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
- log.Println("********************分割线********************")
- log.Println("********************分割线********************")
- log.Println("********************分割线********************")
- log.Println("开始处理分组融合... ... ... ...")
- log.Println("开始处理分组融合... ... ... ...")
- log.Println("开始处理分组融合... ... ... ...")
- start = int(time.Now().Unix())
- //多线程 - 处理数据
- pool := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- for i:=0;i<len(fusionDataGroupArr);i++ {
- fusionArr := fusionDataGroupArr[i]
- pool <- true
- wg.Add(1)
- go func(fusionArr []string,i int) {
- defer func() {
- <-pool
- wg.Done()
- }()
- //构建数据
- if (i+1)%500 == 0 {
- log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
- }
- weight :=NewWeightData(fusionArr)
- ////整理数据-筛选排名,模板
- weight.analyzeBuildStandardData()
- if len(fusionArr)<=1 {
- saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
- saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- saveRecordData["_id"] = saveid
- mgo.Save(record_coll_name,saveRecordData)
- }else {
- //if addOrUpdateArr[i] {
- // //log.Println("多组更新... ...")
- // tmpdata:=infoFusionArr[i]
- // updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
- //
- // UpdateFusion.updatePool <- []map[string]interface{}{
- // map[string]interface{}{
- // "_id": tmpdata["_id"],
- // },
- // updateFusionData,
- // }
- // UpdateRecord.updatePool <- []map[string]interface{}{
- // map[string]interface{}{
- // "_id": tmpdata["_id"],
- // },
- // updateRecordData,
- // }
- //}else {
- // //log.Println("多组生成... ...")
- // saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
- // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
- // saveRecordData["_id"] = saveid
- // mgo.Save(record_coll_name,saveRecordData)
- //}
- }
- }(fusionArr,i)
- }
- wg.Wait()
- log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
- log.Println("睡眠30秒,然后在发广播")
- time.Sleep(30 * time.Second)
- //任务完成,开始发送广播通知下面节点
- taskSendFusionUdp(mapInfo)
- }
|