TalentX Tech Blog

Tech Blog

Aurora MySQLのデータをSnowflakeへ集約する基盤を構築した話

はじめに

こんにちは、SREチームの前野です。 現在SREチームでは、TalentXにおけるデータドリブンな意思決定を加速させるため、全社のデータを集約する分析基盤の構築を進めています。

本記事では、その最初のユースケースである「プロダクト(Myシリーズ)のデータベースのデータをSnowflakeへ集約し、PdMチームが機能要件策定に活かすためのデータ分析基盤の構築」について、具体的なアーキテクチャと実践的なTipsを紹介します。

導入前の2つの課題

これまではOSSのBIツールであるMetabaseを使用し、各プロダクトのデータベースに対して直接クエリを実行してダッシュボードを作成していました。しかし、この構成には2つの課題がありました。

1. データベースを横断した分析ができない
現在、Myシリーズは4つのSaaSプロダクトで構成されており、データベースもプロダクトをホストする各AWSアカウントに分かれています。Metabaseではこれら複数のデータベースを横断して検索(Join)することができず、クエリ結果をCSVでダウンロードして手作業で結合する手間が発生していました。

2. 分析に時間がかかる
データ分析は専用のレプリカデータベースに対して実施しますが、時間のかかる重いクエリを実行する場合には、前段のロードバランサーのタイムアウトに抵触するため、クエリを複数回に分けて実行する手間が発生していました。

Snowflakeの導入による解決

これらの課題を解決するため、データクラウドプラットフォームとしてSnowflakeを採用しました。 Snowflakeにデータを同期することで、複数プロダクトのデータを横断的に分析できるだけでなく、列指向や高いデータの圧縮率というデータウェアハウスの特性から短時間での分析が可能です。

www.snowflake.com

なお、上記利点は他のデータウェアハウス製品でも実現できますが、Snowflakeを採用した理由は下記です。

  • AWSとの親和性: TalentXのインフラはAWS上にあり、S3経由でのデータ連携が容易かつ転送コストを抑えられる点

  • パフォーマンス: マイクロパーティション構造によるクエリパフォーマンスの高さ

  • 将来性: Snowflake CortexAI等、生成AI機能の充実に期待できる点

アーキテクチャ概要

データベースのデータをSnowflakeへ同期するアーキテクチャは以下の通りです。

Aurora MySQLからSnowflakeにデータ同期するアーキテクチャ

①DMS(Database Migration Service)を使用して、Aurora MySQLのデータをS3へParquet形式で出力します
②SnowflakeのExternal Stageを構成して、SnowflakeからS3上のデータを参照可能にします
③Snowflake Taskを構成して、S3上のデータをSnowflake テーブルに同期します

ここで注意したいのは、①のDMSによるデータ同期には既存データの全量を出力するフルロードと更新データをリアルタイムに出力するCDC(Change Data Capture)の二種類のレプリケーションが存在することです。

2種類のDMSレプリケーション

データをSnowflakeに同期するためにはこの2種類のデータ同期を考慮しなければなりません。
ここからは、このデータ同期の仕組みを構築するにあたり発生した技術課題と具体的な実装のポイントを3つ取り上げて解説します。

データ同期に伴う3つの技術的課題と解決

課題1:同期先のSnowflakeテーブルの作成

S3上のデータは、Snowflake上に作成したテーブルに対してCopy intoクエリを実行することで同期できます。

COPY INTO <同期対象のテーブル>
  FROM @<S3外部ステージ>
    FILE_FORMAT = (TYPE = 'PARQUET');

この時、同期先のテーブルは事前に作成しなければなりませんが、Myシリーズには合計500以上のテーブルが存在するため、1つずつ手作業でMySQLのスキーマをSnowflakeのスキーマに置き換え、Create Tableクエリを実行するのは現実的ではありませんでした。

そこでSnowflakeの組み込み関数であるINFER_SCHEMA()を利用して、S3上のParquetファイルからスキーマを推論して自動的にテーブル定義を作成しました。
注意点として、DMSで出力したデータにはタイムスタンプのカラムを付与できますが、指定したフォーマットによってはINFER_SCHEMA()でString型と推論されるため、下記のようにタイムスタンプ型にキャストしています。

