目录前言JDK线程池一瞥自己动手实现线程池线程池参数介绍实现Runnable实现Callable拒绝策略的实现线程池关闭实现工作线程的工作实现线程池实现的BUG完整代码线程池测试总结前言在前面的文章自...
目录
前言JDK线程池一瞥
自己动手实现线程池
线程池参数介绍
实现Runnable
实现Callable
拒绝策略的实现
线程池关闭实现
工作bSxHKevYLI线程的工作实现
线程池实现的BUG
完整代码
线程池测试
总结
前言
在前面的文章自己动手写乞丐版线程池中,我们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池。
JDK线程池一瞥
我们首先看一个JDK给我们提供的线程池ThreadPoolExecutor的构造函数的参数:
publicThreadPoolExecutor(intcorePoolSize, intmaximumPoolSize, longkeepAliveTime, TimeUnitunit, blockingQueue<Runnable>workQueue, ThreadFactorythreadFactory, RejectedExecutionHandlerhandler)
参数解释:
1.corePoolSize:这个参数你可以理解为线程池当中至少需要 corePoolSize 个线程,初始时线程池当中线程的个数为0,当线程池当中线程的个数小于 corePoolSize 每次提交一个任务都会创建一个线程,并且先执行这个提交的任务,然后再去任务队列里面去获取新的任务,然后再执行。
2.maximumPoolSize:这个参数指的是线程池当中能够允许的最大的线程的数目,当任务队列满了之后如果这个时候有新的任务想要加入队列当中,当发现队列满了之后就创建新的线程去执行任务,但是需要满足最大的线程的个数不能够超过 maximumPoolSize 。
3.keepAliveTime 和 unit:这个主要是用于时间的表示,当队列当中多长时间没有数据的时候线程自己退出,前面谈到了线程池当中任务过多的时候会超过 corePoolSize ,当线程池闲下来的时候这些多余的线程就可以退出了。
4.workQueue:这个就是用于保存任务的阻塞队列。
5.threadFactory:这个参数倒不是很重要,线程工厂python。
6.handler:这个表示拒绝策略,JDK给我们提供了四种策略:
AbortPolicy:抛出异常。DiscardPolicy:放弃这个任务。
CallerRunPolicy:提交任务的线程执行。
DiscardOldestPolicy:放弃等待时间最长的任务。
如果上面的参数你不能够理解,可以先阅读这篇文章自己动手写乞丐版线程池。基于上面谈到的参数,线程池当中提交任务的流程大致如下图所示:

自己动手实现线程池
根据前面的参数分析我们自己实现的线程池需要实现一下功能:
能够提交Runnable的任务和Callable的任务。线程池能够自己实现动态的扩容和所容,动态调整线程池当中线程的数目,当任务多的时候能够增加线程的数目,当任务少的时候多出来的线程能够自动退出。
有自己的拒绝策略,当任务队列满了,线程数也达到最大的时候,需要拒绝提交的任务。
线程池参数介绍
privateAtomicIntegerct=newAtomicInteger(0);//当前在执行任务的线程个数 privateintcorePoolSize; privateintmaximumPoolSize; privatelongkeepAliveTime; privateTimeUnitunit; privateBlockingQueue<Runnable>taskQueue; privateRejectPolicypolicy; privateArrayList<Worker>workers=newArrayList<>(); privatevolatilebooleanisStopped; privatebooleanuseTimed;
参数解释如下:
ct:表示当前线程池当中线程的个数。corePoolSize:线程池当中核心线程的个数,意义和上面谈到的JDK的线程池意义一致。
maximumPoolSize:线程池当中最大的线程个数,意义和上面谈到的JDK的线程池意义一致。
keepAliveTime 和 unit:和JDK线程池的参数意义一致。
taskQueue:任务队列,用不保存提交的任务。
policy:拒绝策略,主要有一下四种策略:
publicenumRejectPolicy{
ABORT,
CALLER_RUN,
DISCARD_OLDEST,
DISCARD
}
workers:用于保存工作线程。
isStopped:线程池是否被关闭了。
useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的时间内,如果没有从任务队列当中获取到任务,线程就从线程池退出,但是需要保证线程池当中最小的线程个数不小于 corePoolSize 。
实现Runnable
//下面这个方法是向线程池提交任务
publicvoidexecute(Runnablerunnable)throwsInterruptedException{
checkPoolState();
if(addworker(runnable,false)//如果能够加入新的线程执行任务加入成功就直接返回
||!taskQueue.offer(runnable)//如果taskQueue.offer(runnable)返回false说明提交任务失败任务队列已经满了
||addWorker(runnable,true))//使用能够使用的最大的线程数(maximumPoolSize)看是否能够产生新的线程
return;
//如果任务队列满了而且不能够加入新的线程则拒绝这个任务
if(!taskQueue.offer(runnable))
reject(runnable);
}
在上面的代码当中:
checkPoolState函数是检查线程池的状态,当线程池被停下来之后就不能够在提交任务:
privatevoidcheckPoolState(){
if(isStopped){
//如果线程池已经停下来了,就不在向任务队列当中提交任务了
thrownewRuntimeException("threadpoolhasbeenstopped,soquitsubmittingtask");
}
}
addWorker函数是往线程池当中提交任务并且产生一个线程,并且这个线程执行的第一个任务就是传递的参数。max表示线程的最大数目,max == true 的时候表示使用 maximumPoolSize 否则使用 corePoolSize,当返回值等于 true 的时候表示执行成功,否则表示执行失败。
/**
*
*@paramrunnable需要被执行的任务
*@parammax是否使用maximumPoolSize
*@returnboolean
*/
publicsynchronizedbooleanaddWorker(Runnablerunnable,booleanmax){
if(ct.get()>=corePoolSize&&!max)
returnfalse;
if(ct.get()>=maximumPoolSize&&max)
returnfalse;
Workerworker=newWorker(runnable);
workers.add(worker);
Threadthread=newThread(worker,"ThreadPool-"+"Thread-"+ct.addAndGet(1));
thread.start();
returntrue;
}
实现Callable
这个函数其实比较简单,只需要将传入的Callable对象封装成一个FutureTask对象即可,因为FutureTask实现了Callable和Runnable两个接口,然后将这个结果返回即可,得到这个对象,再调用对象的 get 方法就能够得到结果。
public<V>RunnableFuture<V>submit(Callable<V>task)throwsInterruptedException{
checkPoolState();
FutureTask<V>futureTask=newFutureTask<>(task);
execute(futureTask);
returnfutureTask;
}
拒绝策略的实现
根据前面提到的各种策略的具体实现方式,具体的代码实现如下所示:
privatevoidreject(Runnablerunnable)throwsInterruptedException{
switch(policy){
caseABORT:
thrownewRuntimeException("taskqueueisfull");
caseCALLER_RUN:
runnable.run();
caseDISCARD://直接放弃这个任务
return;
caseDISCARD_OLDEST:
//放弃等待时间最长的任务也就是队列当中的第一个任务
taskQueue.poll();
execute(runnable);//重新执行这个任务
}
}
线程池关闭实现
一共两种方式实现线程池关闭:
直接关闭线程池,不管任务队列当中的任务是否被全部执行完成。安全关闭线程池,先等待任务队列当中所有的任务被执行完成,再关闭线程池,但是在这个过程当中不允许继续提交任务了,这一点已经在函数 checkPoolState 当中实现了。
//强制关闭线程池
publicsynchronizedvoidstop(){
isStopped=true;
for(Workerworker:workers){
worker.stopWorker();
}
}
publicsynchronizedvoidshutDown(){
//先表示关闭线程池线程就不能再向线程池提交任务
isStopped=true;
//先等待所有的任务执行完成再关闭线程池
waitForAllTasks();
stop();
}
privatevoidwaitForAllTasks(){
//当线程池当中还有任务的时候就不退出循环
while(taskQueue.size()>0){
Thread.yield();
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
工作线程的工作实现
@Override
publicvoidrun(){
//先执行传递过来的第一个任务这里是一个小的优化让线程直接执行第一个任务不需要
//放入任务队列再取出来执行了
firstTask.run();
thisThread=Thread.currentThread();
while(!isStopped){
try{
//是否使用时间就在这里显示出来了
Runnabletask=useTimed?taskQueue.poll(keepAliveTime,unit):taskQueue.take();
if(task==null){
inti;
booleanexit=true;
//如果当前线程数大于核心线程数则使用CAS去退出用于保证在线程安全下的退出
//且保证线程的个数不小于corePoolSize下面这段代码需要仔细分析一下
if(ct.get()>corePoolSize){
do{
i=ct.get();
if(i<=corePoolSize){
exit=false;
break;
}
}while(!ct.compareAndSet(i,i-1));
if(exit){
return;
}
}
}else{
task.run();
}
}catch(InterruptedExceptione){
//donothing
}
}
}
我们现在来仔细分析一下,线程退出线程池的时候是如何保证线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是使用 CAS 进行实现,具体代码为 do ... while 操作,然后在 while 操作里面使用 CAS 进行测试替换,如果没有成功再次获取 ,当线程池当中核心线程的数目小于等于 corePoolSize 的时候也需要退出循环,因为线程池当中线程的个数不能小于 corePoolSize 。因此使用 break 跳出循环的线程是不会退出线程池的。
线程池实现的BUG
在我们自己实现的线程池当中当线程退出的时候,workers 当中还保存这指向这个线程的对象,但是当线程退出的时候我们还没有在 workers 当中删除这个对象,因此这个线程对象不会被垃圾回收器收集掉,但是我们这个只是一个线程池实现的例子而已,并不用于生产环境,只是为了帮助大家理解线程池的原理。
完整代码
packagecscore.concurrent.Java.threadpoolv2;
importjava.util.ArrayList;
importjava.util.concurrent.*;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassThreadPool{
privateAtomicIntegerct=newAtomicInteger(0);//当前在执行任务的线程个数
privateintcorePoolSize;
privateintmaximumPoolSize;
privatelongkeepAliveTime;
privateTimeUnitunit;
privateBlockingQueue<Runnable>taskQueue;
privateRejectPolicypolicy;
privateArrayList<Worker>workers=newArrayList<>();
privatevolatilebooleanisStopped;
privatebooleanuseTimed;
publicintgetCt(){
returnct.get();
}
publicThreadPool(intcorePoolSize,intmaximumPoolSize,TimeUnitunit,longkeepAliveTime,RejectPolicypolicy
,intmaxTasks){
//pleaseadd-eatovmoptionstomakeassertkeywordenable
assertcorePoolSize>0;
assertmaximumPoolSize>0;
assertkeepAliveTime>=0;
assertmaxTasks>0;
this.corePoolSize=corePoolSize;
this.maximumPoolSize=maximumPoolSize;
this.unit=unit;
this.policy=policy;
this.keepAliveTime=keepAliveTime;
taskQueue=newArrayBlockingQueue<Runnable>(maxTasks);
useTimed=keepAliveTime!=0;
}
/**
*
*@paramrunnable需要被执行的任务
*@parammax是否使用maximumPoolSize
*@returnboolean
*/
publicsynchronizedbooleanaddWorker(Runnablerunnable,booleanmax){
if(ct.get()>=corePoolSize&&!max)
returnfalse;
if(ct.get()>=maximumPoolSize&&max)
returnfalse;
Workerworker=newWorker(runnable);
workers.add(worker);
Threadthread=newThread(worker,"ThreadPool-"+"Thread-"+ct.addAndGet(1));
thread.start();
returntrue;
}
//下面这个方法是向线程池提交任务
publicvoidexecute(Runnablerunnable)throwsInterruptedException{
checkPoolState();
if(addWorker(runnable,false)//如果能够加入新的线程执行任务加入成功就直接返回
||!taskQueue.offer(runnable)//如果taskQueue.offer(runnable)返回false说明提交任务失败任务队列已经满了
||addWorker(runnable,true))//使用能够使用的最大的线程数(maximumPoolSize)看是否能够产生新的线程
return;
//如果任务队列满了而且不能够加入新的线程则拒绝这个任务
if(!taskQueue.offer(runnable))
reject(runnable);
}
privatevoidreject(Runnablerunnable)throwsInterruptedException{
switch(policy){
caseABORT:
thrownewRuntimeException("taskqueueisfull");
caseCALLER_RUN:
runnable.run();
caseDISCARD:
return;
caseDISCARD_OLDEST:
//放弃等待时间最长的任务
taskQueue.poll();
execute(runnable);
}
}
privatevoidcheckPoolState(){
if(isStopped){
//如果线程池已经停下来了,就不在向任务队列当中提交任务了
thrownewRuntimeException("threadpoolhasbeenstopped,soquitsubmittingtask");
}
}
public<V>RunnableFuture<V>submit(Callable<V>task)throwsInterruptedException{
checkPoolState();
FutureTask<V>futureTask=newFutureTask<>(task);
execute(futureTask);
returnfutureTask;
}
//强制关闭线程池
publicsynchronizedvoidstop(){
isStopped=true;
for(Workerworker:workers){
worker.stopWorker();
}
}
publicsynchronizedvoidshutDown(){
//先表示关闭线程池线程就不能再向线程池提交任务
isStopped=true;
//先等待所有的任务执行完成再关闭线程池
waitForAllTasks();
stop();
}
privatevoidwaitForAllTasks(){
//当线程池当中还有任务的时候就不退出循环
while(taskQueue.size()>0){
Thread.yield();
try{
Thread.sleep(1000);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
classWorkerimplementsRunnable{
privateThreadthisThread;
privatefinalRunnablefirstTask;
privatevolatilebooleanisStopped;
publicWorker(RunnablefirstTask){
this.firstTask=firstTask;
}
@Override
publicvoidrun(){
//先执行传递过来的第一个任务这里是一个小的优化让线程直接执行第一个任务不需要
//放入任务队列再取出来执行了
firstTask.run();
thisThread=Thread.currentThread();
while(!isStopped){
try{
Runnabletask=useTimed?taskQueue.poll(keepAliveTime,unit):taskQueue.take();
if(task==null){
inti;
booleanexit=true;
if(ct.get()>corePoolSize){
do{
i=ct.get();
if(i<=corePoolSize){
exit=false;
break;
}
}while(!ct.compareAndSet(i,i-1));
if(exit){
return;
}
}
}else{
task.run();
}
}catch(InterruptedExceptione){
//donothing
}
}
}
publicsynchronizedvoidstopWorker(){
if(isStopped){
thrownewRuntimeException("threadhasbeeninterrupted");
}
isStopped=true;
thisThread.interrupt();
}
}
}
线程池测试
packagecscore.concurrent.java.threadpoolv2;
importjava.util.concurrent.ExecutionException;
importjava.util.concurrent.RunnableFuture;
importjava.util.concurrent.TimeUnit;
publicclassTest{
publicstaticvoidmain(String[]args)throwsInterruptedException,ExecutionException{
varpool=newThreadPool(2,5,TimeUnit.SECONDS,10,RejectPolicy.ABORT,100000);
for(inti=0;i<10;i++){
RunnableFuture<Integer>submit=pool.submit(()->{
System.out.println(Thread.currentThread().getName()+"outputa");
try{
Thread.sleep(10);
}catch(InterruptedExceptione){
e.printStackTrace();
}
return0;
});
System.out.println(submit.get());
}
intn=15;
while(n-->0){
System.out.println("NumberThreads="+pool.getCt());
Thread.sleep(1000);
}
pool.shutDown();
}
}
上面测试代码的输出结果如下所示:
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2
从上面的代码可以看出我们实现了正确的任务实现结果,同时线程池当中的核心线程数从 2 变到了 5 ,当线程池当中任务队列全部别执行完成之后,线程的数目重新降下来了,这确实是我们想要达到的结果。
总结
在本篇文章当中主要给大家介绍了如何实现一个类似于JDK中的线程池,里面有非常多的实现细节,大家可以仔细捋一下其中的流程,对线程池的理解将会非常有帮助。
以上就是Java手写线程池之向JDK线程池进发的详细内容,更多关于Java手写线程池的资料请关注我们其它相关文章!










