Java 并发编程实战 笔记

Java 并发编程实战 笔记

二. 线程安全

2.2 原子性

  • 原子操作指不可分割的操作

  • 竞态条件 (Race Condition)

    • 当某个计算的正确性取决于多个线程交替执行的时序时, 那就会发生竞态条件
    • 最常见的竞态条件类型就是”先检查后执行
      • 例如在延迟加载的实现中, 需要先判断对象是否已初始化, 否则需要先进行初始化
      • 两个线程同时进入时可能会造成初始化两次
  • 可以使用 JUC 包中的 Atomic 类保证变量状态的原子性

2.3 加锁机制

  • Atomic 类并不能保证竞态条件下的线程安全

  • 内置锁 synchronized

    • 可重入
      • 获取锁的操作粒度是”线程”, 而不是”调用”
      • 避免了子类重写父类 synchronized 方法时调用父类方法会死锁的情况

2.4 用锁来保护状态

  • 对于被多线程访问的变量, 在访问它时都需要持有同一个锁

2.5 活跃性与性能

  • 要判断同步代码块的合理大小, 需要在安全性(必须满足), 简单性和性能等需求之间进行权衡

三. 对象的共享

3.1 可见性

  • 多线程下存在可见性和重排序的情况

3.1.1 失效数据

  • 在每次访问变量时都使用同步可以避免失效数据

3.1.2 非原子的 64 为操作

  • 在 32 位操作系统下, JVM 允许将 64 位数值 (double, long) 的读写操作分解为两个 32 位的操作

3.1.3 加锁与可见性

  • 加锁的含义不仅仅局限于互斥行为, 还包括内存可见性

3.1.4 volatile 变量

  • volatile 可以确保变量的更新通知到其他线程
  • 并不能保证原子性

3.2 发布与逸出

  • 发布一个对象是指使其能够在当前作用于之外的代码中使用
  • 某个不该被发布的对象被发布时, 被称为逸出
  • 避免在构造函数中将还未完成初始化的对象 this 指针逸出

3.3 线程封闭

  • 仅在单线程内访问数据, 被称为线程封闭 (Thread Confinement)
  • 栈封闭指只能通过局部变量才能访问对象, 因为局部变量位于执行线程的栈中
  • ThreadLocal 能使每个使用变量的线程都有一份独立的副本
    • 避免了传递执行上下文信息
    • 但降低了代码的可重用性, 并在类之间引入隐含的耦合

3.4 不变性

  • 不可变对象一定是线程安全的

3.5 安全发布

  • 多线程间共享数据需要同步, 否则因可见性等问题会出现未知现象
  • Java 内存模型为不可变对象提供了特殊的初始化安全性保证, 从而不需要同步也能共享该对象
  • 发布一个静态对象最简单安全的方式是使用静态初始化器
  • 安全发布只能确保发布时状态的可见性, 后续对该对象的访问修改还是需要使用同步机制

四. 对象的组合

4.1 设计线程安全的类

  • 设计线程安全类的过程需要包含三个基本要素

    • 找出构成对象状态的所有变量
    • 找出约束状态变量的不变性条件
    • 建立对象状态的并发访问管理策略
  • 收集同步的需求, 了解对象在状态变量上的各种约束条件以及依赖状态的操作, 需要借助同步与封装

4.2 实例封闭

  • 将数据封装在对象内部, 可以将数据的访问限制在对象的方法上, 从而更容易确保持有锁

4.3 线程安全的委托

4.4 在现有的线程安全类中添加功能

  • 使用组合而非其他的方式给某个对象添加线程安全的方法

4.5 将同步策略文档化

  • 在文档中说明使用者需要了解的线程安全性保证, 以及代码维护人员需要了解的同步策略

五. 基础构建模块

5.1 同步容器类

5.1.1 同步容器类的问题

  • 同步容器类是线程安全的, 但还是需要额外加锁来保护复合操作, 如遍历删除

5.1.2 迭代器与 ConcurrentModificationException

  • 在设计同步容器类的迭代器时并没有考虑并发修改的问题, 当发现在迭代过程中容器被修改时, 会抛出 ConcurrentModificationException
  • 实现方式为将计数器的变化与容器关联, 在迭代期间计数器被修改, 则 hasNextnext 会抛出 ConcurrentModificationException
  • 在单线程中也可能抛出 ConcurrentModificationException, 当对象直接从容器删除而不是通过 Iterator.remove 来删除时

5.1.3 隐藏迭代器

  • 在某些情况下, 迭代器会隐藏起来, 比如打印容器时会迭代调用每个元素的 toString 方法
  • 容器的 hashCodeequals 等方法也会间接的执行迭代操作, 所以都可能抛出 ConcurrentModificationException

