接続が途切れないストリーミングサーバの無停止アップデートを実装してみました - Mirrativ Tech Blog

Mirrativ Tech Blog

株式会社ミラティブの開発者(バックエンド,iOS,Android,Unity,機械学習,インフラ, etc.)によるブログです

接続が途切れないストリーミングサーバの無停止アップデートを実装してみました

こんにちは ハタです。
最近 SO_REUSEADDR / SO_REUSEPORT を使ったストリーミング配信サーバの無停止アップデート(Hot Deploy)を実装してみたので紹介したいなと思います

ことの経緯

ミラティブでは以前から何度か紹介したとおり自前の配信基盤設備を持っています。
配信基盤のミドルウェアも内製であり、機能追加やライブラリの更新などがあるたびにミドルウェアのバージョンアップ作業(メンテナンス)も自社で実施しています

ストリーミング配信サーバといっても、何か特別な事はなく一般的なHTTPのAPIサーバと同じくGoのプロセスが動いていてリクエストを捌いているという仕組みは同じです。
大きな違いはリクエストのライフサイクルがとても長い(ステートフルな接続)こと、一度配信を始めたら途切れない接続(持続接続)であることに違いがあります

f:id:octu0:20220131204000p:plain
HTTPとストリーミングの違い(イメージ)

特にリアルタイムなライブ配信の場合は、一度配信を始めたストリーミングはデータの途切れがないように連続してデータが流れてきます
一般的なHTTPサーバであればリクエストとリクエストの合間を使ってサーバプロセスの切り替え(バージョンアップ)が行えるのですが、ストリーミング配信サーバの場合はそうはいきません

f:id:octu0:20220131204030p:plain
HTTPとストリーミングライフサイクルの違い

ミラティブのストリーミング配信サーバでは、リクエストの切れ目を作り出すためにサービスアウト処理を行なうようにして新規のリクエストを停止し、リクエストが履けきった後にサーバプロセスの切り替えを行なっていました。
一般的にドレイニングと呼ばれる作業なのですが、ライブ配信の場合ドレイニングが完了するまでの時間が長く、一度の大量のサーバのプロセスを切り替えるには大量のリソースを必要とするため、ゆっくり時間をかけながらサーバのプロセスを切り替える運用を行なっていました

f:id:octu0:20220131204051p:plain
ストリーミングサーバのドレイニングには時間がかかる

新規機能追加や機能修正を行った場合でも変更にかかる時間がとても長いため、修正までの影響時間が伸びてしまったり(十分に安定するまで全展開するのを躊躇うことも)していたため
HTTPサーバのようにサーバのプロセスをHOT SWAPできるようにしたいなと思い、仕組みを開発することにしました

HTTPサーバによる Hot Deploy の仕組み

まずは HTTP サーバの Hot Deploy の仕組みをおさらいしてみます
一般的には下記 2パターンの仕組みで切り替えているものと思います

  1. LoadBalancerによるリクエストの切り替え
  2. Supervisorプロセスによるリクエストの切り替え

どちらもLoadBalancer方式ではあるのですが、前者はAWSいうところのALBGCPでいうところのGCLBなどインスタンスの外側からコントロールするものと、
後者のServer::StarterUnicornなどOS内の仕組みを利用したプロセス切り替えでわけてみました

1の LoadBalancerによるリクエストの切り替え では、リクエストの宛先となるサーバプール(TargetGroup)を切り替えることで、Hot Deploy(A/BだったりRollingだったりしながら)を行うことでプロセスの切り替えを行うことができます

f:id:octu0:20220131204220p:plain
LoadBalancer方式での切り替えのイメージ

2の Supervisorプロセスによるリクエストの切り替え では、OS内に振り分けたれたリクエストをSupervisorプロセスが新旧のリクエスト処理するプロセスに振り分けることで切り替えを行なうことができます

f:id:octu0:20220131204239p:plain
Supervisor方式での切り替えのイメージ

ミラティブでもHTTPサーバの古く(今でも稼働中のものもあります)は Server::Starter 利用した Hot Deploy の仕組みを利用しており
新しいコードのDeployと共にサーバプロセスの切り替えを行なってリリースをしています
今回はストリーミング配信サーバのため割愛しますが、ミラティブの自動デプロイの仕組みなどは別途またどこかで書きたいと思います

ストリーミング配信サーバへの応用

ミラティブのストリーミング配信サーバでも似たような仕組みを実装すれば良さそうです
LoadBalancer方式でも実装できそうですが、既存の物理LoadBalancerにはいろいろ制約があり構成を変えるとなると大掛かりになりそうです、いろいろ検討していく中で SO_REUSEADDR/SO_REUSEPORT によるポート再利用 が求めているものに近そうでした

とてもわかりやすく図解でServer::Starterの仕組みが書いてあるページ があったのでそちらを参照してもらいたいのですが、ソケットの共有ができようになると比較的簡単に実装できそうです

