Загрузка данных
Система позволяет параллельно загружать большие объемы данных. Данные можно загружать только в логические таблицы. Загрузка данных в логические и материализованные представления недоступна.
Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.
Данные загружаются в виде сообщений Kafka, поэтому для их загрузки нужен топик Kafka. Если в брокере сообщений Kafka настроено автоматическое создание топиков, то дополнительные действия не требуются. Иначе топик необходимо создать, если он отсутствует. Подробнее о создании топиков см. в документации Kafka:
- раздел Quick Start,
- раздел Adding and removing topics.
Чтобы загрузить данные из внешней информационной системы в логическую таблицу:
- Загрузите данные из внешней информационной системы в топик Kafka.
Данные должны иметь формат, описанный в разделе Формат загрузки данных. - Создайте логическую таблицу, если она еще не создана.
- Создайте внешнюю таблицу загрузки, если она еще не создана.
- Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
- Выполните запрос INSERT INTO logical_table на загрузку данных из топика в логическую таблицу. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
- Если необходимо, загрузите и (или) обновите другие данные.
В открытой дельте можно выполнить произвольное количество запросов на обновление и загрузку данных, а также отменить изменения. При этом не допускается добавление информации о различных состояниях одного объекта (то есть различных записей с одинаковым первичным ключом) в одной дельте. - Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.
При успешном выполнении действий состояние данных системы обновляется, как описано в разделе Версионирование данных.
Пока дельта не закрыта, изменения данных этой дельты можно отменить с помощью запроса ROLLBACK DELTA. Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Примеры
-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;
-- создание логической таблицы sales
CREATE TABLE sales (
id INT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units INT NOT NULL,
store_id INT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
id INT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units INT,
store_id INT,
description VARCHAR(256)
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- открытие новой (горячей) дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты (фиксация изменений)
COMMIT DELTA;