與PySpark的邂逅—資料工程師必備能力

林育銘
10 min readMar 29, 2021

身為一名資料工程師,想必大家對於Python的Pandas套件相當熟悉,用其處理各式各樣的專案需求,通常公司也會配備單台設備規格較佳的虛擬機,供資料工程師處理操作大量的數據。

然而,當數據量達到百萬、甚至是千萬級別時,無論是數據的載入或是運算,往往消耗大量的記憶體,甚至是超出機器的記憶體空間,或者是佔滿機器的CPU,導致運算效率低落;相信讀者第一直覺會想到使用Pandas的chunksize的功能,分批處理數據,然而仍無法有效縮短數據運算的時間。

試想如同時使用多台機器各自處理數據的一部分,最後再匯集成最終結果,這種平行運算的概念,不僅有效降低每一台機器需處理的數據量,也能縮短數據處理的時間,因此分布式運算的工具就此如雨後春筍般誕生,Hadoop的MapReduce工具便是沿著上述概念發展而來,然而近年更火熱的Spark,完全在記憶體內進行運算,加快數據處理的速度,其支援各種不同的程式語言,由於資料工程師較熟悉Python,因此,本文將介紹PySpark的環境建置以及基本的操作,如有錯誤,還煩請指教。

一、Spark環境建置

首先,在Ubuntu 18.04的作業系統,依序安裝Java、Scala和Spark的環境:

  1. Java的安裝
$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk

2. Scala的安裝

$ wget https://downloads.lightbend.com/scala/2.12.8/scala-2.12.8.deb
$ sudo dpkg -i scala-2.12.8.deb

3. Spark的安裝

$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
$ tar xvf spark-2.4.7-bin-hadoop2.7.tgz

4. 設定環境變量 vi ~/.bashrc

export SPARK_HOME=/spark-2.4.7-bin-hadoop2.7
export PATH=${SPARK_HOME}/bin:$PATH

5. 激活環境變量

$ source ~/.bashrc

此時,Spark已經安裝完成,讀者可以輸入pyspark --version確認是否安裝成功。

二、PySpark基本操作

前情提要,Spark的資料型態可以分為三種: 低階的RDD和高階的DataFrame、Dataset(相對於RDD增加資料格式的概念)。在一般實際的PySpark專案開發中,因效能以及開發成本的考量,通常會使用封裝好的高階DataFrame進行開發,如讀者想了解PySpark在操作RDD與DataFrame上效能的差異,可以自行查閱相關文章。

接下來,我們可以使用其提供內建的JSON數據,來熟悉簡單的PySpark語法。

在終端機中,輸入pyspark後,便可以進入交互(shell)模式。首先,我們來加載JSON的數據。

from pyspark.sql.context import SQLContextfrom pyspark import SparkContextsc = sqlContext()sqlContext= SQLContext(sc)df = sqlContext.read.format("json").load("/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/employees.json")

SparkContext是啟動Spark程序的入口點,能建立與集群、資源管理器的溝通,來協調並且執行工作,可以想像為Java的main()函數。

SparkSQL的進入點為SQLContext,用於處理結構化的數據。一但被初始化後,我們就可以在PySpark中開始處理DataFrame與DataSet的操作。

然而,在交互模式下,Spark會自動幫助我們初始化進入點,我們輸入sc便會出現<SparkContext master=local[*] appName=PySparkShell>,我們毋須初始化。

介紹完Spark入口點後,我們在shell模式下,輸入df並無法立即看到數據,僅出現DataFrame[name: string, salary: bigint]的數據格式,由於PySpark有個重要的特性即為惰性機制,如使用Transformation操作,如上述的load()方法,Spark並不會立即觸發執行的機制,但使用Action操作,例如show(), count()等,才會開始進行數據的運算並輸出結果。

首先,查看數據的資料型別:

df.printSchema()

獲取數據的樣貌: (默認顯示前20筆的數據)

df.show()

獲取第一條數據:

df.first()

我們來創建新欄位,並且針對新欄位進行賦值吧!

請讀者注意,由於 Spark 在操作的時候都是以 RDD (Resilient Distributed Datasets) 或是 DataFrame 的形式,若不是這兩個形態是不能互相處理任何事情,例如不能直接賦值,否則會出現以下錯誤:

df= df.withColumn("department", "A")會出現以下錯誤:
AssertionError: col should be Column

正確賦值的寫法:

from pyspark.sql.functions import lit
df = df.withColumn("department", lit("A")) # 單筆賦值
from pyspark.sql.functions import array
inputs = ["A", "B", "C"]
df.withColumn("department", array([lit(x) for x in inputs])).show() #list賦值

我們來根據薪水來進行排序,看每個人的薪資排名!

from pyspark.sql.functions import descdf.orderBy(desc("salary")).select("name", "salary").show()

我們來篩選一下,A部門薪資大於4000元的員工。

from pyspark.sql.functions import col
df.where(col("salary") > 4000).show()

我們再載入相同JSON檔數據,並且為其命名為B部門,並且將公司A、B部門員工匯集在一起。

df2 = sqlContext.read.format("json").load("/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/employees.json")df2 = df2.withColumn("department", lit("B"))
df3 = df.union(df2)

我們根據員工薪資多寡,進行職位的劃分,薪資超過4000元者視為資深工程師。

from pyspark.sql.functions import whendf3 = df3.withColumn("position", when(df3["salary"]>=4000, lit("senior")).otherwise(lit("junior")))df3.show()

我們可以統計每個部門的人員組成。

from pyspark.sql.functions import collect_listdf3.groupBy("department").agg(collect_list("name")).show()

我們可以計算出公司每個月投入多少資金在各部門的人事成本上。

df3 = df3.groupBy("department").agg({"salary":"sum"})df3.withColumn("cost",df3["sum(salary)"].cast("int")).drop("sum(salary)").show()

如果groupBy的ETL邏輯相當複雜,建議可使用其內建提供的@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)groupBy().apply()方法,有興趣者可自行翻閱相關文檔,或者查看筆者的GitHub操作範例進行操作。

查看公司最高和最低薪資。

from pyspark.sql.functions import min, maxdf.select(min("salary"), max("salary")).show()

我們來處理遺漏值吧。

df.na.drop()df.na.fill(0)

我們來為新欄位重新命名,並且刪除欄位!

df.withColumnRenamed("name", "Employee").show()df.drop("salary").show(5)

PySpark內建的方法,可能無法滿足實際的開發需求,在Pandas當中,如我們想針對某一欄位進行自定義的清洗邏輯,通常我們會使用apply()和map()的函數,然而,在PySpark中我們可以使用其提供的套件—UDF。

from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypedef get_position(salary):    if salary >= 4000:        return "senior"    else:        return "junior"get_position_udf= udf(get_position, StringType())df.withColumn("position", get_position_udf("salary")).show()

至此便介紹完PySpark的基本概念與各種常見的操作。如讀者想查看DataFrame更多的操作方法,可以查閱官網關於pyspark.sql.functions的說明

下節將詳細介紹如何透過PySpark串接各式各樣常見的數據倉儲,例如MySQL、MongoDB、Elasticsearch與HBase,如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。

--

--