Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2538807
  • 博文数量: 308
  • 博客积分: 5547
  • 博客等级: 大校
  • 技术积分: 3782
  • 用 户 组: 普通用户
  • 注册时间: 2009-11-24 09:47
个人简介

hello world.

文章分类

全部博文(308)

分类: Java

2013-04-19 16:11:59

    如果你搜索,生产者,消费者模式。你会看到很多的原理,例子等。而这些例子,大部分都有一个共同特征,那就是,这个程序是不停的在进行运行,没有停止下来。当然了,这方便教学。让人能很方便,快捷的明白生产者,消费者程序模式的原理。但在具体的工作中,似乎不太常用。
因为一般在工作中,生产者的数量是固定的,程序总要运行完,总要停止。我在工作中,就遇到这样的情况。
    因为工作需要,我需要给另外一个系统提供全量数据,经过初期的数据整理工作,全量数据达到400多万的数据量,但还有一些数据项,在另外的系统中,我只能通过WebService调用其他系统,以获取我需要的数据,才能最终完成数据工作。这也就意味着,每一条数据我都要进行一次请求另外的系统。因为这400多万的数据量。你可以用简单的方法,如select * from tbA,可400多万的数据量(也许你处理的数据量,比我的还要大),这样搞,很可能内存会溢出。这时候,大家一般都会想到使用分块处理,比如说一次读2万条,处理完后,然后再继续读取,处理,直到数据处理完。这个时候,在这里我采用一个生产者,多个消费者的方式。
    选定这种处理方式后,肯定需要对已经处理过的数据,做一个标识。这样就可以区分,已经完成的,未完成的。因此我在原来的表结构上追加一个字段 deal_tag。默认值是0。
    处理的方式如下:
    生产者读取指定的数据记录数(假设为2万)。
    1.更新rownum<=20000的数据,将其标识位更新为'B'。此部操作的目的是,选取要处理的数据。
    2.查询标志位为'B'的数据,将其读到缓冲区中,更新原有的'B'标志位'BB'标识此数据已经进入缓冲区。
    3.此时消费者开始对进入缓冲区的数据,进行处理。当缓冲区的数据处理完后,对通知生产者继续读数据,如果要进行处理的数据读完后,生产者,更新一个生产者完成的标志。此时生产者线程正常结束。消费者结束的标志为:生产完成的标志为true,并且缓冲区中也没有数据,消费者也就结束。当消费者线程都完成后,则程序也就结束了。
     是不是,说的你也迷糊了。那先把代码放上来(资源管理):

点击(此处)折叠或打开

  1. package com.epro.client;


  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.log4j.Logger;
  5. import com.epro.entity.FaultInfo;


  6. /**
  7.  * objcect: TradeManage
  8.  * author 程晓鹏
  9.  * description: 工单管理类
  10.  * date Mar 19, 2013 3:56:33 PM
  11.  */
  12. public class TradeManage {
  13.     
  14.     private List<FaultInfo> tradeList; //工单数据列表
  15.     private static Logger log = Logger.getLogger(TradeManage.class);
  16.     public boolean readOK;
  17.     
  18.     /**
  19.      * 默认构造函数
  20.      */
  21.     public TradeManage(){
  22.         tradeList = new ArrayList<FaultInfo>();
  23.         readOK = false;
  24.     }
  25.     
  26.     
  27.     /**
  28.      * 进行资源生产
  29.      * @param listProducer 生产的资源集合
  30.      */
  31.     public synchronized void increace(List<FaultInfo> listProducer){
  32.         while(tradeList.size() != 0){
  33.             try{
  34.                 wait();
  35.             }catch (InterruptedException e) {
  36.                 e.printStackTrace();
  37.                 log.error("increace error: " + e.getMessage());
  38.             }
  39.         }
  40.         if(listProducer.size() > 0){
  41.             this.tradeList.addAll(listProducer);            
  42.         }else{
  43.             readOK = true; //没有往缓冲区中放数据,说明读取操作完成
  44.         }
  45.         
  46.         this.notifyAll();
  47.     }
  48.     
  49.     /**
  50.      * 消费者进行资源消费
  51.      * @return FaultInfo对象
  52.      */
  53.     public synchronized FaultInfo decreace(){
  54.         FaultInfo result = null;
  55.         while(tradeList.size() == 0){
  56.             if(!readOK){
  57.                 try{
  58.      wait();
  59.                 }
  60.                 catch (InterruptedException e) {
  61.      e.printStackTrace();
  62.      log.error("decreace error: " + e.getMessage());
  63.      }
  64.             }else{
  65.                 break;
  66.             }
  67.         }
  68.         
  69.         if(tradeList.size() > 0){
  70.             result = tradeList.remove(0);
  71.         }
  72.         this.notifyAll();
  73.         return result;
  74.     }
  75. }
