Kafka Consumer + Prestodb例
上記のエントリの続編です。
その1
ではApache Kafkaを組み合わせて、
データの分散やアプリケーション自体をスケールするアプローチを紹介しました。
今回は分散したサービスのデータの集約をKafkaとPrestoを組み合わせて、
ログ分析の基盤作りの例を紹介します。
アプリケーションのログをfluentd, elasticsearchで収集し、サービズ作りに活かすケースは多いと思います。
今回の例ではログにサービス固有の情報、物理的に異なるデータベースを集め、
ログデータをKPIなどに活かせる形にし、elasticseachに格納します。
あるところでデータ更新などが行われた場合でも、
Kafkaを軸にメッセージを受信することでelasticsearchのドキュメント更新なども簡単に行えます。
Prestoについて
prestoの利用方法や設定については下記を参考、本エントリのサンプルリポジトリを参照ください。
今回は上記のエントリで紹介したものに加え、prestoからKafkaに接続します。
アプリケーションログをKafkaに送信
fluentdなどを使うのが一般的ですが、
今回の例ではLaravelで作られたアプリケーションにアクセスした場合に、
ユーザーがアクセスしたURLとユーザー名をKafkaへ通知します。
Log送信イベント
アプリケーションログ送信をEventとします。
<?php declare(strict_types=1); namespace App\Events; /** * Class Loggable */ final class Loggable { /** @var string */ private $uri; /** @var string[] */ private $names = [ 'presto', 'kafka', 'laravel', 'elasticsearch', ]; /** * Loggable constructor. * * @param string $uri */ public function __construct(string $uri) { $this->uri = $uri; } /** * @return string */ public function uri(): string { return $this->uri; } /** * @return string */ public function name(): string { return $this->names[array_rand($this->names)]; } }
アクセスしたURLを送信するイベントですが、
例としてEvent Handler処理時にランダムでユーザー名を決定するようになっています。
ログデータの送信自体はKafka Producerが担当しますが、
これは前回紹介した Kafka Producer抽象クラスを継承したクラスです。
サービスプロバイダーで別の値を注入するために分割して利用します。
<?php declare(strict_types=1); namespace App\DataAccess; /** * Class LogProduce */ final class LogProduce extends AbstractProduce { }
前回のエントリの設定値を混ぜると、 config/kafka.php
は次のようになります。
*Consumer実装も含まれますので、こちらで紹介します。
<?php return [ 'topics' => [ 'analyze.action' => [ 'topic' => 'analyze.action', 'brokers' => '127.0.0.1', 'options' => [ 'socket.blocking.max.ms' => '1', 'queue.buffering.max.ms' => '1', 'queue.buffering.max.messages' => '1000', 'client.id' => 'testingClient', ], ], 'fulltext.register' => [ 'topic' => 'fulltext.register', 'brokers' => '127.0.0.1', 'options' => [ 'socket.blocking.max.ms' => '1', 'queue.buffering.max.ms' => '1', 'queue.buffering.max.messages' => '1000', 'client.id' => 'testingClient', ], ], ], 'consumer' => [ 'brokers' => '127.0.0.1', 'options' => [ 'heartbeat.interval.ms' => '10000', 'session.timeout.ms' => '30000', 'topic.metadata.refresh.interval.ms' => '60000', 'topic.metadata.refresh.sparse' => 'true', 'log.connection.close' => 'false', 'group.id' => 'testingConsumer', ], ], ];
サービスプロバイダも前回のものを混ぜると以下のようになります。
同じクラスをコンストラクタインジェクションで指定しますが、
具象クラスは違うクラスを与えます (Laravelの Contextual Bindingを利用)
<?php public function register() { $this->app->when(LogProduce::class) ->needs(Producer::class) ->give(function (Application $app) { $kafkaConfig = $app['config']->get('kafka'); $topic = $kafkaConfig['topics']['analyze.action']; $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']); $producer->setLogger($app['log']); return $producer; }); $this->app->when(RegisterProduce::class) ->needs(Producer::class) ->give(function (Application $app) { $kafkaConfig = $app['config']->get('kafka'); $topic = $kafkaConfig['topics']['fulltext.register']; $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']); $producer->setLogger($app['log']); return $producer; }); }
次に上記のEvent、LoggableクラスのHandlerクラスです
<?php declare(strict_types=1); namespace App\Listeners; use App\DataAccess\LogProduce; use App\Definition\AnalysisDefinition; use App\Events\Loggable; use Ramsey\Uuid\Uuid; /** * Class LoggableHandler */ final class LoggableHandler { /** @var LogProduce */ protected $producer; /** * LoggableHandler constructor. * * @param LogProduce $producer */ public function __construct(LogProduce $producer) { $this->producer = $producer; } /** * @param Loggable $loggable */ public function handle(Loggable $loggable) { $this->producer->run( new AnalysisDefinition(Uuid::uuid4()->toString(), $loggable->uri(), $loggable->name()) ); } }
これでLoggable イベントが発動するとKafkaに アクセスしたURL
、ユーザー名
、 識別のためのUUID
が送信されます。
この例でKafkaの analyze.action
topicにメッセージが格納されます。
アプリケーションのURL全てを対象とするため、
サンプルコードではGlobal Middlewareとして利用しています。
<?php declare(strict_types=1); namespace App\Http\Middleware; use Closure; use App\Events\Loggable; use Illuminate\Http\Request; use Illuminate\Contracts\Events\Dispatcher; /** * Class SendLogger */ final class SendLogger { /** @var Dispatcher */ private $dispatcher; /** * SendLogger constructor. * * @param Dispatcher $dispatcher */ public function __construct(Dispatcher $dispatcher) { $this->dispatcher = $dispatcher; } /** * @param Request $request * @param Closure $next * * @return mixed */ public function handle(Request $request, Closure $next) { $this->dispatcher->dispatch(new Loggable($request->getUri())); return $next($request); } }
これでアクセス時にログデータとしてKafkaに通知されます。
Kafka Consumer メッセージ受信
次にKafka Consumerを実装します。
これもLaravelのQueueを利用すれば、アプリケーション内で完結することができますが、
アプリケーションをスケールさせる上では、フレームワークの知識をメッセージに含めることはできません。
このため前回引き続きフレームワークのQueueは利用しません。
ConsumerをLaravelに組み込むのは非常に簡単です。
次のように実装することができます。
<?php declare(strict_types=1); namespace App\Foundation\Consumer; use RdKafka\Conf; use RdKafka\Consumer as KafkaConsumer; use RdKafka\Message; use RdKafka\TopicConf; use RdKafka\ConsumerTopic; /** * Class Consumer */ class Consumer { /** @var string */ protected $topic; /** @var \RdKafka\Consumer */ protected $consumer; /** @var int */ protected $partition = 0; /** @var string */ protected $brokers = ''; /** @var array */ protected $configure = []; /** @var int */ protected $offset = RD_KAFKA_OFFSET_STORED; protected $callable; /** * Consumer constructor. * * @param string $brokers * @param array $configure */ public function __construct(string $brokers, array $configure = []) { $this->brokers = $brokers; $this->configure = $configure; } /** * @param string $topic */ public function topic(string $topic) { $this->topic = $topic; } /** * @param int $partition */ public function partition(int $partition) { $this->partition = $partition; } /** * @param int $offset */ public function offset(int $offset) { $this->offset = $offset; } /** * @param Consumable $callable * * @throws \Exception */ public function handle(Consumable $callable) { $topic = $this->consumerTopic(); $topic->consumeStart($this->partition, $this->offset); while (true) { $message = $topic->consume($this->partition, 120 * 10000); if ($message instanceof Message) { switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: call_user_func($callable, $message); $this->outputMessage($message); break; case RD_KAFKA_RESP_ERR__TIMED_OUT: throw new \RuntimeException("time out."); break; default: break; } } } } /** * @param callable $callable */ public function callbackMessage(callable $callable) { $this->callable = $callable; } /** * @param Message $message */ protected function outputMessage(Message $message) { if ($this->callable) { call_user_func_array($this->callable, [$message]); } } /** * @return ConsumerTopic */ protected function consumerTopic(): ConsumerTopic { $this->consumer = $this->consumer(); $this->consumer->addBrokers($this->brokers); return $this->consumer->newTopic($this->topic, $this->topicConf()); } /** * @return TopicConf */ protected function topicConf(): TopicConf { $topicConf = new TopicConf(); $topicConf->set('auto.commit.interval.ms', '100'); $topicConf->set('offset.store.method', 'file'); $topicConf->set('offset.store.path', sys_get_temp_dir()); $topicConf->set('auto.offset.reset', 'smallest'); return $topicConf; } /** * @return KafkaConsumer */ protected function consumer(): KafkaConsumer { $conf = new Conf(); foreach ($this->configure as $key => $item) { $conf->set($key, $item); } return new KafkaConsumer($conf); } }
この例では接続情報を外から与え、handleメソッドに処理をしたいクラスを与えます。
handleメソッドが必要なクラスは、下記のインターフェースを実装したクラスとなります。
<?php declare(strict_types=1); namespace App\Foundation\Consumer; use RdKafka\Message; /** * Interface Consumable */ interface Consumable { /** * @param Message $message * * @return mixed */ public function __invoke(Message $message); }
Kafkaからのメッセージを受信すると、受信メッセージが RdKafka\Message クラスとして利用できます。
Kafka Consumerをartisanコマンドとして実装します。
<?php declare(strict_types=1); namespace App\Console; use App\Foundation\Consumer\Consumable; use App\Foundation\Consumer\Consumer; use Illuminate\Console\Command; use RdKafka\Message; /** * Class ConsumerCommand */ class ConsumerCommand extends Command { /** @var string */ protected $name = 'kafka:consumer'; /** @var string */ protected $description = ''; /** @var Consumer */ protected $consumer; /** @var string */ protected $topic; /** @var Consumable */ protected $consumable; /** * ConsumerCommand constructor. * * @param Consumer $consumer * @param Consumable $consumable * @param string $topic */ public function __construct( Consumer $consumer, Consumable $consumable, string $topic = 'analyze.action' ) { parent::__construct(); $this->consumer = $consumer; $this->consumable = $consumable; $this->topic = $topic; } public function handle() { $this->consumer->topic($this->topic); $this->consumer->callbackMessage(function (Message $message) { $this->info($message->payload); }); $this->consumer->handle($this->consumable); } }
メッセージ受信時にprestoから複数のデータベースを接続し、
elasticsearchに格納するように実装します。
Prestoの設定 - Kafka Schema
先の実装でKafka Producerで、UUID, URL, 名前を送信するようにしました。
格納されるtopicと、topicに格納されるデータをPrestoで指定します。
PrestoにKafkaの情報を設定する場合は、
etc/kafka/table名.json
ファイルを作成して例として以下の内容を記述します。
{ "tableName": "action", "schemaName": "analyze", "topicName": "analyze.action", "message": { "dataFormat": "json", "fields": [ { "name": "uuid", "mapping": "uuid", "type": "VARCHAR" }, { "name": "uri", "mapping": "uri", "type": "VARCHAR" }, { "name": "name", "mapping": "name", "type": "VARCHAR" } ] } }
topic名は、prestoの各データベースへ接続する設定と同じく、
他に各メッセージの型を記述します。
詳細は公式を参照ください。
[https://prestodb.io/docs/current/connector/kafka.html:title]
catalogの指定はMySQLなどと同じです。
サンプルでは etc/catalog/kafka_tests.properties
としています。
connector.name=kafka kafka.nodes=127.0.0.1:9092 kafka.table-names=analyze.action kafka.hide-internal-columns=false
この他の接続設定はサンプルコード等を参考にしてください。
作成後、prestoの再起動等を行なってください。
このサンプルでは、Redisの文字列型レコードと、MySQL、Kafkaを結合します。
複数のデータベースを結合する利用例としてはもちろんですが、
Redisのランキング機能とMySQLの情報を結合し、
人気順の何かしらのデータと、Kafkaに格納されたログを結合することで、
様々なコンテンツデータを組み合わせたログデータの作成、
またはアプリケーションの複雑な機能提供などが行えます。
PrestoをLaravelに組み込む
PrestoはPDOなどの拡張ではなく、HTTPの通信を使って操作しますので、
DatabaseManager、Eloquentを拡張するメリットはあまりなく、
フロントのアプリケーションから常にアドホックに利用するものではなく、
プリペアドステートメントもありません。(以前のバージョンではありましたが現行にはない様です)
PrestoについてのエントリでPHPからの利用方法を紹介していますので、
それをつかってLaravelから利用できる様にします。
<?php declare(strict_types=1); namespace App\Foundation\Presto; use Ytake\PrestoClient\FixData; use Ytake\PrestoClient\ClientSession; use Ytake\PrestoClient\ResultsSession; use Ytake\PrestoClient\StatementClient; /** * Class PrestoClient */ class PrestoClient { /** @var ClientSession */ protected $session; /** * PrestoClient constructor. * * @param ClientSession $session */ public function __construct(ClientSession $session) { $this->session = $session; } /** * @param string $query * * @return array */ public function query(string $query): array { $result = []; $client = new StatementClient($this->session, $query); $resultSession = new ResultsSession($client); $yieldResult = $resultSession->execute()->yieldResults(); /** @var \Ytake\PrestoClient\QueryResult $row */ foreach ($yieldResult as $row) { foreach ($row->yieldData() as $yieldRow) { if ($yieldRow instanceof FixData) { $result[] = $yieldRow; } } } return $result; } }
config、サービスプロバイダは次の通りです。
config/presto.php
<?php return [ 'connections' => [ 'presto_test' => [ 'host' => 'http://127.0.0.1:8080/', 'catalog' => 'default', ], ], ];
サービスプロバイダは次の通りです。
<?php public function register() { $this->app->bind(ClientSession::class, function (Application $app) { $prestoConfig = $app['config']->get('presto'); $connectionConfig = $prestoConfig['connections']['presto_test']; return new ClientSession($connectionConfig['host'], $connectionConfig['catalog']); }); }
Prestoに投げるQueryは次の通りです。
<?php declare(strict_types=1); namespace App\DataAccess; use App\Foundation\Presto\PrestoClient; /** * Class Analysis */ final class Analysis { /** @var PrestoClient */ protected $client; /** * AnalysisRepository constructor. * * @param PrestoClient $client */ public function __construct(PrestoClient $client) { $this->client = $client; } /** * @param string $name * @return array */ public function allBy(string $name): array { $query = "SELECT redttt._key, redttt._value, test_id, test_name, created_at, uri, uuid FROM my_tests.testing.tests AS myttt INNER JOIN red_tests.test.string AS redttt ON redttt._key = myttt.test_name INNER JOIN kafka_tests.analyze.action AS kafkataa ON kafkataa.name = myttt.test_name WHERE myttt.test_name = '{$name}' LIMIT 1"; return $this->client->query($query); } }
メッセージ受信時にユーザー名からRedis, MySQLから該当情報を取得し、ログデータを整形します。
最後にこのログをelasticsearchに格納します。
<?php declare(strict_types=1); namespace App\DataAccess; use App\Foundation\Consumer\Consumable; use App\Foundation\Elasticsearch\ElasticseachClient; use Cake\Chronos\Chronos; use RdKafka\Message; use Ytake\PrestoClient\FixData; /** * Class LoggableConsume */ final class LoggableConsume implements Consumable { /** @var ElasticseachClient */ protected $client; /** @var string */ protected $index = 'log.index'; /** @var Analysis */ private $analysis; /** * LoggableConsume constructor. * * @param Analysis $analysis * @param ElasticseachClient $client */ public function __construct(Analysis $analysis, ElasticseachClient $client) { $this->analysis = $analysis; $this->client = $client; } /** * @param Message $message * * @return void */ public function __invoke(Message $message) { $decode = json_decode($message->payload); /** @var FixData[] $response */ $response = $this->analysis->allBy($decode->name); if (count($response)) { $params = [ 'index' => $this->index, 'type' => 'logs', 'body' => [ '_key' => $response[0]['_key'], '_value' => $response[0]['_value'], 'test_id' => $response[0]['test_id'], 'test_name' => $response[0]['test_name'], 'created_at' => Chronos::now()->toUnixString(), 'uri' => $response[0]['uri'], 'uuid' => $response[0]['uuid'], ], ]; $this->client->client()->index($params); } } }
上記のクラスがメッセージ受信時に作用する様に、 サービスプロバイダでartisanコマンドとして登録します。
<?php declare(strict_types=1); namespace App\Providers; use App\Console\ConsumerCommand; use App\Console\InitRedisCommand; use App\DataAccess\LoggableConsume; use App\Foundation\Consumer\Consumer; use Illuminate\Foundation\Application; use Illuminate\Support\ServiceProvider; /** * Class ConsoleServiceProvider */ class ConsoleServiceProvider extends ServiceProvider { /** @var bool */ protected $defer = true; public function boot() { $this->app->bind('app.command.kafka.consumer', function (Application $app) { return new ConsumerCommand( $app->make(Consumer::class), $app->make(LoggableConsume::class), 'analyze.action' ); }); $this->commands([ 'app.command.kafka.consumer', ]); } /** * @return array */ public function provides() { return [ 'app.command.kafka.consumer', ]; } }
これで php artisan kafka:consumer
で起動しますが、
Kafkaからのメッセージは継続的に流れてくるため、supervisorなどを利用してdaemonとして動作するようにしてください。
実行すると下記の様にメッセージを受信し、prestoを使って結合したデータがelasticsearchに格納されます。
サンプルアプリケーションでは、 /analysis
にアクセスするとこの結果を確認できます。
全体の流れとしては下記の流れになります。
多少大げさな例ですが前回のスケールさせる例と、
今回の分散したデータを集計するのにもKafkaを利用しました。
次回はこのログデータ成形に利用しているtopicを他の言語の処理も同時に走らせて、
リアルタイムに近い結果を高速に返すアプリケーション例を紹介します。