Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1644994
  • 博文数量: 201
  • 博客积分: 2812
  • 博客等级: 少校
  • 技术积分: 3029
  • 用 户 组: 普通用户
  • 注册时间: 2011-01-18 18:28
个人简介

从事数据库工作多年,目前看好分布式NeSQL/HTAP数据库在企业客户市场的发展。未来的主要方向是——致力于 NewSQL/HTAP 数据库的推广普及。

文章存档

2016年(1)

2015年(8)

2014年(23)

2013年(50)

2012年(32)

2011年(87)

分类: Sybase

2015-07-20 16:44:53

在先前的博文中介绍了IQ中使用java语言编写标量udf和表udf例子。在这篇文章中将向大家介绍一个例子,通过这个例子说明IQ使用java udf访问hadoop的方法。

例子程序的功能说明:

  * 在java udf函数中调用org.apache.hadoop.util.RunJar类(传递jarFile、className,hdfs input dir,hdfs output dir等参数),由这个类调用Hadoop MapReduce 作业。

 * 由MapReduce作业完成计算处理,形成的结果存放在HDFS相应的目录中(由hdfs output dir参数指定)

 * 在java udf函数中读取MapReduce作业计算完成收后存放在HDFS目录中的结果文件,把结果文件以SQL结果集的方式返回到IQ Server,作为记录集合方式显示给客户端。

1. 环境说明

   * SAP IQ 16.0 SP08.30

   * 虚拟机一台,操作系统Linux 64bit

   * 使用IQ demo数据库

   * JDK 1.7.0_75

   * Hadoop 2.6.0(作为Hadoop client)

2. 编写Java 表udf函数hadoopRows()。

 

--下面是示例源代码         

package udf.example;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.lang.reflect.Array;

import java.lang.reflect.InvocationTargetException;

import java.lang.reflect.Method;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.Types;

import java.util.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import com.google.common.base.Charsets;

public class HadoopRows {

public static void hadoopRows(String jarFile, String className,

String hadoopInput, String hadoopOutput, ResultSet rset[])

throws Exception {

System.out.println("Hadoop Job Strating......");

try {

runHadoopMR(jarFile, className, hadoopInput, hadoopOutput);

} catch (Throwable e) {

e.printStackTrace();

}

System.out.println("Hadoop Job end......");

   //设置结果集的元数据信息

ResultSetMetaDataImpl rsmd = new ResultSetMetaDataImpl(2);

rsmd.setColumnType(1, Types.VARCHAR);

rsmd.setColumnName(1, "word");

rsmd.setColumnLabel(1, "word");

rsmd.setColumnDisplaySize(1, 255);

rsmd.setTableName(1, "MyTable");

rsmd.setColumnType(2, Types.INTEGER);

rsmd.setColumnName(2, "wordcount");

rsmd.setColumnLabel(2, "wordcount");

rsmd.setTableName(2, "MyTable");

ResultSetImpl rs = null;

try {

rs = new ResultSetImpl((ResultSetMetaData) rsmd);

rs.beforeFirst(); // Make sure we are at the beginning.

} catch (Exception e) {

System.out.println("Error: couldn't create result set.");

System.out.println(e.toString());

}

//访问HDFS文件系统得到MapRedeuce计算结果文件

Hashtable<String, Integer> hadoopTable = 

hadoopGetRows(hadoopOutput);

Enumeration<String> words = hadoopTable.keys();

int row = 0;

       //把结果集合以SQL记录的形式封装成SQL结果集

while (words.hasMoreElements()) {

try {

row++;

String word = (String) words.nextElement();

int wordCount = ((Integer) hadoopTable.get(word)).intValue();

rs.insertRow(); 

rs.updateString(1, word); 

System.out.print(word + "    ");

rs.updateInt(2, wordCount);

System.out.println(wordCount);

} catch (Exception e) {

System.out.println("Error: couldn't insert row/data on row "

+ row);

System.out.println(e.toString());

}

}

try {

rs.beforeFirst(); 

} catch (Exception e) {

System.out.println(e.toString());

}

rset[0] = rs; 

}

private static Hashtable<String, Integer> hadoopGetRows(String hadoopOutput)

throws Exception {

Configuration conf = new Configuration();

conf.set("fs.defaultFS", "hdfs://HAIQ-DB-01:8020");

conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

FileSystem fs = FileSystem.get(conf);

Path file = new Path(new Path(hadoopOutput), "part-r-00000");

if (!fs.exists(file)) {

throw new IOException("Output not found!");

}

BufferedReader br = null;

Hashtable<String, Integer> hadoopTable = new Hashtable<String, Integer>();

try {

br = new BufferedReader(new InputStreamReader(fs.open(file),

Charsets.UTF_8));

String line;

while ((line = br.readLine()) != null) {

StringTokenizer st = new StringTokenizer(line);

String word = st.nextToken();

String wordCount = st.nextToken();

hadoopTable.put(word, Integer.parseInt(wordCount));

}

} finally {

if (br != null) {

br.close();

}

if (fs != null) {

fs.close();

}

}

return hadoopTable;

}

private static void runHadoopMR(String jarFile, String className,

String input, String output) throws Throwable {

String args[] = { jarFile, className, input, output };

String mainClassName = "org.apache.hadoop.util.RunJar";

Class<?> mainClass = Class.forName(mainClassName);

Method main = mainClass.getMethod("main", new Class[] { Array

.newInstance(String.class, 0).getClass() });

try {

main.invoke(null, new Object[] { args });

} catch (InvocationTargetException e) {

throw e.getTargetException();

}

}

public static void main(String args[]) {

ResultSet[] rset = new ResultSetImpl[1];

try {

 

 hadoopRows("/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar",

              "hadoop.example.MyWordCount",

              "/user/sybiq/input/*.txt", 

              "/user/sybiq/output/", rset);

} catch (Exception e) {

e.printStackTrace();

}

}

}

