Skip to content

共享模型之管程

共享问题

临界区

多个线程访问共享资源,且在多个线程对共享资源读写操作时发生指令交错,就会出现问题

临界区:存在对共享资源的多线程读写操作的代码块

竞态条件

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

解决方式

  1. 非阻塞式解决方案:原子变量
  2. 阻塞式解决方案:Lock、synchronized

synchronized 初识

synchronized 概述

synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

synchronized 语法

java
synchronized(对象) {
    临界区
}

方法上的 synchronized

对于成员方法,锁当前对象

java
class Test{
    public synchronized void test() {

    }
}

// 等价于

class Test{
    public void test() {
        synchronized(this) {

        }
    }
}

对于非成员方法,锁 Class 类对象

java
class Test{
    public synchronized static void test() {
    }
}

// 等价于

class Test{
    public static void test() {
        synchronized(Test.class) {

        }
    }
}

线程安全分析

线程安全场景分析

成员变量

多个线程访问一个共享资源会出现线程安全问题。

java
class ThreadUnsafe {
    ArrayList<String> list = new ArrayList<>();
    public void method1(int loopNumber) {
        for (int i = 0; i < loopNumber; i++) {
            // { 临界区, 会产生竞态条件
            method2();
            method3();
            // } 临界区
        }
    }
    private void method2() {
        // 不是原子操作
        list.add("1");
    }
    private void method3() {
        // 不是原子操作
        list.remove(0);
    }
}

局部变量

每个线程都使用各自的资源,不会出现问题。

java
class ThreadSafe {
    public final void method1(int loopNumber) {
        ArrayList<String> list = new ArrayList<>();
        for (int i = 0; i < loopNumber; i++) {
            method2(list);
            method3(list);
        }
    }
    private void method2(ArrayList<String> list) {
        list.add("1");
    }
    private void method3(ArrayList<String> list) {
        list.remove(0);
    }
}

常见线程安全类

  • String、Integer
  • StringBuffer
  • Vector、HashTable
  • juc 包的类

对于线程安全类,多个线程调用单个实例的某个方法时,是线程安全的。方法是原子操作,但多个方法组合不是原子的。

String、Integer 等不可变类是线程安全的。

无状态(没有成员变量)的对象也是线程安全的。

String 类为什么设计成 final 修饰的?

避免子类继承后重写方法后,破坏线程安全性。

Monitor

Java 对象头

Java 对象的组成部分

mermaid
graph TB
    A[Java对象] --> B[对象头]
    A --> C[实例数据]
    A --> D[对齐填充字节]

    B --> E[Mark Word]
    B --> F[指向类的指针]
    B --> G["数组长度(数组特有)"]

    style A fill:#f9f,stroke:#333,stroke-width:4px;
    style B fill:#bbf,stroke:#333,stroke-width:2px;
    style C fill:#bbf,stroke:#333,stroke-width:2px;
    style D fill:#bbf,stroke:#333,stroke-width:2px;
    style E fill:#bfb,stroke:#333,stroke-width:1px;
    style F fill:#bfb,stroke:#333,stroke-width:1px;
    style G fill:#bfb,stroke:#333,stroke-width:1px;

Mark Word 结构

锁状态25bit4bit1bit2bit
23bit2bit是否偏向锁锁标志位
无锁对象的HashCode分代年龄001
偏向锁线程IDEpoch分代年龄101
轻量级锁指向栈中锁记录的指针00
重量级锁指向重量级锁的指针10
GC标记11

Monitor 原理

Monitor 是监视器或管程。

每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。

  1. 刚开始 Monitor 中 Owner 为 null
  2. 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner
  3. 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行 synchronized(obj),就会进入 EntryList,线程进入 BLOCKED 状态
  4. Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争是非公平的
  5. 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足,进入 WAITING 状态的线程,用于 wait-notify 机制

synchronized 原理

锁升级过程

mermaid
flowchart TD;
    A[偏向锁] -->|线程获取| B[轻量级锁]
    B -->|CAS成功| C[临界区]
    C -->|释放锁| A
    B -->|CAS失败| D[锁竞争]
    D -->|升级| E[重量级锁]
    E -->|线程阻塞| F[等待唤醒]
    F -->|锁释放| G[重新尝试]
    G -->|获取锁| C
    D -->|自旋| H[锁自旋]
    H -->|获取成功| C
    H -->|获取失败| D

