123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- package main
- import (
- "encoding/json"
- "log"
- qu "qfw/util"
- "qfw/util/elastic"
- "strings"
- "sync"
- "time"
- es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
- )
- func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
- //临时测试
- //先导出具体需要融合组数据组-存mongo
- exportFusionMongoData()
- time.Sleep(60 * time.Second)
- //具体融合数据的方法
- startFusionData()
- time.Sleep(60 * time.Second)
- return
- log.Println("开始全量融合流程")
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("查询条件:",q)
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
- index,isOK:=0,0
- start := int(time.Now().Unix())
- //多线程升索引
- pool_es := make(chan bool, es_pool)
- wg_es := &sync.WaitGroup{}
- tmpEsMap := make(map[string]string,0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- //每遍历isgroupfn条 划分组别
- if index%isgroupfn==0 && index!=0 {
- log.Println("current index",index,tmp["_id"])
- //新的一组执行上一组生索引
- for k,v:=range tmpEsMap {
- pool_es <- true
- wg_es.Add(1)
- go func(es_id string,cur_ids string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- if es_id!="" && cur_ids!="" {
- dataArr := *elastic.GetById(esIndex,esType,es_id)
- if len(dataArr)>0 { //存在-更新
- allids := qu.ObjToString(dataArr[0]["allids"])
- allids = allids+","+cur_ids
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- elastic.Update(esIndex,esType,es_id, updateStr)
- }else { //不存在-新增
- savetmp := make(map[string]interface{}, 0)
- savetmp["allids"] = cur_ids
- savetmp["_id"] = StringTOBsonId(es_id)
- savetmp["template_id"] = ""
- savetmp["fusion_id"] = ""
- elastic.Save(esIndex, esType, savetmp)
- }
- }else {
- log.Println("异常",es_id,cur_ids)
- }
- }(k,v)
- }
- wg_es.Wait()
- tmpEsMap = make(map[string]string,0)
- }
- repeat := qu.IntAll(tmp["repeat"])
- sourceid := BsonTOStringId(tmp["_id"])
- repeatid := BsonTOStringId(tmp["_id"])
- if repeat==1 {
- sourceid = qu.ObjToString(tmp["repeat_id"])
- }else {
- isOK++
- }
- if tmpEsMap[sourceid]!="" {
- ids := tmpEsMap[sourceid]
- ids = ids+","+repeatid
- tmpEsMap[sourceid] = ids
- }else {
- tmpEsMap[sourceid] = repeatid
- }
- tmp = make(map[string]interface{})
- }
- log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
- //处理剩余数据
- if len(tmpEsMap)>0 {
- log.Println("处理剩余数据:",len(tmpEsMap))
- for k,v:=range tmpEsMap {
- pool_es <- true
- wg_es.Add(1)
- go func(es_id string,cur_ids string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- if es_id!="" && cur_ids!="" {
- dataArr := *elastic.GetById(esIndex,esType,es_id)
- if len(dataArr)>0 { //存在-更新
- allids := qu.ObjToString(dataArr[0]["allids"])
- allids = allids+","+cur_ids
- updateStr := `ctx._source.allids=`+ `"`+allids+`"`
- elastic.Update(esIndex,esType,es_id, updateStr)
- }else { //不存在-新增
- savetmp := make(map[string]interface{}, 0)
- savetmp["allids"] = cur_ids
- savetmp["_id"] = StringTOBsonId(es_id)
- savetmp["template_id"] = ""
- savetmp["fusion_id"] = ""
- elastic.Save(esIndex, esType, savetmp)
- }
- }else {
- log.Println("异常",es_id,cur_ids)
- }
- }(k,v)
- }
- wg_es.Wait()
- tmpEsMap = make(map[string]string,0)
- }
- log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
- time.Sleep(60 * time.Second)
- //先到处具体需要融合组数据-存mongo
- exportFusionMongoData()
- time.Sleep(60 * time.Second)
- //具体融合数据的方法
- startFusionData()
- time.Sleep(60 * time.Second)
- taskSendFusionUdp(mapInfo)
- }
- func exportFusionMongoData() {
- start := int(time.Now().Unix())
- log.Println("开始导出融合组数据......")
- //遍历索引
- esclient := elastic.GetEsConn()
- defer elastic.DestoryEsConn(esclient)
- if esclient == nil {
- log.Println("连接池异常")
- }
- q :=es_elastic.NewBoolQuery()
- cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
- Size(200).Do()
- if err != nil {
- log.Println("cursor",err)
- }
- if cursor.Results == nil {
- log.Println("results != nil; got nil")
- }
- if cursor.Results.Hits == nil {
- log.Println("expected results.Hits != nil; got nil")
- }
- log.Println("查询正常-总数:",cursor.TotalHits())
- //多线程 - 处理数据
- pool_es := make(chan bool, es_pool)
- wg_es := &sync.WaitGroup{}
- pages,numDocs := 0,0
- for {
- searchResult, err := cursor.Next()
- if err != nil {
- if err.Error() == "EOS" {
- break
- }else {
- log.Println("cursor searchResult",err)
- }
- }
- pages++
- isLog := false
- for _, hit := range searchResult.Hits.Hits {
- tmp := make(map[string]interface{})
- err := json.Unmarshal(*hit.Source, &tmp)
- if err != nil {
- log.Println("json Unmarshal error")
- continue
- }
- if !isLog && numDocs%10000==0 {
- log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
- isLog = true
- }
- numDocs++
- fusion_ids := qu.ObjToString(tmp["allids"])
- sourceid := qu.ObjToString(tmp["_id"])
- pool_es <- true
- wg_es.Add(1)
- go func(sourceid string, fusionArr string) {
- defer func() {
- <-pool_es
- wg_es.Done()
- }()
- AddGroupPool.pool <- map[string]interface{}{
- "_id":StringTOBsonId(sourceid),
- "allids":fusion_ids,
- }
- }(sourceid, fusion_ids)
- }
- }
- log.Println("遍历Es结束......")
- wg_es.Wait()
- log.Println("fusion group over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
- }
- func startFusionData() {
- log.Println("开始全量融合流程...")
- defer qu.Catch()
- //可以开多程序-不同id段执行融合
- q := map[string]interface{}{}
- sess := mgo.GetMgoConn()
- defer mgo.DestoryMongoConn(sess)
- it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
- index,start :=0, int(time.Now().Unix())
- //多线程保存数据
- pool_mgo := make(chan bool, mgo_pool)
- wg_mgo := &sync.WaitGroup{}
- for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
- if index%10000==0 {
- log.Println("current index",index,tmp["_id"])
- }
- fusion_ids := qu.ObjToString(tmp["allids"])
- fusionArr := strings.Split(fusion_ids, ",")
- sourceid := BsonTOStringId(tmp["_id"])
- pool_mgo <- true
- wg_mgo.Add(1)
- go func(sourceid string, fusionArr []string) {
- defer func() {
- <-pool_mgo
- wg_mgo.Done()
- }()
- weight := NewWeightData(fusionArr)
- weight.analyzeBuildStandardData()
- saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
- if len(fusionArr) <= 1 {
- saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
- }else {
- saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
- }
- //新增融合表
- saveid := mgo.Save(fusion_coll_name, saveFusionData)
- saveRecordData["_id"] = saveid
- //批量新增日志表
- AddRecordPool.pool <- saveRecordData
- //批量更新分组表
- UpdateGroupPool.pool <- []map[string]interface{}{
- map[string]interface{}{
- "_id": StringTOBsonId(sourceid),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "fusion_id": BsonTOStringId(saveid),
- "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
- },
- },
- }
- }(sourceid, fusionArr)
- tmp = make(map[string]interface{})
- }
- wg_mgo.Wait()
- log.Println("fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
- }
- //处理结构数据
- func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{}) map[string]interface{} {
- //模板id 数据
- templateid:=weight.templateid
- templateTmp:=weight.data[templateid].data
- modifyData :=make(map[string]interface{},0)
- attach_text,isAttach:=make(map[string]interface{},0),false
- if tmp_arr,b := templateTmp["attach_text"].(map[string]interface{});b {
- //有值符合-
- attach_text = tmp_arr
- }
- //附件判重-并合并新增
- keyIndex := -1
- for k,_:=range attach_text {
- key:=qu.IntAll(k)
- if key>keyIndex {
- keyIndex = key
- }
- }
- for _,value_id :=range weight.saveids {
- if templateid == value_id {
- continue
- }
- rankData := weight.data[value_id].data //具体其他排名数据
- if attachData,b := rankData["attach_text"].(map[string]interface{});b {
- if len(attachData)>0 { //有值
- for _,v:=range attachData { //子元素
- if attach,isOK := v.(map[string]interface{});isOK {
- if !dealWithRepeatAttachData(attach_text,attach) {
- //符合条件-不重复直接添加
- keyIndex++
- saveKey := fmt.Sprintf("%v",keyIndex)
- attach_text[saveKey] = attach //key累加
- isAttach = true
- //多条情况-融合
- if (*recordDict)["attach_text"]==nil {
- (*recordDict)["attach_text"] = []map[string]interface{}{
- map[string]interface{}{
- "id":value_id,
- "value":attach,
- },
- }
- }else {
- arr := (*recordDict)["attach_text"].([]map[string]interface{})
- arr = append(arr,map[string]interface{}{
- "id":value_id,
- "value":attach,
- })
- (*recordDict)["attach_text"] = arr
- }
- }
- }
- }
- }
- }
- }
- //联系人 winnerorder
- winnerCount:=qu.IntAll(0)
- winnerArr,b,isWinner,winnerid:=make(primitive.A,0),false,false,templateid
- if winnerArr,b = templateTmp["winnerorder"].([]interface{});b {
- winnerCount = qu.IntAll(len(winnerArr))
- }
- //分包 package
- packageCount:=qu.IntAll(0)
- packageArr,b,isPackage,packageid:=make(map[string]interface{},0),false,false,templateid
- if packageArr,b = templateTmp["package"].(map[string]interface{});b {
- packageCount = qu.IntAll(len(packageArr))
- }
- //遍历其他数据-
- for _,value:=range weight.saveids {
- if templateid == value {
- continue
- }
- //winnerorder
- tmp:=weight.data[value].data
- if arr_1,winner_b := tmp["winnerorder"].(primitive.A);winner_b {
- count:=qu.IntAll(len(arr_1))
- if count > winnerCount {
- winnerCount = count
- winnerArr = arr_1
- isWinner = true
- winnerid = value
- }
- }
- //package
- if arr_2,package_b := (tmp["package"]).(map[string]interface{});package_b {
- count:=qu.IntAll(len(arr_2))
- if count > packageCount {
- packageCount = count
- packageArr = arr_2
- isPackage = true
- packageid = value
- }
- }
- }
- //改变的值
- if len(winnerArr)>0 && winnerArr!=nil && isWinner {
- modifyData["winnerorder"] = winnerArr
- (*recordDict)["winnerorder"] = map[string]interface{}{
- "id":winnerid,
- "value":winnerArr,
- }
- }
- if len(packageArr)>0 && packageArr!=nil && isPackage {
- modifyData["package"] = packageArr
- (*recordDict)["package"] = map[string]interface{}{
- "id":packageid,
- "value":packageArr,
- }
- }
- if len(attach_text)>0 && attach_text!=nil && isAttach {
- modifyData["attach_text"] = attach_text
- }
- return modifyData
- }
|