Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1169191
  • 博文数量: 234
  • 博客积分: 5592
  • 博客等级: 大校
  • 技术积分: 1987
  • 用 户 组: 普通用户
  • 注册时间: 2009-12-23 14:12
文章分类

全部博文(234)

文章存档

2015年(1)

2013年(4)

2012年(16)

2011年(204)

2010年(9)

分类: Python/Ruby

2011-09-06 15:24:01

1水平分库

最近在做一个IM系统,之前的旧系统没有考虑到用户量会增长得这么庞大,导致现在数据库性能瓶颈非常严重,迫切需要分库,用于减少每个库的用户数量,进而分摊负载,最终达到数据库横向扩展的目的。

数据库水平分库是以用户Id为分库的依据,同一个用户的所有数据都在同一个库上,每个库有着相同的表结构。为了实现开发人员来说对数据库的透明访问,分库框架需要解决二个问题:
1、 方法参数中有用户id的数据的新增,查询及修改
2、 方法参数中无用户id的数据的查询

 

2用户id

把用户名和密码所在的表定义为用户表,用户id即是用户表中的惟一性标识的整形值,如果用户的用户名只有一种方式,那么id可以是用户名的hash值,此时用户表也是分库的;如果用户的用户名有多种方式,比如允许用户使用email登陆,也允许用户使用手机号码登陆,那么用户id应该是用户表中的递增字段值,此时用户表应该是不分库的,这时可以把用户表独立为另一个库,称之为认证库。我们的项目应用是属于后者。

1 3 解决方案 3.1    说明

简单服务即为DAO,每个domain都对应一个简单服务,简单服务之间不允许互相依赖;复杂服务可以依赖多个简单服务,但不能直接访问数据库,复杂服务对数据库的操作必须通过单简单服务。

使用hibernate作为访问数据库的中间层,结合SpringAop拦截方法,简单服务代理与简单服务实现相同的接口,一个简单服务对应二个实例,一个引用动态获取数据库连接的sessionFactory,另一个引用Hibernate ShardssessionFactory

3.2    方法参数中有用户Id

Spring Aop拦截简单服务代理的所有方法,如果方法的第一个参数为userid,则将userid

放到当前线程中,并选择引用动态获取数据库连接的sessionFactory的简单服务实例,在获取数据库连接时根据当前线程的userid选择相应连接,流程如下:

 分库,有id

 

3.3    方法参数中无用户Id

Spring Aop拦截简单服务代理的所有方法,如果方法的第一个参数为非userid,选择引用Hibernate ShardssessionFactory的简单服务实例,遍历所有数据库,并返回汇总后的数据。这种情况下只允许读,不允许写。流程如下:

分库,无id

1  4实现 4.1    简单服务代理

对每个简单服务用jdk动态代理生成一个代理对像,复杂服务依赖代理对像。

4.2    实例化

在简单服务类上标注@DetachDbService,则会产生三个实例(框架实现):

1.       简单服务代理实例

2.       引用动态获取数据库连接的sessionFactory的简单服务实例

3.       引用Hibernate ShardssessionFactory简单服务实例

4.3    方法参数

如果是到某个库获取数据,则第一个参数必须为Long或者UseridAble类型,用于获取userid

4.4    userid与数据库关系

可选方案

优点

缺点

按号段分

可部分迁移

数据分布不均

取模

数据分布均匀

迁移数据量是1/(n+1),不能按服务器性能分配

在认证库中保存数据库配置

灵活,可部分迁移

查询前需要先从数据库或缓存中获得此配置

 

总的来说,取模是最优方案,但是考虑到服务器性能可能不一致,而又需要充分利用服务器资源,所以需要在取模的同时加上权重。比如现在有二台数据库,权重为12,那么用户id先对3取模,0的为第一台服务器,12的为第二台服务器。

 

4.5精确分页

由于hibernate shards不能到某个库或者其中的几个库中去查询,并且它的分页是先到所有的库中将所有符合条件的数据取回到内存中再进行分页,所以不可能使用它的分页。      

hibernate shards到各个库上查出符合条件的数目及数据库标识(标识为查询表中最小用户id),返回结果后对标识进行排序(这样确保同样的查询条件在翻页的时候能够以同样的顺序查询数据库,以达到精确查询的目的)。根据这个结果计算出每个数据库取值的段,然后用动态数据库连接按之前排好的顺序遍历数据库进行查找,段为0的直接跳过,找满结果则返回。

比如现在有3个库,要查询所在地为深圳的用户,通过hibernate shards查得数据如下:

 

深圳地区用户总数

深圳特区用户最小id

DB1

7

2

DB2

5

1

DB3

30

3

这时按用户最小id排序结果是DB2,DB1,DB3

假设每页10条记录,

第一页的数据是从DB2中取5条,DB1中取前5条,不需要到DB3去取

第二页的数据是从DB1中取后2条,在DB3中取前8条,不需要到DB1中去取

第三页数据是从DB3中取第9到第18条,不需要到DB1DB2中去取

… …

缺点:不能精确排序

 

 

