load_data.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "sync"
  7. "time"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  12. )
  13. // 初始加载数据,默认加载最近6个月的数据
  14. func (p *ProjectTask) loadData(starttime int64) {
  15. log.Info("load project start..", zap.Int64("starttime", starttime))
  16. //清除进程时 会卡住注释掉
  17. // p.findLock.Lock()
  18. // defer p.findLock.Unlock()
  19. sess := MgoP.GetMgoConn()
  20. defer MgoP.DestoryMongoConn(sess)
  21. loadOver := make(chan bool)
  22. q := map[string]interface{}{
  23. "lasttime": map[string]interface{}{"$gte": starttime},
  24. }
  25. field := map[string]interface{}{"list": 0}
  26. it := sess.DB(MgoP.DbName).C(p.coll).Find(&q).Select(field).Iter()
  27. n, count := 0, 0
  28. pool := make(chan *ProjectCache, 1000)
  29. go func() {
  30. for {
  31. select {
  32. case tmp := <-pool:
  33. n++
  34. if n%10000 == 0 {
  35. util.Debug("current", n, "mapPn", len(p.mapPn), "mapPc", len(p.mapPc), "mapPb", len(p.mapPb), "mapHref", len(p.mapHref)) //, tmp.ProjectName, tmp.MPN, tmp.ProjectCode, tmp.MPC, tmp.Buyer)
  36. }
  37. if tmp != nil {
  38. id := tmp.Id.Hex()
  39. for _, v := range append([]string{tmp.ProjectName}, tmp.MPN...) {
  40. if v != "" {
  41. //v = pcReplace.ReplaceAllString(v, "")
  42. if v != "" {
  43. k := p.mapPn[v]
  44. if k == nil {
  45. k = &Key{Arr: []string{id}}
  46. p.mapPn[v] = k
  47. } else {
  48. k.Arr = append(k.Arr, id)
  49. }
  50. }
  51. }
  52. }
  53. for _, v := range append([]string{tmp.ProjectCode}, tmp.MPC...) {
  54. if v != "" {
  55. //v = pcReplace.ReplaceAllString(v, "")
  56. if v != "" {
  57. k := p.mapPc[v]
  58. if k == nil {
  59. k = &Key{Arr: []string{id}}
  60. p.mapPc[v] = k
  61. } else {
  62. k.Arr = append(k.Arr, id)
  63. }
  64. }
  65. }
  66. }
  67. if tmp.Buyer != "" && len([]rune(tmp.Buyer)) > 2 {
  68. k := p.mapPb[tmp.Buyer]
  69. if k == nil {
  70. k = &Key{Arr: []string{id}}
  71. p.mapPb[tmp.Buyer] = k
  72. } else {
  73. k.Arr = append(k.Arr, id)
  74. }
  75. }
  76. p.AllIdsMapLock.Lock()
  77. p.AllIdsMap[id] = &ID{Id: id, P: tmp}
  78. p.AllIdsMapLock.Unlock()
  79. }
  80. case <-loadOver:
  81. return
  82. }
  83. }
  84. }()
  85. ch := make(chan bool, 3)
  86. wg := &sync.WaitGroup{}
  87. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  88. if count%20000 == 0 {
  89. log.Info(fmt.Sprintf("iter --- %d", count))
  90. }
  91. ch <- true
  92. wg.Add(1)
  93. go func(tmp map[string]interface{}) {
  94. defer func() {
  95. <-ch
  96. wg.Done()
  97. }()
  98. bys, _ := json.Marshal(tmp)
  99. var pc *ProjectCache
  100. _ = json.Unmarshal(bys, &pc)
  101. saveFiled(p, tmp, pc)
  102. redis.PutCKV(RedisProject, fmt.Sprintf(P_KEY, pc.Id.Hex()), tmp)
  103. pool <- pc
  104. }(tmp)
  105. tmp = make(map[string]interface{})
  106. }
  107. wg.Wait()
  108. time.Sleep(2 * time.Second)
  109. loadOver <- true
  110. log.Info("load project over..", zap.Int("n", n))
  111. }
  112. func (p *ProjectTask) loadSite() {
  113. log.Info("load site start..")
  114. p.findLock.Lock()
  115. defer p.findLock.Unlock()
  116. p.mapSiteLock.Lock()
  117. defer p.mapSiteLock.Unlock()
  118. sess := MgoS.GetMgoConn()
  119. defer MgoS.DestoryMongoConn(sess)
  120. q := map[string]interface{}{}
  121. it := sess.DB(MgoS.DbName).C(SiteColl).Find(&q).Iter()
  122. n := 0
  123. pool := make(chan *Site, 100)
  124. over := make(chan bool)
  125. go func() {
  126. for {
  127. select {
  128. case tmp := <-pool:
  129. n++
  130. //站点有效标记state
  131. if tmp != nil && tmp.Status == 5 {
  132. p.mapSite[tmp.Site] = tmp
  133. }
  134. case <-over:
  135. return
  136. }
  137. }
  138. }()
  139. for {
  140. result := make(map[string]interface{})
  141. if it.Next(&result) {
  142. go func(res map[string]interface{}) {
  143. bys, _ := json.Marshal(result)
  144. var tmp *Site
  145. _ = json.Unmarshal(bys, &tmp)
  146. pool <- tmp
  147. }(result)
  148. } else {
  149. break
  150. }
  151. }
  152. time.Sleep(2 * time.Second)
  153. over <- true
  154. log.Info("load site over..", zap.Int("n", n))
  155. }
  156. func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectCache) {
  157. proHref := util.ObjToString(res["projecthref"])
  158. if proHref != "" {
  159. p.mapHrefLock.Lock()
  160. p.mapHref[proHref] = tmp.Id.Hex()
  161. p.mapHrefLock.Unlock()
  162. }
  163. if res["entidlist"] != nil {
  164. elist := util.ObjArrToStringArr(res["entidlist"].([]interface{}))
  165. wlist := strings.Split(util.ObjToString(res["s_winner"]), ",")
  166. buyer := util.ObjToString(res["buyer"])
  167. if len(elist) == len(wlist) && buyer != "" {
  168. for i, eid := range elist {
  169. if eid != "-" {
  170. text := buyer + "," + wlist[i]
  171. ex, _ := redis.Exists(RedisBuyer, text)
  172. if !ex {
  173. redis.PutCKV(RedisBuyer, text, tmp.Id.Hex())
  174. }
  175. }
  176. }
  177. }
  178. }
  179. //tmpMap := make(map[string]InfoField)
  180. //infoMap := res["infofield"].(map[string]interface{})
  181. //for _, v := range infoMap {
  182. // var field InfoField
  183. // b, _ := json.Marshal(v)
  184. // _ = json.Unmarshal(b, &field)
  185. // tmpMap[tmp.Id.Hex()] = field
  186. //}
  187. //tmp.InfoFiled = tmpMap
  188. }
  189. // 加载spidercode数据,isflow字段
  190. func (p *ProjectTask) loadSpiderCode() {
  191. log.Info("load spider code start..")
  192. p.findLock.Lock()
  193. defer p.findLock.Unlock()
  194. p.mapSpiderLock.Lock()
  195. defer p.mapSpiderLock.Unlock()
  196. sess := MgoS.GetMgoConn()
  197. defer MgoS.DestoryMongoConn(sess)
  198. q := map[string]interface{}{}
  199. field := map[string]interface{}{"code": 1, "isflow": 1}
  200. it := sess.DB(MgoS.DbName).C("luaconfig").Find(&q).Select(field).Iter()
  201. n := 0
  202. pool := make(chan map[string]interface{}, 100)
  203. over := make(chan bool)
  204. go func() {
  205. for {
  206. select {
  207. case tmp := <-pool:
  208. n++
  209. code := util.ObjToString(tmp["code"])
  210. p.mapSpider[code] = util.IntAll(tmp["isflow"])
  211. case <-over:
  212. return
  213. }
  214. }
  215. }()
  216. for {
  217. result := make(map[string]interface{})
  218. if it.Next(&result) {
  219. go func(res map[string]interface{}) {
  220. pool <- result
  221. }(result)
  222. } else {
  223. break
  224. }
  225. }
  226. time.Sleep(2 * time.Second)
  227. over <- true
  228. log.Info("load spider over..", zap.Int("n", n))
  229. }