轻量级锁

无竞争线程时使用轻量级锁,有竞争会膨胀为重量级锁。

1、创建锁记录 Object,每个线程都的栈帧都包含一个锁记录,内部可以存储锁定对象的 Mark Word

2、让锁记录中 Object reference 指向锁对象,并尝试使用 CAS 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

3、如果 CAS 替换成功,对象头中存储了锁记录地址和状态 00 ,表示由该线程给对象加锁

4、如果 CAS 失败,有两种情况

  1. 如果是其它线程已经持有了该对象的轻量级锁。这表明有竞争,进入锁膨胀,升级成重量级锁
  2. 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数

5、当退出 synchronized 代码块时,如果有取值为 null 的锁记录(重入锁),这时重置锁记录,表示重入计数 -1

6、当退出 synchronized 代码块,锁记录的值不为 null,尝试使用 CAS 将 Mark Word 的值恢复给对象 头

  1. 恢复成功,则解锁成功
  2. 恢复失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

尝试加轻量级锁的过程中,CAS 操作无法成功(有其它线程已经为此对象加上了轻量级锁),会进行锁膨胀,变成重量级锁。

1、当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

2、这时 Thread-1 加轻量级锁失败,进入锁膨胀流程。

  1. 为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
  2. 然后自己进入 Monitor 的 EntryList 中,线程进入 BLOCKED 状态

3、当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,恢复失败。这时会进入重量级解锁流程,按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 的线程

自旋优化

锁竞争的时可以使用自旋来进行优化。如果当前线程自旋成功(即这时候持锁线程已经释放了锁),当前线程就可以避免阻塞。

自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势

线程加入 EntryList 进入 BLOCKED 阻塞状态需要进行上下文切换,消耗较多资源。

偏向锁

偏向状态

问题:轻量级锁在没有竞争时(就当前线程使用),每次重入仍然需要执行 CAS 操作。

偏向锁:第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。

处于偏向锁的对象解锁后,线程 id 仍存储于对象头中

  • 如果开启了偏向锁(默认开启),那么对象创建后,markword 值最后 3 位为 101,这时它的 thread、epoch、age 都为 0
  • 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 - XX:BiasedLockingStartupDelay=0 来禁用延迟
  • 如果没有开启偏向锁,那么对象创建后,markword 值最后 3 位为 001,这时它的 hashcode、 age 都为 0。正常状态对象一开始是没有 hashCode 的,第一次调用才生成

撤销偏向锁 - hashCode

调用了对象的 hashCode() 方法,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode() 会导致偏向锁被撤销

轻量级锁会在锁记录中记录 hashCode(CAS 交换到锁对象中)

重量级锁会在 Monitor 中记录 hashCode

撤销偏向锁 - 其它线程使用对象

当有其它线程使用偏向锁对象时(不是竞争锁),会将偏向锁升级为轻量级锁

撤销偏向锁 - 调用 wait/notify

