Apache Airflowに入門した

Jun 16, 2024 09:35 · 2037 words · 5 minute read python docker wsl

仕事でApache Airflowを使うかもしれません。 さっとDockerを使ってローカルで動かしてみたので、試した情報をまとめます。

目次

Airflowとは?Airflowででできること

公式ドキュメント: Apache Airflow

Apache Airflowはバッチ処理やワークフローの自動実行を制御、管理、監視するツールです。

Webインタフェースや認証機能、環境変数から機密情報の管理機能まで備えています。 さらに、タスク管理や依存関係管理、並列実行管理からエラー時の処理までやってくれます。

例えばこんなことができます。

  • 特定の処理の定期実行を管理する
  • 並列して実行する
  • 実行順序を管理、可視化する
  • 実行時のログを記録する
  • APIアクセス時の認証情報などを安全に管理する
  • 実行状況を可視化する(実行にかかった時間やエラー状況など)
  • Airflowへのアクセス自体に認証をかける
  • BigqueryやS3、Slackなどの外部ツールと連携した処理を管理する
  • 上記をWebUIで操作する

これらを自分で実装しようと思うとかなり面倒ですよね。 面倒な部分をAirflowに任せることで、重要な部分の実装に注力できます。 とてもありがたい。

AirflowをDockerで動かす

公式ドキュメント: Running Airflow in Docker — Airflow Documentation (apache.org)

Docker Composeを使うことで、ローカルですぐに環境を構築できます。 本番環境で使う場合はHelmを使ってk8sで実行するのがおすすめだそうです。

ただ、個人的には小規模な環境だったらk8sを使うのは少し大げさな気もするので、まずはDocker Composeで運用を始めてみるのもいいんじゃないかなと考えています。

ドキュメントに従って環境を作ります。 今回はWindows10、WSL2 Ubuntu 22.04 LTS、 Docker Desktopの環境で実行しました。

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.2/docker-compose.yaml'
curl -LfO 'https://raw.githubusercontent.com/apache/airflow/main/.gitignore'
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

docker compose up airflow-init
docker compose up

起動後、http://localhost:8080 にアクセスすると管理画面が表示されます。

ログインすると、User管理や環境変数管理など、管理画面から各種設定ができます。

ActivityのDashboardなども最初から用意されています。

DAGs

公式ドキュメント: DAGs — Airflow Documentation (apache.org)

Airflowでは、DAG (Directed Acyclic Graph) という単位で処理を管理します。各処理の依存関係に合わせて処理実行を制御できます。

先ほどDockerで起動したDAGs画面にアクセスすると、たくさんのサンプルが用意されています。 サンプルが不要な場合は、docker-compose.yamlの「AIRFLOW__CORE__LOAD_EXAMPLES: ‘true’ 」の部分をfalseにするとサンプルのDAGは追加されません。

DAGをクリックすると、DAGの詳細情報や処理の実行順序が図示されています。

DAGでHTTP Requestを送る。ログを確認する

Airflowには、CeleryやRequestsなど、様々なPythonのライブラリが最初から読み込まれています。

サンプルのDAGを作ってみました。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def http_request():
    import requests
    response = requests.get('http://www.google.com')
    print(response.status_code)
    print(response.text)

dag = DAG('requests-sample', 
    description='A simple DAG to show how to use requests library in PythonOperator',
    schedule_interval='0 12 * * *',
    start_date=datetime(2024, 1, 1), 
    catchup=False
)

start_task = DummyOperator(task_id='start', dag=dag)

request_task = PythonOperator(
    task_id='http_request',
    python_callable=http_request,
    dag=dag
)

end_task = DummyOperator(task_id='end', dag=dag)

start_task >> request_task >> end_task

DAGのGraphにタスクの依存関係を図示するために、DummyOperatorを作りました。 そしてPythonOperatorでrequestsを使ったHTTP Requestを送信しています。

一覧のTrigger DAGボタンをクリックすると手動で実行できます。

Logのタブでprint()の結果も確認できます。 実際に運用する場合はloggerを使います。

PythonOperatorの処理が終わるのを待つような機能(Sensor)もあるので、「重たい処理を外部APIに任せて、処理が終わってから後続の処理をする」なんてことも簡単に実現できそうです。

例えば重たい処理のAPIをFastAPIで実装したとすると、Airflowによるタスク依存関係の可視化とFastAPIのドキュメント作成機能のおかげで、自分で作るドキュメントの量を減らせて幸せかもしれません。

Operator

HTTP Requestのサンプルコードでは、DummyOperatorとPythonOperatorを使いました。 そのほかにも便利なOperatorがたくさんあります。

Operatorを使うことで、S3やBigQueryのデータを操作したりSlackに結果を通知するなんてこともできます。

最後に

初めてAirflowを触りましたが、とても便利で驚きました。 便利なものはありがたく使わせてもらい、どんどん成果を出していきたいですね。

tweet Share