Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1030983
  • 博文数量: 164
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1336
  • 用 户 组: 普通用户
  • 注册时间: 2016-03-11 14:13
个人简介

狂甩酷拽吊炸天

文章分类

全部博文(164)

文章存档

2023年(1)

2022年(3)

2021年(4)

2020年(17)

2019年(37)

2018年(17)

2017年(35)

2016年(50)

分类: LINUX

2019-11-06 11:55:21

hive的比较难的部分应该就是GenericUDAF,看了两天终于看明白了,有些点是我自己遇到卡住的点,记录下来希望对大家有所帮助。


一开始看的是《Hive 编程指南》中关于GenericUDAF的章节,例子有点难了。讲的是group_concat的实现。查了资料后觉得网上写的博客非常好,例子比较简单,更能够明白到底在说什么。


一定要结合MapReduce的过程来看,才会想明白。


不要偷懒,要将文章中的代码下载下来,具体查看他所继承的类,才能更快速理解内容。

关于ObjectInspector
想要理解GenericUDAF一定要理解ObjectInspector,这是贯彻在其中的一个概念。这里也说了一篇博文,一定要参考。Hive中ObjectInspector的作用。
ObjectInspector在我看来是什么?其实就是类型检查和通过这个类能得到值,并且帮助它序列化和反序列化,达到数据流转无障碍的目的。


UDAF是Hive中用户自定义的聚集函数,Hive内置UDAF函数包括有sum()与count(),UDAF实现有简单与通用两种方式,简单UDAF因为使用Java反射导致性能损失,而且有些特性不能使用,已经被弃用了;在这篇博文中我们将关注Hive中自定义聚类函数-GenericUDAF,UDAF开发主要涉及到以下两个抽象类:

点击(此处)折叠或打开

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
  2. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator


如果你想浏览代码:fork it on Github:

示例数据准备
首先先创建一张包含示例数据的表:people,该表只有name一列,该列中包含了一个或多个名字,该表数据保存在people.txt文件中。

点击(此处)折叠或打开

  1. ~$ cat ./people.txt
  2. John Smith
  3. John and Ann White
  4. Ted Green
  5. Dorothy


把该文件上载到HDFS目录/user/matthew/people中:

点击(此处)折叠或打开

  1. hadoop fs -mkdir people
  2. hadoop fs -put ./people.txt people


下面要创建Hive外部表,在Hive shell中执行

点击(此处)折叠或打开

  1. CREATE EXTERNAL TABLE people (name string)
  2. ROW FORMAT DELIMITED FIELDS
  3. TERMINATED BY '\t'
  4. ESCAPED BY ''
  5. LINES TERMINATED BY '\n'
  6. STORED AS TEXTFILE
  7. LOCATION '/user/matthew/people';



相关抽象类介绍
创建一个GenericUDAF必须先了解以下两个抽象类:

点击(此处)折叠或打开

  1. org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
  2. org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator


为了更好理解上述抽象类的API,要记住hive只是mapreduce函数,只不过hive已经帮助我们写好并隐藏mapreduce,向上提供简洁的sql函数,所以我们要结合Mapper、Combiner与Reducer来帮助我们理解这个函数。要记住在hadoop集群中有若干台机器,在不同的机器上Mapper与Reducer任务独立运行。

所以大体上来说,这个UDAF函数读取数据(mapper),聚集一堆mapper输出到部分聚集结果(combiner),并且最终创建一个最终的聚集结果(reducer)。因为我们跨域多个combiner进行聚集,所以我们需要保存部分聚集结果。

AbstractGenericUDAFResolver
Resolver很简单,要覆盖实现下面方法,该方法会根据sql传人的参数数据格式指定调用哪个Evaluator进行处理。

点击(此处)折叠或打开

  1. public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
GenericUDAFEvaluator


UDAF逻辑处理主要发生在Evaluator中,要实现该抽象类的几个方法。
在理解Evaluator之前,必须先理解objectInspector接口与GenericUDAFEvaluator中的内部类Model。
ObjectInspector
作用主要是解耦数据使用与数据格式,使得数据流在输入输出端切换不同的输入输出格式,不同的Operator上使用不同的格式。可以参考这两篇文章:first post on Hive UDFs、Hive中ObjectInspector的作用,里面有关于objectinspector的介绍。
Model
Model代表了UDAF在mapreduce的各个阶段。

