mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-19 09:16:45 +00:00
feat: PGM index for multivalued columns
This commit is contained in:
parent
25d44fcec8
commit
ba8699c69c
3 changed files with 215 additions and 19 deletions
|
|
@ -11,6 +11,7 @@ categories = ["database-implementations", "data-structures", "compression"]
|
|||
[dependencies]
|
||||
itertools = "0.14.0"
|
||||
fastdivide = "0.4.0"
|
||||
pgm-extra = { version = "1.2.0", default-features = false }
|
||||
|
||||
stacker = { version= "0.6", path = "../stacker", package="tantivy-stacker"}
|
||||
sstable = { version= "0.6", path = "../sstable", package = "tantivy-sstable" }
|
||||
|
|
@ -57,5 +58,9 @@ harness = false
|
|||
name = "bench_optional_index"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "bench_multivalued_pgm"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
zstd-compression = ["sstable/zstd-compression"]
|
||||
|
|
|
|||
84
columnar/benches/bench_multivalued_pgm.rs
Normal file
84
columnar/benches/bench_multivalued_pgm.rs
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
use binggan::{InputGroup, black_box};
|
||||
use tantivy_columnar::{ColumnarReader, ColumnarWriter, DynamicColumn};
|
||||
|
||||
fn bench_sparse_query() {
|
||||
let size = 100_000u32;
|
||||
|
||||
let mut columnar_writer = ColumnarWriter::default();
|
||||
for doc in 0u32..size {
|
||||
columnar_writer.record_numerical(doc, "vals", (doc * 10) as u64);
|
||||
columnar_writer.record_numerical(doc, "vals", (doc * 10 + 1) as u64);
|
||||
}
|
||||
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
columnar_writer.serialize(size, &mut buffer).unwrap();
|
||||
|
||||
let reader = ColumnarReader::open(buffer).unwrap();
|
||||
let column = reader.read_columns("vals").unwrap()[0]
|
||||
.open()
|
||||
.unwrap()
|
||||
.coerce_numerical(tantivy_columnar::NumericalType::U64)
|
||||
.unwrap();
|
||||
|
||||
let DynamicColumn::U64(column) = column else {
|
||||
panic!();
|
||||
};
|
||||
|
||||
let mut group: InputGroup<()> =
|
||||
InputGroup::new_with_inputs(vec![(format!("sparse_query_{}docs", size), ())]);
|
||||
|
||||
let mid = (size / 2) as u64 * 10;
|
||||
let sparse_range = mid..=(mid + 100);
|
||||
let num_docs = size;
|
||||
|
||||
group.register("sparse_range_1pct", move |_: &()| {
|
||||
let mut docids = Vec::new();
|
||||
column.get_docids_for_value_range(sparse_range.clone(), 0..num_docs, &mut docids);
|
||||
black_box(docids.len());
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
|
||||
fn bench_full_scan() {
|
||||
let size = 100_000u32;
|
||||
|
||||
let mut columnar_writer = ColumnarWriter::default();
|
||||
for doc in 0u32..size {
|
||||
columnar_writer.record_numerical(doc, "vals", (doc * 10) as u64);
|
||||
columnar_writer.record_numerical(doc, "vals", (doc * 10 + 1) as u64);
|
||||
}
|
||||
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
columnar_writer.serialize(size, &mut buffer).unwrap();
|
||||
|
||||
let reader = ColumnarReader::open(buffer).unwrap();
|
||||
let column = reader.read_columns("vals").unwrap()[0]
|
||||
.open()
|
||||
.unwrap()
|
||||
.coerce_numerical(tantivy_columnar::NumericalType::U64)
|
||||
.unwrap();
|
||||
|
||||
let DynamicColumn::U64(column) = column else {
|
||||
panic!();
|
||||
};
|
||||
|
||||
let mut group: InputGroup<()> =
|
||||
InputGroup::new_with_inputs(vec![(format!("full_scan_{}docs", size), ())]);
|
||||
|
||||
let full_range = 0..=u64::MAX;
|
||||
let num_docs = size;
|
||||
|
||||
group.register("full_range_all", move |_: &()| {
|
||||
let mut docids = Vec::new();
|
||||
column.get_docids_for_value_range(full_range.clone(), 0..num_docs, &mut docids);
|
||||
black_box(docids.len());
|
||||
});
|
||||
|
||||
group.run();
|
||||
}
|
||||
|
||||
fn main() {
|
||||
bench_sparse_query();
|
||||
bench_full_scan();
|
||||
}
|
||||
|
|
@ -1,9 +1,10 @@
|
|||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use common::{CountingWriter, OwnedBytes};
|
||||
use pgm_extra::OneLevelIndex;
|
||||
|
||||
use super::optional_index::{open_optional_index, serialize_optional_index};
|
||||
use super::{OptionalIndex, SerializableOptionalIndex, Set};
|
||||
|
|
@ -13,6 +14,8 @@ use crate::column_values::{
|
|||
use crate::iterable::Iterable;
|
||||
use crate::{DocId, RowId, Version};
|
||||
|
||||
const MIN_SIZE_FOR_PGM: u32 = 1024;
|
||||
|
||||
pub struct SerializableMultivalueIndex<'a> {
|
||||
pub doc_ids_with_values: SerializableOptionalIndex<'a>,
|
||||
pub start_offsets: Box<dyn Iterable<u32> + 'a>,
|
||||
|
|
@ -53,6 +56,7 @@ pub fn open_multivalued_index(
|
|||
load_u64_based_column_values(bytes)?;
|
||||
Ok(MultiValueIndex::MultiValueIndexV1(MultiValueIndexV1 {
|
||||
start_index_column,
|
||||
pgm_index: OnceLock::new(),
|
||||
}))
|
||||
}
|
||||
Version::V2 => {
|
||||
|
|
@ -67,6 +71,7 @@ pub fn open_multivalued_index(
|
|||
Ok(MultiValueIndex::MultiValueIndexV2(MultiValueIndexV2 {
|
||||
optional_index,
|
||||
start_index_column,
|
||||
pgm_index: OnceLock::new(),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
|
@ -80,11 +85,25 @@ pub enum MultiValueIndex {
|
|||
MultiValueIndexV2(MultiValueIndexV2),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Index to resolve value range for given doc_id.
|
||||
/// Starts at 0.
|
||||
pub struct MultiValueIndexV1 {
|
||||
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
||||
pgm_index: OnceLock<PgmIndexData>,
|
||||
}
|
||||
|
||||
impl Clone for MultiValueIndexV1 {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
start_index_column: self.start_index_column.clone(),
|
||||
pgm_index: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PgmIndexData {
|
||||
index: OneLevelIndex<u32>,
|
||||
values: Vec<u32>,
|
||||
}
|
||||
|
||||
impl MultiValueIndexV1 {
|
||||
|
|
@ -106,21 +125,58 @@ impl MultiValueIndexV1 {
|
|||
self.start_index_column.num_vals() - 1
|
||||
}
|
||||
|
||||
fn get_or_build_pgm(&self) -> Option<&PgmIndexData> {
|
||||
let num_vals = self.start_index_column.num_vals();
|
||||
if num_vals < MIN_SIZE_FOR_PGM {
|
||||
return None;
|
||||
}
|
||||
Some(self.pgm_index.get_or_init(|| {
|
||||
let mut values = Vec::with_capacity(num_vals as usize);
|
||||
for i in 0..num_vals {
|
||||
values.push(self.start_index_column.get_val(i));
|
||||
}
|
||||
let index = OneLevelIndex::new(&values, 64).expect("PGM build failed");
|
||||
PgmIndexData { index, values }
|
||||
}))
|
||||
}
|
||||
|
||||
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
|
||||
/// docids. Positions are converted inplace to docids.
|
||||
///
|
||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
|
||||
/// index.
|
||||
///
|
||||
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
|
||||
/// increasing positions.
|
||||
///
|
||||
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
|
||||
/// match a docid to its value position.
|
||||
/// Uses PGM-index for O(1) predecessor lookups on large columns.
|
||||
pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec<u32>) {
|
||||
if ranks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(pgm_data) = self.get_or_build_pgm() {
|
||||
self.select_batch_with_pgm(pgm_data, ranks);
|
||||
} else {
|
||||
self.select_batch_linear(docid_start, ranks);
|
||||
}
|
||||
}
|
||||
|
||||
fn select_batch_with_pgm(&self, pgm_data: &PgmIndexData, ranks: &mut Vec<u32>) {
|
||||
let mut write_pos = 0;
|
||||
let mut last_doc: Option<u32> = None;
|
||||
|
||||
for i in 0..ranks.len() {
|
||||
let pos = ranks[i];
|
||||
let ub = pgm_data.index.upper_bound(&pgm_data.values, &pos);
|
||||
if ub == 0 {
|
||||
continue;
|
||||
}
|
||||
let doc = (ub - 1) as u32;
|
||||
if last_doc != Some(doc) {
|
||||
ranks[write_pos] = doc;
|
||||
write_pos += 1;
|
||||
last_doc = Some(doc);
|
||||
}
|
||||
}
|
||||
ranks.truncate(write_pos);
|
||||
}
|
||||
|
||||
fn select_batch_linear(&self, docid_start: DocId, ranks: &mut Vec<u32>) {
|
||||
let mut cur_doc = docid_start;
|
||||
let mut last_doc = None;
|
||||
|
||||
|
|
@ -144,12 +200,22 @@ impl MultiValueIndexV1 {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Index to resolve value range for given doc_id.
|
||||
/// Starts at 0.
|
||||
pub struct MultiValueIndexV2 {
|
||||
pub optional_index: OptionalIndex,
|
||||
pub start_index_column: Arc<dyn crate::ColumnValues<RowId>>,
|
||||
pgm_index: OnceLock<PgmIndexData>,
|
||||
}
|
||||
|
||||
impl Clone for MultiValueIndexV2 {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
optional_index: self.optional_index.clone(),
|
||||
start_index_column: self.start_index_column.clone(),
|
||||
pgm_index: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for MultiValueIndex {
|
||||
|
|
@ -282,21 +348,62 @@ impl MultiValueIndexV2 {
|
|||
self.optional_index.num_docs()
|
||||
}
|
||||
|
||||
fn get_or_build_pgm(&self) -> Option<&PgmIndexData> {
|
||||
let num_vals = self.start_index_column.num_vals();
|
||||
if num_vals < MIN_SIZE_FOR_PGM {
|
||||
return None;
|
||||
}
|
||||
Some(self.pgm_index.get_or_init(|| {
|
||||
let mut values = Vec::with_capacity(num_vals as usize);
|
||||
for i in 0..num_vals {
|
||||
values.push(self.start_index_column.get_val(i));
|
||||
}
|
||||
let index = OneLevelIndex::new(&values, 64).expect("PGM build failed");
|
||||
PgmIndexData { index, values }
|
||||
}))
|
||||
}
|
||||
|
||||
/// Converts a list of ranks (row ids of values) in a 1:n index to the corresponding list of
|
||||
/// docids. Positions are converted inplace to docids.
|
||||
///
|
||||
/// Since there is no index for value pos -> docid, but docid -> value pos range, we scan the
|
||||
/// index.
|
||||
///
|
||||
/// Correctness: positions needs to be sorted. idx_reader needs to contain monotonically
|
||||
/// increasing positions.
|
||||
///
|
||||
/// TODO: Instead of a linear scan we can employ a exponential search into binary search to
|
||||
/// match a docid to its value position.
|
||||
/// Uses PGM-index for O(1) predecessor lookups on large columns.
|
||||
pub(crate) fn select_batch_in_place(&self, docid_start: DocId, ranks: &mut Vec<u32>) {
|
||||
if ranks.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(pgm_data) = self.get_or_build_pgm() {
|
||||
self.select_batch_with_pgm(pgm_data, ranks);
|
||||
} else {
|
||||
self.select_batch_linear(docid_start, ranks);
|
||||
}
|
||||
}
|
||||
|
||||
fn select_batch_with_pgm(&self, pgm_data: &PgmIndexData, ranks: &mut Vec<u32>) {
|
||||
let mut write_pos = 0;
|
||||
let mut last_rank: Option<u32> = None;
|
||||
|
||||
for i in 0..ranks.len() {
|
||||
let pos = ranks[i];
|
||||
let ub = pgm_data.index.upper_bound(&pgm_data.values, &pos);
|
||||
if ub == 0 {
|
||||
continue;
|
||||
}
|
||||
let rank = (ub - 1) as u32;
|
||||
if last_rank != Some(rank) {
|
||||
ranks[write_pos] = rank;
|
||||
write_pos += 1;
|
||||
last_rank = Some(rank);
|
||||
}
|
||||
}
|
||||
ranks.truncate(write_pos);
|
||||
|
||||
for rank in ranks.iter_mut() {
|
||||
*rank = self.optional_index.select(*rank);
|
||||
}
|
||||
}
|
||||
|
||||
fn select_batch_linear(&self, docid_start: DocId, ranks: &mut Vec<u32>) {
|
||||
let mut cur_pos_in_idx = self.optional_index.rank(docid_start);
|
||||
let mut last_doc = None;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue