Java 线程与线程池

Java 线程与线程池

线程的状态

  • NEW, 新建状态, 线程被创建出来, 但尚未启动时的线程状态

  • RUNNABLE, 就绪状态, 表示可以运行的线程状态, 它可能正在运行, 或者是在排队等待操作系统给它分配 CPU 资源

  • BLOCKED, 阻塞等待锁的线程状态, 表示处于阻塞状态的线程正在等待监视器锁, 比如等待执行 synchronized 代码块或者使用 synchronized 标记的方法

  • WAITING, 等待状态, 一个处于等待状态的线程正在等待另一个线程执行某个特定的动作, 比如, 一个线程调用了 Object.wait() 方法, 那它就在等待另一个线程调用 Object.notify()Object.notifyAll() 方法

  • TIMED_WAITING, 计时等待状态, 和 WAITING 类似, 它只是多了超时时间, 比如调用了有超时时间设置的方法 Object.wait(long timeout)Thread.join(long timeout) 等这些方法时, 它才会进入此状态

  • TERMINATED, 终止状态, 表示线程已经执行完成

关于 Object.wait/notify

相关代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class WaitNotifyCase {
public static void main(String[] args) {
final Object lock = new Object();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread A is waiting to get lock");
synchronized (lock) {
try {
System.out.println("thread A get lock");
TimeUnit.SECONDS.sleep(1);
System.out.println("thread A do wait method");
lock.wait();
System.out.println("wait end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("thread B is waiting to get lock");
synchronized (lock) {
System.out.println("thread B get lock");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.notify();
System.out.println("thread B do notify method");
}
}
}).start();
}
}

执行结果

1
2
3
4
5
6
7
thread A is waiting to get lock
thread A get lock
thread B is waiting to get lock
thread A do wait method
thread B get lock
thread B do notify method
wait end

疑问

  • 进入 wait/notify 方法之前, 为什么要获取 synchronized 锁?
  • 线程 A 获取了 synchronized 锁, 执行 wait 方法并挂起, 线程 B 又如何再次获取锁?

分析

  • synchronized 代码块通过 javap 生成的字节码中包含 monitorentermonitorexit 指令, 执行 monitorenter 指令可以获取对象的 monitor , 在 wait() 接口注释中有标明 The current thread must own this object's monitor , 所以通过 synchronized 该线程持有了对象的 monitor 的情况下才能调用对象的 wait() 方法
  • wait() 接口注释中还提到调用 wait() 后该线程会释放持有的 monitor 进入等待状态直到被唤醒, 被唤醒的线程还要等到能重新持有 monitor 才会继续执行
  • 线程状态变化:
    1. 调用 wait(): RUNNABLE -> WAITING
    2. 调用 notify:
      • WAITING -> BLOCKED -> RUNNABLE
      • WAITING -> RUNNABLE
      • 具体看 JVM 实现和策略配置

