上一節已詳細說明PySpark的安裝與重要的概念,請參見這裡,這章則會著重介紹PySpark如何串接各式各樣數據庫,包含MySQL、MongoDB、Elasticsearch與HBase。
一、MySQL的串接
關於PySpark如何連結MySQL,讀者可以在終端機輸入pyspark
命令時,在其後面加入--packages mysql:mysql-connector-java:8.0.108
,便可以遠端下載Spark連接MySQL的依賴包,具體操作如下:
$ pyspark --packages mysql:mysql-connector-java:8.0.18
進入交互模式後,我們可以修改MySQL的參數配置,加載MySQL的數據。
mysql_conf = {
"url": "jdbc:mysql://{host}/{schema}?serverTimezone=Asia/Taipei",
"driver": "com.mysql.cj.jdbc.Driver",
"user": "{username}",
"password": "{password}"
}df = sqlContext.read.format("jdbc")\
.options(dbtable="pages", **mysql_conf)\
.load()df.show()

上述參數說明:
- url參數為配置MySQL機器的IP地址,如數據牽涉關於DateType與TimestampType的操作時,則務必加上serverTimezone=Asia/Taipei,如未添加,則默認時區為UTC+0的標準時間;schema則為MySQL使用的Database。
- driver參數請使用com.mysql.cj.jdbc.Driver。
- user參數為MySQL使用的用戶名。
- password參數為MySQL的登入密碼。
- dbtables請配置欲操作的表名。
讀者可以根據自己的MySQL設置,配置相關參數,爾後便可以直接加載MySQL中的數據。
mysql_conf = {
"url": "jdbc:mysql://{host}/{schema}?useServerPrepStmts=false&rewriteBatchedStatements=true",
"driver": "com.mysql.cj.jdbc.Driver",
"user": "{username}",
"password": "{password}"}
df.write.format("jdbc") \
.options(dbtable="selected_pages",truncate="true", **mysql_conf)\
.mode("overwrite") \
.save()
透過PySpark將數據寫入MySQL的方法如上。
二、MongoDB的串接
至於PySpark連線MongnDB的操作如下:
$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
我們來載入MongoDB的數據吧!
spark = SparkSession.builder.master("local")\
.config("spark.mongodb.input.partitioner", "MongoShardedPartitioner")\
.config("spark.mongodb.input.uri", "{mongodb_input_uri}")\
.config("spark.mongodb.output.uri", "{mongodb_output_uri}")\
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.1")\
.getOrCreate()df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("database", "booking")\
.option("collection", "hotel")\
.load()df.show()
上述參數說明:
- spark.mongodb.input.uri為加載MongoDB數據的IP地址。
- spark.mongodb.output.uri為存儲數據至MongoDB的IP地址
讀者可以根據MongoDB的設置,配置spark.mongodb.input.uri、spark.mongodb.output.uri、database與collection的參數,便能載入MongoDB的數據。
MongoDB的寫入如下:
df.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("overwrite")\
.save()
三、Elasticsearch的串接
Elasticsearch是常見的全文搜索引擎資料庫,可以在千萬筆級的數據,完成實時的數據搜尋,有賴於其底層的Lucene機制,關於Elasticsearch的原理,筆者便不在此贅述,讀者有興趣可自行查閱相關文檔。
關於Elasticsearch的連線如下:
$ pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.4.2
首先,讀者可自行配置Elasticsearch的IP地址,以及Index,如數據型態為array的可在es.read.field.as.array.include參數加上該欄位名稱,其餘欄位則添加至es.read.field.include後。
import jsonelasticsearch_conf = {
"es.nodes": "localhost",
"es.port": 9200,
"es.resource": "booking/_doc",
"es.mapping.date.rich": False,
"es.read.field.as.array.include": "tourists",
"es.read.field.include": "pageUrl, hotel, address, city, ratings, description, tourists, facility, created_time"
}
爾後,根據需求,完成Elasticsearch的原生搜尋語句。
query = {
"_source": ["pageUrl, hotel, address, city, ratings, description, tourists, facility"],
"query": {
"bool": {
"must": [
{"term":{"city":city}},
{"exists": {"field": "ratings"}},
{"match_phrase": {"address": address}},
{"range": {
"created_time": {
"gte": start_date, "lte":end_date
}
}
}
]
}
}
}q = json.dumps(query)
再來,定義數據輸出欄位的格式。
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
schema = StructType([
StructField("pageUrl", StringType(), True),
StructField("hotel", StringType(), True),
StructField("address", StringType(), True),
StructField("city", StringType(), True),
StructField("ratings", StringType(), True),
StructField("description", StringType(), True),
StructField("tourists", ArrayType(StringType())),
StructField("facility", ArrayType(StringType()))
])
如此一來,便能把Elasticsearch的數據加載下來。
df_all = sqlContext.read.format("es") \
.options(**elasticsearch_conf) \
.option("es.query", q) \
.schema(schema) \
.load()
df_all.show()
四、HBase的串接
相信使用Hadoop的公司,會將海量的用戶行為日誌數據儲存至HBase大數據倉儲中,然而PySpark如何串接HBase數據庫,以進行後續的ETL,原先使用RDD的方式加載HBase中的數據,出現了數據載入速度過慢以及後續處理仍須將RDD轉為DataFrame等的問題。因此,筆者花了將近一個月的時間,才找到其開源的packages,讀者們可以點進我的Github,將專案clone下來,並且將相關的jar包移到正確的目錄夾下。筆者工作上使用的HBase版本為2.2.4,讀者們可根據實際的HBase版本,自行至網站下載相對應的版本。
$ pyspark --jars shc-core-spark-2.3.0-hbase-2.1.0.jar, hbase-spark-1.2.0-cdh5.7.1.jar,shc-core-1.1.1–2.1-s_2.11.jar,hbase-client-2.2.4.jar,hbase-server-2.2.4.jar
如shc-core-spark-2.3.0-hbase-2.1.0.jar與讀者使用的實際版本不同,則請參考該文章。
關於HBase數據載入的code如下:
catelog = "".join("""{
"table": {"namespace":"default", "name":"footprint"},
"rowkey": "key1",
"columns": {
"key": {"cf":"rowkey", "col":"key1", "type":"string", "length":"12"},
"ip": {"cf":"footprint", "col":"ip", "type":"string"},
"fp": {"cf":"footprint", "col":"fp", "type":"string"},
"url": {"cf":"footprint", "col":"url","type":"string"},
"page_id": {"cf":"footprint”, "col":"page_id", "type":"string"},
"referrer": {"cf":"footprint", "col":"referrer","type":"string"}
}
}""".split())
table參數為HBase的目標Table,cf為HBase的Column Family,col為column,type為數據格式,讀者們可自行修改。
接著,我們就能夠加載HBase的數據了。
data = sqlContext.read\
.format("org.apache.spark.sql.execution.datasources.hbase")\
.options(catalog=catalog) \
.load()
data變量包含所有HBase的數據,此時我們可以根據RowKey篩選需要的數據。
data.createOrReplaceTempView("footprint")
透過createOrReplaceTempView()方法,我們將DataFrame註冊為一個類似MySQL的臨時表,表名為footprint,爾後就可以使用SQL語法,搜尋我們的數據。
df = sqlContext.sql(
"select * from footprint where key > '{start}' and key < '{end}' "
)
df.show()
至此便介紹完PySpark的各種數據串接操作,下節將詳細介紹PySpark如何進行分布式運算的實務操作(筆者將介紹YARN模式),以及效能的調優。如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。