本番DBとステージングDBのデータを同期させる - 生涯未熟

生涯未熟

プログラミングをちょこちょこと。

本番DBとステージングDBのデータを同期させる

この記事はMIXI DEVELOPERS Advent Calendar 2023の18日目の記事です。

今回はやったことある人が多そうな本番DBのデータをステージングDBに同期させる仕組みを作ったお話になります。

前提のお話

何故作ったのか?ですが、データ数の問題から開発・ステージング環境では発生せず本番環境では発生するような表示上のバグが数は少ないですが存在し、そういった問題を早期に発見するために対応できませんか?と相談されたことがきっかけで同期の仕組みを作ることになりました。

また、ステージング環境ではQAチームによる自動テストが動いており、本番環境データが同期されていればより潜在的な問題の発見に役立ちそうですね。

ここから詳しい説明に入りますが、DBはCloud SQL for MySQL v5.7に準拠した内容となっております。

作る

さて、ここから作るフェーズに入っていくのですがもう少し作りたいモノのイメージを明確にしていきましょう。
「本番DBのデータをステージングDBに同期させる」、当たり前ですがこれは大前提ですね。他には「継続的に同期させる」、こちらも当たり前ですが本番環境のデータは常に変化していくものなので継続的にステージングDBに反映していく仕組みでないとダメですね。
あとは「ステージングDBに反映してはいけないデータは反映しない」、これはユーザーの個人情報などですね。Identity-Aware Proxyを有効化したり、Cloud Armorを利用したり悪意のあるアクセスはなるべく弾くようにはしていますが、開発・ステージング環境はどうしても標的にされやすいものです。リスクを減らすためにも、漏れるとまずいデータは同期させないようにします。また「ステージングDBから営業用デモデータを退避させる」という要件も存在しており、本番DBのデータをインポートする前にそのデータを退避させなければいけません。

最後に「できれば低コストで作りたい」、というお気持ちがありました。現在、SREとしてインフラのコスト面も管理しているのですが普段コスト削減などを承っている立場上、あまりコストが高くならないようにという思いからです。

では、これらを踏まえてどうやって作っていくか考えていきましょう。先程の要件をまとめるとこちらですね。

まず、「本番DBのデータをステージングDBに同期させる」ですが、これは本番DBをエクスポートしてステージングDBにインポートすれば達成できますね。次は「継続的に同期させる」Google Cloud(以下、GC)にはSchedulerなど定期的に実行する仕組みがあるのでここも問題はなさそうです。
「ステージングDBに反映してはいけないデータは反映しない」については、本番DBからステージングDBにインポートする間にデータを編集する必要性がありそうですね。Cloud SQLでエクスポートできる形式はSQL/CSVの2種類なのですが、どちらもエクスポートしたデータを編集するのは骨が折れそうなので、今回は本番DB → ステージングDBの間に本番レプリカDBを挟み込み、このDB上でデータ編集するのが良さそうです。

「ステージングDBから営業用デモデータを退避させる」は、ステージングDBの退避させる対象データを参照しながら本番レプリカDBに追加する、といった感じで出来そうです。こうやって処理した本番レプリカDBをまたエクスポートしてからステージングDBにインポートすれば目出度く同期の完了、という目論見になります。
最後に、「できれば低コストで作りたい」という要件は、必要な処理類をCloud Functionsに寄せれば安上がりにいけそうだったので今回はがっつり利用しています。あと、それぞれ作成したCloud Functionsを一つのワークフローとして管理するためにCloud Workflowsも利用します。こちらも料金体系を確認すると、GC内のリソースをステップとする場合には月5000回まで無料で実行できるとのことで、1日1回ほどの実行頻度だと十分無料分のみで回せそうでした。

再びまとめるとこうなりますね。

ではこれらを考慮に入れて作成開始していきますか。まず先に完成図がこちらになります。

この完成図を元に詳しく見ていきましょう。ちなみにですが作成に利用する言語はGoになりますのであしからず🙇‍♂

本番DBからエクスポート

エクスポートについては幸いなことにGoogleがサンプルコードを提供してくれていますので、有り難く一部使わせていただきましょう。

cloud.google.com

それでは完成したコードはこちらになります。

このコードを見ながら重要な箇所を解説していきます。

