[Editor's note: Be sure to catch Steve presenting "Rapid Iteration with Python: Scaling AppNexus" with Dave Himrod at PyData NYC on October 26, 2012]
AppNexus serves nearly 10 billion ads per day. On average, our ad servers handle over 100,000 ads per second, and an order of magnitude more at peak. Each ad decision is determined by a real-time auction, so our system must determine the value of every eligible ad creative each time an impression is available for bid. We leverage our vast data resources to determine these values, and we aim to do so as accurately as possible. However, this requires computationally expensive mathematical models that could not reasonably be evaluated within the milliseconds-long duration of an ad auction. So how is this possible?
Enter the AppNexus Optimization Engineering Team, which builds, among other things, scalable infrastructure for offline calculations relevant to ad valuation. Such calculations include modelling the effect of user frequency1 on click through rate2 and determining the expected value of an ad creative that has never been served before. In the latter case, we have to pre-calculate values for each ad creative on every segment of inventory (modulo targeting restrictions), resulting in tens of millions of combinations. The Optimization Engineering team has built a system designed for this use.
More specifically, we developed a system for splitting large, CPU-intensive jobs into manageable, independent chunks and distributing them across a cluster of machines for processing. To keep up with our talented quant team, which continues to churn out novel optimization methods, we needed to make the system as flexible as possible and provide for relatively rapid development. We also needed to consider performance, given the nature of the computations we were trying to handle. Thus, we built most of the system in Python but used Cython-optimized pandas data structures in performance-critical code paths.
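The splitting step itself is simple; here is a minimal sketch in plain Python. The function name and chunk size are illustrative, not production code; the real system splits along data boundaries such as advertisers.

```python
def split_job(item_ids, chunk_size=1000):
    """Split one large valuation job into independent chunks.

    Each chunk shares no state with the others, so it can be
    processed on any worker in the cluster.
    """
    for i in range(0, len(item_ids), chunk_size):
        yield item_ids[i:i + chunk_size]

# 2,500 items become three independent tasks of 1000, 1000, and 500.
chunks = list(split_job(list(range(2500)), chunk_size=1000))
print([len(c) for c in chunks])  # [1000, 1000, 500]
```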
The system consists of the following components:
The scheduler is the brain of the system and determines what processing needs to be done on which data and when it needs to be done. All of our models rely on historical data produced by our ad servers in real time. To enable this, the scheduler listens for updates from our data pipeline, which periodically makes new data available, such as the amount of money spent per campaign or the number of clicks and impressions per creative in the last hour. The scheduler creates task objects based on these updates and dispatches them to a message queue for processing. A task is usually a piece of a large problem that corresponds to a subset of data. One example is calculating expected ad values for all the campaigns and creatives of a single advertiser. The scheduler is designed such that developers can plug in handling logic for different types of data updates. The handling logic is isolated and run asynchronously so that one handler cannot interfere with or block another handler.
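The pluggable-handler idea can be sketched as follows. The update types, handler, and task shapes here are hypothetical, and an in-process `queue.Queue` stands in for the RabbitMQ work queue; the real handlers also run asynchronously in isolation, which this sketch elides.

```python
import queue

work_queue = queue.Queue()  # stands in for the RabbitMQ work queue

# Handling logic is registered per type of data-pipeline update.
HANDLERS = {}

def handles(update_type):
    """Decorator that plugs a handler in for one update type."""
    def register(fn):
        HANDLERS[update_type] = fn
        return fn
    return register

@handles("hourly_clicks")
def on_hourly_clicks(update):
    # One task per advertiser: a manageable, independent piece of work.
    for advertiser_id in update["advertiser_ids"]:
        work_queue.put({"kind": "revalue_creatives", "advertiser": advertiser_id})

def dispatch(update):
    """Route a data-pipeline update to its registered handler."""
    HANDLERS[update["type"]](update)

dispatch({"type": "hourly_clicks", "advertiser_ids": [7, 8]})
print(work_queue.qsize())  # 2 tasks queued for the workers
```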
The message queue enables different parts of the system to communicate. Because RabbitMQ provides a stable and highly available message queue that can be configured for a number of different messaging patterns, we deployed it for this part of the platform. Our data pipeline publishes updates to the RabbitMQ instance, and several other systems, including the scheduler, subscribe to them. We have also configured a work queue to which our scheduler can post tasks that are then consumed by worker instances.
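Two messaging patterns are in play here, and the distinction can be illustrated with in-process queues standing in for RabbitMQ: a fan-out exchange copies each data-pipeline update to every subscriber, while a work queue delivers each task to exactly one consumer. This is a sketch of the semantics, not of RabbitMQ's API.

```python
import queue

class FanoutExchange:
    """Pub/sub: every published message is copied to every bound queue."""
    def __init__(self):
        self.queues = []

    def bind(self):
        q = queue.Queue()
        self.queues.append(q)
        return q

    def publish(self, message):
        for q in self.queues:
            q.put(message)

updates = FanoutExchange()
scheduler_q = updates.bind()   # the scheduler subscribes to updates...
reporting_q = updates.bind()   # ...and so can any other system

updates.publish({"feed": "campaign_spend", "hour": 12})

# Both subscribers see the same update. A work queue, by contrast,
# hands each posted task to exactly one of its consumers.
print(scheduler_q.get() == reporting_q.get())  # True
```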
Workers do the heavy lifting: they run the code that actually produces our models. A pool of worker instances, which can be scaled out to a large number of machines, consumes tasks from the RabbitMQ work queue. A task is a serialized Python object, which the worker first deserializes and then runs in a forked process. Workers communicate status information back to the scheduler via the RabbitMQ instance and write their results to MySQL. Rather than use an open-source system such as Celery for our workers, we opted for the control and flexibility of the “roll your own” approach.
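The deserialize-then-fork step might look like the following sketch, using `pickle` for serialization and `multiprocessing` for the forked child. The `Task` class is a hypothetical stand-in for the real, CPU-intensive model code, and the fork context assumes a Unix host.

```python
import pickle
from multiprocessing import get_context

class Task:
    """Hypothetical task: score one batch of ad creatives."""
    def __init__(self, creative_ids):
        self.creative_ids = creative_ids

    def run(self):
        # Stand-in for the real model computation.
        return {cid: cid * cid for cid in self.creative_ids}

def _run_in_child(conn, payload):
    task = pickle.loads(payload)  # deserialize the task object
    conn.send(task.run())         # ship the result back to the parent
    conn.close()

def execute(payload):
    """Run one serialized task in a forked child process."""
    ctx = get_context("fork")     # explicit fork, as on our Unix workers
    parent_end, child_end = ctx.Pipe()
    child = ctx.Process(target=_run_in_child, args=(child_end, payload))
    child.start()
    result = parent_end.recv()
    child.join()
    return result

payload = pickle.dumps(Task([1, 2, 3]))
print(execute(payload))  # {1: 1, 2: 4, 3: 9}
```

Running each task in its own forked process keeps a crashing or leaking task from taking the whole worker down with it.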
MySQL is used to store the output of the worker tasks for consumption by our ad servers. Task statuses and running times are stored by the scheduler in MySQL. The scheduler also persists actual serialized tasks containing the code version that was run and pointers to the data that was used. This has proved invaluable for debugging.
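What the scheduler persists can be sketched with `sqlite3` standing in for MySQL. The table layout, column names, and values below are illustrative only, not our actual schema.

```python
import pickle
import sqlite3

db = sqlite3.connect(":memory:")  # sqlite3 stands in for MySQL here
db.execute("""
    CREATE TABLE tasks (
        id           INTEGER PRIMARY KEY,
        status       TEXT,     -- e.g. queued / running / done / failed
        running_secs REAL,     -- how long the task took
        code_version TEXT,     -- which code produced the result
        payload      BLOB      -- the serialized task itself, for debugging
    )
""")

task = {"kind": "revalue_creatives", "advertiser": 42, "data_hour": "2012-10-01T12"}
db.execute(
    "INSERT INTO tasks (status, running_secs, code_version, payload) "
    "VALUES (?, ?, ?, ?)",
    ("done", 31.5, "abc1234", pickle.dumps(task)),
)

# Debugging later: recover exactly what was run, and on which data.
status, payload = db.execute("SELECT status, payload FROM tasks").fetchone()
print(status, pickle.loads(payload)["advertiser"])  # done 42
```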
The concurrency manager interacts with the scheduler to enforce rules about task concurrency and priority. As a result, no single type of task can monopolize the system, and high-priority tasks always have capacity to run.
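A minimal version of those rules might look like this: per-type concurrency caps, with higher-priority tasks admitted first. The task types, caps, and priority values here are hypothetical.

```python
import heapq

CAPS = {"revalue": 4, "backfill": 2}  # hypothetical per-type concurrency caps

class ConcurrencyManager:
    def __init__(self, caps):
        self.caps = caps
        self.running = {t: 0 for t in caps}
        self.waiting = []   # heap of (priority, seq, type); lower runs sooner
        self._seq = 0

    def submit(self, task_type, priority):
        heapq.heappush(self.waiting, (priority, self._seq, task_type))
        self._seq += 1

    def admit(self):
        """Release waiting tasks, best priority first, while capacity lasts."""
        admitted, deferred = [], []
        while self.waiting:
            prio, seq, t = heapq.heappop(self.waiting)
            if self.running[t] < self.caps[t]:
                self.running[t] += 1
                admitted.append(t)
            else:
                deferred.append((prio, seq, t))   # over cap: wait for a slot
        for item in deferred:
            heapq.heappush(self.waiting, item)
        return admitted

mgr = ConcurrencyManager(CAPS)
for _ in range(3):
    mgr.submit("backfill", priority=5)   # low-priority bulk work
mgr.submit("revalue", priority=0)        # high-priority task jumps the line
print(mgr.admit())  # ['revalue', 'backfill', 'backfill'] -- third backfill waits
```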
In its current incarnation, our work queue system has a dozen hosts and is capable of updating nearly 100 million ad/ad-inventory pairings per hour. It also runs the code responsible for dynamically increasing and decreasing the rate of spend for every campaign on the platform in order to appropriately fill budgets (Price is Right style). Of course, this system is only relevant in the context of jobs that can be sharded as described above. Jobs that have more complex dependencies are not addressed here.
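The budget-pacing idea, in caricature: spread whatever budget remains evenly over the hours left, moving the current rate only part of the way toward that target so it does not oscillate, and never spending past the budget. The function, its signature, and the smoothing factor are all hypothetical, not the production logic.

```python
def next_spend_rate(budget, spent, hours_left, current_rate, smoothing=0.5):
    """One hypothetical pacing step for a campaign's hourly spend rate."""
    remaining = budget - spent
    if hours_left <= 0 or remaining <= 0:
        return 0.0                       # budget filled: stop spending
    target = remaining / hours_left      # even spend of what is left
    # Move part way toward the target to damp oscillation.
    return current_rate + smoothing * (target - current_rate)

# A campaign behind pace speeds up ($4/hr toward a $10/hr target)...
print(next_spend_rate(budget=240.0, spent=0.0, hours_left=24, current_rate=4.0))   # 7.0
# ...and one that has filled its budget stops, Price is Right style.
print(next_spend_rate(budget=240.0, spent=250.0, hours_left=10, current_rate=12.0))  # 0.0
```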
Our system scales horizontally, so as the AppNexus platform grows rapidly, scaling becomes largely a matter of procuring hardware. Consequently, AppNexus will be able to continue offering best-in-class optimization, even as the number of ad creatives and inventory placements grows explosively. What’s more, we can continue to innovate and deliver new optimization features.
1 User frequency is defined as the number of times a user has viewed a given ad creative.
2 Click Through Rate (CTR) is defined as total number of clicks on an ad divided by total number of times the ad has been viewed.