mark 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. qu "qfw/util"
  6. "qfw/util/elastic"
  7. "strings"
  8. "sync"
  9. "time"
  10. es_elastic "qfw/common/src/gopkg.in/olivere/elastic.v1"
  11. )
  12. func startTaskFullData(data []byte, mapInfo map[string]interface{}) {
  13. //临时测试
  14. //先导出具体需要融合组数据组-存mongo
  15. exportFusionMongoData()
  16. time.Sleep(60 * time.Second)
  17. //具体融合数据的方法
  18. startFusionData()
  19. time.Sleep(60 * time.Second)
  20. return
  21. log.Println("开始全量融合流程")
  22. defer qu.Catch()
  23. //区间id
  24. q := map[string]interface{}{
  25. "_id": map[string]interface{}{
  26. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  27. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  28. },
  29. }
  30. log.Println("查询条件:",q)
  31. sess := mgo.GetMgoConn()
  32. defer mgo.DestoryMongoConn(sess)
  33. it := sess.DB(mgo.DbName).C(coll_name).Find(&q).Iter()
  34. index,isOK:=0,0
  35. start := int(time.Now().Unix())
  36. //多线程升索引
  37. pool_es := make(chan bool, es_pool)
  38. wg_es := &sync.WaitGroup{}
  39. tmpEsMap := make(map[string]string,0)
  40. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  41. //每遍历isgroupfn条 划分组别
  42. if index%isgroupfn==0 && index!=0 {
  43. log.Println("current index",index,tmp["_id"])
  44. //新的一组执行上一组生索引
  45. for k,v:=range tmpEsMap {
  46. pool_es <- true
  47. wg_es.Add(1)
  48. go func(es_id string,cur_ids string) {
  49. defer func() {
  50. <-pool_es
  51. wg_es.Done()
  52. }()
  53. if es_id!="" && cur_ids!="" {
  54. dataArr := *elastic.GetById(esIndex,esType,es_id)
  55. if len(dataArr)>0 { //存在-更新
  56. allids := qu.ObjToString(dataArr[0]["allids"])
  57. allids = allids+","+cur_ids
  58. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  59. elastic.Update(esIndex,esType,es_id, updateStr)
  60. }else { //不存在-新增
  61. savetmp := make(map[string]interface{}, 0)
  62. savetmp["allids"] = cur_ids
  63. savetmp["_id"] = StringTOBsonId(es_id)
  64. savetmp["template_id"] = ""
  65. savetmp["fusion_id"] = ""
  66. elastic.Save(esIndex, esType, savetmp)
  67. }
  68. }else {
  69. log.Println("异常",es_id,cur_ids)
  70. }
  71. }(k,v)
  72. }
  73. wg_es.Wait()
  74. tmpEsMap = make(map[string]string,0)
  75. }
  76. repeat := qu.IntAll(tmp["repeat"])
  77. sourceid := BsonTOStringId(tmp["_id"])
  78. repeatid := BsonTOStringId(tmp["_id"])
  79. if repeat==1 {
  80. sourceid = qu.ObjToString(tmp["repeat_id"])
  81. }else {
  82. isOK++
  83. }
  84. if tmpEsMap[sourceid]!="" {
  85. ids := tmpEsMap[sourceid]
  86. ids = ids+","+repeatid
  87. tmpEsMap[sourceid] = ids
  88. }else {
  89. tmpEsMap[sourceid] = repeatid
  90. }
  91. tmp = make(map[string]interface{})
  92. }
  93. log.Println("task first:",index,"不重复数:",isOK,"遍历分组数据用时:",int(time.Now().Unix())-start,"秒")
  94. //处理剩余数据
  95. if len(tmpEsMap)>0 {
  96. log.Println("处理剩余数据:",len(tmpEsMap))
  97. for k,v:=range tmpEsMap {
  98. pool_es <- true
  99. wg_es.Add(1)
  100. go func(es_id string,cur_ids string) {
  101. defer func() {
  102. <-pool_es
  103. wg_es.Done()
  104. }()
  105. if es_id!="" && cur_ids!="" {
  106. dataArr := *elastic.GetById(esIndex,esType,es_id)
  107. if len(dataArr)>0 { //存在-更新
  108. allids := qu.ObjToString(dataArr[0]["allids"])
  109. allids = allids+","+cur_ids
  110. updateStr := `ctx._source.allids=`+ `"`+allids+`"`
  111. elastic.Update(esIndex,esType,es_id, updateStr)
  112. }else { //不存在-新增
  113. savetmp := make(map[string]interface{}, 0)
  114. savetmp["allids"] = cur_ids
  115. savetmp["_id"] = StringTOBsonId(es_id)
  116. savetmp["template_id"] = ""
  117. savetmp["fusion_id"] = ""
  118. elastic.Save(esIndex, esType, savetmp)
  119. }
  120. }else {
  121. log.Println("异常",es_id,cur_ids)
  122. }
  123. }(k,v)
  124. }
  125. wg_es.Wait()
  126. tmpEsMap = make(map[string]string,0)
  127. }
  128. log.Println("索引准备完毕睡眠30s......耗时:",int(time.Now().Unix())-start,"秒")
  129. time.Sleep(60 * time.Second)
  130. //先到处具体需要融合组数据-存mongo
  131. exportFusionMongoData()
  132. time.Sleep(60 * time.Second)
  133. //具体融合数据的方法
  134. startFusionData()
  135. time.Sleep(60 * time.Second)
  136. taskSendFusionUdp(mapInfo)
  137. }
  138. func exportFusionMongoData() {
  139. start := int(time.Now().Unix())
  140. log.Println("开始导出融合组数据......")
  141. //遍历索引
  142. esclient := elastic.GetEsConn()
  143. defer elastic.DestoryEsConn(esclient)
  144. if esclient == nil {
  145. log.Println("连接池异常")
  146. }
  147. q :=es_elastic.NewBoolQuery()
  148. cursor, err := esclient.Scan(esIndex).Query(es_elastic.NewBoolQuery().Must(q)).
  149. Size(200).Do()
  150. if err != nil {
  151. log.Println("cursor",err)
  152. }
  153. if cursor.Results == nil {
  154. log.Println("results != nil; got nil")
  155. }
  156. if cursor.Results.Hits == nil {
  157. log.Println("expected results.Hits != nil; got nil")
  158. }
  159. log.Println("查询正常-总数:",cursor.TotalHits())
  160. //多线程 - 处理数据
  161. pool_es := make(chan bool, es_pool)
  162. wg_es := &sync.WaitGroup{}
  163. pages,numDocs := 0,0
  164. for {
  165. searchResult, err := cursor.Next()
  166. if err != nil {
  167. if err.Error() == "EOS" {
  168. break
  169. }else {
  170. log.Println("cursor searchResult",err)
  171. }
  172. }
  173. pages++
  174. isLog := false
  175. for _, hit := range searchResult.Hits.Hits {
  176. tmp := make(map[string]interface{})
  177. err := json.Unmarshal(*hit.Source, &tmp)
  178. if err != nil {
  179. log.Println("json Unmarshal error")
  180. continue
  181. }
  182. if !isLog && numDocs%10000==0 {
  183. log.Println("当前条数:", numDocs, "Es数据:", tmp["_id"])
  184. isLog = true
  185. }
  186. numDocs++
  187. fusion_ids := qu.ObjToString(tmp["allids"])
  188. sourceid := qu.ObjToString(tmp["_id"])
  189. pool_es <- true
  190. wg_es.Add(1)
  191. go func(sourceid string, fusionArr string) {
  192. defer func() {
  193. <-pool_es
  194. wg_es.Done()
  195. }()
  196. AddGroupPool.pool <- map[string]interface{}{
  197. "_id":StringTOBsonId(sourceid),
  198. "allids":fusion_ids,
  199. }
  200. }(sourceid, fusion_ids)
  201. }
  202. }
  203. log.Println("遍历Es结束......")
  204. wg_es.Wait()
  205. log.Println("fusion group over :",numDocs,"用时:",int(time.Now().Unix())-start,"秒")
  206. }
  207. func startFusionData() {
  208. log.Println("开始全量融合流程...")
  209. defer qu.Catch()
  210. //可以开多程序-不同id段执行融合
  211. q := map[string]interface{}{}
  212. sess := mgo.GetMgoConn()
  213. defer mgo.DestoryMongoConn(sess)
  214. it := sess.DB(mgo.DbName).C(group_coll_name).Find(&q).Iter()
  215. index,start :=0, int(time.Now().Unix())
  216. //多线程保存数据
  217. pool_mgo := make(chan bool, mgo_pool)
  218. wg_mgo := &sync.WaitGroup{}
  219. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  220. if index%10000==0 {
  221. log.Println("current index",index,tmp["_id"])
  222. }
  223. fusion_ids := qu.ObjToString(tmp["allids"])
  224. fusionArr := strings.Split(fusion_ids, ",")
  225. sourceid := BsonTOStringId(tmp["_id"])
  226. pool_mgo <- true
  227. wg_mgo.Add(1)
  228. go func(sourceid string, fusionArr []string) {
  229. defer func() {
  230. <-pool_mgo
  231. wg_mgo.Done()
  232. }()
  233. weight := NewWeightData(fusionArr)
  234. weight.analyzeBuildStandardData()
  235. saveFusionData, saveRecordData:= map[string]interface{}{},map[string]interface{}{}
  236. if len(fusionArr) <= 1 {
  237. saveFusionData, saveRecordData = weight.dealWithAddFusionStruct()
  238. }else {
  239. saveFusionData, saveRecordData = weight.dealWithMultipleAddFusionStruct()
  240. }
  241. //新增融合表
  242. saveid := mgo.Save(fusion_coll_name, saveFusionData)
  243. saveRecordData["_id"] = saveid
  244. //批量新增日志表
  245. AddRecordPool.pool <- saveRecordData
  246. //批量更新分组表
  247. UpdateGroupPool.pool <- []map[string]interface{}{
  248. map[string]interface{}{
  249. "_id": StringTOBsonId(sourceid),
  250. },
  251. map[string]interface{}{
  252. "$set": map[string]interface{}{
  253. "fusion_id": BsonTOStringId(saveid),
  254. "template_id":qu.ObjToString(saveFusionData["fusion_templateid"]),
  255. },
  256. },
  257. }
  258. }(sourceid, fusionArr)
  259. tmp = make(map[string]interface{})
  260. }
  261. wg_mgo.Wait()
  262. log.Println("fusion is over:",index,"总用时:",int(time.Now().Unix())-start,"秒")
  263. }
  264. //处理结构数据
  265. func (weight *weightDataMap)dealWithStructData(recordDict *map[string]interface{}) map[string]interface{} {
  266. //模板id 数据
  267. templateid:=weight.templateid
  268. templateTmp:=weight.data[templateid].data
  269. modifyData :=make(map[string]interface{},0)
  270. attach_text,isAttach:=make(map[string]interface{},0),false
  271. if tmp_arr,b := templateTmp["attach_text"].(map[string]interface{});b {
  272. //有值符合-
  273. attach_text = tmp_arr
  274. }
  275. //附件判重-并合并新增
  276. keyIndex := -1
  277. for k,_:=range attach_text {
  278. key:=qu.IntAll(k)
  279. if key>keyIndex {
  280. keyIndex = key
  281. }
  282. }
  283. for _,value_id :=range weight.saveids {
  284. if templateid == value_id {
  285. continue
  286. }
  287. rankData := weight.data[value_id].data //具体其他排名数据
  288. if attachData,b := rankData["attach_text"].(map[string]interface{});b {
  289. if len(attachData)>0 { //有值
  290. for _,v:=range attachData { //子元素
  291. if attach,isOK := v.(map[string]interface{});isOK {
  292. if !dealWithRepeatAttachData(attach_text,attach) {
  293. //符合条件-不重复直接添加
  294. keyIndex++
  295. saveKey := fmt.Sprintf("%v",keyIndex)
  296. attach_text[saveKey] = attach //key累加
  297. isAttach = true
  298. //多条情况-融合
  299. if (*recordDict)["attach_text"]==nil {
  300. (*recordDict)["attach_text"] = []map[string]interface{}{
  301. map[string]interface{}{
  302. "id":value_id,
  303. "value":attach,
  304. },
  305. }
  306. }else {
  307. arr := (*recordDict)["attach_text"].([]map[string]interface{})
  308. arr = append(arr,map[string]interface{}{
  309. "id":value_id,
  310. "value":attach,
  311. })
  312. (*recordDict)["attach_text"] = arr
  313. }
  314. }
  315. }
  316. }
  317. }
  318. }
  319. }
  320. //联系人 winnerorder
  321. winnerCount:=qu.IntAll(0)
  322. winnerArr,b,isWinner,winnerid:=make(primitive.A,0),false,false,templateid
  323. if winnerArr,b = templateTmp["winnerorder"].([]interface{});b {
  324. winnerCount = qu.IntAll(len(winnerArr))
  325. }
  326. //分包 package
  327. packageCount:=qu.IntAll(0)
  328. packageArr,b,isPackage,packageid:=make(map[string]interface{},0),false,false,templateid
  329. if packageArr,b = templateTmp["package"].(map[string]interface{});b {
  330. packageCount = qu.IntAll(len(packageArr))
  331. }
  332. //遍历其他数据-
  333. for _,value:=range weight.saveids {
  334. if templateid == value {
  335. continue
  336. }
  337. //winnerorder
  338. tmp:=weight.data[value].data
  339. if arr_1,winner_b := tmp["winnerorder"].(primitive.A);winner_b {
  340. count:=qu.IntAll(len(arr_1))
  341. if count > winnerCount {
  342. winnerCount = count
  343. winnerArr = arr_1
  344. isWinner = true
  345. winnerid = value
  346. }
  347. }
  348. //package
  349. if arr_2,package_b := (tmp["package"]).(map[string]interface{});package_b {
  350. count:=qu.IntAll(len(arr_2))
  351. if count > packageCount {
  352. packageCount = count
  353. packageArr = arr_2
  354. isPackage = true
  355. packageid = value
  356. }
  357. }
  358. }
  359. //改变的值
  360. if len(winnerArr)>0 && winnerArr!=nil && isWinner {
  361. modifyData["winnerorder"] = winnerArr
  362. (*recordDict)["winnerorder"] = map[string]interface{}{
  363. "id":winnerid,
  364. "value":winnerArr,
  365. }
  366. }
  367. if len(packageArr)>0 && packageArr!=nil && isPackage {
  368. modifyData["package"] = packageArr
  369. (*recordDict)["package"] = map[string]interface{}{
  370. "id":packageid,
  371. "value":packageArr,
  372. }
  373. }
  374. if len(attach_text)>0 && attach_text!=nil && isAttach {
  375. modifyData["attach_text"] = attach_text
  376. }
  377. return modifyData
  378. }