Skip to content

Commit 9e1b8a1

Browse files
committed
tests: RocksDB init, Raft, exec result & schema
Update tests and logging to use new RocksDbInit/open path and single-node Raft initialization; adapt to ExecutionResult API and topic schema changes. Tests in kalamdb-core now open RocksDB via RocksDbInit (returns Arc), start and initialize the Raft executor and wire appliers so DDL works in single-node test contexts, and switch pattern matches from ExecutionResult::Query to ExecutionResult::Rows with explicit RecordBatch typing. Added a new typed handlers test helper and placeholder test; adjusted topic_pubsub integration tests to expect an additional `op` column (schema length 7). Also tightened logging formatter (compact, show level, thread names, disable thread ids) and added a new admin-ui V2 implementation plan document. Minor import cleanups and small SQL/schema tweaks in tests.
1 parent 2fcb5c3 commit 9e1b8a1

6 files changed

Lines changed: 596 additions & 29 deletions

File tree

backend/crates/kalamdb-core/tests/test_cte_support.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@
88
//! - CTEs with JOINs
99
//! - CTEs with filtering
1010
11-
use kalamdb_commons::{Role, UserId, UserName, NodeId};
11+
use kalamdb_commons::{Role, UserId, NodeId};
1212
use kalamdb_core::app_context::AppContext;
1313
use kalamdb_core::sql::context::{ExecutionContext, ExecutionResult};
1414
use kalamdb_core::sql::executor::SqlExecutor;
15-
use kalamdb_session::AuthSession;
16-
use kalamdb_store::RocksDBBackend;
15+
use kalamdb_store::{RocksDBBackend, RocksDbInit};
1716
use kalamdb_configs::ServerConfig;
1817
use std::sync::Arc;
1918
use tempfile::TempDir;
@@ -24,7 +23,8 @@ async fn create_test_app_context() -> (Arc<AppContext>, TempDir) {
2423
let rocksdb_path = temp_dir.path().join("rocksdb");
2524
std::fs::create_dir_all(&rocksdb_path).expect("Failed to create rocksdb directory");
2625

27-
let db = rocksdb::DB::open_default(&rocksdb_path).expect("Failed to open RocksDB");
26+
let init = RocksDbInit::with_defaults(rocksdb_path.to_str().unwrap());
27+
let db = init.open().expect("Failed to open RocksDB"); // Returns Arc<DB>
2828
let backend = Arc::new(RocksDBBackend::new(db));
2929
let config = ServerConfig::default();
3030
let node_id = NodeId::new(1);
@@ -36,13 +36,18 @@ async fn create_test_app_context() -> (Arc<AppContext>, TempDir) {
3636
config,
3737
);
3838

39+
// Initialize Raft for single-node mode (required for DDL operations)
40+
app_context.executor().start().await.expect("Failed to start Raft");
41+
app_context.executor().initialize_cluster().await.expect("Failed to initialize Raft cluster");
42+
app_context.wire_raft_appliers();
43+
3944
(app_context, temp_dir)
4045
}
4146