// アプリケーションデフォルトの資格情報を使用する http.Client を作成
hc, err := google.DefaultClient(ctx, sqladmin.CloudPlatformScope)
if err != nil {
  fmt.Println("Failed to create http.Client: ", err)
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

// Google Cloud SQL サービスを作成
service, err := sqladmin.New(hc)
if err != nil {
  fmt.Println("Failed to create SQL Admin Service: ", err)
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

ここではCloud SQLへの接続準備をしているところで、Application Default Credentials(ADC)を使用したClientを作成してからCloud SQL serviceを作成しています。ADCを利用するということでCloud Functionsで利用するサービスアカウントはCloud SQLに関する権限を付け忘れないよう注意です。

// エクスポート操作を実行
rb := &sqladmin.InstancesExportRequest{
  ExportContext: &sqladmin.ExportContext{
    Kind:      "sql#exportContext",
    FileType:  "SQL",
    Uri:       uriPath,
    Offload:   true, // サーバーレス エクスポートを有効にする
    Databases: []string{database},
  },
}

op, err := service.Instances.Export(project, instance, rb).Context(ctx).Do()
if err != nil {
  fmt.Println("Failed to export: ", err)
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

エクスポートを実行しているところですね。先程作成したCloud SQL serviceを利用してエクスポートするのですが、その前に InstancesExportRequest でどのようにエクスポートするのか?を設定しています。本番環境からエクスポートする、ということでサーバーレスエクスポートを有効化しております。これはエクスポートのために一時的にインスタンスを生成し、そこを通してエクスポートすることでパフォーマンスの確保やインスタンスへのオペレーションを阻害しないなどのメリットがあるため有効にしてあります。

cloud.google.com

// エクスポートオペレーションの状態を10s毎に取得
for {
  op, err = service.Operations.Get(project, op.Name).Do()
  if err != nil {
    fmt.Println("Operations.Get: ", err)
    http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
    return
  }
  if op.Status != "PENDING" && op.Status != "RUNNING" {
    break
  }
  time.Sleep(10 * time.Second)
}

if op.Status != "DONE" || op.Error != nil {
  fmt.Println("Failed to export operation: ", op.Error)
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

最後にエクスポートの進捗を都度取得しておきます。 service.Operations.GetPENDING , RUNNING , DONE , あとこれは取得する必要性あるかな?と思うのですが SQL_OPERATION_STATUS_UNSPECIFIED というオペレーションの状態が不明の場合のレスポンスがそれぞれ取得できます。

これを実行することで本番DBからエクスポートされたSQLファイルがCloud Storageに格納されるわけですね。

本番レプリカDBにインポート

そんなエクスポートしたデータを本番レプリカDBにインポートしていきます。と、いきたいのですがその前に本番レプリカDBについてもう少し説明したいと思います。
個人情報を扱う上でのセキュリティリスクは説明しましたが、対策としてデータをマスキングしましょうというお話をさせていただきました。しかしながら、本番レプリカDBでエクスポートしてからマスキングするまでの間は低いながらもリスクが存在するため、リスクを軽減するための対策を別途行いましょう。

Cloud SQLインスタンスは通常パブリックIPアドレスが割り当てられますが、これを割り当てずにプライベートIPアドレスのみ割り当てるようにすることでセキュリティリスクを低減させることができますので、こちらをやっていきましょう。

こちらについては説明すると長くなりますので割愛いたしますが、以下のような記事など巷に解説してらっしゃる方がいますので参考にしてみてください。

qiita.com

では、そちらを踏まえながらコードの解説に移ります。

it := client.Bucket(bucketName).Objects(ctx, &storage.Query{
  Prefix: targetDir,
})

fmt.Printf("Files matching date: %s\n", date)

var objectName string
for {
  attrs, err := it.Next()
  if err == iterator.Done {
    break
  }
  if err != nil {
    fmt.Println("Failed to list objects: ", err)
    http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
    break
  }

  // 本日日付を含むファイルがあればインポート対象とする
  if strings.Contains(attrs.Name, date) {
    objectName = attrs.Name
  }
}

if objectName == "" {
  fmt.Println("export sql file not found")
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

ここではCloud Storageへのアクセスを行っており、先程のエクスポートファイルを取得していますね。判定はファイル名に記述してある日付をもとにしています。
で、取得してきたファイルをインポートする前にファイル内で USE DATABASE が記述しているところを、本番DBのものからステージングDBのものへと変えておきましょう。

replacedContents := strings.ReplaceAll(string(b), "production", "staging")

writer := client.Bucket(bucketName).Object(objectName).NewWriter(ctx)
if _, err = writer.Write([]byte(replacedContents)); err != nil {
  fmt.Println("Failed write sql file")
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}
if err := writer.Close(); err != nil {
  fmt.Println("Failed close sql file")
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

あとはエクスポートの時と同じ要領でインポートオペレーションを実行しましょう。インポートするファイルの指定はCloud StorageのURIを指定すればOKです。

// Cloud SQLへのインポートリクエスト
importRequest := &sqladmin.InstancesImportRequest{
  ImportContext: &sqladmin.ImportContext{
    FileType: "SQL",
    Uri:      fmt.Sprintf("gs://%s/%s", bucketName, objectName),
    Database: dbName,
  },
}

// SQLファイルをインポート
op, err := sqlAdminService.Instances.Import(project, instanceName, importRequest).Do()
if err != nil {
  fmt.Println("Instances.Import: ", err)
  http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
  return
}

データマスキング

やっとこさマスキングまで来ましたが、今までと違い実際のコードからかなり省略したものを使って説明させていただきます。

実際のコードではこれ以外にも細かい様々なマスク処理をしていますが、今回は簡単にメールアドレスと電話番号を * に置き換えるコードにしてあります。

var opts []cloudsqlconn.DialOption
opts = append(opts, cloudsqlconn.WithPrivateIP())
mysql.RegisterDialContext("cloudsqlconn",
  func(ctx context.Context, addr string) (net.Conn, error) {
    return d.Dial(ctx, fmt.Sprintf("%s:%s:%s", project, region, instance), opts...)
  }
)

uri := fmt.Sprintf("%s:@cloudsqlconn(localhost:3306)/%s?parseTime=true", user, database)

ここではCloud SQLのGoコネクタを使って接続しています。以下のGoogleのサンプルコードを参考に致しました。

cloud.google.com

// マスク処理を行うSQL文を生成し、実行
for table, columns := range maskTarget {
  for _, column := range columns {
    query := fmt.Sprintf("UPDATE %s SET %s = REPEAT('*', CHAR_LENGTH(%s))", table, column, column)
    _, err := db.Exec(query)
    if err != nil {
      fmt.Println("Failed to mask the database: ", err)
      http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
      return
    }
  }
}

肝心のマスク処理ですが、 UPDATE %s SET %s = REPEAT('*', CHAR_LENGTH(%s))", table, column, column を実行し、既存のデータを * に置き換えています。至極簡単ですね。ただし、重複が許容されていないカラムなどは既存データをシードとしたランダム文字列を生成するなど、もう少し考慮する必要があります。

ステージングDBの一部データ退避

ここもかなりプロダクトのドメインと紐付いていて実際のコードを使って説明することが難しいのですが、やっていることとしては

  • 本番レプリカDBとステージングDBに接続する
  • 退避させるデータをステージングDBから取得する
  • 取得したデータを本番レプリカDBに追加する

の3つになります。
テーブルによってはFOREIGN KEY制約があると思いますが、そういった場合には本番レプリカDBに追加したデータのIDを保持しておいて、ステージングDBから取得したデータに上書いてから本番レプリカDBに追加するなどの処理が必要になります。

本番レプリカDBからエクスポート/

そろそろ終わりが見えてきました。データマスキングや退避データを入れた本番レプリカDBをエクスポートして、ステージングDBにインポートします。こちらも今までやったこととほぼほぼ同じなのでコードは省略いたしますが、違う点としては

  • エクスポート後に本番レプリカDBをまっさらにする
  • インポート前にバックアップを取っておく

の2つですね。前者は説明は要らないと思いますが、後者はコードからやるのもいいですがステージング環境のインスタンス設定で定期バックアップさせておくのが楽です。もし厳密に直前のバックアップを取るといった場合には sqladminBackupRun などがありますのでこれを使って出来そうですね。(未検証

pkg.go.dev

Cloud WorkflowsでCloud Functionsを実行する

最後にCloud WorkflowsでCloud Functionsを定期実行するように設定しましょう。Cloud FunctionsやCloud Runにリクエストを飛ばす場合にはOIDCを利用することになります。

cloud.google.com

一応、エラーがあった際などどこまで処理が成功していたのか?などログから分かりやすいように、 call: sys.log を使ってログを出力しておくのをオススメします。そういったことを含めて、Workflowsのコードとしては一部を載せますが以下のような感じになります。

main:
    params: [input]
    steps:
        - sync-production-to-staging:
            try:
                steps:

                # 本番DBからSQLファイルをCloud Storageへエクスポート
                - exportProductionDBLog:
                    call: sys.log
                    args:
                        data: "=== exportProductionDB START ==="
                        severity: "INFO"
                - exportProductionDB:
                    call: http.get
                    args:
                        url: 'https://hoge.cloudfunctions.net/export-production-db'
                        auth:
                            type: OIDC
                            audience: 'https://hoge.cloudfunctions.net/export-production-db'
                    next: importProductionReplicaDBLog

                # エクスポートされた本番DBのSQLファイルを本番レプリカDBへインポート
                - importProductionReplicaDBLog:
                    call: sys.log
                    args:
                        data: "=== importProductionReplicaDB START ==="
                        severity: "INFO"
                    next: importProductionReplicaDB
                - importProductionReplicaDB:
                    call: http.get
                    args:
                        url: 'https://hoge.cloudfunctions.net/import-production-replica-db'
                        auth:
                            type: OIDC
                            audience: 'https://hoge.cloudfunctions.net/import-production-replica-db'

            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${not("HttpError" in e.tags)}
                          raise: "Connection problem"
                        - condition: ${e.code == 404}
                          raise: "URL wasn't found"
                        - condition: ${e.code == 403}
                          raise: "Authentication error"
                        - condition: ${e.code == 500}
                          raise: "Internal Server Error"
                    - unhandled_exception:
                        raise: ${e}

try-except でどのようなエラーで落ちたか?も出力するようにしておくと失敗時の調査に良いですね。あとは今まで作成したCloud Functionsも記載していけば完了です。

おわりに

本番DBとステージングDBの同期はよくある実装ですが、一つの例として今回取り上げさせていただきました。読んでくださった方の役に立つ情報があれば幸いです🙇‍♂

次は、@fanglangによる記事になります!