INSERT SELECT FROM external_table

Содержание раздела
  1. Требования к загружаемым данным
  2. Способы загрузки данных
  3. Особенности загрузки данных в standalone-таблицы
  4. Консьюмер-группы и офсеты Kafka
  5. Перезапуск и отмена операций
  6. Статистика
  7. Синтаксис
    1. Ключевое слово SELECT { * | column_name, ... }
    2. Ключевое слово FROM [db_name.]{upload_ext_table_name | readable_ext_table_name}
    3. Ключевое слово WHERE condition
    4. Ключевое слово LIMIT count
    5. Ключевое слово OFFSET start
    6. Ключевое слово RETRY
  8. Варианты ответа
  9. Ограничения
    1. Ограничения выполнения
    2. Ограничения сущностей
    3. Ограничения ключевых слов
    4. Ограничения партиционирования
    5. Ограничения смещений (офсетов) Kafka
    6. Другие ограничения
  10. Примеры
    1. Загрузка данных в логическую таблицу в дельте
    2. Загрузка данных в логическую таблицы вне дельты
    3. Загрузка данных в прокси-таблицу
    4. Загрузка данных в standalone-таблицу
    5. Загрузка данных в партиционированную таблицу
    6. Загрузка данных в партицию
    7. Перезапуск операций по загрузке данных

Поддерживается в версиях: 7.0 / 6.12 / 6.11 / 6.10 / 6.9 / 6.8 / 6.7 / 6.6 / 6.5 / 6.4 / 6.3 / 6.2 / 6.1 / 6.0 / 5.8 / 5.7 / 5.6 / 5.5 / 5.4 / 5.3 / 5.2 / 5.1 / 5.0.

Запрос загружает данные из топика Kafka в указанную таблицу: логическую таблицу, прокси-таблицу или standalone-таблицу. Путь к топику задается с помощью внешней таблицы, указываемой в запросе.

Об успешности загрузки данных можно судить по успешному ответу на запрос загрузки данных. Другие факторы такие, как смещение офсетов в топике или наличие данных в физической таблице <table>_staging, не означают успешность загрузки данных.

При загрузке данных в прокси-таблицы и standalone-таблицы учитывайте ограничения таблиц целевой СУБД.

Перемещение данных напрямую между двумя топиками Kafka невозможно средствами системы. Если такое перемещение необходимо, вы можете загрузить данные из топика в таблицу и затем выгрузить данные из таблицы в другой топик.

Требования к загружаемым данным

Данные в топике должны иметь формат, подходящий для загрузки. Валидация и, если нужно, корректировка данных должна проводиться внешней системой до загрузки данных в топик.

Способы загрузки данных

Загрузить данные можно с помощью внешней таблицы загрузки или внешней readable-таблицы, как описано см. в разделе Работа с данными > Загрузка данных из Kafka > Как загрузить данные.

При использовании внешней readable-таблицы возможна частичная загрузка данных из топика с помощью ключевых слов WHERE, LIMIT, OFFSET.

Загрузка данных с использованием внешней readable-таблицы возможна только в СУБД ADP. Для этого способа загрузки нужен установленный коннектор Kafka Jet writer.

Особенности загрузки данных в standalone-таблицы

Синтаксис загрузки в standalone-таблицу подразумевает использование внешней writable-таблицы, которая указывает на нужную standalone-таблицу.

Консьюмер-группы и офсеты Kafka

Данные загружаются из Kafka в датасорсы с помощью коннекторов, каждый из которых использует свою консьюмер-группу.

Каждый коннектор читает данные с последнего смещения (офсета) своей группы и обновляет его по ходу загрузки. При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.

Для коннекторов одного типа используйте разные имена консьюмер-групп, чтобы избежать конкуренции за топик и нестабильной загрузки.

Перезапуск и отмена операций

Незавершенную операцию записи, сформированную системой при обработке запроса, можно перезапустить или отменить. Подробнее о способах обработки незавершенных операций см. в разделе Управление операциями записи.

Статистика

Запросы INSERT SELECT FROM external_table учитываются в категории статистики UPLOAD. Статистика доступна с помощью запроса GET_ENTITY_STATISTICS и GET-методов получения статистики.

Синтаксис

Загрузка с помощью внешней таблицы загрузки:

[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... } 
FROM [db_name.]upload_ext_table_name

Загрузка с помощью внешней readable-таблицы:

[RETRY] INSERT INTO [db_name.]table_name SELECT { * | column_name, ... } 
FROM [db_name.]readable_ext_table_name
[WHERE condition]
[LIMIT count] [OFFSET start]

Параметры:

db_name

Имя логической базы данных. Опционально, если выбрана логическая БД, используемая по умолчанию.

table_name

