TerraformによるAirbyteを利用したデータ基盤へのデータ連携とCI/CD - Sansan Tech Blog

Sansan Tech Blog

Sansanのものづくりを支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信

TerraformによるAirbyteを利用したデータ基盤へのデータ連携とCI/CD

こんにちは!「Sansan Summer Internship 2023」でインターンをしていた野首侑作です(X, Facebook)。R&D Architectグループに1ヶ月強コミットしていました。今回のインターンで使った技術はこれまで全く触ったことがありませんしたが、チームメンバーのサポートもあり本番環境でも動かせるコードを書くことができました。 今回学んだことの整理と、Sansanのインターンで学べることの多さを共有したいという意味を込めて、社内ブログでアウトプットしようと思った次第です。

目次

はじめに

Airbyteは、多数のデータソース同士を連携させることができる OSS のデータコネクタです。ELT 処理によりデータソースから AWS や BigQuery などのディスティネーションへデータの抽出を行うことができます。データソースと連携先の種類が多い、dbtを経由できる、短時間で連携できるなどといった特長があり、現在勢いのあるELTツールのひとつです。

ローカルでAirbyteの立ち上げ

1. DockerによるAirbyte serverの起動

参考:https://docs.airbyte.com/quickstart/deploy-airbyte

Airbyte は Airbyte Cloud も提供していますが、トライアル期間が 14 日と短いためまずはローカルで試すことをお勧めします。

まず、公式ドキュメントに従ってリポジトリをクローンし、シェルスクリプトから Docker コンテナを立ち上げて Airbyte server を起動します。

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh

起動が確認できたら、ブラウザから localhost:8000 へアクセスしドキュメントに従って username と password を入力してください。

username:airbyte
password:password

Airbyte UI にアクセスできれば準備は完了です。 メールアドレスなどを適当に入力してください。

2. Sourceの登録

参考 : https://docs.airbyte.com/quickstart/add-a-source

次に、データソースを登録します。 今回は手順に従って Poke API を叩いてみます。 UI がバージョンによって変更されるため、公式ドキュメント通りに設定するのが良いでしょう。

3. Destinationの登録

参考 : https://docs.airbyte.com/quickstart/add-a-destination

次に、データの連携先を登録します。 ドキュメントではローカルの json ファイルを指定していますが、せっかくなので Google Cloud の BigQuery に登録してみましょう。

まずこちらを参考に、BigQuery へ書き込める権限を持ったサービスアカウントを発行してください。 アカウントキーの json ファイルをダウンロードできたら、大切に保管しましょう。

BigQuery に移動し、airbyte_test というデータセットを作成してください。

