Многопоточная выгрузка одного сообщения обмена

Опубликовал Дмитрий Жичкин (zhichkin) в раздел Программирование - Практика программирования

Публикация описывает, как можно распараллелить выгрузку одного сообщения обмена.

В своей статье о планах обмена я поделился результатами исследования этого объекта конфигурации.

В этой публикации я хочу озвучить идею как можно распараллелить выгрузку сообщения обмена.

Заранее прошу прощения если это "баян". В таком случае просьба дать ссылки где почитать источники.

Код состоит из двух частей:

1. Управление формированием фоновых заданий и объединение результатов их работы.

2. Управление выполнением списка фоновых заданий.

Полный исходный код реализации идеи находится во вложенном к публикации файле.

Идея заключается в следующем:

1. Мы как обычно начинаем выгрузку.

2. В момент, когда начинаем выбирать изменения, мы распараллеливаем запись файла сообщения обмена по объектам конфигурации, которые входят в состав плана обмена. Для каждого такого объекта мы создаём отдельное фоновое задание и пишем все его изменения в отдельный файл - часть сообщения обмена. Подробнее можно посмотреть в моей статье в разделе "Запись изменений в сообщение обмена".

3. Ожидаем завершения выполнения всех сформированных заранее фоновых заданий.

4. Объединяем результаты фоновых заданий в один главный файл сообщения обмена.

5. Завершаем выгрузку как обычно.

Код главной управляющей процедуры выглядит следующим образом:

Процедура СформироватьСообщениеОбмена()
	
	УзелОбмена = ПланыОбмена.Тестовый.НайтиПоНаименованию("Узел получатель", Истина);
	
	// Получаем список объектов конфигурации, который входят в состав плана обмена
	СоставПланаОбмена = Метаданные.ПланыОбмена.Тестовый.Состав;
	
	// Открываем главный файл сообщения обмена
	ЗаписьXML = Новый ЗаписьXML();
	ПолноеИмяФайла = КаталогСообщений + "\" + Строка(УзелОбмена.УникальныйИдентификатор()) + ".xml";
	ЗаписьXML.ОткрытьФайл(ПолноеИмяФайла);
	
	// Создаём объект "ЗаписьСообщенияОбмена" и блокируем узел (объектная блокировка)
	ЗаписьСообщения = ПланыОбмена.СоздатьЗаписьСообщения();
	ЗаписьСообщения.НачатьЗапись(ЗаписьXML, УзелОбмена);
	НомерСообщения = ЗаписьСообщения.НомерСообщения;
	
	// Массив фоновых заданий - по одному на каждый объект конфигурации, который входит в состав плана обмена
	СписокФоновыхЗаданий = Новый Массив();
	
	// Количество постоянно активных фоновых заданий.
	// Управление их выполнением осуществляется специальными процедурами (см. ниже).
	// Обычно их количество должно равняться количеству ядер сервера.
	// На практике нужно подбирать их количество в зависимости от загрузки сервера.
	// Слишком большое количество фоновых заданий может "положить" сервер.
	КоличествоАктивныхЗаданий = 4;
	
	// Цикл формирования списка фоновых задний и их параметров
	Для Каждого Элемент Из СоставПланаОбмена Цикл
		
		ИмяЗадания = "Экспорт сообщения № " + Формат(НомерСообщения, "ЧГ=0") + ": " + Элемент.Метаданные.ПолноеИмя();
		
		Параметры = Новый Массив();
		Параметры.Добавить(УзелОбмена);
		Параметры.Добавить(НомерСообщения);
		Параметры.Добавить(Элемент.Метаданные);
		
		ПараметрыФоновогоЗадания = Новый Структура();
		ПараметрыФоновогоЗадания.Вставить("ИмяЗадания", ИмяЗадания);
		ПараметрыФоновогоЗадания.Вставить("ИмяПроцедуры", "ОбщийМодульОбменаДанными.ВыполнитьВыгрузкуИзмененийОбъектаКонфигурации");
		ПараметрыФоновогоЗадания.Вставить("ПараметрыПроцедуры", Параметры);
		
		СписокФоновыхЗаданий.Добавить(ПараметрыФоновогоЗадания);
		
	КонецЦикла;

	// Передача списка фоновых заданий управляющим процедурам на выполнение
	// Формируем части сообщения обмена по одному файлу для каждого объекта конфигурации
	ВыполнитьФоновыеЗадания(СписокФоновыхЗаданий, КоличествоАктивныхЗаданий);
	
	// Собираем все части сообщения обмена вместе в главном файле (см. выше)
	СформироватьЕдиноеСообщениеОбмена(СоставПланаОбмена, УзелОбмена, НомерСообщения, ЗаписьXML);
	
	// Завершаем формирование сообщения обмена
	ЗаписьСообщения.ЗакончитьЗапись();
	
