Skip to content

Commit b4a3b64

Browse files
committed
Add topic admin ops & secure file download
Introduce multiple topic and file-download improvements:
- Add POST /v1/api/topics/latest-offsets endpoint to resolve partition head offsets (service/dba/system only).
- Make topic consume support optional consumer group (stateless inspection); honor requested position for stateless reads and route fetches accordingly.
- Add RESET CONSUMER GROUP SQL command/parser/classifier, integrate into extensions and dialect, and implement handler + result rows so admins (dba/system) can move group cursors. Wire into stream handler registry.
- Harden file download handler: rename path param to stored_name, validate/guess content type from stored_name, sanitize disposition, and restrict cross-user raw file downloads to dba/system via explicit authorization helper. Update DownloadQuery docs and tests.
- Improve health/job maintenance: add lower-frequency idle trim guard, epoch helper, and avoid unnecessary leadership polling in single-node mode; increase idle poll max and reduce unnecessary ticks.
- Minor cleanups and tests: small formatting fixes in backup/restore tests, add topic models (selectors/response), and update routes, models, and related tests and SDK/UI files.

These changes add admin tooling for topic maintenance, tighten file download security/authorization, and reduce background wakeups in non-cluster deployments.
1 parent 77f2249 commit b4a3b64

71 files changed

Lines changed: 2614 additions & 678 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/crates/kalamdb-api/src/http/files/download.rs

Lines changed: 74 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@ use std::sync::Arc;
44

55
use actix_web::{get, web, HttpResponse, Responder};
66
use kalamdb_auth::AuthSessionExtractor;
7-
use kalamdb_commons::{models::TableId, schemas::TableType, TableAccess};
7+
use kalamdb_commons::{
8+
models::{TableId, UserId},
9+
schemas::TableType,
10+
Role, TableAccess,
11+
};
812
use kalamdb_core::app_context::AppContext;
9-
use kalamdb_session::{can_access_shared_table, can_impersonate_target_user, AuthSession};
13+
use kalamdb_session::{
14+
can_access_shared_table, can_access_user_table, can_impersonate_target_user, AuthSession,
15+
};
1016
use kalamdb_system::FileRef;
1117

1218
use super::models::DownloadQuery;
1319
use crate::http::sql::models::{ErrorCode, SqlResponse};
1420