深入: 什么是 monitor

  • HotSpot 虚拟机中 (1.7 版本), monitor 采用 ObjectMonitor 实现

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      ObjectMonitor() {
      _header = NULL;
      _count = 0; // 用来记录该线程获取锁的次数
      _waiters = 0,
      _recursions = 0; // 锁的重入次数
      _object = NULL; // 对应的对象
      _owner = NULL; // 指向持有 ObjectMonitor 对象的线程
      _WaitSet = NULL; // 处于 WAITING 状态的线程, 会被加入到 _WaitSet
      _WaitSetLock = 0 ;
      _Responsible = NULL ;
      _succ = NULL ;
      _cxq = NULL ; // 竞争锁的线程都会先通过互斥同步或 CAS 操作进入 cxq, 队首的对象会进入到 EntryList 中, 进行 tryLock 操作
      FreeNext = NULL ;
      _EntryList = NULL ; // 处于 BLOCKED 状态的线程, 会被加入到 _EntryList
      _SpinFreq = 0 ;
      _SpinClock = 0 ;
      OwnerIsThread = 0 ;
      }
  • 每个线程都有两个 ObjectMonitor 对象列表, 分别为 freeused 列表, 如果当前 free 列表为空, 线程将向全局 global ListLock 请求分配 ObjectMonitor

  • ObjectMonitor 对象中有两个队列:_WaitSet_EntryList, 用来保存 ObjectWaiter 对象列表;_owner 指向获得 ObjectMonitor 对象的线程

    • objectMonitor2
  • 每个等待锁的线程都会被封装成 ObjectWaiter 对象

    • ObjectWaiter 对象是双向链表结构, 保存了_thread(当前线程)以及当前的状态 TState等数据
    • objectWaiter
  • ObjectMonitor 获得锁是通过 void ATTR enter(TRAPS); 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void ATTR ObjectMonitor::enter(TRAPS) {
Thread * const Self = THREAD ;
void * cur ;
// 通过 CAS 尝试把 monitor 的 _owner 设置为当前线程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
// 获取锁失败
if (cur == NULL) { assert (_recursions == 0 , "invariant") ;
assert (_owner == Self, "invariant") ;
// CONSIDER: set or assert OwnerIsThread == 1
return ;
}
// 如果旧值和当前线程一样, 说明当前线程已经持有锁, 此次为重入, _recursions 自增即可
if (cur == Self) {
// TODO-FIXME: check for integer overflow! BUGID 6557169.
_recursions ++ ;
return ;
}

// 如果当前线程是第一次进入该 monitor, 设置 _recursions 为 1, _owner 为当前线程
if (Self->is_lock_owned ((address)cur)) {
assert (_recursions == 0, "internal state error");
_recursions = 1 ;
// Commute owner from a thread-specific on-stack BasicLockObject address to
// a full-fledged "Thread *".
_owner = Self ;
OwnerIsThread = 1 ;
return ;
}

// 省略部分代码。

// 通过自旋执行 ObjectMonitor::EnterI 方法等待锁的释放
for (;;) {
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition()
// or java_suspend_self()

EnterI (THREAD) ;

if (!ExitSuspendEquivalent(jt)) break ;

// We have acquired the contended monitor, but while we were
// waiting another thread suspended us. We don't want to enter
// the monitor while suspended because that would surprise the
// thread that suspended us.
//
_recursions = 0 ;
_succ = NULL ;
exit (Self) ;

jt->java_suspend_self();
}
}

lockenter

  • ObjectMonitor 释放锁是通过 void ATTR exit(TRAPS); 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void ATTR ObjectMonitor::exit(TRAPS) {
Thread * Self = THREAD ;
// 如果当前线程不是 Monitor 的所有者
if (THREAD != _owner) {
if (THREAD->is_lock_owned((address) _owner)) {
// Transmute _owner from a BasicLock pointer to a Thread address.
// We don't need to hold _mutex for this transition.
// Non-null to Non-null is safe as long as all readers can
// tolerate either flavor.
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
} else {
// NOTE: we need to handle unbalanced monitor enter/exit
// in native code by throwing an exception.
// TODO: Throw an IllegalMonitorStateException ?
TEVENT (Exit - Throw IMSX) ;
assert(false, "Non-balanced monitor enter/exit!");
if (false) {
THROW(vmSymbols::java_lang_IllegalMonitorStateException());
}
return;
}
}
// 如果 _recursions 次数不为 0.自减
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}

// 省略部分代码, 根据不同的策略(由 QMode 指定), 从 cxq 或 EntryList 中获取头节点, 通过ObjectMonitor::ExitEpilog 方法唤醒该节点封装的线程, 唤醒操作最终由 unpark 完成。

lockexit

  • lock.wait() 方法最终通过 ObjectMonitorvoid wait(jlong millis, bool interruptable, TRAPS); 实现:

    1. 将当前线程封装成 ObjectWaiter 对象 node
    2. 通过 ObjectMonitor::AddWaiter 方法将 node 添加到 _WaitSet 列表中
    3. 通过 ObjectMonitor::exit 方法释放当前的 ObjectMonitor 对象, 这样其它竞争线程就可以获取该 ObjectMonitor 对象
    4. 最终底层的 park 方法会挂起线程
  • lock.notify() 方法最终通过 ObjectMonitorvoid notify(TRAPS) 实现:

  1. 如果当前 _WaitSet 为空, 即没有正在等待的线程, 则直接返回
    1. 通过 ObjectMonitor::DequeueWaiter 方法, 获取 _WaitSet 列表中的第一个 ObjectWaiter节点
    2. 根据不同的策略, 将取出来的 ObjectWaiter 节点加入到 _EntryList 或则通过Atomic::cmpxchg_ptr 指令进行自旋操作 _cxq

