Category: Cloud

CodinGame Story One – The Key to Creativity and Happiness in a Developer's Life


“Keep a developer learning and they’ll be happy working in a windowless basement eating stale food pushed through a slot in the door. And they’ll never ask for a raise.” — Rob Walling

The past decade has produced substantial research verifying what may come as no surprise: developers want to have fun. While we also need our salaries, salaries alone will not incentivize us developers who, in most cases, entered the field to do what we love: engage in problem-solving. We like competition. We like winning. We like getting prizes for winning. To be productive, we need job satisfaction. And job satisfaction can be achieved only if we get to have fun using the skills we were hired to use.


We wanted to keep the backend developers challenged and entertained.
That’s why Guy Kobrinsky and I created our own version of Haggling, whose basic idea we adapted from Hola, a negotiation game.

The Negotiation Game:

Haggling consists of rounds of negotiations between pairs of players. Each pair’s goal is to maximize score in the following manner:

Let’s say there are a pair of sunglasses, two tickets, and three cups on the table. Both players have to agree on how to split these objects between them. To one player, the sunglasses may be worth $4, each cup $2, and the tickets worthless. The opponent might value the same objects differently; while the total worth of all the objects is the same for both players, each player’s valuation is kept secret.

Both players take turns making offers to each other about how to split the goods. A proposed split must distribute all objects between the partners such that no items are left on the table. On each turn, a player can either accept the offer or make a counter-offer. If an agreement is reached within 9 offers, each player receives the amount that their portion of the goods is worth, according to their secret valuations. If there is still no agreement after the last turn, both players receive no points.
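To make the scoring concrete, here is a small hypothetical example of valuing a share of the goods. The class, item names, and prices are illustrative only, following the example above:

```java
import java.util.Map;

public class HagglingScore {
    // Sum the value of the items a player receives, given that player's
    // secret valuation. Keys are item names, values are counts / prices.
    static int score(Map<String, Integer> share, Map<String, Integer> valuation) {
        int total = 0;
        for (Map.Entry<String, Integer> e : share.entrySet()) {
            total += e.getValue() * valuation.getOrDefault(e.getKey(), 0);
        }
        return total;
    }

    public static void main(String[] args) {
        // This player values: sunglasses $4, cups $2 each, tickets worthless.
        Map<String, Integer> valuation = Map.of("sunglasses", 4, "cup", 2, "ticket", 0);
        // In the agreed split, the player keeps the sunglasses and one cup.
        Map<String, Integer> share = Map.of("sunglasses", 1, "cup", 1);
        System.out.println(score(share, valuation)); // prints 6
    }
}
```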

The Object of the Game:

Write code to obtain a collection of items with the highest value by negotiating items with an opponent player.

User Experience:

We wanted it to be as easy as possible for players to submit, play and test their code.
Therefore, we decided to keep player code simple – not relying on any third-party libraries.
To do this, we built a simple web application for testing and submitting code, supplying a placeholder with the method “accept” – the code that needs to be implemented by each participant. The “accept” method describes a single iteration within the negotiation, in which a player must decide whether to accept the offer given to them (by returning null or the received offer) or to return a counter-offer.


To help players verify their strategy, we added a testing feature allowing them to run their code against a random player. Developers were able to play around with it and refine their implementation before the actual submission.


Java Code Example:
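The example itself appeared as an image in the original post. Below is a minimal, hypothetical sketch of what a player might look like: the class name, the exact `accept` signature, and the greedy strategy are all assumptions for illustration, not the placeholder we actually supplied.

```java
import java.util.HashMap;
import java.util.Map;

public class GreedyPlayer {
    private final Map<String, Integer> valuation; // this player's secret item values
    private final Map<String, Integer> allItems;  // item -> count on the table

    public GreedyPlayer(Map<String, Integer> valuation, Map<String, Integer> allItems) {
        this.valuation = valuation;
        this.allItems = allItems;
    }

    // One negotiation turn: return null to accept the incoming offer,
    // otherwise return a counter-offer (the items we want for ourselves).
    public Map<String, Integer> accept(Map<String, Integer> offer, int round) {
        if (offer != null && value(offer) >= threshold(round)) {
            return null; // good enough – accept
        }
        // Naive counter-offer: demand every item we assign any value to.
        Map<String, Integer> counter = new HashMap<>();
        for (Map.Entry<String, Integer> e : allItems.entrySet()) {
            if (valuation.getOrDefault(e.getKey(), 0) > 0) {
                counter.put(e.getKey(), e.getValue());
            }
        }
        return counter;
    }

    private int value(Map<String, Integer> share) {
        int total = 0;
        for (Map.Entry<String, Integer> e : share.entrySet()) {
            total += e.getValue() * valuation.getOrDefault(e.getKey(), 0);
        }
        return total;
    }

    // Lower our expectations as the 9-offer deadline approaches.
    private int threshold(int round) {
        return Math.max(1, value(allItems) - round);
    }

    public static void main(String[] args) {
        GreedyPlayer p = new GreedyPlayer(
                Map.of("sunglasses", 4, "cup", 2, "ticket", 0),
                Map.of("sunglasses", 1, "cup", 3, "ticket", 2));
        // The full table is worth 4 + 3*2 = 10 to us, so an offer of
        // everything we value meets the round-0 threshold and is accepted.
        System.out.println(p.accept(Map.of("sunglasses", 1, "cup", 3), 0)); // prints null
    }
}
```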


Test Your Code and Submit Online:


Tournament And Scoreboard:

Practice tournaments ran continuously for two weeks, taking all submitted players into account and allowing developers to see their rank. During this time, competitors were able to edit their code, so there was plenty of time to learn and improve.

We also provided analytics for every player. Developers were able to analyze and improve their strategy.


At the end of the two weeks, we declared a code freeze and the real tournament took place. Players’ final score was determined only from the results of the real tournament, not the practice tournaments.


Game Execution And Score:

We executed the game tournament using multiple agents – each agent reported its results to Kibana:


The Back-Stage:

Where did we store players’ code?
We decided to store all players’ code in AWS S3 to avoid revealing the code to other players.

What languages were supported?

We started with Java only, but players expressed interest in using Scala and Kotlin as well. So we gave these developers free rein to add support for those languages, which we then reviewed before integrating into the base code. Ultimately, developers were able to play in all three languages.  

What was the scale of Haggling?

In the final tournament, 91 players competed in 164 million rounds in which 1.14 billion “accepts” were called. The tournament was executed on 45 servers with 360 cores and 225 GB of memory.

The greatest advantage of our approach was our decision to use Kubernetes, enabling us to add more nodes, as well as tune their cores and memory requirements. Needless to say, it was no problem to get rid of all these machines when the game period ended.


How did the tournament progress?
The tournament was tense, and we saw a lot of interaction with the game over the two weeks. The player in the winning position changed every day, and the final winner was not apparent until very near the end (and even then we were surprised!). We saw a variety of single-player strategies with sophisticated calculations and different approaches to game play.
Moreover, in contrast to the original game, we allowed gangs: groups of players belonging to a single team that can “help” each other to win.

So how do you win at haggling?

The winning strategy was collaborative – the winning team created two types of players: the “Overlord” which played to win, and several “Minions” whose job was to give points to the Overlord while blocking other players.  The Overlord and Minions recognized each other using a triple handshake protocol, based on mathematical calculations of the game parameters.  Beyond this, the team employed a human psychological strategy – hiding the strength of the Overlord by ensuring that for the majority of the development period the Overlord went no higher than third place.  They populated the game with “sleeper cells” – players with basic strategies ready to turn into minions at the right moment.  The upheaval occurred in the final hour of the game, when all sleepers were converted to minions.

The graph shows the number of commits in the last hour before code freeze:


Hats Off to the Hacker: who got the better of us?

During the two weeks, we noticed multiple hacking attempts. The hacker’s intent was not to crash the game, but rather to prove that it was possible and to make a lesson of it.
Although it was not our initial intent, we decided to make hacking part of the challenge, and to reward the hacker for demonstrated skills and creativity.

On the morning of November 7th, we arrived at the office and were faced with the following graph of the outcomes:

The game had been hacked! As can be seen in the graph, one player was achieving an impossible success rate. What we discovered was the following: the read-only hash map that we provided as a method argument to players was written in Kotlin; but when players converted the map to play in either Java or Scala, the conversion produced a mutable hash map, which is how one of the players was able to modify it. We had failed to validate the preferences – to verify that the hash-map values players handed back matched the original values.
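As an illustration of this class of bug (a sketch, not our actual game-runner code): handing player code a read-only view of the preferences on the JVM side makes any mutation attempt fail fast.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PreferencesGuard {
    public static void main(String[] args) {
        Map<String, Integer> preferences = new HashMap<>();
        preferences.put("sunglasses", 4);

        // Hand players an unmodifiable view instead of the backing map.
        Map<String, Integer> view = Collections.unmodifiableMap(preferences);

        try {
            view.put("sunglasses", 100); // what the hacker effectively did
            System.out.println("mutated!");
        } catch (UnsupportedOperationException e) {
            System.out.println("mutation blocked"); // prints this
        }
    }
}
```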

In conclusion, this is exactly the sort of sandbox experience that makes us better, safer, and smarter developers. We embraced the challenge.

Want to play with us? Join Outbrain and challenge yourself.


Upgrade Railway Tracks Under a Moving Train

Outbrain has quite a history with Cassandra. We have around 30 production clusters of different sizes, ranging from small ones to a cluster with 100 nodes across 3 datacenters.


Cassandra has proven to be a very reliable choice as a datastore that employs an eventual consistency model. We used the Datastax Enterprise 4.8.9 distribution (equivalent to Cassandra 2.1), and as Cassandra 3 evolved it was clear that at some point we had to plan an upgrade. We ended up with two options: either upgrade the commercial distribution provided by Datastax, or migrate to the Apache Cassandra distribution. Since Outbrain uses Cassandra extensively, it was an important decision to make.
At first, we identified that we do not use any of the Datastax Enterprise features, and we were quite happy with the basics still offered by Apache Cassandra. So, at this point, we decided to give Apache Cassandra 3.11.1 a try.
But when planning an upgrade, aside from the necessity of staying on a supported version, there are always some goals, like leveraging a new feature or seeking performance improvements. Our main goal was to get more from the hardware we have and potentially reduce the number of nodes per cluster.

In addition, we wanted an in-place upgrade, which means that instead of building new clusters we would perform the migration on the production clusters themselves while they were still running. It is like upgrading the railway tracks under a moving train – and we are talking about a very big train moving at high speed.


But before committing to the in-place upgrade we needed to validate that the Apache Cassandra distribution fit our use cases, and for that we performed a POC. After selecting one of our clusters to serve as the POC, we built a new Apache Cassandra 3.11.1 cluster. Given the new storage format, and hence the read-path optimisations in Cassandra 3, we started with a small cluster of 3 nodes in order to see if we could benefit from this feature to reduce the cluster size – that is, if we could achieve acceptable read and write latencies with fewer nodes.
The plan was to copy data from the old cluster using sstableloader, while the application employed a dual-writes approach before we started the copying. This way we would have two clusters in sync, and we could direct our application to do some or all reads from the new cluster.
And then the problems started… At first it became clear that with our multi-datacenter configuration we did not want sstableloader to stream data to foreign datacenters; we wanted to be able to control the target datacenter for the streaming. We reported a bug and, instead of waiting for a new release of Apache Cassandra, built our own custom build.
After passing this barrier we reached another one, related to LeveledCompactionStrategy (LCS) and how Cassandra performs compactions. With a cluster of 3 nodes and 600 GB of data in a table with LCS, it took a few weeks to put the data into the right levels. Even with 6 nodes it took days to complete compactions following the sstableloader data load. That was a real pity, because read and write latencies were quite good even with fewer nodes. Luckily, Cassandra has a feature called multiple data directories. At first it looks like something to do with allowing more data, but this feature actually helps enormously with LCS. Once we tried it and experimented with the number of data directories, we found that we could compact 600 GB of data on L0 in less than a day with three data directories.
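For reference, multiple data directories are configured in cassandra.yaml; a sketch with illustrative paths:

```yaml
# cassandra.yaml – multiple data directories let compactions proceed in
# parallel per directory, which helps LCS level data much faster
data_file_directories:
    - /var/lib/cassandra/data1
    - /var/lib/cassandra/data2
    - /var/lib/cassandra/data3
```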

Encouraged by these findings, we conducted various tests on our Cassandra 3.11 cluster and realised that we could not only benefit from storage space savings and a faster read path, but also potentially put more data per node on clusters where we use LCS.

Storage before and after the migration

The plan

The conclusions were very good: we were fine with the Apache Cassandra distribution and could start preparing our migration plan using in-place upgrades. At a high level, the migration steps would be:

  1. Upgrade to the latest DSE 4.x version (4.8.14)
  2. Upgrade to DSE 5.0.14 (Cassandra 3)
  3. Upgrade sstables
  4. Upgrade to DSE 5.1.4 (Cassandra 3.11)
  5. Replace DSE with Apache Cassandra 3.11.1

After defining the plan, we were ready to start migrating our clusters. We mapped our clusters and categorised them according to whether they were used for online serving activities or for offline activities.
Then we were good to go and started to perform the upgrade cluster by cluster.

Upgrade to the latest DSE 4.x version (4.8.14)

The first upgrade, to DSE 4.8.14 (we had 4.8.9), is a minor one. The reason this step is needed at all is that DSE 4.8.14 includes all the important fixes needed to co-exist with Cassandra 3.0 during the upgrade, and our experience confirmed that DSE 4.8.14 and DSE 5.0.10 (Cassandra 3.0.14) indeed co-existed very well. The upgrade should be done in a rolling fashion, one node at a time. “One node at a time” applies within a datacenter only; datacenters can be processed in parallel (which is what we actually did).

Upgrade to DSE 5.0.14 (Cassandra 3)

Even if you aim for DSE 5.1, you cannot jump to it directly from 4.x, so the 4.x – 5.0.x – 5.1.x chain is mandatory. The steps and prerequisites for the upgrade to DSE 5.0 are documented quite well by Datastax. What is probably not documented as well is which driver version to use and how to configure it so that the application works smoothly with both DSE 4.x and DSE 5.x nodes during the upgrade. We instructed our developers to use cassandra-driver-core version 3.3.0 and to enforce protocol version 3, so that the driver could successfully talk to both DSE 4.x and DSE 5.x nodes.

Another thing to make sure of before starting the upgrade is that all your nodes are healthy. Why? Because one of the most important limitations of clusters running both DSE 4.x and 5.x is that you cannot stream data, which means that bootstrapping a node during node replacement is simply not possible. For clusters where we had SLAs on latencies we wanted more confidence, so in one datacenter we upgraded a single node to DSE 5.0.x first. This is a 100% safe operation: since it is the first node with DSE 5.0.x, it can be replaced using the regular node replacement procedure with DSE 4.x. One very important point here is that DSE 5.0 does not force any schema migration until all nodes in the cluster are running DSE 5.0.x. This means the system keyspaces remain untouched, which is what guarantees that a DSE 5.0.x node can be replaced with DSE 4.x.

Since we have two kinds of clusters (serving and offline) with different SLAs, we took two different approaches. For the serving clusters we upgraded the datacenters sequentially, leaving some datacenters running DSE 4.x, so that if something went really wrong we would still have a healthy datacenter on DSE 4.x. For the offline clusters we upgraded all the datacenters in parallel. Both approaches worked quite well.

Upgrade sstables

Whether clusters were upgraded in parallel or sequentially, we always wanted to start the sstable upgrade as quickly as possible after DSE 5.0 was installed. The sstable upgrade is the most important phase of the upgrade because it is the most time-consuming. There are not many knobs to tune:

  • control number of compaction threads with -j option for the nodetool upgradesstables command.
  • control the compaction throughput with nodetool setcompactionthroughput.

Whether -j speeds up the sstable upgrade depends a lot on how many column families you have, how big they are, and which compaction strategy is configured. As for the compaction throughput, we determined that for our storage system it was fine to set it to 200 MB/sec, allowing regular read/write operations to be carried out at reasonable high-percentile latencies – but this certainly has to be determined for each storage system individually.
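Concretely, the two knobs look like this (the throughput value is the one we settled on; the -j thread count is illustrative):

```shell
# Run the sstable upgrade with 4 parallel compaction threads
nodetool upgradesstables -j 4

# Cap compaction throughput at 200 MB/s so regular reads/writes stay healthy
nodetool setcompactionthroughput 200
```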

Upgrade to DSE 5.1.4 (Cassandra 3.11)

In theory, the upgrade from DSE 5.0.x to DSE 5.1.x is a minor one: you just need to upgrade the nodes in a rolling fashion, exactly as the minor upgrades were done within DSE 4.x; the storage format stays the same and no sstable upgrade is required. But when upgrading our first reasonably large cluster (in terms of the number of nodes) we noticed something really bad: nodes running DSE 5.1.x suffered from never-ending compactions in the system keyspace, which killed their performance, while the DSE 5.0.x nodes were running well. It was a tough moment for us, because this was a serving cluster with SLAs, and we had not noticed anything like this in our previous cluster upgrades! We suspected that something was wrong with the schema migration mechanism, and the command nodetool describecluster confirmed that there were two different schema versions: one from the 5.0.x nodes and one from the 5.1.x nodes. So we decided to speed up the DSE 5.1.x upgrade, under the assumption that once all nodes were on 5.1.x there would be no issues with different schema versions and the cluster would settle. This is indeed what happened.

Replace DSE with Apache Cassandra 3.11.1

So how was the final migration step of replacement of DSE 5.1.4 with Apache Cassandra? In the end it worked well across all our clusters. Basically, DSE packages have to be uninstalled, Apache Cassandra has to be installed and pointed to existing data and commit_log directories.

However, two extra steps were required:

  • DSE creates dse-specific keyspaces (e.g. dse_system). These keyspaces are created with EverywhereStrategy, which is not supported by Apache Cassandra. So before replacing DSE with Apache Cassandra on a given node, the keyspace replication strategy has to be changed.
  • There are a few system column families that receive extra columns in DSE deployments (system_peers and system_local). If you attempt to start Apache Cassandra after DSE is removed, it will fail the schema check against these column families. There are two ways to work around this. The first option is to delete all system keyspaces, set auto_bootstrap to false, and set the allow_unsafe_join JVM option to true; assuming the node’s IP address stays the same, Cassandra will re-create the system keyspaces and jump into the ring. We used an alternative approach: a bunch of scripts that first exported the tables with extra columns to JSON, then imported them on a helper cluster which had the same schema as the system tables in Apache Cassandra. We then imported the data from the JSON files and copied the resulting sstables to the node being upgraded.
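The replication-strategy change for the dse-specific keyspace can be applied via cqlsh before the swap; a sketch with illustrative datacenter names and replication factors:

```shell
cqlsh -e "ALTER KEYSPACE dse_system
          WITH replication = {'class': 'NetworkTopologyStrategy',
                              'dc1': 3, 'dc2': 3};"
```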

Minor version, big impact

Cassandra 3.11 clearly has vast improvements in storage format efficiency and read latency. However, even within a given Cassandra major version, a minor version can introduce a bug that heavily affects performance. The latency improvements above were achieved on Cassandra 3.11.1, but this very specific version introduced an unpleasant bug (CASSANDRA-14010, fix sstable ordering by max timestamp in singlePartitionReadCommand).

This bug was introduced during refactoring, and it meant that for tables with STCS almost all sstables were queried on every read. Typically, Cassandra sorts sstables by timestamp in descending order and starts answering a query by looking at the most recent sstables, until it is able to fulfil the query (with some restrictions, e.g. for counters or unfrozen collections). The bug, however, caused sstables to be sorted in ascending order. It was fixed in 3.11.2 – and here is the impact: the number of sstables per read at the 99th percentile dropped after the upgrade. Reading fewer sstables naturally leads to lower local read latencies.


Drop in sstables per read

Local read latency drop on Cassandra 3.11.2

Always mind page cache

On one of our newly built Cassandra 3.11.2 clusters we noticed unexpectedly high local read latencies. This came as a surprise because the cluster was specifically dedicated to a table with LCS. We spent quite some time in blktrace/blkparse/btt to confirm that our storage by itself was performing well. We then identified that the high latency affected only the 98th and 99th percentiles, while the btt output made it clear that there were write IO bursts resulting in very large blocks, thanks to the scheduler merge policy. The next step was to exclude compactions – but even with compactions disabled we still had these write bursts. Looking further, it became evident that the page cache defaults were different from what we typically had, and the culprit was vm.dirty_ratio, which was set to 20%. With 256 GB of RAM, flushing 20% of it certainly results in an IO burst that affects reads. So we tuned the page cache, opting for vm.dirty_bytes and vm.dirty_background_bytes instead of vm.dirty_ratio, and the latency spikes went away.
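The tuning itself boils down to two sysctl settings; a sketch with illustrative values (setting the byte-based knobs automatically disables the percentage-based vm.dirty_ratio / vm.dirty_background_ratio):

```shell
# Start background writeback early and cap the dirty page pool in bytes,
# so pages are flushed in small, frequent batches instead of 20% of RAM at once
sysctl -w vm.dirty_background_bytes=67108864   # 64 MB
sysctl -w vm.dirty_bytes=268435456             # 256 MB
```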

Page cache flush settings impact on local read latency

The results

So, in the end – what were our gains from Apache Cassandra 3.11?
Aside from any new features, we got a lot of performance improvements. The major storage savings translate directly into lower read latencies, because more data fits into the OS cache – which improved the clusters’ performance a lot.


In addition, the newer storage format results in lower JVM pressure during reads (the object creation rate decreased 1.5–2 times). The following image describes the decrease in read latency in our datacenters (NY, CHI & SA) during load tests (LT) that we performed before and after the migration.

Read latency before and after the migration

Another important aspect is the newly exposed metrics. A few interesting ones:

  • Ability to identify consistency level requested for both reads and writes. This can be useful to track if occasionally the consistency level requested is not what would be expected for a given column family. For example, if QUORUM is requested instead of LOCAL_QUORUM due to a bug in app configuration.
  • Metrics on the network messaging.

Read metrics before the migration

Read metrics after the migration

We did something that lots of people thought was impossible: an in-place migration of running production clusters. Although there were some bumps along the way, it is entirely possible to conduct an in-place upgrade from DSE 4.8.x to Apache Cassandra 3.11 while maintaining acceptable latencies and avoiding any cluster downtime.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3

Previously, in the second episode of the trilogy, “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2”, we covered the POC we performed.

In this episode we will focus on the migration itself. Building a POC environment is all nice and easy; however, migrating 2 PB of data (the raw part out of 6 PB, which includes replication) turned out to be a new challenge. But before we jump into the technical issues, let’s start with the methodology.

The big migration

We learned from past experience that for such a project to be successful, like in many other cases, it is all about the people – you need to be mindful of the users and make sure you have their buy-in.

On top of that, we wanted to complete the migration within 4 months, as we had a renewal of our datacenter space coming up, and we wanted to gain from the space reduction resulting from the migration.

With those two considerations in mind, we decided to keep the same technologies – Hadoop and Hive – in the cloud environment, and only after the migration was done would we look into leveraging new technologies available on GCP.

With the decision made, we started to plan the migration of the Research cluster to GCP, looking into different aspects such as:

  • Build the network topology (VPN, VPC etc.)
  • Copy the historical data
  • Create the data schema (Hive)
  • Enable the runtime data delivery
  • Integrate our internal systems (monitoring, alerts, provision etc.)
  • Migrate the workflows
  • Decommission the bare metal cluster (with all its supporting systems)

All with the purpose of productizing the solution and making it production grade, based on our standards. We made a special effort to leverage the same management and configuration control tools we use in our internal datacenters (such as Chef, Prometheus etc.) – so we could treat this environment as just another datacenter.

Copying the data

Sounds like a straightforward activity – you need to copy your data from location A to location B.

Well, turns out that when you need to copy 2 PB of data, while the system is still active in production, there are some challenges involved.

The first restriction was that the data copy must not impact the usage of the cluster, as the research work still needed to be performed.

Second, once the data was copied, we also needed to validate it.


Starting with data copy

  • Option 1 – Copy the data using Google Transfer Appliance

Google can ship their transfer appliance (depending on the location of your datacenter), which you attach to the Hadoop cluster and use to copy the data, then ship back to Google, and finally download the data from the appliance to GCS.

Unfortunately, from a capacity perspective we would need several iterations of this process to copy all the data; on top of that, the Cloudera community version we were using was so old that it was not supported.

  • Option 2 – Copy the data over the network

When taking that path, the main restriction is that the network is used for both the production environment (serving) and for the copy, and we could not allow the copy to create network congestion on the lines.

However, if we throttled the copy process, the time it would take to copy all the data would be too long and we would not be able to meet our timelines.

Setup the network

As part of our network infrastructure, per datacenter we have 2 ISPs, each with 2 x 10G lines for backup and redundancy.

We decided to leverage those backup lines and build a tunnel over them, dedicated solely to the Hadoop data copy. This enabled us to copy the data in a relatively short time on one hand, and to ensure it would not impact our production traffic on the other, as the copy was contained to specific lines.

Once the network was ready we started to copy the data to GCS.

As you may remember from previous episodes, our cluster was set up over 6 years ago, and as such accumulated a lot of tech debt, including in the data kept in it. We decided to take advantage of the situation and leverage the migration to do some data and workload cleanup as well.

We invested time in mapping which data we needed and which data could be cleared. Although this didn’t significantly reduce the data size, we managed to delete 80% of the tables, and we also managed to delete 80% of the workload.

Data validation

As we migrated the data, we had to validate it, making sure there was no corrupt or missing data.

Additional challenges to take into consideration on the data validation front:

  • The migrated cluster is a live cluster – new data keeps being added to it and old data deleted
  • With our internal Hadoop cluster, all tables are stored as files while on GCS they are stored as objects.

It was clear that we need to automate the process of data validation and build dashboards to help us monitor our progress.

We ended up implementing a process that creates two catalogs, one for the bare metal internal Hadoop cluster and one for the GCP environment, comparing those catalogs and alerting us to any differences.
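Conceptually, the comparison is a per-table set difference between the two catalogs. Below is a simplified sketch of the idea (names are hypothetical; our real process also compared sizes and handled data that changed during the copy):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class CatalogDiff {
    // For each table, report entries present on the bare metal side
    // but missing on the cloud side. Keys are table names, values are
    // sets of file/object names.
    static Map<String, Set<String>> missingOnCloud(
            Map<String, Set<String>> bareMetal, Map<String, Set<String>> gcs) {
        Map<String, Set<String>> diff = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : bareMetal.entrySet()) {
            Set<String> missing = new HashSet<>(e.getValue());
            missing.removeAll(gcs.getOrDefault(e.getKey(), Set.of()));
            if (!missing.isEmpty()) {
                diff.put(e.getKey(), missing);
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> bareMetal = Map.of(
                "clicks", Set.of("part-0001", "part-0002"),
                "views", Set.of("part-0001"));
        Map<String, Set<String>> cloud = Map.of(
                "clicks", Set.of("part-0001"),
                "views", Set.of("part-0001"));
        System.out.println(missingOnCloud(bareMetal, cloud)); // {clicks=[part-0002]}
    }
}
```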

This dashboard shows per table the files difference between the bare metal cluster and the cloud


In parallel to the data migration, we worked on building the Hadoop ecosystem on GCP: the table schemas with their partitions in Hive, our runtime data delivery systems (adding new data to the GCP environment in parallel to the internal bare metal Hadoop cluster), our monitoring systems, data retention systems, etc.

The new environment on GCP was finally ready and we were ready to migrate the workloads. Initially, we duplicated jobs to run in parallel on both clusters, making sure we complete validation and will not impact production work.

After a month of validation, parallel work and required adjustments we were able to decommission the in-house Research Cluster.

What we achieved in this journey

  • Upgraded the technology
  • Improved the utilization and gained the required elasticity we wanted
  • Reduced the total cost
  • Introduced new GCP tools and technologies


This amazing journey lasted almost 6 months of focused work. As planned, the first step was to use the same technologies we had in the bare metal cluster; now that the migration to GCP is finished, we can start planning how to take further advantage of the new opportunities that arise from leveraging GCP technologies and tools.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2

Previously, in the first episode of the trilogy, “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 1”, we covered our challenges.

In this episode, I will focus on the POC that we did in order to decide whether we should rebuild the Research cluster in-house or migrate it to the cloud.


As we had many open questions around migration to the cloud, we decided to do a learning POC, focusing on 3 main questions:

  1. Understand the learning curve that will be required from the users
  2. Compatibility with our in-house Online Hadoop clusters
  3. Estimate cost for running the Research cluster in the Cloud

However, before jumping into the water of the POC, we had some preliminary work to be done.

Mapping workloads

As the Research cluster had been running for over 6 years, there were many different use cases running on it. Some were well known and familiar to users, but some were old tech debt – no one knew whether they were still needed, or what their value was.

We started with mapping all the flows and use cases running on the cluster, mapped users and assigned owners to the different workflows.

We also created a distinction between ad-hoc queries and batch processing.

Mapping technologies

We mapped all the technologies we needed to support on the Research cluster in order to ensure full compatibility with our Online clusters and in-house environment.

After collecting all the required information about the use cases and mapping the technologies, we selected representative workflows and users to take an active part in the POC, collecting their feedback on the learning curve and ease of use. This approach would also serve us well later on, if we decided to move forward with the migration, by giving us in-house ambassadors.

Once we had mapped all our needs, it was also easier to get high-level cost estimations from the different cloud vendors, to give us a general indication of whether it made sense to continue and invest time and resources in the POC.


We wanted to complete the POC within 1 month, so that on one hand it would run long enough to cover all types of jobs, but on the other hand would not drag on.

For the POC environment we built a Hadoop cluster based on standard technologies.

We decided not to leverage special proprietary vendor technologies at this point, as we wanted to reduce the learning curve and were careful not to get into vendor lock-in.


In addition, we decided to start the POC only with one vendor, and not to run it on multiple cloud vendors.

The reason was to be mindful of our internal resources and time constraints.

We did a theoretical evaluation of the technology roadmap and cost for several cloud vendors, and chose to go with the GCP option, looking to also leverage BigQuery in the future (once all our data was migrated).

The execution

Once we decided on the vendor, technologies and use cases we were good to go.

For the purpose of the POC we migrated 500 TB of our data, built the Hadoop cluster based on Dataproc, and built the required endpoint machines.

Needless to say, already at this stage we had to create the network infrastructure to support secure operation of the hybrid environment between GCP and our internal datacenters.

Now that everything was ready, we started the actual POC from the users’ perspective. For a period of one month, the participating users performed their use cases twice: once on the in-house Research cluster (the production environment), and a second time on the Research cluster built on GCP (the POC environment). The users were required to record their experience, which was measured according to the following criteria:

  • Compatibility (did the test run seamlessly, were any modifications to code or queries required, etc.)
  • Performance (execution time, amount of resources used)
  • Ease of use
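As a minimal sketch of how such per-run measurements could be recorded and compared, here is a hypothetical structure (the `RunResult` class, field names, and numbers below are illustrative assumptions, not our actual tooling):

```python
from dataclasses import dataclass

@dataclass
class RunResult:
    """One execution of a use case in one environment (hypothetical schema)."""
    use_case: str
    environment: str         # "on-prem" or "gcp-poc"
    runtime_minutes: float   # performance criterion
    code_changes_needed: bool  # compatibility criterion
    ease_of_use: int         # 1 (poor) .. 5 (great)

def compare(prod: RunResult, poc: RunResult) -> dict:
    """Summarize how a use case fared on the POC cluster versus production."""
    return {
        "use_case": prod.use_case,
        "speedup": prod.runtime_minutes / poc.runtime_minutes,
        "compatible": not poc.code_changes_needed,
        "ease_of_use": poc.ease_of_use,
    }

# Illustrative numbers only.
prod = RunResult("daily-aggregation", "on-prem", 60.0, False, 4)
poc = RunResult("daily-aggregation", "gcp-poc", 40.0, False, 4)
summary = compare(prod, poc)
```

Collecting every run in such a uniform shape makes it straightforward to aggregate the month's results per criterion at the end of the POC.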

During the month of the POC we worked closely with the users, gathering their overall experience and results.

In addition, we documented the compute power needed to execute those jobs, which enabled us to produce a better estimate of how much it would cost to run the full Research cluster on the cloud.
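To make the elasticity argument concrete, a back-of-the-envelope comparison between an always-on cluster and a pay-per-use one can be sketched as follows (all prices, node counts, and utilization figures are hypothetical placeholders, not our actual numbers):

```python
HOURS_PER_MONTH = 730  # average hours in a month

def always_on_cost(nodes: int, node_hour_cost: float) -> float:
    """Bare-metal-style cost: every node is paid for around the clock."""
    return nodes * node_hour_cost * HOURS_PER_MONTH

def elastic_cost(peak_nodes: int, avg_utilization: float,
                 node_hour_cost: float) -> float:
    """Cloud-style cost: pay only for the node-hours actually consumed."""
    return peak_nodes * avg_utilization * node_hour_cost * HOURS_PER_MONTH

# Hypothetical inputs: 500 nodes, ~30% average utilization, and cloud
# node-hours priced higher than amortized bare metal.
on_prem = always_on_cost(nodes=500, node_hour_cost=0.30)
cloud = elastic_cost(peak_nodes=500, avg_utilization=0.30, node_hour_cost=0.55)
savings = 1 - cloud / on_prem  # fraction saved by going elastic
```

Even with a higher per-node-hour price, a workload that sits idle most of the time can come out substantially cheaper once you stop paying for idle capacity.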

The POC was successful

The users had a good experience, and our cost analysis showed that by leveraging the cloud's elasticity, which in this scenario was very significant, the cloud option would be ROI-positive compared with the investment we would need to make to build the environment internally (without getting into exact numbers: over 40% cheaper, which is a nice incentive!).

With that, we started our last phase: the actual migration, which is the focus of the last episode in this series, "Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3". Stay tuned!

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 1

Outbrain is the world’s leading discovery platform, serving over 250 billion personal recommendations per month. In order to provide premium recommendations at such a scale, we leverage the ability to analyze large amounts of data. We use a variety of data stores and technologies such as MySQL, Cassandra, Elasticsearch, and Vertica; however, in this trilogy of posts (all things can be split into 3…) I would like to focus on our Hadoop ecosystem and our journey from pure bare metal to a hybrid cloud solution.

Hadoop Research Journey from Bare Metal to Google Cloud

The bare metal period

In a nutshell, we keep two flavors of Hadoop clusters:

  • Online clusters, used for online serving activities. Those clusters are relatively small (2 PB of data per cluster) and are kept in our datacenters on bare metal clusters, as part of our serving infrastructure.
  • Research cluster, surprisingly, used mainly for research and offline activities. This cluster holds a large amount of data (6 PB), and by nature its workload is elastic.

Most of the time it was underutilized, but there were peaks when users needed to query huge amounts of data.

History lesson

Before we move forward in our tale, it may be worthwhile to spend a few words on the history.

We first started using Hadoop at Outbrain over six years ago, as a small technical experiment. As our business grew rapidly, so did the data, and the clusters were adjusted in size; however, technical debt built up around them. We continued to grow the clusters using a scale-out methodology and, after some time, found ourselves with clusters running an old Hadoop version, unable to support new technologies, built from hundreds of servers, some of them very old.

We decided we needed to stop being firefighters and to get proactive about the issue. We first took care of the Online clusters and migrated them to a new in-house bare-metal solution (you can read more about this in the Migrating Elephants post on the Outbrain Tech Blog).

Now it was time to move forward and deal with our Research cluster.

Research cluster starting point

Our starting point for the Research cluster was a cluster built out of 500 servers, holding about 6 PB of data, running the CDH4 community version.

As mentioned before, the workload on this cluster is elastic: at times it requires a lot of compute power, while most of the time it is fairly underutilized (see graph below).


This graph shows the CPU utilization over two weeks. As can be seen, usage is not constant: most of the time the cluster is barely used, with some periodic peaks.


The cluster was unable to support new technologies (such as Spark and ORC) that were already in use on the Online clusters, reducing our ability to use it for real research.

On top of that, some of the servers in this cluster were getting very old, and since we had grown the cluster on the fly, its storage:CPU:RAM ratio was suboptimal, causing us to waste expensive footprint in our datacenter.

On top of all of the above, it caused a lot of frustration for the team!

We mapped our options moving forward:

  1. Do an in-place upgrade of the Research cluster software
  2. Rebuild the Research cluster from scratch on bare metal in our datacenters (similar to the project we did with the Online clusters)
  3. Leverage cloud technologies and migrate the Research cluster to the cloud

The dilemma

Option #1 was dropped immediately, since it addressed only a fraction of our frustrations at best. It did not address the old-hardware issues, nor our concerns regarding the suboptimal storage:CPU:RAM ratio, which we understood would only get worse once we came to use RAM-intensive technologies such as Spark.

We had a dilemma between option #2 and option #3, both viable options with pros and cons.  

Building the Research cluster in house was a project we were very familiar with (we had just finished our Online clusters migration), and our users were very familiar with the technology, so there would be no learning curve on this front. On the other hand, it required a big financial investment, and we would be unable to leverage elasticity to the extent we wanted.

Migrating to the cloud answered our elasticity needs, but presented a less predictable cost model (something very important to the finance people) and had many unknowns, as it was new for us and for the users who would need to work with the environment. It was clear that learning and education would be needed, but it was not clear how steep the learning curve would be.

On top of that, we knew that we had to maintain full compatibility between the Research cluster and the Online clusters, but it was hard for us to estimate the effort required to get there, or the number of processes that would require data transition between the clusters.


So, what do we do when we don’t know which option is better?

We study and experiment! And this is how we entered the second period: the POC.

You are invited to read about the POC we did and how we did it on our next episode of “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2”.