从事数据库工作多年,目前看好分布式NeSQL/HTAP数据库在企业客户市场的发展。未来的主要方向是——致力于 NewSQL/HTAP 数据库的推广普及。
分类: Sybase
2015-07-20 16:44:53
在先前的博文中介绍了IQ中使用java语言编写标量udf和表udf例子。在这篇文章中将向大家介绍一个例子,通过这个例子说明IQ使用java udf访问hadoop的方法。
例子程序的功能说明:
* 在java udf函数中调用org.apache.hadoop.util.RunJar类(传递jarFile、className,hdfs input dir,hdfs output dir等参数),由这个类调用Hadoop MapReduce 作业。
* 由MapReduce作业完成计算处理,形成的结果存放在HDFS相应的目录中(由hdfs output dir参数指定)
* 在java udf函数中读取MapReduce作业计算完成收后存放在HDFS目录中的结果文件,把结果文件以SQL结果集的方式返回到IQ Server,作为记录集合方式显示给客户端。
1. 环境说明
* SAP IQ 16.0 SP08.30
* 虚拟机一台,操作系统Linux 64bit
* 使用IQ demo数据库
* JDK 1.7.0_75
* Hadoop 2.6.0(作为Hadoop client)
2. 编写Java 表udf函数hadoopRows()。
--下面是示例源代码
package udf.example;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Types;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Charsets;
public class HadoopRows {
public static void hadoopRows(String jarFile, String className,
String hadoopInput, String hadoopOutput, ResultSet rset[])
throws Exception {
System.out.println("Hadoop Job Strating......");
try {
runHadoopMR(jarFile, className, hadoopInput, hadoopOutput);
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("Hadoop Job end......");
//设置结果集的元数据信息
ResultSetMetaDataImpl rsmd = new ResultSetMetaDataImpl(2);
rsmd.setColumnType(1, Types.VARCHAR);
rsmd.setColumnName(1, "word");
rsmd.setColumnLabel(1, "word");
rsmd.setColumnDisplaySize(1, 255);
rsmd.setTableName(1, "MyTable");
rsmd.setColumnType(2, Types.INTEGER);
rsmd.setColumnName(2, "wordcount");
rsmd.setColumnLabel(2, "wordcount");
rsmd.setTableName(2, "MyTable");
ResultSetImpl rs = null;
try {
rs = new ResultSetImpl((ResultSetMetaData) rsmd);
rs.beforeFirst(); // Make sure we are at the beginning.
} catch (Exception e) {
System.out.println("Error: couldn't create result set.");
System.out.println(e.toString());
}
//访问HDFS文件系统得到MapRedeuce计算结果文件
Hashtable<String, Integer> hadoopTable =
hadoopGetRows(hadoopOutput);
Enumeration<String> words = hadoopTable.keys();
int row = 0;
//把结果集合以SQL记录的形式封装成SQL结果集
while (words.hasMoreElements()) {
try {
row++;
String word = (String) words.nextElement();
int wordCount = ((Integer) hadoopTable.get(word)).intValue();
rs.insertRow();
rs.updateString(1, word);
System.out.print(word + " ");
rs.updateInt(2, wordCount);
System.out.println(wordCount);
} catch (Exception e) {
System.out.println("Error: couldn't insert row/data on row "
+ row);
System.out.println(e.toString());
}
}
try {
rs.beforeFirst();
} catch (Exception e) {
System.out.println(e.toString());
}
rset[0] = rs;
}
private static Hashtable<String, Integer> hadoopGetRows(String hadoopOutput)
throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://HAIQ-DB-01:8020");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fs = FileSystem.get(conf);
Path file = new Path(new Path(hadoopOutput), "part-r-00000");
if (!fs.exists(file)) {
throw new IOException("Output not found!");
}
BufferedReader br = null;
Hashtable<String, Integer> hadoopTable = new Hashtable<String, Integer>();
try {
br = new BufferedReader(new InputStreamReader(fs.open(file),
Charsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
String word = st.nextToken();
String wordCount = st.nextToken();
hadoopTable.put(word, Integer.parseInt(wordCount));
}
} finally {
if (br != null) {
br.close();
}
if (fs != null) {
fs.close();
}
}
return hadoopTable;
}
private static void runHadoopMR(String jarFile, String className,
String input, String output) throws Throwable {
String args[] = { jarFile, className, input, output };
String mainClassName = "org.apache.hadoop.util.RunJar";
Class<?> mainClass = Class.forName(mainClassName);
Method main = mainClass.getMethod("main", new Class[] { Array
.newInstance(String.class, 0).getClass() });
try {
main.invoke(null, new Object[] { args });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
public static void main(String args[]) {
ResultSet[] rset = new ResultSetImpl[1];
try {
hadoopRows("/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar",
"hadoop.example.MyWordCount",
"/user/sybiq/input/*.txt",
"/user/sybiq/output/", rset);
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 编译
为了方便大家学习、动手实验。我把本人环境中的$IQDIR16/udf/java中的代码和相关编译打包脚本放到了百度盘上,大家可以下载使用。下载地址如下:
style="font-family:宋体;color:#666666;font-size:12pt;">
为了编译这个例子,执行如下步骤:
(1) cd $IQDIR16/
(2) 把下载的udf.tgz拷贝到这个目录下
(3) tar xvfz udf.tgz
执行完上面的步骤后,会形成$IQDIR16/udf/java目录,里面包含了全部代码。
(4) cd $IQDIR16/udf/java/src
(5) ./build.sh
执行build.sh脚本,编译代码,并在$IQDIR16/udf/java/lib生成jar包。
4. 修改iq_java.sh,增加类路径
修改$IQDIR16/bin64目录下的iq_java.sh文件(IQ执行这个shell在另外的进程中启动java虚拟机,来执行java udf函数),增加如下内容:
JAVA_ARGS="-Xdebug -Xrunjdwp:transport=dt_socket,address=9988,server=y,suspend=n -Djava.ext.dirs=/usr/local/jdk1.7.0_75/jre/lib/ext:/usr/local/hadoop-2.6.0/share/hadoop/common:/usr/local/hadoop-2.6.0/share/hadoop/common/lib:/usr/local/hadoop-2.6.0/share/hadoop/hdfs:/usr/local/hadoop-2.6.0/share/hadoop/hdfs/lib:/usr/local/hadoop-2.6.0/share/hadoop/mapreduce:/usr/local/hadoop-2.6.0/share/hadoop/mapreduce/lib:/usr/local/hadoop-2.6.0/share/hadoop/yarn:/usr/local/hadoop-2.6.0/share/hadoop/yarn/lib:/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar"
说明:为了能够在java udf中正常调用Hadoop MapReduce作业,需要设置hadoop运行环境所需要的jar包和hadoop的配置文件(比如core-site.xml、hdfs-site.xml、mapred-site.xml和yarn-site.xml)搜索路径,在本例中这些xml配置文件打包到myhadoop.jar中。
修改了iq_java.sh之后,需要重启IQ Server后者执行:
STOP EXTERNAL ENVIRONMENT JAVA
5. 安装jar包到IQ数据库
--启动dbisql工具
dbisql -c "uid=DBA;pwd=sql" -nogui
--执行如下命令(在我的环境中$IQDIR16目录是/opt/sybiq/16.0/IQ-16_0)安装包含java udf函数的jar包(包含示例类和相关类文件)
INSTALL JAVA NEW JAR 'myudf' FROM FILE '/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myudf.jar'
如果先前已经安装过,那么执行:
INSTALL JAVA UPDATE JAR 'myudf' FROM FILE '/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myudf.jar'
6. 创建用户自定义函数
CREATE or REPLACE PROCEDURE hadooprows(IN jarFile VARCHAR(255), IN className VARCHAR(255), IN input VARCHAR(255), IN output VARCHAR(255))
RESULT (word VARCHAR(255),wordcount INTEGER)
EXTERNAL NAME
'udf.example.HadoopRows.hadoopRows(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;[Ljava/sql/ResultSet;)V'
LANGUAGE JAVA
说明:上面的语句如果拷贝到dbisql执行,必须放在一行内,否则会报错。
EXTERNAL_NAME是按照java包名.java类名.方法名指定的,括号中是方法的参数,右括号后面跟的是方法的返回类型.
有关IQ udf的方法参数和返回值的详细说明,参见如下官方网址:
7. 在SQL语句中使用udf函数
在执行下面的sql语句之前,先把MyWordCount作业所需要的文件拷贝到HDFS相应的目录下,并且保证作业的输出目录(/user/sybiq/output/)在HDFS文件系统中不存在.为此执行如下命令:
cd $IQDIR16/udf/java/resources/input
hadoop fs -copyFromLocal file*.txt /user/sybiq/input/
hadoop fs -rm -R /user/sybiq/output/
--执行如下语句,调用MapReduce作业MyWordCount,并得到结果集
select *
from
hadooprows('/opt/sybiq/16.0/IQ-16_0/udf/java/lib/myhadoop.jar',
'hadoop.example.MyWordCount',
'/user/sybiq/input/*.txt',
'/user/sybiq/output/')
说明:MyWordCount作业的代码参见上面3.的下载地址的程序包