Deploying Apache Spark on EC2 has never been easier, whether with the spark-ec2 deployment scripts or with Amazon EMR, which has built-in Spark support. However, I found that getting Apache Spark, Apache Avro, and S3 to work together in harmony required chasing down and implementing a few technical details.
Various online sources helped me assemble the pieces I needed, but none of them had everything in a single place, and some were based on deprecated APIs. So I figured I'd collect all the required pieces here and provide a working example.
My particular need was to read and write a few terabytes of Avro files to and from S3. For the curious, there are a few reasons I chose not to use Apache Parquet or Apache ORC:
Kafka parts of the stack are already using Avro
Columnarizing the data incurs additional CPU cost
For my specific purpose, I wouldn’t get to enjoy the benefits of columnar storage. The jobs that I’m running are not useful unless all the data for a record is present during the map phase.
Avro Hadoop Input/Output Formats
The first step on our journey is determining which Hadoop formats to use, and configuring Spark to use them. Hadoop input/output formats are specified as key/value pairs. For my use case, all the information I need is encompassed in our Avro record. To describe this in Spark we can use newAPIHadoopFile (reading) and saveAsNewAPIHadoopFile (writing) in the following way:
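A minimal sketch of both calls, assuming a code-generated Avro class `MyRecord` (a hypothetical name standing in for your own schema) and the `avro-mapred` input/output formats:

```scala
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyInputFormat, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AvroIO {
  // Read: the record rides in the key; the value is a NullWritable.
  def read(sc: SparkContext, path: String): RDD[MyRecord] =
    sc.newAPIHadoopFile(
      path,
      classOf[AvroKeyInputFormat[MyRecord]],
      classOf[AvroKey[MyRecord]],
      classOf[NullWritable]
    ).map { case (key, _) => key.datum() }

  // Write: wrap each record in an AvroKey and attach the schema to the job.
  def write(records: RDD[MyRecord], path: String): Unit = {
    val job = Job.getInstance()
    AvroJob.setOutputKeySchema(job, MyRecord.getClassSchema)
    records
      .map(record => (new AvroKey(record), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        path,
        classOf[AvroKey[MyRecord]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[MyRecord]],
        job.getConfiguration
      )
  }
}
```

Note that the Avro formats carry the record in the Hadoop key, which is why the value slot is `NullWritable` on both sides.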
Efficiently writing to Amazon S3 using Spark
Files in Hadoop are normally written to a temporary location and then atomically renamed when the job completes. This is performed by an implementation of Hadoop's OutputCommitter interface (generally FileOutputCommitter).
Unfortunately, renaming files on S3 has performance and consistency issues that HDFS does not. To make this more efficient and safe for EC2 + S3 we can override this behavior by implementing our own OutputCommitter. Doing so also requires us to implement our own OutputFormat that uses our custom committer.
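One common shape for this (a sketch of the idea, not the post's original code) is a "direct" committer that makes every commit step a no-op, so tasks write straight to the final S3 location and no rename ever happens, paired with an OutputFormat subclass that returns it. This is only safe when speculative execution is disabled, since two attempts of the same task could otherwise write to the same path:

```scala
import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

// Every phase is a no-op: output goes directly to its final S3 key,
// so there is no temporary directory and no rename on job completion.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = ()
  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = ()
  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
}

// An output format that swaps in the direct committer. The class name is
// illustrative; use it in place of AvroKeyOutputFormat when writing.
class DirectAvroKeyOutputFormat[T] extends AvroKeyOutputFormat[T] {
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new DirectOutputCommitter
}
```

The trade-off is that a failed job may leave partial output behind, which is usually acceptable when jobs are idempotent and re-run into the same prefix.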
Enabling Kryo with Avro
The next hurdle to jump through is getting everything working with Kryo. This is very important and should not be neglected: enabling Kryo drastically improves performance (up to 10x according to the Spark tuning docs). In fact, when developing Spark applications I always set spark.kryo.registrationRequired = true so that I am absolutely sure there aren't classes flying under the radar.
Figuring out how to get Avro, Spark, and Kryo working together was a bit tricky. The gist is that it requires a custom Kryo Registrator which explicitly designates a Kryo serializer to use for Avro objects. Specifying this just requires a few lines of boilerplate.
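The boilerplate looks roughly like this, assuming a generated Avro class `MyRecord` and the generic `AvroSerializer` wrapper described below (both names are placeholders for your own):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Tell Kryo to serialize Avro records with Avro's own binary encoding
// rather than its default field-by-field serializer.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyRecord], new AvroSerializer[MyRecord]())
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
  // Fail loudly on any class that slips past registration.
  .set("spark.kryo.registrationRequired", "true")
```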
I whipped up a generic serialization wrapper class to make registering many Avro types easier. For example:
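A sketch of such a wrapper (my reconstruction, not the post's exact code): a Kryo `Serializer` that delegates to Avro's `SpecificDatumWriter`/`SpecificDatumReader`, prefixing each record with its encoded length so it can be read back out of the Kryo stream:

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecordBase}

// Generic Kryo serializer that round-trips any generated Avro record
// through Avro's compact binary encoding.
class AvroSerializer[T <: SpecificRecordBase : Manifest] extends Serializer[T] {
  private val clazz  = manifest[T].runtimeClass.asInstanceOf[Class[T]]
  private val writer = new SpecificDatumWriter[T](clazz)
  private val reader = new SpecificDatumReader[T](clazz)

  override def write(kryo: Kryo, output: Output, record: T): Unit = {
    val bytes   = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(bytes, null)
    writer.write(record, encoder)
    encoder.flush()
    // Length prefix lets read() know how many bytes belong to this record.
    output.writeInt(bytes.size(), true)
    output.writeBytes(bytes.toByteArray)
  }

  override def read(kryo: Kryo, input: Input, klass: Class[T]): T = {
    val length  = input.readInt(true)
    val decoder = DecoderFactory.get().binaryDecoder(input.readBytes(length), null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
```

With this in place, registering a new Avro type is a one-liner in the registrator.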
Reading Avro Files with Spark (plus one caveat)
One issue I ran into that left me scratching my head is that the Avro input format uses a reusable buffer. There are good reasons for this, but in Spark you end up with an iterator of objects that all point to the same location. This causes all sorts of strange reading behavior (seeing the same object multiple times, for instance).
The workaround is simple: just make a copy of the Avro object when reading. I also took the liberty of creating an AvroUtil class to make reading and writing Avro files in Spark as simple as possible:
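A sketch of the reading half of such a helper (the class and method names here are illustrative), with the crucial deep copy applied before the record escapes the input format's reused buffer:

```scala
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.specific.{SpecificData, SpecificRecordBase}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object AvroUtil {
  def read[T <: SpecificRecordBase](sc: SparkContext, path: String): RDD[T] =
    sc.newAPIHadoopFile(
      path,
      classOf[AvroKeyInputFormat[T]],
      classOf[AvroKey[T]],
      classOf[NullWritable]
    ).map { case (key, _) =>
      val datum = key.datum()
      // The input format reuses a single buffer for every record, so
      // deep-copy each datum before handing it to downstream operators.
      SpecificData.get().deepCopy(datum.getSchema, datum)
    }
}
```

Without the `deepCopy`, anything that collects or caches the iterator (e.g. `collect`, `groupByKey`) will appear to contain the last record repeated.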