I started with the GenericUDAF chapter of Programming Hive, but its example, an implementation of group_concat, is a bit hard to follow. After digging around I found a blog post online that explains it very well with a much simpler example, which makes it far clearer what is actually going on.

A UDAF is a user-defined aggregate function in Hive; built-in aggregate functions include sum() and count(). UDAFs can be implemented in two ways, simple and generic. The simple API relies on Java reflection, which costs performance, and it lacks some features, so it has been deprecated. This post therefore focuses on Hive's generic user-defined aggregate functions, GenericUDAF. Developing one mainly involves the following two abstract classes:
- org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
If you want to browse the code, you can fork it on GitHub.
Preparing sample data

First, create a table named people to hold the sample data. It has a single column, name, where each row contains one or more names; the data lives in the file people.txt.
~$ cat ./people.txt
John Smith
John and Ann White
Ted Green
Dorothy
Upload the file to the HDFS directory /user/matthew/people:

hadoop fs -mkdir people
hadoop fs -put ./people.txt people
Next, create a Hive external table over that directory. In the Hive shell, run:

CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
ESCAPED BY ''
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
The abstract classes involved

To create a GenericUDAF you first need to understand these two abstract classes:

- org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
- org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

To make sense of their APIs, remember that a Hive query is ultimately just MapReduce: Hive writes and hides the MapReduce for us and exposes a clean SQL surface on top. So it helps to reason in terms of Mapper, Combiner, and Reducer, and to remember that a Hadoop cluster has many machines, with Mapper and Reducer tasks running independently on different ones.

Broadly, then, a UDAF reads raw rows (mapper), collapses a batch of mapper outputs into partial aggregation results (combiner), and finally combines those into one overall result (reducer). Because the aggregation spans multiple combiners, we must be able to store and pass around partial aggregation results; the small sketch after the Mode enum below makes this concrete.
AbstractGenericUDAFResolver

The Resolver is simple: override the method below, which inspects the parameter types passed in from the SQL statement and decides which Evaluator should handle them.

public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
GenericUDAFEvaluator

The UDAF's actual logic lives in the Evaluator, which means implementing several methods of this abstract class. Before digging into the Evaluator, you need to understand the ObjectInspector interface and the Mode enum nested inside GenericUDAFEvaluator.

ObjectInspector

Its main job is to decouple how data is used from how it is physically laid out, so that a data stream can switch between input and output formats at its endpoints and different Operators can work with different representations. For background, see the first post on Hive UDFs and the article on the role of ObjectInspector in Hive, both of which introduce ObjectInspector.
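As a minimal illustration of that decoupling (a standalone sketch I added, using only standard Hive serde classes, not code from the original post): the same logical string can arrive either as a java.lang.String or as a Hadoop Text, and the consuming code stays identical because it always goes through the matching ObjectInspector rather than touching the object directly.

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

public class ObjectInspectorDemo {
    public static void main(String[] args) {
        // Two physical representations of the same logical string value
        Object javaValue = "John Smith";
        Object writableValue = new Text("John Smith");

        // Each representation gets the ObjectInspector that knows its layout
        StringObjectInspector javaOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        StringObjectInspector writableOI = PrimitiveObjectInspectorFactory.writableStringObjectInspector;

        // Consuming code is identical either way: ask the inspector, not the object
        System.out.println(javaOI.getPrimitiveJavaObject(javaValue).length());         // 10
        System.out.println(writableOI.getPrimitiveJavaObject(writableValue).length()); // 10
    }
}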
Mode

The Mode enum (the original post calls it "Model", but the class in Hive is Mode) represents the MapReduce stages a UDAF passes through.
public static enum Mode {
    /**
     * PARTIAL1: the map phase of MapReduce, going from raw data to partial aggregation.
     * iterate() and terminatePartial() will be called.
     */
    PARTIAL1,
    /**
     * PARTIAL2: the map-side Combiner phase of MapReduce, which merges map output on
     * the map side, going from partial aggregation to partial aggregation.
     * merge() and terminatePartial() will be called.
     */
    PARTIAL2,
    /**
     * FINAL: the reduce phase of MapReduce, going from partial aggregation to
     * full aggregation.
     * merge() and terminate() will be called.
     */
    FINAL,
    /**
     * COMPLETE: this mode means the job is map-only with no reduce phase, so the map
     * side emits the result directly, going from raw data straight to full aggregation.
     * iterate() and terminate() will be called.
     */
    COMPLETE
};
In the common case a complete UDAF run is one MapReduce pass: with a mapper and a reducer it goes through PARTIAL1 (mapper) and then FINAL (reducer), and if there is also a combiner it goes through PARTIAL1 (mapper), PARTIAL2 (combiner), and then FINAL (reducer). Some jobs have only a mapper and no reducer, in which case there is just the COMPLETE stage, which takes raw input and emits the final result directly.
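To make those two plan shapes concrete, here is a minimal plain-Java sketch of my own (no Hive APIs; plain ints stand in for the AggregationBuffer, and the helper names mirror the evaluator calls listed in the Mode comments above), running the letter-count logic of the upcoming example through both shapes:

public class PlanShapes {
    static int iterate(int buf, String row) { return buf + row.length(); } // PARTIAL1 / COMPLETE
    static int merge(int buf, int partial)  { return buf + partial; }      // PARTIAL2 / FINAL

    public static void main(String[] args) {
        String[] split1 = {"John Smith", "John and Ann White"};
        String[] split2 = {"Ted Green", "Dorothy"};

        // PARTIAL1 (mappers) -> PARTIAL2 (combiners) -> FINAL (reducer)
        int m1 = 0, m2 = 0;
        for (String r : split1) m1 = iterate(m1, r);  // mapper 1: iterate()
        for (String r : split2) m2 = iterate(m2, r);  // mapper 2: iterate()
        int c1 = merge(0, m1);                        // combiner: merge() + terminatePartial()
        int c2 = merge(0, m2);
        int reduced = merge(merge(0, c1), c2);        // reducer: merge() + terminate()
        System.out.println(reduced);                  // 44

        // COMPLETE (map-only): iterate() straight to terminate()
        int complete = 0;
        for (String r : split1) complete = iterate(complete, r);
        for (String r : split2) complete = iterate(complete, r);
        System.out.println(complete);                 // 44
    }
}

Both shapes print 44, the same figure the finished UDAF returns at the end of this post.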
The methods of GenericUDAFEvaluator

// Determines the input and output data formats (ObjectInspectors) for each stage
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// Returns the object that stores the aggregation state
abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// Resets the aggregation state
public void reset(AggregationBuffer agg) throws HiveException;

// Map phase: iterates over the column values passed in from the SQL statement
public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// Called when the map or combine stage finishes; returns the partial aggregation result
public Object terminatePartial(AggregationBuffer agg) throws HiveException;

// The combiner merging map output, or the reducer merging mapper/combiner output
public void merge(AggregationBuffer agg, Object partial) throws HiveException;

// Reduce phase: returns the final result
public Object terminate(AggregationBuffer agg) throws HiveException;
Diagram: how Mode relates to the Evaluator methods (the original figure is not reproduced here; in short, PARTIAL1 runs iterate() + terminatePartial(), PARTIAL2 runs merge() + terminatePartial(), FINAL runs merge() + terminate(), and COMPLETE runs iterate() + terminate()).
A worked example

Now for a concrete aggregate function: we will count the letters in the name column of the people table. The code below computes the total number of characters (spaces included) in the given column.

The pom file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>TotalNumOfLetters</groupId>
    <artifactId>com.xxxx.udaf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <!-- the original post says 2.6.0, which is not a real Hive release; 2.3.4 is -->
            <version>2.3.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.xxxx.udaf.xxxx</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>com.jolira</groupId>
                <artifactId>onejar-maven-plugin</artifactId>
                <version>1.4.4</version>
                <executions>
                    <execution>
                        <configuration>
                            <attachToBuild>true</attachToBuild>
                            <classifier>onejar</classifier>
                        </configuration>
                        <goals>
                            <goal>one-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
The code

package com.xxxx.udaf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "_FUNC_(expr) - returns the total number of characters of all strings in the column")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        if (parameters.length != 1) { // exactly one argument is accepted
            throw new UDFArgumentLengthException("Exactly one argument is expected, but " +
                    parameters.length + " was passed!");
        }

        ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        // the argument must be of a primitive category
        if (objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Argument type must be PRIMITIVE, but " +
                    objectInspector.getCategory().name() + " was passed!");
        }

        // being primitive, it can be narrowed to a PrimitiveObjectInspector
        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) objectInspector;

        // of the primitive categories, only string is accepted
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0, "Argument type must be string, but " +
                    inputOI.getPrimitiveCategory().name() + " was passed!");
        }

        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;   // string input in PARTIAL1/COMPLETE
        PrimitiveObjectInspector integerOI; // integer partial input in PARTIAL2/FINAL
        ObjectInspector outputOI;           // integer output in every mode

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);

            // See the Mode enum above: PARTIAL1 and COMPLETE read the raw string column,
            // while PARTIAL2 and FINAL read the integer partial aggregates.
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // every stage outputs an Integer
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * Holds the running character count.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;

            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // the original post allocated a fresh buffer here and threw it away;
            // reset must clear the buffer that was passed in
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myAgg = (LetterSumAgg) agg;
                Object p = inputOI.getPrimitiveJavaObject(parameters[0]);
                myAgg.add(String.valueOf(p).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // return only this buffer's sum; accumulating into an evaluator-level
            // field (as the original post did) would leak state across groups
            return ((LetterSumAgg) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            if (partial != null) {
                LetterSumAgg myAgg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myAgg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((LetterSumAgg) agg).sum;
        }
    }
}
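Before packaging the jar, the evaluator can be sanity-checked from a plain main() by hand-driving the same call sequence Hive uses. This is a local test sketch I added (the class name LocalEvaluatorTest is hypothetical, and Hive itself does not invoke evaluators this way), walking the PARTIAL1 to FINAL path of the mapper/reducer plan:

package com.xxxx.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class LocalEvaluatorTest {
    public static void main(String[] args) throws Exception {
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

        // PARTIAL1 (map side): iterate over raw rows, then emit a partial result
        TotalNumOfLettersGenericUDAF.TotalNumOfLettersEvaluator mapSide =
                new TotalNumOfLettersGenericUDAF.TotalNumOfLettersEvaluator();
        ObjectInspector partialOI =
                mapSide.init(GenericUDAFEvaluator.Mode.PARTIAL1, new ObjectInspector[]{stringOI});
        GenericUDAFEvaluator.AggregationBuffer mapBuf = mapSide.getNewAggregationBuffer();
        for (String row : new String[]{"John Smith", "John and Ann White", "Ted Green", "Dorothy"}) {
            mapSide.iterate(mapBuf, new Object[]{row});
        }
        Object partial = mapSide.terminatePartial(mapBuf);

        // FINAL (reduce side): merge the partial result, then emit the final value
        TotalNumOfLettersGenericUDAF.TotalNumOfLettersEvaluator reduceSide =
                new TotalNumOfLettersGenericUDAF.TotalNumOfLettersEvaluator();
        reduceSide.init(GenericUDAFEvaluator.Mode.FINAL, new ObjectInspector[]{partialOI});
        GenericUDAFEvaluator.AggregationBuffer redBuf = reduceSide.getNewAggregationBuffer();
        reduceSide.merge(redBuf, partial);
        System.out.println(reduceSide.terminate(redBuf)); // 44
    }
}

It prints 44, matching the query result below.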
Using the custom function

ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.xxxx.udaf.TotalNumOfLettersGenericUDAF';
SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds