PySpark Window Functions – Combining Windows and Calling Different Columns

PySpark window functions calculate results such as rank and row number over a range of input rows.

They operate on a group of rows (a frame or a partition) and return a single value for every input row. PySpark SQL supports three kinds of window functions:

  • ranking functions
  • analytic functions
  • aggregate functions

The examples in this post share the following setup:

import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import Window

# Create a spark session
spark_session = SparkSession.builder.getOrCreate()

# let's define a demonstration DataFrame to work on
df_data = {'partition': ['a','a', 'a', 'a', 'b', 'b', 'b', 'c', 'c',],
           'col_1': [1,1,1,1,2,2,2,3,3,], 
           'aggregation': [1,2,3,4,5,6,7,8,9,],
           'ranking': [4,3,2,1,1,1,3,1,5,],
           'lagging': [9,8,7,6,5,4,3,2,1,],
           'cumulative': [1,2,4,6,1,1,1,20,30,],
          }
df_pandas = pd.DataFrame.from_dict(df_data)
# create spark dataframe
df = spark_session.createDataFrame(df_pandas)

df.show()
+---------+-----+-----------+-------+-------+----------+
|partition|col_1|aggregation|ranking|lagging|cumulative|
+---------+-----+-----------+-------+-------+----------+
|        a|    1|          1|      4|      9|         1|
|        a|    1|          2|      3|      8|         2|
|        a|    1|          3|      2|      7|         4|
|        a|    1|          4|      1|      6|         6|
|        b|    2|          5|      1|      5|         1|
|        b|    2|          6|      1|      4|         1|
|        b|    2|          7|      3|      3|         1|
|        c|    3|          8|      1|      2|        20|
|        c|    3|          9|      5|      1|        30|
+---------+-----+-----------+-------+-------+----------+

Combining Windows and Calling Different Columns

You can also combine several windows in one query and apply a window to columns other than its ordering column. These more advanced uses need careful thought to make sure you get the intended results.

A running total also needs a more elaborate window: it must be ordered and given a frame that reaches back to the start of the partition, as here.

# order by 'cumulative' and use rangeBetween for the sum;
# Window.unboundedPreceding extends the frame back to catch all earlier rows
cumulative_window_2 = Window.partitionBy(
  'partition'
).orderBy(
  'cumulative'
).rangeBetween(
  Window.unboundedPreceding, Window.currentRow
)
df_cumulative_2 = df.select(
  'partition', 'cumulative'
).withColumn(
  'cumulative_sum', fn.sum('cumulative').over(cumulative_window_2)
)

df_cumulative_2.show()
# note that rows sharing the same value in the orderBy column (partition 'b') all receive the same sum
+---------+----------+--------------+
|partition|cumulative|cumulative_sum|
+---------+----------+--------------+
|        c|        20|            20|
|        c|        30|            50|
|        b|         1|             3|
|        b|         1|             3|
|        b|         1|             3|
|        a|         1|             1|
|        a|         2|             3|
|        a|         4|             7|
|        a|         6|            13|
+---------+----------+--------------+
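
The identical sums in partition 'b' come from the range frame: rows with equal values in the ordering column count as peers, so each of them sees all the others. The sketch below is a plain-Python emulation of that rule next to a row-based frame. It is not how Spark implements windows, and the helper names running_sum_range and running_sum_rows are made up for this illustration.

```python
# Plain-Python emulation of two window frames; not Spark's implementation.

def running_sum_range(values):
    """Range frame: sum every value <= the current one, so ties are peers."""
    ordered = sorted(values)
    return [sum(v for v in ordered if v <= current) for current in ordered]


def running_sum_rows(values):
    """Rows frame: sum the rows up to and including the current position."""
    ordered = sorted(values)
    totals, total = [], 0
    for v in ordered:
        total += v
        totals.append(total)
    return totals


partition_b = [1, 1, 1]     # 'cumulative' values for partition 'b'
partition_a = [1, 2, 4, 6]  # 'cumulative' values for partition 'a'

print(running_sum_range(partition_b))  # [3, 3, 3]
print(running_sum_rows(partition_b))   # [1, 2, 3]
print(running_sum_range(partition_a))  # [1, 3, 7, 13]
print(running_sum_rows(partition_a))   # [1, 3, 7, 13]
```

The two frames only differ when the ordering column contains ties. In PySpark the row-based variant corresponds to rowsBetween(Window.unboundedPreceding, Window.currentRow) in place of rangeBetween.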

# we can make a window function equivalent to a standard groupBy:

# first define two windows
aggregation_window = Window.partitionBy('partition')
grouping_window = Window.partitionBy('partition').orderBy('aggregation')

# then we can use this window function for our aggregations
df_aggregations = df.select(
  'partition', 'aggregation'
).withColumn(
  # note that we calculate row number over the grouping_window
  'group_rank', fn.row_number().over(grouping_window) 
).withColumn(
  # but we calculate other columns over the aggregation_window
  'aggregation_sum', fn.sum('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_avg', fn.avg('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_min', fn.min('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_max', fn.max('aggregation').over(aggregation_window),
).where(
  fn.col('group_rank') == 1
).select(
  'partition', 
  'aggregation_sum', 
  'aggregation_avg', 
  'aggregation_min', 
  'aggregation_max'
)

df_aggregations.show()

This is equivalent to the much simpler groupBy expression below:

df_groupby = df.select(
  'partition', 'aggregation'
).groupBy(
  'partition'
).agg(
  fn.sum('aggregation').alias('aggregation_sum'),
  fn.avg('aggregation').alias('aggregation_avg'),
  fn.min('aggregation').alias('aggregation_min'),
  fn.max('aggregation').alias('aggregation_max'),
)

df_groupby.show()

Both df_aggregations.show() and df_groupby.show() print the same result (row order may vary):

+---------+---------------+---------------+---------------+---------------+
|partition|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+---------------+---------------+---------------+---------------+
|        c|             17|            8.5|              8|              9|
|        b|             18|            6.0|              5|              7|
|        a|             10|            2.5|              1|              4|
+---------+---------------+---------------+---------------+---------------+
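
To see why the two approaches agree, it can help to trace the logic without Spark. The following is a minimal plain-Python sketch, assuming the same 'partition' and 'aggregation' values as the demonstration DataFrame. The summarise helper and the variable names are hypothetical, and the loops only mimic what the window and groupBy versions compute.

```python
from collections import defaultdict

# (partition, aggregation) pairs from the demonstration DataFrame
rows = [("a", 1), ("a", 2), ("a", 3), ("a", 4),
        ("b", 5), ("b", 6), ("b", 7),
        ("c", 8), ("c", 9)]

# Collect values per partition; this plays the role of partitionBy('partition').
groups = defaultdict(list)
for part, value in rows:
    groups[part].append(value)


def summarise(values):
    """Return (sum, avg, min, max) for one partition."""
    return (sum(values), sum(values) / len(values), min(values), max(values))


# Window style: every row carries its partition's aggregates plus a row number.
windowed = []
for part, values in groups.items():
    for rank, _ in enumerate(sorted(values), start=1):
        windowed.append((part, rank) + summarise(values))

# Keeping only row number 1 leaves one row per partition.
via_window = {row[0]: row[2:] for row in windowed if row[1] == 1}

# groupBy style: one summary per partition, computed directly.
via_groupby = {part: summarise(values) for part, values in groups.items()}

print(via_window == via_groupby)  # True
print(via_groupby["a"])           # (10, 2.5, 1, 4)
```

The window version repeats the same aggregates on every row of a partition and then discards all but one row, which is why the groupBy form is usually the clearer choice when only the summary is needed.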

A window ordered by one column can also be applied to a function that reads a different column.
This only works for functions that take a column argument, such as lag; functions such as row_number take no column and depend only on the window's ordering.

lag_window = Window.partitionBy('partition').orderBy('lagging')
df_lag = df.select(
  'partition', 'lagging', 'cumulative',
).withColumn(
  # lag the same column the window is ordered by
  'lag_the_lagging_col', fn.lag('lagging', 1).over(lag_window)
).withColumn(
  # It is possible to lag a column which was not the orderBy column
  'lag_the_cumulative_col', fn.lag('cumulative', 1).over(lag_window)
)

df_lag.show()
+---------+-------+----------+-------------------+----------------------+
|partition|lagging|cumulative|lag_the_lagging_col|lag_the_cumulative_col|
+---------+-------+----------+-------------------+----------------------+
|        c|      1|        30|               null|                  null|
|        c|      2|        20|                  1|                    30|
|        b|      3|         1|               null|                  null|
|        b|      4|         1|                  3|                     1|
|        b|      5|         1|                  4|                     1|
|        a|      6|         6|               null|                  null|
|        a|      7|         4|                  6|                     6|
|        a|      8|         2|                  7|                     4|
|        a|      9|         1|                  8|                     2|
+---------+-------+----------+-------------------+----------------------+
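
The rule behind this output is that the window decides the row order and the function decides which column to read from the earlier row. Here is a small plain-Python emulation for partition 'a', offered as an illustration only; the lag helper is a made-up stand-in for the idea and is not PySpark's fn.lag.

```python
# Plain-Python emulation of lag over an ordered window; not Spark's implementation.

# Partition 'a' from the demonstration DataFrame
rows = [
    {"lagging": 9, "cumulative": 1},
    {"lagging": 8, "cumulative": 2},
    {"lagging": 7, "cumulative": 4},
    {"lagging": 6, "cumulative": 6},
]


def lag(rows, order_by, column, offset=1):
    """Return `column` from the row `offset` places earlier in `order_by` order."""
    ordered = sorted(rows, key=lambda r: r[order_by])
    return [
        ordered[i - offset][column] if i >= offset else None
        for i in range(len(ordered))
    ]


print(lag(rows, "lagging", "lagging"))     # [None, 6, 7, 8]
print(lag(rows, "lagging", "cumulative"))  # [None, 6, 4, 2]
```

Both calls sort by 'lagging'; only the column that is read back changes, which matches the two lag columns shown for partition 'a' above.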
