
an_message: Format Agnostic Data Transfer


Every distributed RESTful system has a communication problem. How does Service A communicate with Service B? Does it pass data via multipart/form-data? Does it pass individual fields on the query string? Does it POST a blob of JSON?

With the proliferation of “RESTful” services, the trend is decidedly towards JSON and away from XML. JSON is relatively compact and fast to parse (for most services the bottleneck is not JSON parsing). This works well for most “wait-based” services (database lookups, file reads, etc.). However, there is a class of services in the ad-tech space (and elsewhere) with more stringent SLAs, for which JSON parsing is a significant portion of the runtime of a single request. For these services we can do better while still keeping the schematic safety of JSON in place.

There are myriad data serialization formats in the wild, from Thrift to Protobuf to Avro to JSON to Cap’n Proto. Each lands in a different place on the performance spectrum and a different place on the type-safety spectrum. JSON, being a subset of JavaScript, has relatively weak type safety and is slow to serialize/deserialize compared to more compact binary formats because it requires string parsing and marshalling of types. Avro offers more type safety, but it falls down if you want to deal with unsigned integers and is slower to serialize/deserialize than Protobuf. Protobuf is actually pretty good on both the type-safety and speed spectrums but cannot compete with Cap’n Proto in the speed category, the latter being close to memcpy speed on x86.

Some benchmarks

Native serialize ticks: 11158.400000
Native deserialize ticks: 1114.800000

JSON serialize ticks: 32403.200000
JSON deserialize ticks: 9687.200000

Proto serialize ticks: 12280.000000
Proto deserialize ticks: 3921.600000

Avro serialize ticks: 33452.400000
Avro deserialize ticks: 18224.000000

The above is the average of 10 runs serializing and deserializing an eight-field message. The numbers are in CPU cycles. I have thrown out the first result from the average as it ran under a cold cache.

On the deserialization side, Native gives us a ~9-fold speedup over JSON and a ~3.5-fold speedup over Protobuf. On the serialization side, the numbers are less dramatic, but we still see a ~3-fold improvement over JSON and numbers on par with Protobuf serialization. Avro is abysmal, but this could be an artifact of the avro-c library and not the underlying format.
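For reference, the shape of a cycle-count benchmark like this is straightforward to reproduce with the CPU's timestamp counter. The sketch below is an assumption about how such a harness might look (using __rdtsc and a hypothetical generated serializer, foo_message_serialize_json), not the actual code that produced the numbers above.

A cycle-count harness sketch
#include <stdint.h>
#include <stdio.h>
#include <x86intrin.h>	/* __rdtsc() */

#define RUNS 10

struct foo_message;	/* generated message struct (see below) */

/* Hypothetical generated serializer; stands in for any format under test. */
extern size_t foo_message_serialize_json(const struct foo_message *fm, char *out, size_t out_size);

static void
benchmark_json_serialize(const struct foo_message *fm)
{
	char out[4096];
	double total = 0;

	/* One extra warm-up iteration; the cold-cache run is excluded from the average. */
	for (int i = 0; i <= RUNS; i++) {
		uint64_t start = __rdtsc();
		foo_message_serialize_json(fm, out, sizeof(out));
		uint64_t end = __rdtsc();

		if (i > 0) {
			total += (double)(end - start);
		}
	}

	printf("JSON serialize ticks: %f\n", total / RUNS);
}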

So why not just choose the fastest possible format and use that plant-wide? There are very valid reasons a developer may choose to use JSON over one of the faster and safer formats. Maybe they are interoperating with an external service that sends JSON. Maybe there is a need to ship their service’s data to some UI to be interpreted by JavaScript. A developer might choose to use Protobuf because the tooling in their language of choice is relatively good for Protobuf and much weaker for something like Cap’n Proto or Avro.

To take this a step further, as we move between languages and runtimes in a significantly complex ecosystem, we can easily encounter many services, each wanting to produce and consume a different wire format. Most places tend to settle on the lowest common denominator (hence JSON) to simplify the contracts between all services. They trade performance for simplicity. At AppNexus, we can’t actually afford to trade performance, but we want to keep the simplicity. We also want to layer on really good type safety and allow developers to work within the normal bounds of their language/runtime. What if I want to receive JSON from some external service but send Protobuf to some internal service? What if I want to receive Protobuf from some unnamed ad exchange and retransmit internally using an x86 wire format to squeeze as much performance out of internal components as possible? To solve these problems we created an_message.

What is it?

an_message is schema definition, code generation, and wire-format-agnostic transmission for structured data. This allows sending and receiving services to agree on the schema for that structured data while disagreeing about the transmission format. The messaging layer itself deserializes incoming structured data and lets the programmer interact with an object in their preferred language (currently only C is fully supported, but other languages are on the way). This means Service A can send JSON to Service B, which can deserialize the message into an internal C struct and then send it to Service C as Protobuf, all using the same API. By abstracting away the vagaries of individual structured data formats, we let the developer rely on a rock-solid serialization/deserialization library and avoid writing error-prone parsing code.

It is split into a few logical pieces.

  • Message definition
  • Code generation
  • Runtime execution

Message definition

We define all messages in our systems in a centralized location. Each message has a message_id that uniquely identifies this schema for all time. message_ids are fixed, unique, and can only be used once. For each field in the message we declare its name, type, and an optional default value. We have support for nested messages, externally defined messages, and lists.

A message definition resembles:

A message definition
{
	"type" : "foo",
	"message_id" : 100,
	"version" : 1,
	"fields" : [
		{
			"identifier" : "timestamp",
			"type" : "timestamp",
			"protobuf_type" : "fixed64",
			"index" : 1
		},
		{
			"identifier" : "my_field",
			"type" : "string",
			"index" : 2,
			"default" : "--"

		}
	]
}

These messages can get arbitrarily complex and, aside from map support (possible future development), can basically represent any object we would like to ship over the wire (or store in a file).

After we have defined a message we run a code generator on it.

Code generation

At compile time we take the above message specification and generate serialization and deserialization functions using an internal tool called bpgen. This is a Python program that uses Jinja2 templates to generate code for each serialization format and, in the future, each language we care to support. As of this writing we support:

  • JSON
  • Tab-delimited ASCII (used for logging)
  • Protobuf
  • Avro
  • Native

The Native format is a derivative of an x86 packed memory layout with some allowances for variable-length strings and arrays.
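The post doesn’t spell out that layout, but the general idea behind a packed, near-memcpy format can be illustrated roughly as follows. This is purely an illustration of the technique, not the actual an_message Native layout: fixed-width fields live in a packed struct, and variable-length data is appended after it with lengths recorded in the fixed portion.

A rough illustration of a packed layout (not the actual Native format)
#include <stdint.h>

/* Fixed-width portion: can be written and read with a single memcpy. */
struct packed_foo {
	uint32_t timestamp;
	uint32_t my_field_len;	/* length of the string bytes appended after the struct */
} __attribute__((packed));

/* Wire image: [struct packed_foo][my_field bytes ...]
 * Deserialization is a memcpy of the fixed portion plus a pointer/length
 * fixup for each variable-length field, which is why it approaches memcpy speed. */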

The above schema would resemble this C struct after compilation:

After bpgen
struct foo_message {
        struct an_message h;
        uint32_t timestamp;
        char *my_field;
};

And some functions get generated for me:

Base message functions
bool foo_message_init_message(void *blob);
void foo_message_free_message(void *blob);
size_t foo_message_deserialized_size(void);

In addition to these base functions, bpgen will generate functor objects for each message format we desire to support. Each functor object implements the an_message_type API seen below. Then we can store a mapping between message_id/format and the related functor object. Serialization or deserialization at that point is merely finding the correct functor and calling the function pointers on it. This is all hidden behind an API.

The an_message API looks like:

an_message API
/**
 * @brief interface struct for message processors
 *
 * The main idea here is that handlers of an_messages register themselves with the an_message core.
 * When the core encounters a message_id/format combination that matches the `struct an_message_type`,
 * this register object is used to serialize/deserialize the message.
 */
struct an_message_type {
        const char *name;
        uint16_t message_id;
        uint16_t message_version;
        bool (*init_message)(void *msg);
        void (*free_message)(void *msg);
        size_t (*deserialized_size)(void);
        bool (*deserialize)(an_message_t *restrict dest, const void *restrict mem, const size_t mem_size, size_t *used_mem);
        size_t (*serialize)(const an_message_t *source, an_message_buffer_t *buffer);
        bool (*reflection_field)(const char *field_name, struct an_message_reflection_field *field);
        const char **(*reflection_fields)(void);
};

/**
 * @brief Read the framing out of mem and call any registered callbacks for that message_id after it is parsed.
 *
 * @param buffer the source of the data to parse
 * @param message_id will be filled with the parsed message_id
 * @param dest message_size bytes will be allocated at *dest; dest must be large enough to hold MAX_MESSAGE_SIZE
 * @param message_size the number of bytes that make up the message, including the header
 * @param format the format type of the message to parse
 * @return the an_message_type object used to do the parse, NULL on failure
 *
 * This will allocate the message memory using an_malloc_region and hand it to the caller.  Up to the caller to free
 * using an_message_free()
 *
 */
const struct an_message_type *an_message_parse(struct evbuffer *buffer, uint16_t *message_id, an_message_buffer_t *dest,
    uint32_t *message_size, enum an_message_protocol_format *format, uint8_t *message_version);

/**
 * @brief opposite of an_message_parse, serialize a message into an evbuffer
 *
 * @param[in] format see enum definition; the format of the message on the wire
 * @param[in] message_id schema identifier of the message
 * @param[in] msg the source of the data
 * @param[in] message_size size of the struct
 * @param[out] serialized_size the final size of the serialized data
 * @param[out] buffer the destination of the data
 * @return true on success
 *
 * This will frame the message on the wire in the protocol framing format so it
 * can be parsed by an_message_parse at the other end
 */
bool an_message_serialize(enum an_message_protocol_format format, uint16_t message_id,
    const an_message_t *msg, uint32_t message_size, size_t *serialized_size,
    struct evbuffer *buffer);

I have elided a good portion of the API for clarity.

We have an internal dependency on libevent. struct evbuffer is a libevent chained buffer implementation into which we serialize the bytes and header of a message.
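The dispatch described earlier — find the functor object for a message_id/format pair and call through its function pointers — boils down to a table lookup. The sketch below shows that idea; the registration and lookup function names, along with the MAX_MESSAGE_ID and AN_MESSAGE_PROTOCOL_FORMAT_COUNT bounds, are hypothetical stand-ins, not the actual internals.

A registry lookup sketch (hypothetical internals)
/* Hypothetical bounds; the real implementation's sizing isn't shown here. */
#define MAX_MESSAGE_ID				1024
#define AN_MESSAGE_PROTOCOL_FORMAT_COUNT	8

/* One functor object per (message_id, format) pair. */
static const struct an_message_type *type_registry[MAX_MESSAGE_ID][AN_MESSAGE_PROTOCOL_FORMAT_COUNT];

static void
an_message_type_register_sketch(enum an_message_protocol_format format, const struct an_message_type *type)
{

	type_registry[type->message_id][format] = type;
}

static const struct an_message_type *
an_message_type_lookup_sketch(uint16_t message_id, enum an_message_protocol_format format)
{

	return type_registry[message_id][format];
}

/* an_message_serialize() is then little more than:
 *     type = an_message_type_lookup_sketch(message_id, format);
 *     frame the header, then call type->serialize(msg, buffer);
 */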

Runtime execution

This allows me to do the following in code:

Send a message
struct evbuffer *buffer = evbuffer_new();
struct foo_message fm;
if (foo_message_init_message(&fm) == true) {
	fm.timestamp = time(NULL);
	fm.my_field = strdup("This is my_field");
	size_t serialized_size = 0;
	an_message_serialize(AN_MESSAGE_PROTOCOL_FORMAT_PROTOBUF, fm.h.message_id, &fm, sizeof(fm),
	    &serialized_size, buffer);

	foo_message_free_message(&fm);
}

an_message_serialize looks up the message/format serialization functor object and executes the serialization function of that type (AN_MESSAGE_PROTOCOL_FORMAT_PROTOBUF) for me, putting the serialized data into buffer. It also includes a message header that wraps the Protobuf data and indicates the message_id, message_format, message_version, and message_size of the encapsulated message so a receiver of this message can easily parse it. This header approach also allows us to intermingle messages of different schemas or types on the wire.
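The framing layout itself isn’t shown in this post, but given the fields it carries — message_id, message_format, message_version, and message_size — a header along these lines is one plausible shape. This is an illustration only, with field widths guessed from the API types above, not the actual an_message header.

A plausible framing header (illustrative only)
#include <stdint.h>

/* Prepended to every serialized message so the receiver knows how to parse
 * the payload that follows. Widths are guesses based on the API signatures
 * (uint16_t message_id, uint8_t version, uint32_t size). */
struct an_message_header_sketch {
	uint16_t message_id;		/* which schema */
	uint8_t  message_format;	/* enum an_message_protocol_format */
	uint8_t  message_version;	/* schema version */
	uint32_t message_size;		/* size of the encapsulated message */
} __attribute__((packed));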

Note that the developer only deals with filling in the fields of a C structure. For dynamically allocated memory (the strdup above), we provide a smart free_message function which knows to deallocate only if the developer has overridden the default value on the message.
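To make that concrete, the generated free function for foo might look something like the sketch below: it frees my_field only when the pointer no longer points at the schema default. This is a guess at the shape of the generated code, not a copy of bpgen’s output, and the default symbol name is hypothetical.

A sketch of a generated free function
#include <stdlib.h>

/* The schema default for my_field ("--"); init_message would point at this. */
static char foo_message_my_field_default[] = "--";

void
foo_message_free_message(void *blob)
{
	struct foo_message *fm = blob;	/* generated struct shown earlier */

	/* Only free fields the developer has overridden; defaults are static. */
	if (fm->my_field != NULL && fm->my_field != foo_message_my_field_default) {
		free(fm->my_field);
		fm->my_field = NULL;
	}
}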

Later in some client code:

Receive a message
an_message_buffer_t *message = an_message_buffer_new();
struct evbuffer *receive_buffer = read_from_socket(...);
struct foo_message *fm = NULL;
enum an_message_protocol_format message_format;
uint32_t message_size = 0;
uint16_t message_id = 0;
uint8_t message_version = 0;

const struct an_message_type *message_type = an_message_parse(receive_buffer, &message_id, message, &message_size,
    &message_format, &message_version);

if (message_type != NULL) {
	fm = an_message_buffer_data(message);
	printf("Received message, my_field = '%s'\n", fm->my_field);
	message_type->free_message(fm);
}

an_message_buffer_free(message);
evbuffer_free(receive_buffer);
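Putting the two halves together, the same pair of calls lets a service act as a format translator: parse whatever arrives off the wire and re-serialize it in whichever format the next hop prefers. Below is a sketch under the same assumptions as the examples above; the AN_MESSAGE_PROTOCOL_FORMAT_NATIVE constant name is assumed to mirror the Protobuf one.

Translate between formats
struct evbuffer *in = read_from_socket(...);
struct evbuffer *out = evbuffer_new();
an_message_buffer_t *message = an_message_buffer_new();
enum an_message_protocol_format format;
uint32_t message_size = 0;
uint16_t message_id = 0;
uint8_t message_version = 0;
size_t serialized_size = 0;

/* Parse the message in whatever format the sender chose... */
const struct an_message_type *type = an_message_parse(in, &message_id, message, &message_size,
    &format, &message_version);

if (type != NULL) {
	struct foo_message *fm = an_message_buffer_data(message);

	/* ...and retransmit it in the Native format for the internal hop.
	 * (Constant name assumed; see lead-in above.) */
	an_message_serialize(AN_MESSAGE_PROTOCOL_FORMAT_NATIVE, message_id, fm,
	    (uint32_t)type->deserialized_size(), &serialized_size, out);

	type->free_message(fm);
}

an_message_buffer_free(message);
evbuffer_free(in);
/* `out` now holds the re-framed message, ready to send downstream. */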

Conclusion

Using this message serialization layer we are able to get the best of all worlds. We:

  • Interact with native types in the language we want or need to work in
  • Abstract parsing code away from the logic of the program to avoid parse logic everywhere
  • Interoperate with variable message formats from both internal and external clients
  • Get auto-upgrades when we add support for another format (Cap’n Proto is on the list)
  • Defer the format decision to the engineer best equipped to make that decision for their service or application

It’s not all roses though. There are some gotchas to an approach like this:

  • message_id conflicts.

Since message_ids are fixed and we work on feature branches in git, it is actually quite easy for two separate developers working in isolation to choose the same message_id. This presents problems when they both merge their features down to the master branch. To mitigate this, we have added a message_id uniqueness check to the build script: compilation fails until you resolve the duplication. This isn’t foolproof, as multiple versions can be out in production that step on each other’s message_ids. A better solution here is some sort of central registry. Still thinking about it.

  • Debugging is harder.

We no longer send text over the wire so seeing issues in communication is more difficult. We have some tools that can automatically translate between various types but this adds a step to debugging.
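The reflection hooks in struct an_message_type help here: a small tool can ask a message type for its field names (and, via reflection_field, their details) and pretty-print a binary payload for inspection. The sketch below assumes reflection_fields() returns a NULL-terminated array of names, which the post does not spell out.

Dump field names via reflection (sketch)
#include <stdio.h>

static void
dump_message_fields(const struct an_message_type *type)
{
	const char **fields = type->reflection_fields();

	printf("%s (message_id %u, version %u)\n", type->name,
	    (unsigned)type->message_id, (unsigned)type->message_version);
	/* Assumes a NULL-terminated array of field names. */
	for (size_t i = 0; fields != NULL && fields[i] != NULL; i++) {
		printf("  %s\n", fields[i]);
	}
}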
