
Live Tail in a Kubernetes / Docker-Based Environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz (@xaprb):

“Monitoring tells you whether the system works. Observability lets you ask why it’s not working.”

At Outbrain we are currently in the midst of migrating to a Kubernetes / Docker-based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will share our logging implementation, which is the first tool we use to understand the why.

But first things first – a short review of our current standard logging architecture:

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare-metal nodes, Elasticsearch for storage, and Kibana for visualization and analytics. Apache Kafka is the transport layer for all of the above.

A very simplified sketch of the system:

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to use “kubectl”, because managing keys is a pain, especially in a multi-cluster environment.
  • Provide a way to tail logs, and even edit log files. This should be available on a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side
  • Support all existing logging sources, such as multiline and JSON.
  • Don’t forget services that don’t run in Kubernetes – yes, we still need to support those.

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup – a Fluentd daemonset running on each Kubernetes node – and all services are configured to send logs to stdout / stderr instead of a file.

The Fluentd agent collects the pod’s logs and adds the Kubernetes-level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.
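
In Fluentd terms, the relevant part of the pipeline looks roughly like this (a sketch – plugin parameters vary by version, and the broker list and topic name here are illustrative):

<filter kubernetes.**>
  @type kubernetes_metadata
</filter>

<match kubernetes.**>
  @type kafka_buffered
  brokers kafka01:9092,kafka02:9092
  default_topic k8s-logs
</match>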

A pool of Logstash agents (running as pods in Kubernetes) consumes and parses messages from Kafka as needed.

Once parsed, messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:


And now it is time to introduce CTail.

CTail, which stands for Containers Tail, is an Outbrain homegrown tool written in Go, based on server-side and client-side components.

A CTail server-side component runs per data center or per Kubernetes cluster, consuming messages from a Kafka topic named “CTail” and, based on the Kubernetes app label, creating a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees order for messages in the same partition, we had to make sure messages are partitioned by the pod_id.
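
To illustrate the keying principle with the plain Java Kafka producer (CTail itself is written in Go, and the broker address here is illustrative):

[java]
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PodKeyedLogProducer {
    public static void send(String podId, String logMessage) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
        // Keying by pod_id makes Kafka's default partitioner hash all of a
        // pod's messages to the same partition, preserving their order.
        producer.send(new ProducerRecord<>("CTail", podId, logMessage));
        producer.close();
    }
}
[/java]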

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the CTail client starts, it queries Consul – which is what we use for service discovery – to locate all of the CTail servers, registers to their streams, and performs aggregations in memory, resulting in a live stream of log entries.

Here is a sketch demonstrating the environment and an example of the CTail client output:


To view logs from all pods of a service called “ob1ktemplate”, all you need to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200


This is how we solved this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (e.g. Sqoop or MySQL to Hive data loads), through processing in Hadoop (e.g. running Hive, Scalding or Pig jobs), to pushing the results into external data stores (e.g. Vertica, Cassandra, MySQL, etc.). An additional dimension of complexity originates from the dynamic nature of the system, since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration includes HDFS, MapReduce, Hive, Pig, Scalding and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: An in-place migration of the existing cluster to the new version, followed by a rolling hardware upgrade that gradually pushes new machines into the cluster and removes the old ones. This is the simplest approach, and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner, and is not by any means an easily reversible process, we had to forgo this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know whether the new cluster could handle the load, or whether each flow’s code was compatible with the new components’ versions. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new one, those results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical, and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The third option is to start processing on the new cluster without stopping the old one. This is a sort of active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interference with any working pipeline in the old cluster. Sounds good, right?


First Steps

To better understand the chosen solution let’s take a look at our current architecture:


We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons, but during the planning of the migration it provided us with a natural way to partition the workload, since there are very few dependencies between groups, compared to within each group.


Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:



Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned a business group, and all jobs belonging to that group are executed from it. To isolate the current production job execution from that of the new cluster, we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, with raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs, and they do varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, in some cases it may cause corruption or inconsistencies of the data due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we defined two types of execution modes a Workflow Engine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. write to an external database or trigger an applicative service). This is done automatically by the framework, thus requiring no effort from the development teams.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing (to a degree) with the other cluster.
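
Conceptually, the guard is as simple as this sketch (the Workflow Engine is an in-house system, so the names here are hypothetical):

[java]
enum Mode { LEADER, SECONDARY }

interface Job {
    // true if the job writes to an external database, triggers a service, etc.
    boolean hasExternalSideEffect();
}

static boolean shouldExecute(Job job, Mode mode) {
    // a leader runs everything; a secondary skips jobs whose side effects
    // would escape its own Hadoop cluster, so each of those runs only once
    return mode == Mode.LEADER || !job.hasExternalSideEffect();
}
[/java]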

Let’s Do This…

Phase 1 of the migration should look something like this:



Notice that I’ve only included one group’s Workflow Engine in the diagram for simplicity, but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (Hive, Pig, Scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.


Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.


Once we were confident that all phase 1 jobs were operating properly on the new cluster, we could continue to phase 2, in which the migration leader becomes secondary and the secondary becomes a leader. Like this:



In this phase all jobs begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine only runs jobs that create data on the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data will continue to be available on the old cluster.

The Overall Plan

The overall process was to push all Workflow Engines to phase 1, then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs, and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seemed to be working properly did we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups, thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from the newer version, or improved performance) immediately after it has moved to phase 2, and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat, as you need to take into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental migration – due to the fact that we had an active-active migration that we could apply to each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process – since we kept all old Workflow Engines (that executed their jobs on the old Hadoop cluster) in secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently of each other.
  3. Minimal impact on users – since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project you can check out the video from Strata 2017 London where I discussed it in more detail.

Failure Testing for your private cloud – Introducing GomJabbar

TL;DR: Chaos drills can contribute a lot to your services’ resilience, and they’re actually quite a fun activity. We’ve built a tool called GomJabbar to help you run those drills.



Here at Outbrain we manage quite a large scale deployment of hundreds of services/modules, and thousands of hosts. We practice CI/CD, and implemented quite a sound infrastructure, which we believe is scalable, performant, and resilient. We do however experience many production issues on a daily basis, just like any other large scale organization. You simply can’t ensure a 100% fault-free system. Servers will crash, run out of disk space, and lose connectivity to the network. Software will experience bugs, and erroneous conditions. Our job as software engineers is to anticipate these conditions, and design our code to handle them gracefully.

For quite a long time we had been looking into ways of improving our resilience and validating our assumptions, using a tool like Netflix’s Chaos Monkey. We also wanted to make sure our alerting system actually triggers when things go wrong. The main problem we were facing is that Chaos Monkey was designed to work with cloud infrastructure, while we maintain our own private cloud.

The main motivation for developing such a tool is that failures have the tendency of occurring when you’re least prepared, and at the least desirable time, e.g. Friday nights, when you’re out having a pint with your buddies. Now, to be honest with ourselves, when things fail during inconvenient times, we don’t always roll up our sleeves and dive in to look for the root cause. Many times the incident will end after a service restart, and once the alerts clear we forget about it.

Wouldn’t it be great if we could have “chaos drills”, where we could practice handling failures, test and validate our assumptions, and learn how to improve our infrastructure?

Chaos Drills at Outbrain

We built GomJabbar exactly for the reasons specified above. Once a week, at a well-known time, midday, we randomly select a few targets on which we trigger failures. At this point, the system should either auto-detect the failures and auto-heal, or bypass them. In some cases alerts should be triggered to let teams know that a manual intervention is required.

After each chaos drill we conduct a quick take-in session for each of the triggered failures, and ask ourselves the following questions:

  1. Did the system handle the failure case correctly?
  2. Was our alerting strategy effective?
  3. Did the team have the knowledge to handle and troubleshoot the failure?
  4. Was the issue investigated thoroughly?

These take-ins lead to super valuable inputs, which we probably wouldn’t collect any other way.

How did we kick this off?

Before we started running the chaos drills, there were a lot of concerns about the value of such drills and the time they would require. Well, since eliminating our fear of production is one of the key goals of this activity, we had to take care of that first.

"I must not fear.
 Fear is the mind-killer.
 Fear is the little-death that brings total obliteration.
 I will face my fear.
 I will permit it to pass over me and through me.
 And when it has gone past I will turn the inner eye to see its path.
 Where the fear has gone there will be nothing. Only I will remain."

(Litany Against Fear - Frank Herbert - Dune)

So we started a series of chats with the teams in order to understand what was bothering them, and to find ways to mitigate their concerns. Here goes:

  • There’s an obvious need to avoid unnecessary damage.
    • We’ve created filters to ensure only approved targets get to participate in the drills.
      This has a side effect of pre-marking areas in the code we need to take care of.
    • We currently schedule drills via statuspage.io, so teams know when to be ready, and if the time is inappropriate,
      we reschedule.
    • When we introduce a new kind of fault, we let everybody know, and explain what they should prepare for in advance.
    • We started out from minor faults like graceful shutdowns, continued to graceless shutdowns,
      and moved on to more interesting testing like faulty network emulation.
  • We’ve measured the time teams spent on these drills, and it turned out to be negligible.
    Most of the time was spent on preparations. For example, ensuring we have proper alerting,
    and correct resilience features in the clients.
    This is actually something you need to do anyway. At the end of the day, we’ve heard no complaints about interruptions, nor time waste.
  • We’ve made sure teams and engineers on call were not left on their own. We wanted everybody to learn
    from this drill, and when they weren’t sure how to proceed, we jumped in to help. It’s important
    to make everyone feel safe about this drill, and remind everybody that we only want to learn and improve.

All that said, it’s important to remember that we basically simulate failures that occur on a daily basis. It’s only that when we do it in a controlled manner, it’s easier to observe where our blind spots are, what knowledge we are lacking, and what we need to improve.

Our roadmap – What next?

  • Up until now, this drill was executed in a semi-automatic procedure. The next level is to let the teams run this drill on a fixed interval, at a well known time.
  • Add new kinds of failures, like disk space issues, power failures, etc.
  • So far, we were only brave enough to run this on applicative nodes, and there’s no reason to stop there. Data-stores, load-balancers, network switches, and the like are also on our radar in the near future.
  • Multi-target failure injection. For example, inject a failure to a percentage of the instances of some module in a random cluster. Yes, even a full cluster outage should be tested at some point, in case you were asking yourself.

The GomJabbar Internals

GomJabbar is basically an integration between a discovery system, a (fault) command execution scheduler, and your desired configuration. The configuration contains mostly the target filtering rules, and fault commands.

The fault commands are completely up to you. Out of the box we provide the following example commands, (but you can really write your own script to do what suits your platform, needs, and architecture):

  • Graceful shutdowns of service instances.
  • Graceless shutdowns of service instances.
  • Faulty Network Emulation (high latency, and packet-loss).

Upon startup, GomJabbar drills down via the discovery system, fetches the clusters, modules, and their instances, and passes each through the filters provided in the configuration files. This process is also performed periodically. We currently support discovery via Consul, but adding other methods of discovery is quite trivial.

When a user wishes to trigger faults, GomJabbar selects a random target, and returns it to the user, along with a token that identifies this target. The user can then trigger one of the configured fault commands, or scripts, on the random target. At this point GomJabbar uses the configured CommandExecutor in order to execute the remote commands on the target hosts.

GomJabbar also maintains an audit log of all executions, which allows you to revert quickly in the face of a real production issue, or an unexpected catastrophe caused by this tool.

What have we learned so far?

If you’ve read this far, you may be asking yourself: what’s in it for me? What kind of lessons can I learn from these drills?

We’ve actually found and fixed many issues by running these drills, and here’s what we can share:

  1. We had broken monitoring and alerting around the detection of the integrity of our production environment. We wanted to make sure that everything that runs in our data centers is managed, and at a well-known state (version, health, etc.). We found that we didn’t compute the difference between the desired state and the actual state properly, due to reliance on bogus data sources. This sort of bug attacked us from two sides: once when we triggered graceful shutdowns, and once for graceless shutdowns.
  2. We’ve found services that had no owner, became obsolete and were basically running unattended in production. The horror.
  3. During the faulty network emulations, we’ve found that we had clients that didn’t implement proper resilience features, and caused cascading failures in the consumers several layers up our service stack. We’ve also noticed that in some cases, the high latency also cascaded. This was fixed by adding proper timeouts, double-dispatch, and circuit-breakers.
  4. We’ve also found that these drills motivated developers to improve their knowledge about the metrics we expose, logs, and the troubleshooting tools we provide.

Conclusion

We’ve found the chaos drills to be an incredibly useful technique, which helps us improve our resilience and integrity, while helping everybody learn about how things work. We’re by no means anywhere near perfection. We’re actually pretty sure we’ll find many many more issues we need to take care of. We’re hoping this exciting new tool will help us move to the next level, and we hope you find it useful too 😉

Is Cassandra really visible? Meet Cassibility…

You love Cassandra, but do you really know what’s going on inside your clusters?

This blog post describes how we managed to shed some light on our Cassandra clusters, add visibility and share it with the open source community.

Outbrain has been using Cassandra for several years now. Like all companies, we started small, but over time our usage grew, complexity increased, and we ended up having over 15 Cassandra clusters with hundreds of nodes, spread across multiple data centers.

We work in the popular microservices model, where each group has its own Cassandra cluster to ensure resources and problems are isolated. But one thing remains common – we need to have good visibility in order to understand what is going on.

We already use Prometheus for metrics collection, Grafana for visualizing its data and PagerDuty as our alerting system. However, we still needed detailed enough visibility into the Cassandra internals to ensure we could react to any issues encountered before they became a problem, and make appropriate and informed performance tunings. I have to admit that when you don’t encounter a lot of problems, you tend to believe that what you have is actually sufficient, but when you suddenly have some nasty production issues – and we had our fair share – it becomes very challenging to debug them efficiently, in real time, sometimes in the middle of the night.

Let’s say, as a simple example, that you realized that the application is behaving slower because the latency in Cassandra increased. You would like to understand what happened, and you start thinking that it could be due to a variety of causes – maybe it’s a system issue, like a hardware problem or a long GC. Maybe it’s an applicative issue, like an increase in the number of requests due to a new feature or an application bug, and if so you would like to point the developer to the specific scenario which caused it; it would be good if you could tell him that it is happening in a specific keyspace or column family. If you’re also using row cache, for example, you would wonder if maybe the application is not using the cache well enough – say, the new feature is using a new table which is not in the cache, so the hit rate will be low. And maybe it’s not related to any of the above, and is actually happening due to a repair or read-repair process, or a massive amount of compactions that accumulated. It would be great if you could see all of this in just a few dashboards, where digging into each of these speculations of yours could be done in just a few clicks, right? Well, that’s what Cassibility gives you.

Take a look at the following screenshots, and see how you can get an overview of the situation and pinpoint the latency issue to a change in the number of requests or connections, then quickly move to a system dashboard to isolate the loaded resource:

* Please note: the dashboards don’t correspond to the specific problem described; this is just an example of the different graphs.


Then, if you’d like to see whether it’s related to specific column families, to the cache or to repairs, there are dedicated dashboards for those as well.


Here is the story of how we created Cassibility.

We decided to invest time in creating better and deeper visibility, and had some interesting iterations in this project.

At first, we tried to look for an open-source package that we could use, but as surprising as it may be, even with the wide usage of Cassandra around the world, we couldn’t find one that was sufficient and detailed enough for our requirements. So we started thinking how to do it ourselves.

Iteration I

We began to dig into what Cassandra can show us. We found out that Cassandra itself exposes quite a lot of metrics – it can reach tens of thousands of metrics per node – and they can be captured easily via JMX. Since we were already using the Prometheus JMX exporter (https://github.com/prometheus/jmx_exporter) in our Prometheus implementation, it seemed like the right choice and easy enough to accomplish.

Initially we thought we should just write a script that exposes all of our metrics and automatically creates JSON files that represent each metric in a graph. We exposed additional dimensions for each metric in order to specify the name of the cluster, the data center and some other information that we could use to monitor our Cassandra nodes more accurately. We also thought of automatically adding Grafana templates to all the graphs, from which one could choose and filter which cluster to see, which datacenter, which keyspace or column family / table, and even how to see the result (as a sum, average, etc.).

This sounded very good in theory, but after thinking about it a bit more, such an abstraction was very hard to create. For instance, there are some metrics that are counters (e.g. number of requests) and some that are gauges (e.g. latency percentiles). This means that with counters you may want to calculate a rate on top of the metric itself, like when you take the number of requests and use it to calculate a throughput. With a gauge you don’t need to do anything on top of the metric.

Another example is how you would like to see the results when looking at the whole cluster. There are some metrics which we would like to see at node resolution, and some at datacenter or cluster resolution. If we take the throughput, it is interesting to see the overall load on the cluster, so you can sum up the throughput of all nodes to see that. The same calculation is interesting at the keyspace or column family level. But if you look at latency at a specific percentile, then summing, averaging or finding the maximum across all nodes actually has no meaning. Think about what it means if you take the number that represents the request latency which 99% of the requests on a specific node are lower than, and then take the maximum over all nodes in the cluster. You don’t really get the 99th percentile of latency over the whole cluster; you get a series of points, each representing the value of the node with the highest 99th percentile latency at every moment. There is not much you can do with this information.
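
In Prometheus query terms the difference looks like this (the metric names are illustrative):

# throughput: summing per-node rates across the cluster is meaningful
sum(rate(cassandra_clientrequest_latency_count[5m])) by (cluster)

# latency: the max over per-node 99th percentiles is NOT a cluster-wide p99
max(cassandra_clientrequest_latency_99th_percentile) by (cluster)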

There are a lot of different examples of this problem with other metrics, but I will skip them as they require a more in-depth explanation.

The next issue was how to arrange the dashboards. This is also something that is hard to do automatically. We thought of just taking the structure of the MBeans and arranging the dashboards accordingly, but this is also not so good. The best example is, of course, that anyone would like to see an overview dashboard that contains different pieces from different MBeans, or a view of the load on your system resources, but there are many other examples.

Iteration II

We realized that we need to better understand every metric in order to create a clear dashboard suite, that will be structured in a way that is intuitive to use while debugging problems.

When reviewing the various sources of documentation on the many metrics, we found that although there was some documentation out there, it was often basic and incomplete – typically lacking important details such as the units in which the metric is calculated, or written in a way that doesn’t explain much on top of the metric name itself. For example, there is an MBean called ClientRequest, which includes different metrics on the external requests sent to Cassandra’s coordinator nodes. It contains metrics about latency and throughput. On the Cassandra Wiki page the description is as follows:

  Latency: Latency statistics.

TotalLatency: Total latency in microseconds

That doesn’t say much. Which statistics exactly? What does the total mean in comparison to just latency? The throughput, by the way, is actually an attribute called counter within the Latency MBean of a certain scope (Read, Write, etc.), but there are no details about this in the documentation and it’s not that intuitive to understand. I’m not saying you can’t get to it with some digging and common sense, but it certainly takes time when you’re starting.

Since we couldn’t find one place with a good and full set of documentation, we started digging ourselves, comparing values to see if they made sense, and engaged a consultant named Johnny Miller from digitalis.io, who has worked a lot with Cassandra and was very familiar with its internals and metrics.

We improved our overall understanding at the same time as building and structuring the dashboards and graphs.

Before we actually started, we figured out two things:

  1. What we are doing must be something that many other companies working with Cassandra need, so our project just might as well be an open-source one, and help others too.
  2. There were a lot of different sections inside Cassandra, from overview to cache, entropy, keyspace/column family granularities and more, each of which we may want to look at separately in case we get some clues that something may be going on there. So each such section could actually be represented as a dashboard and could be worked on in parallel.

We dedicated the first day to focus on classifying the features into logical groupings with a common theme and deciding what information was required in each one.

Once we had defined that, we started to think about the best and fastest way to implement the project, and decided to hold a one-day Hackathon at Outbrain. Many people from our Operations and R&D teams joined this effort, and since we could parallelize the work across at most 10 people, that’s the number of people who participated in the end.

This day focused both on creating the dashboards and on finding solutions for all the places where we had used Outbrain-specific tools to gather information (for example, we use Consul for service discovery and are able to pull information from it). We ended the day having produced 10 dashboards, with some documentation, and we were extremely happy with the result.

Iteration III

To be sure that we were actually releasing something usable, intuitive and clear, we wanted to review the dashboards, documentation and installation process. During this process, as most of us engineers know, we found out that the remaining 20% would take quite a bit longer to complete.

Since people with varying levels of Cassandra knowledge had participated in the Hackathon, some of the graphs were not completely accurate. Additional work was therefore needed to work out exactly which graphs should go together, what level of detail is actually helpful to look at while debugging, how the graphs look when there are a lot of nodes or column families/tables, and to check various other edge cases. We spent several hours a week over the next few weeks to finalize it.

We are already using Cassibility in our own production environment, and it has already helped us to expose anomalies, debug problems quickly and optimize performance.

I think that there is a big difference between having some visibility and some graphs and having a full, well organized and understandable list of dashboards that gives you the right views at the right granularity, with clear documentation. The latter is what will really save you time and effort and even help people that are relatively new to Cassandra to understand it better.

I invite you to take a look, download and easily start using Cassibility: https://github.com/outbrain/Cassibility. We will be happy to hear your feedback!

Angular DRY mocking – Leonardo

This post was written by Sagiv Frenkel.

As developers, one of the first and most basic things we learn is “Don’t repeat yourself!”.
That means trying to avoid writing the same code twice – in other words, no copy paste!
While we still sin with the occasional copy paste, it’s something we’re mindful of, and it’s easy to notice. We just have to remember to refactor later on.

But do we treat our mocking the same?

Let’s look at a typical development flow:

1) Create your UI/UX, services and controller.
2) Create your server API calls.
3) Test your application, manually/automated with self generated data in different scenarios.

What’s wrong with this approach?

We aren’t repeating code, but we are repeating work:

1) Documenting – there’s no good way to tell which user/data to use for which scenario.
2) Running – you need to log in/out to change users or manually change code to fit changes.
3) Testing – error scenarios, edge cases, and request delays/throttling are very hard to simulate. Override scripts or comments to switch data are the only tools at our disposal.

Can we do better?

Introducing Leonardo

Leonardo is an open-source AngularJS module created by Outbrain. It can be installed from npm or Bower, and it easily integrates into existing AngularJS applications (more details on Leonardo’s GitHub repo).

Leonardo has a fancy UI where you can easily toggle different states/scenarios.

It enables you to:

1) Centralize your mocking and scenario configuration.
2) Persist the configuration into an external file.
3) Create manual QA or automated tests.

We use Leonardo extensively with Protractor. More on this in another post.

Want to get started with Leonardo?

Check out this example to see how you can move from a regular image gallery to a mocked one.

How does Leonardo work?

Leonardo has two important concepts – states and scenarios.

States:

We add states to declare what and how to mock.
There are two types:

Ajax States – This is what we will typically use. We declare the url and verb we wish to mock, and what response data we wish to return – including a delay and a status.
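
A sketch of such a declaration (the field names may differ slightly from Leonardo’s current API – check the GitHub repo):

[javascript]
leoConfiguration.addStates([{
  name: 'Get Images',
  url: '/api/images',
  verb: 'GET',
  options: [
    { name: 'success', status: 200, data: [{ id: 1, src: 'cat.jpg' }] },
    { name: 'empty', status: 200, data: [] },
    { name: 'error', status: 500, delay: 2000, data: { msg: 'Oh no!' } }
  ]
}]);
[/javascript]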

Non Ajax States – This requires more work on the part of the developers. Basically, this allows you to declare a state and its underlying data (not mandatory), and you can later check if it’s on or off.
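
Along these lines (again, an illustrative sketch):

[javascript]
leoConfiguration.addState({
  name: 'Logged In User',
  options: [
    { name: 'admin', data: { role: 'admin' } },
    { name: 'guest', data: { role: 'guest' } }
  ]
});
[/javascript]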

You can query Leonardo for the value of a certain state.
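
For example (illustrative):

[javascript]
// returns the active option's data, or false if the state is off
var user = leoConfiguration.getState('Logged In User');
if (user) {
  console.log(user.role); // 'admin' or 'guest'
}
[/javascript]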

Leonardo triggers an event whenever a state changes.
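
Something like this (the event name is an assumption – see the repo for the exact one):

[javascript]
$rootScope.$on('leonardo:setStates', function () {
  // react to the change, e.g. reload the view's data
});
[/javascript]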

Scenarios:

Scenarios simply enable you to set a specific set of states as active.
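
A sketch:

[javascript]
leoConfiguration.addScenario({
  name: 'Happy Flow',
  states: [
    { name: 'Get Images', option: 'success' },
    { name: 'Logged In User', option: 'admin' }
  ]
});
[/javascript]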

Note:

– We currently only support Angular applications. That is what we initially developed on, and it was easy to implement. If the tool gains traction and popularity, it should be easy to migrate to a more vanilla approach.

– Use Leonardo to start mocking HTTP or anything you like! We’d love to get your feedback!

Announcing orchestrator-agent


This post was written by Shlomi Noach

orchestrator-agent is a sidekick, complementary project of orchestrator, implementing a daemon service on one’s MySQL hosts which communicates with and accepts commands from orchestrator. It was built with the original purpose of providing an automated solution for provisioning new or corrupted slaves.

It was built by Outbrain, with Outbrain’s specific use case in mind. While we release it as open source, only a small part of its functionality will appeal to the public (this is why it’s not strictly part of the orchestrator project, which is a general purpose, wide-audience solution). Nevertheless, it is a simple implementation of a daemon, such that can be easily extended by the community. The project is open for pull-requests!

A quick breakdown of orchestrator-agent is as follows:

  • Executes as a daemon on linux hosts
  • Interacts and invokes OS commands (via bash)
  • Does not directly interact with a MySQL server running on that host (does not connect via mysql credentials)
  • Expects a single MySQL service on host
  • Can control the MySQL service (e.g. stop, start)
  • Is familiar with LVM layer on host
  • Can take LVM snapshots, mount snapshots, remove snapshots
  • Is familiar with the MySQL data directory, disk usage, file system
  • Can send snapshot data from a mounted snapshot on a running MySQL host
  • Can prepare data directory and receive snapshot data from another host
  • Recognizes local/remote datacenters
  • Controlled by orchestrator, two orchestrator-agents implement an automated and audited solution for seeding a new/corrupted MySQL host based on a running server.


Announcing Aletheia – A streaming data delivery framework

This post was written by Stas Levin

Outbrain is proud to announce Aletheia, our solution for uniform data delivery and flow monitoring across data-producing and data-consuming subsystems. At Outbrain we have great amounts of data constantly being moved and processed by various real-time and batch-oriented mechanisms. To allow fast recovery and a high SLA, we need to be able to detect problems in our data-crunching mechanisms as fast as we can, preferably in near real time. The later problems are detected, the harder it is to investigate them (and thus fix them), and the chances of business impact grow rapidly.

To address these issues, we’ve built Aletheia, a framework providing a uniform way to deliver and consume data, with built in monitoring capabilities allowing both producing and consuming sides to report statistics, which can be used to monitor the pipeline state in a timely fashion.


The power of promises for file downloading

In this blog post I will be implementing a file download with a progress indicator, using cookies, AngularJS and promises.

Promises are a powerful concept with a number of advantages; in the following implementation, pay attention to these points (you’re more than welcome to comment):

  1. Clarity and readability of code
  2. Error handling
  3. Separation of concerns

I thought of showing the same implementation without promises, but I think anyone who has tried to handle more than one callback and handle the error cases properly will easily see the difference.

The Module

A download button that changes its text at set intervals.
At the end it should be in a success state or an error state.
To complicate things a little and show the power of promises, I added another step called “validateBeforeDownload”; this step calls the server to validate the download and fails it if necessary.

Downloading a file

The standard way of downloading a file is with a simple “a” tag with an href.
In order to be able to add the “validateBeforeDownload” step and avoid passing “dom” to a service, I am using an iframe which a service creates and destroys. This triggers the download, and if the server headers are appropriate the download will begin.

Service Code

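A minimal sketch of such a service:

[javascript]
app.service('downloadService', ['$document', function ($document) {
  var iframe = null;

  this.startDownload = function (url) {
    // drop the previous iframe, if any
    if (iframe) { iframe.remove(); }
    // a hidden iframe triggers the download without leaving the page,
    // provided the server replies with Content-Disposition: attachment
    iframe = angular.element('<iframe style="display: none"></iframe>');
    iframe.attr('src', url);
    $document.find('body').append(iframe);
  };
}]);
[/javascript]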

Adding in the progress

Easier said than done! Downloading a file can’t be done with a simple ajax call, so you can’t tell when the download is complete.
The solution I’m using is setting a cookie, let’s call it “download_file”, with a timer that checks for the cookie every 500ms.

  • While the cookie exists the loading state is preserved.
  • Once the request completes, the server deletes the cookie and the timer is stopped.

This isn’t the best solution but is simple and doesn’t require sockets or external plugins.

Service Code

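Continuing inside the same service – a sketch assuming $q and $interval are injected:

[javascript]
this.downloadExcel = function (url) {
  var deferred = $q.defer();
  // mark the download as started; the server will clear this cookie
  document.cookie = 'download_file=true; path=/';
  this.startDownload(url);

  var timer = $interval(function () {
    if (document.cookie.indexOf('download_file') !== -1) {
      // cookie still present - the download is in progress
      deferred.notify('in-progress');
    } else {
      // the server cleared the cookie - we are done
      $interval.cancel(timer);
      deferred.resolve();
    }
  }, 500);
  return deferred.promise;
};
[/javascript]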

Java Server

Just to show the full stack of the implementation, here is the code for handling the response data and the clearing of the cookie.
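A sketch of the servlet side:

[java]
protected void doGet(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    response.setContentType("application/vnd.ms-excel");
    response.setHeader("Content-Disposition", "attachment; filename=\"report.xls\"");

    // expire the "download_file" cookie so the client-side timer stops
    Cookie cookie = new Cookie("download_file", "");
    cookie.setMaxAge(0);
    cookie.setPath("/");
    response.addCookie(cookie);

    // ... stream the file bytes to response.getOutputStream() ...
}
[/java]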

Wrapping everything together with promises

Pay attention to the comments in the code; some of the code is there to simulate the server requests and responses, and is only there for the full picture.

HTML

Each visual state of the button is determined by its text (scope.downloadExcelText).
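Roughly (the markup is illustrative):

[javascript]
<button ng-click="downloadExcel()">{{downloadExcelText}}</button>
[/javascript]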

Service

Notice that $timeout mocks an asynchronous call to a server and its response;
this would normally be done with $http.
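A sketch of the mocked validation call:

[javascript]
this.validateBeforeDownload = function () {
  var deferred = $q.defer();
  // $timeout stands in for the real server round trip ($http)
  $timeout(function () {
    deferred.resolve();
    // a failed validation would instead call:
    // deferred.reject('download not allowed');
  }, 1000);
  return deferred.promise;
};
[/javascript]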

Controller

This is where our hard work pays off and promises start to shine.

Let’s step into the promise mechanism –
prepending “downloadService.validateBeforeDownload” to “downloadService.downloadExcel” with the “then” method creates a third promise, which shares callbacks for success, failure and notifications (for the progress).
There is also a finally callback attached to this promise, which we use to share code between the success and failure cases.
But the really nice thing here is that it also enables handling errors just from the “validateBeforeDownload”, and bubbling them up if needed with $q.reject or by simply throwing the error.

Notice that each step towards the completion of the promise reads as if it were synchronous, while the actual asynchronicity is handled by the promise mechanism and the service. Magic!
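A sketch of that chain:

[javascript]
$scope.downloadExcel = function () {
  downloadService.validateBeforeDownload()
    .then(function () {
      // validation passed - start the actual download
      return downloadService.downloadExcel('/api/report.xls');
    })
    .then(function () {
      $scope.downloadExcelText = 'Downloaded!'; // success
    }, function (error) {
      // a rejection in either step bubbles up to here
      $scope.downloadExcelText = 'Error';
    }, function (progress) {
      // notifications from the cookie timer
      $scope.downloadExcelText = 'Downloading...';
    })
    .finally(function () {
      // code shared between success and failure
    });
};
[/javascript]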

Introducing Orchestrator: manage and visualize your MySQL replication topologies and get home for dinner


This post was written by Shlomi Noach

We’re happy to announce the availability of Outbrain’s Orchestrator: a MySQL replication management & visualization tool.

Orchestrator

  • Orchestrator reads your replication topologies (give it one server – be it master or slave – in each topology, and it will reveal the rest).
  • It keeps a state of this topology.
  • It can continuously poll your servers to get an up to date topology map.
  • It visualizes the topology in a clear and slick D3 tree.
  • It allows you to modify your topology; move slaves around. You can use the command line variation, the JSON API, or you can use the web interface.
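
For example, repointing a slave from the command line or via the API looks roughly like this (consult the Manual for exact usage):

# orchestrator -c move-below -i slave.host.com:3306 -d sibling.host.com:3306
# curl "http://orchestrator.mydomain.com/api/move-below/slave.host.com/3306/sibling.host.com/3306"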

Quick links: Orchestrator Manual, FAQ, Downloads

Nothing like nice screenshots

To move slaves around the topology (repoint a slave to a different master) through orchestrator’s web interface, we use drag and drop – just like that.

Safety

Orchestrator keeps you safe. It does so by:

  • Correctly calculating the binary log files & positions (aka coordinates) of the slave you’re moving, its current master, its new master; it properly stops, starts and stalls your replication till everything is in sync.
  • Helping you avoid shooting yourself in the foot. It will not allow moving a slave that uses STATEMENT-based replication under a ROW-based replication server. Or a 5.5 under a 5.6. Or anything under a server that doesn’t have binary logs. Or log_slave_updates. Or if one of the servers involved lags too much. Or more…

Visualization

It also points out a few problems, visually. While it is not – and will not be – a monitoring tool, it requires some replication status info for its own purposes. Too much lag? Replication not working? Server cannot be accessed? Server under maintenance? This all shows up in your topology. We use it a lot to get a holistic view of our current replication topologies status.

State

Orchestrator keeps the state of your topologies. Unlike other tools that will drill down from the master and just pick up on whatever’s connected right now, orchestrator remembers what used to be connected, too. If a slave is not replicating at this very moment, that does not mean it’s not part of the topology. Same for a MySQL service that has been temporarily stopped. And this includes all their slaves, if any. Until told otherwise (or until too much time passes and a server is assumed dead), orchestrator keeps the map intact.

Maintenance

Orchestrator supports a maintenance-mode state; it’s a flag saying “this server is in maintenance mode right now”. But this flag includes an owner and a reason for audit purposes. And while a server is under maintenance, orchestrator will disallow replication topology changes that include this server.

Audit

Operations performed via orchestrator are audited (well, almost all). You have a complete history of which slave was moved from where to where, which server was taken under maintenance and when, etc.

Automation

The most important thing is of course automating error-prone human sequences of actions. Repointing slaves is a mess (when you don’t have GTIDs). Automation saves time and greatly reduces the possibility that something goes wrong (though of course it never eliminates it). We happen to use orchestrator at Outbrain in production, and twice in the past month had major events where orchestrator saved us many hours and worry.

Support

Orchestrator supports “standard” replication: log file:pos kind of replication. Non GTID, non-parallel. Good (?) old replication.

Why not GTID? We’re using MySQL 5.5. We’ve had issues while evaluating 5.6; and besides, migrating to GTID is a mess (several solutions or proposed solutions seem to exist). At this time the majority of MySQL users seem to run 5.5, and a minority of those running 5.6 uses GTID (this is according to an unofficial “raise your hands” survey during last Percona Live event). “Standard” replication still applies to the majority of users. Support for GTID may be added in the future.

Read the FAQ for further questions on supported replication technologies.

How do you like it?

Orchestrator can run as a command line tool (no need for the web interface). It can serve an HTTP JSON API (no need for visualization), or it can serve an HTTP web interface (no need to use command line options). Have it your way.

The technology stack

Orchestrator is written in Go, with Martini as web framework; MySQL as backend database; D3, jQuery & bootstrap for frontend.

License

Orchestrator is released as open source under the Apache 2.0 license and is available at: https://github.com/outbrain/orchestrator

Documentation

Read the Manual

Download

Get the bundled binary+web files tarball, RPM or DEB packages. Or just clone the project. It’s free.


Introducing Propagator: multi-everything deployment made easy


This post was written by Shlomi Noach.

Outbrain is happy to release its own Propagator as open source. Propagator is a schema & data deployment tool which makes it easy to deploy, review, audit & fix deployments to your database servers.

What does multi-everything mean? It is:

  • Multi-server: push your schema & data changes to multiple instances in parallel
  • Multi-role: different servers have different schemas
  • Multi-environment: recognizes the differences between development, QA, build & production servers
  • Multi-technology: supports MySQL, Hive (Cassandra on the TODO list)
  • Multi-user: allows users authenticated and audited access
  • Multi-planetary: TODO

With dozens of database servers in our company (and these are master database servers), from development machines to testing machines, through build machines to production servers, and with a growing team of over 70 engineers, we faced the growing problem of controlling our database schema evolution. Controlling the creation of tables, columns, keys, foreign keys; controlling the creation of data that must be consistent across all servers – it all became an infeasible task. Some changes were lost; some servers were forgotten along the way, and inconsistencies blocked our development & deployments again and again.