Skip to content

Commit 3648109

Browse files
[world-postgres] [world-local] Execute Graphile jobs directly instead of defering to world-local queue (vercel#1334)
Signed-off-by: nathancolosimo <nathancolosimo@gmail.com> Co-authored-by: Peter Wielander <mittgfu@gmail.com>
1 parent 5040263 commit 3648109

10 files changed

Lines changed: 722 additions & 123 deletions

File tree

.changeset/cyan-cloths-clap.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/world-local": patch
3+
---
4+
5+
Refactor queue handler to separate queue behavior from route call behavior and export QueueExecutor.

.changeset/fine-lights-go.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/world-postgres": patch
3+
---
4+
5+
Execute Graphile jobs inline and durably reschedule delayed queue work.

packages/world-local/src/index.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@ export {
1717
parseVersion,
1818
} from './init.js';
1919

20-
export type { DirectHandler } from './queue.js';
20+
export {
21+
createQueueExecutor,
22+
type DirectHandler,
23+
type QueueExecutionRequest,
24+
type QueueExecutionResult,
25+
type QueueExecutor,
26+
} from './queue.js';
2127

2228
export type LocalWorld = World & {
2329
/** Register a direct in-process handler for a queue prefix, bypassing HTTP. */

packages/world-local/src/queue.test.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import type { StepInvokePayload } from '@workflow/world';
1+
import { JsonTransport } from '@vercel/queue';
2+
import { MessageId, type StepInvokePayload } from '@workflow/world';
23
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
3-
import { createQueue } from './queue';
4+
import { createQueue, createQueueExecutor } from './queue';
45

56
// Mock node:timers/promises so setTimeout resolves immediately
67
vi.mock('node:timers/promises', () => ({
@@ -95,6 +96,29 @@ describe('queue timeout re-enqueue', () => {
9596
expect(body).toEqual({ timeoutSeconds: 0 });
9697
});
9798

99+
it('executeMessage returns structured timeoutSeconds results', async () => {
100+
const executor = createQueueExecutor({ baseUrl: 'http://localhost:3000' });
101+
const transport = new JsonTransport();
102+
const handler = localQueue.createQueueHandler('__wkf_step_', async () => ({
103+
timeoutSeconds: 5,
104+
}));
105+
106+
executor.registerHandler('__wkf_step_', handler);
107+
108+
try {
109+
const result = await executor.executeMessage({
110+
queueName: '__wkf_step_test' as const,
111+
messageId: MessageId.parse('msg_01ABC'),
112+
attempt: 2,
113+
body: transport.serialize(stepPayload),
114+
});
115+
116+
expect(result).toEqual({ type: 'reschedule', timeoutSeconds: 5 });
117+
} finally {
118+
await executor.close();
119+
}
120+
});
121+
98122
it('queue retries when handler returns timeoutSeconds > 0', async () => {
99123
let callCount = 0;
100124
const handler = localQueue.createQueueHandler('__wkf_step_', async () => {

packages/world-local/src/queue.ts

Lines changed: 155 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,36 @@ const WORKFLOW_LOCAL_QUEUE_CONCURRENCY =
3030

3131
export type DirectHandler = (req: Request) => Promise<Response>;
3232

33+
export interface QueueExecutionRequest {
34+
queueName: ValidQueueName;
35+
messageId: MessageId;
36+
attempt: number;
37+
body: Uint8Array;
38+
headers?: Record<string, string>;
39+
}
40+
41+
export type QueueExecutionResult =
42+
| { type: 'completed' }
43+
| { type: 'reschedule'; timeoutSeconds: number }
44+
| {
45+
type: 'error';
46+
status: number;
47+
text: string;
48+
headers: Record<string, string>;
49+
};
50+
51+
export type QueueExecutor = {
52+
/** Execute a single queue message without enqueueing or sleeping. */
53+
executeMessage(request: QueueExecutionRequest): Promise<QueueExecutionResult>;
54+
/** Close the HTTP agent and release resources. */
55+
close(): Promise<void>;
56+
/** Register a direct in-process handler for a queue prefix, bypassing HTTP. */
57+
registerHandler(
58+
prefix: '__wkf_step_' | '__wkf_workflow_',
59+
handler: DirectHandler
60+
): void;
61+
};
62+
3363
export type LocalQueue = Queue & {
3464
/** Close the HTTP agent and release resources. */
3565
close(): Promise<void>;
@@ -40,7 +70,20 @@ export type LocalQueue = Queue & {
4070
): void;
4171
};
4272

43-
export function createQueue(config: Partial<Config>): LocalQueue {
73+
function getQueueRoute(queueName: ValidQueueName): {
74+
pathname: 'flow' | 'step';
75+
prefix: '__wkf_step_' | '__wkf_workflow_';
76+
} {
77+
if (queueName.startsWith('__wkf_step_')) {
78+
return { pathname: 'step', prefix: '__wkf_step_' };
79+
}
80+
if (queueName.startsWith('__wkf_workflow_')) {
81+
return { pathname: 'flow', prefix: '__wkf_workflow_' };
82+
}
83+
throw new Error('Unknown queue name prefix');
84+
}
85+
86+
export function createQueueExecutor(config: Partial<Config>): QueueExecutor {
4487
// Create a custom agent optimized for high-concurrency local workflows:
4588
// - headersTimeout: 0 allows long-running steps
4689
// - connections: 1000 allows many parallel connections to the same host
@@ -51,6 +94,88 @@ export function createQueue(config: Partial<Config>): LocalQueue {
5194
connections: 1000,
5295
keepAliveTimeout: 30_000,
5396
});
97+
98+
/** Direct in-process handlers by queue prefix, bypassing HTTP when set. */
99+
const directHandlers = new Map<string, DirectHandler>();
100+
101+
const executeMessage: QueueExecutor['executeMessage'] = async ({
102+
queueName,
103+
messageId,
104+
attempt,
105+
body,
106+
headers: extraHeaders,
107+
}) => {
108+
const { pathname, prefix } = getQueueRoute(queueName);
109+
const headers: Record<string, string> = {
110+
...extraHeaders,
111+
'content-type': 'application/json',
112+
'x-vqs-queue-name': queueName,
113+
'x-vqs-message-id': messageId,
114+
'x-vqs-message-attempt': String(attempt),
115+
};
116+
117+
const directHandler = directHandlers.get(prefix);
118+
let response: Response;
119+
if (directHandler) {
120+
// Wrap direct handlers in a Request so local execution and HTTP execution
121+
// share the same contract and response parsing.
122+
const req = new Request(
123+
'http://localhost/.well-known/workflow/v1/' + pathname,
124+
{
125+
method: 'POST',
126+
headers,
127+
body,
128+
}
129+
);
130+
response = await directHandler(req);
131+
} else {
132+
const baseUrl = await resolveBaseUrl(config);
133+
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit
134+
response = await fetch(`${baseUrl}/.well-known/workflow/v1/${pathname}`, {
135+
method: 'POST',
136+
duplex: 'half',
137+
dispatcher: httpAgent,
138+
headers,
139+
body,
140+
} as any);
141+
}
142+
143+
const text = await response.text();
144+
if (!response.ok) {
145+
return {
146+
type: 'error',
147+
status: response.status,
148+
text,
149+
headers: Object.fromEntries(response.headers.entries()),
150+
};
151+
}
152+
153+
try {
154+
const timeoutSeconds = Number(JSON.parse(text).timeoutSeconds);
155+
if (Number.isFinite(timeoutSeconds) && timeoutSeconds >= 0) {
156+
return { type: 'reschedule', timeoutSeconds };
157+
}
158+
} catch {}
159+
160+
return { type: 'completed' };
161+
};
162+
163+
return {
164+
executeMessage,
165+
registerHandler(
166+
prefix: '__wkf_step_' | '__wkf_workflow_',
167+
handler: DirectHandler
168+
) {
169+
directHandlers.set(prefix, handler);
170+
},
171+
async close() {
172+
await httpAgent.close();
173+
},
174+
};
175+
}
176+
177+
export function createQueue(config: Partial<Config>): LocalQueue {
178+
const executor = createQueueExecutor(config);
54179
const transport = new JsonTransport();
55180
const generateId = monotonicFactory();
56181
const semaphore = new Sema(WORKFLOW_LOCAL_QUEUE_CONCURRENCY);
@@ -61,9 +186,6 @@ export function createQueue(config: Partial<Config>): LocalQueue {
61186
*/
62187
const inflightMessages = new Map<string, MessageId>();
63188

64-
/** Direct in-process handlers by queue prefix, bypassing HTTP when set. */
65-
const directHandlers = new Map<string, DirectHandler>();
66-
67189
const queue: Queue['queue'] = async (queueName, message, opts) => {
68190
const cleanup = [] as (() => void)[];
69191

@@ -75,17 +197,7 @@ export function createQueue(config: Partial<Config>): LocalQueue {
75197
}
76198

77199
const body = transport.serialize(message);
78-
let pathname: string;
79-
let prefix: '__wkf_step_' | '__wkf_workflow_';
80-
if (queueName.startsWith('__wkf_step_')) {
81-
pathname = `step`;
82-
prefix = '__wkf_step_';
83-
} else if (queueName.startsWith('__wkf_workflow_')) {
84-
pathname = `flow`;
85-
prefix = '__wkf_workflow_';
86-
} else {
87-
throw new Error('Unknown queue name prefix');
88-
}
200+
getQueueRoute(queueName);
89201
const messageId = MessageId.parse(`msg_${generateId()}`);
90202

91203
if (opts?.idempotencyKey) {
@@ -106,73 +218,41 @@ export function createQueue(config: Partial<Config>): LocalQueue {
106218
}
107219
try {
108220
let defaultRetriesLeft = 3;
109-
const directHandler = directHandlers.get(prefix);
110221
for (let attempt = 0; defaultRetriesLeft > 0; attempt++) {
111222
defaultRetriesLeft--;
112223

113-
let response: Response;
114-
const headers: Record<string, string> = {
115-
...opts?.headers,
116-
'content-type': 'application/json',
117-
'x-vqs-queue-name': queueName,
118-
'x-vqs-message-id': messageId,
119-
'x-vqs-message-attempt': String(attempt + 1),
120-
};
121-
122-
if (directHandler) {
123-
// Call handler directly in-process, bypassing HTTP
124-
const req = new Request(
125-
'http://localhost/.well-known/workflow/v1/' + pathname,
126-
{
127-
method: 'POST',
128-
headers,
129-
body,
130-
}
131-
);
132-
response = await directHandler(req);
133-
} else {
134-
const baseUrl = await resolveBaseUrl(config);
135-
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- undici v7 dispatcher types don't match @types/node's RequestInit
136-
response = await fetch(
137-
`${baseUrl}/.well-known/workflow/v1/${pathname}`,
138-
{
139-
method: 'POST',
140-
duplex: 'half',
141-
dispatcher: httpAgent,
142-
headers,
143-
body,
144-
} as any
145-
);
146-
}
224+
const result = await executor.executeMessage({
225+
queueName,
226+
messageId,
227+
attempt: attempt + 1,
228+
body,
229+
headers: opts?.headers,
230+
});
147231

148-
const text = await response.text();
149-
150-
if (response.ok) {
151-
try {
152-
const timeoutSeconds = Number(JSON.parse(text).timeoutSeconds);
153-
if (Number.isFinite(timeoutSeconds) && timeoutSeconds >= 0) {
154-
// Clamp to MAX_SAFE_TIMEOUT_MS to avoid Node.js setTimeout overflow warning.
155-
// When this fires early, the handler recalculates remaining time from
156-
// persistent state and returns another timeoutSeconds if needed.
157-
if (timeoutSeconds > 0) {
158-
const timeoutMs = Math.min(
159-
timeoutSeconds * 1000,
160-
MAX_SAFE_TIMEOUT_MS
161-
);
162-
await setTimeout(timeoutMs);
163-
}
164-
defaultRetriesLeft++;
165-
continue;
166-
}
167-
} catch {}
232+
if (result.type === 'completed') {
168233
return;
169234
}
170235

236+
if (result.type === 'reschedule') {
237+
// Clamp to MAX_SAFE_TIMEOUT_MS to avoid Node.js setTimeout overflow warning.
238+
// When this fires early, the handler recalculates remaining time from
239+
// persistent state and returns another timeoutSeconds if needed.
240+
if (result.timeoutSeconds > 0) {
241+
const timeoutMs = Math.min(
242+
result.timeoutSeconds * 1000,
243+
MAX_SAFE_TIMEOUT_MS
244+
);
245+
await setTimeout(timeoutMs);
246+
}
247+
defaultRetriesLeft++;
248+
continue;
249+
}
250+
171251
console.error(`[local world] Failed to queue message`, {
172252
queueName,
173-
text,
174-
status: response.status,
175-
headers: Object.fromEntries(response.headers.entries()),
253+
text: result.text,
254+
status: result.status,
255+
headers: result.headers,
176256
body: body.toString(),
177257
});
178258
}
@@ -263,14 +343,9 @@ export function createQueue(config: Partial<Config>): LocalQueue {
263343
queue,
264344
createQueueHandler,
265345
getDeploymentId,
266-
registerHandler(
267-
prefix: '__wkf_step_' | '__wkf_workflow_',
268-
handler: DirectHandler
269-
) {
270-
directHandlers.set(prefix, handler);
271-
},
346+
registerHandler: executor.registerHandler,
272347
async close() {
273-
await httpAgent.close();
348+
await executor.close();
274349
},
275350
};
276351
}

packages/world-postgres/HOW_IT_WORKS.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ If you want to use any other ORM, query builder or underlying database client, y
1212
graph LR
1313
Client --> PG[graphile-worker queue]
1414
PG --> Worker[Embedded Worker]
15-
Worker --> HTTP[HTTP fetch]
16-
HTTP --> EW[Local World]
15+
Worker --> Exec[Queue Executor]
16+
Exec --> Handler[Workflow or Step Handler]
1717
1818
PG -.-> F["${prefix}flows<br/>(workflows)"]
1919
PG -.-> S["${prefix}steps<br/>(steps)"]
2020
```
2121

22-
Jobs include retry logic (3 attempts), idempotency keys, and configurable worker concurrency (default: 10).
22+
Jobs include retry logic (3 attempts), idempotency keys, durable delayed rescheduling, and configurable worker concurrency (default: 10).
2323

2424
## Streaming
2525

@@ -33,7 +33,12 @@ Real-time data streaming via **PostgreSQL LISTEN/NOTIFY**:
3333

3434
## Setup
3535

36-
Call `world.start()` to initialize graphile-worker workers. When `.start()` is called, workers begin listening to graphile-worker queues. When a job arrives, workers make HTTP fetch calls to the local world endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) to execute the actual workflow logic.
36+
Call `world.start()` to initialize graphile-worker workers. When `.start()` is called, workers begin listening to graphile-worker queues. When a job arrives, the worker executes the queue message directly through the shared queue executor and awaits completion before acknowledging the Graphile job.
37+
38+
When the runtime returns `{ timeoutSeconds }`, the worker schedules a new Graphile job with a future `runAt` time before finishing the current task.
39+
40+
The executor still falls back to the HTTP-compatible workflow endpoints (`.well-known/workflow/v1/flow` or `.well-known/workflow/v1/step`) when the route module has not been loaded yet.
41+
3742

3843
In **Next.js**, the `world.start()` call needs to be added to `instrumentation.ts|js` to ensure workers start before request handling. Use `workflow/runtime` for `getWorld` (same as the testing server and other framework plugins):
3944

0 commit comments

Comments
 (0)