并发编程: 管程

张天宇 on 2020-06-07

Java并发编程第二篇,共享模型之管程,各种锁。

1. 共享带来的问题

临界区 Critical Section
  • 一个程序运行多个线程本身是没有问题的
  • 问题出在多个线程访问共享资源
    • 多个线程访问共享资源其实也没有问题
    • 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
  • 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码为临界区
1
2
3
4
5
6
7
8
9
10
static int counter = 0;
static void increment(){
// 临界区
counter++;
}
static void decrement(){
// 临界区
counter--;
}
// 参考JVM内存模型中原子操作类
竞态条件 Race Condition

多个线程在临界区执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。

为了避免临界区的竞态条件发生,有多种手段可以达到目的。

  • 阻塞式的解决方案,synchronized,Lock
  • 非阻塞式的解决方案:原子变量

2. synchronized解决方案

俗称对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其他线程再想获取这个对象锁时就会被阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区的代码,不用担心上下文的切换。

虽然Java中互斥和同步都可以采用synchronized关键字来完成,但他们还是有区别的:

  • 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
  • 同步是由于线程执行的先后、顺序不同,需要一个线程等待其他线程运行到某个点

Synchonized是基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。Synchronized 用在方法上时,在字节码中是通过方法的 ACC_SYNCHRONIZED 标志来实现的。而代码块同步则是使用monitorenter和monitorexit指令实现的。

monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,JVM要保证每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个monitor与之关联,当且一个monitor被持有后,它将处于锁定状态。线程执行到monitorenter指令时,将会尝试获取对象所对应的monitor的所有权,即尝试获得对象的锁,当获得对象的monitor以后,monitor内部的计数器就会自增(初始为0),当同一个线程再次获得monitor的时候,计数器会再次自增。当同一个线程执行monitorexit指令的时候,计数器会进行自减,当计数器为0的时候,monitor就会被释放,其他线程便可以获得monitor。

语法
1
2
3
synchronized (对象){
// 临界区
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int counter = 0;
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException{
Thread t1 = new Thread(() -> {
for (int i = 0; i <5000; i++){
synchronized (room){
counter++;
}
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i <5000; i++){
synchronized (room){
counter--;
}
}
});
t1.start();
t2.start();
t1.join();
t2/join();
}

synchronized实际上是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切断所打断。

面向对象改进
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Room room = new Room();
Thread thread = new Thread(() -> {
for (int i = 0; i < 5000; i++)
room.increment();
});
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 5000; i++)
room.decrement();
});
thread.start();
thread1.start();
thread.join();
thread1.join();
log.debug("主线程结束{}", room.getCounter());
}
}
class Room {
private int counter = 0;
public void increment() {
synchronized (this) {
counter++;
}
}
public void decrement() {
synchronized (this) {
counter--;
}
}
public int getCounter() {
synchronized (this) {
return counter;
}
}
}

3. 方法上的synchronized

1
2
3
4
5
6
7
8
9
10
11
class test {
public synchronized void test() {
}
}
// 等价于
class test {
public void test() {
synchronized(this) {
}
}
}
1
2
3
4
5
6
7
8
9
10
11
class test {
public synchronized static void test() {
}
}
// 等价于
class test {
public static void test() {
synchronized(test.class) {
}
}
}
synchronized 关键字修饰方法和代码的区别
  • synchronized修饰方法:同步是隐式的,给方法添加ACC_SYNCHRONIZED的访问修饰符。当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如有,则需要先获取当前对象的监视器锁,然后才开始执行方法。方法执行之后再释放监视器锁。
  • 同步代码块:采用monitorentermonitorexit两个指令来实现同步。在执行monitorenter指令时,首先要尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应的,在执行monitorexit指令时会将锁计数器减1,当计数器为0的时候,锁就会被释放。如果获取对象锁失败,那当前线程就要阻塞等待。直到对象锁被另外一个线程释放为止。

练习:线程八锁

情况1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n1.b();
}).start();
}
}
@Slf4j
class Number {
public synchronized void a() {
log.debug("1");
}

public synchronized void b() {
log.debug("2");
}
}
/**
两个锁锁的是同一个对象,12或21都可能
2020-06-03 19:28:16.057 DEBUG Test - begin
2020-06-03 19:28:16.059 DEBUG Test - begin
2020-06-03 19:28:16.061 DEBUG Number - 1
2020-06-03 19:28:16.061 DEBUG Number - 2
**/

