Article

イベント駆動を3年やって Event Sourcingをやめた理由

高田晃太郎
イベント駆動を3年やって Event Sourcingをやめた理由

Fortune 100の60%以上がKafkaを利用しているとConfluent社は公表し¹、LinkedInの事例ではKafkaが日次で1兆件超のメッセージを処理することが知られています²。メッセージで非同期に連携する「イベント駆動(event-driven)」は主流化し、私は3年前に状態をイベント列で再生する保存法である「Event Sourcing(以下ES)」をコアに据えた設計を採用しました。ところが運用を重ねるほど、重複配信や順序の入れ替わり、再投影(イベントから読み取りモデルを作り直す処理)に伴う不整合といった現実的な論点が前面に出てきます。可観測性のダッシュボードが緑でも、現場の心拍はしばしば赤いまま。理論の美しさと運用のコストが乖離し、価値と負担のバランスが崩れていく感覚が強まりました。

誤解のないように言えば、イベント駆動という設計思想そのものをやめたわけではありません。境界をメッセージで越える発想は依然として強力です。ただし、集約の永続化をイベント履歴だけに依拠するESをシステム全域に適用する判断は見直しました。以下では、そう考えるに至った理由を具体例で示し、リスクを抑えながらCRUDに基づく保存とOutboxパターン+CDC(Change Data Capture)で配信品質を担保するアプローチへ移す道筋を、実装片と観測指標の観点から整理します。

イベント駆動は正解だったが、Event Sourcingは全域ではなかった

まず用語をそろえます。イベント駆動は、サービス間の結合度を下げつつ非同期に連鎖処理を進める設計スタイルの総称です。一方ESは、集約の現在状態を「起きた出来事(イベント)」の逐次記録から再生する保存戦略を指します。両者は重なりますが同義ではありません。

私の経験では、注文・決済・在庫のような主要ドメインにイベントストアを導入し、プロジェクション(読み取りモデルの投影)を構築、スナップショットと再投影でスケールを図る構成は、監査性の高さや過去時点の再現、機能フラグやスキーマ進化への柔軟さといった利点を確かに生みます。一方で、読み取り側に蓄積される複雑さ、障害時の復旧に要する時間、スキーマ進化のたびに発生する再投影コストは、負荷の増加に伴って非線形に膨らみがちでした。

3年で得た価値は何か

イベント単位の完全な監査証跡は、決済不一致の追跡や遡及調整に大きく効きます。過去時点を忠実に再現できることは、データ分析やシミュレーションにも寄与します。書き込み経路における集約の整合性は、楽観的ロックとイベント検証で堅牢に保てます。以下はPostgreSQLをイベントストアとして用い、同一バージョンでの追記を防ぐ実装例です。

import { Pool, PoolClient } from 'pg';
import { randomUUID } from 'crypto';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

type Event = {
  id: string;
  aggregateId: string;
  version: number;
  type: string;
  payload: unknown;
  occurredAt: string;
};

export async function appendEvents(
  client: PoolClient,
  aggregateId: string,
  expectedVersion: number,
  newEvents: Omit<Event, 'id' | 'occurredAt' | 'version'>[]
) {
  await client.query('BEGIN');
  try {
    const { rows } = await client.query(
      'SELECT COALESCE(MAX(version), 0) as v FROM events WHERE aggregate_id = $1 FOR UPDATE',
      [aggregateId]
    );
    const currentVersion = Number(rows[0].v);
    if (currentVersion !== expectedVersion) {
      throw new Error(`ConcurrencyError: expected ${expectedVersion}, got ${currentVersion}`);
    }
    for (let i = 0; i < newEvents.length; i++) {
      const ev = newEvents[i];
      await client.query(
        'INSERT INTO events (id, aggregate_id, version, type, payload, occurred_at) VALUES ($1,$2,$3,$4,$5, NOW())',
        [randomUUID(), aggregateId, expectedVersion + i + 1, ev.type, JSON.stringify(ev.payload)]
      );
    }
    await client.query('COMMIT');
  } catch (e) {
    await client.query('ROLLBACK');
    throw e;
  }
}

export async function withTx(fn: (c: PoolClient) => Promise<void>) {
  const client = await pool.connect();
  try {
    await fn(client);
  } finally {
    client.release();
  }
}

この構造により、並行更新の衝突をアプリケーションレベルで明確に扱えます。書き込み側の局所的な複雑さは、得られる監査性の対価として妥当な範囲に収まります。

3年で積み上がったコストはどこに現れたか

課題は読み取り側に集中しました。再投影は理論上は単純でも、現実にはスループットと順序保証、重複排除、失敗時の再開点管理が絡み合います。KafkaやSQSのアットリーストワンス配信を前提とすると、プロジェクタに冪等性と順序制御を組み込む必要があり、ビジネスロジックと運用ロジックが絡み合って変更が重くなりがちです。次は重複排除用にinboxテーブルを持つプロジェクタの例です。

import { Kafka } from 'kafkajs';
import { Pool } from 'pg';

const kafka = new Kafka({ brokers: [process.env.KAFKA_BROKER!] });
const consumer = kafka.consumer({ groupId: 'orders-projector' });
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function handle(message: { key: string; value: Buffer }) {
  const eventId = message.key.toString();
  const ev = JSON.parse(message.value.toString());
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const seen = await client.query('SELECT 1 FROM inbox WHERE event_id = $1', [eventId]);
    if (seen.rowCount) {
      await client.query('COMMIT');
      return; // idempotent
    }
    // apply projection with cautious ordering
    await client.query('UPDATE order_read SET status = $1 WHERE order_id = $2', [ev.status, ev.orderId]);
    await client.query('INSERT INTO inbox(event_id, processed_at) VALUES ($1, NOW())', [eventId]);
    await client.query('COMMIT');
  } catch (e) {
    await client.query('ROLLBACK');
    throw e;
  } finally {
    client.release();
  }
}

この程度の冪等性でも、障害時に観測できない二重適用や順序逆転は紛れ込みます。ピークトラフィックでは投影遅延が数分〜数十分に伸び、復旧にも相応の時間がかかることが珍しくありません。特に再投影は厄介で、四半期単位のスキーマ変更に伴う全域再投影は長時間バッチに発展し、夜間メンテナンスの枠を超えるケースも見られました。

やめた理由の核心は、認知負荷と運用コストの非線形な増大

決定打は、開発者の主作業がドメインモデリングから運用ロジックの複合問題へと移っていったことです。ビジネスの問いの多くは「今どうなっているか」「この条件の集合はいくつか」といった集計や現在値の探索です。イベントの合成で表現できても、SQL一発で済む思考に比べて負担が大きくなりがちです。再投影は駆動電圧が高く、カナリアや段階リリースを施しても、失敗時の巻き戻しと再開点の検証に時間を取られます。決算・監査が近づくほど異常値の説明責任が増し、投影器のバージョン互換や歴史的イベントの意味付けが重くのしかかりました。

スキーマ進化も壁です。イベントに「意味的互換性」を保つ原則を置いても、長寿命のコンシューマを完全に守るのは現実的ではありません。イベントにバージョンを重ね、コンシューマに分岐を実装し、アーカイブを切り替えていくうちに、コードベースは過去との握手で埋まっていきます。再投影時間が業務メンテナンスの枠を越え始めると、顧客影響リスクを経営として無視できなくなります。

スナップショットによる緩和も有効範囲は限定的です。以下は集約のスナップショット保存とリカバリの典型です。

import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

type OrderState = { id: string; version: number; status: string };

export async function loadOrder(aggregateId: string): Promise<OrderState> {
  const client = await pool.connect();
  try {
    const snap = await client.query('SELECT state, version FROM order_snapshot WHERE id = $1', [aggregateId]);
    let state: OrderState = snap.rowCount ? snap.rows[0].state : { id: aggregateId, version: 0, status: 'NEW' };
    const events = await client.query(
      'SELECT type, payload FROM events WHERE aggregate_id = $1 AND version > $2 ORDER BY version ASC',
      [aggregateId, state.version]
    );
    for (const row of events.rows) {
      state = apply(state, row.type, row.payload);
    }
    return state;
  } finally {
    client.release();
  }
}

function apply(state: OrderState, type: string, payload: any): OrderState {
  switch (type) {
    case 'OrderConfirmed':
      return { ...state, version: state.version + 1, status: 'CONFIRMED' };
    default:
      return { ...state, version: state.version + 1 };
  }
}

スナップショットは読み出し性能を押し上げますが、根本的な再投影コストや意味的互換性の複雑さを解消するわけではありません⁵。現場の負債が減らなければ、オンボーディングや横断保守の難易度は徐々に上がります。

再投影を速くするために並列化とチェックポイント制御を強化する局面もあります。以下のような再投影ジョブは確かに速くなりますが、コードと運用のカップリングが強まり、失敗時の巻き戻しが難しくなりました。

import { Pool } from 'pg';
import PQueue from 'p-queue';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const queue = new PQueue({ concurrency: 8 });

async function reprojectAll() {
  const client = await pool.connect();
  try {
    const { rows } = await client.query('SELECT DISTINCT aggregate_id FROM events');
    let processed = 0;
    await Promise.all(rows.map((r: any) => queue.add(() => reprojectOne(r.aggregate_id).then(() => { processed++; }))));
    return processed;
  } finally {
    client.release();
  }
}

