fusionAddData.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. //func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
  12. //
  13. // log.Println("开始全量融合流程")
  14. // defer qu.Catch()
  15. // //区间id
  16. // q := map[string]interface{}{
  17. // "_id": map[string]interface{}{
  18. // "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  19. // "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  20. // },
  21. // }
  22. // log.Println("查询条件:",q)
  23. // sess := mgo.GetMgoConn()
  24. // defer mgo.DestoryMongoConn(sess)
  25. // it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  26. // //编译不同的融合组,如何划分组
  27. // fusionDataGroupMap := make(map[string][]string,0) //待融合组
  28. //
  29. // norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0 //重复数据组
  30. //
  31. // start := int(time.Now().Unix())
  32. // for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  33. // if index%10000 == 0 {
  34. // log.Println("current index",index,tmp["_id"])
  35. // }
  36. // tmpId:=BsonTOStringId(tmp["_id"])
  37. // repeat:=qu.IntAll(tmp["repeat"])
  38. // sourceid:=qu.ObjToString(tmp["repeat_id"])
  39. // if repeat==1 {
  40. // repeatArr = append(repeatArr,tmpId)
  41. // sourceArr = append(sourceArr,sourceid)
  42. // }else {
  43. // norepeatArr = append(norepeatArr,tmpId)
  44. // }
  45. //
  46. // tmp = make(map[string]interface{})
  47. // }
  48. //
  49. // log.Println("task first:",index,len(norepeatArr),"+",len(repeatArr))
  50. // log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
  51. //
  52. // //根据重复组,重新划分新的组别
  53. // start = int(time.Now().Unix())
  54. //
  55. // //多线程升索引
  56. // pool_es := make(chan bool, es_pool)
  57. // wg_es := &sync.WaitGroup{}
  58. // tmpEsMap := make(map[string]string,0)
  59. // isGroupNum := 1000
  60. // for i:=0;i<len(repeatArr);i++ {
  61. // if i%10000 == 0 {
  62. // log.Println("curent index ",i)
  63. // }
  64. // if i%isGroupNum==0 && i!=0 {
  65. // //新的一组执行上一组生索引
  66. // for k,v:=range tmpEsMap {
  67. // pool_es <- true
  68. // wg_es.Add(1)
  69. // go func(es_id string,cur_ids string) {
  70. // defer func() {
  71. // <-pool_es
  72. // wg_es.Done()
  73. // }()
  74. // if es_id!="" && cur_ids!="" {
  75. // dataArr := *elastic.GetById(esIndex,esType,es_id)
  76. // if len(dataArr)>0 { //存在-更新
  77. // allids := qu.ObjToString(dataArr[0]["allids"])
  78. // allids = allids+","+cur_ids
  79. // updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  80. // elastic.Update(esIndex,esType,es_id, updateStr)
  81. // }else { //不存在-新增
  82. // savetmp := make(map[string]interface{}, 0)
  83. // savetmp["allids"] = cur_ids
  84. // savetmp["_id"] = StringTOBsonId(es_id)
  85. // savetmp["template_id"] = ""
  86. // savetmp["fusion_id"] = ""
  87. // elastic.Save(esIndex, esType, savetmp)
  88. // }
  89. // }else {
  90. // log.Println("异常",es_id,cur_ids)
  91. // }
  92. // }(k,v)
  93. //
  94. // }
  95. // wg_es.Wait()
  96. //
  97. // tmpEsMap = make(map[string]string,0)
  98. // }
  99. // //新增一条数据
  100. // repeatid :=repeatArr[i]
  101. // sourceid := sourceArr[i]
  102. // if tmpEsMap[sourceid]!="" {
  103. // ids := tmpEsMap[sourceid]
  104. // ids = ids+","+repeatid
  105. // tmpEsMap[sourceid] = ids
  106. // }else {
  107. // tmpEsMap[sourceid] = sourceid+","+repeatid
  108. // }
  109. // }
  110. //
  111. // //处理剩余数据
  112. // if len(tmpEsMap)>0 {
  113. // for k,v:=range tmpEsMap {
  114. // pool_es <- true
  115. // wg_es.Add(1)
  116. // go func(es_id string,cur_ids string) {
  117. // defer func() {
  118. // <-pool_es
  119. // wg_es.Done()
  120. // }()
  121. // dataArr := *elastic.GetById(esIndex,esType,es_id)
  122. // if len(dataArr)>0 { //存在-更新
  123. // allids := qu.ObjToString(dataArr[0]["allids"])
  124. // allids = allids+","+cur_ids
  125. // updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  126. // elastic.Update(esIndex,esType,es_id, updateStr)
  127. // }else { //不存在-新增
  128. // savetmp := make(map[string]interface{}, 0)
  129. // savetmp["allids"] = cur_ids
  130. // savetmp["_id"] = StringTOBsonId(es_id)
  131. // savetmp["template_id"] = ""
  132. // savetmp["fusion_id"] = ""
  133. // elastic.Save(esIndex,esType, savetmp)
  134. // }
  135. // }(k,v)
  136. // }
  137. // wg_es.Wait()
  138. // tmpEsMap = make(map[string]string,0)
  139. //
  140. // }
  141. //
  142. //
  143. // log.Println("前置索引准备完毕......耗时:",int(time.Now().Unix())-start,"秒")
  144. //
  145. // start = int(time.Now().Unix())
  146. // log.Println("开始数据分组... ... ... ...")
  147. // log.Println("开始数据分组... ... ... ...")
  148. // log.Println("开始数据分组... ... ... ...")
  149. //
  150. // //查询分组-多线程
  151. // for i:=0;i<len(norepeatArr);i++ {
  152. // if i%10000==0 {
  153. // log.Println("cur index ",i,norepeatArr[i])
  154. // }
  155. // sourceid:=norepeatArr[i]
  156. // pool_es <- true
  157. // wg_es.Add(1)
  158. // go func(sourceid string) {
  159. // defer func() {
  160. // <-pool_es
  161. // wg_es.Done()
  162. // }()
  163. // dataArr := *elastic.GetById(esIndex,esType,sourceid)
  164. // if len(dataArr)>0 { //存在值
  165. // allids := qu.ObjToString(dataArr[0]["allids"])
  166. // arr := strings.Split(allids,",")
  167. // updatelock.Lock()
  168. // fusionDataGroupMap[sourceid] = arr
  169. // updatelock.Unlock()
  170. // }else {
  171. // arr:=[]string{sourceid}
  172. // updatelock.Lock()
  173. // fusionDataGroupMap[sourceid] = arr
  174. // updatelock.Unlock()
  175. //
  176. //
  177. // }
  178. //
  179. // }(sourceid)
  180. // }
  181. // wg_es.Wait()
  182. //
  183. //
  184. // log.Println("最终待融合分组数量:",len(fusionDataGroupMap))
  185. // log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
  186. // log.Println("********************分割线********************")
  187. // log.Println("********************分割线********************")
  188. // log.Println("********************分割线********************")
  189. //
  190. //
  191. // log.Println("开始进行正式分组融合......先睡秒30秒")
  192. // time.Sleep(30 * time.Second)
  193. //
  194. // start = int(time.Now().Unix())
  195. // //多线程 - 处理数据
  196. // pool_mgo := make(chan bool, mgo_pool)
  197. // wg_mgo := &sync.WaitGroup{}
  198. //
  199. // fusionIndex:=0
  200. // for k,v:=range fusionDataGroupMap {
  201. // fusionIndex++
  202. // pool_mgo <- true
  203. // wg_mgo.Add(1)
  204. // go func(sourceid string ,fusionArr []string,fusionIndex int) {
  205. // defer func() {
  206. // <-pool_mgo
  207. // wg_mgo.Done()
  208. // }()
  209. // if fusionIndex % 10000==0 {
  210. // log.Println("数据融合数量:",fusionIndex,sourceid)
  211. // }
  212. // weight :=NewWeightData(fusionArr)
  213. // weight.analyzeBuildStandardData()
  214. // if len(fusionArr)<=1 { //单组数据-需要新增Es
  215. // saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
  216. // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  217. // saveRecordData["_id"] = saveid
  218. // mgo.Save(record_coll_name,saveRecordData)
  219. //
  220. // //新增es
  221. // savetmp := make(map[string]interface{}, 0)
  222. // fusionid:=BsonTOStringId(saveid)
  223. // savetmp["_id"] = StringTOBsonId(sourceid)
  224. // savetmp["allids"] = sourceid
  225. // savetmp["template_id"] = sourceid
  226. // savetmp["fusion_id"] = fusionid
  227. // elastic.Save(esIndex,esType,savetmp)
  228. //
  229. //
  230. // }else {
  231. // saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
  232. // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  233. // saveRecordData["_id"] = saveid
  234. // mgo.Save(record_coll_name,saveRecordData)
  235. //
  236. // //更新数据-融合id-模板id等 `ctx._source.age=101;ctx._source.name="张三"`
  237. // fusion_id,template_id:=BsonTOStringId(saveid),qu.ObjToString(saveFusionData["fusion_templateid"])
  238. // updateStr1 := `ctx._source.template_id=`+ `"`+template_id+`";`
  239. // updateStr2 := `ctx._source.fusion_id=`+ `"`+fusion_id+`"`
  240. // elastic.Update(esIndex,esType,sourceid, updateStr1+updateStr2)
  241. //
  242. // }
  243. // }(k,v,fusionIndex)
  244. // }
  245. //
  246. // wg_mgo.Wait()
  247. //
  248. // log.Println("fusion is over :",fusionIndex,len(fusionDataGroupMap),"用时:",int(time.Now().Unix())-start,"秒")
  249. // log.Println("睡眠30秒,然后在发广播")
  250. //
  251. // time.Sleep(30 * time.Second)
  252. //
  253. // //任务完成,开始发送广播通知下面节点
  254. //
  255. // taskSendFusionUdp(mapInfo)
  256. //
  257. //}
  258. //增量-融合一小段
  259. func startTaskAddData(data []byte, mapInfo map[string]interface{}) {
  260. log.Println("开始增量融合流程")
  261. defer qu.Catch()
  262. //区间id
  263. q := map[string]interface{}{
  264. "_id": map[string]interface{}{
  265. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  266. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  267. },
  268. }
  269. log.Println("查询条件:",q)
  270. sess := mgo.GetMgoConn()
  271. defer mgo.DestoryMongoConn(sess)
  272. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  273. //编译不同的融合组,如何划分组
  274. //待融合组
  275. fusionDataGroupArr := make([][]string,0)
  276. //需要更新组
  277. updateFusionMap,curFusionKeyMap:=make(map[string]interface{},0),make(map[string]interface{},0)
  278. //重复数据组
  279. norepeatArr,repeatArr,sourceArr,index := make([]string,0),make([]string,0),make([]string,0),0
  280. start := int(time.Now().Unix())
  281. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  282. if index%10000 == 0 {
  283. log.Println("current index",index,tmp["_id"])
  284. }
  285. tmpId:=BsonTOStringId(tmp["_id"])
  286. repeat:=qu.IntAll(tmp["repeat"])
  287. sourceid:=qu.ObjToString(tmp["repeat_id"])
  288. if repeat==1 {
  289. repeatArr = append(repeatArr,tmpId)
  290. sourceArr = append(sourceArr,sourceid)
  291. }else {
  292. norepeatArr = append(repeatArr,tmpId)
  293. }
  294. tmp = make(map[string]interface{})
  295. }
  296. log.Println("task first:",index,len(fusionDataGroupArr),"+",len(repeatArr))
  297. log.Println("遍历数据用时:",int(time.Now().Unix())-start,"秒")
  298. //根据重复组,重新划分新的组别
  299. start = int(time.Now().Unix())
  300. elastic.InitElasticSize("http://192.168.3.11:9800",10)
  301. for i:=0;i<len(repeatArr);i++ {
  302. //查询ES-升索引
  303. repeatid := repeatArr[i]
  304. sourceid := sourceArr[i]
  305. key := fmt.Sprintf("%s",sourceid)
  306. dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
  307. if len(dataArr)>0 { //存在值
  308. if curFusionKeyMap[key]==nil { //存在融合表-不在当前id段落内
  309. updateFusionMap[key] = ""
  310. }
  311. //es 随时更新ids
  312. allids := qu.ObjToString(dataArr[0]["allids"])
  313. allids = allids+","+repeatid
  314. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  315. b:=elastic.Update("allzktest","allzktest",sourceid, updateStr)
  316. if !b {
  317. log.Println("es更新异常",repeatid,sourceid)
  318. }
  319. }else {
  320. //索引查不到-确定新增- es 随时新增ids
  321. savetmp := make(map[string]interface{}, 0)
  322. savetmp["allids"] = repeatid
  323. savetmp["_id"] = StringTOBsonId(sourceid)
  324. b:=elastic.Save("allzktest", "allzktest", savetmp)
  325. if !b {
  326. log.Println("es保存异常",repeatid,sourceid)
  327. }
  328. curFusionKeyMap[key] = ""
  329. }
  330. }
  331. log.Println("前置索引准备完毕... ...","耗时:",int(time.Now().Unix())-start,"秒")
  332. start = int(time.Now().Unix())
  333. log.Println("开始数据分组... ... ... ...")
  334. log.Println("开始数据分组... ... ... ...")
  335. log.Println("开始数据分组... ... ... ...")
  336. //当前段落组
  337. for i:=0;i<len(norepeatArr);i++ {
  338. sourceid:=norepeatArr[i]
  339. dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
  340. if len(dataArr)>0 { //存在值
  341. allids := qu.ObjToString(dataArr[0]["allids"])
  342. arr := strings.Split(allids,",")
  343. arr = append(arr,sourceid)
  344. fusionDataGroupArr = append(fusionDataGroupArr,arr)
  345. }else {
  346. arr:=[]string{sourceid}
  347. fusionDataGroupArr = append(fusionDataGroupArr,arr)
  348. }
  349. }
  350. //更新组
  351. for k,_:=range updateFusionMap {
  352. sourceid:=qu.ObjToString(k)
  353. dataArr := *elastic.GetById("allzktest","allzktest",sourceid)
  354. if len(dataArr)>0 { //存在值
  355. allids := qu.ObjToString(dataArr[0]["allids"])
  356. arr := strings.Split(allids,",")
  357. arr = append(arr,sourceid)
  358. fusionDataGroupArr = append(fusionDataGroupArr,arr)
  359. }else {
  360. log.Println("融合表更新,查询Es异常:",sourceid)
  361. }
  362. }
  363. //isErrNum:=0
  364. //for i:=0;i<len(repeatArr);i++ {
  365. // sourceid := sourceArr[i]
  366. // isAddExist,index := false,0
  367. // //根据原sourceid 直接遍历组
  368. //R: for k,v:=range fusionDataGroupArr{
  369. // for _,v1:=range v{
  370. // if v1==sourceid {
  371. // index = k
  372. // isAddExist = true
  373. // break R
  374. // }
  375. // }
  376. // }
  377. // if i%1000 == 0 {
  378. // log.Println("分组中...","current index",i,repeatArr[i])
  379. // }
  380. //
  381. // if isAddExist { //数组截取替换-找到指定
  382. // arr := make([]string,0)
  383. // arr = fusionDataGroupArr[index]
  384. // arr = append(arr,repeatArr[i])//组拼接当前id
  385. // fusionDataGroupArr[index] = arr
  386. // log.Println("... ... 正常单组新增",i)
  387. //
  388. // }else {//当前段落未找到-需要查询融合表,,遍历融合表
  389. // arr,fusionTmpData := make([]string,0),make(map[string]interface{},0)
  390. // arr,fusionTmpData = dealWithFindFusionDataArr(sourceid)
  391. // arr = append(arr,repeatArr[i])//组拼接当前id
  392. //
  393. //
  394. //
  395. //
  396. //
  397. // if len(arr)==1 { //异常错误,新增
  398. // isErrNum++
  399. // log.Println("... ... 数据异常异常,融合表,当前组均找不到数据",repeatArr[i])
  400. // arr_error := make([]string,0)
  401. // arr_error = append(arr_error,repeatArr[i])//组拼接当前id
  402. // fusionDataGroupArr = append(fusionDataGroupArr,arr_error)
  403. // addOrUpdateArr = append(addOrUpdateArr,false)
  404. // infoFusionArr = append(infoFusionArr, map[string]interface{}{})
  405. // }else { //正常更新
  406. // log.Println("... ... 正常多组新增",i)
  407. // fusionDataGroupArr = append(fusionDataGroupArr,arr)
  408. // addOrUpdateArr = append(addOrUpdateArr,true)
  409. // infoFusionArr = append(infoFusionArr,fusionTmpData)
  410. // }
  411. //
  412. // }
  413. // //不断改变中
  414. // if i%1000 == 0 {
  415. // log.Println("当前分组数量:",len(fusionDataGroupArr))
  416. // }
  417. //}
  418. log.Println("最终待融合分组数量:",len(fusionDataGroupArr))
  419. log.Println("分组完毕数据用时:",int(time.Now().Unix())-start,"秒")
  420. log.Println("********************分割线********************")
  421. log.Println("********************分割线********************")
  422. log.Println("********************分割线********************")
  423. log.Println("开始处理分组融合... ... ... ...")
  424. log.Println("开始处理分组融合... ... ... ...")
  425. log.Println("开始处理分组融合... ... ... ...")
  426. start = int(time.Now().Unix())
  427. //多线程 - 处理数据
  428. pool := make(chan bool, 3)
  429. wg := &sync.WaitGroup{}
  430. for i:=0;i<len(fusionDataGroupArr);i++ {
  431. fusionArr := fusionDataGroupArr[i]
  432. pool <- true
  433. wg.Add(1)
  434. go func(fusionArr []string,i int) {
  435. defer func() {
  436. <-pool
  437. wg.Done()
  438. }()
  439. //构建数据
  440. if (i+1)%500 == 0 {
  441. log.Println("构建第",i+1,"组数据...","数量:",len(fusionArr),fusionArr)
  442. }
  443. weight :=NewWeightData(fusionArr)
  444. ////整理数据-筛选排名,模板
  445. weight.analyzeBuildStandardData()
  446. if len(fusionArr)<=1 {
  447. saveFusionData,saveRecordData := weight.dealWithAddFusionStruct()
  448. saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  449. saveRecordData["_id"] = saveid
  450. mgo.Save(record_coll_name,saveRecordData)
  451. }else {
  452. //if addOrUpdateArr[i] {
  453. // //log.Println("多组更新... ...")
  454. // tmpdata:=infoFusionArr[i]
  455. // updateFusionData,updateRecordData := weight.dealWithMultipleUpdateFusionStruct(tmpdata)
  456. //
  457. // UpdateFusion.updatePool <- []map[string]interface{}{
  458. // map[string]interface{}{
  459. // "_id": tmpdata["_id"],
  460. // },
  461. // updateFusionData,
  462. // }
  463. // UpdateRecord.updatePool <- []map[string]interface{}{
  464. // map[string]interface{}{
  465. // "_id": tmpdata["_id"],
  466. // },
  467. // updateRecordData,
  468. // }
  469. //}else {
  470. // //log.Println("多组生成... ...")
  471. // saveFusionData,saveRecordData := weight.dealWithMultipleAddFusionStruct()
  472. // saveid:=mgo.Save(fusion_coll_name,saveFusionData)
  473. // saveRecordData["_id"] = saveid
  474. // mgo.Save(record_coll_name,saveRecordData)
  475. //}
  476. }
  477. }(fusionArr,i)
  478. }
  479. wg.Wait()
  480. log.Println("fusion is over :",len(fusionDataGroupArr),"用时:",int(time.Now().Unix())-start,"秒")
  481. log.Println("睡眠30秒,然后在发广播")
  482. time.Sleep(30 * time.Second)
  483. //任务完成,开始发送广播通知下面节点
  484. taskSendFusionUdp(mapInfo)
  485. }