假设一个这样的应用场景:需要对一个大日志文件(这个文件有100G或1T大)做用户来源的统计,需要
得到的结果是:广东省 xxx;北京xxx…… 或 电信 xxxx ;联通 xxxx。
主要的工作步骤很简单:获取到用户的IP,解析到用户IP的地址,然后做地址的统计。
现在我们用hadoop来简单实现),在实现过程中,对ip来源地址的解析我现在偷懒只是简单使用了web调用接口,而不使用程序client调用接口和解析纯真库。
定义Map方法(日志格式为:User_IP|date|URI……):
主要流程为:获取到每行的第一个字段,使用whois对这个ip进行解析,然后,将来源地址和来源地址的数量做一个key-value。
Reduce:
这个就更简单了;对每个key-value对做相加的操作。
主体程序:
在这里感叹一句:以前写个分布式并行运行可麻烦一点,现在很多的一些调度框架使用起来做
这方面的事情方便很多)
[hadoop@hadoop-237-13 bin]$ hadoop fs -cat /test/userplace/gzoutput/part-00000
广东省广州市 电信 3
广东省汕尾市 电信 1
江苏省苏州市 电信 1
福建省泉州市 电信 1
在这里引申一个问题,hadoop中使用第三方包的方法:
1.将第三方jar包放在集群中每个节点$HADOOP_HOME/lib目录下或者JDK的ext目录下,其中
$HAOOP_HOME为Hadoop的根目录。善于偷懒所以我也采用这种了,因为我做的一些运算使用
的包还是比较固定,在装hadoop的时候打包进去就行。
2.将所有的jar包解压缩,然后把他和源程序的类文件打包到一个jar包中。
3.使用DistributeCache
http://blog.csdn.net/amuseme_lu/article/details/6706110 对DistribueCache的使用是抄网上的
a. 配置应用程序的cache,把需要使用的文件上传到DFS中去
$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
b. 配置JobConf
JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),job); // 这里的lookup.dat加了一个符号连接
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 这里是把相应的jar包加到Task的启动路径上去
DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 这里是把相应的jar包加到Task的启动路径上去
Distri
butedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job)
c. 在Mapper或者Reducer任务中使用这些文件
- public static class MapClass extends MapReduceBase
- implements Mapper {
- private Path[] localArchives;
- private Path[] localFiles;
- public void configure(JobConf job) {
- // Get the cached archives/files
- localArchives = DistributedCache.getLocalCacheArchives(job); // 得到本地打包的文件,一般是数据文件,如字典文件
- localFiles = DistributedCache.getLocalCacheFiles(job); // 得到本地缓冲的文件,一般是配置文件等
- }
- public void map(K key, V value,
- OutputCollector output, Reporter reporter)
- throws IOException {
- // Use data from the cached archives/files here
- // ...
- // ...
- output.collect(k, v);
- }
- }
阅读(3724) | 评论(0) | 转发(0) |