这点不谈异步,会放在另一节讲。
1) 基础但实际可能不会常用:
synchronized(obj) {
while ( x > 1){
try {
obj.wait()
} catch (InterruptedException e) {}
obj.notifyAll();
}
}
sychronized block on individual attirbutes = Lock for each attribute
Lock.lock()/unlock() => lock.writeLock() = ReentrantReadWriteLock
ReentrantLock.newCondition() => condition1.await() / .signalAll()
2) 高级的常用的
new Semaphore(number) .acquire()/.release() =》 number 为1则退化为mutex,
number 大于1 则为资源池的mutex, 编程模型如下
try {
semaphore.acquire()
int index = getFree()
free[index] = true
} catch (InterruptedException e) {}
finally { semaphore.release() }
注意getFree()本身需要一个Lock, pool.lokc()/.unlock()
getFree() {
for(int i =0 ; i
ret = i; free[i] = false; // 占用
}
}
等待多个并发事件发生完,模型如下
每个参与者到达:
new CountDownLatch(number)
public void arrive(String name){
countdownLatch.countDown()
}
主线程等待者:
wait for all the participants
try { countdownLatch.await() }
catch (InterruptedException e) {}
相似的还有一个 new CyclicBarrier(Participants_Count, Runnable() ), 跟 CountdownLatch差不多,只是可以重置,传一个回调在集合点运行
每个参与者 调用 barrier.await()
多阶段任务同步可以用 new Phaser(Participants_Count) , 也就说要有Participants_Count 这么多个线程要在某个阶段结束前调用arriveAndAwaitAdvance()
【 The Phaser class provides us with the mechanism to synchronize the threads at the end of each step, so no thread starts its second step until all the threads have finished the first one. 】
每个参与者线程的主代码如下,基本上每上步都要等其它的参与者完成前一步
注意第一次调用只是让各个参与者注册,只有全部注册完毕,第一阶段才会统一开始 【won't begin until all the participant threads have been created.】
phaser.arriveAndAwaitAdvance()
真正开始
phase1()
phaser.arriveAndAwaitAdvance()
phase2()
phaser.arriveAndAwaitAdvance()
............
finalPhase()
phaser.arriveAndDeregister()
【Registering participants in the Phaser】
When you create a Phaser object, you indicate how many participants will have that phaser. But the Phaser class has two methods to increment the number of participants of a phaser. These methods are as follows:
f register(): This method adds a new participant to Phaser. This new participant will be considered as unarrived to the actual phase.
f bulkRegister(int Parties): This method adds the specified number of participants to the phaser. These new participants will be considered as unarrived to the actual phase.
The only method provided by the Phaser class to decrement the number of participants is the arriveAndDeregister() method that notifies the phaser that the thread has finished the actual phase, and it doesn't want to continue with the phased operation.
【Forcing the termination of a Phaser】
When a phaser has zero participants, it enters a state denoted by Termination. The Phaser class provides forceTermination() to change the status of the phaser and makes it enter in the Termination state independently of the number of participants registered in the phaser. This mechanism may be useful when one of the participants has an error situation, to force the termination of the phaser.
When a phaser is in the Termination state, the awaitAdvance() and arriveAndAwaitAdvance() methods immediately return a negative number, instead of a positive one that returns normally. If you know that your phaser could be terminated, you should verify the return value of those methods to know if the phaser has been terminated.
注意 Phaser有一个回调函数 onAdvance(), 可以在前进的每一阶段调用。比哪说有3个阶段,每个阶段打印一些信息:
public class MyPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
return studentsArrived();
case 1:
return finishFirstExercise();
case 2:
return finishSecondExercise();
case 3:
return finishExam();
default:
return true;
}
}
最后一个,在同步点的参与者线程间交换数据
Exchanger> exchanger=new Exchanger<>();
参与者线程调用 exchanger.exchange()
阅读(1241) | 评论(0) | 转发(0) |