INSERT SELECT FROM external_table
Содержание раздела
Поддерживается в версиях: 7.0 / 6.12 / 6.11 / 6.10 / 6.9 / 6.8 / 6.7 / 6.6 / 6.5 / 6.4 / 6.3 / 6.2 / 6.1 / 6.0 / 5.8 / 5.7 / 5.6 / 5.5 / 5.4 / 5.3 / 5.2 / 5.1 / 5.0.
Запрос загружает данные из топика Kafka в указанную таблицу: логическую таблицу, прокси-таблицу или standalone-таблицу. Путь к топику задается с помощью внешней таблицы, указываемой в запросе.
Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы такие, как смещение офсетов в топике или наличие данных в физической таблице <table>_staging, не означают успешность загрузки данных.
При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.
Перемещение данных напрямую между двумя топиками Kafka невозможно средствами системы. Если такое перемещение необходимо, вы можете загрузить данные из топика в таблицу и затем выгрузить данные из таблицы в другой топик.
Требования к загружаемым данным
Данные в топике должны иметь формат, подходящий для загрузки. Валидация и, если нужно, корректировка данных должна проводиться внешней системой до загрузки данных в топик.
Способы загрузки данных
Загрузить данные можно с помощью внешней таблицы загрузки или внешней readable-таблицы, как описано см. в разделе Работа с данными > Загрузка данных из Kafka > Как загрузить данные.
При использовании внешней readable-таблицы возможна частичная загрузка данных из топика с помощью ключевых слов WHERE, LIMIT, OFFSET.
Загрузка данных с использованием внешней readable-таблицы возможна только в СУБД ADP. Для этого способа загрузки нужен установленный коннектор Kafka Jet writer.
Особенности загрузки данных в standalone-таблицы
Синтаксис загрузки в standalone-таблицу подразумевает использование внешней writable-таблицы, которая указывает на нужную standalone-таблицу.
Консьюмер-группы и офсеты Kafka
Данные загружаются из Kafka в датасорсы с помощью коннекторов, каждый из которых использует свою консьюмер-группу.
Каждый коннектор читает данные с последнего смещения (офсета) своей группы и обновляет его по ходу загрузки. При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.
Для коннекторов одного типа используйте разные имена консьюмер-групп, чтобы избежать конкуренции за топик и нестабильной загрузки.
Перезапуск и отмена операций
Незавершенную операцию записи, сформированную системой при обработке запроса, можно перезапустить или отменить. Подробнее о способах обработки незавершенных операций см. в разделе Управление операциями записи.
Статистика
Запросы INSERT SELECT FROM external_table учитываются в категории статистики UPLOAD. Статистика доступна с помощью запроса GET_ENTITY_STATISTICS и GET-методов получения статистики.
Синтаксис
Загрузка с помощью внешней таблицы загрузки:
[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... }
FROM [db_name.]upload_ext_table_name
Загрузка с помощью внешней readable-таблицы:
[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... }
FROM [db_name.]readable_ext_table_name
[WHERE condition]
[LIMIT count] [OFFSET start]
Параметры:
db_name-
Имя логической базы данных. Опционально, если выбрана логическая БД, используемая по умолчанию.
table_name-
Имя таблицы-приемника данных. Возможные значения:
- имя логической таблицы,
- имя прокси-таблицы,
- имя внешней writable-таблицы, указывающей на нужную standalone-таблицу.
Логическая таблица может быть любого вида.
Ключевое слово SELECT { * | column_name, ... }
Задает имена всех столбцов внешней таблицы, определяющей параметры загрузки. Имена, типы и порядок указанных столбцов должны соответствовать именам, типам и порядку столбцов (полей) в таблице-приемнике данных, а также типам и порядку полей в топике Kafka, из которого загружаются данные.
Ключевое слово FROM [db_name.]{upload_ext_table_name | readable_ext_table_name}
Задает имя внешней таблицы загрузки или внешней readable-таблицы, определяющей параметры загрузки.
Сообщения Kafka, загружаемые в логические таблицы, должны содержать служебное поле sys_op с типом avro.int. Остальные поля сообщений должны соответствовать полям внешней таблицы, используемой для вычитки сообщений Kafka, по порядку и типам. В свою очередь, поля внешней таблицы должны соответствовать полям таблицы-приемника данных по порядку, именам и типам.
Подробнее о требованиях к загружаемым данным см. в разделе Формат загрузки данных.
Ключевое слово WHERE condition
Задает условия выбора данных из топика при их загрузке в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.
Значение может содержать выражения:
>,>=,<,<=,=,<>,IS NULL,IS NOT NULL,AND,OR.
Если ключевое слово не указано, из топика загружаются все непрочитанные ранее данные.
Ключевое слово LIMIT count
Задает количество строк, загружаемых из топика в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.
Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.
Поддерживаемые сочетания LIMIT и OFFSET:
LIMIT,LIMIT count OFFSET start.
Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.
Если ключевое слово не указано, в таблицу загружаются все данные топика, которые не были загружены ранее.
Ключевое слово OFFSET start
Задает количество строк, пропускаемых при загрузке данных из топика в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.
Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.
Поддерживаемые сочетания LIMIT и OFFSET:
LIMIT,LIMIT count OFFSET start.
Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.
Если ключевое слово не указано, в таблицу загружаются все данные топика, которые не были загружены ранее.
Ключевое слово RETRY
Перезапускает обработку незавершенной операции записи со статусом 0 («Выполняется») и поддерживается только для логических таблиц. Подробнее обо всех способах перезапуска и отмены операций см. в разделе Управление операциями записи.
Запрос на загрузку данных с ключевым словом RETRY пропускает сообщения в топике Kafka, прочитанные ранее, и загружает данные, начиная с последнего прочитанного офсета топика.
Запрос с RETRY должен полностью повторять содержимое исходного запроса, который создал перезапускаемую операцию записи.
Если ключевое слово не указано, система создает и обрабатывает новую операцию.
Варианты ответа
В ответе возвращается:
- объект ResultSet c одной записью при успешном выполнении запроса;
- исключение при неуспешном выполнении запроса.
Успешный ответ содержит столбцы:
sysCn:- номер выполненной операции записи — при загрузке данных в логическую таблицу;
- пустое значение — при загрузке данных в прокси-таблицу или во внешнюю writable-таблицу;
ts:- дата и время завершения операции записи в формате
YYYY-MM-DD hh:mm:ss.SSSSSS— при выполнении операции вне дельты; - пустое значение — при выполнении операции в дельте;
- дата и время завершения операции записи в формате
rowsAffected— количество затронутых (добавленных, измененных и удаленных) строк. Расчет значения поддерживается для СУБД ADB и ADP.
Запрос с ключевым словом RETRY возвращает в столбце rowsAffected количество строк, затронутых перезапущенной операцией записи, без тех строк, которые успела загрузить операция записи до перезапуска.
Ограничения
Ограничения выполнения
- Выполнение запроса к логической таблице недоступно, если она участвует в незавершенной операции по изменению схемы.
- Выполнение запроса вне дельты недоступно после дельты, закрытой с будущей меткой времени относительно серверного времени.
Ограничения сущностей
- Недоступна загрузка данных в логические и материализованные представления.
Ограничения ключевых слов
- Частичная загрузка данных с указанием
WHERE,LIMITиOFFSETдоступна только при загрузке с помощью внешних readable-таблиц. - Ключевое слово
RETRYнедоступно в запросах к прокси-таблицам и standalone-таблицам.
Ограничения партиционирования
- При загрузке данных в партиционированную таблицу все записи, для которых нет подходящей партиции, игнорируются.
- Одновременная запись данных (загрузка, вставка и удаление) в партиционированную таблицу и ее партиции недоступна.
Ограничения смещений (офсетов) Kafka
- При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.
- Для равенства данных, загружаемых в разнотипные датасорсы, необходимо загружать данные из общего топика и контролировать, что начальные смещения в топике одинаковы для всех участвующих консьюмер-групп. Это актуально для любых коннекторов, кроме Kafka Jet writer, который обычно имеет общую группу для всех датасорсов.
Другие ограничения
- Недоступна одновременная загрузка данных из одного топика в разные таблицы ADG.
- Загрузка данных с помощью readable-таблиц доступна только в ADP и только при наличии установленного коннектора Kafka Jet writer.
- При обработке запроса все неактивные датасорсы пропускаются без возврата ошибки. Ошибка возвращается, если не осталось ни одного активного датасорса, подходящего для исполнения запроса.
Примеры
Загрузка данных в логическую таблицу в дельте
Загрузка с помощью внешней таблицы загрузки marketing.sales_ext_upload:
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка с помощью внешней readable-таблицы marketing.sales_ext_upload_through_ret:
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';
-- закрытие дельты
COMMIT DELTA;
Загрузка данных в логическую таблицы вне дельты
-- загрузка данных в логическую таблицу sales вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;
Загрузка данных в прокси-таблицу
-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
id BIGINT,
agreement_id BIGINT,
type_code VARCHAR(16),
amount DOUBLE,
currency_code VARCHAR(3),
description VARCHAR,
PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';
-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;
Загрузка данных в standalone-таблицу
Загрузка в standalone-таблицу датасорса adp, на которую указывает внешняя writable-таблица agreements_ext_write_adp:
INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;
Загрузка в standalone-таблицу датасорса adqm, на которую указывает внешняя writable-таблица payments_ext_write_adqm:
INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;
Загрузка данных в партиционированную таблицу
-- выбор логической базы данных по умолчанию
USE marketing;
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_part_ext_upload (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka://localhost:2181/sales_part_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в партиционированную таблицу sales_partitioned
INSERT INTO sales_partitioned SELECT * FROM sales_part_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Загрузка данных в партицию
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в партицию sales_jan_2023
---- можно использовать ту же внешнюю таблицу загрузки, что и для партиционированной таблицы,
---- или создать отдельную внешнюю таблицу
INSERT INTO sales_feb_2023 SELECT * FROM sales_part_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Перезапуск операций по загрузке данных
-- выбор логической базы данных по умолчанию
USE marketing;
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales с помощью внешней таблицы загрузки
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней таблицы загрузки
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload;
-- запуск загрузки данных в логическую таблицу sales с помощью внешней readable-таблицы
INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';
-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней readable-таблицы
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';
-- закрытие дельты
COMMIT DELTA;