main.go 11 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "go.mongodb.org/mongo-driver/bson/primitive"
  5. "log"
  6. mu "mfw/util"
  7. "net"
  8. "os"
  9. "qfw/common/src/qfw/util"
  10. qu "qfw/util"
  11. "strconv"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. sysconfig map[string]interface{} //配置文件
  17. mgo *MongodbSim //mongodb操作对象
  18. udpclient mu.UdpClient //udp对象
  19. nextNode []map[string]interface{} //下节点数组
  20. coll_name string
  21. fusion_coll_name string
  22. record_coll_name string //表名
  23. NoNeedFusionKey map[string]interface{} //不需要融合的key
  24. UpdateFusion *updateFusionInfo
  25. UpdateRecord *updateRecordInfo //更新池
  26. )
  27. func initMgo() {
  28. mconf := sysconfig["mongodb"].(map[string]interface{})
  29. log.Println(mconf)
  30. mgo = &MongodbSim{
  31. MongodbAddr: mconf["addrName"].(string),
  32. DbName: mconf["dbName"].(string),
  33. Size: qu.IntAllDef(mconf["pool"], 10),
  34. }
  35. mgo.InitPool()
  36. coll_name = mconf["collName"].(string)
  37. fusion_coll_name = sysconfig["fusion_coll_name"].(string)
  38. record_coll_name = sysconfig["record_coll_name"].(string)
  39. NoNeedFusionKey = sysconfig["notFusionKey"].(map[string]interface{})
  40. }
  41. func init() {
  42. //加载配置文件
  43. qu.ReadConfig(&sysconfig)
  44. initMgo()
  45. //更新池
  46. UpdateFusion = newUpdateFusionPool()
  47. go UpdateFusion.updateFusionData()
  48. UpdateRecord = newUpdateRecordPool()
  49. go UpdateRecord.updateRecordData()
  50. log.Println("采用udp模式")
  51. }
  52. func mainT() {
  53. go checkMapJob()
  54. updport := sysconfig["udpport"].(string)
  55. udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
  56. udpclient.Listen(processUdpMsg)
  57. log.Println("Udp服务监听", updport)
  58. time.Sleep(99999 * time.Hour)
  59. }
  60. //快速测试使用
  61. func main() {
  62. sid := "100000000000000000000000"
  63. eid := "900000000000000000000000"
  64. //log.Println(sid, "---", eid)
  65. mapinfo := map[string]interface{}{}
  66. if sid == "" || eid == "" {
  67. log.Println("sid,eid参数不能为空")
  68. os.Exit(0)
  69. }
  70. mapinfo["gtid"] = sid
  71. mapinfo["lteid"] = eid
  72. startTask([]byte{}, mapinfo)
  73. time.Sleep(99999 * time.Hour)
  74. }
  75. //融合具体方法
  76. func startTask(data []byte, mapInfo map[string]interface{}) {
  77. log.Println("开始融合流程")
  78. defer qu.Catch()
  79. //区间id
  80. q := map[string]interface{}{
  81. "_id": map[string]interface{}{
  82. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  83. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  84. },
  85. }
  86. log.Println("查询条件:",q)
  87. sess := mgo.GetMgoConn()
  88. defer mgo.DestoryMongoConn(sess)
  89. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  90. //编译不同的融合组,如何划分组
  91. /***********************/
  92. /***********************/
  93. /***y
  94. ********************/
  95. /***********************/
  96. fusionDataGroupArr := make([][]string,0) //待融合组
  97. addOrUpdateArr := make([]bool,0) //新增-bool-记录-组新增,组更新
  98. infoFusionArr := make([]map[string]interface{},0) //记录取融合表的数据
  99. repeatArr,sourceArr,index := make([]string,0),make([]string,0),0 //重复数据组
  100. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  101. if index%1000 == 0 {
  102. log.Println("current index",index,tmp["_id"])
  103. }
  104. tmpId:=BsonTOStringId(tmp["_id"])
  105. repeat:=qu.IntAll(tmp["repeat"])
  106. sourceid:=qu.ObjToString(tmp["repeat_id"])
  107. if repeat==1 {
  108. repeatArr = append(repeatArr,tmpId)
  109. sourceArr = append(sourceArr,sourceid)
  110. }else {
  111. fusionDataGroupArr = append(fusionDataGroupArr,[]string{tmpId})
  112. addOrUpdateArr = append(addOrUpdateArr,false)
  113. infoFusionArr = append(infoFusionArr, map[string]interface{}{})
  114. }
  115. tmp = make(map[string]interface{})
  116. }
  117. log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
  118. //根据重复组,重新划分新的组别
  119. for i:=0;i<len(repeatArr);i++ {
  120. sourceid := sourceArr[i]
  121. isAddExist,index := false,0
  122. //根据原sourceid 直接遍历组
  123. R: for k,v:=range fusionDataGroupArr{
  124. for _,v1:=range v{
  125. if v1==sourceid {
  126. index = k
  127. isAddExist = true
  128. break R
  129. }
  130. }
  131. }
  132. if isAddExist { //数组截取替换-找到指定
  133. arr := make([]string,0)
  134. arr = fusionDataGroupArr[index]
  135. arr = append(arr,repeatArr[i])//组拼接当前id
  136. fusionDataGroupArr[index] = arr
  137. }else {//当前段落未找到-需要查询融合表,,遍历融合表
  138. arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
  139. arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
  140. arr = append(arr,repeatArr[i])//组拼接当前id
  141. if len(arr)<1 { //异常错误,新增
  142. log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
  143. arr_error := make([]string,0)
  144. arr_error = append(arr_error,repeatArr[i])//组拼接当前id
  145. fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
  146. addOrUpdateArr = append(addOrUpdateArr,false)
  147. infoFusionArr = append(infoFusionArr, map[string]interface{}{})
  148. }else { //正常更新
  149. fusionDataGroupArr = append(fusionDataGroupArr,arr)
  150. addOrUpdateArr = append(addOrUpdateArr,true)
  151. infoFusionArr = append(infoFusionArr,fusionTmpData)
  152. }
  153. }
  154. //不断改变中
  155. //log.Println("当前分组数量:",len(fusionDataGroupArr))
  156. }
  157. log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
  158. log.Println("********************分割线********************")
  159. log.Println("********************分割线********************")
  160. log.Println("********************分割线********************")
  161. log.Println("开始处理新增分组... ...")
  162. start := int(time.Now().Unix())
  163. //进行分组融合
  164. pool := make(chan bool, 1)
  165. wg := &sync.WaitGroup{}
  166. for i:=0;i<len(fusionDataGroupArr);i++ {
  167. fusionArr := fusionDataGroupArr[i]
  168. //构建数据
  169. log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
  170. //多线程 - 处理数据
  171. pool <- true
  172. wg.Add(1)
  173. go func(fusionArr []string,i int) {
  174. defer func() {
  175. <-pool
  176. wg.Done()
  177. }()
  178. weight :=NewWeightData(fusionArr)
  179. ////整理数据-筛选排名,模板
  180. weight.analyzeBuildStandardData()
  181. if len(fusionArr)<=1 {
  182. //log.Println("单组生成... ...")
  183. saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
  184. saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  185. saveRecordData["_id"] = saveid
  186. mgo.Save(record_coll_name,saveRecordData)
  187. }else {
  188. if addOrUpdateArr[i] {
  189. //log.Println("多组更新... ...")
  190. tmpdata:=infoFusionArr[i]
  191. updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
  192. UpdateFusion.updatePool <- []map[string]interface{}{
  193. map[string]interface{}{
  194. "_id": tmpdata["_id"],
  195. },
  196. updateFusionData,
  197. }
  198. UpdateRecord.updatePool <- []map[string]interface{}{
  199. map[string]interface{}{
  200. "_id": tmpdata["_id"],
  201. },
  202. updateRecordData,
  203. }
  204. }else {
  205. //log.Println("多组生成... ...")
  206. saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
  207. saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  208. saveRecordData["_id"] = saveid
  209. mgo.Save(record_coll_name,saveRecordData)
  210. }
  211. }
  212. }(fusionArr,i)
  213. }
  214. wg.Wait()
  215. log.Println("新增融合over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
  216. time.Sleep(30 * time.Second)
  217. //任务完成,开始发送广播通知下面节点
  218. taskSendFusionUdp(mapInfo)
  219. }
  220. //查询融合表数据-找到对应组id
  221. func dealWithFindFusionDataArr(sourceid string) ([]string,map[string]interface{}) {
  222. newArr ,arr := make([]string,0),make(primitive.A,0)
  223. tmpData:=make(map[string]interface{},0)
  224. q := map[string]interface{}{}
  225. sess := mgo.GetMgoConn()
  226. defer mgo.DestoryMongoConn(sess)
  227. it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
  228. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  229. //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
  230. if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
  231. for _,v:=range fusion_allids {
  232. if v==sourceid {
  233. //找到目标组-
  234. arr = fusion_allids
  235. tmpData = tmp
  236. tmp = make(map[string]interface{})
  237. break
  238. }
  239. }
  240. }
  241. tmp = make(map[string]interface{})
  242. }
  243. for _,v:=range arr{
  244. newArr = append(newArr,qu.ObjToString(v))
  245. }
  246. return newArr,tmpData
  247. }
  248. //查询记录1表数据-找到对应的id , 更新用到
  249. func dealWithFindRecordData(sourceid string) string {
  250. newArr ,arr := make([]string,0),make(primitive.A,0)
  251. //tmpData:=make(map[string]interface{},0)
  252. q := map[string]interface{}{}
  253. sess := mgo.GetMgoConn()
  254. defer mgo.DestoryMongoConn(sess)
  255. it := sess.DB(mgo.DbName).C(fusion_coll_name).Find(&q).Iter()
  256. for tmp := make(map[string]interface{}); it.Next(&tmp); {
  257. //log.Println(reflect.TypeOf(tmp["fusion_allids"]))
  258. if fusion_allids,b := tmp["fusion_allids"].(primitive.A);b {
  259. for _,v:=range fusion_allids {
  260. if v==sourceid {
  261. //找到目标组-
  262. arr = fusion_allids
  263. //tmpData = tmp
  264. tmp = make(map[string]interface{})
  265. break
  266. }
  267. }
  268. }
  269. tmp = make(map[string]interface{})
  270. }
  271. for _,v:=range arr{
  272. newArr = append(newArr,qu.ObjToString(v))
  273. }
  274. return ""
  275. }
  276. //udp 监听
  277. func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
  278. switch act {
  279. case mu.OP_TYPE_DATA: //上个节点的数据
  280. //从表中开始处理
  281. var mapInfo map[string]interface{}
  282. err := json.Unmarshal(data, &mapInfo)
  283. log.Println("err:", err, "mapInfo:", mapInfo)
  284. if err != nil {
  285. udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
  286. } else if mapInfo != nil {
  287. taskType := qu.ObjToString(mapInfo["stype"])
  288. if taskType == "fusion" {
  289. go startTask(data, mapInfo)
  290. } else {
  291. log.Println("未知类型:融合异常... ...")
  292. }
  293. key, _ := mapInfo["key"].(string)
  294. if key == "" {
  295. key = "udpok"
  296. }
  297. udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
  298. }
  299. case mu.OP_NOOP: //下个节点回应
  300. ok := string(data)
  301. if ok != "" {
  302. log.Println("ok:", ok)
  303. udptaskmap.Delete(ok)
  304. }
  305. }
  306. }
  307. //结束发送udp
  308. func taskSendFusionUdp(mapinfo map[string]interface{}) {
  309. //log.Println("信息融合结束-发送udp")
  310. for _, to := range nextNode {
  311. sid, _ := mapinfo["gtid"].(string)
  312. eid, _ := mapinfo["lteid"].(string)
  313. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  314. by, _ := json.Marshal(map[string]interface{}{
  315. "gtid": sid,
  316. "lteid": eid,
  317. "stype": util.ObjToString(to["stype"]),
  318. "key": key,
  319. })
  320. addr := &net.UDPAddr{
  321. IP: net.ParseIP(to["addr"].(string)),
  322. Port: util.IntAll(to["port"]),
  323. }
  324. node := &udpNode{by, addr, time.Now().Unix(), 0}
  325. udptaskmap.Store(key, node)
  326. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  327. }
  328. }
  329. //判断是否在当前id段落
  330. func judgeIsCurIds (gtid string,lteid string,curid string) bool {
  331. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  332. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  333. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  334. if cur_time>gt_time&&cur_time<=lte_time {
  335. return true
  336. }
  337. return false
  338. }