asghonim@pgho_inbox

v0.0.3 · Created 2 days ago · By asghonim

pgho_inbox

A PostgreSQL extension (pg_tle) for inbound communications — a generic "contact us / inbox" framework. It owns the data model, validation, rate limiting, spam scoring, status machine, and event generation. Email delivery and external integrations are handled by workers that consume the notifications table or listen on the inbox NOTIFY channel.


Installation

Install via database.dev (requires the dbdev utility):

-- Install the extension
SELECT dbdev.install('asghonim@pgho_inbox');

-- (Re)create the schema and extension.
-- Note: dropping the extension also drops its tables, so any stored messages are lost.
DROP EXTENSION  IF EXISTS "asghonim@pgho_inbox";
DROP SCHEMA     IF EXISTS pgho_inbox;
CREATE SCHEMA   IF NOT EXISTS pgho_inbox;
CREATE EXTENSION IF NOT EXISTS "asghonim@pgho_inbox"
  SCHEMA pgho_inbox
  VERSION '0.0.3';

You can substitute any schema name you prefer for pgho_inbox.


Usage

Channels

Four channels are created by default: default, support, sales, and feedback. Create additional ones with:

SELECT pgho_inbox.create_channel(
    'billing',
    'Billing and payments',
    '{"rate_limit_max": 5, "notification_email": "billing@example.com", "spam_threshold": 40}'::jsonb
);

Channel settings keys:

| Key | Type | Default | Description |
|---|---|---|---|
| notification_email | text | | Address inserted into the notification payload for workers |
| rate_limit_max | integer | 10 | Max submissions per IP per hour |
| spam_threshold | integer | 50 | Aggregate spam score that triggers is_spam = true |

Submitting a message

submit() is the primary entry point. It validates, rate-limits, scores for spam, persists the message, queues a notification (non-spam only), and fires pg_notify('inbox', ...).

SELECT pgho_inbox.submit(
    p_name      => 'Jane Doe',
    p_email     => 'jane@example.com',
    p_body      => 'I have a question about my order.',
    p_channel   => 'support',       -- defaults to 'default'
    p_subject   => 'Order #1234',   -- optional
    p_phone     => '+1 555 0100',   -- optional
    p_source_ip => '1.2.3.4'::inet, -- enables rate limiting
    p_metadata  => '{"page": "/contact", "utm_source": "email"}'::jsonb
);
-- returns: uuid of the new message

Raises SQLSTATE 22023 for validation failures (missing fields, bad email, length exceeded) and SQLSTATE 54000 when the IP rate limit is hit.
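
From Node, the two SQLSTATE values can be told apart through the `code` property that node-postgres sets on database errors. The sketch below shows one way to build the submit() call and translate its failures. The helper names (`buildSubmitQuery`, `classifySubmitError`, `submitMessage`), the camelCase form keys, and the HTTP status mapping are illustrative assumptions, not part of the extension.

```javascript
// Hypothetical helpers; only submit() and its parameter names come from the README.
const OPTIONAL_PARAMS = {
  channel: 'p_channel',
  subject: 'p_subject',
  phone: 'p_phone',
  sourceIp: 'p_source_ip',
  metadata: 'p_metadata',
};

// Build a parameterised call using named notation. Only the fields that were
// provided are passed, so omitted optional arguments are left to the function.
function buildSubmitQuery(form) {
  const args = [
    ['p_name', form.name],
    ['p_email', form.email],
    ['p_body', form.body],
  ];
  for (const [key, param] of Object.entries(OPTIONAL_PARAMS)) {
    if (form[key] !== undefined) args.push([param, form[key]]);
  }
  const list = args.map(([param], i) => `${param} => $${i + 1}`).join(', ');
  return {
    text: `SELECT pgho_inbox.submit(${list}) AS id`,
    values: args.map(([, value]) => value),
  };
}

// Map the documented SQLSTATEs to an HTTP-style outcome (the mapping is an assumption).
function classifySubmitError(err) {
  switch (err?.code) {
    case '22023': return { status: 400, reason: 'validation_failed' };
    case '54000': return { status: 429, reason: 'rate_limited' };
    default: return { status: 500, reason: 'unexpected' };
  }
}

// `client` is anything with a pg-style query(text, values) method.
async function submitMessage(client, form) {
  const { text, values } = buildSubmitQuery(form);
  try {
    const { rows } = await client.query(text, values);
    return { ok: true, id: rows[0].id };
  } catch (err) {
    return { ok: false, message: err.message, ...classifySubmitError(err) };
  }
}
```

A request handler could then call `submitMessage(pool, { name, email, body, sourceIp })` and respond with the returned `status` when `ok` is false.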


Status machine

Messages move through: new → open → assigned → closed. A message can be marked as spam from any state, and a closed message can be reopened.

-- Mark as open (e.g. an agent viewed it)
SELECT pgho_inbox.open_message('message-uuid', 'agent@example.com');

-- Assign to a team member
SELECT pgho_inbox.assign_message('message-uuid', 'alice@example.com', 'agent@example.com');

-- Close with an optional reason
SELECT pgho_inbox.close_message('message-uuid', 'alice@example.com', 'Resolved via email');

-- Reopen a closed message
SELECT pgho_inbox.reopen_message('message-uuid', 'alice@example.com');

-- Manually flag as spam (also cancels pending notifications)
SELECT pgho_inbox.mark_spam('message-uuid', 'agent@example.com');

Each call appends a row to events and fires pg_notify('inbox', ...).
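
For an admin API written in Node, these calls can be wrapped in a thin helper so that every argument is passed as a bound parameter. This is a minimal sketch: `inboxActions` and its method names are hypothetical, the argument order is inferred from the SQL examples above (message id, then assignee or actor), and `close` omits the reason when none is given on the assumption that the SQL function treats it as optional.

```javascript
// Hypothetical wrapper around the status functions shown above.
// `client` is anything with a pg-style query(text, values) method (Pool or Client).
function inboxActions(client) {
  const call = (fn, args) => {
    const placeholders = args.map((_, i) => `$${i + 1}`).join(', ');
    return client.query(`SELECT pgho_inbox.${fn}(${placeholders}) AS result`, args);
  };

  return {
    open: (messageId, actor) => call('open_message', [messageId, actor]),
    assign: (messageId, assignee, actor) =>
      call('assign_message', [messageId, assignee, actor]),
    // The reason is only sent when provided.
    close: (messageId, actor, reason) =>
      call('close_message', reason === undefined ? [messageId, actor] : [messageId, actor, reason]),
    reopen: (messageId, actor) => call('reopen_message', [messageId, actor]),
    markSpam: (messageId, actor) => call('mark_spam', [messageId, actor]),
  };
}
```

Usage would look like `await inboxActions(pool).assign(id, 'alice@example.com', 'agent@example.com')`.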


Notes

Internal notes are attached to a message and do not trigger notifications.

SELECT pgho_inbox.add_note(
    'message-uuid',
    'alice@example.com',  -- author
    'Called the customer — following up Thursday.'
);
-- returns: uuid of the new note

Attachments

Register file metadata after uploading to your storage backend:

SELECT pgho_inbox.add_attachment(
    'message-uuid',
    's3',                       -- storage_provider
    'uploads/2024/order.pdf',   -- storage_key
    'application/pdf',          -- mime (optional)
    204800                      -- size in bytes (optional)
);
-- returns: uuid of the new attachment row

Spam rules

Rules are evaluated by submit() against each incoming message. A rule's score is added to the message's aggregate spam_score; reaching the channel's spam_threshold sets is_spam = true.

-- Add a rule (field:regex syntax)
INSERT INTO pgho_inbox.spam_rules (name, expression, score)
VALUES ('free_money', 'body:free money|earn \$\d+', 40);

-- Disable a rule without deleting it
UPDATE pgho_inbox.spam_rules SET enabled = false WHERE name = 'contains_url';

Supported field prefixes: body, email, subject, name. A bare regex (no prefix) matches against subject || ' ' || body.
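
To make the scoring rules concrete, here is a small JavaScript model of the behaviour described above. It is only an illustration, not the extension's implementation: `parseRule` and `scoreMessage` are hypothetical names, JavaScript regular expressions stand in for PostgreSQL's, and case-sensitive matching is an assumption.

```javascript
const FIELDS = ['body', 'email', 'subject', 'name'];

// Split "field:regex" into its parts; anything without a known prefix is a bare regex.
function parseRule(expression) {
  const idx = expression.indexOf(':');
  if (idx > 0 && FIELDS.includes(expression.slice(0, idx))) {
    return { field: expression.slice(0, idx), pattern: expression.slice(idx + 1) };
  }
  return { field: null, pattern: expression };
}

// Sum the scores of all enabled, matching rules and compare with the threshold.
function scoreMessage(message, rules, spamThreshold = 50) {
  let spamScore = 0;
  for (const rule of rules) {
    if (rule.enabled === false) continue;
    const { field, pattern } = parseRule(rule.expression);
    const target = field
      ? (message[field] ?? '')
      : `${message.subject ?? ''} ${message.body ?? ''}`; // bare regex: subject || ' ' || body
    if (new RegExp(pattern).test(target)) spamScore += rule.score;
  }
  return { spam_score: spamScore, is_spam: spamScore >= spamThreshold };
}

// The free_money rule from the SQL example, applied to a sample message.
const result = scoreMessage(
  { subject: 'Hello', body: 'earn $500 today' },
  [{ name: 'free_money', expression: 'body:free money|earn \\$\\d+', score: 40 }],
  40
);
console.log(result);
```

With the default threshold of 50 the same message would score 40 and stay below the spam cutoff, which is why the billing channel example earlier lowers spam_threshold to 40.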


Hooks

Hooks let you run arbitrary SQL functions at key lifecycle points without modifying the extension.

-- Register a hook
SELECT pgho_inbox.register_hook('after_insert', 'myschema.on_new_message');

-- Remove a hook
SELECT pgho_inbox.unregister_hook('after_insert', 'myschema.on_new_message');

The hook function must accept a single jsonb argument. Any exception raised in a before_insert hook aborts the submission; exceptions in other hooks emit a WARNING and are swallowed so they don't break the main flow.

Available events: before_insert, after_insert, after_close, after_spam.
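
The error-handling rule for hooks can be pictured with a short JavaScript model. The real dispatch happens inside the extension in SQL; `runHooks` and the `registry` shape below are hypothetical and exist only to show which failures propagate and which are swallowed.

```javascript
// Illustrative model only: hooks are plain functions taking one payload object,
// mirroring the single jsonb argument required of the SQL hook function.
function runHooks(event, registry, payload, warn = console.warn) {
  for (const hook of registry[event] ?? []) {
    try {
      hook(payload);
    } catch (err) {
      // A failing before_insert hook aborts the submission.
      if (event === 'before_insert') throw err;
      // Any other hook failure is reported as a warning and ignored.
      warn(`hook for ${event} failed: ${err.message}`);
    }
  }
}
```

In practice this means validation-style logic belongs in before_insert, while side effects that must never block a submission belong in the after_* events.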


Stale notification recovery

If a worker crashes mid-batch, its claimed rows stay in processing. Call this periodically (e.g. via pg_cron) to requeue them:

-- Requeue anything stuck in 'processing' for more than 15 minutes (the default is 30 minutes)
SELECT pgho_inbox.requeue_stale_notifications('15 minutes'::interval);
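
Where pg_cron is not available, the same call can be scheduled from the Node worker itself. This is a sketch under assumptions: `startStaleSweep` is a hypothetical helper, the five-minute cadence is arbitrary, and `pool` is expected to expose a pg-style promise-returning `query(text, values)`.

```javascript
// Periodically requeue notifications stuck in 'processing'.
function startStaleSweep(pool, options = {}) {
  const {
    olderThan = '30 minutes', // passed to the function as an interval
    everyMs = 5 * 60_000,     // how often to run the sweep (assumed cadence)
    onError = console.error,
  } = options;

  const sweep = () =>
    pool
      .query('SELECT pgho_inbox.requeue_stale_notifications($1::interval)', [olderThan])
      .catch(onError);

  const timer = setInterval(sweep, everyMs);
  return { sweep, stop: () => clearInterval(timer) };
}
```

Calling `stop()` during shutdown clears the timer so the process can exit cleanly. The role used by the pool needs EXECUTE on requeue_stale_notifications.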

Security

All access to the pgho_inbox schema is revoked by default — no role other than the database owner can read tables, call functions, or use sequences. The calling application is responsible for granting only the privileges it requires.

Typical grants for a public-facing contact form:

-- Allow anonymous callers to submit messages
GRANT USAGE    ON SCHEMA pgho_inbox TO anon;
GRANT EXECUTE  ON FUNCTION pgho_inbox.submit TO anon;

Typical grants for an internal worker or admin API:

GRANT USAGE ON SCHEMA pgho_inbox TO authenticated;

-- Notification worker
GRANT EXECUTE ON FUNCTION pgho_inbox.claim_notifications           TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_notification_sent        TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_notification_failed      TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.requeue_stale_notifications   TO authenticated;

-- Inbox management
GRANT EXECUTE ON FUNCTION pgho_inbox.open_message    TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.assign_message  TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.close_message   TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.reopen_message  TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.mark_spam       TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.add_note        TO authenticated;
GRANT EXECUTE ON FUNCTION pgho_inbox.add_attachment  TO authenticated;

-- Read access to the data tables your application needs
GRANT SELECT ON TABLE pgho_inbox.messages    TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.events      TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.notes       TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.channels    TO authenticated;
GRANT SELECT ON TABLE pgho_inbox.attachments TO authenticated;

Avoid exposing internal tables (rate_limits, notifications, hooks, spam_rules) unless your application has a specific need.


Node.js Worker Example

The extension fires pg_notify('inbox', ...) on every state change and exposes claim_notifications() for reliable batch pickup with SKIP LOCKED. The pattern below combines both: NOTIFY for instant wake-up, claim_notifications for reliable at-least-once delivery.

// inbox-worker.js
import pg from 'pg';
const { Pool, Client } = pg;

const DB_URL = process.env.DATABASE_URL ?? 'postgresql://postgres:postgres@localhost:54322/postgres';

// ─── notification handlers ─────────────────────────────────────────────────────

const handlers = {
  email: async (notification, client) => {
    console.log(`[email] to=${notification.metadata?.email}  message=${notification.message_id}`);
    // TODO: call your SMTP / transactional email service here
  },

  slack: async (notification, client) => {
    console.log(`[slack] message=${notification.message_id}`);
    // TODO: call Slack Web API here
  },

  webhook: async (notification, client) => {
    console.log(`[webhook] url=${notification.metadata?.url}  message=${notification.message_id}`);
    // TODO: call fetch() with notification.metadata.url
  },
};

// ─── process a claimed batch ───────────────────────────────────────────────────

async function processNotifications(pool) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');

    const { rows } = await client.query(
      `SELECT * FROM pgho_inbox.claim_notifications($1)`,
      [10]
    );

    for (const row of rows) {
      const handler = handlers[row.type];
      if (!handler) {
        console.warn(`[worker] no handler for type="${row.type}", skipping`);
        await client.query(
          `SELECT pgho_inbox.mark_notification_failed($1, $2)`,
          [row.id, `no handler registered for type "${row.type}"`]
        );
        continue;
      }

      try {
        await handler(row, client);
        await client.query(
          `SELECT pgho_inbox.mark_notification_sent($1)`,
          [row.id]
        );
      } catch (err) {
        console.error(`[worker] handler failed for ${row.id}:`, err.message);
        await client.query(
          `SELECT pgho_inbox.mark_notification_failed($1, $2)`,
          [row.id, err.message]
        );
      }
    }

    await client.query('COMMIT');

    if (rows.length > 0) {
      console.log(`[worker] processed ${rows.length} notification(s)`);
    }

    return rows.length;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

// ─── LISTEN / NOTIFY watcher ───────────────────────────────────────────────────

async function startListener(pool) {
  // LISTEN requires a dedicated connection that isn't returned to the pool
  const listener = new Client({ connectionString: DB_URL });
  await listener.connect();

  listener.on('notification', async (msg) => {
    if (msg.channel !== 'inbox') return;

    let payload;
    try {
      payload = JSON.parse(msg.payload);
    } catch {
      console.warn('[listener] bad payload:', msg.payload);
      return;
    }

    console.log(`[listener] inbox event="${payload.event}"  message=${payload.message_id}  spam=${payload.is_spam}`);

    // Every non-spam event triggers a claim pass, including status-change events
    // such as message_opened. claim_notifications() picks up whatever is queued.
    if (!payload.is_spam) {
      try {
        await processNotifications(pool);
      } catch (err) {
        console.error('[worker] processNotifications error:', err.message);
      }
    }
  });

  listener.on('error', (err) => {
    console.error('[listener] connection error:', err.message);
    // Let the process crash and be restarted by your process manager (systemd/k8s/PM2).
    // This avoids silent listener death.
    process.exit(1);
  });

  await listener.query('LISTEN inbox');
  console.log('[listener] LISTEN inbox — ready');
  return listener;
}

// ─── startup sweep ─────────────────────────────────────────────────────────────
// Drain any notifications queued before this worker started (e.g. during restart).

async function drainPending(pool) {
  let total = 0;
  let batch;
  do {
    batch = await processNotifications(pool);
    total += batch;
  } while (batch > 0);
  if (total > 0) console.log(`[startup] drained ${total} pending notification(s)`);
}

// ─── main ──────────────────────────────────────────────────────────────────────

const pool = new Pool({ connectionString: DB_URL, max: 5 });

// Subscribe before draining so that nothing queued between the two steps is missed.
await startListener(pool);
await drainPending(pool);

How it fits together:

| Layer | What it does |
|---|---|
| LISTEN inbox | Wakes the worker instantly when any state change fires pg_notify |
| claim_notifications() | Atomically claims rows using SKIP LOCKED, so multiple workers are safe |
| mark_notification_sent/failed | Moves each row through pending → processing → sent/failed |
| Startup drain | Catches anything queued while the worker was down |

Run it:

npm install pg
DATABASE_URL=postgresql://postgres:postgres@localhost:54322/postgres node inbox-worker.js

Add a handler for each type value your channels produce. The extension handles retry counting via mark_notification_failed (configurable max_attempts), so that logic stays out of the worker.
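
Because the listener calls processNotifications for every inbox event, a burst of events can start several overlapping claim passes. SKIP LOCKED keeps that safe, but each pass holds a pool connection. One optional refinement is to coalesce wake-ups so that only one pass runs at a time and a single follow-up pass covers anything that arrived meanwhile. `createCoalescedRunner` is a hypothetical helper, not part of the extension or of node-postgres.

```javascript
// Wrap an async task so that concurrent triggers collapse into at most one
// running pass plus one queued re-run.
function createCoalescedRunner(task) {
  let running = false;
  let rerun = false;

  return async function trigger() {
    if (running) {
      rerun = true; // remember that more work arrived while a pass was active
      return;
    }
    running = true;
    try {
      do {
        rerun = false;
        await task();
      } while (rerun);
    } finally {
      running = false;
    }
  };
}
```

In the worker above this could be wired up as `const wake = createCoalescedRunner(() => processNotifications(pool));`, with the notification handler calling `wake()` instead of processNotifications directly.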

Install

  1. Install the dbdev CLI
  2. Generate migration:
dbdev add -o ./migrations -s extensions -v 0.0.3 package -n "asghonim@pgho_inbox"
