Category: DevOps

Upgrade Railway Tracks Under a Moving Train

Outbrain uses Cassandra extensively. We have around 30 production clusters of different sizes, ranging from small ones to clusters with 100 nodes across 3 datacenters.

Background

Cassandra has proven to be a very reliable choice as a datastore which employs an eventual consistency model. We used the Datastax Enterprise 4.8.9 distribution (which is equivalent to Cassandra 2.1), and as Cassandra 3 evolved it became clear that at some point we had to plan an upgrade. We ended up with two options: either upgrade to a newer version of the commercial distribution provided by Datastax, or migrate to the Apache Cassandra distribution. Since Outbrain uses Cassandra extensively, it was an important decision to make.
At first, we identified that we do not use any of the Datastax enterprise features and that we were quite happy with the basics that are still offered by Apache Cassandra. So, at this point, we decided to give Apache Cassandra 3.11.1 a try.
But when planning an upgrade, aside from the necessity to stay on a supported version, there are always some goals, like leveraging a new feature or seeking a performance improvement. Our main goal was to get more from the hardware that we have and potentially reduce the number of nodes per cluster.

In addition, we wanted to have an in-place upgrade, which means that instead of building new clusters we would perform the migration in the production clusters themselves while they are still running. It is like upgrading the railway tracks under a moving train, and we are talking about a very big train moving at high speed.

The POC

But before defining the in-place upgrade we needed to validate that the Apache Cassandra distribution fits our use cases, and for that we performed a POC. After selecting one of our clusters to be used for the POC, we built a new Apache Cassandra 3.11.1 cluster. Given the new storage format and the resulting read path optimisations in Cassandra 3, we started with a small cluster of 3 nodes in order to see whether we could benefit from these improvements to reduce the cluster size – that is, whether we could achieve acceptable read and write latencies with fewer nodes.
The plan was to copy data from the old cluster using sstableloader, with the application employing a dual-writes approach before we start the copying. This way we would have two clusters in sync, and we could direct our application to do some or all reads from the new cluster.
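
The dual-writes idea can be sketched roughly as follows. This is a minimal illustration rather than our actual client code: the class name DualWriteStore, the new_read_ratio parameter and the use of plain dictionaries in place of Cassandra sessions are all assumptions made for the example.

```python
import random


class DualWriteStore:
    """Writes go to both clusters; a configurable share of reads goes to the new one."""

    def __init__(self, old_store, new_store, new_read_ratio=0.0, rng=None):
        self.old_store = old_store
        self.new_store = new_store
        # 0.0 = all reads from the old cluster, 1.0 = all reads from the new one
        self.new_read_ratio = new_read_ratio
        self.rng = rng or random.Random()

    def write(self, key, value):
        # Every write lands in both clusters, so data written while the bulk
        # copy is still running is present on the new cluster as well.
        self.old_store[key] = value
        self.new_store[key] = value

    def read(self, key):
        # Route a fraction of the reads to the new cluster to compare behaviour.
        use_new = self.rng.random() < self.new_read_ratio
        store = self.new_store if use_new else self.old_store
        return store.get(key)


old_cluster, new_cluster = {}, {}  # stand-ins for real Cassandra sessions
store = DualWriteStore(old_cluster, new_cluster, new_read_ratio=1.0)
store.write("user:1", "profile-v2")
```

Raising new_read_ratio gradually is one way to shift read traffic to the new cluster while keeping the old one as a fallback.
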
And then the problems started… First, it became clear that with our multi-datacenter configuration we did not want sstableloader to stream data to foreign datacenters; we wanted to be able to control the target datacenter for the streaming. We reported a bug, and instead of waiting for a new release of Apache Cassandra we built our own custom build.
After passing this barrier we reached another one, related to LeveledCompactionStrategy (LCS) and how Cassandra performs compactions. With a cluster of 3 nodes and 600GB of data in a table with LCS, it took a few weeks to put the data into the right levels. Even with 6 nodes it took days to complete compactions following the sstableloader data load. That was a real pity, because read and write latencies were quite good even with fewer nodes. Luckily, Cassandra has a feature called multiple data directories. At first glance it looks like it is only about allowing more data, but this feature actually helps enormously with LCS. Once we tried it and experimented with the number of data directories, we found that we could compact 600GB of data on L0 in less than a day with three data directories.

Encouraged by these findings, we conducted various tests on our Cassandra 3.11 cluster and realised that not only can we benefit from storage space savings and a faster read path, but we can also potentially put more data per node for clusters where we use LCS.

Storage before and after the migration

The plan

The conclusions were very good and indicated that we are ok with the Apache Cassandra distribution and can start preparing our migration plan using in-place upgrades. At a high level, the migration steps should be:

  1. Upgrade to the latest DSE 4.x version (4.8.14)
  2. Upgrade to DSE 5.0.14 (Cassandra 3)
  3. Upgrade sstables
  4. Upgrade to DSE 5.1.4 (Cassandra 3.11)
  5. Replace DSE with Apache Cassandra 3.11.1

After defining the plan, we were ready to start migrating our clusters. We mapped our clusters and categorised them according to whether they are used for online serving activities or for offline activities.
Then we were good to go and started to perform the upgrade cluster by cluster.

Upgrade to the latest DSE 4.x version (4.8.14)

The first upgrade, to DSE 4.8.14 (we had 4.8.9), is a minor upgrade. This step is needed because DSE 4.8.14 includes all the important fixes required to co-exist with Cassandra 3.0 during the upgrade, and our experience confirms that DSE 4.8.14 and DSE 5.0.10 (Cassandra 3.0.14) indeed co-existed very well. The upgrade should be done in a rolling fashion, one node at a time. The one-node-at-a-time rule applies within a datacenter only; datacenters can be processed in parallel (which is what we actually did).
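
The ordering rule (sequential inside a datacenter, parallel across datacenters) can be sketched as below. It is only an illustration: upgrade_node is a hypothetical placeholder for the real per-node procedure, and the datacenter and node names are made up.

```python
from concurrent.futures import ThreadPoolExecutor


def upgrade_node(node, log):
    # Hypothetical placeholder: drain the node, upgrade the package, restart,
    # and wait until the node is back up before returning.
    log.append(node)


def upgrade_datacenter(nodes, log):
    # Within a datacenter, strictly one node at a time.
    for node in nodes:
        upgrade_node(node, log)
    return log


def rolling_upgrade(topology):
    # Datacenters are handled independently, so they can run in parallel.
    with ThreadPoolExecutor(max_workers=len(topology)) as pool:
        futures = {dc: pool.submit(upgrade_datacenter, nodes, [])
                   for dc, nodes in topology.items()}
        return {dc: future.result() for dc, future in futures.items()}


topology = {"dc1": ["dc1-n1", "dc1-n2"], "dc2": ["dc2-n1", "dc2-n2"]}
order = rolling_upgrade(topology)
```
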

Upgrade to DSE 5.0.14 (Cassandra 3)

Even if you aim to use DSE 5.1, you cannot jump to it directly from 4.x, so the 4.x – 5.0.x – 5.1.x chain is mandatory. The steps and prerequisites for the upgrade to DSE 5.0 are documented quite well by Datastax. What is probably not documented as well is which driver version to use and how to configure it so that the application works smoothly with both DSE 4.x and DSE 5.x nodes during the upgrade. We instructed our developers to use cassandra-driver-core version 3.3.0 and to enforce protocol version 3, so that the driver can successfully talk to both DSE 4.x and DSE 5.x nodes.

Before starting the upgrade, make sure that all your nodes are healthy. Why? Because one of the most important limitations of clusters running both DSE 4.x and 5.x is that you cannot stream data, which means that bootstrapping a node during node replacement is simply not possible.

For clusters where we had SLAs on latencies we wanted more confidence, so in one datacenter we first upgraded a single node to DSE 5.0.x. This is a 100% safe operation: since it is the first node with DSE 5.0.x, it can be replaced using the regular node replacement procedure with DSE 4.x. One very important point here is that DSE 5.0 does not force any schema migration until all nodes in the cluster are running DSE 5.0.x. This means that system keyspaces remain untouched, and this is what guarantees that a DSE 5.0.x node can be replaced with DSE 4.x.

Since we have two kinds of clusters (serving and offline) with different SLAs, we took two different approaches. For the serving clusters we upgraded the datacenters sequentially, leaving some datacenter running DSE 4.x, so that if something went really wrong we would still have a healthy datacenter running DSE 4.x. For the offline clusters we upgraded all the datacenters in parallel. Both approaches worked quite well.
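
Why protocol version 3 is the one to pin can be shown with a small calculation. The sets below are an assumption based on our understanding that Cassandra 2.1 speaks native protocol versions 1 to 3 and Cassandra 3.0 speaks versions 3 and 4; please verify them against the versions you actually run.

```python
# Native protocol versions each side is assumed to speak during the mixed phase.
SUPPORTED_PROTOCOLS = {
    "DSE 4.8 (Cassandra 2.1)": {1, 2, 3},
    "DSE 5.0 (Cassandra 3.0)": {3, 4},
}


def highest_common_protocol(supported):
    # The driver must use a version that every node in the cluster understands.
    common = set.intersection(*supported.values())
    if not common:
        raise ValueError("no protocol version is shared by all nodes")
    return max(common)


pinned_version = highest_common_protocol(SUPPORTED_PROTOCOLS)
```

Without pinning, a driver that negotiates the highest version with the first node it contacts may settle on a version that the older nodes cannot speak.
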

Upgrade sstables

Whether clusters were upgraded in parallel or sequentially, we always wanted to start the sstables upgrade as quickly as possible after DSE 5.0 was installed. The sstables upgrade is the most important phase of the upgrade because it is the most time consuming. There are not many knobs to tune:

  • control the number of compaction threads with the -j option of the nodetool upgradesstables command.
  • control the compaction throughput with nodetool setcompactionthroughput.

Whether -j speeds up the sstables upgrade depends a lot on how many column families you have, how big they are and what kind of compaction strategy is configured. As for the compaction throughput, we determined that for our storage system it was ok to set it to 200 MB/sec while still allowing regular read/write operations to be carried out at reasonable higher-percentile latencies, but this has to be determined for each storage system individually.
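
As a rough sketch, the two knobs translate into the following nodetool invocations. The helper names are ours, and the value jobs=2 is purely illustrative; only the 200 MB/sec throughput comes from our experience above.

```python
def upgradesstables_cmd(jobs, keyspace=None):
    # -j sets how many sstables are upgraded concurrently.
    cmd = ["nodetool", "upgradesstables", "-j", str(jobs)]
    if keyspace:
        cmd.append(keyspace)
    return cmd


def compaction_throughput_cmd(mb_per_sec):
    # The value is given in MB/s.
    return ["nodetool", "setcompactionthroughput", str(mb_per_sec)]


commands = [compaction_throughput_cmd(200), upgradesstables_cmd(jobs=2)]
# Each list can be passed to subprocess.run(cmd, check=True) on the node.
```
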

Upgrade to DSE 5.1.4 (Cassandra 3.11)

In theory, the upgrade from DSE 5.0.x to DSE 5.1.x is a minor one: you just need to upgrade the nodes in a rolling fashion, exactly as minor upgrades were done within DSE 4.x. The storage format stays the same and no sstables upgrade is required. But when upgrading our first reasonably large cluster (in terms of number of nodes) we noticed something really bad: nodes running DSE 5.1.x suffered from never-ending compactions in the system keyspace, which killed their performance, while DSE 5.0.x nodes were running well. For us it was a tough moment, because that was a serving cluster with SLAs and we had not noticed anything like that in our previous cluster upgrades! We suspected that something was wrong with the schema migration mechanism, and the command nodetool describecluster confirmed that there were two different schema versions, one from the 5.0.x nodes and one from the 5.1.x nodes. So we decided to speed up the DSE 5.1.x upgrade, under the assumption that once all nodes were on 5.1.x there would be no issues with different schema versions and the cluster would settle. This is indeed what happened.
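
A schema disagreement of this kind can be detected automatically by parsing the describecluster output. The sketch below assumes the output lists each schema version as a UUID followed by the hosts in square brackets; the sample text and IP addresses are made up for the example.

```python
import re

# Matches lines such as "<schema-uuid>: [10.0.0.1, 10.0.0.2]".
SCHEMA_LINE = re.compile(r"^\s*([0-9a-f-]{36}):\s*\[(.*)\]\s*$")


def schema_versions(describecluster_output):
    versions = {}
    for line in describecluster_output.splitlines():
        match = SCHEMA_LINE.match(line)
        if match:
            hosts = [h.strip() for h in match.group(2).split(",") if h.strip()]
            versions[match.group(1)] = hosts
    return versions


sample = """Cluster Information:
    Name: demo
    Schema versions:
        65e78f0e-e81e-30d8-a631-a65dff93bf82: [10.0.0.1, 10.0.0.2]
        59adb24e-f3cd-3e02-97f0-5b395827453f: [10.0.0.3]
"""
versions = schema_versions(sample)
in_agreement = len(versions) == 1  # more than one version means disagreement
```
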

Replace DSE with Apache Cassandra 3.11.1

So how did the final migration step, replacing DSE 5.1.4 with Apache Cassandra, go? In the end it worked well across all our clusters. Basically, DSE packages have to be uninstalled, and Apache Cassandra has to be installed and pointed to the existing data and commit log directories.

However, two extra steps were required:

  • DSE creates DSE-specific keyspaces (e.g. dse_system). This keyspace is created with EverywhereStrategy, which is not supported by Apache Cassandra. So before replacing DSE with Apache Cassandra on a given node, the keyspace replication strategy has to be changed.
  • A few system column families receive extra columns in DSE deployments (system.peers and system.local). If you attempt to start Apache Cassandra after DSE is removed, it will fail on the schema check against these column families. There are two ways to work around this. The first option is to delete all system keyspaces, set auto_bootstrap to false and also set the allow_unsafe_join JVM option to true; assuming the node’s IP address stays the same, Cassandra will re-create the system keyspaces and jump into the ring. We used an alternative approach: we had a set of scripts which first exported these tables with extra columns to JSON, then imported the JSON data on a helper cluster which had the same schema as the system tables in Apache Cassandra, and finally copied the resulting sstables to the node being upgraded.
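
The column-stripping part of the second approach can be illustrated with a few lines of Python. This is not our actual script: the function name is ours, and the extra column names ("dse_version", "workload") as well as the abbreviated target column set are hypothetical examples.

```python
import json


def strip_extra_columns(rows, allowed_columns):
    # Keep only the columns that the target (Apache Cassandra) schema defines.
    return [{k: v for k, v in row.items() if k in allowed_columns} for row in rows]


# Hypothetical export of one row; "dse_version" and "workload" stand in for
# distribution-specific columns and are illustrative names only.
exported = json.loads(
    '[{"peer": "10.0.0.2", "data_center": "dc1", "rack": "r1",'
    ' "dse_version": "5.1.4", "workload": "Cassandra"}]'
)
apache_columns = {"peer", "data_center", "rack"}  # abbreviated column set
cleaned = strip_extra_columns(exported, apache_columns)
```
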

Minor version, big impact

Cassandra 3.11 clearly brings vast improvements in storage format efficiency and read latency. However, even within a given Cassandra major version, a minor version can introduce a bug that heavily affects performance. Our latency improvements were achieved on Cassandra 3.11.1, but this very version introduced an unpleasant bug (CASSANDRA-14010, fix sstable ordering by max timestamp in SinglePartitionReadCommand).

This bug was introduced during refactoring, and as a result almost all sstables were queried for every read on tables with STCS. Typically Cassandra sorts sstables by timestamp in descending order and starts answering a query by looking at the most recent sstables until it is able to fulfil the query (with some restrictions, such as for counters or unfrozen collections). The bug, however, caused sstables to be sorted in ascending order. It was fixed in 3.11.2, and here is the impact: the number of sstables per read at the 99th percentile dropped after the upgrade. Reading fewer sstables naturally leads to lower local read latencies.
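
To see why the sort order matters, here is a deliberately simplified model of the read path. It is not Cassandra's code: each sstable is reduced to its maximum timestamp and the timestamp of the requested row (if present), and the timestamps are invented.

```python
from collections import namedtuple

# row_timestamp is None when the sstable holds no data for the partition.
SSTable = namedtuple("SSTable", ["max_timestamp", "row_timestamp"])


def sstables_touched(sstables, descending=True):
    ordered = sorted(sstables, key=lambda s: s.max_timestamp, reverse=descending)
    newest_seen = None
    touched = 0
    for sstable in ordered:
        # With newest-first ordering we can stop as soon as this sstable
        # (and therefore every later one) cannot hold anything newer.
        if newest_seen is not None and sstable.max_timestamp < newest_seen:
            break
        touched += 1
        if sstable.row_timestamp is not None:
            newest_seen = max(newest_seen or 0, sstable.row_timestamp)
    return touched


tables = [SSTable(100, 95), SSTable(80, 70), SSTable(60, None), SSTable(40, 30)]
newest_first = sstables_touched(tables, descending=True)
oldest_first = sstables_touched(tables, descending=False)
```

In this model the newest-first order answers the read from a single sstable, while the oldest-first order never gets a chance to stop early and touches all four.
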

 

Drop in sstables per read

Local read latency drop on Cassandra 3.11.2

Always mind page cache

On one of our newly built Cassandra 3.11.2 clusters we noticed unexpectedly high local read latencies. This came as a surprise because this cluster was specifically dedicated to a table with LCS. We spent quite some time with blktrace/blkparse/btt to confirm that our storage by itself was performing well. We later identified that the high latency affected only the 98th and 99th percentiles, while the btt output made it clear that there were write IO bursts which resulted in very large blocks thanks to the scheduler merge policy. The next step was to exclude compactions, but even with compactions disabled we still had these write bursts. Looking further, it became evident that the page cache defaults were different from what we typically had, and the culprit was vm.dirty_ratio, which was set to 20%. With 256GB of RAM, flushing 20% of it would certainly result in an IO burst that affects reads. So we tuned the page cache (we opted for vm.dirty_bytes and vm.dirty_background_bytes instead of vm.dirty_ratio); the impact on local read latency is shown below.
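
The size of such a burst is easy to estimate. The calculation below treats the whole 256GB as eligible for the ratio, which is an assumption that makes the result an upper bound; the function name is ours.

```python
GIB = 1024 ** 3


def dirty_limit_bytes(memory_bytes, dirty_ratio_percent):
    # Upper bound on dirty page cache before writers are forced to flush.
    return memory_bytes * dirty_ratio_percent // 100


limit = dirty_limit_bytes(256 * GIB, 20)
limit_gib = limit / GIB  # roughly 51.2
```

Tens of gigabytes of dirty pages flushed at once is exactly the kind of write burst that shows up in the higher read percentiles, which is why an absolute byte limit is easier to reason about on large-memory machines.
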

Page cache flush settings impact on local read latency

The results

So, in the end – what were our gains from Apache Cassandra 3.11?
Well, aside from any new features, we got lots of performance improvements. The major one is storage savings, which immediately translate into lower read latencies because more data fits into the OS cache, and this improved the clusters’ performance a lot.

 

In addition, the newer storage format results in lower JVM pressure during reads (the object creation rate decreased 1.5 – 2 times). The following image describes the decrease in read latency in our datacenters (NY, CHI & SA) during load tests (LT) that we performed before and after the migration.

Read latency before and after the migration

Another important aspect is the new metrics that are exposed. A few interesting ones:

  • Ability to identify consistency level requested for both reads and writes. This can be useful to track if occasionally the consistency level requested is not what would be expected for a given column family. For example, if QUORUM is requested instead of LOCAL_QUORUM due to a bug in app configuration.
  • Metrics on the network messaging.

Read metrics before the migration

Read metrics after the migration

We did something that lots of people thought was impossible: performing such an in-place migration in running production clusters. Although there were some bumps along the way, it is totally possible to conduct an in-place upgrade of DSE 4.8.x to Apache Cassandra 3.11 while maintaining acceptable latencies and avoiding any cluster downtime.

Making Spark Native for Data Processing Framework

Working in an infrastructure team is different from working in a development team. The team develops tools that are used by development teams inside Outbrain, so it is very important to provide a convenient way of working with new technologies. Having a Spark cluster enables running distributed data processing at scale. Until now, developers worked with Spark by writing ad-hoc crontab jobs in production. This created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to provide an infrastructural way of working with Spark.

In this blog post, I explain how we built the solution, making Spark a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine – an internal tool, which is developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as Algorithm research groups, BI and Data Science.

Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failures. Logs for each running flow are shown in a convenient form in WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.
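
The retry behaviour described above can be sketched as follows. This is an illustration only, not WFE code: run_with_retries and the alert callback are names invented for the example.

```python
def run_with_retries(flow, retries, alert):
    # One initial attempt plus up to `retries` reruns.
    last_error = None
    for _ in range(1 + retries):
        try:
            return flow()
        except Exception as error:
            last_error = error
    # All attempts failed: notify the owner instead of failing silently.
    alert(f"flow failed after {1 + retries} attempts: {last_error}")
    return None


attempts = []
alerts = []


def flaky_flow():
    # Fails twice, then succeeds, to exercise the retry path.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "done"


result = run_with_retries(flaky_flow, retries=2, alert=alerts.append)
```
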

From the operational perspective, several instances of WFE are deployed on machines which are owned by the infrastructure team; users just define ETL flows in a separate repository.

Diving Into the Solution

In order to respond to requirements for running data analytics on Spark, our team had to provide an infrastructural way of running Spark jobs orchestrated by WFE. To achieve this, we’ve developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design for SparkStep, we had several principles in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify, and at the same time allow the system to limit, the number of YARN resources allocated for every Spark job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping the control while having fun

The Spark job definition specifies how a job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a Spark job, it builds a spark-submit command using all these properties.
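
Building the command from a job definition might look roughly like this. The function name, the jar location, the class name and the configuration values are hypothetical; only the property names jarURL and className and the use of YARN come from the description above.

```python
def build_spark_submit(jar_url, class_name, conf, app_args=()):
    # Submit to YARN in cluster mode so the driver does not run on the WFE machine.
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
           "--class", class_name]
    for key, value in sorted(conf.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(jar_url)
    cmd += list(app_args)
    return cmd


job = {
    "jarURL": "hdfs:///jobs/example.jar",   # hypothetical location
    "className": "com.example.ExampleJob",  # hypothetical class
    "configuration": {"spark.executor.memory": "4g",
                      "spark.executor.instances": "10"},
}
command = build_spark_submit(job["jarURL"], job["className"], job["configuration"])
```

Keeping the command as a list of arguments avoids shell quoting problems when user-supplied values contain spaces.
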

The spark-submit command submits the Spark job to YARN, which allocates needed resources and launches the application. After submitting the job, WFE constantly checks its status using YARN REST API. When the job is completed in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long, and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API. In this case, the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use and that resources are saved when a job is canceled for any reason.
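
The monitoring loop can be sketched as below. It is a simplified model rather than WFE's implementation: get_state and kill are injected callables that would wrap the YARN REST API, and the status strings assume the final status values YARN reports (UNDEFINED while running, then SUCCEEDED, FAILED or KILLED).

```python
import time


def monitor(get_state, kill, max_runtime_sec, poll_sec=30,
            clock=time.monotonic, sleep=time.sleep):
    # get_state() returns the application's final status as reported by YARN,
    # for example via GET /ws/v1/cluster/apps/<app-id>.
    started = clock()
    while True:
        state = get_state()
        if state == "SUCCEEDED":
            return "COMPLETED"
        if state in ("FAILED", "KILLED"):
            return "FAILED"
        if clock() - started > max_runtime_sec:
            # e.g. PUT {"state": "KILLED"} to /ws/v1/cluster/apps/<app-id>/state
            kill()
            return "FAILED"
        sleep(poll_sec)


states = iter(["UNDEFINED", "UNDEFINED", "SUCCEEDED"])
status = monitor(lambda: next(states), kill=lambda: None,
                 max_runtime_sec=3600, sleep=lambda _: None)
```

Injecting the clock and the sleep function keeps the loop testable without waiting for real time to pass.
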

The spark-submit command runs in a Docker container, which exits once the job is submitted to the cluster. This enables us to create an isolated environment for every Spark job executed by WFE. In addition, by using Docker containers we can easily run different Spark versions simultaneously, by creating different Docker images.

By using this architecture we achieve all our goals. Spark jobs are submitted to YARN in a “cluster” mode, so they do not create a load on WFE machines. YARN handles all resources for each job. Every Spark job runs in an isolated environment owing to the Docker container.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3

Previously, in the second episode of the trilogy, “Hadoop Research Journey from bare metal to Google Cloud – Episode 2”, we covered the POC we had.

In this episode we will focus on the migration itself. Building a POC environment is all nice and easy; however, migrating 2 PB of data (the raw part out of 6 PB, which includes the replication) turned out to be a new challenge. But before we jump into technical issues, let’s start with the methodology.

The big migration

We learned from our past experience that for such a project to be successful, as in many other cases, it is all about the people – you need to be mindful of the users and make sure you have their buy-in.

On top of that, we wanted to complete the migration within 4 months, as we had a renewal of our datacenter space coming up, and we wanted to gain from the space reduction as a result of the migration.

With those two considerations in mind, we decided to keep the same technologies, Hadoop and Hive, in the cloud environment, and only after the migration was done would we look into leveraging new technologies available on GCP.

Once the decision was made, we started to plan the migration of the research cluster to GCP, looking into different aspects such as:

  • Build the network topology (VPN, VPC etc.)
  • Copy the historical data
  • Create the data schema (Hive)
  • Enable the runtime data delivery
  • Integrate our internal systems (monitoring, alerts, provision etc.)
  • Migrate the workflows
  • Reap the bare metal cluster (with all its supporting systems)

All this was for the purpose of productizing the solution and making it production grade, based on our standards. We made a special effort to leverage the same management and configuration control tools we use in our internal datacenters (such as Chef, Prometheus etc.), so that we could treat this environment as just another datacenter.

Copying the data

Sounds like a straightforward activity – you need to copy your data from location A to location B.

Well, it turns out that when you need to copy 2 PB of data while the system is still active in production, there are some challenges involved.

The first restriction we had was that the copy of data must not impact the usage of the cluster, as the research work still needs to be performed.

Second, once the data is copied, we also need to validate it.

 

Starting with data copy

  • Option 1 – Copy the data using Google Transfer Appliance

Google can ship their transfer appliance (depending on the location of your datacenter), which you attach to the Hadoop cluster and use to copy the data. You then ship it back to Google, and the data is transferred from the appliance to GCS.

Unfortunately, from the capacity perspective we would have needed several iterations of this process in order to copy all the data, and on top of that the Cloudera community version we were using was so old that it was not supported.

  • Option 2 – Copy the data over the network

When taking that path, the main restriction is that the network is used for both the production environment (serving) and for the copy, and we could not allow the copy to create network congestion on the lines.

However, if we restrict the copy process, the time it would take to copy all the data will be too long and we will not be able to meet our timelines.

Setting up the network

As part of our network infrastructure, per datacenter we have 2 ISPs, each with 2 x 10G lines for backup and redundancy.

We decided to leverage those backup lines and build a tunnel on them, dedicated only to the Hadoop data copy. This enabled us to copy the data in a relatively short time and, at the same time, ensured that the copy would not impact our production traffic, as it was confined to specific lines.
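
A back-of-the-envelope estimate shows why dedicated bandwidth matters at this scale. The numbers below are assumptions for illustration only: decimal petabytes, a sustained transfer at full line rate, and either one or two 10G lines available to the copy.

```python
def copy_days(data_bytes, link_gbit_per_sec, utilisation=1.0):
    # Time to push the data through a link at a sustained rate.
    bits = data_bytes * 8
    seconds = bits / (link_gbit_per_sec * 1e9 * utilisation)
    return seconds / 86400


PB = 10 ** 15
one_line = copy_days(2 * PB, 10)   # about 18.5 days
two_lines = copy_days(2 * PB, 20)  # about 9.3 days
```

Any throttling of the copy stretches these figures proportionally, which is why sharing the production lines would not have fitted our timeline.
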

Once the network was ready we started to copy the data to the GCS.

As you may remember from previous episodes, our cluster was set up over 6 years ago, and as such acquired a lot of tech debt, including in the data kept in it. We decided to take advantage of the situation and use the migration to also do some data and workload cleanup.

We invested time in mapping what data we need and what data can be cleared. Although this did not significantly reduce the data size, we managed to delete 80% of the tables and 80% of the workload.

Data validation

As we migrated the data, we had to validate it, making sure there was no corrupted or missing data.

There were more challenges to take into consideration on the data validation side:

  • The migrated cluster is a live cluster, so new data keeps being added to it and old data deleted
  • In our internal Hadoop cluster all tables are stored as files, while on GCS they are stored as objects.

It was clear that we need to automate the process of data validation and build dashboards to help us monitor our progress.

We ended up implementing a process that creates two catalogs, one for the bare metal internal Hadoop cluster and one for the GCP environment, comparing those catalogs and alerting us to any differences.
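
The catalog comparison can be sketched like this. It is a minimal illustration, not our production process: each catalog is modelled as a mapping from table name to a set of file (or object) names, and the table and file names are invented.

```python
def compare_catalogs(source, target):
    # Each catalog maps a table name to the set of file (or object) names in it.
    report = {}
    for table in sorted(set(source) | set(target)):
        src = source.get(table, set())
        dst = target.get(table, set())
        missing, extra = src - dst, dst - src
        if missing or extra:
            report[table] = {"missing_in_target": sorted(missing),
                             "extra_in_target": sorted(extra)}
    return report


bare_metal = {"clicks": {"part-0", "part-1"}, "views": {"part-0"}}
gcp = {"clicks": {"part-0"}, "views": {"part-0"}}
differences = compare_catalogs(bare_metal, gcp)
```

An empty report means the two environments hold the same files per table; because the source cluster is live, a real comparison would also need to tolerate files that are still in flight.
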

This dashboard shows, per table, the difference in files between the bare metal cluster and the cloud

 

In parallel to the data migration, we worked on building the Hadoop ecosystem on GCP, including the tables schemas with their partitions in Hive, our runtime data delivery systems adding new data to the GCP environment in parallel to the internal bare metal Hadoop cluster, our monitoring systems, data retention systems etc.

The new environment on GCP was finally ready and we were ready to migrate the workloads. Initially, we duplicated jobs to run in parallel on both clusters, making sure we complete validation and will not impact production work.

After a month of validation, parallel work and required adjustments we were able to decommission the in-house Research Cluster.

What we achieved in this journey

  • Upgraded the technology
  • Improved the utilization and gained the elasticity we wanted
  • Reduced the total cost
  • Introduced new GCP tools and technologies

Epilogue

This amazing journey lasted almost 6 months of focused work. As planned, the first step was to use the same technologies that we had in the bare metal cluster. Now that the migration to GCP is finished, we can start planning how to take further advantage of the new opportunities that arise from leveraging GCP technologies and tools.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2

Previously, in the first episode of the trilogy, “Hadoop Research Journey from bare metal to Google Cloud – Episode 1”, we covered our challenges.

In this episode, I will focus on the POC that we did in order to decide whether we should rebuild the Research cluster in-house or migrate it to the cloud.

The POC

As we had many open questions around migration to the cloud, we decided to do a learning POC, focusing on 3 main questions:

  1. Understand the learning curve that will be required from the users
  2. Compatibility with our in-house Online Hadoop clusters
  3. Estimate cost for running the Research cluster in the Cloud

However, before jumping into the water of the POC, we had some preliminary work to be done.

Mapping workloads

As the Research cluster had been running for over 6 years already, there were many different use cases running on it. Some of them were well known and familiar to users, but some were old tech debt, and no one knew whether they were still needed or what their value was.

We started by mapping all the flows and use cases running on the cluster, mapped users, and assigned owners to the different workflows.

We also created a distinction between ad-hoc queries and batch processing.

Mapping technologies

We mapped all the technologies we need to support on the Research cluster in order to assure full compatibility with our Online clusters and in-house environment.

After collecting all the required information regarding the use cases and mapping the technologies, we selected representative workflows and users to take an active part in the POC, collecting their feedback regarding the learning curve and ease of use. This approach would also serve us well later on if we decided to move forward with the migration, since we would have in-house ambassadors.

Once we mapped all our needs, it was also easier to get high-level cost estimations from the different cloud vendors, to give us a general indication of whether it made sense for us to continue and invest time and resources in doing the POC.

 

We wanted to complete the POC within 1 month, so that on one hand it would run long enough to cover all types of jobs, but on the other hand it would not be prolonged.

For the POC environment we built a Hadoop cluster based on standard technologies.

We decided not to leverage special proprietary vendor technologies at this point, as we wanted to reduce the learning curve and were careful not to get into a vendor lock-in.

 

In addition, we decided to start the POC only with one vendor, and not to run it on multiple cloud vendors.

The reason behind it was that we were mindful of our internal resources and time constraints.

We did a theoretical evaluation of the technology roadmap and cost for several cloud vendors, and chose to go with GCP, looking to also leverage BigQuery in the future (once all our data is migrated).

The execution

Once we decided on the vendor, technologies and use cases we were good to go.

For the purpose of the POC we migrated 500TB of our data, built the Hadoop cluster based on Dataproc, and built the required endpoint machines.

Needless to say, already at this stage we had to create the network infrastructure to support secure work in the hybrid environment between GCP and our internal datacenters.

Now that everything was ready, we started the actual POC from the users’ perspective. For a period of one month the participating users performed their use cases twice: once on the in-house Research cluster (the production environment), and a second time on the Research cluster built on GCP (the POC environment). The users were required to record their experience, which was measured according to the following criteria:

  • Compatibility (did the test run seamlessly, any modifications to code and queries required, etc.)
  • Performance (execution time, amount of resources used)
  • Ease of use

During the month of the POC we worked closely with the users, gathered their overall experience and results.

In addition, we documented the compute power needed to execute those jobs, which enabled us to do better cost estimation for how much it would cost to run the full Research Cluster on the cloud.

The POC was successful

The users had a good experience, and our cost analysis proved that by leveraging the cloud elasticity, which in this scenario was very significant, the cloud option would be ROI positive compared with the investment we would need to make to build the environment internally (without getting into the exact numbers: over 40% cheaper, which is a nice incentive!).

With that we started our last phase – the actual migration, which is the focus of our last episode in “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3”. Stay tuned!

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 1

Outbrain is the world’s leading discovery platform, serving over 250 billion personal recommendations per month. In order to provide premium recommendations at such a scale, we leverage capabilities in analyzing a large amount of data. We use a variety of data stores and technologies such as MySQL, Cassandra, Elasticsearch, and Vertica. In this trilogy of posts (all things can be split into 3…), however, I would like to focus on our Hadoop ecosystem and our journey from pure bare metal to a hybrid cloud solution.

The bare metal period

In a nutshell, we keep two flavors of Hadoop clusters:

  • Online clusters, used for online serving activities. Those clusters are relatively small (2 PB of data per cluster) and are kept in our datacenters on bare metal clusters, as part of our serving infrastructure.
  • Research cluster, surprisingly, used mainly for research and offline activities. This cluster keeps a large amount of data (6 PB), and by nature the workload on this cluster is elastic. Most of the time it was underutilized, but there were peak times when there was a need to query huge amounts of data.

History lesson

Before we move forward in our tale, it may be worthwhile to spend a few words on the history.

We first started to use the Hadoop technology at Outbrain over 6 years ago, starting as a small technical experiment. As our business rapidly grew, so did the data, and the clusters were adjusted in size; however, tech debt had built up around them. We continued to grow the clusters, based on a scale-out methodology, and after some time found ourselves with clusters running an old Hadoop version, unable to support new technologies, and built from hundreds of servers, some of which were very old.

We decided we needed to stop being firefighters and get super proactive about the issue. We first took care of the Online clusters, and migrated them to a new in-house bare metal solution (you can read more about this in the Migrating Elephants post on the Outbrain Tech Blog site).

Now it was time to move forward and deal with our Research cluster.

Research cluster starting point

Our starting point for the Research cluster was a cluster built out of 500 servers, holding about 6 PB of data, running CDH4 community version.

As mentioned before, the workload on this cluster is elastic: at times it requires a lot of compute power, but most of the time it is fairly underutilized (see graph below).

This graph shows the CPU utilization over 2 weeks. As can be seen, the usage is not constant: most of the time the cluster is barely used, with some periodic peaks.

 

The cluster was unable to support new technologies (such as Spark and ORC), which were already in use with the Online clusters, reducing our ability to use it for real research.

On top of that, some of the servers in this cluster were becoming very old, and since we had grown the cluster on the fly, its storage:CPU:RAM ratio was suboptimal, causing us to waste expensive footprint in our datacenter.

On top of all of the above, it caused so much frustration to the team!

We mapped our options moving forward:

  1. Do an in-place upgrade of the Research cluster software
  2. Rebuild the research cluster from scratch on bare metal in our datacenters (similar to the project we did with the Online clusters)
  3. Leverage cloud technologies and migrate the research cluster to the Cloud.

The dilemma

Option #1 was dropped immediately since it answered only a fraction of our frustration at best. It did not address the old hardware issues, and it did not address our concerns regarding non-optimal storage:CPU:RAM ratios, which we understood would only get worse when we came to use RAM-intensive technologies such as Spark.

We had a dilemma between option #2 and option #3, both viable options with pros and cons.  

Building the Research cluster in house was a project we were very familiar with (we had just finished our Online clusters migration), and our users were very familiar with the technology, so there was no learning curve on this front. On the other hand, it required a big financial investment, and we would be unable to leverage the elasticity to the extent we wanted.

Migrating to the cloud answered our elasticity needs; however, it presented a non-predictable cost model (something very important to the finance guys), and had many unknowns, as it was new for us and for the users who would need to work with the environment. It was clear that learning and education would be needed, but it was not clear how steep this learning curve would be.

On top of that, we knew that we must have full compatibility between the Research cluster and the Online cluster, but it was hard for us to estimate the effort required to get there, and the number of processes that require data transition between the clusters.  

 

So, what do we do when we don’t know which option is better?

We study and experiment! And this is how we entered the 2nd period – the POC.

You are invited to read about the POC we did, and how we did it, in our next episode, “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2”.

Switches, Penguins and One Bad Cable

Back in May 2017, I was scheduled to speak at the DoTC conference in Melbourne. I was really excited and looking forward to it, but fate had different plans. And lots of them. From my son going through an emergency appendicitis operation, through flight delays, and up to an emergency landing back in Tel Aviv… I ended up missing the opportunity to speak at the conference. Amazingly, something similar happened this year! Maybe 3rd time’s a charm?

The post below is the talk I’d planned to give, converted to a blog format.


 

August 13, 2015. Outbrain’s ops on call is just getting out of his car when his phone rings. It’s a PagerDuty alert. Some kind of latency issue in the Chicago data center. He acks it, figuring he’ll unload the groceries first and then get round to it. But then, his phone rings again. And again.

Forget the groceries. Forget the barbecue. Production is on fire.

18 hours and many tired engineers later, we’re recovering from having lost our Chicago datacenter. In the takein that follows, we trace the root cause to a single network cable that’s mistakenly connected to the wrong switch.

Hi, my name is Alex, and I lead the Core Services group at Outbrain. Our group owns everything from the floor that hosts Outbrain’s servers, to the delivery pipelines that ship Outbrain’s code. If you’re here, you’ve likely heard of Outbrain. You probably know that we’re the world’s leading Discovery platform, and that you’ll find us installed on publisher sites like CNN, The Guardian, Time Inc and the Australian news.com, where we serve their readers with premium recommendations.

But it wasn’t always this way.

You see, back when we started, life was simple: all you had to do was throw a bunch of Linux servers in a rack, plug them into a switch, write some code… and sell it. And that we did!

But then, an amazing thing happened. The code that we wrote actually worked and customers started showing up. And they did the most spectacular and terrifying thing ever – they made us grow. One server rack turned into two and then three and four. And before we knew it, we had a whole bunch of racks, full of penguins plugged into switches. It wasn’t as simple as before, but it was manageable. Business was growing, and so were we.

Fast forward a few years.

We’re running quite a few racks across 2 datacenters. We’re not huge, but we’re not a tiny startup anymore. We have actual paying customers, and we have a service to keep up and running. Internally, we’re talking about things like scale, automation, and all that stuff. And we understand that the network is going to need some work. By now, we’ve reached the conclusion that managing a lot of switches is time-consuming, error-prone, and frankly, not all that interesting. We want to focus on other things, so we break the network challenge down to 2 main topics:

Management and Availability.

Fortunately, management doesn’t look like a very big problem. Instead of managing each switch independently, we go for something called “a stack”. In essence, it turns 8 switches into one logical unit. At full density, it lets us treat 4 racks as a single logical switch. With 80 nodes per rack, that’s 320 nodes. Quite a bit of compute power!

Four of these setups – about 1200 nodes.

Across two datacenters? 2400 nodes. Easily 10x our size.
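
For readers who like to check the math, here is a minimal sketch of the arithmetic. The constants are the figures quoted in the text; the variable names are ours. The exact totals come out slightly above the rounded numbers used in the talk.

```python
# Capacity of the stack-based topology, using the figures quoted in the text.
SWITCHES_PER_STACK = 8
RACKS_PER_STACK = 4
NODES_PER_RACK = 80
STACKS_PER_DATACENTER = 4
DATACENTERS = 2

nodes_per_stack = RACKS_PER_STACK * NODES_PER_RACK               # one logical switch
nodes_per_datacenter = STACKS_PER_DATACENTER * nodes_per_stack   # "about 1200"
total_nodes = DATACENTERS * nodes_per_datacenter                 # "2400"
nodes_per_switch = nodes_per_stack // SWITCHES_PER_STACK         # nodes behind one physical switch

print(nodes_per_stack, nodes_per_datacenter, total_nodes, nodes_per_switch)
```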

Now that’s very impressive, but what if something goes wrong? What if one of these stacks fails? Well, if the whole thing goes down, we lose all 320 nodes. Sure, there’s built-in redundancy for the stack’s master, and losing a non-master switch is far less painful, but even then, 40 nodes going down because of one switch? That’s a lot.

So we give it some thought and come up with a simple solution. Instead of using one of these units in each rack, we’ll use two. Each node will have a connection to stack A, and another to stack B. If stack A fails, we’ll still be able to go through stack B, and vice versa. Perfect!

In order to pull that off, we have to make these two separate stacks, which are actually two separate networks, somehow connect. Our solution to that is to set up bonding on the server side, making its two separate network interfaces look like a single, logical one. On the stack side, we connect everything to one big, happy, shared backbone. With its own redundant setup, of course.

In case you’re still keeping track of the math, you might notice that we just doubled the number of stacks per datacenter. But we still gained simple management and high availability at 10x scale. All this without having to invest in expensive, proprietary management solutions. Or even having to scale the team.

And so, it is decided. We build our glorious, stack-based topology. And the land has peace for 40 years. Or… months.

Fast forward 40 months.

We’re running quite a few racks across 3 datacenters. We’re serving customers like CNN, The Guardian, Time Inc and the Australian news.com. We reach over 500 million people worldwide, serving 250 billion recommendations a month.

We’re using Chef to automate our servers, with over 300 cookbooks and 1000 roles.

We’re practicing Continuous Delivery, with over 150 releases to production a day.

We’re managing petabytes of data in Hadoop, Elasticsearch, MySQL, Cassandra.

We’re generating over 6 million metrics every minute, have thousands of alerts and dozens of dashboards.

Infrastructure as Code is our religion. And as for our glorious network setup? It’s completely, fully, 100% … manual.

No, really. It’s the darkest, scariest part of our infrastructure.

I mean hey, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. But every change in the switches is risky because it’s done using the infamous “config management” called “copy-paste”.

The switching software stack and protocols are proprietary, especially the secret sauce that glues the stacks together. Which makes debugging issues a tiring back-and-forth with support at best, or more often just a blind hit-and-miss. The lead time to setting up a new stack is measured in weeks, with risk of creating network loops and bringing a whole datacenter down. Remember August 13th, 2015? We do.

Again, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. And it’s not like we babysit the solution on a daily basis. But it’s definitely not Infrastructure as Code. And there’s no way it’s going to scale us to the next 10x.

Fast forward to June 2016.

We’re still running across 3 data centers, thousands of nodes. CNN, The Guardian, Time Inc, the Australian news.com. 500 million users. 250 billion recommendations. You get it.

But something is different.

We’re just bringing up a new datacenter, replacing the oldest of the three. And in it, we’re rolling out a new network topology. It’s called a Clos Fabric, and it’s running BGP end-to-end. It’s based on a design created by Charles Clos for analog telephony switches, back in the 50’s. And on the somewhat more recent RFCs, authored by Facebook, that bring the concept to IP networks.

In this setup, each node is connected to 2 top-of-rack switches, called leaves. And each leaf is connected to a bunch of end-of-row switches, called spines. But there’s no bonding here and no backbone. Instead, what glues this network together is the fact that everything in it is a router. And I do mean everything – every switch, every server. They publish their IP addresses over all of their interfaces, essentially telling their neighbors, “Hi, I’m here, and you can reach me through these paths.” And since their neighbors are routers as well, they propagate that information.

Thus a map of all possible paths to all possible destinations is constructed, hop-by-hop, and held by each router in the network. Which, as I mentioned, is everyone. But it gets even better.

We’ve already mentioned that each node is connected to two leaf switches. And that each leaf is connected to a bunch of spine switches. It’s also worth mentioning that they’re not just “connected”. They’re wired the exact same way. Which means that any path between two points in the network is the exact same distance. And what THAT means is that we can rely on something called ECMP. Which, in plain English, means “just send the packets down any available path, they’re all the same anyway”. And ECMP opens up interesting options for high availability and load distribution.
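
To make the ECMP idea a bit more concrete, here is a minimal sketch of one common way to choose among equal-cost paths: hash the flow's 5-tuple and use the result to pick a next hop. This is an illustration only. Real switches do this in hardware with their own hash functions, and the spine names, addresses and the `pick_next_hop` helper below are all made up for the example.

```python
import hashlib

def pick_next_hop(flow, next_hops):
    """Pick one of several equal-cost next hops for a flow.

    Hashing the flow's 5-tuple keeps every packet of one flow on the same
    path (avoiding reordering) while spreading different flows across paths.
    """
    key = "|".join(str(field) for field in flow).encode()
    digest = hashlib.sha256(key).digest()
    index = int.from_bytes(digest[:8], "big") % len(next_hops)
    return next_hops[index]

# Hypothetical spines and flows, for illustration only.
spines = ["spine1", "spine2", "spine3", "spine4"]
flow_a = ("10.0.1.5", "10.0.7.9", 6, 40312, 443)  # src, dst, protocol, src port, dst port
flow_b = ("10.0.1.5", "10.0.7.9", 6, 40313, 443)

print(pick_next_hop(flow_a, spines))
print(pick_next_hop(flow_b, spines))
```

If a spine goes away, the same function is simply called with a shorter `next_hops` list, which is the essence of why losing a link does not need special handling.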

Let’s pause to consider some of the gains here:

First, this is a really simple setup. All the leaf switches are the same. And so are all of the spines. It doesn’t matter if you have one, two or thirty. And pretty much the same goes for cables. This greatly simplifies inventory, device and firmware management.

Second, it’s predictable. You know the exact number of hops from any one node in the network to any other: It’s either two or four, no more, no less. Wiring is predictable as well. We know exactly what gets connected where, and what the exact cable lengths are, right from the design phase. (Spoiler alert:) We can even validate this in software.

Third, it’s dead easy to scale. When designing the fabric, you choose how many racks it’ll support, and at what oversubscription ratio. I’ll spare you the math and just say:

You want more bandwidth? Add more spines.

Support more racks? Go for spines with higher port density.
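
For those who do want a taste of the math, here is a rough sketch. It assumes each leaf has exactly one uplink to every spine and that spine ports are used only for leaves; the port counts and link speeds are hypothetical and are not our actual hardware.

```python
def oversubscription(servers_per_leaf, server_gbps, spines, uplink_gbps):
    """Ratio of server-facing bandwidth to spine-facing bandwidth on one leaf."""
    return (servers_per_leaf * server_gbps) / (spines * uplink_gbps)

def max_racks(spine_ports, leaves_per_rack=2):
    """Each leaf takes one port on every spine, so spine port count caps the leaf count."""
    return spine_ports // leaves_per_rack

# More spines -> more uplink bandwidth per leaf -> lower oversubscription.
print(oversubscription(40, 10, 4, 40))
print(oversubscription(40, 10, 8, 40))

# Denser spines -> more leaves -> more racks.
print(max_racks(32))
print(max_racks(64))
```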

Finally, high availability is built into the solution. If a link goes down, BGP will make sure all routers are aware. And everything will still work the same way, because with our wiring scheme and ECMP, all paths are created equal. Take THAT evil bonding driver!

But it doesn’t end there. Scaling the pipes is only half the story. What about device management? The infamous copy-paste? Cable management? A single misconnected cable that could bring a whole datacenter down? What about those?

Glad you asked 🙂

After a long, thorough evaluation of multiple vendors, we chose Cumulus Networks as our switch Operating System vendor, and Dell as our switch hardware vendor. Much like you would with servers, by choosing Red Hat Enterprise Linux, SUSE or Ubuntu. Or with mobile devices, by choosing Android. We chose a solution that decouples the switch OS from the hardware it’s running on. One that lets us select hardware from a list of certified vendors, like Dell, HP, Mellanox and others.

So now our switches run Cumulus Linux, allowing us to use the very same tools that manage our fleet of servers, to now manage our fleet of switches. To apply the same open mindset in what was previously a closed, proprietary world.

In fact, when we designed the new datacenter, we wrote Chef cookbooks to automate provisioning and config. We wrote unit and integration tests using Chef’s toolchain and set up a CI pipeline for the code. We even simulated the entire datacenter, switches, servers and all, using Vagrant.

It worked so well, that bootstrapping the new datacenter took us just 5 days. Think about it: the first time we ever saw a real Dell switch running Cumulus Linux was when we arrived on-site for the buildout. And yet, 99% of our code worked as expected. In 5 days, we were able to set up a LAN, VPN, server provisioning, DNS, LDAP and deal with some quirky BIOS configs. On the servers, mind you, not the switches.

We even hooked Cumulus’ built-in cabling validation to our Prometheus based monitoring system. So that right after we turned monitoring on, we got an alert. On one bad cable. Out of 3000.
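
The idea behind cabling validation is simple enough to sketch: compare the wiring plan from the design phase with the neighbors each switch actually sees. The snippet below is an illustration of that idea only, not Cumulus' implementation; the device names, port names and the `find_cabling_errors` helper are hypothetical.

```python
def find_cabling_errors(expected, observed):
    """Compare the designed wiring plan with what the switches actually see.

    Both arguments map (device, port) -> (neighbor, neighbor_port).
    Returns a list of (device, port, expected_neighbor, observed_neighbor).
    """
    errors = []
    for link, want in sorted(expected.items()):
        got = observed.get(link)  # None means the link is down or unplugged
        if got != want:
            errors.append((link[0], link[1], want, got))
    return errors

# The wiring plan, known from the design phase.
expected = {
    ("leaf1", "swp1"): ("spine1", "swp1"),
    ("leaf1", "swp2"): ("spine2", "swp1"),
    ("leaf2", "swp1"): ("spine1", "swp2"),
    ("leaf2", "swp2"): ("spine2", "swp2"),
}

# What the switches report, with one cable plugged into the wrong place.
observed = dict(expected)
observed[("leaf2", "swp2")] = ("spine1", "swp3")

for device, port, want, got in find_cabling_errors(expected, observed):
    print(f"{device}:{port} expected {want}, found {got}")
```

Exporting the length of that error list as a metric is all a monitoring system needs in order to alert on a bad cable.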

Infrastructure as Code anyone?

 

Structure a Vue.js App from Containers and Components

Recently we’ve begun using Vue.js as a frontend framework for one of our infrastructure projects. We’ve contracted Dr. Yoram Kornatzky to join our Delivery team and dive together, headlong, into this brave new world.

In this blog post by Yoram, and others to come, we’ll share snippets from this journey. We hope you find these beneficial, and invite you to share your own experiences and feedback.


Vue.js using Vuex for state management does not have a clear distinction between containers and components. This is in clear contrast to React using Redux for state management.

We argue that such a distinction between containers and components is beneficial for Vue.js as well.

Dyploma

Dyploma is a system for managing containerized applications and services on top of Kubernetes in Outbrain. Dyploma includes the concepts of:

  • artifacts
  • builds
  • deployments
  • services

Dyploma is made out of a Java Spring backend and a Python command-line tool (CLI). The command-line tool operates through API calls to the backend.

The Dyploma Web Application

To facilitate broader adoption of containers within Outbrain, we set out to develop a web application that will have the capabilities of the Dyploma CLI.

The web application will operate by fetching data from the backend and sending operations for execution in the backend. This will be done through the same REST API used by the CLI.

A Vue.js Web Application

We chose Vue.js for constructing the web application. The app was constructed using vue-cli with the webpack template.

The application has three generic screens:

  • list
  • detail
  • form

All concepts have screens from each of these types with similar structure and look and feel, but with different actions and different data.

Vuex

Vuex is the standard state management approach for Vue.js.

Containers vs Components in React

Let us first recap what containers and components are in React.

A container interacts with the Redux store and contains a component. The container supplies data to the component through selectors on the store and provides the actions on the store to the component.

Components are given data and render HTML. They use the actions provided from their container to interact with the state. Such actions modify the state, resulting in the selectors fetching new data, and causing the component to be rendered again.

Vue.js with Vuex

Vue.js standard practice does not have the containers vs components distinction. While constructing the Dyploma web application we found it useful to make such a distinction for the benefits of better code structure and reusability.

Let us first describe the structure of the Dyploma web application.

Generic Components

We constructed three generic components:

  1. list
  2. detail
  3. form

Each of these can be composed of a component tree more than 3 levels deep.

Each of these generic screens was used, with some variations, by multiple types of data. The look and feel could be configured through a common JSON describing the different fields for each type of data.

Type Specific Actions and Getters

The getters and actions to be used for each type of data were different. We constructed our Vuex store with modules and needed to use a separate module for each type.

Distinguish Components and Containers

So we had to think how to resolve two opposite requirements. For the benefits of reusability, we need unified generic components. But for the type specific actions and data, we need to use separate modules. We decided up front that the whole app will be constructed as a set of single file components (SFC).

To resolve these two opposite directions, we found it useful to think of our app as consisting of two things:

  • containers – type-specific, interacting with the store
  • components – generic

Components

We defined each component to take props for the data it should render, along with a description of the structure of that data. For any changes and actions required, it emits an event.

Data is passed from a component to its constituents with v-bind, like v-bind:list="deployments".

Events are hooked up with v-on, like v-on:search="search".

Components are composed of smaller components. Events are propagated up the tree of components. This bottom-up propagation may be disturbing to some, but it is the best approach for Vue.js. In this respect, Vue.js is definitely different from React.

The component is a single file component (SFC).

Such a component is not necessarily functional.

A Container for Each Type of Data

A container knows which module of the store it deals with, and knows its actions and getters. It fetches data from the store using getters. Such data is passed to the components as props.

It listens to events coming from the components using v-on, like v-on:search="search". In response to such events, it dispatches actions.

The container does not render anything itself; this is done by the component it contains.

The container is a single file component (SFC).
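
The division of labour can be modelled in a few lines. The sketch below is deliberately not Vue code: it is a language-neutral model written in Python, with a toy `Store` standing in for a Vuex module, a callback standing in for v-on, and a method argument standing in for a prop. All class and method names are invented for the illustration.

```python
class Store:
    """Toy stand-in for a Vuex store with one module per type of data."""

    def __init__(self):
        self.modules = {"deployments": ["web-1", "web-2", "batch-1"]}
        self.results = {}

    def getter(self, module):
        # Return the filtered list if a search was dispatched, else everything.
        return self.results.get(module, self.modules[module])

    def dispatch(self, module, action, payload):
        if action == "search":
            self.results[module] = [x for x in self.modules[module] if payload in x]


class ListComponent:
    """Generic: renders whatever it is given and emits events upward."""

    def __init__(self, on_search):
        self.on_search = on_search  # plays the role of v-on:search

    def render(self, items):  # items plays the role of a prop
        return "\n".join(f"- {item}" for item in items)

    def type_in_search_box(self, text):
        self.on_search(text)  # the component never touches the store


class DeploymentsContainer:
    """Type-specific: knows its store module and renders nothing itself."""

    module = "deployments"

    def __init__(self, store):
        self.store = store
        self.component = ListComponent(on_search=self.search)

    def search(self, text):
        self.store.dispatch(self.module, "search", text)

    def render(self):
        return self.component.render(self.store.getter(self.module))


store = Store()
container = DeploymentsContainer(store)
print(container.render())
container.component.type_in_search_box("web")
print(container.render())
```

Note that `ListComponent` could be reused unchanged for builds or services; only a new container, bound to a different module, would be needed.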

A Clean Separation Facilitates Reusability

This clean separation of components and containers makes it simpler to see opportunities for reusability. Come to think of it, in most web apps, the real effort in reusability is reusability of the component. The mixing of components and containers causes many components to be coupled with the store. This makes it harder to identify reusability. By distinguishing components and containers, we isolate the components from the store and see more clearly opportunities for reusability.

Easier Testing

Writing unit tests becomes easier with this separation. One can write three classes of tests:

  1. components
  2. containers
  3. store

Each becoming simpler.

We will discuss this further in a separate article.

Conclusions

Split your Vue.js web app into containers and components.

Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

At Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will be sharing with you our logging implementation which is the first tool used to understand the why.

But first things first, a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare metal nodes, Elasticsearch for storage and Kibana for visualization and analytics. Apache Kafka is the transport layer for all of the above.

A very simplified sketch of the system:

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to use “kubectl”, because managing keys is a pain, especially in a multi-cluster environment.
  • Provide a way to tail logs and even edit log file. This should be available on a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side
  • Support all existing logging sources like multiline and JSON.
  • Don’t forget services which don’t run in Kubernetes, yes we still need to support those.

 

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup: a Fluentd DaemonSet running on each Kubernetes node, with all services configured to send logs to stdout / stderr instead of a file.

The Fluentd agent is collecting the pod’s logs and adding the Kubernetes level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.

A pool of Logstash agents (running as pods in Kubernetes) consumes and parses messages from Kafka as needed.

Once parsed, messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:

And now it is time to introduce CTail.

CTail stands for Containers Tail. It is an Outbrain homegrown tool written in Go, built from server-side and client-side components.

A CTail server-side component runs per datacenter or per Kubernetes cluster, consuming messages from a Kafka topic named “CTail” and based on the Kubernetes app label creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages in the same partition, we had to make sure messages are partitioned by the pod_id.
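
The principle is easy to illustrate: derive the partition from the pod id, so that every message from a given pod lands in the same partition. Kafka clients usually get this behaviour by using the pod id as the message key. The sketch below only shows the principle; CRC32 is a stand-in and is not the hash a real Kafka client uses, and the partition count is hypothetical.

```python
import zlib

def partition_for(pod_id, num_partitions):
    """Map a pod to a fixed partition so that its messages stay in order."""
    return zlib.crc32(pod_id.encode("utf-8")) % num_partitions

NUM_PARTITIONS = 12  # hypothetical partition count for the topic

pods = [
    "ob1ktemplate-test-ssages-2751568960-n1kwd",
    "ob1ktemplate-test-ssages-2751532409-n1kxv",
]
for pod in pods:
    # The same pod always maps to the same partition, run after run.
    print(pod, "-> partition", partition_for(pod, NUM_PARTITIONS))
```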

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the CTail client starts, it queries Consul, which is what we use for service discovery, to locate all of the CTail servers, registers to their streams, and performs aggregations in memory, resulting in a live stream of log entries.
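
One simple way to picture the in-memory aggregation is as a merge of several streams, each already ordered by time, into a single ordered stream. The sketch below shows that idea with Python's heapq.merge on finite lists. It is an assumption about the general approach, not a description of how CTail is actually implemented, and the sample entries are made up.

```python
import heapq

def merge_streams(*streams):
    """Merge per-server streams of (timestamp, pod, message) into one ordered stream.

    Each input must already be ordered by timestamp. ISO-8601 UTC timestamps
    of equal precision sort correctly as plain strings.
    """
    return heapq.merge(*streams, key=lambda entry: entry[0])

# Hypothetical entries as they might arrive from two CTail servers.
server_a = [
    ("2017-06-13T19:16:25.525Z", "pod-n1kwd", "Running 5 self tests now..."),
    ("2017-06-13T19:16:25.529Z", "pod-n1kwd", "returned status code 200"),
]
server_b = [
    ("2017-06-13T19:16:25.527Z", "pod-n1kxv", "Getting uri"),
    ("2017-06-13T19:16:25.531Z", "pod-n1kxv", "returned status code 200"),
]

merged = list(merge_streams(server_a, server_b))
for timestamp, pod, message in merged:
    print(f"{timestamp} {pod}: {message}")
```

A real live tail has to deal with streams that never end and with entries that arrive late, which this sketch ignores.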

Here is a sketch demonstrating the environment and an example of the CTail client output:

 

To view logs from all pods of a service called “ob1ktemplate”, all you need to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200

 

This is how we solved this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5 TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (e.g. Sqoop or MySQL to Hive data loads), to processing in Hadoop (e.g. running Hive, Scalding or Pig jobs), to pushing the results into external data stores (e.g. Vertica, Cassandra, MySQL etc.). An additional dimension of complexity originates from the dynamic nature of the system, since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding, and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to the new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old cluster. This is a sort of active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interfering with any working pipeline in the old cluster. Sounds good, right?
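
To see why splitting the flows under option 2 gets out of hand, here is a small sketch. It reflects our reading of the problem: a flow that moves to the new cluster needs its inputs there, and every flow that reads its output has to follow it, so what must move together is the whole connected group of flows. The flow names and the `flows_to_migrate` helper are hypothetical.

```python
from collections import defaultdict

def flows_to_migrate(initial, reads_from):
    """Return every flow that must move together with the initial subset.

    reads_from maps a flow to the flows whose output it consumes. A moved flow
    drags along both the flows it reads from and the flows that read from it,
    so the result is the connected component around the initial subset.
    """
    neighbours = defaultdict(set)
    for flow, inputs in reads_from.items():
        for upstream in inputs:
            neighbours[flow].add(upstream)
            neighbours[upstream].add(flow)

    needed, stack = set(), list(initial)
    while stack:
        flow = stack.pop()
        if flow not in needed:
            needed.add(flow)
            stack.extend(neighbours[flow] - needed)
    return needed

# A tiny, made-up dependency graph.
reads_from = {
    "clicks_hourly": ["raw_events"],
    "revenue_daily": ["clicks_hourly", "billing_import"],
    "ctr_report": ["clicks_hourly"],
    "audit_log_rollup": ["audit_events"],
}

# Moving a single report pulls in most of the graph.
print(sorted(flows_to_migrate({"ctr_report"}, reads_from)))
```

With 1200 heavily interconnected flow definitions, the result of such a traversal is close to "everything", which is exactly what we found.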

First Steps

To better understand the chosen solution let’s take a look at our current architecture:

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity, the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in-house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration, it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:

[Diagram: data delivery pipeline]
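
The dual delivery described above can be pictured with a minimal Python sketch. It is only an illustration: `ClusterSink` and `FanOutDelivery` are hypothetical names, and the in-memory lists stand in for the real ingestion endpoints of each Hadoop cluster.

```python
from dataclasses import dataclass, field


@dataclass
class ClusterSink:
    """Hypothetical stand-in for one Hadoop cluster's ingestion endpoint."""
    name: str
    events: list = field(default_factory=list)

    def write(self, event: dict) -> None:
        # A real sink would ship the event to the cluster; here we just store it.
        self.events.append(event)


class FanOutDelivery:
    """Delivers every raw event to all configured clusters."""

    def __init__(self, sinks):
        self.sinks = list(sinks)

    def deliver(self, event: dict) -> None:
        for sink in self.sinks:
            sink.write(event)


old_cluster = ClusterSink("old")
new_cluster = ClusterSink("new")

# Adding the new cluster is a configuration change: one more sink in the list.
pipeline = FanOutDelivery([old_cluster, new_cluster])
pipeline.deliver({"type": "click", "id": 1})
```

The point of the sketch is that the producing applications do not change; only the list of destinations does.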

Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner: each Workflow Engine is assigned a business group, and all jobs that belong to this group are executed from it. To isolate the execution of current production jobs from that of the jobs for the new cluster, we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, with raw data being delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs and they perform varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, in some cases it may cause corruption or inconsistencies in the data due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we’ve defined two execution modes a Workflow Engine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. writing to an external database or triggering an applicative service). This is done automatically by the framework, sparing the development teams any extra effort.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing it (to a degree) with the other cluster.
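
To make the two modes concrete, here is a minimal Python sketch of the filtering rule. It is not our actual Workflow Engine code: `Mode`, `Job`, `has_external_side_effects` and `should_run` are hypothetical names, and the sketch assumes each job can be tagged with whether it has side effects outside the Hadoop cluster.

```python
from dataclasses import dataclass
from enum import Enum


class Mode(Enum):
    LEADER = "leader"
    SECONDARY = "secondary"


@dataclass(frozen=True)
class Job:
    name: str
    # Hypothetical flag: True if the job writes to an external database
    # or triggers an external service.
    has_external_side_effects: bool


def should_run(job: Job, mode: Mode) -> bool:
    """Leader runs everything; secondary skips jobs with external side effects."""
    if mode is Mode.LEADER:
        return True
    return not job.has_external_side_effects


jobs = [
    Job("aggregate_clicks", has_external_side_effects=False),
    Job("export_to_mysql", has_external_side_effects=True),
]

leader_jobs = [j.name for j in jobs if should_run(j, Mode.LEADER)]
secondary_jobs = [j.name for j in jobs if should_run(j, Mode.SECONDARY)]
```

Because the decision lives in the framework rather than in each job, the same job definitions can be deployed to both engines unchanged.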

Let’s Do This…

Phase 1 of the migration should look something like this:

[Diagram: phase 1 of the migration]

Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (Hive, Pig, Scalding, etc.)
  2. Test the new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

Once we were confident that all phase 1 jobs were operating properly on the new cluster, we could continue to phase 2, in which the migration leader becomes the secondary and the secondary becomes the leader, like this:

[Diagram: new cluster]

In this phase, all jobs begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine only runs jobs that write data to the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data will continue to be available on the old cluster.
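
The role swap, and the rollback it enables, can be sketched in a few lines of Python. This is an illustration under assumptions, not our production code: the engine names and the `swap_roles` helper are hypothetical, and a group's state is modelled as a plain mapping from engine to mode.

```python
from enum import Enum


class Mode(Enum):
    LEADER = "leader"
    SECONDARY = "secondary"


def swap_roles(engines: dict) -> dict:
    """Return a new mapping in which leader and secondary are exchanged."""
    flipped = {Mode.LEADER: Mode.SECONDARY, Mode.SECONDARY: Mode.LEADER}
    return {engine: flipped[mode] for engine, mode in engines.items()}


# Phase 1: the old engine leads, the new engine only fills the new cluster.
phase1 = {"old_engine": Mode.LEADER, "new_engine": Mode.SECONDARY}

# Phase 2: the new engine takes over external writes.
phase2 = swap_roles(phase1)

# Rollback is the same operation applied again, which works only because
# the old engine kept processing in secondary mode.
rollback = swap_roles(phase2)
```

Since both engines keep running in every state, moving between phases changes who is allowed to produce external side effects, not which data exists on each cluster.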

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seems to be working properly can we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups, thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from a newer version, or improved performance) immediately after it has moved to phase 2, and not only after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat, as you need to take into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental Migration – Due to the fact that we had an active-active migration that we could apply on each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process – Since we kept all old Workflow Engines (that executed their jobs on the old Hadoop cluster) in secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to take into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with, have equipped us with the tools to migrate elephants.

For more information about this project, you can check out the video from Strata 2017 London where I discussed it in more detail.