Имя таблицы-приемника данных. Возможные значения:

  • имя логической таблицы,
  • имя прокси-таблицы,
  • имя внешней writable-таблицы, указывающей на нужную standalone-таблицу.

Логическая таблица может быть любого вида.

Ключевое слово SELECT { * | column_name, ... }

Задает имена всех столбцов внешней таблицы, определяющей параметры загрузки. Имена, типы и порядок указанных столбцов должны соответствовать именам, типам и порядку столбцов (полей) в таблице-приемнике данных, а также типам и порядку полей в топике Kafka, из которого загружаются данные.

Ключевое слово FROM [db_name.]{upload_ext_table_name | readable_ext_table_name}

Задает имя внешней таблицы загрузки или внешней readable-таблицы, определяющей параметры загрузки.

Сообщения Kafka, загружаемые в логические таблицы, должны содержать служебное поле sys_op с типом avro.int. Остальные поля сообщений должны соответствовать полям внешней таблицы, используемой для вычитки сообщений Kafka, по порядку и типам. В свою очередь, поля внешней таблицы должны соответствовать полям таблицы-приемника данных по порядку, именам и типам.
Подробнее о требованиях к загружаемым данным см. в разделе Формат загрузки данных.

Ключевое слово WHERE condition

Задает условия выбора данных из топика при их загрузке в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.

Значение может содержать выражения:

  • >,
  • >=,
  • <,
  • <=,
  • =,
  • <>,
  • IS NULL,
  • IS NOT NULL,
  • AND,
  • OR.

Если ключевое слово не указано, из топика загружаются все непрочитанные ранее данные.

Ключевое слово LIMIT count

Задает количество строк, загружаемых из топика в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.

Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.

Поддерживаемые сочетания LIMIT и OFFSET:

  • LIMIT,
  • LIMIT count OFFSET start.

Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.

Если ключевое слово не указано, в таблицу загружаются все данные топика, которые не были загружены ранее.

Ключевое слово OFFSET start

Задает количество строк, пропускаемых при загрузке данных из топика в таблицу. Поддерживается только в запросах на загрузку данных с помощью внешней readable-таблицы.

Значение ключевого слово не влияет на количество сообщений, считываемых из топика, — при обработке каждого запроса коннектор считывает все непрочитанные ранее сообщения из топика.

Поддерживаемые сочетания LIMIT и OFFSET:

  • LIMIT,
  • LIMIT count OFFSET start.

Строки загружаются без гарантий сохранения порядка их следования, поэтому рекомендуется использовать загрузку с LIMIT и OFFSET только в тестовых целях.

Если ключевое слово не указано, в таблицу загружаются все данные топика, которые не были загружены ранее.

Ключевое слово RETRY

Перезапускает обработку незавершенной операции записи со статусом 0 («Выполняется») и поддерживается только для логических таблиц. Подробнее обо всех способах перезапуска и отмены операций см. в разделе Управление операциями записи.

Запрос на загрузку данных с ключевым словом RETRY пропускает сообщения в топике Kafka, прочитанные ранее, и загружает данные, начиная с последнего прочитанного офсета топика.

Запрос с RETRY должен полностью повторять содержимое исходного запроса, который создал перезапускаемую операцию записи.

Если ключевое слово не указано, система создает и обрабатывает новую операцию.

Варианты ответа

В ответе возвращается:

  • объект ResultSet c одной записью при успешном выполнении запроса;
  • исключение при неуспешном выполнении запроса.

Успешный ответ содержит столбцы:

  • sysCn:
    • номер выполненной операции записи — при загрузке данных в логическую таблицу;
    • пустое значение — при загрузке данных в прокси-таблицу или во внешнюю writable-таблицу;
  • ts:
    • дата и время завершения операции записи в формате YYYY-MM-DD hh:mm:ss.SSSSSS — при выполнении операции вне дельты;
    • пустое значение — при выполнении операции в дельте;
  • rowsAffected — количество затронутых (добавленных, измененных и удаленных) строк. Расчет значения поддерживается для СУБД ADB и ADP.

Запрос с ключевым словом RETRY возвращает в столбце rowsAffected количество строк, затронутых перезапущенной операцией записи, без тех строк, которые успела загрузить операция записи до перезапуска.

Ограничения

Ограничения выполнения

  • Выполнение запроса к логической таблице недоступно, если она участвует в незавершенной операции по изменению схемы.
  • Выполнение запроса вне дельты недоступно после дельты, закрытой с будущей меткой времени относительно серверного времени.

Ограничения сущностей

  • Недоступна загрузка данных в логические и материализованные представления.

Ограничения ключевых слов

  • Частичная загрузка данных с указанием WHERE, LIMIT и OFFSET доступна только при загрузке с помощью внешних readable-таблиц.
  • Ключевое слово RETRY недоступно в запросах к прокси-таблицам и standalone-таблицам.

