Команда BACKUP_TO_KAFKA

Содержание раздела
  1. Сценарии использования команды
  2. Топик Kafka для выгрузки бэкапа
  3. Режимы выгрузки бэкапа
  4. Содержимое бэкапа
    1. Поддерживаемые сущности
    2. Содержимое инкремента
    3. Содержимое снапшота
    4. Отличие в составе данных между инкрементом и снапшотом
  5. Как работает команда
    1. Определение целевого топика Kafka
    2. Выгрузка инкремента одной дельты
    3. Выгрузка инкрементов дельт из заданного диапазона
    4. Выгрузка инкрементов всех дельт
    5. Выгрузка снапшота
  6. Использование команды
    1. Выгрузка инкрементов
      1. Проверка недостающих инкрементов
      2. Выгрузка инкрементов всех дельт
      3. Выгрузка инкремента одной дельты
      4. Выгрузка инкрементов диапазона дельт
    2. Выгрузка снапшота
    3. Снятие блокировки логической БД в случае сбоя
  7. Аутентификация
  8. Синтаксис
    1. Параметры
    2. Ключевые слова
      1. --connect
      2. --datasourceType
      3. --topic
      4. --kafkaBrokersCore
      5. --kafkaBrokers
      6. --deltaNum
      7. --nextDeltasCount
      8. --partitionsCount
      9. --threadsCount
      10. --chunkSize
      11. --kafkaRetries
      12. --snapshotDepth
  9. Ограничения
    1. Общие ограничения
    2. Ограничения инкрементального бэкапа
    3. Ограничения снапшота
  10. Примеры
    1. Выгрузка инкрементов всех дельт
    2. Выгрузка инкремента одной дельты
    3. Выгрузка инкремента диапазона дельты
    4. Выгрузка снапшота с историей
    5. Выгрузка снапшота без истории
    6. Выгрузка снапшота и инкрементов новых дельт после снятия снапшота
    7. Пошаговый пример выгрузки бэкапа
      1. Подготовка логической БД
      2. Выгрузка полного бэкапа логической БД
      3. Удаление тестовой логической БД
  11. Варианты вывода команды
    1. Вывод при успешной выгрузке бэкапа
    2. Вывод при неуспешной выгрузке бэкапа
  12. Формат бэкапа в топике Kafka
    1. Формат ключа сообщения
      1. Схема Avro в ключе сообщения
      2. Примеры записей в ключе сообщения
    2. Формат тела сообщения
      1. Схема Avro в теле сообщения с данными журнала
      2. Пример схемы Avro в теле сообщения с данными таблицы
      3. Примеры записей в теле сообщения

Команда BACKUP_TO_KAFKA выгружает бэкап логической базы данных в топик Kafka.

Выгружаемый бэкап содержит логическую схему данных и данные. Выгрузить данные можно с полной историей их изменений, частичной историей или без истории.

Из целостного бэкапа можно развернуть резервную копию логической БД или восстановить сбойную логическую БД, используя команду RESTORE_FROM_KAFKA.

Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).

Бэкап не является полным слепком исходной логической БД, так как не включает standalone-таблицы и внешние таблицы, а также в некоторых случаях данные прокси-таблиц.

Сценарии использования команды

Доступны следующие сценарии использования команды:

  • бэкап текущего состояния логической БД:
    • без истории изменений данных,
    • с отрезком истории изменений данных указанной глубины,
    • с полной историей изменений данных с момента создания БД до настоящего момента;
  • бэкап недостающего отрезка истории изменений данных логической БД.

Топик Kafka для выгрузки бэкапа

По умолчанию команда выгружает бэкап в топик брокера сообщений Kafka, заданного с помощью ключевого слова kafkaBrokersCore команды и (или) с помощью параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS.

Чтобы выгрузить бэкап в другой брокер сообщений, укажите в команде ключевое слово kafkaBrokers с адресом нужного брокера.

Режимы выгрузки бэкапа

Доступные режимы выгрузки бэкапа:

  • инкрементальный бэкап — бэкап логической БД с полной историей изменений данных или недостающей частью истории, выгружаемый инкрементами (порциями по дельтам);
  • снапшот-бэкап — бэкап логической БД с историей изменений данных указанной глубины.

