2021腾讯云限时秒杀,爆款1核2G云服务器298元/3年!(领取2860元代金券),
地址:https://cloud.tencent.com/act/cps/redirect?redirect=1062
2021阿里云最低价产品入口+领取代金券(老用户3折起),
入口地址:https://www.aliyun.com/minisite/goods
在开始说正事之前我先给大家介绍一下这份代码的背景,以免大家有一种雾里看花的感觉。在本系列的前几篇博客中有一篇是用多线程进行百度图片的抓取,但是当时使用的多线程是非常粗略的,只是开了几个线程让抓取的速度提升了一些(其实提升了很多),初步的使用了一下线程,这篇博客将线程的使用进行了一些深入。
博主最近准备学习分布式网络爬虫,鉴于手头资料太少,所以如果有这方面的爱好者欢迎大家共同交流与学习。
项目背景
博主这次的需求是抓取一些淘宝的数据,在还没有开始学习分布式之前,我们需要掌握基本的并行爬虫的相关知识。在这里我要先吐槽一下《自己动手写网络爬虫》这本书,不得不说,这本书让我认识到了什么叫做:有一本好书,真的会提升很多学习的效率。反正这本书不适合入门,而且非常的老,代码都不能用!!但其中还是有一些值得学习的思想,但。。。不说了,都是泪。。。
本次代码仍有很多不完善的地方,比如并没有用到线程池,然后只是给大家提供一种思路,代码并不完善,博主也在不断学习当中,不得不说Java网络爬虫的学习曲线还是很陡峭的,但我认为需要开发一个好爬虫是需要语言功底很扎实的,所以学习Java爬虫还是很值得的。
代码思想与实现
对于并行爬虫而言,处理空队列要比处理序列爬虫更加复杂,空的队列并不意味着爬虫已经完成了工作,因为此刻其他的进程或线程可能依然在解析网页,并且马上会产生新的URL。进程或者线程管理者需要给报告队列为空的线程发送临时的休眠信号,线程管理员需要不断追踪休眠线程的数目,只有当所有的线程都休眠的时候,爬虫才可以终止。
接下来就看一下具体的代码:
我们假设从Redis数据库中的爬虫队列里取待解析的url。
主线程:
package multithreading;
import java.util.ArrayList;
import java.util.List;
/** * Created by hg_yi on 17-6-13. * * 线程计数器应该是一个共享变量 */
public class MultithreadCrawler{
public static void main(String[] args) throws InterruptedException {
//创建一个收集线程的列表
List<Thread> threadList = new ArrayList<Thread>();
//创建线程的个数
int threadNum = 5;
RunThread run = new RunThread();
run.setThreads(threadNum);
//创建5个线程,并对其进行收集
for (int i = 0; i < threadNum; i++) {
Thread thread = new Thread(run);
thread.start();
threadList.add(thread);
}
//main线程需要等待所有子线程退出
while (threadList.size() > 0) {
Thread child = threadList.remove(0);
child.join();
}
}
}
代码分析:
可以看到,我创建了5个线程,博主也正在深入学习Java线程的相关知识之中,目前水平还是有限的,此博客主要是记录一些多线程爬虫中需要注意的东西。可以看到我使用了一个容器将所有创建的线程进行了收集,然后为了防止主线程提前退出而让所有子线程结束,我告知主线程需要等待每一个子线程执行完毕之后,你主线程才可以结束,也就是最后的while循环做的事情。
然后在for循环中我让每个子线程都执行RunThread类中的run方法,这样操作的目的主要是考虑到了线程之间数据共享的问题。
执行线程:
package multithreading;
import redisqueue.RedisQueue;
import java.util.ArrayList;
import java.util.List;
/** * Created by hg_yi on 17-6-13. * * 我们只保证在给数据库中写入url,还有就是改变线程线程计数器的值的时候,是需要同步的。 * * 很明显线程计数器threads是所有线程共享的。 */
public class RunThread extends Thread {
//线程计数器需要对所有线程可见,是共享变量
int threads = 0;
//redis队列的对象,也是所有对象共享的变量
RedisQueue redisQueue = new RedisQueue();
//创建线程锁
private static Object lock = new Object();
public void setThreads(int threads) {
this.threads = threads;
}
public void parseToVisitUrltoRedis() throws Exception {
//用来保存新提取出来的url列表(此变量不应是共享变量,我们把它变为每个线程的私有变量)
//我们应该知道的是在Java中哪些变量在线程之间是不共享的,参考资料:
//http://www.cnblogs.com/xudong-bupt/archive/2013/05/22/3087864.html
List<String> urlList = new ArrayList<String>();
while (true) {
//从爬虫队列中取出待抓取的url
if (!redisQueue.toVisitIsEmpty()) {
String url = redisQueue.getToVisit();
/** * 对此url进行解析,提取出新的url列表 * 解析出来的url顺便就写进urlList中了 * * 在这个过程中不要求保证同步,每个线程都负责解析自己所属的url,解析完成 * 之后将url写入自己的urlList之中,当在解析过程中发生阻塞,则切换到其他 * 线程,保证程序的高并发性。 */
/** * 在此同步块中主要进行提取出来的url的写操作,必须是同步操作,保证一个同 * 一时间只有一个线程在对Redis数据库进行写操作。 */
} else {
//在改变线程计数器的值的时候必须保证线程的同步性
synchronized (lock) {
//等待线程数的计数器的计数器减1
threads--;
//如果仍然有其他线程在活动,则通知此线程进行等待
if (threads > 0) {
/*调用线程的wait方法会将此线程挂起,直到有其他线程调用notify\ notifyAll将此线程进行唤醒*/
wait();
threads++;
} else {
//如果其他的线程都在等待,说明待抓取队列已空,则通知所有线程进行退出
notifyAll();
return;
}
}
}
}
}
public void run() {
//虽然run方法不能抛出异常,但是可以在run方法中进行try,catch
try {
parseToVisitUrltoRedis();
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码分析:
请大家务必详细看代码的注解!!!
请大家务必详细看代码的注解!!!
请大家务必详细看代码的注解!!!
在run方法中我们主要实现的就是在从Redis数据库中的url队列中提取到当前需要抓取的url并对其进行解析,将新url再添加到队列中。可以看到,通过引入一个线程计数器,我们解决了上述问题,如果大家还有什么问题的话欢迎在评论区进行交流。
关于线程池版本的爬虫代码,我就不再贴出,有兴趣的同学可以对代码重新进行优化。