情况2,修改Number类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
class Number {
public synchronized void a() {
Thred.sleep(1000);
log.debug("1");
}

public synchronized void b() {
log.debug("2");
}
}
/**
此时可能会出现,等一秒,1,释放锁,2
或者2,释放锁,等一秒,1
**/

情况3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n1.b();
}).start();
new Thread(() -> {
log.debug("begin");
n1.c();
}).start();
}
}
@Slf4j
class Number {
public synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
public void c() {
log.debug("3");
}
}
/** 3, 1s, 12
23/32, 1s, 1
2020-06-03 19:36:22.053 DEBUG Test - begin
2020-06-03 19:36:22.066 DEBUG Test - begin
2020-06-03 19:36:22.067 DEBUG Test - begin
2020-06-03 19:36:22.068 DEBUG Number - 3
2020-06-03 19:36:23.060 DEBUG Number - 1
2020-06-03 19:36:23.060 DEBUG Number - 2
**/

情况4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
Number n2 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n2.b();
}).start();
}
}
@Slf4j
class Number {
public synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
/**
无互斥发生,2, 1s, 1
**/

情况5

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n1.b();
}).start();
}
}
@Slf4j
class Number {
public static synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
/**
a 锁住的是类对象,b 锁住的是this,不存在互斥的问题,2,1s,1
**/

情况6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
Number n2 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n1.b();
}).start();
}
}
@Slf4j
class Number {
public static synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public static synchronized void b() {
log.debug("2");
}
}
/**
有互斥情况发生,看CPU先调度谁
**/

情况7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
Number n2 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n2.b();
}).start();
}
}
@Slf4j
class Number {
public static synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
/**
不存在互斥,并行执行,先2,1s,1
**/

情况8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Number n1 = new Number();
Number n2 = new Number();
new Thread(() -> {
log.debug("begin");
n1.a();
}).start();
new Thread(() -> {
log.debug("begin");
n2.b();
}).start();
}
}
@Slf4j
class Number {
public static synchronized void a() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("1");
}
public static synchronized void b() {
log.debug("2");
}
}
/**
有互斥现象存在
**/

4. 线程安全分析

成员变量和静态变量是否线程安全
  • 如果它们没有共享,则线程安全
  • 如果它们被共享了,根据它们的状态是否能被改变,又分两种情况:
    • 如果只有读操作,则线程安全
    • 如果只有写操作,则这段代码属于临界区,需要考虑线程安全
局部变量是否线程安全
  • 局部变量是线程安全的
  • 但局部变量引用的对象未必
    • 如果该对象没有逃离方法的作用域访问,它是线程安全的
    • 如果该对象逃离方法的作用范围,需要考虑线程安全
局部变量线程安全分析
1
2
3
4
public static void text(){
int i = 10;
i++;
}

每个线程调用test()方法时局部变量i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。

但局部变量的引用稍有不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class Test {
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;
public static void main(String[] args) {
ThreadUnsafe test = new ThreadUnsafe();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
test.method1(LOOP_NUMBER);
}, "Thread" + i).start();
}
}
}
class ThreadUnsafe {
ArrayList<String> list = new ArrayList<>();
public void method1(int loopNumber) {
for (int i = 0; i < loopNumber; i++) {
// 临界区, 会产生竞态条件
method2();
method3();
// 临界区
}
}
private void method2() {
list.add("1");
}
private void method3() {
list.remove(0);
}
}
/**
其中一种情况是,如果线程2还未add,线程1remove就会报错
Exception in thread "Thread0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 1
**/

无论哪个线程中的method2引用的都是同一个对象中的list成员变量,method3与method2分析相同。

修改为局部变量,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class Test {
static final int THREAD_NUMBER = 2;
static final int LOOP_NUMBER = 200;

public static void main(String[] args) {
ThreadSafe test = new ThreadSafe();
for (int i = 0; i < THREAD_NUMBER; i++) {
new Thread(() -> {
test.method1(LOOP_NUMBER);
}, "Thread" + i).start();
}
}
}
class ThreadSafe {
public final void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
// { 临界区, 会产生竞态条件
method2(list);
method3(list);
// } 临界区
}
}
private void method2(ArrayList<String> list) {
list.add("1");
}
private void method3(ArrayList<String> list) {
list.remove(0);
}
}
  • list是局部变量,每个线程调用时会创建其不同实例,没有共享
  • 而method2的参数是由method1中传递过来的,与method1引用同一个对象
  • method3的参数分析与method2相同
常见线程安全类
  • String
  • Integer
  • StringBuffer
  • Random
  • Vector
  • Hashtable
  • java.util.concurrent下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法,是线程安全的,也可以理解为:

  • 它们的每个方法是原子的
  • 但是它们多个方法组合不是原子的
