|
@@ -26,22 +26,50 @@ var (
|
|
PoolSize = 100
|
|
PoolSize = 100
|
|
)
|
|
)
|
|
|
|
|
|
-type Job struct {
|
|
|
|
- Name string //脚本名称
|
|
|
|
- Appid string //用户唯一标识
|
|
|
|
- MatchScore float64 //打分
|
|
|
|
- Department string //标签
|
|
|
|
- Results *[]map[string]interface{} //最终要存库的数据
|
|
|
|
|
|
+type MyJobs struct {
|
|
|
|
+ Appid string
|
|
Lock sync.Mutex
|
|
Lock sync.Mutex
|
|
WaitGroup sync.WaitGroup
|
|
WaitGroup sync.WaitGroup
|
|
- ScriptFile string
|
|
|
|
- EachListPool chan bool
|
|
|
|
|
|
+ Jobs []*Job
|
|
RedisFiveData map[string]*[]*map[string]interface{}
|
|
RedisFiveData map[string]*[]*map[string]interface{}
|
|
- FilterCount int
|
|
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//更新redis
|
|
|
|
+func (j *MyJobs) UpdateRedis() {
|
|
|
|
+ log.Println(j.Appid, "更新redis。。。")
|
|
|
|
+ if len(j.RedisFiveData) == 0 {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ now := time.Now()
|
|
|
|
+ for k, v := range j.RedisFiveData {
|
|
|
|
+ if v == nil || len(*v) == 0 {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ createtime, _ := time.ParseInLocation(qutil.Date_yyyyMMdd, strings.Split(k, "_")[3], time.Local)
|
|
|
|
+ timeout := createtime.AddDate(0, 0, 5).Sub(now).Seconds()
|
|
|
|
+ if timeout > 0 {
|
|
|
|
+ redis.Put("filter", k, v, qutil.IntAll(timeout))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ j.RedisFiveData = map[string]*[]*map[string]interface{}{}
|
|
|
|
+ log.Println(j.Appid, "更新redis over!")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type Job struct {
|
|
|
|
+ Name string //脚本名称
|
|
|
|
+ Appid string //用户唯一标识
|
|
|
|
+ MatchScore int //打分
|
|
|
|
+ Department string //标签
|
|
|
|
+ Results *[]map[string]interface{} //最终要存库的数据
|
|
|
|
+ Lock sync.Mutex
|
|
|
|
+ WaitGroup sync.WaitGroup
|
|
|
|
+ ScriptFile string
|
|
|
|
+ EachListPool chan bool
|
|
|
|
+ FilterCount int
|
|
}
|
|
}
|
|
|
|
|
|
//任务
|
|
//任务
|
|
-func (j *Job) Start(list *[]map[string]interface{}) {
|
|
|
|
|
|
+func (j *Job) Start(myJobs *MyJobs, list *[]map[string]interface{}) {
|
|
defer qutil.Catch()
|
|
defer qutil.Catch()
|
|
count := 0
|
|
count := 0
|
|
for _, v := range *list {
|
|
for _, v := range *list {
|
|
@@ -60,7 +88,7 @@ func (j *Job) Start(list *[]map[string]interface{}) {
|
|
result := j.ExecJob(script, &info)
|
|
result := j.ExecJob(script, &info)
|
|
//保存
|
|
//保存
|
|
if result != nil && len(*result) > 0 && IsSave {
|
|
if result != nil && len(*result) > 0 && IsSave {
|
|
- j.Save(result, false)
|
|
|
|
|
|
+ j.Save(myJobs, result, false)
|
|
}
|
|
}
|
|
}(v)
|
|
}(v)
|
|
if count%200 == 0 {
|
|
if count%200 == 0 {
|
|
@@ -69,8 +97,7 @@ func (j *Job) Start(list *[]map[string]interface{}) {
|
|
count++
|
|
count++
|
|
}
|
|
}
|
|
j.WaitGroup.Wait()
|
|
j.WaitGroup.Wait()
|
|
- j.Save(nil, true)
|
|
|
|
- j.UpdateRedis()
|
|
|
|
|
|
+ j.Save(myJobs, nil, true)
|
|
log.Println("脚本", j.Name, "执行完毕!")
|
|
log.Println("脚本", j.Name, "执行完毕!")
|
|
}
|
|
}
|
|
|
|
|
|
@@ -111,7 +138,7 @@ func (j *Job) ExecJob(script *Script, info *map[string]interface{}) *map[string]
|
|
}
|
|
}
|
|
|
|
|
|
//保存到mongodb
|
|
//保存到mongodb
|
|
-func (j *Job) Save(result *map[string]interface{}, flag bool) {
|
|
|
|
|
|
+func (j *Job) Save(myJobs *MyJobs, result *map[string]interface{}, flag bool) {
|
|
j.Lock.Lock()
|
|
j.Lock.Lock()
|
|
defer j.Lock.Unlock()
|
|
defer j.Lock.Unlock()
|
|
if result != nil {
|
|
if result != nil {
|
|
@@ -120,8 +147,9 @@ func (j *Job) Save(result *map[string]interface{}, flag bool) {
|
|
(*result)["appid"] = j.Appid
|
|
(*result)["appid"] = j.Appid
|
|
(*result)["department"] = j.Department
|
|
(*result)["department"] = j.Department
|
|
(*result)["matchscore"] = j.MatchScore
|
|
(*result)["matchscore"] = j.MatchScore
|
|
|
|
+ (*result)["remark"] = ""
|
|
delete(*result, "_id")
|
|
delete(*result, "_id")
|
|
- if j.Filter(*result) {
|
|
|
|
|
|
+ if j.Filter(myJobs, *result) {
|
|
j.FilterCount++
|
|
j.FilterCount++
|
|
} else {
|
|
} else {
|
|
*j.Results = append(*j.Results, *result)
|
|
*j.Results = append(*j.Results, *result)
|
|
@@ -144,7 +172,9 @@ func (j *Job) Save(result *map[string]interface{}, flag bool) {
|
|
}
|
|
}
|
|
|
|
|
|
//过滤
|
|
//过滤
|
|
-func (j *Job) Filter(result map[string]interface{}) bool {
|
|
|
|
|
|
+func (j *Job) Filter(myJobs *MyJobs, result map[string]interface{}) bool {
|
|
|
|
+ myJobs.Lock.Lock()
|
|
|
|
+ defer myJobs.Lock.Unlock()
|
|
area := qutil.ObjToString(result["area"])
|
|
area := qutil.ObjToString(result["area"])
|
|
if area == "A" {
|
|
if area == "A" {
|
|
area = "全国"
|
|
area = "全国"
|
|
@@ -169,10 +199,10 @@ L:
|
|
keys = append(keys, fmt.Sprintf("%s_%s_%s_%s_%s", j.Appid, j.Department, toptype, v, "全国"))
|
|
keys = append(keys, fmt.Sprintf("%s_%s_%s_%s_%s", j.Appid, j.Department, toptype, v, "全国"))
|
|
}
|
|
}
|
|
for _, key := range keys {
|
|
for _, key := range keys {
|
|
- fiveData := j.RedisFiveData[key]
|
|
|
|
|
|
+ fiveData := myJobs.RedisFiveData[key]
|
|
if fiveData == nil {
|
|
if fiveData == nil {
|
|
fiveData = &[]*map[string]interface{}{}
|
|
fiveData = &[]*map[string]interface{}{}
|
|
- j.RedisFiveData[key] = fiveData
|
|
|
|
|
|
+ myJobs.RedisFiveData[key] = fiveData
|
|
redisDatas, _ := redis.Get("filter", key).([]interface{})
|
|
redisDatas, _ := redis.Get("filter", key).([]interface{})
|
|
for _, rsd := range redisDatas {
|
|
for _, rsd := range redisDatas {
|
|
var rddm map[string]interface{}
|
|
var rddm map[string]interface{}
|
|
@@ -245,31 +275,12 @@ L:
|
|
"city": city,
|
|
"city": city,
|
|
"publishtime": publishtime,
|
|
"publishtime": publishtime,
|
|
}
|
|
}
|
|
- array := j.RedisFiveData[redisKey]
|
|
|
|
|
|
+ array := myJobs.RedisFiveData[redisKey]
|
|
if array == nil {
|
|
if array == nil {
|
|
array = &[]*map[string]interface{}{}
|
|
array = &[]*map[string]interface{}{}
|
|
- j.RedisFiveData[redisKey] = array
|
|
|
|
|
|
+ myJobs.RedisFiveData[redisKey] = array
|
|
}
|
|
}
|
|
*array = append(*array, &data)
|
|
*array = append(*array, &data)
|
|
}
|
|
}
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
-
|
|
|
|
-//更新redis
|
|
|
|
-func (j *Job) UpdateRedis() {
|
|
|
|
- if j.RedisFiveData == nil || len(j.RedisFiveData) == 0 {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- now := time.Now()
|
|
|
|
- for k, v := range j.RedisFiveData {
|
|
|
|
- if v == nil || len(*v) == 0 {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
- createtime, _ := time.ParseInLocation(qutil.Date_yyyyMMdd, strings.Split(k, "_")[3], time.Local)
|
|
|
|
- timeout := createtime.AddDate(0, 0, 5).Sub(now).Seconds()
|
|
|
|
- if timeout > 0 {
|
|
|
|
- redis.Put("filter", k, v, qutil.IntAll(timeout))
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- j.RedisFiveData = map[string]*[]*map[string]interface{}{}
|
|
|
|
-}
|
|
|