Part 3: Deep Dive - Spark Window Functions
- Rishaab

- Nov 22, 2024
Updated: Nov 29, 2025
In the previous post, we briefly alluded to AggregateProcessor without going into depth. In this part, we will focus on its implementation details.
To set things up, an AggregateProcessor is a component that computes aggregate operations such as SUM and RANK. To stay efficient in a scalable system like Spark, this aggregation process sacrifices immutability, a fundamental concept in the Scala language. AggregateProcessor uses mutable classes to achieve good performance: SpecificInternalRow, MutableProjection, and JoinedRow.
SpecificInternalRow
A SpecificInternalRow is a specialized row representation that internally uses MutableValue types.
A MutableValue can be of a specific type, like Int, Boolean, etc., and allows in-place updates of its value. These in-place updates minimize object creation and hence reduce the impact of garbage collection. The SpecificInternalRow is a container for an array of MutableValue objects, whose types are derived from the schema.
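To make the idea of typed, reusable holders concrete, here is a minimal standalone sketch. It does not use Spark; the names MutableSlot, IntSlot, StringSlot, and SketchRow are hypothetical stand-ins for MutableValue and SpecificInternalRow, and the real classes carry considerably more machinery.

```scala
// Simplified stand-ins for Spark's MutableValue and SpecificInternalRow.
// All names here are hypothetical and exist only for illustration.
object MutableRowSketch {
  sealed trait MutableSlot {
    var isNull: Boolean = true
    def boxed: Any
  }

  final class IntSlot extends MutableSlot {
    var value: Int = 0
    def boxed: Any = if (isNull) null else value
  }

  final class StringSlot extends MutableSlot {
    var value: String = null
    def boxed: Any = if (isNull) null else value
  }

  // One slot per column, allocated once and then reused for every update.
  final class SketchRow(val slots: Array[MutableSlot]) {
    def setInt(i: Int, v: Int): Unit = slots(i) match {
      case s: IntSlot =>
        s.value = v
        s.isNull = false
      case _ =>
        throw new IllegalArgumentException(s"column $i is not an Int column")
    }

    def setString(i: Int, v: String): Unit = slots(i) match {
      case s: StringSlot =>
        s.value = v
        s.isNull = v == null
      case _ =>
        throw new IllegalArgumentException(s"column $i is not a String column")
    }

    def get(i: Int): Any = slots(i).boxed
  }
}
```

Calling setInt repeatedly on the same column overwrites the primitive field inside the existing slot, so no new holder object is allocated per update. That is the property the aggregation buffer relies on.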
During creation, a SpecificInternalRow accepts a sequence of data types, and for each type it creates a corresponding MutableValue. Here is the relevant constructor:
def this(dataTypes: Seq[DataType]) = {
  this(new Array[MutableValue](dataTypes.length))
  var i = 0
  // Allocate one typed MutableValue holder per column.
  dataTypes.foreach { dt =>
    values(i) = dataTypeToMutableValue(dt)
    i += 1
  }
}
MutableProjection
A MutableProjection is an abstract class responsible for evaluating a series of expressions over a provided input row and writing the results to an output row, called the target. The target row is specified by calling the target method.
Its mutable nature allows expressions to be evaluated repeatedly, with the results written to the same target row each time, without the overhead of creating a new row for every evaluation.
There are two variants of MutableProjection: InterpretedMutableProjection and GeneratedMutableProjection.
The former evaluates expressions in an interpreted style, while the latter uses code generation to evaluate expressions.
Recall from Part 1 that we disabled code generation by setting the property below, so in our case InterpretedMutableProjection is used.
spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")
Moving forward, whenever we mention MutableProjection, we implicitly refer to InterpretedMutableProjection.
We won't spend much time on InterpretedMutableProjection; however, the following points are worth noting:
The expressions to evaluate are supplied during its construction.
The target row is supplied by the caller and can be set via the target method.
A row is evaluated against the supplied expressions using the apply method. The evaluated result is then written to the target row.
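The three points above can be sketched in plain Scala. This is an illustration under simplifying assumptions, not Spark's implementation: rows are modeled as Array[Any], expressions as plain functions, and the class name SketchMutableProjection is hypothetical. The sketch evaluates every expression before writing anything, so that expressions still see consistent input when the target row is also part of the input.

```scala
// A minimal model of an interpreted mutable projection.
// Row, Expr, and SketchMutableProjection are hypothetical names.
object ProjectionSketch {
  type Row = Array[Any]
  // An "expression" is modeled as a function from an input row to a value.
  type Expr = Row => Any

  final class SketchMutableProjection(exprs: Seq[Expr]) {
    // Default target; callers normally replace it through target(...).
    private var targetRow: Row = new Array[Any](exprs.length)
    // Scratch space so every expression sees the input as it was before any write.
    private val scratch: Array[Any] = new Array[Any](exprs.length)

    // Set the row that results are written to; returns this for chaining.
    def target(row: Row): SketchMutableProjection = {
      targetRow = row
      this
    }

    // Evaluate all expressions over the input, then write them into the target.
    def apply(input: Row): Row = {
      var i = 0
      while (i < exprs.length) {
        scratch(i) = exprs(i)(input)
        i += 1
      }
      i = 0
      while (i < exprs.length) {
        targetRow(i) = scratch(i)
        i += 1
      }
      targetRow
    }
  }
}
```

Applying the same projection to many input rows keeps overwriting one target array, which mirrors the reuse described above.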
JoinedRow
A JoinedRow acts like two rows logically concatenated together and provides a convenient way to look up their values. A JoinedRow has a left row and a right row. The statement join(buffer, input) creates a JoinedRow whose left row is buffer and whose right row is input.
JoinedRow is used by AggregateProcessor to evaluate certain expressions on two rows together and compute the aggregated result. We will learn more about this later.
In a JoinedRow, the indices are zero-based, starting with the left row index, followed by the right row.
For example, suppose:
the left row is a SpecificInternalRow with 3 columns of types [Int, Int, String] and value [1, 1, "a"].
the right row is an UnsafeRow for the value (2, "a1"), encoded as [0,2,1800000002,3161] in its string representation. Here the leading 0 is the null-tracking bitset and 1800000002 packs the offset and length of the string, so the row still has only two logical fields.
The JoinedRow for the above will be represented as,
joinedRow{ [1, 1, "a"], [0,2,1800000002,3161] }
Indexing on it works as follows:
joinedRow[2] retrieves the string value "a" from the left row, the last of its three fields (indices 0 to 2).
joinedRow[3] retrieves the integer value 2 from the right row, because the right row's first field follows directly after the left row's three fields.
joinedRow[4] retrieves the string "a1" from the right row, which appears as the hex value 3161 in the string representation.
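The index arithmetic can be captured in a few lines. The following is a simplified sketch, not Spark's JoinedRow: the class name SketchJoinedRow is hypothetical, and both rows are modeled as sequences of their logical field values rather than as SpecificInternalRow and UnsafeRow.

```scala
// A logical view over two rows; no values are copied.
// SketchJoinedRow is a hypothetical name used only for illustration.
object JoinedRowSketch {
  final class SketchJoinedRow(left: IndexedSeq[Any], right: IndexedSeq[Any]) {
    def numFields: Int = left.length + right.length

    // Indices below left.length hit the left row; the rest are shifted into the right row.
    def get(i: Int): Any =
      if (i < left.length) left(i) else right(i - left.length)
  }
}
```

With a left row of (1, 1, "a") and a right row of (2, "a1"), get(2) reads the left row's last field, while get(3) and get(4) read the right row's fields 0 and 1 respectively.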
AggregateProcessor
Now let's come back to AggregateProcessor. AggregateProcessor works by evaluating a series of expressions over the input row to compute the aggregated result. The aggregated result is stored in a variable called buffer which is of type SpecificInternalRow.
We will now understand how this buffer is initialized and updated, and how the final aggregated result is fetched from the buffer.
Initializing buffer
The buffer is initialized by the initialize method. This method uses a variable named initialProjection, of type MutableProjection, to initialize the buffer. Since initialization requires expressions, the initialization expressions for our rank() example from Part 1 look like this:
exprs: ArrayBuffer (size = 3)
├── Literal: 0
├── Literal: 1
└── Literal: null
And the buffer will be initialized to this:
buffer[0] = 0
buffer[1] = 1
buffer[2] = null
Here, buffer[0] holds the current rank, buffer[1] holds the next rank to be assigned, and buffer[2] holds the col2 value used for comparing and assigning the next rank.
Updating buffer from input row
After initializing the buffer, the AggregateProcessor is ready to process the provided inputRow. The update method is called for this purpose, and updateProjection is responsible for evaluating a series of expressions and updating the buffer.
For our rank() example, the following expressions are evaluated against the inputRow:
exprs: ArrayBuffer (size = 3)
├── if
│   ├── Condition:
│   │   ├── ((input[4, string, true] <=> input[2, string, true])
│   │   └── AND NOT (input[0, int, false] = 0))
│   ├── True branch: input[0, int, false]
│   └── False branch: input[1, int, false]
├── Add
│   └── (input[1, int, false] + 1)
└── BoundReference
    └── input[4, string, true]
Note: Each expression updates the buffer value at its own index. For instance, the first If expression updates buffer[0], the second Add expression updates buffer[1], and so forth.
These expressions are evaluated over a JoinedRow whose left row is the buffer and whose right row is the inputRow (of type UnsafeRow) from our example dataset in Part 1. The evaluation is invoked like so:
updateProjection(join(buffer, inputRow))
Let's run these expressions on the first dataset row, (1, "a1").
First, the buffer will be initialized to [0, 1, null]. The inputRow will be [0, 1, 1800000002, 3161], representing the UnsafeRow for the row (1, "a1"). Consequently, the JoinedRow will appear as { [0, 1, null], [0, 1, 1800000002, 3161] }.
Let's understand how the buffer will be updated on this JoinedRow using update expressions.
If Expression Evaluation
├── if
│   ├── Condition:
│   │   ├── ((input[4, string, true] <=> input[2, string, true])
│   │   └── AND NOT (input[0, int, false] = 0))
│   ├── True branch: input[0, int, false]
│   └── False branch: input[1, int, false]
Here are a few things to know about the conventions:
<=> operator: This is the null-safe equality comparison in Spark. It compares two inputs even when one or both are null. If both values are null, it treats them as equal; if only one is null, it returns false rather than null.
input[4, string, true]: This retrieves the string value at index 4 of the JoinedRow. The true indicates that the value is nullable. In the context of the JoinedRow, this value is fetched from the right row, which is the inputRow. Specifically, it corresponds to the col2 column of the example dataset, which has the value "a1" in this example.
input[2, string, true]: This retrieves the string value at index 2 of the JoinedRow. The true again marks the value as nullable. This value is fetched from the left row, which is the buffer. In this example, the value is null because no previous col2 value has been seen.
input[0, int, false] = 0: This expression checks whether this is the first time a rank is being calculated. It retrieves the integer value at index 0 of the JoinedRow; the false means this value cannot be null. In our example, the comparison evaluates to true because this is the first row being ranked.
The if-condition as a whole checks that the values of input[4] and input[2] are equal and that the rank being calculated is not the first rank (checked via NOT (input[0, int, false] = 0)).
If the condition is true, there is a tie (input[4] = input[2]), so the existing rank is fetched from the buffer at index 0 (input[0, int, false]). In other words, this inputRow is assigned the previous rank.
If the condition is false, there is either no tie or this is the first rank being assigned, so the next rank is fetched from the buffer at index 1 (input[1, int, false]). In our example the condition is false, so the inputRow is assigned the next rank, which is 1; that is, buffer[0] is set to the value of buffer[1].
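The decision made by the If expression can be written out as an ordinary function. This is a hedged sketch rather than Spark code: the joined row is modeled as a sequence of five logical values (three buffer fields followed by the two input fields), and the names nullSafeEq and nextRank are hypothetical.

```scala
// Models the If expression used to update buffer[0] for rank().
// nullSafeEq and nextRank are hypothetical helper names.
object RankIfSketch {
  // Null-safe equality in the spirit of <=>: two nulls are equal,
  // and null never equals a non-null value.
  def nullSafeEq(a: Any, b: Any): Boolean =
    if (a == null || b == null) a == null && b == null else a == b

  // joined = buffer fields (indices 0..2) followed by the input row's fields (3..4).
  def nextRank(joined: IndexedSeq[Any]): Int = {
    val currentRank = joined(0).asInstanceOf[Int]
    val candidate = joined(1).asInstanceOf[Int]
    val isTie = nullSafeEq(joined(4), joined(2)) && !(currentRank == 0)
    // Tie: keep the current rank. Otherwise: take the next rank candidate.
    if (isTie) currentRank else candidate
  }
}
```

For the first row the joined values are (0, 1, null, 1, "a1"): the comparison of "a1" with null is false, so the function returns the candidate 1, matching the walkthrough above.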
Add Expression Evaluation
├── Add
│   └── (input[1, int, false] + 1)
This calculates the next rank by increasing buffer[1] by 1. Consequently, buffer[1] becomes 2, signifying the next rank to be allocated to the next inputRow.
BoundReference Expression Evaluation
BoundReference
└── input[4, string, true]
The value of col2 from the right row is retrieved and assigned to buffer[2]. Hence, buffer[2] will be set to "a1". This value will then be compared with the next inputRow's col2 value to determine the rank.
After these evaluations, the buffer is set to [1, 2, "a1"]:
The value at index 0 is the rank assigned to the inputRow.
The value at index 1 is the potential next rank for the next inputRow.
The value at index 2 is the value of col2 to compare with the next inputRow.
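Putting initialization and the three update expressions together, the buffer's life cycle can be simulated without Spark. The sketch below is an assumption-laden model, not the AggregateProcessor itself: the buffer is an Array[Any], only the col2 value of each input row is passed in, and the object and method names are hypothetical. The input values in the usage note are illustrative and are not claimed to be the Part 1 dataset.

```scala
// Simulates the rank() buffer: initialize once, then update per input row.
// RankUpdateSketch and its methods are hypothetical names.
object RankUpdateSketch {
  // buffer(0) = current rank, buffer(1) = next rank candidate, buffer(2) = last col2 value.
  def initialBuffer(): Array[Any] = Array[Any](0, 1, null)

  def update(buffer: Array[Any], orderValue: String): Unit = {
    val rank = buffer(0).asInstanceOf[Int]
    val next = buffer(1).asInstanceOf[Int]
    val last = buffer(2)
    // Evaluate all three expressions against the old buffer before writing anything.
    val newRank = if (orderValue == last && rank != 0) rank else next
    val newNext = next + 1
    buffer(0) = newRank
    buffer(1) = newNext
    buffer(2) = orderValue
  }

  // Returns the rank assigned to each row, in input order.
  def ranks(orderValues: Seq[String]): Seq[Int] = {
    val buffer = initialBuffer()
    orderValues.map { v =>
      update(buffer, v)
      buffer(0).asInstanceOf[Int]
    }
  }
}
```

After one call to update with "a1", the buffer holds (1, 2, "a1"), as in the walkthrough. Feeding the illustrative sequence "a1", "a1", "b1" yields the ranks 1, 1, 3: the tie keeps rank 1, and the third row jumps to 3 because the candidate at index 1 keeps advancing on every row.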
The Evaluate Method
Now that the buffer has been updated with the rank details, the AggregateProcessor must perform the final expression evaluation and return the rank for that inputRow in the desired format. For this, the caller invokes the evaluate method, which is implemented like so:
def evaluate(target: InternalRow): Unit = {
  evaluateProjection.target(target)(buffer)
}
The target is the output row where the final evaluated result will be written.
The target is provided by the WindowFunctionFrame and set by invoking evaluateProjection.target(target). Then evaluateProjection.apply(buffer) is called implicitly, which evaluates the expressions over the buffer and writes the final result to the target output row. For our rank() function, the expression below is evaluated:
BoundReference
└── input[0, int, false]
It simply retrieves the integer at index 0 from the buffer, representing the currently computed rank. Consequently, the result returned will be [1], indicating the rank for the row (1, "a1").
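The chained call target(target)(buffer) can look odd at first, so here is a small standalone sketch of the same shape. It is not Spark code: BufferProjection and SketchProcessor are hypothetical names, rows are Array[Any], and the single evaluate expression simply reads index 0 of the buffer, as rank() does above.

```scala
// Models evaluate(): set the target, then apply the projection to the buffer.
// BufferProjection and SketchProcessor are hypothetical names.
object EvaluateSketch {
  type Row = Array[Any]

  final class BufferProjection(exprs: Seq[Row => Any]) {
    private var out: Row = new Array[Any](exprs.length)

    // Returns this, so a second argument list invokes apply on the result.
    def target(row: Row): BufferProjection = {
      out = row
      this
    }

    def apply(buffer: Row): Row = {
      var i = 0
      while (i < exprs.length) {
        out(i) = exprs(i)(buffer)
        i += 1
      }
      out
    }
  }

  final class SketchProcessor(buffer: Row) {
    // For rank(), the only evaluate expression reads index 0 of the buffer.
    private val evaluateProjection = new BufferProjection(Seq((b: Row) => b(0)))

    def evaluate(target: Row): Unit = {
      evaluateProjection.target(target)(buffer)
    }
  }
}
```

Because target returns the projection itself, the trailing (buffer) is an ordinary apply call on that returned object. With a buffer of (1, 2, "a1") and a one-field target, evaluate leaves the target holding the rank 1.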
This concludes the full processing of the AggregateProcessor. In the next section, we will focus on the UnboundedPrecedingWindowFunctionFrame and examine how the AggregateProcessor integrates with the framework.
Until then, happy learning 🙂.
