asghonim@pgho_inbox
pgho_inbox
A PostgreSQL extension (pg_tle) for inbound communications — a generic "contact us / inbox" framework. It owns the data model, validation, rate limiting, spam scoring, status machine, and event generation. Email delivery and external integrations are handled by workers that consume the notifications table or listen on the inbox NOTIFY channel.
Installation
Install via database.dev (requires the dbdev utility):
-- Install the extension
SELECT dbdev.install('asghonim@pgho_inbox');
-- (Re)create the schema and extension. Note: DROP EXTENSION also drops the extension's tables and their data.
DROP EXTENSION IF EXISTS "asghonim@pgho_inbox";
DROP SCHEMA IF EXISTS pgho_inbox;
CREATE SCHEMA IF NOT EXISTS pgho_inbox;
CREATE EXTENSION IF NOT EXISTS "asghonim@pgho_inbox"
SCHEMA pgho_inbox
VERSION '0.0.3';
You can substitute any schema name you prefer for pgho_inbox.
Usage
Channels
Four channels are created by default: default, support, sales, and feedback. Create additional ones with:
SELECT pgho_inbox.create_channel(
'billing',
'Billing and payments',
'{"rate_limit_max": 5, "notification_email": "billing@example.com", "spam_threshold": 40}'::jsonb
);
Channel settings keys:
| Key | Type | Default | Description |
|---|---|---|---|
| notification_email | text | none | Address inserted into the notification payload for workers |
| rate_limit_max | integer | 10 | Max submissions per IP per hour |
| spam_threshold | integer | 50 | Aggregate spam score that triggers is_spam = true |
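To make the defaults in the table concrete, the sketch below overlays a channel's settings object on those defaults. `resolveChannelSettings` is a hypothetical client-side helper and not part of the extension, which resolves these values inside the database.

```javascript
// Defaults copied from the channel settings table.
const CHANNEL_DEFAULTS = Object.freeze({
  notification_email: null, // no default address
  rate_limit_max: 10,       // submissions per IP per hour
  spam_threshold: 50,       // aggregate score that flags a message as spam
});

// Hypothetical helper: any key present in `settings` overrides the default.
function resolveChannelSettings(settings = {}) {
  return { ...CHANNEL_DEFAULTS, ...settings };
}

const billing = resolveChannelSettings({
  rate_limit_max: 5,
  notification_email: 'billing@example.com',
  spam_threshold: 40,
});
console.log(billing.rate_limit_max, billing.spam_threshold); // prints: 5 40
```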
Submitting a message
submit() is the primary entry point. It validates, rate-limits, scores for spam, persists the message, queues a notification (non-spam only), and fires pg_notify('inbox', ...).
SELECT pgho_inbox.submit(
p_name => 'Jane Doe',
p_email => 'jane@example.com',
p_body => 'I have a question about my order.',
p_channel => 'support', -- defaults to 'default'
p_subject => 'Order #1234', -- optional
p_phone => '+1 555 0100', -- optional
p_source_ip => '1.2.3.4'::inet, -- enables rate limiting
p_metadata => '{"page": "/contact", "utm_source": "email"}'::jsonb
);
-- returns: uuid of the new message
Raises SQLSTATE 22023 for validation failures (missing fields, bad email, length exceeded) and SQLSTATE 54000 when the IP rate limit is hit.
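An application calling submit() from Node.js will usually want to turn those two SQLSTATE values into user-facing responses. The following is a minimal sketch, assuming a node-postgres style client (which exposes the SQLSTATE as `err.code`). `submitMessage`, the shape of `form`, and the HTTP status mapping are illustrative choices, not part of the extension. It also assumes that passing NULL for p_source_ip behaves the same as omitting it.

```javascript
// Hypothetical wrapper around pgho_inbox.submit().
// `client` is anything with a node-postgres style query(text, values) method.
async function submitMessage(client, form) {
  const sql = `
    SELECT pgho_inbox.submit(
      p_name      => $1,
      p_email     => $2,
      p_body      => $3,
      p_channel   => $4,
      p_source_ip => $5::inet
    ) AS id`;
  const values = [
    form.name,
    form.email,
    form.body,
    form.channel ?? 'default',
    form.ip ?? null, // ASSUMPTION: NULL means "no rate limiting"
  ];
  try {
    const { rows } = await client.query(sql, values);
    return { ok: true, status: 201, id: rows[0].id };
  } catch (err) {
    // 22023: validation failure, 54000: IP rate limit hit
    if (err.code === '22023') return { ok: false, status: 400, error: err.message };
    if (err.code === '54000') return { ok: false, status: 429, error: 'rate limit exceeded' };
    throw err; // anything else is unexpected and should surface
  }
}
```

Returning a result object for the two documented error codes, and rethrowing everything else, keeps connection problems and bugs from being reported to the user as validation errors.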
Status machine
Messages move through: new → open → assigned → closed (and spam from any state).
-- Mark as open (e.g. an agent viewed it)
SELECT pgho_inbox.open_message('message-uuid', 'agent@example.com');
-- Assign to a team member
SELECT pgho_inbox.assign_message('message-uuid', 'alice@example.com', 'agent@example.com');
-- Close with an optional reason
SELECT pgho_inbox.close_message('message-uuid', 'alice@example.com', 'Resolved via email');
-- Reopen a closed message
SELECT pgho_inbox.reopen_message('message-uuid', 'alice@example.com');
-- Manually flag as spam (also cancels pending notifications)
SELECT pgho_inbox.mark_spam('message-uuid', 'agent@example.com');
Each call appends a row to events and fires pg_notify('inbox', ...).
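When an admin API exposes these transitions, it helps to route them through a fixed whitelist rather than building function names from user input. The sketch below mirrors the argument order of the SQL examples above. `buildTransitionQuery` and the action names are hypothetical, and the only optional argument it models is the closing reason.

```javascript
// Whitelist of the transition functions shown above.
// Arguments are listed in the order used by the SQL examples.
const TRANSITIONS = {
  open:   { fn: 'open_message',   min: 2, max: 2 }, // id, actor
  assign: { fn: 'assign_message', min: 3, max: 3 }, // id, assignee, actor
  close:  { fn: 'close_message',  min: 2, max: 3 }, // id, actor, [reason]
  reopen: { fn: 'reopen_message', min: 2, max: 2 }, // id, actor
  spam:   { fn: 'mark_spam',      min: 2, max: 2 }, // id, actor
};

// Hypothetical helper: build a parameterised query for one transition.
function buildTransitionQuery(action, args) {
  if (!Object.prototype.hasOwnProperty.call(TRANSITIONS, action)) {
    throw new Error(`unknown action "${action}"`);
  }
  const { fn, min, max } = TRANSITIONS[action];
  if (args.length < min || args.length > max) {
    throw new Error(`${action} expects ${min}..${max} argument(s), got ${args.length}`);
  }
  const placeholders = args.map((_, i) => `$${i + 1}`).join(', ');
  return { text: `SELECT pgho_inbox.${fn}(${placeholders})`, values: args };
}
```

The returned object has the `{ text, values }` shape that node-postgres accepts, so it can be passed straight to `client.query(...)`.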
Notes
Internal notes are attached to a message and do not trigger notifications.
SELECT pgho_inbox.add_note(
'message-uuid',
'alice@example.com', -- author
'Called the customer — following up Thursday.'
);
-- returns: uuid of the new note
Attachments
Register file metadata after uploading to your storage backend:
SELECT pgho_inbox.add_attachment(
'message-uuid',
's3', -- storage_provider
'uploads/2024/order.pdf', -- storage_key
'application/pdf', -- mime (optional)
204800 -- size in bytes (optional)
);
-- returns: uuid of the new attachment row
Spam rules
Rules are evaluated by submit() against each incoming message. A rule's score is added to the message's aggregate spam_score; reaching the channel's spam_threshold sets is_spam = true.
-- Add a rule (field:regex syntax)
INSERT INTO pgho_inbox.spam_rules (name, expression, score)
VALUES ('free_money', 'body:free money|earn \$\d+', 40);
-- Disable a rule without deleting it
UPDATE pgho_inbox.spam_rules SET enabled = false WHERE name = 'contains_url';
Supported field prefixes: body, email, subject, name. A bare regex (no prefix) matches against subject || ' ' || body.
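The evaluation described above can be modelled outside the database to try out rules before inserting them. This is an approximation only: it uses JavaScript regular expressions rather than PostgreSQL's, and it assumes case-sensitive matching and that a missing subject or body is treated as an empty string. `parseRule` and `scoreMessage` are hypothetical names.

```javascript
const FIELDS = ['body', 'email', 'subject', 'name'];

// Split "field:regex" into its parts. Anything without a supported
// prefix (including regexes that merely contain a colon) is a bare regex.
function parseRule(expression) {
  const idx = expression.indexOf(':');
  const prefix = idx === -1 ? null : expression.slice(0, idx);
  if (prefix !== null && FIELDS.includes(prefix)) {
    return { field: prefix, pattern: expression.slice(idx + 1) };
  }
  return { field: null, pattern: expression };
}

// Sum the scores of all enabled rules that match, then compare the
// total with the channel's spam_threshold (default 50).
function scoreMessage(message, rules, threshold = 50) {
  let spamScore = 0;
  for (const rule of rules) {
    if (rule.enabled === false) continue;
    const { field, pattern } = parseRule(rule.expression);
    const target = field === null
      ? `${message.subject ?? ''} ${message.body ?? ''}` // subject || ' ' || body
      : (message[field] ?? '');
    // ASSUMPTION: case-sensitive match, JavaScript regex syntax
    if (new RegExp(pattern).test(target)) spamScore += rule.score;
  }
  return { spamScore, isSpam: spamScore >= threshold };
}
```

For instance, a message that matches the `free_money` rule (score 40) stays below the default threshold of 50 on its own, but would be flagged on a channel whose spam_threshold is 40.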
Hooks
Hooks let you run arbitrary SQL functions at key lifecycle points without modifying the extension.
-- Register a hook
SELECT pgho_inbox.register_hook('after_insert', 'myschema.on_new_message');
-- Remove a hook
SELECT pgho_inbox.unregister_hook('after_insert', 'myschema.on_new_message');
The hook function must accept a single jsonb argument. Any exception raised in a before_insert hook aborts the submission; exceptions in other hooks emit a WARNING and are swallowed so they don't break the main flow.
Available events: before_insert, after_insert, after_close, after_spam.
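The difference in error handling between before_insert and the other events is easiest to see in a small model. The sketch below is not how the extension is implemented (that logic lives in SQL). `runHooks` and the `hooks` map are hypothetical and only imitate the documented rule.

```javascript
// Illustrative model of the documented rule:
//   before_insert: an exception propagates and aborts the submission
//   other events:  an exception is reported as a warning and swallowed
// `hooks` maps an event name to an array of functions taking one payload.
function runHooks(event, hooks, payload, warn = console.warn) {
  for (const hook of hooks[event] ?? []) {
    try {
      hook(payload);
    } catch (err) {
      if (event === 'before_insert') throw err;
      warn(`hook failed on ${event}: ${err.message}`);
    }
  }
}
```

In practice this means a before_insert hook is the place for checks that should reject a message, while hooks on the other events are best kept to side effects whose failure is tolerable.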
Stale notification recovery
If a worker crashes mid-batch, its claimed rows stay in processing. Call this periodically (e.g. via pg_cron) to requeue them:
-- Requeue anything stuck in 'processing' for more than 15 minutes (the default threshold is 30 minutes)
SELECT pgho_inbox.requeue_stale_notifications('15 minutes'::interval);
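Where pg_cron is not available, the same sweep can run from a Node.js process on a timer. The following is a sketch under that assumption. `startStaleSweep` and its options are hypothetical, and the function's return value is ignored because the source does not document it.

```javascript
// Hypothetical helper: requeue stale notifications on a timer.
// `pool` is anything with a node-postgres style query(text, values) method.
function startStaleSweep(pool, { everyMs = 5 * 60_000, olderThan = '30 minutes' } = {}) {
  const sweep = async () => {
    try {
      await pool.query(
        'SELECT pgho_inbox.requeue_stale_notifications($1::interval)',
        [olderThan]
      );
    } catch (err) {
      // A failed sweep is logged and retried on the next tick.
      console.error('[sweep] requeue failed:', err.message);
    }
  };
  const timer = setInterval(sweep, everyMs);
  timer.unref(); // don't keep the process alive just for the sweep
  return { sweep, stop: () => clearInterval(timer) };
}
```

Running the sweep in more than one worker should be harmless as long as the requeue function is idempotent, which the source implies but does not state.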
Security
All access to the pgho_inbox schema is revoked by default — no role other than the database owner can read tables, call functions, or use sequences. The calling application is responsible for granting only the privileges it requires.
Typical grants for a public-facing contact form:
-- Allow anonymous callers to submit messages
GRANT USAGE ON SCHEMA pgho_inbox TO anon;
GRANT EXECUTE ON FUNCTION pgho_inbox.submit TO anon;
Typical grants for an internal worker or admin API:
GRANT USAGE ON SCHEMA pgho_inbox TO authenticated;
-- Notification worker
GRANT EXECUTE ON FUNCTION pgho_inbox.claim_notifications TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_notification_sent TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_notification_failed TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.requeue_stale_notifications TO authenticated;
-- Inbox management
GRANT EXECUTE ON FUNCTION pgho_inbox.open_message TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.assign_message TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.close_message TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.reopen_message TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_spam TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.add_note TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.add_attachment TO authenticated;
-- Read access to the data tables your application needs
GRANT SELECT ON TABLE pgho_inbox.messages TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.events TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.notes TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.channels TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.attachments TO authenticated;
Avoid exposing internal tables (rate_limits, notifications, hooks, spam_rules) unless your application has a specific need.
Node.js Worker Example
The extension fires pg_notify('inbox', ...) on every state change and exposes claim_notifications() for reliable batch pickup with SKIP LOCKED. The pattern below combines both: NOTIFY for instant wake-up, claim_notifications for reliable at-least-once delivery.
// inbox-worker.js
import pg from 'pg';
const { Pool, Client } = pg;
const DB_URL = process.env.DATABASE_URL ?? 'postgresql://postgres:postgres@localhost:54322/postgres';
// ─── notification handlers ─────────────────────────────────────────────────────
const handlers = {
email: async (notification, client) => {
console.log(`[email] to=${notification.metadata?.email} message=${notification.message_id}`);
// TODO: call your SMTP / transactional email service here
},
slack: async (notification, client) => {
console.log(`[slack] message=${notification.message_id}`);
// TODO: call Slack Web API here
},
webhook: async (notification, client) => {
console.log(`[webhook] url=${notification.metadata?.url} message=${notification.message_id}`);
// TODO: call fetch() with notification.metadata.url
},
};
// ─── process a claimed batch ───────────────────────────────────────────────────
async function processNotifications(pool) {
const client = await pool.connect();
try {
await client.query('BEGIN');
const { rows } = await client.query(
`SELECT * FROM pgho_inbox.claim_notifications($1)`,
[10]
);
for (const row of rows) {
const handler = handlers[row.type];
if (!handler) {
console.warn(`[worker] no handler for type="${row.type}", skipping`);
await client.query(
`SELECT pgho_inbox.mark_notification_failed($1, $2)`,
[row.id, `no handler registered for type "${row.type}"`]
);
continue;
}
try {
await handler(row, client);
await client.query(
`SELECT pgho_inbox.mark_notification_sent($1)`,
[row.id]
);
} catch (err) {
console.error(`[worker] handler failed for ${row.id}:`, err.message);
await client.query(
`SELECT pgho_inbox.mark_notification_failed($1, $2)`,
[row.id, err.message]
);
}
}
await client.query('COMMIT');
if (rows.length > 0) {
console.log(`[worker] processed ${rows.length} notification(s)`);
}
return rows.length;
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
// ─── LISTEN / NOTIFY watcher ───────────────────────────────────────────────────
async function startListener(pool) {
// LISTEN requires a dedicated connection that isn't returned to the pool
const listener = new Client({ connectionString: DB_URL });
await listener.connect();
listener.on('notification', async (msg) => {
if (msg.channel !== 'inbox') return;
let payload;
try {
payload = JSON.parse(msg.payload);
} catch {
console.warn('[listener] bad payload:', msg.payload);
return;
}
console.log(`[listener] inbox event="${payload.event}" message=${payload.message_id} spam=${payload.is_spam}`);
// Any non-spam event triggers a claim pass, including status-change events
// such as message_opened. That is harmless: claim_notifications() only
// returns rows that are actually queued.
if (!payload.is_spam) {
try {
await processNotifications(pool);
} catch (err) {
console.error('[worker] processNotifications error:', err.message);
}
}
});
listener.on('error', (err) => {
console.error('[listener] connection error:', err.message);
// Let the process crash and be restarted by your process manager (systemd/k8s/PM2).
// This avoids silent listener death.
process.exit(1);
});
await listener.query('LISTEN inbox');
console.log('[listener] LISTEN inbox — ready');
return listener;
}
// ─── startup sweep ─────────────────────────────────────────────────────────────
// Drain any notifications queued before this worker started (e.g. during restart).
async function drainPending(pool) {
let total = 0;
let batch;
do {
batch = await processNotifications(pool);
total += batch;
} while (batch > 0);
if (total > 0) console.log(`[startup] drained ${total} pending notification(s)`);
}
// ─── main ──────────────────────────────────────────────────────────────────────
const pool = new Pool({ connectionString: DB_URL, max: 5 });
await drainPending(pool);
await startListener(pool);
How it fits together:
| Layer | What it does |
|---|---|
| LISTEN inbox | Wakes the worker instantly when any state change fires pg_notify |
| claim_notifications() | Atomically claims rows using SKIP LOCKED — multiple workers are safe |
| mark_notification_sent/failed | Moves each row through pending → processing → sent/failed |
| Startup drain | Catches anything queued while the worker was down |
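Because every non-spam NOTIFY starts a claim pass, a burst of events can launch several overlapping passes. SKIP LOCKED keeps that correct, but each pass holds a pool connection. One possible refinement, sketched here with a hypothetical `coalesce` helper, is to allow a single pass in flight and fold any wake-ups that arrive meanwhile into one follow-up pass.

```javascript
// Wrap an async function so that at most one run is in flight.
// Calls that arrive during a run schedule exactly one follow-up run.
function coalesce(fn) {
  let running = false;
  let rerun = false;
  return async function trigger() {
    if (running) {
      rerun = true; // remember that more work may have arrived
      return;
    }
    running = true;
    try {
      do {
        rerun = false;
        await fn();
      } while (rerun);
    } finally {
      running = false;
    }
  };
}
```

In the worker above this could wrap the claim pass, for example `const wake = coalesce(() => processNotifications(pool));`, with the notification listener calling `wake()` instead of `processNotifications(pool)` directly.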
Run it:
npm install pg
DATABASE_URL=postgresql://postgres:postgres@localhost:54322/postgres node inbox-worker.js
Add a handler for each type value your channels produce. The extension handles retry counting via mark_notification_failed (configurable max_attempts), so that logic stays out of the worker.
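As an example of filling in one of the TODO handlers, the sketch below posts to the URL found in `notification.metadata.url` and throws on any non-2xx response, which makes the worker's catch block call mark_notification_failed. `makeWebhookHandler` is hypothetical, the request body shape is an assumption, and the default `fetch` requires Node.js 18 or later.

```javascript
// Hypothetical webhook handler factory. `fetchImpl` defaults to the
// global fetch (Node.js 18+); pass a stub to test without a network.
function makeWebhookHandler(fetchImpl = globalThis.fetch) {
  return async function webhook(notification) {
    const url = notification.metadata?.url;
    if (!url) throw new Error('notification.metadata.url is missing');
    const res = await fetchImpl(url, {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      // ASSUMPTION: the receiver only needs the message id
      body: JSON.stringify({ message_id: notification.message_id }),
    });
    // Throwing here lets the worker record the failure for this row.
    if (!res.ok) throw new Error(`webhook responded with HTTP ${res.status}`);
  };
}
```

It would be registered in the handlers map as `webhook: makeWebhookHandler()`.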
Install
- Install the dbdev CLI
- Generate migration:
dbdev add -o ./migrations -s extensions -v 0.0.3 package -n "asghonim@pgho_inbox"