Archive for February, 2016

A Digital Ocean Adventure

24/02/2016

Whenever I use a computer running an operating system other than a Linux variant, I almost always install a virtual Ubuntu system on it. On Windows, my tool of choice is VirtualBox. In the early days of my Mac usage, I paid for Parallels and VMware Fusion, but in the end I settled on VirtualBox.

Virtual systems have many benefits. You can share resources with your host operating system without crippling or accidentally damaging it. You have the freedom to play with different configurations, tools and deployments. You can take snapshots and return to them if something goes wrong. The whole virtual disk is a folder that you can easily transfer from machine to machine.

On the downside, a virtual machine is limited to your host's resources, and RAM is the most critical one. Although Linux is frugal with memory, you still have to spare a few GB of RAM. By today's standards, I consider 8GB the minimum for running two operating systems at the same time.

While experimenting with my RPi cluster and Hadoop, I developed the code on a virtual Ubuntu 14.04 LTS. But for a long time, I had been thinking about setting up a dedicated system that I could connect to remotely over ssh and moving my development work there. That way, I would even have the option of using an iPhone or an iPad, since there are many ssh apps on the App Store.

When I read this post by Marco Arment and listened to these two episodes of his Under the Radar podcast, I decided to give it a try and dive into DigitalOcean. I created my very first Droplet.

[Screenshot: the DigitalOcean Droplets page showing my first droplet, ubuntu-1gb-ams3-01]

We can control most of the droplets’ settings with an iOS app. I use DigitalOcean Manager by Philip Heinser.

I opened up my Hadoop installation post and installed Java 7, then Hadoop 2.7.2. Nothing changes from the 2.7.1 installation other than the name of the machine: node1 is replaced with ubuntu-1gb-ams3-01, as can be seen above.

After the installation, I ran the WordCount test on smallfile.txt and mediumfile.txt. They took 10 and 30 seconds, respectively. That is better than my virtual Ubuntu! I am really impressed.

As far as I know, we pay for a droplet even when it is powered off, because DigitalOcean keeps our data, CPU and IP reserved for us. There is an option to avoid that charge, but I do not recommend it. Here is how, and here is why.

Either with the iOS app or on the DigitalOcean web site, we first power the droplet off; that is mandatory for the second step. Then we take a snapshot of the droplet. After that, we destroy the droplet. This does NOT destroy the snapshot. That's the critical part. We can create a new droplet from that snapshot whenever we want, and until that moment DigitalOcean no longer charges for the droplet.

So why do I not recommend this approach? When I tried it, the root password changed. That is not a problem for my Hadoop experiments. But the IP address also changed, and that is a problem: we cannot start hdfs without making the necessary changes. The commands I executed are as follows:

# start with a clean authorized_keys file
rm ~/.ssh/authorized_keys
# generate a new passwordless RSA key pair
ssh-keygen -t rsa -P ""
# authorize the new public key for passwordless ssh
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
# drop the stale host keys recorded for localhost and 0.0.0.0
ssh-keygen -f "/home/hduser/.ssh/known_hosts" -R localhost
ssh-keygen -f "/home/hduser/.ssh/known_hosts" -R 0.0.0.0

These seem a small cost to pay, but think of it when there are many more data nodes in the cluster: each time we recreate a droplet, we need to update the IP-to-name mapping files.

Other than that, the only drawback I encountered while using a droplet was opening large files, which seems to be related to the RAM size. Let's face it, 1GB is not enough to open large files with vim.

I am planning to create another droplet and run Hadoop experiments on the pair.

My First Hadoop Application

04/02/2016

This is My Jam has a simple dataset that suits our purposes. My very first Hadoop code will try to extract the number of likes each jam gets. We will not order the results for now.

Every Hadoop application I develop follows a similar pattern. First, I write the code and its unit tests simultaneously, compile them, and run the unit tests immediately. In the second step, I write a driver class. Its purpose is to run the application locally, without worrying about scale; it lets me simulate my Hadoop application as if it ran on a cluster of one node that runs both the mapper and the reducer. As the last step, I deploy the application to the RPi Hadoop cluster and analyse its performance characteristics. I will try to stick to this flow as much as possible.

Each record of the likes.tsv file consists of a userID and the jamID that user liked, separated by a tab character. Each record is unique. Our output will be a list of jamIDs and how many different users liked each one. We won't impose an explicit ordering on the output, so it will simply come out sorted by jamID, which is the framework's default key ordering.
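
For illustration, here is one such record (the same one used in the unit tests later in this post), with a tab character separating the userID from the jamID:

c1066039fa61eede113878259c1222d1	5d2bc46196d7903a5580f0dbedc09610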

Implementing a mapreduce application is divided into two related, consecutive steps. The first is mapping the input to an intermediate state. This intermediate form is not arbitrary; it must comply with the input structure of the reducer, which is the second step. So we will be talking about a mapper class and a reducer class. Here is the mapper:

  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Each record is "userID<TAB>jamID"; emit (jamID, 1).
      String[] userAndJam = value.toString().split("\t");
      context.write(new Text(userAndJam[1]), one);
    }
  }

We split a record into two parts and use only the jamID. In a sense, the userID is still used, not directly, but through the fact that each user's like adds one to the counter of that jamID.

So how is this code used? The whole input file is divided into roughly equal-sized parts, and each part is fed to a datanode. The datanode runs this mapper and produces an output. In our case, the output is a set of key-value pairs, where the key is the jamID and the value is an integer, in fact simply a 1.

As I said before, the mapreduce framework assigns each datanode a part of the input data. When a datanode finishes its map task, the framework collects its output and merges it with the outputs of the others. Since every pair has a key, the framework simply gathers the values that share the same key into a list. In our case, node1 may generate the following map output:

jam1   1
jam2   1
jam1   1
jam3   1
jam1   1

node2 may generate an entirely different map output, such as:

jam2    1
jam2    1
jam1    1
jam4    1

The intermediate output of all the map tasks, which will be the input for the reducer, will be:

jam1  [1, 1, 1, 1]
jam2  [1, 1, 1]
jam3  [1]
jam4  [1]

Note that the intermediate output is sorted by key. The same ordering is preserved in the output of the reducer, unless an explicit sorting mechanism is implemented in the reducer.

However, the order of the values within each list is NOT preserved and is not even deterministic; I emphasised this by interleaving the values coming from node1 and node2 in the lists above.

With that input, what the reducer has to do becomes trivial: it simply adds up the 1s in the list for each jamID. The result will be:

jam1   4
jam2   3
jam3   1
jam4   1

Note that the order of the output is the same as the order in the intermediate step. Here is the code for the reducer:

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Sum up the 1s emitted for this jamID and write (jamID, total).
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

To bring all of this together and encapsulate it as a class, I modified the tutorial code and added it to my github repository.
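
The essence of that class is the job setup in its main() method. Here is a minimal sketch, modelled roughly on the WordCount tutorial I started from; the class name LikedJamCount is only a placeholder of mine, the exact code is in the repository:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class LikedJamCount {

    // TokenizerMapper and IntSumReducer from above live here
    // as static nested classes.

    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf, "liked jam count");
      job.setJarByClass(LikedJamCount.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. likes.tsv
      FileOutputFormat.setOutputPath(job, new Path(args[1])); // output folder
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }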

I preferred to use MRUnit for the unit tests. For any mapreduce class, we can write three unit tests (the driver setup they rely on is sketched right after this list):

  1. A unit test for the mapper: this tests the mapper only.
  2. A unit test for the reducer: this tests the reducer only.
  3. A unit test for the mapper-reducer pair: this is the complete test from start to end.
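
The tests below refer to three MRUnit drivers: mapDriver, reduceDriver and mapReduceDriver. They are presumably fields of the test class, initialised in a @Before method; a minimal sketch of that setup, assuming the mapper and reducer classes shown above, looks like this:

  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mrunit.mapreduce.MapDriver;
  import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
  import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
  import org.junit.Before;

  // ...inside the test class (LikedJamCountMapperReducerTest):

  MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
  ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
  MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

  @Before
  public void setUp() {
    TokenizerMapper mapper = new TokenizerMapper();
    IntSumReducer reducer = new IntSumReducer();
    mapDriver = MapDriver.newMapDriver(mapper);
    reduceDriver = ReduceDriver.newReduceDriver(reducer);
    mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
  }

With those drivers in place, the mapper and reducer tests look like this:
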
  @Test
  public void testMapper() {
    mapDriver.withInput(new LongWritable(), new Text(
          "c1066039fa61eede113878259c1222d1\t5d2bc46196d7903a5580f0dbedc09610"));
    mapDriver.withOutput(new Text("5d2bc46196d7903a5580f0dbedc09610"), new IntWritable(1));
    try {
      mapDriver.runTest();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Test
  public void testReducer() {
    List<IntWritable> values = new ArrayList<IntWritable>();
    values.add(new IntWritable(1));
    values.add(new IntWritable(1));
    reduceDriver.withInput(new Text("5d2bc46196d7903a5580f0dbedc09610"), values);
    reduceDriver.withOutput(new Text("5d2bc46196d7903a5580f0dbedc09610"), new IntWritable(2));
    try {
      reduceDriver.runTest();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

I only tested one record for mapper and reducer. For the full test, I decided to work on a bigger (3!) data set:

  @Test
  public void testMapReduce() {
    List<Pair<LongWritable, Text>> inputLines = new ArrayList<Pair<LongWritable, Text>>(3);
    Pair<LongWritable, Text> i1 = new Pair<LongWritable, Text>(new LongWritable(1), new Text(
          "c1066039fa61eede113878259c1222d1\t5d2bc46196d7903a5580f0dbedc09610"));
    // Note: i2's userID and i3's jamID differ from i1's by a single
    // character (a lowercase 'l' in place of the digit '1'), so they count
    // as a second like for the first jam and one like for a second jam.
    Pair<LongWritable, Text> i2 = new Pair<LongWritable, Text>(new LongWritable(2), new Text(
          "c1066039fa61eede113878259c1222dl\t5d2bc46196d7903a5580f0dbedc09610"));
    Pair<LongWritable, Text> i3 = new Pair<LongWritable, Text>(new LongWritable(3), new Text(
          "c1066039fa61eede113878259c1222d1\t5d2bc46l96d7903a5580f0dbedc09610"));
    inputLines.add(i1);
    inputLines.add(i2);
    inputLines.add(i3);
    mapReduceDriver.withAll(inputLines);
    List<Pair<Text, IntWritable>> outputLines = new ArrayList<Pair<Text, IntWritable>>(2);
    Pair<Text, IntWritable> o1 = new Pair<Text, IntWritable>(new Text("5d2bc46196d7903a5580f0dbedc09610"), new IntWritable(2));
    Pair<Text, IntWritable> o2 = new Pair<Text, IntWritable>(new Text("5d2bc46l96d7903a5580f0dbedc09610"), new IntWritable(1));
    outputLines.add(o1);
    outputLines.add(o2);
    mapReduceDriver.withAllOutput(outputLines);
    try {
      mapReduceDriver.runTest();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

Although the tests differ and concentrate on different aspects, the idea is the same: add the input data representing the records, add the expected output, and call runTest().

For the sake of completeness, I also uploaded the unit tests to my github repository.

After running the unit tests, we can either run the application on the HDFS cluster (which I do NOT recommend at this stage) or run it with a driver class. The importance of the driver class is that it lets us observe the application's behaviour on the local file system and gives us the opportunity to remove the last splinters before scalability comes into the picture. The driver class I have written can again be found in the repository.
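
Since the driver is invoked with a -conf argument below, it presumably follows Hadoop's Tool/ToolRunner pattern so that generic options such as -conf are parsed for it. The actual class is in the repository; a minimal sketch of that pattern, with the job wiring mirroring the main class shown earlier, could look like this:

  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public class LikedJamCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
      // getConf() already contains the properties from any -conf file
      // parsed by ToolRunner, e.g. mapred-local.xml below.
      Job job = Job.getInstance(getConf(), "liked jam count (local)");
      job.setJarByClass(LikedJamCountDriver.class);
      job.setMapperClass(TokenizerMapper.class);
      job.setReducerClass(IntSumReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
      System.exit(ToolRunner.run(new LikedJamCountDriver(), args));
    }
  }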

Before running the driver, however, we need to create a new Hadoop configuration. This is essential because the driver class will not have access to HDFS or the trackers; instead, it will use the local file system and the local mapred job tracker. We create the configuration file mapred-local.xml under /usr/local/hadoop/etc/hadoop/ by copying it from the mapred-site.xml file:

cp /usr/local/hadoop/etc/hadoop/mapred-site.xml /usr/local/hadoop/etc/hadoop/mapred-local.xml

Then, modify its content by adding the two properties below between the configuration tags:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>file:///</value>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>local</value>
  </property>
</configuration>

We have written the code and created the necessary configuration. How do we compile it all? I moved all the .java source files under the Hadoop installation folder, /usr/local/hadoop/. I also created a folder, /usr/local/hadoop/my_jam_analysis/, to preserve the package structure and to store the compiled .class files. Hence, the following command is enough to compile our Java code:

javac -cp ${HADOOP_JAVA_PATH} -d /usr/local/hadoop/my_jam_analysis/ /usr/local/hadoop/*.java

When the .class files are ready, we can run the unit tests and the driver class separately. Testing is performed by:

java -cp ${HADOOP_JAVA_PATH} org.junit.runner.JUnitCore tr.name.sualp.merter.hadoop.myjam.LikedJamCountMapperReducerTest

Running our application on the local machine without HDFS is accomplished by:

java -cp ${HADOOP_JAVA_PATH} tr.name.sualp.merter.hadoop.myjam.LikedJamCountDriver -conf /usr/local/hadoop/etc/hadoop/mapred-local.xml /media/sf_Downloads/This\ is\ My\ Jam\ Dataset/likes.tsv output

Well, there is a lot to digest. I am planning to write about taking the application to the RPi cluster and examining its performance metrics with the nmon analyser.

Hadoop Development Environment Setup

01/02/2016

Before writing our first code, we need to prepare our development environment. We will use Java as our programming language and will also write unit tests. To stay as simple and generic as possible, I preferred vim as my editor and the command line as my build tool. Since we are using Java without depending on any IDE, we have to square away our own classpath. That is the critical step I will explain here.

To compile our Java Hadoop code, we add the HADOOP_CLASSPATH environment variable to our .bashrc file:

export HADOOP_CLASSPATH=$($HADOOP_INSTALL/bin/hadoop classpath)

I develop the Hadoop applications on a different machine than the RPis. We do not have to, but I recommend this approach. It has a few pros and almost no cons.

  1. Separating the development environment from production is necessary. We can break anything at any time in development; it should not interfere with the RPi cluster.
  2. Since the development environment is fragile, I definitely encourage you to use a virtual Linux machine. My setup is Ubuntu 14.04 LTS running on a Mac host with VirtualBox. You can back it up and restore it easily.
  3. I installed a single-node Hadoop on the virtual Linux machine. My HADOOP_INSTALL variable points to /usr/local/hadoop, and my Java files also reside there.

The jar files inside HADOOP_CLASSPATH are enough to compile the Java code. I preferred to create a folder named /usr/local/hadoop/my_jam_analysis and put all my compiled .class files under it.

javac -classpath ${HADOOP_CLASSPATH} -d /usr/local/hadoop/my_jam_analysis \
/usr/local/hadoop/*.java

Although this very simple setup is enough to compile Hadoop code written in Java, it is far from sufficient to compile the Hadoop unit tests.

Apache MRUnit is designed for writing mapreduce unit tests. You can download the source code and binaries from here. I downloaded the zip file containing the binaries, extracted it, and then copied its contents to my virtual Linux machine.

mkdir /home/hduser/dev

mkdir /home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin

cp -a /media/sf_Downloads/apache-mrunit-1.1.0-hadoop2-bin/.  \
/home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin/

cd /home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin/

To generate the complete mrunit jar file, we must have Maven installed. With Maven, we just run:

mvn package -Dhadoop.version=2

[Screenshot: the Maven error output]

Despite the error, the mrunit-1.1.0-hadoop2.jar file is generated under /home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin/target/.

This is one of the many jars that must be added to our classpath. The others are:
  • Hadoop classpath
  • lib jars for mrunit
  • Our .class files

In an ideal world, we could simply join them with colons (:) and our classpath would be ready. However, the problem I encountered here is that some jars in different directories contain the same .class files. For example, I found MockSettingsImpl.class both in a jar on the Hadoop classpath and in another jar under MRUnit's lib folder. Java then, somewhat misleadingly, states that it cannot find that class; in fact, it cannot identify it UNIQUELY. What I did is mimic the exclusion syntax of dependency resolution tools. It is not elegant in any way, but it does its job pretty well. Here is the environment variable setting in .bashrc:

export HADOOP_JAVA_PATH=$(echo $HADOOP_CLASSPATH | \
sed 's@\/usr\/local\/hadoop\/share\/hadoop\/common\/lib\/\*@'"$(readlink -f /usr/local/hadoop/share/hadoop/common/lib/* \
| grep -v mockito | sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/:/g')"'@g'):/home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin/lib/*:/home/hduser/dev/apache-mrunit-1.1.0-hadoop2-bin/target/mrunit-1.1.0-hadoop2.jar:/usr/local/hadoop/my_jam_analysis/

Let me divide it into meaningful chunks and explain each of them briefly.

readlink -f /usr/local/hadoop/share/hadoop/common/lib/*

This lists all the files under the given folder with their full paths.

readlink -f /usr/local/hadoop/share/hadoop/common/lib/* | grep -v mockito

We then remove the ones containing mockito in their names. This part is crucial; the exclusion actually happens here.

readlink -f /usr/local/hadoop/share/hadoop/common/lib/* | grep -v mockito \
| sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/:/g'

After removing the conflicting jar, we concatenate the paths of the remaining jars under that folder, putting a colon (:) between each of them.

$(echo $HADOOP_CLASSPATH | sed \
's@\/usr\/local\/hadoop\/share\/hadoop\/common\/lib\/\*@'"$(readlink -f /usr/local/hadoop/share/hadoop/common/lib/* \
| grep -v mockito | sed -e ':a' -e 'N' -e '$!ba' -e 's/\n/:/g')"'@g')

Here we replace the wildcarded common/lib folder inside HADOOP_CLASSPATH with the list of individual jars we constructed just before. As the last step, we append the remaining locations: MRUnit's lib jars, the generated mrunit jar and our my_jam_analysis class folder.

With HADOOP_JAVA_PATH alone, we are able to compile and run our unit tests and Hadoop code. In the next post, I will show how.