5.2 并发容器

5.2.1 ConcurrentHashMap

  • 同步容器类在执行每个操作期间都持有一个锁
  • 对于基于散列的容器, 如果 hashCode 不能很均匀的分布, 会影响散列表的性能, 特别是还持有锁的情况
  • ConcurrentHashMap 采用分段锁 (Lock Striping) 的机制, 实现了更高的吞吐量, 而在单线程环境只损失较小的性能
  • ConcurrentHashMap 和其他并发容器一起增强了同步容器类
    • 迭代器不会抛出 ConcurrentModificationException, 因此迭代过程不需要加锁
    • ConcurrentHashMap 的迭代器具有弱一致性 (weakly consistent) 而非 “及时失败” (fail-fast)
      • 弱一致性的迭代器允许并发的修改, 并可以 (但是不保证) 在迭代器被构造后将修改操作反映给容器
  • 对于需要在整个 Map 上进行计算的方法, 如 sizeisEmpty , 这些方法的语义被略微减弱了以反映容器的并发特性
    • 因为在计算的过程中可能已经过期, 实际上只是一个估计值
    • 事实上在并发环境下的用处很小
  • ConcurrentHashMap 中没有实现对 Map 加锁以提供独占访问
    • 在 HashTable 和 synchronizedMap 中, 获得 Map 的锁能防止其他线程访问这个 Map
  • ConcurrentHashMap 还实现一些原子的复合操作, 如 putIfAbsent

5.2.3 CopyOnWriteArrayList

  • Copy-On-Write 容器的线程安全性在于, 只要正确的发布一个不可变的对象, 则在访问的时候就不需要进行同步

  • 在每次修改时都会创建并重新发布一个新的容器副本, 从而实现可变性

  • Copy-On-Write 容器的迭代器保留一个指向底层基础数组的引用, 且数组不会被修改

    • 不会抛出 ConcurrentModificationException
    • 并且返回的元素与迭代器创建时的元素完全一致
  • 每次修改都会复制底层数组, 开销很大, 所以当迭代操作大于修改操作时才应该使用

5.3 阻塞队列和生产者-消费者

  • 阻塞队列提供了可阻塞的 puttake 方法, 以及支持定时的 offerpoll 方法
  • 阻塞队列支持生产者-消费者模式, 此模式解耦了生产数据与使用数据的过程
  • 类库中包含了 BlockingQueue 的多种实现
    • LinkedBlockingQueue 和 ArrayBlockingQueue 是 FIFO 队列
      • 两者分别与 LinkedList 和 ArrayList 类似但拥有更好的并发性能
    • PriorityBlockingQueue 是一个按优先级排序的队列
    • SynchronousQueue 是一个不存储元素的阻塞队列, 每个插入操作必须等到另一个线程调用移除操作, 否则插入操作一直处于阻塞状态
  • Java 6 加入了 Deque 和 BlockingDeque
    • 分别对 Queue 和 BlockingQueue 进行扩展
    • Deque 是一个双端队列, 可以在队列的头和尾高效的插入和移除
      • 具体实现包括 ArrayDeque 和 LinkedBlockingDeque
    • 适用于工作密取 (Work Stealing) 模式
      • 每个消费者持有各自的 Deque
      • 如果一个消费者消费完了自己的 Deque, 它会去其他消费者 Deque 尾部消费
      • 极大的减少了竞争

5.4 阻塞方法与中断方法

  • 线程可能会阻塞或暂停执行, 原因有多种:

    • 等待 I/O 操作结束
    • 等待获得锁
    • 等待从 Thread.sleep 中醒来
    • 等待另外一个线程的计算结果
  • 大多数阻塞的操作被中断时会抛出 InterruptedException

  • 中断是一种协助机制

    • 调用一个线程的 interrupt() 方法中断一个线程,并不是强行关闭这个线程,只是通知线程停止,将线程的中断标志位置为 true,线程是否中断,由线程本身决定, 线程可以进行停止前的释放资源, 完成必要的处理任务
  • 处理中断的两种选择

    1. 传递中断

      • 对于底层的方法, 在方法的签名上标注异常 (throws InterruptedException)

      • 抛出异常,而异常的真正处理,应该交给调用它的那个函数

      • 因为标注了异常, 调用者必须对 InterruptedException 异常进行处理

    2. 恢复中断

      • 在底层方法中 catch 处理异常
      • 处理完成后手动调用 Thread.currentThread().interrupt() 恢复中断