15-
/// GET /v1/files/{namespace}/{table_name}/{subfolder}/{file_id} - Download a file
21+
/// GET /v1/files/{namespace}/{table_name}/{subfolder}/{stored_name} - Download a file
1622
///
1723
/// Requires Bearer token (JWT) authorization and table access permissions.
1824
/// For user tables, downloads default to the authenticated user's table scope.
19-
/// Higher roles may supply `user_id` when the impersonation role matrix allows it.
20-
#[get("/files/{namespace}/{table_name}/{subfolder}/{file_id}")]
25+
/// Callers may pass their own `user_id`; only DBA/system roles may supply another user's `user_id`, and only when the impersonation role matrix allows it.
26+
#[get("/files/{namespace}/{table_name}/{subfolder}/{stored_name}")]
2127
pub async fn download_file(
2228
extractor: AuthSessionExtractor,
2329
path: web::Path<(String, String, String, String)>,
@@ -27,7 +33,7 @@ pub async fn download_file(
2733
// Convert extractor to AuthSession
2834
let session: AuthSession = extractor.into();
2935

30-
let (namespace, table_name, subfolder, file_id) = path.into_inner();
36+
let (namespace, table_name, subfolder, stored_name) = path.into_inner();
3137
let table_id = TableId::from_strings(&namespace, &table_name);
3238

3339
// Look up table definition from schema registry
@@ -46,31 +52,29 @@ pub async fn download_file(
4652

4753
let user_id = match table_type {
4854
TableType::User => {
55+
if !can_access_user_table(session.role()) {
56+
return HttpResponse::Forbidden().json(SqlResponse::error(
57+
ErrorCode::PermissionDenied,
58+
"User table file downloads require user-table access",
59+
0.0,
60+
));
61+
}
62+
4963
let effective_user_id = if let Some(requested_user_id) = query.user_id.as_ref() {
50-
if requested_user_id == session.user_id() {
51-
requested_user_id.clone()
52-
} else {
53-
let requested_user_id = requested_user_id.clone();
54-
let target_role = app_context
55-
.system_tables()
56-
.users()
57-
.role_for_impersonation_target(&requested_user_id);
58-
59-
if !can_impersonate_target_user(
60-
session.user_id(),
61-
session.role(),
62-
&requested_user_id,
63-
target_role,
64-
) {
65-
return HttpResponse::Forbidden().json(SqlResponse::error(
66-
ErrorCode::PermissionDenied,
67-
"Requested user is not allowed for the current role",
68-
0.0,
69-
));
70-
}
71-
72-
requested_user_id
64+
let target_role = app_context
65+
.system_tables()
66+
.users()
67+
.role_for_impersonation_target(requested_user_id);
68+
69+
if !can_download_user_file_for_target(&session, requested_user_id, target_role) {
70+
return HttpResponse::Forbidden().json(SqlResponse::error(
71+
ErrorCode::PermissionDenied,
72+
"Requested user is not allowed for file download",
73+
0.0,
74+
));
7375
}
76+
77+
requested_user_id.clone()
7478
} else {
7579
session.user_id().clone()
7680
};
@@ -113,18 +117,18 @@ pub async fn download_file(
113117
|| subfolder.contains('/')
114118
|| subfolder.contains('\\')
115119
|| subfolder.contains('\0')
116-
|| file_id.contains("..")
117-
|| file_id.contains('/')
118-
|| file_id.contains('\\')
119-
|| file_id.contains('\0')
120+
|| stored_name.contains("..")
121+
|| stored_name.contains('/')
122+
|| stored_name.contains('\\')
123+
|| stored_name.contains('\0')
120124
{
121125
return HttpResponse::BadRequest().json(SqlResponse::error(
122126
ErrorCode::InvalidInput,
123127
"Invalid file path",
124128
0.0,
125129
));
126130
}
127-
let relative_path = format!("{}/{}", subfolder, file_id);
131+
let relative_path = format!("{}/{}", subfolder, stored_name);
128132

129133
// Fetch file from storage
130134
let file_service = app_context.file_storage_service();
@@ -134,25 +138,25 @@ pub async fn download_file(
134138
{
135139
Ok(data) => {
136140
// TODO: Get content type from the stored file metadata
137-
// Guess content type from file extension in file_id
138-
let content_type = guess_content_type(&file_id);
141+
// Guess content type from file extension in stored_name
142+
let content_type = guess_content_type(&stored_name);
139143

140-
// SECURITY: Sanitize file_id for Content-Disposition header to prevent
144+
// SECURITY: Sanitize stored_name for Content-Disposition header to prevent
141145
// HTTP response header injection (CRLF injection) via crafted filenames.
142-
let safe_file_id: String = file_id
146+
let safe_stored_name: String = stored_name
143147
.chars()
144148
.filter(|c| *c != '"' && *c != '\r' && *c != '\n' && *c != '\0')
145149
.collect();
146150
HttpResponse::Ok()
147151
.content_type(content_type)
148152
.append_header((
149153
"Content-Disposition",
150-
format!("inline; filename=\"{}\"", safe_file_id),
154+
format!("inline; filename=\"{}\"", safe_stored_name),
151155
))
152156
.body(data)
153157
},
154158
Err(e) => {
155-
log::warn!("File download failed: table={}, file={}: {}", table_id, file_id, e);
159+
log::warn!("File download failed: table={}, file={}: {}", table_id, stored_name, e);
156160
HttpResponse::NotFound().json(serde_json::json!({
157161
"error": "File not found",
158162
"code": "FILE_NOT_FOUND",
@@ -161,26 +165,45 @@ pub async fn download_file(
161165
}
162166
}
163167

164-
fn guess_content_type(file_id: &str) -> String {
165-
mime_guess::from_path(file_id).first_or_octet_stream().to_string()
168+
fn guess_content_type(stored_name: &str) -> String {
169+
mime_guess::from_path(stored_name).first_or_octet_stream().to_string()
170+
}
171+
172+
fn can_download_user_file_for_target(
173+
session: &AuthSession,
174+
requested_user_id: &UserId,
175+
target_role: Role,
176+
) -> bool {
177+
if requested_user_id == session.user_id() {
178+
return true;
179+
}
180+
181+
matches!(session.role(), Role::System | Role::Dba)
182+
&& can_impersonate_target_user(
183+
session.user_id(),
184+
session.role(),
185+
requested_user_id,
186+
target_role,
187+
)
166188
}
167189

168190
#[cfg(test)]
169191
mod tests {
170-
use kalamdb_commons::models::UserId;
171-
use kalamdb_commons::Role;
172-
173192
use super::*;
174193

175194
#[test]
176-
fn download_user_id_query_uses_shared_impersonation_authorization() {
177-
let actor = UserId::new("svc");
178-
let same_user = UserId::new("svc");
195+
fn download_user_id_query_is_self_or_admin_only() {
196+
let service = AuthSession::new(UserId::new("svc"), Role::Service);
197+
let dba = AuthSession::new(UserId::new("dba"), Role::Dba);
198+
let system = AuthSession::new(UserId::new("system"), Role::System);
199+
let user = AuthSession::new(UserId::new("alice"), Role::User);
179200
let regular_target = UserId::new("alice");
180201
let dba_target = UserId::new("dba-target");
181202

182-
assert!(can_impersonate_target_user(&actor, Role::Service, &same_user, Role::Service));
183-
assert!(can_impersonate_target_user(&actor, Role::Service, &regular_target, Role::User));
184-
assert!(!can_impersonate_target_user(&actor, Role::Service, &dba_target, Role::Dba));
203+
assert!(can_download_user_file_for_target(&user, &regular_target, Role::User));
204+
assert!(!can_download_user_file_for_target(&service, &regular_target, Role::User));
205+
assert!(can_download_user_file_for_target(&dba, &regular_target, Role::User));
206+
assert!(can_download_user_file_for_target(&dba, &dba_target, Role::Dba));
207+
assert!(can_download_user_file_for_target(&system, &dba_target, Role::Dba));
185208
}
186209
}

backend/crates/kalamdb-api/src/http/files/models/download_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use serde::Deserialize;
77
#[derive(Debug, Deserialize)]
88
pub struct DownloadQuery {
99
/// Optional user_id for user-table downloads.
10-
/// Cross-user requests are authorized through the impersonation role matrix.
10+
/// Cross-user raw byte downloads are limited to dba/system roles.
1111
#[serde(default, deserialize_with = "deserialize_optional_user_id")]
1212
pub user_id: Option<UserId>,
1313
}

backend/crates/kalamdb-api/src/http/topics/consume.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub async fn consume_handler(
5959
}
6060

6161
let topic_id = &body.topic_id;
62-
let group_id = &body.group_id;
62+
let group_id = body.group_id.as_ref();
6363

6464
// Verify topic exists
6565
let topics_provider = app_context.system_tables().topics();
@@ -82,19 +82,22 @@ pub async fn consume_handler(
8282

8383
// Determine start offset based on position.
8484
//
85-
// All positions first check the consumer group's committed offset.
86-
// If a committed offset exists, we resume from there (last_acked + 1).
87-
// The position only matters when no offset has been committed yet:
85+
// Grouped reads first check the consumer group's committed offset. If a
86+
// committed offset exists, we resume from there (last_acked + 1). The
87+
// requested position only matters when no offset has been committed yet.
88+
// Stateless reads do not check or create group state, so the requested
89+
// position is honored on every call:
8890
// - Earliest: start from offset 0 (replay all history)
8991
// - Latest: start from high-water mark (last offset + 1)
9092
// - Offset: start from the explicit offset
91-
let committed_offset =
93+
let committed_offset = group_id.and_then(|group_id| {
9294
topic_publisher.get_group_offsets(topic_id, group_id).ok().and_then(|offsets| {
9395
offsets
9496
.iter()
9597
.find(|o| o.partition_id == body.partition_id)
9698
.map(|o| o.last_acked_offset + 1)
97-
});
99+
})
100+
});
98101

99102
let start_offset = match committed_offset {
100103
Some(committed) => committed,
@@ -118,14 +121,25 @@ pub async fn consume_handler(
118121
},
119122
};
120123

121-
// Fetch messages
122-
let messages = match topic_publisher.fetch_messages_for_group(
123-
topic_id,
124-
group_id,
125-
body.partition_id,
126-
start_offset,
127-
body.limit as usize,
128-
) {
124+
// Fetch messages.
125+
let messages_result = if let Some(group_id) = group_id {
126+
topic_publisher.fetch_messages_for_group(
127+
topic_id,
128+
group_id,
129+
body.partition_id,
130+
start_offset,
131+
body.limit as usize,
132+
)
133+
} else {
134+
topic_publisher.fetch_messages(
135+
topic_id,
136+
body.partition_id,
137+
start_offset,
138+
body.limit as usize,
139+
)
140+
};
141+
142+
let messages = match messages_result {
129143
Ok(msgs) => msgs,
130144
Err(e) => {
131145
return HttpResponse::InternalServerError().json(TopicErrorResponse::internal_error(
backend/crates/kalamdb-api/src/http/topics/latest_offsets.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
//! Topic latest offsets handler
2+
//!
3+
//! POST /v1/api/topics/latest-offsets - Resolve topic partition head offsets
4+
5+
use std::collections::BTreeSet;
6+
use std::sync::Arc;
7+
8+
use actix_web::{post, web, HttpResponse, Responder};
9+
use kalamdb_auth::AuthSessionExtractor;
10+
use kalamdb_commons::Role;
11+
use kalamdb_core::app_context::AppContext;
12+
use kalamdb_session::AuthSession;
13+
14+
use super::models::{
15+
LatestOffsetsRequest, LatestOffsetsResponse, TopicErrorResponse, TopicPartitionLatestOffset,
16+
};
17+
18+
/// Check if role is allowed to resolve topic offsets.
19+
/// Must be service, dba, or system role (NOT user)
20+
fn is_topic_authorized(session: &AuthSession) -> bool {
21+
matches!(session.role(), Role::Service | Role::Dba | Role::System)
22+
}
23+
24+
/// POST /v1/api/topics/latest-offsets - Resolve topic partition head offsets
25+
///
26+
/// # Authentication
27+
/// Requires Bearer token authentication.
28+
///
29+
/// # Authorization
30+
/// Role must be `service`, `dba`, or `system` (NOT `user`).
31+
#[post("/latest-offsets")]
32+
pub async fn latest_offsets_handler(
33+
extractor: AuthSessionExtractor,
34+
body: web::Json<LatestOffsetsRequest>,
35+
app_context: web::Data<Arc<AppContext>>,
36+
) -> impl Responder {
37+
let session: AuthSession = extractor.into();
38+
39+
if !is_topic_authorized(&session) {
40+
return HttpResponse::Forbidden().json(TopicErrorResponse::forbidden(
41+
"Topic offset inspection requires service, dba, or system role",
42+
));
43+
}
44+
45+
let topic_publisher = app_context.topic_publisher();
46+
let mut seen = BTreeSet::new();
47+
let mut offsets = Vec::with_capacity(body.partitions.len());
48+
49+
for selector in &body.partitions {
50+
let dedupe_key = (selector.topic_id.to_string(), selector.partition_id);
51+
if !seen.insert(dedupe_key) {
52+
continue;
53+
}
54+
55+
let last_offset = match topic_publisher.latest_offset(&selector.topic_id, selector.partition_id)
56+
{
57+
Ok(offset) => offset,
58+
Err(error) => {
59+
return HttpResponse::InternalServerError().json(TopicErrorResponse::internal_error(
60+
&format!("Failed to resolve latest offset: {}", error),
61+
));
62+
},
63+
};
64+
65+
offsets.push(TopicPartitionLatestOffset {
66+
topic_id: selector.topic_id.clone(),
67+
partition_id: selector.partition_id,
68+
next_offset: last_offset.map(|offset| offset + 1).unwrap_or(0),
69+
last_offset,
70+
});
71+
}
72+
73+
offsets.sort_by(|left, right| {
74+
let topic_compare = left.topic_id.to_string().cmp(&right.topic_id.to_string());
75+
if topic_compare == std::cmp::Ordering::Equal {
76+
left.partition_id.cmp(&right.partition_id)
77+
} else {
78+
topic_compare
79+
}
80+
});
81+
82+
HttpResponse::Ok().json(LatestOffsetsResponse { offsets })
83+
}

backend/crates/kalamdb-api/src/http/topics/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@
66
//! ## Endpoints
77
//! - POST /v1/api/topics/consume - Consume messages from a topic
88
//! - POST /v1/api/topics/ack - Acknowledge offset for consumer group
9+
//! - POST /v1/api/topics/latest-offsets - Resolve topic partition head offsets
910
//!
1011
//! **Authorization**: Endpoints require `service`, `dba`, or `system` role (NOT `user`).
1112
1213
pub mod models;
1314

1415
mod ack;
1516
mod consume;
17+
mod latest_offsets;
1618

1719
pub(crate) use ack::ack_handler;
1820
pub(crate) use consume::consume_handler;
21+
pub(crate) use latest_offsets::latest_offsets_handler;

0 commit comments

Comments
 (0)