最近做了一个爬虫项目,经过了 从无框架的httpclient+jsoup,到使用webmagic,再到改造webmagic做定制化使用的三个阶段。接下来,我讲介绍一下我们在这个过程中获得的经验和夸过的一些坑。
蛮荒时代
这里引用webmagic的爬虫框架介绍。大多数爬虫框架的核心模块都是下载模块(Downloader),处理模块(PageProcessor),调度模块(Scheduler),持久化模块(Pipeline)

在蛮荒时代,我们的认知有限,或者说是迫于团队环境因数有限。我们都是自己造石器。物理机器只有一台。敌方信息有多种,我们都需要爬取。下载模块最开始使用的是HttpClient。需要顺序性爬取的任务,使用mysql数据库中的各种奇葩状态字段来记录。
程序猿A:id1的任务1完成啦,B你快点开工;id2的任务1失败了,B你别查哦
程序猿B满头大汗中。
上述就是我们基于tbschedule来调度爬取任务的过程。哦,对了tbschedule是我们目前使用的最好的分布式调度工具。所以它目前的身份是业务调度中心,爬虫调度器。
处理模块?放宽心啦。我们处理模块还和我们目前的业务代码绑得紧紧的。持久模块?不就是mysql的CURD么?
考虑到2017年7月1日有一个互联网信息安全法的发布(货车帮都被抓走了那么多),虽然没有去恶意偷取别人信息,也没有去搞什么公民信息,但是为了安全。还是决定做一些拆分。爬虫就是爬虫,部署到外网服务器上去。业务是业务,就企业内部享受服务资源就行了。啊,上头开恩,给拨了2台机器了,好开明。
爬虫服务器只负责接受爬取任务请求,然后通过合适的方式回传到内网的数据仓库,哦对了,至此有了数据仓库概念了。因为以前都是落表。要求数据一致性,规整性比较高。所以爬取的数据利用率也非常差。数据仓库就是搭的一台ES服务器。
接下来,调度模块的改造,我们用了一个rabbitmq来作为一个中间信息传递媒介,相当于一个调度器。入口节点的拉取完成后,分析模块将中间结果发送到mq的特定queue中。下一级的下载模块,通过消费端监听触发下载、分析,再到下一级,最后返回给业务服务器。
分离出来的页面处理模块得到了很好的利用,在后期调度器再次重构,下载模块更换过程中几乎都没有什么修改。
后面就是针对下载模块的不断尝试。预设目标中加入了动态页面需要进行获取信息。phantomjs+selenium的配合可以让我们无惧此类页面。但是这两者配合也有坑。
先说说phantomjs,如果设置为不加载图片,那么这个下载的速度,真心很爽。
1 2 3 4 5
| PhantomJSOptions pjsOption = new PhantomJSOptions(); PhantomJSDriverService pjsService = PhantomJSDriverService.CreateDefaultService(); pjsService.LoadImages = false; PhantomJSDriver driver = new PhantomJSDriver(pjsService, pjsOption);
|
但是长期使用的实际效果看来,特别容易崩。如果不使用此特性。那么下载速度特别慢。基本上都需要达到主动触发timeout
1 2 3 4 5 6 7 8 9 10 11 12
| WebDriverWait webDriverWait = new WebDriverWait(phantomJsServer.getWebDriver(), 5); try { phantomJsServer.getWebDriver().get("https://ju.suning.com/"); } catch (Exception e) { LOGGER.warn("加载苏宁促销页面超时"); } try { webDriverWait.until(ExpectedConditionsMine.presenceOfElementLocated(By.cssSelector("#topCats > ul"))); } catch (Exception e) { LOGGER.error("等待加载关键元素超时"); }
|
由此,可以主动缩短其完全加载时间,只需要等待目标元素加载成功即可。速度相对提高,迈过这个坎。
phantomjs配合selenium的使用过程中,还遇到一个问题就是,内存与实例数的暴涨。由于请求到来的不确定性。是否开启新的phantomjs实例没有良好的设计,造成服务器假死。先控制了一个phantomjs实例最多开启50个tab页来控制。然后引入akka actor来控制phantomjs的实例开启,逐渐弱化mq的调度功能,改用actor的route来控制。
啊,作为现代人,蛮荒时代是多么的可怕,怎么还不结束,我都写的郁闷了,可见当时做的有多么郁闷。新加一个爬取元素,需要梳理整个流程。去冲控制?没有。延时?想通了就各自在自己顺眼的地方加一个sleep。统一配置?别想了。
洋务运动
团队走了人又加了人,我还是那个混世大魔王,不愿意让项目腐朽下去。决定自己先一个人摸索好一个目标网站的爬取。webmagic其实很早就接触过,当时只知道这工具是做爬虫的。然后对比了一下其他几个类似的项目,其实都在像scrapy致敬,但是当时的我不懂爬虫。其实这个阶段没有多少参考性,独立的工程,就在自己的开发机器上运行,然后用logstash将爬取好的数据推到线上es中去。这里最主要做的事就是一个人同时做多个网站的爬取,同时将webmagic先推荐给团队的其他同事,觉得好的就一起选择这种模式来爬取,当然中间也遇到很多怀疑的问题,然后自己会的直接回答。不会的就去翻翻源码,看看文档再来回答。
创新实践
团队目标网站有6个大型网站,最开始一个人维护两个。后面有3个网站都开始使用了webmagic框架。期间又加了一台外网机器作为爬虫机器。这个时候,3个爬虫工程要进行整合,脱离靠人来触发爬取的时代了。这个时候就看到了webmagic提供的RedisSpider。作者提供了F58PageProcessor, News163,JokejiModel这三个demo例子。但是实际上同时运行2个不同的spider实例并不会并发的分布式爬取对应的任务,任务仍然还在一个spider上实例上跑得飞起。
通过仔细分析RedisScheduler源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Override public synchronized Request poll(Task task) { Jedis jedis = pool.getResource(); try { String url = jedis.lpop(getQueueKey(task)); if (url == null) { return null; } String key = ITEM_PREFIX + task.getUUID(); String field = DigestUtils.shaHex(url); byte[] bytes = jedis.hget(key.getBytes(), field.getBytes()); if (bytes != null) { Request o = JSON.parseObject(new String(bytes), Request.class); return o; } Request request = new Request(url); return request; } finally { pool.returnResource(jedis); } } protected String getQueueKey(Task task) { return QUEUE_PREFIX + task.getUUID(); }
|
spider实例(task)必须拥有相同的UUID才能共享redis中的待执行任务队列。同时这个通过这个上面,也遇到了一个redis使用方面的坑。webmagic对于redis的清理只有resetDuplicateCheck
。这里的清理只清理掉了RedisScheduler三个key中的set_,还有queue_与item_,其中queue_与set_一样存放的是纯URL,item_中存放的是request对象。因为webmagic为了实现类似scrapy中的yeild效果。这里暂存了所有中间过程的Request。如果你不小心的在Request的extra中使用了比较大的数据。那么redis轻轻松松的膨胀了。
参考scrapy-redis中的scheduler.flush方法。也扩展了一个flush方法,具体请参见
但是这里还不够。因为没有合适的调用时机来调用flush,唯一能调用flush的时机是在爬取开始,或者爬取完成时,但是当我们爬取一个巨量网站时,顺序话要求相对比较高,频繁使用Request的extra属性,item膨胀的十分厉害,实际爬取中,爬取任务大概完成了60%左右时,内存占用2g-3g。如果是一个普通的2c4g云服务器,这太庞大了。于是我们深度定制了一个RedisSpider,在其onSuccess方法中添加
1 2 3 4 5 6 7 8
| protected void onSuccess(Request request) { if (scheduler instanceof MyRedisScheduler) { ((MyRedisScheduler) scheduler).cleanItem(this, request); } if (CollectionUtils.isNotEmpty(spiderListeners)) { spiderListeners.forEach(l->l.onSuccess(request)); } }
|
MyRedisScheduler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| /** * clean target redis item key * @param task target task * @param request completed request */ public void cleanItem(Task task, Request request) { Jedis jedis = pool.getResource(); try { String key = ITEM_PREFIX + task.getUUID(); String field = DigestUtils.sha1Hex(request.getUrl()); jedis.hdel(key.getBytes(), field.getBytes()); } finally { jedis.close(); } }
|
这样item_中只会保留当前待处理或正在处理中的Request对象。Redis得到全身心的放松了。
接下来的改造就不一定适用于所有场景了。我仔细看了Spider中的run方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Override public void run() { checkRunningStat(); initComponent(); logger.info("Spider {} started!",getUUID()); while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } // wait until new url added waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { processRequest(request); onSuccess(request); } catch (Exception e) { onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); signalNewUrl(); } } }); } } stat.set(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get()); }
|
这里的listener监听回调只有每一个Request完成或失败后的onSuccess,onError。但是实际业务场景中,我们真的关心每一条数据的成功与失败么?一般都不会。如果爬取40w的数据量,掉几百条一般都是可以容忍的。在本项目中,真正看起来更加具有实用性的监听应该是一个批次任务完成,或整个实例的完成状态。
所以我们扩展了listener为
1 2 3 4 5 6
| public interface RedisSpiderListener extends SpiderListener {
void onFinish(Task task);
void onComplete(Task task); }
|
并直接重写了Spider的run方法为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Override public void run() { checkRunningStat(); initComponent(); logger.info("Spider {} started!",getUUID()); Jedis jedis = pool.getResource(); while (!Thread.currentThread().isInterrupted() && StringUtils.equalsIgnoreCase(jedis.get(getStatusKey()), Integer.toString(STAT_RUNNING))) { final Request request = scheduler.poll(this); if (request == null) { if (threadPool.getThreadAlive() == 0 && exitWhenComplete) { break; } if (threadPool.getThreadAlive() == 0) { onFinish(); } // wait until new url added waitNewUrl(); } else { threadPool.execute(new Runnable() { @Override public void run() { try { processRequest(request); onSuccess(request); } catch (Exception e) { onError(request); logger.error("process request " + request + " error", e); } finally { pageCount.incrementAndGet(); signalNewUrl(); } } }); } } compareAndSetStat(STAT_STOPPED); // release some resources if (destroyWhenExit) { close(); } onComplete(); logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get()); }
|
扩展现有webmagic的回调。因为是将spider与web服务配合在一起,所以exitWhenComplete配置为false,用的更多的是onFinish。实际效果看起来还不错。
webmagic的挖掘与学习之旅还很长,虽然是一个简单框架。但是看得出作者还是花了很多心思在上面的。感谢!