Команда BACKUP_TO_KAFKA

Содержание раздела
  1. Содержимое инкремента
    1. Выгружаемые изменения логической схемы
    2. Выгружаемые изменения данных
  2. Сущности, не подлежащие выгрузке в бэкап
  3. Аутентификация
  4. Порядок работы команды
    1. Выгрузка инкрементов в топик Kafka
    2. Выгрузка инкремента одной дельты
    3. Выгрузка инкрементов всех дельт
  5. Использование команды
    1. Проверка недостающих инкрементов
    2. Выгрузка инкрементов всех дельт
    3. Выгрузка инкремента одной дельты
    4. Снятие блокировки логической БД в случае сбоя
  6. Синтаксис
    1. Параметры
    2. Ключевые слова
      1. --connect
      2. --datasourceType
      3. --topic
      4. --kafkaBrokersCore
      5. --kafkaBrokers
      6. --deltaNum
      7. --partitionsCount
      8. --threadsCount
      9. --chunkSize
      10. --kafkaRetries
  7. Ограничения
  8. Примеры
    1. Выгрузка инкрементов всех дельт
    2. Выгрузка инкремента одной дельты
    3. Пошаговый пример выгрузки бэкапа
      1. Подготовка логической БД
      2. Выгрузка полного бэкапа логической БД
      3. Удаление тестовой логической БД
  9. Варианты вывода команды
    1. Вывод при успешной выгрузке инкрементов
    2. Вывод при неуспешной выгрузке инкрементов
  10. Формат инкрементов в топике Kafka
    1. Формат ключа сообщения
    2. Формат тела сообщения
      1. Схема Avro
      2. Записи Avro

Команда BACKUP_TO_KAFKA выгружает бэкап логической базы данных в топик Kafka.

Бэкап выгружается независимыми частями — инкрементами. За один запуск команды можно выгрузить один или несколько инкрементов. Подробнее см. в секции Использование команды.

Бэкап, состоящий из целостного набора инкрементов, не является полным слепком исходной логической БД, так как не включает внешние таблицы и standalone-таблицы.

Для запуска команды на хосте должна быть установлена среда исполнения Java (JRE) или набор инструментов для разработки на Java (JDK).

Из целостного набора инкрементов можно развернуть резервную копию логической БД или восстановить сбойную логическую БД, используя команду RESTORE_FROM_KAFKA.

Содержимое инкремента

По каждой дельте команда выгружает инкремент, который содержит следующие объекты:

Изменения данных представляют собой полную историю изменений данных логических таблиц, включая холодные данные. Партиционированные данные выгружаются с разделением на партиции — таким же образом, как они разделены в исходной логической БД.

Инкремент содержит изменения данных, а не снимок состояния данных. Например, если в дельте 1 в таблицу добавили записи А и Б, а в дельте 2 удалили запись А, то инкремент дельты 1 будет содержать записи А и Б с признаком добавления (sys_op = 0), а инкремент дельты — запись А с признаком удаления (sys_op = 1). Записи Б в инкременте дельты 2 не будет, так как она не изменялась и не удалялась.

Номера и метки времени дельт в инкрементах соответствуют исходной логической БД. Сохранение нумерации операций записи (CN) не гарантируется.

Выгружаемые изменения логической схемы

Изменения логической схемы выгружаются для всех видов сущностей, присутствующих в журнале:

Выгружаемые изменения данных

Изменения данных выгружаются для следующих видов логических таблиц:

Данные материализованных представлений не выгружаются в составе инкрементов: они автоматически восстанавливаются системой после того, как команда RESTORE_FROM_KAFKA успешно развернет таблицы, на которых построено представление. Другие сущности, которые включаются в инкременты, — партиционированные таблицы и логические представления — не хранят данные и поэтому не требуют выгрузки данных; для них выгружаются только изменения схемы данных.

Сущности, не подлежащие выгрузке в бэкап

Внешние таблицы и standalone-таблицы не включаются в бэкап. При необходимости вы можете их пересоздать после успешного развертывания логической БД с помощью команды RESTORE_FROM_KAFKA.

Аутентификация

Команда поддерживает передачу токена (JWT) для последующей аутентификации в системе.

Если в системе включена аутентификация запросов, необходимо указать информацию о токене в составе строки подключения команды. Подробнее о формате строки подключения см. в секции Синтаксис.

Порядок работы команды

Выгрузка инкрементов в топик Kafka