生产者的代码,生产者,主要进行数据的读完,并放入到缓冲区中:

点击(此处)折叠或打开

  1. package com.epro.client;

  2. import java.sql.ResultSet;
  3. import java.sql.SQLException;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import java.util.regex.Matcher;
  7. import java.util.regex.Pattern;

  8. import org.apache.log4j.Logger;

  9. import com.epro.DBUtility.DBHelper;
  10. import com.epro.entity.FaultInfo;

  11. /**
  12.  * objcect: ReadThread
  13.  * author 程晓鹏
  14.  * description: 进行数据读取的线程
  15.  * date Mar 19, 2013 4:20:45 PM
  16.  */
  17. public class ReadThread implements Runnable{
  18.     
  19.     private TradeManage manage;
  20.     public static String masterTableName = "ti_all_112_disk2";
  21.     private int readSize;
  22.     private DBHelper db;
  23.     private int totalReadSize = 0;
  24.     private static Logger log = Logger.getLogger(ReadThread.class);
  25.     /**
  26.      * 默认构造函数
  27.      */
  28.     public ReadThread(TradeManage trade){
  29.         manage = trade;
  30.         db = new DBHelper();
  31.         readSize = 0;
  32.         totalReadSize = 0;
  33.     }
  34.     
  35.     public void run(){
  36.         do{
  37.             manage.increace(this.readData(50000));            
  38.         }while(readSize > 0);
  39.     }

  40.     /**
  41.      * 进行数据读取
  42.      * @param readCount 读取的数据量
  43.      * @return 返回读取的数据数
  44.      */
  45.     public List<FaultInfo> readData(int readCount){
  46.         List<FaultInfo> result = new ArrayList<FaultInfo>();
  47.         String strSql = "";
  48.         this.dealOK();    //对上一批处理过的数据,进行提交            
  49.         strSql = getInitDataSql(readCount); //将要进行数据的处理的标志位更改为'B'
  50.         db.executeUpdate(strSql); //执行操作。
  51.         strSql = this.getReadSql(); //读取标志位为 'B'的数据
  52.         ResultSet rs = null;            
  53.         try {
  54.             rs = db.executeQuery(strSql);
  55.             while(rs.next()){
  56.                 FaultInfo item = new FaultInfo();
  57.                 item.CityCode = getString(rs.getString("eparchy_code"));
  58.                 item.Num = getString(rs.getString("serial_number"));
  59.                 item.ServiceID = getString(rs.getString("service_id"));
  60.                 item.InstallAddress = getString(rs.getString("detail_install_address"));
  61.                 item.bizCode = getString(rs.getString("product_class"));
  62.                 item.TownFlag = getString(rs.getString("town_flag"));
  63.                 item.Accounts = getString(rs.getString("product_id"));
  64.                 item.LineBaud = getString(rs.getString("rate"));
  65.                 item.Inmethod = getString(rs.getString("in_net_mode"));
  66.                 item.ContactPersonName = getString(rs.getString("link_name"));
  67.                 item.ContactPersonunit = getString(rs.getString("work_name"));
  68.                 item.ContactPersonPhone = getString(rs.getString("link_phone"));
  69.                 item.AcceptPhoneNum = getString(rs.getString("handset_num"));
  70.                 item.IdCardType = getString(rs.getString("pspt_type_code"));
  71.                 item.OperaterType = getString(rs.getString("operate_type"));
  72.                 item.IdCardNum = getString(rs.getString("pspt_id"));
  73.                 item.InTime = getString(rs.getString("in_time"));
  74.                 item.CustTypeCode = getString(rs.getString("cust_type_code"));
  75.                 item.CustTypeName = getString(rs.getString("cust_type_name"));
  76.                 item.UserID = getString(rs.getString("user_id"));
  77.                 item.NasCode = getString(rs.getString("nas_code"));
  78.                 item.NasName = getString(rs.getString("nas_name"));
  79.                 item.ServiceName = getString(rs.getString("service_name"));
  80.                 item.OfficeId = getString(rs.getString("moffice_id")); //局向编码
  81.                 item.OfficeName = getString(rs.getString("moffice_name")); //局向名称
  82.                 item.CustID = getString(rs.getString("cust_id"));
  83.                 item.CustName = getString(rs.getString("cust_name"));
  84.                 result.add(item); //放入集合中    
  85.             }
  86.             rs.close();
  87.         } catch (SQLException e) {
  88.             e.printStackTrace();
  89.             log.error(e.getMessage());
  90.         }finally{
  91.             rs = null;
  92.             db.Close();
  93.         }
  94.         readSize = result.size(); //获取读取的数量
  95.         if(readSize > 0){
  96.             totalReadSize += readSize;
  97.             log.info("read size: " + readSize + " total read size: " + totalReadSize);
  98.             strSql = this.getReadOKSql();
  99.             db.executeUpdate(strSql); //执行操作。    
  100.             log.info("read ok.");
  101.         }
  102.         return result;
  103.         
  104.     }    

  105.     /**
  106.      * 当数据处理完成后,进行标识为更改
  107.      */
  108.     public void dealOK(){
  109.         String strSql = "";
  110.         if(readSize > 0){
  111.             strSql = getDealOKSql(); //提交上一批数据,更改标志位为'OK'
  112.             db.executeUpdate(strSql); //执行操作。
  113.             log.info("commit size is: " + readSize);
  114.         }
  115.     }
  116.     
  117.     /**
  118.      * 对要入缓冲区的数据进行标志位更改
  119.      * @param readCount 读取的数据条数
  120.      * @return 对要进入缓冲区的数据进行标志位更改
  121.      */
  122.     private String getInitDataSql(int readCount){
  123.         StringBuilder result = new StringBuilder();
  124.         result.append("update " + masterTableName +" set deal_tag = 'B' where rownum <= ");
  125.         result.append(readCount);
  126.         result.append(" and deal_tag = '0'");
  127.         
  128.         return result.toString();
  129.     }
  130.     
  131.     private String getDealOKSql(){
  132.          return "update " + masterTableName +" set deal_tag = 'OK' where deal_tag = 'BB'";
  133.     }
  134.     
  135.     /**
  136.      * 读取完成的SQL语句
  137.      * @return SQL语句
  138.      */
  139.     private String getReadOKSql(){
  140.         return "update " + masterTableName +" set deal_tag = 'BB' where deal_tag = 'B'";
  141.     }
  142.     
  143.     /**
  144.      * 读取数据的SQL
  145.      * @return SQL字符串
  146.      */
  147.     private String getReadSql(){
  148.         StringBuilder result = new StringBuilder();
  149.         result.append("select tb1.eparchy_code, tb1.serial_number, tb1.service_id, ");
  150.         result.append(" tb1.detail_install_address, tb1.product_class, tb1.town_flag,");
  151.         result.append(" tb1.product_id, tb1.rate, tb1.in_net_mode,");
  152.         result.append(" nvl(tb1.link_name, '') link_name, tb1.work_name, ");
  153.         result.append(" nvl(tb1.link_phone, '') link_phone, tb1.handset_num, tb1.pspt_type_code, ");
  154.         result.append(" tb1.operate_type, tb1.pspt_id, ");
  155.         result.append(" to_char(in_time, 'yyyy-MM-dd hh24:mi:ss') in_time,");
  156.         result.append(" tb1.cust_type_code, ");
  157.         result.append(" decode(tb1.cust_type_name, ");
  158.         result.append(" null,decode(tb1.cust_type_code, ");
  159.         result.append(" '0', '个人客户',");
  160.         result.append(" '1', '集团客户', ''), ");
  161.         result.append(" tb1.cust_type_name) cust_type_name, tb1.user_id, ");
  162.         result.append(" tb1.nas_code, tb1.nas_name, tb1.service_name, ");
  163.         result.append(" tb1.moffice_id, tb1.moffice_name, "); //追加局向编码,局向名称
  164.         result.append(" tb1.cust_id, tb1.cust_name ");
  165.         result.append("from ");
  166.         result.append(masterTableName);
  167.         result.append(" tb1 ");
  168.         result.append("where tb1.deal_tag = 'B' ");
  169.         
  170.         return result.toString();
  171.     }
  172.     
  173.     //对null值进行屏蔽,返回空字符串
  174.     private String getString(String strInfo){
  175.         String result = "";
  176.         if( null != strInfo && !"".equals(strInfo)){
  177.             Pattern p = Pattern.compile("\t|\r|\n");
  178.             Matcher m = p.matcher(strInfo);
  179.             result = m.replaceAll(""); //将制表符,回车,换行符过滤掉
  180.         }
  181.         
  182.         return result;
  183.     }    
  184. }
