PySpark — 串接MySQL、HBase、Elasticsearch與MongoDB

林育銘
11 min readMar 29, 2021

--

上一節已詳細說明PySpark的安裝與重要的概念,請參見這裡,這章則會著重介紹PySpark如何串接各式各樣數據庫,包含MySQL、MongoDB、Elasticsearch與HBase。

一、MySQL的串接

關於PySpark如何連結MySQL,讀者可以在終端機輸入pyspark命令時,在其後面加入--packages mysql:mysql-connector-java:8.0.108,便可以遠端下載Spark連接MySQL的依賴包,具體操作如下:

$ pyspark --packages mysql:mysql-connector-java:8.0.18

進入交互模式後,我們可以修改MySQL的參數配置,加載MySQL的數據。

mysql_conf = {
"url": "jdbc:mysql://{host}/{schema}?serverTimezone=Asia/Taipei",
"driver": "com.mysql.cj.jdbc.Driver",
"user": "{username}",
"password": "{password}"
}
df = sqlContext.read.format("jdbc")\
.options(dbtable="pages", **mysql_conf)\
.load()
df.show()

上述參數說明:

  1. url參數為配置MySQL機器的IP地址,如數據牽涉關於DateType與TimestampType的操作時,則務必加上serverTimezone=Asia/Taipei,如未添加,則默認時區為UTC+0的標準時間;schema則為MySQL使用的Database。
  2. driver參數請使用com.mysql.cj.jdbc.Driver。
  3. user參數為MySQL使用的用戶名。
  4. password參數為MySQL的登入密碼。
  5. dbtables請配置欲操作的表名。

讀者可以根據自己的MySQL設置,配置相關參數,爾後便可以直接加載MySQL中的數據。

mysql_conf = {
"url": "jdbc:mysql://{host}/{schema}?useServerPrepStmts=false&rewriteBatchedStatements=true",
"driver": "com.mysql.cj.jdbc.Driver",
"user": "{username}",
"password": "{password}"
}
df.write.format("jdbc") \
.options(dbtable="selected_pages",truncate="true", **mysql_conf)\
.mode("overwrite") \
.save()

透過PySpark將數據寫入MySQL的方法如上。

二、MongoDB的串接

至於PySpark連線MongnDB的操作如下:

$ pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

我們來載入MongoDB的數據吧!

spark = SparkSession.builder.master("local")\
.config("spark.mongodb.input.partitioner", "MongoShardedPartitioner")\
.config("spark.mongodb.input.uri", "{mongodb_input_uri}")\
.config("spark.mongodb.output.uri", "{mongodb_output_uri}")\
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.1")\
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
.option("database", "booking")\
.option("collection", "hotel")\
.load()
df.show()

上述參數說明:

  1. spark.mongodb.input.uri為加載MongoDB數據的IP地址。
  2. spark.mongodb.output.uri為存儲數據至MongoDB的IP地址

讀者可以根據MongoDB的設置,配置spark.mongodb.input.uri、spark.mongodb.output.uri、database與collection的參數,便能載入MongoDB的數據。

MongoDB的寫入如下:

df.write.format("com.mongodb.spark.sql.DefaultSource")\
.mode("overwrite")\
.save()

三、Elasticsearch的串接

Elasticsearch是常見的全文搜索引擎資料庫,可以在千萬筆級的數據,完成實時的數據搜尋,有賴於其底層的Lucene機制,關於Elasticsearch的原理,筆者便不在此贅述,讀者有興趣可自行查閱相關文檔。

關於Elasticsearch的連線如下:

$ pyspark --packages org.elasticsearch:elasticsearch-hadoop:7.4.2

首先,讀者可自行配置Elasticsearch的IP地址,以及Index,如數據型態為array的可在es.read.field.as.array.include參數加上該欄位名稱,其餘欄位則添加至es.read.field.include後。

import jsonelasticsearch_conf = {
"es.nodes": "localhost",
"es.port": 9200,
"es.resource": "booking/_doc",
"es.mapping.date.rich": False,
"es.read.field.as.array.include": "tourists",
"es.read.field.include": "pageUrl, hotel, address, city, ratings, description, tourists, facility, created_time"
}

