initData.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package main
  2. import (
  3. "flow_repeat/nsqdata"
  4. "github.com/robfig/cron/v3"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. "log"
  7. "regexp"
  8. )
  9. func InitAllInfos() {
  10. initMgo()
  11. initVar()
  12. //initNsq() //支持数据替换
  13. initSite()
  14. initData()
  15. }
  16. func initMgo() {
  17. spider_mconf := Sysconfig["spider_mongodb"].(map[string]interface{})
  18. spider_mgo = &MongodbSim{
  19. MongodbAddr: spider_mconf["spider_addr"].(string),
  20. DbName: spider_mconf["spider_db"].(string),
  21. Size: qu.IntAllDef(spider_mconf["spider_pool"], 5),
  22. UserName: spider_mconf["username"].(string),
  23. Password: spider_mconf["password"].(string),
  24. }
  25. spider_mgo.InitPool()
  26. spider_coll = spider_mconf["spider_coll"].(string)
  27. task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
  28. task_mgo = &MongodbSim{
  29. MongodbAddr: task_mconf["task_addr"].(string),
  30. DbName: task_mconf["task_db"].(string),
  31. Size: qu.IntAllDef(task_mconf["task_pool"], 10),
  32. UserName: task_mconf["username"].(string),
  33. Password: task_mconf["password"].(string),
  34. }
  35. task_mgo.InitPool()
  36. task_coll = task_mconf["task_coll"].(string)
  37. task_bidding = task_mconf["task_bidding"].(string)
  38. nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
  39. mconf := Sysconfig["mongodb"].(map[string]interface{})
  40. data_mgo = &MongodbSim{
  41. MongodbAddr: mconf["addr"].(string),
  42. DbName: mconf["db"].(string),
  43. Size: qu.IntAllDef(mconf["pool"], 10),
  44. UserName: mconf["username"].(string),
  45. Password: mconf["password"].(string),
  46. }
  47. data_mgo.InitPool()
  48. extract = mconf["extract"].(string)
  49. extract_back = mconf["extract_back"].(string)
  50. extract_log = mconf["extract_log"].(string)
  51. }
  52. func initVar() {
  53. FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"]))
  54. FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"]))
  55. FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"]))
  56. FilterRegTitle_2 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_2"]))
  57. threadNum = qu.IntAllDef(Sysconfig["threads"], 1)
  58. LowHeavy = Sysconfig["lowHeavy"].(bool)
  59. TimingTask = Sysconfig["timingTask"].(bool)
  60. timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
  61. timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
  62. jyfb_arr := qu.ObjArrToStringArr(Sysconfig["jyfb_data"].([]interface{}))
  63. jyfb_data = make(map[string]string, 0)
  64. for _, v := range jyfb_arr {
  65. jyfb_data[v] = v
  66. }
  67. }
  68. func initSite() {
  69. cronlock.Lock()
  70. isUpdateSite = false
  71. SiteMap = make(map[string]map[string]interface{}, 0)
  72. sess := spider_mgo.GetMgoConn()
  73. defer data_mgo.DestoryMongoConn(sess)
  74. q := map[string]interface{}{}
  75. res := sess.DB(spider_mgo.DbName).C(spider_coll).Find(&q).Sort("_id").Iter()
  76. for tmp := make(map[string]interface{}); res.Next(&tmp); {
  77. data := map[string]interface{}{
  78. "area": qu.ObjToString(tmp["area"]),
  79. "city": qu.ObjToString(tmp["city"]),
  80. "district": qu.ObjToString(tmp["district"]),
  81. }
  82. SiteMap[qu.ObjToString(tmp["site"])] = data
  83. }
  84. log.Println("new站点加载完毕~", len(SiteMap))
  85. cronlock.Unlock()
  86. }
  87. func initNsq() {
  88. nsqAddr := "172.17.162.36:4150"
  89. if !IsFull {
  90. var err error
  91. nspdata_1, err = nsqdata.NewProducer(nsqAddr, "bidding_id", true)
  92. if err != nil {
  93. log.Fatal("通道配置异常~", err)
  94. } else {
  95. log.Println("通道配置正常")
  96. }
  97. nspdata_2, err = nsqdata.NewProducer(nsqAddr, "project_id", true)
  98. if err != nil {
  99. log.Fatal("通道配置异常~", err)
  100. } else {
  101. log.Println("通道配置正常~")
  102. }
  103. }
  104. }
  105. func initData() {
  106. dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
  107. DM = NewDatamap(dupdays, lastid)
  108. Update = newUpdatePool()
  109. go Update.updateData()
  110. c := cron.New()
  111. c.AddFunc("0 0 6 * * ?", func() {
  112. isUpdateSite = true
  113. })
  114. c.Start()
  115. }