0%

WebMagicInAction1

最近做了一个爬虫项目,经过了 从无框架的httpclient+jsoup,到使用webmagic,再到改造webmagic做定制化使用的三个阶段。接下来,我讲介绍一下我们在这个过程中获得的经验和夸过的一些坑。

蛮荒时代

这里引用webmagic的爬虫框架介绍。大多数爬虫框架的核心模块都是下载模块(Downloader),处理模块(PageProcessor),调度模块(Scheduler),持久化模块(Pipeline)
webmagic核心模块图
在蛮荒时代,我们的认知有限,或者说是迫于团队环境因数有限。我们都是自己造石器。物理机器只有一台。敌方信息有多种,我们都需要爬取。下载模块最开始使用的是HttpClient。需要顺序性爬取的任务,使用mysql数据库中的各种奇葩状态字段来记录。
程序猿A:id1的任务1完成啦,B你快点开工;id2的任务1失败了,B你别查哦
程序猿B满头大汗中。
上述就是我们基于tbschedule来调度爬取任务的过程。哦,对了tbschedule是我们目前使用的最好的分布式调度工具。所以它目前的身份是业务调度中心,爬虫调度器。
处理模块?放宽心啦。我们处理模块还和我们目前的业务代码绑得紧紧的。持久模块?不就是mysql的CURD么?

考虑到2017年7月1日有一个互联网信息安全法的发布(货车帮都被抓走了那么多),虽然没有去恶意偷取别人信息,也没有去搞什么公民信息,但是为了安全。还是决定做一些拆分。爬虫就是爬虫,部署到外网服务器上去。业务是业务,就企业内部享受服务资源就行了。啊,上头开恩,给拨了2台机器了,好开明。

爬虫服务器只负责接受爬取任务请求,然后通过合适的方式回传到内网的数据仓库,哦对了,至此有了数据仓库概念了。因为以前都是落表。要求数据一致性,规整性比较高。所以爬取的数据利用率也非常差。数据仓库就是搭的一台ES服务器。
接下来,调度模块的改造,我们用了一个rabbitmq来作为一个中间信息传递媒介,相当于一个调度器。入口节点的拉取完成后,分析模块将中间结果发送到mq的特定queue中。下一级的下载模块,通过消费端监听触发下载、分析,再到下一级,最后返回给业务服务器。
分离出来的页面处理模块得到了很好的利用,在后期调度器再次重构,下载模块更换过程中几乎都没有什么修改。
后面就是针对下载模块的不断尝试。预设目标中加入了动态页面需要进行获取信息。phantomjs+selenium的配合可以让我们无惧此类页面。但是这两者配合也有坑。
先说说phantomjs,如果设置为不加载图片,那么这个下载的速度,真心很爽。

1
2
3
4
5
PhantomJSOptions pjsOption = new PhantomJSOptions();
PhantomJSDriverService pjsService =
PhantomJSDriverService.CreateDefaultService();
pjsService.LoadImages = false;
PhantomJSDriver driver = new PhantomJSDriver(pjsService, pjsOption);

但是长期使用的实际效果看来,特别容易崩。如果不使用此特性。那么下载速度特别慢。基本上都需要达到主动触发timeout

1
2
3
4
5
6
7
8
9
10
11
12
WebDriverWait webDriverWait = new WebDriverWait(phantomJsServer.getWebDriver(), 5);
try {
phantomJsServer.getWebDriver().get("https://ju.suning.com/");
}
catch (Exception e) {
LOGGER.warn("加载苏宁促销页面超时");
}
try {
webDriverWait.until(ExpectedConditionsMine.presenceOfElementLocated(By.cssSelector("#topCats > ul")));
} catch (Exception e) {
LOGGER.error("等待加载关键元素超时");
}

由此,可以主动缩短其完全加载时间,只需要等待目标元素加载成功即可。速度相对提高,迈过这个坎。
phantomjs配合selenium的使用过程中,还遇到一个问题就是,内存与实例数的暴涨。由于请求到来的不确定性。是否开启新的phantomjs实例没有良好的设计,造成服务器假死。先控制了一个phantomjs实例最多开启50个tab页来控制。然后引入akka actor来控制phantomjs的实例开启,逐渐弱化mq的调度功能,改用actor的route来控制。

啊,作为现代人,蛮荒时代是多么的可怕,怎么还不结束,我都写的郁闷了,可见当时做的有多么郁闷。新加一个爬取元素,需要梳理整个流程。去冲控制?没有。延时?想通了就各自在自己顺眼的地方加一个sleep。统一配置?别想了。

洋务运动

团队走了人又加了人,我还是那个混世大魔王,不愿意让项目腐朽下去。决定自己先一个人摸索好一个目标网站的爬取。webmagic其实很早就接触过,当时只知道这工具是做爬虫的。然后对比了一下其他几个类似的项目,其实都在像scrapy致敬,但是当时的我不懂爬虫。其实这个阶段没有多少参考性,独立的工程,就在自己的开发机器上运行,然后用logstash将爬取好的数据推到线上es中去。这里最主要做的事就是一个人同时做多个网站的爬取,同时将webmagic先推荐给团队的其他同事,觉得好的就一起选择这种模式来爬取,当然中间也遇到很多怀疑的问题,然后自己会的直接回答。不会的就去翻翻源码,看看文档再来回答。

