GemFire 客户端/服务器缓存模式能提供动态的服务器连接池管理、均衡且可按条件调节的服务器负载、逻辑服务器分组管理,以及面向高可用性的服务器自动故障转移(失效备援)。
与旧版本的 GemFire 相比:旧版本在分布式系统中使用 DistributedSystem 类进行连接管理,而最新版本提供了 CacheServer 和 ClientCache 类来实现客户端/服务器模式。以下是一个简单的例子:
服务器配置:
1.Server.xml
-
<?xml version="1.0"?>
-
-
quickstart.SimpleCacheLoader
2.服务器在这里使用的是GemFire cacheserver 进程。
客户端配置:
1. Client.xml
<client-cache>
  <pool name="client" subscription-enabled="true">
    <server host="localhost" port="40404" />
  </pool>

  <region name="exampleRegion">
    <region-attributes refid="CACHING_PROXY">
      <cache-listener>
        <class-name>quickstart.SimpleCacheListener</class-name>
      </cache-listener>
    </region-attributes>
  </region>
</client-cache>
quickstart.SimpleCacheListener
2. ClientWorker.java,该类用于往cache中get和put数据。
-
public class ClientWorker {
-
-
public static final String EXAMPLE_REGION_NAME = "exampleRegion";
-
-
public static void main(String[] args) throws Exception {
-
-
System.out.println("Connecting to the distributed system and creating the cache.");
-
-
ClientCache cache = new ClientCacheFactory()
-
.set("name", "ClientWorker")
-
.set("cache-xml-file", "xml/Client.xml")
-
.create();
-
-
-
Region exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
-
System.out.println("Example region \"" + exampleRegion.getFullPath() + "\" created in cache.");
-
System.out.println();
-
System.out.println("Getting three values from the cache server.");
-
System.out.println("This will cause the server's loader to run, which will add the values");
-
System.out.println("to the server cache and return them to me. The values will also be");
-
System.out.println("forwarded to any other client that has subscribed to the region.");
-
-
-
for (int count = 0; count < 3; count++) {
-
String key = "key" + count;
-
System.out.println("Getting key " + key);
-
exampleRegion.get(key);
-
}
-
-
System.out.println("Note the other client's region listener in response to these gets.");
-
System.out.println("Press Enter to continue.");
-
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
-
bufferedReader.readLine();
-
-
System.out.println("Changing the data in my cache - all destroys and updates are forwarded");
-
System.out.println("through the server to other clients. Invalidations are not forwarded.");
-
-
-
System.out.println("Putting new value for key0");
-
exampleRegion.put("key0", "ClientValue0");
-
-
-
System.out.println("Invalidating key1");
-
exampleRegion.invalidate("key1");
-
-
-
System.out.println("Destroying key2");
-
exampleRegion.destroy("key2");
-
-
-
System.out.println("Closing the cache and disconnecting.");
-
cache.close();
-
-
System.out.println("In the other session, please hit Enter in the Consumer client");
-
System.out.println("and then stop the cacheserver with 'cacheserver stop'.");
-
}
-
}
/**
 * Quickstart "worker" client: connects to the cache server through the pool
 * declared in {@code xml/Client.xml}, reads three keys from the example region
 * and then updates, invalidates and destroys one entry each.
 *
 * <p>It is meant to be run while a {@code ClientConsumer} is running in another
 * session, so that the consumer's listener can report the resulting events.
 */
public class ClientWorker {

  /** Name of the region looked up in the client cache. */
  public static final String EXAMPLE_REGION_NAME = "exampleRegion";

