dima on software

Latest blog articles:

The Cartesian Product Trap in Django ORM

I hit a performance wall at work recently after adding a single extra Count() on a Django queryset that looked harmless. Luckily, the problematic code didn't make it into the production environment, but I had to think hard to figure out what went wrong and find a solution.

The Demo Models

All the models and data below are made up to illustrate the problem.

Imagine we have a PostgreSQL database storing information about stores, their departments, employees, and products:

[Diagram: database schema showing Store, Department, Employee, and Product relationships]

The DB tables may contain many more fields, but in our case we only care about the relations between the models.

So in Django, models.py would look like this:

from django.db.models import Model, CharField, ForeignKey, CASCADE

class Store(Model):
    name = CharField(max_length=200)
    # ...

class Department(Model):
    store = ForeignKey(
        Store, related_name="departments", on_delete=CASCADE
    )
    # ...

class Employee(Model):
    department = ForeignKey(
        Department, related_name="employees", on_delete=CASCADE
    )
    # ...

class Product(Model):
    store = ForeignKey(
        Store, related_name="products", on_delete=CASCADE
    )
    # ...

Seed data (roughly what bit me; see the sketch after this list for one way to generate it):

  • 2 stores,
  • 10 departments per store,
  • 500 employees per department - 10 000 employees overall,
  • 2500 products per store.
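If you want to reproduce the numbers locally, here is a rough seeding sketch. It assumes an app called shop (matching the table names in the SQL below) and that the models have no other required fields:

from shop.models import Department, Employee, Product, Store

for i in range(2):
    store = Store.objects.create(name=f"Store {i}")
    # 2500 products per store
    Product.objects.bulk_create(Product(store=store) for _ in range(2500))
    for _ in range(10):
        department = Department.objects.create(store=store)
        # 500 employees per department - 10 000 employees overall
        Employee.objects.bulk_create(
            Employee(department=department) for _ in range(500)
        )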

In one part of our system, we show a list of the stores, including the number of employees in each one. Pretty easy with the Django ORM, right?

from django.db.models import Count

stores = Store.objects.annotate(
    total_employees=Count("departments__employees")
).values(
    "id",
    # ...
    "total_employees"
)

This query is relatively fast and works like a charm.

The Problem

Now let's imagine we were asked to add another counter: the number of products per store. We already have total_employees, so why not just add total_products?

Well, most likely we already have a unit test for this piece of code that checks the logic on a small amount of data. Before releasing, we notice that another JOIN was added and some rows get duplicated, so instead of a plain COUNT(...) we switch to COUNT(DISTINCT ...), eventually coming up with something like this:

stores = Store.objects.annotate(
    total_employees=Count("departments__employees",
                          distinct=True),
    total_products=Count("products", distinct=True),
).values(
    "id",
    # ...
    "total_employees",
    "total_products",
)

Looks safe to commit, push, wait for the green tests on CI, merge and deploy.

However, after the deployment you'll almost immediately see that everything hangs. As I said, there aren't that many stores (only 2 for now), not that many employees, and not that many departments.

And it takes, like, 10 seconds to fetch the numbers! What's wrong with it?

Let's take a closer look at the generated SQL for this seemingly innocent queryset:

SELECT 
    "shop_store"."id",
    COUNT(DISTINCT "shop_product"."id") AS "total_products", 
    COUNT(DISTINCT "shop_employee"."id") AS "total_employees"
FROM "shop_store"
  LEFT OUTER JOIN "shop_product"
    ON "shop_store"."id" = "shop_product"."store_id"
  LEFT OUTER JOIN "shop_department"
    ON "shop_store"."id" = "shop_department"."store_id"
  LEFT OUTER JOIN "shop_employee"
    ON "shop_department"."id" = "shop_employee"."department_id" 
GROUP BY "shop_store"."id"

And let's check the actual query plan:

GroupAggregate ... rows=2 ...
  ->  Nested Loop Left Join ... rows=25000000 ...
  ...
...
Execution Time: 11376.072 ms

Translation: these three JOINs turn into a 25-million-row cartesian mess before GROUP BY and COUNT(DISTINCT):

Products × Departments × Employees
= 2500 × 10 × 500
= 12 500 000  (per store)
× 2 stores
= 25 000 000 joined rows

The Fix

There are multiple ways of handling this case, but the easiest fix is to use subqueries:

from django.db.models import Count, IntegerField, OuterRef, Subquery
from django.db.models.functions import Coalesce

subquery_products = Subquery(
    Product.objects.filter(store_id=OuterRef("pk"))
        .values("store_id")
        .annotate(count=Count("pk"))
        .values("count"),
    output_field=IntegerField()
)
subquery_employees = Subquery(
    Employee.objects.filter(department__store_id=OuterRef("pk"))
        .values("department__store_id")
        .annotate(count=Count("pk"))
        .values("count"),
    output_field=IntegerField()
)
stores = Store.objects.annotate(
    total_products=Coalesce(subquery_products, 0),
    total_employees=Coalesce(subquery_employees, 0),
).values("id", "total_products", "total_employees")

SQL query:

SELECT "shop_store"."id",
       COALESCE((
         SELECT COUNT(U0."id") AS "count"  
         FROM "shop_product" U0  
         WHERE U0."store_id" = "shop_store"."id"  
         GROUP BY U0."store_id"), 0
       ) AS "total_products",
       COALESCE((
         SELECT COUNT(U0."id") AS "count"
         FROM "shop_employee" U0
           INNER JOIN "shop_department" U1
             ON U0."department_id" = U1."id"
         WHERE U1."store_id" = "shop_store"."id"
         GROUP BY U1."store_id"), 0
       ) AS "total_employees"
FROM "shop_store";

Now this one takes a couple of milliseconds with a modest and predictable plan:

Seq Scan on shop_store ... rows=2 ...
  SubPlan 1
     -> Seq Scan on shop_product u0 ... rows=2500 ...
  SubPlan 2
     -> Hash Join ... rows=5000 ...
        -> Seq Scan on shop_employee u0_1 ... rows=10000 ...
        -> Hash ... rows=10 ...
...
Execution Time: 5.600 ms

No giant intermediate data sets, just two tiny scans:

  • before: 11 376 ms (~11 seconds)
  • after: 5.6 ms (2000x faster)
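By the way, you can inspect both the SQL and the plan without leaving Django: str(queryset.query) shows the SQL, and QuerySet.explain() (available since Django 2.1) asks the database for the plan; on PostgreSQL, keyword arguments such as analyze=True are passed through to EXPLAIN. A quick sketch:

# Inspect the generated SQL and the PostgreSQL query plan for a queryset
print(stores.query)                  # the SQL Django will execute
print(stores.explain(analyze=True))  # EXPLAIN ANALYZE output (PostgreSQL)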

Takeaways

  • COUNT(DISTINCT) with multi-branch LEFT JOINs makes the database loop through the entire cartesian product.
  • Correlated subqueries aggregate each branch separately and scale linearly with data size.
  • Always test aggregate queries against production-sized data before you ship.

The Hidden Cost of Test Inheritance

I'm subscribed to Adam Johnson's blog and usually really enjoy his writing - it's practical, deep, and no-bullshit. But one recent post, Python: sharing common tests in unittest, caught me off guard.

It describes a "neat" pattern: write reusable test logic in a base class, subclass it to test multiple objects, and hide the base class from unittest discovery. While the intent is fine - DRYing out duplicated test code - the result is fragile, confusing, and just not worth it.

Here's why.

The Pattern: DRY Tests via Subclassing

# Sample units to test
class Armadillo:
    def speak(self) -> str:
        return "Hrrr!"

class Okapi:
    def speak(self) -> str:
        return "Gronk!"

# Test module
from unittest import TestCase

class BaseAnimalTests(TestCase):
    animal_class: type

    def test_speak(self):
        sound = self.animal_class().speak()
        self.assertIsInstance(sound, str)
        self.assertGreater(len(sound), 0)

class ArmadilloTests(BaseAnimalTests):
    animal_class = Armadillo

class OkapiTests(BaseAnimalTests):
    animal_class = Okapi

del BaseAnimalTests

Yes, it works and it reduces duplication. But it comes at the cost of everything else that makes tests maintainable.

The Problems

IDE and DX Pain

When a test fails, I want to jump to it in my IDE, set a breakpoint, and debug. With this pattern - good luck.

The method doesn't exist in ArmadilloTests; it's buried in a deleted parent class. You have to manually hunt it down, re-declare the test method just to put a breakpoint and debug it, and pray that the animal_class setup matches what failed:

class ArmadilloTests(BaseAnimalTests):
    animal_class = Armadillo

    def test_speak(self):
        super().test_speak()  # set a breakpoint here

It's tedious and wastes time. All this to avoid writing a 3-line test twice?

class ArmadilloTests(TestCase):
    def test_speak(self):
        sound = Armadillo().speak()
        self.assertIsInstance(sound, str)
        self.assertGreater(len(sound), 0)

Clear, simple, debug-friendly. Worth the few extra lines.

CI Failures Are Confusing

If a shared test fails in CI, you get something like:

test_speak (tests.ArmadilloTests.test_speak) ... FAIL
...
Traceback (most recent call last):
  File ".../tests.py", line 20, in test_speak
    self.assertGreater(len(sound), 0)
AssertionError: 0 not greater than 0

But the method isn't defined in ArmadilloTests, and "Search Everywhere" in the IDE won't help at all:

[Screenshot: "Search Everywhere" finds nothing]

So now you have to reverse-engineer which base class it came from and how to recreate it locally.

This isn't clever. It's just fragile.

When It Kinda Makes Sense

There are rare cases:

  • dozens of classes implementing the same interface
  • you're the only one maintaining the codebase
  • you run everything headless in CI

But even then, you're building test framework plumbing to save what, a hundred lines?

The Clean Alternative: Parametrize It

Pytest Style

import pytest

@pytest.mark.parametrize('animal_class', [Armadillo, Okapi])
def test_speak(animal_class):
    sound = animal_class().speak()
    assert isinstance(sound, str)
    assert len(sound) > 0

You see all the parameters. You see where the test lives. Failures are explicit:

test_speak[Armadillo] FAILED
test_speak[Okapi] PASSED

You can re-run just the failing test. You can debug with a conditional breakpoint. You don't need to explain how the tests are wired together - because they're not.
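For instance, re-running a single case is just pytest tests.py::test_speak[Armadillo] (the path depends on your layout), and a conditional breakpoint is a one-liner. A quick sketch, reusing the classes above:

import pytest

@pytest.mark.parametrize('animal_class', [Armadillo, Okapi])
def test_speak(animal_class):
    # Stop only for the parameter that failed
    if animal_class is Armadillo:
        breakpoint()
    sound = animal_class().speak()
    assert isinstance(sound, str)
    assert len(sound) > 0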

unittest Style (Optional, Not Ideal)

from unittest import TestCase

from parameterized import parameterized_class

@parameterized_class([
    {'animal_class': Armadillo},
    {'animal_class': Okapi},
], class_name_func=get_class_name)
class AnimalTests(TestCase):
    def test_speak(self):
        sound = self.animal_class().speak()
        self.assertIsInstance(sound, str)
        self.assertGreater(len(sound), 0)

Using parameterized_class from parameterized is still better than inheritance, but clunkier. Output is readable if you customize class_name_func. IDE support isn't great. Pytest remains the better option for anything dynamic.
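For reference, get_class_name above isn't part of the library; a possible implementation, given that parameterized passes (cls, num, params_dict) to class_name_func, could be:

def get_class_name(cls, num, params_dict):
    # One possible naming scheme, e.g. "AnimalTests_0_Armadillo"
    return f"{cls.__name__}_{num}_{params_dict['animal_class'].__name__}"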

Final Verdict

Tests should fail clearly, debug easily, and be readable years later. This pattern fails all three.

DRY is good. But in tests, visible duplication beats invisible abstraction.

Adam's trick technically works, but in practice, it makes tests harder to navigate, harder to trust, and harder to work with.

Stick to the boring version - you'll thank yourself later.

Why Django's override_settings Sometimes Fails (and How reload + patch Saved Me)

Sometimes @override_settings just doesn’t cut it.

I ran into a nasty issue while testing a Django module that relies on global state initialized during import. The usual test approach didn’t work. Here’s what happened and how I solved it.

The Setup

We had a module that builds a global dictionary from Django settings at import time. Let’s call it dragon.py; it reads settings.PUT_EGGS, which is False by default:

from django.conf import settings

DRAGON = {}
...
if settings.PUT_EGGS:
    DRAGON["eggs"] = "spam"

Another module uses DRAGON for core logic, e.g. mario.py:

from myproject.dragon import DRAGON

def find_eggs():
    if "eggs" in DRAGON:
        return "Found eggs!"
    return "Eggs not found"

Now I wanted to write a test that tweaks DRAGON and expects the logic to behave differently. Easy, right?

@override_settings(PUT_EGGS=True)
def test_find_eggs():
    assert find_eggs() == "Found eggs!"

Wrong. The test failed.

The Problem

override_settings works, but only for code that reads settings at runtime.

In my case, DRAGON was already built at import time, before the override kicked in. So it used the old value of PUT_EGGS, no matter what I did in the test.

This is the classic trap of global state baked during import. Welcome to pain town.

The Fix: reload + patch

Here's how I got out:

import importlib
from django.test import override_settings
from unittest.mock import patch
from myproject.mario import find_eggs

@override_settings(PUT_EGGS=True)
def test_find_eggs():
    # Reload the dragon module so DRAGON is rebuilt
    # with updated settings
    from myproject import dragon
    new_dragon = importlib.reload(dragon)

    # Patch the logic module to use the reloaded DRAGON
    with patch('myproject.mario.DRAGON', new_dragon.DRAGON):
        result = find_eggs()
        assert result == "Found eggs!"

Why This Works

  • importlib.reload(dragon) forces a fresh import of dragon, rebuilding DRAGON with the overridden settings;
  • reloading rebinds dragon.DRAGON to a fresh dict, but mario imported the name directly (from myproject.dragon import DRAGON), so it still references the stale object;
  • patch(...) solves this problem by swapping the old DRAGON in mario with the freshly rebuilt one.

This is surgical. Ugly, but effective.

Lessons Learned

  • Avoid putting non-trivial logic at module scope, especially if it depends on Django settings. Wrap it in a function or lazy loader (see the sketch after this list).
  • If you're stuck with global state, reload() and patch() give you a way out - just be careful about cascading dependencies.
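A minimal sketch of the "wrap it in a function" advice, keeping the names from the example above, so that settings are read at call time rather than at import time:

# dragon.py
from django.conf import settings

def get_dragon():
    dragon = {}
    if settings.PUT_EGGS:
        dragon["eggs"] = "spam"
    return dragon

# mario.py
from myproject.dragon import get_dragon

def find_eggs():
    if "eggs" in get_dragon():
        return "Found eggs!"
    return "Eggs not found"

With this shape, @override_settings(PUT_EGGS=True) works out of the box, and no reload() or patch() gymnastics are needed.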

If you’ve ever had a test mysteriously fail after overriding settings, this might be why.

Calculating the next run date of a Celery periodic task

The problem

You have a periodic task in Celery defined with a crontab(...) schedule, and you want to calculate the next time it's supposed to run.

Example: you want to find out when crontab(hour=12, minute=0) will trigger next after now.

Simple, right? There’s the croniter library, which seems to be designed to solve this exact problem. Just use it, right?

Well.

First mistake: trying croniter with crontab

So my first instinct was to use croniter like this:

from celery.schedules import crontab
from croniter import croniter
from datetime import datetime

schedule = crontab(hour=12, minute=0)
cron = croniter(schedule, datetime.now())
next_run = cron.get_next(datetime)

Boom 💥 it doesn’t work, because Celery’s crontab is not a string, and croniter expects one like "0 12 * * *":

AttributeError: 'crontab' object has no attribute 'lower'

And no, crontab() does not have a nice .as_cron_string() method either.

So now you’re stuck parsing crontab's internal fields (._orig_minute, ._orig_hour, etc) just to reconstruct a string - and it starts to smell like overengineering for something that should be simple.
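For the record, that dead end looks roughly like this - a sketch that leans on crontab's private _orig_* fields, so treat the exact attribute names as an assumption tied to your Celery version:

from datetime import datetime

from celery.schedules import crontab
from croniter import croniter

schedule = crontab(hour=12, minute=0)
# Rebuild a classic "minute hour day-of-month month day-of-week" string
# from crontab's private fields - fragile and not recommended.
cron_string = " ".join(str(field) for field in [
    schedule._orig_minute,
    schedule._orig_hour,
    schedule._orig_day_of_month,
    schedule._orig_month_of_year,
    schedule._orig_day_of_week,
])
cron = croniter(cron_string, datetime.now())
next_run = cron.get_next(datetime)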

The right way (which I learned too late)

Turns out Celery’s crontab (and all schedules derived from celery.schedules.BaseSchedule) already has a method for this:

from datetime import datetime
from celery.schedules import crontab

schedule = crontab(hour=12, minute=0)
now = datetime.now()
# `now` is datetime.datetime(2025, 6, 11, 0, 16, 58, 484085)
next_run = now + schedule.remaining_delta(now)[1]
# `next_run` is datetime.datetime(2025, 6, 11, 12, 0)

That’s it. You don’t need croniter at all. Celery knows how to calculate the delta to the next run. It just doesn’t shout about it in the docs.
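As far as I can tell from the Celery source, the same schedule also exposes remaining_estimate(), which returns the timedelta directly - double-check the method against your Celery version:

# Assumption: remaining_estimate() is available on celery.schedules.crontab
delta = schedule.remaining_estimate(now)  # timedelta until the next run
next_run = now + delta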

Summary

  • don’t reinvent Celery’s scheduling logic - it already has what you need;
  • crontab is not a cron string, don’t try to treat it like one;
  • use .remaining_delta(last_run) to calculate when the next run will happen.

Hope this saves someone the 2 hours I wasted trying to do it the wrong way.

Pytest Fish shell autocompletion

TL;DR https://github.com/ddoroshev/pytest.fish

Typing repetitive commands or copying and pasting test names can eat up valuable time. To help, I've created pytest.fish - a Fish shell plugin that simplifies your pytest workflow. It's lightweight, simple to set up, and makes testing more efficient.

How to Use

Autocomplete test paths

Type pytest and hit TAB to get suggestions for test paths and functions.

Support for -k filter

Narrow down tests with -k and get test name suggestions.

The plugin dynamically scans your project, so suggestions stay up-to-date.

Installation

Install with Fisher:

fisher install ddoroshev/pytest.fish

Or manually copy the files from the repository into your Fish configuration.

How It Works

The plugin doesn't rely on pytest directly (yet). Instead, it scans the current directory for test files and searches for test functions inside them, making the process relatively fast and efficient.

Other shells?

Since I primarily use Fish in my local development environment, I created a plugin specifically for this shell. However, if you use Bash or Zsh, feel free to create your own - or switch to Fish already. 😉