5.5 同步工具类

5.5.1 闭锁

  • 闭锁可以延迟线程的进度直到其到达终止状态
  • CountDownLatch 是一种闭锁的实现
    • 有初始计数值
    • 计数值大于 0 时, 获取锁的线程会被阻塞
    • 计数值被减到 0 时, 所有被阻塞的线程同时被释放

5.5.2 FutureTask

  • FutureTask 也可以用做闭锁
  • 实现了 Future 语义, 表示一种抽象的可生成结果的计算
  • FutureTask 表示的计算是通过 Callable 实现的, 可以处于以下三种状态
    • 等待运行 (Waiting to run)
    • 正在运行 (Running)
    • 运行完成 (Completed)
  • Future.get 的行为取决于任务的状态
    • 如果任务已完成, 会立即返回结果
    • 否则将阻塞直到任务完成
    • 返回结果或者抛出异常

5.5.3 信号量

  • Semaphore 用来控制同时访问某个资源的操作数量

    • 信号量 S, 整型变量, 需要初始化值大于0
    • P 操作, 原子减少 S, 如果 S < 0, 则阻塞当前线程
      • 对应方法 Semaphore.acquire
    • V 操作, 原子增加 S, 如果 S <= 0, 则唤醒一个阻塞的线程
      • 对应方法 Semaphore.release
  • 可以利用信号量将任何容器变成有界阻塞容器

5.5.4 栅栏

  • 栅栏 (Barrier) 类似于闭锁, 能阻塞一组线程直到某个事件发生

    • 主要区别在于必须所有线程到达栅栏位置才能继续执行
    • 闭锁用于等待事件, 栅栏用于等待其他线程
  • CyclicBarrier

    • 1
      public CyclicBarrier(int parties, Runnable barrierAction)
      • parties 表示线程个数
      • barrierAction 表示线程都达到栅栏后先执行的一个方法
    • 1
      private int dowait(boolean timed, long nanos)
      • timed 表示是否需要超时
      • nanos 表示超时的具体时间
      • 超时会抛出 BrokenBarrierException
    • 1
      public void reset()
      • 重置栅栏, 原先在等待的线程会收到 BrokenBarrierException

六. 任务执行

6.1 在线程中执行任务

  • 串行处理机制无法提供高吞吐率或快速响应性
  • 为每个任务分配一个线程会造成很高的开销和资源消耗

6.2 Executor 框架

  • java.util.concurrent 包中提供了一种灵活的线程池实现作为 Executor 框架的一部分
  • Executor 基于生产者 - 消费者模式

6.2.2 执行策略

  • 在什么线程中执行任务
  • 任务按照什么顺序执行 (FIFO, LIFO, 优先级)
  • 有多少个任务能并发执行
  • 在队列中有多少任务在等待执行
  • 系统过载时的拒绝策略以及相关通知
  • 执行任务的前后

6.2.3 线程池

  • 线程池通过重用现有的线程而不是创建新线程而节省开销和响应性

  • Executors 中创建线程池的静态方法

    • newFixedThreadPool 创建一个核心和最大线程数量都固定的线程池, 等待队列的长度不受限制

    • newCachedThreadPool 创建一个可缓存的线程池, 会回收空闲线程, 最大线程数不受限制

    • newSingleThreadExecutor 创建一个单线程的 Executor, 等待队列的长度不受限制

    • newScheduledThreadPool 创建一个核心线程数量固定的线程池, 以延迟或定时的方式执行任务, 最大线程数不受限制

6.2.4 Executor 的生命周期

  • 如果无法正确的关闭 Executor , JVM 将无法结束
  • Executor 扩展了 ExecutorService 接口用于解决生命周期

6.3 找出可利用的并行性

  • Executor 框架使用 Runnable 作为其基本的任务表示形式

  • Runnable 有很大的局限的抽象, 不能返回值或抛出受检查的异常

  • Callable 是一种更好的抽象

  • Future 表示一个任务的生命周期

  • CompletionService 将 Executor 和 BlockingQueue 融合在一起, 可以从阻塞队列里取出完成的 Future