不可变线程安全性

String、Integer等都是不可变类,因为其内部的状态不可改变,因此它们的方法都是线程安全的。

实例分析
例1
1
2
3
4
5
6
7
8
9
10
11
12
public class MyServlet extends HttpServlet {
// 多个请求读写内容造成混乱,不是线程安全问题
Map<String, Object> map = new HashMap<>();
// 不可变类,线程安全
String S1 = "...";
// 线程安全
final String S2 = "...";
// 不是线程安全
Date Da = new Date();
// 仍然不是
final Date D2 = new Date();
}
例2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ublic class MyServlet extends HttpServlet {
// 非线程安全
private UserService userService = new UserServiceImpl();
public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 记录调用次数
private int count = 0;
public void update() {
// ...
// 临界区
count++;
}
}
例3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Aspect
@Component
public class MyAspect {
// 不加Scope说明是单例,会被共享,会存在对象的并发修改,使用环绕通知
private long start = 0L;
@Before("execution(* *(..))")
public void before() {
start = System.nanoTime();
}
@After("execution(* *(..))")
public void after() {
long end = System.nanoTime();
System.out.println("cost time:" + (end-start));
}
}
例4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyServlet extends HttpServlet {
// 线程安全,因为private私有,没有其他地方修改
private UserService userService = new UserServiceImpl();
public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 虽然会被共享,但是userDao里面没有可以被更改的属性了,所以是线程安全的
private UserDao userDao = new UserDaoImpl();
public void update() {
userDao.update();
}
}
public class UserDaoImpl implements UserDao {
// 没有成员变量的类一般是线程安全的
public void update() {
String sql = "update user set password = ? where username = ?";
// 方法内局部变量,线程安全
try (Connection conn = DriverManager.getConnection("","","")){
// ...
} catch (Exception e) {
// ...
}
}
}
例5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyServlet extends HttpServlet {
private UserService userService = new UserServiceImpl();
public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
private UserDao userDao = new UserDaoImpl();
public void update() {
userDao.update();
}
}
public class UserDaoImpl implements UserDao {
// 不是方法内的局部变量,而是方法变量,会被多个线程所共享
private Connection conn = null;
public void update() throws SQLException {
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
// ...
conn.close();
}
}
例6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MyServlet extends HttpServlet {
private UserService userService = new UserServiceImpl();
public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
public void update() {
// 作为方法内局部变量出现,connection不会出现线程安全问题
UserDao userDao = new UserDaoImpl();
userDao.update();
}
}
public class UserDaoImpl implements UserDao {
private Connection = null;
public void update() throws SQLException {
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
// ...
conn.close();
}
}
例7
1
2
3
4
5
6
7
8
9
10
11
public abstract class Test {
public void bar() {
// 是否安全
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
foo(sdf);
}
public abstract foo(SimpleDateFormat sdf);
public static void main(String[] args) {
new Test().bar();
}
}

其中foo的行为是不确定的,可能导致不安全的发生,被称之为外星方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void foo(SimpleDateFormat sdf) {
String dateStr = "1999-10-11 00:00:00";
for (int i = 0; i < 20; i++) {
// 在一个新的线程中使用
new Thread(() -> {
try {
sdf.parse(dateStr);
} catch (ParseException e) {
e.printStackTrace();
}
}).start();
}
}

6. Monitor概念

被翻译为监视器或者管程

每个Java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针。

  • 刚开始Monitor中Owner为null
  • 当Thread-2执行synchronized(obj)就会将Monitor的所有者Owner置为Thread-2,Monitor中只能有一个Owner
  • 在Thread-2上锁的过程中,如果Thread-3、Thread-4、Thread-5也来执行synchronized(obj),就会进入EntryList BLOCKED
  • Thread-2执行完同步代码块的内容,然后唤醒EntryList中等待的线程来竞争锁,竞争的时候是非公平的

注意

  • synchronized必须是进入同一个对象的monitor才有上述的效果
  • 不加synchronized的对象不会监视器,不遵从以上规则
