Использование таблиц SQL Server в качестве очередей сообщений

13.06.20

Разработка - Механизмы платформы 1С

Статья о событийно-ориентированной интеграции и об асинхронной обработке данных в контексте 1C под управлением SQL Server. Подробно разбирается вопрос использования таблиц СУБД в качестве очередей сообщений.

Пример практического использования технологии вложил на GitHub.

 

Для начала хочу констатировать тот факт, что любое обращение к внешним по отношению к СУБД ресурсам внутри транзакции СУБД - это зло. Например, никому из нас не приходит в голову, что в процедуре "ОбработкаПроведения" можно записать сообщение обмена в файл или выполнить обмен через http-сервис с другой информационной базой. Очевидно, что так делать нельзя.

 

Некоторые считают, что подобный код вполне безобиден, ну или, в крайнем случае, несёт в себе незначительные риски.

 

Процедура ОбработкаПроведения(Отказ, РежимПроведения)

	// Прикладной код 1С …

	ОтправитьСообщениеВОчередь(ЭтотОбъект); // обращение к внешнему ресурсу

КонецПроцедуры

 

Мол мы быстренько дёрнем брокер сообщений, потом сразу же зафиксируем транзакцию и всё будет просто супер. Не будет. И вот почему — транзакия проведения документа может стать вложенной. Кто её, где и каким образом сделает вложенной неизвестно. Например, посмотрите на следующий код:

 

НачатьТранзакцию();

СписокДокументов = СоздатьСвязанныеДокументы();

Для Каждого Док Из СписокДокументов Цикл

	// обращение к внешнему ресурсу в процедуре "ОбработкаПроведения"
	ВсёПровелось = ПровестиСвязанныйДокумент(Док);

	Если Не ВсёПровелось Тогда
		Прервать;
	КонецЕсли;

КонецЦикла;

Если ВсёПровелось Тогда
	ЗафиксироватьТранзакцию();
Иначе
	ОтменитьТранзакцию(); // внешний ресурс уже получил наши документы !
КонецЕсли;

 

Это типичный пример "Хотели как лучше, а получилось как всегда" (С). Так делать не надо!

 

На вопрос "как надо" фирма 1С уже ответила - планы обмена. Кроме этого в типовых библиотеках интеграции от 1С можно встретить регистры сведений, название которых начинается на слово "Очередь". Ещё один пример - регистры сведений для отложенного проведения документов. Другими словами для организации асинхронной обработки данных в контексте СУБД необходимо использовать таблицы-очереди. Запись в эти вспомогательные таблицы выполняется одновременно с записью в основные таблицы данных в локальной транзакции СУБД. Таким образом, в том числе, реализуется так называемый транзакционный обмен сообщениями.

 

Казалось бы, что на этом мою статью можно закончить, но нет. Практика использования планов обмена и регистров сведений в качестве очередей говорит о том, что оба этих механизма имеют существенные недостатки. О планах обмена я предлагаю почитать мою статью на Инфостарте "Планы обмена 1С".

 

Существенным недостатком регистров сведений при использовании их в качестве таблиц-очередей, я бы отметил тот факт, что для чтения записей (сообщений) из этой таблицы с последующим их удалением (типичный сценарий обработки таблицы как очереди) потребуется выполнить две команды СУБД. Сначала нужно прочитать записи (сообщения), а затем, в случае их успешной обработки, удалить.

 

При этом очень важно в момент потребления сообщения заблокировать запись регистра сведений таким образом, чтобы другие транзакции не смогли её прочитать и одновременно с этим не были бы заблокированы ожиданием на чтение. Это необходимо для того, чтобы, например, не отправить одно и то же сообщение дважды. К сожалению достичь обеих целей одновременно средствами платформы 1С невозможно.

 

Единственным надёжным способом организовать параллельную обработку записей регистра сведений является разделение записей при помощи его измерений, например, по типам сообщений или каким-то другим видам значений. Каждый поток работает только со своим разделителем. Такая техника приводит в любом случае к необходимости монопольного доступа к записям, но на этот раз в разрезе разделителя.

 

Таким образом доступ к регистру сведений в целях обработки сообщений должен быть монопольным. В принципе, если быть объективным, то это не такая уж и большая проблема. Одно фоновое задание читает сообщения и распределяет их по дочерним фоновым заданиям - обработчикам этих сообщений. Это работает.

 

Второй проблемой регистров сведений, используемых в качестве очередей, является невозможность их программного создания и удаления. Преодолеть это ограничение средствами платформы 1С также невозможно.

 

