BASEプロダクトチームブログ

ネットショップ作成サービス「BASE ( https://thebase.in )」、ショッピングアプリ「BASE ( https://thebase.in/sp )」のプロダクトチームによるブログです。

AirflowをECSからMWAAに移行したらバッチ処理時間が大幅短縮された話

はじめに

こんにちは!Data Platformチームでデータエンジニアとして働いている @shota.imazeki です。

弊チームでは、従来の分析基盤を段階的に刷新する取り組みを進めており、その第一歩として、ECS上で動かしていたAirflowをAWS上のマネージドサービスである Amazon Managed Workflows for Apache Airflow(以下、MWAA)に移行しました。

もともとはインフラ管理の手間を減らすことが目的でしたが、結果としてバッチ処理時間が大幅に短縮されるという意外な効果も得られました。

この記事では、ECS上のAirflowからMWAAへの移行に至った背景や、工夫したポイント、得られた改善効果などを紹介していきます。

移行に至った背景

これまでBASEではECS上でApache Airflow v1を運用していましたが、運用負荷が高く、インフラ周りの管理には別チームの支援をお願いすることもありました。またECS用に稼働しているRDSのストレージ容量が逼迫しつつあったことも大きな課題の一つでした。

また、Airflow v1ではサポートされていないオペレーターや機能があり、ワークフローの設計・実装に制限が課されていました。従来の分析基盤を刷新していくにあたり、これらの制約は大きな障壁になると判断し、基盤の中核を担うオーケストレーションツールであるAirflowから着手することにしました。

その結果、Airflowのメジャーバージョンアップデートにあわせて、インフラ管理の負荷を軽減できるマネージドサービスであるMWAAへの移行を決断しました。

環境構築と移行準備

最初にMWAAの公式ドキュメントを参考にしながら環境構築を行いました。事前に必要なものとしては、バージョニングが有効化されたS3バケットのみで、それ以外のVPCやセキュリティグループ、IAMロールについてはMWAAのマネジメントコンソール上で作成しました。

参考: https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/get-started.html

この環境構築の段階で工夫したことや躓いた点があったため、次にそれらについて触れていきます。

1. SSH鍵やAPIトークンなどの機密情報をSecrets Managerで管理する

ECSで運用していた際は、Airflow上の環境変数やAirflow Variable, ConnectionにSSH鍵やAPIトークンなどを設定して管理してました。ただし、どこにどの情報を置くかの基準が曖昧になっており、管理が煩雑化していたため、移行に合わせて管理方法を見直しました。

MWAAではS3にファイルとして配置して参照する方法もありますが、以下の観点からSecrets Managerに統一しました。

  • セキュリティ管理の観点
  • 情報更新時の作業負荷軽減と安全性向上

MWAAへの移行に伴い、機密性の高い情報は全てSecrets Managerに保存し、Airflowから直接参照する設計 に切り替えました。

MWAAのAirflow設定オプション

MWAA環境構築時に、Airflow設定オプションで[カスタム設定を追加]を選択し、以下のキーと値のペアを追加します。

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

MWAAからSecrets Managerへのアクセス設定

MWAAからSecrets Managerに安全にアクセスできるよう、IAMロールとポリシーを設定しました。セキュリティを考慮して、airflow/* という名前の Secretsのみアクセスできるようにスコープを絞っています。

参考: https://docs.aws.amazon.com/ja_jp/mwaa/latest/userguide/mwaa-create-role.html#mwaa-create-role-attach-json-policy

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager:{リージョン}:{アカウントID}:secret:airflow/*"
        },
        {
            "Effect": "Allow",
            "Action": "secretsmanager:ListSecrets",
            "Resource": "*"
        }
    ]
}

Secrets Manager側の設定

Secrets Managerにて、以下のように名前を付けて情報を保存します。複数のパラメータをまとめたい場合はJSON形式にしておくと、Airflow側で分割して取得する必要がなく便利でした。

  • airflow/connections/SSH_CONNECTION
{
  "conn_id": "SSH_CONNECTION",
  "conn_type": "ssh",
  "host": "192.0.2.0",
  "login": "centos",
  "port": 22,
  "extra": {
    "private_key": "-----BEGIN RSA PRIVATE KEY-----\n<秘密鍵の内容は伏せています>\n-----END RSA PRIVATE KEY-----"
  }
}
  • airflow/connections/GCP_CONNECTION
{
  "conn_type": "google_cloud_platform",
  "extra": {
    "project": "sample-project",
    "keyfile_dict": {
      "type": "service_account",
      "project_id": "sample-project",
      "private_key_id": "abc123def456",
      "private_key": "-----BEGIN PRIVATE KEY-----\n<秘密鍵の内容は伏せています>\n-----END PRIVATE KEY-----\n",
      "client_email": "[email protected]",
      "client_id": "1234567890",
      "auth_uri": "https://accounts.google.com/o/oauth2/auth",
      "token_uri": "https://oauth2.googleapis.com/token",
      "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
      "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/sample-sa%40sample-project.iam.gserviceaccount.com"
    }
  }
}
  • airflow/variables/API_TOKEN
abcdefg1234567890

DAGからの利用例

その後、DAGファイル上で以下のようにコードを書くことでSSH鍵やAPI_TOKENを利用することが可能になります。

from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.models import Variable

# Secrets Manager に保存された SSH Connection を利用
ssh_task = SSHOperator(
    task_id='ssh_task',
    ssh_conn_id='SSH_CONNECTION',
    command="echo 'Hello MWAA!'",
    dag=dag,
)

bq_task = BigQueryExecuteQueryOperator(
    task_id='bq_task',
    sql="select 'Hello MWAA!'",
    use_legacy_sql=False,
    location='asia-northeast1',
    gcp_conn_id='GCP_CONNECTION',
    dag=dag,
)

# Secrets Manager に保存された API トークンを取得
token = Variable.get('API_TOKEN')

これにより、従来の環境変数やAirflow Connection、S3によるファイル管理と比較して、より明確かつ安全に機密情報を一元管理できるようになりました。

2. フォルダ構成の整理

MWAAではDAGファイル以外にも、以下のような共通リソースを利用するため、dagsフォルダ配下に用途ごとに整理しました。

dags/
├── sample_dag1.py          # DAGファイル
├── sample_dag2.py
├── plugins/                # カスタムオペレーター、フックなど
│   └── custom_operator.py
├── sql/                    # クエリファイル
│   └── sample_query.sql
└── common/                 # 共通関数やユーティリティ
    └── utils.py

MWAAのS3バケットでは、dagsフォルダ直下がPythonモジュールパスとして認識されるため、DAGファイルと同じ階層配下(dags/)に共通リソースをすべてまとめています。

そのため、DAGファイル内では以下のようにシンプルにインポートできます。

from common.utils import some_function
from plugins.custom_operator import CustomOperator 

SQLファイルの利用方法

Airflowで利用するクエリは以前から.sqlファイルで管理しており、この運用をMWAAでも継続しています。DAGのtemplate_searchpathにsqlフォルダを設定することでBigQueryExecuteQueryOperatorにファイル名を指定すればクエリを実行できます。

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# DAGオブジェクトの作成(詳細は省略、SQLファイルの検索パスのみ記載)
dag = DAG(
    dag_id='base_dwh',
    template_searchpath=['/usr/local/airflow/dags/sql'],
)

bq_task = BigQueryExecuteQueryOperator(
    task_id='bq_task',
    sql='sample_query.sql',  # sqlディレクトリ配下のファイル
    use_legacy_sql=False,
    location='asia-northeast1',
    gcp_conn_id='GCP_CONNECTION',
    dag=dag,
)

もちろんJinjaテンプレートにも対応しており、DAG実行時に動的に値を埋め込むことも可能です。以下は、Airflowの組み込みマクロ({{ ds }})と、DAG側からparamsを使って渡した値の両方を SQLファイル内で利用する例です。

-- sample_query.sql
SELECT
  column1,
  column2
FROM
  `sample_dataset.sample_table`
WHERE
  DATE(column_date) = '{{ ds }}'
  AND category = '{{ params.category }}'

DAG側では、paramsに値を渡すことで、SQLファイル内のJinjaテンプレートが展開されます。

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

bq_task = BigQueryExecuteQueryOperator(
    task_id='bq_task',
    sql='sample_query.sql',
    use_legacy_sql=False,
    location='asia-northeast1',
    gcp_conn_id='GCP_CONNECTION',
    params={'category': 'category1'}, # params で category を指定する例
    dag=dag,
)

3. GitHub Actions で MWAA への自動デプロイ

ECS時代もCircleCIを使ってCI/CDによるデプロイを行っていましたが、MWAAではS3にファイルを配置する方式になるため、移行にあわせてGitHub Actionsに切り替え、デプロイ方法も見直しました。

構築した内容

  • GitHub Actionsによって以下を自動化しました。
    • mainブランチへのマージ時に、DAGファイル・plugins・共通処理・SQLファイルなどに変更があれば、S3にアップロードする。

以下はそのGitHub Actionsのサンプルです。

name: Deploy MWAA DAGs

on:
  push:
    branches:
      - main
    paths:
      - 'dags/**'

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-region: ap-northeast-1  # 使用している AWS リージョン
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

      - name: Sync DAGs to S3
        run: aws s3 sync dags/ s3://sample-airflow-dag-bucket/dags/ --delete

      - name: List uploaded DAGs
        run: aws s3 ls s3://sample-airflow-dag-bucket/dags/

なお、requirements.txtが更新された場合はMWAA環境の更新が必要です。現在は手動で対応しているものの、今後はGitHub Actionsによる自動反映も検討しています。

ただし環境更新中は一時的にMWAAが停止するため、バッチ処理への影響がないタイミングで実行する必要があり、このあたりは慎重に検討していきたいと考えています。

4. requirements.txt で発生した問題と対処

MWAAでも外部APIを利用するために、requirements.txtにライブラリを記載してインストールしています。ECS時代から利用していたgoogle-ads, facebook-businessなどの広告系API用ライブラリも引き続き使用していました。

発生した問題

MWAAでrequirements.txtを更新した際、次のような問題が発生しました。

  • 一部のライブラリ(google-ads, facebook-businessなど)のインストール時にエラーが発生
  • MWAAはpip installが1つでも失敗した場合、全てのインストールが中断される(All or Nothing)仕様
  • そのため、全てのDAGで必須なapache-airflow-providers-*系ライブラリがインストールされず、DAGが動作しなくなる

原因

Airflow公式が提供しているconstraintsファイルとrequirements.txtに記載したバージョン指定が衝突していました。たとえば、MWAA v2.8.1(Python 3.10)の場合、以下のconstraintsファイルが適用されます。

https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt

このconstraintsファイルに含まれるバージョンと異なるバージョン(google-ads==22.0.0 など)を明示的に指定してしまうと、バージョン衝突によってインストールエラー → 全てのインストールが失敗ということになります。

参考: https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html#constraints-files

対応策

ローカル環境でAirflowのconstraintsを適用した状態でpip installを検証するようにしました。

# 仮想環境を作成
rm -rf venv
python -m venv venv
source venv/bin/activate

# pip / setuptools / wheel を最新化
pip install --upgrade pip setuptools wheel

# constraints を適用してインストール
pip install -r requirements.txt \
  -c https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.10.txt

特に以下に注意しています。

  • MWAAのバージョンごとに対応するconstraintsファイルを適用すること
  • インストールしたいライブラリがconstraintsファイルで制約されているか確認すること
  • 必ずローカルでインストールが成功することを確認してから、MWAAに反映すること

MWAAへの移行作業

デプロイやSecrets管理などの仕組みが整ったので、実際の移行作業自体は比較的シンプルでした。

移行の進め方

  • DAGごとに独立していたため、全てを一度に切り替えるのではなく1つずつ順に移行・動作確認
    • TriggerDagRunOperatorを用いたDAGのみ、まとめて切り替え
    • 影響範囲の小さいDAGから順に移行することでリスクを抑制
  • v1の時は実装上の都合からBigQueryのクエリ実行やSlack通知などで自作していたOperator / Hookを以下の観点からAirflow公式のProviderに置き換え
    • 保守・メンテナンスコストの削減
    • Airflowバージョンアップ時の互換性確保
    • Providerごとに細かな改善・バグ修正が取り込まれている

その他

  • MWAAの動作環境(PythonバージョンやAirflowバージョン)に合わせて、コードの微修正も行いました。
  • 機密情報の管理やDAG配置場所の変更も事前に行っていたため、その部分のコードの微修正のみでDAG本体の移行作業自体はスムーズに完了しました。

移行によって得られた意外な効果

MWAAへの移行を進めた主目的は「インフラ管理の手間削減」でしたが、思わぬ性能改善という効果も得られました。

処理時間の改善

もともと日次バッチとして実行していたデータ連携処理では、1日あたり約14時間の処理時間がかかっていました。MWAAに移行した結果、約9時間で完了するようになり、4〜5時間ほど短縮されました。

詳細に見ていくと、ECS側では各タスクの開始間隔にも約30秒の遅延があり、日次バッチ全体では約500タスク × 30秒 = 約4.2時間の待機時間が発生していました。MWAAではこの待機時間がほぼなくなっており、これによって処理時間が改善されたと考えています。

原因の考察

さらに調査したところ、ECSの方ではExecuterがSequentialExecutor(逐次実行)になっていましたが、MWAAではデフォルトでCeleryExecutor(並列実行)が使用されていたためでした。

ECS時代はシンプルさを優先してSequentialExecutorを使用しており、タスクは1つずつ実行していました。一方、MWAAではCeleryExecutor(またはKubernetesExecutor)がデフォルトで、タスクが依存関係に応じて複数同時に実行される構成となっています。これにより、独立して動作可能なタスクは自動的に並列実行され、タスク間の待機時間も解消されたため、全体の処理時間が大幅に短縮されたと考えています。

なお、ECS時代でもExecutorの設定を変更すれば並列実行による改善は可能だったかもしれません。ただし、そうした調整やメンテナンスを自前で行うことなく、マネージド環境がデフォルトで最適な構成にしてくれるのも、MWAAを選んだ大きなメリットだと感じました。

影響範囲

タスクが並列実行されることによる影響がないか、以下の観点で確認しました。

元データベースへの影響について

MWAAへの移行によって処理の並列化が進みましたが、ETLツールとしてメインで利用しているEmbulk自体は逐次的に実行されるようになっているため、元データベースへの負荷が並列実行で急増することはありませんでした。単純に待機時間が減ったのみで、移行後も元DBへの負荷増加を心配することなく、安定して運用できています。

並列実行によるタスク間の影響

同じテーブルや同じファイルへの出力などをDAGのタスク内で行なっておらず、タスクの依存関係(task1 >> task2)をDAG内で明示していたために同時実行がなされても問題にはなりませんでした。今後もExecutorに依存しない(並列実行されても問題ない)堅牢なDAG設計を継続していきます。

今後の改善点

1. DAGsフォルダ直下のファイル整理

現在、dagsフォルダ直下にDAGファイルだけでなくplugins/, sql/, common/といった複数のフォルダが混在している状態です。

将来的には、DAGファイルと共通リソースをより分かりやすく整理し、管理・メンテナンスしやすい構成に改善したいと考えています。

2. Terraformによるインフラ構成管理

MWAA環境やSecrets Manager、S3バケットなどのAWSリソースをTerraformで管理することで、環境構築や変更の再現性を高め、運用負荷の軽減を図りたいです。

現状は手動で設定している部分も多いため、IaC化による標準化・自動化が望まれます。

おわりに

Airflowの移行によって、運用負荷の軽減だけでなく、思わぬ性能改善も得ることができました。

今後も基盤全体の改善を進めながら、より安定したデータ連携基盤を目指していきます。

最後となりますが、弊社ではデータエンジニアを募集しています。上記で述べた課題以外にもBASEの分析基盤には多くの課題があって、とてもやりがいのある仕事かなと思っております。ご興味のある方は気軽にご応募ください!

open.talentio.com