async function reprojectOne(id: string) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    await client.query('DELETE FROM order_read WHERE order_id = $1', [id]);
    const events = await client.query('SELECT type, payload FROM events WHERE aggregate_id = $1 ORDER BY version ASC', [id]);
    let state: any = { id, status: 'NEW' };
    for (const e of events.rows) state = apply(state, e.type, e.payload);
    await client.query('INSERT INTO order_read(order_id, status) VALUES ($1,$2)', [id, state.status]);
    await client.query('COMMIT');
  } catch (e) {
    await client.query('ROLLBACK');
    throw e;
  } finally {
    client.release();
  }
}

最適化すればするほど、ドメインの変更容易性が落ちていく逆説は見過ごせません。ここで私は、永続化戦略としてのESの全域適用をやめ、代わりにCRUDをベースに、配信の信頼性はOutboxとCDCで担保する構成へ舵を切る判断を提案します³⁴。

どうやってやめたか――CRUD+Outbox+CDCへの移行

移行の基本方針は、書き込み経路の単純化と、配信保証をアプリケーションから切り出すことです。集約の状態は正規化テーブルに直接保存し、同一トランザクションでアウトボックスに「公開予定イベント」を記録します³。外部ブローカーへの送信は非同期ワーカーが担い、失敗時は再送制御で吸収します。加えてDebeziumなどのCDCを用意し、アウトボックス経由とCDC経由の二経路を切り替え可能にしておくと、段階的移行や障害時の退避が容易になります⁴。

以下は、注文テーブル更新とアウトボックス記録を同一トランザクションにまとめた例です(Outboxパターン)。

import { Pool } from 'pg';
import { randomUUID } from 'crypto';

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

type OrderUpdate = { orderId: string; status: string };

export async function updateOrderWithOutbox(u: OrderUpdate) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    await client.query('UPDATE orders SET status = $1, updated_at = NOW() WHERE id = $2', [u.status, u.orderId]);
    await client.query(
      'INSERT INTO outbox(id, topic, key, payload, created_at) VALUES ($1,$2,$3,$4,NOW())',
      [randomUUID(), 'orders', u.orderId, JSON.stringify({ type: 'OrderStatusChanged', ...u })]
    );
    await client.query('COMMIT');
  } catch (e) {
    await client.query('ROLLBACK');
    throw e;
  } finally {
    client.release();
  }
}

送信ワーカーはアウトボックスをポーリングし、配信後にマークします。配信失敗時は指数バックオフとデッドレタを用意し、重複送信はコンシューマ側の冪等性で吸収します³。

import { Kafka } from 'kafkajs';
import { Pool } from 'pg';

const kafka = new Kafka({ brokers: [process.env.KAFKA_BROKER!] });
const producer = kafka.producer();
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function pump() {
  await producer.connect();
  while (true) {
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      const { rows } = await client.query(
        'SELECT id, topic, key, payload FROM outbox WHERE sent_at IS NULL ORDER BY created_at ASC LIMIT 100 FOR UPDATE SKIP LOCKED'
      );
      for (const row of rows) {
        await producer.send({ topic: row.topic, messages: [{ key: row.key, value: row.payload }] });
        await client.query('UPDATE outbox SET sent_at = NOW() WHERE id = $1', [row.id]);
      }
      await client.query('COMMIT');
      await new Promise(r => setTimeout(r, rows.length ? 10 : 200));
    } catch (e) {
      await client.query('ROLLBACK');
      await new Promise(r => setTimeout(r, 1000));
    } finally {
      client.release();
    }
  }
}

同時にCDCの準備を進め、アウトボックスを介さずにDBの変更からイベントを起こせるようにします。設定例は次の通りです⁴。

{
  "name": "orders-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "******",
    "database.dbname": "app",
    "plugin.name": "pgoutput",
    "slot.name": "orders_slot",
    "table.include.list": "public.orders",
    "topic.prefix": "cdc",
    "tombstones.on.delete": "false"
  }
}

HTTP層では冪等性キーを導入し、クライアントのリトライや二重送信を抑えます。次はシンプルな冪等化の例です。

import express from 'express';
import { Pool } from 'pg';

const app = express();
app.use(express.json());
const pool = new Pool({ connectionString: process.env.DATABASE_URL });

