Category: System Architecture

Switches, Penguins and One Bad Cable

Back in May 2017, I was scheduled to speak at the DoTC conference in Melbourne. I was really excited and looking forward to it, but fate had different plans. And lots of them. From my son going through an emergency appendicitis operation, through flight delays, and up to an emergency landing back in Tel Aviv… I ended up missing the opportunity to speak at the conference. Amazingly, something similar happened this year! Maybe 3rd time’s a charm?

The post below is the talk I’d planned to give, converted to blog format.


August 13, 2015. Outbrain’s ops on call is just getting out of his car, when his phone rings. It’s a PagerDuty alert. Some kind of latency issue in the Chicago datacenter. He acks it, figuring he’ll unload the groceries first and then get round to it. But then, his phone rings again. And again.

Forget the groceries. Forget the barbecue. Production is on fire.

18 hours and many tired engineers later, we’re recovering from having lost our Chicago datacenter. In the takein that follows, we trace the root cause to a single network cable that’s mistakenly connected to the wrong switch.

Hi, my name is Alex, and I lead the Core Services group at Outbrain. Our group owns everything from the floor that hosts Outbrain’s servers, to the delivery pipelines that ship Outbrain’s code. If you’re here, you’ve likely heard of Outbrain. You probably know that we’re the world’s leading Discovery platform, and that you’ll find us installed on publisher sites like CNN, The Guardian, Time Inc and the Australian, where we serve their readers with premium recommendations.

But it wasn’t always this way.

You see, back when we started, life was simple: all you had to do was throw a bunch of Linux servers in a rack, plug them into a switch, write some code… and sell it. And that we did!

But then, an amazing thing happened. The code that we wrote actually worked, and customers started showing up. And they did the most spectacular and terrifying thing ever – they made us grow. One server rack turned into two and then three and four. And before we knew it, we had a whole bunch of racks, full of penguins plugged into switches. It wasn’t as simple as before, but it was manageable. Business was growing, and so were we.

Fast forward a few years.

We’re running quite a few racks across 2 datacenters. We’re not huge, but we’re not a tiny startup anymore. We have actual paying customers, and we have a service to keep up and running. Internally, we’re talking about things like scale, automation and all that stuff. And we understand that the network is going to need some work. By now, we’ve reached the conclusion that managing a lot of switches is time consuming, error-prone, and frankly, not all that interesting. We want to focus on other things, so we break the network challenge down to 2 main topics:

Management and Availability.

Fortunately, management doesn’t look like a very big problem. Instead of managing each switch independently, we go for a something called “a stack”. In essence, it turns 8 switches into one logical unit. At full density, it lets us treat 4 racks as a single logical switch. With 80 nodes per rack, that’s 320 nodes. Quite a bit of compute power!

Four of these setups – about 1200 nodes.

Across two datacenters? 2400 nodes. Easily 10x our size.

Now that’s very impressive, but what if something goes wrong? What if one of these stacks fails? Well, if the whole thing goes down, we lose all 320 nodes. Sure, there’s built-in redundancy for the stack’s master, and losing a non-master switch is far less painful, but even then, 40 nodes going down because of one switch? That’s a lot.

So we give it some thought and come up with a simple solution. Instead of using one of these units in each rack, we’ll use two. Each node will have a connection to stack A, and another to stack B. If stack A fails, we’ll still be able to go through stack B, and vice versa. Perfect!

In order to pull that off, we have to make these two separate stacks, which are actually two separate networks, somehow connect. Our solution to that is to set up bonding on the server side, making its two separate network interfaces look like a single, logical one. On the stack side, we connect everything to one big, happy, shared backbone. With its own redundant setup, of course.

In case you’re still keeping track of the math, you might notice that we just doubled the number of stacks per datacenter. But we still gained simple management And high availability at 10x scale. All this without having to invest in expensive, proprietary management solutions. Or even having to scale the team.

And so, it is decided. We build our glorious, stack based topology. And the land has peace for 40 years. Or… months.

Fast forward 40 months.

We’re running quite a few racks across 3 datacenters. We’re serving customers like CNN, The Guardian, Time Inc and the Australian We reach over 500 million people worldwide, serving 250 billion recommendations a month.

We’re using Chef to automate our servers, with over 300 cookbooks and 1000 roles.

We’re practicing Continuous Delivery, with over 150 releases to production a day.

We’re managing petabytes of data in Hadoop, Elasticsearch, Mysql, Cassandra.

We’re generating over 6 million metrics every minute, have thousands of alerts and dozens of dashboards.

Infrastructure as Code is our religion. And as for our glorious network setup? it’s completely, fully, 100% … manual.

No, really. It’s the darkest, scariest part of our infrastructure.

I mean hey, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. But every change in the switches is risky because it’s done using the infamous “config management” called “copy-paste”.

The switching software stack and protocols are proprietary, especially the secret sauce that glues the stacks together. Which makes debugging issues a tiring back-and-forth with support at best, or more often just a blind hit-and-miss. The lead time to setting up a new stack is measured in weeks, with risk of creating network loops and bringing a whole datacenter down. Remember August 13th, 2015? We do.

Again, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. And it’s not like we babysit the solution on daily basis. But it’s definitely not Infrastructure as Code. And there’s no way it’s going to scale us to the next 10x.

Fast forward to June 2016.

We’re still running across 3 datacenters, thousands of nodes. CNN, The Guardian, Time Inc, the Australian 500 million users. 250 billion recommendations. You get it.

But something is different.

We’re just bringing up a new datacenter, replacing the oldest of the three. And in it, we’re rolling out a new network topology. It’s called a Clos Fabric, and it’s running BGP end-to-end. It’s based on a design created by Charles Clos for analog telephony switches, back in the 50’s. And on the somewhat more recent RFCs, authored by Facebook, that bring the concept to IP networks.

In this setup, each node is connected to 2 top-of-rack switches, called leaves. And each leaf is connected to a bunch of end-of-row switches, called spines. But there’s no bonding here, and no backbone. Instead, what glues this network together, is that fact that everything in it is a router. And I do mean everything – every switch, every server. They publish their IP addresses over all of their interfaces, essentially telling their neighbours, “Hi, I’m here, and you can reach me through these paths.” And since their neighbours are routers as well, they propagate that information.

Thus a map of all possible paths to all possible destinations is constructed, hop-by-hop, and held by each router in the network. Which, as I mentioned, is everyone. But it gets even better.

We’ve already mentioned that each node is connected to two leaf switches. And that each leaf is connected to a bunch of spines switches. It’s also worth mentioning that they’re not just “connected”. They’re wired the exact same way. Which means, that any path between two points in the network is the exact same distance. And what THAT means is that we can rely on something called ECMP. Which, in plain English, means “just send the packets down any available path, they’re all the same anyway”. And ECMP opens up interesting options for high availability and load distribution.

