Archive for January, 2016

Data and Hadoop Block Distribution

31/01/2016

As a start to my Hadoop studies, I decided to download a set of small files to play with. By small, I mean small with respect to really big ones. My choice is the This is My Jam data dump. There are 3 files: jams.tsv, followers.tsv and likes.tsv. They are of size 369, a06 and 394MB respectively.

First, I copied the largest one to the HDFS in node1:

hdfs dfs -copyFromLocal /media/sf_Downloads/This\ is\ my\ jam/likes.tsv /

Since my block size is set to 5MB, there should be more than 70 blocks. I also set the replication factor to 3. So, how can we learn the exact number of blocks and where they are?

hdfs fsck /

[Screenshot: fsck]

Here we can see that the total size of our single file is 393929184 bytes. When we divide this by 5×1024×1024, we get a little bit more than 75, so the number of blocks should be 76. That is correct. We also verify that the replication factor is 3. That’s good. The filesystem is also healthy. Let’s dig into more details about the individual blocks.
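As a quick sanity check before that, the same division can be done right in the shell; bc is just one convenient way:

echo "scale=2; 393929184 / (5*1024*1024)" | bc

This prints 75.13, i.e. 75 full blocks plus one partial block, 76 in total.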

hdfs fsck / -files -blocks

[Screenshot: fsck_files_blocks]

Each block line carries three pieces of information. The second and the third are self-explanatory, so let me dissect the first one. This item is composed of three parts, separated by underscores (_). The first part is the same for every block: it is the block pool ID, and it tells us which node generated the block. In our case, this must be node1. When we format HDFS, the namenode assigns a unique ID as the block pool ID and a different ID as the cluster ID. These IDs are the same for all nodes in that cluster. We can check that by investigating the relevant files under node1:

cat /hdfs/tmp/dfs/data/current/VERSION
cat /hdfs/tmp/dfs/name/current/VERSION

[Screenshot: master_current_VERSIONs]

The block pool ID is the same as the one we saw in the fsck output. The second part of the block information is the unique block ID. The last part is the generation stamp; whenever the block changes, it is incremented accordingly. So, with these three parts together, a block can be uniquely identified.
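For illustration, a single block entry in the fsck output has roughly this shape (all numbers here are invented):

BP-1482169848-192.168.2.110-1453924192006:blk_1073741826_1002 len=5242880 repl=3

BP-1482169848-192.168.2.110-1453924192006 is the block pool ID, 1073741826 is the block ID, 1002 is the generation stamp, and len and repl are the block length and replica count.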

We have 76 blocks and the replication factor is 3. We also have 4 active datanodes. The question is: where do these 76×3 block replicas reside?

hdfs fsck / -files -blocks -locations

[Screenshot: fsck_files_blocks_locations]

The start of each line is exactly the same as in the output of the previous fsck command. The remaining part shows us where that block can be found: a list of DatanodeInfoWithStorage entries whose length matches our replication factor. Each entry includes the IP and port of the node, the storage ID and the storage type. The storage type is mostly DISK, but it can also be SSD, ARCHIVE or RAM_DISK.
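For illustration, each entry in that list looks roughly like this (the storage UUID below is invented):

DatanodeInfoWithStorage[192.168.2.110:50010,DS-1f2a3b4c-5d6e-7f80-91a2-b3c4d5e6f708,DISK]

The node address, the storage ID (the DS-… value) and the storage type can be read off directly.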

What about the storage ID? HDFS formatting assigns each node a unique storage ID. Nodes can have multiple disks but this ID is assigned to the node, NOT to the disk of a node. We can examine this, for example, in node2:

cat /hdfs/tmp/dfs/data/current/VERSION

[Screenshot: slave_current_VERSION]

The storage ID matches with the corresponding entries in block location information.

We can also verify the same information from the web application of HDFS.

[Screenshot: Datanode_information]

All four datanodes are active and their block counts are also visible. node1 has all the blocks. This is normal but can cause performance problems when we run our tasks in the cluster. We can browse the files:

[Screenshot: Browsing_HDFS]

There is only one file. When we click on it, we get the individual block information:

[Screenshot: HDFS_BlockInfo]

These are exactly the same as the ones we gathered through the command-line fsck commands. A small detail is that the node names are presented in the Availability section, in place of IPs and ports.

In the next post, we will prepare our development environment.

Adding More RPis to the Hadoop Cluster

26/01/2016

In the previous posts, we talked about getting two RPis together as the base of our cluster. Now we will add possibly many more RPis to them.

The main component in adding the third and subsequent nodes is the second RPi we prepared. It is, in fact, the first of the slave nodes. What we will do is clone that RPi and modify a few files to present it as our third node. Then we can do the same for the fourth, fifth, etc.

Cloning an RPi is very important. We can use the clone both for adding new RPis and for taking regular backups of our cluster nodes. Therefore, the process should include cloning and restoring. One of the best sets of scripts written for this purpose can be found on this page. I used them and definitely recommend them. Please install them first.

After installing the scripts, we will backup the second node with this command:

sysmatt.rpi.backup.gtar

The gtar file, which is an exact copy of node2, will reside under a folder you have chosen. I did not change the default file names and locations, but you can change them however you like. Here is that gtar file:

[Screenshot: backup]

Here we actually see that it is a clone of node3. That is not important; node2 and node3 are identical.

After cloning, we need to restore that image to another SD card. First, we will find the assigned device name of the SD card:

dmesg

Mine was set to be /dev/sda. I moved on as explained by Matt Hoskins:

[Screenshot: begin_restore]

[Screenshot: restore_step2]

It takes time to copy every single file to the SD card, so be patient. At the very end, the SD card is ready, mounted and waiting for modifications:

[Screenshot: begin_modification]

Here comes the critical part. As can be seen, the fresh copy is mounted as /tmp/pi.sd.172381304. We will change the IP and name of the new node to 192.168.2.112 and node3, respectively.

vim /tmp/pi.sd.172381304/etc/dhcpcd.conf
vim /tmp/pi.sd.172381304/etc/hostname
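For reference, the lines to adjust mirror what we did for node2, only with the new address; the values below follow my network, so yours may differ. In dhcpcd.conf:

interface eth0
static ip_address=192.168.2.112
static routers=192.168.2.1
static domain_name_servers=192.168.2.1

The hostname file should contain just the single word node3.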

Now we are ready to unmount the drive, put it into our third RPi and power it up. If everything is OK, you can see the three RPis with IP Scanner and access the new node by its IP. But your Mac and the other nodes will not resolve node3 by name, because we need to define it inside the /etc/hosts file of every node and of the Mac. Add the following line:

192.168.2.112 node3

Up until now, we could use the user pi. From now on, we will log in with hduser on all three nodes.

node1 contains the slaves file. node3 will be a slave, so we will add it there:

vim /opt/hadoop-2.7.1/etc/hadoop/slaves
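After the edit, the slaves file lists one hostname per line. Assuming node1 also runs a datanode, as it does in my cluster, the file would look roughly like this:

node1
node2
node3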

Then clean the previous HDFS file system on all three nodes:

rm -rf /hdfs/tmp/*

and format the HDFS file system in node1:

hdfs namenode -format -clusterId CID-1ed51b7f-343a-4c6d-bcb6-5c694c7a4f84
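To double-check the result, one option is to start the services on node1 and ask HDFS for a report; all three datanodes should show up as live (hdfs dfsadmin -report is a standard command, though the exact output differs between versions):

start-dfs.sh
start-yarn.sh
hdfs dfsadmin -report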

So we successfully added the third node. These steps can be reproduced for other RPis that will be added as slaves.

An optional step is to change the replication factor. When we had only two nodes, it was set to 2 in hdfs-site.xml. After adding the third one, it can be raised to 3 or it can stay at 2. Change it however you like and your storage permits.
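If you decide to raise it, the change is the same dfs.replication property we set earlier in hdfs-site.xml, now with the new value; roughly:

  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>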

The next post will be about finding an example data and investigating the HDFS properties of the dataset files.

What can go wrong?

20/01/2016

I was happy to have many Hadoop installation guides under my belt. During my setup, I followed them step by step. There might be some glitches along the way, but the process was tried and tested, right?

Well, it seems anything can go wrong, even with the best tools. In my first attempt, the single node was perfectly fine. The system becomes open to more problems as the number of nodes increases. I hit one of those problems while trying to add the second node, and the same anomaly happened again on the fourth node. Today, I would like to share it and present the methods to solve it.

What happened? Frankly, I do not know the root cause, but when I started the dfs and yarn daemons, I could not see the datanode processes with the jps command. The daemon startup sequence does not show explicit messages. What it does is dump the events and, if any, the errors to the respective log files. Each process has its own log file. They are located under your Hadoop installation directory.

ls -al /opt/hadoop-2.7.1/logs/

[Screenshots: logs-node1, logs-node2]

As you can see, node1 has many more log files than node2, since it is the master node. I did not include the other nodes since they are almost exactly the same as node2.

The problem was about the datanode in node1. Hence, I read the contents of the file hadoop-hduser-datanode-node1.log. What I saw was this error:

[Screenshot: version_error]

The critical information here is the incompatibility between the clusterIDs in different files. Only the namenode and the datanode store the clusterIDs, and they must be the same. These two files reside under the HDFS file system folder. The master node has both of these files, while the slaves have only the datanode one, since they do not act as namenodes. Let’s see their contents:

[Screenshot: version_difference]

The corresponding files in node1 are

/hdfs/tmp/dfs/data/current/VERSION
/hdfs/tmp/dfs/name/current/VERSION

As you can see, the clusterIDs are different. To decide which one to use, I looked at the datanode VERSION file of node2. My reasoning was that, since the namenode of node1 and the datanode of node2 were operating normally, the clusterID in the datanode of node1 should be the one to change. So I simply set the datanode clusterID of node1 to the namenode clusterID of node1. With that, I was able to get the system up and running.
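Roughly, the fix on node1 boils down to reading the clusterID from the namenode’s VERSION file and writing the same value into the datanode’s one, then starting the daemons again:

grep clusterID /hdfs/tmp/dfs/name/current/VERSION
vi /hdfs/tmp/dfs/data/current/VERSION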

The plus side of this solution is that we keep the files under the HDFS file system intact; we do not delete anything, format anything, etc.

Although I solved this one with the aforementioned solution, it did not help when I added the fourth node to the system. That time, after formatting the HDFS file system, some of the datanodes did not show up. I tried to pull off the same trick by copying the clusterID of the running nodes to the problematic ones, but to no avail. Here comes our second solution.

I deleted all content under /hdfs/tmp/ for all nodes.

hduser@node1> rm -rf /hdfs/tmp/*
hduser@node2> rm -rf /hdfs/tmp/*
hduser@node3> rm -rf /hdfs/tmp/*
hduser@node4> rm -rf /hdfs/tmp/*

After all this, I reformatted the namenode, but this time provided a predefined clusterID:

hduser@node1> hdfs namenode -format -clusterId CID-1ed51b7f-343a-4c6d-bcb6-5c694c7a4f84

I just wanted to be sure that all nodes were assigned my clusterID. It worked.

The negative side of this solution is that all HDFS content is, unfortunately, wiped out.

Adding the Second Raspberry Pi

12/01/2016

The installation of Raspbian Operating System to the second RPi is exactly the same as the first one.

There are a few differences in the Hadoop installation, though. The other guides mention only the things that change, but I want mine to be as comprehensive as possible. Therefore, I copied and pasted the Hadoop installation steps of the first RPi and paid special attention to the changes by making them bold and adding critical information.

Before starting the Hadoop installation, there are a few things we need to be sure of. The Java version should be 7 or 8.

java -version

will show the installed Java runtime environment. If, for any reason, we cannot run it, or Java 6 or below is displayed, we can get the required version by

sudo apt-get install openjdk-8-jdk

After this, renaming the RPi hostname and assigning a static IP to it will be very helpful. We can create/modify the /etc/hostname file. The name node2 is a good candidate, since we will use a few more RPis.

sudo vi /etc/hostname

The raspi-config menu contains the hostname change mechanism under “Advanced” (9) and “Hostname” (A2).

We can assign a static IP by appending the following to /etc/dhcpcd.conf file.

sudo vi /etc/dhcpcd.conf
interface eth0
static ip_address=192.168.2.111
static routers=192.168.2.1
static domain_name_servers=192.168.2.1

I gave my values here, but please change yours according to your network information. Restarting the RPi will be required. If you somehow modify the /etc/network/interfaces file, your RPi will have two IPs: the static one you provided and a dynamic one. To eliminate the dynamic one, apply the aforementioned solution.

When you want to access either node1 or node2 from your Mac, or from each other, you cannot do it by name, but you can by IP. What’s happening? The problem is that our router has no clue about node1 and node2 in our local network. So each machine must map the node names to IPs itself. There are a few alternatives, but I used the simplest solution: on each of the RPis, I opened the file /etc/hosts and appended the IPs and their corresponding names.

sudo vim /etc/hosts
192.168.2.110    node1
192.168.2.111    node2

This way, every RPi will know the others both by name and by IP.

The next step is preparing the Hadoop user, group and ssh. The three commands below create the hadoop group, add hduser to that group and enable hduser to perform superuser operations.

sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
sudo adduser hduser sudo

The second command will trigger a set of prompts. First, we need to set a password for hduser; I left the other fields blank. Now it is time to log in with our new hduser. From now on, we will be working as that user.

Since ssh is the main medium of coordination, the Hadoop user must be able to ssh to the other nodes and to its own localhost without a password.

ssh-keygen -t rsa -P ""

I left the file location as is. We need to copy that file (~/.ssh/id_rsa.pub) as ~/.ssh/authorized_keys

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Let’s check if we can ssh localhost.

ssh localhost

We are asked to add localhost as a known host. That’s OK. On our second login with hduser, there shouldn’t be any questions asked; even a password is not required. That’s what we wanted to achieve.

node1 will be our master RPi. Therefore, it needs to log in to the other RPis in the cluster. For that reason, we need to add the public key of hduser@node1 to the authorized keys of the other RPis.

hduser@node1 ~ $ cat .ssh/authorized_keys

Copy the content over to

hduser@node2 ~ $ vim .ssh/authorized_keys

This last file will contain only the public key of hduser@node1.
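As a shortcut, the same result can be obtained from node1 with a single pipeline; this is simply an equivalent of the manual copy above, assuming node2 already resolves by name:

cat ~/.ssh/id_rsa.pub | ssh hduser@node2 'mkdir -p ~/.ssh && cat >> ~/.ssh/authorized_keys'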

We do not need to download the Hadoop installation files again. We already have them on node1. Get them by using

scp pi@192.168.2.110:/home/pi/hadoop* ~/

Now that we are ready, we can extract the Hadoop archive. I preferred to install it under /opt/. You can choose anywhere you like.

sudo tar -xvzf hadoop-2.7.1.tar.gz -C /opt/

The default directory will be named after the version; in my case, it became /opt/hadoop-2.7.1/. You can rename it if you like, but I am sticking with this one. The owner:group of the newly created folder should be hduser:hadoop

sudo chown -R hduser:hadoop /opt/hadoop-2.7.1/

Now we can log in with hduser again. From now on, it will be our only user to work with.

There should be a file named .bashrc in the home directory of hduser. If not, please create it. This file runs every time hduser logs in. We will define exports in it so we can use Hadoop commands without specifying the Hadoop installation location each time. Append these three exports to the .bashrc file.

export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")
export HADOOP_INSTALL=/opt/hadoop-2.7.1
export PATH=$PATH:$HADOOP_INSTALL/bin:$HADOOP_INSTALL/sbin

You do not need to logout. Simply run

. ~/.bashrc

To test the settings, we can check the Hadoop version we are installing

hadoop version

[Screenshot: node2_hadoop]

The Hadoop environment information resides in the hadoop-env.sh file. We must provide three parameters. To edit it

sudo vi /opt/hadoop-2.7.1/etc/hadoop/hadoop-env.sh

Search for the following export statements and change them accordingly.

export JAVA_HOME=$(readlink -f /usr/bin/java | sed "s:bin/java::")
export HADOOP_HEAPSIZE=900
export HADOOP_DATANODE_OPTS="-Dcom.sun.management.jmxremote $HADOOP_DATANODE_OPTS -client"

There is another set of files under the same folder, which contain parameters about the location and name of the file system (core-site.xml), the MapReduce job tracker (mapred-site.xml.template), the HDFS replication factor (hdfs-site.xml) and the YARN services connection information (yarn-site.xml).

In core-site.xml, add the following properties inside the <configuration> element:

  <property>
    <name>hadoop.tmp.dir</name>
    <value>/hdfs/tmp</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://node1:54310</value>
  </property>

This shows where the Hadoop file system operation directory resides and how to access it.

In mapred-site.xml.template, add the following property inside the <configuration> element:

  <property>
   <name>mapred.job.tracker</name>
   <value>node1:54311</value>
  </property>

Here, the host and port of the MapReduce job tracker are defined.
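One caveat: Hadoop itself reads mapred-site.xml, not the .template file, which is only a sample. If the property does not seem to take effect, copying the template first should do the trick:

cp /opt/hadoop-2.7.1/etc/hadoop/mapred-site.xml.template /opt/hadoop-2.7.1/etc/hadoop/mapred-site.xml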

In hdfs-site.xml, add the following property inside the <configuration> element:

  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>

By default, HDFS produces 3 replicas of each written block. Since we have only two nodes for the time being, we should set that parameter to 2 so as not to get unnecessary error messages.

In yarn-site.xml, add the following properties inside the <configuration> element:

 <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>node1:8031</value>
 </property>
 <property>
    <name>yarn.resourcemanager.address</name>
    <value>node1:8032</value>
 </property>
 <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>node1:8030</value>
 </property>
 <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>node1:8033</value>
 </property>
 <property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>node1:8088</value>
 </property>

Now we will create the HDFS operation directory and set its user, group and permissions.

sudo mkdir -p /hdfs/tmp
sudo chown hduser:hadoop /hdfs/tmp
sudo chmod 750 /hdfs/tmp

Up to this point, the Hadoop installation was more or less the same as the first one. There are a few steps that should be completed before formatting the Hadoop file system.

First, we are to define which machines are the slaves. These will only run the DataNode and NodeManager processes. The master machine must be able to log in to them freely; we need to be sure about this.

In node1, open the slaves file.

vim /opt/hadoop-2.7.1/etc/hadoop/slaves

Add all nodes in it. The content of that file should be like this:
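As a sketch, assuming node1 also acts as a datanode alongside the namenode, as it does in my cluster, the file lists one hostname per line:

node1
node2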

The very same file in node2 must be totally empty. The password-free login of hduser@node1 can be checked by:

su hduser
ssh node1
exit
ssh node2
exit

As the last step, we will wipe and format the Hadoop file system. Before that, clean the HDFS file system of all nodes by running

rm -rf /hdfs/tmp/*

Now, only at node1, do

hdfs namenode -format

We are ready to start the services and run our first job! The services are started from node1 by:

start-dfs.sh
start-yarn.sh

To check the running processes, run the following command in node1 and node2

jps

The processes in node1 will show

[Screenshot: jps]

while the processes in node2 will be

[Screenshot: jps_node2]

Our next topic will be about a problem that we can encounter while setting up the Hadoop cluster.

Reading and Writing DB2 CLOB with Java

02/01/2016

A few months ago, I was asked to store and retrieve a text file that can reach sizes of 40-50 kilobytes. I was to use our DB2 database for those operations. The VARCHAR type is limited to 32 KB; because of that limitation, I needed to use the Character Large Object (CLOB) type to store the file. The table creation process becomes this:

CREATE TABLESPACE <TABLE_SPACE>
 IN <DATABASE_NAME>
 USING STOGROUP <STORAGE_GROUP>;
 
CREATE LOB TABLESPACE <LOB_TABLE_SPACE>
 IN <DATABASE_NAME>
 USING STOGROUP <STORAGE_GROUP>;
 
CREATE TABLE <TABLE_NAME> (
 "ID" DECIMAL(19 , 0) NOT NULL GENERATED BY DEFAULT AS IDENTITY (
 NO MINVALUE 
 NO MAXVALUE 
 NO CYCLE
 CACHE 20
 NO ORDER ), 
 "DATA" CLOB(1048576) FOR SBCS DATA WITH DEFAULT NULL, 
 PRIMARY KEY("ID")
 )
 IN <DATABASE_NAME>.<TABLE_SPACE>
 AUDIT NONE
 DATA CAPTURE NONE 
 CCSID EBCDIC;
 
CREATE AUXILIARY TABLE <LOB_TABLE_NAME>
 IN <DATABASE_NAME>.<LOB_TABLE_SPACE>
 STORES <TABLE_NAME>
 COLUMN "DATA";

I put a 1MB limit but it is artificial. You can change that number however you see fit.

I was thinking of developing the simple application with Visual Basic but hit technical problems. I was supposed to use the getChunk() and appendChunk() methods, but the CLOB field in DB2 did not seem to be chunkable. It was weird; at least I could not figure out what the underlying problem was. Therefore, I decided to code it in Java.

My first task was to connect to DB2. To accomplish that, I imported the DB2 JDBC drivers from db2jcc4.jar. You can grab yours from here. So my code to connect to DB2 looks like this:

import com.ibm.db2.jcc.*;
import java.sql.*;

// "connection" is a static field of the enclosing class
private static Connection connection;

private static boolean connectToDb() {
  try {
    Class.forName("com.ibm.db2.jcc.DB2Driver");
    String url = "jdbc:db2://your.db2.host:port/node:";
    url += "user=user;password=password;";
    connection = DriverManager.getConnection(url);
    return true;
  } catch (ClassNotFoundException e) {
    e.printStackTrace();
  } catch (SQLException e) {
    e.printStackTrace();
  }
  return false;
}

The db2jcc4.jar is enough to code against and satisfies the compile-time needs, but it is not enough to run on a client machine. That machine must also have the necessary license jar files. They can be obtained from your database administrators; in my case, they are named db2jcc_javax.jar and db2jcc_license_cisuz.jar.

The second task is to read the text file and convert it to a CLOB. One important thing here is your character set. My file was in Turkish, so I used “Cp1254”. This forced me to use the InputStreamReader class. My code is as follows:

private static String readFile() throws FileNotFoundException, IOException {
  // fileName is a static field holding the path of the input text file
  FileInputStream fis = new FileInputStream(fileName);
  InputStreamReader isr = new InputStreamReader(fis,
      Charset.forName("Cp1254"));
  BufferedReader br = new BufferedReader(isr);
  String nextLine = "";
  StringBuffer sb = new StringBuffer();
  while ((nextLine = br.readLine()) != null) {
    sb.append(nextLine);
    sb.append(System.getProperty("line.separator"));
  }
  br.close();
  String clobData = sb.toString();
  return clobData;
}

I simply read the text file line by line and generate a String holding the whole file. How does it become a CLOB, then? This code shows creating the CLOB and inserting it into DB2.

private static boolean insertTextFileIntoClob() {
  try {
    Clob clob = connection.createClob();
    String clobData = readFile();
    clob.setString(1, clobData);
    PreparedStatement pstmt = null;
    StringBuilder sb = new StringBuilder();
    sb.append("INSERT INTO ");
    sb.append(TABLE_NAME);
    sb.append(" (DATA) ");
    sb.append(" VALUES(?) ");
    pstmt = connection.prepareStatement(sb.toString());
    pstmt.setClob(1, clob);
    pstmt.executeUpdate();
    return true;
  } catch (Exception e) {
    e.printStackTrace();
  }
  return false;
}

I used a PreparedStatement so that the Clob can be bound as a parameter; a plain Statement would only work if the text were embedded directly in the SQL string.

When DB2 inserts a CLOB, it uses both the main table and the auxiliary table. To retrieve the actual text data, we select the CLOB data and convert it back to text.

private static ResultSet selectClob() {
  try {
    StringBuilder sb = new StringBuilder();
    sb.append(" select data from ");
    sb.append(TABLE_NAME);
    Statement stmt = connection.createStatement();
    ResultSet rs = stmt.executeQuery(sb.toString());
    return rs;
  } catch (Exception e) {
    e.printStackTrace();
    return null;
  }
}

private static boolean clobToFile(ResultSet rs) {
  try {
    if (!rs.next())
      return false;
    Writer fw = new BufferedWriter(new OutputStreamWriter(
      new FileOutputStream(fileName), "Cp1254"));
    Clob clob = rs.getClob("DATA");
    Reader reader = clob.getCharacterStream();
    BufferedReader br = new BufferedReader(reader);
    String line;
    while (null != (line = br.readLine())) {
      StringBuilder sb = new StringBuilder();
      sb.append(line);
      sb.append(System.getProperty("line.separator"));
      fw.write(sb.toString());
    }
    br.close();
    fw.close();
    return true;
  } catch (SQLException e) {
    e.printStackTrace();
  } catch (IOException e) {
    e.printStackTrace();
  }
  return false;
}

I again generate the file by appending all the lines one after another. The character set is also important here.

Now it is time to compile and deploy it to the user’s machine. My target machine uses Java 1.6. At first, I compiled with 1.7 and got this error while running. It was also a 32-bit machine, so I decided to compile with a 32-bit 1.6 JDK. Most probably this is overkill, since I should only need to do

javac -source 1.6 -target 1.6 Code.java

but I wanted to be on the safe side. After creating the directory structure according to my package name, I compiled and ran my code with the following commands:

[Screenshot: directory_structure]
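For illustration, with a hypothetical package name com.example, the compile and run steps would look roughly like this (the package and class names are placeholders):

javac -cp db2jcc4.jar com/example/Code.java
java -cp .:db2jcc4.jar:db2jcc_javax.jar:db2jcc_license_cisuz.jar com.example.Code <options>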

The .class file(s) are ready, and I am not required to generate .exe files, so an executable jar is the way to go. Running this in the terminal creates the jar:

jar -cf code.jar Code.class

The critical point is that, to run properly, I need the jar files db2jcc4.jar, db2jcc_javax.jar and db2jcc_license_cisuz.jar in addition to my own code. In Java, this is a little bit problematic. The solution I employed was to put those three files on the target machine next to my jar. Furthermore, I added the line below inside the META-INF/MANIFEST.MF file of the executable jar:

Class-Path: db2jcc4.jar db2jcc_javax.jar db2jcc_license_cisuz.jar

This enables the application to use the classes in all those jar files. The manifest file contains both the class path and the main class, which hosts the main(String[] args) method. We can define it by:

Main-Class: <package-name>.<class-name>
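For illustration, a complete manifest for this setup would look roughly like the following; it can also be bundled while creating the jar with the m flag of the jar tool (the com.example package and the manifest.txt file name are placeholders):

Main-Class: com.example.Code
Class-Path: db2jcc4.jar db2jcc_javax.jar db2jcc_license_cisuz.jar

jar -cfm code.jar manifest.txt Code.class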

After all those efforts, we are ready to run the application. In the terminal we run the following command and it is done.

java -jar code.jar <options>

Hope my experience can help you.