Category: Analytics

Meet Jacko: Bringing Hadoop Resource Utilization Into View

How do you manage resources and plan for future growth in a distributed, multi-tenant and ever-changing environment? In this blog post I will elaborate on the challenges we faced at Outbrain with managing our Hadoop clusters, and the service we created to improve visibility and transparency into the clusters’ utilization.

Bringing Hadoop Resource Utilization Into View

Outbrain generates a lot of data.

Every impression, click, and widget request is sent and made available for processing, along with many other business events. This amounts to tens of billions of new records being delivered each day into our Hadoop clusters, in two on-premise data centers, as well as in the cloud. These are multi-tenant clusters, with over a dozen distinct business entities running all kinds of workflows, using various data processing engines. We currently have a combined total of more than a thousand defined workflows, which in turn run a total of 10,000 jobs on each of our two main clusters every day. The workflows are always evolving, with dozens of commits each week. Data volumes also change constantly, usually upwards, as a result of traffic growth, new features being deployed or other production changes.

Most workflows start running when data for a certain period of time has fully arrived, be it hourly, daily or in some rare cases even weekly and monthly. And with everyone rushing at the same time to process the newly available data for their own uses, the cluster resources get depleted quickly. This is known as the Tragedy of the Commons: each user tries to maximize their own benefit from the cluster, resulting in an overall slowdown. Since we don’t want jobs with tight SLAs to be slowed down by jobs with looser SLAs, we schedule jobs using queues with different weights and capacities. This brings a third constantly changing variable into the picture: business priorities.

Take reporting vs. serving as an example: do we want our customers to have access to reports as soon as possible, but at the cost of our end-users not getting the most up-to-date recommendations, or vice versa? This balancing act is a business decision, and these decisions also change frequently, with different product features being given priority over others.

And so, with ever-changing processing logic, data volumes and business priorities, it is our team’s job to provide continuous, in-depth visibility into how resources are being used, so that informed decisions about resource allocation can be made.

To solve this lack of visibility we created Jacko, a service that helps us collect runtime data available from various history servers and index it to Elasticsearch, allowing us to easily query and use Kibana dashboards and visualizations to analyze workflows’ behavior over time. Making this data accessible helps us estimate workloads, which can help evaluate costs for features across multiple workflows and plan for future growth. It can also be used for anomaly detection, so we can get alerts for all sorts of behavioral changes, from abnormal I/O usage to intermittent failures.

One aspect of workflow runtime data where adding visibility was crucial for us was resource consumption. When users run their workflows they normally check whether they succeed or fail, and perhaps note the total execution time, but they are usually unaware of the actual impact on the cluster. Execution time is not a good indicator of resource consumption – a job can run for hours while using only 1% of the cluster’s resources, while another runs for a few minutes but takes up all of the cluster’s resources. Measuring the cluster machines’ CPU usage also doesn’t help, since a core might be idle, but as long as it is assigned to a certain job it is unavailable to the cluster, so it is effectively fully used. To know the actual resource consumption we must know the total time each resource was assigned to any task related to a workflow. And with Jacko, our users can now easily drill down and compare the workloads of their workflows.
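As a rough illustration of the kind of data this involves (a hedged sketch, not Jacko’s actual implementation; the ResourceManager URL, the Elasticsearch index and document naming, and the field subset are assumptions), the YARN ResourceManager REST API exposes per-application aggregates such as vcoreSeconds and memorySeconds – the total time vcores and memory were assigned to an application – which can be indexed into Elasticsearch for Kibana to slice and dice:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ResourceUsageIndexer {

    // Hypothetical endpoints for the sketch.
    private static final String RM_APPS = "http://resourcemanager:8088/ws/v1/cluster/apps?states=FINISHED";
    private static final String ES_DOCS = "http://elasticsearch:9200/yarn-apps/_doc/";

    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        ObjectMapper mapper = new ObjectMapper();

        // Fetch finished applications from the ResourceManager REST API.
        HttpResponse<String> rmResponse = http.send(
                HttpRequest.newBuilder(URI.create(RM_APPS)).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        JsonNode apps = mapper.readTree(rmResponse.body()).path("apps").path("app");

        for (JsonNode app : apps) {
            // vcoreSeconds / memorySeconds are the aggregate "resource x time" figures
            // that reflect actual cluster consumption, regardless of wall-clock runtime.
            var doc = mapper.createObjectNode()
                    .put("applicationId", app.path("id").asText())
                    .put("name", app.path("name").asText())
                    .put("queue", app.path("queue").asText())
                    .put("user", app.path("user").asText())
                    .put("vcoreSeconds", app.path("vcoreSeconds").asLong())
                    .put("memorySeconds", app.path("memorySeconds").asLong())
                    .put("finishedTime", app.path("finishedTime").asLong());

            // Index one document per application so Kibana can aggregate by workflow, queue or user.
            http.send(HttpRequest.newBuilder(URI.create(ES_DOCS + app.path("id").asText()))
                            .header("Content-Type", "application/json")
                            .PUT(HttpRequest.BodyPublishers.ofString(mapper.writeValueAsString(doc)))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
        }
    }
}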

Using these new insights into the actual resource consumption profiles of the workflows, it was easy for us to identify the top resource-hogging workflows and focus our efforts on improving them. For example, we found a Hive query that was supposed to process a single partition but ended up scanning the entire table, due to predicates not being pushed down correctly. Problems of this kind are normally easy to fix but hard for users to detect – processing is distributed, so the total execution time was not abnormal in this case. Overall, these improvement efforts made possible by Jacko helped us reduce the total resource consumption in our main Hadoop clusters by more than 30%, equaling hundreds of thousands of dollars’ worth of hardware upgrades, while making sure resource usage is aligned with business priorities.

Kibana for Funnel Analysis

How we use Kibana 4 for user-acquisition funnel analysis

Outbrain has recently launched a direct-to-consumer (D2C) initiative. Our first product is a chatbot. As with every D2C product, acquiring users is important. Therefore, optimizing the acquisition channel is also important. The basis of our optimization is analysis.


Our Solution (General Architecture)

Our acquisition funnel spans 2 platforms (2 web pages and a chatbot). Passing many parameters between platforms can be a challenge, so we chose a more stateful, server-based model. The client requests a new session ID, sending along basic data like IP and user agent. The server stores a session (we use Cassandra in this case) with processed fields like platform, OS, country, referral and user ID. At a later stage the client reports a funnel event for a session ID, and the server writes all known fields for the session into 2 storage systems:

  • ElasticSearch for quick & recent analytics (Using the standard ELK stack)
  • Hadoop for long term storage and offline reports

A few example fields stored per event (an example event document is sketched after this list):

  • User Id – A unique & anonymous identifier for a user
  • Session Id – The session Id is the only parameter passed between funnel steps
  • Event Type – The specific step in the funnel – serve, view, click
  • User Agent – Broken down to Platform and OS
  • Location – based on IP
  • Referral fields – Information on the context in which the funnel is exercised
  • A/B Tests variants – The A/B Test variant Ids that are included in the session
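To make this concrete, here is a hedged sketch of what a single funnel event document might look like when built server-side in Java, before being written to Elasticsearch and Hadoop (the field names follow the list above; the values, class name and JSON layout are made up for illustration):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.List;
import java.util.Map;

public class FunnelEventExample {
    public static void main(String[] args) throws Exception {
        // A hypothetical funnel event, enriched server-side from the stored session.
        Map<String, Object> event = Map.of(
                "userId", "anon-7f3a2c",                 // anonymous user identifier
                "sessionId", "s-20170115-0001",          // the only value passed between funnel steps
                "eventType", "click",                    // serve / view / click
                "platform", "mobile",                    // derived from the user agent
                "os", "iOS",
                "country", "US",                         // derived from the IP
                "referral", Map.of("source", "facebook", "campaign", "launch"),
                "abTestVariants", List.of("chat-flow-B", "landing-copy-A"),
                "timestamp", Instant.now().toString());

        // The same document goes to Elasticsearch (for Kibana) and to Hadoop (for offline reports).
        System.out.println(new ObjectMapper().writeValueAsString(event));
    }
}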

Goal of the Analysis: Display most important metrics quickly

Kibana plugin #1: Displaying percent metric

Kibana has several ways of displaying a fraction, but none of them excels at displaying small numbers (a pie chart can be used to visualize fractions, but small slices are hard to read). We developed a Kibana plugin for displaying a single metric in percent format.

funnel step

We use this visualization for displaying the conversion rate of the most interesting part of our funnel.

Kibana plugin #2: Displaying the funnel

We couldn’t find a good way to display a funnel, so we developed a visualization plugin (honestly, we were eager to develop this, so we did not scan the entire internet…).

Based on the great D3 Funnel by Jake Zatecky, this is a Kibana plugin that displays buckets of events in funnel format. It’s customizable and open-source. Feel free to use it…

d2c funnel

Putting it all together

Displaying your most important metrics and the full funnel is nice. Comparing variant A with variant B is very nice. We’ve set up our dashboard to show similar key metrics on 2 versions of the funnel. We always try to run at least 1 A/B test, and this dashboard shows us real-time results of our tests.

variant

Cherry on top

A timeline is awesome. If you’re not using it, I suggest trying it.

Viewing your most important metrics over time is very useful, especially when you’re making changes fast. Here’s an example:


Summary

We track a user’s activity by sending events to the server. The server writes these events to ES and Hadoop. We developed 2 Kibana plugins to visualize the most important metrics of our user-acquisition funnel. We can filter the funnel by Platform, Country, OS, Time, Referral, or any other fields we bothered to save. In addition, we always filter by A/B Test variants and compare 2 specific variants.

Reducing risks by taking risks

You have a hypothesis you wish to prove or refute regarding your system’s behavior in an extreme scenario. For example: after restarting the DB, all services are fully functional within X seconds. There are at least 3 approaches:

  1. Wait && See — if you wait long enough this scenario will probably happen by itself in production.
  2. Deep Learning  — put the DBA, OPS and service owner in a big room, and ask them to analyze what will happen to the services after restarting the DB. They will need to go over network configuration, analyze the connection pool and DB settings, and so on.
  3. Just Do It — deliberately create this scenario within a controlled environment:

Timing — Based on your knowledge, pick the right quarter, month, week, day and hour – so the impact on the business will be minimal.

Monitoring — make sure you have all monitoring and alerts in place. If needed, do “manual monitoring”.

Scale — If applicable, do it only on a portion of your production environment: one data center, part of the service, your own laptop, etc.


The Wait && See approach will give you the right answer, but at the wrong time. Knowing how your system behaves only after a catastrophe occurs is missing the point and could be expensive, recovery-wise.

The Deep Learning approach seems to be the safer one, but it requires a lot of effort and resources. The answer will not be accurate as it is not an easy task to analyze and predict the behavior of a complex system. So, in practice, you haven’t resolved your initial hypothesis.

The Just Do It approach is super effective – you will get an accurate answer in a very short time, and you’re able to manage the cost of recovery. That’s why we picked this strategy to resolve the hypothesis. A few tips if you’re going down this path:

  • Internal and External Communication — because you set the date, you can send a heads-up to all stakeholders and customers. We use statuspage.io for that.
  • Collaboration — set up a HipChat / Slack / Hangouts room so all relevant people will be on the same page. Post when the experiment starts and finishes, and ask service owners to update their status.
  • Document the process and publish the results within your knowledge management system (wiki / whatever).
  • This approach is applicable for organizations that practice proactive risk management, have a healthy culture of learning from mistakes and maintain a blameless atmosphere.

People who pick Deep Learning often do so because it is considered a “safer”, more conservative approach, but that is only an illusion. There is a high probability that in a real-life DB restart the system would behave differently from your analysis. This element of surprise and its consequences are exactly what we wanted to avoid by predicting the system’s behavior.

The Just Do It approach actually reduces this risk and keeps it manageable. It’s a good example of when the bold approach is safer than the conservative one. So don’t hesitate to take risks in order to reduce risk, and Just Do It.

ScyllaDB POC – live blogging #2


Hi again.

We are now a few weeks into this POC, and we gave you a first glimpse into it in the first post. We would like to continue and update you on the latest developments.

However, before we get to the update, there are 2 things we need to tell you about the setup that did not make it into the first post, purely for length reasons.

The first is the data structure we use, as Doron explains:

The Data

Our data is held using a single column as the partition key and 2 additional columns as clustering keys, in a key/value-like structure:

  • A – partition key (number).
  • B – Clustering key 1 (text). This is the value type (e.g. “name”, “age”, etc.).
  • C – Clustering key 2 (text).
  • D – The data (text). This is the value itself (e.g. “David”, “19”, etc.).

When storing the data we store either all data for a partition key, or partial data.

When reading we always read all data for the partition, meaning:

select * from cf_name where A=XXX;

When we need multiple partitions we just fire the above query in parallel and wait for the slowest result. We have come to understand that this approach is the fastest one.
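A minimal sketch of this “fire in parallel, wait for the slowest” read pattern, using the DataStax Java driver (the keyspace, contact point and key values are made up; the table and column names follow the structure described above):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;

import java.util.ArrayList;
import java.util.List;

public class ParallelPartitionReader {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("cassandra-node1").build();
        Session session = cluster.connect("my_keyspace");

        // One prepared statement, reused for every partition key.
        PreparedStatement selectPartition =
                session.prepare("SELECT * FROM cf_name WHERE A = ?");

        List<Long> partitionKeys = List.of(101L, 102L, 103L);

        // Fire all reads asynchronously...
        List<ResultSetFuture> futures = new ArrayList<>();
        for (Long key : partitionKeys) {
            futures.add(session.executeAsync(selectPartition.bind(key)));
        }

        // ...then block until the slowest one returns; the overall latency is
        // dominated by the slowest partition (e.g. a node pausing for GC).
        for (ResultSetFuture future : futures) {
            ResultSet rows = future.getUninterruptibly();
            rows.forEach(row -> {
                // B = value type, C = secondary clustering key, D = the value itself.
                String b = row.getString("B");
                String c = row.getString("C");
                String d = row.getString("D");
                System.out.println(b + "/" + c + " -> " + d);
            });
        }

        cluster.close();
    }
}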

Since this is meant to be the fastest approach, we need to understand that the latency of such a read is always determined by the slowest key. If you fire tens of reads into the cluster and, by chance, you hit a slow response from one of the nodes (GC is a good example of such a case), your whole read attempt is delayed.

This is where we thought Scylla could help us improve the latency of the system.

The second thing we wanted to tell you about is what made this POC so easy to run at Outbrain: our dual DAO mechanism, which Doron explains below.

Dual DAO implementation for Data Store migration/test

We have come to see that in many cases we need a dual DAO implementation to replace the regular DAO implementation.

The main use-case is data migration from one cluster to the other:

  1. Start dual writes to both clusters.
  2. Migrate the data – by streaming it into the cluster (for example).
  3. Start dual read – read from the new cluster and fallback to the old.
  4. Remove the old cluster once there are no fallbacks (if the migration was smooth, there should not be any).

The dual DAO we have written holds instances of both the old and the new DAO, as well as a phase – a number between 1 and 5. In addition, the dual DAO implements the same interface as the regular DAO, so it is pretty easy to swap it in and out when done.

The phases (a minimal code sketch follows the list):

  1. Write to the old cluster and read from the old cluster only.
  2. Write to both clusters and read from the old cluster only.
  3. Write to both clusters and read from the new cluster, falling back to the old cluster if data is missing.
  4. Write to the new cluster and read from the new cluster, falling back to the old cluster if data is missing.
  5. Write to the new cluster and read from the new cluster only.
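To make the phase logic concrete, here is a simplified sketch of what such a phased dual DAO could look like (the Dao interface and all names here are illustrative assumptions, not Outbrain’s actual code):

// Minimal key/value DAO interface assumed for the sketch.
interface Dao {
    void write(String key, String value);
    String read(String key);   // returns null when the key is missing
}

class DualDao implements Dao {

    private final Dao oldDao;
    private final Dao newDao;
    private volatile int phase; // 1..5, advanced gradually during the migration

    DualDao(Dao oldDao, Dao newDao, int phase) {
        this.oldDao = oldDao;
        this.newDao = newDao;
        this.phase = phase;
    }

    @Override
    public void write(String key, String value) {
        if (phase <= 3) {
            oldDao.write(key, value);      // phases 1-3 still write to the old cluster
        }
        if (phase >= 2) {
            newDao.write(key, value);      // phases 2-5 also (or only) write to the new cluster
        }
    }

    @Override
    public String read(String key) {
        if (phase <= 2) {
            return oldDao.read(key);       // phases 1-2: old cluster only
        }
        if (phase == 5) {
            return newDao.read(key);       // phase 5: new cluster only
        }
        String value = newDao.read(key);   // phases 3-4: prefer the new cluster...
        if (value == null) {
            value = oldDao.read(key);      // ...and fall back to the old one when missing
            if (value != null) {
                newDao.write(key, value);  // optional read-repair to speed up the transition
            }
        }
        return value;
    }
}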

The idea here is to support a gradual and smooth transition between the clusters. We have come to understand that in some cases a transition driven only by the dual DAO can be very slow (we can move to phase 5 only once no more fallbacks occur), so we had 2 ways to accelerate it:

  1. Read-repair – when in phase 3 or 4 and we fall back to the old cluster, write the data we fetch to the new cluster. This approach usually fits read-heavy use cases.
  2. Stream the data in manually by the data team – this has proven to be the more effective and quicker way, and it is what we did in this case.

Dual DAO for data store test:

The ScyllaDB POC introduces a slightly different use case. We do not want to migrate to Scylla (at least not right now) but to add it to our system in order to test it.

In order to match those demands, we have added a new flavor to our dual DAO:

  1. The write to the new DAO is done in the background, logging its results in case of failure, but it does not fail the overall write process. When working with the dual DAO for migration we want the write to fail if either the new or the old DAO fails; when working in test mode we do not. At first we did not implement it like this, and we had a production issue when a ScyllaDB upgrade (to a pre-GA version, from 0.17 to 0.19) caused the save queries to fail (on 9/3/16). Following that issue we changed the dual write so that it does not fail upon a new-DAO failure.
  2. The read from the new DAO is never returned to the caller. In phases 3-5 we do read from the new DAO, and attach a handler at the end to time the fetch and compare the results to the old DAO’s. However, the actual response comes back from the old DAO as soon as it is done. This is not bullet-proof, but it reduces the chances of production issues caused by problems in the test cluster (a sketch of this test-mode flavor follows below).
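Here is a hedged sketch of that test-mode flavor, reusing the illustrative Dao interface from the previous sketch: the new-cluster write runs on a background executor and only logs failures, while the new-cluster read is used solely for timing and comparison, with the old cluster’s result always returned to the caller:

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

class TestModeDualDao implements Dao {

    private static final Logger LOG = Logger.getLogger(TestModeDualDao.class.getName());

    private final Dao oldDao;  // the production cluster (C*)
    private final Dao newDao;  // the cluster under test (ScyllaDB)
    private final ExecutorService background = Executors.newFixedThreadPool(4);

    TestModeDualDao(Dao oldDao, Dao newDao) {
        this.oldDao = oldDao;
        this.newDao = newDao;
    }

    @Override
    public void write(String key, String value) {
        oldDao.write(key, value);          // only the old cluster decides success or failure
        background.submit(() -> {
            try {
                newDao.write(key, value);  // failures here are logged, never propagated
            } catch (Exception e) {
                LOG.log(Level.WARNING, "test-cluster write failed for key " + key, e);
            }
        });
    }

    @Override
    public String read(String key) {
        String result = oldDao.read(key);  // the caller always gets the old cluster's answer
        background.submit(() -> {
            try {
                long start = System.nanoTime();
                String shadow = newDao.read(key);
                long tookMs = (System.nanoTime() - start) / 1_000_000;
                if (!Objects.equals(result, shadow)) {
                    LOG.warning("test-cluster mismatch for key " + key + " (took " + tookMs + " ms)");
                }
            } catch (Exception e) {
                LOG.log(Level.WARNING, "test-cluster read failed for key " + key, e);
            }
        });
        return result;
    }
}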

Mid April Update:

The last time we reported, ScyllaDB was loaded with partial, ongoing data while the C* cluster was loaded with the full historical data. Scylla was performing much better, as we showed.

The next step was to load the full data into the ScyllaDB cluster. This was done by Scylla’s streaming process, which reads the data from the C* cluster and writes it to the ScyllaDB cluster. This process uncovered a bug: the streaming failed to extract the data from the newest version of C* that we use.

Shlomi’s explanation is as follows:

SSTables upgraded from Cassandra 2.0 to Cassandra 2.1 can contain range tombstones with a different format from tombstones written in Cassandra 2.1 – we had to add support for the additional format.

The Scylla team figured it out and fixed it pretty quickly. More info about this issue can be found in the project’s GitHub.

After all the data was loaded correctly into the ScyllaDB cluster, we saw a degradation in the ScyllaDB cluster which made it perform slightly worse than C* (surprise!). We also found a faulty configuration in the C* “speculative retries” mechanism; fixing it actually made the C* latency about 50% better than ScyllaDB’s. We need to remember that we are measuring latency at the 99th percentile. In our case, as Doron mentioned above, it is measured over multiple parallel reads, which is even harder.

The ScyllaDB guys were as surprised as we were. They took a look into the system and found a bug in their cross-DC read repair; they fixed the issue and installed a new version.

Here is the description of the issue, as explained by Tzach from Scylla (reference: https://github.com/scylladb/scylla/issues/1165):

After investigation, we found out that the Scylla latency issue was caused by a subtle bug in the read repair, causing unnecessary, synchronous, cross-DC queries.

When creating a table with a read-repair chance (10% in this case) in Scylla or Cassandra, 10% of the reads send background queries from the coordinator to ALL replicas, including those in remote DCs. Normally, the coordinator waits for responses only from a subset of the replicas, based on the Consistency Level (Local Quorum in our case), before responding to the client.

However, when the first replica responses do not match, the coordinator will wait for ALL responses before sending a response to the client. This is where the Scylla bug was hidden.

It turns out the response digest was wrongly calculated in some cases, which caused the coordinator to believe the local data was not in sync and to wait for the remote DC nodes to respond.

This issue is now fixed and backported to Scylla 1.0.1.

So, we upgraded to Scylla 1.0.1 and things look better. We can now say that C* and Scylla are in the same range of latency, but C* is still better. Or, as Doron phrased it in the project’s Google group:

Performance definitely improved starting 13/4 19:00 (IL time). It is clear.

I would say C* and Scylla are pretty much the same now… Scylla spikes a bit lower…

That did not satisfy the Scylla guys; they went back to the lab and came back with the following resolution, as described by Shlomi:

In order to test the root cause of the latency spikes, we decided to try poll-mode:

Scylla supports two modes of operation: the default, event-triggered mode and another, extreme poll-mode. In the latter, Scylla constantly polls the event loop with no idle time at all; the CPU busy-waits for work, always consuming 100% of each core. Scylla was initially designed with this mode only and later switched to a saner mode of operation that calls epoll_wait. The latter requires complex synchronization before blocking on the OS syscall, to prevent races between the cores going in and out of sleep.

We recently discovered that the default mode suffers from increased latency and fixed it upstream; this fix has not propagated to the 1.0.x branch yet, so we recommend using poll-mode here.

The journey didn’t stop here, since another test execution in our lab revealed that hardware Hyper-Threading (HT) may increase latencies during poll-mode.

Yesterday, following our tests with and without HT, I updated the Scylla configuration to use poll-mode and to run only on the 12 physical cores (allowing the kernel to schedule the other processes on the hyper-threads).

A near-future investigation will look into why poll-mode with HT is an issue (Avi suspects L1 cache eviction). Once the default, event-based code is backported to the release, we’ll switch back to it.

This is how the graphs look today:


As you can see in the graphs above (upper graph is C*, lower graph is Scylla):

  • Data was loaded into Scylla on 4/1 (lower graph).
  • On 4/4 Doron made the fix in the C* configuration and its latency improved dramatically.
  • On 4/13 Scylla 1.0.1 was installed, which brought an improvement.
  • On 4/20 Shlomi made the Hyper-Threading / poll-mode configuration change.
  • The current status is that Scylla sits nicely at a baseline of around 50ms, spiking above 90ms, whereas C* is at 60-65ms, spiking to 120ms in some cases. It is worth remembering that these measurements are 99th-percentile latencies taken from within Outbrain’s client service, which is written in Java and by itself suffers from unstable performance.

There is a lot to learn from such a POC. The guys from Scylla are super responsive and don’t hesitate to draw lessons from every unexpected result.

Our next steps are:

  1. We still see some inconsistencies in the results between the 2 systems; we want to find out where they come from and fix them.
  2. Move to throughput test by decreasing the number of machines in the Scylla cluster.

That’s all for now. More to come.

Next update is here.