dnlsndr
May 1, 2023
read in 7 minutes
AMPS - Asynchronous Message Processing System
Moving to an event-driven cluster and service architecture forced us to rethink several of our services. Especially when it came to asynchronous long-running job executions.

Moving to an event-driven cluster and service architecture forced us to rethink several of our services. Especially when it came to asynchronous long-running job executions.

Why we came up with AMPS

Back in the good old days, our product was built as a large monolithic application. We only had two services, the API and our frontend Tool. Both of them were written in PHP on a custom code basis — so no frameworks or anything of that sort. Long-running workloads were triggered through REST calls, keeping open the connection as long as the service was running and then returning the result once it was done. This could often take up to several hours, so what we did was quite risky, especially when thinking about issues such as network interruptions and workload crashes.

That was a major issue we had to solve for our new architecture for v2 of our system. We had to rethink our whole approach to job processing and came up with a new solution that would be more robust and reliable. Taking a look around on the web to see if any existing projects already fulfilled our requirements, we couldn't make out any. KEDA was the closest we could find, but it didn't fit our needs at that time, so we decided to build our solution. 1

A set of requirements was agreed upon that our system should fulfill, but before that, let me just explain some relevant terms so we're on the same page:

  • Kubernetes: Kubernetes is a container orchestration system that allows you to deploy and manage containerized applications. It is the foundation of our new architecture and the system we use to deploy our services.
  • Message Broker: A message broker is a service that handles the communication between different services. It is responsible for receiving messages from one service and forwarding them to another. It also handles the queuing of messages and ensures that they are delivered in the correct order. Notable examples are RabbitMQ, Kafka, and NATS.
  • Job: A job is a unit of work that needs to be processed. It can be anything from a simple calculation to a complex data import. A job is cloud-event formatted and contains all the information necessary to process it.
  • Workload: A workload is an actual program that is executed to process a job. It can be anything from a simple script to a complex application. A workload is a docker container that is executed on a node.
  • Sidecar: A sidecar is an application that is executed alongside the workload. In our case, AMPS acts as a sidecar to the workload and handles all the communication with the message broker as well as monitoring and logging.
  • TTL: Time To Live. Describes the maximum amount of time a job can be processed before it is considered stalled and rescheduled.
  • Pod: Kubernetes term for a group of containers that are deployed together on a single node. A workload - AMPS pairing is deployed as a pod, where both share the same local network.
  • Node: Kubernetes term for a single machine in a cluster. A node can host multiple pods, and pods should normally be distributed across multiple nodes for optimal performance and reliability.
  • Pipeline: A pipeline represents a chain of multiple workloads that are executed in sequence. The output of one workload is the input of the next one. One AMPS container consumes a job and publishes a new job afterward, which will then be consumed by the next AMPS container etc.

Requirements:

Now that we have a good overview of the terms we're going to use, let's have a look at the requirements we came up with:

  1. Jobs should be able to run as long as they must, even infinitely long if necessary.
  2. One AMPS container can be configured to only process a maximum amount of jobs at any given time.
  3. Workload crashes should not cause the job to be lost but rather cause them to be rescheduled.
  4. Jobs have a TTL after which they're reprocessed if their executing workload has stalled.
  5. Monitoring should be straightforward and integrate well with our monitoring stack.
  6. Workload programs should not have to deal with the underlying message broker.
  7. Workload programs should leverage the parallelism functionality of their respective web servers.
  8. Workload programs can be written in any language.
  9. Workload programs can decide when a job is finished and should be marked as such or if the job should be rescheduled/deleted.
  10. Dead-letter queueing should be supported if a job fails more than a certain amount of times.
  11. AMPS should be capable of publishing new jobs to the message broker allowing for chainability of workloads called Pipelines.
  12. Pipeline steps should be independently scalable.
  13. AMPS should be low on resource usage and not require a lot of memory or CPU power.
  14. AMPS needs to be highly reliable such that no jobs are lost. This includes evacuation of currently running jobs in case of pod shut-down.
  15. Cold start must be immediate. We have many time-critical workloads that need to be executed as soon as possible. This means we couldn't use Kubernetes' jobs as the containers first need to start up which takes a few seconds.
  16. Support multiple message brokers. We currently use RabbitMQ, but we might want to switch to Kafka or NATS in the future.

That's a large list of requirements, but we managed to fulfill all of them. Let's have a look at how we did it.

The Architecture