消费者的代码,主要对进行缓冲区的数据,进行处理工作:

点击(此处)折叠或打开

  1. package com.epro.client;

  2. import java.io.File;
  3. import java.util.regex.Matcher;
  4. import java.util.regex.Pattern;

  5. import org.apache.log4j.Logger;
  6. import com.epro.Res.ResFacade;
  7. import com.epro.Res.ResInfo;
  8. import com.epro.entity.FaultInfo;

  9. /**
  10.  * 处理数据类
  11.  * @author 程晓鹏
  12.  *
  13.  */
  14. public class DealData implements Runnable {
  15.     
  16.     private GenerateFile genFile;
  17.     private ResFacade res;
  18.     private static Logger log = Logger.getLogger(DealData.class);
  19.     private TradeManage mytrade;
  20.     /**
  21.      * 默认构造函数
  22.      */
  23.     public DealData(TradeManage trade){
  24.         mytrade = trade;
  25.         res = new ResFacade();
  26.         genFile = new GenerateFile("result");    
  27.     }
  28.     
  29.     public void run() {    
  30.         this.beginDealData();        
  31.     }
  32.     
  33.     private void beginDealData(){
  34.         FaultInfo trade = null; //工单ID
  35.         do{
  36.             trade = mytrade.decreace();
  37.             if(trade != null){ //当没有数据时,跳出循环
  38.                 this.dealTrade(trade);
  39.             }
  40.         }while(trade != null); //当消费的资源为NULL时,则说明工作已经完成,可以跳出循环,结束线程
  41.     }
  42.     
  43.     private String getInstallAddress(String serviceName, String nasCode, String nasName){
  44.         StringBuilder result = new StringBuilder();
  45.         result.append(serviceName);
  46.         result.append(";");
  47.         result.append(nasCode);
  48.         result.append(";");
  49.         result.append(nasName);
  50.         
  51.         return result.toString();
  52.     }    
  53.     
  54.     //获取Num2的数据
  55.     private String getNum2(String cityCode, String num1)
  56.     {
  57.         String result = "";
  58.         if(num1.length() > 0 && num1.length() <=8){
  59.             result = cityCode + num1;
  60.         }
  61.         
  62.         return result;        
  63.     }
  64.     
  65.     private void dealTrade(FaultInfo trade){
  66.         String strTag = ""; //处理状态标志位
  67.         if("0".equals(trade.bizCode) || "1".equals(trade.bizCode)){
  68.             ResInfo myRes = res.getResInfo(trade.UserID);
  69.             if("0".equals(myRes.Result)){
  70.                 trade.Num1 = getString(myRes.RelaProductNo);
  71.                 trade.OfficeId = getString(myRes.OfficeCode);
  72.                 trade.OfficeName = getString(myRes.OfficeName);
  73.                 trade.DeviceCode = getString(myRes.LnFacCode);
  74.                 trade.LineType = getString(myRes.LineType);
  75.                 trade.CommitmentFlag = getString(myRes.CityFlag);
  76.             }else{
  77.                 trade.Num1 = "";
  78.                 trade.DeviceCode = "";
  79.                 trade.LineType = "";
  80.                 trade.CommitmentFlag = "";
  81.             }
  82.             trade.Num2 = getNum2(trade.CityCode, trade.Num1);        
  83.             strTag = myRes.Result;
  84.         }else if("2".equals(trade.bizCode)){
  85.             trade.Num1 = trade.Num;
  86.             trade.Num2 = trade.Num;
  87.             trade.InstallAddress = getInstallAddress(trade.ServiceName,trade.NasCode, trade.NasName);
  88.             trade.OfficeId = getString(trade.NasCode);
  89.             trade.OfficeName = getString(trade.NasName);
  90.             trade.LineType = "";
  91.             trade.DeviceCode = "";
  92.             trade.CommitmentFlag = "";
  93.             strTag = "0";
  94.         }        

  95.         if(!"0".equals(strTag)){
  96.             log.info("user_id: " + trade.UserID + " error_id: " +strTag);
  97.         }
  98.         
  99.         genFile.append(trade);    //追加数据
  100.     }
  101.     
  102.     
  103.     /**
  104.      * 获取生成的文件
  105.      * @return File对象
  106.      */
  107.     public File getGenerateFile(){
  108.         return genFile.getFile();
  109.     }
  110.     
  111.     //对null值进行屏蔽,返回空字符串
  112.     private String getString(String strInfo){
  113.         String result = "";
  114.         if( null != strInfo && !"".equals(strInfo)){
  115.             Pattern p = Pattern.compile("\t|\r|\n");
  116.             Matcher m = p.matcher(strInfo);
  117.             result = m.replaceAll(""); //将制表符,回车,换行符过滤掉
  118.         }
  119.         
  120.         return result;
  121.     }
  122. }
