Category: DevOps

Switches, Penguins and One Bad Cable

Back in May 2017, I was scheduled to speak at the DoTC conference in Melbourne. I was really excited and looking forward to it, but fate had different plans. And lots of them. From my son going through an emergency appendicitis operation, through flight delays, and up to an emergency landing back in Tel Aviv… I ended up missing the opportunity to speak at the conference. Amazingly, something similar happened this year! Maybe 3rd time’s a charm?

The post below is the talk I’d planned to give, converted to blog format.


 

August 13, 2015. Outbrain’s ops on call is just getting out of his car, when his phone rings. It’s a PagerDuty alert. Some kind of latency issue in the Chicago datacenter. He acks it, figuring he’ll unload the groceries first and then get round to it. But then, his phone rings again. And again.

Forget the groceries. Forget the barbecue. Production is on fire.

18 hours and many tired engineers later, we’re recovering from having lost our Chicago datacenter. In the takein that follows, we trace the root cause to a single network cable that’s mistakenly connected to the wrong switch.

Hi, my name is Alex, and I lead the Core Services group at Outbrain. Our group owns everything from the floor that hosts Outbrain’s servers, to the delivery pipelines that ship Outbrain’s code. If you’re here, you’ve likely heard of Outbrain. You probably know that we’re the world’s leading Discovery platform, and that you’ll find us installed on publisher sites like CNN, The Guardian, Time Inc and the Australian news.com, where we serve their readers with premium recommendations.

But it wasn’t always this way.

You see, back when we started, life was simple: all you had to do was throw a bunch of Linux servers in a rack, plug them into a switch, write some code… and sell it. And that we did!

But then, an amazing thing happened. The code that we wrote actually worked, and customers started showing up. And they did the most spectacular and terrifying thing ever – they made us grow. One server rack turned into two and then three and four. And before we knew it, we had a whole bunch of racks, full of penguins plugged into switches. It wasn’t as simple as before, but it was manageable. Business was growing, and so were we.

Fast forward a few years.

We’re running quite a few racks across 2 datacenters. We’re not huge, but we’re not a tiny startup anymore. We have actual paying customers, and we have a service to keep up and running. Internally, we’re talking about things like scale, automation and all that stuff. And we understand that the network is going to need some work. By now, we’ve reached the conclusion that managing a lot of switches is time consuming, error-prone, and frankly, not all that interesting. We want to focus on other things, so we break the network challenge down to 2 main topics:

Management and Availability.

Fortunately, management doesn’t look like a very big problem. Instead of managing each switch independently, we go for a something called “a stack”. In essence, it turns 8 switches into one logical unit. At full density, it lets us treat 4 racks as a single logical switch. With 80 nodes per rack, that’s 320 nodes. Quite a bit of compute power!

Four of these setups – about 1200 nodes.

Across two datacenters? 2400 nodes. Easily 10x our size.

Now that’s very impressive, but what if something goes wrong? What if one of these stacks fails? Well, if the whole thing goes down, we lose all 320 nodes. Sure, there’s built-in redundancy for the stack’s master, and losing a non-master switch is far less painful, but even then, 40 nodes going down because of one switch? That’s a lot.

So we give it some thought and come up with a simple solution. Instead of using one of these units in each rack, we’ll use two. Each node will have a connection to stack A, and another to stack B. If stack A fails, we’ll still be able to go through stack B, and vice versa. Perfect!

In order to pull that off, we have to make these two separate stacks, which are actually two separate networks, somehow connect. Our solution to that is to set up bonding on the server side, making its two separate network interfaces look like a single, logical one. On the stack side, we connect everything to one big, happy, shared backbone. With its own redundant setup, of course.

In case you’re still keeping track of the math, you might notice that we just doubled the number of stacks per datacenter. But we still gained simple management And high availability at 10x scale. All this without having to invest in expensive, proprietary management solutions. Or even having to scale the team.

And so, it is decided. We build our glorious, stack based topology. And the land has peace for 40 years. Or… months.

Fast forward 40 months.

We’re running quite a few racks across 3 datacenters. We’re serving customers like CNN, The Guardian, Time Inc and the Australian news.com. We reach over 500 million people worldwide, serving 250 billion recommendations a month.

We’re using Chef to automate our servers, with over 300 cookbooks and 1000 roles.

We’re practicing Continuous Delivery, with over 150 releases to production a day.

We’re managing petabytes of data in Hadoop, Elasticsearch, Mysql, Cassandra.

We’re generating over 6 million metrics every minute, have thousands of alerts and dozens of dashboards.

Infrastructure as Code is our religion. And as for our glorious network setup? it’s completely, fully, 100% … manual.

No, really. It’s the darkest, scariest part of our infrastructure.

I mean hey, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. But every change in the switches is risky because it’s done using the infamous “config management” called “copy-paste”.

The switching software stack and protocols are proprietary, especially the secret sauce that glues the stacks together. Which makes debugging issues a tiring back-and-forth with support at best, or more often just a blind hit-and-miss. The lead time to setting up a new stack is measured in weeks, with risk of creating network loops and bringing a whole datacenter down. Remember August 13th, 2015? We do.

Again, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. And it’s not like we babysit the solution on daily basis. But it’s definitely not Infrastructure as Code. And there’s no way it’s going to scale us to the next 10x.

Fast forward to June 2016.

We’re still running across 3 datacenters, thousands of nodes. CNN, The Guardian, Time Inc, the Australian news.com. 500 million users. 250 billion recommendations. You get it.

But something is different.

We’re just bringing up a new datacenter, replacing the oldest of the three. And in it, we’re rolling out a new network topology. It’s called a Clos Fabric, and it’s running BGP end-to-end. It’s based on a design created by Charles Clos for analog telephony switches, back in the 50’s. And on the somewhat more recent RFCs, authored by Facebook, that bring the concept to IP networks.

In this setup, each node is connected to 2 top-of-rack switches, called leaves. And each leaf is connected to a bunch of end-of-row switches, called spines. But there’s no bonding here, and no backbone. Instead, what glues this network together, is that fact that everything in it is a router. And I do mean everything – every switch, every server. They publish their IP addresses over all of their interfaces, essentially telling their neighbours, “Hi, I’m here, and you can reach me through these paths.” And since their neighbours are routers as well, they propagate that information.

Thus a map of all possible paths to all possible destinations is constructed, hop-by-hop, and held by each router in the network. Which, as I mentioned, is everyone. But it gets even better.

We’ve already mentioned that each node is connected to two leaf switches. And that each leaf is connected to a bunch of spines switches. It’s also worth mentioning that they’re not just “connected”. They’re wired the exact same way. Which means, that any path between two points in the network is the exact same distance. And what THAT means is that we can rely on something called ECMP. Which, in plain English, means “just send the packets down any available path, they’re all the same anyway”. And ECMP opens up interesting options for high availability and load distribution.

Let’s pause to consider some of the gains here:

First, this is a really simple setup. All the leaf switches are the same. And so are all of the spines. It doesn’t matter if you have one, two or thirty. And pretty much the same goes for cables. This greatly simplifies inventory, device and firmware management.

Second, it’s predictable. You know the exact amount of hops from any one node in the network to any other: It’s either two or four, no more, no less. Wiring is predictable as well. We know exactly what gets connected where, and what are the exact cable lengths, right from design phase. (spoiler alert:) We can even validate this in software.

Third, it’s dead easy to scale. When designing the fabric, you choose how many racks it’ll support, and at what oversubscription ratio. I’ll spare you the math and just say:

You want more bandwidth? Add more spines.

Support more racks? Go for spines with higher port density.

Finally, high availability is built into the solution. If a link goes down, BGP will make sure all routers are aware. And everything will still work the same way, because with our wiring scheme and ECMP, all paths are created equal. Take THAT evil bonding driver!

But it doesn’t end there. Scaling the pipes is only half the story. What about device management? The infamous copy-paste? Cable management? A single misconnected cable that could bring a whole datacenter down? What about those?

Glad you asked 🙂

After a long, thorough evaluation of multiple vendors, we chose Cumulus Networks as our switch Operating System vendor, and Dell as our switch hardware vendor. Much like you would with servers, by choosing Enterprise Redhat, Suse or Ubuntu. Or with mobile devices, by choosing Android. We chose a solution that decouples the switch OS from the hardware it’s running on. One that lets us select hardware from a list of certified vendors, like Dell, HP, Mellanox and others.

