123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package main
- import (
- "flow_repeat/nsqdata"
- "github.com/robfig/cron/v3"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "regexp"
- )
- func InitAllInfos() {
- initMgo()
- initVar()
- initNsq()
- initSite()
- initData()
- }
- func initMgo() {
- spider_mconf := Sysconfig["spider_mongodb"].(map[string]interface{})
- spider_mgo = &MongodbSim{
- MongodbAddr: spider_mconf["spider_addr"].(string),
- DbName: spider_mconf["spider_db"].(string),
- Size: qu.IntAllDef(spider_mconf["spider_pool"], 5),
- UserName: spider_mconf["username"].(string),
- Password: spider_mconf["password"].(string),
- }
- spider_mgo.InitPool()
- spider_coll = spider_mconf["spider_coll"].(string)
- task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
- task_mgo = &MongodbSim{
- MongodbAddr: task_mconf["task_addr"].(string),
- DbName: task_mconf["task_db"].(string),
- Size: qu.IntAllDef(task_mconf["task_pool"], 10),
- UserName: task_mconf["username"].(string),
- Password: task_mconf["password"].(string),
- }
- task_mgo.InitPool()
- task_coll = task_mconf["task_coll"].(string)
- task_bidding = task_mconf["task_bidding"].(string)
- nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
- mconf := Sysconfig["mongodb"].(map[string]interface{})
- data_mgo = &MongodbSim{
- MongodbAddr: mconf["addr"].(string),
- DbName: mconf["db"].(string),
- Size: qu.IntAllDef(mconf["pool"], 10),
- UserName: mconf["username"].(string),
- Password: mconf["password"].(string),
- }
- data_mgo.InitPool()
- extract = mconf["extract"].(string)
- extract_back = mconf["extract_back"].(string)
- extract_log = mconf["extract_log"].(string)
- }
- func initVar() {
- FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"]))
- FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"]))
- FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"]))
- FilterRegTitle_2 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_2"]))
- threadNum = qu.IntAllDef(Sysconfig["threads"], 1)
- LowHeavy = Sysconfig["lowHeavy"].(bool)
- TimingTask = Sysconfig["timingTask"].(bool)
- timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
- timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
- jyfb_arr := qu.ObjArrToStringArr(Sysconfig["jyfb_data"].([]interface{}))
- jyfb_data = make(map[string]string, 0)
- for _, v := range jyfb_arr {
- jyfb_data[v] = v
- }
- }
- func initSite() {
- cronlock.Lock()
- isUpdateSite = false
- SiteMap = make(map[string]map[string]interface{}, 0)
- sess := spider_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{}
- res := sess.DB(spider_mgo.DbName).C(spider_coll).Find(&q).Sort("_id").Iter()
- for tmp := make(map[string]interface{}); res.Next(&tmp); {
- data := map[string]interface{}{
- "area": qu.ObjToString(tmp["area"]),
- "city": qu.ObjToString(tmp["city"]),
- "district": qu.ObjToString(tmp["district"]),
- }
- SiteMap[qu.ObjToString(tmp["site"])] = data
- }
- log.Println("new站点加载完毕~", len(SiteMap))
- cronlock.Unlock()
- }
- func initNsq() {
- nsqAddr := "172.17.162.36:4150"
- if !IsFull {
- var err error
- nspdata_1, err = nsqdata.NewProducer(nsqAddr, "bidding_id", true)
- if err != nil {
- log.Fatal("通道配置异常~", err)
- } else {
- log.Println("通道配置正常")
- }
- nspdata_2, err = nsqdata.NewProducer(nsqAddr, "project_id", true)
- if err != nil {
- log.Fatal("通道配置异常~", err)
- } else {
- log.Println("通道配置正常~")
- }
- }
- }
- func initData() {
- dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
- DM = NewDatamap(dupdays, lastid)
- Update = newUpdatePool()
- go Update.updateData()
- c := cron.New()
- c.AddFunc("0 0 6 * * ?", func() {
- isUpdateSite = true
- })
- c.Start()
- }
|