Outbrain Tech Blog Recent Posts

Trust your IntelliJ

I’m in the habit of leaving classes cleaner than they were before I got there. CMD+ALT+O (optimize imports) and CMD+ALT+L (reformat code) have become a muscle memory by now. I will spend my time removing unused properties, deleting dead code, removing deprecations or duplications. One of my all-time favorites is rewriting tests to make them clean and readable. I do this as part of any day to day tasks, no matter how small or insignificant. It is satisfying to know you leave your code in a better state than before (and my OCD is fed and happy).

One of the best tools I use to achieve this task is none other than the good old IntelliJ IDEA. The highlights to unused parameters, typos, accessibility level of a class or method, a parameter that is always called with the same value etc. You know what I’m talking about, that nagging little yellow square you never pay attention to or deliberately ignore because you’re always working on some important, time-sensitive feature. You “never” have time to quickly sweep and ‘clean-up’ the class you’re working on.

 

“That nagging little square”

My team at work is divided into three groups:

  • The people who use a several-major-versions-old IntelliJ (reciting the age-old mantra — “if it ain’t broke, don’t fix it”)
  • A person who uses the latest stable version of IntelliJ
  • And me, always making sure to use the latest EAP.

The first group of people doesn’t see some of the suggestions IntelliJ gives me. The first example which pops to mind is the support of lambda expressions we got with Java 8. IntelliJ is very helpful in highlighting the code that can be converted to a shorter lambda which makes the code shorter, cleaner and quite frankly more readable (which is a matter of personal taste and habit). Today I got to a class which I have never seen before to make a tiny change. It’s a class in a service I’m not usually working on, so I hadn’t had the chance to make my usual OCD powered sweep and clean up. I proceeded to change one global parameter to private accessibility from the public and added a ‘final’ keyword to another parameter. Happy and satisfied, my OCD let me commit this minor change.

Imagine how surprised I was to have my OCD rub my nose in a piece of code I entirely missed:

IntelliJ just told me that it found a bug:

After changing the code and calling the ‘putAll’ method with ‘innerMap’:

IntelliJ even added a bonus of highlighting another optimization I could do — “Lambda can be replaced with method reference.” and now the code looks like so:

Now, I know what you’re thinking. It’s something of the sort of: “where were the tests?” or “tests would have protected him from making this stupid mistake” (I can hear your scolding tone in my head right now thank you very much), it’s a valid point, but it’s not the point I’m trying to make and it’s out of the scope of this post. What I’m trying to tell you is — trust your IntelliJ!

Spark in framework

Making Spark Native for Data Processing Framework

Working in an infrastructure team is different from working in a development team. The team develops tools that are used by developers’ teams inside Outbrain. It is very important to provide a convenient way for working with new technologies. Having a Spark cluster enables running distributed data processing in scale. Until now developers worked with Spark writing ad-hoc crontab jobs in production. It created a messy and unstable environment and required manual maintenance in case of failure. For that reason, our team wanted to give an infrastructural way for working with Spark.

In this blog post, I explain how we built the solution implementing Spark to be a native tool in our WorkflowEngine.

Meet WorkflowEngine

The Outbrain Data Infrastructure team enables running hundreds of ETL jobs every hour. These jobs are orchestrated by WorkflowEngine – an internal tool, which is developed by the team.

The WorkflowEngine (WFE) enables running various types of transformations from many kinds of data sources: MySQL, Cassandra, Hive, Vertica, Hadoop etc., located in two data centers and the cloud. Every transformation, or flow, implements some business logic. A flow may contain several tasks, which are executed one after another.

Flows are developed and owned by researchers, data scientists and developers in other groups at Outbrain, such as  Algorithm research groups, BI and Data Science.

input Trigger

Defining flows in WFE gives users a lot of benefits. It enables users to autonomously specify dependencies between flows, so when one flow ends, another is triggered. In addition, users can define a number of retries for their flows, so if a flow fails for some reason, WFE reruns it. If all retries fail, WFE sends an email or alert about the flow failures. Logs for each running flow are shown in a convenient form in WFE UI. Furthermore, WFE collects metrics for all flows, which enables us to track the health of the system.

From the operational perspective, several instances of WFE are deployed on machines which are owned by the infrastructure team, users just define ETL flows in a separate repository.

Diving Into the Solution

In order to respond to requirements for running data analytics on Spark, our team had to allow an infrastructural way for running Spark jobs orchestrated by WFE. To achieve this, we’ve developed a new kind of WorkflowEngine task called SparkStep.

When approaching the design for SparkStep, we had several principals in mind:

  1. Avoid producing additional load on WFE machines
  2. Create an isolated environment for every Spark job
  3. Allow users to specify and at the same time allow the system to limit the number of YARN resources allocated for every SPARK job
  4. Enable using multiple Spark clusters with different versions

