為了實現排程任務,例如定時爬蟲、定時統計數據或是定時請求第三方開源API數據等,Python開發人員能夠使用Linux自帶的Crontab工具進行開發,但若遇到任務失敗,由於缺乏可視化頁面,並不利於後續的管理;至於Celery Beat的機制也提供一套定時任務管理工具,同時使用Celery Flower套件提供的UI頁面,查看任務執行狀況等,然而在複雜的資料流場景下,Celery的實現並不友善,而且部署相當複雜。
有鑑於此,本文想與讀者介紹一套由Airbnb開源的數據流管理工具—Apache Airflow,功能相當強大,實現卻相當簡單;筆者先前所待的公司,便是透過Airflow管理內部的數據流,譬如定時抓取Google與YAHOO後台的廣告數據、計算文章單日流量、文章推薦ETL、文章同步至Elasticsearch等排程任務,由於Airflow自帶一套完善的可視化UI頁面,可以迅速查看所有排程日誌以及監測異常任務,使得開發人員更容易維護數據流與除錯。
本文會藉由爬取PTT熱門文章的實例,教導讀者如何安裝與使用Airflow。
Airflow安裝
首先,安裝Apache Airflow的套件。
$(env) pip3 install apache-airflow==1.10.12
Airflow的所有配置,包括Dag觸發時間、Task排程、全局變量設定以及與數據庫連線(譬如MySQL、Redis等)的帳號密碼,皆會存儲於SQLite的資料庫中。
初始化Airflow配置。
$ export AIRFLOW_HOME="${pwd}"
$ airflow initdb
執行完上述指令,Airflow便會在當前的路徑下,自動生成airflow.cfg、airflow.db、dags、logs 與unitests.cfg等檔案與資料夾,目錄結構如下:
demo/ <---目前路徑
├── airflow.cfg
├── airflow.db
├── dags # <--- You have to put your conjob in this folder.
│ └── pttArticle.py
├── logs
│ └── scheduler
├── requirements.txt
├── tasks
│ └── ptt.py
└── unittests.cfg
我們可以啟動Airflow的Webserver,熟悉Airflow的Web UI介面。
$ airflow webserver -p 8080
在瀏覽器輸入http:127.0.0.1:8080,便會進入上述頁面。讀者們會發現有許多Dag排程,係因在airflow.cfg檔案中,load_examples
配置默認為True,若設置為False,便不會出現上述默認的任務。
實現定時的PTT爬蟲任務
Airflow會自動去dags/目錄夾底下讀取.py檔,並且將每個.py檔的設定,譬如每個Dag的執行時間、Dag的完成時間等變量,存儲至SQLite當中。因此,讀者務必將Code寫在dags/目錄夾下。
首先,創建ptt.py檔,並且開始撰寫我們第一個排程任務吧!以下代碼擷取自筆者的Github專案,讀者們可以將專案clone下來,並且進行修改操作。
from airflow import DAG
from airflow.operators.python_operator import PythonOperatorfrom datetime import datetime, timedelta
import json
import pendulum
import requestsfrom tasks.ptt import upsertlocal_tz = pendulum.timezone('Asia/Taipei')
default_args = {
'owner': 'PTT',
'start_date': datetime(2020, 5, 1, 0, 0),
'retries': 3,
'retry_delay': timedelta(minutes=5)
}PTTUrl = 'https://moptt.azurewebsites.net/api/v2/hotpost?b=Gossiping&b=Boy-Girl&b=Beauty&b=marvel&b=WomenTalk&b=movie'headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.67 Safari/537.36',
'Referer': 'https://moptt.tw/',
'Authorization': 'cMIS1Icr95gnR2U19hxO2K7r6mYQ96vp'
}def crawlPTT(**context):
table_name = context['task_instance'].xcom_pull(task_ids='createTable')
table_name = get_table_name(table_name) response = requests.get(PTTUrl, headers=headers)
posts = json.loads(response.text)['posts'] for post in posts:
upsert(post, table_name)with DAG('HotArticle', default_args=default_args,schedule_interval='@hourly') as dag:
crawlPTT = PythonOperator(
task_id = 'crawlPTT',
python_callable = crawlPTT,
provide_context = True
)createTable = PythonOperator(
task_id = 'createTable',
python_callable = create_table,
provide_context = True
)createTable >> crawlPTT
由於我們是在dags/目錄夾底下進行開發,若此時想導入from tasks.ptt import upsert
時,會出現No module name tasks.ptt
的錯誤,此為Python導包路徑的問題,我們可以在命令行輸入以下指令,解決上述問題。
$(env) export PYTHONPATH="$(pwd)"
首先,介紹關於Airflow的變量配置:
1. default_args
(1) owner: 該排程權限擁有者,筆者通常會將owner設定專案名稱。
(2) start_date: 該排程首次執行日期,務必注意需轉換為Asia/Taipei時區。
(3) retries: 當任務失敗後,任務自動重啟的次數。
(4) retry_delay: 當任務失敗後,需間隔X分鐘才會進行任務重試。
2. with DAG(………………..)
(1) HotArticle: 該Dag的名稱。
(2) default_args: 該Dag的相關參數配置。
(3) schedule_interval: 該Dag每隔多久執行一次,其設定方式與Crontab相同。
3. crawlPTT = PythonOperator(……)
(1) PythonOperator: 透過該Operator,使得Python的函式變成Airflow的工作。
(2) task_id: 由於每個Dag是由眾多的task組成,因此必須為每個task命名。
(3) python_callable: 代表要執行的Python函式。
(4) provide_context: 設置為True,便能獲取Airflow的環境變量,譬如任務執行時間。在Python的函式中,則用**context進行接收。
4. createTable >> crawlPTT
:代表createTable任務執行完畢後,再執行crawlPTT的爬蟲任務。
Airflow測試
撰寫完Airflow的Dag後,我們便能測試該段代碼是否正確無誤。
$(env) python dags/crawlPTT.py
測試每個task執行的結果,是否如預期一致。
$(env) airflow test HotArticle crawlPTT 2021-03-01
HotArticle為Dag的名稱,crawlPTT為Task名稱,2021–03–01為任務執行的時間。
當撰寫完ptt.py檔後,其配置會自動顯示於Web UI頁面,我們打開http://127.0.0.1:8000,查看該Dag的排程。
確認與我們預設的排程狀況一致,我們便能拖曳off
,開啟Dag,Airflow便會每個鐘頭爬取PTT熱門文章。
值得注意的是,Web UI右上方顯示的時間為UTC標準時區,因此,開發人員需自行將該時間再往後加8個小時。再者,如PTT爬蟲任務從2021年4月12日14:00:00開始,設定每隔一個小時執行一次任務,若當前為2021年4月12日18:00:00,代表該排程將已經執行4次,LAST RUN時間會顯示為2021年4月12日17:00:00,這點需特加注意,筆者剛接觸Airflow時,經常搞混執行時間與當前時間。
以上便介紹完Airflow的安裝、使用與基本概念,下章會詳細介紹如何部署生產環境下的Airflow,以及Airflow的實用操作,請讀者拭目以待。如讀者認為該篇文章有所助益,可多多分享並且拍手,表示對我的支持與鼓勵。