top of page

Part 4: Deep Dive - Spark Window Functions

  • Writer: Rishaab
    Rishaab
  • Nov 29, 2024
  • 6 min read

Updated: Dec 2, 2024

In part 3, we explored the internal workings of AggregateProcessor and how it evaluates expressions using our rank() window function example. In this post, we will gain a deep understanding of the WindowFunctionFrame.


Since Spark provides several different types of frames, it's not feasible to cover each one in this post. Therefore, we will concentrate on one specific frame type called UnboundedPrecedingWindowFunctionFrame, also used by our rank function example from part 1. Once we understand the workings of the UnboundedPrecedingWindowFunctionFrame, understanding others should be fairly straightforward.


Please see part 3, where we thoroughly explored the implementation details of the AggregateProcessor. The subsequent section presumes you already possess the necessary understanding of it.



BoundOrdering

As mentioned previously, a WindowFunctionFrame is a subset of rows associated with a particular partition over which a WindowFunction operates. As such, the WindowFunctionFrame requires a mechanism to know exactly which rows to be included in the current frame getting processed. For this WindowFunctionFrame uses BoundOrdering which is a comparator that identifies if a particular row should be included in the frame boundary under execution.


Spark provides two variants for it - RowBoundOrdering and RangeBoundOrdering.


RowBoundOrdering compares the indices of rows to determine which rows to include in the frame. The rank function example code from part 1 uses RowBoundOrdering.


RangeBoundOrdering compares the values of rows to determine which rows to include in the frame. The following query uses range-based ordering:

RANGE BETWEEN 40 PRECEDING AND 50 FOLLOWING


To understand BoundOrdering, let's take a simple case which used RowBoundOrdering.

SELECT 
    Employee,
    Sales,
    SUM(Sales) OVER (
        ORDER BY Row_Number
        ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING
    ) AS Running_Total
FROM SalesData;

Here, we aim to calculate the sum of Sales using a window frame that includes all rows from the beginning of the dataset (UNBOUNDED PRECEDING) up to one row following the current row (1 FOLLOWING).



RowBoundOrdering being a comparator is implemented like so (renamed variables for brevity),

def compare(
    toIncludeRow: InternalRow,
    toIncludeRowIndex: Int,
    inputRow: InternalRow,
    inputRowIndex: Int): Int =
    toIncludeRowIndex - (inputRowIndex + offset)
}

In this implementation, toIncludeRow refers to a row being considered for inclusion in the current frame, while inputRow is the row for which the Running_Total is being calculated. The offset determines the relative position for the frame's upper bound, which, in the given query, is set to 1 (ie. 1 FOLLOWING).


A row will be included in the frame if the comparator returns a value <= 0. In other words, a row toIncludeRow will be included in the frame if it precedes the row following the inputRow.



Now applying the comparator, the Running_Total will look like so,

Row Number

Employee

Sales

Running_Total

1

Bill Gates

100

300

2

Elon Musk

200

600

3

Jeff Bezos

300

600

Since the frame upper bound should include the immediate next-row following the input row getting processed, the frame boundary will look like so,

  • For Bill Gates, the frame boundary will be [100 to 200] with Running_Total = 100 + 200

  • For Elon Musk, the frame boundary will be [100 to 300] with Running_Total = 100 + 200 +300

  • For Jeff Bezos, the frame boundary will also be [100 to 300] as there is no next row in the dataset, hence Running_Total = 100 + 200 + 300




UnboundedPrecedingWindowFunctionFrame - Window Function Frame

The term unbounded preceding indicates that the frame boundary starts from the beginning (ie. unbounded) of the partition and ends at some specified upper bound.


For our rank() function example, the lower bound was the first row and the upper bound was the current row getting processed. In other words, it was computing the rank of a particular row from all rows preceding it.



Now let's take a moment to understand what parameters this frame accepts.

final class UnboundedPrecedingWindowFunctionFrame(
    target: InternalRow,
    processor: AggregateProcessor,
    ubound: BoundOrdering)
  extends WindowFunctionFrame

  • The target parameter is an InternalRow where the result of a frame output will be stored. The target is updated each time an input row in processed by the frame. The target is supplied by the Window function physical operator and also the one consuming the result. This is where the rank for a row will be written.

  • The processor is of type AggregateProcessor (discussed with depth in part 3).

  • The ubound is the BoundOrdering which we discussed before.



The UnboundedPrecedingWindowFunctionFrame like any other frame inherits from WindowFunctionFrame, it must override certain methods. We will go over these methods one by one now,



  • Preparing frame

A frame must be prepared before being used and prepare(rows: ExternalAppendOnlyUnsafeRowArray) is used for this.


It accepts dataset rows as parameter, which essentially is an array of UnsafeRows associated with a particular partition and already sorted. The frame sets its input field from this provided rows and initializes the corresponding inputIterator. E.g. for partition-1 from our example dataset in part 1, the input field will point to the following UnsafeRows: [[0,1,1800000002,3161], [0,1,1800000002,3261]]


Next, the processor.initialize method (discussed in  part 3) initializes the AggregateProcessor.


To put into perspective, the inputIterator is responsible for filling in a frame with rows and the frame result is evaluated using AggregateProcessor.


  • Computing row and writing result 

To compute an input row, e.g. to find the rank() of a particular row, the write(index: Int, current: InternalRow) method is invoked.


Before going into the implementation details of write(), remember, our rank function example uses frame like so,

rowsBetween(Window.unboundedPreceding, Window.currentRow)

This means that the upper limit of our frame is the row currently being processed for rank. Hence to calculate the rank of a row, the frame will include rows from beginning until the current row.


However, we don't have to fill rows from the beginning each time a rank is getting computed for a row, as this will be very inefficient. The UnboundedPrecedingWindowFunctionFrame provides efficient processing by making use of the AggregateProcessor, eliminating the need to repeatedly going to the beginning of the partition.


Let's understand write() holistically doing the dry run,

while (nextRow != null && ubound.compare(nextRow, inputIndex, 
	   current, index) <= 0) {
    if (processor != null) {
        processor.update(nextRow)
    }
    nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
    inputIndex += 1
    bufferUpdated = true
}

Here, the current is the row for which the rank is getting calculated and nextRow points to some row in the input array (see Preparing frame section)



Consider partition-1 for our example-dataset, for which,

  • input: [[0,1,1800000002,3161], [0,1,1800000002,3261]]

  • nextRow: [0,1,1800000002,3161]


Let's calculate rank for rows in partition-1.

For the first row, current = [0,1,1800000002,3161], index = 0

For the first row there is no row preceding it and the upper bound itself ends at the current row. Hence, ubound will compare current and nextRow which are same, the condition evaluated to true and we enter the loop body. In the loop, it will update the processor with the current row by calling processor.update([0,1,1800000002,3161])

The processor.update will evaluate the update expression and compute the rank details internally. For this first row, the processor will update its internal buffer with rank set to 1.

Then, the nextRow will be updated from the input array. It will now point to [0,1,1800000002,3261]. The inputIndex will be update to 1. And bufferUpdate will be set to true. The while-loop will again run for the current row  [0,1,1800000002,3161] but ubound condition will result in false because the nextRow follows the current row and our frame has upper limit until the current row.

With only one row being update by the processor, the while loop will end.

The following statement will be executed next,

if (processor != null && bufferUpdated) {
    processor.evaluate(target)
}

Here, the if-condition evaluates to true because we have updated the processor buffer. Hence, processor.evaluate(target) will be invoked. The target is the output row (discussed before) where the rank result will be written.


Since this whole control-flow is getting executed for the current row [0,1,1800000002,3161], which is the first row in the partition, the target row will have result as 1 (ie. the rank). This completes the execution of a frame for the first row.


Next, the write method will be called for the second row in the partition-1, [0,1,1800000002,3261] and the same flow will be executed.


However, one important point to note here is that the processor's internal buffer already contains the rank information for the previous row, i.e [0,1,1800000002,3161]. Hence, when the processor.update([0,1,1800000002,3261]) is called, the processor will compute the subsequent rank. Since the processor maintains the previous computed rank information, we avoid having to compute the rank again and again from the beginning for each row. This provides linear time complexity.



This concludes the working of UnboundedPrecedingWindowFunctionFrame. In the next section, we will see how the AggregateProcessor and the WindowFunctionFrame integrates with the WindowFunctionExec. See you in next part :-)


Recent Posts

See All

Commenti


Thanks for submitting!

©2023 by Rishab Joshi.

bottom of page