Предпосылки
Как и в других руководствах по 1С, буду использовать компоненту kovalevdmv/1CRabbitMQ, интерфейс компоненты реализован через обработку КлиентRMQ (в составе конфигурации). Запускать consumer (потребителей) буду в фоновых заданиях в клиент серверном варианте.
Введение
RabbitMQ - это средство обмена сообщениями: оно принимает и пересылает сообщения. Вы можете рассматривать его как почтовое отделение: когда вы опускаете письмо, которое хотите отправить, в почтовый ящик, вы можете быть уверены, что почтальон в конечном итоге доставит его вашему получателю. В этой аналогии RabbitMQ - это почтовый ящик, почтовое отделение и почтальон.
Основное различие между RabbitMQ и почтовым отделением заключается в том, что он не работает с бумагой, а принимает, хранит и пересылает двоичные блоки данных сообщения (messages).
RabbitMQ и система обмена сообщениями в целом используют некоторый жаргон.
- Производящий (Producing) означает не что иное, как отправку. Программа, которая отправляет сообщения, является производителем (producer):
- Очередь (queue) - это название почтового ящика в RabbitMQ. Хотя сообщения передаются через RabbitMQ и ваши приложения, они могут храниться только внутри очереди. Очередь ограничена только объемом памяти и диска хоста, по сути, это большой буфер сообщений.
Несколько производителей могут отправлять сообщения, которые попадают в одну очередь, и несколько потребители могут пытаться получить данные из одной очереди.
Вот как мы представляем очередь:
- Потребление (Consuming) имеет то же значение, что и получение. Потребитель (consumer) - это программа, которая в основном ожидает получения сообщений:
Обратите внимание, что производитель (producer), потребитель (consumer) и посредник (broker) не обязательно должны находиться на одном хосте; на самом деле, в большинстве приложений это не так. Приложение также может быть как производителем, так и потребителем.
Приветствую, мир!
В этой части урока мы напишем две небольшие программы: программу-производителя (отправителя), которая отправляет одно сообщение, и программу-потребителя (receiver), которая получает сообщения и распечатывает их. Это "Привет, мир" обмена сообщениями.
На приведенной ниже диаграмме "P" - это наш производитель, а "C" - наш потребитель. Поле посередине представляет собой очередь - буфер сообщений, который RabbitMQ хранит от имени потребителя.
Наш общий дизайн будет выглядеть следующим образом:
Производитель отправляет сообщения в очередь "hello". Пользователь получает сообщения из этой очереди.
Отправка
Наша первая программа отправит в очередь одно сообщение. Первое, что нам нужно сделать, это установить соединение с сервером RabbitMQ.
КлиентRMQ = Обработки.КлиентRMQ.Создать();
СтрокаПодключения = ПолучитьСтрокуПодключения();
Подключение = КлиентRMQ.ПодключитьсяКСерверу(СтрокаПодключения);
// Обрабока ошибок, которые произошли при подключении
Если КлиентRMQ.ЭтоОшибка(Подключение) Тогда
КлиентRMQ.ОтключитьсяОтСервера(Подключение);
Сообщить("ОтправитьСообщениеУрок1НаСервере: Ошибка подключения");
Возврат;
КонецЕсли;
Канал = КлиентRMQ.СоздатьКанал(Подключение);
Функция ПолучитьСтрокуПодключения()
НастройкиОбмена = ПолучитьНастройки();
// НастройкиОбмена.Адрес = "192.168.57.2";
// НастройкиОбмена.Порт = 5672;
// НастройкиОбмена.Логин = "rmuser";
// НастройкиОбмена.Пароль = "rmpassword";
ВиртуальныйХост = НастройкиОбмена.ВиртуальныйХост;
Если ВиртуальныйХост = "/" Тогда
ВиртуальныйХост = "%2F";
КонецЕсли;
СтрокаПодключения = СтрШаблон("amqp://%3:%4@%1:%2/%5",
НастройкиОбмена.Адрес,
Формат(НастройкиОбмена.Порт, "ЧГ="),
НастройкиОбмена.Логин,
НастройкиОбмена.Пароль,
ВиртуальныйХост);
Возврат СтрокаПодключения;
КонецФункции
Теперь мы подключены к брокеру на компьютере "192.168.57.2". Если бы мы хотели подключиться к брокеру на другом компьютере, мы бы просто указали здесь его имя или IP-адрес.
RabbitMQ - это мультитенантная система: соединения, обмены, очереди, привязки, разрешения пользователей, политики и некоторые другие функции принадлежат виртуальным хостам, логическим группам объектов. Если вы знакомы с виртуальными хостами в Apache или серверными блоками в Nginx, идея аналогична. Более подробно в документации.
Пожалуйста, обратите внимание, что HTTP API не идеален для высокопроизводительной публикации; необходимость создания нового TCP-соединения для каждого опубликованного сообщения может ограничить пропускную способность сообщений по сравнению с AMQP или другими протоколами, использующими долгоживущие соединения.
Далее, перед отправкой нам нужно убедиться, что очередь получателей существует. Если мы отправим сообщение в несуществующее местоположение, RabbitMQ просто удалит сообщение. Давайте создадим очередь hello, в которую будет доставлено сообщение:
ИмяОчереди = "hello";
КлючМаршрутизации = ""; // пока не используем
ТочкаОбмена = ""; // (AMQP default)
СохранятьОчередь = Ложь;
// объявить очередь (создание очереди). Если очередь уже существует и настройки конфликтуют,
// сервер закроет канал (подробнее про это поведение в офф. документации rabbitMQ)
Очередь = КлиентRMQ.ОбъявитьОчередь(Канал, ИмяОчереди, КлючМаршрутизации, ТочкаОбмена, СохранятьОчередь);
На данный момент мы готовы к отправке сообщения. Наше первое сообщение будет содержать просто строку "Hello World!" и мы хотим отправить его в нашу очередь hello.
В RabbitMQ сообщение никогда не может быть отправлено непосредственно в очередь, оно всегда должно пройти процедуру обмена (exchange). Но давайте не будем углубляться в подробности вы можете прочитать больше об обменах в третьей части этого руководства. Все, что нам нужно знать сейчас, это как использовать обмен данными по умолчанию, обозначаемый пустой строкой. Этот обмен является "особенным" он позволяет нам точно указать, в какую очередь должно быть отправлено сообщение. Имя очереди должно быть указано в параметре routing_key:
Данные = "Hello world!";
КлючМаршрутизации = "hello";
ТочкаОбмена = ""; // (AMQP default), маршрутизирует по имени очереди
Ответ = КлиентRMQ.ОпубликоватьСообщение(Канал, Данные, КлючМаршрутизации, ТочкаОбмена);
Перед выходом из программы нам нужно убедиться, что сетевые буферы были очищены и наше сообщение действительно было доставлено в RabbitMQ. Мы можем сделать это, аккуратно разорвав соединение.
// Обработка ошибок, если они были при публикации
Если КлиентRMQ.ЭтоОшибка(Ответ) Тогда
// Дейстия при ошибке подключения
Сообщить(СтрШаблон("Ошибка: %1", Ответ.Текст));
Возврат;
КонецЕсли;
// Проверка, что сервер принял нашу публикацию (обработка случая когда сервер не принял сообщение)
Если Ответ.Текст <> КлиентRMQ.ПодтверждениеСервера_СерверПодтвердилПолучениеСообщения() Тогда
//
КонецЕсли;
// !!! ОБЯЗАТЕЛЬНО ОТКЛЮЧИТЬСЯ ОТ СЕРВЕРА. ИНАЧЕ ЛИШНИЙ РАСХОД ПАМЯТИ.
КлиентRMQ.ОтключитьсяОтСервера(Подключение);
Получение
Наша вторая программа будет получать сообщения из очереди и выводить их на экран.
Опять же, сначала нам нужно подключиться к серверу RabbitMQ. Код, отвечающий за подключение к Rabbit, тот же, что и ранее.
Следующим шагом, как и раньше, является проверка существования очереди. ОбьявитьОчередь() является идемпотентным, мы можем запускать команду столько раз сколько захотим и будет создана только одна очередь.
ИмяОчереди = "hello";
КлючМаршрутизации = "";
ТочкаОбмена = "";
СохранятьОчередь = Истина;
// объявить очередь (создание очереди). Если очередь уже существует и настройки конфликтуют,
// сервер закроет канал (подробнее про это поведение в офф. документации rabbitMQ)
Очередь = КлиентRMQ.ОбъявитьОчередь(Канал, ИмяОчереди, КлючМаршрутизации,
ТочкаОбмена, СохранятьОчередь);
Вы можете спросить: зачем мы снова объявляем очередь мы уже объявляли ее в нашем предыдущем коде? Мы могли бы избежать этого, если бы были уверены, что очередь уже существует. Например, если программа отправки была запущена ранее. Но мы пока не уверены, какую программу запускать первой. В таких случаях рекомендуется повторить объявление очереди в обеих программах.
В источнике на Python: Получать сообщения из очереди сложнее. Это работает путем привязки функции обратного вызова к очереди. Всякий раз, когда мы получаем сообщение, эта функция обратного вызова вызывается библиотекой.
В нашем случае эта функция выводит содержимое сообщения.
Далее нам нужно сообщить RabbitMQ, что этот клиент будет получать сообщения из нашей очереди приветствий:
ТегПолучателя = "";
ИмяОчереди = "hello";
no_ack = Истина; // Без подтверждения получения
// Создать получателя, через которого будут получаться сообщения из очереди
Получатель = КлиентRMQ.СоздатьПолучателя(Канал, ИмяОчереди, ТегПолучателя, no_ack);
И, наконец, мы вводим цикл пока есть сообщения, который читает сообщение и выполняет вывод.
// СледующееСообщение() ожидает сообщение в очереди. Эта функция блокирует основной поток, поэтому этот код надо запускать в фоне.
Пока КлиентRMQ.СледующееСообщение(Получатель) Цикл // только ожидает сообщение
// получить данные сообщения
ДанныеСообщения = КлиентRMQ.ДанныеСообщения(); // получает данные сообщения
// Проверка на ошибки, если они возникли
Если КлиентRMQ.ЭтоОшибка(ДанныеСообщения) Тогда
Продолжить;
КонецЕсли;
Сообщить(ДанныеСообщения.Данные);
Прервать;
КонецЦикла;
По окончании цикла закрываем канал для чтения
// обработка ошибок при ожидании сообщения.
// Если возникнут ошибки, цикл прервется СледующееСообщение() вернет Ложь
// и обработать такую ошибку надо за циклом.
ДанныеСообщения = КлиентRMQ.ДанныеСообщения();
Если КлиентRMQ.ЭтоОшибка(ДанныеСообщения) Тогда
// код обработки ошибки ожидания сообщения
КонецЕсли;
КлиентRMQ.ОтключитьсяОтСервера(Подключение);
Мы узнали, как отправлять и получать сообщения из именованной очереди. Пришло время перейти к части 2 и создать простую рабочую очередь (work queue).
Полностью собранный учебный пример добавлю в виде обработки к конфигурации компоненты после окончания написания всех частей.
Благодарю за внимание.