初始化,生产者,消费者的代码,就比较的简单了,我采用一个生产者,多个消费者的模式(因为读一次数据,就够消费者处理一阵子了)

点击(此处)折叠或打开

  1. /**
  2.      * 进行数据处理
  3.      */
  4.     private void dealData(){
  5.         TradeManage trade = new TradeManage();
  6.         
  7.         ReadThread read = new ReadThread(trade);
  8.         Thread myReadThread = new Thread(read);
  9.      log.info("master table: " + ReadThread.masterTableName );
  10.         myReadThread.start(); //启动读取数据库线程
  11.         
  12.         DealData mydeal = new DealData(trade);
  13.         List<Thread> listThread = new ArrayList<Thread>();
  14.         for(int i=0; i<100; i++){            
  15.             Thread myThread = new Thread(mydeal);
  16.             myThread.start(); //启动线程
  17.             listThread.add(myThread);
  18.         }
  19.         
  20.         while(listThread.size() > 0){
  21.             Thread mythread = listThread.get(0);
  22.             if("TERMINATED".equals(mythread.getState().toString())){
  23.                 listThread.remove(mythread);
  24.             }
  25.         }
  26.         read.dealOK(); //提交缓冲区数据,进行标志位更改
  27.         File genFile = mydeal.getGenerateFile(); //获取生成的文件
  28.         this.upFtpFile(genFile); //进行文件生成
  29.     }



阅读(8157) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

brokephp2013-11-20 15:10:51

正在寻找这方面的东西。。。谢啦