CREATE OR REPLACE TABLE <テーブル名>
USING TEMPLATE (
  SELECT ARRAY_AGG(OBJECT_CONSTRUCT(
    'COLUMN_NAME', I.COLUMN_NAME,
    -- カラム名が 'CDC_TIMESTAMP' の場合のみ型を TIMESTAMP_NTZ に強制し、それ以外は推論された型を使用
    'TYPE', IFF(I.COLUMN_NAME = 'CDC_TIMESTAMP', 'TIMESTAMP_NTZ', I.TYPE),
    'NULLABLE', I.NULLABLE
  ))
  FROM TABLE(
    INFER_SCHEMA(
      LOCATION => '@<ステージ上のファイルパス>',
      FILE_FORMAT => '<ファイルフォーマット名>',
      IGNORE_CASE => TRUE
    )
  ) AS I
);

課題2:CDCデータのマージ処理

フルロードに対してCDCのデータ同期では、同期済みのデータと比較して差分をマージする作業が必要なため、処理が複雑になります。 具体的には、CDCでS3に出力されたParquetファイルには、Op(Operation)カラムが含まれ、I(Insert)、U(Update)、D(Delete)のいずれかが記録されており、各値に応じて同期するレコードの取り扱いを変えなければいけません。

・I (Insert): 新規レコードとして挿入
・U (Update): 既存レコードを最新のタイムスタンプで更新
・D (Delete): 該当レコードを削除

これらの条件分岐はSQLでも記述できますが、同期対象のテーブルやマージ時に基準となる主キーの扱いでプログラミング言語のように変数や関数を扱いたい場合があります。
そこで今回は下記のようにストアドプロシージャを利用することで、可読性の高い、汎用的な同期ロジックを記述しました。

    // MERGE処理を実行
    const mergeSql = `
        MERGE INTO $${fullTableName} AS target
        USING (
            SELECT *
            FROM $${tempTableName}
            QUALIFY ROW_NUMBER() OVER(PARTITION BY $${partitionByClause} ORDER BY "CDC_TIMESTAMP" DESC) = 1
        ) AS source
        ON $${onClause}
        
        WHEN MATCHED AND source.OP = 'D' THEN DELETE
        WHEN MATCHED AND source.OP = 'U' THEN UPDATE SET $${updateClause}
        WHEN NOT MATCHED AND source.OP IN ('I', 'U') THEN INSERT ($${insertColumnsClause}) VALUES ($${insertValuesClause});
    `;
    snowflake.execute({ sqlText: mergeSql });

課題3:大量のSnowflakeのリソースの自動作成

繰り返しになりますが、Myシリーズには同期対象のMySQLテーブルが500以上あり、同期先のSnowflakeテーブル、フルロードおよびCDCレプリケーションの2種類の同期を実現するそれぞれに対応するSnowflakeタスクおよびストアドプロシージャを作成するとなると、1500以上のSnowflakeリソースを作成しなければなりません。

作成が必要なSnowflakeリソース数 : 500テーブル*( 1テーブル + 1タスク*2種類)= 1,500

一つのSnowflakeタスクの中で全テーブルデータの同期ロジックを記述することもできますが、コードの可読性や同期に失敗したテーブルの原因調査や再ロードという運用の観点から、各テーブルに応じたタスクを作成することにしました

この課題に対しては、Terraformを使用してSnowflake上のリソースを自動作成する方法で解決しました。

まず、DMSは、S3に同期を行う際に/<DBスキーマ名>/<テーブル名>の階層構造でオブジェクトを出力するため、 S3オブジェクトの情報をdataブロックで参照することで同期対象のテーブル一覧を自動作成します。

# S3オブジェクトの情報を参照
data "aws_s3_objects" "main" {
  for_each = local.data_s3_objects

  bucket    = each.value.bucket
  prefix    = each.value.prefix
  delimiter = "/"
}

