こんにちは! エムスリーエンジニアリンググループ AI・機械学習チームの氏家です。
AI・機械学習チームでは多くのMLプロダクトを開発・運用していますが、そのうちのほとんどがgokartという機械学習向けパイプラインライブラリを使って実装されています。
gokartはとてもMLプロダクトの開発に便利な反面、gokartを100%活用する方法が確立、普及しているとはいえません。 そこで、本記事では、実際にエムスリーで活用しているTipsなどを紹介していこうと思います。 これを機にどんどんgokartの知見がネット上に溢れるようになっていければ嬉しいです。
なお、本記事は「gokartで爆速開発!MLOps勉強会」の発表をもとに加筆したものとなります。
発表資料も公開していますのでそちらもご参照ください。
gokart
gokartとはエムスリーでメンテナンスしている機械学習パイプラインOSSです。 Spotifyが開発しているLuigiのwrapperであり、タスクと呼ばれるクラスを定義し、タスク同士の依存関係を元にパイプラインを管理するツールとなっています。
gokartの特徴のひとつとして、強力なキャッシュ機能があります。 gokartではParameterや依存関係に応じてタスクごとに出力をキャッシュしており、パイプラインを再実行したとき設定が同じなら過去の実行結果を再利用できます。 このキャッシュの利用はパイプライン実行時に自動で行われるため、ユーザーは適切にタスクを設計するだけで、特にキャッシュを意識することなく効率的にパイプラインを実行できます。
また、このキャッシュ機能により、バッチが途中で異常終了した場合には次回実行時に成功済みタスクをスキップすることができます。 これはAWS等のspot instanceと強力なシナジーがあり、インスタンスが強制終了した場合にも成功済みタスクはキャッシュされているため、再実行時には失敗したタスクから処理を再開できます。 これについてはSansanさんの以下の発表がわかりやすいのでぜひご覧ください。
エムスリーではMLプロダクトを効率的に開発・運用すべく、ほとんどすべてのMLプロダクトでgokartが使われており、その中でgokartを使った開発での知見・ハマりポイントが蓄積されてきています。 次項からはその中から特に3つのトピックについて紹介していこうと思います。
Parameterに差分のないタスクを再実行する
前項で紹介したキャッシュ機能は、データのダウンロードや特徴量作成など同じ条件で回すことの多いMLプロダクトにおいて強力な機能ですが、ここに思わぬ落とし穴があります。
あるタスクAにバグがあることが判明し、それを修正することを考えます。 修正をコードに取り込みパイプラインを実行させたとき、本来は修正したタスクAから再実行されてほしいですが、gokartではこの場合再実行されません。 gokartのキャッシュが、Parameter、及びその依存関係からにのみ決定され、処理の中身を考慮していないからです。
このように、Parameterと依存関係でタスクの同一性判定するのは便利な反面、内部処理の変更等によりParameterによらずタスクを再実行したい場合も多々あります。
このときに使える機能を紹介していきます。
バージョンパラメータを挿入する
最も素朴な方法は、バージョンを意味するParameterを導入することです。 このParameterは何かしらの意味があるものではなく、キャッシュ使用を避けるためだけに用いられます。
class TaskA(gokart.TaskOnKart): __version = luigi.IntParameter(default=1)
内部処理を変更した際はdefault値をインクリメントすることで、Parameterの値が変更されタスクが再実行されます。 このdefault値によって明示的にタスク(キャッシュ)のバージョンを管理できるため、プロダクト横断で使用しているタスクなどで特に有用です。
serialized_task_definition_checkを使う
2つ目はserialized_task_definition_check
を使うことです。
serialized_task_definition_checkは、コード自体の変更もタスクの同一判定に用いる機能です。 先ほどgokartはParameterと依存関係のみによりキャッシュが作られると述べましたが、この機能を使うことでコードの変更によってもタスクを再実行させることができます。
class TaskA(gokart.TaskOnKart): serialized_task_definition_check = True
この機能はタスクの試行錯誤等で非常に便利な一方で、キャッシュの判定がコード上から見通しづらくなってしまったり、コメントの挿入などにより思わぬ再実行が行われてしまったりする危険があります。 本番運用では前項のバージョンパラメータを導入するのが良いでしょう。
rerunを使用する
最後に、gokartのrerunという機能を使う手があります。
rerunとは、該当タスクを強制的に再実行する機能です。下記の様にrerun = True
と指定するだけで、たとえParameterや依存関係が同じでも、パイプライン実行のたび該当タスクを再実行させることができます。
class TaskA(gokart.TaskOnKart): rerun = True
これは非常に便利ですが、使いどころが非常に難しい機能です。 というのも、rerunは該当タスクのキャッシュを強制的に削除することで再実行を実現しているからです。 そのため、同一タスクを複数パイプラインで同時に動かした場合に容易にバグを生みます。 例えばタスクAが同時に2つ実行された場合、1つのめのAによってキャッシュが削除された後に2つめのAでキャッシュを読もうとした場合に実行時エラーとなってしまいます。
そのため、リスクを理解した上で運用していく必要があるでしょう。
大規模なパイプラインを見通しよく管理する
これまで見てきたように、gokartはキャッシュ機能を使って大規模なパイプラインの実行を効率化できます。 キャッシュを有効活用するためには適切な粒度でタスクを分割する必要がありますが、分割していくとしばしば依存関係が肥大化してしまいます。
肥大化したタスク群は依存関係を見渡すことが難しくなり、あるタスクの挙動を確認するために定義ジャンプ地獄に陥ることも少なくありません。
肥大化したパイプラインについてもコードベースで依存関係が見通せる様に、エムスリーではPipeline形式と呼ばれる「依存関係を記述するためだけのタスク」を実装しています。
具体的には、下記の様にrequires
に実行したいすべてのの依存関係を記述し、自身はdone
という文字をdumpするのみのタスクを作ります。
class PipelineTask(gokart.TaskOnKart): def requires(self): data = DownaloadData() preprocessed_data = PreprocessData(data=data) .... predictions = Predict(data=preprocessed_data, model=model) return predictions def run(self): self.dump('done')
あとはPipelineTask
を実行することでrequires
に記述された依存タスクがすべて実行されます。
依存関係はPipelineTask
にすべて詰め込まれているため、依存関係の把握にはPipelineTask
を見れば十分となり、コードの可読性が大幅に向上しました。
実際に、新しく作られるパイプラインの大部分がこの形式で作られています。
この形式は以下の記事でも紹介されています。実際のTitanicコンペを題材に実際のPipelineが実装されているので大変参考になります。 www.m3tech.blog
余談ですが、「gokartで爆速開発!MLOps勉強会」でSansanさんでもこの形式が使われていると紹介していただき、同じ課題を持って同じ解決策に行き着いていたことに感動しました。
gokartでのテスト戦略
前項でgokartの可読性に触れましたが、プロダクションレベルで運用するためにテストをはじめとした堅牢な実装も意識する必要がありますよね。
ただ、gokartでのテストは少し面倒です。 というのも、gokartはパイプラインツールという性質上、依存の途中にあるタスクのみを実行することはあまり想定されていません。 そのため、あるタスクのテストをする場合も、愚直には直接の依存タスクをすべてモックした上でパイプラインを実行するサブプロセスを呼び出す必要があります*1。
それでは開発負荷が高いため、テスト自体の品質にも影響を及ぼしかねません。 そのため、エムスリーではテストを簡単に書くため以下の工夫を行なっています。
ロジックを完全に分離させる
まず、タスクが行いたい処理はすべて関数化し、gokartタスクとしての実装と分離しています。 しばしばstaticmethodとして切り出しています。
class TaskA(gokart.TaskOnKart): def run(self): data = self.load_data_frame() self.dump(self._run(data=data)) @staticmethod def _run(data: pd.DataFrame) -> pd.DataFrame: ....
テストは切り出した関数に対して行うことで、依存タスクをモックすることなく挙動のテストを行うようにしています。
依存解析部分をテストする
前項で挙動のテストはできましたが、Parameterの渡し忘れ等は検出できません。
そこで、TaskOnKart.requires
を呼び出すことで、依存解決部分もテストしています。
class TestPipeline(unittest.TestCase): def test_pipeline_run(self): RunPipeline().requires()
TaskOnKart.requires
は該当タスクの依存を解決するメソッドであり、パイプラインの実行自体はされません。
パラメータの渡し忘れがあればTaskOnKart.requires
が依存を解決できずここで落ちてくれるため、実行を待たずunittestでそれを検出できます。
タスクの取り違えをテストする
前項でParameterの渡し忘れについて触れましたが、誤ったParameterやTaskInstanceParameter(gokartタスク用のParameter)を後続に渡してしまう場合もあります。
Paramterの場合、型が異なっていれば前項のTaskOnKart.requires
で落ちてくれますが、TaskInstanceParameterの場合は取り違えたタスクでも依存解決自体は可能なため、テストで検出することは困難です。
例えば、前処理後のデータを後続に渡そうとした場合に、誤って前処理前のデータを渡してしまった場合などです。最悪の場合、データ構造が全く同じなためパイプライン実行後にも気づかない場合もあります。
そのため、TaskInstanceParameterに制約をかけるexpected_type
という機能を用いてタスクの取り違いを防いでいます。
expected_type
はTaskInstanceParameter
の引数であり、ここで指定したクラス以外がParameterとして渡された場合に、依存解決時にエラーとなります。
class TaskA(gokart.TaskOnKart): data = gokart.TaskInstanceParameter(expected_type=TaskB)
これを設定していくことで、期待するタスクが渡されることを実行タスク側で強制でき、またそれを前述のrequires
テストで保証できます。
テストでの利用以外にも、型チェックを実装した基底クラスを強制するなど、痒い所に手が届く機能です。
まとめ
本記事では、エムスリーで実際に使っているgokartのtipsを紹介しました。 gokartを使っている皆さんの助けになれば幸いです。
gokartのポテンシャルを考えれば本記事はtipsのほんの一部で、まだまだgokartの有用な使い方が眠っていると思いますので、我こそはgokartをうまく使っている!という方はどしどしブログ等で紹介ください!
We're hiring!
AI・機械学習チームでは、gokartを使ってMLプロダクトを爆速開発しています!gokartを使ってMLシステムを作っていきたい方は是非ご応募ください! また、エムスリーではgokart(Python)以外にもさまざまな技術スタックで100を超えるプロダクトを開発していますので、ピンときた方は是非ご応募ください。
*1:実際にはgokart.buildというインラインでgokartパイプラインを実行できる機能が最近実装されています。