Use Postgres as a simple task queue for Django 🐘

Updated: Mon 01 April 2024

Using Postgres as a task queue for Django is fast to add. It has other big advantages over the traditional approach (e.g., Celery and Redis).

For example, a friend runs Nomio, a profitable Django + HTMX startup with hundreds of critical daily tasks. They are moving to using Postgres as a task queue because it's simpler and more reliable.

Reasons to use Postgres as a task queue:

  1. Simple. It's a database table and a python script.
  2. It's CMD-Clickable. You can CMD+click on any part of your code and go to the underlying code to see how it works. Traditional task queues are barely CMD-Clickable.
  3. Reliable. In constrast, Redis has plagued me with lost tasks and poor error messages previously.
  4. Easy to monitor. Just look at your Task in your Django admin page.

A drawback is that it's not as fast as Redis. But, it's probably more than fast enough for your use case. -> See Leon's article here about how the drawbacks are minimal.

Question: Can I use SQLite instead of Postgres?

Yes! Thanks to Alex Goulielmos for pointing out that you don't have to use Postgres. He ran the guide perfectly using SQLite.

Also thanks to Alex for correcting an error. Now fixed 🎉


Enough talking. Let's add a simple task queue to Django using Postgres.

There'll be 4 steps. Let's go 🚀

1. Setup Django

pip install --upgrade django psycopg2
django-admin startproject core .
python manage.py startapp sim

Add our app sim to the INSTALLED_APPS in settings.py:

# settings.py
INSTALLED_APPS = [
  'sim',
  ...
]

2. Connect postgres to Django in settings.py

We'll assume that you've already installed Postgres on your machine. If not, I recommend this simple installer for Mac: https://postgresapp.com/ If you're on Windows, this looks like a good option: https://www.enterprisedb.com/downloads/postgres-postgresql-downloads

After you've installed Postgres, open the postgres shell and create a new database and user for your Django project:

psql
CREATE USER demo_user WITH PASSWORD 'special-tiger-1234';
CREATE DATABASE demo_db WITH OWNER demo_user;
GRANT ALL PRIVILEGES ON DATABASE demo_db TO demo_user;
\q

Connect postgres to Django in settings.py

  • Replace your DATABASES in core/settings.py with the below:

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'demo_db',
        'USER': 'demo_user',
        'PASSWORD': 'special-tiger-1234',
        'HOST': 'localhost',
        'PORT': '5432',
    }
}

Run your server to check it works

python manage.py runserver
  • Visit http://localhost:8000/ and you should see the Django welcome page.

Create your task queue

  • Add the below to sim/models.py:
import json
import importlib
from django.db import models
from django.utils import timezone
import sys
from typing import Callable, List


