Apache Airflowに入門した
Jun 16, 2024 09:35 · 2037 words · 5 minute read
仕事でApache Airflowを使うかもしれません。 さっとDockerを使ってローカルで動かしてみたので、試した情報をまとめます。
目次
Airflowとは?Airflowででできること
公式ドキュメント: Apache Airflow
Apache Airflowはバッチ処理やワークフローの自動実行を制御、管理、監視するツールです。
Webインタフェースや認証機能、環境変数から機密情報の管理機能まで備えています。 さらに、タスク管理や依存関係管理、並列実行管理からエラー時の処理までやってくれます。
例えばこんなことができます。
- 特定の処理の定期実行を管理する
- 並列して実行する
- 実行順序を管理、可視化する
- 実行時のログを記録する
- APIアクセス時の認証情報などを安全に管理する
- 実行状況を可視化する(実行にかかった時間やエラー状況など)
- Airflowへのアクセス自体に認証をかける
- BigqueryやS3、Slackなどの外部ツールと連携した処理を管理する
- 上記をWebUIで操作する
これらを自分で実装しようと思うとかなり面倒ですよね。 面倒な部分をAirflowに任せることで、重要な部分の実装に注力できます。 とてもありがたい。
AirflowをDockerで動かす
公式ドキュメント: Running Airflow in Docker — Airflow Documentation (apache.org)
Docker Composeを使うことで、ローカルですぐに環境を構築できます。 本番環境で使う場合はHelmを使ってk8sで実行するのがおすすめだそうです。
ただ、個人的には小規模な環境だったらk8sを使うのは少し大げさな気もするので、まずはDocker Composeで運用を始めてみるのもいいんじゃないかなと考えています。
ドキュメントに従って環境を作ります。 今回はWindows10、WSL2 Ubuntu 22.04 LTS、 Docker Desktopの環境で実行しました。
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'
curl -LfO 'https://raw.githubusercontent.com/apache/airflow/main/.gitignore'
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
docker compose up airflow-init
docker compose up
起動後、http://localhost:8080
にアクセスすると管理画面が表示されます。
ログインすると、User管理や環境変数管理など、管理画面から各種設定ができます。
ActivityのDashboardなども最初から用意されています。
DAGs
公式ドキュメント: DAGs — Airflow Documentation (apache.org)
Airflowでは、DAG (Directed Acyclic Graph) という単位で処理を管理します。各処理の依存関係に合わせて処理実行を制御できます。
先ほどDockerで起動したDAGs画面にアクセスすると、たくさんのサンプルが用意されています。 サンプルが不要な場合は、docker-compose.yamlの「AIRFLOW__CORE__LOAD_EXAMPLES: ‘true’ 」の部分をfalseにするとサンプルのDAGは追加されません。
DAGをクリックすると、DAGの詳細情報や処理の実行順序が図示されています。
DAGでHTTP Requestを送る。ログを確認する
Airflowには、CeleryやRequestsなど、様々なPythonのライブラリが最初から読み込まれています。
サンプルのDAGを作ってみました。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def http_request():
import requests
response = requests.get('http://www.google.com')
print(response.status_code)
print(response.text)
dag = DAG('requests-sample',
description='A simple DAG to show how to use requests library in PythonOperator',
schedule_interval='0 12 * * *',
start_date=datetime(2024, 1, 1),
catchup=False
)
start_task = DummyOperator(task_id='start', dag=dag)
request_task = PythonOperator(
task_id='http_request',
python_callable=http_request,
dag=dag
)
end_task = DummyOperator(task_id='end', dag=dag)
start_task >> request_task >> end_task
DAGのGraphにタスクの依存関係を図示するために、DummyOperatorを作りました。 そしてPythonOperatorでrequestsを使ったHTTP Requestを送信しています。
一覧のTrigger DAGボタンをクリックすると手動で実行できます。
Logのタブでprint()の結果も確認できます。 実際に運用する場合はloggerを使います。
PythonOperatorの処理が終わるのを待つような機能(Sensor)もあるので、「重たい処理を外部APIに任せて、処理が終わってから後続の処理をする」なんてことも簡単に実現できそうです。
例えば重たい処理のAPIをFastAPIで実装したとすると、Airflowによるタスク依存関係の可視化とFastAPIのドキュメント作成機能のおかげで、自分で作るドキュメントの量を減らせて幸せかもしれません。
Operator
HTTP Requestのサンプルコードでは、DummyOperatorとPythonOperatorを使いました。 そのほかにも便利なOperatorがたくさんあります。
Operatorを使うことで、S3やBigQueryのデータを操作したりSlackに結果を通知するなんてこともできます。
- Airflowでよく使う便利なOperator一覧 - 株式会社ライトコード (rightcode.co.jp)
- Airflowはすごいぞ!100行未満で本格的なデータパイプライン #Python - Qiita
最後に
初めてAirflowを触りましたが、とても便利で驚きました。 便利なものはありがたく使わせてもらい、どんどん成果を出していきたいですね。