Hbase数据操作有两种接口,Thrift和REST。效率上Thrift略微占优势,就决定使用Thrift。此外还有Startbase, HappyBase等封装的第三方python接口。
1. Thrift interface 没有load balancing的代码
(all load balancing will need to be done with external tools such a DNS round-robin, a virtual IP address, or in code.)
2. Thrift and REST client hosts usually don’t run any other services (such as DataNodes or RegionServers) to keep the overhead low and responsiveness high for REST or Thrift interactions.
启动hbase 的 thrift server:
bin/hbase thrift start
制作Thrift python接口的软件包:
sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
sudo apt-get install python2.7-dev git
git clone thrift
cd thrift
sudo make install
cd ..
mkdir HBaseThrift
cd HBaseThrift
thrift -gen py /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
mv gen-py/* .
rm -r gen-py/
mkdir thrift
cp -rp /home/ubuntu/thrift/lib/py/src/* ./thrift/
cp /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift .
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yuande Liu
from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
from hbase import Hbase
import struct
# Method for encoding ints with Thrift's string encoding
def encode(n):
return struct.pack("i", n)
# Method for decoding ints with Thrift's string encoding
def decode(s):
return int(s) if s.isdigit() else struct.unpack('i', s)[0]
class HBaseTest(object):
def __init__(self, table='test', host='', port=9090):
self.table = table
self.host = host
self.port = port
# Connect to HBase Thrift server
self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
# Create and open the client connection
self.client = Hbase.Client(self.protocol)
# set type and field of column families
self.set_column_families([str, str, int], ['name', 'sex', 'age'])
def set_column_families(self, type_list, col_list=['name', 'sex', 'age']):
self.columnFamiliesType = type_list
self.columnFamilies = col_list
def _build_column_families(self):
""" give all column families name list, create a table
tables = self.client.getTableNames()
if self.table not in tables:
def __create_table(self, table):
""" create table in hbase with column families
columnFamilies = []
for columnFamily in self.columnFamilies:
name = Hbase.ColumnDescriptor(name=columnFamily)
self.client.createTable(table, columnFamilies)
def __del__(self):
def __del_table(self, table):
""" Delete a table, first need to disable it.
def getColumnDescriptors(self):
return self.client.getColumnDescriptors(self.table)
def put(self, rowKey, qualifier='0', *args):
""" put one row
:param *args: all values correspond to column families.
e.g. [name, sex, age]
>>> HBaseTest().put('test', '0', 'john', 'male', '95')
mutations = []
for j, column in enumerate(args):
if isinstance(column, str):
m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=column)
elif isinstance(column, int):
m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=encode(column))
self.client.mutateRow(self.table, rowKey, mutations, {})
def puts(self, rowKeys, values, qualifier='1'):
""" put sevel rows, `qualifier` is autoincrement
:param rowKeys: a single rowKey
:param values: values is a 2-dimension list, one piece element is [name, sex, age]
:param qualifier: column family qualifier
>>> HBaseTest().puts('test', [['lee', 'f', '27'], ['clark', 'm', 27], ['dan', 'f', '27']])
mutationsBatch = []
if not isinstance(rowKeys, list):
rowKeys = [rowKeys] * len(values)
for i, value in enumerate(values):
mutations = []
for j, column in enumerate(value):
if isinstance(column, str):
m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=column)
elif isinstance(column, int):
m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=encode(column))
qualifier = str( int(qualifier) + 1 )
mutationsBatch.append( Hbase.BatchMutation(row=rowKeys[i], mutations=mutations) )
self.client.mutateRows(self.table, mutationsBatch, {})
def getRow(self, row, qualifier='0'):
""" get one row from hbase table
:param row: row key
rows = self.client.getRow(self.table, row, {})
ret = []
for r in rows:
rd = { 'row': r.row }
for j, column in enumerate(self.columnFamilies):
if self.columnFamiliesType[j] == str:
rd.update({ column: r.columns.get(column+':'+qualifier).value })
elif self.columnFamiliesType[j] == int:
rd.update({ column: decode(r.columns.get(column+':'+qualifier).value) })
return ret
def getRows(self, rows, qualifier='0'):
""" get rows from hbase table, all the row specify the same `qualifier`
:param rows: a list of row key
grow = True if len(set(rows)) == 1 else False
for r in rows:
yield self.getRow(r, qualifier)
if grow: qualifier = str( int(qualifier) + 1 )
def scanner(self, numRows=100, startRow=None, stopRow=None):
""" scan the table
:param numRows: how much rows return in one iteration.
:param startRow: start scan row key
:param stopRow: stop scan row key
scan = Hbase.TScan(startRow, stopRow)
scannerId = self.client.scannerOpenWithScan(self.table, scan, {})
# row = self.client.scannerGet(scannerId)
ret = []
rowList = self.client.scannerGetList(scannerId, numRows)
while rowList:
for r in rowList:
rd = { 'row': r.row }
for k, v in r.columns.iteritems():
cf, qualifier = k.split(':')
if qualifier not in rd:
rd[qualifier] = {}
idx = self.columnFamilies.index(cf)
if self.columnFamiliesType[idx] == str:
rd[qualifier].update({ cf: v.value })
elif self.columnFamiliesType[idx] == int:
rd[qualifier].update({ cf: decode(v.value) })
rowList = self.client.scannerGetList(scannerId, numRows)
return ret
def demo():
ht = HBaseTest(table='test1')
values = [['lee', 'f', '27'], ['clark', 'm', 27], ['dan', 'f', '27']]
rowKey = 'cookie'
# ht.put(rowKey, '0', 'fish', 'f', '22')
# ht.puts(rowKey, values)
# print ht.getColumnDescriptors()
print ht.getRow(rowKey)
for i in ht.getRows([rowKey] * 4):
print i
print ht.scanner()
if __name__ == '__main__':
参考cloudera的 How-to: Use the HBase Thrift Interface 系列文章:
