Skip to content

Commit 6ec0a84

Browse files
committed
Improve restart recovery, snowflake batching, & CI
Introduce restart-recovery improvements and various optimizations across the codebase. Key changes: reconcile and persist in-memory state-machine progress from the persisted last_applied (KalamRaftStorage, RaftGroup, RaftManager) and emit clearer replication timeout details; commit last_applied while applying entries and return errors for apply failures to avoid silent progress. Add SnowflakeGenerator::next_ids_mapped to avoid intermediate allocations, wire it into system SeqId generation, and add new benchmarks and tests for mapped and concurrent generation. Fix embedded UI asset serving to avoid unnecessary copies, tweak websocket heartbeat handling, add deterministic initial-row sorting and tests, and adjust health monitor trimming logic. CI and developer tooling: set test env vars and add verification for TypeScript package test coverage in workflows, add new TypeScript ORM files and UI component updates, and change the Docker base image to Ubuntu 24.04.
1 parent b27b6d5 commit 6ec0a84

59 files changed

Lines changed: 3746 additions & 2241 deletions

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

.config/nextest.toml

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ test-threads = 15
88
# other, while still allowing unrelated tests to use the remaining default
99
# concurrency budget.
1010
stateful-heavy = { max-threads = 1 }
11+
proxied-reconnect = { max-threads = 2 }
1112

1213
[[profile.default.overrides]]
1314
filter = 'test(test_scenario_08_subscription_reconnect)'
@@ -85,7 +86,7 @@ test-group = "stateful-heavy"
8586

8687
[[profile.default.overrides]]
8788
filter = 'test(test_large_initial_snapshot_survives_repeated_outages)'
88-
test-group = "stateful-heavy"
89+
test-group = "proxied-reconnect"
8990

9091
[[profile.default.overrides]]
9192
filter = 'test(test_latency_spike_during_initial_snapshot_recovers)'
@@ -97,7 +98,7 @@ test-group = "stateful-heavy"
9798

9899
[[profile.default.overrides]]
99100
filter = 'test(test_loading_snapshot_with_live_writes_resumes_without_duplicate_rows)'
100-
test-group = "stateful-heavy"
101+
test-group = "proxied-reconnect"
101102

102103
[[profile.default.overrides]]
103104
filter = 'test(test_proxy_three_subscriptions_resume_after_server_bounce)'

.github/workflows/orm.yml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -88,5 +88,7 @@ jobs:
8888
- name: Run integration tests
8989
working-directory: link/sdks/typescript/orm
9090
env:
91+
KALAMDB_TEST_URL: http://localhost:8088
92+
KALAMDB_TEST_USER: admin
9193
KALAMDB_TEST_PASSWORD: testpass123
9294
run: node --test tests/driver.test.mjs tests/generate.test.mjs tests/cli.test.mjs tests/live.test.mjs

.github/workflows/sdks.yml

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -191,7 +191,7 @@ jobs:
191191
chmod +x ./kalamdb-server
192192
echo "SDK test source: release-binary"
193193
194-
- name: Run TypeScript SDK tests
194+
- name: Run TypeScript client, consumer, and ORM npm package tests
195195
id: run_typescript_tests
196196
continue-on-error: true
197197
shell: bash
@@ -207,6 +207,19 @@ jobs:
207207
set -euo pipefail
208208
./scripts/test-typescript-sdk-release.sh
209209
210+
- name: Verify TypeScript npm package test coverage
211+
if: always()
212+
shell: bash
213+
run: |
214+
set -euo pipefail
215+
output="ts-sdk-test-output.txt"
216+
for package in client consumer orm; do
217+
if ! grep -q "Running @kalamdb/${package} tests" "$output"; then
218+
echo "Missing @kalamdb/${package} npm package test run in ${output}" >&2
219+
exit 1
220+
fi
221+
done
222+
210223
- name: Parse TypeScript SDK test counts
211224
if: always()
212225
id: parse_typescript_badge

backend/crates/kalamdb-api/src/ui/embedded.rs

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@
77
//! 1. Run `npm run build` in the `ui/` directory
88
//! 2. Rebuild the server with `cargo build`
99
10+
use std::borrow::Cow;
11+
1012
use actix_web::{web, HttpRequest, HttpResponse};
1113
use log::debug;
1214
use rust_embed::Embed;
@@ -22,6 +24,13 @@ use super::UiRuntimeConfig;
2224
#[exclude = "kalam_client_bg.wasm"]
2325
struct UiAssets;
2426

27+
fn embedded_asset_body(data: Cow<'static, [u8]>) -> web::Bytes {
28+
match data {
29+
Cow::Borrowed(bytes) => web::Bytes::from_static(bytes),
30+
Cow::Owned(bytes) => web::Bytes::from(bytes),
31+
}
32+
}
33+
2534
/// Serve embedded UI assets
2635
///
2736
/// Handles requests to /ui/* and serves the appropriate static file.
@@ -43,7 +52,7 @@ pub async fn serve_embedded_ui(req: HttpRequest) -> HttpResponse {
4352

4453
debug!("[embedded_ui] Found file: {} (mime: {})", path, mime_type);
4554

46-
return HttpResponse::Ok().content_type(mime_type).body(content.data.into_owned());
55+
return HttpResponse::Ok().content_type(mime_type).body(embedded_asset_body(content.data));
4756
}
4857

