Skip to content

Commit 029b153

Browse files
authored
Remove data directory for test (milvus-io#14363)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
1 parent 1408926 commit 029b153

9 files changed

Lines changed: 63 additions & 46 deletions

File tree

internal/kv/rocksdb/rocksdb_kv_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package rocksdbkv_test
1818

1919
import (
20+
"os"
2021
"strconv"
2122
"sync"
2223
"testing"
@@ -31,7 +32,7 @@ func TestRocksdbKV(t *testing.T) {
3132
if err != nil {
3233
panic(err)
3334
}
34-
35+
defer os.RemoveAll(name)
3536
defer rocksdbKV.Close()
3637
// Need to call RemoveWithPrefix
3738
defer rocksdbKV.RemoveWithPrefix("")
@@ -92,7 +93,7 @@ func TestRocksdbKV_Prefix(t *testing.T) {
9293
if err != nil {
9394
panic(err)
9495
}
95-
96+
defer os.RemoveAll(name)
9697
defer rocksdbKV.Close()
9798
// Need to call RemoveWithPrefix
9899
defer rocksdbKV.RemoveWithPrefix("")
@@ -150,6 +151,7 @@ func TestRocksdbKV_Goroutines(t *testing.T) {
150151
name := "/tmp/rocksdb"
151152
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
152153
assert.Nil(t, err)
154+
defer os.RemoveAll(name)
153155
defer rocksdbkv.Close()
154156
defer rocksdbkv.RemoveWithPrefix("")
155157

@@ -175,6 +177,7 @@ func TestRocksdbKV_DummyDB(t *testing.T) {
175177
name := "/tmp/rocksdb_dummy"
176178
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
177179
assert.Nil(t, err)
180+
defer os.RemoveAll(name)
178181
defer rocksdbkv.Close()
179182
defer rocksdbkv.RemoveWithPrefix("")
180183

@@ -209,6 +212,7 @@ func TestRocksdbKV_CornerCase(t *testing.T) {
209212
name := "/tmp/rocksdb_corner"
210213
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
211214
assert.Nil(t, err)
215+
defer os.RemoveAll(name)
212216
defer rocksdbkv.Close()
213217
defer rocksdbkv.RemoveWithPrefix("")
214218
_, err = rocksdbkv.Load("")

internal/storage/print_binlog_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
7373
w.Close()
7474

7575
fd, err := ioutil.TempFile("", "binlog_int64.db")
76+
defer os.RemoveAll(fd.Name())
7677
assert.Nil(t, err)
7778
num, err := fd.Write(buf)
7879
assert.Nil(t, err)
@@ -322,6 +323,9 @@ func TestPrintBinlogFiles(t *testing.T) {
322323
binlogFiles = append(binlogFiles, "test")
323324

324325
PrintBinlogFiles(binlogFiles)
326+
for _, file := range binlogFiles {
327+
_ = os.RemoveAll(file)
328+
}
325329
}
326330

327331
func TestPrintDDFiles(t *testing.T) {
@@ -433,6 +437,10 @@ func TestPrintDDFiles(t *testing.T) {
433437
assert.Equal(t, resultRequests, ddRequests)
434438

435439
PrintBinlogFiles(binlogFiles)
440+
441+
for _, file := range binlogFiles {
442+
_ = os.RemoveAll(file)
443+
}
436444
}
437445

438446
func TestPrintIndexFile(t *testing.T) {

internal/util/mqclient/rmq_consumer.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package mqclient
1313

1414
import (
1515
"sync"
16+
"sync/atomic"
1617

1718
"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
1819
)
@@ -23,7 +24,7 @@ type RmqConsumer struct {
2324
msgChannel chan Message
2425
closeCh chan struct{}
2526
once sync.Once
26-
skip bool
27+
skip int32
2728
}
2829

2930
// Subscription returns the subscription name of this consumer
@@ -44,10 +45,11 @@ func (rc *RmqConsumer) Chan() <-chan Message {
4445
close(rc.msgChannel)
4546
return
4647
}
47-
if !rc.skip {
48+
skip := atomic.LoadInt32(&rc.skip)
49+
if skip != 1 {
4850
rc.msgChannel <- &rmqMessage{msg: msg}
4951
} else {
50-
rc.skip = false
52+
atomic.StoreInt32(&rc.skip, 0)
5153
}
5254
case <-rc.closeCh:
5355
close(rc.msgChannel)
@@ -64,7 +66,9 @@ func (rc *RmqConsumer) Chan() <-chan Message {
6466
func (rc *RmqConsumer) Seek(id MessageID, inclusive bool) error {
6567
msgID := id.(*rmqID).messageID
6668
// skip the first message when consume
67-
rc.skip = !inclusive
69+
if !inclusive {
70+
atomic.StoreInt32(&rc.skip, 1)
71+
}
6872
return rc.c.Seek(msgID)
6973
}
7074

internal/util/rocksmq/client/rocksmq/client_impl_test.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ import (
2121

2222
var rmqPath = "/tmp/rocksmq_client"
2323

24-
func TestMain(m *testing.M) {
25-
os.MkdirAll(rmqPath, os.ModePerm)
26-
defer os.RemoveAll(rmqPath)
27-
os.Exit(m.Run())
28-
}
29-
3024
func TestClient(t *testing.T) {
3125
client, err := NewClient(ClientOptions{})
3226
assert.NotNil(t, client)
@@ -52,8 +46,9 @@ func TestClient_CreateProducer(t *testing.T) {
5246
assert.Nil(t, producer)
5347

5448
/////////////////////////////////////////////////
55-
rmqPath := rmqPath + "/test_client1"
56-
rmq := newRocksMQ(rmqPath)
49+
os.MkdirAll(rmqPath, os.ModePerm)
50+
rmqPathTest := rmqPath + "/test_client1"
51+
rmq := newRocksMQ(t, rmqPathTest)
5752
defer removePath(rmqPath)
5853
client1, err := NewClient(ClientOptions{
5954
Server: rmq,
@@ -91,8 +86,9 @@ func TestClient_Subscribe(t *testing.T) {
9186
assert.Nil(t, consumer)
9287

9388
/////////////////////////////////////////////////
94-
rmqPath := rmqPath + "/test_client2"
95-
rmq := newRocksMQ(rmqPath)
89+
os.MkdirAll(rmqPath, os.ModePerm)
90+
rmqPathTest := rmqPath + "/test_client2"
91+
rmq := newRocksMQ(t, rmqPathTest)
9692
defer removePath(rmqPath)
9793
client1, err := NewClient(ClientOptions{
9894
Server: rmq,
@@ -131,8 +127,9 @@ func TestClient_Subscribe(t *testing.T) {
131127
}
132128

133129
func TestClient_SeekLatest(t *testing.T) {
134-
rmqPath := rmqPath + "/seekLatest"
135-
rmq := newRocksMQ(rmqPath)
130+
os.MkdirAll(rmqPath, os.ModePerm)
131+
rmqPathTest := rmqPath + "/seekLatest"
132+
rmq := newRocksMQ(t, rmqPathTest)
136133
defer removePath(rmqPath)
137134
client, err := NewClient(ClientOptions{
138135
Server: rmq,
@@ -201,8 +198,9 @@ func TestClient_SeekLatest(t *testing.T) {
201198
}
202199

203200
func TestClient_consume(t *testing.T) {
204-
rmqPath := rmqPath + "/test_client3"
205-
rmq := newRocksMQ(rmqPath)
201+
os.MkdirAll(rmqPath, os.ModePerm)
202+
rmqPathTest := rmqPath + "/test_client3"
203+
rmq := newRocksMQ(t, rmqPathTest)
206204
defer removePath(rmqPath)
207205
client, err := NewClient(ClientOptions{
208206
Server: rmq,

internal/util/rocksmq/client/rocksmq/consumer_impl_test.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package rocksmq
1313

1414
import (
15+
"os"
1516
"testing"
1617

1718
"github.com/stretchr/testify/assert"
@@ -47,8 +48,9 @@ func TestConsumer_newConsumer(t *testing.T) {
4748
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
4849

4950
/////////////////////////////////////////////////
50-
rmqPath := rmqPath + "/test_consumer1"
51-
rmq := newRocksMQ(rmqPath)
51+
os.MkdirAll(rmqPath, os.ModePerm)
52+
rmqPathTest := rmqPath + "/test_consumer1"
53+
rmq := newRocksMQ(t, rmqPathTest)
5254
defer removePath(rmqPath)
5355
client, err := newClient(ClientOptions{
5456
Server: rmq,
@@ -117,8 +119,9 @@ func TestConsumer_Subscription(t *testing.T) {
117119
}
118120

119121
func TestConsumer_Seek(t *testing.T) {
120-
rmqPath := rmqPath + "/test_consumer2"
121-
rmq := newRocksMQ(rmqPath)
122+
os.MkdirAll(rmqPath, os.ModePerm)
123+
rmqPathTest := rmqPath + "/test_consumer2"
124+
rmq := newRocksMQ(t, rmqPathTest)
122125
defer removePath(rmqPath)
123126
client, err := newClient(ClientOptions{
124127
Server: rmq,

internal/util/rocksmq/client/rocksmq/reader_impl_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package rocksmq
1313

1414
import (
1515
"context"
16+
"os"
1617
"strconv"
1718
"testing"
1819

@@ -40,8 +41,9 @@ func Test_NewReader(t *testing.T) {
4041
}
4142

4243
func TestReader_Next(t *testing.T) {
43-
rmqPath := rmqPath + "/test_reader"
44-
rmq := newRocksMQ(rmqPath)
44+
os.MkdirAll(rmqPath, os.ModePerm)
45+
rmqPathTest := rmqPath + "/test_reader"
46+
rmq := newRocksMQ(t, rmqPathTest)
4547
defer removePath(rmqPath)
4648
client, err := newClient(ClientOptions{
4749
Server: rmq,

internal/util/rocksmq/client/rocksmq/test_helper.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ package rocksmq
1414
import (
1515
"fmt"
1616
"os"
17+
"testing"
1718
"time"
1819

1920
"github.com/milvus-io/milvus/internal/log"
2021
"github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
2122
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
23+
"github.com/stretchr/testify/assert"
2224

2325
"go.uber.org/zap"
2426
)
@@ -43,26 +45,23 @@ func newMockClient() *client {
4345
return client
4446
}
4547

46-
func newRocksMQ(rmqPath string) server.RocksMQ {
47-
rocksdbPath := rmqPath + "_db"
48-
rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, nil)
48+
func newRocksMQ(t *testing.T, rmqPath string) server.RocksMQ {
49+
rocksdbPath := rmqPath
50+
rmq, err := rocksmq.NewRocksMQ(rocksdbPath, nil)
51+
assert.NoError(t, err)
4952
return rmq
5053
}
5154

5255
func removePath(rmqPath string) {
53-
kvPath := rmqPath + "_kv"
54-
err := os.RemoveAll(kvPath)
56+
// remove path rocksmq created
57+
rocksdbPath := rmqPath
58+
err := os.RemoveAll(rocksdbPath)
5559
if err != nil {
56-
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
57-
}
58-
rocksdbPath := rmqPath + "_db"
59-
err = os.RemoveAll(rocksdbPath)
60-
if err != nil {
61-
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
60+
log.Error("Failed to call os.removeAll.", zap.Any("path", rocksdbPath))
6261
}
6362
metaPath := rmqPath + "_meta_kv"
6463
err = os.RemoveAll(metaPath)
6564
if err != nil {
66-
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
65+
log.Error("Failed to call os.removeAll.", zap.Any("path", metaPath))
6766
}
6867
}

internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
var Params paramtable.BaseTable
3535
var rmqPath = "/tmp/rocksmq"
3636
var kvPathSuffix = "_kv"
37-
var dbPathSuffix = "_db"
3837
var metaPathSuffix = "_meta"
3938

4039
func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
@@ -70,7 +69,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
7069
defer os.RemoveAll(kvPath)
7170
idAllocator := InitIDAllocator(kvPath)
7271

73-
rocksdbPath := rmqPath + dbPathSuffix + suffix
72+
rocksdbPath := rmqPath + suffix
7473
defer os.RemoveAll(rocksdbPath + kvSuffix)
7574
defer os.RemoveAll(rocksdbPath)
7675

@@ -135,7 +134,7 @@ func TestRocksmq_Basic(t *testing.T) {
135134
defer os.RemoveAll(kvPath)
136135
idAllocator := InitIDAllocator(kvPath)
137136

138-
rocksdbPath := rmqPath + dbPathSuffix + suffix
137+
rocksdbPath := rmqPath + suffix
139138
defer os.RemoveAll(rocksdbPath + kvSuffix)
140139
defer os.RemoveAll(rocksdbPath)
141140
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
@@ -190,7 +189,7 @@ func TestRocksmq_Dummy(t *testing.T) {
190189
defer os.RemoveAll(kvPath)
191190
idAllocator := InitIDAllocator(kvPath)
192191

193-
rocksdbPath := rmqPath + dbPathSuffix + suffix
192+
rocksdbPath := rmqPath + suffix
194193
defer os.RemoveAll(rocksdbPath + kvSuffix)
195194
defer os.RemoveAll(rocksdbPath)
196195

@@ -260,7 +259,7 @@ func TestRocksmq_Seek(t *testing.T) {
260259
defer os.RemoveAll(kvPath)
261260
idAllocator := InitIDAllocator(kvPath)
262261

263-
rocksdbPath := rmqPath + dbPathSuffix + suffix
262+
rocksdbPath := rmqPath + suffix
264263
defer os.RemoveAll(rocksdbPath + kvSuffix)
265264
defer os.RemoveAll(rocksdbPath)
266265

internal/util/rocksmq/server/rocksmq/rocksmq_retention_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestRmqRetention_Basic(t *testing.T) {
4444
atomic.StoreInt64(&RocksmqPageSize, 10)
4545
atomic.StoreInt64(&TickerTimeInSeconds, 2)
4646

47-
rocksdbPath := retentionPath + dbPathSuffix
47+
rocksdbPath := retentionPath
4848
defer os.RemoveAll(rocksdbPath)
4949
metaPath := retentionPath + metaPathSuffix
5050
defer os.RemoveAll(metaPath)
@@ -136,7 +136,7 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
136136
atomic.StoreInt64(&RocksmqPageSize, 10)
137137
atomic.StoreInt64(&TickerTimeInSeconds, 2)
138138

139-
rocksdbPath := retentionPath + dbPathSuffix
139+
rocksdbPath := retentionPath
140140
defer os.RemoveAll(rocksdbPath)
141141
metaPath := retentionPath + metaPathSuffix
142142
defer os.RemoveAll(metaPath)
@@ -400,7 +400,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
400400
defer os.RemoveAll(kvPath)
401401
idAllocator := InitIDAllocator(kvPath)
402402

403-
rocksdbPath := retentionPath + dbPathSuffix + suffix
403+
rocksdbPath := retentionPath + suffix
404404
defer os.RemoveAll(rocksdbPath)
405405
metaPath := retentionPath + metaPathSuffix + suffix
406406

0 commit comments

Comments
 (0)