fusionAddData.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package main
  2. import (
  3. "log"
  4. qu "qfw/util"
  5. "strings"
  6. "sync"
  7. "time"
  8. )
  9. //增量-融合一小段
  10. func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
  11. log.Println("开始增量融合流程")
  12. defer qu.Catch()
  13. //区间id
  14. q := map[string]interface{}{
  15. "_id": map[string]interface{}{
  16. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  17. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  18. },
  19. }
  20. log.Println("查询条件:",q)
  21. sess := mgo.GetMgoConn()
  22. defer mgo.DestoryMongoConn(sess)
  23. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  24. start := int(time.Now().Unix())
  25. tmpFusionMap := make(map[string]string,0)
  26. index,isOK:=0,0
  27. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  28. if index%10000 == 0 {
  29. log.Println("current index",index,tmp["_id"])
  30. }
  31. repeat := qu.IntAll(tmp["repeat"])
  32. repeatid,sourceid := BsonTOStringId(tmp["_id"]),BsonTOStringId(tmp["_id"])
  33. if repeat==1 {
  34. sourceid = qu.ObjToString(tmp["repeat_id"])
  35. }
  36. if sourceid!="" {
  37. if tmpFusionMap[sourceid]!="" {
  38. ids := tmpFusionMap[sourceid]
  39. ids = ids+","+repeatid
  40. tmpFusionMap[sourceid] = ids
  41. }else {
  42. tmpFusionMap[sourceid] = repeatid
  43. }
  44. }else {
  45. //log.Println("异常: sourceid 为空 ")
  46. }
  47. tmp = make(map[string]interface{})
  48. }
  49. log.Println("task mongo first:",index,len(tmpFusionMap),"开始确认最后增量组数据...")
  50. isUpdateMap:=make(map[string]string)
  51. for sourceid,value:=range tmpFusionMap {
  52. isOK++
  53. if isOK % 1000 ==0 {
  54. log.Println("当前数量:",isOK,sourceid)
  55. }
  56. data:=mgo.FindById(group_coll_name,sourceid)
  57. if data!=nil && len(data)>2 { //更新组
  58. allids:= qu.ObjToString(data["allids"])
  59. tmpFusionMap[sourceid] = allids+","+value
  60. isUpdateMap[sourceid] = qu.ObjToString(data["fusion_id"])
  61. }
  62. }
  63. log.Println("分组数据总用时:",int(time.Now().Unix())-start,"秒")
  64. stareFusionData(tmpFusionMap,isUpdateMap)
  65. log.Println("此段落结束,发送udp","...睡眠30s")
  66. time.Sleep(30 * time.Second)
  67. taskSendFusionUdp(mapInfo)
  68. }
  69. func stareFusionData(tmpFusionMap map[string]string,isUpdateMap map[string]string) {
  70. //根据重复组,重新划分新的组别
  71. log.Println("开始融合操作......")
  72. index,start :=0, int(time.Now().Unix())
  73. //多线程保存数据
  74. pool_mgo := make(chan bool, mgo_pool)
  75. wg_mgo := &sync.WaitGroup{}
  76. for sourceid,v:=range tmpFusionMap {
  77. fusionArr := strings.Split(v, ",")
  78. fusionid := ""
  79. if isUpdateMap[sourceid]!="" {
  80. fusionid = qu.ObjToString(isUpdateMap[sourceid])
  81. }
  82. pool_mgo <- true
  83. wg_mgo.Add(1)
  84. go func(sourceid string, fusionArr []string,fusionid string) {
  85. defer func() {
  86. <-pool_mgo
  87. wg_mgo.Done()
  88. }()
  89. weight := NewWeightData(fusionArr,sourceid)
  90. weight.analyzeBuildStandardData()
  91. saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
  92. if fusionid!="" {//更新
  93. saveFusionData, saveRecordData:=weight.dealWithMultipleUpdateFusionStruct(fusionid)
  94. //更新融合表
  95. UpdateFusionPool.pool <- []map[string]interface{}{
  96. map[string]interface{}{
  97. "_id": StringTOBsonId(fusionid),
  98. },
  99. saveFusionData,
  100. }
  101. //更新日志表
  102. UpdateRecordPool.pool <- []map[string]interface{}{
  103. map[string]interface{}{
  104. "_id": StringTOBsonId(fusionid),
  105. },
  106. map[string]interface{}{
  107. "$set": saveRecordData,
  108. },
  109. }
  110. //更新分组表
  111. UpdateGroupPool.pool <- []map[string]interface{}{
  112. map[string]interface{}{
  113. "_id": StringTOBsonId(sourceid),
  114. },
  115. map[string]interface{}{
  116. "$set": map[string]interface{}{
  117. "allids":strings.Join(fusionArr, ","),
  118. },
  119. },
  120. }
  121. }else {
  122. if len(fusionArr) <= 1 {
  123. saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
  124. }else {
  125. saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
  126. }
  127. //新增融合表
  128. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  129. saveRecordData["_id"] = saveid
  130. //批量新增日志表
  131. AddRecordPool.pool <- saveRecordData
  132. //批量新增分组表
  133. AddGroupPool.pool <- map[string]interface{}{
  134. "_id":StringTOBsonId(sourceid),
  135. "allids":strings.Join(fusionArr, ","),
  136. "fusion_id": BsonTOStringId(saveid),
  137. "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
  138. }
  139. }
  140. }(sourceid, fusionArr,fusionid)
  141. }
  142. wg_mgo.Wait()
  143. log.Println("add fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
  144. }