相关问题

1. BLOCKED(阻塞等待)和 WAITING(等待)有什么区别?

  • 状态形成的调用方法不同
  • BLOCKED 可以理解为当前线程还处于活跃状态, 只是在阻塞等待其他线程使用完某个锁资源
  • WAITING 则是因为自身调用了 Object.wait() 或着是 Thread.join() 又或者是 LockSupport.park() 而进入等待状态, 只能等待其他线程执行某个特定的动作才能被继续唤醒, 比如当线程因为调用了 Object.wait() 而进入 WAITING 状态之后, 则需要等待另一个线程执行 Object.notify()Object.notifyAll() 才能被唤醒

2. start() 方法和 run() 方法有什么区别?

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public synchronized void start() {
        // 状态验证, 不等于 NEW 的状态会抛出异常
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        // 通知线程组, 此线程即将启动
        group.add(this);
        boolean started = false;
        try {
            start0();
            started = true;
        } finally {
            try {
                if (!started) {
    // 通知线程组, 此线程启动失败
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                // 不处理任何异常, 如果 start0 抛出异常, 则它将被传递到调用堆栈上
            }
        }
    }
    • start() 方法属于 Thread 自身的方法, 并且使用了 synchronized 来保证线程安全
  • run() 方法为 Runnable 的抽象方法, 重写的 run() 方法其实就是此线程要执行的业务方法

  • 调用 start() 方法是另起线程来运行 run() 方法中的内容

3. 线程的优先级有什么用?该如何设置?

  • Thread 源码中和线程优先级相关的属性有 3 个

    • 1
      2
      3
      4
      5
      6
      7
      8
      // 线程可以拥有的最小优先级
      public final static int MIN_PRIORITY = 1;

      // 线程默认优先级
      public final static int NORM_PRIORITY = 5;

      // 线程可以拥有的最大优先级
      public final static int MAX_PRIORITY = 10
  • 线程的优先级可以理解为线程抢占 CPU 时间片的概率, 优先级越高的线程优先执行的概率就越大, 但并不能保证优先级高的线程一定先执行

  • 在程序中我们可以通过 Thread.setPriority() 来设置优先级

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      public final void setPriority(int newPriority) {
          ThreadGroup g;
      // 检查当前线程是否有权限修改优先级
          checkAccess();
          // 先验证优先级的合理性
          if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) {
              throw new IllegalArgumentException();
          }
          if((g = getThreadGroup()) != null) {
              // 优先级如果超过线程组的最高优先级, 则把优先级设置为线程组的最高优先级
              if (newPriority > g.getMaxPriority()) {
                  newPriority = g.getMaxPriority();
              }
              setPriority0(priority = newPriority);
          }
      }

4. 线程的常用方法有哪些?

  • sleep

    • Thread.sleep() 让线程进入到 TIMED_WAITING 状态, 并停止占用 CPU 资源, 但是不释放持有的 monitor , 直到规定事件后再执行, 休眠期间如果被中断, 会抛出异常并清除中断状态
    • TimeUnit.SECONDS.sleep()Thread.sleep() 多了非负数判断
  • join

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      public final synchronized void join(long millis)
      throws InterruptedException {
      long base = System.currentTimeMillis();
      long now = 0;

      if (millis < 0) {
      throw new IllegalArgumentException("timeout value is negative");
      }

      if (millis == 0) {
      while (isAlive()) {
      wait(0);
      }
      } else {
      while (isAlive()) {
      long delay = millis - now;
      if (delay <= 0) {
      break;
      }
      wait(delay);
      now = System.currentTimeMillis() - base;
      }
      }
      }
    • 本质是用 wait() 实现, 这里 wait() 在循环中调用, 是为了避免可能发生的 虚假唤醒 (spurious wakeup) 情况

    • JVM 的 Thread 执行完毕会自动执行一次 notifyAll(), 所以不建议在程序中对 Thread 对象调用 wait/notify, 可能会造成干扰

  • yield

    • A hint to the scheduler that the current thread is willing to yield its current use of a processor. The scheduler is free to ignore this hint.

    • 状态依旧是 RUNNABLE, 不保证释放 CPU 资源

    • Thread.sleep(0) 可以重新触发 CPU 的竞争, 而 yield 不一定

  • interrupt

    • 通知线程停止, 而不是强制停止, 线程可以进行停止前的释放资源, 完成必要的处理任务
    • 在线程内可通过 isInterrupted() 判断终端并进行相应处理
    • 若线程处于等待或堵塞状态, 则会抛出 InterruptedException