  /**
   * Runs the interactive worker scenario.
   *
   * @param args ignored
   * @throws Exception if the cache cannot be created, a region operation fails,
   *     or standard input cannot be read
   */
  public static void main(String[] args) throws Exception {
    System.out.println("Connecting to the distributed system and creating the cache.");

    // Creating the cache causes the cache-xml-file to be parsed.
    ClientCache cache = new ClientCacheFactory()
        .set("name", "ClientWorker")
        .set("cache-xml-file", "xml/Client.xml")
        .create();

    try {
      Region<String, String> exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
      if (exampleRegion == null) {
        // Fail with a clear message instead of a NullPointerException below.
        throw new IllegalStateException(
            "Region \"" + EXAMPLE_REGION_NAME + "\" is not defined in xml/Client.xml");
      }
      System.out.println("Example region \"" + exampleRegion.getFullPath() + "\" created in cache.");
      System.out.println();
      System.out.println("Getting three values from the cache server.");
      System.out.println("This will cause the server's loader to run, which will add the values");
      System.out.println("to the server cache and return them to me. The values will also be");
      System.out.println("forwarded to any other client that has subscribed to the region.");

      // Get three values from the cache: key0, key1, key2.
      for (int count = 0; count < 3; count++) {
        String key = "key" + count;
        System.out.println("Getting key " + key);
        exampleRegion.get(key);
      }

      System.out.println("Note the other client's region listener in response to these gets.");
      System.out.println("Press Enter to continue.");
      // Not closed on purpose: closing the reader would also close System.in.
      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
      bufferedReader.readLine();

      System.out.println("Changing the data in my cache - all destroys and updates are forwarded");
      System.out.println("through the server to other clients. Invalidations are not forwarded.");

      // Update one value in the cache.
      System.out.println("Putting new value for key0");
      exampleRegion.put("key0", "ClientValue0");

      // Invalidate one entry in the cache.
      System.out.println("Invalidating key1");
      exampleRegion.invalidate("key1");

      // Destroy one entry in the cache.
      System.out.println("Destroying key2");
      exampleRegion.destroy("key2");
    } finally {
      // Always close the cache and disconnect, even when an operation above failed.
      System.out.println("Closing the cache and disconnecting.");
      cache.close();
    }

    System.out.println("In the other session, please hit Enter in the Consumer client");
    System.out.println("and then stop the cacheserver with 'cacheserver stop'.");
  }
}
3. ClientConsumer.java,模拟从server端获取数据。
-
public class ClientConsumer {
-
-
public static final String USAGE = "Usage: java ClientConsumer \n"
-
+ " register-interest-type may be one of the following:\n"
-
+ " all-keys Register interest in all keys on the server\n"
-
+ " keyset Register interest in a set of keys on the server\n"
-
+ " regex Register interest in keys on the server matching a regular expression\n";
-
-
public static final String EXAMPLE_REGION_NAME = "exampleRegion";
-
-
private static enum RegisterInterestType {
-
ALL_KEYS, KEYSET, REGEX
-
}
-
-
public static void main(String[] args) throws Exception {
-
-
if (args.length != 1) {
-
System.out.println(USAGE);
-
System.exit(1);
-
}
-
-
RegisterInterestType registerInterestType;
-
if (args[0].equals("all-keys")) {
-
registerInterestType = RegisterInterestType.ALL_KEYS;
-
} else if (args[0].equals("keyset")) {
-
registerInterestType = RegisterInterestType.KEYSET;
-
} else if (args[0].equals("regex")) {
-
registerInterestType = RegisterInterestType.REGEX;
-
} else {
-
registerInterestType = null;
-
System.out.println(USAGE);
-
System.exit(2);
-
}
-
-
-
System.out.println("Connecting to the distributed system and creating the cache.");
-
-
ClientCache cache = new ClientCacheFactory()
-
.set("name", "ClientConsumer")
-
.set("cache-xml-file", "xml/Client.xml")
-
.create();
-
-
-
Region exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
-
System.out.println("Example region \"" + exampleRegion.getFullPath() + "\" created in cache. ");
-
-
switch (registerInterestType) {
-
case ALL_KEYS:
-
System.out.println("Asking the server to send me all data additions, updates, and destroys. ");
-
exampleRegion.registerInterest("ALL_KEYS");
-
break;
-
case KEYSET:
-
System.out.println("Asking the server to send me events for data with these keys: 'key0', 'key1'");
-
exampleRegion.registerInterest("key0");
-
exampleRegion.registerInterest("key1");
-
break;
-
case REGEX:
-
System.out.println("Asking the server to register interest in keys matching this");
-
System.out.println("regular expression: 'k.*2'");
-
exampleRegion.registerInterestRegex("k.*2");
-
break;
-
default:
-
-
throw new RuntimeException();
-
}
-
-
System.out.println("The data region has a listener that reports all changes to standard out.");
-
System.out.println();
-
System.out.println("Please run the worker client in another session. It will update the");
-
System.out.println("cache and the server will forward the updates to me. Note the listener");
-
System.out.println("output in this session.");
-
System.out.println();
-
System.out.println("When the other client finishes, hit Enter to exit this program.");
-
-
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
-
bufferedReader.readLine();
-
-
System.out.println("Closing the cache and disconnecting.");
-
cache.close();
-
}
-
}
/**
 * Quickstart "consumer" client: connects to the cache server through the pool
 * declared in {@code xml/Client.xml}, registers interest in server-side keys
 * according to its single command-line argument, and then waits for the user
 * to press Enter before closing the cache.
 */
public class ClientConsumer {

