Skip to content

Commit 9feebee

Browse files
[world-local] Enforce hook token uniqueness and atomicity, matches other worlds (vercel#1348)
1 parent 58e67ce commit 9feebee

6 files changed

Lines changed: 101 additions & 15 deletions

File tree

.changeset/fruity-shirts-shave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/world-local": patch
3+
---
4+
5+
Ensure atomicity for hook token, matches world-postgres and world-vercel

packages/world-local/src/fs.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,27 @@ export async function deleteJSON(filePath: string): Promise<void> {
187187
}
188188
}
189189

190+
/**
191+
* Atomically create a file using O_CREAT | O_EXCL flags.
192+
* Returns true if the file was created, false if it already exists.
193+
* This is atomic at the OS level, safe for concurrent access.
194+
*/
195+
export async function writeExclusive(
196+
filePath: string,
197+
data: string
198+
): Promise<boolean> {
199+
await ensureDir(path.dirname(filePath));
200+
try {
201+
await fs.writeFile(filePath, data, { flag: 'wx' });
202+
return true;
203+
} catch (error: any) {
204+
if (error.code === 'EEXIST') {
205+
return false;
206+
}
207+
throw error;
208+
}
209+
}
210+
190211
export async function listJSONFiles(dirPath: string): Promise<string[]> {
191212
return listFilesByExtension(dirPath, '.json');
192213
}

packages/world-local/src/storage.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1300,6 +1300,34 @@ describe('Storage', () => {
13001300
expect((result.event as any).eventData.token).toBe(token);
13011301
expect(result.hook).toBeUndefined();
13021302
});
1303+
1304+
it('should reject concurrent creates for the same token atomically', async () => {
1305+
const token = 'concurrent-token';
1306+
1307+
// Fire 5 concurrent hook creations with the same token
1308+
const results = await Promise.allSettled(
1309+
Array.from({ length: 5 }, (_, i) =>
1310+
storage.events.create(testRunId, {
1311+
eventType: 'hook_created',
1312+
correlationId: `concurrent_hook_${i}`,
1313+
eventData: { token },
1314+
})
1315+
)
1316+
);
1317+
1318+
const fulfilled = results.filter(
1319+
(r) => r.status === 'fulfilled'
1320+
) as PromiseFulfilledResult<any>[];
1321+
const created = fulfilled.filter(
1322+
(r) => r.value.event.eventType === 'hook_created'
1323+
);
1324+
const conflicts = fulfilled.filter(
1325+
(r) => r.value.event.eventType === 'hook_conflict'
1326+
);
1327+
1328+
expect(created).toHaveLength(1);
1329+
expect(conflicts).toHaveLength(4);
1330+
});
13031331
});
13041332

13051333
describe('get', () => {

packages/world-local/src/storage/events-storage.ts

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ import {
2727
listJSONFiles,
2828
paginatedFileSystemQuery,
2929
readJSON,
30+
writeExclusive,
3031
writeJSON,
3132
} from '../fs.js';
3233
import { filterEventData } from './filters.js';
33-
import { getObjectCreatedAt, monotonicUlid } from './helpers.js';
34+
import { getObjectCreatedAt, hashToken, monotonicUlid } from './helpers.js';
3435
import { deleteAllHooksForRun } from './hooks-storage.js';
3536
import { handleLegacyEvent } from './legacy.js';
3637

@@ -577,20 +578,24 @@ export function createEventsStorage(basedir: string): Storage['events'] {
577578
isWebhook?: boolean;
578579
};
579580

580-
// Check for duplicate token before creating hook
581-
const hooksDir = path.join(basedir, 'hooks');
582-
const hookFiles = await listJSONFiles(hooksDir);
583-
let hasConflict = false;
584-
for (const file of hookFiles) {
585-
const existingHookPath = path.join(hooksDir, `${file}.json`);
586-
const existingHook = await readJSON(existingHookPath, HookSchema);
587-
if (existingHook && existingHook.token === hookData.token) {
588-
hasConflict = true;
589-
break;
590-
}
591-
}
581+
// Atomically claim the token using an exclusive-create constraint file.
582+
// This avoids the TOCTOU race of the previous read-all-then-check approach.
583+
const constraintPath = path.join(
584+
basedir,
585+
'hooks',
586+
'tokens',
587+
`${hashToken(hookData.token)}.json`
588+
);
589+
const tokenClaimed = await writeExclusive(
590+
constraintPath,
591+
JSON.stringify({
592+
token: hookData.token,
593+
hookId: data.correlationId,
594+
runId: effectiveRunId,
595+
})
596+
);
592597

593-
if (hasConflict) {
598+
if (!tokenClaimed) {
594599
// Create hook_conflict event instead of hook_created
595600
// This allows the workflow to continue and fail gracefully when the hook is awaited
596601
const conflictEvent: Event = {
@@ -647,12 +652,23 @@ export function createEventsStorage(basedir: string): Storage['events'] {
647652
);
648653
await writeJSON(hookPath, hook);
649654
} else if (data.eventType === 'hook_disposed') {
650-
// Delete the hook when disposed
655+
// Read the hook to get its token before deleting
651656
const hookPath = path.join(
652657
basedir,
653658
'hooks',
654659
`${data.correlationId}.json`
655660
);
661+
const existingHook = await readJSON(hookPath, HookSchema);
662+
if (existingHook) {
663+
// Delete the token constraint file to free up the token for reuse
664+
const disposedConstraintPath = path.join(
665+
basedir,
666+
'hooks',
667+
'tokens',
668+
`${hashToken(existingHook.token)}.json`
669+
);
670+
await deleteJSON(disposedConstraintPath);
671+
}
656672
await deleteJSON(hookPath);
657673
} else if (data.eventType === 'wait_created' && 'eventData' in data) {
658674
// wait_created: Creates wait entity with status 'waiting'

packages/world-local/src/storage/helpers.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1+
import { createHash } from 'node:crypto';
12
import { monotonicFactory } from 'ulid';
23
import { ulidToDate } from '../fs.js';
34

5+
/**
6+
* Hash a hook token to produce a filesystem-safe constraint filename.
7+
*/
8+
export function hashToken(token: string): string {
9+
return createHash('sha256').update(token).digest('hex');
10+
}
11+
412
/**
513
* Create a monotonic ULID factory that ensures ULIDs are always increasing
614
* even when generated within the same millisecond.

packages/world-local/src/storage/hooks-storage.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
readJSON,
1717
} from '../fs.js';
1818
import { filterHookData } from './filters.js';
19+
import { hashToken } from './helpers.js';
1920

2021
/**
2122
* Creates a hooks storage implementation using the filesystem.
@@ -113,6 +114,13 @@ export async function deleteAllHooksForRun(
113114
const hookPath = path.join(hooksDir, `${file}.json`);
114115
const hook = await readJSON(hookPath, HookSchema);
115116
if (hook && hook.runId === runId) {
117+
// Delete the token constraint file to free up the token
118+
const constraintPath = path.join(
119+
hooksDir,
120+
'tokens',
121+
`${hashToken(hook.token)}.json`
122+
);
123+
await deleteJSON(constraintPath);
116124
await deleteJSON(hookPath);
117125
}
118126
}

0 commit comments

Comments
 (0)