CodinGame Story One – The key to creativity and happiness in developers’ lives

Photo by Juan Gomez on Unsplash

“Keep a developer learning and they’ll be happy working in a windowless basement eating stale food pushed through a slot in the door. And they’ll never ask for a raise.” — Rob Walling (https://robwalling.com/2006/10/31/nine-things-developers-want-more-than-money/)

The past decade has produced substantial research verifying what may come as no surprise: developers want to have fun. While we also need our salaries, salaries alone will not incentivize us developers who, in most cases, entered a field to do what we love: engage in problem-solving. We like competition. We like winning. We like getting prizes for winning. To be productive, we need job satisfaction. And job satisfaction can be achieved only if we get to have fun using the skills we were hired to use.

 

We wanted to keep the backend developers challenged and entertained.
That’s why Guy Kobrinsky and I created our own version of Haggling, whose basic idea we adapted from Hola, a negotiation game.

The Negotiation Game:

Haggling consists of rounds of negotiations between pairs of players. Each pair’s goal is to maximize score in the following manner:

Let’s say a pair of sunglasses, two tickets, and three cups are on the table. Both players have to agree on how to split these objects between them. To one player, the sunglasses may be worth $4, each cup $2, and the tickets nothing. The opponent might value the same objects differently; while the total worth of all the objects is the same for both players, each player’s valuations are kept secret.

The players take turns making offers to each other about how to split the goods. A proposed split must distribute all objects between the two players so that no items are left on the table. On each turn, a player can either accept the offer or make a counter-offer. If an agreement is reached within 9 offers, each player receives the amount that their portion of the goods is worth according to their own assigned values. If there is still no agreement after the last turn, both players receive no points.
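
To make the scoring rule concrete, here is a minimal TypeScript sketch (the game itself ran on the JVM). The type and function names are our own illustration, not the tournament code, and the sketch assumes each cup is worth $2 to the first player.

```typescript
type ItemCounts = Record<string, number>; // item name -> number of items
type ItemValues = Record<string, number>; // item name -> value of one item to this player

// What a portion is worth to a player holding the given (secret) valuation.
function portionValue(portion: ItemCounts, values: ItemValues): number {
  return Object.keys(portion).reduce(
    (sum, item) => sum + portion[item] * (values[item] || 0), 0);
}

// Simplified check: the two portions together must account for every object on the table.
function isValidSplit(table: ItemCounts, mine: ItemCounts, theirs: ItemCounts): boolean {
  return Object.keys(table).every(
    (item) => (mine[item] || 0) + (theirs[item] || 0) === table[item]);
}

const table: ItemCounts = { sunglasses: 1, tickets: 2, cups: 3 };
const myValues: ItemValues = { sunglasses: 4, tickets: 0, cups: 2 };
const mine: ItemCounts = { sunglasses: 1, tickets: 0, cups: 2 };
const theirs: ItemCounts = { sunglasses: 0, tickets: 2, cups: 1 };

const myScore = portionValue(mine, myValues);   // 4 + 0 + 2 * 2
const total = portionValue(table, myValues);    // value of everything on the table
```

With these numbers the first player would score 8 out of a possible 10 if the opponent agreed to the split.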

The Object of the Game:

Write code to obtain a collection of items with the highest value by negotiating items with an opponent player.

User Experience:

We wanted it to be as easy as possible for players to submit, play and test their code.
Therefore, we decided to keep player code simple – not relying on any third-party libraries.
To do this, we built a simple web application for testing and submitting code, supplying a placeholder with the method “accept” – the code that needs to be implemented by the different participants. The “accept” method describes a single iteration within the negotiation, in which each player must decide if they will accept the offer given to them (by returning null or the received offer) – or return a counter offer.
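
As an illustration of the “accept” contract, here is a hedged TypeScript sketch of a very basic player. The real placeholder was a JVM method; the class name, the constructor parameters, the threshold rule, and the use of null for “no offer received yet” are all assumptions made for this example.

```typescript
type Offer = Record<string, number>; // item name -> how many of it this player would receive

// Hypothetical strategy: accept anything worth at least `threshold` of the total value,
// otherwise counter by asking for every item that has some value to us.
class ThresholdPlayer {
  constructor(
    private readonly table: Offer,
    private readonly values: Record<string, number>,
    private readonly threshold: number,
  ) {}

  private worth(offer: Offer): number {
    return Object.keys(offer).reduce(
      (sum, item) => sum + offer[item] * (this.values[item] || 0), 0);
  }

  // Returns null to accept the received offer, or a counter-offer otherwise.
  // `received` is null when we make the first move (an assumption of this sketch).
  accept(received: Offer | null): Offer | null {
    const total = this.worth(this.table);
    if (received !== null && this.worth(received) >= this.threshold * total) {
      return null;
    }
    const counter: Offer = {};
    for (const item of Object.keys(this.table)) {
      counter[item] = (this.values[item] || 0) > 0 ? this.table[item] : 0;
    }
    return counter;
  }
}

const player = new ThresholdPlayer(
  { sunglasses: 1, tickets: 2, cups: 3 },
  { sunglasses: 4, tickets: 0, cups: 2 },
  0.7,
);
```

A single method like this is the whole surface a participant has to implement, which is what kept submissions free of third-party dependencies.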

 

To help players verify their strategy, we added a testing feature that let them run their code against a random player. Developers were able to play around with it and rework their code before the actual submission.

 

Java Code Example:

 

Test Your Code and Submit Online:

 

Tournament And Scoreboard:

Practice tournaments ran continuously for two weeks, taking all submitted players into account and allowing developers to see their rank. During this time, competitors were able to edit their code, so there was plenty of time to learn and improve.

We also provided analytics for every player. Developers were able to analyze and improve their strategy.

 

At the end of the two weeks, we declared a code freeze and the real tournament took place. Players’ final score was determined only from the results of the real tournament, not the practice tournaments.

 

Game Execution And Score:

We executed the game tournament using multiple agents – each agent reported to Kibana:

     

The Back-Stage:

Where did we store players’ code?
We decided to store all players’ code in AWS S3 to avoid revealing the code to other players.

What languages were supported?

We started with Java only, but players expressed interest in using Scala and Kotlin as well. So we gave these developers free rein to add support for those languages, which we then reviewed before integrating into the base code. Ultimately, developers were able to play in all three languages.  

What was the scale of Haggling?

In the final tournament, 91 players competed in 164 million rounds in which 1.14 billion “accepts” were called. The tournament was executed on 45 servers with 360 cores and 225 GB of memory.

The greatest advantage of our approach was our decision to use Kubernetes, enabling us to add more nodes, as well as tune their cores and memory requirements. Needless to say, it was no problem to get rid of all these machines when the game period ended.

                                

How did the tournament progress?
The tournament was tense, and we saw a lot of interaction with the game over the two weeks.
The player in the winning position changed every day, and the final winner was not apparent until very near the end (and even then we were surprised!).
We saw a variety of single-player strategies with sophisticated calculations and different approaches to game play.
Moreover, in contrast to the original game, we allowed gangs: groups of players belonging to a single team that can “help” each other to win.


So how do you win at haggling?

The winning strategy was collaborative – the winning team created two types of players: the “Overlord” which played to win, and several “Minions” whose job was to give points to the Overlord while blocking other players.  The Overlord and Minions recognized each other using a triple handshake protocol, based on mathematical calculations of the game parameters.  Beyond this, the team employed a human psychological strategy – hiding the strength of the Overlord by ensuring that for the majority of the development period the Overlord went no higher than third place.  They populated the game with “sleeper cells” – players with basic strategies ready to turn into minions at the right moment.  The upheaval occurred in the final hour of the game, when all sleepers were converted to minions.

The graph shows the number of commits in the last hour before code freeze:

 

Hats Off to the Hacker: who got the better of us?

During the two weeks, we noticed multiple hacking attempts. The hacker’s intent was not to crash the game, but rather to prove that it is possible and make a lesson of it.
Although it was not our initial intent, we decided to make hacking part of the challenge, and to reward the hacker for demonstrated skills and creativity.

On the morning of November 7th, we arrived at the office and were faced with the following graph of the outcomes:


The game had been hacked! As can be seen in the graph, one player was achieving an impossible success rate. What we discovered was the following: the read-only hash map that we provided as a method argument to players was written in Kotlin, but when players converted the map to play in either Java or Scala, the conversion produced a mutable hash map, and this is how one of the players was able to modify it. We had failed to validate the preferences, that is, to ensure that the hash-map values that players turned in were the same as the original values.
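
The same trap exists in other languages: a “read-only” type is often only a compile-time promise. The TypeScript sketch below is not what we ran in the tournament; it only illustrates the two defences we were missing, with hypothetical names: hand each player its own frozen copy, and compare whatever comes back against the engine’s original.

```typescript
type Preferences = Record<string, number>; // item name -> value of one item

// Give every player its own frozen copy, so a write can never reach the engine's state.
function viewForPlayer(values: Preferences): Readonly<Preferences> {
  return Object.freeze({ ...values });
}

// Validate that a map coming back from player code still matches the original.
function sameValues(original: Preferences, returned: Preferences): boolean {
  const keys = Object.keys(original);
  return keys.length === Object.keys(returned).length &&
    keys.every((key) => returned[key] === original[key]);
}

const engineValues: Preferences = { sunglasses: 4, tickets: 0, cups: 2 };
const playerView = viewForPlayer(engineValues);

try {
  // A cast silences the compiler, much like crossing a language boundary did for us.
  (playerView as Preferences).tickets = 100;
} catch (e) {
  // Frozen objects reject writes in strict mode; in sloppy mode the write is ignored.
}

const tampered: Preferences = { sunglasses: 4, tickets: 100, cups: 2 };
```

Either defence alone would have stopped the exploit; together they also make a tampering attempt detectable.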


In conclusion, this is exactly the sort of sandbox experience that makes us better, safer, and smarter developers. We embraced the challenge.


Want to play with us? Join Outbrain and challenge yourself.

 

Micro Front Ends — Doing it Angular Style Part 2

In the previous part, I talked about the motivations for moving towards an MFE solution and some of the criteria for a solution to be relevant. In this part, I’ll get into how we implemented it at Outbrain.

As I mentioned in the previous part, one of the criteria was for a solution that can integrate with our current technological ecosystem and require little or no changes to the applications we currently maintain.

Enter Angular Lazy Loading Feature Modules

Angular has a built-in concept of modules, which are basically declaration objects that specify all the components, directives, services and other modules that are encapsulated in a module.

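import { NgModule } from '@angular/core';
import { CommonModule } from '@angular/common';
// WelcomeComponent is assumed to be imported from the feature's own source file.

@NgModule({
  imports: [CommonModule],
  declarations: [WelcomeComponent],
  bootstrap: [],
  entryComponents: []
})
export class AppB_Module {}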

By specifying the module file as a Webpack entry point, we gain the ability to bundle up the entire Angular module, including its CSS and HTML, as a single standalone JS file.

entry: {
 'appB_module': './app/appB.prod.module.ts'
}

Using the Angular lazy loading mechanism, we can dynamically load this JS file and bootstrap it into our current application.

const routes: Routes = [
  {
    path: 'appB',
    loadChildren: '/appB/appB_Module#AppB_Module'
  }
]

This is a big step towards our goal of separating our application into mini applications.

Moving from feature modules to mini apps

Angular feature modules along with Webpack bundling give us the code separation we need, but this is not enough, since Webpack only allows us to create bundles as part of a single build process. What we want is to be able to produce a separate JS bundle that is built at a different time, from a separate code base, in a separate build system, and that can be loaded into the application at runtime and share any common resources, such as Angular.

In order to resolve this, we had to create our own Webpack loader which is called share-loader.

Share-loader allows us to specify a list of modules that we would like to share between applications. It bundles a given module into one of the applications’ JS bundles and provides a namespace through which other bundles access that module.

Application A webpack.config:

rules: [
  {
    test: /\.js?$/,
    use: [{
      loader: 'share-loader',
      options: {
        modules: [/@angular/, /@lodash/],
        namespace: 'container-app'
      }
    }]
  }
]

Application B webpack.config:

const {Externals} = require('share-loader');
externals: [
 Externals({
   namespace: 'container-app',
   modules: [/@angular/, /@lodash/]
 })
],
output: {
 library: 'appB',
 libraryTarget: 'umd'
},

In this example, we are telling Webpack to bundle Angular and lodash into application A and expose them under the ‘container-app’ namespace.

In application B, we are defining that angular and lodash will not be bundled but rather be pointed to by the namespace ‘container-app’.

This way, we can share some modules across applications but maintain others that we wish not to share.
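
The idea of sharing through a namespace can be pictured with a small registry. The sketch below is purely conceptual and is not how share-loader is implemented; the function names and the fake module object are made up for illustration.

```typescript
// Conceptual stand-in for the global namespace that the container application exposes.
type ModuleTable = Record<string, unknown>;
const namespaces: Record<string, ModuleTable> = {};

// The container app registers the modules it bundles under its namespace.
function expose(namespace: string, name: string, mod: unknown): void {
  const table = namespaces[namespace] || (namespaces[namespace] = {});
  table[name] = mod;
}

// A mini app resolves a shared module from the namespace instead of bundling its own copy.
function resolveShared(namespace: string, name: string): unknown {
  const table = namespaces[namespace];
  if (!table || !(name in table)) {
    throw new Error(name + " is not shared under " + namespace);
  }
  return table[name];
}

const fakeLodash = { version: "fake" }; // placeholder for a real module object
expose("container-app", "lodash", fakeLodash);
const sameInstance = resolveShared("container-app", "lodash") === fakeLodash;
```

The point is that both applications end up holding the very same module instance, while anything not registered stays private to its own bundle.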

So far we have tackled several of the key points we specified in the previous post. We now have two applications that can be run independently or loaded remotely at runtime, while wrapped in a JS namespace and with CSS and HTML encapsulation. They can also share modules between them and encapsulate modules that shouldn’t be shared. Now let’s look into some of the other key points we mentioned.

DOM encapsulation

In order to tackle CSS encapsulation, we wrapped each mini-app with a generic Angular component. This component uses Angular’s CSS encapsulation feature. We have two options: we can use either emulated mode or native mode, depending on the browser support we require. Either way, we are sure that our CSS will not leak out.

@Component({
  selector: 'ob-externals-wrapper',
  template: require('./externals-wrapper.component.pug')(),
  styleUrls: ['./externals-wrapper.component.less'],
  encapsulation: ViewEncapsulation.Native
})

This wrapper component also serves as a communication layer between each mini-app and the other apps. All communication is done via an event bus instance that is hosted by each wrapper instance. By using an event system, we have a decoupled way to communicate data in and out, which we can easily clear when a mini application is removed from the main application.
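
A per-wrapper event bus does not need to be complicated. The following is a minimal sketch of the idea, not our production code; the class name, method names and event name are assumptions.

```typescript
type BusHandler = (payload: unknown) => void;

// One instance would live inside each wrapper component.
class MiniAppEventBus {
  private handlers: Record<string, BusHandler[]> = {};

  on(event: string, handler: BusHandler): void {
    (this.handlers[event] = this.handlers[event] || []).push(handler);
  }

  emit(event: string, payload: unknown): void {
    // Iterate over a copy so handlers may subscribe while an event is being delivered.
    (this.handlers[event] || []).slice().forEach((handler) => handler(payload));
  }

  // Called when the mini application is removed, so no stale listeners survive.
  clear(): void {
    this.handlers = {};
  }
}

const bus = new MiniAppEventBus();
const received: unknown[] = [];
bus.on("filters-changed", (payload) => { received.push(payload); });
bus.emit("filters-changed", { country: "US" });
bus.clear();
bus.emit("filters-changed", { country: "IL" }); // nobody is listening any more
```

Because the mini app only ever talks to its bus, dropping the bus is enough to disconnect it from the container.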

If we take a look at the situation we have so far, we can see that we have a solution that is very much in line with the web component concept: each mini application is wrapped by a standalone component that encapsulates all its JS, HTML and CSS, and all communication is done by an event system.

Testing

Since each application can also run independently, we can run test suites on each one independently. This means each application owner knows when their changes have broken the application, and each team is concerned mostly with its own application.

Deployment and serving

In order to provide each application with its own deployment, we created a Node service for each application. Each time a team creates a new deployment of its application, a JS bundle is created that encapsulates the application, and each service exposes an endpoint that returns the path to the bundle. At runtime, when a mini app is loaded into the container app, a call to the endpoint is made, and the JS file is loaded into the app and bootstrapped into the main application. This way each application can be built and deployed separately.
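
The runtime flow can be sketched as two steps: ask the service where its bundle lives, then load that file. The sketch below keeps the network and the DOM out of the picture by injecting both as callbacks; the endpoint name `bundle-path`, the response shape and the example URLs are hypothetical.

```typescript
interface BundleInfo { path: string; } // assumed response shape of the service endpoint

type GetBundleInfo = (endpoint: string, done: (info: BundleInfo) => void) => void;
type LoadScript = (url: string, done: () => void) => void;

// Join a base URL and a path with exactly one slash between them.
function joinUrl(base: string, path: string): string {
  return base.replace(/\/+$/, "") + "/" + path.replace(/^\/+/, "");
}

// Step 1: ask the mini app's service for the bundle path. Step 2: load the bundle.
function loadMiniApp(
  serviceBase: string,
  getInfo: GetBundleInfo,
  loadScript: LoadScript,
  onReady: () => void,
): void {
  getInfo(joinUrl(serviceBase, "bundle-path"), (info) => {
    loadScript(joinUrl(serviceBase, info.path), onReady);
  });
}

// Fakes standing in for an HTTP call and a <script> tag injection.
const requested: string[] = [];
let ready = false;
loadMiniApp(
  "https://appb.example.com/",
  (endpoint, done) => { requested.push(endpoint); done({ path: "/dist/appB.js" }); },
  (url, done) => { requested.push(url); done(); },
  () => { ready = true; },
);
```

Since the container only knows the service address, a team can publish a new bundle without the container being rebuilt.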

Closing Notes:

Thanks for reading! I hope this article helps companies that are considering this move to realize that it is possible to do it without revolutionizing your code base.

Moving to a Micro Front End approach is a move in the right direction: as applications get bigger, velocity gets smaller.

This article shows a solution using Angular as a framework; similar solutions can be achieved using other frameworks.

Increase Your Velocity with a Safe Automatic Deployment

At Outbrain we work at a fast pace, trying to combine the challenge of developing new features fast with maintaining our systems so that they can cope with the constant growth of traffic. We deliver many changes on a daily basis to our production and testing environments, so our velocity is much affected by our DevOps tools. One of the tools we use the most is the deployment tool, since every new artifact must be deployed to the simulation and staging environments and pass its tests before it can be deployed to production. The simulation environment is used for running E2E integration tests. These tests simulate real use cases and they involve all relevant services. The staging environment is actually a single production machine (AKA a canary machine) which receives a small portion of the traffic in production. It allows us to make sure the new version is working properly in the production environment before we deploy it to the rest of the production servers. In this session, you’ll find out how we increased velocity with a safe automatic deployment of high scale services.

 

Our deployment flow

 

The illustration above depicts the flow each code change must pass until it arrives in production.

A developer commits code changes and triggers a “build & deploy” action that creates an artifact for the requested service and deploys it to our simulation servers. Once an hour, a build in TeamCity runs the simulation tests of our services.

If the developer doesn’t want to wait for the periodic run, they need to run the simulation tests manually. Once the build passes, the developer is allowed to deploy the artifact to the staging server. At this point, we verify that the staging server behaves properly by reviewing various metrics of the server, and by checking the logs of that server.

For instance, we verify that the response time hasn’t increased and that there are no errors in the log. Once all these steps are completed, the new version is deployed to all production servers. This whole process can take 30-45 minutes.

As one can see, this process has a lot of problems:

  1. It requires many interventions of the developer.
  2. The developer either spends time waiting for actions to complete in order to trigger the next ones or they suffer from context switches which slow them down.
  3. The verification of the version in staging is done manually, hence:
     • It’s time-consuming.
     • There is no certainty that all the necessary tests are made.
     • It’s hard to share knowledge among team members of what the expected result of each test is.

The new automatic pipeline

Recently we have introduced a pipeline in Jenkins that automates this whole process. The pipeline allows a developer to send code changes to any environment (including production) simply by committing them into the source control while ensuring that these changes don’t break anything.

The illustration below shows all stages of our new pipeline

Aside from automating the whole process, which was relatively easy, we had to find a way to automate the manual tests of our staging environment. As mentioned, our staging servers serve real requests coming from our users.

Some of our services handle around 2M requests per minute so any bad version can affect our customers, our users, and us very quickly. Therefore we would like to be able to identify bad versions as soon as possible. To tackle this issue, our pipeline starts running health tests on our staging servers 5 minutes after the server goes up since sometimes it takes time for the servers to warm up.

The tests, which are executed by TeamCity, pull a list of metrics of the staging server from our Prometheus server and verify that they meet the criteria we defined. For example, we check that the average response time is below a certain number of milliseconds. If one of these tests fails, the pipeline fails. At that point, the developer who triggered the pipeline receives a notification e-mail so that they can look into it and decide whether the new version is bad and should be reverted, or whether the tests need some more fine-tuning and the version is okay to deploy to the rest of the servers.
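
The core of such a health test is a comparison of metric values against thresholds. Here is a minimal sketch of that comparison; the metric names, thresholds and function name are invented for the example, and fetching the values from Prometheus is deliberately left out.

```typescript
interface Criterion {
  metric: string; // hypothetical metric name
  max: number;    // highest acceptable value
}

// Returns one message per violated criterion; an empty list means the version looks healthy.
// A metric that is missing from the snapshot is treated as a failure.
function failedChecks(metrics: Record<string, number>, criteria: Criterion[]): string[] {
  return criteria
    .filter((c) => !(metrics[c.metric] <= c.max))
    .map((c) => c.metric + "=" + metrics[c.metric] + " exceeds " + c.max);
}

const stagingSnapshot: Record<string, number> = {
  avg_response_ms: 42,
  error_rate: 0.02,
};
const criteria: Criterion[] = [
  { metric: "avg_response_ms", max: 50 },
  { metric: "error_rate", max: 0.01 },
];

const failures = failedChecks(stagingSnapshot, criteria);
const pipelineShouldFail = failures.length > 0;
```

Keeping the criteria as data makes it easy to share them within a team and to fine-tune them when a test turns out to be too strict.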

The pipeline ends when the new version is deployed to production but this doesn’t necessarily mean that the version is 100% okay, although the chances that the version is not okay at this stage are low.

For the purpose of ensuring our production servers function properly, many periodic tests constantly monitor the servers and trigger alerts in case of a failure and allow us to react fast and keep our services available.

 

What we gained

  1. The automated deployment process ensures the quality of our deliveries and that they don’t break our production servers.
  2. Reduction of the time developers spend on DevOps tasks.
  3. The decision whether a version in staging is okay is more accurate as it is based on comparable metrics and not on a subjective decision of the developer.
  4. The developer doesn’t need to remember which metrics to check for each service in order to tell whether a service functions properly.

Upgrade Railway Tracks Under a Moving Train

Outbrain has quite a history with Cassandra. We have around 30 production clusters of different sizes, ranging from small ones to clusters with 100 nodes across 3 datacenters.

Background

Cassandra has proven to be a very reliable choice as a datastore that employs an eventual consistency model. We used the Datastax Enterprise 4.8.9 distribution (which is equivalent to Cassandra 2.1), and as Cassandra 3 evolved, it became clear that at some point we would have to plan an upgrade. We ended up with two options: either upgrade the commercial distribution provided by Datastax or migrate to the Apache Cassandra distribution. Since Outbrain uses Cassandra extensively, it was an important decision to make.
At first, we identified that we did not use any of the Datastax enterprise features and that we were quite happy with the basics that are still offered by Apache Cassandra. So, at this point, we decided to give Apache Cassandra 3.11.1 a try.
But when planning an upgrade, aside from the necessity to stay on a supported version, there are always some goals, like leveraging a new feature or seeking performance improvements. Our main goal was to get more from the hardware that we have and potentially reduce the number of nodes per cluster.

In addition, we wanted an in-place upgrade, which means that instead of building new clusters we would perform the migration in the production clusters themselves while they are still running. It is like upgrading the railway tracks under a moving train, and we are talking about a very big train moving at high speed.

The POC

But before defining the in-place upgrade, we needed to validate that the Apache Cassandra distribution fits our use cases, and for that we performed a POC. After selecting one of our clusters for the POC, we built a new Apache Cassandra 3.11.1 cluster. Given the new storage format, and hence the read path optimisations, in Cassandra 3, we started with a small cluster of 3 nodes in order to see whether we could benefit from this feature to reduce the cluster size, that is, whether we could achieve acceptable read and write latencies with fewer nodes.
The plan was to copy data from the old cluster using sstableloader, with the application employing a dual-writes approach before we started copying. This way we would have two clusters in sync, and we could direct our application to do some or all reads from the new cluster.
And then the problems started. First, it became clear that with our multi-datacenter configuration we did not want sstableloader to stream data to foreign datacenters; we wanted to be able to control the target datacenter for the stream processing. We reported a bug, and instead of waiting for a new release of Apache Cassandra, we made our own custom build.
After passing this barrier we reached another one, related to LeveledCompactionStrategy (LCS) and how Cassandra performs compactions. With a cluster of 3 nodes and 600GB of data in a table with LCS, it took a few weeks to put the data into the right levels. Even with 6 nodes it took days to complete compactions following the sstableloader data load. That was a real pity, because read and write latencies were quite good even with fewer nodes. Luckily, Cassandra has a feature called multiple data directories. At first it looks like something to do with allowing more data, but this feature actually helps enormously with LCS. Once we tried it and experimented with the number of data directories, we found that we could compact 600GB of data on L0 in less than a day with three data directories.

Encouraged by these findings, we conducted various tests on our Cassandra 3.11 cluster and realised that we can not only benefit from storage space savings and a faster read path, but also potentially put more data per node for clusters where we use LCS.

Storage before and after the migration

The plan

The conclusions were very good and indicated that we were fine with the Apache Cassandra distribution and could start preparing our migration plan using in-place upgrades. At a high level, the migration steps should be:

  1. Upgrade to the latest DSE 4.x version (4.8.14)
  2. Upgrade to DSE 5.0.14 (Cassandra 3)
  3. Upgrade sstables
  4. Upgrade to DSE 5.1.4 (Cassandra 3.11)
  5. Replace DSE with Apache Cassandra 3.11.1

After defining the plan, we were ready to start migrating our clusters. We mapped our clusters and categorised them according to if they are used for online serving activities or for offline activities.
Then we were good to go and started to perform the upgrade cluster by cluster.

Upgrade to the latest DSE 4.x version (4.8.14)

The first upgrade, to DSE 4.8.14 (we had 4.8.9), is a minor upgrade. The reason this step is needed at all is that DSE 4.8.14 includes all the important fixes required to co-exist with Cassandra 3.0 during the upgrade, and our upgrade experience shows that DSE 4.8.14 and DSE 5.0.10 (Cassandra 3.0.14) indeed co-existed very well. The way to do it is to upgrade nodes in a rolling fashion, one node at a time. “One node at a time” applies within a datacenter only; datacenters can be processed in parallel (which is what we actually did).
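
The scheduling rule (one node at a time within a datacenter, datacenters in parallel) can be sketched as a plan of “waves”. This is only an illustration with made-up host names; in practice each datacenter can progress at its own pace rather than in lockstep, and every node should be checked for health before moving on.

```typescript
interface ClusterNode { host: string; dc: string; }

// Each wave holds at most one node per datacenter, so no datacenter ever has
// two nodes restarting at once, while different datacenters proceed side by side.
function upgradeWaves(nodes: ClusterNode[]): string[][] {
  const byDc: Record<string, string[]> = {};
  for (const node of nodes) {
    (byDc[node.dc] = byDc[node.dc] || []).push(node.host);
  }
  const dcs = Object.keys(byDc);
  const longest = Math.max(0, ...dcs.map((dc) => byDc[dc].length));
  const waves: string[][] = [];
  for (let i = 0; i < longest; i++) {
    waves.push(dcs.filter((dc) => i < byDc[dc].length).map((dc) => byDc[dc][i]));
  }
  return waves;
}

const waves = upgradeWaves([
  { host: "ny-1", dc: "NY" },
  { host: "ny-2", dc: "NY" },
  { host: "chi-1", dc: "CHI" },
]);
// waves: [["ny-1", "chi-1"], ["ny-2"]]
```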

Upgrade to DSE 5.0.14 (Cassandra 3)

Even if you aim to use DSE 5.1, you cannot really jump to it from 4.x, so the 4.x – 5.0.x – 5.1.x chain is mandatory. The steps and prerequisites for upgrading to DSE 5.0 are documented quite well by Datastax. What is probably not documented that well is which driver version to use and how to configure it so that the application works smoothly with both DSE 4.x and DSE 5.x nodes during the upgrade. We instructed our developers to use cassandra-driver-core version 3.3.0 and to enforce protocol version 3, so that the driver can successfully talk to both DSE 4.x and DSE 5.x nodes.
Another thing to ensure before starting the upgrade is that all your nodes are healthy. Why? Because one of the most important limitations of clusters running both DSE 4.x and 5.x is that you cannot stream data, which means that bootstrapping a node during node replacement is simply not possible.
However, for clusters where we had SLAs on latencies we wanted more confidence, so in one datacenter of each such cluster we first upgraded a single node to DSE 5.0.x. This is a 100% safe operation: since it is the first node with DSE 5.0.x, it can be replaced using the regular node replacement procedure with DSE 4.x. One very important point here is that DSE 5.0 does not force any schema migration until all nodes in the cluster are running DSE 5.0.x. This means that system keyspaces remain untouched, and this is what guarantees that a DSE 5.0.x node can be replaced with DSE 4.x.
Since we have 2 kinds of clusters (serving and offline) with different SLAs, we took 2 different approaches. For the serving clusters we upgraded the datacenters sequentially, leaving some datacenter running DSE 4.x; we did it because if something went really wrong, we would still have a healthy datacenter running DSE 4.x. For the offline clusters we upgraded all the datacenters in parallel. Both approaches worked quite well.

Upgrade sstables

Whether clusters were upgraded in parallel or sequentially, we always wanted to start the sstables upgrade as quickly as possible after DSE 5.0 was installed. The sstables upgrade is the most important phase of the upgrade because it is the most time-consuming. There are not many knobs to tune:

  • control the number of compaction threads with the -j option of the nodetool upgradesstables command.
  • control the compaction throughput with nodetool setcompactionthroughput.

Whether -j speeds up the sstables upgrade depends a lot on how many column families you have, how big they are and what kind of compaction strategy is configured. As for the compaction throughput, we determined that for our storage system it was ok to set it to 200 MB/sec while still allowing regular read/write operations to be carried out at reasonable higher-percentile latencies, but this has to be determined for each storage system individually.

Upgrade to DSE 5.1.4 (Cassandra 3.11)

In theory, the upgrade from DSE 5.0.x to DSE 5.1.x is a minor one: you just need to upgrade the nodes in a rolling fashion, exactly the same way as minor upgrades were done within DSE 4.x. The storage format stays the same and no sstables upgrade is required. But when upgrading our first reasonably large cluster (in terms of number of nodes) we noticed something really bad: nodes running DSE 5.1.x suffered from never-ending compactions in the system keyspace, which killed their performance, while the DSE 5.0.x nodes were running well. For us, it was a tough moment, because that was a serving cluster with SLAs and we had not noticed anything like that during our previous cluster upgrades! We suspected that something was wrong with the schema migration mechanism, and the command nodetool describecluster confirmed that there were two different schema versions, one from the 5.0.x nodes and one from the 5.1.x nodes. So we decided to speed up the DSE 5.1.x upgrade, under the assumption that once all nodes were on 5.1.x there would be no issues with different schema versions and the cluster would settle. This is indeed what happened.

Replace DSE with Apache Cassandra 3.11.1

So how did the final migration step, replacing DSE 5.1.4 with Apache Cassandra, go? In the end it worked well across all our clusters. Basically, the DSE packages have to be uninstalled, and Apache Cassandra has to be installed and pointed to the existing data and commit_log directories.

However, two extra steps were required:

  • DSE creates DSE-specific keyspaces (e.g. dse_system). This keyspace is created with EverywhereStrategy, which is not supported by Apache Cassandra. So before replacing DSE with Apache Cassandra on a given node, the keyspace replication strategy has to be changed.
  • A few system column families receive extra columns in DSE deployments (system.peers and system.local). So, if you attempt to start Apache Cassandra after DSE is removed, it will fail on the schema check against these column families. There are two ways to work around this. The first option is to delete all system keyspaces, set auto_bootstrap to false and also set the allow_unsafe_join JVM option to true; assuming the node’s IP address stays the same, Cassandra will re-create the system keyspaces and jump into the ring. However, we used an alternative approach: we had a bunch of scripts which first exported these tables with extra columns to JSON, then imported the JSON files on a helper cluster which had the same schema as the system tables in Apache Cassandra, and finally copied the resultant sstables to the node being upgraded.

Minor version, big impact

So Cassandra 3.11 clearly has vast improvements in terms of storage format efficiency and read latency. However, even within a given Cassandra major version, a minor version can introduce a bug that heavily affects performance. The latency improvements were achieved on Cassandra 3.11.1, but this very version introduced an unpleasant bug (CASSANDRA-14010, fix sstable ordering by max timestamp in singlePartitionReadCommand).

This bug was introduced during refactoring, and it meant that for tables with STCS almost all sstables were queried for every read. Typically Cassandra sorts sstables by timestamp in descending order, and it starts answering a query by looking at the most recent sstables until it is able to fulfil the query (with some restrictions, such as for counters or unfrozen collections). The bug, however, caused sstables to be sorted in ascending order. It was fixed in 3.11.2, and here is the impact: the number of sstables per read at the 99th percentile dropped after the upgrade. Reading fewer sstables leads to lower local read latencies.
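
Why the sort order matters so much can be shown with a toy model. The sketch below is a heavy simplification and not Cassandra’s code: each sstable only knows its maximum timestamp and the write timestamp it holds for a key, and the reader stops as soon as the next sstable cannot contain anything newer than what was already found. All names and numbers are invented.

```typescript
interface SSTableInfo {
  maxTimestamp: number;          // newest write contained in this sstable
  rows: Record<string, number>;  // key -> timestamp of the value stored for that key
}

// Counts how many sstables are touched when reading `key` in the given order.
function sstablesTouched(tables: SSTableInfo[], key: string, descending: boolean): number {
  const ordered = tables.slice().sort((a, b) =>
    descending ? b.maxTimestamp - a.maxTimestamp : a.maxTimestamp - b.maxTimestamp);
  let newestFound = -Infinity;
  let touched = 0;
  for (const table of ordered) {
    // Early exit: this sstable cannot hold anything newer than what we already have.
    if (table.maxTimestamp < newestFound) break;
    touched++;
    const ts = table.rows[key];
    if (ts !== undefined && ts > newestFound) newestFound = ts;
  }
  return touched;
}

const sstables: SSTableInfo[] = [
  { maxTimestamp: 100, rows: { k: 90 } },
  { maxTimestamp: 200, rows: { k: 150 } },
  { maxTimestamp: 300, rows: { k: 250 } },
];

const touchedDescending = sstablesTouched(sstables, "k", true);  // newest first
const touchedAscending = sstablesTouched(sstables, "k", false);  // the buggy order
```

In this model the descending order touches one sstable, while the ascending order can never take the early exit and touches all three, which matches the kind of jump in sstables per read we observed.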

 

Drop in sstables per read

Local read latency drop on Cassandra 3.11.2

Always mind page cache

On one of our newly built Cassandra 3.11.2 clusters we noticed unexpectedly high local read latencies. This came as a surprise because this cluster was specifically dedicated to a table with LCS. We spent quite some time in blktrace/blkparse/btt to confirm that our storage by itself was performing well. However, we later identified that the high latency affected only the 98th and 99th percentiles, while from the btt output it became clear that there were write IO bursts which resulted in very large blocks thanks to the scheduler merge policy. The next step was to exclude compactions, but even with compactions disabled we still had these write bursts. Looking further, it became evident that the page cache defaults were different from what we typically had, and the culprit was vm.dirty_ratio, which was set to 20%. With 256GB of RAM, flushing 20% of it would certainly result in an IO burst that affects reads. So we tuned the page cache, opting for vm.dirty_bytes and vm.dirty_background_bytes instead of vm.dirty_ratio.
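
A quick back-of-the-envelope calculation shows the scale of the problem. The sketch below treats the ratio as a share of total RAM, which is an upper bound (the kernel applies it to available memory), and the 500 MiB/s write throughput is a made-up figure, not a measurement from our hardware.

```typescript
// Upper bound on dirty data that may accumulate before writers are forced to flush.
function dirtyLimitGiB(ramGiB: number, dirtyRatioPercent: number): number {
  return ramGiB * dirtyRatioPercent / 100;
}

// Rough time needed to write that much data at a given sustained throughput.
function flushSeconds(dirtyGiB: number, throughputMiBPerSec: number): number {
  return dirtyGiB * 1024 / throughputMiBPerSec;
}

const limitGiB = dirtyLimitGiB(256, 20);          // about 51 GiB of dirty pages
const burstSeconds = flushSeconds(limitGiB, 500); // hypothetical 500 MiB/s device
```

Tens of gigabytes flushed in one go keep the disks busy for minutes, which is why an absolute limit in bytes is easier to reason about on machines with a lot of RAM.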

Page cache flush settings impact on local read latency

The results

So, in the end – what were our gains from Apache Cassandra 3.11?
Well, aside from any new features, we got lots of performance improvements. The major one is storage savings, which immediately translate into lower read latencies because more data fits into the OS cache; this improved the clusters’ performance a lot.

 

In addition, the newer storage format results in lower JVM pressure during reads (the object creation rate decreased by a factor of 1.5 to 2). The following image shows the decrease in read latency in our datacenters (NY, CHI & SA) during the load tests (LT) that we performed before and after the migration.

Read latency before and after the migration

Another important aspect is the newly exposed metrics. A few interesting ones:

  • Ability to identify consistency level requested for both reads and writes. This can be useful to track if occasionally the consistency level requested is not what would be expected for a given column family. For example, if QUORUM is requested instead of LOCAL_QUORUM due to a bug in app configuration.
  • Metrics on the network messaging.

Read metrics before the migration

Read metrics after the migration

We did something that many people thought was impossible: an in-place migration of running production clusters. Although there were some bumps along the way, it’s entirely possible to conduct an in-place upgrade of DSE 4.8.x to Apache Cassandra 3.11 while maintaining acceptable latencies and avoiding any cluster downtime.

Trust your IntelliJ

I’m in the habit of leaving classes cleaner than they were before I got there. CMD+ALT+O (optimize imports) and CMD+ALT+L (reformat code) have become muscle memory by now. I will spend my time removing unused properties, deleting dead code, removing deprecations or duplications. One of my all-time favorites is rewriting tests to make them clean and readable. I do this as part of any day-to-day task, no matter how small or insignificant. It is satisfying to know you leave your code in a better state than before (and my OCD is fed and happy).

One of the best tools I use to achieve this is none other than the good old IntelliJ IDEA. It highlights unused parameters, typos, the accessibility level of a class or method, a parameter that is always called with the same value, and so on. You know what I’m talking about: that nagging little yellow square you never pay attention to, or deliberately ignore, because you’re always working on some important, time-sensitive feature. You “never” have time to quickly sweep and ‘clean up’ the class you’re working on.

 

“That nagging little square”

My team at work is divided into three groups:

  • The people who use a several-major-versions-old IntelliJ (reciting the age-old mantra — “if it ain’t broke, don’t fix it”)
  • A person who uses the latest stable version of IntelliJ
  • And me, always making sure to use the latest EAP.

The first group of people doesn’t see some of the suggestions IntelliJ gives me. The first example that comes to mind is the support for lambda expressions we got with Java 8. IntelliJ is very helpful in highlighting code that can be converted to a lambda, which makes the code shorter, cleaner and, quite frankly, more readable (which is a matter of personal taste and habit). Today I got to a class I had never seen before, to make a tiny change. It’s a class in a service I don’t usually work on, so I hadn’t had the chance to make my usual OCD-powered sweep and clean-up. I proceeded to change one global parameter from public to private accessibility and added a ‘final’ keyword to another parameter. Happy and satisfied, my OCD let me commit this minor change.

Imagine how surprised I was to have my OCD rub my nose in a piece of code I entirely missed:

IntelliJ just told me that it found a bug:

After changing the code and calling the ‘putAll’ method with ‘innerMap’:

IntelliJ even added a bonus of highlighting another optimization I could do — “Lambda can be replaced with method reference.” and now the code looks like so:

Now, I know what you’re thinking. It’s something along the lines of “where were the tests?” or “tests would have protected him from making this stupid mistake” (I can hear your scolding tone in my head right now, thank you very much). It’s a valid point, but it’s not the point I’m trying to make, and it’s out of the scope of this post. What I’m trying to tell you is: trust your IntelliJ!

Spark in framework

Making Spark Native for Data Processing Framework

Working in an infrastructure team is different from working in a development team. The team develops tools that are used by development teams inside Outbrain, so it is very important to provide a convenient way of working with new technologies. Having a Spark cluster enables running distributed data processing at scale. Until now, developers worked with Spark by writing ad-hoc crontab jobs in production. This created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to provide an infrastructural way of working with Spark.

In this blog post, I explain how we built a solution that makes Spark a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine – an internal tool, which is developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as  Algorithm research groups, BI and Data Science.

input Trigger

Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failures. Logs for each running flow are shown in a convenient form in WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.

From the operational perspective, several instances of WFE are deployed on machines owned by the infrastructure team; users just define ETL flows in a separate repository.

Diving Into the Solution

In order to respond to requirements for running data analytics on Spark, our team had to provide an infrastructural way of running Spark jobs orchestrated by WFE. To achieve this, we’ve developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design for SparkStep, we had several principles in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify, and at the same time allow the system to limit, the number of YARN resources allocated for every Spark job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping the control while having fun

The Spark job definition specifies how a job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a Spark job, it builds a spark-submit command using all these properties.
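
To illustrate how a job definition might be turned into a command, here is a rough sketch. It is not WFE's code: the function and parameter names are assumptions for the example, and only standard spark-submit options (--master, --deploy-mode, --class, --conf) are used.

```python
from typing import Dict, List

def build_spark_submit(jar_url: str, class_name: str,
                       conf: Dict[str, str], app_args: List[str]) -> List[str]:
    """Assemble a spark-submit argument list for YARN cluster mode."""
    cmd = ["spark-submit", "--master", "yarn", "--deploy-mode", "cluster",
           "--class", class_name]
    # Sorting keeps the generated command stable between runs.
    for key, value in sorted(conf.items()):
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(jar_url)   # the application jar comes after all options
    cmd += app_args       # anything after the jar is passed to the job itself
    return cmd

# Hypothetical job definition values.
cmd = build_spark_submit(
    "hdfs:///jobs/example.jar",
    "com.example.Job",
    {"spark.executor.memory": "2g", "spark.executor.instances": "4"},
    ["2017-06-13"],
)
print(" ".join(cmd))
```

Returning a list rather than a single string avoids shell-quoting problems when the command is handed to a process launcher.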

The spark-submit command submits the Spark job to YARN, which allocates needed resources and launches the application. After submitting the job, WFE constantly checks its status using YARN REST API. When the job is completed in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long, and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API. In this case, the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use and that resources are saved when a job is canceled for any reason.
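
The status-tracking logic described above can be sketched roughly as follows. This is an illustration under assumptions, not WFE's implementation: get_state and kill are hypothetical callbacks standing in for the YARN REST calls, and the state names are YARN's application states in simplified form (a FINISHED application can still carry a failed final status, which is ignored here).

```python
import time
from typing import Callable

def run_with_timeout(get_state: Callable[[], str], kill: Callable[[], None],
                     max_seconds: float, poll_seconds: float = 10.0) -> str:
    """Poll a job until it ends or exceeds its maximum running time."""
    deadline = time.monotonic() + max_seconds
    while True:
        state = get_state()
        if state == "FINISHED":
            return "COMPLETED"
        if state in ("FAILED", "KILLED"):
            return "FAILED"
        if time.monotonic() >= deadline:
            kill()            # stop the job so its resources are released
            return "FAILED"
        time.sleep(poll_seconds)

def run_with_retries(attempt: Callable[[], str], retries: int) -> str:
    """Run one attempt, then rerun it up to `retries` more times on failure."""
    for _ in range(retries + 1):
        if attempt() == "COMPLETED":
            return "COMPLETED"
    return "FAILED"
```

Keeping the timeout and the retry policy in separate functions means a timed-out attempt is treated like any other failure and can be retried by the same loop.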

The spark-submit command runs in a Docker container, which exits once the job is submitted to the cluster. This lets us create an isolated environment for every Spark job executed by WFE. In addition, using Docker containers, we can easily run different Spark versions simultaneously by creating different Docker images.

By using this architecture we achieve all our goals. Spark jobs are submitted to YARN in a “cluster” mode, so they do not create a load on WFE machines. YARN handles all resources for each job. Every Spark job runs in an isolated environment owing to the Docker container.

X tips [x>5] for Micro-Services Logging

Micro-Services Logging

What if?

What if someone told you it is forbidden to use logs anymore?

In my case, it began in a meeting with our ops team, who claimed our services were writing too many logs and that we should write fewer. Therefore, logs were being “throttled”, i.e. some log messages were simply discarded.

“Too many logs” meant 100 lines per minute, which in my opinion was a ridiculously low number.

Maybe I am doing something wrong?

It might be that logs are not a good pattern in a micro-services-highly-distributed environment. I decided to rethink my assumptions.

Why do I need logs anyway?

The obvious reason to write logs is debugging purposes. When something goes wrong in production, logs are a way I can understand the flow that leads to that erroneous state.

The main alternative that I am aware of is connecting a debugger and stepping through the code in the flow. There are a couple of disadvantages to using the debugger: it is time-consuming, it might slow down the production process itself, and you have to be connected when it happens, so if the bug happens only at night: bummer. In addition, debugging is a one-time process; what you learn from it stays in your head, so it is hard to improve that way.

Another alternative is adding metrics. Metrics are pretty similar to logs, but they have the extra feature of nice dashboards and alerting systems (we use metrics, Prometheus and Grafana). On the other hand, metrics have a bigger setup overhead, which is their main disadvantage in my opinion. Metrics are also more rigid and usually do not allow logging all state, context, and parameters. The fact that writing logs is easy makes it a no-brainer to use them everywhere in the code while applying the “think later” paradigm.

The third alternative is auditing via systems like ELK. Similar to metrics, it has a higher overhead, and it is also hard to follow sequential operations with those discrete events.

There are even more reasons for logging. It is additional documentation in the code that can help in understanding what is going on. Logging can also be used for metrics and alerting, and can even replace those systems. Many insights can be gained from logs, sometimes even user passwords.

Specification

If I go back to that meeting with the ops guys, one question I was asked was:

‘What are your requirements from your logging system?’

Here it is:

Order matters: messages should be in ‘written before’ order so it will be possible to understand the flow of the code.

Zero throttling: I expect that there will be no rate limit on writing, only on the volume of saved messages, so every time a message is thrown away, it is the oldest one.

X days history: log files should keep log messages for at least a couple of days. If that is not the case, you might be writing too many log messages, so move some to debug level.

Logs should be greppable: text files have the big advantage of flexibility, and there are many tools that can be used with them.

Metadata: the logging system should provide some metadata like calling thread, timestamp, calling method or class, etc., to remove that burden from the developer (logging should be easy).

Distributed and centralized: it should be convenient to look at all logs in a central location, but also to be able to split them and see the logs of a specific process.

Easy to use, easy to install, easy to consume: in general, logging should be fun.

Tips and Tricks

I am in. Is there anything else to know?

Use logging framework

Don’t log to standard output with print lines in a long-running service. A logging framework provides various levels, various appenders (like logging to HTML via HTTP), log rotation, and all sorts of other features and tricks.

Developer tip — Log levels

It’s important to be consistent when using different levels, otherwise, you will lose semantics and meaning of the message severity. The way I use them is:

  • Error: something bad happened and it might crash the process/service.
  • Warn: something bad happened in a specific use case.
  • Info: something that I want to see happen.
  • Debug: something happened that I wish to see only under special circumstances; otherwise it would clutter the normal logs.

There are some special reasons to override that rule: Error and Warn messages are monitored in our services, so sometimes I might move something to Info level to snooze alerting on it. I move messages from Info to Debug if there is too much clutter, and in the other direction if I want to focus on something.

Developer tip — Meaningful messages

I often see messages like “start processing” or “end method”. It is always a good idea to try to imagine what else you would like to see when reading log messages, and to add as much info as possible: specific parameters, fields, and contextual data which might have different values in different scenarios.

Developer tip — Lazy evaluated strings

Especially for Debug level, it is a good practice to use frameworks that prevent the overhead of string concatenation when the level is turned off without the need to explicitly check if the level isn’t suppressed. In kotlin-logging, for example, it will look like this:

logger.debug { "Some $expensive message!" }
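
The same idea exists outside Kotlin. As a small sketch, Python's standard logging module defers %-style formatting until a record is actually emitted, while an f-string is built before the call. The Expensive class below is a hypothetical stand-in for a value that is costly to render.

```python
import logging

class Expensive:
    """Hypothetical object that counts how often it is rendered to a string."""
    def __init__(self):
        self.rendered = 0

    def __str__(self):
        self.rendered += 1
        return "expensive"

logger = logging.getLogger("lazy-demo")
logger.setLevel(logging.INFO)            # Debug level is switched off

value = Expensive()
logger.debug("Some %s message!", value)  # arguments are never formatted
calls_after_lazy = value.rendered

logger.debug(f"Some {value} message!")   # the f-string is built before the call
calls_after_eager = value.rendered
```

In both cases nothing is written to the log; the difference is only whether the cost of building the message was paid.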

Developer tip — Don’t log errors more than once per error

It is a common anti-pattern to log an exception and then re-throw it just to be logged later in another place in the code again. It makes it harder to understand the number of errors and their origin. Log exceptions only if you are not re-throwing them.
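
A minimal sketch of that rule, using Python's standard logging module with made-up function names: the inner function re-throws without logging, and the outer function, where the exception stops, is the only place that logs it.

```python
import logging

logger = logging.getLogger("orders")

def parse_quantity(raw: str) -> int:
    # No logging here: the error is re-thrown, so the caller reports it.
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(f"invalid quantity {raw!r}") from exc

def handle_request(raw: str) -> int:
    try:
        return parse_quantity(raw)
    except ValueError:
        # The exception is not re-thrown from here, so it is logged exactly once,
        # with the full chained traceback.
        logger.exception("rejecting request")
        return 0
```

Chaining with "from exc" keeps the original cause in the single log entry, so nothing is lost by not logging at the lower level.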

Ad-Hoc enablement of log level

Some frameworks and services allow changing the active log level on the fly. It means you can print debug messages of a specific class in a specific instance for a couple of minutes, for example. It allows debugging without trashing the log file the rest of the time.
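
As an illustration, here is one way this could look with Python's standard logging module. The context manager name is made up for the example; in a real service the switch would more likely be triggered by an admin endpoint or a configuration reload than by a with-block.

```python
import logging
from contextlib import contextmanager

@contextmanager
def debug_enabled(logger_name: str):
    """Temporarily lower one named logger to DEBUG, then restore its level."""
    target = logging.getLogger(logger_name)
    previous = target.level
    target.setLevel(logging.DEBUG)
    try:
        yield target
    finally:
        target.setLevel(previous)

log = logging.getLogger("svc.payments")
log.setLevel(logging.INFO)

with debug_enabled("svc.payments"):
    log.debug("visible only while the block is active")
```

Because only one named logger is touched, the rest of the process keeps its normal verbosity.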

Ad-Hoc addition of logging messages

When I worked at Intel, one of my peers developed a JVM tool that allowed bytecode manipulation of methods and adding log messages at the beginning of methods and at the end of methods.

It means you didn’t have to think of all those messages in advance; you could just inject log messages when needed, while the process was running.

In-Memory logs

Another useful technique is keeping the last messages in memory. This makes it easy to expose them remotely, via a REST call for example. It is also possible to have them dumped into a file in case the process crashes.
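
A minimal sketch of such a buffer, assuming Python's standard logging module; the handler name is made up. A bounded deque drops the oldest entry when full, which also matches the "throw away the oldest message" requirement from the specification.

```python
import logging
from collections import deque

class RingBufferHandler(logging.Handler):
    """Keep only the most recent formatted log messages in memory."""
    def __init__(self, capacity: int):
        super().__init__()
        self.buffer = deque(maxlen=capacity)  # oldest entries fall off the front

    def emit(self, record: logging.LogRecord) -> None:
        self.buffer.append(self.format(record))

    def dump(self) -> list:
        return list(self.buffer)

log = logging.getLogger("ring-demo")
log.setLevel(logging.INFO)
log.propagate = False
ring = RingBufferHandler(capacity=3)
log.addHandler(ring)

for i in range(5):
    log.info("message %d", i)
print(ring.dump())
```

The dump method is what a REST endpoint or a crash hook would call to get at the retained messages.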

Logging as a poor-man profiler

It is also possible to analyze logs to gain insight into the performance of the application. The simplest technique I have seen is using the timestamps in the logs. A more advanced technique is using the context to calculate and show the time from the beginning of the sequence (i.e. when the HTTP call started), by using MDC.
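
Python has no MDC, but a context variable plus a logging filter gives a comparable effect. The sketch below is an assumption-laden illustration (the names begin_request and ElapsedFilter are made up): it stamps every record with the milliseconds elapsed since the current request began.

```python
import contextvars
import logging
import time

_request_start = contextvars.ContextVar("request_start", default=None)

def begin_request() -> None:
    """Mark the start of a request in the current context."""
    _request_start.set(time.monotonic())

class ElapsedFilter(logging.Filter):
    """Attach elapsed_ms (time since begin_request) to every record."""
    def filter(self, record: logging.LogRecord) -> bool:
        start = _request_start.get()
        record.elapsed_ms = 0.0 if start is None else (time.monotonic() - start) * 1000
        return True

logger = logging.getLogger("profile-demo")
logger.setLevel(logging.INFO)
logger.addFilter(ElapsedFilter())
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("+%(elapsed_ms).1fms %(message)s"))
logger.addHandler(handler)

begin_request()
logger.info("request received")
logger.info("response sent")
```

Reading the prefix down the log then shows where time is spent inside a single request without any extra tooling.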

Log formatting

The content of the message is also important. Many logging frameworks allow embedding predefined template parameters such as:

  • Location info: class and file name, method name, and line number of where the log message was issued.
  • Date and time.
  • Log level, as discussed above.
  • Thread info: relevant in multi-threaded environments, to be able to separate different flows.
  • Context info: similar to thread info but more specific to a use case; add context information like user id, request id, etc. Framework features like MDC make this easier to implement.

I highly recommend using those, but bear in mind that some features add performance overhead when they are evaluated.
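
For a concrete picture, here is what such a template could look like with Python's standard logging module. The built-in attributes (asctime, levelname, threadName, filename, lineno, funcName) are part of the library; request_id is a hypothetical context field attached to the record by hand for this example.

```python
import logging

LOG_FORMAT = (
    "%(asctime)s %(levelname)s [%(threadName)s] "
    "%(filename)s:%(lineno)d %(funcName)s "
    "request=%(request_id)s %(message)s"
)
formatter = logging.Formatter(LOG_FORMAT)

# Build a record by hand so the example does not depend on handler setup.
record = logging.LogRecord(
    name="svc", level=logging.INFO, pathname="app.py", lineno=42,
    msg="user %s logged in", args=("alice",), exc_info=None, func="login",
)
record.request_id = "req-1"   # context info, normally injected by a filter
line = formatter.format(record)
print(line)
```

Location fields such as file name and line number are the ones most likely to add overhead in some frameworks, so it is worth measuring before enabling them everywhere.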

Logging — the essentials

Logging is a big world. I couldn’t cover all of it here, but I hope I convinced you to use it.

Have fun and keep on logging!

Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

At Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will be sharing with you our logging implementation which is the first tool used to understand the why.

But first things first, a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare metal nodes, Elasticsearch for storage, and Kibana for visualization and analytics. Apache Kafka is the transport layer for all of the above.

A very simplified sketch of the system:

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to use “kubectl”, because managing keys is a pain, especially in a multi-cluster environment.
  • Provide a way to tail logs and even edit log file. This should be available on a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side.
  • Support all existing logging sources like multiline and JSON.
  • Don’t forget services which don’t run in Kubernetes; yes, we still need to support those.

 

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup: a Fluentd daemonset running on each Kubelet node, with all services configured to send logs to stdout / stderr instead of a file.

The Fluentd agent is collecting the pod’s logs and adding the Kubernetes level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.

A pool of Logstash agents (running as pods in Kubernetes) consumes and parses messages from Kafka as needed.

Once parsed, messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:

And now it is time to introduce CTail.

CTail stands for Containers Tail. It is an Outbrain homegrown tool written in Go, made up of server-side and client-side components.

A CTail server-side component runs per datacenter or per Kubernetes cluster. It consumes messages from a Kafka topic named “CTail” and, based on the Kubernetes app label, creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages in the same partition, we had to make sure messages are partitioned by the pod_id.
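
The reasoning behind keying by pod_id can be sketched in a few lines. This is only an illustration of key-based partitioning: zlib.crc32 is a stand-in hash chosen for the example (Kafka's Java client hashes the message key with murmur2), and the function name is made up.

```python
import zlib

def partition_for(pod_id: str, num_partitions: int) -> int:
    """Map a pod id to a partition; the same id always maps to the same one."""
    return zlib.crc32(pod_id.encode("utf-8")) % num_partitions

pod = "ob1ktemplate-test-ssages-2751568960-n1kwd"
first = partition_for(pod, 12)
second = partition_for(pod, 12)
print(first == second)
```

Since every message of a pod lands in one partition, the per-partition ordering guarantee becomes a per-pod ordering guarantee, while different pods can still be spread across partitions.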

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the CTail client starts, it queries Consul (which is what we use for service discovery) to locate all of the CTail servers, registers to their streams, and performs aggregations in memory, resulting in a live stream of log entries.
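
How CTail aggregates internally is not shown here, so the following is only a guess at the general idea: if each server delivers its entries in timestamp order, several streams can be merged lazily into one ordered stream. The tuple layout and function name are assumptions for the example.

```python
import heapq
from typing import Iterable, Iterator, Tuple

Entry = Tuple[str, str, str]  # (ISO-8601 UTC timestamp, pod name, message)

def merge_streams(*streams: Iterable[Entry]) -> Iterator[Entry]:
    """Lazily merge already-ordered streams into one stream ordered by timestamp."""
    # Timestamps in the same fixed UTC format compare correctly as plain strings.
    return heapq.merge(*streams, key=lambda entry: entry[0])

server_a = [
    ("2017-06-13T19:16:25.525Z", "pod-a", "Running 5 self tests now..."),
    ("2017-06-13T19:16:25.527Z", "pod-a", "Getting uri"),
]
server_b = [
    ("2017-06-13T19:16:25.526Z", "pod-b", "Getting uri"),
    ("2017-06-13T19:16:25.529Z", "pod-b", "returned status code 200"),
]
merged = list(merge_streams(server_a, server_b))
for timestamp, pod, message in merged:
    print(timestamp, pod + ":", message)
```

A lazy merge like this holds only one pending entry per stream in memory, which suits a tail that never ends.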

Here is a sketch demonstrating the environment and an example of the CTail client output:

CTail client output

 

To view logs from all pods of a service called “ob1ktemplate”, all you need to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200

 

This is how we solve this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

Keep bugs out of production

Production bugs are painful and can severely impact a dev team’s velocity. My team at Outbrain has succeeded in implementing a work process that enables us to send new features to production free of bugs, a process that incorporates automated functions with team discipline.

Why should I even care?

Bugs happen all the time, and they will be found either locally or in production. But the main difference between finding a bug in a pre-production environment and finding it in production is the cost: according to IBM’s research, fixing a bug in production can cost 5 times more than discovering it in pre-production environments (during the design, local development, or test phase).

Let’s describe one of the scenarios that can unfold once a bug reaches production:

  • A customer finds the bug and alerts customer service.
  • The bug is logged by the production team.
  • The developer gets the description of the bug, opens the spec, and spends time reading it over.
  • The developer then will spend time recreating the bug.
  • The developer must then reacquaint him/herself with the code to debug it.
  • Next, the fix must undergo tests.
  • The fix is then built and deployed in other environments.
  • Finally, the fix goes through QA testing (requiring QA resources).

How to stop bugs from reaching production

To catch and fix bugs at the most time- and cost-efficient stage, we follow these steps, adhering to several core principles:

Stage 1 – Local Environment and CI

Step 1: Design well. Keep it simple.

Create the design before coding: try to divide difficult problems into smaller parts/steps/modules that you can tackle one by one, thinking of objects with well-defined responsibilities. Share the plans with your teammates at design-review meetings. Good design is a key to reducing bugs and improving code quality.

Step 2: Start Coding

The code should be readable and simple. Design and development principles are your best friends. Use SOLID, DRY, YAGNI, KISS and Polymorphism to implement your code.
Unit tests are part of the development process. We use them to test individual code units and ensure that the unit is logically correct.
Unit tests are written and executed by developers. Most of the time we use JUnit as our testing framework.

Step 3: Use code analysis tools

To help ensure and maintain the quality of our code, we use several automated code-analysis tools:
FindBugs – A static code analysis tool that detects possible bugs in Java programs, helping us to improve the correctness of our code.
Checkstyle – A development tool that helps programmers write Java code that adheres to a coding standard. It automates the process of checking Java code.

Step 4: Perform code reviews

We all know that code reviews are important. There are many best practices online (see 7 Ways to Up-Level Your Code Review Skills, Best Practices for Peer Code Review, and Effective Code Reviews), so let’s focus on the tools we use. All of our code commits are posted to ReviewBoard, where developers can review the committed code, see the latest developments at any point in time, and share input.
For the more crucial teams, we have a build that makes sure every commit has passed a code review. If a review has not been done, the build will alert the team that there was an unreviewed change.
Regardless of whether you are performing a post-commit, a pull request, or a pre-commit review, you should always aim to check and review what’s being inserted into your codebase.

Step 5: CI

This is where all code is integrated. We use TeamCity to enforce our code standards and correctness by running unit tests, FindBugs validations, Checkstyle rules, and other types of policies.

Stage 2 – Testing Environment

Step 1: Run integration tests

Check that the system as a whole works. Integration testing is also done by developers, but rather than testing individual components, it aims to test across components. A system consists of many separate components like code, database, web servers, etc.
Integration tests are able to spot issues like wiring of components, network access, database issues, etc. We use Jenkins and TeamCity to run CI tests.

Step 2: Run functional tests

Check that each feature is implemented correctly by comparing the results for a given input with the specification. Typically, this is not done at the development level.
Test cases are written based on the specification, and the actual results are compared with the expected results. We run functional tests using Selenium and Protractor for UI testing and JUnit for API testing.

Stage 3 – Staging Environment

This environment is often referred to as a pre-production sandbox, a system testing area, or simply a staging area. Its purpose is to provide an environment that simulates your actual production environment as closely as possible so you can test your application in conjunction with other applications.
Move a small percentage of real production requests to the staging environment where QA tests the features.

Stage 4 – Production Environment

Step 1: Deploy gradually

Deployment is the process that delivers our code to production machines. If errors occur during deployment, our Continuous Delivery system will pause the deployment, preventing the problematic version from reaching all the machines, and allow us to roll back quickly.
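
The pause-on-error behaviour can be illustrated with a small sketch. It is not our Continuous Delivery system: deploy_one and is_healthy are hypothetical callbacks, and deploying in fixed-size batches is an assumption made for the example.

```python
from typing import Callable, Dict, List

def deploy_gradually(hosts: List[str], deploy_one: Callable[[str], None],
                     is_healthy: Callable[[str], bool], batch_size: int) -> Dict:
    """Deploy batch by batch and pause as soon as a batch looks unhealthy."""
    deployed: List[str] = []
    for i in range(0, len(hosts), batch_size):
        batch = hosts[i:i + batch_size]
        for host in batch:
            deploy_one(host)
            deployed.append(host)
        if not all(is_healthy(host) for host in batch):
            # Remaining hosts keep the old version; `deployed` is the rollback list.
            return {"status": "PAUSED", "deployed": deployed}
    return {"status": "DONE", "deployed": deployed}

hosts = ["h1", "h2", "h3", "h4", "h5", "h6"]
result = deploy_gradually(hosts, deploy_one=lambda h: None,
                          is_healthy=lambda h: h != "h3", batch_size=2)
print(result)
```

The smaller the batch, the fewer machines a bad version reaches before the pause, at the cost of a slower rollout.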

Step 2: Incorporate feature flags

All our new components are released with feature flags, which basically serve to control the full lifecycle of our features.  Feature flags allow us to manage components and compartmentalize risk.

Step 3: Release gradually

There are two ways to make our release gradual:

  1. We test new features on a small set of users before releasing to everyone.
  2. Open the feature initially to, say, 10% of our customers, then 30%, then 50%, and then 100%.

Both methods allow us to monitor and track problematic scenarios in our systems.
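
One common way to implement the percentage-based release is to hash each user into a stable bucket. The sketch below is an assumption about how such a flag could work, not a description of our feature-flag system; the function and feature names are made up.

```python
import hashlib

def in_rollout(feature: str, user_id: str, percent: int) -> bool:
    """Return True if this user falls inside the first `percent` of buckets."""
    digest = hashlib.sha256(f"{feature}:{user_id}".encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100   # stable bucket 0..99
    return bucket < percent

users = [f"user-{i}" for i in range(1000)]
at_10 = {u for u in users if in_rollout("new-widget", u, 10)}
at_30 = {u for u in users if in_rollout("new-widget", u, 30)}
print(len(at_10), len(at_30))
```

Because a user's bucket never changes, everyone who saw the feature at 10% still sees it at 30%, so raising the percentage only ever adds users.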

Step 4: Monitor and Alerts

We use the ELK stack consisting of Elasticsearch, Logstash, and Kibana to manage our logs and events data.
For Time Series Data we use Prometheus as the metric storage and alerting engine.
Each developer can set up his own metrics and build Grafana dashboards.
Setting the alerts is also part of the developer’s work and it is his responsibility to tune the threshold for triggering the PagerDuty alert.
PagerDuty is an automated call, texting, and email service, which escalates notifications between responsible parties to ensure the issues are addressed by the right people at the right time.

All in all, don’t let the bugs fly out of control.

African Elephant (Loxodonta africana)

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB, with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (e.g. Sqoop or MySQL to Hive data loads), to processing in Hadoop (e.g. running Hive, Scalding or Pig jobs), to pushing the results into external data stores (e.g. Vertica, Cassandra, MySQL etc.). An additional dimension of complexity originates from the dynamic nature of the system, since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding, and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to the new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: Start processing on the new cluster without stopping the old one. This is a sort of active-active approach, because both Hadoop clusters, new and old, will contain the processing results. It would allow us to migrate parts of the workload without risking interference with any working pipeline in the old cluster. Sounds good, right?

First Steps

To better understand the chosen solution let’s take a look at our current architecture:

[Diagram: current architecture]

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity, the diagram describes only one cluster.

Once the data reaches Hadoop, it is processed by an in-house framework for orchestrating data flows that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group and is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its own Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons, but during the planning of the migration it also gave us a natural way to partition the workload, since there are very few dependencies between groups compared with the dependencies within each group.
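To make the partitioning argument concrete, here is a minimal sketch of how one might count dependencies that stay inside a business group versus those that cross groups. The job names, group names and the `count_dependencies` helper are hypothetical and only illustrate the idea; they are not part of the Workflow Engine.

```python
from collections import Counter


def count_dependencies(job_group, dependencies):
    """Count dependencies that stay within a group vs. cross between groups.

    job_group maps a job name to its business group (hypothetical data).
    dependencies is a list of (upstream_job, downstream_job) pairs.
    """
    counts = Counter()
    for upstream, downstream in dependencies:
        same_group = job_group[upstream] == job_group[downstream]
        counts["within" if same_group else "across"] += 1
    return counts


# Hypothetical jobs and groups, for illustration only
job_group = {
    "clicks_raw": "reporting",
    "clicks_daily": "reporting",
    "ctr_model": "recommendations",
}
dependencies = [
    ("clicks_raw", "clicks_daily"),  # stays inside "reporting"
    ("clicks_daily", "ctr_model"),   # crosses into "recommendations"
]

counts = count_dependencies(job_group, dependencies)
print(counts["within"], counts["across"])
```

The fewer "across" edges there are, the more freely each group can be migrated on its own.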

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:

[Diagram: data delivery pipeline]
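The dual delivery described above can be sketched roughly as follows. This is a simplified illustration, not our actual delivery system: `ClusterSink` and `deliver` are hypothetical names, and a real pipeline would also deal with batching, retries and failures.

```python
from dataclasses import dataclass, field


@dataclass
class ClusterSink:
    """Hypothetical stand-in for the ingestion endpoint of one Hadoop cluster."""
    name: str
    received: list = field(default_factory=list)

    def write(self, event):
        self.received.append(event)


def deliver(event, sinks):
    """Send the same raw event to every configured cluster."""
    for sink in sinks:
        sink.write(event)


old_cluster = ClusterSink("old")
new_cluster = ClusterSink("new")

for event in ({"type": "click", "id": 1}, {"type": "view", "id": 2}):
    deliver(event, [old_cluster, new_cluster])

# Both clusters now hold the same raw events
print(old_cluster.received == new_cluster.received)
```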

Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner: each Workflow Engine is assigned a business group, and all jobs that belong to this group are executed from it. To isolate the execution of the current production jobs from those for the new cluster, we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, with raw data being delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs and they perform varied types of operations, we can’t really assume that letting them all run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, this may in some cases corrupt the data or make it inconsistent due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we defined two execution modes a Workflow Engine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. writing to an external database or triggering an applicative service). The framework does this automatically, sparing the development teams any effort.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing it (to a degree) with the other cluster.
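The two modes can be illustrated with a small sketch. It assumes each job carries a flag saying whether it has an external side effect; the `Mode`, `Job` and `jobs_to_run` names and the flag itself are hypothetical and do not reflect how the Workflow Engine actually marks such jobs.

```python
from dataclasses import dataclass
from enum import Enum


class Mode(Enum):
    LEADER = "leader"
    SECONDARY = "secondary"


@dataclass(frozen=True)
class Job:
    name: str
    # Hypothetical flag: True when the job writes outside its own Hadoop cluster
    external_side_effect: bool = False


def jobs_to_run(mode, jobs):
    """Select the jobs an engine in the given mode is allowed to execute."""
    if mode is Mode.LEADER:
        return list(jobs)  # Leader: run all the jobs
    # Secondary: skip anything with a side effect outside the cluster
    return [job for job in jobs if not job.external_side_effect]


jobs = [
    Job("aggregate_clicks"),
    Job("export_to_mysql", external_side_effect=True),
]

print([job.name for job in jobs_to_run(Mode.LEADER, jobs)])
print([job.name for job in jobs_to_run(Mode.SECONDARY, jobs)])
```

Because the filtering happens in one place, job owners do not need to maintain a second copy of their jobs for the secondary engine.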

Let’s Do This…

Phase 1 of the migration should look something like this:

[Diagram: phase 1 of the migration]

Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (Hive, Pig, Scalding, etc.)
  2. Test the new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

Once we were confident that all phase 1 jobs were operating properly on the new cluster, we could continue to phase 2, in which the migration leader becomes a secondary and the secondary becomes the leader. Like this:

[Diagram: new cluster]

In this phase all jobs begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine only runs jobs that write data to the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data continues to be available on the old cluster.
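As a rough sketch of why the rollback is cheap, the move to phase 2 can be modeled as swapping the modes of a group's two engines, and the rollback as applying the same swap again. The `swap_roles` helper and the engine names are hypothetical.

```python
def swap_roles(roles):
    """Return a new mapping with leader and secondary exchanged."""
    flipped = {"leader": "secondary", "secondary": "leader"}
    return {engine: flipped[mode] for engine, mode in roles.items()}


# Phase 1: the old engine leads, the new engine only fills the new cluster
phase1 = {"old_engine": "leader", "new_engine": "secondary"}

# Phase 2: the new engine takes over production side effects
phase2 = swap_roles(phase1)

# Rolling back is the same operation applied once more
rolled_back = swap_roles(phase2)

print(phase2)
print(rolled_back == phase1)
```

At every point exactly one engine per group is the leader, so externally visible writes are never duplicated.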

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seems to be working properly can we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups, thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from a newer version, or improved performance) immediately after it has moved to phase 2, and not only after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat, as you need to take into consideration the state of multiple groups when migrating.
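One way to keep an eye on that pain point is to list the inter-group dependencies whose two ends are currently in different phases. The sketch below is only an illustration under that assumption; the group names and the `cross_phase_dependencies` helper are hypothetical and not part of our tooling.

```python
def cross_phase_dependencies(group_phase, group_dependencies):
    """Return (upstream, downstream) group pairs that are in different phases."""
    return [
        (upstream, downstream)
        for upstream, downstream in group_dependencies
        if group_phase[upstream] != group_phase[downstream]
    ]


# Hypothetical state: "reporting" has already moved to phase 2
group_phase = {"reporting": 2, "recommendations": 1, "billing": 1}
group_dependencies = [
    ("reporting", "recommendations"),
    ("recommendations", "billing"),
]

# Pairs listed here need extra care while the groups are in different phases
print(cross_phase_dependencies(group_phase, group_dependencies))
```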

What Did We Achieve?

  1. Incremental migration – Because we had an active-active migration that we could apply to each business group separately, we mitigated risk and gained value from the new system gradually.
  2. Reversible process – Since we kept all old Workflow Engines (which executed their jobs on the old Hadoop cluster) in secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently of each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to take into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with, have equipped us with the tools to migrate elephants.

For more information about this project, you can check out the video from Strata 2017 London where I discussed it in more detail.