需求:在hive里我们要一个自定义的 求平均值 方法 ,因为 普通的 avg 会受极大值印象
(做统计的同学你们懂的!)
然后 去网上 和 hive 源码的例子中 找参考 !
结果发现个悲剧 。。。
google : hive udaf avg
hive -src : hive-0.9.0/src/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java
谢谢 @昌瑜 同学,提供的参考 :
#一些可以的定制 udf
# 一个官网的 : Writing GenericUDAFs: A Tutorial
https://cwiki.apache.org/Hive/genericudafcasestudy.html
# api 上可以看到 hive 官方提供的 udaf
不说了 ,支撑 多个 map-reduce 的 最简单的 avg。
使用:
sudo -u hdfs hadoop fs -rm /tmp/lky/myudf.jar
sudo -u hdfs hadoop fs -put /opt/myudf.jar /tmp/lky/myudf.jar
add jar hdfs://hadoop131.chinacache.com:8020/tmp/lky/myudf.jar; CREATE TEMPORARY FUNCTION c_avg AS 'ccindex.hive.udaf.Avg';
CREATE TEMPORARY FUNCTION c_count AS 'ccindex.hive.udaf.Count';
CREATE TEMPORARY FUNCTION to_float AS 'ccindex.hive.udf.String2Float';select flag ,c_avg( to_float(speed) ),c_count(1) from fc_log where pt = '20120618' group by flag ;
busy 277.96896 7708
idle 404.15356 6878
other 408.30063 58774
对比 :
select flag ,avg( to_float(speed) ),count(1) from fc_log where pt = '20120618' group by flag ;
busy 277.96925552735286 7708
idle 404.15324648068884 6878
other 408.3017428226047 58774 源码 :
- package ccindex.hive.udaf;
- /**
- * 参考 :
- * UDAF :
- * 对象序列化参考 : http://oss-dataturbine.googlecode.com/svn/trunk/dev/j2me-rbnb/src/com/rbnb/utility/ByteConvert.java
- */
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.Set;
- import org.apache.hadoop.hive.ql.exec.UDAF;
- import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
- import org.apache.hadoop.io.BinaryComparable;
- import org.apache.hadoop.io.WritableComparable;
- import ccindex.util.ByteUtils;
- import ccindex.util.NumUtils;
- public class Avg extends UDAF {
-
- public static class avgScore extends BinaryComparable implements
- WritableComparable<BinaryComparable> {
-
- private Map< Float, Integer> datas = new HashMap<Float, Integer>() ;
-
- public void clear(){
- datas.clear();
- }
-
- public void set(Float f,Integer c){
- if( datas.containsKey(f) ){
- datas.put(f, datas.get(f)+c);
- }else{
- datas.put(f,c);
- }
- }
-
- public Set<Map.Entry<Float, Integer>> getAll(){
- return datas.entrySet();
- }
-
- public Integer get(Float f){
- return datas.get(f);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- datas.clear();
-
- int size = in.readInt();
- System.out.println(size);
- float[] fs = new float[size];
- int[] is = new int[size];
-
- for (int i = 0; i < size; i++) {
- fs[i] = in.readFloat();
- }
-
- for (int i = 0; i < size; i++) {
- is[i]=in.readInt();
- }
-
- for (int i = 0; i < size; i++) {
- datas.put(fs[i], is[i]);
- }
- }
- @Override
- public void write(DataOutput out) throws IOException {
- byte[] bytes = getBytes();
- out.write(bytes);
- }
- @Override
- public byte[] getBytes() {
- // 获取 收集 总条数
- int size = datas.size();
-
- /* 序列化 byte 格式 :
- * 1. 总条数 = int
- * 2. 速度 float [list] ,精确到 小数点后两位
- * 3. 速度 float ,在此次收集中 出现的次数 [list]
- */
- byte[] bi = ByteUtils.int2Byte(new int[]{size});
-
- byte[] ret_byte = new byte[getLength()];
- // 拷贝 总条数 到 ret_byte 的最前面
- ByteUtils.byteInbyte(ret_byte, 0, bi, 0, bi.length);
-
- float[] fs = new float[size] ;
- int[] is = new int[size] ;
-
- int step = 0 ;
- for (Map.Entry<Float, Integer> kv : datas.entrySet()) {
- fs[step] = kv.getKey();
- is[step] = kv.getValue();
- step ++ ;
- }
- byte[] bfs = ByteUtils.float2Byte(fs);
- byte[] bis = ByteUtils.int2Byte(is);
-
- ByteUtils.byteInbyte(ret_byte, bi.length,bfs , 0, bfs.length);
- ByteUtils.byteInbyte(ret_byte, bi.length +bfs.length ,bis , 0, bis.length);
-
- return ret_byte ;
- }
- @Override
- public int getLength() {
- // 获取 收集 总条数
- int size = datas.size();
- return (Integer.SIZE + size*Float.SIZE + size*Integer.SIZE)/Byte.SIZE;
- }
- }
- public static class AvgEvaluator implements UDAFEvaluator {
- avgScore score;
- public AvgEvaluator() {
- score = new avgScore();
- init();
- }
- /*
- * init函数类似于构造函数,用于UDAF的初始化
- */
- public void init() {}
-
- /*
- * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean类似Combiner中的mapper
- */
- public boolean iterate(Float ff) {
- if(ff!=null){
- // 为节省 空间 ,float 精确到小声点后两位
- float f1 = NumUtils.floatDecimalPlaces(new float[]{ff},2)[0];
- score.set(f1, 1);
- }
- return true;
- }
- /*
- * terminatePartial无参数,其为iterate函数轮转结束后
- * 返回轮转数据类似Combiner中的reducer
- */
- public avgScore terminatePartial() {
- return score.getLength() == 0 ? null : score;
- }
- /*
- * merge接收terminatePartial的返回结果,进行数据merge操作
- * 其返回类型为boolean
- */
- public boolean merge(avgScore in) {
- for (Map.Entry<Float, Integer> kv : in.getAll()) {
- score.set(kv.getKey(), kv.getValue());
- }
- return true;
- }
- /*
- * terminate返回最终的聚集函数结果
- */
- public float terminate() {
- float all = 0f ;
-
- int num = 0 ;
- for (Map.Entry<Float, Integer> kv : score.getAll()) {
- all += kv.getKey() * kv.getValue();
- num += kv.getValue();
- }
-
- /*注意:
- * 发现分配到某台reduce上的 多个 group key 会顺序使用这个 对象 的
- * 在每个 group key 后都需要清除内存 */
- score.clear();
-
- return all/num;
- }
- }
-
- }
请期待,后面我可能会在这个方式上加上些 按 正态分布 获取不受极大或极小值影响的 平均值 。
阅读(4248) | 评论(0) | 转发(0) |