搭建好了可用的HBase之后,就可以开始操作数据了。
HBase数据操作有两种接口:Thrift和REST。效率上Thrift略占优势,所以决定使用Thrift。此外还有Starbase、HappyBase等封装好的第三方Python接口。
使用Thrift需要知道的两点:
1. Thrift interface 没有load balancing的代码
(all load balancing will need to be done with external tools such as a DNS round-robin, a virtual IP address, or in code.)
2. Thrift and REST client hosts usually don’t run any other services (such as DataNodes or RegionServers) to keep the overhead low and responsiveness high for REST or Thrift interactions.
启动hbase 的 thrift server:
bin/hbase thrift start
制作Thrift python接口的软件包:
-
sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
-
sudo apt-get install python2.7-dev git
-
git clone https://github.com/apache/thrift.git thrift
-
cd thrift
-
./bootstrap.sh
-
./configure
-
make
-
sudo make install
-
cd ..
-
mkdir HBaseThrift
-
cd HBaseThrift
-
thrift -gen py /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
-
mv gen-py/* .
-
rm -r gen-py/
-
mkdir thrift
-
cp -rp /home/ubuntu/thrift/lib/py/src/* ./thrift/
-
cp /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift .
已经制作好并提交,方便大家使用。
写一个调用thrift接口的类,方便数据读写。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yuande Liu
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
from hbase import Hbase
import struct
# Method for encoding ints with Thrift's string encoding
def encode(n):
    """Pack the integer ``n`` into the byte string that is stored in HBase.

    The ``struct`` format ``"i"`` is used without a byte-order prefix, so the
    value is packed as a C ``int`` in the native byte order of the machine
    running this code.
    NOTE(review): data written on one architecture may therefore not decode
    correctly on a machine with a different byte order -- confirm that all
    readers and writers share one.

    :param n: integer to pack
    :returns: the packed bytes
    :raises struct.error: if ``n`` cannot be packed as a C ``int``
    """
    return struct.pack("i", n)
# Method for decoding ints with Thrift's string encoding
def decode(s):
    """Turn a stored cell value back into an ``int``.

    Two representations are accepted:

    * a value made only of decimal digits is parsed as decimal text;
    * anything else is unpacked with ``struct`` format ``"i"`` (the
      counterpart of ``encode``).

    NOTE: the two forms overlap. A packed integer whose bytes all happen to
    be ASCII digits (for example the bytes ``0000``) is parsed as text, not
    unpacked.

    :param s: raw cell value
    :returns: the decoded integer
    :raises struct.error: if ``s`` is not all digits and does not have the
        size required by format ``"i"``
    """
    if s.isdigit():
        return int(s)
    return struct.unpack('i', s)[0]
class HBaseTest(object):
    """Convenience wrapper around the HBase Thrift client for one table.

    Every configured column family holds one logical field (``name``, ``sex``
    and ``age`` by default) and the column qualifier is used as a record
    index inside a row. ``int`` values are packed with the module-level
    ``encode`` helper when written and unpacked with ``decode`` when read.
    """

    def __init__(self, table='test', host='127.0.0.1', port=9090):
        """Open the Thrift connection and create ``table`` if it is missing.

        :param table: name of the HBase table to work on
        :param host: host of the HBase Thrift server
        :param port: port of the HBase Thrift server
        """
        self.table = table
        self.host = host
        self.port = port
        # Connect to HBase Thrift server
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
        # Create and open the client connection
        self.client = Hbase.Client(self.protocol)
        self.transport.open()
        # Set type and field of column families
        self.set_column_families([str, str, int], ['name', 'sex', 'age'])
        self._build_column_families()

    def set_column_families(self, type_list, col_list=None):
        """Configure the column families and the Python type of each one.

        :param type_list: types (``str`` or ``int``), positionally matching
            ``col_list``
        :param col_list: column family names; defaults to
            ``['name', 'sex', 'age']``
        """
        # A fresh list per call: a list literal in the signature would be
        # shared (and mutable) across every instance.
        if col_list is None:
            col_list = ['name', 'sex', 'age']
        self.columnFamiliesType = type_list
        self.columnFamilies = col_list

    def _build_column_families(self):
        """Create the table with the configured families if it does not exist."""
        tables = self.client.getTableNames()
        if self.table not in tables:
            self.__create_table(self.table)

    def __create_table(self, table):
        """Create ``table`` in HBase with one descriptor per column family."""
        descriptors = [Hbase.ColumnDescriptor(name=family)
                       for family in self.columnFamilies]
        self.client.createTable(table, descriptors)

    def close(self):
        """Close the Thrift transport; does nothing if it was never created."""
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def __del__(self):
        # __init__ may have failed before self.transport was assigned, so the
        # attribute lookup is guarded inside close().
        self.close()

    def __del_table(self, table):
        """Delete a table; HBase requires it to be disabled first."""
        self.client.disableTable(table)
        self.client.deleteTable(table)

    def getColumnDescriptors(self):
        """Return the column descriptors of the wrapped table."""
        return self.client.getColumnDescriptors(self.table)

    def _build_mutations(self, values, qualifier):
        """Build one ``Hbase.Mutation`` per value, matched to families by position.

        :raises TypeError: if a value is neither ``str`` nor ``int``
        :raises IndexError: if more values than column families are given
        """
        mutations = []
        for j, value in enumerate(values):
            if isinstance(value, str):
                payload = value
            elif isinstance(value, int):
                payload = encode(value)
            else:
                # Previously such a value silently re-sent the preceding
                # mutation (or hit an unbound local on the first column).
                raise TypeError('unsupported value type for column family '
                                'index %d: %r' % (j, type(value)))
            column = self.columnFamilies[j] + ':' + qualifier
            mutations.append(Hbase.Mutation(column=column, value=payload))
        return mutations

    def put(self, rowKey, qualifier='0', *args):
        """Put one record into a row.

        :param rowKey: row key to write to
        :param qualifier: column qualifier shared by all written columns
        :param *args: values, positionally matching the column families,
            e.g. ``name, sex, age``
        :raises TypeError: if a value is neither ``str`` nor ``int``

        Usage::
            >>> HBaseTest().put('test', '0', 'john', 'male', '95')
        """
        mutations = self._build_mutations(args, qualifier)
        self.client.mutateRow(self.table, rowKey, mutations, {})

    def puts(self, rowKeys, values, qualifier='1'):
        """Put several records; ``qualifier`` is incremented after each one.

        :param rowKeys: a single row key (reused for every record) or a list
            with one row key per record
        :param values: 2-dimensional list, each element is one record such as
            ``[name, sex, age]``
        :param qualifier: qualifier of the first record; must be a decimal
            string because it is incremented numerically
        :raises TypeError: if a value is neither ``str`` nor ``int``

        Usage::
            >>> HBaseTest().puts('test', [['lee', 'f', '27'], ['clark', 'm', 27], ['dan', 'f', '27']])
        """
        if not isinstance(rowKeys, list):
            rowKeys = [rowKeys] * len(values)
        mutationsBatch = []
        for i, value in enumerate(values):
            mutations = self._build_mutations(value, qualifier)
            qualifier = str(int(qualifier) + 1)
            mutationsBatch.append(Hbase.BatchMutation(row=rowKeys[i], mutations=mutations))
        self.client.mutateRows(self.table, mutationsBatch, {})

    def getRow(self, row, qualifier='0'):
        """Get one record of a row.

        :param row: row key
        :param qualifier: column qualifier identifying the record
        :returns: list of dicts ``{'row': key, family: value, ...}``; a family
            with no cell under ``qualifier`` maps to ``None``
        """
        ret = []
        for r in self.client.getRow(self.table, row, {}):
            rd = {'row': r.row}
            for family, family_type in zip(self.columnFamilies, self.columnFamiliesType):
                if family_type not in (str, int):
                    # Only str and int families are materialised.
                    continue
                cell = r.columns.get(family + ':' + qualifier)
                if cell is None:
                    # HBase rows are sparse: the cell may simply not exist.
                    rd[family] = None
                elif family_type is int:
                    rd[family] = decode(cell.value)
                else:
                    rd[family] = cell.value
            ret.append(rd)
        return ret

    def getRows(self, rows, qualifier='0'):
        """Yield ``getRow`` results for each row key in ``rows``.

        When every key in ``rows`` is the same, the qualifier is incremented
        after each lookup so that consecutive records of that one row are
        returned; otherwise all lookups use the same ``qualifier``.

        :param rows: a list of row keys
        :param qualifier: qualifier of the first lookup
        """
        grow = len(set(rows)) == 1
        for r in rows:
            yield self.getRow(r, qualifier)
            if grow:
                qualifier = str(int(qualifier) + 1)

    def scanner(self, numRows=100, startRow=None, stopRow=None):
        """Scan the table and return every row found.

        :param numRows: number of rows fetched per round trip
        :param startRow: row key to start scanning at
        :param stopRow: row key to stop scanning at
        :returns: list of dicts ``{'row': key, qualifier: {family: value}}``;
            the whole result is accumulated in memory
        """
        # First occurrence wins, matching a list.index() lookup.
        types_by_family = {}
        for family, family_type in zip(self.columnFamilies, self.columnFamiliesType):
            types_by_family.setdefault(family, family_type)

        scan = Hbase.TScan(startRow, stopRow)
        scannerId = self.client.scannerOpenWithScan(self.table, scan, {})
        ret = []
        try:
            rowList = self.client.scannerGetList(scannerId, numRows)
            while rowList:
                for r in rowList:
                    rd = {'row': r.row}
                    for k, v in r.columns.items():
                        # Only the first ':' separates family from qualifier;
                        # a qualifier may itself contain ':'.
                        cf, qualifier = k.split(':', 1)
                        # NOTE(review): a qualifier literally named 'row'
                        # collides with the row-key entry of this dict.
                        if qualifier not in rd:
                            rd[qualifier] = {}
                        # Families that were not configured are returned raw.
                        family_type = types_by_family.get(cf, str)
                        if family_type == str:
                            rd[qualifier][cf] = v.value
                        elif family_type == int:
                            rd[qualifier][cf] = decode(v.value)
                    ret.append(rd)
                rowList = self.client.scannerGetList(scannerId, numRows)
        finally:
            # Always release the scanner, even if decoding fails.
            self.client.scannerClose(scannerId)
        return ret
def demo():
    """Manual smoke test; needs a running HBase Thrift server on the defaults.

    Reads row ``cookie`` from table ``test1`` in three ways: a single
    ``getRow``, four consecutive records through ``getRows``, and a scan.
    Uncomment the ``put``/``puts`` lines to write sample data first.
    """
    ht = HBaseTest(table='test1')
    values = [['lee', 'f', '27'], ['clark', 'm', 27], ['dan', 'f', '27']]
    rowKey = 'cookie'
    # ht.put(rowKey, '0', 'fish', 'f', '22')
    # ht.puts(rowKey, values)
    # print(ht.getColumnDescriptors())
    # print(x) with a single argument prints the same on Python 2 and 3.
    print(ht.getRow(rowKey))
    for i in ht.getRows([rowKey] * 4):
        print(i)
    print(ht.scanner())
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    demo()
程序说明:
-
对存入HBase的整型数据,存入时encode成字节串,取出时再decode成整型。因为HBase中的值都是无类型的字节数组,并不直接支持float、int等数据类型。
-
HBaseTest类初始化,如果table不存在则创建。
-
对已存在的table支持put存一行数据,puts存多行数据,getRow读一行,getRows读取多行,还有scanner全table扫描的几个函数。程序清单中有用法。
参考cloudera的 How-to: Use the HBase Thrift Interface 系列文章:
1,
2,
3。
阅读(7177) | 评论(1) | 转发(0) |