feat(plugin-sql): Electric Cloud sync integration with write-back service#8336
Conversation
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
…vice - Add Electric Sync support via pglite-sync extension with per-agent WHERE filtering (literal SQL values for Electric Cloud compatibility) - Add WriteBackService for bidirectional sync (PGlite writes → Cloud API) - Add UUID validation for safe literal SQL interpolation in where clauses - Add Caddy reverse proxy config for Electric Cloud local dev - Add e2e write-back test against Electric Cloud (env-var gated) - Add deferred sync startup tests (init→migrate→sync phase ordering) - Add live query and latency benchmark tests - Add Docker Compose config for local Electric+Postgres test stack - Add .env.example documenting all Electric Cloud and sync env vars - Update README with Electric Sync section and expanded env var table - Update .gitignore to track .env.example while ignoring .env
16eebd4 to
57cdcc9
Compare
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
…ariable' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
…tion or class' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
|
Claude encountered an error —— View job I'll analyze this and get back to you. |
There was a problem hiding this comment.
Actionable comments posted: 8
🧹 Nitpick comments (3)
plugins/plugin-sql/src/__tests__/unit/write-back.test.ts (1)
77-108: ⚡ Quick winAdd assertion to validate payload shape excludes internal fields.
The test validates presence of
table,operation, androwfields but doesn't assert that internal fields likeretriesare absent from the payload. This leaves a gap given the payload-shape issue flagged inwrite-back/index.ts.✅ Add payload shape assertion
const body = JSON.parse(fetchSpy.mock.calls[0]?.[1]?.body as string); expect(body.writes).toHaveLength(2); expect(body.writes[0].table).toBe("memories"); expect(body.writes[0].operation).toBe("insert"); expect(body.writes[0].row.id).toBe("m-1"); +expect(body.writes[0].retries).toBeUndefined(); // Internal state shouldn't leak expect(body.writes[1].table).toBe("rooms"); expect(body.writes[1].row.name).toBe("test-room");🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/plugin-sql/src/__tests__/unit/write-back.test.ts` around lines 77 - 108, The test should assert that internal fields like retries are not sent in the payload: after parsing body in the "sends batched writes to the cloud API" test (working with the WriteBackService instance that calls enqueue and flush), add assertions that for each entry in body.writes there is no retries property (e.g., expect(body.writes[i]).not.toHaveProperty('retries')) to ensure the flush/enqueue serialization in write-back/index.ts strips internal fields.plugins/plugin-sql/src/__tests__/integration/electric-sync-end-to-end.test.ts (2)
268-269: 💤 Low valueDouble cast
as unknown as DrizzleDatabaserepeated three times.The
drizzle(client)return type doesn't matchDrizzleDatabase. This pattern works but bypasses type checking. Consider adding a typed wrapper or updatingDrizzleDatabaseto be compatible with PGlite's drizzle instance.Also applies to: 364-365, 453-454
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-end-to-end.test.ts` around lines 268 - 269, The repeated double-cast pattern "drizzle(client) as unknown as DrizzleDatabase" bypasses type checking; replace it by either creating a small typed wrapper function (e.g., createDrizzleClient(client): DrizzleDatabase) that imports the proper PGlite drizzle return type and returns a correctly-typed instance, or update the DrizzleDatabase type/alias to be compatible with the PGlite drizzle instance; then replace the three occurrences (the lines using drizzle(client) as unknown as DrizzleDatabase) to call the new wrapper or rely on the updated DrizzleDatabase type so no double-cast is needed.
282-294: ⚡ Quick winPolling logic duplicated three times.
Extract a helper to reduce repetition:
async function pollUntilSynced(manager: PGliteClientManager, timeoutMs = 15_000): Promise<void> { const deadline = Date.now() + timeoutMs; while (Date.now() < deadline) { const { status, error } = manager.getSyncStatus(); if (status === "synced") return; if (status === "error") throw new Error(`Sync errored: ${error}`); await new Promise((r) => setTimeout(r, 500)); } throw new Error("Sync did not complete within timeout"); }Also applies to: 376-390, 466-477
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-end-to-end.test.ts` around lines 282 - 294, Duplicate polling blocks in the test should be extracted into a reusable helper: implement an async pollUntilSynced(manager: PGliteClientManager, timeoutMs = 15_000) that repeatedly calls manager.getSyncStatus(), returns when status === "synced", throws when status === "error" with the status.error message, and after the deadline throws a timeout error; replace the three duplicated while loops (the blocks that call manager.getSyncStatus() and await setTimeout(500)) with calls to pollUntilSynced(...) using the default or explicit timeout as needed to keep test semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@plugins/plugin-sql/.env.example`:
- Around line 25-29: Update the AGENT_ID .env example comment to state it is
required both for per-agent sync (when ELIZA_ELECTRIC_SYNC_URL is set) and as a
prerequisite for enabling write-back (alongside ELIZA_CLOUD_WRITE_BASE_URL and
ELIZA_CLOUD_SERVICE_KEY); mention that without AGENT_ID write-back will remain
disabled even if those write-back variables are present. Refer to the existing
ENV symbols AGENT_ID, ELIZA_ELECTRIC_SYNC_URL, ELIZA_CLOUD_WRITE_BASE_URL, and
ELIZA_CLOUD_SERVICE_KEY when editing the comment so readers know the exact
variables involved.
In `@plugins/plugin-sql/README.md`:
- Around line 64-66: Update the README table entry for AGENT_ID to state that
AGENT_ID is required not only when ELIZA_ELECTRIC_SYNC_URL is set (per-agent
WHERE filter) but also when enabling cloud write-back; specifically, clarify
that AGENT_ID is required if ELIZA_ELECTRIC_SYNC_URL is present OR if both
ELIZA_CLOUD_WRITE_BASE_URL and ELIZA_CLOUD_SERVICE_KEY are set, and adjust the
description text to mention both use-cases (sync filtering and write-back
enablement) using the exact symbols AGENT_ID, ELIZA_ELECTRIC_SYNC_URL,
ELIZA_CLOUD_WRITE_BASE_URL, and ELIZA_CLOUD_SERVICE_KEY so readers can easily
map configuration to behavior.
In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-deferred.test.ts`:
- Around line 263-271: The test registers the same temp directory twice in the
cleanups array (cleanups.push({ dir }) and later cleanups.push({ dir, manager
})), which can cause the directory to be removed before the PGliteClientManager
is closed; remove the earlier duplicate cleanups.push({ dir }) so only the entry
that includes the PGliteClientManager instance (the one that pushes { dir,
manager } after new PGliteClientManager(...)) remains, ensuring
manager.close/cleanup runs before the directory is deleted; locate usages around
createTempDir, the cleanups array, and the PGliteClientManager instantiation to
make this change.
- Around line 315-329: The test is pushing duplicate cleanup entries for the
same temp dir: first cleanups.push({ dir }) then later cleanups.push({ dir,
manager }), causing redundant cleanup records; remove the initial
cleanups.push({ dir }) so only the later cleanups.push({ dir, manager }) (which
includes both the directory and the PGliteClientManager instance) remains; look
for the temp dir created by createTempDir(...) and the manager variable created
with new PGliteClientManager(...) to locate the two pushes to the cleanups array
and delete the first one.
- Around line 290-308: The test "ensureSync() before initialize() is a no-op"
registers only { dir } into the cleanups array and never closes the
PGliteClientManager, leaking async resources; update the cleanup registration to
include the manager (e.g., push { dir, manager }) or ensure manager is closed in
the test teardown so that PGliteClientManager.close() is invoked; locate the
test function and the cleanups array usage and add the manager reference (the
PGliteClientManager instance named manager) to the cleanup entry so the manager
is properly closed.
In `@plugins/plugin-sql/src/pglite/adapter.ts`:
- Around line 286-292: The removeParticipant method currently calls
this.manager.notifyWrite("participants","delete",{ entityId, roomId }); but the
DB uses snake_case columns—match addParticipant's shape by sending snake_case
keys: change the payload to { user_id: entityId, room_id: roomId } (and include
any other DB key used by addParticipant such as id/agent_id if required by your
write-back semantics) when calling notifyWrite in removeParticipant so the
write-back API receives column names, not camelCase variables; update the
notifyWrite invocation inside removeParticipant accordingly.
In `@plugins/plugin-sql/src/write-back/index.ts`:
- Around line 46-49: buildWriteUrl currently interpolates agentId into the path
raw; ensure agentId is safe by either validating it against the expected UUID
format (in buildWriteUrl) and throwing a clear error if it fails, or by
URL-encoding the segment before interpolation (e.g., use encodeURIComponent on
agentId) so characters like "/", "?", "#" are escaped; update the function
(buildWriteUrl) to perform validation/encoding and return a well-formed URL or
raise on invalid input.
- Around line 159-197: sendBatch is currently JSON-stringifying the PendingWrite
objects (including the internal retries counter) when sending to the cloud API;
change the payload to map batch to a public shape that omits retries (e.g., map
each PendingWrite in the batch to an object without the retries property) before
calling JSON.stringify, so the POST body contains { writes: mappedBatch }
instead of the raw batch; keep the existing error handling and calls to
requeueOrDrop, and reference sendBatch, PendingWrite, writeUrl and serviceKey
when locating the change.
---
Nitpick comments:
In
`@plugins/plugin-sql/src/__tests__/integration/electric-sync-end-to-end.test.ts`:
- Around line 268-269: The repeated double-cast pattern "drizzle(client) as
unknown as DrizzleDatabase" bypasses type checking; replace it by either
creating a small typed wrapper function (e.g., createDrizzleClient(client):
DrizzleDatabase) that imports the proper PGlite drizzle return type and returns
a correctly-typed instance, or update the DrizzleDatabase type/alias to be
compatible with the PGlite drizzle instance; then replace the three occurrences
(the lines using drizzle(client) as unknown as DrizzleDatabase) to call the new
wrapper or rely on the updated DrizzleDatabase type so no double-cast is needed.
- Around line 282-294: Duplicate polling blocks in the test should be extracted
into a reusable helper: implement an async pollUntilSynced(manager:
PGliteClientManager, timeoutMs = 15_000) that repeatedly calls
manager.getSyncStatus(), returns when status === "synced", throws when status
=== "error" with the status.error message, and after the deadline throws a
timeout error; replace the three duplicated while loops (the blocks that call
manager.getSyncStatus() and await setTimeout(500)) with calls to
pollUntilSynced(...) using the default or explicit timeout as needed to keep
test semantics.
In `@plugins/plugin-sql/src/__tests__/unit/write-back.test.ts`:
- Around line 77-108: The test should assert that internal fields like retries
are not sent in the payload: after parsing body in the "sends batched writes to
the cloud API" test (working with the WriteBackService instance that calls
enqueue and flush), add assertions that for each entry in body.writes there is
no retries property (e.g., expect(body.writes[i]).not.toHaveProperty('retries'))
to ensure the flush/enqueue serialization in write-back/index.ts strips internal
fields.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8bf10445-d2e8-4786-9c16-48df45644332
⛔ Files ignored due to path filters (1)
bun.lockis excluded by!**/*.lock
📒 Files selected for processing (18)
plugins/plugin-sql/.env.exampleplugins/plugin-sql/.gitignoreplugins/plugin-sql/README.mdplugins/plugin-sql/caddy/electric-proxy.Caddyfileplugins/plugin-sql/docker-compose.electric-test.ymlplugins/plugin-sql/package.jsonplugins/plugin-sql/src/__tests__/integration/electric-sync-deferred.test.tsplugins/plugin-sql/src/__tests__/integration/electric-sync-end-to-end.test.tsplugins/plugin-sql/src/__tests__/integration/electric-sync-live-query.test.tsplugins/plugin-sql/src/__tests__/integration/electric-write-back.test.tsplugins/plugin-sql/src/__tests__/integration/live-query-latency.test.tsplugins/plugin-sql/src/__tests__/unit/write-back.test.tsplugins/plugin-sql/src/base.tsplugins/plugin-sql/src/index.node.tsplugins/plugin-sql/src/index.tsplugins/plugin-sql/src/pglite/adapter.tsplugins/plugin-sql/src/pglite/manager.tsplugins/plugin-sql/src/write-back/index.ts
| # Agent ID for per-agent sync filtering (required when | ||
| # ELIZA_ELECTRIC_SYNC_URL is set). Each PGlite instance | ||
| # syncs only rows matching its agent_id WHERE clause. | ||
| AGENT_ID= | ||
|
|
There was a problem hiding this comment.
Clarify AGENT_ID as a write-back prerequisite too.
Current wording says AGENT_ID is required only when ELIZA_ELECTRIC_SYNC_URL is set, but write-back enablement also depends on AGENT_ID alongside ELIZA_CLOUD_WRITE_BASE_URL and ELIZA_CLOUD_SERVICE_KEY. Without it, write-back stays disabled silently.
Also applies to: 35-38
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/.env.example` around lines 25 - 29, Update the AGENT_ID
.env example comment to state it is required both for per-agent sync (when
ELIZA_ELECTRIC_SYNC_URL is set) and as a prerequisite for enabling write-back
(alongside ELIZA_CLOUD_WRITE_BASE_URL and ELIZA_CLOUD_SERVICE_KEY); mention that
without AGENT_ID write-back will remain disabled even if those write-back
variables are present. Refer to the existing ENV symbols AGENT_ID,
ELIZA_ELECTRIC_SYNC_URL, ELIZA_CLOUD_WRITE_BASE_URL, and ELIZA_CLOUD_SERVICE_KEY
when editing the comment so readers know the exact variables involved.
| | `AGENT_ID` | Conditional | — | UUID of the agent. Required when `ELIZA_ELECTRIC_SYNC_URL` is set (per-agent WHERE filter). | | ||
| | `ELIZA_CLOUD_WRITE_BASE_URL` | No | — | Write-back cloud endpoint for forwarding local PGlite writes to Postgres. | | ||
| | `ELIZA_CLOUD_SERVICE_KEY` | No | — | Service key for authenticating write-back requests. | |
There was a problem hiding this comment.
Document full write-back enablement contract for AGENT_ID.
The table currently scopes AGENT_ID to sync filtering only, but write-back also requires AGENT_ID (with ELIZA_CLOUD_WRITE_BASE_URL + ELIZA_CLOUD_SERVICE_KEY) to become enabled.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/README.md` around lines 64 - 66, Update the README table
entry for AGENT_ID to state that AGENT_ID is required not only when
ELIZA_ELECTRIC_SYNC_URL is set (per-agent WHERE filter) but also when enabling
cloud write-back; specifically, clarify that AGENT_ID is required if
ELIZA_ELECTRIC_SYNC_URL is present OR if both ELIZA_CLOUD_WRITE_BASE_URL and
ELIZA_CLOUD_SERVICE_KEY are set, and adjust the description text to mention both
use-cases (sync filtering and write-back enablement) using the exact symbols
AGENT_ID, ELIZA_ELECTRIC_SYNC_URL, ELIZA_CLOUD_WRITE_BASE_URL, and
ELIZA_CLOUD_SERVICE_KEY so readers can easily map configuration to behavior.
| const dir = createTempDir("eliza-sync-idem-"); | ||
| cleanups.push({ dir }); | ||
|
|
||
| const manager = new PGliteClientManager({ | ||
| dataDir: dir, | ||
| syncUrl: BOGUS_SYNC_URL, | ||
| agentId: v4(), | ||
| }); | ||
| cleanups.push({ dir, manager }); |
There was a problem hiding this comment.
Duplicate cleanup entry for same temp dir.
Line 264 pushes { dir } and line 271 pushes { dir, manager } — same dir registered twice. The afterEach loop will attempt fs.rmSync twice on the same path. Second attempt is harmless due to force: true, but the first entry lacks manager, so it won't close PGlite before deletion.
Remove line 264 since line 271 covers both cleanup needs.
Proposed fix
it("calling ensureSync() multiple times is safe (no double-start)", async () => {
const dir = createTempDir("eliza-sync-idem-");
- cleanups.push({ dir });
const manager = new PGliteClientManager({
dataDir: dir,
syncUrl: BOGUS_SYNC_URL,
agentId: v4(),
});
cleanups.push({ dir, manager });📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const dir = createTempDir("eliza-sync-idem-"); | |
| cleanups.push({ dir }); | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId: v4(), | |
| }); | |
| cleanups.push({ dir, manager }); | |
| const dir = createTempDir("eliza-sync-idem-"); | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId: v4(), | |
| }); | |
| cleanups.push({ dir, manager }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-deferred.test.ts`
around lines 263 - 271, The test registers the same temp directory twice in the
cleanups array (cleanups.push({ dir }) and later cleanups.push({ dir, manager
})), which can cause the directory to be removed before the PGliteClientManager
is closed; remove the earlier duplicate cleanups.push({ dir }) so only the entry
that includes the PGliteClientManager instance (the one that pushes { dir,
manager } after new PGliteClientManager(...)) remains, ensuring
manager.close/cleanup runs before the directory is deleted; locate usages around
createTempDir, the cleanups array, and the PGliteClientManager instantiation to
make this change.
| it("ensureSync() before initialize() is a no-op", async () => { | ||
| const dir = createTempDir("eliza-sync-preinit-"); | ||
| cleanups.push({ dir }); | ||
|
|
||
| const manager = new PGliteClientManager({ | ||
| dataDir: dir, | ||
| syncUrl: BOGUS_SYNC_URL, | ||
| agentId: v4(), | ||
| }); | ||
| // Don't push manager to cleanups — it was never initialized, so | ||
| // calling close() would try to close a PGlite that hasn't been | ||
| // fully started, which can hang. Just clean up the temp dir. | ||
|
|
||
| // ensureSync() checks this.initialized → false, returns early. | ||
| await manager.ensureSync(); | ||
|
|
||
| const status = manager.getSyncStatus(); | ||
| expect(status.status).toBe("disabled"); | ||
| }); |
There was a problem hiding this comment.
Same duplicate pattern: line 292 pushes { dir } without manager.
Like the previous test, this registers cleanup without the manager reference, then test exits without tracking manager in cleanups. The manager is never closed, which can leak async resources.
Proposed fix
it("ensureSync() before initialize() is a no-op", async () => {
const dir = createTempDir("eliza-sync-preinit-");
- cleanups.push({ dir });
const manager = new PGliteClientManager({
dataDir: dir,
syncUrl: BOGUS_SYNC_URL,
agentId: v4(),
});
- // Don't push manager to cleanups — it was never initialized, so
- // calling close() would try to close a PGlite that hasn't been
- // fully started, which can hang. Just clean up the temp dir.
+ // Manager was never initialized, but we still track it for cleanup.
+ // close() on an uninitialized manager should be a safe no-op.
+ cleanups.push({ dir, manager });
// ensureSync() checks this.initialized → false, returns early.
await manager.ensureSync();📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| it("ensureSync() before initialize() is a no-op", async () => { | |
| const dir = createTempDir("eliza-sync-preinit-"); | |
| cleanups.push({ dir }); | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId: v4(), | |
| }); | |
| // Don't push manager to cleanups — it was never initialized, so | |
| // calling close() would try to close a PGlite that hasn't been | |
| // fully started, which can hang. Just clean up the temp dir. | |
| // ensureSync() checks this.initialized → false, returns early. | |
| await manager.ensureSync(); | |
| const status = manager.getSyncStatus(); | |
| expect(status.status).toBe("disabled"); | |
| }); | |
| it("ensureSync() before initialize() is a no-op", async () => { | |
| const dir = createTempDir("eliza-sync-preinit-"); | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId: v4(), | |
| }); | |
| // Manager was never initialized, but we still track it for cleanup. | |
| // close() on an uninitialized manager should be a safe no-op. | |
| cleanups.push({ dir, manager }); | |
| // ensureSync() checks this.initialized → false, returns early. | |
| await manager.ensureSync(); | |
| const status = manager.getSyncStatus(); | |
| expect(status.status).toBe("disabled"); | |
| }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-deferred.test.ts`
around lines 290 - 308, The test "ensureSync() before initialize() is a no-op"
registers only { dir } into the cleanups array and never closes the
PGliteClientManager, leaking async resources; update the cleanup registration to
include the manager (e.g., push { dir, manager }) or ensure manager is closed in
the test teardown so that PGliteClientManager.close() is invoked; locate the
test function and the cleanups array usage and add the manager reference (the
PGliteClientManager instance named manager) to the cleanup entry so the manager
is properly closed.
| it("first adapter DB operation triggers sync after migrations", async () => { | ||
| const dir = createTempDir("eliza-sync-wdb-"); | ||
| cleanups.push({ dir }); | ||
|
|
||
| const agentId = v4(); | ||
|
|
||
| // Create manager with a syncUrl so ensureSync() has something to | ||
| // attempt (it will fail to the bogus URL, but that proves the code | ||
| // path is exercised). | ||
| const manager = new PGliteClientManager({ | ||
| dataDir: dir, | ||
| syncUrl: BOGUS_SYNC_URL, | ||
| agentId, | ||
| }); | ||
| cleanups.push({ dir, manager }); |
There was a problem hiding this comment.
Same pattern: duplicate cleanup entry.
Line 317 pushes { dir } alone, then line 329 pushes { dir, manager }.
Proposed fix
it("first adapter DB operation triggers sync after migrations", async () => {
const dir = createTempDir("eliza-sync-wdb-");
- cleanups.push({ dir });
const agentId = v4();
// ...
const manager = new PGliteClientManager({
dataDir: dir,
syncUrl: BOGUS_SYNC_URL,
agentId,
});
cleanups.push({ dir, manager });📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| it("first adapter DB operation triggers sync after migrations", async () => { | |
| const dir = createTempDir("eliza-sync-wdb-"); | |
| cleanups.push({ dir }); | |
| const agentId = v4(); | |
| // Create manager with a syncUrl so ensureSync() has something to | |
| // attempt (it will fail to the bogus URL, but that proves the code | |
| // path is exercised). | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId, | |
| }); | |
| cleanups.push({ dir, manager }); | |
| it("first adapter DB operation triggers sync after migrations", async () => { | |
| const dir = createTempDir("eliza-sync-wdb-"); | |
| const agentId = v4(); | |
| // Create manager with a syncUrl so ensureSync() has something to | |
| // attempt (it will fail to the bogus URL, but that proves the code | |
| // path is exercised). | |
| const manager = new PGliteClientManager({ | |
| dataDir: dir, | |
| syncUrl: BOGUS_SYNC_URL, | |
| agentId, | |
| }); | |
| cleanups.push({ dir, manager }); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/__tests__/integration/electric-sync-deferred.test.ts`
around lines 315 - 329, The test is pushing duplicate cleanup entries for the
same temp dir: first cleanups.push({ dir }) then later cleanups.push({ dir,
manager }), causing redundant cleanup records; remove the initial
cleanups.push({ dir }) so only the later cleanups.push({ dir, manager }) (which
includes both the directory and the PGliteClientManager instance) remains; look
for the temp dir created by createTempDir(...) and the manager variable created
with new PGliteClientManager(...) to locate the two pushes to the cleanups array
and delete the first one.
| async removeParticipant(entityId: UUID, roomId: UUID): Promise<boolean> { | ||
| const ok = await super.removeParticipant(entityId, roomId); | ||
| if (ok) { | ||
| this.manager.notifyWrite("participants", "delete", { entityId, roomId }); | ||
| } | ||
| return ok; | ||
| } |
There was a problem hiding this comment.
Write-back payload uses entityId/roomId but DB columns are user_id/room_id.
The removeParticipant notification passes { entityId, roomId } but the addParticipant notification (line 277-280) uses { id, user_id, room_id, agent_id }. For consistency and correct write-back API behavior, use snake_case column names here too.
🔧 Suggested fix
async removeParticipant(entityId: UUID, roomId: UUID): Promise<boolean> {
const ok = await super.removeParticipant(entityId, roomId);
if (ok) {
- this.manager.notifyWrite("participants", "delete", { entityId, roomId });
+ this.manager.notifyWrite("participants", "delete", { user_id: entityId, room_id: roomId });
}
return ok;
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/pglite/adapter.ts` around lines 286 - 292, The
removeParticipant method currently calls
this.manager.notifyWrite("participants","delete",{ entityId, roomId }); but the
DB uses snake_case columns—match addParticipant's shape by sending snake_case
keys: change the payload to { user_id: entityId, room_id: roomId } (and include
any other DB key used by addParticipant such as id/agent_id if required by your
write-back semantics) when calling notifyWrite in removeParticipant so the
write-back API receives column names, not camelCase variables; update the
notifyWrite invocation inside removeParticipant accordingly.
| function buildWriteUrl(baseUrl: string, agentId: string): string { | ||
| const base = baseUrl.replace(/\/+$/, ""); | ||
| return `${base}/api/v1/eliza/agents/${agentId}/write`; | ||
| } |
There was a problem hiding this comment.
Validate agentId format or URL-encode before interpolation.
The buildWriteUrl function directly embeds agentId into the URL path without validation or encoding. If agentId contains /, ?, or #, the resulting URL will be malformed.
🛡️ Add UUID validation
function buildWriteUrl(baseUrl: string, agentId: string): string {
+ // Validate UUID format to prevent URL injection
+ const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
+ if (!uuidPattern.test(agentId)) {
+ throw new Error(`Invalid agentId format: ${agentId}`);
+ }
const base = baseUrl.replace(/\/+$/, "");
return `${base}/api/v1/eliza/agents/${agentId}/write`;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| function buildWriteUrl(baseUrl: string, agentId: string): string { | |
| const base = baseUrl.replace(/\/+$/, ""); | |
| return `${base}/api/v1/eliza/agents/${agentId}/write`; | |
| } | |
| function buildWriteUrl(baseUrl: string, agentId: string): string { | |
| // Validate UUID format to prevent URL injection | |
| const uuidPattern = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; | |
| if (!uuidPattern.test(agentId)) { | |
| throw new Error(`Invalid agentId format: ${agentId}`); | |
| } | |
| const base = baseUrl.replace(/\/+$/, ""); | |
| return `${base}/api/v1/eliza/agents/${agentId}/write`; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/write-back/index.ts` around lines 46 - 49,
buildWriteUrl currently interpolates agentId into the path raw; ensure agentId
is safe by either validating it against the expected UUID format (in
buildWriteUrl) and throwing a clear error if it fails, or by URL-encoding the
segment before interpolation (e.g., use encodeURIComponent on agentId) so
characters like "/", "?", "#" are escaped; update the function (buildWriteUrl)
to perform validation/encoding and return a well-formed URL or raise on invalid
input.
| private async sendBatch(batch: PendingWrite[]): Promise<void> { | ||
| // Guard: only called from drainQueue which checks this.enabled, but | ||
| // this avoids the non-null assertions that Biome flags. | ||
| if (!this.writeUrl || !this.serviceKey) { | ||
| logger.warn( | ||
| { src: "plugin:sql" }, | ||
| "WriteBackService: sendBatch called while not configured — dropping batch" | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| const response = await fetch(this.writeUrl, { | ||
| method: "POST", | ||
| headers: { | ||
| "Content-Type": "application/json", | ||
| "X-Service-Key": this.serviceKey, | ||
| }, | ||
| body: JSON.stringify({ writes: batch }), | ||
| signal: AbortSignal.timeout(30_000), | ||
| }); | ||
|
|
||
| if (!response.ok) { | ||
| const text = await response.text().catch(() => ""); | ||
| logger.warn( | ||
| { src: "plugin:sql", status: response.status }, | ||
| `WriteBackService: cloud API returned ${response.status}: ${text.slice(0, 200)}` | ||
| ); | ||
| this.requeueOrDrop(batch); | ||
| } | ||
| } catch (err) { | ||
| const msg = err instanceof Error ? err.message : String(err); | ||
| logger.warn( | ||
| { src: "plugin:sql", error: msg }, | ||
| "WriteBackService: failed to send write batch — retrying" | ||
| ); | ||
| this.requeueOrDrop(batch); | ||
| } | ||
| } |
There was a problem hiding this comment.
Strip internal retries field from cloud API payload.
Line 177 sends the entire PendingWrite object to the cloud API, including the internal retries counter. This implementation detail shouldn't be exposed to external systems.
🔧 Map payload to public shape
private async sendBatch(batch: PendingWrite[]): Promise<void> {
// Guard: only called from drainQueue which checks this.enabled, but
// this avoids the non-null assertions that Biome flags.
if (!this.writeUrl || !this.serviceKey) {
logger.warn(
{ src: "plugin:sql" },
"WriteBackService: sendBatch called while not configured — dropping batch"
);
return;
}
try {
+ // Strip internal retry state from the cloud API payload
+ const payload = batch.map(({ table, operation, row, writeId }) => ({
+ table,
+ operation,
+ row,
+ writeId,
+ }));
const response = await fetch(this.writeUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Service-Key": this.serviceKey,
},
- body: JSON.stringify({ writes: batch }),
+ body: JSON.stringify({ writes: payload }),
signal: AbortSignal.timeout(30_000),
});🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@plugins/plugin-sql/src/write-back/index.ts` around lines 159 - 197, sendBatch
is currently JSON-stringifying the PendingWrite objects (including the internal
retries counter) when sending to the cloud API; change the payload to map batch
to a public shape that omits retries (e.g., map each PendingWrite in the batch
to an object without the retries property) before calling JSON.stringify, so the
POST body contains { writes: mappedBatch } instead of the raw batch; keep the
existing error handling and calls to requeueOrDrop, and reference sendBatch,
PendingWrite, writeUrl and serviceKey when locating the change.
|
Landed on
Sync stays disabled unless |
Summary
Adds Electric Sync (PGlite ↔ Electric Cloud) and a write-back service for bidirectional data synchronization.
What changed
PGliteClientManagernow registerselectricSync()extension whenELIZA_ELECTRIC_SYNC_URLis set. Per-agent WHERE filtering (agent_id = ${agentId}) with literal SQL values (required by Electric Cloud v1.6.x). UUID validation prevents SQL injection.WriteBackServiceforwards local PGlite writes to the cloud API.PgliteDatabaseAdapternotifies the service after inserts/upserts/deletes on synced tables.plugins/plugin-sql/caddy/electric-proxy.Caddyfileforwards all requests toapi.electric-sql.cloudwith auth query params injected from env vars..env.examplecreated documenting all Electric Cloud and sync env vars..gitignoretracks.env.example. Docker Compose for local Electric+Postgres test stack.How to test
Checks
Summary by CodeRabbit
Release Notes
New Features
Documentation
Tests