Category: System Architecture


Making Spark Native for Data Processing Framework

Working in an infrastructure team is different from working in a development team. The team develops tools that are used by development teams across Outbrain, so it is very important to provide a convenient way of working with new technologies. Having a Spark cluster enables running distributed data processing at scale. Until now, developers worked with Spark by writing ad-hoc crontab jobs in production. This created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to provide an infrastructure-level way of working with Spark.

In this blog post, I explain how we built a solution that makes Spark a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine – an internal tool, which is developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as the Algorithm Research, BI and Data Science groups.


Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failure. Logs for each running flow are shown in a convenient form in the WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.

From the operational perspective, several instances of WFE are deployed on machines owned by the infrastructure team; users just define ETL flows in a separate repository.

Diving Into the Solution

In order to meet the requirements for running data analytics on Spark, our team had to provide an infrastructure-level way of running Spark jobs orchestrated by WFE. To achieve this, we developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design of SparkStep, we had several principles in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify, and at the same time allow the system to limit, the number of YARN resources allocated for every Spark job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping the control while having fun

The Spark job definition specifies how a job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a Spark job, it builds a spark-submit command using all these properties.
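For illustration, here is a minimal Python sketch of how such a command might be assembled from a job definition; the property names (class_name, jar_url and so on) are hypothetical and do not reflect WFE’s actual schema.

# Hypothetical sketch: assembling a spark-submit command from a job definition.
# Property names are illustrative, not the actual WFE schema.
def build_spark_submit(job):
    cmd = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",        # run the driver inside YARN, not on the WFE machine
        "--class", job["class_name"],
        "--queue", job.get("yarn_queue", "default"),
        "--num-executors", str(job.get("num_executors", 4)),
        "--executor-memory", job.get("executor_memory", "2g"),
    ]
    for key, value in job.get("conf", {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(job["jar_url"])             # application jar, e.g. an HDFS or HTTP URL
    cmd += job.get("args", [])
    return cmd

# Example:
# build_spark_submit({"class_name": "com.example.MyJob", "jar_url": "hdfs:///jobs/my-job.jar"})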

The spark-submit command submits the Spark job to YARN, which allocates needed resources and launches the application. After submitting the job, WFE constantly checks its status using YARN REST API. When the job is completed in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long, and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API. In this case, the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use and that resources are saved when a job is canceled for any reason.
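The status-polling loop can be sketched roughly as below. The endpoints are the standard YARN ResourceManager REST API; the surrounding code is an illustration, not WFE’s actual implementation.

# Sketch of status polling and killing via the YARN ResourceManager REST API.
# The ResourceManager address and the polling policy are illustrative.
import time
import requests

RM_URL = "http://yarn-rm.example.com:8088"

def wait_for_app(app_id, max_runtime_sec, poll_interval_sec=30):
    deadline = time.time() + max_runtime_sec
    while True:
        app = requests.get(f"{RM_URL}/ws/v1/cluster/apps/{app_id}").json()["app"]
        if app["state"] in ("FINISHED", "FAILED", "KILLED"):
            return "COMPLETED" if app.get("finalStatus") == "SUCCEEDED" else "FAILED"
        if time.time() > deadline:
            # The job exceeded its allowed running time - stop it through the same API.
            requests.put(f"{RM_URL}/ws/v1/cluster/apps/{app_id}/state",
                         json={"state": "KILLED"})
            return "FAILED"
        time.sleep(poll_interval_sec)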

The spark-submit command runs in a Docker container, which exits once the job is submitted to the cluster. This lets us create an isolated environment for every Spark job executed by WFE. In addition, by using Docker containers we can easily run different Spark versions simultaneously, simply by creating different Docker images.

By using this architecture, we achieved all our goals. Spark jobs are submitted to YARN in “cluster” mode, so they do not create load on WFE machines. YARN handles all resources for each job, and every Spark job runs in an isolated environment thanks to its Docker container.

Taking the pain out of Data Science – part 2

This is post #2 in a series of 3 posts covering our machine learning framework. We recommend reading post #1 first to understand the challenges we face and to get an overview of our solution.
This part will focus on how we handle our data and make it more accessible, using an ongoing data collection process.

Data Collection

The first part of any data science task is getting a good dataset to work with. We have a lot of data, but preparing the datasets can be very hard work – you really have to “get your hands dirty” to pull the data from all the sources and tables and convert it into an easy-to-use dataset.


Challenges:

What are the main challenges in creating a good dataset to use?

  • Many output tables – tables that store requests for recommendations, served recommendations, user clicks, profiles for the users and documents and more.
  • Number of data stores – these tables live in a number of data stores, due to their different nature. Some are Hive tables, some data is stored in MySQL, and some in Cassandra.
  • Long queries – some of these tables are very big. Querying them, especially for a long date range, can take a while.
  • Irrelevant data – we rarely want data from our entire traffic. Usually we only want some portion of it which is relevant for the current modeling task.

Silos and partitioning:

In addition to addressing these challenges, a good data collection process brings other advantages.

We want to have the ability to train models on different silos – different population groups, that may behave differently and require different models.

To achieve this easily, we add a number of columns and partitions to our output aggregation tables – such as platform, country, language and more.

This allows us to quickly try out our models on different groups.
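In PySpark terms the idea looks roughly like this; the table and column names are illustrative, not our actual schema.

# Sketch: writing an output aggregation table partitioned by silo columns.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
aggregated = spark.table("recs.hourly_raw")     # hypothetical input table

(aggregated
    .write
    .mode("overwrite")
    .partitionBy("platform", "country", "language")   # lets us filter silos cheaply later
    .saveAsTable("recs.hourly_aggregation"))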

Output:

We decided to split our output into 2 main parts:

First, a dataset for building models: it will contain only the served recommendations we want (from specific variants of the traffic), and it should contain all of the clicked recommendations plus a sample of the non-clicks, in order to have a balanced dataset for learning.

Second, a dataset that will be used for simulation of the business metrics.

Our recommendations are served in widgets, showing a small batch of recommendations.
For this use case, we take only recommendations from widgets that received at least one click.

With this dataset, we can apply our model only to these clicked widgets, and see how well we graded the clicked recommendation compared to the other non-clicked recommendations.


The solution – Automatic Spark job

Our solution to all these challenges was to create an automatic data collection job.

The job runs hourly, triggered by our ETL engine.
An hourly Apache Spark job aggregates an hourly dataset with the relevant data, creates the needed partitions, and produces the two outputs described above.

Using Spark was very convenient for this use case. It allowed us to create a readable flow that queries different input sources and holds in memory the data that is common to both tables, before writing the final output to Hive.
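A highly simplified sketch of such a flow is shown below. The table names, columns and sampling rate are all hypothetical; the point is a single cached join feeding two Hive outputs.

# Simplified sketch of the hourly collection flow (not the actual job).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

requests_df = spark.table("raw.recommendation_requests").where(F.col("dt") == "2017-06-20-13")
clicks_df = (spark.table("raw.clicks")
             .where(F.col("dt") == "2017-06-20-13")
             .select("rec_id", F.lit(1).alias("clicked")))

# Join once and cache: this data is common to both outputs.
joined = requests_df.join(clicks_df, "rec_id", "left").fillna({"clicked": 0}).cache()

# Output 1: balanced modeling dataset - all clicks plus a sample of the non-clicks.
modeling = joined.where("clicked = 1").unionByName(
    joined.where("clicked = 0").sample(False, 0.05, seed=42))
modeling.write.mode("overwrite").partitionBy("platform", "country").saveAsTable("ds.modeling_dataset")

# Output 2: simulation dataset - only widgets that received at least one click.
clicked_widgets = joined.where("clicked = 1").select("widget_id").distinct()
simulation = joined.join(clicked_widgets, "widget_id")
simulation.write.mode("overwrite").partitionBy("platform", "country").saveAsTable("ds.simulation_dataset")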

 

A quick note on how we monitor our Spark jobs:

It is somewhat of a challenge to understand how a Spark job behaves, other than reading the basic error messages and checking the job’s output.

To make the job’s progress and status more visible, we send metrics from the job’s driver, using HTTP, to our monitoring server, which aggregates them.

This allows us to create simple-to-use dashboards and easily understand what is going on.
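Conceptually, the driver-side reporting is as simple as the sketch below; the endpoint and payload format are illustrative and not our actual monitoring API.

# Sketch: reporting driver-side metrics over HTTP.
import requests

MONITORING_URL = "http://monitoring.example.com/api/metrics"   # hypothetical endpoint

def report_metric(job_name, name, value, tags=None):
    # Fire-and-forget: metric reporting should never fail the job itself.
    try:
        requests.post(MONITORING_URL,
                      json={"job": job_name, "metric": name, "value": value, "tags": tags or {}},
                      timeout=5)
    except requests.RequestException:
        pass

# e.g. report_metric("hourly-data-collection", "rows_written", 1234567, {"dc": "ny"})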

In addition, we send textual logs to our monitoring server, which indexes them into an Elasticsearch cluster. We can then view the logs easily using Kibana.

Below is a dashboard example for a demo job, collecting a portion of our data.

dashboard example for a demo job

Stay tuned for our 3rd and last part of this blog post series where we will cover our Spark-based machine learning framework, that allows us to be highly agile with our research tasks, and dynamic as well as robust in pushing our models to production.

Taking the pain out of Data Science – RecSys machine learning framework over Spark, Part 1

Overview

The role of a Data Scientist, or Machine Learning engineer, is becoming more and more valuable in the tech industry. It is the fastest-growing job in the U.S. according to a LinkedIn study, and was recently ranked the best job in America by Glassdoor.

However, the life of a Data Scientist isn’t easy – the job requires good math and statistics knowledge, a programming background and experience, and “hacking” skills in order to get things done. This is especially true when handling huge amounts of data of different types.

We, the personalization team at Outbrain, decided to try to take the pain out of data science and make our lives easier, allowing us to perform effective research with immediate production impact.

In this post series I will describe an end-to-end machine learning framework we developed, over Apache Spark, in order to address the different challenges our Data Scientists and Algorithm Engineers face.

Outbrain’s recommendation system machine learning challenge:

Our goal is to recommend the stories a user is most likely to be interested in, given the user’s interests and the current context.
We need to rank the stories in our inventory by the probability that the user will click to read them, and display the top stories to the user.
Our supervision comes from past user actions – given a user and a document with a set of features, did the user click or not.

Our data and features:

Outbrain generates a lot of data.
We get over 550 million unique monthly users, generate over 275 billion recommendations monthly and have more than 35 million clicks a day.

The first part of computing quality recommendations is representing our key players well: the users and the documents.

We extract semantic features from each document that has our widget installed, using an NLP engine. The engine extracts semantic features at several levels of granularity, from high-level categories to very granular entities.
For example, from the ‘Westworld’ story below, we extract:

  • High-level categories, such as entertainment.
  • Lower level topics – such as TV or murder.
  • Entities – persons, locations or companies, that the document discusses.

We represent our readers with similar semantic features.

We create an anonymous profile for each reader, based on content they were reading.

Each profile is an aggregation of the semantic features of the documents the user has read before.
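As a toy illustration of the idea (not our actual profile format), a profile can be thought of as a weighted aggregation of per-document feature maps:

# Toy sketch: a user profile as an aggregation of document semantic features.
from collections import Counter

def build_profile(read_documents):
    # Each document is a dict of semantic feature -> weight.
    profile = Counter()
    for doc_features in read_documents:
        profile.update(doc_features)   # Counter.update sums the weights per feature
    return profile

# build_profile([{"entertainment": 0.9, "tv": 0.7}, {"entertainment": 0.4, "murder": 0.6}])
# -> Counter({'entertainment': 1.3, 'tv': 0.7, 'murder': 0.6})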

Predictive models:

We use a variety of models, from three main types:

Content based models –

These models assume that there is a semantic connection between the documents a user likes to read.
The model uses the user profile to find more documents with semantic features similar to the ones the user has shown interest in.
These could be stories within the same categories, from the same sites, or covering a specific person or location.

Behavioural models –

Rather than assuming the user will want to keep reading documents on similar topics, these models look for connections between a user’s interests and other potential subjects that go well together.
For example, a model may find that users who previously showed interest in retirement investing are likely to be interested later in articles about heart disease.

Collaborative models –

The third type, and potentially most powerful and interesting, are collaborative models.

These models use the wisdom of the crowd in order to recommend new content, potentially without the need to semantically understand it.
The basic idea is to find readers with similar reading patterns to the current user, find out what else they like, and recommend those items.

Algorithms in this family use algebraic dimensionality reduction methods, such as Matrix Factorization or Factorization Machines, or find a new, latent representation of the users and items using deep neural networks.
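For illustration only, Spark MLlib ships a Matrix Factorization implementation (ALS) that captures the basic idea; the snippet below is a minimal sketch on made-up data, not our production model.

# Minimal collaborative-filtering sketch with Spark MLlib ALS (illustrative data).
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

spark = SparkSession.builder.getOrCreate()

# Hypothetical implicit-feedback interactions: (user_id, doc_id, clicks)
interactions = spark.createDataFrame(
    [(1, 10, 3.0), (1, 11, 1.0), (2, 10, 1.0), (2, 12, 5.0)],
    ["user_id", "doc_id", "clicks"])

als = ALS(userCol="user_id", itemCol="doc_id", ratingCol="clicks",
          rank=16, implicitPrefs=True, coldStartStrategy="drop")
model = als.fit(interactions)
recommendations = model.recommendForAllUsers(10)   # top-10 documents per user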

 

The process of data modeling consists of many common tasks.
To make this process efficient and enable agile research and productization, we have developed a general framework on top of Spark, containing 5 independent components.

Framework overview:

The system’s key components:

  1. Data collection
  2. Feature engineering
  3. Model training
  4. Offline evaluation and simulation
  5. Model deployment

The same system is used both for research and analysis of new algorithms, and for running production models and updating them on a regular basis.

Next week, in part 2 of this blog post series, we will dive into the data collection flow which is a key ingredient to machine learning flows, and see how data is made more accessible using an automated Spark process.
Part 3 will cover our modeling framework, developed on top of Spark, that allows us to be highly agile with our research tasks, and dynamic as well as robust in pushing our models to production. Stay tuned!

Switches, Penguins and One Bad Cable

Back in May 2017, I was scheduled to speak at the DoTC conference in Melbourne. I was really excited and looking forward to it, but fate had different plans. And lots of them. From my son going through an emergency appendicitis operation, through flight delays, and up to an emergency landing back in Tel Aviv… I ended up missing the opportunity to speak at the conference. Amazingly, something similar happened this year! Maybe 3rd time’s a charm?

The post below is the talk I’d planned to give, converted to a blog format.


 

August 13, 2015. Outbrain’s on-call ops engineer is just getting out of his car when his phone rings. It’s a PagerDuty alert. Some kind of latency issue in the Chicago data center. He acks it, figuring he’ll unload the groceries first and then get round to it. But then, his phone rings again. And again.

Forget the groceries. Forget the barbecue. Production is on fire.

18 hours and many tired engineers later, we’re recovering from having lost our Chicago datacenter. In the take-in that follows, we trace the root cause to a single network cable that was mistakenly connected to the wrong switch.

Hi, my name is Alex, and I lead the Core Services group at Outbrain. Our group owns everything from the floor that hosts Outbrain’s servers, to the delivery pipelines that ship Outbrain’s code. If you’re here, you’ve likely heard of Outbrain. You probably know that we’re the world’s leading Discovery platform, and that you’ll find us installed on publisher sites like CNN, The Guardian, Time Inc and the Australian news.com, where we serve their readers with premium recommendations.

But it wasn’t always this way.

You see, back when we started, life was simple: all you had to do was throw a bunch of Linux servers in a rack, plug them into a switch, write some code… and sell it. And that we did!

But then, an amazing thing happened. The code that we wrote actually worked and customers started showing up. And they did the most spectacular and terrifying thing ever – they made us grow. One server rack turned into two and then three and four. And before we knew it, we had a whole bunch of racks, full of penguins plugged into switches. It wasn’t as simple as before, but it was manageable. Business was growing, and so were we.

Fast forward a few years.

We’re running quite a few racks across 2 datacenters. We’re not huge, but we’re not a tiny startup anymore. We have actual paying customers, and we have a service to keep up and running. Internally, we’re talking about things like scale, automation, and all that stuff. And we understand that the network is going to need some work. By now, we’ve reached the conclusion that managing a lot of switches is time-consuming, error-prone, and frankly, not all that interesting. We want to focus on other things, so we break the network challenge down into 2 main topics:

Management and Availability.

Fortunately, management doesn’t look like a very big problem. Instead of managing each switch independently, we go for something called “a stack”. In essence, it turns 8 switches into one logical unit. At full density, that lets us treat 4 racks as a single logical switch. With 80 nodes per rack, that’s 320 nodes. Quite a bit of compute power!

Four of these setups – about 1200 nodes.

Across two datacenters? 2400 nodes. Easily 10x our size.

Now that’s very impressive, but what if something goes wrong? What if one of these stacks fails? Well, if the whole thing goes down, we lose all 320 nodes. Sure, there’s built-in redundancy for the stack’s master, and losing a non-master switch is far less painful, but even then, 40 nodes going down because of one switch? That’s a lot.

So we give it some thought and come up with a simple solution. Instead of using one of these units in each rack, we’ll use two. Each node will have a connection to stack A, and another to stack B. If stack A fails, we’ll still be able to go through stack B, and vice versa. Perfect!

In order to pull that off, we have to make these two separate stacks, which are actually two separate networks, somehow connect. Our solution to that is to set up bonding on the server side, making its two separate network interfaces look like a single, logical one. On the stack side, we connect everything to one big, happy, shared backbone. With its own redundant setup, of course.

In case you’re still keeping track of the math, you might notice that we just doubled the number of stacks per datacenter. But we still gained simple management and high availability at 10x scale. All this without having to invest in expensive, proprietary management solutions. Or even having to scale the team.

And so, it is decided. We build our glorious, stack-based topology. And the land has peace for 40 years. Or… months.

Fast forward 40 months.

We’re running quite a few racks across 3 datacenters. We’re serving customers like CNN, The Guardian, Time Inc and the Australian news.com. We reach over 500 million people worldwide, serving 250 billion recommendations a month.

We’re using Chef to automate our servers, with over 300 cookbooks and 1000 roles.

We’re practicing Continuous Delivery, with over 150 releases to production a day.

We’re managing petabytes of data in Hadoop, Elasticsearch, Mysql, Cassandra.

We’re generating over 6 million metrics every minute, have thousands of alerts and dozens of dashboards.

Infrastructure as Code is our religion. And as for our glorious network setup? it’s completely, fully, 100% … manual.

No, really. It’s the darkest, scariest part of our infrastructure.

I mean hey, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. But every change in the switches is risky because it’s done using the infamous “config management” called “copy-paste”.

The switching software stack and protocols are proprietary, especially the secret sauce that glues the stacks together. This makes debugging issues a tiring back-and-forth with support at best, or more often just blind hit-and-miss. The lead time for setting up a new stack is measured in weeks, with the risk of creating network loops and bringing a whole datacenter down. Remember August 13th, 2015? We do.

Again, don’t get me wrong, it’s working, it’s allowed us to scale to many thousands of nodes. And it’s not like we babysit the solution on a daily basis. But it’s definitely not Infrastructure as Code. And there’s no way it’s going to scale us to the next 10x.

Fast forward to June 2016.

We’re still running across 3 data centers, thousands of nodes. CNN, The Guardian, Time Inc, the Australian news.com. 500 million users. 250 billion recommendations. You get it.

But something is different.

We’re just bringing up a new datacenter, replacing the oldest of the three. And in it, we’re rolling out a new network topology. It’s called a Clos Fabric, and it’s running BGP end-to-end. It’s based on a design created by Charles Clos for analog telephony switches, back in the 50’s. And on the somewhat more recent RFCs, authored by Facebook, that bring the concept to IP networks.

In this setup, each node is connected to 2 top-of-rack switches, called leaves. And each leaf is connected to a bunch of end-of-row switches, called spines. But there’s no bonding here and no backbone. Instead, what glues this network together is the fact that everything in it is a router. And I do mean everything – every switch, every server. They publish their IP addresses over all of their interfaces, essentially telling their neighbors, “Hi, I’m here, and you can reach me through these paths.” And since their neighbors are routers as well, they propagate that information.

Thus a map of all possible paths to all possible destinations is constructed, hop-by-hop, and held by each router in the network. Which, as I mentioned, is everyone. But it gets even better.

We’ve already mentioned that each node is connected to two leaf switches. And that each leaf is connected to a bunch of spine switches. It’s also worth mentioning that they’re not just “connected”. They’re wired the exact same way. Which means that any path between two points in the network is the exact same distance. And what THAT means is that we can rely on something called ECMP. Which, in plain English, means “just send the packets down any available path, they’re all the same anyway”. And ECMP opens up interesting options for high availability and load distribution.

Let’s pause to consider some of the gains here:

First, this is a really simple setup. All the leaf switches are the same. And so are all of the spines. It doesn’t matter if you have one, two or thirty. And pretty much the same goes for cables. This greatly simplifies inventory, device and firmware management.

Second, it’s predictable. You know the exact amount of hops from any one node in the network to any other: It’s either two or four, no more, no less. Wiring is predictable as well. We know exactly what gets connected where, and what are the exact cable lengths, right from design phase. (spoiler alert:) We can even validate this in software.

Third, it’s dead easy to scale. When designing the fabric, you choose how many racks it’ll support, and at what oversubscription ratio. I’ll spare you the math and just say:

You want more bandwidth? Add more spines.

Support more racks? Go for spines with higher port density.

Finally, high availability is built into the solution. If a link goes down, BGP will make sure all routers are aware. And everything will still work the same way, because with our wiring scheme and ECMP, all paths are created equal. Take THAT evil bonding driver!

But it doesn’t end there. Scaling the pipes is only half the story. What about device management? The infamous copy-paste? Cable management? A single misconnected cable that could bring a whole datacenter down? What about those?

Glad you asked 🙂

After a long, thorough evaluation of multiple vendors, we chose Cumulus Networks as our switch Operating System vendor, and Dell as our switch hardware vendor. Much like you would with servers, by choosing Enterprise Redhat, Suse or Ubuntu. Or with mobile devices, by choosing Android. We chose a solution that decouples the switch OS from the hardware it’s running on. One that lets us select hardware from a list of certified vendors, like Dell, HP, Mellanox and others.

So now our switches run Cumulus Linux, allowing us to use the very same tools that manage our fleet of servers to manage our fleet of switches – applying the same open mindset in what was previously a closed, proprietary world.

In fact, when we designed the new datacenter, we wrote Chef cookbooks to automate provisioning and config. We wrote unit and integration tests using Chef’s toolchain and setup a CI pipeline for the code. We even simulated the entire datacenter, switches, servers and all, using Vagrant.

It worked so well, that bootstrapping the new datacenter took us just 5 days. Think about it:

The first time we ever saw a real Dell switch running Cumulus Linux was when we arrived on-site for the buildout. And yet, 99% of our code worked as expected. In 5 days, we were able to set up a LAN, VPN, server provisioning, DNS, LDAP and deal with some quirky BIOS configs. On the servers, mind you, not the switches.

We even hooked Cumulus’ built-in cabling validation into our Prometheus-based monitoring system. So right after we turned monitoring on, we got an alert. On one bad cable. Out of 3000.

Infrastructure as Code anyone?

 

Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

At Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will share our logging implementation, which is the first tool we use to understand the why.

But first things first, a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare-metal nodes, Elasticsearch for storage and Kibana for visualization and analytics. Apache Kafka is the transport layer for all of the above.

A very simplified sketch of the system:

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to use “kubectl”, because managing keys is a pain, especially in a multi-cluster environment.
  • Provide a way to tail logs and even edit log files. This should be available for a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, the ELK stack and Log4j on the client side.
  • Support all existing logging sources, like multi-line and JSON logs.
  • Don’t forget services which don’t run in Kubernetes – yes, we still need to support those.

 

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup – a Fluentd daemonset running on each Kubelet node, with all services configured to send logs to stdout / stderr instead of to a file.

The Fluentd agent collects the pod’s logs and adds the Kubernetes-level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched, they are stored in a Kafka topic.

A pool of Logstash agents (running as pods in Kubernetes) consumes and parses messages from Kafka as needed.

Once parsed, messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:


And now it is time to introduce CTail.

CTail stands for Containers Tail. It is an Outbrain homegrown tool, written in Go, based on server-side and client-side components.

A CTail server-side component runs per data center or per Kubernetes cluster. It consumes messages from a Kafka topic named “CTail” and, based on the Kubernetes app label, creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages within the same partition, we had to make sure messages are partitioned by pod_id.
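Our pipeline does this keying in the Fluentd/Kafka layer, but the idea is easy to show with kafka-python; the broker address and message format below are illustrative.

# Sketch: keying messages by pod_id so all logs of a pod land in the same partition,
# preserving per-pod ordering. This is only an illustration of the idea.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka.example.com:9092"],               # hypothetical broker
    value_serializer=lambda v: json.dumps(v).encode("utf-8"))

def send_log(pod_id, message):
    # Messages with the same key are hashed to the same partition.
    producer.send("CTail", key=pod_id.encode("utf-8"),
                  value={"pod_id": pod_id, "msg": message})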

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the CTail client starts, it queries Consul (which we use for service discovery) to locate all of the CTail servers, registers to their streams, and performs aggregation in memory – resulting in a live stream of log entries.

Here is a sketch demonstrating the environment and an example of the CTail client output:

CTail client output

 

To view logs from all pods of a service called “ob1ktemplate”, all you need to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200

 

This is how we solved this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

African Elephant (Loxodonta africana)

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (for ex. Sqoop or Mysql to Hive data loads), processing in Hadoop (for ex. running Hive, Scalding or Pig jobs), and pushing the results into external data stores (for ex. Vertica, Cassandra, Mysql etc.). An additional dimension of complexity originates from the dynamic nature of the system since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding, and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to the new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The 3rd option is to start processing on the new cluster without stopping the old one. This is a sort of active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interference with any working pipeline in the old cluster. Sounds good, right?

First Steps

To better understand the chosen solution let’s take a look at our current architecture:


We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity, the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in-house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration, it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:


Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned to a business group, and all jobs that belong to this group are executed from it. To isolate current production job execution from execution destined for the new cluster, we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, with raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs, doing varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, in some cases it may cause data corruption or inconsistencies due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we defined two types of execution modes a WorkflowEngine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. writing to an external database or triggering an applicative service). This is handled automatically by the framework, requiring no effort from the development teams.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing it (to a degree) with the other cluster.
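Schematically, the rule is as simple as the sketch below; the names are illustrative, not the actual WorkflowEngine code.

# Schematic sketch of the leader/secondary execution rule.
LEADER, SECONDARY = "leader", "secondary"

def should_execute(job, engine_mode):
    # In secondary mode, skip any job with side effects external to the Hadoop cluster,
    # e.g. writing to MySQL/Vertica/Cassandra or triggering an applicative service.
    if engine_mode == SECONDARY and job.get("has_external_side_effects", False):
        return False
    return True

# should_execute({"name": "push_to_mysql", "has_external_side_effects": True}, SECONDARY)   -> False
# should_execute({"name": "hive_aggregation", "has_external_side_effects": False}, SECONDARY) -> True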

Let’s Do This…

Phase 1 of the migration should look something like this:


 

Notice that, for simplicity, I’ve only included a Workflow Engine for one group in the diagram, but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (hive, pig, scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

Once we were confident that all phase 1 jobs were operating properly on the new cluster, we could continue to phase 2, in which the migration leader becomes secondary and the secondary becomes the leader. Like this:


In this phase, all jobs begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine only runs jobs that create data on the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data continues to be available on the old cluster.

The Overall Plan

The overall process is to push all Workflow Engines to phase 1 and then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seemed to be working properly did we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer version, or improved performance) immediately after they have moved to phase 2 and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat as you need to bring into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental Migration – Due to the fact that we had an active-active migration that we could apply on each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process – since we kept all old workflow engines (that executed their jobs on the old Hadoop cluster) in a state of secondary execution mode, all intermediate data was still being processed and available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project, you can check out the video from Strata 2017 London where I discussed it in more detail.

Failure Testing for your private cloud – Introducing GomJabbar

TL;DR: Chaos drills can contribute a lot to your services’ resilience, and they’re actually quite a fun activity. We’ve built a tool called GomJabbar to help you run those drills.



Here at Outbrain we manage quite a large scale deployment of hundreds of services/modules, and thousands of hosts. We practice CI/CD, and implemented quite a sound infrastructure, which we believe is scalable, performant, and resilient. We do however experience many production issues on a daily basis, just like any other large scale organization. You simply can’t ensure a 100% fault-free system. Servers will crash, run out of disk space, and lose connectivity to the network. Software will experience bugs, and erroneous conditions. Our job as software engineers is to anticipate these conditions, and design our code to handle them gracefully.

For quite a long time we were looking into ways of improving our resilience, and validate our assumptions, using a tool like Netflix’s Chaos Monkey. We also wanted to make sure our alerting system actually triggers when things go wrong. The main problem we were facing is that Chaos Monkey is a tool that was designed to work with cloud infrastructure, while we maintain our own private cloud.

The main motivation for developing such a tool is that failures have the tendency of occurring when you’re least prepared, and at the least desirable time, e.g. Friday nights, when you’re out having a pint with your buddies. Now, to be honest with ourselves, when things fail during inconvenient times, we don’t always roll up our sleeves and dive in to look for the root cause. Many times the incident will end after a service restart, and once the alerts clear we forget about it.

Wouldn’t it be great if we could have “chaos drills”, where we could practice handling failures, test and validate our assumptions, and learn how to improve our infrastructure?

Chaos Drills at Outbrain

We built GomJabbar exactly for the reasons specified above. Once a week, at a well known time, mid day, we randomly select a few targets where we trigger failures. At this point, the system should either auto-detect the failures, and auto-heal, or bypass them. In some cases alerts should be triggered to let teams know that a manual intervention is required.

After each chaos drill we conduct a quick take-in session for each of the triggered failures, and ask ourselves the following questions:

  1. Did the system handle the failure case correctly?
  2. Was our alerting strategy effective?
  3. Did the team have the knowledge to handle, and troubleshoot the failure?
  4. Was the issue investigated thoroughly?

These take-ins lead to super valuable inputs, which we probably wouldn’t collect any other way.

How did we kick this off?

Before we started running the chaos drills, there were a lot of concerns about the value of such drills, and the time they would require. Well, since eliminating our fear of production is one of the key goals of this activity, we had to take care of that first.

"I must not fear.
 Fear is the mind-killer.
 Fear is the little-death that brings total obliteration.
 I will face my fear.
 I will permit it to pass over me and through me.
 And when it has gone past I will turn the inner eye to see its path.
 Where the fear has gone there will be nothing. Only I will remain."

(Litany Against Fear - Frank Herbert - Dune)

So we started a series of chats with the teams, in order to understand what was bothering them, and found ways to mitigate it. So here goes:

  • There’s an obvious need to avoid unnecessary damage.
    • We’ve created filters to ensure only approved targets get to participate in the drills.
      This has a side effect of pre-marking areas in the code we need to take care of.
    • We currently schedule drills via statuspage.io, so teams know when to be ready, and if the time is inappropriate,
      we reschedule.
    • When we introduce a new kind of fault, we let everybody know, and explain what they should prepare for in advance.
    • We started out from minor faults like graceful shutdowns, continued to graceless shutdowns,
      and moved on to more interesting testing like faulty network emulation.
  • We’ve measured the time teams spent on these drills, and it turned out to be negligible.
    Most of the time was spent on preparations. For example, ensuring we have proper alerting,
    and correct resilience features in the clients.
    This is actually something you need to do anyway. At the end of the day, we’ve heard no complaints about interruptions, nor time waste.
  • We’ve made sure teams, and engineers on the call were not left on their own. We wanted everybody to learn
    from this drill, and when they weren’t sure how to proceed, we jumped in to help. It’s important
    to make everyone feel safe about this drill, and remind everybody that we only want to learn and improve.

All that said, it’s important to remember that we basically simulate failures that occur on a daily basis. It’s just that when we do it in a controlled manner, it’s easier to observe where our blind spots are, what knowledge we are lacking, and what we need to improve.

Our roadmap – What next?

  • Up until now, this drill was executed in a semi-automatic procedure. The next level is to let the teams run this drill on a fixed interval, at a well known time.
  • Add new kinds of failures, like disk space issues, power failures, etc.
  • So far, we were only brave enough to run this on applicative nodes, and there’s no reason to stop there. Data-stores, load-balancers, network switches, and the like are also on our radar in the near future.
  • Multi-target failure injection. For example, inject a failure to a percentage of the instances of some module in a random cluster. Yes, even a full cluster outage should be tested at some point, in case you were asking yourself.

The GomJabbar Internals

GomJabbar is basically an integration between a discovery system, a (fault) command execution scheduler, and your desired configuration. The configuration contains mostly the target filtering rules, and fault commands.

The fault commands are completely up to you. Out of the box we provide the following example commands, (but you can really write your own script to do what suits your platform, needs, and architecture):

  • Graceful shutdowns of service instances.
  • Graceless shutdowns of service instances.
  • Faulty Network Emulation (high latency, and packet-loss).

Upon startup, GomJabbar drills down via the discovery system, fetches the clusters, modules, and their instances, and passes each through the filters provided in the configuration files. This process is also performed periodically. We currently support discovery via Consul, but adding other methods of discovery is quite trivial.

When a user wishes to trigger faults, GomJabbar selects a random target and returns it to the user, along with a token that identifies this target. The user can then trigger one of the configured fault commands, or scripts, on the random target. At this point GomJabbar uses the configured CommandExecutor in order to execute the remote commands on the target hosts.
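Putting the pieces together, the flow looks roughly like the Python-style sketch below; the names are illustrative only and differ from GomJabbar’s actual code.

# Rough sketch of the drill flow described above (illustrative, not GomJabbar's code).
import random

def run_drill(discovery, filters, command_executor, fault_command):
    # Drill down via the discovery system: clusters -> modules -> instances,
    # keeping only targets approved by the configured filters.
    targets = [instance
               for cluster in discovery.clusters()
               for module in cluster.modules()
               for instance in module.instances()
               if all(f.approves(instance) for f in filters)]

    target = random.choice(targets)                                # pick a random target
    token = command_executor.execute(target.host, fault_command)   # e.g. a graceless-shutdown script
    return target, token                                           # the token identifies this execution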

GomJabbar also maintains an audit log of all executions, which allows you to revert quickly in the face of a real production issue, or an unexpected catastrophe caused by this tool.

What have we learned so far?

If you’ve read so far, you may be asking yourself what’s in it for me? What kind of lessons can I learn from these drills?

We’ve actually found and fixed many issues by running these drills, and here’s what we can share:

  1. We had broken monitoring and alerting around detecting the integrity of our production environment. We wanted to make sure that everything that runs in our data centers is managed and in a well-known state (version, health, etc.). We found that we didn’t compute the difference between the desired state and the actual state properly, due to reliance on bogus data sources. This sort of bug attacked us from two sides: once when we triggered graceful shutdowns, and once for graceless shutdowns.
  2. We’ve found services that had no owner, became obsolete and were basically running unattended in production. The horror.
  3. During the faulty network emulations, we’ve found that we had clients that didn’t implement proper resilience features, and caused cascading failures in the consumers several layers up our service stack. We’ve also noticed that in some cases, the high latency also cascaded. This was fixed by adding proper timeouts, double-dispatch, and circuit-breakers.
  4. We’ve also found that these drills motivated developers to improve their knowledge about the metrics we expose, logs, and the troubleshooting tools we provide.

Conclusion

We’ve found the chaos drills to be an incredibly useful technique, which helps us improve our resilience and integrity, while helping everybody learn about how things work. We’re by no means anywhere near perfection. We’re actually pretty sure we’ll find many many more issues we need to take care of. We’re hoping this exciting new tool will help us move to the next level, and we hope you find it useful too 😉

ScyllaDB POC – (not so :) live blogging – Update #3.

Scylla attacking Odysseus’s ship

It has been a long time (more than 4 months) since our last update.

You can read the previous update here.

It is not that we abandoned the POC; we actually continued to invest time and effort in it, and there has been good progress. It is just that we have not yet finished it and gotten the proofs that we wanted. While a lot of progress was made in those 4 months, both on the Outbrain system side and on the ScyllaDB side, one thing is holding us back from proving the main point of this POC. Our current bottleneck is the network. The network in the datacenter where we are running the tests is a 1Gbps ethernet network. We found out that although Scylla is not loaded and works with good latencies, we are saturating the NICs. We made some improvements along the way to still show that Scylla behaves better than C*, but if we want to show that we can significantly reduce the number of nodes in the cluster, we need to upgrade to 10Gbps ethernet.

This upgrade will come shortly.

This is where we currently stand. However, a lot was done in those 4 months and there are a lot of learnings I want to share. The rest of the post is the way Doron and the Scylla guys describe what happened. It reads more like a captain’s log, but it tells the story pretty well.

Scylla:

  • 23/6 – We created a special app server cluster to call Scylla, and delegated all calls to both the C* and Scylla clusters. We did that so there would be less coupling between the C* path and the Scylla path, and fewer mutual interruptions that would interfere with our conclusions. The app servers for Scylla were configured not to use cache, so the entire load (1.5M-3M RPM) went directly to Scylla. C* stayed behind cache and actually handled ~10% of the load. This went smoothly.
  • In the following ~3 weeks we tried to load the snapshot again from C* and stumbled on some difficulties, some related to bugs in Scylla, some to networking limits (1Gbps). During this time we had to stop the writes to Scylla for a few days, so the data was no longer in sync. Some actions we took to resolve this:
    1. We investigated bursts of usage we had and decreased them in some use cases (both for C* and Scylla). They caused the network usage to be very high for a few seconds, sometimes for a few tens of milliseconds. This also helped C* a lot. The tool is now open source.
    2. We added client-server compression (LZ4). It was supported by Scylla, but client needed to configure it.
    3. Scylla added server-server compression during this period.
    4. We changed the “multi” calls back to N parallel single calls (instead of one IN request) – this better utilizes the network.
    5. The Scylla client was (mistakenly) using latency-aware routing instead of token-aware routing. This caused the app to go to the “wrong” node a lot, causing more traffic within the Scylla nodes. Removing the latency-aware policy helped reduce the server-server network usage and the overall latency.
  • 14/7 – with all the above fixes (and more from Scylla) we were able to load the data and stabilize the cluster with the entire load.
  • Until 29/7 we saw many spikes in the latency. We are not sure what we did to fix it… but on 29/7 the spikes stopped and the latency has been stable since.
  • During this period we saw 1-5 errors from Scylla per minute. Those errors were explained by attempts to reach partitions coming from a very old C* version. This was verified by logging the failing partitions on the app server side. Scylla fixed that on 29/7.
  • 7/8-15/8 – we changed the consistency level of both Scylla and C* to local-one (to test C*) – this caused a slight decrease in the (already) low latencies.
  • Up to 19/8 we saw occasional momentary errors coming from Scylla (a few hundred every few hours). This has not happened since 19/8; we can’t really explain why.
  • Current latencies – Scylla holds over 2M RPM with a latency of 2 ms (99%ile) for single requests and 40-50 ms (99%ile) for multi requests of ~100 partitions on average per request. Latencies are very stable with no apparent spikes. All latencies are measured from the app servers.

Next steps on Scylla:

  • Load the data again from C* to sync.
  • Repeat the data consistency check to verify the results from C* and Scylla are still the same.
  • Run repairs to see that the cluster holds up while running heavy tasks in the background.
  • Try to understand the errors I mentioned above if they repeat.
  • Create the cluster in Outbrain’s new Sacramento datacenter, which has a 10Gbps network, with a minimal number of nodes (3?) and try the same load there.

Cassandra:

  • 7/8 – we changed the consistency level to local-one and tried to remove the cache from Cassandra. The test was successful and Cassandra handled the full load, with latency increasing from 5 ms (99th percentile for single requests) to 10-15 ms in the peak hours.
  • 15/8 – we changed back to local-quorum (we do not like having local-one for this cluster… we can explain in more detail why) and set the cache back.
  • 21/8 – we removed the cache again, this time with local-quorum. Cassandra handled it, but single request latency increased to 30-40 ms for the 99th percentile in the peak hours. In addition, we started seeing timeouts from Cassandra (the timeout is 1 second) – up to 500 per minute in the peak hours. (A sketch of how the consistency-level change looks on the client side follows this list.)
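The consistency-level change itself is a one-line, per-statement setting on the client. A minimal sketch, again assuming the DataStax Java driver (3.x); the keyspace, table and key names are illustrative:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class ConsistencyLevelExample {
    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("cassandra-host").build(); // placeholder host
        Session session = cluster.connect("documents");                                // illustrative keyspace

        // read at LOCAL_ONE instead of LOCAL_QUORUM for this query only
        Statement read = new SimpleStatement("SELECT * FROM docs WHERE id = ?", "doc-1")
                .setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);

        ResultSet rs = session.execute(read);
        System.out.println(rs.one());

        cluster.close();
    }
}
```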

Next steps on C*:

  • Run repairs to see that the cluster holds up while running heavy tasks in the background.
  • Try compression (like we do with Scylla).
  • Try some additional tweaks by the C* expert.
  • In case errors continue, we will have to set the cache back.

Current status comparison – Aug 20th:

The following table shows a comparison under a load of 2M RPM in peak hours.

Latencies are 99th percentile.

                      Cassandra                                          ScyllaDB
Single call latency   30-40 ms (spikes to 70 ms)                         2 ms
Multi call latency    150-200 ms (spikes to 600 ms)                      40-50 ms
Errors                Up to 150 timeouts a minute every few minutes,     A few hundred every few days
                      with some higher spikes every few days

(Note: the errors are query times exceeding 1 second, i.e. timeouts, not necessarily database failures.)

Below are the graphs showing the differences.

Latency and error graphs showing both C* and Scylla serving requests without a cache (1M-2M RPM):

Latency comparison of single requests

Latency comparison of multi requests

Errors (timeouts for queries > 1 second)

Summary

There are very good signs that ScyllaDB does make a difference in throughput, but due to the network bottleneck we could not verify it yet. We will update as soon as we have results on a faster network. The Scylla guys are working on a solution for slower networks too.

Hope to update soon.

Micro Service Split

In this post, I will describe a technical methodology we used to remove a piece of functionality from a Monolith/Service and turn it into a Micro-Service. I will try to reason about some of the decisions we made and the path we took, and give a more detailed description of the internal tools, libraries and frameworks we use at Outbrain and in our team, to shed some light on the way we work. And as a bonus, you might learn from our mistakes!
Let’s start with a description of the original service and what it does.
Outbrain runs the largest content discovery platform. From the web surfer’s perspective, this means serving a list of recommended content that might interest her, in the form of ‘You might also like’ links. Some of those impression links are sponsored, i.e. when she clicks on a link, someone is paying for that click, and the revenue is shared between Outbrain and the owner of the page the link appears on. That is how Outbrain makes its revenue.

My team, among other things, is responsible for routing the user to the requested page after she presses the link, and for the bookkeeping and accounting required to calculate the cost of the click, who should be charged, etc.
In our case, the service we are trying to split is the ‘Bookkeeper’. Its primary role is to manage the budget of paid impression links. After a budget is spent, the ‘Bookkeeper’ should notify Outbrain’s backend servers to refrain from showing the impression link again, and this has to be done as fast as possible. If not, people will click on links we cannot charge for because the budget was already spent. Technically, this is done by an update to a database record. However, there are other cases where we might want to stop the exposure of impression links. One such example is a request from the customer paying for the future clicks to disable the impression link’s exposure. For such cases, we have an API endpoint that does exactly the same thing with the same code. That endpoint is actually part of the ‘Bookkeeper’ and is enabled by a feature toggle on specific machines. This ‘Activate-Impressionable’ endpoint, as we call it, is what was decided to split out of the ‘Bookkeeper’ into a designated Micro-Service.
In order to execute the split, we chose a step-by-step strategy that would allow us to reduce the risk during execution and keep it as controlled and reversible as possible. From a bird’s eye view, I would describe it as a three-step process: Plan, Up and Running as fast as possible, and Refactor. The rest of the post describes these steps.

Plan (The who’s and why’s)

In my opinion, this is the most important step. You don’t want to split a service just for the sake of splitting. Each Micro Service introduces maintenance and management overhead, with its own set of challenges[1]. On the other hand, a Microservices architecture is known for benefits such as code maintainability (for each Micro Service), the ability to scale out and improved resilience[2].
Luckily for me, someone had already done that part and taken the decision that ‘Activate-Impressionable’ should be split from the ‘Bookkeeper’. But still, let’s name some of the key factors of our planning step.
Basically, I would say that a good separation is a logical separation with its own non-overlapping RESTful endpoints and an isolated code base. The logical separation should be clear: think about what the functionality of the new service is, and how isolated it is. It is possible to analyze the code for inter-dependencies among classes and packages using tools such as Lattix. The bottom line is that it is important to have a clear definition of the responsibility of the new Micro Service.
In our case, after the split the ‘Bookkeeper’ remained the bigger component, ‘Activate-Impressionable’ was smaller, and the common library was smaller than both. The exact lines of code can be seen in the table below.

Lines of code per component

Unfortunately, I assessed this only after the split and not in the planning step. Looking at those numbers, we might say that there is too much code in common. It is something worth considering when deciding what to split; a lot of common code implies a low isolation level.

Of course, part of the planning is time estimation. Although I am not a big fan of “guestimates”, I can tell that the task was planned to take a couple of weeks and took about that long.
Now that we have a plan, let’s get to work.

Up and Running – A Step by Step guide

As in every good refactor, we want to work in small baby steps and remain ‘green’ all the time[3]. With continuous deployment, that means we can and do deploy to production as often as possible to make sure it is ‘business as usual’. In this step we want to get to a point where the new service works in parallel to the original service. At the end of this step we will point our load balancers to the new service endpoints. In addition, the code remains shared in this step, which means we can always deploy the original, fully functioning ‘Bookkeeper’. We actually do that whenever we feel the latest changes carry any risk.
So let’s break it down into the actual phases:

  • Phase 0 – Starting phase.
  • Phase 1 – Create the new empty Micro-Service ‘Activate-Impressionable’. At Outbrain we do this using the scaffolding of the ob1k framework. Ob1k is an open source Micro Services framework that was developed in-house.
  • Phase 2 – Create a new empty library on which both the new ‘Activate-Impressionable’ service and the ‘Bookkeeper’ depend. Ideally, if there is a full logic separation with no mutual code between the services, that library will be deleted in the cleanup phase.
  • Phase 3 – Move the relevant source code to the library. Luckily, in our case there was one directory that was clearly what we had to split out. Unluckily, that code also pulled in some more code it depended on, and this had to be done carefully so as not to pull too much nor too little. The good news is that this phase is pretty safe for statically typed languages such as Java, which our service is written in. The compiler protects us here with compilation errors, so the feedback loop is very short. Tip: don’t forget to move the unit tests as well.
  • Phase 4 – Move common resources to the library, such as Spring beans defined in XML files and our feature flag files defined in YAML files. This is the dangerous part. We don’t have the compiler to help here, so we actually test it in production. And when I say production, I mean using staging/canary/any environment with production configuration but without real impact. Luckily again, both YAML and Spring beans are configured to fail fast, so if we did something wrong it will just blow up in our face and the service will refuse to come up. For this step I even ended up developing a one-liner bash script to assist with those wicked YAML files.
  • Phase 5 – Copy and edit web resources (web.xml) to define the service endpoints. In our case, web.xml can’t reside in a library, so it has to be copied. Remember, we still want the endpoints active in the ‘Bookkeeper’ at this phase. Lesson learned: inspect all files closely. In our case, log4j.xml, which seems like an innocent file by its name, contains designated appenders that are consumed by other production services. I didn’t notice that and didn’t move the required appender, and it was found only later in production.
  • Deploy – Deploy the new service to production. What we did is deploy ‘Activate-Impressionable’ side by side on the same machines as the ‘Bookkeeper’, just with a different port and context path. Definitely makes you sleep better at night.
  • Up-And-Running – Now is a good time to test once again that both the ‘Bookkeeper’ and ‘Activate-Impressionable’ are working as expected. Generally, we are now up and running with only a few more things to do.
  • Clients Redirect – Point the clients of the service to the new endpoints (port + context path). A step that might take some time, depending on the number of clients and the ability to redeploy them. At Outbrain we use HAProxy, so reconfiguring it did most of the work, but some clients did require code modifications.
  • (More) Validation – Move/copy simulator tests and monitors. In our team, we heavily rely on tests we call simulator tests. These are actually black-box tests written in JUnit that run against the service installed on a designated machine. These tests see the service as a black box and call its endpoints while mocking/emulating other services and the data in the database for the test run. So a test run usually looks like: put something in the database, trigger the endpoint, and see the result in the database or in the HTTP response (a minimal sketch of such a test follows this list). There is also a question here of whether to test ‘Activate-Impressionable’ or the ‘Bookkeeper’. Ideally you will test both (tests are duplicated for that phase), and that is what we did.
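To give a feel for such a simulator test, here is a minimal, hypothetical sketch: a JUnit black-box test that calls a deactivation endpoint over HTTP. The host, path and parameter names are made up and are not the real ‘Activate-Impressionable’ API; the database fixture is only hinted at in comments:

```java
import static org.junit.Assert.assertEquals;

import java.net.HttpURLConnection;
import java.net.URL;

import org.junit.Test;

public class ActivateImpressionableSimulatorTest {

    // illustrative base URL of the service under test on a designated simulator machine
    private static final String BASE = "http://simulator-host:8080/activate-impressionable";

    @Test
    public void deactivateEndpointAnswersOk() throws Exception {
        // In the real tests, a fixture would first put an impressionable record
        // into the test database here.

        // trigger the endpoint of the service under test
        URL url = new URL(BASE + "/deactivate?impressionableId=imp-123");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");

        // assert on the HTTP response...
        assertEquals(200, connection.getResponseCode());
        // ...and, in the real tests, also assert that the record in the test
        // database is now marked inactive.
    }
}
```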

Refactor, Disconnect & Cleanup

When we got here, the new service was working and we should expect no more behavior changes from the endpoints’ point of view. But we still want the code to be fully split and the services to be independent of each other. In the previous step we performed the phases in a way that kept everything reversible with a simple feature toggle & deploy.

In this step, we move to a state where the ‘Bookkeeper’ no longer hosts the ‘Activate-Impressionable’ functionality. Sometimes it is a good idea to leave a gap after the previous step, to make sure there are no rejections and backfires that we didn’t catch in our tests and monitoring.
The first thing, if it was not done up until now, is deploying the ‘Bookkeeper’ without the service functionality and making sure everything still works. And then wait a little bit more…
And now we just have to push the sources and the resources from the library into the ‘Activate-Impressionable’ service. In the ideal case where there is no common code, we can also delete the library. This was not our case; we still have a lot of common code we can’t separate for the time being.
Now is also the time to do resources cleanup, web.xml edits, etc.
And for the bold and OCD among us: package renames and refactoring of the code to the new service’s naming conventions.

Conclusion

The entire process in our case took a couple of weeks. Part of the fun and advantage in such a process is the opportunity to get to know old code, its structure and functionality, without the need to modify it for a new feature with its constraints. Especially when someone else wrote it originally.
In order to perform such a process well, it is important to plan and remain organized and on track. In case of a context switch, it is very important to keep a bookmark of where you need to return to in order to continue. In our team we even did that with a handoff of the task between developers. Extreme Programming, it is.
It is interesting to see the surprising results in terms of lines of code. Originally we thought of it as splitting a micro-service out of a monolith. In retrospect, it looks to me more like splitting a service into two services. ‘Micro’ in this case is in the eye of the beholder.

Real Time Performance Monitoring @ Outbrain

Outbrain serves millions of requests per minute, based on a microservice architecture. Consequently, as you might expect, visibility and performance monitoring are crucial.

Serving millions of requests per minute, across multiple data centers, in a micro services environment, is not an easy task. Every request is routed to many applications, and may potentially stall or fail at every step in the flow. Identifying bottlenecks, troubleshooting failures and knowing our capacity limits are all difficult tasks. Yet, these are not things you can just give up on “because they’re hard”, but are rather tasks that every engineer must be able to tackle without too much overhead. It is clear that we have to aim for all engineers to be able to understand how their applications are doing at any given time.

Since we face all of these challenges every day, we reached the point where a paradigm shift was required: for example, moving from the old, familiar “investigate the past” to the new, unfamiliar “investigate the present”. That’s only one of the requirements we came up with. Here are a few more:

Real-time visibility

Sounds pretty straightforward, right? However, a persistent monitoring system always has at least a few minutes of delay. These few minutes might contain millions of errors that potentially affect your business. Aiming for a low MTTR means cutting delays where possible, thus moving from minute-based granularity to second-based.

Throughput, Latency and error rate are linked

Some components might suffer from high latency, but maybe the amount of traffic they receive is negligible. Others might have low latency under high load, but only because they fail fast for almost every request (we are reactive!). We wanted to view these metrics together and rank them by importance.

Mathematical correctness at any given aggregation (Don’t lie!)

When dealing with latency, one should look at percentiles, not averages, as averages can be deceiving and might not tell the whole story. But what if we want to view latency per host, and then view it per data center? If we store only percentiles per host (which is highly common in our industry), it is not mathematically correct to average them! On the other hand, we have so much traffic that we can’t just store every measurement with its latency, and we definitely can’t view them all in real time.
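To make this concrete, here is a small, self-contained illustration using HdrHistogram with made-up numbers: averaging two per-host 99th percentiles gives a very different (and wrong) answer from the 99th percentile of the merged data, because it ignores how much traffic each host served:

```java
import org.HdrHistogram.Histogram;

public class PercentileAggregation {
    public static void main(String[] args) {
        Histogram hostA = new Histogram(3);   // 3 significant value digits
        Histogram hostB = new Histogram(3);

        for (int i = 0; i < 9_900; i++) hostA.recordValue(2);    // fast host: 9,900 requests at 2 ms
        for (int i = 0; i < 100; i++)   hostA.recordValue(50);   // ...and 100 requests at 50 ms
        for (int i = 0; i < 100; i++)   hostB.recordValue(200);  // slow host: only 100 requests, all 200 ms

        // naive (wrong) aggregation: average of the per-host p99s
        double wrong = (hostA.getValueAtPercentile(99.0) + hostB.getValueAtPercentile(99.0)) / 2.0;

        // correct aggregation: merge the raw histograms, then take the percentile
        Histogram combined = new Histogram(3);
        combined.add(hostA);
        combined.add(hostB);
        double right = combined.getValueAtPercentile(99.0);

        System.out.printf("average of p99s = %.1f ms, p99 of merged data = %.1f ms%n", wrong, right);
    }
}
```

With these numbers the average of the two p99s lands around 100 ms, while the p99 of the combined data is 50 ms, because the slow host served only a tiny fraction of the traffic.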

Latency resolution matters

JVM-based systems tend to display crazy numbers when looking at the high percentiles (how crazy? With heavy GC storms and lock contention there is no limit to how bad these values can get). It’s crucial for us to differentiate between latency at the 99.5th and 99.9th percentiles, while values at the 5th or 10th percentiles don’t really matter.

Summing up all of the requirements above, we reached the conclusion that our fancy persistent monitoring system, with its minute-based resolution, supporting millions of metrics per minute, doesn’t cut it anymore. We like that every host can write thousands of metric values every minute, and we like being able to view historical data over long periods of time, but moving forward, it’s just not good enough. So, as we often do, we sat down to rethink our application-level metric collection and came up with a new, improved solution.

Our Monitoring Unit

First, consider metric collection from the application perspective. Logically, it is an application’s point of view of some component: a call to another application, to a backend, or a plain CPU-bound computation. Therefore, for every component, we measure its number of requests, failures, timeouts and push backs, along with a latency histogram over a short period of time.

In addition, we want to see the worst performing hosts in terms of any such metric (be it mean latency, number of errors, etc.).

Our Monitoring Unit

To achieve this display for each measured component we decided to use these great technologies:

HDR Histograms

HdrHistogram supports the recording and analysis of sampled data value counts, across a configurable value range, with configurable value precision within the range. It is designed for recording histograms of latency measurements in performance-sensitive applications.

Why is this important? Because when using such histograms to measure the latency of some component, you get good accuracy for the values in the high percentiles at the expense of the low percentiles.

So, we decided to store in-memory instances of histograms (as well as counters for requests, errors, timeouts, push backs, etc.) for each measured component. We then replace them each second and expose these histograms in the form of an rx.Observable using our own OB1K application server capabilities.
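As an illustration only (this is not the actual OB1K code), such a component monitor might look roughly like this: counters plus an HdrHistogram that is swapped out once per second, with each retired histogram published on an rx stream:

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.HdrHistogram.Histogram;

import rx.Observable;
import rx.subjects.PublishSubject;

// One measured component: counters plus a latency histogram that is swapped out every second.
public class ComponentMonitor {

    private final AtomicLong requests = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();
    private final AtomicReference<Histogram> live = new AtomicReference<>(new Histogram(3));
    private final PublishSubject<Histogram> snapshots = PublishSubject.create();

    public ComponentMonitor(ScheduledExecutorService scheduler) {
        // once per second: swap in a fresh histogram and publish the one that just finished
        scheduler.scheduleAtFixedRate(
                () -> snapshots.onNext(live.getAndSet(new Histogram(3))),
                1, 1, TimeUnit.SECONDS);
    }

    public void record(long latencyMillis, boolean failed) {
        requests.incrementAndGet();
        if (failed) {
            errors.incrementAndGet();
        }
        live.get().recordValue(latencyMillis);
    }

    // the stream the aggregation service consumes
    public Observable<Histogram> latencySnapshots() {
        return snapshots;
    }
}
```

Swapping the whole histogram out (rather than resetting a shared one) means a published snapshot is never written to while the next interval is being recorded.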

All that is left is to aggregate and display.

Java Reactive extensions

https://github.com/ReactiveX/RxJava

Rx is a great tool for merging and aggregating streams of data in memory. In our case, we built a service that merges raw streams of measured components, groups them by measured type, and aggregates them in a window of a few seconds. But here’s the trick – we do that on demand. This allows us to let users view results grouped by any dimension they desire, without losing the mathematical correctness of latency histogram aggregation.

Some examples of the operators we use to aggregate the multiple monitoring units:

 

  • rx merge operator enables treating multiple streams as a single stream
  • rx window operator enables sliding window abstraction
  • rx scan operator enables aggregation over each window

To simplify things, we can say that for each component we want to display, we connect to each machine to fetch the monitored stream endpoint, perform ‘merge’ to get a single stream abstraction, ‘window’ to get a result per time unit, and ‘scan’ to perform the aggregation (a rough sketch of this pipeline is shown below).
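Here is a rough sketch of that pipeline in RxJava 1.x, assuming a simplified monitoring-unit type; the 3-second window and the combine logic are illustrative, not our production code:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.HdrHistogram.Histogram;

import rx.Observable;

public class MonitoringAggregator {

    // A simplified monitoring unit: counters plus a latency histogram for one component on one host.
    public static class Unit {
        final long requests;
        final long errors;
        final Histogram latencies;

        public Unit(long requests, long errors, Histogram latencies) {
            this.requests = requests;
            this.errors = errors;
            this.latencies = latencies;
        }

        public Unit combine(Unit other) {
            Histogram merged = latencies.copy();
            merged.add(other.latencies);   // merging histograms keeps the percentiles correct
            return new Unit(requests + other.requests, errors + other.errors, merged);
        }
    }

    // merge per-host streams, cut them into 3-second windows, and aggregate each window
    public static Observable<Unit> aggregate(List<Observable<Unit>> perHostStreams, Unit empty) {
        return Observable.merge(perHostStreams)          // 'merge': many streams -> one stream
                .window(3, TimeUnit.SECONDS)             // 'window': one batch per time unit
                .flatMap(window -> window
                        .scan(empty, Unit::combine)      // 'scan': running aggregation within the window
                        .last());                        // keep the final aggregate of each window
    }
}
```

Because the aggregation runs on demand, the same raw per-host streams can be regrouped by any dimension (datacenter, version tag, hostname) without pre-computing anything.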

Hystrix Dashboard

The guys at Netflix found a great formula for displaying a serving component’s status in a way that links volume, error percentage and latency in a single view. We really liked that, so we adopted this UI to show our aggregated results.

The Hystrix dashboard view of a single measured component shows counters of successes, failures, timeouts and push backs, along with a latency histogram, information on the number of hosts, and more. In addition, it provides a balloon view, which grows/shrinks with the traffic volume per component and is color-coded by the error rate.

See below how this looks in the breakdown view of all request components. The user gets a view of all measured components, sorted by volume, with a listing of the worst performing hosts.

View of all request components

Another example shows the view of one application, with nothing but its entry points, grouped by data center. Our Operations guys find this extremely useful when they need to re-balance traffic across data centers.

View of one application

OK, so far so good. Now let’s talk about what we actually do with it.

Troubleshooting

Sometimes an application doesn’t meet its SLA, be it in latency or error rate. The simple case is a broken internal component (for example, some backend went down and all calls to it result in failures). At this point we can view the application dashboard and easily locate the failing call. A more complex use case is an increase in the number of calls to a high-latency component at the expense of a low-latency one (for example, a drop in cache hit rate). Here our drill-down will need to focus on the relative amount of traffic each component receives – we might be expecting a 1:2 ratio, while in reality we observe a 1:3 ratio.

With enough alerting in place, this could be caught by an alert. Having the real-time view allows us to locate the root cause quickly, even when the alert is a general one.

Troubleshooting

Performance comparison

In many cases we want to compare the performance of two groups of hosts doing the same operation, such as during version upgrades or topology changes. We use tags to differentiate groups of machines (each datacenter is a tag, each environment, and even each hostname). We can then ask for a specific metric, grouped by tags, to get the following view:

Performance comparison

 

Load testing

We conduct several types of load tests. One is where we shift as much traffic as possible to one data center, trying to hit the first system-wide bottleneck. Another is performed on specific applications. In both cases we use the application dashboard to view the bottlenecks, just like we would when troubleshooting unexpected events.

One thing to keep in mind is that when an application is loaded, sometimes the CPU is maxed out and measurements are false because threads just don’t get CPU time. Another case where this happens is during GC. In such cases we must also measure the effects of this phenomenon.

The measuring unit, in this case, is the ‘JVM hiccup’, which basically means taking one thread, letting it sleep for a while, and measuring how much time it took beyond the requested sleep time. Low hiccups mean we can rely on the numbers presented by the other metrics.
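A minimal sketch of such a hiccup meter, similar in spirit to Gil Tene’s jHiccup; the sleep interval and units are arbitrary:

```java
import org.HdrHistogram.Histogram;

public class HiccupMeter implements Runnable {

    private final Histogram hiccups = new Histogram(3);   // hiccup durations, in milliseconds

    @Override
    public void run() {
        final long intervalMs = 10;
        while (!Thread.currentThread().isInterrupted()) {
            long start = System.nanoTime();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            // anything above the requested sleep time is a "hiccup": GC pause, CPU starvation, etc.
            hiccups.recordValue(Math.max(0, elapsedMs - intervalMs));
        }
    }

    public long hiccup99() {
        return hiccups.getValueAtPercentile(99.0);
    }
}
```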

Load testing

 

What’s next?

Real-time monitoring plays a critical role in our everyday work, and we have many plans to leverage these measurements. From smarter, metric-driven load balancing in the client to canary deployments based on real application behavior – there is no limit to what you can achieve when you measure stuff in a fast, reliable manner.