После того, как асинхронная задача в виде сообщения или просто записи в таблице-очереди успешно сохранена, можно заняться её обработкой или доставкой во внешнюю информационную систему. На этом этапе можно использовать все доступные вам средства: регламентные задания 1С (по отношению к СУБД это внешний процесс), файловый обмен, web и http сервисы, брокер сообщений RabbitMQ и так далее.

 

Если говорить о доставке сообщений во внешнюю СУБД, например, информационную базу 1С, то я считаю хорошей практикой создание в базе-приёмнике аналогичной входящей очереди, чтобы опять же обрабатывать её в контексте локальной транзакции базы-приёмника.

 

При приёмке сообщений существуют проблемы их дублирования, синхронизации данных по версиям, соблюдение очерёдности сообщений, разрешение коллизий изменения данных, а также обработка так называемых "отравленных" сообщений, но это уже тема для отдельной статьи. Пишите в комментариях к статье какие из этих тем вам могли бы быть интересны.

 

Далее я предлагаю ознакомиться с советами эксперта по SQL Server Ремуса Русану - одного из программистов ядра SQL Server. Он написал очень известную статью "Использование таблиц в качестве очередей". В этой статье он рассматривает различные виды таблиц-очередей. Вариантов таких таблиц может быть множество в зависимости от решаемых прикладных задач. Я опишу лишь некоторые из них.

 

Ключевым моментом нижеприведённых скриптов является техника так называемого "деструктивного чтения", при которой происходит удаление записи таблицы с одновременным её чтением. Эта возможность впервые появилась в SQL Server 2005. В коде SQL это выглядит так: DELETEOUTPUT

 

Вторым ключевым моментом является использование хинтов ROWLOCK и READPAST для параллельной обработки записей таблиц-очередей несколькими транзакциями одновременно. Про хинт ROWLOCK я писал здесь. Про хинт READPAST я писал тут.

 

Таблица-очередей "Куча" (heap)

–- Создание таблицы-очереди
CREATE TABLE [HeapQueue]
(
    Payload varbinary(max)
);
GO

–- Процедура помещения сообщения в очередь
CREATE PROCEDURE [usp_EnqueueMessage]
    @payload varbinary(max)
AS
    INSERT [HeapQueue] (Payload) VALUES @payload;
GO

–- Процедура потребления сообщения из очереди
CREATE PROCEDURE [usp_DequeueMessage]
AS
    DELETE TOP(1)
        [HeapQueue] WITH(rowlock, readpast)
    OUTPUT
        deleted.Payload;
GO

Таблица-очередь "куча" не имеет индексов. Хинты ROWLOCK и READPAST позволяют нескольким транзакциям одновременно потреблять сообщения не блокируя друг друга. Это решение очень хорошо масштабируется за счёт добавления необходимого количества потребителей. При этом очередность потребления сообщений не гарантируется.

 

В контексте 1С такую очередь использовать можно, но нужно иметь ввиду, что часто сообщением является объект конфигурации, например, элемент справочника или документ. Очередность их отправки во внешнюю систему может не иметь значения, но наличие в очереди нескольких версий одного и того же объекта крайне нежелательно, ну или, как минимум, нужно аккуратно учитывать.

 

Таблица-очередь "Чтение по времени" (pending)

 

–- Создание таблицы-очереди
CREATE TABLE [PendingQueue]
(
    WaitForTime datetime NOT NULL,
    Payload varbinary(max)
);
CREATE CLUSTERED INDEX [cdxPendingQueue] ON [PendingQueue] (WaitForTime);
GO

–- Процедура помещения сообщения в очередь
CREATE PROCEDURE [usp_EnqueueMessage]
    @waitForTime datetime,
    @payload varbinary(max)
AS
    INSERT [PendingQueue] (WaitForTime, Payload) VALUES @waitForTime, @payload;
GO

–- Процедура потребления сообщения из очереди
CREATE PROCEDURE [usp_DequeueMessage]
AS
    WITH [CTE] AS
    (
        SELECT TOP(1)
            [Payload]
        FROM
            [PendingQueue] WITH(rowlock, readpast)
        WHERE
            [WaitForTime] < GETUTCDATE()
        ORDER BY
            [WaitForTime]
    )
    DELETE
        [CTE]
    OUTPUT
        deleted.Payload;
GO

 

 

Таблица-очередь "чтение по времени" имеет кластерный индекс по полю "WaitForTime", которое используется для определения времени потребления сообщений. Хинты ROWLOCK и READPAST позволяют нескольким транзакциям одновременно потреблять сообщения не блокируя друг друга. Такой вид таблицы-очереди можно использовать для балансировки нагрузки потребления сообщений по времени. При этом очередность потребления сообщений гарантируется всё тем же кластерным индексом и предложением ORDER BY в запросе, но есть один нюанс.

 

