|
@@ -27,6 +27,7 @@ type Task struct {
|
|
IsBidding bool
|
|
IsBidding bool
|
|
Flows []string
|
|
Flows []string
|
|
SpiderScriptMap map[string]*spider.Spider
|
|
SpiderScriptMap map[string]*spider.Spider
|
|
|
|
+ CheckFields map[string]bool
|
|
}
|
|
}
|
|
|
|
|
|
func (t *Task) StartTask() {
|
|
func (t *Task) StartTask() {
|
|
@@ -116,9 +117,10 @@ func (t *Task) StartJob() {
|
|
//每个爬虫加载需要下载的数据
|
|
//每个爬虫加载需要下载的数据
|
|
q := map[string]interface{}{ //查询未下载成功的数据
|
|
q := map[string]interface{}{ //查询未下载成功的数据
|
|
"spidercode": spidercode,
|
|
"spidercode": spidercode,
|
|
- "state": map[string]interface{}{
|
|
|
|
- "$ne": 1,
|
|
|
|
- },
|
|
|
|
|
|
+ "state": 0,
|
|
|
|
+ //"state": map[string]interface{}{
|
|
|
|
+ // "$ne": 1,
|
|
|
|
+ //},
|
|
}
|
|
}
|
|
fields := map[string]interface{}{
|
|
fields := map[string]interface{}{
|
|
"href": 1,
|
|
"href": 1,
|
|
@@ -131,36 +133,36 @@ func (t *Task) StartJob() {
|
|
list, _ := MgoDT.Find(t.Coll, q, nil, fields, false, 0, 100)
|
|
list, _ := MgoDT.Find(t.Coll, q, nil, fields, false, 0, 100)
|
|
if list != nil && len(*list) > 0 {
|
|
if list != nil && len(*list) > 0 {
|
|
updateArr := [][]map[string]interface{}{}
|
|
updateArr := [][]map[string]interface{}{}
|
|
- for _, tmp := range *list {
|
|
|
|
|
|
+ for _, l := range *list {
|
|
//_id := tmp["_id"]
|
|
//_id := tmp["_id"]
|
|
update := []map[string]interface{}{}
|
|
update := []map[string]interface{}{}
|
|
data := map[string]interface{}{}
|
|
data := map[string]interface{}{}
|
|
- result := map[string]interface{}{}
|
|
|
|
- query := map[string]interface{}{"_id": tmp["_id"]}
|
|
|
|
- times := qu.IntAll(tmp["times"]) //获取下载次数
|
|
|
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
|
+ query := map[string]interface{}{"_id": l["_id"]}
|
|
|
|
+ times := qu.IntAll(l["times"]) //获取下载次数
|
|
//publishtime处理成字符串
|
|
//publishtime处理成字符串
|
|
- if publishtime, ok := tmp["publishtime"].(int64); ok { //int64
|
|
|
|
|
|
+ if publishtime, ok := l["publishtime"].(int64); ok { //int64
|
|
data["publishtime"] = qu.FormatDateByInt64(&publishtime, qu.Date_Full_Layout)
|
|
data["publishtime"] = qu.FormatDateByInt64(&publishtime, qu.Date_Full_Layout)
|
|
- } else if publishtime, ok := tmp["publishtime"].(string); ok {
|
|
|
|
|
|
+ } else if publishtime, ok := l["publishtime"].(string); ok {
|
|
data["publishtime"] = publishtime
|
|
data["publishtime"] = publishtime
|
|
}
|
|
}
|
|
//jsondata处理成字符串
|
|
//jsondata处理成字符串
|
|
- if jd, ok := tmp["jsondata"].(map[string]interface{}); ok && jd != nil {
|
|
|
|
|
|
+ if jd, ok := l["jsondata"].(map[string]interface{}); ok && jd != nil {
|
|
if jd_byte, err := json.Marshal(jd); err == nil && string(jd_byte) != "" {
|
|
if jd_byte, err := json.Marshal(jd); err == nil && string(jd_byte) != "" {
|
|
data["jsondata"] = string(jd_byte)
|
|
data["jsondata"] = string(jd_byte)
|
|
}
|
|
}
|
|
- } else if jd, ok := tmp["jsondata"].(string); ok && jd != "" {
|
|
|
|
|
|
+ } else if jd, ok := l["jsondata"].(string); ok && jd != "" {
|
|
data["jsondata"] = jd
|
|
data["jsondata"] = jd
|
|
}
|
|
}
|
|
- data["title"] = tmp["title"]
|
|
|
|
- data["href"] = tmp["href"]
|
|
|
|
|
|
+ data["title"] = l["title"]
|
|
|
|
+ data["href"] = l["href"]
|
|
var err interface{}
|
|
var err interface{}
|
|
//下载详情页
|
|
//下载详情页
|
|
- result, err = sp.DownloadDetailPage(data, result)
|
|
|
|
|
|
+ tmp, err = sp.DownloadDetailPage(data, tmp)
|
|
//删除多余字段
|
|
//删除多余字段
|
|
//delete(result, "exit")
|
|
//delete(result, "exit")
|
|
//delete(result, "checkpublishtime")
|
|
//delete(result, "checkpublishtime")
|
|
- if err != nil || len(result) == 0 { //下载失败
|
|
|
|
|
|
+ if err != nil || len(tmp) == 0 { //下载失败
|
|
times++
|
|
times++
|
|
ss := map[string]interface{}{"times": times}
|
|
ss := map[string]interface{}{"times": times}
|
|
if times >= 3 { //3次下载失败今天不再下载,state置为1
|
|
if times >= 3 { //3次下载失败今天不再下载,state置为1
|
|
@@ -173,7 +175,7 @@ func (t *Task) StartJob() {
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
//正文、附件分析,下载异常数据重新下载
|
|
//正文、附件分析,下载异常数据重新下载
|
|
- if AnalysisProjectInfo(result) {
|
|
|
|
|
|
+ if AnalysisProjectInfo(tmp) {
|
|
times++
|
|
times++
|
|
ss := map[string]interface{}{"times": times}
|
|
ss := map[string]interface{}{"times": times}
|
|
if times >= 3 { //3次下载失败今天不再下载,state置为-1
|
|
if times >= 3 { //3次下载失败今天不再下载,state置为-1
|
|
@@ -185,18 +187,19 @@ func (t *Task) StartJob() {
|
|
updateArr = append(updateArr, update)
|
|
updateArr = append(updateArr, update)
|
|
continue
|
|
continue
|
|
}
|
|
}
|
|
- l_np_publishtime := qu.Int64All(result["l_np_publishtime"])
|
|
|
|
|
|
+ l_np_publishtime := qu.Int64All(tmp["l_np_publishtime"])
|
|
if l_np_publishtime > time.Now().Unix() || l_np_publishtime == 0 { //防止发布时间超前
|
|
if l_np_publishtime > time.Now().Unix() || l_np_publishtime == 0 { //防止发布时间超前
|
|
l_np_publishtime = time.Now().Unix()
|
|
l_np_publishtime = time.Now().Unix()
|
|
}
|
|
}
|
|
- delete(result, "l_np_publishtime")
|
|
|
|
- result["publishtime"] = l_np_publishtime
|
|
|
|
|
|
+ delete(tmp, "l_np_publishtime")
|
|
|
|
+ tmp["publishtime"] = l_np_publishtime
|
|
if !t.IsBidding { //不是bidding数据,走保存服务需要以下字段
|
|
if !t.IsBidding { //不是bidding数据,走保存服务需要以下字段
|
|
- result["iscompete"] = sp.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
|
|
|
|
|
|
+ tmp["iscompete"] = sp.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
|
|
//spider.Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, result, true)
|
|
//spider.Store(sp.StoreMode, sp.StoreToMsgEvent, sp.Collection, sp.CoverAttr, result, true)
|
|
}
|
|
}
|
|
|
|
+ tmp["state"] = 1
|
|
|
|
+ result := t.CheckField(tmp) //校验字段
|
|
//下载成功
|
|
//下载成功
|
|
- result["state"] = 1
|
|
|
|
update = append(update, query)
|
|
update = append(update, query)
|
|
update = append(update, map[string]interface{}{"$set": result})
|
|
update = append(update, map[string]interface{}{"$set": result})
|
|
updateArr = append(updateArr, update)
|
|
updateArr = append(updateArr, update)
|
|
@@ -217,6 +220,51 @@ func (t *Task) StartJob() {
|
|
logger.Info(t.Name, t.ID, t.Coll, "重采完毕...")
|
|
logger.Info(t.Name, t.ID, t.Coll, "重采完毕...")
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+//字段校验
|
|
|
|
+func (t *Task) CheckField(tmp map[string]interface{}) map[string]interface{} {
|
|
|
|
+ defer qu.Catch()
|
|
|
|
+ result := map[string]interface{}{}
|
|
|
|
+ if len(t.CheckFields) > 0 {
|
|
|
|
+ for field, _ := range t.CheckFields {
|
|
|
|
+ fieldOk := true
|
|
|
|
+ if field == "projectinfo" { //附件信息校验
|
|
|
|
+ if projectinfo, ok := tmp["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
|
|
|
|
+ if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
|
|
|
|
+ for _, data := range attachments {
|
|
|
|
+ if d, ok := data.(map[string]interface{}); ok {
|
|
|
|
+ fid := qu.ObjToString(d["fid"])
|
|
|
|
+ if fid == "" { //附件上传成功
|
|
|
|
+ fieldOk = false
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ fieldOk = false
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ fieldOk = false
|
|
|
|
+ }
|
|
|
|
+ } else { //其它字段校验
|
|
|
|
+ if tmp[field] == nil || qu.ObjToString(tmp[field]) == "" {
|
|
|
|
+ fieldOk = false
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if !fieldOk { //字段值下载出错,该条数据下载失败
|
|
|
|
+ result["state"] = -1
|
|
|
|
+ return result
|
|
|
|
+ } else if fieldOk && t.IsBidding { //bidding数据指定更新某字段
|
|
|
|
+ result[field] = tmp[field]
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if t.IsBidding {
|
|
|
|
+ result["state"] = 1
|
|
|
|
+ return result
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return tmp
|
|
|
|
+}
|
|
|
|
+
|
|
//数据推送
|
|
//数据推送
|
|
func (t *Task) SendData() {
|
|
func (t *Task) SendData() {
|
|
if t.IsBidding { //1、bidding数据推送
|
|
if t.IsBidding { //1、bidding数据推送
|