КонецПроцедуры

Очень важно ничего не напутать с именованием файлов частей сообщения. Для этого код формирования такого имени вынесен в отдельную функцию. Вот она:

Функция ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных)
	
	Возврат КаталогСообщений +
		"\" +
		Строка(УзелОбмена.УникальныйИдентификатор()) +
		"_" +
		Формат(НомерСообщения, "ЧГ=0") +
		"_" +
		ОбъектМетаданных.Имя + ".xml";
		
КонецФункции

Процедура, которая формирует файл части сообщения:

// Процедура общего модуля для вызова из фонового задания и формирования части сообщения по объекту метаданных
Процедура ВыполнитьВыгрузкуИзмененийОбъектаКонфигурации(УзелОбмена, НомерСообщения, ОбъектМетаданных) Экспорт
	
	Выборка = ПланыОбмена.ВыбратьИзменения(УзелОбмена, НомерСообщения, ОбъектМетаданных);
	
	Если Не Выборка.Следующий() Тогда
		Возврат;
	КонецЕсли;
	
	ЗаписьXML = Новый ЗаписьXML();
	ПолноеИмяФайла = ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);
	ЗаписьXML.ОткрытьФайл(ПолноеИмяФайла);
	
	ЗаписатьXML(ЗаписьXML, Выборка.Получить());
	
	Пока Выборка.Следующий() Цикл
		ЗаписатьXML(ЗаписьXML, Выборка.Получить());
	КонецЦикла;
	
	ЗаписьXML.Закрыть();
	
КонецПроцедуры

Собираются все части вместе такой процедурой:

Процедура СформироватьЕдиноеСообщениеОбмена(СоставПланаОбмена, УзелОбмена, НомерСообщения, ЗаписьXML)
	
	Для Каждого ОбъектМетаданных Из СоставПланаОбмена Цикл
		
		ПолноеИмяФайла = ПолучитьИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);
		
		Файл = Новый Файл(ПолноеИмяФайла);
		Если Файл.Существует() Тогда
			
			ЧтениеТекста = Новый ЧтениеТекста(ПолноеИмяФайла, КодировкаТекста.UTF8);
			
			// Дописываем часть в главный файл сообщения обмена
			ЗаписьXML.ЗаписатьБезОбработки(ЧтениеТекста.Прочитать());
			
		КонецЕсли;
		
	КонецЦикла;
	
КонецПроцедуры

Внимание!

Накладные расходы на создание файлов частей сообщения + последующее их объединение в один главный файл сообщения могут "съесть" всю выгоду от распараллеливания процесса. Это зависит от размеров такого сообщения. Тестируйте код перед его применением!

Скачать файлы

Наименование Файл Версия Размер
Многопоточная выгрузка одного сообщения обмена
.txt 8,28Kb
05.12.16
0
.txt 8,28Kb Скачать

См. также

Вознаграждение за ответ
Сумма: 1 $m
Добавили:
Константин (Mkonst) (1.00 $m)
Добавить вознаграждение
Комментарии
1. борян петров (TODD22) 15 05.12.16 14:50 Сейчас в теме
Этот код я так понимаю пишет в один файл? То есть это для выгрузки в один узел?
А можно таким же способом выгружать сразу по нескольким узлам? Например по 4 магазинам?
2. Дмитрий Жичкин (zhichkin) 189 05.12.16 15:09 Сейчас в теме
(1)
Этот код я так понимаю пишет в один файл? То есть это для выгрузки в один узел?

