Загрузка данных
Содержание раздела
Система позволяет параллельно загружать большие объемы данных, добавляя, обновляя и удаляя записи.
Загружать данные можно в логические таблицы и standalone-таблицы. Загрузка данных в логические и материализованные представления недоступна.
Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.
Действия по загрузке данных в логические и standalone-таблицы отличаются, поэтому рассмотрены по отдельности. Перед загрузкой данных нужно выполнить действия, описанные в секции Подготовка к загрузке.
Подготовка к загрузке
Создание топика (опционально)
Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, нужно создать топики вручную.
Чтобы создать топик, следуйте любой из инструкций в документации Kafka:
Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.
Подготовка данных
Чтобы подготовить данные, загрузите данные, соответствующие формату, из внешней информационной системы в топик Kafka:
- Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 0. - Чтобы удалить запись из логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 1. - Чтобы добавить* запись в standalone-таблицу, включите в сообщение Kafka запись Avro без значения
sys_op
.
* Обновление и удаление данных standalone-таблиц с помощью функции загрузки данных недоступно. Обновить записи standalone-таблицы можно запросом UPSERT VALUES, удалить — запросом DELETE.
Подготовка логических сущностей
Подготовка сущностей для загрузки в логическую таблицу
- Создайте логическую таблицу, если она отсутствует.
- Создайте внешнюю таблицу загрузки, если она отсутствует.
Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Подготовка сущностей для загрузки в standalone-таблицу
- Создайте внешнюю таблицу загрузки, если она отсутствует.
- Создайте внешнюю writable-таблицу, если она отсутствует:
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите ключевое слово
OPTIONS
со значениемauto.create.table.enable=true
, - иначе пропустите ключевое слово
OPTIONS
или укажите для него значениеauto.create.table.enable=false
.
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите ключевое слово
Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Загрузка данных в логическую таблицу
Чтобы загрузить данные из топика Kafka в логическую таблицу:
- Если в логической БД нет открытой дельты, выполните BEGIN DELTA.
- Выполните INSERT SELECT FROM upload_external_table. В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки.
- Если необходимо, загрузите или обновите другие данные.
В открытой дельте можно выполнять множество запросов на обновление и загрузку данных. При этом в каждую логическую таблицу в одной дельте можно добавлять записи с разными значениями первичного ключа или полные дубликаты. - Выполните COMMIT DELTA, чтобы сохранить изменения и закрыть дельту.
При успешном выполнении действий состояние данных в логических таблицах обновляется, как описано в разделе Версионирование данных.
Пока дельта не закрыта, внесенные изменения можно отменить запросом ROLLBACK DELTA.
Загрузка данных в standalone-таблицу
Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните INSERT SELECT FROM upload_external_table.
В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.
При загрузке данных в standalone-таблицу учитывайте ограничения таблиц в конкретной СУБД.
Примеры
Загрузка в логическую таблицу
-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;
-- создание логической таблицы
CREATE TABLE sales (
id INT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units INT NOT NULL,
store_id INT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
id INT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units INT,
store_id INT,
description VARCHAR(256)
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- открытие новой (горячей) дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты (фиксация изменений)
COMMIT DELTA;
Загрузка в standalone-таблицу
Загрузка в standalone-таблицу ADP, созданную через систему (см. параметр auto.create.table.enable=true
):
-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в ADP
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
id INT NOT NULL,
client_id INT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
id INT NOT NULL,
client_id INT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR
)
LOCATION 'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')
-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Подробнее об опции auto.create.table.enable
см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.
Загрузка в существующую standalone-таблицу ADG:
-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу ADG
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adg (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adg://dtm__marketing__payments';
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'kafka://$kafka/payments'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');
-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adg
INSERT INTO marketing.payments_ext_write_adg SELECT * FROM marketing.payments_ext_upload;