Taking all this into consideration, we came up with the following architecture:

 

Keeping the control while having fun

The Spark job definition specifies how a  job will run on the Spark cluster (e.g. jarURL, className, configuration parameters). In addition, there are properties that are set internally by WFE. When WFE starts executing a SPARK job, it builds a spark-submit command using all these properties.

The spark-submit command submits the Spark job to YARN, which allocates needed resources and launches the application. After submitting the job, WFE constantly checks its status using YARN REST API. When the job is completed in YARN, WFE reports a COMPLETED status for the WFE Spark task and continues running the next task. However, if the Spark job fails in YARN, WFE reports a FAILED status for the WFE job. In this case, WFE can rerun the job, according to the defined number of retries. If the job runs in YARN for too long, and its running time exceeds the defined maximum, WFE stops the job using the YARN REST API. In this case, the job is marked as FAILED in WFE. This functionality ensures both that we have a cap on the amount of resources each flow can use and that resources are saved when a job is canceled for any reason.

The spark-submit command runs in a Docker container. The Docker exits once the job is submitted to the cluster. It enables us to create an isolated environment for every Spark job executed by WFE. In addition, using Docker container, we can easily use different Spark versions simultaneously, by creating different Docker images.

By using this architecture we achieve all our goals. Spark jobs are submitted to YARN in a “cluster” mode, so they do not create a load on WFE machines. YARN handles all resources for each job. Every Spark job runs in an isolated environment owing to the Docker container.

X tips [x>5] for Micro-Services Logging

Micro-Services Logging

What if?

What if someone told you it is forbidden to use logs anymore?

In my case, it began in a meeting with our ops team that claimed our services are writing too many logs and we should write less. Therefor logs are being “throttled” ie: some log messages are just discarded.

Too many logs were 100 lines per minute, which in my opinion was a ridiculously low number.

Maybe I am doing something wrong?

It might be that logs are not a good pattern in a micro-services-highly-distributed environment. I decided to rethink my assumptions.

Why do I need logs anyway?

The obvious reason to write logs is debugging purposes. When something goes wrong in production, logs are a way I can understand the flow that leads to that erroneous state.

The main alternative that I am aware of is connecting a debugger and stepping thru the code in the flow. There are a couple of disadvantages for using the debugger: It is time-consuming, It might slow down the production process itself and you have to be connected when it happens — so If this bug happens only at night — bummer. In addition debugging is a one time process, the learning from it is just in your head so it is hard to improve that way.

Another alternative is adding metrics. Metrics are pretty similar to logs but they have the extra feature of having nice dashboards and alerting systems (we use metrics , prometheus and grafana). On the other hand, metrics have bigger overhead in the setup process which is the main disadvantage in my opinion. Metrics are also more rigid and do not allow usually to log all state, context, and parameters. The fact that writing logs is easy makes it a no-brainer to use everywhere in the code while applying the “think later” paradigm.

The third alternative is auditing via systems like ELK etc’. Similar to metrics it has higher overhead and it also hard to follow sequential operations with those discrete events.

There are even more reasons for logging. It is an additional documentation in the code that can help understand what is going on. Logging can also be used as metrics and alerting systems and even replace those systems. Many insights can be gained from logs, sometimes even user passwords.

Specification

If I go back to that meeting with the ops guys, one questions I was asked was:

‘What are your requirements from your logging system’?

Here it is:

Order matters — messages should be in ‘written before’ order so it will be possible to understand the flow of the code.

Zero throttling — I expect that there will be no rate limit on writing, only on the volume of saved messages, so every time a message is thrown away, it is the oldest one.

X days history — Log files should keep log messages for at least couple of days — if that is not the case it might be you are writing too many log messages, so move some to debug level.

Logs should be greppable — text files has this big advantage of flexibility and multiple tools that can be used with them. So text files are a big advantage.

Metadata — logging system should provide some metadata like calling thread, timestamp, calling method or class etc’ to remove that burden from the developer (logging should be easy).

Distributed and centralized — it should be convenient to look at all logs in a central location, but also be able to split it and see logs of a specific process.

Easy to use, easy to install, easy to consume — in general logging should be fun.

Tips and Tricks

I am in. Is there anything else to know?

Use logging framework

Don’t log to standard output with print lines in a long-running service. Logging framework allows various levels, various appenders like logging to HTML via HTTP, log rotation and all sort of features and tricks.

Developer tip — Log levels

It’s important to be consistent when using different levels, otherwise, you will lose semantics and meaning of the message severity. The way I use them is:

  • Error — something bad happened and it might crash the process/service.
  • Warn — something bad happened in a specific use case.
  • Info — something that I want to see happen.
  • Debug — something happened, I wish to see only under special circumstances otherwise it will clutter normal logs.

There are some special reasons to override that rule: Error and Warn messages are monitored in our services, so sometimes I might move something to info level to snooze alerting of it. I move messages from Info to Debug if there is too much clutter and the other direction if I want to focus on something.

Developer tip — Meaningful messages

I often see messages like “start processing” or “end method”. It is always a good idea to try and imagine what else you would like to see when reading log messages s and add as much info as possible, specific parameters, fields and contextual data which might have different value at different scenarios.

Developer tip — Lazy evaluated strings

Especially for Debug level, it is a good practice to use frameworks that prevent the overhead of string concatenation when the level is turned off without the need to explicitly check if the level isn’t suppressed. In kotlin-logging, for example, it will look like this:

logger.debug { “Some $expensive message!” }

Developer tip — Don’t log errors more than once per error

It is a common anti-pattern to log an exception and then re-throw it just to be logged later in another place in the code again. It makes it harder to understand the number of errors and their origin. Log exceptions only if you are not re-throwing them.

Ad-Hoc enablement of log level

Some frameworks and services allow on-the-fly change of the active log level. It means you can print debug messages of a specific class in a specific instance for a couple of minutes, for example. It allows debugging while not trashing the log file in the rest of the time.

Ad-Hoc addition of logging messages

When I worked at Intel, one of my peers developed a JVM tool that allowed bytecode manipulation of methods and adding log messages at the beginning of methods and at the end of methods.

It means you didn’t have to think in advance on all those messages, but just to inject them with log messages when needed while the process is running.

In-Memory logs

Another useful technique is keeping last messages in memory. It allows developing an easy way to access them from remote via REST call, for example. It is also possible to have that dumped into a file in case the process has a crash.

Logging as a poor-man profiler

It is possible to analyze logs to gain also insight on the performance of the application. The simple technique I saw is using the timestamp in the logs. A more advanced technique is using the context to calculate and show the time from the beginning of the sequence (ie: when the HTTP call started), by using MDC.

Log formatting

The content of the message is also important. Various logging framework allows embedding predefined template parameters such as:

  • Location info — Class and file name, method name and line number of where the log message was issued.
  • Date and time.
  • Log level as discussed above.
  • Thread info — relevant to multi-threads environments to be able to separate different flows.
  • Context info — similar to thread info but more specific to a use case, add context information like user id, request id etc’. Framework features like MDC make it easier to implement

I highly recommend using those, but bare in mind that some features pose performance overhead when they are evaluated.

Logging — the essentials

Logging is a big world, I couldn’t cover all of it here, but I hope I convinced you to use it.

Have fun and keep on logging!

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3

Previously on our second episode of the trilogy  “Hadoop Research Journey from bare metal to Google Cloud – Episode 2”, we covered the POC we had.

In this episode we will focus on the migration itself, building a POC environment is all nice and easy, however migrating 2 PB (the raw part out of 6 PB which include the replication) of data turned to be a new challenge. But before we jump into technical issues, lets start with the methodology.

The big migration

We learned from our past experience that in order for such a project to be successful, like in many other cases, it is all about the people – you need to be minded to the users and make sure you have their buy-in.

On top of that, we wanted to complete the migration within 4 months, as we had a renewal of our datacenter space coming up, and we wanted to gain from the space reduction as result of the migration.

Taking those two considerations in mind, we decided that we will have the same technologies which are Hadoop and Hive on the cloud environment, and only after the migration is done we would look into leveraging new technologies available on GCP.

Now after the decision was made we started to plan the migration of the research cluster to GCP, looking into different aspects as:

  • Build the network topology (VPN, VPC etc.)
  • Copy the historical data
  • Create the data schema (Hive)
  • Enable the runtime data delivery
  • Integrate our internal systems (monitoring, alerts, provision etc.)
  • Migrate the workflows
  • Reap the bare metal cluster (with all its supporting systems)

All in the purpose of productizing the solution and making it production grade, based on our standards. We made a special effort to leverage the same management and configuration control tools we use in our internal datacenters (such as Chef, Prometheus etc.) – so we would treat this environment as yet just another datacenter.

Copying the data

Sound like a straightforward activity – you need to copy your data from location A to location B.

Well, turns out that when you need to copy 2 PB of data, while the system is still active in production, there are some challenges involved.

The first restriction we had, was that the copy of data will not impact the usage of the cluster – as the research work still need to be performed.

Second, once data is copied, we also need to have data validation.

 

Starting with data copy

  • Option 1 – Copy the data using Google Transfer Appliance

Google can ship their transfer appliance (based on the location of your datacenter), that you would attach to the Hadoop Cluster and be used to copy the data. Ship it back to Google and download the data from the appliance to GCS.

Unfortunately, from the capacity perspective we would need to have several iterations of this process in order to copy all the data, and on top of that the Cloudera community version we were using was so old – it was not supported.

  • Option 2 – Copy the data over the network

