/**
 * Mapper that forwards every input record unchanged as the output key.
 *
 * <p>The output value carries no information; it is always the
 * {@link NullWritable} singleton.
 */
class MyMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Emits a copy of the input record as the key, paired with a null value.
     *
     * @param ll      input key supplied by the framework; not used here
     * @param row     the input record
     * @param context sink the output pair is written to
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    public void map(LongWritable ll, Text row, Context context)
            throws IOException, InterruptedException {
        final String record = row.toString();
        final Text outputKey = new Text(record);
        final NullWritable noValue = NullWritable.get();
        context.write(outputKey, noValue);
    }
}
class MyReducer extends Reducer<Text, NullWritable,
Text, NullWritable> {
Text t = new Text();
public void reduce(Text key,
Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
t.set(">-------" + key.toString().split("\t")[1]);
context.write(t, NullWritable.get());
for (NullWritable value : values) {
context.write(key, NullWritable.get());
}
}
}
/**
 * Partitions records by the integer value of the key's second tab-separated
 * column, modulo the number of partitions.
 */
class MyPartitioner extends Partitioner<Text, NullWritable> {

    /**
     * Computes the partition for a record.
     *
     * @param key           record whose second tab-separated column is an integer
     * @param value         unused
     * @param numPartitions number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     * @throws NumberFormatException          if the second column is not an integer
     * @throws ArrayIndexOutOfBoundsException if the key has no second column
     */
    @Override
    public int getPartition(Text key, NullWritable value, int numPartitions) {
        String[] cols = key.toString().split("\t");
        int column = Integer.parseInt(cols[1]);

        // Java's % keeps the sign of the dividend, so a negative column value
        // would give a negative partition index. Shift it back into range.
        int remainder = column % numPartitions;
        return remainder < 0 ? remainder + numPartitions : remainder;
    }
}
/**
 * Sort comparator: orders keys by their second tab-separated column, then by
 * their first column. Both columns are compared as strings.
 */
class MySort implements RawComparator<Text> {

    /**
     * Compares two keys by second column, breaking ties on the first column.
     *
     * @param o1 left key; must contain at least two tab-separated columns
     * @param o2 right key; must contain at least two tab-separated columns
     * @return negative, zero or positive per the {@code Comparator} contract
     * @throws ArrayIndexOutOfBoundsException if either key has no second column
     */
    @Override
    public int compare(Text o1, Text o2) {
        // Split each key once; both columns are read from the same array.
        String[] left = o1.toString().split("\t");
        String[] right = o2.toString().split("\t");

        int bySecondColumn = left[1].compareTo(right[1]);
        if (bySecondColumn != 0) {
            return bySecondColumn;
        }
        return left[0].compareTo(right[0]);
    }

    /**
     * Deserializes both keys from their raw byte ranges and delegates to
     * {@link #compare(Text, Text)}.
     *
     * @param b1 buffer holding the first serialized key
     * @param s1 start offset of the first key in {@code b1}
     * @param l1 length of the first key
     * @param b2 buffer holding the second serialized key
     * @param s2 start offset of the second key in {@code b2}
     * @param l2 length of the second key
     * @return the result of comparing the deserialized keys
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *                          deserializing
     */
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
        Text key1 = new Text();
        Text key2 = new Text();
        DataInputBuffer buffer = new DataInputBuffer();
        try {
            buffer.reset(b1, s1, l1);
            key1.readFields(buffer);
            buffer.reset(b2, s2, l2);
            key2.readFields(buffer);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return compare(key1, key2);
    }
}
/**
 * Grouping comparator: two keys are equal when their second tab-separated
 * column is the same string.
 */
class MyComparator implements RawComparator<Text> {

    /**
     * Deserializes both keys from their raw byte ranges and delegates to
     * {@link #compare(Text, Text)}.
     *
     * @param b1 buffer holding the first serialized key
     * @param s1 start offset of the first key in {@code b1}
     * @param l1 length of the first key
     * @param b2 buffer holding the second serialized key
     * @param s2 start offset of the second key in {@code b2}
     * @param l2 length of the second key
     * @return the result of comparing the second columns of the two keys
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *                          deserializing
     */
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
        Text key1 = new Text();
        Text key2 = new Text();
        DataInputBuffer buffer = new DataInputBuffer();
        try {
            buffer.reset(b1, s1, l1);
            key1.readFields(buffer);
            buffer.reset(b2, s2, l2);
            key2.readFields(buffer);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return compare(key1, key2);
    }

    /**
     * Compares two keys by their second tab-separated column only.
     *
     * <p>This overload used to return 0 for every pair, which disagreed with
     * the raw-bytes overload; both now apply the same rule.
     *
     * @param o1 left key; must contain at least two tab-separated columns
     * @param o2 right key; must contain at least two tab-separated columns
     * @return negative, zero or positive per the {@code Comparator} contract
     * @throws ArrayIndexOutOfBoundsException if either key has no second column
     */
    @Override
    public int compare(Text o1, Text o2) {
        String group1 = o1.toString().split("\t")[1];
        String group2 = o2.toString().split("\t")[1];
        return group1.compareTo(group2);
    }
}
/**
 * Configures and runs the job: {@code MyMap} -> {@code MyPartitioner} ->
 * {@code MySort} / {@code MyComparator} -> {@code MyReducer}, with two reduce
 * tasks, reading {@code /tmp/text.txt}.
 *
 * <p>The output path is printed once the job finishes. If the job reports
 * failure the process then exits with status 1.
 *
 * @param args command-line arguments; not used
 * @throws IOException            propagated from job setup or execution
 * @throws InterruptedException   propagated from {@code waitForCompletion}
 * @throws ClassNotFoundException propagated from {@code waitForCompletion}
 */
public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
    Configuration config = new Configuration(true);
    config.set("mapred.child.java.opts", "-Xmx1024m -Xms512m");
    config.set("io.sort.mb", "100");

    MapreduceUtil mru = new MapreduceUtil(config);
    // NOTE(review): applicationSpace(null) presumably returns a fresh output
    // directory for this run — confirm against MapreduceUtil.
    String out_path_1 = mru.applicationSpace(null);

    Job job = new Job(config);
    job.setNumReduceTasks(2);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    job.setMapperClass(MyMap.class);
    job.setJarByClass(MapreduceUtil.class);
    job.setReducerClass(MyReducer.class);
    job.setPartitionerClass(MyPartitioner.class);
    job.setGroupingComparatorClass(MyComparator.class);
    job.setSortComparatorClass(MySort.class);
    FileInputFormat.setInputPaths(job, new Path("/tmp/text.txt"));
    FileOutputFormat.setOutputPath(job, new Path(out_path_1));

    // Keep the completion status instead of discarding it, so a failed job
    // is visible to whoever launched this process.
    boolean succeeded = job.waitForCompletion(true);
    System.out.println(out_path_1);
    if (!succeeded) {
        System.err.println("MapReduce job failed");
        System.exit(1);
    }
}
|