
Part 5: Deep Dive - Spark Window Functions

  • Writer: Rishaab
  • Dec 2, 2024
  • 6 min read

Updated: Nov 30, 2025


This is the final part of the deep dive series on the internals of Apache Spark Window Functions.


In earlier parts, we focused on the implementation details of the individual components that Window functions need in order to operate. We used the rank window function as an example and examined each component thoroughly. We explored row representations such as UnsafeRow, SpecificInternalRow, and JoinedRow, and how they improve data processing in Spark. We studied AggregateProcessor, the evaluation of expressions, and their integration with WindowFunctionFrame. We looked into WindowExec and discovered that WindowExec.doExecute() is triggered during the Spark action and produces an RDD[InternalRow]. When computed, this RDD yields an Iterator over UnsafeRow.


Having built a tangible understanding over the course of this series, it's now time to stitch everything together.



WindowFunctionFrame Factory

In previous posts, we covered the inner workings of WindowFunctionFrame, but we didn't delve into its instantiation process. WindowFunctionFrame is created within the WindowExec.doExecute() method and we will now explore this process.


WindowExecBase, the parent class of WindowExec, has a lazily instantiated variable called windowFrameExpressionFactoryPairs, which is of type,

Seq[(ExpressionBuffer, InternalRow => WindowFunctionFrame)]

where, for each pair, the first element holds the window expressions and the second is a function that creates the corresponding WindowFunctionFrame. In other words, windowFrameExpressionFactoryPairs provides the logic for creating each WindowFunctionFrame.


During its initialization, windowFrameExpressionFactoryPairs loops through each expression in windowExpression to build the WindowFunctionFrame factories. The windowExpression is supplied to WindowExec during query planning and represents the expressions to be evaluated.


For our rank window function example in part 1, the windowExpression will look like so,

ArrayBuffer(
  rank(
    col2
  ) windowspecdefinition(
    col1,
    col2 ASC NULLS FIRST,
    specifiedwindowframe(
      RowFrame,
      unboundedpreceding$(),
      currentrow$()
    )
  ) AS rank
)

Instantiating a WindowFunctionFrame is a multi-step process, described below.


For each provided expression we do the following,

  • If the expression is of type WindowExpression, like the rank() expression above, we insert its details into a map named framedFunctions, which has the following type,

mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]

  • The FrameKey is a composite type whose value depends on the types of the WindowFunction and the WindowFunctionFrame.

  • The collect() method is responsible for inserting each window function expression into the map.

  • For the rank function, it will hit the AggregateWindowFunction case, because Rank inherits from AggregateWindowFunction. The map after insertion will look like so (please take a moment to understand this representation),

Map(
  (AGGREGATE, RowFrame, unboundedpreceding$(), currentrow$()) -> (
    ArrayBuffer(
      rank(col2#8) windowspecdefinition(
        col1#7,
        col2#8 ASC NULLS FIRST,
        specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())
      )
    ),
    ArrayBuffer(rank(col2#8))
  )
)
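The grouping step can be sketched in plain Scala, without any Spark dependency. This is only a simplified model: FrameKey, WindowExpr, and group below are hypothetical stand-ins (expressions are plain strings), not Spark's actual classes. The sketch assumes two functions, rank and dense_rank, that share the same frame specification and therefore land under the same key.

```scala
import scala.collection.mutable

object FramedFunctionsSketch {
  // Hypothetical, simplified stand-ins for Spark's internal types.
  final case class FrameKey(kind: String, frameType: String, lower: String, upper: String)
  final case class WindowExpr(function: String, frame: FrameKey)

  type ExpressionBuffer = mutable.Buffer[String]

  // Groups expressions that share a frame key, mirroring the role of collect().
  def group(exprs: Seq[WindowExpr]): mutable.Map[FrameKey, (ExpressionBuffer, ExpressionBuffer)] = {
    val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
    exprs.foreach { e =>
      // Reuse the buffers for an existing key, or register a fresh pair.
      val (windowExprs, functions) = framedFunctions.getOrElseUpdate(
        e.frame,
        (mutable.ArrayBuffer.empty[String], mutable.ArrayBuffer.empty[String]))
      windowExprs += s"${e.function} over ${e.frame.frameType}"
      functions += e.function
    }
    framedFunctions
  }

  val growing = FrameKey("AGGREGATE", "RowFrame", "unboundedpreceding", "currentrow")
  val grouped = group(Seq(WindowExpr("rank(col2)", growing), WindowExpr("dense_rank(col2)", growing)))
}
```

Here grouped holds a single entry, because both functions share one key; this is what allows one frame to serve several window functions.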

Having inserted all entries into the framedFunctions map, it's time to loop through each entry and run the following sequence of operations,

  • We first create a processor. This is the same AggregateProcessor that we learned about in part 3. Note, however, that it is not created for Python UDFs.

  • Next, we need to create a factory function responsible for creating the WindowFunctionFrame. Depending on the key in the map, the appropriate WindowFunctionFrame will be created.

  • For our rank function example, a growing frame (UnboundedPrecedingWindowFunctionFrame) will be created.

  • The aforementioned processor will then be integrated into the frame.

  • Note that we have not created the frame yet; we have only created a function that is responsible for creating it. The frame itself is created later, when the factory function is invoked.


After looping through all window expressions and creating their respective WindowFunctionFrame factory functions, we initialize windowFrameExpressionFactoryPairs with them. WindowExec.doExecute() uses it to create the frames, which are later consumed by the Iterator. More on this later.
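To make the "factory now, frame later" idea concrete, here is a minimal sketch in plain Scala. All names (Boundary, Frame, factoryFor, framesCreated) are hypothetical, and the factory takes no arguments, whereas the real one has the type InternalRow => WindowFunctionFrame. The mapping from frame boundaries to frame kinds is a simplification and leaves out offset-based frames.

```scala
object FrameFactorySketch {
  sealed trait Boundary
  case object UnboundedPreceding extends Boundary
  case object UnboundedFollowing extends Boundary
  case object CurrentRow extends Boundary

  // Hypothetical frame model: only the kind of frame matters here.
  final case class Frame(kind: String)

  // Counts how many frames have actually been instantiated.
  var framesCreated = 0

  // Returns a factory, not a frame; nothing is instantiated until it is invoked.
  def factoryFor(lower: Boundary, upper: Boundary): () => Frame = {
    val kind = (lower, upper) match {
      case (UnboundedPreceding, UnboundedFollowing) => "entire partition"
      case (UnboundedPreceding, _)                  => "growing"
      case (_, UnboundedFollowing)                  => "shrinking"
      case _                                        => "sliding"
    }
    () => { framesCreated += 1; Frame(kind) }
  }

  // The rank example uses unbounded preceding to current row.
  val rankFactory: () => Frame = factoryFor(UnboundedPreceding, CurrentRow)
}
```

Building rankFactory leaves framesCreated at 0; only a call such as FrameFactorySketch.rankFactory() produces a frame, which for the rank example is the growing kind.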


Sorting and Partitioning Requirements

Remember, a window specification requires a partition column and a sort column. However, the WindowExec physical operator doesn't perform any sorting or partitioning by itself. Instead, the query planner adds Sort and Exchange operators as its upstream (child) operators. They are responsible for correctly partitioning and sorting the data and handing the prepared data to WindowExec, which consumes the input rows in that order.


To illustrate, take a look at this physical plan for the rank function example, where Sort and Exchange appear as child operators of Window, that is, upstream of it in the data flow.

[1] Window[
        rank(col2#8)
        windowspecdefinition(
            col1#7,
            col2#8 ASC NULLS FIRST,
            specifiedwindowframe(
                RowFrame,
                unboundedpreceding$(),
                currentrow$()
            )
        ) AS rank#13
    ],
    [col1#7],[col2#8 ASC NULLS FIRST]
+- [2] Sort[
           col1#7 ASC NULLS FIRST,
           col2#8 ASC NULLS FIRST
       ],false, 0
   +- [3] Exchange hashpartitioning(col1#7, 5),
              ENSURE_REQUIREMENTS, [id=#11]
       +- [4] Project [_1#2 AS col1#7, _2#3 AS col2#8]
           +- [5] LocalTableScan [_1#2, _2#3]
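What the Exchange and Sort operators in this plan hand to Window can be modelled with ordinary Scala collections. This is a rough sketch under stated assumptions: exchange and sort are hypothetical helpers, the sample rows are made up, and hashCode with floorMod stands in for Spark's own hash function.

```scala
object PrepareInputSketch {
  type Row = (Int, String) // (col1, col2)

  // Stand-in for Exchange hashpartitioning(col1, numPartitions).
  def exchange(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Row]] =
    rows.groupBy { case (col1, _) => Math.floorMod(col1.hashCode, numPartitions) }

  // Stand-in for Sort [col1 ASC, col2 ASC] inside one shuffle partition.
  def sort(rows: Seq[Row]): Seq[Row] =
    rows.sortBy { case (col1, col2) => (col1, col2) }

  val input: Seq[Row] = Seq((1, "a2"), (2, "b1"), (1, "a1"), (6, "c1"))

  // Shuffle partition id -> rows in the order Window would consume them.
  val prepared: Map[Int, Seq[Row]] =
    exchange(input, 5).map { case (partitionId, rows) => partitionId -> sort(rows) }
}
```

In this sketch the keys 1 and 6 fall into the same shuffle partition. Sorting on col1 first keeps the rows of each window partition contiguous, which is what lets Window detect partition boundaries while it reads rows one by one.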


Iterator Working

Like any Iterator, the RDD Iterator has two public methods hasNext() and next() and they drive the whole window function execution machinery.


Let's take a moment to understand a few variables, as they are essential to how the iterator works.

  • buffer - It's an array of UnsafeRow and stores the input rows for each partition provided by the child RDD.

  • bufferIterator - Iterator to the buffer.

  • windowFunctionResult - This is of type SpecificInternalRow (discussed in part 3) and gets updated by the result of the WindowFunctionFrame for each row. Since SpecificInternalRow requires a schema, the schema is inferred from the provided window expressions.

  • result - This is of type InterpretedUnsafeRowProjection and evaluates the result expressions over a row. The evaluated row is the output returned by Iterator.next().

  • frames - This represents a Sequence of WindowFunctionFrame created from their factory functions discussed before. For rank function example, there will be only one frame which is UnboundedPrecedingWindowFunctionFrame.



Now let's understand how the hasNext and next methods work.


The hasNext()

The hasNext method returns true if the current partition has an input row ready for processing (i.e., bufferIterator has a next row to return), or if there is an input row pending for processing (i.e., nextRowAvailable is true).


The next()

This method returns the output row computed by the window function. It works as follows,

  • Check if the bufferIterator has a next row to process in the current partition. If not, call fetchNextPartition(). The fetchNextPartition method has the following responsibilities,

    • Fetches the rows of the next partition and adds them to the buffer, thereby resetting the bufferIterator to the first row of that partition.

    • Prepares each WindowFunctionFrame by calling prepare(buffer). Remember from part 4, the WindowFunctionFrame prepare method takes the input rows (buffer) and initializes its internal state with them. For the rank window function example, the UnboundedPrecedingWindowFunctionFrame.prepare method will be invoked.

  • At this point, the bufferIterator should have a new row to return. Update the current row with the row returned by bufferIterator.

  • Next, iterate through each WindowFunctionFrame and invoke its write method with the current row.

  • Recall from part 4 that the write method takes the input row and is tasked with calculating the frame's result. The result is written to the windowFunctionResult.

  • In the case of our rank function example, the write method calculates the rank of the current row, such that for the 1st row the result appears as [1], for the 2nd row as [2], and so forth.

  • After calculating the frame result for the current row, it's time to create the final row, which will be returned to the caller. To do so, the current row is joined with its frame result (windowFunctionResult), like so, join(current, windowFunctionResult). The joined row is then passed to the result projection, which evaluates it and returns the output row for the current row.
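The hasNext and next steps above can be put together in a small, Spark-free sketch. Everything here is a simplified model under assumptions: RankFrame and WindowIterator are hypothetical classes, rows are plain tuples instead of UnsafeRow, the join and projection collapse into building an output tuple, and stream.hasNext plays the role of nextRowAvailable.

```scala
import scala.collection.mutable.ArrayBuffer

object WindowIteratorSketch {
  type Row = (Int, String) // (col1 = partition key, col2 = order key)

  // Hypothetical growing frame that computes rank over the rows seen so far.
  final class RankFrame {
    private var seen = 0
    private var rank = 0
    private var previous: Option[String] = None

    // Resets the internal state for a new partition.
    def prepare(rows: Seq[Row]): Unit = { seen = 0; rank = 0; previous = None }

    // Writes the rank of `current` into the shared result slot.
    def write(current: Row, result: Array[Int]): Unit = {
      seen += 1
      if (!previous.contains(current._2)) rank = seen // ties keep the earlier rank
      previous = Some(current._2)
      result(0) = rank
    }
  }

  // `input` must already be partitioned and sorted, as Exchange and Sort guarantee.
  final class WindowIterator(input: Iterator[Row]) extends Iterator[(Int, String, Int)] {
    private val stream = input.buffered
    private val buffer = ArrayBuffer.empty[Row]
    private var bufferIterator: Iterator[Row] = Iterator.empty
    private val windowFunctionResult = new Array[Int](1)
    private val frames = Seq(new RankFrame)

    // Buffers all rows that share the next partition key and prepares the frames.
    private def fetchNextPartition(): Unit = {
      buffer.clear()
      val key = stream.head._1
      while (stream.hasNext && stream.head._1 == key) buffer += stream.next()
      frames.foreach(_.prepare(buffer.toSeq))
      bufferIterator = buffer.iterator
    }

    override def hasNext: Boolean = bufferIterator.hasNext || stream.hasNext

    override def next(): (Int, String, Int) = {
      if (!bufferIterator.hasNext) fetchNextPartition()
      val current = bufferIterator.next()
      frames.foreach(_.write(current, windowFunctionResult))
      // Join the current row with the frame result and project the output row.
      (current._1, current._2, windowFunctionResult(0))
    }
  }

  val output: List[(Int, String, Int)] =
    new WindowIterator(Iterator((1, "a1"), (1, "a2"), (1, "a2"), (2, "b1"))).toList
}
```

With this made-up input, the two "a2" rows tie at rank 2, and the rank restarts at 1 when the partition key changes to 2.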



Let's run the Iterator.next() for the first row (1, "a1") from our rank function example in part 1.


Remember from part 2, the UnsafeRow string representation for the first row was [0,1,1800000002,3161]. Since this is the first row, its rank will be 1, hence the result computed by the UnboundedPrecedingWindowFunctionFrame will be printed as windowFunctionResult[1] in its string form. The [1] here represents the calculated rank.

The joined row will be represented in string form as JoinedRow([0,1,1800000002,3161] , [1]), where the first component is the current UnsafeRow and the second component is the rank output.


On applying the joined row to the result variable, the following expressions will be evaluated, representing the projected output,

exprs: ArrayBuffer (size = 3)
└── BoundReference
    └── input[0, string, true]
└── BoundReference
    └── input[1, int, false]
└── Alias
    └── input[2, int, true] AS rank

The first expression pertains to col2 in the dataset, and for the current row it represents the value "a1". The second expression represents col1 in the dataset, and for the current row its value is 1. The third expression retrieves the calculated rank, which is 1, and stores it in the aliased column named rank. These calculated values are encapsulated in an UnsafeRow, which is the output returned by the rank window function for the row (1, "a1").
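The join-then-project step for this row can be sketched as follows. This is a hedged illustration only: Joined and project are hypothetical stand-ins for JoinedRow and the result projection, and the field layout (col1, col2, then rank) is an assumption made for the sketch, so its ordinals need not match the BoundReference ordinals listed above.

```scala
object JoinProjectSketch {
  // Hypothetical stand-in for JoinedRow: ordinals run across left, then right.
  final case class Joined(left: IndexedSeq[Any], right: IndexedSeq[Any]) {
    def get(ordinal: Int): Any =
      if (ordinal < left.length) left(ordinal) else right(ordinal - left.length)
  }

  // Each projection entry reads one ordinal of the joined row.
  def project(row: Joined, ordinals: Seq[Int]): Seq[Any] = ordinals.map(row.get)

  val current = Vector[Any](1, "a1")        // input row, assumed layout (col1, col2)
  val windowFunctionResult = Vector[Any](1) // rank computed by the frame
  val output: Seq[Any] = project(Joined(current, windowFunctionResult), Seq(0, 1, 2))
}
```

The joined row never copies its two sides; it only redirects ordinal lookups, which is why the rank at ordinal 2 is read from the second component.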


The same machinery is executed for the subsequent rows.


This wraps up the Deep Dive series on Spark Window Functions. This is an involved topic, and I hope you found this series useful. Until next time, happy learning, and see you in the next post 🙂.





©2023 by Rishab Joshi.