So now our switches run Cumulus Linux, allowing us use the very same tools that manage our fleet of servers, to now manage our fleet of switches. To apply the same open mindset in what was previously a closed, proprietary world.

In fact, when we designed the new datacenter, we wrote Chef cookbooks to automate provisioning and config. We wrote unit and integration tests using Chef’s toolchain and setup a CI pipeline for the code. We even simulated the entire datacenter, switches, servers and all, using Vagrant.

It worked so well, that bootstrapping the new datacenter took us just 5 days. Think about it:

the first time we ever saw a real Dell switch running Cumulus Linux was when we arrived on-site for the buildout. And yet, 99% of our code worked as expected. In 5 days, we were able to setup a LAN, VPN, server provisioning, DNS, LDAP and deal with some quirky BIOS configs. On the servers, mind you, not the switches.

We even hooked Cumulus’ built-in cabling validation to our Prometheus based monitoring system. So that right after we turned monitoring on, we got an alert. On one bad cable. Out of 3000.

Infrastructure as Code anyone?

 

Structure a Vue.js App from Containers and Components

Recently we’ve begun using Vue.js as a frontend framework for one of our infrastructure projects. We’ve contracted Dr. Yoram Kornatzky to join our Delivery team and dive together, headlong, into this brave new world.

In this blog post by Yoram, and others to come, we’ll share snippets from this journey. We hope you find these beneficial, and invite you to share your own experiences and feedback.


Vue.js using Vuex for state management does not have a clear distinction between containers and components. This is in clear contrast to React using Redux for state management.

We argue that such a distinction between containers components in beneficial for Vue.js as well.

Dyploma

Dyploma is a system for managing containerized applications and services on top of Kubernetes in Outbrain. Dyploma includes the concepts of:

  • artifacts
  • builds
  • deployments
  • services

Dyploma is made out of a Java Spring backend and a Python command-line tool (CLI). The command-line tool operates through API calls to the backend.

The Dyploma Web Application

To facilitate broader adoption of containers within Outbrain, we set up to develop a web application that will have the capabilities of the Dyploma CLI.

The web application will operate by fetching data from the backend and sending operations for execution in the backend. This will be done through the same REST API used by the CLI.

A Vue.js Web Application

We chose Vue.js for constructing the web application. The app was constructed using vue-cli with the webpack template.

The application has three generic screens:

  • list
  • detail
  • form

All concepts have screens from each of these types with similar structure and look and feel, but with different actions and different data.

Vuex

Vuex is the standard state management approach for Vue.js.

Containers vs Components in React

Let us first recap what are containers and components in React.

A container interacts with the Redux and contains a component. The container supplies data to the component through selectors on the store and provides the actions on the store to the component.

Components are given data and render HTML. They use the actions provided from their container to interact with the state. Such actions modify the state, resulting in the selectors fetching new data, and causing the component to be rendered again.

Vue.js with Vuex

Vue.js standard practice does not have the containers vs components distinction. While constructing the Dyploma web application we found it useful to make such a distinction for the benefits of better code structure and reusability.

Let us first describe how the structure of the Dyploma web application.

Generic Components

We constructed three generic components:

  1. list
  2. detail
  3. form

Which can be composed of a component tree that can have more than 3 levels.

Each of these generic screens was used with some variations by multiple types of data. But the look and feel could be configured through a common JSON describing for each type of data, the different fields.

Type Specific Actions and Getters

The getters and actions to be used for each type of data were different. We constructed our Vuex store with modules and needed to use a separate module for each type.

Distinguish Components and Containers

So we had to think how to resolve two opposite requirements. For the benefits of reusability, we need unified generic components. But for the type specific actions and data, we need to use separate modules. We decided up front that the whole app will be constructed as a set of single file components (SFC).

To resolve these two opposite directions, we found it useful to think of our app as consisting of two things:

  • containers – type-specific that interact with store
  • components – generic

Components

We defined each component to a data props for the data it should render, and a description of the structure of data. For any changes and actions required, it will emit an event.

Data is passed from a component to its constituents with v-bind, like v-bind:list=”deployments”.