  /** Usage text printed when the command line is missing or not recognized. */
  public static final String USAGE = "Usage: java ClientConsumer <register-interest-type>\n"
      + " register-interest-type may be one of the following:\n"
      + " all-keys Register interest in all keys on the server\n"
      + " keyset Register interest in a set of keys on the server\n"
      + " regex Register interest in keys on the server matching a regular expression\n";

  /** Name of the region looked up in the client cache. */
  public static final String EXAMPLE_REGION_NAME = "exampleRegion";

  /** The kinds of interest registration selectable from the command line. */
  private enum RegisterInterestType {
    ALL_KEYS, KEYSET, REGEX
  }

  /**
   * Runs the interactive consumer scenario.
   *
   * <p>Exits with status 1 when the argument count is not exactly one and with
   * status 2 when the argument is not a known interest type.
   *
   * @param args exactly one of {@code all-keys}, {@code keyset} or {@code regex}
   * @throws Exception if the cache cannot be created, interest registration
   *     fails, or standard input cannot be read
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.out.println(USAGE);
      System.exit(1);
    }

    RegisterInterestType registerInterestType = parseInterestType(args[0]);
    if (registerInterestType == null) {
      System.out.println(USAGE);
      System.exit(2);
    }

    System.out.println("Connecting to the distributed system and creating the cache.");

    // Creating the cache causes the cache-xml-file to be parsed.
    ClientCache cache = new ClientCacheFactory()
        .set("name", "ClientConsumer")
        .set("cache-xml-file", "xml/Client.xml")
        .create();

    try {
      Region<String, String> exampleRegion = cache.getRegion(EXAMPLE_REGION_NAME);
      if (exampleRegion == null) {
        // Fail with a clear message instead of a NullPointerException below.
        throw new IllegalStateException(
            "Region \"" + EXAMPLE_REGION_NAME + "\" is not defined in xml/Client.xml");
      }
      System.out.println("Example region \"" + exampleRegion.getFullPath() + "\" created in cache. ");

      // Subscribe to the indicated key set.
      switch (registerInterestType) {
        case ALL_KEYS:
          System.out.println("Asking the server to send me all data additions, updates, and destroys. ");
          exampleRegion.registerInterest("ALL_KEYS");
          break;
        case KEYSET:
          System.out.println("Asking the server to send me events for data with these keys: 'key0', 'key1'");
          exampleRegion.registerInterest("key0");
          exampleRegion.registerInterest("key1");
          break;
        case REGEX:
          System.out.println("Asking the server to register interest in keys matching this");
          System.out.println("regular expression: 'k.*2'");
          exampleRegion.registerInterestRegex("k.*2");
          break;
        default:
          // Only reachable if a new enum constant is added without a case here.
          throw new IllegalStateException("Unhandled interest type: " + registerInterestType);
      }

      System.out.println("The data region has a listener that reports all changes to standard out.");
      System.out.println();
      System.out.println("Please run the worker client in another session. It will update the");
      System.out.println("cache and the server will forward the updates to me. Note the listener");
      System.out.println("output in this session.");
      System.out.println();
      System.out.println("When the other client finishes, hit Enter to exit this program.");

      // Not closed on purpose: closing the reader would also close System.in.
      BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
      bufferedReader.readLine();
    } finally {
      // Always close the cache and disconnect, even when an operation above failed.
      System.out.println("Closing the cache and disconnecting.");
      cache.close();
    }
  }

  /**
   * Maps a command-line token to its interest type.
   *
   * @param arg the raw command-line argument
   * @return the matching type, or {@code null} when {@code arg} is not recognized
   */
  private static RegisterInterestType parseInterestType(String arg) {
    if ("all-keys".equals(arg)) {
      return RegisterInterestType.ALL_KEYS;
    }
    if ("keyset".equals(arg)) {
      return RegisterInterestType.KEYSET;
    }
    if ("regex".equals(arg)) {
      return RegisterInterestType.REGEX;
    }
    return null;
  }
}
4. SimpleCacheLoader.java,如果该Region在服务器端没有数据,将自动加载数据。
-
public class SimpleCacheLoader implements CacheLoader, Declarable {
-
-
public String load(LoaderHelper helper) {
-
String key = helper.getKey();
-
System.out.println(" Loader called to retrieve value for " + key);
-
-
-
return "LoadedValue" + (Integer.parseInt(key.substring(3)));
-
}
-
-
public void close() {
-
-
}
-
-
public void init(Properties props) {
-
-
}
-
}
/**
 * Cache loader that synthesizes a value for a requested key.
 *
 * <p>The type arguments are required: with a raw {@code LoaderHelper},
 * {@code getKey()} yields {@code Object} and cannot be assigned to a
 * {@code String} without a cast.
 */
