Chinaunix首页 | 论坛 | 博客
  • 博客访问: 652709
  • 博文数量: 149
  • 博客积分: 3901
  • 博客等级: 中校
  • 技术积分: 1558
  • 用 户 组: 普通用户
  • 注册时间: 2009-02-16 14:33
文章分类

全部博文(149)

文章存档

2014年(2)

2013年(10)

2012年(32)

2011年(21)

2010年(84)

分类: 服务器与存储

2011-05-27 16:47:13





package com.sohu.pvlog.main;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.script.ScriptEngine;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.bloom.HashFunction;
import org.apache.hadoop.util.hash.Hash;

import com.sohu.pvlog.common.bean.DataFormat;
import com.sohu.pvlog.common.util.MapreduceUtil;
import com.sohu.pvlog.common.util.PropertiesFileUtil;

public class StatBase {

    public static class BaseKey extends BinaryComparable implements
            WritableComparable<BinaryComparable> {

        private int id;
        public int getId() {
            return id;
        }
        
        private int type;
        public int getType() {
            return type;
        }

        private int size;

        public int getSize() {
            return size;
        }

        private byte[] attr;
        private int pv;

        public int getPv() {
            return pv;
        }
        public void setPv(int pv) {
            this.pv=pv;
        }

        public void addPv(int pv) {
            this.pv += pv;
        }

        public byte[] getAttr() {
            return attr;
        }

        public void set(int id, int type, int size, byte[] attr, int pv) {
            this.id = id;
            this.type = type;
            this.size = size;

            this.attr = attr;
            this.pv = pv;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.type = in.readInt();
            this.size = in.readInt();

            int attrlen = in.readInt();
            attr = new byte[attrlen];
            in.readFully(attr);

            this.pv = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            byte[] bytes = getBytes();
            out.write(bytes);
        }

        @Override
        public byte[] getBytes() {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutput out = new DataOutputStream(buf);
            try {
                out.writeInt(this.id);
                out.writeInt(this.type);
                out.writeInt(this.size);

                out.writeInt(this.attr.length);
                out.write(this.attr);

                out.writeInt(this.pv);

                return buf.toByteArray();
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }

        @Override
        public int getLength() {
            return getBytes().length;
        }

        @Override
        public String toString() {
            StringBuffer sb = new StringBuffer();
            sb.append(this.getId()).append(",").append(this.getType())
                    .append(",").append(this.getSize()).append(",");
            try {
                sb.append(new BigInteger(this.getAttr())).append(",")
                        .append(this.getPv());
                ;
            } catch (Exception e) {
            }
            return sb.toString();
        }

    }

    public static long ipToLong(String strIp) {
        try {
            long[] ip = new long[4];
            // 先找到IP地址字符串中.的位置

            int position1 = strIp.indexOf(".");
            int position2 = strIp.indexOf(".", position1 + 1);
            int position3 = strIp.indexOf(".", position2 + 1);
            // 将每个.之间的字符串转换成整型

            ip[0] = Long.parseLong(strIp.substring(0, position1));
            ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
            ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
            ip[3] = Long.parseLong(strIp.substring(position3 + 1));
            return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
        } catch (Exception e) {
            return 0L;
        }
    }

    /**
     * #type= 0-栏目,1-频道 ,3-全站
     *
     * channel=suv,ip,userid
     *
     * channel=suv channel=ip channel=userid
     */

    public static class LoadMap extends
            Mapper<LongWritable, Text, BaseKey, NullWritable> {
        private DataFormat df = new DataFormat();
        private BaseKey bk = new BaseKey();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            Configuration conf = context.getConfiguration();
            try {
                df.setExtractStr("$column|$channels|0,0|1|3=$suv,$ip,$userid",",|=");
                // df.setExtractStr("0,3=$suv",",|=");

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // k1,k2,k3:v1,v2,v3

        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            df.set(ivalue.toString(), "\t");
            String[] exstrs = df.getExtract();
            for (int i = 0; i < exstrs.length; i++) {
                String[] kv = exstrs[i].split("=");
                String[] kk = kv[0].split(",");
                String[] vv = kv[1].split(",");

                int id = Integer.valueOf(kk[0]);
                int type = Integer.valueOf(kk[1]);

                for (int j = 0; j < vv.length; j++) {
                    byte[] bs = new byte[0];
                    try {
                        if (j == 0) {
                            bs = new BigInteger(vv[0]).toByteArray();
                        } else if (j == 1) {
                            bs = BigInteger.valueOf(ipToLong(vv[1]))
                                    .toByteArray();
                        } else if (j == 2) {
                            bs = vv[2].getBytes();
                        }
                    } catch (NumberFormatException e) {
                    }
                    bk.set(id, type, j, bs, 1);
                    context.write(bk, NullWritable.get());
                }
            }
        }
    }

    // 根据第二列 %2 分区
    public static class MyPartitioner extends
            Partitioner<BaseKey, NullWritable> {
        
        @Override
        public int getPartition(BaseKey key, NullWritable value,int numPartitions) {
            int id = key.getId();
            int type = key.getType();
            //return (Math.abs( id * 3 -1 ) + Math.abs(type * 127)) % numPartitions;
            return (Math.abs( id * 127 )) % numPartitions;
        }
        
    }

    public static class GroupingComparator implements RawComparator<BaseKey> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return WritableComparator.compareBytes(
                    b1, s1,Integer.SIZE / 8 * 2,
                    b2, s2, Integer.SIZE / 8 * 2);
        }

