先日のPHPカンファレンスやPHPカンファレンス関西、buildersconでお話しした内容を元にして、
Laravel(PHP)を使って分析処理の簡単な実装や、
ミドルウェアを組み合わせた分散処理の実装を紹介します。
本ブログのサンプルアプリケーションは下記になりますので、
コードやミドルウェアなどを参照ください。
Laravelとkafka Connect、Elasticsearchの組み合わせ
Apache Kafkaを使ったスケーラブルなアプリケーションの入門編です。
レコード量が多い複雑なコンテンツのデータや検索要件、Like検索など、
RDBMSの不得意な分野などを対応することも多いかと思いますが、
RDBMSとElasticsearchを併用しKafkaで複雑さを吸収して、
アプリケーションをスケールさせるようにしてみましょう。
データベースのテーブル設計時に想定されるデータモデリングと、
サービスが成長することによってデータの複雑化と、検索の複雑さが増し、
ビジネス要件がより高度になっていきます。
これらを解消するために全文検索を導入するなどが考えられますが、
このサンプルではそういったデータストレージが異なる場合でも、
CQRS+ESライクに問題を解決するヒントになればと思います。
サンプルアプリケーションでは /fulltext
配下のurlが該当します。
Kafka Producerの実装
いわゆるMessage Queueのメッセージ送信を実装します。
LaravelのQueueを想像されるかもしれませんが、フレームワークのQueueではなく、
こういった処理は原則他の言語でも利用できるようにする必要がありますので、
フレームワークの知識をメッセージに混入させることなく実装するようにします。
Producerの実装自体は難しいものではありません。
<?php declare(strict_types=1); namespace App\Foundation\Producer; use Psr\Log\LoggerInterface; use RdKafka\Conf; use RdKafka\Producer as KafkaProducer; use RdKafka\Producer as RdkafkaProducer; use RdKafka\ProducerTopic; /** * Class Producer */ class Producer { /** @var RdkafkaProducer */ protected $producer; /** @var string */ protected $topic = 'default'; /** @var null|LoggerInterface for optional logger */ protected $logger; /** @var string */ protected $brokers; /** @var array */ protected $options; /** * Producer constructor. * * @param string $topic * @param string $brokers * @param array $options */ public function __construct(string $topic, string $brokers, array $options = []) { $this->topic = $topic; $this->brokers = $brokers; $this->options = $options; } /** * @param AbstractProduceDefinition $definition */ public function produce(AbstractProduceDefinition $definition) { $kafkaTopic = $this->producerTopic(); $kafkaTopic->produce(RD_KAFKA_PARTITION_UA, 0, $definition->payload()); if ($this->logger instanceof LoggerInterface) { $this->logger->info($definition->payload()); } $this->producer->poll(0); } /** * @param LoggerInterface $logger */ public function setLogger(LoggerInterface $logger) { $this->logger = $logger; } /** * @return ProducerTopic */ protected function producerTopic(): ProducerTopic { $this->producer = $this->producer(); $this->producer->setLogLevel(LOG_DEBUG); $this->producer->addBrokers($this->brokers); return $this->producer->newTopic($this->topic); } /** * @return KafkaProducer */ protected function producer(): KafkaProducer { $conf = new Conf(); foreach ($this->options as $key => $item) { $conf->set($key, $item); } return new KafkaProducer($conf); } }
Command 実装
ここで指すCommandとは、artisan commandのアプリケーションではなく、
CQRSのCommandとQuery、データの書き込みと読み込みを分離して実装します。
以下は登録処理のコントローラクラスです。
<?php declare(strict_types=1); namespace App\Http\Controllers\Fulltext; use App\Events\SinkConnect; use App\Http\Controllers\Controller; use App\Http\Requests\FulltextRequest; use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Http\RedirectResponse; use Illuminate\Routing\Redirector; /** * Class RegisterAction */ final class RegisterAction extends Controller { /** @var Dispatcher */ private $dispatcher; /** @var Redirector */ private $redirector; /** * RegisterAction constructor. * * @param Dispatcher $dispatcher * @param Redirector $redirector */ public function __construct(Dispatcher $dispatcher, Redirector $redirector) { $this->dispatcher = $dispatcher; $this->redirector = $redirector; } /** * @param FulltextRequest $request * * @return RedirectResponse */ public function __invoke(FulltextRequest $request): RedirectResponse { // 登録処理後に実行されるevent $this->dispatcher->dispatch( new SinkConnect(strval($request->get('fulltext'))) ); return $this->redirector->route('fulltext.index'); } }
サンプルではデータ書き込み(RDBMS)は省略していますが、
上記のコードの __invoke メソッドに記述するだけです。
何か登録処理が行われたものとして、その後にEventを発動しています。
このコントローラクラスでは、フロントで送信された文章(ブログの記事など)を保存する、
という機能を提供していますが、Kafkaへの通知はアプリケーションの要件ではなく、
システム都合の処理になりますので、ここのクラスではなく、Event Handlerが処理を行う様になっています。
このEventクラスはシンプルなクラスです。
<?php declare(strict_types=1); namespace App\Events; /** * Class SinkConnect */ final class SinkConnect { /** @var string */ private $note; /** * SinkConnect constructor. * * @param string $note */ public function __construct(string $note) { $this->note = $note; } /** * @return string */ public function note(): string { return $this->note; } }
このEventを処理するHandlerクラスがKafkaのメッセージを送信します。
Event Handler
SinkConnectイベントに反応して処理を行うクラスを実装します。
Handlerクラスから最初に紹介したKafka Producerを利用できる様に次の様に実装しています。
<?php declare(strict_types=1); namespace App\DataAccess; use App\Foundation\Producer\Producer; use App\Foundation\Producer\AbstractProduceDefinition; /** * Class AbstractProduce */ abstract class AbstractProduce { /** @var Producer */ protected $producer; /** * MessageProduceUsecase constructor. * * @param Producer $producer */ public function __construct(Producer $producer) { $this->producer = $producer; } /** * @param AbstractProduceDefinition $analyze */ public function run(AbstractProduceDefinition $analyze) { $this->producer->produce($analyze); } }
後述する分析処理にもKafkaを利用するためこのクラスを継承して利用します。
<?php declare(strict_types=1); namespace App\DataAccess; /** * Class RegisterProduce */ final class RegisterProduce extends AbstractProduce { }
kafkaのtopicを処理によって切り替えるため、クラスを別クラスとして切り出しています。
このクラスを利用するHandlerクラスは以下のようになります。
<?php declare(strict_types=1); namespace App\Listeners; use Ramsey\Uuid\Uuid; use App\Events\SinkConnect; use App\DataAccess\RegisterProduce; use App\Definition\FulltextDefinition; /** * Class SinkConnectHandler */ final class SinkConnectHandler { /** @var RegisterProduce */ protected $producer; /** * SinkConnectHandler constructor. * * @param RegisterProduce $producer */ public function __construct(RegisterProduce $producer) { $this->producer = $producer; } /** * @param SinkConnect $connect */ public function handle(SinkConnect $connect) { $this->producer->run( new FulltextDefinition(Uuid::uuid4()->toString(), $connect->note()) ); } }
送信が可能な状態になりましたが、接続情報がないため、
これをServiceProviderを使って設定値を外から渡します。
設定値はconfig/kafka.phpに配置します
<?php return [ 'topics' => [ 'fulltext.register' => [ 'topic' => 'fulltext.register', 'brokers' => '127.0.0.1', 'options' => [ 'socket.blocking.max.ms' => '1', 'queue.buffering.max.ms' => '1', 'queue.buffering.max.messages' => '1000', 'client.id' => 'testingClient', ], ], ] ];
この設定値を使い、RegisterProduceクラスに与えます
<?php class AppServiceProvider extends ServiceProvider { /** * Register any application services. * * @return void */ public function register() { $this->app->when(RegisterProduce::class) ->needs(Producer::class) ->give(function (Application $app) { $kafkaConfig = $app['config']->get('kafka'); $topic = $kafkaConfig['topics']['fulltext.register']; $producer = new Producer($topic['topic'], $topic['brokers'], $topic['options']); $producer->setLogger($app['log']); return $producer; }); } }
これでEventが発動するとHandlerクラスが反応し、Kafkaへメッセージが送信されます。
格納されるKafkaのtopicは fulltext.register
です。
ここで送信されるメッセージには、uuidと、フォームで入力された文字列となります。
Kafka Connectの設定
送信されたメッセージをElasticsearchに送信するためにKafka Connect Elasticsearchの設定を行います。
これはConfluentをインストールすると含まれますので、追加で入れる必要はありません。
サンプルアプリケーションにはConfluentやElasticsearchも含まれています
ここでは一台で動かすためStandaloneモードで起動させます。
connect-standalone.propertiesファイルを作成して、以下の内容を記述します。
bootstrap.servers=192.168.10.10:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 rest.port=8093
kafka connectを利用するにはconverterがいくつか種類があり、代表的なものはavroですが、
ここではjsonConverterを指定します。(avro利用例は公式を参照ください)
Elasticsearch Connector — Confluent Platform 3.3.0 documentation
このファイルを /etc/schema-registry/connect-standalone.properties
として設置します。
*サンプルは設置済みです。
次にkafka connectで直接elasticsearchに接続して、インデックスにデータを追加する設定を記述します。
elasticsearch-connect.propertiesファイルを作成しkafkaのtopic情報などを記述します。
name=elasticsearch-sink connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasks.max=1 topics=fulltext.register key.ignore=true connection.url=http://192.168.10.10:9200 schema.ignore=true type.name=kafka-connect
上記の内容で、elasticsearchにKafkaのtopicと同じく、fulltext.register
index が作成されます。
documentの_typeは kafka-connect
となります。
これを /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties
として設置します。
次にKafkaに上記のKafka Connectを登録します。
sudo connect-standalone -daemon /etc/schema-registry/connect-standalone.properties /etc/kafka-connect-elasticsearch/elasticsearch-connect.properties sudo confluent load elasticsearch-sink
daemonでKafka Connect Elasticsearchを起動し、 ConfluentにConnectorをLoadして登録します。
この状態で、Laravelで実装した処理を実行するとElasticsearchのindexに挿入されていきます。
Query実装
CQRSのQuery、データの読み込みを実装します。
LaravelのElasticsearchパッケージなどでも簡単に操作ができます。
サンプルではElasticsearchのphpクライアントライブラリを利用して実装しています。
<?php declare(strict_types=1); namespace App\DataAccess; use Acme\Blog\Entity\EntryCriteria; use App\Foundation\Elasticsearch\ElasticseachClient; /** * Class FulltextIndex */ class FulltextIndex implements EntryCriteria { /** @var ElasticseachClient */ protected $client; /** @var string */ protected $index = 'fulltext.register'; /** * FulltextIndex constructor. * * @param ElasticseachClient $client */ public function __construct(ElasticseachClient $client) { $this->client = $client; } /** * @return array */ public function all(): array { $result = $this->client->client()->search([ "index" => $this->index, 'type' => 'kafka-connect', "body" => [ "query" => [ "match_all" => new \stdClass(), ], ], ]); $map = []; if (count($result)) { foreach($result['hits']['hits'] as $hit) { $map[] = $hit['_source']; } } return $map; } public function queryBy(string $string) { // TODO: Implement queryBy() method. } }
Elasticsearchに問い合わせた結果は、
src/Entry (ドメイン)配下でRepositoryなどを経由してControllerを介してhtml出力されます。
詳細な実装はサンプルコードを参照ください
<?php declare(strict_types=1); namespace App\Http\Controllers\Fulltext; use Acme\Blog\Specification\ActiveEntrySpecification; use Acme\Blog\Usecase\RetrieveEntryUsecase; use App\Http\Controllers\Controller; use App\Http\Responders\HtmlResponder; use Illuminate\Http\Response; /** * Class IndexAction */ final class IndexAction extends Controller { /** @var ActiveEntrySpecification */ private $specification; /** @var RetrieveEntryUsecase */ private $usecase; /** * IndexAction constructor. * * @param ActiveEntrySpecification $specification * @param RetrieveEntryUsecase $usecase */ public function __construct( ActiveEntrySpecification $specification, RetrieveEntryUsecase $usecase ) { $this->specification = $specification; $this->usecase = $usecase; } /** * @param HtmlResponder $responder * * @return Response */ public function __invoke(HtmlResponder $responder): Response { $responder->template('fulltext.index'); return $responder->emit([ 'list' => $this->usecase->run($this->specification), ]); } }
アプローチ
このサンプルではKafka Connectを使ってデータの分散を行い、
RDBMSとElasticsearchの責務を分割した例を紹介しました。
小さなアプリケーションではオーバスペックな実装ですが、
確実なデータと検索に特化したミドルウェアを組み合わせて、データ上のパフォーマンスと堅実さを提供することができます。
また分散することで、どちらかに障害が発生した場合でもアプリケーションの動作を担保したり、
障害復旧などにも活かすことができるかと思います。
次回は物理的に分散したデータベースをPrestoで集約させて、Kafka Consumer経由でElasticsearchに格納する実装例を紹介します。