Building a Toy Database: Concurrency is Hard
Hey folks, it's been a while. Here's some news on Bazoola, my precious toy database, which already reached version 0.0.7!
Demo App
Meet the "Task manager" - a flask-based app demonstrating current capabilities of Bazoola:
In order to run the task manager:
git clone https://github.com/ddoroshev/bazoola.git
cd bazoola
pip install bazoola
python demo/app.py
Then open http://127.0.0.1:5000 in your browser, and Bob's your uncle. The schemas and tables are all pre-created and filled with demo data, so you can browse and edit it, checking the explanation at the bottom of the page. For example, you can try to search design-related projects and tasks by "des" query:
If you scroll down a bit, there's an explanation of how it actually works:
I'm still working on the TEXT
field implementation, so for now all descriptions and comments are basically CHAR(200)
, but this will change soon.
Also, cherry on top - this demo app has been almost entirely generated by ✨Claude Code! That was a huge boost for me, because it helped focus on the essentials, delegating the boring stuff to AI.
Concurrency
Finally, this thing can be run in concurrent environment: in parallel threads or even in parallel processes. There are no parallel operations though, but at least the data won't corrupt if you run two apps using the same data
directory simultaneously.
The key is a global lock. In a nutshell:
class DB:
# ...
def insert(self, table_name, values):
with self.storage.lock():
# ...
return tbl.insert(values)
def find_all(self, table_name, *, joins=None):
# ...
with self.storage.lock():
rows = self.tables[table_name].find_all()
# ...
return rows
def find_by_id(self, table_name, pk, *, joins=None):
# ...
with self.storage.lock():
row = self.tables[table_name].find_by_id(pk)
# ...
return row
def delete_by_id(self, table_name, pk):
with self.storage.lock():
self.tables[table_name].delete_by_id(pk)
def update_by_id(self, table_name, pk, values):
with self.storage.lock():
# ...
return self.tables[table_name].update_by_id(pk, values)
Actually, there are two global locks under the hood:
- A
.lock
file, essentially a global mutex shared between the processes via flock. - A
threading.RLock
object, shared between threads of a single process. However,flock
is not enough, because it's basically useless within the same process.
# storage.py
class File:
# ...
@contextmanager
def lock(self):
try:
fcntl.flock(self.f.fileno(), fcntl.LOCK_EX)
yield
finally:
fcntl.flock(self.f.fileno(), fcntl.LOCK_UN)
# ...
class TableStorage:
def __init__(self, cls_tables, base_dir: str):
# ...
self._threadlock = threading.RLock()
self._lockfile = File.open(".lock", base_dir=base_dir)
# ...
@contextmanager
def lock(self):
with self._threadlock, self._lockfile.lock():
yield
It's not about atomicity, it doesn't solve data consistency in general, i.e. if you unplug your computer in the middle of a write operation, you'll most likely lose some ¯_(ツ)_/¯
Elimination of find_by
and find_by_substr
Initially, I had two specific functions:
find_by(table_name, field_name, value)
- an equivalent ofWHERE field_name == value
find_by_substr(table_name, field_name, substr)
-WHERE field_name LIKE substr
However, there's already find_by_cond()
, allowing to search by arbitrary conditions, defined in cond.py.
So I just added EQ
, SUBSTR
and ISUBSTR
and removed these two helpers in favor of find_by_cond(table_name, EQ(field=value))
and find_by_cond(table_name, SUBSTR(field=substr))
.
One More Thing
One of the features of this database at the moment is it's size - its core implementation is less than 800 lines of Python code! I decided that it would be nice to highlight it in the Github description, and came up with a simple script, update_description.sh, which I run once in a while to count the lines of bazoola/*.py
and update the repo description.

