Chinaunix首页 | 论坛 | 博客
  • 博客访问: 657029
  • 博文数量: 149
  • 博客积分: 3901
  • 博客等级: 中校
  • 技术积分: 1558
  • 用 户 组: 普通用户
  • 注册时间: 2009-02-16 14:33
文章分类

全部博文(149)

文章存档

2014年(2)

2013年(10)

2012年(32)

2011年(21)

2010年(84)

分类: 云计算

2012-07-05 19:48:29


需求:在hive里我们要一个自定义的 求平均值 方法 ,因为 普通的 avg 会受极大值印象
     (做统计的同学你们懂的!)

然后 去网上 和 hive 源码的例子中 找参考 !
结果发现个悲剧 。。。
   google : hive udaf avg
   hive -src : hive-0.9.0/src/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example/UDAFExampleAvg.java

谢谢 @昌瑜 同学,提供的参考 :
  #一些可以的定制 udf
  # 一个官网的 : Writing GenericUDAFs: A Tutorial
  https://cwiki.apache.org/Hive/genericudafcasestudy.html
  # api 上可以看到 hive 官方提供的 udaf 
 

不说了 ,支撑 多个 map-reduce 的 最简单的 avg。
使用:
  sudo -u hdfs hadoop fs -rm /tmp/lky/myudf.jar
  sudo -u hdfs hadoop fs -put /opt/myudf.jar /tmp/lky/myudf.jar
  add jar hdfs://hadoop131.chinacache.com:8020/tmp/lky/myudf.jar;

  CREATE TEMPORARY FUNCTION c_avg  AS 'ccindex.hive.udaf.Avg';
  CREATE TEMPORARY FUNCTION c_count AS 'ccindex.hive.udaf.Count';
  CREATE TEMPORARY FUNCTION to_float  AS 'ccindex.hive.udf.String2Float';


select flag ,c_avg( to_float(speed)  ),c_count(1) from fc_log where  pt = '20120618' group by flag ;
    busy    277.96896    7708
    idle    404.15356    6878
    other    408.30063    58774



对比 :
select flag ,avg( to_float(speed)  ),count(1) from fc_log where  pt = '20120618' group by flag ;
   busy    277.96925552735286    7708
   idle    404.15324648068884    6878
   other    408.3017428226047    58774





 
源码 :