5关键代码

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.annotation;   
  2.   
  3. import java.lang.annotation.Documented;   
  4. import java.lang.annotation.ElementType;   
  5. import java.lang.annotation.Retention;   
  6. import java.lang.annotation.RetentionPolicy;   
  7. import java.lang.annotation.Target;   
  8.   
  9. import org.springframework.beans.factory.annotation.Autowire;   
  10.   
  11. /**  
  12.  * 简单服务类实例化标注  
  13.  * @author Jwin  
  14.  *  
  15.  */  
  16. @Retention(RetentionPolicy.RUNTIME)   
  17. @Target( { ElementType.TYPE })   
  18. @Documented  
  19. public @interface DetachDbService   
  20. {   
  21.     boolean lazy() default false;   
  22.     Autowire autoWire() default Autowire.BY_NAME;   
  23.     String init() default "";   
  24.     String destroy() default "";   
  25. }  
package com.konceptusa.infinet.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.beans.factory.annotation.Autowire; /** * 简单服务类实例化标注 * @author Jwin * */ @Retention(RetentionPolicy.RUNTIME) @Target( { ElementType.TYPE }) @Documented public @interface DetachDbService { boolean lazy() default false; Autowire autoWire() default Autowire.BY_NAME; String init() default ""; String destroy() default ""; }

 

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.annotation.handler;   
  2.   
  3. import java.util.ArrayList;   
  4. import java.util.List;   
  5.   
  6. import org.apache.commons.logging.Log;   
  7. import org.apache.commons.logging.LogFactory;   
  8. import org.springframework.aop.framework.ProxyFactoryBean;   
  9. import org.springframework.beans.MutablePropertyValues;   
  10. import org.springframework.beans.factory.config.RuntimeBeanReference;   
  11. import org.springframework.beans.factory.support.RootBeanDefinition;   
  12.   
  13. import com.konceptusa.framework.annotation.IllegalConfigException;   
  14. import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;   
  15. import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;   
  16. import com.konceptusa.infinet.annotation.DetachDbService;   
  17.   
  18. /**  
  19.  * 向spring中注册简单服务代理实例,引用动态数据库连结的简单服务实例,引用hibernate shards的简单服务实例  
  20.  * @author Jwin  
  21.  *   
  22.  */  
  23. public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler   
  24. {   
  25.     private final static String SESSIONFACTORYNAME = "sessionFactory";   
  26.     public final static String DYNAMIC_POSTFIX = "Dynamic";   
  27.     public final static String SHARDS_POSTFIX = "Shards";   
  28.     private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";   
  29.   
  30.     private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);   
  31.   
  32.     public Class annotation()   
  33.     {   
  34.         return DetachDbService.class;   
  35.     }   
  36.   
  37.     @Override  
  38.     protected void handle(DetachDbService s, Class target)   
  39.     {   
  40.         String name = target.getSimpleName();   
  41.         if (!name.endsWith("ServiceImpl"))   
  42.         {   
  43.             throw new IllegalConfigException(target.getName()   
  44.                     + " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");   
  45.         }   
  46.         name = getBeanName(name);   
  47.         String dynamicName = name + DYNAMIC_POSTFIX;   
  48.         String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;   
  49.         //生成动态获取数据库连接的简单服务实例   
  50.         createBean(s, target, dynamicName, dynamicSessionFactory);                 
  51.         String shardsName = name + SHARDS_POSTFIX;   
  52.         String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;   
  53.         //生成查询所有数据库的简单服务实例   
  54.         createBean(s, target, shardsName, shardsFactory);   
  55.         //生成简单服务代理类   
  56.         RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);   
  57.         MutablePropertyValues mpv = new MutablePropertyValues();   
  58.         mpv.addPropertyValue("target"new RuntimeBeanReference(shardsName));   
  59.         List interceptorNamesList = new ArrayList();   
  60.         interceptorNamesList.add(DETACHDBINTERCEPTOR);   
  61.         mpv.addPropertyValue("interceptorNames", interceptorNamesList);   
  62.         definition.setPropertyValues(mpv);   
  63.         registerBeanDefinition(name, definition);          
  64.     }   
  65.   
  66.     private void createBean(DetachDbService s, Class target, String name, String sessionFactory)   
  67.     {   
  68.         RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);   
  69.         MutablePropertyValues mpv = new MutablePropertyValues();   
  70.         mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));   
  71.         beanDefinition.setPropertyValues(mpv);   
  72.         registerBeanDefinition(name, beanDefinition);   
  73.     }   
  74.   
  75.     private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)   
  76.     {   
  77.         RootBeanDefinition definition = new RootBeanDefinition();   
  78.         definition.setAbstract(false);   
  79.         definition.setBeanClass(target);   
  80.         definition.setSingleton(true);   
  81.         definition.setLazyInit(s.lazy());   
  82.         definition.setAutowireCandidate(true);   
  83.         definition.setAutowireMode(s.autoWire().value());   
  84.   
  85.         if (!"".equals(s.init()))   
  86.         {   
  87.             definition.setInitMethodName(s.init().trim());   
  88.         }   
  89.         if (!"".equals(s.destroy()))   
  90.         {   
  91.             definition.setDestroyMethodName(s.destroy().trim());   
  92.         }   
  93.   
  94.         if (LOG.isDebugEnabled())   
  95.         {   
  96.             LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");   
  97.         }   
  98.         SpringAnnotationUtils.readProperties(target, definition);   
  99.         return definition;   
  100.     }   
  101.   
  102.     private String getBeanName(String name)   
  103.     {   
  104.         name = name.substring(0, name.length() - "Impl".length());   
  105.         name = name.substring(01).toLowerCase() + name.substring(1, name.length());   
  106.         return name;   
  107.     }   
  108.   
  109. }  
