文章转自: https://blog.csdn.net/s751167520/article/details/107512281

概述

在实际业务场景中,经常会遇到多个线程调用不同的子系统,某些子系统之间的调用需要通过顺序性来保证数据的一致性。也是一个比较经典的多线程面试题。

有这样一个面试题:如何保证T1,T2,T3三个线程顺序执行。

使用Join

join的作用

Thread类中的join方法的主要作用就是同步,它可以使得线程之间的并行执行变为串行执行。当我们调用某个线程的这个方法时,这个方法会挂起调用线程,直到被调用线程结束执行,调用线程才会继续执行。

join源代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public final synchronized void join(long millis)
throws InterruptedException {
// 获取当前基础四件
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

由此可以看到,join的实现原理是通过 isAlive()方法判断线程是否存活。

  • 如果join方法参数为0,则会调用isAlive()方法,检测线程是否存活,如果存活就调用wait方法,一直阻塞。
  • 如果参数为负数,则直接报错:”timeout value is negative”
  • 如果参数大于0,使用while判断线程是否存活,存活的话就一直判断当前线程执行的时间并且计算还需要等待的时间,如果等待时间小于等于0就跳出循环,否则就继续wait

join和start的优先级

join方法必须在线程start方法调用之后调用才有意义。如果一个线程没有start,那它也就无法同步。这是由于只有执行完start方法才会创建线程。join才会有意义。

使用join实现同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RedisDemoTest {

public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
ThreadJoinFunction t1 = new ThreadJoinFunction("系统A调用开始");
ThreadJoinFunction t2 = new ThreadJoinFunction("系统B调用开始");
ThreadJoinFunction t3 = new ThreadJoinFunction("系统C调用开始");

t1.start();
t1.join();
t2.start();
t2.join();
t3.start();
t3.join();
}

}

class ThreadJoinFunction extends Thread{
public ThreadJoinFunction(String name){
super(name);
}
@Override
public void run(){
System.out.println(this.getName());
}
}

// 执行结果
// 系统A调用开始
// 系统B调用开始
// 系统C调用开始
// Process finished with exit code 0

使用CountDownLatch

CountDownLatch的作用

  • 某一线程在开始运行前等待n个线程执行完毕。
  • 实现多个线程开始执行任务的最大并行性。

CountDownLatch源代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

//设置初始计数
Sync(int count) {
setState(count);
}

//获取当前计数
int getCount() {
return getState();
}

//这里重写了AQS的tryAcquireShared方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//如果count-1,如果count变为0,则唤醒所有
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前的状态
int c = getState();
if (c == 0)
return false;
int nextc = c-1;

//CAS思想比较实际值和预期值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final Sync sync;

//count表示线程通过await前必须要执行的次数,count不能小于0,小于0会抛异常
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

让当前线程等待直到count减数为0,除非线程被中断。如果count为0,线程将立即返回--不再阻塞等待。
如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下两种情况之一:
1.countDown方法调用导致count减数为0
2.别的线程中断了当前线程
线程等待时,如果被中断将会抛出InterruptedException异常
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

使当前线程处理等待状态直到count减为0或者指定等待时间过去。
如果当前count是0,则线程立即返回true
如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下三种情况之一:
1.countDown方法调用导致count减数为0
2.别的线程中断了当前线程
3.指定等待时间过去
如果等待时间过去但是count>0,则返回false。如果等待时间时间小于或等于零,方法将不会等待。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
sync.releaseShared(1);
}

public long getCount() {
return sync.getCount();
}

public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}

使用CountDownLatch实现同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class RedisDemoTest {

public static class CountDownLatchTest {
CountDownLatch latch = new CountDownLatch(2);

public static void main(String[] args) {
CountDownLatchTest o = new CountDownLatchTest();
T1 t1 = o.new T1("系统A调用开始");
T1 t2 = o.new T1("系统B调用开始");
T2 t3 = o.new T2("系统C调用开始");

t1.start();
t3.start();
t2.start();
}

class T1 extends Thread{
public T1(String name) {
super(name);
}

@Override
public void run() {
System.out.println(this.getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
}
}

class T2 extends Thread{
public T2(String name) {
super(name);
}

@Override
public void run() {
try {
latch.await(); //等待2个T1线程执行完(阻塞直到计数为0)
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.getName());
}
}
}
}


// 执行结果
// 系统A调用开始
// 系统B调用开始
// 系统C调用开始
// Process finished with exit code 0

使用Cyclicbarrier

Cyclicbarrier的作用

  • 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
  • 所有的线程释放彼此之后,这个屏障是可以重新使用的(reset()方法重置屏障点)。这一点与CountDownLatch不同
  • 让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。

CyclicBarrier源代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//创建CyclicBarrier后,每个线程调用await方法告诉CyclicBarrier自己已经到达同步点,然后当前线程被阻塞。
public class CyclicBarrier {

private static class Generation {
boolean broken = false;
}

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;

private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}



- 使用ReentrantLock保证每一次操作线程安全;
- 线程等待/唤醒使用Lock配合Condition来实现;
- 线程被唤醒的条件:等待超时或者所有线程都到达barrier。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//当所有线程都到达同步点(barrier)时,唤醒所有的等待线程,一起往下继续运行,可根据参数barrierAction决定优先执行的线程。
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

//线程未到达同步点(barrier)时,线程进入Condition自旋等待,直到等待超时或者所有线程都到达barrier时被唤醒。
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

//由于线程之前的调度是由CPU决定的,默认的构造方法无法设置线程执行优先级,CyclicBarrier提供一个更高级的构造函数CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达同步点时,优先执行线程barrierAction,这样可以更加方便的处理一些负责的业务场景。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

//默认构造方法,参数表示拦截的线程数量。
public CyclicBarrier(int parties) {
this(parties, null);
}

public int getParties() {
return parties;
}

//默认的await实现
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

//带超时时间的await实现,核心方法均为dowait()
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}

