これはエムスリー Advent Calendar 2021の8日目の記事です。前日は @AkiraGoto による、 そのEFSって自動バックアップでいいんでしたっけ? でした。
AI・機械学習チームで2021年新卒の北川(@kitagry)です。 最近はチームの人にステッパーを買わせまくっています笑 リモートワークで運動不足のエンジニアにおすすめです。
今日は趣味で作っているKubernetesのカスタムコントローラーの話をしようと思います。
モチベーション
今回作るものはあるCronJobの実行が終わった時に、実行されるJobを作成しようと思います。 似たものとしてはArgo Eventsというものがあります。 Argo EventsはEventSourceとSensorsの組み合わせによって、外部のイベントからJobを実行できます。 できるイベントにはWebhook, GCP Pub/Sub, AWS SNS等様々なイベントを登録できます。 この機能はとても便利なのですが、一方でキャッチアップが大変です。 そのため、チームで使う用のミニマルなワークフローエンジンを作ってみたいと思って作り始めました。 現在はまだ安定版とは行かないので個人で試しに作っているだけですが、将来的にはチームでも使えたら面白いかなと思います。
仕様を決める!
弊チームでは定時バッチを動かす時にCronJobを使っています。 そこで、CronJobに紐づくJobが正常終了したときに、実行されるJobのカスタムリソースを作成します。
CronJobに紐づくJobが終了したかどうかは以下の2通りの方法で取得できます。
- JobのStatusの変更を確認し、それに紐づくCronJobを見る
- CronJob Controllerが出すEventリソースを確認し、それに紐づくCronJobを見る
この記事では2つ目の方をどのように作成するかについて見ていきます。(というか1つ目の方法はこの記事を書きながら思いつきました。。) KubernetesのCronJob Controllerは以下のようにJobの作成、Jobの正常終了・異常終了などを知らせるEventを供給します。
$ kubectl get events | grep cronjob LAST SEEN TYPE REASON OBJECT MESSAGE 8m55s Normal SuccessfulCreate cronjob/hello Created job hello-27210634 8m51s Normal SawCompletedJob cronjob/hello Saw completed job: hello-27210634, status: Complete 7m12s Normal SuccessfulCreate cronjob/hello Created job hello-27210636 63s Normal SawCompletedJob cronjob/hello Saw completed job: hello-27210636, status: Failed
これによってCronJobが実行されたことや正常に終了したといったことを検知できます。 弊チームではこのEventを使用してCronJobが指定された時間までに終了しているかなどをモニタリングしています。 よろしければ以前書いたブログを読んでください。
話をもどすと、このEventを監視してSawCompletedJobが出た時に自身で設定したJobを作成すれば良いです。
ただし、ここで1つ問題があります。 今回作成するCustom ResourceはターゲットとなるCronJobやEventとは所有関係にないのでシンプルな作り方ではEventが作成されたときに、Reconcile処理が走ることはありません。 そのため、外部ResourceをWatchしてReconcileを起動する必要があります。
このような外部リソースをWatchしてReconcile処理を起動する方法がkubebuilderに用意されています。1
具体的に言うと Watches
メソッドを使うことによって、外部リソースをトリガーとしたReconcileを走らせることができます。
この Watches
メソッドについては、別のアドベントカレンダー(カスタムコントローラーで任意のイベントを起点にReconcileを実行する)で詳しく書かれていたので、ここでは簡単にだけ説明します。
func (blder *Builder) Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder
Watchメソッドは起点となるイベントを供給するsrcとそれらを加工してReconcileのリクエストに修正するeventhandlerを引数に持ちます。
今回のようにKubernetes内のEventリソースを監視したい場合には、 &source.Kind{Type: &corev1.Event{}}
を指定することで、 &corev1.Event
が作成(更新)されるたびにイベントが供給されます。
eventhandlerでは &corev1.Event
に紐づく、自身のCustomResourceへのReconcileRequestを返すhandlerを作成します。
他は、普通にカスタムコントローラーを作成するのとほとんど変わりません。 では実際にどのように作成するかを見ていきましょう。
実装する!
簡単にどのように作成するかについて説明していきます。 実際に作ったものは以下のリポジトリにあります。
CustomResourceを定義
今回作成するCustomResourceの定義( /api/v1/eventjob_types.go
)は以下のようにします。
SpecにはどのEventを起点にJobを作成するかを決める Trigger
と、Eventによって作成される JobTemplate
をフィールドに持ちます。
type EventJobTriggerType string const ( // CronJobが通常終了した時のTrigger CompleteTrigger EventJobTriggerType = "Complete" // CronJobが異常終了した時のTrigger FailedTrigger EventJobTriggerType = "Failed" ) // CronJobのTriggerタイプ type EventJobTrigger struct { // apiVersionとKindを指定する metav1.TypeMeta `json:",inline"` // リソースの名前 Name string `json:"name"` // 終了のTrigger, 今回は Complete か Failed Type EventJobTriggerType `json:"type"` } func (t EventJobTrigger) Match(e corev1.Event) bool { return strings.HasSuffix(e.Message, fmt.Sprintf("status: %s", t.Type)) && e.InvolvedObject.Name == t.Name } // 今回のCustom Resourceのspec type EventJobSpec struct { // Trigger Trigger EventJobTrigger `json:"trigger"` // triggerで作成されるjobのテンプレート JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"` }
Controllerのセットアップ
次に SetupWithManager
メソッドでEventJobをManagerに登録する処理をします。
ここで先ほどの Watches
メソッドを呼び出しています。
const eventTriggerField = ".spec.trigger" func (r *EventJobReconciler) SetupWithManager(mgr ctrl.Manager) error { // fieldIndexにセットしておく if err := mgr.GetFieldIndexer().IndexField(context.Background(), &batchv1.EventJob{}, eventTriggerField, func(rawObj client.Object) []string { eventJob := rawObj.(*batchv1.EventJob) if eventJob.Spec.Trigger.APIVersion == "" || eventJob.Spec.Trigger.Kind == "" { return nil } return []string{formatTrigger(eventJob.Spec.Trigger.APIVersion, eventJob.Spec.Trigger.Kind)} }); err != nil { return err } return ctrl.NewControllerManagedBy(mgr). For(&batchv1.EventJob{}). Owns(&k8sbatchv1.Job{}). Watches( &source.Kind{Type: &corev1.Event{}}, // EventリソースをWatchすることを指定する handler.EnqueueRequestsFromMapFunc(r.findObjectsForEvent), // findObjectsForEventによってRequestをEnqueueする ). Complete(r) }
次が Watches
メソッドのeventhandlerとして渡していたメソッドになります。
ここでEventに紐づくEventJobを探して、Requestを作成しています。
// Eventに対して、該当するEventJobへのreconcile.Requestを返す func (r *EventJobReconciler) findObjectsForEvent(object client.Object) []reconcile.Request { event := object.(*corev1.Event) // eventに該当するEventJobListを取得する listOps := &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector(eventTriggerField, formatTrigger(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)), Namespace: event.Namespace, } var attachedEventJobs batchv1.EventJobList err := r.List( context.Background(), &attachedEventJobs, &client.ListOptions{ FieldSelector: fields.OneTermEqualSelector(eventTriggerField, formatTrigger(event.InvolvedObject.APIVersion, event.InvolvedObject.Kind)), Namespace: event.Namespace, }) if err != nil { return []reconcile.Request{} } // EventがEventJobのTriggerにマッチするかを確認する requests := make([]reconcile.Request, 0, len(attachedEventJobs.Items)) for _, eventJob := range attachedEventJobs.Items { if !eventJob.Spec.Trigger.Match(*event) { continue } requests = append(requests, reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: eventJob.Namespace, Name: eventJob.Name, }, }) } return requests }
あとは Reconcile で JobTemplate から Job を作成するだけです。 こちらは意外と長いのと、特に他のカスタムコントローラーで行うのと変わりないため割愛します。 詳しくはリポジトリを見ていただければと思います。
実際に動かしてみる!
動作確認のために1分毎にJobを作成するCronJobを用意します。
apiVersion: batch/v1 kind: CronJob metadata: name: hello spec: schedule: "* * * * *" jobTemplate: spec: template: spec: containers: - name: hello image: busybox command: - echo - 'Hello' restartPolicy: Never
次に今回作成したEventJobを用意します。
apiVersion: batch.kitagry.github.io/v1 kind: EventJob metadata: name: eventjob-sample spec: trigger: apiVersion: batch/v1 kind: CronJob name: hello type: Complete jobTemplate: spec: template: spec: containers: - name: hello-event-job image: busybox command: - /bin/sh - -c - sleep 120s && echo "Hello World" restartPolicy: OnFailure
これら2つをapplyして大人しく待っていると、CronJobのPodが終了した後にEventJobのJobが作成できたことが確認できます。
$ kubectl get pods NAME READY STATUS RESTARTS AGE eventjob-sample-xa4av-pzp5j 0/1 ContainerCreating 0 2s hello-27311583-9khjc 0/1 Completed 0 6s
これであるCronJobがタスクを実行した後に起動するJobができました! これを応用すれば、複数のCronJobを待って実行するタスクや自分自身が作成したJobを待って終了時にJobを作成するなどいろいろ出来そうです。夢が広がりますね!
まとめ
今回はCronJobの終了EventからJobを作成するCustom Controllerを作成しました。 その過程で、所有関係にないResourceのイベントからReconcile処理を走らせる方法について学びました。 これを応用すればいろいろ面白いCustomResourceが作成できそうです!
We are hiring!!
現在AI・機械学習チームのサービスはほとんどがGKE上で動いています。 そこで、ますますKubernetesなどの知見を持った人が必要です! これらのことに興味あるかたと一緒に働きたいです!!