Let’s pause to consider some of the gains here:

First, this is a really simple setup. All the leaf switches are the same. And so are all of the spines. It doesn’t matter if you have one, two or thirty. And pretty much the same goes for cables. This greatly simplifies inventory, device and firmware management.

Second, it’s predictable. You know the exact amount of hops from any one node in the network to any other: It’s either two or four, no more, no less. Wiring is predictable as well. We know exactly what gets connected where, and what are the exact cable lengths, right from design phase. (spoiler alert:) We can even validate this in software.

Third, it’s dead easy to scale. When designing the fabric, you choose how many racks it’ll support, and at what oversubscription ratio. I’ll spare you the math and just say:

You want more bandwidth? Add more spines.

Support more racks? Go for spines with higher port density.

Finally, high availability is built into the solution. If a link goes down, BGP will make sure all routers are aware. And everything will still work the same way, because with our wiring scheme and ECMP, all paths are created equal. Take THAT evil bonding driver!

But it doesn’t end there. Scaling the pipes is only half the story. What about device management? The infamous copy-paste? Cable management? A single misconnected cable that could bring a whole datacenter down? What about those?

Glad you asked 🙂

After a long, thorough evaluation of multiple vendors, we chose Cumulus Networks as our switch Operating System vendor, and Dell as our switch hardware vendor. Much like you would with servers, by choosing Enterprise Redhat, Suse or Ubuntu. Or with mobile devices, by choosing Android. We chose a solution that decouples the switch OS from the hardware it’s running on. One that lets us select hardware from a list of certified vendors, like Dell, HP, Mellanox and others.

So now our switches run Cumulus Linux, allowing us use the very same tools that manage our fleet of servers, to now manage our fleet of switches. To apply the same open mindset in what was previously a closed, proprietary world.

In fact, when we designed the new datacenter, we wrote Chef cookbooks to automate provisioning and config. We wrote unit and integration tests using Chef’s toolchain and setup a CI pipeline for the code. We even simulated the entire datacenter, switches, servers and all, using Vagrant.

It worked so well, that bootstrapping the new datacenter took us just 5 days. Think about it:

the first time we ever saw a real Dell switch running Cumulus Linux was when we arrived on-site for the buildout. And yet, 99% of our code worked as expected. In 5 days, we were able to setup a LAN, VPN, server provisioning, DNS, LDAP and deal with some quirky BIOS configs. On the servers, mind you, not the switches.

We even hooked Cumulus’ built-in cabling validation to our Prometheus based monitoring system. So that right after we turned monitoring on, we got an alert. On one bad cable. Out of 3000.

Infrastructure as Code anyone?


Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

@ Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will be sharing with you our logging implementation which is the first tool used to understand the why.

But first thing first, a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare metal nodes, Elasticsearch for storage and Kibana for visualizing and analytics.  Apache Kafka is transport layer for all of the above.

A very simplified sketch of the system:Live Tail in Kubernetes

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to us “kubectl”, because managing keys is a pain especially in a multi cluster environment.
  • Provide a way to tail logs and even edit log file. This should be available on a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side
  • Support all existing logging sources like multiline and Json.
  • Don’t forget services which don’t run in Kubernetes, yes we still need to support those.


So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup – Fluentd daemonset running on each Kubelet node, and all services are configured to send logs to stdout / err  instead of a file.

The Fluentd agent is collecting the pod’s logs and adding the Kubernetes level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.

A pool of Logstash agents (Running as pods in Kubernetes) are consuming and parsing messages from Kafka as needed.

Once parsed messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:

A sketch of the setup described:

And now it is time to introduce CTail.

Ctail, stands for Containers Tail, it is an Outbrain homegrown tool written in Go, and based on a server and client side components.

A CTail server-side component runs per datacenter or per Kubernetes cluster, consuming messages from a Kafka topic named “CTail” and based on the Kubernetes app label creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages in the same partition, we had to make sure messages are partitioned by the pod_id.

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the Ctail client starts, it will query Consul, which is what we use for service discovery, to locate all of the CTail servers; register to their streams and will perform aggregations in memory – resulting in a live stream of log entries.

Here is a sketch demonstrating the environment and an example of the CTail client output:

CTail client output


To view logs from all pods of a service called “ob1ktemplate” all you need is to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri 
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200


This is how we solve this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (for ex. Sqoop or Mysql to Hive data loads), processing in Hadoop (for ex. running Hive, Scalding or Pig jobs), and pushing the results into external data stores (for ex. Vertica, Cassandra, Mysql etc.). An additional dimension of complexity originates from the dynamic nature of the system since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old cluster. This is a sort of an active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interfering with any working pipeline in the old cluster. Sounds good, right.


First Steps

To better understand the chosen solution let’s take a look at our current architecture:

First Steps

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.


Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:

data delivery pipeline


Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lower operational costs it would potentially double the load on each machine since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned a business group and all jobs that belong to this group are executed from it. To isolate the current production jobs execution from the ones for the new cluster we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, and we now have raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs and they doing varied types of operations we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySql, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, it may cause in some cases corruption or inconsistencies of the data due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we’ve described two types of execution modes a WorkflowEngine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. write to external database or trigger an applicative service). This will be done automatically by the framework thus preventing any effort from the development teams.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up  and syncing (to a degree) with the other cluster.

Let’s Do This…

Phase 1 of the migration should look something like this:

Let's Do This...


Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (hive, pig, scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.


Since the new cluster is running on new hardware and with a new version of Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact the we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.


Once we were confident that all phase 1 jobs were operating properly on the new cluster we could continue to phase 2 in which a migration leader becomes secondary and the secondary becomes a leader. Like this:

new cluster


In this phase all jobs will begin running from the new Workflow Engine impacting all production systems, while the old Workflow Engine will only run jobs that create data to the old cluster. This method actually offers a fairly easy way to rollback to the old cluster in case of any serious failure (even after a few days or weeks) since all intermediate data will continue to be available on the old cluster.

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seems to be working properly we can start pushing the groups to phase 2 one by one into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer version, or improved performance) immediately after they have moved to phase 2 and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat as you need to bring into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental Migration – Due to the fact that we had an active – active migration that we could apply on each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process- since we kept all old workflowEngines (that executed their jobs on the old Hadoop cluster) in a state of secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes users, didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project you can check out the video from Strata 2017 London where I discussed it in more detail.

Failure Testing for your private cloud – Introducing GomJabbar

TL;DR Chaos Drills can contribute a lot to your services resilience, and it’s actually quite a fun activity. We’ve built a tool called GomJabbar to help you run those drills.

