Part 5: Deep Dive - Spark Window Functions
- Rishaab
- Dec 2, 2024
- 6 min read
This is the final part of the deep dive series on understanding internals of Apache Spark Window Functions.
In earlier sections, we concentrated on grasping the implementation details of the individual components necessary for Window functions to operate. We used the rank window function as an example and examined each component thoroughly. We explored various representations such as UnsafeRow, SpecificInternalRow, and JoinedRow, and how they improve data processing in Spark. We grokked the foundational elements like AggregateProcessor, the evaluation of expressions, and their integration with WindowFunctionFrame.
Having built a tangible understanding over the course of this series, it's now time to stitch everything together and see how everything fits with WindowExec (Window function physical operator).
To summarize, in part 2, we gained understanding of WindowExec and discovered that WindowExec.doExecute() is triggered during the Spark action. The WindowExec.doExecute() produces an RDD[InternalRow]. When computed, this RDD yields an Iterator. Now, we will pick up where we left off and explore the implementation details of the Iterator.
Instantiating WindowFunctionFrame Factory
In previous posts, we covered the inner workings of WindowFunctionFrame, but we didn't delve into its instantiation process. WindowFunctionFrame is created within the WindowExec.doExecute() method and we will now explore this process.
The WindowExecBase which is the parent class for WindowExec has a variable called windowFrameExpressionFactoryPairs which is lazily instantiated and is of type,
Seq[(ExpressionBuffer, InternalRow => WindowFunctionFrame)]
where for each pair in the Sequence, the first element represents expressions and the second represents a function that creates corresponding WindowFunctionFrame.
The windowFrameExpressionFactoryPairs provides the logic for creating the WindowFunctionFrame, so let's understand what happens during its initialization.
The windowFrameExpressionFactoryPairs during initialization loops through each expression in the windowExpression which is of type Seq[NamedExpression] to create WindowFunctionFrame. The windowExpression is already supplied to the WindowExec physical operator during the query planning phase, representing expressions to be evaluated.
For our rank window function example in part 1, the windowExpression will look like so,
ArrayBuffer(
rank(
col2
) windowspecdefinition(
col1,
col2 ASC NULLS FIRST,
specifiedwindowframe(
RowFrame,
unboundedpreceding$(),
currentrow$()
)
) AS rank
)
For each expression in windowExpression we do the following,
If the expression is of type WindowExpression, we insert its details into a map named framedFunctions, which is of type,
mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
The FrameKey is of composite type whose value is crafted depending on the WindowFunction and WindowFunctionFrame types. See invocation of collect method for each such cases here, where the collect method is responsible for inserting into this map.
For our rank function example code, we will hit the AggregateWindowFunction case. This is because the Rank inherits from AggregateWindowFunction and the map after insertion will look like so (please take a moment to understand the mapping),
Map(
(AGGREGATE, RowFrame, unboundedpreceding$(), currentrow$()) -> (
ArrayBuffer(
rank(col2#8) windowspecdefinition(
col1#7,
col2#8 ASC NULLS FIRST,
specifiedwindowframe(RowFrame, unboundedpreceding$(),
currentrow$())
)
),
ArrayBuffer(rank(col2#8))
)
)
Now that we have looped through all WindowExpression and collected them in the map framedFunctions, it's time to loop through this map.
For each entry in the framedFunctions map, we do the following,
We first create a processor. This is the same AggregateProcessor that we learned in part 3. However, note that for the Python UDF, it is not created.
Next, we need to create a factory method responsible for creating the WindowFunctionFrame.
Depending on the type of key in the map, the appropriate WindowFunctionFrame will be created.
For our case of our rank function example, growing frame (UnboundedPrecedingWindowFunctionFrame) will be created.
The aforementioned processor will then be integrated to into frame.
Note that, we have not created the frame yet, we have just created a function that is responsible for creating the frame. The frame creation happens when the factory function is invoked, which happens later.
After looping through all window expressions and creating their respective WindowFunctionFrame factory functions, we initialize windowFrameExpressionFactoryPairs with them, ie. pair of (expressions, factory).The WindowExec.doExecute() uses this it to extract expressions and creates frames, which are later consumed by the Iterator.
Iterator Working
Like any Iterator, the RDD Iterator has two public methods hasNext() and next() and they drive the whole window function execution machinery.
Remember, a WindowFunctionFrame specification requires a partition column and a sort column. However, the WindowExec by itself doesn't perform any sorting or partition by itself. Instead, the query planning will add Sort and Exchange operator as its child operators and they will be responsible for correctly partitioning and sorting the data and providing required data to the WindowExec. The WindowExec will consume the data in that order.
To illustrate, take a look at this physical plan for the rank function example, where the Sort and Exchange are children of Window.
[1] Window [rank(col2#8) windowspecdefinition(col1#7, col2#8 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#13], [col1#7], [col2#8 ASC NULLS FIRST]
+- [2] Sort [col1#7 ASC NULLS FIRST, col2#8 ASC NULLS FIRST], false, 0
+- [3] Exchange hashpartitioning(col1#7, 5), ENSURE_REQUIREMENTS, [id=#11]
+- [4] Project [_1#2 AS col1#7, _2#3 AS col2#8]
+- [5] LocalTableScan [_1#2, _2#3]
With additional context in place, let's take a moment to understand a few variables, as grasping their meanings is essential to understanding how the Iterator functions.
buffer - It's an array of UnsafeRow and stores the input rows for each partition provided by the child operator.
bufferIterator - Iterator to the buffer.
windowFunctionResult - This is of type SpecificInternalRow (discussed in part 3) and gets updated by the result computed by the WindowFunctionFrame for each row. Since SpecificInternalRow requires a schema, the schema is inferred from the provided window expressions. In case of rank function example, the schema has one field of integer type representing the rank computed for each row.
result - This is of type InterpretedUnsafeRowProjection and evaluates expressions over a row. The evaluated row is the output returned by the Iterator.next()
frames - This represents a Sequence of WindowFunctionFrame created from their factory functions discussed here. For our rank function example, there will be only one frame which is UnboundedPrecedingWindowFunctionFrame.
Now let's understand working of hasNext and next methods.
The hasNext method of the Iterator is simple and unremarkable. It returns true if the current partition has an input row ready for processing (ie. bufferIterator has a next row to return), or if there is an input row pending for processing (ie, nextRowAvailable is true).
This method returns the output row computed by the window function to be consumed by the caller. Its working is as follows,
Check if the bufferIterator has next row to process. If not, call fetchNextPartition(). The fetchNextPartition has the following responsibilities,
fetching rows for next partition and adding them to the buffer thereby updating the bufferIterator with the next row.
Prepare each WindowFunctionFrame by calling prepare(buffer). Remember from part 4, the WindowFunctionFrame's prepare method takes the input rows (buffer) and initializes its internal state with it. For the rank window function example, the UnboundedPrecedingWindowFunctionFrame.prepare method will be invoked.
At this point, the bufferIterator should have a new row to return.
Update the current row with the bufferIterator's next returned row.
Next, Iterate through each WindowFunctionFrame and invoke their write method with the current row.
Recall from part 4, that the write method takes the input row and is tasked with calculating the frame's result. The result is written to the windowFunctionResult.
In the case of our rank function example, the write method calculates the rank of the current row, such that for the 1st row, result appears as windowFunctionResult[1], for the 2nd row as windowFunctionResult[2], and so forth.
After calculating the frame result for the current row, it's time to create the final row, which will be returned to the caller. To do so, the current processed row is joined with its frame result (windowFunctionResult), like so, join(current, windowFunctionResult)
Then, the joined resulted is applied to the result, which evaluates the joined row and returns the output row result for the provided current row.
Let's run the Iterator.next() algorithm for the first row (1, "a1") from our rank function example in part 1.
Remember from part 2, the UnsafeRow representation for the first row was [0,1,1800000002,3161].
Since this is the first row, its rank will be 1, hence result computed by the UnboundedPrecedingWindowFunctionFrame will be windowFunctionResult[1].
Then, the joined row will be JoinedRow([0,1,1800000002,3161] , [1]), where the first component is the current UnsafeRow and the second row is the rank output.
On applying the joined row to the result variable, the following expressions will be evaluated,
exprs: ArrayBuffer (size = 3)
└── BoundReference
└── input[0, string, true]
└── BoundReference
└── input[1, int, false]
└── Alias
└── input[2, int, true] AS rank
In this context, the first expression pertains to the col2 in the dataset and for the current row it represent the value "a1".
The second expression represents the col1 in the dataset and for the current row its value is 1.
The third expression retrieves the calculated rank, which is 1, and store it in alias column named rank.
These calculated values are encapsulated in an UnsafeRow, which is be the output returned by the rank window function for the row (1, "a1"). Similar machinery is executed for the next row when the Iterator.next() is invoked by the caller.
This wraps up the Deep Dive series on Spark Window Functions. I hope you learned something new. Remember to subscribe for more content like this. Until then, happy learning, and see you in the next post :-)
Комментарии