Команда BACKUP_TO_KAFKA
Содержание раздела
Команда BACKUP_TO_KAFKA
выгружает бэкап логической базы данных в топик Kafka.
Выгружаемый бэкап содержит логическую схему данных и данные. В зависимости от параметров команды данные выгружаются с полной историей их изменений, частичной историей или без истории.
По умолчанию команда выгружает бэкап в топик того брокера сообщений Kafka, который задан в команде с помощью ключевого слова kafkaBrokersCore и (или) в конфигурации с помощью параметра ZOOKEEPER_KAFKA_ADDRESS
. Чтобы выгрузить бэкап в другой брокер сообщений, укажите в команде ключевое слово kafkaBrokers с адресом нужного брокера.
Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).
Из целостного бэкапа можно развернуть резервную копию логической БД или восстановить сбойную логическую БД, используя команду RESTORE_FROM_KAFKA.
Бэкап не является полным слепком исходной логической БД, так как не включает standalone-таблицы и внешние таблицы. В одном из режимов выгрузки (при выгрузке инкрементального бэкапа) в бэкапе также отсутствуют данные прокси-таблиц.
Режимы выгрузки бэкапа
Доступные режимы выгрузки бэкапа:
- инкрементальный бэкап — данные выгружаются с полной историей изменений и разбивкой на дельты; данные при выгрузке разделяются на независимые части — инкременты;
- снапшот-бэкап — выгружается текущее состояние данных с историей изменений указанной длительности.
Во всех режимах выгрузки бэкапа команда также выгружает текущее состояние схемы данных с полной историей ее изменений.
За один запуск команды можно выгрузить все недостающие инкременты, один инкремент или снапшот. Подробнее см. в секции Использование команды.
Содержимое бэкапа
Выгружаемая логическая схема данных
В бэкап выгружается логическая схема данных всех сущностей, присутствующих в журнале:
- всех видов логических таблиц,
- прокси-таблиц,
- материализованных представлений,
- логических представлений.
Внешние таблицы и standalone-таблицы не включаются в бэкап. При необходимости вы можете их пересоздать после успешного развертывания логической БД с помощью команды RESTORE_FROM_KAFKA
.
Выгружаемые данные
В бэкап выгружаются данные следующих таблиц:
- обычных логических таблиц,
- партиций,
- (только при выгрузке снапшот-бэкапа) прокси-таблиц.
Партиционированные данные выгружаются с разделением на партиции — таким же образом, как они разделены в исходной логической БД.
Данные прокси-таблиц выгружаются при выгрузке снапшот-бэкапа и не выгружаются при выгрузке инкрементального бэкапа. Данные материализованных представлений не выгружаются во всех режимах выгрузки, но они автоматически восстанавливаются системой после того, как команда RESTORE_FROM_KAFKA
успешно развернет таблицы, на которых построено представление.
Другие сущности, которые включаются в бэкап, — партиционированные таблицы и логические представления — не хранят данные и поэтому не требуют выгрузки данных; для них выгружаются только изменения схемы данных.
Содержимое инкремента
При выгрузке инкрементального бэкапа команда выгружает по каждой дельте инкремент, содержащий следующие объекты:
- журнал — изменения логической схемы данных по состоянию на момент запуска команды;
- изменения данных логических таблиц, внесенные в дельте и отдельных операциях записи. В инкремент включаются только те отдельные операции записи, которые были выполнены между выгружаемой дельтой и предшествующей ей.
Изменения данных представляют собой полную историю изменений данных логических таблиц, включая холодные данные.
В отличие от снапшота, инкремент содержит изменения данных, а не снимок состояния данных. Например:
- дельта 1: добавили записи
А
иБ
; - дельта 2: удалили запись
А
; - инкремент дельты 1: записи
А
иБ
с признаком добавления (sys_op = 0
); - инкремент дельты 2: запись
А
с признаком удаления (sys_op = 1
).
Номера и метки времени дельт в инкрементах соответствуют исходной логической БД. Сохранение нумерации и меток времени операций записи не гарантируется.
Содержимое снапшота
При выгрузке снапшота команда выгружает следующие объекты:
- журнал — изменения логической схемы данных по состоянию на момент запуска команды;
- состояние данных логических таблиц и прокси-таблиц на указанный момент времени;
- (при
snapshotDepth
больше 0) историю изменений данных логических таблиц и прокси-таблиц с указанного момента времени по последнюю закрытую дельту включительно.
В отличие от инкремента, снапшот без истории содержит снимок состояния данных, а не изменения данных. Например:
- дельта 1: добавили записи
А
иБ
; - дельта 2: удалили запись
А
; - снапшот по состоянию на дельту 1: записи
А
иБ
; - снапшот по состоянию на дельту 2: запись
Б
.
Сохранение номеров и меток времени дельт и операций записи относительно исходной логической БД в снапшоте не гарантируется.
Технически данные выгружаются в топик Kafka в таком же формате, как инкрементальный бэкап, с тем отличием, что команда выгружает зафиксированное состояние данных как инкремент нулевой дельты и игнорирует другие инкременты в топике, если такие есть. Подробнее см. в секции Как работает команда > Выгрузка снапшота.
Как работает команда
Выгрузка бэкапа в топик Kafka
Оба вида бэкапа — инкременты и снапшоты — выгружаются в топик Kafka в формате Avro и имеют структуру, описанную в секции Формат бэкапа в топике Kafka.
Бэкап выгружается в топик следующего брокера Kafka:
- указанного с помощью ключевого слова kafkaBrokers — если команда содержит
kafkaBrokers
; - указанного с помощью ключевого слова kafkaBrokersCore и (или) параметра конфигурации
ZOOKEEPER_KAFKA_ADDRESS
— если команда не содержитkafkaBrokers
.
Команда выгружает бэкап одним или несколькими потоками — в зависимости от значения ключевого слова threadsCount. По умолчанию, если ключевое слово не указано, используется один поток.
При выгрузке, если топик новый, он разделяется на заданное количество партиций. Количество партиций в топике определяется:
- настройками Kafka — если в брокере сообщений Kafka настроено автосоздание топиков;
- ключевым словом partitionsCount — если автосоздание топиков отключено.
Выгрузка инкремента одной дельты
Если в команде указано ключевое слово deltaNum, но не указано ключевое слово nextDeltasCount, в топик выгружается инкремент указанной дельты.
На рисунке ниже показан пример выгрузки инкремента дельты с номером 1. Инкремент содержит изменения данных w2
и w3
, внесенные между закрытием дельт 0 и 1, и все записи журнала по изменению схемы с с0
по с3
включительно.
Выгрузка инкрементов диапазона дельт
Если в команде указано ключевые слова deltaNum и nextDeltasCount, в топик выгружаются инкременты дельт из диапазона [deltaNum, deltaNum + nextDeltasCount]
. Например, если значение deltaNum
равно 2, а nextDeltasCount
— 3, выгружаются инкременты следующих дельт: 2, 3, 4, 5.
На рисунке ниже показан пример выгрузки инкрементов дельт из диапазона [1, 1+1]
. Инкременты содержат изменения данных w2
, w3
, w4
и w5
, и все записи журнала по изменению схемы с с0
по с3
включительно.
Выгрузка инкрементов всех дельт
Если в команде не указано ни ключевое слово deltaNum, ни ключевое слово snapshotDepth, в топик выгружаются инкременты следующих дельт логической БД:
- всех закрытых дельт — если в топике, куда выгружаются данные, нет ни одного инкремента этой логической БД;
- «новых» закрытых дельт — если в топике уже есть хотя бы один инкремент этой логической БД. Под «новыми» закрытыми дельтами понимаются те дельты, чьи номера больше максимального номера дельты в топике. Промежуточные дельты, если какие-то из них отсутствуют в топике, НЕ выгружаются.
На рисунке ниже показан пример выгрузки инкрементов всех дельт в новый топик. В этом случае команда выгружает все изменения данных и все записи журнала.
На рисунке ниже показан пример выгрузки инкрементов всех дельт в топик, где уже есть инкремент дельты 1 той же логической БД. В этом случае команда выгружает изменения данных w4
и w5
и все записи журнала.
Выгрузка снапшота
Если в команде указано ключевое слово snapshotDepth, в топик выгружается снапшот логической БД, зафиксированный по состоянию на указанный момент времени. При значении snapshotDepth
больше 0 команда также выгружает историю изменений данных с момента времени, зафиксированного в снапшоте, по последнюю закрытую дельту включительно.
Если момент времени задан в команде как количество дней, часов, минут или секунд, отсчитываемых в прошлое от текущего момента, снапшот фиксируется по состоянию на последнюю закрытую на тот момент дельту. Например, если команда запущена с ключом --snapshotDepth 2d
, снапшот выгружается по состоянию на дельту, которая была последней два дня назад.
Подробнее о возможных значениях ключевого слова см. в описании snapshotDepth.
Нумерация дельт в выгружаемом снапшоте сдвигается: снапшот сохраняется как начальное состояние (дельта 0), а последующие дельты получают номера, следующие по порядку — 1, 2 и т.д. в зависимости от значения snapshotDepth
.
Использование команды
Выгрузка инкрементов
Проверка недостающих инкрементов
Чтобы проверить список дельт, инкрементов которых не хватает в топике до полного бэкапа, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.
Выгрузка инкрементов всех дельт
Чтобы выгрузить инкременты всех или только недостающих закрытых дельт:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
без ключевых словdeltaNum
иshapshotDepth
. В команде укажите ключевое слово topic со следующим значением:- если нужно выгрузить полный бэкап с нулевой до последней закрытой дельты, укажите топик, в который раньше не выгружались инкременты этой логической БД. При этом топик может содержать данные других логических БД;
- если нужно обновить существующий бэкап последними изменениями, укажите топик, содержащий инкременты нужной логической БД.
Выгрузка инкремента одной дельты
Чтобы выгрузить инкремент одной дельты:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевым словом deltaNum, не указывая ключевое словоshapshotDepth
.
Выгрузка инкрементов диапазона дельт
Чтобы выгрузить инкременты дельт из диапазона:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевыми словами deltaNum и nextDeltasCount, не указывая ключевое словоshapshotDepth
.
Выгрузка снапшота
Чтобы выгрузить снапшот логической БД:
- Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
- Выполните команду
BACKUP_TO_KAFKA
с ключевым словом snapshotDepth, не указывая ключевое словоdeltaNum
.
Выгружайте каждый снапшот в новый топик. Корректное восстановление логической БД из топика, содержащего несколько снапшотов, не гарантируется.
Снятие блокировки логической БД в случае сбоя
Если во время работы команды произошел сбой, исходная логическая БД может остаться заблокированной для изменений схемы данных. В этом случае все DDL-запросы в логической БД будут возвращать ошибку Change operations are forbidden
.
Чтобы снять блокировку изменений в логической БД, выполните запрос ALLOW_CHANGES, где в качестве кода-пароля в запросе укажите строку в формате backup_<имя_логической_БД>_<число_месяца_запуска_команды>
. Например, если команда была запущена для логической БД marketing
9 февраля, следует выполнить запрос ALLOW_CHANGES(marketing, 'backup_marketing_9')
.
После снятия блокировки можно запустить выгрузку бэкапа еще раз.
Аутентификация
Команда поддерживает передачу токена (JWT) для последующей аутентификации в системе.
Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.
Синтаксис
java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
backup_to_kafka \
[--datasourceType <datasource_name>] \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--deltaNum <delta_num>] \
[–-nextDeltasCount <next_deltas_count>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--chunkSize <chunk_size>] \
[--kafkaRetries <kafka_retries>]
[--snapshotDepth <snapshot_depth>]
Параметры
dtm_tools_file_name
-
Имя jar-файла утилиты DTM Tools.
Ключевые слова
--connect
Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:
prostore_host
— IP-адрес или доменное имя ноды Prostore;db_name
— номер порта для подключения к ноде Prostore, равный значению параметра конфигурацииDTM_CORE_HTTP_PORT
;db_name
— имя логической БД, для которой нужно выгрузить инкременты;- (если включена аутентификация запросов) строка
?user=jwt&password=<jwt_token>
, гдеjwt_token
— авторизационный токен.
--datasourceType
Задает имя датасорса-источника, из которого выгружаются данные. Значение указывается без кавычек и должно соответствовать имени датасорса, заданному в конфигурации.
Если ключевое слово не указано, данные каждой таблицы выгружаются из наиболее оптимального датасорса. Запрос на выгрузку бэкапа относится к категории «Реляционный».
--topic
Задает имя топика Kafka, куда выгружаются бэкап.
--kafkaBrokersCore
Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.
Ключевое слово обязательно при запуске команды в инсталляциях, где сервисная БД в ZooKeeper недоступна со стороны утилиты DTM Tools и утилита не может запросить информацию о брокерах из конфигурации Prostore. В остальных случаях ключевое слово опционально.
Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.
--kafkaBrokers
Задает список адресов брокеров сообщений Kafka, куда выгружается бэкап, в формате host1:port1,host2:port2,..hostN:portN
.
Ключевое слово нужно указывать, если требуется выгрузить бэкап в брокеры Kafka, отличные от тех, с которыми работает Prostore.
На рисунке ниже показана схема работы утилиты с двумя брокерами Kafka: один брокер используется утилитой и системой Prostore, второй — только утилитой. Первый брокер находится в одной сети с Prostore, а второй может находиться в другой сети, например, в удаленном дата-центре.
--deltaNum
Задает номер дельты delta_num
, инкремент которой выгружается в топик, и может комбинироваться с ключевым словом nextDeltasCount для указания левой границы диапазона выгружаемых инкрементов.
Если ключевое слово не указано, выгружаются инкременты всех «новых» дельт — дельт, чьи номера больше максимального номера дельты среди инкрементов логической БД, уже хранящихся в топике.
--nextDeltasCount
Задает количество дельт, выгружаемых после дельты delta_num
, и используется для выгрузки инкрементов диапазона дельт.
--partitionsCount
Задает количество партиций, создаваемых при выгрузке бэкапа в новый топик. Разделение топика на партиции позволяет ускорить выгрузку бэкапа за счет параллельных процессов выгрузки. Значение учитывается, если топик создается командой, то если в брокере Kafka отключено автосоздание топиков.
Если ключевое слово не указано, все данные выгружаются в одну партицию.
--threadsCount
Задает количество параллельных процессов выгрузки бэкапа и, тем самым, позволяет подобрать оптимальную скорость выгрузки в зависимости от производительности инфраструктуры.
Если ключевое слово не указано, все данные выгружаются одним потоком.
--chunkSize
Задает максимальное количество записей, сохраняемых в одном сообщении Kafka.
Если ключевое слово не указано, в сообщение записывается до 1000 записей.
--kafkaRetries
Задает количество попыток выгрузки бэкапа в топик, которое команда делает при запуске. Если ключевое слово не указано, делается 10 попыток.
--snapshotDepth
Задает момент времени, по состоянию на который выгружается бэкап. Момент времени определяется как точка в истории изменений данных логической БД, отсчитываемая от момента закрытия последней дельты на указанное количество дельт, дней, часов, минут или секунд.
Возможные значения (где X
— положительное число):
X
— X дельт,Xs
— X секунд,Xm
— X минут,Xh
— X часов,Xd
— X дней.
Если ключевое слово не указано, выгружается инкрементальный бэкап.
Ограничения
- Запуск команды недоступен, если в исходной логической БД есть незавершенные операции по изменению схемы данных.
- Во время работы команды недоступно изменение схемы данных логической БД, для которой выгружается бэкап.
- Команда не проверяет полноту журнала логической БД.
- Выгрузка инкрементов запускается только при наличии в логической БД более новых закрытых дельты, чем те, которые уже хранятся в топике. Новые изменения данных без меток времени и изменения схемы данных при отсутствии новых дельт не приводят к выгрузке инкрементов.
- Значение ключевого слова partitions_count игнорируется, если в брокере сообщений Kafka включено автосоздание топиков или бэкап выгружается в существующий топик.
- В составе инкрементов для прокси-таблиц выгружается только логическая схема данных, без данных прокси-таблиц.
- Standalone-таблицы и внешние таблицы не включаются в бэкап.
Примеры
Выгрузка инкрементов всех дельт
Выгрузка инкрементов всех дельт логической БД marketing
из ADQM с выгрузкой в два потока и распределением на три партиции:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2
Выгрузка инкрементов с указанием токена:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2
Выгрузка инкремента одной дельты
Выгрузка инкремента дельты 12
логической БД marketing
с указанием списка брокеров Kafka:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --kafkaRetries 5
Выгрузка инкремента диапазона дельты
Выгрузка инкрементов дельт с 12 по 17 логической БД marketing
с указанием списка брокеров Kafka:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --nextDeltasCount 5 --kafkaRetries 5
Выгрузка снапшота с историей
Выгрузка снапшота логической БД marketing
с историей изменений данных за две дельты:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 2
Выгрузка снапшота логической БД marketing
с историей изменений данных за 10 дней:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 10d
Выгрузка снапшота без истории
Выгрузка снапшота логической БД marketing
без истории изменений данных:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup_snaphot –-snapshotDepth 0
Пошаговый пример выгрузки бэкапа
Подготовка логической БД
Создание и наполнение тестовой логической базы данных sourcedb
:
CREATE DATABASE sourcedb;
USE sourcedb;
CREATE TABLE sourcedb.table1_adqm
(
id bigint not null,
boolean_col boolean,
int32_col int32,
int_col int,
bigint_col bigint,
float_col float,
double_col double,
char_col char(10),
varchar_col varchar,
uuid_col uuid,
link_col link,
date_col date,
time_col time,
timestamp_col timestamp,
PRIMARY KEY (id)
)
DISTRIBUTED BY (id)
DATASOURCE_TYPE ('adqm');
BEGIN DELTA;
INSERT INTO sourcedb.table1_adqm
VALUES (1, false, 32768, 65537, 1280000, 3.141526, 2.718281828,
'char_value', 'varchar_value','12345678-1234-abcd-ef00-1234567899ab',
'https://www.google.com', '1970-01-11', '13:10:30', '1970-01-01 13:10:30');
COMMIT DELTA;
Выгрузка полного бэкапа логической БД
Выгрузка полного инкрементального бэкапа логической БД sourcedb
из датасорса adqm
с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090
:
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb
Инкрементальный бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb
.
Выгрузка снапшот-бэкапа логической БД sourcedb
из датасорса adqm
с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090
(без сохранения истории изменений):
java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb_snapshot –-snapshotDepth 0
Снапшот-бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb_snapshot
.
Удаление тестовой логической БД
Опционально после окончания выгрузки бэкапа можно удалить исходную базу данных, чтобы протестировать ее последующее восстановление из бэкапа, сохраненного в топиках backup_topic_sourcedb
и backup_topic_sourcedb_snapshot
.
DROP DATABASE sourcedb
Пошаговый пример, продолжающий этот пример, см. в разделе Команда RESTORE_FROM_KAFKA > Пошаговый пример развертывания логической БД из бэкапа.
Варианты вывода команды
Вывод при успешной выгрузке бэкапа
Ниже показан примерный вид вывода команды при успешной выгрузке бэкапа (набор сообщений и их текст могут изменяться от версии к версии):
...
Started BACKUP process with params ...
...
Deny changes in datamart [<db_name>] with code [backup_<db_name>_<date_of_the_month>]
Delta range ... to backup
Got changes for datamart <db_name>
Tables received for datamart ...
Started backup delta processing
...
Got delta date for datamart ..., delta num ...
...
Started backup delta table processing
...
Topic ... with ... partitions successfully created
...
Successfully created download ext table ...
...
Data inserted into kafka ...
...
Successfully dropped download ext table ...
...
Successfully expload delta table to kafka ...
...
Started archiving delta table
...
Polled message 0 from source topic ...
Trying to send data message ... to topic [<topic_defined_in_the_command>] ...
...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
Topic ... successfully deleted
Start archiving delta changelog ... into topic <topic_defined_in_the_command>
Trying to send changelog message ... to topic [<topic_defined_in_the_command>] ...
Successfully archived delta changelog into topic <topic_defined_in_the_command>
...
Allow changes in datamart <db_name>
BACKUP is done
Вывод при неуспешной выгрузке бэкапа
При неуспешной выгрузке бэкапа команда выводит текст ошибки.
Формат бэкапа в топике Kafka
Бэкап выгружается в топик в виде сообщений Kafka.
Каждое сообщение состоит из ключа и тела и содержит данные таблицы или журнала. Ключ сообщения состоит из схемы данных Avro и одной записи Avro и содержит метаинформацию инкремента в формате, описанном ниже в секции Формат ключа сообщения. Тело сообщения представляет собой файл Avro (Object Container File), состоящий из заголовка со схемой данных Avro и записей Avro.
Схема данных в ключе сообщения одинакова для всех сообщений, а схема данных в теле сообщения зависит от типа сообщения (см. рисунок ниже).
Для наглядности все бинарные данные Avro на рисунке и в примерах ниже представлены в JSON-формате.
На рисунке ниже показаны составные части сообщения Kafka, содержащего данные бэкапа.
Формат ключа сообщения
Схема Avro в ключе сообщения имеет следующий формат:
{
"name": "topicA",
"type": "record",
"fields": [
{
"name": "incrementId",
"type": "string"
},
{
"name": "messageType",
"type": "long"
},
{
"name": "datamart",
"type": "string"
},
{
"name": "deltaNum",
"type": "long"
},
{
"name": "deltaDate",
"type": "long",
"logicalType": "timestamp-micros"
},
{
"name": "tableName",
"type": [
"null",
"string"
]
},
{
"name": "streamNumber",
"type": "long"
},
{
"name": "streamTotal",
"type": "long"
},
{
"name": "chunkNumber",
"type": "long"
},
{
"name": "isLastChunk",
"type": "boolean"
}
]
}
Параметры:
incrementId
-
Идентификатор выгрузки инкремента, уникальный для каждого запуска команды.
messageType
-
Тип сообщения. Возможные значения: 0 — сообщение с данными логической таблицы или прокси-таблицы, 1 — сообщение с данными журнала.
datamart
-
Имя логической базы данных, к которой относится инкремент.
deltaNum
-
Номер дельты, к которой относятся данные.
deltaDate
-
Дата и время закрытия дельты
deltaNum
в виде целого числа. tableName
-
Имя таблицы, из которой выгружены данные. В сообщениях с данными журнала поле имеет значение
null
. streamNumber
-
Номер потока, записавшего сообщение. Потоки нумеруются с 1.
streamTotal
-
Общее число потоков записи сообщений.
chunkNumber
-
Номер сообщения среди всех сообщений потока
streamNumber
. Сообщения нумеруются с 1. isLastChunk
-
Признак последнего сообщения в потоке
streamNumber
. Возможные значения:true
,false
.
Пример записи Avro в ключе сообщения:
{
"incrementId": "f81d4fae-7dec-11d0-a765-00a0c91e6bf6",
"messageType": 0,
"datamart": "marketing",
"deltaNum": 55,
"deltaDate": 1638360336000000,
"tableName": "stores",
"streamNumber": 1,
"streamTotal": 2,
"chunkNumber": 4,
"isLastChunk": false
}
Формат тела сообщения
Схема Avro
Состав схемы данных Avro в теле сообщения зависит от типа сообщения:
- все сообщения с данными журнала имеют одинаковую схему данных,
- сообщение с данными таблицы имеет схему данных, соответствующую структуре этой таблицы.
Схема Avro в сообщениях, содержащих данные журнала, имеет следующий вид:
{
"type": "record",
"name": "changeLog",
"fields": [
{
"name": "changeNum",
"type": "long"
},
{
"name": "entityName",
"type": "string"
},
{
"name": "changeQuery",
"type": "string"
}
]
}
Параметры:
changeNum
-
Порядковый номер изменения в журнале.
entityName
-
Имя логической сущности, к которой относится изменение.
changeQuery
-
Запрос на создание, удаление или изменение логической сущности.
Пример схемы Avro в сообщении, содержащем данные логической таблицы:
{
"type": "record",
"name": "row",
"fields": [
{
"name": "id",
"type": ["null","long"],
"doc": ""
},
{
"name": "transaction_date",
"type": ["null","long"],
"doc": ""
},
{
"name": "product_code",
"type": ["null","string"],
"doc": ""
},
{
"name": "product_units",
"type": ["null","long"],
"doc": ""
},
{
"name": "store_id",
"type": ["null","long"],
"doc": ""
},
{
"name": "description",
"type": ["null","string"],
"doc": ""
},
{
"name": "sys_op",
"type": ["null","int"],
"doc": ""
}
]
}
Записи Avro
Пример записей Avro в сообщении с данными журнала:
[
{
"changeNum": 0,
"entityName": "marketing",
"changeQuery": "CREATE TABLE marketing.sales (id INT NOT NULL, transaction_date TIMESTAMP NOT NULL, product_code VARCHAR(256) NOT NULL, product_units INT NOT NULL, store_id INT NOT NULL, description VARCHAR(256), PRIMARY KEY (id)) DISTRIBUTED BY (id))"
},
{
"changeNum": 1,
"entityName": "stores_by_sold_products",
"changeQuery": "CREATE VIEW marketing.stores_by_sold_products AS SELECT store_id, SUM(product_units) AS product_amount FROM marketing.sales GROUP BY store_id ORDER BY product_amount DESC LIMIT 20"
},
{
"changeNum": 2,
"entityName": "stores_by_sold_products",
"changeQuery": "DROP VIEW marketing.stores_by_sold_products"
}
]
Пример записей Avro в сообщении с данными логической таблицы:
[
{
"id": 1000111,
"transaction_date": 1614269474000000,
"product_code": "ABC102101",
"product_units": 2,
"store_id": 1000012345,
"description": "Покупка по акции 1+1",
"sys_op": 0
},
{
"id": 1000112,
"transaction_date": 1614334214000000,
"product_code": "ABC102001",
"product_units": 1,
"store_id": 1000000123,
"description": "Покупка без акций",
"sys_op": 1
}
]