并发编程: J.U.C.

张天宇 on 2020-07-05

Java并发编程终结篇,J.U.C.。

AQS

概述

全称AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。

特点:

  • 用state属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

子类主要实现一些方法(默认抛出 UnsupportedOperationException):

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

获取锁的姿势:

1
2
3
4
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}

释放锁的姿势:

1
2
3
4
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}

实现不可重入锁

自定义同步器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 锁已经加上了,并且设置owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
自定义锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class MyLock implements Lock {
// 独占锁,同步器类
class MySync extends AbstractQueuedSynchronizer {
// ...
}
private MySync mySync = new MySync();
@Override // 加锁
public void lock() {
mySync.acquire(1);
}
@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
mySync.acquireInterruptibly(1);
}
@Override // 尝试加锁,只试一次
public boolean tryLock() {
return mySync.tryAcquire(1);
}
@Override // 带超时加锁
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return mySync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override // 解锁
public void unlock() {
mySync.release(1);
}
@Override // 创建条件变量
public Condition newCondition() {
return mySync.newCondition();
}
}
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking..");
Thread.sleep(3000);
} catch (Exception e) {
e.printStackTrace();
} finally {
log.debug("unlocking..");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking..");
} finally {
log.debug("unlocking..");
lock.unlock();
}
}, "t2").start();
/**
2020-07-03 21:04:45.197 [t1] DEBUG TestPool - locking..
2020-07-03 21:04:48.203 [t1] DEBUG TestPool - unlocking..
2020-07-03 21:04:48.203 [t2] DEBUG TestPool - locking..
2020-07-03 21:04:48.203 [t2] DEBUG TestPool - unlocking..
不可重入测试,如果改为下面代码,会发现自己也会被挡住,只会打印一次locking
lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");
**/

ReentrantLock原理

非公平锁实现原理

加锁流程

构造器默认为非公平实现

1
2
3
public ReentrantLock() {
sync = new NonfairSync(); //NonfairSync继承自AQS
}

没有竞争时候

1
2
3
4
5
6
7
8
final void lock() {
// 没有竞争时候
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 竞争出现了
acquire(1);
}
  1. CAS 尝试将 state 由 0 改为 1
  2. 成功后将owner改成当前线程

第一个竞争出现的时候

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

Thread-1执行了

  1. CAS尝试将state由0改为1,结果失败
  2. 进入tryAcquire逻辑,这时state已经是1,结果仍然失败
  3. 接下来进入addWaiter逻辑,构造Node队列
    • 图中黄色三角表示该Node的 waitStatus 状态,其中0为默认正常状态
    • Node的创建是懒惰的
    • 首次创建会创建两个Node,其中第一个Node称为Dummy(哑元)或哨兵,用来占位,并不关联线程

当前线程进入acquireQueued逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入park阻塞
  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为1,失败
  3. 进入shouldParkAfterFailedAcquire逻辑,将前驱 node,即head的waitStatus改为-1,表示它有责任唤醒它的后继节点(Thread-1长期未获得锁应该进入阻塞,所以需要有一个结点唤醒,由前驱唤醒),这次返回false
  4. shouldParkAfterFailedAcquire执行完毕后再次进入循环,再次tryAcquire尝试获取锁,这时候state仍为1,失败
  5. 当再次进入shouldParkAfterFailedAcquire时候,这时候因为前驱node的waitStatus已经是-1,这次返回true
  6. 进入parkAndCheckInterupt,Thread-1 park,灰色表示

再次有多个线程经历上述竞争失败,会变成下图

Thread-0释放锁,进入tryRelease流程,如果成功

  • 设置exclusiveOwnerThread为null
  • state = 0

当前队列不为null,并且head的waitStatus = -1,进入unparkSuccessor流程。找到队列中离head最近的一个没取消的Node,unpark恢复其运行。回到Thread-1的acquireQueued流程

如果没有竞争加锁成功,会设置

  • exclusiveOwnerThread为Thread-1,state = 1
  • head指向刚刚Thread-1所在的Node,该Node清空Thread
  • 原本的head因为从链表断开,而可被垃圾回收

如果这时候有其他线程来竞争,如Thread-4,又碰巧被Thread-4抢先

  • Thread-4被设置为exclusiveOwnerThread,state = 1
  • Thread-1再次进入acquireQueued流程,获取锁失败,重新进入park阻塞

