この記事はエムスリーAdvent Calendar 2023の13日目の記事です。
こんにちは、製薬企業向けプラットフォームチームエンジニアの桑原です。
前回のJJUG CCC の登壇についてのブログで Axon Framework について軽く触れました。今回はAxon Frameworkがどのようなもので、どういった使い方をするかを紹介したいと思います。
背景:CommandとQueryに最適なモデルが異なる
上述のリンクで紹介したメッセージ配信システムの設計では、アプリケーション特性の違いからメッセージ配信を担うCommandとメッセージ閲覧を担うQueryに分けました。 結果的にCQRSのようになりましたが、最初からCQRSを目指していたわけではなく、データモデルは1つのものを考えていました。
CommandとEvent追記型との相性は良かった
データモデリングはイミュータブルモデルの考えを参考に、リソースとイベントに分けて考えました。イベントは更新や上書きせずに追記する考え方はCommandの実装ととても相性が良かったです。
QueryがEvent追記型との相性は良くなかった
メッセージには 送信,開封,削除 などのイベントが紐付けられ、1つのメッセージに対して複数の開封イベントが発生することもあります。
そして以下のようなテーブルの状態になってる場合を想定します。
message_id | title | body |
---|---|---|
1 | hello axon |
this is message body |
2 | not expected to be read |
this message is not expected to be read |
message_id | read_datetime |
---|---|
1 | 2023-12-13T10:00:00+09:00 |
2 | 2023-12-13T11:00:00+09:00 |
1 | 2023-12-13T12:00:00+09:00 |
message_id | canceled_datetime |
---|---|
2 | 2023-12-13T12:00:00+09:00 |
message_id:1 に対しては開封レコードが2件入ってます。
message_id:2 に対しては開封レコードが入っているもののその後開封取り消しされています。
この状態で、メッセージ一覧画面などで現在有効な未読一覧を検索したい場合、一筋縄ではいかなくなります。
SQLで表現可能?
message
,message_read
,message_read_canceled
を結合し、message_read
にレコードがなければ未読。 またはmessage_read_canceled
の最新日がmessage_read
の最新日付よりも未来日であれば未読。
などをSQLで表現するのはとても骨が折れそうです。アプリで現在状態の導出?
またはアプリでイベントを取得し直して再計算すれば実現はできそうですが性能が満足に出るとは到底思えません。
苦肉の解決策
そのままだとクエリに耐えられないことは容易に想像できたので、メッセージ配信システムではイベントを保存すると同時にクエリ用のステータスも更新することにしております。
message_id | title | body | status |
---|---|---|---|
1 | hello axon |
this is message body |
READ |
2 | not expected to be read |
this message is not expected to be read |
UNREAD |
せっかくイベントの追記してるのに、状態の更新もしないといけないのは二度手間感も少々ありますね。
事実を記録するという観点でイベントをそのまま追記していくのはとても有効な設計なのですが、やはりそのままだとクエリで厳しくなり、データモデルにステータスなどをもたせるなどの対応が必要になってしまいそうです。かといってクエリ用のリードモデルの作成となると一気に考えることが増えそうです。
Axon Framework
前回の JJUG CCC 2023 Fall で似た内容でこの悩みを解決されていた事例がありました。
実践Pub/Subマイクロサービス――SpringとAxonで作る疎結合でスケーラブルなシステム
このセッションでAxon Framework が使用されており、とても面白そうだったので最近私も家で遊び始めてます。
ざっくりアーキテクチャ
Axon Framework はDDDとCQRSをベースとしたマイクロサービス用のフレームワークです。Axon Frameworkとは別にAxon Serverも必要になり、Axon Framework は Axon Server とのやり取りを隠蔽してくれる立ち位置にいます。ざっくりとしたアーキテクチャは Architecture Overview の図が参考になると思います。
開発者が意識すべきは主に3点です。
- Commandを受け取って(生成して)Axon Server に渡す。
- Axon Server から通知されるEventを受け取り、read用のモデルReadModelを生成する。
- QueryはReadModelから欲しい情報を取得する。
イベントの保存と送受信を全部Axon Frameworkがやってくれるのでビジネスロジックの開発に注力して高機能なCQRSを開発することが可能になります。更に、Spring Boot Integration を使用すれば Axon Server へのデータ送信のためのコンポーネントやイベント検知のためのアノテーションが提供されるため、開発者は外部システムの存在を意識することなくメソッドを呼び出すだけで済んでしまいます。
実際に Axon Framework を使って、先程の Command/Query 用のモデルをどのように構築していくかを確かめていきます。
Command
- Axon ServerへCommandを渡す
@RestController public class MessageRestEndpoint { private final CommandGateway commandGateway; @PostMapping("/create-message") public CompletableFuture<Void> createMessage(@RequestBody MessageCreate messageCreate) { String messageId = UUID.randomUUID().toString(); return commandGateway.send(new CreateMessageCommand(messageId, messageCreate.title, messageCreate.body)); } @PostMapping("/{messageId}/read") public CompletableFuture<Void> readMessage(@PathVariable String messageId) { return commandGateway.send(new ReadMessageCommand(messageId)); } } public class CreateMessageCommand { @TargetAggregateIdentifier private final String messageId; private final String title; private final String body; // getter,setter,equals,hashCode,toString... }
Commandの発行自体は CommandGateway
を経由するだけです。
- Axon Server から CommandHandlerを呼び出し
Commandを発行するとAxon Framework により @CommandHandler
付与されたメソッドを呼び出され、ここでCommandからEventを生成して発火します。
@Aggregate(snapshotTriggerDefinition = "messageAggregateSnapshotTriggerDefinition") public class MessageAggregate { @CommandHandler public MessageAggregate(CreateMessageCommand command) { apply(new MessageCreatedEvent(command.getMessageId(), command.getTitle(), command.getBody())); } } public class MessageCreatedEvent { private final String messageId; private final String title; private final String body; // getter,setter,equals,hashCode,toString... }
ここで apply
された MessageCreatedEvent
はAxon Server へ送信され、イベントジャーナルとして永続化されます。図の右上Event Storeに該当します。
Command側のイベントの登録は以上です。Axon Frameworkにより永続化層のことは何も触れることなく、Eventが保存されています。また、永続化先はRDBやKafkaなどに対応しているため、要件に沿った製品の選定も柔軟に選択できます。
EventからReadModelへのマッピング
- EventHandlerの呼び出し
Eventが永続化されると、Axon Framework から @EventHandler
付与したメソッドを呼び出されます。ここでEventからQuery側に都合のいいReadModelの作成を行えます。
@Service @ProcessingGroup("messages") public class InMemoryMessagesEventHandler implements MessagesEventHandler { private final Map<String, Message> messages = new HashMap<>(); public InMemoryMessagesEventHandler(QueryUpdateEmitter emitter) { this.emitter = emitter; } @EventHandler public void on(MessageCreatedEvent event) { String messageId = event.getMessageId(); messages.put(messageId, new Message(messageId, event.getTitle(), event.getBody())); } @EventHandler public void on(MessageReadEvent event) { messages.computeIfPresent(event.getMessageId(), (messageId, message) -> { message.setMessageStatusRead(); message.setLatestReadDateTime(event.getReadDateTime()); emitUpdate(message); return message; }); } }
MessageReadEvent
のイベントハンドラ内で、 message.setMessageStatusRead()
を呼び出しています。このタイミングでイベントを元に、更に必要があれば現在や過去の状態を元にReadModelに対して検索用のステータスを導出、保持させることができます。Eventと検索用のReadModelが分離されてスッキリします。
ここでは簡素化のため private final Map<String, Message> messages = new HashMap<>();
に対してReadModelの構築をしていますが、永続化先はRDBやNoSQLも選択可能です。自由に記述できるのでここでも要件に沿った選定が可能です。
Query
Queryではすでに特化したReadModelが作成されているので、取得するだけです。
まとめ
とても簡単にEventSourcing + CQRSのアプリケーションが作成できてしまいました! Axon Frameworkを活用することでビジネスロジックの開発に注力しても高機能なCQRS環境が作成できてしまうので、とても強力なフレームワークだと感じています。ここでは紹介しきれていませんが新たなReadModelを作成したい場合はEventのReplay機能など魅力的な機能がまだまだ他にもあるようですので、どんどん触ってみたいと思います。
参考記事
- A Guide to the Axon Framework:Baeldung
- Axon Frameworkを使ってみる:Qiita
- 実践Pub/Subマイクロサービス――SpringとAxonで作る疎結合でスケーラブルなシステム
We are Hiring!
エムスリーはまだまだJavaやKotlinなどのJVM系言語によるシステム開発も行っております! 興味を持たれたらぜひこちらから!