では、Airbyte で BigQuery を登録します。

  • Project ID : hogehogeの部分
  • Data Location : データセットと同じ(ここではasia-northeast1
  • Default Dataset ID : airbyte_test
  • Service Account Key JSON : アカウントキーの json ファイルの中身すべて

4. Connectionの登録

参考 : https://docs.airbyte.com/quickstart/set-up-a-connection

最後に、Poke API と BigQuery を連結させます。 ここは UI に従ってポチポチ押していくだけで問題ないです。

今回は同期のスケジューリングをする必要はないので、Replication frequencyのみManualに変更してください。

5. 接続確認

では、Sync ボタンを押して BigQuery にデータが追加されているか確認してみましょう。 同期完了後の画面はこちらです。 これが同期後の BigQuery の画面です。

airbyte_tableの中にPokemonのテーブルが作成されていることが確認できましたね。

ちなみにairbyte_table__raw_stream_pokemonというテーブルも作成されていますが、展開前のデータをそのまま溜めるために作られるようです。以前のバージョンでは設定で切り替えることができたはずですが、現在のバージョンでは変更できるボタンを見つけられませんでした。

TerraformによるIaC

IaCとは

Infrastructure as Code(以下、IaC)とは、インフラの構築や設置をコードで管理することです。

前項の手順では、Airbyte UI から BigQuery や Connection の設定を行っています。 しかし、何らかの影響で設定が壊れてしまったり、同じ構成で別の環境に構築する場合の再現性が低かったりと、運用に支障をきたす恐れがあります。

そこで、本記事では Terraform を用いて IaC を行います。

Terraformとは

参考 : https://developer.hashicorp.com/terraform/docs

Terraformとはインフラを安全かつ効率的に構築、変更、バージョン管理できる IaC ツールです。

ブロック単位でリソースの設定を記述し、簡単なコマンドでインフラの構築を行うことができます。

今回は、Terraformにより前項のAirbyteでのコネクタの設定をコード化します。 なお、ローカルのAirbyteを利用するとコネクション作成時にタイムアウトエラーが発生してしまうため、Airbyte Cloudでの検証となります。 現在筆者がIssueを投げているので、解消されるまでしばしお待ちください。

(参考 : Timeout error has occurred when creating connection by terraform via local AirByte API Server)

まずは次のコマンドで Terraform をインストールしてください。

$ brew install tfenv
$ tfenv install 1.4.6
$ tfenv use 1.4.6

では Terraform を書いていきます。 適当なディレクトリ(ここではterraform)にmain.tfvariables.tfを作成し、BigQueryのサービスアカウントキーをcredentials.jsonとして置いてください。

terraform
├── credentials.json
├── main.tf
└── variables.tf

補足

credentials.jsonの流出を防ぐため、GitHubへプッシュする場合は確実に.gitignore で除外してください。

main.tfでリソースを記述し、variables.tfで変数を設定します。

まずはファイルの先頭で宣言を行います。このブロックは Terraform ドキュメントの Airbyte のページから持ってきてください。 required_versionを追記します。インストールしたバージョンを指定してください。

terraform {
  required_version = "1.4.6"

  required_providers {
    airbyte = {
      source  = "airbytehq/airbyte"
      version = "0.3.3"
    }
  }
}

provider "airbyte" {
  bearer_auth = var.api_key
}

なお、Airbyte Cloudを利用する場合は Bearer認証を行うよう言及されていますので、provider ブロックでは bearer_auth を利用します。 (参考 : https://reference.airbyte.com/reference/using-the-terraform-provider)

var.api_key とは、variables.tf で宣言した api_key という変数を参照する、ということです。 variables.tf では、次のように書きます。

variable "api_key" {
  type    = string
  default = YOUR_API_KEY
}

今回はデモなので、defaultにAPIキーをベタ打ちしましょう。 Airbyte Developers Portalから API Keys を発行し、default に与えてください。 こうすることで、 provider ブロックのvar.api_keyvariables.tfYOUR_API_KEY を参照します。

次に、ワークスペースを作成します。 Terraform ドキュメントの検索窓から workspace を入力し、airbyte_workspace をクリックすると airbyte_workspaceリソースのドキュメントページに遷移します。 resourceの後ろに続くのはリソースタイプローカルでのリソース名です。リソースタイプは変更できませんが、リソース名 は自由に変更できるため任意の名前をつけてください。

Schemaには、RequiredOptional という2種類のパラメータがあります。Required は必ず記述しなければなりません。今回はnameRequired なので、適当な名前を与えましょう。

resource "airbyte_workspace" "workspace" {
  name = "airbyte_workspace"
}

次に、 Source を作成します。 Terraform ドキュメントから pokeapi を入力し、airbyte_source_pokeapi をクリックしてください。 お気づきかもしれませんが、Terraform は基本的にドキュメントから Usage をコピペし、必要に応じてパラメータを変更するという方針で記述していきます。

resource "airbyte_source_pokeapi" "my_source_pokeapi" {
  configuration = {
    pokemon_name = "snorlax"
    source_type  = "pokeapi"
  }
  name         = "PokeAPI"
  workspace_id = airbyte_workspace.workspace.workspace_id
}

Required となっているスキーマを記述しましょう。 workspace_id は terraform 実行時に初めて決まるため、ベタ打ちすることができません。 そこで、リソースタイプ.リソース名.workspace_id のように記述し、変数として渡してあげましょう。可読性も上がるうえ、1つのファイルで一貫してリソースを作成できるため、管理がしやすくなります。

次は、 Destination を作成します。 Terraform ドキュメントから bigquery を入力し、airbyte_destination_bigquery をクリックしてください。 今回は最小限の記述で済ませます。

resource "airbyte_destination_bigquery" "my_destination_bigquery" {
  configuration = {
    credentials_json = file("./credentials.json")
    dataset_id       = "airbyte_test"
    dataset_location = "asia-northeast1"
    destination_type = "bigquery"
    loading_method = {
      destination_bigquery_loading_method_standard_inserts = {
        method = "Standard"
      }
    }
    project_id = YOUR_PROJECT_ID
  }
  name         = "BigQuery"
  workspace_id = airbyte_workspace.workspace.workspace_id
}

credentials_json には file() メソッドを使ってディレクトリ内の json ファイルからキーを与えることができます。 また、ドキュメントでは、GCS経由のデータ転送(GCS Staging)でインサートを行なっていますが、必要な変数が増えてしまうので今回は Standard としておきます。

最後に Connection を作成します。 Terraform ドキュメントから connection を入力し、airbyte_connection をクリックしてください。 Required だけ記述するとこの程度で済みます。簡単ですね。

resource "airbyte_connection" "my_connection" {
  destination_id = airbyte_destination_bigquery.my_destination_bigquery.destination_id
  name           = "PokeAPI → BigQuery"
  schedule = {
    schedule_type = "manual"
  }
  source_id = airbyte_source_pokeapi.my_source_pokeapi.source_id
}

繰り返しになりますが、Terraform を書いていく手順は

  1. ドキュメントで書きたいリソースを検索
  2. Required をもれなく記述
  3. 仕様に応じて Optional を追記

となります。プロバイダなどのバージョンを揃えることを忘れないでくださいね。

ファイル全体はこちらです。

terraform {
  required_version = "1.4.6"

  required_providers {
    airbyte = {
      source  = "airbytehq/airbyte"
      version = "0.3.3"
    }
  }
}

provider "airbyte" {
  bearer_auth = var.api_key
}

resource "airbyte_workspace" "workspace" {
  name = "airbyte_workspace"
}

resource "airbyte_source_pokeapi" "my_source_pokeapi" {
  configuration = {
    pokemon_name = "snorlax"
    source_type  = "pokeapi"
  }
  name         = "PokeAPI"
  workspace_id = airbyte_workspace.workspace.workspace_id
}

resource "airbyte_destination_bigquery" "my_destination_bigquery" {
  configuration = {
    credentials_json = file("./credentials.json")
    dataset_id       = "airbyte_test"
    dataset_location = "asia-northeast1"
    destination_type = "bigquery"
    loading_method = {
      destination_bigquery_loading_method_standard_inserts = {
        method = "Standard"
      }
    }
    project_id = YOUR_PROJECT_ID
  }
  name         = "BiqQurey"
  workspace_id = airbyte_workspace.workspace.workspace_id
}

resource "airbyte_connection" "my_connection" {
  destination_id = airbyte_destination_bigquery.my_destination_bigquery.destination_id
  name           = "PokeAPI → BigQuery"
  schedule = {
    schedule_type = "manual"
  }
  source_id = airbyte_source_pokeapi.my_source_pokeapi.source_id
}
variable "api_key" {
  type    = string
  default = YOUR_API_KEY
}

では実行してみましょう。 main.tf があるディレクトリに移動し、init, plan, apply の順で実行してください。

$ terraform init

Initializing the backend...

Initializing provider plugins...
- Finding airbytehq/airbyte versions matching "0.3.3"...
- Installing airbytehq/airbyte v0.3.3...
- Installed airbytehq/airbyte v0.3.3 (signed by a HashiCorp partner, key ID CE79FE6C49B34526)

Partner and community providers are signed by their developers.
If you'd like to know more about provider signing, you can read about it here:
https://www.terraform.io/docs/cli/plugins/signing.html

Terraform has created a lock file .terraform.lock.hcl to record the provider
selections it made above. Include this file in your version control repository
so that Terraform can guarantee to make the same selections by default when
you run "terraform init" in the future.

Terraform has been successfully initialized!

You may now begin working with Terraform. Try running "terraform plan" to see
any changes that are required for your infrastructure. All Terraform commands
should now work.

If you ever set or change modules or backend configuration for Terraform,
rerun this command to reinitialize your working directory. If you forget, other
commands will detect it and remind you to do so if necessary.


$ terraform plan

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # airbyte_connection.my_connection will be created
  + resource "airbyte_connection" "my_connection" {
      + configurations                       = (known after apply)
      + connection_id                        = (known after apply)

...

  # airbyte_workspace.workspace will be created
  + resource "airbyte_workspace" "workspace" {
      + data_residency = (known after apply)
      + name           = "airbyte_workspace"
      + workspace_id   = (known after apply)
    }

Plan: 4 to add, 0 to change, 0 to destroy.

Note: You didn't use the -out option to save this plan, so Terraform can't guarantee to take exactly these actions if you run "terraform apply" now.


$ terraform apply

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # airbyte_connection.my_connection will be created
  + resource "airbyte_connection" "my_connection" {
      + configurations                       = (known after apply)
      + connection_id                        = (known after apply)

...

  # airbyte_workspace.workspace will be created
  + resource "airbyte_workspace" "workspace" {
      + data_residency = (known after apply)
      + name           = "airbyte_workspace"
      + workspace_id   = (known after apply)
    }

Plan: 4 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

airbyte_workspace.workspace: Creating...
airbyte_workspace.workspace: Creation complete after 9s [name=airbyte_workspace]
airbyte_source_pokeapi.my_source_pokeapi: Creating...
airbyte_destination_bigquery.my_destination_bigquery: Creating...
airbyte_source_pokeapi.my_source_pokeapi: Creation complete after 0s [name=PokeAPI]
airbyte_destination_bigquery.my_destination_bigquery: Creation complete after 1s [name=BiqQurey]
airbyte_connection.my_connection: Creating...
airbyte_connection.my_connection: Creation complete after 6s [name=PokeAPI → BigQuery]

Apply complete! Resources: 4 added, 0 changed, 0 destroyed.

無事 apply が通りましたね! state list でリソースの一覧を確認してみましょう。

$ terraform state list
airbyte_connection.my_connection
airbyte_destination_bigquery.my_destination_bigquery
airbyte_source_pokeapi.my_source_pokeapi
airbyte_workspace.workspace

きちんと4つのリソースが登録されていることが確認できました。

最後に、作成したワークスペースへ移動して目視で確認しましょう。state show リソース でリソース内の変数を表示します。

$ terraform state show airbyte_connection.my_connection
# airbyte_connection.my_connection:
resource "airbyte_connection" "my_connection" {
    configurations                       = {
        streams = [
            # (1 unchanged element hidden)
        ]
    }
    connection_id                        = "44b0b579-af81-4eef-bfbb-0970d86b30d8"
    data_residency                       = "auto"
    destination_id                       = "cb259eca-712a-4521-959d-50d2e0b6b084"
    name                                 = "PokeAPI → BigQuery"
    namespace_definition                 = "destination"
    non_breaking_schema_updates_behavior = "ignore"
    schedule                             = {
        schedule_type = "manual"
    }
    source_id                            = "e761998e-6c48-4c9d-988e-4cb2066bb684"
    status                               = "inactive"
    workspace_id                         = "99bc74d2-5bf1-49d4-aba9-c37c4a148c95"
}

workspace_id99bc74d2-5bf1-49d4-aba9-c37c4a148c95 と分かりました。 ブラウザの検索バーにcloud.airbyte.com/workspace/workspace_id と入力することでアクセスできます。 確かに PokeAPI から BigQuery へコネクションを作ることができました。

補足

Terraform を運用する際は、次のような工夫をすると良いでしょう。

  • モジュール化
    • 扱うリソースが増えてきた場合、インフラ単位やsource, destination 単位でモジュール化すると管理しやすくなります
  • variables.tf での変数管理
    • 今回はほとんどの変数をベタ書きしましたが、変数は極力 variables.tf に記述した方が良いでしょう
    • description を書いて変数の役割を明確にしましょう

CI/CD

GitHub ActionsでのTerraform

Terraform によって IaC したので、GHA(GitHub Actions) で CI/CD を行えるようにしましょう。

リポジトリを作成し、こちらのディレクトリ構成にしてください。

.
└── .github
│   └── workflows
│       ├── cd.yml
│       └── ci.yml
└── terraform
    ├── .terraform
    ├── main.tf
    └── variables.tf

GHA でアクセスキーが必要になるため GitHub Actions の secretsを登録しましょう。

今回はAirbyte Cloud の APIキーを AIRBYTE_CLOUD_API_KEY、サービスアカウントの credential を CREDENTIALS_JSON とします。 次に、secrets から受け取ったアクセスキーを変数に持たせるための準備を行います。

variable "airbyte_cloud_api_key" {
  type        = string
  description = "Airbyte CloudのAPIキー"
}

variable "credentials_json" {
  type        = string
  description = "Google Cloudのアクセスキー(json形式)"
}

この2つの変数を利用するよう、provider と BigQuery ブロックを変更しましょう。

provider "airbyte" {
  bearer_auth = var.airbyte_cloud_api_key
}
resource "airbyte_destination_bigquery" "my_destination_bigquery" {
  configuration = {
    credentials_json = var.credentials_json
    dataset_id       = "airbyte_test"
    dataset_location = "asia-northeast1"
    destination_type = "bigquery"
    loading_method = {
      destination_bigquery_loading_method_standard_inserts = {
        method = "Standard"
      }
    }
    project_id = "compact-system-398013"
  }
  name         = "BiqQurey"
  workspace_id = airbyte_workspace.workspace.workspace_id
}

これで secretsを受け取る準備はできました。

次に、CI/CD を行う yml ファイルを作成しましょう。挙動の確認を行うだけなので、push時に反応するようにします。

on:
  push:

jobs:
  ci:
    runs-on: ubuntu-latest

    defaults:
      run:
        shell: bash
        working-directory: "terraform/"

    steps:
      - name: Check out source repository
        uses: actions/checkout@v3

      - name: Setup terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.4.6

      - name: Init terraform
        id: init
        run: terraform init

      - name: Plan terraform
        id: plan
        run: |
          terraform plan -no-color -var="airbyte_cloud_api_key=$AIRBYTE_CLOUD_API_KEY" -var="credentials_json=$CREDENTIALS_JSON"
        env:
          AIRBYTE_CLOUD_API_KEY: ${{secrets.AIRBYTE_CLOUD_API_KEY}}
          CREDENTIALS_JSON: ${{secrets.CREDENTIALS_JSON}}
name: terraform CD Airbyte

on:
  push:

jobs:
  cd:
    runs-on: ubuntu-latest

    defaults:
      run:
        shell: bash
        working-directory: "terraform"

    steps:
      - name: Check out source repository
        uses: actions/checkout@v3

      - name: Setup terraform
        uses: hashicorp/setup-terraform@v2
        with:
          terraform_version: 1.4.6

      - name: Init terraform
        id: init
        run: terraform init

      - name: Apply Terraform
        id: apply
        run: |
          terraform apply -input=false --auto-approve -var="airbyte_cloud_api_key=$AIRBYTE_CLOUD_API_KEY" -var="credentials_json=$CREDENTIALS_JSON"
        env:
          AIRBYTE_CLOUD_API_KEY: ${{secrets.AIRBYTE_CLOUD_API_KEY}}
          CREDENTIALS_JSON: ${{secrets.CREDENTIALS_JSON}}

どちらもチェックアウト→セットアップ→ init を行い、CIではPlanを、CDではApplyを行っています。 本来、PlanとApplyではdefaultが設定されていない変数に対して標準入力で値を代入する必要があるのですが、引数varを与えることで代わりに代入を行ってくれます。 この時に=$hogeでsecretsに登録している変数を指定することで、アクセスキーをリポジトリに含めることなくCI/CDを動かすことができます。

ジョブの結果はこの通りです。 無事通りましたね。

まとめ

今回はAirbyteを知らない状態からコネクションの作成、TerraformでのIaCとCI/CDを行いました。掲載したコードは最低限動かすためだけのものなので、実用する場合はご自身のプロジェクトや要件に従って肉付けしてください。 また、Airbyteは現在も活発に更新されているOSSであるため、将来的には本記事のコードをそのまま適用することができなくなる可能性があります。もしエラーが発生した場合は遠慮なく野首までご相談ください。

おわりに

Sansanでは意思決定、定量評価、機械学習など多くの業務がデータドリブンとなっています。その根幹となるデータ基盤のアップデートに関わることができたので、とても充実したインターンとなりました。 この規模のデータ基盤に触れながら1ヶ月コミットできるインターンは少ないと思います。インパクトの大きい開発を行いたい人にとって素晴らしい経験が積めると思います。

© Sansan, Inc.