package main import ( "flow_repeat/nsqdata" "github.com/robfig/cron/v3" qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" "log" "regexp" ) func InitAllInfos() { initMgo() initVar() initNsq() initSite() initData() } func initMgo() { spider_mconf := Sysconfig["spider_mongodb"].(map[string]interface{}) spider_mgo = &MongodbSim{ MongodbAddr: spider_mconf["spider_addr"].(string), DbName: spider_mconf["spider_db"].(string), Size: qu.IntAllDef(spider_mconf["spider_pool"], 5), UserName: spider_mconf["username"].(string), Password: spider_mconf["password"].(string), } spider_mgo.InitPool() spider_coll = spider_mconf["spider_coll"].(string) task_mconf := Sysconfig["task_mongodb"].(map[string]interface{}) task_mgo = &MongodbSim{ MongodbAddr: task_mconf["task_addr"].(string), DbName: task_mconf["task_db"].(string), Size: qu.IntAllDef(task_mconf["task_pool"], 10), UserName: task_mconf["username"].(string), Password: task_mconf["password"].(string), } task_mgo.InitPool() task_coll = task_mconf["task_coll"].(string) task_bidding = task_mconf["task_bidding"].(string) nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) mconf := Sysconfig["mongodb"].(map[string]interface{}) data_mgo = &MongodbSim{ MongodbAddr: mconf["addr"].(string), DbName: mconf["db"].(string), Size: qu.IntAllDef(mconf["pool"], 10), UserName: mconf["username"].(string), Password: mconf["password"].(string), } data_mgo.InitPool() extract = mconf["extract"].(string) extract_back = mconf["extract_back"].(string) extract_log = mconf["extract_log"].(string) } func initVar() { FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"])) FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"])) FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"])) FilterRegTitle_2 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_2"])) threadNum = qu.IntAllDef(Sysconfig["threads"], 1) LowHeavy = Sysconfig["lowHeavy"].(bool) TimingTask = Sysconfig["timingTask"].(bool) timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"]) timingPubScope = qu.Int64All(Sysconfig["timingPubScope"]) jyfb_arr := qu.ObjArrToStringArr(Sysconfig["jyfb_data"].([]interface{})) jyfb_data = make(map[string]string, 0) for _, v := range jyfb_arr { jyfb_data[v] = v } } func initSite() { cronlock.Lock() isUpdateSite = false SiteMap = make(map[string]map[string]interface{}, 0) sess := spider_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) q := map[string]interface{}{} res := sess.DB(spider_mgo.DbName).C(spider_coll).Find(&q).Sort("_id").Iter() for tmp := make(map[string]interface{}); res.Next(&tmp); { data := map[string]interface{}{ "area": qu.ObjToString(tmp["area"]), "city": qu.ObjToString(tmp["city"]), "district": qu.ObjToString(tmp["district"]), } SiteMap[qu.ObjToString(tmp["site"])] = data } log.Println("new站点加载完毕~", len(SiteMap)) cronlock.Unlock() } func initNsq() { nsqAddr := "172.17.162.36:4150" if !IsFull { var err error nspdata_1, err = nsqdata.NewProducer(nsqAddr, "bidding_id", true) if err != nil { log.Fatal("通道配置异常~", err) } else { log.Println("通道配置正常") } nspdata_2, err = nsqdata.NewProducer(nsqAddr, "project_id", true) if err != nil { log.Fatal("通道配置异常~", err) } else { log.Println("通道配置正常~") } } } func initData() { dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5) DM = NewDatamap(dupdays, lastid) Update = newUpdatePool() go Update.updateData() c := cron.New() c.AddFunc("0 0 6 * * ?", func() { isUpdateSite = true }) c.Start() }