|
@@ -4,9 +4,14 @@
|
|
|
package qyfw
|
|
|
|
|
|
import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
"log"
|
|
|
+ "math"
|
|
|
qutil "qfw/util"
|
|
|
"qfw/util/mongodb"
|
|
|
+ "qfw/util/redis"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
"time"
|
|
|
"util"
|
|
@@ -22,17 +27,22 @@ var (
|
|
|
)
|
|
|
|
|
|
// Job holds the state of one script run for a single user: identifying
// metadata, the buffer of results waiting to be stored, and the bookkeeping
// used for duplicate filtering.
type Job struct {
	Name       string  // script name
	Appid      string  // unique identifier of the user
	MatchScore float64 // score written to every saved result
	Department string  // tag written to every saved result

	Results *[]map[string]interface{} // data that will finally be stored

	Lock       sync.Mutex     // held for the whole of Save
	WaitGroup  sync.WaitGroup // waited on by Start before the final flush
	ScriptFile string

	// EachListPool receives one value per list item in Start.
	// NOTE(review): presumably a concurrency limiter — confirm against the
	// part of Start that drains it.
	EachListPool chan bool

	// RedisFiveData is the in-memory copy of the duplicate-filter records,
	// keyed by "appid_department_toptype_date_area". Filter reads and
	// extends it; UpdateRedis writes it back and resets it.
	RedisFiveData map[string]*[]*map[string]interface{}

	// FilterCount counts results rejected by Filter since the last save log.
	FilterCount int
}
|
|
|
|
|
|
//任务
|
|
|
func (j *Job) Start(list *[]map[string]interface{}) {
|
|
|
+ defer qutil.Catch()
|
|
|
count := 0
|
|
|
for _, v := range *list {
|
|
|
j.EachListPool <- true
|
|
@@ -50,7 +60,7 @@ func (j *Job) Start(list *[]map[string]interface{}) {
|
|
|
result := j.ExecJob(script, &info)
|
|
|
//保存
|
|
|
if result != nil && len(*result) > 0 && IsSave {
|
|
|
- j.Save(result, j.Appid, false)
|
|
|
+ j.Save(result, false)
|
|
|
}
|
|
|
}(v)
|
|
|
if count%200 == 0 {
|
|
@@ -59,7 +69,8 @@ func (j *Job) Start(list *[]map[string]interface{}) {
|
|
|
count++
|
|
|
}
|
|
|
j.WaitGroup.Wait()
|
|
|
- j.Save(nil, j.Appid, true)
|
|
|
+ j.Save(nil, true)
|
|
|
+ j.UpdateRedis()
|
|
|
log.Println("脚本", j.Name, "执行完毕!")
|
|
|
}
|
|
|
|
|
@@ -100,27 +111,165 @@ func (j *Job) ExecJob(script *Script, info *map[string]interface{}) *map[string]
|
|
|
}
|
|
|
|
|
|
//保存到mongodb
|
|
|
-func (j *Job) Save(result *map[string]interface{}, appid string, flag bool) {
|
|
|
+func (j *Job) Save(result *map[string]interface{}, flag bool) {
|
|
|
j.Lock.Lock()
|
|
|
defer j.Lock.Unlock()
|
|
|
if result != nil {
|
|
|
(*result)["createtime"] = time.Now().Unix()
|
|
|
- (*result)["appid"] = appid
|
|
|
(*result)["id"] = (*result)["_id"]
|
|
|
+ (*result)["appid"] = j.Appid
|
|
|
+ (*result)["department"] = j.Department
|
|
|
+ (*result)["matchscore"] = j.MatchScore
|
|
|
delete(*result, "_id")
|
|
|
- *j.Results = append(*j.Results, *result)
|
|
|
+ if j.Filter(*result) {
|
|
|
+ j.FilterCount++
|
|
|
+ } else {
|
|
|
+ *j.Results = append(*j.Results, *result)
|
|
|
+ }
|
|
|
}
|
|
|
length := len(*j.Results)
|
|
|
if length == 0 {
|
|
|
+ if flag {
|
|
|
+ log.Println(j.Name, "save", 0, "filter", j.FilterCount)
|
|
|
+ j.FilterCount = 0
|
|
|
+ }
|
|
|
return
|
|
|
}
|
|
|
if length >= SaveSize || flag {
|
|
|
- thisSaveSize := SaveSize
|
|
|
- if flag {
|
|
|
- thisSaveSize = len(*j.Results)
|
|
|
- }
|
|
|
- log.Println(appid, "save", thisSaveSize)
|
|
|
+ log.Println(j.Name, "save", length, "filter", j.FilterCount)
|
|
|
mongodb.SaveBulk(Collection, *j.Results...)
|
|
|
j.Results = &[]map[string]interface{}{}
|
|
|
+ j.FilterCount = 0
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//过滤
|
|
|
+func (j *Job) Filter(result map[string]interface{}) bool {
|
|
|
+ area := qutil.ObjToString(result["area"])
|
|
|
+ if area == "A" {
|
|
|
+ area = "全国"
|
|
|
+ }
|
|
|
+ publishtime := qutil.Int64All(result["publishtime"])
|
|
|
+ toptype := qutil.ObjToString(result["toptype"])
|
|
|
+ title := qutil.ObjToString(result["title"])
|
|
|
+ buyer := qutil.ObjToString(result["buyer"])
|
|
|
+ projectname := qutil.ObjToString(result["projectname"])
|
|
|
+ projectcode := qutil.ObjToString(result["projectcode"])
|
|
|
+ city := qutil.ObjToString(result["city"])
|
|
|
+ now := time.Now()
|
|
|
+ nowDate := qutil.FormatDate(&now, qutil.Date_yyyyMMdd)
|
|
|
+ redisKey := fmt.Sprintf("%s_%s_%s_%s_%s", j.Appid, j.Department, toptype, nowDate, area)
|
|
|
+ fiveDay := util.GetLatelyFiveDay(5)
|
|
|
+ isRepeat := false
|
|
|
+L:
|
|
|
+ for _, v := range fiveDay {
|
|
|
+ keys := []string{fmt.Sprintf("%s_%s_%s_%s_%s", j.Appid, j.Department, toptype, v, area)}
|
|
|
+ //如果不是全国的话,和全国的数据进行比较
|
|
|
+ if area != "全国" {
|
|
|
+ keys = append(keys, fmt.Sprintf("%s_%s_%s_%s_%s", j.Appid, j.Department, toptype, v, "全国"))
|
|
|
+ }
|
|
|
+ for _, key := range keys {
|
|
|
+ fiveData := j.RedisFiveData[key]
|
|
|
+ if fiveData == nil {
|
|
|
+ fiveData = &[]*map[string]interface{}{}
|
|
|
+ j.RedisFiveData[key] = fiveData
|
|
|
+ redisDatas, _ := redis.Get("filter", key).([]interface{})
|
|
|
+ for _, rsd := range redisDatas {
|
|
|
+ var rddm map[string]interface{}
|
|
|
+ rsdByte, err := json.Marshal(rsd)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if err := json.Unmarshal(rsdByte, &rddm); err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if rddm != nil && len(rddm) > 0 {
|
|
|
+ *fiveData = append(*fiveData, &rddm)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for _, data := range *fiveData {
|
|
|
+ //buyer/projectname/projectcode三个有两个相等即为重复
|
|
|
+ r_publishtime := qutil.Int64All((*data)["publishtime"])
|
|
|
+ r_buyer := qutil.ObjToString((*data)["buyer"])
|
|
|
+ r_projectname := qutil.ObjToString((*data)["projectname"])
|
|
|
+ r_projectcode := qutil.ObjToString((*data)["projectcode"])
|
|
|
+ //只判断发布时间在5天之内的
|
|
|
+ if math.Abs(qutil.Float64All(publishtime-r_publishtime)) > 432000 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ if buyer == r_buyer && (projectname == r_projectname || projectcode == r_projectcode) {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ if projectname == r_projectname && (buyer == r_buyer || projectcode == r_projectcode) {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ if projectcode == r_projectcode && (buyer == r_buyer || projectname == r_projectname) {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ //标题长度大于10且相等即为重复
|
|
|
+ r_title := qutil.ObjToString((*data)["title"])
|
|
|
+ if len([]rune(title)) > 10 && title == r_title {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ //标题长度大于10且包含关系+buyer/projectname/projectcode/city(全国/A的只判断包含关系即可)相等即为重复
|
|
|
+ r_city := qutil.ObjToString((*data)["city"])
|
|
|
+ if len([]rune(title)) > 10 && len([]rune(r_title)) > 10 && (strings.Contains(title, r_title) || strings.Contains(r_title, title)) {
|
|
|
+ if area == "全国" {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ if buyer == r_buyer || projectname == r_projectname || projectcode == r_projectcode || city == r_city {
|
|
|
+ isRepeat = true
|
|
|
+ break L
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if isRepeat {
|
|
|
+ return true
|
|
|
+ }
|
|
|
+ if !isRepeat {
|
|
|
+ data := map[string]interface{}{
|
|
|
+ "title": title,
|
|
|
+ "buyer": buyer,
|
|
|
+ "projectname": projectname,
|
|
|
+ "projectcode": projectcode,
|
|
|
+ "city": city,
|
|
|
+ "publishtime": publishtime,
|
|
|
+ }
|
|
|
+ array := j.RedisFiveData[redisKey]
|
|
|
+ if array == nil {
|
|
|
+ array = &[]*map[string]interface{}{}
|
|
|
+ j.RedisFiveData[redisKey] = array
|
|
|
+ }
|
|
|
+ *array = append(*array, &data)
|
|
|
+ }
|
|
|
+ return false
|
|
|
+}
|
|
|
+
|
|
|
+//更新redis
|
|
|
+func (j *Job) UpdateRedis() {
|
|
|
+ if j.RedisFiveData == nil || len(j.RedisFiveData) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ now := time.Now()
|
|
|
+ for k, v := range j.RedisFiveData {
|
|
|
+ if v == nil || len(*v) == 0 {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ createtime, _ := time.ParseInLocation(qutil.Date_yyyyMMdd, strings.Split(k, "_")[3], time.Local)
|
|
|
+ timeout := createtime.AddDate(0, 0, 5).Sub(now).Seconds()
|
|
|
+ if timeout > 0 {
|
|
|
+ redis.Put("filter", k, v, qutil.IntAll(timeout))
|
|
|
+ }
|
|
|
}
|
|
|
+ j.RedisFiveData = map[string]*[]*map[string]interface{}{}
|
|
|
}
|