As already described, we went with a sidecar architecture that had AMPS running alongside the workload. This allowed us to keep the workload as simple as possible and also allowed for much easier migration from our existing workloads that were based on PHP web requests.

I'll refer to the sidecar AMPS container as AMPS from now on, and the workload container as workload.

Once AMPS consumes a new job from our broker, it stores the job in a local manifest and sends the cloud-event formatted body of the job to the workload HTTP server. The workload can then decide how it wants to handle these jobs. The easiest way is for the workload to just run its logic for every web request and then return the result to AMPS to continue processing. But with this architecture, we have a similar problem that we had before. For that AMPS would have to keep open a HTTP request to the workload for the duration of the job execution. A better way is for the workload to just immediately return a 200 OK response and then continue processing the job in the background. By spawning a new process. Once the process has finished successfully, the workload main process can then send back the job acknowledgment to AMPS. AMPS then marks the job as finished in its local manifest and starts consuming the next job. If the process fails, either the workload main process sends a negative acknowledgment to AMPS, or AMPS detects that the job has reached its TTL and reschedules it. This way we can ensure that no jobs are lost and that they're always processed in the correct order.

Here is a sequence diagram that shows the architecture in more detail:

The Implementation

Once we were happy with the fundamental architecture of AMPS, we started implementing it. We decided to use Go as the programming language for AMPS as it is a compiled language that is very fast and has a small memory footprint. It also has very good support for concurrency and parallelism, which is a key priority. We've also contemplated using Rust but decided against it as it would have taken us much longer to implement the same functionality in Rust than in Go and would make it harder for engineers to maintain since I would be the only one to have experience with Rust. Don't get me wrong, after the fact I would've loved to implement it in Rust, but at that time it just wasn't feasible.

So now let's have a look at some of the above requirements and how we addressed them. We won't go through all of them but rather focus on the most important ones.

1. Jobs should be able to run as long as they must

This was addressed through our architecture, deciding on holding a local manifest within the AMPS container of all currently processed jobs. As long as a job does not have a TTL, it will be held in the manifest infinitely until the job is done, it gets rescheduled or deleted.

2. One AMPS container can be configured to only process a maximum amount of jobs at any given time

This was enabled by creating a state machine, that closes the queue consumer to the broker, any time the maximum parallel job count is reached. Thus the broker does not allocate the AMPS instance any new jobs until it starts consuming again. One tricky part was that we could not just close and open the connections since that would use a lot of resources on the broker and AMPS side. We had to figure out broker-specific strategies to disconnect from a queue, while still keeping open the connections.

4. Jobs have a TTL after which they're reprocessed if their executing workload has stalled

We implemented a TTL reconciler process within AMPS, that periodically loops over the local job manifest and checks if any job has timed out. If so, it deletes the job from the local manifest and reschedules it. An issue that needed solving is the reader-writer problem of the manifest. Especially in pipelines with high job throughput, a reconciler had to lock the whole manifest to then loop over all existing jobs and clean them up. At that time, no new jobs could be inserted or deleted from the manifest. A solution is to simply keep the max-parallel-job count low so as to not have too many jobs in the manifest at any given time.

5. Monitoring should be straightforward and integrate well with our monitoring stack

Our internal monitoring stack is based on Prometheus and Grafana. We try to incorporate as many of our services into this stack as possible. For AMPS we decided to expose a Prometheus metrics endpoint that can be scraped by Prometheus. This endpoint exposes metrics about the current job count, the current job queue, job execution duration, and more. This allows us to easily monitor the health of AMPS, create alerts if something goes wrong, and scale AMPS up and down based on the current usage.

15. Cold start must be immediate

Since AMPS and its workload are designed to be continuously running, there is no real difference between cold-start and warm-start. It just immediately processes the next job in the queue, which comes in especially useful for time-critical workloads such as our live-shopping system.

16. Support multiple message brokers

AMPS is designed such, that a new message broker can easily be added as a plugin. We leverage interfaces to guarantee, that every broker plugin implements the same functionality. This allows us to easily add new brokers in the future and separate testing environments from broker requirements.

AMPS is now used in more than a hundred services in our cluster and has held great!

As you may see, commits to the repository have been low lately. That's not because the project is abandoned, but because it's been working so reliably, that there wasn't much need for change.

If you're interested in the project, we've open-sourced it on GitHub. Feel free to check it out here

Footnotes

  1. Although KEDA didn't fit our needs then, it seems to have gotten many new features and we're currently evaluating if we can use it for our system.
You know what they say: If it's gray, it's healthy
Made with
in Munich