Failure Testing for your private cloud

Here at Outbrain we manage quite a large scale deployment of hundreds of services/modules, and thousands of hosts. We practice CI/CD, and implemented quite a sound infrastructure, which we believe is scalable, performant, and resilient. We do however experience many production issues on a daily basis, just like any other large scale organization. You simply can’t ensure a 100% fault-free system. Servers will crash, run out of disk space, and lose connectivity to the network. Software will experience bugs, and erroneous conditions. Our job as software engineers is to anticipate these conditions, and design our code to handle them gracefully.

For quite a long time we were looking into ways of improving our resilience, and validate our assumptions, using a tool like Netflix’s Chaos Monkey. We also wanted to make sure our alerting system actually triggers when things go wrong. The main problem we were facing is that Chaos Monkey is a tool that was designed to work with cloud infrastructure, while we maintain our own private cloud.

The main motivation for developing such a tool is that failures have the tendency of occurring when you’re least prepared, and in the least desirable time, e.g. Friday nights, when you’re out having a pint with your buddies. Now, to be honest with ourselves, when things fail during inconvenient times, we don’t always roll our sleeves and dive in to look for the root cause. Many times the incident will end after a service restart, and once the alerts clear we forget about it.

Wouldn’t it be great if we could have “chaos drills”, where we could practice handling failures, test and validate our assumptions, and learn how to improve our infrastructure?

Chaos Drills at Outbrain

We built GomJabbar exactly for the reasons specified above. Once a week, at a well known time, mid day, we randomly select a few targets where we trigger failures. At this point, the system should either auto-detect the failures, and auto-heal, or bypass them. In some cases alerts should be triggered to let teams know that a manual intervention is required.

After each chaos drill we conduct a quick take-in session for each of the triggered failures, and ask ourselves the following questions:

  1. Did the system handle the failure case correctly?
  2. Was our alerting strategy effective?
  3. Did the team have the knowledge to handle, and troubleshoot the failure?
  4. Was the issue investigated thoroughly?

These take-ins lead to super valuable inputs, which we probably wouldn’t collect any other way.

How did we kick this off?

Before we started running the chaos drills, there were a lot of concerns about the value of such drills, and the time it will require. Well, since eliminating our fear from production is one of the key goals of this activity, we had to take care of that first.

"I must not fear.
 Fear is the mind-killer.
 Fear is the little-death that brings total obliteration.
 I will face my fear.
 I will permit it to pass over me and through me.
 And when it has gone past I will turn the inner eye to see its path.
 Where the fear has gone there will be nothing. Only I will remain."

(Litany Against Fear - Frank Herbert - Dune)

So we started a series of chats with the teams, in order to understand what was bothering them, and found ways to mitigate it. So here goes:

  • There’s an obvious need to avoid unnecessary damage.
    • We’ve created filters to ensure only approved targets get to participate in the drills.
      This has a side effect of pre-marking areas in the code we need to take care of.
    • We currently schedule drills via, so teams know when to be ready, and if the time is inappropriate,
      we reschedule.
    • When we introduce a new kind of fault, we let everybody know, and explain what should they prepare for in advance.
    • We started out from minor faults like graceful shutdowns, continued to graceless shutdowns,
      and moved on to more interesting testing like faulty network emulation.
  • We’ve measured the time teams spent on these drills, and it turned out to be negligible.
    Most of the time was spent on preparations. For example, ensuring we have proper alerting,
    and correct resilience features in the clients.
    This is actually something you need to do anyway. At the end of the day, we’ve heard no complaints about interruptions, nor time waste.
  • We’ve made sure teams, and engineers on the call were not left on their own. We wanted everybody to learn
    from this drill, and when they weren’t sure how to proceed, we jumped in to help. It’s important
    to make everyone feel safe about this drill, and remind everybody that we only want to learn and improve.

All that said, it’s important to remember that we basically simulate failures that occur on a daily basis. It’s only that when we do that in a controlled manner, it’s easier to observe where are our blind spots, what knowledge are we lacking, and what we need to improve.

Our roadmap – What next?

  • Up until now, this drill was executed in a semi-automatic procedure. The next level is to let the teams run this drill on a fixed interval, at a well known time.
  • Add new kinds of failures, like disk space issues, power failures, etc.
  • So far, we were only brave enough to run this on applicative nodes, and there’s no reason to stop there. Data-stores, load-balancers, network switches, and the like are also on our radar in the near future.
  • Multi-target failure injection. For example, inject a failure to a percentage of the instances of some module in a random cluster. Yes, even a full cluster outage should be tested at some point, in case you were asking yourself.

The GomJabbar Internals

GomJabbar is basically an integration between a discovery system, a (fault) command execution scheduler, and your desired configuration. The configuration contains mostly the target filtering rules, and fault commands.

The fault commands are completely up to you. Out of the box we provide the following example commands, (but you can really write your own script to do what suits your platform, needs, and architecture):

  • Graceful shutdowns of service instances.
  • Graceless shutdowns of service instances.
  • Faulty Network Emulation (high latency, and packet-loss).

Upon startup, GomJabbar drills down via the discovery system, fetches the clusters, modules, and their instances, and passes each via the filters provided in the configuration files. This process is also performed periodically. We currently support discovery via consul, but adding other methods of discovery is quite trivial.

When a users wishes to trigger faults, GomJabbar selects a random target, and returns it to the user, along with a token that identifies this target. The user can then trigger one of the configured fault commands, or scripts, on the random target. At this point GomJabbar uses the configured CommandExecutor in order to execute the remote commands on the target hosts.

GomJabbar also maintains a audit log of all executions, which allows you to revert quickly in the face of a real production issue, or an unexpected catastrophe cause by this tool.

What have we learned so far?

If you’ve read so far, you may be asking yourself what’s in it for me? What kind of lessons can I learn from these drills?

We’ve actually found and fixed many issues by running these drills, and here’s what we can share:

  1. We had broken monitoring and alerting around the detection of the integrity of our production environment. We wanted to make sure that everything that runs in our data-centers is managed, and at a well known (version, health, etc). We’ve found that we didn’t compute the difference between the desired state, and the actual state properly, due to reliance on bogus data-sources. This sort of bug attacked us from two sides: once when we triggered graceful shutdowns, and once for graceless shutdowns.
  2. We’ve found services that had no owner, became obsolete and were basically running unattended in production. The horror.
  3. During the faulty network emulations, we’ve found that we had clients that didn’t implement proper resilience features, and caused cascading failures in the consumers several layers up our service stack. We’ve also noticed that in some cases, the high latency also cascaded. This was fixed by adding proper timeouts, double-dispatch, and circuit-breakers.
  4. We’ve also found that these drills motivated developers to improve their knowledge about the metrics we expose, logs, and the troubleshooting tools we provide.


