mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-06-19 09:16:45 +00:00
add specialized TermHistogram
This commit is contained in:
parent
05f4c02ac5
commit
2c8536ab11
3 changed files with 484 additions and 17 deletions
|
|
@ -244,7 +244,7 @@ impl Display for HistogramBounds {
|
|||
}
|
||||
|
||||
impl HistogramBounds {
|
||||
fn contains(&self, val: f64) -> bool {
|
||||
pub(crate) fn contains(&self, val: f64) -> bool {
|
||||
val >= self.min && val <= self.max
|
||||
}
|
||||
}
|
||||
|
|
@ -317,11 +317,11 @@ impl<B: BucketIdSlot> SegmentHistogramBucketEntry<B> {
|
|||
/// the histogram bounds). Buckets in `[base_pos, base_pos + len)` can be stored in a flat `Vec`
|
||||
/// indexed by `bucket_pos - base_pos`, avoiding the hash map on the hot path.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct DenseRange {
|
||||
pub(crate) struct DenseRange {
|
||||
/// `bucket_pos` mapped to index 0 of the dense `Vec`.
|
||||
base_pos: i64,
|
||||
pub(crate) base_pos: i64,
|
||||
/// Number of bucket positions in the range.
|
||||
len: usize,
|
||||
pub(crate) len: usize,
|
||||
}
|
||||
|
||||
/// Storage for the histogram buckets of a single parent bucket.
|
||||
|
|
@ -444,8 +444,6 @@ pub struct SegmentHistogramCollector<B> {
|
|||
/// Theoretical bucket range derived from the column min/max, if dense `Vec` storage is
|
||||
/// viable. `None` keeps every parent bucket in the sparse hash map.
|
||||
dense_range: Option<DenseRange>,
|
||||
|
||||
small_column_block_accessor: columnar::ColumnBlockAccessor<u32>,
|
||||
}
|
||||
|
||||
impl<B: BucketIdSlot> SegmentAggregationCollector for SegmentHistogramCollector<B> {
|
||||
|
|
@ -601,15 +599,7 @@ impl<B: BucketIdSlot> SegmentHistogramCollector<B> {
|
|||
None
|
||||
};
|
||||
let mut req_data = agg_data.per_request.histogram_req_data[node.idx_in_req_data].clone();
|
||||
req_data.req.validate()?;
|
||||
if req_data.field_type == ColumnType::DateTime && !req_data.is_date_histogram {
|
||||
req_data.req.normalize_date_time();
|
||||
}
|
||||
req_data.bounds = req_data.req.hard_bounds.unwrap_or(HistogramBounds {
|
||||
min: f64::MIN,
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
normalize_histogram_req(&mut req_data)?;
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
|
|
@ -629,11 +619,91 @@ impl<B: BucketIdSlot> SegmentHistogramCollector<B> {
|
|||
req_data,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
dense_range,
|
||||
small_column_block_accessor: columnar::ColumnBlockAccessor::default(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentHistogramCollector<()> {
|
||||
/// Builds a histogram collector whose parent `t` is a dense histogram filled from
|
||||
/// `counts[t * num_time_buckets .. (t + 1) * num_time_buckets]` (row-major). Used by the fused
|
||||
/// terms×histogram collector to turn its flat 2D counters into the regular intermediate result,
|
||||
/// so cross-segment merging is shared with the general path.
|
||||
pub(crate) fn from_dense_rows(
|
||||
req_data: HistogramAggReqData,
|
||||
base_pos: i64,
|
||||
num_time_buckets: usize,
|
||||
counts: &[u32],
|
||||
) -> Self {
|
||||
let interval = req_data.req.interval;
|
||||
let offset = req_data.offset;
|
||||
let num_parents = if num_time_buckets == 0 {
|
||||
0
|
||||
} else {
|
||||
counts.len() / num_time_buckets
|
||||
};
|
||||
let parent_buckets = (0..num_parents)
|
||||
.map(|t| {
|
||||
let row = &counts[t * num_time_buckets..(t + 1) * num_time_buckets];
|
||||
let buckets = row
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(b, &doc_count)| SegmentHistogramBucketEntry {
|
||||
key: get_bucket_key_from_pos(
|
||||
(base_pos + b as i64) as f64,
|
||||
interval,
|
||||
offset,
|
||||
),
|
||||
doc_count: doc_count as u64,
|
||||
bucket_id: (),
|
||||
})
|
||||
.collect();
|
||||
HistogramBuckets::Dense { base_pos, buckets }
|
||||
})
|
||||
.collect();
|
||||
Self {
|
||||
parent_buckets,
|
||||
sub_agg: None,
|
||||
req_data,
|
||||
bucket_id_provider: BucketIdProvider::default(),
|
||||
dense_range: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Validates and normalizes a histogram request in place: applies date ns-normalization (for a
|
||||
/// `histogram` on a date column) and resolves `bounds`/`offset` from the request.
|
||||
fn normalize_histogram_req(req_data: &mut HistogramAggReqData) -> crate::Result<()> {
|
||||
req_data.req.validate()?;
|
||||
if req_data.field_type == ColumnType::DateTime && !req_data.is_date_histogram {
|
||||
req_data.req.normalize_date_time();
|
||||
}
|
||||
req_data.bounds = req_data.req.hard_bounds.unwrap_or(HistogramBounds {
|
||||
min: f64::MIN,
|
||||
max: f64::MAX,
|
||||
});
|
||||
req_data.offset = req_data.req.offset.unwrap_or(0.0);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clones and normalizes (resolving interval/offset/bounds) the histogram request at `node`, and
|
||||
/// returns it together with its dense bucket range — or `None` if the column has no usable range.
|
||||
/// Used by the fused terms×histogram collector, which then owns the normalized request.
|
||||
pub(crate) fn prepare_histogram_dense_range(
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
) -> crate::Result<Option<(HistogramAggReqData, DenseRange)>> {
|
||||
let mut req_data = agg_data.per_request.histogram_req_data[node.idx_in_req_data].clone();
|
||||
normalize_histogram_req(&mut req_data)?;
|
||||
let dense_range = compute_dense_range(
|
||||
&req_data.accessor,
|
||||
req_data.field_type,
|
||||
req_data.req.interval,
|
||||
req_data.offset,
|
||||
req_data.bounds,
|
||||
);
|
||||
Ok(dense_range.map(|range| (req_data, range)))
|
||||
}
|
||||
|
||||
/// Builds a boxed histogram (or date histogram) segment collector, picking the bucket-id storage
|
||||
/// based on whether there are sub aggregations: `()` (no id stored) when there are none, otherwise
|
||||
/// [`BucketId`].
|
||||
|
|
@ -653,7 +723,7 @@ pub(crate) fn build_segment_histogram_collector(
|
|||
}
|
||||
|
||||
#[inline]
|
||||
fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
|
||||
pub(crate) fn get_bucket_pos_f64(val: f64, interval: f64, offset: f64) -> f64 {
|
||||
((val - offset) / interval).floor()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ use crate::aggregation::{format_date, BucketId, Key};
|
|||
use crate::error::DataCorruption;
|
||||
use crate::TantivyError;
|
||||
|
||||
mod term_histogram;
|
||||
|
||||
/// Contains all information required by the SegmentTermCollector to perform the
|
||||
/// terms aggregation on a segment.
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -377,6 +379,18 @@ pub(crate) fn build_segment_term_collector(
|
|||
let max_term_id: u64 =
|
||||
col_max_value.max(terms_req_data.missing_value_for_accessor.unwrap_or(0u64));
|
||||
|
||||
// Fused fast path: low-cardinality terms × a single `histogram`/`date_histogram` leaf over full
|
||||
// columns with a small enough bucket grid. Anything else falls through to the general path.
|
||||
if let Some(collector) = term_histogram::maybe_build_collector(
|
||||
req_data,
|
||||
node,
|
||||
&terms_req_data,
|
||||
max_term_id,
|
||||
is_top_level,
|
||||
)? {
|
||||
return Ok(collector);
|
||||
}
|
||||
|
||||
let sub_agg_collector = if has_sub_aggregations {
|
||||
Some(build_segment_agg_collectors(req_data, &node.children)?)
|
||||
} else {
|
||||
383
src/aggregation/bucket/term_agg/term_histogram.rs
Normal file
383
src/aggregation/bucket/term_agg/term_histogram.rs
Normal file
|
|
@ -0,0 +1,383 @@
|
|||
//! Fused collector for the very common shape `terms` (low cardinality) × a single
|
||||
//! `histogram`/`date_histogram` sub-aggregation with nothing nested below it.
|
||||
//!
|
||||
//! See [`SegmentTermHistogramCollector`] for the approach and [`maybe_build_collector`] for the
|
||||
//! conditions under which it is used.
|
||||
|
||||
use columnar::ColumnBlockAccessor;
|
||||
|
||||
use super::{Bucket, SegmentTermCollector, TermsAggReqData, VecTermBuckets};
|
||||
use crate::aggregation::agg_data::{AggKind, AggRefNode, AggregationsSegmentCtx};
|
||||
use crate::aggregation::bucket::{
|
||||
get_bucket_pos_f64, prepare_histogram_dense_range, HistogramAggReqData,
|
||||
SegmentHistogramCollector,
|
||||
};
|
||||
use crate::aggregation::buffered_sub_aggs::LowCardSubAggBuffer;
|
||||
use crate::aggregation::intermediate_agg_result::{
|
||||
IntermediateAggregationResult, IntermediateAggregationResults,
|
||||
};
|
||||
use crate::aggregation::segment_agg_result::{BucketIdProvider, SegmentAggregationCollector};
|
||||
use crate::aggregation::{f64_from_fastfield_u64, BucketId};
|
||||
|
||||
/// Maximum number of cells (`num_terms × num_time_buckets`) in the fused flat 2D grid. Above this
|
||||
/// the grid would be too large/cache-unfriendly, so we fall back to the general buffered path.
|
||||
/// `1 << 14` cells = 128 KB of `u64` counters, comfortably L2-resident.
|
||||
const MAX_FUSED_GRID_BUCKETS: usize = 1 << 14;
|
||||
|
||||
/// Fused collector for `terms` (low cardinality) × a single `histogram`/`date_histogram` leaf with
|
||||
/// nothing nested below it, when the resulting `num_terms × num_time_buckets` grid is small (see
|
||||
/// [`MAX_FUSED_GRID_BUCKETS`]).
|
||||
///
|
||||
/// It keeps a flat, fully dense 2D counter grid (`counts[term * num_time_buckets + bucket]`) and a
|
||||
/// per-term total. A single pass reads both the term and histogram columns in document order and
|
||||
/// bumps the counters directly — no doc-id buffering, no per-term scattered re-fetch, no dynamic
|
||||
/// dispatch on flush, no per-bucket key/id storage during collection (keys are derived from the
|
||||
/// index at the end).
|
||||
///
|
||||
/// At result time the flat grid is expanded back into the regular term map + histogram storage and
|
||||
/// handed to the shared intermediate-result builders, so cross-segment merging is identical to the
|
||||
/// general path.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SegmentTermHistogramCollector {
|
||||
/// `[num_terms]` total doc count per term bucket (independent of the histogram bounds).
|
||||
/// `u32` is enough: a per-segment count can't exceed the segment's doc count (`DocId` is
|
||||
/// `u32`); the fused path is only taken when `num_docs < u32::MAX` (see
|
||||
/// `maybe_build_collector`).
|
||||
term_counts: Vec<u32>,
|
||||
/// Flat row-major `[num_terms * num_time_buckets]` histogram counters (`u32`, see
|
||||
/// `term_counts`).
|
||||
counts: Vec<u32>,
|
||||
/// Histogram buckets per term (the dense time-range length).
|
||||
num_time_buckets: usize,
|
||||
/// `bucket_pos` mapped to time-bucket index 0.
|
||||
base_pos: i64,
|
||||
terms_req_data: TermsAggReqData,
|
||||
/// The (cloned, normalized) histogram request: its column + interval/offset/bounds.
|
||||
hist_req_data: HistogramAggReqData,
|
||||
/// Private block accessors for both columns. We read them together, so each needs its own
|
||||
/// (the shared `agg_data` scratch accessor only holds one block at a time). Owning them keeps
|
||||
/// `collect` independent of `agg_data`.
|
||||
term_block: ColumnBlockAccessor<u64>,
|
||||
hist_block: ColumnBlockAccessor<u64>,
|
||||
}
|
||||
|
||||
impl SegmentAggregationCollector for SegmentTermHistogramCollector {
|
||||
fn add_intermediate_aggregation_result(
|
||||
&mut self,
|
||||
agg_data: &AggregationsSegmentCtx,
|
||||
results: &mut IntermediateAggregationResults,
|
||||
parent_bucket_id: BucketId,
|
||||
) -> crate::Result<()> {
|
||||
debug_assert_eq!(
|
||||
parent_bucket_id, 0,
|
||||
"fused term-histogram collector is top-level only"
|
||||
);
|
||||
// Expand the flat grid back into the regular structures and reuse the shared builders, so
|
||||
// ordering/cut-off/dict handling and cross-segment merging match the general path exactly.
|
||||
let mut bucket_id_provider = BucketIdProvider::default();
|
||||
let term_buckets = VecTermBuckets {
|
||||
buckets: self
|
||||
.term_counts
|
||||
.iter()
|
||||
.map(|&count| Bucket {
|
||||
count,
|
||||
bucket_id: bucket_id_provider.next_bucket_id(),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
let mut histogram = SegmentHistogramCollector::<()>::from_dense_rows(
|
||||
self.hist_req_data.clone(),
|
||||
self.base_pos,
|
||||
self.num_time_buckets,
|
||||
&self.counts,
|
||||
);
|
||||
let name = self.terms_req_data.name.clone();
|
||||
let bucket = SegmentTermCollector::<VecTermBuckets, LowCardSubAggBuffer>::into_intermediate_bucket_result(
|
||||
&self.terms_req_data,
|
||||
Some(&mut histogram as &mut dyn SegmentAggregationCollector),
|
||||
term_buckets,
|
||||
agg_data,
|
||||
)?;
|
||||
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn collect(
|
||||
&mut self,
|
||||
parent_bucket_id: BucketId,
|
||||
docs: &[crate::DocId],
|
||||
_agg_data: &mut AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
debug_assert_eq!(
|
||||
parent_bucket_id, 0,
|
||||
"fused term-histogram collector is top-level only"
|
||||
);
|
||||
|
||||
// Fetch both columns into our own accessors (we read them together, so they can't share the
|
||||
// single `agg_data` scratch accessor). The collector owns all its inputs, so `collect`
|
||||
// doesn't touch `agg_data`.
|
||||
self.term_block
|
||||
.fetch_block(docs, &self.terms_req_data.accessor);
|
||||
self.hist_block
|
||||
.fetch_block(docs, &self.hist_req_data.accessor);
|
||||
|
||||
let field_type = self.hist_req_data.field_type;
|
||||
let bounds = self.hist_req_data.bounds;
|
||||
let interval = self.hist_req_data.req.interval;
|
||||
let offset = self.hist_req_data.offset;
|
||||
let base_pos = self.base_pos;
|
||||
let num_time_buckets = self.num_time_buckets;
|
||||
let term_counts = &mut self.term_counts;
|
||||
let counts = &mut self.counts;
|
||||
|
||||
// Single fused pass: both columns are full (checked at construction), so their values align
|
||||
// with `docs` positionally and we zip them. `term_id` is a dense `Vec` index here, and the
|
||||
// histogram bucket is guaranteed inside `[0, num_time_buckets)` because the dense range is
|
||||
// derived from the column min/max that bounds every value.
|
||||
for (term_id, hist_raw) in self.term_block.iter_vals().zip(self.hist_block.iter_vals()) {
|
||||
let term_id = term_id as usize;
|
||||
term_counts[term_id] += 1;
|
||||
let val = f64_from_fastfield_u64(hist_raw, field_type);
|
||||
if bounds.contains(val) {
|
||||
let bucket = (get_bucket_pos_f64(val, interval, offset) as i64 - base_pos) as usize;
|
||||
debug_assert!(
|
||||
bucket < num_time_buckets,
|
||||
"histogram bucket outside dense range"
|
||||
);
|
||||
counts[term_id * num_time_buckets + bucket] += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn flush(&mut self, _agg_data: &mut AggregationsSegmentCtx) -> crate::Result<()> {
|
||||
// Nothing is buffered: `collect` writes the flat grid directly.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_max_bucket(
|
||||
&mut self,
|
||||
_max_bucket: BucketId,
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> crate::Result<()> {
|
||||
// Top-level: the flat grid is allocated up front.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_metric_value(
|
||||
&self,
|
||||
_bucket_id: BucketId,
|
||||
_sub_agg_name: &str,
|
||||
_sub_agg_property: &str,
|
||||
_agg_data: &AggregationsSegmentCtx,
|
||||
) -> Option<f64> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Builds the fused terms×histogram collector for a single top-level parent, when the shape is
|
||||
/// eligible. Returns `Ok(None)` to fall back to the general buffered terms path.
|
||||
///
|
||||
/// Eligibility: top-level, low-cardinality terms over a full column with no missing/include-exclude
|
||||
/// handling; a single `histogram`/`date_histogram` leaf (no nesting below it) over a full column;
|
||||
/// and a `num_terms × num_time_buckets` grid no larger than [`MAX_FUSED_GRID_BUCKETS`].
|
||||
pub(super) fn maybe_build_collector(
|
||||
agg_data: &mut AggregationsSegmentCtx,
|
||||
node: &AggRefNode,
|
||||
terms_req_data: &TermsAggReqData,
|
||||
max_term_id: u64,
|
||||
is_top_level: bool,
|
||||
) -> crate::Result<Option<Box<dyn SegmentAggregationCollector>>> {
|
||||
// Both columns must be full (one value per doc) so their values align positionally with `docs`
|
||||
// and we can zip them. Requiring full columns also makes the terms agg's `missing` config a
|
||||
// no-op (`fetch_block_with_missing` early-returns on full columns), so we needn't check for it.
|
||||
//
|
||||
// We don't cap the term cardinality here: the flat grid is bounded by the total cell count
|
||||
// (`num_terms * num_time_buckets <= MAX_FUSED_GRID_BUCKETS`) checked below, which subsumes it.
|
||||
let fuseable = is_top_level
|
||||
&& terms_req_data.allowed_term_ids.is_none()
|
||||
&& terms_req_data.accessor.get_cardinality().is_full()
|
||||
// The flat counters are `u32`; a per-segment count can't exceed the doc count, so this
|
||||
// guarantees no overflow (essentially always true, as `DocId` is `u32`).
|
||||
&& terms_req_data.accessor.num_docs() < u32::MAX
|
||||
&& node.children.len() == 1
|
||||
&& matches!(
|
||||
node.children[0].kind,
|
||||
AggKind::Histogram | AggKind::DateHistogram
|
||||
)
|
||||
&& node.children[0].children.is_empty()
|
||||
&& agg_data.per_request.histogram_req_data[node.children[0].idx_in_req_data]
|
||||
.accessor
|
||||
.get_cardinality()
|
||||
.is_full();
|
||||
if !fuseable {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Clone + normalize the histogram request and get its dense bucket range; only take the fused
|
||||
// path when the flat `num_terms × num_time_buckets` grid is small enough.
|
||||
let Some((hist_req_data, range)) = prepare_histogram_dense_range(agg_data, &node.children[0])?
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let num_terms = (max_term_id + 1) as usize;
|
||||
if num_terms.saturating_mul(range.len) > MAX_FUSED_GRID_BUCKETS {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let counts = vec![0u32; num_terms * range.len];
|
||||
agg_data
|
||||
.context
|
||||
.limits
|
||||
.add_memory_consumed((counts.len() * std::mem::size_of::<u32>()) as u64)?;
|
||||
Ok(Some(Box::new(SegmentTermHistogramCollector {
|
||||
term_counts: vec![0u32; num_terms],
|
||||
counts,
|
||||
num_time_buckets: range.len,
|
||||
base_pos: range.base_pos,
|
||||
terms_req_data: terms_req_data.clone(),
|
||||
hist_req_data,
|
||||
term_block: ColumnBlockAccessor::default(),
|
||||
hist_block: ColumnBlockAccessor::default(),
|
||||
})))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::aggregation::agg_req::Aggregations;
|
||||
use crate::aggregation::tests::{exec_request, get_test_index_from_values_and_terms};
|
||||
|
||||
/// Hand-computed correctness check for the fused terms×histogram fast path
|
||||
/// ([`super::SegmentTermHistogramCollector`]): low-cardinality terms × a histogram leaf over
|
||||
/// full columns, exercised single- and multi-segment.
|
||||
#[test]
|
||||
fn fused_term_histogram_test() -> crate::Result<()> {
|
||||
fused_term_histogram_with_opt(false)?;
|
||||
fused_term_histogram_with_opt(true)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn fused_term_histogram_with_opt(merge_segments: bool) -> crate::Result<()> {
|
||||
// 300 docs: term = {a, b, c} by i % 3, histogram value = i % 20 (interval 1 => buckets
|
||||
// 0..19). gcd(3, 20) = 1, so every (term, bucket) pair occurs exactly 300 / 60 = 5 times.
|
||||
let docs: Vec<(f64, String)> = (0..300u64)
|
||||
.map(|i| {
|
||||
(
|
||||
(i % 20) as f64,
|
||||
["a", "b", "c"][(i % 3) as usize].to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
// Two segments, to also exercise cross-segment merging of the fused per-term histograms.
|
||||
let segments = vec![docs[..150].to_vec(), docs[150..].to_vec()];
|
||||
let index = get_test_index_from_values_and_terms(merge_segments, &segments)?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
for (term_idx, term) in ["a", "b", "c"].iter().enumerate() {
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["key"], *term);
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["doc_count"], 100);
|
||||
let histo = &res["by_term"]["buckets"][term_idx]["histo"]["buckets"];
|
||||
for b in 0..20usize {
|
||||
assert_eq!(histo[b]["key"], b as f64, "term {term} bucket {b}");
|
||||
assert_eq!(histo[b]["doc_count"], 5, "term {term} bucket {b}");
|
||||
}
|
||||
assert_eq!(histo[20], serde_json::Value::Null);
|
||||
}
|
||||
assert_eq!(res["by_term"]["buckets"][3], serde_json::Value::Null);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A `missing` config on a *full* term column still takes the fused path (the string sentinel
|
||||
/// is just `col_max + 1`, so the column stays low-cardinality). Since no doc is missing, the
|
||||
/// real term buckets must be exactly as without `missing`.
|
||||
#[test]
|
||||
fn fused_term_histogram_with_missing_on_full_column() -> crate::Result<()> {
|
||||
let docs: Vec<(f64, String)> = (0..300u64)
|
||||
.map(|i| {
|
||||
(
|
||||
(i % 20) as f64,
|
||||
["a", "b", "c"][(i % 3) as usize].to_string(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
let index = get_test_index_from_values_and_terms(true, &[docs])?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "missing": "MISSING", "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
// Column is full, so "MISSING" never applies: a, b, c are unchanged (100 docs, 5 per
|
||||
// bucket).
|
||||
for (term_idx, term) in ["a", "b", "c"].iter().enumerate() {
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["key"], *term);
|
||||
assert_eq!(res["by_term"]["buckets"][term_idx]["doc_count"], 100);
|
||||
let histo = &res["by_term"]["buckets"][term_idx]["histo"]["buckets"];
|
||||
for b in 0..20usize {
|
||||
assert_eq!(histo[b]["doc_count"], 5, "term {term} bucket {b}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Term cardinality above the general path's `MAX_NUM_TERMS_FOR_VEC` (100) still fuses: the
|
||||
/// flat grid is bounded by the total cell count (`num_terms * num_time_buckets`), not the
|
||||
/// term count.
|
||||
#[test]
|
||||
fn fused_term_histogram_many_terms() -> crate::Result<()> {
|
||||
let num_terms = 150usize;
|
||||
let docs_per_term = 2usize;
|
||||
// All docs share histogram value 0 (a single bucket), so the grid is 150 x 1 = 150 cells.
|
||||
let docs: Vec<(f64, String)> = (0..num_terms * docs_per_term)
|
||||
.map(|i| (0.0, format!("t{:03}", i % num_terms)))
|
||||
.collect();
|
||||
let index = get_test_index_from_values_and_terms(true, &[docs])?;
|
||||
|
||||
let agg_req: Aggregations = serde_json::from_value(serde_json::json!({
|
||||
"by_term": {
|
||||
"terms": { "field": "string_id", "size": 1000, "order": { "_key": "asc" } },
|
||||
"aggs": {
|
||||
"histo": { "histogram": { "field": "score_f64", "interval": 1.0 } }
|
||||
}
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let res = exec_request(agg_req, &index)?;
|
||||
|
||||
let buckets = res["by_term"]["buckets"].as_array().unwrap();
|
||||
assert_eq!(buckets.len(), num_terms);
|
||||
for (i, bucket) in buckets.iter().enumerate() {
|
||||
assert_eq!(bucket["key"], format!("t{i:03}"));
|
||||
assert_eq!(bucket["doc_count"], docs_per_term as u64);
|
||||
assert_eq!(bucket["histo"]["buckets"][0]["key"], 0.0);
|
||||
assert_eq!(
|
||||
bucket["histo"]["buckets"][0]["doc_count"],
|
||||
docs_per_term as u64
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue