與PySpark的邂逅—資料工程師必備能力

林育銘
10 min readMar 29, 2021

身為一名資料工程師,想必大家對於Python的Pandas套件相當熟悉,用其處理各式各樣的專案需求,通常公司也會配備單台設備規格較佳的虛擬機,供資料工程師處理操作大量的數據。

然而,當數據量達到百萬、甚至是千萬級別時,無論是數據的載入或是運算,往往消耗大量的記憶體,甚至是超出機器的記憶體空間,或者是佔滿機器的CPU,導致運算效率低落;相信讀者第一直覺會想到使用Pandas的chunksize的功能,分批處理數據,然而仍無法有效縮短數據運算的時間。

試想如同時使用多台機器各自處理數據的一部分,最後再匯集成最終結果,這種平行運算的概念,不僅有效降低每一台機器需處理的數據量,也能縮短數據處理的時間,因此分布式運算的工具就此如雨後春筍般誕生,Hadoop的MapReduce工具便是沿著上述概念發展而來,然而近年更火熱的Spark,完全在記憶體內進行運算,加快數據處理的速度,其支援各種不同的程式語言,由於資料工程師較熟悉Python,因此,本文將介紹PySpark的環境建置以及基本的操作,如有錯誤,還煩請指教。

一、Spark環境建置

首先,在Ubuntu 18.04的作業系統,依序安裝Java、Scala和Spark的環境:

  1. Java的安裝
$ sudo apt-get update
$ sudo apt-get install openjdk-8-jdk

2. Scala的安裝

$ wget https://downloads.lightbend.com/scala/2.12.8/scala-2.12.8.deb
$ sudo dpkg -i scala-2.12.8.deb

3. Spark的安裝

$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
$ tar xvf spark-2.4.7-bin-hadoop2.7.tgz

4. 設定環境變量 vi ~/.bashrc

export SPARK_HOME=/spark-2.4.7-bin-hadoop2.7
export PATH=${SPARK_HOME}/bin:$PATH

5. 激活環境變量

$ source ~/.bashrc

此時,Spark已經安裝完成,讀者可以輸入pyspark --version確認是否安裝成功。

二、PySpark基本操作

前情提要,Spark的資料型態可以分為三種: 低階的RDD和高階的DataFrame、Dataset(相對於RDD增加資料格式的概念)。在一般實際的PySpark專案開發中,因效能以及開發成本的考量,通常會使用封裝好的高階DataFrame進行開發,如讀者想了解PySpark在操作RDD與DataFrame上效能的差異,可以自行查閱相關文章。

接下來,我們可以使用其提供內建的JSON數據,來熟悉簡單的PySpark語法。

在終端機中,輸入pyspark後,便可以進入交互(shell)模式。首先,我們來加載JSON的數據。

from pyspark.sql.context import SQLContextfrom pyspark import SparkContextsc = sqlContext()sqlContext= SQLContext(sc)df = sqlContext.read.format("json").load("/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/employees.json")

SparkContext是啟動Spark程序的入口點,能建立與集群、資源管理器的溝通,來協調並且執行工作,可以想像為Java的main()函數。

SparkSQL的進入點為SQLContext,用於處理結構化的數據。一但被初始化後,我們就可以在PySpark中開始處理DataFrame與DataSet的操作。

然而,在交互模式下,Spark會自動幫助我們初始化進入點,我們輸入sc便會出現<SparkContext master=local[*] appName=PySparkShell>,我們毋須初始化。

介紹完Spark入口點後,我們在shell模式下,輸入df並無法立即看到數據,僅出現DataFrame[name: string, salary: bigint]的數據格式,由於PySpark有個重要的特性即為惰性機制,如使用Transformation操作,如上述的load()方法,Spark並不會立即觸發執行的機制,但使用Action操作,例如show(), count()等,才會開始進行數據的運算並輸出結果。

首先,查看數據的資料型別:

df.printSchema()

獲取數據的樣貌: (默認顯示前20筆的數據)

df.show()

獲取第一條數據:

df.first()

我們來創建新欄位,並且針對新欄位進行賦值吧!

請讀者注意,由於 Spark 在操作的時候都是以 RDD (Resilient Distributed Datasets) 或是 DataFrame 的形式,若不是這兩個形態是不能互相處理任何事情,例如不能直接賦值,否則會出現以下錯誤:

df= df.withColumn("department", "A")會出現以下錯誤:
AssertionError: col should be Column

正確賦值的寫法:

from pyspark.sql.functions import lit
df = df.withColumn("department", lit("A")) # 單筆賦值
from pyspark.sql.functions import array
inputs = ["A", "B", "C"]
df.withColumn("department", array([lit(x) for x in inputs])).show() #list賦值

我們來根據薪水來進行排序,看每個人的薪資排名!

from pyspark.sql.functions import descdf.orderBy(desc("salary")).select("name", "salary").show()

我們來篩選一下,A部門薪資大於4000元的員工。

from pyspark.sql.functions import col
df.where(col("salary") > 4000).show()

我們再載入相同JSON檔數據,並且為其命名為B部門,並且將公司A、B部門員工匯集在一起。

df2 = sqlContext.read.format("json").load("/spark-2.4.7-bin-hadoop2.7/examples/src/main/resources/employees.json")df2 = df2.withColumn("department", lit("B"))
df3 = df.union(df2)

我們根據員工薪資多寡,進行職位的劃分,薪資超過4000元者視為資深工程師。

from pyspark.sql.functions import whendf3 = df3.withColumn("position", when(df3["salary"]>=4000, lit("senior")).otherwise(lit("junior")))df3.show()

我們可以統計每個部門的人員組成。

from pyspark.sql.functions import collect_listdf3.groupBy("department").agg(collect_list("name")).show()

我們可以計算出公司每個月投入多少資金在各部門的人事成本上。

df3 = df3.groupBy("department").agg({"salary":"sum"})df3.withColumn("cost",df3["sum(salary)"].cast("int")).drop("sum(salary)").show()

如果groupBy的ETL邏輯相當複雜,建議可使用其內建提供的@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)groupBy().apply()方法,有興趣者可自行翻閱相關文檔,或者查看筆者的GitHub操作範例進行操作。

查看公司最高和最低薪資。

from pyspark.sql.functions import min, maxdf.select(min("salary"), max("salary")).show()

我們來處理遺漏值吧。

df.na.drop()df.na.fill(0)

我們來為新欄位重新命名,並且刪除欄位!

df.withColumnRenamed("name", "Employee").show()df.drop("salary").show(5)

PySpark內建的方法,可能無法滿足實際的開發需求,在Pandas當中,如我們想針對某一欄位進行自定義的清洗邏輯,通常我們會使用apply()和map()的函數,然而,在PySpark中我們可以使用其提供的套件—UDF。

from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypedef get_position(salary):    if salary >= 4000:        return "senior"    else:        return "junior"get_position_udf= udf(get_position, StringType())df.withColumn("position", get_position_udf("salary")).show()

至此便介紹完PySpark的基本概念與各種常見的操作。如讀者想查看DataFrame更多的操作方法,可以查閱官網關於pyspark.sql.functions的說明

下節將詳細介紹如何透過PySpark串接各式各樣常見的數據倉儲,例如MySQL、MongoDB、Elasticsearch與HBase,如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

林育銘
林育銘

Written by 林育銘

Graduated from NTU, and worked as backend engineer. Github:https://github.com/LinYuMingBejing

No responses yet

Write a response