Chinaunix首页 | 论坛 | 博客
  • 博客访问: 8168840
  • 博文数量: 595
  • 博客积分: 13065
  • 博客等级: 上将
  • 技术积分: 10334
  • 用 户 组: 普通用户
  • 注册时间: 2008-03-26 16:44
个人简介

推荐: blog.csdn.net/aquester https://github.com/eyjian https://www.cnblogs.com/aquester http://blog.chinaunix.net/uid/20682147.html

文章分类

全部博文(595)

分类: HADOOP

2016-03-09 17:37:36

HelloWorld.zip

点击(此处)折叠或打开

  1. package elementary;

  2. import java.io.IOException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Date;
  6. import java.util.List;
  7. import java.util.concurrent.atomic.AtomicInteger;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.TimeUnit;

  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.hbase.Cell;
  13. import org.apache.hadoop.hbase.HBaseConfiguration;
  14. import org.apache.hadoop.hbase.HColumnDescriptor;
  15. import org.apache.hadoop.hbase.HTableDescriptor;
  16. import org.apache.hadoop.hbase.MasterNotRunningException;
  17. import org.apache.hadoop.hbase.TableName;
  18. import org.apache.hadoop.hbase.ZooKeeperConnectionException;
  19. import org.apache.hadoop.hbase.client.Delete;
  20. import org.apache.hadoop.hbase.client.Get;
  21. import org.apache.hadoop.hbase.client.Admin;
  22. import org.apache.hadoop.hbase.client.BufferedMutator;
  23. import org.apache.hadoop.hbase.client.BufferedMutatorParams;
  24. import org.apache.hadoop.hbase.client.Connection;
  25. import org.apache.hadoop.hbase.client.ConnectionFactory;
  26. import org.apache.hadoop.hbase.client.Table;
  27. import org.apache.hadoop.hbase.client.Put;
  28. import org.apache.hadoop.hbase.client.Result;
  29. import org.apache.hadoop.hbase.client.ResultScanner;
  30. import org.apache.hadoop.hbase.client.Scan;
  31. import org.apache.hadoop.hbase.util.Bytes;
  32. import org.apache.hadoop.util.ThreadUtil;

  33. public class HelloWorld {
  34.     private static Configuration conf = null;
  35.     private static Connection conn = null;
  36.     private static Admin admin = null;
  37.     public static AtomicInteger count = new AtomicInteger();

  38.     /**
  39.      * 初始化配置
  40.      */
  41.     static {
  42.         conf = HBaseConfiguration.create();
  43.         //如果沒有配置文件,一定要記得手動宣告

  44.         conf.set("hbase.zookeeper.quorum", "10.148.137.143");
  45.         conf.set("hbase.zookeeper.property.clientPort", "2181");
  46.     }
  47.     
  48.     static {
  49.         try {
  50.          conn = ConnectionFactory.createConnection();
  51.      admin = conn.getAdmin();
  52.      } catch (IOException e) {
  53.      e.printStackTrace();
  54.      }
  55.     }

  56.     static public class MyThread extends Thread
  57.     {
  58.         int _start;
  59.         String _tablename;
  60.         Connection conn;
  61.         //BufferedMutator table;
  62.         Table table;

  63.         public MyThread(int start, String tablename) {
  64.             _start = start;
  65.             _tablename = tablename;
  66.         }
  67.         
  68.         public void run() {
  69.             String tablename = _tablename;
  70.             Thread current = Thread.currentThread();
  71.             long thread_id = current.getId();
  72.             System.out.printf("thread[%d] run\n", thread_id);
  73.             
  74.             try {
  75.                 conn = ConnectionFactory.createConnection();
  76.                 //BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename));
  77.                 //params.writeBufferSize(1024 * 4);
  78.                 //table = conn.getBufferedMutator(params);
  79.                 table = conn.getTable(TableName.valueOf(tablename));

  80.                 for (int j=_start; j<100; ++j) {
  81.                     for (int i=0; i<10000000; ++i) {
  82.                         // zkb_0_0
  83.                         String zkb = "zkb_" + String.valueOf(_start) + "_" + String.valueOf(i);
  84.                         Put put = new Put(Bytes.toBytes(zkb));
  85.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field1"),Bytes.toBytes(String.valueOf(i+0)));                  
  86.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field2"),Bytes.toBytes(String.valueOf(i+1)));
  87.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field3"),Bytes.toBytes(String.valueOf(i+2)));
  88.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field4"),Bytes.toBytes(String.valueOf(i+3)));
  89.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field5"),Bytes.toBytes(String.valueOf(i+4)));
  90.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field6"),Bytes.toBytes(String.valueOf(i+5)));
  91.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field7"),Bytes.toBytes(String.valueOf(i+6)));
  92.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field8"),Bytes.toBytes(String.valueOf(i+7)));
  93.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field9"),Bytes.toBytes(String.valueOf(i+8)));
  94.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field10"),Bytes.toBytes(String.valueOf(i+9)));
  95.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field11"),Bytes.toBytes(String.valueOf(i+10)));
  96.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field12"),Bytes.toBytes(String.valueOf(i+11)));
  97.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field13"),Bytes.toBytes(String.valueOf(i+12)));
  98.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field14"),Bytes.toBytes(String.valueOf(i+13)));
  99.                         put.addColumn(Bytes.toBytes("grade"),Bytes.toBytes("field15"),Bytes.toBytes(String.valueOf(i+14)));
  100.                         //table.mutate(put);
  101.                         table.put(put);

  102.              int m = HelloWorld.count.incrementAndGet();
  103.              if (m % 10000 == 0) {
  104.                  Date dt = new Date();
  105.                  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss aa");
  106.                  String now = sdf.format(dt);
  107.                  System.out.printf("[%s] thread[%d] m=%d, j=%d, i=%d\n", now, thread_id, m, j, i);
  108.              }
  109.                     }
  110.                 }

  111.                 System.out.printf("thread[%d] over\n", thread_id);
  112.             }
  113.             catch (Exception e) {
  114.                 e.printStackTrace();
  115.             }
  116.         }
  117.     }
  118.     
  119.     /**
  120.      * 建立表格
  121.      * @param tablename
  122.      * @param cfs
  123.      */
  124.     public static void createTable(String tablename, String[] cfs){
  125.         try {
  126.             if (admin.tableExists(TableName.valueOf(tablename))) {
  127.                 System.out.println("table already exists!");
  128.             } else {
  129.                 HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
  130.                 for (int i = 0; i < cfs.length; i++) {
  131.                     HColumnDescriptor desc = new HColumnDescriptor(cfs[i]);
  132.                     desc.setMaxVersions(3650);     
  133.                     tableDesc.addFamily(desc);
  134.                 }

  135.                 byte[][] splitKeys = new byte[][] {
  136.                     Bytes.toBytes("zkb_0_0"),
  137.                     Bytes.toBytes("zkb_10_0"),
  138.                     Bytes.toBytes("zkb_20_0"),
  139.                     Bytes.toBytes("zkb_30_0"),
  140.                     Bytes.toBytes("zkb_40_0"),
  141.                     Bytes.toBytes("zkb_50_0"),
  142.                     Bytes.toBytes("zkb_60_0"),
  143.                     Bytes.toBytes("zkb_70_0"),
  144.                     Bytes.toBytes("zkb_80_0"),
  145.                     Bytes.toBytes("zkb_90_0"),
  146.                     Bytes.toBytes("zkb_100_0")
  147.                 };
  148.                 admin.createTable(tableDesc, splitKeys);
  149.                 admin.close();
  150.                 System.out.println("create table " + tablename + " ok.");
  151.             }
  152.         } catch (MasterNotRunningException e) {
  153.             e.printStackTrace();
  154.         } catch (ZooKeeperConnectionException e) {
  155.             e.printStackTrace();
  156.         } catch (IOException e) {
  157.             e.printStackTrace();
  158.         }
  159.     }
  160.     
  161.     /**
  162.      * 刪除表格
  163.      * @param tablename
  164.      */
  165.     public static void deleteTable(String tablename){
  166.         try {
  167.             //Connection conn = ConnectionFactory.createConnection();
  168.             //Admin admin = conn.getAdmin();     
  169.             admin.disableTable(TableName.valueOf(tablename));
  170.             admin.deleteTable(TableName.valueOf(tablename));
  171.             System.out.println("delete table " + tablename + " ok.");
  172.         } catch (IOException e) {
  173.             e.printStackTrace();
  174.         }
  175.     }

  176.     /**
  177.      * 刪除一筆資料
  178.      * @param tableName
  179.      * @param rowKey
  180.      */
  181.     public static void delRecord (String tableName, String rowKey){
  182.         try {
  183.             Table table = conn.getTable(TableName.valueOf(tableName));
  184.             
  185.             List<Delete> list = new ArrayList<Delete>();
  186.             Delete del = new Delete(rowKey.getBytes());
  187.             list.add(del);
  188.             table.delete(list);
  189.             System.out.println("del recored " + rowKey + " ok.");
  190.         } catch (IOException e) {
  191.             e.printStackTrace();
  192.         }
  193.     }
  194.     
  195.     /**
  196.      * 取得一筆資料
  197.      * @param tableName
  198.      * @param rowKey
  199.      */
  200.     public static void getOneRecord (String tableName, String rowKey){
  201.         try {
  202.             Table table = conn.getTable(TableName.valueOf(tableName));
  203.             
  204.             Get get = new Get(rowKey.getBytes());
  205.             Result rs = table.get(get);
  206.             List<Cell> list = rs.listCells();
  207.             for(Cell cell:list){
  208.                 System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  209.                 System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  210.                 System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  211.                 System.out.print(cell.getTimestamp() + " " );
  212.                 System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  213.                 System.out.println("");
  214.             }
  215.         } catch (IOException e) {
  216.             e.printStackTrace();
  217.         }
  218.     }
  219.     
  220.     /**
  221.      * 取得所有資料
  222.      * @param tableName
  223.      */
  224.     public static void getAllRecord (String tableName) {
  225.         try{
  226.             //Connection conn = ConnectionFactory.createConnection();
  227.             Table table = conn.getTable(TableName.valueOf(tableName));
  228.             
  229.             Scan scan = new Scan();
  230.             ResultScanner resultscanner = table.getScanner(scan);
  231.             for(Result rs:resultscanner){
  232.                 List<Cell> list = rs.listCells();
  233.                 for(Cell cell:list){
  234.                     System.out.print(new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()) + " " );
  235.                     System.out.print(new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength()) + ":" );
  236.                     System.out.print(new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()) + " " );
  237.                     System.out.print(cell.getTimestamp() + " " );
  238.                     System.out.print(new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()) + " " );
  239.                     System.out.println("");
  240.                 }
  241.             }
  242.         } catch (IOException e){
  243.             e.printStackTrace();
  244.         }
  245.     }
  246.     
  247.     /**
  248.      * 取得Family清單
  249.      * @param tableName
  250.      * @return
  251.      */
  252.     public static ArrayList<String> getAllFamilyName(String tableName) {
  253.         ArrayList<String> familyname_list = new ArrayList<String>();
  254.         try{
  255.             //Connection conn = ConnectionFactory.createConnection();
  256.             Table table = conn.getTable(TableName.valueOf(tableName));
  257.             
  258.             HTableDescriptor htabledescriptor = table.getTableDescriptor();
  259.             HColumnDescriptor[] hdlist = htabledescriptor.getColumnFamilies();
  260.             for(int i=0;i<hdlist.length;i++){
  261.                 HColumnDescriptor hd = hdlist[i];
  262.                 familyname_list.add(hd.getNameAsString());
  263.             }
  264.         } catch (IOException e){
  265.             e.printStackTrace();
  266.         }
  267.         return familyname_list;
  268.     }

  269.     // java -cp HelloWorld.jar:`ls lib/*.jar|awk '{printf("%s:", $0)}'` elementary.HelloWorld 5
  270.     public static void main(String[] args) {
  271.         System.out.println("HelloWorldX");
  272.         if (args.length > 0)
  273.             System.out.println(args[0]);
  274.         
  275.         int start = 0;
  276.         if (args.length > 1)
  277.             start = Integer.valueOf(args[1]);
  278.         if (start < 0)
  279.             start = 0;
  280.         
  281.         int num_threads = 16;
  282.         if (args.length > 2)
  283.             num_threads = Integer.valueOf(args[2]);
  284.         
  285.         try {
  286.             String tablename = "scores";
  287.             String[] familys = {"grade", "course"};
  288.             HelloWorld.createTable(tablename, familys);

  289.             //ExecutorService thread_pool = Executors.newSingleThreadExecutor();
  290.             ExecutorService thread_pool = Executors.newFixedThreadPool(num_threads);
  291.             Thread[] pool = new HelloWorld.MyThread[80];
  292.             for (int i=0; i<pool.length; ++i) {
  293.                 pool[i] = new HelloWorld.MyThread(i, tablename);
  294.                 thread_pool.execute(pool[i]);
  295.             }
  296.             
  297.             thread_pool.shutdown();
  298.             System.out.println("over");
  299.         }
  300.         catch (Exception e) {
  301.             e.printStackTrace();
  302.         }
  303.     }
  304.     
  305. }

阅读(21010) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~