Загрузка данных
Содержание раздела
Система позволяет параллельно загружать большие объемы данных, добавляя, обновляя и удаляя записи.
Загружать данные можно в логические таблицы и standalone-таблицы. Загрузка данных в логические и материализованные представления недоступна.
Под большим объемом данных подразумевается количество записей от нескольких сотен до нескольких миллионов. Для загрузки небольшого объема данных можно использовать функцию обновления данных.
Действия по загрузке данных в логические и standalone-таблицы отличаются, поэтому рассмотрены по отдельности. Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.
Подготовка к загрузке
Создание топика (опционально)
Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.
Чтобы создать топик, следуйте любой из инструкций в документации Kafka:
Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.
Подготовка данных
Чтобы подготовить данные, загрузите данные, соответствующие формату, из внешней информационной системы в топик Kafka:
- Чтобы добавить или обновить запись логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 0. - Чтобы удалить запись из логической таблицы, включите в сообщение Kafka запись Avro со значением
sys_op
, равным 1. - Чтобы добавить* запись в standalone-таблицу, включите в сообщение Kafka запись Avro без значения
sys_op
.
* Обновление и удаление данных standalone-таблиц с помощью функции загрузки данных недоступно. Обновить записи standalone-таблицы можно запросом UPSERT VALUES, удалить — запросом DELETE.
Подготовка логических сущностей
Подготовка сущностей для загрузки в логическую таблицу
- Создайте логическую таблицу, если она отсутствует.
- Создайте внешнюю таблицу загрузки, если она отсутствует.
Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Подготовка сущностей для загрузки в standalone-таблицу
- Создайте внешнюю таблицу загрузки, если она отсутствует.
- Создайте внешнюю writable-таблицу, если она отсутствует:
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите ключевое слово
OPTIONS
со значениемauto.create.table.enable=true
, - иначе пропустите ключевое слово
OPTIONS
или укажите для него значениеauto.create.table.enable=false
.
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите ключевое слово
Созданные внешние таблицы загрузки можно использовать повторно или удалить.
Загрузка данных в логическую таблицу
Чтобы загрузить данные из топика Kafka в логическую таблицу, выполните INSERT SELECT FROM upload_external_table. В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки.
Загрузку данных можно включить в дельту. Чтобы загрузить данные в дельте:
- Откройте дельту.
- Загрузите нужные данные.
- Закройте дельту.
Чтобы данные, записанные вне дельт, были доступны во всех видах запросов на чтение и выгрузку данных, их должна замыкать дельта. При этом дельта может быть пустой. Подробнее см. в разделе Операции в дельте и вне дельты.
Загрузка данных в standalone-таблицу
Чтобы загрузить данные из внешней информационной системы в standalone-таблицу, выполните INSERT SELECT FROM upload_external_table.
В запросе укажите внешнюю таблицу загрузки, определяющую параметры загрузки, а также внешнюю writable-таблицу, ссылающуюся на целевую standalone-таблицу.
При загрузке данных в standalone-таблицу учитывайте ограничения таблиц в конкретной СУБД.
Примеры
Загрузка в логическую таблицу
-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;
-- создание логической таблицы
CREATE TABLE sales (
id BIGINT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units BIGINT NOT NULL,
store_id BIGINT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka://zk1:2181,zk2:2181,zk3:2181/sales'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- загрузка данных в логическую таблицу вне дельты
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- открытие новой (горячей) дельты
BEGIN DELTA;
-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка в standalone-таблицу
Создание standalone-таблицы через систему (см. параметр auto.create.table.enable=true
) и загрузка данных в нее:
-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
id BIGINT NOT NULL,
client_id BIGINT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
id BIGINT NOT NULL,
client_id BIGINT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR
)
LOCATION 'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')
-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Подробнее об опции auto.create.table.enable
см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.
Загрузка в существующую standalone-таблицу:
-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adg
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adg (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adg://dtm__marketing__payments';
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'kafka://$kafka/payments'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');
-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adg
INSERT INTO marketing.payments_ext_write_adg SELECT * FROM marketing.payments_ext_upload;