Chinaunix首页 | 论坛 | 博客
  • 博客访问: 4584380
  • 博文数量: 1214
  • 博客积分: 13195
  • 博客等级: 上将
  • 技术积分: 9105
  • 用 户 组: 普通用户
  • 注册时间: 2007-01-19 14:41
个人简介

C++,python,热爱算法和机器学习

文章分类

全部博文(1214)

文章存档

2021年(13)

2020年(49)

2019年(14)

2018年(27)

2017年(69)

2016年(100)

2015年(106)

2014年(240)

2013年(5)

2012年(193)

2011年(155)

2010年(93)

2009年(62)

2008年(51)

2007年(37)

分类: 大数据

2014-04-22 16:10:06

  搭建好了可用的Hbase,开始操纵数据。
  Hbase数据操作有两种接口,Thrift和REST。效率上Thrift略微占优势,就决定使用Thrift。此外还有Startbase, HappyBase等封装的第三方python接口。


 使用Thrift需要知道的两点:

 1. Thrift interface 没有load balancing的代码all load balancing will need to be done with external tools such a DNS round-robin, a virtual IP address, or in code.
 2. Thrift and REST client hosts usually don’t run any other services (such as DataNodes or RegionServers) to keep the overhead low and responsiveness high for REST or Thrift interactions.


 启动hbase 的 thrift server:

  bin/hbase thrift start


  制作Thrift python接口的软件包:

  1. sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev
  2. sudo apt-get install python2.7-dev git
  3. git clone thrift
  4. cd thrift
  5. ./bootstrap.sh
  6. ./configure
  7. make
  8. sudo make install
  9. cd ..
  10. mkdir HBaseThrift
  11. cd HBaseThrift
  12. thrift -gen py /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
  13. mv gen-py/* .
  14. rm -r gen-py/
  15. mkdir thrift
  16. cp -rp /home/ubuntu/thrift/lib/py/src/* ./thrift/
  17. cp /home/ubuntu/hbase-0.98.1/hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift .
已经制作好并提交,方便大家使用。


 写一个调用thrift接口的类,方便数据读写。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yuande Liu

from thrift.transport import TSocket
from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport
from hbase import Hbase

import struct
# Method for encoding ints with Thrift's string encoding
def encode(n):
   return struct.pack("i", n)

# Method for decoding ints with Thrift's string encoding
def decode(s):
   return int(s) if s.isdigit() else struct.unpack('i', s)[0]



class HBaseTest(object):

   def __init__(self, table='test', host='127.0.0.1', port=9090):
       self.table = table
       self.host = host
       self.port = port

       # Connect to HBase Thrift server
       self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
       self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)

       # Create and open the client connection
       self.client = Hbase.Client(self.protocol)
       self.transport.open()


       # set type and field of column families
       self.set_column_families([strstrint], ['name''sex''age'])
       self._build_column_families()

   def set_column_families(self, type_list, col_list=['name''sex''age']):
       self.columnFamiliesType = type_list
       self.columnFamilies = col_list

   def _build_column_families(self):
       """ give all column families name list, create a table
       """
       tables = self.client.getTableNames()
       if self.table not in tables:
           self.__create_table(self.table)

   def __create_table(self, table):
       """ create table in hbase with column families
       """
       columnFamilies = []
       for columnFamily in self.columnFamilies:
           name = Hbase.ColumnDescriptor(name=columnFamily)
           columnFamilies.append(name)
       self.client.createTable(table, columnFamilies)

   def __del__(self):
       self.transport.close()

   def __del_table(self, table):
       """ Delete a table, first need to disable it.
       """
       self.client.disableTable(table)
       self.client.deleteTable(table)

   def getColumnDescriptors(self):
       return self.client.getColumnDescriptors(self.table)

   def put(self, rowKey, qualifier='0', *args):
       """ put one row

       :param *args: all values correspond to column families.
           e.g. [name, sex, age]

       Usage::

       >>> HBaseTest().put('test', '0', 'john', 'male', '95')

       """
       mutations = []
       for j, column in enumerate(args):
           if isinstance(column, str):
               m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=column)
           elif isinstance(column, int):
               m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=encode(column))
           mutations.append(m_name)
       self.client.mutateRow(self.table, rowKey, mutations, {})

   def puts(self, rowKeys, values, qualifier='1'):
       """ put sevel rows, `qualifier` is autoincrement

       :param rowKeys: a single rowKey
       :param values: values is a 2-dimension list, one piece element is [name, sex, age]
       :param qualifier: column family qualifier

       Usage::

       >>> HBaseTest().puts('test', [['lee', 'f', '27'], ['clark', 'm', 27], ['dan', 'f', '27']])

       """
       mutationsBatch = []
       if not isinstance(rowKeys, list):
           rowKeys = [rowKeys] * len(values)

       for i, value in enumerate(values):
           mutations = []
           for j, column in enumerate(value):
               if isinstance(column, str):
                   m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=column)
               elif isinstance(column, int):
                   m_name = Hbase.Mutation(column=self.columnFamilies[j]+':'+qualifier, value=encode(column))
               mutations.append(m_name)

           qualifier = strint(qualifier) + 1 )
           mutationsBatch.append( Hbase.BatchMutation(row=rowKeys[i], mutations=mutations) )
       self.client.mutateRows(self.table, mutationsBatch, {})


   def getRow(self, row, qualifier='0'):
       """ get one row from hbase table

       :param row: row key
       """
       rows = self.client.getRow(self.table, row, {})
       ret = []
       for r in rows:
           rd = { 'row': r.row }
           for j, column in enumerate(self.columnFamilies):
               if self.columnFamiliesType[j] == str:
                   rd.update({ column: r.columns.get(column+':'+qualifier).value })
               elif self.columnFamiliesType[j] == int:
                   rd.update({ column: decode(r.columns.get(column+':'+qualifier).value) })
           ret.append(rd)
       return ret

   def getRows(self, rows, qualifier='0'):
       """ get rows from hbase table, all the row specify the same `qualifier`

       :param rows: a list of row key
       """
       grow = True if len(set(rows)) == 1 else False

       for r in rows:
           yield self.getRow(r, qualifier)
           if grow: qualifier = strint(qualifier) + 1 )


   def scanner(self, numRows=100, startRow=None, stopRow=None):
       """ scan the table

       :param numRows: how much rows return in one iteration.
       :param startRow: start scan row key
       :param stopRow: stop scan row key
       """
       scan = Hbase.TScan(startRow, stopRow)
       scannerId = self.client.scannerOpenWithScan(self.table, scan, {})
#        row = self.client.scannerGet(scannerId)

       ret = []
       rowList = self.client.scannerGetList(scannerId, numRows)
       while rowList:
           for r in rowList:
               rd = { 'row': r.row }
               for k, v in r.columns.iteritems():
                   cf, qualifier = k.split(':')
                   if qualifier not in rd:
                       rd[qualifier] = {}

                   idx = self.columnFamilies.index(cf)
                   if self.columnFamiliesType[idx] == str:
                       rd[qualifier].update({ cf: v.value })
                   elif self.columnFamiliesType[idx] == int:
                       rd[qualifier].update({ cf: decode(v.value) })

               ret.append(rd)
           
           rowList = self.client.scannerGetList(scannerId, numRows)

       self.client.scannerClose(scannerId)
       return ret

def demo():
   ht = HBaseTest(table='test1')
   values = [['lee''f''27'], ['clark''m'27], ['dan''f''27']]
   rowKey = 'cookie'
#    ht.put(rowKey, '0', 'fish', 'f', '22')
#    ht.puts(rowKey, values)
#    print ht.getColumnDescriptors()

   print ht.getRow(rowKey)
   for i in ht.getRows([rowKey] * 4):
       print i
   print ht.scanner()

if __name__ == '__main__':
   demo()

程序说明:
  1. 对存入hbase的整型数据,存入时encode成字符串,取出时decode成整型。因为hbase不支持float,int型数据。
  2. HBaseTest类初始化,如果table不存在则创建。
  3. 对已存在的table支持put存一行数据,puts存多行数据,getRow读一行,getRows读取多行,还有scanner全table扫描的几个函数。程序清单中有用法。

参考cloudera的 How-to: Use the HBase Thrift Interface 系列文章:123
阅读(7177) | 评论(1) | 转发(0) |
给主人留下些什么吧!~~

CU博客助理2014-05-22 13:09:25

嘉宾点评:
在Amazon EC2云主机上实战HBase数据库的成果,博主讲述了Thrift接口的两点必备知识,并详细讲述了如何使用Python调用Thrift接口的经验,内容不错,很有参考价值。尤其值得一提的是,博主自己完成了Thrift Python接口的软件包,并共享到GitHub,值得赞扬。(感谢您参与“原创博文评选”获奖结果即将公布)