package com.konceptusa.infinet.annotation.handler; import java.util.ArrayList; import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.aop.framework.ProxyFactoryBean; import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.RootBeanDefinition; import com.konceptusa.framework.annotation.IllegalConfigException; import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler; import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils; import com.konceptusa.infinet.annotation.DetachDbService; /** * 向spring中注册简单服务代理实例,引用动态数据库连结的简单服务实例,引用hibernate shards的简单服务实例 * @author Jwin * */ public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler { private final static String SESSIONFACTORYNAME = "sessionFactory"; public final static String DYNAMIC_POSTFIX = "Dynamic"; public final static String SHARDS_POSTFIX = "Shards"; private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor"; private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class); public Class annotation() { return DetachDbService.class; } @Override protected void handle(DetachDbService s, Class target) { String name = target.getSimpleName(); if (!name.endsWith("ServiceImpl")) { throw new IllegalConfigException(target.getName() + " is not a service bean.service bean 's class name must be end with 'ServiceImpl'"); } name = getBeanName(name); String dynamicName = name + DYNAMIC_POSTFIX; String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX; //生成动态获取数据库连接的简单服务实例 createBean(s, target, dynamicName, dynamicSessionFactory); String shardsName = name + SHARDS_POSTFIX; String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX; //生成查询所有数据库的简单服务实例 createBean(s, target, shardsName, shardsFactory); //生成简单服务代理类 RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name); MutablePropertyValues mpv = new MutablePropertyValues(); mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName)); List interceptorNamesList = new ArrayList(); interceptorNamesList.add(DETACHDBINTERCEPTOR); mpv.addPropertyValue("interceptorNames", interceptorNamesList); definition.setPropertyValues(mpv); registerBeanDefinition(name, definition); } private void createBean(DetachDbService s, Class target, String name, String sessionFactory) { RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name); MutablePropertyValues mpv = new MutablePropertyValues(); mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory)); beanDefinition.setPropertyValues(mpv); registerBeanDefinition(name, beanDefinition); } private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name) { RootBeanDefinition definition = new RootBeanDefinition(); definition.setAbstract(false); definition.setBeanClass(target); definition.setSingleton(true); definition.setLazyInit(s.lazy()); definition.setAutowireCandidate(true); definition.setAutowireMode(s.autoWire().value()); if (!"".equals(s.init())) { definition.setInitMethodName(s.init().trim()); } if (!"".equals(s.destroy())) { definition.setDestroyMethodName(s.destroy().trim()); } if (LOG.isDebugEnabled()) { LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]"); } SpringAnnotationUtils.readProperties(target, definition); return definition; } private String getBeanName(String name) { name = name.substring(0, name.length() - "Impl".length()); name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length()); return name; } }

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.detach.aop;   
  2.   
  3. import org.aopalliance.intercept.MethodInterceptor;   
  4. import org.aopalliance.intercept.MethodInvocation;   
  5. import org.apache.commons.logging.Log;   
  6. import org.apache.commons.logging.LogFactory;   
  7. import org.springframework.util.MethodInvoker;   
  8.   
  9. import com.konceptusa.framework.annotation.IllegalConfigException;   
  10. import com.konceptusa.framework.core.support.ObjectFactory;   
  11. import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;   
  12. import com.konceptusa.infinet.detach.UseridAble;   
  13. import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;   
  14. import com.konceptusa.infinet.detach.datasource.UseridContextHolder;   
  15.   
  16. /**  
  17.  * 分库简单服务代理  
  18.  * @author Jwin  
  19.  *  
  20.  */  
  21. public class DetachDBInterceptor implements MethodInterceptor   
  22. {   
  23.     private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);   
  24.     public Object invoke(MethodInvocation invoke) throws Throwable   
  25.     {   
  26.         int len = invoke.getArguments().length;   
  27.         Long id = null;   
  28.         if(len >= 1)   
  29.         {              
  30.             Object arg = invoke.getArguments()[0];   
  31.             if(arg instanceof UseridAble)   
  32.             {   
  33.                 UseridAble useridAble = (UseridAble) arg;   
  34.                 id = useridAble.getUserid();   
  35.             }   
  36.             else if(arg instanceof Long)   
  37.             {   
  38.                 id = (Long) arg;   
  39.             }   
  40.         }   
  41.         if(id != null)   
  42.         {   
  43.             UseridContextHolder.setUserid(id);   
  44.             try  
  45.             {              
  46.                 return invoke(invoke, id);   
  47.             }finally  
  48.             {   
  49.                 UseridContextHolder.removeUserid();   
  50.             }              
  51.         }   
  52.         else  
  53.         {   
  54.             return invoke(invoke, id);             
  55.         }   
  56.     }   
  57.     private Object invoke(MethodInvocation invoke, Long id) throws Throwable   
  58.     {   
  59.         String str = invoke.getThis().toString();   
  60.         int start = str.lastIndexOf(".");   
  61.         int end = str.lastIndexOf("@");   
  62.         String className = str.substring(start + 1, end);   
  63.         String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;   
  64.         if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)   
  65.         {   
  66.             postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;   
  67.         }   
  68.         String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;   
  69.         if(LOG.isDebugEnabled())   
  70.             LOG.debug("select service " + serviceName + " for userid = " + id);   
  71.         Object service = ObjectFactory.getManagedObject(serviceName);   
  72.         if(service == null)   
  73.         {   
  74.             throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");   
  75.         }   
  76.         MethodInvoker invoker = new MethodInvoker();   
  77.         invoker.setArguments(invoke.getArguments());   
  78.         invoker.setTargetObject(service);   
  79.         invoker.setTargetMethod(invoke.getMethod().getName());   
  80.         invoker.prepare();   
  81.         return invoker.invoke();   
  82.     }   
  83.   
  84. }  
