在上一篇文章:Scrapy源码分析(二)运行入口,我们主要剖析了 Scrapy 是如何运行起来的核心逻辑,也就是在真正执行抓取任务之前,Scrapy 都做了哪些工作。
这篇文章,我们就来进一步剖析一下,Scrapy 有哪些核心组件?以及它们主要负责了哪些工作?这些组件为了完成这些功能,内部又是如何实现的。
爬虫类
我们接着上一篇结束的地方开始讲起。上次讲到 Scrapy 运行起来后,执行到最后到了 Crawler
的 crawl
方法,我们来看这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @defer.inlineCallbacks def crawl(self, *args, **kwargs): assert not self.crawling, "Crawling already taking place" self.crawling = True try: self.spider = self._create_spider(*args, **kwargs) self.engine = self._create_engine() start_requests = iter(self.spider.start_requests()) yield self.engine.open_spider(self.spider, start_requests) yield defer.maybeDeferred(self.engine.start) except Exception: if six.PY2: exc_info = sys.exc_info() self.crawling = False if self.engine is not None: yield self.engine.close() if six.PY2: six.reraise(*exc_info) raise
|
执行到这里,我们看到首先创建了爬虫实例,然后创建了引擎,最后把爬虫交给引擎来处理了。
在上一篇文章我们也讲到,在 Crawler
实例化时,会创建 SpiderLoader
,它会根据我们定义的配置文件 settings.py
找到存放爬虫的位置,我们写的爬虫代码都在这里。
然后 SpiderLoader
会扫描这些代码文件,并找到父类是 scrapy.Spider
爬虫类,然后根据爬虫类中的 name
属性(在编写爬虫时,这个属性是必填的),生成一个 {spider_name: spider_cls}
的字典,最后根据 scrapy crawl <spider_name>
命令中的 spider_name
找到我们写的爬虫类,然后实例化它,在这里就是调用了_create_spider
方法:
1 2 3
| def _create_spider(self, *args, **kwargs): return self.spidercls.from_crawler(self, *args, **kwargs)
|
实例化爬虫比较有意思,它不是通过普通的构造方法进行初始化,而是调用了类方法 from_crawler
进行的初始化,找到 scrapy.Spider
类:
1 2 3 4 5 6 7 8 9 10 11
| @classmethod def from_crawler(cls, crawler, *args, **kwargs): spider = cls(*args, **kwargs) spider._set_crawler(crawler) return spider def _set_crawler(self, crawler): self.crawler = crawler self.settings = crawler.settings crawler.signals.connect(self.close, signals.spider_closed)
|
在这里我们可以看到,这个类方法其实也是调用了构造方法,进行实例化,同时也拿到了 settings
配置,来看构造方法干了些什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class Spider(object_ref): name = None custom_settings = None
def __init__(self, name=None, **kwargs): if name is not None: self.name = name elif not getattr(self, 'name', None): raise ValueError("%s must have a name" % type(self).__name__) self.__dict__.update(kwargs) if not hasattr(self, 'start_urls'): self.start_urls = []
|
看到这里是不是很熟悉?这里就是我们平时编写爬虫类时,最常用的几个属性:name
、start_urls
、custom_settings
:
name
:在运行爬虫时通过它找到我们编写的爬虫类;
start_urls
:抓取入口,也可以叫做种子URL;
custom_settings
:爬虫自定义配置,会覆盖配置文件中的配置项;
引擎
分析完爬虫类的初始化后,还是回到 Crawler
的 crawl
方法,紧接着就是创建引擎对象,也就是 _create_engine
方法,看看初始化时都发生了什么?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class ExecutionEngine(object): """引擎""" def __init__(self, crawler, spider_closed_callback): self.crawler = crawler self.settings = crawler.settings self.signals = crawler.signals self.logformatter = crawler.logformatter self.slot = None self.spider = None self.running = False self.paused = False self.scheduler_cls = load_object(self.settings['SCHEDULER']) downloader_cls = load_object(self.settings['DOWNLOADER']) self.downloader = downloader_cls(crawler) self.scraper = Scraper(crawler) self._spider_closed_callback = spider_closed_callback
|
在这里我们能看到,主要是对其他几个核心组件进行定义和初始化,主要包括包括:Scheduler
、Downloader
、Scrapyer
,其中 Scheduler
只进行了类定义,没有实例化。
也就是说,引擎是整个 Scrapy 的核心大脑,它负责管理和调度这些组件,让这些组件更好地协调工作。
下面我们依次来看这几个核心组件都是如何初始化的?
调度器
调度器初始化发生在引擎的 open_spider
方法中,我们提前来看一下调度器的初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class Scheduler(object): """调度器""" def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None, logunser=False, stats=None, pqclass=None): self.df = dupefilter self.dqdir = self._dqdir(jobdir) self.pqclass = pqclass self.dqclass = dqclass self.mqclass = mqclass self.logunser = logunser self.stats = stats @classmethod def from_crawler(cls, crawler): settings = crawler.settings dupefilter_cls = load_object(settings['DUPEFILTER_CLASS']) dupefilter = dupefilter_cls.from_settings(settings) pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE']) dqclass = load_object(settings['SCHEDULER_DISK_QUEUE']) mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE']) logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS', settings.getbool('SCHEDULER_DEBUG')) return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser, stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass)
|
可以看到,调度器的初始化主要做了 2 件事:
- 实例化请求指纹过滤器:主要用来过滤重复请求;
- 定义不同类型的任务队列:优先级任务队列、基于磁盘的任务队列、基于内存的任务队列;
请求指纹过滤器又是什么?
在配置文件中,我们可以看到定义的默认指纹过滤器是 RFPDupeFilter
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class RFPDupeFilter(BaseDupeFilter): """请求指纹过滤器""" def __init__(self, path=None, debug=False): self.file = None self.fingerprints = set() self.logdupes = True self.debug = debug self.logger = logging.getLogger(__name__) if path: self.file = open(os.path.join(path, 'requests.seen'), 'a+') self.file.seek(0) self.fingerprints.update(x.rstrip() for x in self.file)
@classmethod def from_settings(cls, settings): debug = settings.getbool('DUPEFILTER_DEBUG') return cls(job_dir(settings), debug)
|
请求指纹过滤器初始化时,定义了指纹集合,这个集合使用内存实现的 Set
,而且可以控制这些指纹是否存入磁盘以供下次重复使用。
也就是说,指纹过滤器的主要职责是:过滤重复请求,可自定义过滤规则。
在下篇文章中我们会介绍到,每个请求是根据什么规则生成指纹的,然后是又如何实现重复请求过滤逻辑的,这里我们先知道它的功能即可。
下面来看调度器定义的任务队列都有什么作用?
调度器默认定义了 2 种队列类型:
- 基于磁盘的任务队列:在配置文件可配置存储路径,每次执行后会把队列任务保存到磁盘上;
- 基于内存的任务队列:每次都在内存中执行,下次启动则消失;
配置文件默认定义如下:
1 2 3 4 5 6
| SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'
SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'
SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue'
|
如果我们在配置文件中定义了 JOBDIR
配置项,那么每次执行爬虫时,都会把任务队列保存在磁盘中,下次启动爬虫时可以重新加载继续执行我们的任务。
如果没有定义这个配置项,那么默认使用的是内存队列。
细心的你可能会发现,默认定义的这些队列结构都是后进先出的,什么意思呢?
也就是在运行我们的爬虫代码时,如果生成一个抓取任务,放入到任务队列中,那么下次抓取就会从任务队列中先获取到这个任务,优先执行。
这么实现意味什么呢?其实意味着:Scrapy 默认的采集规则是深度优先!
如何改变这种机制,变为广度优先采集呢?这时候我们就要看一下 scrapy.squeues
模块了,在这里定义了很多种队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| PickleFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \ _pickle_serialize, pickle.loads)
PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \ _pickle_serialize, pickle.loads)
MarshalFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \ marshal.dumps, marshal.loads)
MarshalLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \ marshal.dumps, marshal.loads)
FifoMemoryQueue = queue.FifoMemoryQueue
LifoMemoryQueue = queue.LifoMemoryQueue
|
如果我们想把抓取任务改为广度优先,我们只需要在配置文件中把队列类修改为先进先出队列类就可以了!从这里我们也可以看出,Scrapy 各个组件之间的耦合性非常低,每个模块都是可自定义的。
如果你想探究这些队列是如何实现的,可以参考 Scrapy 作者写的 scrapy/queuelib 项目,在 Github 上就可以找到,在这里有这些队列的具体实现。
下载器
回到引擎的初始化的地方,接下来我们来看,下载器是如何初始化的。
在默认的配置文件 default_settings.py
中,下载器配置如下:
1
| DOWNLOADER = 'scrapy.core.downloader.Downloader'
|
我们来看 Downloader
类的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class Downloader(object): """下载器""" def __init__(self, crawler): self.settings = crawler.settings self.signals = crawler.signals self.slots = {} self.active = set() self.handlers = DownloadHandlers(crawler) self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS') self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN') self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP') self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY') self.middleware = DownloaderMiddlewareManager.from_crawler(crawler) self._slot_gc_loop = task.LoopingCall(self._slot_gc) self._slot_gc_loop.start(60)
|
在这个过程中,主要是初始化了下载处理器、下载器中间件管理器以及从配置文件中拿到抓取请求控制的相关参数。
那么下载处理器是做什么的?下载器中间件又负责哪些工作?
先来看 DownloadHandlers
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class DownloadHandlers(object): """下载器处理器""" def __init__(self, crawler): self._crawler = crawler self._schemes = {} self._handlers = {} self._notconfigured = {} handlers = without_none_values( crawler.settings.getwithbase('DOWNLOAD_HANDLERS')) for scheme, clspath in six.iteritems(handlers): self._schemes[scheme] = clspath
crawler.signals.connect(self._close, signals.engine_stopped)
|
下载处理器在默认的配置文件中是这样配置的:
1 2 3 4 5 6 7 8 9 10
| DOWNLOAD_HANDLERS = {}
DOWNLOAD_HANDLERS_BASE = { 'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler', 'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler', 'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler', 's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler', 'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler', }
|
看到这里你应该能明白了,下载处理器会根据下载资源的类型,选择对应的下载器去下载资源。其中我们最常用的就是 http
和 https
对应的处理器。
但是请注意,在这里,这些下载器是没有被实例化的,只有在真正发起网络请求时,才会进行初始化,而且只会初始化一次,后面文章会讲到。
下面我们来看下载器中间件 DownloaderMiddlewareManager
初始化过程,同样地,这里又调用了类方法 from_crawler
进行初始化,而且 DownloaderMiddlewareManager
继承了MiddlewareManager
类,来看它在初始化做了哪些工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| class MiddlewareManager(object): """所有中间件的父类,提供中间件公共的方法""" component_name = 'foo middleware' @classmethod def from_crawler(cls, crawler): return cls.from_settings(crawler.settings, crawler) @classmethod def from_settings(cls, settings, crawler=None): mwlist = cls._get_mwlist_from_settings(settings) middlewares = [] enabled = [] for clspath in mwlist: try: mwcls = load_object(clspath) if crawler and hasattr(mwcls, 'from_crawler'): mw = mwcls.from_crawler(crawler) elif hasattr(mwcls, 'from_settings'): mw = mwcls.from_settings(settings) else: mw = mwcls() middlewares.append(mw) enabled.append(clspath) except NotConfigured as e: if e.args: clsname = clspath.split('.')[-1] logger.warning("Disabled %(clsname)s: %(eargs)s", {'clsname': clsname, 'eargs': e.args[0]}, extra={'crawler': crawler})
logger.info("Enabled %(componentname)ss:\n%(enabledlist)s", {'componentname': cls.component_name, 'enabledlist': pprint.pformat(enabled)}, extra={'crawler': crawler}) return cls(*middlewares)
@classmethod def _get_mwlist_from_settings(cls, settings): raise NotImplementedError def __init__(self, *middlewares): self.middlewares = middlewares self.methods = defaultdict(list) for mw in middlewares: self._add_middleware(mw) def _add_middleware(self, mw): if hasattr(mw, 'open_spider'): self.methods['open_spider'].append(mw.open_spider) if hasattr(mw, 'close_spider'): self.methods['close_spider'].insert(0, mw.close_spider)
|
DownloaderMiddlewareManager
实例化过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class DownloaderMiddlewareManager(MiddlewareManager): """下载中间件管理器""" component_name = 'downloader middleware'
@classmethod def _get_mwlist_from_settings(cls, settings): return build_component_list( settings.getwithbase('DOWNLOADER_MIDDLEWARES'))
def _add_middleware(self, mw): if hasattr(mw, 'process_request'): self.methods['process_request'].append(mw.process_request) if hasattr(mw, 'process_response'): self.methods['process_response'].insert(0, mw.process_response) if hasattr(mw, 'process_exception'): self.methods['process_exception'].insert(0, mw.process_exception)
|
下载器中间件管理器继承了 MiddlewareManager
类,然后重写了 _add_middleware
方法,为下载行为定义默认的下载前、下载后、异常时对应的处理方法。
这里我们可以想一下,中间件这么做的好处是什么?
从这里能大概看出,从某个组件流向另一个组件时,会经过一系列中间件,每个中间件都定义了自己的处理流程,相当于一个个管道,输入时可以针对数据进行处理,然后送达到另一个组件,另一个组件处理完逻辑后,又经过这一系列中间件,这些中间件可再针对这个响应结果进行处理,最终输出。
Scraper
下载器实例化完了之后,回到引擎的初始化方法中,然后就是实例化 Scraper
,在Scrapy源码分析(一)架构概览这篇文章中我提到过,这个类没有在架构图中出现,但这个类其实是处于Engine
、Spiders
、Pipeline
之间,是连通这三个组件的桥梁。
我们来看一下它的初始化过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class Scraper(object):
def __init__(self, crawler): self.slot = None self.spidermw = SpiderMiddlewareManager.from_crawler(crawler) itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR']) self.itemproc = itemproc_cls.from_crawler(crawler) self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS') self.crawler = crawler self.signals = crawler.signals self.logformatter = crawler.logformatter
|
Scraper
创建了 SpiderMiddlewareManager
,它的初始化过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class SpiderMiddlewareManager(MiddlewareManager): """爬虫中间件管理器""" component_name = 'spider middleware'
@classmethod def _get_mwlist_from_settings(cls, settings): return build_component_list(settings.getwithbase('SPIDER_MIDDLEWARES'))
def _add_middleware(self, mw): super(SpiderMiddlewareManager, self)._add_middleware(mw) if hasattr(mw, 'process_spider_input'): self.methods['process_spider_input'].append(mw.process_spider_input) if hasattr(mw, 'process_spider_output'): self.methods['process_spider_output'].insert(0, mw.process_spider_output) if hasattr(mw, 'process_spider_exception'): self.methods['process_spider_exception'].insert(0, mw.process_spider_exception) if hasattr(mw, 'process_start_requests'): self.methods['process_start_requests'].insert(0, mw.process_start_requests)
|
爬虫中间件管理器初始化与之前的下载器中间件管理器类似,先是从配置文件中加载了默认的爬虫中间件类,然后依次注册爬虫中间件的一系列流程方法。配置文件中定义的默认的爬虫中间件类如下:
1 2 3 4 5 6 7 8
| SPIDER_MIDDLEWARES_BASE = { 'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50, 'scrapy.spidermiddlewares.offsite.OffsiteMiddleware': 500, 'scrapy.spidermiddlewares.referer.RefererMiddleware': 700, 'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': 800, 'scrapy.spidermiddlewares.depth.DepthMiddleware': 900, }
|
这里解释一下,这些默认的爬虫中间件的职责:
- HttpErrorMiddleware:针对非 200 响应错误进行逻辑处理;
- OffsiteMiddleware:如果Spider中定义了
allowed_domains
,会自动过滤除此之外的域名请求;
- RefererMiddleware:追加
Referer
头信息;
- UrlLengthMiddleware:过滤 URL 长度超过限制的请求;
- DepthMiddleware:过滤超过指定深度的抓取请求;
当然,在这里你也可以定义自己的爬虫中间件,来处理自己所需的逻辑。
爬虫中间件管理器初始化完之后,然后就是 Pipeline
组件的初始化,默认的 Pipeline
组件是 ItemPipelineManager
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class ItemPipelineManager(MiddlewareManager):
component_name = 'item pipeline'
@classmethod def _get_mwlist_from_settings(cls, settings): return build_component_list(settings.getwithbase('ITEM_PIPELINES'))
def _add_middleware(self, pipe): super(ItemPipelineManager, self)._add_middleware(pipe) if hasattr(pipe, 'process_item'): self.methods['process_item'].append(pipe.process_item)
def process_item(self, item, spider): return self._process_chain('process_item', item, spider)
|
我们可以看到 ItemPipelineManager
也是中间件管理器的一个子类,由于它的行为非常类似于中间件,但由于功能较为独立,所以属于核心组件之一。
从 Scraper
的初始化过程我们可以看出,它管理着 Spiders
和 Pipeline
相关的数据交互。
总结
好了,这篇文章我们主要剖析了 Scrapy 涉及到的核心的组件,主要包括:引擎、下载器、调度器、爬虫类、输出处理器,以及它们各自都是如何初始化的,在初始化过程中,它们又包含了哪些子模块来辅助完成这些模块的功能。
这些组件各司其职,相互协调,共同完成爬虫的抓取任务,而且从代码中我们也能发现,每个组件类都是定义在配置文件中的,也就是说我们可以实现自己的逻辑,然后替代这些组件,这样的设计模式也非常值得我们学习。
在下一篇文章中,我会带你剖析 Scrapy 最为核心的处理流程,分析这些组件具体是如何相互协作,完成我们的抓取任务的。
附: