Skip to content

Commit 83da08c

Browse files
authored
enhance: Use map instead of slice to maintain channel info (milvus-io#32273)
See also milvus-io#32165. `ChannelManager.Match` is a frequent operation for datacoord. When the number of collections is large, iterating over all channels consumes a lot of CPU time. This PR changes the data structure that stores datanode-channel info from a slice to a map, which avoids this iteration when checking whether a channel exists. --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
1 parent 520a302 commit 83da08c

6 files changed

Lines changed: 250 additions & 217 deletions

File tree

internal/datacoord/channel_manager.go

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -508,13 +508,12 @@ func (c *ChannelManagerImpl) GetBufferChannels() *NodeChannelInfo {
508508
func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string {
509509
nodeChs := make(map[UniqueID][]string)
510510
for _, nodeChannels := range c.GetAssignedChannels() {
511-
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
512-
return channel.GetCollectionID() == collectionID
513-
})
514-
channelNames := lo.Map(filtered, func(channel RWChannel, _ int) string {
515-
return channel.GetName()
516-
})
517-
511+
var channelNames []string
512+
for name, ch := range nodeChannels.Channels {
513+
if ch.GetCollectionID() == collectionID {
514+
channelNames = append(channelNames, name)
515+
}
516+
}
518517
nodeChs[nodeChannels.NodeID] = channelNames
519518
}
520519
return nodeChs
@@ -524,11 +523,11 @@ func (c *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID UniqueID
524523
func (c *ChannelManagerImpl) GetChannelsByCollectionID(collectionID UniqueID) []RWChannel {
525524
channels := make([]RWChannel, 0)
526525
for _, nodeChannels := range c.GetAssignedChannels() {
527-
filtered := lo.Filter(nodeChannels.Channels, func(channel RWChannel, _ int) bool {
528-
return channel.GetCollectionID() == collectionID
529-
})
530-
531-
channels = append(channels, filtered...)
526+
for _, ch := range nodeChannels.Channels {
527+
if ch.GetCollectionID() == collectionID {
528+
channels = append(channels, ch)
529+
}
530+
}
532531
}
533532
return channels
534533
}
@@ -807,7 +806,7 @@ func (c *ChannelManagerImpl) Reassign(originNodeID UniqueID, channelName string)
807806
}
808807
c.mu.RUnlock()
809808

810-
reallocates := &NodeChannelInfo{originNodeID, []RWChannel{ch}}
809+
reallocates := NewNodeChannelInfo(originNodeID, ch)
811810
isDropped := c.isMarkedDrop(channelName)
812811

813812
c.mu.Lock()
@@ -862,7 +861,7 @@ func (c *ChannelManagerImpl) CleanupAndReassign(nodeID UniqueID, channelName str
862861
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
863862
}
864863

865-
reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}}
864+
reallocates := NewNodeChannelInfo(nodeID, chToCleanUp)
866865
isDropped := c.isMarkedDrop(channelName)
867866

868867
c.mu.Lock()

internal/datacoord/channel_manager_test.go

Lines changed: 89 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,31 @@ func waitAndStore(t *testing.T, watchkv kv.MetaKv, key string, waitState, storeS
5656
}
5757
}
5858

59+
func waitPrefixAndStore(t *testing.T, watchkv kv.MetaKv, prefix string, waitState, storeState datapb.ChannelWatchState) string {
60+
channelName := ""
61+
for {
62+
keys, values, err := watchkv.LoadWithPrefix(prefix)
63+
if err == nil && len(values) > 0 {
64+
for idx, value := range values {
65+
watchInfo, err := parseWatchInfo(keys[idx], []byte(value))
66+
require.NoError(t, err)
67+
require.Equal(t, waitState, watchInfo.GetState())
68+
69+
channelName = watchInfo.GetVchan().GetChannelName()
70+
71+
watchInfo.State = storeState
72+
data, err := proto.Marshal(watchInfo)
73+
require.NoError(t, err)
74+
75+
watchkv.Save(path.Join(prefix, watchInfo.GetVchan().GetChannelName()), string(data))
76+
}
77+
break
78+
}
79+
time.Sleep(100 * time.Millisecond)
80+
}
81+
return channelName
82+
}
83+
5984
// waitAndCheckState checks if the DataCoord writes expected state into Etcd
6085
func waitAndCheckState(t *testing.T, kv kv.MetaKv, expectedState datapb.ChannelWatchState, nodeID UniqueID, channelName string, collectionID UniqueID) {
6186
for {
@@ -217,10 +242,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
217242
chManager.store = &ChannelStore{
218243
store: watchkv,
219244
channelsInfo: map[int64]*NodeChannelInfo{
220-
nodeID: {nodeID, []RWChannel{
221-
&channelMeta{Name: cName, CollectionID: collectionID},
222-
}},
223-
oldNode: {oldNode, []RWChannel{}},
245+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
246+
oldNode: NewNodeChannelInfo(oldNode),
224247
},
225248
}
226249

@@ -260,9 +283,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
260283
chManager.store = &ChannelStore{
261284
store: watchkv,
262285
channelsInfo: map[int64]*NodeChannelInfo{
263-
nodeID: {nodeID, []RWChannel{
264-
&channelMeta{Name: cName, CollectionID: collectionID},
265-
}},
286+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
266287
},
267288
}
268289

@@ -306,10 +327,8 @@ func TestChannelManager_StateTransfer(t *testing.T) {
306327
chManager.store = &ChannelStore{
307328
store: watchkv,
308329
channelsInfo: map[int64]*NodeChannelInfo{
309-
nodeID: {nodeID, []RWChannel{
310-
&channelMeta{Name: cName, CollectionID: collectionID},
311-
}},
312-
oldNode: {oldNode, []RWChannel{}},
330+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
331+
oldNode: NewNodeChannelInfo(oldNode),
313332
},
314333
}
315334

@@ -352,9 +371,7 @@ func TestChannelManager_StateTransfer(t *testing.T) {
352371
chManager.store = &ChannelStore{
353372
store: watchkv,
354373
channelsInfo: map[int64]*NodeChannelInfo{
355-
nodeID: {nodeID, []RWChannel{
356-
&channelMeta{Name: cName, CollectionID: collectionID},
357-
}},
374+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: cName, CollectionID: collectionID}),
358375
},
359376
}
360377

@@ -400,10 +417,7 @@ func TestChannelManager(t *testing.T) {
400417
chManager.store = &ChannelStore{
401418
store: watchkv,
402419
channelsInfo: map[int64]*NodeChannelInfo{
403-
nodeID: {nodeID, []RWChannel{
404-
&channelMeta{Name: channel1, CollectionID: collectionID},
405-
&channelMeta{Name: channel2, CollectionID: collectionID},
406-
}},
420+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}),
407421
},
408422
}
409423

@@ -438,10 +452,7 @@ func TestChannelManager(t *testing.T) {
438452
chManager.store = &ChannelStore{
439453
store: watchkv,
440454
channelsInfo: map[int64]*NodeChannelInfo{
441-
bufferID: {bufferID, []RWChannel{
442-
&channelMeta{Name: channel1, CollectionID: collectionID},
443-
&channelMeta{Name: channel2, CollectionID: collectionID},
444-
}},
455+
bufferID: NewNodeChannelInfo(bufferID, &channelMeta{Name: channel1, CollectionID: collectionID}, &channelMeta{Name: channel2, CollectionID: collectionID}),
445456
},
446457
}
447458

@@ -502,7 +513,7 @@ func TestChannelManager(t *testing.T) {
502513
chManager.store = &ChannelStore{
503514
store: watchkv,
504515
channelsInfo: map[int64]*NodeChannelInfo{
505-
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
516+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
506517
},
507518
}
508519

@@ -682,11 +693,9 @@ func TestChannelManager(t *testing.T) {
682693
chManager.store = &ChannelStore{
683694
store: watchkv,
684695
channelsInfo: map[int64]*NodeChannelInfo{
685-
1: {1, []RWChannel{
686-
&channelMeta{Name: "channel-1", CollectionID: collectionID},
687-
&channelMeta{Name: "channel-2", CollectionID: collectionID},
688-
}},
689-
bufferID: {bufferID, []RWChannel{}},
696+
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID},
697+
&channelMeta{Name: "channel-2", CollectionID: collectionID}),
698+
bufferID: NewNodeChannelInfo(bufferID),
690699
},
691700
}
692701
chManager.stateTimer.startOne(datapb.ChannelWatchState_ToRelease, "channel-1", 1, Params.DataCoordCfg.WatchTimeoutInterval.GetAsDuration(time.Second))
@@ -774,7 +783,7 @@ func TestChannelManager(t *testing.T) {
774783
chManager.store = &ChannelStore{
775784
store: watchkv,
776785
channelsInfo: map[int64]*NodeChannelInfo{
777-
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
786+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
778787
},
779788
}
780789
ch = chManager.getChannelByNodeAndName(nodeID, channelName)
@@ -943,7 +952,7 @@ func TestChannelManager_Reload(t *testing.T) {
943952
chManager.store = &ChannelStore{
944953
store: watchkv,
945954
channelsInfo: map[int64]*NodeChannelInfo{
946-
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
955+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
947956
},
948957
}
949958

@@ -966,7 +975,7 @@ func TestChannelManager_Reload(t *testing.T) {
966975
chManager.store = &ChannelStore{
967976
store: watchkv,
968977
channelsInfo: map[int64]*NodeChannelInfo{
969-
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
978+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
970979
},
971980
}
972981

@@ -993,8 +1002,8 @@ func TestChannelManager_Reload(t *testing.T) {
9931002
chManager.store = &ChannelStore{
9941003
store: watchkv,
9951004
channelsInfo: map[int64]*NodeChannelInfo{
996-
nodeID: {nodeID, []RWChannel{&channelMeta{Name: channelName, CollectionID: collectionID}}},
997-
999: {999, []RWChannel{}},
1005+
nodeID: NewNodeChannelInfo(nodeID, &channelMeta{Name: channelName, CollectionID: collectionID}),
1006+
999: NewNodeChannelInfo(999),
9981007
},
9991008
}
10001009
require.NoError(t, err)
@@ -1024,8 +1033,8 @@ func TestChannelManager_Reload(t *testing.T) {
10241033
cm.store = &ChannelStore{
10251034
store: watchkv,
10261035
channelsInfo: map[int64]*NodeChannelInfo{
1027-
1: {1, []RWChannel{&channelMeta{Name: "channel1", CollectionID: 1}}},
1028-
2: {2, []RWChannel{&channelMeta{Name: "channel2", CollectionID: 1}}},
1036+
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel1", CollectionID: 1}),
1037+
2: NewNodeChannelInfo(2, &channelMeta{Name: "channel2", CollectionID: 1}),
10291038
},
10301039
}
10311040

@@ -1077,58 +1086,69 @@ func TestChannelManager_BalanceBehaviour(t *testing.T) {
10771086
chManager.store = &ChannelStore{
10781087
store: watchkv,
10791088
channelsInfo: map[int64]*NodeChannelInfo{
1080-
1: {1, []RWChannel{
1081-
&channelMeta{Name: "channel-1", CollectionID: collectionID},
1089+
1: NewNodeChannelInfo(1, &channelMeta{Name: "channel-1", CollectionID: collectionID},
10821090
&channelMeta{Name: "channel-2", CollectionID: collectionID},
1083-
&channelMeta{Name: "channel-3", CollectionID: collectionID},
1084-
}},
1091+
&channelMeta{Name: "channel-3", CollectionID: collectionID}),
10851092
},
10861093
}
10871094

10881095
var channelBalanced string
10891096

10901097
chManager.AddNode(2)
1091-
channelBalanced = "channel-1"
10921098

1093-
key := path.Join(prefix, "1", channelBalanced)
1094-
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
1099+
watchPrefix := path.Join(prefix, "1")
1100+
channelBalanced = waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToRelease, datapb.ChannelWatchState_ReleaseSuccess)
10951101

1096-
key = path.Join(prefix, "2", channelBalanced)
1102+
key := path.Join(prefix, "2", channelBalanced)
10971103
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
10981104

1099-
assert.True(t, chManager.Match(1, "channel-2"))
1100-
assert.True(t, chManager.Match(1, "channel-3"))
1101-
assert.True(t, chManager.Match(2, "channel-1"))
1105+
for _, channel := range []string{"channel-1", "channel-2", "channel-3"} {
1106+
if channel == channelBalanced {
1107+
assert.True(t, chManager.Match(2, channel))
1108+
} else {
1109+
assert.True(t, chManager.Match(1, channel))
1110+
}
1111+
}
11021112

11031113
chManager.AddNode(3)
11041114
chManager.Watch(ctx, &channelMeta{Name: "channel-4", CollectionID: collectionID})
1105-
key = path.Join(prefix, "3", "channel-4")
1106-
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
1107-
1108-
assert.True(t, chManager.Match(1, "channel-2"))
1109-
assert.True(t, chManager.Match(1, "channel-3"))
1110-
assert.True(t, chManager.Match(2, "channel-1"))
1111-
assert.True(t, chManager.Match(3, "channel-4"))
1115+
// key = path.Join(prefix, "3", "channel-4")
1116+
watchPrefix = path.Join(prefix, "3")
1117+
channelBalanced2 := waitPrefixAndStore(t, watchkv, watchPrefix, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
1118+
1119+
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
1120+
if channel == channelBalanced {
1121+
assert.True(t, chManager.Match(2, channel))
1122+
} else if channel == channelBalanced2 {
1123+
assert.True(t, chManager.Match(3, channel))
1124+
} else {
1125+
assert.True(t, chManager.Match(1, channel))
1126+
}
1127+
}
11121128

11131129
chManager.DeleteNode(3)
1114-
key = path.Join(prefix, "2", "channel-4")
1130+
key = path.Join(prefix, "2", channelBalanced2)
11151131
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
11161132

1117-
assert.True(t, chManager.Match(1, "channel-2"))
1118-
assert.True(t, chManager.Match(1, "channel-3"))
1119-
assert.True(t, chManager.Match(2, "channel-1"))
1120-
assert.True(t, chManager.Match(2, "channel-4"))
1133+
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
1134+
if channel == channelBalanced {
1135+
assert.True(t, chManager.Match(2, channel))
1136+
} else if channel == channelBalanced2 {
1137+
assert.True(t, chManager.Match(2, channel))
1138+
} else {
1139+
assert.True(t, chManager.Match(1, channel))
1140+
}
1141+
}
11211142

11221143
chManager.DeleteNode(2)
1123-
key = path.Join(prefix, "1", "channel-4")
1144+
key = path.Join(prefix, "1", channelBalanced)
11241145
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
1125-
key = path.Join(prefix, "1", "channel-1")
1146+
key = path.Join(prefix, "1", channelBalanced2)
11261147
waitAndStore(t, watchkv, key, datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_WatchSuccess)
11271148

1128-
assert.True(t, chManager.Match(1, "channel-2"))
1129-
assert.True(t, chManager.Match(1, "channel-3"))
1130-
assert.True(t, chManager.Match(1, "channel-1"))
1131-
assert.True(t, chManager.Match(1, "channel-4"))
1149+
for _, channel := range []string{"channel-1", "channel-2", "channel-3", "channel-4"} {
1150+
assert.True(t, chManager.Match(1, channel))
1151+
}
11321152
})
11331153
}
11341154

@@ -1157,12 +1177,7 @@ func TestChannelManager_RemoveChannel(t *testing.T) {
11571177
store: &ChannelStore{
11581178
store: watchkv,
11591179
channelsInfo: map[int64]*NodeChannelInfo{
1160-
1: {
1161-
NodeID: 1,
1162-
Channels: []RWChannel{
1163-
&channelMeta{Name: "ch1", CollectionID: 1},
1164-
},
1165-
},
1180+
1: NewNodeChannelInfo(1, &channelMeta{Name: "ch1", CollectionID: 1}),
11661181
},
11671182
},
11681183
},
@@ -1257,14 +1272,14 @@ func TestChannelManager_BackgroundChannelChecker(t *testing.T) {
12571272
mockStore.EXPECT().GetNodesChannels().Return([]*NodeChannelInfo{
12581273
{
12591274
NodeID: 1,
1260-
Channels: []RWChannel{
1261-
&channelMeta{
1275+
Channels: map[string]RWChannel{
1276+
"channel-1": &channelMeta{
12621277
Name: "channel-1",
12631278
},
1264-
&channelMeta{
1279+
"channel-2": &channelMeta{
12651280
Name: "channel-2",
12661281
},
1267-
&channelMeta{
1282+
"channel-3": &channelMeta{
12681283
Name: "channel-3",
12691284
},
12701285
},

0 commit comments

Comments
 (0)