When taking that path, the main restriction is that the network is used for both the production environment (serving) and for the copy, and we could not allow the copy to create network congestion on the lines.

However, if we restrict the copy process, the time it would take to copy all the data will be too long and we will not be able to meet our timelines.

Setup the network

As part of our network infrastructure, per datacenter we have 2 ISPs, each with 2 x 10G lines for backup and redundancy.

We decided to leverage those backup lines and build a tunnel on those lines, to be dedicated only to the Hadoop data copy. This enabled us to copy the data in relatively short time on one hand, and assure that it will not impact our production traffic as it was contained to specific lines.

Once the network was ready we started to copy the data to the GCS.

As you may remember from previous episodes, our cluster was set up over 6 years ago, and as such acquired a lot of tech debt around it, also in the data kept in it. We decided to take advantage of the situation and leverage the migration also to do some data and workload cleanup.

We invested time in mapping what data we need and what data can be cleared, although it didn’t significantly reduce the data size we managed to delete 80% of the tables, we also managed to delete 80% of the workload.

Data validation

As we migrated the data, we had to have data validation, making sure there are no corruptions / missing data.

More challenges on the data validation aspects to take into consideration –

  • The migrated cluster is a live cluster – so new data keeps been added to it and old data deleted
  • With our internal Hadoop cluster, all tables are stored as files while on GCS they are stored as objects.

It was clear that we need to automate the process of data validation and build dashboards to help us monitor our progress.

We ended up implementing a process that creates two catalogs, one for the bare metal internal Hadoop cluster and one for the GCP environment, comparing those catalogs and alerting us to any differences.

This dashboard shows per table the files difference between the bare metal cluster and the cloud

 

In parallel to the data migration, we worked on building the Hadoop ecosystem on GCP, including the tables schemas with their partitions in Hive, our runtime data delivery systems adding new data to the GCP environment in parallel to the internal bare metal Hadoop cluster, our monitoring systems, data retention systems etc.

The new environment on GCP was finally ready and we were ready to migrate the workloads. Initially, we duplicated jobs to run in parallel on both clusters, making sure we complete validation and will not impact production work.

After a month of validation, parallel work and required adjustments we were able to decommission the in-house Research Cluster.

What we achieved in this journey

  • Upgraded the technology
  • Improve the utilization and gain the required elasticity we wanted
  • Reduced the total cost
  • Introduced new GCP tools and technologies

Epilogue

This amazing journey lasted for almost 6 months of focused work. As planned the first step was to use the same technologies that we had in the bare metal cluster but once we finished the migration to GCP we can now start planning how to further take advantage of the new opportunities that arise from leveraging GCP technologies and tools.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2

Previously on our first episode of the trilogy  “Hadoop Research Journey from bare metal to Google Cloud – Episode 1”, we covered our challenges.

In this episode, I am looking to focus on the POC that we did in order to decide whether we should rebuild the Research cluster in-house or migrate it to the cloud.

The POC

As we had many open questions around migration to the cloud, we decided to do a learning POC, focusing on 3 main questions:

  1. Understand the learning curve that will be required from the users
  2. Compatibility with our in-house Online Hadoop clusters
  3. Estimate cost for running the Research cluster in the Cloud

However, before jumping into the water of the POC, we had some preliminary work to be done.

Mapping workloads

As the Research cluster was running for over 6 years already, there were many different use cases running on it. Some of which are well known and familiar to users, but some are old tech debts which no one knew if needed or not, and what is their value.

We started with mapping all the flows and use cases running on the cluster, mapped users and assigned owners to the different workflows.

We also created distinction between ad-hoc queries and batch processing.

Mapping technologies

We mapped all the technologies we need to support on the Research cluster in order to assure full compatibility with our Online clusters and in-house environment.

After collecting all the required information regarding the use cases and mapping the technologies we selected representative workflows and users to participate in the POC and take active part in it, collecting their feedback regarding the learning curve and ease of use. This approach will also serve us well later on, if we decide to move forward with the migration, having in house ambassadors.

Once we mapped all our needs, it was also easier to get from the different cloud vendors high level cost estimation, to give us a general indication if it makes sense for us to continue and invest time and resources in doing the POC.

 

We wanted to complete the POC within 1 month, so on one hand it will run long enough to cover all types of jobs, but on the other hand it will not be prolonged.

For the POC environment we built Hadoop cluster, based on standard technologies.

We decided not to leverage at this point special proprietary vendor technologies, as we wanted to reduce the learning curve and were careful not to get into a vendor lock-in.

 

In addition, we decided to start the POC only with one vendor, and not to run it on multiple cloud vendors.

The reason behind it was our mindfulness to our internal resources and time constraints.

We did theoretical evaluation of technology roadmap and cost for several Cloud vendors, and choose to go with GCP option, looking to also leverage BigQuery in the future (once all our data will be migrated).

The execution

Once we decided on the vendor, technologies and use cases we were good to go.

For the purpose of the POC we migrated 500TB of our data, build the Hadoop cluster based on Data Proc, and build the required endpoint machines.

Needless to say, that already in this stage we had to create the network infrastructure to support the secure work of the hybrid environment between GCP and our internal datacenters.

Now that everything was ready we started the actual POC from the users perspective. For a period of one month the participate users will perform their use cases twice. Once on the in-house Research cluster (the production environment), and second time on the Research cluster build on GCP (the POC environment). The users were required to record their experience, which was measured according to the flowing criteria:

  • Compatibility (did the test run seamlessly, any modifications to code and queries required, etc.)
  • Performance (execution time, amount of resources used)
  • Ease of use

During the month of the POC we worked closely with the users, gathered their overall experience and results.

In addition, we documented the compute power needed to execute those jobs, which enabled us to do better cost estimation for how much it would cost to run the full Research Cluster on the cloud.

The POC was successful

The users had a good experience, and our cost analysis proved that with leveraging the cloud elasticity, which in this scenario was very significant, the cloud option would be ROI positive compared with the investment we would need to do building the environment internally. (without getting into the exact numbers – over 40% cheaper, which is a nice incentive!)

With that we started our last phase – the actual migration, which is the focus of our last episode in “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 3”. Stay tuned!

Taking the pain out of Data Science – part 3

This is the third and last post on our machine learning framework.
Post #1 covers the challenges we face and gives an overview of our solution.
Post #2 focuses on how we handle our data and make it more accessible.
This part will focus on what we do once our dataset is ready and organized – a framework for building new models, and for deploying them to production.

Modeling challenges and boilerplate

Building a model contains many common parts.

First is handling the input. Even after we sorted out the data with our Data Collection process, we still need to read it and split correctly to train-test, and read data for our simulation process.

Another common part is evaluating the test metrics.
Running the model on the test data, and displaying different test metrics, such as MSE, AUC and other metrics, to see how well the model performs.

Third, is checking the business metrics.
Before trying a model in production, we want to simulate how the model behaves regarding the business KPI’s.
We evaluate a number of metrics that serve as good proxies to the business performance.

Our goal in this framework is to make the life of the data scientists easier – letting them focus on the models rather than writing time consuming, boilerplate code.

Taking the pain out of Data Science – part 3

Model Framework

We wanted to create a framework that includes these parts out-of-the-box.
Runs the fitting process, saves the model, tests the performance and runs simulation.

Model Framework

All the data scientists should focus on, is their model’s logic.
They can use any Spark ML packages, open source implementations or their own in-house implementations; the rest they get “for free” from the framework.

The interface they need to implement is simple:

  • Preparing the dataset – extracting new features, transforming the data.
  • Fitting the model on the data – the actual logic of the algorithm.
  • Returning the column names (features) that are required for the model to operate
  • Saving a representation of the model for later use

interface

Models productization

The final part of the framework is to bridge between research and production.
We want this transition to be as simple and fast as possible, to allow us to reach conclusions quickly and keep improving.

First, we want to allow fast and easy A/B testing of new models.

A quick reminder of how A/B tests work: we split the population into 2 independent groups of similar users.
We serve one group with the treatment – the new model and serve the other group with the control – our production baseline that the system currently uses.
After running for a while, we analyze the data, evaluate engagement & monetization metrics using statistical tests and conclude whether the treatment has managed to provide significant improvement.

Models productization

To support this, we added a step at the end of the framework.
The step reads the model’s coefficients, and updates the A/B test configurations with a variant that will serve a small portion of our users with the new model.

In a similar fashion, once we have a model with proven value – we want to tune it on a regular basis so it will keep learning based on new data we collect.

  • We run the whole modeling flow on a regular basis, triggered by our ETL engine.
    A new model is created with updated variables.
  • Each run, we validate the business metrics, to make sure the model keeps performing well and doesn’t deteriorate.
  • Finally, if the metrics were positive, we update the production configurations.

High level design

Here is a quick high level overview of this part of the system:

Our input data is stored on 2 hive tables, after being prepared by the Data Collection process.
The model is created using the model generator, that initializes the implementation based on the job’s configuration.
The framework then runs all the common parts:

  • Reads the data and splits it into test, train and simulation
  • Calls the model’s fit implementation and saves the result.
  • Uses the model for test data predictions and stores the results for analysis.
  • Runs the model on the simulation data and calculates the business KPI simulation metrics.
  • Saves the model for later use on HDFS.

All the results are stored as on Cassandra.

The last part is the productization step:
It gets the updated model variables from the fit output, validates the simulation metrics to verify the model’s performance, and updates the production configurations on MySQL.

All the results are stored as on Cassandra.
Takeaways

