PostgreSQL Pub/Sub with LISTEN/NOTIFY: Replace Redis Pub/Sub
The brief was updated and a notification sent to the illustrator. He was not listening at the time. The message has been lost. We are told this is by design.
What LISTEN/NOTIFY actually is
Good evening. I should like to introduce you to a capability that has been living in your PostgreSQL installation, patiently waiting to be noticed. It is a built-in publish/subscribe messaging system, and most developers do not know it exists.
The API consists of two commands:
NOTIFY channel_name, 'payload'; -- publish a message
LISTEN channel_name; -- subscribe to messages
That is the entire interface. Two commands, and the formalities are complete. NOTIFY sends a message on a named channel. Every connection that has issued LISTEN on that channel receives the message asynchronously. This is true pub/sub fan-out — not polling, not a queue, not a request/response pattern.
Notifications are delivered via the PostgreSQL wire protocol. A listening connection does not need to actively poll — the database pushes the notification when it arrives. The fundamental model:
- Channels are identifiers — subject to PostgreSQL's usual 63-byte identifier limit
- Payloads are text, up to 8,000 bytes
- Delivery is at-most-once to all connected listeners
- Ordering is guaranteed per-channel within a session
Common use cases: cache invalidation signals, job queue wakeups, real-time UI updates, inter-service coordination, and materialized view refresh triggers.
For teams using Symfony, the framework has built-in support for LISTEN/NOTIFY as a Messenger transport — see the Symfony Messenger guide.
Basic LISTEN/NOTIFY — the foundation
Sending notifications
The simplest notification has no payload:
NOTIFY my_channel;
More commonly, notifications carry a JSON payload with structured data:
NOTIFY my_channel, '{"user_id": 42, "action": "updated"}';
The function form pg_notify() is usable inside triggers, functions, and dynamic SQL:
SELECT pg_notify('my_channel', '{"user_id": 42, "action": "updated"}');
Transactional delivery. This is where LISTEN/NOTIFY earns its keep. Notifications are delivered only when the sending transaction commits. If the transaction is rolled back, the notification is never sent. Listeners never receive a notification for data that was subsequently rolled back — a guarantee that Redis pub/sub simply cannot provide.
BEGIN;
UPDATE users SET email = 'new@example.com' WHERE id = 42;
NOTIFY users_updated, '{"id": 42}';
-- If we ROLLBACK here, the notification is never sent
COMMIT; -- Notification delivered only now
Receiving notifications
A connection subscribes to a channel with LISTEN. How the connection consumes notifications depends on the client library. Below are complete working examples in several languages.
Python (psycopg3 — async generator)
import asyncio
import psycopg
async def listen():
    conn = await psycopg.AsyncConnection.connect("postgresql://localhost/mydb", autocommit=True)
    await conn.execute("LISTEN my_channel")
    async for notify in conn.notifies():
        print(f"Channel: {notify.channel}, Payload: {notify.payload}")

asyncio.run(listen())
Python (asyncpg — callback)
import asyncio
import asyncpg
async def listen():
    conn = await asyncpg.connect("postgresql://localhost/mydb")

    def on_notification(conn, pid, channel, payload):
        print(f"Channel: {channel}, Payload: {payload}")

    await conn.add_listener('my_channel', on_notification)
    # Keep the connection alive
    while True:
        await asyncio.sleep(1)

asyncio.run(listen())
Node.js (pg)
const { Client } = require('pg');
const client = new Client({ connectionString: 'postgresql://localhost/mydb' });
async function listen() {
  await client.connect();
  await client.query('LISTEN my_channel');
  client.on('notification', (msg) => {
    console.log(`Channel: ${msg.channel}, Payload: ${msg.payload}`);
  });
}

listen();
Ruby (pg gem)
require 'pg'
conn = PG.connect('postgresql://localhost/mydb')
conn.exec('LISTEN my_channel')
loop do
  conn.wait_for_notify(10) do |channel, pid, payload|
    puts "Channel: #{channel}, Payload: #{payload}"
  end
end
Go (pgx)
package main
import (
    "context"
    "fmt"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()
    conn, err := pgx.Connect(ctx, "postgresql://localhost/mydb")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)
    if _, err := conn.Exec(ctx, "LISTEN my_channel"); err != nil {
        log.Fatal(err)
    }
    for {
        notification, err := conn.WaitForNotification(ctx)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Channel: %s, Payload: %s\n", notification.Channel, notification.Payload)
    }
}
To stop listening:
UNLISTEN my_channel; -- stop listening on a specific channel
UNLISTEN *; -- stop listening on all channels
Triggering notifications from data changes
If you'll permit me, this is the pattern I find most rewarding. A trigger that automatically sends notifications when data changes — the database itself announces what has happened, without your application needing to remember to do so:
CREATE OR REPLACE FUNCTION notify_change()
RETURNS TRIGGER AS $$
DECLARE
payload TEXT;
BEGIN
payload := json_build_object(
'table', TG_TABLE_NAME,
'operation', TG_OP,
'id', CASE
WHEN TG_OP = 'DELETE' THEN OLD.id
ELSE NEW.id
END
)::text;
PERFORM pg_notify(TG_TABLE_NAME || '_changes', payload);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_notify
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_change();
When a row in the orders table is inserted, updated, or deleted, the trigger fires and sends a notification on the orders_changes channel with a JSON payload containing the table name, the operation, and the affected row's ID.
The trigger runs inside the transaction — the notification is delivered on COMMIT, guaranteeing that listeners only learn about changes that actually persisted.
Payload construction. I would recommend a minimal footprint: include the ID and operation, not the full row. Use row_to_json(NEW) only if you need the full row and it fits within the 8KB limit. For most use cases, sending just the ID and letting the listener query for details is simpler and avoids payload size concerns entirely.
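To make that recommendation concrete, here is a small helper — the function name and field set are my own invention, not a library API — that builds the minimal payload and politely refuses anything over the limit:

```python
import json

NOTIFY_PAYLOAD_LIMIT = 8000  # PostgreSQL's hard limit on NOTIFY payloads, in bytes

def build_notify_payload(table: str, operation: str, row_id: int) -> str:
    """Build a minimal JSON payload: table, operation, and ID — never full rows."""
    payload = json.dumps({"table": table, "op": operation, "id": row_id})
    if len(payload.encode("utf-8")) > NOTIFY_PAYLOAD_LIMIT:
        raise ValueError("NOTIFY payload exceeds PostgreSQL's 8,000-byte limit")
    return payload
```

With sensible inputs such a payload is a few dozen bytes — `build_notify_payload("orders", "UPDATE", 42)` — leaving the 8KB ceiling comfortably distant.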
Connection management — the critical detail
Why LISTEN requires a dedicated connection
I'm afraid this section requires your particular attention, as it is the most common source of difficulty.
A connection running LISTEN must stay open to receive notifications. This is the most important operational detail, and getting it wrong is the most common reason teams abandon LISTEN/NOTIFY prematurely.
In a typical application, database connections are managed by a connection pool — borrowed for a query, returned after the query completes. A listening connection cannot participate in this pattern. It must remain open and dedicated to receiving notifications.
PgBouncer compatibility. PgBouncer in transaction pooling mode (the most common configuration) does not forward NOTIFY messages to clients. When a transaction completes and the connection is returned to the pool, the LISTEN registration is lost. PgBouncer in session pooling mode does pass notifications, but session mode eliminates most of PgBouncer's pooling benefits.
The solution is straightforward: maintain one (or a few) dedicated connections directly to PostgreSQL — bypassing the connection pooler — specifically for LISTEN. All other database traffic goes through PgBouncer or your connection pool as usual.
Reconnection and missed messages
I should be forthcoming about this: if the listening connection drops — due to a network issue, a PostgreSQL restart, or a server failover — all notifications sent during the disconnection are lost. LISTEN/NOTIFY has no persistence, no replay buffer, and no acknowledgment mechanism.
The reconnection strategy:
- Detect the disconnection (connection error, heartbeat timeout)
- Reconnect to PostgreSQL
- Re-issue all LISTEN commands
- Perform a full state check — query the table for recent changes to catch anything missed during the disconnection
async def listen_with_reconnect(dsn, channels):
    while True:
        try:
            conn = await asyncpg.connect(dsn)
            for channel in channels:
                await conn.add_listener(channel, on_notification)
            # On reconnect, reconcile state
            await reconcile_cache(conn)
            # Listen until disconnected
            while True:
                await asyncio.sleep(60)  # Heartbeat interval
        except (asyncpg.ConnectionDoesNotExistError, OSError):
            print("Connection lost, reconnecting in 1s...")
            await asyncio.sleep(1)
This is the fundamental trade-off compared to Redis Streams or a dedicated message queue: LISTEN/NOTIFY is fire-and-forget. For most use cases — cache invalidation, UI refresh hints, job queue wakeups — missing an occasional notification is harmless. The next notification or a periodic reconciliation catches it.
For use cases where every message must be delivered and processed exactly once, LISTEN/NOTIFY is not the right tool. I would direct you instead to a persistent queue — the SKIP LOCKED pattern from the companion article, or an external message broker.
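One refinement to the reconnection loop above: a fixed one-second sleep can hammer a server that is still restarting. Exponential backoff with jitter is the usual courtesy — a sketch, with names and defaults of my own choosing:

```python
import random

def reconnect_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: a random delay in
    [0, min(cap, base * 2^attempt)] — spreads reconnect storms across listeners."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

In the reconnect loop, replace the fixed `asyncio.sleep(1)` with `asyncio.sleep(reconnect_delay(attempt))` and reset `attempt` to zero after a successful connection.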
Connection libraries and async patterns
Each language ecosystem handles LISTEN/NOTIFY differently:
| Library | Pattern | Notes |
|---|---|---|
| Python psycopg3 | Async generator (connection.notifies()) | Clean, Pythonic. Requires autocommit=True. |
| Python asyncpg | Callback (connection.add_listener()) | Fully async, fits asyncio naturally. |
| Node.js pg | Event emitter (client.on('notification')) | Fits the Node event loop. |
| Ruby pg | Blocking poll (connection.wait_for_notify(timeout)) | Run in a dedicated thread. |
| Go pgx | Blocking with context (conn.WaitForNotification(ctx)) | Context cancellation for graceful shutdown. |
| Java JDBC | Polling (PGConnection.getNotifications()) | Must call periodically — no push delivery. |
The Java JDBC case deserves a note: JDBC does not support push notifications. You must periodically call getNotifications() to check for pending notifications. This makes Java the least natural fit for LISTEN/NOTIFY.
Payload patterns
JSON payloads — the standard approach
JSON is the natural payload format. Every language has native JSON parsing, and JSON makes the channel protocol self-documenting:
-- Trigger sends structured JSON
PERFORM pg_notify(
'orders_changes',
json_build_object(
'id', NEW.id,
'status', NEW.status,
'customer_id', NEW.customer_id,
'updated_at', now()
)::text
);
# Listener parses JSON
import json

def on_notification(conn, pid, channel, payload):
    data = json.loads(payload)
    print(f"Order {data['id']} status changed to {data['status']}")
Keep payloads small. Include IDs and metadata — not full row data. Let the listener query for the complete record if it needs more than what the notification carries.
The 8KB payload limit
PostgreSQL enforces an 8,000-byte limit on NOTIFY payloads. This is a hard limit, not configurable. For most use cases, 8KB is generous. A JSON payload with a dozen fields typically fits in a few hundred bytes.
When 8KB is not enough:
-- Anti-pattern: serializing a full row with large text columns
PERFORM pg_notify('orders_changes', row_to_json(NEW)::text); -- may exceed 8KB
-- Better: send only the ID, let the listener fetch the full record
PERFORM pg_notify('orders_changes', json_build_object('id', NEW.id, 'op', TG_OP)::text);
For genuinely large payloads, use an outbox pattern — write the payload to a table and send just the row ID via NOTIFY:
-- Write the full payload to an outbox table
INSERT INTO notification_outbox (channel, payload)
VALUES ('orders_changes', row_to_json(NEW)::text)
RETURNING id INTO outbox_id;
-- Send just the outbox ID
PERFORM pg_notify('orders_changes', json_build_object('outbox_id', outbox_id)::text);
Channel naming conventions
Use descriptive, namespaced channel names:
-- Per-table, per-operation channels
LISTEN orders_insert;
LISTEN orders_update;
LISTEN orders_delete;
-- Per-table aggregate channels
LISTEN orders_changes;
-- Domain-specific channels
LISTEN cache_invalidation;
LISTEN report_ready;
Per-entity channels are useful when a specific client cares about a specific record:
-- A client watching a specific order
LISTEN order_42;
-- The trigger sends to the entity-specific channel
PERFORM pg_notify('order_' || NEW.id::text, payload);
Wildcard limitation. PostgreSQL has no wildcard LISTEN. You cannot LISTEN orders_* to subscribe to all order channels. Each channel must be explicitly subscribed. If you need pattern-based subscriptions, use a single aggregate channel (orders_changes) and filter on the payload content in the listener.
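In practice, that filtering is a few lines of dispatch code. A sketch — the ChannelRouter class is illustrative, not a library feature — that routes everything arriving on one aggregate channel by the table field in the payload:

```python
import json
from typing import Callable

class ChannelRouter:
    """Route notifications from a single aggregate channel by a payload field,
    standing in for the wildcard LISTEN that PostgreSQL lacks."""

    def __init__(self) -> None:
        self._handlers: dict[str, Callable[[dict], None]] = {}

    def on(self, table: str, handler: Callable[[dict], None]) -> None:
        """Register a handler for payloads whose 'table' field matches."""
        self._handlers[table] = handler

    def dispatch(self, payload: str) -> None:
        data = json.loads(payload)
        handler = self._handlers.get(data.get("table"))
        if handler:  # payloads for unregistered tables are silently dropped
            handler(data)
```

The single listening connection issues one LISTEN on the aggregate channel and calls `router.dispatch(notify.payload)` from its notification callback.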
Fan-out patterns
One-to-many — broadcasting to all listeners
Every connection listening on a channel receives every notification — this is native fan-out with no additional configuration.
Use case: cache invalidation across multiple application servers. When any server modifies data, all other servers learn about it immediately — no polling interval, no stale cache TTL.
Many-to-one — aggregating signals
Multiple writers send notifications on the same channel; a single listener processes them.
Use case: job queue wakeup. Any request handler can enqueue a job and send NOTIFY. A dedicated worker process listens and wakes up to process the new job:
-- Any request handler (writer)
INSERT INTO jobs (queue, payload) VALUES ('default', '{"task": "send_email"}');
NOTIFY jobs_available, 'default';
# Single worker (listener)
async def on_job_notify(conn, pid, channel, payload):
    # Wake up and check for new jobs
    job = await dequeue(payload)  # payload = queue name
    if job:
        await process(job)
This is the LISTEN/NOTIFY wake pattern described in the SKIP LOCKED job queue article. Combining NOTIFY for low-latency wakeup with polling as a fallback gives you the best of both approaches.
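A sketch of that combination — the JobWaker class, dequeue, and process are stand-ins for your own queue code — where the NOTIFY callback merely sets an event, and the worker waits on it with a polling timeout as the safety net:

```python
import asyncio

class JobWaker:
    """Bridge NOTIFY callbacks into a worker loop: wake on notification,
    but poll on a timer anyway in case a notification is lost."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def on_job_notify(self, conn, pid, channel, payload):
        # Driver notification callback — just nudge the worker.
        self._event.set()

    async def run(self, dequeue, process, poll_interval: float = 5.0):
        while True:
            job = await dequeue()
            if job is not None:
                await process(job)
                continue  # keep draining while jobs remain
            self._event.clear()
            try:
                # Sleep until a NOTIFY arrives or the fallback interval elapses
                await asyncio.wait_for(self._event.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass  # fallback poll — catches any missed notification
```

The poll interval can be generous (seconds, not milliseconds) because NOTIFY provides the low-latency path; polling exists only as insurance.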
Scaling listeners
Each listener is a database connection. For 5–10 application servers, the overhead is trivial. PostgreSQL handles the fan-out internally with minimal cost.
For 100+ listeners on the same channel, the sending transaction's commit time grows linearly with the listener count. This is rarely a problem in practice, but at extreme scale, it can add measurable latency to commits.
For thousands of listeners (e.g., WebSocket connections), use a relay pattern: one dedicated process listens on PostgreSQL and rebroadcasts via an application-level mechanism. The relay process is the single listener as far as PostgreSQL is concerned. Fan-out to end clients happens at the application layer, which is better suited for managing thousands of connections.
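A minimal in-process relay hub might look like the following — the RelayHub class is illustrative, not a library: the single PostgreSQL listener calls deliver() from its notification callback, and each WebSocket handler consumes its own queue:

```python
import asyncio

class RelayHub:
    """One PostgreSQL LISTEN connection feeds deliver(); thousands of
    application-level subscribers each read from their own bounded queue."""

    def __init__(self, max_queue: int = 100) -> None:
        self._subscribers: set[asyncio.Queue] = set()
        self._max_queue = max_queue

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue(maxsize=self._max_queue)
        self._subscribers.add(q)
        return q

    def unsubscribe(self, q: asyncio.Queue) -> None:
        self._subscribers.discard(q)

    def deliver(self, channel: str, payload: str) -> None:
        # Called from the single listening connection's notification callback.
        for q in self._subscribers:
            try:
                q.put_nowait((channel, payload))
            except asyncio.QueueFull:
                pass  # slow consumer: drop rather than block the relay
```

The bounded queue and drop-on-full policy are deliberate: a single slow WebSocket client must never stall delivery to the others.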
Real-world implementation — cache invalidation
Allow me to walk through a complete, end-to-end example: an application with an in-memory cache that needs to invalidate across multiple servers. This is where LISTEN/NOTIFY truly shines.
Step 1: The trigger
CREATE OR REPLACE FUNCTION notify_cache_invalidation()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify(
'cache_invalidation',
json_build_object(
'table', TG_TABLE_NAME,
'operation', TG_OP,
'id', CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END
)::text
);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Apply to all tables that are cached
CREATE TRIGGER products_cache_notify
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION notify_cache_invalidation();
CREATE TRIGGER users_cache_notify
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_cache_invalidation();
Step 2: The listener (Python + asyncpg)
import asyncio
import asyncpg
import json
class CacheInvalidator:
    def __init__(self, dsn):
        self.dsn = dsn
        self.cache = {}  # Simple in-memory cache
        self.conn = None

    async def start(self):
        await self.connect()

    async def connect(self):
        while True:
            try:
                self.conn = await asyncpg.connect(self.dsn)
                await self.conn.add_listener('cache_invalidation', self.on_notification)
                # On reconnect, rebuild the full cache
                await self.rebuild_cache()
                # Keep alive with periodic health checks
                while True:
                    await asyncio.sleep(30)
                    await self.conn.fetchval('SELECT 1')  # Health check
            except (asyncpg.ConnectionDoesNotExistError, OSError) as e:
                print(f"Connection lost: {e}. Reconnecting...")
                self.conn = None
                await asyncio.sleep(1)

    def on_notification(self, conn, pid, channel, payload):
        data = json.loads(payload)
        cache_key = f"{data['table']}:{data['id']}"
        if data['operation'] == 'DELETE':
            self.cache.pop(cache_key, None)
        else:
            # Invalidate — next read will fetch fresh data
            self.cache.pop(cache_key, None)
        print(f"Invalidated cache key: {cache_key}")

    async def rebuild_cache(self):
        """Full cache rebuild on reconnect to catch missed notifications."""
        self.cache.clear()
        print("Cache cleared — will be populated on demand")

    def get(self, table, id):
        """Get from cache, or return None (caller fetches from DB)."""
        return self.cache.get(f"{table}:{id}")

    def set(self, table, id, value):
        """Populate cache after a DB read."""
        self.cache[f"{table}:{id}"] = value
Step 3: Before and after
Without LISTEN/NOTIFY: Each application server caches data with a TTL (e.g., 5 minutes). When Server A updates a product, Servers B, C, and D continue serving stale data until the TTL expires. Users see inconsistent data depending on which server handles their request.
With LISTEN/NOTIFY: When Server A updates a product, the trigger fires a notification. Servers B, C, and D receive it within milliseconds and invalidate their cache entry. The next request to any server fetches fresh data. Stale data window drops from minutes to milliseconds.
The safety net: if a notification is missed (network hiccup, brief disconnection), the reconnection handler clears the entire cache. The cache rebuilds on demand from fresh database reads. No stale data persists beyond the reconnection.
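To round out the example, application code consumes the invalidator as a read-through cache. A sketch — get_product and the query text are stand-ins for your own data access, and invalidator is any object with the get/set methods shown in Step 2:

```python
async def get_product(invalidator, conn, product_id: int):
    """Read-through: serve from cache if present, else fetch and populate."""
    cached = invalidator.get("products", product_id)
    if cached is not None:
        return cached
    # Cache miss — fetch fresh data and populate for subsequent reads.
    row = await conn.fetchrow("SELECT * FROM products WHERE id = $1", product_id)
    if row is None:
        return None
    value = dict(row)
    invalidator.set("products", product_id, value)
    return value
```

When a notification invalidates the entry, the next call simply takes the miss path and repopulates from the database — no coordination beyond what the trigger already provides.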
When LISTEN/NOTIFY is not enough
A butler who overstates his case is no butler at all. LISTEN/NOTIFY is a signaling mechanism, not a message queue. It excels at "something changed, go look" — it is not designed for "here is the data, process it exactly once."
No persistence. If no connections are listening when a NOTIFY is sent, the message is silently discarded.
No message history. There is no way to query past notifications. If you need an audit trail or event log, write events to a table.
8KB payload limit. Not configurable, not extendable.
No acknowledgment. The sender does not know if anyone received the message.
No routing. Messages go to all listeners on the channel. There is no selective delivery based on message content, no consumer groups, no round-robin distribution.
Connection requirement. Listeners must maintain a persistent connection to PostgreSQL. This is incompatible with serverless functions that spin up and down.
No pattern subscriptions. You cannot LISTEN orders_*. Redis pub/sub supports PSUBSCRIBE patterns; PostgreSQL does not.
For use cases that need any of these features, use a persistent queue (the SKIP LOCKED pattern, Redis Streams, RabbitMQ, or Amazon SQS).
Comparison with Redis Pub/Sub
A frank comparison is in order. Both systems have genuine strengths, and the right choice depends on your specific requirements.
| Feature | PostgreSQL LISTEN/NOTIFY | Redis Pub/Sub |
|---|---|---|
| Delivery guarantee | At-most-once | At-most-once |
| Persistence | None | None (Streams add persistence) |
| Transactional delivery | Yes (delivered on COMMIT) | No |
| Pattern subscriptions | No | Yes (PSUBSCRIBE) |
| Payload limit | 8,000 bytes | 512 MB |
| Throughput | Thousands/second | Hundreds of thousands/second |
| Connection pooler compatibility | Session mode only | N/A (native client) |
| Infrastructure | Existing PostgreSQL | Requires Redis instance |
PostgreSQL's advantage: transactional delivery. Notifications are delivered only when the sending transaction commits. If a BEGIN; UPDATE; NOTIFY; ROLLBACK; happens, no notification is sent. This guarantee is impossible in Redis pub/sub because Redis has no concept of application-level transactions.
Redis's advantage: scale and flexibility. Redis supports pattern subscriptions (PSUBSCRIBE orders.*), has no practical payload size limit, handles higher message throughput, and has no connection pooler restrictions.
The decision framework: If you already have PostgreSQL and your use case is signaling — cache invalidation, job wakeup, live UI refresh — LISTEN/NOTIFY is the right choice. If you need high-throughput messaging, pattern routing, or large payloads, Redis pub/sub is the right choice.
For a broader perspective on replacing Redis with PostgreSQL, see the Redis alternatives overview and Why You Don't Need Redis.
What Gold Lapel observes
LISTEN/NOTIFY traffic flows through the PostgreSQL wire protocol, and Gold Lapel sees it as part of the query stream passing through its proxy. Applications that use LISTEN/NOTIFY for cache invalidation often pair it with materialized view refreshes or frequently-executed queries — exactly the patterns where a query-aware proxy adds the most value.
Gold Lapel does not modify NOTIFY behavior. It observes your application's query patterns and optimizes the reads that follow invalidation signals — ensuring that the fresh data fetched after a cache miss is served as efficiently as possible.