We’ve found the chaos drills to be an incredibly useful technique, which helps us improve our resilience and integrity, while helping everybody learn about how things work. We’re by no means anywhere near perfection. We’re actually pretty sure we’ll find many many more issues we need to take care of. We’re hoping this exciting new tool will help us move to the next level, and we hope you find it useful too 😉

ScyllaDB POC – (not so :) live blogging – Update #3.

Scylla attacking Olysseus's ship
Scylla attacking Olysseus’s ship

It has been a long time (more than 4 months) since we last updated.

You can read the previous update here.

It is not that we abandoned the POC, we actually continued to invest time and effort on it since there is a good progress. It is just that we did not yet ended it and got the proofs that we wanted. While there was a lot of progress in both Outbrain system side and ScyllaDB side on those 4 months, there is one things that is holding us back from showing trying to prove the main point of this POC. Our current bottleneck is the network. The network on the datacenter where we are running the tests on is 1Gbps ethernet network. We found out that although Scylla is not loaded and works with good latencies we are saturating the NICs. We did some improvements along the way to still show that Scylla is behaving better than C* but if we want to show that we can significantly reduce the number of nodes in the cluster, we need to upgrade to 10Gbps ethernet.

This upgrade will come shortly.

This is where we currently stand. However – a lot was done in those 4 months and there is a lot of learnings I want to share. The rest of the post is the way Doron and the Scylla guys describes what happened. It looks more like captain’s log but it tells the story pretty well.


  • 23/6 – We created special app server cluster to call Scylla, and delegated all calls both to C* and Scylla cluster. We wanted to do that so there will be less coupling between the C* path and the Scylla path and less mutual interruptions that will interfere our conclusions. The app servers for Scylla were configured not to use cache, so entire load (1.5M-3M RPM) went directly to Scylla. C* stayed behind cache and actually handled ~10% of the load. This went smoothly.
  • In the following ~3 weeks we tried to load the snapshot again from C* and stumbled with some difficulties, some related to bugs in Scylla, some to networking limits (1Gpbs). During this time we had to stop the writes to Scylla for few days, so the data was not sync again. Some actions we have done to resolve
    1. We investigated bursts of usage we had and decreased them in some use-cases (both for C* and Scylla). They caused the network usage to be very high for few seconds, sometimes for a few tens of milliseconds. This also helped C* a lot. The tool is now open source.
    2. We added client-server compression (LZ4). It was supported by Scylla, but client needed to configure it.
    3. Scylla added server-server compression during this period.
    4. Changed the “multi” calls back to N parallel single calls (instead of one IN request) – it better utilize the network.
    5. Scylla client was (mistakably) using latency aware over the token aware. This caused app to go to the “wrong” node a lot – causing more traffic within Scylla nodes. Removing the latency-aware helped reducing the server-server network usage and the overall latency.
  • 14/7 – with all the above fixes (and more from Scylla) we were able to load the data and stabilize the cluster with the entire load.
  • Until 29/7 I see many spikes in the latency. We are  not sure what we did to fix it… but on 29/7 the spikes stopped and the latency is stable until today.
  • During this period we have seen 1-5 errors from Scylla per minute. Those errors were explained by trying to reach partitions coming from very old C* version. It was verified by logging the partitions we fail for in the app server side. Scylla fixed that on 29/7.
  • 7/8-15/8 – we have changed the consistency level of both Scylla and C* to local one (to test C*) – this caused a slight decrease in the (already) low latencies.
  • Up to 19/8 we have seen occasional momentarily errors coming from Scylla (few hundreds every few hours). This has not happened since 19/8.. I don’t think we can explain why.
  • Current latencies – Scylla holds over 2M RPM with latency of 2 ms (99%ile) for single requests and 40-50 ms (99%ile) for multi requests of ~100 partitions in avg per request. Latencies are very stable with no apparent spikes. All latencies are measured from the app servers.

Next steps on Scylla:

  • Load the data again from C* to sync.
  • Repeat the data consistency check to verify the results from C* and Scylla are still the same.
  • Run repairs to see cluster can hold while running heavy tasks in the background..
  • Try to understand the errors I mentioned above if they repeat.
  • Create the cluster in Outbrain’s new Sacramento datacenter that have 10Gbps network. with minimum nodes (3?) and try the same load there.


  • 7/8 – we changed consistency level to local-one and tried to remove cache from Cassandra. The test was successful and Cassandra handled the full load with latency increasing from 5 ms (99%ile for single requests) to 10-15ms in the peak hours.
  • 15/8 – we changed back to local-quorum (we do not like to have local-one for this cluster… we can explain in more details why) and set the cache back.
  • 21/8 – we removed the cache again, this time with local-quorum. Cassandra handled it, but single requests latency increased to 30-40 ms for the 99%ile in the peak hours. In addition, we have started timeouts from Cassandra (timeout is 1 second) – up to 500 per minute, in the peak hours.

Next steps on C*:

  • Run repairs to see cluster can hold while running heavy tasks in the BG.
  • Try compression (like we do with Scylla).
  • Try some additional tweaks by the C* expert.
  • In case errors continue, will have to set cache back.

Current status comparison – Aug 20th:

The following table shows comparison under load of 2m RPM in peak hours.

Latencies are in 99%ile.

Cassandra ScyllaDB
Single call latency 30-40 ms (spikes to 70) 2 ms
Multi call latency 150-200 ms (spikes to 600) 40-50 ms
Errors (note: these are query times exceeding 1 second, not necessarily database failures) Up to 150 a minute every few minutes timeouts per minute, with some higher spikes every few days few hundreds every few days

Below are the graphs showing the differences.

Latency and errors graphs showing both C* and Scylla getting requests without cache (1M-2M RPM):

Latency comparison of single requests

Latency comparison of single requests

Latency comparison of multi requests

Latency comparison of multi requests

Errors (timeouts for queries > 1 second)

Errors (timeouts for queries > 1 second)


There are very good signs that Scylla DB does make a difference in throughput but due to the network bottleneck, we could not verify it. We will update as soon as we have results on a faster network. Scylla guys are working on solution for slower networks too.

Hope to update soon.

Micro Service Split

In this post, I will describe a technical methodology we used to remove a piece of functionality from a Monolith/Service and turn it into a Micro-Service. I will try to reason about some of the decisions we have made and the path we took, as well as a more detailed description of internal tools, libraries and frameworks we use at Outbrain and in our team, to shed some light on the way we work in the team. And as a bonus you might learn from our mistakes!
Let’s start with the description of the original service and what it does.
Outbrain runs the largest content discovery platform. From the web surfer’s perspective it means serving a recommended content list that might interest her, in the form of ‘You might also like’ links. Some of those impression links are sponsored. ie: when she clicks on a link, someone is paying for that click, and the revenue is shared between Outbrain and the owner of the page with the link on it. That is how Outbrain makes its revenue.

My team, among other things, is responsible for the routing of the user to the requested page after pressing the link, and for the bookkeeping and accounting that is required in order to calculate the cost of the click, who should be charged, etc.
In our case the service we are trying to split is the ‘Bookkeeper’. Its primary role is to manage the paid impression links budget. After a budget is spent, The ‘Bookkeeper’ should notify Outbrain’s backend servers to refrain from showing the impression link again. And this has to be done as fast as possible. If not, people will click on links we cannot charge because the budget was already spent. Technically, this is done by an update to a database record. However, there are other cases we might want to stop the exposure of impression links. One such an example is a request from the customer paying for the future click to disable the impression link exposure. So for such cases, we have an API endpoint that does exactly the same with the same code. That endpoint is actually part of the ‘Bookkeeper’ that is enabled by a feature toggle on specific machines. This ‘Activate-Impressionable’ endpoint as we call it, is what was decided to split out of the ‘Bookkeeper’ into a designated Micro-Service.
In order to execute the split, we have chosen a step-by-step strategy that will allow us to reduce the risk during execution and keep it as controlled and reversible as possible. From a bird’s eye view I will describe it as a three steps process: Plan, Up and Running as fast as possible and Refactor. The rest of the post describes these steps.

Plan (The who’s and why’s)

In my opinion, this is the most important step. You don’t want to split a service just in order to split. Each Micro Service introduces maintenance and management overhead, with its own set of challenges[1]. On the other hand, Microservices architecture is known for its benefits such as code maintainability (for each Micro Service), the ability to scale out and improved resilience[2].
Luckily for me, someone already did that part for me and took the decision that ‘Activate-Impressionable’ should split from the ‘Bookkeeper’. But still, Let’s name some of the key factor of our planning step.
Basically, I would say that a good separation is a logical separation with its own non-overlap RESTful endpoints and isolated code base. The logical separation should be clear. You should think what is the functionality of the new service, and how isolated it is. It is possible to analyze the code for inter-dependencies among classes and packages using tools such as lattix. At the bottom line, it is important to have a clear definition of the responsibility of the new Micro Service.
In our case, the ‘Bookkeeper’ was eventually split so that it remain the bigger component, ‘Activate-Impressionable’ was smaller and the common library was smaller than both. The exact lines of code can be seen in the table below.


Unfortunately, I assessed it only after the split and not in the plan step. We might say that there is too much code in common when looking at those numbers. It is something worth considering when deciding what to split. A lot of common code implies low isolation level.

Of course part of the planning is time estimation. Although I am not a big fan of “guestimates” I can tell that the task was planned for couple of weeks and took about that long.
Now that we have a plan, let’s get to work.

Up and Running – A Step by Step guide

As in every good refactor, we want to do it in small baby steps, and remain ‘green’ all the time[3]. In continuous deployment that means we can and do deploy to production as often as possible to make sure it is ‘business as usual’. In this step we want to get to a point the new service is working in parallel to the original service. At the end of this step we will point our load-balancers to the new service endpoints. In addition, the code remains mutual in this step, means we can always deploy the original fully functioning ‘Bookkeeper’. We actually do that if we feel the latest changes had any risk.
So let’s break it down into the actual phases:

Overview Step Details
micro service split 0 Starting phase
micro service split 1 Create the new empty Micro-Service ‘Activate-Impressionable’. In outbrain we do it using scaffolding of ob1k framework. Ob1k is an open source Micro Services Framework that was developed in-house.
micro service split 2 Create a new empty Library dependent both by the new ‘Activate-Impressionable’ service and the ‘Bookkeeper’. Ideally, if there is a full logic separation with no mutual code between the services that library will be deleted in the cleanup phase.
micro service split 3 Move the relevant source code to the library. Luckily in our case, there was one directory that was clearly what we have to split out. Unluckily, that code also pulled up some more code it was dependent on and this had to be done carefully not to pull too much nor too little. The good news are that this phase is pretty safe for static typing languages such as Java, in which our service is written in. The compiler protects us here with compilation errors so the feedback loop is very short. Tip: don’t forget to move unit tests as well.
micro service split 4 Move common resources to the library, such as spring beans defined in xml files and our feature flags files that defined in yaml files. This is the dangerous part. We don’t have the compiler here to help, so we actually test it in production. And when I say production I mean using staging/canary/any environment with production configuration but without real impact. Luckily again, both yaml and spring beans are configured to fail fast, so if we did something wrong it will just blow out in our face and the service will refuse to go up. For this step I even ended up developing a one-liner bash script to assist with those wicked yaml files.
micro service split 5 Copy and edit web resources (web.xml) to define the service endpoints. In our case web.xml can’t reside in a library so it has to be copied. Remember we still want the endpoints active in the ‘Bookkeeper’ at that phase. Lesson learned: inspect all files closely. In our case log4j.xml which seems like an innocent file by its name contains designated appenders that are consumed by other production services. I didn’t notice that and didn’t move the required appender, and it was found only later in production.
Deploy Deploy the new service to production. What we did is deploy the ‘Activate-Impressionable’ side-by-side on the same machines as the ‘Bookkeeper’, just with a different ports and context path. Definitely makes you sleep better at night.
Up-And-Running Now is a good time to test once again if both ‘Bookkeeper’ and ‘Activate-Impressionable’ are working as expected. Generally now we are up and running with only few more things to do here.
Clients Redirect Point clients of the service to the new endpoints (port + context path). A step that might take some time depends on the number of clients and the ability to redeploy them. In outbrain we use HA-Proxy, so reconfiguring it did most of the work, but some clients did require code modifications.
(More) Validation Move/copy simulator tests and monitors. In our team, we heavily rely on tests we call simulator tests. These are actually black-box tests written in JUnit that runs against the service installed on a designated machine. These tests see the service as a black-box and calls its endpoints while mock/emulate other services and data in the database for the test run. So usually a test run can look like: put something in the database, trigger the endpoint, and see the result in the database or in the http response. There is also a question here whether to test ‘Activate-Impressionable’ or the ‘Bookkeeper’. Ideally you will test them both (tests are duplicated for that phase), and that is what we did.

Refactor, Disconnect & Cleanup

When we got here the new service is working and we should expect no more behavior changes from the endpoints point of view. But we still want the code to be fully split and the services to be independent from each other. In the previous step we performed the phases in a way that everything remains reversible with a simple feature toggle & deploy.

