

Making Spark Native for Data Processing Framework

Working on an infrastructure team is different from working on a development team. The team develops tools that are used by development teams across Outbrain, so it is very important to provide a convenient way of working with new technologies. Having a Spark cluster enables running distributed data processing at scale. Until now, developers worked with Spark by writing ad-hoc crontab jobs in production. This created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to provide an infrastructural way of working with Spark.

In this blog post, I explain how we built the solution that makes Spark a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine, an internal tool developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as the Algorithm research groups, BI and Data Science.


Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failures. Logs for each running flow are shown in a convenient form in WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.

From the operational perspective, several instances of WFE are deployed on machines owned by the infrastructure team; users just define ETL flows in a separate repository.

Diving Into the Solution

In order to meet the requirements for running data analytics on Spark, our team had to provide an infrastructural way of running Spark jobs orchestrated by WFE. To achieve this, we developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design of SparkStep, we had several principles in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify, and at the same time allow the system to limit, the number of YARN resources allocated for every Spark job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping control while having fun

The Spark job definition specifies how a job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a Spark job, it builds a spark-submit command using all these properties.

The spark-submit command submits the Spark job to YARN, which allocates needed resources and launches the application. After submitting the job, WFE constantly checks its status using YARN REST API. When the job is completed in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long, and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API. In this case, the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use and that resources are saved when a job is canceled for any reason.
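To make this more concrete, here is a minimal sketch, in Scala, of how a SparkStep-like task could build the spark-submit command from the job definition and poll the YARN ResourceManager REST API for the final status. This is not the actual WFE code; the property names, host, port and JSON handling are illustrative only.

import scala.io.Source
import scala.sys.process._

object SparkStepSketch {

  // Build the spark-submit command from the job definition properties.
  def buildSubmitCommand(jarUrl: String, className: String, sparkConf: Map[String, String]): Seq[String] =
    Seq("spark-submit",
      "--master", "yarn",
      "--deploy-mode", "cluster", // "cluster" mode keeps the driver off the WFE machines
      "--class", className) ++
      sparkConf.flatMap { case (k, v) => Seq("--conf", s"$k=$v") } :+
      jarUrl

  // Poll the YARN ResourceManager REST API for the application's final status
  // (SUCCEEDED/FAILED/KILLED). The RM host and port here are assumptions for the example.
  def yarnFinalStatus(rmHost: String, appId: String): String = {
    val json = Source.fromURL(s"http://$rmHost:8088/ws/v1/cluster/apps/$appId").mkString
    // A real implementation would parse the JSON properly; this only extracts finalStatus.
    "\"finalStatus\"\\s*:\\s*\"(\\w+)\"".r.findFirstMatchIn(json).map(_.group(1)).getOrElse("UNDEFINED")
  }

  def main(args: Array[String]): Unit = {
    val cmd = buildSubmitCommand("hdfs:///jobs/my-job.jar", "com.example.MyJob",
      Map("spark.executor.memory" -> "4g"))
    println(cmd.!!) // in WFE this command runs inside a Docker container
  }
}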

The spark-submit command runs in a Docker container. The container exits once the job is submitted to the cluster. This enables us to create an isolated environment for every Spark job executed by WFE. In addition, by using Docker containers we can easily use different Spark versions simultaneously, simply by creating different Docker images.

By using this architecture we achieve all our goals. Spark jobs are submitted to YARN in a “cluster” mode, so they do not create a load on WFE machines. YARN handles all resources for each job. Every Spark job runs in an isolated environment owing to the Docker container.

X tips [x>5] for Micro-Services Logging


What if?

What if someone told you it is forbidden to use logs anymore?

In my case, it began in a meeting with our ops team, who claimed our services were writing too many logs and we should write less. Therefore, logs were being “throttled”, i.e., some log messages were simply discarded.

“Too many logs” turned out to be 100 lines per minute, which in my opinion is a ridiculously low number.

Maybe I am doing something wrong?

It might be that logs are not a good pattern in a highly distributed micro-services environment. I decided to rethink my assumptions.

Why do I need logs anyway?

The obvious reason to write logs is debugging. When something goes wrong in production, logs are the way I can understand the flow that led to that erroneous state.

The main alternative I am aware of is connecting a debugger and stepping through the code in the flow. There are a couple of disadvantages to using a debugger: it is time-consuming, it might slow down the production process itself, and you have to be connected when the problem happens — so if the bug happens only at night, bummer. In addition, debugging is a one-time process; the learning from it stays in your head, so it is hard to improve that way.

Another alternative is adding metrics. Metrics are pretty similar to logs, but they have the extra feature of nice dashboards and alerting systems (we use Prometheus and Grafana for metrics). On the other hand, metrics have a bigger setup overhead, which is their main disadvantage in my opinion. Metrics are also more rigid and usually do not let you log all the state, context, and parameters. The fact that writing logs is easy makes it a no-brainer to use them everywhere in the code while applying the “think later” paradigm.

The third alternative is auditing via systems like ELK, etc. Similar to metrics, it has higher overhead, and it is also hard to follow sequential operations with such discrete events.

There are even more reasons for logging. Logs are additional documentation in the code that can help understand what is going on. Logging can also be used as a metrics and alerting system, and can even replace those systems. Many insights can be gained from logs, sometimes even user passwords.

Specification

If I go back to that meeting with the ops guys, one question I was asked was:

“What are your requirements from your logging system?”

Here it is:

Order matters — messages should be in ‘written before’ order so it will be possible to understand the flow of the code.

Zero throttling — I expect that there will be no rate limit on writing, only on the volume of saved messages, so every time a message is thrown away, it is the oldest one.

X days history — log files should keep log messages for at least a couple of days. If that is not the case, you might be writing too many log messages, so move some to debug level.

Logs should be greppable — text files have the big advantage of flexibility, and of the many tools that can be used with them.

Metadata — the logging system should provide metadata like the calling thread, timestamp, calling method or class, etc., to remove that burden from the developer (logging should be easy).

Distributed and centralized — it should be convenient to look at all logs in a central location, but also possible to split them and see the logs of a specific process.

Easy to use, easy to install, easy to consume — in general logging should be fun.

Tips and Tricks

I am in. Is there anything else to know?

Use logging framework

Don’t log to standard output with print lines in a long-running service. A logging framework gives you log levels, various appenders (like logging to HTML via HTTP), log rotation and all sorts of other features and tricks.

Developer tip — Log levels

It’s important to be consistent when using the different levels; otherwise, you will lose the semantics and meaning of message severity. The way I use them is:

  • Error — something bad happened and it might crash the process/service.
  • Warn — something bad happened in a specific use case.
  • Info — something that I want to see happen.
  • Debug — something happened that I wish to see only under special circumstances; otherwise it would clutter normal logs.

There are some special reasons to override that rule: Error and Warn messages are monitored in our services, so sometimes I might move something to Info level to snooze its alerting. I move messages from Info to Debug if there is too much clutter, and in the other direction if I want to focus on something.

Developer tip — Meaningful messages

I often see messages like “start processing” or “end method”. It is always a good idea to imagine what else you would like to see when reading the log message, and to add as much information as possible: specific parameters, fields and contextual data that might have different values in different scenarios.

Developer tip — Lazy evaluated strings

Especially at Debug level, it is good practice to use frameworks that avoid the overhead of string concatenation when the level is turned off, without the need to explicitly check whether the level is suppressed. In kotlin-logging, for example, it looks like this:

logger.debug { "Some $expensive message!" }

Developer tip — Don’t log errors more than once per error

It is a common anti-pattern to log an exception and then re-throw it just to be logged later in another place in the code again. It makes it harder to understand the number of errors and their origin. Log exceptions only if you are not re-throwing them.

Ad-Hoc enablement of log level

Some frameworks and services allow changing the active log level on the fly. This means you can print debug messages of a specific class in a specific instance for a couple of minutes, for example. It allows debugging without trashing the log file the rest of the time.

Ad-Hoc addition of logging messages

When I worked at Intel, one of my peers developed a JVM tool that allowed bytecode manipulation of methods and adding log messages at the beginning of methods and at the end of methods.

This means you don’t have to think about all those messages in advance; you can just inject log messages where needed while the process is running.

In-Memory logs

Another useful technique is keeping the last messages in memory. That makes it easy to access them remotely, via a REST call for example. It is also possible to dump them to a file in case the process crashes.
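A minimal sketch of the idea, assuming nothing about a specific framework: a bounded, thread-safe buffer that keeps only the last N messages, which a REST endpoint or a crash hook can later dump.

import java.util.concurrent.ConcurrentLinkedDeque
import scala.jdk.CollectionConverters._

// Keeps only the most recent `capacity` messages; the oldest are discarded first.
class InMemoryLogBuffer(capacity: Int = 1000) {
  private val buffer = new ConcurrentLinkedDeque[String]()

  def append(message: String): Unit = {
    buffer.addLast(message)
    while (buffer.size() > capacity) buffer.pollFirst() // drop the oldest message
  }

  // What a REST endpoint or a crash handler would expose or dump to a file.
  def snapshot(): List[String] = buffer.asScala.toList
}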

Logging as a poor-man profiler

It is also possible to analyze logs to gain insight into the performance of the application. The simple technique I saw is using the timestamps in the logs. A more advanced technique is using the context to calculate and show the time elapsed since the beginning of the sequence (i.e., when the HTTP call started), by using MDC.
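As an illustration only (the MDC keys and helper methods below are made up, not a feature of a specific framework), with SLF4J's MDC you can store the start time of the sequence once and have every later message report the elapsed time:

import org.slf4j.{LoggerFactory, MDC}

object RequestTiming {
  private val logger = LoggerFactory.getLogger(getClass)

  // Call when the sequence starts, e.g. when the HTTP call is received.
  def start(requestId: String): Unit = {
    MDC.put("requestId", requestId)
    MDC.put("startMillis", System.currentTimeMillis().toString)
  }

  // Any log call later on the same thread can include the elapsed time.
  def logWithElapsed(message: String): Unit = {
    val elapsed = Option(MDC.get("startMillis"))
      .map(start => System.currentTimeMillis() - start.toLong)
    logger.info(s"$message (elapsed ${elapsed.getOrElse("?")}ms)")
  }

  // Clear the MDC when the sequence ends, to avoid leaking context between requests.
  def end(): Unit = MDC.clear()
}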

Log formatting

The content of the message is also important. Various logging frameworks allow embedding predefined template parameters such as:

  • Location info — Class and file name, method name and line number of where the log message was issued.
  • Date and time.
  • Log level as discussed above.
  • Thread info — relevant to multi-threads environments to be able to separate different flows.
  • Context info — similar to thread info but more specific to a use case; adds context information like user id, request id, etc. Framework features like MDC make it easier to implement.

I highly recommend using those, but bear in mind that some of these features add performance overhead when they are evaluated.

Logging — the essentials

Logging is a big world; I couldn’t cover all of it here, but I hope I convinced you to use it.

Have fun and keep on logging!

Live Tail in Kubernetes / Docker Based environment

At Outbrain we are big believers in Observability.

What is Observability, and what is the difference between Observability and Monitoring? I will leave the explanation to Baron Schwartz @xaprb:

“Monitoring tells you whether the system works.  Observability lets you ask why it’s not working.”

At Outbrain we are currently in the midst of migrating to a Kubernetes / Docker based environment.

This presented many new challenges around understanding why things don’t work.

In this post I will share our logging implementation, which is the first tool we use to understand the why.

But first things first: a short review of our current standard logging architecture.

We use a standard ELK stack for the majority of our logging needs. By standard I mean Logstash on bare-metal nodes, Elasticsearch for storage and Kibana for visualization and analytics. Apache Kafka is the transport layer for all of the above.

A very simplified sketch of the system:

Of course the setup is a bit more complex in real life since Outbrain’s infrastructure is spread across thousands of servers, in multiple physical data centers and cloud providers; and there are multiple Elasticsearch clusters for different use cases.

Add to the equation that these systems are used in a self-serve model, meaning the engineers are creating and updating configurations by themselves – and you end up with a complex system which must be robust and resilient, or the users will lose trust in the system.

The move to Kubernetes presented new challenges and requirements, specifically related to the logging tools:

  • Support multiple Kubernetes clusters and data centers.
  • We don’t want to use “kubectl”, because managing keys is a pain, especially in a multi-cluster environment.
  • Provide a way to tail logs and even edit log files. This should be available for a single pod or across a service deployed in multiple pods.
  • Leverage existing technologies: Kafka, ELK stack and Log4j on the client side
  • Support all existing logging sources, like multiline and JSON.
  • Don’t forget services which don’t run in Kubernetes — yes, we still need to support those.

 

So how did we meet all those requirements? Time to talk about our new Logging design.

The new architecture is based on a standard Kubernetes logging setup – a Fluentd DaemonSet running on each Kubernetes node, with all services configured to send logs to stdout/stderr instead of to a file.

The Fluentd agent collects the pods’ logs and adds the Kubernetes-level labels to every message.

The Fluentd plugin we’re using is the kubernetes_metadata_filter.

After the messages are enriched they are stored in a Kafka topic.

A pool of Logstash agents (running as pods in Kubernetes) consumes and parses messages from Kafka as needed.

Once parsed, messages can be indexed into Elasticsearch or routed to another topic.

A sketch of the setup described:

And now it is time to introduce CTail.

CTail stands for Containers Tail. It is an Outbrain homegrown tool, written in Go, and based on server-side and client-side components.

A CTail server-side component runs per data center or per Kubernetes cluster. It consumes messages from a Kafka topic named “CTail” and, based on the Kubernetes app label, creates a stream which can be consumed via the CTail client component.

Since order is important for log messages, and since Kafka only guarantees ordering for messages within the same partition, we had to make sure messages are partitioned by the pod_id.
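In Kafka terms this simply means producing to the topic with the pod id as the record key, so all messages from the same pod hash to the same partition and keep their order. A hedged sketch — the broker address, topic name and serializers are illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object CTailProducerSketch {
  private val props = new Properties()
  props.put("bootstrap.servers", "kafka:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)

  // Using podId as the record key guarantees that all messages of that pod land in
  // the same partition, and therefore keep their order for the tailing client.
  def send(podId: String, logLine: String): Unit =
    producer.send(new ProducerRecord[String, String]("CTail", podId, logLine))
}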

With this new setup and tooling, when Outbrain engineers want to live tail their logs, all they need to do is launch the CTail client.

Once the CTail client starts, it queries Consul (which is what we use for service discovery) to locate all of the CTail servers, registers to their streams and performs aggregation in memory – resulting in a live stream of log entries.

Here is a sketch demonstrating the environment and an example of the CTail client output:


To view logs from all pods of a service called “ob1ktemplate”, all you need to run is:

# ctail-client -service ob1ktemplate -msg-only

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: uri http://localhost:8181/Ob1kTemplate/ returned status code 200
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751532409-n1kxv: Getting uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate'
2017-06-13T19:16:25.531Z ob1ktemplate-test-ssages-2751568954-n1rte: uri http://localhost:8181/Ob1kTemplate/api/echo?name='Ob1kTemplate' returned status code 200

Or logs of a specific pod:

# ctail-client -service ob1ktemplate -msg-only -pod ob1ktemplate-test-ssages-2751568960-n1kwd

2017-06-13T19:16:25.525Z ob1ktemplate-test-ssages-2751568960-n1kwd: Running 5 self tests now...
2017-06-13T19:16:25.527Z ob1ktemplate-test-ssages-2751568960-n1kwd: Getting uri http://localhost:8181/Ob1kTemplate/
2017-06-13T19:16:25.529Z ob1ktemplate-test-ssages-2751568960-n1kwd: uri http://localhost:8181/Ob1kTemplate/ returned status code 200

 

This is how we solved this challenge.

Interested in reading more about other challenges we encountered during the migration? Either wait for our next blog, or reach out to visibility at outbrain.com.

Keep bugs out of production

Production bugs are painful and can severely impact a dev team’s velocity. My team at Outbrain has succeeded in implementing a work process that enables us to send new features to production free of bugs, a process that incorporates automated functions with team discipline.

Why should I even care?

Bugs happen all the time – and they will be found either locally or in production. The main difference between preventing a bug and finding it in production is the cost: according to IBM research, fixing a bug in production can cost 5 times more than discovering it in pre-production environments (during the design, local development, or test phase).

Let’s describe one of the scenarios that unfolds once a bug reaches production:

  • A customer finds the bug and alerts customer service.
  • The bug is logged by the production team.
  • The developer gets the description of the bug, opens the spec, and spends time reading it over.
  • The developer then will spend time recreating the bug.
  • The developer must then reacquaint him/herself with the code to debug it.
  • Next, the fix must undergo tests.
  • The fix is then built and deployed in other environments.
  • Finally, the fix goes through QA testing (requiring QA resources).

How to stop bugs from reaching production

To catch and fix bugs at the most time- and cost-efficient stage, we follow these steps, adhering to several core principles:


Stage 1 – Local Environment and CI

Step 1: Design well. Keep it simple.

Create the design before coding: try to divide difficult problems into smaller parts/steps/modules that you can tackle one by one, thinking of objects with well-defined responsibilities. Share the plans with your teammates at design-review meetings. Good design is a key to reducing bugs and improving code quality.

Step 2: Start Coding

Code should be readable and simple. Design and development principles are your best friends: use SOLID, DRY, YAGNI, KISS and polymorphism to implement your code.
Unit tests are part of the development process. We use them to test individual code units and ensure that each unit is logically correct.
Unit tests are written and executed by developers. Most of the time we use JUnit as our testing framework.

Step 3: Use code analysis tools

To help ensure and maintain the quality of our code, we use several automated code-analysis tools:
FindBugs – a static code analysis tool that detects possible bugs in Java programs, helping us to improve the correctness of our code.
Checkstyle – a development tool that helps programmers write Java code that adheres to a coding standard, automating the process of checking Java code.

Step 4: Perform code reviews

We all know that code reviews are important. There are many best practices online (see 7 Ways to Up-Level Your Code Review Skills, Best Practices for Peer Code Review, and Effective Code Reviews), so let’s focus on the tools we use. All of our code commits are populated to ReviewBoard, and developers can review the committed code, see at any point in time the latest developments, and share input.
For the more crucial teams, we have a build that makes sure every commit has passed a code review – in case a review has not been done, the build alerts the team that there was an unreviewed change.
Regardless of whether you are performing a post-commit, a pull request, or a pre-commit review, you should always aim to check and review what’s being inserted into your codebase.

Step 5: CI

This is where all code is integrated. We use TeamCity to enforce our code standards and correctness by running unit tests, FindBugs validations, Checkstyle rules and other types of policies.

Stage 2 – Testing Environment

Step 1: Run integration tests

Check that the system works as a whole. Integration testing is also done by developers, but rather than testing individual components, it aims to test across components. A system consists of many separate components like code, databases, web servers, etc.
Integration tests are able to spot issues like the wiring of components, network access, database issues, etc. We use Jenkins and TeamCity to run CI tests.

Step 2: Run functional tests

Check that each feature is implemented correctly by comparing the results for a given input with the specification. Typically, this is not done at the development level.
Test cases are written based on the specification, and the actual results are compared with the expected results. We run functional tests using Selenium and Protractor for UI testing and JUnit for API testing.

Stage 3 – Staging Environment

This environment is often referred to as a pre-production sandbox, a system testing area, or simply a staging area. Its purpose is to provide an environment that simulates your actual production environment as closely as possible so you can test your application in conjunction with other applications.
Move a small percentage of real production requests to the staging environment where QA tests the features.

Stage 4 – Production Environment

Step 1: Deploy gradually

Deployment is the process that delivers our code to the production machines. If an error occurs during deployment, our Continuous Delivery system pauses the deployment, preventing the problematic version from reaching all the machines, and allows us to roll back quickly.

Step 2: Incorporate feature flags

All our new components are released with feature flags, which basically serve to control the full lifecycle of our features. Feature flags allow us to manage components and compartmentalize risk.

Step 3: Release gradually

There are two ways to make our release gradual:

  1. We test new features on a small set of users before releasing them to everyone.
  2. We open the feature initially to, say, 10% of our customers, then 30%, then 50%, and then 100%.

Both methods allow us to monitor and track problematic scenarios in our systems.
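The percentage-based method boils down to a deterministic bucket check per customer. Here is a hedged sketch of the idea in Scala; the hashing scheme is illustrative and not our actual feature-flag service:

object GradualRollout {
  // Map each customer to a stable bucket in [0, 100), so the same customer always
  // gets the same decision while the percentage is being ramped up.
  private def bucket(customerId: String): Int =
    ((customerId.hashCode % 100) + 100) % 100

  // rolloutPercent is raised over time: 10 -> 30 -> 50 -> 100.
  def isFeatureOn(customerId: String, rolloutPercent: Int): Boolean =
    bucket(customerId) < rolloutPercent
}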

Step 4: Monitor and Alerts

We use the ELK stack consisting of Elasticsearch, Logstash, and Kibana to manage our logs and events data.
For time-series data we use Prometheus as the metric storage and alerting engine.
Each developer can set up his own metrics and build Grafana dashboards.
Setting up the alerts is also part of the developer’s work, and it is his responsibility to tune the threshold for triggering a PagerDuty alert.
PagerDuty is an automated call, texting, and email service, which escalates notifications between responsible parties to ensure the issues are addressed by the right people at the right time.

All in All,
Don’t let the bugs fly out of control.


Migrating Elephants – How To Migrate Petabyte Scale Hadoop Clusters With Zero Downtime

Outbrain has been an early adopter of Hadoop and we, the team operating it, have acquired a lot of experience running it in production in terms of data ingestion, processing, monitoring, upgrading etc. This also means that we have a significant ecosystem around each cluster, with both open source and in-house systems.

A while back we decided to upgrade both the hardware and software versions of our Hadoop clusters.

“Why is that a big problem?” you might ask, so let me explain a bit about our current Hadoop architecture. We have two clusters of 300 machines in two different data centers, production and DR. Each cluster has a total dataset size of 1.5 PB with 5TB of compressed data loaded into it each day. There are ~10,000 job executions daily of about 1200 job definitions that were written by dozens of developers, data scientists and various other stakeholders within the company, spread across multiple teams around the globe. These jobs do everything from moving data into Hadoop (for ex. Sqoop or Mysql to Hive data loads), processing in Hadoop (for ex. running Hive, Scalding or Pig jobs), and pushing the results into external data stores (for ex. Vertica, Cassandra, Mysql etc.). An additional dimension of complexity originates from the dynamic nature of the system since developers, data scientists and researchers are pushing dozens of changes to how flows behave in production on a daily basis.

This system needed to be migrated to run on new hardware, using new versions of multiple components of the Hadoop ecosystem, without impacting production processes and active users. A partial list of the components and technologies that are currently being used and should be taken into consideration is HDFS, Map-Reduce, Hive, Pig, Scalding, and Sqoop. On top of that, of course, we have several more in-house services for data delivery, monitoring and retention that we have developed.

I’m sure you’ll agree that this is quite an elephant.

Storming Our Brains

We sat down with our users, and started thinking about a process to achieve this goal and quickly arrived at several guidelines that our selected process should abide by:

  1. Both Hadoop clusters (production and DR) should always be kept fully operational
  2. The migration process must be reversible
  3. Both value and risk should be incremental

After scratching our heads for quite a while, we came up with these options:

  1. In place: In place migration of the existing cluster to the new version and then rolling the hardware upgrade by gradually pushing new machines into the cluster and removing the old machines. This is the simplest approach and you should probably have a very good reason to choose a different path if you can afford the risk. However, since upgrading the system in place would expose clients to a huge change in an uncontrolled manner and is not by any means an easily reversible process we had to forego this option.
  2. Flipping the switch: The second option is to create a new cluster on new hardware, sync the required data, stop processing on the old cluster and move it to the new one. The problem here is that we still couldn’t manage the risk, because we would be stopping all processing and moving it to the new cluster. We wouldn’t know if the new cluster can handle the load or if each flow’s code is compatible with the new component’s version. As a matter of fact, there are a lot of unknowns that made it clear we had to split the problem into smaller pieces. The difficulty with splitting in this approach is that once you move a subset of the processing from the old cluster to the new, these results will no longer be accessible on the old cluster. This means that we would have had to migrate all dependencies of that initial subset. Since we have 1200 flow definitions with marvelous and beautiful interconnections between them, the task of splitting them would not have been practical and very quickly we found that we would have to migrate all flows together.
  3. Side by side execution: The third option is to start processing on the new cluster without stopping the old cluster. This is a sort of active-active approach, because both Hadoop clusters, new and old, will contain the processing results. This would allow us to migrate parts of the workload without risking interference with any working pipeline in the old cluster. Sounds good, right?

First Steps

To better understand the chosen solution, let’s take a look at our current architecture:

We have a framework that allows applications to push raw event data into multiple Hadoop clusters. For the sake of simplicity, the diagram describes only one cluster.

Once the data reaches Hadoop, processing begins to take place using a framework for orchestrating data flows we’ve developed in-house that we like to call the Workflow Engine.

Each Workflow Engine belongs to a different business group. That Workflow Engine is responsible for triggering and orchestrating the execution of all flows developed and owned by that group. Each job execution can trigger more jobs on its current Workflow Engine or trigger jobs in other business groups’ Workflow Engines. We use this partitioning mainly for management and scale reasons but during the planning of the migration, it provided us with a natural way to partition the workload, since there are very few dependencies between groups vs within each group.

Now that you have a better understanding of the existing layout you can see that the first step is to install a new Hadoop cluster with all required components of its ecosystem and begin pushing data into it.

To achieve this, we configured our dynamic data delivery pipeline system to send all events to the new cluster as well as the old, so now we have a new cluster with a fully operational data delivery pipeline:


Side by Side

Let’s think a bit about what options we had for running a side by side processing architecture.

We could use the same set of Workflow Engines to execute their jobs on both clusters, active and new. While this method would have the upside of saving machines and lowering operational costs, it would potentially double the load on each machine, since jobs are assigned to machines in a static manner. This is due to the fact that each Workflow Engine is assigned a business group, and all jobs that belong to this group are executed from it. To isolate the current production job execution from the execution for the new cluster, we decided to allocate independent machines for the new cluster.

Let the Processing Commence!

Now that we have a fully operational Hadoop cluster running alongside our production cluster, and we now have raw data delivered into it, you might be tempted to say: “Great! Bring up a set of Workflow Engines and let’s start side by side processing!”.

Well… not really.

Since there are so many jobs, doing varied types of operations, we can’t really assume that letting them run side by side is a good idea. For instance, if a job calculates some results and then pushes them to MySQL, these results will be pushed twice. Aside from doubling the load on the databases for no good reason, it may in some cases cause corruption or inconsistencies in the data due to race conditions. In essence, every job that writes to an external datasource should be allowed to run only once.

So we’ve described two types of execution modes a WorkflowEngine can have:

Leader: Run all the jobs!

Secondary: Run all jobs except those that might have a side effect external to that Hadoop cluster (e.g. write to an external database or trigger an applicative service). This is handled automatically by the framework, requiring no effort from the development teams.

When a Workflow Engine is in secondary mode, jobs executed from it can read from any source, but write only to a specific Hadoop cluster. That way they are essentially filling it up and syncing it (to a degree) with the other cluster.
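A minimal sketch of the idea (the job model and the way a side effect is declared are illustrative, not the WorkflowEngine's actual API): in secondary mode the engine simply skips any job flagged as having a side effect external to its Hadoop cluster.

sealed trait ExecutionMode
case object Leader extends ExecutionMode
case object Secondary extends ExecutionMode

// Illustrative job model: a job declares whether it writes outside the Hadoop cluster
// (e.g. pushes results to MySQL or Vertica, or triggers an applicative service).
case class Job(name: String, hasExternalSideEffect: Boolean, run: () => Unit)

class WorkflowEngineSketch(mode: ExecutionMode) {
  def execute(jobs: Seq[Job]): Unit =
    jobs.foreach { job =>
      // Secondary mode skips jobs with external side effects; everything else runs as usual.
      if (mode == Leader || !job.hasExternalSideEffect) job.run()
    }
}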

Let’s Do This…

Phase 1 of the migration should look something like this:


 

Notice that I’ve only included a Workflow Engine for one group in the diagram for simplicity but it will look similar for all other groups.

So the idea is to bring up a new Workflow Engine and give it the role of a migration secondary. This way it will run all jobs except for those writing to external data stores, thus eliminating all side effects external to the new Hadoop cluster.

By doing so, we were able to achieve multiple goals:

  1. Test basic software integration with the new Hadoop cluster version and all services of the ecosystem (hive, pig, scalding, etc.)
  2. Test new cluster’s hardware and performance compared to the currently active cluster
  3. Safely upgrade each business group’s Workflow Engine separately without impacting other groups.

Since the new cluster is running on new hardware and with a new version of the Hadoop ecosystem, this is a huge milestone towards validating our new architecture. The fact that we managed to do so without risking any downtime that could have resulted from failing processing flows, wrong cluster configurations or any other potential issue was key in achieving our migration goals.

Once we were confident that all phase 1 jobs were operating properly on the new cluster, we could continue to phase 2, in which the migration leader becomes secondary and the secondary becomes the leader. Like this:


In this phase all jobs will begin running from the new Workflow Engine, impacting all production systems, while the old Workflow Engine will only run jobs that write data to the old cluster. This method actually offers a fairly easy way to roll back to the old cluster in case of any serious failure (even after a few days or weeks), since all intermediate data will continue to be available on the old cluster.

The Overall Plan

The overall process is to push all Workflow Engines to phase 1, then test and stabilize the system. We were able to run 70% (!) of our jobs in this phase. That’s 70% of our code, 70% of our integrations and APIs and at least 70% of the problems you would experience in a real live move. We were able to fix issues, analyze system performance and validate results. Only once everything seemed to be working properly did we start pushing the groups to phase 2, one by one, into a tested, stable new cluster.

Once again we benefit from the incremental nature of the process. Each business group can be pushed into phase 2 independently of other groups thus reducing risk and increasing our ability to debug and analyze issues. Additionally, each business group can start leveraging the new cluster’s capabilities (e.g. features from newer version, or improved performance) immediately after they have moved to phase 2 and not after we have migrated every one of the ~1200 jobs to run on the new cluster. One pain point that can’t be ignored is that inter-group dependencies can make this a significantly more complicated feat as you need to bring into consideration the state of multiple groups when migrating.

What Did We Achieve?

  1. Incremental Migration – Due to the fact that we had an active-active migration that we could apply on each business group, we benefited in terms of mitigating risk and gaining value from the new system gradually.
  2. Reversible process – since we kept all old Workflow Engines (that executed their jobs on the old Hadoop cluster) in a state of secondary execution mode, all intermediate data was still being processed and was available in case we needed to revert groups independently from each other.
  3. Minimal impact on users – since we defined an automated transition of jobs between secondary and leader modes, users didn’t need to duplicate any of their jobs.

What Now?

We have completed the upgrade and migration of our main cluster and have already started the migration of our DR cluster.

There are a lot more details and concerns to bring into account when migrating a production system at this scale. However, the basic abstractions we’ve introduced here, and the capabilities we’ve infused our systems with have equipped us with the tools to migrate elephants.

For more information about this project, you can check out the video from Strata 2017 London where I discussed it in more detail.

Effective Testing with Loan Pattern in Scala


Tests are crucial in systems that rely on CI/CD as part of their release cycle. One of the challenges is to write stable tests that work for you, without spending a lot of time maintaining bad tests.

Tests are Hard

They’re hard to write, hard to maintain and it’s even harder to stabilize a flaky test. At Outbrain, we take special pride in our ability (for the most part) to deliver new features to production and doing so with the confidence that only reliable tests can give you. These tests play a crucial role in our ability to deliver fast, good and stable code making sure no regression bugs were introduced in the process. It is crucial then, to not only maintain good test suites (unit tests, integration, and e2e) but also to fix any test that misbehaves (flaky tests).

We have a special environment to facilitate integration and e2e tests, called the simulation environment (it is only one of a set of tools we have for that purpose). This is a dedicated set of servers which we use to simulate our production environment. We deploy every new version of our services to that environment before we deploy to production, and run tests that check new code flows, regression, and interoperability with other services.

In order to write an effective test for a new feature, we sometimes need to set up the environment with entities that are required for the feature we’re testing. If, for example, our new feature is to register a car to an owner (a Person entity), then before running the tests we need the required entities, a Car and a Person, in our database. We’re not trying to test the flow for creating a new car or a new person in this scenario, so there is no need to create the car and/or person entities explicitly in the test before the actual test scenario happens. And in order to make our tests as clear and succinct as possible, we don’t want to be creating this data explicitly in each and every test.

Bad Practices

So, it was a common practice (albeit a bad one) to have pre-existing data which we would rely on to run tests (for the whole simulation environment!). This led to two big (interconnected) problems:

  1. No test isolation – a test mistakenly deleting some or all of the pre-existing data, for example, would do so for all the tests that run in that environment
  2. Flaky tests – tests running concurrently are creating, deleting and generally changing data that affects others, which in turn would fail tests for no good reason — which makes it really hard to analyze and fix a failing test

We’ve tackled this problem by creating the needed data before the tests in a test class and deleting it after the test run. This mitigated the problem somewhat — but not only were the tests in the same class still interconnected, we had also added boilerplate to the test class. Now, a test class looked something like this (assuming these are entities autogenerated by Scalike for the relevant tables):

ScalaTest:
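As a hedged sketch only — the Person/Car entities, DAOs and registration service below are illustrative stand-ins for the Scalike-generated code — such a class looked roughly like this:

import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

// Illustrative stand-ins for the Scalike-generated entities and DAOs.
case class Person(id: Long, name: String)
case class Car(id: Long, model: String, ownerId: Option[Long] = None)
object PersonDao { def create(name: String): Person = Person(1L, name); def delete(id: Long): Unit = () }
object CarDao { def create(model: String): Car = Car(1L, model); def delete(id: Long): Unit = () }
object RegistrationService { def register(car: Car, owner: Person): Car = car.copy(ownerId = Some(owner.id)) }

class CarRegistrationTest extends FlatSpec with Matchers with BeforeAndAfterAll {

  // Data shared by *all* the tests in this class...
  var owner: Person = _
  var car: Car = _

  override def beforeAll(): Unit = {
    owner = PersonDao.create("test-person")
    car = CarDao.create("test-car")
  }

  // ...and deleted only after every test in the class has finished.
  override def afterAll(): Unit = {
    CarDao.delete(car.id)
    PersonDao.delete(owner.id)
  }

  "registering a car to an owner" should "link the car to the person" in {
    RegistrationService.register(car, owner).ownerId shouldEqual Some(owner.id)
  }
}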

Specs2:

Looking at this, we were presented with a challenge. First, the data is created for all the tests that run in the class and must be deleted only after all tests have finished running — this means that the tests are not isolated from one another and may potentially become flaky. Second, we wanted an elegant way of creating and deleting the needed entities seamlessly, in order to minimize the boilerplate in each test class.

Note: In Specs2, however, it is possible to do somewhat better by using the ‘Scope’ trait, defining the test data in a Scope and instantiating it in each test.
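A hedged sketch of that approach, reusing the illustrative Person/Car stand-ins from above:

import org.specs2.mutable.Specification
import org.specs2.specification.Scope

class CarRegistrationSpec extends Specification {

  // Each test instantiates this trait, so every test gets its own fresh data.
  trait TestData extends Scope {
    val owner = PersonDao.create("test-person")
    val car = CarDao.create("test-car")
  }

  "car registration" should {
    "link the car to the person" in new TestData {
      RegistrationService.register(car, owner).ownerId must beSome(owner.id)
    }
  }
}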

It’s a good solution, but for a simpler problem than the one we faced. We needed the tests to run in a single transaction, with a supplied session and a configurable DB name (indicating a set of Scalike connection parameters).

Enter Loan Pattern

We first encountered this pattern when using ScalaTest, and quickly moved to using it in Specs2 as well (as most of our tests are written in Specs2). From the ScalaTest documentation on sharing fixtures:

“A test fixture is composed of the objects and other artifacts (files, sockets, database connections, etc.) tests use to do their work. When multiple tests need to work with the same fixtures, it is important to try and avoid duplicating the fixture code across those tests.”
“If you need to both pass a fixture object into a test and perform cleanup at the end of the test, you’ll need to use the loan pattern”

This means we can use fixtures to set up ‘artifacts’ for the tests to use, promoting the DRY principle by minimizing code duplication. It is also a good way to reduce boilerplate when writing tests. So, we wrote this one simple trait:

ScalaTest:
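A hedged reconstruction that matches the description below; the names ‘TestDataFixture’ and ‘TestEntities’ are placeholders, and the DefaultGenerator/DefaultObjects stand-ins reuse the illustrative Car/Person code from the earlier sketch:

import org.scalatest.Suite

// Illustrative stand-ins for our internal data-generation API.
case class TestEntities(car: Car, person: Person) {
  def cleanup(): Unit = () // would delete the generated rows
}

trait DefaultGenerator {
  object DefaultObjects {
    def create(name: String): TestEntities =
      TestEntities(CarDao.create(s"car-$name"), PersonDao.create(s"person-$name"))
  }
}

trait TestDataFixture extends DefaultGenerator { this: Suite =>

  private def createTestData(testName: String): TestEntities = DefaultObjects.create(testName)

  private def cleanup(testData: TestEntities): Unit = testData.cleanup()

  // The loan pattern: generate the data, "loan" it to the test, always clean up afterwards.
  def withTestData(testName: String)(test: TestEntities => Any): Unit = {
    val testData = createTestData(testName)
    try test(testData)
    finally cleanup(testData)
  }
}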

Let’s go over what’s happening in this trait. We’re mixing in a custom trait called ‘DefaultGenerator’, which gives us ‘DefaultObjects’ — the entities we need pre-created for our tests to run. We have two private methods: one calls ‘create’ on ‘DefaultObjects’ with a custom name to generate the needed entities; the other calls ‘cleanup’ on the test data to clean the environment after the test has finished running. And the star of this trait, the method (or fixture, if you will) ‘withTestData’, gets the test function as a parameter, calls the private method ‘createTestData’, calls the test passing it the data we just generated, and finally cleans up the generated data after the test finishes.

When mixing this trait in our test class, we get the following code:
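A hedged sketch of what that usage looks like with the fixture above:

import org.scalatest.{FlatSpec, Matchers}

class CarRegistrationLoanTest extends FlatSpec with Matchers with TestDataFixture {

  "registering a car to an owner" should "link the car to the person" in
    withTestData("car-registration") { testData =>
      // testData holds the pre-created car and person; cleanup runs automatically afterwards.
      RegistrationService.register(testData.car, testData.person).ownerId shouldEqual Some(testData.person.id)
    }
}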

‘testData’ is the data generated in our ‘withTestData’ method (a car and a person in our case).

The Specs2 version of the Loan Pattern is a bit more complex, as we’ve added some more bells and whistles to make it easier for us to create those entities in our domain. We’re using Scalike to create the entities in a MySQL database, and we need somewhat more refined control over the session we’re using, the DB name, etc.

Specs2:

It’s very similar to the ScalaTest flavor, but with several changes we needed in order to better facilitate our needs in the Specs2 tests. We have a mechanism to initialize a named DB connection, with a named connection pool and an explicit session. Besides these additions, it’s pretty similar to ScalaTest — generate the test data, run the test and clean up the generated data.

The test class now looks like this:


Summary

We tackled several issues our team faced on a day-to-day basis, which made our simulation environment unstable, hard to maintain and generally very frustrating to work with. By extracting data generation and cleanup to an external trait and using a clever mechanism to reduce boilerplate, we managed to clean up and simplify the test classes, reduce code duplication and generally make our lives easier. Tests are still hard, but a bit easier to write and nicer to read. What do you think?

We are testing ScyllaDB – live blogging #1

The background

In the last month we have started, at Outbrain, to test ScyllaDB. I will tell you in a minute what ScyllaDB is and how we came to test it, but I think what is most important is that ScyllaDB is a new database in its early stages, still before its first GA (coming soon). It is not an easy decision to be among the first to try such a young project that not many have used before (up until now there are about 2 other production installations), but as they say, someone has to be the first one… Both ScyllaDB and Outbrain are very happy to openly share how the test goes, what the hurdles are, what works and what doesn’t.

How it all began:

I have known the guys from Scylla for quite some time; we met through the first iteration of the company (Cloudius Systems), and we met again at the early stages of ScyllaDB. Dor and Avi, the founders of ScyllaDB, wanted to consult whether, as heavy users of Cassandra, we would be happy with the solution they were going to write. I said, “Yes, definitely”, and I remember saying, “If you give me Cassandra functionality and operability at the speed and throughput of Redis, you’ve got me.”

Time went by and about 6 months ago they came back and said they are ready to start integrations with live production environments.

This is the time to tell you what ScyllaDB is.

The easiest description is “Cassandra on steroids”. That’s right, but in order to do that, the guys at Scylla basically had to rewrite the whole Cassandra server from scratch, meaning:

  • Keep all the Cassandra interfaces exactly the same, so client applications do not have to change.
  • Rewrite it all in C++, and by that overcome the issues the JVM brings with it — most importantly, the GC pauses that were hurting the high percentiles of latency.
  • Write it all in an asynchronous programming model that enables the server to run at very high throughput.
  • Shard-per-core approach – on top of the cluster sharding, Scylla uses shard-per-core, which allows it to run lockless and scale up with the number of cores.
  • Scylla uses its own cache and does not rely on the operating system cache. This saves a data copy and avoids slowdowns due to page faults.

I must say that intrigued me: if you look at the open-source NoSQL data systems that have picked up, there is one camp of C++, high-performance but relatively simple functionality (memcached or Redis), and the heavy-functionality but JVM-based camp (Spark, Hadoop, Cassandra). If you can combine the good of both worlds — that sounds great.

Where does that meet Outbrain?

Outbrain is a heavy user of Cassandra. We have a few hundred Cassandra machines running in 20 clusters over 3 data centers, storing 1-2 terabytes of data each. Some of the clusters are hit at user query time, where unexpected latency is an issue. As data, traffic and complexity grew with Outbrain, it became more and more complex to maintain the Cassandra clusters and keep their performance reasonable. It always required more and more hardware to support the growth as well as the performance.

The promise of stable latency and 5-10x more throughput (meaning far fewer machines), without the cost of rewriting our code, made a lot of sense, and we decided to give it a shot.

One thing we deeply needed was not yet in the product: cross-DC clusters. The Cassandra feature of eventual consistency across clusters in different data centers is key to how Outbrain operates, so it was very important for us. It took the guys from ScyllaDB a couple of months to finish that feature, test it and verify everything works, and then we were ready to go.

ScyllaDB team is located in Herzliya which is very close to our office in Netanya and they were very happy to come and start the test.

The team working on this test is:

Doron Friedland – Backend engineer at Outbrain’s App Services team.

Evgeny Rachlenko – from Outbrain’s Data Operations team.

Tzach Liyatan – ScyllaDB Product manager.

Shlomi Livne – ScyllaDB VP of R&D.

The first step was to select the right cluster and functionality to run the test on. After a short consideration, we chose to run this comparison test on the cluster that holds our entire document store. It holds information about all active documents in Outbrain’s system. We are talking about a few million documents, where each one of them has hundreds of different features represented as Cassandra columns. This store is updated all the time and accessed on every user request (a few million requests every minute). Cassandra started struggling with this load, and we applied many solutions and optimizations in order to keep up with it. We also enlarged the cluster so we could keep it up.

One more thing that we did in order to overcome the Cassandra performance issues was to add a level of application cache, which consumes a few more machines by itself.

One could say that’s why you chose a scalable solution like Cassandra — so you can grow it as you wish. But when the number of servers starts to rise and carries a significant cost, you want to look at other solutions. This is where ScyllaDB came into play.

The next step was to install a cluster, similar in size to the production cluster.

Evgeny describes below the process of installing the cluster:

Well, the installation impressed me in two aspects.

The configuration part was pretty much the same as Cassandra, with a few changes in parameters.

Scylla simply ignores the GC and HEAP_SIZE parameters and uses its configuration as an extension of the cassandra.yaml file.

Our Cassandra clusters run with many components integrated into the Outbrain ecosystem. Shlomi and Tzach properly defined the most important graphs and alerts. Services such as Consul, collectd and Prometheus with Grafana have also been integrated as part of the POC. Most integration tests passed without my intervention, except for light changes in the Scylla Chef cookbook.

Tzach describes what it looked like from their side:

The Scylla installation, done by Evgeny, used a clone of the Cassandra Chef recipes, with a few minor changes. Nodetool and cqlsh were used for a sanity test of the new cluster.

As part of this process, Scylla metrics were directed to Outbrain’s existing Prometheus/Grafana monitoring system. Once traffic was directed to the system, the application and ScyllaDB metrics were all in one dashboard, for easy comparison.

Doron describes the application-level steps of the test:

    1. Create a dual DAO to work with ScyllaDB in parallel to our Cassandra main storage (see elaboration on the dual DAO implementation below).
    2. Start dual writes to both clusters (in production).
    3. Start dual reads (in production) to read from ScyllaDB in addition to the Cassandra store (see the test-dual DAO elaboration below).
    4. Not done yet: migrate the entire dataset from Cassandra to ScyllaDB by streaming the data into the ScyllaDB cluster (similar to migration between Cassandra clusters).
    5. Not done yet: measure the test reads from ScyllaDB and compare both the latency and the data itself to the data taken from Cassandra.
    6. Not done yet: if the latency from ScyllaDB is better, try to reduce the number of nodes to test the throughput.
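The actual dual DAO is described in the next post; conceptually it is a thin wrapper that writes to both stores and, on reads, serves from Cassandra while issuing a shadow read to ScyllaDB for comparison. A hedged sketch, with an illustrative DocumentDao interface:

import scala.concurrent.Future

// Illustrative interface for the document store.
trait DocumentDao {
  def write(docId: String, doc: Map[String, String]): Future[Unit]
  def read(docId: String): Future[Option[Map[String, String]]]
}

// Dual DAO: Cassandra stays the source of truth, ScyllaDB receives shadow writes and reads.
class DualDocumentDao(cassandra: DocumentDao, scylla: DocumentDao) extends DocumentDao {

  override def write(docId: String, doc: Map[String, String]): Future[Unit] = {
    scylla.write(docId, doc)     // shadow write, result ignored by the caller
    cassandra.write(docId, doc)  // callers still depend only on the Cassandra result
  }

  override def read(docId: String): Future[Option[Map[String, String]]] = {
    scylla.read(docId)           // shadow read, measured and compared offline
    cassandra.read(docId)        // the value actually served
  }
}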

Performance metrics:

Here are some very initial measurement results:

You can clearly see below that ScyllaDB performs much better, with much more stable performance.

One current disclaimer here is that Scylla does not yet have all the historic data and is only using data from the last week.

It’s not visible from the graph, but ScyllaDB is not loaded and thus spends most of its time idling; the more loaded it becomes, the lower the latency will get (up to a limit, of course).

We need to wait and see the following weeks’ measurements. Follow our next posts.

Read latency (99th percentile) of a single entry — first week, data not migrated (2.3-7.3):

Scylla:

Cassandra:

Read latency (99th percentile) of multiple entries (* see comment below) — first week, data not migrated (2.3-7.3):

Scylla:

Cassandra:

* The read of multiple partition keys is done by firing single-partition-key requests in parallel and waiting for the slowest one. We have learned that this use case brings out every latency issue we have in the high percentiles.

That’s where we are now. The test is moving on and we will update with new findings as we progress.

In the next post, Doron will describe the data model and our dual DAO, which is how we run such tests. Shlomi and Tzach will describe the data transfer and the upgrade events we had while doing it.

Stay tuned.

Next update is here.

How to substitute dynamic URL parameters: a template engine use case

Background

Outbrain is one of the world’s largest content discovery platforms, serving more than 200B recommendations monthly and reaching over 561 million unique visitors from across the globe.

As Outbrain becomes an important source of traffic and revenues for publishers, partners are looking for more tracking capabilities that will help them determine the amount of traffic generated by Outbrain, and how this traffic is distributed throughout their site. In addition, publishers want to see this information on their analytic tool of choice (Omniture, GA, Webtrends etc.).

Outbrain now provides them with tracking parameters that can be appended to the URL of recommendations.

Motivation

A URL is constructed of three parts: a prefix, the actual URL and a suffix, where the prefix and suffix serve as a means to let our partners add tracking capabilities in the form of dynamic parameters (for instance: the URL’s title, publish date and id). Eventually, the dynamic parameters are added as a query string (field1=value1&field2=value2&field3=value3) to the recommendation URL.

The legacy code would concatenate the three URL parts and use String replace in order to add the dynamic parameters. This implementation was hard to maintain (in situations where we wanted to support more dynamic parameters), as well as difficult to use, since there was a need to import a lot of code into a project and to depend on many modules.

When revisiting the problem, we understood that it would be appropriate to use a Separation of Concerns approach, separating the template from the model, and from the transformation logic itself – a template engine sounded like the right choice! In addition, since more and more dynamic parameters were being added, we used a builder pattern in order to achieve an easier and cleaner usage for clients, and easier maintenance in the future.

The problem – using a template engine doesn’t guarantee better performance

We decided to use StringTemplate as our template engine since we were familiar with it and had some good experience using it. The result of the refactor was a cleaner, shorter and maintainable API.

Unfortunately, when we deployed the new changes to production, we noticed a significant increase in serving time that was unacceptable in terms of user experience. After investigating the root cause, we found out that the usage of StringTemplate was pretty expensive. Even though templates can be reused, we couldn’t reuse them all. For instance, we created a new template for each request, since each one was constructed with different dynamic parameters (the form of the URL, though, was the same: prefix, actual URL and suffix).

So at that moment we had a clean and elegant solution that wasn’t performing well. We then looked for some alternative solutions for StringTemplate that could save us the expensive cost of constructing a new template for each request.

The solution – right tool for the job

Eventually, we found a lightweight template engine that allowed us to keep using the Separation of Concerns approach and still achieve good performance. We ended up using Apache Commons Lang 3.0’s StrSubstitutor – a simpler alternative to StringTemplate. This time, we made sure it outperformed our previous implementation by doing some micro-benchmarking, and indeed the results were much better: the new implementation executed more than 4 times as many operations per second.
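For illustration, this is roughly how StrSubstitutor is used (shown here from Scala); the template and parameter names are made-up examples, not our production tracking format:

import org.apache.commons.lang3.text.StrSubstitutor
import scala.jdk.CollectionConverters._

object TrackingUrlExample {
  def main(args: Array[String]): Unit = {
    // Tracking suffix template with dynamic parameters.
    val template = "utm_source=outbrain&utm_campaign=${campaignId}&title=${title}&pubDate=${publishDate}"

    val params = Map(
      "campaignId" -> "12345",
      "title" -> "some-article-title",
      "publishDate" -> "2017-06-13"
    ).asJava

    // StrSubstitutor replaces the ${...} placeholders from the map.
    val suffix = new StrSubstitutor(params).replace(template)
    println(s"http://example-publisher.com/article?$suffix")
  }
}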

MicroBenchmarking Results

We used the Java Microbenchmark Harness (JMH) in order to perform our performance measurements.

Raw data: