Java手写线程池之向JDK线程池进发

2022-10-23 19:03:09
目录
前言JDK线程池一瞥自己动手实现线程池线程池参数介绍实现Runnable实现Callable拒绝策略的实现线程池关闭实现工作线程的工作实现线程池实现的BUG完整代码线程池测试总结

前言

在前面的文章自己动手写乞丐版线程池中,我们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池。

JDK线程池一瞥

我们首先看一个JDK给我们提供的线程池ThreadPoolExecutor的构造函数的参数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

参数解释:

1.corePoolSize:这个参数你可以理解为线程池当中至少需要 corePoolSize 个线程,初始时线程池当中线程的个数为0,当线程池当中线程的个数小于 corePoolSize 每次提交一个任务都会创建一个线程,并且先执行这个提交的任务,然后再去任务队列里面去获取新的任务,然后再执行。

2.maximumPoolSize:这个参数指的是线程池当中能够允许的最大的线程的数目,当任务队列满了之后如果这个时候有新的任务想要加入队列当中,当发现队列满了之后就创建新的线程去执行任务,但是需要满足最大的线程的个数不能够超过 maximumPoolSize 。

3.keepAliveTime 和 unit:这个主要是用于时间的表示,当队列当中多长时间没有数据的时候线程自己退出,前面谈到了线程池当中任务过多的时候会超过 corePoolSize ,当线程池闲下来的时候这些多余的线程就可以退出了。

4.workQueue:这个就是用于保存任务的阻塞队列。

5.threadFactory:这个参数倒不是很重要,线程工厂。

