Go言語のためのRedshift Data API sql driver 『redshift-data-sql-driver』 - KAYAC Engineers' Blog

Go言語のためのRedshift Data API sql driver 『redshift-data-sql-driver』

この記事はAWS Analytics Advent Calendar 2022の16日目です。
こんにちは、SREチーム所属の@mashiikeです。 13日目にも記事を書いて、なんと欲張って2回目も書いてます。 13日目の記事は『redshift-data-set-annotator』に関してでした。

このような形で、私は用途がニッチな物を作ることが多いのですが、本日はニッチシリーズの中からGo言語のためのRedshift Data API sql driverについて話します。

Redshift Data API

Redshiftへ接続する際は、通常VPCへのアクセスが必要になります。
(Public AccessibilityをONにすればその限りではありませんが、セキュリティを考えると中々ONにすることはためらわれます。)
そのため、psqlクライアント等を使って手元から接続するのであれば踏み台サーバーを経由することが一般的です。例えば、AWS System ManagerのSession Managerにあるリモートホストのポートフォワード機能を利用することもあるでしょう。

dev.classmethod.jp

しかし、自身でアプリケーションを書いている場合は別の選択肢があります。
それが、Redshift Data APIです。

docs.aws.amazon.com

Redshift Data APIを使用してアクセスする限りはVPCへのアクセスが不要なのです。
Redshift Data APIにはいくつかの操作が存在しますが、基本的には4つを使えばSQLを実行してその結果を取得できます。

これらのAPIについて、いくつかの言語ではSDKが提供されていますので、実際にはSDK経由で使用することになると思います。
Go言語での実際の使用例に関しては、きれいなコードではありませんが github.com/mashiike/queryrunnerの例を提示いたします。

queryrunner/query_runner.go at v0.2.3 · mashiike/queryrunner · GitHub

コード

やってることを説明します。

  1. ExecuteStatementによって、Query実行を行います。そして、StatementのIDを取得します。
  2. 定期的にDescribeStatementを実行し、Statusが FINISHED になるのを待ちます。
  3. StatusがFINISHED になったらHasResultをチェックし、存在するならGetStatementResultによって結果を取得します。

途中でSIGHUPなどが来たり、待ち時間が長かったりしたらCancelStatementで適宜Statementをキャンセルします。

コードの記述量が多くこの部分だけライブラリ化しようかと思っていた頃、とある発表が私の心に響きました。

AWS Dev Day Day2ブレイクアウトセッション D-3: 『Amazon S3 Selectで実現するサーバーレス高負荷対応サイト』

speakerdeck.com

株式会社Fusic 清家史郎 氏のこちらのセッションを見たとき、私は気づきを得ました。

こちらのセッションでは、PHPのLaravelの基底Modelを継承し、独自作成のS3 Select用Modelを作成 していました。

S3 Select は Amazon S3 APIのSelectObjectContentを利用することで、独自のアプリケーションでその機能を使うことができます。
こちらのセッションによって私が得た気づきというのは、『SDKをラップして、言語やフレームワーク標準のインタフェースに変換することで、とても使いやすくなる』ということです。

PHPのLaravelでできる。S3 Selectでできる。
ということは・・・
Go言語でもできる。Redshift Data APIでもできる。

そう思い開発したのが redshift-data-sql-driver なのです。

Go言語の database/sql package

話をGo言語に戻します。Go言語ではdatabaseに対してsqlを実行する際のインタフェースは標準パッケージによって規格化されています。
例えば、Redshiftに対して接続する際は以下のように、Redshiftがpsql wire protocolに対応しているということを利用してPostgreSQLとして接続しに行くことが多いと思います。

package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "os/signal"

    _ "github.com/lib/pq"
)

func main() {
    db, err := sql.Open("postgres", "postgres://user:pass@<cluster>.<id>.ap-northeast-1.redshift.amazonaws.com:5439/dev")
    if err != nil {
        log.Fatalln("can not open", err)
    }
    defer db.Close()
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    query := `SELECT date_trunc('day', "time") as date_day, count(distinct user_id) as uu FROM access_logs WHERE "time" >= getdate() - interval '7 day'`
    row, err := db.QueryContext(ctx, query)
    if err != nil {
        log.Fatalln("query failed", err)
    }
    defer row.Close()

    for row.Next() {
        var dateDay string
        var uu int64
        if err := row.Scan(&dateDay, &uu); err != nil {
            log.Fatalln("scan failed", err)
        }
        log.Println(dateDay, uu)
    }
}