5. 被弃用的方法有哪些? 为什么被弃用?

  • suspend
    • 使线程暂停, 但不会释放 monitor, 所以容易造成死锁
  • resume
    • 恢复通过调用 suspend() 方法而停止运行的线程
  • stop
    • 强制停止当前线程, 会释放该线程所持有对象的 monitor, 因而可能造成这些对象处于不一致的状态
    • 而且这个方法造成的 ThreadDeath 异常不像其他的检查期异常一样被捕获

线程池 ( ThreadPoolExecutor)

  • 线程池是为了避免线程频繁的创建和销毁带来的性能消耗, 而建立的一种池化技术, 它是把已创建的线程放入“池”中, 当有任务来临时就可以重用已有的线程, 无需等待创建的过程, 这样就可以有效提高程序的响应速度

  • img

构造函数

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler) {

    ......
    }
    1. corePoolSize 表示线程池的常驻核心线程数。如果设置为 0, 则表示在没有任何任务时, 销毁线程池;如果大于 0, 即使没有任务时也会保证线程池的线程数量等于此值。但需要注意, 此值如果设置的比较小, 则会频繁的创建和销毁线程;如果设置的比较大, 则会浪费系统资源, 所以开发者需要根据自己的实际业务来调整此值

    2. maximumPoolSize 表示线程池在任务最多时, 最大可以创建的线程数。官方规定此值必须大于 0, 也必须大于等于 corePoolSize, 此值只有在任务比较多, 且不能存放在任务队列时, 才会用到

    3. keepAliveTime 表示线程的存活时间, 当线程池空闲时并且超过了此时间, 多余的线程就会销毁, 直到线程池中的线程数量销毁的等于 corePoolSize 为止, 如果 maximumPoolSize 等于 corePoolSize, 那么线程池在空闲的时候也不会销毁任何线程

    4. unit 表示存活时间的单位, 它是配合 keepAliveTime 参数共同使用的

    5. workQueue 表示线程池执行的任务队列, 当线程池的所有线程都在处理任务时, 如果来了新任务就会缓存到此任务队列中排队等待执行

    6. threadFactory 表示线程的创建工厂, 此参数一般用的比较少, 我们通常在创建线程池时不指定此参数, 它会使用默认的线程创建工厂的方法来创建线程:

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        // 默认的线程创建工厂, 需要实现 ThreadFactory 接口
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;

            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
            // 创建线程
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon()) 
                    t.setDaemon(false); // 创建一个非守护线程
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
                return t;
            }
        }
      • 我们也可以自定义一个线程工厂, 通过实现 ThreadFactory 接口来完成, 这样就可以自定义线程的名称或线程执行的优先级了

    7. RejectedExecutionHandler 表示指定线程池的拒绝策略, 当线程池的任务已经在缓存队列 workQueue 中存储满了之后, 并且不能创建新的线程来执行此任务时, 就会用到此拒绝策略, 它属于一种限流保护的机制