To sum up, here are the key lessons we learned that evolved to this framework:

  • Prepare your data well – to enable high scale modeling, this is crucial!
    I cannot over-emphasize how important this is, in order to avoid drowning in data and spending a lot of the research time with endless queries.
  • Build an effective research cycle – invest the time to build a good big data machine learning framework. It will really pay off in the long run and keep your data scientists productive and happier.
  • Connect research and production – research results are worthless if it takes forever to apply them in your product. Shorter cycles will enable you to try out more models and implementations and keep improving.
    Aim to make this as quick and easy as possible.

Hadoop Research Journey from Bare Metal to Google Cloud – Episode 1

Outbrain is the world’s leading discovery platform, serving over 250 billion personal recommendations per month. In order to provide premium recommendations at such a scale, we leverage capabilities in analyzing a large amount of data. We use a variety of data stores and technologies such as MySql, Cassandra, Elasticsearch, and Vertica, however in this post trilogy (all things can be split to 3…) I would like to focus on our Hadoop eco-system and our journey from pure bare metal into a hybrid cloud solution.

Hadoop Research Journey from Bare Metal to Google Cloud

The bare metal period

In a nutshell, we keep two flavors of Hadoop clusters:

  • Online clusters, used for online serving activities. Those clusters are relatively small (2 PB of data per cluster) and are kept in our datacenters on bare metal clusters, as part of our serving infrastructure.
  • Research cluster, surprisingly, used mainly for research and offline activities. This cluster keeps large amount of data (6 PB), and by nature the workload on this cluster is elastic.

Most of the time it was not utilized, but there were times of peaks when there was a need to query huge amount of data.

History lesson

Before we move forward in our tale, it may be worthwhile to spend a few words about the history.

We first started to use the Hadoop technology at Outbrain over 6 years ago – starting as a technical small experiment. As our business rapidly grow, so did the data, and the clusters were adjusted in size, however a tech debt had been built up around it. We continued to grow the clusters, based on scale out methodology, and after some time, found ourselves with clusters running old Hadoop version, not being able to support new technologies, build from hundreds of servers, some of which are very old.

We decided we need to stop being fire fighters, and to get super proactive about the issue. We first took care of the Online clusters, and migrated them to a new in-house bare metal solution (you can read more about on this in the Migrating Elephants post on Outbrain Tech Blog site)

Now it was time to move forward and deal with our Research cluster.

Research cluster starting point

Our starting point for the Research cluster was a cluster build out of 500 servers, holding about 6 PB of data, running CDH4 community version.

As mentioned before, the workload on this cluster is elastic – at times, requires a lot of compute power and most of the time fairly under utilized (see graph below).

Research cluster starting point

This graph shows the CPU utilization for 2 weeks, as it seen the usage is not constant, most of the time is barely used, with some periodic peaks

 

The cluster was unable to support new technologies (such as SPARK and ORC), which were already in use with the Online clusters, reducing our ability to use it for real research.

On top of that, some of the servers in this cluster were becoming very old, and as we grow the cluster on the fly, its storage:CPU:RAM ratio was suboptimal, causing us to waste expensive foot print in our datacenter.  

On top of all of the above, it caused so much frustration to the team!

We mapped our options moving forward:

  1. Do in-place upgrade to the Research cluster software
  2. Rebuild the research cluster from scratch on bare metal in our datacenters (similar to the project we did with the Online clusters)
  3. Leverage cloud technologies and migrate the research cluster to the Cloud.

The dilemma

Option #1 was dropped immediately since it answered only a fraction of our frustration at best. It did not address the old hardware issues, and it did not address our concerned regarding non optimal storage:CPU:RAM ratios – which we understood would only get worse when we come to use RAM intensive technologies such as SPARK.   

We had a dilemma between option #2 and option #3, both viable options with pros and cons.  

Building the Research cluster in house was a project we were very familiar with (we just finished our Online clusters migration), our users were very familiar with the technology, so no learning curve on this front. On the other hand, it required a big financial investment, and we were unable to leverage the elasticity to the extent we wanted.

Migrating to the cloud answered our elasticity needs, however presented a non-predictable cost model (something very important to the finance guys), and had many unknowns as it was new for us, and for the users that would need to work with the environment. It was clear that learning and education will be needed, but it was not clear as to how steep this learning curve would be.   

On top of that, we knew that we must have full compatibility between the Research cluster and the Online cluster, but it was hard for us to estimate the effort required to get there, and the number of processes that require data transition between the clusters.  

 

So, what do we do when we don’t know which option is better?

We study and experiment! And this is how we entered the 2nd period – the POC.

You are invited to read about the POC we did and how we did it on our next episode of “Hadoop Research Journey from Bare Metal to Google Cloud – Episode 2”.

Taking the pain out of Data Science – part 2