В обоих режимах бэкап выгружается только при наличии дельт в логической БД. Если в логической БД нет ни одной дельты, вы можете создать пустую дельту и затем выгрузить бэкап.

За один запуск команды можно выгрузить один инкремент, все недостающие инкременты или снапшот. Подробнее см. в секции Использование команды.

Содержимое бэкапа

Поддерживаемые сущности

Команда выгружает логическую схему данных по всем видам сущностей, для которых ведется журнал, а также данные таблиц следующих видов:

Данные материализованных представлений не выгружаются командой, но они автоматически восстанавливаются системой после успешного развертывания таблиц-источников командой RESTORE_FROM_KAFKA. Данные остальных сущностей, включаемых в бэкап в составе логической схемы, — партиционированных таблиц и логических представлений — не выгружаются, так как эти сущности не хранят данные.

Внешние таблицы и standalone-таблицы не включаются в бэкап. При необходимости вы можете их пересоздать после успешного развертывания логической БД из бэкапа.

Содержимое инкремента

Инкремент содержит:

  • журнал изменений логической схемы данных по состоянию на момент запуска команды;
  • полную историю изменений, совершенных с данными логических таблиц в дельте и в промежутке между дельтой инкремента и предыдущей к ней дельтой, включая холодные данные.

Журнал включается в инкремент в следующем объеме:

  • если инкремент выгружается в топик, который содержит снапшот, в инкремент включаются записи журнала, записанные после момента, который зафиксирован в снапшоте;
  • иначе в инкремент включается полный журнал.

Номера и метки времени дельт в инкрементах соответствуют исходной логической БД. Сохранение нумерации и меток времени операций записи не гарантируется.

Содержимое снапшота

Снапшот содержит:

  • состояние логической схемы данных на момент запуска команды;
  • состояние данных логических таблиц и прокси-таблиц на момент времени, указанный в команде;
  • (если snapshotDepth больше 0) историю изменений, совершенных с данными логических таблиц и прокси-таблиц с указанного момента времени по последнюю закрытую дельту включительно.

Технически снапшот выгружается в топик Kafka в таком же формате, как и инкрементальный бэкап.

Сохранение номеров и меток времени дельт и операций записи в снапшоте относительно исходной логической БД не гарантируется.

Подробнее о механизме выгрузки снапшота см. в секции Как работает команда > Выгрузка снапшота.

Отличие в составе данных между инкрементом и снапшотом

Инкремент содержит набор изменений данных, а снапшот без истории — снимок состояния данных.

Пример:

  • исходные данные:
    • дельта 1: добавили запись А, запись Б;
    • дельта 2: удалили запись А;
  • инкременты:
    • инкремент дельты 1: запись А, запись Б — обе с признаком добавления (sys_op = 0);
    • инкремент дельты 2: запись А с признаком удаления (sys_op = 1);
  • снапшоты:
    • снапшот на дельту 1: запись А, запись Б;
    • снапшот на дельту 2: запись Б.

Как работает команда

Секция описывает следующие механизмы выгрузки бэкапа:

Определение целевого топика Kafka

Оба вида бэкапа — инкременты и снапшоты — выгружаются в топик Kafka в формате Avro и имеют структуру, описанную в секции Формат бэкапа в топике Kafka.

Бэкап выгружается в топик следующего брокера Kafka:

  • заданного с помощью ключевого слова kafkaBrokers — если команда содержит kafkaBrokers;
  • заданного с помощью ключевого слова kafkaBrokersCore — если команда содержит kafkaBrokersCore и не содержит kafkaBrokers;
  • заданного с помощью параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS — если команда не содержит ни kafkaBrokers, ни kafkaBrokersCore.

Команда выгружает бэкап одним или несколькими потоками — в зависимости от значения ключевого слова threadsCount. По умолчанию, если ключевое слово не указано, используется один поток.

Новый топик при первой выгрузке бэкапа разделяется на заданное количество партиций. Количество партиций в топике определяется:

  • настройками Kafka — если в брокере сообщений Kafka настроено автосоздание топиков;
  • ключевым словом partitionsCount, заданным в команде, — если автосоздание топиков отключено.

Выгрузка инкремента одной дельты

При запуске с ключевым словом deltaNum и без ключевого слова nextDeltasCount команда выгружает в топик инкремент указанной дельты.

Если целевой топик уже содержит снапшот, выгрузка инкремента той же логической БД доступна только для дельт, закрытых после зафиксированного в снапшоте состояния БД.

На рисунке ниже показан пример выгрузки инкремента дельты с номером 1. Инкремент содержит изменения данных w2 и w3, внесенные между закрытием дельт 0 и 1, и все записи журнала по изменению схемы с с0 по с3 включительно.

Схема выгрузки инкремента одной дельты

Выгрузка инкрементов дельт из заданного диапазона

При запуске с ключевыми словами deltaNum и nextDeltasCount команда выгружает в топик инкременты дельт из диапазона [deltaNum, deltaNum + nextDeltasCount]. Например, если значение deltaNum равно 2, а значение nextDeltasCount равно 3, выгружаются инкременты следующих дельт: 2, 3, 4, 5.

Если целевой топик уже содержит снапшот, выгрузка инкрементов той же логической БД доступна только для дельт, закрытых после зафиксированного в снапшоте состояния БД.

На рисунке ниже показан пример выгрузки инкрементов дельт из диапазона [1, 1+1]. Инкременты содержат изменения данных w2, w3, w4 и w5, и все записи журнала по изменению схемы с с0 по с3 включительно.

Схема выгрузки инкрементов дельт из диапазона

Выгрузка инкрементов всех дельт

При запуске без ключевых слов deltaNum и snapshotDepth команда выгружает в топик инкременты следующих дельт логической БД:

  • всех закрытых дельт — если в топике, куда выгружаются данные, нет ни инкрементов, ни снапшота этой логической БД;
  • «новых» закрытых дельт — если в топике уже есть снапшот или хотя бы один инкремент этой логической БД. Под «новыми» закрытыми дельтами понимаются дельты, закрытые после дельт, сохраненных в топике. Промежуточные дельты, если какие-то из них отсутствуют в топике, НЕ выгружаются.

На рисунке ниже показан пример выгрузки инкрементов всех дельт в новый топик. В этом случае команда выгружает все изменения данных и все записи журнала.

Схема выгрузки инкрементов всех дельт

На рисунке ниже показан пример выгрузки инкрементов всех дельт в топик, где уже есть инкремент дельты 1 той же логической БД. В этом случае команда выгружает изменения данных w4 и w5 и все записи журнала.

Схема выгрузки инкрементов новых дельт

Выгрузка снапшота

При запуске с ключевым словом snapshotDepth команда выгружает в топик снапшот логической БД, зафиксированный по состоянию на указанный момент времени. Если значение snapshotDepth больше 0, в топик также выгружается история изменений данных с момента времени, зафиксированного в снапшоте, по последнюю закрытую дельту включительно.

На рисунке ниже показан пример выгрузки снапшота без истории. В этом случае команда выгружает состояние данных на дельту 2 и журнал с записями, отражающими состояние сущностей на момент запуска команды.

Схема выгрузки снапшота на текущий момент без истории

На рисунке ниже показан пример выгрузки снапшота с небольшим отрезком истории изменений. В этом случае команда выгружает состояние данных на дельту 1, изменения после этой дельты (w4 и w5), а также журнал с записями, отражающими состояние сущностей на момент запуска команды.

Схема выгрузки снапшота на момент предпоследней дельты с историей изменений

Если момент времени задан в команде как несколько дней, часов, минут или секунд назад, снапшот фиксируется по состоянию на последнюю закрытую на тот момент дельту. Например, если команда запускается с ключом --snapshotDepth 2d 12 января 2024 года в 10:08:14, снапшот выгружается по состоянию на дельту, которая была последней два дня назад (10 января 2024 года в 10:08:14).

Подробнее о возможных значениях ключевого слова см. в описании snapshotDepth.

Нумерация дельт в выгружаемом снапшоте сдвигается относительно исходной логической БД: снапшот сохраняется как начальное состояние (дельта 0), а последующие дельты получают номера, следующие по порядку, — 1, 2 и т.д. в зависимости от значения snapshotDepth.

Нумерация записей в журнале снапшота также отличается от исходной логической БД: состояние логической схемы данных на момент запуска команды фиксируется как набор операций по созданию сущностей в текущем состоянии. Журнал снапшота не включает промежуточные операции по изменению и удалению сущностей, которые могли присутствовать в журнале исходной логической БД.

Использование команды

Выгрузка инкрементов

Проверка недостающих инкрементов

Чтобы проверить список дельт, инкрементов которых не хватает в топике до полного бэкапа, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.

Выгрузка инкрементов всех дельт

Чтобы выгрузить инкременты всех или только недостающих закрытых дельт:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA без ключевых слов deltaNum и shapshotDepth. В команде укажите ключевое слово topic со следующим значением:
    • если нужно выгрузить полный бэкап с нулевой до последней закрытой дельты, укажите топик, в который раньше не выгружались ни инкременты, ни снапшот этой логической БД. При этом топик может содержать данные других логических БД;
    • если нужно обновить существующий бэкап последними изменениями логической БД, укажите топик, содержащий инкременты или снапшот этой логической БД.

Выгрузка инкремента одной дельты

Чтобы выгрузить инкремент одной дельты:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевым словом deltaNum, не указывая ключевое слово shapshotDepth.

При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum должно быть больше номера дельты, по состоянию на которую зафиксирован снапшот.

Выгрузка инкрементов диапазона дельт

Чтобы выгрузить инкременты дельт из диапазона:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевыми словами deltaNum и nextDeltasCount, не указывая ключевое слово shapshotDepth.

При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum должно быть больше номера дельты, по состоянию на которую зафиксирован снапшот.

Выгрузка снапшота

Чтобы выгрузить снапшот логической БД:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевым словом snapshotDepth, не указывая ключевое слово deltaNum.

Выгружайте каждый снапшот в новый топик. Система не гарантирует корректное восстановление логической БД из топика, содержащего несколько снапшотов или снапшоты, выгруженные после инкрементов.

Снятие блокировки логической БД в случае сбоя

Если во время работы команды произошел сбой, исходная логическая БД может остаться заблокированной для изменений схемы данных. В этом случае все DDL-запросы в логической БД возвращают ошибку Change operations are forbidden.

Чтобы снять блокировку изменений в логической БД, выполните запрос ALLOW_CHANGES, где в качестве кода-пароля в запросе укажите строку в формате backup_<имя_логической_БД>_<число_месяца_запуска_команды>. Например, если команда была запущена для логической БД marketing 9 февраля, следует выполнить запрос ALLOW_CHANGES(marketing, 'backup_marketing_9').

После снятия блокировки можно запустить выгрузку бэкапа еще раз.

Аутентификация

Команда поддерживает передачу токена (JWT) для последующей аутентификации в системе.

Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.

Синтаксис

java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
backup_to_kafka \
[--datasourceType <datasource_name>] \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--deltaNum <delta_num>] \
[–-nextDeltasCount <next_deltas_count>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--chunkSize <chunk_size>] \
[--kafkaRetries <kafka_retries>] \
[--snapshotDepth <snapshot_depth>]

Параметры

dtm_tools_file_name

Имя jar-файла утилиты DTM Tools.

Ключевые слова

--connect

Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:

  1. prostore_host — IP-адрес или доменное имя ноды Prostore;
  2. db_name — номер порта для подключения к ноде Prostore, равный значению параметра конфигурации DTM_CORE_HTTP_PORT;
  3. db_name — имя логической БД, для которой нужно выгрузить бэкап;
  4. (если включена аутентификация запросов) строка ?user=jwt&password=<jwt_token>, где jwt_token — авторизационный токен.

--datasourceType

Задает имя датасорса-источника, из которого выгружаются данные. Значение должно быть указано без кавычек и соответствовать имени датасорса, заданному в конфигурации.

Если ключевое слово не указано, данные каждой таблицы выгружаются из наиболее оптимального датасорса. Запрос на выгрузку бэкапа относится к категории «Реляционный».

--topic

Задает имя топика Kafka, куда выгружаются бэкап.

--kafkaBrokersCore

Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.

Ключевое слово обязательно при запуске команды в инсталляциях, где сервисная БД в ZooKeeper недоступна для подключения со стороны утилиты DTM Tools, из-за чего утилита не может запросить информацию о брокерах из конфигурации Prostore. В остальных случаях ключевое слово опционально.

Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.

--kafkaBrokers

Задает список адресов брокеров сообщений Kafka, куда выгружается бэкап, в формате host1:port1,host2:port2,..hostN:portN.

