Предпосылки
Как и в других руководствах по 1С, мы будем использовать компоненту PinkRabbitMQ версии 2.2.0.37.
На чем сосредоточено внимание в этом Руководстве
В первом руководстве мы написали программы для отправки и получения сообщений из именованной очереди. В этом руководстве мы создадим рабочую очередь, которая будет использоваться для распределения трудоемких задач между несколькими исполнителями (workers).
Основная идея рабочих очередей (Work Queues), также называемых очередями задач (Task Queues), заключается в том, чтобы избежать немедленного выполнения ресурсоемкой задачи и необходимости ждать ее завершения. Вместо этого мы планируем выполнение задачи позже. Мы инкапсулируем задачу в виде сообщения и отправляем его в очередь. Рабочий процесс, запущенный в фоновом режиме, выведет задачи на экран и в конечном итоге выполнит задание. При запуске большого количества рабочих процессов задачи будут распределены между ними.
Эта концепция особенно полезна в веб-приложениях, где невозможно выполнить сложную задачу за короткое время выполнения HTTP-запроса.
В предыдущей части этого урока мы отправили сообщение, содержащее "Привет, мир!". Теперь мы будем отправлять строки, обозначающие сложные задачи. У нас нет реальной задачи, такой как изменение размера изображений или рендеринг pdf-файлов, поэтому давайте притворимся, что мы заняты, используя функцию ПодключитьОбработчикОжидания().
Мы немного изменим код отправки из нашего предыдущего примера, чтобы разрешить отправку пакета сообщений.
ОтправляемоеСообщение = "Hello World!";
Для Сч = 1 По 5 Цикл
ОтправляемоеСообщение1 = ОтправляемоеСообщение + " " + Сч + ".";
routingKey = ИмяОчереди;
Клиент.BasicPublish("", routingKey, ОтправляемоеСообщение1, 0, Ложь);
Сообщить("Отправлено " + ОтправляемоеСообщение1);
КонецЦикла;
Наш старый скрипт обработки также требует некоторых изменений: ему нужно имитировать секунду работы для каждого события. Т.к. нормальной реализации Sleep() нет, то используем считывание сообщений каждую секунду.
Процедура ПолучениеСЗадержкойЦикл(Команда)
ИмяОчереди = "hello";
Клиент.DeclareQueue(ИмяОчереди, Ложь, Ложь, Ложь, Ложь);
noConfirm = Истина;
exclusive = Ложь;
selectSize = 1;
Потребитель = Клиент.BasicConsume(ИмяОчереди, "", noConfirm, exclusive, selectSize);
ИмяОчереди = "hello";
ПодключитьОбработчикОжидания("ПолучениеСЗадержкой", 10);
КонецПроцедуры
&НаКлиенте
Процедура ОтключитьПолучениеСЗадержкой(Команда)
ОтключитьОбработчикОжидания("ПолучениеСЗадержкой");
Клиент.BasicCancel("");
КонецПроцедуры
&НаКлиенте
Процедура ПолучениеСЗадержкой() Экспорт
ОтветноеСообщение = "";
ТегСообщения = 0;
Пока Клиент.BasicConsumeMessage("", ОтветноеСообщение, ТегСообщения, 5000) Цикл
Если Не ПустаяСтрока(ОтветноеСообщение) Тогда
Сообщить("Из очереди прочитано сообщение " + ОтветноеСообщение);
Клиент.BasicAck(ТегСообщения);
ОтветноеСообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ТегСообщения = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЕсли;
КонецЦикла;
КонецПроцедуры
Циклическая диспетчеризация
Одним из преимуществ использования очереди задач является возможность легко распараллеливать работу. Если мы накапливаем объем невыполненной работы, мы можем просто добавить больше исполнителей (workers) и таким образом легко масштабировать.
Сначала давайте попробуем запустить две функции получения и обработки одновременно. Они оба будут получать сообщения из очереди, но как именно? Давайте посмотрим.
Вам нужно открыть три клиента 1С. На двух из них будет запущен ПолучениеСЗадержкойЦикл(). Эти консоли будут нашими двумя потребителями - C1 и C2
В третьем клиенте мы запустим ОтправкаСПараметром(). Как только вы запустите группу потребителей, вы можете опубликовать несколько сообщений:
Отправлено Hello World! 1.
Отправлено Hello World! 2.
Отправлено Hello World! 3.
Отправлено Hello World! 4.
Отправлено Hello World! 5.
Давайте посмотрим, что доставляют нашим исполнителям:
Клиент 1
Из очереди прочитано сообщение Hello World! 1.
Из очереди прочитано сообщение Hello World! 3.
Из очереди прочитано сообщение Hello World! 5.
Клиент 2
Из очереди прочитано сообщение Hello World! 2.
Из очереди прочитано сообщение Hello World! 4.
По умолчанию RabbitMQ будет отправлять каждое сообщение следующему пользователю в определенной последовательности. В среднем каждый пользователь получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим. Попробуйте его с тремя или более пользователями.
Подтверждение сообщения
Выполнение задачи может занять несколько секунд, и вы можете задаться вопросом, что произойдет, если пользователь запустит длинную задачу, и она завершится до ее завершения. В нашем текущем коде, как только RabbitMQ доставит сообщение пользователю, оно сразу же помечается для удаления. В этом случае, если вы завершаете работу потребителя, сообщение, которое он только что обрабатывал, теряется. Сообщения, которые были отправлены этому конкретному потребителю, но еще не были обработаны, также теряются.
Но мы не хотим терять ни одной задачи. Если потребитель умирает, мы бы хотели, чтобы задача была передана другому потребителю.
Чтобы гарантировать, что сообщение никогда не будет потеряно, RabbitMQ поддерживает подтверждение сообщений (message acknowledgments). Пользователь отправляет ответное подтверждение (ack), сообщающее RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может его удалить.
Если потребитель умирает (его канал закрыт, соединение прервано или TCP-соединение потеряно) без отправки подтверждения, RabbitMQ поймет, что сообщение не было обработано полностью, и повторно поместит его в очередь. Если в то же время в Сети есть другие потребители, система быстро переадресует его другому пользователю. Таким образом, вы можете быть уверены, что ни одно сообщение не будет потеряно, даже если потребители время от времени погибнут.
При подтверждении доставки потребителем устанавливается тайм-аут (по умолчанию 30 минут). Это помогает обнаруживать сбои в работе (зависание) потребителей, которые никогда не подтверждают доставку. Вы можете увеличить этот тайм-аут, как описано в разделе Тайм-аут подтверждения доставки.
Ручные подтверждения сообщений включены по умолчанию. В предыдущих примерах мы явно отключили их с помощью флага noConfirm = Истина;. Пришло время снять этот флаг и отправить соответствующее подтверждение от потребителя, как только мы закончим выполнение задачи.
&НаКлиенте
Процедура ПолучениеСЗадержкой() Экспорт
ОтветноеСообщение = "";
ТегСообщения = 0;
Пока Клиент.BasicConsumeMessage("", ОтветноеСообщение, ТегСообщения, 5000) Цикл
Если Не ПустаяСтрока(ОтветноеСообщение) Тогда
Сообщить("Из очереди прочитано сообщение " + ОтветноеСообщение);
Клиент.BasicAck(ТегСообщения);
ОтветноеСообщение = ""; // Обнуляем, чтобы избежать утечку памяти
ТегСообщения = 0; // Обнуляем, чтобы избежать утечку памяти
КонецЕсли;
КонецЦикла;
КонецПроцедуры
&НаКлиенте
Процедура ПолучениеСЗадержкойЦикл(Команда)
ИмяОчереди = "hello";
Клиент.DeclareQueue(ИмяОчереди, Ложь, Ложь, Ложь, Ложь);
noConfirm = Ложь;
exclusive = Ложь;
selectSize = 1;
Потребитель = Клиент.BasicConsume(ИмяОчереди, "", noConfirm, exclusive, selectSize);
ИмяОчереди = "hello";
ПодключитьОбработчикОжидания("ПолучениеСЗадержкой", 10);
КонецПроцедуры
Используя этот код, вы можете гарантировать, что даже если вы завершите работу обработчика (worker) во время обработки сообщения, ничего не будет потеряно. Вскоре после завершения работы обработчика (worker) все неподтвержденные сообщения будут доставлены повторно.
Подтверждение должно быть отправлено по тому же каналу, по которому была получена доставка. Попытки подтверждения с использованием другого канала приведут к исключению протокола канального уровня.
Долговечность сообщения
Мы научились делать так, чтобы даже в случае смерти потребителя задача не была потеряна. Но наши задачи все равно будут потеряны, если сервер RabbitMQ остановится.
Когда RabbitMQ завершает работу или выходит из строя, он забывает об очередях и сообщениях, если вы не скажете ему этого не делать. Чтобы убедиться, что сообщения не потеряны, необходимы две вещи: нам нужно пометить очередь и сообщения как долговременные.
Во-первых, нам нужно убедиться, что очередь выдержит перезапуск узла RabbitMQ. Для этого нам нужно объявить ее долговечной save = Истина;:
ИмяОчереди = "hello";
onlyCheckIfExists = Ложь;
save = Истина;
exclusive = Ложь;
autodelete = Ложь;
Клиент.DeclareQueue(ИмяОчереди, onlyCheckIfExists, save, exclusive, autodelete);
Хотя сама по себе эта команда верна, в нашей настройке она работать не будет. Это потому, что мы уже определили очередь с именем hello, которая не является долговременной. RabbitMQ не позволяет вам переопределить существующую очередь с другими параметрами и вернет ошибку любой программе, которая попытается это сделать. Но есть быстрый обходной путь - давайте объявим очередь с другим именем, например task_queue:
ИмяОчереди = "task_queue";
onlyCheckIfExists = Ложь;
save = Истина;
exclusive = Ложь;
autodelete = Ложь;
Клиент.DeclareQueue(ИмяОчереди, onlyCheckIfExists, save, exclusive, autodelete);
ЯНДЕКС ПЕРЕВОДЧИК
англ.
Это изменение DeclareQueue необходимо применить как к коду производителя, так и к коду потребителя.
На этом этапе мы уверены, что очередь task_queue не будет потеряна, даже если RabbitMQ перезапустится. Теперь нам нужно пометить наши сообщения как постоянные, указав persist = Истина;:
ОтправляемоеСообщение = "Hello World!";
routingKey = ИмяОчереди;
livingTime = 0;
persist = Истина;
Клиент.BasicPublish("", routingKey, ОтправляемоеСообщение, 0, persist);
Обратите внимание на сохраняемость сообщенийR03;
Пометка сообщений как сохраняемых не гарантирует, что сообщение не будет потеряно. Несмотря на то, что RabbitMQ сообщает о необходимости сохранения сообщения на диск, все еще существует небольшой промежуток времени, когда RabbitMQ принимает сообщение, но еще не сохраняет его. Кроме того, RabbitMQ не выполняет fsync(2) для каждого сообщения - оно может быть просто сохранено в кэше, а не записано на диск. Гарантии сохраняемости невелики, но этого более чем достаточно для нашей простой очереди задач. Если вам нужна более надежная гарантия, вы можете воспользоваться подтверждением издателя.
Справедливая отправка
Возможно, вы заметили, что диспетчеризация по-прежнему работает не совсем так, как мы хотим. Например, в ситуации с двумя потребителями, когда все нечетные сообщения являются тяжелыми, а четные - легкими, один потребителями будет постоянно занят, а другой практически не будет выполнять никакой работы. Что ж, RabbitMQ ничего об этом не знает и по-прежнему будет равномерно отправлять сообщения.
Это происходит потому, что RabbitMQ просто отправляет сообщение, когда оно попадает в очередь. Он не учитывает количество неподтвержденных сообщений для пользователя. Он просто отправляет каждое n-е сообщение n-му пользователю вслепую. prefetch = предварительная выборка.
Чтобы обойти это, мы можем использовать метод Channel#basic_qos с параметром prefetch_count=1. При этом используется метод протокола basic.qos, который указывает RabbitMQ не передавать более одного сообщения работнику за раз. Или, другими словами, не отправляйте новое сообщение сотруднику, пока оно не обработает и не подтвердит предыдущее. Вместо этого оно отправит его следующему сотруднику, который еще не занят.
Метод в PinkRabbit не поддерживается, настраивать через cli или админ панель.
Обратите внимание на размер очередиR03;
Если все работники заняты, ваша очередь может заполниться. Вам следует следить за этим и, возможно, добавить больше работников или использовать TTL сообщения.
BasicPublish ()
livingTime - Число - [НЕ РЕАЛИЗОВАНО] Время жизни сообщения в миллисекундах
Используя функции подтверждения сообщений и prefetch_count, вы можете настроить очередь заданий. Параметры долговечности позволяют сохранить выполнение задач даже при перезапуске RabbitMQ.
Теперь мы можем перейти к уроку 3 и узнать, как доставить одно и то же сообщение многим пользователям.
Ссылка на исходник обработки, приведу в порядок при написании всех частей
https://github.com/malikov-pro/1c-rabbit-tutorial
Благодарю за внимание.