Команда BACKUP_TO_KAFKA

Содержание раздела
  1. Режимы выгрузки бэкапа
  2. Содержимое бэкапа
    1. Выгружаемая логическая схема данных
    2. Выгружаемые данные
    3. Содержимое инкремента
    4. Содержимое снапшота
  3. Как работает команда
    1. Выгрузка бэкапа в топик Kafka
    2. Выгрузка инкремента одной дельты
    3. Выгрузка инкрементов диапазона дельт
    4. Выгрузка инкрементов всех дельт
    5. Выгрузка снапшота
  4. Использование команды
    1. Выгрузка инкрементов
      1. Проверка недостающих инкрементов
      2. Выгрузка инкрементов всех дельт
      3. Выгрузка инкремента одной дельты
      4. Выгрузка инкрементов диапазона дельт
    2. Выгрузка снапшота
    3. Снятие блокировки логической БД в случае сбоя
  5. Аутентификация
  6. Синтаксис
    1. Параметры
    2. Ключевые слова
      1. --connect
      2. --datasourceType
      3. --topic
      4. --kafkaBrokersCore
      5. --kafkaBrokers
      6. --deltaNum
      7. --nextDeltasCount
      8. --partitionsCount
      9. --threadsCount
      10. --chunkSize
      11. --kafkaRetries
      12. --snapshotDepth
  7. Ограничения
  8. Примеры
    1. Выгрузка инкрементов всех дельт
    2. Выгрузка инкремента одной дельты
    3. Выгрузка инкремента диапазона дельты
    4. Выгрузка снапшота с историей
    5. Выгрузка снапшота без истории
    6. Пошаговый пример выгрузки бэкапа
      1. Подготовка логической БД
      2. Выгрузка полного бэкапа логической БД
      3. Удаление тестовой логической БД
  9. Варианты вывода команды
    1. Вывод при успешной выгрузке бэкапа
    2. Вывод при неуспешной выгрузке бэкапа
  10. Формат бэкапа в топике Kafka
    1. Формат ключа сообщения
    2. Формат тела сообщения
      1. Схема Avro
      2. Записи Avro

Команда BACKUP_TO_KAFKA выгружает бэкап логической базы данных в топик Kafka.

Выгружаемый бэкап содержит логическую схему данных и данные. В зависимости от параметров команды данные выгружаются с полной историей их изменений, частичной историей или без истории.

По умолчанию команда выгружает бэкап в топик того брокера сообщений Kafka, который задан в команде с помощью ключевого слова kafkaBrokersCore и (или) в конфигурации с помощью параметра ZOOKEEPER_KAFKA_ADDRESS. Чтобы выгрузить бэкап в другой брокер сообщений, укажите в команде ключевое слово kafkaBrokers с адресом нужного брокера.

Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).

Из целостного бэкапа можно развернуть резервную копию логической БД или восстановить сбойную логическую БД, используя команду RESTORE_FROM_KAFKA.

Бэкап не является полным слепком исходной логической БД, так как не включает standalone-таблицы и внешние таблицы. В одном из режимов выгрузки (при выгрузке инкрементального бэкапа) в бэкапе также отсутствуют данные прокси-таблиц.

Режимы выгрузки бэкапа

Доступные режимы выгрузки бэкапа:

  • инкрементальный бэкап — данные выгружаются с полной историей изменений и разбивкой на дельты; данные при выгрузке разделяются на независимые части — инкременты;
  • снапшот-бэкап — выгружается текущее состояние данных с историей изменений указанной длительности.

Во всех режимах выгрузки бэкапа команда также выгружает текущее состояние схемы данных с полной историей ее изменений.

За один запуск команды можно выгрузить все недостающие инкременты, один инкремент или снапшот. Подробнее см. в секции Использование команды.

Содержимое бэкапа

Выгружаемая логическая схема данных

В бэкап выгружается логическая схема данных всех сущностей, присутствующих в журнале:

Внешние таблицы и standalone-таблицы не включаются в бэкап. При необходимости вы можете их пересоздать после успешного развертывания логической БД с помощью команды RESTORE_FROM_KAFKA.

