Indexer des documents dans Elasticsearch avec la librairie elasticsearch-php

Dans l'article précédent, je montre comment gérer les index Elasticsearch avec la librairie elasticsearch-php. Cette fois, je vais présenter comment indexer des documents dans Elasticsearch en utilisant cette librairie.

Ajout d'un document

Un document dans Elasticsearch est représenté sous la forme d'un JSON. Son contenu doit respecter la déclaration du mapping dans l'index où il sera stocké. Tout document doit avoir un ID, il est préférable qu'il soit explicitement fournis au moment de l'indexation. La librairie elasticsearch-php propose la méthode index() qui est simple à utiliser.

<?php
declare(strict_types=1);
 
namespace Metfan\LibSearch\Index;
 
use Metfan\LibSearch\Client\ClientBuilderInterface;
 
class Indexer
{
    public function __construct(private ClientBuilderInterface $clientBuilder)
    {
    }
 
    public function index(string $index, $documentId, array $body): void
    {
        $client = $this->clientBuilder->build();
        $client->index(
            [
                'index' => $index,
                'id' => $documentId,
                'body' => $body,
            ]
        );
    }
}
 
 

Suppression d'un document

Parfois il est nécessaire de supprimer un document d'un index Elasticsearch. Une fois de plus la librairie elasticsearch-php propose une méthode dédiée facile à utiliser pour y arriver. Il est nécessaire de spécifier l'index qui contient le document et l'id du document.

    public function deindex(string $index, int|string $documentId): void
    {
        $client = $this->clientBuilder->build();
        $client->delete(
            [
                'index' => $index,
                'id' => $documentId,
            ]
        );
    }

Indexation synchrone

Quand on a peu de documents avec une indexation très rapide comme le cas de ce blog, une centaine de document et moins d'une minute de traitement on peut se simplifier la vie en ayant une indexation synchrone. Cela permet de cumuler les actions comme créer un nouvel index et switcher l'alias à la fin de l'indexation.

<?php
 
declare(strict_types=1);
 
namespace Metfan\LibSearch\Command;
 
use App\PostProvider;
use Metfan\LibSearch\Index\IndexCreator;
use Metfan\LibSearch\Index\Indexer;
use Metfan\LibSearch\Index\IndexSwitcher;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
 
class IndexingCommand extends Command
{
    public function __construct(
        private PostProvider $postProvider,
        private Indexer $indexer,
        private IndexCreator $indexCreator,
        private IndexSwitcher $indexSwitcher
    ) {
        parent::__construct();
    }
 
    protected function configure(): void
    {
        $this->setName('metfan:es:index-sync')
            ->setDescription('Index documents in ES')
            ->addOption(
                'reset-index',
                null,
                InputOption::VALUE_NONE,
                'Create a new index before indexing and switch alias after indexing.'
            );
    }
 
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $progressBar = new ProgressBar($output);
        if ($input->getOption('reset-index')) {
            $indexName = $this->indexCreator->createIndex();
            $output->writeln(sprintf('<info>New index: %s created.</info>', $indexName));
        }
 
        $articles = $this->postProvider->findAll();
        $progressBar->start(count($articles));
 
        foreach ($articles as $article) {
            $this->indexer->index($indexName, $article->getId(), $article->toArray());
            $progressBar->advance();
        }
 
        $progressBar->finish();
 
        if ($input->getOption('reset-index') && isset($indexName)) {
            $this->indexSwitcher->switchIndex($indexName);
            $output->writeln(sprintf(
                '<info>Alias switched to index: %s.</info>', 
                $indexName)
            );
        }
 
        return Command::SUCCESS;
    }
}
 

J'aurais pu optimiser le traitement en utilisant la méthode bulk() de la librairie elasticsearch-php pour indexer plusieurs document en une fois et ainsi réduire le nombre de requête HTTP. Cela m'obligerait à construire une requête un peu plus complexe et surtout à contrôler la taille de la requête envoyé à Elasticsearch pour ne pas dépasser la limite ce qui ajouterait de la complexité. Dans mon contexte je trouve que cela n'en vaut pas la peine.

Indexation asynchrone

Quand on a beaucoup de documents et/ou que le temps d'indexation est long, il faut passer par de l'asynchrone si l'on souhaite tout ré-indexer ou que la mise à jour en temps réel n'est pas faisable. Dans ce cas j'utilise un jeu de 3 commandes.

Ma première commande me permet de mettre dans une queue les documents que je souhaite indexer. J'ai pour habitude de créer un message par document pour faciliter le retry en cas de problème et je n'y met que l'ID du document à indexer ou à supprimer. Je ne vais pas détailler la solution asynchrone, ce n'est pas le sujet de cet article, libre à vous d'utiliser le backend qui vous souhaitez (rabbitmq, sqs, .....) avec la librairie qu'il vous plait (swarrot, symfony/messenger...)

<?php
declare(strict_types=1);
 