package com.konceptusa.infinet.detach.aop; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.util.MethodInvoker; import com.konceptusa.framework.annotation.IllegalConfigException; import com.konceptusa.framework.core.support.ObjectFactory; import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler; import com.konceptusa.infinet.detach.UseridAble; import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder; import com.konceptusa.infinet.detach.datasource.UseridContextHolder; /** * 分库简单服务代理 * @author Jwin * */ public class DetachDBInterceptor implements MethodInterceptor { private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class); public Object invoke(MethodInvocation invoke) throws Throwable { int len = invoke.getArguments().length; Long id = null; if(len >= 1) { Object arg = invoke.getArguments()[0]; if(arg instanceof UseridAble) { UseridAble useridAble = (UseridAble) arg; id = useridAble.getUserid(); } else if(arg instanceof Long) { id = (Long) arg; } } if(id != null) { UseridContextHolder.setUserid(id); try { return invoke(invoke, id); }finally { UseridContextHolder.removeUserid(); } } else { return invoke(invoke, id); } } private Object invoke(MethodInvocation invoke, Long id) throws Throwable { String str = invoke.getThis().toString(); int start = str.lastIndexOf("."); int end = str.lastIndexOf("@"); String className = str.substring(start + 1, end); String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX; if(id == null && DataSourceIdContextHolder.getDataSourceId() == null) { postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX; } String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix; if(LOG.isDebugEnabled()) LOG.debug("select service " + serviceName + " for userid = " + id); Object service = ObjectFactory.getManagedObject(serviceName); if(service == null) { throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context"); } MethodInvoker invoker = new MethodInvoker(); invoker.setArguments(invoke.getArguments()); invoker.setTargetObject(service); invoker.setTargetMethod(invoke.getMethod().getName()); invoker.prepare(); return invoker.invoke(); } }

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.detach.datasource;   
  2.   
  3. import java.util.HashMap;   
  4. import java.util.List;   
  5. import java.util.Map;   
  6. import java.util.Properties;   
  7.   
  8. import javax.sql.DataSource;   
  9.   
  10. import org.apache.commons.lang.StringUtils;   
  11. import org.apache.commons.logging.Log;   
  12. import org.apache.commons.logging.LogFactory;   
  13. import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;   
  14. import org.springframework.util.Assert;   
  15.   
  16. import com.konceptusa.framework.annotation.IllegalConfigException;   
  17. import com.konceptusa.infinet.detach.config.MultiHibernateProperties;   
  18. import com.konceptusa.infinet.detach.service.ISelectDBService;   
  19.   
  20. /**  
  21.  * 动态获取数据库连接基类  
  22.  * @author Jwin  
  23.  *   
  24.  */  
  25. public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource   
  26. {   
  27.     private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);   
  28.     public final static int defaultDataSourceId = -1;   
  29.     protected MultiHibernateProperties multiHibernateProperties;   
  30.     protected ISelectDBService selectDBService;   
  31.     private String newWeights;   
  32.     private String oldWeights;   
  33.     private Map dataSourceMap = new HashMap();   
  34.     public void setSelectDBService(ISelectDBService selectDBService)   
  35.     {   
  36.         this.selectDBService = selectDBService;   
  37.     }   
  38.     public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)   
  39.     {   
  40.         this.multiHibernateProperties = multiHibernateProperties;   
  41.     }   
  42.     @Override  
  43.     protected Object determineCurrentLookupKey()   
  44.     {   
  45.         Long id = UseridContextHolder.getUserid();   
  46.         return selectDBService.selectDb(id);   
  47.     }   
  48.        
  49.     @Override  
  50.     public void afterPropertiesSet()   
  51.     {   
  52.         LOG.info("init dynamic datasource start");   
  53.         Assert.notNull(multiHibernateProperties);   
  54.         Assert.notNull(selectDBService);   
  55.         List properties = multiHibernateProperties.getShardProperties();   
  56.         Assert.notEmpty(properties);   
  57.         int dataSourceCount = 0;   
  58.         for(Properties p : properties)   
  59.         {   
  60.             dataSourceCount++;   
  61.             createDataSource(dataSourceMap, p);   
  62.         }   
  63.         createDefaultDataSource(dataSourceMap);   
  64.         selectDBService.setDefaultDataSourceId(defaultDataSourceId);   
  65.         selectDBService.setDataSourceCount(dataSourceCount);   
  66.         setTargetDataSources(dataSourceMap);   
  67.         setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));   
  68.         initWeight(dataSourceCount);   
  69.         super.afterPropertiesSet();   
  70.         LOG.info("init dynamic datasource success");   
  71.     }   
  72.     public void initWeight(int dataSourceCount)   
  73.     {   
  74.         Map oldWeightMap = new HashMap();   
  75.         Map newWeightMap = new HashMap();   
  76.         int totalOldWeight = 0;   
  77.         int totalNewWeight = 0;   
  78.         if(newWeights != null)   
  79.         {   
  80.             if(LOG.isInfoEnabled())   
  81.                 LOG.info("newWeights " + newWeights);   
  82.             String[] weights = StringUtils.split(newWeights,";");   
  83.             if(weights.length > dataSourceCount)   
  84.             {   
  85.                 throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");   
  86.             }   
  87.             for(int i=0;i
  88.             {   
  89.                 int w = Integer.parseInt(weights[i]);   
  90.                 for(int j=0;j
  91.                 {   
  92.                     newWeightMap.put(totalNewWeight + j, i);   
  93.                 }   
  94.                 totalNewWeight += w;   
  95.             }   
  96.         }   
  97.         else  
  98.         {   
  99.             totalNewWeight = dataSourceCount;   
  100.             for(int i=0;i
  101.             {   
  102.                 newWeightMap.put(i, i);   
  103.             }   
  104.         }   
  105.         if(oldWeights != null)   
  106.         {   
  107.             if(LOG.isInfoEnabled())   
  108.                 LOG.info("oldWeights " + oldWeights);   
  109.             String[] weights = StringUtils.split(oldWeights,";");   
  110.             if(weights.length > dataSourceCount)   
  111.             {   
  112.                 throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");   
  113.             }   
  114.             for(int i=0;i
  115.             {   
  116.                 int w = Integer.parseInt(weights[i]);   
  117.                 for(int j=0;j
  118.                 {   
  119.                     oldWeightMap.put(totalOldWeight + j, i);   
  120.                 }   
  121.                 totalOldWeight += w;   
  122.             }   
  123.         }   
  124.         else  
  125.         {   
  126.             totalOldWeight = dataSourceCount;   
  127.             for(int i=0;i
  128.             {   
  129.                 oldWeightMap.put(i, i);   
  130.             }   
  131.         }   
  132.         if(LOG.isInfoEnabled())   
  133.             LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);   
  134.         selectDBService.setTotalNewWeight(totalNewWeight);   
  135.         selectDBService.setNewWeightIdMap(newWeightMap);   
  136.         selectDBService.setTotalOldWeight(totalOldWeight);   
  137.         selectDBService.setOldWeightIdMap(oldWeightMap);   
  138.     }   
  139.     protected abstract void createDataSource(Map dataSourceMap, Properties p);   
  140.     protected abstract void createDefaultDataSource(Map dataSourceMap);   
  141.     public void setNewWeights(String newWeights)   
  142.     {   
  143.         this.newWeights = newWeights;   
  144.     }   
  145.     public void setOldWeights(String oldWeights)   
  146.     {   
  147.         this.oldWeights = oldWeights;   
  148.     }   
  149.     public Map getDataSourceMap()   
  150.     {   
  151.         return dataSourceMap;   
  152.     }   
  153.   
  154. }  
package com.konceptusa.infinet.detach.datasource; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import javax.sql.DataSource; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import org.springframework.util.Assert; import com.konceptusa.framework.annotation.IllegalConfigException; import com.konceptusa.infinet.detach.config.MultiHibernateProperties; import com.konceptusa.infinet.detach.service.ISelectDBService; /** * 动态获取数据库连接基类 * @author Jwin * */ public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource { private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class); public final static int defaultDataSourceId = -1; protected MultiHibernateProperties multiHibernateProperties; protected ISelectDBService selectDBService; private String newWeights; private String oldWeights; private Map dataSourceMap = new HashMap(); public void setSelectDBService(ISelectDBService selectDBService) { this.selectDBService = selectDBService; } public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties) { this.multiHibernateProperties = multiHibernateProperties; } @Override protected Object determineCurrentLookupKey() { Long id = UseridContextHolder.getUserid(); return selectDBService.selectDb(id); } @Override public void afterPropertiesSet() { LOG.info("init dynamic datasource start"); Assert.notNull(multiHibernateProperties); Assert.notNull(selectDBService); List properties = multiHibernateProperties.getShardProperties(); Assert.notEmpty(properties); int dataSourceCount = 0; for(Properties p : properties) { dataSourceCount++; createDataSource(dataSourceMap, p); } createDefaultDataSource(dataSourceMap); selectDBService.setDefaultDataSourceId(defaultDataSourceId); selectDBService.setDataSourceCount(dataSourceCount); setTargetDataSources(dataSourceMap); setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId)); initWeight(dataSourceCount); super.afterPropertiesSet(); LOG.info("init dynamic datasource success"); } public void initWeight(int dataSourceCount) { Map oldWeightMap = new HashMap(); Map newWeightMap = new HashMap(); int totalOldWeight = 0; int totalNewWeight = 0; if(newWeights != null) { if(LOG.isInfoEnabled()) LOG.info("newWeights " + newWeights); String[] weights = StringUtils.split(newWeights,";"); if(weights.length > dataSourceCount) { throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]"); } for(int i=0;i dataSourceCount) { throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]"); } for(int i=0;i dataSourceMap, Properties p); protected abstract void createDefaultDataSource(Map dataSourceMap); public void setNewWeights(String newWeights) { this.newWeights = newWeights; } public void setOldWeights(String oldWeights) { this.oldWeights = oldWeights; } public Map getDataSourceMap() { return dataSourceMap; } }

 

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.detach.datasource;   
  2.   
  3. import java.beans.PropertyVetoException;   
  4. import java.util.Map;   
  5. import java.util.Properties;   
  6.   
  7. import javax.sql.DataSource;   
  8.   
  9. import org.apache.commons.logging.Log;   
  10. import org.apache.commons.logging.LogFactory;   
  11.   
  12. import com.konceptusa.framework.annotation.IllegalConfigException;   
  13. import com.konceptusa.infinet.detach.config.MultiHibernateProperties;   
  14. import com.mchange.v2.c3p0.ComboPooledDataSource;   
  15.   
  16. /**  
  17.  * 基于c3p0连接池的动态获取连接类  
  18.  * @author Jwin  
  19.  *   
  20.  */  
  21. public class DynamicC3p0DataSource extends AbstractDynamicDataSource   
  22. {   
  23.     private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class);   
  24.     private int initialSize = 1;   
  25.     private int maxActive = 1;   
  26.     private int minActive = 1;   
  27.     private int maxIdleTime = 30;   
  28.     private String automaticTestTable = "Test";   
  29.     private int acquireIncrement = 3;   
  30.     private int maxStatements = 100;   
  31.     private int maxStatementsPerConnection = 3;   
  32.     private int numHelperThreads = 3;   
  33.     private int idleConnectionTestPeriod = 30;   
  34.     protected void createDefaultDataSource(Map dataSourceMap)   
  35.     {   
  36.         ComboPooledDataSource dataSource = new ComboPooledDataSource();   
  37.         dataSource.setUser("sa");   
  38.         dataSource.setPassword("");   
  39.         dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase());   
  40.         try  
  41.         {   
  42.             dataSource.setDriverClass("org.hsqldb.jdbcDriver");   
  43.         } catch (PropertyVetoException e)   
  44.         {   
  45.             throw new IllegalConfigException(e);   
  46.         }   
  47.         dataSource.setInitialPoolSize(initialSize);   
  48.         dataSource.setMaxPoolSize(maxActive);   
  49.         dataSource.setMinPoolSize(minActive);   
  50.         dataSource.setMaxIdleTime(maxIdleTime);   
  51.         dataSource.setAcquireIncrement(acquireIncrement);   
  52.         dataSource.setNumHelperThreads(numHelperThreads);   
  53.         dataSource.setAutomaticTestTable(automaticTestTable);   
  54.         dataSource.setMaxStatements(maxStatements);   
  55.         dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);   
  56.         dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);   
  57.         dataSourceMap.put(defaultDataSourceId, dataSource);   
  58.     }   
  59.     @Override  
  60.     protected void createDataSource(Map dataSourceMap, Properties p)   
  61.     {   
  62.         ComboPooledDataSource dataSource = new ComboPooledDataSource();   
  63.         dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey));   
  64.         LOG.info("init datasource url " + dataSource.getJdbcUrl());   
  65.         dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey));   
  66.         dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey));   
  67.         try  
  68.         {   
  69.             dataSource.setDriverClass(p.getProperty(MultiHibernateProperties.connectionDriverClassKey));   
  70.         } catch (PropertyVetoException e)   
  71.         {   
  72.             throw new IllegalConfigException(e);   
  73.         }   
  74.         dataSource.setInitialPoolSize(initialSize);   
  75.         dataSource.setMaxPoolSize(maxActive);   
  76.         dataSource.setMinPoolSize(minActive);   
  77.         dataSource.setMaxIdleTime(maxIdleTime);   
  78.         dataSource.setAcquireIncrement(acquireIncrement);   
  79.         dataSource.setNumHelperThreads(numHelperThreads);   
  80.         dataSource.setAutomaticTestTable(automaticTestTable);   
  81.         dataSource.setMaxStatements(maxStatements);   
  82.         dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);   
  83.         dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);   
  84.         String id = p.getProperty(MultiHibernateProperties.shardIdKey);   
  85.         dataSourceMap.put(Integer.parseInt(id), dataSource);   
  86.     }   
  87.     public void setInitialSize(int initialSize)   
  88.     {   
  89.         this.initialSize = initialSize;   
  90.     }   
  91.     public void setMaxActive(int maxActive)   
  92.     {   
  93.         this.maxActive = maxActive;   
  94.     }   
  95.     public void setMaxIdleTime(int maxIdle)   
  96.     {   
  97.         this.maxIdleTime = maxIdle;   
  98.     }      
  99.     public void setAcquireIncrement(int acquireIncrement)   
  100.     {   
  101.         this.acquireIncrement = acquireIncrement;   
  102.     }   
  103.     public void setMaxStatements(int maxStatements)   
  104.     {   
  105.         this.maxStatements = maxStatements;   
  106.     }   
  107.     public void setMaxStatementsPerConnection(int maxStatementsPerConnection)   
  108.     {   
  109.         this.maxStatementsPerConnection = maxStatementsPerConnection;   
  110.     }   
  111.     public void setNumHelperThreads(int numHelperThreads)   
  112.     {   
  113.         this.numHelperThreads = numHelperThreads;   
  114.     }   
  115.     public void setAutomaticTestTable(String automaticTestTable)   
  116.     {   
  117.         this.automaticTestTable = automaticTestTable;   
  118.     }   
  119.     public void setMinActive(int minActive)   
  120.     {   
  121.         this.minActive = minActive;   
  122.     }   
  123.     public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod)   
  124.     {   
  125.         this.idleConnectionTestPeriod = idleConnectionTestPeriod;   
  126.     }   
  127.        
  128.   
  129. }  
package com.konceptusa.infinet.detach.datasource; import java.beans.PropertyVetoException; import java.util.Map; import java.util.Properties; import javax.sql.DataSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.konceptusa.framework.annotation.IllegalConfigException; import com.konceptusa.infinet.detach.config.MultiHibernateProperties; import com.mchange.v2.c3p0.ComboPooledDataSource; /** * 基于c3p0连接池的动态获取连接类 * @author Jwin * */ public class DynamicC3p0DataSource extends AbstractDynamicDataSource { private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class); private int initialSize = 1; private int maxActive = 1; private int minActive = 1; private int maxIdleTime = 30; private String automaticTestTable = "Test"; private int acquireIncrement = 3; private int maxStatements = 100; private int maxStatementsPerConnection = 3; private int numHelperThreads = 3; private int idleConnectionTestPeriod = 30; protected void createDefaultDataSource(Map dataSourceMap) { ComboPooledDataSource dataSource = new ComboPooledDataSource(); dataSource.setUser("sa"); dataSource.setPassword(""); dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase()); try { dataSource.setDriverClass("org.hsqldb.jdbcDriver"); } catch (PropertyVetoException e) { throw new IllegalConfigException(e); } dataSource.setInitialPoolSize(initialSize); dataSource.setMaxPoolSize(maxActive); dataSource.setMinPoolSize(minActive); dataSource.setMaxIdleTime(maxIdleTime); dataSource.setAcquireIncrement(acquireIncrement); dataSource.setNumHelperThreads(numHelperThreads); dataSource.setAutomaticTestTable(automaticTestTable); dataSource.setMaxStatements(maxStatements); dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection); dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod); dataSourceMap.put(defaultDataSourceId, dataSource); } @Override protected void createDataSource(Map dataSourceMap, Properties p) { ComboPooledDataSource dataSource = new ComboPooledDataSource(); dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey)); LOG.info("init datasource url " + dataSource.getJdbcUrl()); dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey)); dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey)); try { dataSource.setDriverClass(p.getProperty(MultiHibernateProperties.connectionDriverClassKey)); } catch (PropertyVetoException e) { throw new IllegalConfigException(e); } dataSource.setInitialPoolSize(initialSize); dataSource.setMaxPoolSize(maxActive); dataSource.setMinPoolSize(minActive); dataSource.setMaxIdleTime(maxIdleTime); dataSource.setAcquireIncrement(acquireIncrement); dataSource.setNumHelperThreads(numHelperThreads); dataSource.setAutomaticTestTable(automaticTestTable); dataSource.setMaxStatements(maxStatements); dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection); dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod); String id = p.getProperty(MultiHibernateProperties.shardIdKey); dataSourceMap.put(Integer.parseInt(id), dataSource); } public void setInitialSize(int initialSize) { this.initialSize = initialSize; } public void setMaxActive(int maxActive) { this.maxActive = maxActive; } public void setMaxIdleTime(int maxIdle) { this.maxIdleTime = maxIdle; } public void setAcquireIncrement(int acquireIncrement) { this.acquireIncrement = acquireIncrement; } public void setMaxStatements(int maxStatements) { this.maxStatements = maxStatements; } public void setMaxStatementsPerConnection(int maxStatementsPerConnection) { this.maxStatementsPerConnection = maxStatementsPerConnection; } public void setNumHelperThreads(int numHelperThreads) { this.numHelperThreads = numHelperThreads; } public void setAutomaticTestTable(String automaticTestTable) { this.automaticTestTable = automaticTestTable; } public void setMinActive(int minActive) { this.minActive = minActive; } public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod) { this.idleConnectionTestPeriod = idleConnectionTestPeriod; } }

 