Ограничения партиционирования

  • При загрузке данных в партиционированную таблицу все записи, для которых нет подходящей партиции, игнорируются.
  • Одновременная запись данных (загрузка, вставка и удаление) в партиционированную таблицу и ее партиции недоступна.

Ограничения смещений (офсетов) Kafka

  • При сбое и откате операций записи смещение не откатывается — оно остается на значении, прочитанном к моменту сбоя.
  • Для равенства данных, загружаемых в разнотипные датасорсы, необходимо загружать данные из общего топика и контролировать, что начальные смещения в топике одинаковы для всех участвующих консьюмер-групп. Это актуально для любых коннекторов, кроме Kafka Jet writer, который обычно имеет общую группу для всех датасорсов.

Другие ограничения

  • Недоступна одновременная загрузка данных из одного топика в разные таблицы ADG.
  • Загрузка данных с помощью readable-таблиц доступна только в ADP и только при наличии установленного коннектора Kafka Jet writer.
  • При обработке запроса все неактивные датасорсы пропускаются без возврата ошибки. Ошибка возвращается, если не осталось ни одного активного датасорса, подходящего для исполнения запроса.

Примеры

Загрузка данных в логическую таблицу в дельте

Загрузка с помощью внешней таблицы загрузки marketing.sales_ext_upload:

-- выбор логической базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка с помощью внешней readable-таблицы marketing.sales_ext_upload_through_ret:

-- выбор логической базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';

-- закрытие дельты
COMMIT DELTA;

Загрузка данных в логическую таблицы вне дельты

-- загрузка данных в логическую таблицу sales вне дельты
INSERT INTO marketing.sales SELECT * FROM marketing.sales_ext_upload;

Загрузка данных в прокси-таблицу

-- создание внешней readable-таблицы для загрузки данных
CREATE READABLE EXTERNAL TABLE marketing.payments_ext_upload_to_proxy (
  id BIGINT,
  agreement_id BIGINT,
  type_code VARCHAR(16),
  amount DOUBLE,
  currency_code VARCHAR(3),
  description VARCHAR,
  PRIMARY KEY(id)
)
DISTRIBUTED BY (id)
LOCATION 'kafka://$kafka/payments_adp_in'
FORMAT 'AVRO';

-- загрузка данных в прокси-таблицу payments_proxy
INSERT INTO marketing.payments_proxy SELECT * FROM marketing.payments_ext_upload_to_proxy;

Загрузка данных в standalone-таблицу

Загрузка в standalone-таблицу датасорса adp, на которую указывает внешняя writable-таблица agreements_ext_write_adp:

INSERT INTO marketing.agreements_ext_write_adp SELECT * FROM marketing.agreements_ext_upload;

Загрузка в standalone-таблицу датасорса adqm, на которую указывает внешняя writable-таблица payments_ext_write_adqm:

INSERT INTO marketing.payments_ext_write_adqm SELECT * FROM marketing.payments_ext_upload;

Загрузка данных в партиционированную таблицу

-- выбор логической базы данных по умолчанию
USE marketing;

-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_part_ext_upload (
  id BIGINT,
  transaction_date TIMESTAMP,
  product_code VARCHAR(256),
  product_units BIGINT,
  store_id BIGINT,
  description VARCHAR(256)
)
LOCATION  'kafka://localhost:2181/sales_part_in'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;

-- открытие новой дельты
BEGIN DELTA;

-- запуск загрузки данных в партиционированную таблицу sales_partitioned
INSERT INTO sales_partitioned SELECT * FROM sales_part_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Загрузка данных в партицию

-- выбор логической базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- запуск загрузки данных в партицию sales_jan_2023
---- можно использовать ту же внешнюю таблицу загрузки, что и для партиционированной таблицы, 
---- или создать отдельную внешнюю таблицу
INSERT INTO sales_feb_2023 SELECT * FROM sales_part_ext_upload;

-- закрытие дельты
COMMIT DELTA;

Перезапуск операций по загрузке данных

-- выбор логической базы данных по умолчанию
USE marketing;

-- открытие новой дельты
BEGIN DELTA;

-- запуск загрузки данных в логическую таблицу sales с помощью внешней таблицы загрузки
INSERT INTO sales SELECT * FROM sales_ext_upload;

-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней таблицы загрузки
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload;

-- запуск загрузки данных в логическую таблицу sales с помощью внешней readable-таблицы
INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';

-- перезапуск обработки операции по загрузке данных в логическую таблицу sales с помощью внешней readable-таблицы
RETRY INSERT INTO sales SELECT * FROM sales_ext_upload_through_ret
WHERE (product_code = 'ABC1111' OR product_code = 'ABC1212') AND transaction_date >= '2023-01-01 00:00:00';

-- закрытие дельты
COMMIT DELTA;