After coding my first Hadoop application, I decided to add new requirements to enrich it. For example, I wanted to sort the jams in decreasing order of like count. In this post, I will try to meet this requirement with another Hadoop application.
Ordering Mapper Output
The mapper class is the same as in the first Hadoop application. It simply writes 1 (one) to the context for each like, so that the likes can be counted later in the reduce step. So nothing changes for the mapper. The reducer should still count the number of likes for each jam. In addition, it must output the jams sorted by decreasing like count.
There are a few ways to accomplish this. The one I implemented is to create a TreeSet and put the output of each reduce call into it. Since a TreeSet keeps its elements sorted, iterating over it afterwards yields the reducer output in sorted order.
This solution introduces two new problems:
- To use the TreeSet, we need to define a private class that either has a natural ordering, or can be externally sorted by a Comparator
- In case the reducer code is ever called from different threads for different keys, the TreeSet must be synchronized
The answer to the first problem is to create a class that implements the Comparable interface and holds the jam name and the jam count. Its natural ordering is by decreasing jam count, so that iterating the TreeSet gives the most liked jam first. Ties are broken by jam name; otherwise the TreeSet would treat two different jams with the same count as duplicates and keep only one of them. Hence, the private class I used is below:

    private class JamCountPair implements Comparable<JamCountPair> {
        private String jam;
        private int count;

        public JamCountPair(String jam, int count) {
            this.jam = jam;
            this.count = count;
        }

        // Higher counts sort first; equal counts fall back to the jam name.
        @Override
        public int compareTo(JamCountPair other) {
            if (this.count > other.count) return -1;
            if (this.count < other.count) return 1;
            return this.jam.compareTo(other.jam);
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof JamCountPair)) return false;
            JamCountPair otherPair = (JamCountPair) other;
            if (jam == null) {
                // Avoid calling equals on a null jam name.
                return otherPair.jam == null && count == otherPair.count;
            }
            return jam.equals(otherPair.jam) && count == otherPair.count;
        }

        // Kept consistent with equals.
        @Override
        public int hashCode() {
            return 31 * (jam == null ? 0 : jam.hashCode()) + count;
        }
    }
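To see why the ordering and the tie-break matter, here is a minimal stand-alone sketch that runs without Hadoop. The `JamOrderingDemo` and `Pair` names and the sample jams are made up for illustration; `Pair` is only a simplified stand-in for a pair class ordered by decreasing count and then by name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical demo class; not part of the original job.
final class JamOrderingDemo {

    // Simplified stand-in for a jam/count pair: decreasing count, then name.
    static final class Pair implements Comparable<Pair> {
        final String jam;
        final int count;

        Pair(String jam, int count) {
            this.jam = jam;
            this.count = count;
        }

        @Override
        public int compareTo(Pair other) {
            // Swapping the arguments puts larger counts first.
            int byCount = Integer.compare(other.count, this.count);
            return byCount != 0 ? byCount : this.jam.compareTo(other.jam);
        }
    }

    // Returns the jam names in the order the TreeSet iterates them.
    static List<String> sortedNames() {
        TreeSet<Pair> set = new TreeSet<>();
        set.add(new Pair("jamA", 3));
        set.add(new Pair("jamB", 7));
        set.add(new Pair("jamC", 3)); // same count as jamA; kept because names differ
        List<String> names = new ArrayList<>();
        for (Pair p : set) {
            names.add(p.jam);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedNames()); // prints [jamB, jamA, jamC]
    }
}
```

If `compareTo` looked only at the count, the set would silently drop `jamC`, because a TreeSet decides on duplicates with `compareTo` rather than `equals`.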
The mapper class is the same as before. For the sake of completeness, here is the code again:
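    // Needs imports: java.io.IOException, org.apache.hadoop.io.IntWritable,
    // org.apache.hadoop.io.Text, org.apache.hadoop.mapreduce.Mapper
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "user<TAB>jam"; emit (jam, 1) so the reducer can count likes.
            String[] userAndJam = value.toString().split("\t");
            context.write(new Text(userAndJam[1]), one);
        }
    }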
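The mapper above assumes that every input line has at least two tab-separated fields. As a rough sketch of how that parsing step could be checked on its own, here is a small helper that returns nothing for malformed lines instead of throwing. `JamLineParser` and `jamOf` are hypothetical names, and skipping bad lines is an assumption, not something the original job does.

```java
import java.util.Optional;

// Hypothetical helper; the original mapper indexes the split array directly.
final class JamLineParser {

    // Returns the jam from a "user<TAB>jam" line, or empty if the jam field is missing.
    static Optional<String> jamOf(String line) {
        String[] userAndJam = line.split("\t");
        if (userAndJam.length < 2 || userAndJam[1].isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(userAndJam[1]);
    }

    public static void main(String[] args) {
        System.out.println(jamOf("user1\tjamA").isPresent()); // prints true
        System.out.println(jamOf("user1").isPresent());       // prints false
    }
}
```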
The task of the reducer is now twofold. It should still count the jams as before, and it also has to sort the “jam – like count” pairs. The JamCountPair class serves that purpose: after counting the likes of a jam, the reducer stores the pair object in a SortedSet. I made this sorted set thread-safe so that it stays consistent even if pair objects are ever added from more than one thread. Note that Hadoop calls reduce sequentially within a single reduce task, so the synchronized wrapper is a safeguard rather than a strict requirement. Here is my solution:
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private SortedSet<JamCountPair> sortedSet =
                Collections.synchronizedSortedSet(new TreeSet<JamCountPair>());
        private OrderedLikeCount olc = new OrderedLikeCount();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            sortedSet.add(olc.new JamCountPair(key.toString(), sum));
        }
    }
When the reducer has processed all input from the mappers, the sorted set is also complete. So, how will we print it? We need a point where every reduce call has finished and the sorted set is fully available. Welcome to the clean-up step of the reducer: cleanup runs once, after the last reduce call, so it can iterate over the set and write all pairs to the context. Keep in mind that each reduce task has its own set, so the final output is globally sorted only when the job runs with a single reduce task. Therefore, the reducer class becomes this:
    // Needs imports: java.io.IOException, java.util.Collections, java.util.SortedSet,
    // java.util.TreeSet, org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text,
    // org.apache.hadoop.mapreduce.Reducer
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private SortedSet<JamCountPair> sortedSet =
                Collections.synchronizedSortedSet(new TreeSet<JamCountPair>());
        private OrderedLikeCount olc = new OrderedLikeCount();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count the likes of this jam, then keep the pair instead of writing it right away.
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            sortedSet.add(olc.new JamCountPair(key.toString(), sum));
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Iterating a synchronized collection requires holding its lock manually.
            synchronized (sortedSet) {
                // Pairs come out in the set's sorted order.
                for (JamCountPair jcp : sortedSet) {
                    context.write(new Text(jcp.jam), new IntWritable(jcp.count));
                }
            }
        }
    }
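The locking pattern used in cleanup can be tried outside Hadoop. The sketch below uses plain threads as stand-ins for concurrent writers, purely for illustration; it does not reflect how Hadoop schedules reduce calls. `SynchronizedSetDemo` and `fillConcurrently` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical demo class; not part of the original job.
final class SynchronizedSetDemo {

    // Adds the numbers 0 .. threads*perThread-1 from several threads,
    // then reads them back in the set's sorted order.
    static List<Integer> fillConcurrently(int threads, int perThread) {
        SortedSet<Integer> set = Collections.synchronizedSortedSet(new TreeSet<Integer>());
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int offset = t * perThread;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    set.add(offset + i); // single calls are synchronized by the wrapper
                }
            });
            workers.add(worker);
            worker.start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // wait until every writer has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        List<Integer> result = new ArrayList<>();
        synchronized (set) { // iteration is not covered by the wrapper, so lock manually
            for (Integer value : set) {
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(fillConcurrently(4, 5));
    }
}
```

The wrapper returned by `Collections.synchronizedSortedSet` guards individual calls such as `add`, but its documentation requires the caller to synchronize on the set while iterating, which is why the `synchronized` block is still needed.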
Another approach is to do the sorting inside the cleanup method. Each reduce call again puts its output into a synchronized collection, but this time the collection does not need to be a sorted one. The asymptotic time and space complexity do not change. For easier maintenance, this latter approach may be more suitable.
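As a rough illustration of this alternative, the sketch below collects unsorted pairs and sorts them once at the end. It runs without Hadoop: `SortInCleanupDemo` is a hypothetical class whose `reduce` and `cleanup` methods only mimic the shape of the real ones, and synchronization is left out because the sketch is single-threaded.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; it mimics the reducer's life cycle without Hadoop.
final class SortInCleanupDemo {
    private final List<Map.Entry<String, Integer>> pairs = new ArrayList<>();

    // Stand-in for reduce(): sum the likes of one jam and keep the pair, unsorted.
    void reduce(String jam, List<Integer> likes) {
        int sum = 0;
        for (int like : likes) {
            sum += like;
        }
        pairs.add(new AbstractMap.SimpleEntry<String, Integer>(jam, sum));
    }

    // Stand-in for cleanup(): sort once by decreasing count, then by name, and emit lines.
    List<String> cleanup() {
        pairs.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            lines.add(pair.getKey() + "\t" + pair.getValue());
        }
        return lines;
    }

    public static void main(String[] args) {
        SortInCleanupDemo demo = new SortInCleanupDemo();
        demo.reduce("jamA", Arrays.asList(1, 1, 1));
        demo.reduce("jamB", Arrays.asList(1, 1, 1, 1, 1));
        demo.reduce("jamC", Arrays.asList(1));
        System.out.println(demo.cleanup());
    }
}
```

With this layout the ordering rule lives in one place, the comparator inside `cleanup`, so the pair type no longer has to implement Comparable.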