Category: HADOOP

2016-03-03 09:55:04

I'll skip the backstory and go straight to the result! A channel and the HdfsSink connected to it were accidentally configured like this:
...
agent.channels.common-channel.transactionCapacity=10
...
agent.sinks.hdfs-sink.hdfs.batchSize=20

After a quick test, Flume reported the following exception, which is fair enough:

[2015-12-17 11:42:09:694 ERROR][org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:467)]process failed
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
[2015-12-17 11:42:09:696 ERROR][org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)]Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:471)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
        at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
        at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
        ... 3 more

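But... but then I noticed that after I stopped sending events, the channel size never went down, and the data in HDFS kept growing!!! It never stops: the same data is replayed forever!!!
Here is the relevant HdfsSink code: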


  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }
        ......
        bucketWriter.append(event);
        ......

      transaction.commit();
      if (txnEventCount < 1) {
        return Status.BACKOFF;
      } else {
        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
        return Status.READY;
      }
    } catch (IOException eIO) {
      transaction.rollback();
      LOG.warn("HDFS IO error", eIO);
      return Status.BACKOFF;
    } catch (Throwable th) {
      transaction.rollback();
      LOG.error("process failed", th);
      if (th instanceof Error) {
        throw (Error) th;
      } else {
        throw new EventDeliveryException(th);
      }
    } finally {
      transaction.close();
    }
  }

 

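See that?! The exception is thrown by channel.take() when it tries to fetch the 11th event, and the transaction is then rolled back. But, damn it, the 10 events taken before that have already gone through bucketWriter.append(event), which means they have already been written to HDFS.
That explains the symptom above: the transaction is rolled back over and over, so the events return to the channel, yet the ones taken on each attempt are written to HDFS anyway. What kind of transaction is that supposed to be...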
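The replay loop can be reproduced without Flume. The following is only a toy sketch under stated assumptions: ToyChannel, process, and the hdfs list are hypothetical stand-ins invented for illustration (not Flume classes), and IllegalStateException plays the role of ChannelException. It models a take list capped at transactionCapacity and a sink that writes each event as soon as it is taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of the replay problem; none of these classes belong to Flume. */
class ReplayDemo {

    /** Hypothetical stand-in for a memory channel with a bounded take list. */
    static class ToyChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Deque<String> takeList = new ArrayDeque<>();
        private final int transactionCapacity;

        ToyChannel(int transactionCapacity, int eventCount) {
            this.transactionCapacity = transactionCapacity;
            for (int i = 0; i < eventCount; i++) {
                queue.addLast("event-" + i);
            }
        }

        /** Fails once the take list is full, like the exception in the log. */
        String take() {
            if (takeList.size() >= transactionCapacity) {
                throw new IllegalStateException(
                        "Take list capacity " + transactionCapacity + " full");
            }
            String event = queue.pollFirst();
            if (event != null) {
                takeList.addLast(event);
            }
            return event;
        }

        void commit() {
            takeList.clear();
        }

        /** Puts every taken event back at the head of the queue, in order. */
        void rollback() {
            while (!takeList.isEmpty()) {
                queue.addFirst(takeList.pollLast());
            }
        }

        int size() {
            return queue.size();
        }
    }

    /** One sink pass; returns true if the transaction committed. */
    static boolean process(ToyChannel channel, int batchSize, List<String> hdfs) {
        try {
            for (int i = 0; i < batchSize; i++) {
                String event = channel.take();
                if (event == null) {
                    break;
                }
                hdfs.add(event); // side effect that rollback cannot undo
            }
            channel.commit();
            return true;
        } catch (IllegalStateException e) {
            channel.rollback();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel(10, 15); // transactionCapacity=10
        List<String> hdfs = new ArrayList<>();
        for (int pass = 1; pass <= 3; pass++) {
            boolean committed = process(channel, 20, hdfs); // batchSize=20
            System.out.println("pass " + pass + ": committed=" + committed
                    + ", channelSize=" + channel.size()
                    + ", written=" + hdfs.size());
        }
    }
}
```

In this model every pass fails to commit, the channel size stays at 15, and the written count grows by 10 per pass (10, 20, 30), which is the same shape as the behaviour described above. Calling process with a batch size of 10 instead commits and drains the channel normally.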
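Admittedly the configuration is unreasonable, but Flume runs a big pile of parameter-checking code at startup and none of it catches this. It could at least raise an error or log a WARN!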
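The kind of check being asked for could look roughly like the sketch below. This is an assumption-laden illustration, not code from Flume: the class BatchSizeCheck and its check method are hypothetical, and the only things taken from this post are the two property keys and their values. It returns a warning message when the sink's batch size exceeds the channel's transaction capacity, and skips the check when either key is absent.

```java
import java.util.Optional;
import java.util.Properties;

/** Hypothetical startup check; not part of Flume. */
class BatchSizeCheck {

    /**
     * Returns a warning if hdfs.batchSize is larger than the channel's
     * transactionCapacity. Returns empty if either key is not set.
     */
    static Optional<String> check(Properties props, String agent,
                                  String channel, String sink) {
        String capKey = agent + ".channels." + channel + ".transactionCapacity";
        String batchKey = agent + ".sinks." + sink + ".hdfs.batchSize";
        String capValue = props.getProperty(capKey);
        String batchValue = props.getProperty(batchKey);
        if (capValue == null || batchValue == null) {
            return Optional.empty(); // nothing to compare
        }
        int capacity = Integer.parseInt(capValue.trim());
        int batchSize = Integer.parseInt(batchValue.trim());
        if (batchSize > capacity) {
            return Optional.of("WARN: " + batchKey + "=" + batchSize
                    + " exceeds " + capKey + "=" + capacity);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The configuration from this post.
        Properties props = new Properties();
        props.setProperty("agent.channels.common-channel.transactionCapacity", "10");
        props.setProperty("agent.sinks.hdfs-sink.hdfs.batchSize", "20");
        check(props, "agent", "common-channel", "hdfs-sink")
                .ifPresent(System.out::println);
    }
}
```

On the configuration side, the fix is to keep hdfs.batchSize no larger than transactionCapacity, for example by lowering the batch size to 10 or raising the transaction capacity to at least 20.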

Original source: http://www.cnblogs.com/logicbaby/archive/2015/12/17/5053685.html