synchronized进阶
1
2
3
4
5
6
7
8
9
10
11
12
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
  • 创建锁记录(LockRecord)对象,每个线程的栈帧都会包括一个锁记录的结构,内部可以存储锁定对象的MarkWord

  • 让锁记录中Object Reference指向锁对象,并尝试用CAS(Compare and swap)替换Object的MarkWord,将MarkWord的值存入锁记录

  • 如果CAS替换成功,对象头存储了锁记录地址和状态00,表示由线程给该对象加锁,这时图示如下

  • 如果CAS失败,有两种情况

    • 如果是其他线程已经持有了该Object的轻量级锁,这时候表明有竞争,进入锁膨胀过程

    • 如果是自己执行了synchronized锁重入,那么再添一条LockRecord作为重入的计数

  • 当退出synchronized代码块(解锁时)如果有取值为null的锁记录,表示有重入,这时重置锁记录,表示重入计数减一

  • 当退出synchronized代码块(解锁时)锁记录的值不为null,这时使用cas将MarkWord的值恢复给对象头

    • 成功,则解锁成功
    • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量锁,进入重量级锁解锁流程
锁膨胀

如果在尝试加轻量锁的过程中,CAS操作无法成功,这时一种情况就是有其他线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。

1
2
3
4
5
6
static Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块
}
}
  • 当Thread-1进行轻量级加锁时,Thread-0已经对该对象加了轻量级锁

  • 这时Thread-1加轻量级锁失败,进入锁膨胀流程

    • 即为Object对象申请Monitor锁,让Object指向重量级锁地址

    • 然后自己进入Monitor的EntryList BLOCKED

  • 当Thread-0退出同步代码块解锁时,使用CAS将MarkWord的值恢复给对象头,失败,这时会进入重量级解锁流程,即按照Monitor地址找到Monitor对象,设置Owner为null,唤醒EntryList中BLOCKED线程

自旋优化

重量级锁竞争的时候,还可以使用自旋来优化,如果当前线程自旋成功(即这时候锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

  • 自旋重试成功的情况
  • 失败

偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行CAS操作。

Java6中引入了偏向锁来做进一步优化,只有第一次使用CAS将线程ID设置到对象的MarkWord头,之后发现这个线程ID是自己的就表示没有竞争,不用重新CAS,以后只要不发生竞争,这个对象就归该线程所有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final Object obj = new Object();
public static void m1() {
synchronized( obj ) {
// 同步块 A
m2();
}
}
public static void m2() {
synchronized( obj ) {
// 同步块 B
m3();
}
}
public static void m3() {
synchronized( obj ) {
// 同步块 C
}
}
偏向状态

一个对象创建时:

  • 如果开启了偏向锁(默认开启),那么对象创建后,markword值为0x05即后三位为101,这时他的thread、epoch、age都为0
  • 偏向锁默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加JVM参数-XX:biasedLockingStartupDelay=0来禁用延迟
  • 如果没有开启偏向锁,那么对象创建后,markword值为0x01即后三位为001,这时它的hashcode、age都为0,第一次用到hashcode时候才会赋值
  • -XX:-UseBiasedLocking,禁用偏向锁
  • 当一个可偏向的对象,调用了自己的hashcode之后,会撤销他的偏向状态
撤销-调用hashcode

调用了对象的hashcode,但偏向锁的对象markword中存储的是线程id,如果调用hashCode会导致偏向锁被撤销。

  • 轻量级锁会在锁记录中记录hashCode
  • 重量级锁会在Monitor中记录hashCode

在调用hashCode后使用偏向锁,记得去掉-XX:-UseBiasedLocking

撤销-其他线程使用对象

当有其他线程使用偏向锁对象时,会将偏向锁升级为轻量级锁,由可偏向变为不可偏向。

撤销-调用wait/notify

因为wait/notify只有重量级锁才有,因此调用了就会升级为重量级锁。

批量重偏向

如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程T1的对象仍然有机会重新偏向T2,重偏向会重置对象的ThreadID。

当撤销偏向锁阈值超过二十次以后,JVM会觉得自己是不是偏向错了,于是会给这些对象加锁时重新偏向至加锁线程。

批量撤销

当撤销偏向锁阈值超过40次后,JVM会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。

锁消除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Fork(1)
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations=3)
@Measurement(iterations=5)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class MyBenchmark {
static int x = 0;
@Benchmark
public void a() throws Exception {
x++;
}
@Benchmark
public void b() throws Exception {
Object o = new Object();
synchronized (o) {
x++;
}
}
}
1
2
3
4
# java -jar benchmarks.jar 热点代码优化,性能相差无几
Benchmark Mode Samples Score Score error Units
c.i.MyBenchmark.a avgt 5 1.542 0.056 ns/op
c.i.MyBenchmark.b avgt 5 1.518 0.091 ns/op
1
2
3
4
# java -XX:-EliminateLocks -jar benchmarks.jar 关闭锁消除优化
Benchmark Mode Samples Score Score error Units
c.i.MyBenchmark.a avgt 5 1.507 0.108 ns/op
c.i.MyBenchmark.b avgt 5 16.976 1.572 ns/op