Events are hooked up with v-on like v-on:search=”search”.

Components are composed of smaller components. Events are propagated up the tree of components. This bottom-up propagation may be disturbing to some, but it is the best approach for Vue.js. In this respect, Vue.js is definitely different from React.

The component is a single file component (SFC).

Such a component is not necessarily functional.

A Container for Each Type of Data

A container knows which module of the store it deals with, and knows its actions and getters. It fetches data from the store using getters. Such data is passed to the components as props.

It listens to events coming from the components using v-on like v-on:search=”search”. In response to such events, it dispatches actions.

The container does not render anything itself, this is done by the component it contains.

The container is a single file component (SFC)s.

A Clean Separation Facilitates Reusability

This clean separation of components and containers make it simpler to see opportunities for reusability. Come to think of it, in most web apps, the real effort in reusability is reusability of the component. The mixing of components and containers causes many components to be coupled with the store. This makes it harder to identify reusability. By distinguishing components and containers, we isolate the components from the store and see more clearly opportunities for reusability.

Easier Testing

Writing unit tests becomes easier with this separation. One can write three classes of tests:

  1. components
  2. containers
  3. store

Each becoming simpler.

We will discuss this further in a separate article.

Conclusions

Split your Vue.js web app into containers and components.

Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

@ Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will be sharing with you our logging implementation which is the first tool used to understand the why.

But first thing first, a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare metal nodes, Elasticsearch for storage and Kibana for visualizing and analytics.  Apache Kafka is transport layer for all of the above.

A very simplified sketch of the system:Live Tail in Kubernetes

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to us “kubectl”, because managing keys is a pain especially in a multi cluster environment.
  • Provide a way to tail logs and even edit log file. This should be available on a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side
  • Support all existing logging sources like multiline and Json.
  • Don’t forget services which don’t run in Kubernetes, yes we still need to support those.

 

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup – Fluentd daemonset running on each Kubelet node, and all services are configured to send logs to stdout / err  instead of a file.

The Fluentd agent is collecting the pod’s logs and adding the Kubernetes level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.

A pool of Logstash agents (Running as pods in Kubernetes) are consuming and parsing messages from Kafka as needed.

Once parsed messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:

A sketch of the setup described:

And now it is time to introduce CTail.

Ctail, stands for Containers Tail, it is an Outbrain homegrown tool written in Go, and based on a server and client side components.

A CTail server-side component runs per datacenter or per Kubernetes cluster, consuming messages from a Kafka topic named “CTail” and based on the Kubernetes app label creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages in the same partition, we had to make sure messages are partitioned by the pod_id.

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the Ctail client starts, it will query Consul, which is what we use for service discovery, to locate all of the CTail servers; register to their streams and will perform aggregations in memory – resulting in a live stream of log entries.

Here is a sketch demonstrating the environment and an example of the CTail client output:

CTail client output

 

To view logs from all pods of a service called “ob1ktemplate” all you need is to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri 
http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200

 

This is how we solve this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (for ex. Sqoop or Mysql to Hive data loads), processing in Hadoop (for ex. running Hive, Scalding or Pig jobs), and pushing the results into external data stores (for ex. Vertica, Cassandra, Mysql etc.). An additional dimension of complexity originates from the dynamic nature of the system since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old cluster. This is a sort of an active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interfering with any working pipeline in the old cluster. Sounds good, right.

 

First Steps

To better understand the chosen solution let’s take a look at our current architecture:

First Steps

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.

 

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:

data delivery pipeline

 

Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lower operational costs it would potentially double the load on each machine since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned a business group and all jobs that belong to this group are executed from it. To isolate the current production jobs execution from the ones for the new cluster we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, and we now have raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs and they doing varied types of operations we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySql, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, it may cause in some cases corruption or inconsistencies of the data due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we’ve described two types of execution modes a WorkflowEngine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. write to external database or trigger an applicative service). This will be done automatically by the framework thus preventing any effort from the development teams.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up  and syncing (to a degree) with the other cluster.

Let’s Do This…

Phase 1 of the migration should look something like this:

Let's Do This...

 

Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (hive, pig, scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

 

Since the new cluster is running on new hardware and with a new version of Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact the we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

 

Once we were confident that all phase 1 jobs were operating properly on the new cluster we could continue to phase 2 in which a migration leader becomes secondary and the secondary becomes a leader. Like this:

new cluster

 

In this phase all jobs will begin running from the new Workflow Engine impacting all production systems, while the old Workflow Engine will only run jobs that create data to the old cluster. This method actually offers a fairly easy way to rollback to the old cluster in case of any serious failure (even after a few days or weeks) since all intermediate data will continue to be available on the old cluster.

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seems to be working properly we can start pushing the groups to phase 2 one by one into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer version, or improved performance) immediately after they have moved to phase 2 and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat as you need to bring into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental Migration – Due to the fact that we had an active – active migration that we could apply on each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process- since we kept all old workflowEngines (that executed their jobs on the old Hadoop cluster) in a state of secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – Since we defined an automated transition of jobs between secondary and leader modes users, didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project you can check out the video from Strata 2017 London where I discussed it in more detail.

Failure Testing for your private cloud – Introducing GomJabbar

TL;DR Chaos Drills can contribute a lot to your services resilience, and it’s actually quite a fun activity. We’ve built a tool called GomJabbar to help you run those drills.


Failure Testing for your private cloud

Here at Outbrain we manage quite a large scale deployment of hundreds of services/modules, and thousands of hosts. We practice CI/CD, and implemented quite a sound infrastructure, which we believe is scalable, performant, and resilient. We do however experience many production issues on a daily basis, just like any other large scale organization. You simply can’t ensure a 100% fault-free system. Servers will crash, run out of disk space, and lose connectivity to the network. Software will experience bugs, and erroneous conditions. Our job as software engineers is to anticipate these conditions, and design our code to handle them gracefully.

For quite a long time we were looking into ways of improving our resilience, and validate our assumptions, using a tool like Netflix’s Chaos Monkey. We also wanted to make sure our alerting system actually triggers when things go wrong. The main problem we were facing is that Chaos Monkey is a tool that was designed to work with cloud infrastructure, while we maintain our own private cloud.

The main motivation for developing such a tool is that failures have the tendency of occurring when you’re least prepared, and in the least desirable time, e.g. Friday nights, when you’re out having a pint with your buddies. Now, to be honest with ourselves, when things fail during inconvenient times, we don’t always roll our sleeves and dive in to look for the root cause. Many times the incident will end after a service restart, and once the alerts clear we forget about it.

Wouldn’t it be great if we could have “chaos drills”, where we could practice handling failures, test and validate our assumptions, and learn how to improve our infrastructure?

Chaos Drills at Outbrain

We built GomJabbar exactly for the reasons specified above. Once a week, at a well known time, mid day, we randomly select a few targets where we trigger failures. At this point, the system should either auto-detect the failures, and auto-heal, or bypass them. In some cases alerts should be triggered to let teams know that a manual intervention is required.

After each chaos drill we conduct a quick take-in session for each of the triggered failures, and ask ourselves the following questions:

  1. Did the system handle the failure case correctly?
  2. Was our alerting strategy effective?
  3. Did the team have the knowledge to handle, and troubleshoot the failure?
  4. Was the issue investigated thoroughly?

These take-ins lead to super valuable inputs, which we probably wouldn’t collect any other way.

How did we kick this off?

Before we started running the chaos drills, there were a lot of concerns about the value of such drills, and the time it will require. Well, since eliminating our fear from production is one of the key goals of this activity, we had to take care of that first.

"I must not fear.
 Fear is the mind-killer.
 Fear is the little-death that brings total obliteration.
 I will face my fear.
 I will permit it to pass over me and through me.
 And when it has gone past I will turn the inner eye to see its path.
 Where the fear has gone there will be nothing. Only I will remain."

(Litany Against Fear - Frank Herbert - Dune)