点击(此处)折叠或打开

  1. package ccindex.hive.udaf;

  2. /**
  3.  * 参考 :
  4.  *   UDAF :
  5.  *   对象序列化参考 : http://oss-dataturbine.googlecode.com/svn/trunk/dev/j2me-rbnb/src/com/rbnb/utility/ByteConvert.java
  6.  */
  7. import java.io.DataInput;
  8. import java.io.DataOutput;
  9. import java.io.IOException;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. import java.util.Set;

  13. import org.apache.hadoop.hive.ql.exec.UDAF;
  14. import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
  15. import org.apache.hadoop.io.BinaryComparable;
  16. import org.apache.hadoop.io.WritableComparable;

  17. import ccindex.util.ByteUtils;
  18. import ccindex.util.NumUtils;

  19. public class Avg extends UDAF {
  20.     
  21.     public static class avgScore extends BinaryComparable implements
  22.             WritableComparable<BinaryComparable> {
  23.         
  24.         private Map< Float, Integer> datas = new HashMap<Float, Integer>() ;
  25.         
  26.         public void clear(){
  27.             datas.clear();
  28.         }
  29.         
  30.         public void set(Float f,Integer c){
  31.             if( datas.containsKey(f) ){
  32.                 datas.put(f, datas.get(f)+c);
  33.             }else{
  34.                 datas.put(f,c);
  35.             }
  36.         }
  37.         
  38.         public Set<Map.Entry<Float, Integer>> getAll(){
  39.             return datas.entrySet();
  40.         }
  41.         
  42.         public Integer get(Float f){
  43.             return datas.get(f);
  44.         }
  45.         
  46.         @Override
  47.         public void readFields(DataInput in) throws IOException {
  48.             datas.clear();
  49.             
  50.             int size = in.readInt();
  51.             System.out.println(size);
  52.             float[] fs = new float[size];
  53.             int[] is = new int[size];
  54.             
  55.             for (int i = 0; i < size; i++) {
  56.                 fs[i] = in.readFloat();
  57.             }
  58.             
  59.             for (int i = 0; i < size; i++) {
  60.                 is[i]=in.readInt();
  61.             }
  62.             
  63.             for (int i = 0; i < size; i++) {
  64.                 datas.put(fs[i], is[i]);
  65.             }
  66.         }

  67.         @Override
  68.         public void write(DataOutput out) throws IOException {
  69.             byte[] bytes = getBytes();
  70.             out.write(bytes);
  71.         }

  72.         @Override
  73.         public byte[] getBytes() {
  74.             // 获取 收集 总条数
  75.             int size = datas.size();
  76.             
  77.             /* 序列化 byte 格式 :
  78.              * 1. 总条数 = int
  79.              * 2. 速度 float [list] ,精确到 小数点后两位
  80.              * 3. 速度 float ,在此次收集中 出现的次数 [list]
  81.              */
  82.             byte[] bi = ByteUtils.int2Byte(new int[]{size});
  83.             
  84.             byte[] ret_byte = new byte[getLength()];
  85.             // 拷贝 总条数 到 ret_byte 的最前面
  86.             ByteUtils.byteInbyte(ret_byte, 0, bi, 0, bi.length);
  87.             
  88.             float[] fs = new float[size] ;
  89.             int[] is = new int[size] ;
  90.             
  91.             int step = 0 ;
  92.             for (Map.Entry<Float, Integer> kv : datas.entrySet()) {
  93.                 fs[step] = kv.getKey();
  94.                 is[step] = kv.getValue();
  95.                 step ++ ;
  96.             }
  97.             byte[] bfs = ByteUtils.float2Byte(fs);
  98.             byte[] bis = ByteUtils.int2Byte(is);
  99.             
  100.             ByteUtils.byteInbyte(ret_byte, bi.length,bfs , 0, bfs.length);
  101.             ByteUtils.byteInbyte(ret_byte, bi.length +bfs.length ,bis , 0, bis.length);
  102.             
  103.             return ret_byte ;
  104.         }

  105.         @Override
  106.         public int getLength() {
  107.             // 获取 收集 总条数
  108.             int size = datas.size();
  109.             return (Integer.SIZE + size*Float.SIZE + size*Integer.SIZE)/Byte.SIZE;
  110.         }

  111.     }

  112.     public static class AvgEvaluator implements UDAFEvaluator {
  113.         avgScore score;

  114.         public AvgEvaluator() {
  115.             score = new avgScore();
  116.             init();
  117.         }

  118.         /*
  119.          * init函数类似于构造函数,用于UDAF的初始化
  120.          */
  121.         public void init() {}
  122.         
  123.         /*
  124.          * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean类似Combiner中的mapper
  125.          */
  126.         public boolean iterate(Float ff) {
  127.             if(ff!=null){
  128.                 // 为节省 空间 ,float 精确到小声点后两位
  129.                 float f1 = NumUtils.floatDecimalPlaces(new float[]{ff},2)[0];
  130.                 score.set(f1, 1);    
  131.             }
  132.             return true;
  133.         }

  134.         /*
  135.          * terminatePartial无参数,其为iterate函数轮转结束后
  136.          *     返回轮转数据类似Combiner中的reducer
  137.          */
  138.         public avgScore terminatePartial() {
  139.             return score.getLength() == 0 ? null : score;
  140.         }

  141.         /*
  142.          * merge接收terminatePartial的返回结果,进行数据merge操作
  143.          *     其返回类型为boolean
  144.          */
  145.         public boolean merge(avgScore in) {
  146.             for (Map.Entry<Float, Integer> kv : in.getAll()) {
  147.                 score.set(kv.getKey(), kv.getValue());
  148.             }
  149.             return true;
  150.         }

  151.         /*
  152.          * terminate返回最终的聚集函数结果
  153.          */
  154.         public float terminate() {
  155.             float all = 0f ;
  156.             
  157.             int num = 0 ;
  158.             for (Map.Entry<Float, Integer> kv : score.getAll()) {
  159.                 all += kv.getKey() * kv.getValue();
  160.                 num += kv.getValue();
  161.             }
  162.             
  163.             /*注意:
  164.              *     发现分配到某台reduce上的 多个 group key 会顺序使用这个 对象 的
  165.              *     在每个 group key 后都需要清除内存 */
  166.             score.clear();
  167.             
  168.             return all/num;
  169.         }
  170.     }
  171.     
  172. }



  请期待,后面我可能会在这个方式上加上些 按 正态分布 获取不受极大或极小值影响的 平均值 。

  

阅读(4270) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~