Chinaunix首页 | 论坛 | 博客
  • 博客访问: 836218
  • 博文数量: 167
  • 博客积分: 7173
  • 博客等级: 少将
  • 技术积分: 1671
  • 用 户 组: 普通用户
  • 注册时间: 2009-08-04 23:07
文章分类

全部博文(167)

文章存档

2018年(1)

2017年(11)

2012年(2)

2011年(27)

2010年(88)

2009年(38)

分类: 云计算

2011-10-27 11:22:27

假设一个这样的应用场景:需要对一个大日志文件(这个文件有100G或1T大)做用户来源的统计,需要
得到的结果是:广东省 xxx;北京xxx…… 或 电信 xxxx ;联通 xxxx。
        主要的工作步骤很简单:获取到用户的IP,解析到用户IP的地址,然后做地址的统计。
        现在我们用hadoop来简单实现),在实现过程中,对ip来源地址的解析我现在偷懒只是简单使用了web调用接口,而不使用程序client调用接口和解析纯真库。

       定义Map方法(日志格式为:User_IP|date|URI……):

主要流程为:获取到每行的第一个字段,使用whois对这个ip进行解析,然后,将来源地址和来源地址的数量做一个key-value。

Reduce:
 
这个就更简单了;对每个key-value对做相加的操作。

主体程序:

在这里感叹一句:以前写个分布式并行运行可麻烦一点,现在很多的一些调度框架使用起来做
这方面的事情方便很多)


[hadoop@hadoop-237-13 bin]$ hadoop fs -cat /test/userplace/gzoutput/part-00000
广东省广州市 电信       3
广东省汕尾市 电信       1
江苏省苏州市 电信       1
福建省泉州市 电信       1



在这里引申一个问题,hadoop中使用第三方包的方法:
1.将第三方jar包放在集群中每个节点$HADOOP_HOME/lib目录下或者JDK的ext目录下,其中
$HAOOP_HOME为Hadoop的根目录。善于偷懒所以我也采用这种了,因为我做的一些运算使用
的包还是比较固定,在装hadoop的时候打包进去就行。

2.将所有的jar包解压缩,然后把他和源程序的类文件打包到一个jar包中。

3.使用DistributeCache  http://blog.csdn.net/amuseme_lu/article/details/6706110

  对DistribueCache的使用是抄网上的
  a. 配置应用程序的cache,把需要使用的文件上传到DFS中去   
     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat    
     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip 
     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar 
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz  
   $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz  


  b. 配置JobConf
   JobConf job = new JobConf();  
  DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),job); // 这里的lookup.dat加了一个符号连接
  DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
  DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 这里是把相应的jar包加到Task的启动路径上去
  DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); 
  DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); // 这里是把相应的jar包加到Task的启动路径上去
  Distri
butedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); 
  DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job)




c. 在Mapper或者Reducer任务中使用这些文件
    
  • public static class MapClass extends MapReduceBase    
  •     implements Mapper {  

  •       private Path[] localArchives;  
  •       private Path[] localFiles;  

  •       public void configure(JobConf job) {  
  •         // Get the cached archives/files  
  •         localArchives = DistributedCache.getLocalCacheArchives(job);  // 得到本地打包的文件,一般是数据文件,如字典文件  
  •         localFiles = DistributedCache.getLocalCacheFiles(job);        // 得到本地缓冲的文件,一般是配置文件等  
  •       }  

  •       public void map(K key, V value,   
  •                       OutputCollector output, Reporter reporter)   
  •       throws IOException {  
  •         // Use data from the cached archives/files here  
  •         // ...  
  •         // ...  
  •         output.collect(k, v);  
  •       }  
  •     } 

阅读(3724) | 评论(0) | 转发(0) |
0

上一篇:nginx perl模块处理post请求

下一篇:Hive基础

给主人留下些什么吧!~~