Инкременты выгружаются в топик Kafka в формате Avro и имеют структуру, описанную в секции Формат инкрементов в топике Kafka. Команда выгружает инкременты в топик следующего брокера Kafka:

  • указанного в команде — если команда содержит ключевое слово kafkaBrokers;
  • заданного в конфигурации с помощью параметра ZOOKEEPER_KAFKA_ADDRESS — если команда не содержит ключевое слово kafkaBrokers.

Инкременты выгружаются одним или несколькими потоками — в зависимости от значения ключевого слова threadsCount. По умолчанию, если ключевое слово не указано, используется один поток.

При выгрузке в новый топик он разделяется на заданное количество партиций. Количество партиций в топике определяется:

  • настройками Kafka — если в брокере сообщений Kafka настроено автосоздание топиков;
  • ключевым словом partitionsCount — если автосоздание топиков отключено.

Выгрузка инкремента одной дельты

Если в команде указан номер дельты, в топик выгружается инкремент этой дельты.

На рисунке ниже показан пример выгрузки инкремента дельты с номером 1. Инкремент содержит изменения данных w2 и w3, внесенные между закрытием дельт 0 и 1, и все записи журнала по изменению схемы с с0 по с3 включительно.

Схема выгрузки инкремента одной дельты

Выгрузка инкрементов всех дельт

Если в команде не указан номер дельты, в топик выгружаются инкременты следующих дельт логической БД:

  • всех закрытых дельт — если в топике, куда выгружаются данные, нет ни одного инкремента этой логической БД;
  • «новых» закрытых дельт — если в топике уже есть хотя бы один инкремент этой логической БД. Под «новыми» закрытыми дельтами понимаются те дельты, чьи номера больше максимального номера дельты в топике. Промежуточные дельты, если какие-то из них отсутствуют в топике, НЕ выгружаются.

На рисунке ниже показан пример выгрузки инкрементов всех дельт в новый топик. В этом случае команда выгружает все изменения данных и все записи журнала.

Схема выгрузки инкрементов всех дельт

На рисунке ниже показан пример выгрузки инкрементов всех дельт в топик, где уже есть инкремент дельты 1 той же логической БД. В этом случае команда выгружает изменения данных w4 и w5 и все записи журнала.

Схема выгрузки инкрементов новых дельт

Использование команды

Проверка недостающих инкрементов

Чтобы проверить список дельт, инкрементов которых не хватает в топике до полного бэкапа, выполните команду RESTORE_FROM_KAFKA с ключевым словом readonly.

Выгрузка инкрементов всех дельт

Чтобы выгрузить инкременты всех или только недостающих закрытых дельт:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA без ключевого слова deltaNum. В команде укажите ключевое слово topic со следующим значением:
    • если нужно выгрузить полный бэкап с нулевой до последней закрытой дельты, укажите топик, в который раньше не выгружались инкременты этой логической БД. При этом топик может содержать данные других логических БД;
    • если нужно обновить существующий бэкап последними изменениями, укажите топик, содержащий инкременты нужной логической БД.

Выгрузка инкремента одной дельты

Чтобы выгрузить инкремент одной дельты:

  1. Если вы работаете с Prostore версии 5.4 или ниже, выполните действия, описанные в разделе Команда POPULATE_CHANGELOG > Заполнение журнала.
  2. Выполните команду BACKUP_TO_KAFKA с ключевым словом deltaNum.

Снятие блокировки логической БД в случае сбоя

Если во время работы команды произошел сбой, исходная логическая БД может остаться заблокированной для изменений схемы данных. В этом случае все DDL-запросы в логической БД будут возвращать ошибку Change operations are forbidden.

Чтобы снять блокировку изменений в логической БД, выполните запрос ALLOW_CHANGES, где в качестве кода-пароля в запросе укажите строку в формате backup_<имя_логической_БД>_<число_месяца_запуска_команды>. Например, если команда была запущена для логической БД marketing 9 февраля, следует выполнить запрос ALLOW_CHANGES(marketing, 'backup_marketing_9').

После снятия блокировки можно запустить выгрузку бэкапа еще раз.

Синтаксис

java -jar <dtm_tools_file_name>.jar \
--connect <prostore_host>:<prostore_port>/<db_name>[?user=jwt&password=<jwt_token>] \
backup_to_kafka \
[--datasourceType <datasource_name>] \
--topic <backup_topic> \
[–-kafkaBrokersCore <core_kafka_brokers_list>] \
[--kafkaBrokers <backup_kafka_broker_list>] \
[--deltaNum <delta_num>] \
[--partitionsCount <partitions_count>] \
[--threadsCount <threads_count>] \
[--chunkSize <chunk_size>] \
[--kafkaRetries <kafka_retries>]

