
details_webcookie

maguopeng 3 years ago
parent
commit
8ca61541da

+ 1 - 2
Details/details_webcookie.py

@@ -190,8 +190,7 @@ class Details(feapder.Spider):
         key = down_mid.get("key")
         page_url = down_mid.get("page_url")
         cookie_key = down_mid.get("cookie_key")
-        print(page_url,cookie_key)
-
+        request.headers={"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"}
         cookie_pool = WebCookiePool(redis_key=key, page_url=page_url, cookie_key=cookie_key)
         request.cookies = cookie_pool.get_cookie()
         return request
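
This hunk swaps a debug `print` for a fixed User-Agent header before the request is given a cookie from `WebCookiePool`. A minimal sketch of that download-middleware pattern, assuming `down_mid` is a dict carried on the request and that `WebCookiePool` is imported from `untils.WebCookiePool`:

```python
# Sketch of the middleware shown above; down_mid and the import path are assumptions.
import feapder
from untils.WebCookiePool import WebCookiePool

UA = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 "
      "(KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36")


class Details(feapder.Spider):
    def download_midware(self, request):
        down_mid = request.down_mid           # assumed: dict attached to the request
        request.headers = {"User-Agent": UA}  # fixed UA instead of the old debug print
        cookie_pool = WebCookiePool(
            redis_key=down_mid.get("key"),
            page_url=down_mid.get("page_url"),
            cookie_key=down_mid.get("cookie_key"),
        )
        request.cookies = cookie_pool.get_cookie()  # reuse a cached browser cookie
        return request
```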

+ 1 - 2
FworkSpider/feapder/commands/create/create_spider.py

@@ -18,8 +18,7 @@ from .create_init import CreateInit
 
 def deal_file_info(file):
     file = file.replace("{DATE}", tools.get_current_date())
-    # file = file.replace("{USER}", getpass.getuser())
-    file = file.replace("{USER}", os.path.basename(os.getcwd()))
+    file = file.replace("{USER}", getpass.getuser())
 
     return file
 

+ 0 - 6
FworkSpider/feapder/core/base_parser.py

@@ -95,12 +95,6 @@ class BaseParser(object):
         ---------
         @param request:  request.url 为文件下载地址, 该方法需要自行调用
         request.INFO  为上传文件时所需要提供的部分参数  必传
-         info = {
-            "org_url": "http://www...",  # 文件下载连接
-            "filename": f"{list_item.title}.docx",  # 文件名
-            "channel": list_item.channel,
-            "ftype": 'docx,zip,ftp', # 文件类型
-        }
         request.headers 则存放请求的必要参数,如:parmas,headers  必传
         ---------
         @result: request / item / callback / None (返回值必须可迭代),正常处理为 None 即可

+ 34 - 15
FworkSpider/feapder/core/parser_control.py

@@ -141,21 +141,34 @@ class PaserControl(threading.Thread):
                                     )
                                 used_download_midware_enable = True
                                 if not response:
+                                    try:
+                                        response = (
+                                            request_temp.get_response()
+                                            if not setting.RESPONSE_CACHED_USED
+                                            else request_temp.get_response_from_cached(
+                                                save_cached=False
+                                            )
+                                        )
+                                    except Exception as e:
+                                        log.info("requests", extra={"url": request.url or request_temp.url, "code": -1,"error_info":e})
+                                        raise Exception(
+                                            "连接超时 url: %s" % (request.url or request_temp.url)
+                                        )
+
+                            else:
+                                try:
                                     response = (
-                                        request_temp.get_response()
+                                        request.get_response()
                                         if not setting.RESPONSE_CACHED_USED
-                                        else request_temp.get_response_from_cached(
+                                        else request.get_response_from_cached(
                                             save_cached=False
                                         )
                                     )
-                            else:
-                                response = (
-                                    request.get_response()
-                                    if not setting.RESPONSE_CACHED_USED
-                                    else request.get_response_from_cached(
-                                        save_cached=False
+                                except Exception as e:
+                                    log.info("requests", extra={"url": request.url or request_temp.url, "code": -1, "error_info": e})
+                                    raise Exception(
+                                        "连接超时 url: %s" % (request.url or request_temp.url)
                                     )
-                                )
 
                             if response == None:
                                 raise Exception(
@@ -535,13 +548,19 @@ class AirSpiderParserControl(PaserControl):
                                 request = request_temp
 
                             if not response:
-                                response = (
-                                    request.get_response()
-                                    if not setting.RESPONSE_CACHED_USED
-                                    else request.get_response_from_cached(
-                                        save_cached=False
+                                try:
+                                    response = (
+                                        request.get_response()
+                                        if not setting.RESPONSE_CACHED_USED
+                                        else request.get_response_from_cached(
+                                            save_cached=False
+                                        )
+                                    )
+                                except Exception as e:
+                                    log.info("requests", extra={"url": request.url or request_temp.url, "code": -1, "error_info": e})
+                                    raise Exception(
+                                        "连接超时 url: %s" % (request.url or request_temp.url)
                                     )
-                                )
 
                         else:
                             response = None
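
Both branches of this hunk now funnel the download through the same try/except: a failed fetch is logged as a structured `requests` entry and re-raised as a timeout so feapder's normal retry flow handles it. A condensed sketch of that pattern, not the verbatim PaserControl code:

```python
# Condensed sketch of the error-handling pattern in the hunk; request is any
# feapder Request object, setting/log are feapder's own modules.
import feapder.setting as setting
from feapder.utils.log import log


def fetch_with_logging(request):
    try:
        response = (
            request.get_response()
            if not setting.RESPONSE_CACHED_USED
            else request.get_response_from_cached(save_cached=False)
        )
    except Exception as e:
        # structured fields end up in logstash via the handler added in utils/log.py
        log.info("requests", extra={"url": request.url, "code": -1, "error_info": e})
        raise Exception("连接超时 url: %s" % request.url)
    if response is None:
        raise Exception("连接超时 url: %s" % request.url)
    return response
```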

+ 5 - 2
FworkSpider/feapder/core/scheduler.py

@@ -324,13 +324,14 @@ class Scheduler(threading.Thread):
         """
         # 每分钟检查一次
         now_time = time.time()
-        if now_time - self._last_check_task_status_time > 60:
+        if now_time - self._last_check_task_status_time > 30:
             self._last_check_task_status_time = now_time
         else:
             return
 
         # 检查redis中任务状态,若连续20分钟内任务数量未发生变化(parser可能卡死),则发出报警信息
         task_count = self._redisdb.zget_count(self._tab_requests)
+        print(task_count)
 
         if task_count:
             if task_count != self._last_task_count:
@@ -349,7 +350,7 @@ class Scheduler(threading.Thread):
 
                     -- 取值
                     local last_timestamp = redis.call('hget', KEYS[1], field)
-                    if last_timestamp and current_timestamp - last_timestamp >= 1200 then
+                    if last_timestamp and current_timestamp - last_timestamp >= 600 then
                         return current_timestamp - last_timestamp -- 返回任务停滞时间 秒
                     end
 
@@ -376,6 +377,8 @@ class Scheduler(threading.Thread):
                         self._spider_name, tools.format_seconds(overtime)
                     )
                     log.error(msg)
+                    log.error("爬虫任务异常停滞,爬虫将强制退出")
+                    exit()
                     self.send_msg(
                         msg,
                         level="error",

+ 1 - 1
FworkSpider/feapder/network/cookie_pool.py

@@ -242,7 +242,7 @@ class LoginCookiePool(CookiePoolInterface):
         self._password_key = password_key
 
         self._redisdb = RedisDB()
-        self._mysqldb = ()
+        self._mysqldb = MysqlDB()
 
         self.create_userbase()
 

+ 20 - 20
FworkSpider/feapder/network/proxy_file/a62f3217a0981b7b2117d9d0af64c2db.txt

@@ -1,20 +1,20 @@
-175.162.217.157:8860&&1643361380
-222.86.85.51:8861&&1643361867
-222.86.85.51:8860&&1643361867
-182.101.215.123:8861&&1643361013
-182.34.32.132:8860&&1643361124
-182.101.215.123:8860&&1643361013
-182.34.32.132:8861&&1643361124
-113.123.0.11:8861&&1643361579
-113.123.0.11:8860&&1643361579
-117.66.140.217:8860&&1643361016
-117.66.140.217:8861&&1643361016
-123.10.66.129:8860&&1643361437
-123.10.66.129:8861&&1643361437
-123.169.34.75:8860&&1643360309
-123.169.34.75:8861&&1643360309
-175.162.217.157:8861&&1643361379
-111.179.73.220:8860&&1643360596
-111.179.73.220:8861&&1643360596
-36.62.71.201:8861&&1643360585
-36.62.71.201:8860&&1643360585
+122.159.219.174:8860&&1653299700
+182.34.19.216:8860&&1653299010
+106.35.223.168:8861&&1653298655
+125.45.91.69:8861&&1653298844
+125.45.91.69:8860&&1653298844
+122.159.219.174:8861&&1653299700
+106.35.223.168:8860&&1653298655
+182.34.19.216:8861&&1653299010
+113.121.20.254:8861&&1653300488
+125.72.106.216:8861&&1653300251
+113.121.20.254:8860&&1653300488
+125.72.106.216:8860&&1653300251
+119.112.80.248:8861&&1653298967
+119.112.80.248:8860&&1653298967
+58.213.26.197:8860&&1653298952
+58.213.26.197:8861&&1653298952
+113.226.110.38:8861&&1653300048
+113.226.110.38:8860&&1653300048
+113.121.41.156:8860&&1653299102
+113.121.41.156:8861&&1653299102

+ 11 - 4
FworkSpider/feapder/network/request.py

@@ -9,6 +9,7 @@ Created on 2018-07-25 11:49:08
 """
 
 import requests
+from func_timeout import func_set_timeout, FunctionTimedOut
 from requests.adapters import HTTPAdapter
 from requests.cookies import RequestsCookieJar
 from requests.packages.urllib3.exceptions import InsecureRequestWarning
@@ -19,9 +20,9 @@ from feapder.db.redisdb import RedisDB
 from feapder.network import user_agent
 from feapder.network.proxy_pool import ProxyPool
 from feapder.network.response import Response
-from feapder.utils.log import log
+from feapder.utils.log import Log
 from feapder.utils.webdriver import WebDriverPool
-
+log = Log()
 # 屏蔽warning信息
 requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 
@@ -247,6 +248,7 @@ class Request(object):
             else self.callback
         )
 
+    @func_set_timeout(30)
     def get_response(self, save_cached=False):
         """
         获取带有selector功能的response
@@ -397,7 +399,7 @@ class Request(object):
 
         if save_cached:
             self.save_cached(response, expire_time=self.__class__.cached_expire_time)
-
+        log.info("requests",extra={"url":response.url,"code":response.status_code})
         return response
 
     def proxies(self):
@@ -485,7 +487,12 @@ class Request(object):
         response_dict = self._cache_db.strget(self._cached_redis_key)
         if not response_dict:
             log.info("无response缓存  重新下载")
-            response_obj = self.get_response(save_cached=save_cached)
+            try:
+                response_obj = self.get_response(save_cached=save_cached)
+            except FunctionTimedOut:
+                log.info("请求超时")
+                log.info("requests", extra={"url": self.url, "code": 0})
+
         else:
             response_dict = eval(response_dict)
             response_obj = Response.from_dict(response_dict)
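
`func_timeout` supplies the 30-second hard cap here: `@func_set_timeout(30)` makes a hung `get_response()` raise `FunctionTimedOut`, which the cached path now catches and logs. A small self-contained sketch of that behaviour (the slow download is simulated with `time.sleep`; `DemoRequest` is a stand-in, `func_set_timeout`/`FunctionTimedOut` are the real API of the `func_timeout` package):

```python
# Sketch of the func_timeout pattern used in request.py.
import time

from func_timeout import func_set_timeout, FunctionTimedOut


class DemoRequest:
    url = "http://example.com/slow"   # placeholder URL

    @func_set_timeout(30)             # raises FunctionTimedOut after 30 seconds
    def get_response(self):
        time.sleep(60)                # simulates a download that never returns
        return "response"

    def get_response_from_cached(self):
        try:
            return self.get_response()
        except FunctionTimedOut:
            print("请求超时")          # mirrors the log.info calls in the hunk
            return None


if __name__ == "__main__":
    print(DemoRequest().get_response_from_cached())   # prints None after ~30s
```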

+ 13 - 31
FworkSpider/feapder/templates/spider_list_template.tmpl

@@ -2,14 +2,14 @@
 """
 Created on {DATE}
 ---------
-@summary: ${spider_name}
+@summary:
 ---------
 @author: {USER}
 """
-import sys
-sys.path.append('/app/spiders/sword_feapder/FworkSpider')
+
 import feapder
-from items.spider_item import DataBakItem,MgpListItem,ListItem
+from items.spider_item import DataBakItem,MgpListItem
+from untils.proxy_pool import ProxyPool
 from feapder.dedup import Dedup
 from collections import namedtuple
 
@@ -17,20 +17,21 @@ from collections import namedtuple
 class ${spider_name}(feapder.Spider):
 
     def start_callback(self):
+         self.count = 0
          Menu = namedtuple('Menu', ['channel', 'code', 'types', 'crawl_page'])
 
          self.menus = [
-             Menu('${spider_name}抓取栏目', '${spider_name}爬虫code', "自定义参数", 1),
-             Menu('${spider_name}抓取栏目', '${spider_name}爬虫code', "Notice", 1),
+             Menu('${spider_name}', '${spider_name}', "Notice", 1),
+             Menu('${spider_name}', '${spider_name}', "Notice", 1),
          ]
     def start_requests(self):
          for menu in self.menus:
-             for page in range(1,menu.crawl_page+1):
-                 start_url = f''
-                 yield feapder.Request(url=start_url, item=menu._asdict(),proxies=False)
+            start_url = f''
+            yield feapder.Request(url=start_url, item=menu._asdict())
 
     def parse(self, request, response):
         menu = request.item
+        self.count += 1   # 一个计数器
         dedup = Dedup(Dedup.BloomFilter)
         href_list = []
         info_list = []
@@ -55,34 +56,15 @@ class ${spider_name}(feapder.Spider):
             list_item.parse = "self.detail_get"
             list_item.parser_name = "details"
             list_item.item = data_item.to_dict
-            list_item.deal_detail = ['//div[@class="****"]',"*****"]
-            list_item.proxies = False
+            list_item.xpath = ['//****',"*****"]
+            list_item.author = "****"
             list_item.parse_url = href
-            list_item.pri = 1
-            list.files={
-                "list_xpath":'//div[@class="notice-foot"]/a',
-                "url_xpath":'./@href',
-                "name_xpath":'./text()',
-                "files_type":('zip','doxc','ftp'),
-                "file_type":'zip',
-                "url_key":'attachmentDownload',
-                # "host":'http',
-                "kwargs":{"headers": {
-                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36"
-                }}
             href_list.append(href)
             yield list_item
-        list = ListItem()
-        list.site = self.site
-        list.channel = menu.get("channel")
-        list.spidercode = menu.get("code")
-        list.url = request.url
-        list.count = len(info_list)
-        list.rel_count = len(href_list)
         dedup.add(href_list)
 
     def end_callback(self):
         print("爬虫结束")
 
 if __name__ == "__main__":
-    ${spider_name}(redis_key="{USER}:${spider_name}").start()
+    ${spider_name}(redis_key="fwork:${spider_name}").start()

+ 1 - 1
FworkSpider/feapder/templates/spider_template.tmpl

@@ -64,4 +64,4 @@ class ${spider_name}(feapder.Spider):
         return request
 
 if __name__ == "__main__":
-    ${spider_name}(redis_key="{USER}:${spider_name}").start()
+    ${spider_name}(redis_key="fwork:${spider_name}").start()

+ 17 - 28
FworkSpider/feapder/utils/aliyun.py

@@ -56,7 +56,7 @@ class UploadOSS:
                 else:
                     return "{:.1f} kb".format(_kb)
 
-    def get_state(self, attachment,count=0, **kwargs):
+    def get_state(self, attachment, **kwargs):
         """
         下载附件并上传阿里oss
 
@@ -78,10 +78,7 @@ class UploadOSS:
                 if not os.path.exists(img_dir):
                     os.makedirs(img_dir, mode=0o777, exist_ok=True)
                 # 打开目录,放入下载的附件
-                filname = hashlib.md5(attachment["filename"].encode("utf-8"))
-                filname = filname.hexdigest() #加密1次
-                types = attachment["ftype"]
-                self.file_path = "{}/{}".format(img_dir, filname+'.'+types)
+                self.file_path = "{}/{}".format(img_dir, attachment["filename"])
                 with open(self.file_path, 'wb') as f:
                     f.write(self.file_stream)
                 # 上传附件
@@ -92,16 +89,13 @@ class UploadOSS:
                 # 返回附件上传处理信息
                 return file_state
             else:
-                if count<3:
-                    self.post_state(attachment,count=count+1, **kwargs)
-                else:
-                    # attachment["ftype"] = str(attachment["filename"]).split(".")[1]
-                    attachment["url"] = 'oss'
-                    attachment["fid"] = self.fid + "." + attachment["ftype"]
-                    attachment["size"] = '0kb'
-                    attachment["false"] = True
-                    return attachment
-    def post_state(self, attachment,count=0, **kwargs):
+                attachment["ftype"] = str(attachment["filename"]).split(".")[1]
+                attachment["url"] = 'oss'
+                attachment["fid"] = self.fid + "." + attachment["ftype"]
+                attachment["size"] = '0kb'
+                attachment["false"] = True
+                return attachment
+    def post_state(self, attachment, **kwargs):
         """
         下载附件并上传阿里oss
 
@@ -122,10 +116,7 @@ class UploadOSS:
                 if not os.path.exists(img_dir):
                     os.makedirs(img_dir, mode=0o777, exist_ok=True)
                 # 打开目录,放入下载的附件
-                filname = hashlib.md5(attachment["filename"].encode("utf-8"))
-                filname = filname.hexdigest()  # 加密1次
-                types = attachment["ftype"]
-                self.file_path = "{}/{}".format(img_dir, filname + '.' + types)
+                self.file_path = "{}/{}{}".format(img_dir,time.time(),attachment["filename"])
 
                 with open(self.file_path, 'wb') as f:
                     f.write(self.file_stream)
@@ -137,14 +128,12 @@ class UploadOSS:
                 # 返回附件上传处理信息
                 return file_state
             else:
-                if count<3:
-                    self.post_state(attachment,count=count+1, **kwargs)
-                else:
-                    attachment["url"] = 'oss'
-                    attachment["fid"] = self.fid + "." + attachment["ftype"]
-                    attachment["size"] = '0kb'
-                    attachment["false"] = True
-                    return attachment
+                attachment["ftype"] = str(attachment["filename"]).split(".")[1]
+                attachment["url"] = 'oss'
+                attachment["fid"] = self.fid + "." + attachment["ftype"]
+                attachment["size"] = '0kb'
+                attachment["false"] = True
+                return attachment
 
     def put_oss_from_local(self):
         """上传一个本地文件到阿里OSS的普通文件"""
@@ -159,7 +148,7 @@ class UploadOSS:
         @param attachment: 附件
         @return: 附件上传处理信息
         """
-        # attachment["ftype"] = str(attachment["filename"]).split(".")[1]
+        attachment["ftype"] = str(attachment["filename"]).split(".")[1]
         attachment["url"] = 'oss'
         attachment["fid"] = self.fid + "." + attachment["ftype"]
         attachment["size"] = self.file_size

+ 47 - 41
FworkSpider/feapder/utils/log.py

@@ -10,11 +10,10 @@ Created on 2018-12-08 16:50
 import logging
 import os
 import sys
-import time
 from logging.handlers import BaseRotatingHandler
 
+import logstash
 import loguru
-import pymongo
 from better_exceptions import format_exception
 
 import feapder.setting as setting
@@ -41,47 +40,45 @@ class RotatingFileHandler(BaseRotatingHandler):
         self.max_bytes = max_bytes
         self.backup_count = backup_count
         self.placeholder = str(len(str(backup_count)))
-        self._to_db = None
-        self.filename = filename
-
-
-    @property
-    def to_db(self):
-        if not self._to_db:
-            self._to_db = pymongo.MongoClient(setting.MONGO_IP, setting.MONGO_PORT)
-
-        return self._to_db.pyspider
 
+    def doRollover(self):
+        if self.stream:
+            self.stream.close()
+            self.stream = None
+        if self.backup_count > 0:
+            for i in range(self.backup_count - 1, 0, -1):
+                sfn = ("%0" + self.placeholder + "d.") % i  # '%2d.'%i -> 02
+                sfn = sfn.join(self.baseFilename.split("."))
+                # sfn = "%d_%s" % (i, self.baseFilename)
+                # dfn = "%d_%s" % (i + 1, self.baseFilename)
+                dfn = ("%0" + self.placeholder + "d.") % (i + 1)
+                dfn = dfn.join(self.baseFilename.split("."))
+                if os.path.exists(sfn):
+                    # print "%s -> %s" % (sfn, dfn)
+                    if os.path.exists(dfn):
+                        os.remove(dfn)
+                    os.rename(sfn, dfn)
+            dfn = (("%0" + self.placeholder + "d.") % 1).join(
+                self.baseFilename.split(".")
+            )
+            if os.path.exists(dfn):
+                os.remove(dfn)
+            # Issue 18940: A file may not have been created if delay is True.
+            if os.path.exists(self.baseFilename):
+                os.rename(self.baseFilename, dfn)
+        if not self.delay:
+            self.stream = self._open()
 
     def shouldRollover(self, record):
-        parmars = {
-            "spider_name":record.name,
-            "msg":record.msg,
-            "Message":str(record.getMessage)
-        }
-        if record.levelname == "ERROR":
-            crawl_type = 'list'
-            if 'detail' in record.name:
-                crawl_type = 'detail'
-            url = ''
-            item={
-                "recordname":record.name,
-                "spidercode":"spidercode",
-                "author":self.filename,
-                "account":"",
-                "crawl_time":time.time(),
-                "crawl_type": crawl_type,
-                "status_code":"status_code",
-                "url":url,
-                "reason":record.msg,
-                'parmars': parmars,
-            }
-
-            # print('<<<<<<<<<<<<<<<<<<<<<<<插入error_info')
-            # print(item)
-            # print(self.to_db.error_info)
-            # self.to_db.error_info.insert_one(item)
 
+        if self.stream is None:  # delay was set...
+            self.stream = self._open()
+        if self.max_bytes > 0:  # are we rolling over?
+            msg = "%s\n" % self.format(record)
+            self.stream.seek(0, 2)  # due to non-posix-compliant Windows feature
+            if self.stream.tell() + len(msg) >= self.max_bytes:
+                return 1
+        return 0
 
 
 def get_logger(
@@ -90,6 +87,7 @@ def get_logger(
     log_level=None,
     is_write_to_console=None,
     is_write_to_file=None,
+    is_send_to_logstash = None,
     color=None,
     mode=None,
     max_bytes=None,
@@ -113,6 +111,7 @@ def get_logger(
     @result:
     """
     # 加载setting里最新的值
+    # name = os.path.split(os.getcwd())[-1]
     name = name or setting.LOG_NAME
     path = path or setting.LOG_PATH
     log_level = log_level or setting.LOG_LEVEL
@@ -126,6 +125,11 @@ def get_logger(
         if is_write_to_file is not None
         else setting.LOG_IS_WRITE_TO_FILE
     )
+    is_send_to_logstash = (
+        is_send_to_logstash
+        if is_send_to_logstash is not None
+        else setting.LOG_IS_SEND_TO_LOGSTASH
+    )
     color = color if color is not None else setting.LOG_COLOR
     mode = mode or setting.LOG_MODE
     max_bytes = max_bytes or setting.LOG_MAX_BYTES
@@ -144,8 +148,8 @@ def get_logger(
 
     # 定义一个RotatingFileHandler,最多备份5个日志文件,每个日志文件最大10M
     if is_write_to_file:
-        # if path and not os.path.exists(os.path.dirname(path)):
-        #     os.makedirs(os.path.dirname(path))
+        if path and not os.path.exists(os.path.dirname(path)):
+            os.makedirs(os.path.dirname(path))
 
         rf_handler = RotatingFileHandler(
             path,
@@ -156,6 +160,8 @@ def get_logger(
         )
         rf_handler.setFormatter(formatter)
         logger.addHandler(rf_handler)
+    if is_send_to_logstash:
+        logger.addHandler(logstash.TCPLogstashHandler(setting.LOGSTASH_IP, setting.LOGSTASH_PORT, version=1))
     if color and is_write_to_console:
         loguru_handler = InterceptHandler()
         loguru_handler.setFormatter(formatter)
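
Error persistence moves from ad-hoc Mongo inserts to a logstash handler: when `LOG_IS_SEND_TO_LOGSTASH` is on, every record is shipped over TCP to `LOGSTASH_IP:LOGSTASH_PORT`. A minimal wiring sketch with placeholder host/port (python-logstash's `TCPLogstashHandler` is the real class):

```python
# Minimal logstash wiring sketch; 127.0.0.1:5959 are placeholders standing in
# for setting.LOGSTASH_IP / setting.LOGSTASH_PORT.
import logging

import logstash

logger = logging.getLogger("feapder-demo")
logger.setLevel(logging.INFO)
logger.addHandler(logstash.TCPLogstashHandler("127.0.0.1", 5959, version=1))

# extra fields travel as structured data in the logstash event, matching the
# log.info("requests", extra={...}) calls added in request.py and parser_control.py
logger.info("requests", extra={"url": "http://example.com", "code": 200})
```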

+ 1 - 1
FworkSpider/feapder/utils/redis_lock.py

@@ -107,7 +107,7 @@ class RedisLock:
                 time.sleep(1)
                 continue
             self.redis_conn.expire(self.lock_key, expire + 5)  # 延长5秒
-            time.sleep(expire)  # 临过期5秒前,再次延长
+            time.sleep(5)  # 临过期5秒前,再次延长
             spend_time += expire
             if self.lock_timeout and spend_time > self.lock_timeout:
                 log.info("锁超时,释放")

+ 4 - 1
FworkSpider/feapder/utils/tools.py

@@ -7,6 +7,7 @@ Created on 2018-09-06 14:21
 @author: Boris
 @email: boris_liu@foxmail.com
 """
+print('123木头人')
 import asyncio
 import calendar
 import codecs
@@ -47,6 +48,7 @@ from w3lib.url import canonicalize_url as _canonicalize_url
 import feapder.setting as setting
 from feapder.utils.email_sender import EmailSender
 from feapder.utils.log import log
+
 os.environ["EXECJS_RUNTIME"] = "Node"  # 设置使用node执行js
 
 # 全局取消ssl证书验证
@@ -56,7 +58,8 @@ TIME_OUT = 30
 TIMER_TIME = 5
 
 redisdb = None
-
+def ccmu():
+    print('sss')
 
 def get_redisdb():
     global redisdb

+ 13 - 5
FworkSpider/items/spider_item.py

@@ -1,9 +1,8 @@
 from feapder import Item
-from untils.tools import int2long,substitute,text_search
+from untils.tools import int2long,substitute,text_search,CheckPrePareRequest
 import time
 from feapder.utils.log import log
 from feapder.utils.tools import get_current_date
-from crawlab import save_item
 from datetime import datetime
 import os
 from feapder import setting
@@ -63,9 +62,7 @@ class DataBakItem(Item):
             if text_search(self.detail).total == 0:
                 # 无正文内容时,该内容直接标记true, 不在被统计
                 self.sendflag = "true"
-        save_item({"site": self.site, "title": self.title,"error":False,
-                   "spidercode":self.spidercode,"channel":self.channel,
-                   })
+
 
 
 class MgpListItem(Item):
@@ -76,6 +73,7 @@ class MgpListItem(Item):
         self.item = "" # 传过来的参数
         self.parser_name = "" # 处理详情页的爬虫名
         self.date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 当前日期时间
+        self.comeintime = int2long(int(time.time())) # 当前日期时间戳
         self.deal_detail = [] # 定义解析详情页主页内容的解析,detail_get是一个xpath列表,detail_post 则是一段处理代码
         self.create_time = None # 定义解析详情页发布时间的xpath,列表页无发布时间时应用
         self.parse_url = "" # 定义解析详情页主页内容的xpath
@@ -89,10 +87,19 @@ class MgpListItem(Item):
         self.proxies = True # 爬虫报警级 可分9级
         self.files = False # 附件采集配置
         self.error = None
+        self.spidercode = ""
         # self.error_info =
     def pre_to_db(self):
         # 生成入库时间戳(秒级), 定义为long型
         self.author = os.path.basename(os.getcwd())
+        self.spidercode = self.item.get("spidercode")
+
+        if "通知公告" in self.item.get("channel"):
+            CheckPrePareRequest().check_crawl_title(self.item.get("title"))
+        elif "公告公示" in self.item.get("channel"):
+            CheckPrePareRequest().check_crawl_title(self.item.get("title"))
+        #  '''
+
 class ListItem(Item):
     def __init__(self):
         self.spidercode = ""  # 爬虫代码(编辑器爬虫平台定义)
@@ -100,6 +107,7 @@ class ListItem(Item):
         self.channel = ""  # 采集的版块(编辑器爬虫平台定义)
         self.url = ''
         self.count=0
+        self.code=-1
         self.rel_count = 0
 
     def pre_to_db(self):
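
The item hunks add a long-typed `comeintime` at construction and extend `pre_to_db` so `spidercode` is copied from the carried list payload and titles on 通知公告 / 公告公示 channels must pass the crawl-keyword check. A condensed sketch, with `check_title` standing in for `CheckPrePareRequest().check_crawl_title`:

```python
# Condensed sketch of the MgpListItem changes; int2long mirrors untils.tools.int2long,
# check_title stands in for CheckPrePareRequest().check_crawl_title, which raises on a miss.
import os
import time

import bson


def int2long(param: int):
    return bson.int64.Int64(param)


class MgpListItemSketch:
    def __init__(self):
        self.item = {}                                   # payload handed over by the list spider
        self.comeintime = int2long(int(time.time()))     # long-typed ingest timestamp
        self.spidercode = ""

    def pre_to_db(self, check_title):
        self.author = os.path.basename(os.getcwd())
        self.spidercode = self.item.get("spidercode")
        channel = self.item.get("channel", "")
        if "通知公告" in channel or "公告公示" in channel:
            check_title(self.item.get("title"))          # raises when no crawl keyword matches
```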

+ 7 - 46
FworkSpider/mongo_pipeline.py

@@ -9,12 +9,13 @@ Created on 2021-04-18 14:12:21
 """
 from typing import Dict, List, Tuple
 import time
-from feapder.db.mongodb import MongoDB
+# from feapder.db.mongodb import MongoDB
+from feapder.db.redisdb import RedisDB
 from feapder.dedup import Dedup
 from feapder.pipelines import BasePipeline
 from feapder.utils.log import log
 from untils.tools import *
-from crawlab import save_item
+# from crawlab import save_item
 
 
 
@@ -25,7 +26,7 @@ class MongoPipeline(BasePipeline):
     @property
     def to_db(self):
         if not self._to_db:
-            self._to_db = MongoDB()
+            self._to_db = RedisDB()
 
         return self._to_db
 
@@ -40,56 +41,16 @@ class MongoPipeline(BasePipeline):
                  若False,不会将本批数据入到去重库,以便再次入库
         """
         try:
-            print(table)
-            add_count = self.to_db.add_batch(coll_name=table, datas=items)
-            for item in items:
-                dedup = Dedup(Dedup.BloomFilter)
-                dedup.add([item.get("href")])
-                # save_item({'count':item.get("href")})
+            add_count = self.to_db.lpush(table="savemongo:"+table, values=items)
+            # add_count = self.to_db.lpop(table="savemongo:"+table, values=items)
             datas_size = len(items)
             log.info(
                 "共导出 %s 条数据到 %s,  新增 %s条, 重复 %s 条"
-                % (datas_size, table, add_count, datas_size - add_count)
+                % (datas_size, table, len(items), datas_size - len(items))
             )
-            if table == "mgp_list":
-                save_item({"site": "新增/回填", "title": add_count})
 
             return True
         except Exception as e:
             log.exception(e)
             return False
 
-    def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
-        """
-        更新数据
-        Args:
-            table: 表名
-            items: 数据,[{},{},...]
-            update_keys: 更新的字段, 如 ("title", "publish_time")
-
-        Returns: 是否更新成功 True / False
-                 若False,不会将本批数据入到去重库,以便再次入库
-
-        """
-        try:
-            add_count = self.to_db.add_batch(
-                coll_name=table,
-                datas=items,
-                update_columns=update_keys or list(items[0].keys()),
-            )
-            datas_size = len(items)
-            update_count = datas_size - add_count
-            msg = "共导出 %s 条数据到 %s,  新增 %s 条, 更新 %s 条" % (
-                datas_size,
-                table,
-                add_count,
-                update_count,
-            )
-            if update_keys:
-                msg += " 更新字段为 {}".format(update_keys)
-            log.info(msg)
-
-            return True
-        except Exception as e:
-            log.exception(e)
-            return False
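
With this change `save_items` no longer writes to MongoDB at all: items are lpush-ed onto a Redis list named `savemongo:<table>` for an external consumer to drain, which is why the duplicate count in the log line collapses to zero. A trimmed sketch of the new path:

```python
# Trimmed sketch of the Redis-buffered save path shown in the hunk; RedisDB.lpush
# is called exactly as in the diff.
from typing import Dict, List

from feapder.db.redisdb import RedisDB
from feapder.utils.log import log


def save_items(db: RedisDB, table: str, items: List[Dict]) -> bool:
    try:
        db.lpush(table="savemongo:" + table, values=items)   # buffered for a separate writer
        log.info("共导出 %s 条数据到 %s" % (len(items), table))
        return True
    except Exception as e:
        log.exception(e)
        return False
```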

+ 14 - 13
FworkSpider/untils/WebCookiePool.py

@@ -3,13 +3,11 @@ import sys
 import requests
 import re,execjs
 
-
-sys.path.append('C:/Users/topnet/Desktop/crawlab_feader/FworkSpider')
 sys.path.append('/app/spiders/sword_feapder/FworkSpider')
 # from utils.cookie_pool import PageCookiePool
-from feapder.utils.webdriver import WebDriverPool
+from feapder.utils.webdriver import WebDriver
 from feapder.utils.log import log
-from FworkSpider.untils.cookie_pool import PageCookiePool
+from untils.cookie_pool import PageCookiePool
 
 class WebCookiePool(PageCookiePool):
     def __init__(self, redis_key, page_url=None,cookie_key=None,
@@ -21,30 +19,33 @@ class WebCookiePool(PageCookiePool):
         self._kwargs = kwargs
         self._kwargs.setdefault("load_images", False)
         self._kwargs.setdefault("headless", True)
-        self._kwargs.setdefault("executable_path", "D:\\geckodriver.exe")
         self._kwargs.setdefault("driver_type", "FIREFOX")
 
-
     def create_cookie(self):
-        with WebDriverPool(**self._kwargs).get() as driver_pool:
-            # driver = driver_pool.driver
-            driver_pool.get(self.page_url)
+        with WebDriver(**self._kwargs) as driver_pool:
             import time
+            # time.sleep(1111)
             try:
+                # driver_pool = self.driver_pool.get()
+                driver_pool.get(self.page_url)
                 count = 0
                 while self.cookie_key not in driver_pool.cookies.keys():
                     time.sleep(1)
                     count+=1
                     if count>=30:
-                        # driver_pool.close()
                         return
                 cookies = driver_pool.cookies
-                # driver_pool.close()
                 return cookies
-                # driver_pool.close()
             except Exception as e:
                 log.error(f"获取cookie失败,{e}")
 
 
 if __name__ == '__main__':
-    WebCookiePool(redis_key='gdcookie',cookie_key='SUB',page_url="https://weibo.com/p/1005051203448454/home?from=page_100505_profile&wvr=6&mod=data&is_all=1#place").create_cookie()
+    for i in range(10):
+        print(f'开始第{i+1}次获取cookie')
+        if i%3==0:
+            WebCookiePool(redis_key='gdcookie',cookie_key='SUB',page_url="https://weibo.com/p/1005051203448454/home?from=page_100505_profile&wvr=6&mod=data&is_all=1#place").create_cookie()
+        elif i%3==1:
+            WebCookiePool(redis_key='gd2cookie',cookie_key='locale',page_url="https://www.jianshu.com/p/4c5bc85fc3fd").create_cookie()
+        else:
+            WebCookiePool(redis_key='gd3cookie',cookie_key='cna',page_url="https://docs-next.crawlab.cn/zh/guide/installation/docker.html#%E5%A4%96%E9%83%A8-mongodb").create_cookie()
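
`create_cookie` now drives a single `WebDriver` context manager instead of a `WebDriverPool`, polling the browser's cookies once a second for up to 30 seconds until `cookie_key` appears. A trimmed sketch of that loop:

```python
# Trimmed sketch of the cookie-harvesting loop; WebDriver and its kwargs are the
# ones used in the hunk, the timeout of 30 matches the count >= 30 check.
import time

from feapder.utils.webdriver import WebDriver


def create_cookie(page_url: str, cookie_key: str, timeout: int = 30):
    with WebDriver(load_images=False, headless=True, driver_type="FIREFOX") as driver:
        driver.get(page_url)
        for _ in range(timeout):
            if cookie_key in driver.cookies.keys():
                return driver.cookies        # cookie dict once the target key shows up
            time.sleep(1)
    return None                              # target cookie never appeared
```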

+ 0 - 22
FworkSpider/untils/__init__.py

@@ -1,22 +0,0 @@
-
-
-
-'''
-    时间一晃而过,转眼间两已经入职近三个月,我有幸来到公司剑雨产品部工作,在这短暂的三个月中,在公司领导的亲切关怀和指导下,在同事们的热情帮助下我很快的熟悉了公司环境,
-适应了新的工作岗位,现将我试用期的工作情况简要小结如下
-    一、严格遵守公司各项规章制度。上班开始,我认真学习了公司《员工手册》及各项管理制度,并严格遵守,做到了无违规现象。
-    二、主动学习、尽快适应,迅速熟悉环境和工作内容。首先从尽快熟悉工作环境和工作内容;其次,主动、虚心向主管、同事请教、学习,基本掌握了日常上班的工作内容,工作流程、工作方法。
-    三、工作积极、认真、负责,通过不断学习、虚心请教,总结积累,较好的完成了领导安排的各项工作任务。
-        1、开发爬虫管理平台
-        2、搭建定制爬虫框架,开发通用模块、伪代码生成器,以达到提升开发效率的目标
-        3、实现管理平台的线上部署与基础测试,目前已部署爬虫15个,且正常运行中
-        4、编写发文档、在小组内进行相关人员的培训,让小组的人一起来对这个框架和管理平台进行测评
-        5、日常数据采集,目前开发共三十个平台爬虫,涉及一百多个栏目,数据采集量达二十多万
-    四、与同事之间和谐相处、加强沟通、团结协作,以尽快更好的融入团队。
-    五、存在问题及解决办法:
-        1、与同事间的沟通交流较少,以后要加强同事间的沟通交流
-        2、js反爬比较能力不够强,以后多学习js相关知识,提高js反爬能力
-        3、逻辑不够严谨,仔细仔细再仔细,
-
-
-'''

+ 60 - 80
FworkSpider/untils/attachment.py

@@ -7,8 +7,6 @@ from urllib.parse import urlparse, unquote
 
 import requests
 import urllib3
-import sys
-sys.path.append('C:/Users/topnet/Desktop/crawlab_feader/FworkSpider')
 
 from feapder.setting import headers
 from untils.execptions import AttachmentNullError
@@ -18,33 +16,26 @@ from untils.proxy_pool import ProxyPool
 urllib3.disable_warnings()
 
 
-def sha1(val):
-    _sha1 = hashlib.sha1()
+def hex_sha1(val):
+    sha1 = hashlib.sha1()
     if isinstance(val, bytes):
-        _sha1.update(str(val).encode("utf-8"))
+        sha1.update(str(val).encode("utf-8"))
     elif isinstance(val, str):
-        _sha1.update(val.encode("utf-8"))
-    return _sha1.hexdigest()
+        sha1.update(val.encode("utf-8"))
+    res = sha1.hexdigest()
+    return res
 
 
-def remove(file_path: str):
-    os.remove(file_path)
-
-
-def getsize(file_path: str):
-    try:
-        return os.path.getsize(file_path)
-    except FileNotFoundError:
-        return 0
-
+def extract_file_type(text):
+    if text is None:
+        return None
 
-def discern_file_format(text):
     file_types = {
-        'pdf', 'doc', 'docx', 'rar', 'zip', 'gzzb', 'jpg', 'png', 'swf'
+        'pdf', 'doc', 'docx', 'rar', 'zip', 'gzzb', 'jpg', 'png'
     }
     for file_type in file_types:
-        all_file_format = [file_type, file_type.upper()]
-        for t in all_file_format:
+        tmp = [file_type, file_type.upper()]
+        for t in tmp:
             result = re.match(f'.*{t}$', text, re.S)
             if result is not None:
                 return t
@@ -52,14 +43,7 @@ def discern_file_format(text):
         return None
 
 
-def extract_file_type(text):
-    if text is None:
-        return None
-    return discern_file_format(text)
-
-
-def extract_file_name_by_href(href: str, file_type: str):
-    """从url中抽取文件名称"""
+def extract_file_name(href: str, file_type: str):
     # 中文标点符号:[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]
     # 中文字符:[\u4e00 -\u9fa5]
     zh_char_pattern = '[\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b\u4e00-\u9fa5]+'
@@ -74,56 +58,29 @@ def extract_file_name_by_href(href: str, file_type: str):
     return None
 
 
-def extract_file_name(text):
-    file_type = discern_file_format(text)
-    if file_type is not None:
-        repl = '.{}'.format(file_type)
-        text = text.replace(repl, '')
-    return text
-
-
 def verify_file_name(name):
     if extract_file_type(name) is None:
         raise ValueError
 
 
-class AttachmentNullError(Exception):
-
-    def __init__(self, code: int = 10004, reason: str = '附件下载异常', **kwargs):
-        self.code = code
-        self.reason = reason
-        self.err_details = kwargs
-        for key, val in kwargs.items():
-            setattr(self, key, val)
-
-
 class AttachmentDownloader:
 
     def __init__(self):
-        self.dir_name = 'file'
+        self.dir_name = '/file'
+
+    def create_dir(self):
+        if not os.path.exists(self.dir_name):
+            os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
 
-    def get_file_path(self, filename, file_type):
-        os.makedirs(self.dir_name, mode=0o777, exist_ok=True)
-        sha1_name = sha1("{}_{}".format(filename, uuid.uuid4()))
-        tmp_name = "{}.{}".format(sha1_name, file_type)
+    def create_file_path(self, filename, file_type):
+        self.create_dir()
+        sign = hex_sha1("{}_{}".format(filename, uuid.uuid4()))
+        tmp_name = "{}.{}".format(sign, file_type)
         return "{}/{}".format(self.dir_name, tmp_name)
 
     @staticmethod
     def create_fid(file_stream: bytes):
-        return sha1(file_stream)
-
-    @staticmethod
-    def file_size(file_path: str):
-        _kb = float(getsize(file_path)) / 1024
-        if _kb >= 1024:
-            _M = _kb / 1024
-            if _M >= 1024:
-                _G = _M / 1024
-                return "{:.1f} G".format(_G)
-            else:
-                return "{:.1f} M".format(_M)
-        else:
-            return "{:.1f} kb".format(_kb)
+        return hex_sha1(file_stream)
 
     @staticmethod
     def _fetch_attachment(
@@ -162,6 +119,29 @@ class AttachmentDownloader:
                 retries += 1
         return b''
 
+    @staticmethod
+    def clean_attachment(file_path):
+        os.remove(file_path)
+
+    @staticmethod
+    def getsize(file_path: str):
+        def _getsize(filename):
+            try:
+                return os.path.getsize(filename)
+            except:
+                return 0
+
+        _kb = float(_getsize(file_path)) / 1024
+        if _kb >= 1024:
+            _M = _kb / 1024
+            if _M >= 1024:
+                _G = _M / 1024
+                return "{:.1f} G".format(_G)
+            else:
+                return "{:.1f} M".format(_M)
+        else:
+            return "{:.1f} kb".format(_kb)
+
     def fetch_attachment(
             self,
             file_name: str,
@@ -174,7 +154,7 @@ class AttachmentDownloader:
         if not file_name or not file_type or not download_url:
             raise AttachmentNullError
 
-        file_path = self.get_file_path(file_name, file_type)
+        file_path = self.create_file_path(file_name, file_type)
         file_stream = self._fetch_attachment(
             download_url,
             file_path,
@@ -184,35 +164,35 @@ class AttachmentDownloader:
         )
         if len(file_stream) > 0:
             fid = self.create_fid(file_stream)
-            '''上传/下载,无论失败/成功最终返回附件信息'''
+            '''上传/下载,无论失败成功都需要给出文件基础信息'''
             try:
                 result = {
-                    'filename': '{}.{}'.format(file_name, file_type),
+                    'filename': file_name,
                     'ftype': file_type,
                     'fid': "{}.{}".format(fid, file_type),
                     'org_url': download_url,
-                    'size': self.file_size(file_path),
+                    'size': self.getsize(file_path),
                     'url': 'oss',
                 }
                 AliYunService().push_oss_from_local(result['fid'], file_path)
             except Exception:
                 result = {
-                    'filename': '{}.{}'.format(file_name, file_type),
+                    'filename': file_name,
                     'org_url': download_url,
                 }
+            self.clean_attachment(file_path)
         else:
             result = {
-                'filename': '{}.{}'.format(file_name, file_type),
+                'filename': file_name,
                 'org_url': download_url,
             }
-        remove(file_path)
         return result
 
 
-if __name__ == '__main__':
-    a = AttachmentDownloader().fetch_attachment(
-        file_name='成建制移民村(五标段)合同',
-        file_type='pdf',
-        download_url='http://222.75.70.90/NXGPPSP_MG/downloadFileServlet?req=F&num=8b80b23f7e729b88017e758a1b03422c'
-    )
-    print(a)
+# if __name__ == '__main__':
+    # a = AttachmentDownloader().fetch_attachment(
+    #     file_name='成建制移民村(五标段)合同',
+    #     file_type='pdf',
+    #     download_url='http://222.75.70.90/NXGPPSP_MG/downloadFileServlet?req=F&num=8b80b23f7e729b88017e758a1b03422c'
+    # )
+    # print(a)
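
The renamed helpers make the temp-file and OSS naming explicit: the on-disk name is `sha1(filename + uuid4)` plus the extension, while the stored `fid` is the sha1 of the downloaded bytes, so identical files map to the same OSS object name. A small sketch of just that naming scheme:

```python
# Naming-scheme sketch matching hex_sha1 / create_file_path / create_fid above.
import hashlib
import uuid


def hex_sha1(val):
    sha1 = hashlib.sha1()
    if isinstance(val, bytes):
        sha1.update(str(val).encode("utf-8"))
    elif isinstance(val, str):
        sha1.update(val.encode("utf-8"))
    return sha1.hexdigest()


def create_file_path(dir_name: str, filename: str, file_type: str) -> str:
    sign = hex_sha1("{}_{}".format(filename, uuid.uuid4()))   # collision-safe temp name
    return "{}/{}.{}".format(dir_name, sign, file_type)


def create_fid(file_stream: bytes) -> str:
    return hex_sha1(file_stream)                              # content hash used as the OSS id


print(create_file_path("/file", "成建制移民村(五标段)合同", "pdf"))
```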

+ 10 - 3
FworkSpider/untils/cookie_pool.py

@@ -16,13 +16,13 @@ import warnings
 from collections import Iterable
 from enum import Enum, unique
 import requests
+from feapder.db.mongodb import MongoDB
 
 import feapder.utils.tools as tools
 from feapder import setting
 from feapder.network import user_agent
 
 from feapder.db.mysqldb import MysqlDB
-from feapder.db.mongodb import MongoDB
 from feapder.db.redisdb import RedisDB
 from feapder.utils import metrics
 from feapder.utils.log import log
@@ -178,7 +178,7 @@ class PageCookiePool(CookiePoolInterface):
             try:
                 cookie_info = self._redisdb.rpoplpush(self._tab_cookie_pool)
                 if not cookie_info and wait_when_null:
-                    log.info("暂无cookie 生产中...")
+                    log.info("暂无cookie 生产中..."+self._tab_cookie_pool)
                     self._keep_alive = False
                     self._min_cookies = 1
                     with RedisLock(
@@ -291,7 +291,7 @@ class LoginCookiePool(CookiePoolInterface):
             try:
                 user_cookie = self._redisdb.rpoplpush(self._tab_cookie_pool)
                 if not user_cookie and wait_when_null:
-                    log.info("暂无cookie 生产中...")
+                    log.info("暂无cookie 生产中..."+self._tab_cookie_pool)
                     self.login()
                     continue
 
@@ -785,3 +785,10 @@ class LimitTimesUserPool:
         for limit_times_user in self.limit_times_users:
             if limit_times_user.username == username:
                 limit_times_user.record_user_status(LimitTimesUserStatus.EXCEPTION)
+
+# if __name__ == '__main__':
+#     cookiepool = PageCookiePool(redis_key='fwork:gszfcg',
+#                                 page_url='http://www.ccgp-hubei.gov.cn/notice/cgyxgg/index_1.html',
+#                                 driver_type='FIREFOX',
+#                                 executable_path="D:\\geckodriver.exe")
+#     cookiepool.create_cookie()

+ 0 - 7
FworkSpider/untils/create_menus.py

@@ -1,10 +1,3 @@
-# -*- coding: utf-8 -*-
-"""
-Created on 2021-12-13 10:04:03
----------
-@summary:  快捷创建meaus配置
-
-"""
 from feapder.db.mongodb import MongoDB
 
 

+ 69 - 0
FworkSpider/untils/tools.py

@@ -6,6 +6,7 @@ import requests
 from setting import WECHAT_WARNING_URL,WECHAT_WARNING_PHONE,WARNING_INTERVAL,WECHAT_WARNING_ALL
 import bson
 from feapder.utils.log import log
+from feapder.db.mongodb import MongoDB
 
 
 SearchText = namedtuple('SearchText', ['total'])
@@ -116,6 +117,14 @@ def int2long(param: int):
     """int 转换成 long """
     return bson.int64.Int64(param)
 
+def get_spiders(menus):
+    db = MongoDB(db="editor")
+    for menu in menus:
+        spider_info = db.find('luaconfig',{"code":menu.code})
+        if len(spider_info) >0:
+            if spider_info[0].get("state") not in (11,):
+                menus.remove(menu)
+
 def wechat_warning(
     message,
     message_prefix=None,
@@ -161,3 +170,63 @@ def wechat_warning(
     except Exception as e:
         log.error("报警发送失败。 报警内容 {}, error: {}".format(message, e))
         return False
+
+class JyBasicException(Exception):
+
+    def __init__(self, code: int, reason: str, **kwargs):
+        self.code = code
+        self.reason = reason
+        self.err_details = kwargs
+        for key, val in kwargs.items():
+            setattr(self, key, val)
+
+class CustomCheckError(JyBasicException):
+
+    def __init__(self, code: int = 10002, reason: str = '特征条件检查异常', **kwargs):
+        self.code = code
+        self.reason = reason
+        self.err_details = kwargs
+        for key, val in kwargs.items():
+            setattr(self, key, val)
+class CheckPrePareRequest:
+
+    def __init__(self):
+        self.crawl_keywords = {
+            '招标', '流标', '评标', '询价', '中标候选人', '抽签', '谈判', '中选', '意见征询',
+            '更正公告', '废标', '补遗', '议价', '邀请', '资格预审', '竞标', '变更', '遴选',
+            '磋商', '项目', '评审', '询比', '开标', '澄清', '比选', '中止', '采购', '竟价',
+            '招投标', '拟建', '成交', '中标', '竞争性谈判', '工程', '验收公告', '更正',
+            '单一来源', '变更公告', '合同', '违规', '评判', '监理', '竞价', '答疑',
+            '终止', '系统'
+        }
+
+    @staticmethod
+    def check_es_cache(title: str, publish_time: int, rows: dict):
+        """
+
+        :param title:  标题
+        :param publish_time: 发布时间的时间戳(l_np_publishtime)
+        :param rows: 采集内容
+        """
+        pass
+        # retrieved_result = es_query(title, publish_time)
+        # if retrieved_result != 0:
+        #     '''es查询数据结果'''
+        #     rows['count'] = retrieved_result
+        #     raise CustomCheckError(code=10105, reason='标题内容已存在es')
+
+    def check_crawl_title(self, title: str):
+        for keyword in self.crawl_keywords:
+            valid_keyword = re.search(keyword, title)
+            if valid_keyword is not None:
+                break
+        else:
+            raise CustomCheckError(code=10106, reason='标题未检索到采集关键词', title=title)
+
+    def __check(self, rows: dict):
+        title, publish_time = rows['title'], rows['l_np_publishtime']
+        self.check_crawl_title(title)
+        self.check_es_cache(title, publish_time, rows)
+
+    def __call__(self, rows: dict, *args, **kwargs):
+        self.__check(rows)
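
A short usage sketch for the new `CheckPrePareRequest`: calling the instance with a row runs the title keyword check (the es cache check is currently a stub), and a title without any crawl keyword raises `CustomCheckError(code=10106)`. The sample titles below are made up.

```python
# Usage sketch for CheckPrePareRequest / CustomCheckError added above.
from untils.tools import CheckPrePareRequest, CustomCheckError

checker = CheckPrePareRequest()

checker({"title": "某某项目招标公告", "l_np_publishtime": 1653299700})   # passes: 项目/招标 match

try:
    checker({"title": "与关键词无关的内容", "l_np_publishtime": 1653299700})
except CustomCheckError as e:
    print(e.code, e.reason)   # 10106 标题未检索到采集关键词
```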