Skip to content

Commit ec01d77

Browse files
Fix DbClientService for Mongo DbClient (helidon-io#9102)
Fixes helidon-io#9101
1 parent 8bd5d41 commit ec01d77

3 files changed

Lines changed: 90 additions & 13 deletions

File tree

dbclient/mongodb/src/main/java/io/helidon/dbclient/mongodb/MongoDbStatementDml.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2019, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@ public long execute() {
6363
"Statement operation not yet supported: %s",
6464
type.name()));
6565
};
66+
future.complete(result);
6667
LOGGER.log(System.Logger.Level.DEBUG, () -> String.format(
6768
"%s DML %s execution succeeded",
6869
type.name(),

dbclient/mongodb/src/main/java/io/helidon/dbclient/mongodb/MongoDbStatementQuery.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2019, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2019, 2024 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717

1818
import java.lang.System.Logger.Level;
1919
import java.util.Spliterator;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.stream.Stream;
2122
import java.util.stream.StreamSupport;
2223

@@ -28,6 +29,7 @@
2829

2930
import com.mongodb.client.FindIterable;
3031
import com.mongodb.client.MongoCollection;
32+
import com.mongodb.client.MongoCursor;
3133
import com.mongodb.client.MongoDatabase;
3234
import org.bson.Document;
3335

@@ -62,8 +64,8 @@ public Stream<DbRow> execute() {
6264
try {
6365
MongoStatement stmt = queryOrCommand(preparedStmt);
6466
return switch (stmt.getOperation()) {
65-
case QUERY -> executeQuery(stmt);
66-
case COMMAND -> executeCommand(stmt);
67+
case QUERY -> executeQuery(stmt, future);
68+
case COMMAND -> executeCommand(stmt, future);
6769
default -> throw new UnsupportedOperationException(String.format(
6870
"Operation %s is not supported by query", stmt.getOperation().toString()));
6971
};
@@ -87,14 +89,15 @@ private MongoStatement queryOrCommand(String statement) {
8789
}
8890
}
8991

90-
private Stream<DbRow> executeCommand(MongoStatement stmt) {
92+
private Stream<DbRow> executeCommand(MongoStatement stmt, CompletableFuture<Long> future) {
9193
Document command = stmt.getQuery();
9294
LOGGER.log(Level.DEBUG, () -> String.format("Command: %s", command.toString()));
9395
Document doc = db().runCommand(command);
96+
future.complete(1L);
9497
return Stream.of(new MongoDbRow(doc, context()));
9598
}
9699

97-
private Stream<DbRow> executeQuery(MongoStatement stmt) {
100+
private Stream<DbRow> executeQuery(MongoStatement stmt, CompletableFuture<Long> future) {
98101
MongoCollection<Document> mc = db().getCollection(stmt.getCollection());
99102
Document query = stmt.getQuery();
100103
Document projection = stmt.getProjection();
@@ -109,7 +112,9 @@ private Stream<DbRow> executeQuery(MongoStatement stmt) {
109112
finder = finder.projection(projection);
110113
}
111114

112-
Spliterator<Document> spliterator = spliteratorUnknownSize(finder.iterator(), Spliterator.ORDERED);
115+
MongoCursor<Document> it = finder.iterator();
116+
future.complete(it.hasNext() ? 1L : 0L);
117+
Spliterator<Document> spliterator = spliteratorUnknownSize(it, Spliterator.ORDERED);
113118
Stream<Document> stream = StreamSupport.stream(spliterator, false);
114119
return stream.map(doc -> new MongoDbRow(doc, context()));
115120
}

dbclient/mongodb/src/test/java/io/helidon/dbclient/mongodb/MongoDbClientTest.java

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021, 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2021, 2024 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,34 +16,39 @@
1616
package io.helidon.dbclient.mongodb;
1717

1818
import java.lang.System.Logger.Level;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.CompletionStage;
21+
import java.util.function.Consumer;
1922

23+
import io.helidon.dbclient.DbClientService;
24+
import io.helidon.dbclient.DbClientServiceContext;
2025
import io.helidon.dbclient.DbExecute;
2126

2227
import com.mongodb.client.MongoClient;
2328
import com.mongodb.client.MongoCollection;
2429
import com.mongodb.client.MongoDatabase;
2530

31+
import org.bson.Document;
2632
import org.junit.jupiter.api.BeforeAll;
2733
import org.junit.jupiter.api.Test;
2834
import org.mockito.Mockito;
2935

3036
import static org.hamcrest.MatcherAssert.assertThat;
3137
import static org.hamcrest.Matchers.notNullValue;
38+
import static org.hamcrest.Matchers.is;
3239
import static org.junit.jupiter.api.Assertions.fail;
3340
import static org.mockito.ArgumentMatchers.any;
3441
import static org.mockito.Mockito.when;
3542

36-
public class MongoDbClientTest {
43+
@SuppressWarnings("resource")
44+
class MongoDbClientTest {
3745

3846
private static final System.Logger LOGGER = System.getLogger(MongoDbClientTest.class.getName());
3947
private static MongoDbClient dbClient;
4048

4149
@BeforeAll
4250
static void setup() {
43-
MongoClient client = Mockito.mock(MongoClient.class);
44-
MongoDatabase db = Mockito.mock(MongoDatabase.class);
45-
when(db.runCommand(any())).thenReturn(MongoDbStatement.EMPTY);
46-
dbClient = new MongoDbClient(new MongoDbClientBuilder(), client, db);
51+
dbClient = createClient(null);
4752
}
4853

4954
@Test
@@ -85,4 +90,70 @@ void testUnsupportedUnwrapExecutorClass() {
8590
exec.query("{\"operation\": \"command\", \"query\": { ping: 1 }}");
8691
}
8792

93+
@Test
94+
void testDbClientServiceQuery() {
95+
TestDbClientService service = new TestDbClientService();
96+
MongoDbClient dbClient = createClient(builder -> builder.addService(service));
97+
DbExecute exec = dbClient.execute();
98+
long ignored = exec.query("{\"operation\": \"command\", \"query\": { ping: 1 }}").count();
99+
assertThat(service.resultFuture.isDone(), is(true));
100+
assertThat(service.resultFuture.isCompletedExceptionally(), is(false));
101+
assertThat(service.statementFuture.isDone(), is(true));
102+
assertThat(service.statementFuture.isCompletedExceptionally(), is(false));
103+
}
104+
105+
@Test
106+
void testDbClientServiceDml() {
107+
TestDbClientService service = new TestDbClientService();
108+
MongoDbClient dbClient = createClient(builder -> builder.addService(service));
109+
DbExecute exec = dbClient.execute();
110+
long ignored = exec.insert("{"
111+
+ "\"collection\": \"foo\","
112+
+ "\"operation\": \"insert\","
113+
+ "\"value\": { \"name\": \"bar\" }"
114+
+ "}");
115+
assertThat(service.resultFuture.isDone(), is(true));
116+
assertThat(service.resultFuture.isCompletedExceptionally(), is(false));
117+
assertThat(service.statementFuture.isDone(), is(true));
118+
assertThat(service.statementFuture.isCompletedExceptionally(), is(false));
119+
}
120+
121+
@SuppressWarnings("unchecked")
122+
static MongoDbClient createClient(Consumer<MongoDbClientBuilder> consumer) {
123+
MongoClient client = Mockito.mock(MongoClient.class);
124+
MongoDatabase db = Mockito.mock(MongoDatabase.class);
125+
MongoCollection<Document> collection = Mockito.mock(MongoCollection.class);
126+
when(db.getCollection(any())).thenReturn(collection);
127+
when(db.runCommand(any())).thenReturn(MongoDbStatement.EMPTY);
128+
MongoDbClientBuilder builder = new MongoDbClientBuilder();
129+
if (consumer != null) {
130+
consumer.accept(builder);
131+
}
132+
return new MongoDbClient(builder, client, db);
133+
}
134+
135+
record TestDbClientService(CompletableFuture<Long> resultFuture,
136+
CompletableFuture<Void> statementFuture) implements DbClientService {
137+
138+
TestDbClientService() {
139+
this(new CompletableFuture<>(), new CompletableFuture<>());
140+
}
141+
142+
@Override
143+
public DbClientServiceContext statement(DbClientServiceContext context) {
144+
setup(context.resultFuture(), resultFuture);
145+
setup(context.statementFuture(), statementFuture);
146+
return context;
147+
}
148+
149+
static <T> void setup(CompletionStage<T> stage, CompletableFuture<T> future) {
150+
stage.whenComplete((v, ex) -> {
151+
if (ex != null) {
152+
future.completeExceptionally(ex);
153+
} else {
154+
future.complete(v);
155+
}
156+
});
157+
}
158+
}
88159
}

0 commit comments

Comments
 (0)