ctl

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING = -1 << COUNT_BITS;
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    private static final int STOP = 1 << COUNT_BITS;
    private static final int TIDYING = 2 << COUNT_BITS;
    private static final int TERMINATED = 3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 用一个 AtomicInteger 包装两个字段:

    • 高 3 位保存 runState, 低 29 位保存 workerCount
    • 用一个变量去存储两个值, 可避免在做相关决策时, 出现不一致的情况, 不必为了维护两者的一致, 而占用锁资源
    • workerCount: 有效线程数
    • runState: 线程池的运行状态
      • 定义
        • RUNNING: 接受新任务并处理排队的任务
        • SHUTDOWN: 拒绝接受新任务, 但是会处理还在排队的任务
        • STOP: 拒绝接受新任务, 也不处理排队中任务, 并且会中断正在执行的任务
        • TIDYING: 所有任务都已经停止, workerCount 为 0, 转换为状态 TIDYING 的线程将运行 terminated() 方法
        • TERMINATED: terminated() 执行完毕
      • 这些值之间的数字顺序很重要, 可以进行有序的比较
      • runState 随着时间逐步增加, 但不一定达到每个状态, 过渡的顺序为:
        • RUNNING -> SHUTDOWN, 在调用 shutdown() 时, 可能隐藏在 finalize() 中调用
        • (RUNNING or SHUTDOWN) -> STOP, 在调用 shutdownNow()
        • SHUTDOWN -> TIDYING, 当队列和池子内的任务都为空时
        • STOP -> TIDYING, 当池子内的任务为空时
        • TIDYING -> TERMINATED, 当 terminated() 执行完毕时

线程池工作流程

图2 ThreadPoolExecutor运行流程

通过 execute() 执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 当前工作的线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 创建新的线程执行此任务, 传入 true 以核心线程数作为判断阈值
        if (addWorker(command, true))
            return;
// 创建失败, 说明 ctl 有变化, 重新获取
        c = ctl.get();
    }
    // 检查线程池是否处于运行状态, 如果是则把任务添加到队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池是否处于运行状态, 防止在第一次校验通过后线程池关闭
        // 如果是非运行状态, 则将刚加入队列的任务移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果线程池的线程数为 0 时(当 corePoolSize 设置为 0 时会发生)
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse); // 新建线程执行任务, 传入 false 以最大线程数作为判断阈值
    }
    // 核心线程和队列都满了, 新建非核心线程执行
    else if (!addWorker(command, false)) 
        // 新建线程失败, 执行拒绝策略
        reject(command);
}
  • addWorker(Runnable firstTask, boolean core) 方法
    • firstTask, 线程应首先运行的任务, 如果没有则可以设置为 null
    • core, 判断是否可以创建线程的阀值(最大值), 如果等于 true 则表示使用 corePoolSize 作为阀值, false 则表示使用 maximumPoolSize 作为阀值
  • 任务调度流程

Worker

构造函数
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread; // Worker 持有的线程
    Runnable firstTask; // 初始化的任务, 可以为 null

    Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
    }

    ......
    }
执行任务流程
  • Worker执行任务
继承 AQS 原因分析

Worker 是通过继承 AQS, 使用 AQS 来实现独占锁这个功能。不用可重入锁 ReentrantLock 而用 AQS, 为的就是实现不可重入的特性去反应线程现在的执行状态

  1. Worker.lock 方法一旦获取了独占锁, 表示当前线程正在执行任务中, 正在执行任务的线程不应该被中断
  2. 如果正在执行任务,则不应该中断线程
  1. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程, interruptIdleWorkers 方法会使用 tryLock 方法来判断线程是否在执行任务, 如果是空闲状态则可以安全回收
  2. 之所以要不可重入, 是为了避免在 Worker 中会调用到线程池 interruptIdleWorkers , 像 setCorePoolSize 方法。如果使用 ReentrantLock, 它是可重入的, 这样会导致该 Worker 自己被中断
  • 此外, 在构造方法中执行了setState(-1);, 把 state 变量设置为 -1, 是因为 AQS 默认的 state 是 0, 如果刚创建了一个 Worker 对象, 还没有执行任务时, 这时就不应该被中断:

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      protected boolean tryAcquire(int unused) {
      if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
      }
      return false;
      }

      protected boolean tryRelease(int unused) {
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
      }
    • tryAcquire 方法是根据 state 是否是 0 来判断的, 所以, setState(-1);state 设置为 -1 是为了防止在执行任务前就中断了线程

    • runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0, 允许中断和 Worker.lock

