Загрузка данных
Содержание раздела
Система позволяет параллельно загружать большие объемы данных, добавляя, обновляя и удаляя записи. Загружать данные можно в логические таблицы и standalone-таблицы. Загрузка данных в логические и материализованные представления недоступна.
Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.
Данные загружаются с помощью запроса INSERT SELECT FROM upload_external_table.
Перед запросом (и в некоторых случаях после) нужно выполнить другие действия, описанные ниже. Действия по загрузке данных в логические и standalone-таблицы отличаются, поэтому рассмотрены по отдельности.
Подготовка к загрузке
Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka не настроено автоматическое создание топиков, нужно создать топики вручную. Чтобы создать топик, следуйте любой из инструкций в документации Kafka:
Рекомендации о разделении данных по топикам см. в разделе Внешняя таблица.
Перед загрузкой данных подготовьте данные и логические сущности:
- Загрузите данные, соответствующие формату, из внешней информационной системы в топик Kafka:
- Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 0. - Чтобы удалить запись из логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 1. - Чтобы добавить* запись в standalone-таблицу, включите в сообщение Kafka запись Avro без значения
sys_op
.
- Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением
- Создайте логическую таблицу или standalone-таблицу, если она еще не создана.
- Создайте внешнюю таблицу загрузки, если она еще не создана.
- Если нужно загрузить данные в standalone-таблицу, создайте внешнюю writable-таблицу.
* Обновление и удаление данных standalone-таблиц с помощью функции загрузки данных недоступно. Обновить записи standalone-таблицы можно запросом UPSERT VALUES, удалить — запросом DELETE.
Загрузка данных в логическую таблицу
Чтобы загрузить данные из внешней информационной системы в логическую таблицу:
- Выполните запрос BEGIN DELTA на открытие дельты, если она еще не открыта.
- Выполните запрос INSERT SELECT FROM upload_external_table на загрузку данных. В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки.
- Если необходимо, загрузите или обновите другие данные.
В открытой дельте можно выполнять множество запросов на обновление и загрузку данных. При этом в каждую логическую таблицу в одной дельте можно добавлять записи с разными значениями первичного ключа или полные дубликаты. - Выполните запрос COMMIT DELTA для сохранения изменений и закрытия дельты.
При успешном выполнении действий состояние данных в логических таблицах обновляется, как описано в разделе Версионирование данных.
Пока дельта не закрыта, внесенные изменения можно отменить запросом ROLLBACK DELTA.
Незавершенную операцию по загрузке данных можно перезапустить или отменить. Подробнее о способах обработки незавершенных операций см. в разделе Управление операциями записи.
Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Загрузка данных в standalone-таблицу
Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните запрос INSERT SELECT FROM upload_external_table на загрузку данных.
В запросе нужно указать внешнюю таблицу загрузки, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.
При загрузке данных в standalone-таблицу нужно учитывать ограничения таблицы в конкретной СУБД.
Примеры
Загрузка в логическую таблицу
-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;
-- создание логической таблицы
CREATE TABLE sales (
id INT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units INT NOT NULL,
store_id INT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
id INT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units INT,
store_id INT,
description VARCHAR(256)
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- открытие новой (горячей) дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты (фиксация изменений)
COMMIT DELTA;
Загрузка в standalone-таблицу
Загрузка в standalone-таблицу ADP, созданную через систему (см. параметр auto.create.table.enable=true
):
-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в ADP
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
id INT NOT NULL,
client_id INT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
id INT NOT NULL,
client_id INT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')
-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Подробнее об опции auto.create.table.enable
см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.
Загрузка в существующую standalone-таблицу ADG:
-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу ADG
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adg (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR,
bucket_id INT NOT NULL
)
LOCATION 'core:adg://dtm__marketing__payments';
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id INT NOT NULL,
agreement_id INT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'kafka://$kafka/payments'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');
-- запуск загрузки данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adg
INSERT INTO marketing.payments_ext_write_adg SELECT *, 0 as bucket_id FROM marketing.payments_ext_upload;