Этот код пишет несколькими заданиями в один файл - формирует одно сообщение обмена. Соответственно это сообщение для одного узла.
А можно таким же способом выгружать сразу по нескольким узлам? Например по 4 магазинам?

Если дописать логику формирования сообщений по нескольким узлам, то да, можно. Хоть для 100 узлов одновременно, если сервер потянет.
3. Caponid V (caponid) 05.12.16 17:24 Сейчас в теме
Вот тут "ЧтениеТекста.Прочитать()" вполне возможно получить когда нибудь "Out of memory". причем внезапно.. память течет и фрагментируется.
А вот тут "Выборка.Получить()" вместо объекта - блокировку
4. Дмитрий Жичкин (zhichkin) 189 05.12.16 17:57 Сейчас в теме
(3) Спасибо за дельный комментарий!
Вы правы - в таком виде код отправлять в бой никак нельзя.
Цель статьи: озвучить идею и дать прототип решения.
В целях упрощения восприятия исходный код максимально сокращён.
5. Дмитрий Жичкин (zhichkin) 189 05.12.16 18:47 Сейчас в теме
(3) Кстати, при управляемом режиме блокировок "Выборка.Получить()" не рискует нарваться на блокировку. Получится "грязное" чтение, так как при read_commited_snapshot = ON мы получим версию данных до начала блокирующей транзакции (UPDATE или DELETE в данном случае).
6. Максим Кузнецов (Makushimo) 148 06.12.16 06:25 Сейчас в теме
Про выгрузку по правилам обмена тут можно и не заикаться?
7. Дмитрий Жичкин (zhichkin) 189 06.12.16 08:12 Сейчас в теме
(6) Почему Вы так решили? Можно конкретный пример?
ПКО, например, можно передать в фоновое задание в виде параметра ... Возможно это потребует некоторой переделки самих правил, но сам механизм КД, на мой взгляд, это не отменяет.
8. Caponid V (caponid) 06.12.16 12:02 Сейчас в теме
(4)
Вы правы, блокировки обычно вот тут возникают - ПланыОбмена.ВыбратьИзменения
И дополнение - зачем все переносить в один файл? - это все таки довольно затратная операция. Проще все таки тогда все передать пакетом файлов - записать их имена в сообщение обмена - да и при загрузке многопоточку организовать
Как то так...
Для Каждого ОбъектМетаданных Из СоставПланаОбмена Цикл
    КраткоеИмяФайла = ПолучитьКраткоеИмяФайлаЧастиСообщенияОбмена(УзелОбмена, НомерСообщения, ОбъектМетаданных);
    Файл = Новый Файл(ПолноеИмяФайла);
	Если Файл.Существует() Тогда
		ЗаписьXML.ЗаписатьНачалоЭлемента("metaFiles", "http://metafiles");
		ЗаписьXML.ЗаписатьАтрибут("metaName", ОбъектМетаданных.ПолноеИмя());
		ЗаписьXML.ЗаписатьАтрибут("File", КраткоеИмяФайла);
		ЗаписьXML.ЗаписатьКонецЭлемента();	
	КонецЕсли;
КонецЦикла;
...Показать Скрыть
starik-2005; +1 Ответить 1
9. Дмитрий Жичкин (zhichkin) 189 06.12.16 12:41 Сейчас в теме
(8) Да, "ВыбратьИзменения" генерирует UPDATE номера сообщения в таблице регистрации изменений объекта. Если таблица большая, то эта команда выполняется долго и это может приводить даже к ошибкам превышения таймаута ожидания. Подробнее можно почитать в статье, ссылку на которую я даю в этой публикации.