4958
debug!("[embedded_ui] File not found: {}, falling back to index.html", path);
@@ -53,7 +62,7 @@ pub async fn serve_embedded_ui(req: HttpRequest) -> HttpResponse {
5362
if let Some(index) = UiAssets::get("index.html") {
5463
return HttpResponse::Ok()
5564
.content_type("text/html; charset=utf-8")
56-
.body(index.data.into_owned());
65+
.body(embedded_asset_body(index.data));
5766
}
5867

5968
// No UI built - show helpful message

backend/crates/kalamdb-api/src/ws/runtime.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -113,14 +113,12 @@ pub(super) async fn run_websocket(
113113
msg = msg_stream.next() => {
114114
match msg {
115115
Some(Ok(Message::Ping(bytes))) => {
116-
record_activity_now();
117116
connection_state.update_heartbeat();
118117
if session.pong(&bytes).await.is_err() {
119118
break;
120119
}
121120
}
122121
Some(Ok(Message::Pong(_))) => {
123-
record_activity_now();
124122
connection_state.update_heartbeat();
125123
}
126124
Some(Ok(Message::Text(text))) => {

backend/crates/kalamdb-commons/benches/snowflake_generator.rs

Lines changed: 120 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,14 @@
11
use std::{
22
hint::black_box,
3-
sync::{atomic::{AtomicU64, Ordering}, Arc},
3+
sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
},
47
thread,
58
time::{Duration, SystemTime, UNIX_EPOCH},
69
};
710

8-
use criterion::{
9-
criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput,
10-
};
11+
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
1112
use kalamdb_commons::ids::SnowflakeGenerator;
1213
use parking_lot::Mutex;
1314

@@ -160,13 +161,17 @@ fn bench_batch_generation(c: &mut Criterion) {
160161
for batch_size in [32_usize, 256, 1024] {
161162
group.throughput(Throughput::Elements(batch_size as u64));
162163

163-
group.bench_with_input(BenchmarkId::new("optimized", batch_size), &batch_size, |b, &size| {
164-
b.iter_batched(
165-
|| SnowflakeGenerator::new(1),
166-
|generator| black_box(generator.next_ids(size).expect("optimized next_ids")),
167-
BatchSize::SmallInput,
168-
);
169-
});
164+
group.bench_with_input(
165+
BenchmarkId::new("optimized", batch_size),
166+
&batch_size,
167+
|b, &size| {
168+
b.iter_batched(
169+
|| SnowflakeGenerator::new(1),
170+
|generator| black_box(generator.next_ids(size).expect("optimized next_ids")),
171+
BatchSize::SmallInput,
172+
);
173+
},
174+
);
170175

171176
group.bench_with_input(BenchmarkId::new("legacy", batch_size), &batch_size, |b, &size| {
172177
b.iter_batched(
@@ -180,78 +185,128 @@ fn bench_batch_generation(c: &mut Criterion) {
180185
group.finish();
181186
}
182187

188+
fn bench_mapped_batch_generation(c: &mut Criterion) {
189+
let mut group = c.benchmark_group("snowflake_mapped_batch_generation");
190+
191+
for batch_size in [256_usize, 1024] {
192+
group.throughput(Throughput::Elements(batch_size as u64));
193+
194+
group.bench_with_input(
195+
BenchmarkId::new("optimized_mapped", batch_size),
196+
&batch_size,
197+
|b, &size| {
198+
b.iter_batched(
199+
|| SnowflakeGenerator::new(1),
200+
|generator| {
201+
black_box(
202+
generator
203+
.next_ids_mapped(size, |id| id.wrapping_mul(31))
204+
.expect("optimized next_ids_mapped"),
205+
)
206+
},
207+
BatchSize::SmallInput,
208+
);
209+
},
210+
);
211+
212+
group.bench_with_input(
213+
BenchmarkId::new("legacy_then_map", batch_size),
214+
&batch_size,
215+
|b, &size| {
216+
b.iter_batched(
217+
|| LegacySnowflakeGenerator::new(1),
218+
|generator| {
219+
let ids = generator.next_ids(size).expect("legacy next_ids");
220+
black_box(ids.into_iter().map(|id| id.wrapping_mul(31)).collect::<Vec<_>>())
221+
},
222+
BatchSize::SmallInput,
223+
);
224+
},
225+
);
226+
}
227+
228+
group.finish();
229+
}
230+
183231
fn bench_concurrent_single_generation(c: &mut Criterion) {
184232
let mut group = c.benchmark_group("snowflake_concurrent_single_generation");
185233
let thread_count = 8;
186234
let ids_per_thread = 1_000;
187235
group.throughput(Throughput::Elements((thread_count * ids_per_thread) as u64));
188236

189-
group.bench_function(BenchmarkId::new("optimized", format!("{}x{}", thread_count, ids_per_thread)), |b| {
190-
b.iter(|| {
191-
let generator = Arc::new(SnowflakeGenerator::new(1));
192-
let duplicates = Arc::new(AtomicU64::new(0));
193-
thread::scope(|scope| {
194-
let mut handles = Vec::with_capacity(thread_count);
195-
for _ in 0..thread_count {
196-
let generator = Arc::clone(&generator);
197-
let duplicates = Arc::clone(&duplicates);
198-
handles.push(scope.spawn(move || {
199-
let mut prev = None;
200-
for _ in 0..ids_per_thread {
201-
let next = generator.next_id().expect("optimized concurrent next_id");
202-
if let Some(prev) = prev {
203-
if next <= prev {
204-
duplicates.fetch_add(1, Ordering::Relaxed);
237+
group.bench_function(
238+
BenchmarkId::new("optimized", format!("{}x{}", thread_count, ids_per_thread)),
239+
|b| {
240+
b.iter(|| {
241+
let generator = Arc::new(SnowflakeGenerator::new(1));
242+
let duplicates = Arc::new(AtomicU64::new(0));
243+
thread::scope(|scope| {
244+
let mut handles = Vec::with_capacity(thread_count);
245+
for _ in 0..thread_count {
246+
let generator = Arc::clone(&generator);
247+
let duplicates = Arc::clone(&duplicates);
248+
handles.push(scope.spawn(move || {
249+
let mut prev = None;
250+
for _ in 0..ids_per_thread {
251+
let next =
252+
generator.next_id().expect("optimized concurrent next_id");
253+
if let Some(prev) = prev {
254+
if next <= prev {
255+
duplicates.fetch_add(1, Ordering::Relaxed);
256+
}
205257
}
258+
prev = Some(next);
206259
}
207-
prev = Some(next);
208-
}
209-
}));
210-
}
211-
for handle in handles {
212-
handle.join().expect("optimized thread join");
213-
}
260+
}));
261+
}
262+
for handle in handles {
263+
handle.join().expect("optimized thread join");
264+
}
265+
});
266+
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
214267
});
215-
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
216-
});
217-
});
218-
219-
group.bench_function(BenchmarkId::new("legacy", format!("{}x{}", thread_count, ids_per_thread)), |b| {
220-
b.iter(|| {
221-
let generator = Arc::new(LegacySnowflakeGenerator::new(1));
222-
let duplicates = Arc::new(AtomicU64::new(0));
223-
thread::scope(|scope| {
224-
let mut handles = Vec::with_capacity(thread_count);
225-
for _ in 0..thread_count {
226-
let generator = Arc::clone(&generator);
227-
let duplicates = Arc::clone(&duplicates);
228-
handles.push(scope.spawn(move || {
229-
let mut prev = None;
230-
for _ in 0..ids_per_thread {
231-
let next = generator.next_id().expect("legacy concurrent next_id");
232-
if let Some(prev) = prev {
233-
if next <= prev {
234-
duplicates.fetch_add(1, Ordering::Relaxed);
268+
},
269+
);
270+
271+
group.bench_function(
272+
BenchmarkId::new("legacy", format!("{}x{}", thread_count, ids_per_thread)),
273+
|b| {
274+
b.iter(|| {
275+
let generator = Arc::new(LegacySnowflakeGenerator::new(1));
276+
let duplicates = Arc::new(AtomicU64::new(0));
277+
thread::scope(|scope| {
278+
let mut handles = Vec::with_capacity(thread_count);
279+
for _ in 0..thread_count {
280+
let generator = Arc::clone(&generator);
281+
let duplicates = Arc::clone(&duplicates);
282+
handles.push(scope.spawn(move || {
283+
let mut prev = None;
284+
for _ in 0..ids_per_thread {
285+
let next = generator.next_id().expect("legacy concurrent next_id");
286+
if let Some(prev) = prev {
287+
if next <= prev {
288+
duplicates.fetch_add(1, Ordering::Relaxed);
289+
}
235290
}
291+
prev = Some(next);
236292
}
237-
prev = Some(next);
238-
}
239-
}));
240-
}
241-
for handle in handles {
242-
handle.join().expect("legacy thread join");
243-
}
293+
}));
294+
}
295+
for handle in handles {
296+
handle.join().expect("legacy thread join");
297+
}
298+
});
299+
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
244300
});
245-
assert_eq!(duplicates.load(Ordering::Relaxed), 0);
246-
});
247-
});
301+
},
302+
);
248303

249304
group.finish();
250305
}
251306

252307
criterion_group!(
253308
name = benches;
254309
config = Criterion::default().sample_size(20).warm_up_time(Duration::from_millis(500));
255-
targets = bench_single_generation, bench_batch_generation, bench_concurrent_single_generation
310+
targets = bench_single_generation, bench_batch_generation, bench_mapped_batch_generation, bench_concurrent_single_generation
256311
);
257312
criterion_main!(benches);

0 commit comments

Comments
 (0)