Airflow系列文章,都是使用1.10.12版本,然而在筆者撰寫文章時,Airflow版本已經升級為2.0,許多指令、用法與1.X版本已大相逕庭,請讀者閱讀本文時,務必安裝正確的Airflow版本(1.10.12)。
事先準備工具: (1) MySQL (2) Redis
Airflow DB配置
在上篇文章當中,我們透過SQLite存取Airflow的相關配置,然而僅適用於測試環境。在生產環境下,SQLite可能會有數據丟失的問題,因此強烈建議使用MySQL或PostgreSQL資料庫。
本文將使用MySQL作為存儲Airflow配置的資料庫,請讀者事先安裝好MySQL,並且為Airflow創建專屬的Database與角色權限。
mysql> create database airflow;
mysql> CREATE USER 'airflow' IDENTIFIED BY 'airflow';
mysql> GRANT ALL PRIVILEGES ON airflow.* to 'airflow'@'%' WITH GRANT OPTION;
mysql> flush privileges;
修改sql_alchemy_conn。
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
安裝MySQL的Python套件。
$(env) pip3 install mysqlclient==2.0.1
輸入以下指令,Airflow便會在MySQL airflow的DB中自動生成相關的table。
$(env) airflow upgradedb
Executor配置
在測試環境下,executor默認為LocalExecutor,一個時間點Airflow最多只會執行一個Task。因此,在生產環境下,需將executor改為CeleryExecutor,由Airflow搭配Celery實現複雜的分布式任務調度,並且以Redis作為Celery的broker與backend。
安裝Celery與Redis套建。
$(env) pip3 install celery
$(env) pip3 install redis
允許以root權限啟動Celery。
$(env) export C_FORCE_ROOT=True
修改 airflow.cfg 相關配置。
# airflow.cfg
executor = CeleryExecutorbroker_url = redis://redis:6379/0
result_backend = redis://localhost:6379/2
其他配置
為了保持Airflow Web UI的簡潔,我們需取消Airflow默認的Dag,以及設定時區為UTC+8,確保任務執行時間與當前時間一致。
# airflow.cfg
load_examples = True
default_timezone = Asia/Taipei
更改完上述參數後,我們需再Upgrade Airflow的db。
部署流程
$(env) export AIRFLOW_HOME="$(pwd)"
$(env) export PYTHONPATH="$(pwd)"
再次說明,AIRFLOW_HOME需設置為當前路徑夾,若未指定,Airflow會自動去用戶的根目錄再次生成配置檔,此時便無法讀取當前路徑夾下的dags/。PYTHONPATH也需設置為當前路徑夾,上節已詳細說明便不贅述。
Airflow Webserver— UI介面
$(env) airflow webserver -p 8080
Airflow Worker —負責執行具體的任務計畫
$(env) airflow worker
Airflow Scheduler — 負責排程,同時監控所有 DAG 的調度計畫
$(env) airflow scheduler
上述指令僅能在前台運行,若在生產環境下,我們可以使用nohup服務執行上述指令,或是使用Linux的Systemd系統管理工具,部署方式可以參見Airflow的文檔(推薦)。
Airflow超實用功能
- 全局變量設定 (Admin >> Variables)

打開Airflow UI頁面,並且點擊上方Admin菜單欄下的Variables後,我們便能設置常用的全局變量或是敏感資料,譬如Gmail密碼或API的密鑰等。Create完後,Airflow便會將該數據設置於MySQL中。
請參考下面的代碼,獲取設定的Variables。
from airflow.models import Variable
url = Variable.get('PTTUrl')
print(url)
2. 數據庫連線的設定 (Admin >> Connections)

透過Airflow Connections功能,我們可以為不同的DB設定用戶密碼等連線資料,例如常見的MySQL、PostgreSQL、Redis以及Spark等,相當實用。
(1) 與MySQL連線
from airflow.hooks.mysql_hook import MySqlHook
from sqlalchemy.orm import sessionmaker# MySQL config
mysqlhook = MySqlHook(mysql_conn_id = 'PTT')<--建立與MySQL的連線前置作業def create_table(tablename):
connection = mysqlhook.get_conn()
mysqlhook.set_autocommit(connection, True)
cursor = connection.cursor()
sql = """CREATE TABLE IF NOT EXISTS `{}` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`title` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL,
`author` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`board` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`hits` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL,
`url` varchar(4096) COLLATE utf8mb4_unicode_ci NOT NULL,
`posted_date` timestamp COLLATE utf8mb4_unicode_ci NOT NULL,
`description` varchar(4096) COLLATE utf8mb4_unicode_ci NOT NULL,
`created_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci""".format(tablename)
cursor.execute(sql)
cursor.close()
(2) 與Redis連線
from airflow.contrib.hooks.redis_hook import RedisHookredis = RedisHook(redis_conn_id='redis_default').get_conn() redis.set('last_timestamp', timestamp)
其餘Airflow與數據庫的連線,請自行參考Airflow的文檔。
3. 任務失敗重啟

我們可以針對失敗的單一Task或是整個Dag進行任務重啟,僅需點擊clear鍵,Airflow便會自動重跑該任務。
筆者至此便介紹完Airflow使用,在Github中有筆者結合Scrapy + Airflow完成定時爬取Booking.com住房資訊,讀者可以自行參閱。如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。