Wednesday, July 14, 2010

A Guided Tour of the Hadoop Zoo: Querying the Data

In a previous post, we saw how to get our data into Hadoop.  The next step is to extract useful information from it.  Originally, this required programmers to write multiple MapReduce programs in Java and to carefully and meticulously orchestrate them so that each job ran only after the jobs producing its input had completed.  If you care about runtime speed and processing/memory efficiency, this is still the way to go.  However, if you are more concerned with how quickly you can write queries and want to avoid mundane boilerplate Java code, the projects described below can help.
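
To see what that hand-written orchestration looks like, here is a minimal sketch using the old org.apache.hadoop.mapred API.  It is only an illustration, not any particular project's code: the class name and paths are placeholders, and the mapper/reducer classes are omitted, so Hadoop falls back to its identity implementations.  The key point is that the second job can be submitted only after the first one has finished writing its output.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class TwoStepPipeline {
  public static void main( String[] args ) throws Exception {
    Path input = new Path( "/logs/raw" );       // placeholder paths
    Path intermediate = new Path( "/logs/step1" );
    Path output = new Path( "/logs/final" );

    // job 1: read the raw input, write an intermediate data set
    JobConf step1 = new JobConf( TwoStepPipeline.class );
    step1.setJobName( "step1" );
    FileInputFormat.setInputPaths( step1, input );
    FileOutputFormat.setOutputPath( step1, intermediate );
    JobClient.runJob( step1 );  // blocks until job 1 completes

    // job 2: may start only now, because it consumes job 1's output
    JobConf step2 = new JobConf( TwoStepPipeline.class );
    step2.setJobName( "step2" );
    FileInputFormat.setInputPaths( step2, intermediate );
    FileOutputFormat.setOutputPath( step2, output );
    JobClient.runJob( step2 );
  }
}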

Let's start with Cascading.  Cascading is a Java library/API that lets us easily assemble the data processing steps required to extract information from our data.  The following example, taken from the very detailed Cascading User Guide, demonstrates how we can use Cascading to read each line of text from a file, parse it into words, and then count the number of times each word is encountered.

// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();

The data is processed as it flows through the pipes defined with the Cascading API.  Cascading converts the pipe assembly into a collection of MapReduce jobs and takes care of orchestrating them so that each job is launched only after its dependencies are satisfied -- that is, after all jobs producing its input have completed.  If an error occurs in the processing pipeline, Cascading triggers a notification callback and can continue processing after copying the offending data to a special trap file.  If we are not satisfied with the data processing primitives offered by Cascading, we can write our own Java MapReduce jobs and incorporate them into the Cascading data flow.
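
As an illustration of trapping, here is a sketch of how the word-count example above might route bad records to a trap instead of failing the whole Flow.  It assumes a Cascading 1.x FlowConnector.connect() overload that accepts a map of trap Taps keyed by pipe name; trapPath is a placeholder, and the exact overload may differ in your Cascading version.

// sketch only: assumes a connect( name, sources, sinks, traps, pipes ) overload
Tap trap = new Hfs( new TextLine( new Fields( "line" ) ), trapPath );

Map<String, Tap> sources = new HashMap<String, Tap>();
sources.put( "wordcount", source );
Map<String, Tap> sinks = new HashMap<String, Tap>();
sinks.put( "wordcount", sink );
Map<String, Tap> traps = new HashMap<String, Tap>();
traps.put( "wordcount", trap );

// replaces the flowConnector.connect() call in the example above;
// tuples that fail in the "wordcount" branch are written to the trap Tap
Flow flow = flowConnector.connect( "word-count", sources, sinks, traps, assembly );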

With Cascading, we don't have to write any Java code that implements MapReduce jobs directly against the Hadoop API, and there is hardly any boilerplate code.  However, we do have to write Java code to assemble the Cascading pipeline.  Can we do this without writing Java code?

Yes, we can... with Pig.  In Pig, the data processing pipeline is written in Pig Latin.  Oundssay uchmay implersay anthay Avajay, ightray?  Maybe we should have just stuck with Java!?  No, we are not talking about http://en.wikipedia.org/wiki/Pig_Latin; we are talking about Hadoop's Pig Latin.  Let us see what it looks like.  The following example, closely based on the examples in the Pig Latin Manual, finds the number of pets owned by adults in a pet store.

all_data = LOAD 'petstore.dat' AS (owner:chararray, pet_type:chararray, pet_num:int, owner_age:int);
DUMP all_data;
(Alice,turtle,1,30)
(Alice,goldfish,5,30)
(Alice,cat,2,30)
(Bob,dog,2,19)
(Bob,cat,2,19) 
(Chris,dog,1,22) 

adult_data = FILTER all_data BY owner_age > 21;
DUMP adult_data; 
(Alice,turtle,1,30)
(Alice,goldfish,5,30)
(Alice,cat,2,30)
(Chris,dog,1,22) 

pets_by_owner = GROUP adult_data BY owner;

DUMP pets_by_owner;
(Alice,{(Alice,turtle,1,30),(Alice,goldfish,5,30),(Alice,cat,2,30)})
(Chris,{(Chris,dog,1,22)})

owner_pet_count = FOREACH pets_by_owner GENERATE group, SUM(adult_data.pet_num);
DUMP owner_pet_count;
(Alice,8L)
(Chris,1L)

Above, we first load the data from our petstore.dat file (most likely stored in HDFS), filter out rows whose owners are not adults, group the pets by owner, and finally output the total number of pets per owner.  We can supply User Defined Functions (UDFs) written in Java to support complex processing at any of these stages.  For example, if the data is stored in some esoteric format, we can provide our own Java parser, and if the filter condition needs to be complex, we can write it in Java, as sketched below.
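
Here is a minimal sketch of what such a filter UDF might look like, using Pig's FilterFunc base class.  The class name IsAdult and the age threshold are made up for this example.

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsAdult extends FilterFunc {
  // Pig calls exec() once per input tuple; here the tuple is expected to
  // carry the owner's age as its only field
  public Boolean exec( Tuple input ) throws IOException {
    if ( input == null || input.size() == 0 || input.get( 0 ) == null )
      return false;

    int age = (Integer) input.get( 0 );
    return age > 21;
  }
}

After packaging the class into a jar (say, myudfs.jar) and issuing REGISTER myudfs.jar, the filter above could be rewritten as adult_data = FILTER all_data BY IsAdult(owner_age);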

With Pig and Cascading, we can create our data processing pipeline much faster than by manually writing a sequence of MapReduce jobs.  Their main disadvantage is that you must learn a new language or API, however simple it may be.  Wouldn't it be great if we could just use a query language we are already familiar with?

That's what the folks at Facebook thought too... and so we got Hive.  Hive enables us to query data stored in HDFS using a SQL-like language.  In Hive, the pet store query looks like this:
CREATE EXTERNAL TABLE pets (
    owner STRING,
    pet_type STRING,
    pet_num INT,
    owner_age INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/path/to/petstore/';

SELECT owner, SUM(pet_num)
FROM pets
WHERE owner_age > 21
GROUP BY owner;

We first overlay a structure on top of our petstore.dat file using the CREATE EXTERNAL TABLE statement.  Note that we do not modify or move the file.  We can then run almost any SQL query against the table -- GROUP BY, ORDER BY, nested queries, and joins are all supported.  Hive compiles these queries into a sequence of MapReduce jobs.  The default Hive shell prints the output tables to stdout.  We can also run HQL queries from within scripts and capture their output using the Hive JDBC and ODBC drivers.
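
For instance, a minimal sketch of querying the pets table over JDBC from Java might look like the following.  It assumes a Hive standalone server listening on the default port 10000; the host name is a placeholder.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcExample {
  public static void main( String[] args ) throws Exception {
    // register the Hive JDBC driver
    Class.forName( "org.apache.hadoop.hive.jdbc.HiveDriver" );

    // "jdbc:hive://host:port/db" talks to a standalone Hive server;
    // "jdbc:hive://" would use an embedded, in-process Hive instead
    Connection conn = DriverManager.getConnection( "jdbc:hive://localhost:10000/default", "", "" );
    Statement stmt = conn.createStatement();

    ResultSet res = stmt.executeQuery(
        "SELECT owner, SUM(pet_num) FROM pets WHERE owner_age > 21 GROUP BY owner" );
    while ( res.next() )
      System.out.println( res.getString( 1 ) + "\t" + res.getLong( 2 ) );

    conn.close();
  }
}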

We can extend HQL (Hive Query Language) with User Defined Functions written in Java.  For example, we can use a UDF that converts IP addresses to city names in our HQL queries.  We can also embed custom map/reduce tasks written in ANY language directly into HQL queries, and with a custom SerDe written in Java, we can run queries against log files that are in custom formats.
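
As an illustration, here is a minimal sketch of what such an IP-to-city UDF might look like, using Hive's old-style UDF base class (org.apache.hadoop.hive.ql.exec.UDF).  The class name, the hard-coded lookup table, and the table/column names in the usage note below are made up; a real implementation would consult a geo-IP database.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class IpToCity extends UDF {
  // placeholder lookup table; a real UDF would use a geo-IP database
  private static final Map<String, String> CITIES = new HashMap<String, String>();
  static {
    CITIES.put( "10.0.0.1", "San Francisco" );
    CITIES.put( "10.0.0.2", "New York" );
  }

  // Hive calls evaluate() once per row; a null input yields a null output
  public Text evaluate( Text ip ) {
    if ( ip == null )
      return null;

    String city = CITIES.get( ip.toString() );
    return new Text( city == null ? "unknown" : city );
  }
}

After ADD JAR and CREATE TEMPORARY FUNCTION ip_to_city AS 'IpToCity', the function can be used like any built-in, e.g. SELECT ip_to_city(ip), COUNT(*) FROM weblogs GROUP BY ip_to_city(ip).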

Not everyone understands or likes SQL.  Are there any languages for those folks?  I know of at least one more -- Cascalog, announced at the 2010 Hadoop Summit.  You may want to use Cascalog for querying data stored in Hadoop if you like Lisp, Datalog, or functional programming.  Otherwise, Cascalog may look a bit alien.  Here's a query that counts the number of occurrences of each word:

(?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))

Cascalog runs on top of Clojure (a functional programming language on the JVM) and uses Cascading to generate the MapReduce jobs.

Let me conclude this post by talking about my language of preference.  I prefer Hive, because I -- and a LOT of developers/analysts -- am already very familiar with SQL.  Also, the database-like interface provided by Hive lets us connect existing business intelligence tools such as Microstrategy (another example: Intellicus) to Hive and perform complex analysis with ease.  If I wanted tight programmatic control over my MapReduce jobs, I would most likely use Cascading.

Monday, July 05, 2010

A Guided Tour of the Hadoop Zoo: Getting Data In

You probably have tens to thousands of servers producing loads of data, logging each and every interaction of users with your web application.  To load these logs into HDFS, you have two options:
  1. Write a script to periodically copy each file into HDFS using the HDFS commandline put command.
  2. Change your logging system so that servers directly write to HDFS.
To do (1) scalably and reliably, you probably need to enlist a script ninja.  But for (2), you can simply rely upon the tortoises -- Chukwa and Honu, or the water channel they probably never swim in -- Flume (I am trying hard to keep the zoo setting here.  Don't think I will make it much further.), or Sqoop (OK, I am out.  Did not expect to lose the animal analogy so early!), or Scribe.

Chukwa is a distributed data collection and processing platform.  A Chukwa agent runs on each application server.  The application sends logs to the Chukwa agent via files or UDP packets.  The agent forwards the logs to a handful of Collectors.  These collectors aggregate logs from hundreds of agents and write them into HDFS as big files (HDFS is better at serving a small number of large files than a large number of small files).  Every few minutes, a MapReduce job archives and demuxes these log files.  Archiving rewrites the log files so that logs of the same type (say, logs from application X on cluster Z) are stored together on disk.  Demuxing parses the log files to extract structured data (say, key-value pairs), which can subsequently be loaded into a database and queried through the Hadoop Infrastructure Care Center web portal.

Honu, recently open-sourced by Netflix, is very similar to Chukwa.  Chukwa's original focus was aggregating and querying the log files generated by a Hadoop cluster.  In contrast, Honu focuses on streaming (non-Hadoop) application logs directly to stable storage and providing a simple interface to query them.  Additionally, Honu focuses on achieving this in the cloud -- for example, using Amazon Elastic Compute Cloud (EC2), Simple Storage Service (S3), and Elastic MapReduce (EMR).  To use Honu, applications write their log messages through the Honu client-side SDK.  The SDK forwards the logs to collectors (there are no intermediary agents as in Chukwa), which continuously save the log files to HDFS.  Periodic MapReduce jobs process the log files and save them in a format that can be queried through the SQL-like interface provided by Hive (to be discussed in a subsequent post).  Unlike Chukwa, Honu can collect logs from non-Java applications.

Flume is an even more recent distributed log collection and processing system, announced by Cloudera at the 2010 Hadoop Summit.  Flume shares the same basic architecture as Chukwa and Honu: agents gather logs from applications and forward them to collectors that aggregate and store them.  What sets Flume apart is its comprehensive built-in support for manageability, extensibility, and multiple degrees of reliability, along with its extensive documentation.  The entire log collection data flow is defined and managed from a centralized Flume Master (web UI/console).  Through the Flume Master, and without having to restart the agents and collectors, the administrator can specify the input sources and their format (syslogd, Apache logs, text file, Scribe, Twitter, RPC, custom, etc.), the collectors that the agents talk to and their failover paths, the data collection sinks (HDFS, RPC, Scribe, text file, etc.) and their output format (Avro, JSON, log4j, syslog), how log events should be bucketed into different directories based on their metadata dictionary, the reliability level to be used (end-to-end, store-on-failure, best effort), and much more.

Chukwa, Honu, and Flume help you load large volumes of logs into HDFS for further analysis.  What if your analysis involves data (for example, user profile information) locked up in relational databases?  Instead of directly hitting, and potentially slowing down, production relational databases, it is better to periodically dump the tables of interest into Hadoop before analysis.  Sqoop, also from Cloudera, makes loading large amounts of data from a relational database into Hadoop (or even Hive) just one command line away.  Sqoop takes care of automatically retrieving the table structure using JDBC, creating Java classes to aid MapReduce, and efficiently bulk loading the data into HDFS using the specific database's bulk export mechanisms.

Scribe, from Facebook, is a streaming log collection system that has been available for over a year now.  An application uses the Scribe client-side library to send log messages to the Scribe server running on the same machine.  That server aggregates the log messages and forwards them to a central group of one or more Scribe servers.  These central servers write the aggregated logs to a distributed file system (like HDFS) or forward them further to another layer of Scribe servers.  Log messages are routed to different destination files based on their user-defined category, controlled by dynamic configuration files.  A local Scribe server stores log messages on local disk and retries later if the central server to which it wishes to send logs is not reachable.  Unlike Chukwa, Honu, and Flume, which are implemented in Java, Scribe is implemented in C++.

A Guided Tour of the Hadoop Zoo: Welcome

It's a fine sunny Sunday, a perfect day to do something outdoors, like visit the zoo.  But if you are lazy like I am, how about visiting the Hadoop Zoo instead?  From elephants to elephant bees to elephant birds, the Hadoop Zoo has enough 'animals' to rival a real zoo.

Let's start with the basic question: why would you want to visit the Hadoop Zoo?  The most common answer is that you have a lot of data (by a lot, I mean giga/tera/petabytes) that you want to store and do something useful with.  The various animals -- and, actually, lots of non-animals -- help you do exactly this.

Let's start with the cute elephant, Hadoop, the central attraction of the zoo.  The Hadoop Distributed File System (HDFS) and the Hadoop MapReduce framework are the core components of Hadoop.  HDFS stores gigantic amounts of data in a distributed, scalable, and reliable fashion.  The Hadoop MapReduce framework helps you write programs, in Java and a few other languages, that efficiently process your data into valuable information.

That's it -- HDFS and the MapReduce framework are all you need to store and process a huge amount of data.  And that's all you had a few years ago.  But now there are more animals in the zoo that make your visit more fun and your life easier.

This series of blog posts is an attempt to record and expand my understanding of the various components of the Hadoop ecosystem.  I am by no means a Hadoop expert.  So, if you find something wrong or missing, please do send me an email or add a comment.

Sunday, April 11, 2010

Running HQL from Python without using the Hive Standalone Server

To use a language other than Java (say, Python) with Hive, you must use the Hive Standalone Server.  The main disadvantage of the Hive Standalone Server is that it is currently single-threaded [HIVE-80].  Additionally, there is the inconvenience of running an additional server.

We can solve this problem by using Jython (and possibly JRuby).  Jython enables us to use Hive's Java client library to execute HQL queries and retrieve the results.  We can then process the results in pure Python.

Let us try it out:

STEP 1:
Download and install Jython.

STEP 2:
Make sure you have the following jars and directories in your CLASSPATH.
  • hive-service-0.6.0.jar
  • libfb303.jar
  • log4j-1.2.15.jar
  • antlr-runtime-3.0.1.jar
  • derby.jar
  • jdo2-api-2.3-SNAPSHOT.jar
  • commons-logging-1.0.4.jar
  • datanucleus-core-1.1.2.jar
  • datanucleus-enhancer-1.1.2.jar
  • datanucleus-rdbms-1.1.2.jar
  • hive-exec-0.6.0.jar
  • hive-jdbc-0.6.0.jar
  • hive-metastore-0.6.0.jar
  • commons-lang-2.4.jar
  • hadoopcore/hadoop-0.20.0/hadoop-0.20.0-core.jar
  • /usr/lib/hadoop-0.20/lib/mysql-connector-java-5.0.8-bin.jar
  • conf (this is your hive installation's build/dist/conf directory)
Jar locations and versions may differ in your Hive installation.

STEP 3:
Create a test data file /tmp/test.dat with the following lines
1:one
2:two
3:three

STEP 4:
Run the following Jython script
from java.lang import *
from java.sql import *

driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";

try:
  Class.forName(driverName);
except Exception, e:
  print "Unable to load %s" % driverName
  System.exit(1);

conn = DriverManager.getConnection("jdbc:hive://");
stmt = conn.createStatement();

# Drop table
#stmt.executeQuery("DROP TABLE testjython")

# Create a table
res = stmt.executeQuery("CREATE TABLE testjython (key int, value string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ':'")

# Show tables
res = stmt.executeQuery("SHOW TABLES")
print "List of tables:"
while res.next():
    print res.getString(1)

# Load some data
res = stmt.executeQuery("LOAD DATA LOCAL INPATH '/tmp/test.dat' INTO TABLE testjython")

# SELECT the data
res = stmt.executeQuery("SELECT * FROM testjython")
print "Listing contents of table:"
while res.next():
    print res.getInt(1), res.getString(2)


You should see the following output, amidst a whole lot of debug statements:
1 one
2 two
3 three