load_data.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package main
  2. import (
  3. "encoding/json"
  4. "go.uber.org/zap"
  5. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/redis"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // 初始加载数据,默认加载最近6个月的数据
  13. func (p *ProjectTask) loadData(starttime int64) {
  14. log.Info("load project start..", zap.Int64("starttime", starttime))
  15. p.findLock.Lock()
  16. defer p.findLock.Unlock()
  17. sess := MgoP.GetMgoConn()
  18. defer MgoP.DestoryMongoConn(sess)
  19. loadOver := make(chan bool)
  20. q := map[string]interface{}{
  21. "lasttime": map[string]interface{}{"$gte": starttime},
  22. }
  23. field := map[string]interface{}{"list": 0}
  24. it := sess.DB(MgoP.DbName).C(p.coll).Find(&q).Select(field).Iter()
  25. n, count := 0, 0
  26. pool := make(chan *ProjectCache, 1000)
  27. go func() {
  28. for {
  29. select {
  30. case tmp := <-pool:
  31. n++
  32. if n%10000 == 0 {
  33. util.Debug("current", n, "mapPn", len(p.mapPn), "mapPc", len(p.mapPc), "mapPb", len(p.mapPb), "mapHref", len(p.mapHref)) //, tmp.ProjectName, tmp.MPN, tmp.ProjectCode, tmp.MPC, tmp.Buyer)
  34. }
  35. if tmp != nil {
  36. id := tmp.Id.Hex()
  37. for _, v := range append([]string{tmp.ProjectName}, tmp.MPN...) {
  38. if v != "" {
  39. //v = pcReplace.ReplaceAllString(v, "")
  40. if v != "" {
  41. k := p.mapPn[v]
  42. if k == nil {
  43. k = &Key{Arr: []string{id}}
  44. p.mapPn[v] = k
  45. } else {
  46. k.Arr = append(k.Arr, id)
  47. }
  48. }
  49. }
  50. }
  51. for _, v := range append([]string{tmp.ProjectCode}, tmp.MPC...) {
  52. if v != "" {
  53. //v = pcReplace.ReplaceAllString(v, "")
  54. if v != "" {
  55. k := p.mapPc[v]
  56. if k == nil {
  57. k = &Key{Arr: []string{id}}
  58. p.mapPc[v] = k
  59. } else {
  60. k.Arr = append(k.Arr, id)
  61. }
  62. }
  63. }
  64. }
  65. if tmp.Buyer != "" && len([]rune(tmp.Buyer)) > 2 {
  66. k := p.mapPb[tmp.Buyer]
  67. if k == nil {
  68. k = &Key{Arr: []string{id}}
  69. p.mapPb[tmp.Buyer] = k
  70. } else {
  71. k.Arr = append(k.Arr, id)
  72. }
  73. }
  74. p.AllIdsMapLock.Lock()
  75. p.AllIdsMap[id] = &ID{Id: id, P: tmp}
  76. p.AllIdsMapLock.Unlock()
  77. }
  78. case <-loadOver:
  79. return
  80. }
  81. }
  82. }()
  83. ch := make(chan bool, 3)
  84. wg := &sync.WaitGroup{}
  85. for tmp := make(map[string]interface{}); it.Next(tmp); count++ {
  86. //if count%20000 == 0 {
  87. // log.Println(fmt.Sprintf("iter --- %d", count))
  88. //}
  89. ch <- true
  90. wg.Add(1)
  91. go func(tmp map[string]interface{}) {
  92. defer func() {
  93. <-ch
  94. wg.Done()
  95. }()
  96. bys, _ := json.Marshal(tmp)
  97. var pc *ProjectCache
  98. _ = json.Unmarshal(bys, &pc)
  99. saveFiled(p, tmp, pc)
  100. redis.PutCKV("project", pc.Id.Hex(), tmp)
  101. pool <- pc
  102. }(tmp)
  103. tmp = make(map[string]interface{})
  104. }
  105. wg.Wait()
  106. time.Sleep(2 * time.Second)
  107. loadOver <- true
  108. log.Info("load project over..", zap.Int("n", n))
  109. }
  110. func (p *ProjectTask) loadSite() {
  111. log.Info("load site start..")
  112. p.findLock.Lock()
  113. defer p.findLock.Unlock()
  114. p.mapSiteLock.Lock()
  115. defer p.mapSiteLock.Unlock()
  116. sess := MgoS.GetMgoConn()
  117. defer MgoS.DestoryMongoConn(sess)
  118. q := map[string]interface{}{}
  119. it := sess.DB(MgoS.DbName).C(SiteColl).Find(&q).Iter()
  120. n := 0
  121. pool := make(chan *Site, 100)
  122. over := make(chan bool)
  123. go func() {
  124. for {
  125. select {
  126. case tmp := <-pool:
  127. n++
  128. //站点有效标记state
  129. if tmp != nil && tmp.Status == 5 {
  130. p.mapSite[tmp.Site] = tmp
  131. }
  132. case <-over:
  133. return
  134. }
  135. }
  136. }()
  137. for {
  138. result := make(map[string]interface{})
  139. if it.Next(&result) {
  140. go func(res map[string]interface{}) {
  141. bys, _ := json.Marshal(result)
  142. var tmp *Site
  143. _ = json.Unmarshal(bys, &tmp)
  144. pool <- tmp
  145. }(result)
  146. } else {
  147. break
  148. }
  149. }
  150. time.Sleep(2 * time.Second)
  151. over <- true
  152. log.Info("load site over..", zap.Int("n", n))
  153. }
  154. func saveFiled(p *ProjectTask, res map[string]interface{}, tmp *ProjectCache) {
  155. proHref := util.ObjToString(res["projecthref"])
  156. if proHref != "" {
  157. p.mapHrefLock.Lock()
  158. p.mapHref[proHref] = tmp.Id.Hex()
  159. p.mapHrefLock.Unlock()
  160. }
  161. if res["entidlist"] != nil {
  162. elist := util.ObjArrToStringArr(res["entidlist"].([]interface{}))
  163. wlist := strings.Split(util.ObjToString(res["s_winner"]), ",")
  164. buyer := util.ObjToString(res["buyer"])
  165. if len(elist) == len(wlist) && buyer != "" {
  166. for i, eid := range elist {
  167. if eid != "-" {
  168. text := buyer + "," + wlist[i]
  169. ex, _ := redis.Exists(RedisCode, text)
  170. if !ex {
  171. redis.PutCKV(RedisCode, text, tmp.Id.Hex())
  172. }
  173. }
  174. }
  175. }
  176. }
  177. //tmpMap := make(map[string]InfoField)
  178. //infoMap := res["infofield"].(map[string]interface{})
  179. //for _, v := range infoMap {
  180. // var field InfoField
  181. // b, _ := json.Marshal(v)
  182. // _ = json.Unmarshal(b, &field)
  183. // tmpMap[tmp.Id.Hex()] = field
  184. //}
  185. //tmp.InfoFiled = tmpMap
  186. }
  187. // 加载spidercode数据,isflow字段
  188. func (p *ProjectTask) loadSpiderCode() {
  189. log.Info("load spider code start..")
  190. p.findLock.Lock()
  191. defer p.findLock.Unlock()
  192. p.mapSpiderLock.Lock()
  193. defer p.mapSpiderLock.Unlock()
  194. sess := MgoS.GetMgoConn()
  195. defer MgoS.DestoryMongoConn(sess)
  196. q := map[string]interface{}{}
  197. field := map[string]interface{}{"code": 1, "isflow": 1}
  198. it := sess.DB(MgoS.DbName).C("luaconfig").Find(&q).Select(field).Iter()
  199. n := 0
  200. pool := make(chan map[string]interface{}, 100)
  201. over := make(chan bool)
  202. go func() {
  203. for {
  204. select {
  205. case tmp := <-pool:
  206. n++
  207. code := util.ObjToString(tmp["code"])
  208. p.mapSpider[code] = util.IntAll(tmp["isflow"])
  209. case <-over:
  210. return
  211. }
  212. }
  213. }()
  214. for {
  215. result := make(map[string]interface{})
  216. if it.Next(&result) {
  217. go func(res map[string]interface{}) {
  218. pool <- result
  219. }(result)
  220. } else {
  221. break
  222. }
  223. }
  224. time.Sleep(2 * time.Second)
  225. over <- true
  226. log.Info("load spider over..", zap.Int("n", n))
  227. }