At Wajam, we’re helping you find recommendations from friends you trust, whenever you need them. In order to do that, we constantly innovate with new technologies. Our technical blog posts are meant to share with you what we’ve learned along our high-tech adventures.
Wajam is a social search engine that gives you access to the knowledge of your friends. We gather your friends’ recommendations from Facebook, Twitter and other social platforms and serve these back to you on supported sites like Google, eBay, TripAdvisor and Wikipedia.
To do this, we aggregate, analyze and index relevant pieces of shared information on our users’ social networks. The challenge for the Business Intelligence team is not so much the storage of the vast number of logs that our millions of users are generating, as our Hadoop cluster is sizeable, rather it is how to quickly answer business questions by running Pig jobs across these entries.
As a Business Intelligence analyst who was only introduced to MapReduce (MR) jobs earlier this year I wanted to blog about how frustrating it can be to write Pig jobs to run across our Hadoop cluster containing our users’ raw logs – oh how I wanted to write that article. It would have consisted of diatribes about Pig’s horrible compile and run time error messaging, litanies of expletives relating to the sheer number of hours spent waiting for MR jobs to run across our cluster. More recent changes implemented at Wajam by one of our Business Intelligence (BI) gurus have, to a large extent, made such a programming polemic null and void. Thanks to just a little further processing of our logs and storing this rigidly formatted output, our reliance on the raw logs has decreased and with it at least the author’s Pig related profanity has lessened.
The primary goal behind these changes was to improve the speed with which we can provide a response to a business query. A secondary goal was to make our data stored on our Hadoop cluster more accessible for everyone.
ABOUT OUR PROCESS
Our Hadoop cluster (HC) was and continues to be used for the storage of raw aggregated logs sent to our namenode from the Scribe servers running on top of our many Web servers and for storing the outputs of the automated Pig jobs that run upon these logs. The reporting pipeline of Pig jobs is scheduled by some Scala scripts, with the overall process summarised below:
Much of the pipeline’s purpose is to aggregate information in the logs and regularly populate our MySQL server so that queries and alerts related to our traffic can be implemented in R. However it is often the raw logs that are queried for our adhoc requests, as our stored aggregated data may not fit the bill. It is here that our old structure was causing a sticking point.
To explain: the raw logs are broken into events, and each event has a certain amount of generic data e.g. date, timestamp, country, user id etc. A further set of fields is populated on a per-event basis. With this flexibility comes an element of complexity, for when querying these events one has to write pig jobs knowing exactly how all of the events are linked.
As an example, we have an event for every search that a user makes and one of the fields recorded is a unique search id. That same search id will carry across to other events for when we crawl our partners’ servers and for the events generated upon such successful crawls and so on and so forth.
Trawling through our memories for how all these events link together when writing a Pig script to answer a particular question regarding our users’ search behaviour can unnecessarily slow the process down.
Recent changes – of hardware and in mindset
Our solution to this problem? As there is little new under the Sun, we sought inspiration from others who are using Hadoop. LinkedIn, unsurprisingly, was a first port of call e.g. a simple, clean and clever idea like incremental processing to speed up regular Hadoop jobs.
With thoughts along similar lines, the BI team added a pre-processing step: store the collective information about every search that could be gathered from our events. As we have recently increased the size of our cluster to just under 100 nodes, the cost in space for storing this extra information was minimal. The positives of moving to this form of granular pre-processing on the other hand were manifold. Two highlights include:
1. Speed : When a business request came in previously, we often had to run Pig jobs which drew directly from the raw logs. The Pig scripts themselves are not all that difficult to write, however the run time was slowed by the sheer size of the files from which we are drawing. This load time issue was compounded when the request required us to load multiple weeks worth of data. Now a month of search data can be loaded in and processed under 2 hours, where previously an equivalent period in raw logs would have taken 3 – 4 hours or the job would simply have hung.
2. Ease of use: Conceptually it is far easier to query a rigid table like structure rather than try to join a myriad of events. This has the added benefit of making newcomers to Wajam less reticent to venture into the land of Hadoop and Pig.
Further to point 2 above and to lessen our reliance on MySQL, the next step is to add the Impala feather to our bow. Inspired by Google’s Dremel paper, the Cloudera Impala project aims “to bring real-time, ad hoc query capability to Apache Hadoop” .
“Cloudera Impala is an open source Massively Parallel Processing (MPP) query engine that runs natively on Apache Hadoop” . What does this encompass? Essentially multiple clients can be used to issue SQL queries and the Hive metastore provides information as to what databases are available and the schemas associated with same (See diagram below). Then there is a Cloudera Impala process running on every datanode in the HDFS which parses up the original SQL queries and runs them on each node.
It is the presence of each of these Impala instances on each node and the associate distributed query engine that means MapReduce can be bypassed. As a result of being able to remove MR, there have been instances of 68x speedup as compared to Hive . The extent of the speed improvement is query dependent.
In order to leverage the power of Impala, we are incrementally migrating our servers to CDH4 however some preliminary testing has already begun.
Coupling our more rigid data structures like our newly created search tables and Impala will likely see an even greater decrease in the turn around time for responding to business requests. The positive effect this will have on our department is hard to quantify – especially as the more approachable SQL like syntax and organised data sets will allow us to explore our data more readily.
That is not to say we will entirely rid ourselves of the need to run adhoc Pig jobs to answer particular requests. However if the bulk of the heavy lifting has been done, the occasional MR job will hopefully keep the colourful language to a minimum.
You can find Xavier Clements on Twitter. Apache Hadoop is a trademark of the Apache Software Foundation.