Ключевое слово нужно указывать, если требуется выгрузить бэкап в брокеры Kafka, отличные от тех, с которыми работает Prostore.

На рисунке ниже показана схема работы утилиты с двумя брокерами Kafka: один брокер используется утилитой и системой Prostore, второй — только утилитой. Первый брокер находится в одной сети с Prostore, а второй может находиться в другой сети, например, в удаленном дата-центре.

Схема работы утилиты с двумя брокерами Kafka

--deltaNum

Задает номер дельты delta_num, инкремент которой выгружается в топик, и может комбинироваться с ключевым словом nextDeltasCount для указания левой границы диапазона выгружаемых инкрементов.

Если ключевое слово не указано, выгружаются инкременты всех «новых» дельт — дельт, закрытых после последней дельты, уже зафиксированной в бэкапе топика в виде инкремента или снапшота.

При выгрузке инкремента в топик, содержащий снапшот той же логической БД, значение deltaNum должно быть больше номера дельты, по состоянию на которую был зафиксирован снапшот. Например, если снапшот был зафиксирован на момент дельты 5, можно выгрузить инкремент дельты 6, 7 и т.д., но не 5 или 4.

--nextDeltasCount

Задает количество дельт, выгружаемых после дельты delta_num, и используется для выгрузки инкрементов диапазона дельт.

--partitionsCount

Задает количество партиций, создаваемых в топике Kafka при выгрузке бэкапа в новый топик. Разделение топика на партиции позволяет ускорить выгрузку бэкапа за счет параллельных процессов выгрузки. Команда учитывает значение, если сама создает топик, то есть если в брокере Kafka отключено автосоздание топиков.

Если ключевое слово не указано, все данные выгружаются в одну партицию.

--threadsCount

Задает количество параллельных потоков выгрузки бэкапа и, тем самым, позволяет подобрать оптимальную скорость выгрузки в зависимости от производительности инфраструктуры.

Если ключевое слово не указано, все данные выгружаются одним потоком.

--chunkSize

Задает максимальное количество записей, сохраняемых в одном сообщении Kafka.

Если ключевое слово не указано, в сообщение записывается до 1000 записей.

--kafkaRetries

Задает количество попыток выгрузки бэкапа в топик, которое команда делает при запуске. Если ключевое слово не указано, делается 10 попыток.

--snapshotDepth

Задает момент времени, по состоянию на который выгружается снапшот. Момент времени определяется как точка в истории изменений данных логической БД, отсчитываемая назад во времени от момента закрытия последней дельты на указанное количество дельт, дней, часов, минут или секунд.

Возможные значения (где X — положительное число):

  • X — X дельт,
  • Xs — X секунд,
  • Xm — X минут,
  • Xh — X часов,
  • Xd — X дней.

Снапшот должен выгружаться в пустой топик.

Если ключевое слово не указано, выгружается инкрементальный бэкап.

Ограничения

Общие ограничения

  • Запуск команды недоступен, если в исходной логической БД есть незавершенные операции по изменению схемы данных.
  • Бэкап выгружается по дельтам; при отсутствии дельт в логической БД выгрузка бэкапа невозможна.
  • Во время работы команды недоступно изменение схемы данных логической БД, для которой выгружается бэкап.
  • Команда не проверяет полноту журнала логической БД.
  • Standalone-таблицы и внешние таблицы не включаются в бэкап.
  • Значение ключевого слова partitions_count игнорируется, если в брокере сообщений Kafka включено автосоздание топиков или бэкап выгружается в существующий топик.

Ограничения инкрементального бэкапа

  • Выгрузка инкрементов запускается только при наличии в логической БД более новых закрытых дельт, чем те, которые уже хранятся в топике.
  • Данные прокси-таблиц не включаются в инкрементальный бэкап.
  • При выгрузке отдельных инкрементов недоступна выгрузка инкрементов тех дельт, которые уже зафиксированы в целевом топике в составе снапшота.

Ограничения снапшота

  • Выгрузка снапшота должна выполняться в пустой топик.

Примеры

Выгрузка инкрементов всех дельт

Выгрузка инкрементов всех дельт логической БД marketing из ADQM с выгрузкой в два потока и распределением на три партиции:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкрементов с указанием токена:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкремента одной дельты

Выгрузка инкремента дельты 12 логической БД marketing с указанием списка брокеров Kafka:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --kafkaRetries 5

