Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/agent-channel-subscription-isolation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@mastra/core': patch
---

Fixed channel subscription state leaking across agents sharing the same storage adapter by scoping external thread IDs per agent.
53 changes: 50 additions & 3 deletions packages/core/src/channels/__tests__/state-adapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('MastraStateAdapter', () => {
beforeEach(async () => {
db = new InMemoryDB();
memoryStore = new InMemoryMemory({ db });
adapter = new MastraStateAdapter(memoryStore);
adapter = new MastraStateAdapter(memoryStore, 'agent-a');
await adapter.connect();
});

Expand All @@ -39,7 +39,7 @@ describe('MastraStateAdapter', () => {
updatedAt: new Date(),
metadata: {
channel_platform: 'discord',
channel_externalThreadId: externalThreadId,
channel_externalThreadId: `agent-a:${externalThreadId}`,
channel_externalChannelId: 'discord:guild1:channel1',
},
},
Expand Down Expand Up @@ -73,12 +73,59 @@ describe('MastraStateAdapter', () => {
expect(await adapter.isSubscribed(externalThreadId)).toBe(true);

// Create a new adapter instance (simulating server restart)
const newAdapter = new MastraStateAdapter(memoryStore);
const newAdapter = new MastraStateAdapter(memoryStore, 'agent-a');
await newAdapter.connect();

// Subscription should still be there since it's in storage
expect(await newAdapter.isSubscribed(externalThreadId)).toBe(true);
});

it('isolates subscriptions between agents sharing the same external thread ID', async () => {
const agentA = new MastraStateAdapter(memoryStore, 'agent-a');
const agentB = new MastraStateAdapter(memoryStore, 'agent-b');

await agentA.connect();
await agentB.connect();

await memoryStore.saveThread({
thread: {
id: 'thread-agent-a',
title: 'Agent A thread',
resourceId: 'discord:user1',
createdAt: new Date(),
updatedAt: new Date(),
metadata: {
channel_platform: 'discord',
channel_externalThreadId: `agent-a:${externalThreadId}`,
channel_externalChannelId: 'discord:guild1:channel1',
},
},
});

await memoryStore.saveThread({
thread: {
id: 'thread-agent-b',
title: 'Agent B thread',
resourceId: 'discord:user1',
createdAt: new Date(),
updatedAt: new Date(),
metadata: {
channel_platform: 'discord',
channel_externalThreadId: `agent-b:${externalThreadId}`,
channel_externalChannelId: 'discord:guild1:channel1',
},
},
});

await agentA.subscribe(externalThreadId);

expect(await agentA.isSubscribed(externalThreadId)).toBe(true);
expect(await agentB.isSubscribed(externalThreadId)).toBe(false);

await agentB.subscribe(externalThreadId);

expect(await agentB.isSubscribed(externalThreadId)).toBe(true);
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});

describe('cache (in-memory)', () => {
Expand Down
6 changes: 4 additions & 2 deletions packages/core/src/channels/agent-channels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ export class AgentChannels {
'Channels require storage to be configured on the Mastra instance. Configure a storage provider like LibSQLStore.',
);
}
this.stateAdapter = new MastraStateAdapter(memoryStore);
this.stateAdapter = new MastraStateAdapter(memoryStore, this.agent.id);
this.log('info', 'Using MastraStateAdapter (subscriptions persist across restarts)');
}

Expand Down Expand Up @@ -1357,9 +1357,11 @@ export class AgentChannels {
);
}

const scopedExternalThreadId = `${this.agent.id}:${externalThreadId}`;

const metadata = {
channel_platform: platform,
channel_externalThreadId: externalThreadId,
channel_externalThreadId: scopedExternalThreadId,
channel_externalChannelId: channelId,
};

Expand Down
13 changes: 11 additions & 2 deletions packages/core/src/channels/state-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ interface CachedValue<T = unknown> {
*/
export class MastraStateAdapter implements StateAdapter {
private memoryStore: MemoryStorage;
private agentId: string;
private connected = false;
private connectPromise: Promise<void> | null = null;

Expand All @@ -27,8 +28,9 @@ export class MastraStateAdapter implements StateAdapter {
private readonly lists = new Map<string, { values: unknown[]; expiresAt: number | null }>();
private readonly queues = new Map<string, QueueEntry[]>();

constructor(memoryStore: MemoryStorage) {
constructor(memoryStore: MemoryStorage, agentId: string) {
this.memoryStore = memoryStore;
this.agentId = agentId;
}

async connect(): Promise<void> {
Expand Down Expand Up @@ -234,9 +236,16 @@ export class MastraStateAdapter implements StateAdapter {
* Find a Mastra thread by its external (SDK) thread ID.
* External thread IDs are stored in `channel_externalThreadId` metadata.
*/
private getScopedThreadId(externalThreadId: string): string {
return `${this.agentId}:${externalThreadId}`;
}
private async findThreadByExternalId(externalThreadId: string) {
const { threads } = await this.memoryStore.listThreads({
filter: { metadata: { channel_externalThreadId: externalThreadId } },
filter: {
metadata: {
channel_externalThreadId: this.getScopedThreadId(externalThreadId),
},
},
perPage: 1,
});
return threads[0] ?? null;
Expand Down
Loading