Index documents in Elasticsearch using the PHP library elasticsearch-php
In the previous post, I showed how to manage indices in Elasticsearch using the elasticsearch-php library. In this post, I will show how to index documents with it.
Index a document
In Elasticsearch, a document is a piece of data in JSON format. Its content should comply with the index mapping. Every document has an ID, and it's recommended to set that ID explicitly when you index the document, so that you can update or delete it later. The elasticsearch-php library provides the index() method to do the job.
<?php

declare(strict_types=1);

namespace Metfan\LibSearch\Index;

use Metfan\LibSearch\Client\ClientBuilderInterface;

class Indexer
{
    public function __construct(private ClientBuilderInterface $clientBuilder)
    {
    }

    public function index(string $index, $documentId, array $body): void
    {
        $client = $this->clientBuilder->build();
        $client->index(
            [
                'index' => $index,
                'id' => $documentId,
                'body' => $body,
            ]
        );
    }
}
Delete a document
Sometimes a document must be removed from an index. Once again, the elasticsearch-php library has a dedicated method for that: delete(). You only need the document's ID and the index name.
public function deindex(string $index, int|string $documentId): void
{
    $client = $this->clientBuilder->build();
    $client->delete(
        [
            'index' => $index,
            'id' => $documentId,
        ]
    );
}
Synchronous indexing
If you have few documents and indexing is fast (this blog has fewer than 200 documents and indexes everything in under a minute), you can index synchronously. It's easier to implement and lets you create a new index, fill it, and switch the alias at the end, all in one command.
<?php

declare(strict_types=1);

namespace Metfan\LibSearch\Command;

use App\PostProvider;
use Metfan\LibSearch\Index\IndexCreator;
use Metfan\LibSearch\Index\Indexer;
use Metfan\LibSearch\Index\IndexSwitcher;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class IndexingCommand extends Command
{
    public function __construct(
        private PostProvider $postProvider,
        private Indexer $indexer,
        private IndexCreator $indexCreator,
        private IndexSwitcher $indexSwitcher,
        // Alias used as the target when no new index is created
        // (injected like $indexAlias in the consumer).
        private string $indexAlias
    ) {
        parent::__construct();
    }

    protected function configure(): void
    {
        $this->setName('metfan:es:index-sync')
            ->setDescription('Index documents in ES')
            ->addOption(
                'reset-index',
                null,
                InputOption::VALUE_NONE,
                'Create a new index before indexing and switch alias after indexing.'
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $progressBar = new ProgressBar($output);

        // Only set when --reset-index is given.
        $newIndexName = null;
        if ($input->getOption('reset-index')) {
            $newIndexName = $this->indexCreator->createIndex();
            $output->writeln(sprintf('<info>New index: %s created.</info>', $newIndexName));
        }

        // Without --reset-index, documents go to the index behind the alias.
        $targetIndex = $newIndexName ?? $this->indexAlias;

        $articles = $this->postProvider->findAll();
        $progressBar->start(count($articles));
        foreach ($articles as $article) {
            $this->indexer->index($targetIndex, $article->getId(), $article->toArray());
            $progressBar->advance();
        }
        $progressBar->finish();

        // Point the alias at the new index once everything is indexed.
        if (null !== $newIndexName) {
            $this->indexSwitcher->switchIndex($newIndexName);
            $output->writeln(sprintf('<info>Alias switched to index: %s.</info>', $newIndexName));
        }

        return Command::SUCCESS;
    }
}
I could have optimized this with the bulk() method of the elasticsearch-php library, which indexes multiple documents at once and sends fewer HTTP requests. But I would need to build a more complex request and keep its size under control, which adds complexity. In my case, I think it's not worth it.
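For comparison, here is a minimal sketch of what the bulk variant might look like. It is not part of the library described in this post: the buildBulkParams() helper, its parameters, and the batch size of 100 are hypothetical. It only builds the params arrays that the client's bulk() method expects (an action line followed by the document source), split into batches to keep each request small.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: builds one bulk() params array per batch.
 *
 * @param array<int|string, array<string, mixed>> $documents documents keyed by ID
 */
function buildBulkParams(string $index, array $documents, int $batchSize = 100): iterable
{
    // preserve_keys = true so the document IDs survive the chunking.
    foreach (array_chunk($documents, $batchSize, true) as $chunk) {
        $body = [];
        foreach ($chunk as $id => $document) {
            // Action line first, then the document source.
            $body[] = ['index' => ['_index' => $index, '_id' => (string) $id]];
            $body[] = $document;
        }

        yield ['body' => $body];
    }
}

// Usage, assuming $client is an elasticsearch-php client:
// foreach (buildBulkParams('posts_v2', $documents) as $params) {
//     $response = $client->bulk($params);
//     // $response['errors'] is true when at least one item failed.
// }
```

The trade-off mentioned above shows up in the usage: a bulk response has to be inspected item by item to find out which documents failed, whereas one request per document fails or succeeds as a whole.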
Asynchronous indexing
When there are a lot of documents to index and/or indexing takes a long time, you have to go asynchronous if you want to re-index everything or if real-time updating is not feasible. To achieve that, I use 3 commands.
The first command pushes every document I want to index into a queue. I always create one message per document to simplify failure handling and retries, and each message contains only the ID of the document to index or remove. I won't cover the asynchronous transport here: there are many options (RabbitMQ, SQS...) and many ways to use them (Swarrot, Symfony Messenger...).
<?php

declare(strict_types=1);

namespace Metfan\LibSearch\Command;

use App\BasicPublisher;
use App\ArticleMessage;
use App\PostProvider;
use Metfan\LibSearch\Index\IndexCreator;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;

class AsyncIndexingCommand extends Command
{
    public function __construct(
        private PostProvider $postProvider,
        private BasicPublisher $publisher,
        private IndexCreator $indexCreator
    ) {
        parent::__construct();
    }

    protected function configure(): void
    {
        $this
            ->setName('metfan:es:index-async')
            ->setDescription('Index doc in elasticsearch')
            ->addOption('id', 'i', InputOption::VALUE_REQUIRED, 'id of the real item to index', null)
            ->addOption('file', 'f', InputOption::VALUE_REQUIRED, 'A file containing 1 ID per line', null)
            ->addOption('all', 'a', InputOption::VALUE_NONE, 'index all items')
            ->addOption('offset', 'o', InputOption::VALUE_REQUIRED, 'Offset', '0')
            ->addOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit', '100')
            ->addOption(
                'create-index',
                null,
                InputOption::VALUE_NONE,
                'Create a new index before indexing.'
            );
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        if ($input->getOption('create-index')) {
            $indexName = $this->indexCreator->createIndex();
            $output->writeln(sprintf('<info>New index: %s created.</info>', $indexName));
        }

        $nbMessage = 0;
        foreach ($this->retrieveIdsToIndex($input) as $id) {
            $this->publisher->publishMessage(
                new ArticleMessage($id, $indexName ?? null),
                'article_queue'
            );
            $nbMessage++;
        }

        $output->writeln(sprintf(
            '<comment>Published <info>%d</info> message(s) to process.</comment>',
            $nbMessage
        ));

        return Command::SUCCESS;
    }

    private function retrieveIdsToIndex(InputInterface $input): iterable
    {
        if (null !== $objectId = $input->getOption('id')) {
            yield (int) $objectId;
        } elseif (null !== $file = $input->getOption('file')) {
            $handle = new \SplFileObject($file);
            foreach ($handle as $line) {
                if (!is_string($line)) {
                    return;
                }

                $line = trim($line);
                if (!empty($line)) {
                    yield (int) $line;
                }
            }
        } elseif (true === $input->getOption('all')) {
            yield from $this->postProvider->findIds(
                (int) $input->getOption('offset'),
                (int) $input->getOption('limit')
            );
        }
    }
}
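The App\ArticleMessage class itself is not shown in this post. As a rough illustration of a message that carries only the document ID and an optional target index, it could look like the sketch below. The property names match what the command and the consumer use; the JSON helpers toJson() and fromJson() are hypothetical additions, and readonly properties require PHP 8.1.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: the real App\ArticleMessage may differ.
final class ArticleMessage
{
    public function __construct(
        public readonly int $articleId,
        // null means "use the default alias" on the consumer side.
        public readonly ?string $indexName = null,
    ) {
    }

    // Serializes the message for the queue.
    public function toJson(): string
    {
        return json_encode(
            ['articleId' => $this->articleId, 'indexName' => $this->indexName],
            JSON_THROW_ON_ERROR
        );
    }

    // Rebuilds the message from a queue payload.
    public static function fromJson(string $json): self
    {
        $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

        return new self((int) $data['articleId'], $data['indexName'] ?? null);
    }
}
```

Keeping the payload this small means the consumer always reads the current state of the document when it processes the message, so a retried message never indexes stale data.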
So now I have some messages in a queue. I have to write a consumer to build the document and index it in Elasticsearch.
<?php

declare(strict_types=1);

namespace App\Consumer;

use App\ConsumerHandlerInterface;
use App\ArticleMessage;
use App\Message;
use App\PostProvider;
// Assumed to be Doctrine's exception; import whatever PostProvider::findById() throws.
use Doctrine\ORM\NoResultException;
use Metfan\LibSearch\Index\Indexer;

class PostIndexerHandler implements ConsumerHandlerInterface
{
    public function __construct(
        private PostProvider $itemProvider,
        private Indexer $indexer,
        private string $indexAlias
    ) {
    }

    public function process(Message $message): void
    {
        if (!$message instanceof ArticleMessage) {
            return;
        }

        try {
            // Document found: (re)index it in the requested index, or behind the alias.
            $item = $this->itemProvider->findById($message->articleId);
            $this->indexer->index(
                $message->indexName ?? $this->indexAlias,
                $item->getId(),
                $item->toArray()
            );
        } catch (NoResultException) {
            // Document no longer exists: remove it from the index.
            $this->indexer->deindex(
                $message->indexName ?? $this->indexAlias,
                $message->articleId
            );
        }
    }
}
This consumer, based on Swarrot, indexes a document if it's found and removes it from the index if it's not. Once again, I don't use the bulk() method, to keep failure handling and retries simple.
If you use asynchronous indexing and created a new index, you still have to switch the alias yourself at the end of the process. It's possible to automate this, but that will be for another post.
This is all the code you need to manage the document lifecycle in an Elasticsearch index. As with the previous post, all the code is available on GitHub, with Symfony commands to use it.
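As a reminder of what that manual switch involves, here is a minimal sketch of the request body for the update aliases API. The buildAliasSwitchParams() helper and the index names are hypothetical, and the IndexSwitcher from the previous post may do this differently. All actions in a single request are applied atomically, so the alias never points at zero indices.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: builds params for $client->indices()->updateAliases().
 *
 * @param list<string> $oldIndices indices the alias currently points to
 */
function buildAliasSwitchParams(string $alias, string $newIndex, array $oldIndices): array
{
    $actions = [];
    foreach ($oldIndices as $oldIndex) {
        // Detach the alias from each index it currently points to.
        $actions[] = ['remove' => ['index' => $oldIndex, 'alias' => $alias]];
    }
    // Attach the alias to the freshly built index.
    $actions[] = ['add' => ['index' => $newIndex, 'alias' => $alias]];

    return ['body' => ['actions' => $actions]];
}

// Usage, assuming $client is an elasticsearch-php client
// and the queue has been fully consumed:
// $client->indices()->updateAliases(
//     buildAliasSwitchParams('posts', 'posts_v2', ['posts_v1'])
// );
```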