class Task(models.Model):
    """
    A model to represent a background task in the queue.
    """
    module_path = models.CharField(max_length=255)  # e.g., "myapp.tasks"
    function_name = models.CharField(max_length=255)  # e.g., "my_function"
    function_args = models.JSONField(default=list, blank=True)

    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)

    function_result = models.JSONField(default=dict, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    function_error = models.JSONField(default=dict, blank=True)
    failed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['created_at']

    def __str__(self) -> str:
        started = self.started_at.strftime('%Y-%m-%d %H:%M:%S %Z') if self.started_at else None
        completed = self.completed_at.strftime('%Y-%m-%d %H:%M:%S %Z') if self.completed_at else None

        return (
            f"Task id={self.id}. "
            f"Module: {self.module_path}. "
            f"Function: {self.function_name}. "
            f"Started: {started}. "
            f"Completed: {completed}"
        )

    def run_task(self) -> None:
        module = importlib.import_module(self.module_path)
        function = getattr(module, self.function_name, None)

        if not function:
            self.function_error = "Function not found"
            self.failed_at = timezone.now()
            self.save()
            sys.stdout.write(f"Failed task: {self.id}\n")
            return

        args = self.function_args if isinstance(self.function_args, list) else []

        self.started_at = timezone.now()
        self.save()
        sys.stdout.write(f"Started task id={self.id}\n")

        self._execute(function, *args)

    def _execute(self, function: Callable, *args: List) -> None:
        """
        Execute the task's function and save the result or error.
        """
        try:
            sys.stdout.write(
                f'function = {function}\n'
                f'args = {args}\n'
            )
            result = function(*args)
            sys.stdout.write(f'result {result}\n')

            self.function_result = json.dumps(result, default=str)
            self.completed_at = timezone.now()
            sys.stdout.write(f"Completed task id={self.id}\n")

        except Exception as e:
            self.function_error = json.dumps(e, default=str)
            self.failed_at = timezone.now()
            sys.stdout.write(
                f"Failed task: {self.id}.\n"
                f"Reason: {self.function_error}\n"
            )
        finally:
            self.save()


class Sample(models.Model):
    """
    A simple model to demonstrate the use of the background worker.
    """
    name = models.CharField(max_length=255)
    value = models.IntegerField()

This includes a sample model that we'll use later.

  • Run the migrations:
python manage.py makemigrations
python manage.py migrate

3. Run the worker

Create a tasks.py to run a worker to process tasks

  • Create a file at sim/tasks.py and add the below:
from django.db import transaction
from django.utils import timezone
from .models import Task
import time
import sys
import importlib
from typing import List, Optional


class TaskManager:
    """
    Manages task queue operations: adding, fetching, and processing tasks.
    """

    @staticmethod
    def add_task(
            module_path: str, function_name: str, function_args: Optional[List] = None
    ) -> Task:
        """
        Add a new task to the queue.
        :param module_path:  e.g., "myapp.tasks"
        :param function_name:  e.g., "my_function"
        :param function_args:  e.g., [1, 2, 3] or None
        """
        module = importlib.import_module(module_path)
        if not hasattr(module, function_name):
            raise ValueError(f"Function {function_name} not found in module {module_path}.")

        function_args = function_args or []
        if not isinstance(function_args, list):
            raise ValueError(
                f"Function {function_name} function_args must be None or a list. "
                f"Got type '{type(function_args)}' instead."
            )

        return Task.objects.create(
            module_path=module_path, function_name=function_name, function_args=function_args
        )

    @staticmethod
    @transaction.atomic
    def get_next_task() -> Task:
        """
        Atomically select and lock the next available task in the queue.
        """
        return (
            Task.objects
            .filter(started_at__isnull=True)
            .select_for_update(skip_locked=True)
            .first()
        )


class Worker:
    """
    Background worker that continuously fetches and processes tasks.
    """

    @staticmethod
    def run() -> None:
        """
        Start the worker loop.
        """
        sys.stdout.write("Started worker 🏗️\n")
        while True:
            task = TaskManager.get_next_task()
            if not task:
                sleep_seconds = 5
                current_time = timezone.now().strftime('%Y-%m-%d %H:%M:%S %Z')
                sys.stdout.write(
                    f"{current_time}: No tasks in the queue. Sleeping for {sleep_seconds} seconds 💤\n"
                )
                time.sleep(sleep_seconds)  # We reduce DB load by sleeping. I'd reduce this wait to 1 second in prod.
                continue
            task.run_task()


if __name__ == "__main__":
    Worker.run()

4. Add and view tasks in our postgres queue with Django

Add tasks to the queue

  • Add these example functions to sim/services.py:
from .models import Sample


def interesting_calculation(a, b):
    return a ** b


def create_another_model(name, value):
    return Sample.objects.create(name=name, value=value)
  • Queue the example functions as tasks in the shell
python manage.py shell
  • Add the below to the shell:
from sim.tasks import TaskManager
manager = TaskManager()

manager.add_task(module_path='sim.services', function_name='interesting_calculation', function_args=[3, 14])
manager.add_task(module_path='sim.services', function_name='add_another_model', function_args=['Robert', 13])

Run the worker

python manage.py shell
from sim.tasks import Worker
Worker.run()

Expected output:

>>> from sim.tasks import Worker
>>> Worker.run()
Started worker 🏗️
...
````


### How to view all tasks in the queue?
- Programmatically, simply run the below in your Django shell:
```python
from sim.models import Task
Task.objects.all()

With the Django admin, you can view all tasks in the queue. - Create a Django superuser (python manage.py createsuperuser) - Register the Task model by adding the below to sim/admin.py:

from django.contrib import admin
from .models import Task

admin.site.register(Task)
  • Run your server: python manage.py runserver
  • Vist http://localhost:8000/admin/sim/task/ to view all tasks in the queue.

Q. How to run the task queue and worker in production?

The obvious question you'll have is: How will I run this worker in production?

I recommend the following way to deploy this in production:

  1. deploy a managed database running Postgres, storing all your normal data and your task queue.
  2. deploy a managed worker running Python, processing tasks from the task queue.

You can then create a web app which adds tasks into the database, which the worker will do.

Tell me if you'd like me to write a guide on how to deploy this 🚀

Example architecture:

 +-----------------------+    +-----------------+    +-----------------+   
 |  Data and task queue  |    |  Worker         |    |  Web Server     |   
 |  (Postgres)           |    |  (Django)       |    |  (Django)       |   
 |                       |    |                 |    |                 |   
 |  +----------+         |    |  +----------+   |    |  +----------+   |   
 |  |  Models  |         |    |  |  Tasks   |   |    |  |  Views   |   |   
 |  +----------+         |    |  +----------+   |    |  +----------+   |   
 |                       |    |                 |    |                 |   
 +-----------------------+    +-----------------+    +-----------------+   

Congrats - You now have a simple task queue with Django and Postgres 🐘

Thanks to Leon for his article on using Postgres as a task queue in a non-Django context.

P.S Django frontend at warp speed? ⚡️

Do you dream of creating Django products so quickly they break the space-time continuum? Yeah, me too.

Well, let me introduce you to the magic wand I'm building: Photon Designer. It's a visual editor that puts the 'fast' in 'very fast.' When Photon Designer gets going, it slings Django templates at you faster than light escaping a black hole.

Let's get visual.

Do you want to create beautiful django frontends effortlessly?
Click below to book your spot on our early access mailing list (as well as early adopter prices).