
CodinGame Story One – The key to creativity and happiness in a developer's life

Photo by Juan Gomez on Unsplash

“Keep a developer learning and they’ll be happy working in a windowless basement eating stale food pushed through a slot in the door. And they’ll never ask for a raise.” — Rob Walling (https://robwalling.com/2006/10/31/nine-things-developers-want-more-than-money/)

The past decade has produced substantial research verifying what may come as no surprise: developers want to have fun. While we also need our salaries, salaries alone will not incentivize us developers who, in most cases, entered the field to do what we love: engage in problem-solving. We like competition. We like winning. We like getting prizes for winning. To be productive, we need job satisfaction. And job satisfaction can be achieved only if we get to have fun using the skills we were hired to use.

 

We wanted to keep the backend developers challenged and entertained.
That’s why Guy Kobrinsky and I created our own version of Haggling, whose basic idea we adapted from Hola, a negotiation game.

The Negotiation Game:

Haggling consists of rounds of negotiations between pairs of players. Each player's goal is to maximize their own score, in the following manner:

Let's say there are a pair of sunglasses, two tickets, and three cups on the table. Both players have to agree on how to split these objects between them. To one player, the sunglasses may be worth $4, each cup $2, and the tickets worthless. The opponent might value the same objects differently; while the total worth of all the objects is the same for both players, each player's valuation is kept secret.

Both players take turns making offers to each other about how to split the goods. A proposed split must distribute all objects between the partners so that no items are left on the table. On each turn, a player can either accept the offer or make a counter-offer. If an agreement is reached within the allotted 9 offers, each player receives the amount that their portion of the goods is worth, according to the assigned values. If there is still no agreement after the last turn, both players receive no points.

The Object of the Game:

Write code that obtains the collection of items with the highest value by negotiating with an opponent player.

User Experience:

We wanted it to be as easy as possible for players to submit, play and test their code.
Therefore, we decided to keep player code simple – not relying on any third-party libraries.
To do this, we built a simple web application for testing and submitting code, supplying a placeholder with the method “accept” – the code that needs to be implemented by the different participants. The “accept” method describes a single iteration within the negotiation, in which each player must decide if they will accept the offer given to them (by returning null or the received offer) – or return a counter offer.

 

To help players verify their strategy, we added a testing feature that lets them run their code against a random player. Developers were able to play around with it, iterating on their code before the actual submission.

 

Java Code Example:
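For illustration, here is a rough sketch of what a naive submission might look like. The method signature and parameter names below are assumptions made for the sake of the example, not the framework's actual API: the player accepts any offer worth at least half of the total (or anything on the last turn), and otherwise counters by demanding every item it values.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only – the signature and parameter semantics are assumptions,
// not the actual API of our Haggling framework.
public class NaivePlayer {

    // Called once per negotiation turn. Returning the received offer (or null) accepts it;
    // returning a new map makes a counter-offer.
    public Map<String, Integer> accept(Map<String, Integer> values,   // worth of each item type to us
                                       Map<String, Integer> counts,   // how many of each item are on the table
                                       Map<String, Integer> offer,    // portion currently offered to us (null on the first turn)
                                       int round, int maxRounds) {
        if (offer != null) {
            int offered = worth(offer, values);
            int total = worth(counts, values);
            // Accept anything worth at least half the total, or whatever is offered on the
            // last turn – something is better than nothing.
            if (offered * 2 >= total || round == maxRounds - 1) {
                return offer;
            }
        }
        // Otherwise counter-offer: demand every item we value, concede the rest.
        Map<String, Integer> counter = new HashMap<>();
        counts.forEach((item, count) ->
                counter.put(item, values.getOrDefault(item, 0) > 0 ? count : 0));
        return counter;
    }

    private int worth(Map<String, Integer> items, Map<String, Integer> values) {
        return items.entrySet().stream()
                .mapToInt(e -> values.getOrDefault(e.getKey(), 0) * e.getValue())
                .sum();
    }
}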

 

Test Your Code and Submit Online:

 

Tournament And Scoreboard:

Practice tournaments ran continuously for two weeks, taking all submitted players into account and allowing developers to see their rank. During this time, competitors were able to edit their code, so there was plenty of time to learn and improve.

We also provided analytics for every player. Developers were able to analyze and improve their strategy.

 

At the end of the two weeks, we declared a code freeze and the real tournament took place. Players’ final score was determined only from the results of the real tournament, not the practice tournaments.

 

Game Execution And Score:

We executed the game tournament using multiple agents – each agent reported its results to Kibana:

The Back-Stage:

Where did we store players’ code?
We decided to store all players' code in AWS S3, to avoid revealing the code to other players.

What languages were supported?

We started with Java only, but players expressed interest in using Scala and Kotlin as well. So we gave these developers free rein to add support for those languages, which we then reviewed before integrating into the base code. Ultimately, developers were able to play in all three languages.  

What was the scale of Haggling?

In the final tournament, 91 players competed in 164 million rounds in which 1.14 billion "accepts" were called. The tournament was executed on 45 servers, with 360 cores and 225 GB of memory in total.

The greatest advantage of our approach was our decision to use Kubernetes, enabling us to add more nodes, as well as tune their cores and memory requirements. Needless to say, it was no problem to get rid of all these machines when the game period ended.

                                

How did the tournament progress?
The tournament was tense, and we saw a lot of interaction with the game over the two weeks.
The player in the winning position changed every day, and the final winner was
not apparent until very near the end (and even then we were surprised!).
We saw a variety of single-player strategies with sophisticated calculations and different approaches to game play.
Moreover, in contrast to the original game, we allowed gangs: groups of players belonging to a single team that can “help” each other to win.


So how do you win at haggling?

The winning strategy was collaborative – the winning team created two types of players: the “Overlord” which played to win, and several “Minions” whose job was to give points to the Overlord while blocking other players.  The Overlord and Minions recognized each other using a triple handshake protocol, based on mathematical calculations of the game parameters.  Beyond this, the team employed a human psychological strategy – hiding the strength of the Overlord by ensuring that for the majority of the development period the Overlord went no higher than third place.  They populated the game with “sleeper cells” – players with basic strategies ready to turn into minions at the right moment.  The upheaval occurred in the final hour of the game, when all sleepers were converted to minions.

The graph shows the number of commits in the last hour before code freeze:

 

Hats Off to the Hacker: who got the better of us?

During the two weeks, we noticed multiple hacking attempts. The hacker's intent was not to crash the game, but rather to prove that it was possible and to teach us a lesson.
Although it was not our initial intent, we decided to make hacking part of the challenge, and to reward the hacker for demonstrated skills and creativity.

On the morning of November 7th, we arrived at the office and were faced with the following graph of the outcomes:


The game had been hacked! As can be seen in the graph, one player was achieving an impossible success rate. What we discovered was the following: the read-only hash map that we provided as a method argument to players was written in Kotlin; but when players converted the map to play in either Java or Scala, the resulting conversion rendered a mutable hash map, and this is how one of the players was able to modify it. We had failed to validate the preferences – to ensure that the hash-map values players returned were the same as the original ones.
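To illustrate the kind of exploit this enabled, here is a hypothetical sketch using the same assumed signature as the example above – not the actual hacker's code:

import java.util.Map;

// Hypothetical sketch, NOT the actual hacker's code: if the "read-only" valuation map
// is really a mutable HashMap after the Kotlin/Java conversion, a player can simply
// tamper with its own valuations before they are used.
public class CheatingPlayer {

    public Map<String, Integer> accept(Map<String, Integer> values,
                                       Map<String, Integer> counts,
                                       Map<String, Integer> offer,
                                       int round, int maxRounds) {
        values.replaceAll((item, value) -> value * 10); // mutate the supposedly read-only map
        return offer; // then happily accept whatever is on the table
    }
}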


In conclusion, this is exactly the sort of sandbox experience that makes us better, safer, and smarter developers. We embraced the challenge.


Want to play with us? Join Outbrain and challenge yourself.

 

Micro Front Ends — Doing it Angular Style Part 2


In the previous part, I talked about the motivations for moving towards an MFE solution and some of the criteria for a solution to be relevant. In this part, I'll get into how we implemented it at Outbrain.

As I mentioned in the previous part, one of the criteria was a solution that could integrate with our current technological ecosystem and require little or no change to the applications we currently maintain.

Enter Angular Lazy Loading Feature Modules

Angular has a built-in concept of modules, which are basically declaration objects that specify all the components, directives, services and other modules that are encapsulated in a module.

@NgModule({
  imports: [CommonModule],
  declarations: [WelcomeComponent],
  bootstrap: [],
  entryComponents: []
})
export class AppB_Module {}

Specifying the module file as a Webpack entry point gave us the ability to bundle up the entire Angular module, including CSS and HTML, as a single standalone JS file.

entry: {
 'appB_module': './app/appB.prod.module.ts'
}

Using Angular's lazy loading mechanism, we can dynamically load this JS file and bootstrap it into our current application.

const routes: Routes = [
  {
    path: 'appB',
    loadChildren: '/appB/appB_Module#AppB_Module'
  }
]

This is a big step towards our goal of separating our application into mini applications.

Moving from feature modules to mini apps

Angular feature modules along with Webpack bundling give us the code separation we need, but this is not enough. Webpack only allows us to create bundles as part of a single build process. What we want is to produce a separate JS bundle, built at a different time, from a separate code base, in a separate build system, that can be loaded into the application at runtime and share common resources, such as Angular.

To resolve this, we created our own Webpack loader, called share-loader.

Share-loader lets us specify a list of modules that we would like to share between applications: it bundles a given module into one application's JS bundle and provides a namespace through which other bundles can access those modules.

Application A webpack.config:

rules: [
  {
    test: /\.js?$/,
    use: [{
      loader: 'share-loader',
      options: {
        modules: [/@angular/, /@lodash/],
        namespace: 'container-app'
      }
    }]
  }
]

Application B webpack.config:

const {Externals} = require('share-loader');
externals: [
 Externals({
   namespace: 'container-app',
   modules: [/@angular/, /@lodash/]
 })
],
output: {
 library: 'appB',
 libraryTarget: 'umd'
},

In this example, we are telling Webpack to bundle Angular and Lodash into application A and expose them under the 'container-app' namespace.

In application B, we are declaring that Angular and Lodash will not be bundled but will instead be resolved through the 'container-app' namespace.

This way, we can share some modules across applications but maintain others that we wish not to share.

So far we have tackled several of the keys we specified in the previous post. We now have two applications that can run independently or be loaded remotely at runtime, each wrapped in a JS namespace with CSS and HTML encapsulation. They can also share modules between them and encapsulate modules that shouldn't be shared. Now let's look into some of the other keys we mentioned.

DOM encapsulation

To tackle CSS encapsulation, we wrapped each mini-app with a generic Angular component. This component uses Angular's CSS encapsulation feature: we can use either emulated mode or native mode, depending on the browser support we require. Either way, we are sure that our CSS will not leak out.

@Component({
  selector: 'ob-externals-wrapper',
  template: require('./externals-wrapper.component.pug')(),
  styleUrls: ['./externals-wrapper.component.less'],
  encapsulation: ViewEncapsulation.Native
})

This wrapper component also serves as a communication layer between each mini-app and the other apps. All communication is done via an event bus instance hosted by each wrapper instance. Using an event system gives us a decoupled way to communicate data in and out, which we can easily clear when a mini application is removed from the main application.

If we look at what we have so far, we can see a solution that is very much in line with the web component concept: each mini application is wrapped by a standalone component that encapsulates all its JS, HTML and CSS, and all communication is done through an event system.

Testing

Since each application can also run independently, we can run test suites on each one independently. This means each application owner knows when their changes have broken the application, and each team is concerned mostly with their own application.

Deployment and serving

To give each application its own deployment, we created a node service for each application. Each time a team creates a new deployment of their application, a JS bundle that encapsulates the application is created, and each service exposes an endpoint that returns the path to the bundle. At runtime, when a mini app is loaded into the container app, a call to the endpoint is made and the JS file is loaded and bootstrapped into the main application. This way each application can be built and deployed separately.

Closing Notes:

Thanks for reading! I hope this article helps companies that are considering this move to realize that it is possible to do it without revolutionizing your code base.

Moving to a Micro Front End approach is a move in the right direction; as applications get bigger, velocity gets smaller.

This article shows a solution using Angular as a framework, similar solutions can be achieved using other frameworks.

Increase Your Velocity with a Safe Automatic Deployment

At Outbrain we work at a fast pace trying to combine the challenges of developing new features fast, while also maintaining our systems so that they can cope with the constant growth of traffic. We deliver many changes on a daily basis to our production and testing environments so our velocity is much affected by our DevOps tools. One of the tools we use the most is the deployment tool since every new artifact must be deployed to simulation and staging environments and pass its test before it can be deployed to production. The simulation environment is used for running E2E integration tests. These tests simulate real use cases and they involve all relevant services. The staging environment is actually a single production machine (AKA a canary machine) which receives a small portion of the traffic in production. It allows us to make sure the new version is working properly in the production environment before we deploy it to the rest of the production servers. In this session, you’ll find out how we increased velocity with a safe automatic deployment of high scale services.

 

Our deployment flow

 

The illustration above depicts the flow each code change must pass until it arrives in production.

A developer commits code changes and triggers a "build & deploy" action that creates an artifact for the requested service and deploys it to our simulation servers. Once an hour, a build in TeamCity runs the simulation tests of our services.

If the developer doesn’t want to wait for the periodic run, they need to run the simulation tests manually. Once the build passes, the developer is allowed to deploy the artifact to the staging server. At this point, we verify that the staging server behaves properly by reviewing various metrics of the server, and by checking the logs of that server.

For instance, we verify that the response time hasn’t increased and that there are no errors in the log. Once all these steps are completed, the new version is deployed to all production servers. This whole process can take 30-45 minutes.

As one can see, this process has a lot of problems:

  1. It requires many interventions of the developer.
  2. The developer either spends time waiting for actions to complete in order to trigger the next ones or they suffer from context switches which slow them down.
  3. The verification of the version in staging is done manually, hence:
  • It's time-consuming.
  • There is no certainty that all the necessary tests are performed.
  • It's hard to share knowledge among team members about the expected result of each test.

The new automatic pipeline

Recently we have introduced a pipeline in Jenkins that automates this whole process. The pipeline allows a developer to send code changes to any environment (including production) simply by committing them into the source control while ensuring that these changes don’t break anything.

The illustration below shows all stages of our new pipeline

Aside from automating the whole process, which was relatively easy, we had to find a way to automate the manual tests of our staging environment. As mentioned, our staging servers serve real requests coming from our users.

Some of our services handle around 2M requests per minute so any bad version can affect our customers, our users, and us very quickly. Therefore we would like to be able to identify bad versions as soon as possible. To tackle this issue, our pipeline starts running health tests on our staging servers 5 minutes after the server goes up since sometimes it takes time for the servers to warm up.

The tests, which are executed by TeamCity, pull a list of metrics of the staging server from our Prometheus server and verify that they meet the criteria we defined. For example, we check that the average response time is below a certain number of milliseconds. If one of these tests fails, the pipeline fails. At that point, the developer who triggered the pipeline receives a notification e-mail so that they can look into it and decide whether the new version is bad and should be reverted, or whether the tests need some more fine-tuning and the version is okay to deploy to the rest of the servers.
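A minimal sketch of such a check is shown below. The Prometheus URL, metric name and threshold are illustrative placeholders, not our actual test code, which runs inside TeamCity and covers many more metrics:

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Minimal sketch: query Prometheus' HTTP API and fail if the average response time of
// the staging server exceeds a threshold. URL, query and threshold are placeholders.
public class StagingHealthCheck {

    public static void main(String[] args) throws Exception {
        String query = "avg(service_response_time_ms{host=\"staging-1\"})";
        URI uri = URI.create("http://prometheus.internal:9090/api/v1/query?query="
                + URLEncoder.encode(query, StandardCharsets.UTF_8));

        HttpResponse<String> response = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(uri).GET().build(),
                HttpResponse.BodyHandlers.ofString());

        // The API returns JSON such as {"data":{"result":[{"value":[<ts>,"12.3"]}]}}.
        // A real test would use a JSON parser; the extraction here is deliberately crude.
        double avgResponseTimeMs = extractFirstValue(response.body());
        if (avgResponseTimeMs > 50.0) {
            throw new AssertionError("Average response time too high: " + avgResponseTimeMs + " ms");
        }
    }

    private static double extractFirstValue(String json) {
        String tail = json.substring(json.lastIndexOf("\"value\":["));
        return Double.parseDouble(tail.split(",")[1].replaceAll("[^0-9.]", ""));
    }
}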

The pipeline ends when the new version is deployed to production but this doesn’t necessarily mean that the version is 100% okay, although the chances that the version is not okay at this stage are low.

For the purpose of ensuring our production servers function properly, many periodic tests constantly monitor the servers and trigger alerts in case of a failure and allow us to react fast and keep our services available.

 

What we gained

  1. The automated deployment process ensures the quality of our deliveries and that they don’t break our production servers.
  2. Reduction of the time developers spend on DevOps tasks.
  3. The decision whether a version in staging is okay is more accurate as it is based on comparable metrics and not on a subjective decision of the developer.
  4. The developer doesn’t need to remember which metrics to check for each service in order to tell whether a service functions properly.

Upgrade Railway Tracks Under a Moving Train

Outbrain has quite a history with Cassandra. We have around 30 production clusters of different sizes, ranging from small ones to a cluster with 100 nodes across 3 datacenters.

Background

Cassandra has proven to be a very reliable choice for a datastore that employs an eventual consistency model. We used the DataStax Enterprise 4.8.9 distribution (which is equivalent to Cassandra 2.1), and as Cassandra 3 evolved it became clear that at some point we had to plan an upgrade. We ended up with two options: either upgrade the commercial distribution provided by DataStax, or migrate to the Apache Cassandra distribution. Since Outbrain uses Cassandra extensively, it was an important decision to make.
At first, we identified that we do not use any of the DataStax Enterprise features, and we were quite happy with the basics still offered by Apache Cassandra. So, at this point, we decided to give Apache Cassandra 3.11.1 a try.
But when planning an upgrade, aside from the necessity to stay on a supported version, there are always some goals, like leveraging a new feature or seeking performance improvements. Our main goal was to get more from the hardware that we have and potentially reduce the number of nodes per cluster.

In addition, we wanted an in-place upgrade, which means that instead of building new clusters we would perform the migration in the production clusters themselves while they are still running. It is like upgrading the railway tracks under a moving train – and we are talking about a very big train moving at high speed.

The POC

But before defining the in-place upgrade we needed to validate that the Apache Cassandra distribution fits our use cases, and for that we performed a POC. After selecting one of our clusters for the POC, we built a new Apache Cassandra 3.11.1 cluster. Given the new storage format, and hence the read-path optimisations in Cassandra 3, we started with a small cluster of 3 nodes in order to see whether we could benefit from these improvements to reduce the cluster size – that is, whether we could achieve acceptable read and write latencies with fewer nodes.
The plan was to copy data from the old cluster using sstableloader, while the application would employ a dual-writes approach before we started the copying. This way we would have two clusters in sync, and we could direct our application to do some or all reads from the new cluster.
And then the problems started… At first it became clear that with our multiple-datacenter configuration we did not want sstableloader to stream data to foreign datacenters; we wanted to be able to control the target datacenter for the stream processing. We reported a bug, and instead of waiting for a new release of Apache Cassandra we built our own custom build.
After passing this barrier we reached another one, related to LeveledCompactionStrategy (LCS) and how Cassandra performs compactions. With a cluster of 3 nodes and 600GB of data in a table with LCS, it took a few weeks to put the data into the right levels. Even with 6 nodes it took days to complete compactions following the sstableloader data load. That was a real pity, because read and write latencies were quite good even with fewer nodes. Luckily Cassandra has a feature called multiple data directories. At first it looks like something to do with allowing more data, but this feature actually helps enormously with LCS. Once we tried it and played with the actual number of data directories, we found that we could compact 600GB of data on L0 in less than a day with three data directories.

Encouraged by these findings, we conducted various tests on our Cassandra 3.11 cluster and realised that not only could we benefit from storage space savings and a faster read path, but we could also potentially put more data per node for clusters where we use LCS.

Storage before and after the migration

The plan

The conclusions were very good and indicated that we were comfortable with the Apache Cassandra distribution and could start preparing our migration plan using in-place upgrades. At a high level, the migration steps were:

  1. Upgrade to the latest DSE 4.x version (4.8.14)
  2. Upgrade to DSE 5.0.14 (Cassandra 3)
  3. Upgrade sstables
  4. Upgrade to DSE 5.1.4 (Cassandra 3.11)
  5. Replace DSE with Apache Cassandra 3.11.1

After defining the plan, we were ready to start migrating our clusters. We mapped our clusters and categorised them according to whether they are used for online serving or for offline activities.
Then we were good to go, and started to perform the upgrade cluster by cluster.

Upgrade to the latest DSE 4.x version (4.8.14)

The first upgrade, to DSE 4.8.14 (we had 4.8.9), is a minor one. The reason this step is needed at all is that DSE 4.8.14 includes all the important fixes required to co-exist with Cassandra 3.0 during the upgrade, and our upgrade experience confirms that DSE 4.8.14 and DSE 5.0.10 (Cassandra 3.0.14) indeed co-existed very well. The way it should be done is to upgrade nodes in a rolling fashion, one node at a time. "One node at a time" applies within a datacenter only; datacenters can be processed in parallel (which is what we actually did).

Upgrade to DSE 5.0.14 (Cassandra 3)

Even if you aim to use DSE 5.1 you cannot really jump to it from 4.x, so the 4.x – 5.0.x – 5.1.x chain is mandatory. The steps and prerequisites for upgrading to DSE 5.0 are documented quite well by DataStax. What is probably not documented that well is which driver version to use, and how to configure it so that the application works smoothly with both DSE 4.x and DSE 5.x nodes during the upgrade. We instructed our developers to use cassandra-driver-core version 3.3.0 and to enforce protocol version 3, so that the driver can successfully talk to both DSE 4.x and DSE 5.x nodes.

Another thing to make sure of is that all your nodes are healthy before starting the upgrade. Why? Because one of the most important limitations of clusters running both DSE 4.x and 5.x is that you cannot stream data, which means that bootstrapping a node during node replacement is simply not possible.

For clusters where we had SLAs on latencies we wanted more confidence, and for those clusters we first upgraded a single node in one datacenter to DSE 5.0.x. This is a 100% safe operation: since it is the first node with DSE 5.0.x, it can be replaced using the regular node replacement procedure with DSE 4.x. One very important point here is that DSE 5.0 does not force any schema migration until all nodes in the cluster are running DSE 5.0.x. This means the system keyspaces remain untouched, and that is what guarantees that a DSE 5.0.x node can be replaced with DSE 4.x.

Since we have two kinds of clusters (serving and offline) with different SLAs, we took two different approaches. For the serving clusters we upgraded the datacenters sequentially, leaving some datacenters running DSE 4.x, so that if something went really wrong we would still have a healthy datacenter running DSE 4.x. For the offline clusters we upgraded all the datacenters in parallel. Both approaches worked quite well.
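With the DataStax Java driver 3.x that configuration looks roughly like the sketch below; the contact point and keyspace are placeholders:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.Session;

// Sketch of pinning the native protocol version with cassandra-driver-core 3.x so that
// the same client can talk to both DSE 4.x and DSE 5.x nodes during the rolling upgrade.
// The contact point and keyspace are placeholders.
public class MixedVersionClient {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder()
                .addContactPoint("10.0.0.1")
                .withProtocolVersion(ProtocolVersion.V3) // do not negotiate a newer protocol
                .build();
        try (Session session = cluster.connect("my_keyspace")) {
            session.execute("SELECT release_version FROM system.local");
        } finally {
            cluster.close();
        }
    }
}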

Upgrade sstables

Whether clusters were upgraded in parallel or sequentially, we always wanted to start the sstable upgrade as quickly as possible after DSE 5.0 was installed. The sstable upgrade is the most important phase of the upgrade because it is the most time-consuming. There are not many knobs to tune:

  • control the number of compaction threads with the -j option of the nodetool upgradesstables command.
  • control the compaction throughput with nodetool setcompactionthroughput.

Whether -j speeds up the sstable upgrade depends a lot on how many column families you have, how big they are, and what kind of compaction strategy is configured. As for the compaction throughput, we determined that for our storage system it was OK to set it to 200 MB/sec and still carry out regular read/write operations at reasonable high-percentile latencies – but this certainly has to be determined for each storage system individually.

Upgrade to DSE 5.1.4 (Cassandra 3.11)

In theory, the upgrade from DSE 5.0.x to DSE 5.1.x is a minor one: you just need to upgrade the nodes in a rolling fashion, exactly the same way as minor upgrades were done within DSE 4.x; the storage format stays the same, and no sstable upgrade is required. But when upgrading our first reasonably large cluster (in terms of number of nodes) we noticed something really bad: nodes running DSE 5.1.x suffered from never-ending compactions in the system keyspace, which killed their performance, while the DSE 5.0.x nodes were running well. For us it was a tough moment, because that was a serving cluster with SLAs, and we had not noticed anything like that in our previous cluster upgrades! We suspected that something was wrong with the schema migration mechanism, and the nodetool describecluster command confirmed that there were two different schema versions, one from the 5.0.x nodes and one from the 5.1.x nodes. So we decided to speed up the DSE 5.1.x upgrade, under the assumption that once all nodes were on 5.1.x there would be no issues with different schema versions and the cluster would settle. This is indeed what happened.

Replace DSE with Apache Cassandra 3.11.1

So how did the final migration step, replacing DSE 5.1.4 with Apache Cassandra, go? In the end it worked well across all our clusters. Basically, the DSE packages have to be uninstalled, and Apache Cassandra has to be installed and pointed at the existing data and commit log directories.

However, two extra steps were required:

  • DSE creates DSE-specific keyspaces (e.g. dse_system). These keyspaces are created with EverywhereStrategy, which is not supported by Apache Cassandra. So before replacing DSE with Apache Cassandra on a given node, the keyspace replication strategy has to be changed.
  • There are a few system column families that receive extra columns in DSE deployments (system_peers and system_local). So if you attempt to start Apache Cassandra after DSE is removed, it will fail on the schema check against these column families. There are two ways to work around this. The first option is to delete all system keyspaces, set auto_bootstrap to false, and also set the allow_unsafe_join JVM option to true – assuming the node's IP address stays the same, Cassandra will re-create the system keyspaces and jump into the ring. We used an alternative approach: we had a bunch of scripts which first exported these tables, with their extra columns, to JSON, then imported them on a helper cluster which had the same schema as the system tables in Apache Cassandra. We then imported the data from the JSON files and copied the resulting sstables to the node being upgraded.

Minor version, big impact

So Cassandra 3.11 clearly has vast improvements in terms of storage format efficiency and read latency. However, even within a given Cassandra major version, a minor version can introduce a bug that heavily affects performance. The latency improvements were achieved on Cassandra 3.11.1, but this very specific version introduced an unpleasant bug (CASSANDRA-14010: fix sstable ordering by max timestamp in singlePartitionReadCommand).

This bug was introduced during refactoring, and it meant that for tables with STCS almost all sstables were queried on every read. Typically, Cassandra sorts sstables by timestamp in descending order and starts answering a query by looking at the most recent sstables until it is able to fulfil the query (with some restrictions, for example for counters or unfrozen collections). The bug, however, caused sstables to be sorted in ascending order. It was fixed in 3.11.2 – and here is the impact: the number of sstables per read at the 99th percentile dropped after the upgrade. Reading fewer sstables, of course, leads to lower local read latencies.

 

Drop in sstables per read

Local read latency drop on Cassandra 3.11.2

Always mind page cache

On one of our newly built Cassandra 3.11.2 clusters we noticed unexpectedly high local read latencies. This came as a surprise, because this cluster was specifically dedicated to a table with LCS. We spent quite some time with blktrace/blkparse/btt to confirm that our storage by itself was performing well. However, we later identified that the high latency only affected the 98th and 99th percentiles, while the btt output made it clear that there were write IO bursts resulting in very large blocks, thanks to the scheduler merge policy. The next step was to exclude compactions – but even with compactions disabled we still had these write bursts. Looking further, it became evident that the page cache defaults were different from what we typically had, and the culprit was vm.dirty_ratio, which was set to 20%. With 256GB of RAM, flushing 20% of it certainly results in an IO burst that affects reads. So we tuned the page cache, opting for vm.dirty_bytes and vm.dirty_background_bytes instead of vm.dirty_ratio.

Page cache flush settings impact on local read latency

The results

So, in the end – what were our gains from Apache Cassandra 3.11?
Well, aside from any new features, we got lots of performance improvements. The major storage savings immediately translate into lower read latencies, because more data fits into the OS cache, which improved the clusters' performance a lot.

 

In addition, the newer storage format results in lower JVM pressure during reads (the object creation rate decreased 1.5–2 times). The following image describes the decrease in read latency in our datacenters (NY, CHI & SA) during load tests (LT) that we performed before and after the migration.

Read latency before and after the migration

Another important aspect is the new metrics that are exposed. A few interesting ones:

  • Ability to identify consistency level requested for both reads and writes. This can be useful to track if occasionally the consistency level requested is not what would be expected for a given column family. For example, if QUORUM is requested instead of LOCAL_QUORUM due to a bug in app configuration.
  • Metrics on the network messaging.

Read metrics before the migration

Read metrics after the migration

We did something that lots of people thought was impossible: performing such an in-place migration on running production clusters. Although there were some bumps along the way, it is totally possible to conduct an in-place upgrade from DSE 4.8.x to Apache Cassandra 3.11 while maintaining acceptable latencies and avoiding any cluster downtime.

Trust your IntelliJ

I’m in the habit of leaving classes cleaner than they were before I got there. CMD+ALT+O (optimize imports) and CMD+ALT+L (reformat code) have become a muscle memory by now. I will spend my time removing unused properties, deleting dead code, removing deprecations or duplications. One of my all-time favorites is rewriting tests to make them clean and readable. I do this as part of any day to day tasks, no matter how small or insignificant. It is satisfying to know you leave your code in a better state than before (and my OCD is fed and happy).

One of the best tools I use to achieve this is none other than the good old IntelliJ IDEA: the highlights for unused parameters, typos, the accessibility level of a class or method, a parameter that is always called with the same value, etc. You know what I'm talking about – that nagging little yellow square you never pay attention to, or deliberately ignore because you're always working on some important, time-sensitive feature. You "never" have time to quickly sweep and clean up the class you're working on.

 

“That nagging little square”

My team at work is divided into three groups:

  • The people who use a several-major-versions-old IntelliJ (reciting the age-old mantra — “if it ain’t broke, don’t fix it”)
  • A person who uses the latest stable version of IntelliJ
  • And me, always making sure to use the latest EAP.

The first group of people doesn't see some of the suggestions IntelliJ gives me. The first example that pops to mind is the support for lambda expressions we got with Java 8. IntelliJ is very helpful in highlighting code that can be converted to a shorter lambda, which makes the code shorter, cleaner and quite frankly more readable (which is a matter of personal taste and habit). Today I got to a class I had never seen before to make a tiny change. It's a class in a service I don't usually work on, so I hadn't had the chance to make my usual OCD-powered sweep and clean-up. I proceeded to change one global parameter from public to private accessibility and added a 'final' keyword to another parameter. Happy and satisfied, my OCD let me commit this minor change.

Imagine how surprised I was to have my OCD rub my nose in a piece of code I entirely missed:

IntelliJ just told me that it found a bug:

After changing the code and calling the ‘putAll’ method with ‘innerMap’:

IntelliJ even added a bonus of highlighting another optimization I could do — “Lambda can be replaced with method reference.” and now the code looks like so:
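As a generic illustration of that inspection (not the actual code from this story), IntelliJ suggests turning a pass-through lambda into a method reference:

import java.util.HashMap;
import java.util.Map;

// Generic illustration of the "lambda can be replaced with method reference" inspection;
// the maps and their contents are made up for the example.
public class MethodReferenceExample {

    public static void main(String[] args) {
        Map<String, Integer> source = new HashMap<>();
        Map<String, Integer> target = new HashMap<>();
        source.put("a", 1);

        // Before: a lambda that only forwards its arguments...
        source.forEach((key, value) -> target.put(key, value));

        // ...after: the method reference IntelliJ suggests instead.
        source.forEach(target::put);
    }
}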

Now, I know what you're thinking. It's something of the sort of: "where were the tests?" or "tests would have protected him from making this stupid mistake" (I can hear your scolding tone in my head right now, thank you very much). It's a valid point, but it's not the point I'm trying to make, and it's out of the scope of this post. What I'm trying to tell you is – trust your IntelliJ!

Spark in framework

Making Spark Native for Data Processing Framework

Working in an infrastructure team is different from working in a development team. The team develops tools that are used by development teams inside Outbrain, so it is very important to provide a convenient way of working with new technologies. Having a Spark cluster enables running distributed data processing at scale. Until now, developers worked with Spark by writing ad-hoc crontab jobs in production. This created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to provide an infrastructural way of working with Spark.

In this blog post, I explain how we built the solution that makes Spark a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine – an internal tool, which is developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as  Algorithm research groups, BI and Data Science.

input Trigger

Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failures. Logs for each running flow are shown in a convenient form in WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.

From the operational perspective, several instances of WFE are deployed on machines which are owned by the infrastructure team, users just define ETL flows in a separate repository.

Diving Into the Solution

In order to respond to requirements for running data analytics on Spark, our team had to provide an infrastructural way of running Spark jobs orchestrated by WFE. To achieve this, we developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design for SparkStep, we had several principles in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify, and at the same time allow the system to limit, the number of YARN resources allocated to every Spark job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping the control while having fun

The Spark job definition specifies how a job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a Spark job, it builds a spark-submit command using all these properties.
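Roughly, assembling that command looks like the sketch below; the field names, configuration keys and values are illustrative, not WFE's actual internals:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of assembling a spark-submit command from a job definition. The configuration
// keys and values are illustrative, not WFE's actual internals.
public class SparkSubmitCommandBuilder {

    public static List<String> build(String jarUrl, String className,
                                     Map<String, String> sparkConf, List<String> appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add("spark-submit");
        cmd.add("--master");      cmd.add("yarn");
        cmd.add("--deploy-mode"); cmd.add("cluster");   // the driver runs on the cluster, not on WFE
        cmd.add("--class");       cmd.add(className);
        sparkConf.forEach((key, value) -> {
            cmd.add("--conf");
            cmd.add(key + "=" + value);
        });
        cmd.add(jarUrl);
        cmd.addAll(appArgs);
        return cmd;
    }

    public static void main(String[] args) throws Exception {
        List<String> cmd = build(
                "hdfs:///jobs/my-job.jar", "com.example.MyJob",
                Map.of("spark.executor.memory", "4g", "spark.executor.instances", "10"),
                List.of("2018-01-01"));
        // In our setup the command itself runs inside a short-lived Docker container.
        new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}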

The spark-submit command submits the Spark job to YARN, which allocates the needed resources and launches the application. After submitting the job, WFE constantly checks its status using the YARN REST API. When the job completes in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API, and the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use, and that resources are saved when a job is canceled for any reason.
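A minimal sketch of the status-polling loop against the ResourceManager's REST API is shown below; the host, port and application id are placeholders, and a real implementation would parse the JSON properly:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of polling an application's state through the YARN ResourceManager REST API
// (GET /ws/v1/cluster/apps/{appId}). Host and application id are placeholders.
public class YarnStatusPoller {

    public static void main(String[] args) throws Exception {
        String appId = "application_1234567890123_0042";
        URI uri = URI.create("http://resourcemanager.internal:8088/ws/v1/cluster/apps/" + appId);
        HttpClient client = HttpClient.newHttpClient();

        while (true) {
            HttpResponse<String> response = client.send(
                    HttpRequest.newBuilder(uri).GET().build(),
                    HttpResponse.BodyHandlers.ofString());

            // The response JSON contains the application's state, e.g. "RUNNING", "FINISHED",
            // "FAILED" or "KILLED"; crude string matching stands in for real JSON parsing here.
            String body = response.body();
            if (body.contains("\"state\":\"FINISHED\"")
                    || body.contains("\"state\":\"FAILED\"")
                    || body.contains("\"state\":\"KILLED\"")) {
                System.out.println("Application reached a terminal state");
                break;
            }
            Thread.sleep(30_000); // poll every 30 seconds
        }
    }
}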

The spark-submit command runs in a Docker container, and the container exits once the job is submitted to the cluster. This enables us to create an isolated environment for every Spark job executed by WFE. In addition, by using Docker containers we can easily use different Spark versions simultaneously, simply by creating different Docker images.

With this architecture we achieve all our goals: Spark jobs are submitted to YARN in "cluster" mode, so they do not create load on the WFE machines; YARN handles all resources for each job; and every Spark job runs in an isolated environment thanks to its Docker container.

X tips [x>5] for Micro-Services Logging

Micro-Services Logging

What if?

What if someone told you it is forbidden to use logs anymore?

In my case, it began in a meeting with our ops team, who claimed our services were writing too many logs and that we should write less. Therefore, logs were being "throttled", i.e., some log messages were just discarded.

"Too many logs" was 100 lines per minute, which in my opinion was a ridiculously low number.

Maybe I am doing something wrong?

It might be that logs are not a good pattern in a micro-services-highly-distributed environment. I decided to rethink my assumptions.

Why do I need logs anyway?

The obvious reason to write logs is debugging purposes. When something goes wrong in production, logs are a way I can understand the flow that leads to that erroneous state.

The main alternative that I am aware of is connecting a debugger and stepping through the code in the flow. There are a couple of disadvantages to using the debugger: it is time-consuming, it might slow down the production process itself, and you have to be connected when it happens – so if the bug happens only at night, bummer. In addition, debugging is a one-time process; the learning from it stays in your head, so it is hard to improve that way.

Another alternative is adding metrics. Metrics are pretty similar to logs, but they have the extra feature of nice dashboards and alerting systems (we use Metrics, Prometheus and Grafana). On the other hand, metrics have a bigger setup overhead, which is the main disadvantage in my opinion. Metrics are also more rigid and usually don't allow logging all of the state, context, and parameters. The fact that writing logs is easy makes it a no-brainer to use them everywhere in the code while applying the "think later" paradigm.

The third alternative is auditing via systems like ELK. Similar to metrics, it has a higher overhead, and it is also hard to follow sequential operations with those discrete events.

There are even more reasons for logging. Logs are additional documentation in the code that can help understand what is going on. Logging can also be used as a metrics and alerting system, and even replace those systems. Many insights can be gained from logs – sometimes even user passwords.

Specification

If I go back to that meeting with the ops guys, one question I was asked was:

‘What are your requirements from your logging system’?

Here it is:

Order matters — messages should be in ‘written before’ order so it will be possible to understand the flow of the code.

Zero throttling — I expect that there will be no rate limit on writing, only on the volume of saved messages, so every time a message is thrown away, it is the oldest one.

X days history – log files should keep log messages for at least a couple of days. If that is not the case, it might be that you are writing too many log messages, so move some to debug level.

Logs should be greppable – text files have the big advantage of flexibility and the multiple tools that can be used with them, so text files are a big plus.

Metadata – the logging system should provide metadata like the calling thread, timestamp, calling method or class, etc., to remove that burden from the developer (logging should be easy).

Distributed and centralized — it should be convenient to look at all logs in a central location, but also be able to split it and see logs of a specific process.

Easy to use, easy to install, easy to consume — in general logging should be fun.

Tips and Tricks

I am in. Is there anything else to know?

Use logging framework

Don't log to standard output with print lines in a long-running service. A logging framework gives you levels, various appenders (like logging to HTML or via HTTP), log rotation, and all sorts of features and tricks.

Developer tip — Log levels

It’s important to be consistent when using different levels, otherwise, you will lose semantics and meaning of the message severity. The way I use them is:

  • Error — something bad happened and it might crash the process/service.
  • Warn — something bad happened in a specific use case.
  • Info — something that I want to see happen.
  • Debug — something happened, I wish to see only under special circumstances otherwise it will clutter normal logs.

There are some special reasons to override that rule: Error and Warn messages are monitored in our services, so sometimes I might move something to info level to snooze alerting of it. I move messages from Info to Debug if there is too much clutter and the other direction if I want to focus on something.

Developer tip — Meaningful messages

I often see messages like "start processing" or "end method". It is always a good idea to imagine what else you would like to see when reading the log message, and to add as much information as possible: specific parameters, fields, and contextual data which might have different values in different scenarios.

Developer tip — Lazy evaluated strings

Especially for the Debug level, it is a good practice to use frameworks that avoid the overhead of string concatenation when the level is turned off, without the need to explicitly check whether the level is suppressed. In kotlin-logging, for example, it looks like this:

logger.debug { "Some $expensive message!" }
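In plain Java with SLF4J, the closest equivalents are parameterized messages plus an explicit level check for genuinely expensive computations – a small sketch (the domain names are made up):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the same idea in plain Java with SLF4J: parameterized messages defer the
// formatting, and an explicit guard skips expensive work entirely when DEBUG is off.
public class LazyLoggingExample {

    private static final Logger logger = LoggerFactory.getLogger(LazyLoggingExample.class);

    void process(String orderId) {
        // The message is only formatted if DEBUG is actually enabled.
        logger.debug("Processing order {}", orderId);

        // For expensive computations, guard explicitly so the work is skipped entirely.
        if (logger.isDebugEnabled()) {
            logger.debug("Order details: {}", buildExpensiveDump(orderId));
        }
    }

    private String buildExpensiveDump(String orderId) {
        return "...full dump of order " + orderId + "..."; // placeholder for costly work
    }
}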

Developer tip — Don’t log errors more than once per error

It is a common anti-pattern to log an exception and then re-throw it just to be logged later in another place in the code again. It makes it harder to understand the number of errors and their origin. Log exceptions only if you are not re-throwing them.

Ad-Hoc enablement of log level

Some frameworks and services allow on-the-fly changes of the active log level. This means you can print the debug messages of a specific class in a specific instance for a couple of minutes, for example. It allows debugging without trashing the log file the rest of the time.

Ad-Hoc addition of logging messages

When I worked at Intel, one of my peers developed a JVM tool that allowed bytecode manipulation of methods, adding log messages at the beginning and at the end of methods.

This means you don't have to think about all those messages in advance; you can simply inject log messages where needed while the process is running.

In-Memory logs

Another useful technique is keeping the last messages in memory. It enables an easy way to access them remotely, via a REST call for example. It is also possible to dump them into a file in case the process crashes.

Logging as a poor-man profiler

It is possible to analyze logs to also gain insight into the performance of the application. The simple technique I have seen is using the timestamps in the logs. A more advanced technique is using the context to calculate and show the time from the beginning of the sequence (i.e., when the HTTP call started), for example by using MDC.
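A small sketch of that idea with SLF4J's MDC (the key name is illustrative): store the sequence start time when the request begins, so any later log line can report how far into the request it was written.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

// Sketch: put the sequence start time into the MDC when an HTTP call begins, so later
// log lines can show the elapsed time since the request started. The key name is illustrative.
public class RequestTimer {

    private static final Logger logger = LoggerFactory.getLogger(RequestTimer.class);
    private static final String START_KEY = "requestStartMillis";

    public void handleRequest(Runnable businessLogic) {
        MDC.put(START_KEY, Long.toString(System.currentTimeMillis()));
        try {
            businessLogic.run();
            logProgress("request finished");
        } finally {
            MDC.remove(START_KEY); // never leak context between requests on pooled threads
        }
    }

    private void logProgress(String message) {
        long start = Long.parseLong(MDC.get(START_KEY));
        logger.info("{} (+{} ms since request start)", message, System.currentTimeMillis() - start);
    }
}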

Log formatting

The content of the message is also important. Various logging frameworks allow embedding predefined template parameters such as:

  • Location info — Class and file name, method name and line number of where the log message was issued.
  • Date and time.
  • Log level as discussed above.
  • Thread info — relevant to multi-threads environments to be able to separate different flows.
  • Context info – similar to thread info but more specific to a use case; adds context information like user id, request id, etc. Framework features like MDC make this easier to implement.

I highly recommend using those, but bear in mind that some of these features impose a performance overhead when they are evaluated.

Logging — the essentials

Logging is a big world; I couldn't cover all of it here, but I hope I convinced you to use it.

Have fun and keep on logging!

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3

Previously, in the second episode of the trilogy, "Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2", we covered the POC we ran.

In this episode we will focus on the migration itself. Building a POC environment is all nice and easy; however, migrating 2 PB of data (the raw part out of the 6 PB that includes replication) turned out to be a new challenge. But before we jump into the technical issues, let's start with the methodology.

The big migration

We learned from past experience that for such a project to be successful, like in many other cases, it is all about the people: you need to be mindful of the users and make sure you have their buy-in.

On top of that, we wanted to complete the migration within 4 months, as we had a renewal of our datacenter space coming up, and we wanted to gain from the space reduction resulting from the migration.

Taking those two considerations into account, we decided to keep the same technologies – Hadoop and Hive – in the cloud environment, and only after the migration was done would we look into leveraging new technologies available on GCP.

Once the decision was made, we started to plan the migration of the research cluster to GCP, looking into different aspects such as:

  • Build the network topology (VPN, VPC etc.)
  • Copy the historical data
  • Create the data schema (Hive)
  • Enable the runtime data delivery
  • Integrate our internal systems (monitoring, alerts, provision etc.)
  • Migrate the workflows
  • Reap the bare metal cluster (with all its supporting systems)

All with the purpose of productizing the solution and making it production grade, based on our standards. We made a special effort to leverage the same management and configuration control tools we use in our internal datacenters (such as Chef, Prometheus, etc.) so we could treat this environment as just another datacenter.

Copying the data

Sounds like a straightforward activity: you need to copy your data from location A to location B.

Well, turns out that when you need to copy 2 PB of data, while the system is still active in production, there are some challenges involved.

The first restriction we had was that copying the data must not impact the usage of the cluster, as the research work still needed to be performed.

Second, once the data was copied, we also needed to validate it.

 

Starting with data copy

  • Option 1 – Copy the data using Google Transfer Appliance

Google can ship their transfer appliance (based on the location of your datacenter), that you would attach to the Hadoop Cluster and be used to copy the data. Ship it back to Google and download the data from the appliance to GCS.

Unfortunately, from a capacity perspective we would have needed several iterations of this process in order to copy all the data, and on top of that the Cloudera community version we were using was so old that it was not supported.

  • Option 2 – Copy the data over the network

When taking that path, the main restriction is that the network is used both for the production environment (serving) and for the copy, and we could not allow the copy to create network congestion on the lines.

However, if we restricted the copy process, the time it would take to copy all the data would be too long and we would not be able to meet our timeline.

Setup the network

As part of our network infrastructure, per datacenter we have 2 ISPs, each with 2 x 10G lines for backup and redundancy.

We decided to leverage those backup lines and build a tunnel on them, dedicated only to the Hadoop data copy. This enabled us to copy the data in a relatively short time on the one hand, and to ensure it would not impact our production traffic on the other, as the copy was contained to specific lines.

Once the network was ready we started to copy the data to the GCS.

As you may remember from previous episodes, our cluster was set up over 6 years ago and, as such, had acquired a lot of tech debt, including in the data kept in it. We decided to take advantage of the situation and use the migration to also do some data and workload cleanup.

We invested time in mapping what data we needed and what data could be cleared. Although it didn't significantly reduce the data size, we managed to delete 80% of the tables, and we also managed to delete 80% of the workload.

Data validation

As we migrated the data, we had to validate it, making sure there was no corruption or missing data.

More challenges to take into consideration on the data validation side:

  • The migrated cluster is a live cluster, so new data keeps being added to it and old data keeps being deleted.
  • In our internal Hadoop cluster all tables are stored as files, while on GCS they are stored as objects.

It was clear that we need to automate the process of data validation and build dashboards to help us monitor our progress.

We ended up implementing a process that creates two catalogs, one for the bare metal internal Hadoop cluster and one for the GCP environment, comparing those catalogs and alerting us to any differences.

This dashboard shows, per table, the file differences between the bare metal cluster and the cloud.
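Conceptually, the comparison boils down to something like the sketch below, where each catalog is reduced to a map from table name to the set of files (or objects) backing it; the real process of course also dealt with sizes, partitions and the fact that both sides keep changing:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Conceptual sketch of the catalog comparison: each catalog maps a table name to the set
// of files (or objects) backing it. The real process also handled sizes, partitions and
// the constantly changing data on both sides.
public class CatalogComparator {

    public static void compare(Map<String, Set<String>> onPrem, Map<String, Set<String>> gcs) {
        Set<String> allTables = new HashSet<>(onPrem.keySet());
        allTables.addAll(gcs.keySet());

        for (String table : allTables) {
            Set<String> left = onPrem.getOrDefault(table, Set.of());
            Set<String> right = gcs.getOrDefault(table, Set.of());

            Set<String> missingOnGcs = new HashSet<>(left);
            missingOnGcs.removeAll(right);
            Set<String> unexpectedOnGcs = new HashSet<>(right);
            unexpectedOnGcs.removeAll(left);

            if (!missingOnGcs.isEmpty() || !unexpectedOnGcs.isEmpty()) {
                System.out.printf("Table %s: %d files missing on GCS, %d unexpected files%n",
                        table, missingOnGcs.size(), unexpectedOnGcs.size());
            }
        }
    }
}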

 

In parallel to the data migration, we worked on building the Hadoop ecosystem on GCP: the table schemas with their partitions in Hive, our runtime data delivery systems (adding new data to the GCP environment in parallel to the internal bare metal Hadoop cluster), our monitoring systems, data retention systems, etc.

The new environment on GCP was finally ready and we were ready to migrate the workloads. Initially, we duplicated jobs to run in parallel on both clusters, making sure we complete validation and will not impact production work.

After a month of validation, parallel work and required adjustments we were able to decommission the in-house Research Cluster.

What we achieved in this journey

  • Upgraded the technology
  • Improved utilization and gained the elasticity we wanted
  • Reduced the total cost
  • Introduced new GCP tools and technologies

Epilogue

This amazing journey lasted almost 6 months of focused work. As planned, the first step was to use the same technologies that we had in the bare metal cluster, but now that the migration to GCP is finished, we can start planning how to take further advantage of the new opportunities that arise from leveraging GCP technologies and tools.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2

Previously, in the first episode of the trilogy, "Hadoop Research Journey from Bare Metal to Google Cloud – Episode 1", we covered our challenges.

In this episode, I will focus on the POC that we did in order to decide whether we should rebuild the Research cluster in-house or migrate it to the cloud.

The POC

As we had many open questions around migration to the cloud, we decided to do a learning POC, focusing on 3 main questions:

  1. Understand the learning curve that will be required from the users
  2. Compatibility with our in-house Online Hadoop clusters
  3. Estimate cost for running the Research cluster in the Cloud

However, before jumping into the water of the POC, we had some preliminary work to do.

Mapping workloads

As the Research cluster had been running for over 6 years already, there were many different use cases running on it. Some of them were well known and familiar to users, but some were old tech debt that no one knew whether it was still needed, or what its value was.

We started with mapping all the flows and use cases running on the cluster, mapped users and assigned owners to the different workflows.

We also created a distinction between ad-hoc queries and batch processing.

Mapping technologies

We mapped all the technologies we need to support on the Research cluster in order to assure full compatibility with our Online clusters and in-house environment.

After collecting all the required information regarding the use cases and mapping the technologies, we selected representative workflows and users to participate in the POC and take an active part in it, collecting their feedback regarding the learning curve and ease of use. This approach would also serve us well later on, if we decided to move forward with the migration, by giving us in-house ambassadors.

Once we had mapped all our needs, it was also easier to get high-level cost estimates from the different cloud vendors, giving us a general indication of whether it made sense to continue investing time and resources in the POC.

 

We wanted to complete the POC within one month: long enough to cover all types of jobs, but short enough not to drag on.

For the POC environment we built a Hadoop cluster based on standard technologies.

At this stage we decided not to use proprietary vendor technologies, as we wanted to reduce the learning curve and were careful to avoid vendor lock-in.

 

In addition, we decided to run the POC with a single vendor rather than across multiple cloud vendors.

The reason was to be mindful of our internal resources and time constraints.

We did a theoretical evaluation of the technology roadmap and cost for several cloud vendors, and chose to go with GCP, looking to also leverage BigQuery in the future (once all our data has been migrated).

The execution

Once we had decided on the vendor, technologies, and use cases, we were good to go.

For the purpose of the POC we migrated 500TB of our data, built the Hadoop cluster based on Dataproc, and built the required endpoint machines.

Needless to say, already at this stage we had to create the network infrastructure to support secure operation of the hybrid environment between GCP and our internal datacenters.

Now that everything was ready, we started the actual POC from the users’ perspective. For a period of one month, the participating users performed each of their use cases twice: once on the in-house Research cluster (the production environment), and a second time on the Research cluster built on GCP (the POC environment). The users were required to record their experience, which was measured according to the following criteria:

  • Compatibility (did the test run seamlessly, were any modifications to code or queries required, etc.)
  • Performance (execution time, amount of resources used)
  • Ease of use

During the month of the POC we worked closely with the users, gathering their overall experience and results.

In addition, we documented the compute power needed to execute those jobs, which enabled us to better estimate the cost of running the full Research cluster in the cloud.

The POC was successful

The users had a good experience, and our cost analysis showed that by leveraging cloud elasticity – which in this scenario was very significant – the cloud option would be ROI-positive compared with the investment we would need to make building the environment internally (without getting into the exact numbers: over 40% cheaper, which is a nice incentive!).

With that we started our last phase – the actual migration, which is the focus of our last episode in “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3”. Stay tuned!

Taking the pain out of Data Science – part 3

This is the third and last post on our machine learning framework.
Post #1 covers the challenges we face and gives an overview of our solution.
Post #2 focuses on how we handle our data and make it more accessible.
This part will focus on what we do once our dataset is ready and organized – a framework for building new models, and for deploying them to production.

Modeling challenges and boilerplate

Building a model involves many common parts.

First is handling the input. Even after we have sorted out the data with our Data Collection process, we still need to read it, split it correctly into train and test sets, and read the data for our simulation process.

Another common part is evaluating the test metrics:
running the model on the test data and displaying metrics such as MSE and AUC, to see how well the model performs.

Third is checking the business metrics.
Before trying a model in production, we want to simulate how the model behaves with respect to the business KPIs.
We evaluate a number of metrics that serve as good proxies for business performance.
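To make these common parts concrete, here is a minimal sketch of the input-splitting and test-evaluation steps using standard Spark ML evaluators; the table name, column names, and split ratios are illustrative assumptions, not the framework’s actual configuration.

```python
# Hedged sketch of the shared input handling and test-evaluation steps.
# "prepared_dataset", the default column names and the 80/10/10 split are assumptions.
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator

spark = SparkSession.builder.appName("model-evaluation-sketch").getOrCreate()

# Read the dataset prepared by the Data Collection process and split it.
dataset = spark.read.table("prepared_dataset")  # hypothetical Hive table
train_df, test_df, simulation_df = dataset.randomSplit([0.8, 0.1, 0.1], seed=42)


def evaluate(model, test_df):
    """Run a fitted Spark ML binary classifier on the test set and compute AUC and MSE."""
    predictions = model.transform(test_df)
    auc = BinaryClassificationEvaluator(metricName="areaUnderROC").evaluate(predictions)
    mse = RegressionEvaluator(metricName="mse").evaluate(predictions)
    return {"auc": auc, "mse": mse}
```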

Our goal in this framework is to make the lives of the data scientists easier – letting them focus on their models rather than on writing time-consuming boilerplate code.


Model Framework

We wanted to create a framework that includes these parts out of the box:
it runs the fitting process, saves the model, tests the performance, and runs the simulation.


All the data scientists need to focus on is their model’s logic.
They can use any Spark ML packages, open source implementations, or their own in-house implementations; the rest they get “for free” from the framework.

The interface they need to implement is simple:

  • Preparing the dataset – extracting new features and transforming the data
  • Fitting the model on the data – the actual logic of the algorithm
  • Returning the column names (features) that are required for the model to operate
  • Saving a representation of the model for later use

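The actual interface lives in our codebase, but a rough Python sketch of the contract described above could look like this (method names and type hints are illustrative only):

```python
# Hypothetical sketch of the model interface; the real contract may differ.
from abc import ABC, abstractmethod
from typing import List
from pyspark.sql import DataFrame


class Model(ABC):
    @abstractmethod
    def prepare_dataset(self, raw: DataFrame) -> DataFrame:
        """Extract new features and transform the raw data."""

    @abstractmethod
    def fit(self, train: DataFrame) -> None:
        """The actual algorithm logic: fit the model on the training data."""

    @abstractmethod
    def required_columns(self) -> List[str]:
        """The column names (features) the model needs in order to operate."""

    @abstractmethod
    def save(self, path: str) -> None:
        """Persist a representation of the fitted model for later use."""
```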

Models productization

The final part of the framework bridges research and production.
We want this transition to be as simple and fast as possible, to allow us to reach conclusions quickly and keep improving.

First, we want to allow fast and easy A/B testing of new models.

A quick reminder of how A/B tests work: we split the population into 2 independent groups of similar users.
We serve one group with the treatment – the new model – and the other group with the control – the production baseline that the system currently uses.
After running for a while, we analyze the data, evaluate engagement & monetization metrics using statistical tests, and conclude whether the treatment has managed to provide a significant improvement.
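Purely as an illustration of the kind of statistical test involved, here is a sketch of a two-proportion z-test on a click-through metric; the numbers, the metric, and the use of statsmodels are assumptions, not a description of our actual analysis pipeline.

```python
# Hypothetical example: is the treatment's CTR significantly higher than the control's?
from statsmodels.stats.proportion import proportions_ztest

clicks = [10_500, 10_000]             # treatment, control (illustrative numbers)
impressions = [1_000_000, 1_000_000]  # users/impressions in each group

z_stat, p_value = proportions_ztest(count=clicks, nobs=impressions, alternative="larger")
print(f"z = {z_stat:.2f}, p = {p_value:.4f}")
if p_value < 0.05:
    print("The treatment shows a statistically significant improvement")
```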


To support this, we added a step at the end of the framework.
This step reads the model’s coefficients and updates the A/B test configurations with a variant that serves a small portion of our users with the new model.
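A simplified sketch of that step might look like the following; the JSON configuration format, the 5% traffic share, and the file-based storage are illustrative assumptions.

```python
# Hypothetical sketch: add the newly fitted model as a small-traffic A/B test variant.
import json


def add_ab_variant(model_id, coefficients, config_path, traffic_share=0.05):
    """Append a treatment variant that serves a small portion of users with the new model."""
    with open(config_path) as f:
        config = json.load(f)

    config["variants"].append({
        "name": f"model-{model_id}",
        "coefficients": coefficients,    # the new model's coefficients
        "traffic_share": traffic_share,  # e.g. 5% of users get the new model
    })

    with open(config_path, "w") as f:
        json.dump(config, f, indent=2)
```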

In a similar fashion, once we have a model with proven value, we want to tune it on a regular basis so it keeps learning from the new data we collect.

  • We run the whole modeling flow on a regular basis, triggered by our ETL engine.
    A new model is created with updated variables.
  • Each run, we validate the business metrics, to make sure the model keeps performing well and doesn’t deteriorate.
  • Finally, if the metrics were positive, we update the production configurations.

High level design

Here is a quick high level overview of this part of the system:

Our input data is stored in 2 Hive tables, after being prepared by the Data Collection process.
The model is created using the model generator, which initializes the implementation based on the job’s configuration.
The framework then runs all the common parts:

  • Reads the data and splits it into test, train, and simulation sets
  • Calls the model’s fit implementation and saves the result
  • Uses the model for test data predictions and stores the results for analysis
  • Runs the model on the simulation data and calculates the business KPI simulation metrics
  • Saves the model on HDFS for later use
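Tying these steps together, a stripped-down driver could look like the sketch below; `evaluate`, `simulate`, and `store_results` are hypothetical helpers, and the configuration keys are assumptions.

```python
# Hypothetical end-to-end driver for the common steps listed above.
def run(model, spark, config):
    raw = spark.read.table(config["input_table"])   # prepared by the Data Collection process
    dataset = model.prepare_dataset(raw)

    # Split into train / test / simulation sets (ratios are illustrative).
    train, test, simulation = dataset.randomSplit([0.8, 0.1, 0.1], seed=42)

    model.fit(train)                                # the data scientist's own logic

    test_metrics = evaluate(model, test)            # e.g. AUC, MSE (see the sketch above)
    business_metrics = simulate(model, simulation)  # proxies for the business KPIs

    model.save(config["hdfs_output_path"])          # persist the model on HDFS
    store_results(test_metrics, business_metrics)   # results end up in Cassandra
```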

All the results are stored in Cassandra.

The last part is the productization step:
it takes the updated model variables from the fit output, validates the simulation metrics to verify the model’s performance, and updates the production configurations in MySQL.
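As a rough sketch of that gate (the thresholds, table and column names, and the MySQL access pattern are all assumptions for illustration):

```python
# Hypothetical validation gate: promote the new model only if its simulation
# metrics clear the configured thresholds, then update the MySQL configuration.
import json


def productize(fit_output, simulation_metrics, thresholds, mysql_conn):
    for metric, minimum in thresholds.items():
        if simulation_metrics.get(metric, float("-inf")) < minimum:
            print(f"Skipping promotion: {metric} is below its threshold")
            return False

    with mysql_conn.cursor() as cursor:
        cursor.execute(
            "UPDATE model_config SET coefficients = %s WHERE model_name = %s",
            (json.dumps(fit_output["coefficients"]), fit_output["model_name"]),
        )
    mysql_conn.commit()
    return True
```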

Takeaways

To sum up, here are the key lessons we learned that evolved into this framework:

  • Prepare your data well – to enable high-scale modeling, this is crucial!
    I cannot over-emphasize how important this is in order to avoid drowning in data and spending a lot of the research time on endless queries.
  • Build an effective research cycle – invest the time to build a good big data machine learning framework. It will really pay off in the long run and keep your data scientists productive and happier.
  • Connect research and production – research results are worthless if it takes forever to apply them in your product. Shorter cycles will enable you to try out more models and implementations and keep improving.
    Aim to make this as quick and easy as possible.