使用CyclicBarrier实现同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class RedisDemoTest {
public static void main(String[] args) {
final CyclicBarrier c1 = new CyclicBarrier(2);
final CyclicBarrier c2 = new CyclicBarrier(2);
final CyclicBarrier c3 = new CyclicBarrier(2);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("系统A调用开始");
c1.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}
});

Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
c1.await();
System.out.println("系统B调用开始");
c2.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});

Thread t3 = new Thread(new Runnable() {
@Override
public void run() {
try {
c2.await();
System.out.println("系统C调用开始");
c3.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});

t1.start();
t2.start();
t3.start();
}
}

// 系统A调用开始
// 系统B调用开始
// 系统C调用开始

Process finished with exit code 137 (interrupted by signal 9: SIGKILL)

CachedThreadPool

FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提空 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。
一个FutureTask 可以用来包装一个 Callable 或是一个runnable对象。因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit)给一个Excutor执行(excution)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 public class ThreadTest3 {
// T1、T2、T3三个线程顺序执行
public static void main(String[] args) {
FutureTask<Integer> future1= new FutureTask<Integer>(new Work(null));
Thread t1 = new Thread(future1);

FutureTask<Integer> future2= new FutureTask<Integer>(new Work(future1));
Thread t2 = new Thread(future2);

FutureTask<Integer> future3= new FutureTask<Integer>(new Work(future2));
Thread t3 = new Thread(future3);

t1.start();
t2.start();
t3.start();
}

static class Work implements Callable<Integer> {
private FutureTask<Integer> beforeFutureTask;
public Work(FutureTask<Integer> beforeFutureTask) {
this.beforeFutureTask = beforeFutureTask;
}
public Integer call() throws Exception {
if (beforeFutureTask != null) {
Integer result = beforeFutureTask.get();//阻塞等待
System.out.println("thread start:" + Thread.currentThread().getName());
} else {
System.out.println("thread start:" + Thread.currentThread().getName());
}
return 0;
}
}
}

使用blockingQueue

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ThreadTest4 {
// T1、T2、T3三个线程顺序执行
public static void main(String[] args) {
//blockingQueue保证顺序
BlockingQueue<Thread> blockingQueue = new LinkedBlockingQueue<Thread>();
Thread t1 = new Thread(new Work());
Thread t2 = new Thread(new Work());
Thread t3 = new Thread(new Work());

blockingQueue.add(t1);
blockingQueue.add(t2);
blockingQueue.add(t3);

for (int i=0;i<3;i++) {
Thread t = null;
try {
t = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
t.start();
//检测线程是否还活着
while (t.isAlive());
}
}

static class Work implements Runnable {

public void run() {
System.out.println("thread start:" + Thread.currentThread().getName());
}
}
}

使用单个线程池

newSingleThreadExecutor返回以个包含单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。

这个其实是伪多线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ThreadTest5 {

public static void main(String[] args) throws InterruptedException {
final Thread t1 = new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " run 1");
}
}, "T1");
final Thread t2 = new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " run 2");
try {
t1.join(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "T2");
final Thread t3 = new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " run 3");
try {
t2.join(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "T3");

//使用 单个任务的线程池来实现。保证线程的依次执行
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(t1);
executor.submit(t2);
executor.submit(t3);
executor.shutdown();
}
}

其他方法
信号量
//todo

wait
//todo

线程池的single
//todo

使用场景对比
使用场景不同
  CyclicBarrier适用于一组线程之间的相互等待,而CountDownLatch、join适用于某线程或某组线程等待另一组线程的场景。
  CountDownLatch两个方法配合使用也可以实现CyclicBarrier的功能的,即在线程内调用countDown() 和 await()
实现方式不同
  CountDownLatch是通过AQS共享锁实现的
  CyclicBarrier核心是通过ReentranLock非公平锁(独占锁)实现的
  join则是利用自旋Object.wait()实现。
使用规则不同
  CountDownLatch的计数器无法被重置;
  CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。