在多线程交互的中2,经常有一个线程需要得到另个一线程的计算结果,我们常用的是Future异步模式来加以解决。
Future顾名思意,有点像期货市场的“期权”,是“对未来的一种凭证”,例如当我们买了某个房地产开发商的期房,交钱之后,开发商会给我们一个凭证
(期权),这个凭证告诉我们等明年某个时候拿这个凭证就可以拿到我们所需要的房子,但是现在房子还没建好。市场上之所以有“期货”,也正由于有这种需求,
才有这种供给。
这种应用在GUI上用的比较多,在设计模式中一般称为“虚拟代理模式”。
例如:现在有个这样的需求,Client向Server提交一个Request(int count,char
c),希望获取一个由count个字符c构造出来的字符串。比如发送Request(10,'K'),那么反馈字符串“KKKKKKKKKK”,但是我们
假设这个生成字符串的过程很费时间。
于是,为了获取比较好的交互性,我们的Server收到请求后,先构造一个FutureData,并把这个所谓的“期权(未来凭证)”反馈给
Client;于此同时,通过另一个并发线程去构造一个真正的字符串RealData,并在构造完毕后,RealData给FutureData报告一个
消息,说数据(期房)已经准备好了,此时Client可以通过期权拿到期房,但是假如我们的Client比较着急,还没等房子假好的时,就想要房子,怎么
办呢?这个时候我们可以阻塞Client所在的线程,让Client等待,直到最后RealData通知FutureData说房子好了,才返回。
这里的要点:
(1)Server先给Client一个“期权”,同时开一个线程去干活建房子(未来的“现房”);
(2)当“现房”RealData准备好了的时候,如何告诉FutureData说已经准备好了。(本处采用“回调过程”(借用观察者模式,来实现回调))
(3)如果客户比较着急,现房还没准备好的时候,就要取房,怎么办? 本处采用“阻塞”。
Data(公共数据接口)
- package com.umpay.future;
-
- public interface Data {
- public abstract String getContent();
- }
package com.umpay.future;
public interface Data {
public abstract String getContent();
}
FutureData(期权)
- package com.umpay.future.extend;
-
- import java.util.Observable;
- import java.util.Observer;
-
- import com.umpay.future.Data;
-
- public class FutureData2 implements Data,Observer {
-
-
-
-
-
-
- private volatile RealData2 realData2 = null;
-
-
-
- public boolean isFinished() {
- return realData2 != null;
- }
-
-
-
-
-
- public String getContent() {
- synchronized (mutex) {
- while(!isFinished()) {
- try {
- mutex.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return realData2.getContent();
- }
- }
-
-
-
-
-
-
-
-
- public void update(Observable realData, Object event) {
- System.out.println("通知...."+event);
- if(!(realData instanceof RealData2)) {
- throw new IllegalArgumentException("主题的数据类型必须是RealData2");
- }
- if(!(event instanceof String)) {
- throw new IllegalArgumentException("事件的数据类型必须是String");
- }
- synchronized (mutex) {
- if(isFinished()) {
- mutex.notifyAll();
- return;
- }
- if("Finished".equals(event)) {
- realData2 = (RealData2)realData;
- mutex.notifyAll();
- }
- }
- }
-
- private Object mutex = new Object();
- }
package com.umpay.future.extend;
import java.util.Observable;
import java.util.Observer;
import com.umpay.future.Data;
public class FutureData2 implements Data,Observer {
/**
* 存放真实数据,并且标志真正的数据是否已经准备完毕
* 被多线程享受
* 如果realData2==null,表示数据还准备好
* */
private volatile RealData2 realData2 = null;
/**
* 查看真正的数据是否准备完毕
* */
public boolean isFinished() {
return realData2 != null;
}
/**
* 如果数据已经准备好,则返回真正的数据;
* 否则,阻塞调用线程,直到数据准备完毕后,才返回真实数据;
* */
public String getContent() {
synchronized (mutex) {
while(!isFinished()) {//只要数据没有准备完毕,就阻塞调用线程
try {
mutex.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData2.getContent();
}
}
/**
* 当 RealData2 准备完数据后,RealData2 应该通知 FutureData2 数据准备完毕。
* 并在输入参数 realData 传入真实数据,在参数 event 传入事件(比如数据如期准备好了,或出了什么异常)
*
* @param realData 真实的数据
* @param event 事件类型
* */
public void update(Observable realData, Object event) {
System.out.println("通知...."+event);
if(!(realData instanceof RealData2)) {
throw new IllegalArgumentException("主题的数据类型必须是RealData2");
}
if(!(event instanceof String)) {
throw new IllegalArgumentException("事件的数据类型必须是String");
}
synchronized (mutex) {
if(isFinished()) {
mutex.notifyAll();
return;//如果数据已经准备好了,直接返回.
}
if("Finished".equals(event)) {
realData2 = (RealData2)realData;//数据准备好了的时候,便可以通知数据准备好了
mutex.notifyAll();//唤醒被阻塞的线程
}
}
}
private Object mutex = new Object();
}
RealData(实际数据)
- package com.umpay.future.extend;
-
- import java.util.Observable;
-
- import com.umpay.future.Data;
-
- public class RealData2 extends Observable implements Data {
-
- private String content;
-
- public RealData2() {
-
- }
-
- public void createRealData2(int count, char c) {
- System.out.println(" making RealData(" + count + ", " + c
- + ") BEGIN");
- char[] buffer = new char[count];
- for (int i = 0; i < count; i++) {
- buffer[i] = c;
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
- System.out.println(" making RealData(" + count + ", " + c
- + ") END");
- this.content = new String(buffer);
-
-
- setChanged();
- notifyObservers("Finished");
- }
-
-
- public String getContent() {
- return content;
- }
- }
package com.umpay.future.extend;
import java.util.Observable;
import com.umpay.future.Data;
public class RealData2 extends Observable implements Data {
private String content;
public RealData2() {
}
public void createRealData2(int count, char c) {
System.out.println(" making RealData(" + count + ", " + c
+ ") BEGIN");
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = c;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
System.out.println(" making RealData(" + count + ", " + c
+ ") END");
this.content = new String(buffer);
//真实数据准备完毕了,通知FutureData2说数据已经准备好了.
setChanged();//必须先设置本对象的状态发生了变化,并且通知所有的观察者
notifyObservers("Finished");
}
public String getContent() {
return content;
}
}
服务端代码:
- package com.umpay.future.extend;
-
- import com.umpay.future.Data;
-
- public class HostServer2 {
-
- public Data request(final int count, final char c) {
- System.out.println(" request(" + count + ", " + c + ") BEGIN");
-
-
- final FutureData2 future2 = new FutureData2();
-
-
- new Thread() {
- public void run() {
- RealData2 realdata2 = new RealData2();
- realdata2.addObserver(future2);
- realdata2.createRealData2(count, c);
- }
- }.start();
-
- System.out.println(" request(" + count + ", " + c + ") END");
-
-
- return future2;
- }
-
- }
package com.umpay.future.extend;
import com.umpay.future.Data;
public class HostServer2 {
public Data request(final int count, final char c) {
System.out.println(" request(" + count + ", " + c + ") BEGIN");
// (1) 建立FutureData的实体
final FutureData2 future2 = new FutureData2();
// (2) 为了建立RealData的实体,启动新的线程
new Thread() {
public void run() {
RealData2 realdata2 = new RealData2();
realdata2.addObserver(future2);//以便当RealData2把数据准备完毕后,通过该回调口子,通知FutureData2表示数据已经贮备好了
realdata2.createRealData2(count, c);
}
}.start();
System.out.println(" request(" + count + ", " + c + ") END");
// (3) 取回FutureData实体,作为传回值
return future2;
}
}
客户端代码:
- package com.umpay.future;
-
- import com.umpay.future.extend.HostServer2;
-
- public class MainClient {
- public static void main(String[] args) {
-
- testHostServer2();
- }
-
- static void testHostServer() {
- System.out.println("main BEGIN");
- HostServer hostServer = new HostServer();
- Data data1 = hostServer.request(10, 'A');
- Data data2 = hostServer.request(20, 'B');
- Data data3 = hostServer.request(30, 'C');
-
- System.out.println("main otherJob BEGIN");
-
-
-
-
- System.out.println("main otherJob END");
-
- System.out.println("data1 = " + data1.getContent());
- System.out.println("data2 = " + data2.getContent());
- System.out.println("data3 = " + data3.getContent());
- System.out.println("main END");
-
- }
-
- static void testHostServer2() {
- System.out.println("main BEGIN");
- HostServer2 hostServer2 = new HostServer2();
- Data data1 = hostServer2.request(10, 'A');
- Data data2 = hostServer2.request(20, 'B');
- Data data3 = hostServer2.request(30, 'C');
-
- System.out.println("main otherJob BEGIN");
-
-
-
-
- System.out.println("main otherJob END");
-
- System.out.println("data1 = " + data1.getContent());
- System.out.println("data2 = " + data2.getContent());
- System.out.println("data3 = " + data3.getContent());
- System.out.println("main END");
-
- }
- }
阅读(523) | 评论(1) | 转发(0) |