vendor/symfony/messenger/DependencyInjection/MessengerPass.php line 80

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\DependencyInjection;
  11. use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
  12. use Symfony\Component\DependencyInjection\ChildDefinition;
  13. use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
  14. use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
  15. use Symfony\Component\DependencyInjection\ContainerBuilder;
  16. use Symfony\Component\DependencyInjection\Definition;
  17. use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
  18. use Symfony\Component\DependencyInjection\Exception\RuntimeException;
  19. use Symfony\Component\DependencyInjection\Reference;
  20. use Symfony\Component\Messenger\Attribute\AsMessageHandler;
  21. use Symfony\Component\Messenger\Handler\HandlerDescriptor;
  22. use Symfony\Component\Messenger\Handler\HandlersLocator;
  23. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  24. use Symfony\Component\Messenger\Handler\MessageSubscriberInterface;
  25. use Symfony\Component\Messenger\TraceableMessageBus;
  26. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  27. /**
  28.  * @author Samuel Roze <samuel.roze@gmail.com>
  29.  */
  class MessengerPass implements CompilerPassInterface
  {
      /**
       * Configures every service tagged "messenger.bus" and then registers
       * receivers and message handlers for the collected bus ids.
       *
       * For each bus: applies the "<busId>.middleware" parameter (when present)
       * and removes it from the parameter bag, and decorates the bus with a
       * traceable bus when the "data_collector.messenger" definition exists.
       */
      public function process(ContainerBuilder $container)
      {
          $busIds = [];
          foreach ($container->findTaggedServiceIds('messenger.bus') as $busId => $tags) {
              $busIds[] = $busId;

              $busMiddlewareParameter = $busId.'.middleware';
              if ($container->hasParameter($busMiddlewareParameter)) {
                  $this->registerBusMiddleware($container, $busId, $container->getParameter($busMiddlewareParameter));

                  // The parameter has been consumed by the call above; drop it from the bag.
                  $container->getParameterBag()->remove($busMiddlewareParameter);
              }

              if ($container->hasDefinition('data_collector.messenger')) {
                  $this->registerBusToCollector($container, $busId);
              }
          }

          if ($container->hasDefinition('messenger.receiver_locator')) {
              $this->registerReceivers($container, $busIds);
          }

          $this->registerHandlers($container, $busIds);
      }

      /**
       * Builds one handlers locator per bus from the services tagged
       * "messenger.message_handler" and feeds the same mapping to the
       * "console.command.messenger_debug" definition when it exists.
       *
       * @param array $busIds ids of the services tagged "messenger.bus"
       *
       * @throws RuntimeException when a tag or handler configuration references an unknown bus,
       *                          class, interface or method, or declares no message at all
       */
      private function registerHandlers(ContainerBuilder $container, array $busIds): void
      {
          $definitions = [];
          $handlersByBusAndMessage = [];
          $handlerToOriginalServiceIdMapping = [];

          foreach ($container->findTaggedServiceIds('messenger.message_handler', true) as $serviceId => $tags) {
              foreach ($tags as $tag) {
                  if (isset($tag['bus']) && !\in_array($tag['bus'], $busIds, true)) {
                      throw new RuntimeException(sprintf('Invalid handler service "%s": bus "%s" specified on the tag "messenger.message_handler" does not exist (known ones are: "%s").', $serviceId, $tag['bus'], implode('", "', $busIds)));
                  }

                  $className = $this->getServiceClass($container, $serviceId);
                  $r = $container->getReflectionClass($className);

                  if (null === $r) {
                      throw new RuntimeException(sprintf('Invalid service "%s": class "%s" does not exist.', $serviceId, $className));
                  }

                  if (isset($tag['handles'])) {
                      $handles = isset($tag['method']) ? [$tag['handles'] => $tag['method']] : [$tag['handles']];
                  } else {
                      $handles = $this->guessHandledClasses($r, $serviceId, $tag['method'] ?? '__invoke');
                  }

                  // Stays null only when $handles yields nothing; checked after the loop.
                  $message = null;
                  $handlerBuses = (array) ($tag['bus'] ?? $busIds);

                  foreach ($handles as $message => $options) {
                      $buses = $handlerBuses;

                      // Integer key: the value is the message class itself, without options.
                      if (\is_int($message)) {
                          if (\is_string($options)) {
                              $message = $options;
                              $options = [];
                          } else {
                              throw new RuntimeException(sprintf('The handler configuration needs to return an array of messages or an associated array of message and configuration. Found value of type "%s" at position "%d" for service "%s".', get_debug_type($options), $message, $serviceId));
                          }
                      }

                      // String value under a message key: shorthand for the method name.
                      if (\is_string($options)) {
                          $options = ['method' => $options];
                      }

                      if (!isset($options['from_transport']) && isset($tag['from_transport'])) {
                          $options['from_transport'] = $tag['from_transport'];
                      }

                      $priority = $tag['priority'] ?? $options['priority'] ?? 0;
                      $method = $options['method'] ?? '__invoke';

                      if (isset($options['bus'])) {
                          // Strict comparison, consistent with the tag-level bus check above.
                          if (!\in_array($options['bus'], $busIds, true)) {
                              $messageLocation = $this->describeMessageLocation($tag, $r, $method);

                              throw new RuntimeException(sprintf('Invalid configuration "%s" for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus']));
                          }

                          $buses = [$options['bus']];
                      }

                      if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
                          $messageLocation = $this->describeMessageLocation($tag, $r, $method);

                          throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" "%s" not found.', $serviceId, $message, $messageLocation));
                      }

                      if (!$r->hasMethod($method)) {
                          throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" does not exist.', $serviceId, $r->getName(), $method));
                      }

                      if ('__invoke' !== $method) {
                          // Wrap "[service, method]" into a Closure service so the handler is directly callable.
                          $wrapperDefinition = (new Definition('Closure'))->addArgument([new Reference($serviceId), $method])->setFactory('Closure::fromCallable');

                          $definitionId = '.messenger.method_on_object_wrapper.'.ContainerBuilder::hash($message.':'.$priority.':'.$serviceId.':'.$method);
                          $definitions[$definitionId] = $wrapperDefinition;
                      } else {
                          $definitionId = $serviceId;
                      }

                      $handlerToOriginalServiceIdMapping[$definitionId] = $serviceId;

                      foreach ($buses as $handlerBus) {
                          $handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options];
                      }
                  }

                  if (null === $message) {
                      throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::getHandledMessages()" must return one or more messages.', $serviceId, $r->getName()));
                  }
              }
          }

          // Flatten the priority buckets, highest priority first.
          foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
              foreach ($handlersByMessage as $message => $handlersByPriority) {
                  krsort($handlersByPriority);
                  $handlersByBusAndMessage[$bus][$message] = array_merge(...$handlersByPriority);
              }
          }

          $handlersLocatorMappingByBus = [];
          foreach ($handlersByBusAndMessage as $bus => $handlersByMessage) {
              foreach ($handlersByMessage as $message => $handlers) {
                  $handlerDescriptors = [];
                  foreach ($handlers as $handler) {
                      $definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0]);
                      $definitions[$definitionId] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]);

                      $handlerDescriptors[] = new Reference($definitionId);
                  }

                  $handlersLocatorMappingByBus[$bus][$message] = new IteratorArgument($handlerDescriptors);
              }
          }
          $container->addDefinitions($definitions);

          foreach ($busIds as $bus) {
              $locatorId = $bus.'.messenger.handlers_locator';
              $container->register($locatorId, HandlersLocator::class)
                  ->setArgument(0, $handlersLocatorMappingByBus[$bus] ?? [])
              ;

              $handleMessageId = $bus.'.middleware.handle_message';
              if ($container->has($handleMessageId)) {
                  $container->getDefinition($handleMessageId)
                      ->replaceArgument(0, new Reference($locatorId))
                  ;
              }
          }

          if ($container->hasDefinition('console.command.messenger_debug')) {
              $debugCommandMapping = $handlersByBusAndMessage;
              foreach ($busIds as $bus) {
                  if (!isset($debugCommandMapping[$bus])) {
                      $debugCommandMapping[$bus] = [];
                  }

                  // Replace wrapper definition ids with the id of the tagged handler service.
                  foreach ($debugCommandMapping[$bus] as $message => $handlers) {
                      foreach ($handlers as $key => $handler) {
                          $debugCommandMapping[$bus][$message][$key][0] = $handlerToOriginalServiceIdMapping[$handler[0]];
                      }
                  }
              }
              $container->getDefinition('console.command.messenger_debug')->replaceArgument(0, $debugCommandMapping);
          }
      }

      /**
       * Describes, for error messages, where a handled message was declared.
       *
       * @param array  $tag    attributes of the "messenger.message_handler" tag being processed
       * @param string $method handler method configured for the message
       */
      private function describeMessageLocation(array $tag, \ReflectionClass $r, string $method): string
      {
          // @deprecated since Symfony 6.2, in 7.0 change to:
          // return isset($tag['handles']) ? 'declared in your tag attribute "handles"' : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
          if (isset($tag['handles'])) {
              return 'declared in your tag attribute "handles"';
          }

          if ($r->implementsInterface(MessageSubscriberInterface::class)) {
              return sprintf('returned by method "%s::getHandledMessages()"', $r->getName());
          }

          return sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method);
      }

      /**
       * Determines which messages a handler class handles when the tag has no "handles" attribute.
       *
       * Classes implementing MessageSubscriberInterface supply the list through
       * getHandledMessages(); otherwise the type of the first parameter of
       * $methodName is used (every non-builtin member for a union type).
       *
       * @return iterable a list of message classes when $methodName is "__invoke", a
       *                  "message class => method name" map otherwise, or whatever
       *                  getHandledMessages() returns for subscribers
       *
       * @throws RuntimeException when the method is missing, has no required parameter, or its
       *                          first parameter is untyped or only has builtin types
       */
      private function guessHandledClasses(\ReflectionClass $handlerClass, string $serviceId, string $methodName): iterable
      {
          if ($handlerClass->implementsInterface(MessageSubscriberInterface::class)) {
              trigger_deprecation('symfony/messenger', '6.2', 'Implementing "%s" is deprecated, use the "%s" attribute instead.', MessageSubscriberInterface::class, AsMessageHandler::class);

              return $handlerClass->getName()::getHandledMessages();
          }

          if ($handlerClass->implementsInterface(MessageHandlerInterface::class)) {
              trigger_deprecation('symfony/messenger', '6.2', 'Implementing "%s" is deprecated, use the "%s" attribute instead.', MessageHandlerInterface::class, AsMessageHandler::class);
          }

          try {
              $method = $handlerClass->getMethod($methodName);
          } catch (\ReflectionException) {
              throw new RuntimeException(sprintf('Invalid handler service "%s": class "%s" must have an "%s()" method.', $serviceId, $handlerClass->getName(), $methodName));
          }

          if (0 === $method->getNumberOfRequiredParameters()) {
              throw new RuntimeException(sprintf('Invalid handler service "%s": method "%s::%s()" requires at least one argument, first one being the message it handles.', $serviceId, $handlerClass->getName(), $methodName));
          }

          $parameters = $method->getParameters();

          /** @var \ReflectionNamedType|\ReflectionUnionType|null */
          $type = $parameters[0]->getType();

          if (!$type) {
              throw new RuntimeException(sprintf('Invalid handler service "%s": argument "$%s" of method "%s::%s()" must have a type-hint corresponding to the message class it handles.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName));
          }

          if ($type instanceof \ReflectionUnionType) {
              $types = [];
              $invalidTypes = [];
              // Dedicated loop variable so the outer $type is not overwritten.
              foreach ($type->getTypes() as $unionMember) {
                  if (!$unionMember->isBuiltin()) {
                      $types[] = (string) $unionMember;
                  } else {
                      $invalidTypes[] = (string) $unionMember;
                  }
              }

              if ($types) {
                  return ('__invoke' === $methodName) ? $types : array_fill_keys($types, $methodName);
              }

              // Report the configured method name rather than a hard-coded "__invoke".
              throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::%s()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName, implode('|', $invalidTypes)));
          }

          if ($type->isBuiltin()) {
              throw new RuntimeException(sprintf('Invalid handler service "%s": type-hint of argument "$%s" in method "%s::%s()" must be a class , "%s" given.', $serviceId, $parameters[0]->getName(), $handlerClass->getName(), $methodName, $type instanceof \ReflectionNamedType ? $type->getName() : (string) $type));
          }

          return ('__invoke' === $methodName) ? [$type->getName()] : [$type->getName() => $methodName];
      }

      /**
       * Collects the services tagged "messenger.receiver" and injects the resulting
       * mappings into the receiver locator, the routable bus and the console
       * command definitions that are present in the container.
       *
       * @param array $busIds ids of the services tagged "messenger.bus"
       *
       * @throws RuntimeException when a tagged receiver class does not implement ReceiverInterface
       */
      private function registerReceivers(ContainerBuilder $container, array $busIds): void
      {
          $receiverMapping = [];
          $failureTransportsMap = [];

          if ($container->hasDefinition('console.command.messenger_failed_messages_retry')) {
              $commandDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
              $globalReceiverName = $commandDefinition->getArgument(0);
              if (null !== $globalReceiverName) {
                  if ($container->hasAlias('messenger.failure_transports.default')) {
                      $failureTransportsMap[$globalReceiverName] = new Reference('messenger.failure_transports.default');
                  } else {
                      $failureTransportsMap[$globalReceiverName] = new Reference('messenger.transport.'.$globalReceiverName);
                  }
              }
          }

          foreach ($container->findTaggedServiceIds('messenger.receiver') as $id => $tags) {
              $receiverClass = $this->getServiceClass($container, $id);
              if (!is_subclass_of($receiverClass, ReceiverInterface::class)) {
                  throw new RuntimeException(sprintf('Invalid receiver "%s": class "%s" must implement interface "%s".', $id, $receiverClass, ReceiverInterface::class));
              }

              $receiverMapping[$id] = new Reference($id);

              foreach ($tags as $tag) {
                  if (isset($tag['alias'])) {
                      $receiverMapping[$tag['alias']] = $receiverMapping[$id];
                      if ($tag['is_failure_transport'] ?? false) {
                          $failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
                      }
                  }
              }
          }

          // Keyed by referenced service id: when an id and its alias(es) point to the
          // same service, the name registered last in $receiverMapping is kept.
          $receiverNames = [];
          foreach ($receiverMapping as $name => $reference) {
              $receiverNames[(string) $reference] = $name;
          }

          $buses = [];
          foreach ($busIds as $busId) {
              $buses[$busId] = new Reference($busId);
          }

          $hasRoutableMessageBus = $container->hasDefinition('messenger.routable_message_bus');
          if ($hasRoutableMessageBus) {
              $container->getDefinition('messenger.routable_message_bus')
                  ->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses));
          }

          if ($container->hasDefinition('console.command.messenger_consume_messages')) {
              $consumeCommandDefinition = $container->getDefinition('console.command.messenger_consume_messages');

              if ($hasRoutableMessageBus) {
                  $consumeCommandDefinition->replaceArgument(0, new Reference('messenger.routable_message_bus'));
              }

              $consumeCommandDefinition->replaceArgument(4, array_values($receiverNames));
              try {
                  $consumeCommandDefinition->replaceArgument(6, $busIds);
              } catch (OutOfBoundsException) {
                  // ignore to preserve compatibility with symfony/framework-bundle < 5.4
              }
          }

          if ($container->hasDefinition('console.command.messenger_setup_transports')) {
              $container->getDefinition('console.command.messenger_setup_transports')
                  ->replaceArgument(1, array_values($receiverNames));
          }

          if ($container->hasDefinition('console.command.messenger_stats')) {
              $container->getDefinition('console.command.messenger_stats')
                  ->replaceArgument(1, array_values($receiverNames));
          }

          $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);

          $failureTransportsLocator = ServiceLocatorTagPass::register($container, $failureTransportsMap);

          $failedCommandIds = [
              'console.command.messenger_failed_messages_retry',
              'console.command.messenger_failed_messages_show',
              'console.command.messenger_failed_messages_remove',
          ];
          foreach ($failedCommandIds as $failedCommandId) {
              if ($container->hasDefinition($failedCommandId)) {
                  $definition = $container->getDefinition($failedCommandId);
                  $definition->replaceArgument(1, $failureTransportsLocator);
              }
          }
      }

      /**
       * Decorates the given bus with a TraceableMessageBus and registers the
       * decorator on the "data_collector.messenger" definition.
       */
      private function registerBusToCollector(ContainerBuilder $container, string $busId): void
      {
          $tracedBusId = 'debug.traced.'.$busId;

          $container->setDefinition(
              $tracedBusId,
              (new Definition(TraceableMessageBus::class, [new Reference($tracedBusId.'.inner')]))->setDecoratedService($busId)
          );

          $container->getDefinition('data_collector.messenger')->addMethodCall('registerBus', [$busId, new Reference($tracedBusId)]);
      }

      /**
       * Resolves the middleware configured for a bus and sets them, in order, as
       * the first argument of the bus definition.
       *
       * @param array $middlewareCollection items with an "id" key and an optional "arguments" key
       *
       * @throws RuntimeException when a middleware service cannot be found, or when arguments
       *                          are given for a middleware whose definition is not abstract
       */
      private function registerBusMiddleware(ContainerBuilder $container, string $busId, array $middlewareCollection): void
      {
          $middlewareReferences = [];
          foreach ($middlewareCollection as $middlewareItem) {
              $id = $middlewareItem['id'];
              $arguments = $middlewareItem['arguments'] ?? [];

              // Try the "messenger.middleware." prefixed id first, then the id as given.
              $messengerMiddlewareId = 'messenger.middleware.'.$id;
              if (!$container->has($messengerMiddlewareId)) {
                  $messengerMiddlewareId = $id;
              }

              if (!$container->has($messengerMiddlewareId)) {
                  throw new RuntimeException(sprintf('Invalid middleware: service "%s" not found.', $id));
              }

              if ($container->findDefinition($messengerMiddlewareId)->isAbstract()) {
                  $childDefinition = new ChildDefinition($messengerMiddlewareId);
                  $childDefinition->setArguments($arguments);

                  // An abstract definition gets a per-bus child; a hash of the arguments
                  // is appended when that per-bus id was already registered for this bus.
                  $messengerMiddlewareId = $busId.'.middleware.'.$id;
                  if (isset($middlewareReferences[$messengerMiddlewareId])) {
                      $messengerMiddlewareId .= '.'.ContainerBuilder::hash($arguments);
                  }
                  $container->setDefinition($messengerMiddlewareId, $childDefinition);
              } elseif ($arguments) {
                  throw new RuntimeException(sprintf('Invalid middleware factory "%s": a middleware factory must be an abstract definition.', $id));
              }

              $middlewareReferences[$messengerMiddlewareId] = new Reference($messengerMiddlewareId);
          }

          $container->getDefinition($busId)->replaceArgument(0, new IteratorArgument(array_values($middlewareReferences)));
      }

      /**
       * Returns the class of a service, walking up ChildDefinition parents until a
       * definition that declares a class (or is not a child definition) is reached.
       */
      private function getServiceClass(ContainerBuilder $container, string $serviceId): string
      {
          while (true) {
              $definition = $container->findDefinition($serviceId);

              if (!$definition->getClass() && $definition instanceof ChildDefinition) {
                  $serviceId = $definition->getParent();

                  continue;
              }

              return $definition->getClass();
          }
      }
  }