Skip to content

Commit 7c575a1

Browse files
authored
enhance: support AckSyncUp for broadcaster, and enable it in truncate api (milvus-io#46313)
issue: milvus-io#43897 also for issue: milvus-io#46166 add ack_sync_up flag into broadcast message header, which indicates that whether the broadcast operation is need to be synced up between the streaming node and the coordinator. If the ack_sync_up is false, the broadcast operation will be acked once the recovery storage see the message at current vchannel, the fast ack operation can be applied to speed up the broadcast operation. If the ack_sync_up is true, the broadcast operation will be acked after the checkpoint of current vchannel reach current message. The fast ack operation can not be applied to speed up the broadcast operation, because the ack operation need to be synced up with streaming node. e.g. if truncate collection operation want to call ack once callback after the all segment are flushed at current vchannel, it should set the ack_sync_up to be true. TODO: current implementation doesn't promise the ack sync up semantic, it only promise FastAck operation will not be applied, wait for 3.0 to implement the ack sync up semantic. only for truncate api now. --------- Signed-off-by: chyezh <chyezh@outlook.com>
1 parent 46c1478 commit 7c575a1

38 files changed

Lines changed: 1000 additions & 314 deletions

internal/datacoord/handler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,7 @@ func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID
565565
return nil, err
566566
}
567567

568+
// TODO: the cache should be removed in next step.
568569
return h.s.meta.GetCollection(collectionID), nil
569570
}
570571

internal/datacoord/server_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2582,7 +2582,6 @@ func TestServer_InitMessageCallback(t *testing.T) {
25822582
server.initMessageCallback()
25832583

25842584
// Test Import message check callback
2585-
resourceKey := message.NewImportJobIDResourceKey(1)
25862585
msg, err := message.NewImportMessageBuilderV1().
25872586
WithHeader(&message.ImportMessageHeader{}).
25882587
WithBody(&msgpb.ImportMsg{
@@ -2591,7 +2590,7 @@ func TestServer_InitMessageCallback(t *testing.T) {
25912590
},
25922591
Schema: &schemapb.CollectionSchema{},
25932592
}).
2594-
WithBroadcast([]string{"ch-0"}, resourceKey).
2593+
WithBroadcast([]string{"ch-0"}).
25952594
BuildBroadcast()
25962595
err = registry.CallMessageCheckCallback(ctx, msg)
25972596
assert.NoError(t, err)
@@ -2605,7 +2604,7 @@ func TestServer_InitMessageCallback(t *testing.T) {
26052604
},
26062605
Schema: &schemapb.CollectionSchema{},
26072606
}).
2608-
WithBroadcast([]string{"test_channel"}, resourceKey).
2607+
WithBroadcast([]string{"test_channel"}).
26092608
MustBuildBroadcast()
26102609
err = registry.CallMessageAckCallback(ctx, importMsg, map[string]*message.AppendResult{
26112610
"test_channel": {

internal/datacoord/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ func getCompactedSegmentSize(s *datapb.CompactionSegment) int64 {
197197
// getCollectionAutoCompactionEnabled returns whether auto compaction for collection is enabled.
198198
// if not set, returns global auto compaction config.
199199
func getCollectionAutoCompactionEnabled(properties map[string]string) (bool, error) {
200+
// when collection is on truncating, disable auto compaction.
201+
if _, ok := properties[common.CollectionOnTruncatingKey]; ok {
202+
return false, nil
203+
}
200204
v, ok := properties[common.CollectionAutoCompactionKey]
201205
if ok {
202206
enabled, err := strconv.ParseBool(v)

internal/distributed/streaming/streaming_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func TestStreamingBroadcast(t *testing.T) {
147147
CollectionID: 1,
148148
CollectionName: collectionName,
149149
}).
150-
WithBroadcast(vChannels, message.NewExclusiveCollectionNameResourceKey("db", collectionName)).
150+
WithBroadcast(vChannels).
151151
BuildBroadcast()
152152

153153
resp, err := streaming.WAL().Broadcast().Append(context.Background(), msg)

internal/metastore/kv/rootcoord/kv_catalog.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,7 @@ func (kc *Catalog) alterModifyCollection(ctx context.Context, oldColl *model.Col
698698
oldCollClone.UpdateTimestamp = newColl.UpdateTimestamp
699699
oldCollClone.EnableDynamicField = newColl.EnableDynamicField
700700
oldCollClone.SchemaVersion = newColl.SchemaVersion
701+
oldCollClone.ShardInfos = newColl.ShardInfos
701702

702703
newKey := BuildCollectionKey(newColl.DBID, oldColl.CollectionID)
703704
value, err := proto.Marshal(model.MarshalCollectionModel(oldCollClone))

internal/metastore/model/collection.go

Lines changed: 148 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,36 @@ import (
2828

2929
// TODO: These collection is dirty implementation and easy to be broken, we should drop it in the future.
3030
type Collection struct {
31-
TenantID string
32-
DBID int64
33-
CollectionID int64
34-
Partitions []*Partition
35-
Name string
36-
DBName string
37-
Description string
38-
AutoID bool
39-
Fields []*Field
40-
StructArrayFields []*StructArrayField
41-
Functions []*Function
42-
VirtualChannelNames []string
43-
PhysicalChannelNames []string
44-
ShardsNum int32
45-
StartPositions []*commonpb.KeyDataPair
46-
CreateTime uint64
47-
ConsistencyLevel commonpb.ConsistencyLevel
48-
Aliases []string // TODO: deprecate this.
49-
Properties []*commonpb.KeyValuePair
50-
State pb.CollectionState
51-
EnableDynamicField bool
52-
UpdateTimestamp uint64
53-
SchemaVersion int32
54-
LastTruncateTimestamps map[string]uint64
31+
TenantID string
32+
DBID int64
33+
CollectionID int64
34+
Partitions []*Partition
35+
Name string
36+
DBName string
37+
Description string
38+
AutoID bool
39+
Fields []*Field
40+
StructArrayFields []*StructArrayField
41+
Functions []*Function
42+
VirtualChannelNames []string
43+
PhysicalChannelNames []string
44+
ShardsNum int32
45+
StartPositions []*commonpb.KeyDataPair
46+
CreateTime uint64
47+
ConsistencyLevel commonpb.ConsistencyLevel
48+
Aliases []string // TODO: deprecate this.
49+
Properties []*commonpb.KeyValuePair
50+
State pb.CollectionState
51+
EnableDynamicField bool
52+
UpdateTimestamp uint64
53+
SchemaVersion int32
54+
ShardInfos map[string]*ShardInfo
55+
}
56+
57+
type ShardInfo struct {
58+
PChannelName string // the pchannel name of the shard, it is the same with the physical channel name.
59+
VChannelName string // the vchannel name of the shard, it is the same with the virtual channel name.
60+
LastTruncateTimeTick uint64 // the last truncate time tick of the shard, if the shard is not truncated, the value is 0.
5561
}
5662

5763
func (c *Collection) Available() bool {
@@ -60,59 +66,67 @@ func (c *Collection) Available() bool {
6066

6167
func (c *Collection) ShallowClone() *Collection {
6268
return &Collection{
63-
TenantID: c.TenantID,
64-
DBID: c.DBID,
65-
CollectionID: c.CollectionID,
66-
Name: c.Name,
67-
DBName: c.DBName,
68-
Description: c.Description,
69-
AutoID: c.AutoID,
70-
Fields: c.Fields,
71-
StructArrayFields: c.StructArrayFields,
72-
Partitions: c.Partitions,
73-
VirtualChannelNames: c.VirtualChannelNames,
74-
PhysicalChannelNames: c.PhysicalChannelNames,
75-
ShardsNum: c.ShardsNum,
76-
ConsistencyLevel: c.ConsistencyLevel,
77-
CreateTime: c.CreateTime,
78-
StartPositions: c.StartPositions,
79-
Aliases: c.Aliases,
80-
Properties: c.Properties,
81-
State: c.State,
82-
EnableDynamicField: c.EnableDynamicField,
83-
Functions: c.Functions,
84-
UpdateTimestamp: c.UpdateTimestamp,
85-
SchemaVersion: c.SchemaVersion,
86-
LastTruncateTimestamps: c.LastTruncateTimestamps,
69+
TenantID: c.TenantID,
70+
DBID: c.DBID,
71+
CollectionID: c.CollectionID,
72+
Name: c.Name,
73+
DBName: c.DBName,
74+
Description: c.Description,
75+
AutoID: c.AutoID,
76+
Fields: c.Fields,
77+
StructArrayFields: c.StructArrayFields,
78+
Partitions: c.Partitions,
79+
VirtualChannelNames: c.VirtualChannelNames,
80+
PhysicalChannelNames: c.PhysicalChannelNames,
81+
ShardsNum: c.ShardsNum,
82+
ConsistencyLevel: c.ConsistencyLevel,
83+
CreateTime: c.CreateTime,
84+
StartPositions: c.StartPositions,
85+
Aliases: c.Aliases,
86+
Properties: c.Properties,
87+
State: c.State,
88+
EnableDynamicField: c.EnableDynamicField,
89+
Functions: c.Functions,
90+
UpdateTimestamp: c.UpdateTimestamp,
91+
SchemaVersion: c.SchemaVersion,
92+
ShardInfos: c.ShardInfos,
8793
}
8894
}
8995

9096
func (c *Collection) Clone() *Collection {
97+
shardInfos := make(map[string]*ShardInfo, len(c.ShardInfos))
98+
for channelName, shardInfo := range c.ShardInfos {
99+
shardInfos[channelName] = &ShardInfo{
100+
VChannelName: channelName,
101+
PChannelName: shardInfo.PChannelName,
102+
LastTruncateTimeTick: shardInfo.LastTruncateTimeTick,
103+
}
104+
}
91105
return &Collection{
92-
TenantID: c.TenantID,
93-
DBID: c.DBID,
94-
CollectionID: c.CollectionID,
95-
Name: c.Name,
96-
DBName: c.DBName,
97-
Description: c.Description,
98-
AutoID: c.AutoID,
99-
Fields: CloneFields(c.Fields),
100-
StructArrayFields: CloneStructArrayFields(c.StructArrayFields),
101-
Partitions: ClonePartitions(c.Partitions),
102-
VirtualChannelNames: common.CloneStringList(c.VirtualChannelNames),
103-
PhysicalChannelNames: common.CloneStringList(c.PhysicalChannelNames),
104-
ShardsNum: c.ShardsNum,
105-
ConsistencyLevel: c.ConsistencyLevel,
106-
CreateTime: c.CreateTime,
107-
StartPositions: common.CloneKeyDataPairs(c.StartPositions),
108-
Aliases: common.CloneStringList(c.Aliases),
109-
Properties: common.CloneKeyValuePairs(c.Properties),
110-
State: c.State,
111-
EnableDynamicField: c.EnableDynamicField,
112-
Functions: CloneFunctions(c.Functions),
113-
UpdateTimestamp: c.UpdateTimestamp,
114-
SchemaVersion: c.SchemaVersion,
115-
LastTruncateTimestamps: common.CloneMap(c.LastTruncateTimestamps),
106+
TenantID: c.TenantID,
107+
DBID: c.DBID,
108+
CollectionID: c.CollectionID,
109+
Name: c.Name,
110+
DBName: c.DBName,
111+
Description: c.Description,
112+
AutoID: c.AutoID,
113+
Fields: CloneFields(c.Fields),
114+
StructArrayFields: CloneStructArrayFields(c.StructArrayFields),
115+
Partitions: ClonePartitions(c.Partitions),
116+
VirtualChannelNames: common.CloneStringList(c.VirtualChannelNames),
117+
PhysicalChannelNames: common.CloneStringList(c.PhysicalChannelNames),
118+
ShardsNum: c.ShardsNum,
119+
ConsistencyLevel: c.ConsistencyLevel,
120+
CreateTime: c.CreateTime,
121+
StartPositions: common.CloneKeyDataPairs(c.StartPositions),
122+
Aliases: common.CloneStringList(c.Aliases),
123+
Properties: common.CloneKeyValuePairs(c.Properties),
124+
State: c.State,
125+
EnableDynamicField: c.EnableDynamicField,
126+
Functions: CloneFunctions(c.Functions),
127+
UpdateTimestamp: c.UpdateTimestamp,
128+
SchemaVersion: c.SchemaVersion,
129+
ShardInfos: shardInfos,
116130
}
117131
}
118132

@@ -179,29 +193,45 @@ func UnmarshalCollectionModel(coll *pb.CollectionInfo) *Collection {
179193
PartitionCreatedTimestamp: coll.PartitionCreatedTimestamps[idx],
180194
}
181195
}
196+
shardInfos := make(map[string]*ShardInfo, len(coll.VirtualChannelNames))
197+
for idx, channelName := range coll.VirtualChannelNames {
198+
if len(coll.ShardInfos) == 0 {
199+
shardInfos[channelName] = &ShardInfo{
200+
VChannelName: channelName,
201+
PChannelName: coll.PhysicalChannelNames[idx],
202+
LastTruncateTimeTick: 0,
203+
}
204+
} else {
205+
shardInfos[channelName] = &ShardInfo{
206+
VChannelName: channelName,
207+
PChannelName: coll.PhysicalChannelNames[idx],
208+
LastTruncateTimeTick: coll.ShardInfos[idx].LastTruncateTimeTick,
209+
}
210+
}
211+
}
182212

183213
return &Collection{
184-
CollectionID: coll.ID,
185-
DBID: coll.DbId,
186-
Name: coll.Schema.Name,
187-
DBName: coll.Schema.DbName,
188-
Description: coll.Schema.Description,
189-
AutoID: coll.Schema.AutoID,
190-
Fields: UnmarshalFieldModels(coll.GetSchema().GetFields()),
191-
StructArrayFields: UnmarshalStructArrayFieldModels(coll.GetSchema().GetStructArrayFields()),
192-
Partitions: partitions,
193-
VirtualChannelNames: coll.VirtualChannelNames,
194-
PhysicalChannelNames: coll.PhysicalChannelNames,
195-
ShardsNum: coll.ShardsNum,
196-
ConsistencyLevel: coll.ConsistencyLevel,
197-
CreateTime: coll.CreateTime,
198-
StartPositions: coll.StartPositions,
199-
State: coll.State,
200-
Properties: coll.Properties,
201-
EnableDynamicField: coll.Schema.EnableDynamicField,
202-
UpdateTimestamp: coll.UpdateTimestamp,
203-
SchemaVersion: coll.Schema.Version,
204-
LastTruncateTimestamps: coll.LastTruncateTimestamps,
214+
CollectionID: coll.ID,
215+
DBID: coll.DbId,
216+
Name: coll.Schema.Name,
217+
DBName: coll.Schema.DbName,
218+
Description: coll.Schema.Description,
219+
AutoID: coll.Schema.AutoID,
220+
Fields: UnmarshalFieldModels(coll.GetSchema().GetFields()),
221+
StructArrayFields: UnmarshalStructArrayFieldModels(coll.GetSchema().GetStructArrayFields()),
222+
Partitions: partitions,
223+
VirtualChannelNames: coll.VirtualChannelNames,
224+
PhysicalChannelNames: coll.PhysicalChannelNames,
225+
ShardsNum: coll.ShardsNum,
226+
ConsistencyLevel: coll.ConsistencyLevel,
227+
CreateTime: coll.CreateTime,
228+
StartPositions: coll.StartPositions,
229+
State: coll.State,
230+
Properties: coll.Properties,
231+
EnableDynamicField: coll.Schema.EnableDynamicField,
232+
UpdateTimestamp: coll.UpdateTimestamp,
233+
SchemaVersion: coll.Schema.Version,
234+
ShardInfos: shardInfos,
205235
}
206236
}
207237

@@ -265,20 +295,32 @@ func marshalCollectionModelWithConfig(coll *Collection, c *config) *pb.Collectio
265295
collSchema.StructArrayFields = structArrayFields
266296
}
267297

298+
shardInfos := make([]*pb.CollectionShardInfo, len(coll.ShardInfos))
299+
for idx, channelName := range coll.VirtualChannelNames {
300+
if shard, ok := coll.ShardInfos[channelName]; ok {
301+
shardInfos[idx] = &pb.CollectionShardInfo{
302+
LastTruncateTimeTick: shard.LastTruncateTimeTick,
303+
}
304+
} else {
305+
shardInfos[idx] = &pb.CollectionShardInfo{
306+
LastTruncateTimeTick: 0,
307+
}
308+
}
309+
}
268310
collectionPb := &pb.CollectionInfo{
269-
ID: coll.CollectionID,
270-
DbId: coll.DBID,
271-
Schema: collSchema,
272-
CreateTime: coll.CreateTime,
273-
VirtualChannelNames: coll.VirtualChannelNames,
274-
PhysicalChannelNames: coll.PhysicalChannelNames,
275-
ShardsNum: coll.ShardsNum,
276-
ConsistencyLevel: coll.ConsistencyLevel,
277-
StartPositions: coll.StartPositions,
278-
State: coll.State,
279-
Properties: coll.Properties,
280-
UpdateTimestamp: coll.UpdateTimestamp,
281-
LastTruncateTimestamps: coll.LastTruncateTimestamps,
311+
ID: coll.CollectionID,
312+
DbId: coll.DBID,
313+
Schema: collSchema,
314+
CreateTime: coll.CreateTime,
315+
VirtualChannelNames: coll.VirtualChannelNames,
316+
PhysicalChannelNames: coll.PhysicalChannelNames,
317+
ShardsNum: coll.ShardsNum,
318+
ConsistencyLevel: coll.ConsistencyLevel,
319+
StartPositions: coll.StartPositions,
320+
State: coll.State,
321+
Properties: coll.Properties,
322+
UpdateTimestamp: coll.UpdateTimestamp,
323+
ShardInfos: shardInfos,
282324
}
283325

284326
if c.withPartitions {

internal/metastore/model/collection_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ var (
7777
Value: "v",
7878
},
7979
},
80+
ShardInfos: map[string]*ShardInfo{
81+
"vch": {
82+
PChannelName: "pch",
83+
VChannelName: "vch",
84+
LastTruncateTimeTick: 0,
85+
},
86+
},
8087
}
8188

8289
deprecatedColPb = &pb.CollectionInfo{
@@ -588,6 +595,18 @@ func TestClone(t *testing.T) {
588595
Properties: []*commonpb.KeyValuePair{{Key: "32", Value: "33"}},
589596
State: pb.CollectionState_CollectionCreated,
590597
EnableDynamicField: true,
598+
ShardInfos: map[string]*ShardInfo{
599+
"c1": {
600+
PChannelName: "c3",
601+
VChannelName: "c1",
602+
LastTruncateTimeTick: 0,
603+
},
604+
"c2": {
605+
PChannelName: "c4",
606+
VChannelName: "c2",
607+
LastTruncateTimeTick: 0,
608+
},
609+
},
591610
}
592611

593612
clone1 := collection.Clone()

0 commit comments

Comments
 (0)