相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 用于操作 workers 
private final ReentrantLock mainLock = new ReentrantLock();

// 持有线程的引用, 管理线程的生命周期
private final HashSet<Worker> workers = new HashSet<Worker>();

// 用于通知线程池终止完毕
private final Condition termination = mainLock.newCondition();

// 线程池曾经创建过的最大线程数量
private int largestPoolSize;

// 线程池已经执行的和未执行的任务总数
private long completedTaskCount;
  • 为什么workers 不采用线程安全的集合 ?
    • 有许多复合的操作, 比如说将 worker 添加到 workers 后还需要判断是否需要更新 largestPoolSize 等, workers 只在获取到 mainLock 的情况下才会进行读写
    • mainLock 也用于在中断线程的时候串行执行, 否则可能会并发进行线程中断, 引起不必要的中断高峰

addWorker : 增加工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

if (rs >= SHUTDOWN && // 线程池是否已停止
! (rs == SHUTDOWN && // 线程池是否正在停止
firstTask == null && ! workQueue.isEmpty()) // 线程是否用于执行剩余任务
)
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || // 线程数是否超过容量
wc >= (core ? corePoolSize : maximumPoolSize)) // 是否超过判断的阀值
return false;
if (compareAndIncrementWorkerCount(c)) // CAS 尝试登记线程数
break retry; // 登记成功
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 判断线程池状态运行过程中是否有改变
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 持有引用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s; // 更新创建过的最大线程数
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程, 而线程的 run 方法就是执行 runWorker()
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

申请线程执行流程图

runWorker : 不断获取任务并执行

  • Worker 被创建出来后, 就会不断地进行轮询, 然后获取任务去执行, 核心线程可以无限等待获取任务, 非核心线程要限时获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空, 则通过getTask来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止, 那么要保证当前线程是中断状态, 否则要保证当前线程不是中断状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 标明是正常退出
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

执行任务流程

getTask : 从任务队列获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private Runnable getTask() {
// timeOut 表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
/*
* 1. 线程池已经 stop
* 2. 线程池处于 shutdown 并且队列为空
* 如果以上任何条件满足, 则将 workerCount 减 1 并返回 null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// timed 用于判断是否需要进行超时控制
// allowCoreThreadTimeOut 默认是 false, 也就是核心线程不允许进行超时
// wc > corePoolSize, 表示当前线程池中的线程数量大于核心线程数量
// 对于超过核心线程数量的这些线程, 需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

/*
* 1. 判断 wc > maximumPoolSize 是因为可能通过 setMaximumPoolSize 修改过 maximumPoolSize
* 2. timed && timedOut 如果为 true, 表示当前操作需要进行超时控制, 并且上次从阻塞队列中获取任务发生了超时
* 满足 1 或 2 并且如果有效线程数量大于 1 或者阻塞队列是空的, 那么尝试将 workerCount 减 1
* 判断 wc > 1 是防止在 allowCoreThreadTimeOut 为 true 或 corePoolSize 为 0 时无线程执行还在等待中的任务
* 如果减 1 失败, 则返回重试
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
/*
* 根据 timed 来判断, 如果为 true, 则通过阻塞队列的 poll 方法进行超时控制
* 如果在 keepAliveTime 时间内没有获取到任务, 则返回 null
* 否则通过 take 方法, 如果这时队列为空, 则 take 方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null, 说明已经超时, timedOut 设置为 true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断, 则设置 timedOut 为 false 并返回循环重试
timedOut = false;
}
}
}
processWorkerExit : 线程回收
  • 线程池中线程的销毁依赖 JVM 的垃圾回收, 当线程池决定哪些线程需要回收时, 只需要将其引用消除即可

  • Worker 无法获取到任务, 也就是获取的任务为空时, 循环会结束, Worker 会主动消除自身在线程池内的引用

  • 线程回收的工作在 processWorkerExit 方法内完成

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      private void processWorkerExit(Worker w, boolean completedAbruptly) {
      // 如果 completedAbruptly 值为 true, 则说明线程执行时出现了异常, 需要将 workerCount 减 1
      // 如果线程执行时没有出现异常, 说明在 getTask() 方法中已经已经对 workerCount 进行了减 1 操作
      if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
      decrementWorkerCount();

      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
      // 统计完成的任务数
      completedTaskCount += w.completedTasks;
      // 从 workers 中移除, 也就表示着从线程池中移除了一个工作线程
      workers.remove(w);
      } finally {
      mainLock.unlock();
      }

      // 根据线程池状态进行判断是否结束线程池
      tryTerminate();

      int c = ctl.get();
      /*
      * 当线程池是 RUNNING 或 SHUTDOWN 状态时, 如果 worker 是异常结束, 那么会直接 addWorker
      * 如果 allowCoreThreadTimeOut 为 true, 并且等待队列有任务, 至少保留一个 worker
      * 如果 allowCoreThreadTimeOut 为 false, workerCount 不少于 corePoolSize
      */
      if (runStateLessThan(c, STOP)) {
      if (!completedAbruptly) {
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
      min = 1;
      if (workerCountOf(c) >= min)
      return; // replacement not needed
      }
      addWorker(null, false);
      }
      }
  • 事实上在这个方法中, 将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多, 线程池还要判断是什么引发了这次销毁, 是否要改变线程池的现阶段状态, 是否要根据新状态, 重新分配线程
