メインコンテンツへスキップ
Event Streams を使用すると、Auth0 のアイデンティティデータのコピーを、リレーショナルデータベース、データウェアハウス、検索インデックスなどの外部システムに保持できます。Auth0 でユーザープロファイルが作成、更新、または削除されると、イベントがストリームの送信先に配信され、外部システムにも同じ変更を反映できます。

アイデンティティデータを同期する理由

アイデンティティデータのローカルコピーを保持しておくと、次のような場合に役立ちます。
  • Management API を呼び出さずに、分析、レポート作成、コンプライアンスに関するクエリを実行する。
  • ユーザー属性を横断して低レイテンシで検索する必要がある検索機能を実現する。
  • アイデンティティレコードを他の業務データと結合するデータパイプラインにデータを供給する。
  • 障害復旧に備えて、ユーザープロファイルの状態をバックアップしておく。

仕組み

  1. ユーザープロファイルが変更されるたびに、Auth0 はイベントを発行します。
  2. Event Stream はそのイベントを送信先 (webhook、AWS EventBridge、または Auth0 Action) に配信します。
  3. ハンドラーはイベントタイプを確認し、外部システムに対応する書き込み処理を実行します。
データ同期に関連するイベントタイプは次のとおりです。
イベントタイプ発生するタイミング
user.createdAuth0 で新しいユーザープロファイルが作成されたとき。
user.updated既存のユーザープロファイルが更新されたとき。
user.deletedユーザープロファイルが Auth0 から削除されたとき。

前提条件

開始する前に、次のものを用意してください。
  • Events が有効な Auth0 テナント。プランの提供状況の詳細については、Create an Event Streamを参照してください。
  • user.createduser.updateduser.deleted をサブスクライブしているアクティブな Event Stream。詳細については、Create an Event Streamを参照してください。
  • ハンドラーから書き込み可能な外部データストア (例: PostgreSQL、MySQL、データウェアハウス) 。

データ同期を実装する

以下のセクションでは、各イベントタイプの処理方法を説明します。ハンドラー関数は、Event Stream の送信先に関係なく共通です。タイプ別にイベントをルーティングするセクションでは、webhook と Auth0 Action の両方の送信先に対してイベントを振り分ける方法を示します。

user.created を処理する

Auth0 が user.created イベントを発行したら、データベースに新しい行を追加します。
async function handleUserCreated(user, time) {
    const { user_id, email, name, nickname, created_at, updated_at } = user;

    const query = `
        INSERT INTO users (user_id, email, name, nickname, created_at, updated_at, last_event_processed)
        VALUES ($1, $2, $3, $4, $5, $6, $7)
    `;
    const values = [user_id, email, name, nickname, created_at, updated_at, time];

    await pool.query(query, values);
}

user.updated を処理する

Auth0 が user.updated イベントを発行したら、対応する行を更新します。古いデータで上書きされないように、イベントのタイムスタンプを last_event_processed 列と比較します。
async function handleUserUpdated(user, time) {
    const { user_id, email, name, nickname, updated_at } = user;

    const query = `
        UPDATE users
        SET email = $1, name = $2, nickname = $3, updated_at = $4, last_event_processed = $5
        WHERE user_id = $6 AND last_event_processed < $5
    `;
    const values = [email, name, nickname, updated_at, time, user_id];

    await pool.query(query, values);
}
イベントは順序どおりに届かないことがあります。古いデータによって新しいレコードが上書きされるのを防ぐため、更新を適用する前に必ずタイムスタンプを比較してください。詳しくは、イベントのベストプラクティスを参照してください。

user.deleted を処理する

Auth0 が user.deleted イベントを発行したら、対応する行を削除するか、ソフトデリートします。
async function handleUserDeleted(user) {
    const { user_id } = user;

    const query = `DELETE FROM users WHERE user_id = $1`;

    await pool.query(query, [user_id]);
}

イベントをタイプ別にルーティングする

トップレベルのルーターを使用して、各イベントを適切なハンドラーにディスパッチします。以下の例では、Webhook 宛先と Auth0 Action 宛先のイベントをルーティングする方法を示します。
app.post("/webhook", async (req, res) => {
    const { type, time, data } = req.body;
    const user = data.object;

    try {
        switch (type) {
            case "user.created":
                await handleUserCreated(user, time);
                break;
            case "user.updated":
                await handleUserUpdated(user, time);
                break;
            case "user.deleted":
                await handleUserDeleted(user);
                break;
            default:
                console.log(`Unhandled event type: ${type}`);
        }

        res.sendStatus(204);
    } catch (err) {
        console.error("Error processing event:", err);
        res.status(500).json({ error: "Internal server error" });
    }
});
HTTP 2XX レスポンスはできるだけ早く返してください。ハンドラーで時間のかかる処理を実行する必要がある場合は、イベントを内部キューに入れて非同期で処理してください。詳しくは、イベントのベストプラクティスを参照してください。

重複や順序の問題を防ぐ

Event Streams は少なくとも 1 回配信されるため、ハンドラーが同じイベントを複数回受信する可能性があります。これを安全に処理するには、次のようにします。
  • イベント ID を追跡します。 処理した各イベントの id を保存し、すでに処理済みのイベントはスキップします。
  • タイムスタンプを比較します。 各イベントペイロードの data.object には、created_at フィールドと updated_at フィールドが含まれます。これらのフィールドを使用して、受信したイベントがシステムにすでに記録されているものより新しいかどうかを判定します。
  • 冪等な書き込みを使用します。 同じイベントを 2 回適用しても結果が変わらないように、データベース操作を設計します。たとえば、PostgreSQL では INSERT ... ON CONFLICT DO UPDATE を使用します。
INSERT INTO users (user_id, email, name, nickname, created_at, updated_at, last_event_processed)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (user_id) DO UPDATE
SET email = EXCLUDED.email,
    name = EXCLUDED.name,
    nickname = EXCLUDED.nickname,
    updated_at = EXCLUDED.updated_at,
    last_event_processed = EXCLUDED.last_event_processed
WHERE users.last_event_processed < EXCLUDED.last_event_processed;

同期を確認する

ハンドラーをデプロイしたら、Auth0 でテストユーザーを作成し、次のことを確認します。
  1. 外部データベースに、正しいプロフィールデータを含む新しい行が作成されること。
  2. Auth0 でユーザーの名前またはメールアドレスを更新し、その変更がデータベースの行に反映されることを確認すること。
  3. Auth0 でユーザーを削除し、その行がデータベースから削除される (または削除済みとしてマークされる) ことを確認すること。
Event Streams のテストについて詳しくは、Event Testing, Observability, and Failure Recovery を参照してください。

詳しく見る