SO_REUSEADDR/SO_REUSEPORT を使った実装例

まずは挙動を確認するため、簡単なping/pong実装を書いてみました

サーバ側の実装はこのような感じにしました

package main

import (
    "encoding/binary"
    "errors"
    "flag"
    "io"
    "log"
    "net"
    "os"
    "syscall"

    "golang.org/x/sys/unix"
)

func Listen(addr string) (net.Listener, error) {
    tcp, err := net.ResolveTCPAddr("tcp", addr)
    if err != nil {
        return nil, err
    }

    fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
    if err != nil {
        return nil, err
    }
    defer syscall.Close(fd)

    sock := &syscall.SockaddrInet4{Port: tcp.Port}

    // SO_REUSEADDR/SO_REUSEPORT を有効にして起動
    if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, unix.SO_REUSEADDR, 1); err != nil {
        return nil, err
    }
    if err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1); err != nil {
        return nil, err
    }

    if err := syscall.Bind(fd, sock); err != nil {
        return nil, err
    }

    if err := syscall.Listen(fd, syscall.SOMAXCONN); err != nil {
        return nil, err
    }

    f := os.NewFile(uintptr(fd), "/tmp/test")
    defer f.Close()

    listener, err := net.FileListener(f)
    if err != nil {
        return nil, err
    }
    return listener, nil
}

// server
func main() {
    var version uint
    flag.UintVar(&version, "ver", 1, "")
    flag.Parse()

    addr := "[0.0.0.0]:10000"
    l, err := Listen(addr)
    if err != nil {
        panic(err)
    }

    log.Printf("server listen %s", addr)
    for {
        conn, err := l.Accept()
        if err != nil {
            panic(err)
        }

        log.Printf("%s connected", conn.RemoteAddr().String())
        go pong(conn, uint16(version))
    }
}

func pong(conn net.Conn, version uint16) {
    defer func() {
        log.Printf("%s closed", conn.RemoteAddr().String())
        conn.Close()
    }()

    buf := make([]byte, 0xff)
    for {
        n, err := conn.Read(buf)
        if err != nil {
            if errors.Is(err, io.EOF) {
                return
            }
            if errors.Is(err, syscall.ECONNRESET) {
                return
            }

            log.Fatalf("client read error %+v", err)
            return
        }

        counter := binary.BigEndian.Uint64(buf[:n])

        packet := make([]byte, 10) // version(2) + counter(8)
        binary.BigEndian.PutUint16(packet[0:2], version)
        binary.BigEndian.PutUint64(packet[2:10], counter+1)

        if _, err := conn.Write(packet); err != nil {
            log.Fatalf("client write error %+v", err)
            return
        }
    }
}

server 側は、クライアントから uint64 の counter を increment して、version番号とともにクライアントに送り返すだけの実装です

SO_REUSEADDR/SO_REUSEPORT を使わない場合は、同一のアドレスとポートを Listen しようとすると address already in use のようなエラーが発生するため、同時に2つのプロセスで起動できませんが、 SO_REUSEADDR/SO_REUSEPORT を利用している場合は起動することができます

f:id:octu0:20220131204545g:plain
2つ同時に起動する様子

次にクライアントの実装です、といってもこちらは特に特別な事をする必要はなく通常のクライアント実装です

package main

import (
    "encoding/binary"
    "fmt"
    "log"
    "net"
)

// client
func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:10000")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    ping(conn, 0)
}

func ping(conn net.Conn, counter uint64) {
    buf := make([]byte, 0xff)
    for {
        req := make([]byte, 8)
        binary.BigEndian.PutUint64(req, counter)

        if _, err := conn.Write(req); err != nil {
            log.Fatalf("server write error %+v", err)
            return
        }

        n, err := conn.Read(buf)
        if err != nil {
            log.Fatalf("server read error %+v", err)
            return
        }

        packet := buf[:n]
        serverVersion := binary.BigEndian.Uint16(packet[0:2])
        serverCounter := binary.BigEndian.Uint64(packet[2:10])

        fmt.Printf("\rver:%02d counter:%20d\n", serverVersion, serverCounter)
        if serverCounter != (counter + 1) {
            log.Fatal("server not incremental value")
            return
        }

        counter += 1
    }
}

クライアント側ではサーバのバージョンと、ping/pongのカウンターを表示するようにしています
この状態で新しいサーバプロセスを組み込んでみた時の挙動を見てみます

f:id:octu0:20220131204921g:plain
サーバプロセスを複数起動した時の動き

問題なく -ver 1 で起動したプロセスと -ver 2 で起動したプロセスで ping/pong が実行できていると思います

ただ上記動画にあるとおり、acceptしたsocketがどちらに届くのかはコントロールできず Listenしているどちらかのプロセスに振り分けられるようです*1*2

