Building a Faster ETL Pipeline with Flume, Kafka, and Hive


At WordPress.com we process a lot of events, including some that are batched and sent asynchronously, sometimes days later. But when querying this data we usually care more about when the events occurred than when they were sent to our servers. Knowing this, we store our event data in Hive partitioned by when the events occurred rather than when they are ingested.

Event Ingestion, Take One

The initial design of our ETL pipeline looks something like this:

  • Raw logs are aggregated and processed by a custom parser, which functions both as an aggregator for high-level stats and as an emitter of raw logs into the various Kafka topics.
  • A Flume agent then uses the Kafka source to pull from the appropriate topic.
  • The Flume Morphline interceptor then performs a series of transformations, including annotating which type of event the log line represents.
  • Events are buffered via a memory channel and sent to the Kite Dataset sink.
  • Kite then handles interacting with Hive and persisting the events in HDFS.

With this pipeline, data is available for querying almost immediately and is stored in (close to) its final state within Hive tables. As an added bonus, because we are using Kite Datasets and the accompanying Flume sink, Hive partitions are handled for us automatically. However, due to the asynchronous nature of our event collection, we end up having to write to multiple partitions at the same time, which results in many small files. To get around this we simply run a compaction job with Oozie after some date cutoff for events.
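In Flume terms, this first take looked roughly like the config below. Treat it as a simplified sketch rather than our exact agent config: the hosts, topic, and Kite dataset URI are placeholders.

[code lang="text"]
agent.sources = sr-kafka
agent.channels = ch-mem
agent.sinks = sk-kite

# Same Kafka source and Morphline interceptor as in the configs later in this post
agent.sources.sr-kafka.channels = ch-mem
agent.sources.sr-kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.sr-kafka.zookeeperConnect = HOST1:PORT,HOST2:PORT,HOST3:PORT/PATH
agent.sources.sr-kafka.topic = kafka-topic
agent.sources.sr-kafka.interceptors = in-morphline-etl
agent.sources.sr-kafka.interceptors.in-morphline-etl.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.sr-kafka.interceptors.in-morphline-etl.morphlineFile = /path/to/morphline.conf

# Events buffered here are lost if the agent dies -- the weakness described below
agent.channels.ch-mem.type = memory
agent.channels.ch-mem.capacity = 10000
agent.channels.ch-mem.transactionCapacity = 1000

# Kite Dataset sink talks to Hive and manages partitions for us
# (dataset URI is a placeholder)
agent.sinks.sk-kite.channel = ch-mem
agent.sinks.sk-kite.type = org.apache.flume.sink.kite.DatasetSink
agent.sinks.sk-kite.kite.dataset.uri = dataset:hive:database/table
[/code]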

This process worked very well until a couple of days ago, when some network issues and some bugs caused OOM errors that resulted in our Flume agent sporadically losing all events buffered in memory. Fortunately, we persist our Kafka topic of important logs for about a month, so we can just replay it, then merge and dedupe the data in Hive. Not only that, but we have a lot of servers in Cloudera Manager, so we can just add our backfilling Flume agent role to a bunch of them and we'll be done in no time. #winning

Alas, the dream was not to be. Because we must write to many partitions at once, sending events to the Kite Dataset sink from so many Flume agents caused our Hive Metastore to become unstable, not only limiting the rate of our ingestion but also causing numerous query failures for our growth explorers. Sure, we could have just slowed ingestion down, but being snake people that just won’t do. After all, we have the technology; we can make our ETL…

Better… Stronger… Faster

With what we learned from our first ETL pipeline, we decided to rewrite it, this time with an eye on durability as well as ingestion performance. We kept our custom log parser / aggregator / Kafka emitter as it’s doing an admirable job. In addition, we decided to stick with Flume (and its Kafka source) for our ETL process because of its ease of use and customizability.

We did not like the fact that when a Flume agent crashed it would just drop the events in its memory channel on the floor, so to make our process more durable we opted to use Flume’s Kafka channel instead. This allows us to multiplex and publish records to Kafka after they have been transformed by the Morphline interceptor, so they are persisted even if the Flume agent dies. Doing this comes at the expense of possibly emitting duplicate events when anomalies happen, but we figure it’s far easier to dedupe with Hive queries than it is to recreate missing data.

To make writes more performant, we reconfigured Morphline to convert our events into Avro records with our predefined schemas and then serialize and compress those records, making them ready for writing. Once the raw Avro byte array has been generated, we multiplex it into the proper Kafka channel. Finally, we use the HDFS sink to pull these events from Kafka in batches and write the raw Avro byte arrays to HDFS, partitioned by when the record was written. We do this directly, without touching Hive or its Metastore.
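For context, the predefined schemas mentioned above are ordinary Avro schemas. A stripped-down, hypothetical type1.avsc might look something like this (the field names and namespace here are made up for illustration):

[code lang="text"]
{
  "type": "record",
  "name": "EventType1",
  "namespace": "com.a8c.events",
  "fields": [
    { "name": "event_id",   "type": "string" },
    { "name": "event_ts",   "type": "long" },
    { "name": "blog_id",    "type": "long" },
    { "name": "user_agent", "type": ["null", "string"], "default": null }
  ]
}
[/code]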

By partitioning on when the record was written, we ensure we only write to a single partition at a time. This results in fewer, larger files, which is not only more performant but also lets us know when a partition can be considered complete. With this knowledge we can now have Oozie jobs that merge, dedupe, and compact events from the intermediate table, partitioned by the time an event is recorded, into another table partitioned by when events occurred, for optimal query performance.
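To make that concrete, the merge/dedupe step that the Oozie job runs could look roughly like the HiveQL below. The final table name (events_type1), the dedupe key (event_id), the event-time column (event_ts, assumed to be in seconds), and the partition value are hypothetical stand-ins; the real workflow does something similar per completed intermediate partition.

[code lang="text"]
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Move a completed intermediate partition into the event-time-partitioned
-- table, dropping duplicates along the way.
INSERT INTO TABLE events_type1 PARTITION (event_ymdh)
SELECT
  event_id,
  event_ts,
  blog_id,
  user_agent,
  -- derive the event-time partition from the event timestamp
  CAST(FROM_UNIXTIME(event_ts, 'yyyyMMddHH') AS INT) AS event_ymdh
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_ts) AS rn
  FROM etl_type1
  WHERE record_ymdh = 2015071200   -- a partition past the late-arrival cutoff
) deduped
WHERE rn = 1;
[/code]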

Show Me The Code

So what does this look like? Here’s a simplified flume.conf example:

[code lang="text"]
#
# Setup some names
#
agent.sources = sr-kafka
agent.channels = ch-kafka-type1 ch-kafka-type2
agent.sinks = sk-hdfs-type1 sk-hdfs-type2

#
# Configure same Kafka source for all channels
#
agent.sources.sr-kafka.channels = ch-kafka-type1 ch-kafka-type2
agent.sources.sr-kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.sr-kafka.zookeeperConnect = HOST1:PORT,HOST2:PORT,HOST3:PORT/PATH
agent.sources.sr-kafka.groupId = flume_source_20150712
agent.sources.sr-kafka.topic = kafka-topic
# Grabs in batches of 500 or every second
agent.sources.sr-kafka.batchSize = 500
agent.sources.sr-kafka.batchDurationMillis = 1000
# Read from start of topic
agent.sources.sr-kafka.kafka.auto.offset.reset = smallest

#
# Configure interceptors
#
agent.sources.sr-kafka.interceptors = in-morphline-etl in-host-set

agent.sources.sr-kafka.interceptors.in-morphline-etl.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
agent.sources.sr-kafka.interceptors.in-morphline-etl.morphlineFile = /path/to/morphline.conf
agent.sources.sr-kafka.interceptors.in-morphline-etl.morphlineId = morphline_id

agent.sources.sr-kafka.interceptors.in-host-set.type = host
agent.sources.sr-kafka.interceptors.in-host-set.useIP = false
agent.sources.sr-kafka.interceptors.in-host-set.hostHeader = flume_host

#
# Multiplex our records into channels based on the value of `eventmarker` which comes from Morphline
#
agent.sources.sr-kafka.selector.type = multiplexing
agent.sources.sr-kafka.selector.header = eventmarker
agent.sources.sr-kafka.selector.default = ch-kafka-type1
agent.sources.sr-kafka.selector.mapping.type1 = ch-kafka-type1
agent.sources.sr-kafka.selector.mapping.type2 = ch-kafka-type2
agent.sources.sr-kafka.selector.mapping.type3 = ch-kafka-type1 ch-kafka-type2

#
# Configure the channels we multiplexed into
#
agent.channels.ch-kafka-type1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.ch-kafka-type1.brokerList = HOST1:PORT,HOST2:PORT,HOST3:PORT
agent.channels.ch-kafka-type1.zookeeperConnect = HOST1:PORT,HOST2:PORT,HOST3:PORT/PATH
agent.channels.ch-kafka-type1.groupId = flume_channel_20150712
agent.channels.ch-kafka-type1.topic = kafka-topic-flume-type1

agent.channels.ch-kafka-type2.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.ch-kafka-type2.brokerList = HOST1:PORT,HOST2:PORT,HOST3:PORT
agent.channels.ch-kafka-type2.zookeeperConnect = HOST1:PORT,HOST2:PORT,HOST3:PORT/PATH
agent.channels.ch-kafka-type2.groupId = flume_channel_20150712
agent.channels.ch-kafka-type2.topic = kafka-topic-flume-type2

#
# Configure sinks; We pull from Kafka in batches and write large files into HDFS.
#
agent.sinks.sk-hdfs-type1.channel = ch-kafka-type1
agent.sinks.sk-hdfs-type1.type = hdfs
agent.sinks.sk-hdfs-type1.hdfs.path = hdfs://path/to/database/etl_type1/record_ymdh=%Y%m%d%H
# Prefix files with the Flume agent's hostname so we can run multiple agents without collision
agent.sinks.sk-hdfs-type1.hdfs.filePrefix = %{flume_host}
# Hive needs files to end in .avro
agent.sinks.sk-hdfs-type1.hdfs.fileSuffix = .avro
# Roll files in HDFS every 5 min or at 255MB; don't roll based on number of records
# We roll at 255MB because our block size is 128MB, we want 2 full blocks without going over
agent.sinks.sk-hdfs-type1.hdfs.rollInterval = 300
agent.sinks.sk-hdfs-type1.hdfs.rollSize = 267386880
agent.sinks.sk-hdfs-type1.hdfs.rollCount = 0
# Write to HDFS file in batches of 500 records
agent.sinks.sk-hdfs-type1.hdfs.batchSize = 500
# We already serialized and encoded the record into Avro in Morphline so just write the byte array
agent.sinks.sk-hdfs-type1.hdfs.fileType = DataStream
# Give us a higher timeout because we are writing in batch
agent.sinks.sk-hdfs-type1.hdfs.callTimeout = 60000
# Use current time in UTC for the value of `record_ymdh=%Y%m%d%H` above
agent.sinks.sk-hdfs-type1.hdfs.timeZone = UTC
agent.sinks.sk-hdfs-type1.hdfs.useLocalTimeStamp = true
# Our record is serialized via Avro
agent.sinks.sk-hdfs-type1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder

agent.sinks.sk-hdfs-type2.channel = ch-kafka-type2
agent.sinks.sk-hdfs-type2.type = hdfs
agent.sinks.sk-hdfs-type2.hdfs.path = hdfs://path/to/database/etl_type2/record_ymdh=%Y%m%d%H
agent.sinks.sk-hdfs-type2.hdfs.filePrefix = %{flume_host}
agent.sinks.sk-hdfs-type2.hdfs.fileSuffix = .avro
agent.sinks.sk-hdfs-type2.hdfs.rollInterval = 300
agent.sinks.sk-hdfs-type2.hdfs.rollSize = 267386880
agent.sinks.sk-hdfs-type2.hdfs.rollCount = 0
agent.sinks.sk-hdfs-type2.hdfs.batchSize = 500
agent.sinks.sk-hdfs-type2.hdfs.fileType = DataStream
agent.sinks.sk-hdfs-type2.hdfs.callTimeout = 60000
agent.sinks.sk-hdfs-type2.hdfs.timeZone = UTC
agent.sinks.sk-hdfs-type2.hdfs.useLocalTimeStamp = true
agent.sinks.sk-hdfs-type2.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
[/code]

The most important part of the above is that we configure the HDFS sink to use the Avro serializer and simply write the raw bytes, since we’ve already serialized and compressed the Avro record with Morphline. Speaking of which, here’s our example morphline.conf:

[code lang="text"]
morphlines : [
  {
    id : morphline_id

    # Import the Kite SDK and any custom libs you may have and need
    importCommands : [
      "org.kitesdk.**",
      "com.a8c.**"
    ]

    commands : [
      # Each command consumes the output record of the previous command
      # and pipes another record downstream.

      {
        # Parse input attachment and emit a record for each input line
        readLine {
          charset : UTF-8
        }
      }

      {
        # More commands for your ETL process
      }

      {
        # Say we set a field named `eventmarker` somewhere above to indicate
        # which type of record this is, and we have different schemas per type
        if {
          conditions : [
            { equals { eventmarker : "type1" } }
          ]
          then : [
            {
              # Set the schema for the Flume HDFS sink
              setValues {
                flume.avro.schema.url : "file:/path/to/schema/type1.avsc"
              }
            }
            {
              # Convert this to an Avro record according to the schema
              toAvro {
                schemaFile : /path/to/schema/type1.avsc
              }
            }
          ]
          else : [
            {
              setValues {
                flume.avro.schema.url : "file:/path/to/schema/type2.avsc"
              }
            }
            {
              toAvro {
                schemaFile : /path/to/schema/type2.avsc
              }
            }
          ]
        }
      }

      {
        # Serialize the Avro record into a byte array, compressed with snappy
        writeAvroToByteArray : {
          format : containerlessBinary
          codec : snappy
        }
      }
    ]
  }
]
[/code]

With these configs Flume will write compressed Avro files directly to HDFS, but we still need to let Hive know where to look, so we create the table in Hive:

[code lang="text"]
-- Table name
CREATE TABLE IF NOT EXISTS etl_type1

-- We need to specify how we are partitioning this table with the Flume HDFS sink
PARTITIONED BY ( record_ymdh INT )

-- Files were written in Avro!
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

-- We are writing to this dir in HDFS from Flume
LOCATION 'hdfs://path/to/database/etl_type1'

-- We also store the Avro schema in a hidden dir on HDFS for convenience
TBLPROPERTIES (
  'avro.schema.url'='hdfs://path/to/database/etl_type1/.schema/type1.avsc'
);
[/code]

For convenience I also stored the Avro schema for the table in a hidden .schema directory on HDFS, but the schema can really live anywhere readable by Hive.
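Getting the schema there is just a matter of copying it into HDFS, for example (paths are the same placeholders as above):

[code lang="text"]
hdfs dfs -mkdir -p hdfs://path/to/database/etl_type1/.schema
hdfs dfs -put type1.avsc hdfs://path/to/database/etl_type1/.schema/
[/code]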

Of course, as we ingest data the Flume HDFS sink will start creating new directories (a.k.a. partitions) in HDFS, but Hive will know nothing about them. So you will need to let Hive repair its Metastore by scanning HDFS before you can query the new data:

[code lang="text"]
MSCK REPAIR TABLE etl_type1;

SELECT *
FROM etl_type1
WHERE …;
[/code]

What’s Next?

We have not really pushed this new pipeline to see where its limits are; however, as I write this we are on track to ingest a month of data in less than 12 hours. In addition, scaling this pipeline by simply spinning up more Flume agents has so far been linear. The one downside is that for the most up-to-date information we now need to look at two separate Hive tables with different partitioning strategies, which makes queries a bit more complicated.
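For example, getting "everything we know so far" means something like the query below, where events_type1 is the hypothetical event-time-partitioned table from the merge sketch above, the partition cutoffs are arbitrary placeholders, and the intermediate rows may still contain duplicates:

[code lang="text"]
SELECT event_id, event_ts, blog_id
FROM (
  -- Already merged into the event-time-partitioned table
  SELECT event_id, event_ts, blog_id
  FROM events_type1
  WHERE event_ymdh >= 2015071200

  UNION ALL

  -- Still only in the intermediate, ingest-time-partitioned table
  SELECT event_id, event_ts, blog_id
  FROM etl_type1
  WHERE record_ymdh > 2015071223
) all_events;
[/code]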

We have in effect made what I like to call the “Iota Architecture” — a system that’s 1/3 of the way to a true lambda architecture. We currently have a system that emits a stream of events that can be read in batch or by a stream processor but we only have a batch process in place to allow for performant queries on “archival” data. Perhaps someday we’ll get the other 2/3 in place for our growth explorers to easily get a unified view.

