Skip to content

Commit 1bd65fc

Browse files
authored
enhance: remove deprecated lazy load code (milvus-io#47590)
Related to milvus-io#44452 Remove the deprecated lazy load feature which has been superseded by warmup-related parameters. This cleanup includes: - Remove AddFieldDataInfoForSealed from C++ segcore layer - Remove IsLazyLoad() method and isLazyLoad field from segment - Remove lazy load checks in proxy alterCollectionTask - Remove DiskCache lazy load handling in search/retrieve paths - Remove LazyLoadEnableKey constant and related helper functions - Update mock files to reflect interface changes --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
1 parent cb10e48 commit 1bd65fc

26 files changed

Lines changed: 33 additions & 647 deletions

configs/milvus.yaml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -531,13 +531,6 @@ queryNode:
531531
growingMmapEnabled: false
532532
fixedFileSizeForMmapAlloc: 1 # tmp file size for mmap chunk manager
533533
maxDiskUsagePercentageForMmapAlloc: 50 # disk percentage used in mmap chunk manager
534-
lazyload:
535-
enabled: false # Enable lazyload for loading data
536-
waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve
537-
requestResourceTimeout: 5000 # max timeout in milliseconds for waiting request resource for lazy load, 5s by default
538-
requestResourceRetryInterval: 2000 # retry interval in milliseconds for waiting request resource for lazy load, 2s by default
539-
maxRetryTimes: 1 # max retry times for lazy load, 1 by default
540-
maxEvictPerRetry: 1 # max evict count for lazy load, 1 by default
541534
indexOffsetCacheEnabled: false # enable index offset cache for some scalar indexes, now is just for bitmap index, enable this param can improve performance for retrieving raw data from index
542535
scheduler:
543536
receiveChanSize: 10240

internal/core/src/segcore/segment_c.cpp

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -794,26 +794,6 @@ DropSealedSegmentJSONIndex(CSegmentInterface c_segment,
794794
}
795795
}
796796

