エムスリーエンジニアリンググループ AI・機械学習チームでソフトウェアエンジニアをしている中村(po3rin) です。検索とGoが好きです。
今回はBigQueryでSendGrid Activityをセキュアに管理する仕組みを構築したのでその紹介をします。SendGridを使い始めた方や、今後メール送信データを活用していきたい開発者の方は必見です。
- SendGridのActivityを管理したい
- SendGridでActivityを保管するアーキテクチャ
- SendGridのEvent Webhookの基本
- SendGridのEvent Webhookをセキュアに受け取る仕組み
- カテゴリの付与
- メール送信後にすぐにイベントをチェックする
- まとめ
SendGridのActivityを管理したい
弊社はSendGridでメール送信をしています。正しくメールが送信できたか、開封されたかのActivityはSendGridのダッシュボードのActivity Feedから確認できます。
この情報を使うことで、メールの一斉送信作業時に、全員に正しく送れたかをチェックしたり、メール送信できなかった宛先を確認することが可能です。
しかし、数千人にメールを送る場合、すべての人間に正しくメールが送れたかを目で確認していくのは大変な作業です。そこで自動化の出番と思いきや、APIでActivityを取るためには「30 Days Additional Email Activity History」というアドオンを購入する必要があります。しかもこれは過去30日のActivityしか取れません。
Email Activity Feed APIについて docs.sendgrid.com
30 Days Additional Email Activity History sendgrid.com
弊社ではとあるサービスで契約したお客様のメールアドレスのリストから自動でメールを送信する仕組みがあるのですが、メール送信した後にすぐにBouncesやBlockedといったStatusをチェックして、送信できなかった宛先をすぐにビジネス側に共有する必要がありました。また、送信済みユーザーに誤って同じメールを再送してしまう危険を回避するために送信済みを後からチェックするなどの仕組みも必要です。
以上の要望から、Activityをリアルタイムかつ永続的に保管する必要がありました。
SendGridでActivityを保管するアーキテクチャ
今回のアーキテクチャを図にすると下記のようになります。
メール送信や、ユーザーのメール開封などが発生したタイミングで、Webhookで通知する機能があるので、今回はその機能を利用しました。
Webhookを受け取ったAPIがBigQueryに保存します。こうすることでメール送信完了のチェックや、BouncesやBlockedといったStatusの監視をBigQuery上で行えるようにできます。
実装のポイントを下記の章で簡単に紹介していきます。
SendGridのEvent Webhookの基本
Event Webhookの設定はコンソールから行えます。[Settings] > [Mail Settings] > [Event Webhooks]から設定します。
詳細な設定方法は公式の記事をご覧ください。
設定できたら、Eventが発生すると下記のようなJSONが飛んでくるようになります(JSONの内容は公式記事から引用)。
[ { "email":"john.doe@sendgrid.com", "timestamp": 1337197600, "smtp-id":"<4FB4041F.6080505@sendgrid.com>", "sg_event_id":"sendgrid_internal_event_id", "sg_message_id":"sendgrid_internal_message_id", "event": "processed" } ]
SendGridのEvent Webhookをセキュアに受け取る仕組み
SendGridのEvent Webhookを受け取るAPIは公開されている必要があるので、URLがバレてしまうと、外部の人間が適当なEventを保存するようにリクエストすることができます。
SendGridでEvent Webhookのデータを保護する方法はいくつかあるのですが、今回はBasic認証 + Signed Event Webhook RequestsでAPIを保護するようにしました。Basic認証はいつもどおりAPI側で設定しますが、Signed Event Webhook Requestsは少し特殊な処理なので、ここで紹介します。
Signed Event Webhook Requests
Signed Event Webhook Requestsを利用すると、SendGridから送られてきたRequestであることを検証できるようになります。
Signed Event Webhook Requestsを有効にすると公開鍵が取得できます。(秘密鍵はSendGrid側で保管され、開発者が確認できません)。
Event発生時に、Webhookで投げるリクエストボディとタイムスタンプがSHA-256を用いてハッシュ化され、ハッシュ値と秘密鍵からデジタル署名が作成されます(アルゴリズムは楕円曲線デジタル署名アルゴリズム(Elliptic Curve Digital Signature Algorithm; ECDSA))。
Webhookを受け取る側ではリクエストボディとタイムスタンプと公開鍵からECDSA署名を作成しリクエストで送られてくるデジタル署名を検証します。デジタル署名とタイムスタンプはそれぞれHTTPヘッダーX-Twilio-Email-Event-Webhook-Signatur
とX-Twilio-Email-Event-Webhook-Timestamp
に付与されます。
少し実装が面倒臭そうですが、Pythonなど主要な言語では、これらの検証するヘルパーが公式から用意されているのでこちらを利用できます。下記の関数の実装で実際に署名の検証ができます。PythonのWebフレームワークであるFastAPIで利用する想定の実装になっています。
from fastapi import Request from sendgrid.helpers.eventwebhook import EventWebhook def signature_verification(request: Request, body_str: str, public_key: str): ew = EventWebhook(public_key=public_key) signature = request.headers.get('X-Twilio-Email-Event-Webhook-Signature') timestamp = request.headers.get('X-Twilio-Email-Event-Webhook-Timestamp') return ew.verify_signature(payload=body_str, signature=signature, timestamp=timestamp)
注意点として、リクエストボディは投げられてくるままの生の文字列が必要です。1回listやdataclassなどにparseした状態で受け取ってしまうと、そこから元のJSONのフォーマットに復元するのは面倒なので、APIでは一旦そのままの文字列で受け取るようにしましょう。
下記コードは上記で実装したsignature_verification
を利用して、Basic認証 + Signed Event Webhook RequestsでAPIを保護するサンプルです。
from fastapi import APIRouter, HTTPException, Depends, Request, Response, status from fastapi.security import HTTPBasic, HTTPBasicCredentials security = HTTPBasic() security_depends = Depends(security) def auth_basic(credentials: HTTPBasicCredentials): correct_username = secrets.compare_digest(credentials.username, SENDGRID_EVENT_API_USER) correct_password = secrets.compare_digest(credentials.password, SENDGRID_EVENT_API_PASS) if not (correct_username and correct_password): logger.error('Incorrect id or password') raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail='Incorrect id or password', headers={'WWW-Authenticate': 'Basic'}, ) def basic_auth_user(credentials: HTTPBasicCredentials = security_depends): auth_basic(credentials) basic_auth_user_depends = Depends(basic_auth_user) router = APIRouter() @router.post('/event', response_model=CustomResponse) async def event(request: Request, response: Response, _=basic_auth_user_depends): body = await request.body() body_str = body.decode('utf-8') if signature_verification(request=request, body_str=body_str, public_key=SENDGRID_EVENT_WEBHOOK_PUBLIC_KEY): logger.error('incorrect signature') raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Incorrect signature') # BQに保存する処理が続く... app = FastAPI() app.include_router(router)
カテゴリの付与
Eventを収集する際には、どのようなタイプのメールを送ったかをカテゴリとして付与しておくと便利です。SendGridではメールに設定したカテゴリはそのままリストとしてEvent Webhookのリクエストに付与されます。
弊社では数パターンのメールを自動で送信しているので、そのユーザーにどのメールを送ったかを確認するのにこの機能は必須でした。
カテゴリを付与してメール送信する際には下記のように実装しています。category setterにリストを渡してあげるだけです。これでメール送信時にカテゴリが付与できます。
class SendGridClient: def __init__(self, api_key: str, from_email: str): self.client = SendGridAPIClient(api_key=api_key) self.from_email = Email(from_email) def send(self, to_email: str, subject: str, content: str, category: list[str]): mail = Mail(from_email=self.from_email, to_emails=To(email=to_email), subject=subject, html_content=Content('text/html', content)) mail.category = [Category(c) for c in category] self.client.send(mail)
これでBigQuery上での分析がより便利になります。
メール送信後にすぐにイベントをチェックする
これでイベントを収集できるようになったので、メール送信スクリプトでメールを送信した後にそのままイベントをチェックして、Slackに通知するということもできます。そのサンプルの実装が下記になります。
import time from datetime import datetime from google.cloud import bigquery from data_manager.auth import get_email_hash class EventClient: def __init__(self, project_id: str) -> None: self.bq_client = bigquery.Client(project=project_id) def wait_for_events(self, email_list: list[str], category: str, timestamp: datetime) -> dict[str, int]: """ 送信したメールのstatusが落ち着くまで待機する。 """ start_time = time.time() # statusが安定するまでの時間はメールの数によって変動する。メールが少なくても5分は必要な時がある。 # 1000件の場合、90分がtimeout timeout = max(len(email_list) / 10 * 90, 5 * 60) print('waiting sendgrid event ...') while True: time.sleep(30) if time.time() - start_time > timeout: raise TimeoutError('event check timeout') query = """ SELECT DISTINCT sg_event_id, timestamp, event, reason, FROM `XXXXXX.sendgrid_event` WHERE EXISTS(SELECT * FROM UNNEST(category) AS x WHERE x = @category) AND timestamp > @timestamp AND email_hash IN UNNEST(@email_list) AND event in ('delivered', 'deferred', 'bounce', 'dropped') """ job_config = bigquery.QueryJobConfig(query_parameters=[ bigquery.ScalarQueryParameter('category', 'STRING', category), bigquery.ScalarQueryParameter('timestamp', 'TIMESTAMP', timestamp), bigquery.ArrayQueryParameter('email_list', 'STRING', email_list), ]) df = self.bq_client.query(query, job_config).to_dataframe() if len(df) < len(email_list): continue counts = df['event'].value_counts(sort=True) return counts.to_dict()
SendGridがEvent Webhookを送信してBQに反映するまでに少し時間がかかるので、sleepを設定してイベントが収集されるまで待つ必要があります。また、もし想定より長い時間かかっていた場合は、タイムアウトしてメソッドの呼び出し側でタイムアウトしたことをSlackでで通知します。
送信したemailやcategoryを使って、イベントから先ほど送信したメールをクエリします。取得するイベントはdelivered,deferred,bounce,dropped
としています。配信系イベントのデフォルトパラメータのタイプは先ほど挙げた4つのイベントに加えてprocessed
がありますが、processed
はdelivered
とbounce
にそれぞれ分岐するので、processed
は結果の数として省いています。
イベントは公式から引用した下記の図のような構成になっています。
他のイベントに関する詳細は下記ドキュメントをご覧ください。
まとめ
SendGridでイベントを収集する方法を共有しました。イベントを収集した結果を使ってメール送信後にすぐにeventが落ち着くまで待機するコードも紹介しました。
メール送信はかなりデリケートな作業なので、このようなデータ収集や監視は重要な要素になります。今後のメール送信をより安定して行うための基盤を開発していきたいと思います。
We are hiring !!
AI・機械学習チームではSendGrid Eventなど、データの流れを整備するのが好きなエンジニアを募集中です。カジュアル面談でより詳しくお話ししましょう!