|
@@ -7,7 +7,6 @@ package main
|
|
import (
|
|
import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"flag"
|
|
"flag"
|
|
- "github.com/cron"
|
|
|
|
"log"
|
|
"log"
|
|
mu "mfw/util"
|
|
mu "mfw/util"
|
|
"net"
|
|
"net"
|
|
@@ -19,17 +18,16 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- Sysconfig map[string]interface{} //配置文件
|
|
|
|
- mconf map[string]interface{}
|
|
|
|
- data_mgo, task_mgo *MongodbSim
|
|
|
|
- task_collName, task_bidding string
|
|
|
|
- extract, extract_back, extract_log string
|
|
|
|
- udpclient mu.UdpClient
|
|
|
|
- nextNode []map[string]interface{}
|
|
|
|
- dupdays = 7
|
|
|
|
- DM, FullDM *datamap
|
|
|
|
- Update *updateInfo
|
|
|
|
- AddGroupPool *addGroupInfo
|
|
|
|
|
|
+ Sysconfig map[string]interface{} //配置文件
|
|
|
|
+ data_mgo, task_mgo, spider_mgo *MongodbSim
|
|
|
|
+ task_coll, task_bidding, spider_coll string
|
|
|
|
+ extract, extract_back, extract_log string
|
|
|
|
+ udpclient mu.UdpClient
|
|
|
|
+ nextNode []map[string]interface{}
|
|
|
|
+ dupdays = 7
|
|
|
|
+ DM, FullDM *datamap
|
|
|
|
+ Update *updateInfo
|
|
|
|
+ AddGroupPool *addGroupInfo
|
|
//正则筛选相关
|
|
//正则筛选相关
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
FilterRegTitle = regexp.MustCompile("^_$")
|
|
FilterRegTitle_0 = regexp.MustCompile("^_$")
|
|
FilterRegTitle_0 = regexp.MustCompile("^_$")
|
|
@@ -41,106 +39,11 @@ var (
|
|
timingSpanDay, timingPubScope int64
|
|
timingSpanDay, timingPubScope int64
|
|
gtid, lastid, sec_gtid, sec_lteid, lteid string
|
|
gtid, lastid, sec_gtid, sec_lteid, lteid string
|
|
updatelock, datalock, numlock, cronlock sync.Mutex
|
|
updatelock, datalock, numlock, cronlock sync.Mutex
|
|
- userName, passWord string
|
|
|
|
jyfb_data map[string]string
|
|
jyfb_data map[string]string
|
|
taskList []map[string]interface{}
|
|
taskList []map[string]interface{}
|
|
nspdata_1, nspdata_2 *nsqdata.Producer
|
|
nspdata_1, nspdata_2 *nsqdata.Producer
|
|
)
|
|
)
|
|
|
|
|
|
-func initMgo() {
|
|
|
|
- userName = qu.ObjToString(Sysconfig["userName"])
|
|
|
|
- passWord = qu.ObjToString(Sysconfig["passWord"])
|
|
|
|
- log.Println("集群用户密码:", userName, passWord)
|
|
|
|
- jyfb_arr := qu.ObjArrToStringArr(Sysconfig["jyfb_data"].([]interface{}))
|
|
|
|
- jyfb_data = make(map[string]string, 0)
|
|
|
|
- for _, v := range jyfb_arr {
|
|
|
|
- jyfb_data[v] = v
|
|
|
|
- }
|
|
|
|
- log.Println("伪判重~", jyfb_data)
|
|
|
|
-
|
|
|
|
- task_mconf := Sysconfig["task_mongodb"].(map[string]interface{})
|
|
|
|
- task_mgo = &MongodbSim{
|
|
|
|
- MongodbAddr: task_mconf["task_addrName"].(string),
|
|
|
|
- DbName: task_mconf["task_dbName"].(string),
|
|
|
|
- Size: qu.IntAllDef(task_mconf["task_pool"], 10),
|
|
|
|
- UserName: userName,
|
|
|
|
- Password: passWord,
|
|
|
|
- }
|
|
|
|
- task_mgo.InitPool()
|
|
|
|
- task_collName = task_mconf["task_collName"].(string)
|
|
|
|
- task_bidding = task_mconf["task_bidding"].(string)
|
|
|
|
-
|
|
|
|
- nextNode = qu.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
|
|
|
|
- mconf = Sysconfig["mongodb"].(map[string]interface{})
|
|
|
|
- data_mgo = &MongodbSim{
|
|
|
|
- MongodbAddr: mconf["addr"].(string),
|
|
|
|
- DbName: mconf["db"].(string),
|
|
|
|
- Size: qu.IntAllDef(mconf["pool"], 10),
|
|
|
|
- }
|
|
|
|
- data_mgo.InitPool()
|
|
|
|
-
|
|
|
|
- extract = mconf["extract"].(string)
|
|
|
|
- extract_back = mconf["extract_back"].(string)
|
|
|
|
- extract_log = mconf["extract_log"].(string)
|
|
|
|
-
|
|
|
|
- FilterRegTitle = regexp.MustCompile(qu.ObjToString(Sysconfig["specialwords"]))
|
|
|
|
- FilterRegTitle_0 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_0"]))
|
|
|
|
- FilterRegTitle_1 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_1"]))
|
|
|
|
- FilterRegTitle_2 = regexp.MustCompile(qu.ObjToString(Sysconfig["specialtitle_2"]))
|
|
|
|
- threadNum = qu.IntAllDef(Sysconfig["threads"], 1)
|
|
|
|
- LowHeavy = Sysconfig["lowHeavy"].(bool)
|
|
|
|
- TimingTask = Sysconfig["timingTask"].(bool)
|
|
|
|
- timingSpanDay = qu.Int64All(Sysconfig["timingSpanDay"])
|
|
|
|
- timingPubScope = qu.Int64All(Sysconfig["timingPubScope"])
|
|
|
|
-}
|
|
|
|
-func initOther() {
|
|
|
|
- dupdays = qu.IntAllDef(Sysconfig["dupdays"], 5)
|
|
|
|
- DM = NewDatamap(dupdays, lastid)
|
|
|
|
- Update = newUpdatePool()
|
|
|
|
- go Update.updateData()
|
|
|
|
-
|
|
|
|
- if !IsFull {
|
|
|
|
- var err error
|
|
|
|
- nspdata_1, err = nsqdata.NewProducer("172.17.4.232:4150", "bidding_id", true)
|
|
|
|
- if err != nil {
|
|
|
|
- log.Fatal("通道配置异常~", err)
|
|
|
|
- } else {
|
|
|
|
- log.Println("通道配置正常")
|
|
|
|
- }
|
|
|
|
- nspdata_2, err = nsqdata.NewProducer("172.17.4.232:4150", "project_id", true)
|
|
|
|
- if err != nil {
|
|
|
|
- log.Fatal("通道配置异常~", err)
|
|
|
|
- } else {
|
|
|
|
- log.Println("通道配置正常~")
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- c := cron.New()
|
|
|
|
- c.AddFunc("0 0 1 ? * WED", func() {
|
|
|
|
- isUpdateSite = true
|
|
|
|
- })
|
|
|
|
- c.Start()
|
|
|
|
-}
|
|
|
|
-func initSite() {
|
|
|
|
- cronlock.Lock()
|
|
|
|
- site := mconf["site"].(map[string]interface{})
|
|
|
|
- SiteMap = make(map[string]map[string]interface{}, 0)
|
|
|
|
- start := int(time.Now().Unix())
|
|
|
|
- sess_site := data_mgo.GetMgoConn()
|
|
|
|
- defer data_mgo.DestoryMongoConn(sess_site)
|
|
|
|
- res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
|
|
|
|
- for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
|
|
|
|
- data_map := map[string]interface{}{
|
|
|
|
- "area": qu.ObjToString(site_dict["area"]),
|
|
|
|
- "city": qu.ObjToString(site_dict["city"]),
|
|
|
|
- "district": qu.ObjToString(site_dict["district"]),
|
|
|
|
- }
|
|
|
|
- SiteMap[qu.ObjToString(site_dict["site"])] = data_map
|
|
|
|
- }
|
|
|
|
- isUpdateSite = false
|
|
|
|
- log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
|
|
|
|
- cronlock.Unlock()
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
//初始化加载
|
|
//初始化加载
|
|
func init() {
|
|
func init() {
|
|
flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
|
|
flag.StringVar(&lastid, "id", "", "增量加载的lastid") //增量
|
|
@@ -148,7 +51,6 @@ func init() {
|
|
flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
|
|
flag.StringVar(&sec_gtid, "sec_gtid", "", "全量分段起始id")
|
|
flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
|
|
flag.StringVar(&sec_lteid, "sec_lteid", "", "全量分段结束id")
|
|
flag.Parse()
|
|
flag.Parse()
|
|
-
|
|
|
|
qu.ReadConfig(&Sysconfig)
|
|
qu.ReadConfig(&Sysconfig)
|
|
initMgo()
|
|
initMgo()
|
|
initOther()
|
|
initOther()
|
|
@@ -172,18 +74,16 @@ func mainT() {
|
|
|
|
|
|
//主函数
|
|
//主函数
|
|
func main() {
|
|
func main() {
|
|
-
|
|
|
|
go checkMapJob()
|
|
go checkMapJob()
|
|
updport := Sysconfig["udpport"].(string)
|
|
updport := Sysconfig["udpport"].(string)
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
|
|
udpclient.Listen(processUdpMsg)
|
|
udpclient.Listen(processUdpMsg)
|
|
log.Println("Udp服务监听", updport)
|
|
log.Println("Udp服务监听", updport)
|
|
-
|
|
|
|
if TimingTask {
|
|
if TimingTask {
|
|
log.Println("正常历史部署")
|
|
log.Println("正常历史部署")
|
|
go historyRepeat()
|
|
go historyRepeat()
|
|
} else {
|
|
} else {
|
|
- if !IsFull { //正常增量
|
|
|
|
|
|
+ if !IsFull {
|
|
log.Println("正常增量部署,监听任务")
|
|
log.Println("正常增量部署,监听任务")
|
|
go getRepeatTask()
|
|
go getRepeatTask()
|
|
}
|
|
}
|