Java并发编程第二篇,共享模型之管程,各种锁。
1. 共享带来的问题
临界区 Critical Section
一个程序运行多个线程本身是没有问题的
问题出在多个线程访问共享资源
多个线程访问共享资源其实也没有问题
在多个线程对共享资源读写操作时发生指令交错,就会出现问题
一段代码块内如果存在对共享资源的多线程读写操作,称这段代码为临界区
1 2 3 4 5 6 7 8 9 10 static int counter = 0 ;static void increment () { counter++; } static void decrement () { counter--; }
竞态条件 Race Condition
多个线程在临界区执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。
为了避免临界区的竞态条件发生,有多种手段可以达到目的。
阻塞式的解决方案,synchronized,Lock
非阻塞式的解决方案:原子变量
2. synchronized解决方案
俗称对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其他线程再想获取这个对象锁时就会被阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区的代码,不用担心上下文的切换。
虽然Java中互斥和同步都可以采用synchronized关键字来完成,但他们还是有区别的:
互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
同步是由于线程执行的先后、顺序不同,需要一个线程等待其他线程运行到某个点
Synchonized是基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。Synchronized 用在方法上时,在字节码中是通过方法的 ACC_SYNCHRONIZED 标志来实现的。而代码块同步则是使用monitorenter和monitorexit指令实现的。
monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,JVM要保证每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个monitor与之关联,当且一个monitor被持有后,它将处于锁定状态。线程执行到monitorenter指令时,将会尝试获取对象所对应的monitor的所有权,即尝试获得对象的锁,当获得对象的monitor以后,monitor内部的计数器就会自增(初始为0),当同一个线程再次获得monitor的时候,计数器会再次自增。当同一个线程执行monitorexit指令的时候,计数器会进行自减,当计数器为0的时候,monitor就会被释放,其他线程便可以获得monitor。
语法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 static int counter = 0 ;static final Object room = new Object();public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { for (int i = 0 ; i <5000 ; i++){ synchronized (room){ counter++; } } }); Thread t2 = new Thread(() -> { for (int i = 0 ; i <5000 ; i++){ synchronized (room){ counter--; } } }); t1.start(); t2.start(); t1.join(); t2/join(); }
synchronized实际上是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切断所打断。
面向对象改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Room room = new Room(); Thread thread = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) room.increment(); }); Thread thread1 = new Thread(() -> { for (int i = 0 ; i < 5000 ; i++) room.decrement(); }); thread.start(); thread1.start(); thread.join(); thread1.join(); log.debug("主线程结束{}" , room.getCounter()); } } class Room { private int counter = 0 ; public void increment () { synchronized (this ) { counter++; } } public void decrement () { synchronized (this ) { counter--; } } public int getCounter () { synchronized (this ) { return counter; } } }
3. 方法上的synchronized
1 2 3 4 5 6 7 8 9 10 11 class test { public synchronized void test () { } } class test { public void test () { synchronized (this ) { } } }
1 2 3 4 5 6 7 8 9 10 11 class test { public synchronized static void test () { } } class test { public static void test () { synchronized (test.class ) { } } }
synchronized 关键字修饰方法和代码的区别
synchronized修饰方法 :同步是隐式的,给方法添加ACC_SYNCHRONIZED的访问修饰符。当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如有,则需要先获取当前对象的监视器锁,然后才开始执行方法。方法执行之后再释放监视器锁。
同步代码块 :采用monitorenter
、monitorexit
两个指令来实现同步。在执行monitorenter指令时,首先要尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应的,在执行monitorexit指令时会将锁计数器减1,当计数器为0的时候,锁就会被释放。如果获取对象锁失败,那当前线程就要阻塞等待。直到对象锁被另外一个线程释放为止。
练习:线程八锁
情况1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); } } @Slf 4jclass Number { public synchronized void a () { log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
情况2,修改Number类如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf 4jclass Number { public synchronized void a () { Thred.sleep(1000 ); log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
情况3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); new Thread(() -> { log.debug("begin" ); n1.c(); }).start(); } } @Slf 4jclass Number { public synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public synchronized void b () { log.debug("2" ); } public void c () { log.debug("3" ); } }
情况4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n2.b(); }).start(); } } @Slf 4jclass Number { public synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
情况5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); } } @Slf 4jclass Number { public static synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
情况6
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n1.b(); }).start(); } } @Slf 4jclass Number { public static synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public static synchronized void b () { log.debug("2" ); } }
情况7
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n2.b(); }).start(); } } @Slf 4jclass Number { public static synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public synchronized void b () { log.debug("2" ); } }
情况8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Slf 4jpublic class Test { public static void main (String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin" ); n1.a(); }).start(); new Thread(() -> { log.debug("begin" ); n2.b(); }).start(); } } @Slf 4jclass Number { public static synchronized void a () { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1" ); } public static synchronized void b () { log.debug("2" ); } }
4. 线程安全分析
成员变量和静态变量是否线程安全
如果它们没有共享,则线程安全
如果它们被共享了,根据它们的状态是否能被改变,又分两种情况:
如果只有读操作,则线程安全
如果只有写操作,则这段代码属于临界区,需要考虑线程安全
局部变量是否线程安全
局部变量是线程安全的
但局部变量引用的对象未必
如果该对象没有逃离方法的作用域访问,它是线程安全的
如果该对象逃离方法的作用范围,需要考虑线程安全
局部变量线程安全分析
1 2 3 4 public static void text () { int i = 10 ; i++; }
每个线程调用test()方法时局部变量i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。
但局部变量的引用稍有不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf 4jpublic class Test { static final int THREAD_NUMBER = 2 ; static final int LOOP_NUMBER = 200 ; public static void main (String[] args) { ThreadUnsafe test = new ThreadUnsafe(); for (int i = 0 ; i < THREAD_NUMBER; i++) { new Thread(() -> { test.method1(LOOP_NUMBER); }, "Thread" + i).start(); } } } class ThreadUnsafe { ArrayList<String> list = new ArrayList<>(); public void method1 (int loopNumber) { for (int i = 0 ; i < loopNumber; i++) { method2(); method3(); } } private void method2 () { list.add("1" ); } private void method3 () { list.remove(0 ); } }
无论哪个线程中的method2引用的都是同一个对象中的list成员变量,method3与method2分析相同。
修改为局部变量,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Slf 4jpublic class Test { static final int THREAD_NUMBER = 2 ; static final int LOOP_NUMBER = 200 ; public static void main (String[] args) { ThreadSafe test = new ThreadSafe(); for (int i = 0 ; i < THREAD_NUMBER; i++) { new Thread(() -> { test.method1(LOOP_NUMBER); }, "Thread" + i).start(); } } } class ThreadSafe { public final void method1 (int loopNumber) { ArrayList<String> list = new ArrayList<>(); for (int i = 0 ; i < loopNumber; i++) { method2(list); method3(list); } } private void method2 (ArrayList<String> list) { list.add("1" ); } private void method3 (ArrayList<String> list) { list.remove(0 ); } }
list是局部变量,每个线程调用时会创建其不同实例,没有共享
而method2的参数是由method1中传递过来的,与method1引用同一个对象
method3的参数分析与method2相同
常见线程安全类
String
Integer
StringBuffer
Random
Vector
Hashtable
java.util.concurrent下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法,是线程安全的,也可以理解为:
它们的每个方法是原子的
但是它们多个方法组合不是原子的
不可变线程安全性
String、Integer等都是不可变类,因为其内部的状态不可改变,因此它们的方法都是线程安全的。
实例分析
例1
1 2 3 4 5 6 7 8 9 10 11 12 public class MyServlet extends HttpServlet { Map<String, Object> map = new HashMap<>(); String S1 = "..." ; final String S2 = "..." ; Date Da = new Date(); final Date D2 = new Date(); }
例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ublic class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private int count = 0 ; public void update () { count++; } }
例3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Aspect @Component public class MyAspect { private long start = 0L ; @Before ("execution(* *(..))" ) public void before () { start = System.nanoTime(); } @After ("execution(* *(..))" ) public void after () { long end = System.nanoTime(); System.out.println("cost time:" + (end-start)); } }
例4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl(); public void update () { userDao.update(); } } public class UserDaoImpl implements UserDao { public void update () { String sql = "update user set password = ? where username = ?" ; try (Connection conn = DriverManager.getConnection("" ,"" ,"" )){ } catch (Exception e) { } } }
例5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl(); public void update () { userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection conn = null ; public void update () throws SQLException { String sql = "update user set password = ? where username = ?" ; conn = DriverManager.getConnection("" ,"" ,"" ); conn.close(); } }
例6
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { public void update () { UserDao userDao = new UserDaoImpl(); userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection = null ; public void update () throws SQLException { String sql = "update user set password = ? where username = ?" ; conn = DriverManager.getConnection("" ,"" ,"" ); conn.close(); } }
例7
1 2 3 4 5 6 7 8 9 10 11 public abstract class Test { public void bar () { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); foo(sdf); } public abstract foo (SimpleDateFormat sdf) ; public static void main (String[] args) { new Test().bar(); } }
其中foo的行为是不确定的,可能导致不安全的发生,被称之为外星方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 public void foo (SimpleDateFormat sdf) { String dateStr = "1999-10-11 00:00:00" ; for (int i = 0 ; i < 20 ; i++) { new Thread(() -> { try { sdf.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); } }).start(); } }
6. Monitor概念
被翻译为监视器或者管程
每个Java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针。
刚开始Monitor中Owner为null
当Thread-2执行synchronized(obj)就会将Monitor的所有者Owner置为Thread-2,Monitor中只能有一个Owner
在Thread-2上锁的过程中,如果Thread-3、Thread-4、Thread-5也来执行synchronized(obj),就会进入EntryList BLOCKED
Thread-2执行完同步代码块的内容,然后唤醒EntryList中等待的线程来竞争锁,竞争的时候是非公平的
注意
synchronized必须是进入同一个对象的monitor才有上述的效果
不加synchronized的对象不会监视器,不遵从以上规则
synchronized进阶
1 2 3 4 5 6 7 8 9 10 11 12 static final Object obj = new Object();public static void method1 () { synchronized ( obj ) { method2(); } } public static void method2 () { synchronized ( obj ) { } }
创建锁记录(LockRecord)对象,每个线程的栈帧都会包括一个锁记录的结构,内部可以存储锁定对象的MarkWord
让锁记录中Object Reference指向锁对象,并尝试用CAS(Compare and swap)替换Object的MarkWord,将MarkWord的值存入锁记录
如果CAS替换成功,对象头存储了锁记录地址和状态00,表示由线程给该对象加锁,这时图示如下
如果CAS失败,有两种情况
当退出synchronized代码块(解锁时)如果有取值为null的锁记录,表示有重入,这时重置锁记录,表示重入计数减一
当退出synchronized代码块(解锁时)锁记录的值不为null,这时使用cas将MarkWord的值恢复给对象头
成功,则解锁成功
失败,说明轻量级锁进行了锁膨胀或已经升级为重量锁,进入重量级锁解锁流程
锁膨胀
如果在尝试加轻量锁的过程中,CAS操作无法成功,这时一种情况就是有其他线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。
1 2 3 4 5 6 static Object obj = new Object();public static void method1 () { synchronized ( obj ) { } }
当Thread-1进行轻量级加锁时,Thread-0已经对该对象加了轻量级锁
这时Thread-1加轻量级锁失败,进入锁膨胀流程
当Thread-0退出同步代码块解锁时,使用CAS将MarkWord的值恢复给对象头,失败,这时会进入重量级解锁流程,即按照Monitor地址找到Monitor对象,设置Owner为null,唤醒EntryList中BLOCKED线程
自旋优化
重量级锁竞争的时候,还可以使用自旋来优化,如果当前线程自旋成功(即这时候锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。
失败
偏向锁
轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行CAS操作。
Java6中引入了偏向锁来做进一步优化,只有第一次使用CAS将线程ID设置到对象的MarkWord头,之后发现这个线程ID是自己的就表示没有竞争,不用重新CAS,以后只要不发生竞争,这个对象就归该线程所有。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static final Object obj = new Object();public static void m1 () { synchronized ( obj ) { m2(); } } public static void m2 () { synchronized ( obj ) { m3(); } } public static void m3 () { synchronized ( obj ) { } }
偏向状态
一个对象创建时:
如果开启了偏向锁(默认开启),那么对象创建后,markword值为0x05即后三位为101,这时他的thread、epoch、age都为0
偏向锁默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加JVM参数-XX:biasedLockingStartupDelay=0
来禁用延迟
如果没有开启偏向锁,那么对象创建后,markword值为0x01即后三位为001,这时它的hashcode、age都为0,第一次用到hashcode时候才会赋值
-XX:-UseBiasedLocking,禁用偏向锁
当一个可偏向的对象,调用了自己的hashcode之后,会撤销他的偏向状态
撤销-调用hashcode
调用了对象的hashcode,但偏向锁的对象markword中存储的是线程id,如果调用hashCode会导致偏向锁被撤销。
轻量级锁会在锁记录中记录hashCode
重量级锁会在Monitor中记录hashCode
在调用hashCode后使用偏向锁,记得去掉-XX:-UseBiasedLocking
。
撤销-其他线程使用对象
当有其他线程使用偏向锁对象时,会将偏向锁升级为轻量级锁,由可偏向变为不可偏向。
撤销-调用wait/notify
因为wait/notify只有重量级锁才有,因此调用了就会升级为重量级锁。
批量重偏向
如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程T1的对象仍然有机会重新偏向T2,重偏向会重置对象的ThreadID。
当撤销偏向锁阈值超过二十次以后,JVM会觉得自己是不是偏向错了,于是会给这些对象加锁时重新偏向至加锁线程。
批量撤销
当撤销偏向锁阈值超过40次后,JVM会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。
锁消除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Fork (1 )@BenchmarkMode (Mode.AverageTime)@Warmup (iterations=3 )@Measurement (iterations=5 )@OutputTimeUnit (TimeUnit.NANOSECONDS)public class MyBenchmark { static int x = 0 ; @Benchmark public void a () throws Exception { x++; } @Benchmark public void b () throws Exception { Object o = new Object(); synchronized (o) { x++; } } }
1 2 3 4 # java -jar benchmarks.jar 热点代码优化,性能相差无几 Benchmark Mode Samples Score Score error Units c.i.MyBenchmark.a avgt 5 1.542 0.056 ns/op c.i.MyBenchmark.b avgt 5 1.518 0.091 ns/op
1 2 3 4 # java -XX:-EliminateLocks -jar benchmarks.jar 关闭锁消除优化 Benchmark Mode Samples Score Score error Units c.i.MyBenchmark.a avgt 5 1.507 0.108 ns/op c.i.MyBenchmark.b avgt 5 16.976 1.572 ns/op
锁粗化:对相同对象多次加锁,导致线程发生多次重入,可以使用锁粗化方式来优化,这不同于之前讲的细分锁的粒度。
7. wait / notify
Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为Waiting状态
BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
BLOCKED线程会在Owner线程释放锁时唤醒
WAITING线程会在Owner线程调用notify或notufyAll时候唤醒, 但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争
API
obj.wait()让进入object监视器的线程到waitSet等待
obj.notify()在object上正在waitSet等待的线程中挑一个唤醒
obj.notifyAll()让object上正在waitSet等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于Object对象的方法,必须获得此对象的锁,才能调用这几个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 final static Object obj = new Object();public static void main (String[] args) { new Thread(() -> { synchronized (obj) { log.debug("执行...." ); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...." ); } }).start(); new Thread(() -> { synchronized (obj) { log.debug("执行...." ); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...." ); } }).start(); sleep(2 ); log.debug("唤醒 obj 上其它线程" ); synchronized (obj) { obj.notify(); } }
1 2 3 4 5 # notify的一种结果 20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
1 2 3 4 5 6 # notifyAll的结果 19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....
wait()方法会释放对象的锁,进入WaitSet等待区,从而让其他线程有机会获得对象的锁。无限制等待直到notify为止。
wait(long n)有时限的等待,到n毫秒后结束等待,或被notify,wait(long timeout, int nanos)纳秒。
8. wait/notify的正确姿势
sleep和notify的区别
sleep是Thread方法,而wait是Object的方法
sleep不需要强制和synchronized配合使用,但wait需要和synchronized使用
sleep在睡眠的同时,不会释放对象锁的,但wait在等待的时候会释放对象锁
它们的状态都是TIMED_WAITING
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]" , hasCigarette); while (!hasCigarette) { log.debug("没烟,先歇会!" ); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]" , hasCigarette); if (hasCigarette) { log.debug("可以开始干活了" ); } else { log.debug("没干成活..." ); } } }, "小南" ).start(); new Thread(() -> { synchronized (room) { Thread thread = Thread.currentThread(); log.debug("外卖送到没?[{}]" , hasTakeout); if (!hasTakeout) { log.debug("没外卖,先歇会!" ); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("外卖送到没?[{}]" , hasTakeout); if (hasTakeout) { log.debug("可以开始干活了" ); } else { log.debug("没干成活..." ); } } }, "小女" ).start(); sleep(1 ); new Thread(() -> { synchronized (room) { hasTakeout = true ; log.debug("外卖到了噢!" ); room.notifyAll(); } }, "送外卖的" ).start();
套路总结
1 2 3 4 5 6 7 8 9 10 synchronized (lock) { while (条件不成立) { lock.wait(); } } synchronized (lock) { lock.notifyAll(); }
同步模式之保护性暂停
Guarded Suspension,用在一个线程等待另一个线程的执行结果。
有一个结果需要从一个线程传递到另一个线程,让它们关联同一个GuardedObject
如果有结果不断从一个线程到另一个线程,可以使用消息队列
JDK中,join的实现、Future的实现,采用的就是此模式
因为要等待另一方的结果,因此归类到同步模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class GuardedObject { private Object response; private final Object lock = new Object(); public Object get () { synchronized (lock) { while (response == null ) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete (Object response) { synchronized (lock) { this .response = response; lock.notifyAll(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { try { List<String> response = download(); log.debug("download complete..." ); guardedObject.complete(response); } catch (IOException e) { e.printStackTrace(); } }).start(); log.debug("waiting..." ); Object response = guardedObject.get(); log.debug("get response: [{}] lines" , ((List<String>) response).size()); }
增加超时效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 class GuardedObjectV { private Object response; private final Object lock = new Object(); public Object get (long millis) { synchronized (lock) { long begin = System.currentTimeMillis(); long timePassed = 0 ; while (response == null ) { long waitTime = millis - timePassed; log.debug("waitTime: {}" , waitTime); if (waitTime <= 0 ) { log.debug("break..." ); break ; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } timePassed = System.currentTimeMillis() - begin; log.debug("timePassed: {}, object is null {}" , timePassed, response == null ); } return response; } } public void complete (Object response) { synchronized (lock) { this .response = response; log.debug("notify..." ); lock.notifyAll(); } } }
join原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public final void join () throws InterruptedException { join(0 ); } public final synchronized void join (long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0 ; if (millis < 0 ) { throw new IllegalArgumentException("timeout value is negative" ); } if (millis == 0 ) { while (isAlive()) { wait(0 ); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0 ) { break ; } wait(delay); now = System.currentTimeMillis() - base; } } }
解耦
以居民邮箱为例,在多个类之间进行参数传递,设计一个用来解耦的中间类,解耦结果等待者和结果生产者。
生产线程和结果使用线程是一一对应的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 @Slf 4jpublic class Test { public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 3 ; i++) { new People().start(); } Thread.sleep(1000 ); for (Integer id : MailBoxes.getIds()) { new Postman(id, "内容" + id).start(); } } } @Slf 4jclass People extends Thread { @Override public void run () { GuardedObject guardedObject = MailBoxes.createGuardedObject(); log.debug("开始收信 id:{}" , guardedObject.getId()); Object mail = guardedObject.get(5000 ); log.debug("收到信 id:{} 内容:{}" , guardedObject.getId(), mail); } } @Slf 4jclass Postman extends Thread { private int id; private String mail; public Postman (int id, String mall) { this .id = id; this .mail = mall; } @Override public void run () { GuardedObject guardedObject = MailBoxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}" , id, mail); guardedObject.complete(mail); } } class MailBoxes { public static Map<Integer, GuardedObject> boxes = new Hashtable<>(); private static int id = 1 ; private static synchronized int generateId () { return id++; } public static GuardedObject getGuardedObject (int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject () { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds () { return boxes.keySet(); } } class GuardedObject { private int id; private Object response; public GuardedObject (int id) { this .id = id; } public int getId () { return id; } public Object get (long timeout) { synchronized (this ) { long begin = System.currentTimeMillis(); long passedTime = 0 ; while (response == null ) { long waitTime = timeout - passedTime; if (timeout - passedTime <= 0 ) { break ; } try { this .wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } } public void complete (Object response) { synchronized (this ) { this .response = response; this .notifyAll(); } } }
异步模式之生产者消费者
与保护性暂停中的GuardedObject不同,不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
JDK中各种阻塞队列,采用的就是这种模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 @Slf 4jpublic class Test { public static void main (String[] args) throws InterruptedException { MessageQueue messageQueue = new MessageQueue(2 ); for (int i = 0 ; i < 3 ; i++) { int id = i; new Thread(() -> { messageQueue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); } new Thread(() -> { while (true ) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } Message message = messageQueue.take(); } }, "消费者" ).start(); } } @Slf 4jclass MessageQueue { private LinkedList<Message> queue; private int capacity; MessageQueue(int capacity) { this .capacity = capacity; queue = new LinkedList<>(); } public Message take () { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了,wait" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); log.debug("已经消费了一个消息" ); queue.notifyAll(); return message; } } public void put (Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已经达到上限,wait" ); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); log.debug("已经生产了一个消息" ); queue.notifyAll(); } } } final class Message { private int id; private Object message; Message(int id, Object message) { this .id = id; this .message = message; } public int getId () { return id; } public Object getMessage () { return message; } }
9. Park & Unpark
基本使用
1 2 3 4 5 LockSupport.park(); LockSupport.unpark(暂停线程的对象);
1 2 3 4 5 6 7 8 9 10 11 Thread t1 = new Thread(() -> { log.debug("start..." ); sleep(1 ); log.debug("park..." ); LockSupport.park(); log.debug("resume..." ); },"t1" ); t1.start(); sleep(2 ); log.debug("unpark..." ); LockSupport.unpark(t1);
特点
wait、notify和notifyAll必须配合Object Monitor一起使用,而park、unpark不必
park & unpark以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,不是那么精确
park & unpark可以先unpark,而wait & notify不能先notify
原理
每个线程都有自己的一个Parker对象,由三部分组成,_counter ,_cond 和*_mutex*,打个比喻。
线程就像一个旅人,Parker就像他随身携带的背包,条件变量就好比背包中的帐篷,_counter 就好比背包中的备用干粮(0为耗尽,1为充足)
调用park就是要看需不需要停下来歇息
如果备用干粮耗尽,那么钻进帐篷休息
如果备用干粮充足,那么不需停留,继续前进
调用unpark,就好比令干粮充足
如果这时线程还在帐篷,就唤醒让他继续前进
如果这时线程还在运行,那么他下次调用park时,仅是消耗备用干粮,不需停留继续前进
因为背包空间有限,多次调用unpark仅会补充一份备用干粮
10. 重新理解线程状态转换
11. 多把锁
一间屋子有学习和睡觉两个功能,如果A要学习,B要睡觉,但是如果只用一个屋子即一个对象锁的话,并发度很低,解决办法就是转杯多个房间即多个对象锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class BigRoom { public void sleep () { synchronized (this ) { log.debug("sleeping 2 小时" ); Sleeper.sleep(2 ); } } public void study () { synchronized (this ) { log.debug("study 1 小时" ); Sleeper.sleep(1 ); } } }
改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class BigRoom { private final Object studyRoom = new Object(); private final Object bedRoom = new Object(); public void sleep () { synchronized (bedRoom) { log.debug("sleeping 2 小时" ); Sleeper.sleep(2 ); } } public void study () { synchronized (studyRoom) { log.debug("study 1 小时" ); Sleeper.sleep(1 ); } } }
将锁的细粒度细分
好处:增强并发度
坏处:如果一个线程需要同时获得多把锁,容易发生死锁
12. 活跃性
死锁
一个线程需要同时获得多把锁,这时候就容易发生死锁。
T1线程获得A对象锁,接下来想获得B对象锁
T2线程获得B对象锁,接下来想获得A对象锁
使用顺序加锁的方式解决该问题,即都按照AB的顺序获得锁。
定位死锁
检测死锁可以使用jsconsole工具,或者使用jps定位进程id,再用jstact定位死锁
避免死锁要注意加锁顺序
如果某个进程进入了死循环,导致其它线程一直等待,对于这种情况,Linux下可以通过top先定位到CPU占用高的Java进程,再利用top -Hp 进程id
来定位哪个进程,最后再用jstack排查
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class TestLiveLock { static volatile int count = 10 ; static final Object lock = new Object(); public static void main (String[] args) { new Thread(() -> { while (count > 0 ) { sleep(0.2 ); count--; log.debug("count: {}" , count); } }, "t1" ).start(); new Thread(() -> { while (count < 20 ) { sleep(0.2 ); count++; log.debug("count: {}" , count); } }, "t2" ).start(); } }
饥饿
一个线程由于优先级太低,始终得不到CPU调度执行,也不能结束。
13. ReentrantLock
可重入锁
可中断
可以设置超时时间
可以设置公平锁
支持多个条件变量
与synchronized一样,都支持可重入。
1 2 3 4 5 6 7 8 9 reentrantLock.lock(); try { } finally { reentrantLock.unlock(); }
可重入
可重入是指同一个线程如果首次获得了这把锁,那么是因为它是这把锁的拥有者,因此有权利再次获取这把锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 static ReentrantLock lock = new ReentrantLock();public static void main (String[] args) { method1(); } public static void method1 () { lock.lock(); try { log.debug("execute method1" ); method2(); } finally { lock.unlock(); } } public static void method2 () { lock.lock(); try { log.debug("execute method2" ); method3(); } finally { lock.unlock(); } } public static void method3 () { lock.lock(); try { log.debug("execute method3" ); } finally { lock.unlock(); } }
可打断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..." ); try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断" ); return ; } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(1 ); t1.interrupt(); log.debug("执行打断" ); } finally { lock.unlock(); }
注意如果是不可中断模式lock()
,那么即使使用了interrupt也不会让等待中断。
锁超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..." ); try { if (!lock.tryLock(1 , TimeUnit.SECONDS)) { log.debug("获取等待 1s 后失败,返回" ); return ; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(2 ); } finally { lock.unlock(); }
解决哲学家就餐问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class Chopstick { String name; public Chopstick (String name) { this .name = name; } @Override public String toString () { return "筷子{" + name + '}' ; } } class Philosopher extends Thread { Chopstick left; Chopstick right; public Philosopher (String name, Chopstick left, Chopstick right) { super (name); this .left = left; this .right = right; } private void eat () { log.debug("eating..." ); Sleeper.sleep(1 ); } @Override public void run () { while (true ) { synchronized (left) { synchronized (right) { eat(); } } } } } Chopstick c1 = new Chopstick("1" ); Chopstick c2 = new Chopstick("2" ); Chopstick c3 = new Chopstick("3" ); Chopstick c4 = new Chopstick("4" ); Chopstick c5 = new Chopstick("5" ); new Philosopher("苏格拉底" , c1, c2).start();new Philosopher("柏拉图" , c2, c3).start();new Philosopher("亚里士多德" , c3, c4).start();new Philosopher("赫拉克利特" , c4, c5).start();new Philosopher("阿基米德" , c5, c1).start();
调换顺序使方向相同,但是会产生饥饿问题
1 new Philosopher("阿基米德" , c1, c5).start();
使用可重入锁解决问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class Chopstick extends ReentrantLock {}@Override public void run () { while (true ) { if (left.tryLock()) { try { if (right.tryLock()) { try { eat(); } finally { right.unlock(); } } } finally { left.unlock(); } } } }
公平锁
ReentrantLock默认是不公平的,即构造参数中默认不写为false
,设置公平锁ReentrantLock lock = new ReentrantLock(true);
。
公平锁一般没有必要,会降低并发度。
条件变量
synchronized中也有条件变量,即WaitSet休息室。而ReentrantLock的条件变量要比synchronized强大,它支持多个条件变量。
synchronized让不满足条件的线程都在一间休息室等消息
ReentrantLock支持多间休息室,有专门等烟的休息室、等早餐的休息室,唤醒时也是按照休息室来唤醒
使用流程
await之前需要先获得锁
await执行后,会释放锁,进入conditionObject等待
await的线程被唤醒(或打断、或超时)去重新竞争lock锁
竞争lock锁成功后,从await后继续执行
以送烟送外卖为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 static ReentrantLock lock = new ReentrantLock();static Condition waitCigaretteQueue = lock.newCondition();static Condition waitbreakfastQueue = lock.newCondition();static volatile boolean hasCigrette = false ;static volatile boolean hasBreakfast = false ;public static void main (String[] args) { new Thread(() -> { try { lock.lock(); while (!hasCigrette) { try { waitCigaretteQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的烟" ); } finally { lock.unlock(); } }).start(); new Thread(() -> { try { lock.lock(); while (!hasBreakfast) { try { waitbreakfastQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的早餐" ); } finally { lock.unlock(); } }).start(); sleep(1 ); sendBreakfast(); sleep(1 ); sendCigarette(); } private static void sendCigarette () { lock.lock(); try { log.debug("送烟来了" ); hasCigrette = true ; waitCigaretteQueue.signal(); } finally { lock.unlock(); } } private static void sendBreakfast () { lock.lock(); try { log.debug("送早餐来了" ); hasBreakfast = true ; waitbreakfastQueue.signal(); } finally { lock.unlock(); } }
同步模式之顺序控制
控制运行顺序
wait/notify版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 static Object obj = new Object();static boolean t2runed = false ;public static void main (String[] args) { Thread t1 = new Thread(() -> { synchronized (obj) { while (!t2runed) { try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println(1 ); }); Thread t2 = new Thread(() -> { System.out.println(2 ); synchronized (obj) { t2runed = true ; obj.notifyAll(); } }); t1.start(); t2.start(); }
park/unpark版
首先,需要保证先wait再notify,否则wait线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
第二,如果有些干扰线程错误地notify了wait线程,条件不满足时还要重新等待,使用了while循环来解决
此问题
最后,唤醒对象上的wait线程需要使用notifyAll,因为『同步对象』上的等待线程可能不止一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread t1 = new Thread(() -> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { } LockSupport.park(); System.out.println("1" ); }); Thread t2 = new Thread(() -> { System.out.println("2" ); LockSupport.unpark(t1); }); t1.start(); t2.start();
交替输出
线程1输出a5次,线程2输出b5次,线程3输出c5次,要求输出abcabcabcabcabc。
wait/notify版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class SyncWaitNotify { private int flag; private int loopNumber; public SyncWaitNotify (int flag, int loopNumber) { this .flag = flag; this .loopNumber = loopNumber; } public void print (int waitFlag, int nextFlag, String str) { for (int i = 0 ; i < loopNumber; i++) { synchronized (this ) { while (this .flag != waitFlag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this .notifyAll(); } } } } SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1 , 5 ); new Thread(() -> { syncWaitNotify.print(1 , 2 , "a" ); }).start(); new Thread(() -> { syncWaitNotify.print(2 , 3 , "b" ); }).start(); new Thread(() -> { syncWaitNotify.print(3 , 1 , "c" ); }).start();
- ReentrantLock版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 class AwaitSignal extends ReentrantLock { public void start (Condition first) { this .lock(); try { log.debug("start" ); first.signal(); } finally { this .unlock(); } } public void print (String str, Condition current, Condition next) { for (int i = 0 ; i < loopNumber; i++) { this .lock(); try { current.await(); log.debug(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { this .unlock(); } } } private int loopNumber; public AwaitSignal (int loopNumber) { this .loopNumber = loopNumber; } } AwaitSignal as = new AwaitSignal(5 ); Condition aWaitSet = as.newCondition(); Condition bWaitSet = as.newCondition(); Condition cWaitSet = as.newCondition(); new Thread(() -> { as.print("a" , aWaitSet, bWaitSet); }).start(); new Thread(() -> { as.print("b" , bWaitSet, cWaitSet); }).start(); new Thread(() -> { as.print("c" , cWaitSet, aWaitSet); }).start(); as.start(aWaitSet);
- park/unpark版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 class SyncPark { private int loopNumber; private Thread[] threads; public SyncPark (int loopNumber) { this .loopNumber = loopNumber; } public void setThreads (Thread... threads) { this .threads = threads; } public void print (String str) { for (int i = 0 ; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); LockSupport.unpark(nextThread()); } } private Thread nextThread () { Thread current = Thread.currentThread(); int index = 0 ; for (int i = 0 ; i < threads.length; i++) { if (threads[i] == current) { index = i; break ; } } if (index < threads.length - 1 ) { return threads[index+1 ]; } else { return threads[0 ]; } } public void start () { for (Thread thread : threads) { thread.start(); } LockSupport.unpark(threads[0 ]); } } SyncPark syncPark = new SyncPark(5 ); Thread t1 = new Thread(() -> { syncPark.print("a" ); }); Thread t2 = new Thread(() -> { syncPark.print("b" ); }); Thread t3 = new Thread(() -> { syncPark.print("c\n" ); }); syncPark.setThreads(t1, t2, t3); syncPark.start();
场景
交替打印ABABAABA
可重入锁配合 Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;public class ABA { private static int flag = 0 ; private static ReentrantLock lock = new ReentrantLock(); private static Condition aCondition = lock.newCondition(); private static Condition bCondition = lock.newCondition(); static class PrintA implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { lock.lock(); try { while (flag % 2 != 0 ) { try { aCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("A" ); bCondition.signal(); } finally { lock.unlock(); } } } } static class PrintB implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { lock.lock(); try { while (flag % 2 == 0 ) { try { bCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("B" ); aCondition.signal(); } finally { lock.unlock(); } } } } public static void main (String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ABA { private static int flag = 0 ; private static Lock lock = new ReentrantLock(); static class PrintA implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; ) { lock.lock(); try { while (flag % 2 == 0 ) { System.out.print("A" ); flag++; i++; } } finally { lock.unlock(); } } } } static class PrintB implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; ) { lock.lock(); try { while (flag % 2 != 0 ) { System.out.print("B" ); flag++; i++; } } finally { lock.unlock(); } } } } public static void main (String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
wait() 和 notify()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class ABA { private static int flag = 0 ; private Object lock; private synchronized void printA () { while (flag % 2 != 0 ) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("A" ); notify(); } private synchronized void printB () { while (flag % 2 == 0 ) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("B" ); notify(); } class PrintA implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { printA(); } } } class PrintB implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { printB(); } } } public static void main (String[] args) { ABA aba = new ABA(); PrintA printA = aba.new PrintA(); PrintB printB = aba.new PrintB(); new Thread(printA).start(); new Thread(printB).start(); } }
信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import java.util.concurrent.Semaphore;public class ABA { private static Semaphore pa = new Semaphore(1 ); private static Semaphore pb = new Semaphore(0 ); static class PrintA implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { pa.acquire(); System.out.print("A" ); pb.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class PrintB implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { pb.acquire(); System.out.print("B" ); pa.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main (String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
i++ 线程安全性
i++ 是不安全的,它是一个复合操作,可分为三个阶段:
读值,从内存到寄存器
+1,寄存器自增
写值,写回内存
这三步之间都可能会有 CPU 调度,造成值被修改,造成脏读和脏写。如果是方法内定义的,一定是线程安全的,因为每个方法栈是线程私有的;如果是类的静态成员变量,则不是线程安全的,因为线程共享栈区,不共享堆区和全局区。
用 Volatile 虽然能保证可见性,但是不能保证原子性,如果想要保证多线程下的安全性,可以使用原子变量(AtomicInteger)、synchronized 和 Lock锁实现。AtomicInteger 和各种 Lock 都可以确保线程安全,AtomicInteger 的效率高是因为它的互斥区非常小,而 Lock 的互斥区是拿到锁到放锁之间的区域,至少三条指令。
Java 线程同步的几种方法
synchronized
wait() 和 notify()
volatile
reentrantlock
ThreadLocal 局部变量
使用阻塞队列BlockingQueue
使用原子变量 AtomicInteger