Wednesday, July 14, 2010

A Guided Tour of the Hadoop Zoo: Querying the Data

In a previous post, we saw how to get our data into Hadoop.  The next step is to extract useful information from it.  Originally, this step required programmers to write multiple MapReduce programs in Java and to carefully orchestrate them so that each job runs only after the input it needs has been produced by a prior job.  If you care about raw speed and processing/memory efficiency, this is still the way to go.  However, if you are more concerned with how quickly you can write queries and want to avoid mundane boilerplate Java code, the projects described below can help.
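
To make this concrete, here is a rough sketch (not from the original post) of what hand-orchestrating two dependent jobs looks like with the plain org.apache.hadoop.mapreduce API; the mapper/reducer classes and the HDFS paths below are placeholders.

// chain two MapReduce jobs by hand: the second job can only start after the
// first has written its output, so we block on waitForCompletion() in between.
// ParseMapper, ParseReducer, AggMapper and AggReducer are placeholder classes.
Configuration conf = new Configuration();

Job parse = new Job( conf, "parse-logs" );
parse.setJarByClass( Main.class );
parse.setMapperClass( ParseMapper.class );
parse.setReducerClass( ParseReducer.class );
parse.setOutputKeyClass( Text.class );
parse.setOutputValueClass( IntWritable.class );
FileInputFormat.addInputPath( parse, new Path( "/logs/raw" ) );
FileOutputFormat.setOutputPath( parse, new Path( "/logs/parsed" ) );

if( !parse.waitForCompletion( true ) )   // block until the first job finishes
  System.exit( 1 );

Job agg = new Job( conf, "aggregate-logs" );
agg.setJarByClass( Main.class );
agg.setMapperClass( AggMapper.class );
agg.setReducerClass( AggReducer.class );
agg.setOutputKeyClass( Text.class );
agg.setOutputValueClass( LongWritable.class );
FileInputFormat.addInputPath( agg, new Path( "/logs/parsed" ) );   // the first job's output
FileOutputFormat.setOutputPath( agg, new Path( "/logs/report" ) );

agg.waitForCompletion( true );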

Let's start with Cascading.  Cascading is a Java library/API that lets us easily assemble the data processing steps required to extract information from our data.  The following example, taken from the very detailed Cascading User Guide, demonstrates how we can use Cascading to read each line of text from a file, parse it into words, and then count the number of times each word is encountered.

// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();

The data is processed as it flows through the pipes defined with the Cascading API.  Cascading converts the data flow pipe assembly into a collection of MapReduce jobs.  It takes care of orchestrating the jobs so that each one is launched only after its dependencies are satisfied -- i.e., after all jobs producing output it needs have completed.  If an error occurs in the processing pipeline, Cascading triggers a notification callback and can continue processing after copying the offending data to a special trap file.  If we are not satisfied with the data processing primitives offered by Cascading, we can write our own Java operations -- or even plain MapReduce jobs -- and incorporate them into the Cascading data flow.
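
For example, a rough sketch of a custom Function (this code is not from the post; the class and field names are made up) that upper-cases each word might look like the following.  Custom Filters, Aggregators and Buffers follow the same pattern.

// a custom operation: subclass BaseOperation and implement Function
// (both live in the cascading.operation package)
public class UppercaseFunction extends BaseOperation implements Function
  {
  public UppercaseFunction()
    {
    // declare one argument and one output field, named "upper" purely for illustration
    super( 1, new Fields( "upper" ) );
    }

  public void operate( FlowProcess flowProcess, FunctionCall functionCall )
    {
    // read the single incoming argument and emit an upper-cased copy of it
    String value = functionCall.getArguments().getTuple().getString( 0 );
    functionCall.getOutputCollector().add( new Tuple( value.toUpperCase() ) );
    }
  }

// wire it into the assembly exactly like the built-in operations
assembly = new Each( assembly, new Fields( "word" ), new UppercaseFunction() );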

With Cascading, we don't have to write any Java code that implements MapReduce jobs via the Hadoop API, and there is hardly any boilerplate code.  However, we do have to write Java code to assemble the Cascading pipeline.  Can we do this without writing Java code?

Yes, we can... with Pig.  In Pig, the data processing pipeline is written in Pig Latin.  Oundssay uchmay implersay anthay Avajay, ightray?  ("Sounds much simpler than Java, right?")  Maybe we should have just stuck with Java!?  No, we are not talking about http://en.wikipedia.org/wiki/Pig_Latin; instead, we are talking about Hadoop's Pig Latin.  Let us see what it looks like.  The following example, very closely based on the examples in the Pig Latin Manual, finds the number of pets owned by adults in a pet store.

all_data = LOAD 'petstore.dat' AS (owner:chararray, pet_type:chararray, pet_num:int, owner_age:int);
DUMP all_data;
(Alice,turtle,1,30)
(Alice,goldfish,5,30)
(Alice,cat,2,30)
(Bob,dog,2,19)
(Bob,cat,2,19) 
(Chris,dog,1,22) 

adult_data = FILTER all_data BY owner_age > 21;
DUMP adult_data; 
(Alice,turtle,1,30)
(Alice,goldfish,5,30)
(Alice,cat,2,30)
(Chris,dog,1,22) 

pets_by_owner = GROUP adult_data BY owner;

DUMP pets_by_owner;
(Alice,{(Alice,turtle,1,30),(Alice,goldfish,5,30),(Alice,cat,2,30)})
(Chris,{(Chris,dog,1,22)})

owner_pet_count = FOREACH pets_by_owner GENERATE group, SUM(adult_data.pet_num);
DUMP owner_pet_count;
(Alice,8L)
(Chris,1L)

Above, we first load the data from our log file (most likely stored in HDFS), filter it by age, group the pets by owner, and finally output the total number of pets per owner. We can supply User Defined Functions (UDFs) written in Java to support complex processing at any of the stages listed above. For example, if the data is stored in some esoteric format, we can provide our own Java parser, and if the filter condition needs to be complex, we can supply our own filter function written in Java.
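
As a sketch of what such a filter UDF might look like (the class name IsAdult is hypothetical, not from the post):

// a hypothetical Pig filter UDF that keeps only rows whose age column is over 21
import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsAdult extends FilterFunc {
    public Boolean exec(Tuple input) throws IOException {
        // the single argument is expected to be the owner_age column
        if (input == null || input.size() == 0 || input.get(0) == null)
            return false;
        return ((Integer) input.get(0)) > 21;
    }
}

After REGISTERing the jar that contains this class, the filter step above would become something like adult_data = FILTER all_data BY IsAdult(owner_age);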

With Pig and Cascading, we can create our data processing pipeline much faster than by manually writing a sequence of MapReduce jobs.  The main disadvantage of Pig and Cascading is that we must learn a new language or API, however simple it is.  Wouldn't it be great if we could just use a query language we are already familiar with?

That's what the folks at Facebook thought too... and so we got Hive.  Hive enables us to query data stored in HDFS using a SQL-like language.  In Hive, the pet store query would be:
CREATE EXTERNAL TABLE pets (
    owner STRING,
    pet_type STRING,
    pet_num INT,
    owner_age INT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/path/to/petstore/';

SELECT owner, SUM(pet_num)
FROM pets
WHERE owner_age > 21
GROUP BY owner;

We first overlay a structure on top of our petstore.dat log file using the CREATE EXTERNAL TABLE statement; the LOCATION clause simply points at the HDFS directory that holds the file.  Note that we do not modify or move the log file.  We can then run almost any SQL query against the table -- GROUP BY, ORDER BY, nested queries, and joins are all supported.  Hive compiles these queries into a sequence of MapReduce jobs.  The default Hive shell prints the output tables to stdout.  We can also run HQL queries from within our own programs and scripts and capture their output using the Hive JDBC and ODBC drivers.
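
For instance, a minimal sketch of running the pet store query from Java over JDBC might look like the following; the driver class and URL are the HiveServer defaults of that era, and the host, port, and database are assumptions.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PetCounts {
  public static void main(String[] args) throws Exception {
    // load the Hive JDBC driver and connect to a local HiveServer (host and port assumed)
    Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");
    Connection con = DriverManager.getConnection("jdbc:hive://localhost:10000/default", "", "");

    Statement stmt = con.createStatement();
    ResultSet rs = stmt.executeQuery(
        "SELECT owner, SUM(pet_num) FROM pets WHERE owner_age > 21 GROUP BY owner");

    while (rs.next())
      System.out.println(rs.getString(1) + "\t" + rs.getLong(2));

    con.close();
  }
}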

We can extend HQL (Hive Query Language) with User Defined Functions written in Java.  For example, we can use a UDF that converts IP addresses to city names in our HQL queries.  We can also embed custom map/reduce scripts written in ANY language directly into HQL queries.  With a custom SerDe written in Java, we can run queries against log files that are in custom formats.
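
A very rough sketch of such a UDF (the class name and the lookup are hypothetical; a real version would consult a geo-IP database):

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// a hypothetical IP-to-city UDF; lookupCity() is a stub standing in for a real geo-IP lookup
public class IpToCity extends UDF {
    public Text evaluate(Text ip) {
        if (ip == null)
            return null;
        return new Text(lookupCity(ip.toString()));
    }

    private String lookupCity(String ip) {
        return "UNKNOWN";   // stub -- replace with a real geo-IP database lookup
    }
}

After ADD JAR and CREATE TEMPORARY FUNCTION ip_to_city AS 'IpToCity', it can be called in HQL just like any built-in function.
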
Not everyone understands or likes SQL.  Are there any languages for those folks?  I know of at least one more -- Cascalog, announced at the 2010 Hadoop Summit.  You may want to use Cascalog for querying data stored in Hadoop if you like Lisp, Datalog, or functional programming.  Otherwise, Cascalog may look a bit alien.  Here's a query to count the number of words:

(?<- (stdout) [?word ?count] (sentence ?s) (split ?s :> ?word) (c/count ?count))

Cascalog runs on top of Clojure (a functional programming language on the JVM) and uses Cascading to produce the MapReduce jobs.

Let me conclude this post by talking about my language of preference.  I prefer Hive because I, like a LOT of developers and analysts, am already very familiar with SQL.  Also, the database-like interface provided by Hive allows us to connect existing business intelligence tools like Microstrategy (another example: Intellicus) to Hive and perform complex analysis with ease.  If I wanted tight programmatic control of my MapReduce jobs, I would most likely use Cascading.
