データ分散の課題を解決する「ScalarDB」のクイックスタートを試して使用感をレビュー

はじめに

従来のAI、および昨今の生成AIの技術革新により、企業の経営革新や価値創出においてAIの活用が注目されています。しかし、AIの導入にあたって、多くの企業では部門やシステムごとにデータが分散して蓄積されており、そのデータ統合が課題となっています。

弊社のAIプロジェクトにおいても、データの分散化により統合作業に多大な時間を要し、プロジェクトの遅延や、クライアントが求める短期での価値提供が困難になっているケースが発生しています。また、個人情報等機密性の高い情報については、生成AIのデータ元と使用したくてもデータ統合ができないというケースも発生しています。

これらの課題に対して、これまではアーキテクチャの再構築やソリューションの組み合わせにより対処してきました。しかし、各課題を包括的に解決できる可能性を感じ、現在「ScalarDB」の活用検証を進めております。

ScalarDBでは、以下の機能がロードマップに含まれております。

  • ベクトルデータベースのサポート
  • レコード単位でのデータアクセス制御の実現

これらの発展は、今後のAI活用の場面や弊社のAIプロジェクトにおける課題解決と非常に親和性が高く、現時点では、これらの課題をまとめて解決できる可能性があるソリューションとして、ScalarDBが最も有望な候補であると考えております。

データ分散における課題解決とScalarDBソリューションの親和性を簡潔に図示したものが以下となります。 データ分散化の課題を解決するScalarDBソリューション

本記事では、ScalarDBの概要、活用事例、そして実際の検証結果についてご紹介します。

ScalarDBとは?

ScalarDB は、複数のデータベースをミドルウェア上で統合し、データベース間にまたがるACIDトランザクション(※1)の管理やリアルタイム分析を実現することができるハイブリッドトランザクション/分析処理 (HTAP) エンジンです。

※1…トランザクションを定義するAtomicity(原子性)、Consistency(一貫性)、Isolation(独立性)、Durability(永続性)の4つの特性を持ったトランザクションのこと。

図 ScalarDBを利用したDBアクセスイメージ([1]より引用)