So we started a series of chats with the teams, in order to understand what was bothering them, and found ways to mitigate it. So here goes:

  • There’s an obvious need to avoid unnecessary damage.
    • We’ve created filters to ensure only approved targets get to participate in the drills.
      This has a side effect of pre-marking areas in the code we need to take care of.
    • We currently schedule drills via statuspage.io, so teams know when to be ready, and if the time is inappropriate,
      we reschedule.
    • When we introduce a new kind of fault, we let everybody know, and explain what should they prepare for in advance.
    • We started out from minor faults like graceful shutdowns, continued to graceless shutdowns,
      and moved on to more interesting testing like faulty network emulation.
  • We’ve measured the time teams spent on these drills, and it turned out to be negligible.
    Most of the time was spent on preparations. For example, ensuring we have proper alerting,
    and correct resilience features in the clients.
    This is actually something you need to do anyway. At the end of the day, we’ve heard no complaints about interruptions, nor time waste.
  • We’ve made sure teams, and engineers on the call were not left on their own. We wanted everybody to learn
    from this drill, and when they weren’t sure how to proceed, we jumped in to help. It’s important
    to make everyone feel safe about this drill, and remind everybody that we only want to learn and improve.

All that said, it’s important to remember that we basically simulate failures that occur on a daily basis. It’s only that when we do that in a controlled manner, it’s easier to observe where are our blind spots, what knowledge are we lacking, and what we need to improve.

Our roadmap – What next?

  • Up until now, this drill was executed in a semi-automatic procedure. The next level is to let the teams run this drill on a fixed interval, at a well known time.
  • Add new kinds of failures, like disk space issues, power failures, etc.
  • So far, we were only brave enough to run this on applicative nodes, and there’s no reason to stop there. Data-stores, load-balancers, network switches, and the like are also on our radar in the near future.
  • Multi-target failure injection. For example, inject a failure to a percentage of the instances of some module in a random cluster. Yes, even a full cluster outage should be tested at some point, in case you were asking yourself.

The GomJabbar Internals

GomJabbar is basically an integration between a discovery system, a (fault) command execution scheduler, and your desired configuration. The configuration contains mostly the target filtering rules, and fault commands.

The fault commands are completely up to you. Out of the box we provide the following example commands, (but you can really write your own script to do what suits your platform, needs, and architecture):

  • Graceful shutdowns of service instances.
  • Graceless shutdowns of service instances.
  • Faulty Network Emulation (high latency, and packet-loss).

Upon startup, GomJabbar drills down via the discovery system, fetches the clusters, modules, and their instances, and passes each via the filters provided in the configuration files. This process is also performed periodically. We currently support discovery via consul, but adding other methods of discovery is quite trivial.

When a users wishes to trigger faults, GomJabbar selects a random target, and returns it to the user, along with a token that identifies this target. The user can then trigger one of the configured fault commands, or scripts, on the random target. At this point GomJabbar uses the configured CommandExecutor in order to execute the remote commands on the target hosts.

GomJabbar also maintains a audit log of all executions, which allows you to revert quickly in the face of a real production issue, or an unexpected catastrophe cause by this tool.

What have we learned so far?

If you’ve read so far, you may be asking yourself what’s in it for me? What kind of lessons can I learn from these drills?

We’ve actually found and fixed many issues by running these drills, and here’s what we can share:

  1. We had broken monitoring and alerting around the detection of the integrity of our production environment. We wanted to make sure that everything that runs in our data-centers is managed, and at a well known (version, health, etc). We’ve found that we didn’t compute the difference between the desired state, and the actual state properly, due to reliance on bogus data-sources. This sort of bug attacked us from two sides: once when we triggered graceful shutdowns, and once for graceless shutdowns.
  2. We’ve found services that had no owner, became obsolete and were basically running unattended in production. The horror.
  3. During the faulty network emulations, we’ve found that we had clients that didn’t implement proper resilience features, and caused cascading failures in the consumers several layers up our service stack. We’ve also noticed that in some cases, the high latency also cascaded. This was fixed by adding proper timeouts, double-dispatch, and circuit-breakers.
  4. We’ve also found that these drills motivated developers to improve their knowledge about the metrics we expose, logs, and the troubleshooting tools we provide.

Conclusion

We’ve found the chaos drills to be an incredibly useful technique, which helps us improve our resilience and integrity, while helping everybody learn about how things work. We’re by no means anywhere near perfection. We’re actually pretty sure we’ll find many many more issues we need to take care of. We’re hoping this exciting new tool will help us move to the next level, and we hope you find it useful too 😉

I WANT IT ALL – Go Hybrid

When I was a kid, my parents used to tell me that I can’t have my cake and eat too.  Actually, that’s a lie, they never said that. Still, it is something I hear parents say quite often. And not just parents. I meet the same phrase everywhere I go. People constantly taking a firm, almost religious stance about choosing one thing over another: Mac vs PC, Android vs iOS, Chocolate vs Vanilla (obviously Chocolate!).

So I’d like to take a moment to take a different, more inclusive approach.

Forget Mac vs PC. Forget Chocolate vs Vanilla.

I don’t want to choose. I Want it all!

At Outbrain, the core of our compute infrastructure is based on bare metal servers. With a fleet of over 6000 physical nodes, spread across 3 data centers, we’ve learned over the years how to manage an efficient, tailored environment that caters to our unique needs. One of which being the processing and serving of over 250 Billion personalized recommendations a month, to over 550 Million unique users.

Still, we cannot deny that the Cloud brings forth advantages that are hard to achieve in bare metal environments. And in the spirit of inclusiveness (and maximizing value), we want to leverage these advantages to complement and extend what we’ve already built. Whether focusing on workloads that require a high level of elasticity, such as ad-hoc research projects involving large amount of data, or simply external services that can increase our productivity. We’ve come to view Cloud Solutions as supplemental to our tailored infrastructure rather than a replacement.

Over recent months, we’ve been experimenting with 3 different vectors involving the Cloud:

Elasticity

Our world revolves around publications, especially news. As such, whenever a major news event occurs, we feel immediate, potentially high impact. Users rush to publisher sites, where we are installed. They want their news, they want their recommendations, and they want them all now.

For example, when Carrie Fisher, AKA Princess Leia, passed away last December, we saw a 30% traffic increase on top of our usual peak traffic. That’s quite a spike.

Since usually we do not know when the breaking news event will be, it means that we are required to keep enough extra capacity to support such surges.

By leveraging the cloud, we can keep that additional extra capacity to bare minimum, relying instead on the inherent elasticity of the cloud, provisioning only what we need when we need it.

Doing this can improve the efficiency of our environment and cost model.

Ad-hoc Projects

A couple of months back one of researchers came up with an interesting behavioral hypothesis. For the discussion at hand, lets say that it was “people who like chocolate are more likely to raise pet gerbils.” (drop a comment with the word “gerbils” to let me know that you’ve read thus far). That sounded interesting, but raised a challenge. To validate or disprove this, we needed to analyze over 600 Terabytes of data.

We could have run it on our internal Hadoop environment, but that came with a not-so-trivial price tag. Not only did we have to provision additional capacity in our Hadoop cluster to support the workload, we anticipated the analysis to also carry impact on existing workloads running in the cluster. And all this before getting into operational aspects such as labor and lead time.

Instead, we chose to upload the data into Google’s BigQuery. This gave us both shorter lead times for the setup and very nice performance. In addition, 3 months into the project, when the analysis was completed, we simply shut down the environment and were done with it. As simple as that!

Productivity

We use Fastly for dynamic content acceleration. Given the scale we mentioned, this has the side-effect of generating about 15 Terabytes of Fastly access logs each month. For us, there’s a lot of interesting information in those logs. And so, we had 3 alternatives when deciding how to analyse them:

  •      SaaS based log analysis vendors
  •      An internal solution, based on the ELK stack
  •      A cloud based solution, based on BigQuery and DataStudio

After performing a PoC and running the numbers, we found that the BigQuery option – if done right – was the most effective for us. Both in terms of cost, and amount of required effort.

There are challenges when designing and running a hybrid environment. For example, you have to make sure you have consolidated tools to manage both on-prem and Cloud resources. The predictability of your monthly cost isn’t as trivial as before (no one likes surprises there!), controls around data can demand substantial investments… but that doesn’t make the fallback to “all Vanilla” or “all Chocolate” a good one. It just means that you need to be mindful and prepared to invest in tooling, education and processes.

Summary

I’d like to revisit my parents’ advice, and try to improve on it a bit (which I’m sure they won’t mind!):

Be curious. Check out what is out there. If you like what you see – try it out. At worst, you’ll learn something new. At best, you’ll have your cake… and eat it too.