锁粗化:对相同对象多次加锁,导致线程发生多次重入,可以使用锁粗化方式来优化,这不同于之前讲的细分锁的粒度。

7. wait / notify

  • Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为Waiting状态
  • BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
  • BLOCKED线程会在Owner线程释放锁时唤醒
  • WAITING线程会在Owner线程调用notify或notufyAll时候唤醒, 但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争
API
  • obj.wait()让进入object监视器的线程到waitSet等待
  • obj.notify()在object上正在waitSet等待的线程中挑一个唤醒
  • obj.notifyAll()让object上正在waitSet等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于Object对象的方法,必须获得此对象的锁,才能调用这几个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码....");
}
}).start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码....");
}
}).start();
// 主线程两秒后执行
sleep(2);
log.debug("唤醒 obj 上其它线程");
synchronized (obj) {
obj.notify(); // 唤醒obj上一个线程
// obj.notifyAll(); // 唤醒obj上所有等待线程
}
}
1
2
3
4
5
# notify的一种结果
20:00:53.096 [Thread-0] c.TestWaitNotify - 执行....
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行....
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
1
2
3
4
5
6
# notifyAll的结果
19:58:15.457 [Thread-0] c.TestWaitNotify - 执行....
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行....
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码....
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....

wait()方法会释放对象的锁,进入WaitSet等待区,从而让其他线程有机会获得对象的锁。无限制等待直到notify为止。

wait(long n)有时限的等待,到n毫秒后结束等待,或被notify,wait(long timeout, int nanos)纳秒。

8. wait/notify的正确姿势

