この記事は CADDi プロダクトチーム Advent Calendar 2024の2日目の記事です。
Tech チームの前多です。個人的なことですが この一年で10kgほど減量しました。体重や食事量、活動量の記録と可視化を行って調整してきた結果だと思います。 人生もシステムもオブザーバビリティが大事ですね。
CADDi の技術スタックは採用サイトでも紹介されている通り、TypeScript, Python, Rustの比率が高めです。 一方でプロダクトや組織の拡大に合わせて、生産性の高い技術スタックの検討も行っています。
JavaおよびJVM関連技術も検討候補の一つで、私も3,4年ぶりくらいにJavaやQuarkusを書いたりしています。
今回はその中でも私の最近のお気に入りである Apache Camel で、マイクロサービス間のデータ連携の信頼性を高める方法について書きます。
この記事を読むにあたり、以下についてはある程度知っていることを前提としています。
- Java
- Quarkus
- Saga Patternなどマイクロサービスのデータ整合性を担保する設計
最後の項については、 以下の記事がとても参考になると思います。
Apache Camel について
Apache Camel は多数のサービスとの連携を行うライブラリで、 HTTP, FTP, RDBMS, メッセージングプロトコル(JMS, Kafka, ActiveMQ, SQS, Cloud Pub/Sub,ほか)などの多数の対応プロトコルを相互変換して入出力できます。
例えば Cloud Pub/Subのメッセージを受け取って、何かの処理をして、別のCloud Pub/Subへ送る、といったような処理が DSLで記載できます。 また、送信相手が Cloud Pub/Sub から Kafkaに変更になった場合でもDSLの修正だけで対応できるので、 連携先の通信方式に合わせてデータを変換するといったような処理をほとんど書く必要がなくなります。
私は今まで Apache Camelをあまり使ってきませんでたが、知れば知るほど便利なやつだなと思うようになりました。 今まで使ってこなかった理由は、抽象度が高いこと(連携先が固定なら直接通信ライブラリを使った方が実装が早いし、様々な技術を下敷きに構成されている技術は学習コストが高い)や、日本語情報の少なさ、ドキュメントが少々見づらいことなどがあるのかなと思いました。
また、Apache Camelが ESB(Enterprise Service Bus)のような中央集権型のメッセージサーバーと思っていたことも理由かも知れません。 ですが、Apache Camelはあくまでライブラリでありサーバー用途で使えるような永続化機能は持っていません。 むしろ、各プロセスにプロセス固有のサービス間連携処理を組み込む方が適しています。
Apache Camel の Saga EIPについて
元々、Apache Camel を試してみようと思ったのは、マイクロサービス間のデータ連携の信頼性を上げる方法を探していからでした。 その方法の一つとして Saga Patternが有用なのはわかるが、 Saga Pattern をちゃんとサポートしているライブラリやフレームワークあるんだろうか、 というところに Apache Camel の Saga EIP を見つけました。 見つけたのはいいのですが、私たちが普段使っている Cloud Pub/Subと連携したサンプルがなかったので、ちょっと試してみようというのが今回の記事です。
Saga EIP を試すにあたり、前提知識として必要となるのが MicroProfile LRA(Long Running Actionです。
MicroProfile LRA について
MicroProfile LRA は、Saga トランザクションを実現するための仕様で、詳細についてはこちらの記事を見てもらうのが良いでしょう。 speakerdeck.com
資料にもある通り、 LRA Coordinator と呼ばれるトランザクションの管理サーバーと、Participants と呼ばれるトランザクションに参加する各サービスが協調してSaga トランザクションを構成します。 最初のParticipants は Coordinator にトランザクションを開始します。その際に、自身のCompensation(補償トランザクション)のURL も登録します。 Coordinatorは トランザクションIDになるヘッダーを発行するので、その値を 次のParticipantsに引き継ぐことで分散トランザクションを構築していきます。
どこかの Participants がエラーを報告した場合、Coordinator はそのトランザクションに紐づく全ての Participantsの補償トランザクションのURLを呼び出します。 Participantsが適切に補償トランザクションを実装することで、そのトランザクションの処理は取り消されることにになります。
MicroProfile LRA は Coordinator の仕様のほか、Participants が実装するべきアノテーションを定めています。 ただし、このアノテーションはJakarta RESTful Web Services(JAX-RS) に基づくので、その他のフレームワークやJava以外の言語では使用しづらいです。
そこで Apache Camel の Saga EIP の出番です。 Saga EIP は Camel 側で MicroProfile LRA に関する処理を実行してくれます。 そのため、任意の処理に対して Saga トランザクションを後付けで追加できます。
いろんな言語で書かれたサービスを Saga トランザクションに対応させるのにもってこいではないでしょうか。
LRA Coordinator の実装について
解説した通り、 LRA Coordinator が Camel Saga EIPを実現する上で欠かせない要素になります。
LRA Coordinator の選択肢としては次のものがあります。
Narayana LRA
Narayana 自体は JTAの参照実装を提供しているプロジェクトですが、 LRA Coordinator の参照実装(おそらく)も提供しています。 Quarkus に組み込んで実行できる Extension や Docker イメージ もあります。 今回のサンプルコードではDocker イメージを使用しています。
MicroTx
Oracle提供で、有償版の提供もある Coordinator 実装。XAやTCCといった異なる種類の分散トランザクションも扱えるほか、 有償版の場合可用性サポートがある。無償版の場合、時間あたりのトランザクション実行数の上限がある。
その他
テスト用にオンメモリで動く実装が、Camelや Helidonで提供されているようです。 他にも何かあれば教えてください。
所感
現状、Coordinator の選択肢が少ないことが懸念だと感じています。 参照実装のNarayana LRAでも申し分ないのですが、こちらはトランザクションのデータをオンメモリで管理しているので、可用性での不安が残ります。 もし、Saga EIP を全面的に採用するとなったら、独自実装や拡張で可用性を確保しにいく必要があると感じています。
サンプル実行
ここまでくれば、実装の前提知識は整いました。 簡単なサンプルで、試してみましょう。
今回解説するソースコードの全量は https://github.com/kencharos/welcome-camel-saga にあります。
今回のサンプルでは、2つのマイクロサービスと LRA Coordinatorがあり、2つのサービスは Cloud Pub/Subを通してデータ連携をします。 Cloud Pub/Subとの連携、LRA Coordinator とのやり取り、補償トランザクションの実行制御は Apache Camel で行います。
実装を簡単にするため、二つのサービスは同一の Quakrus アプリケーションとしていますが、仕組み的にプロセスを分離しても動くはずです。
いくつか実装上のポイントを解説していきます。
Camel Sagaの設定
Camel Contextに Saga EIPで使用する Sagaサービスの実装を設定します。 Quarkus camel では設定ファイル経由の設定ができないので、以下のような初期化コードを作成します。
LRASagaService
はLRA Coordinator とやり取りするクラスで、CoordinatorのURL(今回は Docker コンテナで Narayana RLAを用いる)と、ParticipantのURL(自身のサービスのURL, Coordinator が補償トランザクションを呼び出すために用いるので、Coordinatorから解決可能なもの) を設定します。
@ApplicationScoped public class CamelSagaConfig { public void onComponentAdd(@Observes ComponentAddEvent event) throws Exception { if (event.getContext().hasService(LRASagaService.class) == null) { LRASagaService service = new LRASagaService(); // LRA Coordinator(トランザクションマネージャ)のURL service.setCoordinatorUrl("http://localhost:8081"); /* LRA Coordinator に伝える、自身のアプリケーションのURL。 Saga トランザクション参加の際にこのURLを伝え、 何らかのエラーで補償トランザクションを実行する場合にCoordinatorがこのURLで呼び出す。 Coordinatorは今回はDockerで実行しているので、Dockerから解決可能なホストアドレス(host.docker.internal)にしている。 */ service.setLocalParticipantUrl("http://host.docker.internal:8080"); event.getContext().addService(service); } } }
サービスの実装
各サービスで実行する処理は今回は QuarkusアプリケーションのCDI Beanで、Camelから呼び出されます。
最初のサービスでは乱数を設定して次のサービスへの入力値にします。
CDI Beanではメッセージング処理などは実装せず、Camel に任せる形です。
transactionContext
は LRA Coordinator が発行したトランザクションのIDに相当するもので、これでどのトランザクションの処理であるかを識別できますので、
実際にデータを処理する場合はこのIDと紐付け可能にしておくと良いでしょう。
cancel メソッドは 補償トランザクションとして実行する処理にあたります。今回はサンプルのためログ出力するだけにしておきますが、実際には何かしら登録されたデータを消すなどの処理を書きます。
@ApplicationScoped @Named("ServiceA") public class ServiceA { private final SecureRandom rnd = new SecureRandom(); public InputB perform(String transactionContext, InputA body) { var price = rnd.nextInt(1000); System.out.printf("ServiceA: LRA-transaction(%s), %s issue price %d\n", transactionContext, body.title(), price); // do persist process return new InputB(body.title(), price); } public void cancel(String transactionContext) { System.out.printf("\"ServiceA: LRA-transaction(%s) do cancel..\n", transactionContext); // do cancel process } }
続いて、後続のサービスの処理です。補償トランザクションを発生させるために、前のサービスで発行した乱数の値によってエラーを発生させるようにしています。
@ApplicationScoped @Named("ServiceB") public class ServiceB { public void perform(String transactionContext, InputB body) { System.out.printf("ServiceB: LRA-transaction(%s), %s price %d\n", transactionContext, body.title(), body.price()); if (body.price() < 500) { throw new IllegalStateException("price must greater 500 transaction(" + transactionContext + ")"); } // do persist } public void cancel(String transactionContext) { System.out.printf("\"ServiceB: LRA-transaction(%s) do cancel..\n", transactionContext); // do cancel process } }
Camel Route 定義
ここからが、今回の中核の Camel のRoute 定義です。
まずは最初のサービスのRoute定義です。
@ApplicationScoped public class RouteA extends RouteBuilder { @Inject private ObjectMapper mapper; @Override public void configure() throws Exception { //@formatter:off // PubSub経由でメッセージを受け取り、Saga transactionを開始する from("google-pubsub:{{app.pubsub_project}}:service-a-subscription") .routeId("service-a-start-saga") .log("saga start with ${headers} | ${body}") .unmarshal(new JacksonDataFormat(InputA.class)) .saga()// Saga transactionの設定 /* トランザクション制御をマニュアルに設定する。 https://camel.apache.org/components/4.4.x/eips/saga-eip.html#_using_manual_completion_advanced AUTOの場合、このルートが終了するとtransactionが自動で完了する。 PubSubなど非同期通信で別のルートが起動する場合だとAUTOは使用できない。 一方で、https://github.com/apache/camel-spring-boot-examples/blob/main/saga/readme.adoc のサンプルの場合、JMSを使っているがJMSの応答を同期処理的に待ち合わせる replyの仕組みが使えるので利用する技術が同期応答可能ならAUTOが利用できる。 */ .completionMode(SagaCompletionMode.MANUAL) // 補償トランザクションが行われた場合のルート。どこかでエラーが起きてsaga:compensationが実行されたら、LRA Coordinator 経由で呼び出される .compensation("direct:cancel-a") // variableにLRA の値を保管しておく .setVariable("transaction_context", header(SagaConstants.SAGA_LONG_RUNNING_ACTION)) .doTry() // サービスA処理の実行後、後続サービスをPubSub経由で呼び出す .bean(ServiceA.class, "perform(${variable.transaction_context},${body})") // pubsub連携する場合は、Long-Running-Actionヘッダの値を自前で送信・復元する必要がある。 // 現時点でDSLでうまく設定できないため、processで記述する .process(exchange -> { exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, Map.of(SagaConstants.SAGA_LONG_RUNNING_ACTION, exchange.getVariable("transaction_context"))); }) .to("google-pubsub:{{app.pubsub_project}}:service-b-topic?exchangePattern=InOnly") .endDoTry() .doCatch(Exception.class) // 処理中の例外は saga:compensate を呼び出して補償トランザクションの実行を指示する。 .log("error on serviceA ${exception}") .to("saga:compensate") .end() .end(); //@formatter:on from("direct:cancel-a") .routeId("service-a-cancel") .log("cancel service a with body=${body}, headers=${headers}") .bean(ServiceA.class, "cancel(${header." + SagaConstants.SAGA_LONG_RUNNING_ACTION +"})"); } }
from
で Cloud PubSubのsubscriptionからメッセージを受け取って処理を開始します。
その後 Saga トランザクションの設定に入っていきます。
Pub/Sub, Kafkaなどの非同期処理と Saga を組み合わせる場合 .completionMode(SagaCompletionMode.MANUAL)
の設定が重要です。
これは Sagaトランザクションの補償・完了の制御を手動で行う必要があり、正常終了する場合には saga:completion
ルートを指示します。
逆にSagaCompletionMode.AUTO
の場合は現在のルートが終了したら自動で トランザクションの完了が呼び出されます。
これが使用できるケースは、Camel routeから実行する処理が全て同期で完結する場合です。
例えば、Javaの任意の処理やHTTP通信の完了を待ち合わせる場合や、メッセージングであっても応答の待機ができる場合です。
私が参考にしていたこちらのサンプル でも、 JMSの replyTo の仕組みを使って応答の待機をしていました(参考になったと同時に、大いに混乱した点でもあります)
次に大事なのは補償トランザクションが行われる際のルートを .compensation("direct:cancel-a")
のように指定しておく点です。
このルートは、エラー発生時に .to("saga:compensate")
のように補償トランザクションの指示が行われた時に、 LRA Coodinator 経由で呼び出されます。
もう一つ大事な点は、 LRA トランザクションIDを後続のサービスに伝播させることです。
この値は、 SagaConstants.SAGA_LONG_RUNNING_ACTION
という名前のヘッダに入っています。
この値を取得して、 メッセージ連携の属性に乗せる必要があります。
以下のように、 Cloud Pub/Sub では Attribute にこの値を設定して次のサービス用のトピックにメッセージを送っています。
// LRA トランザクションID の取得 .setVariable("transaction_context", header(SagaConstants.SAGA_LONG_RUNNING_ACTION)) .doTry() // サービスA処理の実行後、後続サービスをPubSub経由で呼び出す .bean(ServiceA.class, "perform(${variable.transaction_context},${body})") // LRA トランザクションID を Cloud Pub/SubのAttributeに埋め込む。 .process(exchange -> { exchange.getIn().setHeader(GooglePubsubConstants.ATTRIBUTES, Map.of(SagaConstants.SAGA_LONG_RUNNING_ACTION, exchange.getVariable("transaction_context"))); }) .to("google-pubsub:{{app.pubsub_project}}:service-b-topic?exchangePattern=InOnly")
次に、後続のサービスのRoute 定義を見てみます。
@ApplicationScoped public class RouteB extends RouteBuilder { @Inject private ObjectMapper mapper; @Override public void configure() throws Exception { // pubsubの開始。 // saga transactionを復元するため、PubSub Attributeから LRA 値を取得して所定のヘッダに設定してから Saga を開始する from("google-pubsub:{{app.pubsub_project}}:service-b-subscription") .routeId("service-b-continue-saga") .setHeader(SagaConstants.SAGA_LONG_RUNNING_ACTION, simple("${headers." + GooglePubsubConstants.ATTRIBUTES +"[" + SagaConstants.SAGA_LONG_RUNNING_ACTION +"]}")) .to("direct:service-b"); //@formatter:off from("direct:service-b") .log("saga start with ${headers} | ${body}") .unmarshal(new JacksonDataFormat(InputB.class)) .saga()// Saga transactionの設定 // 開始済みのSaga transactionに合流することを必須にする .propagation(SagaPropagation.MANDATORY) .completionMode(SagaCompletionMode.MANUAL) .compensation("direct:cancel-b") // LRA Transactionの値を控える .setVariable("transaction_context", header(SagaConstants.SAGA_LONG_RUNNING_ACTION)) .doTry() .bean(ServiceB.class, "perform(${variable.transaction_context},${body})") // 処理が終わったら、Sagaトランザクションを完了させる .to("saga:complete") .log("saga transaction complete success") .endDoTry() .doCatch(Exception.class) // ルート内のエラーで、トランザクション失敗通知 .log("error on serviceB ${exception}") .to("saga:compensate") .end() .end(); //@formatter:on // RLA Coordinator の補償処理から実行されるキャンセル処理 from("direct:cancel-b") .routeId("service-b-cancel") .log("cancel service b with body=${body}, headers=${headers}") .bean(ServiceB.class, "cancel(${header." + SagaConstants.SAGA_LONG_RUNNING_ACTION +"})"); } }
Sagaトランザクションを復元するには、SagaConstants.SAGA_LONG_RUNNING_ACTION
の名前のヘッダにトランザクションIDを設定します。
前回とは逆に Cloud Pub/Sub の AttributeからトランザクションID を取り出してヘッダに設定します。
後続のサービスは 既存のSagaトランザクションに合流することを必須とするため、 .propagation(SagaPropagation.MANDATORY)
を設定します。
これは有効な LRA トランザクションIDがない場合は処理が失敗します。
CDI Beanの処理が終わったら .to("saga:complete")
で トランザクションを完了させることを忘れないようにします。
これで、解説は終了です。 次にサンプルを動作させます。
サンプルコードの実行
docker compose, quarkus app を起動し、次のようなコマンドでCloud Pub/Sub エミュレータにメッセージを送信します。
DATA=$(echo '{"title":"test"}' | base64) curl -i -XPOST \ -H "Content-Type:application/json" \ -d "{\"messages\":[{\"data\": \"${DATA}\"}]}" \ http://localhost:8681/v1/projects/pj-local/topics/service-a-topic:publish
以下はトランザクションが成功する場合のログです。
# Camel Route Aのログ 2024-11-26 10:29:57,783 INFO [RouteA:29] (ForkJoinPool.commonPool-worker-4) saga start with {CamelGooglePubsubAttributes={}, CamelGooglePubsubMessageId=1, CamelGooglePubsubPublishTime=seconds: 1732584597 , Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2} | {"title":"test"} # サービスAでの処理 ServiceA: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2), test issue price 689 # Camel Route Bのログ 2024-11-26 10:29:57,932 INFO [RouteB:37] (ForkJoinPool.commonPool-worker-4) saga start with {CamelGooglePubsubAttributes={Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2}, CamelGooglePubsubMessageId=2, CamelGooglePubsubPublishTime=seconds: 1732584597 , Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2} | {"title":"test","price":689} # サービスBでの処理 ServiceB: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2), test price 689 # Saga Transaction完了ログ 2024-11-26 10:29:58,178 INFO [RouteB:50] (ForkJoinPool.commonPool-worker-8) saga transaction complete success
http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_2
が LRA トランザクションIDで同じ値が各サービスに渡り、正常にトランザクションが完了しています。
次にエラー時のログです。
# Camel route A 2024-11-26 10:32:49,907 INFO [RouteA:29] (ForkJoinPool.commonPool-worker-2) saga start with {CamelGooglePubsubAttributes={}, CamelGooglePubsubMessageId=7, CamelGooglePubsubPublishTime=seconds: 1732584769 , Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b} | {"title":"test"} # サービスAの処理。エラーとなる値が生成された。 ServiceA: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b), test issue price 406 # Camel route B 2024-11-26 10:32:49,959 INFO [RouteB:37] (ForkJoinPool.commonPool-worker-2) saga start with {CamelGooglePubsubAttributes={Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b}, CamelGooglePubsubMessageId=8, CamelGooglePubsubPublishTime=seconds: 1732584769 , Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b} | {"title":"test","price":406} # サービスBの処理 ServiceB: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b), test price 406 # Camel Route Bのエラーハンドラ, saga:compensate の呼び出し。 2024-11-26 10:32:49,963 INFO [RouteB:54] (ForkJoinPool.commonPool-worker-2) error on serviceB java.lang.IllegalStateException: price must greater 500 transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b) # Camel Route A cancel route 2024-11-26 10:32:49,979 INFO [RouteA:65] (vert.x-worker-thread-1) cancel service a with body=, headers={Camel-Saga-Compensate=direct://cancel-a, CamelHttpMethod=PUT, CamelHttpPath=/lra-participant/compensate, CamelHttpQuery=Camel-Saga-Compensate=direct://cancel-a, CamelHttpRawQuery=Camel-Saga-Compensate=direct://cancel-a, CamelHttpUri=/lra-participant/compensate?Camel-Saga-Compensate=direct://cancel-a, CamelHttpUrl=http://host.docker.internal:8080/lra-participant/compensate?Camel-Saga-Compensate=direct://cancel-a, CamelVertxPlatformHttpLocalAddress=127.0.0.1:8080, CamelVertxPlatformHttpRemoteAddress=127.0.0.1:61640, content-length=0, Content-Type=text/plain, host=host.docker.internal:8080, Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b, Long-Running-Action-Recovery=http://localhost:8081/lra-coordinator/recoveryhttp%3A%2F%2Flocalhost%3A8081%2Flra-coordinator%2F0_ffffac160002_8191_67452495_1b/0_ffffac160002_8191_67452495_1d, Narayana-LRA-Participant-Data=, User-Agent=Quarkus REST Client} # サービス A cancel "ServiceA: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b) do cancel.. # Camel Route B cancel route 2024-11-26 10:32:49,989 INFO [RouteB:63] (vert.x-worker-thread-1) cancel service b with body=, headers={Camel-Saga-Compensate=direct://cancel-b, CamelHttpMethod=PUT, CamelHttpPath=/lra-participant/compensate, CamelHttpQuery=Camel-Saga-Compensate=direct://cancel-b, CamelHttpRawQuery=Camel-Saga-Compensate=direct://cancel-b, CamelHttpUri=/lra-participant/compensate?Camel-Saga-Compensate=direct://cancel-b, CamelHttpUrl=http://host.docker.internal:8080/lra-participant/compensate?Camel-Saga-Compensate=direct://cancel-b, CamelVertxPlatformHttpLocalAddress=127.0.0.1:8080, CamelVertxPlatformHttpRemoteAddress=127.0.0.1:61641, content-length=0, Content-Type=text/plain, host=host.docker.internal:8080, Long-Running-Action=http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b, Long-Running-Action-Recovery=http://localhost:8081/lra-coordinator/recoveryhttp%3A%2F%2Flocalhost%3A8081%2Flra-coordinator%2F0_ffffac160002_8191_67452495_1b/0_ffffac160002_8191_67452495_1f, Narayana-LRA-Participant-Data=, User-Agent=Quarkus REST Client} # サービスB cancel "ServiceB: LRA-transaction(http://localhost:8081/lra-coordinator/0_ffffac160002_8191_67452495_1b) do cancel..
後続のサービスでエラーが起きて、 saga:compensate
が指定された後、両方のサービスの canel 呼び出しが行われています。
この通り、複数のマイクロサービスを横断する処理でどこかでエラーが起きたら、横断して各サービスの取り消し処理を呼び出せるのが Saga トランザクションの強みです。
まとめ
いかがだったでしょうか。
Camel を用いることで、制御が難しい Saga トランザクションがそこそこ実現できそうだなと思ってもらえたら幸いです。
同僚やまだ見知らぬ誰かにSaga やりたいと思った時に思い出してもらえれば嬉しいです。
Camel Routeの呼び出し先を、HTTPなどにすることで Java以外の言語で書かれた処理でも Saga トランザクションに参加できます。
Camel Routeの修正だけでメッセージ送信の実装を切り替えたり、ログを保存しておくといったことも柔軟にできます。
Camel Route をサイドカー的に差し込むようなことが基盤側で提供するなどの Saga トランザクションプラットフォームみたいな未来が描けたらワクワクしますね。