fusionFullData.go 7.3 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "strings"
  8. "sync"
  9. "time"
  10. es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
  11. )
  12. func startFusionData() {
  13. start := int(time.Now().Unix())
  14. log.Println("开始遍历索引-进行融合............")
  15. //遍历索引
  16. esclient := elastic.GetEsConn()
  17. defer elastic.DestoryEsConn(esclient)
  18. if esclient == nil {
  19. log.Fatalln("连接池异常")
  20. }
  21. q :=es_elastic.NewBoolQuery()
  22. cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
  23. Size(200).Do()
  24. if err != nil {
  25. log.Fatal("cursor",err)
  26. }
  27. if cursor.Results == nil {
  28. log.Fatalf("results != nil; got nil")
  29. }
  30. if cursor.Results.Hits == nil {
  31. log.Fatalf("expected results.Hits != nil; got nil")
  32. }
  33. log.Println("查询正常-总数:",cursor.TotalHits())
  34. //多线程 - 处理数据
  35. pool_mgo := make(chan bool, mgo_pool)
  36. wg_mgo := &sync.WaitGroup{}
  37. pages,numDocs := 0,0
  38. for {
  39. searchResult, err := cursor.Next()
  40. if err != nil {
  41. if err.Error() == "EOS" {
  42. break
  43. }else {
  44. log.Fatal("cursor searchResult",err)
  45. }
  46. }
  47. pages++
  48. isLog := false
  49. for _, hit := range searchResult.Hits.Hits {
  50. tmp := make(map[string]interface{})
  51. err := json.Unmarshal(*hit.Source, &tmp)
  52. if err != nil {
  53. log.Println("json Unmarshal error")
  54. continue
  55. }
  56. if !isLog && numDocs%10000==0 {
  57. log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
  58. isLog = true
  59. }
  60. numDocs++
  61. fusion_ids := qu.ObjToString(tmp["allids"])
  62. fusionArr := strings.Split(fusion_ids, ",")
  63. sourceid := qu.ObjToString(tmp["_id"])
  64. pool_mgo <- true
  65. wg_mgo.Add(1)
  66. go func(sourceid string, fusionArr []string) {
  67. defer func() {
  68. <-pool_mgo
  69. wg_mgo.Done()
  70. }()
  71. weight := NewWeightData(fusionArr)
  72. weight.analyzeBuildStandardData()
  73. if len(fusionArr) <= 1 { //单组数据
  74. saveFusionData, saveRecordData := weight.dealWithAddFusionStruct()
  75. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  76. //新增-Record 批量新增-经测试-批量新增与多线程新增 速度306s-236s 相差20%的耗时
  77. saveRecordData["_id"] = saveid
  78. UpdateRecord.add_pool <- saveRecordData
  79. //批量更新Es -问题耗时
  80. fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
  81. updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
  82. updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
  83. elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
  84. UpdateElastic.update_pool <- map[string]string{
  85. "id":sourceid,
  86. "updateStr":updateStr1+updateStr2,
  87. }
  88. }else {
  89. saveFusionData, saveRecordData := weight.dealWithMultipleAddFusionStruct()
  90. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  91. //新增-Record
  92. saveRecordData["_id"] = saveid
  93. UpdateRecord.add_pool <- saveRecordData //批量新增
  94. //批量更新Es -
  95. fusion_id, template_id := BsonTOStringId(saveid), qu.ObjToString(saveFusionData["fusion_templateid"])
  96. updateStr1 := `ctx._source.template_id=` + `"` + template_id + `";`
  97. updateStr2 := `ctx._source.fusion_id=` + `"` + fusion_id + `"`
  98. elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
  99. UpdateElastic.update_pool <- map[string]string{
  100. "id":sourceid,
  101. "updateStr":updateStr1+updateStr2,
  102. }
  103. }
  104. }(sourceid, fusionArr)
  105. }
  106. }
  107. log.Println("遍历Es结束......")
  108. wg_mgo.Wait()
  109. log.Println("fusion is over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
  110. }
  111. func goUpdateEs(sourceid string,updateStr string) {
  112. UpdateElastic.update_pool <- map[string]string{
  113. "id":sourceid,
  114. "updateStr":updateStr,
  115. }
  116. }
  117. func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
  118. startFusionData()
  119. return
  120. log.Println("开始全量融合流程")
  121. defer qu.Catch()
  122. //区间id
  123. q := map[string]interface{}{
  124. "_id": map[string]interface{}{
  125. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  126. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  127. },
  128. }
  129. log.Println("查询条件:",q)
  130. sess := mgo.GetMgoConn()
  131. defer mgo.DestoryMongoConn(sess)
  132. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  133. index,isOK:=0,0
  134. start := int(time.Now().Unix())
  135. //多线程升索引
  136. pool_es := make(chan bool, es_pool)
  137. wg_es := &sync.WaitGroup{}
  138. tmpEsMap := make(map[string]string,0)
  139. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  140. //每遍历isgroupfn条 划分组别
  141. if index%isgroupfn==0 && index!=0 {
  142. log.Println("current index",index,tmp["_id"])
  143. //新的一组执行上一组生索引
  144. for k,v:=range tmpEsMap {
  145. pool_es <- true
  146. wg_es.Add(1)
  147. go func(es_id string,cur_ids string) {
  148. defer func() {
  149. <-pool_es
  150. wg_es.Done()
  151. }()
  152. if es_id!="" && cur_ids!="" {
  153. dataArr := *elastic.GetById(esIndex,esType,es_id)
  154. if len(dataArr)>0 { //存在-更新
  155. allids := qu.ObjToString(dataArr[0]["allids"])
  156. allids = allids+","+cur_ids
  157. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  158. elastic.Update(esIndex,esType,es_id, updateStr)
  159. }else { //不存在-新增
  160. savetmp := make(map[string]interface{}, 0)
  161. savetmp["allids"] = cur_ids
  162. savetmp["_id"] = StringTOBsonId(es_id)
  163. savetmp["template_id"] = ""
  164. savetmp["fusion_id"] = ""
  165. elastic.Save(esIndex, esType, savetmp)
  166. }
  167. }else {
  168. log.Println("异常",es_id,cur_ids)
  169. }
  170. }(k,v)
  171. }
  172. wg_es.Wait()
  173. tmpEsMap = make(map[string]string,0)
  174. }
  175. repeat := qu.IntAll(tmp["repeat"])
  176. sourceid := BsonTOStringId(tmp["_id"])
  177. repeatid := BsonTOStringId(tmp["_id"])
  178. if repeat==1 {
  179. sourceid = qu.ObjToString(tmp["repeat_id"])
  180. }else {
  181. isOK++
  182. }
  183. if tmpEsMap[sourceid]!="" {
  184. ids := tmpEsMap[sourceid]
  185. ids = ids+","+repeatid
  186. tmpEsMap[sourceid] = ids
  187. }else {
  188. tmpEsMap[sourceid] = repeatid
  189. }
  190. tmp = make(map[string]interface{})
  191. }
  192. log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
  193. //处理剩余数据
  194. if len(tmpEsMap)>0 {
  195. log.Println("处理剩余数据:",len(tmpEsMap))
  196. for k,v:=range tmpEsMap {
  197. pool_es <- true
  198. wg_es.Add(1)
  199. go func(es_id string,cur_ids string) {
  200. defer func() {
  201. <-pool_es
  202. wg_es.Done()
  203. }()
  204. if es_id!="" && cur_ids!="" {
  205. dataArr := *elastic.GetById(esIndex,esType,es_id)
  206. if len(dataArr)>0 { //存在-更新
  207. allids := qu.ObjToString(dataArr[0]["allids"])
  208. allids = allids+","+cur_ids
  209. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  210. elastic.Update(esIndex,esType,es_id, updateStr)
  211. }else { //不存在-新增
  212. savetmp := make(map[string]interface{}, 0)
  213. savetmp["allids"] = cur_ids
  214. savetmp["_id"] = StringTOBsonId(es_id)
  215. savetmp["template_id"] = ""
  216. savetmp["fusion_id"] = ""
  217. elastic.Save(esIndex, esType, savetmp)
  218. }
  219. }else {
  220. log.Println("异常",es_id,cur_ids)
  221. }
  222. }(k,v)
  223. }
  224. wg_es.Wait()
  225. tmpEsMap = make(map[string]string,0)
  226. }
  227. log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
  228. time.Sleep(30 * time.Second)
  229. //具体融合数据的方法
  230. startFusionData()
  231. log.Println("睡眠30秒,然后在发广播")
  232. time.Sleep(30 * time.Second)
  233. taskSendFusionUdp(mapInfo)
  234. }