Monday, July 05, 2010

A Guided Tour of the Hadoop Zoo: Getting Data In

You probably have tens or even thousands of servers producing loads of data, logging every interaction users have with your web application. To load these logs into HDFS, you have two options:
  1. Write a script to periodically copy each file into HDFS using the HDFS command-line put command (hadoop fs -put).
  2. Change your logging system so that servers directly write to HDFS.
To do (1) scalably and reliably, you probably need to enlist a script ninja. But for (2), you can simply rely on the tortoises, Chukwa and Honu, or on the water channel they probably never swim in, Flume (I am trying hard to keep the zoo setting here. I don't think I will make it much further.), or Sqoop (OK, I am out. I did not expect to lose the animal analogy so early!), or Scribe.
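Option (1) can be sketched in a few lines of Python. This is a minimal sketch, assuming that the logging system rotates logs so that only finished files end in ".log" and that the hadoop command is on the PATH; the function names pending_files and upload_pending are hypothetical. By default it only plans the hadoop fs -put commands; pass run=True to execute them.

```python
import os
import subprocess
from typing import List, Set


def pending_files(log_dir: str, uploaded: Set[str], suffix: str = ".log") -> List[str]:
    """Return rotated (finished) log files in log_dir that have not been copied yet."""
    names = sorted(n for n in os.listdir(log_dir) if n.endswith(suffix))
    return [os.path.join(log_dir, n) for n in names if n not in uploaded]


def upload_pending(log_dir: str, hdfs_dir: str, uploaded: Set[str],
                   run: bool = False) -> List[List[str]]:
    """Build (and optionally run) one 'hadoop fs -put' command per pending file."""
    commands = []
    for path in pending_files(log_dir, uploaded):
        cmd = ["hadoop", "fs", "-put", path, hdfs_dir]
        if run:
            # check=True raises on failure, so the file is not marked as
            # uploaded and will be picked up again on the next round.
            subprocess.run(cmd, check=True)
            uploaded.add(os.path.basename(path))
        commands.append(cmd)
    return commands
```

A cron job or a loop calling upload_pending every few minutes completes the picture. Note what the sketch leaves out: the set of uploaded files lives in memory and is lost on restart, a put that fails halfway may leave a partial file behind, and hadoop fs -put refuses to overwrite an existing file. Handling all of this on every server is where the script ninja comes in.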

Chukwa is a distributed data collection and processing platform. A Chukwa agent runs on each application server. The application sends logs to the Chukwa agent via files or UDP packets. The agent forwards the logs to a handful of collectors. These collectors aggregate logs from hundreds of agents and write them into HDFS as big files (HDFS is better at serving a small number of large files than a large number of small files). Every few minutes, a MapReduce job archives and demuxes these log files. Archiving involves rewriting the log files so that logs of the same type (say, logs from application X on cluster Z) are written together on disk. Demuxing involves parsing the log files to extract structured data (say, key-value pairs), which can subsequently be loaded into a database and queried through the Hadoop Infrastructure Care Center (HICC) web portal.
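To make this data flow concrete, here is a toy, in-memory Python model of the collect, archive and demux stages. It is not Chukwa's API: the record layout (cluster, application, raw line), the function names and the key=value log format are all assumptions made for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# A record as an agent might forward it: (cluster, application, raw log line).
Record = Tuple[str, str, str]


def collect(agent_batches: Iterable[List[Record]]) -> List[Record]:
    """Collector: merge many small batches from agents into one large 'sink file'."""
    sink: List[Record] = []
    for batch in agent_batches:
        sink.extend(batch)
    return sink


def archive(sink: List[Record]) -> Dict[Tuple[str, str], List[str]]:
    """Archiving: regroup lines so logs of the same type (app X on cluster Z) sit together."""
    groups: Dict[Tuple[str, str], List[str]] = defaultdict(list)
    for cluster, app, line in sink:
        groups[(app, cluster)].append(line)
    return dict(groups)


def demux(line: str) -> Dict[str, str]:
    """Demuxing: extract structured key=value pairs from one raw line, ignoring other tokens."""
    fields: Dict[str, str] = {}
    for token in line.split():
        key, sep, value = token.partition("=")
        if sep:
            fields[key] = value
    return fields
```

The point of the collector stage shows up in the shape of the data: many small agent batches become one large sink, which suits HDFS far better than one small file per agent.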

Honu, recently open-sourced by Netflix, is very similar to Chukwa. Chukwa's original focus was to aggregate and query log files generated by a Hadoop cluster. In contrast, Honu's focus is to stream (non-Hadoop) application logs directly to stable storage and provide a simple interface to query them. Additionally, Honu is designed to do this in the cloud, for example using Amazon Elastic Compute Cloud (EC2), Simple Storage Service (S3) and Elastic MapReduce (EMR). To use Honu, applications write their log messages through the Honu client-side SDK. The SDK forwards the logs to collectors (there are no intermediary agents, as there are in Chukwa), which continuously save the log files to HDFS. Periodic MapReduce jobs process the log files and save them in a format that can be queried through the SQL-like interface provided by Hive (to be discussed in a subsequent post). Unlike Chukwa, Honu can collect logs from non-Java applications.
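As an illustration of the kind of transformation such a periodic job performs, the sketch below turns log messages into tab-separated rows grouped by day into Hive-style partition directories (dt=YYYY-MM-DD). This is an assumption-laden sketch, not Honu's actual output format: the message fields (ts, user, action), the column list and the tab delimiter are hypothetical, and a matching Hive table would need to be declared with tab-terminated fields and PARTITIONED BY (dt STRING).

```python
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List

COLUMNS = ["user", "action"]  # hypothetical table schema


def to_hive_partitions(messages: List[dict]) -> Dict[str, List[str]]:
    """Turn log messages into tab-separated rows grouped by a Hive-style dt= partition."""
    partitions: Dict[str, List[str]] = defaultdict(list)
    for msg in messages:
        # "ts" is assumed to be a Unix timestamp in seconds; partition by UTC day.
        day = datetime.fromtimestamp(msg["ts"], tz=timezone.utc).strftime("%Y-%m-%d")
        # Missing fields become empty columns rather than failing the whole job.
        row = "\t".join(str(msg.get(col, "")) for col in COLUMNS)
        partitions[f"dt={day}"].append(row)
    return dict(partitions)
```

Partitioning by day lets a query that filters on dt read only the matching directories instead of scanning every log ever collected.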

Flume is an even more recent distributed log collection and processing system, announced by Cloudera at the 2010 Hadoop Summit. Flume shares the same basic architecture as Chukwa and Honu: agents gather logs from applications and forward them to collectors that aggregate and store them. What sets Flume apart is its comprehensive built-in support for manageability, extensibility and multiple levels of reliability, as well as its extensive documentation. The entire log collection data flow is defined and managed from a centralized Flume Master (web UI/console). Through the Master, the administrator can specify the input sources and their format (syslogd, Apache logs, text file, Scribe, Twitter, RPC, custom, etc.), the collectors that the agents talk to and their failover paths, the data collection sinks (HDFS, RPC, Scribe, text file, etc.) and their output format (Avro, JSON, log4j, syslog), how log events should be bucketed into different directories based on their metadata dictionary, the reliability level to use (end-to-end, store-on-failure, best effort), and much more, all without having to restart the agents and collectors.
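The three reliability levels trade safety for overhead in different ways. The toy Python model below is a sketch of the idea, not Flume's implementation; the class, the send callback and the ack method are hypothetical. Best effort drops an event if the next hop is down, store-on-failure buffers an event locally only when sending fails, and end-to-end writes every event ahead locally and keeps it until the final sink acknowledges it.

```python
from typing import Callable, List

MODES = ("best-effort", "store-on-failure", "end-to-end")


class ToyAgent:
    """Toy model of Flume-style reliability levels (not Flume's actual code)."""

    def __init__(self, send: Callable[[str], bool], mode: str):
        if mode not in MODES:
            raise ValueError(f"unknown mode: {mode}")
        self.send = send  # hypothetical transport: True if the next hop accepted the event
        self.mode = mode
        self.pending: List[str] = []  # stands in for the agent's local disk log
        self.dropped: List[str] = []  # events lost under best effort

    def append(self, event: str) -> None:
        if self.mode == "end-to-end":
            self.pending.append(event)  # write ahead before sending
            self.send(event)
        elif not self.send(event):
            if self.mode == "store-on-failure":
                self.pending.append(event)  # buffer locally, retry later
            else:
                self.dropped.append(event)  # best effort: the event is lost

    def ack(self, event: str) -> None:
        """End-to-end only: the collector confirms the event reached its sink."""
        if event in self.pending:
            self.pending.remove(event)

    def retry(self) -> None:
        """Resend buffered events; store-on-failure forgets accepted ones,
        while end-to-end keeps everything until ack() arrives."""
        kept = []
        for event in self.pending:
            accepted = self.send(event)
            if self.mode == "end-to-end" or not accepted:
                kept.append(event)
        self.pending = kept
```

The cost of end-to-end reliability is visible even in the toy: every event is buffered and held until an acknowledgement makes the full round trip from the sink.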

Chukwa, Honu and Flume help you load large volumes of logs into HDFS for further analysis. What if your analysis involves data (for example, user profile information) locked up in relational databases? Instead of directly hitting, and potentially slowing down, production relational databases, it is better to periodically dump the tables of interest into Hadoop before analysis. Sqoop, also from Cloudera, puts loading large amounts of data from a relational database into Hadoop (or even Hive) just one command line away. Sqoop takes care of automatically retrieving the table structure using JDBC, generating Java classes to aid MapReduce, and efficiently bulk loading the data into HDFS using the specific database's bulk export mechanisms where available.
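Conceptually, an import like this reads the table's structure from database metadata and then streams the rows out as delimited text. The Python sketch below mimics those two steps against an in-memory SQLite database. It is not Sqoop, which generates Java classes, splits the work across parallel map tasks and writes to HDFS; the function name dump_table and the comma-delimited output are assumptions for illustration. With Sqoop itself, the equivalent is a single "sqoop import --connect <JDBC URL> --table <table>" invocation.

```python
import csv
import io
import sqlite3
from typing import List, Tuple


def dump_table(conn: sqlite3.Connection, table: str) -> Tuple[List[str], str]:
    """Read a table's column names from result metadata and export its rows as delimited text."""
    # The table name is interpolated directly, so it must come from trusted configuration.
    cur = conn.execute(f"SELECT * FROM {table}")
    columns = [d[0] for d in cur.description]  # column names from the cursor metadata
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerows(cur)  # stream rows without loading the whole table into a list
    return columns, out.getvalue()
```

Running such a dump on a schedule, against a replica rather than the production primary, is what keeps the analysis from slowing down the live database.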

Scribe, from Facebook, is a streaming log collection system that has been available for over a year now. An application uses the Scribe client-side library to send log messages to the Scribe server running on the same machine. This local Scribe server aggregates the log messages and forwards them to a central group of one or more Scribe servers. These central servers write the aggregated logs to a distributed file system (like HDFS) or forward them to another layer of Scribe servers. Log messages are routed to different destination files based on their user-defined category, as controlled by dynamic configuration files. If the central server it wishes to send logs to is unreachable, a local Scribe server stores the log messages on local disk and retries later. Unlike Chukwa, Honu and Flume, which are implemented in Java, Scribe is implemented in C++.
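The routing and buffering behavior described above can be modeled in a few lines. This toy Python sketch is not Scribe's code or its Thrift interface; the class names, the category-to-path routing table and the default path are hypothetical. The local server keeps messages in order in a buffer (standing in for local disk) while the central server is unreachable, and the central server's routing table can be swapped without a restart, loosely mirroring Scribe's dynamic configuration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class ToyCentralScribe:
    """Central server: appends each message to a destination file chosen by its category."""

    def __init__(self, routes: Dict[str, str], default_path: str):
        self.routes = routes              # category -> destination path (reloadable config)
        self.default_path = default_path  # where unrecognized categories go
        self.files: Dict[str, List[str]] = defaultdict(list)
        self.reachable = True

    def reload(self, routes: Dict[str, str]) -> None:
        """Swap in a new routing table without restarting."""
        self.routes = routes

    def receive(self, category: str, message: str) -> bool:
        if not self.reachable:
            return False
        path = self.routes.get(category, self.default_path)
        self.files[path].append(message)
        return True


class ToyLocalScribe:
    """Local server: forwards to the central server, buffering while it is unreachable."""

    def __init__(self, central: ToyCentralScribe):
        self.central = central
        self.buffer: List[Tuple[str, str]] = []  # stands in for the local-disk buffer

    def log(self, category: str, message: str) -> None:
        self.buffer.append((category, message))
        self.flush()

    def flush(self) -> None:
        """Send buffered messages in order; stop at the first failure and retry later."""
        while self.buffer:
            category, message = self.buffer[0]
            if not self.central.receive(category, message):
                return
            self.buffer.pop(0)
```

Stopping at the first failure preserves message order, at the cost of holding everything behind the oldest undelivered message until the central server comes back.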


Dilip said...

Comparing Flume with other systems on the Flume mailing list:



Message Queue

Prash said...

If we have a web log file that contains all the queries generated by user actions, would you suggest that we go for Chukwa or Hive? I don't believe we use log4j. In that case, would Hive make it much easier?