Предположим транзакция № 1 прочитала из очереди первые 10 сообщений, а вторая транзакция № 2 прочитала следующие 10 сообщений. Транзакция № 2 успешно обработала сообщения и завершилась без ошибок. В этот момент транзакция № 1 была отменена и все её сообщения вернулись в очередь. В таком случае случится нарушение очерёдности потребления сообщений.

 

Если очерёдность потребления и обработки сообщений должна быть гарантирована, то необходимо из запроса деструктивного чтения убрать хинт READPAST. В таком случае только одна транзакция сможет потреблять сообщения из очереди - остальные будут ожидать на блокировке. Следующий пример демонстрирует реализацию подобного требования.

 

Таблица-очередей "FIFO" (FIFO strict order required)

(соблюдение очерёдности обработки сообщений обязательно)

 

–- Создание таблицы-очереди
CREATE TABLE [FIFOQueue]
(
    ConsumeOrder bigint NOT NULL IDENTITY(1,1),
    Payload varbinary(max)
);
CREATE CLUSTERED INDEX [cdxFIFOQueue] ON [FIFOQueue] (ConsumeOrder);
GO

–- Процедура помещения сообщения в очередь
CREATE PROCEDURE [usp_EnqueueMessage]
    @payload varbinary(max)
AS
    INSERT [FIFOQueue] (Payload) VALUES @payload;
GO

–- Процедура потребления сообщения из очереди
CREATE PROCEDURE [usp_DequeueMessage]
AS
    WITH [CTE] AS
    (
        SELECT TOP(1)
            [Payload]
        FROM
            [FIFOQueue] WITH(rowlock)
        ORDER BY
            [ConsumeOrder]
    )
    DELETE
        [CTE]
    OUTPUT
        deleted.Payload;
GO

 

 

Таблица-очередь "FIFO" имеет кластерный индекс по полю "ConsumeOrder", которое фиксирует порядок потребления сообщений в сочетании с предложением ORDER BY. Отсутствие хинта READPAST определяет потребление сообщений только одной транзакцией в один и тот же момент времени. Таким образом гарантируется соблюдение очередности потребления сообщений. В контексте 1С это, например, может быть необходимо для соблюдения очерёдности проведения документов.

 

Справедливости ради следует отметить, что этот вариант мало чем отличается от монопольного использования регистров сведений 1С одним потребителем. Разница заключается только в количестве запросов, выполняемых в СУБД (1 против 2) и возможности создавать очереди динамически в коде 1С ("можно" против "нельзя").

 

Вывод.

 

Реализовать по-настоящему надёжную с точки зрения обеспечения целостности данных событийно-ориентированную интеграцию или асинхронную обработку данных в распределённых информационных системах, использующих СУБД SQL Server, возможно только двумя способами:

 

1. Использовать внешний процесс, который постоянно с какой-то периодичностью будет опрашивать таблицу-очередь, например, регламентное задание 1С. Это мы все умеем делать.

 

2. Использовать функционал "Activation" SQL Server Service Broker. Это отдельная тема.

Видео-презентацию использования Service Broker в контексте 1С можно посмотреть здесь.

 

На этом пока всё. Спасибо за внимание!

Обмен данными асинхронная обработка данных событийно-ориентированная интеграция SQL Server Service Broker планы обмена

См. также

Механизмы платформы 1С Программист Платформа 1С v8.3 Бесплатно (free)

В платформе 8.3.27 появилась возможность использовать WebSocket-клиент. Давайте посмотрим, как это все устроено и чем оно нам полезно.

14.01.2025    3958    dsdred    38    

80

Механизмы платформы 1С Программист Стажер Платформа 1С v8.3 Конфигурации 1cv8 Бесплатно (free)

Эта небольшая статья - некоторого рода шпаргалка по файловым потокам: как и зачем с ними работать, какие преимущества это дает.

23.06.2024    9422    bayselonarrend    20    

158

Механизмы платформы 1С Программист Стажер Платформа 1С v8.3 Конфигурации 1cv8 Бесплатно (free)

Пример использования «Сервисов интеграции» без подключения к Шине и без обменов.

13.03.2024    6881    dsdred    18    

80

Механизмы платформы 1С Программист Стажер Платформа 1С v8.3 Бесплатно (free)

Все мы используем массивы в своем коде. Это один из первых объектов, который дают ученикам при прохождении обучения программированию. Но умеем ли мы ими пользоваться? В этой статье я хочу показать все методы массива, а также некоторые фишки в работе с массивами.

24.01.2024    21761    YA_418728146    26    

73

Механизмы платформы 1С Программист Бесплатно (free)

