Database Access with Apache Hadoop

The DBInputFormat and DBOutputFormat components, introduced in Hadoop 0.19, allow easy import and export of data between Hadoop and many relational databases, making it straightforward to incorporate relational data into your data processing pipeline.

To move data between Hadoop and MySQL, you need working Hadoop and MySQL installations on your machine.

  • My System Configuration

Ubuntu 13.04
Java 1.7.0_25
Hadoop 1.1.2
MySQL

Download the mysql-connector-java-5.0.5.jar file, copy it into $HADOOP_HOME/lib, and restart the Hadoop daemons so the connector is on the classpath.
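
For example, assuming the connector jar was downloaded to the current directory and $HADOOP_HOME points at a Hadoop 1.x installation:

$ cp mysql-connector-java-5.0.5.jar $HADOOP_HOME/lib/
$ stop-all.sh && start-all.sh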

  • Database and table creation in MySQL
mysql> create database testDb;

mysql> use testDb;

mysql> create table studentinfo (id integer, name varchar(32));

mysql> insert into studentinfo values(1,'archana');

mysql> insert into studentinfo values(2,'XYZ');

mysql> insert into studentinfo values(3,'archana');
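
The job configured in Main.java below writes its results to a table named output. DBOutputFormat only inserts rows; it does not create the table, so create it up front with a schema matching the columns the job writes:

mysql> create table output (name varchar(32), count integer);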

  • Project Structure

The project consists of the following Java files.

Main.java
Map.java
Reduce.java
DBInputWritable.java
DBOutputWritable.java

To access data in the DB we have to create classes that define the records we fetch from, and write back to, the DB. In my project I created two classes, DBInputWritable.java and DBOutputWritable.java, to accomplish this.

DBInputWritable.java

package example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBInputWritable implements Writable, DBWritable
{
   private int id;
   private String name;

   // Hadoop's Writable serialization is not used on this path: DBInputFormat
   // fills the object through the JDBC methods below, so this can stay empty.
   public void readFields(DataInput in) throws IOException {   }

   // The ResultSet holds one row returned by the SQL query.
   public void readFields(ResultSet rs) throws SQLException
   {
     id = rs.getInt(1);
     name = rs.getString(2);
   }

   // Unused for input; present only to satisfy the Writable contract.
   public void write(DataOutput out) throws IOException {  }

   public void write(PreparedStatement ps) throws SQLException
   {
     ps.setInt(1, id);
     ps.setString(2, name);
   }

   public int getId()
   {
     return id;
   }

   public String getName()
   {
     return name;
   }
}

This class “DBInputWritable” will be used in our Map class. Now let’s write our Mapper class.

Map.java

package example;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class Map extends Mapper<LongWritable, DBInputWritable, Text, IntWritable>
{
   private final IntWritable one = new IntWritable(1);

   // key: the record number assigned by DBInputFormat; value: one studentinfo row
   protected void map(LongWritable id, DBInputWritable value, Context ctx)
   {
     try
     {
        String[] keys = value.getName().split(" ");

        for(String key : keys)
        {
           ctx.write(new Text(key),one);
        }
     } catch(IOException e)
     {
        e.printStackTrace();
     } catch(InterruptedException e)
     {
        e.printStackTrace();
     }
   }
}

DBOutputWritable.java

package example;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBOutputWritable implements Writable, DBWritable
{
   private String name;
   private int count;

   public DBOutputWritable(String name, int count)
   {
     this.name = name;
     this.count = count;
   }

   // Of the four serialization methods, DBOutputFormat only ever calls
   // write(PreparedStatement) below; the other three are effectively unused
   // and exist to satisfy the Writable/DBWritable contracts.
   public void readFields(DataInput in) throws IOException {   }

   public void readFields(ResultSet rs) throws SQLException
   {
     name = rs.getString(1);
     count = rs.getInt(2);
   }

   public void write(DataOutput out) throws IOException {    }

   public void write(PreparedStatement ps) throws SQLException
   {
     ps.setString(1, name);
     ps.setInt(2, count);
   }
}

This class “DBOutputWritable” will be used in our Reduce class. Now let’s write our Reducer class.

Reduce.java

package example;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class Reduce extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable>
{
   protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
   {
     int sum = 0;

     for(IntWritable value : values)
     {
       sum += value.get();
     }

     try
     {
        ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
     } catch(IOException e)
     {
       e.printStackTrace();
     } catch(InterruptedException e)
     {
       e.printStackTrace();
     }
   }
}

Main.java

package example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class Main
{
   public static void main(String[] args) throws Exception
   {
     Configuration conf = new Configuration();
      DBConfiguration.configureDB(conf,
        "com.mysql.jdbc.Driver",              // driver class
        "jdbc:mysql://localhost:3306/testDb", // db url
        "root",                               // user name
        "hadoop123");                         // password

     Job job = new Job(conf);
     job.setJarByClass(Main.class);
     job.setMapperClass(Map.class);
     job.setReducerClass(Reduce.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(IntWritable.class);
     job.setOutputKeyClass(DBOutputWritable.class);
     job.setOutputValueClass(NullWritable.class);
     job.setInputFormatClass(DBInputFormat.class);
     job.setOutputFormatClass(DBOutputFormat.class);

      DBInputFormat.setInput(
        job,
        DBInputWritable.class,
        "studentinfo",                  // input table name
        null,                           // conditions (WHERE clause), none here
        null,                           // order-by column, none here
        new String[] { "id", "name" }   // table columns
      );

      DBOutputFormat.setOutput(
        job,
        "output",                          // output table name (must already exist in testDb)
        new String[] { "name", "count" }   // table columns
      );

     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}
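
For reference, DBInputFormat also provides a query-based overload of setInput that takes an explicit SQL query plus a count query (used to compute input splits), which is handy when the input is a join or a filtered subset rather than a whole table. The following sketch is a drop-in replacement for the setInput call above; the queries are illustrative:

      DBInputFormat.setInput(
        job,
        DBInputWritable.class,
        "select id, name from studentinfo order by id",  // input query
        "select count(*) from studentinfo"               // count query
      );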

Now you are ready to run the program. Package the compiled classes into a jar file and run it, as sketched below.
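
For example, compiling against the Hadoop 1.1.2 core jar and packaging from the project root might look like this (paths are assumptions; adjust to your layout):

$ mkdir classes
$ javac -classpath $HADOOP_HOME/hadoop-core-1.1.2.jar -d classes example/*.java
$ jar -cvf DbIpOp.jar -C classes .

Because this jar has no Main-Class manifest entry, pass the fully qualified class name as the first argument when running it.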

Execute the jar file

$ hadoop jar /home/hduser/DbIpOp.jar example.Main

Result

mysql> select * from output;

+---------+-------+
| name    | count |
+---------+-------+
| archana |     2 |
| XYZ     |     1 |
+---------+-------+
2 rows in set (0.19 sec)

Reference

http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/


