Skip to content

Commit 760223f

Browse files
fix: use seperate warmup pool and disable warmup by default (milvus-io#33348)
1. use a small warmup pool to reduce the impact of warmup 2. change the warmup pool to nonblocking mode 3. disable warmup by default 4. remove the maximum size limit of 16 for the load pool issue: milvus-io#32772 --------- Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Co-authored-by: xiaofanluan <xiaofan.luan@zilliz.com>
1 parent 1b67cec commit 760223f

6 files changed

Lines changed: 64 additions & 14 deletions

File tree

configs/milvus.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ queryNode:
335335
# chunk cache during the load process. This approach has the potential to substantially reduce query/search latency
336336
# for a specific duration post-load, albeit accompanied by a concurrent increase in disk usage;
337337
# 2. If set to "off," original vector data will only be loaded into the chunk cache during search/query.
338-
warmup: async
338+
warmup: off
339339
mmap:
340340
mmapEnabled: false # Enable mmap for loading data
341341
lazyload:

internal/querynodev2/segments/pool.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ var (
3737
// and other operations (insert/delete/statistics/etc.)
3838
// since in concurrent situation, there operation may block each other in high payload
3939

40-
sqp atomic.Pointer[conc.Pool[any]]
41-
sqOnce sync.Once
42-
dp atomic.Pointer[conc.Pool[any]]
43-
dynOnce sync.Once
44-
loadPool atomic.Pointer[conc.Pool[any]]
45-
loadOnce sync.Once
40+
sqp atomic.Pointer[conc.Pool[any]]
41+
sqOnce sync.Once
42+
dp atomic.Pointer[conc.Pool[any]]
43+
dynOnce sync.Once
44+
loadPool atomic.Pointer[conc.Pool[any]]
45+
loadOnce sync.Once
46+
warmupPool atomic.Pointer[conc.Pool[any]]
47+
warmupOnce sync.Once
4648
)
4749

4850
// initSQPool initialize
@@ -80,9 +82,6 @@ func initLoadPool() {
8082
loadOnce.Do(func() {
8183
pt := paramtable.Get()
8284
poolSize := hardware.GetCPUNum() * pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.GetAsInt()
83-
if poolSize > 16 {
84-
poolSize = 16
85-
}
8685
pool := conc.NewPool[any](
8786
poolSize,
8887
conc.WithPreAlloc(false),
@@ -96,6 +95,23 @@ func initLoadPool() {
9695
})
9796
}
9897

98+
func initWarmupPool() {
99+
warmupOnce.Do(func() {
100+
pt := paramtable.Get()
101+
poolSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
102+
pool := conc.NewPool[any](
103+
poolSize,
104+
conc.WithPreAlloc(false),
105+
conc.WithDisablePurge(false),
106+
conc.WithPreHandler(runtime.LockOSThread), // lock os thread for cgo thread disposal
107+
conc.WithNonBlocking(true), // make warming up non blocking
108+
)
109+
110+
warmupPool.Store(pool)
111+
pt.Watch(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, config.NewHandler("qn.warmpool.lowpriority", ResizeWarmupPool))
112+
})
113+
}
114+
99115
// GetSQPool returns the singleton pool instance for search/query operations.
100116
func GetSQPool() *conc.Pool[any] {
101117
initSQPool()
@@ -113,6 +129,11 @@ func GetLoadPool() *conc.Pool[any] {
113129
return loadPool.Load()
114130
}
115131

132+
func GetWarmupPool() *conc.Pool[any] {
133+
initWarmupPool()
134+
return warmupPool.Load()
135+
}
136+
116137
func ResizeSQPool(evt *config.Event) {
117138
if evt.HasUpdated {
118139
pt := paramtable.Get()
@@ -131,6 +152,14 @@ func ResizeLoadPool(evt *config.Event) {
131152
}
132153
}
133154

155+
func ResizeWarmupPool(evt *config.Event) {
156+
if evt.HasUpdated {
157+
pt := paramtable.Get()
158+
newSize := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
159+
resizePool(GetWarmupPool(), newSize, "WarmupPool")
160+
}
161+
}
162+
134163
func resizePool(pool *conc.Pool[any], newSize int, tag string) {
135164
log := log.Ctx(context.Background()).
136165
With(

internal/querynodev2/segments/pool_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,27 @@ func TestResizePools(t *testing.T) {
8282
assert.Equal(t, expectedCap, GetLoadPool().Cap())
8383
})
8484

85+
t.Run("WarmupPool", func(t *testing.T) {
86+
expectedCap := hardware.GetCPUNum() * pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsInt()
87+
88+
ResizeWarmupPool(&config.Event{
89+
HasUpdated: true,
90+
})
91+
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
92+
93+
pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, strconv.FormatFloat(pt.CommonCfg.LowPriorityThreadCoreCoefficient.GetAsFloat()*2, 'f', 10, 64))
94+
ResizeWarmupPool(&config.Event{
95+
HasUpdated: true,
96+
})
97+
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
98+
99+
pt.Save(pt.CommonCfg.LowPriorityThreadCoreCoefficient.Key, "0")
100+
ResizeWarmupPool(&config.Event{
101+
HasUpdated: true,
102+
})
103+
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
104+
})
105+
85106
t.Run("error_pool", func(*testing.T) {
86107
pool := conc.NewDefaultPool[any]()
87108
c := pool.Cap()

internal/querynodev2/segments/segment.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1386,7 +1386,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
13861386
warmingUp := strings.ToLower(paramtable.Get().QueryNodeCfg.ChunkCacheWarmingUp.GetValue())
13871387
switch warmingUp {
13881388
case "sync":
1389-
GetLoadPool().Submit(func() (any, error) {
1389+
GetWarmupPool().Submit(func() (any, error) {
13901390
cFieldID := C.int64_t(fieldID)
13911391
status = C.WarmupChunkCache(s.ptr, cFieldID)
13921392
if err := HandleCStatus(ctx, &status, "warming up chunk cache failed"); err != nil {
@@ -1397,7 +1397,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64) {
13971397
return nil, nil
13981398
}).Await()
13991399
case "async":
1400-
GetLoadPool().Submit(func() (any, error) {
1400+
GetWarmupPool().Submit(func() (any, error) {
14011401
if !s.ptrLock.RLockIf(state.IsNotReleased) {
14021402
return nil, nil
14031403
}

pkg/util/paramtable/component_param.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2357,7 +2357,7 @@ func (p *queryNodeConfig) init(base *BaseTable) {
23572357
p.ChunkCacheWarmingUp = ParamItem{
23582358
Key: "queryNode.cache.warmup",
23592359
Version: "2.3.6",
2360-
DefaultValue: "async",
2360+
DefaultValue: "off",
23612361
Doc: `options: async, sync, off.
23622362
Specifies the necessity for warming up the chunk cache.
23632363
1. If set to "sync" or "async," the original vector data will be synchronously/asynchronously loaded into the

pkg/util/paramtable/component_param_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ func TestComponentParam(t *testing.T) {
339339

340340
// chunk cache
341341
assert.Equal(t, "willneed", Params.ReadAheadPolicy.GetValue())
342-
assert.Equal(t, "async", Params.ChunkCacheWarmingUp.GetValue())
342+
assert.Equal(t, "false", Params.ChunkCacheWarmingUp.GetValue())
343343

344344
// test small indexNlist/NProbe default
345345
params.Remove("queryNode.segcore.smallIndex.nlist")

0 commit comments

Comments
 (0)