点击(此处)折叠或打开

  1. public static enum Mode {
  2. /**
  3.  * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
  4.  * 将会调用iterate()和terminatePartial()
  5.  */
  6. PARTIAL1,
  7. /**
  8.  * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
  9.  * 将会调用merge() 和 terminatePartial()
  10.  */
  11. PARTIAL2,
  12. /**
  13.  * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
  14.  * 将会调用merge()和terminate()
  15.  */
  16. FINAL,
  17. /**
  18.  * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
  19.  * 将会调用 iterate()和terminate()
  20.  */
  21. COMPLETE
  22. };

		
  1. 一般情况下,完整的UDAF逻辑是一个mapreduce过程,如果有mapper和reducer,就会经历PARTIAL1(mapper),FINAL(reducer),如果还有combiner,那就会经历PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

而有一些情况下的mapreduce,只有mapper,而没有reducer,所以就会只有COMPLETE阶段,这个阶段直接输入原始数据,出结果。

点击(此处)折叠或打开

  1. GenericUDAFEvaluator的方法
  2. // 确定各个阶段输入输出参数的数据格式ObjectInspectors
  3. public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
  4. // 保存数据聚集结果的类
  5. abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
  6. // 重置聚集结果
  7. public void reset(AggregationBuffer agg) throws HiveException;
  8. // map阶段,迭代处理输入sql传过来的列数据
  9. public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
  10. // map与combiner结束返回结果,得到部分数据聚集结果
  11. public Object terminatePartial(AggregationBuffer agg) throws HiveException;
  12. // combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
  13. public void merge(AggregationBuffer agg, Object partial) throws HiveException;
  14. // reducer阶段,输出最终结果
  15. public Object terminate(AggregationBuffer agg) throws HiveException;


图解Model与Evaluator关系

实例
下面将讲述一个聚集函数UDAF的实例,我们将计算people这张表中的name列字母的个数。
下面的函数代码是计算指定列中字符的总数(包括空格)

pom文件如下:

点击(此处)折叠或打开

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns=""
  3.          xmlns:xsi=""
  4.          xsi:schemaLocation=" ">
  5.     <modelVersion>4.0.0</modelVersion>

  6.     <groupId>TotalNumOfLetters</groupId>
  7.     <artifactId>com.xxxx.udaf</artifactId>
  8.     <version>1.0-SNAPSHOT</version>

  9.     <dependencies>
  10.         <dependency>
  11.             <groupId>org.apache.hive</groupId>
  12.             <artifactId>hive-exec</artifactId>
  13.             <version>2.6.0</version>
  14.         </dependency>

  15.         <dependency>
  16.             <groupId>org.apache.hadoop</groupId>
  17.             <artifactId>hadoop-client</artifactId>
  18.             <version>2.6.0</version>
  19.         </dependency>
  20.     </dependencies>

  21.     <build>
  22.         <plugins>
  23.             <plugin>
  24.                 <groupId>org.apache.maven.plugins</groupId>
  25.                 <artifactId>maven-jar-plugin</artifactId>
  26.                 <configuration>
  27.                     <archive>
  28.                         <manifest>
  29.                             <mainClass>com.xxxx.udaf.xxxx</mainClass>
  30.                         </manifest>
  31.                     </archive>
  32.                 </configuration>
  33.             </plugin>
  34.             <plugin>
  35.                 <groupId>com.jolira</groupId>
  36.                 <artifactId>onejar-maven-plugin</artifactId>
  37.                 <version>1.4.4</version>
  38.                 <executions>
  39.                     <execution>
  40.                         <configuration>
  41.                             <attachToBuild>true</attachToBuild>
  42.                             <classifier>onejar</classifier>
  43.                         </configuration>
  44.                         <goals>
  45.                             <goal>one-jar</goal>
  46.                         </goals>
  47.                     </execution>
  48.                 </executions>
  49.             </plugin>
  50.             <plugin>
  51.                 <groupId>org.apache.maven.plugins</groupId>
  52.                 <artifactId>maven-compiler-plugin</artifactId>
  53.                 <configuration>
  54.                     <source>7</source>
  55.                     <target>7</target>
  56.                 </configuration>
  57.             </plugin>
  58.         </plugins>
  59.     </build>

  60. </project>



代码

