Skip to content

Commit e5f408d

Browse files
authored
Merge IndexCoord and DataCoord (milvus-io#21267)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
1 parent 7b39873 commit e5f408d

92 files changed

Lines changed: 11338 additions & 3988 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/milvus/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (c *run) formatFlags(args []string, flags *flag.FlagSet) {
110110

111111
flags.BoolVar(&c.enableRootCoord, typeutil.RootCoordRole, false, "enable root coordinator")
112112
flags.BoolVar(&c.enableQueryCoord, typeutil.QueryCoordRole, false, "enable query coordinator")
113-
flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
113+
//flags.BoolVar(&c.enableIndexCoord, typeutil.IndexCoordRole, false, "enable index coordinator")
114114
flags.BoolVar(&c.enableDataCoord, typeutil.DataCoordRole, false, "enable data coordinator")
115115

116116
flags.BoolVar(&c.enableQueryNode, typeutil.QueryNodeRole, false, "enable query node")

cmd/tools/migration/meta/meta220.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"github.com/blang/semver/v4"
55
"github.com/golang/protobuf/proto"
66
"github.com/milvus-io/milvus/cmd/tools/migration/versions"
7-
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
7+
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
88
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
99
"github.com/milvus-io/milvus/internal/metastore/kv/rootcoord"
1010
"github.com/milvus-io/milvus/internal/metastore/model"
@@ -188,7 +188,7 @@ func (meta *CollectionIndexesMeta220) GenerateSaves() (map[string]string, error)
188188

189189
for collectionID := range *meta {
190190
for indexID := range (*meta)[collectionID] {
191-
ckey := indexcoord.BuildIndexKey(collectionID, indexID)
191+
ckey := datacoord.BuildIndexKey(collectionID, indexID)
192192
index := (*meta)[collectionID][indexID]
193193
var value string
194194
indexPb := model.MarshalIndexModel(index)
@@ -210,7 +210,7 @@ func (meta *SegmentIndexesMeta220) GenerateSaves() (map[string]string, error) {
210210
for segmentID := range *meta {
211211
for indexID := range (*meta)[segmentID] {
212212
index := (*meta)[segmentID][indexID]
213-
ckey := indexcoord.BuildSegmentIndexKey(index.CollectionID, index.PartitionID, index.SegmentID, index.BuildID)
213+
ckey := datacoord.BuildSegmentIndexKey(index.CollectionID, index.PartitionID, index.SegmentID, index.BuildID)
214214
var value string
215215
indexPb := model.MarshalSegmentIndexModel(index)
216216
marshaledIndexPb, err := proto.Marshal(indexPb)

configs/milvus.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ dataCoord:
298298
# over (compactableProportion * segment max # of rows) rows.
299299
# MUST BE GREATER THAN OR EQUAL TO <smallProportion>!!!
300300
expansionRate: 1.25 # During compaction, the size of segment # of rows is able to exceed segment max # of rows by (expansionRate-1) * 100%.
301+
minSegmentNumRowsToEnableIndex: 1024 # It's a threshold. When the segment num rows is less than this value, the segment will not be indexed
301302

302303
compaction:
303304
enableAutoCompaction: true
@@ -307,6 +308,13 @@ dataCoord:
307308
missingTolerance: 86400 # file meta missing tolerance duration in seconds, 60*24
308309
dropTolerance: 86400 # file belongs to dropped entity tolerance duration in seconds, 60*24
309310

311+
bindIndexNodeMode:
312+
enable: false
313+
address: "localhost:22930"
314+
withCred: false
315+
nodeID: 0
316+
317+
310318

311319
dataNode:
312320
port: 21124
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Licensed to the LF AI & Data foundation under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package datacoord
18+
19+
import "sort"
20+
21+
type buildIndexPolicy func(buildIDs []UniqueID)
22+
23+
func defaultBuildIndexPolicy(buildIDs []UniqueID) {
24+
sort.Slice(buildIDs, func(i, j int) bool {
25+
return buildIDs[i] < buildIDs[j]
26+
})
27+
28+
}

internal/datacoord/compaction.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,20 @@ type compactionPlanHandler struct {
100100
quit chan struct{}
101101
wg sync.WaitGroup
102102
flushCh chan UniqueID
103-
segRefer *SegmentReferenceManager
104-
parallelCh map[int64]chan struct{}
103+
//segRefer *SegmentReferenceManager
104+
parallelCh map[int64]chan struct{}
105105
}
106106

107107
func newCompactionPlanHandler(sessions *SessionManager, cm *ChannelManager, meta *meta,
108-
allocator allocator, flush chan UniqueID, segRefer *SegmentReferenceManager) *compactionPlanHandler {
108+
allocator allocator, flush chan UniqueID) *compactionPlanHandler {
109109
return &compactionPlanHandler{
110-
plans: make(map[int64]*compactionTask),
111-
chManager: cm,
112-
meta: meta,
113-
sessions: sessions,
114-
allocator: allocator,
115-
flushCh: flush,
116-
segRefer: segRefer,
110+
plans: make(map[int64]*compactionTask),
111+
chManager: cm,
112+
meta: meta,
113+
sessions: sessions,
114+
allocator: allocator,
115+
flushCh: flush,
116+
//segRefer: segRefer,
117117
parallelCh: make(map[int64]chan struct{}),
118118
}
119119
}

internal/datacoord/compaction_test.go

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -322,18 +322,12 @@ func TestCompactionPlanHandler_handleMergeCompactionResult(t *testing.T) {
322322
plans: plans,
323323
sessions: sessions,
324324
meta: meta,
325-
segRefer: &SegmentReferenceManager{
326-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
327-
},
328325
}
329326

330327
c2 := &compactionPlanHandler{
331328
plans: plans,
332329
sessions: sessions,
333330
meta: errMeta,
334-
segRefer: &SegmentReferenceManager{
335-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
336-
},
337331
}
338332

339333
compactionResult := &datapb.CompactionResult{
@@ -380,19 +374,13 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
380374
t.Run("test not exists compaction task", func(t *testing.T) {
381375
c := &compactionPlanHandler{
382376
plans: map[int64]*compactionTask{1: {}},
383-
segRefer: &SegmentReferenceManager{
384-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
385-
},
386377
}
387378
err := c.completeCompaction(&datapb.CompactionResult{PlanID: 2})
388379
assert.Error(t, err)
389380
})
390381
t.Run("test completed compaction task", func(t *testing.T) {
391382
c := &compactionPlanHandler{
392383
plans: map[int64]*compactionTask{1: {state: completed}},
393-
segRefer: &SegmentReferenceManager{
394-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
395-
},
396384
}
397385
err := c.completeCompaction(&datapb.CompactionResult{PlanID: 1})
398386
assert.Error(t, err)
@@ -480,9 +468,6 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
480468
sessions: sessions,
481469
meta: meta,
482470
flushCh: flushCh,
483-
segRefer: &SegmentReferenceManager{
484-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
485-
},
486471
}
487472

488473
err := c.completeCompaction(&compactionResult)
@@ -581,9 +566,6 @@ func TestCompactionPlanHandler_completeCompaction(t *testing.T) {
581566
sessions: sessions,
582567
meta: meta,
583568
flushCh: flushCh,
584-
segRefer: &SegmentReferenceManager{
585-
segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{},
586-
},
587569
}
588570

589571
err := c.completeCompaction(&compactionResult)
@@ -780,7 +762,6 @@ func Test_newCompactionPlanHandler(t *testing.T) {
780762
meta *meta
781763
allocator allocator
782764
flush chan UniqueID
783-
segRefer *SegmentReferenceManager
784765
}
785766
tests := []struct {
786767
name string
@@ -795,7 +776,6 @@ func Test_newCompactionPlanHandler(t *testing.T) {
795776
&meta{},
796777
newMockAllocator(),
797778
nil,
798-
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
799779
},
800780
&compactionPlanHandler{
801781
plans: map[int64]*compactionTask{},
@@ -804,14 +784,13 @@ func Test_newCompactionPlanHandler(t *testing.T) {
804784
meta: &meta{},
805785
allocator: newMockAllocator(),
806786
flushCh: nil,
807-
segRefer: &SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
808787
parallelCh: make(map[int64]chan struct{}),
809788
},
810789
},
811790
}
812791
for _, tt := range tests {
813792
t.Run(tt.name, func(t *testing.T) {
814-
got := newCompactionPlanHandler(tt.args.sessions, tt.args.cm, tt.args.meta, tt.args.allocator, tt.args.flush, tt.args.segRefer)
793+
got := newCompactionPlanHandler(tt.args.sessions, tt.args.cm, tt.args.meta, tt.args.allocator, tt.args.flush)
815794
assert.EqualValues(t, tt.want, got)
816795
})
817796
}

internal/datacoord/compaction_trigger.go

Lines changed: 40 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,15 @@ import (
2323
"sync"
2424
"time"
2525

26+
"github.com/samber/lo"
27+
"go.uber.org/zap"
28+
2629
"github.com/milvus-io/milvus-proto/go-api/commonpb"
2730
"github.com/milvus-io/milvus/internal/log"
2831
"github.com/milvus-io/milvus/internal/proto/datapb"
29-
"github.com/milvus-io/milvus/internal/proto/indexpb"
30-
"github.com/milvus-io/milvus/internal/types"
31-
"github.com/milvus-io/milvus/internal/util/funcutil"
3232
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
3333
"github.com/milvus-io/milvus/internal/util/logutil"
3434
"github.com/milvus-io/milvus/internal/util/tsoutil"
35-
"github.com/samber/lo"
36-
"go.uber.org/zap"
3735
)
3836

3937
type compactTime struct {
@@ -66,17 +64,17 @@ type compactionSignal struct {
6664
var _ trigger = (*compactionTrigger)(nil)
6765

6866
type compactionTrigger struct {
69-
handler Handler
70-
meta *meta
71-
allocator allocator
72-
signals chan *compactionSignal
73-
compactionHandler compactionPlanContext
74-
globalTrigger *time.Ticker
75-
forceMu sync.Mutex
76-
quit chan struct{}
77-
wg sync.WaitGroup
78-
segRefer *SegmentReferenceManager
79-
indexCoord types.IndexCoord
67+
handler Handler
68+
meta *meta
69+
allocator allocator
70+
signals chan *compactionSignal
71+
compactionHandler compactionPlanContext
72+
globalTrigger *time.Ticker
73+
forceMu sync.Mutex
74+
quit chan struct{}
75+
wg sync.WaitGroup
76+
//segRefer *SegmentReferenceManager
77+
//indexCoord types.IndexCoord
8078
estimateNonDiskSegmentPolicy calUpperLimitPolicy
8179
estimateDiskSegmentPolicy calUpperLimitPolicy
8280
// A sloopy hack, so we can test with different segment row count without worrying that
@@ -88,17 +86,17 @@ func newCompactionTrigger(
8886
meta *meta,
8987
compactionHandler compactionPlanContext,
9088
allocator allocator,
91-
segRefer *SegmentReferenceManager,
92-
indexCoord types.IndexCoord,
89+
//segRefer *SegmentReferenceManager,
90+
//indexCoord types.IndexCoord,
9391
handler Handler,
9492
) *compactionTrigger {
9593
return &compactionTrigger{
96-
meta: meta,
97-
allocator: allocator,
98-
signals: make(chan *compactionSignal, 100),
99-
compactionHandler: compactionHandler,
100-
segRefer: segRefer,
101-
indexCoord: indexCoord,
94+
meta: meta,
95+
allocator: allocator,
96+
signals: make(chan *compactionSignal, 100),
97+
compactionHandler: compactionHandler,
98+
//segRefer: segRefer,
99+
//indexCoord: indexCoord,
102100
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
103101
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
104102
handler: handler,
@@ -280,39 +278,29 @@ func (t *compactionTrigger) reCalcSegmentMaxNumOfRows(collectionID UniqueID, isD
280278

281279
// TODO: Update segment info should be written back to Etcd.
282280
func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool, error) {
283-
ctx := context.Background()
284-
285281
if len(segments) == 0 {
286282
return false, nil
287283
}
288284

289285
collectionID := segments[0].GetCollectionID()
290-
resp, err := t.indexCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
291-
CollectionID: collectionID,
292-
IndexName: "",
293-
})
294-
if err != nil {
295-
return false, err
296-
}
286+
indexInfos := t.meta.GetIndexesForCollection(segments[0].GetCollectionID(), "")
297287

298288
isDiskANN := false
299-
for _, indexInfo := range resp.IndexInfos {
300-
indexParamsMap := funcutil.KeyValuePair2Map(indexInfo.IndexParams)
301-
if indexType, ok := indexParamsMap["index_type"]; ok {
302-
if indexType == indexparamcheck.IndexDISKANN {
303-
// If index type is DiskANN, recalc segment max size here.
304-
isDiskANN = true
305-
newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
306-
if err != nil {
307-
return false, err
308-
}
309-
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
310-
log.Info("segment max rows recalculated for DiskANN collection",
311-
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
312-
zap.Int64("new max rows", int64(newMaxRows)))
313-
for _, segment := range segments {
314-
segment.MaxRowNum = int64(newMaxRows)
315-
}
289+
for _, indexInfo := range indexInfos {
290+
indexType := getIndexType(indexInfo.IndexParams)
291+
if indexType == indexparamcheck.IndexDISKANN {
292+
// If index type is DiskANN, recalc segment max size here.
293+
isDiskANN = true
294+
newMaxRows, err := t.reCalcSegmentMaxNumOfRows(collectionID, true)
295+
if err != nil {
296+
return false, err
297+
}
298+
if len(segments) > 0 && int64(newMaxRows) != segments[0].GetMaxRowNum() {
299+
log.Info("segment max rows recalculated for DiskANN collection",
300+
zap.Int64("old max rows", segments[0].GetMaxRowNum()),
301+
zap.Int64("new max rows", int64(newMaxRows)))
302+
for _, segment := range segments {
303+
segment.MaxRowNum = int64(newMaxRows)
316304
}
317305
}
318306
}
@@ -365,7 +353,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
365353
break
366354
}
367355

368-
group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
356+
//group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...)
369357

370358
isDiskIndex, err := t.updateSegmentMaxSize(group.segments)
371359
if err != nil {
@@ -734,7 +722,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
734722

735723
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
736724
segments := t.meta.GetSegmentsByChannel(channel)
737-
segments = FilterInIndexedSegments(t.handler, t.indexCoord, segments...)
725+
segments = FilterInIndexedSegments(t.handler, t.meta, segments...)
738726
var res []*SegmentInfo
739727
for _, s := range segments {
740728
if !isSegmentHealthy(s) ||

0 commit comments

Comments
 (0)