Skip to content

Commit 564279e

Browse files
bigsheeper and claude authored
enhance: allow pchannel count increase in ReplicateConfiguration (milvus-io#47792)
## Summary

- Allow pchannel count increase (append-only) in `validateClusterConsistency` for CU scaling
- Existing pchannels must be preserved at the same positions; decrease and reorder are still rejected
- Equal pchannel count across clusters is still enforced in `validateClusterBasic`

## Test plan

- [x] Unit tests pass for `pkg/util/replicateutil/...`
- [x] Verified pchannel increase (append) is accepted
- [x] Verified pchannel decrease is rejected
- [x] Verified pchannel reorder/replace is rejected

issue: milvus-io#47791

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8cffd8e commit 564279e

12 files changed

Lines changed: 1583 additions & 908 deletions

File tree

internal/cdc/replication/replicatemanager/channel_replicator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,17 @@ func (r *channelReplicator) startConsumeLoop() {
181181
func (r *channelReplicator) getReplicateCheckpoint() (*utility.ReplicateCheckpoint, error) {
182182
logger := log.With(zap.String("key", r.channel.Key), zap.Int64("modRevision", r.channel.ModRevision))
183183

184+
// For pchannel-increasing tasks, the secondary WAL for new pchannels hasn't received the
185+
// AlterReplicateConfig yet, so GetReplicateInfo would fail. Use InitializedCheckpoint directly.
186+
if r.channel.Value.GetSkipGetReplicateCheckpoint() {
187+
initializedCheckpoint := utility.NewReplicateCheckpointFromProto(r.channel.Value.InitializedCheckpoint)
188+
logger.Info("skip get replicate checkpoint for pchannel-increasing task, use initialized checkpoint",
189+
zap.Stringer("messageID", initializedCheckpoint.MessageID),
190+
zap.Uint64("timeTick", initializedCheckpoint.TimeTick),
191+
)
192+
return initializedCheckpoint, nil
193+
}
194+
184195
ctx, cancel := context.WithTimeout(r.asyncNotifier.Context(), 30*time.Second)
185196
defer cancel()
186197

internal/distributed/streaming/replicate_service.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,36 @@ func (s replicateService) overwriteReplicateMessage(ctx context.Context, msg mes
120120
if sourceCluster == nil {
121121
return nil, status.NewReplicateViolation("source cluster %s not found in replicate configuration", rh.ClusterID)
122122
}
123-
targetVChannel, err := s.getTargetVChannel(sourceCluster, msg.VChannel())
123+
124+
// For pchannel-increasing AlterReplicateConfig messages, use the NEW config from the
125+
// message header to map ALL channels (including newly added ones).
126+
// The current config only knows about old pchannels, so both the main vchannel and
127+
// broadcast vchannels need the new config for mapping.
128+
channelMappingSourceCluster := sourceCluster
129+
if msg.MessageType() == message.MessageTypeAlterReplicateConfig {
130+
alterMsg := message.MustAsMutableAlterReplicateConfigMessageV2(msg)
131+
if alterMsg.Header().GetIsPchannelIncreasing() {
132+
newCfg, newCfgErr := replicateutil.NewConfigHelper(s.clusterID, alterMsg.Header().GetReplicateConfiguration())
133+
if newCfgErr != nil {
134+
return nil, status.NewReplicateViolation("failed to parse new replicate config from message header: %s", newCfgErr.Error())
135+
}
136+
channelMappingSourceCluster = newCfg.GetCluster(rh.ClusterID)
137+
if channelMappingSourceCluster == nil {
138+
return nil, status.NewReplicateViolation("source cluster %s not found in new replicate configuration", rh.ClusterID)
139+
}
140+
}
141+
}
142+
143+
targetVChannel, err := s.getTargetVChannel(channelMappingSourceCluster, msg.VChannel())
124144
if err != nil {
125145
return nil, err
126146
}
127147

128148
// Get target broadcast vchannels on current cluster that should be written to
129149
if bh := msg.BroadcastHeader(); bh != nil {
130-
// broadcast header have vchannels, so we need to overwrite it.
131150
targetBroadcastVChannels := make([]string, 0, len(bh.VChannels))
132151
for _, vchannel := range bh.VChannels {
133-
targetBroadcastVChannel, err := s.getTargetVChannel(sourceCluster, vchannel)
152+
targetBroadcastVChannel, err := s.getTargetVChannel(channelMappingSourceCluster, vchannel)
134153
if err != nil {
135154
return nil, status.NewReplicateViolation("failed to get target channel, %s", err.Error())
136155
}

internal/distributed/streaming/replicate_service_test.go

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,228 @@ func TestReplicateService_SkipMessageTypes(t *testing.T) {
397397
})
398398
}
399399

400+
// TestReplicateService_AlterConfigPChannelIncreasing calls replicateService.Append with
// AlterReplicateConfig replicate messages built by createReplicateAlterConfigMessages.
// In every subtest the mocked assignment service reports a current configuration with two
// pchannels per cluster, while the message header carries a different configuration:
//   - with_flag_maps_all_channels: three pchannels, IsPchannelIncreasing set; Append must succeed.
//   - without_flag_fails_for_unknown_channel: three pchannels, flag unset; Append must fail.
//   - with_flag_invalid_config_in_header: header config without topology; Append must fail.
//   - with_flag_source_cluster_missing_in_new_config: header config lacks "primary"; Append must fail.
func TestReplicateService_AlterConfigPChannelIncreasing(t *testing.T) {
401+
// New config adds a 3rd channel (dml_2)
402+
newConfig := &commonpb.ReplicateConfiguration{
403+
Clusters: []*commonpb.MilvusCluster{
404+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1", "primary-rootcoord-dml_2"}},
405+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1", "by-dev-rootcoord-dml_2"}},
406+
},
407+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
408+
{SourceClusterId: "primary", TargetClusterId: "by-dev"},
409+
},
410+
}
411+
412+
t.Run("with_flag_maps_all_channels", func(t *testing.T) {
413+
c := mock_client.NewMockClient(t)
414+
as := mock_client.NewMockAssignmentService(t)
415+
c.EXPECT().Assignment().Return(as).Maybe()
416+
417+
h := mock_handler.NewMockHandlerClient(t)
418+
p := mock_producer.NewMockProducer(t)
419+
// NOTE(review): this expectation is registered with Maybe(), so the assertions inside the
// callback only run if the producer's Append is actually invoked; the subtest would still
// pass if it is never called.
p.EXPECT().Append(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, mm message.MutableMessage) (*types.AppendResult, error) {
420+
msg := message.MustAsMutableAlterReplicateConfigMessageV2(mm)
421+
// With IsPchannelIncreasing flag, all 3 channels (including new one)
422+
// should be mapped using the new config from the message header.
423+
bh := msg.BroadcastHeader()
424+
assert.NotNil(t, bh)
425+
assert.Len(t, bh.VChannels, 3, "all channels including new one should be mapped")
426+
for _, vchannel := range bh.VChannels {
427+
assert.True(t, strings.HasPrefix(vchannel, "by-dev"), "vchannel should be mapped to secondary cluster")
428+
}
429+
return &types.AppendResult{
430+
MessageID: walimplstest.NewTestMessageID(1),
431+
TimeTick: 1,
432+
}, nil
433+
}).Maybe()
434+
p.EXPECT().IsAvailable().Return(true).Maybe()
435+
p.EXPECT().Available().Return(make(chan struct{})).Maybe()
436+
h.EXPECT().CreateProducer(mock.Anything, mock.Anything).Return(p, nil).Maybe()
437+
438+
// Current (old) config has 2 channels
439+
as.EXPECT().GetReplicateConfiguration(mock.Anything).Return(replicateutil.MustNewConfigHelper(
440+
"by-dev",
441+
&commonpb.ReplicateConfiguration{
442+
Clusters: []*commonpb.MilvusCluster{
443+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1"}},
444+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"}},
445+
},
446+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
447+
{SourceClusterId: "primary", TargetClusterId: "by-dev"},
448+
},
449+
},
450+
), nil)
451+
as.EXPECT().GetLatestAssignments(mock.Anything).Return(nil, errors.New("not needed")).Maybe()
452+
453+
rs := &replicateService{
454+
walAccesserImpl: &walAccesserImpl{
455+
lifetime: typeutil.NewLifetime(),
456+
clusterID: "by-dev",
457+
streamingCoordClient: c,
458+
handlerClient: h,
459+
producers: make(map[string]*producer.ResumableProducer),
460+
},
461+
}
462+
463+
// Build AlterReplicateConfig broadcast with 3 channels and IsPchannelIncreasing flag
464+
replicateMsgs := createReplicateAlterConfigMessages(newConfig,
465+
[]string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1", "primary-rootcoord-dml_2"},
466+
true)
467+
468+
for _, msg := range replicateMsgs {
469+
_, err := rs.Append(context.Background(), msg)
470+
assert.NoError(t, err)
471+
}
472+
})
473+
474+
t.Run("without_flag_fails_for_unknown_channel", func(t *testing.T) {
475+
c := mock_client.NewMockClient(t)
476+
as := mock_client.NewMockAssignmentService(t)
477+
c.EXPECT().Assignment().Return(as).Maybe()
478+
479+
// No producer expectations are registered on the handler mock in this subtest.
h := mock_handler.NewMockHandlerClient(t)
480+
481+
// Current (old) config has 2 channels
482+
as.EXPECT().GetReplicateConfiguration(mock.Anything).Return(replicateutil.MustNewConfigHelper(
483+
"by-dev",
484+
&commonpb.ReplicateConfiguration{
485+
Clusters: []*commonpb.MilvusCluster{
486+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1"}},
487+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"}},
488+
},
489+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
490+
{SourceClusterId: "primary", TargetClusterId: "by-dev"},
491+
},
492+
},
493+
), nil)
494+
495+
rs := &replicateService{
496+
walAccesserImpl: &walAccesserImpl{
497+
lifetime: typeutil.NewLifetime(),
498+
clusterID: "by-dev",
499+
streamingCoordClient: c,
500+
handlerClient: h,
501+
producers: make(map[string]*producer.ResumableProducer),
502+
},
503+
}
504+
505+
// Build AlterReplicateConfig broadcast with 3 channels but NO IsPchannelIncreasing flag
506+
replicateMsgs := createReplicateAlterConfigMessages(newConfig,
507+
[]string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1", "primary-rootcoord-dml_2"},
508+
false)
509+
510+
// Should fail because the old config doesn't know about primary-rootcoord-dml_2
511+
// NOTE(review): assert.Error does not stop the subtest; if err were nil, the err.Error()
// call on the following line would panic. Consider guarding it (e.g. require.Error) —
// confirm against the package's test conventions.
for _, msg := range replicateMsgs {
512+
_, err := rs.Append(context.Background(), msg)
513+
assert.Error(t, err)
514+
assert.Contains(t, err.Error(), "failed to get target channel")
515+
}
516+
})
517+
518+
t.Run("with_flag_invalid_config_in_header", func(t *testing.T) {
519+
c := mock_client.NewMockClient(t)
520+
as := mock_client.NewMockAssignmentService(t)
521+
c.EXPECT().Assignment().Return(as).Maybe()
522+
523+
h := mock_handler.NewMockHandlerClient(t)
524+
525+
// Current config has 2 channels
526+
as.EXPECT().GetReplicateConfiguration(mock.Anything).Return(replicateutil.MustNewConfigHelper(
527+
"by-dev",
528+
&commonpb.ReplicateConfiguration{
529+
Clusters: []*commonpb.MilvusCluster{
530+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1"}},
531+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"}},
532+
},
533+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
534+
{SourceClusterId: "primary", TargetClusterId: "by-dev"},
535+
},
536+
},
537+
), nil)
538+
539+
rs := &replicateService{
540+
walAccesserImpl: &walAccesserImpl{
541+
lifetime: typeutil.NewLifetime(),
542+
clusterID: "by-dev",
543+
streamingCoordClient: c,
544+
handlerClient: h,
545+
producers: make(map[string]*producer.ResumableProducer),
546+
},
547+
}
548+
549+
// Build with IsPchannelIncreasing flag but an invalid config (no topology, multiple primary => error)
550+
invalidConfig := &commonpb.ReplicateConfiguration{
551+
Clusters: []*commonpb.MilvusCluster{
552+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0"}},
553+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0"}},
554+
},
555+
// Missing CrossClusterTopology => both clusters are "primary" => primaryCount != 1
556+
}
557+
replicateMsgs := createReplicateAlterConfigMessages(invalidConfig,
558+
[]string{"primary-rootcoord-dml_0"},
559+
true)
560+
561+
// NOTE(review): same pattern as above — err.Error() is reached even when assert.Error fails.
for _, msg := range replicateMsgs {
562+
_, err := rs.Append(context.Background(), msg)
563+
assert.Error(t, err)
564+
assert.Contains(t, err.Error(), "failed to parse new replicate config")
565+
}
566+
})
567+
568+
t.Run("with_flag_source_cluster_missing_in_new_config", func(t *testing.T) {
569+
c := mock_client.NewMockClient(t)
570+
as := mock_client.NewMockAssignmentService(t)
571+
c.EXPECT().Assignment().Return(as).Maybe()
572+
573+
h := mock_handler.NewMockHandlerClient(t)
574+
575+
// Current config has 2 channels
576+
as.EXPECT().GetReplicateConfiguration(mock.Anything).Return(replicateutil.MustNewConfigHelper(
577+
"by-dev",
578+
&commonpb.ReplicateConfiguration{
579+
Clusters: []*commonpb.MilvusCluster{
580+
{ClusterId: "primary", Pchannels: []string{"primary-rootcoord-dml_0", "primary-rootcoord-dml_1"}},
581+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"}},
582+
},
583+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
584+
{SourceClusterId: "primary", TargetClusterId: "by-dev"},
585+
},
586+
},
587+
), nil)
588+
589+
rs := &replicateService{
590+
walAccesserImpl: &walAccesserImpl{
591+
lifetime: typeutil.NewLifetime(),
592+
clusterID: "by-dev",
593+
streamingCoordClient: c,
594+
handlerClient: h,
595+
producers: make(map[string]*producer.ResumableProducer),
596+
},
597+
}
598+
599+
// Build with IsPchannelIncreasing flag but the new config uses "other-cluster" instead of "primary"
600+
// The replicate header has ClusterID="primary", but the new config doesn't contain "primary"
601+
missingSourceConfig := &commonpb.ReplicateConfiguration{
602+
Clusters: []*commonpb.MilvusCluster{
603+
{ClusterId: "other-cluster", Pchannels: []string{"other-rootcoord-dml_0"}},
604+
{ClusterId: "by-dev", Pchannels: []string{"by-dev-rootcoord-dml_0"}},
605+
},
606+
CrossClusterTopology: []*commonpb.CrossClusterTopology{
607+
{SourceClusterId: "other-cluster", TargetClusterId: "by-dev"},
608+
},
609+
}
610+
replicateMsgs := createReplicateAlterConfigMessages(missingSourceConfig,
611+
[]string{"primary-rootcoord-dml_0"},
612+
true)
613+
614+
// NOTE(review): same pattern as above — err.Error() is reached even when assert.Error fails.
for _, msg := range replicateMsgs {
615+
_, err := rs.Append(context.Background(), msg)
616+
assert.Error(t, err)
617+
assert.Contains(t, err.Error(), "source cluster primary not found in new replicate configuration")
618+
}
619+
})
620+
}
621+
400622
func TestBuildSkipMessageTypes(t *testing.T) {
401623
t.Run("normal", func(t *testing.T) {
402624
m := buildSkipMessageTypes([]string{"AlterResourceGroup", "DropResourceGroup"})
@@ -457,6 +679,26 @@ func broadcastMsgToReplicateMsgs(broadcastMsg message.BroadcastMutableMessage) [
457679
return replicateMsgs
458680
}
459681

682+
// createReplicateAlterConfigMessages builds an AlterReplicateConfig broadcast message whose
// header carries newConfig and isPchannelIncreasing, targets it at broadcastChannels, splits it
// into mutable messages, and wraps each one as a replicate message with source cluster "primary".
// It returns one replicate message per split message.
func createReplicateAlterConfigMessages(newConfig *commonpb.ReplicateConfiguration, broadcastChannels []string, isPchannelIncreasing bool) []message.ReplicateMutableMessage {
683+
alterMsg := message.NewAlterReplicateConfigMessageBuilderV2().
684+
WithHeader(&message.AlterReplicateConfigMessageHeader{
685+
ReplicateConfiguration: newConfig,
686+
IsPchannelIncreasing: isPchannelIncreasing,
687+
}).
688+
WithBody(&message.AlterReplicateConfigMessageBody{}).
689+
WithBroadcast(broadcastChannels).
690+
MustBuildBroadcast()
691+
// The broadcast ID (200) is a fixed value hard-coded by this helper.
msgs := alterMsg.WithBroadcastID(200).SplitIntoMutableMessage()
692+
replicateMsgs := make([]message.ReplicateMutableMessage, 0, len(msgs))
693+
for _, msg := range msgs {
694+
// Every message gets the same fixed time tick (1) and Pulsar message ID (1, 2, 3, 4).
immutableMsg := msg.WithLastConfirmedUseMessageID().WithTimeTick(1).IntoImmutableMessage(pulsar2.NewPulsarID(
695+
pulsar.NewMessageID(1, 2, 3, 4),
696+
))
697+
// "primary" matches the source cluster ID used by the configurations in the tests above.
replicateMsgs = append(replicateMsgs, message.MustNewReplicateMessage("primary", immutableMsg.IntoImmutableMessageProto()))
698+
}
699+
return replicateMsgs
700+
}
701+
460702
func createReplicateCreateCollectionMessages() []message.ReplicateMutableMessage {
461703
schema := &schemapb.CollectionSchema{
462704
Fields: []*schemapb.FieldSchema{

internal/streamingcoord/server/balancer/channel/manager.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -637,14 +637,36 @@ func (cm *ChannelManager) getNewIncomingTask(newConfig *replicateutil.ConfigHelp
637637
}
638638
incomingReplicatingTasks := make([]*streamingpb.ReplicatePChannelMeta, 0, len(incoming.TargetClusters()))
639639
for _, targetCluster := range incoming.TargetClusters() {
640-
if current != nil && current.TargetCluster(targetCluster.GetClusterId()) != nil {
641-
// target already exists, skip it.
642-
continue
640+
// Determine which pchannels are new and need CDC tasks.
641+
// If the target cluster already exists, only create tasks for newly appended pchannels.
642+
newPchannels := targetCluster.GetPchannels()
643+
skipGetReplicateCheckpoint := false
644+
if current != nil {
645+
if currentTarget := current.TargetCluster(targetCluster.GetClusterId()); currentTarget != nil {
646+
existingCount := len(currentTarget.GetPchannels())
647+
if existingCount >= len(newPchannels) {
648+
// No new pchannels, skip this target cluster.
649+
continue
650+
}
651+
// Only process newly appended pchannels (validator ensures existing pchannels are preserved at same positions).
652+
newPchannels = newPchannels[existingCount:]
653+
// For pchannel-increasing tasks, the secondary WAL for new pchannels hasn't received
654+
// the AlterReplicateConfig yet, so GetReplicateInfo would fail. Skip it and use
655+
// InitializedCheckpoint directly. The secondary filters out duplicates on restart.
656+
skipGetReplicateCheckpoint = true
657+
}
643658
}
644-
// TODO: support add new pchannels into existing clusters.
645-
for _, pchannel := range targetCluster.GetPchannels() {
659+
for _, pchannel := range newPchannels {
646660
sourceClusterID := targetCluster.SourceCluster().ClusterId
647661
sourcePChannel := targetCluster.MustGetSourceChannel(pchannel)
662+
checkpointTimeTick := appendResults[sourcePChannel].TimeTick
663+
if skipGetReplicateCheckpoint {
664+
// For pchannel-increasing tasks, the CDC scanner uses DeliverFilterTimeTickGT
665+
// (strictly greater than). Subtract 1 so the AlterReplicateConfig message itself
666+
// (whose TimeTick == appendResults.TimeTick) is included in the scan.
667+
// The secondary needs this message on ALL pchannels for the broadcast to complete.
668+
checkpointTimeTick--
669+
}
648670
incomingReplicatingTasks = append(incomingReplicatingTasks, &streamingpb.ReplicatePChannelMeta{
649671
SourceChannelName: sourcePChannel,
650672
TargetChannelName: pchannel,
@@ -658,8 +680,9 @@ func (cm *ChannelManager) getNewIncomingTask(newConfig *replicateutil.ConfigHelp
658680
ClusterId: sourceClusterID,
659681
Pchannel: sourcePChannel,
660682
MessageId: appendResults[sourcePChannel].LastConfirmedMessageID.IntoProto(),
661-
TimeTick: appendResults[sourcePChannel].TimeTick,
683+
TimeTick: checkpointTimeTick,
662684
},
685+
SkipGetReplicateCheckpoint: skipGetReplicateCheckpoint,
663686
})
664687
}
665688
}

0 commit comments

Comments
 (0)