文章转自: https://blog.csdn.net/s751167520/article/details/107512281
概述 在实际业务场景中,经常会遇到多个线程调用不同的子系统,某些子系统之间的调用需要通过顺序性来保证数据的一致性。也是一个比较经典的多线程 面试题。
有这样一个面试题:如何保证T1,T2,T3三个线程顺序执行。
使用Join join的作用 Thread类中的join方法的主要作用就是同步,它可以使得线程之间的并行执行变为串行执行。当我们调用某个线程的这个方法时,这个方法会挂起调用线程,直到被调用线程结束执行,调用线程才会继续执行。
join源代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final synchronized void join (long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0 ; if (millis < 0 ) { throw new IllegalArgumentException ("timeout value is negative" ); } if (millis == 0 ) { while (isAlive()) { wait(0 ); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0 ) { break ; } wait(delay); now = System.currentTimeMillis() - base; } } }
由此可以看到,join的实现原理是通过 isAlive()方法判断线程是否存活。
如果join方法参数为0,则会调用isAlive()方法,检测线程是否存活,如果存活就调用wait方法,一直阻塞。
如果参数为负数,则直接报错:”timeout value is negative”
如果参数大于0,使用while判断线程是否存活,存活的话就一直判断当前线程执行的时间并且计算还需要等待的时间,如果等待时间小于等于0就跳出循环,否则就继续wait
join和start的优先级 join方法必须在线程start方法调用之后调用才有意义。如果一个线程没有start,那它也就无法同步。这是由于只有执行完start方法才会创建线程。join才会有意义。
使用join实现同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class RedisDemoTest { public static void main (String[] args) throws InterruptedException { ThreadJoinFunction t1 = new ThreadJoinFunction ("系统A调用开始" ); ThreadJoinFunction t2 = new ThreadJoinFunction ("系统B调用开始" ); ThreadJoinFunction t3 = new ThreadJoinFunction ("系统C调用开始" ); t1.start(); t1.join(); t2.start(); t2.join(); t3.start(); t3.join(); } } class ThreadJoinFunction extends Thread { public ThreadJoinFunction (String name) { super (name); } @Override public void run () { System.out.println(this .getName()); } }
使用CountDownLatch CountDownLatch的作用
某一线程在开始运行前等待n个线程执行完毕。
实现多个线程开始执行任务的最大并行性。
CountDownLatch源代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc == 0 ; } } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); } 让当前线程等待直到count减数为0 ,除非线程被中断。如果count为0 ,线程将立即返回--不再阻塞等待。 如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下两种情况之一: 1. countDown方法调用导致count减数为0 ; 2. 别的线程中断了当前线程 线程等待时,如果被中断将会抛出InterruptedException异常 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } 使当前线程处理等待状态直到count减为0 或者指定等待时间过去。 如果当前count是0 ,则线程立即返回true 。 如果当前计数大于零,则出于线程调度目的,当前线程将禁用,并处于休眠状态,直到发生以下三种情况之一: 1. countDown方法调用导致count减数为0 ; 2. 别的线程中断了当前线程 3. 指定等待时间过去 如果等待时间过去但是count>0 ,则返回false 。如果等待时间时间小于或等于零,方法将不会等待。 public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } public void countDown () { sync.releaseShared(1 ); } public long getCount () { return sync.getCount(); } public String toString () { return super .toString() + "[Count = " + sync.getCount() + "]" ; } }
使用CountDownLatch实现同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class RedisDemoTest { public static class CountDownLatchTest { CountDownLatch latch = new CountDownLatch (2 ); public static void main (String[] args) { CountDownLatchTest o = new CountDownLatchTest (); T1 t1 = o.new T1 ("系统A调用开始" ); T1 t2 = o.new T1 ("系统B调用开始" ); T2 t3 = o.new T2 ("系统C调用开始" ); t1.start(); t3.start(); t2.start(); } class T1 extends Thread { public T1 (String name) { super (name); } @Override public void run () { System.out.println(this .getName()); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } latch.countDown(); } } class T2 extends Thread { public T2 (String name) { super (name); } @Override public void run () { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this .getName()); } } } }
使用Cyclicbarrier Cyclicbarrier的作用
让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
所有的线程释放彼此之后,这个屏障是可以重新使用的(reset()方法重置屏障点)。这一点与CountDownLatch不同
让一组线程到达一个同步点后再一起继续运行,在其中任意一个线程未达到同步点,其他到达的线程均会被阻塞。
CyclicBarrier源代码分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 public class CyclicBarrier { private static class Generation { boolean broken = false ; } private final ReentrantLock lock = new ReentrantLock (); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation (); private int count; private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation (); } private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); } - 使用ReentrantLock保证每一次操作线程安全; - 线程等待/唤醒使用Lock配合Condition来实现; - 线程被唤醒的条件:等待超时或者所有线程都到达barrier。 private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException (); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException (); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException (); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException (); } } } finally { lock.unlock(); } } public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException (); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public CyclicBarrier (int parties) { this (parties, null ); } public int getParties () { return parties; } public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error (toe); } } public int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true , unit.toNanos(timeout)); } public boolean isBroken () { final ReentrantLock lock = this .lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } public void reset () { final ReentrantLock lock = this .lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } public int getNumberWaiting () { final ReentrantLock lock = this .lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
使用CyclicBarrier实现同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 public class RedisDemoTest { public static void main (String[] args) { final CyclicBarrier c1 = new CyclicBarrier (2 ); final CyclicBarrier c2 = new CyclicBarrier (2 ); final CyclicBarrier c3 = new CyclicBarrier (2 ); Thread t1 = new Thread (new Runnable () { @Override public void run () { try { System.out.println("系统A调用开始" ); c1.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); Thread t2 = new Thread (new Runnable () { @Override public void run () { try { c1.await(); System.out.println("系统B调用开始" ); c2.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); Thread t3 = new Thread (new Runnable () { @Override public void run () { try { c2.await(); System.out.println("系统C调用开始" ); c3.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); t1.start(); t2.start(); t3.start(); } } Process finished with exit code 137 (interrupted by signal 9 : SIGKILL)
CachedThreadPool FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提空 start cancel 操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。 一个FutureTask 可以用来包装一个 Callable 或是一个runnable对象。因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit)给一个Excutor执行(excution)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ThreadTest3 { public static void main (String[] args) { FutureTask<Integer> future1= new FutureTask <Integer>(new Work (null )); Thread t1 = new Thread (future1); FutureTask<Integer> future2= new FutureTask <Integer>(new Work (future1)); Thread t2 = new Thread (future2); FutureTask<Integer> future3= new FutureTask <Integer>(new Work (future2)); Thread t3 = new Thread (future3); t1.start(); t2.start(); t3.start(); } static class Work implements Callable <Integer> { private FutureTask<Integer> beforeFutureTask; public Work (FutureTask<Integer> beforeFutureTask) { this .beforeFutureTask = beforeFutureTask; } public Integer call () throws Exception { if (beforeFutureTask != null ) { Integer result = beforeFutureTask.get(); System.out.println("thread start:" + Thread.currentThread().getName()); } else { System.out.println("thread start:" + Thread.currentThread().getName()); } return 0 ; } } }
使用blockingQueue 阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class ThreadTest4 {public static void main (String[] args) { BlockingQueue<Thread> blockingQueue = new LinkedBlockingQueue <Thread>(); Thread t1 = new Thread (new Work ()); Thread t2 = new Thread (new Work ()); Thread t3 = new Thread (new Work ()); blockingQueue.add(t1); blockingQueue.add(t2); blockingQueue.add(t3); for (int i=0 ;i<3 ;i++) { Thread t = null ; try { t = blockingQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } t.start(); while (t.isAlive()); } } static class Work implements Runnable { public void run () { System.out.println("thread start:" + Thread.currentThread().getName()); } } }
使用单个线程池 newSingleThreadExecutor返回以个包含单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务,若该线程出现异常,将会有一个新的线程来替代。
这个其实是伪多线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class ThreadTest5 { public static void main (String[] args) throws InterruptedException { final Thread t1 = new Thread (new Runnable () { public void run () { System.out.println(Thread.currentThread().getName() + " run 1" ); } }, "T1" ); final Thread t2 = new Thread (new Runnable () { public void run () { System.out.println(Thread.currentThread().getName() + " run 2" ); try { t1.join(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } }, "T2" ); final Thread t3 = new Thread (new Runnable () { public void run () { System.out.println(Thread.currentThread().getName() + " run 3" ); try { t2.join(10 ); } catch (InterruptedException e) { e.printStackTrace(); } } }, "T3" ); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(t1); executor.submit(t2); executor.submit(t3); executor.shutdown(); } }
其他方法 信号量 //todo
wait //todo
线程池的single //todo
使用场景对比 使用场景不同 CyclicBarrier适用于一组线程之间的相互等待,而CountDownLatch、join适用于某线程或某组线程等待另一组线程的场景。 CountDownLatch两个方法配合使用也可以实现CyclicBarrier的功能的,即在线程内调用countDown() 和 await() 实现方式不同 CountDownLatch是通过AQS共享锁实现的 CyclicBarrier核心是通过ReentranLock非公平锁(独占锁)实现的 join则是利用自旋Object.wait()实现。 使用规则不同 CountDownLatch的计数器无法被重置; CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。