七. 取消与关闭

  • 通常中断是实现取消最合理的方式

  • 最合理的中断策略是某种形式的线程级取消操作或服务级取消操作

    • 尽快退出, 在必要时进行清理, 通知所有者该线程已退出
  • 大多数可阻塞的库函数都是抛出 InterruptException 作为中断响应

  • 处理不可中断的阻塞

    • Java.io 包中的同步 Socket I/O
      • InputStream 和 OutputStream 中的 read, write 等方法都不会响应中断
      • 通过关闭底层的套接字, 可以使上述方法抛出 SocketException
    • Java.io 包中的同步 I/O
      • 大多数标准的 Channel 都实现了 InterruptibleChannel
      • 中断一个正在 InterruptibleChannel 上等待的线程会使所有在这条链路上阻塞的线程抛出 CloseByInterruptException 并关闭链路
      • 关闭一个 InterruptibleChannel 时会使所有在这条链路上阻塞的线程抛出 AsynchronousCloseException
    • Selector 的异步 I/O
      • 如果一个线程在调用 Selector.select 方法 (在 java.nio. channels 中) 时阻塞了, 调用 close 或 wakeup 方法会使线程抛出 CloseSelectorException
    • 获取某个锁
      • 如果一个线程由于等待内置锁而阻塞, 那将无法响应中断
      • 在 Lock 类中提供了 lockInterruptibly 方法允许线程响应中断
  • 避免使用终结器

八. 线程池的使用

8.1 在任务与执行策略之间的隐性耦合

  • 有些任务需要明确的指定执行策略
    • 依赖性任务
      • 可能产生死锁
    • 使用线程封闭机制的任务
      • 如果将 Executor 从单线程环境改为线程池环境, 会失去线程安全性
    • 对响应时间敏感的任务
      • 执行时间长的任务会影响时间短的任务
    • 使用 ThreadLocal 的任务

8.2 设置线程池的大小

image-20200908202033464

8.3 配置 ThreadPoolExcutor

1
2
3
4
5
6
7
8
9
10
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {

......
}
  1. corePoolSize 表示线程池的常驻核心线程数

    • 如果设置为 0, 则表示在没有任何任务时, 销毁线程池
    • 如果大于 0, 即使没有任务时也会保证线程池的线程数量等于此值
    • 此值如果设置的比较小, 则会频繁的创建和销毁线程
    • 如果设置的比较大, 则会浪费系统资源, 所以开发者需要根据自己的实际业务来调整此值
  2. maximumPoolSize 表示线程池在任务最多时, 最大可以创建的线程数

    • 此值必须大于 0, 也必须大于等于 corePoolSize, 此值只有在任务比较多, 且不能存放在任务队列时才会用到
  3. keepAliveTime 表示线程的存活时间

    • 当线程池空闲时并且超过了此时间, 多余的线程就会销毁, 直到线程池中的线程数量销毁的等于 corePoolSize 为止
    • 如果 maximumPoolSize 等于 corePoolSize, 那么线程池在空闲的时候也不会销毁任何线程
  4. unit 表示存活时间的单位, 它是配合 keepAliveTime 参数共同使用的

  5. workQueue 表示线程池执行的任务队列

    • 当线程池的所有线程都在处理任务时, 如果来了新任务就会缓存到此任务队列中排队等待执行
  6. threadFactory 表示线程的创建工厂, 此参数一般用的比较少, 我们通常在创建线程池时不指定此参数, 它会使用默认的线程创建工厂的方法来创建线程:

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      // 默认的线程创建工厂, 需要实现 ThreadFactory 接口
      static class DefaultThreadFactory implements ThreadFactory {
          private static final AtomicInteger poolNumber = new AtomicInteger(1);
          private final ThreadGroup group;
          private final AtomicInteger threadNumber = new AtomicInteger(1);
          private final String namePrefix;

          DefaultThreadFactory() {
              SecurityManager s = System.getSecurityManager();
              group = (s != null) ? s.getThreadGroup() :
                                    Thread.currentThread().getThreadGroup();
              namePrefix = "pool-" +
                            poolNumber.getAndIncrement() +
                           "-thread-";
          }
          // 创建线程
          public Thread newThread(Runnable r) {
              Thread t = new Thread(group, r,
                                    namePrefix + threadNumber.getAndIncrement(),
                                    0);
              if (t.isDaemon()) 
                  t.setDaemon(false); // 创建一个非守护线程
              if (t.getPriority() != Thread.NORM_PRIORITY)
                  t.setPriority(Thread.NORM_PRIORITY); // 线程优先级设置为默认值
              return t;
          }
      }
    • 我们也可以自定义一个线程工厂, 通过实现 ThreadFactory 接口来完成, 这样就可以自定义线程的名称或线程执行的优先级了

  7. RejectedExecutionHandler 表示指定线程池的拒绝策略

    • 当线程池的任务已经在缓存队列 workQueue 中存储满了之后, 并且不能创建新的线程来执行此任务时, 就会用到此拒绝策略, 它属于一种限流保护的机制

