vendor/shopware/elasticsearch/Admin/AdminSearchRegistry.php line 129

  1. <?php declare(strict_types=1);
  2. namespace Shopware\Elasticsearch\Admin;
  3. use Doctrine\DBAL\ArrayParameterType;
  4. use Doctrine\DBAL\Connection;
  5. use Doctrine\DBAL\Exception;
  6. use OpenSearch\Client;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  8. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  9. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  10. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  11. use Shopware\Core\Framework\Log\Package;
  12. use Shopware\Core\Framework\Uuid\Uuid;
  13. use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
  14. use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
  15. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  16. use Symfony\Component\Messenger\Attribute\AsMessageHandler;
  17. use Symfony\Component\Messenger\MessageBusInterface;
  18. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  19. /**
  20.  * @internal
  21.  *
  22.  * @final
  23.  */
  24. #[Package('system-settings')]
  25. #[AsMessageHandler(handlesAdminSearchIndexingMessage::class)]
  26. class AdminSearchRegistry implements EventSubscriberInterface
  27. {
  28.     /**
  29.      * @var array<string, mixed>
  30.      */
  31.     private readonly array $indexer;
  32.     /**
  33.      * @var array<string, mixed>
  34.      */
  35.     private readonly array $config;
  36.     /**
  37.      * @param array<AbstractAdminIndexer>|\Traversable<AbstractAdminIndexer> $indexer
  38.      * @param array<string, mixed> $config
  39.      * @param array<string, mixed> $mapping
  40.      */
  41.     public function __construct(
  42.         $indexer,
  43.         private readonly Connection $connection,
  44.         private readonly MessageBusInterface $queue,
  45.         private readonly EventDispatcherInterface $dispatcher,
  46.         private readonly Client $client,
  47.         private readonly AdminElasticsearchHelper $adminEsHelper,
  48.         array $config,
  49.         private readonly array $mapping
  50.     ) {
  51.         $this->indexer = ($indexer instanceof \Traversable) ? iterator_to_array($indexer) : $indexer;
  52.         if (isset($config['settings']['index'])) {
  53.             if (\array_key_exists('number_of_shards'$config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
  54.                 unset($config['settings']['index']['number_of_shards']);
  55.             }
  56.             if (\array_key_exists('number_of_replicas'$config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
  57.                 unset($config['settings']['index']['number_of_replicas']);
  58.             }
  59.         }
  60.         $this->config $config;
  61.     }
  62.     public function __invoke(AdminSearchIndexingMessage $message): void
  63.     {
  64.         $indexer $this->getIndexer($message->getEntity());
  65.         $documents $indexer->fetch($message->getIds());
  66.         $this->push($indexer$message->getIndices(), $documents$message->getIds());
  67.     }
  68.     public static function getSubscribedEvents(): array
  69.     {
  70.         return [
  71.             EntityWrittenContainerEvent::class => [
  72.                 ['refresh', -1000],
  73.             ],
  74.         ];
  75.     }
  76.     public function iterate(AdminIndexingBehavior $indexingBehavior): void
  77.     {
  78.         if (!$this->adminEsHelper->getEnabled()) {
  79.             return;
  80.         }
  81.         /** @var array<string> $entities */
  82.         $entities array_keys($this->indexer);
  83.         if ($indexingBehavior->getOnlyEntities()) {
  84.             $entities array_intersect($entities$indexingBehavior->getOnlyEntities());
  85.         } elseif ($indexingBehavior->getSkipEntities()) {
  86.             $entities array_diff($entities$indexingBehavior->getSkipEntities());
  87.         }
  88.         $indices $this->createIndices($entities);
  89.         foreach ($entities as $entityName) {
  90.             $indexer $this->getIndexer($entityName);
  91.             $iterator $indexer->getIterator();
  92.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
  93.             while ($ids $iterator->fetch()) {
  94.                 // we provide no queue when the data is sent by the admin
  95.                 if ($indexingBehavior->getNoQueue()) {
  96.                     $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  97.                 } else {
  98.                     $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  99.                 }
  100.                 $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
  101.             }
  102.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  103.         }
  104.         $this->swapAlias($indices);
  105.     }
  106.     public function refresh(EntityWrittenContainerEvent $event): void
  107.     {
  108.         if (!$this->adminEsHelper->getEnabled() || !$this->isIndexedEntityWritten($event)) {
  109.             return;
  110.         }
  111.         if ($this->adminEsHelper->getRefreshIndices()) {
  112.             $this->refreshIndices();
  113.         }
  114.         /** @var array<string, string> $indices */
  115.         $indices $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
  116.         if (empty($indices)) {
  117.             return;
  118.         }
  119.         foreach ($this->indexer as $indexer) {
  120.             $ids $event->getPrimaryKeys($indexer->getEntity());
  121.             if (empty($ids)) {
  122.                 continue;
  123.             }
  124.             $documents $indexer->fetch($ids);
  125.             $this->push($indexer$indices$documents$ids);
  126.         }
  127.     }
  128.     /**
  129.      * @return AbstractAdminIndexer[]
  130.      */
  131.     public function getIndexers(): iterable
  132.     {
  133.         return $this->indexer;
  134.     }
  135.     public function getIndexer(string $name): AbstractAdminIndexer
  136.     {
  137.         $indexer $this->indexer[$name] ?? null;
  138.         if ($indexer) {
  139.             return $indexer;
  140.         }
  141.         throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found'$name)]);
  142.     }
  143.     private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
  144.     {
  145.         foreach ($this->indexer as $indexer) {
  146.             $ids $event->getPrimaryKeys($indexer->getEntity());
  147.             if (!empty($ids)) {
  148.                 return true;
  149.             }
  150.         }
  151.         return false;
  152.     }
  153.     /**
  154.      * @param array<string, string> $indices
  155.      * @param array<string, array<string|int, string>> $data
  156.      * @param array<string> $ids
  157.      */
  158.     private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
  159.     {
  160.         $alias $this->adminEsHelper->getIndex($indexer->getName());
  161.         if (!isset($indices[$alias])) {
  162.             return;
  163.         }
  164.         $toRemove array_filter($ids, static fn (string $id): bool => !isset($data[$id]));
  165.         $documents = [];
  166.         foreach ($data as $id => $document) {
  167.             $documents[] = ['index' => ['_id' => $id]];
  168.             $documents[] = \array_replace(
  169.                 ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '''text' => ''],
  170.                 $document
  171.             );
  172.         }
  173.         foreach ($toRemove as $id) {
  174.             $documents[] = ['delete' => ['_id' => $id]];
  175.         }
  176.         $arguments = [
  177.             'index' => $indices[$alias],
  178.             'body' => $documents,
  179.         ];
  180.         $result $this->client->bulk($arguments);
  181.         if (\is_array($result) && !empty($result['errors'])) {
  182.             $errors $this->parseErrors($result);
  183.             throw new ElasticsearchIndexingException($errors);
  184.         }
  185.     }
  186.     /**
  187.      * @param array<string> $entities
  188.      *
  189.      * @throws Exception
  190.      *
  191.      * @return array<string, string>
  192.      */
  193.     private function createIndices(array $entities): array
  194.     {
  195.         $indexTasks = [];
  196.         $indices = [];
  197.         foreach ($entities as $entityName) {
  198.             $indexer $this->getIndexer($entityName);
  199.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  200.             $index $alias '_' time();
  201.             if ($this->client->indices()->exists(['index' => $index])) {
  202.                 continue;
  203.             }
  204.             $indices[$alias] = $index;
  205.             $this->create($indexer$index$alias);
  206.             $iterator $indexer->getIterator();
  207.             $indexTasks[] = [
  208.                 'id' => Uuid::randomBytes(),
  209.                 '`entity`' => $indexer->getEntity(),
  210.                 '`index`' => $index,
  211.                 '`alias`' => $alias,
  212.                 '`doc_count`' => $iterator->fetchCount(),
  213.             ];
  214.         }
  215.         $this->connection->executeStatement(
  216.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  217.             ['entities' => $entities],
  218.             ['entities' => ArrayParameterType::STRING]
  219.         );
  220.         foreach ($indexTasks as $task) {
  221.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  222.         }
  223.         return $indices;
  224.     }
  225.     private function refreshIndices(): void
  226.     {
  227.         $entities = [];
  228.         $indexTasks = [];
  229.         foreach ($this->indexer as $indexer) {
  230.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  231.             if ($this->client->indices()->existsAlias(['name' => $alias])) {
  232.                 continue;
  233.             }
  234.             $index $alias '_' time();
  235.             $this->create($indexer$index$alias);
  236.             $entities[] = $indexer->getEntity();
  237.             $iterator $indexer->getIterator();
  238.             $indexTasks[] = [
  239.                 'id' => Uuid::randomBytes(),
  240.                 '`entity`' => $indexer->getEntity(),
  241.                 '`index`' => $index,
  242.                 '`alias`' => $alias,
  243.                 '`doc_count`' => $iterator->fetchCount(),
  244.             ];
  245.         }
  246.         $this->connection->executeStatement(
  247.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  248.             ['entities' => $entities],
  249.             ['entities' => ArrayParameterType::STRING]
  250.         );
  251.         foreach ($indexTasks as $task) {
  252.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  253.         }
  254.     }
  255.     private function create(AbstractAdminIndexer $indexerstring $indexstring $alias): void
  256.     {
  257.         $mapping $indexer->mapping([
  258.             'properties' => [
  259.                 'id' => ['type' => 'keyword'],
  260.                 'textBoosted' => ['type' => 'text'],
  261.                 'text' => ['type' => 'text'],
  262.                 'entityName' => ['type' => 'keyword'],
  263.                 'parameters' => ['type' => 'keyword'],
  264.             ],
  265.         ]);
  266.         $mapping array_merge_recursive($mapping$this->mapping);
  267.         $body array_merge(
  268.             $this->config,
  269.             ['mappings' => $mapping]
  270.         );
  271.         $this->client->indices()->create([
  272.             'index' => $index,
  273.             'body' => $body,
  274.         ]);
  275.         $this->createAliasIfNotExisting($index$alias);
  276.     }
  277.     /**
  278.      * @param array<string, array<array<string, mixed>>> $result
  279.      *
  280.      * @return array<array{reason: string}|string>
  281.      */
  282.     private function parseErrors(array $result): array
  283.     {
  284.         $errors = [];
  285.         foreach ($result['items'] as $item) {
  286.             $item $item['index'] ?? $item['delete'];
  287.             if (\in_array($item['status'], [200201], true)) {
  288.                 continue;
  289.             }
  290.             $errors[] = [
  291.                 'index' => $item['_index'],
  292.                 'id' => $item['_id'],
  293.                 'type' => $item['error']['type'] ?? $item['_type'],
  294.                 'reason' => $item['error']['reason'] ?? $item['result'],
  295.             ];
  296.         }
  297.         return $errors;
  298.     }
  299.     private function createAliasIfNotExisting(string $indexstring $alias): void
  300.     {
  301.         if ($this->client->indices()->existsAlias(['name' => $alias])) {
  302.             return;
  303.         }
  304.         $this->putAlias($index$alias);
  305.     }
  306.     /**
  307.      * @param array<string, string> $indices
  308.      */
  309.     private function swapAlias(array $indices): void
  310.     {
  311.         foreach ($indices as $alias => $index) {
  312.             if (!$this->client->indices()->existsAlias(['name' => $alias])) {
  313.                 $this->putAlias($index$alias);
  314.                 return;
  315.             }
  316.             $current $this->client->indices()->getAlias(['name' => $alias]);
  317.             if (!isset($current[$index])) {
  318.                 $this->putAlias($index$alias);
  319.             }
  320.             unset($current[$index]);
  321.             $current array_keys($current);
  322.             foreach ($current as $value) {
  323.                 $this->client->indices()->delete(['index' => $value]);
  324.             }
  325.         }
  326.     }
  327.     private function putAlias(string $indexstring $alias): void
  328.     {
  329.         $this->client->indices()->refresh([
  330.             'index' => $index,
  331.         ]);
  332.         $this->client->indices()->putAlias(['index' => $index'name' => $alias]);
  333.     }
  334. }