Команда BACKUP_TO_KAFKA
Содержание раздела
- Сценарии использования команды
- Режимы выгрузки бэкапа
- Содержимое бэкапа
- Брокеры Kafka для выгрузки бэкапа
- Аутентификация
- Как работает команда
- Использование команды
- Синтаксис
- Ограничения
- Примеры
- Варианты вывода команды
- Формат бэкапа в топике Kafka
Команда BACKUP_TO_KAFKA
выгружает бэкап логической базы данных в топик Kafka.
Выгружаемый бэкап содержит логическую схему данных и данные. Выгрузить данные можно с полной историей их изменений, частичной историей или без истории.
Из целостного бэкапа можно развернуть резервную копию логической БД или восстановить сбойную логическую БД, используя команду RESTORE_FROM_KAFKA.
Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).
Бэкап не является полным слепком исходной логической БД, так как не включает standalone-таблицы и внешние таблицы, а также в некоторых случаях данные прокси-таблиц.
Сценарии использования команды
Доступные сценарии использования команды:
- бэкап текущего состояния логической БД:
- без истории изменений данных,
- с отрезком истории изменений данных указанной глубины,
- с полной историей изменений данных с момента создания БД до настоящего момента;
- бэкап недостающего отрезка истории изменений данных логической БД.
Режимы выгрузки бэкапа
Доступные режимы выгрузки бэкапа:
- инкрементальный бэкап — бэкап логической БД с полной историей изменений данных или недостающей частью истории, выгружаемый инкрементами (порциями по дельтам);
- снапшот-бэкап — бэкап логической БД с историей изменений данных указанной глубины.
В обоих режимах бэкап выгружается только при наличии дельт в логической БД. Если в логической БД нет ни одной дельты, вы можете создать пустую дельту и затем выгрузить бэкап.
Выгрузка инкрементального бэкапа недоступна, если какой-либо из датасорсов, где раньше хранились данные логической БД, теперь отсутствует в окружении (отключен физически).
За один запуск команды можно выгрузить один инкремент, все недостающие инкременты или снапшот. Подробнее см. в секции Использование команды.
Содержимое бэкапа
Поддерживаемые сущности
Команда выгружает логическую схему данных по всем видам сущностей, для которых ведется журнал, а также данные таблиц следующих видов:
- обычных логических таблиц,
- партиций,
- (только при выгрузке снапшота) прокси-таблиц.
Данные материализованных представлений не выгружаются командой, но они восстанавливаются системой после успешного развертывания таблиц-источников командой RESTORE_FROM_KAFKA. Данные остальных сущностей, включаемых в бэкап в составе логической схемы, — партиционированных таблиц и логических представлений — не выгружаются, так как эти сущности не хранят данные.
Внешние таблицы и standalone-таблицы не включаются в бэкап. При необходимости вы можете их пересоздать после успешного развертывания логической БД из бэкапа.
Содержимое инкремента
Инкремент содержит:
- журнал изменений логической схемы данных по состоянию на момент запуска команды;
- полную историю изменений, совершенных с данными логических таблиц в дельте и в промежутке между дельтой инкремента и предыдущей к ней дельтой, включая холодные данные.
Журнал в инкременте содержит записи:
- записанные в журнал оригинальной логической БД после момента времени, зафиксированного в снапшоте, — если инкремент выгружается в топик со снапшотом;
- все записи — иначе.
Номера и метки времени дельт в инкрементах соответствуют исходной логической БД. Сохранение нумерации и меток времени операций записи не гарантируется.
Содержимое снапшота
Снапшот содержит:
- состояние логической схемы данных на момент запуска команды;
- состояние данных логических таблиц и прокси-таблиц на момент времени, указанный в команде;
- (если snapshotDepth больше 0) историю изменений, совершенных с данными логических таблиц и прокси-таблиц с указанного момента времени по последнюю закрытую дельту включительно.
Технически снапшот выгружается в топик Kafka в таком же формате, как и инкрементальный бэкап.
Сохранение номеров и меток времени дельт и операций записи в снапшоте относительно исходной логической БД не гарантируется.
Подробнее о механизме выгрузки снапшота см. в секции Как работает команда > Выгрузка снапшота.
Отличие в составе данных между инкрементом и снапшотом
Инкремент содержит набор изменений данных, а снапшот без истории — снимок состояния данных.
Пример:
- исходные данные:
- дельта 1: добавили
запись А
,запись Б
; - дельта 2: удалили
запись А
;
- дельта 1: добавили
- инкременты:
- инкремент дельты 1:
запись А
,запись Б
— обе с признаком добавления (sys_op = 0
); - инкремент дельты 2:
запись А
с признаком удаления (sys_op = 1
);
- инкремент дельты 1:
- снапшоты:
- снапшот на дельту 1:
запись А
,запись Б
; - снапшот на дельту 2:
запись Б
.
- снапшот на дельту 1:
Брокеры Kafka для выгрузки бэкапа
При запуске команды нода Prostore выгружает промежуточные данные в служебные топики, откуда утилита вычитывает эти данные, формирует бэкап и записывает его в топик, указанный в команде.
По умолчанию (если в команде не указаны дополнительные брокеры Kafka) нода и утилита работают с общим набором брокеров, заданным в конфигурации ноды с помощью параметра ZOOKEEPER_KAFKA_ADDRESS
. При необходимости как для ноды, так и для утилиты можно указать другие брокеры Kafka с помощью ключевых слов kafkaBrokersCore и kafkaBrokers.
Утилита должна иметь доступ к брокерам Kafka, где будет размещаться бэкап. Утилита и нода Prostore также должны иметь доступ к брокерам Kafka, куда будут выгружаться промежуточные данные.
Аутентификация
Команда поддерживает передачу токена (JWT) для последующей аутентификации в системе.
Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.
Как работает команда
Секция описывает механизмы выгрузки бэкапа:
- выгрузка одного инкремента,
- выгрузка инкрементов для диапазона дельт,
- выгрузка инкрементов всех или недостающих в бэкапе дельт,
- выгрузка снапшота.
Определение брокеров Kafka и выгрузка данных в топик
Оба вида бэкапа — инкременты и снапшоты — помещаются в топик Kafka в формате Avro в формате, описанном в секции Формат бэкапа в топике Kafka.
Бэкап выгружается одним или несколькими потоками — в зависимости от значения ключевого слова threadsCount. По умолчанию, если ключевое слово не указано, используется один поток.
Сначала данные, из которых формируется бэкап, выгружаются в промежуточные топики следующих брокеров Kafka:
- заданных с помощью ключевого слова kafkaBrokersCore — если команда содержит
kafkaBrokersCore
; - заданных с помощью параметра конфигурации
ZOOKEEPER_KAFKA_ADDRESS
— если команда не содержитkafkaBrokersCore
.
Затем из выгруженных данных формируется бэкап и размещается в топике следующих брокеров Kafka:
- заданных с помощью ключевого слова kafkaBrokers — если команда содержит
kafkaBrokers
; - заданных с помощью ключевого слова kafkaBrokersCore — если команда содержит
kafkaBrokersCore
и не содержитkafkaBrokers
; - заданных с помощью параметра конфигурации
ZOOKEEPER_KAFKA_ADDRESS
— если команда не содержит ниkafkaBrokers
, ниkafkaBrokersCore
.
В случае размещения бэкапа в новом топике этот топик разделяется на партиции, количество которых определяется:
- настройками Kafka — если в брокерах Kafka настроено автосоздание топиков;
- ключевым словом partitionsCount в команде — иначе.
После завершения формирования бэкапа команда удаляет промежуточные топики.
Выгрузка инкремента одной дельты
При запуске с ключевым словом deltaNum и без ключевого слова nextDeltasCount команда выгружает в топик инкремент указанной дельты.
Если целевой топик уже содержит снапшот, выгрузка инкремента той же логической БД доступна только для дельт, закрытых после зафиксированного в снапшоте состояния БД.
На рисунке ниже показан пример выгрузки инкремента дельты с номером 1. Инкремент содержит изменения данных w2
и w3
, внесенные между закрытием дельт 0 и 1, и все записи журнала по изменению схемы с с0
по с3
включительно.
Выгрузка инкрементов дельт из заданного диапазона
При запуске с ключевыми словами deltaNum и nextDeltasCount команда выгружает в топик инкременты дельт из диапазона [deltaNum, deltaNum + nextDeltasCount]
. Например, если значение deltaNum
равно 2, а значение nextDeltasCount
равно 3, выгружаются инкременты следующих дельт: 2, 3, 4, 5.
Если целевой топик уже содержит снапшот, выгрузка инкрементов той же логической БД доступна только для дельт, закрытых после зафиксированного в снапшоте состояния БД.
На рисунке ниже показан пример выгрузки инкрементов дельт из диапазона [1, 1+1]
. Инкременты содержат изменения данных w2
, w3
, w4
и w5
, и все записи журнала по изменению схемы с с0
по с3
включительно.
Выгрузка инкрементов всех дельт
При запуске без ключевых слов deltaNum и snapshotDepth команда выгружает в топик инкременты следующих дельт логической БД:
- всех закрытых дельт — если в топике, куда выгружаются данные, нет ни инкрементов, ни снапшота этой логической БД;
- «новых» закрытых дельт — если в топике уже есть снапшот или хотя бы один инкремент этой логической БД. Под «новыми» закрытыми дельтами понимаются дельты, закрытые после дельт, сохраненных в топике. Промежуточные дельты, если какие-то из них отсутствуют в топике, НЕ выгружаются.
На рисунке ниже показан пример выгрузки инкрементов всех дельт в новый топик. В этом случае команда выгружает все изменения данных и все записи журнала.
На рисунке ниже показан пример выгрузки инкрементов всех дельт в топик, где уже есть инкремент дельты 1 той же логической БД. В этом случае команда выгружает изменения данных w4
и w5
и все записи журнала.
Выгрузка снапшота
При запуске с ключевым словом snapshotDepth команда выгружает в топик снапшот логической БД, зафиксированный по состоянию на указанный момент времени. Если значение snapshotDepth
больше 0, в топик также выгружается история изменений данных с момента времени, зафиксированного в снапшоте, по последнюю закрытую дельту включительно.
На рисунке ниже показан пример выгрузки снапшота без истории. В этом случае команда выгружает состояние данных на дельту 2 и журнал с записями, отражающими состояние сущностей на момент запуска команды.
На рисунке ниже показан пример выгрузки снапшота с небольшим отрезком истории изменений. В этом случае команда выгружает состояние данных на дельту 1, изменения после этой дельты (w4 и w5), а также журнал с записями, отражающими состояние сущностей на момент запуска команды.
Если момент времени задан в команде как несколько дней, часов, минут или секунд назад, снапшот фиксируется по состоянию на последнюю закрытую на тот момент дельту. Например, если команда запускается с ключом --snapshotDepth 2d
12 января 2024 года в 10:08:14, снапшот выгружается по состоянию на дельту, которая была последней два дня назад (10 января 2024 года в 10:08:14).
Подробнее о возможных значениях ключевого слова см. в описании snapshotDepth.
Нумерация дельт в выгружаемом снапшоте сдвигается относительно исходной логической БД: снапшот сохраняется как начальное состояние (дельта 0), а последующие дельты получают номера, следующие по порядку, — 1, 2 и т.д. в зависимости от значения snapshotDepth
.
Нумерация записей в журнале снапшота также отличается от исходной логической БД: состояние логической схемы данных на момент запуска команды фиксируется как набор операций по созданию сущностей в текущем состоянии. Журнал снапшота не включает промежуточные операции по изменению и удалению сущностей, которые могли присутствовать в журнале исходной логической БД.
Использование команды
Выгрузка инкрементов
Проверка недостающих инкрементов
Чтобы проверить список дельт, инкрементов которых не хватает в топике до полного бэкапа, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.
Выгрузка инкрементов всех дельт
Чтобы выгрузить инкременты всех или только недостающих закрытых дельт:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
без ключевых словdeltaNum
иshapshotDepth
. В команде укажите ключевое слово topic с именем топика:- в котором нет инкрементов и снапшота целевой логической БД — если нужно выгрузить полный бэкап логической БД с нулевой до последней закрытой дельты;
- в котором уже есть инкременты или снапшот целевой логической БД — если нужно обновить существующий бэкап последними изменениями логической БД.
Выгрузка инкремента одной дельты
Чтобы выгрузить инкремент одной дельты:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевым словом deltaNum, не указывая ключевое словоshapshotDepth
.
При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum
должно быть больше номера дельты, по состоянию на которую зафиксирован снапшот.
Выгрузка инкрементов диапазона дельт
Чтобы выгрузить инкременты дельт из диапазона:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевыми словами deltaNum и nextDeltasCount, не указывая ключевое словоshapshotDepth
.
При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum
должно быть больше номера дельты, по состоянию на которую зафиксирован снапшот.
Выгрузка снапшота
Чтобы выгрузить снапшот логической БД:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевым словом snapshotDepth, не указывая ключевое словоdeltaNum
.
Выгружайте каждый снапшот в новый топик. Система не гарантирует корректное восстановление логической БД из топика, содержащего несколько снапшотов или снапшоты, выгруженные после инкрементов.
Снятие блокировки логической БД в случае сбоя
Если во время работы команды произошел сбой, исходная логическая БД может остаться заблокированной для изменений схемы данных. В этом случае все DDL-запросы в логической БД возвращают ошибку Change operations are forbidden
.
Чтобы снять блокировку изменений в логической БД, выполните запрос ALLOW_CHANGES, где в качестве кода-пароля в запросе укажите строку в формате backup_<имя_логической_БД>_<число_месяца_запуска_команды>
. Например, если команда была запущена для логической БД marketing
9 февраля, следует выполнить запрос ALLOW_CHANGES(marketing, 'backup_marketing_9')
.
После снятия блокировки можно запустить выгрузку бэкапа еще раз.
Синтаксис
java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
backup_to_kafka \
[--datasourceType <datasource_name>] \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--deltaNum <delta_num>] \
[–-nextDeltasCount <next_deltas_count>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--chunkSize <chunk_size>] \
[--kafkaRetries <kafka_retries>] \
[--snapshotDepth <snapshot_depth>]
Параметры
dtm_tools_file_name
-
Имя jar-файла утилиты DTM Tools.
Ключевые слова
--connect
Задает строку подключения к ноде Prostore. Значение состоит из элементов:
prostore_host
— IP-адрес или доменное имя ноды Prostore;db_name
— номер порта для подключения к ноде Prostore, равный значению параметра конфигурацииDTM_CORE_HTTP_PORT
;db_name
— имя логической БД, для которой нужно выгрузить бэкап;- (если включена аутентификация запросов) строка
?user=jwt&password=<jwt_token>
, гдеjwt_token
— авторизационный токен.
--datasourceType
Задает имя датасорса-источника, из которого выгружаются данные. Значение должно быть указано без кавычек и соответствовать имени датасорса, заданному в конфигурации.
Если ключевое слово не указано, данные каждой таблицы выгружаются из наиболее оптимального датасорса. Запрос на выгрузку бэкапа относится к категории «Реляционный».
--topic
Задает имя топика Kafka, куда выгружаются бэкап.
--kafkaBrokersCore
Задает адреса брокеров Kafka, куда нода Prostore выгружает промежуточные данные и откуда утилита их вычитывает для формирования бэкапа. Также, если в команде отсутствует ключевое слово kafkaBrokers, ключевое слово kafkaBrokersCore
определяет брокеров, где размещается топик с бэкапом.
Если ключевое слово не указано, утилита запрашивает список адресов брокеров из конфигурации ноды Prostore.
Значение указывается в формате host1:port1,host2:port2,..hostN:portN
. Возможные варианты:
- список брокеров Kafka, полностью соответствующий конфигурации ноды Prostore, — укажите этот вариант, если сервисная БД в ZooKeeper недоступна для подключения со стороны утилиты. Также можно указать это значение, чтобы ускорить работу команды, так при его наличии утилита пропускает этап обращения к сервисной БД за списком брокеров;
- произвольный список брокеров Kafka (не соответствующий конфигурации ноды) — укажите этот вариант, если нода должна выгружать данные в брокеры Kafka, отличные от заданных в конфигурации ноды.
На рисунке ниже показана схема работы утилиты и ноды Prostore с общим набором брокеров, который отличается от сконфигурированных для ноды.
--kafkaBrokers
Задает адреса брокеров Kafka, куда выгружается бэкап, в формате host1:port1,host2:port2,..hostN:portN
. Если ключевое слово отсутствует, бэкап выгружается в топик брокеров, указанных с помощью ключевого слова kafkaBrokersCore (если оно указано) или заданных в конфигурации Prostore (если ключевое слово kafkaBrokersCore
не указано).
Ключевое слово нужно указывать, если требуется выгрузить бэкап в топик брокеров Kafka, отличных от тех, с которыми работает Prostore.
На рисунке ниже показана схема работы утилиты с двумя наборами брокеров Kafka: один набор брокеров используется утилитой и системой Prostore, второй — только утилитой. Первый набор брокеров находится в одной сети с Prostore, второй — может находиться в той же или другой сети (например, в удаленном дата-центре).
--deltaNum
Задает номер дельты delta_num
, инкремент которой выгружается в топик, и может комбинироваться с ключевым словом nextDeltasCount для указания левой границы диапазона выгружаемых инкрементов.
Если ключевое слово не указано, выгружаются инкременты всех «новых» дельт — дельт, закрытых после последней дельты, уже зафиксированной в бэкапе топика в виде инкремента или снапшота.
При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum
должно быть больше номера дельты, по состоянию на которую был зафиксирован снапшот. Например, если снапшот был зафиксирован на момент дельты 5, можно выгрузить инкремент дельты 6, 7 и т.д., но не 5 или 4.
--nextDeltasCount
Задает количество дельт, выгружаемых после дельты delta_num
, и используется для выгрузки инкрементов диапазона дельт.
--partitionsCount
Задает количество партиций, создаваемых в топике Kafka при выгрузке бэкапа в новый топик. Разделение топика на партиции позволяет ускорить выгрузку бэкапа за счет параллельных процессов выгрузки. Команда учитывает значение, если сама создает топик, то есть если в брокерах Kafka отключено автосоздание топиков.
Если ключевое слово не указано, все данные выгружаются в одну партицию.
--threadsCount
Задает количество параллельных потоков выгрузки бэкапа и, тем самым, позволяет подобрать оптимальную скорость выгрузки в зависимости от производительности инфраструктуры.
Если ключевое слово не указано, все данные выгружаются одним потоком.
--chunkSize
Задает максимальное количество записей, сохраняемых в одном сообщении Kafka.
Если ключевое слово не указано, в сообщение записывается до 1000 записей.
--kafkaRetries
Задает количество попыток выгрузки бэкапа в топик, которое команда делает при запуске. Если ключевое слово не указано, делается 10 попыток.
--snapshotDepth
Задает момент времени, по состоянию на который выгружается снапшот. Момент времени определяется как точка в истории изменений данных логической БД, отсчитываемая назад во времени от момента закрытия последней дельты на указанное количество дельт, дней, часов, минут или секунд.
Возможные значения (где X
— положительное число):
X
— X дельт,Xs
— X секунд,Xm
— X минут,Xh
— X часов,Xd
— X дней.
Снапшот должен выгружаться в пустой топик.
Если ключевое слово не указано, выгружается инкрементальный бэкап.
Ограничения
Общие ограничения
- Запуск команды недоступен, если в исходной логической БД есть незавершенные операции по изменению схемы данных.
- Бэкап выгружается по дельтам; при отсутствии дельт в логической БД выгрузка бэкапа невозможна.
- Во время работы команды недоступно изменение схемы данных логической БД, для которой выгружается бэкап.
- Команда не проверяет полноту журнала логической БД.
- Standalone-таблицы и внешние таблицы не включаются в бэкап.
- Значение ключевого слова partitions_count игнорируется, если в брокерах Kafka включено автосоздание топиков или бэкап выгружается в существующий топик.
Ограничения инкрементального бэкапа
- Выгрузка инкрементального бэкапа недоступна, если отсутствует хотя бы один из датасорсов, где раньше хранились данные логической БД.
- Выгрузка инкрементов запускается только при наличии в логической БД более новых закрытых дельт, чем те, которые уже хранятся в топике.
- Данные прокси-таблиц не включаются в инкрементальный бэкап.
- При выгрузке отдельных инкрементов недоступна выгрузка инкрементов тех дельт, которые уже зафиксированы в целевом топике в составе снапшота.
Ограничения снапшота
- Выгрузка снапшота должна выполняться в пустой топик.
Примеры
Выгрузка инкрементов всех дельт
Выгрузка инкрементов всех дельт логической БД marketing
из ADQM с выгрузкой в два потока и распределением на три партиции:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2
Выгрузка инкрементов с указанием токена:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2
Выгрузка инкремента одной дельты
Выгрузка инкремента дельты 12
логической БД marketing
с указанием списка брокеров Kafka:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --kafkaRetries 5
Выгрузка инкремента диапазона дельты
Выгрузка инкрементов дельт с 12 по 17 логической БД marketing
с указанием списка брокеров Kafka:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --nextDeltasCount 5 --kafkaRetries 5
Выгрузка снапшота с историей
Выгрузка снапшота логической БД marketing
с историей изменений данных за две дельты:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 2
Выгрузка снапшота логической БД marketing
с историей изменений данных за 10 дней:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 10d
Выгрузка снапшота без истории
Выгрузка снапшота логической БД marketing
без истории изменений данных:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0
Выгрузка снапшота и инкрементов новых дельт после снятия снапшота
Выгрузка снапшота логической БД marketing
без истории изменений данных:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0
Выгрузка инкрементов всех новых дельт логической БД marketing
, закрытых после выгрузки снапшота:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --partitionsCount 3 --threadsCount 2
Пошаговый пример выгрузки бэкапа
Подготовка логической БД
Создание и наполнение тестовой логической базы данных sourcedb
:
CREATE DATABASE sourcedb;
USE sourcedb;
CREATE TABLE sourcedb.table1_adqm
(
id bigint not null,
boolean_col boolean,
int32_col int32,
int_col int,
bigint_col bigint,
float_col float,
double_col double,
char_col char(10),
varchar_col varchar,
uuid_col uuid,
link_col link,
date_col date,
time_col time,
timestamp_col timestamp,
PRIMARY KEY (id)
)
DISTRIBUTED BY (id)
DATASOURCE_TYPE ('adqm');
BEGIN DELTA;
INSERT INTO sourcedb.table1_adqm
VALUES (1, false, 32768, 65537, 1280000, 3.141526, 2.718281828,
'char_value', 'varchar_value','12345678-1234-abcd-ef00-1234567899ab',
'https://www.google.com', '1970-01-11', '13:10:30', '1970-01-01 13:10:30');
COMMIT DELTA;
Выгрузка полного бэкапа логической БД
Выгрузка полного инкрементального бэкапа логической БД sourcedb
из датасорса adqm
с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090
:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb
Инкрементальный бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb
.
Выгрузка снапшот-бэкапа логической БД sourcedb
из датасорса adqm
с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090
(без сохранения истории изменений):
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb_snapshot –-snapshotDepth 0
Снапшот-бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb_snapshot
.
Удаление тестовой логической БД
Опционально после окончания выгрузки бэкапа можно удалить исходную базу данных, чтобы протестировать ее последующее восстановление из бэкапа, сохраненного в топиках backup_topic_sourcedb
и backup_topic_sourcedb_snapshot
.
DROP DATABASE sourcedb
Пошаговый пример, продолжающий этот пример, см. в разделе Команда RESTORE_FROM_KAFKA > Пошаговый пример развертывания логической БД из бэкапа.
Варианты вывода команды
Вывод при успешной выгрузке бэкапа
Ниже показан примерный вид вывода команды при успешной выгрузке бэкапа (набор сообщений и их текст могут изменяться от версии к версии):
...
Started BACKUP process with params ...
...
Deny changes in datamart [<db_name>] with code [backup_<db_name>_<date_of_the_month>]
Delta range ... to backup
Got changes for datamart <db_name>
Tables received for datamart ...
Started backup delta processing
...
Got delta date for datamart ..., delta num ...
...
Started backup delta table processing
...
Topic ... with ... partitions successfully created
...
Successfully created download ext table ...
...
Data inserted into kafka ...
...
Successfully dropped download ext table ...
...
Successfully expload delta table to kafka ...
...
Started archiving delta table
...
Polled message 0 from source topic ...
Trying to send data message ... to topic [<topic_defined_in_the_command>] ...
...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
Topic ... successfully deleted
Start archiving delta changelog ... into topic <topic_defined_in_the_command>
Trying to send changelog message ... to topic [<topic_defined_in_the_command>] ...
Successfully archived delta changelog into topic <topic_defined_in_the_command>
...
Allow changes in datamart <db_name>
BACKUP is done
Вывод при неуспешной выгрузке бэкапа
При неуспешной выгрузке бэкапа команда выводит текст ошибки.
Формат бэкапа в топике Kafka
Бэкап выгружается в топик в виде сообщений Kafka.
Каждое сообщение состоит из ключа и тела и содержит данные таблицы или журнала. Ключ сообщения состоит из схемы данных Avro и одной записи Avro и содержит метаинформацию инкремента в формате, описанном ниже в секции Формат ключа сообщения. Тело сообщения представляет собой файл Avro (Object Container File), состоящий из заголовка со схемой данных Avro и записей Avro.
Схема данных в ключе сообщения одинакова для всех сообщений, а схема данных в теле сообщения зависит от типа сообщения (см. рисунок ниже).
Для наглядности все бинарные данные Avro на рисунке и в примерах ниже представлены в JSON-формате, а не в бинарном виде.
На рисунке ниже показаны составные части сообщения Kafka, содержащего данные бэкапа.
Формат ключа сообщения
Схема Avro в ключе сообщения
Схема Avro в ключе сообщения имеет формат:
{
"name": "topicA",
"type": "record",
"fields": [
{
"name": "incrementId",
"type": "string"
},
{
"name": "messageType",
"type": "long"
},
{
"name": "datamart",
"type": "string"
},
{
"name": "deltaNum",
"type": "long"
},
{
"name": "deltaDate",
"type": "long",
"logicalType": "timestamp-micros"
},
{
"name": "snapshotChangelogLastNum",
"type": "long"
},
{
"name": "snapshotDeltaStartsFrom",
"type": "long"
},
{
"name": "snapshotChangelogStartsFrom",
"type": "long"
},
{
"name": "tableName",
"type": [
"null",
"string"
]
},
{
"name": "streamNumber",
"type": "long"
},
{
"name": "streamTotal",
"type": "long"
},
{
"name": "chunkNumber",
"type": "long"
},
{
"name": "isLastChunk",
"type": "boolean"
}
]
}
Параметры:
incrementId
-
Идентификатор инкремента. Представляет собой UUID-значение без дефисов и символов подчеркивания.
Параметр используется и для инкрементального бэкапа, и для снапшота, так как оба вида бэкапа выгружаются в одинаковом формате. Все сообщения с данными таблиц и журнала, составляющие один инкремент, имеют один идентификаторincrementId
. messageType
-
Тип сообщения. Возможные значения:
- 0 — сообщение с данными логической таблицы или прокси-таблицы,
- 1 — сообщение с данными журнала.
datamart
-
Имя логической базы данных, к которой относится инкремент.
deltaNum
-
Номер дельты, к которой относятся данные.
deltaDate
-
Дата и время закрытия дельты
deltaNum
в виде целого числа. snapshotChangelogLastNum
-
Номер последней записи в журнале исходной логической БД на момент выгрузки снапшота. В случае инкрементального бэкапа параметр всегда имеет значение
null
. snapshotDeltaStartsFrom
-
Номер дельты, по состоянию на которую зафиксирован снапшот. В случае инкрементального бэкапа параметр всегда имеет значение
null
. snapshotChangelogStartsFrom
-
Количество записей журнала в исходной логической БД, пропущенных при выгрузке журнала в снапшот. Разница в количестве записей вызвана тем, что в журнал снапшота сохраняются записи, отражающие текущее состояние логической схемы без промежуточных записей об изменении и удалении сущностей.
В случае инкрементального бэкапа параметр всегда имеет значениеnull
. tableName
-
Имя таблицы, из которой выгружены данные. В сообщениях с данными журнала поле имеет значение
null
. streamNumber
-
Номер потока, записавшего сообщение. Потоки нумеруются, начиная с 1.
streamTotal
-
Общее число потоков записи сообщений.
chunkNumber
-
Номер сообщения среди всех сообщений потока
streamNumber
. Сообщения нумеруются, начиная с 1. isLastChunk
-
Признак последнего сообщения в потоке
streamNumber
. Возможные значения:true
,false
.
Примеры записей в ключе сообщения
Пример записи Avro в ключе сообщения, содержащего данные таблицы и записанного при выгрузке инкрементального бэкапа:
{
"incrementId": "188d6862af17425e8cfd4451aa9b5c03",
"messageType": 0,
"datamart": "marketing",
"deltaNum": 55,
"deltaDate": 1638360336000000,
"snapshotChangelogLastNum": null,
"snapshotDeltaStartsFrom": null,
"snapshotChangelogStartsFrom": null,
"tableName": "stores",
"streamNumber": 1,
"streamTotal": 2,
"chunkNumber": 4,
"isLastChunk": false
}
Пример записи Avro в ключе сообщения, содержащего данные таблицы и записанного при выгрузке снапшота:
{
"incrementId": "71eb686d3f3649538e76f68f230de685",
"messageType": 0,
"datamart": "marketing",
"deltaNum": 55,
"deltaDate": 1638360336000000,
"snapshotChangelogLastNum": null,
"snapshotDeltaStartsFrom": null,
"snapshotChangelogStartsFrom": null,
"tableName": "stores",
"streamNumber": 1,
"streamTotal": 2,
"chunkNumber": 4,
"isLastChunk": false
}
Пример записи Avro в ключе сообщения, содержащего данные журнала и записанного при выгрузке снапшота:
{
"incrementId": "71eb686d3f3649538e76f68f230de685",
"messageType": 0,
"datamart": "marketing",
"deltaNum": 55,
"deltaDate": 1638360336000000,
"snapshotChangelogLastNum": 45,
"snapshotDeltaStartsFrom": 20,
"snapshotChangelogStartsFrom": 32,
"tableName": "stores",
"streamNumber": 1,
"streamTotal": 2,
"chunkNumber": 4,
"isLastChunk": false
}
Формат тела сообщения
Состав схемы данных Avro в теле сообщения зависит от типа сообщения:
- все сообщения с данными журнала имеют одинаковую схему данных,
- сообщение с данными таблицы имеет схему данных, соответствующую структуре этой таблицы.
Схема Avro в теле сообщения с данными журнала
Схема Avro в сообщениях, содержащих данные журнала, имеет вид:
{
"type": "record",
"name": "changeLog",
"fields": [
{
"name": "changeNum",
"type": "long"
},
{
"name": "entityName",
"type": "string"
},
{
"name": "changeQuery",
"type": "string"
}
]
}
Параметры:
changeNum
-
Порядковый номер изменения в журнале.
entityName
-
Имя логической сущности, к которой относится изменение.
changeQuery
-
Запрос на создание, удаление или изменение логической сущности.
Пример схемы Avro в теле сообщения с данными таблицы
Пример схемы Avro в сообщении, содержащем данные логической таблицы:
{
"type": "record",
"name": "row",
"fields": [
{
"name": "id",
"type": ["null","long"],
"doc": ""
},
{
"name": "transaction_date",
"type": ["null","long"],
"doc": ""
},
{
"name": "product_code",
"type": ["null","string"],
"doc": ""
},
{
"name": "product_units",
"type": ["null","long"],
"doc": ""
},
{
"name": "store_id",
"type": ["null","long"],
"doc": ""
},
{
"name": "description",
"type": ["null","string"],
"doc": ""
},
{
"name": "sys_op",
"type": ["null","int"],
"doc": ""
}
]
}
Примеры записей в теле сообщения
Пример записей Avro в сообщении с данными журнала:
[
{
"changeNum": 0,
"entityName": "marketing",
"changeQuery": "CREATE TABLE marketing.sales (id INT NOT NULL, transaction_date TIMESTAMP NOT NULL, product_code VARCHAR(256) NOT NULL, product_units INT NOT NULL, store_id INT NOT NULL, description VARCHAR(256), PRIMARY KEY (id)) DISTRIBUTED BY (id))"
},
{
"changeNum": 1,
"entityName": "stores_by_sold_products",
"changeQuery": "CREATE VIEW marketing.stores_by_sold_products AS SELECT store_id, SUM(product_units) AS product_amount FROM marketing.sales GROUP BY store_id ORDER BY product_amount DESC LIMIT 20"
},
{
"changeNum": 2,
"entityName": "stores_by_sold_products",
"changeQuery": "DROP VIEW marketing.stores_by_sold_products"
}
]
Пример записей Avro в сообщении с данными логической таблицы:
[
{
"id": 1000111,
"transaction_date": 1614269474000000,
"product_code": "ABC102101",
"product_units": 2,
"store_id": 1000012345,
"description": "Покупка по акции 1+1",
"sys_op": 0
},
{
"id": 1000112,
"transaction_date": 1614334214000000,
"product_code": "ABC102001",
"product_units": 1,
"store_id": 1000000123,
"description": "Покупка без акций",
"sys_op": 1
}
]