PySpark Window functions are used to calculate results such as rank and row number over a range of input rows.
They operate on a group of rows (known as a frame or partition) and return a single value for every input row. PySpark SQL supports three kinds of window functions:
- ranking functions
- analytic functions
- aggregate functions
import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import Window
# Create a spark session
spark_session = SparkSession.builder.getOrCreate()
# let's define a demonstration DataFrame to work on
df_data = {
    'partition': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'c', 'c'],
    'col_1': [1, 1, 1, 1, 2, 2, 2, 3, 3],
    'aggregation': [1, 2, 3, 4, 5, 6, 7, 8, 9],
    'ranking': [4, 3, 2, 1, 1, 1, 3, 1, 5],
    'lagging': [9, 8, 7, 6, 5, 4, 3, 2, 1],
    'cumulative': [1, 2, 4, 6, 1, 1, 1, 20, 30],
}
df_pandas = pd.DataFrame.from_dict(df_data)
# create spark dataframe
df = spark_session.createDataFrame(df_pandas)
df.show()
+---------+-----+-----------+-------+-------+----------+
|partition|col_1|aggregation|ranking|lagging|cumulative|
+---------+-----+-----------+-------+-------+----------+
|        a|    1|          1|      4|      9|         1|
|        a|    1|          2|      3|      8|         2|
|        a|    1|          3|      2|      7|         4|
|        a|    1|          4|      1|      6|         6|
|        b|    2|          5|      1|      5|         1|
|        b|    2|          6|      1|      4|         1|
|        b|    2|          7|      3|      3|         1|
|        c|    3|          8|      1|      2|        20|
|        c|    3|          9|      5|      1|        30|
+---------+-----+-----------+-------+-------+----------+
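As a quick illustration of the three kinds of window function listed above, the sketch below applies one of each to the demonstration DataFrame: rank() (a ranking function), lag() (an analytic function) and sum() (an aggregate function). The window and column choices here are illustrative only and are not part of the original example.
# a sketch: one window function of each kind over the same window
demo_window = Window.partitionBy('partition').orderBy('ranking')
df_kinds = df.withColumn(
    # ranking function: position of each row within its partition
    'rank_in_partition', fn.rank().over(demo_window)
).withColumn(
    # analytic function: the value of the previous row in the window
    'previous_ranking', fn.lag('ranking', 1).over(demo_window)
).withColumn(
    # aggregate function: a running sum over the window
    'running_ranking_sum', fn.sum('ranking').over(demo_window)
)
df_kinds.show()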
Running totals require a more complicated window definition, as the example here shows.
cumulative_window_2 = Window.partitionBy(
    'partition'
).orderBy(
    'cumulative'
).rangeBetween(
    # in this case we use rangeBetween for the sum, with
    # Window.unboundedPreceding to catch all earlier rows
    Window.unboundedPreceding, 0
)
df_cumulative_2 = df.select(
    'partition', 'cumulative'
).withColumn(
    'cumulative_sum', fn.sum('cumulative').over(cumulative_window_2)
)
df_cumulative_2.show()
# note the summing behavior where multiple identical values are present in the orderBy column
+---------+----------+--------------+
|partition|cumulative|cumulative_sum|
+---------+----------+--------------+
|        c|        20|            20|
|        c|        30|            50|
|        b|         1|             3|
|        b|         1|             3|
|        b|         1|             3|
|        a|         1|             1|
|        a|         2|             3|
|        a|         4|             7|
|        a|         6|            13|
+---------+----------+--------------+
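The three tied rows in partition b all receive the same cumulative_sum of 3 because rangeBetween defines the frame in terms of the value of the orderBy column, so rows with identical ordering values are summed together. If a strictly row-by-row running total is wanted instead, rowsBetween can be used; the variant below is a sketch and not part of the original example.
# a sketch using rowsBetween so that tied orderBy values accumulate row by row
cumulative_window_rows = Window.partitionBy(
    'partition'
).orderBy(
    'cumulative'
).rowsBetween(
    Window.unboundedPreceding, 0
)
df_cumulative_rows = df.select(
    'partition', 'cumulative'
).withColumn(
    'cumulative_sum_rows', fn.sum('cumulative').over(cumulative_window_rows)
)
df_cumulative_rows.show()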
Combining Windows and Calling Different Columns
It is also possible to combine windows and to call windows on columns other than the ordering column. These more advanced uses can require careful thought to ensure you achieve the intended results.
# we can make a window function equivalent to a standard groupBy:
# first define two windows
aggregation_window = Window.partitionBy('partition')
grouping_window = Window.partitionBy('partition').orderBy('aggregation')
# then we can use this window function for our aggregations
df_aggregations = df.select(
    'partition', 'aggregation'
).withColumn(
    # note that we calculate row number over the grouping_window
    'group_rank', fn.row_number().over(grouping_window)
).withColumn(
    # but we calculate other columns over the aggregation_window
    'aggregation_sum', fn.sum('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_avg', fn.avg('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_min', fn.min('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_max', fn.max('aggregation').over(aggregation_window),
).where(
    fn.col('group_rank') == 1
).select(
    'partition',
    'aggregation_sum',
    'aggregation_avg',
    'aggregation_min',
    'aggregation_max'
)
df_aggregations.show()
This is equivalent to the rather simpler groupBy expression below; as the two output tables show, both approaches produce identical results.
df_groupby = df.select(
    'partition', 'aggregation'
).groupBy(
    'partition'
).agg(
    fn.sum('aggregation').alias('aggregation_sum'),
    fn.avg('aggregation').alias('aggregation_avg'),
    fn.min('aggregation').alias('aggregation_min'),
    fn.max('aggregation').alias('aggregation_max'),
)
df_groupby.show()
+---------+---------------+---------------+---------------+---------------+
|partition|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+---------------+---------------+---------------+---------------+
|        c|             17|            8.5|              8|              9|
|        b|             18|            6.0|              5|              7|
|        a|             10|            2.5|              1|              4|
+---------+---------------+---------------+---------------+---------------+
+---------+---------------+---------------+---------------+---------------+
|partition|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+---------------+---------------+---------------+---------------+
|        c|             17|            8.5|              8|              9|
|        b|             18|            6.0|              5|              7|
|        a|             10|            2.5|              1|              4|
+---------+---------------+---------------+---------------+---------------+
In some cases we can define a window ordered on one column but apply its functions to a different column. Note that only functions which take an explicit column argument (such as lag, sum or avg) allow this.
lag_window = Window.partitionBy('partition').orderBy('lagging')
df_lag_example = df.select(
    'partition', 'lagging', 'cumulative',
).withColumn(
    'lag_the_lagging_col', fn.lag('lagging', 1).over(lag_window)
).withColumn(
    # it is possible to lag a column which was not the orderBy column
    'lag_the_cumulative_col', fn.lag('cumulative', 1).over(lag_window)
)
df_lag_example.show()
+---------+-------+----------+-------------------+----------------------+
|partition|lagging|cumulative|lag_the_lagging_col|lag_the_cumulative_col|
+---------+-------+----------+-------------------+----------------------+
|        c|      1|        30|               null|                  null|
|        c|      2|        20|                  1|                    30|
|        b|      3|         1|               null|                  null|
|        b|      4|         1|                  3|                     1|
|        b|      5|         1|                  4|                     1|
|        a|      6|         6|               null|                  null|
|        a|      7|         4|                  6|                     6|
|        a|      8|         2|                  7|                     4|
|        a|      9|         1|                  8|                     2|
+---------+-------+----------+-------------------+----------------------+
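By contrast, ranking functions such as rank() and row_number() take no column argument, so they always operate on the orderBy column of their window and cannot be pointed at a different column. The short sketch below is for illustration only.
# a sketch: rank() takes no column argument, so it always ranks by the
# orderBy column of its window ('lagging' here), never by 'cumulative'
df_rank_example = df.select(
    'partition', 'lagging', 'cumulative',
).withColumn(
    'rank_by_lagging', fn.rank().over(lag_window)
)
df_rank_example.show()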