Выгрузка инкремента диапазона дельты

Выгрузка инкрементов дельт с 12 по 17 логической БД marketing с указанием списка брокеров Kafka:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --nextDeltasCount 5 --kafkaRetries 5

Выгрузка снапшота с историей

Выгрузка снапшота логической БД marketing с историей изменений данных за две дельты:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 2

Выгрузка снапшота логической БД marketing с историей изменений данных за 10 дней:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 10d

Выгрузка снапшота без истории

Выгрузка снапшота логической БД marketing без истории изменений данных:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0

Выгрузка снапшота и инкрементов новых дельт после снятия снапшота

Выгрузка снапшота логической БД marketing без истории изменений данных:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0

Выгрузка инкрементов всех новых дельт логической БД marketing, закрытых после выгрузки снапшота:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --partitionsCount 3 --threadsCount 2

Пошаговый пример выгрузки бэкапа

Подготовка логической БД

Создание и наполнение тестовой логической базы данных sourcedb:

CREATE DATABASE sourcedb;
USE sourcedb;
CREATE TABLE sourcedb.table1_adqm
    (
      id bigint not null,
      boolean_col boolean,
      int32_col int32,
      int_col int,
      bigint_col bigint,
      float_col float,
      double_col double,
      char_col char(10),
      varchar_col varchar,
      uuid_col uuid,
      link_col link,
      date_col date,
      time_col time,
      timestamp_col timestamp,
      PRIMARY KEY (id)
    )
DISTRIBUTED BY (id)
DATASOURCE_TYPE ('adqm');
 
BEGIN DELTA;
INSERT INTO sourcedb.table1_adqm
VALUES (1, false, 32768, 65537, 1280000, 3.141526, 2.718281828,
 'char_value', 'varchar_value','12345678-1234-abcd-ef00-1234567899ab',
 'https://www.google.com', '1970-01-11', '13:10:30', '1970-01-01 13:10:30');
COMMIT DELTA;

Выгрузка полного бэкапа логической БД

Выгрузка полного инкрементального бэкапа логической БД sourcedb из датасорса adqm с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb

Инкрементальный бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb.

Выгрузка снапшот-бэкапа логической БД sourcedb из датасорса adqm с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090 (без сохранения истории изменений):

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb_snapshot –-snapshotDepth 0

Снапшот-бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb_snapshot.

Удаление тестовой логической БД

Опционально после окончания выгрузки бэкапа можно удалить исходную базу данных, чтобы протестировать ее последующее восстановление из бэкапа, сохраненного в топиках backup_topic_sourcedb и backup_topic_sourcedb_snapshot.

DROP DATABASE sourcedb

Пошаговый пример, продолжающий этот пример, см. в разделе Команда RESTORE_FROM_KAFKA > Пошаговый пример развертывания логической БД из бэкапа.

Варианты вывода команды

Вывод при успешной выгрузке бэкапа

Ниже показан примерный вид вывода команды при успешной выгрузке бэкапа (набор сообщений и их текст могут изменяться от версии к версии):

...
Started BACKUP process with params ...
...
Deny changes in datamart [<db_name>] with code [backup_<db_name>_<date_of_the_month>]
Delta range ... to backup
Got changes for datamart <db_name>
Tables received for datamart ...
Started backup delta processing
...
Got delta date for datamart ..., delta num ...
...
Started backup delta table processing
...
Topic ... with ... partitions successfully created
...
Successfully created download ext table ...
...
Data inserted into kafka ...
...
Successfully dropped download ext table ...
...
Successfully expload delta table to kafka ...
...
Started archiving delta table
...
Polled message 0 from source topic ...
Trying to send data message ... to topic [<topic_defined_in_the_command>] ...
...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
Topic ... successfully deleted
Start archiving delta changelog ... into topic <topic_defined_in_the_command>
Trying to send changelog message ... to topic [<topic_defined_in_the_command>] ...
Successfully archived delta changelog into topic <topic_defined_in_the_command>
...
Allow changes in datamart <db_name>
BACKUP is done

Вывод при неуспешной выгрузке бэкапа

При неуспешной выгрузке бэкапа команда выводит текст ошибки.

Формат бэкапа в топике Kafka

Бэкап выгружается в топик в виде сообщений Kafka.

