Category: Java

2008-05-23 20:14:32

1)        CoherenceSessionHelper: adds the transport manager to the current session. Both server and database sessions are supported.

package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.remotecommand.CommandProcessor;
import oracle.toplink.remotecommand.RemoteCommandManager;
import oracle.toplink.sessions.DatabaseSession;
import oracle.toplink.sessions.Session;
import oracle.toplink.threetier.Server;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.oocl.isdc.sha.frm.tts.config.TTSConfigParser;

public class CoherenceSessionHelper {
    public static Session addCoherenceTransportManagerToSession(Session session) {
        RemoteCommandManager commandMgr = new RemoteCommandManager((CommandProcessor) session);
        commandMgr.setChannel((String) TTSConfigParser.getInstance().get(
                TTSConfigConstants.TOPLINK_COMMAND_CHANNEL_KEY));
        CoherenceTransportManager tm = new CoherenceTransportManager(commandMgr);
        tm.setInitialContextFactoryName(TTSConfigConstants.TTS_CONTEXT_FACTOYR_NAME);
        commandMgr.setUrl(tm.getServiceUrl());
        commandMgr.setTransportManager(tm);
        tm.setShouldRemoveConnectionOnError(true);
        if (session instanceof Server) {
            ((Server) session).setCommandManager(commandMgr);
            ((Server) session).setShouldPropagateChanges(true);
            ((Server) session).getCommandManager().initialize();
        } else if (session instanceof DatabaseSession) {
            ((DatabaseSession) session).setCommandManager(commandMgr);
            ((DatabaseSession) session).setShouldPropagateChanges(true);
            ((DatabaseSession) session).getCommandManager().initialize();
        } else {
            throw new IllegalArgumentException("Session must be a server or database session");
        }
        return session;
    }
}

2)        CoherenceCache: a custom wrapper around a Coherence NamedCache

 

package com.oocl.isdc.sha.frm.tts.cohererence.cache;

import java.net.InetAddress;
import java.util.Collection;

import com.oocl.isdc.sha.frm.tts.config.TTSConfigConstants;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Service;
import com.tangosol.util.MapListener;

public class CoherenceCache {
    private NamedCache namedCache;

    public CoherenceCache() {
        this.namedCache = CacheFactory.getCache("toplinkSyn");
    }

    public void putCache(Object key, Object value) {
        namedCache.put(key, value);
    }

    public Object retrieveCache(Object key) {
        return namedCache.get(key);
    }

    public NamedCache getNamedCache() {
        return namedCache;
    }

    public void addMapListener(MapListener listener) {
        this.namedCache.addMapListener(listener);
    }

    public void removeMapListener(MapListener listener) {
        this.namedCache.removeMapListener(listener);
    }

    @SuppressWarnings("unchecked")
    public Collection retrieveCacheAll() {
        return this.namedCache.values();
    }

    public String getUrl() {
        Member member = getLocalMember();
        InetAddress address = member.getAddress();
        String ipAddress = address.getHostAddress();
        int port = member.getPort();
        StringBuffer sb = new StringBuffer(TTSConfigConstants.TOPLINK_SERVICEID_PREFIX)
            .append(ipAddress).append(":").append(port);
        return sb.toString();
    }

    public Member getLocalMember() {
        Service service = namedCache.getCacheService();
        Cluster cluster = service.getCluster();
        Member member = cluster.getLocalMember();
        return member;
    }
}
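To make the result of getUrl() concrete, here is a minimal sketch of the same string building using only the JDK, so it runs without Coherence on the classpath. The class name ServiceUrlSketch and the prefix value "tcmp://" are assumptions for illustration; the real value of TTSConfigConstants.TOPLINK_SERVICEID_PREFIX is not shown in this post.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Sketch only: mirrors the string building in CoherenceCache.getUrl() with plain JDK types.
class ServiceUrlSketch {
    // Assumed stand-in for TTSConfigConstants.TOPLINK_SERVICEID_PREFIX (real value not shown in the post).
    static final String ASSUMED_PREFIX = "tcmp://";

    // Builds prefix + ip + ":" + port, the same shape getUrl() produces.
    static String buildUrl(String ipAddress, int port) {
        return new StringBuilder(ASSUMED_PREFIX)
                .append(ipAddress)
                .append(':')
                .append(port)
                .toString();
    }

    public static void main(String[] args) throws UnknownHostException {
        // In CoherenceCache the address and port come from the local cluster Member;
        // here they are hard-coded sample values.
        InetAddress address = InetAddress.getByAddress(
                new byte[] {(byte) 146, (byte) 222, 51, 20});
        System.out.println(buildUrl(address.getHostAddress(), 8088));
    }
}
```

Because the URL is derived from the member's address and port, each cluster member gets its own service URL as long as the address/port pairs differ.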

 

3)        CoherenceDiscoveryManager extends DiscoveryManager and mainly overrides the startDiscovery and stopDiscovery methods

 

package com.oocl.isdc.sha.frm.tts.remotecommand;

import oracle.toplink.exceptions.ValidationException;
import oracle.toplink.remotecommand.DiscoveryManager;
import oracle.toplink.remotecommand.RemoteCommandManager;

public class CoherenceDiscoveryManager extends DiscoveryManager {

    public CoherenceDiscoveryManager(RemoteCommandManager rcm) {
        super(rcm);
    }

    public RemoteCommandManager getRemoteCommandManager() {
        return this.rcm;
    }

    public boolean isDiscoveryStopped() {
        throw ValidationException.operationNotSupported("isDiscoveryStopped");
    }

    public void startDiscovery() {
        ((CoherenceTransportManager) rcm.getTransportManager()).createLocalConnection();
    }

    /**
     * We must implement this method, and we keep it blank
     */
    public void stopDiscovery() {
    }

    public void setAnnouncementDelay(int millisecondsToDelay) {
        throw ValidationException.operationNotSupported("setAnnouncementDelay");
    }

    public int getAnnouncementDelay() {
        throw ValidationException.operationNotSupported("getAnnouncementDelay");
    }

    public String getMulticastGroupAddress() {
        throw ValidationException.operationNotSupported("getMulticastGroupAddress");
    }

    public void setMulticastGroupAddress(String address) {
        throw ValidationException.operationNotSupported("setMulticastGroupAddress");
    }

    public void setMulticastPort(int port) {
        throw ValidationException.operationNotSupported("setMulticastPort");
    }

    public int getMulticastPort() {
        throw ValidationException.operationNotSupported("getMulticastPort");
    }
}

 

3           Run mvn clean install from the command line to deploy the demo. The details of the target oc4j instance must be given in the pom.xml under the ear module.

    <properties>
       <home.j2ee>D:/oc4j_extended_101310/j2ee/home</home.j2ee>
       <oc4j.host>localhost</oc4j.host>
       <rmi.port>23791</rmi.port>
       <deploy.username>oc4jadmin</deploy.username>
       <deploy.password>welcome</deploy.password>
    </properties>

Because we need to verify cache synchronization, the demo also has to be deployed to a second oc4j instance. For that deployment, change build.properties to:

coherence.member.name = tts-server1

coherence.local.address = 146.222.51.20

coherence.local.port = 8088

 

and change the pom.xml under the ear module to:

    <properties>
       <home.j2ee>C:/oc4j_extended_101310/j2ee/home</home.j2ee>
       <oc4j.host>localhost</oc4j.host>
       <rmi.port>23792</rmi.port>
       <deploy.username>oc4jadmin</deploy.username>
       <deploy.password>welcome</deploy.password>
    </properties>
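Since both instances run on the same host, a quick sanity check on the two build.properties files can save a failed deployment. The sketch below parses two such files with java.util.Properties and checks that their Coherence endpoints differ, on the assumption that each cluster member needs its own address/port pair. The first set of values is taken from this post. The second set (member name tts-server2, port 8089) is hypothetical, because the other instance's build.properties is not shown here; the class and method names are also made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Sketch only: compares the Coherence endpoints of two build.properties files.
class BuildPropertiesSketch {
    // Parses properties text; whitespace around '=' is handled by Properties.load.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("could not parse properties", e);
        }
        return props;
    }

    // Joins the configured local address and port into "address:port".
    static String endpoint(Properties p) {
        return p.getProperty("coherence.local.address") + ":"
                + p.getProperty("coherence.local.port");
    }

    // True if both configurations would claim the same address and port.
    static boolean collides(Properties a, Properties b) {
        return endpoint(a).equals(endpoint(b));
    }

    public static void main(String[] args) {
        // Values shown in the post.
        Properties first = parse(
                "coherence.member.name = tts-server1\n"
              + "coherence.local.address = 146.222.51.20\n"
              + "coherence.local.port = 8088\n");
        // Hypothetical values for the other instance (not shown in the post).
        Properties second = parse(
                "coherence.member.name = tts-server2\n"
              + "coherence.local.address = 146.222.51.20\n"
              + "coherence.local.port = 8089\n");
        System.out.println(endpoint(first) + " vs " + endpoint(second)
                + ", collide: " + collides(first, second));
    }
}
```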

 

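4           Open the demo in a browser and check the cache behavior. When we modify an employee's data on one oc4j instance, the change shows up immediately on the other oc4j instance, and the TopLink log shows the corresponding merge of the employee. Both the demo's cache and its cache synchronization strategy mostly use TopLink's default configuration. Employee is protected by an optimistic lock, so without cache synchronization, modifying an employee on the other oc4j instance (that is, on the other server) works from a stale cached copy, raises an optimistic lock exception, and rolls the change back. With cache synchronization, that instance already holds the latest data, consistent with the db, without having to query the db directly, which greatly reduces how often optimistic lock failures occur. To get the most out of cache synchronization we still need to define a cache invalidation mechanism, and there are many other cache strategies for avoiding stale data.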
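The effect described above can be reproduced with a small in-memory model. This is a simplified sketch, not TopLink or Coherence code: a versioned row stands in for Employee, a map stands in for the db, and each node keeps its own cache. All class and method names are made up for illustration, and pushing the updated row straight into the peer's cache is only a stand-in for the merge command that the real setup sends between servers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model: a shared "database" with version checks and per-node caches.
class CacheSyncSketch {
    static final class Row {
        final String name;
        final int version;
        Row(String name, int version) { this.name = name; this.version = version; }
    }

    static final class Database {
        private final Map<Integer, Row> rows = new HashMap<>();
        void insert(int id, String name) { rows.put(id, new Row(name, 1)); }
        Row read(int id) { return rows.get(id); }
        // Optimistic locking: the update succeeds only if the caller saw the current version.
        Row update(int id, String newName, int expectedVersion) {
            Row current = rows.get(id);
            if (current.version != expectedVersion) {
                throw new IllegalStateException("optimistic lock failure on id " + id);
            }
            Row next = new Row(newName, current.version + 1);
            rows.put(id, next);
            return next;
        }
    }

    static final class Node {
        private final Database db;
        private final Map<Integer, Row> cache = new HashMap<>();
        private final List<Node> peers = new ArrayList<>();
        Node(Database db) { this.db = db; }
        void addPeer(Node peer) { peers.add(peer); }
        // Reads through the local cache, hitting the database only on a miss.
        Row load(int id) { return cache.computeIfAbsent(id, db::read); }
        // Returns true if the write committed, false if it was rolled back.
        boolean rename(int id, String newName) {
            Row cached = load(id);
            try {
                Row updated = db.update(id, newName, cached.version);
                cache.put(id, updated);
                for (Node peer : peers) {
                    peer.cache.put(id, updated); // stand-in for the remote merge
                }
                return true;
            } catch (IllegalStateException e) {
                return false;
            }
        }
    }

    // Both nodes cache employee 558, node 1 edits it, then node 2 edits it.
    static boolean secondEditSucceeds(boolean synchronizeCaches) {
        Database db = new Database();
        db.insert(558, "original");
        Node server1 = new Node(db);
        Node server2 = new Node(db);
        if (synchronizeCaches) {
            server1.addPeer(server2);
            server2.addPeer(server1);
        }
        server1.load(558);
        server2.load(558);
        server1.rename(558, "edited on server1");
        return server2.rename(558, "edited on server2");
    }

    public static void main(String[] args) {
        System.out.println("without sync: " + secondEditSucceeds(false));
        System.out.println("with sync:    " + secondEditSucceeds(true));
    }
}
```

Without synchronization the second node still holds version 1 and its update is rejected; with synchronization it already holds version 2 and the update commits. The model does not cover invalidation or refresh after a failure, which a real deployment also needs.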

Toplink Log

[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received remote command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]

[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Executing command oracle.toplink.remotecommand.MergeChangeSetCommand from Service[OOCL Toplink Coherence, 26464827, tcmp://146.222.51.20:8089]

[TopLink Finer]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Received updates from Remote Server

[TopLink Finest]: 2008.05.23 07:50:16.355--ServerSession(7674162)--Thread(Thread[DistributedCache:EventDispatcher,5,Cluster])--Merging com.oocl.isdc.sha.frm.tts.model.Employee: [558] from remote server

Page demo:

 

 

 

The URLs in the two page screenshots above show that these are two different oc4j servers. Both use the same db and run the same application. Modify the data on server1:

 

 
