HOME 首页
SERVICE 服务产品
XINMEITI 新媒体代运营
CASE 服务案例
NEWS 热点资讯
ABOUT 关于我们
CONTACT 联系我们
创意岭
让品牌有温度、有情感
专注品牌策划15年

    线程池源码解析(线程池 原理)

    发布时间:2023-04-21 20:09:29     稿源: 创意岭    阅读: 56        

    大家好!今天让创意岭的小编来大家介绍下关于线程池源码解析的问题,以下是小编对此问题的归纳整理,让我们一起来看看吧。

    开始之前先推荐一个非常厉害的Ai人工智能工具,一键生成原创文章、方案、文案、工作计划、工作报告、论文、代码、作文、做题和对话答疑等等

    只需要输入关键词,就能返回你想要的内容,越精准,写出的就越详细,有微信小程序端、在线网页版、PC客户端

    官网:https://ai.de1919.com

    创意岭作为行业内优秀的企业,服务客户遍布全球各地,如需了解SEO相关业务请拨打电话175-8598-2043,或添加微信:1454722008

    本文目录:

    线程池源码解析(线程池 原理)

    一、源码修炼笔记之Dubbo线程池策略

    FixedThreadPool

    FixThreadPool内部是通过ThreadPoolExecutor来创建线程,核心线程数和最大线程数都是上下文中指定的线程数量threads,因为不存在空闲线程所以keepAliveTime为0,

    当queues=0,创建SynchronousQueue阻塞队列;

    当queues<0,创建无界的阻塞队列LinkedBlockingQueue;

    当queues>0,创建有界的阻塞队列LinkedBlockingQueue。

    采用dubbo自己实现的线程工厂NamedInternalThreadFactory,将线程置为守护线程(Demon)

    拒绝策略为AbortPolicyWithReport,策略为将调用时的堆栈信息保存到本地文件中,并抛出异常RejectedExecutionException

    CachedThreadPool

    CachedThreadPool与FixedThreadPool的区别是核心线程数和最大线程数不相等,通过alive来控制空闲线程的释放

    LimitedThreadPool

    LimitedThreadPool与CachedThreadPool的区别是空闲线程的超时时间为Long.MAX_VALUE,相当于线程数量不会动态变化了,创建的线程不会被释放。

    EagerThreadPool

    与上述三种线程池不同,EagerThreadPool并非通过JUC中的ThreadPoolExecutor来创建线程池,而是通过EagerThreadPoolExecutor来创建线程池,EagerThreadPoolExecutor继承自ThreadPoolExecutor,实现自定义的execute方法,采用的阻塞队列是TaskQueue,TaskQueue继承自LinkedBlockingQueue。

    execute方法首先调用ThreadPoolExecutor的execute方法,如果执行失败会重新放入TaskQueue进行重试。

    实现自定义的ThreadPool

    ThreadPool被定义为一个扩展点,如下所示,

    其默认实现是FixedThreadPool,可以通过实现该扩展来实现自定义的线程池策略。

    二、Java线程池中的核心线程是如何被重复利用的

    Java线程池中的核心线程是如何被重复利用的?

    引言

    在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。

    使用线程池的好处:

    • 降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

    • 降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

    • 提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

    • 很显然,线程池一个很显著的特征就是“长期驻留了一定数量的活线程”,避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个“活线程”,自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。

      分析方法

      在分析源码之前先来思考一下要怎么去分析,源码往往是比较复杂的,如果知识储备不够丰厚,很有可能会读不下去,或者读岔了。一般来讲要时刻紧跟着自己的目标来看代码,跟目标关系不大的代码可以不理会它,一些异常的处理也可以暂不理会,先看正常的流程。就我们现在要分析的源码而言,目标就是看看线程是如何被复用的。那么对于线程池的状态的管理以及非正常状态下的处理代码就可以不理会,具体来讲,在ThreadPollExcutor类中,有一个字段 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 是对线程池的运行状态和线程池中有效线程的数量进行控制的, 它包含两部分信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),还有几个对ctl进行计算的方法:

    • // 获取运行状态
    • private static int runStateOf(int c)     { return c & ~CAPACITY; }
    • // 获取活动线程数
    • private static int workerCountOf(int c)  { return c & CAPACITY; }123456
    • 以上两个方法在源码中经常用到,结合我们的目标,对运行状态的一些判断及处理可以不用去管,而对当前活动线程数要加以关注等等。

      下面将遵循这些原则来分析源码。

      解惑

      当我们要向线程池添加一个任务时是调用ThreadPollExcutor对象的execute(Runnable command)方法来完成的,所以先来看看ThreadPollExcutor类中的execute(Runnable command)方法的源码:

    • public void execute(Runnable command) {
    •    if (command == null)
    •        throw new NullPointerException();
    •    int c = ctl.get();
    •    if (workerCountOf(c) < corePoolSize) {
    •        if (addWorker(command, true))
    •            return;
    •        c = ctl.get();
    •    }
    •    if (isRunning(c) && workQueue.offer(command)) {
    •        int recheck = ctl.get();
    •        if (! isRunning(recheck) && remove(command))
    •            reject(command);
    •        else if (workerCountOf(recheck) == 0)
    •            addWorker(null, false);
    •    }
    •    else if (!addWorker(command, false))
    •        reject(command);
    • }123456789101112131415161718192021
    • 按照我们在分析方法中提到的一些原则,去掉一些相关性不强的代码,看看核心代码是怎样的。

    • // 为分析而简化后的代码
    • public void execute(Runnable command) {
    •    int c = ctl.get();
    •    if (workerCountOf(c) < corePoolSize) {
    •        // 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中,并把任务添加到该线程中
    •        if (addWorker(command, true))
    •            return;
    •        c = ctl.get();
    •    }
    •    // 如果当前活动线程数大于等于corePoolSize,则尝试将任务放入缓存队列
    •    if (workQueue.offer(command)) {
    •        int recheck = ctl.get();
    •        if (workerCountOf(recheck) == 0)
    •            addWorker(null, false);
    •    }else {
    •        // 缓存已满,新建一个线程放入线程池,并把任务添加到该线程中(此时新建的线程相当于非核心线程)
    •        addWorker(command, false)
    •    }
    • }12345678910111213141516171819202122
    • 这样一看,逻辑应该清晰很多了。

    • 如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

    • 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

    • 如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

    • 接下来看 addWorker(Runnable firstTask, boolean core)方法

    • private boolean addWorker(Runnable firstTask, boolean core) {
    •    retry:
    •    for (;;) {
    •        int c = ctl.get();
    •        int rs = runStateOf(c);
    •        // Check if queue empty only if necessary.
    •        if (rs >= SHUTDOWN &&
    •            ! (rs == SHUTDOWN &&
    •               firstTask == null &&
    •               ! workQueue.isEmpty()))
    •            return false;
    •        for (;;) {
    •            int wc = workerCountOf(c);
    •            if (wc >= CAPACITY ||
    •                wc >= (core ? corePoolSize : maximumPoolSize))
    •                return false;
    •            if (compareAndIncrementWorkerCount(c))
    •                break retry;
    •            c = ctl.get();  // Re-read ctl
    •            if (runStateOf(c) != rs)
    •                continue retry;
    •            // else CAS failed due to workerCount change; retry inner loop
    •        }
    •    }
    •    boolean workerStarted = false;
    •    boolean workerAdded = false;
    •    Worker w = null;
    •    try {
    •        w = new Worker(firstTask);
    •        final Thread t = w.thread;
    •        if (t != null) {
    •            final ReentrantLock mainLock = this.mainLock;
    •            mainLock.lock();
    •            try {
    •                // Recheck while holding lock.
    •                // Back out on ThreadFactory failure or if
    •                // shut down before lock acquired.
    •                int rs = runStateOf(ctl.get());
    •                if (rs < SHUTDOWN ||
    •                    (rs == SHUTDOWN && firstTask == null)) {
    •                    if (t.isAlive()) // precheck that t is startable
    •                        throw new IllegalThreadStateException();
    •                    workers.add(w);
    •                    int s = workers.size();
    •                    if (s > largestPoolSize)
    •                        largestPoolSize = s;
    •                    workerAdded = true;
    •                }
    •            } finally {
    •                mainLock.unlock();
    •            }
    •            if (workerAdded) {
    •                t.start();
    •                workerStarted = true;
    •            }
    •        }
    •    } finally {
    •        if (! workerStarted)
    •            addWorkerFailed(w);
    •    }
    •    return workerStarted;
    • }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
    • 同样,我们也来简化一下:

    • // 为分析而简化后的代码
    • private boolean addWorker(Runnable firstTask, boolean core) {
    •    int wc = workerCountOf(c);
    •    if (wc >= (core ? corePoolSize : maximumPoolSize))
    •        // 如果当前活动线程数 >= 指定的核心线程数,不创建核心线程
    •        // 如果当前活动线程数 >= 指定的最大线程数,不创建非核心线程  
    •        return false;
    •    boolean workerStarted = false;
    •    boolean workerAdded = false;
    •    Worker w = null;
    •    try {
    •        // 新建一个Worker,将要执行的任务作为参数传进去
    •        w = new Worker(firstTask);
    •        final Thread t = w.thread;
    •        if (t != null) {
    •            workers.add(w);
    •            workerAdded = true;
    •            if (workerAdded) {
    •                // 启动刚刚新建的那个worker持有的线程,等下要看看这个线程做了啥
    •                t.start();
    •                workerStarted = true;
    •            }
    •        }
    •    } finally {
    •        if (! workerStarted)
    •            addWorkerFailed(w);
    •    }
    •    return workerStarted;
    • }1234567891011121314151617181920212223242526272829303132
    • 看到这里,我们大概能猜测到,addWorker方法的功能就是新建一个线程并启动这个线程,要执行的任务应该就是在这个线程中执行。为了证实我们的这种猜测需要再来看看Worker这个类。

    • private final class Worker
    •    extends AbstractQueuedSynchronizer
    •    implements Runnable{
    •    // ....
    • }
    • Worker(Runnable firstTask) {
    •    setState(-1); // inhibit interrupts until runWorker
    •    this.firstTask = firstTask;
    •    this.thread = getThreadFactory().newThread(this);
    • }123456789101112
    • 从上面的Worker类的声明可以看到,它实现了Runnable接口,以及从它的构造方法中可以知道待执行的任务赋值给了它的变量firstTask,并以它自己为参数新建了一个线程赋值给它的变量thread,那么运行这个线程的时候其实就是执行Worker的run()方法,来看一下这个方法:

    •    public void run() {
    •        runWorker(this);
    •    }
    •    final void runWorker(Worker w) {
    •    Thread wt = Thread.currentThread();
    •    Runnable task = w.firstTask;
    •    w.firstTask = null;
    •    w.unlock(); // allow interrupts
    •    boolean completedAbruptly = true;
    •    try {
    •        while (task != null || (task = getTask()) != null) {
    •            w.lock();
    •            // If pool is stopping, ensure thread is interrupted;
    •            // if not, ensure thread is not interrupted.  This
    •            // requires a recheck in second case to deal with
    •            // shutdownNow race while clearing interrupt
    •            if ((runStateAtLeast(ctl.get(), STOP) ||
    •                 (Thread.interrupted() &&
    •                  runStateAtLeast(ctl.get(), STOP))) &&
    •                !wt.isInterrupted())
    •                wt.interrupt();
    •            try {
    •                beforeExecute(wt, task);
    •                Throwable thrown = null;
    •                try {
    •                    task.run();
    •                } catch (RuntimeException x) {
    •                    thrown = x; throw x;
    •                } catch (Error x) {
    •                    thrown = x; throw x;
    •                } catch (Throwable x) {
    •                    thrown = x; throw new Error(x);
    •                } finally {
    •                    afterExecute(task, thrown);
    •                }
    •            } finally {
    •                task = null;
    •                w.completedTasks++;
    •                w.unlock();
    •            }
    •        }
    •        completedAbruptly = false;
    •    } finally {
    •        processWorkerExit(w, completedAbruptly);
    •    }
    • }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
    • 在run()方法中只调了一下 runWorker(this) 方法,再来简化一下这个 runWorker() 方法

    • // 为分析而简化后的代码
    • final void runWorker(Worker w) {
    •    Runnable task = w.firstTask;
    •    w.firstTask = null;
    •    while (task != null || (task = getTask()) != null) {
    •            try {
    •                task.run();
    •            } finally {
    •                task = null;
    •            }
    •        }
    • }12345678910111213
    • 很明显,runWorker()方法里面执行了我们新建Worker对象时传进去的待执行的任务,到这里为止貌似这个worker的run()方法就执行完了,既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的核心线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?再仔细看一下runWorker()方法的代码,有一个while循环,当执行完firstTask后task==null了,那么就会执行判断条件 (task = getTask()) != null,我们假设这个条件成立的话,那么这个线程就不止只执行一个任务了,可以执行多个任务了,也就实现了重复利用了。答案呼之欲出了,接着看getTask()方法

    • private Runnable getTask() {
    •    boolean timedOut = false; // Did the last poll() time out?
    •    for (;;) {
    •        int c = ctl.get();
    •        int rs = runStateOf(c);
    •        // Check if queue empty only if necessary.
    •        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    •            decrementWorkerCount();
    •            return null;
    •        }
    •        int wc = workerCountOf(c);
    •        // Are workers subject to culling?
    •        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    •        if ((wc > maximumPoolSize || (timed && timedOut))
    •            && (wc > 1 || workQueue.isEmpty())) {
    •            if (compareAndDecrementWorkerCount(c))
    •                return null;
    •            continue;
    •        }
    •        try {
    •            Runnable r = timed ?
    •                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    •                workQueue.take();
    •            if (r != null)
    •                return r;
    •            timedOut = true;
    •        } catch (InterruptedException retry) {
    •            timedOut = false;
    •        }
    •    }
    • }1234567891011121314151617181920212223242526272829303132333435363738
    • 老规矩,简化一下代码来看:

    • // 为分析而简化后的代码
    • private Runnable getTask() {
    •    boolean timedOut = false;
    •    for (;;) {
    •        int c = ctl.get();
    •        int wc = workerCountOf(c);
    •        // timed变量用于判断是否需要进行超时控制。
    •        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
    •        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
    •        // 对于超过核心线程数量的这些线程,需要进行超时控制
    •        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    •        if (timed && timedOut) {
    •            // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时,那么尝试将workerCount减1,即当前活动线程数减1,
    •            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
    •            if (compareAndDecrementWorkerCount(c))
    •                return null;
    •            continue;
    •        }
    •        try {
    •            Runnable r = timed ?
    •                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    •                workQueue.take();
    •            // 注意workQueue中的poll()方法与take()方法的区别
    •            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
    •            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止
    •            if (r != null)
    •                return r;
    •            timedOut = true;
    •        } catch (InterruptedException retry) {
    •            timedOut = false;
    •        }
    •    }
    • }123456789101112131415161718192021222324252627282930313233343536373839
    • 从以上代码可以看出,getTask()的作用是

    • 如果当前活动线程数大于核心线程数,当去缓存队列中取任务的时候,如果缓存队列中没任务了,则等待keepAliveTime的时长,此时还没任务就返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数就会这样一个一个地销毁这些多余的线程。

    • 如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

    • 小结

      通过以上的分析,应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题,同时也对线程池的实现机制有了更进一步的理解:

    • 当有新任务来的时候,先看看当前的线程数有没有超过核心线程数,如果没超过就直接新建一个线程来执行新的任务,如果超过了就看看缓存队列有没有满,没满就将新任务放进缓存队列中,满了就新建一个线程来执行新的任务,如果线程池中的线程数已经达到了指定的最大线程数了,那就根据相应的策略拒绝任务。

    • 当缓存队列中的任务都执行完了的时候,线程池中的线程数如果大于核心线程数,就销毁多出来的线程,直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了,它们一直处于阻塞状态,等待新的任务到来。

    • 注意: 

      本文所说的“核心线程”、“非核心线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。那些被销毁的线程是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“非核心线程”,在销毁多余线程的时候只销毁那些“非核心线程”,而“核心线程”不被销毁。这种理解是错误的。

      另外还有一个重要的接口 BlockingQueue 值得去了解,它定义了一些入队出队同步操作的方法,还可以阻塞,作用很大。

    三、JDK 提供的默认线程池介绍

    在上一篇文章中介绍了 JDK 中提供的线程池类 ThreadPoolExecutor 以及线程池的参数,在实际使用中需要了解个参数的含义从而才能正确的使用线程池来达到我们的目的;

    鉴于此 JDK 也给我们提供了几个可以开箱即用的默认线程池的实现,使用 JDK 的工具类Executors 操作即可;

    下面将介绍几个线程池的区别以及使用场景;

    结合上面的源码及上一篇关于线程池参数的介绍,基本上就可以知道 newFixedThreadPool 实现的线程池就和它的方法名一样是一个固定大小的线程池;

    固定是针对线程池中线程的数量而言的,可以看到这个线程池是一个核心线程数和最大线程数都一样的线程池,相应的超时时间也是设置的0,这里的阻塞队列用的是无参的 LinkedBlockingQueue 因此是一个最大限制是 int 的最大值的队列,基本上可以认为是一个无界的队列了;

    具体使用也非常简单用 Executors.newFixedThreadPool(num) 就可以使用了,但是这个线程需要注意的是:

    1、它创建的线程数是固定,因此设置的数量不能太大,太大会浪费资源;

    2、这个线程池的队列是一个无界的,因此如果任务积压太多会导致队列无限增长,可能会引起 OOM 异常;

    3、理论上应该不会触发拒绝策略,这里的拒绝策略是线程池默认的,因此会抛出异常;

    上面给出了和 newSingleThreadExecutor 相关的三个类的源码其都在 Executors 里,通过源码可以知道 newSingleThreadExecutor 创建的是一个只包含一个线程的线程池,因此它可以串行执行,其实这个线程池和 newFixedThreadPool(1) 类似,只是 newSingleThreadExecutor 会在垃圾回收的时候执行 shutdown,然后由于加了一层代理可能功能没有那么全,其他基本一样;

    从源码可以看出通过 newCachedThreadPool 创建的线程池其实是一个无限的线程池,由于使用的阻塞队列为同步阻塞队列,因此只要有新任务到达,它就会创建一个线程去处理任务,空闲的线程会在 60 秒后被销毁;

    由于该线程池的特殊性,因此它不适合处理执行时间较长的任务,如果任务执行时间长,将会产生很多线程,从而让 CPU 不堪重负;

    newWorkStealingPool 是 JDK8 提供的新的线程池,可以看到它不是由传统的 ThreadPoolExecutor 来实现的线程池,它是由 JDK7 中提供的 ForkJoinPool 提供的线程池,是对目前线程池的补充;

    它创建一个和cpu 核心想等的线程池,用来进行并行任务的执行,它是通过工作窃取的方式,使得创建的线程不会闲置。

    四、并发编程解惑之线程

    主要内容:

    进程是资源分配的最小单位,每个进程都有独立的代码和数据空间,一个进程包含 1 到 n 个线程。线程是 CPU 调度的最小单位,每个线程有独立的运行栈和程序计数器,线程切换开销小。

    Java 程序总是从主类的 main 方法开始执行,main 方法就是 Java 程序默认的主线程,而在 main 方法中再创建的线程就是其他线程。在 Java 中,每次程序启动至少启动 2 个线程。一个是 main 线程,一个是垃圾收集线程。每次使用 Java 命令启动一个 Java 程序,就相当于启动一个 JVM 实例,而每个 JVM 实例就是在操作系统中启动的一个进程。

    多线程可以通过继承或实现接口的方式创建。

    Thread 类是 JDK 中定义的用于控制线程对象的类,该类中封装了线程执行体 run() 方法。需要强调的一点是,线程执行先后与创建顺序无关。

    通过 Runnable 方式创建线程相比通过继承 Thread 类创建线程的优势是避免了单继承的局限性。若一个 boy 类继承了 person 类,boy 类就无法通过继承 Thread 类的方式来实现多线程。

    使用 Runnable 接口创建线程的过程:先是创建对象实例 MyRunnable,然后将对象 My Runnable 作为 Thread 构造方法的入参,来构造出线程。对于 new Thread(Runnable target) 创建的使用同一入参目标对象的线程,可以共享该入参目标对象 MyRunnable 的成员变量和方法,但 run() 方法中的局部变量相互独立,互不干扰。

    上面代码是 new 了三个不同的 My Runnable 对象,如果只想使用同一个对象,可以只 new 一个 MyRunnable 对象给三个 new Thread 使用。

    实现 Runnable 接口比继承 Thread 类所具有的优势:

    线程有新建、可运行、阻塞、等待、定时等待、死亡 6 种状态。一个具有生命的线程,总是处于这 6 种状态之一。 每个线程可以独立于其他线程运行,也可和其他线程协同运行。线程被创建后,调用 start() 方法启动线程,该线程便从新建态进入就绪状态。

    NEW 状态(新建状态) 实例化一个线程之后,并且这个线程没有开始执行,这个时候的状态就是 NEW 状态:

    RUNNABLE 状态(就绪状态):

    阻塞状态有 3 种:

    如果一个线程调用了一个对象的 wait 方法, 那么这个线程就会处于等待状态(waiting 状态)直到另外一个线程调用这个对象的 notify 或者 notifyAll 方法后才会解除这个状态。

    run() 里的代码执行完毕后,线程进入终结状态(TERMINATED 状态)。

    线程状态有 6 种:新建、可运行、阻塞、等待、定时等待、死亡。

    我们看下 join 方法的使用:

    运行结果:

    我们来看下 yield 方法的使用:

    运行结果:

    线程与线程之间是无法直接通信的,A 线程无法直接通知 B 线程,Java 中线程之间交换信息是通过共享的内存来实现的,控制共享资源的读写的访问,使得多个线程轮流执行对共享数据的操作,线程之间通信是通过对共享资源上锁或释放锁来实现的。线程排队轮流执行共享资源,这称为线程的同步。

    Java 提供了很多同步操作(也就是线程间的通信方式),同步可使用 synchronized 关键字、Object 类的 wait/notifyAll 方法、ReentrantLock 锁、无锁同步 CAS 等方式来实现。

    ReentrantLock 是 JDK 内置的一个锁对象,用于线程同步(线程通信),需要用户手动释放锁。

    运行结果:

    这表明同一时间段只能有 1 个线程执行 work 方法,因为 work 方法里的代码需要获取到锁才能执行,这就实现了多个线程间的通信,线程 0 获取锁,先执行,线程 1 等待,线程 0 释放锁,线程 1 继续执行。

    synchronized 是一种语法级别的同步方式,称为内置锁。该锁会在代码执行完毕后由 JVM 释放。

    输出结果跟 ReentrantLock 一样。

    Java 中的 Object 类默认是所有类的父类,该类拥有 wait、 notify、notifyAll 方法,其他对象会自动继承 Object 类,可调用 Object 类的这些方法实现线程间的通信。

    除了可以通过锁的方式来实现通信,还可通过无锁的方式来实现,无锁同 CAS(Compare-and-Swap,比较和交换)的实现,需要有 3 个操作数:内存地址 V,旧的预期值 A,即将要更新的目标值 B,当且仅当内存地址 V 的值与预期值 A 相等时,将内存地址 V 的值修改为目标值 B,否则就什么都不做。

    我们通过计算器的案例来演示无锁同步 CAS 的实现方式,非线程安全的计数方式如下:

    线程安全的计数方式如下:

    运行结果:

    线程安全累加的结果才是正确的,非线程安全会出现少计算值的情况。JDK 1.5 开始,并发包里提供了原子操作的类,AtomicBoolean 用原子方式更新的 boolean 值,AtomicInteger 用原子方式更新 int 值,AtomicLong 用原子方式更新 long 值。 AtomicInteger 和 AtomicLong 还提供了用原子方式将当前值自增 1 或自减 1 的方法,在多线程程序中,诸如 ++i 或 i++ 等运算不具有原子性,是不安全的线程操作之一。 通常我们使用 synchronized 将该操作变成一个原子操作,但 JVM 为此种操作提供了原子操作的同步类 Atomic,使用 AtomicInteger 做自增运算的性能是 ReentantLock 的好几倍。

    上面我们都是使用底层的方式实现线程间的通信的,但在实际的开发中,我们应该尽量远离底层结构,使用封装好的 API,例如 J.U.C 包(java.util.concurrent,又称并发包)下的工具类 CountDownLath、CyclicBarrier、Semaphore,来实现线程通信,协调线程执行。

    CountDownLatch 能够实现线程之间的等待,CountDownLatch 用于某一个线程等待若干个其他线程执行完任务之后,它才开始执行。

    CountDownLatch 类只提供了一个构造器:

    CountDownLatch 类中常用的 3 个方法:

    运行结果:

    CyclicBarrier 字面意思循环栅栏,通过它可以让一组线程等待至某个状态之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier 可以被重复使用,所以有循环之意。

    相比 CountDownLatch,CyclicBarrier 可以被循环使用,而且如果遇到线程中断等情况时,可以利用 reset() 方法,重置计数器,CyclicBarrier 会比 CountDownLatch 更加灵活。

    CyclicBarrier 提供 2 个构造器:

    上面的方法中,参数 parties 指让多少个线程或者任务等待至 barrier 状态;参数 barrierAction 为当这些线程都达到 barrier 状态时会执行的内容。

    CyclicBarrier 中最重要的方法 await 方法,它有 2 个重载版本。下面方法用来挂起当前线程,直至所有线程都到达 barrier 状态再同时执行后续任务。

    而下面的方法则是让这些线程等待至一定的时间,如果还有线程没有到达 barrier 状态就直接让到达 barrier 的线程执行任务。

    运行结果:

    CyclicBarrier 用于一组线程互相等待至某个状态,然后这一组线程再同时执行,CountDownLatch 是不能重用的,而 CyclicBarrier 可以重用。

    Semaphore 类是一个计数信号量,它可以设定一个阈值,多个线程竞争获取许可信号,执行完任务后归还,超过阈值后,线程申请许可信号时将会被阻塞。Semaphore 可以用来 构建对象池,资源池,比如数据库连接池。

    假如在服务器上运行着若干个客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻只能获得一定数目的数据库连接。要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程呢?

    给方法加同步锁,保证同一时刻只能有一个线程去调用此方法,其他所有线程排队等待,但若有 10 个数据库连接,也只有一个能被使用,效率太低。另外一种方法,使用信号量,让信号量许可与数据库可用连接数为相同数量,10 个数据库连接都能被使用,大大提高性能。

    上面三个工具类是 J.U.C 包的核心类,J.U.C 包的全景图就比较复杂了:

    J.U.C 包(java.util.concurrent)中的高层类(Lock、同步器、阻塞队列、Executor、并发容器)依赖基础类(AQS、非阻塞数据结构、原子变量类),而基础类是通过 CAS 和 volatile 来实现的。我们尽量使用顶层的类,避免使用基础类 CAS 和 volatile 来协调线程的执行。J.U.C 包其他的内容,在其他的篇章会有相应的讲解。

    Future 是一种异步执行的设计模式,类似 ajax 异步请求,不需要同步等待返回结果,可继续执行代码。使 Runnable(无返回值不支持上报异常)或 Callable(有返回值支持上报异常)均可开启线程执行任务。但是如果需要异步获取线程的返回结果,就需要通过 Future 来实现了。

    Future 是位于 java.util.concurrent 包下的一个接口,Future 接口封装了取消任务,获取任务结果的方法。

    在 Java 中,一般是通过继承 Thread 类或者实现 Runnable 接口来创建多线程, Runnable 接口不能返回结果,JDK 1.5 之后,Java 提供了 Callable 接口来封装子任务,Callable 接口可以获取返回结果。我们使用线程池提交 Callable 接口任务,将返回 Future 接口添加进 ArrayList 数组,最后遍历 FutureList,实现异步获取返回值。

    运行结果:

    上面就是异步线程执行的调用过程,实际开发中用得更多的是使用现成的异步框架来实现异步编程,如 RxJava,有兴趣的可以继续去了解,通常异步框架都是结合远程 HTTP 调用 Retrofit 框架来使用的,两者结合起来用,可以避免调用远程接口时,花费过多的时间在等待接口返回上。

    线程封闭是通过本地线程 ThreadLocal 来实现的,ThreadLocal 是线程局部变量(local vari able),它为每个线程都提供一个变量值的副本,每个线程对该变量副本的修改相互不影响。

    在 JVM 虚拟机中,堆内存用于存储共享的数据(实例对象),也就是主内存。Thread Local .set()、ThreadLocal.get() 方法直接在本地内存(工作内存)中写和读共享变量的副本,而不需要同步数据,不用像 synchronized 那样保证数据可见性,修改主内存数据后还要同步更新到工作内存。

    Myabatis、hibernate 是通过 threadlocal 来存储 session 的,每一个线程都维护着一个 session,对线程独享的资源操作很方便,也避免了线程阻塞。

    ThreadLocal 类位于 Thread 线程类内部,我们分析下它的源码:

    ThreadLocal 和 Synchonized 都用于解决多线程并发访问的问题,访问多线程共享的资源时,Synchronized 同步机制采用了以时间换空间的方式,提供一份变量让多个线程排队访问,而 ThreadLocal 采用了以空间换时间的方式,提供每个线程一个变量,实现数据隔离。

    ThreadLocal 可用于数据库连接 Connection 对象的隔离,使得每个请求线程都可以复用连接而又相互不影响。

    在 Java 里面,存在强引用、弱引用、软引用、虚引用。我们主要来了解下强引用和弱引用:

    上面 a、b 对实例 A、B 都是强引用

    而上面这种情况就不一样了,即使 b 被置为 null,但是 c 仍然持有对 C 对象实例的引用,而间接的保持着对 b 的强引用,所以 GC 不会回收分配给 b 的空间,导致 b 无法回收也没有被使用,造成了内存泄漏。这时可以通过 c = null; 来使得 c 被回收,但也可以通过弱引用来达到同样目的:

    从源码中可以看出 Entry 里的 key 对 ThreadLocal 实例是弱引用:

    Entry 里的 key 对 ThreadLocal 实例是弱引用,将 key 值置为 null,堆中的 ThreadLocal 实例是可以被垃圾收集器(GC)回收的。但是 value 却存在一条从 Current Thread 过来的强引用链,只有当当前线程 Current Thread 销毁时,value 才能被回收。在 threadLocal 被设为 null 以及线程结束之前,Entry 的键值对都不会被回收,出现内存泄漏。为了避免泄漏,在 ThreadLocalMap 中的 set/get Entry 方法里,会对 key 为 null 的情况进行判断,如果为 null 的话,就会对 value 置为 null。也可以通过 ThreadLocal 的 remove 方法(类似加锁和解锁,最后 remove 一下,解锁对象的引用)直接清除,释放内存空间。

    总结来说,利用 ThreadLocal 来访问共享数据时,JVM 通过设置 ThreadLocalMap 的 Key 为弱引用,来避免内存泄露,同时通过调用 remove、get、set 方法的时候,回收弱引用(Key 为 null 的 Entry)。当使用 static ThreadLocal 的时候(如上面的 Spring 多数据源),static 变量在类未加载的时候,它就已经加载,当线程结束的时候,static 变量不一定会被回收,比起普通成员变量使用的时候才加载,static 的生命周期变长了,若没有及时回收,容易产生内存泄漏。

    使用线程池,可以重用存在的线程,减少对象创建、消亡的开销,可控制最大并发线程数,避免资源竞争过度,还能实现线程定时执行、单线程执行、固定线程数执行等功能。

    Java 把线程的调用封装成了一个 Executor 接口,Executor 接口中定义了一个 execute 方法,用来提交线程的执行。Executor 接口的子接口是 ExecutorService,负责管理线程的执行。通过 Executors 类的静态方法可以初始化

    ExecutorService 线程池。Executors 类的静态方法可创建不同类型的线程池:

    但是,不建议使用 Executors 去创建线程池,而是通过 ThreadPoolExecutor 的方式,明确给出线程池的参数去创建,规避资源耗尽的风险。

    如果使用 Executors 去创建线程池:

    最佳的实践是通过 ThreadPoolExecutor 手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。通过线程池实现类 ThreadPoolExecutor 可构造出线程池的,构造函数有下面几个重要的参数:

    参数 1:corePoolSize

    线程池核心线程数。

    参数 2:workQueue

    阻塞队列,用于保存执行任务的线程,有 4 种阻塞队列可选:

    参数 3:maximunPoolSize

    线程池最大线程数。如果阻塞队列满了(有界的阻塞队列),来了一个新的任务,若线程池当前线程数小于最大线程数,则创建新的线程执行任务,否则交给饱和策略处理。如果是无界队列就不存在这种情况,任务都在无界队列里存储着。

    参数 4:RejectedExecutionHandler

    拒绝策略,当队列满了,而且线程达到了最大线程数后,对新任务采取的处理策略。

    有 4 种策略可选:

    最后,还可以自定义处理策略。

    参数 5:ThreadFactory

    创建线程的工厂。

    参数 6:keeyAliveTime

    线程没有任务执行时最多保持多久时间终止。当线程池中的线程数大于 corePoolSize 时,线程池中所有线程中的某一个线程的空闲时间若达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但如果调用了 allowCoreThread TimeOut(boolean value) 方法,线程池中的线程数就算不超过 corePoolSize,keepAlive Time 参数也会起作用,直到线程池中的线程数量变为 0。

    参数 7:TimeUnit

    配合第 6 个参数使用,表示存活时间的时间单位最佳的实践是通过 ThreadPoolExecutor 手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。

    运行结果:

    线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会不断的去获取队列里的任务来执行。Worker 的加锁解锁机制是继承 AQS 实现的。

    我们来看下 Worker 线程的运行过程:

    总结来说,如果当前运行的线程数小于 corePoolSize 线程数,则获取全局锁,然后创建新的线程来执行任务如果运行的线程数大于等于 corePoolSize 线程数,则将任务加入阻塞队列 BlockingQueue 如果阻塞队列已满,无法将任务加入 BlockingQueue,则获取全局所,再创建新的线程来执行任务

    如果新创建线程后使得线程数超过了 maximumPoolSize 线程数,则调用 Rejected ExecutionHandler.rejectedExecution() 方法根据对应的拒绝策略处理任务。

    CPU 密集型任务,线程执行任务占用 CPU 时间会比较长,应该配置相对少的线程数,避免过度争抢资源,可配置 N 个 CPU+1 个线程的线程池;但 IO 密集型任务则由于需要等待 IO 操作,线程经常处于等待状态,应该配置相对多的线程如 2*N 个 CPU 个线程,A 线程阻塞后,B 线程能马上执行,线程多竞争激烈,能饱和的执行任务。线程提交 SQL 后等待数据库返回结果时间较长的情况,CPU 空闲会较多,线程数应设置大些,让更多线程争取 CPU 的调度。

    以上就是关于线程池源码解析相关问题的回答。希望能帮到你,如有更多相关问题,您也可以联系我们的客服进行咨询,客服也会为您讲解更多精彩的知识和内容。


    推荐阅读:

    mysql主从复制原理(mysql主从复制原理,从库单线程的设计思路)

    线程排行榜(线程最高是多少)

    kotlin协成(kotlin协程和线程的区别)

    深圳景观设计考研手绘班(深圳景观设计考研手绘班推荐)

    快手最多关注5000人吗(快手最多关注5000人吗是真的吗)