Язык программирования 1С содержит много нюансов и особенностей, которые могут приводить к неожиданным для разработчика результатам. Сталкиваясь с ними, программист начинает лучше понимать логику платформы, а значит, быстрее выявлять ошибки и видеть потенциальные узкие места своего кода там, где позже можно было бы ещё долго медитировать с отладчиком в поисках источника проблемы. Мы рассмотрим разные примеры поведения кода 1С. Разберём результаты выполнения и ответим на вопросы «Почему?», «Как же так?» и «Зачем нам это знать?». 

06.10.2023    24983    SeiOkami    48    

136
Комментарии
Подписаться на ответы Инфостарт бот Сортировка: Древо развёрнутое
Свернуть все
2. Cyberhawk 135 27.03.20 17:30 Сейчас в теме
заблокировать запись регистра сведений таким образом, чтобы другие транзакции не смогли её прочитать и одновременно с этим не были бы заблокированы ожиданием на чтение
Неправда. ЗаблокироватьДанныеДляРедактирования - без какого-либо ожидания, т.е. сразу выдает отлуп, если любой другой сеанс раньше уже успел это сделать.
3. zhichkin 1531 27.03.20 17:58 Сейчас в теме
(2)
ЗаблокироватьДанныеДляРедактирования

Это объектная блокировка. Она не запрещает чтение данных, например, запросом, а также не запрещает запись в других пользовательских сеансах. Эту технику конечно же можно использовать, если все программисты будут соблюдать соглашение о вызове этого метода глобального контекста перед тем, как что-то читать из базы или изменять в ней.

Та цитата из моей статьи, которую Вы приводите, о блокировках и транзакциях СУБД. Это принципиально другая вещь.
4. Cyberhawk 135 27.03.20 19:10 Сейчас в теме
(3) Про первый абзац все понятно.
Но цитату предваряют твои же слова
очень важно в момент потребления сообщения
. Неужели у тебя в прикладном коде так много мест, где читаются / пишутся эти сообщения? Можно ведь заменить эти места на один-единственный метод (точку входа). Даже если их несколько, нет никаких проблем "расставить сети" (объектную блокировку) во всех таких местах. В идеале - реализовать и предоставить разработчику программный интерфейс, чтобы отпала необходимость предъявлять вообще какие-то требования к разработчику, который будет его использовать.
А вот как обсуждение конкретного механизма (чтение сообщений) перетекло в глобальное
что-то читать из базы или изменять в ней
, не особо ясно, но это, наверное, и не важно. Я лишь отвечал на конкретный фрагмент описания конкретного механизма.
5. zhichkin 1531 27.03.20 19:26 Сейчас в теме
(4) Я с Вами не спорю. Я с Вами соглашаюсь. Объектная блокировка безусловно очень нужная и полезная вещь.
В идеале - реализовать и предоставить разработчику программный интерфейс

Так и нужно делать. Ещё раз согласен с Вами.
6. zhichkin 1531 02.06.20 16:09 Сейчас в теме
Добавил пример практического применения технологии на GitHub.
https://github.com/zhichkin/one-c-sharp-sql/tree/master/messaging/table-queues
7. zhichkin 1531 30.10.20 12:26 Сейчас в теме
8. VVi3ard 52 29.03.21 23:34 Сейчас в теме
Это хорошо до тех пор пока к вам не приходят менеджеры из компании с госучастием и не говорят что через 3 месяца мы должны переехать на Linux + PG в рамках импортозамещения.

Перевозить всё это SQL хозяйства с MS на PG будет очень тяжело.

Если мы работаем с платформой значит нужно долбить вендора.
9. zhichkin 1531 29.03.21 23:47 Сейчас в теме
(8) PG тоже можно использовать. Например, у меня FIFO очереди на PG работают.
Пример скрипта для PG:
WITH cte AS
(SEL ECT Payload
FR OM FIFOQueue
ORDER BY ConsumeOrder ASC
LIMIT 10)
DELETE FR OM FIFOQueue t USING cte
WH ERE t.ConsumeOrder = cte.ConsumeOrder
RETURNING
t.Payload;
Показать
10. zhichkin 1531 16.03.22 00:49 Сейчас в теме
Век живи - век учись =)
Выяснилось, что в данной статье я по факту описываю паттерны Transactional Outbox + Polling Publisher, которые были описаны в книге Криса Ричардсона "Паттерны микросервисов" (Chris Richardson "Microservices Patterns"). Однако, справедливости ради, следует отметить, что статья Ремуса Русану, на которую я ссылаюсь, гораздо полнее раскрывает тему таблиц-очередей, их разновидностей и возможных реализаций.
Оставьте свое сообщение