上一篇文章:Scrapy源码分析(三)核心组件初始化 ,我们已经分析了 Scrapy 核心组件的主要职责,以及它们在初始化时都完成了哪些工作。
这篇文章就让我们来看一下,也是 Scrapy 最核心的抓取流程是如何运行的,它是如何调度各个组件,完成整个抓取工作的。
运行入口 还是回到最初的入口,在Scrapy源码分析(二)运行入口 这篇文章中我们已经详细分析过了,在执行 Scrapy 命令时,主要经过以下几步:
调用 cmdline.py
的 execute
方法
找到对应的 命令实例
解析命令行
构建 CrawlerProcess
实例,调用 crawl
和 start
方法开始抓取
而 crawl
方法最终是调用了 Cralwer
实例的 crawl
,这个方法最终把控制权交给了Engine
,而 start
方法注册好协程池,就开始异步调度执行了。
我们来看 Cralwer
的 crawl
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @defer.inlineCallbacks def crawl (self, *args, **kwargs) : assert not self.crawling, "Crawling already taking place" self.crawling = True try : self.spider = self._create_spider(*args, **kwargs) self.engine = self._create_engine() start_requests = iter(self.spider.start_requests()) yield self.engine.open_spider(self.spider, start_requests) yield defer.maybeDeferred(self.engine.start) except Exception: if six.PY2: exc_info = sys.exc_info() self.crawling = False if self.engine is not None : yield self.engine.close() if six.PY2: six.reraise(*exc_info) raise
这里首先会创建出爬虫实例,然后创建引擎,之后调用了 spider
的 start_requests
方法,这个方法就是我们平时写的最多爬虫类的父类,它在 spiders/__init__.py
中定义:
1 2 3 4 5 6 7 8 def start_requests (self) : for url in self.start_urls: yield self.make_requests_from_url(url) def make_requests_from_url (self, url) : return Request(url, dont_filter=True )
构建请求 通过上面这段代码,我们能看到,平时我们必须要定义的 start_urls
属性,原来就是在这里用来构建 Request
的,来看 Request
的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class Request (object_ref) : def __init__ (self, url, callback=None, method='GET' , headers=None, body=None, cookies=None, meta=None, encoding='utf-8' , priority=0 , dont_filter=False, errback=None) : self._encoding = encoding self.method = str(method).upper() self._set_url(url) self._set_body(body) assert isinstance(priority, int), "Request priority not an integer: %r" % priority self.priority = priority assert callback or not errback, "Cannot use errback without a callback" self.callback = callback self.errback = errback self.cookies = cookies or {} self.headers = Headers(headers or {}, encoding=encoding) self.dont_filter = dont_filter self._meta = dict(meta) if meta else None
Request
对象比较简单,就是封装了请求参数、请求方法、回调以及可附加的属性信息。
当然,你也可以在子类中重写 start_requests
和 make_requests_from_url
这 2 个方法,用来自定义逻辑构建种子请求。
引擎调度 再回到 crawl
方法,构建好种子请求对象后,调用了 engine
的 open_spider
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @defer.inlineCallbacks def open_spider (self, spider, start_requests=() , close_if_idle=True) : assert self.has_capacity(), "No free spider slot when opening %r" % \ spider.name logger.info("Spider opened" , extra={'spider' : spider}) nextcall = CallLaterOnce(self._next_request, spider) scheduler = self.scheduler_cls.from_crawler(self.crawler) start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider) slot = Slot(start_requests, close_if_idle, nextcall, scheduler) self.slot = slot self.spider = spider yield scheduler.open(spider) yield self.scraper.open_spider(spider) self.crawler.stats.open_spider(spider) yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider) slot.nextcall.schedule() slot.heartbeat.start(5 )
在这里首先构建了一个 CallLaterOnce
,之后把 _next_request
方法注册了进去,看此类的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class CallLaterOnce (object) : def __init__ (self, func, *a, **kw) : self._func = func self._a = a self._kw = kw self._call = None def schedule (self, delay=0 ) : if self._call is None : self._call = reactor.callLater(delay, self) def cancel (self) : if self._call: self._call.cancel() def __call__ (self) : self._call = None return self._func(*self._a, **self._kw)
这里封装了循环执行的方法类,并且注册的方法会在 twisted
的 reactor
中异步执行,以后执行只需调用 schedule
,就会注册 self
到 reactor
的 callLater
中,然后它会执行 __call__
方法,最终执行的就是我们注册的方法。
而这里我们注册的方法就是引擎的 _next_request
,也就是说,此方法会循环调度,直到程序退出。
之后调用了爬虫中间件的 process_start_requests
方法,你可以定义多个自己的爬虫中间件,每个类都重写此方法,爬虫在调度之前会分别调用你定义好的爬虫中间件,来处理初始化请求,你可以进行过滤、加工、筛选以及你想做的任何逻辑。
这样做的好处就是,把想做的逻辑拆分成多个中间件,每个中间件功能独立,而且维护起来更加清晰。
调度器 接下来就要开始调度任务了,这里首先调用了 Scheduler
的 open
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def open (self, spider) : self.spider = spider self.mqs = self.pqclass(self._newmq) self.dqs = self._dq() if self.dqdir else None return self.df.open() def _dq (self) : activef = join(self.dqdir, 'active.json' ) if exists(activef): with open(activef) as f: prios = json.load(f) else : prios = () q = self.pqclass(self._newdq, startprios=prios) if q: logger.info("Resuming crawl (%(queuesize)d requests scheduled)" , {'queuesize' : len(q)}, extra={'spider' : self.spider}) return q
在 open
方法中,调度器会实例化出优先级队列,以及根据 dqdir
是否配置,决定是否使用磁盘队列,最后调用了请求指纹过滤器 的 open
方法,这个方法在父类 BaseDupeFilter
中定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class BaseDupeFilter (object) : @classmethod def from_settings (cls, settings) : return cls() def request_seen (self, request) : return False def open (self) : pass def close (self, reason) : pass def log (self, request, spider) : pas
请求过滤器提供了请求过滤的具体实现方式,Scrapy 默认提供了 RFPDupeFilter
过滤器实现过滤重复请求的逻辑,这里先对这个类有个了解,后面会讲具体是如何过滤重复请求的。
Scraper 再之后就调用 Scraper
的 open_spider
方法,在之前的文章中我们提到过,Scraper
类是连接 Engine
、Spider
、Item Pipeline
这 3 个组件的桥梁:
1 2 3 4 5 @defer.inlineCallbacks def open_spider (self, spider) : self.slot = Slot() yield self.itemproc.open_spider(spider)
这里的主要逻辑是 Scraper
调用所有 Pipeline
的 open_spider
方法,如果我们定义了多个 Pipeline
输出类,可以重写 open_spider
完成每个 Pipeline
在输出前的初始化工作。
循环调度 调用了一系列组件的 open
方法后,最后调用了 nextcall.schedule()
开始调度,也就是循环执行在上面注册的 _next_request
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 def _next_request (self, spider) : slot = self.slot if not slot: return if self.paused: return while not self._needs_backout(spider): if not self._next_request_from_scheduler(spider): break if slot.start_requests and not self._needs_backout(spider): try : request = next(slot.start_requests) except StopIteration: slot.start_requests = None except Exception: slot.start_requests = None logger.error('Error while obtaining start requests' , exc_info=True , extra={'spider' : spider}) else : self.crawl(request, spider) if self.spider_is_idle(spider) and slot.close_if_idle: self._spider_idle(spider) def _needs_backout (self, spider) : slot = self.slot return not self.running \ or slot.closing \ or self.downloader.needs_backout() \ or self.scraper.slot.needs_backout() def _next_request_from_scheduler (self, spider) : slot = self.slot request = slot.scheduler.next_request() if not request: return d = self._download(request, spider) d.addBoth(self._handle_downloader_output, request, spider) d.addErrback(lambda f: logger.info('Error while handling downloader output' , exc_info=failure_to_exc_info(f), extra={'spider' : spider})) d.addBoth(lambda _: slot.remove_request(request)) d.addErrback(lambda f: logger.info('Error while removing request from slot' , exc_info=failure_to_exc_info(f), extra={'spider' : spider})) d.addBoth(lambda _: slot.nextcall.schedule()) d.addErrback(lambda f: logger.info('Error while scheduling new request' , exc_info=failure_to_exc_info(f), extra={'spider' : spider})) return d def crawl (self, request, spider) : assert spider in self.open_spiders, \ "Spider %r not opened when crawling: %s" % (spider.name, request) self.schedule(request, spider) self.slot.nextcall.schedule() def schedule (self, request, spider) : self.signals.send_catch_log(signal=signals.request_scheduled, request=request, spider=spider) if not self.slot.scheduler.enqueue_request(request): self.signals.send_catch_log(signal=signals.request_dropped, request=request, spider=spider)
_next_request
方法首先调用 _needs_backout
检查是否需要等待,等待的条件有以下几种情况:
引擎是否主动关闭
Slot是否关闭
下载器在网络下载时是否超过预设参数
Scraper处理输出是否超过预设参数
如果不需要等待,则调用 _next_request_from_scheduler
,此方法从名字上就能看出,主要是从 Schduler
中获取 Request
。
这里要注意,在第一次调用此方法时,Scheduler
中是没有放入任何 Request
的,这里会直接break
出来,执行下面的逻辑,而下面就会调用 crawl
方法,实际是把请求放到 Scheduler
的请求队列,放入队列的过程会经过请求过滤器校验是否重复。
下次再调用 _next_request_from_scheduler
时,就能从 Scheduler
中获取到下载请求,然后执行下载动作。
先来看第一次调度,执行 crawl
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def crawl (self, request, spider) : assert spider in self.open_spiders, \ "Spider %r not opened when crawling: %s" % (spider.name, request) self.schedule(request, spider) self.slot.nextcall.schedule() def schedule (self, request, spider) : self.signals.send_catch_log(signal=signals.request_scheduled, request=request, spider=spider) if not self.slot.scheduler.enqueue_request(request): self.signals.send_catch_log(signal=signals.request_dropped, request=request, spider=spider)
调用引擎的 crawl
实际就是把请求放入 Scheduler
的队列中,下面看请求是如何入队列的。
请求入队 Scheduler
请求入队方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 def enqueue_request (self, request) : if not request.dont_filter and self.df.request_seen(request): self.df.log(request, self.spider) return False dqok = self._dqpush(request) if dqok: self.stats.inc_value('scheduler/enqueued/disk' , spider=self.spider) else : self._mqpush(request) self.stats.inc_value('scheduler/enqueued/memory' , spider=self.spider) self.stats.inc_value('scheduler/enqueued' , spider=self.spider) return True def _dqpush (self, request) : if self.dqs is None : return try : reqd = request_to_dict(request, self.spider) self.dqs.push(reqd, -request.priority) except ValueError as e: if self.logunser: msg = ("Unable to serialize request: %(request)s - reason:" " %(reason)s - no more unserializable requests will be" " logged (stats being collected)" ) logger.warning(msg, {'request' : request, 'reason' : e}, exc_info=True , extra={'spider' : self.spider}) self.logunser = False self.stats.inc_value('scheduler/unserializable' , spider=self.spider) return else : return True def _mqpush (self, request) : self.mqs.push(request, -request.priority)
在上一篇文章时有说到,调度器主要定义了 2 种队列:基于磁盘队列、基于内存队列。
如果在实例化 Scheduler
时候传入 jobdir
,则使用磁盘队列,否则使用内存队列,默认使用内存队列。
指纹过滤 上面说到,在请求入队之前,首先会通过请求指纹过滤器检查请求是否重复,也就是调用了过滤器的 request_seen
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def request_seen (self, request) : fp = self.request_fingerprint(request) if fp in self.fingerprints: return True self.fingerprints.add(fp) if self.file: self.file.write(fp + os.linesep) def request_fingerprint (self, request) : return request_fingerprint(request)
utils.request
的 request_fingerprint
逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def request_fingerprint (request, include_headers=None) : """生成请求指纹""" if include_headers: include_headers = tuple(to_bytes(h.lower()) for h in sorted(include_headers)) cache = _fingerprint_cache.setdefault(request, {}) if include_headers not in cache: fp = hashlib.sha1() fp.update(to_bytes(request.method)) fp.update(to_bytes(canonicalize_url(request.url))) fp.update(request.body or b'' ) if include_headers: for hdr in include_headers: if hdr in request.headers: fp.update(hdr) for v in request.headers.getlist(hdr): fp.update(v) cache[include_headers] = fp.hexdigest() return cache[include_headers]
这个过滤器先是通过 Request
对象生成一个请求指纹,在这里使用 sha1
算法,并记录到指纹集合,每次请求入队前先到这里验证一下指纹集合,如果已存在,则认为请求重复,则不会重复入队列。
不过如果我想不校验重复,也想重复爬取怎么办?看 enqueue_request
的第一行判断,仅需将 Request
实例的 dont_filter
设置为 True
就可以重复抓取此请求,非常灵活。
Scrapy 就是通过此逻辑实现重复请求的过滤,默认情况下,重复请求是不会进行重复抓取的。
下载请求 请求第一次进来后,肯定是不重复的,那么则会正常进入调度器队列。之后下一次调度,再次调用 _next_request_from_scheduler
方法,此时调用调度器的 next_request
方法,就是从调度器队列中取出一个请求,这次就要开始进行网络下载了,也就是调用 _download
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def _download (self, request, spider) : slot = self.slot slot.add_request(request) def _on_success (response) : assert isinstance(response, (Response, Request)) if isinstance(response, Response): response.request = request logkws = self.logformatter.crawled(request, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider' : spider}) self.signals.send_catch_log(signal=signals.response_received, \ response=response, request=request, spider=spider) return response def _on_complete (_) : slot.nextcall.schedule() return _ dwld = self.downloader.fetch(request, spider) dwld.addCallbacks(_on_success) dwld.addBoth(_on_complete) return dwld
在进行网络下载时,调用了 Downloader
的 fetch
:
1 2 3 4 5 6 7 8 9 10 11 def fetch (self, request, spider) : def _deactivate (response) : self.active.remove(request) return response self.active.add(request) dfd = self.middleware.download(self._enqueue_request, request, spider) return dfd.addBoth(_deactivate)
这里调用下载器中间件的 download
,并注册下载成功的回调方法是 _enqueue_request
,来看下载方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 def download (self, download_func, request, spider) : @defer.inlineCallbacks def process_request (request) : for method in self.methods['process_request' ]: response = yield method(request=request, spider=spider) assert response is None or isinstance(response, (Response, Request)), \ 'Middleware %s.process_request must return None, Response or Request, got %s' % \ (six.get_method_self(method).__class__.__name__, response.__class__.__name__) if response: defer.returnValue(response) defer.returnValue((yield download_func(request=request,spider=spider))) @defer.inlineCallbacks def process_response (response) : assert response is not None , 'Received None in process_response' if isinstance(response, Request): defer.returnValue(response) for method in self.methods['process_response' ]: response = yield method(request=request, response=response, spider=spider) assert isinstance(response, (Response, Request)), \ 'Middleware %s.process_response must return Response or Request, got %s' % \ (six.get_method_self(method).__class__.__name__, type(response)) if isinstance(response, Request): defer.returnValue(response) defer.returnValue(response) @defer.inlineCallbacks def process_exception (_failure) : exception = _failure.value for method in self.methods['process_exception' ]: response = yield method(request=request, exception=exception, spider=spider) assert response is None or isinstance(response, (Response, Request)), \ 'Middleware %s.process_exception must return None, Response or Request, got %s' % \ (six.get_method_self(method).__class__.__name__, type(response)) if response: defer.returnValue(response) defer.returnValue(_failure) deferred = mustbe_deferred(process_request, request) deferred.addErrback(process_exception) deferred.addCallback(process_response) return deferred
在下载过程中,首先找到所有定义好的下载器中间件,包括内置定义好的,也可以自己扩展下载器中间件,下载前先依次执行 process_request
,可对 Request
进行加工、处理、校验等操作,然后发起真正的网络下载,也就是第一个参数 download_func
,在这里是 Downloader
的 _enqueue_request
方法:
下载成功后回调 Downloader
的 _enqueue_request
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 def _enqueue_request (self, request, spider) : key, slot = self._get_slot(request, spider) request.meta['download_slot' ] = key def _deactivate (response) : slot.active.remove(request) return response slot.active.add(request) deferred = defer.Deferred().addBoth(_deactivate) slot.queue.append((request, deferred)) self._process_queue(spider, slot) return deferred def _process_queue (self, spider, slot) : if slot.latercall and slot.latercall.active(): return now = time() delay = slot.download_delay() if delay: penalty = delay - now + slot.lastseen if penalty > 0 : slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot) return while slot.queue and slot.free_transfer_slots() > 0 : slot.lastseen = now request, deferred = slot.queue.popleft() dfd = self._download(slot, request, spider) dfd.chainDeferred(deferred) if delay: self._process_queue(spider, slot) break def _download (self, slot, request, spider) : dfd = mustbe_deferred(self.handlers.download_request, request, spider) def _downloaded (response) : self.signals.send_catch_log(signal=signals.response_downloaded, response=response, request=request, spider=spider) return response dfd.addCallback(_downloaded) slot.transferring.add(request) def finish_transferring (_) : slot.transferring.remove(request) self._process_queue(spider, slot) return _ return dfd.addBoth(finish_transferring)
这里也维护了一个下载队列,可根据配置达到延迟下载的要求。真正发起下载请求是调用了 self.handlers.download_request
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 def download_request (self, request, spider) : scheme = urlparse_cached(request).scheme handler = self._get_handler(scheme) if not handler: raise NotSupported("Unsupported URL scheme '%s': %s" % (scheme, self._notconfigured[scheme])) return handler.download_request(request, spider) def _get_handler (self, scheme) : if scheme in self._handlers: return self._handlers[scheme] if scheme in self._notconfigured: return None if scheme not in self._schemes: self._notconfigured[scheme] = 'no handler available for that scheme' return None path = self._schemes[scheme] try : dhcls = load_object(path) dh = dhcls(self._crawler.settings) except NotConfigured as ex: self._notconfigured[scheme] = str(ex) return None except Exception as ex: logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"' , {"clspath" : path, "scheme" : scheme}, exc_info=True , extra={'crawler' : self._crawler}) self._notconfigured[scheme] = str(ex) return None else : self._handlers[scheme] = dh return self._handlers[scheme]
下载前,先通过解析 request
的 scheme
来获取对应的下载处理器,默认配置文件中定义的下载处理器如下:
1 2 3 4 5 6 7 DOWNLOAD_HANDLERS_BASE = { 'file' : 'scrapy.core.downloader.handlers.file.FileDownloadHandler' , 'http' : 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler' , 'https' : 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler' , 's3' : 'scrapy.core.downloader.handlers.s3.S3DownloadHandler' , 'ftp' : 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler' , }
然后调用 download_request
方法,完成网络下载,这里不再详细讲解每个处理器的实现,简单来说,你可以把它想象成封装好的网络下载库,输入URL,它会给你输出下载结果,这样方便理解。
在下载过程中,如果发生异常情况,则会依次调用下载器中间件的 process_exception
方法,每个中间件只需定义自己的异常处理逻辑即可。
如果下载成功,则会依次执行下载器中间件的 process_response
方法,每个中间件可以进一步处理下载后的结果,最终返回。
这里值得提一下,process_request
方法是每个中间件顺序执行的,而 process_response
和 process_exception
方法是每个中间件倒序执行的,具体可看一下 DownaloderMiddlewareManager
的 _add_middleware
方法,就可以明白是如何注册这个方法链的。
拿到最终的下载结果后,再回到 ExecuteEngine
的 _next_request_from_scheduler
中,会看到调用了 _handle_downloader_output
,也就是处理下载结果的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def _handle_downloader_output (self, response, request, spider) : assert isinstance(response, (Request, Response, Failure)), response if isinstance(response, Request): self.crawl(response, spider) return d = self.scraper.enqueue_scrape(response, request, spider) d.addErrback(lambda f: logger.error('Error while enqueuing downloader output' , exc_info=failure_to_exc_info(f), extra={'spider' : spider})) return d
拿到下载结果后,主要分 2 个逻辑:
如果返回的是 Request
实例,则直接再次放入 Scheduler
请求队列
如果返回的是是 Response
或 Failure
实例,则调用 Scraper
的 enqueue_scrape
方法,做进一步处理
处理下载结果 请求入队逻辑不用再说,前面已经讲过。现在主要看 Scraper
的 enqueue_scrape
,看Scraper
组件是如何处理后续逻辑的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 def enqueue_scrape (self, response, request, spider) : slot = self.slot dfd = slot.add_response_request(response, request) def finish_scraping (_) : slot.finish_response(response, request) self._check_if_closing(spider, slot) self._scrape_next(spider, slot) return _ dfd.addBoth(finish_scraping) dfd.addErrback( lambda f: logger.error('Scraper bug processing %(request)s' , {'request' : request}, exc_info=failure_to_exc_info(f), extra={'spider' : spider})) self._scrape_next(spider, slot) return dfd def _scrape_next (self, spider, slot) : while slot.queue: response, request, deferred = slot.next_response_request_deferred() self._scrape(response, request, spider).chainDeferred(deferred) def _scrape (self, response, request, spider) : assert isinstance(response, (Response, Failure)) dfd = self._scrape2(response, request, spider) dfd.addErrback(self.handle_spider_error, request, response, spider) dfd.addCallback(self.handle_spider_output, request, response, spider) return dfd def _scrape2 (self, request_result, request, spider) : if not isinstance(request_result, Failure): return self.spidermw.scrape_response( self.call_spider, request_result, request, spider) else : dfd = self.call_spider(request_result, request, spider) return dfd.addErrback( self._log_download_errors, request_result, request, spider)
首先把请求和响应加入到 Scraper
的处理队列中,然后从队列中获取到任务,如果不是异常结果,则调用爬虫中间件管理器 的 scrape_response
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def scrape_response (self, scrape_func, response, request, spider) : fname = lambda f:'%s.%s' % ( six.get_method_self(f).__class__.__name__, six.get_method_function(f).__name__) def process_spider_input (response) : for method in self.methods['process_spider_input' ]: try : result = method(response=response, spider=spider) assert result is None , \ 'Middleware %s must returns None or ' \ 'raise an exception, got %s ' \ % (fname(method), type(result)) except : return scrape_func(Failure(), request, spider) return scrape_func(response, request, spider) def process_spider_exception (_failure) : exception = _failure.value for method in self.methods['process_spider_exception' ]: result = method(response=response, exception=exception, spider=spider) assert result is None or _isiterable(result), \ 'Middleware %s must returns None, or an iterable object, got %s ' % \ (fname(method), type(result)) if result is not None : return result return _failure def process_spider_output (result) : for method in self.methods['process_spider_output' ]: result = method(response=response, result=result, spider=spider) assert _isiterable(result), \ 'Middleware %s must returns an iterable object, got %s ' % \ (fname(method), type(result)) return result dfd = mustbe_deferred(process_spider_input, response) dfd.addErrback(process_spider_exception) dfd.addCallback(process_spider_output) return dfd
有没有感觉套路很熟悉?与上面下载器中间件调用方式非常相似,也调用一系列的前置方法,再执行真正的处理逻辑,最后执行一系列的后置方法。
回调爬虫 接下来看一下,Scrapy 是如何执行我们写好的爬虫逻辑的,也就是 call_spider
方法,这里回调我们写好的爬虫类:
1 2 3 4 5 6 7 def call_spider (self, result, request, spider) : result.request = request dfd = defer_result(result) dfd.addCallbacks(request.callback or spider.parse, request.errback) return dfd.addCallback(iterate_spider_output)
看到这里,你应该更熟悉,平时我们写的最多的爬虫代码,parse
则是第一个回调方法。之后爬虫类拿到下载结果,就可以定义下载后的 callback
方法,也是在这里进行回调执行的。
处理输出 在与爬虫类交互完成之后,Scraper
调用了 handle_spider_output
方法处理爬虫的输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def handle_spider_output (self, result, request, response, spider) : if not result: return defer_succeed(None ) it = iter_errback(result, self.handle_spider_error, request, response, spider) dfd = parallel(it, self.concurrent_items, self._process_spidermw_output, request, response, spider) return dfd def _process_spidermw_output (self, output, request, response, spider) : if isinstance(output, Request): self.crawler.engine.crawl(request=output, spider=spider) elif isinstance(output, (BaseItem, dict)): self.slot.itemproc_size += 1 dfd = self.itemproc.process_item(output, spider) dfd.addBoth(self._itemproc_finished, output, response, spider) return dfd elif output is None : pass else : typename = type(output).__name__ logger.error('Spider must return Request, BaseItem, dict or None, ' 'got %(typename)r in %(request)s' , {'request' : request, 'typename' : typename}, extra={'spider' : spider})
执行完我们自定义的解析逻辑后,解析方法可返回新的 Request
或 BaseItem
实例。
如果是新的请求,则再次通过 Scheduler
进入请求队列,如果是 BaseItem
实例,则调用 Pipeline
管理器,依次执行 process_item
。我们想输出结果时,只需要定义 Pepeline
类,然后重写这个方法就可以了。
ItemPipeManager
处理逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ItemPipelineManager (MiddlewareManager) : component_name = 'item pipeline' @classmethod def _get_mwlist_from_settings (cls, settings) : return build_component_list(settings.getwithbase('ITEM_PIPELINES' )) def _add_middleware (self, pipe) : super(ItemPipelineManager, self)._add_middleware(pipe) if hasattr(pipe, 'process_item' ): self.methods['process_item' ].append(pipe.process_item) def process_item (self, item, spider) : return self._process_chain('process_item' , item, spider)
可以看到 ItemPipeManager
也是一个中间件,和之前下载器中间件管理器和爬虫中间件管理器类似,如果子类有定义 process_item
,则依次执行它。
执行完之后,调用 _itemproc_finished
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def _itemproc_finished (self, output, item, response, spider) : self.slot.itemproc_size -= 1 if isinstance(output, Failure): ex = output.value if isinstance(ex, DropItem): logkws = self.logformatter.dropped(item, ex, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider' : spider}) return self.signals.send_catch_log_deferred( signal=signals.item_dropped, item=item, response=response, spider=spider, exception=output.value) else : logger.error('Error processing %(item)s' , {'item' : item}, exc_info=failure_to_exc_info(output), extra={'spider' : spider}) else : logkws = self.logformatter.scraped(output, response, spider) logger.log(*logformatter_adapter(logkws), extra={'spider' : spider}) return self.signals.send_catch_log_deferred( signal=signals.item_scraped, item=output, response=response, spider=spider)
这里可以看到,如果想在 Pipeline
中丢弃某个结果,直接抛出 DropItem
异常即可,Scrapy 会进行对应的处理。
到这里,抓取结果会根据自定义的输出类,然后输出到指定位置,而新的 Request
则会再次进入请求队列,等待引擎下一次调度,也就是再次调用 ExecutionEngine
的 _next_request
,直至请求队列没有新的任务,整个程序退出。
CrawlerSpider 以上,基本上整个核心抓取流程就讲完了。
这里再简单说一下 CrawlerSpider
类,我们平时用的也比较多,它其实就是继承了 Spider
类,然后重写了 parse
方法(这也是继承此类不要重写此方法的原因),并结合 Rule
规则类,来完成 Request
的自动提取逻辑。
Scrapy 提供了这个类方便我们更快速地编写爬虫代码,我们也可以基于此类进行再次封装,让我们的爬虫代码写得更简单。
由此我们也可看出,Scrapy 的每个模块的实现都非常纯粹,每个组件都通过配置文件定义连接起来,如果想要扩展或替换,只需定义并实现自己的处理逻辑即可,其他模块均不受任何影响,所以我们也可以看到,业界有非常多的 Scrapy 插件,都是通过此机制来实现的。
总结 这篇文章的代码量较多,也是 Scrapy 最为核心的抓取流程,如果你能把这块逻辑搞清楚了,那对 Scrapy 开发新的插件,或者在它的基础上进行二次开发也非常简单了。
总结一下整个抓取流程,还是用这两张图表示再清楚不过:
Scrapy 整体给我的感觉是,虽然它只是个单机版的爬虫框架,但我们可以非常方便地编写插件,或者自定义组件替换默认的功能,从而定制化我们自己的爬虫,最终可以实现一个功能强大的爬虫框架,例如分布式、代理调度、并发控制、可视化、监控等功能,它的灵活度非常高。
附: