Skip to content

Commit d08cb88

Browse files
authored
enhance: enable flush rate limiter of collection level (milvus-io#33837)
Signed-off-by: jaime <yun.zhang@zilliz.com>
1 parent 4c6f6c5 commit d08cb88

4 files changed

Lines changed: 25 additions & 13 deletions

File tree

configs/milvus.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -691,10 +691,10 @@ quotaAndLimits:
691691
db:
692692
max: -1 # qps of db level, default no limit, rate for CreateIndex, DropIndex
693693
flushRate:
694-
enabled: false
694+
enabled: true
695695
max: -1 # qps, default no limit, rate for flush
696696
collection:
697-
max: -1 # qps, default no limit, rate for flush at collection level.
697+
max: 0.1 # qps, default no limit, rate for flush at collection level.
698698
db:
699699
max: -1 # qps of db level, default no limit, rate for flush
700700
compactionRate:

internal/proxy/simple_rate_limiter_test.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"math"
2222
"testing"
23+
"time"
2324

2425
"github.com/stretchr/testify/assert"
2526

@@ -116,21 +117,32 @@ func TestSimpleRateLimiter(t *testing.T) {
116117
}
117118

118119
for _, rt := range internalpb.RateType_value {
120+
if internalpb.RateType_DDLFlush == internalpb.RateType(rt) {
121+
// the flush request has 0.1 rate limiter that means only allow to execute one request each 10 seconds.
122+
time.Sleep(10 * time.Second)
123+
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
124+
assert.NoError(t, err)
125+
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType_DDLFlush, 1)
126+
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
127+
continue
128+
}
129+
119130
if IsDDLRequest(internalpb.RateType(rt)) {
120131
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
121132
assert.NoError(t, err)
122133
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5)
123134
assert.NoError(t, err)
124135
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 5)
125136
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
126-
} else {
127-
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
128-
assert.NoError(t, err)
129-
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
130-
assert.NoError(t, err)
131-
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
132-
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
137+
continue
133138
}
139+
140+
err := simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
141+
assert.NoError(t, err)
142+
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
143+
assert.NoError(t, err)
144+
err = simpleLimiter.Check(0, collectionIDToPartIDs, internalpb.RateType(rt), 1)
145+
assert.ErrorIs(t, err, merr.ErrServiceRateLimit)
134146
}
135147
Params.Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, bak)
136148
})

pkg/util/paramtable/quota_param.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ seconds, (0 ~ 65536)`,
330330
p.FlushLimitEnabled = ParamItem{
331331
Key: "quotaAndLimits.flushRate.enabled",
332332
Version: "2.2.0",
333-
DefaultValue: "false",
333+
DefaultValue: "true",
334334
Export: true,
335335
}
336336
p.FlushLimitEnabled.Init(base.mgr)
@@ -376,7 +376,7 @@ seconds, (0 ~ 65536)`,
376376
p.MaxFlushRatePerCollection = ParamItem{
377377
Key: "quotaAndLimits.flushRate.collection.max",
378378
Version: "2.3.9",
379-
DefaultValue: "-1",
379+
DefaultValue: "0.1",
380380
Formatter: func(v string) string {
381381
if !p.FlushLimitEnabled.GetAsBool() {
382382
return max

pkg/util/paramtable/quota_param_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ func TestQuotaParam(t *testing.T) {
4141
t.Run("test functional params", func(t *testing.T) {
4242
assert.Equal(t, false, qc.IndexLimitEnabled.GetAsBool())
4343
assert.Equal(t, defaultMax, qc.MaxIndexRate.GetAsFloat())
44-
assert.False(t, qc.FlushLimitEnabled.GetAsBool())
45-
assert.Equal(t, defaultMax, qc.MaxFlushRatePerCollection.GetAsFloat())
44+
assert.True(t, qc.FlushLimitEnabled.GetAsBool())
45+
assert.Equal(t, 0.1, qc.MaxFlushRatePerCollection.GetAsFloat())
4646
assert.Equal(t, defaultMax, qc.MaxFlushRate.GetAsFloat())
4747
assert.Equal(t, false, qc.CompactionLimitEnabled.GetAsBool())
4848
assert.Equal(t, defaultMax, qc.MaxCompactionRate.GetAsFloat())

0 commit comments

Comments
 (0)