Category: Architecture Design and Optimization

2020-06-15 16:01:08

Importing Data into HBase with Flume: A Detailed Guide to Using the Flume HBase Sink
 https://blog.csdn.net/mnasd/article/details/81878944


I. How to Use the Three Serializer Modes of the HBase Sinks
1.1 HBaseSink -- SimpleHbaseEventSerializer
The following shows how to use HBaseSink with SimpleHbaseEventSerializer:


agenttest.channels = memoryChannel-1
agenttest.sinks = hbaseSink-1
agenttest.sinks.hbaseSink-1.type = org.apache.flume.sink.hbase.HBaseSink
# HBase table name
agenttest.sinks.hbaseSink-1.table = test_hbase_table
# Column family of the HBase table
agenttest.sinks.hbaseSink-1.columnFamily = familycolumn-1
agenttest.sinks.hbaseSink-1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Column (within the column family) that receives the event body
agenttest.sinks.hbaseSink-1.serializer.payloadColumn = columnname
# A sink is bound to exactly one channel, so the key is "channel" (singular)
agenttest.sinks.hbaseSink-1.channel = memoryChannel-1
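Note: to name the column (within the column family) that the event is written to, do not write:


agenttest.sinks.hbaseSink-1.columnName = columnname
or:
agenttest.sinks.hbaseSink-1.column = columnname
Both forms circulate online and both are wrong. The other two serializer modes cannot be configured this way either.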
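
A related pitfall when annotating these files: a properties file has no trailing-comment syntax. The sketch below is only an illustration and assumes the agent file is parsed with the rules of `java.util.Properties`; the class name `InlineCommentDemo` is made up for the example. It shows that text placed after a value becomes part of the value, while a comment on its own line is ignored.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class InlineCommentDemo {
    // Parse a properties snippet and return the value stored under the given key.
    static String valueOf(String snippet, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(snippet));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "agenttest.sinks.hbaseSink-1.table";
        // Trailing text after the value is NOT treated as a comment.
        System.out.println("[" + valueOf(key + " = test_hbase_table  //table name", key) + "]");
        // A comment on its own line (starting with # or !) is skipped.
        System.out.println("[" + valueOf("# table name\n" + key + " = test_hbase_table", key) + "]");
    }
}
```

Under that assumption, the first call yields a table name that still contains `//table name`, which is why comments are safer on their own `#` lines.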


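1.2 HBaseSink -- RegexHbaseEventSerializer
The following shows how to use HBaseSink with RegexHbaseEventSerializer, which splits each event with a regular expression and stores the pieces in multiple columns of the HBase table: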


agenttest.channels = memoryChannel-2
agenttest.sinks = hbaseSink-2
agenttest.sinks.hbaseSink-2.type = org.apache.flume.sink.hbase.HBaseSink
agenttest.sinks.hbaseSink-2.table = test_hbase_table
agenttest.sinks.hbaseSink-2.columnFamily = familycolumn-2
agenttest.sinks.hbaseSink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Example: split nginx log lines of the form [xxx] [yyy] [zzz] [nnn] ... and store each field in its own column, using this regex:
agenttest.sinks.hbaseSink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
# Names of the 4 columns under the familycolumn-2 column family that receive the captured groups
agenttest.sinks.hbaseSink-2.serializer.colNames = column-1,column-2,column-3,column-4
#agenttest.sinks.hbaseSink-2.serializer.payloadColumn = test
agenttest.sinks.hbaseSink-2.channel = memoryChannel-2
1.3 AsyncHBaseSink -- SimpleAsyncHbaseEventSerializer
The following shows how to use AsyncHBaseSink with SimpleAsyncHbaseEventSerializer:


agenttest.channels = memoryChannel-3
agenttest.sinks = hbaseSink-3
agenttest.sinks.hbaseSink-3.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agenttest.sinks.hbaseSink-3.table = test_hbase_table
agenttest.sinks.hbaseSink-3.columnFamily = familycolumn-3
agenttest.sinks.hbaseSink-3.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
# Column (within the column family) that receives the event body
agenttest.sinks.hbaseSink-3.serializer.payloadColumn = columnname
agenttest.sinks.hbaseSink-3.channel = memoryChannel-3
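II. Worked Examples: Building a Big-Data Collection and Aggregation System with Flume + HBase
2.1 Using the SimpleHbaseEventSerializer mode
First create a table named mikeal-hbase-table in HBase with two column families, familyclom1 and familyclom2: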


hbase(main):102:0> create 'mikeal-hbase-table','familyclom1','familyclom2'
0 row(s) in 1.2490 seconds
=> Hbase::Table - mikeal-hbase-table
Then write a Flume configuration file, test-flume-into-hbase.conf:


# Read real-time messages from a file and store them in HBase without processing
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

# logfile-source configuration
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50
# Bind the source to the channel
agent.sources.logfile-source.channels = file-channel

# Channel configuration: use a local file channel
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# Sink configuration: HBaseSink with SimpleHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
# HBase table name
agent.sinks.hbase-sink.table = mikeal-hbase-table
# Column family of the HBase table
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
# Column (within the column family) that receives the event body
agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1
# Bind the sink to the channel
agent.sinks.hbase-sink.channel = file-channel
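As the configuration shows, the local log file /data/flume-hbase-test/mkhbasetable/data/nginx.log is the real-time data source, the local directory /data/flume-hbase-test/data backs the file channel, and HBase is the sink (that is, the data flows into HBase).


Note: the user that launches the flume-ng job (for example, a flume user) must have read and write permission on /data/flume-hbase-test/mkhbasetable/data/nginx.log and on the /data/flume-hbase-test/data directory, and must also have read and write permission in HBase.


Start Flume: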


bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase.conf -Dflume.root.logger=DEBUG,console
In another shell, run:


echo "nging-1" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-2" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
Then check the mikeal-hbase-table table:


The data has been inserted into the table as cell values.


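2.2 Using the SimpleAsyncHbaseEventSerializer mode
To keep the example clean, first empty the mikeal-hbase-table table:


# truncate removes all rows but keeps the table schema (the HBase shell disables, drops, and recreates the table)
truncate 'mikeal-hbase-table'
# by contrast, disable followed by drop removes the table itself
Then write a Flume configuration file, test-flume-into-hbase-2.conf: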


# Read real-time messages from a file and store them in HBase without processing
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

# logfile-source configuration
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

# Channel configuration: use a local file channel
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# Sink configuration: AsyncHBaseSink with SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent.sinks.hbase-sink.serializer.payloadColumn = cloumn-1

# Bind the source and the sink to the channel
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel
Start Flume:


bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-2.conf -Dflume.root.logger=DEBUG,console
In another shell, run:


echo "nging-1" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-two" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "nging-three" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
Then check the mikeal-hbase-table table:


 


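2.3 Using the RegexHbaseEventSerializer mode
RegexHbaseEventSerializer splits each event with a regular expression and stores the pieces in multiple columns of the HBase table. This section briefly shows how to use it to split an event and write the pieces to several HBase columns.


To keep the example clean, first empty the mikeal-hbase-table table:


truncate 'mikeal-hbase-table'
Then write a Flume configuration file, test-flume-into-hbase-3.conf: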


# Read real-time messages from a file, split them with a regex, and store them in HBase
agent.sources = logfile-source
agent.channels = file-channel
agent.sinks = hbase-sink

# logfile-source configuration
agent.sources.logfile-source.type = exec
agent.sources.logfile-source.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source.checkperiodic = 50

# Channel configuration: use a local file channel
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel.dataDirs = /data/flume-hbase-test/data

# Sink configuration: HBaseSink with RegexHbaseEventSerializer
agent.sinks.hbase-sink.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink.table = mikeal-hbase-table
agent.sinks.hbase-sink.columnFamily  = familyclom1
agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Example: split nginx log lines of the form [xxx] [yyy] [zzz] and store each field in its own column, using this regex:
agent.sinks.hbase-sink.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink.serializer.colNames = time,url,number

# Bind the source and the sink to the channel
agent.sources.logfile-source.channels = file-channel
agent.sinks.hbase-sink.channel = file-channel
Start Flume:


bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-3.conf -Dflume.root.logger=DEBUG,console
In another shell, run:


echo "[2016-12-22-19:59:59] [] [10]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
echo "[2016-12-22 20:00:12] [] [19]" >> /data/flume-hbase-test/mkhbasetable/data/nginx.log;
Then check the mikeal-hbase-table table:


The data has been split according to the [xxx] [yyy] [zzz] pattern and stored in the three columns time, url, and number of mikeal-hbase-table.
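
To make the mapping from capture groups to columns concrete, here is a minimal sketch of the idea in plain Java. It is not Flume's implementation; the class name `RegexSplitDemo` and the `split` helper are made up for illustration, and it assumes the regex is applied to the whole event body. Note that the doubled backslashes in the configuration file are unescaped once by the properties parser, and the Java string literal below has to double them again.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RegexSplitDemo {
    // Same regex as serializer.regex above, written as a Java string literal.
    static final Pattern PATTERN =
            Pattern.compile("\\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]");
    // Same names as serializer.colNames above.
    static final String[] COL_NAMES = {"time", "url", "number"};

    // Map each capture group to the column name at the same position.
    static Map<String, String> split(String eventBody) {
        Map<String, String> cells = new LinkedHashMap<>();
        Matcher m = PATTERN.matcher(eventBody);
        if (!m.matches() || m.groupCount() != COL_NAMES.length) {
            return cells; // no match: nothing to write
        }
        for (int i = 0; i < COL_NAMES.length; i++) {
            cells.put(COL_NAMES[i], m.group(i + 1));
        }
        return cells;
    }

    public static void main(String[] args) {
        // One of the sample lines echoed into nginx.log above.
        System.out.println(split("[2016-12-22 20:00:12] [] [19]"));
        // A line that does not follow the bracketed format yields no cells.
        System.out.println(split("not a bracketed line"));
    }
}
```

In this sketch the empty second pair of brackets simply produces an empty url value, and a line that does not match produces no cells at all.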


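III. A More Complex Case: Multiple Sources, Channels, and Sinks
This section shows a more realistic example of importing data into HBase with Flume: several sources, several channels, and several sinks in one agent. To keep the example clean, first empty the mikeal-hbase-table table:


truncate 'mikeal-hbase-table'
Then write a Flume configuration file, test-flume-into-hbase-multi-position.conf: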


# Read real-time messages from two log files, split them with a regex, and store them in HBase
agent.sources = logfile-source-1 logfile-source-2
agent.channels = file-channel-1 file-channel-2
agent.sinks = hbase-sink-1 hbase-sink-2

# logfile-source configuration
agent.sources.logfile-source-1.type = exec
agent.sources.logfile-source-1.command = tail -f /data/flume-hbase-test/mkhbasetable/data/nginx.log
agent.sources.logfile-source-1.checkperiodic = 50

agent.sources.logfile-source-2.type = exec
agent.sources.logfile-source-2.command = tail -f /data/flume-hbase-test/mkhbasetable/data/tomcat.log
agent.sources.logfile-source-2.checkperiodic = 50

# Channel configuration: local file channels, each with its own directories
agent.channels.file-channel-1.type = file
agent.channels.file-channel-1.checkpointDir = /data/flume-hbase-test/checkpoint
agent.channels.file-channel-1.dataDirs = /data/flume-hbase-test/data

agent.channels.file-channel-2.type = file
agent.channels.file-channel-2.checkpointDir = /data/flume-hbase-test/checkpoint2
agent.channels.file-channel-2.dataDirs = /data/flume-hbase-test/data2

# Sink configuration: HBaseSink with RegexHbaseEventSerializer
agent.sinks.hbase-sink-1.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-1.table = mikeal-hbase-table
agent.sinks.hbase-sink-1.columnFamily  = familyclom1
agent.sinks.hbase-sink-1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# Example: split nginx log lines of the form [xxx] [yyy] [zzz] and store each field in its own column, using this regex:
agent.sinks.hbase-sink-1.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink-1.serializer.colNames = time,url,number

agent.sinks.hbase-sink-2.type = org.apache.flume.sink.hbase.HBaseSink
agent.sinks.hbase-sink-2.table = mikeal-hbase-table
agent.sinks.hbase-sink-2.columnFamily  = familyclom2
agent.sinks.hbase-sink-2.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.hbase-sink-2.serializer.regex = \\[(.*?)\\]\\ \\[(.*?)\\]\\ \\[(.*?)\\]
agent.sinks.hbase-sink-2.serializer.colNames = time,IP,number

# Bind each source and sink to its channel
agent.sources.logfile-source-1.channels = file-channel-1
agent.sinks.hbase-sink-1.channel = file-channel-1

agent.sources.logfile-source-2.channels = file-channel-2
agent.sinks.hbase-sink-2.channel = file-channel-2
Start Flume:


bin/flume-ng agent --name agent --conf /etc/flume/conf/agent/ --conf-file /etc/flume/conf/agent/test-flume-into-hbase-multi-position.conf -Dflume.root.logger=DEBUG,console
In another shell, run:


echo "[2016-12-22 20:04:12] [] [16]" >> nginx.log;
echo "[2016-12-22 20:04:13] [123.41.90.135] [22]" >> tomcat.log;
echo "[2016-12-22 20:05:19] [] [24]" >> nginx.log;
echo "[2016-12-22 20:05:21] [134.92.146.109] [25]" >> tomcat.log;
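Then check the mikeal-hbase-table table: the data has been split according to the [xxx] [yyy] [zzz] pattern and stored in mikeal-hbase-table, with the three columns of each event written under the familyclom1 or familyclom2 column family depending on which log it came from.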
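
With several sources, channels, and sinks in one file, a mistyped binding is easy to miss: a source lists its channels under `channels` (plural), while a sink names its single channel under `channel` (singular). The following is a rough sketch of a sanity check one could run over such a file before starting the agent. The class `WiringCheck` and its methods are hypothetical helpers written for this example, not part of Flume, and the check covers only the bindings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

class WiringCheck {
    // Split a whitespace-separated component list; null or blank gives an empty set.
    static Set<String> names(String value) {
        Set<String> out = new LinkedHashSet<>();
        if (value != null && !value.trim().isEmpty()) {
            out.addAll(Arrays.asList(value.trim().split("\\s+")));
        }
        return out;
    }

    // Describe every source or sink that is not bound to a declared channel.
    static List<String> check(String config, String agent) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(config));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> channels = names(p.getProperty(agent + ".channels"));
        List<String> problems = new ArrayList<>();
        for (String src : names(p.getProperty(agent + ".sources"))) {
            Set<String> targets = names(p.getProperty(agent + ".sources." + src + ".channels"));
            if (targets.isEmpty() || !channels.containsAll(targets)) {
                problems.add("source " + src + ": missing or unknown 'channels'");
            }
        }
        for (String sink : names(p.getProperty(agent + ".sinks"))) {
            Set<String> target = names(p.getProperty(agent + ".sinks." + sink + ".channel"));
            if (target.size() != 1 || !channels.containsAll(target)) {
                problems.add("sink " + sink + ": missing or unknown 'channel'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                "agent.sources = logfile-source-1 logfile-source-2",
                "agent.channels = file-channel-1 file-channel-2",
                "agent.sinks = hbase-sink-1 hbase-sink-2",
                "agent.sources.logfile-source-1.channels = file-channel-1",
                "agent.sources.logfile-source-2.channels = file-channel-2",
                "agent.sinks.hbase-sink-1.channel = file-channel-1",
                // Deliberate mistake for the demo: plural key on a sink
                "agent.sinks.hbase-sink-2.channels = file-channel-2");
        System.out.println(check(config, "agent"));
    }
}
```

Run against the deliberately broken snippet in `main`, the check reports hbase-sink-2; with the bindings written as in the configuration above it reports nothing.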


 


In this approach the data is stored in HBase, so we use the HBase sink that Flume provides; to clean and transform the log data, we implement our own AsyncHbaseEventSerializer. https://www.cnblogs.com/gaopeng527/p/5010985.html
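// Package taken from the serializer class name used in flume-hbase.properties
package com.ncc.dlut;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.apache.flume.sink.hbase.SimpleRowKeyGenerator;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

public class AsyncHbaseLTEEventSerializer implements AsyncHbaseEventSerializer {
    // Table name
    private byte[] table;
    // Column family
    private byte[] colFam;
    // Event currently being processed
    private Event currentEvent;
    // Column names
    private byte[][] columnNames;
    // Buffers for the batch of writes sent to HBase
    private final List<PutRequest> puts = new ArrayList<PutRequest>();
    private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
    // Current row key
    private byte[] currentRowKey;
    private final byte[] eventCountCol = "eventCount".getBytes();
    
    @Override
    public void configure(Context context) {
        // Read the column names from the configuration file (serializer.columns)
        String cols = new String(context.getString("columns"));
        String[] names = cols.split(",");
        columnNames = new byte[names.length][];
        int i = 0;
        for(String name:names){
            columnNames[i++] = name.getBytes();
        } 
    }
 
    @Override
    public void configure(ComponentConfiguration conf) {
        // Not used by this serializer
    }
 
    @Override
    public void cleanUp() {
        // Release references when the sink stops
        table = null;
        colFam = null;
        currentEvent = null;
        columnNames = null;
        currentRowKey = null;
        
    }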
 
    @Override
    public List<PutRequest> getActions() {
        // Split the event body to get the value of each column
        String eventStr = new String(currentEvent.getBody());
        String[] cols = logTokenize(eventStr);
        puts.clear();
        // Timestamp carried in the data
        String time=cols[1];
        // Left-pad the timestamp with zeros to 13 characters
        int n1 = 13-time.length();
        StringBuilder sb = new StringBuilder(time);
        for(int i=0;i<n1;i++){
            sb.insert(0, '0');
        }
        try {
            // Build the row key with Flume's bundled row key generator
            currentRowKey = SimpleRowKeyGenerator.getUUIDKey(cols[0]+"-"+sb.toString());
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
//        currentRowKey = (cols[0]+"-"+System.currentTimeMillis()).getBytes(); 
        int n = cols.length;
        // Add one put per column
        for(int i=0;i<n;i++){
            PutRequest putReq = new PutRequest(table, currentRowKey,colFam,columnNames[i],cols[i].getBytes());
            puts.add(putReq);
        }        
        return puts;
    }
 
    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        // Increment the counter of received events
        incs.clear();
        incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
        return incs;
    }
 
    @Override
    // Store the table name and column family passed in by the sink
    public void initialize(byte[] table, byte[] cf) {
        
        this.table = table;
        this.colFam = cf; 
    }
 
    @Override
    public void setEvent(Event event) {
        // Remember the event that getActions() will serialize
        this.currentEvent = event;
    }
    
    // Extract the column values from a log line
    public String[] logTokenize(String eventStr) {
 
//      String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";
//        Pattern p = Pattern.compile(logEntryPattern);
//        Matcher matcher = p.matcher(eventStr); 
 
/*        if (!matcher.matches()){
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(eventStr);
            return null;
        }
*/
        
/*        String[] columns = new String[matcher.groupCount()];       
        for (int i = 0; i < matcher.groupCount(); i++){
            columns[i] = matcher.group(i+1);
        }*/
        
        // Split on ':' and ',' and keep the odd-indexed tokens (the values)
        String[] s = eventStr.split("[:,]");
        int n = s.length;
        String[] columns = new String[n/2];
        for(int i=0;2*i+1<n;i++){
            columns[i] = s[2*i+1];
        } 
        return columns; 
    }  
}
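
The tokenizing and zero-padding steps of this serializer can be tried out without Flume or HBase. The sketch below isolates them; the input line `cid:1001,time:1448243100,pci:7` is a made-up sample in an assumed `key:value,key:value` layout (the original article does not show its raw data), and the class name `KeyValueTokenizeDemo` is only for illustration.

```java
class KeyValueTokenizeDemo {
    // Split "k1:v1,k2:v2,..." on ':' and ',' and keep the odd-indexed tokens (the values).
    static String[] values(String eventBody) {
        String[] s = eventBody.split("[:,]");
        String[] columns = new String[s.length / 2];
        for (int i = 0; 2 * i + 1 < s.length; i++) {
            columns[i] = s[2 * i + 1];
        }
        return columns;
    }

    // Left-pad with '0' up to the given width so that numeric strings sort in numeric order.
    static String zeroPad(String value, int width) {
        StringBuilder sb = new StringBuilder(value);
        while (sb.length() < width) {
            sb.insert(0, '0');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical sample line; only the layout matters here.
        String[] cols = values("cid:1001,time:1448243100,pci:7");
        System.out.println(String.join("|", cols));
        // Prefix that the serializer would hand to the row key generator.
        System.out.println(cols[0] + "-" + zeroPad(cols[1], 13));
    }
}
```

One consequence of splitting on `[:,]` is that a value which itself contains a colon or comma (a clock time such as 20:00:12, for instance) would be cut into several tokens, so this scheme only suits data whose values are free of both characters.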
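2. Package the program above into a jar and place it in Flume's lib directory
3. Configure Flume to collect and store the data
The configuration file flume-hbase.properties is as follows: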


############################################
#  flume-src-agent config
###########################################
 
#agent section
agent.sources = s
agent.channels = c
agent.sinks = r
 
#source section
#agent.sources.s.type = exec
#agent.sources.s.command = tail -f -n+1 /usr/local/test.log
 
agent.sources.s.type = spooldir
agent.sources.s.spoolDir = /usr/local/flume-hbase
agent.sources.s.fileHeader = true
agent.sources.s.batchSize = 100
agent.sources.s.channels = c
 
 
# Each sink's type must be defined
agent.sinks.r.type = asynchbase
agent.sinks.r.table = car_table
agent.sinks.r.columnFamily = lte
agent.sinks.r.batchSize = 100
agent.sinks.r.serializer = com.ncc.dlut.AsyncHbaseLTEEventSerializer
agent.sinks.r.serializer.columns = cid,time,pci,st,ed,ta,lng,lat
 
#Specify the channel the sink should use
agent.sinks.r.channel = c
 
# Each channel's type is defined.
agent.channels.c.type = memory
agent.channels.c.capacity = 1000
 https://blog.csdn.net/yaoyasong/article/details/39400829


1.      First enable access logging in Tomcat and choose the combined format.


Edit TOMCAT_PATH/conf/server.xml and add the access log entry:


<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="localhost_access_log." suffix=".txt" renameOnRotate="true"
               pattern="combined" />


With this in place, Tomcat creates a localhost_access_log file in the logs directory every day and records user requests in it in real time.
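
A combined-format line carries nine fields: client address, identity, user, timestamp, request line, status, bytes sent, referer, and user agent. As a quick way to see what the serializer in this article receives, the sketch below applies the same regular expression that its logTokenize method uses to a made-up sample line. The class name `CombinedLogDemo`, the helper `requestPath`, and the sample line are illustrative assumptions, not part of the original program.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CombinedLogDemo {
    static final Pattern COMBINED = Pattern.compile(
            "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"");

    // Return the nine fields of a combined-format line, or null when the line does not match.
    static String[] fields(String line) {
        Matcher m = COMBINED.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] out = new String[m.groupCount()];
        for (int i = 0; i < out.length; i++) {
            out[i] = m.group(i + 1);
        }
        return out;
    }

    // Take the path out of a request line such as "GET /a/b?x=1 HTTP/1.1", dropping the query string.
    static String requestPath(String requestLine) {
        String path = requestLine.split(" ")[1];
        int pos = path.indexOf('?');
        return pos > 0 ? path.substring(0, pos) : path;
    }

    public static void main(String[] args) {
        // Hypothetical access log line in combined format.
        String line = "127.0.0.1 - - [22/Dec/2016:20:00:12 +0800] "
                + "\"GET /index.html?x=1 HTTP/1.1\" 200 1024 \"-\" \"Mozilla/5.0\"";
        String[] f = fields(line);
        System.out.println(f[3]);              // timestamp field
        System.out.println(f[5]);              // status code
        System.out.println(requestPath(f[4])); // request path without the query string
    }
}
```

Because the first group is `[\d.]+`, a line whose client address is not a dotted IPv4 address (an IPv6 client, for example) does not match and `fields` returns null, so callers need to handle that case.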
 


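import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
// Assumption: Bytes is HBase's utility class; the original listing does not show its imports
import org.apache.hadoop.hbase.util.Bytes;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

// ChannelUtil, used below, is a helper class of the original author that is not shown in the article
public class AsyncHbaseLogEventSerializer implements AsyncHbaseEventSerializer{
      private byte[] table;
      private byte[] colFam;
      private Event currentEvent;
      private byte[][] columnNames;
      private final List<PutRequest> puts = new ArrayList<PutRequest>();
      private final List<AtomicIncrementRequest> incs = new ArrayList<AtomicIncrementRequest>();
      private byte[] currentRowKey;
      private final byte[] eventCountCol = "eventCount".getBytes();
      public void initialize(byte[] table, byte[] cf) {
               this.table = table;
               this.colFam = cf;
      }
      public void configure(Context context) {
               // Read the column names from the configuration file (serializer.columns)
               String cols = new String(context.getString("columns"));
               String[] names = cols.split(",");
               columnNames = new byte[names.length][];              
               int i = 0;
               for (String name : names) {
                         columnNames[i++] = name.getBytes();
               }
      }
      public void configure(ComponentConfiguration conf) {
      }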
      public List<PutRequest> getActions() {
               // Split the event body and get the values for the columns
               String eventStr = new String(currentEvent.getBody());
               String[] cols = logTokenize(eventStr);
               puts.clear();
               if (cols == null) {
                         // Unparseable line: write nothing instead of failing with a NullPointerException
                         return puts;
               }
               // Request line, e.g. "GET /path?query HTTP/1.1"; keep only the path
               String req = cols[4];
               String reqPath = req.split(" ")[1];
               int pos = reqPath.indexOf("?");
               if (pos > 0) {
                         reqPath = reqPath.substring(0,pos);
               }
               if(reqPath.length() > 1 && reqPath.trim().endsWith("/")){
                         reqPath = reqPath.substring(0,reqPath.length()-1);
               }
               String req_ts_str = cols[3];
               long currTime = System.currentTimeMillis();
               SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.US);
               SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
               if (req_ts_str != null && !req_ts_str.equals("")){
                         try {
                                  currTime = df.parse(req_ts_str).getTime();
                         } catch (ParseException e) {
                                  System.out.println("parse req time error,using system.current time.");
                         }
               }
               // Format after parsing so that req_ts is never null, even when parsing fails
               String currTimeStr = df2.format(new java.util.Date(currTime));
               // Reverse timestamp: newer requests get smaller row keys and sort first
               long revTs = Long.MAX_VALUE - currTime;
               currentRowKey = (Long.toString(revTs) + reqPath).getBytes();
               System.out.println("currentRowKey: " + new String(currentRowKey));
               // One put per parsed field, limited to the configured column names
               for (int i = 0; i < cols.length && i < columnNames.length; i++){
                         PutRequest putReq = new PutRequest(table, currentRowKey, colFam, columnNames[i], cols[i].getBytes());
                         puts.add(putReq);
               }
               // Extra derived columns
               PutRequest reqPathPutReq = new PutRequest(table, currentRowKey, colFam, "req_path".getBytes(), reqPath.getBytes());
               puts.add(reqPathPutReq);              
               PutRequest reqTsPutReq = new PutRequest(table, currentRowKey, colFam, "req_ts".getBytes(), Bytes.toBytes(currTimeStr));
               puts.add(reqTsPutReq);              
               String channelType = ChannelUtil.getType(cols[8]);
               PutRequest channelPutReq = new PutRequest(table, currentRowKey, colFam, "req_chan".getBytes(), Bytes.toBytes(channelType));
               puts.add(channelPutReq);              
               return puts;
      } 
      // Parse one combined-format access log line into its nine fields; returns null when the line does not match
      public String[] logTokenize(String eventStr) {
               String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-) \"([^\"]+)\" \"([^\"]+)\"";
               Pattern p = Pattern.compile(logEntryPattern);
               Matcher matcher = p.matcher(eventStr); 
               if (!matcher.matches())
               {
                         System.err.println("Bad log entry (or problem with RE?):");
                         System.err.println(eventStr);
                         return null;
               }
               String[] columns = new String[matcher.groupCount()];       
               for (int i = 0; i < matcher.groupCount(); i++)
               {
                         columns[i] = matcher.group(i+1);
               }
               return columns;
      } 
      public List<AtomicIncrementRequest> getIncrements() {
               // Increment the counter of received events
               incs.clear();
               incs.add(new AtomicIncrementRequest(table, "totalEvents".getBytes(), colFam, eventCountCol));
               return incs;
      }
      public void setEvent(Event event) {
               this.currentEvent = event;
      }
      public void cleanUp() {
               table = null;
               colFam = null;
               currentEvent = null;
               columnNames = null;
               currentRowKey = null;
      }
}
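
The row key in this serializer is `Long.MAX_VALUE - timestamp` followed by the request path. HBase sorts rows by key bytes, so subtracting from `Long.MAX_VALUE` makes newer requests sort ahead of older ones. The sketch below reproduces just that step; the class name `ReverseKeyDemo`, the fixed UTC time zone (chosen only to make the demo reproducible), and the sample timestamps are assumptions for illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

class ReverseKeyDemo {
    // Parse the date part of a combined-log timestamp such as "22/Dec/2016:20:00:12 +0800".
    // The pattern has no zone field, so the trailing "+0800" is left unparsed and ignored.
    static long parseMillis(String ts) {
        SimpleDateFormat df = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        df.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed zone so the demo is reproducible
        try {
            return df.parse(ts).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Reverse-timestamp row key: a larger timestamp gives a lexicographically smaller key.
    static String rowKey(long millis, String reqPath) {
        return Long.toString(Long.MAX_VALUE - millis) + reqPath;
    }

    public static void main(String[] args) {
        String older = rowKey(parseMillis("22/Dec/2016:20:00:12 +0800"), "/index");
        String newer = rowKey(parseMillis("22/Dec/2016:20:05:19 +0800"), "/index");
        System.out.println(older);
        System.out.println(newer);
        // true: the newer request sorts before the older one
        System.out.println(newer.compareTo(older) < 0);
    }
}
```

Two details are worth keeping in mind. Because the pattern ignores the zone offset, the parsed time depends on the JVM's default zone unless one is set explicitly. And two requests for the same path within the same second produce the same row key, so the later write lands on the same row as the earlier one.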
 
————————————————
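Copyright notice: this is an original article by CSDN blogger 曹雪朋, licensed under CC 4.0 BY-SA. When reposting, please include the link to the original source and this notice.
Original link: https://blog.csdn.net/qq_22473611/article/details/88101426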