A workload-distributing microservice architecture


Ilias Mansouri

Computer Vision Engineer

Throughout this series of posts, we will show you how to set up and deploy an on-premise computer vision pipeline. Rather than ingesting live video feeds, we will focus on a pipeline that ingests pre-recorded videos. In such applications, video recordings are typically used for post-analysis and post-processing. Examples include CCTV feed analysis in security, post-match analysis of sports broadcasts, automated object annotation of video data, and video summarization and compression.

Architecture

While cloud-based solutions facilitate the creation of flexible and scalable machine learning pipelines, not every industry can take advantage of them. In some industries, on-premises infrastructure is required due to tight regulations. In others, such as financial services, putting sensitive information in the cloud is considered too risky. Last but not least, some use cases require real-time data processing, so the AI models must run close to the live data to avoid any added inference latency.
 

As a result, each layer of the shared responsibility model should be considered when designing, developing, and deploying on-prem ML pipelines. 

This means that the following needs to be properly managed: 

  • compute resources 
  • storage solutions  
  • database technology  
  • networking 

Let’s look at a hypothetical CV pipeline as illustrated below: 

This pipeline would consist of 3 stages:  

  1. In the pre-processing stage we make sure that the video is ready for further processing. This can include enforcing a constant framerate and/or format, selecting colour channels, downscaling, framerate decimation, and so on (a minimal sketch follows this list). 
  2. Afterwards, we can have either a series of classical CV tasks, such as edge detection, Fourier transforms, Hough transforms, or projecting camera pixels to world coordinates, or DL-based methods such as object detection, pose estimation, segmentation, human parsing, and image captioning. 
  3. Finally, we can have steps that do not rely on a complete frame as input but rather on a subset (for example, image crops from an object detector). Additionally, it may be useful to combine a previous step's results with part of the input data and send it to a cloud-based endpoint for a specific service. 
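
As an illustration of the first stage, a pre-processing step could look like the sketch below, here using OpenCV (the library choice, scale factor, and decimation rate are assumptions made for the example):

import cv2

def preprocess(video_path, scale=0.5, keep_every=2):
    """Downscale frames and keep every n-th frame to decimate the framerate."""
    cap = cv2.VideoCapture(video_path)
    index = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if index % keep_every == 0:
            height, width = frame.shape[:2]
            yield cv2.resize(frame, (int(width * scale), int(height * scale)))
        index += 1
    cap.release()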

It follows that, if we assume the majority of tasks in the two final stages are ML- or DL-based, GPU usage management becomes a key element in determining the overall pipeline's processing speed. A good heuristic is therefore to keep the GPU busy at its highest inference throughput. Optimizing the input dimensions and batch size for each neural network is low-hanging fruit. Further speed-ups can be achieved by switching to another neural network architecture, pruning models, or reducing model size through quantization, knowledge distillation, or weight sharing. 
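
As a toy illustration of the batching idea, frames can be grouped before they are sent to the GPU so that every forward pass runs on a full batch (a minimal sketch; model.infer is a hypothetical inference call):

def batched(frames, batch_size=8):
    # Group incoming frames into fixed-size batches so the GPU receives
    # full batches instead of single frames.
    batch = []
    for frame in frames:
        batch.append(frame)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the remaining frames
        yield batch

# results = [model.infer(b) for b in batched(frame_stream, batch_size=16)]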

The read/write speeds of storage differ widely not only depending on the storage medium, e.g. HDD vs SSD, but also depending on how the data is stored. Various storage formats, based on media, promise improved reading or writing speed, data streaming, and even compression. This is closely related to the importance of selecting the right database to store the desired artefacts. 

To conclude, on-premise servers should have a distributed but cooperative application structure that handles requests from clients/users and distributes them to different tasks/services. The main communication protocols between client and server are: 

  • REST API 
  • gRPC 
  • IPC 

REST and gRPC both communicate over the network at the application layer: REST typically runs over HTTP/1.1 and is the most widely supported, whereas gRPC is built on HTTP/2 and uses protocol buffers to exchange messages. IPC, in contrast, lets processes on the same machine share memory, which reduces the CPU load spent on I/O operations. 
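
As a small illustration of the IPC option, Python's standard library can expose a named shared-memory block that a second process attaches to by name, so frame data never has to travel through a socket (a minimal sketch; the block name and size are arbitrary, and both sides are shown in one snippet for brevity):

from multiprocessing import shared_memory

# Producer side: allocate a named shared-memory block and write into it.
shm = shared_memory.SharedMemory(create=True, size=1024, name="frame_buffer")
shm.buf[:5] = b"hello"

# Consumer side (normally another process): attach to the block by name.
view = shared_memory.SharedMemory(name="frame_buffer")
print(bytes(view.buf[:5]))  # b'hello'

view.close()
shm.close()
shm.unlink()  # free the block once nobody needs it anymore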

 

Probably one of the first steps toward implementing the aforementioned shared responsibility model would be to implement each task as a microservice. As a result, we can scale each service independently according to its unique properties and requirements. We can not only scale each service independently using Docker and Docker-Compose, but also create a modular and composable pipeline where each service can live in its own environment without worrying about compatibility issues between frameworks or libraries. In addition, we now have fine-grained control over which services may access GPU resources. 
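
For example, with recent versions of Docker Compose, GPU access can be granted explicitly to a single service via a device reservation (a minimal sketch; the service name and image are placeholders):

services:
  inference:
    image: our-inference-image:latest   # placeholder image name
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]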

In this post

In this first part we’ll explain how to set up the following building blocks: 

  • a Celery client REST API built with Flask using the OpenAPI Specification 
  • Celery’s task queue, backed by RabbitMQ, to distribute tasks across workers on separate threads or machines 
  • a worker node which monitors the queue and starts task processes 
  • how to store the results in a Redis DB and visualize them using RedisInsight 

Asynchronous Task Queues

Applications can use task queues to execute long-running tasks while maintaining user responsiveness. In these queues, applications place tasks with enough data to allow them to proceed. In the background, workers monitor the task queues to perform the work. Afterwards, the results can be written back to another queue or even to a database. 

Using Celery in Python, a supervisor process is created, which in turn spins up a pool of child worker processes (executors). A client initiates a task by adding a message to the queue, which is then delivered to a worker by a message broker. 

The message broker will be RabbitMQ, the results will be stored in Redis, and we will interact and inspect our database with RedisInsight. 

An example of how to create a Celery application, or instance, is provided below. This instance is Celery’s main entry point: it is where tasks are created and workers are managed. It must therefore be importable by other modules (more on this later). 

 

A minimal Celery app contains a: 

  • name  
  • broker URL 
  • task 

Although it is not mandatory, we have already defined the backend URL for storing our results. Furthermore, we define a task route based on task names. Finally, we decorate our add task so that it is routed to the processing queue. 

With our Celery application defined, we’ll discuss how to interact with it through a REST API. 

from celery import Celery

# Connection strings use the Docker Compose service names ("redis", "rabbitmq").
REDIS_URL = "redis://redis:6379/0"
RABBITMQ_URL = "amqp://guest:guest@rabbitmq:5672//"


# RabbitMQ delivers task messages (broker); Redis stores task results (backend).
celery = Celery(
    "Celery Tasks",
    backend=REDIS_URL,
    broker=RABBITMQ_URL,
)

# Route every task whose name starts with "processing." to the "processing" queue.
celery.conf.task_routes = {
    "processing.*": {"queue": "processing"},
}


# The task name matches the route above, so it ends up on the "processing" queue.
@celery.task(name="processing.addition", max_retries=5)
def add(x, y):
    return x + y
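
For a quick sanity check, the task above can also be called directly from a Python shell, bypassing the REST API we build next (a minimal sketch; it assumes the app lives in a module named tasks and that the broker, the result backend, and a worker consuming the processing queue are all running):

from tasks import add  # assumed module name; matches "-A tasks" used later

result = add.delay(40, 2)      # publish the task message to RabbitMQ
print(result.get(timeout=30))  # 42, fetched from the Redis result backend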

REST API

The Connexion framework and the swagger_ui_bundle package are used to build a Flask application that adheres to the OpenAPI Specification. Connexion, a framework built on top of Flask, lets us map endpoints to Python functions, while swagger_ui_bundle contains the static files needed to render the Swagger UI. 

import connexion
from swagger_ui_bundle import swagger_ui_path

import config
from app.flask import error_handler


def create():
    # Serve the Swagger UI static files at the application root.
    options = {"swagger_path": swagger_ui_path, "swagger_url": "/"}
    app = connexion.App(__name__, specification_dir=config.SWAGGER_DIR, options=options)
    # Register the endpoints described in swagger.yaml; validate requests and responses.
    app.add_api("swagger.yaml", strict_validation=True, validate_responses=True)
    return app

To create our Flask app, we point Connexion to the directory containing the API description, written in either YAML or JSON, as well as to the Swagger bundle used to render the UI. 

Let’s have a look at the Swagger definition file: 

swagger: '2.0'
info:
  description: Our first example of REST API with Celery.
  title: REST API with Celery
  version: 1.1.0
paths:
  /process:
    post:
      operationId: app.flask.routes.API.add_processing_request
      summary: >
       Run our processing pipeline.
      parameters:
        - description: First number
          in: query
          name: x
          required: true
          type: integer
        - description: Second number
          in: query
          name: y
          required: true
          type: integer
      produces:
        - application/json
      responses:
        200:
          description: OK

The version of the OpenAPI Specification must be included in every API definition. As a next step, let’s look at the metadata related to our API: 

  • Title: the API’s name 
  • Description: information about the API 
  • Version: the version of the API 

Afterwards, we define the different endpoints (/process) in our API and the HTTP operations (post) they support. Each HTTP operation is then mapped to a specific Python function (operationId). 

Parameters, which in this example are passed via the query string, can be described by defining their data type, whether they are required, and more. 

Tracing the operationId above down to its Python function yields the following handler, which builds a Celery client and runs our pipeline: 

# routes module (referenced by the operationId in swagger.yaml)
import logging
from config import LOGLEVEL
from .pipeline import Pipeline
from celery_worker.celery_handler import make_celery

logging.basicConfig(level=LOGLEVEL)
logger = logging.getLogger(__name__)


def add_processing_request(x, y):
    logger.info(f"x: {x}, y: {y}")
    celery = make_celery()  # build a Celery client connected to the broker
    p = Pipeline(celery, x, y)
    p.run()


# pipeline module
import logging

logger = logging.getLogger(__name__)


class Pipeline:
    def __init__(self, celery_handler, x, y) -> None:
        self.celery = celery_handler
        self.params = {"x": x, "y": y}

    def run(self):
        task_name = "processing.addition"
        # Create a signature (a task invocation) and publish it to the broker.
        task = self.celery.signature(
            task_name,
            kwargs=self.params,
        )
        r = task.apply_async()
        logger.info(f"{r}")

Currently, the pipeline consists only of our addition function: it wraps our Celery app and our two parameters into a task invocation that can be passed to another function or to a network of Celery workers. This is done with Celery signatures, which also allow tasks to be chained into workflows, as we’ll see later. Finally, the message is sent to our broker using apply_async(). 
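
To make the workflow idea concrete, signatures can be composed, for example with a chain, where each task's result is passed on to the next task (a minimal sketch; processing.multiply is a hypothetical second task used purely for illustration):

from celery import chain

addition = celery.signature("processing.addition", kwargs={"x": 2, "y": 3})
# Hypothetical follow-up task: the result of the addition is injected as
# the first positional argument of this signature.
multiply = celery.signature("processing.multiply", kwargs={"y": 10})

workflow = chain(addition, multiply)
workflow.apply_async()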

Containerization

Before moving on to the broker, let’s see how we can containerize the REST API and Celery worker. Finally, we’ll demonstrate how to use Docker Compose to define and run this multi-container application. 

YAML anchors (“&”) allow us to reuse content across our compose file; Docker Compose ignores top-level keys prefixed with “x-”, so we use them to hold these reusable fragments. 

For Celery, we define the URLs of the corresponding broker and result backend. We use the same logging options across all services. 

version: '3.8'

x-celery-env: &celery-env
  CELERY_BROKER: amqp://guest:guest@rabbitmq:5672//
  CELERY_BACKEND: redis://redis:6379/0
  CELERY_RESULT: redis://redis:6379/0
x-common: &common-setup
  logging:
    driver: "json-file"
    options:
      max-size: "100k"
      max-file: "5"

services:
  celery_worker:
    <<: *common-setup
    environment:
      <<: *celery-env
    build:
      context: ./celery_worker
      dockerfile: Dockerfile
      network: host
    entrypoint: celery -A tasks worker --max-memory-per-child=2000 -Q processing --loglevel=${LOG_LEVEL} --concurrency=8 --prefetch-multiplier=1 
    depends_on:
      - rabbitmq
      - redis
  api:
    <<: *common-setup
    build:
      context: ./api
      dockerfile: Dockerfile
      network: host
    entrypoint: gunicorn -b :${API_PORT} --workers 10 --timeout 3600 --log-level=${LOGLEVEL} run:app
    ports:
      - ${API_PORT}:${API_PORT}

The Celery worker’s entry point can be explained like this: 

celery -A tasks worker --max-memory-per-child=2000 -Q processing --loglevel=${LOG_LEVEL} --concurrency=8 --prefetch-multiplier=1 
  • use the celery command 
  • -A tasks points to the module containing the Celery app 
  • worker starts a worker instance; --max-memory-per-child replaces a child process once it exceeds the given resident-memory limit 
  • -Q processing tells the worker to consume tasks only from the processing queue 
  • --concurrency: the number of child processes consuming the queue 
  • --prefetch-multiplier: how many tasks (messages) each worker process may reserve for itself at a time 

The API’s entry point: 

gunicorn -b :${API_PORT} --workers 10 --timeout 3600 --log-level=${LOGLEVEL} run:app
  • gunicorn: a Python WSGI HTTP server 
  • -b: bind the server socket to API_PORT 
  • --workers: the number of worker processes 
  • --timeout: the number of seconds after which a silent worker is killed and restarted 
  • run:app: the path to the app object inside run.py 
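
For completeness, run.py, which gunicorn targets as run:app, could look roughly like this (a sketch; the module path of the create() factory is an assumption):

# run.py
from app.api import create  # assumed location of the create() factory shown earlier

connexion_app = create()
# gunicorn needs a WSGI callable; the underlying Flask app is exposed as "app".
app = connexion_app.app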

RabbitMQ

As mentioned, RabbitMQ is Celery’s default message broker. Beyond its use with Celery, RabbitMQ is popular because it is lightweight and easy to use. 

The service defined in our compose file uses an official RabbitMQ image. We declare the broker port, through which Celery reaches the queue, as well as a port for the management user interface. 

  rabbitmq:
    hostname: dev_rabbitmq
    image: rabbitmq:3-management
    ports:
        - ${RABBITMQ_PORT}:${RABBITMQ_PORT}
        - ${RABBITMQ_UI_PORT}:${RABBITMQ_UI_PORT}
    restart: always

Redis & RedisInsight

Redis is an open-source project that can serve as a database, cache, streaming engine, and broker. In addition to supporting many data structures, it is simple to install and easy to use. With RedisInsight, we can inspect and manage our database. As shown below, both services are instantiated from the official images. 

  redis:
    image: 'redis:latest'
    ports:
      - "${REDIS_PORT}:${REDIS_PORT}"
  redisinsight:
    image: redislabs/redisinsight:latest
    ports:
      - '8001:8001'
    depends_on:
      - rabbitmq
      - redis

Demo

With our final docker-compose file, we can spin up our Docker applications with: 

docker-compose --env-file .env up -d 

where our environment file contains the port numbers and log levels of our different services. 
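
For reference, such an environment file could look like the following (the values are illustrative and simply match the ports used throughout this post):

API_PORT=3333
RABBITMQ_PORT=5672
RABBITMQ_UI_PORT=15672
REDIS_PORT=6379
LOG_LEVEL=info
LOGLEVEL=info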

Starting with our REST API at localhost:3333, we find: 

By opening the panel, we can send a POST request to the /process endpoint of our API: 

Inspecting the logs of our celery_worker service, we can make sure that the task got executed successfully: 

docker-compose --env-file .env logs -f celery_worker 

async_cv-celery_worker-1 | Task processing.addition[4a8e0e44-9045-4fe8-add9-19852fe3d098] succeeded in 10.022503172018332s: 42 

We can now check whether the result was properly stored in our Redis DB. The first time you visit RedisInsight at localhost:8001, you’ll be prompted to add your Redis database. Afterwards, you should find a record in the browser tab with a task_id similar to the one we saw in the logs of our celery_worker. 
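
The same check can be done from the command line with redis-cli, since Celery’s Redis backend stores results under keys of the form celery-task-meta-<task_id> (the command below assumes the service name from our compose file):

docker-compose --env-file .env exec redis redis-cli KEYS "celery-task-meta-*"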

 

Similarly, at localhost:15672 we can inspect the status of our queue(s): 

The chart above shows the state of the queue. A ready message is a task waiting to be processed, even when no consumer is currently available. An unacknowledged message is one that has been delivered to a consumer but has not yet been acknowledged as processed. 
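
The same figures can also be queried from the command line inside the RabbitMQ container (assuming the service name from our compose file):

docker-compose --env-file .env exec rabbitmq rabbitmqctl list_queues name messages_ready messages_unacknowledged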

Recap

This concludes the first part of this series, in which we discussed how to set up a local task queue using 5 Docker microservices: 

  1. a REST API with a Swagger UI, which is our gateway for launching jobs; 
  2. the jobs are stored in a RabbitMQ queue; 
  3. Celery workers then fetch jobs from this queue and execute the tasks; 
  4. the results are stored in a Redis database; 
  5. and can then be viewed/queried using RedisInsight. 
