和基于Scala Spark API一样,我们也期望在Pyspark中也能够将不同的transformation方法连接起来统一执行。

这篇博客主要描述如何给Pyspark DataFrame添加transform方法,从而支持能够将自定义的DataFrame transformation连接起来。

同时,我们也会介绍如何利用cytoolz来批量顺序执行自定义变换函数

Chaining DataFrame Transformations with Lambda

首先,在原生的Pyspark DataFrame增加transform方法,从而我们能够串联执行DataFrame变换

1
2
3
4
5
6
from pyspark.sql.dataframe import DataFrame

def transform(self, f):
return f(self)

DataFrame.transform = transform

接下来,我们定义一些简单的DataFrame变换方法来测试transform

1
2
3
4
5
def with_greeting(df):
return df.withColumn("greeting", lit("hi"))

def with_something(df, something):
return df.withColumn("something", lit(something))

创建一个DataFrame然后串联执行with_greetingwith_something

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
data = [("jsoe", 1), ("li", 2), ("liz", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])

actual_df = (source_df
.transform(lambda df: with_greeting(df))
.transform(lambda df: with_something(df, "crazy")))

print(actual_df.show())

+----+---+--------+---------+
|name|age|greeting|something|
+----+---+--------+---------+
|jose| 1| hi| crazy|
| li| 2| hi| crazy|
| liz| 3| hi| crazy|
+----+---+--------+---------+

对于只有一个DataFrame参数的自定义变换中,lambda是可以省略的,从而我们可以简化调用方式如下

1
2
3
actual_df = (source_df
.transform(with_greeting)
.transform(lambda df: with_something(df, "crazy")))

如果我们没有定义DataFrame#transform方法,我们就不需要像下面的代码一样,来调用不同的transformation

1
2
df1 = with_greeting(source_df)
actual_df = with_something(df1, "moo")

比较上述代码,采用transform来调用不同的DataFrame的变化,能够避免定义中间DataFrame,从而使得代码更加清晰

接下来,我们进一步探讨transformations的其他定义形式,让整个transform更加清晰明了

Chaining DataFrame Transformations with functools.partial

定义一个with_jacket DataFrame变换,该变换将会增加jacket列到原始DataFrame中

1
2
def with_jacket(word, df):
return df.withColumn("jacket", lit(word))

我们使用上文中同样的source_dfwith_greeting方法,采用functools.partial来连接不同的变换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from functools import partial

actual_df = (source_df
.transform(with_greeting)
.transform(partial(with_jacket, "warm")))

print(actual_df.show())

+----+---+--------+------+
|name|age|greeting|jacket|
+----+---+--------+------+
|jose| 1| hi| warm|
| li| 2| hi| warm|
| liz| 3| hi| warm|
+----+---+--------+------+

从上文可以看出,functools.partial能够帮助我们节省掉写lambda, 不过我们可以做的更好

Defining DataFrame transformations as nested functions

利用嵌套方式来定义DataFrame Transformation,是一种更为优雅的方式解决连接调用,我们定义一个with_funny防暑,并且增加一列funny到DataFrame

1
2
3
4
def with_funny(word):
def inner(df):
return df.withColumn("funny", lit(word))
return inner

同样采用上文中的source_dfwith_greeting

1
2
3
4
5
6
7
8
9
10
11
12
13
actual_df = (source_df
.transform(with_greeting)
.transform(with_funny("haha)))

print(actual_df.show())

+----+---+--------+-----+
|name|age|greeting|funny|
+----+---+--------+-----+
|jose| 1| hi| haha|
| li| 2| hi| haha|
| liz| 3| hi| haha|
+----+---+--------+-----+

以上,我们可以发现,已经可以完全摆脱lambda关键字,调用的方式和Scala API基本一致

Function composition with cytoolz

我们可以在定义DataFrame transformation的时候,增加@curry装饰器,并且利用cytoolz提供的composition函数来运行他们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from cytoolz import curry
from cytoolz.functoolz import compose

@curry
def with_stuff1(arg1, arg2, df):
return df.withColumn("stuff1", lit(f"{arg1} {arg2}"))

@curry
def with_stuff2(arg, df):
return df.withColumn("stuff2", lit(arg))

data = [("jose", 1), ("li", 2), ("liz", 3)]
source_df = spark.createDataFrame(data, ["name", "age"])


pipeline = compose(
with_stuff1("nice", "person"),
with_stuff2("yoyo")
)
actual_df = pipeline(source_df)

print(actual_df.show())
+----+---+------+-----------+
|name|age|stuff2| stuff1|
+----+---+------+-----------+
|jose| 1| yoyo|nice person|
| li| 2| yoyo|nice person|
| liz| 3| yoyo|nice person|
+----+---+------+-----------+

但是需要注意的是,compose函数是从右往左(下往上)执行,因此为了能够满足从上往下执行的习惯,需要做如下修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
pipeline = compose(*reversed([
with_stuff1("nice", "person"),
with_stuff2("yoyo")
]))
actual_df = pipeline(source_df)

print(actual_df.show())
+----+---+-----------+------+
|name|age| stuff1|stuff2|
+----+---+-----------+------+
|jose| 1|nice person| yoyo|
| li| 2|nice person| yoyo|
| liz| 3|nice person| yoyo|
+----+---+-----------+------+