This is post #2 in series of 3 posts covering our machine learning framework. We recommend to read post #1 first to understand the challenges we face and get an overview of our solution.
This part will focus on how we handle our data and make it more accessible – using an on-going data collection process.

Data Collection

The first part of any data science task, is getting a good dataset to work with. We have a lot of data, but preparing the datasets can be a very hard work – you really have to “get your hands dirty” to get the data from all the sources and tables and convert them into an easy to use dataset.

Data Collection

Challenges:

What are the main challenges in creating a good dataset to use?

  • Many output tables – tables that store requests for recommendations, served recommendations, user clicks, profiles for the users and documents and more.
  • Number of data stores – these tables are stored on a number of sources, due to their different nature. Some are Hive tables, some data is stored on MySQL, and on Cassandra as well.
  • Long queries – some of these tables are very big. Querying them, especially for a long date range, can take a while.
  • Irrelevant data – we rarely want data from our entire traffic. Usually we only want some portion of it which is relevant for the current modeling task.

Silos and partitioning:

In addition to these challenges, there are other advantages to a good data collection process.

We want to have the ability to train models on different silos – different population groups, that may behave differently and require different models.

To enable achieving this easily, we add a number of columns and partitions to our output aggregation tables – such as platform, country, language and more.

This allows us to quickly try out our models on different groups.

Output:

We decided to split our output into 2 main parts:

First, a dataset for building models: It will contain only the served recommendations we want (from specific variants of the traffic), and it should contain all of the clicked recommendations plus a sample of the non clicks, in order to have a balanced dataset for learning.

Second, a dataset that will be used for simulation of the business metrics.

Our recommendations are served in widgets, showing a small batch of recommendations.
For this use case, we take only recommendations from widgets that received at least one click.

With this dataset, we can apply our model only on these clicked widgets, and see how well we graded the clicked recommendation compared to the other non clicked recommendations.

Output

The solution – Automatic Spark job

Our solution to solve all these challenges was to create an automatic data collection job.

The job runs hourly, triggered by our ETL engine.
An hourly Apache Spark job aggregates an hourly dataset, with the relevant data; creates the needed partitions; and creates the two outputs described above.

Using Spark was very convenient for this use case. It allowed us to create a readable flow, that queries different input sources, and holds in memory data that is common for both tables before writing the final output to Hive.

 

A quick note on how we monitor our Spark jobs:

It is somewhat of a challenge to understand how a Spark job behaves, other the basic error messages and checking the job’s output.

To make the job’s progress and status more visible, we send metrics from the job’s driver, using HTTP, to our monitoring server, which aggregates them.

This allows us to create simple to use dashboards and understand with ease what is going on.

In addition we send out textual logs to our monitoring server, that indexes them to an ElasticSearch cluster. We can then view the logs with ease using Kibana.

Below is a dashboard example for a demo job, collecting a portion of our data.

dashboard example for a demo job

Stay tuned for our 3rd and last part of this blog post series where we will cover our Spark-based machine learning framework, that allows us to be highly agile with our research tasks, and dynamic as well as robust in pushing our models to production.

Taking the pain out of Data Science – RecSys machine learning framework over Spark, Part 1

Overview

The role of a Data Scientist, or Machine Learning engineer, is becoming more and more valuable in the tech industry. It is the fast growing job in the U.S. according to a LinkedIn study, and was recently ranked as the best job in America by Glassdoor.

However, the life of a Data Scientist isn’t easy – the job requires good Math and Statistics knowledge, programming background and experience, and “hacking” skills, in order to get things done. This is especially true when handling huge amounts of data of different types.

We, at the personalization team at Outbrain, decided to try and take out the pain of data science, and make our life easier to allow us to perform effective research with immediate production effect.

In this post series I will describe an end-to-end machine learning framework we developed, over Apache Spark, in order to address the different challenges our Data Scientists and Algorithm Engineers face.

Outbrain’s recommendation system machine learning challenge:

Our goal is recommending stories the user is most likely to be interested in, given the user’s interests and the current context.
We need to rank the stories in our inventory by the probability that the user will click to read, and display the top stories to the user.
Our supervision is the past user actions – given a user and a document with a set of features, did the user click or not.

Our data and features:

Outbrain generates a lot of data.
We get over 550 million unique monthly users, generate over 275 billion recommendations monthly and have more than 35 million clicks a day.

The first part to computing quality recommendations, is representing well our key players: The users and the documents.

We extract semantic features from each document that has our widget installed, using an NLP engine. The engine extracts semantic features on some levels of granularity, from high level categories to very granular entities.
For example, on the ‘Westworld’ story below, we extract:

  • High-level categories, such as entertainment.
  • Lower level topics – such as TV or murder.
  • Entities – persons, locations or companies, that the document discusses.

We represent our readers with similar semantic features.

We create an anonymous profile for each reader, based on content they were reading.

