Category: Big Data

2016-02-25 17:39:59

[Figure: Flume NG node topology diagram]



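With the configuration in this post, Flume renames each file in /var/log/hadoop/hdfs to x.COMPLETED once it has been ingested. It is better to create a dedicated directory for Flume log collection (for example abc), cp the logs you want processed into abc, point the actual configuration at abc, and then periodically delete the files ending in .COMPLETED.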
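The periodic cleanup mentioned above could be sketched as a small shell helper. The function name `purge_completed` and the age threshold are illustrative assumptions, not part of Flume; `-maxdepth`, `-mmin` and `-delete` assume GNU or BSD find.

```shell
# Hypothetical helper: delete ingested files (suffix .COMPLETED) from the
# Flume staging directory once they are older than a given number of minutes.
purge_completed() {
  local staging_dir="$1" min_age_minutes="$2"
  # Only look at regular files directly inside the staging directory.
  find "$staging_dir" -maxdepth 1 -type f -name '*.COMPLETED' \
    -mmin +"$min_age_minutes" -delete
}
```

It might be called from cron, for example `purge_completed /path/to/abc 60`, where the path stands for whatever staging directory spoolDir points at.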

Setup:
server5 collects the log data and sends it to server4; server4 uploads the data to HDFS.
server4 hostname: testserver3.bj
Log directory: /var/log/hadoop/hdfs

Collector node (server4), which uploads to HDFS
vi agent1.conf
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=sink1

agent1.sources.source1.type=avro
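# With useIP=true the host interceptor below fills the host header with an IP; HDFS gets one directory per IP via %{host}.
# (Comments need their own line: in a properties file a trailing "#..." becomes part of the value.)
agent1.sources.source1.bind=0.0.0.0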
agent1.sources.source1.port=44444
agent1.sources.source1.channels=channel1
agent1.sources.source1.interceptors = i1 i2
agent1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.i1.preserveExisting = true
agent1.sources.source1.interceptors.i1.useIP = true
agent1.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=10000
agent1.channels.channel1.transactionCapacity=1000
agent1.channels.channel1.keep-alive=30

agent1.sinks.sink1.type=hdfs
agent1.sinks.sink1.channel=channel1
agent1.sinks.sink1.hdfs.path=hdfs://testcluster/flume/events/%{host}/%y-%m-%d
agent1.sinks.sink1.hdfs.fileType=DataStream
agent1.sinks.sink1.hdfs.writeFormat=Text
agent1.sinks.sink1.hdfs.rollInterval=0
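# Roll the file on HDFS once it reaches about 10 KB (10000 bytes)
agent1.sinks.sink1.hdfs.rollSize=10000
# If unset, the default is 10 events, so an upload with more than 10 lines gets split into several files; 0 disables rolling by event count
agent1.sinks.sink1.hdfs.rollCount=0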
agent1.sinks.sink1.hdfs.idleTimeout=5
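
To make the hdfs.path setting above concrete, here is a rough shell sketch of how %{host} and %y-%m-%d end up in the directory name. It only imitates the substitution; it is not Flume code. The function name and the sample IP are made up, the timestamp is taken in seconds (Flume's timestamp header is in milliseconds), UTC is used for reproducibility (Flume defaults to local time), and `date -d @...` assumes GNU date.

```shell
# Hypothetical illustration of the escape sequences in hdfs.path:
# %{host} comes from the host header, %y-%m-%d from the timestamp header.
expand_hdfs_path() {
  local host="$1" epoch_seconds="$2"
  local day
  # Two-digit year, month and day, matching %y-%m-%d in the sink config.
  day=$(date -u -d "@$epoch_seconds" +%y-%m-%d)
  printf 'hdfs://testcluster/flume/events/%s/%s\n' "$host" "$day"
}
```

For instance, `expand_hdfs_path 10.0.0.5 1456393199` prints `hdfs://testcluster/flume/events/10.0.0.5/16-02-25`.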


server5 node

vi agent1.conf
# agent1 is the agent name
agent1.sources=source1
agent1.sinks=sink1
agent1.channels=channel1


# Configure source1
agent1.sources.source1.type=spooldir
agent1.sources.source1.spoolDir=/var/log/hadoop/hdfs
agent1.sources.source1.channels=channel1

# Configure sink1
agent1.sinks.sink1.type=avro
agent1.sinks.sink1.hostname=testserver3.bj
agent1.sinks.sink1.port=44444
agent1.sinks.sink1.channel=channel1

# Configure channel1
agent1.channels.channel1.type=file
agent1.channels.channel1.checkpointDir=/usr/local/flume_tmp/checkpoint
agent1.channels.channel1.dataDirs=/usr/local/flume_tmp/
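
One thing to watch: the host interceptor on server4 runs on the collector itself, so events that arrive without a host header are tagged with server4's own address. If the goal is one HDFS directory per originating machine, a possible addition to the server5 config (an assumption on my part, not part of the original setup) is to tag events at the source. Because server4 sets preserveExisting = true, the value set here would be kept.

```properties
# Optional, hypothetical addition to server5's agent1.conf:
# stamp each event with server5's IP before it is sent to the collector.
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.i1.useIP = true
```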


Starting Flume
server4:
./bin/flume-ng agent --conf conf --conf-file conf/agent1.conf --name agent1 -Dflume.root.logger=INFO,console

server5:
./bin/flume-ng agent --conf conf --conf-file conf/agent1.conf --name agent1 -Dflume.root.logger=INFO,console

To run it in the background, wrap the command as nohup ... &

Error:
15/05/20 10:12:25 ERROR source.SpoolDirectorySource: Uncaught exception in Runnable
java.lang.IllegalStateException: Serializer has been closed
 at org.apache.flume.serialization.LineDeserializer.ensureOpen(LineDeserializer.java:124)
 at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:88)
 at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:221)
 at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:160)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
 at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)

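Looking at the files in the spool directory, I found that one file had already been processed (the .COMPLETED suffix had been appended) while another file with the same base name was still sitting there. Whenever the directory contains both 123.log.COMPLETED and 123.log, the error above is raised, so avoid reusing the name of a file that has already been ingested.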
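
A quick way to spot this situation before Flume trips over it is to list names that exist both with and without the suffix. This is a minimal sketch; the function name `find_spool_conflicts` is made up, and it assumes the default .COMPLETED suffix.

```shell
# Hypothetical check: print every base name that exists both as NAME and
# NAME.COMPLETED in the spool directory.
find_spool_conflicts() {
  local spool_dir="$1" done_file base
  for done_file in "$spool_dir"/*.COMPLETED; do
    [ -e "$done_file" ] || continue   # the glob matched nothing
    base="${done_file%.COMPLETED}"    # strip the suffix to get the pending name
    if [ -e "$base" ]; then
      basename "$base"
    fi
  done
}
```

Running `find_spool_conflicts /var/log/hadoop/hdfs` prints nothing when the directory is clean.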

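Problem:
Copying a large file (mv, cp, scp, etc.) into the monitored directory does trigger an exception (Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.). The spooling directory source expects a file to be complete and unchanging once it appears in the directory, but a large copy is still being written when Flume starts reading it.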

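Solution:
Spooling Directory Source provides the following parameter, which appends a suffix to a file's name once the file has been fully ingested:
fileSuffix    .COMPLETED    Suffix to append to completely ingested files
Files that already carry this suffix are skipped by the source. So we only need to add the suffix to the file name before moving the file into the monitored directory, and then rename it to remove the suffix once the transfer has finished. That avoids the exception.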
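
The copy-then-rename trick can be sketched as a shell function. The name `spool_put` and its arguments are illustrative, and the sketch assumes the default .COMPLETED suffix. Because the temporary name and the final name are in the same directory, the final mv is a rename rather than a second copy, so Flume only ever sees a complete file.

```shell
# Hypothetical helper: copy a file into the spool directory under a name
# Flume ignores, then rename it so Flume picks up the finished file.
spool_put() {
  local src="$1" spool_dir="$2" suffix="${3:-.COMPLETED}"
  local name tmp
  name=$(basename "$src")
  tmp="$spool_dir/$name$suffix"
  # Refuse to clobber a pending or already ingested file of the same name.
  if [ -e "$tmp" ] || [ -e "$spool_dir/$name" ]; then
    echo "spool_put: $name already present in $spool_dir" >&2
    return 1
  fi
  cp -- "$src" "$tmp" && mv -- "$tmp" "$spool_dir/$name"
}
```

Usage would look like `spool_put /path/to/big.log /var/log/hadoop/hdfs`, where the source path is a placeholder.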


Note:
10:37:48,332 (pool-6-thread-1) [INFO -org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run
(SpoolDirectorySource.java:254)]Spooling Directory Source runner has shutdown.
Seeing this means the agent is running normally. No new files are appearing in the monitored directory, so the message above keeps repeating.