爾後,根據需求,完成Elasticsearch的原生搜尋語句。

query = {
"_source": ["pageUrl, hotel, address, city, ratings, description, tourists, facility"],
"query": {
"bool": {
"must": [
{"term":{"city":city}},
{"exists": {"field": "ratings"}},
{"match_phrase": {"address": address}},
{"range": {
"created_time": {
"gte": start_date, "lte":end_date
}
}
}
]
}
}
}
q = json.dumps(query)

再來,定義數據輸出欄位的格式。

from pyspark.sql.types import StringType, ArrayType, StructType, StructField
schema = StructType([
StructField("pageUrl", StringType(), True),
StructField("hotel", StringType(), True),
StructField("address", StringType(), True),
StructField("city", StringType(), True),
StructField("ratings", StringType(), True),
StructField("description", StringType(), True),
StructField("tourists", ArrayType(StringType())),
StructField("facility", ArrayType(StringType()))
])

如此一來,便能把Elasticsearch的數據加載下來。

df_all = sqlContext.read.format("es") \
.options(**elasticsearch_conf) \
.option("es.query", q) \
.schema(schema) \
.load()
df_all.show()

四、HBase的串接

相信使用Hadoop的公司,會將海量的用戶行為日誌數據儲存至HBase大數據倉儲中,然而PySpark如何串接HBase數據庫,以進行後續的ETL,原先使用RDD的方式加載HBase中的數據,出現了數據載入速度過慢以及後續處理仍須將RDD轉為DataFrame等的問題。因此,筆者花了將近一個月的時間,才找到其開源的packages,讀者們可以點進我的Github,將專案clone下來,並且將相關的jar包移到正確的目錄夾下。筆者工作上使用的HBase版本為2.2.4,讀者們可根據實際的HBase版本,自行至網站下載相對應的版本。

$ pyspark --jars shc-core-spark-2.3.0-hbase-2.1.0.jar, hbase-spark-1.2.0-cdh5.7.1.jar,shc-core-1.1.1–2.1-s_2.11.jar,hbase-client-2.2.4.jar,hbase-server-2.2.4.jar

如shc-core-spark-2.3.0-hbase-2.1.0.jar與讀者使用的實際版本不同,則請參考該文章

關於HBase數據載入的code如下:

catelog = "".join("""{
"table": {"namespace":"default", "name":"footprint"},
"rowkey": "key1",
"columns": {
"key": {"cf":"rowkey", "col":"key1", "type":"string", "length":"12"},
"ip": {"cf":"footprint", "col":"ip", "type":"string"},
"fp": {"cf":"footprint", "col":"fp", "type":"string"},
"url": {"cf":"footprint", "col":"url","type":"string"},
"page_id": {"cf":"footprint”, "col":"page_id", "type":"string"},
"referrer": {"cf":"footprint", "col":"referrer","type":"string"}
}
}""".split())

table參數為HBase的目標Table,cf為HBase的Column Family,col為column,type為數據格式,讀者們可自行修改。

接著,我們就能夠加載HBase的數據了。

data = sqlContext.read\
.format("org.apache.spark.sql.execution.datasources.hbase")\
.options(catalog=catalog) \
.load()

data變量包含所有HBase的數據,此時我們可以根據RowKey篩選需要的數據。

data.createOrReplaceTempView("footprint")

透過createOrReplaceTempView()方法,我們將DataFrame註冊為一個類似MySQL的臨時表,表名為footprint,爾後就可以使用SQL語法,搜尋我們的數據。

df = sqlContext.sql(
"select * from footprint where key > '{start}' and key < '{end}' "
)
df.show()

至此便介紹完PySpark的各種數據串接操作,下節將詳細介紹PySpark如何進行分布式運算的實務操作(筆者將介紹YARN模式),以及效能的調優。如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

林育銘
林育銘

Written by 林育銘

Graduated from NTU, and worked as backend engineer. Github:https://github.com/LinYuMingBejing

No responses yet

Write a response