3. 编译

  为了方便大家学习、动手实验。我把本人环境中的$IQDIR16/udf/java中的代码和相关编译打包脚本放到了百度盘上,大家可以下载使用。下载地址如下:

style="font-family:宋体;color:#666666;font-size:12pt;">

  为了编译这个例子,执行如下步骤:

  (1) cd $IQDIR16/

  (2) 把下载的udf.tgz拷贝到这个目录下

  (3) tar xvfz udf.tgz

  

  执行完上面的步骤后,会形成$IQDIR16/udf/java目录,里面包含了全部代码。

 (4) cd $IQDIR16/udf/java/src

 (5) ./build.sh

 

     执行build.sh脚本,编译代码,并在$IQDIR16/udf/java/lib生成jar包。

 

4. 修改iq_java.sh,增加类路径

修改$IQDIR16/bin64目录下的iq_java.sh文件(IQ执行这个shell在另外的进程中启动java虚拟机,来执行java udf函数),增加如下内容:

JAVA_ARGS="-Xdebug -Xrunjdwp:transport=dt_socket,address=9988,server=y,suspend=n -Djava.ext.dirs=/usr/local/jdk1.7.0_75/jre/lib/ext:/usr/local/hadoop-2.6.0/share/hadoop/common:/usr/local/hadoop-2.6.0/share/hadoop/common/lib:/usr/local/hadoop-2.6.0/share/hadoop/hdfs:/usr/local/hadoop-2.6.0/share/hadoop/hdfs/lib:/usr/local/hadoop-2.6.0/share/hadoop/mapreduce:/usr/local/hadoop-2.6.0/share/hadoop/mapreduce/lib:/usr/local/hadoop-2.6.0/share/hadoop/yarn:/usr/local/hadoop-2.6.0/share/hadoop/yarn/lib:/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar"

说明:为了能够在java udf中正常调用Hadoop MapReduce作业,需要设置hadoop运行环境所需要的jar包和hadoop的配置文件(比如core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml)搜索路径,在本例中这些xml配置文件打包到myhadoop.jar中。

修改了iq_java.sh之后,需要重启IQ Server后者执行:

STOP EXTERNAL ENVIRONMENT JAVA

5. 安装jar包到IQ数据库

   --启动dbisql工具

   dbisql -c "uid=DBA;pwd=sql" -nogui

   --执行如下命令(在我的环境中$IQDIR16目录是/opt/sybiq/16.0/IQ-16_0)安装包含java udf函数的jar包(包含示例类和相关类文件)

   INSTALL JAVA  NEW JAR 'myudf' FROM FILE '/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myudf.jar'

   

   如果先前已经安装过,那么执行:

   INSTALL JAVA UPDATE JAR 'myudf' FROM FILE '/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myudf.jar'

   

  

6. 创建用户自定义函数

CREATE or REPLACE PROCEDURE hadooprows(IN jarFile VARCHAR(255), IN className VARCHAR(255), IN input VARCHAR(255), IN output VARCHAR(255)) 

RESULT (word VARCHAR(255),wordcount INTEGER) 

EXTERNAL NAME 

'udf.example.HadoopRows.hadoopRows(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[Ljava/sql/ResultSet;)V' 

LANGUAGE JAVA 

说明:上面的语句如果拷贝到dbisql执行,必须放在一行内,否则会报错。

EXTERNAL_NAME是按照java包名.java类名.方法名指定的,括号中是方法的参数,右括号后面跟的是方法的返回类型.

 有关IQ udf的方法参数和返回值的详细说明,参见如下官方网址:

http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.infocenter.dc01034.1600/doc/html/jcu1303326630386.html

  

7. 在SQL语句中使用udf函数

  

   在执行下面的sql语句之前,先把MyWordCount作业所需要的文件拷贝到HDFS相应的目录下,并且保证作业的输出目录(/user/sybiq/output/)在HDFS文件系统中不存在.为此执行如下命令:

  cd $IQDIR16/udf/java/resources/input

      hadoop fs -copyFromLocal file*.txt /user/sybiq/input/

      hadoop fs -rm -R /user/sybiq/output/

 

  --执行如下语句,调用MapReduce作业MyWordCount,并得到结果集

  select * 

  from      

  hadooprows('/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar', 

'hadoop.example.MyWordCount', 

'/user/sybiq/input/*.txt', 

'/user/sybiq/output/')

  

  说明:MyWordCount作业的代码参见上面3.的下载地址的程序包

阅读(5981) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~