Each profile is an aggregation of the semantic features of each document the user read before.

Predictive models:

We use a variety of models, from three main types:

Content based models –

These models assume that there is a semantic connection between the documents the user likes to read.
The model uses the user profile, to find more documents that have similar semantic features to the ones we found out the user likes.
These could be stories within the same categories, from the same sites or covering a specific person or location.

Behavioural models –

Rather than assuming the user will want to keep reading documents on similar topics, it looks for connections between user interests, to other potential subjects that go well together.
For example, it may find that users that showed interest before on retirement investing, will be interested in the future on heart disease articles.

Collaborative models-

The third type, and potentially most powerful and interesting, are collaborative models.

These models use the wisdom of the crowd in order to recommend new content, potentially without the need to semantically understand it.
The basic idea is to find out readers with similar reading patterns; Find out what they like, in addition to the current user; and recommend these items.

Algorithms in this family use algebraic dimensionality reduction methods, such as Matrix Factorization or Factorization Machines, or finding a new, latent representation of the users and items using Deep Learning neural networks.

 

The process of data modeling consists of many common tasks.
To make this process efficient and enable agile research and productization, we have developed a general framework on top of Spark, containing 5 independent components.

Framework overview:

The system’s key components:

  1. Data collection
  2. Feature engineering
  3. Model training
  4. Offline evaluation and simulation
  5. Model deployment

The same system is used for both research and analysis of new algorithms, and also for running production models and updating them on a regular basis.

Next week, in part 2 of this blog post series, we will dive into the data collection flow which is a key ingredient to machine learning flows, and see how data is made more accessible using an automated Spark process.
Part 3 will cover our modeling framework, developed on top of Spark, that allows us to be highly agile with our research tasks, and dynamic as well as robust in pushing our models to production. Stay tuned!

GIT Mono-branch workflow: Pre-tested commits

Two notes to start with:

  • I would like to thanks Guy Mazuz that helped me with Teamcity configuration.
  • The post was also published on my personal blog.

Prelude

With SVN life was easy. We used Intellij IDEA with Teamcity for CI so it supports pre-tested commits.

It works as follows: instead of directly commit to trunk, there is a button in intellij IDEA “Run remote…” that essentially sends a patch to Teamcity to run all integration tests and when all tests pass it reports back to IDEA, that in turn commits the tested files directly to trunk (==master).

                               image from https://www.jetbrains.com/teamcity/features/delayed_commit.html

What it gives you? a higher certainty that your change hasn’t broken anything. As the team gets bigger (and in outbrain we had more than 100 developers committing to trunk) it becomes essential to have such a gatekeeper that will prevent commits that break the trunk. Otherwise, it becomes tedious to keep the trunk green all the time.

But…

When hitting the same button with GIT there is a small asterisk says: “Pre-tested commits are not supported in distributed VCS”. It still puzzles me why it can’t just send a patch with the exact same flow?! Anyway, we wanted this when moving to GIT so we had to find a solution.

Firstly, we could work with pull requests / dev branches / feature branches. That is a viable solution but it makes development speed slower and requires more clicks and time to get to production (ie: more bureaucracy).

The obvious solution was to commit directly to master without testing first in CI server. Since we were in a small team at the time that was actually the first approach we have taken. It works quite well but does not scale well to big teams.

Another viable alternative is to send a patch to Teamcity for testing, and after the tests pass manually approve the files commit and push. Usually, when I commit/push code I love to take a coffee break or something, while this approach forces me to wait until all tests pass.

Auto-merge

Finally, we took the auto-merge approach: https://confluence.jetbrains.com/display/TCD9/Automatic+Merge

It took a while to configure but eventually, that worked for us (see how to configure it below).

One notable advantage over the SVN approach is that the workstation doesn’t have to be connected to the network to make the actual approval of the merge while in SVN the commit happens from the IDEA itself.

However, there are some caveats. Most notably, this process has more complicated merges that result in more merge commits. It makes the commit tree a bit more obscure and trashed with merges.

How to configure?

We created a development branch on the server for each developer, myn is dev-oshai. On local repo developers works on master as usual and pull from master. Push is made always to the developer branch. Teamcity, in turn, run all integration tests and auto-merge to master when all tests pass.

Configure Teamcity

In Project configuration settings -> Build features add “Automatic merge”. For some obscure reason, the branch to watch should be defined as “+:oshai”. Not sure why…

Note those branches must be configured also as build triggers, otherwise build will not run at all.

Eventually, Teamcity will show all those branches in the branches view:

Configure Intellij IDEA

Actually here there is not too much to configure. When pushing the code, specify the dev-<you-name> branch itself:

Push from the command line

There are 2 alternatives here:

The simple one is to push like that:

git push origin dev-oshai

In case you would like to use git push(without the branch), it is also possible. See here how to configure it:

Enjoy!