锁重入原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static final class NonfairSync extends Sync {
// ...
// Sync 继承过来的方法, 方便阅读, 放在此处
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// 主要完成了state++的操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// 主要完成了state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功,否则直接返回更新后的state
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

可打断原理

不可打断模式

在此模式下,即使它被打断,仍会驻留在AQS队列中,一直要等到获得锁后方能得知自己被打断了。

1
2
3
4
5
6
7
if (
shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()
) {
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
可打断模式
1
2
3
4
5
6
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) {
// 在 park 过程中如果被 interrupt 会进入此
// 这时候抛出异常, 而不会再次进入 for (;;)
throw new InterruptedException();
}

公平锁实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 非公平锁实现
// if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

条件变量实现原理

每个条件变量其实就对应着一个等待队列,其实现类是ConditionObject。

await

开始hThread-0持有锁,调用await,进入ConditionObject的addConditionWaiter流程。创建新的Node状态为-2(Node.CONDITION),关联Thread-0,加入等待队列尾部。

接下来j进入AQS的fullyRelease流程,释放同步器上的所有锁。

Unpark AQS队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么Thread-1竞争成功。

Park阻塞Thread-0。

signal

以上图为例,接下来假设Thread-1去唤醒Thread-0。进入ConditionObject的doSignal流程,取得等待队列中的第一个Node,即Thread-0所在的Node。

执行transferForSignal流程,将该Node加入到AQS尾部,将Thread-0的waitStatus改为0,Thread-3的waitStatus改为-1。

Thread-1释放锁,进入Unlock流程。

读写锁

ReentrantReadWriteLock

当读操作远远多于写操作的时候,这时候使用读写锁让读-读可以并发,提高性能,类似于数据库中的select ... from ... lock in share mode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
class Container {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁。。。");
r.lock();
try {
log.debug("读取");
Thread.sleep(1000);
return data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁。。。");
r.unlock();
}
return null;
}
public void write() {
log.debug("获取写锁。。。");
w.lock();
try {
log.debug("写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁");
w.unlock();
}
}
}
public static void main(String[] args) {
Container dataContainer = new Container();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
new Thread(() -> {
dataContainer.write();
}, "t3").start();
}
/**
其中一种情况,可以看到写锁和读锁是互斥的
2020-07-05 15:36:56.489 [t1] DEBUG Container - 获取读锁。。。
2020-07-05 15:36:56.489 [t2] DEBUG Container - 获取读锁。。。
2020-07-05 15:36:56.489 [t3] DEBUG Container - 获取写锁。。。
2020-07-05 15:36:56.492 [t1] DEBUG Container - 读取
2020-07-05 15:36:56.492 [t2] DEBUG Container - 读取
2020-07-05 15:36:57.492 [t1] DEBUG Container - 释放读锁。。。
2020-07-05 15:36:57.492 [t2] DEBUG Container - 释放读锁。。。
2020-07-05 15:36:57.492 [t3] DEBUG Container - 写入
2020-07-05 15:36:58.493 [t3] DEBUG Container - 释放写锁
**/

注意事项

  • 读锁不支持条件变量

  • 重入时不支持升级,即持有读锁的情况下去获取写锁,会导致获取写锁永久等待

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    r.lock();
    try {
    // ...
    w.lock();
    try {
    // ...
    } finally{
    w.unlock();
    }
    } finally{
    r.unlock();
    }
  • 重入时支持降级,即持有写锁的情况下去获取读锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    class CachedData {
    Object data;
    // 是否有效,如果失效,需要重新计算 data
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
    // 获取写锁前必须释放读锁
    rwl.readLock().unlock();
    rwl.writeLock().lock();
    try {
    // 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
    if (!cacheValid) {
    data = ...
    cacheValid = true;
    }
    // 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
    rwl.readLock().lock();
    } finally {
    rwl.writeLock().unlock();
    }
    }
    // 自己用完数据, 释放读锁
    try {
    use(data);
    } finally {
    rwl.readLock().unlock();
    }
    }
    }

StampedLock

该类自JDK 8加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时候都必须配合戳的使用。

加解读锁

1
2
long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

