Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2486004
  • 博文数量: 308
  • 博客积分: 5547
  • 博客等级: 大校
  • 技术积分: 3782
  • 用 户 组: 普通用户
  • 注册时间: 2009-11-24 09:47
个人简介

hello world.

文章分类

全部博文(308)

分类: Java

2016-10-25 10:27:21

    在工作中,我们有时候需要对表,甚至是多个表,进行循环扫描,如果含有数据进行数据处理。这个让人很容易想到用到,生产者消费者模式,进行数据处理。如果具体的编程时你会发现,网上的很多例子是在单生产者多消费者模式下,正常运行,如果出现了多个生产者时,则会造成阻塞,或者有一个另外一个生产者的数据,无法及时的处理。
    仔细阅读代码,你会发现问题在于,当多个生产者,监控到缓冲区中没有数据,而临界资源访问时,只能是单个线程进行。这样会造成一个生产者进入临界资源,而另外的生成者线程被阻塞,当访问临界资源的那个生产者,结束访问临界资源后,缓冲区已经有了资源,这个时候,按照逻辑,另外的生成者则被阻塞。在极端的情况下,则会造成多个生产者,一直被阻塞无法进入缓冲区,无法进行数据处理。
    明白问题所在,一般就知道了解决的思路。问题在于多个生产者竞争访问临界资源。我采用的思路是,将多个生成者转化为一个生产者,用一个生产者访问临界资源。通过观察者模式,多需要生产数据时,遍历各个生成者,将数据汇总后,一次送入缓冲区。另外设置监听线程,只要缓冲区没有数据,即通过观察者模式,通知生成者生产数据(也就是扫表操作)。为方便使用,采用泛型类,泛型接口。方便系统接入。代码如下:

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer;

  2. import java.util.ArrayList;
  3. import java.util.LinkedList;
  4. import java.util.List;

  5. import org.apache.log4j.Logger;

  6. //生产者,消费者所使用的缓冲区
  7. //author:程晓鹏
  8. //date:2016.10.17
  9. public class BufferStorage<T>{
  10.     
  11.     private static final Logger log = Logger.getLogger(BufferStorage.class);
  12.     private LinkedList<T> bufferList = new LinkedList<T>(); //缓冲区集合
  13.     
  14.      //缓冲区中的产品个数
  15.      public int bufferSize(){
  16.          synchronized (this.bufferList){
  17.              return this.bufferList.size();
  18.          }
  19.      }
  20.     
  21.     //生产者,生产产品
  22.     public void produce(List<T> t){
  23.         synchronized (this.bufferList){
  24.             log.info("【缓冲区】生产者的产品,进入缓冲区,数量:" + t.size());
  25.             while(this.bufferList.size() > 0){ //当缓冲区中有数据时,暂停生产
  26.                 try{
  27.                     bufferList.wait(); //生产阻塞
  28.                 }catch (InterruptedException e){
  29.                     e.printStackTrace();
  30.                 }
  31.             }
  32.             
  33.             log.info("【缓冲区】产品开始添加到缓冲区");
  34.             for(int i=0; i<t.size(); i++){
  35.                 bufferList.add(t.get(i));
  36.             }
  37.             log.info("【缓冲区】产品添加完成 size=" + t.size() + "\n");            
  38.             bufferList.notifyAll();
  39.         }
  40.     }
  41.     
  42.     //消费者,消费产品
  43.     public T consume(){    
  44.         synchronized (this.bufferList){
  45.             while(this.bufferList.size() == 0){ //当缓冲区,没有产品时,消费者被阻塞
  46.                 try {
  47.                     this.bufferList.wait();    
  48.                 } catch (InterruptedException e) {
  49.                     e.printStackTrace();
  50.                 }
  51.             }
  52.             
  53.             T result = this.bufferList.getFirst();
  54.             this.bufferList.remove();
  55.             this.bufferList.notifyAll();
  56.             return result;
  57.         }
  58.         
  59.     }
  60.     
  61. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer;

  2. import java.util.List;

  3. //生产,观察者接口
  4. //author:程晓鹏
  5. //date:2016.10.17
  6. public interface IWatcherProducer<T> {
  7.     //开始生产产品
  8.     public List<T> beginProduce(String msg);
  9. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer;

  2. import java.util.List;
  3. import org.apache.log4j.Logger;

  4. //产品生产者类
  5. //author:程晓鹏
  6. //date:2016.10.17
  7. public abstract class ProducerThread<T> extends Thread implements IWatcherProducer<T>{
  8.     
  9.     private static final Logger log = Logger.getLogger(ProducerThread.class);

  10.      //默认构造函数
  11.      public ProducerThread(){
  12.         
  13.      }
  14.     
  15.      //开始运行
  16.      public void run(){
  17.         
  18.      }
  19.     
  20.      //实现接口
  21.      public List<T> beginProduce(String msg){
  22.          log.info("【生产者 thread_id="+this.getId()+"】收到管理线程发送的消息:" + msg);        
  23.          List<T> productData = this.produce();
  24.          int size = productData!=null ? productData.size() : 0;
  25.          log.info("【生产者】生产产品数量:" + size + " thread_id:" + this.getId());
  26.          return productData;
  27.      }
  28.     
  29.      //抽象生产者生成产品
  30.      public abstract List<T> produce();    
  31. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer;

  2. import org.apache.log4j.Logger;

  3. //消费者类
  4. //author:程晓鹏
  5. //date:2016.10.17
  6. public abstract class ConsumerThread<T> extends Thread{
  7.     private static final Logger log = Logger.getLogger(ConsumerThread.class);
  8.     private BufferStorage<T> buffstorage; //缓冲区仓库
  9.     private long lthreadId = this.getId(); //线程id
  10.     private long sleepTime;
  11.     
  12.      //构造函数
  13.      public ConsumerThread(BufferStorage<T> storage){
  14.          this.buffstorage = storage;
  15.          sleepTime = 100;
  16.      }
  17.     
  18.      public void run(){
  19.          this.beginConsume();        
  20.      }
  21.     
  22.      private void beginConsume(){
  23.          while(true){
  24.              log.info("【消费者】缓冲区,产品数:" + this.buffstorage.bufferSize() + " thread_id:" + lthreadId);
  25.              if(this.buffstorage.bufferSize() == 0){
  26.                  log.info("【消费者】缓冲区无产品,消费者线程休眠 " + sleepTime + "ms.");
  27.                  try {
  28.                     sleep(sleepTime);
  29.                  } catch (InterruptedException e) {
  30.                     // TODO Auto-generated catch block
  31.                     e.printStackTrace();
  32.                  }                
  33.              }
  34.             
  35.              T t = this.buffstorage.consume();
  36.              if(t != null){
  37.                  this.consume(t);
  38.              }
  39.          }
  40.      }
  41.     
  42.      //消费者,消费产品
  43.      public abstract void consume(T t);
  44. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer;

  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.log4j.Logger;

  5. //线程管理类
  6. public class ManageThread<T> extends Thread{
  7.     private static final Logger log = Logger.getLogger(ManageThread.class);
  8.     private List<IWatcherProducer<T>> listProducer = new ArrayList<IWatcherProducer<T>>(); //生产者观察者集合
  9.     private List<T> productDataList = new ArrayList<T>(); //生产者生产的产品集合
  10.     private BufferStorage<T> buffstorage; //缓冲区仓库
  11.     private long sleepTime;
  12.     
  13.     //构造函数
  14.      public ManageThread(BufferStorage<T> storage){
  15.          this.buffstorage = storage;
  16.          sleepTime = 1000;
  17.      }
  18.     
  19.     //注册生产观察者对象
  20.      public void attachProducer(IWatcherProducer<T> observer){        
  21.          this.listProducer.add(observer);
  22.      }
  23.     
  24.      //删除生产观察者对象
  25.      public void detachProducer(IWatcherProducer<T> observer){
  26.          this.listProducer.remove(observer);
  27.      }
  28.     
  29.      //通知所有注册的生产观察者对象
  30.      private void nodifyProducerObservers(String msg){
  31.          synchronized(this.productDataList){
  32.              int len = this.listProducer.size();
  33.              for(int i=0; i<len; i++){
  34.                  IWatcherProducer<T> observer = this.listProducer.get(i);
  35.                  log.info(String.format("【管理线程】" + msg + " --> 进度 %d/%d", i+1, len));
  36.                  List<T> data = observer.beginProduce(msg);
  37.                  if(null != data){
  38.                      for(int j=0; j<data.size(); j++){
  39.                          productDataList.add(data.get(j));
  40.                      }
  41.                  }
  42.              }
  43.              log.info("【管理线程】通知所有已注册的生产者 size:"+this.productDataList.size()+" finish.\n");
  44.          }
  45.      }
  46.     
  47.      public void run(){
  48.          while(true){
  49.              this.beginRun();
  50.              try {
  51.                     this.sleep(sleepTime);
  52.                 } catch (InterruptedException e) {
  53.                     // TODO Auto-generated catch block
  54.                     e.printStackTrace();
  55.                 }
  56.          }
  57.      }
  58.     
  59.      //开始运行
  60.      private void beginRun(){
  61.          log.info("【管理线程】缓冲区,产品数:" + this.buffstorage.bufferSize() + " thread_id:" +this.getId());
  62.          if(this.buffstorage.bufferSize() == 0){
  63.              this.nodifyProducerObservers("缓冲区中没有产品,生产者需要生产产品.");
  64.              if(this.productDataList.size() > 0){
  65.              this.buffstorage.produce(this.productDataList); //送入缓冲区
  66.              this.productDataList.clear(); //清空数据
  67.          }
  68.          }
  69.      }
  70. }
测试程序如下:

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer.demo;

  2. import java.util.ArrayList;
  3. import java.util.List;


  4. import com.hualong.entity.ResponseXml;
  5. import com.hualong.tux.CallLua;
  6. import com.hualong.tux.SmsData;
  7. import com.huawei.G4Data.Java2Xml;
  8. import com.common.ProducerConsumer.BufferStorage;
  9. import com.common.ProducerConsumer.ConsumerThread;

  10. //短信消费者
  11. public class SmsConsumer<SmsData> extends ConsumerThread<com.hualong.tux.SmsData>{

  12.     public SmsConsumer(BufferStorage<com.hualong.tux.SmsData> storage) {
  13.         super(storage);
  14.     }
  15.     
  16.     //消费者
  17.     public void consume(com.hualong.tux.SmsData t)
  18.     {
  19.         Java2Xml myxml = new Java2Xml();
  20.         ResponseXml result = CallLua.callBsh(t, "cxll_4g.bsh");
  21.         myxml.loadJavaObject(result);
  22.         String str ="";
  23.         String strXml = myxml.getXmlResult();
  24.         System.out.println(strXml); //打印结果
  25.     }
  26. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer.demo;

  2. import java.util.ArrayList;
  3. import java.util.List;

  4. import com.hualong.tux.SmsData;
  5. import com.common.ProducerConsumer.BufferStorage;
  6. import com.common.ProducerConsumer.ProducerThread;

  7. public class SmsProducer<SmsData> extends ProducerThread<com.hualong.tux.SmsData> {
  8.     
  9.     private int i_max_request = 2;
  10.     private int i_count = 0;
  11.     
  12.     //构造函数
  13.     public SmsProducer() {
  14.         super();
  15.     }
  16.     
  17.     public List<com.hualong.tux.SmsData> produce()
  18.     {
  19.         List<com.hualong.tux.SmsData> listArray = new ArrayList<com.hualong.tux.SmsData>();
  20.         if(i_count < i_max_request){ //模拟正在请求时,没有数据的情况
  21.             for(int i=0; i<5; i++){
  22.                 com.hualong.tux.SmsData sms = new com.hualong.tux.SmsData();
  23.                 sms.setRowid("AAAV6cAAGAAM3S9AAB");
  24.                 sms.setSmsId("MO_0934410000271A_20161014093441");
  25.                 sms.setUserNum("66666666666");
  26.                 sms.setCityCode("0372");
  27.                 sms.setMessage("0760ABC11");
  28.                 
  29.                 listArray.add(sms);
  30.             }
  31.             i_count++;
  32.         }
  33.         return listArray;
  34.         
  35.     }
  36. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer.demo;

  2. import java.util.ArrayList;
  3. import java.util.List;

  4. import com.hualong.tux.SmsData;
  5. import com.common.ProducerConsumer.BufferStorage;
  6. import com.common.ProducerConsumer.ProducerThread;

  7. public class SmsProducer2<SmsData> extends ProducerThread<com.hualong.tux.SmsData> {
  8.     
  9.     private int i_max_request = 2;
  10.     private int i_count = 0;
  11.     
  12.     //构造函数
  13.     public SmsProducer2(){
  14.         super();
  15.     }
  16.     
  17.     public List<com.hualong.tux.SmsData> produce()
  18.     {
  19.         List<com.hualong.tux.SmsData> listArray = new ArrayList<com.hualong.tux.SmsData>();
  20.         if(i_count < i_max_request){ //模拟正在请求时,没有数据的情况
  21.             for(int i=0; i<6; i++){
  22.                 com.hualong.tux.SmsData sms = new com.hualong.tux.SmsData();
  23.                 sms.setRowid("AAAV6cAAGAAM3S9AAB");
  24.                 sms.setSmsId("MO_0934410000271A_20161014093441");
  25.                 sms.setUserNum("18639551269");
  26.                 sms.setCityCode("0371");
  27.                 sms.setMessage("07600011");
  28.                 
  29.                 listArray.add(sms);
  30.             }
  31.             i_count++;
  32.         }
  33.         return listArray;
  34.         
  35.     }
  36. }

点击(此处)折叠或打开

  1. package com.common.ProducerConsumer.demo;

  2. import org.apache.log4j.PropertyConfigurator;

  3. import com.hualong.tux.SmsData;
  4. import com.common.ProducerConsumer.BufferStorage;
  5. import com.common.ProducerConsumer.ManageThread;

  6. public class testclient {
  7.     public static void main(String[] args) {
  8.         BufferStorage<SmsData> buffer = new BufferStorage<SmsData>();
  9.         ManageThread<SmsData> manager = new ManageThread<SmsData>(buffer);
  10.         SmsProducer<SmsData> producer = new SmsProducer<SmsData>();
  11.         SmsProducer2<SmsData> producer2 = new SmsProducer2<SmsData>();
  12.         
  13.         manager.attachProducer(producer); //注册生产者
  14.         manager.attachProducer(producer2);//注册生产者
  15.         
  16.         SmsConsumer<SmsData> consumer = new SmsConsumer<SmsData>(buffer);
  17.         SmsConsumer<SmsData> consumer2 = new SmsConsumer<SmsData>(buffer);
  18.         SmsConsumer<SmsData> consumer3 = new SmsConsumer<SmsData>(buffer);
  19.         
  20.         //log4j配置文件地址
  21.         String log4j_url = "/home/path/log4j_ProducerConsumer.properties";
  22.         PropertyConfigurator.configure(log4j_url);
  23.         
  24.         manager.start();
  25.         producer.start();
  26.         producer2.start();
  27.         
  28.         consumer.start();
  29.         consumer2.start();
  30.         consumer3.start();
  31.     }
  32. }

点击(此处)折叠或打开

  1. log4j.rootLogger = info,default

  2. log4j.appender.console = org.apache.log4j.ConsoleAppender
  3. log4j.appender.console.Target = System.out
  4. log4j.appender.console.layout = org.apache.log4j.PatternLayout
  5. log4j.appender.console.layout.ConversionPattern = %p %d{HH:mm:ss} %l %m %n

  6. log4j.appender.default = org.apache.log4j.DailyRollingFileAppender
  7. log4j.appender.default.File = /home/smsapp/Sms4gClient/logs/sms4g.log
  8. log4j.appender.default.Append = true
  9. log4j.appender.default.Threshold = info
  10. log4j.appender.default.layout = org.apache.log4j.PatternLayout
  11. log4j.appender.default.layout.ConversionPattern = %p %d{MM-dd HH:mm:ss} %l %m %n

  12. #class print #####################################################################
  13. log4j.logger.com.huawei.ProducerConsumer.BufferStorage=info,buffer
  14. log4j.appender.buffer = org.apache.log4j.DailyRollingFileAppender
  15. log4j.appender.buffer.File = /home/smsapp/Sms4gClient/logs/buffer.log
  16. log4j.appender.buffer.Append = true
  17. log4j.appender.buffer.Threshold = info
  18. log4j.appender.buffer.layout = org.apache.log4j.PatternLayout
  19. log4j.appender.buffer.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n

  20. log4j.logger.com.huawei.ProducerConsumer.ConsumerThread=info,Consumer
  21. log4j.appender.Consumer = org.apache.log4j.DailyRollingFileAppender
  22. log4j.appender.Consumer.File = /home/smsapp/Sms4gClient/logs/consumer.log
  23. log4j.appender.Consumer.Append = true
  24. log4j.appender.Consumer.Threshold = info
  25. log4j.appender.Consumer.layout = org.apache.log4j.PatternLayout
  26. log4j.appender.Consumer.layout.ConversionPattern = %d{MM-dd HH:mm:ss,SSS} %m %n

  27. log4j.logger.com.huawei.ProducerConsumer.ManageThread=info,listener
  28. log4j.appender.listener = org.apache.log4j.DailyRollingFileAppender
  29. log4j.appender.listener.File = /home/smsapp/Sms4gClient/logs/listener.log
  30. log4j.appender.listener.Append = true
  31. log4j.appender.listener.Threshold = info
  32. log4j.appender.listener.layout = org.apache.log4j.PatternLayout
  33. log4j.appender.listener.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n

  34. log4j.logger.com.huawei.ProducerConsumer.ProducerThread=info,producer
  35. log4j.appender.producer = org.apache.log4j.DailyRollingFileAppender
  36. log4j.appender.producer.File = /home/smsapp/Sms4gClient/logs/producer.log
  37. log4j.appender.producer.Append = true
  38. log4j.appender.producer.Threshold = info
  39. log4j.appender.producer.layout = org.apache.log4j.PatternLayout
  40. log4j.appender.producer.layout.ConversionPattern = %d{MM-dd HH:mm:ss} %m %n
ProducerConsumer.rar


阅读(3857) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~