package com.sohu.pvlog.main;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.sohu.pvlog.common.bean.DataFormat;
import com.sohu.pvlog.common.util.MapreduceUtil;
import com.sohu.pvlog.common.util.PropertiesFileUtil;
public class StatBase {
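    /**
     * Composite map-output key: (id, type, size, attr, pv). size selects the
     * attribute kind (0 = suv, 1 = ip, 2 = userid), attr holds that
     * attribute's bytes and pv carries the page-view count. Extending
     * BinaryComparable lets the comparators below work on the raw
     * serialized form.
     */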
    public static class BaseKey extends BinaryComparable implements
            WritableComparable<BinaryComparable> {
        private int id;
        private int type;
        private int size;
        private byte[] attr;
        private int pv;

        public int getId() {
            return id;
        }

        public int getType() {
            return type;
        }

        public int getSize() {
            return size;
        }

        public byte[] getAttr() {
            return attr;
        }

        public int getPv() {
            return pv;
        }

        public void setPv(int pv) {
            this.pv = pv;
        }

        public void addPv(int pv) {
            this.pv += pv;
        }

        public void set(int id, int type, int size, byte[] attr, int pv) {
            this.id = id;
            this.type = type;
            this.size = size;
            this.attr = attr;
            this.pv = pv;
        }
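        // Wire format: id(4) | type(4) | size(4) | attrLen(4) | attr | pv(4).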
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readInt();
            this.type = in.readInt();
            this.size = in.readInt();
            int attrlen = in.readInt();
            attr = new byte[attrlen];
            in.readFully(attr);
            this.pv = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.write(getBytes());
        }
        @Override
        public byte[] getBytes() {
            // Serializes to the same layout readFields() consumes.
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutput out = new DataOutputStream(buf);
            try {
                out.writeInt(this.id);
                out.writeInt(this.type);
                out.writeInt(this.size);
                out.writeInt(this.attr.length);
                out.write(this.attr);
                out.writeInt(this.pv);
            } catch (IOException e) {
                // ByteArrayOutputStream cannot actually throw. Never return
                // null here: getLength() would hit a NullPointerException.
                throw new RuntimeException(e);
            }
            return buf.toByteArray();
        }
        @Override
        public int getLength() {
            // Note: re-serializes the key on every call.
            return getBytes().length;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getId()).append(",").append(this.getType())
                    .append(",").append(this.getSize()).append(",");
            try {
                sb.append(new BigInteger(this.getAttr())).append(",")
                        .append(this.getPv());
            } catch (Exception e) {
                // attr may be empty, which BigInteger rejects; leave the tail off.
            }
            return sb.toString();
        }
    }
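    /**
     * Packs a dotted-quad IPv4 string into a long (big-endian octet order);
     * returns 0 for malformed input.
     */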
    public static long ipToLong(String strIp) {
        try {
            long[] ip = new long[4];
            // Locate the three '.' separators in the address string.
            int position1 = strIp.indexOf(".");
            int position2 = strIp.indexOf(".", position1 + 1);
            int position3 = strIp.indexOf(".", position2 + 1);
            // Parse each segment between the dots into a long.
            ip[0] = Long.parseLong(strIp.substring(0, position1));
            ip[1] = Long.parseLong(strIp.substring(position1 + 1, position2));
            ip[2] = Long.parseLong(strIp.substring(position2 + 1, position3));
            ip[3] = Long.parseLong(strIp.substring(position3 + 1));
            return (ip[0] << 24) + (ip[1] << 16) + (ip[2] << 8) + ip[3];
        } catch (Exception e) {
            // Malformed address (missing dots, non-numeric segments).
            return 0L;
        }
    }
    /**
     * type: 0 = section, 1 = channel, 3 = whole site
     *
     * channel=suv,ip,userid
     *
     * channel=suv channel=ip channel=userid
     */
    public static class LoadMap extends
            Mapper<LongWritable, Text, BaseKey, NullWritable> {
        private DataFormat df = new DataFormat();
        private BaseKey bk = new BaseKey();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            try {
                // Extraction rule: for each (id, type) pair in the log line,
                // pull the suv, ip and userid fields.
                df.setExtractStr("$column|$channels|0,0|1|3=$suv,$ip,$userid", ",|=");
            } catch (Exception e) {
                // A broken rule would make every map() call fail; abort early.
                throw new IOException(e);
            }
        }

        // Each extracted entry has the form "id,type=suv,ip,userid".
        @Override
        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            df.set(ivalue.toString(), "\t");
            String[] exstrs = df.getExtract();
            for (int i = 0; i < exstrs.length; i++) {
                String[] kv = exstrs[i].split("=");
                String[] kk = kv[0].split(",");
                String[] vv = kv[1].split(",");
                int id = Integer.valueOf(kk[0]);
                int type = Integer.valueOf(kk[1]);
                for (int j = 0; j < vv.length; j++) {
                    byte[] bs = new byte[0];
                    try {
                        if (j == 0) {
                            // suv: a numeric cookie id
                            bs = new BigInteger(vv[0]).toByteArray();
                        } else if (j == 1) {
                            // ip: packed into its numeric long form
                            bs = BigInteger.valueOf(ipToLong(vv[1]))
                                    .toByteArray();
                        } else if (j == 2) {
                            // userid: raw bytes
                            bs = vv[2].getBytes();
                        }
                    } catch (NumberFormatException e) {
                        // Malformed field: emit an empty attr rather than drop.
                    }
                    // size == j distinguishes the attribute kind; pv starts at 1.
                    bk.set(id, type, j, bs, 1);
                    context.write(bk, NullWritable.get());
                }
            }
        }
    }
    // Partition on id so every (id, type) group lands on a single reducer.
    public static class MyPartitioner extends
            Partitioner<BaseKey, NullWritable> {
        @Override
        public int getPartition(BaseKey key, NullWritable value, int numPartitions) {
            return Math.abs(key.getId() * 127) % numPartitions;
        }
    }
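    /**
     * Groups reducer input on (id, type), i.e. the first 8 bytes of the
     * serialized key, so one reduce() call sees every size/attr combination
     * for a given (id, type) pair.
     */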
    public static class GroupingComparator implements RawComparator<BaseKey> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Compare only the leading two ints: id and type.
            return WritableComparator.compareBytes(
                    b1, s1, Integer.SIZE / 8 * 2,
                    b2, s2, Integer.SIZE / 8 * 2);
        }

        @Override
        public int compare(BaseKey o1, BaseKey o2) {
            // Unused: the shuffle always takes the raw byte path above.
            return 0;
        }
    }
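    /**
     * Orders keys by id, type and size (the leading 12 bytes), then by the
     * attr bytes, so identical attrs within a group arrive back to back.
     * The attr-length field and the trailing pv are excluded from the
     * comparison.
     */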
    public static class SortComparator implements RawComparator<BaseKey> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // First compare the leading three ints: id, type, size.
            int ii = WritableComparator.compareBytes(b1, s1,
                    Integer.SIZE / 8 * 3, b2, s2, Integer.SIZE / 8 * 3);
            if (ii == 0) {
                // Tie-break on the attr bytes: skip the four leading ints
                // (16 bytes, through the attr-length field) and drop the
                // trailing 4-byte pv, leaving exactly attr (length - 20).
                return WritableComparator.compareBytes(b1, s1 + Integer.SIZE
                        / 8 * 4, l1 - Integer.SIZE / 8 * 5, b2, s2
                        + Integer.SIZE / 8 * 4, l2 - Integer.SIZE / 8 * 5);
            } else {
                return ii;
            }
        }

        @Override
        public int compare(BaseKey o1, BaseKey o2) {
            // Unused: IntSumCombiner calls the raw byte variant directly.
            return 0;
        }
    }
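    /**
     * Combiner: collapses consecutive identical keys (as ordered by
     * SortComparator) into a single record carrying the summed pv, cutting
     * the volume shuffled to the reducers. State spans reduce() calls and
     * the last pending key is flushed in cleanup().
     */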
    public static class IntSumCombiner extends
            Reducer<BaseKey, NullWritable, BaseKey, NullWritable> {
        private SortComparator sortComp = new SortComparator();
        private BaseKey tbk = new BaseKey();
        private byte[] tb = null;
        private int pv = 0;

        @Override
        public void reduce(BaseKey ikey, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            for (NullWritable nullWritable : values) {
                int size = ikey.getSize();
                // Defensive: only sizes 0-2 (suv, ip, userid) are expected.
                if (size >= 3) continue;
                // getBytes() returns a fresh copy, so it is safe to keep
                // across iterations even though Hadoop reuses the key object.
                byte[] ib = ikey.getBytes();
                if (tb == null) {
                    tb = ib;
                }
                if (sortComp.compare(tb, 0, tb.length, ib, 0, ib.length) == 0) {
                    // Same key as the previous record: accumulate its pv.
                    pv += ikey.getPv();
                } else {
                    // Key changed: flush the previous key with the summed pv.
                    InputStream in = new ByteArrayInputStream(tb);
                    tbk.readFields(new DataInputStream(in));
                    tbk.setPv(pv);
                    context.write(tbk, NullWritable.get());
                    in.close();
                    pv = ikey.getPv();
                }
                tb = ib;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // Flush the last accumulated key.
            if (tb != null) {
                InputStream in = new ByteArrayInputStream(tb);
                tbk.readFields(new DataInputStream(in));
                tbk.setPv(pv);
                context.write(tbk, NullWritable.get());
                in.close();
            }
        }
    }
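    /**
     * Reducer: for each (id, type) group, sums PV and counts UV (distinct
     * attr values) per size bucket (0 = suv, 1 = ip, 2 = userid), relying
     * on the sort order to place identical attrs consecutively.
     */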
    public static class StatPvReduce extends
            Reducer<BaseKey, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(BaseKey ikey, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            int[] uvs = new int[] { 0, 0, 0 };
            int[] pvs = new int[] { 0, 0, 0 };
            byte[] tb = null;
            int tsize = -1;
            for (NullWritable nullWritable : values) {
                int size = ikey.getSize();
                byte[] ib = ikey.getAttr();
                // Identical attrs arrive back to back, so a new unique value
                // shows up as the first record of a size bucket or as any
                // difference from the previous attr.
                if (tsize != size || tb.length != ib.length) {
                    uvs[size]++;
                } else {
                    for (int i = 0; i < tb.length; i++) {
                        if (tb[i] != ib[i]) {
                            uvs[size]++;
                            break;
                        }
                    }
                }
                tb = ib;
                tsize = size;
                pvs[size] += ikey.getPv();
            }
            // ikey now holds the group's last key; id and type are constant
            // within a group. Output: id,type=uv:pv;uv:pv;uv:pv
            StringBuilder out = new StringBuilder();
            out.append(ikey.getId()).append(",").append(ikey.getType())
                    .append("=");
            for (int i = 0; i < 3; i++) {
                out.append(uvs[i]).append(":").append(pvs[i]).append(";");
            }
            context.write(new Text(out.toString()), NullWritable.get());
        }
    }
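    /**
     * Job driver: reads one day of pre-processed logs and writes lines of
     * the form "id,type=uv:pv;uv:pv;uv:pv", one uv:pv segment per attribute
     * (suv, ip, userid).
     */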
    public static void main(String[] args) throws Exception {
        System.out.println(new Date());
        Configuration config = new Configuration(true);
        PropertiesFileUtil pfu = new PropertiesFileUtil();
        MapreduceUtil mru = new MapreduceUtil(config);
        String outPath = mru.applicationSpace("hdfs:///tmp/pvlog/");
        config.set("mapred.child.java.opts", "-Xmx2048m -Xms512m");
        config.set("io.sort.mb", "250");
        int reduceTasksMax = Integer.parseInt(pfu.get("num.reduce_tasks"));

        Job job = new Job(config);
        job.setNumReduceTasks(reduceTasksMax);
        job.setOutputKeyClass(BaseKey.class);
        job.setOutputValueClass(NullWritable.class);
        job.setSortComparatorClass(SortComparator.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setMapperClass(LoadMap.class);
        job.setCombinerClass(IntSumCombiner.class);
        job.setReducerClass(StatPvReduce.class);
        job.setJarByClass(StatBase.class);
        FileInputFormat.setInputPaths(job, new Path(
                "hdfs:///user/pvlog/prelog/dt=20110501/hr=*/*.gz"));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        job.waitForCompletion(true);
        System.out.println(outPath);
        System.out.println(new Date());
    }
}