public class SimpleCacheLoader implements CacheLoader<String, String>, Declarable {

  /**
   * Builds the value for the requested key from the key's numeric suffix.
   *
   * <p>The key is expected to look like {@code key0}, {@code key1}, ...: the
   * first three characters are skipped and the remainder is parsed as an
   * integer. A key shorter than three characters or with a non-numeric
   * remainder makes {@code substring}/{@code Integer.parseInt} throw.
   *
   * @param helper supplies the key being loaded
   * @return {@code "LoadedValue"} followed by the parsed suffix number
   */
  public String load(LoaderHelper<String, String> helper) {
    String key = helper.getKey();
    System.out.println(" Loader called to retrieve value for " + key);

    // Create a value using the suffix number of the key (key1, key2, etc.).
    int suffix = Integer.parseInt(key.substring(3));
    return "LoadedValue" + suffix;
  }

  /** Does nothing; this loader holds no resources. */
  public void close() {
    // do nothing
  }

  /**
   * Does nothing; this loader takes no configuration.
   *
   * @param props ignored
   */
  public void init(Properties props) {
    // do nothing
  }
}
5. SimpleCacheListener.java,一个异步的监听器用于监听本地缓存的变化。
-
public class SimpleCacheListener extends CacheListenerAdapter implements Declarable {
-
-
public void afterCreate(EntryEvent e) {
-
System.out.println(" Received afterCreate event for entry: " +
-
e.getKey() + ", " + e.getNewValue());
-
}
-
-
public void afterUpdate(EntryEvent e) {
-
System.out.println(" Received afterUpdate event for entry: " +
-
e.getKey() + ", " + e.getNewValue());
-
}
-
-
public void afterDestroy(EntryEvent e) {
-
System.out.println(" Received afterDestroy event for entry: " +
-
e.getKey());
-
}
-
-
public void afterInvalidate(EntryEvent e) {
-
System.out.println(" Received afterInvalidate event for entry: " +
-
e.getKey());
-
}
-
-
public void afterRegionLive(RegionEvent e) {
-
System.out.println(" Received afterRegionLive event, sent to durable clients after \nthe server has finished replaying stored events. ");
-
}
-
-
public void init(Properties props) {
-
-
}
-
-
}
/**
 * Cache listener that writes one line to standard out for every entry event
 * (create, update, destroy, invalidate) and for the region-live event.
 */
public class SimpleCacheListener extends CacheListenerAdapter implements Declarable {

  /** Reports the key and new value of a created entry. */
  public void afterCreate(EntryEvent e) {
    printEvent(" Received afterCreate event for entry: ", e.getKey() + ", " + e.getNewValue());
  }

  /** Reports the key and new value of an updated entry. */
  public void afterUpdate(EntryEvent e) {
    printEvent(" Received afterUpdate event for entry: ", e.getKey() + ", " + e.getNewValue());
  }

  /** Reports the key of a destroyed entry. */
  public void afterDestroy(EntryEvent e) {
    printEvent(" Received afterDestroy event for entry: ", String.valueOf(e.getKey()));
  }

  /** Reports the key of an invalidated entry. */
  public void afterInvalidate(EntryEvent e) {
    printEvent(" Received afterInvalidate event for entry: ", String.valueOf(e.getKey()));
  }

  /** Reports that the region-live event was received; the event itself is not inspected. */
  public void afterRegionLive(RegionEvent e) {
    printEvent(
        " Received afterRegionLive event, sent to durable clients after \nthe server has finished replaying stored events. ",
        "");
  }

  /**
   * Does nothing; this listener takes no configuration.
   *
   * @param props ignored
   */
  public void init(Properties props) {
    // do nothing
  }

  /** Writes {@code prefix} immediately followed by {@code details} as one line on standard out. */
  private static void printEvent(String prefix, String details) {
    System.out.println(prefix + details);
  }
}
启动步骤:
1. 启动GemFire cacheserver:
-
cacheserver start cache-xml-file=xml/Server.xml
cacheserver start cache-xml-file=xml/Server.xml
2. 启动consumer client(ClientConsumer):
-
java quickstart.ClientConsumer all-keys
java quickstart.ClientConsumer all-keys
3. 启动ClientWorker:
-
java quickstart.ClientWorker
java quickstart.ClientWorker
4. 当所有客户端退出后,停止cacheserver:
cacheserver stop
阅读(2599) | 评论(0) | 转发(0) |