tryTerminate : 根据状态判断是否结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 当前线程池的状态为以下几种情况时, 直接返回:
* 1. RUNNING, 因为还在运行中, 不能停止
* 2. TIDYING 或 TERMINATED, 说明正在或者已经在终止
* 3. SHUTDOWN 并且等待队列非空, 这时要执行完 workQueue 中的 task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

// 到这个位置为以下情况之一
// 1. 线程池的状态为 SHUTDOWN 并且等待队列为空
// 2. 线程池的状态为 STOP

// 如果线程数量不为 0, 则中断一个空闲的工作线程, 并返回
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}

// 到这个位置则说明线程数量为 0

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 尝试设置状态为 TIDYING, 如果成功则调用 terminated 方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated 方法默认什么都不做, 留给子类实现
terminated();
} finally {
// terminated() 执行完毕, 设置状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 通知完成终止
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
// 没设置成功则继续 CAS 尝试
}
}

shutdown , shutdownNow : 关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判断
checkShutdownAccess();
// 切换状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
}

public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置状态为 STOP
advanceRunState(STOP);
// 中断所有工作线程
interruptWorkers();
// 取出队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
interruptIdleWorkers, interruptWorkers : 中断工作线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 通过 w.tryLock 判断是否为空闲
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}


private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

......

void interruptIfStarted() {
Thread t;
// 如果正在执行任务, 中断线程
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

awaitTermination : 等待线程池完成终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// 通过 termination 进行等待
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}

相关问题

1. ThreadPoolExecutor 的执行方法有几种?它们有什么区别?

  • execute() VS submit()

    • 都是用来执行线程池任务, 它们最主要的区别是 submit() 方法可以接收线程池执行的返回值, 而 execute() 不能接收返回值

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      ThreadPoolExecutor executor = new ThreadPoolExecutor(21010L,
              TimeUnit.SECONDS, new LinkedBlockingQueue(20));
      // execute 使用
      executor.execute(new Runnable() {
          @Override
          public void run() {
              System.out.println("Hello, execute.");
          }
      });
      // submit 使用
      Future<String> future = executor.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
              System.out.println("Hello, submit.");
              return "Success";
          }
      });
      System.out.println(future.get());
    • execute() 方法属于 Executor 接口的方法, 而 submit() 方法则是属于 ExecutorService 接口的方法

    • submit() 中处理的任务如果抛出异常, 只有在调用返回的 Future 对象 get 方法时才会抛出

