// Source archive: HelloWorld.zip
-
package elementary;
-
-
import java.io.IOException;
-
import java.text.SimpleDateFormat;
-
import java.util.ArrayList;
-
import java.util.Date;
-
import java.util.List;
-
import java.util.concurrent.atomic.AtomicInteger;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
import java.util.concurrent.TimeUnit;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.Cell;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.HColumnDescriptor;
-
import org.apache.hadoop.hbase.HTableDescriptor;
-
import org.apache.hadoop.hbase.MasterNotRunningException;
-
import org.apache.hadoop.hbase.TableName;
-
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-
import org.apache.hadoop.hbase.client.Delete;
-
import org.apache.hadoop.hbase.client.Get;
-
import org.apache.hadoop.hbase.client.Admin;
-
import org.apache.hadoop.hbase.client.BufferedMutator;
-
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-
import org.apache.hadoop.hbase.client.Connection;
-
import org.apache.hadoop.hbase.client.ConnectionFactory;
-
import org.apache.hadoop.hbase.client.Table;
-
import org.apache.hadoop.hbase.client.Put;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.ResultScanner;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.util.ThreadUtil;
-
-
public class HelloWorld {
-
private static Configuration conf = null;
-
private static Connection conn = null;
-
private static Admin admin = null;
-
public static AtomicInteger count = new AtomicInteger();
-
-
/**
-
* 初始化配置
-
*/
-
static {
-
conf = HBaseConfiguration.create();
-
//如果沒有配置文件,一定要記得手動宣告
-
-
conf.set("hbase.zookeeper.quorum", "10.148.137.143");
-
conf.set("hbase.zookeeper.property.clientPort", "2181");
-
}
-
-
static {
-
try {
-
conn = ConnectionFactory.createConnection();
-
admin = conn.getAdmin();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
static public class MyThread extends Thread
-
{
-
int _start;
-
String _tablename;
-
Connection conn;
-
//BufferedMutator table;
-
Table table;
-
-
public MyThread(int start, String tablename) {
-
_start = start;
-
_tablename = tablename;
-
}
-
-
public void run() {
-
String tablename = _tablename;
-
Thread current = Thread.currentThread();
-
long thread_id = current.getId();
-
System.out.printf("thread[%d] run\n", thread_id);
-
-
try {
-
conn = ConnectionFactory.createConnection();
-
//BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
-
//params.writeBufferSize(1024 * 4);
-
//table = conn.getBufferedMutator(params);
-
table = conn.getTable(TableName.valueOf(tablename));
-
-
for (int j=_start; j<100; ++j) {
-
for (int i=0; i<10000000; ++i) {
-
// zkb_0_0
-
String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
-
Put put = new Put(Bytes.toBytes(zkb));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
-
put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
-
//table.mutate(put);
-
table.put(put);
-
-
int m = HelloWorld.count.incrementAndGet();
-
if (m % 10000 == 0) {
-
Date dt = new Date();
-
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
-
String now = sdf.format(dt);
-
System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
-
}
-
}
-
}
-
-
System.out.printf("thread[%d] over\n", thread_id);
-
}
-
catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
-
/**
-
* 建立表格
-
* @param tablename
-
* @param cfs
-
*/
-
public static void createTable(String tablename, String[] cfs){
-
try {
-
if (admin.tableExists(TableName.valueOf(tablename))) {
-
System.out.println("table already exists!");
-
} else {
-
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
-
for (int i = 0; i < cfs.length; i++) {
-
HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
-
desc.setMaxVersions(3650);
-
tableDesc.addFamily(desc);
-
}
-
-
byte[][] splitKeys = new byte[][] {
-
Bytes.toBytes("zkb_0_0"),
-
Bytes.toBytes("zkb_10_0"),
-
Bytes.toBytes("zkb_20_0"),
-
Bytes.toBytes("zkb_30_0"),
-
Bytes.toBytes("zkb_40_0"),
-
Bytes.toBytes("zkb_50_0"),
-
Bytes.toBytes("zkb_60_0"),
-
Bytes.toBytes("zkb_70_0"),
-
Bytes.toBytes("zkb_80_0"),
-
Bytes.toBytes("zkb_90_0"),
-
Bytes.toBytes("zkb_100_0")
-
};
-
admin.createTable(tableDesc, splitKeys);
-
admin.close();
-
System.out.println("create table " + tablename + " ok.");
-
}
-
} catch (MasterNotRunningException e) {
-
e.printStackTrace();
-
} catch (ZooKeeperConnectionException e) {
-
e.printStackTrace();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 刪除表格
-
* @param tablename
-
*/
-
public static void deleteTable(String tablename){
-
try {
-
//Connection conn = ConnectionFactory.createConnection();
-
//Admin admin = conn.getAdmin();
-
admin.disableTable(TableName.valueOf(tablename));
-
admin.deleteTable(TableName.valueOf(tablename));
-
System.out.println("delete table " + tablename + " ok.");
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 刪除一筆資料
-
* @param tableName
-
* @param rowKey
-
*/
-
public static void delRecord (String tableName, String rowKey){
-
try {
-
Table table = conn.getTable(TableName.valueOf(tableName));
-
-
List<Delete> list = new ArrayList<Delete>();
-
Delete del = new Delete(rowKey.getBytes());
-
list.add(del);
-
table.delete(list);
-
System.out.println("del recored " + rowKey + " ok.");
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 取得一筆資料
-
* @param tableName
-
* @param rowKey
-
*/
-
public static void getOneRecord (String tableName, String rowKey){
-
try {
-
Table table = conn.getTable(TableName.valueOf(tableName));
-
-
Get get = new Get(rowKey.getBytes());
-
Result rs = table.get(get);
-
List<Cell> list = rs.listCells();
-
for(Cell cell:list){
-
System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
-
System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
-
System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
-
System.out.print(cell.getTimestamp() + " " );
-
System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
-
System.out.println("");
-
}
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 取得所有資料
-
* @param tableName
-
*/
-
public static void getAllRecord (String tableName) {
-
try{
-
//Connection conn = ConnectionFactory.createConnection();
-
Table table = conn.getTable(TableName.valueOf(tableName));
-
-
Scan scan = new Scan();
-
ResultScanner resultscanner = table.getScanner(scan);
-
for(Result rs:resultscanner){
-
List<Cell> list = rs.listCells();
-
for(Cell cell:list){
-
System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
-
System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
-
System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
-
System.out.print(cell.getTimestamp() + " " );
-
System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
-
System.out.println("");
-
}
-
}
-
} catch (IOException e){
-
e.printStackTrace();
-
}
-
}
-
-
/**
-
* 取得Family清單
-
* @param tableName
-
* @return
-
*/
-
public static ArrayList<String> getAllFamilyName(String tableName) {
-
ArrayList<String> familyname_list = new ArrayList<String>();
-
try{
-
//Connection conn = ConnectionFactory.createConnection();
-
Table table = conn.getTable(TableName.valueOf(tableName));
-
-
HTableDescriptor htabledescriptor = table.getTableDescriptor();
-
HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
-
for(int i=0;i<hdlist.length;i++){
-
HColumnDescriptor hd = hdlist[i];
-
familyname_list.add(hd.getNameAsString());
-
}
-
} catch (IOException e){
-
e.printStackTrace();
-
}
-
return familyname_list;
-
}
-
-
// java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
-
public static void main(String[] args) {
-
System.out.println("HelloWorldX");
-
if (args.length > 0)
-
System.out.println(args[0]);
-
-
int start = 0;
-
if (args.length > 1)
-
start = Integer.valueOf(args[1]);
-
if (start < 0)
-
start = 0;
-
-
int num_threads = 16;
-
if (args.length > 2)
-
num_threads = Integer.valueOf(args[2]);
-
-
try {
-
String tablename = "scores";
-
String[] familys = {"grade", "course"};
-
HelloWorld.createTable(tablename, familys);
-
-
//ExecutorService thread_pool = Executors.newSingleThreadExecutor();
-
ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
-
Thread[] pool = new HelloWorld.MyThread[80];
-
for (int i=0; i<pool.length; ++i) {
-
pool[i] = new HelloWorld.MyThread(i, tablename);
-
thread_pool.execute(pool[i]);
-
}
-
-
thread_pool.shutdown();
-
System.out.println("over");
-
}
-
catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
-
}
// Blog page footer: views (21004) | comments (0) | shares (0)