
PySpark Window Functions – Row-Wise Ordering, Ranking, and Cumulative Sum with Real-World Examples and Use Cases


In the previous post, we discussed simple aggregation using window functions in PySpark. In this post, we will discuss how to use PySpark window functions for row-wise ordering, ranking, and cumulative sums.

Row-wise ordering and ranking functions are used to assign a rank or position to each row based on the order of the values in a specified column. They are very useful in scenarios such as finding the top N records based on a particular column or finding the cumulative sum or average of a column for each row in a PySpark DataFrame.

Before we move on to the code examples, let’s take a brief look at the types of window functions available in PySpark:

  • Ranking functions: These functions assign a unique rank to each row based on the values in a specified column. Examples of ranking functions include rank(), dense_rank(), percent_rank(), and row_number().
  • Aggregate functions: These functions perform a calculation on a set of rows within a window. Examples of aggregate functions include sum(), count(), avg(), min(), and max().
  • Analytic functions: These functions compute a value based on a group of rows and return a value for every row in the group. Examples of analytic functions include lead(), lag(), first(), and last(). A short sketch showing all three categories applied over a single window follows this list.
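
All three categories are used the same way: you build a window specification with Window.partitionBy() and/or Window.orderBy(), and pass it to the function through .over(). The snippet below is only an illustrative sketch; it assumes a hypothetical DataFrame df with columns "group" and "value":

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, sum as sql_sum, lag

# hypothetical DataFrame "df" with columns "group" and "value" (illustration only)
w = Window.partitionBy("group").orderBy("value")

df = df.withColumn("value_rank", rank().over(w))               # ranking function
df = df.withColumn("running_total", sql_sum("value").over(w))  # aggregate function
df = df.withColumn("previous_value", lag("value", 1).over(w))  # analytic function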

Now, let’s move on to the code examples for row-wise ordering and ranking in PySpark.

Example 1: Finding the top N records based on a column

Suppose we have a PySpark DataFrame with columns “name” and “score”, and we want to find the top 3 students with the highest scores. We can use the row_number() function to assign a rank to each row based on the score column, and then filter the top 3 rows based on the assigned rank.

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

# create sample data
data = [("John", 90), ("Alice", 85), ("Bob", 95), ("Sarah", 80), ("Mike", 92)]
df = spark.createDataFrame(data, ["name", "score"])

# assign row number based on score column
window = Window.orderBy(df["score"].desc())
df = df.withColumn("rank", row_number().over(window))

# filter top 3 records
df.filter(df["rank"] <= 3).show()

Output:

+-----+-----+----+
| name|score|rank|
+-----+-----+----+
|  Bob|   95|   1|
| Mike|   92|   2|
| John|   90|   3|
+-----+-----+----+

In the above code, we first create a sample DataFrame with student names and their scores. We then build a window specification with the Window class, ordering the rows by the score column in descending order, and use the row_number() function over that window to assign a unique position to each row. Finally, we filter the DataFrame to keep the top 3 rows based on the assigned rank.
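
Note that row_number() breaks ties arbitrarily, so two students with the same score would still get different positions. If tied scores should share a position (and both survive the top-N filter), rank() can be used with the same window specification. A minimal sketch, reusing the window variable from the example above:

from pyspark.sql.functions import rank

# rank() gives tied scores the same position, so a "top 3" filter may return
# more than three rows when there is a tie at the cut-off
df_ranked = df.withColumn("score_rank", rank().over(window))
df_ranked.filter(df_ranked["score_rank"] <= 3).show()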


Example 2: Calculating the cumulative sum of a column

Now, let’s look at another example where we want to calculate the cumulative sum of a column based on a specific ordering.

Let’s say we have a dataset containing the sales data of different products.

Here’s how the code using PySpark window functions looks:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema for the DataFrame
schema = StructType([
    StructField("region", StringType(), True),
    StructField("category", StringType(), True),
    StructField("product", StringType(), True),
    StructField("sales", FloatType(), True)
])

# Create the DataFrame
data = [
    ("North", "Furniture", "Chair", 100.0),
    ("North", "Furniture", "Table", 200.0),
    ("North", "Electronics", "Phone", 300.0),
    ("North", "Electronics", "Tablet", 400.0),
    ("South", "Furniture", "Chair", 500.0),
    ("South", "Furniture", "Table", 600.0),
    ("South", "Electronics", "Phone", 700.0),
    ("South", "Electronics", "Tablet", 800.0)
]

sales_df = spark.createDataFrame(data, schema)
sales_df.show()

This will create a DataFrame sales_df with the following schema and data:

+------+-----------+-------+-----+
|region|   category|product|sales|
+------+-----------+-------+-----+
| North|  Furniture|  Chair|100.0|
| North|  Furniture|  Table|200.0|
| North|Electronics|  Phone|300.0|
| North|Electronics| Tablet|400.0|
| South|  Furniture|  Chair|500.0|
| South|  Furniture|  Table|600.0|
| South|Electronics|  Phone|700.0|
| South|Electronics| Tablet|800.0|
+------+-----------+-------+-----+

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sql_sum

# Define the window specification
window_spec = Window.partitionBy("region", "category").orderBy("product")

# Add the cumulative sum column
sales_df = sales_df.withColumn("cumulative_sales", sql_sum("sales").over(window_spec))

# Show the result
sales_df.show()

This will add a new column named cumulative_sales to the sales_df DataFrame, containing the cumulative sum of the sales column. The Window.partitionBy() method defines the partitioning of the window, while Window.orderBy() specifies the ordering of the rows within each partition. The sql_sum() function (sum is imported as sql_sum to avoid shadowing Python’s built-in sum) is then applied to the sales column using the over() method to calculate the cumulative sum.

The output of the above code will be:

+------+-----------+-------+-----+----------------+
|region|   category|product|sales|cumulative_sales|
+------+-----------+-------+-----+----------------+
| North|  Furniture|  Chair|100.0|           100.0|
| North|  Furniture|  Table|200.0|           300.0|
| North|Electronics|  Phone|300.0|           300.0|
| North|Electronics| Tablet|400.0|           700.0|
| South|  Furniture|  Chair|500.0|           500.0|
| South|  Furniture|  Table|600.0|          1100.0|
| South|Electronics|  Phone|700.0|           700.0|
| South|Electronics| Tablet|800.0|          1500.0|
+------+-----------+-------+-----+----------------+

As you can see, the cumulative_sales column contains the cumulative sum of the sales column for each region and category combination, ordered by the product column.
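
A note on the window frame: when a window specification includes an orderBy, Spark’s default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which is exactly what turns sum() into a running total. You can state the frame explicitly with rowsBetween(), which also avoids surprises when the ordering column contains duplicate values (a range frame treats all rows with the same ordering value as one group). A sketch of the same cumulative sum with an explicit row-based frame:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum as sql_sum

# explicit row-based frame: from the start of the partition up to the current row
window_spec_rows = (
    Window.partitionBy("region", "category")
          .orderBy("product")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

sales_df = sales_df.withColumn(
    "cumulative_sales_rows", sql_sum("sales").over(window_spec_rows)
)
sales_df.show()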


Window Functions with RANK, DENSE_RANK and ROW_NUMBER

Apart from using window functions with aggregate functions, PySpark also provides window functions that generate a rank, dense rank, and row number based on an ordering. To use these functions, we first specify a window partition using the partitionBy() method and then specify the ordering using the orderBy() method.

  • RANK: This function returns the rank of a row within a window partition, with ties receiving the same rank and leaving gaps.
  • DENSE_RANK: This function returns the rank of a row within a window partition, with ties receiving the same rank and no gaps.
  • ROW_NUMBER: This function assigns a sequential number, starting from 1, to each row within a window partition.

Let’s see an example where we use the partitionBy and orderBy methods with the RANK, DENSE_RANK, and ROW_NUMBER functions.
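
The example assumes a DataFrame df with the columns state, product, sales, year, and month. One way to create such a DataFrame, using the values shown in the output further below, would be:

data = [
    ("AZ", "Product", 1000, 2022, "September"),
    ("AZ", "Product", 900, 2022, "August"),
    ("AZ", "Product", 800, 2022, "July"),
    ("AZ", "Product", 600, 2022, "June"),
    ("CA", "Product", 2500, 2022, "September"),
    ("CA", "Product", 2000, 2022, "August"),
    ("CA", "Product", 1500, 2022, "July"),
    ("CA", "Product", 1000, 2022, "June"),
    ("FL", "Product", 2000, 2022, "September"),
    ("FL", "Product", 1800, 2022, "August"),
    ("FL", "Product", 1600, 2022, "July"),
    ("FL", "Product", 1400, 2022, "June"),
]
df = spark.createDataFrame(data, ["state", "product", "sales", "year", "month"])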

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, row_number

# rank rows within each state by sales, highest first
windowSpec = Window.partitionBy(df['state']).orderBy(df['sales'].desc())

ranked = df.select(
    '*',
    rank().over(windowSpec).alias('rank'),
    dense_rank().over(windowSpec).alias('dense_rank'),
    row_number().over(windowSpec).alias('row_number'),
)
ranked.show()

Output:

+-----+-------+-----+----+---------+----+----------+----------+
|state|product|sales|year|    month|rank|dense_rank|row_number|
+-----+-------+-----+----+---------+----+----------+----------+
|   AZ|Product| 1000|2022|September|   1|         1|         1|
|   AZ|Product|  900|2022|   August|   2|         2|         2|
|   AZ|Product|  800|2022|     July|   3|         3|         3|
|   AZ|Product|  600|2022|     June|   4|         4|         4|
|   CA|Product| 2500|2022|September|   1|         1|         1|
|   CA|Product| 2000|2022|   August|   2|         2|         2|
|   CA|Product| 1500|2022|     July|   3|         3|         3|
|   CA|Product| 1000|2022|     June|   4|         4|         4|
|   FL|Product| 2000|2022|September|   1|         1|         1|
|   FL|Product| 1800|2022|   August|   2|         2|         2|
|   FL|Product| 1600|2022|     July|   3|         3|         3|
|   FL|Product| 1400|2022|     June|   4|         4|         4|
+-----+-------+-----+----+---------+----+----------+----------+

In the above example, we first defined a window specification using the partitionBy and orderBy methods, with sales in descending order. We then applied the RANK, DENSE_RANK, and ROW_NUMBER functions over it using rank(), dense_rank(), and row_number() respectively. Finally, we selected all the columns from the DataFrame and added the three new columns using the alias() method.
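
A common real-world use of this pattern is keeping a single row per partition, for example the best-selling month in each state. Filtering on the row_number column does exactly that; a small follow-up to the example above:

from pyspark.sql.functions import col

# keep only the highest-sales row in each state
top_per_state = ranked.filter(col("row_number") == 1)
top_per_state.show()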

When not to use window functions

Although PySpark window functions provide a powerful and efficient way of performing complex calculations over a large dataset, there are certain situations where they might not be the best choice:

  1. Small datasets: If the dataset is small enough to fit in memory, it might be faster to perform the calculations using regular PySpark functions instead of window functions.
  2. Simple aggregations: If the calculations involve simple aggregations like sum, count, min, and max, it is usually faster to use the regular PySpark aggregation functions with groupBy instead of window functions (see the sketch after this list).
  3. Unordered data: If the data is not ordered or if the order doesn’t matter for the calculations, then using window functions might not be necessary.
  4. Complex logic: If the calculations involve complex logic that cannot be expressed using window functions, then using regular PySpark functions might be the only option.
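
For point 2 above, a plain groupBy aggregation is usually the simpler and cheaper choice when you only need one result row per group rather than a value attached to every row. A minimal sketch, reusing the sales_df DataFrame from the cumulative sum example:

from pyspark.sql.functions import sum as sql_sum

# one row per region/category with the total sales -- no window function needed
sales_df.groupBy("region", "category") \
        .agg(sql_sum("sales").alias("total_sales")) \
        .show()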


Closing Thoughts

In this article, we have learned about PySpark window functions and how to use them to perform row-wise ordering and ranking operations. We have seen how to combine a window specification ordered with orderBy with row_number() and rank() to assign positions to rows based on the values in a specific column, how dense_rank() assigns the same rank to rows with the same value without leaving gaps, and how sum() over an ordered window produces a cumulative total. We have also seen when not to use window functions and to rely on regular aggregation functions instead.

Window functions are powerful tools that can be used to efficiently analyze data in PySpark. By understanding how they work and when to use them, we can gain valuable insights into our data and make more informed decisions.

