Skip to content

Commit 29e620f

Browse files
authored
fix: sync task still running after DataNode has stopped (milvus-io#38377)
issue: milvus-io#38319 Signed-off-by: jaime <yun.zhang@zilliz.com>
1 parent d0a7e98 commit 29e620f

14 files changed

Lines changed: 158 additions & 31 deletions

File tree

configs/milvus.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -878,6 +878,8 @@ common:
878878
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
879879
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
880880
localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode.
881+
sync:
882+
taskPoolReleaseTimeoutSeconds: 60 # The maximum time, in seconds, to wait for in-flight sync tasks to finish and for the task pool to release its resources when the sync manager is closed
881883

882884
# QuotaConfig, configurations of Milvus quota and limits.
883885
# By default, we enable:

internal/datanode/data_node.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,13 @@ func (node *DataNode) Stop() error {
402402
node.writeBufferManager.Stop()
403403
}
404404

405+
if node.syncMgr != nil {
406+
err := node.syncMgr.Close()
407+
if err != nil {
408+
log.Error("sync manager close failed", zap.Error(err))
409+
}
410+
}
411+
405412
if node.allocator != nil {
406413
log.Ctx(node.ctx).Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
407414
node.allocator.Close()

internal/datanode/importv2/scheduler_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
246246
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
247247
s.cm = cm
248248

249-
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
249+
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
250250
future := conc.Go(func() (struct{}, error) {
251251
return struct{}{}, nil
252252
})
253-
return future
253+
return future, nil
254254
})
255255
importReq := &datapb.ImportRequest{
256256
JobID: 10,
@@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
307307
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
308308
s.cm = cm
309309

310-
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
310+
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
311311
future := conc.Go(func() (struct{}, error) {
312312
return struct{}{}, errors.New("mock err")
313313
})
314-
return future
314+
return future, nil
315315
})
316316
importReq := &datapb.ImportRequest{
317317
JobID: 10,
@@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
384384
}
385385

386386
func (s *SchedulerSuite) TestScheduler_ImportFile() {
387-
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
387+
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
388388
future := conc.Go(func() (struct{}, error) {
389389
return struct{}{}, nil
390390
})
391-
return future
391+
return future, nil
392392
})
393393
var once sync.Once
394394
data, err := testutil.CreateInsertData(s.schema, s.numRows)

internal/datanode/importv2/task_import.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (t *ImportTask) importFile(reader importutilv2.Reader) error {
228228
}
229229

230230
func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
231-
log.Info("start to sync import data", WrapLogFields(t)...)
231+
log.Ctx(context.TODO()).Info("start to sync import data", WrapLogFields(t)...)
232232
futures := make([]*conc.Future[struct{}], 0)
233233
syncTasks := make([]syncmgr.Task, 0)
234234
for channelIdx, datas := range hashedData {
@@ -256,7 +256,11 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
256256
if err != nil {
257257
return nil, nil, err
258258
}
259-
future := t.syncMgr.SyncData(t.ctx, syncTask)
259+
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
260+
if err != nil {
261+
log.Ctx(context.TODO()).Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
262+
return nil, nil, err
263+
}
260264
futures = append(futures, future)
261265
syncTasks = append(syncTasks, syncTask)
262266
}

internal/datanode/importv2/task_l0_import.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader) error {
213213
}
214214

215215
func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
216-
log.Info("start to sync l0 delete data", WrapLogFields(t)...)
216+
log.Ctx(context.TODO()).Info("start to sync l0 delete data", WrapLogFields(t)...)
217217
futures := make([]*conc.Future[struct{}], 0)
218218
syncTasks := make([]syncmgr.Task, 0)
219219
for channelIdx, data := range delData {
@@ -231,7 +231,11 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
231231
if err != nil {
232232
return nil, nil, err
233233
}
234-
future := t.syncMgr.SyncData(t.ctx, syncTask)
234+
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
235+
if err != nil {
236+
log.Ctx(context.TODO()).Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
237+
return nil, nil, err
238+
}
235239
futures = append(futures, future)
236240
syncTasks = append(syncTasks, syncTask)
237241
}

internal/datanode/importv2/task_l0_import_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() {
132132

133133
func (s *L0ImportSuite) TestL0Import() {
134134
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).
135-
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
135+
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
136136
alloc := allocator.NewMockAllocator(s.T())
137137
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
138138
task.(*syncmgr.SyncTask).WithAllocator(alloc)
@@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() {
147147
future := conc.Go(func() (struct{}, error) {
148148
return struct{}{}, nil
149149
})
150-
return future
150+
return future, nil
151151
})
152152

153153
req := &datapb.ImportRequest{

internal/flushcommon/syncmgr/mock_sync_manager.go

Lines changed: 60 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/flushcommon/syncmgr/sync_manager.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ type SyncMeta struct {
4747
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
4848
type SyncManager interface {
4949
// SyncData is the method to submit sync task.
50-
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
50+
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)
5151

52+
// Close waits, up to the configured task pool release timeout, for submitted tasks to finish and then shuts down the sync manager.
53+
Close() error
5254
TaskStatsJSON() string
5355
}
5456

@@ -97,13 +99,17 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
9799
}
98100
}
99101

100-
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
102+
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
103+
if mgr.workerPool.IsClosed() {
104+
return nil, fmt.Errorf("sync manager is closed")
105+
}
106+
101107
switch t := task.(type) {
102108
case *SyncTask:
103109
t.WithChunkManager(mgr.chunkManager)
104110
}
105111

106-
return mgr.safeSubmitTask(ctx, task, callbacks...)
112+
return mgr.safeSubmitTask(ctx, task, callbacks...), nil
107113
}
108114

109115
// safeSubmitTask submits task to SyncManager
@@ -126,6 +132,7 @@ func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callba
126132
}
127133
callbacks = append([]func(error) error{handler}, callbacks...)
128134
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
135+
129136
return mgr.Submit(ctx, key, task, callbacks...)
130137
}
131138

@@ -142,3 +149,8 @@ func (mgr *syncManager) TaskStatsJSON() string {
142149
}
143150
return string(ret)
144151
}
152+
153+
func (mgr *syncManager) Close() error {
154+
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
155+
return mgr.workerPool.ReleaseTimeout(timeout)
156+
}

internal/flushcommon/syncmgr/sync_manager_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,22 @@ func (s *SyncManagerSuite) TestSubmit() {
173173
Timestamp: 100,
174174
})
175175

176-
f := manager.SyncData(context.Background(), task)
176+
f, err := manager.SyncData(context.Background(), task)
177+
s.NoError(err)
177178
s.NotNil(f)
178179

179-
_, err := f.Await()
180+
_, err = f.Await()
181+
s.NoError(err)
182+
}
183+
184+
func (s *SyncManagerSuite) TestClose() {
185+
manager := NewSyncManager(s.chunkManager)
186+
err := manager.Close()
180187
s.NoError(err)
188+
189+
f, err := manager.SyncData(context.Background(), nil)
190+
s.Error(err)
191+
s.Nil(f)
181192
}
182193

183194
func (s *SyncManagerSuite) TestCompacted() {
@@ -202,10 +213,11 @@ func (s *SyncManagerSuite) TestCompacted() {
202213
Timestamp: 100,
203214
})
204215

205-
f := manager.SyncData(context.Background(), task)
216+
f, err := manager.SyncData(context.Background(), task)
217+
s.NoError(err)
206218
s.NotNil(f)
207219

208-
_, err := f.Await()
220+
_, err = f.Await()
209221
s.NoError(err)
210222
s.EqualValues(1001, segmentID.Load())
211223
}
@@ -254,7 +266,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
254266
task.EXPECT().Run(mock.Anything).Return(merr.WrapErrServiceInternal("mocked")).Once()
255267
task.EXPECT().HandleError(mock.Anything)
256268

257-
f := manager.SyncData(context.Background(), task)
269+
f, _ := manager.SyncData(context.Background(), task)
258270
_, err := f.Await()
259271
s.Error(err)
260272
}
@@ -268,7 +280,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
268280
task.EXPECT().Run(mock.Anything).Return(errors.New("mock err")).Once()
269281
task.EXPECT().HandleError(mock.Anything)
270282

271-
f := manager.SyncData(context.Background(), task)
283+
f, _ := manager.SyncData(context.Background(), task)
272284
_, err := f.Await()
273285
s.Error(err)
274286
}

internal/flushcommon/writebuffer/write_buffer.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
322322
}
323323
}
324324

325-
result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
325+
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
326326
if wb.taskObserverCallback != nil {
327327
wb.taskObserverCallback(syncTask, err)
328328
}
@@ -342,7 +342,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
342342
}
343343
}
344344
return nil
345-
}))
345+
})
346+
if err != nil {
347+
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
348+
}
349+
result = append(result, future)
346350
}
347351
return result
348352
}
@@ -643,7 +647,7 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
643647
t.WithDrop()
644648
}
645649

646-
f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
650+
f, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
647651
if wb.taskObserverCallback != nil {
648652
wb.taskObserverCallback(syncTask, err)
649653
}
@@ -656,6 +660,9 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
656660
}
657661
return nil
658662
})
663+
if err != nil {
664+
log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
665+
}
659666
futures = append(futures, f)
660667
}
661668

0 commit comments

Comments
 (0)