Infrastructure for Predictive Analytics with Machine Learning and Big Data

Jul 26, 2018

Predictive Analytics at ByteGain

Predictive analytics at ByteGain is the art of making educated guesses about user behavior. We answer questions such as “will this user make a purchase?”, “will this user sign up for the newsletter?”, or “will this user churn?”.

Our technology creates deep learning models to predict user behavior. To train such models, we collect and analyze lots of data. Hence, we deal with “big data” on a daily basis.

We will describe the infrastructure we built at ByteGain to collect data, train models, serve real-time predictions, and analyze the results. It lives 100% in the cloud, mostly on GCP and Heroku, with a bit of AWS and Azure for custom projects, and it is fully compliant with legal requirements, such as GDPR, with regard to data collection, handling, and storage.

Collecting (Big) Data

Models are trained using historical data. The more data that can be analyzed, the better the models and the better the predictions. The quality and consistency of the data are also important factors.

ByteGain can and does work with existing data a customer may already have (via analytics libraries such as Segment or Google Analytics), but our preference is to instrument a customer’s website or mobile application with our own library and collect the data exactly how we need it.

The journey starts by asking our customers to install our bytegain.js library on their website, or our mobile SDK in their mobile application. Note: from here on, “website” and “customer application” will be used interchangeably to denote a pure web application, a mobile application, or both. The idea is the same: data is collected from a customer’s application and predictions are served back to it.

Data is collected in real time by our client-side libraries and sent to our servers. Users are tracked as either anonymous users or, if they are logged in to the customer application, authenticated users. On the transition from anonymous to logged in, the collected events are stitched together into a single user session.
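
A rough sketch of what this stitching amounts to, assuming events shaped like the example payload below (the function and field handling here are illustrative, not ByteGain’s actual implementation):

def stitch_session(events, anonymous_id, user_id):
    """Backfill the userId onto events collected while the user was anonymous,
    so pre-login and post-login activity form a single user session."""
    for event in events:
        if event.get("anonymousId") == anonymous_id and event.get("userId") is None:
            event["userId"] = user_id
    return events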

Architectural diagram of how data flows from the clients (customer applications) to our servers:

Events describe interactions between the end user and the application. There are different types of events, e.g. page view, click track, scroll track, purchase completed, newsletter signup, etc. Later on they are turned into event sessions for a given user, and some constitute positive or negative labels for model training.

Events are collected on the client side and are POSTed over HTTPS as JSON payloads. An event payload may look similar to:

{
  "anonymousId": "7969121...",
  "bg": {"ipCountry": "US"},
  "channel": "client",
  "context": {
    "page": {
      "path": "/news/.../5.html",
      "referrer": "https://example.com/...",
      "url": "https://example.com/...",
      "title": "News"
    },
    "ip": "8.8.4.4",
    "userAgent": "Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_6…)"
  },
  "messageId": "02f7a...",
  "properties": {
    "path": "/news/.../5.html",
    "referrer": "https://example.com/...",
    "url": "https://example.com/...",
    "title": "News"  },
  "receivedAt": "2018-04-16T16:50:19.917145Z",
  "sentAt": "2018-04-16T16:50:19.734Z",
  "timestamp": "2018-04-16T16:50:19.913145Z",
  "type": "page",
  "userId": null
}

The entry point for data collection on the server side is an endpoint running on Heroku infrastructure. The server is written in Python (see the discussion at the end of the post for our choice of languages) and runs on Heroku PX dynos with auto-scaling enabled. Once events arrive at our server, the data stream is forked in two: one copy for long-term data archival and another for real-time serving of model predictions.
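
A minimal sketch of what this fork looks like, assuming a Flask endpoint; the route, database and collection names, and the queue-based forwarding are illustrative, not ByteGain’s actual implementation:

import queue

from flask import Flask, jsonify, request
from pymongo import MongoClient

app = Flask(__name__)
recent_events = MongoClient()["bytegain"]["recent_events"]  # illustrative names
outbox = queue.Queue()  # drained by a background thread that POSTs to GAE

@app.route("/v1/track", methods=["POST"])  # the route is an assumption
def track():
    event = request.get_json(force=True)
    # Fork the stream: one copy for real-time serving, one queued for archival.
    recent_events.insert_one(dict(event))  # filtering/aging shown in the next section
    outbox.put(event)
    return jsonify({"ok": True})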

Data Storage for Real-time Serving

Events are immediately stored in a MongoDB database running in the same AWS region, for low latency. We observe under 5 ms latency for reads and writes to the database, which is plenty fast for our purposes. In the past, we used Redis instead of MongoDB. While the Redis latency was much lower (under 1 ms, being an in-memory database), scaling it to large amounts of data can become cost-inefficient, in particular when very low latency is not a requirement for an application.

Events are aged out of MongoDB; we typically do not store more than the 100 most recent events per active user. Also, not all event types need to be stored for real-time prediction serving, so some events are filtered out and never get written to MongoDB. Examples include events that are not currently used in model training (but may be used in the future), events that have no value in making a prediction (such as positive or negative label events), logging events, etc.
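
A sketch of how the per-user cap can be enforced in MongoDB, assuming pymongo and a collection keyed by user; the field and collection names are illustrative:

from pymongo import MongoClient

users = MongoClient()["bytegain"]["users"]  # illustrative names

def store_recent_event(user_key, event):
    """Append an event to the user's recent-events list, keeping only the
    most recent 100 so the real-time store stays small."""
    users.update_one(
        {"_id": user_key},
        {"$push": {"events": {"$each": [event], "$slice": -100}}},
        upsert=True,
    )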

How these events are used to make real-time predictions will be addressed later in this post; for now, just remember that sufficient event data for active users gets cached close to the server.

Long-term Data Archival

Data is POSTed from the Heroku server to our infrastructure running on the Google Cloud Platform (GCP). The entry point on GCP is a Go server deployed in the Google App Engine Flexible Environment (GAE Flex). Auto-scaling is also enabled for this endpoint.

POSTs between Heroku and GAE are asynchronous so that requests from clients are processed as quickly as possible. On recoverable HTTP errors, POSTs are retried, and if the GAE server is unavailable (due to maintenance or temporary network issues), events are queued up for delivery at a later time. Asynchronous event delivery can also happen in batches, for greater network efficiency, at the expense of more work for proper error handling and retries.
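
A rough sketch of such a forwarding loop, continuing the assumptions of the earlier endpoint sketch; the GAE URL, batch size, and retry policy are illustrative:

import time

import requests

GAE_URL = "https://collector.example.com/ingest"  # illustrative endpoint

def forward_events(outbox, batch_size=50, max_attempts=5):
    """Drain queued events and POST them to the GAE server in batches,
    backing off and retrying on recoverable errors."""
    while True:
        batch = [outbox.get()]  # block until at least one event is queued
        while len(batch) < batch_size and not outbox.empty():
            batch.append(outbox.get())
        for attempt in range(max_attempts):
            try:
                resp = requests.post(GAE_URL, json=batch, timeout=10)
                if resp.status_code < 500:
                    break  # delivered, or a non-retryable client error
            except requests.RequestException:
                pass  # network error: treat as recoverable
            time.sleep(2 ** attempt)  # back off before retrying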

Once events arrive on the GAE server, the data is immediately written to a topic in GCP Pub/Sub. We use one topic per customer, which makes it convenient to monitor stats for each customer, but at some point we will reach the limit on the number of topics we can have in GCP Pub/Sub. Also, as we add more topics, it becomes more difficult to monitor each topic individually. We will address this potential issue when it becomes a problem, but we are not there yet.
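
The GAE server itself is written in Go; for brevity, here is roughly what the publish step looks like using the Pub/Sub client from Python, with illustrative project and topic names:

import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

def publish_event(customer_id, event):
    # One topic per customer, e.g. "events-<customer>", makes per-customer
    # monitoring convenient.
    topic_path = publisher.topic_path("bytegain-project", "events-%s" % customer_id)
    publisher.publish(topic_path, data=json.dumps(event).encode("utf-8"))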

Using GCP Pub/Sub has the advantage of fanning out the data to multiple topic subscribers (microservices architecture), e.g. one could archive data in GCP BigQuery for long term storage, another one could put a subset of the data in a fast local database for real-time serving, while another subscriber could compute real-time statistics or monitor the data flow and alert on issues.
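
A sketch of such a subscriber, here the one that streams events into BigQuery; the subscription, dataset, and table names are illustrative, and the client library calls may differ slightly by version:

import json

from google.cloud import bigquery, pubsub_v1

bq = bigquery.Client()
table = bq.get_table("bytegain-project.events.raw_events")  # illustrative table
subscriber = pubsub_v1.SubscriberClient()
subscription = subscriber.subscription_path("bytegain-project", "archive-events")

def callback(message):
    row = json.loads(message.data.decode("utf-8"))
    if not bq.insert_rows_json(table, [row]):  # streaming insert; returns errors
        message.ack()  # only ack once the row is safely in BigQuery

subscriber.subscribe(subscription, callback=callback).result()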

One other advantage of using Pub/Sub is temporary data persistence. Pub/Sub persists the data for up to 7 days if subscribers are unavailable to consume the topics. This is very important for preventing data loss, and convenient when parts of the system need to be taken offline for maintenance or upgrades. We had such a case recently when we had to stop our BigQuery subscriber in order to add a new column and populate it. (Until recently, BigQuery did not allow DML operations on any rows while there was pending data in the streaming buffer; all writes had to be stopped first and the streaming buffer had to drain, which could take up to 90 minutes.)

We currently have a single subscriber to the topics. Its job is to archive collected data into BigQuery, our long-term data warehouse. We save all collected events, unlike the real-time serving database that only stores a subset of recent events. We honor legal requirements such as GDPR and we delete data from BigQuery if asked to.

BigQuery is our single source of truth for collected data. We use it to analyze data and train deep learning models.

Ideally, the data should be POSTed directly from the clients to our GAE server, and that is where we would like to end up. But there are a couple of issues with doing so today:

  • We have invested in the Heroku infrastructure and appreciate the convenience of an easy to use PaaS, fast deployments and ease of trying new services and ideas. Unfortunately, GAE is not as easy to use and deploy to, but this is continually improving.
  • Heroku runs in the AWS East region while our GAE infrastructure is in the GCP Central region. There is significant latency (about 50ms) crossing from the Amazon East network to the Google Central network. We must keep the MongoDB instance in the AWS East region because it needs to be relatively quick to write to and read from. For long-term archival of events, the added 50ms latency is not critical.

So far we have covered data collection. Key points:

  1. Data is collected using our client libraries
  2. Data is stored in GCP BigQuery for long-term archival and training of models
  3. A subset of the data is stored in MongoDB for real-time serving of model predictions

Training Models

Once sufficient data is collected for a customer application, we can train a deep learning model. We will not go into the actual details of how the models themselves are built (that is covered in our Zap paper), but we will cover it here from an infrastructure perspective.

There is a model training pipeline driver that is scheduled to run on a periodic basis on Google Compute Engine virtual machines (GCE VM). In the future we may make use of a cluster running on GCP Kubernetes Engine (GKE).

The pipeline consists of six stages: session computation, instance computation, training, calibration, validation, archival. Machine Learning details about the stages can be found in the Zap paper.

One nice property of the driver is that it tracks progress through each stage. This means that if the pipeline fails at a certain stage, the process can be resumed without going through the previous stages again. Progress through all stages is logged and statistics are collected and archived. The process is automated to run without supervision and it sends notifications of both successes and failures.
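
A minimal sketch of what such a resumable driver can look like, assuming progress is checkpointed to a small state file; the stage names mirror the pipeline above, everything else is illustrative:

import json

STAGES = ["sessions", "instances", "training", "calibration", "validation", "archival"]

def run_pipeline(stage_fns, state_path="pipeline_state.json"):
    """Run each stage once, skipping stages completed by a previous run."""
    try:
        with open(state_path) as f:
            done = set(json.load(f))
    except FileNotFoundError:
        done = set()
    for stage in STAGES:
        if stage in done:
            continue  # completed in an earlier run; resume past it
        stage_fns[stage]()  # may raise; a rerun resumes at this stage
        done.add(stage)
        with open(state_path, "w") as f:
            json.dump(sorted(done), f)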

The two initial stages are GCP Dataflow (Apache Beam) jobs. They read from BigQuery the data that our client libraries collected. Data can also come from other sources, such as Segment/Redshift databases or JSON-L, Avro, or Parquet files; this happens in special situations where we have integrated with a customer’s existing data sources.
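
A rough sketch of such a job with the Apache Beam Python SDK; the query, grouping key, and output location are illustrative, and the actual session computation is described in the Zap paper:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    (p
     | "ReadEvents" >> beam.io.ReadFromBigQuery(
           query="SELECT * FROM `bytegain-project.events.raw_events`",
           use_standard_sql=True)
     | "KeyByUser" >> beam.Map(lambda row: (row["anonymousId"], row))
     | "SessionsPerUser" >> beam.GroupByKey()
     | "Format" >> beam.Map(
           lambda kv: json.dumps({"user": kv[0], "events": list(kv[1])}, default=str))
     | "WriteSessions" >> beam.io.WriteToText("gs://bytegain-models/sessions"))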

The Dataflow jobs output the data used to train our models. Model training is done on GCP Cloud ML Engine for TensorFlow. Once a model is trained, it is archived in Google Cloud Storage (GCS). If it passes our validation and verification metrics, it is auto-deployed to production, ready for real-time prediction serving.

Real-time Model Serving

Now that we have trained one or more models, they need to serve predictions. Clients can either call a low-level REST API that returns the raw prediction value (a probability between 0.0 and 1.0) or use a higher-level JavaScript API. In the case of the low-level API, the client must decide what action to take based on the returned value. With the higher-level API, the UI components ask the server questions such as “should I show the sign up form?” or “should I nudge the user with a promotional discount?”
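
An illustrative call to the low-level API from Python; the URL, parameter names, and response shape are assumptions, not ByteGain’s documented interface:

import requests

resp = requests.get(
    "https://api.example.com/v1/predict",          # illustrative URL
    params={"model": "newsletter_signup",          # illustrative parameters
            "anonymousId": "7969121..."},
    timeout=1.0,
)
probability = resp.json()["prediction"]  # a value between 0.0 and 1.0
if probability > 0.7:                    # the client chooses the threshold and action
    print("show the newsletter signup form")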

Serving is done, once again, by our Python server on the Heroku infrastructure. The server has a copy of each trained model loaded in memory. TensorFlow models are saved and loaded in frozen form in order to speed up loading and minimize memory usage.
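
Loading a frozen graph at startup looks roughly like this with the TensorFlow 1.x API that was current at the time of writing; the file path and function name are illustrative:

import tensorflow as tf

def load_frozen_model(path):
    """Load a frozen GraphDef once at startup and keep a session around
    for serving predictions from memory."""
    graph_def = tf.GraphDef()
    with tf.gfile.GFile(path, "rb") as f:
        graph_def.ParseFromString(f.read())
    graph = tf.Graph()
    with graph.as_default():
        tf.import_graph_def(graph_def, name="")
    return tf.Session(graph=graph)

session = load_frozen_model("models/newsletter_signup.pb")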

Data used by the model for computing predictions comes from the local MongoDB instance. As you may recall, only a subset of the events is stored in MongoDB, sufficient to make a prediction. Because the database is local, querying for data incurs very low latency, typically under 5ms.

Every prediction is logged. This allows for post-serving analysis of model performance: we can compare what the model predicted during a user session with what the user ended up doing.

Re-training and Deploying New Model Versions

Websites and mobile applications change and evolve over time. So does user behavior in reaction to application and business changes. It is important to constantly re-train new model versions that use the latest available data.

ByteGain re-trains its models on a weekly basis using the most recent collected data. The following process is used for each active model:

  1. Run the training pipeline driver, as described above in the "Training Models" section, using the latest available data. We typically use between 7 and 60 days of historical data, on a rolling window basis, depending on how much data a customer application produces (some websites see a lot more traffic than others).
  2. Validate that the new model version is acceptable by checking certain performance metrics such as model AUC.
  3. Archive the new model version in Google Cloud Storage (GCS), together with stats and supporting data, such as a subset of examples and their prediction values (see the next step for why).
  4. Verify that the new model will be served correctly. This is an important step in the process and worth expanding on:
  • As you recall from previous sections, training a model happens on GCP Cloud ML while serving the model happens on Heroku. These are separate servers and services. There is some common code that deals with data processing and cleanup during both training and serving: code that filters and converts JSON events into TensorFlow protos.
  • It is possible for the common code to get out of sync between training and serving. This may go unnoticed because the code does not error out (though that may also happen in some extreme cases); instead, wrong prediction values are computed during serving. To prevent such mistakes, a number of random examples and their computed prediction values are saved from the training step and archived with the trained model.
  • Verification consists of running the same examples through the model and code path on the production serving servers. We check that we get the exact same prediction values as seen in training. We get alerted if there are mismatches and the new model version is not deployed to production.
  5. If validation and verification pass, the new model version is enabled for production serving.

The first three steps run on GCE VMs (with the intention of making use of GKE in the future). The last two steps must run on Heroku production servers as they need to make use of the exact same code used to serve predictions.
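
A sketch of the verification step described above: replay the examples archived at training time through the production serving code path and require matching predictions. The helper names and archive format here are illustrative:

import json

def verify_model(model, archived_examples_path, tolerance=1e-6):
    """Return True only if the serving path reproduces the predictions
    that were computed and archived during training."""
    with open(archived_examples_path) as f:
        examples = json.load(f)  # e.g. [{"events": [...], "prediction": 0.42}, ...]
    for example in examples:
        served = model.predict(example["events"])  # same code path as production serving
        if abs(served - example["prediction"]) > tolerance:
            return False  # mismatch: alert and do not deploy this version
    return True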

Other Infrastructure

ByteGain relies on Cloudflare services for DNS, CDN and Argo for smart routing.

Final Thoughts

At ByteGain we are very pragmatic. We use the tools, services and languages most appropriate for the job. Python as the data science language. Go for performance. JavaScript as the web language. Objective-C for iOS. Java for Android. Heroku for ease of use. GCP for performance and its useful service offerings. AWS or Azure because we sometimes just have to.

There are always pros and cons to any choice; the trick to reaching the big-picture goal is to start simple, improve over time, and adapt as business needs change.
