Skip to content

Commit c9186f9

Browse files
[world-postgres] Add migrations from pg-boss to graphile-worker queue (vercel#1126)
1 parent 1f9a67c commit c9186f9

4 files changed

Lines changed: 95 additions & 10 deletions

File tree

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
-- Migrate pending pg-boss jobs to a staging table before dropping the pgboss schema.
2+
-- The application code will re-enqueue these jobs into graphile-worker on first start.
3+
DO $$
4+
BEGIN
5+
IF EXISTS (SELECT 1 FROM information_schema.schemata WHERE schema_name = 'pgboss') THEN
6+
CREATE TABLE IF NOT EXISTS "workflow"."_pgboss_pending_jobs" (
7+
name text NOT NULL,
8+
data jsonb,
9+
singleton_key text,
10+
retry_limit integer
11+
);
12+
13+
INSERT INTO "workflow"."_pgboss_pending_jobs" (name, data, singleton_key, retry_limit)
14+
SELECT name, data, singleton_key, retry_limit
15+
FROM pgboss.job
16+
WHERE state IN ('created', 'retry');
17+
18+
DROP SCHEMA pgboss CASCADE;
19+
END IF;
20+
END $$;

packages/world-postgres/src/drizzle/migrations/meta/_journal.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@
5757
"when": 1769500000000,
5858
"tag": "0007_add_waits_table",
5959
"breakpoints": true
60+
},
61+
{
62+
"idx": 8,
63+
"version": "7",
64+
"when": 1770000000000,
65+
"tag": "0008_migrate_pgboss_to_graphile",
66+
"breakpoints": true
6067
}
6168
]
6269
}

packages/world-postgres/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export function createWorld(
3434
): World & { start(): Promise<void> } {
3535
const postgres = createPostgres(config.connectionString);
3636
const drizzle = createClient(postgres);
37-
const queue = createQueue(config);
37+
const queue = createQueue(config, postgres);
3838
const storage = createStorage(drizzle);
3939
const streamer = createStreamer(postgres, drizzle);
4040

packages/world-postgres/src/queue.ts

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ import { createLocalWorld } from '@workflow/world-local';
1111
import {
1212
Logger,
1313
makeWorkerUtils,
14-
run,
1514
type Runner,
15+
run,
1616
type WorkerUtils,
1717
} from 'graphile-worker';
18+
import type Postgres from 'postgres';
1819
import { monotonicFactory } from 'ulid';
19-
import { MessageData } from './message.js';
2020
import type { PostgresWorldConfig } from './config.js';
21+
import { MessageData } from './message.js';
2122

2223
// Redirect graphile-worker logs to stderr so CLI --json on stdout stays clean.
2324
// TODO: When CI=1 suppresses logging, replace with conditional stdout (e.g. log to stdout when not in JSON/CI mode).
@@ -43,7 +44,10 @@ export type PostgresQueue = Queue & {
4344
close(): Promise<void>;
4445
};
4546

46-
export function createQueue(config: PostgresWorldConfig): PostgresQueue {
47+
export function createQueue(
48+
config: PostgresWorldConfig,
49+
postgres: Postgres.Sql
50+
): PostgresQueue {
4751
const port = process.env.PORT ? Number(process.env.PORT) : undefined;
4852
const localWorld = createLocalWorld({ dataDir: undefined, port });
4953

@@ -66,15 +70,68 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue {
6670
let runner: Runner | null = null;
6771
let startPromise: Promise<void> | null = null;
6872

73+
async function migratePgBossJobs(utils: WorkerUtils): Promise<void> {
74+
// Scenario A: Drizzle migration already ran — staging table exists
75+
const hasStaging = await postgres`
76+
SELECT EXISTS (
77+
SELECT 1 FROM information_schema.tables
78+
WHERE table_schema = 'workflow'
79+
AND table_name = '_pgboss_pending_jobs'
80+
) AS exists
81+
`;
82+
if (hasStaging[0].exists) {
83+
const jobs = await postgres`
84+
SELECT name, data, singleton_key, retry_limit
85+
FROM "workflow"."_pgboss_pending_jobs"
86+
`;
87+
for (const job of jobs) {
88+
await utils.addJob(job.name, job.data as Record<string, unknown>, {
89+
jobKey: job.singleton_key ?? undefined,
90+
maxAttempts: job.retry_limit ?? 3,
91+
});
92+
}
93+
await postgres`DROP TABLE "workflow"."_pgboss_pending_jobs"`;
94+
return;
95+
}
96+
97+
// Scenario B: Drizzle migration didn't run — pgboss schema still exists
98+
const hasPgBoss = await postgres`
99+
SELECT EXISTS (
100+
SELECT 1 FROM information_schema.schemata
101+
WHERE schema_name = 'pgboss'
102+
) AS exists
103+
`;
104+
if (hasPgBoss[0].exists) {
105+
const jobs = await postgres`
106+
SELECT name, data, singleton_key, retry_limit
107+
FROM pgboss.job
108+
WHERE state IN ('created', 'retry')
109+
`;
110+
for (const job of jobs) {
111+
await utils.addJob(job.name, job.data as Record<string, unknown>, {
112+
jobKey: job.singleton_key ?? undefined,
113+
maxAttempts: job.retry_limit ?? 3,
114+
});
115+
}
116+
await postgres`DROP SCHEMA pgboss CASCADE`;
117+
}
118+
}
119+
69120
async function start(): Promise<void> {
70121
if (!startPromise) {
71122
startPromise = (async () => {
72-
workerUtils = await makeWorkerUtils({
73-
connectionString: config.connectionString,
74-
logger: stderrLogger,
75-
});
76-
await workerUtils.migrate();
77-
await setupListeners();
123+
try {
124+
workerUtils = await makeWorkerUtils({
125+
connectionString: config.connectionString,
126+
logger: stderrLogger,
127+
});
128+
await workerUtils.migrate();
129+
await migratePgBossJobs(workerUtils);
130+
await setupListeners();
131+
} catch (err) {
132+
startPromise = null;
133+
throw err;
134+
}
78135
})();
79136
}
80137
await startPromise;
@@ -155,6 +212,7 @@ export function createQueue(config: PostgresWorldConfig): PostgresQueue {
155212
await workerUtils.release();
156213
workerUtils = null;
157214
}
215+
startPromise = null;
158216
await localWorld.close?.();
159217
},
160218
};

0 commit comments

Comments
 (0)