Data Processing with Apache Crunch at Spotify
All of our lovely Spotify users generate many terabytes of data every day. All the songs that are listened to, all the playlists you make, all the people you follow, and all the music you share. Somehow we need to organise, process and aggregate all of this into meaningful information out the other side. Here are just a few of the things we need to get out of the data:
- Reporting to record labels and rights holders so we can make sure everyone gets paid
- Creating toplists of the most popular music right now
- Getting feedback on how well different aspects of the product are working so we can improve the user experience
- Powering our intelligent radio and discovery features
To store and process all this data we use Hadoop, a framework for distributed storage and processing of huge amounts of data.
This is nothing new for us, of course; we’ve been using Hadoop here since 2009. In the past we’ve talked a lot about the tools we’ve developed to make it easy for developers and analysts to write data processing jobs using the MapReduce approach in Python (see Luigi). However, in 2013 it became apparent that our strategy of running Python code over the Hadoop Streaming API just wasn’t performing well enough. Jobs were slow and CPU-bound, and we had far too many runtime failures on the Hadoop cluster. Furthermore, the environment lacked higher-level abstractions that could have helped express developer intent; instead we saw a lot of unnecessary code duplication, which was costly to maintain and debug. This coincided with a general move away from Python and towards Java as the language of choice for backend systems at Spotify, so we started looking for a better platform on which people could write their data processing code.
Enter Crunch…
In 2010, Google published a paper entitled FlumeJava: Easy, Efficient Data-Parallel Pipelines, describing an abstraction for parallelisable data processing based on the notion of a lazily evaluated “parallel collection” (PCollection) and the various transformations that can be applied to it. Thanks to the work of Josh Wills at Cloudera and various other contributors from 2011 until the present day, a system based on this design, called Crunch, has been implemented on top of Hadoop, making it available for everyone (including us) to use on top of their existing infrastructure.
Given our requirements at the time, we were attracted to Crunch for a number of reasons:
- Type-safety makes it much less likely that you’ll make mistakes in your code, which are very costly when running across a large Hadoop cluster
- High performance, especially compared to the performance of Python over Hadoop Streaming
- Higher-level abstractions such as filters, joins and aggregations instead of having to think of everything in terms of MapReduce
- First-class Avro support lets us work with strongly-typed data files with the ability to evolve schemas over time
- Pluggable execution engines such as MapReduce and Spark which let us keep up to date with new technology advancements in the big data space without having to rewrite all of our pipelines with each increment
- Simple, powerful testing using the supplied MemPipeline for fast in-memory unit tests
So how does this look in practice? Suppose we want to know how many tracks were played in each country on a given day:
```java
public static void main(String[] args) {
  Pipeline pipeline = new MRPipeline(Example1StreamCountJob.class);

  PCollection<TrackPlayedMessage> trackPlayedMessages =
      pipeline.read(From.avroFile("/logs/track_played/2014-01-01", TrackPlayedMessage.class));

  trackPlayedMessages
      .parallelDo(new MapFn<TrackPlayedMessage, String>() {
        public String map(TrackPlayedMessage input) {
          return input.getCountry().toString();
        }
      }, strings())
      .count()
      .parallelDo(new MapFn<Pair<String, Long>, String>() {
        public String map(Pair<String, Long> input) {
          return input.first() + "\t" + input.second();
        }
      }, strings())
      .write(To.textFile("/my/output/path"));

  pipeline.done();
}
```
That’s all that is required. If you execute this code with `hadoop jar ...`, it’ll run a MapReduce job, reading from HDFS and writing back to HDFS at the end.
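Because the execution engine is pluggable, essentially the same pipeline can also be run on Spark by swapping the Pipeline implementation. Here is a minimal sketch, assuming the crunch-spark module is on the classpath; the "local" master and the app name are placeholders:

```java
// Sketch only: the same logical pipeline, executed by Spark instead of MapReduce.
// Requires the crunch-spark dependency; the rest of the job code is unchanged.
Pipeline pipeline = new SparkPipeline("local", "track-play-counts");

PCollection<TrackPlayedMessage> trackPlayedMessages =
    pipeline.read(From.avroFile("/logs/track_played/2014-01-01", TrackPlayedMessage.class));
// ... same parallelDo / count / write calls as above ...
pipeline.done();
```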
Now of course those anonymous inner classes are a bit ugly, but your IDE can usually hide them away, or you can refactor them out into named inner classes, and with any luck we’ll be able to do away with them altogether in the bright new promised land of Java 8. Either way, it’s an improvement on writing Java MapReduce jobs by hand.
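For example, the country-extraction step above could be pulled out into a named, reusable class (a sketch only; the class name is ours):

```java
// Hypothetical named replacement for the first anonymous MapFn in the example above.
public static class ExtractCountry extends MapFn<TrackPlayedMessage, String> {
  @Override
  public String map(TrackPlayedMessage input) {
    return input.getCountry().toString();
  }
}

// Used in the pipeline in place of the inline anonymous class:
// trackPlayedMessages.parallelDo(new ExtractCountry(), strings())
```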
Library functions
One of the great things about this approach is that it lends itself really nicely to factoring out common higher-level operations for your use-cases. We’ve been actively looking for patterns that people have implemented in several different pipelines and factoring them out into a library of tried-and-tested implementations that people can use from their own code. We’ve given it the not-particularly-imaginative name of crunch-lib, and it’s open source and available on Maven Central so you can take advantage of it too. Here are just a few of the things that are currently included:
- Easy field extraction and keying for Avro records
- Calculating percentiles and averages for numeric data
- Generating toplists of the most common items in a data set
This means something fairly complex like finding a list of the top 10 tracks played in each country becomes as simple as this:
```java
PTable<String, String> countryTrack =
    AvroCollections.extract(trackPlayed, "country", "play_track", tableOf(strings(), strings()));
PTable<String, Collection<Pair<Long, String>>> toplist = TopLists.topNYbyX(countryTrack, 10);
```
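For completeness, here is one way the resulting toplist could then be written out as tab-separated text. This is a sketch, and the output formatting is our own choice rather than anything prescribed by crunch-lib:

```java
// Sketch only: flatten the per-country toplist into text lines and write it out.
toplist
    .parallelDo(new MapFn<Pair<String, Collection<Pair<Long, String>>>, String>() {
      public String map(Pair<String, Collection<Pair<Long, String>>> input) {
        return input.first() + "\t" + input.second();
      }
    }, strings())
    .write(To.textFile("/my/output/path"));
```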
Unifying execution
The only remaining hurdle was how to integrate with all our other systems for scheduling and execution, and to do that we needed people to build their pipelines within some kind of predictable structure and command line interface. It’s no good having people read from and write to arbitrary data sets within the job code itself if you want to manage the paths externally.
The solution we came up with was to remove the reading and writing altogether from the job code that developers write, and instead just write an annotated method for data processing using inputs and outputs in PCollection form, then have a general-purpose launcher which executes these jobs using configuration from the command line. It looks a bit like this:
```java
public class MyJob extends AnnotationConfiguredJob {
  @DataOut
  public PCollection<String> process(@DataIn("trackPlayed") PCollection<TrackPlayedMessage> trackPlayed) {
    // do some processing here, then return the transformed collection
    return transformedCollection;
  }
}
```
As you can see we’ve omitted the reading, writing and pipeline management steps, as these will be performed by the common launching layer. This also has the additional benefit of enforcing a structure which is easily unit testable: you can pass in in-memory collections (built with MemPipeline) and make assertions on the materialized output. The common launcher then allows this job to be run from the command line as follows:
hadoop jar my_shaded_jar.jar com.example.MyJob -PtrackPlayed=/logs/track_played/2014-01-01 -Poutput=/my/output/path
This Inversion of Control pattern means that this way of executing jobs works for any job, and makes it super easy to plug into other infrastructure (such as Luigi, which we still use for scheduling). It also means general purpose configuration (execution pools, non-standard memory requirements etc.) can be parsed at the common layer using familiar names, and eases the transition when ownership of pipelines is transferred between teams.
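To make the testability point concrete, here is a minimal sketch of a unit test for the hypothetical MyJob above, assuming its process method implements the per-country play count from the first example and that a small test helper builds TrackPlayedMessage records:

```java
// Sketch of an in-memory unit test; MyJob, TrackPlayedMessage and the
// trackPlayedIn(...) helper are assumed from the examples above.
@Test
public void countsPlaysPerCountry() {
  PCollection<TrackPlayedMessage> input = MemPipeline.typedCollectionOf(
      Avros.specifics(TrackPlayedMessage.class),
      trackPlayedIn("SE"), trackPlayedIn("SE"), trackPlayedIn("GB"));

  PCollection<String> output = new MyJob().process(input);

  // materialize() evaluates the lazy pipeline and returns the results in memory.
  assertEquals(
      ImmutableSet.of("SE\t2", "GB\t1"),
      ImmutableSet.copyOf(output.materialize()));
}
```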
Summary
Crunch has been a huge boost for both our developer productivity and our execution performance on Hadoop. I’d strongly recommend that anyone who is still writing MapReduce by hand have a look. Crunch might also be of interest to anyone frustrated with the current stability of Spark: it provides a similarly nice API that executes as good old-fashioned MapReduce, and there is even the Scrunch subproject, which exposes a Scala API almost identical to the one provided by Spark. If you want to try it out, a great place to start is the Crunch Getting Started Guide.
Tags: Apache Crunch, big data, hadoop, Java