Пакетная обработка сообщений Symfony Messenger

Batching Symfony Messenger messages

20 Марта 2022 | Symfony

Иногда необходимо создавать сообщения в Symfony Messenger, которые используются пакетно, а не по одному.

Пакетная обработка сообщений Symfony Messenger

Недавно мы столкнулись с ситуацией, когда мы отправляем обновленные переводы для наших сущностей через Messenger, а затем отправляем их нашему поставщику переводов.

Но поскольку у этого провайдера перевода есть сильное ограничение скорости, мы не можем отправлять их по одному. Мы должны отправить их для хранения всех полученных данным потребителем и отправить все сообщения, если мы ждем более 10 секунд без нового сообщения или если у нас сохранено более 100 сообщений .

Теперь давайте посмотрим, как заставить это работать:

// Symfony Messenger Message:
class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;
    private array $buffer = [];

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
    }

    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (\count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();
        } else {
            pcntl_alarm(self::BUFFER_TIMER);
        }
    }

    private function batchBuffer(): void
    {
        if (0 === \count($this->buffer)) {
            return;
        }

        $translationBatch = new TranslationBatch($this->buffer);
        $this->messageBus->dispatch($translationBatch);
        $this->buffer = [];

    }
}

Здесь мы видим сообщение Messenger, в основном мы отправляем его каждый раз, когда имеется обновление перевода, но это отнести к любому сообщению, которое нам нужно.

Затем у нас есть обработчик сообщений, который будет получать все сообщения и помещать их в буфер массива. Когда буфер достигает 100 элементов или если у нас нет новых элементов в течение 10 секунд, мы запускаем batchBufferметод.

Для 10-секундного таймера мы используем, pcntl_alarm что позволяет нам при необходимости выполнять асинхронный вызов batchBufferметода.

PCNTL — это способ обработки системных сигналов в нашем PHP-коде, вы можете узнать больше об этом в документации по PHP. Мы установим таймер, который будет отправлять сигнал SIGALRM процессу через заданное количество секунд. Затем, когда сигнал будет получен процессом, он вызовет функцию обратного вызова, которую мы указали в качестве второго аргумента pcntl_signal. Обратный вызов установлен для всего нашего приложения, поэтому мы можем использовать этот трюк с пакетной обработкой только один раз.

Затем в batchBufferметоде мы используем новую отправку Messenger, потому что мы хотим отслеживать сообщения, если есть проблемы, и если мы нажмем метод с помощью pcntl, у нас не будет обработки повторных попыток Messenger, если есть исключение.

class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications
     */
    public function __construct(
        private array $notifications,
    ) {
    }
}
class TranslationBatchHandler implements MessageHandlerInterface
{
    public function __invoke(TranslationBatch $message): void
    {
      // handle all our messages
    }
}

И, наконец, у нас есть пакетный обработчик, который всегда будет получать список сообщений для отправки! Благодаря этому мы можем легко группировать наши сообщения Messenger и избегать использования cron!

Комментарии
Если у вас есть вопросы, не стесняйтесь оставлять комментарии ниже.
Загрузка комментариев...