1
2
long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock支持tryOptimisticRead()方法(乐观读),读取完毕之后需要做一次戳校验,如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 校验未通过,进行锁升级
}

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Slf4j
public class Test {
public static void main(String[] args) {
ContainerStamped dataContainer = new ContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
new Thread(() -> {
dataContainer.write(1000);
}, "t2").start();
}
}
@Slf4j
class ContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public ContainerStamped(int data) {
this.data = data;
}
public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("乐观锁{}", stamp);
try {
Thread.sleep(readTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock.validate(stamp)) {
log.debug("验戳{}", stamp);
}
log.debug("锁升级{}", stamp);
try {
stamp = lock.readLock();
log.debug("读锁{}", stamp);
Thread.sleep(readTime);
log.debug("读锁操作完成{}", stamp);
return data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("读解锁{}", stamp);
lock.unlockRead(stamp);
}
return -1;
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("写锁{}", stamp);
try {
Thread.sleep(2000);
this.data = newData;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("写解锁{}", stamp);
lock.unlockWrite(stamp);
}
}
}
/**
2020-07-05 21:06:38.482 [t1] DEBUG ContainerStamped - 乐观锁256
2020-07-05 21:06:38.482 [t2] DEBUG ContainerStamped - 写锁384
2020-07-05 21:06:38.490 [t1] DEBUG ContainerStamped - 验戳失败256 // 锁升级,但是被写锁阻塞了
2020-07-05 21:06:40.488 [t2] DEBUG ContainerStamped - 写解锁384
2020-07-05 21:06:40.489 [t1] DEBUG ContainerStamped - 读锁513 // 等写锁释放之后才能加上
2020-07-05 21:06:40.491 [t1] DEBUG ContainerStamped - 读锁操作完成513
2020-07-05 21:06:40.491 [t1] DEBUG ContainerStamped - 读解锁513
**/

注意

  • StampedLock不支持条件变量
  • 不支持可重入

Semaphore

