aggregation/terms: charge fused term_counts to the memory limit

term_counts (one u32 per term) was allocated but never charged to
AggregationLimitsGuard, so the configured memory limit could be exceeded silently.
Charge it together with the counts grid, skip allocating it when there are no hard
bounds, and add a regression test.
This commit is contained in:
Pascal Seitz 2026-06-16 14:54:17 +02:00 committed by PSeitz
commit c096b2ad89

View file

@ -42,7 +42,8 @@ const MAX_FUSED_GRID_BUCKETS: usize = 16384;
#[derive(Debug)]
pub(crate) struct SegmentTermHistogramCollector {
/// Per-term count of docs *outside* `hard_bounds` (still in `doc_count`, but in no bucket).
/// Per-term total = this + the term's `counts` row-sum; all-zero when there are no bounds.
/// Per-term total = this + the term's `counts` row-sum; left empty when there are no hard
/// bounds (every doc is in-bounds, so there's no remainder to track).
term_counts: Vec<u32>,
/// Flattened `[num_terms * num_time_buckets]` histogram counters (`u32`, see
/// `term_counts`).
@ -84,17 +85,16 @@ impl SegmentAggregationCollector for SegmentTermHistogramCollector {
// Expand the flat grid back into the regular structures and reuse the shared builders, so
// ordering/cut-off/dict handling and cross-segment merging match the general path exactly.
let mut bucket_id_provider = BucketIdProvider::default();
// Per-term total = histogram row-sum (in-bounds) + `term_counts` (out-of-bounds remainder).
// Per-term total = histogram row-sum (in-bounds) + `term_counts` (out-of-bounds remainder,
// empty when there are no hard bounds).
let term_buckets = VecTermBuckets {
buckets: self
.term_counts
.iter()
.counts
.chunks_exact(self.num_time_buckets)
.enumerate()
.map(|(term_id, &out_of_bounds)| {
let in_bounds: u32 = self.counts
[term_id * self.num_time_buckets..(term_id + 1) * self.num_time_buckets]
.iter()
.sum();
.map(|(term_id, row)| {
let in_bounds: u32 = row.iter().sum();
let out_of_bounds = self.term_counts.get(term_id).copied().unwrap_or(0);
Bucket {
count: in_bounds + out_of_bounds,
bucket_id: bucket_id_provider.next_bucket_id(),
@ -258,17 +258,23 @@ pub(super) fn maybe_build_collector(
return Ok(None);
}
let counts = vec![0u32; num_terms * range.len];
agg_data
.context
.limits
.add_memory_consumed((counts.len() * std::mem::size_of::<u32>()) as u64)?;
// No hard bounds means every doc is in-bounds, letting `collect` short-circuit the bounds
// check.
// check — and leaving `term_counts` (the out-of-bounds remainder) unused, so we skip allocating
// it.
let all_docs_in_bounds =
hist_req_data.bounds.min == f64::MIN && hist_req_data.bounds.max == f64::MAX;
let counts = vec![0u32; num_terms * range.len];
let term_counts = if all_docs_in_bounds {
Vec::new()
} else {
vec![0u32; num_terms]
};
// Charge both grids to the aggregation memory limit.
agg_data.context.limits.add_memory_consumed(
((counts.len() + term_counts.len()) * std::mem::size_of::<u32>()) as u64,
)?;
Ok(Some(Box::new(SegmentTermHistogramCollector {
term_counts: vec![0u32; num_terms],
term_counts,
counts,
num_time_buckets: range.len,
base_pos: range.base_pos,
@ -284,7 +290,11 @@ pub(super) fn maybe_build_collector(
#[cfg(test)]
mod tests {
use crate::aggregation::agg_req::Aggregations;
use crate::aggregation::tests::{exec_request, get_test_index_from_values_and_terms};
use crate::aggregation::tests::{
exec_request, exec_request_with_query_and_memory_limit,
get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationLimitsGuard;
/// Hand-computed correctness check for the fused terms×histogram fast path
/// ([`super::SegmentTermHistogramCollector`]): low-cardinality terms × a histogram leaf over
@ -526,4 +536,50 @@ mod tests {
Ok(())
}
/// Regression test for memory accounting of the per-term `term_counts` vector.
///
/// When hard bounds are set, the fused path allocates `term_counts` (one `u32` per term) in
/// addition to the `counts` grid, and both must be charged to the memory limit. With a single
/// time bucket the two vectors have the same length, so a limit sized between "grid only" and
/// "grid + `term_counts`" is expected to be rejected.
#[test]
fn fused_term_histogram_hard_bounds_charges_term_counts() -> crate::Result<()> {
    // One document per distinct term, 16_000 terms in total. Scores alternate between 5.0
    // (inside the single-bucket bounds [5, 5]) and 10.0 (outside), so the hard bounds actually
    // exclude documents. 16_000 terms * 1 bucket * 4 bytes = 64_000 bytes for each vector.
    let mut rows: Vec<(f64, String)> = Vec::with_capacity(16_000);
    for doc_idx in 0..16_000u64 {
        let score = if doc_idx % 2 == 0 { 5.0 } else { 10.0 };
        rows.push((score, format!("t{doc_idx:05}")));
    }
    let index = get_test_index_from_values_and_terms(true, &[rows])?;

    // Terms aggregation with a histogram leaf restricted to the single bucket [5, 5].
    let histogram = serde_json::json!({
        "field": "score_f64",
        "interval": 1.0,
        "hard_bounds": { "min": 5.0, "max": 5.0 }
    });
    let request = serde_json::json!({
        "by_term": {
            "terms": { "field": "string_id" },
            "aggs": {
                "histo": { "histogram": histogram }
            }
        }
    });
    let agg_req: Aggregations = serde_json::from_value(request).unwrap();

    // 96_000 bytes sits between the grid alone (64_000) and grid + `term_counts` (128_000).
    let limits = AggregationLimitsGuard::new(Some(96_000), None);
    let err = exec_request_with_query_and_memory_limit(agg_req, &index, None, limits).unwrap_err();

    let message = err.to_string();
    assert!(
        message.contains("memory limit was exceeded"),
        "expected a memory-limit error, got: {err}"
    );
    Ok(())
}
}