Выгружаемые данные

В бэкап выгружаются данные следующих таблиц:

Партиционированные данные выгружаются с разделением на партиции — таким же образом, как они разделены в исходной логической БД.

Данные прокси-таблиц выгружаются при выгрузке снапшот-бэкапа и не выгружаются при выгрузке инкрементального бэкапа. Данные материализованных представлений не выгружаются во всех режимах выгрузки, но они автоматически восстанавливаются системой после того, как команда RESTORE_FROM_KAFKA успешно развернет таблицы, на которых построено представление.

Другие сущности, которые включаются в бэкап, — партиционированные таблицы и логические представления — не хранят данные и поэтому не требуют выгрузки данных; для них выгружаются только изменения схемы данных.

Содержимое инкремента

При выгрузке инкрементального бэкапа команда выгружает по каждой дельте инкремент, содержащий следующие объекты:

  • журнал — изменения логической схемы данных по состоянию на момент запуска команды;
  • изменения данных логических таблиц, внесенные в дельте и отдельных операциях записи. В инкремент включаются только те отдельные операции записи, которые были выполнены между выгружаемой дельтой и предшествующей ей.

Изменения данных представляют собой полную историю изменений данных логических таблиц, включая холодные данные.

В отличие от снапшота, инкремент содержит изменения данных, а не снимок состояния данных. Например:

  • дельта 1: добавили записи А и Б;
  • дельта 2: удалили запись А;
  • инкремент дельты 1: записи А и Б с признаком добавления (sys_op = 0);
  • инкремент дельты 2: запись А с признаком удаления (sys_op = 1).

Номера и метки времени дельт в инкрементах соответствуют исходной логической БД. Сохранение нумерации и меток времени операций записи не гарантируется.

Содержимое снапшота

При выгрузке снапшота команда выгружает следующие объекты:

  • журнал — изменения логической схемы данных по состоянию на момент запуска команды;
  • состояние данных логических таблиц и прокси-таблиц на указанный момент времени;
  • (при snapshotDepth больше 0) историю изменений данных логических таблиц и прокси-таблиц с указанного момента времени по последнюю закрытую дельту включительно.

В отличие от инкремента, снапшот без истории содержит снимок состояния данных, а не изменения данных. Например:

  • дельта 1: добавили записи А и Б;
  • дельта 2: удалили запись А;
  • снапшот по состоянию на дельту 1: записи А и Б;
  • снапшот по состоянию на дельту 2: запись Б.

Сохранение номеров и меток времени дельт и операций записи относительно исходной логической БД в снапшоте не гарантируется.

Технически данные выгружаются в топик Kafka в таком же формате, как инкрементальный бэкап, с тем отличием, что команда выгружает зафиксированное состояние данных как инкремент нулевой дельты и игнорирует другие инкременты в топике, если такие есть. Подробнее см. в секции Как работает команда > Выгрузка снапшота.

Как работает команда

Выгрузка бэкапа в топик Kafka

Оба вида бэкапа — инкременты и снапшоты — выгружаются в топик Kafka в формате Avro и имеют структуру, описанную в секции Формат бэкапа в топике Kafka.

Бэкап выгружается в топик следующего брокера Kafka:

  • указанного с помощью ключевого слова kafkaBrokers — если команда содержит kafkaBrokers;
  • указанного с помощью ключевого слова kafkaBrokersCore и (или) параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS — если команда не содержит kafkaBrokers.

Команда выгружает бэкап одним или несколькими потоками — в зависимости от значения ключевого слова threadsCount. По умолчанию, если ключевое слово не указано, используется один поток.

При выгрузке, если топик новый, он разделяется на заданное количество партиций. Количество партиций в топике определяется:

  • настройками Kafka — если в брокере сообщений Kafka настроено автосоздание топиков;
  • ключевым словом partitionsCount — если автосоздание топиков отключено.

Выгрузка инкремента одной дельты

Если в команде указано ключевое слово deltaNum, но не указано ключевое слово nextDeltasCount, в топик выгружается инкремент указанной дельты.

На рисунке ниже показан пример выгрузки инкремента дельты с номером 1. Инкремент содержит изменения данных w2 и w3, внесенные между закрытием дельт 0 и 1, и все записи журнала по изменению схемы с с0 по с3 включительно.

Схема выгрузки инкремента одной дельты

Выгрузка инкрементов диапазона дельт

Если в команде указано ключевые слова deltaNum и nextDeltasCount, в топик выгружаются инкременты дельт из диапазона [deltaNum, deltaNum + nextDeltasCount]. Например, если значение deltaNum равно 2, а nextDeltasCount — 3, выгружаются инкременты следующих дельт: 2, 3, 4, 5.

На рисунке ниже показан пример выгрузки инкрементов дельт из диапазона [1, 1+1]. Инкременты содержат изменения данных w2, w3, w4 и w5, и все записи журнала по изменению схемы с с0 по с3 включительно.

Схема выгрузки инкрементов дельт из диапазона

Выгрузка инкрементов всех дельт

Если в команде не указано ни ключевое слово deltaNum, ни ключевое слово snapshotDepth, в топик выгружаются инкременты следующих дельт логической БД:

  • всех закрытых дельт — если в топике, куда выгружаются данные, нет ни одного инкремента этой логической БД;
  • «новых» закрытых дельт — если в топике уже есть хотя бы один инкремент этой логической БД. Под «новыми» закрытыми дельтами понимаются те дельты, чьи номера больше максимального номера дельты в топике. Промежуточные дельты, если какие-то из них отсутствуют в топике, НЕ выгружаются.

На рисунке ниже показан пример выгрузки инкрементов всех дельт в новый топик. В этом случае команда выгружает все изменения данных и все записи журнала.

Схема выгрузки инкрементов всех дельт

На рисунке ниже показан пример выгрузки инкрементов всех дельт в топик, где уже есть инкремент дельты 1 той же логической БД. В этом случае команда выгружает изменения данных w4 и w5 и все записи журнала.

Схема выгрузки инкрементов новых дельт

Выгрузка снапшота

Если в команде указано ключевое слово snapshotDepth, в топик выгружается снапшот логической БД, зафиксированный по состоянию на указанный момент времени. При значении snapshotDepth больше 0 команда также выгружает историю изменений данных с момента времени, зафиксированного в снапшоте, по последнюю закрытую дельту включительно.

Схема выгрузки снапшота на текущий момент без истории

Схема выгрузки снапшота на момент предпоследней дельты с историей изменений

Если момент времени задан в команде как количество дней, часов, минут или секунд, отсчитываемых в прошлое от текущего момента, снапшот фиксируется по состоянию на последнюю закрытую на тот момент дельту. Например, если команда запущена с ключом --snapshotDepth 2d, снапшот выгружается по состоянию на дельту, которая была последней два дня назад.

Подробнее о возможных значениях ключевого слова см. в описании snapshotDepth.

Нумерация дельт в выгружаемом снапшоте сдвигается: снапшот сохраняется как начальное состояние (дельта 0), а последующие дельты получают номера, следующие по порядку — 1, 2 и т.д. в зависимости от значения snapshotDepth.

Использование команды

Выгрузка инкрементов

Проверка недостающих инкрементов

Чтобы проверить список дельт, инкрементов которых не хватает в топике до полного бэкапа, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.

Выгрузка инкрементов всех дельт

Чтобы выгрузить инкременты всех или только недостающих закрытых дельт:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA без ключевых слов deltaNum и shapshotDepth. В команде укажите ключевое слово topic со следующим значением:
    • если нужно выгрузить полный бэкап с нулевой до последней закрытой дельты, укажите топик, в который раньше не выгружались инкременты этой логической БД. При этом топик может содержать данные других логических БД;
    • если нужно обновить существующий бэкап последними изменениями, укажите топик, содержащий инкременты нужной логической БД.

Выгрузка инкремента одной дельты

Чтобы выгрузить инкремент одной дельты:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевым словом deltaNum, не указывая ключевое слово shapshotDepth.

Выгрузка инкрементов диапазона дельт

Чтобы выгрузить инкременты дельт из диапазона:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевыми словами deltaNum и nextDeltasCount, не указывая ключевое слово shapshotDepth.

Выгрузка снапшота

Чтобы выгрузить снапшот логической БД:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевым словом snapshotDepth, не указывая ключевое слово deltaNum.

Выгружайте каждый снапшот в новый топик. Корректное восстановление логической БД из топика, содержащего несколько снапшотов, не гарантируется.

Снятие блокировки логической БД в случае сбоя

Если во время работы команды произошел сбой, исходная логическая БД может остаться заблокированной для изменений схемы данных. В этом случае все DDL-запросы в логической БД будут возвращать ошибку Change operations are forbidden.

Чтобы снять блокировку изменений в логической БД, выполните запрос ALLOW_CHANGES, где в качестве кода-пароля в запросе укажите строку в формате backup_<имя_логической_БД>_<число_месяца_запуска_команды>. Например, если команда была запущена для логической БД marketing 9 февраля, следует выполнить запрос ALLOW_CHANGES(marketing, 'backup_marketing_9').

После снятия блокировки можно запустить выгрузку бэкапа еще раз.

Аутентификация

Команда поддерживает передачу токена (JWT) для последующей аутентификации в системе.

Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.

Синтаксис

java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
backup_to_kafka \
[--datasourceType <datasource_name>] \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--deltaNum <delta_num>] \
[–-nextDeltasCount <next_deltas_count>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--chunkSize <chunk_size>] \
[--kafkaRetries <kafka_retries>]
[--snapshotDepth <snapshot_depth>]

Параметры

dtm_tools_file_name

Имя jar-файла утилиты DTM Tools.

Ключевые слова

--connect

Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:

  1. prostore_host — IP-адрес или доменное имя ноды Prostore;
  2. db_name — номер порта для подключения к ноде Prostore, равный значению параметра конфигурации DTM_CORE_HTTP_PORT;
  3. db_name — имя логической БД, для которой нужно выгрузить инкременты;
  4. (если включена аутентификация запросов) строка ?user=jwt&password=<jwt_token>, где jwt_token — авторизационный токен.

--datasourceType

Задает имя датасорса-источника, из которого выгружаются данные. Значение указывается без кавычек и должно соответствовать имени датасорса, заданному в конфигурации.

Если ключевое слово не указано, данные каждой таблицы выгружаются из наиболее оптимального датасорса. Запрос на выгрузку бэкапа относится к категории «Реляционный».

--topic

Задает имя топика Kafka, куда выгружаются бэкап.

--kafkaBrokersCore

Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.

Ключевое слово обязательно при запуске команды в инсталляциях, где сервисная БД в ZooKeeper недоступна со стороны утилиты DTM Tools и утилита не может запросить информацию о брокерах из конфигурации Prostore. В остальных случаях ключевое слово опционально.

Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.

--kafkaBrokers

Задает список адресов брокеров сообщений Kafka, куда выгружается бэкап, в формате host1:port1,host2:port2,..hostN:portN.

Ключевое слово нужно указывать, если требуется выгрузить бэкап в брокеры Kafka, отличные от тех, с которыми работает Prostore.

На рисунке ниже показана схема работы утилиты с двумя брокерами Kafka: один брокер используется утилитой и системой Prostore, второй — только утилитой. Первый брокер находится в одной сети с Prostore, а второй может находиться в другой сети, например, в удаленном дата-центре.

Схема работы утилиты с двумя брокерами Kafka

--deltaNum

Задает номер дельты delta_num, инкремент которой выгружается в топик, и может комбинироваться с ключевым словом nextDeltasCount для указания левой границы диапазона выгружаемых инкрементов.

Если ключевое слово не указано, выгружаются инкременты всех «новых» дельт — дельт, чьи номера больше максимального номера дельты среди инкрементов логической БД, уже хранящихся в топике.

--nextDeltasCount

Задает количество дельт, выгружаемых после дельты delta_num, и используется для выгрузки инкрементов диапазона дельт.

--partitionsCount

Задает количество партиций, создаваемых при выгрузке бэкапа в новый топик. Разделение топика на партиции позволяет ускорить выгрузку бэкапа за счет параллельных процессов выгрузки. Значение учитывается, если топик создается командой, то если в брокере Kafka отключено автосоздание топиков.

Если ключевое слово не указано, все данные выгружаются в одну партицию.

--threadsCount

Задает количество параллельных процессов выгрузки бэкапа и, тем самым, позволяет подобрать оптимальную скорость выгрузки в зависимости от производительности инфраструктуры.

Если ключевое слово не указано, все данные выгружаются одним потоком.

--chunkSize

Задает максимальное количество записей, сохраняемых в одном сообщении Kafka.

Если ключевое слово не указано, в сообщение записывается до 1000 записей.

--kafkaRetries

Задает количество попыток выгрузки бэкапа в топик, которое команда делает при запуске. Если ключевое слово не указано, делается 10 попыток.

--snapshotDepth

Задает момент времени, по состоянию на который выгружается бэкап. Момент времени определяется как точка в истории изменений данных логической БД, отсчитываемая от момента закрытия последней дельты на указанное количество дельт, дней, часов, минут или секунд.

Возможные значения (где X — положительное число):

  • X — X дельт,
  • Xs — X секунд,
  • Xm — X минут,
  • Xh — X часов,
  • Xd — X дней.

Если ключевое слово не указано, выгружается инкрементальный бэкап.

Ограничения

  • Запуск команды недоступен, если в исходной логической БД есть незавершенные операции по изменению схемы данных.
  • Во время работы команды недоступно изменение схемы данных логической БД, для которой выгружается бэкап.
  • Команда не проверяет полноту журнала логической БД.
  • Выгрузка инкрементов запускается только при наличии в логической БД более новых закрытых дельты, чем те, которые уже хранятся в топике. Новые изменения данных без меток времени и изменения схемы данных при отсутствии новых дельт не приводят к выгрузке инкрементов.
  • Значение ключевого слова partitions_count игнорируется, если в брокере сообщений Kafka включено автосоздание топиков или бэкап выгружается в существующий топик.
  • В составе инкрементов для прокси-таблиц выгружается только логическая схема данных, без данных прокси-таблиц.
  • Standalone-таблицы и внешние таблицы не включаются в бэкап.

Примеры

Выгрузка инкрементов всех дельт

Выгрузка инкрементов всех дельт логической БД marketing из ADQM с выгрузкой в два потока и распределением на три партиции:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкрементов с указанием токена:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкремента одной дельты

Выгрузка инкремента дельты 12 логической БД marketing с указанием списка брокеров Kafka:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --kafkaRetries 5

Выгрузка инкремента диапазона дельты

Выгрузка инкрементов дельт с 12 по 17 логической БД marketing с указанием списка брокеров Kafka:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --nextDeltasCount 5 --kafkaRetries 5

Выгрузка снапшота с историей

Выгрузка снапшота логической БД marketing с историей изменений данных за две дельты:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 2

Выгрузка снапшота логической БД marketing с историей изменений данных за 10 дней:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 10d

Выгрузка снапшота без истории

Выгрузка снапшота логической БД marketing без истории изменений данных:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0

Пошаговый пример выгрузки бэкапа

Подготовка логической БД

Создание и наполнение тестовой логической базы данных sourcedb:

CREATE DATABASE sourcedb;
USE sourcedb;
CREATE TABLE sourcedb.table1_adqm
    (
      id bigint not null,
      boolean_col boolean,
      int32_col int32,
      int_col int,
      bigint_col bigint,
      float_col float,
      double_col double,
      char_col char(10),
      varchar_col varchar,
      uuid_col uuid,
      link_col link,
      date_col date,
      time_col time,
      timestamp_col timestamp,
      PRIMARY KEY (id)
    )
DISTRIBUTED BY (id)
DATASOURCE_TYPE ('adqm');
 
