Skip to content

Commit 5551d99

Browse files
authored
enhance: remove old arch non-streaming arch code (milvus-io#43651)
issue: milvus-io#41609 - remove all dml dead code at proxy - remove dead code at l0_write_buffer - remove msgstream dependency at proxy - remove timetick reporter from proxy - remove replicate stream implementation --------- Signed-off-by: chyezh <chyezh@outlook.com>
1 parent 6ae7277 commit 5551d99

73 files changed

Lines changed: 611 additions & 4512 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/milvus/util.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,7 @@ func GetMilvusRoles(args []string, flags *flag.FlagSet) *roles.MilvusRoles {
146146
role.EnableProxy = true
147147
role.EnableQueryNode = true
148148
role.EnableDataNode = true
149-
if streamingutil.IsStreamingServiceEnabled() {
150-
role.EnableStreamingNode = true
151-
}
149+
role.EnableStreamingNode = true
152150
role.Local = true
153151
role.Embedded = serverType == typeutil.EmbeddedRole
154152
case typeutil.MixCoordRole:

internal/.mockery.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ packages:
1212
Utility:
1313
Broadcast:
1414
Local:
15+
Scanner:
1516
github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
1617
interfaces:
1718
Balancer:

internal/datacoord/services.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1462,6 +1462,9 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq
14621462
for _, sid := range req.GetSegmentIDs() {
14631463
segment := s.meta.GetHealthySegment(ctx, sid)
14641464
// segment is nil if it was compacted, or it's an empty segment and is set to dropped
1465+
// TODO: Here's a dirty implementation, because a growing segment may cannot be seen right away by mixcoord,
1466+
// it can only be seen by streamingnode right away, so we need to check the flush state at streamingnode but not here.
1467+
// use timetick for GetFlushState in-future but not segment list.
14651468
if segment == nil || isFlushState(segment.GetState()) {
14661469
continue
14671470
}

internal/distributed/datanode/service.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,10 @@ import (
3232
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
3333
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
3434
dn "github.com/milvus-io/milvus/internal/datanode"
35-
mix "github.com/milvus-io/milvus/internal/distributed/mixcoord/client"
3635
"github.com/milvus-io/milvus/internal/distributed/utils"
3736
"github.com/milvus-io/milvus/internal/types"
38-
"github.com/milvus-io/milvus/internal/util/componentutil"
3937
"github.com/milvus-io/milvus/internal/util/dependency"
4038
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
41-
"github.com/milvus-io/milvus/internal/util/streamingutil"
4239
"github.com/milvus-io/milvus/pkg/v2/log"
4340
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
4441
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
@@ -65,8 +62,6 @@ type Server struct {
6562
factory dependency.Factory
6663

6764
serverID atomic.Int64
68-
69-
mixCoordClient func() (types.MixCoordClient, error)
7065
}
7166

7267
// NewServer new DataNode grpc server
@@ -77,9 +72,6 @@ func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error)
7772
cancel: cancel,
7873
factory: factory,
7974
grpcErrChan: make(chan error),
80-
mixCoordClient: func() (types.MixCoordClient, error) {
81-
return mix.NewClient(ctx1)
82-
},
8375
}
8476

8577
s.serverID.Store(paramtable.GetNodeID())
@@ -173,10 +165,6 @@ func (s *Server) SetEtcdClient(client *clientv3.Client) {
173165
s.datanode.SetEtcdClient(client)
174166
}
175167

176-
func (s *Server) SetMixCoordInterface(ms types.MixCoordClient) error {
177-
return s.datanode.SetMixCoordClient(ms)
178-
}
179-
180168
// Run initializes and starts Datanode's grpc service.
181169
func (s *Server) Run() error {
182170
if err := s.init(); err != nil {
@@ -255,27 +243,6 @@ func (s *Server) init() error {
255243
return err
256244
}
257245

258-
if !streamingutil.IsStreamingServiceEnabled() {
259-
// --- MixCoord Client ---
260-
if s.mixCoordClient != nil {
261-
log.Info("initializing MixCoord client for DataNode")
262-
mixCoordClient, err := s.mixCoordClient()
263-
if err != nil {
264-
log.Error("failed to create new MixCoord client", zap.Error(err))
265-
panic(err)
266-
}
267-
268-
if err = componentutil.WaitForComponentHealthy(s.ctx, mixCoordClient, "MixCoord", 1000000, time.Millisecond*200); err != nil {
269-
log.Error("failed to wait for MixCoord client to be ready", zap.Error(err))
270-
panic(err)
271-
}
272-
log.Info("MixCoord client is ready for DataNode")
273-
if err = s.SetMixCoordInterface(mixCoordClient); err != nil {
274-
panic(err)
275-
}
276-
}
277-
}
278-
279246
s.datanode.UpdateStateCode(commonpb.StateCode_Initializing)
280247

281248
if err := s.datanode.Init(); err != nil {

internal/distributed/datanode/service_test.go

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
2828
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
2929
"github.com/milvus-io/milvus/internal/mocks"
30-
"github.com/milvus-io/milvus/internal/types"
3130
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
3231
"github.com/milvus-io/milvus/pkg/v2/proto/internalpb"
3332
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
@@ -43,27 +42,10 @@ func Test_NewServer(t *testing.T) {
4342
assert.NoError(t, err)
4443
assert.NotNil(t, server)
4544

46-
mockMixCoord := mocks.NewMockMixCoordClient(t)
47-
mockMixCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
48-
State: &milvuspb.ComponentInfo{
49-
StateCode: commonpb.StateCode_Healthy,
50-
},
51-
Status: merr.Success(),
52-
SubcomponentStates: []*milvuspb.ComponentInfo{
53-
{
54-
StateCode: commonpb.StateCode_Healthy,
55-
},
56-
},
57-
}, nil)
58-
server.mixCoordClient = func() (types.MixCoordClient, error) {
59-
return mockMixCoord, nil
60-
}
61-
6245
t.Run("Run", func(t *testing.T) {
6346
datanode := mocks.NewMockDataNode(t)
6447
datanode.EXPECT().SetEtcdClient(mock.Anything).Return()
6548
datanode.EXPECT().SetAddress(mock.Anything).Return()
66-
datanode.EXPECT().SetMixCoordClient(mock.Anything).Return(nil)
6749
datanode.EXPECT().UpdateStateCode(mock.Anything).Return()
6850
datanode.EXPECT().Register().Return(nil)
6951
datanode.EXPECT().Init().Return(nil)
@@ -191,26 +173,9 @@ func Test_Run(t *testing.T) {
191173
assert.NoError(t, err)
192174
assert.NotNil(t, server)
193175

194-
mockRootCoord := mocks.NewMockMixCoordClient(t)
195-
mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
196-
State: &milvuspb.ComponentInfo{
197-
StateCode: commonpb.StateCode_Healthy,
198-
},
199-
Status: merr.Success(),
200-
SubcomponentStates: []*milvuspb.ComponentInfo{
201-
{
202-
StateCode: commonpb.StateCode_Healthy,
203-
},
204-
},
205-
}, nil)
206-
server.mixCoordClient = func() (types.MixCoordClient, error) {
207-
return mockRootCoord, nil
208-
}
209-
210176
datanode := mocks.NewMockDataNode(t)
211177
datanode.EXPECT().SetEtcdClient(mock.Anything).Return()
212178
datanode.EXPECT().SetAddress(mock.Anything).Return()
213-
datanode.EXPECT().SetMixCoordClient(mock.Anything).Return(nil)
214179
datanode.EXPECT().UpdateStateCode(mock.Anything).Return()
215180
datanode.EXPECT().Init().Return(errors.New("mock err"))
216181
server.datanode = datanode
@@ -223,7 +188,6 @@ func Test_Run(t *testing.T) {
223188
datanode = mocks.NewMockDataNode(t)
224189
datanode.EXPECT().SetEtcdClient(mock.Anything).Return()
225190
datanode.EXPECT().SetAddress(mock.Anything).Return()
226-
datanode.EXPECT().SetMixCoordClient(mock.Anything).Return(nil)
227191
datanode.EXPECT().UpdateStateCode(mock.Anything).Return()
228192
datanode.EXPECT().Register().Return(nil)
229193
datanode.EXPECT().Init().Return(nil)
@@ -242,26 +206,9 @@ func TestIndexService(t *testing.T) {
242206
assert.NoError(t, err)
243207
assert.NotNil(t, server)
244208

245-
mockRootCoord := mocks.NewMockMixCoordClient(t)
246-
mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
247-
State: &milvuspb.ComponentInfo{
248-
StateCode: commonpb.StateCode_Healthy,
249-
},
250-
Status: merr.Success(),
251-
SubcomponentStates: []*milvuspb.ComponentInfo{
252-
{
253-
StateCode: commonpb.StateCode_Healthy,
254-
},
255-
},
256-
}, nil)
257-
server.mixCoordClient = func() (types.MixCoordClient, error) {
258-
return mockRootCoord, nil
259-
}
260-
261209
dn := mocks.NewMockDataNode(t)
262210
dn.EXPECT().SetEtcdClient(mock.Anything).Return()
263211
dn.EXPECT().SetAddress(mock.Anything).Return()
264-
dn.EXPECT().SetMixCoordClient(mock.Anything).Return(nil)
265212
dn.EXPECT().UpdateStateCode(mock.Anything).Return()
266213
dn.EXPECT().Register().Return(nil)
267214
dn.EXPECT().Init().Return(nil)

internal/distributed/proxy/service.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,6 @@ func (s *Server) init() error {
449449
return err
450450
}
451451
s.etcdCli = etcdCli
452-
s.proxy.SetEtcdClient(s.etcdCli)
453452
s.proxy.SetAddress(s.listenerManager.internalGrpcListener.Address())
454453

455454
errChan := make(chan error, 1)

internal/distributed/proxy/service_test.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ func Test_NewServer(t *testing.T) {
201201
mockProxy.EXPECT().Init().Return(nil)
202202
mockProxy.EXPECT().Start().Return(nil)
203203
mockProxy.EXPECT().Register().Return(nil)
204-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
205204
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
206205
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
207206
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -687,7 +686,6 @@ func Test_NewServer(t *testing.T) {
687686
mockProxy.EXPECT().Init().Return(nil)
688687
mockProxy.EXPECT().Start().Return(nil)
689688
mockProxy.EXPECT().Register().Return(nil)
690-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
691689
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
692690
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
693691
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -821,7 +819,6 @@ func Test_NewServer_HTTPServer_Enabled(t *testing.T) {
821819
mockProxy.EXPECT().Init().Return(nil)
822820
mockProxy.EXPECT().Start().Return(nil)
823821
mockProxy.EXPECT().Register().Return(nil)
824-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
825822
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
826823
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
827824
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -886,7 +883,6 @@ func Test_NewServer_TLS_TwoWay(t *testing.T) {
886883
mockProxy.EXPECT().Init().Return(nil)
887884
mockProxy.EXPECT().Start().Return(nil)
888885
mockProxy.EXPECT().Register().Return(nil)
889-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
890886
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
891887
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
892888
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -914,7 +910,6 @@ func Test_NewServer_TLS_OneWay(t *testing.T) {
914910
mockProxy.EXPECT().Init().Return(nil)
915911
mockProxy.EXPECT().Start().Return(nil)
916912
mockProxy.EXPECT().Register().Return(nil)
917-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
918913
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
919914
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
920915
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -938,7 +933,6 @@ func Test_NewServer_TLS_FileNotExisted(t *testing.T) {
938933

939934
mockProxy := server.proxy.(*mocks.MockProxy)
940935
mockProxy.EXPECT().Stop().Return(nil)
941-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
942936
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
943937
mockProxy.EXPECT().SetAddress(mock.Anything).Return()
944938

@@ -977,7 +971,6 @@ func Test_NewHTTPServer_TLS_TwoWay(t *testing.T) {
977971
mockProxy.EXPECT().Init().Return(nil)
978972
mockProxy.EXPECT().Start().Return(nil)
979973
mockProxy.EXPECT().Register().Return(nil)
980-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
981974
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
982975
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
983976
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -1013,7 +1006,6 @@ func Test_NewHTTPServer_TLS_OneWay(t *testing.T) {
10131006
mockProxy.EXPECT().Init().Return(nil)
10141007
mockProxy.EXPECT().Start().Return(nil)
10151008
mockProxy.EXPECT().Register().Return(nil)
1016-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
10171009
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
10181010
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
10191011
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()
@@ -1046,7 +1038,6 @@ func Test_NewHTTPServer_TLS_FileNotExisted(t *testing.T) {
10461038

10471039
mockProxy := server.proxy.(*mocks.MockProxy)
10481040
mockProxy.EXPECT().Stop().Return(nil)
1049-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return().Maybe()
10501041
mockProxy.EXPECT().SetAddress(mock.Anything).Return().Maybe()
10511042
Params := &paramtable.Get().ProxyGrpcServerCfg
10521043

@@ -1160,7 +1151,6 @@ func Test_Service_GracefulStop(t *testing.T) {
11601151
mockProxy.EXPECT().Start().Return(nil)
11611152
mockProxy.EXPECT().Stop().Return(nil)
11621153
mockProxy.EXPECT().Register().Return(nil)
1163-
mockProxy.EXPECT().SetEtcdClient(mock.Anything).Return()
11641154
mockProxy.EXPECT().GetRateLimiter().Return(nil, nil)
11651155
mockProxy.EXPECT().SetMixCoordClient(mock.Anything).Return()
11661156
mockProxy.EXPECT().UpdateStateCode(mock.Anything).Return()

internal/distributed/streaming/streaming.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"time"
66

7-
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/registry"
87
kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
98
"github.com/milvus-io/milvus/pkg/v2/streaming/util/message"
109
"github.com/milvus-io/milvus/pkg/v2/streaming/util/options"
@@ -18,8 +17,6 @@ var singleton WALAccesser = nil
1817
func Init() {
1918
c, _ := kvfactory.GetEtcdAndPath()
2019
singleton = newWALAccesser(c)
21-
// Add the wal accesser to the broadcaster registry for making broadcast operation.
22-
registry.Register(registry.AppendOperatorTypeStreaming, singleton)
2320
}
2421

2522
// Release releases the resources of the wal accesser.

0 commit comments

Comments
 (0)