fusionFullData.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "strings"
  8. "sync"
  9. "time"
  10. es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
  11. )
  12. func startFusionData() {
  13. start := int(time.Now().Unix())
  14. log.Println("开始遍历索引-进行融合............")
  15. //遍历索引
  16. esclient := elastic.GetEsConn()
  17. defer elastic.DestoryEsConn(esclient)
  18. if esclient == nil {
  19. log.Fatalln("连接池异常")
  20. }
  21. q :=es_elastic.NewBoolQuery()
  22. cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
  23. Size(200).Do()
  24. if err != nil {
  25. log.Fatal("cursor",err)
  26. }
  27. if cursor.Results == nil {
  28. log.Fatalf("results != nil; got nil")
  29. }
  30. if cursor.Results.Hits == nil {
  31. log.Fatalf("expected results.Hits != nil; got nil")
  32. }
  33. log.Println("查询正常-总数:",cursor.TotalHits())
  34. //多线程 - 处理数据
  35. pool_mgo := make(chan bool, mgo_pool)
  36. wg_mgo := &sync.WaitGroup{}
  37. pages,numDocs := 0,0
  38. for {
  39. searchResult, err := cursor.Next()
  40. if err != nil {
  41. if err.Error() == "EOS" {
  42. break
  43. }else {
  44. log.Fatal("cursor searchResult",err)
  45. }
  46. }
  47. pages++
  48. isLog := false
  49. for _, hit := range searchResult.Hits.Hits {
  50. tmp := make(map[string]interface{})
  51. err := json.Unmarshal(*hit.Source, &tmp)
  52. if err != nil {
  53. log.Println("json Unmarshal error")
  54. continue
  55. }
  56. if !isLog && numDocs%10000==0 {
  57. log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
  58. isLog = true
  59. }
  60. numDocs++
  61. fusion_ids := qu.ObjToString(tmp["allids"])
  62. fusionArr := strings.Split(fusion_ids, ",")
  63. sourceid := qu.ObjToString(tmp["_id"])
  64. pool_mgo <- true
  65. wg_mgo.Add(1)
  66. go func(sourceid string, fusionArr []string) {
  67. defer func() {
  68. <-pool_mgo
  69. wg_mgo.Done()
  70. }()
  71. weight := NewWeightData(fusionArr)
  72. weight.analyzeBuildStandardData()
  73. if len(fusionArr) <= 1 { //单组数据
  74. saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
  75. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  76. //新增-Record
  77. saveRecordData["_id"] = saveid
  78. mgo.Save(record_coll_name,saveRecordData)
  79. //UpdateRecord.add_pool <- saveRecordData //批量新增
  80. //批量更新Es
  81. //fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
  82. //updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
  83. //updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
  84. //UpdateElastic.update_pool <- map[string]string{
  85. // "id":sourceid,
  86. // "updateStr":updateStr1+updateStr2,
  87. //}
  88. }else { //多组数据
  89. saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
  90. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  91. //新增-Record
  92. saveRecordData["_id"] = saveid
  93. mgo.Save(record_coll_name,saveRecordData)
  94. //UpdateRecord.add_pool <- saveRecordData //批量新增
  95. //批量更新Es
  96. //fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
  97. //updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
  98. //updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
  99. //UpdateElastic.update_pool <- map[string]string{
  100. // "id":sourceid,
  101. // "updateStr":updateStr1+updateStr2,
  102. //}
  103. }
  104. }(sourceid, fusionArr)
  105. }
  106. }
  107. log.Println("遍历Es结束......")
  108. wg_mgo.Wait()
  109. log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
  110. }
  111. func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
  112. startFusionData()
  113. return
  114. log.Println("开始全量融合流程")
  115. defer qu.Catch()
  116. //区间id
  117. q := map[string]interface{}{
  118. "_id": map[string]interface{}{
  119. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  120. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  121. },
  122. }
  123. log.Println("查询条件:",q)
  124. sess := mgo.GetMgoConn()
  125. defer mgo.DestoryMongoConn(sess)
  126. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  127. index,isOK:=0,0
  128. start := int(time.Now().Unix())
  129. //多线程升索引
  130. pool_es := make(chan bool, es_pool)
  131. wg_es := &sync.WaitGroup{}
  132. tmpEsMap := make(map[string]string,0)
  133. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  134. //每遍历isgroupfn条 划分组别
  135. if index%isgroupfn==0 && index!=0 {
  136. log.Println("current index",index,tmp["_id"])
  137. //新的一组执行上一组生索引
  138. for k,v:=range tmpEsMap {
  139. pool_es <- true
  140. wg_es.Add(1)
  141. go func(es_id string,cur_ids string) {
  142. defer func() {
  143. <-pool_es
  144. wg_es.Done()
  145. }()
  146. if es_id!="" && cur_ids!="" {
  147. dataArr := *elastic.GetById(esIndex,esType,es_id)
  148. if len(dataArr)>0 { //存在-更新
  149. allids := qu.ObjToString(dataArr[0]["allids"])
  150. allids = allids+","+cur_ids
  151. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  152. elastic.Update(esIndex,esType,es_id, updateStr)
  153. }else { //不存在-新增
  154. savetmp := make(map[string]interface{}, 0)
  155. savetmp["allids"] = cur_ids
  156. savetmp["_id"] = StringTOBsonId(es_id)
  157. savetmp["template_id"] = ""
  158. savetmp["fusion_id"] = ""
  159. elastic.Save(esIndex, esType, savetmp)
  160. }
  161. }else {
  162. log.Println("异常",es_id,cur_ids)
  163. }
  164. }(k,v)
  165. }
  166. wg_es.Wait()
  167. tmpEsMap = make(map[string]string,0)
  168. }
  169. repeat := qu.IntAll(tmp["repeat"])
  170. sourceid := BsonTOStringId(tmp["_id"])
  171. repeatid := BsonTOStringId(tmp["_id"])
  172. if repeat==1 {
  173. sourceid = qu.ObjToString(tmp["repeat_id"])
  174. }else {
  175. isOK++
  176. }
  177. if tmpEsMap[sourceid]!="" {
  178. ids := tmpEsMap[sourceid]
  179. ids = ids+","+repeatid
  180. tmpEsMap[sourceid] = ids
  181. }else {
  182. tmpEsMap[sourceid] = repeatid
  183. }
  184. tmp = make(map[string]interface{})
  185. }
  186. log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
  187. //处理剩余数据
  188. if len(tmpEsMap)>0 {
  189. log.Println("处理剩余数据:",len(tmpEsMap))
  190. for k,v:=range tmpEsMap {
  191. pool_es <- true
  192. wg_es.Add(1)
  193. go func(es_id string,cur_ids string) {
  194. defer func() {
  195. <-pool_es
  196. wg_es.Done()
  197. }()
  198. if es_id!="" && cur_ids!="" {
  199. dataArr := *elastic.GetById(esIndex,esType,es_id)
  200. if len(dataArr)>0 { //存在-更新
  201. allids := qu.ObjToString(dataArr[0]["allids"])
  202. allids = allids+","+cur_ids
  203. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  204. elastic.Update(esIndex,esType,es_id, updateStr)
  205. }else { //不存在-新增
  206. savetmp := make(map[string]interface{}, 0)
  207. savetmp["allids"] = cur_ids
  208. savetmp["_id"] = StringTOBsonId(es_id)
  209. savetmp["template_id"] = ""
  210. savetmp["fusion_id"] = ""
  211. elastic.Save(esIndex, esType, savetmp)
  212. }
  213. }else {
  214. log.Println("异常",es_id,cur_ids)
  215. }
  216. }(k,v)
  217. }
  218. wg_es.Wait()
  219. tmpEsMap = make(map[string]string,0)
  220. }
  221. log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
  222. time.Sleep(30 * time.Second)
  223. //具体融合数据的方法
  224. startFusionData()
  225. log.Println("睡眠30秒,然后在发广播")
  226. time.Sleep(30 * time.Second)
  227. taskSendFusionUdp(mapInfo)
  228. }