こんにちは。エムスリーエンジニアリンググループ AI・機械学習チームで機械学習エンジニアをしている農見(@rookzeno) です。最近はgokartを使ったパイプライン開発に勤しんでます。
皆さんはgokartというものをご存知でしょうか。この記事を開く人は知ってそうですが、gokartとはエムスリーがメンテナンスしている機械学習パイプラインOSSです。もしgokartのことを知らなかった人が居たらこのgokartの記事を読んでください。
エムスリー内ではこれを全面的に利用して開発を行なっていますが、その知見は社内に閉じてるものも多いです。そこでエムスリー内でどんな感じでgokartを使ってるかというのをTitanicデータセットを利用して説明していこうと思います。
今回使用したコードはこちら
はじめに
弊社では機械学習プロジェクトは主に以下の流れで行われています。
- BigQueryからデータをダウンロードする
- データを加工整形して予測を作成する
- CloudSQLやRedisにuploadする
- 1~3をpipeline形式でまとめる
そこでTitanicのデータセットでも同じようにするため以下の流れで作成しました。
- Kaggleからデータをダウンロードする
- データを加工整形して予測を作成する
- Kaggleにsubmitする
- 1~3をpipeline形式でまとめる
全体pipelineの作成
pipeline部分から見たほうが分かりやすいので、最初にpipeline形式の説明をします。pipeline形式とはrequireに全てを書く以下の書き方のことです。最近の社内ではこの形式で書くことが多いです。
上から各タスクを見ればいいので見やすい形式かなと思ってます。
import luigi from titanickart.processing.download_data import DownloadData from titanickart.processing.make_features import MakeFeatures from titanickart.processing.model import PredictXGBoostModel from titanickart.processing.submit import SubmitData from titanickart.task_template import TitanicKart class TitanicKartPipeline(TitanicKart): submit: bool = luigi.BoolParameter() username: str = luigi.Parameter() api_key: str = luigi.Parameter(significant=False) def requires(self): data = DownloadData(username=self.username, api_key=self.api_key) processed_data = MakeFeatures(data=data) submission = PredictXGBoostModel(data=processed_data) dummy = SubmitData(submission=submission, submit=self.submit) return dummy def run(self): self.dump('finished')
task_templateのTitanicKartについてはこれです。task_namespaceを分けておくとキャッシュがわかりやすくなるのでおすすめです。
class TitanicKart(gokart.TaskOnKart): task_namespace = 'TitanicKart' def __init__(self, *args, **kwargs): super(TitanicKart, self).__init__(*args, **kwargs) self.logger = getLogger(self.__module__)
では各タスクの詳細に入っていきましょう。
1. データのダウンロード
Kaggle APIを利用してTitanicのデータをダウンロードしました。init_on_kaggleは認証に必要な部分を作る関数です。Kaggle APIに関してはこのnotebookを参考にしました。
ダウンロードしたデータをpandasで読み込んで次のタスクに渡しています。
api_keyでsignificant=FalseにしてるのはFalseにしないとapi_keyがlogに出てしまうからです。注意しましょう。
class DownloadData(TitanicKart): username: str = luigi.Parameter() api_key: str = luigi.Parameter(significant=False) def run(self): self.dump(self._run(self.username, self.api_key)) @classmethod def _run(cls, username: str, api_key: str) -> pd.DataFrame: from kaggle import KaggleApi # because of an error in __init__.py cls.init_on_kaggle(username, api_key) api = KaggleApi() api.authenticate() api.competition_download_file('titanic', 'train.csv', path='tmp_download') api.competition_download_file('titanic', 'test.csv', path='tmp_download') train = pd.read_csv('tmp_download/train.csv') test = pd.read_csv('tmp_download/test.csv') data = pd.concat([train, test]) return data @staticmethod def init_on_kaggle(username, api_key): KAGGLE_CONFIG_DIR = os.path.join(os.path.expandvars('$HOME'), '.kaggle') try: os.makedirs(KAGGLE_CONFIG_DIR) except FileExistsError: return 0 api_dict = {'username': username, 'key': api_key} with open(f'{KAGGLE_CONFIG_DIR}/kaggle.json', 'w', encoding='utf-8') as f: json.dump(api_dict, f) cmd = f'chmod 600 {KAGGLE_CONFIG_DIR}/kaggle.json' output = subprocess.check_output(cmd.split(' ')) output = output.decode(encoding='UTF-8') return 0
2. 特徴量生成とモデリング
今回は特徴量生成は適当でこれだけです。
@staticmethod def _make_features(data: pd.DataFrame, use_columns: list) -> pd.DataFrame: data['Sex'] = data['Sex'].apply(lambda x: x == 'male').astype(int) data = pd.get_dummies(data=data, columns=['Embarked']) return data[use_columns]
モデル部分は学習部分と推論部分を分けて作成しています。今回は意味はないですが、推論データだけを変えたい時に再学習せずに推論できるメリットがあります。
class TrainXGBoostModel(TitanicKart): data = gokart.TaskInstanceParameter() random_seed: int = luigi.IntParameter(default=42) def requires(self): return dict(data=self.data) def run(self): data = self.load_data_frame('data') self.dump(self._train_xgb_model(data, self.random_seed)) @staticmethod def _train_xgb_model(data: pd.DataFrame, random_seed: int) -> xgboost.Booster: train = data[data['Survived'].notnull()] X_train, X_valid, y_train, y_valid = train_test_split(train.drop(['PassengerId', 'Survived'], axis=1), train['Survived'], test_size=0.2, random_state=random_seed) dtrain = xgboost.DMatrix(X_train, label=y_train) dvalid = xgboost.DMatrix(X_valid, label=y_valid) params = {'objective': 'binary:logistic'} model = xgboost.train( params=params, dtrain=dtrain, num_boost_round=200, evals=[(dtrain, 'train'), (dvalid, 'valid')], verbose_eval=50, ) return model class PredictXGBoostModel(TitanicKart): data = gokart.TaskInstanceParameter() def requires(self): model = TrainXGBoostModel(data=self.data) return dict(data=self.data, model=model) def run(self): data = self.load_data_frame('data') model = self.load('model') self.dump(self._predict_xgb_model(data, model)) @staticmethod def _predict_xgb_model(data: pd.DataFrame, model=xgboost.Booster) -> pd.DataFrame: test = data[data['Survived'].isnull()] dtest = xgboost.DMatrix(test.drop(['PassengerId', 'Survived'], axis=1)) test['Survived'] = model.predict(dtest) test['Survived'] = test['Survived'].apply(lambda x: 1 if x > 0.5 else 0) return test[['PassengerId', 'Survived']]
3. submitする
Kaggle APIを使ってsubmitします。このコードのスコアは0.73684でした。14450/15000位、低い……
class SubmitData(TitanicKart): submission = gokart.TaskInstanceParameter() submit = luigi.BoolParameter() def requires(self): return dict(submission=self.submission) def run(self): if self.submit: self.dump(self._run(self.load_data_frame('submission'))) else: self.dump('not submitted') @staticmethod def _run(submission: pd.DataFrame) -> pd.DataFrame: from kaggle import KaggleApi # because of an error in __init__.py api = KaggleApi() api.authenticate() submission.to_csv('titatic_submission.csv', index=False) csv_file_path = 'titatic_submission.csv' message = 'titanic submission' competition_id = 'titanic' api.competition_submit(csv_file_path, message, competition_id) return 'submitted'
pipelineを呼び出すスクリプト
最後にpipelineを実行するスクリプトを作れば完成です。pythonからこのコードを呼べば一連の流れが全て動きます。
ここでgokartが便利なところを挙げるとsubmitがFalseで回した後に、Trueで回すと途中までのキャッシュを使用してSubmitDataだけを動かすところです。再度ダウンロードも学習もしない、便利すぎますね。
def main() -> int: parser = argparse.ArgumentParser() parser.add_argument('--submit', action='store_true') args = parser.parse_args() gokart.add_config('./conf/base.ini') task = TitanicKartPipeline(submit=args.submit) gokart.build(task, log_level=logging.DEBUG) return 0 if __name__ == '__main__': sys.exit(main())
ここでconfを読み込んでいるんですが、confにこれを書いてるとTitanicKartPipelineで環境変数から拾ってきたusernameとapi_keyを使ってくれるので便利です。
[TitanicKart.TitanicKartPipeline] username = ${KAGGLE_USERNAME} api_key = ${KAGGLE_KEY}
まとめ
Titanicデータセットを使って実務っぽい感じのgokart pipelineを作成しました。gokartを使うと形式が固定されるのでコードが読みやすくなること、再現性が保証できるメリットがあります。皆さんも使ってみてはいかがでしょうか。
We are hiring!!
ここでgokartについて書きましたが、弊社に入れば勝手に覚えるので入社して覚えるのがおすすめです。 僕も入社するまでは全くgokartに触ったことはありませんでしたが、今は使えるようになりました。
以下のURLからカジュアル面談をお待ちしています!