Go言語では、最初のimportの部分と sql.Open("dirver name", "dsn") の部分を変えるだけで、どのようなdatabaseでも同じような感覚でデータの取得が可能となっています。 ですので、Redshift Data APIでも同様にデータが使えるようにすれば、とても使いやすくなるわけです。
そのためには、database/sql/driver というパッケージを使ったRedshift Data API専用のdriver packageを作成する必要がありました。
そのRedshift Data API専用のdeiverが redshift-data-sql-driver なのです。

github.com

このdriverの使用方法は簡単で、以下のようになります。

package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "os/signal"

     _ "github.com/mashiike/redshift-data-sql-driver"
)

func main() {
    db, err := sql.Open("redshift-data", "user@cluster(<cluseter>)/dev")
    if err != nil {
        log.Fatalln("can not open", err)
    }
    defer db.Close()
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    query := `SELECT date_trunc('day', "time") as date_day, count(distinct user_id) as uu FROM access_logs WHERE "time" >= getdate() - interval '7 day'`
    row, err := db.QueryContext(ctx, query)
    if err != nil {
        log.Fatalln("query failed", err)
    }
    defer row.Close()

    for row.Next() {
        var dateDay string
        var uu int64
        if err := row.Scan(&dateDay, &uu); err != nil {
            log.Fatalln("scan failed", err)
        }
        log.Println(dateDay, uu)
    }
}

Go言語をお使いの方には非常に馴染みの深い使い方だと思います。
前述の通り、Redshift Data APIはVPCへのアクセスが不要ですので、Lambda関数などでRedshiftにアクセスするようなちょっとしたコードをすごく書きやすくなりました。
実は、先日紹介した redshift-data-set-annotator でも使用しています。
github.com/jmoiron/sqlxのような便利なpackageをRedshift Data APIでも活用できるようになるので、とても気に入っています。

さて、このdriverによって、すべての悩みが解決したかというと実はそうでもないところがあります。
Redshift Data APIには BatchExecuteStatement という1回のAPIコールで複数のSQLを実行するものが用意されており、こちらのAPIを利用した形のトランザクションには対応しています。
しかし、Go言語のdatabase/sqlのインタフェースでトランザクションを取り扱う場合は、トランザクションのスコープの中で逐次的にSQLが実行できるような形になっています。

package main

import (
    "context"
    "database/sql"
    "log"
    "os"
    "os/signal"

    _ "github.com/lib/pq"
)

func main() {
    db, err := sql.Open("postgres", "postgres://user:pass@<cluster>.<id>.ap-northeast-1.redshift.amazonaws.com:5439/dev")
    if err != nil {
        log.Fatalln("can not open", err)
    }
    defer db.Close()
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        log.Fatalln("tx failed", err)
    }
    row := tx.QueryRowContext(ctx, `SELECT rondom() < 0.5 as result`)

    var result bool
    if err := row.Scan(&result); err != nil {
        tx.Rollback()
        log.Fatalln("query failed", err)
    }
    if !result {
        tx.Rollback()
        log.Fatalln("result is false")
    }
    if _, err := tx.ExecContext(ctx, `INSERT INTO history SELECT getdate()`); err != nil {
        tx.Rollback()
        log.Fatalln("exec failed", err)
    }
    if err := tx.Commit(); err != nil {
        log.Fatalln("commit failed", err)
    }
}

上記のようなコードをRedshift Data APIのBatchExecuteStatementで実現するのが現状では困難となっています。(isolationを厳密に考えなければ似たようなことは可能です。)
ですので、 redshift-data-sql-driver ではそもそもトランザクションの機能をサポートしませんでした。
Redshift Data APIでBatchExecuteStatementを使うようなトランザクションの処理が必要であれば、AWS SDK for Goを使うことになります。

※ 余談ですが、自分の場合は書き込み系の処理はすべてdbtに寄せているので、Go言語のアプリケーションから参照するのは読み込みのみのことが多いです。そのため、トランザクションの機能をサポートしなくても困ってないというのもあります。

おわりに

AWS Dev Dayのセッションを見て、『言語やフレームワークの標準的なインタフェースでRedshift Data APIを使えるとめっちゃ便利やん!』という気づきを得て、redshift-data-sql-driver というGo言語のsql driverを開発したという話でした。
Go言語を使用していて、Redshift Data APIを使いたいという、またまたニッチな用途であるということは承知しておりますが、便利だなと思った方は是非お使いください。

カヤックでは、便利なライブラリに興味があるエンジニアも募集しています。

hubspot.kayac.com