Skip to content

Commit f3ef430

Browse files
Keep individual segment's updates in a single Etcd operation (milvus-io#20525)
/kind improvement Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com> Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
1 parent 35ebd17 commit f3ef430

2 files changed

Lines changed: 75 additions & 8 deletions

File tree

internal/metastore/kv/datacoord/kv_catalog.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ import (
3838
"github.com/milvus-io/milvus/internal/util/typeutil"
3939
)
4040

41+
const maxEtcdTxnNum = 64
42+
4143
type Catalog struct {
4244
Txn kv.TxnKV
4345
ChunkManagerRootPath string
@@ -102,23 +104,40 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
102104
if len(newSegments) == 0 {
103105
return nil
104106
}
105-
106-
kvs := make(map[string]string)
107+
kvsBySeg := make(map[int64]map[string]string)
107108
for _, segment := range newSegments {
108109
segmentKvs, err := buildSegmentAndBinlogsKvs(segment)
109110
if err != nil {
110111
return err
111112
}
112-
maps.Copy(kvs, segmentKvs)
113+
kvsBySeg[segment.GetID()] = make(map[string]string)
114+
maps.Copy(kvsBySeg[segment.GetID()], segmentKvs)
113115
}
114-
116+
// Split kvs into multiple operations to avoid over-sized operations.
117+
// Also make sure kvs of the same segment are not split into different operations.
118+
kvsPiece := make(map[string]string)
119+
currSize := 0
115120
saveFn := func(partialKvs map[string]string) error {
116121
return kc.Txn.MultiSave(partialKvs)
117122
}
118-
if err := etcd.SaveByBatch(kvs, saveFn); err != nil {
119-
return err
123+
for _, kvs := range kvsBySeg {
124+
if currSize+len(kvs) >= maxEtcdTxnNum {
125+
if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
126+
log.Error("failed to save by batch", zap.Error(err))
127+
return err
128+
}
129+
kvsPiece = make(map[string]string)
130+
currSize = 0
131+
}
132+
maps.Copy(kvsPiece, kvs)
133+
currSize += len(kvs)
134+
}
135+
if currSize > 0 {
136+
if err := etcd.SaveByBatch(kvsPiece, saveFn); err != nil {
137+
log.Error("failed to save by batch", zap.Error(err))
138+
return err
139+
}
120140
}
121-
122141
return nil
123142
}
124143

@@ -219,7 +238,7 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
219238
return kc.Txn.MultiSave(kvs)
220239
}
221240

222-
// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AtlerSegmentsAndAddNewSegment
241+
// RevertAlterSegmentsAndAddNewSegment reverts the metastore operation of AlterSegmentsAndAddNewSegment
223242
func (kc *Catalog) RevertAlterSegmentsAndAddNewSegment(ctx context.Context, oldSegments []*datapb.SegmentInfo, removeSegment *datapb.SegmentInfo) error {
224243
var (
225244
kvs = make(map[string]string)

internal/metastore/kv/datacoord/kv_catalog_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,54 @@ func Test_AlterSegments(t *testing.T) {
335335
assert.Equal(t, 4, len(savedKvs))
336336
verifySavedKvsForSegment(t, savedKvs)
337337
})
338+
339+
t.Run("save large ops successfully", func(t *testing.T) {
340+
txn := &MockedTxnKV{}
341+
savedKvs := make(map[string]string)
342+
opGroupCount := 0
343+
txn.multiSave = func(kvs map[string]string) error {
344+
var ks []string
345+
for k := range kvs {
346+
ks = append(ks, k)
347+
}
348+
maps.Copy(savedKvs, kvs)
349+
opGroupCount++
350+
return nil
351+
}
352+
353+
catalog := &Catalog{txn, "a"}
354+
err := catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{})
355+
assert.Nil(t, err)
356+
357+
var binlogXL []*datapb.FieldBinlog
358+
for i := 0; i < 255; i++ {
359+
binlogXL = append(binlogXL, &datapb.FieldBinlog{
360+
FieldID: int64(i),
361+
Binlogs: []*datapb.Binlog{
362+
{
363+
EntriesNum: 5,
364+
LogPath: binlogPath,
365+
},
366+
},
367+
})
368+
}
369+
370+
segmentXL := &datapb.SegmentInfo{
371+
ID: segmentID,
372+
CollectionID: collectionID,
373+
PartitionID: partitionID,
374+
NumOfRows: 100,
375+
State: commonpb.SegmentState_Flushed,
376+
Binlogs: binlogXL,
377+
Deltalogs: deltalogs,
378+
Statslogs: statslogs,
379+
}
380+
381+
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL})
382+
assert.Nil(t, err)
383+
assert.Equal(t, 255+3, len(savedKvs))
384+
assert.Equal(t, 5, opGroupCount)
385+
})
338386
}
339387

340388
func Test_AlterSegmentsAndAddNewSegment(t *testing.T) {

0 commit comments

Comments
 (0)