Не буду углубляться в тему, что такое брокер сообщений RabbitMQ, для чего он нужен и как его устанавливать/настраивать, об этом уже много было рассказано, например, тут: ссылка
Но напрямую работать с API RabbitMQ из 1С не получится, придется использовать либо сторонние компоненты (ссылка), либо на каком-нибудь языке написать прослойку (ссылка). Однако RabbitMQ поддерживает работу с ним через REST API, методы которого поддерживаются платформой 1С без дополнительных средств.
Вводная по работе через REST описана здесь: ссылка. Само описание API расположено здесь: ссылка
Для того чтобы REST API заработал, необходимо поставить плагин Management Plugin (ссылка), который дает нам веб-интерфейс по настройке RabbitMQ, без которого настраивать сервис довольно грустно, если только вы не привыкли делать всё подряд через консоль. Если без подробностей, то плагин ставится командой:
rabbitmq-plugins enable rabbitmq_management
Сразу предупрежу (типа дисклеймер): судя по всему, REST API в RabbitMQ предназначен в основном для администрирования и мониторинга, функция отправки и приема сообщений в нем реализована в простейшем виде, возможно она вообще не предназначена для промышленного использования, поэтому практически не упоминается в официальной документации (в официальной документации REST API упоминается так: The API is intended to be used for basic observability tasks).
В целом плюсы и минусы обращения к RabbitMQ через REST API вижу так:
+ Нет сторонних компонент и стороннего ПО (собственно потребность в REST API возникла не на пустом месте, а после словленных серьезных и нерешаемых проблем с внешней компонентой)
+ Простота использования
- Ограниченный функционал - мы можем только отправлять по одному сообщению и получать массив сообщений, подтверждение принятия каждого сообщения (ack) нет. Можем только управлять режимом получения всего массива сообщений (впрочем можно получать сообщения по одному, выставляя при каждой итерации нужный режим).
- Вероятно подходит только для ненагруженных обменов типа обмена кадровой информацией (хотя я отправил в цикле 2000 сообщений и все выгрузилось и загрузилось корректно).
- Передача больших файлов проблематична (обходится выгрузкой на внешние ресурсы и передачей ссылки), небольшие файлы в несколько мегабайт отправляются/получаются без проблем.
- Так как REST API тут нужен скорее для администрирования, то пользователь сервиса должен иметь административные права, что не очень хорошо с точки зрения безопасности.
Немного про режимы обработки сообщений - в REST API их 4: ack_requeue_true, reject_requeue_true, ack_requeue_false, reject_requeue_false. Соответственно ack это подтверждение приема сообщений, reject - отказ, requeue_true - сообщения остаются в очереди, requeue_false - удаляются из очереди. С ack и requeue все просто, а reject (в полном API есть схожий метод nack, отличающийся от reject некоторыми нюансами) работает следующим образом: к нашей точке обмена можно привязать (bind) дополнительную очередь с признаком "Dead letter exchange" - в эту очередь будут попадать сообщения которым выставили отказ (и некоторые другие, подробнее ссылка):
В принципе основной режим приёма в простом варианте обмена это ack_requeue_false.
Собственно код по отправке сообщения:
Фабрика_XDTO = ПолучитьФабрикуXDTO();
ОбъектСообщение = Фабрика_XDTO.Создать(Фабрика_XDTO.Тип("http://rest_rmq", "message_out"));
ОбъектСообщение.routing_key = ПараметрыПодключения.КлючМаршрутизации;
ОбъектСообщение.payload_encoding = "string"; // вариант "base64"
ОбъектСообщение.properties = Фабрика_XDTO.Создать(Фабрика_XDTO.Тип("http://rest_rmq", "message_properties"));
ОбъектСообщение.payload = ТекстСообщения;
ТекстJSON = ОбъектXDTO_ПолучитьJSON(ОбъектСообщение, Фабрика_XDTO);
// %2f это vhost по умолчанию "/"
АдресРесурса = СтрШаблон("/api/exchanges/%1/%2/publish", "%2f", ПараметрыПодключения.ТочкаОбмена);
Ответ = ОтправитьHTTPЗапрос(ПараметрыПодключения, АдресРесурса, "POST", ТекстJSON);
Для чего нужен параметр "properties" сообщения я не понял - в него можно поместить произвольную структуру, но в получаемое сообщение она не приходит, хотя было бы очень удобно там размещать метаданные сообщения.
Получение сообщений:
Сообщения = Новый Массив;
Фабрика_XDTO = ПолучитьФабрикуXDTO();
Сообщение = Фабрика_XDTO.Создать(Фабрика_XDTO.Тип("http://rest_rmq", "messages_request"));
Сообщение.count = 1000; //количество получаемых сообщений
Сообщение.ackmode = ПараметрыПодключения.РежимПолучения; // варианты "ack_requeue_true", "reject_requeue_true", "ack_requeue_false", "reject_requeue_false"
Сообщение.encoding = "auto"; // вариант "base64"
ТекстJSON = ОбъектXDTO_ПолучитьJSON(Сообщение, Фабрика_XDTO);
// %2f это vhost по умолчанию "/"
АдресРесурса = СтрШаблон("/api/queues/%1/%2/get", "%2f", ПараметрыПодключения.ОчередьОбмена);
Ответ = ОтправитьHTTPЗапрос(ПараметрыПодключения, АдресРесурса, "POST", ТекстJSON);
Если Ответ.КодСостояния <> 200 Тогда
Сообщить("Ошибка: " + Ответ.ПолучитьТелоКакСтроку());
Возврат;
КонецЕсли;
ТелоОтвета = Ответ.ПолучитьТелоКакСтроку();
ЧтениеJSON = Новый ЧтениеJSON;
ЧтениеJSON.УстановитьСтроку(ТелоОтвета);
ОбъектСообщения = Фабрика_XDTO.ПрочитатьJSON(ЧтениеJSON, Фабрика_XDTO.Тип("http://rest_rmq", "collection_messages_in"));
Для каждого Сообщение Из ОбъектСообщения.message Цикл
Сообщения.Добавить(Сообщение.payload);
КонецЦикла;
Служебные процедуры:
&НаСервереБезКонтекста
Функция ОтправитьHTTPЗапрос(ПараметрыПодключения, АдресРесурса, Метод = "POST", ТекстJSON)
Заголовки = Новый Соответствие;
Заголовки.Вставить("content-type", "application/json");
HTTPЗапрос = Новый HTTPЗапрос(АдресРесурса, Заголовки);
HTTPЗапрос.УстановитьТелоИзСтроки(ТекстJSON);
АдресСервера = ПараметрыПодключения.АдресСервера;
Логин = ПараметрыПодключения.Логин;
Пароль = ПараметрыПодключения.Пароль;
HTTPСоединение = Новый HTTPСоединение(АдресСервера, 15672, Логин, Пароль,, 60);
Ответ = HTTPСоединение.ВызватьHTTPМетод(Метод, HTTPЗапрос);
Возврат Ответ;
КонецФункции
&НаСервере
Функция ПолучитьФабрикуXDTO()
// схема в макете обработки
ТекстXSD = РеквизитФормыВЗначение("Объект").ПолучитьМакет("Схема").ПолучитьТекст();
// в случае встроенного в конфигурацию пакета XDTO необходимо использовать встроенную фабрику:
// Возврат ФабрикаXDTO;
ЧтениеXML = Новый ЧтениеXML;
ЧтениеXML.УстановитьСтроку(ТекстXSD);
ПостроительDOM = Новый ПостроительDOM;
ДокументDOM = ПостроительDOM.Прочитать(ЧтениеXML);
ПостроительСхем = Новый ПостроительСхемXML;
СхемаXML = ПостроительСхем.СоздатьСхемуXML(ДокументDOM.ЭлементДокумента);
НаборСхемXML = Новый НаборСхемXML;
НаборСхемXML.Добавить(СхемаXML);
Фабрика_XDTO = Новый ФабрикаXDTO(НаборСхемXML);
Возврат Фабрика_XDTO;
КонецФункции
&НаСервереБезКонтекста
Функция ОбъектXDTO_ПолучитьJSON(ОбъектXDTO, Фабрика_XDTO)
ЗаписьJSON = Новый ЗаписьJSON;
ЗаписьJSON.УстановитьСтроку();
Фабрика_XDTO.ЗаписатьJSON(ЗаписьJSON, ОбъектXDTO, НазначениеТипаXML.Неявное);
ТекстJSON = ЗаписьJSON.Закрыть();
// платформа помещает наш объект в свою структуру с ключом #value,
// можем отдельно получить значение по ключу и еще раз сериализовать,
// либо просто вырезать открывающий и закрывающий тег
СтрокаТега1С = """#value"": {";
ПозицияНачалоТега1С = СтрНайти(ТекстJSON, СтрокаТега1С);
ПозицияКонецТега1С = ПозицияНачалоТега1С + СтрДлина(СтрокаТега1С);
ТекстJSON = Лев(ТекстJSON, ПозицияНачалоТега1С - 2) + Сред(ТекстJSON, ПозицияКонецТега1С + 1, СтрДлина(ТекстJSON) - ПозицияКонецТега1С - 1);
Возврат ТекстJSON;
КонецФункции
Схема сообщений:
<xs:schema xmlns:tns="http://rest_rmq" xmlns:xs="http://www.w3.org/2001/XMLSchema" targetNamespace="http://rest_rmq" attributeFormDefault="unqualified" elementFormDefault="qualified">
<xs:simpleType name="ackmode_type">
<xs:restriction base="xs:string">
<xs:enumeration value="ack_requeue_true"/>
<xs:enumeration value="reject_requeue_true"/>
<xs:enumeration value="ack_requeue_false"/>
<xs:enumeration value="reject_requeue_false"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="collection_messages_in">
<xs:sequence>
<xs:element name="message" type="tns:message_in" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="encoding_type">
<xs:restriction base="xs:string">
<xs:enumeration value="auto"/>
<xs:enumeration value="base64"/>
</xs:restriction>
</xs:simpleType>
<xs:complexType name="message_in">
<xs:sequence>
<xs:element name="payload_bytes" type="xs:integer"/>
<xs:element name="redelivered" type="xs:boolean"/>
<xs:element name="exchange" type="xs:string"/>
<xs:element name="routing_key" type="xs:string"/>
<xs:element name="message_count" type="xs:integer"/>
<xs:element name="properties" type="tns:message_properties"/>
<xs:element name="payload" type="xs:string"/>
<xs:element name="payload_encoding" type="tns:payload_encoding_type"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="message_out">
<xs:sequence>
<xs:element name="properties" type="tns:message_properties"/>
<xs:element name="routing_key" type="xs:string" nillable="true"/>
<xs:element name="payload" type="xs:string"/>
<xs:element name="payload_encoding" type="tns:payload_encoding_type"/>
</xs:sequence>
</xs:complexType>
<xs:complexType name="message_properties"/>
<xs:complexType name="messages_request">
<xs:sequence>
<xs:element name="count" type="xs:integer"/>
<xs:element name="ackmode" type="tns:ackmode_type"/>
<xs:element name="encoding" type="tns:encoding_type"/>
<xs:element name="truncate" type="xs:integer" minOccurs="0"/>
</xs:sequence>
</xs:complexType>
<xs:simpleType name="payload_encoding_type">
<xs:restriction base="xs:string">
<xs:enumeration value="string"/>
<xs:enumeration value="base64"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>
Схему можно поместить в отдельный файл xsd, в текстовый макет или можно импортировать в конфигурацию в виде XDTO пакета и использовать встроенный в платформу объект ФабрикаXDTO вместо своей фабрики.
И в конце, чтобы не создавать отдельную публикацию, поделюсь удачным, как мне кажется, опытом настройки двухстороннего обмена сообщениями между несколькими базами. Этот вопрос почему-то особо не поднимается и у каждого, кто настраивает RabbitMQ в первый раз возникает вопрос: а сколько точек обмена и очередей создавать и как маршрутизировать это. И с первого раза обычно получается огород из очередей и точек, хаотично связанных, так что непосвященному вообще тяжело разобраться что для чего и куда идут сообщения.
Рассматривается свой, не типовой план обмена, вероятно в больших, серьезных обменах стоит рассмотреть стандартные планы обмена типа СинхронизацияЧерезУниверсальныйФормат, но это тема отдельного разговора в каких случаях это оправдано.
Мой вариант:
Предположим у нас есть три базы: ЗУП, БП и ДО, кадровая информация из ЗУП должна попасть в БУ и ДО, оттуда должны придти подтверждения (тикеты) о том, что сообщения приняты успешно. Мы присваиваем базам идентификаторы: hrm, acc, do и создаем соответствующие узлы в базах (в БП можно не создавать узел ДО, соответственно в ДО не создавать БП), присваиваем узлам соответствующие идентификаторы (например в коде узла), предопределенному узлу присваиваем идентификатор этой базы.
В RabbitMQ мы настраиваем три точки обмена (exchange) с идентификаторами баз и три очереди (queue) с такими же идентификаторами. Далее создаются привязки (bindings) каждой очереди с корреспондирующими точками обмена и указывается ключ маршрутизации (routing key), совпадающий с именем точки обмена. То есть в нашем случае создаются привязки hrm -> do (key: do), hrm -> acc (key: acc), do -> hrm (key: hrm), acc -> hrm (key: hrm).
Пример для очереди hrm:
Сообщение из ЗУП в ДО отправляем в точку обмена с именем, соответствующему предопределенному узлу базы ЗУП (exchange = hrm) и ключом маршрутизации, соответствующему имени узла, в который отправляем сообщение (routing key = do). Соответственно в ДО мы получаем сообщение из очереди обмена с именем, соответствующему предопределенному узлу базы ДО (queue = do). Откуда пришло сообщение мы узнаем из свойства сообщения exchange, по этому идентификатору мы находим узел базы, откуда сообщение пришло и применяем соответствующие правила и настройки.
То есть, допустим сообщение с кадровой информацией из ЗУП ушло в БП и ДО, соответственно оттуда пришли подтверждения о получении, они попадают в очередь hrm, в одну кучу. Мы получаем массив сообщений и у каждого сообщения считываем свойство exchange, где будут указаны соответственно do и acc - то есть каждому сообщению мы сопоставляем узел, считываем настройки и правила обработки сообщений и обрабатываем.
На этом собственно всё. На данный момент интеграция работает в рабочем контуре без каких-либо проблем, впрочем у меня настроен обмен кадровой информацией и большие нагрузки для этой интеграции не характерны, первоначальные заполнения баз, когда выгружалось/загружалось пару тысяч сообщений обмен переварил нормально.
Обработка разрабатывалась и тестировалась на платформе 8.3.21, но должна работать и на более ранних версиях платформы.
Конфигурация не важна, методы БСП не использовались.