PySpark Partitioning by Multiple Columns - A Complete Guide with Examples

PySpark provides a powerful way to aggregate, transform, and analyze data using window functions. In this article, we will discuss how to partition data by multiple columns in PySpark so that you can group and analyze it across several dimensions at once.

What is Partitioning in PySpark?

Partitioning in PySpark is a way of dividing a dataset into smaller subsets, or partitions, based on one or more columns. Partitioning is an essential step in many PySpark operations, such as sorting, grouping, and joining, as it enables PySpark to distribute data across a cluster for parallel processing.

In PySpark, you can partition data by one or more columns using the partitionBy() method of a window specification. The partitionBy() method takes one or more column names as arguments and returns a WindowSpec that groups rows by those columns for window calculations.
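As a minimal sketch (using the column names from the sample dataset introduced below), the single-column and multi-column forms look like this:

from pyspark.sql.window import Window

# window specification partitioned by a single column
store_window = Window.partitionBy('Store')

# window specification partitioned by multiple columns
store_product_window = Window.partitionBy('Store', 'Product')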

Example Dataset

For our examples, we will use a dataset that contains sales data for a retail store. The dataset contains the following columns:

  • Date: the date of the sale
  • Store: the store where the sale took place
  • Product: the product that was sold
  • Units Sold: the number of units sold
  • Revenue: the revenue generated by the sale

The dataset looks like this:

Date       | Store | Product | Units Sold | Revenue
2021-01-01 | A     | Apple   | 10         | 100
2021-01-02 | A     | Apple   | 20         | 200
2021-01-03 | A     | Apple   | 30         | 300
2021-01-04 | B     | Banana  | 5          | 50
2021-01-05 | B     | Banana  | 10         | 100
2021-01-06 | B     | Banana  | 15         | 150

Example 1: Partitioning by a Single Column

To partition the sales data by store, we can use the partitionBy() method as follows:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# create a PySpark DataFrame from the sales data
sales_df = spark.createDataFrame([
  ('2021-01-01', 'A', 'Apple', 10, 100),
  ('2021-01-02', 'A', 'Apple', 20, 200),
  ('2021-01-03', 'A', 'Apple', 30, 300),
  ('2021-01-04', 'B', 'Banana', 5, 50),
  ('2021-01-05', 'B', 'Banana', 10, 100),
  ('2021-01-06', 'B', 'Banana', 15, 150),
], ['Date', 'Store', 'Product', 'Units Sold', 'Revenue'])

# partition the sales data by store
window = Window.partitionBy('Store')
sales_df_partitioned = sales_df.withColumn('Total Revenue', sum('Revenue').over(window))

In this example, we partition the sales data by the ‘Store’ column using the partitionBy() method. We then use the sum() function to calculate the total revenue for each store using a window function with the over() method. The result is a new DataFrame with an additional ‘Total Revenue’ column that contains the total revenue for each store.
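To inspect the result, call show() on the new DataFrame. From the sample data we would expect every store A row to carry a total of 600 (100 + 200 + 300) and every store B row a total of 300 (50 + 100 + 150); these are expected values derived from the data, not captured output.

# every row within a store shares the same 'Total Revenue' value:
# 600 for store A, 300 for store B
sales_df_partitioned.show()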


Example 2: Partitioning by Multiple Columns

In Example 1, we partitioned the data by a single column, Store. But what if we want to partition the data by multiple columns? We can simply pass several column names (or a list of them) to the partitionBy() method.

Let’s consider the following example:

Suppose we have a sales dataset that contains information about sales made by sales representatives in different departments and different regions. We want to calculate the running total of sales for each sales representative, department, and region.

Here is a sample dataset:

sales_rep | department | region | date       | sales
Alice     | HR         | East   | 2021-01-01 | 100
Bob       | HR         | East   | 2021-01-01 | 200
Charlie   | Sales      | West   | 2021-01-01 | 150
Alice     | HR         | East   | 2021-01-02 | 50
Bob       | HR         | East   | 2021-01-02 | 100
Charlie   | Sales      | West   | 2021-01-02 | 200
Alice     | HR         | East   | 2021-01-03 | 300
Bob       | HR         | East   | 2021-01-03 | 400
Charlie   | Sales      | West   | 2021-01-03 | 250
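So that the code below is runnable, the sample DataFrame can be built from these rows (a small addition, reusing the existing spark session):

# build the Example 2 DataFrame from the sample rows above
sales_df = spark.createDataFrame([
    ("Alice",   "HR",    "East", "2021-01-01", 100),
    ("Bob",     "HR",    "East", "2021-01-01", 200),
    ("Charlie", "Sales", "West", "2021-01-01", 150),
    ("Alice",   "HR",    "East", "2021-01-02", 50),
    ("Bob",     "HR",    "East", "2021-01-02", 100),
    ("Charlie", "Sales", "West", "2021-01-02", 200),
    ("Alice",   "HR",    "East", "2021-01-03", 300),
    ("Bob",     "HR",    "East", "2021-01-03", 400),
    ("Charlie", "Sales", "West", "2021-01-03", 250),
], ["sales_rep", "department", "region", "date", "sales"])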

We can calculate the running total of sales for each sales representative, department, and region using the partitionBy method as follows:

from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window

# window partitioned by all three key columns and ordered by date within each partition
window_spec = Window.partitionBy(["sales_rep", "department", "region"]).orderBy("date")

# running total of sales within each (sales_rep, department, region) partition
sales_df.withColumn("running_total", sum(col("sales")).over(window_spec)).show()

This will give us the following output:

+---------+----------+------+----------+-----+-------------+
|sales_rep|department|region|      date|sales|running_total|
+---------+----------+------+----------+-----+-------------+
|    Alice|        HR|  East|2021-01-01|  100|          100|
|      Bob|        HR|  East|2021-01-01|  200|          200|
|  Charlie|     Sales|  West|2021-01-01|  150|          150|
|    Alice|        HR|  East|2021-01-02|   50|          150|
|      Bob|        HR|  East|2021-01-02|  100|          300|
|  Charlie|     Sales|  West|2021-01-02|  200|          350|
|    Alice|        HR|  East|2021-01-03|  300|          450|
|      Bob|        HR|  East|2021-01-03|  400|          700|
|  Charlie|     Sales|  West|2021-01-03|  250|          600|
+---------+----------+------+----------+-----+-------------+

In this example, we partitioned the data by three columns: sales_rep, department, and region. The orderBy clause is used to order the data within each partition by the date column.
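A note on window frames: when a window specification has an orderBy but no explicit frame, Spark defaults to RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Spelling the frame out makes the running-total intent explicit; with the unique dates in this sample, the rows-based frame below produces the same totals.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum

# equivalent window with the frame made explicit: accumulate from the start
# of each partition up to the current row
explicit_window = (
    Window.partitionBy("sales_rep", "department", "region")
          .orderBy("date")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

sales_df.withColumn("running_total", sum(col("sales")).over(explicit_window)).show()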


Example 3: Cumulative Sales Partitioned by Multiple Columns

Suppose you have a sales dataset with columns store_id, product_id, date, and sales_amount. You want to calculate the cumulative sales for each product in each store, but partitioned by both store_id and product_id.

First, let’s create the sample data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# create SparkSession
spark = SparkSession.builder.appName("PartitionByMultipleColumns").getOrCreate()

# create sample data
sales_data = [("store_1", "product_1", "2022-04-01", 100),
              ("store_1", "product_1", "2022-04-02", 200),
              ("store_1", "product_2", "2022-04-01", 300),
              ("store_1", "product_2", "2022-04-02", 400),
              ("store_2", "product_1", "2022-04-01", 500),
              ("store_2", "product_1", "2022-04-02", 600),
              ("store_2", "product_2", "2022-04-01", 700),
              ("store_2", "product_2", "2022-04-02", 800)]
sales_df = spark.createDataFrame(sales_data, ["store_id", "product_id", "date", "sales_amount"])
sales_df.show()

Output:

+--------+----------+----------+------------+
|store_id|product_id|      date|sales_amount|
+--------+----------+----------+------------+
| store_1| product_1|2022-04-01|         100|
| store_1| product_1|2022-04-02|         200|
| store_1| product_2|2022-04-01|         300|
| store_1| product_2|2022-04-02|         400|
| store_2| product_1|2022-04-01|         500|
| store_2| product_1|2022-04-02|         600|
| store_2| product_2|2022-04-01|         700|
| store_2| product_2|2022-04-02|         800|
+--------+----------+----------+------------+

Now, let’s use the window function to calculate the cumulative sales for each product in each store:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# create window specification
window_spec = Window.partitionBy("store_id", "product_id").orderBy("date").rowsBetween(Window.unboundedPreceding, 0)

# calculate cumulative sales
sales_df = sales_df.withColumn("cumulative_sales", sum(col("sales_amount")).over(window_spec))
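To inspect the result, call show() on the DataFrame. Given the sample rows, the cumulative_sales column should accumulate within each (store_id, product_id) pair; the totals in the comment are expected values derived from the data above, not captured output.

# expected running sums per partition:
# store_1/product_1: 100, 300    store_1/product_2: 300, 700
# store_2/product_1: 500, 1100   store_2/product_2: 700, 1500
sales_df.show()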


Performance of Partitioned Data

The performance of read and write queries on PySpark DataFrames depends on several factors, such as the size of the data, the configuration of the cluster, and the type of queries being executed. In general, partitioned data can be read and written significantly faster than the same data stored without partitioning.

Note that the Window.partitionBy used in the examples above defines logical partitions for window calculations; it does not change how the data is stored. The read and write gains come from physical partitioning, which you get by repartitioning a DataFrame (repartition()) or by partitioning the output on disk (DataFrameWriter.partitionBy()). When data is physically partitioned and a query filters on the partition columns, Spark can skip the irrelevant partitions instead of scanning the entire dataset.
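As a concrete sketch (the output path is illustrative), writing the Example 3 DataFrame partitioned by store_id and product_id lets a later read that filters on those columns prune partitions:

from pyspark.sql.functions import col

# write the data physically partitioned on disk; each (store_id, product_id)
# combination becomes its own subdirectory under the output path
sales_df.write.mode("overwrite") \
    .partitionBy("store_id", "product_id") \
    .parquet("/tmp/sales_partitioned")

# a read that filters on the partition columns only scans the matching
# subdirectories instead of the whole dataset
store1_product1 = (
    spark.read.parquet("/tmp/sales_partitioned")
         .filter((col("store_id") == "store_1") & (col("product_id") == "product_1"))
)
store1_product1.show()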

However, partitioning does have some overhead, such as the additional time needed to write the partitioned data and the extra files and metadata it creates on disk. The decision to use partitioning should therefore be based on the specific use case and the trade-off between query performance and this overhead.