サポートしているDBは多岐に渡り、Microsoft SQL ServerMySQLOracle Database、PostgreSQLなどのリレーショナルデータベース(RDB)や、それらに互換性のあるAmazon AuroraGoogle AlloyDBはもちろんのこと、Amazon DynamoDBApache Cassandra、Azure Cosmos DB などの NoSQLデータベースについてもScalarDBで統合的に管理することができます。 (サポートするデータベースの詳細はこちら

また、今後は生成AIで扱われる非構造化データを格納するベクトルデータベース(Vector DB)のサポートや、ユーザ単位でのアクセス制御、集計用SQL操作の追加、分析のためのデータ仮想化も予定されており、更なる活用シーンの拡大が見込まれております。 (公式より発表されているロードマップはこちら

ScalarDB導入のメリット

1. 接続方式が異なるDBに統合的にアクセスが可能

接続方式(APIやコネクタ接続など)が異なる複数のDBにアクセスが必要となる場合、ScalarDBを導入することでアプリケーションからの接続方式を統一することができます。

例えばJDBC(※2)RDBAPIアクセスのNoSQLを一つのアプリケーションからアクセスする場合、一般的には以下のようにそれぞれのDBに対して複数の接続方式での経路を作成する必要があります。

※2…Java Database Connectivityの略。Java標準で提供しているRDBへの接続方式。

このようなケースでは、ScalarDBを導入することで、接続方式を統一することができます。
また、利用するアプリケーションや接続するDBが増加・リプレースされた場合でも、ScalarDBがハブとなることで、アプリケーション個別での対応の必要がなくなります。

2. データベース間でのデータ整合性をシンプルな実装で担保可能

近年増加しているマイクロサービスアーキテクチャ(※3)による複数サービスへのアクセスでは、サービス間のデータ整合性を担保するために、一般的にSagaパターン(※4)などの複雑なモデルを適用して実装を行う必要があります。
また、NoSQLデータベースの中にはACIDトランザクションを適用できないものも存在するため、データの整合性の担保は大きな課題となります。

※3…複数の小さなサービスを組み合わせて、ひとつの大きなサービスやサイトを構築する技法のこと。AmazonやLINEなどがこれに該当する。
※4…分散したシステム間において、全体でデータの整合性を保証する設計手法の一つ。各システム内で実行されるトランザクション間でメッセージやイベントをやり取りし、失敗した場合には補正のトランザクションが動くことにより、全体がロールバックされたのと同じ状態に戻すなどの制御を行う。

このようなケースでは、ScalarDBの2フェーズコミット(※5)を利用することで、単体ではACIDトランザクションを採用することができないDBであっても、複数データベースにまたがるACIDトランザクションを生成することができます。
これにより、シンプルな実装でデータの整合性を担保することができます。

※5…複数の独立したシステムで関連性のある処理を行う際に、整合性が保たれるように2段階に分けてコミットを行う手法。

活用事例

上記メリットをもとに、実際にScalarDBがどのように活用されているかを実例でいくつかご紹介します。

1.ScalarDB を用いた 検索拡張生成(RAG)/ 大規模言語モデル(LLM)構築

LLMの学習データが社内で分散し、タイムラグが発生することが課題となっていたクライアントに対して、LLM参照データの統合・リアルタイム化を実現しました。
サイロ化されていたデータをScalarDBにて統合し、2フェーズコミットでデータ整合性を担保しつつ、VectorDBをリアルタイムで最新化する仕組みとなっております。
これにより、LLMは常に最新の情報を参照することができ、リアルタイムに更新された情報をもとにLLMの応答を受領することができます。

2. 低コストでの顧客情報の統合

アプリケーションや部署ごとに管理していた顧客情報を、データ移行することなく統合することができます。 ScalarDBを導入することで、アプリケーションからは切り離して既存DBを流用した統合を行うことができ、顧客情報の統合における大幅なコストカットを実現しました。

3.老朽化システムの費用削減

メインフレームの老朽化、およびメインフレームに依存したDB維持費にかかるコストの大きさが課題となっていたクライアントに対して、マイクロサービス化によるコストの削減を実現しました。
移行難度が高くコストの高い一部機能のみをそのまま残し、NoSQL・RDBを組み合わせることでデータベース費用を従来の約1/10程度まで削減しております。

実装方法

ここからは、ScalarDBにて提供しているクイックスタート、およびハンズオンを利用して、Javaを利用した実装の流れをご紹介します。
実際に触ってみると、ScalarDBを利用するからと言って特別な実装をする必要はほとんどなく、一般的に利用されているMyBatisやHibernateなどのORM(Object-Relational Mapping)と似たような感覚で利用できることがご理解頂けるかと思います。
それでは始めていきましょう。

クイックスタートの実施

公式のクイックスタートにて、ScalarDB Coreライブラリを利用したJavaのサンプルアプリケーションを起動してみます。

1. 環境の準備

Java Development Kit (JDK)、およびDockerをインストールします。
必ず以下の条件を満たすバージョンを導入するようにしてください。
 JDKEclipse Temurin の OpenJDK LTS バージョン (8、11、17、または 21)
 Docker:Docker 20.10以降と Docker Compose V2以降

2. ScalarDBサンプルリポジトリのクローン

コマンドプロンプトでクローン先のディレクトリに移動し、以下のコマンドでScalarDBサンプルリポジトリをクローンします。
こちらをクローンして動かすことで、簡単にScalarDBの動きを確認することができます。

git clone https://github.com/scalar-labs/scalardb-samples
3. データベースの準備

ScalarDBで接続するためのDBをセットアップします。
今回は、Javaのプロジェクトで一般的に利用されているMySQLを利用します。

3-1. データベース(MySQL)をセットアップ

まずはScalarDBを介してアクセスするMySQLをセットアップします。
以下のコマンドを実行することで、Docker Composeを利用してすでにサンプルデータ等が登録されているMySQLを起動することができます。

docker compose up -d mysql

3-2. ScalarDB Schema Loaderをダウンロード

ScalarDBにMySQLの定義のインポートには、ScalarDB Schema Loaderというスキーマ作成ツールを利用します。
scalardb Releasesページに移動し、使用しているScalarDBのバージョンに一致するScalarDB Schema Loaderを scalardb-samples/scalardb-sampleディレクトリにダウンロードします。

3-3. MySQLの接続情報定義

クローンしたプロジェクト内のdatabase.propertiesを編集して、MySQLの接続情報を定義します。
手順の通りに進めていれば、元々記載のあったMySQLの定義箇所のコメント部分を解除した以下の定義で接続ができます。

# For MySQL
scalar.db.storage=jdbc
scalar.db.contact_points=jdbc:mysql://localhost:3306/
scalar.db.username=root
scalar.db.password=mysql

3-4. データベーススキーマのインポート

ScalarDBにMySQLのデータベーススキーマの定義をインポートします。
\<VERSION>の部分をダウンロードしたScalarDB Schema Loader のバージョンに置き換えて、次のコマンドを実行することでインポートが完了します。

java -jar scalardb-schema-loader-<VERSION>.jar --config database.properties --schema-file schema.json --coordinator

パラメータとして指定しているschema.json は、MySQLのテーブル定義と紐づける形で以下のように定義されております。

{
  "sample.customers": {
    "transaction": true,
    "partition-key": [
      "customer_id"
    ],
    "columns": {
      "customer_id": "INT",
      "name": "TEXT",
      "credit_limit": "INT",
      "credit_total": "INT"
    }
  },
  "sample.orders": {
    "transaction": true,
    "partition-key": [
      "customer_id"
    ],
    "clustering-key": [
      "timestamp"
    ],
    "secondary-index": [
      "order_id"
    ],
    "columns": {
      "order_id": "TEXT",
      "customer_id": "INT",
      "timestamp": "BIGINT"
    }
  },
  "sample.statements": {
    "transaction": true,
    "partition-key": [
      "order_id"
    ],
    "clustering-key": [
      "item_id"
    ],
    "columns": {
      "order_id": "TEXT",
      "item_id": "INT",
      "count": "INT"
    }
  },
  "sample.items": {
    "transaction": true,
    "partition-key": [
      "item_id"
    ],
    "columns": {
      "item_id": "INT",
      "name": "TEXT",
      "price": "INT"
    }
  }
}

3-5. 初期データをロード

以下のコマンドを実行して初期データをロードします。

./gradlew run --args="LoadInitialData"
4. アプリケーションの起動

ここまで進めれば、アプリケーションを起動する準備が完了です。
ここからは、実際にアプリケーションを起動し、データ取得やデータの更新を行ってみます。

4-1. DBからデータを取得 {#4-1.-dbからデータを取得}

以下のコマンドを実行して、ID が”1”である顧客に関する情報を取得してみます。

./gradlew run --args="GetCustomerInfo 1"

正しく実行できていれば、以下のjsonが返却されていることをコマンドラインにて確認できます。

...
{"id": 1, "name": "Yamada Taro", "credit_limit": 10000, "credit_total": 0}
...

4-2. DBのデータを更新 {#4-2.-dbのデータを更新}

以下のコマンドを実行して、ID が”1”である顧客でリンゴ3個とオレンジ2個を注文してみます。

./gradlew run --args="PlaceOrder 1 1:3,2:2"

正しく実行できていれば、注文が成功したことを示す以下のjsonが返却されていることをコマンドラインにて確認できます。
※order_idのUUIDは実行の都度発行となるので、実際の値とは相違があっても問題ありません。

...
{"order_id": "dea4964a-ff50-4ecf-9201-027981a1566e"}
...
5. 実装の説明

ここからは、上記クイックスタートにて実行したデータ参照、データ更新における実装を説明していきます。
ScalarDBを利用するからといって特に何か特別な処理を記載する必要はなく、一般的なDBアクセスと同じような考え方で実装できることがご理解頂けるかと思います。

5-1. データ取得

はじめに、4-1. DBからデータを取得 にて実施したGetCustomerInfoのデータ取得部分について説明します。
リクエストを受信後、GetCustomerInfoCommand.javaにてパラメータをCUSTOMER_IDとして受け取り、そちらを引数としてSample.getCustomerInfo(String) を呼び出しております。
呼び出し先の実装は以下のとおりです。
※説明しやすいように一部コメントを変更・追加しております。

public class Sample implements AutoCloseable {

  // ScalarDB Core内のトランザクションマネージャ
  private final DistributedTransactionManager manager;

  // --- 中略 ---

  // 呼び出されるメソッド
  public String getCustomerInfo(int customerId) throws TransactionException {
    DistributedTransaction transaction = null;
    try {
      // トランザクションを開始・・・(1)
      transaction = manager.start();

      // 顧客テーブルからレコードを取得・・・(2)
      Optional<Result> customer =
          transaction.get(
              Get.newBuilder()
                  .namespace("sample")  // ネームスペースを指定
                  .table("customers")  // テーブルを指定
                  .partitionKey(Key.ofInt("customer_id", customerId))  // パーティションキーを指定
                  .build());
      if (!customer.isPresent()) {
        // レコードがない場合の例外処理
        throw new RuntimeException("Customer not found");
      }
      // トランザクションのコミット・・・(3)
      transaction.commit();

      // 顧客情報をjson形式で返却
      return String.format(
          "{\"id\": %d, \"name\": \"%s\", \"credit_limit\": %d, \"credit_total\": %d}",
          customerId,
          customer.get().getText("name"),
          customer.get().getInt("credit_limit"),
          customer.get().getInt("credit_total"));
    } catch (Exception e) {
      if (transaction != null) {
        // 例外によりトランザクションが閉じられていない場合はトランザクションを終了
        transaction.abort();
      }
      throw e;
    }
  }

  // --- 中略 ---

}

実装のポイントは以下のとおりです。

(1)
DBアクセスの際には、DistributedTransactionManager#startメソッドにより、ScalarDBで管理するトランザクションを開始します。
ScalarDBでは、データの取得のみの場合でも明示的にトランザクションを開始する必要があります。

(2)
データ取得は、DistributedTransactionManager#getメソッドにて行います。
引数として
・namespace
・table
・partitionKey(およびclusteringKey)
を指定したGetオブジェクトを渡すことで、DB内のレコードを検索することができます。
(それぞれの指定要素については、公式でScalarDBのオブジェクトについて説明しているこちらのページご参照。)
ここでは、受け取ったcustomerIdを利用して、customersテーブルを検索します。
schema.jsonで指定した定義に則り
 namespace:sample
 table:customers
 partitionKey(customer_id):引数で受け取ったcustomerId
を指定しています。

(3)
トランザクションのコミットを行います。
ScalarDBではRead-only(検索のみ)でも明示的なトランザクションの開始/終了が必要となるため、更新等がなくても必ずコミットを行うようにしてください。

5-2. データ更新

次に、4-2. DBのデータを更新 にて実施したPlaceOrderのデータ更新部分について説明します。
リクエストを受信後、PlaceOrderCommand.javaにてパラメータをCUSTOMER_ID、およびORDERSとして受け取り、そちらを引数としてSample.placeOrder(int, int, int) を呼び出しております。
呼び出し先の実装は以下のとおりです。
※こちらも同様、説明しやすいように一部コメントを変更・追加しております。

public class Sample implements AutoCloseable {

  // ScalarDB Core内のトランザクションマネージャ
  private final DistributedTransactionManager manager;

  // --- 中略 ---

  // 呼び出されるメソッド
  public String placeOrder(int customerId, int[] itemIds, int[] itemCounts)
      throws TransactionException {
    assert itemIds.length == itemCounts.length;
    DistributedTransaction transaction = null;
    try {

      // トランザクションを開始・・・(1)
      transaction = manager.start();

      // 注文IDを生成
      String orderId = UUID.randomUUID().toString();

      // 注文テーブルに注文内容を登録・・・(2)
      transaction.insert(
          Insert.newBuilder()
              .namespace("sample")
              .table("orders")
              .partitionKey(Key.ofInt("customer_id", customerId))
              .clusteringKey(Key.ofBigInt("timestamp", System.currentTimeMillis()))
              .textValue("order_id", orderId)
              .build());

      // 合計金額
      int amount = 0;
 
      for (int i = 0; i < itemIds.length; i++) {
        int itemId = itemIds[i];
        int count = itemCounts[i];

        // 注文明細テーブルに注文明細を登録
        transaction.insert(
            Insert.newBuilder()
                .namespace("sample")
                .table("statements")
                .partitionKey(Key.ofText("order_id", orderId))
                .clusteringKey(Key.ofInt("item_id", itemId))
                .intValue("count", count)
                .build());

        // 商品テーブルから商品情報を取得
        Optional<Result> item =
            transaction.get(
                Get.newBuilder()
                    .namespace("sample")
                    .table("items")
                    .partitionKey(Key.ofInt("item_id", itemId))
                    .build());
        if (!item.isPresent()) {
          throw new RuntimeException("Item not found");
        }

        // 商品の注文数での合計金額を計算
        amount += item.get().getInt("price") * count;
      }

      // 限度額を超過していないか確認
      Optional<Result> customer =
          transaction.get(
              Get.newBuilder()
                  .namespace("sample")
                  .table("customers")
                  .partitionKey(Key.ofInt("customer_id", customerId))
                  .build());
      if (!customer.isPresent()) {
        throw new RuntimeException("Customer not found");
      }
      int creditLimit = customer.get().getInt("credit_limit");
      int creditTotal = customer.get().getInt("credit_total");
      if (creditTotal + amount > creditLimit) {
        throw new RuntimeException("Credit limit exceeded");
      }

      // 支払額の合計を更新・・・(3)
      transaction.update(
          Update.newBuilder()
              .namespace("sample")
              .table("customers")
              .partitionKey(Key.ofInt("customer_id", customerId))
              .intValue("credit_total", creditTotal + amount)
              .build());

      // トランザクションをコミットして注文IDを返却・・・(4)
      transaction.commit();

      return String.format("{\"order_id\": \"%s\"}", orderId);

    } catch (Exception e) {
      if (transaction != null) {
        // エラーが発生したらトランザクションを終了
        transaction.abort();
      }
      throw e;
    }
  }

  // --- 中略 ---

}

実装のポイントは以下のとおりです。

(1)
データ取得の場合と同様に、DistributedTransactionManager#startメソッドにより、ScalarDBで管理するトランザクションを開始します。

(2)
データの登録は、DistributedTransaction#insertメソッドにて行います。
引数として
・namespace
・table
・partitionKey(およびclusteringKey)
・各列の値(textValueやintValue)
を指定したInsertオブジェクトを渡すことで、レコードを登録することができます。

ここでは、ordersテーブルに対してレコードを登録しています。
schema.jsonで指定した定義に則り
 namespace:sample
 table:orders
 partitionKey(customer_id):引数で受け取ったcustomerId
 clusteringKey(timestamp):現在日時
 order_id:引数で受け取ったorderId
でレコードを登録しています。

(3)
データの更新は、DistributedTransaction#updateメソッドにて行います。
引数として
・namespace
・table
・partitionKey(およびclusteringKey)
・各列の値(textValueやintValue)
を指定したUpdateオブジェクトを渡すことで、レコードを更新することができます。

ここでは、customersテーブルに対してレコードを更新しています。
schema.jsonで指定した定義に則り
 namespace:sample
 table:customers
 partitionKey(customer_id):引数で受け取ったcustomerId
 credit_total:更新前の支払い残額+注文した商品の合計額
でレコードを更新しています。

(4)
トランザクションのコミットを行います。
このコミットにより、トランザクションを開始してから行われたDBの変更内容が全て確定されます。

6. 2フェーズコミットを実装

次に、サンプルアプリケーションを用いた2フェーズコミットを使用してみましょう。
ここからは、先ほどのサンプルアプリケーションをカスタマイズする形で実装していきます。
(公式の2フェーズコミットトランザクションを参考に進めていきます。)

6-1. 2フェーズコミットとは

2フェーズコミットとは、複数の分散したトランザクション(以降、分散トランザクション)に対して、全てが成功した場合にコミット、一つでも失敗したら全てロールバックといったように、分散トランザクションにおいてデータの整合性を担保するための実装方式になります。
分散トランザクションを取りまとめる親のようなトランザクションをコーディネータ、それぞれの分散トランザクションを参加者と呼び、準備フェーズで分散トランザクションへ処理の実行を指示し、全てが成功したタイミングで確定フェーズとして分散トランザクションへのコミット指示が行われます。
ScalarDBでも、上記の考え方に基づき2フェーズコミットの実装を行います。

6-2. 2フェーズコミットを実装

では、実際に実装していきましょう。
通常は異なるシステムの呼び出しや異なるDB間でのデータ整合性の担保を目的に利用しますが、今回は先ほどの通常トランザクションとの実装の違いをわかりやすくするために、4-2. DBのデータを更新 にて実施した処理をそのまま2フェーズコミットに変更してみます。
(実際の利用シーンを想定した実装については、次回説明させて頂きます。)

今回は、注文内容を登録するまでを一つのトランザクション、商品情報を取得して支払い金額を更新するところまでをもう一つのトランザクションとして実装を修正しました。
カスタマイズ後のサンプルアプリケーションは以下となります。

public class Sample implements AutoCloseable {

  // もともと定義していたトランザクションマネージャ
  private final DistributedTransactionManager manager;

  // 2フェーズコミットに利用するトランザクションマネージャを追記・・・(1)
  private final TwoPhaseCommitTransactionManager twoPhaseManager;

  public Sample() throws IOException {
    // トランザクションマネージャを生成
    TransactionFactory factory = TransactionFactory.create("database.properties");

     // もともと定義していたトランザクションマネージャ
    manager = factory.getTransactionManager();

    // 2フェーズコミットに利用するトランザクションマネージャを追記・・・(1)
    twoPhaseManager = factory.getTwoPhaseCommitTransactionManager();
  }

  // --- 中略 ---

  // 呼び出されるメソッド
  public String placeOrder(int customerId, int[] itemIds, int[] itemCounts)
      throws TransactionException {
    assert itemIds.length == itemCounts.length;
    // 2フェーズコミット用に2つのトランザクションを定義して開始・・・(2)
    TwoPhaseCommitTransaction transaction1 = null;
    TwoPhaseCommitTransaction transaction2 = null;

    try {
      String orderId = UUID.randomUUID().toString();
      transaction1 = twoPhaseManager.begin();
      transaction2 = twoPhaseManager.join(transaction1.getId());

      // 注文テーブルに注文内容を登録・・・(3)
      transaction1.insert(
          Insert.newBuilder()
              .namespace("sample")
              .table("orders")
              .partitionKey(Key.ofInt("customer_id", customerId))
              .clusteringKey(Key.ofBigInt("timestamp", System.currentTimeMillis()))
              .textValue("order_id", orderId)
              .build());

      // 合計金額
      int amount = 0;
      for (int i = 0; i < itemIds.length; i++) {
        int itemId = itemIds[i];
        int count = itemCounts[i];

        // 注文明細テーブルに注文明細を登録
        transaction1.insert(
            Insert.newBuilder()
                .namespace("sample")
                .table("statements")
                .partitionKey(Key.ofText("order_id", orderId))
                .clusteringKey(Key.ofInt("item_id", itemId))
                .intValue("count", count)
                .build());

        // 商品テーブルから商品情報を取得・・・(4)
        Optional<Result> item =
                transaction2.get(
                Get.newBuilder()
                    .namespace("sample")
                    .table("items")
                    .partitionKey(Key.ofInt("item_id", itemId))
                    .build());
        if (!item.isPresent()) {
          throw new RuntimeException("Item not found");
        }
        // 商品の注文数での合計金額を計算
        amount += item.get().getInt("price") * count;
      }

      // 限度額を超過していないか確認
      Optional<Result> customer =
          transaction2.get(
              Get.newBuilder()
                  .namespace("sample")
                  .table("customers")
                  .partitionKey(Key.ofInt("customer_id", customerId))
                  .build());
      if (!customer.isPresent()) {
        throw new RuntimeException("Customer not found");
      }
      int creditLimit = customer.get().getInt("credit_limit");
      int creditTotal = customer.get().getInt("credit_total");
      if (creditTotal + amount > creditLimit) {
        throw new RuntimeException("Credit limit exceeded");
      }

      // 支払額の合計を更新
      transaction2.update(
          Update.newBuilder()
              .namespace("sample")
              .table("customers")
              .partitionKey(Key.ofInt("customer_id", customerId))
              .intValue("credit_total", creditTotal + amount)
              .build());

      // 準備フェーズ・・・(5)
      transaction1.prepare();
      transaction2.prepare();

      // トランザクションの検証・・・(6)
      transaction1.validate();
      transaction2.validate();

      // コミット・・・(7)
      transaction1.commit();
      transaction2.commit();
      
      // 注文IDを返却
      return String.format("{\"order_id\": \"%s\"}", orderId);
      
    } catch (Exception e) {
      // エラーが起きたらトランザクションを中断(ロールバック)・・・(8)
      if (transaction1 != null) {
          transaction1.abort();
      }
      if (transaction2 != null) {
          transaction2.abort();
        }
      throw e;
    }
  }

  // --- 中略 ---

}

実装のポイントは以下のとおりです。

(1)
2フェーズコミットを利用する場合、マネージャとしてDistributedTransactionManagerを利用します。
DistributedTransactionManagerはTransactionFactory#getTwoPhaseCommitTransactionManagerメソッドにて生成することができます。

(2)
2フェーズコミットのトランザクションのコーディネータはTwoPhaseCommitTransactionManager#beginメソッドにて開始できます。
ここに参加者を追加する場合は、TwoPhaseCommitTransactionManager#joinメソッドを利用します。
引数には、コーディネータとなるトランザクションのIDを指定してください。

(3)
コーディネータのトランザクションとして、注文内容を登録するまでの処理を記載します。
修正する前はDistributedTransactionを利用してデータ操作を行っておりましたが、今回はコーディネータトランザクションとなるTwoPhaseCommitTransactionを利用しております。
なお、データ操作のAPIの呼び出し方法については、DistributedTransactionと同様で問題ありません。

(4)
参加者のトランザクションとして、商品情報を取得して支払い金額を更新するまでの処理を記載します。
コーディネータと同様に、データ操作を行う処理を参加者のトランザクションに変更しております。
参加者トランザクションでも同様に、DistributedTransactionと同様の実装で問題ありません。

(5)
トランザクションの処理が終わったら準備フェーズとしてTwoPhaseCommitTransaction#prepareメソッドを呼び出します。
複数のトランザクションが存在する場合にはそれぞれのトランザクションで明示的に呼び出しをしてください。

(6)
TwoPhaseCommitTransaction#prepareメソッドの呼び出し後、各トランザクションの検証のためTwoPhaseCommitTransaction#validateメソッドを呼び出します。
こちらの処理により、各トランザクションが成功しているかを確認することができます。
複数のトランザクションが存在する場合にはそれぞれのトランザクションで明示的に呼び出しをしてください。

(7)
例外が発生しなかった場合は各トランザクションが成功しているとし、トランザクションをコミットします。
複数のトランザクションが存在する場合にはそれぞれのトランザクションで明示的に呼び出しをしてください。
なお、準備〜コミットについては、性能の改善のために並列処理で実施することもできます。
(詳細はこちらを参照。)

(8)
エラーが発生した場合には、全てのトランザクションロールバック(中断)する必要があります。
複数のトランザクションが存在する場合にはそれぞれのトランザクションで明示的に呼び出しをしてください。

実際に触ってみた所感

実装方法の冒頭でもお伝えした通り、ScalarDBの導入に際し、大幅な工数の増加や学習コストがかかることなくスムーズに導入することができました。
導入の手軽さやメリットを踏まえると、拡張性を考えて「とりあえずScalarDBを利用してアプリケーションを構築する」ということも、今後は選択肢になるかと思います。
特に、マイクロサービスを拡張していく想定がある場合や、独立したシステム群の一部としてアプリケーションを新規構築する場合には、後の拡張性において大きなメリットになりうると感じました。

今回ライブラリとして導入したScalarDBによるトランザクション管理以外にも、Kubernetes クラスターを利用したScalarDB ClusterによるScalarDBへのネットワーク経由でのアクセスや、ScalarDB Analytics with Sparkによる分析クエリの実行など、用途に応じて導入できる製品が存在します。
次回以降はそちらも実際に触ってみた上で紹介しますので、導入のきっかけや参考にして頂ければと思います。

おわりに

今回は、ScalarDBの概要、ユースケース、クイックスタートを利用したScalarDBの紹介を行いました。 今後、このブログの中でScalarについてい色々発信していきたいと考えています。現在、Ridge-iの事業範囲が大きく拡大しつつあり、一緒に開発・研究してくださる仲間を募集しています。 カジュアル面談から可能なので、ぜひお気軽にご連絡ください!また、分散したデータ環境に課題を感じている場合は、お気軽にご相談ください。

ridge-i.com

引用文献

[1] ScalarDB Overview, 閲覧日時:2025年1月8日
https://scalardb.scalar-labs.com/ja-jp/docs/latest/overview

参考記事

ScalarDB公式ドキュメント, 閲覧日時:2025年1月18日
https://scalardb.scalar-labs.com/ja-jp/docs/3.14/

2フェーズコミット, 閲覧日時:2025年1月19日
https://e-words.jp/w/2%E3%83%95%E3%82%A7%E3%83%BC%E3%82%BA%E3%82%B3%E3%83%9F%E3%83%83%E3%83%88.html

ScalarDB体験記, 閲覧日時:2025年1月19日
https://qiita.com/yebihara/items/d47f3e63fe213edfe917