在工作中,我们有时候需要对表,甚至是多个表,进行循环扫描,如果含有数据进行数据处理。这个让人很容易想到用到,生产者消费者模式,进行数据处理。如果具体的编程时你会发现,网上的很多例子是在单生产者多消费者模式下,正常运行,如果出现了多个生产者时,则会造成阻塞,或者有一个另外一个生产者的数据,无法及时的处理。
仔细阅读代码,你会发现问题在于,当多个生产者,监控到缓冲区中没有数据,而临界资源访问时,只能是单个线程进行。这样会造成一个生产者进入临界资源,而另外的生成者线程被阻塞,当访问临界资源的那个生产者,结束访问临界资源后,缓冲区已经有了资源,这个时候,按照逻辑,另外的生成者则被阻塞。在极端的情况下,则会造成多个生产者,一直被阻塞无法进入缓冲区,无法进行数据处理。
明白问题所在,一般就知道了解决的思路。问题在于多个生产者竞争访问临界资源。我采用的思路是,将多个生成者转化为一个生产者,用一个生产者访问临界资源。通过观察者模式,多需要生产数据时,遍历各个生成者,将数据汇总后,一次送入缓冲区。另外设置监听线程,只要缓冲区没有数据,即通过观察者模式,通知生成者生产数据(也就是扫表操作)。为方便使用,采用泛型类,泛型接口。方便系统接入。代码如下:
-
package com.common.ProducerConsumer;
-
-
import java.util.ArrayList;
-
import java.util.LinkedList;
-
import java.util.List;
-
-
import org.apache.log4j.Logger;
-
-
//生产者,消费者所使用的缓冲区
-
//author:程晓鹏
-
//date:2016.10.17
-
public class BufferStorage<T>{
-
-
private static final Logger log = Logger.getLogger(BufferStorage.class);
-
private LinkedList<T> bufferList = new LinkedList<T>(); //缓冲区集合
-
-
//缓冲区中的产品个数
-
public int bufferSize(){
-
synchronized (this.bufferList){
-
return this.bufferList.size();
-
}
-
}
-
-
//生产者,生产产品
-
public void produce(List<T> t){
-
synchronized (this.bufferList){
-
log.info("【缓冲区】生产者的产品,进入缓冲区,数量:" + t.size());
-
while(this.bufferList.size() > 0){ //当缓冲区中有数据时,暂停生产
-
try{
-
bufferList.wait(); //生产阻塞
-
}catch (InterruptedException e){
-
e.printStackTrace();
-
}
-
}
-
-
log.info("【缓冲区】产品开始添加到缓冲区");
-
for(int i=0; i<t.size(); i++){
-
bufferList.add(t.get(i));
-
}
-
log.info("【缓冲区】产品添加完成 size=" + t.size() + "\n");
-
bufferList.notifyAll();
-
}
-
}
-
-
//消费者,消费产品
-
public T consume(){
-
synchronized (this.bufferList){
-
while(this.bufferList.size() == 0){ //当缓冲区,没有产品时,消费者被阻塞
-
try {
-
this.bufferList.wait();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
-
T result = this.bufferList.getFirst();
-
this.bufferList.remove();
-
this.bufferList.notifyAll();
-
return result;
-
}
-
-
}
-
-
}
-
package com.common.ProducerConsumer;
-
-
import java.util.List;
-
-
//生产,观察者接口
-
//author:程晓鹏
-
//date:2016.10.17
-
public interface IWatcherProducer<T> {
-
//开始生产产品
-
public List<T> beginProduce(String msg);
-
}
-
package com.common.ProducerConsumer;
-
-
import java.util.List;
-
import org.apache.log4j.Logger;
-
-
//产品生产者类
-
//author:程晓鹏
-
//date:2016.10.17
-
public abstract class ProducerThread<T> extends Thread implements IWatcherProducer<T>{
-
-
private static final Logger log = Logger.getLogger(ProducerThread.class);
-
-
//默认构造函数
-
public ProducerThread(){
-
-
}
-
-
//开始运行
-
public void run(){
-
-
}
-
-
//实现接口
-
public List<T> beginProduce(String msg){
-
log.info("【生产者 thread_id="+this.getId()+"】收到管理线程发送的消息:" + msg);
-
List<T> productData = this.produce();
-
int size = productData!=null ? productData.size() : 0;
-
log.info("【生产者】生产产品数量:" + size + " thread_id:" + this.getId());
-
return productData;
-
}
-
-
//抽象生产者生成产品
-
public abstract List<T> produce();
-
}
-
package com.common.ProducerConsumer;
-
-
import org.apache.log4j.Logger;
-
-
//消费者类
-
//author:程晓鹏
-
//date:2016.10.17
-
public abstract class ConsumerThread<T> extends Thread{
-
private static final Logger log = Logger.getLogger(ConsumerThread.class);
-
private BufferStorage<T> buffstorage; //缓冲区仓库
-
private long lthreadId = this.getId(); //线程id
-
private long sleepTime;
-
-
//构造函数
-
public ConsumerThread(BufferStorage<T> storage){
-
this.buffstorage = storage;
-
sleepTime = 100;
-
}
-
-
public void run(){
-
this.beginConsume();
-
}
-
-
private void beginConsume(){
-
while(true){
-
log.info("【消费者】缓冲区,产品数:" + this.buffstorage.bufferSize() + " thread_id:" + lthreadId);
-
if(this.buffstorage.bufferSize() == 0){
-
log.info("【消费者】缓冲区无产品,消费者线程休眠 " + sleepTime + "ms.");
-
try {
-
sleep(sleepTime);
-
} catch (InterruptedException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
-
T t = this.buffstorage.consume();
-
if(t != null){
-
this.consume(t);
-
}
-
}
-
}
-
-
//消费者,消费产品
-
public abstract void consume(T t);
-
}
-
package com.common.ProducerConsumer;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
import org.apache.log4j.Logger;
-
-
//线程管理类
-
public class ManageThread<T> extends Thread{
-
private static final Logger log = Logger.getLogger(ManageThread.class);
-
private List<IWatcherProducer<T>> listProducer = new ArrayList<IWatcherProducer<T>>(); //生产者观察者集合
-
private List<T> productDataList = new ArrayList<T>(); //生产者生产的产品集合
-
private BufferStorage<T> buffstorage; //缓冲区仓库
-
private long sleepTime;
-
-
//构造函数
-
public ManageThread(BufferStorage<T> storage){
-
this.buffstorage = storage;
-
sleepTime = 1000;
-
}
-
-
//注册生产观察者对象
-
public void attachProducer(IWatcherProducer<T> observer){
-
this.listProducer.add(observer);
-
}
-
-
//删除生产观察者对象
-
public void detachProducer(IWatcherProducer<T> observer){
-
this.listProducer.remove(observer);
-
}
-
-
//通知所有注册的生产观察者对象
-
private void nodifyProducerObservers(String msg){
-
synchronized(this.productDataList){
-
int len = this.listProducer.size();
-
for(int i=0; i<len; i++){
-
IWatcherProducer<T> observer = this.listProducer.get(i);
-
log.info(String.format("【管理线程】" + msg + " --> 进度 %d/%d", i+1, len));
-
List<T> data = observer.beginProduce(msg);
-
if(null != data){
-
for(int j=0; j<data.size(); j++){
-
productDataList.add(data.get(j));
-
}
-
}
-
}
-
log.info("【管理线程】通知所有已注册的生产者 size:"+this.productDataList.size()+" finish.\n");
-
}
-
}
-
-
public void run(){
-
while(true){
-
this.beginRun();
-
try {
-
this.sleep(sleepTime);
-
} catch (InterruptedException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
}
-
-
//开始运行
-
private void beginRun(){
-
log.info("【管理线程】缓冲区,产品数:" + this.buffstorage.bufferSize() + " thread_id:" +this.getId());
-
if(this.buffstorage.bufferSize() == 0){
-
this.nodifyProducerObservers("缓冲区中没有产品,生产者需要生产产品.");
-
if(this.productDataList.size() > 0){
-
this.buffstorage.produce(this.productDataList); //送入缓冲区
-
this.productDataList.clear(); //清空数据
-
}
-
}
-
}
-
}
测试程序如下:
-
package com.common.ProducerConsumer.demo;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
-
-
import com.hualong.entity.ResponseXml;
-
import com.hualong.tux.CallLua;
-
import com.hualong.tux.SmsData;
-
import com.huawei.G4Data.Java2Xml;
-
import com.common.ProducerConsumer.BufferStorage;
-
import com.common.ProducerConsumer.ConsumerThread;
-
-
//短信消费者
-
public class SmsConsumer<SmsData> extends ConsumerThread<com.hualong.tux.SmsData>{
-
-
public SmsConsumer(BufferStorage<com.hualong.tux.SmsData> storage) {
-
super(storage);
-
}
-
-
//消费者
-
public void consume(com.hualong.tux.SmsData t)
-
{
-
Java2Xml myxml = new Java2Xml();
-
ResponseXml result = CallLua.callBsh(t, "cxll_4g.bsh");
-
myxml.loadJavaObject(result);
-
String str ="";
-
String strXml = myxml.getXmlResult();
-
System.out.println(strXml); //打印结果
-
}
-
}
-
package com.common.ProducerConsumer.demo;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
-
import com.hualong.tux.SmsData;
-
import com.common.ProducerConsumer.BufferStorage;
-
import com.common.ProducerConsumer.ProducerThread;
-
-
public class SmsProducer<SmsData> extends ProducerThread<com.hualong.tux.SmsData> {
-
-
private int i_max_request = 2;
-
private int i_count = 0;
-
-
//构造函数
-
public SmsProducer() {
-
super();
-
}
-
-
public List<com.hualong.tux.SmsData> produce()
-
{
-
List<com.hualong.tux.SmsData> listArray = new ArrayList<com.hualong.tux.SmsData>();
-
if(i_count < i_max_request){ //模拟正在请求时,没有数据的情况
-
for(int i=0; i<5; i++){
-
com.hualong.tux.SmsData sms = new com.hualong.tux.SmsData();
-
sms.setRowid("AAAV6cAAGAAM3S9AAB");
-
sms.setSmsId("MO_0934410000271A_20161014093441");
-
sms.setUserNum("66666666666");
-
sms.setCityCode("0372");
-
sms.setMessage("0760ABC11");
-
-
listArray.add(sms);
-
}
-
i_count++;
-
}
-
return listArray;
-
-
}
-
}
-
package com.common.ProducerConsumer.demo;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
-
import com.hualong.tux.SmsData;
-
import com.common.ProducerConsumer.BufferStorage;
-
import com.common.ProducerConsumer.ProducerThread;
-
-
public class SmsProducer2<SmsData> extends ProducerThread<com.hualong.tux.SmsData> {
-
-
private int i_max_request = 2;
-
private int i_count = 0;
-
-
//构造函数
-
public SmsProducer2(){
-
super();
-
}
-
-
public List<com.hualong.tux.SmsData> produce()
-
{
-
List<com.hualong.tux.SmsData> listArray = new ArrayList<com.hualong.tux.SmsData>();
-
if(i_count < i_max_request){ //模拟正在请求时,没有数据的情况
-
for(int i=0; i<6; i++){
-
com.hualong.tux.SmsData sms = new com.hualong.tux.SmsData();
-
sms.setRowid("AAAV6cAAGAAM3S9AAB");
-
sms.setSmsId("MO_0934410000271A_20161014093441");
-
sms.setUserNum("18639551269");
-
sms.setCityCode("0371");
-
sms.setMessage("07600011");
-
-
listArray.add(sms);
-
}
-
i_count++;
-
}
-
return listArray;
-
-
}
-
}
-
package com.common.ProducerConsumer.demo;
-
-
import org.apache.log4j.PropertyConfigurator;
-
-
import com.hualong.tux.SmsData;
-
import com.common.ProducerConsumer.BufferStorage;
-
import com.common.ProducerConsumer.ManageThread;
-
-
public class testclient {
-
public static void main(String[] args) {
-
BufferStorage<SmsData> buffer = new BufferStorage<SmsData>();
-
ManageThread<SmsData> manager = new ManageThread<SmsData>(buffer);
-
SmsProducer<SmsData> producer = new SmsProducer<SmsData>();
-
SmsProducer2<SmsData> producer2 = new SmsProducer2<SmsData>();
-
-
manager.attachProducer(producer); //注册生产者
-
manager.attachProducer(producer2);//注册生产者
-
-
SmsConsumer<SmsData> consumer = new SmsConsumer<SmsData>(buffer);
-
SmsConsumer<SmsData> consumer2 = new SmsConsumer<SmsData>(buffer);
-
SmsConsumer<SmsData> consumer3 = new SmsConsumer<SmsData>(buffer);
-
-
//log4j配置文件地址
-
String log4j_url = "/home/path/log4j_ProducerConsumer.properties";
-
PropertyConfigurator.configure(log4j_url);
-
-
manager.start();
-
producer.start();
-
producer2.start();
-
-
consumer.start();
-
consumer2.start();
-
consumer3.start();
-
}
-
}
-
log4j.rootLogger = info,default
-
-
log4j.appender.console = org.apache.log4j.ConsoleAppender
-
log4j.appender.console.Target = System.out
-
log4j.appender.console.layout = org.apache.log4j.PatternLayout
-
log4j.appender.console.layout.ConversionPattern = %p %d{HH:mm:ss} %l %m %n
-
-
log4j.appender.default = org.apache.log4j.DailyRollingFileAppender
-
log4j.appender.default.File = /home/smsapp/Sms4gClient/logs/sms4g.log
-
log4j.appender.default.Append = true
-
log4j.appender.default.Threshold = info
-
log4j.appender.default.layout = org.apache.log4j.PatternLayout
-
log4j.appender.default.layout.ConversionPattern = %p %d{MM-dd HH:mm:ss} %l %m %n
-
-
#class print #####################################################################
-
log4j.logger.com.huawei.ProducerConsumer.BufferStorage=info,buffer
-
log4j.appender.buffer = org.apache.log4j.DailyRollingFileAppender
-
log4j.appender.buffer.File = /home/smsapp/Sms4gClient/logs/buffer.log
-
log4j.appender.buffer.Append = true
-
log4j.appender.buffer.Threshold = info
-
log4j.appender.buffer.layout = org.apache.log4j.PatternLayout
-
log4j.appender.buffer.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n
-
-
log4j.logger.com.huawei.ProducerConsumer.ConsumerThread=info,Consumer
-
log4j.appender.Consumer = org.apache.log4j.DailyRollingFileAppender
-
log4j.appender.Consumer.File = /home/smsapp/Sms4gClient/logs/consumer.log
-
log4j.appender.Consumer.Append = true
-
log4j.appender.Consumer.Threshold = info
-
log4j.appender.Consumer.layout = org.apache.log4j.PatternLayout
-
log4j.appender.Consumer.layout.ConversionPattern = %d{MM-dd HH:mm:ss,SSS} %m %n
-
-
log4j.logger.com.huawei.ProducerConsumer.ManageThread=info,listener
-
log4j.appender.listener = org.apache.log4j.DailyRollingFileAppender
-
log4j.appender.listener.File = /home/smsapp/Sms4gClient/logs/listener.log
-
log4j.appender.listener.Append = true
-
log4j.appender.listener.Threshold = info
-
log4j.appender.listener.layout = org.apache.log4j.PatternLayout
-
log4j.appender.listener.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n
-
-
log4j.logger.com.huawei.ProducerConsumer.ProducerThread=info,producer
-
log4j.appender.producer = org.apache.log4j.DailyRollingFileAppender
-
log4j.appender.producer.File = /home/smsapp/Sms4gClient/logs/producer.log
-
log4j.appender.producer.Append = true
-
log4j.appender.producer.Threshold = info
-
log4j.appender.producer.layout = org.apache.log4j.PatternLayout
-
log4j.appender.producer.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n
ProducerConsumer.rar
阅读(3960) | 评论(0) | 转发(0) |