8.4 扩展 ThreadPoolExcutor

  • 通过重写 beforeExecute()afterExecute() 方法, 我们可以在扩展方法中添加日志或者实现数据统计, 比如统计线程的执行时间

十. 避免活跃性危险

10.1 死锁

  • 抱死 Deadly Embrace

    • 线程 A 持有锁 L 并想获得锁 R 时, 线程 B 持有锁 R 并尝试获得锁 L
    • image-20200908213324666
  • 数据库系统的设计中考虑了监测死锁以及从死锁中恢复

    • 当检测到一组事务 (Transaction) 发生了死锁时 (通过在表示等待关系的有向图中搜索循环)
    • 选择一个牺牲者并放弃这个事务, 释放它所持有的资源
  • 锁顺序死锁

    • 如果按照相同循序来请求锁, 就不会出现死锁
    • 业务中根据一个唯一不可变, 且具备可比性的值 (如数据库中的 id) 来排序加锁
  • 在协作对象之间发生的死锁

    • 如果在持有锁时调用某个外部方法, 将会出现活跃性问题
    • 在这个外部方法中可能会获取其他锁, 从而产生死锁
    • 或者阻塞时间过长, 导致其他线程无法及时获得当前被持有的锁
  • 在调用某个方法时不需要持有锁, 那这种调用称为开放调用

10.2 死锁的避免和诊断

  • 使用支持定时的锁
  • 通过 Thread Dump 分析死锁

10.3 其他活跃性危险

  • 饥饿 (Starvation)
    • 线程由于不能访问所需要的资源而无法继续执行时, 就发生了饥饿
    • 引发饥饿的最常见资源就是 CPU 时钟周期
    • 线程优先级可能会导致低优先级的线程饥饿
  • 糟糕的响应性
  • 活锁 (Livelock)
    • 错误的将不可修复的错误作为可修复的错误, 不断尝试修复导致活锁

十一. 性能与可伸缩性

11.1 对性能的思考

  • 可伸缩性: 当增加计算资源时 (如 cpu, 内存, 存储容量或带宽), 程序的吞吐量或者处理能力能相应地增加
  • 避免不成熟的优化, 首先正确运行, 然后再考虑提高速度

11.2 Amdahl 定律

  • image-20200914215649373

11.3 线程引入的开销

  • 上下文切换
  • 内存同步
    • 非竞争同步会被 JVM 进行优化, 如锁消除和锁粒度粗化
  • 阻塞

11.4 减少锁竞争

  • 降低锁竞争程度的方式
    • 减少持有锁的时间
      • 缩小锁的范围
        • 主要考虑大量计算或阻塞操作的代码
      • 锁分解, 锁分段
        • 避免热点域
    • 降低请求锁的频率
    • 使用带有协调机制的独占锁
      • 并发容器
      • 读写锁
      • 不可变对象
      • 原子变量
  • 对象分配操作比同步的开销低, 对于性能优化来说, 对象池的用途有限

11.5 比较 Map 的性能

  • image-20200915222255567
  • 当竞争变的激烈时, 每个操作消耗的时间大部分用于上下文切换和调度延迟, 再加入更多线程也不会提高太多吞吐量

十二. 并发程序的测试

  • 并发测试大致分为两类

    • 安全性测试
      • 不发生任何错误的行为
      • 要找出容易检查的属性, 这些属性在发生错误时极可能失败, 同时检查的代码不会限制并发性
    • 活跃性测试
      • 性能测试
        • 吞吐量: 一组并发任务中已经完成任务所占的比例
        • 响应性: 请求从发出到完成之间的时间 (延迟)
        • 可伸缩性: 在增加更多资源的情况下, 吞吐量提升的情况
  • 可用通过 Thread.yieldThread.sleep(0) 产生更多交替操作, 提高出错的概率

12.3 避免性能测试的陷阱

  • 垃圾回收

    • 确保测试运行期间能执行多次垃圾回收
  • 动态编译

    • 当某个类第一次被加载时, HotSpot JVM 会通过解译字节码的方式来执行它, 当某个方法运行次数足够多, 动态编译器会将其编译为机器码, 后面代码的执行就从解释执行变为直接执行
    • 代码还可能被反编译已经重新编译
    • 确保测试运行足够长, 或先预运行一段时间
  • 对代码路径的不真实采样

    • JVM 可能会基于测试情况下临时有效的假设进行优化
    • 测试程序不仅要大致判断典型的使用模式, 还需要尽量覆盖执行的代码路径集合
  • 无用代码的消除

    • 避免测试时的没覆盖到的代码被编译器优化消除
    • 可以在相关代码中打印空白字符串

十三. 显式锁