Skip to content

Commit ab9c288

Browse files
committed
Added more tests to schema-diff crate with support to pub/sub
1 parent f221076 commit ab9c288

21 files changed

Lines changed: 2358 additions & 802 deletions

File tree

cli/crates/kalam-schema-diff/src/diff.rs

Lines changed: 98 additions & 794 deletions
Large diffs are not rendered by default.
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
mod table;
2+
mod topic;
3+
4+
use crate::{
5+
emitter::{
6+
table::{diff_existing_table, emit_create_table},
7+
topic::{diff_existing_topic, emit_add_topic_source, emit_create_topic},
8+
},
9+
model::Schema,
10+
};
11+
12+
pub(crate) fn diff_schema(current: &Schema, target: &Schema, allow_drop: bool) -> Vec<String> {
13+
let mut out = vec![
14+
"-- Generated KalamDB schema evolution".to_string(),
15+
"-- Review before applying in production.".to_string(),
16+
String::new(),
17+
];
18+
19+
for namespace in target.namespaces.difference(&current.namespaces) {
20+
out.push(format!("CREATE NAMESPACE IF NOT EXISTS {namespace};"));
21+
}
22+
23+
if !target.namespaces.is_empty() && out.last().map(String::as_str) != Some("") {
24+
out.push(String::new());
25+
}
26+
27+
for (table_key, target_table) in &target.tables {
28+
match current.tables.get(table_key) {
29+
Some(current_table) => {
30+
diff_existing_table(current_table, target_table, allow_drop, &mut out);
31+
},
32+
None => {
33+
out.push(emit_create_table(target_table));
34+
out.push(String::new());
35+
},
36+
}
37+
}
38+
39+
for (topic_key, target_topic) in &target.topics {
40+
match current.topics.get(topic_key) {
41+
Some(current_topic) => {
42+
diff_existing_topic(current_topic, target_topic, &mut out);
43+
},
44+
None => {
45+
out.push(emit_create_topic(target_topic));
46+
47+
for source in target_topic.sources.values() {
48+
out.push(emit_add_topic_source(target_topic, source));
49+
}
50+
51+
out.push(String::new());
52+
},
53+
}
54+
}
55+
56+
for (topic_key, current_topic) in &current.topics {
57+
if !target.topics.contains_key(topic_key) {
58+
if allow_drop {
59+
out.push(format!("DROP TOPIC {};", current_topic.name_sql));
60+
} else {
61+
out.push(format!(
62+
"-- destructive change skipped: topic {} exists in current schema but not in target schema",
63+
current_topic.name_sql
64+
));
65+
out.push(format!(
66+
"-- rerun with destructive changes enabled to emit: DROP TOPIC {};",
67+
current_topic.name_sql
68+
));
69+
}
70+
71+
out.push(String::new());
72+
}
73+
}
74+
75+
for (table_key, current_table) in &current.tables {
76+
if !target.tables.contains_key(table_key) {
77+
if allow_drop {
78+
out.push(format!("DROP TABLE {};", current_table.name_sql));
79+
} else {
80+
out.push(format!(
81+
"-- destructive change skipped: table {} exists in current schema but not in target schema",
82+
current_table.name_sql
83+
));
84+
out.push(format!(
85+
"-- rerun with destructive changes enabled to emit: DROP TABLE {};",
86+
current_table.name_sql
87+
));
88+
}
89+
out.push(String::new());
90+
}
91+
}
92+
93+
if out.iter().all(|line| line.starts_with("--") || line.trim().is_empty()) {
94+
out.push("-- No schema changes.".to_string());
95+
}
96+
97+
out
98+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
use std::collections::BTreeSet;
2+
3+
use crate::{model::Table, sql::same_option_value};
4+
5+
pub(super) fn diff_existing_table(
6+
current: &Table,
7+
target: &Table,
8+
allow_drop: bool,
9+
out: &mut Vec<String>,
10+
) {
11+
let mut emitted_for_table = false;
12+
13+
if current.kind != target.kind {
14+
out.push(format!(
15+
"-- manual review required: table {} changed kind from {:?} to {:?}",
16+
target.name_sql, current.kind, target.kind
17+
));
18+
emitted_for_table = true;
19+
}
20+
21+
let set_options = target
22+
.options
23+
.iter()
24+
.filter_map(|(key, target_value)| match current.options.get(key) {
25+
Some(current_value) if same_option_value(current_value, target_value) => None,
26+
_ => Some(format!("{key} = {target_value}")),
27+
})
28+
.collect::<Vec<_>>();
29+
30+
if !set_options.is_empty() {
31+
out.push(format!(
32+
"ALTER TABLE {} SET TBLPROPERTIES ({});",
33+
target.name_sql,
34+
set_options.join(", ")
35+
));
36+
emitted_for_table = true;
37+
}
38+
39+
for removed_option in current.options.keys() {
40+
if !target.options.contains_key(removed_option) {
41+
out.push(format!(
42+
"-- manual review required: option {} was removed from table {}",
43+
removed_option, target.name_sql
44+
));
45+
out.push(format!(
46+
"-- recommended grammar to add: ALTER TABLE {} RESET TBLPROPERTIES ({});",
47+
target.name_sql, removed_option
48+
));
49+
emitted_for_table = true;
50+
}
51+
}
52+
53+
let current_constraints = current.constraints.iter().cloned().collect::<BTreeSet<_>>();
54+
let target_constraints = target.constraints.iter().cloned().collect::<BTreeSet<_>>();
55+
56+
if current_constraints != target_constraints {
57+
out.push(format!(
58+
"-- manual review required: constraints changed on table {}",
59+
target.name_sql
60+
));
61+
out.push(format!("-- current constraints: {current_constraints:?}"));
62+
out.push(format!("-- target constraints: {target_constraints:?}"));
63+
emitted_for_table = true;
64+
}
65+
66+
for column_key in &target.column_order {
67+
let target_column = target.columns.get(column_key).expect("target column exists");
68+
69+
match current.columns.get(column_key) {
70+
Some(current_column) => {
71+
if current_column.semantic_signature() != target_column.semantic_signature() {
72+
if current_column.primary_key != target_column.primary_key {
73+
out.push(format!(
74+
"-- manual review required: primary key changed for {}.{}",
75+
target.name_sql, target_column.name_sql
76+
));
77+
emitted_for_table = true;
78+
continue;
79+
}
80+
81+
out.push(format!(
82+
"ALTER TABLE {} MODIFY COLUMN {};",
83+
target.name_sql,
84+
target_column.modify_fragment()
85+
));
86+
emitted_for_table = true;
87+
}
88+
},
89+
None => {
90+
out.push(format!(
91+
"ALTER TABLE {} ADD COLUMN {};",
92+
target.name_sql, target_column.create_sql
93+
));
94+
emitted_for_table = true;
95+
},
96+
}
97+
}
98+
99+
for column_key in &current.column_order {
100+
if !target.columns.contains_key(column_key) {
101+
let current_column = current.columns.get(column_key).expect("current column exists");
102+
103+
if allow_drop {
104+
out.push(format!(
105+
"ALTER TABLE {} DROP COLUMN {};",
106+
target.name_sql, current_column.name_sql
107+
));
108+
} else {
109+
out.push(format!(
110+
"-- destructive change skipped: column {}.{} exists in current schema but not in target schema",
111+
target.name_sql, current_column.name_sql
112+
));
113+
out.push(format!(
114+
"-- rerun with destructive changes enabled to emit: ALTER TABLE {} DROP COLUMN {};",
115+
target.name_sql, current_column.name_sql
116+
));
117+
}
118+
119+
emitted_for_table = true;
120+
}
121+
}
122+
123+
if emitted_for_table {
124+
out.push(String::new());
125+
}
126+
}
127+
128+
pub(super) fn emit_create_table(table: &Table) -> String {
129+
let mut parts = Vec::new();
130+
131+
for column_key in &table.column_order {
132+
let column = table.columns.get(column_key).expect("column exists");
133+
parts.push(column.create_sql.clone());
134+
}
135+
136+
for constraint in &table.constraints {
137+
parts.push(constraint.clone());
138+
}
139+
140+
let kind_prefix = table.kind.map(|kind| kind.as_create_prefix()).unwrap_or("");
141+
let mut sql =
142+
format!("CREATE {}TABLE {} (\n {}\n)", kind_prefix, table.name_sql, parts.join(",\n "));
143+
144+
if !table.options.is_empty() {
145+
let options = table
146+
.options
147+
.iter()
148+
.map(|(key, value)| format!("{key} = {value}"))
149+
.collect::<Vec<_>>()
150+
.join(",\n ");
151+
152+
sql.push_str(&format!("\nWITH (\n {options}\n)"));
153+
}
154+
155+
sql.push(';');
156+
sql
157+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
use crate::model::{Topic, TopicPayloadMode, TopicRetention, TopicSource};
2+
3+
pub(super) fn diff_existing_topic(current: &Topic, target: &Topic, out: &mut Vec<String>) {
4+
let mut emitted_for_topic = false;
5+
6+
if current.partitions != target.partitions {
7+
out.push(format!(
8+
"-- manual review required: topic {} changed partition count from {:?} to {:?}",
9+
target.name_sql, current.partitions, target.partitions
10+
));
11+
emitted_for_topic = true;
12+
}
13+
14+
if current.retention != target.retention {
15+
if target.retention.is_empty() {
16+
out.push(format!("ALTER TOPIC {} CLEAR RETENTION;", target.name_sql));
17+
} else {
18+
out.push(format!(
19+
"ALTER TOPIC {} SET RETENTION {};",
20+
target.name_sql,
21+
emit_topic_retention(&target.retention)
22+
));
23+
}
24+
25+
emitted_for_topic = true;
26+
}
27+
28+
for (source_key, target_source) in &target.sources {
29+
if !current.sources.contains_key(source_key) {
30+
out.push(emit_add_topic_source(target, target_source));
31+
emitted_for_topic = true;
32+
}
33+
}
34+
35+
for (source_key, current_source) in &current.sources {
36+
if !target.sources.contains_key(source_key) {
37+
out.push(format!(
38+
"-- manual review required: topic {} source {} on {} was removed from target schema",
39+
target.name_sql,
40+
current_source.table_sql,
41+
current_source.operation.as_sql()
42+
));
43+
emitted_for_topic = true;
44+
}
45+
}
46+
47+
if emitted_for_topic {
48+
out.push(String::new());
49+
}
50+
}
51+
52+
pub(super) fn emit_create_topic(topic: &Topic) -> String {
53+
let mut sql = "CREATE TOPIC ".to_string();
54+
55+
if topic.if_not_exists {
56+
sql.push_str("IF NOT EXISTS ");
57+
}
58+
59+
sql.push_str(&topic.name_sql);
60+
61+
if let Some(partitions) = topic.partitions {
62+
sql.push_str(&format!(" PARTITIONS {partitions}"));
63+
}
64+
65+
if !topic.retention.is_empty() {
66+
sql.push(' ');
67+
sql.push_str(&emit_topic_retention(&topic.retention));
68+
}
69+
70+
sql.push(';');
71+
sql
72+
}
73+
74+
pub(super) fn emit_add_topic_source(topic: &Topic, source: &TopicSource) -> String {
75+
let mut sql = format!(
76+
"ALTER TOPIC {} ADD SOURCE {} ON {}",
77+
topic.name_sql,
78+
source.table_sql,
79+
source.operation.as_sql()
80+
);
81+
82+
if let Some(filter_expr) = &source.filter_expr {
83+
sql.push_str(" WHERE ");
84+
sql.push_str(filter_expr);
85+
}
86+
87+
if source.payload_explicit || source.payload_mode != TopicPayloadMode::Full {
88+
sql.push_str(&format!(" WITH (payload = '{}')", source.payload_mode.as_sql()));
89+
}
90+
91+
sql.push(';');
92+
sql
93+
}
94+
95+
fn emit_topic_retention(retention: &TopicRetention) -> String {
96+
let mut options = Vec::new();
97+
98+
if let Some(retention_seconds) = &retention.retention_seconds {
99+
options.push(format!("retention_seconds = {retention_seconds}"));
100+
}
101+
102+
if let Some(retention_max_bytes) = &retention.retention_max_bytes {
103+
options.push(format!("retention_max_bytes = {retention_max_bytes}"));
104+
}
105+
106+
format!("WITH ({})", options.join(", "))
107+
}

cli/crates/kalam-schema-diff/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
//! producing deterministic migration statements.
55
66
mod diff;
7+
mod emitter;
8+
mod model;
9+
mod parser;
10+
mod sql;
711

812
pub use diff::{
913
diff_schema_files, diff_schema_files_with_options, diff_schema_sql,

0 commit comments

Comments
 (0)