In this step, we move to a state where the ‘Bookkeeper’ will no longer host the ‘Activate-Impressionable’ functionality. Sometimes it is a good idea to have a gap from the previous step to make sure that there are no rejections and backfires that we didn’t trace in our tests and monitoring.
First thing, If was not done up until now, is deploying the ‘Bookkeeper’ without the service functionality and make sure everything is still working. And wait a little bit more…
And now we just have to push the sources and the resources from the library to the ‘Activate-Impressionable’ service. In the ideal case there is no common code, we can also delete the library. This was not how it was in our case. We still have a lot of common code we can’t separate for the time being.
Now is also the time to do resources cleanup, web.xml edit etc’.
And for the bold and OCD among us – packages rename and refactor of code with the new service naming conventions.


The entire process in our case took a couple of weeks. Part of the fun and advantage in such process, is the opportunity to know better an old code and its structure and functionality without the need to modify something for a new feature with its constraints. Especially when someone else wrote it originally.
In order to perform well such a process it is important to plan and remain organized and on track. In case of a context switch it is very important to keep a bookmark of where you need to return to in order to continue. In our team we even did that with a handoff of the task between developers. Extreme Programming, it is.
It is interesting to see the surprising results in terms of lines of code. Originally we thought of it as splitting a micro-service from a monolith. In retrospective, it looks to me more like splitting a service into two services. ‘Micro’ in this case is in the eye of the beholder.

Real Time Performance Monitoring @ Outbrain

Outbrain serves millions of requests per minute, based on a microservice architecture.Consequently, as you might expect, visibility and performance monitoring are crucial.

Serving millions of requests per minute, across multiple data centers, in a micro services environment, is not an easy task. Every request is routed to many applications, and may potentially stall or fail at every step in the flow. Identifying bottlenecks, troubleshooting failures and knowing our capacity limits are all difficult tasks. Yet, these are not things you can just give up on “because they’re hard”, but are rather tasks that every engineer must be able to tackle without too much overhead. It is clear that we have to aim for all engineers to be able to understand how their applications are doing at any given time.

Since we face all of these challenges every day, we’ve reached the point where a paradigm shift was required. For example, move from the old, familiar “investigate the past” to the new, unfamiliar “investigate the present”. That’s only one of the requirements we came up with. Here are few more:

Real-time visibility

Sounds pretty straightforward, right? However, when using a persistent monitoring system, it always has at least few minutes of delay. These few minutes might contain millions of errors that potentially affect your business. Aiming for low MTTR means cutting delays where possible, thus moving from minute-based granularity to second-base.

Throughput, Latency and error rate are linked

Some components might suffer from high latency, but maybe the amount of traffic they receive is negligible. Others might have low latency under high load, but that’s only because they fail fast for almost every request (we are reactive!). We wanted to view these metrics together, and rank them by importance.

Mathematical correctness at any given aggregation (Don’t lie!)

When dealing with latency, one should look at percentiles, not averages, as averages can be deceiving and might not tell the whole story. But what if we want to view latency per host, and then view it per data center ? if we store only percentiles per host (which is highly common in our industry), it is not mathematically correct to average them! On the other hand, we have so much traffic that we can’t just store any measurement with its latency; and definitely not view them all in real time

Latency resolution matters

JVM based systems tend to display crazy numbers when looking at the high percentiles (how crazy ? With heavy gc storms and lock contention there is no limit to how worse these values can get). It’s crucial for us to differentiate between latency in the 99.5 and 99.9 percentiles, while values at the 5 or 10 percentiles don’t really matter.

Summing up all of the requirements above, we reached a conclusion that our fancy persistent monitoring system, with its minute-based resolution, supporting millions of metrics per minute, doesn’t cut it anymore. We like it that every host can write thousands of metric values every minute, and we like being able to view historical data over long periods of time, but moving forward, it’s just not good enough. So, as we often do, we sat down to rethink our application-level metric collection and came up with a new, improved solution.

Our Monitoring Unit

First, consider metric collection from the application perspective. Logically, it is an application’s point-of-view of some component: a call to another application, to a backend or plain CPU bound computation. Therefore, for every component, we measure its number of requests, failures, timeouts and push backs along with a latency histogram over a short period of time.

In addition, we want to see the worst performing hosts in terms of any such metric (can be mean latency, num errors, etc)

Our Monitoring Unit

To achieve this display for each measured component we decided to use these great technologies:

HDR Histograms

HdrHistogram supports the recording and analysis of sampled data value counts, across a configurable value range, with configurable value precision within the range. It is designed for recording histograms of latency measurements in performance-sensitive applications.

Why is this important? Because when using such histograms to measure the latency of some component, it allows you to have good accuracy of the values in the high percentiles at the expense of the low percentiles

So, we decided to store in memory instances of histograms (as well as counters for requests, errors, timeouts, push backs, etc) for each measured component. We then replace them each second and expose these histograms in the form of rx.Observable using our own OB1K application server capabilities.

All that is left is to aggregate and display.

Java Reactive extensions

rx is a great tool to merge and aggregate streams of data in memory. In our case, we built a service to merge raw streams of measured components; group them by the measured type, and aggregate them in a window of a few seconds. But here’s the trick – we do that on demand. This allows us to let the users view results grouped by any dimension they desire without losing the mathematical correctness of latency histograms aggregation.

Some examples of the operators we use to aggregate the multiple monitoring units:


examples of the operators

rx merge operator enables treating multiple streams as a single stream


multiple streams

rx window operator enables sliding window abstraction


sliding window abstraction

rx scan operator enables aggregation over each window

To simplify things, we can say that for each component we want to display, we connect to each machine to fetch the monitored stream endpoint, perform ‘merge’ to get a single stream abstraction, ‘window’ to get a result per time unit, and ‘scan’ to perform the aggregation

Hystrix Dashboard

The guys at Netflix found a great formula for displaying serving components’ status in a way that links between volume, error percentage and latency in a single view. We really liked that, so we adopted this UI to show our aggregated results.

The hystrix dashboard view of a single measured component shows counters of successes, failures, timeouts and push backs, along with a latency histogram, information on the number of hosts, and more. In addition, it provides a balloon view, which grows/shrinks with traffic volume per component, and is color-coded by the error rate.

See below how this looks in the breakdown view of all request components. The user gets a view of all measured components, sorted by volume, with a listing of the worst performing hosts.

 view of all request components

Another example shows the view of one application, with nothing but its entry points, grouped by data center. Our Operations guys find this extremely useful when needing to re-balance traffic across data centers.

iew of one application

OK, so far so good. Now let’s talk about what we actually do with it.