По поводу отправки пакета файлов и организации многопоточной загрузки данных на принимающей стороне: согласен, можно (вероятно даже нужно) делать и так. Фантазия ограничена только возможностями. Цель этой публикации была именно в том, чтобы показать возможность.
10. Caponid V (caponid) 07.12.16 16:17 Сейчас в теме
Не тянет это на публикацию - нет цельной идеи.
11. Максим Горбачев (Tangram) 122 07.12.16 17:15 Сейчас в теме
Сейчас на практике реализую порционный обмен (по правилам обмена).
Я пошел другим путем - весь массив изменений, заранее выбранный запросами, бьется на порции, а потом каждая порция выбирается и выгружается.
По метаданным неудобно - в выгрузке может быть несколько сотен одинаковых объектов, например Реализаций, велика вероятность нарваться на блокировку.
12. Дмитрий Жичкин (zhichkin) 189 07.12.16 19:14 Сейчас в теме
(11)
потом каждая порция выбирается и выгружается

Это тоже вариант.
Уточните, пожалуйста, изменения заранее выбираются куда? В оперативную память?
Вы ПланыОбмена.ВыбратьИзменения используете? Что указываете в качестве третьего параметра фильтрации выборки?
велика вероятность нарваться на блокировку

Уточните, пожалуйста, какую блокировку Вы имеете ввиду? В какой момент выгрузки?

Обычно блокировка, которая мешает, это команда UPDATE при вызове ПланыОбмена.ВыбратьИзменения. Она возникает только на этом этапе и её длительность зависит исключительно от количества изменений в таблице и третьего параметра фильтрации этого метода. Предполагаю, что Вы используете массив ссылок на объекты, который является порцией данных ... Если это так, то, вероятно, это самая удачная реализация выборки изменений.
13. Максим Горбачев (Tangram) 122 07.12.16 22:52 Сейчас в теме
(12) да , изменения заранее выбираются в память. сейчас обмен в стадии тестов, в боевой базе у меня первоначально 70000-200000 объектов к выгрузке, надеюсь, сервер х64 простит мне это
В ВыбратьИзменения в третий параметр отдаю массив из 50-200 ссылок на объекты, выбранный из основного массива.
14. Максим Горбачев (Tangram) 122 07.12.16 23:08 Сейчас в теме
а вот про блокировки интересно...
Мои опыты на КА 1.1 (с изменениями) показывают следующее:
ВыбратьИзменения() отрабатывает оч. быстро, данные из боевой базы: 177000 объектов в выборке за 6 секунд.
А вот потом начинается мистика (или суровая реальность), из-за которой я затеял порционный обмен:
фактически все 177000 объектов блокируются на запись до окончания выгрузки до операторов
// Завершаем запись сообщения
ЗаписьСообщения.ЗакончитьЗапись();

ЗаписьXML.Закрыть();
Причем это я проверял даже на копии: выбираю все изменения, там 150 документов "Реализация". выбираю из них первые 50 и на них делаю ВыбратьИзменения(). иду отладчиком по процедуре выгрузки (я слегка модиф. типовую) и в другом сеансе пытаюсь проводить документы. Пока не пройдет ЗакончитьЗапись(), ни один из "выбранных" 50 документов перепровести нельзя (стандартная ошибка MS SQL на превышение времени блокировки), остальные пожалуйста.
15. Максим Горбачев (Tangram) 122 07.12.16 23:44 Сейчас в теме
А вот сейчас воспроизвел ситуацию на типовой КА 1.1.61.2 и все документы, попавшие в ВыбратьИзменения(), спокойно перепроводятся в процессе выгрузки...
Завтра попробую 1.1.78 и походу буду искать блокировку в доработках...
16. Дмитрий Жичкин (zhichkin) 189 07.12.16 23:59 Сейчас в теме
(14)
все 177000 объектов блокируются

Какой режим управления блокировками у Вас используется (автоматический или управляемый)? Где начинается и заканчивается транзакция? Нет ли эскалации блокировок? Какой набор данных блокируется на самом деле (нет ли сканов)? На все эти вопросы нужно дать однозначный ответ.

