I’ll start at the end – my boto3 SQS messaging instrumentation pull request got merged, and boy, was this scavenger hunt eye-opening. Oxeye’s internal communication pipeline uses SQS queues, but while implementing observability in it, we discovered that the botocore instrumentation doesn't propagate context over the messages. So began the saga of creating our SQS instrumentation.
Oxeye is a cloud-native application security solution that detects all application risks across modern, distributed cloud-native microservices architectures. Our solution inspects the whole environment (code and infrastructure) and provides cross-dimensional analysis of all application security threats.
For that, Oxeye’s data processing pipeline performs some impressive wizardry, stitching together hints of vulnerabilities from various sources across the application. To monitor and better understand such a pipeline, complete observability is a must, and OpenTelemetry (a CNCF incubating project) provides us with precisely that. But, as I mentioned before, our main IPC (interprocess communication) method between microservices is SQS message queues, and the context isn’t propagated over them.
The idea behind OpenTelemetry tracing is simple. Events are “reported” (exported) as spans, organized in a hierarchy known as a trace. To form a trace, each newly created span is attached to a context object within the process, which stores the trace identifier and the most recent span. OpenTelemetry’s SDK also propagates this context between processes, continuing the trace across them and letting us stitch everything together. When our program sends a message to another process (e.g., using an HTTP client), the instrumentation injects the current context into the message. The receiver reads the message, extracts the context from it, and continues the existing trace rather than starting a new one. If OpenTelemetry sounds like a spell from Harry Potter to you, or you just want to read more about it, here's a link to my last blog post about writing my first OpenTelemetry instrumentation.
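To make the inject/extract round trip concrete, here is a deliberately simplified model using only the Python standard library. It is not the OpenTelemetry API: `SpanContext`, `start_span`, `inject` and `extract` are hypothetical stand-ins. The only real convention it borrows is the W3C `traceparent` format (`version-traceid-spanid-flags`), which OpenTelemetry’s default tracecontext propagator writes into the carrier.

```python
import secrets
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SpanContext:
    """Hypothetical, minimal stand-in for a span's identity."""
    trace_id: str                    # 32 hex chars, shared by every span in a trace
    span_id: str                     # 16 hex chars, unique per span
    parent_id: Optional[str] = None  # span_id of the parent, if any


def start_span(parent: Optional[SpanContext] = None) -> SpanContext:
    """Start a span: reuse the parent's trace ID, or begin a brand-new trace."""
    trace_id = parent.trace_id if parent else secrets.token_hex(16)
    return SpanContext(trace_id, secrets.token_hex(8),
                       parent.span_id if parent else None)


def inject(ctx: SpanContext, carrier: dict) -> None:
    """Sender side: write the context into the message's key/value carrier."""
    carrier["traceparent"] = f"00-{ctx.trace_id}-{ctx.span_id}-01"


def extract(carrier: dict) -> Optional[SpanContext]:
    """Receiver side: rebuild the remote parent context, if one was sent."""
    header = carrier.get("traceparent")
    if header is None:
        return None
    _version, trace_id, span_id, _flags = header.split("-")
    return SpanContext(trace_id, span_id)


# Producer process: a "send" span whose context travels with the message.
send_span = start_span()
message_headers: dict = {}
inject(send_span, message_headers)

# Consumer process: continue the same trace instead of starting a new one.
remote_parent = extract(message_headers)
process_span = start_span(parent=remote_parent)
print(process_span.trace_id == send_span.trace_id)  # True
print(process_span.parent_id == send_span.span_id)  # True
```

If the carrier arrives without a `traceparent` key, `extract` returns `None` and the consumer’s span starts a new trace, which is exactly the broken-trace symptom we saw over SQS.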
SQS is an AWS message queueing service. Any app can use it to send and receive messages through the AWS SDK (in Python, that SDK is boto3). In general, messaging systems are a good solution for interprocess communication (IPC) because they enable horizontal scaling of the receiving microservice: several instances of it can consume from the same queue.
I thought, ‘This will be like my pika instrumentation: easy peasy. I just need to wrap the sending and receiving functions to propagate the context over some carrier.’ Looking into Amazon’s API, I saw that messages have some metadata sections I could use, but I also noticed batch sending and receiving functions that my instrumentation would have to handle.
‘What kind of span should we create when sending five messages at once? Is it one span? It’s a single event, but the messages can be received by five processes. Are those child spans?’ I’d been working with OpenTelemetry for almost a year and was aware of its specification (a kind of constitution that all implementations in all languages have to follow), but I hadn’t had to ponder such questions when working on the pika instrumentation, where there was a callback for each message and no support for batch sending.
I had to dig deeper into the OpenTelemetry spec, which defines the behavior in precisely these edge cases, but finding the answer raised a larger question. The specification is detailed about which spans should be generated when batch sending, receiving, and processing. All the information you need is there, except for which carrier should be used for propagation. To propagate the context, OpenTelemetry “rides” on a key/value store attached to each message (HTTP headers, for example), called a carrier, and the spec doesn’t define which carrier each messaging system should use. This is because the messaging spec is written for all messaging systems: one definition for all.
On the one hand, this makes sense; the spec is technology-agnostic. It works even if your messaging system doesn’t support headers, properties, or any other section where you can send message metadata. On the other hand, the people who implement the instrumentation in two different languages might not be the same people, so they could choose different carriers, and the context would then not be propagated between services written in those languages.
For example, imagine a scenario where you implement context propagation in a Python package over SQS message attributes. Now imagine that someone else decides to use a carrier pigeon (or the message properties) to propagate context in a Java instrumentation. When the Java-instrumented service receives a message carrying Python-propagated context, it won’t find the context, and vice versa.
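The failure mode is easy to reproduce in miniature. In this toy sketch, plain dictionaries stand in for SQS, and both “Python” and “Java” sides are modeled in Python; every name is hypothetical. Both sides agree on the context format but not on the carrier, so the consumer finds nothing to extract.

```python
from typing import Optional


def new_message(body: str) -> dict:
    """Hypothetical message with two metadata sections, standing in for the
    "message attributes" and "message properties" of the scenario above."""
    return {"body": body, "attributes": {}, "properties": {}}


def python_inject(msg: dict, traceparent: str) -> None:
    """The Python instrumentation picked attributes as its carrier."""
    msg["attributes"]["traceparent"] = traceparent


def python_extract(msg: dict) -> Optional[str]:
    return msg["attributes"].get("traceparent")


def java_extract(msg: dict) -> Optional[str]:
    """The Java instrumentation picked properties as its carrier."""
    return msg["properties"].get("traceparent")


msg = new_message("hello")
python_inject(msg, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(python_extract(msg) is not None)  # True: Python consumers see the context
print(java_extract(msg))                # None: Java consumers start a new trace
```

Nothing errors out here, which is what makes the mismatch dangerous: the trace is simply cut in two.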
I decided to read the AWS SDK instrumentation code in all the packages in OpenTelemetry’s GitHub repos. There I found that only NodeJS and Ruby had implemented an instrumentation that propagates context over the message itself (rather than over AWS X-Ray headers), and both use message attributes for that.
So message attributes it is.
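After that lesson, I looked at the AWS API documentation to read about the attribute structure, where I encountered other potential setbacks for the instrumentation: a message can carry at most ten attributes, and attribute names are restricted in which characters they may contain and where.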
On top of that, the response for the ReceiveMessage API call doesn’t contain the attributes if you don’t explicitly request them.
Suppose you have a context and an instrumentation that wants to pass it on using some carrier (e.g., headers, attributes). Three objects participate in the process: the propagator, the getter, and the setter. The getter and the setter are instrumentation-specific: they know how to read or write a single key on the carrier.
The propagator performs the magic. It translates a context into a set of keys and values, and loads a context back from such a set, using the setter and getter to write and read each key. By default, OpenTelemetry uses two propagators: the W3C tracecontext propagator and the baggage propagator.
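Here is a toy model of that division of labor. The getter and setter mirror the method shapes of OpenTelemetry Python’s `Getter`/`Setter` interfaces (`get` returns a list of values or `None`, `keys` lists the carrier’s keys, `set` writes one key) without importing them, and the two propagator classes are simplified, hypothetical stand-ins for the real tracecontext and baggage propagators. The carrier uses the `{"DataType": "String", "StringValue": ...}` shape boto3 uses for SQS message attributes.

```python
from typing import Dict, List, Optional

# SQS-style attribute map as boto3 represents it.
Attributes = Dict[str, Dict[str, str]]


class SQSSetter:
    """Instrumentation-specific: writes ONE key into the carrier."""
    def set(self, carrier: Attributes, key: str, value: str) -> None:
        carrier[key] = {"DataType": "String", "StringValue": value}


class SQSGetter:
    """Instrumentation-specific: reads ONE key from the carrier."""
    def get(self, carrier: Attributes, key: str) -> Optional[List[str]]:
        attr = carrier.get(key)
        return [attr["StringValue"]] if attr else None

    def keys(self, carrier: Attributes) -> List[str]:
        return list(carrier)


class ToyTraceContextPropagator:
    """Format-specific: decides WHICH keys and values represent the context."""
    def inject(self, context: dict, carrier, setter) -> None:
        setter.set(carrier, "traceparent",
                   f"00-{context['trace_id']}-{context['span_id']}-01")

    def extract(self, carrier, getter) -> dict:
        values = getter.get(carrier, "traceparent")
        if not values:
            return {}
        _, trace_id, span_id, _ = values[0].split("-")
        return {"trace_id": trace_id, "span_id": span_id}


class ToyBaggagePropagator:
    def inject(self, context: dict, carrier, setter) -> None:
        baggage = context.get("baggage")
        if baggage:
            setter.set(carrier, "baggage",
                       ",".join(f"{k}={v}" for k, v in baggage.items()))

    def extract(self, carrier, getter) -> dict:
        values = getter.get(carrier, "baggage")
        if not values:
            return {}
        pairs = (item.split("=", 1) for item in values[0].split(","))
        return {"baggage": {k: v for k, v in pairs}}


class CompositePropagator:
    """Runs several propagators against the same carrier."""
    def __init__(self, propagators) -> None:
        self.propagators = propagators

    def inject(self, context: dict, carrier, setter) -> None:
        for p in self.propagators:
            p.inject(context, carrier, setter)

    def extract(self, carrier, getter) -> dict:
        result: dict = {}
        for p in self.propagators:
            result.update(p.extract(carrier, getter))
        return result


propagator = CompositePropagator([ToyTraceContextPropagator(), ToyBaggagePropagator()])
ctx = {"trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
       "span_id": "00f067aa0ba902b7", "baggage": {"tenant": "acme"}}
attrs: Attributes = {}
propagator.inject(ctx, attrs, SQSSetter())
print(sorted(attrs))                                  # ['baggage', 'traceparent']
print(propagator.extract(attrs, SQSGetter()) == ctx)  # True
```

Note that the instrumentation never decides the key names: the propagators do. That is the root of the SQS problem described next.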
If I ignored the SQS attribute limitations when implementing the instrumentation, some propagators could set attribute keys that violate the AWS API’s rules, and the message would be rejected simply because the user enabled OpenTelemetry. On top of that, the instrumentation needs to know which keys the propagators use before executing the ReceiveMessage call, so it can request those attributes. But it can’t know that in advance, because the propagators only run after the response is received.
In the instrumentation, the getter and setter add and strip a magic prefix on each key. When SendMessage is called, an otel. prefix is added to every attribute a propagator sets. So when calling ReceiveMessage, we request the otel.* wildcard, and all the propagation keys come back (if they were added when the message was sent).
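A sketch of the prefix trick, assuming the getter/setter shape from the previous example and boto3’s attribute format. SQS lets `MessageAttributeNames` contain a prefix followed by `.*` to request every attribute starting with that prefix (and `All` or `.*` to request everything), which is what makes asking for the keys up front possible. The class and function names are hypothetical, not the merged instrumentation’s code.

```python
from typing import Dict, List, Optional

OTEL_PREFIX = "otel."  # the magic prefix described above


class PrefixingSetter:
    def set(self, carrier: Dict[str, dict], key: str, value: str) -> None:
        # A propagator asking for "traceparent" gets "otel.traceparent".
        carrier[OTEL_PREFIX + key] = {"DataType": "String", "StringValue": value}


class PrefixingGetter:
    def get(self, carrier: Dict[str, dict], key: str) -> Optional[List[str]]:
        attr = carrier.get(OTEL_PREFIX + key)
        return [attr["StringValue"]] if attr else None

    def keys(self, carrier: Dict[str, dict]) -> List[str]:
        # Report only our keys, with the prefix stripped, as propagators know them.
        return [k[len(OTEL_PREFIX):] for k in carrier if k.startswith(OTEL_PREFIX)]


def with_otel_attributes(receive_kwargs: dict) -> dict:
    """Return ReceiveMessage kwargs that also request the otel.* attributes."""
    names = list(receive_kwargs.get("MessageAttributeNames", []))
    if not {"All", ".*", OTEL_PREFIX + "*"} & set(names):
        names.append(OTEL_PREFIX + "*")
    return {**receive_kwargs, "MessageAttributeNames": names}


# Usage: the user's own attribute is untouched; ours is namespaced.
attrs = {"tenant": {"DataType": "String", "StringValue": "acme"}}
PrefixingSetter().set(attrs, "traceparent",
                      "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(sorted(attrs))                  # ['otel.traceparent', 'tenant']
print(PrefixingGetter().keys(attrs))  # ['traceparent']
print(with_otel_attributes({"QueueUrl": "https://example.invalid/my-queue"}))
```

The wrapped receive call would pass the returned kwargs on to the real client, so the wildcard is requested without the user having to know about it.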
This doesn’t solve the ten-attribute limit or the restrictions on using periods within keys. However, neither the NodeJS nor the Ruby implementation adds attributes if the count passes 10, and neither validates periods, so I adopted the same behavior.
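One possible reading of “don’t add attributes if the count passes 10” is sketched below: if the user’s attributes plus ours would exceed the limit, skip propagation and send the message untouched. The function name and the all-or-nothing policy are assumptions for illustration; the NodeJS and Ruby code may differ in detail.

```python
MAX_SQS_ATTRIBUTES = 10  # SQS accepts at most 10 message attributes


def inject_if_room(attributes: dict, otel_entries: dict) -> bool:
    """Hypothetical guard: add the OpenTelemetry attributes only if the total
    stays within the limit. Returns True if they were added."""
    new_keys = [k for k in otel_entries if k not in attributes]
    if len(attributes) + len(new_keys) > MAX_SQS_ATTRIBUTES:
        return False  # lose the trace link rather than fail the user's send
    attributes.update(otel_entries)
    return True


def attr(value: str) -> dict:
    return {"DataType": "String", "StringValue": value}


user_attrs = {f"field{i}": attr(str(i)) for i in range(9)}  # nine user attributes
otel_attrs = {
    "otel.traceparent": attr("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"),
    "otel.baggage": attr("tenant=acme"),
}
print(inject_if_room(user_attrs, otel_attrs))  # False: 9 + 2 > 10
print(len(user_attrs))                         # 9: the user's message is untouched
```

The design choice is to favor the user’s message over the trace: a dropped span link is annoying, but a rejected SendMessage call caused by observability tooling would be far worse.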
But now the implementations in other languages don’t work with mine, since they don’t use the otel. prefix (exactly what I wanted to avoid). If an instrumented NodeJS app enqueues an SQS message and an instrumented Python app handles it, the context isn’t propagated. But worry not, I intend to align the other instrumentations as well.
Another issue I encountered was deciding when a span should end, and which message is currently being processed. In pika, a callback is triggered for each received message, which clearly defines where processing of that message begins and ends (and that all spans created in between belong to its context). But in boto3, the user simply requests messages, receives a list of them in the response, and then does whatever they want.
How do I know which message is being processed? What if a user always ignores the first message in a batch, or applies some sampling strategy and handles only every third message (which is what I suspect my mailman does)? How can I tell which message is being processed, and attach its context so that subsequent spans follow from it?
I derived a workaround from a similar problem in the kafka instrumentation. That package has a similar API of iterating over a sequence of results, and its instrumentation had to handle it. Their solution was simple: only one message can be handled at a time, so the message being processed is attached to the context until the next one is requested. The returned object can only be iterated, not accessed directly, so the instrumentation just wrapped __iter__. My case was a bit different, as ReceiveMessage returns a list of messages, which the user can both iterate over and index.
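The iterate-and-attach idea can be sketched with Python’s `contextvars`, whose set/reset-with-token pattern is analogous to OpenTelemetry’s attach/detach. This is a hypothetical model that only tracks which message is “current”; it does not create spans and is not the kafka instrumentation’s code.

```python
import contextvars
from typing import Iterable, Iterator

# Hypothetical stand-in for the active context: which message is being handled.
current_message = contextvars.ContextVar("current_message", default=None)


def traced_iter(messages: Iterable[str]) -> Iterator[str]:
    """Yield messages one at a time, keeping each one attached as the current
    context until the consumer asks for the next one."""
    token = None
    try:
        for msg in messages:
            if token is not None:
                current_message.reset(token)  # detach the previous message
            token = current_message.set(msg)  # attach the one being handed out
            yield msg
    finally:
        if token is not None:
            current_message.reset(token)      # detach the last one when done


# Usage: inside the loop body, "current" is always the message just received.
seen = []
for m in traced_iter(["a", "b", "c"]):
    seen.append(current_message.get())
print(seen)                   # ['a', 'b', 'c']
print(current_message.get())  # None
```

This works because iteration only ever exposes one message at a time; the next example shows why a list that can also be indexed needs more than a wrapped `__iter__`.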
In my instrumentation, I decorated ReceiveMessage so that whenever it’s called, it returns a proxy object of the original response, implementing __getitem__ and __iter__. When a specific message in the proxy object is accessed, its span is attached to the current context, and the previous one is detached. Each span ends when the DeleteMessage API is called for its message.
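Below is a hypothetical, standard-library-only model of that proxy. It uses `contextvars` in place of OpenTelemetry’s context and a tiny registry instead of real spans; the class and function names are illustrative, not the instrumentation’s. The message dictionaries use the `MessageId` and `ReceiptHandle` keys that SQS returns, and `delete_message` only stands in for the wrapped DeleteMessage call.

```python
import contextvars
from typing import Dict, List, Optional

current_message_id = contextvars.ContextVar("current_message_id", default=None)


class MessageSpans:
    """Hypothetical registry of open processing "spans", keyed by receipt handle."""
    def __init__(self) -> None:
        self.open: Dict[str, str] = {}
        self.ended: List[str] = []

    def start(self, msg: dict) -> None:
        self.open[msg["ReceiptHandle"]] = msg["MessageId"]

    def end(self, receipt_handle: str) -> None:
        msg_id = self.open.pop(receipt_handle, None)
        if msg_id is not None:
            self.ended.append(msg_id)


class ContextualMessageList(list):
    """Proxy for the response's message list: whichever message the user
    touches, by index or by iteration, becomes the current context."""

    def __init__(self, messages: List[dict], spans: MessageSpans) -> None:
        super().__init__(messages)
        self._token: Optional[contextvars.Token] = None
        for msg in super().__iter__():  # plain iteration: don't activate here
            spans.start(msg)

    def _activate(self, msg: dict) -> dict:
        if self._token is not None:
            current_message_id.reset(self._token)  # detach the previous message
        self._token = current_message_id.set(msg["MessageId"])  # attach this one
        return msg

    def __getitem__(self, index):
        item = super().__getitem__(index)
        if isinstance(index, slice):
            return item  # a slice doesn't single out one message
        return self._activate(item)

    def __iter__(self):
        for i in range(len(self)):
            yield self[i]  # goes through __getitem__, so each message activates


def delete_message(spans: MessageSpans, receipt_handle: str) -> None:
    """Stand-in for the wrapped DeleteMessage call: end that message's span."""
    spans.end(receipt_handle)


spans = MessageSpans()
response = {"Messages": [{"MessageId": "m1", "ReceiptHandle": "r1"},
                         {"MessageId": "m2", "ReceiptHandle": "r2"},
                         {"MessageId": "m3", "ReceiptHandle": "r3"}]}
messages = ContextualMessageList(response["Messages"], spans)

third = messages[2]                 # the user skips straight to the third message
print(current_message_id.get())     # m3
delete_message(spans, third["ReceiptHandle"])
print(spans.ended)                  # ['m3']
print(sorted(spans.open.values()))  # ['m1', 'm2']
```

Subclassing `list` keeps the proxy usable wherever user code expects a list. In this sketch slicing returns a plain list and activates nothing, one of the edge cases a real implementation has to make a decision about.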
My colleagues and I were still bothered that this part of the OpenTelemetry specification remains underspecified. Each language’s implementation can follow its own interpretation, which defeats the whole point of OpenTelemetry tracing: one trace across every service, whatever language it’s written in.
Luckily, we soon had enriching conversations with many OpenTelemetry maintainers and specification leaders at KubeCon. We learned that sharp minds are still working to further define the OpenTelemetry messaging spec. I hope they will find a solution that accounts for AWS’s limitations.
This journey was interesting. I learned more about OpenTelemetry’s inner workings and the importance of careful planning before implementation. Behind every few lines of code lie hours of research and comprehension. This odyssey also prepared me well to help a colleague implement the confluent_kafka instrumentation a few weeks later, but that’s a story for another time.
OpenTelemetry is an amazing tool. I encourage you to choose your favorite package and contribute an instrumentation — just be sure to read the docs first!