        @Override
        public int compare(BaseKey o1, BaseKey o2) {
            return 0;
        }
    }

    public static class SortComparator implements RawComparator<BaseKey> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // id,type,size
            int ii = WritableComparator.compareBytes(b1, s1,
                    Integer.SIZE / 8 * 3, b2, s2, Integer.SIZE / 8 * 3);
            if (ii == 0) {
                return WritableComparator.compareBytes(b1, s1 + Integer.SIZE
                        / 8 * 4, l1 - Integer.SIZE / 8 * 5, b2, s2
                        + Integer.SIZE / 8 * 4, l2 - Integer.SIZE / 8 * 5);
            } else {
                return ii;
            }
        }

        @Override
        public int compare(BaseKey o1, BaseKey o2) {
            return 0 ;
        }
    }



    public static class IntSumCombiner extends
            Reducer<BaseKey, NullWritable, BaseKey, NullWritable> {

        private SortComparator sortComp = new SortComparator();
        private BaseKey tbk = new BaseKey();

        private byte[] tb = null;
        private int pv = 0 ;
        
        @Override
        public void reduce(BaseKey ikey, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            
            for (NullWritable nullWritable : values) {
                int size = ikey.getSize();
                if( size >=3 ) continue;
                
                byte[] ib = ikey.getBytes();
                
                if( tb==null ){
                    tb = ib ;
                }
                
                if( sortComp.compare(tb, 0, tb.length, ib, 0, ib.length)==0 ){
                    pv += ikey.getPv();
                    
                }else{
                    InputStream in = new ByteArrayInputStream(tb);
                    tbk.readFields( new DataInputStream(in) );
                    tbk.setPv(pv);
                    context.write(tbk, NullWritable.get());
                    in.close();
                    
                    pv = ikey.getPv() ;
                }
                
                tb = ib ;
            }
        }
        
        @Override
        protected void cleanup(Context context) throws IOException ,InterruptedException {
            if( tb != null){
                InputStream in = new ByteArrayInputStream(tb);
                tbk.readFields( new DataInputStream(in) );
                tbk.setPv(pv);
                context.write(tbk, NullWritable.get());
                in.close();
            }
        }

    }

    public static class StatPvReduce extends
            Reducer<BaseKey, NullWritable, Text, NullWritable> {

        private GroupingComparator group = new GroupingComparator();
        private SortComparator sortComp = new SortComparator();

        public void reduce(BaseKey ikey, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            int[] uvs = new int[] { 0, 0, 0 };
            int[] pvs = new int[] { 0, 0, 0 };
            byte[] tb = null;

            for (NullWritable nullWritable : values) {
                int size = ikey.getSize();

                byte[] ib = ikey.getAttr();

                if (tb == null) {
                    tb = ib;
                }

                if (tb.length != ib.length) {
                    uvs[size]++;
                    tb = ib;
                } else {
                    for (int i = 0; i < tb.length; i++) {
                        if (tb[i] != ib[i]) {
                            uvs[size]++;
                            tb = ib;
                            break;
                        }
                    }
                }
                pvs[size] += ikey.getPv();
            }

            StringBuffer out = new StringBuffer();
            out.append(ikey.getId()).append(",").append(ikey.getType())
                    .append("=");
            for (int i = 0; i < 3; i++) {
                out.append(uvs[i]).append(":").append(pvs[i]).append(";");
            }
            context.write(new Text(out.toString()), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new Date());
        Configuration config = new Configuration(true);
        PropertiesFileUtil pfu = new PropertiesFileUtil();

        MapreduceUtil mru = new MapreduceUtil(config);
        String out_path_1 = mru.applicationSpace("hdfs:///tmp/pvlog/");
        String out_path_2 = mru.applicationSpace("hdfs:///tmp/pvlog/");

        config.set("mapred.child.java.opts", "-Xmx2048m -Xms512m");
        config.set("io.sort.mb", "250");

        int reduceTasksMax = Integer.parseInt(pfu.get("num.reduce_tasks"));

        // 1

        Job job = new Job(config);
        job.setNumReduceTasks(reduceTasksMax);

        job.setOutputKeyClass(BaseKey.class);
        job.setOutputValueClass(NullWritable.class);
        // job.setGroupingComparatorClass(SortComparator.class);

        job.setSortComparatorClass(SortComparator.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setPartitionerClass(MyPartitioner.class);

        job.setMapperClass(LoadMap.class);
         job.setCombinerClass(IntSumCombiner.class);
        // job.setReducerClass(StatPvReduce.class);

        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(StatPvReduce.class);
        job.setJarByClass(StatBase.class);
        // FileInputFormat.setInputPaths(job, new
        // Path("hdfs:///user/pvlog/prelog/dt=20110523/hr=*/*"));
        FileInputFormat.setInputPaths(job, new Path(
                "hdfs:///user/pvlog/prelog/dt=20110501/hr=*/*.gz"));
        // FileInputFormat.setInputPaths(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(out_path_1));

        job.waitForCompletion(true);
        System.out.println(out_path_1);
        System.out.println(new Date());
    }

}






阅读(1448) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~