namespace Metfan\LibSearch\Command;
 
use App\BasicPublisher;
use App\ArticleMessage;
use App\PostProvider;
use Metfan\LibSearch\Index\IndexCreator;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
 
class AsyncIndexingCommand extends Command
{
    public function __construct(
        private PostProvider $postProvider,
        private BasicPublisher $publisher,
        private IndexCreator $indexCreator
    ) {
        parent::__construct();
    }
 
    protected function configure(): void
    {
        $this
            ->setName('metfan:es:index-async')
            ->setDescription('Index doc in elasticsearch')
            ->addOption(
                'id', 
                'i', 
                InputOption::VALUE_REQUIRED, 
                'id of the real item to index', 
                null
            )
            ->addOption(
                'file', 
                'f', 
                InputOption::VALUE_REQUIRED, 
                'A file containing 1 ID per line', 
                null
            )
            ->addOption('all', 'a', InputOption::VALUE_NONE, 'index all items')
            ->addOption('offset', 'o', InputOption::VALUE_REQUIRED, 'Offset', '0')
            ->addOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit', '100')
            ->addOption(
                'create-index',
                null,
                InputOption::VALUE_NONE,
                'Create a new index before indexing.'
            );
    }
 
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        if ($input->getOption('create-index')) {
            $indexName = $this->indexCreator->createIndex();
            $output->writeln(sprintf('<info>New index: %s created.</info>', $indexName));
        }
 
        $nbMessage = 0;
        foreach ($this->retrieveIdsToIndex($input) as $id) {
            $this->publisher->publishMessage(
                new ArticleMessage($id, $indexName ?? null), 
                'article_queue'
            );
            $nbMessage++;
        }
        $output->writeln(sprintf(
            '<comment>Published <info>%d</info> message(s) to process.</comment>', 
            $nbMessage
        ));
 
        return Command::SUCCESS;
    }
 
    private function retrieveIdsToIndex(InputInterface $input): iterable
    {
        if (null !== $objectId = $input->getOption('id')) {
            yield (int) $objectId;
        } elseif (null !== $file = $input->getOption('file')) {
            $handle = new \SplFileObject($file);
 
            foreach ($handle as $line) {
                if (!is_string($line)) {
                    return;
                }
                $line = trim($line);
                if (!empty($line)) {
                    yield (int) $line;
                }
            }
        } elseif (true === $input->getOption('all')) {
            yield from $this->postProvider->findIds(
                (int) $input->getOption('offset'),
                (int) $input->getOption('limit')
            );
        }
    }
}
 

J'ai plusieurs options qui me permettent de déclencher l'indexation d'un seul élément à partir de son id, de plusieurs éléments à partir d'un fichier contenant une liste d'id et de tout en prenant en compte de la pagination. Cela me permet de répondre à tous les cas que j'ai pu rencontrer.
J'ai également une option pour créer un nouvel index, dans ce cas je l'ajoute à l'ID dans le message que j'envoie (ici j'utilise RabbitMQ et Swarrot).

Maintenant que j'ai publié les éléments que je souhaite indexer sous la forme de message dans mon backend asynchrone, il ne me reste plus qu'a écrire un consumer pour faire le travail d'indexation.

<?php
declare(strict_types=1);
 
namespace App\Consumer;
 
use App\ConsumerHandlerInterface;
use App\ArticleMessage;
use App\Message;
use App\PostProvider;
use Metfan\LibSearch\Index\Indexer;
 
class PostIndexerHandler implements ConsumerHandlerInterface
{
    public function __construct(
        private PostProvider $itemProvider,
        private Indexer $indexer,
        private string $indexAlias
    ) {
    }
 
    public function process(Message $message): void
    {
        if (!$message instanceof ArticleMessage) {
            return;
        }
 
        try {
            $item = $this->itemProvider->findById($message->articleId);
            $this->indexer->index(
                $message->indexName ?? $this->indexAlias,
                $item->getId(),
                $item->toArray()
            );
        } catch (NoResultException) {
            $this->indexer->deindex(
                $message->indexName ?? $this->indexAlias,
                $message->articleId
            );
        }
    }
}

Mon consumer (basé sur Swarrot) index un élément s'il est trouvé dans le provider sinon le supprime de l'index. Dans ce context je prefère éviter d'utiliser la fonction bulk() qui rendrait difficile la gestion des erreurs et le retry s'il y avait plusieurs documents à traiter dans le message.

Si vous choisissez l'indexation asynchrone, il reste le switch d'index à faire. Il est possible de le gérer automatiquement à la fin du traitement de la queue mais ce sera pour un prochain article.

Voici donc tout le code nécessaire pour gérer le cycle de vie d'un document dans un index Elasticsearch avec la librairie elasticsearch-php. J'ai mis à disposition tout le code ainsi que des commandes Symfony pour l'utiliser sur github.

Ajouter un commentaire