Каждое сообщение состоит из ключа и тела и содержит данные таблицы или журнала. Ключ сообщения состоит из схемы данных Avro и одной записи Avro и содержит метаинформацию инкремента в формате, описанном ниже в секции Формат ключа сообщения. Тело сообщения представляет собой файл Avro (Object Container File), состоящий из заголовка со схемой данных Avro и записей Avro.

Схема данных в ключе сообщения одинакова для всех сообщений, а схема данных в теле сообщения зависит от типа сообщения (см. рисунок ниже).

Для наглядности все бинарные данные Avro на рисунке и в примерах ниже представлены в JSON-формате, а не в бинарном виде.

На рисунке ниже показаны составные части сообщения Kafka, содержащего данные бэкапа.

Формат сообщения Kafka с данными бэкапа

Формат ключа сообщения

Схема Avro в ключе сообщения

Схема Avro в ключе сообщения имеет следующий формат:

{
  "name": "topicA",
  "type": "record",
  "fields": [
    {
      "name": "incrementId",
      "type": "string"
    },
    {
      "name": "messageType",
      "type": "long"
    },
    {
      "name": "datamart",
      "type": "string"
    },
    {
      "name": "deltaNum",
      "type": "long"
    },
    {
      "name": "deltaDate",
      "type": "long",
      "logicalType": "timestamp-micros"
    },
    {
      "name": "snapshotChangelogLastNum",
      "type": "long"
    },
    {
      "name": "snapshotDeltaStartsFrom",
      "type": "long"
    },
    {
      "name": "snapshotChangelogStartsFrom",
      "type": "long"
    },
    {
      "name": "tableName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "streamNumber",
      "type": "long"
    },
    {
      "name": "streamTotal",
      "type": "long"
    },
    {
      "name": "chunkNumber",
      "type": "long"
    },
    {
      "name": "isLastChunk",
      "type": "boolean"
    }
  ]
}

Параметры:

incrementId

Идентификатор инкремента. Представляет собой UUID-значение без дефисов и символов подчеркивания.
Параметр используется и для инкрементального бэкапа, и для снапшота, так как оба вида бэкапа выгружаются в одинаковом формате. Все сообщения с данными таблиц и журнала, составляющие один инкремент, имеют один идентификатор incrementId .

messageType

Тип сообщения. Возможные значения:

  • 0 — сообщение с данными логической таблицы или прокси-таблицы,
  • 1 — сообщение с данными журнала.
datamart

Имя логической базы данных, к которой относится инкремент.

deltaNum

Номер дельты, к которой относятся данные.

deltaDate

Дата и время закрытия дельты deltaNum в виде целого числа.

snapshotChangelogLastNum

Номер последней записи в журнале исходной логической БД на момент выгрузки снапшота. В случае инкрементального бэкапа параметр всегда имеет значение null.

snapshotDeltaStartsFrom

Номер дельты, по состоянию на которую зафиксирован снапшот. В случае инкрементального бэкапа параметр всегда имеет значение null.

snapshotChangelogStartsFrom

Количество записей журнала в исходной логической БД, пропущенных при выгрузке журнала в снапшот. Разница в количестве записей вызвана тем, что в журнал снапшота сохраняются записи, отражающие текущее состояние логической схемы без промежуточных записей об изменении и удалении сущностей.
В случае инкрементального бэкапа параметр всегда имеет значение null.

tableName

Имя таблицы, из которой выгружены данные. В сообщениях с данными журнала поле имеет значение null.

streamNumber

Номер потока, записавшего сообщение. Потоки нумеруются, начиная с 1.

streamTotal

Общее число потоков записи сообщений.

chunkNumber

Номер сообщения среди всех сообщений потока streamNumber. Сообщения нумеруются, начиная с 1.

isLastChunk

Признак последнего сообщения в потоке streamNumber. Возможные значения: true, false.

Примеры записей в ключе сообщения

Пример записи Avro в ключе сообщения, содержащего данные таблицы и записанного при выгрузке инкрементального бэкапа:

{
  "incrementId": "188d6862af17425e8cfd4451aa9b5c03",
  "messageType": 0,
  "datamart": "marketing",
  "deltaNum": 55,
  "deltaDate": 1638360336000000,
  "snapshotChangelogLastNum": null,
  "snapshotDeltaStartsFrom": null,
  "snapshotChangelogStartsFrom": null,
  "tableName": "stores",
  "streamNumber": 1,
  "streamTotal": 2,
  "chunkNumber": 4,
  "isLastChunk": false
}

