Как загрузить данные
Содержание раздела
Загружать данные из топика Kafka можно в следующие логические сущности:
Перед загрузкой данных выполните действия, описанные в секции Подготовка к загрузке.
Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы такие, как смещение офсетов в топике или наличие данных в физической таблице <table>_staging, не означают успешность загрузки данных.
Подготовка к загрузке
Создание топика (опционально)
Данные загружаются в систему из сообщений топиков Kafka. Поэтому, если в брокере сообщений Kafka выключено автоматическое создание топиков, создайте топики вручную.
Чтобы создать топик, следуйте любой из инструкций в документации Kafka:
Рекомендации о разделении данных по топикам см. в разделе Рекомендации по топикам Kafka.
Подготовка данных
Чтобы подготовить данные к загрузке:
- Используя средства внешней системы, приведите данные в формат, подходящий для загрузки.
- Убедись, что каждая запись Avro содержит следующее значения
sys_op
, или добавьте его, если оно отсутствует:- 0 — если нужно добавить или обновить запись логической таблицы;
- 1 — если нужно удалить запись логической таблицы;
- пустое значение — если нужно добавить, обновить или удалить запись прокси- или standalone-таблицы. Обновление и удаление записей прокси- или standalone-таблиц таким способом доступно только для СУБД ADP.
- Загрузите подготовленные данные в топик Kafka.
Система не валидирует данные при их загрузке. Данные должны быть проверены на соответствие формату загрузки и, если нужно, скорректированы внешней системой перед их загрузкой в Prostore.
Подготовка логических сущностей
Подготовка сущностей для загрузки в логическую таблицу
- Создайте логическую таблицу, если она отсутствует.
- Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
- readable-таблицу — если данные будут загружаться в датасорсы типа ADP и инсталляция содержит коннектор Kafka Jet writer;
- таблицу загрузки — иначе.
После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.
Подготовка сущностей для загрузки в прокси-таблицу
- Создайте прокси-таблицу, если она отсутствует.
- Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
- readable-таблицу — если данные будут загружаться в датасорсы типа ADP и инсталляция содержит коннектор Kafka Jet writer;
- таблицу загрузки — иначе.
После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.
Подготовка сущностей для загрузки в standalone-таблицу
- Создайте внешнюю таблицу для загрузки данных, если она отсутствует:
- readable-таблицу — если данные будут загружаться в датасорс типа ADP и инсталляция содержит коннектор Kafka Jet writer;
- таблицу загрузки — иначе.
- Создайте внешнюю writable-таблицу, если она отсутствует:
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово
OPTIONS
со значениемauto.create.table.enable=true
, - иначе выполните запрос без ключевого слова
OPTIONS
.
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово
После завершения загрузки созданные внешние таблицы можно использовать повторно или удалить.
Загрузка данных
Чтобы загрузить данные:
- Если необходимо загрузить данные в логическую таблицу и включить операцию загрузки в дельту, а дельта еще не открыта, откройте ее.
- Выполните запрос INSERT SELECT FROM external_table:
- чтобы выполнить запрос в синхронном режиме, отправьте его по JDBC или по HTTP с помощью POST-метода query без параметра async (или с параметром
async
, имеющим значениеfalse
); - чтобы выполнить запрос в асинхронном режиме, отправьте его по HTTP с помощью POST-метода query, где в теле сообщения укажите параметр async со значением
true
.
- чтобы выполнить запрос в синхронном режиме, отправьте его по JDBC или по HTTP с помощью POST-метода query без параметра async (или с параметром
- Если данные загружались в логическую таблицу в рамках дельты и эта операция обновления должна быть последней в дельте, закройте дельту.
При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.
Примеры
Загрузка в логическую таблицу
-- создание логической таблицы
CREATE TABLE marketing.sales (
id BIGINT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units BIGINT NOT NULL,
store_id BIGINT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.sales_ext_upload (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka_brokers://10.129.0.22:9092,10.129.0.123:9092/sales_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- загрузка данных в логическую таблицу вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;
-- выбор логической базы данных marketing в качестве базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- загрузка данных в логическую таблицу в дельте
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка в прокси-таблицу
-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
id BIGINT,
agreement_id BIGINT,
type_code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';
-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;
Загрузка в standalone-таблицу
Создание standalone-таблицы с помощью системы (см. параметр auto.create.table.enable=true
) и загрузка данных в созданную таблицу:
-- создание внешней writable-таблицы с созданием связанной standalone-таблицы в датасорсе adp
CREATE WRITABLE EXTERNAL TABLE marketing.agreements_ext_write_adp (
id BIGINT NOT NULL,
client_id BIGINT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'core:adp://marketing.agreements'
OPTIONS ('auto.create.table.enable=true');
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.agreements_ext_upload (
id BIGINT NOT NULL,
client_id BIGINT NOT NULL,
number VARCHAR NOT NULL,
signature_date DATE,
effective_date DATE,
closing_date DATE,
description VARCHAR
)
LOCATION 'kafka://$kafka/agreements'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false')
-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица agreements_ext_write_adp
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Подробнее об опции auto.create.table.enable
см. в разделах CREATE WRITABLE EXTERNAL TABLE и CREATE READABLE EXTERNAL TABLE.
Загрузка в существующую standalone-таблицу:
-- создание внешней writable-таблицы, указывающей на существующую standalone-таблицу в датасорсе adqm
CREATE WRITABLE EXTERNAL TABLE marketing.payments_ext_write_adqm (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'core:adqm://dtm__marketing.payments';
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE marketing.payments_ext_upload (
id BIGINT NOT NULL,
agreement_id BIGINT,
code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR
)
LOCATION 'kafka://$kafka/payments_in'
FORMAT 'AVRO'
OPTIONS ('auto.create.sys_op.enable=false');
-- загрузка данных в standalone-таблицу, на которую указывает внешняя writable-таблица payments_ext_write_adqm
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;