Your PostgreSQL database is already a job queue.
PgQueuer turns PostgreSQL into a fast, reliable background job processor. Jobs live in the same database as your application data. One stack, full ACID guarantees, and no separate message broker to run.
- 💡 Minimal footprint: one pip install; bring your existing PostgreSQL connection and start enqueueing
- 🔁 Transactional enqueue: commit a job in the same transaction as your data; no dual-write drift
- ⚛️ Safe concurrency: workers claim jobs with FOR UPDATE SKIP LOCKED (never double-processed), with per-entrypoint limits and serialized dispatch when you need them
- 🚀 Instant dispatch: LISTEN/NOTIFY wakes workers the moment a job lands (with a polling fallback)
- ⏰ Scheduling & deferral: cron-style recurring tasks and execute_after, no separate scheduler process
- 📊 Observability: completion tracking, Prometheus metrics, tracing (Logfire/Sentry), and a live dashboard
- 🧪 In-memory mode: run the whole queue without Postgres for tests and prototyping
If you already run PostgreSQL, it can do double duty as your job queue. That means one fewer service to operate, and your queue and data stay consistent because they share the same database and transactions.
┌──────────┐  enqueue   ┌────────────┐  NOTIFY   ┌──────────┐
│ Your App │───────────▶│            │──────────▶│ Worker 1 │──┐
└──────────┘            │            │           └──────────┘  │
                        │ PostgreSQL │  NOTIFY   ┌──────────┐  │
                        │            │──────────▶│ Worker 2 │──┤
                        │            │           └──────────┘  │
                        │            │  NOTIFY   ┌──────────┐  │
                        │            │──────────▶│ Worker N │──┤
                        └────────────┘           └──────────┘  │
                              ▲     FOR UPDATE SKIP LOCKED     │
                              └────────────────────────────────┘
PgQueuer targets Python 3.11+ and PostgreSQL 12+:
pip install pgqueuer
pgq install   # create tables and functions in your database

The CLI reads PGHOST, PGUSER, PGDATABASE and related environment variables. Use pgq install --dry-run to preview SQL, --prefix myapp_ to namespace tables, or pgq uninstall to remove the schema.
PgQueuer pairs consumers (workers that process jobs) with producers (code that enqueues jobs).
Each entrypoint is a job handler. Run it with the CLI: pgq run examples.consumer:main.
import asyncpg
from pgqueuer import PgQueuer
from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job
async def main() -> PgQueuer:
    connection = await asyncpg.connect()
    pgq = PgQueuer(AsyncpgDriver(connection))
    @pgq.entrypoint("fetch")
    async def process(job: Job) -> None:
        print(f"Processed: {job!r}")
    return pgq

From your web app, script, or anywhere else with a database connection:
import asyncpg
from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries
async def main() -> None:
    connection = await asyncpg.connect()
    queries = Queries(AsyncpgDriver(connection))
    await queries.enqueue("fetch", b"hello world")

The job arrives instantly via LISTEN/NOTIFY, and your consumer's process handler picks it up.
This is what a database-backed queue buys you: the job and your business data commit together, or not at all.
order_id = 42
async with connection.transaction():
    await connection.execute(
        "INSERT INTO orders (id, status) VALUES ($1, 'paid')", order_id
    )
    await queries.enqueue("send_receipt", str(order_id).encode())
    # If the transaction rolls back, the job is never enqueued.

PgQueuer.in_memory() is a drop-in replacement that implements the same ports as the real backend, so your handlers stay identical. Good for unit tests and prototyping.
import asyncio
from pgqueuer import PgQueuer
from pgqueuer.models import Job
from pgqueuer.domain.types import QueueExecutionMode
async def main() -> None:
    pq = PgQueuer.in_memory()
    @pq.entrypoint("send_email")
    async def send_email(job: Job) -> None:
        print(f"Sending: {job.payload!r}")
    await pq.qm.queries.enqueue(["send_email"], [b"alice"], [0])
    await pq.qm.run(mode=QueueExecutionMode.drain)
asyncio.run(main())

The in-memory adapter has no durability or multi-process coordination, so use the PostgreSQL backend for production. See the in-memory reference.
| Topic | What's inside |
|---|---|
| Core concepts | Consumers, producers, entrypoints, the job lifecycle |
| Scheduling | Cron-style recurring tasks and deferred execution |
| Concurrency control | Per-entrypoint limits and serialized dispatch |
| Completion tracking | Wait for jobs to finish with CompletionWatcher |
| Shared resources | Inject DB pools, HTTP clients, and models into handlers |
| Custom executors | Retry strategies and exponential backoff |
| Drivers | asyncpg, psycopg async/sync: choosing and configuring |
| Architecture | Ports & adapters, SKIP LOCKED, design decisions |
| Observability | Prometheus metrics, tracing, and the dashboard |
| Framework integration | FastAPI (example) and Flask (example) |
Launch the interactive dashboard to watch queue activity in real time:
pgq dashboard --interval 10 --tail 25

+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:27+00:00 | 12 | fetch | 0:00:03 | queued | 0 |
| 2024-05-05 16:44:28+00:00 | 3 | api_call | 0:00:00 | picked | 5 |
+---------------------------+-------+------------+--------------------------+------------+----------+
PgQueuer uses Testcontainers to spin up an ephemeral PostgreSQL instance for the test suite. Just have Docker running.
uv sync --all-extras # install dependencies
make check             # lint, type-check, and run the test suite

PgQueuer is MIT licensed. See LICENSE for details.