Skip to content

Commit 4714442

Browse files
authored
fix: Fix regeneratePartitionStats failed after restore clusteringCompactionTask (milvus-io#43205)
issue: milvus-io#43186 --------- Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
1 parent 490c5d5 commit 4714442

2 files changed

Lines changed: 25 additions & 5 deletions

File tree

internal/datacoord/compaction_task_clustering.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
378378
Field2StatslogPaths: segInfo.GetStatslogs(),
379379
Deltalogs: segInfo.GetDeltalogs(),
380380
IsSorted: segInfo.GetIsSorted(),
381+
StorageVersion: segInfo.GetStorageVersion(),
381382
})
382383
}
383384
log.Info("Compaction handler build clustering compaction plan", zap.Any("PreAllocatedLogIDs", logIDRange))
@@ -420,6 +421,12 @@ func (t *clusteringCompactionTask) processStats() error {
420421
return nil
421422
}
422423

424+
task := t.ShadowClone(setResultSegments(resultSegments))
425+
err := t.saveTaskMeta(task)
426+
if err != nil {
427+
return merr.WrapErrClusteringCompactionMetaError("setResultSegments", err)
428+
}
429+
423430
if err := t.regeneratePartitionStats(tmpToResultSegments); err != nil {
424431
log.Warn("regenerate partition stats failed, wait for retry", zap.Error(err))
425432
return merr.WrapErrClusteringCompactionMetaError("regeneratePartitionStats", err)
@@ -448,7 +455,7 @@ func (t *clusteringCompactionTask) regeneratePartitionStats(tmpToResultSegments
448455
return err
449456
}
450457
partitionStatsFile := path.Join(cli.RootPath(), common.PartitionStatsPath,
451-
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.plan.GetChannel(),
458+
metautil.JoinIDPath(t.GetTaskProto().GetCollectionID(), t.GetTaskProto().GetPartitionID()), t.GetTaskProto().GetChannel(),
452459
strconv.FormatInt(t.GetTaskProto().GetPlanID(), 10))
453460

454461
value, err := cli.Read(ctx, partitionStatsFile)
@@ -663,14 +670,26 @@ func (t *clusteringCompactionTask) doClean() error {
663670
} else {
664671
// after v2.5.0, mark the results segment as dropped
665672
var operators []UpdateOperator
666-
for _, segID := range t.GetTaskProto().GetResultSegments() {
667-
// Don't worry about them being loaded; they are all invisible.
668-
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
673+
hasResultSegments := len(t.GetTaskProto().GetResultSegments()) != 0
674+
if hasResultSegments {
675+
for _, segID := range t.GetTaskProto().GetResultSegments() {
676+
// Don't worry about them being loaded; they are all invisible.
677+
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
678+
}
669679
}
680+
670681
for _, segID := range t.GetTaskProto().GetTmpSegments() {
671682
// Don't worry about them being loaded; they are all invisible.
672683
// tmpSegment is always invisible
673684
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
685+
if !hasResultSegments {
686+
toSegments, _ := t.meta.(*meta).GetCompactionTo(segID)
687+
if toSegments != nil {
688+
for _, toSeg := range toSegments {
689+
operators = append(operators, UpdateStatusOperator(toSeg.GetID(), commonpb.SegmentState_Dropped))
690+
}
691+
}
692+
}
674693
}
675694
err := t.meta.UpdateSegmentsInfo(context.TODO(), operators...)
676695
if err != nil {

internal/datacoord/meta.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1595,7 +1595,8 @@ func (m *meta) completeClusterCompactionMutation(t *datapb.CompactionTask, resul
15951595
return info.GetDmlPosition()
15961596
})),
15971597
// visible after stats and index
1598-
IsInvisible: true,
1598+
IsInvisible: true,
1599+
StorageVersion: seg.GetStorageVersion(),
15991600
}
16001601
segment := NewSegmentInfo(segmentInfo)
16011602
compactToSegInfos = append(compactToSegInfos, segment)

0 commit comments

Comments
 (0)