こんにちは。技術部の自称データエンジニアの池田です。
最近、Amazon Managed Workflows for Apache Airflow (MWAA) を使い倒すことに注力しています。 この記事では、MWAAの環境に任意のツール(バイナリファイル)を送り、BashOperatorで実行する方法について書きます。
背景
弊社は、2014年に次のような宣言をしており、Go言語を社内の主要言語のひとつとして使用しています。
多くのGo言語で書かれたツールは、バイナリファイル一つを配置すればよいという長所があります。
そのため、社内のプロダクトのいたる所で、OSSや内製のツールが活躍しております。
当然のごとく、MWAAでもそれらのツールを利用したくなります。
ところで、MWAAにおいて任意のツールを使ったワークフローを構築するにはどうすればよいでしょう?
- ツール入りのイメージを指定したECS Taskを定義して、MWAAからECSOperatorを使用してRunTaskする。
- ツール入のAWS Lambda関数を作成し、MWAAからPythonOperatorを使用してboto3経由でInvokeFunctionする。
- どうにかして、AirflowのWorkerにツールを送り、BashOperatorで使用する。
比較的様々な事を行い、動作の重たいツールやインストールが複雑なランタイムが必要な場合は1や2が良いでしょう。 特に、PerlやPHP、RubyなどのLLで書かれたプロダクト固有のツールは、ECS Task化しContainerOverrideを利用して、ECSOperatorでの呼び出し時に挙動を変更するのが良いでしょう。 しかし、Go言語で書かれた軽量なツールの場合は、一つのバイナリファイルだけの事が多いです。一つのバイナリファイルだけ入ったECS TaskやLambda関数を用意するのは少し手間です。 どうにかしてMWAAのWorkerにツールを送る事ができれば、BashOperetorから利用できそうです。 今回は、このどうにかして送る手段を調査しました。
どうやってツールをMWAAに送るのか?
実は、その方法は以下のユーザーガイドに書いてありました。 docs.aws.amazon.com
ユーザーズガイドの例では、Oracle InstantClientを送っています。 どのようなものでも、実行形式ファイルを plugins.zipに固めて送る ことで実現できるようです。 なるほど!、という感じですね。では、ユーザーガイドに習いGo言語製のツールをMWAAに送ってみようと思います。
以下のようなディレクトリ構成にします。
. ├── plugins │ ├── __init__.py │ ├── env_var_plugins.py │ └── bin │ └── ssmwrap
今回はMWAAの環境に送るツールとして、弊社の長田 が作成したOSSであるssmwrapを想定しています。 AWS Systems Managerのパラメータストアにアクセスして、値を環境変数に展開してくれる便利なツールです。
env_var_plugins.py はユーザーズガイドをお手本に以下のようにします。
from airflow.plugins_manager import AirflowPlugin import os os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/bin/" class EnvVarPlugin(AirflowPlugin): name = "env_var_plugin"
このあたりは、通常のAirflowのカスタムプラグインの作成と同じような感じですね。
Airflowの2.0.2に移行がお済みの方は、忘れてはいけないことがあります。
MWAA環境のAirflow設定オプションに core.lazy_load_plugins=False
を入れることです。
この設定を忘れると環境変数が設定されてない。という状態に陥ります。
あとは、このpluginsディレクトリをzipで固めてS3アップロードし、MWAAのプラグインとして設定します。
この作業を手動で行うと、AWSコンソールとローカルのShellを往復することになるので、とても面倒です。
以下のようなMakefileを作っておくことをおすすめします。
ARIFLOW_S3_BUCKET_NAME:=<MWAA環境に設定したS3 Bucket名> ARIFLOW_S3_PATH_PREFIX:=<plugins.zip 等を置く場所へのprefix> AIRFLOW_ENV_NAME:=<MWAAの環境名> .PHONY:deploy-dags deploy-dags: aws s3 sync dags/ s3://$(ARIFLOW_S3_BUCKET_NAME)/$(ARIFLOW_S3_PATH_PREFIX)/dags/ \ --exclude "*.pyc" \ --exclude ".vscode*" \ --exclude "tests*" \ --delete --follow-symlinks .PHONY: update-env update-env: plugins-version.txt requirements-version.txt AWS_PAGER='' aws mwaa update-environment --name $(AIRFLOW_ENV_NAME) \ --plugins-s3-path $(ARIFLOW_S3_PATH_PREFIX)/plugins.zip \ --plugins-s3-object-version $(shell cat plugins-version.txt)\ --requirements-s3-path $(ARIFLOW_S3_PATH_PREFIX)/requirements.txt \ --requirements-s3-object-version $(shell cat requirements-version.txt) $(MAKE) clean plugins.zip: $(shell find plugins) cd plugins && \ chmod -R 755 . && \ zip -r ../plugins.zip . requirements-version.txt: requirements.txt aws s3api put-object \ --body $< \ --bucket $(ARIFLOW_S3_BUCKET_NAME) \ --key $(ARIFLOW_S3_PATH_PREFIX)/$< \ --query VersionId --output text > $@ plugins-version.txt: plugins.zip aws s3api put-object \ --body $< \ --bucket $(ARIFLOW_S3_BUCKET_NAME) \ --key $(ARIFLOW_S3_PATH_PREFIX)/$< \ --query VersionId --output text > $@ .PHONY: clean clean: $(RM) -rf plugins.zip $(RM) -rf requirements-version.txt $(RM) -rf plugins-version.txt
makeコマンド一つで環境を更新できるようになります。 環境の更新が終われば、BashOperatorでツールの利用ができるようになります。
実際に使ってみる。
ツールが使えるかどうかや、MWAAの環境に関する調査に便利なDAGを紹介します。
""" 確認用 """ import os from airflow.decorators import dag from airflow.utils.dates import days_ago from airflow.operators.bash import BashOperator dag_name = os.path.basename(__file__).replace(".py", "") default_args = { "depends_on_past": False, "email_on_failure": False, "email_on_retry": False, } @dag( dag_id=dag_name, default_args=default_args, schedule_interval=None, start_date=days_ago(2), max_active_runs=1, doc_md=__doc__) def create_dag(): task = BashOperator( task_id="task", bash_command="{{ dag_run.conf['cmd'] }}" ) dag = create_dag()
このDAGを使ってツールが使えることを確認します。
{ "cmd": "ssmwrap --version" }
このパラメータでDAG Runすると以下のようになります。
無事、ツールをBashOperatorで実行できることを確認しました。
おわりに
この記事では、バイナリが一つだけの軽量ツールをpluginsの一部としてMWAAに送り、BashOperatorで実行してみました。
例えば、Go言語で書いたようなツールはこのような一つのバイナリを置くだけで実行できるので、今回のケースにマッチします。
BashOperatorを組み合わせるようなワークフローの場合は非常に便利になります。
PerlやPHP、Rubyなどのランタイム環境が必要なPython以外のLLでは、あまりおすすめできません。それらの言語で書かれたツールの場合は適宜ECS TaskやLambda関数を定義して、MWAAのOperator経由で起動することをおすすめします。
適切な方法を選択して、様々なワークフローを実装しましょう。