Skip to content

Commit b480530

Browse files
committed
Fix HOT update handling
Previously we weren't handling HOT chain correctly because we used fetch_row_version function instead of the index_fetch_tuple function which correctly handles HOT. This commit changes that. We also fix the snapshot that is used to be aligned with the snapshot of the scan, which is more correct. Finally we handle the case that no valid heap tuple is found. While we don't quite know how that can be hit, it's better to handle that case.
1 parent e698c59 commit b480530

6 files changed

Lines changed: 190 additions & 42 deletions

File tree

pgvectorscale/src/access_method/plain_storage.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use super::{
1111
storage_common::get_attribute_number_from_index,
1212
};
1313

14-
use pgrx::PgRelation;
14+
use pgrx::{PgBox, PgRelation};
1515

1616
use crate::util::{
1717
page::PageType, table_slot::TableSlot, tape::Tape, HeapPointer, IndexPointer, ItemPointer,
@@ -213,21 +213,31 @@ impl<'a> Storage for PlainStorage<'a> {
213213
}
214214
fn get_full_distance_for_resort<S: StatsHeapNodeRead + StatsDistanceComparison>(
215215
&self,
216+
scan: &PgBox<pgrx::pg_sys::IndexScanDescData>,
216217
qdm: &Self::QueryDistanceMeasure,
217218
_index_pointer: IndexPointer,
218219
heap_pointer: HeapPointer,
219220
meta_page: &MetaPage,
220221
stats: &mut S,
221-
) -> f32 {
222+
) -> Option<f32> {
222223
/* Plain storage only needs to resort when the index is using less dimensions than the underlying data. */
223224
assert!(meta_page.get_num_dimensions() > meta_page.get_num_dimensions_to_index());
224225

225-
let slot = unsafe { TableSlot::new(self.heap_rel, heap_pointer, stats) };
226+
let slot_opt = unsafe {
227+
TableSlot::from_index_heap_pointer(self.heap_rel, heap_pointer, scan.xs_snapshot, stats)
228+
};
229+
let slot = slot_opt?;
226230
match qdm {
227231
PlainDistanceMeasure::Full(query) => {
228-
let datum = unsafe { slot.get_attribute(self.heap_attr).unwrap() };
232+
let datum = unsafe {
233+
slot.get_attribute(self.heap_attr)
234+
.expect("vector attribute should exist in the heap")
235+
};
229236
let vec = unsafe { PgVector::from_datum(datum, meta_page, false, true) };
230-
self.get_distance_function()(vec.to_full_slice(), query.to_full_slice())
237+
Some(self.get_distance_function()(
238+
vec.to_full_slice(),
239+
query.to_full_slice(),
240+
))
231241
}
232242
}
233243
}
@@ -378,6 +388,13 @@ mod tests {
378388
);
379389
}
380390

391+
#[test]
392+
fn test_plain_storage_update_with_null() {
393+
crate::access_method::vacuum::tests::test_update_with_null_scaffold(
394+
"num_neighbors = 38, storage_layout = plain",
395+
);
396+
}
397+
381398
#[pg_test]
382399
unsafe fn test_plain_storage_empty_table_insert() -> spi::Result<()> {
383400
crate::access_method::build::tests::test_empty_table_insert_scaffold(

pgvectorscale/src/access_method/sbq.rs

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{cell::RefCell, collections::HashMap, iter::once, marker::PhantomData,
1414

1515
use pgrx::{
1616
pg_sys::{InvalidBlockNumber, InvalidOffsetNumber, BLCKSZ},
17-
PgRelation,
17+
PgBox, PgRelation,
1818
};
1919
use rkyv::{vec::ArchivedVec, Archive, Deserialize, Serialize};
2020

@@ -276,12 +276,22 @@ impl SbqSearchDistanceMeasure {
276276
&bq_vector[..self.quantized_dimensions],
277277
)
278278
} else {
279-
debug_assert!(self.quantized_vector.len() == bq_vector.len());
279+
debug_assert!(
280+
self.quantized_vector.len() == bq_vector.len(),
281+
"self.quantized_vector.len()={} bq_vector.len()={}",
282+
self.quantized_vector.len(),
283+
bq_vector.len()
284+
);
280285
(self.quantized_vector.as_slice(), bq_vector)
281286
}
282287
}
283288
GraphNeighborStore::Builder(_b) => {
284-
debug_assert!(self.quantized_vector.len() == bq_vector.len());
289+
debug_assert!(
290+
self.quantized_vector.len() == bq_vector.len(),
291+
"self.quantized_vector.len()={} bq_vector.len()={}",
292+
self.quantized_vector.len(),
293+
bq_vector.len()
294+
);
285295
(self.quantized_vector.as_slice(), bq_vector)
286296
}
287297
};
@@ -538,14 +548,6 @@ impl<'a> SbqSpeedupStorage<'a> {
538548
}
539549
}
540550
}
541-
542-
unsafe fn get_heap_table_slot_from_heap_pointer<T: StatsHeapNodeRead>(
543-
&self,
544-
heap_pointer: HeapPointer,
545-
stats: &mut T,
546-
) -> TableSlot {
547-
TableSlot::new(self.heap_rel, heap_pointer, stats)
548-
}
549551
}
550552