java
public static void main(String[] args) throws InterruptedException {
    Dog d = new Dog();
    
    new Thread(() -> {
        log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        synchronized (d) {
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
            try {
                d.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
        }
    }, "t1").start();
    
    new Thread(() -> {
        try {
            Thread.sleep(6000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        synchronized (d) {
            log.debug("notify");
            d.notify();
        }
    }, "t2").start();
}
[t1] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000101 
[t1] - 00000000 00000000 00000000 00000000 00011111 10110011 11111000 00000101 
[t2] - notify 
[t1] - 00000000 00000000 00000000 00000000 00011100 11010100 00001101 11001010

批量重偏向 & 批量撤销

当其他线程使用导致撤销偏向锁阈值超过 20 次后,jvm 会在给这些对象加锁时重新偏向至加锁线程

当撤销偏向锁阈值超过 40 次后,整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

JVM 会在运行时检测到某些代码段中的锁实际上并不需要被持有,从而消除这些不必要的锁。

wait & notify

wait & notify 概述

  1. obj.wait():让当前线程进入 object 监视器的线程到 waitSet 等待。会释放对象的锁,从而让其他线程就机会获取对象的锁,无限制等待,直到被 notify() 为止
  2. obj.notify():在 object 上正在 WaitSet 等待的线程中挑一个唤醒
  3. obj.notifyAll():让 object 上正在 WaitSet 等待的线程全部唤醒

sleep(long n)wait(long n) 的区别:

  1. sleep 是 Thread 方法,而 wait 是 Object 的方法
  2. sleep 不需要强制和 synchronized 配合使用,但 wait 必须和 synchronized 一起用
  3. sleep 在睡眠时不会释放对象锁的,但 wait 在等待的时会释放对象锁
  4. 它们的状态都是 TIMED_WAITING

wait & notify 最佳实践

java
static boolean ok = false;

// 线程1
synchronized(lock) {
    while(!ok) {
        lock.wait();
    }
    // 干活
}

// 线程2
synchronized(lock) {
    ok = true;
    lock.notifyAll();
}

同步:保护性暂停

保护性暂停:用于在一个线程等待另一个线程的执行结果

如果有结果不断从一个线程到另一个线程那么可以使用消息队列(生产者/消费者模型)

java
class GuardedObject {
    private Object res;

    public synchronized Object get() {
        while (res == null) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return res;
    }

    public synchronized Object get(long timeout) {
        long begin = System.currentTimeMillis();
        long passed = 0;
        while (res == null) {
            long wait = timeout - passed;
            if (wait <= 0) {
                break;
            }
            try {
                this.wait(wait);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            passed = System.currentTimeMillis() - begin;
        }
        return res;
    }

    public synchronized void set(Object res) {
        this.res = res;
        this.notifyAll();
    }
}

join() 的原理

join():调用者轮询检查线程 alive 的状态

java
public final synchronized void join(final long millis)
    throws InterruptedException {
    if (millis > 0) {
        if (isAlive()) {
            final long startTime = System.nanoTime();
            long delay = millis;
            do {
                wait(delay);
            } while (isAlive() && (delay = millis -
                                   TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
        }
    } else if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        throw new IllegalArgumentException("timeout value is negative");
    }
}

异步:生产者消费者

  1. 不需要产生和消费的线程一一对应
  2. 消费队列可以用来平衡生产和消费的线程资源
  3. 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  4. 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据

JDK 中各种阻塞队列,采用的就是这种模式

java
class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }

    public int getId() {
        return id;
    }

    public Object getMessage() {
        return message;
    }
}

class MessageQueue {
    private LinkedList<Message> queue;
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

park & unpark

park & unpark 概述

LockSupport 类中的方法,先 park 再 unpark。

java
// 暂停当前线程
LockSupport.park(); 

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

特点:

  1. wait,notify 和 notifyAll 必须配合 Object Monitor(synchronized 代码块) 一起使用,而 park,unpark 不需要
  2. park & unpark 可以先 unpark,而 wait & notify 不能先 notify
  3. park & unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,没那么精确

park & unpark 原理

park()

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

park() -> unpark()

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

image-20240908212732772

unpark() -> park()

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

线程状态转换

  1. New -> Runnable:调用 t.start() 方法

  2. Runnable <--> Waiting:

    1. synchronized 获取锁之后,调用 obj.wait() 方法
    2. 调用 obj.notify()obj.notifyAll()t.interrupt() 方法
  3. Runnable <--> Waiting:

    1. 调用 t.join() 方法,等待指定线程结束
    2. 线程运行结束,或调用了当前线程的 interrupt()
  4. Runnable <--> Waiting:

    1. 调用 LockSupport.park() 方法
    2. 调用 LockSupport.unpark() 方法,或调用了当前线程的 interrupt()
  5. Runnable <--> Timed_Waiting:

    1. 在 synchronized 获取锁之后,调用 obj.wait(long n) 方法
    2. 到达指定时间后,或调用 obj.notify()obj.notifyAll()t.interrupt() 方法
  6. Runnable <--> Timed_Waiting:

    1. 调用 t.join(long n) 方法
    2. 线程运行结束、到达指定时间后,或调用了当前线程的 interrupt()
  7. Runnable <--> Timed_Waiting:

    1. 调用 LockSupport.parkNanos(long nanos) 方法
    2. 到达指定时间后、调用 LockSupport.unpark() 方法,或调用了当前线程的 interrupt()
  8. Runnable <--> Timed_Waiting:

    1. 调用 Thread.sleep(long n) 方法
    2. 到达指定时间后
  9. Runnable <--> Blocked:

    1. 等待 synchronized 代码块并竞争锁
    2. 竞争成功,获取到锁
  10. Runnable -> Terminated:线程运行结束

活跃性

死锁

死锁:t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁。

定位死锁:检测死锁可以使用 jconsole工具,或使用 jps 定位进程 id,再用 jstack 定位死锁。

哲学家就餐问题

  • 有五位哲学家,围坐在圆桌旁。
  • 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
  • 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
  • 如果筷子被身边的人拿着,自己就得等待。

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。

举例:t1 线程 a++,t2 线程 a-- 且两者频率相同。

饥饿

按顺序获取锁时,已获取到多个锁的线程更容易被调度运行。

ReentrantLock

ReentrantLock 概述

  1. 可中断
  2. 可以设置超时时间
  3. 可以设置为公平锁
  4. 支持多个条件变量
  5. 可重入
java
// 获取锁
reentrantLock.lock();
try {
    // 临界区
} finally {
    // 释放锁
    reentrantLock.unlock();
}

可重入

同一个线程如果首次获得了这把锁后,可以再次获取这把锁

java
static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
    lock.lock();
    try {
        log.debug("main 获取锁");
        func();
    } finally {
        lock.unlock();
    }
}

static void func() {
    lock.lock();
    try {
        log.debug("func 获取锁");
    } finally {
        lock.unlock();
    }
}
15:22:43 [main] c.App - main 获取锁
15:22:43 [main] c.App - func 获取锁

可中断

等待锁时可以使用 interrupt() 打断等待。

java
lock.lockInterruptibly();
java
static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(() -> {
        log.debug("开始执行");
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            log.debug("被打断");
            return;
        }
        try {
            log.debug("获取到锁");
        } finally {
            lock.unlock();
        }
    }, "t");

    lock.lock();
    try {
        t.start();
        Thread.sleep(1000);
        t.interrupt();
    } finally {
        lock.unlock();
    }
}
15:27:59 [t] c.App - 开始执行
15:28:00 [t] c.App - 被打断

锁超时

java
// 立即失败
lock.tryLock();

// 超时失败
lock.tryLock(1, TimeUnit.SECONDS);
java
public static void main(String[] args) throws InterruptedException {
    ReentrantLock lock = new ReentrantLock();
    Thread t1 = new Thread(() -> {
        log.debug("启动...");
        if (!lock.tryLock()) {
            log.debug("获取立刻失败,返回");
            return;
        }
        try {
            log.debug("获得了锁");
        } finally {
            lock.unlock();
        }
    }, "t1");
    lock.lock();
    try {
        log.debug("获得了锁");
        t1.start();
        Thread.sleep(2);
    } finally {
        lock.unlock();
    }
}
15:42:23 [main] c.App - 获得了锁
15:42:23 [t1] c.App - 启动...
15:42:23 [t1] c.App - 获取立刻失败,返回

公平锁

ReentrantLock 默认是不公平的

java
// 创建公平锁
ReentrantLock lock = new ReentrantLock(true);

条件变量

synchronized 中也有条件变量,当条件不满足时(调用 wait() 方法)进入 waitSet 等待

ReentrantLock 支持多个条件变量的。

  • await 前需要先获得锁
  • await 执行后,会释放锁,进入 condition 等待
  • await 的线程被唤醒(或打断、或超时)会重新竞争锁
  • 竞争 lock 锁成功后,从 await 后继续执行
java
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args) {
    new Thread(() -> {
        try {
            lock.lock();
            while (!hasCigrette) {
                try {
                    waitCigaretteQueue.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("等到了它的烟");
        } finally {
            lock.unlock();
        }
    }).start();
    new Thread(() -> {
        try {
            lock.lock();
            while (!hasBreakfast) {
                try {
                    waitbreakfastQueue.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("等到了它的早餐");
        } finally {
            lock.unlock();
        }
    }).start();
    sleep(1);
    sendBreakfast();
    sleep(1);
    sendCigarette();
}

private static void sendCigarette() {
    lock.lock();
    try {
        log.debug("送烟来了");
        hasCigrette = true;
        waitCigaretteQueue.signal();
    } finally {
        lock.unlock();
    }
}

private static void sendBreakfast() {
    lock.lock();
    try {
        log.debug("送早餐来了");
        hasBreakfast = true;
        waitbreakfastQueue.signal();
    } finally {
        lock.unlock();
    }
}
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟

Released under the MIT License.