2013年(15)
分类: HADOOP
2013-10-29 16:45:38
在Hive中,某些小技巧可以让我们的Job执行得更快,有时一点小小的改动就可以让性能得到大幅提升,这一点其实跟SQL差不多。
首先,Hive != SQL,虽然二者的语法很像,但是Hive最终会被转化成MapReduce的代码去执行,所以数据库的优化原则基本上都不适用于 Hive。也正因如此,Hive实际上是用来做计算的,而不像数据库是用作存储的,当然数据库也有很多计算功能,但一般并不建议在SQL中大量使用计算,把数据库只当作存储是一个很重要的原则。
在处理海量数据时我们通常会对很多大表进行操作,基于Hadoop现在的局限性,不能像分布式并行数据库那样很好地在分布式环境利用数据局部性,Hadoop对于大表只能全表扫描并筛选数据,而每一次对大表的扫描都是苦不堪言的。(最后知道真相的我眼泪掉下来。。。)
所以我们会用到在编码中经常用到的重构技巧,提取公共变量,在Hive中,就是创建临时表。
例如:现在要对三个表A、B、C进行处理,Hive QL是:
select T1.*, T2.* from (select id, name from A) T1 join (select id, price, feedback, type from B) T2 on T1.id = T2.id;select T1.*, T2.* from (select id, type from C) T1 join (select id, price,feedback, attribute from B) T2 on T1.id = T2.id;这里A表和C表只会被扫描一次,而B表会被扫描两次,如果B表的数据量很大,那么扫描B表的时间将会占用很大一块。
这里我们可以先创建一个临时表:
create table temp_B as select id, price, feedback, type, attribute from B;这个表只有B表的部分字段,所以大小会小很多,这里会对B表全表扫一遍。
然后可以用临时表和A、C表做join运算:
select T1.*, T2.* from (select id, name from A) T1 join (select id, price, feedback, type from temp_B) T2 on T1.id = T2.id;select T1.*, T2.* from (select id, type from C) T1 join (select id, price,feedback, attribute from temp_B) T2 on T1.id = T2.id;这样任务的执行速度将会有极大提升!尽管看起来多了一条Hive QL,但是后两个任务需要扫描的数据将会变得很小。
如果我们要对多种条件进行COUNT,可以利用case语句进行,这样一条Hive QL就可以完成了。
select count(case when type = 1 then 1 end), count(case when type = 2 then 1 end) from table;
首先需要用create table在HDFS上生成你所需要的表,当需要从HDFS上将表对应的文件导出到本地磁盘时有两种方式:
1、如果需要保持HDFS上的目录结构,原封不动地复制下来,采用下面的命令:
set hive.exec.compress.output='false'; INSERT OVERWRITE LOCAL DIRECTORY '/home/hesey/directory' select * from table;这样下载下来的目录中会有很多由Reducer产生的part-*文件。
2、如果想把表的所有数据都下载到一个文件中,则采用下面的命令:
hadoop dfs -getmerge hdfs://hdpnn:9000/hesey/hive/table /home/hesey/table.txt这样所有文件会由Hadoop合并后下载到本地,最后就只有/home/hesey/table.txt这一个文件。
在Hive中很多时候都需要做一些复杂的计算或者逻辑处理,这时候Hive本身作为一个通用框架没法很好地支持,所以有了UDF(User Defined Function)。
1、使用UDF
(a)如果是已经上传到Hive服务器的UDF,可以直接用
create temporary function dosomething as 'net.hesey.udf.DoSomething';声明临时函数,然后在下面的Hive QL中就可以调用dosomething这个方法了。
(b)如果是自己编写的UDF,需要在声明临时函数前再加一行:
add jar /home/hesey/foo.jar这样就可以把自定义的UDF加载进来,然后和(a)一样声明临时函数就可以了。
2、编写UDF
编写UDF十分简单,引入hive-exec包,继承org.apache.hadoop.hive.ql.exec.UDF类,实现evaluate方法即可,方法的输入和输出参数类型就是当你在Hive中调用时的输入和返回值。
例如:
public Text evaluate(final LongWritable number);(Text和LongWritable是org.apache.hadoop.io下面的类)
这样我们就可以定义自己的函数并方便地在Hive中调用,而不需要写一个重量级的MapReduce。
Hive本身是不支持笛卡尔积的,不能用select T1.*, T2.* from table_1, table_2这种语法。但有时候确实需要用到笛卡尔积的时候,可以用下面的语法来实现同样的效果:
select T1.*, T2.* from (select * from table1) T1 join (select * from table2) T2 on 1=1;其中on 1=1是可选的,注意在Hive的Strict模式下不能用这种语法,需要先用set hive.mapred.mode=nonstrict;设为非strict模式就可以用了。
当Hive做join运算时,join前面的表会被放入内存,所以在做join时,最好把小表放在前面,有利于提高性能并防止OOM。
在SQL中排序通过ORDER by实现,Hive中也支持这种语法,但是使用ORDER by时最终所有的数据会汇总到一个Reducer上进行排序,可能使得该Reducer压力非常大,任务长时间无法完成。
如果排序只要求保证Value有序而Key可以无序,例如要统计每个用户每笔的交易额从高到低排列,只需要对每个用户的交易额排序,而用户ID本身不需要排序。这种情况采用分片排序更好,语法类似于:
select user_id, amount from table distribute by user_id sort by user_id, amount这里用到的不是ORDER by,而是distribute by和sort by,distribute by标识Map输出时分发的Key。
这样最后排序的时候,相同的user_id和amount在同一个Reducer上被排序,不同的user_id可以同时分别在多个Reducer上排序,相比ORDER by只能在一个Reducer上排序,速度有成倍的提升。