点击(此处)折叠或打开

  1. package com.xxxx.udaf;

  2. import org.apache.hadoop.hive.ql.exec.Description;
  3. import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
  5. import org.apache.hadoop.hive.ql.metadata.HiveException;
  6. import org.apache.hadoop.hive.ql.parse.SemanticException;
  7. import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
  8. import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
  9. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  10. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
  11. import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
  12. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
  13. import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;


  14. @Description(name = "letters", value = "__FUNC__(expr) - return the total count chars of the column(返回该列中所有字符串的字符总数)")
  15. public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

  16.     @Override
  17.     public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
  18.         if (parameters.length != 1) { // 判断参数长度
  19.             throw new UDFArgumentLengthException("Exactly one argument is expected, but " +
  20.                     parameters.length + " was passed!");
  21.         }

  22.         ObjectInspector objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

  23.         if (objectInspector.getCategory() != ObjectInspector.Category.PRIMITIVE) { // 是不是标准的java Object的primitive类型
  24.             throw new UDFArgumentTypeException(0, "Argument type must be PRIMARY. but " +
  25.                     objectInspector.getCategory().name() + " was passed!");
  26.         }

  27.         // 如果是标准的java Object的primitive类型,说明可以进行类型转换
  28.         PrimitiveObjectInspector in putOI = (PrimitiveObjectInspector) objectInspector;

  29.         // 如果是标准的java Object的primitive类型,判断是不是string类型,因为参数只接受string类型
  30.         if (in putOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  31.             throw new UDFArgumentTypeException(0, "Argument type must be Strig, but " +
  32.                     in putOI.getPrimitiveCategory().name() + " was passed!");
  33.         }

  34.         return new TotalNumOfLettersEvaluator();
  35.     }

  36.     public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

  37.         PrimitiveObjectInspector in putIO;
  38.         ObjectInspector outputIO;
  39.         PrimitiveObjectInspector IntegerIO;

  40.         int total = 0;

  41.         @Override
  42.         public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
  43.             assert (parameters.length == 1);
  44.             super.init(m, parameters);

  45.             /**
  46.              * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
  47.              * 将会调用iterate()和terminatePartial()

  48.              * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合:
  49.              * 将会调用merge() 和 terminatePartial()

  50.              * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合
  51.              * 将会调用merge()和terminate()

  52.              * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
  53.              * 将会调用 iterate()和terminate()
  54.              */

  55.             //map阶段读取sql列,输入为String基础数据格式
  56.             if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
  57.                 in putIO = (PrimitiveObjectInspector) parameters[0];
  58.             } else { //其余阶段,输入为Integer基础数据格式
  59.                 IntegerIO = (PrimitiveObjectInspector) parameters[0];
  60.             }

  61.             // 指定各个阶段输出数据格式都为Integer类型
  62.             outputIO = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
  63.                     ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
  64.             return outputIO;
  65.         }

  66.         /**
  67.          * 存储当前字符总数的类
  68.          */
  69.         static class LetterSumAgg implements AggregationBuffer {
  70.             int sum = 0;

  71.             void add(int num) {
  72.                 sum += num;
  73.             }
  74.         }

  75.         @Override
  76.         public AggregationBuffer getNewAggregationBuffer() throws HiveException {
  77.             LetterSumAgg result = new LetterSumAgg();
  78.             return result;
  79.         }

  80.         @Override
  81.         public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
  82.             LetterSumAgg myAgg = new LetterSumAgg();
  83.         }

  84.         private boolean warned = false;

  85.         @Override
  86.         public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
  87.             assert (parameters.length == 1);
  88.             if (parameters[0] != null) {
  89.                 LetterSumAgg myAgg = (LetterSumAgg) agg;
  90.                 Object p = in putIO.getPrimitiveJavaObject(parameters[0]);
  91.                 myAgg.add(String.valueOf(p).length());
  92.             }
  93.         }

  94.         @Override
  95.         public Object terminatePartial(AggregationBuffer agg) throws HiveException {
  96.             LetterSumAgg myAgg = (LetterSumAgg) agg;
  97.             total += myAgg.sum;
  98.             return total;
  99.         }

  100.         @Override
  101.         public void merge(AggregationBuffer agg, Object partial) throws HiveException {
  102.             if (partial != null) {
  103.                 LetterSumAgg myAgg1 = (LetterSumAgg) agg;
  104.                 Integer partialSum = (Integer) IntegerIO.getPrimitiveJavaObject(partial);
  105.                 LetterSumAgg myAgg2 = new LetterSumAgg();
  106.                 myAgg2.add(partialSum);
  107.                 myAgg1.add(myAgg2.sum);
  108.             }
  109.         }

  110.         @Override
  111.         public Object terminate(AggregationBuffer agg) throws HiveException {
  112.             LetterSumAgg myAgg = (LetterSumAgg) agg;
  113.             total = myAgg.sum;
  114.             return myAgg.sum;
  115.         }
  116.     }

  117. }


使用自定义函数
		
  1. 点击(此处)折叠或打开

    1. ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
    2. CREATE TEMPORARY FUNCTION letters as 'com.xxxx.udaf.TotalNumOfLettersGenericUDAF';
    3. SELECT letters(name) FROM people;
    4. OK
    5. 44
    6. Time taken: 20.688 seconds


阅读(2797) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~