Sometimes an application doesn’t meet its SLA, be it in latency or error rate. The simple case is due to a broken internal component (for example, some backend went down and all calls to it result in failures). At this point we can view the application dashboard and easily locate the failing call. A more complex use case is an increase in the number of calls to a high latency component at the expense of a low latency one (for example, cache hit rate drop). Here our drill down will need to focus on the relative amount of traffic each component receives – we might be expecting a 1:2 ratio, while in reality we might observe a 1:3 ratio.

With enough alerting in place, this could be caught by an alert. Having the real-time view will allow us to locate the root cause quickly even when the alert is a general one.


Performance comparison

In many cases we want to compare the performance of two groups of hosts doing the same operation, such as version upgrades or topology changes. We use tags to differentiate groups of machines (each datacenter is a tag, each environment, and even each hostname). We then can ask for a specific metric, grouped by tags, to get the following view:

Performance comparison


Load testing

We conduct several types of load tests. One is where we shift as much traffic as possible to one data center, trying to hit the first system-wide bottleneck. Another is performed on specific applications. In both cases we use the application dashboard to view the bottlenecks, just like we would when troubleshooting unexpected events.

One thing to keep in mind is that when an application is loaded, sometimes the CPU is bounded and measurements are false because threads just don’t get CPU time. Another case where this happens is during GC. In such cases we must also measure the effects of this phenomenon.

The measuring unit, in this case, is ‘jvm hiccup’, which basically means taking one thread, letting it sleep for a while and measuring “measurement time on top of the sleep time”. Low hiccups means we can rely on the numbers presented by other metrics.

Load testing


What’s next?

Real-time monitoring holds a critical role in our everyday work, and we have many plans to leverage these measurements. From smarter metric driven load balancing in the client to canary deployments based on real application behavior – there is no limit to what you can achieve when you measure stuff in a fast, reliable manner.

Goodbye static CNAMEs, hello Consul

Nearly every large-scale system becomes distributed at some point: a collection of many instances and services that compose the solution you provide. And as you scale horizontally to provide high availability, better load distribution, etc…, you find yourself spinning up multiple instances of services, or using systems that function in a clustered architecture. That’s all cool in theory, but soon after you ask yourself, “how do I manage all of this? How should these services communicate with each other? And how do they even know what instances (or machines) exist?”

Those are excellent questions!

What methods are in use today?

The naive approach, which we’d followed in Outbrain for many years, is to route all inter-service traffic via load balancers (HAProxy in our case). Every call to another system, such as a MySql slave, is done to the load balancer (one in a pool of many), via an agreed upon name, such as a DNS CNAME. The load balancer, which holds a static configuration of all the different services and their instances, directs the call to one of those instances, based on the predefined policy.

backend be_onering_es   ## backend name
  balance leastconn     ## how to distribute load
  option httpchk GET /  ## service health check method
  option httpclose      ## add “Connection: close” header if missing
  option forwardfor     ## send client IP through XFF header
  server ringdb-20001 ringdb-20001:9200 check slowstart 10s weight 100   ## backend node 1
  server ringdb-20002 ringdb-20002:9200 check slowstart 10s weight 100   ## backend node 2

The load balancer is also responsible for checking service health, to make sure requests are routed only to live services, as dead ones are “kicked out of the pool”, and revived ones are brought back in.

An alternative to the load balancer method, used in high throughput systems such as Cassandra, is configuring CNAMEs that point to specific nodes in the cluster. We then use those CNAMES in the consuming applications’s configuration. The client is then responsible to activate a policy of balancing between those nodes, both for load and availability.

OK, so what’s the problem here?

There’s a few actually:

  1. The mediator (Load balancer), as quick as it may be in processing requests (and HAProxy is really fast!), is another hop on the network. With many services talking to each other, this could prove a choke point in some network topologies. It’s also a shared resource between multiple services and if one service misbehaves, everyone pays the price. This is especially painful with big payloads.
  2. The world becomes very static! Moving services between hosts, scaling them out/in, adding new services – it all involves changing the mediator’s config, and in many cases done manually. Manual work requires expertise and is error prone. When the changes becomes frequent… it simply does not scale.
  3. When moving ahead to infrastructure that is based on containers and resource management, where instances of services and resources are allocated dynamically, the whole notion of HOSTNAME goes away and you cannot count on it in ANY configuration.

What this all adds up to is “the end of the static configuration era”. Goodbye static configs, hello Dynamic Service Discovery! And cue Consul.

What is Consul?

In a nutshell, Consul is a Service Discovery System, with a few interesting features:

  1. It’s a distributed system, made out of an agent in each node. Nodes talk to each other via a gossip protocol, making node discovery simple, robust, and dynamic. There’s no configuration file describing all members of a Consul cluster.
  2. It’s fault tolerant by design, and using concepts such as Anti Entropy, gracefully handles nodes disappearing and reappearing – a common scenario in VM/container-based infrastructure.
  3. It has first-class treatment of datacenters, as self-contained, interconnected entities. This means that DC failure/disconnection would be self-contained. It also means that a node in one DC can query for information in another DC with as little knowledge as the remote DC’s name.
  4. It holds the location (URI) and health of every service on every host, and makes this data available via multiple channels, such as a REST API and GUI. The API also lets you make complex queries and get the service data segment you’re interested in. For example: Get me all the addresses of all instances of service ‘X’ from Datacenter ‘Y’ in ‘staging env’ (tag).
  5. There is a very simple way to get access to “Healthy” service instances by leveraging the Consul DNS interface. Perfect for those pesky 3rd party services whose code you can’t or don’t want to modify, or just to get up and running quickly without modifying any client code (disclaimer: doesn’t fit all scenarios).

How does Consul work?

You can read all about it here, but let me take you through a quick tour of the architecture:

tour of the architecture
click to enlarge

As you can see, Consul has multi-datacenter awareness built right in (you can read more about it here). But for our case, let’s keep it simple, and look at the case of a single datacenter (Datacenter 1 in the diagram).

What the diagram tags as “Clients” are actually “Consul agents”, running locally on every participating host. Those talk to each other, as well as the Consul servers (which are “agents” configured as Servers), through a “Gossip protocol”. If you’re familiar with Cassandra, and that rings a bell, then you’re right, it’s the same concept used by Cassandra nodes to find out which ones are up or down in a cluster. A Gossip protocol essentially makes sure “Everybody knows Everything about Everyone”. So within reasonable delay, all agents know (and propagate) state information about other agents. And you just so happen to have an agent running locally on your node, ready to share everything it knows via API, DNS or whatnot. How convenient!