Java代码 复制代码 收藏代码
  1. package com.konceptusa.infinet.imsupport.detach;   
  2.   
  3. import java.util.ArrayList;   
  4. import java.util.Collection;   
  5. import java.util.Collections;   
  6. import java.util.List;   
  7.   
  8. import org.apache.commons.logging.Log;   
  9. import org.apache.commons.logging.LogFactory;   
  10. import org.springframework.transaction.annotation.Propagation;   
  11. import org.springframework.transaction.annotation.Transactional;   
  12.   
  13. import com.konceptusa.framework.annotation.IllegalConfigException;   
  14. import com.konceptusa.framework.core.dao.HibernateQueryListCallback;   
  15. import com.konceptusa.framework.core.dao.hql.Hql;   
  16. import com.konceptusa.framework.core.service.BaseServiceSupport;   
  17. import com.konceptusa.framework.core.service.Page;   
  18. import com.konceptusa.framework.core.support.ObjectFactory;   
  19. import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;   
  20. import com.konceptusa.infinet.detach.CountId;   
  21. import com.konceptusa.infinet.detach.CountIdComparetor;   
  22. import com.konceptusa.infinet.detach.MagrateAble;   
  23. import com.konceptusa.infinet.detach.QueryListAble;   
  24. import com.konceptusa.infinet.detach.datasource.UseridContextHolder;   
  25.   
  26. /**  
  27.  * 多个数据库综合查询,简单服务类父类  
  28.  * @author Jwin  
  29.  *  
  30.  * @param   
  31.  */  
  32. @Transactional(readOnly=true, rollbackFor = Exception.class)   
  33. public abstract class BaseServiceSupportForMulti extends BaseServiceSupport implements QueryListAble,MagrateAble   
  34. {   
  35.     private final static Log LOG = LogFactory.getLog(BaseServiceSupportForMulti.class);   
  36.     @Override  
  37.     protected int findCountByHql(Hql hql)   
  38.     {   
  39.         List countList = (List) getHibernateTemplate().execute(   
  40.                 new HibernateQueryListCallback(new Hql("select count(*) "  
  41.                         + hql.getHql(), hql.getCache(), hql.getParameters())));   
  42.         Long counts = 0L;   
  43.         for(Long count : countList)   
  44.         {   
  45.             counts += count;   
  46.         }   
  47.         return counts.intValue();   
  48.     }   
  49.     @Transactional(readOnly=true, rollbackFor = Exception.class,propagation=Propagation.NOT_SUPPORTED)   
  50.     public List queryList(Hql hql, int from, int offset)   
  51.     {   
  52.         return queryListByHql(hql, from, offset);   
  53.     }   
  54.   
  55.     public List queryCount(Hql hql)   
  56.     {   
  57.         List list = queryListByHql(hql);   
  58.         List countList = new ArrayList(list.size());    
  59.         for(Object[] l : list)   
  60.         {   
  61.             if(l[1] != null)   
  62.             {                  
  63.                 CountId count = new CountId((Long) l[1],(Long)l[0]);   
  64.                 countList.add(count);   
  65.             }   
  66.         }   
  67.         Collections.sort(countList, new CountIdComparetor());   
  68.         return countList;   
  69.     }   
  70.     protected String getBeanName(String name)   
  71.     {   
  72.         name = name.substring(0, name.length() - "Impl".length());   
  73.         name = name.substring(01).toLowerCase() + name.substring(1, name.length());   
  74.         return name;   
  75.     }   
  76.     protected Page queryPageByHql(Hql hql,String useridName, int start, int offset)   
  77.     {   
  78.         Hql countHql = new Hql("select count(*),min(" + useridName + ") "  
  79.                 + hql.getHql(), hql.getCache(), hql.getParameters());   
  80.         return queryPageByHql(countHql, hql, start, offset);   
  81.     }   
  82.     //先查出各个数据库的总数及标识,然后对标识进行排序,最后根据这个结果遍历数据库进行分页查找,找满结果则返回。   
  83.     private Page queryPageByHql(Hql countHql,Hql listHql,int start, int offset)   
  84.     {   
  85.         QueryListAble serviceShards = getShardsService();   
  86.         QueryListAble serviceDynamic = getDynamicService();   
  87.         List countList = serviceShards.queryCount(countHql);   
  88.         //相对于当前之前所有数据库的总数偏移   
  89.         int totalCount = 0;   
  90.         //相对于所有数据库的结束偏移   
  91.         int end = start + offset;   
  92.         //相对于当前数据库的开始偏移量   
  93.         int startRelative = -1;   
  94.         List queryList = new ArrayList(offset);   
  95.         for(CountId count : countList)   
  96.         {   
  97.             totalCount += count.getCount();   
  98.             //之前所有库总数小于开始偏移量,继续下一个数据库   
  99.             if(totalCount < start)   
  100.             {   
  101.                 continue;   
  102.             }   
  103.             //之前所有库总数第一次大于开始偏移量   
  104.             if(startRelative == -1)   
  105.             {                  
  106.                 startRelative = count.getCount().intValue() - (totalCount - start);   
  107.             }   
  108.             else  
  109.             {   
  110.                 startRelative = 0;   
  111.             }   
  112.             int relativeCount = totalCount - end;   
  113.             if(relativeCount >= 0)   
  114.             {   
  115.                 UseridContextHolder.setUserid(count.getId());   
  116.                 try  
  117.                 {   
  118.                     //计算相对于当前库的偏移   
  119.                     int offsetRelative = count.getCount().intValue() - relativeCount - startRelative;   
  120.                     LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());   
  121.                     queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));                    
  122.                 }finally  
  123.                 {   
  124.                     UseridContextHolder.removeUserid();   
  125.                 }   
  126.                 break;   
  127.             }   
  128.             UseridContextHolder.setUserid(count.getId());   
  129.             try  
  130.             {                  
  131.                 //计算相对于当前库的偏移   
  132.                 int offsetRelative = totalCount - startRelative;   
  133.                 LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());   
  134.                 queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));                    
  135.             } finally  
  136.             {   
  137.                 UseridContextHolder.removeUserid();   
  138.             }   
  139.         }   
  140.         totalCount = 0;   
  141.         for(CountId count : countList)   
  142.         {   
  143.             totalCount += count.getCount();   
  144.         }   
  145.         return new Page(totalCount, queryList);                   
  146.     }   
  147.     protected Page queryPageByHql(String hqlstr,String useridName, int start, int offset,Object ... values)   
  148.     {   
  149.         Hql listHql = Hql.createIndexHql(  
阅读(4091) | 评论(0) | 转发(1) |
给主人留下些什么吧!~~