Параметры

dtm_tools_file_name

Имя jar-файла утилиты DTM Tools.

Ключевые слова

--connect

Задает строку подключения к ноде Prostore. Значение состоит из следующих элементов:

  1. prostore_host — IP-адрес или доменное имея ноды Prostore;
  2. db_name — номер порта для подключения к ноде Prostore, равный значению параметра конфигурации DTM_CORE_HTTP_PORT;
  3. db_name — имя логической БД, для которой нужно выгрузить инкременты;
  4. (если включена аутентификация запросов) строка ?user=jwt&password=<jwt_token>, где jwt_token — авторизационный токен.

--datasourceType

Задает имя датасорса-источника, из которого выгружаются данные. Значение указывается без кавычек и должно соответствовать имени датасорса, заданному в конфигурации.

Если ключевое слово не указано, данные каждой логической таблицы выгружаются из наиболее оптимального датасорса. Запрос на выгрузку инкрементов относится к категории «Реляционный» и подкатегории «Все узлы».

--topic

Задает имя топика Kafka, куда выгружаются инкременты.

--kafkaBrokersCore

Задает список адресов брокеров сообщений Kafka, с которыми работает Prostore, и служит для ускорения работы команды.

Если ключевое слово не указано, команда запрашивает список адресов из конфигурации ноды Prostore.

--kafkaBrokers

Задает список адресов брокеров сообщений Kafka, содержащих топик, куда выгружаются инкременты. Значение указывается в формате host1:port1,host2:port2,..hostN:portN.

Ключевое слово нужно указывать, если список брокеров отличается от значения параметра конфигурации ZOOKEEPER_KAFKA_ADDRESS.

--deltaNum

Задает номер дельты, инкремент которой выгружается в топик, и используется для выгрузки инкремента одной дельты.

Если ключевое слово не указано, выгружаются инкременты всех «новых» дельт — дельт, чьи номера больше максимального номера дельты среди инкрементов логической БД, уже хранящихся в топике.

--partitionsCount

Задает количество партиций, создаваемых при выгрузке бэкапа в новый топик. Разделение топика на партиции позволяет ускорить выгрузку бэкапа за счет параллельных процессов выгрузки. Значение учитывается, если топик создается командой, то если в брокере Kafka отключено автосоздание топиков.

Если ключевое слово не указано, все данные выгружаются в одну партицию.

--threadsCount

Задает количество параллельных процессов выгрузки бэкапа и, тем самым, позволяет подобрать оптимальную скорость выгрузки в зависимости от производительности инфраструктуры.

Если ключевое слово не указано, все данные выгружаются одним потоком.

--chunkSize

Задает максимальное количество записей, сохраняемых в одном сообщении Kafka.

Если ключевое слово не указано, в сообщение записывается до 1000 записей.

--kafkaRetries

Задает количество попыток выгрузки бэкапа в топик, которое команда делает при запуске. Если ключевое слово не указано, делается 10 попыток.

Ограничения

  • Запуск команды недоступен, если в исходной логической БД есть незавершенные операции по изменению схемы данных.
  • Во время работы команды недоступно изменение схемы данных логической БД, для которой выгружается бэкап.
  • Команда не проверяет полноту журнала логической БД.
  • Выгрузка инкрементов запускается только при наличии в логической БД более новых закрытых дельты, чем те, которые уже хранятся в топике. Новые изменения данных без меток времени и изменения схемы данных при отсутствии новых дельт не приводят к выгрузке инкрементов.
  • Значение ключевого слова partitions_count игнорируется, если в брокере сообщений Kafka включено автосоздание топиков или бэкап выгружается в существующий топик.

Примеры

Выгрузка инкрементов всех дельт

Выгрузка инкрементов всех дельт логической БД marketing из ADQM с выгрузкой в два потока и распределением на три партиции:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкрементов с указанием токена:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing?user=jwt&password=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c backup_to_kafka --datasourceType adqm --topic marketing_backup --partitionsCount 3 --threadsCount 2

Выгрузка инкремента одной дельты

Выгрузка инкремента дельты 12 логической БД marketing с указанием списка брокеров Kafka:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/marketing backup_to_kafka --topic marketing_backup --kafkaBrokers 10.123.2.43:9092,10.123.2.44:9092 --deltaNum 12 --kafkaRetries 5

Пошаговый пример выгрузки бэкапа

Подготовка логической БД

Создание и наполнение тестовой логической базы данных sourcedb:

