文章转自:https://juejin.im/post/6844903635751534606

概述

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。Semaphore可以用作流量控制,特别是公共资源有限的应用场景,比如数据库的连接。

Semaphore主要用于管理信号量,同样在创建Semaphore对象实例的时候通过传入构造参数设定可供管理的信号量的数值。

简单说,信号量管理的信号就好比令牌,构造时传入令牌数量,也就是Semaphore控制并发的数量。

线程在执行并发的代码前要先获取信号(通过aquire函数获取信号许可),执行后归还信号(通过release方法归还),每次acquire信号成功后,Semaphore可用的信号量就会减一,同样release成功之后,Semaphore可用信号量的数目会加一,如果信号量的数量减为0,acquire调用就会阻塞,直到release调用释放信号后,aquire才会获得信号返回。

使用方法

一个应用程序要读取几万个文件的数据,由于都是IO密集型任务,所以可以启动几十个(例如10)个线程并发地读取。读取到内存中后还要将数据存储到数据库,但是数据库的连接只有3个。此时就必须设置策略控制只有10个线程同时获取数据库连接并保存数据,否则会报错,无法连接到数据库,这时Semaphore就派上用场了,用它来做流量控制,即连接到数据库的线程数。

一个例子:一个应用程序要读取几万个文件的数据,由于都是IO密集型任务,所以可以启动几十个(例如10)个线程并发地读取。读取到内存中后还要将数据存储到数据库,但是数据库的连接只有3个。此时就必须设置策略控制只有10个线程同时获取数据库连接并保存数据,否则会报错,无法连接到数据库,这时Semaphore就派上用场了,用它来做流量控制,即连接到数据库的线程数。

本例中有限的公共资源是数据库连接。Java实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {

/* 读取文件数据的线程数量,并创建此容量的线程池 */
private static final int THREAD_COUNT = 10;
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);

/* 创建Semaphore对象实例,构造函数的参数指定信号量的数目,为了方便说明问题,设为3 */
private static Semaphore semaphore = new Semaphore(3);

public static void main(String[] args){

/* 创建线程读取数据,并尝试获取数据库连接,将数据存储到数据库中 */
for(int i = 0;i < THREAD_COUNT;i++){
final int index = i;

Runnable task = new Runnable() {
public void run() {
try {
/*从远程读数据*/
System.out.println("thread-"+ index + " is reading data from remote host");

/* 通过acquire 函数获取数据库连接,如果成功将数据存储到数据库 */
semaphore.acquire();
System.out.println("thread-"+ index + " is saving data....");
/*模拟存储数据耗时*/
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}finally {

/* 最终使用release 函数释放信号量 */
semaphore.release();
}
}
};
executorService.execute(task);
}
}

}