In PySpark, window functions are a powerful tool for data manipulation and analysis. They allow you to perform complex computations on subsets of data within a DataFrame, using a sliding window of rows. One common use case for window functions is creating lagged columns, which involves shifting a column’s values up or down by a certain number of rows. In this article, we will explore how to use PySpark window functions to create lagged columns in a DataFrame. This post continues PySpark Window Functions – Row-Wise Ordering, Ranking, and Cumulative Sum with Real-World Examples and Use Cases and PySpark Window Functions – Simple Aggregation: A Real-World Guide.
Prerequisites:
Before we get started, you will need to have a basic understanding of PySpark and DataFrames. If you are new to PySpark, it may be helpful to review the PySpark documentation and some introductory tutorials first.
Example 1: Creating a Lagged Column
Suppose we have a DataFrame that contains daily sales data for a store, with columns for the date and the total sales for that day. We want to create a new column that shows the total sales for the previous day. We can use the PySpark window function lag() to accomplish this:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
# Create example DataFrame
sales_data = [("2022-04-01", 1000), ("2022-04-02", 1200), ("2022-04-03", 900), ("2022-04-04", 1500)]
sales_df = spark.createDataFrame(sales_data, ["date", "sales"])
# Define window specification
window_spec = Window.orderBy("date")
# Create lagged column
sales_df = sales_df.withColumn("prev_day_sales", lag("sales", 1).over(window_spec))
# Show results
sales_df.show()
Output:
+----------+-----+--------------+
| date|sales|prev_day_sales|
+----------+-----+--------------+
|2022-04-01| 1000| null|
|2022-04-02| 1200| 1000|
|2022-04-03| 900| 1200|
|2022-04-04| 1500| 900|
+----------+-----+--------------+
Explanation:
First, we create an example DataFrame sales_df with columns for the date and total sales. Then, we define a window specification window_spec that orders the rows by the date column. Finally, we use the lag() function to create a new column prev_day_sales that shows the value of the sales column from the previous row. We specify the lag offset as 1 so that each row picks up the sales value from one row earlier; the first row has no previous row, so its value is null. The over(window_spec) clause applies the window specification to the lag() function.
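If the null in the first row is not what you want, lag() also accepts an optional default value as a third argument. A minimal sketch, reusing sales_df and window_spec from above:
from pyspark.sql.functions import lag
# Use 0 instead of null when there is no previous row
sales_df = sales_df.withColumn("prev_day_sales", lag("sales", 1, 0).over(window_spec))
sales_df.show()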
Example 2: Creating a Lagged Column with a Variable Offset
In some cases, we may want the number of periods to lag to vary depending on the value of another column. For example, we may want to look two days back for certain dates and only one day back for the rest.
Step 1: Create a DataFrame
First, let’s create a DataFrame containing sales data for different products and days:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag
# Create a SparkSession
spark = SparkSession.builder.appName("WindowFunctions").getOrCreate()
# Create a DataFrame
data = [("Product A", "2022-01-01", 100),
("Product B", "2022-01-01", 200),
("Product C", "2022-01-01", 300),
("Product A", "2022-01-02", 150),
("Product B", "2022-01-02", 250),
("Product C", "2022-01-02", 350),
("Product A", "2022-01-03", 200),
("Product B", "2022-01-03", 300),
("Product C", "2022-01-03", 400)]
sales_df = spark.createDataFrame(data, ["Product", "Date", "Sales"])
The DataFrame looks like this:
+---------+----------+-----+
| Product| Date|Sales|
+---------+----------+-----+
|Product A|2022-01-01| 100|
|Product B|2022-01-01| 200|
|Product C|2022-01-01| 300|
|Product A|2022-01-02| 150|
|Product B|2022-01-02| 250|
|Product C|2022-01-02| 350|
|Product A|2022-01-03| 200|
|Product B|2022-01-03| 300|
|Product C|2022-01-03| 400|
+---------+----------+-----+
Step 2: Create a WindowSpec and use lag() with a variable offset
To create a lagged column with a variable offset, we can use the when() function to check the value of another column and choose between lag() expressions with different offsets. Note that the offset argument of lag() itself must be a constant integer, so the when() wraps the lag() expressions rather than being passed as the offset.
For example, to create a column that looks two days back for rows dated 2022-01-03 and one day back for all other rows, we can use the following code:
from pyspark.sql.window import Window
from pyspark.sql.functions import when, col
# Create a WindowSpec
w = Window.partitionBy("Product").orderBy("Date")
# Create a column that shows the sales for the same product two days ago
# Look two days back on 2022-01-03, one day back otherwise
sales_df = sales_df.withColumn(
    "LaggedSales",
    when(col("Date") == "2022-01-03", lag(col("Sales"), 2).over(w))
    .otherwise(lag(col("Sales"), 1).over(w))
)
sales_df.show()
This code creates a new column called “LaggedSales”. We use the when() function to check whether the value of the “Date” column is “2022-01-03”. If it is, we take the sales from two rows earlier in the partition (i.e. two days ago); otherwise, we take the sales from one row earlier (i.e. one day ago).
The output looks like this:
+---------+----------+-----+-----------+
|  Product|      Date|Sales|LaggedSales|
+---------+----------+-----+-----------+
|Product A|2022-01-01|  100|       null|
|Product A|2022-01-02|  150|        100|
|Product A|2022-01-03|  200|        100|
|Product B|2022-01-01|  200|       null|
|Product B|2022-01-02|  250|        200|
|Product B|2022-01-03|  300|        200|
|Product C|2022-01-01|  300|       null|
|Product C|2022-01-02|  350|        300|
|Product C|2022-01-03|  400|        300|
+---------+----------+-----+-----------+
Example 3: Creating a Lagged Column with a Window Function
In this example, we will create a new column that contains the sales value of the previous row within each department, using a PySpark window function.
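This example uses a different DataFrame, with columns sales, dept_id, and date_id. A minimal DataFrame matching the output shown below can be created like this:
# Department-level sales data (matches the output shown below)
dept_data = [(100, 1, "2022-01-01"), (200, 1, "2022-01-02"), (300, 1, "2022-01-03"),
             (400, 2, "2022-01-01"), (500, 2, "2022-01-02"), (600, 2, "2022-01-03")]
sales_df = spark.createDataFrame(dept_data, ["sales", "dept_id", "date_id"])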
from pyspark.sql.window import Window
from pyspark.sql.functions import lag
w = Window.partitionBy("dept_id").orderBy("date_id")
sales_df.withColumn("sales_lagged", lag("sales", 1).over(w)).show()
Output:
+-----+-------+------------+------------+
|sales|dept_id| date_id|sales_lagged|
+-----+-------+------------+------------+
| 100| 1| 2022-01-01| null|
| 200| 1| 2022-01-02| 100|
| 300| 1| 2022-01-03| 200|
| 400| 2| 2022-01-01| null|
| 500| 2| 2022-01-02| 400|
| 600| 2| 2022-01-03| 500|
+-----+-------+------------+------------+
Explanation:
We create a window specification with Window.partitionBy("dept_id").orderBy("date_id"), which partitions the data by department and orders the rows by date_id within each partition. Then, we apply the lag function with an offset of 1 to the sales column using lag("sales", 1). Finally, we apply the window specification to the lag function with .over(w) to create a new column named “sales_lagged”.
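A lagged column is often just an intermediate step. For example, one common follow-up is computing the change from the previous row; a minimal sketch building on the window specification w defined above:
from pyspark.sql.functions import col, lag
# Day-over-day change in sales within each department
sales_df.withColumn("sales_lagged", lag("sales", 1).over(w)) \
    .withColumn("sales_change", col("sales") - col("sales_lagged")) \
    .show()
The first row of each department has no previous value, so both sales_lagged and sales_change are null there.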
When not to use PySpark Window Functions:
While window functions provide a powerful tool for data manipulation, there are some scenarios where they are not the best option:
- Small datasets: If the dataset is small enough to fit in memory, it may be more efficient to use pandas or another tool for data manipulation (see the sketch after this list).
- Non-window operations: If the operation does not require a window, using a window function may be overkill and add unnecessary complexity.
- Complex computations: If the computation involves complex logic or requires multiple passes over the data, it may be more efficient to write a custom UDF (User-Defined Function) in PySpark.
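As an illustration of the first point, the small daily sales table from Example 1 could be handled with plain pandas instead; a minimal sketch, assuming pandas is available:
import pandas as pd
# Same daily sales data as in Example 1
pdf = pd.DataFrame({
    "date": ["2022-04-01", "2022-04-02", "2022-04-03", "2022-04-04"],
    "sales": [1000, 1200, 900, 1500],
})
pdf["prev_day_sales"] = pdf["sales"].shift(1)  # pandas equivalent of lag("sales", 1)
print(pdf)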
Closing Thoughts
PySpark window functions provide a powerful tool for data manipulation, allowing users to create lagged columns, rolling averages, and more. By understanding the syntax and functionality of these functions, PySpark users can unlock new insights and perform complex data manipulation tasks with ease. However, it’s important to keep in mind the limitations of window functions and choose the appropriate tool for each scenario.