CREATE DATABASE sourcedb;
USE sourcedb;
CREATE TABLE sourcedb.table1_adqm
    (
      id bigint not null,
      boolean_col boolean,
      int32_col int32,
      int_col int,
      bigint_col bigint,
      float_col float,
      double_col double,
      char_col char(10),
      varchar_col varchar,
      uuid_col uuid,
      link_col link,
      date_col date,
      time_col time,
      timestamp_col timestamp,
      PRIMARY KEY (id)
    )
DISTRIBUTED BY (id)
DATASOURCE_TYPE ('adqm');
 
BEGIN DELTA;
INSERT INTO sourcedb.table1_adqm
VALUES (1, false, 32768, 65537, 1280000, 3.141526, 2.718281828,
 'char_value', 'varchar_value','12345678-1234-abcd-ef00-1234567899ab',
 'https://www.google.com', '1970-01-11', '13:10:30', '1970-01-01 13:10:30');
COMMIT DELTA;

Выгрузка полного бэкапа логической БД

Выгрузка полного бэкапа (инкрементов всех закрытых дельт) логической БД sourcedb из датасорса adqm с использованием ноды Prostore, доступной по адресу 10.92.3.86:9090:

java -jar dtm-tools-1.19.jar --connect 10.92.3.86:9090/sourcedb backup_to_kafka --datasourceType adqm --topic backup_topic_sourcedb --partitionsCount 1 --threadsCount 1

Бэкап выгружается одним потоком и записывается в одну партицию топика backup_topic_sourcedb.

Удаление тестовой логической БД

Опционально после окончания выгрузки бэкапа можно удалить исходную базу данных, чтобы протестировать ее последующее восстановление из бэкапа, сохраненного в топике backup_topic_sourcedb.

DROP DATABASE sourcedb

Пошаговый пример, продолжающий этот пример, см. в разделе Команда RESTORE_FROM_KAFKA > Пошаговый пример развертывания логической БД из бэкапа.

Варианты вывода команды

Вывод при успешной выгрузке инкрементов

Ниже показан примерный вид вывода команды при успешной выгрузке инкрементов (набор сообщений и их текст могут изменяться от версии к версии):

...
Started BACKUP process with params ...
...
Deny changes in datamart [<db_name>] with code [backup_<db_name>_<date_of_the_month>]
Delta range ... to backup
Got changes for datamart <db_name>
Tables received for datamart ...
Started backup delta processing
...
Got delta date for datamart ..., delta num ...
...
Started backup delta table processing
...
Topic ... with ... partitions successfully created
...
Successfully created download ext table ...
...
Data inserted into kafka ...
...
Successfully dropped download ext table ...
...
Successfully expload delta table to kafka ...
...
Started archiving delta table
...
Polled message 0 from source topic ...
Trying to send data message ... to topic [<topic_defined_in_the_command>] ...
...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
Topic ... successfully deleted
Start archiving delta changelog ... into topic <topic_defined_in_the_command>
Trying to send changelog message ... to topic [<topic_defined_in_the_command>] ...
Successfully archived delta table to backup topic <topic_defined_in_the_command>
...
Allow changes in datamart <db_name>
BACKUP is done

Вывод при неуспешной выгрузке инкрементов

При неуспешной выгрузке инкрементов команда выводит текст ошибки.

Формат инкрементов в топике Kafka

Инкремент выгружается в топик в виде сообщений Kafka.

Каждое сообщение состоит из ключа и тела и содержит данные логической таблицы или журнала. Ключ сообщения состоит из схемы данных Avro и одной записи Avro и содержит метаинформацию инкремента в формате, описанном ниже в секции Формат ключа сообщения. Тело сообщения представляет собой файл Avro (Object Container File), состоящий из заголовка со схемой данных Avro и записей Avro.

Схема данных в ключе сообщения одинакова для всех сообщений, а схема данных в теле сообщения зависит от типа сообщения (см. рисунок ниже).

Для наглядности все бинарные данные Avro на рисунке и в примерах ниже представлены в JSON-формате.

На рисунке ниже показаны составные части сообщения Kafka, содержащего данные инкремента.

Формат сообщения Kafka с данными инкремента

Формат ключа сообщения

Схема Avro в ключе сообщения имеет следующий формат:

{
  "name": "topicA",
  "type": "record",
  "fields": [
    {
      "name": "incrementId",
      "type": "string"
    },
    {
      "name": "messageType",
      "type": "long"
    },
    {
      "name": "datamart",
      "type": "string"
    },
    {
      "name": "deltaNum",
      "type": "long"
    },
    {
      "name": "deltaDate",
      "type": "long",
      "logicalType": "timestamp-micros"
    },
    {
      "name": "tableName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "streamNumber",
      "type": "long"
    },
    {
      "name": "streamTotal",
      "type": "long"
    },
    {
      "name": "chunkNumber",
      "type": "long"
    },
    {
      "name": "isLastChunk",
      "type": "boolean"
    }
  ]
}

Параметры:

incrementId

Идентификатор инкремента, уникальный для каждого запуска команды.

messageType

Тип сообщения. Возможные значения: 0 — сообщение с данными логической таблицы, 1 — сообщение с данными журнала.

datamart

Имя логической базы данных, к которой относится инкремент.

deltaNum

Номер дельты, к которой относятся данные.

deltaDate

Дата и время закрытия дельты deltaNum в виде целого числа.

tableName

Имя логической таблицы, из которой выгружены данные. В сообщениях с данными журнала поле имеет значение null.

streamNumber

Номер потока, записавшего сообщение. Потоки нумеруются с 1.

streamTotal

Общее число потоков записи сообщений.

chunkNumber

Номер сообщения среди всех сообщений потока streamNumber. Сообщения нумеруются с 1.

isLastChunk

Признак последнего сообщения в потоке streamNumber. Возможные значения: true, false.

Пример записи Avro в ключе сообщения:

{
  "incrementId": "f81d4fae-7dec-11d0-a765-00a0c91e6bf6",
  "messageType": 0,
  "datamart": "marketing",
  "deltaNum": 55,
  "deltaDate": 1638360336000000,
  "tableName": "stores",
  "streamNumber": 1,
  "streamTotal": 2,
  "chunkNumber": 4,
  "isLastChunk": false
}

Формат тела сообщения

Схема Avro

Состав схемы данных Avro в теле сообщения зависит от типа сообщения:

  • все сообщения с данными журнала имеют одинаковую схему данных,
  • сообщение с данными логической таблицы имеет схему данных, соответствующую структуре этой таблицы.

Схема Avro в сообщениях, содержащих данные журнала, имеет следующий вид:

{
  "type": "record",
  "name": "changeLog",
  "fields": [
    {
      "name": "changeNum",
      "type": "long"
    },
    {
      "name": "entityName",
      "type": "string"
    },
    {
      "name": "changeQuery",
      "type": "string"
    }
  ]
}

Параметры:

changeNum

Порядковый номер изменения в журнале.

entityName

Имя логической сущности, к которой относится изменение.

changeQuery

Запрос на создание, удаление или изменение логической сущности.

Пример схемы Avro в сообщении, содержащем данные логической таблицы:

{
  "type": "record",
  "name": "row",
  "fields": [
    {
      "name": "id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "transaction_date",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "product_code",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "product_units",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "store_id",
      "type": ["null","long"],
      "doc": ""
    },
    {
      "name": "description",
      "type": ["null","string"],
      "doc": ""
    },
    {
      "name": "sys_op",
      "type": ["null","int"],
      "doc": ""
    }
  ]
}

Записи Avro

Пример записей Avro в сообщении с данными журнала:

[
  {
    "changeNum": 0,
    "entityName": "marketing",
    "changeQuery": "CREATE TABLE marketing.sales (id INT NOT NULL, transaction_date TIMESTAMP NOT NULL, product_code VARCHAR(256) NOT NULL, product_units INT NOT NULL, store_id INT NOT NULL, description VARCHAR(256), PRIMARY KEY (id)) DISTRIBUTED BY (id))"
  },
  {
    "changeNum": 1,
    "entityName": "stores_by_sold_products",
    "changeQuery": "CREATE VIEW marketing.stores_by_sold_products AS SELECT store_id, SUM(product_units) AS product_amount FROM marketing.sales GROUP BY store_id ORDER BY product_amount DESC LIMIT 20"
  },
  {
    "changeNum": 2,
    "entityName": "stores_by_sold_products",
    "changeQuery": "DROP VIEW marketing.stores_by_sold_products"
  }
]

Пример записей Avro в сообщении с данными логической таблицы:

[
  {
    "id": 1000111,
    "transaction_date": 1614269474000000,
    "product_code": "ABC102101",
    "product_units": 2,
    "store_id": 1000012345,
    "description": "Покупка по акции 1+1",
    "sys_op": 0
  },
  {
    "id": 1000112,
    "transaction_date": 1614334214000000,
    "product_code": "ABC102001",
    "product_units": 1,
    "store_id": 1000000123,
    "description": "Покупка без акций",
    "sys_op": 1
  }
]