797-
CStatus
798-
AddFieldDataInfoForSealed(CSegmentInterface c_segment,
799-
CLoadFieldDataInfo c_load_field_data_info) {
800-
SCOPE_CGO_CALL_METRIC();
801-
802-
try {
803-
auto segment_interface =
804-
reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
805-
auto segment =
806-
dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
807-
AssertInfo(segment != nullptr, "segment conversion failed");
808-
auto load_info =
809-
static_cast<LoadFieldDataInfo*>(c_load_field_data_info);
810-
segment->AddFieldDataInfoForSealed(*load_info);
811-
return milvus::SuccessCStatus();
812-
} catch (std::exception& e) {
813-
return milvus::FailureCStatus(milvus::UnexpectedError, e.what());
814-
}
815-
}
816-
817797
void
818798
RemoveFieldFile(CSegmentInterface c_segment, int64_t field_id) {
819799
SCOPE_CGO_CALL_METRIC();

internal/core/src/segcore/segment_c.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,10 +240,6 @@ DropSealedSegmentJSONIndex(CSegmentInterface c_segment,
240240
int64_t field_id,
241241
const char* nested_path);
242242

243-
CStatus
244-
AddFieldDataInfoForSealed(CSegmentInterface c_segment,
245-
CLoadFieldDataInfo c_load_field_data_info);
246-
247243
////////////////////////////// interfaces for SegmentInterface //////////////////////////////
248244
CStatus
249245
ExistPk(CSegmentInterface c_segment,

internal/mocks/util/mock_segcore/mock_CSegment.go

Lines changed: 0 additions & 59 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/proxy/task.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,18 +1343,9 @@ func hasWarmupProp(props ...*commonpb.KeyValuePair) bool {
13431343
return false
13441344
}
13451345

1346-
func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool {
1347-
for _, p := range props {
1348-
if p.GetKey() == common.LazyLoadEnableKey {
1349-
return true
1350-
}
1351-
}
1352-
return false
1353-
}
1354-
13551346
func hasPropInDeletekeys(keys []string) string {
13561347
for _, key := range keys {
1357-
if key == common.MmapEnabledKey || key == common.LazyLoadEnableKey || common.IsWarmupKey(key) {
1348+
if key == common.MmapEnabledKey || common.IsWarmupKey(key) {
13581349
return key
13591350
}
13601351
}
@@ -1405,16 +1396,15 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
14051396

14061397
if len(t.GetProperties()) > 0 {
14071398
hasMmap := hasMmapProp(t.Properties...)
1408-
hasLazyLoad := hasLazyLoadProp(t.Properties...)
14091399
hasWarmup := hasWarmupProp(t.Properties...)
1410-
if hasMmap || hasLazyLoad || hasWarmup {
1400+
if hasMmap || hasWarmup {
14111401
loaded, err := isCollectionLoaded(ctx, t.mixCoord, t.CollectionID)
14121402
if err != nil {
14131403
return err
14141404
}
14151405
if loaded {
14161406
// keeping the original error msg here for compatibility
1417-
if hasMmap || hasLazyLoad {
1407+
if hasMmap {
14181408
return merr.WrapErrCollectionLoaded(t.CollectionName, "can not alter mmap properties if collection loaded")
14191409
}
14201410
if hasWarmup {

internal/proxy/task_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6145,11 +6145,6 @@ func TestHasPropInDeletekeys(t *testing.T) {
61456145
assert.Equal(t, common.MmapEnabledKey, hasPropInDeletekeys(keys))
61466146
})
61476147

6148-
t.Run("has lazyload key", func(t *testing.T) {
6149-
keys := []string{common.LazyLoadEnableKey}
6150-
assert.Equal(t, common.LazyLoadEnableKey, hasPropInDeletekeys(keys))
6151-
})
6152-
61536148
t.Run("has generic warmup key", func(t *testing.T) {
61546149
keys := []string{common.WarmupKey}
61556150
assert.Equal(t, common.WarmupKey, hasPropInDeletekeys(keys))

internal/querynodev2/handlers.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,6 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR
161161
continue
162162
}
163163

164-
if localSegment.IsLazyLoad() {
165-
localSegment.SetLoadInfo(info)
166-
localSegment.SetNeedUpdatedVersion(req.GetVersion())
167-
node.manager.DiskCache.MarkItemNeedReload(ctx, localSegment.ID())
168-
return nil
169-
}
170164
err := node.loader.LoadIndex(ctx, localSegment, info, req.Version)
171165
if err != nil {
172166
log.Warn("failed to load index", zap.Error(err))
@@ -200,12 +194,6 @@ func (node *QueryNode) loadStats(ctx context.Context, req *querypb.LoadSegmentsR
200194
continue
201195
}
202196

203-
if localSegment.IsLazyLoad() {
204-
localSegment.SetLoadInfo(info)
205-
localSegment.SetNeedUpdatedVersion(req.GetVersion())
206-
node.manager.DiskCache.MarkItemNeedReload(ctx, localSegment.ID())
207-
return nil
208-
}
209197
err := node.loader.LoadJSONIndex(ctx, localSegment, info)
210198
if err != nil {
211199
log.Warn("failed to load stats", zap.Error(err))

internal/querynodev2/segments/manager.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,13 @@ import (
3131

3232
"go.uber.org/atomic"
3333
"go.uber.org/zap"
34-
"golang.org/x/sync/singleflight"
3534

3635
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
37-
"github.com/milvus-io/milvus/internal/querynodev2/segments/metricsutil"
3836
"github.com/milvus-io/milvus/pkg/v2/eventlog"
3937
"github.com/milvus-io/milvus/pkg/v2/log"
4038
"github.com/milvus-io/milvus/pkg/v2/metrics"
4139
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
4240
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
43-
"github.com/milvus-io/milvus/pkg/v2/util/cache"
4441
"github.com/milvus-io/milvus/pkg/v2/util/lock"
4542
"github.com/milvus-io/milvus/pkg/v2/util/merr"
4643
"github.com/milvus-io/milvus/pkg/v2/util/metautil"
@@ -74,96 +71,16 @@ func IncreaseVersion(version int64) SegmentAction {
7471
type Manager struct {
7572
Collection CollectionManager
7673
Segment SegmentManager
77-
DiskCache cache.Cache[int64, Segment]
7874
Loader Loader
7975
}
8076

8177
func NewManager() *Manager {
82-
diskCap := paramtable.Get().QueryNodeCfg.DiskCacheCapacityLimit.GetAsSize()
83-
8478
segMgr := NewSegmentManager()
85-
sf := singleflight.Group{}
8679
manager := &Manager{
8780
Collection: NewCollectionManager(),
8881
Segment: segMgr,
8982
}
9083

91-
manager.DiskCache = cache.NewCacheBuilder[int64, Segment]().WithLazyScavenger(func(key int64) int64 {
92-
segment := segMgr.GetWithType(key, SegmentTypeSealed)
93-
if segment == nil {
94-
return 0
95-
}
96-
return int64(segment.ResourceUsageEstimate().DiskSize)
97-
}, diskCap).WithLoader(func(ctx context.Context, key int64) (Segment, error) {
98-
log := log.Ctx(ctx)
99-
log.Debug("cache missed segment", zap.Int64("segmentID", key))
100-
segment := segMgr.GetWithType(key, SegmentTypeSealed)
101-
if segment == nil {
102-
// the segment has been released, just ignore it
103-
log.Warn("segment is not found when loading", zap.Int64("segmentID", key))
104-
return nil, merr.ErrSegmentNotFound
105-
}
106-
info := segment.LoadInfo()
107-
_, err, _ := sf.Do(fmt.Sprint(segment.ID()), func() (nop interface{}, err error) {
108-
cacheLoadRecord := metricsutil.NewCacheLoadRecord(getSegmentMetricLabel(segment))
109-
cacheLoadRecord.WithBytes(segment.ResourceUsageEstimate().DiskSize)
110-
defer func() {
111-
cacheLoadRecord.Finish(err)
112-
}()
113-
114-
collection := manager.Collection.Get(segment.Collection())
115-
if collection == nil {
116-
return nil, merr.WrapErrCollectionNotLoaded(segment.Collection(), "failed to load segment fields")
117-
}
118-
119-
err = manager.Loader.LoadLazySegment(ctx, segment, info)
120-
return nil, err
121-
})
122-
if err != nil {
123-
log.Warn("cache sealed segment failed", zap.Error(err))
124-
return nil, err
125-
}
126-
return segment, nil
127-
}).WithFinalizer(func(ctx context.Context, key int64, segment Segment) error {
128-
log := log.Ctx(ctx)
129-
log.Debug("evict segment from cache", zap.Int64("segmentID", key))
130-
cacheEvictRecord := metricsutil.NewCacheEvictRecord(getSegmentMetricLabel(segment))
131-
cacheEvictRecord.WithBytes(segment.ResourceUsageEstimate().DiskSize)
132-
defer cacheEvictRecord.Finish(nil)
133-
segment.Release(ctx, WithReleaseScope(ReleaseScopeData))
134-
return nil
135-
}).WithReloader(func(ctx context.Context, key int64) (Segment, error) {
136-
log := log.Ctx(ctx)
137-
segment := segMgr.GetWithType(key, SegmentTypeSealed)
138-
if segment == nil {
139-
// the segment has been released, just ignore it
140-
log.Debug("segment is not found when reloading", zap.Int64("segmentID", key))
141-
return nil, merr.ErrSegmentNotFound
142-
}
143-
144-
localSegment := segment.(*LocalSegment)
145-
err := manager.Loader.LoadIndex(ctx, localSegment, segment.LoadInfo(), segment.NeedUpdatedVersion())
146-
if err != nil {
147-
log.Warn("reload segment failed", zap.Int64("segmentID", key), zap.Error(err))
148-
return nil, merr.ErrSegmentLoadFailed
149-
}
150-
if err := localSegment.RemoveUnusedFieldFiles(); err != nil {
151-
log.Warn("remove unused field files failed", zap.Int64("segmentID", key), zap.Error(err))
152-
return nil, merr.ErrSegmentReduplicate
153-
}
154-
155-
return segment, nil
156-
}).Build()
157-
158-
segMgr.registerReleaseCallback(func(s Segment) {
159-
if s.Type() == SegmentTypeSealed {
160-
// !!! We cannot use ctx of request to call Remove,
161-
// Once context canceled, the segment will be leak in cache forever.
162-
// Because it has been cleaned from segment manager.
163-
manager.DiskCache.Remove(context.Background(), s.ID())
164-
}
165-
})
166-
16784
return manager
16885
}
16986

internal/querynodev2/segments/mock_loader.go

Lines changed: 0 additions & 48 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)