sleep和notify的区别
  • sleep是Thread方法,而wait是Object的方法
  • sleep不需要强制和synchronized配合使用,但wait需要和synchronized使用
  • sleep在睡眠的同时,不会释放对象锁的,但wait在等待的时候会释放对象锁
  • 它们的状态都是TIMED_WAITING
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) { // 使用while解决虚假唤醒问题
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();
new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();
sleep(1);
new Thread(() -> {
synchronized (room) {
hasTakeout = true;
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();

套路总结

1
2
3
4
5
6
7
8
9
10
synchronized(lock) {
while(条件不成立) {
lock.wait();
}
// 干活
}
//另一个线程
synchronized(lock) {
lock.notifyAll();
}
同步模式之保护性暂停

Guarded Suspension,用在一个线程等待另一个线程的执行结果。

  • 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个GuardedObject
  • 如果有结果不断从一个线程到另一个线程,可以使用消息队列
  • JDK中,join的实现、Future的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
try {
// 子线程执行下载
List<String> response = download();
log.debug("download complete...");
guardedObject.complete(response);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
log.debug("waiting...");
// 主线程阻塞等待
Object response = guardedObject.get();
log.debug("get response: [{}] lines", ((List<String>) response).size());
}
增加超时效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class GuardedObjectV {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}
join原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
解耦

以居民邮箱为例,在多个类之间进行参数传递,设计一个用来解耦的中间类,解耦结果等待者和结果生产者。

生产线程和结果使用线程是一一对应的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@Slf4j
public class Test {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : MailBoxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
//业务相关类
@Slf4j
class People extends Thread {
@Override
public void run() {
// 收信操作
GuardedObject guardedObject = MailBoxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{} 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private int id;
private String mail;

public Postman(int id, String mall) {
this.id = id;
this.mail = mall;
}
@Override
public void run() {
GuardedObject guardedObject = MailBoxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
// 用于解耦的通用类
class MailBoxes {
public static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
class GuardedObject {
// 新建一个id用来表示GuardedObject
private int id;
private Object response;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
public Object get(long timeout) {
synchronized (this) {
long begin = System.currentTimeMillis();
long passedTime = 0;
while (response == null) {
long waitTime = timeout - passedTime;
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - begin;
}
return response;
}
}
public void complete(Object response) {
synchronized (this) {
this.response = response;
this.notifyAll();
}
}
}
/**
2020-06-07 10:59:22.336 DEBUG People - 开始收信 id:3
2020-06-07 10:59:22.336 DEBUG People - 开始收信 id:1
2020-06-07 10:59:22.336 DEBUG People - 开始收信 id:2
2020-06-07 10:59:23.368 DEBUG Postman - 送信 id:3, 内容:内容3
2020-06-07 10:59:23.369 DEBUG People - 收到信 id:3 内容:内容3
2020-06-07 10:59:23.369 DEBUG Postman - 送信 id:2, 内容:内容2
2020-06-07 10:59:23.369 DEBUG People - 收到信 id:2 内容:内容2
2020-06-07 10:59:23.370 DEBUG Postman - 送信 id:1, 内容:内容1
2020-06-07 10:59:23.370 DEBUG People - 收到信 id:1 内容:内容1
**/
异步模式之生产者消费者
  • 与保护性暂停中的GuardedObject不同,不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK中各种阻塞队列,采用的就是这种模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Slf4j
public class Test {
public static void main(String[] args) throws InterruptedException {
MessageQueue messageQueue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
messageQueue.put(new Message(id, "值" + id));
}, "生产者" + i).start();
}
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Message message = messageQueue.take();
}
}, "消费者").start();
}
}
@Slf4j
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;

MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("没货了,wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
log.debug("已经消费了一个消息");
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("库存已经达到上限,wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
log.debug("已经生产了一个消息");
queue.notifyAll();
}
}
}
final class Message {
private int id;
private Object message;
Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
/**
2020-06-07 14:58:19.298 DEBUG MessageQueue - 已经生产了一个消息
2020-06-07 14:58:19.303 DEBUG MessageQueue - 已经生产了一个消息
2020-06-07 14:58:19.303 DEBUG MessageQueue - 库存已经达到上限,wait
2020-06-07 14:58:20.296 DEBUG MessageQueue - 已经消费了一个消息
2020-06-07 14:58:20.297 DEBUG MessageQueue - 已经生产了一个消息
2020-06-07 14:58:21.297 DEBUG MessageQueue - 已经消费了一个消息
2020-06-07 14:58:22.297 DEBUG MessageQueue - 已经消费了一个消息
2020-06-07 14:58:23.297 DEBUG MessageQueue - 没货了,wait
**/

9. Park & Unpark

基本使用
1
2
3
4
5
// 它们是LockSupport类的方法
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程的对象);
1
2
3
4
5
6
7
8
9
10
11
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
特点
  • wait、notify和notifyAll必须配合Object Monitor一起使用,而park、unpark不必
  • park & unpark以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,不是那么精确
  • park & unpark可以先unpark,而wait & notify不能先notify
原理

每个线程都有自己的一个Parker对象,由三部分组成,_counter_cond和*_mutex*,打个比喻。

  • 线程就像一个旅人,Parker就像他随身携带的背包,条件变量就好比背包中的帐篷,_counter就好比背包中的备用干粮(0为耗尽,1为充足)
  • 调用park就是要看需不需要停下来歇息
    • 如果备用干粮耗尽,那么钻进帐篷休息
    • 如果备用干粮充足,那么不需停留,继续前进
  • 调用unpark,就好比令干粮充足
    • 如果这时线程还在帐篷,就唤醒让他继续前进
    • 如果这时线程还在运行,那么他下次调用park时,仅是消耗备用干粮,不需停留继续前进
      • 因为背包空间有限,多次调用unpark仅会补充一份备用干粮

10. 重新理解线程状态转换




11. 多把锁

一间屋子有学习和睡觉两个功能,如果A要学习,B要睡觉,但是如果只用一个屋子即一个对象锁的话,并发度很低,解决办法就是转杯多个房间即多个对象锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class BigRoom {
public void sleep() {
synchronized (this) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (this) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

改进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

将锁的细粒度细分

  • 好处:增强并发度
  • 坏处:如果一个线程需要同时获得多把锁,容易发生死锁

12. 活跃性

死锁

一个线程需要同时获得多把锁,这时候就容易发生死锁。

  • T1线程获得A对象锁,接下来想获得B对象锁
  • T2线程获得B对象锁,接下来想获得A对象锁

使用顺序加锁的方式解决该问题,即都按照AB的顺序获得锁。

定位死锁
  • 检测死锁可以使用jsconsole工具,或者使用jps定位进程id,再用jstact定位死锁
  • 避免死锁要注意加锁顺序
  • 如果某个进程进入了死循环,导致其它线程一直等待,对于这种情况,Linux下可以通过top先定位到CPU占用高的Java进程,再利用top -Hp 进程id来定位哪个进程,最后再用jstack排查
活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
饥饿

一个线程由于优先级太低,始终得不到CPU调度执行,也不能结束。

13. ReentrantLock

可重入锁

  • 可中断
  • 可以设置超时时间
  • 可以设置公平锁
  • 支持多个条件变量

与synchronized一样,都支持可重入。

1
2
3
4
5
6
7
8
9
// 先创建一个ReentrantLock对象
// 获取锁
reentrantLock.lock();
try{
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入

可重入是指同一个线程如果首次获得了这把锁,那么是因为它是这把锁的拥有者,因此有权利再次获取这把锁。

如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
/**
17:59:11.862 [main] c.TestReentrant - execute method1
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3
**/
可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly(); // 使用可打断加锁方式
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}
/**
18:02:40.520 [main] c.TestInterrupt - 获得了锁
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断
**/

注意如果是不可中断模式lock(),那么即使使用了interrupt也不会让等待中断。

锁超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
// lock.tryLock()表示不等待直接获得锁

解决哲学家就餐问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
@Override
public void run() {
while (true) {
// 获得左手筷子
synchronized (left) {
// 获得右手筷子
synchronized (right) {
// 吃饭
eat();
}
// 放下右手筷子
}
// 放下左手筷子
}
}
}

// 测试
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
// 当每个人都拿到一只筷子后出现死锁
  1. 调换顺序使方向相同,但是会产生饥饿问题

    1
    new Philosopher("阿基米德", c1, c5).start();
  2. 使用可重入锁解决问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // 使Chopstick继承ReentrantLock
    class Chopstick extends ReentrantLock {}
    // 修改哲学家吃饭逻辑,左手拿到筷子后如果右手拿不到筷子就释放左手中的筷子
    @Override
    public void run() {
    while (true) {
    // 尝试获得左手筷子
    if (left.tryLock()) {
    try {
    // 尝试获得右手筷子
    if (right.tryLock()) {
    try {
    eat();
    } finally {
    right.unlock();
    }
    }
    } finally {
    left.unlock();
    }
    }
    }
    }
公平锁

ReentrantLock默认是不公平的,即构造参数中默认不写为false,设置公平锁ReentrantLock lock = new ReentrantLock(true);

公平锁一般没有必要,会降低并发度。

条件变量

synchronized中也有条件变量,即WaitSet休息室。而ReentrantLock的条件变量要比synchronized强大,它支持多个条件变量。

  • synchronized让不满足条件的线程都在一间休息室等消息
  • ReentrantLock支持多间休息室,有专门等烟的休息室、等早餐的休息室,唤醒时也是按照休息室来唤醒

使用流程

  • await之前需要先获得锁
  • await执行后,会释放锁,进入conditionObject等待
  • await的线程被唤醒(或打断、或超时)去重新竞争lock锁
  • 竞争lock锁成功后,从await后继续执行

以送烟送外卖为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
同步模式之顺序控制
  1. 控制运行顺序

    • wait/notify版

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      // 用来同步的对象
      static Object obj = new Object();
      // t2 运行标记, 代表 t2 是否执行过
      static boolean t2runed = false;
      public static void main(String[] args) {
      Thread t1 = new Thread(() -> {
      synchronized (obj) {
      // 如果 t2 没有执行过
      while (!t2runed) {
      try {
      // t1 先等一会
      obj.wait();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
      }
      System.out.println(1);
      });
      Thread t2 = new Thread(() -> {
      System.out.println(2);
      synchronized (obj) {
      // 修改运行标记
      t2runed = true;
      // 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
      obj.notifyAll();
      }
      });
      t1.start();
      t2.start();
      }
      // 可以修改为ReentrantLock重解该问题
    • park/unpark版

      • 首先,需要保证先wait再notify,否则wait线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait

      • 第二,如果有些干扰线程错误地notify了wait线程,条件不满足时还要重新等待,使用了while循环来解决
        此问题

      • 最后,唤醒对象上的wait线程需要使用notifyAll,因为『同步对象』上的等待线程可能不止一个

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        Thread t1 = new Thread(() -> {
        try { Thread.sleep(1000); } catch (InterruptedException e) { }
        // 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
        LockSupport.park();
        System.out.println("1");
        });
        Thread t2 = new Thread(() -> {
        System.out.println("2");
        // 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
        LockSupport.unpark(t1);
        });
        t1.start();
        t2.start();
        // park和unpark方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』,不需要『同步对象』和『运行标记』
    1. 交替输出

      线程1输出a5次,线程2输出b5次,线程3输出c5次,要求输出abcabcabcabcabc。

      • wait/notify版

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        class SyncWaitNotify {
        private int flag;
        private int loopNumber;
        public SyncWaitNotify(int flag, int loopNumber) {
        this.flag = flag;
        this.loopNumber = loopNumber;
        }
        public void print(int waitFlag, int nextFlag, String str) {
        for (int i = 0; i < loopNumber; i++) {
        synchronized (this) {
        while (this.flag != waitFlag) {
        try {
        this.wait();
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
        }
        System.out.print(str);
        flag = nextFlag;
        this.notifyAll();
        }
        }
        }
        }
        // 测试
        SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
        new Thread(() -> {
        syncWaitNotify.print(1, 2, "a");
        }).start();
        new Thread(() -> {
        syncWaitNotify.print(2, 3, "b");
        }).start();
        new Thread(() -> {
        syncWaitNotify.print(3, 1, "c");
        }).start();
  - ReentrantLock版本
  
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
// 测试
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
as.start(aWaitSet); // 得先唤醒一个
- park/unpark版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class SyncPark {
private int loopNumber;
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
if(threads[i] == current) {
index = i;
break;
}
}
if(index < threads.length - 1) {
return threads[index+1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
LockSupport.unpark(threads[0]);
}
}
// 测试
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();

场景

交替打印ABABAABA
可重入锁配合 Condition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ABA {
private static int flag = 0;
private static ReentrantLock lock = new ReentrantLock();
private static Condition aCondition = lock.newCondition();
private static Condition bCondition = lock.newCondition();

static class PrintA implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (flag % 2 != 0) {
try {
aCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag++;
System.out.print("A");
bCondition.signal();
} finally {
lock.unlock();
}
}
}
}

static class PrintB implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
lock.lock();
try {
while (flag % 2 == 0) {
try {
bCondition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag++;
System.out.print("B");
aCondition.signal();
} finally {
lock.unlock();
}
}
}
}

public static void main(String[] args) {
new Thread(new PrintA()).start();
new Thread(new PrintB()).start();
}
}
可重入锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ABA {
private static int flag = 0;
private static Lock lock = new ReentrantLock();

static class PrintA implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; ) {
lock.lock();
try {
while (flag % 2 == 0) {
System.out.print("A");
flag++;
i++;
}
} finally {
lock.unlock();
}
}
}
}

static class PrintB implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; ) {
lock.lock();
try {
while (flag % 2 != 0) {
System.out.print("B");
flag++;
i++;
}
} finally {
lock.unlock();
}
}
}
}