app.post('/orders/:id/status', async (req, res) => {
  const key = req.header('Idempotency-Key');
  if (!key) return res.status(400).send('Missing Idempotency-Key');
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const seen = await client.query('SELECT response FROM idem WHERE key = $1 FOR UPDATE', [key]);
    if (seen.rowCount) {
      await client.query('COMMIT');
      return res.status(200).json(seen.rows[0].response);
    }
    const { id } = req.params;
    const { status } = req.body;
    await client.query('UPDATE orders SET status = $1, updated_at = NOW() WHERE id = $2', [status, id]);
    const response = { ok: true };
    await client.query('INSERT INTO idem(key, response) VALUES ($1,$2)', [key, JSON.stringify(response)]);
    await client.query('COMMIT');
    res.json(response);
  } catch (e) {
    await client.query('ROLLBACK');
    res.status(500).send('error');
  } finally {
    client.release();
  }
});

この構成に切り替えると、書き込みレイテンシは短縮されやすく、再投影に起因するインシデントも構造的に減らせます。エンドツーエンドのレイテンシ分布が安定し、ピーク時に伸びるロングテールも抑え込みやすくなります。結果として、チームの認知負荷も下がり、オンボーディングの所要期間が短くなることが期待できます。

何を残し、何を捨てたか

ESを完全に捨てるのではなく、選択と集中が肝要です。取引の監査性が最重要のドメインでは、イベント履歴を一次記録として維持しつつ、読み取りはCDCや専用の読み取りモデルで賄うハイブリッドも有効です⁴。一方、在庫のように「現在値」が主眼のドメインはCRUDへ寄せ、過去時点の再現は監査ログやスナップショットの併用で充足させる、といった切り分けが現実的です。境界ごとに目的を言語化し、「保存戦略」と「配信戦略」を分離することが、設計の自由度を取り戻す鍵になります。

判断基準を言語化する――やめる/続けるの境界

合意形成を容易にするため、判断軸を観測可能なメトリクスに落とし込みます。以下は一例です。

  • ビジネスの問いの大半が現在値や集計で答えられるなら、保存戦略はまずCRUDを第一候補にする。
  • 再投影に要する時間が業務メンテナンス枠を越え始めたら、ESの全域適用は長期的保守性を損なうシグナルとみなす。
  • 新規メンバーがドメイン理解に到達するまでの期間が伸び続けているなら、認知負荷の過剰を疑い構造の単純化を検討する。
  • 監査要件が厳しくても、一次ソースがイベントである必然性が薄いなら、CDCや監査ログなどの代替で要件を満たせるか先に検討する⁴。
  • 障害対応のMTTRが流量増加に対して非線形に伸びる兆候が見えたら、運用コストが価値を凌駕し始めていると判断する。

この基準で棚卸しを進めると、「ESが不可欠な領域」は想像より狭く、一方でイベント駆動そのものはむしろ健全に機能します。配信保障と順序制御はミドルウェアとアーキテクチャパターン(Outbox、CDC、デッドレタなど)に委ね、アプリケーションはドメインに集中する。この役割分離が、速度と品質の両立に効きます³⁴。

まとめ――設計は手段、価値は顧客にある

3年間の実践から学んだのは、アーキテクチャは信念ではなく仮説だということです。ESは強力な手段ですが、価値を生む範囲は有限です。私はイベント駆動を続けながら保存戦略を見直し、CRUD+Outbox+CDCで開発と運用の摩擦を下げる道を選びました。あなたのシステムでは、再投影時間、MTTR、オンボーディング日数はどう変化していますか。まず現状を数字で可視化し、小さな境界から保存戦略を試してみてください。次の四半期のOKRに、ひとつだけ測定可能な改善目標を置く――その一歩が、チームの自由度と顧客価値の最大化につながります。

参考文献

  1. Confluent, Inc. Press Release (2019). Confluent Propels Data Architecture into Event Streaming Era with $125 Million Series D. https://www.confluent.io/press-release/confluent-propels-data-architecture-into-event-streaming-era-with-125-million-series-d/
  2. Kiran Oliver. Streaming Data at LinkedIn: Apache Kafka Reaches 1.1 Trillion Messages Per Day. The New Stack (2015-09-03). https://thenewstack.io/streaming-data-at-linkedin-apache-kafka-reaches-1.1-trillion-messages-per-day/
  3. Flavio Juvenal. Reliable Microservices Data Exchange with the Outbox Pattern. Debezium Blog (2019-02-19). https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/
  4. Eric Murphy. Event Sourcing vs. Change Data Capture: Why You Should Care. Debezium Blog (2020-02-10). https://debezium.io/blog/2020/02/10/event-sourcing-vs-cdc/
  5. EventSourcingDB Documentation. Snapshots and Performance (accessed latest). https://docs.eventsourcingdb.io/best-practices/snapshots-and-performance/