551553
pub type SbqSpeedupStorageLsnPrivateData = PhantomData<bool>; //no data stored
@@ -635,17 +637,28 @@ impl<'a> Storage for SbqSpeedupStorage<'a> {
635637

636638
fn get_full_distance_for_resort<S: StatsHeapNodeRead + StatsDistanceComparison>(
637639
&self,
640+
scan: &PgBox<pgrx::pg_sys::IndexScanDescData>,
638641
qdm: &Self::QueryDistanceMeasure,
639642
_index_pointer: IndexPointer,
640643
heap_pointer: HeapPointer,
641644
meta_page: &MetaPage,
642645
stats: &mut S,
643-
) -> f32 {
644-
let slot = unsafe { self.get_heap_table_slot_from_heap_pointer(heap_pointer, stats) };
646+
) -> Option<f32> {
647+
let slot_opt = unsafe {
648+
TableSlot::from_index_heap_pointer(self.heap_rel, heap_pointer, scan.xs_snapshot, stats)
649+
};
650+
651+
let slot = slot_opt?;
645652

646-
let datum = unsafe { slot.get_attribute(self.heap_attr).unwrap() };
653+
let datum = unsafe {
654+
slot.get_attribute(self.heap_attr)
655+
.expect("vector attribute should exist in the heap")
656+
};
647657
let vec = unsafe { PgVector::from_datum(datum, meta_page, false, true) };
648-
self.get_distance_function()(vec.to_full_slice(), qdm.query.to_full_slice())
658+
Some(self.get_distance_function()(
659+
vec.to_full_slice(),
660+
qdm.query.to_full_slice(),
661+
))
649662
}
650663

651664
fn get_neighbors_with_distances_from_disk<S: StatsNodeRead + StatsDistanceComparison>(
@@ -1045,6 +1058,12 @@ mod tests {
10451058
);
10461059
}
10471060

1061+
#[test]
1062+
fn test_bq_compressed_storage_update_with_null() {
1063+
crate::access_method::vacuum::tests::test_update_with_null_scaffold(
1064+
"num_neighbors = 38, storage_layout = memory_optimized",
1065+
);
1066+
}
10481067
#[pg_test]
10491068
unsafe fn test_bq_compressed_storage_empty_table_insert() -> spi::Result<()> {
10501069
crate::access_method::build::tests::test_empty_table_insert_scaffold(

pgvectorscale/src/access_method/scan.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ impl<QDM, PD> TSVResponseIterator<QDM, PD> {
226226

227227
fn next_with_resort<S: Storage<QueryDistanceMeasure = QDM, LSNPrivateData = PD>>(
228228
&mut self,
229+
scan: &PgBox<pg_sys::IndexScanDescData>,
229230
_index: &PgRelation,
230231
storage: &S,
231232
) -> Option<(HeapPointer, IndexPointer)> {
@@ -243,23 +244,32 @@ impl<QDM, PD> TSVResponseIterator<QDM, PD> {
243244
Some((heap_pointer, index_pointer)) => {
244245
self.full_distance_comparisons += 1;
245246
let distance = storage.get_full_distance_for_resort(
247+
scan,
246248
self.lsr.sdm.as_ref().unwrap(),
247249
index_pointer,
248250
heap_pointer,
249251
&self.meta_page,
250252
&mut self.lsr.stats,
251253
);
252254

253-
if self.resort_buffer.len() > 1 {
254-
self.streaming_stats
255-
.update(distance, distance - self.streaming_stats.max_distance);
255+
match distance {
256+
None => {
257+
/* No entry found in heap */
258+
continue;
259+
}
260+
Some(distance) => {
261+
if self.resort_buffer.len() > 1 {
262+
self.streaming_stats
263+
.update(distance, distance - self.streaming_stats.max_distance);
264+
}
265+
266+
self.resort_buffer.push(ResortData {
267+
heap_pointer,
268+
index_pointer,
269+
distance,
270+
});
271+
}
256272
}
257-
258-
self.resort_buffer.push(ResortData {
259-
heap_pointer,
260-
index_pointer,
261-
distance,
262-
});
263273
}
264274
None => {
265275
break;
@@ -373,7 +383,7 @@ pub extern "C" fn amgettuple(
373383
quantizer,
374384
&state.meta_page,
375385
);
376-
let next = iter.next_with_resort(&indexrel, &bq);
386+
let next = iter.next_with_resort(&scan, &indexrel, &bq);
377387
get_tuple(state, next, scan)
378388
}
379389
StorageState::Plain(iter) => {
@@ -385,7 +395,7 @@ pub extern "C" fn amgettuple(
385395
/* no need to resort */
386396
iter.next(&storage)
387397
} else {
388-
iter.next_with_resort(&indexrel, &storage)
398+
iter.next_with_resort(&scan, &indexrel, &storage)
389399
};
390400
get_tuple(state, next, scan)
391401
}

pgvectorscale/src/access_method/storage.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::pin::Pin;
22

3+
use pgrx::{pg_sys, PgBox};
4+
35
use crate::util::{page::PageType, tape::Tape, HeapPointer, IndexPointer, ItemPointer};
46

57
use super::{
@@ -74,12 +76,13 @@ pub trait Storage {
7476

7577
fn get_full_distance_for_resort<S: StatsHeapNodeRead + StatsDistanceComparison>(
7678
&self,
79+
scan: &PgBox<pg_sys::IndexScanDescData>,
7780
query: &Self::QueryDistanceMeasure,
7881
index_pointer: IndexPointer,
7982
heap_pointer: HeapPointer,
8083
meta_page: &MetaPage,
8184
stats: &mut S,
82-
) -> f32;
85+
) -> Option<f32>;
8386

8487
fn visit_lsn(
8588
&self,

pgvectorscale/src/access_method/vacuum.rs

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,92 @@ pub mod tests {
355355
client.execute("DROP TABLE test_vac_full", &[]).unwrap();
356356
}
357357

358+
/* This test inserts data with a NULL embedding and then updates the row with the actual embedding.
359+
We do this in several transactions to trigger Heap-only-tuples (HOT) updates. This test is specifically structured to ensure to hit HOT code paths.
360+
*/
361+
#[cfg(test)]
362+
pub fn test_update_with_null_scaffold(index_options: &str) {
363+
//do not run this test in parallel
364+
let _lock = VAC_FULL_MUTEX.lock().unwrap();
365+
let expected_cnt = 1000;
366+
367+
//we need to run a few txn in this test which cannot be run from SPI.
368+
//so we cannot use the pg_test framework here. Thus we do a bit of
369+
//hackery to bring up the test db and then use a client to run queries against it.
370+
371+
//bring up the test db by running a fake test on a fake fn
372+
let mut opts = crate::pg_test::postgresql_conf_options();
373+
/* this is a special key that causes all messages to be printed out by the pgrx test framework */
374+
opts.push("log_line_prefix='TMSG: '");
375+
pgrx_tests::run_test("test_delete_mock_fn", None, opts).unwrap();
376+
377+
let (mut client, _) = pgrx_tests::client().unwrap();
378+
379+
client
380+
.batch_execute(&format!(
381+
"CREATE TABLE test_data_hot_test_1 (
382+
id bigint not null primary key generated by default as identity,
383+
article text,
384+
embedding vector(1024)
385+
);
386+
387+
insert into test_data_hot_test_1(article) SELECT 'abc' FROM generate_series(1,{expected_cnt});
388+
"
389+
))
390+
.unwrap();
391+
392+
client
393+
.execute(
394+
&format!("do $$
395+
declare
396+
_id bigint;
397+
_article text;
398+
_embedding vector(1024);
399+
begin
400+
loop
401+
select id, article into _id, _article
402+
from test_data_hot_test_1
403+
where embedding is null
404+
for update skip locked
405+
limit 1;
406+
407+
if not found then
408+
exit;
409+
end if;
410+
411+
412+
select ('[' || array_to_string(array_agg(random()), ',', '0') || ']')::vector into _embedding from generate_series(1,1024);
413+
update test_data_hot_test_1 set embedding = _embedding where id = _id;
414+
commit;
415+
end loop;
416+
end;
417+
$$;"),
418+
&[],
419+
)
420+
.unwrap();
421+
422+
client
423+
.execute(
424+
&format!("CREATE INDEX idx_diskann_bq ON test_data_hot_test_1 USING diskann (embedding) WITH({index_options});"),
425+
&[],
426+
)
427+
.unwrap();
428+
429+
client.execute(&format!(
430+
"with q as
431+
(
432+
select ('[' || array_to_string(array_agg(random()), ',', '0') || ']')::vector q from generate_series(1,1024)
433+
)
434+
select article
435+
from test_data_hot_test_1
436+
order by embedding <=> (select q from q limit 1)
437+
limit 15;"), &[]).unwrap();
438+
439+
client
440+
.execute("DROP TABLE test_data_hot_test_1", &[])
441+
.unwrap();
442+
}
443+
358444
#[pg_test]
359445
///This function is only a mock to bring up the test framewokr in test_delete_vacuum
360446
fn test_delete_mock_fn() -> spi::Result<()> {

pgvectorscale/src/util/table_slot.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::ptr::addr_of_mut;
2-
3-
use pgrx::pg_sys::{Datum, TupleTableSlot};
1+
use pgrx::pg_sys::{Datum, SnapshotData, TupleTableSlot};
42
use pgrx::{pg_sys, PgBox, PgRelation};
53

64
use crate::access_method::stats::StatsHeapNodeRead;
@@ -12,31 +10,46 @@ pub struct TableSlot {
1210
}
1311

1412
impl TableSlot {
15-
pub unsafe fn new<S: StatsHeapNodeRead>(
13+
pub unsafe fn from_index_heap_pointer<S: StatsHeapNodeRead>(
1614
heap_rel: &PgRelation,
1715
heap_pointer: HeapPointer,
16+
snapshot: *mut SnapshotData,
1817
stats: &mut S,
19-
) -> Self {
18+
) -> Option<Self> {
2019
let slot = PgBox::from_pg(pg_sys::table_slot_create(
2120
heap_rel.as_ptr(),
2221
std::ptr::null_mut(),
2322
));
2423

2524
let table_am = heap_rel.rd_tableam;
26-
let fetch_row_version = (*table_am).tuple_fetch_row_version.unwrap();
2725
let mut ctid: pg_sys::ItemPointerData = pg_sys::ItemPointerData {
2826
..Default::default()
2927
};
3028
heap_pointer.to_item_pointer_data(&mut ctid);
31-
fetch_row_version(
32-
heap_rel.as_ptr(),
29+
30+
let scan = (*table_am).index_fetch_begin.unwrap()(heap_rel.as_ptr());
31+
let mut call_again = false;
32+
/* all_dead can be ignored, only used in optimizations we don't implement */
33+
let mut all_dead = false;
34+
let valid = (*table_am).index_fetch_tuple.unwrap()(
35+
scan,
3336
&mut ctid,
34-
addr_of_mut!(pg_sys::SnapshotAnyData),
37+
snapshot,
3538
slot.as_ptr(),
39+
&mut call_again,
40+
&mut all_dead,
3641
);
42+
(*table_am).index_fetch_end.unwrap()(scan);
43+
44+
assert!(!call_again, "MVCC snapshots should not require call_again");
3745
stats.record_heap_read();
3846

39-
Self { slot }
47+
if !valid {
48+
/* no valid tuples found in HOT-chain */
49+
return None;
50+
}
51+
52+
Some(Self { slot })
4053
}
4154

4255
pub unsafe fn get_attribute(&self, attribute_number: pg_sys::AttrNumber) -> Option<Datum> {

0 commit comments

Comments
 (0)