信号量,用来限制能同时访问共享资源的线程上限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Test {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 7; i++) {
new Thread(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running..");
Thread.sleep(1000);
log.debug("end...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
/**
2020-07-05 21:19:23.844 [Thread-2] DEBUG Test - running..
2020-07-05 21:19:23.844 [Thread-1] DEBUG Test - running..
2020-07-05 21:19:24.847 [Thread-1] DEBUG Test - end...
2020-07-05 21:19:24.847 [Thread-2] DEBUG Test - end...
2020-07-05 21:19:24.847 [Thread-3] DEBUG Test - running..
2020-07-05 21:19:24.847 [Thread-4] DEBUG Test - running..
2020-07-05 21:19:25.848 [Thread-3] DEBUG Test - end...
2020-07-05 21:19:25.848 [Thread-4] DEBUG Test - end...
2020-07-05 21:19:25.848 [Thread-6] DEBUG Test - running..
2020-07-05 21:19:25.848 [Thread-5] DEBUG Test - running..
2020-07-05 21:19:26.848 [Thread-6] DEBUG Test - end...
2020-07-05 21:19:26.848 [Thread-5] DEBUG Test - end...
2020-07-05 21:19:26.848 [Thread-7] DEBUG Test - running..
2020-07-05 21:19:27.848 [Thread-7] DEBUG Test - end...
**/
  • 使用semaphore限流。在访问高峰期时,让请求线程阻塞,高峰期过去之后再释放许可,当然它只适合限制单机数量,并且仅是限制线程数,而不是限制资源数。

  • 用semaphore实现简单连接池,对比享元模式用wait/notify实现,性能和可读性显然更好,注意下面实现中线程数和数据库连接数是相等的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    @Slf4j
    class Pool {
    private final int poolSize;
    private Connection[] connections;
    private AtomicIntegerArray states;
    private Semaphore semaphore;
    public Pool(int poolSize) {
    this.poolSize = poolSize;
    this.semaphore = new Semaphore(poolSize);
    this.states = new AtomicIntegerArray(new int[poolSize]);
    for (int i = 0; i < poolSize; i++) {
    connections[i] = new DBConect("连接" + (i + 1));
    }
    }
    public Connection borrow() {
    try {
    semaphore.acquire();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    for (int i = 0; i < poolSize; i++) {
    if (states.get(i) == 0) {
    if (states.compareAndSet(i, 0, 1)) {
    log.debug("borrow{}", connections[i]);
    return connections[i];
    }
    }
    }
    return null;
    }
    public void free(Connection connection) {
    for (int i = 0; i < poolSize; i++) {
    if (connections[i] == connection) {
    states.set(i, 0);
    log.debug("free{}", connection);
    semaphore.release();
    break;
    }
    }
    }
    }

    假设刚开始,permits(state)为3,这时候有五个线程来获取资源。假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而Thread-0和Thread-3竞争失败,进入AQS队列park 阻塞。这时Thread-4释放了permits,接下来Thread-0竞争成功,permits再次设置为0,设置自己为head节点,断开原来的head节点,unpark接下来的Thread-3节点,但由于permits是0,因此Thread-3在尝试不成功后再次进入park状态。

CountDownLatch

CountDownLatch 在多线程并发编程中充当一个计时器的功能,并且维护一个 count 的变量,并且其操作都是原子操作,该类主要通过 countDown() 和 await() 两个方法实现功能的,首先通过建立 CountDownLatch 对象,并且传入参数即为 count 初始值。如果一个线程调用了 await() 方法,那么这个线程便进入阻塞状态,并进入阻塞队列。如果一个线程调用了 countDown() 方法,则会使 count-1;当 count 的值为 0 时,这时候阻塞队列中调用 await() 方法的线程便会逐个被唤醒,从而进入后续的操作。

Thread 对象的 join 方法可以实现相同的功能,但是特别地,当使用了线程池时,则 join() 方法便无法实现。但 CountDownLatch 依然可以实现功能。

CountDownLatch 类主要使用的场景有明显的顺序要求:比如所有英雄都加载完之后才能进图游戏等等,因此 CountDownLatch 完善的是某种逻辑上的功能,使得线程按照正确的逻辑进行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class Test {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() -> {
log.debug("start...");
sleep(1);
countDownLatch.countDown();
log.debug("end...");
}).start();
new Thread(() -> {
log.debug("start...");
sleep(2);
countDownLatch.countDown();
log.debug("end...");
}).start();
new Thread(() -> {
log.debug("start...");
sleep(3);
countDownLatch.countDown();
log.debug("end...");
}).start();
log.debug("main thread start,,,");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("main thread end,,,");
}
public static void sleep(int timeout) {
try {
Thread.sleep(timeout * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
2020-07-06 19:46:06.721 [Thread-3] DEBUG Test - start...
2020-07-06 19:46:06.721 [Thread-2] DEBUG Test - start...
2020-07-06 19:46:06.721 [main] DEBUG Test - main thread start,,,
2020-07-06 19:46:06.721 [Thread-1] DEBUG Test - start...
2020-07-06 19:46:07.726 [Thread-1] DEBUG Test - end...
2020-07-06 19:46:08.727 [Thread-2] DEBUG Test - end...
2020-07-06 19:46:09.725 [Thread-3] DEBUG Test - end...
2020-07-06 19:46:09.725 [main] DEBUG Test - main thread end,,,
**/

线程池改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CountDownLatch latch = new CountDownLatch(2);
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.submit(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
executorService.submit(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
executorService.submit(() -> {
log.debug("waiting...");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("waiting end...{}", latch.getCount());
});

应用,等玩家都加载完毕后开启游戏:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService executorService = Executors.newFixedThreadPool(10);
Random r = new Random();
String[] all = new String[10];
for (int i = 0; i < 10; i++) {
int k = i;
executorService.submit(() -> {
for (int j = 0; j <= 100; j++) {
sleep(r.nextInt(100));
all[k] = j + "%";
// 不换行替换原来位置
System.out.print("\r" + Arrays.toString(all));
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始");
executorService.shutdown();
}
/**
[100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%, 100%]
游戏开始
**/

微服务中等待多个服务器返回的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
log.debug("start");
executorService.submit(() -> {
log.debug("等待{}", method1());
latch.countDown();
});
executorService.submit(() -> {
log.debug("等待{}", method2());
latch.countDown();
});
executorService.submit(() -> {
log.debug("等待{}", method3());
latch.countDown();
});
latch.await();
System.out.println("end");
executorService.shutdown();
}
// 假设有三个方法分别从其他服务器获取数据
public static int method1() {return sleep(1);}
public static int method2() {return sleep(3);}
public static int method3() {return sleep(5);}
/**
2020-07-06 20:42:08.877 [main] DEBUG Test - start
2020-07-06 20:42:09.963 [pool-2-thread-1] DEBUG Test - 等待1
2020-07-06 20:42:11.965 [pool-2-thread-2] DEBUG Test - 等待3
2020-07-06 20:42:13.965 [pool-2-thread-3] DEBUG Test - 等待5
end
**/
// 将数据返回到主线程
public static void main(String[] args) throws InterruptedException, ExecutionException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
log.debug("start");
Future<Integer> f1 = executorService.submit(() -> {
return method1();
});
Future<Integer> f2 = executorService.submit(() -> {
return method2();
});
Future<Integer> f3 = executorService.submit(() -> {
return method3();
});
System.out.println("结果" + f1.get());
System.out.println("结果" + f2.get());
System.out.println("结果" + f3.get());
System.out.println("end");
executorService.shutdown();
}

CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个个数。构造时候设置计数个数,每个线程运行到某个需要同步的时刻,调用await进行等待,当等待线程数满足计数个数时候,继续运行。用来解决CountDownlatch不能重用的问题,比如某几个同步线程需要运行三遍。CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 初始化数量和计数个数要一致
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("中断点...");
});
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.submit(() -> {
System.out.println("线程1开始..." + new Date());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
});
executorService.submit(() -> {
System.out.println("线程2开始..." + new Date());
try {
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
});
}
/**
线程1开始...Mon Jul 06 21:13:42 CST 2020
线程2开始...Mon Jul 06 21:13:42 CST 2020
中断点...
线程2继续向下运行...Mon Jul 06 21:13:43 CST 2020
线程1继续向下运行...Mon Jul 06 21:13:43 CST 2020
线程1开始...Mon Jul 06 21:13:43 CST 2020
线程2开始...Mon Jul 06 21:13:43 CST 2020
中断点...
线程1继续向下运行...Mon Jul 06 21:13:44 CST 2020
线程2继续向下运行...Mon Jul 06 21:13:44 CST 2020
**/

线程安全集合类

  • 遗留的线程安全集合,如HashTable,Vector

  • 使用Collections装饰的线程安全集合,如

    • Collections.synchronizedCollection
    • Collections.synchronizedList
    • Collections.synchronizedMap
    • Collections.synchronizedSet
    • Collections.synchronizedNavigableMap
    • Collections.synchronizedNavigableSet
    • Collections.synchronizedSortedMap
    • Collections.synchronizedSortedSet
  • JUC

    • 它们有规律,里面包含三类关键词:Blocking、CopyOnWrite、Concurrent

    • Blocking 大部分实现基于锁,并提供用来阻塞的方法

    • CopyOnWrite 之类容器修改开销相对较重

    • Concurrent 类型的容器

      • 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
      • 弱一致性
      • 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
        求大小弱一致性,size 操作未必是 100% 准确,fast-save
      • 读取弱一致性

      遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出
      ConcurrentModificationException,不再继续遍历

ConcurrentHashMap

ConcurrentHashMap从JDK1.5开始随java.util.concurrent包一起引入JDK中,在JDK8以前,ConcurrentHashMap都是基于Segment分段锁来实现的,在JDK8以后,就换成synchronized和CAS这套实现机制了。

JDK1.8中的ConcurrentHashMap不再使用Segment分段锁,而是以table数组的头结点作为synchronized的锁。和JDK1.8中的HashMap类似,对于hashCode相同的时候,在Node节点的数量少于8个时,这时的Node存储结构是链表形式,时间复杂度为O(N),当Node节点的个数超过8个时,则会转换为红黑树,此时访问的时间复杂度为O(long(N))。旧版本的一个segment锁,保护了多个hash桶,而jdk8版本的一个锁只保护一个hash桶,由于锁的粒度变小了,写操作的并发性得到了极大的提升。

如何保证线程安全
  • 使用volatile保证当Node中的值变化时对于其他线程是可见的
  • 使用table数组的头结点作为synchronized的锁来保证写操作的安全
  • 当头结点为null时,使用CAS操作来保证数据能正确的写入。
高效扩容
  • 扩容线程增大

    扩容时,需要锁的保护。因此:旧版本最多可以同时扩容的线程数是segment锁的个数
    而jdk8的版本,理论上最多可以同时扩容的线程数是:hash桶的个数(table数组的长度)。但是为了防止扩容线程过多,ConcurrentHashMap规定了扩容线程每次最少迁移16个hash桶,因此jdk8的版本实际上最多可以同时扩容的线程数是:hash桶的个数/16

  • 扩容期间,依然保证较高的并发度

    旧版本的segment锁,锁定范围太大,导致扩容期间,写并发度,严重下降。而新版本的采用更加细粒度的hash桶级别锁,扩容期间,依然可以保证写操作的并发度。

ConcurrentHashMap的put方法是如何通过CAS确保线程安全的

假设此时有2个put线程,都发现此时桶为空,线程一执行casTabAt(tab,i,null,node1),此时tab[i]等于预期值null,因此会插入node1。随后线程二执行casTabAt(tba,i,null,node2),此时tab[i]不等于预期值null,插入失败。然后线程二会回到for循环开始处,重新获取tab[i]作为预期值,重复上述逻辑。

以上通过for循环+CAS操作,实现并发安全的方式就是无锁算法(lock free)的经典实现。

重要属性和内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash 表
transient volatile Node<K,V>[] table;
// 扩容时的 新 hash 表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}

HashMap 和 TreeMap

这里他们两个都不是线程安全的,放在这主要是和上边的并发对比一下。

  1. HashMap是通过hashcode()对其内容进行快速查找的;HashMap中的元素是没有顺序的;TreeMap中所有的元素都是有某一固定顺序的,如果需要得到一个有序的结果,就应该使用TreeMap。
  2. HashMap和TreeMap都不是线程安全的;
  3. HashMap继承AbstractMap类;覆盖了hashcode() 和equals() 方法,以确保两个相等的映射返回相同的哈希值;TreeMap继承SortedMap类;他保持键的有序顺序;
  4. HashMap:基于hash表实现的;使用HashMap要求添加的键类明确定义了hashcode() 和equals() (可以重写该方法);为了优化HashMap的空间使用,可以调优初始容量和负载因子;TreeMap:基于红黑树实现的;TreeMap就没有调优选项,因为红黑树总是处于平衡的状态;
  5. HashMap:适用于Map插入,删除,定位元素;TreeMap:适用于按自然顺序或自定义顺序遍历键(key);

Hashmap 是一个最常用的Map,它根据键的HashCode 值存储数据,根据键可以直接获取它的值,具有很快的访问速度,遍历时,取得数据的顺序是完全随机的。HashMap最多只允许一条记录的键为Null;允许多条记录的值为 Null;HashMap不支持线程的同步,即任一时刻可以有多个线程同时写HashMap;可能会导致数据的不一致。如果需要同步,可以用Collections的synchronizedMap方法使HashMap具有同步的能力,或者使用ConcurrentHashMap。

Hashtable与 HashMap类似,它继承自Dictionary类,不同的是:它不允许记录的键或者值为空;它支持线程的同步,即任一时刻只有一个线程能写Hashtable,因此也导致了 Hashtable在写入时会比较慢。

LinkedHashMap保存了记录的插入顺序,在用Iterator遍历LinkedHashMap时,先得到的记录肯定是先插入的.也可以在构造时用带参数,按照应用次数排序。在遍历的时候会比HashMap慢,不过有种情况例外,当HashMap容量很大,实际数据较少时,遍历起来可能会比LinkedHashMap慢,因为LinkedHashMap的遍历速度只和实际数据有关,和容量无关,而HashMap的遍历速度和他的容量有关。

TreeMap实现SortMap接口,能够把它保存的记录根据键排序,默认是按键值的升序排序,也可以指定排序的比较器,当用Iterator 遍历TreeMap时,得到的记录是排过序的。

synchronized 和 reentrantlock

共同点

  • 用来协调多线程对共享对象、变量的访问
  • 都是可重入锁,同一线程可以多次获得同一个锁
  • 保证了可见性和互斥性

不同点

  1. ReentrantLock 显式的获得、释放锁,synchronized 隐式获得释放锁
  2. ReentrantLock 可响应中断、可轮回,synchronized 是不可以响应中断的,为处理锁的不可用性提供了更高的灵活性
  3. ReentrantLock 是 API 级别的,synchronized 是 JVM 级别的
  4. ReentrantLock 可以实现公平锁
  5. ReentrantLock 通过 Condition 可以绑定多个条件
  6. 底层实现不一样, synchronized 是同步阻塞,使用的是悲观并发策略,lock 是同步非阻塞,采用的是乐观并发策略
  7. Lock 是一个接口,而 synchronized 是 Java 中的关键字,synchronized 是内置的语言实现。
  8. synchronized 在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而 Lock 在发生异常时,如果没有主动通过 unLock()去释放锁,则很可能造成死锁现象,因此使用 Lock 时需要在 finally 块中释放锁。
  9. Lock 可以让等待锁的线程响应中断,而 synchronized 却不行,使用 synchronized 时,等待的线程会一直等待下去,不能够响应中断。
  10. 通过 Lock 可以知道有没有成功获取锁,而 synchronized 却无法办到。
  11. Lock 可以提高多个线程进行读操作的效率,既就是实现读写锁等。