# オブジェクトパスからテーブル名に該当する箇所を抜き出してoutput変数に格納
output "s3_subdirectory_names" {
  description = "SubDirectory Names presents MySQL Table Name for DataLoad."
  value = {
    for k, v in data.aws_s3_objects.main : k => {
      subdirectory_names = [for subdirectory_name in v.common_prefixes : split("/", subdirectory_name)[1]]
    }
  }
}

次に作成したテーブル一覧のオブジェクト変数をfor_eachでループして、Snowflakeタスクを作成します。 (下記はフルロード用のSnowflakeタスクを作成する例)

# ローカル変数で、テーブル×Snowflakeタスクの組み合わせをフラットなマップに変換
locals {
  task_full_load_db_data = merge(concat(
    [for k, v in {
      xxx_aurora_data  = { database = "xxx_prd", schema = "xxx_aurora_db" },
      // ... 他のDB定義
    } : {
      # 変数(var.s3_subdirectory_names)で管理されたテーブル名リストを展開
      for table_name in var.s3_subdirectory_names[k].subdirectory_names : 
      lower("${var.database_names[v.database].name}_${var.schema_names[v.schema].name}_${table_name}") => {
        database  = var.database_names[v.database].name
        schema    = var.schema_names[v.schema].name
        table     = upper(table_name)
        procedure = snowflake_procedure_javascript.full_load_db_data[k].name
      }
    }]
  )...) # スプレッド演算子で展開してmerge
}

# 生成されたマップを元にSnowflakeタスクを一括作成
resource "snowflake_task" "full_load_db_data" {
  for_each = nonsensitive(local.task_full_load_db_data)

  database = each.value.database
  schema   = each.value.schema
  name     = "FULL_LOAD_DB_DATA_${each.value.table}"
  started  = false
  
  # テンプレートファイルにパラメータを渡してSQLを生成
  sql_statement = templatefile("${path.module}/data/task/full_load_db_data.sql",
    {
      database  = each.value.database
      schema    = each.value.schema
      table     = each.value.table
      procedure = each.value.procedure
    }
  )
  user_task_managed_initial_warehouse_size = "X-SMALL"
}

この構成の利点は、本番データベースにテーブルが追加されレプリケーション先のS3オブジェクトが追加されるたびに、Terraformを実行するだけで、dataブロックが更新され差分が発生し、新規テーブルおよびデータ同期のための2種類のSnowflakeタスクリソースを自動的に追加できることです。

なお、SnowflakeリソースをTerraformで作成する際のTipsとして下記2点を補足します

・上記のように大量のリソースをTerraformで管理する場合には、Terraform Planの時間の短縮のためWorkspace(Stateファイル)の分割が有効です

support.hashicorp.com

・TerraformのSnowflake Provider(snowflakedb/snowflake)では、一部のリソースや機能がプレビュー扱いとなっているため、providerブロックで明示的に有効化する必要があります

registry.terraform.io

まとめ

上記のようにTerraformとSnowflakeのストアドプロシージャを活用することで、Myシリーズの全てのデータベースのデータ同期を構築および運用コストを削減しつつ実現できました。

今後の展望

現在は、本番データベースでスキーマ変更(DDL)があった場合、Slackに通知して手動でSnowflakeテーブルに反映する運用を行なっています。 MySQLとSnowflakeのクエリは文法的に異なる箇所がいくつか存在するため(例えばSnowflakeにIndexという概念はない)人の手でクエリを変換する必要があります。将来的には、このスキーマ反映のクエリ生成および反映を生成AIを活用して自動化する仕組みを作成したいと考えています。

最後に

今回取り上げたSnowflakeの他にもTalentXのエンジニアチームはモダンなアーキテクチャやツールを積極的に取り入れチャレンジする文化があります。 技術的なチャレンジにご興味ある方はぜひ一緒に働きましょう。 talentx.brandmedia.i-myrefer.jp

また、カジュアル面談も行っていますのでお気軽にお申し込みください。 i-myrefer.jp