BEGIN DELTA;
INSERT INTO sourcedb.table1_adqm
VALUES (1, false, 32768, 65537, 1280000, 3.141526, 2.718281828,
 'char_value', 'varchar_value','12345678-1234-abcd-ef00-1234567899ab',
 'https://www.google.com', '1970-01-11', '13:10:30', '1970-01-01 13:10:30');
COMMIT DELTA;

Выгрузка полного бэкапа логической БД

Выгрузка полного инкрементального бэкапа логической БД sourcedb из датасорса adqm с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb

Инкрементальный бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb.

Выгрузка снапшот-бэкапа логической БД sourcedb из датасорса adqm с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090 (без сохранения истории изменений):

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb_snapshot –-snapshotDepth 0

Снапшот-бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb_snapshot.

Удаление тестовой логической БД

Опционально после окончания выгрузки бэкапа можно удалить исходную базу данных, чтобы протестировать ее последующее восстановление из бэкапа, сохраненного в топиках backup_topic_sourcedb и backup_topic_sourcedb_snapshot.

DROP DATABASE sourcedb

Пошаговый пример, продолжающий этот пример, см. в разделе Команда RESTORE_FROM_KAFKA > Пошаговый пример развертывания логической БД из бэкапа.

Варианты вывода команды

Вывод при успешной выгрузке бэкапа

Ниже показан примерный вид вывода команды при успешной выгрузке бэкапа (набор сообщений и их текст могут изменяться от версии к версии):

...
Started BACKUP process with params ...
...
Deny changes in datamart [<db_name>] with code [backup_<db_name>_<date_of_the_month>]
Delta range ... to backup
Got changes for datamart <db_name>
Tables received for datamart ...
Started backup delta processing
...
Got delta date for datamart ..., delta num ...
...
Started backup delta table processing
...
Topic ... with ... partitions successfully created
...
Successfully created download ext table ...
...
Data inserted into kafka ...
...
Successfully dropped download ext table ...
...
Successfully expload delta table to kafka ...
...
Started archiving delta table
...
Polled message 0 from source topic ...
Trying to send data message ... to topic [<topic_defined_in_the_command>] ...
...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
Topic ... successfully deleted
Start archiving delta changelog ... into topic <topic_defined_in_the_command>
Trying to send changelog message ... to topic [<topic_defined_in_the_command>] ...
Successfully archived delta changelog into topic <topic_defined_in_the_command>
...
Allow changes in datamart <db_name>
BACKUP is done

Вывод при неуспешной выгрузке бэкапа

При неуспешной выгрузке бэкапа команда выводит текст ошибки.

Формат бэкапа в топике Kafka

Бэкап выгружается в топик в виде сообщений Kafka.

Каждое сообщение состоит из ключа и тела и содержит данные таблицы или журнала. Ключ сообщения состоит из схемы данных Avro и одной записи Avro и содержит метаинформацию инкремента в формате, описанном ниже в секции Формат ключа сообщения. Тело сообщения представляет собой файл Avro (Object Container File), состоящий из заголовка со схемой данных Avro и записей Avro.

Схема данных в ключе сообщения одинакова для всех сообщений, а схема данных в теле сообщения зависит от типа сообщения (см. рисунок ниже).

Для наглядности все бинарные данные Avro на рисунке и в примерах ниже представлены в JSON-формате.

На рисунке ниже показаны составные части сообщения Kafka, содержащего данные бэкапа.

Формат сообщения Kafka с данными бэкапа

Формат ключа сообщения

Схема Avro в ключе сообщения имеет следующий формат:

{
  "name": "topicA",
  "type": "record",
  "fields": [
    {
      "name": "incrementId",
      "type": "string"
    },
    {
      "name": "messageType",
      "type": "long"
    },
    {
      "name": "datamart",
      "type": "string"
    },
    {
      "name": "deltaNum",
      "type": "long"
    },
    {
      "name": "deltaDate",
      "type": "long",
      "logicalType": "timestamp-micros"
    },
    {
      "name": "tableName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "streamNumber",
      "type": "long"
    },
    {
      "name": "streamTotal",
      "type": "long"
    },
    {
      "name": "chunkNumber",
      "type": "long"
    },
    {
      "name": "isLastChunk",
      "type": "boolean"
    }
  ]
}

