Commit b18ebd9

enhance: Remove legacy cdc/replication (milvus-io#46603)
issue: milvus-io#44123

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
- Core invariant: legacy in-cluster CDC/replication plumbing (ReplicateMsg types, ReplicateID-based guards and flags) is obsolete. The system relies on standard msgstream positions, subPos/end-ts semantics and timetick ordering as the single source of truth for message ordering and skipping, so replication-specific channels/types/guards can be removed safely.
- Removed/simplified logic (what and why): removed replication feature flags and params (ReplicateMsgChannel, TTMsgEnabled, CollectionReplicateEnable), ReplicateMsg type and its tests, ReplicateID constants/helpers and MergeProperties hooks, ReplicateConfig and its propagation (streamPipeline, StreamConfig, dispatcher, target), replicate-aware dispatcher/pipeline branches, and replicate-mode pre-checks/timestamp-allocation in proxy tasks. These implemented a redundant alternate “replicate-mode” pathway that duplicated position/end-ts and timetick logic.
- Why this does NOT cause data loss or regression (concrete code paths): no persistence or core write paths were removed. Proxy PreExecute flows (internal/proxy/task_*.go) still perform the same schema/ID/size validations and then follow the normal non-replicate execution path; dispatcher and pipeline continue to use position/subPos and pullback/end-ts in Seek/grouping (pkg/mq/msgdispatcher/dispatcher.go, internal/util/pipeline/stream_pipeline.go), so skipping and ordering behavior remains unchanged; timetick emission in rootcoord (sendMinDdlTsAsTt) is now ungated (no silent suppression), preserving or increasing timetick delivery rather than removing it.
- PR type and net effect: Enhancement/Refactor. Removes deprecated replication API surface (types, helpers, config, tests) and replication branches, simplifies public APIs and constructor signatures, and reduces surface area for future maintenance while keeping DML/DDL persistence, ordering, and seek semantics intact.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
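
The "core invariant" above can be pictured with a toy model. The sketch below is not Milvus code: the `msg` type and `consumeFrom` function are hypothetical names, and it assumes a stream already ordered by timestamp. It only illustrates how a checkpoint timestamp (end-ts) plus timetick messages could be enough to decide what to skip and how to group, without any replication-specific flag.

```go
package main

import "fmt"

// msg is a hypothetical stand-in for a stream message; it is not a Milvus type.
type msg struct {
	ts       uint64 // logical timestamp carried by the message
	timeTick bool   // true for a timetick (watermark) message
	payload  string
}

// consumeFrom returns the payloads of data messages strictly newer than endTs,
// grouped by the timetick that follows them. It assumes the stream is ordered by ts.
func consumeFrom(stream []msg, endTs uint64) [][]string {
	var groups [][]string
	var pending []string
	for _, m := range stream {
		if m.ts <= endTs {
			continue // already covered by the checkpoint, so skip it
		}
		if m.timeTick {
			// A timetick closes the current group; empty groups are dropped for brevity.
			if len(pending) > 0 {
				groups = append(groups, pending)
				pending = nil
			}
			continue
		}
		pending = append(pending, m.payload)
	}
	return groups
}

func main() {
	stream := []msg{
		{ts: 1, payload: "a"},
		{ts: 2, timeTick: true},
		{ts: 3, payload: "b"},
		{ts: 4, payload: "c"},
		{ts: 5, timeTick: true},
		{ts: 6, payload: "d"}, // no timetick after it yet, so it is not emitted
	}
	fmt.Println(consumeFrom(stream, 2)) // prints [[b c]]
}
```

In this model the only inputs to the skip decision are the message timestamp and the checkpoint, which is the property the commit message relies on when it calls the replicate-mode pathway redundant.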
1 parent b7761d6 commit b18ebd9

58 files changed

Lines changed: 214 additions & 1656 deletions

configs/milvus.yaml

Lines changed: 0 additions & 6 deletions
@@ -841,7 +841,6 @@ msgChannel:
 # Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data.
 # It is recommended to change this parameter before starting Milvus for the first time.
 rootCoordDml: rootcoord-dml
-replicateMsg: replicate-msg
 # Sub-name prefix of the message channel where the query node publishes time tick messages.
 # The complete channel name prefix is ${msgChannel.chanNamePrefix.cluster}-${msgChannel.chanNamePrefix.queryTimeTick}
 # Caution: Changing this parameter after using Milvus for a period of time will affect your access to old data.
@@ -1024,16 +1023,11 @@ common:
 enabled: false # enable split by average size policy in storage v2
 threshold: 1024 # split by average size policy threshold(in bytes) in storage v2
 useLoonFFI: false
-# Whether to disable the internal time messaging mechanism for the system.
-# If disabled (set to false), the system will not allow DML operations, including insertion, deletion, queries, and searches.
-# This helps Milvus-CDC synchronize incremental data
-ttMsgEnabled: true
 traceLogMode: 0 # trace request info
 bloomFilterSize: 100000 # bloom filter initial size
 bloomFilterType: BlockedBloomFilter # bloom filter type, support BasicBloomFilter and BlockedBloomFilter
 maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
 bloomFilterApplyBatchSize: 1000 # batch size when to apply pk to bloom filter
-collectionReplicateEnable: false # Whether to enable collection replication.
 usePartitionKeyAsClusteringKey: false # if true, do clustering compaction and segment prune on partition key field
 useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
 enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
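
Operators who carry a customized milvus.yaml may want to find leftover settings for the three keys removed above. The following is a minimal sketch, assuming a plain line-based scan is good enough; it does not parse YAML, and `findRemovedKeys` is a hypothetical helper, not part of Milvus. The commit does not say how Milvus treats such leftover keys at startup, so the sketch only reports them.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// removedKeys lists the YAML keys deleted from configs/milvus.yaml in this commit.
var removedKeys = []string{"replicateMsg", "ttMsgEnabled", "collectionReplicateEnable"}

// findRemovedKeys returns "line N: key" for every non-comment line that still sets a removed key.
// It matches on the trimmed line prefix only, so it ignores nesting.
func findRemovedKeys(yamlText string) []string {
	var hits []string
	sc := bufio.NewScanner(strings.NewReader(yamlText))
	for n := 1; sc.Scan(); n++ {
		line := strings.TrimSpace(sc.Text())
		if strings.HasPrefix(line, "#") {
			continue // commented-out settings are harmless
		}
		for _, k := range removedKeys {
			if strings.HasPrefix(line, k+":") {
				hits = append(hits, fmt.Sprintf("line %d: %s", n, k))
			}
		}
	}
	return hits
}

func main() {
	sample := "common:\n  ttMsgEnabled: true\n  traceLogMode: 0\n  # collectionReplicateEnable: false\n"
	for _, h := range findRemovedKeys(sample) {
		fmt.Println(h) // prints "line 2: ttMsgEnabled"
	}
}
```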

internal/distributed/streaming/msgstream_adaptor.go

Lines changed: 0 additions & 4 deletions
@@ -129,7 +129,3 @@ func (m *delegatorMsgstreamAdaptor) GetLatestMsgID(channel string) (msgstream.Me
 func (m *delegatorMsgstreamAdaptor) CheckTopicValid(channel string) error {
 	panic("should never be called")
 }
-
-func (m *delegatorMsgstreamAdaptor) ForceEnableProduce(can bool) {
-	panic("should never be called")
-}

internal/distributed/streaming/msgstream_adaptor_test.go

Lines changed: 0 additions & 10 deletions
@@ -161,14 +161,4 @@ func TestDelegatorMsgstreamAdaptor(t *testing.T) {
 		}()
 		_ = adaptor.CheckTopicValid("channel1")
 	})
-
-	// Test ForceEnableProduce
-	t.Run("ForceEnableProduce", func(t *testing.T) {
-		defer func() {
-			if r := recover(); r == nil {
-				t.Errorf("ForceEnableProduce should panic but did not")
-			}
-		}()
-		adaptor.ForceEnableProduce(true)
-	})
 }

internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go

Lines changed: 6 additions & 16 deletions
@@ -27,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/util/flowgraph"
-	pkgcommon "github.com/milvus-io/milvus/pkg/v2/common"
 	"github.com/milvus-io/milvus/pkg/v2/log"
 	"github.com/milvus-io/milvus/pkg/v2/metrics"
 	"github.com/milvus-io/milvus/pkg/v2/mq/common"
@@ -76,19 +75,11 @@ func createNewInputFromDispatcher(initCtx context.Context,
 		start = time.Now()
 	)
 
-	replicateID, _ := pkgcommon.GetReplicateID(schema.GetProperties())
-	if replicateID == "" {
-		log.Info("datanode consume without replicateID, try to get replicateID from dbProperties", zap.Any("dbProperties", dbProperties))
-		replicateID, _ = pkgcommon.GetReplicateID(dbProperties)
-	}
-	replicateConfig := msgstream.GetReplicateConfig(replicateID, schema.GetDbName(), schema.GetName())
-
 	if seekPos != nil && len(seekPos.MsgID) != 0 {
 		input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
-			VChannel:        vchannel,
-			Pos:             seekPos,
-			SubPos:          common.SubscriptionPositionUnknown,
-			ReplicateConfig: replicateConfig,
+			VChannel: vchannel,
+			Pos:      seekPos,
+			SubPos:   common.SubscriptionPositionUnknown,
 		})
 		if err != nil {
 			log.Warn("datanode consume failed after retried", zap.Error(err))
@@ -105,10 +96,9 @@ func createNewInputFromDispatcher(initCtx context.Context,
 	}
 
 	input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
-		VChannel:        vchannel,
-		Pos:             nil,
-		SubPos:          common.SubscriptionPositionEarliest,
-		ReplicateConfig: replicateConfig,
+		VChannel: vchannel,
+		Pos:      nil,
+		SubPos:   common.SubscriptionPositionEarliest,
 	})
 	if err != nil {
 		log.Warn("datanode consume failed after retried", zap.Error(err))
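
After this change, the only inputs to the registration are the vchannel and the seek position. The branch in the hunks above can be sketched in isolation as follows. All types here (`streamConfig`, `msgPosition`, `subscriptionPosition`) are hypothetical stand-ins for the msgdispatcher and msgstream types, and the constant values are illustrative, not the real ones.

```go
package main

import "fmt"

// Hypothetical stand-ins for the types used in the diff; not the Milvus definitions.
type subscriptionPosition int

const (
	positionUnknown subscriptionPosition = iota
	positionEarliest
)

type msgPosition struct {
	MsgID []byte
}

type streamConfig struct {
	VChannel string
	Pos      *msgPosition
	SubPos   subscriptionPosition
}

// buildStreamConfig mirrors the branch shown in the diff: seek when a usable
// position exists, otherwise subscribe from the earliest message.
func buildStreamConfig(vchannel string, seekPos *msgPosition) streamConfig {
	if seekPos != nil && len(seekPos.MsgID) != 0 {
		return streamConfig{VChannel: vchannel, Pos: seekPos, SubPos: positionUnknown}
	}
	return streamConfig{VChannel: vchannel, Pos: nil, SubPos: positionEarliest}
}

func main() {
	withSeek := buildStreamConfig("ch-0", &msgPosition{MsgID: []byte{1}})
	fresh := buildStreamConfig("ch-0", nil)
	fmt.Println(withSeek.SubPos == positionUnknown, fresh.SubPos == positionEarliest) // prints "true true"
}
```

Note that a non-nil position with an empty `MsgID` falls through to the earliest-position branch, matching the `len(seekPos.MsgID) != 0` guard in the original code.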

internal/flushcommon/pipeline/flow_graph_dmstream_input_node_test.go

Lines changed: 0 additions & 6 deletions
@@ -62,9 +62,6 @@ func (mm *mockMsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([
 
 type mockTtMsgStream struct{}
 
-func (mtm *mockTtMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
-}
-
 func (mtm *mockTtMsgStream) Close() {}
 
 func (mtm *mockTtMsgStream) Chan() <-chan *msgstream.ConsumeMsgPack {
@@ -107,9 +104,6 @@ func (mtm *mockTtMsgStream) CheckTopicValid(channel string) error {
 	return nil
 }
 
-func (mtm *mockTtMsgStream) ForceEnableProduce(can bool) {
-}
-
 func TestNewDmInputNode(t *testing.T) {
 	assert.Panics(t, func() {
 		newDmInputNode(&nodeConfig{

internal/proxy/meta_cache.go

Lines changed: 0 additions & 3 deletions
@@ -99,7 +99,6 @@ type collectionInfo struct {
 	createdUtcTimestamp   uint64
 	consistencyLevel      commonpb.ConsistencyLevel
 	partitionKeyIsolation bool
-	replicateID           string
 	updateTimestamp       uint64
 	collectionTTL         uint64
 	numPartitions         int64
@@ -489,7 +488,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
 		m.collInfo[database] = make(map[string]*collectionInfo)
 	}
 
-	replicateID, _ := common.GetReplicateID(collection.Properties)
 	m.collInfo[database][collectionName] = &collectionInfo{
 		collID:                collection.CollectionID,
 		schema:                schemaInfo,
@@ -498,7 +496,6 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
 		createdUtcTimestamp:   collection.CreatedUtcTimestamp,
 		consistencyLevel:      collection.ConsistencyLevel,
 		partitionKeyIsolation: isolation,
-		replicateID:           replicateID,
 		updateTimestamp:       collection.UpdateTimestamp,
 		collectionTTL:         getCollectionTTL(schemaInfo.CollectionSchema.GetProperties()),
 		vChannels:             collection.VirtualChannelNames,

internal/proxy/mock_msgstream_test.go

Lines changed: 3 additions & 13 deletions
@@ -10,10 +10,9 @@ import (
 
 type mockMsgStream struct {
 	msgstream.MsgStream
-	asProducer         func([]string)
-	setRepack          func(repackFunc msgstream.RepackFunc)
-	close              func()
-	forceEnableProduce func(bool)
+	asProducer func([]string)
+	setRepack  func(repackFunc msgstream.RepackFunc)
+	close      func()
 }
 
 func (m *mockMsgStream) AsProducer(ctx context.Context, producers []string) {
@@ -34,15 +33,6 @@ func (m *mockMsgStream) Close() {
 	}
 }
 
-func (m *mockMsgStream) ForceEnableProduce(enabled bool) {
-	if m.forceEnableProduce != nil {
-		m.forceEnableProduce(enabled)
-	}
-}
-
-func (m *mockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
-}
-
 func newMockMsgStream() *mockMsgStream {
 	return &mockMsgStream{}
 }
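
The mock above embeds `msgstream.MsgStream` and overrides selected methods through optional function hooks. A self-contained sketch of that pattern follows; the `stream` interface and `mockStream` type are hypothetical, trimmed-down stand-ins, not the Milvus definitions. With an embedded interface, methods dropped from the interface (as `ForceEnableProduce` and `SetReplicate` appear to be here) would still compile on the mock but become dead code, which is presumably why the commit deletes them.

```go
package main

import "fmt"

// stream is a hypothetical, trimmed-down stand-in for msgstream.MsgStream.
type stream interface {
	AsProducer(channels []string)
	Close()
}

// mockStream embeds the interface so it satisfies it without implementing every method.
// Calling a method that is not overridden would panic, because the embedded value is nil.
type mockStream struct {
	stream
	asProducer func([]string)
	close      func()
}

// AsProducer forwards to the hook when one is set and is a no-op otherwise.
func (m *mockStream) AsProducer(channels []string) {
	if m.asProducer != nil {
		m.asProducer(channels)
	}
}

// Close forwards to the hook when one is set and is a no-op otherwise.
func (m *mockStream) Close() {
	if m.close != nil {
		m.close()
	}
}

func main() {
	var got []string
	var s stream = &mockStream{asProducer: func(c []string) { got = c }}
	s.AsProducer([]string{"ch-0"})
	s.Close() // no hook set, so nothing happens
	fmt.Println(got) // prints [ch-0]
}
```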

internal/proxy/mock_test.go

Lines changed: 0 additions & 6 deletions
@@ -314,12 +314,6 @@ func (ms *simpleMockMsgStream) CheckTopicValid(topic string) error {
 	return nil
 }
 
-func (ms *simpleMockMsgStream) ForceEnableProduce(enabled bool) {
-}
-
-func (ms *simpleMockMsgStream) SetReplicate(config *msgstream.ReplicateConfig) {
-}
-
 func newSimpleMockMsgStream() *simpleMockMsgStream {
 	return &simpleMockMsgStream{
 		msgChan: make(chan *msgstream.ConsumeMsgPack, 1024),

internal/proxy/repack_func.go

Lines changed: 0 additions & 11 deletions
@@ -81,14 +81,3 @@ func defaultInsertRepackFunc(
 	}
 	return pack, nil
 }
-
-func replicatePackFunc(
-	tsMsgs []msgstream.TsMsg,
-	hashKeys [][]int32,
-) (map[int32]*msgstream.MsgPack, error) {
-	return map[int32]*msgstream.MsgPack{
-		0: {
-			Msgs: tsMsgs,
-		},
-	}, nil
-}

internal/proxy/task.go

Lines changed: 0 additions & 21 deletions
@@ -37,7 +37,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/v2/mq/msgstream"
 	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
 	"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
-	"github.com/milvus-io/milvus/pkg/v2/proto/rootcoordpb"
 	"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/v2/util/merr"
@@ -1366,26 +1365,6 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
 				"can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first")
 		}
 	}
-
-	_, ok := common.IsReplicateEnabled(t.Properties)
-	if ok {
-		return merr.WrapErrParameterInvalidMsg("can't set the replicate.id property")
-	}
-	endTS, ok := common.GetReplicateEndTS(t.Properties)
-	if ok && collBasicInfo.replicateID != "" {
-		allocResp, err := t.mixCoord.AllocTimestamp(ctx, &rootcoordpb.AllocTimestampRequest{
-			Count:          1,
-			BlockTimestamp: endTS,
-		})
-		if err = merr.CheckRPCCall(allocResp, err); err != nil {
-			return merr.WrapErrServiceInternal("alloc timestamp failed", err.Error())
-		}
-		if allocResp.GetTimestamp() <= endTS {
-			return merr.WrapErrServiceInternal("alter collection: alloc timestamp failed, timestamp is not greater than endTS",
-				fmt.Sprintf("timestamp = %d, endTS = %d", allocResp.GetTimestamp(), endTS))
-		}
-	}
-
 	return nil
 }
 
