共享模型之管程
共享问题
临界区
多个线程访问共享资源,且在多个线程对共享资源读写操作时发生指令交错,就会出现问题
临界区:存在对共享资源的多线程读写操作的代码块
竞态条件
多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件
解决方式
- 非阻塞式解决方案:原子变量
- 阻塞式解决方案:Lock、synchronized
synchronized 初识
synchronized 概述
synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
synchronized 语法
synchronized(对象) {
临界区
}
方法上的 synchronized
对于成员方法,锁当前对象
class Test{
public synchronized void test() {
}
}
// 等价于
class Test{
public void test() {
synchronized(this) {
}
}
}
对于非成员方法,锁 Class 类对象
class Test{
public synchronized static void test() {
}
}
// 等价于
class Test{
public static void test() {
synchronized(Test.class) {
}
}
}
线程安全分析
线程安全场景分析
成员变量
多个线程访问一个共享资源会出现线程安全问题。
class ThreadUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
// { 临界区, 会产生竞态条件
method2();
method3();
// } 临界区
}
}
private void method2() {
// 不是原子操作
list.add("1");
}
private void method3() {
// 不是原子操作
list.remove(0);
}
}
局部变量
每个线程都使用各自的资源,不会出现问题。
class ThreadSafe {
public final void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
private void method2(ArrayList<String> list) {
list.add("1");
}
private void method3(ArrayList<String> list) {
list.remove(0);
}
}
常见线程安全类
- String、Integer
- StringBuffer
- Vector、HashTable
- juc 包的类
对于线程安全类,多个线程调用单个实例的某个方法时,是线程安全的。方法是原子操作,但多个方法组合不是原子的。
String、Integer 等不可变类是线程安全的。
无状态(没有成员变量)的对象也是线程安全的。
String 类为什么设计成 final 修饰的?
避免子类继承后重写方法后,破坏线程安全性。
Monitor
Java 对象头
Java 对象的组成部分
graph TB
A[Java对象] --> B[对象头]
A --> C[实例数据]
A --> D[对齐填充字节]
B --> E[Mark Word]
B --> F[指向类的指针]
B --> G["数组长度(数组特有)"]
style A fill:#f9f,stroke:#333,stroke-width:4px;
style B fill:#bbf,stroke:#333,stroke-width:2px;
style C fill:#bbf,stroke:#333,stroke-width:2px;
style D fill:#bbf,stroke:#333,stroke-width:2px;
style E fill:#bfb,stroke:#333,stroke-width:1px;
style F fill:#bfb,stroke:#333,stroke-width:1px;
style G fill:#bfb,stroke:#333,stroke-width:1px;
Mark Word 结构
锁状态 | 25bit | 4bit | 1bit | 2bit | |
---|---|---|---|---|---|
23bit | 2bit | 是否偏向锁 | 锁标志位 | ||
无锁 | 对象的HashCode | 分代年龄 | 0 | 01 | |
偏向锁 | 线程ID | Epoch | 分代年龄 | 1 | 01 |
轻量级锁 | 指向栈中锁记录的指针 | 00 | |||
重量级锁 | 指向重量级锁的指针 | 10 | |||
GC标记 | 空 | 11 |
Monitor 原理
Monitor 是监视器或管程。
每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。
- 刚开始 Monitor 中 Owner 为 null
- 当 Thread-2 执行
synchronized(obj)
就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor中只能有一个 Owner - 在 Thread-2 上锁的过程中,如果 Thread-3,Thread-4,Thread-5 也来执行
synchronized(obj)
,就会进入 EntryList,线程进入 BLOCKED 状态 - Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争是非公平的
- 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足,进入 WAITING 状态的线程,用于 wait-notify 机制
synchronized 原理
锁升级过程
flowchart TD;
A[偏向锁] -->|线程获取| B[轻量级锁]
B -->|CAS成功| C[临界区]
C -->|释放锁| A
B -->|CAS失败| D[锁竞争]
D -->|升级| E[重量级锁]
E -->|线程阻塞| F[等待唤醒]
F -->|锁释放| G[重新尝试]
G -->|获取锁| C
D -->|自旋| H[锁自旋]
H -->|获取成功| C
H -->|获取失败| D
轻量级锁
无竞争线程时使用轻量级锁,有竞争会膨胀为重量级锁。
1、创建锁记录 Object,每个线程都的栈帧都包含一个锁记录,内部可以存储锁定对象的 Mark Word
2、让锁记录中 Object reference 指向锁对象,并尝试使用 CAS 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录
3、如果 CAS 替换成功,对象头中存储了锁记录地址和状态 00 ,表示由该线程给对象加锁
4、如果 CAS 失败,有两种情况
- 如果是其它线程已经持有了该对象的轻量级锁。这表明有竞争,进入锁膨胀,升级成重量级锁
- 如果是自己执行了 synchronized 锁重入,那么再添加一条 Lock Record 作为重入的计数
5、当退出 synchronized 代码块时,如果有取值为 null 的锁记录(重入锁),这时重置锁记录,表示重入计数 -1
6、当退出 synchronized 代码块,锁记录的值不为 null,尝试使用 CAS 将 Mark Word 的值恢复给对象 头
- 恢复成功,则解锁成功
- 恢复失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程
锁膨胀
尝试加轻量级锁的过程中,CAS 操作无法成功(有其它线程已经为此对象加上了轻量级锁),会进行锁膨胀,变成重量级锁。
1、当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁
2、这时 Thread-1 加轻量级锁失败,进入锁膨胀流程。
- 为 Object 对象申请 Monitor 锁,让 Object 指向重量级锁地址
- 然后自己进入 Monitor 的 EntryList 中,线程进入 BLOCKED 状态
3、当 Thread-0 退出同步块解锁时,使用 cas 将 Mark Word 的值恢复给对象头,恢复失败。这时会进入重量级解锁流程,按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 的线程
自旋优化
锁竞争的时可以使用自旋来进行优化。如果当前线程自旋成功(即这时候持锁线程已经释放了锁),当前线程就可以避免阻塞。
自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势
线程加入 EntryList 进入 BLOCKED 阻塞状态需要进行上下文切换,消耗较多资源。
偏向锁
偏向状态
问题:轻量级锁在没有竞争时(就当前线程使用),每次重入仍然需要执行 CAS 操作。
偏向锁:第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。
处于偏向锁的对象解锁后,线程 id 仍存储于对象头中
- 如果开启了偏向锁(默认开启),那么对象创建后,markword 值最后 3 位为 101,这时它的 thread、epoch、age 都为 0
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数
- XX:BiasedLockingStartupDelay=0
来禁用延迟 - 如果没有开启偏向锁,那么对象创建后,markword 值最后 3 位为 001,这时它的 hashcode、 age 都为 0。正常状态对象一开始是没有 hashCode 的,第一次调用才生成
撤销偏向锁 - hashCode
调用了对象的 hashCode()
方法,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode()
会导致偏向锁被撤销
轻量级锁会在锁记录中记录 hashCode(CAS 交换到锁对象中)
重量级锁会在 Monitor 中记录 hashCode
撤销偏向锁 - 其它线程使用对象
当有其它线程使用偏向锁对象时(不是竞争锁),会将偏向锁升级为轻量级锁
撤销偏向锁 - 调用 wait/notify
public static void main(String[] args) throws InterruptedException {
Dog d = new Dog();
new Thread(() -> {
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
synchronized (d) {
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
try {
d.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug(ClassLayout.parseInstance(d).toPrintableSimple(true));
}
}, "t1").start();
new Thread(() -> {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (d) {
log.debug("notify");
d.notify();
}
}, "t2").start();
}
[t1] - 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000101
[t1] - 00000000 00000000 00000000 00000000 00011111 10110011 11111000 00000101
[t2] - notify
[t1] - 00000000 00000000 00000000 00000000 00011100 11010100 00001101 11001010
批量重偏向 & 批量撤销
当其他线程使用导致撤销偏向锁阈值超过 20 次后,jvm 会在给这些对象加锁时重新偏向至加锁线程
当撤销偏向锁阈值超过 40 次后,整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的
锁消除
JVM 会在运行时检测到某些代码段中的锁实际上并不需要被持有,从而消除这些不必要的锁。
wait & notify
wait & notify 概述
obj.wait()
:让当前线程进入 object 监视器的线程到 waitSet 等待。会释放对象的锁,从而让其他线程就机会获取对象的锁,无限制等待,直到被notify()
为止obj.notify()
:在 object 上正在 WaitSet 等待的线程中挑一个唤醒obj.notifyAll()
:让 object 上正在 WaitSet 等待的线程全部唤醒
sleep(long n)
和wait(long n)
的区别:
- sleep 是 Thread 方法,而 wait 是 Object 的方法
- sleep 不需要强制和 synchronized 配合使用,但 wait 必须和 synchronized 一起用
- sleep 在睡眠时不会释放对象锁的,但 wait 在等待的时会释放对象锁
- 它们的状态都是 TIMED_WAITING
wait & notify 最佳实践
static boolean ok = false;
// 线程1
synchronized(lock) {
while(!ok) {
lock.wait();
}
// 干活
}
// 线程2
synchronized(lock) {
ok = true;
lock.notifyAll();
}
同步:保护性暂停
保护性暂停:用于在一个线程等待另一个线程的执行结果
如果有结果不断从一个线程到另一个线程那么可以使用消息队列(生产者/消费者模型)
class GuardedObject {
private Object res;
public synchronized Object get() {
while (res == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return res;
}
public synchronized Object get(long timeout) {
long begin = System.currentTimeMillis();
long passed = 0;
while (res == null) {
long wait = timeout - passed;
if (wait <= 0) {
break;
}
try {
this.wait(wait);
} catch (InterruptedException e) {
e.printStackTrace();
}
passed = System.currentTimeMillis() - begin;
}
return res;
}
public synchronized void set(Object res) {
this.res = res;
this.notifyAll();
}
}
join() 的原理
join()
:调用者轮询检查线程 alive 的状态
public final synchronized void join(final long millis)
throws InterruptedException {
if (millis > 0) {
if (isAlive()) {
final long startTime = System.nanoTime();
long delay = millis;
do {
wait(delay);
} while (isAlive() && (delay = millis -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
throw new IllegalArgumentException("timeout value is negative");
}
}
异步:生产者消费者
- 不需要产生和消费的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK 中各种阻塞队列,采用的就是这种模式
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}
park & unpark
park & unpark 概述
LockSupport 类中的方法,先 park 再 unpark。
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
特点:
- wait,notify 和 notifyAll 必须配合 Object Monitor(synchronized 代码块) 一起使用,而 park,unpark 不需要
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
- park & unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,没那么精确
park & unpark 原理
park()
- 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 0,获得_mutex
互斥锁 - 线程进入
_cond
条件变量阻塞 - 设置
_counter = 0
park() -> unpark()
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为 1 - 唤醒
_cond
条件变量中的 Thread_0 - Thread_0 恢复运行
- 设置
_counter
为 0
unpark() -> park()
- 调用
Unsafe.unpark(Thread_0)
方法,设置_counter
为 1 - 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为 1,这时线程无需阻塞,继续运行 - 设置
_counter
为 0
线程状态转换
New -> Runnable:调用
t.start()
方法Runnable <--> Waiting:
- 在
synchronized
获取锁之后,调用obj.wait()
方法 - 调用
obj.notify()
、obj.notifyAll()
、t.interrupt()
方法
- 在
Runnable <--> Waiting:
- 调用
t.join()
方法,等待指定线程结束 - 线程运行结束,或调用了当前线程的
interrupt()
时
- 调用
Runnable <--> Waiting:
- 调用
LockSupport.park()
方法 - 调用
LockSupport.unpark()
方法,或调用了当前线程的interrupt()
时
- 调用
Runnable <--> Timed_Waiting:
- 在 synchronized 获取锁之后,调用
obj.wait(long n)
方法 - 到达指定时间后,或调用
obj.notify()
、obj.notifyAll()
、t.interrupt()
方法
- 在 synchronized 获取锁之后,调用
Runnable <--> Timed_Waiting:
- 调用
t.join(long n)
方法 - 线程运行结束、到达指定时间后,或调用了当前线程的
interrupt()
时
- 调用
Runnable <--> Timed_Waiting:
- 调用
LockSupport.parkNanos(long nanos)
方法 - 到达指定时间后、调用
LockSupport.unpark()
方法,或调用了当前线程的interrupt()
时
- 调用
Runnable <--> Timed_Waiting:
- 调用
Thread.sleep(long n)
方法 - 到达指定时间后
- 调用
Runnable <--> Blocked:
- 等待
synchronized
代码块并竞争锁 - 竞争成功,获取到锁
- 等待
Runnable -> Terminated:线程运行结束
活跃性
死锁
死锁:t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁。
定位死锁:检测死锁可以使用 jconsole工具,或使用 jps 定位进程 id,再用 jstack 定位死锁。
哲学家就餐问题
- 有五位哲学家,围坐在圆桌旁。
- 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。
- 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。
- 如果筷子被身边的人拿着,自己就得等待。
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
举例:t1 线程 a++,t2 线程 a-- 且两者频率相同。
饥饿
按顺序获取锁时,已获取到多个锁的线程更容易被调度运行。
ReentrantLock
ReentrantLock 概述
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
- 可重入
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入
同一个线程如果首次获得了这把锁后,可以再次获取这把锁
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
log.debug("main 获取锁");
func();
} finally {
lock.unlock();
}
}
static void func() {
lock.lock();
try {
log.debug("func 获取锁");
} finally {
lock.unlock();
}
}
15:22:43 [main] c.App - main 获取锁
15:22:43 [main] c.App - func 获取锁
可中断
等待锁时可以使用 interrupt()
打断等待。
lock.lockInterruptibly();
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
log.debug("开始执行");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
log.debug("被打断");
return;
}
try {
log.debug("获取到锁");
} finally {
lock.unlock();
}
}, "t");
lock.lock();
try {
t.start();
Thread.sleep(1000);
t.interrupt();
} finally {
lock.unlock();
}
}
15:27:59 [t] c.App - 开始执行
15:28:00 [t] c.App - 被打断
锁超时
// 立即失败
lock.tryLock();
// 超时失败
lock.tryLock(1, TimeUnit.SECONDS);
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
try {
log.debug("获得了锁");
t1.start();
Thread.sleep(2);
} finally {
lock.unlock();
}
}
15:42:23 [main] c.App - 获得了锁
15:42:23 [t1] c.App - 启动...
15:42:23 [t1] c.App - 获取立刻失败,返回
公平锁
ReentrantLock 默认是不公平的
// 创建公平锁
ReentrantLock lock = new ReentrantLock(true);
条件变量
synchronized
中也有条件变量,当条件不满足时(调用wait()
方法)进入 waitSet 等待
ReentrantLock
支持多个条件变量的。
- await 前需要先获得锁
- await 执行后,会释放锁,进入 condition 等待
- await 的线程被唤醒(或打断、或超时)会重新竞争锁
- 竞争 lock 锁成功后,从 await 后继续执行
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