Пример записи Avro в ключе сообщения, содержащего данные таблицы и записанного при выгрузке снапшота:

{
  "incrementId": "71eb686d3f3649538e76f68f230de685",
  "messageType": 0,
  "datamart": "marketing",
  "deltaNum": 55,
  "deltaDate": 1638360336000000,
  "snapshotChangelogLastNum": null,
  "snapshotDeltaStartsFrom": null,
  "snapshotChangelogStartsFrom": null,
  "tableName": "stores",
  "streamNumber": 1,
  "streamTotal": 2,
  "chunkNumber": 4,
  "isLastChunk": false
}

Пример записи Avro в ключе сообщения, содержащего данные журнала и записанного при выгрузке снапшота:

{
  "incrementId": "71eb686d3f3649538e76f68f230de685",
  "messageType": 0,
  "datamart": "marketing",
  "deltaNum": 55,
  "deltaDate": 1638360336000000,
  "snapshotChangelogLastNum": 45,
  "snapshotDeltaStartsFrom": 20,
  "snapshotChangelogStartsFrom": 32,
  "tableName": "stores",
  "streamNumber": 1,
  "streamTotal": 2,
  "chunkNumber": 4,
  "isLastChunk": false
}

Формат тела сообщения

Состав схемы данных Avro в теле сообщения зависит от типа сообщения:

  • все сообщения с данными журнала имеют одинаковую схему данных,
  • сообщение с данными таблицы имеет схему данных, соответствующую структуре этой таблицы.

Схема Avro в теле сообщения с данными журнала

Схема Avro в сообщениях, содержащих данные журнала, имеет следующий вид:

{
  "type": "record",
  "name": "changeLog",
  "fields": [
    {
      "name": "changeNum",
      "type": "long"
    },
    {
      "name": "entityName",
      "type": "string"
    },
    {
      "name": "changeQuery",
      "type": "string"
    }
  ]
}

Параметры:

changeNum

Порядковый номер изменения в журнале.

entityName

Имя логической сущности, к которой относится изменение.

changeQuery

Запрос на создание, удаление или изменение логической сущности.

Пример схемы Avro в теле сообщения с данными таблицы

Пример схемы Avro в сообщении, содержащем данные логической таблицы:

{
  "type": "record",
  "name": "row",
  "fields": [
    {
      "name": "id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "transaction_date",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "product_code",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "product_units",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "store_id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "description",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "sys_op",
      "type": ["null","int"],
      "doc": ""
    }
  ]
}

Примеры записей в теле сообщения

Пример записей Avro в сообщении с данными журнала:

[
  {
    "changeNum": 0,
    "entityName": "marketing",
    "changeQuery": "CREATE TABLE marketing.sales (id INT NOT NULL, transaction_date TIMESTAMP NOT NULL, product_code VARCHAR(256) NOT NULL, product_units INT NOT NULL, store_id INT NOT NULL, description VARCHAR(256), PRIMARY KEY (id)) DISTRIBUTED BY (id))"
  },
  {
    "changeNum": 1,
    "entityName": "stores_by_sold_products",
    "changeQuery": "CREATE VIEW marketing.stores_by_sold_products AS SELECT store_id, SUM(product_units) AS product_amount FROM marketing.sales GROUP BY store_id ORDER BY product_amount DESC LIMIT 20"
  },
  {
    "changeNum": 2,
    "entityName": "stores_by_sold_products",
    "changeQuery": "DROP VIEW marketing.stores_by_sold_products"
  }
]

Пример записей Avro в сообщении с данными логической таблицы:

[
  {
    "id": 1000111,
    "transaction_date": 1614269474000000,
    "product_code": "ABC102101",
    "product_units": 2,
    "store_id": 1000012345,
    "description": "Покупка по акции 1+1",
    "sys_op": 0
  },
  {
    "id": 1000112,
    "transaction_date": 1614334214000000,
    "product_code": "ABC102001",
    "product_units": 1,
    "store_id": 1000000123,
    "description": "Покупка без акций",
    "sys_op": 1
  }
]