4247
/// Helper to create ExecutionContext with username
4348
fn create_exec_context_with_app_context(
4449
app_context: Arc<AppContext>,
45-
username: &str,
50+
_username: &str,
4651
user_id: &str,
4752
role: Role,
4853
) -> ExecutionContext {
@@ -73,7 +78,7 @@ async fn setup_test_table(
7378
// Create a test table
7479
executor
7580
.execute(
76-
"CREATE USER TABLE test_ns.employees (id INT, name TEXT, department TEXT, salary INT)",
81+
"CREATE USER TABLE test_ns.employees (id INT PRIMARY KEY, name TEXT, department TEXT, salary INT)",
7782
exec_ctx,
7883
vec![],
7984
)
@@ -138,8 +143,8 @@ async fn test_simple_cte() {
138143
let exec_result = result.unwrap();
139144

140145
match exec_result {
141-
ExecutionResult::Query(batches) => {
142-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
146+
ExecutionResult::Rows { batches, .. } => {
147+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
143148
assert_eq!(total_rows, 3, "Should return 3 high earners (Alice, Bob, Diana)");
144149
},
145150
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -184,8 +189,8 @@ async fn test_cte_with_aggregation() {
184189
let exec_result = result.unwrap();
185190

186191
match exec_result {
187-
ExecutionResult::Query(batches) => {
188-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
192+
ExecutionResult::Rows { batches, .. } => {
193+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
189194
assert_eq!(total_rows, 3, "Should return 3 departments");
190195
},
191196
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -240,8 +245,8 @@ async fn test_multiple_ctes() {
240245
let exec_result = result.unwrap();
241246

242247
match exec_result {
243-
ExecutionResult::Query(batches) => {
244-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
248+
ExecutionResult::Rows { batches, .. } => {
249+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
245250
assert_eq!(total_rows, 4, "Should return 4 employees (2 Engineering + 2 Sales)");
246251
},
247252
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -287,8 +292,8 @@ async fn test_chained_ctes() {
287292
let exec_result = result.unwrap();
288293

289294
match exec_result {
290-
ExecutionResult::Query(batches) => {
291-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
295+
ExecutionResult::Rows { batches, .. } => {
296+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
292297
assert_eq!(total_rows, 3, "Should return 3 high earners");
293298
},
294299
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -299,6 +304,7 @@ async fn test_chained_ctes() {
299304
// CTE WITH FILTERING AND ORDERING TESTS
300305
// ============================================================================
301306

307+
302308
#[tokio::test]
303309
#[ntest::timeout(60000)]
304310
async fn test_cte_with_where_clause() {
@@ -336,8 +342,8 @@ async fn test_cte_with_where_clause() {
336342
let exec_result = result.unwrap();
337343

338344
match exec_result {
339-
ExecutionResult::Query(batches) => {
340-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
345+
ExecutionResult::Rows { batches, .. } => {
346+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
341347
assert_eq!(total_rows, 3, "Should return 3 employees (Alice, Bob, Diana)");
342348
},
343349
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -379,8 +385,8 @@ async fn test_cte_with_limit() {
379385
let exec_result = result.unwrap();
380386

381387
match exec_result {
382-
ExecutionResult::Query(batches) => {
383-
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
388+
ExecutionResult::Rows { batches, .. } => {
389+
let total_rows: usize = batches.iter().map(|b: &arrow::array::RecordBatch| b.num_rows()).sum();
384390
assert_eq!(total_rows, 2, "Should return top 2 earners");
385391
},
386392
_ => panic!("Expected Query result, got: {:?}", exec_result),
@@ -391,6 +397,7 @@ async fn test_cte_with_limit() {
391397
// ERROR HANDLING TESTS
392398
// ============================================================================
393399

400+
394401
#[tokio::test]
395402
#[ntest::timeout(60000)]
396403
async fn test_cte_syntax_error() {

backend/crates/kalamdb-core/tests/test_information_schema_columns.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! registered and can be queried via SQL.
55
66
use kalamdb_core::app_context::AppContext;
7-
use kalamdb_store::RocksDBBackend;
7+
use kalamdb_store::{RocksDBBackend, RocksDbInit};
88
use kalamdb_configs::ServerConfig;
99
use kalamdb_commons::NodeId;
1010
use std::sync::Arc;
@@ -16,7 +16,8 @@ async fn create_test_app_context() -> (Arc<AppContext>, TempDir) {
1616
let rocksdb_path = temp_dir.path().join("rocksdb");
1717
std::fs::create_dir_all(&rocksdb_path).expect("Failed to create rocksdb directory");
1818

19-
let db = rocksdb::DB::open_default(&rocksdb_path).expect("Failed to open RocksDB");
19+
let init = RocksDbInit::with_defaults(rocksdb_path.to_str().unwrap());
20+
let db = init.open().expect("Failed to open RocksDB"); // Returns Arc<DB>
2021
let backend = Arc::new(RocksDBBackend::new(db));
2122
let config = ServerConfig::default();
2223
let node_id = NodeId::new(1);
@@ -28,6 +29,11 @@ async fn create_test_app_context() -> (Arc<AppContext>, TempDir) {
2829
config,
2930
);
3031

32+
// Initialize Raft for single-node mode (required for DDL operations)
33+
app_context.executor().start().await.expect("Failed to start Raft");
34+
app_context.executor().initialize_cluster().await.expect("Failed to initialize Raft cluster");
35+
app_context.wire_raft_appliers();
36+
3137
(app_context, temp_dir)
3238
}
3339

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,54 @@
1-
let db = ...; // Assuming db is initialized elsewhere
2-
let backend = Arc::new(RocksDBBackend::new(Arc::new(db))); // Updated
1+
//! Tests for typed SQL handlers
2+
//!
3+
//! This module contains integration tests for type-safe SQL query handling.
4+
5+
use kalamdb_commons::{Role, UserId};
6+
use kalamdb_core::app_context::AppContext;
7+
use kalamdb_core::sql::context::ExecutionContext;
8+
use kalamdb_core::sql::executor::SqlExecutor;
9+
use kalamdb_store::{RocksDBBackend, RocksDbInit};
10+
use kalamdb_configs::ServerConfig;
11+
use std::sync::Arc;
12+
use tempfile::TempDir;
13+
14+
/// Helper to create AppContext with temporary RocksDB for testing
15+
async fn create_test_app_context() -> (Arc<AppContext>, TempDir) {
16+
let temp_dir = TempDir::new().expect("Failed to create temp directory");
17+
let rocksdb_path = temp_dir.path().join("rocksdb");
18+
std::fs::create_dir_all(&rocksdb_path).expect("Failed to create rocksdb directory");
19+
20+
let init = RocksDbInit::with_defaults(rocksdb_path.to_str().unwrap());
21+
let db = init.open().expect("Failed to open RocksDB"); // Returns Arc<DB>
22+
let backend = Arc::new(RocksDBBackend::new(db));
23+
let config = ServerConfig::default();
24+
let node_id = kalamdb_commons::NodeId::new(1);
25+
26+
let app_context = AppContext::create_isolated(
27+
backend,
28+
node_id,
29+
rocksdb_path.to_string_lossy().into_owned(),
30+
config,
31+
);
32+
33+
// Initialize Raft for single-node mode (required for DDL operations)
34+
app_context.executor().start().await.expect("Failed to start Raft");
35+
app_context.executor().initialize_cluster().await.expect("Failed to initialize Raft cluster");
36+
app_context.wire_raft_appliers();
37+
38+
(app_context, temp_dir)
39+
}
40+
41+
/// Helper to create ExecutionContext
42+
fn create_exec_context(app_context: Arc<AppContext>, user_id: &str, role: Role) -> ExecutionContext {
43+
let user_id = UserId::new(user_id.to_string());
44+
let base_session = app_context.base_session_context();
45+
ExecutionContext::new(user_id, role, base_session)
46+
}
47+
48+
#[tokio::test]
49+
#[ntest::timeout(60000)]
50+
async fn test_placeholder() {
51+
// Placeholder test to ensure the module compiles
52+
let _result = 1 + 1;
53+
assert_eq!(_result, 2);
54+
}

backend/src/logging.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use opentelemetry_otlp::WithExportConfig;
1717
use opentelemetry_sdk::trace::SdkTracerProvider;
1818
use opentelemetry_sdk::Resource;
1919
use tracing_subscriber::filter::filter_fn;
20-
use tracing_subscriber::fmt::format::FmtSpan;
20+
use tracing_subscriber::fmt::format::{FmtSpan, Format};
21+
use tracing_subscriber::fmt::time::SystemTime;
22+
use tracing_subscriber::fmt;
2123
use tracing_subscriber::layer::SubscriberExt;
2224
use tracing_subscriber::util::SubscriberInitExt;
2325
use tracing_subscriber::{EnvFilter, Layer};
@@ -143,7 +145,10 @@ pub fn init_logging(
143145
tracing_subscriber::fmt::layer()
144146
.with_ansi(true)
145147
.with_target(true)
148+
.with_level(true)
146149
.with_thread_names(true)
150+
.with_thread_ids(false)
151+
.compact()
147152
.with_span_events(FmtSpan::NONE) // Change to CLOSE to show span timing
148153
.with_filter(build_env_filter(level, target_levels)?),
149154
)
@@ -169,7 +174,10 @@ pub fn init_logging(
169174
.with_ansi(false)
170175
.with_writer(log_file)
171176
.with_target(true)
177+
.with_level(true)
172178
.with_thread_names(true)
179+
.with_thread_ids(false)
180+
.compact()
173181
.with_span_events(FmtSpan::NONE) // Change to CLOSE to show span timing
174182
.with_filter(build_env_filter(level, target_levels)?);
175183
layer.boxed()

backend/tests/integration_tests/topic_pubsub.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -277,15 +277,15 @@ async fn test_cdc_insert_to_consume_workflow() {
277277

278278
assert!(result.status == ResponseStatus::Success, "CONSUME failed: {:?}", result.error);
279279

280-
// 6. Verify we got results (schema: topic, partition, offset, key, payload, timestamp_ms)
280+
// 6. Verify we got results (schema: topic, partition, offset, key, payload, timestamp_ms, op)
281281
assert!(!result.results.is_empty(), "CONSUME should return batches");
282282

283283
if let Some(first_batch) = result.results.first() {
284-
// Check we have the expected 6 columns from topic_message_schema
284+
// Check we have the expected 7 columns from topic_message_schema
285285
assert_eq!(
286286
first_batch.schema.len(),
287-
6,
288-
"Should have 6 schema fields (topic, partition, offset, key, payload, timestamp_ms)"
287+
7,
288+
"Should have 7 schema fields (topic, partition, offset, key, payload, timestamp_ms, op)"
289289
);
290290

291291
// Verify column names match schema
@@ -295,6 +295,7 @@ async fn test_cdc_insert_to_consume_workflow() {
295295
assert_eq!(first_batch.schema[3].name, "key");
296296
assert_eq!(first_batch.schema[4].name, "payload");
297297
assert_eq!(first_batch.schema[5].name, "timestamp_ms");
298+
assert_eq!(first_batch.schema[6].name, "op");
298299

299300
// Should have at least 2 rows (2 INSERTs)
300301
assert!(
@@ -329,8 +330,8 @@ async fn test_consume_schema_structure() {
329330
if !result.results.is_empty() {
330331
let batch = &result.results[0];
331332

332-
// Must have exactly 6 schema fields
333-
assert_eq!(batch.schema.len(), 6, "Topic message schema must have 6 fields");
333+
// Must have exactly 7 schema fields
334+
assert_eq!(batch.schema.len(), 7, "Topic message schema must have 7 fields");
334335

335336
// Verify field names and order
336337
let expected_fields = vec![
@@ -340,6 +341,7 @@ async fn test_consume_schema_structure() {
340341
"key",
341342
"payload",
342343
"timestamp_ms",
344+
"op",
343345
];
344346
for (i, expected_name) in expected_fields.iter().enumerate() {
345347
assert_eq!(

0 commit comments

Comments
 (0)