[SPARK-57220][SQL] Extend block-chunked segment-tree window frame to shrinking frames#56291
[SPARK-57220][SQL] Extend block-chunked segment-tree window frame to shrinking frames#56291yadavay-amzn wants to merge 5 commits into
Conversation
…shrinking frames ### What changes were proposed in this pull request? Extends `SegmentTreeWindowFunctionFrame` (introduced in SPARK-56546 for sliding aggregates) to also handle shrinking frames of the form `... ROWS/RANGE BETWEEN <lower> AND UNBOUNDED FOLLOWING`. The class is parameterized with `ubound: Option[BoundOrdering]` (`None` = shrinking, `Some(ub)` = sliding) and a `fallbackFactory` for the small-partition path so the same machinery (build, spill, eligibility, metrics) serves both shapes. The dispatcher in `WindowEvaluatorFactoryBase` gains a shrinking-frame branch that consults the existing `eligibleForSegTree` gate and, on success, builds the unified frame with `ubound = None`. ### Why are the changes needed? The legacy `UnboundedFollowingWindowFunctionFrame` recomputes the suffix aggregate from scratch for every output row -- O(n * (n - 1) / 2). Its own scaladoc acknowledges this (`WindowFunctionFrame.scala:636`): > This is a very expensive operator to use, O(n * (n - 1) / 2), because > we need to maintain a buffer and must do full recalculation after each > row. The segment tree built by SPARK-56546 already supports arbitrary `[lower, upper)` queries; routing shrinking frames into it is purely a dispatch + parameter change. Workloads with shrinking frames -- common in retention / cohort / "remaining-lifetime" analytics -- become orders of magnitude faster. ### Does this PR introduce _any_ user-facing change? No. Same opt-in conf (`spark.sql.window.segmentTree.enabled`, default false), same eligibility allowlist (DeclarativeAggregate with mergeExpressions, no FILTER, no DISTINCT), same `minPartitionRows` fallback (now to `UnboundedFollowingWindowFunctionFrame` instead of `SlidingWindowFunctionFrame`), no analyzer / SQL grammar / plan-shape changes. ### How was this patch tested? New `UnboundedFollowingSegmentTreeSuite` (26 tests, all green): basic aggregates, ROWS lower-bound variations, multi-aggregate shared frame, single-row / empty / fallback partitions, NULL / NaN / Infinity, type coverage (Int/Long/Double/Decimal/String/Date/Timestamp), allowlist fallback, RANGE frames (uniform, non-uniform, ties, NULL keys, INTERVAL Timestamp), feature-flag off. Existing `SegmentTreeWindowFunctionSuite` (41 sliding tests), `WindowSegmentTreeSuite`, `WindowSegmentTreePropertySuite`, `WindowSegmentTreeMemorySuite`, `SegmentTreeWindowMetricsSuite`, `WindowSegmentTreeAllowlistSuite`, and `DataFrameWindowFunctionsSuite` all still pass (172 tests total, 0 failures), confirming the unified rewrite preserves sliding-frame semantics. Benchmark (`UnboundedFollowingWindowBenchmark`, JDK 17, EC2 c5.4xlarge): | N | naive | segtree | speedup | |------|-------------|---------|---------| | 5K | 620 ms | 73 ms | 8.5X | | 10K | 2 471 ms | 110 ms | 22.5X | | 25K | 14 259 ms | 119 ms | 119.3X | | 50K | 57 022 ms | 181 ms | 314.2X | | 100K | (~4 min) | 269 ms | -- | | 200K | (~16 min) | 480 ms | -- | Naive curve is clean O(N^2); segtree curve is sub-linear (logarithmic per-row). ### Was this patch authored or co-authored using generative AI tooling? Yes. Authored with assistance from Claude (Anthropic).
…king frames
The shrinking-frame branch in `WindowEvaluatorFactoryBase` previously
called `estimateMaxCachedBlocks(lower, UnboundedFollowing, ...)`, which
silently returns the default `Some(8)` because no `IntegerLiteral`
upper-bound case matches. That value is numerically correct but
misleading -- a reader inspecting the call site reasonably worries
that the LRU will thrash on partitions large enough to span more than
8 blocks (>= 512K rows at the default 64K block size).
In fact the shrinking-frame access pattern needs at most 2 cached
block-levels regardless of partition size:
- Middle blocks of `[lower, n)` are answered directly from the
always-resident `blockAggregates`, never via the per-block LRU.
- The lower-edge cursor advances monotonically with the output row,
so each partial block is needed for at most `blockSize`
consecutive queries and then never revisited.
- One slot for the active block plus one for brief overlap at the
boundary covers the entire pattern.
Replace the indirect call with `Some(2)` and a comment documenting
the shrinking-frame access pattern. Numerically equivalent to the
prior behaviour for any partition size; the change is documentation
about what the shrinking-frame path actually needs.
Tests: existing `UnboundedFollowingSegmentTreeSuite` (26) and
`SegmentTreeWindowFunctionSuite` (41) -- 67/67 pass. Scalastyle clean.
Was this patch authored or co-authored using generative AI tooling?
Yes. Authored with assistance from Claude (Anthropic).
|
cc @cloud-fan @yaooqinn this is a direct follow-up to SPARK-56546, would appreciate a look when convenient. The change extends the existing |
cloud-fan
left a comment
There was a problem hiding this comment.
0 blocking, 0 non-blocking, 3 nits.
Clean, well-tested follow-up to SPARK-56546. No design or correctness issues — the three nits are all comment-accuracy fixes, none affecting behavior.
Nits (3)
WindowEvaluatorFactoryBase.scala:286: thecacheHint = Some(2)comment's reasoning is inaccurate (the last block is also LRU-fetched on every query) — value is correct — see inlineSegmentTreeWindowFunctionFrame.scala:114: "sliding-dependent" → "shape-dependent" — see inlineSegmentTreeWindowFunctionFrame.scala:224-230:writeRow/writeRangeheader comment describes only the sliding shape — see inline
Verification
Traced the shrinking segtree path against the legacy UnboundedFollowingWindowFunctionFrame it replaces: equivalent on every input dimension — ROWS lower kinds (the drop-loop's lowerBound advances to index+offset exactly as the legacy inputIndex does), RANGE value bounds / ties / NULL order keys (same RangeBoundOrdering on both paths), empty & single-row partitions, empty frames (both emit aggregate identity), NULL/NaN/Inf aggregate inputs, the boundsChanged re-query skip, and per-partition state reset. The only inputs that would differ (multi-column RANGE, UNBOUNDED PRECEDING lower) are routed away by the existing eligibleForSegTree gate and the earlier dispatcher cases, so no input produces a different result.
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
…ationale and writeRow comment Address two of cloud-fan's review nits on PR apache#56291: 1. WindowEvaluatorFactoryBase.scala: the cacheHint=Some(2) rationale was incomplete -- it claimed shrinking frames only touch the LRU for the lower-edge partial block, but WindowSegmentTree.query also fetches the partition's last block via ensureBlockLevels(bhi) on every multi-block query. Rewrote the comment to reflect both LRU slots and warn against tuning the hint down to 1 (which would thrash by evicting the last block on every query). 2. SegmentTreeWindowFunctionFrame.scala: the writeRow/writeRange header comment described only the sliding admit-then-drop path. Restructured it to cover both shapes -- sliding (admit-then-drop, equivalence guarded by SegmentTreeWindowFunctionSuite) and shrinking (drop-only, equivalence guarded by UnboundedFollowingSegmentTreeSuite). Comment-only changes; no behavior change.
|
@cloud-fan Thanks for reviewing! Glad to hear there are no blocking comments, I've addressed the 3 nits by fixing the code comments as well. Ready for another look when you get the chance. Thanks again! |
cloud-fan
left a comment
There was a problem hiding this comment.
3 addressed, 0 remaining, 1 new. (1 new = 1 late catch — my miss from the earlier round, not something this round introduced.)
All three comment fixes look good.
Nits: 1 minor item (see inline comment) — the benchmark calibration comment references MAIN_N; the constant is A_N.
Verification
Re-verified the rewritten cacheHint = Some(2) rationale against WindowSegmentTree.query: a multi-block [lower, n) query touches the LRU for exactly the lower-edge partial block and the last block (mergeBlockRange(bhi, 0, ...) -> ensureBlockLevels(bhi)), middle blocks come from blockAggregates, and routing through estimateMaxCachedBlocks would indeed yield 8 — the new comment is accurate. Code is unchanged since the prior round's equivalence trace, which still stands.
PR description suggestions
- Update: the description still says the fallback type is "sliding-dependent"; the code comment was corrected to "shape-dependent" in this round — worth aligning the description.
What changes were proposed in this pull request?
Extends
SegmentTreeWindowFunctionFrame(introduced in SPARK-56546 for sliding aggregates) to also handle shrinking frames of the form... ROWS/RANGE BETWEENlowerAND UNBOUNDED FOLLOWING. The class is parameterized withubound: Option[BoundOrdering](None= shrinking,Some(ub)= sliding) and afallbackFactoryfor the small-partition path so the same machinery (build, spill viaTaskMemoryManager, eligibility allowlist, SQLMetrics) serves both shapes.The dispatcher in
WindowEvaluatorFactoryBasegains a shrinking-frame branch that consults the existingeligibleForSegTreegate and, on success, builds the unified frame withubound = None.Why are the changes needed?
The legacy
UnboundedFollowingWindowFunctionFramerecomputes the suffix aggregate from scratch for every output row — O(n · (n - 1) / 2). Its own scaladoc acknowledges this (WindowFunctionFrame.scala:636):The segment tree built by SPARK-56546 already supports arbitrary
[lower, upper)queries; routing shrinking frames into it is purely a dispatch + parameter change.Shrinking frames are common in retention / cohort / "remaining-lifetime" analytics. For partitions of 100K+ rows the legacy O(N²) path is infeasible.
Does this PR introduce any user-facing change?
No.
spark.sql.window.segmentTree.enabled(defaultfalse).mergeExpressions, no FILTER, no DISTINCT).minPartitionRowsfallback. The fallback type is now sliding-dependent:SlidingWindowFunctionFramefor moving frames,UnboundedFollowingWindowFunctionFramefor shrinking frames.How was this patch tested?
New
UnboundedFollowingSegmentTreeSuitemirrorsSegmentTreeWindowFunctionSuite's structure with oracle-vs-naive equivalence over ROWS/RANGE frames, NULL/NaN, multi-aggregate, type coverage, and fallback paths. All existing window suites still pass with the unified rewrite.Benchmark —
UnboundedFollowingWindowBenchmarkon Linux x86_64 (Intel Xeon Platinum 8259CL @ 2.50GHz, OpenJDK 17.0.19+10-LTS), single-partitionSUM(v) OVER (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING):Naive is clean O(N²); segtree is sub-linear. Full results checked in at
sql/core/benchmarks/UnboundedFollowingWindowBenchmark-results.txt.Was this patch authored or co-authored using generative AI tooling?
Yes. Authored with assistance from Claude (Anthropic).