Skip to content

Commit 9cb5271

Browse files
authored
enhance: remove support of embeded nats mq (milvus-io#41565)
issue: milvus-io#41564 Signed-off-by: chyezh <chyezh@outlook.com>
1 parent bb7df40 commit 9cb5271

27 files changed

Lines changed: 35 additions & 2145 deletions

cmd/roles/roles.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ import (
4646
"github.com/milvus-io/milvus/pkg/v2/log"
4747
"github.com/milvus-io/milvus/pkg/v2/metrics"
4848
rocksmqimpl "github.com/milvus-io/milvus/pkg/v2/mq/mqimpl/rocksmq/server"
49-
"github.com/milvus-io/milvus/pkg/v2/mq/msgstream/mqwrapper/nmq"
5049
"github.com/milvus-io/milvus/pkg/v2/tracer"
5150
"github.com/milvus-io/milvus/pkg/v2/util/etcd"
5251
"github.com/milvus-io/milvus/pkg/v2/util/expr"
@@ -334,10 +333,8 @@ func (mr *MilvusRoles) Run() {
334333
params := paramtable.Get()
335334
if paramtable.Get().RocksmqEnable() {
336335
defer stopRocksmq()
337-
} else if paramtable.Get().NatsmqEnable() {
338-
defer nmq.CloseNatsMQ()
339336
} else {
340-
panic("only support Rocksmq and Natsmq in standalone mode")
337+
panic("only support Rocksmq in standalone mode")
341338
}
342339
if params.EtcdCfg.UseEmbedEtcd.GetAsBool() {
343340
// Start etcd server.

cmd/tools/config/generate.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -233,11 +233,11 @@ func WriteYaml(w io.Writer) {
233233
{
234234
name: "mq",
235235
header: `
236-
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
236+
# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka.
237237
# You can change your mq by setting mq.type field.
238238
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
239-
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
240-
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)`,
239+
# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
240+
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)`,
241241
},
242242
{
243243
name: "woodpecker",
@@ -257,12 +257,6 @@ func WriteYaml(w io.Writer) {
257257
{
258258
name: "rocksmq",
259259
},
260-
{
261-
name: "natsmq",
262-
header: `
263-
# natsmq configuration.
264-
# more detail: https://docs.nats.io/running-a-nats-service/configuration`,
265-
},
266260
{
267261
name: "mixCoord",
268262
header: "\n# Related configuration of mixCoord",

configs/milvus.yaml

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -158,14 +158,14 @@ minio:
158158
# 0 means using oss client by default, decrease these configration if ListObjects timeout
159159
listObjectsMaxKeys: 0
160160

161-
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
161+
# Milvus supports four MQ: rocksmq(based on RockDB), Pulsar and Kafka.
162162
# You can change your mq by setting mq.type field.
163163
# If you don't set mq.type field as default, there is a note about enabling priority if we config multiple mq in this file.
164-
# 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
165-
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
164+
# 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
165+
# 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)
166166
mq:
167167
# Default value: "default"
168-
# Valid values: [default, pulsar, kafka, rocksmq, natsmq, woodpecker]
168+
# Valid values: [default, pulsar, kafka, rocksmq, woodpecker]
169169
type: default
170170
enablePursuitMode: true # Default value: "true"
171171
pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
@@ -256,27 +256,6 @@ rocksmq:
256256
compactionInterval: 86400 # Time interval to trigger rocksdb compaction to remove deleted data. Unit: Second
257257
compressionTypes: 0,0,7,7,7 # compaction compression type, only support use 0,7. 0 means not compress, 7 will use zstd. Length of types means num of rocksdb level.
258258

259-
# natsmq configuration.
260-
# more detail: https://docs.nats.io/running-a-nats-service/configuration
261-
natsmq:
262-
server:
263-
port: 4222 # Listening port of the NATS server.
264-
storeDir: /var/lib/milvus/nats # Directory to use for JetStream storage of nats
265-
maxFileStore: 17179869184 # Maximum size of the 'file' storage
266-
maxPayload: 8388608 # Maximum number of bytes in a message payload
267-
maxPending: 67108864 # Maximum number of bytes buffered for a connection Applies to client connections
268-
initializeTimeout: 4000 # waiting for initialization of natsmq finished
269-
monitor:
270-
trace: false # If true enable protocol trace log messages
271-
debug: false # If true enable debug log messages
272-
logTime: true # If set to false, log without timestamps.
273-
logFile: /tmp/milvus/logs/nats.log # Log file path relative to .. of milvus binary if use relative path
274-
logSizeLimit: 536870912 # Size in bytes after the log file rolls over to a new one
275-
retention:
276-
maxAge: 4320 # Maximum age of any message in the P-channel
277-
maxBytes: # How many bytes the single P-channel may contain. Removing oldest messages if the P-channel exceeds this size
278-
maxMsgs: # How many message the single P-channel may contain. Removing oldest messages if the P-channel exceeds this limit
279-
280259
# Related configuration of mixCoord
281260
mixCoord:
282261
enableActiveStandby: false

go.mod

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,17 +189,11 @@ require (
189189
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
190190
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
191191
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
192-
github.com/minio/highwayhash v1.0.2 // indirect
193192
github.com/minio/md5-simd v1.1.2 // indirect
194193
github.com/mitchellh/mapstructure v1.5.0 // indirect
195194
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
196195
github.com/modern-go/reflect2 v1.0.2 // indirect
197196
github.com/mtibben/percent v0.2.1 // indirect
198-
github.com/nats-io/jwt/v2 v2.5.5 // indirect
199-
github.com/nats-io/nats-server/v2 v2.10.12 // indirect
200-
github.com/nats-io/nats.go v1.34.1 // indirect
201-
github.com/nats-io/nkeys v0.4.7 // indirect
202-
github.com/nats-io/nuid v1.0.1 // indirect
203197
github.com/opencontainers/runtime-spec v1.0.2 // indirect
204198
github.com/opentracing/opentracing-go v1.2.0 // indirect
205199
github.com/panjf2000/ants/v2 v2.7.2 // indirect

go.sum

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -738,20 +738,14 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6 h1:YHMFI6L
738738
github.com/milvus-io/cgosymbolizer v0.0.0-20250318084424-114f4050c3a6/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
739739
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
740740
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
741-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971 h1:CKKrOtri+dbTUkMJehDuSM489OIqJab1t0pUq+PV73E=
742-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
743741
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a h1:W+9nVXKcI9FdiyrFbrs9BIFUqRW0pLY+Fn0fsmmuLyw=
744742
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250407030015-dcf7688ad54a/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
745-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8 h1:/oUdiYtwVlqiEMSzME7vDvir49Lt23nMpaZC9u22bIo=
746-
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.8/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
747743
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
748744
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
749745
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
750746
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
751747
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
752748
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
753-
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
754-
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
755749
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
756750
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
757751
github.com/minio/minio-go/v7 v7.0.73 h1:qr2vi96Qm7kZ4v7LLebjte+MQh621fFWnv93p12htEo=
@@ -781,17 +775,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
781775
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
782776
github.com/nacos-group/nacos-sdk-go v1.0.8/go.mod h1:hlAPn3UdzlxIlSILAyOXKxjFSvDJ9oLzTJ9hLAK1KzA=
783777
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
784-
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
785-
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
786-
github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE=
787-
github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs=
788778
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
789-
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
790-
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
791779
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
792-
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
793-
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
794-
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
795780
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
796781
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
797782
github.com/nrwiersma/avro-benchmarks v0.0.0-20210913175520-21aec48c8f76/go.mod h1:iKyFMidsk/sVYONJRE372sJuX/QTRPacU7imPqqsu7g=
@@ -1338,7 +1323,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
13381323
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
13391324
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
13401325
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
1341-
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
13421326
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
13431327
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
13441328
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

internal/util/dependency/factory.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717

1818
const (
1919
mqTypeDefault = "default"
20-
mqTypeNatsmq = "natsmq"
2120
mqTypeRocksmq = "rocksmq"
2221
mqTypeKafka = "kafka"
2322
mqTypePulsar = "pulsar"
@@ -26,7 +25,6 @@ const (
2625

2726
type mqEnable struct {
2827
Rocksmq bool
29-
Natsmq bool
3028
Pulsar bool
3129
Kafka bool
3230
Woodpecker bool
@@ -67,8 +65,8 @@ func NewFactory(standAlone bool) *DefaultFactory {
6765
// Init create a msg factory(TODO only support one mq at the same time.)
6866
// In order to guarantee backward compatibility of config file, we still support multiple mq configs.
6967
// The initialization of MQ follows the following rules, if the mq.type is default.
70-
// 1. standalone(local) mode: rocksmq(default) > natsmq > Pulsar > Kafka
71-
// 2. cluster mode: Pulsar(default) > Kafka (rocksmq and natsmq is unsupported in cluster mode)
68+
// 1. standalone(local) mode: rocksmq(default) > Pulsar > Kafka
69+
// 2. cluster mode: Pulsar(default) > Kafka (rocksmq is unsupported in cluster mode)
7270
func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
7371
// skip if using default factory
7472
if f.msgStreamFactory != nil {
@@ -84,13 +82,11 @@ func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
8482
}
8583

8684
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
87-
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()})
85+
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.PulsarEnable(), params.KafkaEnable(), params.WoodpeckerEnable()})
8886
metrics.RegisterMQType(mqType)
8987
log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))
9088

9189
switch mqType {
92-
case mqTypeNatsmq:
93-
f.msgStreamFactory = msgstream.NewNatsmqFactory()
9490
case mqTypeRocksmq:
9591
f.msgStreamFactory = msgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
9692
case mqTypePulsar:
@@ -135,10 +131,10 @@ func mustSelectMQType(standalone bool, mqType string, enable mqEnable) string {
135131

136132
// Validate mq type.
137133
func validateMQType(standalone bool, mqType string) error {
138-
if mqType != mqTypeNatsmq && mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker {
134+
if mqType != mqTypeRocksmq && mqType != mqTypeKafka && mqType != mqTypePulsar && mqType != mqTypeWoodpecker {
139135
return errors.Newf("mq type %s is invalid", mqType)
140136
}
141-
if !standalone && (mqType == mqTypeRocksmq || mqType == mqTypeNatsmq) {
137+
if !standalone && mqType == mqTypeRocksmq {
142138
return errors.Newf("mq %s is only valid in standalone mode")
143139
}
144140
return nil
@@ -169,9 +165,6 @@ type Factory interface {
169165
func HealthCheck(mqType string) *common.MQClusterStatus {
170166
clusterStatus := &common.MQClusterStatus{MqType: mqType}
171167
switch mqType {
172-
case mqTypeNatsmq:
173-
// TODO: implement health check for nats mq
174-
clusterStatus.Health = true
175168
case mqTypeRocksmq:
176169
// TODO: implement health checker for rocks mq
177170
clusterStatus.Health = true

internal/util/dependency/factory_test.go

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,35 +11,32 @@ import (
1111
func TestValidateMQType(t *testing.T) {
1212
assert.Error(t, validateMQType(true, mqTypeDefault))
1313
assert.Error(t, validateMQType(false, mqTypeDefault))
14-
assert.Error(t, validateMQType(false, mqTypeNatsmq))
1514
assert.Error(t, validateMQType(false, mqTypeRocksmq))
1615
assert.NoError(t, validateMQType(true, mqTypeWoodpecker))
1716
assert.NoError(t, validateMQType(false, mqTypeWoodpecker))
1817
}
1918

2019
func TestSelectMQType(t *testing.T) {
21-
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypeRocksmq)
22-
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar)
23-
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar)
24-
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka)
25-
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker)
26-
assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false, false}) })
27-
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true, true}), mqTypePulsar)
28-
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true, true}), mqTypePulsar)
29-
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true, true}), mqTypePulsar)
30-
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true, true}), mqTypeKafka)
31-
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, true}), mqTypeWoodpecker)
32-
assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false, false}) })
33-
assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true, true}), mqTypeRocksmq)
34-
assert.Equal(t, mustSelectMQType(true, mqTypeNatsmq, mqEnable{true, true, true, true, true}), mqTypeNatsmq)
35-
assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar)
36-
assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka)
37-
assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker)
38-
assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true, true}) })
39-
assert.Panics(t, func() { mustSelectMQType(false, mqTypeNatsmq, mqEnable{true, true, true, true, true}) })
40-
assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true, true}), mqTypePulsar)
41-
assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true, true}), mqTypeKafka)
42-
assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true, true}), mqTypeWoodpecker)
20+
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{true, true, true, true}), mqTypeRocksmq)
21+
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
22+
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
23+
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka)
24+
assert.Equal(t, mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker)
25+
assert.Panics(t, func() { mustSelectMQType(true, mqTypeDefault, mqEnable{false, false, false, false}) })
26+
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{true, true, true, true}), mqTypePulsar)
27+
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
28+
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, true, true, true}), mqTypePulsar)
29+
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, true, true}), mqTypeKafka)
30+
assert.Equal(t, mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, true}), mqTypeWoodpecker)
31+
assert.Panics(t, func() { mustSelectMQType(false, mqTypeDefault, mqEnable{false, false, false, false}) })
32+
assert.Equal(t, mustSelectMQType(true, mqTypeRocksmq, mqEnable{true, true, true, true}), mqTypeRocksmq)
33+
assert.Equal(t, mustSelectMQType(true, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar)
34+
assert.Equal(t, mustSelectMQType(true, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka)
35+
assert.Equal(t, mustSelectMQType(true, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker)
36+
assert.Panics(t, func() { mustSelectMQType(false, mqTypeRocksmq, mqEnable{true, true, true, true}) })
37+
assert.Equal(t, mustSelectMQType(false, mqTypePulsar, mqEnable{true, true, true, true}), mqTypePulsar)
38+
assert.Equal(t, mustSelectMQType(false, mqTypeKafka, mqEnable{true, true, true, true}), mqTypeKafka)
39+
assert.Equal(t, mustSelectMQType(false, mqTypeWoodpecker, mqEnable{true, true, true, true}), mqTypeWoodpecker)
4340
}
4441

4542
func TestHealthCheck(t *testing.T) {
@@ -53,7 +50,6 @@ func TestHealthCheck(t *testing.T) {
5350
mqType string
5451
health bool
5552
}{
56-
{mqTypeNatsmq, true},
5753
{mqTypeRocksmq, true},
5854
{mqTypePulsar, false},
5955
{mqTypeKafka, false},

pkg/go.mod

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ require (
2222
github.com/klauspost/compress v1.17.9
2323
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250325034212-6e98baa34971
2424
github.com/minio/minio-go/v7 v7.0.73
25-
github.com/nats-io/nats-server/v2 v2.10.12
26-
github.com/nats-io/nats.go v1.34.1
2725
github.com/panjf2000/ants/v2 v2.7.2
2826
github.com/prometheus/client_golang v1.14.0
2927
github.com/quasilyte/go-ruleguard/dsl v0.3.22
@@ -146,14 +144,10 @@ require (
146144
github.com/mattn/go-isatty v0.0.19 // indirect
147145
github.com/mattn/go-runewidth v0.0.8 // indirect
148146
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
149-
github.com/minio/highwayhash v1.0.2 // indirect
150147
github.com/minio/md5-simd v1.1.2 // indirect
151148
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
152149
github.com/modern-go/reflect2 v1.0.2 // indirect
153150
github.com/mtibben/percent v0.2.1 // indirect
154-
github.com/nats-io/jwt/v2 v2.5.5 // indirect
155-
github.com/nats-io/nkeys v0.4.7 // indirect
156-
github.com/nats-io/nuid v1.0.1 // indirect
157151
github.com/opencontainers/runtime-spec v1.0.2 // indirect
158152
github.com/opentracing/opentracing-go v1.2.0 // indirect
159153
github.com/pierrec/lz4 v2.5.2+incompatible // indirect

0 commit comments

Comments
 (0)