Skip to content

Commit 9d16b97

Browse files
authored
feat: add tasks page into management WebUI (milvus-io#37002)
issue: milvus-io#36621 1. Add API to access task runtime metrics, including: - build index task - compaction task - import task - balance (including load/release of segments/channels and some leader tasks on querycoord) - sync task 2. Add a debug model to the webpage by using debug=true or debug=false in the URL query parameters to enable or disable debug mode. Signed-off-by: jaime <yun.zhang@zilliz.com>
1 parent d7b2ffe commit 9d16b97

84 files changed

Lines changed: 2881 additions & 1153 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ require (
6363
github.com/bytedance/sonic v1.12.2
6464
github.com/cenkalti/backoff/v4 v4.2.1
6565
github.com/cockroachdb/redact v1.1.3
66+
github.com/goccy/go-json v0.10.3
6667
github.com/greatroar/blobloom v0.0.0-00010101000000-000000000000
6768
github.com/hashicorp/golang-lru/v2 v2.0.7
6869
github.com/jolestar/go-commons-pool/v2 v2.1.2
@@ -131,7 +132,6 @@ require (
131132
github.com/go-ole/go-ole v1.2.6 // indirect
132133
github.com/go-playground/locales v0.14.1 // indirect
133134
github.com/go-playground/universal-translator v0.18.1 // indirect
134-
github.com/goccy/go-json v0.10.3 // indirect
135135
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
136136
github.com/godbus/dbus/v5 v5.0.4 // indirect
137137
github.com/gogo/protobuf v1.3.2 // indirect

internal/datacoord/compaction_task_l0_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *L0CompactionTaskSuite) SetupTest() {
5050
s.mockMeta = NewMockCompactionMeta(s.T())
5151
s.mockSessMgr = session.NewMockDataNodeManager(s.T())
5252
s.mockAlloc = allocator.NewMockAllocator(s.T())
53-
//s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
53+
// s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
5454
}
5555

5656
func (s *L0CompactionTaskSuite) SetupSubTest() {

internal/datacoord/compaction_task_meta.go

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,43 @@ package datacoord
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"sync"
23+
"time"
2224

25+
"github.com/hashicorp/golang-lru/v2/expirable"
2326
"go.uber.org/zap"
2427
"google.golang.org/protobuf/proto"
2528

2629
"github.com/milvus-io/milvus/internal/metastore"
2730
"github.com/milvus-io/milvus/internal/proto/datapb"
2831
"github.com/milvus-io/milvus/pkg/log"
32+
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
2933
"github.com/milvus-io/milvus/pkg/util/timerecord"
3034
)
3135

36+
func newCompactionTaskStats(task *datapb.CompactionTask) *metricsinfo.CompactionTask {
37+
return &metricsinfo.CompactionTask{
38+
PlanID: task.PlanID,
39+
CollectionID: task.CollectionID,
40+
Type: task.Type.String(),
41+
State: task.State.String(),
42+
FailReason: task.FailReason,
43+
StartTime: task.StartTime,
44+
EndTime: task.EndTime,
45+
TotalRows: task.TotalRows,
46+
InputSegments: task.InputSegments,
47+
ResultSegments: task.ResultSegments,
48+
}
49+
}
50+
3251
type compactionTaskMeta struct {
3352
sync.RWMutex
3453
ctx context.Context
3554
catalog metastore.DataCoordCatalog
3655
// currently only clustering compaction task is stored in persist meta
3756
compactionTasks map[int64]map[int64]*datapb.CompactionTask // triggerID -> planID
57+
taskStats *expirable.LRU[UniqueID, *metricsinfo.CompactionTask]
3858
}
3959

4060
func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*compactionTaskMeta, error) {
@@ -43,6 +63,7 @@ func newCompactionTaskMeta(ctx context.Context, catalog metastore.DataCoordCatal
4363
ctx: ctx,
4464
catalog: catalog,
4565
compactionTasks: make(map[int64]map[int64]*datapb.CompactionTask, 0),
66+
taskStats: expirable.NewLRU[UniqueID, *metricsinfo.CompactionTask](1024, nil, time.Minute*60),
4667
}
4768
if err := csm.reloadFromKV(); err != nil {
4869
return nil, err
@@ -125,16 +146,17 @@ func (csm *compactionTaskMeta) SaveCompactionTask(task *datapb.CompactionTask) e
125146
log.Error("meta update: update compaction task fail", zap.Error(err))
126147
return err
127148
}
128-
return csm.saveCompactionTaskMemory(task)
149+
csm.saveCompactionTaskMemory(task)
150+
return nil
129151
}
130152

131-
func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) error {
153+
func (csm *compactionTaskMeta) saveCompactionTaskMemory(task *datapb.CompactionTask) {
132154
_, triggerIDExist := csm.compactionTasks[task.TriggerID]
133155
if !triggerIDExist {
134156
csm.compactionTasks[task.TriggerID] = make(map[int64]*datapb.CompactionTask, 0)
135157
}
136158
csm.compactionTasks[task.TriggerID][task.PlanID] = task
137-
return nil
159+
csm.taskStats.Add(task.PlanID, newCompactionTaskStats(task))
138160
}
139161

140162
func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) error {
@@ -153,3 +175,16 @@ func (csm *compactionTaskMeta) DropCompactionTask(task *datapb.CompactionTask) e
153175
}
154176
return nil
155177
}
178+
179+
func (csm *compactionTaskMeta) TaskStatsJSON() string {
180+
tasks := csm.taskStats.Values()
181+
if len(tasks) == 0 {
182+
return ""
183+
}
184+
185+
ret, err := json.Marshal(tasks)
186+
if err != nil {
187+
return ""
188+
}
189+
return string(ret)
190+
}

internal/datacoord/compaction_task_meta_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ package datacoord
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"testing"
23+
"time"
2224

2325
"github.com/stretchr/testify/mock"
2426
"github.com/stretchr/testify/suite"
2527

2628
"github.com/milvus-io/milvus/internal/metastore/mocks"
2729
"github.com/milvus-io/milvus/internal/proto/datapb"
30+
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
2831
)
2932

3033
func TestCompactionTaskMetaSuite(t *testing.T) {
@@ -79,3 +82,49 @@ func (suite *CompactionTaskMetaSuite) TestGetCompactionTasksByCollectionAbnormal
7982
res := suite.meta.GetCompactionTasksByCollection(101)
8083
suite.Equal(1, len(res))
8184
}
85+
86+
func (suite *CompactionTaskMetaSuite) TestTaskStatsJSON() {
87+
task1 := &datapb.CompactionTask{
88+
PlanID: 1,
89+
CollectionID: 100,
90+
Type: datapb.CompactionType_MergeCompaction,
91+
State: datapb.CompactionTaskState_completed,
92+
FailReason: "",
93+
StartTime: time.Now().Unix(),
94+
EndTime: time.Now().Add(time.Hour).Unix(),
95+
TotalRows: 1000,
96+
InputSegments: []int64{1, 2},
97+
ResultSegments: []int64{3},
98+
}
99+
task2 := &datapb.CompactionTask{
100+
PlanID: 2,
101+
CollectionID: 101,
102+
Type: datapb.CompactionType_MergeCompaction,
103+
State: datapb.CompactionTaskState_completed,
104+
FailReason: "",
105+
StartTime: time.Now().Unix(),
106+
EndTime: time.Now().Add(time.Hour).Unix(),
107+
TotalRows: 2000,
108+
InputSegments: []int64{4, 5},
109+
ResultSegments: []int64{6},
110+
}
111+
112+
// testing return empty string
113+
actualJSON := suite.meta.TaskStatsJSON()
114+
suite.Equal("", actualJSON)
115+
116+
err := suite.meta.SaveCompactionTask(task1)
117+
suite.NoError(err)
118+
err = suite.meta.SaveCompactionTask(task2)
119+
suite.NoError(err)
120+
121+
expectedTasks := []*metricsinfo.CompactionTask{
122+
newCompactionTaskStats(task1),
123+
newCompactionTaskStats(task2),
124+
}
125+
expectedJSON, err := json.Marshal(expectedTasks)
126+
suite.NoError(err)
127+
128+
actualJSON = suite.meta.TaskStatsJSON()
129+
suite.JSONEq(string(expectedJSON), actualJSON)
130+
}

internal/datacoord/compaction_test.go

Lines changed: 80 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
133133
plans []*datapb.CompactionPlan
134134
expectedOut []UniqueID // planID
135135
}{
136-
{"with L0 tasks diff channel",
136+
{
137+
"with L0 tasks diff channel",
137138
[]CompactionTask{
138139
newL0CompactionTask(&datapb.CompactionTask{
139140
PlanID: 10,
@@ -156,7 +157,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
156157
},
157158
[]UniqueID{10, 11},
158159
},
159-
{"with L0 tasks same channel",
160+
{
161+
"with L0 tasks same channel",
160162
[]CompactionTask{
161163
newMixCompactionTask(&datapb.CompactionTask{
162164
PlanID: 11,
@@ -179,7 +181,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
179181
},
180182
[]UniqueID{10},
181183
},
182-
{"without L0 tasks",
184+
{
185+
"without L0 tasks",
183186
[]CompactionTask{
184187
newMixCompactionTask(&datapb.CompactionTask{
185188
PlanID: 14,
@@ -202,10 +205,12 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
202205
},
203206
[]UniqueID{13, 14},
204207
},
205-
{"empty tasks",
208+
{
209+
"empty tasks",
206210
[]CompactionTask{},
207211
[]*datapb.CompactionPlan{},
208-
[]UniqueID{}},
212+
[]UniqueID{},
213+
},
209214
}
210215

211216
for _, test := range tests {
@@ -235,7 +240,8 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
235240
plans []*datapb.CompactionPlan
236241
expectedOut []UniqueID // planID
237242
}{
238-
{"with L0 tasks diff channel",
243+
{
244+
"with L0 tasks diff channel",
239245
[]CompactionTask{
240246
newL0CompactionTask(&datapb.CompactionTask{
241247
PlanID: 10,
@@ -255,85 +261,92 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
255261
[]*datapb.CompactionPlan{{}, {}},
256262
[]UniqueID{10, 11},
257263
},
258-
{"with L0 tasks same channel", []CompactionTask{
259-
newL0CompactionTask(&datapb.CompactionTask{
260-
PlanID: 10,
261-
Type: datapb.CompactionType_Level0DeleteCompaction,
262-
State: datapb.CompactionTaskState_pipelining,
263-
Channel: "ch-11",
264-
NodeID: 102,
265-
}, nil, s.mockMeta, s.mockSessMgr),
266-
newMixCompactionTask(&datapb.CompactionTask{
267-
PlanID: 11,
268-
Type: datapb.CompactionType_MixCompaction,
269-
State: datapb.CompactionTaskState_pipelining,
270-
Channel: "ch-11",
271-
NodeID: 102,
272-
}, nil, s.mockMeta, s.mockSessMgr),
273-
newMixCompactionTask(&datapb.CompactionTask{
274-
PlanID: 13,
275-
Type: datapb.CompactionType_MixCompaction,
276-
State: datapb.CompactionTaskState_pipelining,
277-
Channel: "ch-3",
278-
NodeID: 102,
279-
}, nil, s.mockMeta, s.mockSessMgr),
280-
},
264+
{
265+
"with L0 tasks same channel",
266+
[]CompactionTask{
267+
newL0CompactionTask(&datapb.CompactionTask{
268+
PlanID: 10,
269+
Type: datapb.CompactionType_Level0DeleteCompaction,
270+
State: datapb.CompactionTaskState_pipelining,
271+
Channel: "ch-11",
272+
NodeID: 102,
273+
}, nil, s.mockMeta, s.mockSessMgr),
274+
newMixCompactionTask(&datapb.CompactionTask{
275+
PlanID: 11,
276+
Type: datapb.CompactionType_MixCompaction,
277+
State: datapb.CompactionTaskState_pipelining,
278+
Channel: "ch-11",
279+
NodeID: 102,
280+
}, nil, s.mockMeta, s.mockSessMgr),
281+
newMixCompactionTask(&datapb.CompactionTask{
282+
PlanID: 13,
283+
Type: datapb.CompactionType_MixCompaction,
284+
State: datapb.CompactionTaskState_pipelining,
285+
Channel: "ch-3",
286+
NodeID: 102,
287+
}, nil, s.mockMeta, s.mockSessMgr),
288+
},
281289
[]*datapb.CompactionPlan{
282290
{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
283291
{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction},
284292
{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction},
285293
},
286294
[]UniqueID{10, 13},
287295
},
288-
{"with multiple L0 tasks same channel", []CompactionTask{
289-
newL0CompactionTask(&datapb.CompactionTask{
290-
PlanID: 10,
291-
Type: datapb.CompactionType_Level0DeleteCompaction,
292-
State: datapb.CompactionTaskState_pipelining,
293-
Channel: "ch-11",
294-
NodeID: 102,
295-
}, nil, s.mockMeta, s.mockSessMgr),
296-
newL0CompactionTask(&datapb.CompactionTask{
297-
PlanID: 11,
298-
Type: datapb.CompactionType_Level0DeleteCompaction,
299-
State: datapb.CompactionTaskState_pipelining,
300-
Channel: "ch-11",
301-
NodeID: 102,
302-
}, nil, s.mockMeta, s.mockSessMgr),
303-
newL0CompactionTask(&datapb.CompactionTask{
304-
PlanID: 12,
305-
Type: datapb.CompactionType_Level0DeleteCompaction,
306-
State: datapb.CompactionTaskState_pipelining,
307-
Channel: "ch-11",
308-
NodeID: 102,
309-
}, nil, s.mockMeta, s.mockSessMgr),
310-
},
296+
{
297+
"with multiple L0 tasks same channel",
298+
[]CompactionTask{
299+
newL0CompactionTask(&datapb.CompactionTask{
300+
PlanID: 10,
301+
Type: datapb.CompactionType_Level0DeleteCompaction,
302+
State: datapb.CompactionTaskState_pipelining,
303+
Channel: "ch-11",
304+
NodeID: 102,
305+
}, nil, s.mockMeta, s.mockSessMgr),
306+
newL0CompactionTask(&datapb.CompactionTask{
307+
PlanID: 11,
308+
Type: datapb.CompactionType_Level0DeleteCompaction,
309+
State: datapb.CompactionTaskState_pipelining,
310+
Channel: "ch-11",
311+
NodeID: 102,
312+
}, nil, s.mockMeta, s.mockSessMgr),
313+
newL0CompactionTask(&datapb.CompactionTask{
314+
PlanID: 12,
315+
Type: datapb.CompactionType_Level0DeleteCompaction,
316+
State: datapb.CompactionTaskState_pipelining,
317+
Channel: "ch-11",
318+
NodeID: 102,
319+
}, nil, s.mockMeta, s.mockSessMgr),
320+
},
311321
[]*datapb.CompactionPlan{
312322
{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
313323
{PlanID: 11, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
314324
{PlanID: 12, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction},
315325
},
316326
[]UniqueID{10, 11, 12},
317327
},
318-
{"without L0 tasks", []CompactionTask{
319-
newMixCompactionTask(&datapb.CompactionTask{
320-
PlanID: 14,
321-
Type: datapb.CompactionType_MixCompaction,
322-
Channel: "ch-3",
323-
NodeID: 102,
324-
}, nil, s.mockMeta, s.mockSessMgr),
325-
newMixCompactionTask(&datapb.CompactionTask{
326-
PlanID: 13,
327-
Type: datapb.CompactionType_MixCompaction,
328-
Channel: "ch-11",
329-
NodeID: 102,
330-
}, nil, s.mockMeta, s.mockSessMgr),
331-
},
328+
{
329+
"without L0 tasks",
330+
[]CompactionTask{
331+
newMixCompactionTask(&datapb.CompactionTask{
332+
PlanID: 14,
333+
Type: datapb.CompactionType_MixCompaction,
334+
Channel: "ch-3",
335+
NodeID: 102,
336+
}, nil, s.mockMeta, s.mockSessMgr),
337+
newMixCompactionTask(&datapb.CompactionTask{
338+
PlanID: 13,
339+
Type: datapb.CompactionType_MixCompaction,
340+
Channel: "ch-11",
341+
NodeID: 102,
342+
}, nil, s.mockMeta, s.mockSessMgr),
343+
},
332344
[]*datapb.CompactionPlan{
333345
{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction},
334346
{},
335347
},
336-
[]UniqueID{13, 14}},
348+
[]UniqueID{13, 14},
349+
},
337350
{"empty tasks", []CompactionTask{}, []*datapb.CompactionPlan{}, []UniqueID{}},
338351
}
339352

0 commit comments

Comments
 (0)