Skip to content

fix(channels): isolate thread subscriptions per agent#17049

Open
Akash504-ai wants to merge 3 commits into
mastra-ai:mainfrom
Akash504-ai:fix/channel-subscription-agent-isolation
Open

fix(channels): isolate thread subscriptions per agent#17049
Akash504-ai wants to merge 3 commits into
mastra-ai:mainfrom
Akash504-ai:fix/channel-subscription-agent-isolation

Conversation

@Akash504-ai

@Akash504-ai Akash504-ai commented May 26, 2026

Copy link
Copy Markdown
Contributor

Description

Fixes channel subscription state leaking across agents/bots sharing the same Mastra storage adapter.

Previously, MastraStateAdapter looked up subscriptions only by channel_externalThreadId, which caused multiple agents using the same storage backend to share subscription state unintentionally. As a result, different bots could respond to the same incoming message if they shared the same external thread ID (such as a Telegram chatId).

This PR scopes external thread IDs by agentId when storing and resolving thread subscriptions, ensuring subscriptions remain isolated per agent.

Also adds regression tests covering multi-agent isolation behavior.

Related issue(s)

Fixes #17037

Type of change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Documentation update
  • Code refactoring
  • Performance improvement
  • Test update

Checklist

  • I have linked the related issue(s) in the description above
  • I have made corresponding changes to the documentation (if applicable)
  • I have added tests that prove my fix is effective or that my feature works
  • I have addressed all Coderabbit comments on this PR

ELI5

Two different bots were accidentally sharing "who's listening" notes because they used the same storage. This change tags each bot's notes with its own name so bots no longer see each other's subscriptions.

What Changed

This PR fixes a bug where channel thread subscription state leaked across agents that share the same Mastra storage backend by scoping stored external thread IDs to the agent that owns them. Previously, MastraStateAdapter looked up threads only by channel_externalThreadId, causing subscriptions to be treated as global across agents sharing the same storage. The adapter now prefixes external thread IDs with the agent ID (format: ${agentId}:${externalThreadId}) when storing and resolving thread mappings so subscription state remains isolated per agent.

  • Root cause: MastraStateAdapter.findThreadByExternalId (and related subscription checks) queried by raw external thread ID without scoping by agentId.
  • Solution: Add agent scoping to external thread IDs and update subscription logic to use scoped IDs.

Files Modified

  • packages/core/src/channels/state-adapter.ts

    • MastraStateAdapter constructor now accepts agentId (constructor(memoryStore, agentId)).
    • Added agentId field and helper to produce scoped thread IDs.
    • subscribe(), unsubscribe(), and isSubscribed() now resolve threads using the scoped external thread ID.
  • packages/core/src/channels/agent-channels.ts

    • AgentChannels now passes this.agent.id into MastraStateAdapter when constructing the in-memory fallback.
    • Thread metadata now stores channel_externalThreadId using the scoped value.
  • packages/core/src/channels/tests/state-adapter.test.ts

    • Tests updated to construct adapters with explicit agent IDs.
    • Added regression test verifying subscriptions are isolated between agents (agent-a vs agent-b) even when they share the same external thread ID.
    • Expanded persistence/restart test to recreate adapter with same agent ID and confirm subscriptions persist correctly.
  • .changeset/agent-channel-subscription-isolation.md

    • Added changeset documenting patch release for @mastra/core describing the fix.

Behavior Change

Subscriptions are now agent-scoped: multiple agents sharing the same storage backend will no longer see or inherit each other's channel/thread subscription state.

Tests

Added and updated unit tests covering:

  • Subscription persistence across adapter recreation (simulated restart) with agent scoping.
  • Isolation of subscriptions between different agents sharing the same external thread ID.

Review Change Stack

@changeset-bot

changeset-bot Bot commented May 26, 2026

Copy link
Copy Markdown

🦋 Changeset detected

Latest commit: e9893b0

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 20 packages
Name Type
@mastra/core Patch
mastracode Patch
@mastra/mcp-docs-server Patch
@internal/playground Patch
@mastra/client-js Patch
@mastra/opencode Patch
@mastra/longmemeval Patch
mastra Patch
@mastra/deployer-cloud Patch
@mastra/react Patch
@mastra/playground-ui Patch
@mastra/server Patch
@mastra/deployer Patch
create-mastra Patch
@mastra/express Patch
@mastra/fastify Patch
@mastra/hono Patch
@mastra/koa Patch
@mastra/nestjs Patch
@mastra/temporal Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@vercel

vercel Bot commented May 26, 2026

Copy link
Copy Markdown

@Akash504-ai is attempting to deploy a commit to the Mastra Team on Vercel.

A member of the Team first needs to authorize it.

@coderabbitai

coderabbitai Bot commented May 26, 2026

Copy link
Copy Markdown
Contributor

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 13457750-db78-46ac-b7a4-fafe336c2f1e

📥 Commits

Reviewing files that changed from the base of the PR and between a7cbbe4 and e9893b0.

📒 Files selected for processing (1)
  • packages/core/src/channels/__tests__/state-adapter.test.ts

Walkthrough

Scope external channel thread IDs by agent: MastraStateAdapter now stores an agentId, looks up and persists channel_externalThreadId as ${agentId}:${externalThreadId}, AgentChannels initializes the adapter with the agent ID, and tests verify per-agent subscription isolation and persistence.

Changes

Agent-scoped channel subscription isolation

Layer / File(s) Summary
State adapter contract with agentId parameter
packages/core/src/channels/state-adapter.ts
Constructor signature updated to accept agentId string parameter; new private field stores the agent identifier for use in scoped lookups.
Scoped thread ID lookup implementation
packages/core/src/channels/state-adapter.ts
New getScopedThreadId() helper and updated findThreadByExternalId() now filter thread metadata by scoped external ID (${agentId}:${externalThreadId}) instead of unscoped ID, affecting subscribe, unsubscribe, and isSubscribed operations.
AgentChannels wiring and metadata scoping
packages/core/src/channels/agent-channels.ts
State adapter is initialized with this.agent.id; thread metadata channel_externalThreadId is stored as scoped value prefixed with agent ID and colon.
Subscription isolation test coverage
packages/core/src/channels/__tests__/state-adapter.test.ts
Test setup passes explicit agent-a ID to adapter; persisted metadata now uses prefixed external thread IDs; added restart test confirming subscriptions persist with same agent; new cross-agent isolation test verifies different agents remain isolated even when sharing raw external thread IDs.
Release notes
.changeset/agent-channel-subscription-isolation.md
Patch-level changelog entry documents the fix for channel subscription state isolation across agents.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Suggested reviewers

  • TylerBarnes
  • CalebBarnes
  • abhiaiyer91
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and concisely describes the main change: isolating thread subscriptions per agent to fix the bug where subscriptions leaked across agents sharing the same storage adapter.
Linked Issues check ✅ Passed The PR successfully implements the proposed solution from issue #17037 by scoping external thread IDs with agentId prefixes across all relevant components (state adapter, agent channels, and tests).
Out of Scope Changes check ✅ Passed All changes are directly related to the stated objective of fixing channel subscription isolation per agent; no out-of-scope modifications were introduced.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint install failed: lockfile failed supply-chain policy check. Run pnpm install locally to update the lockfile.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/src/channels/agent-channels.ts (1)

1360-1371: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add legacy thread-id fallback to avoid post-upgrade thread split.

Line 1368 now looks up only agentId:externalThreadId. Existing installations with legacy unscoped metadata will miss matches, causing duplicate thread creation and loss of prior thread continuity until data is migrated.

Suggested compatibility patch
 const scopedExternalThreadId = `${this.agent.id}:${externalThreadId}`;

 const metadata = {
   channel_platform: platform,
   channel_externalThreadId: scopedExternalThreadId,
   channel_externalChannelId: channelId,
 };

-const { threads } = await memoryStore.listThreads({
+let { threads } = await memoryStore.listThreads({
   filter: { metadata },
   perPage: 1,
 });
+
+if (threads.length === 0) {
+  const legacyMetadata = {
+    channel_platform: platform,
+    channel_externalThreadId: externalThreadId,
+    channel_externalChannelId: channelId,
+  };
+  ({ threads } = await memoryStore.listThreads({
+    filter: { metadata: legacyMetadata },
+    perPage: 1,
+  }));
+
+  if (threads[0]) {
+    await memoryStore.updateThread({
+      id: threads[0].id,
+      title: threads[0].title ?? '',
+      metadata: { ...(threads[0].metadata ?? {}), channel_externalThreadId: scopedExternalThreadId },
+    });
+  }
+}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/core/src/channels/agent-channels.ts` around lines 1360 - 1371, The
current lookup uses only the scopedExternalThreadId
(`${this.agent.id}:${externalThreadId}`) which breaks older installations that
stored unscoped thread IDs; update the logic around the call to
memoryStore.listThreads (and the surrounding metadata construction) to fall back
to legacy IDs: first search using metadata.channel_externalThreadId =
scopedExternalThreadId, and if no threads are returned, retry using
metadata.channel_externalThreadId = externalThreadId (unscoped), or
alternatively build a filter that matches either value (OR) if the
memoryStore.listThreads API supports it; ensure any downstream code that uses
the found thread handles the result of the second query the same way.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/core/src/channels/__tests__/state-adapter.test.ts`:
- Around line 83-109: The test currently only saves a thread mapped to agent-a,
so agentB's isSubscribed can be false by absence; add a separate saved thread
for agent-b using memoryStore.saveThread with channel_externalThreadId
`agent-b:${externalThreadId}` (mirroring the agent-a saveThread block but with
id/title/resourceId/metadata namespaced to agent-b), then explicitly exercise
subscribe/isSubscribed for both adapters: call await
agentA.subscribe(externalThreadId) and assert
agentA.isSubscribed(externalThreadId) is true while
agentB.isSubscribed(externalThreadId) is false, and optionally call await
agentB.subscribe(externalThreadId) and assert
agentB.isSubscribed(externalThreadId) becomes true to confirm isolation; locate
changes around MastraStateAdapter usage, memoryStore.saveThread, subscribe, and
isSubscribed.

---

Outside diff comments:
In `@packages/core/src/channels/agent-channels.ts`:
- Around line 1360-1371: The current lookup uses only the scopedExternalThreadId
(`${this.agent.id}:${externalThreadId}`) which breaks older installations that
stored unscoped thread IDs; update the logic around the call to
memoryStore.listThreads (and the surrounding metadata construction) to fall back
to legacy IDs: first search using metadata.channel_externalThreadId =
scopedExternalThreadId, and if no threads are returned, retry using
metadata.channel_externalThreadId = externalThreadId (unscoped), or
alternatively build a filter that matches either value (OR) if the
memoryStore.listThreads API supports it; ensure any downstream code that uses
the found thread handles the result of the second query the same way.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 5aaac9a0-ea4c-4e2b-b0b8-94ebd35ff425

📥 Commits

Reviewing files that changed from the base of the PR and between 5b22ffd and a7cbbe4.

📒 Files selected for processing (4)
  • .changeset/agent-channel-subscription-isolation.md
  • packages/core/src/channels/__tests__/state-adapter.test.ts
  • packages/core/src/channels/agent-channels.ts
  • packages/core/src/channels/state-adapter.ts

Comment thread packages/core/src/channels/__tests__/state-adapter.test.ts
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Thread Subscription State Bleeds Across Agents/Bots When Sharing a Mastra Storage Adapter

1 participant