public static void main(String[] args) {
new Thread(new PrintA()).start();
new Thread(new PrintB()).start();
}
}
wait() 和 notify()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class ABA {
private static int flag = 0;
private Object lock;

private synchronized void printA() {
while (flag % 2 != 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag++;
System.out.print("A");
notify();
}

private synchronized void printB() {
while (flag % 2 == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag++;
System.out.print("B");
notify();
}

class PrintA implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
printA();
}
}
}

class PrintB implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
printB();
}
}
}

public static void main(String[] args) {
ABA aba = new ABA();
PrintA printA = aba.new PrintA();
PrintB printB = aba.new PrintB();
new Thread(printA).start();
new Thread(printB).start();
}
}
信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.Semaphore;
public class ABA {
private static Semaphore pa = new Semaphore(1);
private static Semaphore pb = new Semaphore(0);
static class PrintA implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
pa.acquire();
System.out.print("A");
pb.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class PrintB implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
pb.acquire();
System.out.print("B");
pa.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
new Thread(new PrintA()).start();
new Thread(new PrintB()).start();
}
}
i++ 线程安全性

i++ 是不安全的,它是一个复合操作,可分为三个阶段:

  • 读值,从内存到寄存器
  • +1,寄存器自增
  • 写值,写回内存

这三步之间都可能会有 CPU 调度,造成值被修改,造成脏读和脏写。如果是方法内定义的,一定是线程安全的,因为每个方法栈是线程私有的;如果是类的静态成员变量,则不是线程安全的,因为线程共享栈区,不共享堆区和全局区。

用 Volatile 虽然能保证可见性,但是不能保证原子性,如果想要保证多线程下的安全性,可以使用原子变量(AtomicInteger)、synchronized 和 Lock锁实现。AtomicInteger 和各种 Lock 都可以确保线程安全,AtomicInteger 的效率高是因为它的互斥区非常小,而 Lock 的互斥区是拿到锁到放锁之间的区域,至少三条指令。

Java 线程同步的几种方法

  1. synchronized
  2. wait() 和 notify()
  3. volatile
  4. reentrantlock
  5. ThreadLocal 局部变量
  6. 使用阻塞队列BlockingQueue
  7. 使用原子变量 AtomicInteger