このままでは、当初の目的である Hot Deploy のように新しいサーバプロセスに新規接続を振り分けることができません。
そこで次のように実装し、古いプロセスに対して SIGUSR1 のシステムコールを呼び出し net.Listener の Close() するようにサーバのコードを少し修正しました

// server
func main() {
    var version uint
    var oldPid int
    flag.UintVar(&version, "ver", 1, "")
    flag.IntVar(&oldPid, "old-pid", 1, "")
    flag.Parse()

    if 1 < oldPid {
        p, err := os.FindProcess(oldPid)
        if err != nil {
            panic(err)
        }
        p.Signal(syscall.SIGUSR1) // 引数の old-pid が指定されてたら SIGUSR1 を送る
    }

    addr := "[0.0.0.0]:10000"
    l, err := Listen(addr)
    if err != nil {
        panic(err)
    }

    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGUSR1)
    defer stop()

    closed := false
    go func() {
        <-ctx.Done() // SIGUSR1 が呼ばれるまで待機

        closed = true
        l.Close()
        log.Printf("listener closed")
    }()

    pid := os.Getpid()
    wg := &sync.WaitGroup{}
    log.Printf("server listen %s pid=%d", addr, pid)
    for {
        conn, err := l.Accept()
        if err != nil {
            if closed {
                break
            }
            panic(err)
        }

        log.Printf("%s connected", conn.RemoteAddr().String())
        wg.Add(1)
        go func() {
            defer wg.Done()

            pong(conn, uint16(version))
        }()
    }
    wg.Wait()
    log.Println("bye")
}

これで新プロセスが起動した際に、旧プロセスに対して SIGUSR1 を呼び出すことで新プロセスだけで socket を受け取れるようになりました(下記)

f:id:octu0:20220131205103g:plain
シグナルを渡すようにしたもの

既に実行中の接続(Acceptされたconn)も終了を待ってから終了することで 既存の接続も影響が無さそうです

Hot Deploy の組み込み

さてここまでの実装で大体のことは実装できました
ミラティブの場合はサーバプロセスはコンテナ(Docker)経由で起動させているため、Supervisor プロセスは コンテナ経由で実行できるように変更しています
具体的には

  • コンテナのNetworkMode は Host にしている(run --net=host)
  • シグナルの呼び出しはコンテナ経由のKill (kill --signal USR1)

ここに コンテナイメージの Pull (Run) を組み合わせて

  1. 新しいコンテナを起動する(pull & start)
  2. 古いコンテナと新しいコンテナどちらも新規接続を受け付けている状態にする
  3. 古いコンテナに対して kill --signal USR1 を送る
  4. 古いコンテナは Listen を閉じる(新規接続は新しいコンテナに行く)
  5. SIGUSR1 を受け取った古いコンテナは既存の接続が全て閉じられるまで待機
  6. 新規接続が全てなくなったら古いコンテナはプロセスの終了をする

という実装を行い、各種オーケストレーションツールと連携しやすいようにこれらを行える API 化も実施しました
細かな実装例は割愛しますが、ほぼ上記の通りの実装になってます

Hot Deploy 実装時に気をつけたこと

いざ上記の仕組みでバージョンアップを行う場合、新旧1世代以上のプロセスが同居するため、一時的にメモリ使用量が増えることが考えられます

より具体的には leaky buffer のような実装で *bytes.Buffer などを pool している箇所や WorkerPool のような実装 は新旧プロセスでpoolするため積極的に開放しなければ使用量が減りせん
2世代のプロセスが同居する場合は約2倍近くまで使用量が増える可能性があります

これらは、プールサイズを小さくしたりオンデマンドで変えるようにしたり、何かしらのトリガーで開放するように実装してリリースしました

f:id:octu0:20220131205456p:plain
メモリ使用量の推移

このグラフはリリース前後のストリーミング配信サーバ全体のメモリ使用量の推移です
それまでは大体40%前後で推移していたものが、メモリプールの効率化の修正をリリース後は20%程度で推移するようになっています
(新旧プロセスのプロセスでグラフがガタガタして見えづらくてすいません)

※なおCPU 使用率に関しては、そもそもサーバが受け取る処理量が増えてるわけではないので今回の仕組みを導入する場合もその後もあまり変わりませんでした

その後

リリースまでの道のりは意外と長く、ライブラリによっては net.Listener をそのまま受け取ってくれる実装になっていないものもあったり(ライブラリ内でListenしてる)するため、それはそれで別の仕組みに切り出したりが必要でしたが無事リリースできました

f:id:octu0:20220131205515p:plain
リリース後のひと言

We are hiring!

サーバの安定稼働を支える無停止アップデートやさらなるリリースの効率化に興味があるエンジニアを募集中です!
我こそはと思った方、応募お待ちしております

www.mirrativ.co.jp

speakerdeck.com

*1:https://lwn.net/Articles/542629/https://lwn.net/Articles/542718/

*2:もしかしたら振り分けを制御できる方法があるのかもしれないですが自力ではわからなかったです