是一種滿足一種規(guī)則的類的統(tǒng)稱。它有以下特性:
· 它是一個(gè)對(duì)象。
· 封裝狀態(tài),而這些狀態(tài)決定著線程執(zhí)行到某一點(diǎn)是通過還是被迫等待
· 提供操作狀態(tài)的方法。
其實(shí)BlockingQueue就是一種Synchronizer。Java還提供了其他幾種Synchronizer。
CountDownLatch是一種閉鎖,它通過內(nèi)部一個(gè)計(jì)數(shù)器count來標(biāo)示狀態(tài)。一個(gè)線程調(diào)用await()方法之后,當(dāng)count>0時(shí),所有調(diào)用其await方法的線程都需等待,當(dāng)通過其countDown方法將count降為0時(shí)所有等待的線程將會(huì)被喚起。使用實(shí)例如下所示:
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
//開始的"門",計(jì)數(shù)是1
final CountDownLatch startGate = new CountDownLatch(1);
//結(jié)束的"門",計(jì)數(shù)是工作的線程數(shù)
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
//使得每個(gè)線程都處于等待狀態(tài),確保他們已經(jīng)處于等待狀態(tài)
startGate.await();
try {
task.run();
}finally {
//每個(gè)線程都為結(jié)束線程減1
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}
long start = System.nanoTime();
//開始運(yùn)行,每個(gè)線程都已經(jīng)準(zhǔn)備好了
startGate.countDown();
//直到每個(gè)線程都與你選哪個(gè)完畢,獲得執(zhí)行時(shí)間
endGate.await();
long end = System.nanoTime();
return end-start;
}
}
閉鎖主要是主線程等待子線程,等待的是某個(gè)事件,子線程在countDown之后就可以結(jié)束,將數(shù)量減1,所有子線程結(jié)束之后,到0,那么主線程的等待也結(jié)束了,繼續(xù)往下(和關(guān)卡的對(duì)比)。
當(dāng)然也可以實(shí)現(xiàn)類似關(guān)卡的功能,初始化一個(gè)1的內(nèi)部計(jì)數(shù)器,調(diào)用await方法讓它停下來,
另一個(gè)可以作為閉鎖的類是FutureTask,它描述了一個(gè)抽象的可攜帶結(jié)果的計(jì)算。它是通過Callable實(shí)現(xiàn)的,它等價(jià)于一個(gè)可攜帶結(jié)果的Runnable,并且有3個(gè)狀態(tài):等待、運(yùn)行和完成(包括正常結(jié)束、取消和異常)。一旦FutureTask進(jìn)入完成狀態(tài),它就永遠(yuǎn)停留在這個(gè)狀態(tài)上。
Semaphore類實(shí)際上就是操作系統(tǒng)中談到的信號(hào)量的一種實(shí)現(xiàn),其原理就不再累述,可見探索并發(fā)編程------操作系統(tǒng)篇
它就像一個(gè)容量池,用于限制可以訪問某些資源的線程數(shù)目。一般操作系統(tǒng)的進(jìn)程調(diào)度中使用了PV原語,需要設(shè)置一個(gè)信號(hào)量信號(hào)量表示可用資源的數(shù)量,P原語就相當(dāng)于acquire,V原語就相當(dāng)于release。
具體使用就是通過其acquire(獲得一個(gè)許可)和release(釋放許可)方法來完成,如以下示例:
/**
* 信號(hào)量測(cè)試
* 這里用于控制對(duì)內(nèi)容池的訪問,內(nèi)容池的大小作為Semaphore的構(gòu)造函數(shù)傳遞給它
* 每個(gè)線程獲取數(shù)據(jù)之前必須獲得許可。這樣就限制了訪問線程池的數(shù)目
*
* @author Administrator
*
*/
public class SemaphoreTes {
private static final int MAX_AVAILABLE = 5;
protected Object[] items = { "AAA", "BBB", "CCC", "DDD", "EEE" };
protected boolean[] used = new boolean[MAX_AVAILABLE];
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public static void main(String args[]) {
final SemaphoreTes pool = new SemaphoreTes();
Runnable runner = new Runnable() {
@Override
public void run() {
try {
Object o;
o = pool.getItem();
System.out.println(Thread.currentThread().getName()
+ " acquire " + o);
Thread.sleep(1000);
pool.putItem(o);
System.out.println(Thread.currentThread().getName()
+ " release " + o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++)// 構(gòu)造 10 個(gè)線程
{
Thread t = new Thread(runner, "t" + i);
t.start();
}
}
// 獲取數(shù)據(jù),需要得到許可
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
// 放回?cái)?shù)據(jù),釋放許可
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
輸出:
t0 acquire AAA
t1 acquire BBB
t2 acquire CCC
t3 acquire DDD
t4 acquire EEE
t0 release AAA
t1 release BBB
t2 release CCC
t6 acquire AAA
t7 acquire BBB
t5 acquire CCC
t8 acquire DDD
t3 release DDD
t9 acquire EEE
t4 release EEE
t6 release AAA
t7 release BBB
t8 release DDD
t5 release CCC
t9 release EEE
信號(hào)量被初始化為容器鎖期望容量的最大值,add操作在向底層容器添加條目之前,需要先獲得一個(gè)許可,并且如果沒有加入任何東西,則立刻釋放許可。同樣,一個(gè)成功過的remove操作釋放ige許可,使得更多的元素能夠加入其中。
將信號(hào)量初始化為1,使得它在使用時(shí)最多只有一個(gè)可用的許可,從而可用作一個(gè)相互排斥的鎖。這通常也成為二進(jìn)制信號(hào)量,因?yàn)樗挥袃煞N狀態(tài):一個(gè)可用的許可,0個(gè)可用的許可。按此方式使用時(shí),二進(jìn)制信號(hào)量具有某種屬性(與很多Lock實(shí)現(xiàn)不同),即可以由線程釋放“鎖”,控制”鎖“,而不是由所有者(因?yàn)樾盘?hào)量沒有權(quán)的概念)。在某些專門的上下文(如死鎖恢復(fù))中會(huì)很有用。
在實(shí)際應(yīng)用中,有時(shí)候需要多個(gè)線程同時(shí)工作以完成同一件事情,而且在完成過程中,往往會(huì)等所欲線程都到達(dá)某一個(gè)階段以后再統(tǒng)一執(zhí)行。
比如有幾個(gè)旅行團(tuán)需要途徑深圳、廣州、最后到達(dá)重慶。旅行團(tuán)中自駕游的,有徒步的,有乘坐旅游大巴的;這些旅行團(tuán)同時(shí)出發(fā),并且每到一個(gè)目的地,都要的古代其他旅行團(tuán)到達(dá)此地以后再同時(shí)出發(fā),直到都到達(dá)終點(diǎn)站重慶。這種情況下,CyclicBarrier就可以用了。
它是一個(gè)同步輔助類,它允許一組線程相互等待,直到到達(dá)某個(gè)公共屏障點(diǎn)(common barrier point)。因?yàn)樵?/font>barrier在釋放等待線程以后可以重用,所以稱為循環(huán)的barrier。CyclicBarrier最重要的屬性就是參與者個(gè)數(shù),另一個(gè)重要方法就是await()。當(dāng)所有線程都調(diào)用了await()后,就表示這些線程都可以繼續(xù)執(zhí)行,否則繼續(xù)等待。
關(guān)卡和閉鎖類似,也是阻塞一組線程,直到某件事情發(fā)生,而不同在于關(guān)卡是等到符合某種條件的所有線程都達(dá)到關(guān)卡點(diǎn)。具體使用上可以用CyclicBarrier來應(yīng)用關(guān)卡。
private int threadCount;
private CyclicBarrier barrier;
private int loopCount = 10;
public CyclicBarrierTest(int threadCount) {
this.threadCount = threadCount;
barrier = new CyclicBarrier(threadCount, new Runnable() {
public void run() {
System.out.println("---------------");
}
});
for (int i = 0; i < threadCount; ++i) {
Thread thread = new Thread("test-thread " + i) {
public void run() {
for (int j = 0; j < loopCount; ++j) {
doTest(j);
try {
/**
* 在到達(dá)10次以后,大家都會(huì)通過
* 在這里通過之前,會(huì)執(zhí)行barrier的那個(gè)回調(diào)方法
* 而且他還可以被循環(huán)使用
*/
barrier.await();
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
System.out.println("goon"+j);
}
}
};
thread.start();
}
}
private void doTest(int i) { /* do xxx */
System.out.println("test"+i);
}
public static void main(String args[]){
new CyclicBarrierTest(9);
}
在每個(gè)線程結(jié)束之后執(zhí)行,可以統(tǒng)計(jì)用。
和閉鎖不同的是,他要求子線程之間相互等待,直到大家都達(dá)到后,才能繼續(xù)。
淘寶面試題:如何充分利用多核CPU,計(jì)算很大的List中所有整數(shù)的和
java多線程學(xué)習(xí)-java.util.concurrent詳解(一) Latch/Barrier
提供了一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),一對(duì)線程可以交換數(shù)據(jù)。每個(gè)線程通過exchange()方法的入口提供數(shù)據(jù)給他的伙伴線程,并接收伙伴線程的數(shù)據(jù),并返回。
當(dāng)運(yùn)行不對(duì)稱的活動(dòng)時(shí)很有用,比如當(dāng)一個(gè)線程填充buffer,另一個(gè)線程buffer中消費(fèi)數(shù)據(jù);這些線程可以用exchanger來交換數(shù)據(jù)。當(dāng)兩個(gè)線程通過exchanger交互了對(duì)象,這個(gè)交換對(duì)于連個(gè)線程來說都是安全的。所以在特定的使用場(chǎng)景比較有用(兩個(gè)伙伴線程之間的數(shù)據(jù)交互)
注意:
1.初始化Exchanger對(duì)象時(shí),可以通過泛型指定杯子能交換的信息類型。如“new Exchanger<String>;” 表示只能交換String類型的信息。
2. Exchanger的exchanger方法表示當(dāng)前線程準(zhǔn)備交換信息,等待其他線程與它交換信息。當(dāng)有其他線程調(diào)用該Exchanger對(duì)象的exchange方法時(shí),立即交換信息。
// 描述一個(gè)裝水的杯子
public static class Cup{
// 標(biāo)識(shí)杯子是否有水
private boolean full = false;
public Cup(boolean full){
this.full = full;
}
// 添水,假設(shè)需要5s
public void addWater(){
if (!this.full){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
this.full = true;
}
}
// 喝水,假設(shè)需要10s
public void drinkWater(){
if (this.full){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
this.full = false;
}
}
}
public static void testExchanger() {
// 初始化一個(gè)Exchanger,并規(guī)定可交換的信息類型是杯子
final Exchanger<Cup> exchanger = new Exchanger<Cup>();
// 初始化一個(gè)空的杯子和裝滿水的杯子
final Cup initialEmptyCup = new Cup(false);
final Cup initialFullCup = new Cup(true);
//服務(wù)生線程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
int i=0;
while (i < 2){
System.out.println("服務(wù)生開始往杯子中添水:"
+ System.currentTimeMillis());
// 往空的杯子里加水
currentCup.addWater();
System.out.println("服務(wù)生添水完畢:"
+ System.currentTimeMillis());
// 杯子滿后和顧客的空杯子交換
System.out.println("服務(wù)生等待與顧客交換杯子:"
+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("服務(wù)生與顧客交換杯子完畢:"
+ System.currentTimeMillis());
i++;
}
} catch (InterruptedException ex) {
}
}
}
//顧客線程
class Customer implements Runnable {
public void run() {
Cup currentCup = initialFullCup;
try {
int i=0;
while (i < 2){
System.out.println("顧客開始喝水:"
+ System.currentTimeMillis());
//把杯子里的水喝掉
currentCup.drinkWater();
System.out.println("顧客喝水完畢:"
+ System.currentTimeMillis());
//將空杯子和服務(wù)生的滿杯子交換
System.out.println("顧客等待與服務(wù)生交換杯子:"
+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("顧客與服務(wù)生交換杯子完畢:"
+ System.currentTimeMillis());
i++;
}
} catch (InterruptedException ex) {
}
}
}
new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}
public static void main(String[] args) {
ExchangerTest.testExchanger();
}
服務(wù)生開始往杯子中添水:1288093204390
顧客開始喝水:1288093204406
服務(wù)生添水完畢:1288093209390
服務(wù)生等待與顧客交換杯子:1288093209390
顧客喝水完畢:1288093214406
顧客等待與服務(wù)生交換杯子:1288093214406
服務(wù)生與顧客交換杯子完畢:1288093214406
服務(wù)生開始往杯子中添水:1288093214406
顧客與服務(wù)生交換杯子完畢:1288093214406
① Future接口標(biāo)識(shí)異步計(jì)算的結(jié)果,它提供了檢查計(jì)算是否完成的方法isDone(),以等待計(jì)算的完成,并取得計(jì)算的結(jié)果。
② 計(jì)算完成以后只能使用get獲得計(jì)算結(jié)果
③ 有必要,可以在完成前阻塞此方法
④ 取消,則以cancel方法來執(zhí)行
FutureTask類是Future的一個(gè)實(shí)現(xiàn),并且實(shí)現(xiàn)了Runnable,所以可以通過Executor(線程池)來執(zhí)行。
如過在主線程中需要執(zhí)行比較耗時(shí)的操作,但又不想阻塞主線程時(shí),可以把這些業(yè)務(wù)交給Future對(duì)象在后臺(tái)完成,當(dāng)主線程將來需要時(shí),可以通Future對(duì)象獲得后臺(tái)作業(yè)的計(jì)算結(jié)果或者執(zhí)行狀態(tài)。
個(gè)人的感覺來看,Future和Runnable和
聯(lián)系客服