6.handler:这个表示拒绝策略,JDK给我们提供了四种策略:

    AbortPolicy:抛出异常。DiscardPolicy:放弃这个任务。CallerRunPolicy:提交任务的线程执行。DiscardOldestPolicy:放弃等待时间最长的任务。

    如果上面的参数你不能够理解,可以先阅读这篇文章自己动手写乞丐版线程池。基于上面谈到的参数,线程池当中提交任务的流程大致如下图所示:

    自己动手实现线程池

    根据前面的参数分析我们自己实现的线程池需要实现一下功能:

      能够提交Runnable的任务和Callable的任务。线程池能够自己实现动态的扩容和所容,动态调整线程池当中线程的数目,当任务多的时候能够增加线程的数目,当任务少的时候多出来的线程能够自动退出。有自己的拒绝策略,当任务队列满了,线程数也达到最大的时候,需要拒绝提交的任务。

      线程池参数介绍

        private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数
        private int corePoolSize;
        private int maximumPoolSize;
        private long keepAliveTime;
        private TimeUnit unit;
        private BlockingQueue<Runnable> taskQueue;
        private RejectPolicy policy;
      
        private ArrayList<Worker> workers = new ArrayList<>();
      
        private volatile boolean isStopped;
        private boolean useTimed;

      参数解释如下:

        ct:表示当前线程池当中线程的个数。corePoolSize:线程池当中核心线程的个数,意义和上面谈到的JDK的线程池意义一致。maximumPoolSize:线程池当中最大的线程个数,意义和上面谈到的JDK的线程池意义一致。keepAliveTime 和 unit:和JDK线程池的参数意义一致。taskQueue:任务队列,用不保存提交的任务。policy:拒绝策略,主要有一下四种策略:
        public enum RejectPolicy {
        
          ABORT,
          CALLER_RUN,
          DISCARD_OLDEST,
          DISCARD
        }
        

        workers:用于保存工作线程。

        isStopped:线程池是否被关闭了。

        useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的时间内,如果没有从任务队列当中获取到任务,线程就从线程池退出,但是需要保证线程池当中最小的线程个数不小于 corePoolSize 。

        实现Runnable

          // 下面这个方法是向线程池提交任务
          public void execute(Runnable runnable) throws InterruptedException {
            checkPoolState();
        
            if (addWorker(runnable, false)  // 如果能够加入新的线程执行任务 加入成功就直接返回
                    || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了
                    || addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程
              return;
        
            // 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务
            if (!taskQueue.offer(runnable))
              reject(runnable);
          }

        在上面的代码当中:

        checkPoolState函数是检查线程池的状态,当线程池被停下来之后就不能够在提交任务:

          private void checkPoolState() {
            if (isStopped) {
              // 如果线程池已经停下来了,就不在向任务队列当中提交任务了
              throw new RuntimeException("thread pool has been stopped, so quit submitting task");
            }
          }

        addWorker函数是往线程池当中提交任务并且产生一个线程,并且这个线程执行的第一个任务就是传递的参数。max表示线程的最大数目,max == true 的时候表示使用 maximumPoolSize 否则使用 corePoolSize,当返回值等于 true 的时候表示执行成功,否则表示执行失败。

          /**
           *
           * @param runnable 需要被执行的任务
           * @param max 是否使用 maximumPoolSize
           * @return boolean
           */
          public synchronized boolean addWorker(Runnable runnable, boolean max) {
        
            if (ct.get() >= corePoolSize && !max)
              return false;
            if (ct.get() >= maximumPoolSize && max)
              return false;
            Worker worker = new Worker(runnable);
            workers.add(worker);
            Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
            thread.start();
            return true;
          }

        实现Callable

        这个函数其实比较简单,只需要将传入的Callable对象封装成一个FutureTask对象即可,因为FutureTask实现了Callable和Runnable两个接口,然后将这个结果返回即可,得到这个对象,再调用对象的>

          public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
            checkPoolState();
            FutureTask<V> futureTask = new FutureTask<>(task);
            execute(futureTask);
            return futureTask;
          }
        

        拒绝策略的实现

        根据前面提到的各种策略的具体实现方式,具体的代码实现如下所示:

          private void reject(Runnable runnable) throws InterruptedException {
            switch (policy) {
              case ABORT:
                throw new RuntimeException("task queue is full");
              case CALLER_RUN:
                runnable.run();
              case DISCARD: // 直接放弃这个任务
                return;
              case DISCARD_OLDEST:
                // 放弃等待时间最长的任务 也就是队列当中的第一个任务
                taskQueue.poll();
                execute(runnable); // 重新执行这个任务
            }
          }
        

        线程池关闭实现

        一共两种方式实现线程池关闭:

          直接关闭线程池,不管任务队列当中的任务是否被全部执行完成。安全关闭线程池,先等待任务队列当中所有的任务被执行完成,再关闭线程池,但是在这个过程当中不允许继续提交任务了,这一点已经在函数>
            // 强制关闭线程池
            public synchronized void stop() {
              isStopped = true;
              for (Worker worker : workers) {
                worker.stopWorker();
              }
            }
          
            public synchronized void shutDown() {
              // 先表示关闭线程池 线程就不能再向线程池提交任务
              isStopped = true;
              // 先等待所有的任务执行完成再关闭线程池
              waitForAllTasks();
              stop();
            }
          
            private void waitForAllTasks() {
              // 当线程池当中还有任务的时候 就不退出循环
              while (taskQueue.size() > 0) {
                Thread.yield();
                try {
                  Thread.sleep(1000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
              }
            }
          

          工作线程的工作实现

              @Override
              public void run() {
                // 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要
                // 放入任务队列再取出来执行了
                firstTask.run();
          
                thisThread = Thread.currentThread();
                while (!isStopped) {
                  try {
                    // 是否使用时间就在这里显示出来了
                    Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
                    if (task == null) {
                      int i;
                      boolean exit = true;
                      // 如果当前线程数大于核心线程数 则使用 CAS 去退出 用于保证在线程安全下的退出
                      // 且保证线程的个数不小于 corePoolSize 下面这段代码需要仔细分析一下
                      if (ct.get() > corePoolSize) {
                        do{
                          i = ct.get();
                          if (i <= corePoolSize) {
                            exit = false;
                            break;
                          }
                        }while (!ct.compareAndSet(i, i - 1));
                        if (exit) {
                          return;
                        }
                      }
                    }else {
                      task.run();
                    }
                  } catch (InterruptedException e) {
                    // do nothing
                  }
                }
              }

          我们现在来仔细分析一下,线程退出线程池的时候是如何保证线程池当中总的线程数是不小于 corePoolSize 的!首先整体的框架是使用 CAS 进行实现,具体代码为 do ... while 操作,然后在 while 操作里面使用 CAS 进行测试替换,如果没有成功再次获取 ,当线程池当中核心线程的数目小于等于 corePoolSize 的时候也需要退出循环,因为线程池当中线程的个数不能小于 corePoolSize 。因此使用 break 跳出循环的线程是不会退出线程池的。

          线程池实现的BUG

          在我们自己实现的线程池当中当线程退出的时候,workers>

          完整代码

          package cscore.concurrent.java.threadpoolv2;
          
          
          import java.util.ArrayList;
          import java.util.concurrent.*;
          import java.util.concurrent.atomic.AtomicInteger;
          
          public class ThreadPool {
          
            private AtomicInteger ct = new AtomicInteger(0); // 当前在执行任务的线程个数
            private int corePoolSize;
            private int maximumPoolSize;
            private long keepAliveTime;
            private TimeUnit unit;
            private BlockingQueue<Runnable> taskQueue;
            private RejectPolicy policy;
          
            private ArrayList<Worker> workers = new ArrayList<>();
          
            private volatile boolean isStopped;
            private boolean useTimed;
          
            public int getCt() {
              return ct.get();
            }
          
            public ThreadPool(int corePoolSize, int maximumPoolSize, TimeUnit unit, long keepAliveTime, RejectPolicy policy
                    , int maxTasks) {
              // please add -ea to vm options to make assert keyword enable
              assert corePoolSize > 0;
              assert maximumPoolSize > 0;
              assert keepAliveTime >= 0;
              assert maxTasks > 0;
          
              this.corePoolSize = corePoolSize;
              this.maximumPoolSize = maximumPoolSize;
              this.unit = unit;
              this.policy = policy;
              this.keepAliveTime = keepAliveTime;
              taskQueue = new ArrayBlockingQueue<Runnable>(maxTasks);
              useTimed = keepAliveTime != 0;
            }
          
            /**
             *
             * @param runnable 需要被执行的任务
             * @param max 是否使用 maximumPoolSize
             * @return boolean
             */
            public synchronized boolean addWorker(Runnable runnable, boolean max) {
          
              if (ct.get() >= corePoolSize && !max)
                return false;
              if (ct.get() >= maximumPoolSize && max)
                return false;
              Worker worker = new Worker(runnable);
              workers.add(worker);
              Thread thread = new Thread(worker, "ThreadPool-" + "Thread-" + ct.addAndGet(1));
              thread.start();
              return true;
            }
          
            // 下面这个方法是向线程池提交任务
            public void execute(Runnable runnable) throws InterruptedException {
              checkPoolState();
          
              if (addWorker(runnable, false)  // 如果能够加入新的线程执行任务 加入成功就直接返回
                      || !taskQueue.offer(runnable) // 如果 taskQueue.offer(runnable) 返回 false 说明提交任务失败 任务队列已经满了
                      || addWorker(runnable, true)) // 使用能够使用的最大的线程数 (maximumPoolSize) 看是否能够产生新的线程
                return;
          
              // 如果任务队列满了而且不能够加入新的线程 则拒绝这个任务
              if (!taskQueue.offer(runnable))
                reject(runnable);
            }
          
            private void reject(Runnable runnable) throws InterruptedException {
              switch (policy) {
                case ABORT:
                  throw new RuntimeException("task queue is full");
                case CALLER_RUN:
                  runnable.run();
                case DISCARD:
                  return;
                case DISCARD_OLDEST:
                  // 放弃等待时间最长的任务
                  taskQueue.poll();
                  execute(runnable);
              }
            }
          
            private void checkPoolState() {
              if (isStopped) {
                // 如果线程池已经停下来了,就不在向任务队列当中提交任务了
                throw new RuntimeException("thread pool has been stopped, so quit submitting task");
              }
            }
          
            public <V> RunnableFuture<V> submit(Callable<V> task) throws InterruptedException {
              checkPoolState();
              FutureTask<V> futureTask = new FutureTask<>(task);
              execute(futureTask);
              return futureTask;
            }
          
            // 强制关闭线程池
            public synchronized void stop() {
              isStopped = true;
              for (Worker worker : workers) {
                worker.stopWorker();
              }
            }
          
            public synchronized void shutDown() {
              // 先表示关闭线程池 线程就不能再向线程池提交任务
              isStopped = true;
              // 先等待所有的任务执行完成再关闭线程池
              waitForAllTasks();
              stop();
            }
          
            private void waitForAllTasks() {
              // 当线程池当中还有任务的时候 就不退出循环
              while (taskQueue.size() > 0) {
                Thread.yield();
                try {
                  Thread.sleep(1000);
                } catch (InterruptedException e) {
                  e.printStackTrace();
                }
              }
            }
          
            class Worker implements Runnable {
          
              private Thread thisThread;
          
              private final Runnable firstTask;
              private volatile boolean isStopped;
          
              public Worker(Runnable firstTask) {
                this.firstTask = firstTask;
              }
          
              @Override
              public void run() {
                // 先执行传递过来的第一个任务 这里是一个小的优化 让线程直接执行第一个任务 不需要
                // 放入任务队列再取出来执行了
                firstTask.run();
          
                thisThread = Thread.currentThread();
                while (!isStopped) {
                  try {
                    Runnable task = useTimed ? taskQueue.poll(keepAliveTime, unit) : taskQueue.take();
                    if (task == null) {
                      int i;
                      boolean exit = true;
                      if (ct.get() > corePoolSize) {
                        do{
                          i = ct.get();
                          if (i <= corePoolSize) {
                            exit = false;
                            break;
                          }
                        }while (!ct.compareAndSet(i, i - 1));
                        if (exit) {
                          return;
                        }
                      }
                    }else {
                      task.run();
                    }
                  } catch (InterruptedException e) {
                    // do nothing
                  }
                }
              }
          
              public synchronized void stopWorker() {
                if (isStopped) {
                  throw new RuntimeException("thread has been interrupted");
                }
                isStopped = true;
                thisThread.interrupt();
              }
          
            }
          
          }
          

          线程池测试

          package cscore.concurrent.java.threadpoolv2;
          
          import java.util.concurrent.ExecutionException;
          import java.util.concurrent.RunnableFuture;
          import java.util.concurrent.TimeUnit;
          
          public class Test {
          
            public static void main(String[] args) throws InterruptedException, ExecutionException {
              var pool = new ThreadPool(2, 5, TimeUnit.SECONDS, 10, RejectPolicy.ABORT, 100000);
          
              for (int i = 0; i < 10; i++) {
                RunnableFuture<Integer> submit = pool.submit(() -> {
                  System.out.println(Thread.currentThread().getName() + " output a");
                  try {
                    Thread.sleep(10);
                  } catch (InterruptedException e) {
                    e.printStackTrace();
                  }
                  return 0;
                });
                System.out.println(submit.get());
              }
              int n = 15;
              while (n-- > 0) {
                System.out.println("Number Threads = " + pool.getCt());
                Thread.sleep(1000);
              }
              pool.shutDown();
            }
          }
          
          

          上面测试代码的输出结果如下所示:

          ThreadPool-Thread-2 output a
          ThreadPool-Thread-1 output a
          ThreadPool-Thread-3 output a
          ThreadPool-Thread-4 output a
          Number Threads = 5
          ThreadPool-Thread-5 output a
          ThreadPool-Thread-2 output a
          ThreadPool-Thread-1 output a
          ThreadPool-Thread-3 output a
          ThreadPool-Thread-4 output a
          ThreadPool-Thread-5 output a
          ThreadPool-Thread-2 output a
          ThreadPool-Thread-1 output a
          ThreadPool-Thread-4 output a
          ThreadPool-Thread-3 output a
          ThreadPool-Thread-5 output a
          ThreadPool-Thread-2 output a
          ThreadPool-Thread-1 output a
          ThreadPool-Thread-4 output a
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 5
          Number Threads = 3
          Number Threads = 2
          Number Threads = 2
          Number Threads = 2
          Number Threads = 2

          从上面的代码可以看出我们实现了正确的任务实现结果,同时线程池当中的核心线程数从 2 变到了 5 ,当线程池当中任务队列全部别执行完成之后,线程的数目重新降下来了,这确实是我们想要达到的结果。

          总结

          在本篇文章当中主要给大家介绍了如何实现一个类似于JDK中的线程池,里面有非常多的实现细节,大家可以仔细捋一下其中的流程,对线程池的理解将会非常有帮助。

          以上就是Java手写线程池之向JDK线程池进发的详细内容,更多关于Java手写线程池的资料请关注易采站长站其它相关文章!