如果你搜索,生产者,消费者模式。你会看到很多的原理,例子等。而这些例子,大部分都有一个共同特征,那就是,这个程序是不停的在进行运行,没有停止下来。当然了,这方便教学。让人能很方便,快捷的明白生产者,消费者程序模式的原理。但在具体的工作中,似乎不太常用。
因为一般在工作中,生产者的数量是固定的,程序总要运行完,总要停止。我在工作中,就遇到这样的情况。
因为工作需要,我需要给另外一个系统提供全量数据,经过初期的数据整理工作,全量数据达到400多万的数据量,但还有一些数据项,在另外的系统中,我只能通过WebService调用其他系统,以获取我需要的数据,才能最终完成数据工作。这也就意味着,每一条数据我都要进行一次请求另外的系统。因为这400多万的数据量。你可以用简单的方法,如select * from tbA,可400多万的数据量(也许你处理的数据量,比我的还要大),这样搞,很可能内存会溢出。这时候,大家一般都会想到使用分块处理,比如说一次读2万条,处理完后,然后再继续读取,处理,直到数据处理完。这个时候,在这里我采用一个生产者,多个消费者的方式。
选定这种处理方式后,肯定需要对已经处理过的数据,做一个标识。这样就可以区分,已经完成的,未完成的。因此我在原来的表结构上追加一个字段 deal_tag。默认值是0。
处理的方式如下:
生产者读取指定的数据记录数(假设为2万)。
1.更新rownum<=20000的数据,将其标识位更新为'B'。此部操作的目的是,选取要处理的数据。
2.查询标志位为'B'的数据,将其读到缓冲区中,更新原有的'B'标志位'BB'标识此数据已经进入缓冲区。
3.此时消费者开始对进入缓冲区的数据,进行处理。当缓冲区的数据处理完后,对通知生产者继续读数据,如果要进行处理的数据读完后,生产者,更新一个生产者完成的标志。此时生产者线程正常结束。消费者结束的标志为:生产完成的标志为true,并且缓冲区中也没有数据,消费者也就结束。当消费者线程都完成后,则程序也就结束了。
是不是,说的你也迷糊了。那先把代码放上来(资源管理):
-
package com.epro.client;
-
-
-
import java.util.ArrayList;
-
import java.util.List;
-
import org.apache.log4j.Logger;
-
import com.epro.entity.FaultInfo;
-
-
-
/**
-
* objcect: TradeManage
-
* author 程晓鹏
-
* description: 工单管理类
-
* date Mar 19, 2013 3:56:33 PM
-
*/
-
public class TradeManage {
-
-
private List<FaultInfo> tradeList; //工单数据列表
-
private static Logger log = Logger.getLogger(TradeManage.class);
-
public boolean readOK;
-
-
/**
-
* 默认构造函数
-
*/
-
public TradeManage(){
-
tradeList = new ArrayList<FaultInfo>();
-
readOK = false;
-
}
-
-
-
/**
-
* 进行资源生产
-
* @param listProducer 生产的资源集合
-
*/
-
public synchronized void increace(List<FaultInfo> listProducer){
-
while(tradeList.size() != 0){
-
try{
-
wait();
-
}catch (InterruptedException e) {
-
e.printStackTrace();
-
log.error("increace error: " + e.getMessage());
-
}
-
}
-
if(listProducer.size() > 0){
-
this.tradeList.addAll(listProducer);
-
}else{
-
readOK = true; //没有往缓冲区中放数据,说明读取操作完成
-
}
-
-
this.notifyAll();
-
}
-
-
/**
-
* 消费者进行资源消费
-
* @return FaultInfo对象
-
*/
-
public synchronized FaultInfo decreace(){
-
FaultInfo result = null;
-
while(tradeList.size() == 0){
-
if(!readOK){
-
try{
-
wait();
-
}
-
catch (InterruptedException e) {
-
e.printStackTrace();
-
log.error("decreace error: " + e.getMessage());
-
}
-
}else{
-
break;
-
}
-
}
-
-
if(tradeList.size() > 0){
-
result = tradeList.remove(0);
-
}
-
this.notifyAll();
-
return result;
-
}
-
}
生产者的代码,生产者,主要进行数据的读完,并放入到缓冲区中:
-
package com.epro.client;
-
-
import java.sql.ResultSet;
-
import java.sql.SQLException;
-
import java.util.ArrayList;
-
import java.util.List;
-
import java.util.regex.Matcher;
-
import java.util.regex.Pattern;
-
-
import org.apache.log4j.Logger;
-
-
import com.epro.DBUtility.DBHelper;
-
import com.epro.entity.FaultInfo;
-
-
/**
-
* objcect: ReadThread
-
* author 程晓鹏
-
* description: 进行数据读取的线程
-
* date Mar 19, 2013 4:20:45 PM
-
*/
-
public class ReadThread implements Runnable{
-
-
private TradeManage manage;
-
public static String masterTableName = "ti_all_112_disk2";
-
private int readSize;
-
private DBHelper db;
-
private int totalReadSize = 0;
-
private static Logger log = Logger.getLogger(ReadThread.class);
-
/**
-
* 默认构造函数
-
*/
-
public ReadThread(TradeManage trade){
-
manage = trade;
-
db = new DBHelper();
-
readSize = 0;
-
totalReadSize = 0;
-
}
-
-
public void run(){
-
do{
-
manage.increace(this.readData(50000));
-
}while(readSize > 0);
-
}
-
-
/**
-
* 进行数据读取
-
* @param readCount 读取的数据量
-
* @return 返回读取的数据数
-
*/
-
public List<FaultInfo> readData(int readCount){
-
List<FaultInfo> result = new ArrayList<FaultInfo>();
-
String strSql = "";
-
this.dealOK(); //对上一批处理过的数据,进行提交
-
strSql = getInitDataSql(readCount); //将要进行数据的处理的标志位更改为'B'
-
db.executeUpdate(strSql); //执行操作。
-
strSql = this.getReadSql(); //读取标志位为 'B'的数据
-
ResultSet rs = null;
-
try {
-
rs = db.executeQuery(strSql);
-
while(rs.next()){
-
FaultInfo item = new FaultInfo();
-
item.CityCode = getString(rs.getString("eparchy_code"));
-
item.Num = getString(rs.getString("serial_number"));
-
item.ServiceID = getString(rs.getString("service_id"));
-
item.InstallAddress = getString(rs.getString("detail_install_address"));
-
item.bizCode = getString(rs.getString("product_class"));
-
item.TownFlag = getString(rs.getString("town_flag"));
-
item.Accounts = getString(rs.getString("product_id"));
-
item.LineBaud = getString(rs.getString("rate"));
-
item.Inmethod = getString(rs.getString("in_net_mode"));
-
item.ContactPersonName = getString(rs.getString("link_name"));
-
item.ContactPersonunit = getString(rs.getString("work_name"));
-
item.ContactPersonPhone = getString(rs.getString("link_phone"));
-
item.AcceptPhoneNum = getString(rs.getString("handset_num"));
-
item.IdCardType = getString(rs.getString("pspt_type_code"));
-
item.OperaterType = getString(rs.getString("operate_type"));
-
item.IdCardNum = getString(rs.getString("pspt_id"));
-
item.InTime = getString(rs.getString("in_time"));
-
item.CustTypeCode = getString(rs.getString("cust_type_code"));
-
item.CustTypeName = getString(rs.getString("cust_type_name"));
-
item.UserID = getString(rs.getString("user_id"));
-
item.NasCode = getString(rs.getString("nas_code"));
-
item.NasName = getString(rs.getString("nas_name"));
-
item.ServiceName = getString(rs.getString("service_name"));
-
item.OfficeId = getString(rs.getString("moffice_id")); //局向编码
-
item.OfficeName = getString(rs.getString("moffice_name")); //局向名称
-
item.CustID = getString(rs.getString("cust_id"));
-
item.CustName = getString(rs.getString("cust_name"));
-
result.add(item); //放入集合中
-
}
-
rs.close();
-
} catch (SQLException e) {
-
e.printStackTrace();
-
log.error(e.getMessage());
-
}finally{
-
rs = null;
-
db.Close();
-
}
-
readSize = result.size(); //获取读取的数量
-
if(readSize > 0){
-
totalReadSize += readSize;
-
log.info("read size: " + readSize + " total read size: " + totalReadSize);
-
strSql = this.getReadOKSql();
-
db.executeUpdate(strSql); //执行操作。
-
log.info("read ok.");
-
}
-
return result;
-
-
}
-
-
/**
-
* 当数据处理完成后,进行标识为更改
-
*/
-
public void dealOK(){
-
String strSql = "";
-
if(readSize > 0){
-
strSql = getDealOKSql(); //提交上一批数据,更改标志位为'OK'
-
db.executeUpdate(strSql); //执行操作。
-
log.info("commit size is: " + readSize);
-
}
-
}
-
-
/**
-
* 对要入缓冲区的数据进行标志位更改
-
* @param readCount 读取的数据条数
-
* @return 对要进入缓冲区的数据进行标志位更改
-
*/
-
private String getInitDataSql(int readCount){
-
StringBuilder result = new StringBuilder();
-
result.append("update " + masterTableName +" set deal_tag = 'B' where rownum <= ");
-
result.append(readCount);
-
result.append(" and deal_tag = '0'");
-
-
return result.toString();
-
}
-
-
private String getDealOKSql(){
-
return "update " + masterTableName +" set deal_tag = 'OK' where deal_tag = 'BB'";
-
}
-
-
/**
-
* 读取完成的SQL语句
-
* @return SQL语句
-
*/
-
private String getReadOKSql(){
-
return "update " + masterTableName +" set deal_tag = 'BB' where deal_tag = 'B'";
-
}
-
-
/**
-
* 读取数据的SQL
-
* @return SQL字符串
-
*/
-
private String getReadSql(){
-
StringBuilder result = new StringBuilder();
-
result.append("select tb1.eparchy_code, tb1.serial_number, tb1.service_id, ");
-
result.append(" tb1.detail_install_address, tb1.product_class, tb1.town_flag,");
-
result.append(" tb1.product_id, tb1.rate, tb1.in_net_mode,");
-
result.append(" nvl(tb1.link_name, '') link_name, tb1.work_name, ");
-
result.append(" nvl(tb1.link_phone, '') link_phone, tb1.handset_num, tb1.pspt_type_code, ");
-
result.append(" tb1.operate_type, tb1.pspt_id, ");
-
result.append(" to_char(in_time, 'yyyy-MM-dd hh24:mi:ss') in_time,");
-
result.append(" tb1.cust_type_code, ");
-
result.append(" decode(tb1.cust_type_name, ");
-
result.append(" null,decode(tb1.cust_type_code, ");
-
result.append(" '0', '个人客户',");
-
result.append(" '1', '集团客户', ''), ");
-
result.append(" tb1.cust_type_name) cust_type_name, tb1.user_id, ");
-
result.append(" tb1.nas_code, tb1.nas_name, tb1.service_name, ");
-
result.append(" tb1.moffice_id, tb1.moffice_name, "); //追加局向编码,局向名称
-
result.append(" tb1.cust_id, tb1.cust_name ");
-
result.append("from ");
-
result.append(masterTableName);
-
result.append(" tb1 ");
-
result.append("where tb1.deal_tag = 'B' ");
-
-
return result.toString();
-
}
-
-
//对null值进行屏蔽,返回空字符串
-
private String getString(String strInfo){
-
String result = "";
-
if( null != strInfo && !"".equals(strInfo)){
-
Pattern p = Pattern.compile("\t|\r|\n");
-
Matcher m = p.matcher(strInfo);
-
result = m.replaceAll(""); //将制表符,回车,换行符过滤掉
-
}
-
-
return result;
-
}
-
}
消费者的代码,主要对进行缓冲区的数据,进行处理工作:
初始化,生产者,消费者的代码,就比较的简单了,我采用一个生产者,多个消费者的模式(因为读一次数据,就够消费者处理一阵子了)
-
/**
-
* 进行数据处理
-
*/
-
private void dealData(){
-
TradeManage trade = new TradeManage();
-
-
ReadThread read = new ReadThread(trade);
-
Thread myReadThread = new Thread(read);
-
log.info("master table: " + ReadThread.masterTableName );
-
myReadThread.start(); //启动读取数据库线程
-
-
DealData mydeal = new DealData(trade);
-
List<Thread> listThread = new ArrayList<Thread>();
-
for(int i=0; i<100; i++){
-
Thread myThread = new Thread(mydeal);
-
myThread.start(); //启动线程
-
listThread.add(myThread);
-
}
-
-
while(listThread.size() > 0){
-
Thread mythread = listThread.get(0);
-
if("TERMINATED".equals(mythread.getState().toString())){
-
listThread.remove(mythread);
-
}
-
}
-
read.dealOK(); //提交缓冲区数据,进行标志位更改
-
File genFile = mydeal.getGenerateFile(); //获取生成的文件
-
this.upFtpFile(genFile); //进行文件生成
-
}
阅读(8208) | 评论(1) | 转发(0) |