Agents are also the ones performing health checks to the services on the hosts they run on, and gossiping any health state changes. To make this work, every service must expose a means to query its health status, and when registered with its local Consul agent, also register its health check information. At Outbrain we use an HTTP based “SelfTest” endpoint that every one of our homegrown services exposes (through our OB1K container, practically for free!).

Consul servers are also part of the gossip pool and thus propagate state in the cluster. However, they also maintain a quorum and elect a leader, who receives all updates (via RPC calls forwarded from the other servers) and registers them in it’s database. From here on, the data is replicated to the other servers and propagated to all the agents via Gossip. This method is a bit different from other Gossip based systems that have no servers and leaders, but it allows the system to support stronger consistency models.

There’s also a distributed key-value store we haven’t mentioned, rich ACLs, and a whole ecosystem of supporting and derived tools… but we said we’d keep it simple for now.

Where does that help with service discovery?

First, what we’ve done is taken all of our systems already organized in clusters and registered them with Consul. Systems such as Kafka, Zookeeper, Cassandra and others. This allows us to select a live service node from a cluster, simply by calling a hostname through the Consul DNS interface. For example, take Graphite: Outbrain’s systems are currently generating ~4M metrics per minute. Getting all of these metrics through a load balancer, or even a cluster of LBs, would be suboptimal, to say the least. Consul allows us to have each host send metrics to a hostname, such as “graphite.service.consul”, which returns a random IP of a live graphite relay node. Want to add a couple more nodes to share the load? no problem, just register them with Consul and they automagically appear in the list the next time a client resolves that hostname. Which, as we mentioned, happens quite a few times a minute. No load balancers in the way to serve as choke points, no editing of static config files. Just simple, fast, out-of-band communication.

How do these 3rd party services register?

We’re heavy users of Chef, and have thus created a chef cookbook to help us get the job done. Here’s a (simplified) code sample we use to register Graphite servers:

ob_consul 'graphite' do
  owner 'ops-vis'         ## add ‘owner’ tag to identify owning group
  port 1231               ## port the service is running on
  check_cmd "echo '' | nc localhost 1231 || exit 2"    ## health check shell command
  check_interval '60s'    ## health check execution interval
  template false          ## whether the health check command is a Chef template (for scripts)
  tags [‘prod’]           ## more tags

How to do clients consume services?

Clients simply resolve the DNS record they’re interested in… and that’s it. Consul takes care of all the rest, including randomizing the results.

$ host graphite is an alias for relayng.service.consul.
relayng.service.consul has address
relayng.service.consul has address

How does this data reach the DNS?

We’ve chosen to place Consul “behind” our internal DNS servers, and forward all requests for the “consul” domain name to a consul agent running on the DNS servers.

zone "consul" IN {
    type forward;
    forward only;
    forwarders { port 8600; };

Note that there’s other ways to go about this, such as routing all DNS requests to the local Consul agent running on each node, and having it forward everything “non-Consul” to your DNS servers. There’s advantages and disadvantages to each approach. For our current needs, having an agent sit behind the DNS servers works quite well.

Where does the Consul implementation at Outbrain stand now?

At Outbrain we’re already using Consul for:

  • Graphite servers.
  • Hive Thrift servers that are Hive interfaces to the Hadoop cluster they’re running on. Here the Consul CNAME represents the actual Hadoop cluster you want your query to run on. We’ve also added a layer that enables accessing these clusters from different datacenters using Consul’s multi-DC support.
  • Kafka servers.
  • Elasticsearch servers.

And our roadmap for the near future:

  • MySql Slaves – so we can eliminate the use of HAProxy in that path.
  • Cassandra servers where maintaining a list of active nodes in the app configuration becomes stale over time.
  • Prometheus – our new monitoring and alerting system.
  • Zookeeper clusters.

But that’s not all! stay tuned for more on Consul, client-side load balancing, and making your environment more dynamic.

So Long Spring XMLs

Like many java projects these days, we use Spring in Outbrain for configuring our java dependencies wiring. Spring is a technology that started in order to solve a common, yet not so simple, issue – wiring all the dependencies in a java project. This was done by utilizing the IoC (Inversion of Control) principles. Today Spring does a lot more than just wiring and bootstrapping, but in this post I will focus mainly on that.

When Spring just started, the only way to configure the wirings of an application, was to use XMLs which defined the dependencies between different beans. As Spring had continued to develop, 2 more methods were added to configure dependencies – the annotation method and the @Configuration method. In Outbrain we use XML configuration. I found this method has a lot of pain points which I found remedy to using spring @Configuration

What is this @Configuration class?

You can think of a @Configuration class just like XML definitions, only defined by code. Using code instead of XMLs allows some advantages over XMLs which made me switch to this method:

  1. No typos – You can’t have a typo in code. The code just won’t compile
  2. Compile time check (fail fast) – With XMLs it’s possible to add an argument to a bean’s constructor but to forget to inject this argument when defining the bean in the XML. Again, this can’t happen with code. The code just won’t compile
  3. IDE features come for free – Using code allows you to find usages of the bean’s constructor to find out easily the contexts that use it; It allows you to jump back and forth between beans definitions and basically everything you can do with code, you get for free.
  4. Feature flags – In Outbrain we use feature-flags a lot. Due to the continuous-deployment culture of the company, a code that is pushed to the trunk can find itself in production in a matter of minutes. Sometimes, when developing features, we use feature flags to enable/disable certain features. This is pretty easy to do by defining 2 different implementations to the same interface and decide which one to load according to the flag. When using XMLs we had to use the alias feature which makes it not intuitive enough to create feature-flags. With @Configuration, we can create a simple if clause for choosing the right implementation.

Read more >

Finding a needle in a Storm-stack

Using Storm for real time distributed computations has become a widely adopted approach, and today one can easily find more than a few posts on Storm’s architecture, internals, and what have you (e.g., Storm wiki, Understanding the parallelism of a storm topology, Understanding storm internal message buffers, etc).

So you read all these posts and and got yourself a running Storm cluster. You even wrote a topology that does something you need, and managed to get it deployed. “How cool is this?”, you think to yourself. “Extremely cool”, you reply to yourself sipping the morning coffee. The next step would probably be writing some sort of a validation procedure, to make sure your distributed Storm computation does what you think it does, and does it well. Here at Outbrain we have these validation processes running hourly, making sure our realtime layer data is consistent with our batch layer data – which we consider to be the source of truth.


It was when the validation of a newly written computation started failing, that we embarked on a great journey to the land of “How does one go about debugging a distributed Storm computation?”, true story. The validation process was reporting intermittent inconsistencies when, intermittent being the operative word here, since it was not like the new topology was completely and utterly messed up, rather, it was failing to produce correct results for some of the input, all the time (by correct results I mean such that match our source of truth).

Read more >