2. 拒绝策略的分类有哪些? 如何自定义拒绝策略?

  • 自带的拒绝策略有 4 种:

    • AbortPolicy, 终止策略, 线程池会抛出异常并终止执行, 它是默认的拒绝策略
    • CallerRunsPolicy, 把任务交给当前线程来执行
    • DiscardPolicy, 忽略此任务
    • DiscardOldestPolicy, 忽略最早的任务(最先加入队列的任务)
  • 自定义拒绝策略

    • 自定义拒绝策略只需要新建一个 RejectedExecutionHandler 对象, 然后重写它的 rejectedExecution() 方法即可

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      ThreadPoolExecutor executor = new ThreadPoolExecutor(1310,
              TimeUnit.SECONDS, new LinkedBlockingQueue<>(2),
              new RejectedExecutionHandler() {  // 添加自定义拒绝策略
                  @Override
                  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                      // 业务处理方法
                      System.out.println("执行自定义拒绝策略");
                  }
              });
      for (int i = 0; i < 6; i++) {
          executor.execute(() -> {
              System.out.println(Thread.currentThread().getName());
          });
      }

3. 线程池的工作队列有哪些?

  • ArrayBlockingQueue, 是一个用数组实现的有界阻塞队列, 按 FIFO 排序任务, 支持公平锁和非公平锁
  • LinkedBlockingQueue, 基于链表结构的阻塞队列, 按 FIFO 排序任务, 容量可以选择进行设置, 不设置的话, 将是一个无边界的阻塞队列, 最大长度为 Integer.MAX_VALUE, 吞吐量通常要高于 ArrayBlockingQuene
  • DelayQueue, 是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序, 否则根据插入到队列的先后排序
  • PriorityBlockingQueue, 是具有优先级的无界阻塞队列, 不能保证同优先级元素的顺序
  • SynchronousQueue, 一个不存储元素的阻塞队列, 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态, 吞吐量通常要高于 LinkedBlockingQueue
  • LinkedBlockingDeque, 一个由链表结构组成的双向阻塞队列, 队列头尾都可以插入和移除元素, 多线程并发时, 可以将锁的竞争最多 降到一半

4. ThreadPoolExecutor 如何实现扩展?

  • 通过重写 beforeExecute()afterExecute() 方法, 我们可以在扩展方法中添加日志或者实现数据统计, 比如统计线程的执行时间

关于 Executors 内的线程池对象

  • Executors 源码中 Executors.newFixedThreadPool()Executors.newSingleThreadExecutor()Executors.newCachedThreadPool() 等方法的底层都是通过 ThreadPoolExecutor 实现的

    • FixedThreadPool (固定数目线程的线程池)
      • 适用于处理 CPU 密集型的任务, 确保 CPU 在长期被工作线程使用的情况下, 尽可能的少的分配线程
      • 特点
        • 核心线程数和最大线程数大小一样
        • keepAliveTime 为 0
        • 阻塞队列为 LinkedBlockingQueue
    • CachedThreadPool (可缓存线程的线程池)
      • 适用于并发执行大量短期的小任务
      • 特点
        • 核心线程数为 0
        • 最大线程数为 Integer.MAX_VALUE
        • 阻塞队列为 SynchronousQueue
        • 非核心线程空闲存活时间为 60 秒
    • SingleThreadExecutor (单线程的线程池)
      • 适用于串行执行任务的场景, 一个任务一个任务地执行
      • 特点
        • 核心线程数为 1
        • 最大线程数也为 1
        • 阻塞队列是 LinkedBlockingQueue
        • keepAliveTime 为 0
    • ScheduledThreadPool (定时及周期执行的线程池)
      • 周期性执行任务的场景, 需要限制线程数量的场景
      • 特点
        • 最大线程数为 Integer.MAX_VALUE
        • 阻塞队列是 DelayedWorkQueue
        • keepAliveTime 为 0
        • scheduleAtFixedRate() 按某种速率周期执行
        • scheduleWithFixedDelay() 在某个延迟后执行
  • 在阿里巴巴的《 Java 开发手册 》中是这样规定的:

    • 线程池不允许使用 Executors 去创建, 而是通过 ThreadPoolExecutor 的方式, 这样的处理方式让写的读者更加明确线程池的运行规则, 规避资源耗尽的风险。

    • Executors 返回的线程池对象的弊端如下:

      • FixedThreadPoolSingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE, 可能会堆积大量的请求, 从而导致 OOM
      • CachedThreadPoolScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程, 从而导致 OOM

参考

https://www.jianshu.com/p/f4454164c017

https://www.hollischuang.com/archives/2030

https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html