I didn't find out yet how to run it on CI, but I think this one is already good enough.
Conclusion
I treat this update as a huge leap: a month ago Bazoola was barely usable in real-life applications, and now you can build something on top of it. There's no VARCHAR
or TEXT
fields, the amount of conditions (EQ
, LT
, GT
, ...) is quite limited, selecting particular fields and ordering is also not implemented, and that's why it's just a 0.0.7 release :)
I like the fact that the data files contain mostly plain text, it's easy to test and debug the logic and data consistency. Of course, I'm planning to go binary and to switch from Python to C, and those benefits disappear, but by that time I'll have a solid amount of tests and a bunch of tools for managing the data.
Stay tuned!
Building a Toy Database: Learning by Doing
Ever wondered how databases work under the hood? I decided to find out by building one from scratch. Meet Bazoola - a simple file-based database written in pure Python.
Why Build a Toy Database?
As a developer, I use relational databases every day, but I never truly understood what happens when I INSERT
or SELECT
. Building a database from scratch taught me more about data structures, file I/O, and system design than any tutorial ever could.
Plus, it's fun to implement something that seems like magic!
What is Bazoola?
Bazoola is a lightweight, educational database that stores data in human-readable text files. It's not meant to replace SQLite or PostgreSQL - it's meant to help understand how databases work.
Key Features:
- Fixed-width column storage
- CRUD operations - the basics every database needs
- Foreign keys - because relationships matter
- Automatic space management - reuses deleted record space
- Human-readable files - you can literally
cat
your data
The Core Idea: Fixed-Width Records
Unlike CSV files where records have variable length, Bazoola uses fixed-width fields:
1 Alice 25
2 Bob 30
3 Charlie 35
This makes it trivial to:
- Jump to any record by calculating its offset
- Update records in-place without rewriting the file
- Build simple indexes
Show Me the Code!
Here's how you use Bazoola:
from bazoola import *
# Define schemas
class TableAuthors(Table):
name = "authors"
schema = Schema([
Field("id", PK()),
Field("name", CHAR(20)),
])
class TableBooks(Table):
name = "books"
schema = Schema([
Field("id", PK()),
Field("title", CHAR(20)),
Field("author_id", FK("authors")),
Field("year", INT(null=True)),
])
# Create database instance and the files in `data/` subdir
db = DB([TableAuthors, TableBooks])
# Insert data
author = db.insert("authors", {"name": "John Doe"})
book = db.insert("books", {
"title": "My Book",
"author_id": author["id"],
"year": 2024
})
# Query with joins
book_with_author = db.find_by_id(
"books",
book["id"],
joins=[Join("author_id", "author", "authors")]
)
print(book_with_author)
# Output: {'id': 1, 'title': 'My Book',
# 'author_id': 1, 'year': 2024,
# 'author': {'id': 1, 'name': 'John Doe'}}
# Close the database
db.close()
Interesting Implementation Details
Space Management
When you delete a record, Bazoola fills it out with *
symbols, maintaining a stack of free positions:
class Table:
# ...
def delete_by_id(self, pk: int) -> None:
# ...
self.f.seek(rownum * self.row_size)
self.rownum_index.set(pk - 1, None)
self.f.write(b"*" * (self.row_size - 1) + b"\n")
self.free_rownums.push(rownum)
# ...
def seek_insert(self) -> None:
rownum = self.free_rownums.pop()
if rownum is not None:
self.f.seek(rownum * self.row_size)
return
self.f.seek(0, os.SEEK_END)
This simple approach prevents the database from growing indefinitely.
Indexing
Every table automatically gets an index on its primary key in <table_name>__id.idx.dat
file, so finding a record by ID is O(1) - just look up the position and seek!
Foreign Keys
Bazoola supports relationships between tables and joins:
# Query with joins
rows = db.find_all("books", joins=[
Join("author_id", "author", "authors")
])
# Inverse joins (one-to-many)
author = db.find_by_id("authors", 1, joins=[
InverseJoin("author_id", "books", "books")
])
However, it doesn't enforce referencial integrity yet, and you can basically delete anything you want ¯\_(ツ)_/¯
Human-Readable Storage
Data files are just formatted text:
$ cat data/books.dat
1 My Book 1 2024
************************************
3 My Book3 1 2024
Great for debugging and understanding what's happening!
What I Learned
- Data format and file-based storages are tricky, that's why it's all just test-based.
- Fixed-width has trade-offs - simple implementation, but wastes space. Real databases use more sophisticated formats.
- Indexes are magic: the difference between O(n) table scan and O(1) index lookup is massive, even with small datasets.
- Joins are hard.
- Testing is crucial. Database bugs can corrupt data, and comprehensive tests saved me many times.
Limitations (and why they're OK)
The current implementation is intentionally simple:
- no transactions (what if it fails mid-write?)
- no concurrency (single-threaded only)
- no query optimizer (every operation is naive)
- no B-tree indices (just a simple index for primary keys)
- no SQL (just Python API)
- fixed-width wastes space These aren't bugs - they're learning opportunities! Each limitation is a rabbit hole to explore.
Try It Yourself!
pip install bazoola
The code is on GitHub. It's ~700 lines of Python (db.py) - small enough to read in one sitting!
What's Next?
Building Bazoola opened my eyes to database internals. Some ideas for further versions:
TEXT
fields with a separate storage- strict foreign key constrainsts
- unique constraints
- B-tree indexes for range queries
ALTER TABLE
- simple query planner
- pure C implementation with Python bindings
But honestly, even this simple version taught me a ton.
Conclusion
Building a toy database is one of the best learning projects I've done. It's complex enough to be challenging but simple enough to finish. You'll understand your "real" database better, and you might even have fun!
The Cartesian Product Trap in Django ORM
I hit a performance wall at work recently after adding a single extra Count()
on a Django queryset that looked harmless. Luckily, the problematic code didn't make it into the production environment, but I had to think hard to figure out what went wrong and find a solution.
The Demo Models
All the following models and entities are made up by me to illustrate the problem.
Imagine we have a PostgreSQL database storing information about separate stores, their departments, employees and products of the stores:
The DB tables can contain much more fields, but in our case we only care about relations between the models.
So in Django, models.py
would look like this:
from django.db.models import Model, CharField, ForeignKey, CASCADE
class Store(Model):
name = CharField(max_length=200)
# ...
class Department(Model):
store = ForeignKey(
Store, related_name="departments", on_delete=CASCADE
)
# ...
class Employee(Model):
department = ForeignKey(
Department, related_name="employees", on_delete=CASCADE
)
# ...
class Product(Model):
store = ForeignKey(
Store, related_name="products", on_delete=CASCADE
)
# ...
Seed data (roughly what bit me):
- 2 stores,
- 10 departments per store,
- 500 employees per department - 10 000 employees overall,
- 2500 products per store.
In one of the places of our system, we show a list of the stores, including the amount of employees in each store. Pretty easy with Django ORM, right?
stores = Store.objects.annotate(
total_employees=Count("departments__employees")
).values(
"id",
# ...
"total_employees"
)
This query is relatively fast and works like a charm.
The Problem
Now let's imagine we were asked to add another counter: a number of products per store. We already have total_employees
, why not just add total_products
?
Well, most likely we already have some unit test for our piece of code, which checks the logic on a small amount of data, and before releasing the code, we can figure out that another JOIN
was added, some data is duplicated, and instead of just COUNT(...)
, we switch to COUNT(DISTINCT ...)
, eventually coming up with something like this:
stores = Store.objects.annotate(
total_employees=Count("departments__employees",
distinct=True),
total_products=Count("products", distinct=True),
).values(
"id",
# ...
"total_employees",
"total_products",
)
Looks safe to commit, push, wait for the green tests on CI, merge and deploy.
However, after the deployment you'll almost immediately see that everything hangs. As I said, there's not that many stores, only 2 for now, not that many employees, and not that many departments.
And it takes, like, 10 seconds to fetch the numbers! What's wrong with it?
Let's take a closer look at the generated SQL for this seemingly innocent queryset:
SELECT
"shop_store"."id",
COUNT(DISTINCT "shop_product"."id") AS "total_products",
COUNT(DISTINCT "shop_employee"."id") AS "total_employees"
FROM "shop_store"
LEFT OUTER JOIN "shop_product"
ON "shop_store"."id" = "shop_product"."store_id"
LEFT OUTER JOIN "shop_department"
ON "shop_store"."id" = "shop_department"."store_id"
LEFT OUTER JOIN "shop_employee"
ON "shop_department"."id" = "shop_employee"."department_id"
GROUP BY "shop_store"."id"
And let's check the actual query plan:
GroupAggregate ... rows=2 ...
-> Nested Loop Left Join ... rows=25000000 ...
...
...
Execution Time: 11376.072 ms
Translation: these three JOIN
s turn into a 25-million-row cartesian mess before GROUP BY
and COUNT(DISTINCT)
:
Products × Departments × Employees
= 2500 × 10 × 500
= 12 500 000 (per store)
× 2 stores
= 25 000 000 joined rows
The Fix
There are multiple ways of handling this case, but the easiest fix is to use subqueries:
subquery_products = Subquery(
Product.objects.filter(store_id=OuterRef("pk"))
.values("store_id")
.annotate(count=Count("pk"))
.values("count"),
output_field=IntegerField()
)
subquery_employees = Subquery(
Employee.objects.filter(department__store_id=OuterRef("pk"))
.values("department__store_id")
.annotate(count=Count("pk"))
.values("count"),
output_field=IntegerField()
)
stores = Store.objects.annotate(
total_products=Coalesce(subquery_products, 0),
total_employees=Coalesce(subquery_employees, 0),
).values("id", "total_products", "total_employees")
SQL query:
SELECT "shop_store"."id",
COALESCE((
SELECT COUNT(U0."id") AS "count"
FROM "shop_product" U0
WHERE U0."store_id" = "shop_store"."id"
GROUP BY U0."store_id"), 0
) AS "total_products",
COALESCE((
SELECT COUNT(U0."id") AS "count"
FROM "shop_employee" U0
INNER JOIN "shop_department" U1
ON U0."department_id" = U1."id"
WHERE U1."store_id" = "shop_store"."id"
GROUP BY U1."store_id"), 0
) AS "total_employees"
FROM "shop_store";
Now this one takes a couple of milliseconds with pretty modest and predictable plan:
Seq Scan on shop_store ... rows=2 ...
SubPlan 1
-> Seq Scan on shop_product u0 ... rows=2500 ...
SubPlan 2
-> Hash Join ... rows=5000 ...
-> Seq Scan on shop_employee u0_1 ... rows=10000 ...
-> Hash ... rows=10 ...
...
Execution Time: 5.600 ms
No giant intermediate data sets, just two tiny scans:
- before: 11 376 ms (~11 seconds)
- after: 5.6 ms (2000x faster)
Takeaways
COUNT(DISTINCT)
with multi-branchLEFT JOIN
s makes the database loop through the entire cartesian product.- Correlated subqueries aggregate each branch separately and scale linearly with data size.
- Always test aggregate queries against production-sized data before you ship.
The Hidden Cost of Test Inheritance
I'm subscribed to Adam Johnson's blog and usually really enjoy his writing - it's practical, deep, and no-bullshit. But one recent post, Python: sharing common tests in unittest, caught me off guard.
It describes a "neat" pattern: write reusable test logic in a base class, subclass it to test multiple objects, hiding the base class from unittest
discovery. While the intent is fine - DRYing out duplicated test code - the result is fragile, confusing, and just not worth it.
Here's why.
The Pattern: DRY Tests via Subclassing
# Sample units to test
class Armadillo:
def speak(self) -> str:
return "Hrrr!"
class Okapi:
def speak(self) -> str:
return "Gronk!"
# Test module
class BaseAnimalTests(TestCase):
animal_class: type
def test_speak(self):
sound = self.animal_class().speak()
self.assertIsInstance(sound, str)
self.assertGreater(len(sound), 0)
class ArmadilloTests(BaseAnimalTests):
animal_class = Armadillo
class OkapiTests(BaseAnimalTests):
animal_class = Okapi
del BaseAnimalTests
Yes, it works and it reduces duplication. But it comes at the cost of everything else that makes tests maintainable.
The Problems
IDE and DX Pain
When a test fails, I want to jump to it in my IDE, set a breakpoint, and debug. With this pattern - good luck.
The method doesn't exist in ArmadilloTests
, it's buried in a deleted parent class. You have to manually hunt it down, re-declare the test method just to put a breakpoint and debug it, and pray the animal_class
setup matches what failed:
class ArmadilloTests(TestCase):
animal_class = Armadillo
def test_speak(self):
super().test_speak()
It's tedious and wastes time. All this to avoid writing a 3-line test twice?
class ArmadilloTests(TestCase):
def test_speak(self):
sound = Armadillo().speak()
self.assertIsInstance(sound, str)
self.assertGreater(len(sound), 0)
Clear, simple, debug-friendly. Worth the few extra lines.
CI Failures Are Confusing
If a shared test fails in CI, you get something like:
test_speak (tests.ArmadilloTests.test_speak) ... FAIL
...
Traceback (most recent call last):
File ".../tests.py", line 20, in test_speak
self.assertGreater(len(sound), 0)
AssertionError: 0 not greater than 0
But the method isn't defined in ArmadilloTests
, and Search everywhere won't help at all:
So now you have to reverse-engineer which base class it came from and how to recreate it locally.
This isn't clever. It's just fragile.
When It Kinda Makes Sense
There are rare cases:
- dozens of classes implementing the same interface
- you're the only one maintaining the codebase
- you run everything headless in CI
But even then, you're building test framework plumbing to save what, a hundred lines?
The Clean Alternative: Parametrize It
Pytest Style
@pytest.mark.parametrize('animal_class', [Armadillo, Okapi])
def test_speak(animal_class):
sound = animal_class().speak()
assert isinstance(sound, str)
assert len(sound) > 0
You see all the parameters. You see where the test lives. Failures are explicit:
test_speak[Armadillo] FAILED
test_speak[Okapi] PASSED
You can re-run just the failing test. You can debug with a conditional breakpoint. You don't need to explain how the tests are wired together - because they're not.
unittest Style (Optional, Not Ideal)
from parameterized import parameterized_class
@parameterized_class([
{'animal_class': Armadillo},
{'animal_class': Okapi},
], class_name_func=get_class_name)
class AnimalTests(TestCase):
def test_speak(self):
sound = self.animal_class().speak()
self.assertIsInstance(sound, str)
self.assertGreater(len(sound), 0)
Using parameterized_class
from parameterized is still better than inheritance, but clunkier. Output is readable if you customize class_name_func
. IDE support isn't great. Pytest remains the better option for anything dynamic.
Final Verdict
Tests should fail clearly, debug easily, and be readable years later. This pattern fails all three.
DRY is good. But in tests, visible duplication beats invisible abstraction.
Adam's trick technically works, but in practice, it makes tests harder to navigate, harder to trust, and harder to work with.
Stick to the boring version - you'll thank yourself later.
Why Django's override_settings Sometimes Fails (and How reload + patch Saved Me)
Sometimes @override_settings
just doesn’t cut it.
I ran into a nasty issue while testing a Django module that relies on global state initialized during import. The usual test approach didn’t work. Here’s what happened and how I solved it.
The Setup
We had a module that builds a global dictionary from Django settings at import time. Let’s call it dragon.py
, which takes settings.PUT_EGGS
, which is False
by default:
from django.conf import settings
DRAGON = {}
...
if settings.PUT_EGGS:
DRAGON["eggs"] = "spam"
Another module uses DRAGON
for core logic, e.g. mario.py
:
from myproject.dragon import DRAGON
def find_eggs():
if "eggs" in DRAGON:
return "Found eggs!"
return "Eggs not found"
Now I wanted to write a test that tweaks DRAGON
and expects the logic to behave differently. Easy, right?
@override_settings(PUT_EGGS=True)
def test_find_eggs():
assert find_eggs() == "Found eggs!"
Wrong. The test failed.
The Problem
override_settings
works, but only for code that reads settings at runtime.
In my case, DRAGON
was already built at import time , before the override kicked in. So it used the old value of PUT_EGGS
, no matter what I did in the test.
This is the classic trap of global state baked during import. Welcome to pain town.
The Fix: reload + patch
Here's how I got out:
import importlib
from django.test import override_settings
from unittest.mock import patch
from myproject.mario import find_eggs
@override_settings(PUT_EGGS=True)
def test_find_eggs():
# Reload the dragon module so DRAGON is rebuilt
# with updated settings
from myproject import dragon
new_dragon = importlib.reload(dragon)
# Patch the logic module to use the reloaded DRAGON
with patch('myproject.mario.DRAGON', new_dragon.DRAGON):
result = find_eggs()
assert result == "Found eggs!"
Why This Works
importlib.reload(dragon)
forces a fresh import ofdragon
, rebuildingDRAGON
with the overridden settings;dragon.DRAGON
is updated in the scope of the test only, i.e.mario
module still has the stale version ofDRAGON;
patch(...)
solves this problem by swapping the oldDRAGON
inmario
with the freshly rebuilt one.
This is surgical. Ugly, but effective.
Lessons Learned
- Avoid putting non-trivial logic at module scope, especially if it depends on Django settings. Wrap it in a function or lazy loader.
- If you're stuck with global state,
reload()
andpatch()
give you a way out - just be careful about cascading dependencies.
If you’ve ever had a test mysteriously fail after overriding settings, this might be why.