创新实践

团队目标网站有6个大型网站,最开始一个人维护两个。后面有3个网站都开始使用了webmagic框架。期间又加了一台外网机器作为爬虫机器。这个时候,3个爬虫工程要进行整合,脱离靠人来触发爬取的时代了。这个时候就看到了webmagic提供的RedisSpider。作者提供了F58PageProcessor, News163,JokejiModel这三个demo例子。但是实际上同时运行2个不同的spider实例并不会并发的分布式爬取对应的任务,任务仍然还在一个spider上实例上跑得飞起。
通过仔细分析RedisScheduler源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public synchronized Request poll(Task task) {
Jedis jedis = pool.getResource();
try {
String url = jedis.lpop(getQueueKey(task));
if (url == null) {
return null;
}
String key = ITEM_PREFIX + task.getUUID();
String field = DigestUtils.shaHex(url);
byte[] bytes = jedis.hget(key.getBytes(), field.getBytes());
if (bytes != null) {
Request o = JSON.parseObject(new String(bytes), Request.class);
return o;
}
Request request = new Request(url);
return request;
} finally {
pool.returnResource(jedis);
}
}
protected String getQueueKey(Task task) {
return QUEUE_PREFIX + task.getUUID();
}

spider实例(task)必须拥有相同的UUID才能共享redis中的待执行任务队列。同时这个通过这个上面,也遇到了一个redis使用方面的坑。webmagic对于redis的清理只有resetDuplicateCheck。这里的清理只清理掉了RedisScheduler三个key中的set_,还有queue_与item_,其中queue_与set_一样存放的是纯URL,item_中存放的是request对象。因为webmagic为了实现类似scrapy中的yeild效果。这里暂存了所有中间过程的Request。如果你不小心的在Request的extra中使用了比较大的数据。那么redis轻轻松松的膨胀了。

参考scrapy-redis中的scheduler.flush方法。也扩展了一个flush方法,具体请参见
但是这里还不够。因为没有合适的调用时机来调用flush,唯一能调用flush的时机是在爬取开始,或者爬取完成时,但是当我们爬取一个巨量网站时,顺序话要求相对比较高,频繁使用Request的extra属性,item膨胀的十分厉害,实际爬取中,爬取任务大概完成了60%左右时,内存占用2g-3g。如果是一个普通的2c4g云服务器,这太庞大了。于是我们深度定制了一个RedisSpider,在其onSuccess方法中添加

1
2
3
4
5
6
7
8
protected void onSuccess(Request request) {
if (scheduler instanceof MyRedisScheduler) {
((MyRedisScheduler) scheduler).cleanItem(this, request);
}
if (CollectionUtils.isNotEmpty(spiderListeners)) {
spiderListeners.forEach(l->l.onSuccess(request));
}
}

MyRedisScheduler.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* clean target redis item key
* @param task target task
* @param request completed request
*/
public void cleanItem(Task task, Request request) {
Jedis jedis = pool.getResource();
try {
String key = ITEM_PREFIX + task.getUUID();
String field = DigestUtils.sha1Hex(request.getUrl());
jedis.hdel(key.getBytes(), field.getBytes());
} finally {
jedis.close();
}
}

这样item_中只会保留当前待处理或正在处理中的Request对象。Redis得到全身心的放松了。

接下来的改造就不一定适用于所有场景了。我仔细看了Spider中的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public void run() {
checkRunningStat();
initComponent();
logger.info("Spider {} started!",getUUID());
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
} else {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(request);
onSuccess(request);
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

这里的listener监听回调只有每一个Request完成或失败后的onSuccess,onError。但是实际业务场景中,我们真的关心每一条数据的成功与失败么?一般都不会。如果爬取40w的数据量,掉几百条一般都是可以容忍的。在本项目中,真正看起来更加具有实用性的监听应该是一个批次任务完成,或整个实例的完成状态。
所以我们扩展了listener为

1
2
3
4
5
6
public interface RedisSpiderListener extends SpiderListener {

void onFinish(Task task);

void onComplete(Task task);
}

并直接重写了Spider的run方法为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Override
public void run() {
checkRunningStat();
initComponent();
logger.info("Spider {} started!",getUUID());
Jedis jedis = pool.getResource();
while (!Thread.currentThread().isInterrupted()
&& StringUtils.equalsIgnoreCase(jedis.get(getStatusKey()), Integer.toString(STAT_RUNNING))) {
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
if (threadPool.getThreadAlive() == 0) {
onFinish();
}
// wait until new url added
waitNewUrl();
} else {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(request);
onSuccess(request);
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
compareAndSetStat(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
onComplete();
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

扩展现有webmagic的回调。因为是将spider与web服务配合在一起,所以exitWhenComplete配置为false,用的更多的是onFinish。实际效果看起来还不错。
webmagic的挖掘与学习之旅还很长,虽然是一个简单框架。但是看得出作者还是花了很多心思在上面的。感谢!