Не настаиваю, но рекомендую почитать мою статью про планы обмена: http://infostart.ru/public/561460/ Там всё очень подробно расписано, в том числе про блокировки. Кстати, если будут замечания или дополнения, то я с удовольствием внесу их в статью =)
17. Дмитрий Жичкин (zhichkin) 189 08.12.16 00:05 Сейчас в теме
(15) Интересно, а у Вас случайно нет там где-нибудь начала транзакции типа "НачатьТранзакцию" после "ВыбратьИзменения" или до этого, а "ЗафиксироватьТранзакцию" где-нибудь после "ЗакончитьЗапись" ? Чисто теоретически получилась бы одна большая транзакция, для которой все остальные были бы вложенными ... Честно говоря, была у меня мысль проверить такой случай, но руки пока что не дошли =) Если это неожиданно так, то границы транзакции могут быть значительно шире ...
18. Максим Горбачев (Tangram) 122 08.12.16 00:16 Сейчас в теме
(17)
а у Вас случайно нет там где-нибудь начала транзакции

есть, причем это типовой код.... сейчас проверю...
19. Дмитрий Жичкин (zhichkin) 189 08.12.16 00:22 Сейчас в теме
(18) Если у Вас на уровне SQL Server не отключена эскалация блокировок до уровня всей таблицы для таблицы регистрации изменений, то UPDATE более 5000 записей этой таблицы в результате вызова метода "ВыбратьИзменения" может привести к блокировке всей таблицы. Кажется я об этом тоже в своей вышеупомянутой статье писал ...
20. Максим Горбачев (Tangram) 122 08.12.16 00:28 Сейчас в теме
типовой код примерно такой:
НачатьТранзакцию();
ВыбратьИзменения(...все...);

Далее Цикл по выборке, но через каждые условно 10 объектов
идет ЗафиксироватьТранзакцию(); НачатьТранзакцию();

и в типовой конфе реально объекты, захваченные в первой десятке, отпускаются после первого же ЗафиксироватьТранзакцию().
а в моей базе почему-то не так...
Спасибо большое за наводки, завтра на свежую голову буду копать дальше. Если что интересное для науки найду, напишу ).
21. kolya_tlt kolya_tlt (kolya_tlt) 10 11.01.17 14:24 Сейчас в теме
Добрый день, Дмитрий.
вы не пробовали объединить общие для всех узлов справочники\регистры и выгрузить их один раз для всех узлов?
22. Дмитрий Жичкин (zhichkin) 189 11.01.17 14:40 Сейчас в теме
(21) Нет не пробовал. Если бы я решал подобную задачу, то я бы отказался от регистрации изменений по всем узлам. Делал бы это для одного какого-то узла по умолчанию, например, а потом выгружал бы один раз.
Кроме этого, после выгрузки по всем узлам нужно обновить значение реквизита "НомерОтправленного" для каждого из них. При этом обновить это значение нужно не только в плане обмена, но и в служебной таблице регистрации изменений объекта. Без вызова метода "ВыбратьИзменения" плана обмена сделать это невозможно, а первым обязательным (!) параметром этого метода является узел плана обмена. Короче говоря, всё сильно связанно друг с другом ...
23. kolya_tlt kolya_tlt (kolya_tlt) 10 12.01.17 09:22 Сейчас в теме
(22) поясните почему вы выбрали решение с одним узлом и как поддержка будет в этом случае контролировать обмен на скажем 500 магазинов?
24. Дмитрий Жичкин (zhichkin) 189 12.01.17 13:55 Сейчас в теме
(23) Стоп =) Конкретного решения для конкретной ситуации я не принимал =) Я дал, так скажем, дружеский совет =) Не более того =)

Если Вам нужно конкретное решение, то дайте, пожалуйста, конкретные цифры, описывающие параметры Вашей системы, описание текущих проблем, сформулируйте конкретные требования (пожелания) по решению этих проблем, целевые показатели системы, которых Вы хотели бы достигнуть. Вот тогда на основании анализа совершенно конкретных данных я смогу выбрать решение и даже обосновать его =)

По поводу 500 магазинов ... На моей практике РИБ в районе 150 узлов "умирает" или требует постоянного внимания и рукоприкладства. В таких случаях делается своя нетиповая разработка.