Параметры:

incrementId

Идентификатор выгрузки инкремента, уникальный для каждого запуска команды.

messageType

Тип сообщения. Возможные значения: 0 — сообщение с данными логической таблицы или прокси-таблицы, 1 — сообщение с данными журнала.

datamart

Имя логической базы данных, к которой относится инкремент.

deltaNum

Номер дельты, к которой относятся данные.

deltaDate

Дата и время закрытия дельты deltaNum в виде целого числа.

tableName

Имя таблицы, из которой выгружены данные. В сообщениях с данными журнала поле имеет значение null.

streamNumber

Номер потока, записавшего сообщение. Потоки нумеруются с 1.

streamTotal

Общее число потоков записи сообщений.

chunkNumber

Номер сообщения среди всех сообщений потока streamNumber. Сообщения нумеруются с 1.

isLastChunk

Признак последнего сообщения в потоке streamNumber. Возможные значения: true, false.

Пример записи Avro в ключе сообщения:

{
  "incrementId": "f81d4fae-7dec-11d0-a765-00a0c91e6bf6",
  "messageType": 0,
  "datamart": "marketing",
  "deltaNum": 55,
  "deltaDate": 1638360336000000,
  "tableName": "stores",
  "streamNumber": 1,
  "streamTotal": 2,
  "chunkNumber": 4,
  "isLastChunk": false
}

Формат тела сообщения

Схема Avro

Состав схемы данных Avro в теле сообщения зависит от типа сообщения:

  • все сообщения с данными журнала имеют одинаковую схему данных,
  • сообщение с данными таблицы имеет схему данных, соответствующую структуре этой таблицы.

Схема Avro в сообщениях, содержащих данные журнала, имеет следующий вид:

{
  "type": "record",
  "name": "changeLog",
  "fields": [
    {
      "name": "changeNum",
      "type": "long"
    },
    {
      "name": "entityName",
      "type": "string"
    },
    {
      "name": "changeQuery",
      "type": "string"
    }
  ]
}

Параметры:

changeNum

Порядковый номер изменения в журнале.

entityName

Имя логической сущности, к которой относится изменение.

changeQuery

Запрос на создание, удаление или изменение логической сущности.

Пример схемы Avro в сообщении, содержащем данные логической таблицы:

{
  "type": "record",
  "name": "row",
  "fields": [
    {
      "name": "id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "transaction_date",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "product_code",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "product_units",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "store_id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "description",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "sys_op",
      "type": ["null","int"],
      "doc": ""
    }
  ]
}

Записи Avro

Пример записей Avro в сообщении с данными журнала:

[
  {
    "changeNum": 0,
    "entityName": "marketing",
    "changeQuery": "CREATE TABLE marketing.sales (id INT NOT NULL, transaction_date TIMESTAMP NOT NULL, product_code VARCHAR(256) NOT NULL, product_units INT NOT NULL, store_id INT NOT NULL, description VARCHAR(256), PRIMARY KEY (id)) DISTRIBUTED BY (id))"
  },
  {
    "changeNum": 1,
    "entityName": "stores_by_sold_products",
    "changeQuery": "CREATE VIEW marketing.stores_by_sold_products AS SELECT store_id, SUM(product_units) AS product_amount FROM marketing.sales GROUP BY store_id ORDER BY product_amount DESC LIMIT 20"
  },
  {
    "changeNum": 2,
    "entityName": "stores_by_sold_products",
    "changeQuery": "DROP VIEW marketing.stores_by_sold_products"
  }
]

Пример записей Avro в сообщении с данными логической таблицы:

[
  {
    "id": 1000111,
    "transaction_date": 1614269474000000,
    "product_code": "ABC102101",
    "product_units": 2,
    "store_id": 1000012345,
    "description": "Покупка по акции 1+1",
    "sys_op": 0
  },
  {
    "id": 1000112,
    "transaction_date": 1614334214000000,
    "product_code": "ABC102001",
    "product_units": 1,
    "store_id": 1000000123,
    "description": "Покупка без акций",
    "sys_op": 1
  }
]