Здравствуйте, коллеги.
В своей работе в некоторых задачах использую преимущества обмена данными с использованием очередей сообщений RabbitMQ (например обмен сообщениями между сайтом и РМК, между РМК и ЕРП, между ЕРП и сайтом).
Когда может понадобится RabbitMQ:
- Когда требуется реализовать асинхронное взаимодействие между составляющими в распределительной системе. Одно сообщение может быть обработано сразу несколькими сервисами.
- Когда необходимо обрабатывать большое количество сообщений. Это снимает нагрузку и гарантирует, что данные будут доставлены до адресата.
- Когда должна быть возможность масштабирования и обработки больших потоков информации.
- Когда требуется обеспечение отказоустойчивости и надёжности системы. В случае сбоя одного из компонентов, RabbitMQ сохраняет сообщения и передаёт их, как только проблемный он вновь сможет нормально работать.
- В микросервисных архитектурах для обеспечения независимой работы каждого сервиса и обмена данными между ними.
- При интеграции систем — брокер служит связующим звеном между системами и также отвечает за обмен данными между ними.
До недавнего момента для работы с RMQ использовал очень удобную компоненту PinkRabbitMQ, однако после установки свежей платформы компонента стала часто отваливаться (на сервере под Linux) и решено было реализовать новые возможности платформы 1С для подключения к RabbitMQ с помощью WebSockets (тем более что давно хотел это сделать, да все не было времени).
Представляю к ознакомлению расширение в котором реализованы основные функции работы с брокером сообщений: установка соединения, отправка сообщений, чтение очередей и подтверждение сообщений.
Для начала, для тех кто собирается тестить с нуля опишу как установить и настроить RabbitMQ для работы с использованием WebSocket соединений (пришлось потратить некоторое время на исследования правильной настройки плагинов RMQ и изучение протокола STOMP, поэтому хочу избавить вас от этой рутины и привести некоторую минимально достаточную выжимку).
Нам потребуется сервер RabbitMQ и свежая платформа 1С.
Итак, начнем.
- Установка RabbitMQ:
Идем на github Releases · rabbitmq/rabbitmq-server скачиваем последнюю версию RabbitMQ и запускаем установщик. Для установки соглашаемся скачать erlang, ждем окончания загрузки и завершаем установку RabbitMQ.
Переходим в каталог установленной версии (для версии 3.13.6 это каталог «C:\Program Files\RabbitMQ Server\rabbitmq_server-3.13.6\sbin») открываем окно PowerShell (Shift + ПКМ) и выполняем команду:
.\rabbitmq-plugins list
По этой команде будет выведен список подключенных расширений.
Для удобства управления можно включить rabbitmq_management, для этого выполняем команду:
.\rabbitmq-plugins enable rabbitmq_management
Включаем rabbitmq_web_stomp. Для этого выполняем команду:
.\rabbitmq-plugins enable rabbitmq_web_stomp
и перезагружаем сервер командой:
.\rabbitmq-server restart
После перезагрузки убеждаемся что все необходимые плагины подключены. Выполняем команду:
.\rabbitmq-plugins list
И убеждаемся что напротив плагинов rabbitmq_management и rabbitmq_web_stomp установлены значки «E*». «E» означает что плагин включен явно, «e» - что плагин включен неявно (необходим для запуска другого плагина) , «*» означает что плагин запущен.
После этих манипуляций можно в браузере перейти по пути localhost:15672 и увидеть окно входа в web интерфейс RabbitMQ:
|
Логинимся как пользователь guest и пароль guest. По умолчанию эти учетные данные работают только для localhost, извне с ними не подключиться, поэтому создаем нового пользователя:
|
Закладка «Admin» - «Add user» - задаем имя пользователя и пароль (я назначил имя: webuser, пароль: root) и указываем набор прав (я указал Admin – «administrator»)
и задаем ему права: кликаем на пользователя в списке и жмем: «Set permission» и «Set topic permission»:
Теперь мы видим что пользователю «webuser» назначены права на виртуальный хост «/» чего нам пока достаточно.
- Подключение и работа с брокером посредством веб сокетов в 1С
Работа с WebSocket соединениями в 1С происходит посредством объекта «WebSocketКлиентСоединения».
Подключение к RabbitMQ WebSTOMP по умолчанию производится по порту 15 674, а строка подключения выглядит так:
ws://server:port/ws
где ws - не защищенный сервер, wss – защищенный
например:
ws://10.20.4.17:15674/ws
или
wss://10.20.4.17:15674/ws
по localhost можно подключиться под именем guest с паролем guest, однако для подключения снаружи необходимо создать нового пользователя с необходимыми правами.
Для тех кто хочет полностью погрузиться во все прелести протокола STOMP – вот ссылка на официальную документацию: https://stomp.github.io/stomp-specification-1.2.html
Вкратце выглядит все так:
-
Открываем соединение с сервером.
-
Отправляем КАДР (специальным образом скомпонованное сообщение) «CONNECT» (или «STOMP» для протокола версии 1.2). В кадре необходимо указать:
-
с какой версией STOMP мы можем работать
-
имя пользователя
-
пароль
-
виртуальный хост
-
параметры сердцебиения для поддержки соединения
-
-
В ответ ждем КАДР «CONNECTED» после чего можем начать общение с брокером. В этом кадре сервер сообщает:
-
какую версию STOMP он поддерживает
-
версию сервера RabbitMQ
-
параметры сердцебиения
-
-
Для отправки сообщения отправляем КАДР «SEND» с указанием:
-
направления: тип приемника (очередь, точка обмена, топик), имя приемника
-
тип содержимого
-
тело сообщения
-
-
Для получения сообщений из очереди необходимо на нее подписаться. Для этого необходимо отправить КАДР «SUBSCRIBE» в котором следует указать:
-
имя (идентификатор) подписки
-
направление получения: тип приемника (очередь, точка обмена, топик), имя приемника
-
тип подтверждения: auto (все полученные сообщения сразу считаются подтвержденными), client (необходимо подтвердить все последние полученные сообщения), client-individual (необходимо подтвердить получение каждого сообщения)
-
После установки подписки начнется получение сообщений из очереди
-
Для подтверждения сообщения используется кадр «ACK» в который передаются:
-
имя (идентификатор) подписки
-
имя подписки
-
Основные виды получаемых кадров:
-
«ERROR» - на стороне сервера произошла ошибка или ошибка обработки отправленного КАДРА
-
«CONNECTED» - соединение успешно установлено
-
«MESSAGE» - сообщение из очереди
-
«RECEIPT» - подтверждение приема сообщения
-
«» (пустое сообщение) – сердцебиение сервера
есть еще много разных кадров, подробнее в официальной документации
Почти все кадры устроены одинаково:
-
Команда
-
Заголовки
-
тело (не у всех кадров)
Команда отделяется от заголовков переводом строки, строки заголовков разделяются переводом строки, тело отделяется от заголовков двумя переводами строки, КАДР завершается символом нуля ( в 1С это Символ(0) ).
Специальный кадр сердцебиения состоит только из переводов строк и завершения кадра.
Примеры кадров:
CONNECT
accept-version:1.0,1.1,1.2
login:webuser
passcode:root
host:ptr_sl
heart-beat:1000,1000
CONNECTED
server:RabbitMQ/3.13.6
session:session-Vx4C1sHsxP07z6oXRthBJQ
heart-beat:1000,1000
version:1.2
SEND
destination:/queue/que2
content-type:application/json
receipt:18f41e42-5542-483a-972d-bdf4532f0686
Проверка
SUBSCRIBE
id:que2
destination:/queue/que2
ack:client-individual
ACK
id: t@111
subscription: sub1
RECEIPT
receipt-id:18f41e42-5542-483a-972d-bdf4532f0686
Ниже приведен код основного серверного модуля расширения для демонстрации реализации интерфейса:
#Область Служебные_функции
Функция СимволРазделительСтрокКадра() Экспорт
Возврат Символы.ПС;
КонецФункции
Функция СимволРазделительЧастейКадра() Экспорт
Возврат Символы.ПС + Символы.ПС;
КонецФункции
Функция СимволЗавершениеКадра() Экспорт
Возврат Символ(0);
КонецФункции
Функция ДанныеКадра(вхКадр) Экспорт
тДанныеКадра = Новый Структура("Команда,Заголовки,Тело");
тЧастиКадра = _СтроковыеФункцииКлиентСервер.РазложитьСтрокуВМассивПодстрок(вхКадр,СимволРазделительЧастейКадра(),Ложь,Истина);
//тЧастиКадра = СтрРазделить(вхКадр,СимволРазделительЧастейКадра(),Ложь);
Если тЧастиКадра.Количество() = 0 Тогда
Возврат тДанныеКадра;
КонецЕсли;
тСлужебнаяЧасть = тЧастиКадра[0];
тСтрокиСлужебнойЧасти = СтрРазделить(тСлужебнаяЧасть,СимволРазделительСтрокКадра(),Ложь);
тКомандаКадра = "";
Если ЗначениеЗаполнено(тСтрокиСлужебнойЧасти) Тогда
тКомандаКадра = тСтрокиСлужебнойЧасти[0];
КонецЕсли;
тЗаголовкиКадра = Новый Соответствие;
Для тНомерСтроки = 1 по тСтрокиСлужебнойЧасти.Количество() - 1 Цикл
тСтрока = тСтрокиСлужебнойЧасти[тНомерСтроки];
тПервоеВхождениеРазделителя = СтрНайти(тСтрока,":");
Если тПервоеВхождениеРазделителя = 0 Тогда
Продолжить;
КонецЕсли;
тСвойство = Лев(тСтрока,тПервоеВхождениеРазделителя - 1);
тЗначение = Сред(тСтрока,тПервоеВхождениеРазделителя + 1);
тЗаголовкиКадра.Вставить(тСвойство,тЗначение);
КонецЦикла;
тТелоКадра = "";
Если тЧастиКадра.Количество() > 1 Тогда
тТелоКадра = тЧастиКадра[1];
КонецЕсли;
//Убираем завершающий символ (0) чтобы текст можно было отправить на клиентскую часть
тТелоКадра = СтрЗаменить(тТелоКадра,СимволЗавершениеКадра(),"");
тДанныеКадра.Вставить("Команда", ВРег(СокрЛП(тКомандаКадра)));
тДанныеКадра.Вставить("Заголовки", тЗаголовкиКадра);
тДанныеКадра.Вставить("Тело", тТелоКадра);
Возврат тДанныеКадра;
КонецФункции
Функция ПараметрыПротоколаSTOMПоддерживаемыеВерсии(вхЗначение)
Если ЗначениеЗаполнено(вхЗначение) Тогда
Возврат вхЗначение;
КонецЕсли;
Возврат "1.0";
КонецФункции
Функция ПараметрыПротоколаSTOMPКадрСоединения(вхЗначение)
Если ЗначениеЗаполнено(вхЗначение) Тогда
Возврат вхЗначение;
КонецЕсли;
Возврат "CONNECT";
КонецФункции
Функция ПараметрыПротоколаSTOMPСердцебиение(вхЗначение)
Если ЗначениеЗаполнено(вхЗначение) Тогда
Возврат вхЗначение;
КонецЕсли;
Возврат "0,0";
КонецФункции
Функция ЭкранироватьСимволы(вхСтрока)
тЭкранируемыеСимволы = Новый Соответствие;
тЭкранируемыеСимволы.Вставить("\","\\");
тЭкранируемыеСимволы.Вставить(":","\c");
тЭкранируемыеСимволы.Вставить(Символ(10),"\n");
тЭкранируемыеСимволы.Вставить(Символ(13),"\r");
тСтрока = вхСтрока;
Для Каждого тКлючЗначение из тЭкранируемыеСимволы Цикл
тСтрока = СтрЗаменить(тСтрока,тКлючЗначение.Ключ,тКлючЗначение.Значение);
КонецЦикла;
Возврат тСтрока;
КонецФункции
Функция КадрСоединения(вхКадр)
Если ЗначениеЗаполнено(вхКадр) Тогда
Возврат вхКадр;
КонецЕсли;
Возврат "CONNECT";
КонецФункции
Функция ТипПриемника(вхПриемник) Экспорт
тПриемник = вхПриемник;
Если НЕ ЗначениеЗаполнено(вхПриемник) Тогда
тПриемник = "queue";
КонецЕсли;
Возврат тПриемник;
КонецФункции
Функция ЗначениеДополнительногоПараметраСоединения(вхСоединениеИлиКлюч,вхИмяПараметра,вхЗначениеПоУмолчанию = Неопределено) Экспорт
тСоединение = ПолучитьСоединение(вхСоединениеИлиКлюч);
Если Не ТипЗнч(тСоединение) = Тип("WebSocketКлиентСоединение") Тогда
Возврат Неопределено;
КонецЕсли;
тДопПараметры = тСоединение.Параметры.ДополнительныеПараметры;
Если НЕ ТипЗнч(тДопПараметры) = Тип("Структура") Тогда
Возврат вхЗначениеПоУмолчанию;
КонецЕсли;
тЗначение = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тДопПараметры,вхИмяПараметра,вхЗначениеПоУмолчанию);
Возврат тЗначение;
КонецФункции
Функция СтрокаJSON(вхДанные,вхВозврОшибка = Неопределено) Экспорт
тЗаписьJSON = Новый ЗаписьJSON;
тПараметрыЗаписиJSON = Новый ПараметрыЗаписиJSON();
//
тСтрокаJSON = Неопределено;
Попытка
тЗаписьJSON.УстановитьСтроку(тПараметрыЗаписиJSON);
ЗаписатьJSON(тЗаписьJSON, вхДанные);
тСтрокаJSON = тЗаписьJSON.Закрыть();
Исключение
тОписаниеОшибки = ОписаниеОшибки();
вхВозврОшибка = тОписаниеОшибки;
Возврат Неопределено;
КонецПопытки;
Возврат тСтрокаJSON;
КонецФункции
Функция ДанныеJSON(вхСтрокаJSON,вхВозврОшибка = Неопределено,вхПрочитатьВСоответствие = Ложь) Экспорт
вхВозврОшибка = Неопределено;
//
тЧтениеJSON = Новый ЧтениеJSON;
Попытка
тЧтениеJSON.УстановитьСтроку(вхСтрокаJSON);
тДанные = ПрочитатьJSON(тЧтениеJSON,вхПрочитатьВСоответствие);
Исключение
вхВозврОшибка = ОписаниеОшибки();
Возврат Неопределено;
КонецПопытки;
Возврат тДанные;
КонецФункции
Функция ДанныеОбъектаВСтроку(вхОбъект,вхВозврИтоговыйТекст = "",вхСдвиг = 0,вхВыводитьТипДанных = Ложь) Экспорт
тСдвигСтр = "";
Для тСчетчикСдвигов = 1 по вхСдвиг Цикл
тСдвигСтр = тСдвигСтр + Символ(9);
КонецЦикла;
//
Если ТипЗнч(вхОбъект) = Тип("Структура") Тогда
Если вхВыводитьТипДанных Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + Строка(ТипЗнч(вхОбъект)) + ": ";
КонецЕсли;
Если ЗначениеЗаполнено(вхВозврИтоговыйТекст) Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + Символы.ПС;
КонецЕсли;
Для Каждого тЭлементСтруктуры из вхОбъект Цикл
тСтрока = тСдвигСтр + тЭлементСтруктуры.Ключ + ": ";
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + тСтрока;
ДанныеОбъектаВСтроку(тЭлементСтруктуры.Значение,вхВозврИтоговыйТекст,вхСдвиг+1);
КонецЦикла;
ИначеЕсли ТипЗнч(вхОбъект) = Тип("Массив") Тогда
Если вхВыводитьТипДанных Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + Строка(ТипЗнч(вхОбъект)) + " (" + Строка(вхОбъект.Количество()) + "):";
КонецЕсли;
Если ЗначениеЗаполнено(вхВозврИтоговыйТекст) Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + Символы.ПС;
КонецЕсли;
тИндексЭлемента = -1;
Для Каждого тЭлементМасива из вхОбъект Цикл
тИндексЭлемента = тИндексЭлемента + 1;
//
тСтрока = тСдвигСтр + "("+Строка(тИндексЭлемента)+") " + ":";
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + тСтрока;
ДанныеОбъектаВСтроку(тЭлементМасива,вхВозврИтоговыйТекст,вхСдвиг+1);
КонецЦикла;
ИначеЕсли ТипЗнч(вхОбъект) = Тип("Соответствие") Тогда
Если вхВыводитьТипДанных Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + ТипЗнч(вхОбъект) + ": ";
КонецЕсли;
Если ЗначениеЗаполнено(вхВозврИтоговыйТекст) Тогда
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + Символы.ПС;
КонецЕсли;
Для Каждого тЭлементСоответствия из вхОбъект Цикл
тСтрока = тСдвигСтр + тЭлементСоответствия.Ключ + ": ";
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + тСтрока;
ДанныеОбъектаВСтроку(тЭлементСоответствия.Значение,вхВозврИтоговыйТекст,вхСдвиг+1);
КонецЦикла;
Иначе
тСтрока = " " + Строка(вхОбъект) + Символы.ПС;
вхВозврИтоговыйТекст = вхВозврИтоговыйТекст + тСтрока;
КонецЕсли;
Возврат вхВозврИтоговыйТекст;
КонецФункции
#КонецОбласти
Функция ПолучитьСоединение(Знач вхСоединениеИлиКлюч) Экспорт
тСоединение = вхСоединениеИлиКлюч;
Если НЕ ТипЗнч(тСоединение) = Тип("WebSocketКлиентСоединение") Тогда
тСоединение = WebSocketКлиентСоединения.ПолучитьСоединение(вхСоединениеИлиКлюч);
КонецЕсли;
Возврат тСоединение;
КонецФункции
Процедура ОкрытьСоединение(Знач вхКлюч,Знач вхПараметрыПодключения,вхДополнительныеПараметры = Неопределено,вхВозвОписаниеОшибки = "") Экспорт
тСоединение = ПолучитьСоединение(вхКлюч);
Если тСоединение = Неопределено Тогда
тПараметрыСоединения = вхПараметрыПодключения;
тОбработчики = Новый ОбработчикиWebSocketКлиентСоединения("ПриОткрытииСоединения", "ПриПолученииСообщения","ПриОшибке", "ПриЗакрытииСоединения", RMQWebSocketsSTOMP);
тДополнительныеПараметры = Новый Структура;
тДополнительныеПараметры.Вставить("ПараметрыСоединения", тПараметрыСоединения);
Если ТипЗнч(вхДополнительныеПараметры) = Тип("Структура") Тогда
Для Каждого тКлючЗначение из вхДополнительныеПараметры Цикл
тДополнительныеПараметры.Вставить(тКлючЗначение.Ключ,тКлючЗначение.Значение);
КонецЦикла;
КонецЕсли;
тПараметры = Новый ПараметрыWebSocketКлиентСоединения;
тПараметры.ДополнительныеПараметры = тДополнительныеПараметры;
тПараметры.Заголовки = тПараметрыСоединения.Заголовки;
//тПараметры.ИспользоватьАутентификациюОС = Неопределено;
//тПараметры.ИспользоватьПроксиОС = Неопределено;
//тПараметры.Прокси = Неопределено;
тПараметры.Пароль = тПараметрыСоединения.Пароль;
тПараметры.Пользователь = тПараметрыСоединения.ИмяПользователя;
тПараметры.Таймаут = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"Таймаут",10);
тЗащищенное = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"ЗащищенноеПодключение",Ложь);
Если тЗащищенное = Истина Тогда
тЗащищенное = "s";
Иначе
тЗащищенное = "";
КонецЕсли;
тАдресСервера = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"Сервер", "");
тПорт = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"Порт", 15674);
тПорт = Формат(тПорт,"ЧГ=");
тСтрокаСоединения = СтрШаблон("ws%1://%2:%3/ws",тЗащищенное,тАдресСервера,тПорт);
Попытка
тСоединение = WebSocketКлиентСоединения.ОткрытьСоединение(вхКлюч,тСтрокаСоединения,тОбработчики,тПараметры);
Исключение
вхВозвОписаниеОшибки = ОписаниеОшибки();
Возврат;
КонецПопытки;
КонецЕсли;
КонецПроцедуры
Процедура ЗакрытьСоединение(вхКлючИлиСоединение,вхВозвОписаниеОшибки = "") Экспорт
тСоединение = Неопределено;
Если ТипЗнч(вхКлючИлиСоединение) = Тип("Строка") Тогда
тСоединение = WebSocketКлиентСоединения.ПолучитьСоединение(вхКлючИлиСоединение);
КонецЕсли;
Если тСоединение = Неопределено Тогда
Возврат;
КонецЕсли;
Попытка
тСоединение.Закрыть();
Исключение
вхВозвОписаниеОшибки = ОписаниеОшибки();
КонецПопытки;
КонецПроцедуры
Процедура УстановитьСоединение(Знач вхСоединение,вхВозвОписаниеОшибки = "")
тПараметрыСоединения = ЗначениеДополнительногоПараметраСоединения(вхСоединение,"ПараметрыСоединения",Неопределено);
Если НЕ ТипЗнч(тПараметрыСоединения) = Тип("Структура") Тогда
вхВозвОписаниеОшибки = СтрШаблон("Не удалось определить параметры соединения [%1]",вхСоединение.Ключ);
Возврат;
КонецЕсли;
тКадрСоединения = ВРЕГ(ПараметрыПротоколаSTOMPКадрСоединения(тПараметрыСоединения.ПараметрыWebSTOMP["КадрСоединения"]));
//тЗащищенное = ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"WebSocketЗащищенноеСоединение",Ложь);
тСообщение =
тКадрСоединения + СимволРазделительСтрокКадра() +
"accept-version:" + ПараметрыПротоколаSTOMПоддерживаемыеВерсии(тПараметрыСоединения.ПараметрыWebSTOMP["ПоддерживаемыеВерсии"]) + СимволРазделительСтрокКадра() +
"login:" + тПараметрыСоединения.ИмяПользователя+ СимволРазделительСтрокКадра() +
"passcode:" + ЭкранироватьСимволы(тПараметрыСоединения.Пароль) + СимволРазделительСтрокКадра() +
"host:" + тПараметрыСоединения.ВиртуальныйХост +
СимволРазделительСтрокКадра() +
"heart-beat:" + ПараметрыПротоколаSTOMPСердцебиение(тПараметрыСоединения.ПараметрыWebSTOMP["Сердцебиение"]) +
СимволРазделительЧастейКадра() +
СимволЗавершениеКадра();
вхСоединение.ОтправитьСообщение(тСообщение);
КонецПроцедуры
Процедура ЗавершитьОформлениеПодписокСоединения(Знач вхСоединение)
тДанныеОчередей = ЗначениеДополнительногоПараметраСоединения(вхСоединение,"ДанныеОчередей",Новый Массив);
Для Каждого тДанныеОчередиКлючЗначение из тДанныеОчередей Цикл
тДанныеОчереди = тДанныеОчередиКлючЗначение.Значение;
тОписаниеОшибки = "";
ПодписатьсяНаОчередь(вхСоединение.Ключ,ТипПриемника(тДанныеОчереди.ТипПриемника),тДанныеОчереди.ИмяПриемника,тДанныеОчереди.ИмяПодписки,,тОписаниеОшибки);
КонецЦикла;
КонецПроцедуры
Процедура ОтправитьСообщения(вхСоединение,вхВозвОписаниеОшибки = "")
тДанныеСообщений = ЗначениеДополнительногоПараметраСоединения(вхСоединение,"ДанныеСообщений",Новый Массив);
Для Каждого тДанныеСообщения из тДанныеСообщений Цикл
ОтправитьСообщение(вхСоединение,ТипПриемника(тДанныеСообщения.ТипПриемника),тДанныеСообщения.ИмяПриемника,тДанныеСообщения.ТелоСообщения,тДанныеСообщения.Идентификатор,,вхВозвОписаниеОшибки);
КонецЦикла;
КонецПроцедуры
Процедура ОтправитьСообщение(Знач вхСоединениеИлиКлюч,Знач вхПриемник = "queue",Знач вхИмяПриемника,Знач вхДанныеСообщения,вхИдентификаторСообщения = Неопределено,Знач вхТипСодержимого = "application/json",вхВозвОписаниеОшибки = "") Экспорт
тСоединение = ПолучитьСоединение(вхСоединениеИлиКлюч);
Если тСоединение = Неопределено Тогда
вхВозвОписаниеОшибки = СтрШаблон("Не удалось определить соединение по ключу [%1]",вхСоединениеИлиКлюч);
Возврат ;
КонецЕсли;
Если ТипЗнч(вхДанныеСообщения) = Тип("Строка") Тогда
тДанныеКОтправкеJSON = вхДанныеСообщения;
Иначе
тДанныеКОтправкеJSON = СтрокаJSON(вхДанныеСообщения);
КонецЕсли;
//тЧислоСимволов = Формат(СтрДлина(тДанныеКОтправкеJSON),"ЧГ="); //Для бинарных данных
тЗапросЧека = "";
Если ЗначениеЗаполнено(вхИдентификаторСообщения) Тогда
тЗапросЧека = "receipt:" + СокрЛП(Строка(вхИдентификаторСообщения)) + СимволРазделительСтрокКадра();
КонецЕсли;
тСообщение =
"SEND" + СимволРазделительСтрокКадра() +
"destination:/" + вхПриемник + "/" + вхИмяПриемника + СимволРазделительСтрокКадра() +
//"content-length:" + тЧислоСимволов + СимволРазделительСтрокКадра() +
"content-type:" + вхТипСодержимого + СимволРазделительСтрокКадра() +
тЗапросЧека +
СимволРазделительЧастейКадра() +
тДанныеКОтправкеJSON
+
СимволЗавершениеКадра();
тСоединение.ОтправитьСообщение(тСообщение);
КонецПроцедуры
Процедура ОтправитьСердцебиение(вхСоединение) Экспорт
Если НЕ вхСоединение.ПолучитьСостояние() = СостояниеWebSocketСоединения.Открыто Тогда
Возврат;
КонецЕсли;
тСообщение = СимволРазделительСтрокКадра();
вхСоединение.ОтправитьСообщение(тСообщение);
КонецПроцедуры
Процедура ПодтвердитьСообщение(Знач вхКлючИлиСоединения,Знач вхИдентификаторПодписки,Знач вхИдентификаторСообщения,вхВозвОписаниеОшибки = "") Экспорт
тСоединение = ПолучитьСоединение(вхКлючИлиСоединения);
Если тСоединение = Неопределено Тогда
вхВозвОписаниеОшибки = СтрШаблон("Соединение с ключем %1 не найдено!",тСоединение.Ключ);
Возврат;
КонецЕсли;
тСообщение =
"ACK" + СимволРазделительСтрокКадра() +
"message-id:" + вхИдентификаторСообщения + СимволРазделительСтрокКадра() +
"id:" + вхИдентификаторСообщения + СимволРазделительСтрокКадра() +
"subscription:" + вхИдентификаторПодписки +
СимволРазделительЧастейКадра() +
СимволЗавершениеКадра();
тСоединение.ОтправитьСообщение(тСообщение);
КонецПроцедуры
Процедура ПодписатьсяНаОчередь(Знач вхСоединениеИлиКлюч,Знач вхПриемник = "queue",Знач вхИмяПриемника,Знач вхИдентификаторПодписки,Знач вхТипПодтверждения = "client-individual",вхВозвОписаниеОшибки = "") Экспорт
тСоединение = ПолучитьСоединение(вхСоединениеИлиКлюч);
Если тСоединение = Неопределено Тогда
Если тСоединение = Неопределено Тогда
вхВозвОписаниеОшибки = СтрШаблон("Соединение по ключу [%1] не найдено!",вхСоединениеИлиКлюч);
Возврат;
КонецЕсли;
КонецЕсли;
тСообщение =
"SUBSCRIBE" + СимволРазделительСтрокКадра() +
"id:" + вхИдентификаторПодписки + СимволРазделительСтрокКадра() +
"destination:/"+ вхПриемник + "/" + вхИмяПриемника + СимволРазделительСтрокКадра() +
"ack:" + вхТипПодтверждения + СимволРазделительЧастейКадра() +
СимволЗавершениеКадра();
тСтатусСоединения = тСоединение.ПолучитьСостояние();
Если НЕ тСтатусСоединения = СостояниеWebSocketСоединения.Открыто Тогда
вхВозвОписаниеОшибки = СтрШаблон("Соединение [%1] в статусе: [%2]!",вхСоединениеИлиКлюч,тСтатусСоединения);
Возврат;
КонецЕсли;
тСоединение.ОтправитьСообщение(тСообщение);
КонецПроцедуры
Функция ОбработатьСообщение(вхСоединение, вхСообщение) Экспорт
тДанныеКадра = ДанныеКадра(вхСообщение);
Если тДанныеКадра.Команда = "ERROR" Тогда
ИначеЕсли тДанныеКадра.Команда = "CONNECTED" Тогда
ПриУстановкеWebSTOMPСоединения(вхСоединение,тДанныеКадра);
ИначеЕсли тДанныеКадра.Команда = "MESSAGE" Тогда
тИдентификаторСообщения = тДанныеКадра.Заголовки["message-id"];
ПриПолученииВходящегоСообщения(вхСоединение,тИдентификаторСообщения,тДанныеКадра);
ИначеЕсли тДанныеКадра.Команда = "RECEIPT" Тогда
тИдентификаторСообщения = тДанныеКадра.Заголовки["receipt-id"];
ПриПодтвержденииПриемаИсходящегоСообщения(вхСоединение,тИдентификаторСообщения,тДанныеКадра);
ИначеЕсли НЕ ЗначениеЗаполнено(тДанныеКадра.Команда) Тогда
//Сердцебиение
ПриПолученииСерцебиения(вхСоединение);
Иначе
КонецЕсли;
Возврат тДанныеКадра;
КонецФункции
#Область Обработчики_событий
Процедура ПриОткрытииСоединения(вхСоединение) Экспорт
УстановитьСоединение(вхСоединение);
КонецПроцедуры
Процедура ПриПолученииСообщения(вхСоединение, вхСообщение) Экспорт
Попытка
ОбработатьСообщение(вхСоединение, вхСообщение);
Исключение
тОписание = ОписаниеОшибки();
КонецПопытки;
КонецПроцедуры
Процедура ПриОшибке(вхСоединение,вхКодОшибки,вхОписаниеОшибки) Экспорт
КонецПроцедуры
Процедура ПриЗакрытииСоединения(вхСоединение,вхКодЗакрытия) Экспорт
КонецПроцедуры
Процедура ПриУстановкеWebSTOMPСоединения(вхСоединение,вхДанныеКадра)
//Отпраляем сообщения
ОтправитьСообщения(вхСоединение);
//Подключаемся к очередям
ЗавершитьОформлениеПодписокСоединения(вхСоединение);
КонецПроцедуры
Процедура ПриПодтвержденииПриемаИсходящегоСообщения(вхСоединение,вхИдентификаторСообщения,вхДанныеКадра)
КонецПроцедуры
Процедура ПриПолученииВходящегоСообщения(вхСоединение,вхИдентификаторСообщения,вхДанныеКадра);
тДанныеСообщения = Новый Структура;
тДанныеСообщения.Вставить("ИдентификаторСообщения", вхИдентификаторСообщения);
тДанныеСообщения.Вставить("ИдентификаторПодписки", вхДанныеКадра.Заголовки["subscription"]);
тДанныеСообщения.Вставить("КлючСоединения", вхСоединение.Ключ);
тДанныеСообщения.Вставить("Направление", вхДанныеКадра.Заголовки["destination"]);
тДанныеСообщения.Вставить("Тело", вхДанныеКадра.Тело);
СообщенияОчередиОбработатьСообщениеWebSTOMP(вхСоединение,тДанныеСообщения.ИдентификаторПодписки,тДанныеСообщения);
КонецПроцедуры
Процедура СообщенияОчередиОбработатьСообщениеWebSTOMP(вхСоединение,вхИдентификаторПодписки,вхДанныеСообщения,вхВозвОписаниеОшибки = "")
тДанныеОчередей = ЗначениеДополнительногоПараметраСоединения(вхСоединение,"ДанныеОчередей",Новый Массив);
тДанныеОчереди = тДанныеОчередей[вхИдентификаторПодписки];
Если НЕ тДанныеОчереди = Неопределено Тогда
Если тДанныеОчереди.Подтвердить = Истина Тогда
ПодтвердитьСообщение(вхСоединение,вхИдентификаторПодписки,вхДанныеСообщения.ИдентификаторСообщения,вхВозвОписаниеОшибки)
КонецЕсли;
КонецЕсли;
КонецПроцедуры
Процедура ПриПолученииСерцебиения(вхСоединение)
Попытка
тПараметрыСоединения = ЗначениеДополнительногоПараметраСоединения(вхСоединение,"ПараметрыСоединения",Новый Структура);
тПараметрыWebSTOMP = _ОбщегоНазначенияКлиентСервер.СвойствоСтруктуры(тПараметрыСоединения,"ПараметрыWebSTOMP",Новый Соответствие);
тПоддерживатьСердцебиение = тПараметрыWebSTOMP["ПоддерживатьСердцебиение"];
Исключение
тПоддерживатьСердцебиение = Ложь;
КонецПопытки;
Если (тПоддерживатьСердцебиение = Истина) ИЛИ (ВРег(тПоддерживатьСердцебиение) = "ДА") Тогда
ОтправитьСердцебиение(вхСоединение);
КонецЕсли;
КонецПроцедуры
#КонецОбласти
С какими неудобствами столкнулся:
- Значение свойства "Дополнительные параметры" свойства "Параметры" объекта WebSocketКлиентСоединения восстанавливаются (возвращаются) в каждом событии (коллбэке) к значениям заданным при открытии соединения (инициализации). Поэтому передавать какие-то данные между колбэками приходится через записи БД (временное хранилище тоже не работает)
- WebSocket соединение закрывается без предупреждения если КАДР содержит некорректные данные (команда, значения заголовков), что усложняет отладку.
В целом замена компоненты на WebSTOMP и новые встроенные возможности 1С по работе с WebSockets прошла успешно и результаты работы обмена с очередями RabbitMQ посредством протокола STOMP полностью удовлетворительны.
В прикрепленном файле расширения продемонстрированы основные приемы работы с протоколом STOMP с уведомлениями на стороне клиента. Расширение использует несколько общих функций из БСП.
Спасибо за внимание.
Проверено на следующих конфигурациях и релизах:
- 1С:Библиотека стандартных подсистем, редакция 3.1, релизы 3.1.11.228
Вступайте в нашу телеграмм-группу Инфостарт