Airflow部署流程與超實用功能介紹

林育銘
8 min readApr 13, 2021

--

Airflow系列文章,都是使用1.10.12版本,然而在筆者撰寫文章時,Airflow版本已經升級為2.0,許多指令、用法與1.X版本已大相逕庭,請讀者閱讀本文時,務必安裝正確的Airflow版本(1.10.12)。

事先準備工具: (1) MySQL (2) Redis

Airflow DB配置

在上篇文章當中,我們透過SQLite存取Airflow的相關配置,然而僅適用於測試環境。在生產環境下,SQLite可能會有數據丟失的問題,因此強烈建議使用MySQL或PostgreSQL資料庫。

本文將使用MySQL作為存儲Airflow配置的資料庫,請讀者事先安裝好MySQL,並且為Airflow創建專屬的Database與角色權限。

mysql> create database airflow;
mysql> CREATE USER 'airflow' IDENTIFIED BY 'airflow';
mysql> GRANT ALL PRIVILEGES ON airflow.* to 'airflow'@'%' WITH GRANT OPTION;
mysql> flush privileges;

修改sql_alchemy_conn。

sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

安裝MySQL的Python套件。

$(env) pip3 install mysqlclient==2.0.1

輸入以下指令,Airflow便會在MySQL airflow的DB中自動生成相關的table。

$(env) airflow upgradedb

Executor配置

在測試環境下,executor默認為LocalExecutor,一個時間點Airflow最多只會執行一個Task。因此,在生產環境下,需將executor改為CeleryExecutor,由Airflow搭配Celery實現複雜的分布式任務調度,並且以Redis作為Celery的broker與backend。

安裝Celery與Redis套建。

$(env) pip3 install celery
$(env) pip3 install redis

允許以root權限啟動Celery。

$(env) export C_FORCE_ROOT=True

修改 airflow.cfg 相關配置。

# airflow.cfg
executor = CeleryExecutor
broker_url = redis://redis:6379/0
result_backend = redis://localhost:6379/2

其他配置

為了保持Airflow Web UI的簡潔,我們需取消Airflow默認的Dag,以及設定時區為UTC+8,確保任務執行時間與當前時間一致。

# airflow.cfg
load_examples = True
default_timezone = Asia/Taipei

更改完上述參數後,我們需再Upgrade Airflow的db。

部署流程

$(env) export AIRFLOW_HOME="$(pwd)"
$(env) export PYTHONPATH="$(pwd)"

再次說明,AIRFLOW_HOME需設置為當前路徑夾,若未指定,Airflow會自動去用戶的根目錄再次生成配置檔,此時便無法讀取當前路徑夾下的dags/。PYTHONPATH也需設置為當前路徑夾,上節已詳細說明便不贅述。

Airflow Webserver— UI介面

$(env) airflow webserver -p 8080

Airflow Worker —負責執行具體的任務計畫

$(env) airflow worker

Airflow Scheduler — 負責排程,同時監控所有 DAG 的調度計畫

$(env) airflow scheduler

上述指令僅能在前台運行,若在生產環境下,我們可以使用nohup服務執行上述指令,或是使用Linux的Systemd系統管理工具,部署方式可以參見Airflow的文檔(推薦)。

Airflow超實用功能

  1. 全局變量設定 (Admin >> Variables)

打開Airflow UI頁面,並且點擊上方Admin菜單欄下的Variables後,我們便能設置常用的全局變量或是敏感資料,譬如Gmail密碼或API的密鑰等。Create完後,Airflow便會將該數據設置於MySQL中。

請參考下面的代碼,獲取設定的Variables。

from airflow.models import Variable
url = Variable.get('PTTUrl')
print(url)

2. 數據庫連線的設定 (Admin >> Connections)

透過Airflow Connections功能,我們可以為不同的DB設定用戶密碼等連線資料,例如常見的MySQL、PostgreSQL、Redis以及Spark等,相當實用。

(1) 與MySQL連線

from airflow.hooks.mysql_hook import MySqlHook
from sqlalchemy.orm import sessionmaker
# MySQL config
mysqlhook = MySqlHook(mysql_conn_id = 'PTT')<--建立與MySQL的連線前置作業
def create_table(tablename):
connection = mysqlhook.get_conn()
mysqlhook.set_autocommit(connection, True)
cursor = connection.cursor()
sql = """CREATE TABLE IF NOT EXISTS `{}` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`title` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL,
`author` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`board` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`hits` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`url` varchar(4096) COLLATE utf8mb4_unicode_ci NOT NULL,
`posted_date` timestamp COLLATE utf8mb4_unicode_ci NOT NULL,
`description` varchar(4096) COLLATE utf8mb4_unicode_ci NOT NULL,
`created_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci""".format(tablename)
cursor.execute(sql)
cursor.close()

(2) 與Redis連線

from airflow.contrib.hooks.redis_hook import RedisHookredis = RedisHook(redis_conn_id='redis_default').get_conn()     redis.set('last_timestamp', timestamp)

其餘Airflow與數據庫的連線,請自行參考Airflow的文檔。

3. 任務失敗重啟

我們可以針對失敗的單一Task或是整個Dag進行任務重啟,僅需點擊clear鍵,Airflow便會自動重跑該任務。

筆者至此便介紹完Airflow使用,在Github中有筆者結合Scrapy + Airflow完成定時爬取Booking.com住房資訊,讀者可以自行參閱。如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

林育銘
林育銘

Written by 林育銘

Graduated from NTU, and worked as backend engineer. Github:https://github.com/LinYuMingBejing

No responses yet

Write a response