Потоковая загрузка данных
Содержание раздела
Система поддерживает потоковую загрузку данных в следующие логические сущности:
Потоковая загрузка данных доступна при программном подключении по протоколам HTTP/2 и HTTP/1.1.
Для вставки небольших объемов данных до сотни записей можно использовать функцию обновления данных.
Предварительный шаг для standalone-таблиц
Загрузка данных в standalone-таблицу выполняется с помощью внешней writable-таблицы. Если она отсутствует, ее необходимо создать.
Чтобы создать внешнюю writable-таблицу, выполните запрос CREATE WRITABLE EXTERNAL TABLE:
- если standalone-таблица отсутствует и ее нужно создать при создании внешней таблицы, укажите в запросе ключевое слово
OPTIONS
со значениемauto.create.table.enable=true
; - иначе выполните запрос без ключевого слова
OPTIONS
.
Как загрузить поток данных
Чтобы загрузить поток данных (см. примеры реализации HTTP-клиента для потоковой загрузки ниже):
- Откройте HTTP-соединение по протоколу HTTP/2 или HTTP/1.1.
- Вызовите HTTP-метод upload для отправки потока данных.
- Отправьте порции данных в формате Avro или CSV.
- Закройте HTTP-соединение.
Пример реализации HTTP-клиента для потоковой загрузки данных в CSV-формате
Ниже показан пример реализации HTTP-клиента для потоковой загрузки данных в CSV-формате. Код написан на Java с использованием Vertx.
// создание инстанса vertx
Vertx vertx = Vertx.vertx();
// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);
// отправка запроса на потоковую загрузку данных в логическую таблицу marketing.sales
// запрос с идентификатором 12345 (queryId=12345) добавит и (или) обновит записи таблицы (sysOp=0)
// при разрыве соединения уже загруженные данные будут сохранены (commitOnDisconnect=true)
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/entities/sales/upload?sysOp=0&queryId=12345&commitOnDisconnect=true")
.compose(request -> {
// включение разделения данных на порции
request.setChunked(true);
// добавление заголовка Content-Type, определяющего тип загружаемых данных (CSV)
request.putHeader(HttpHeaders.CONTENT_TYPE, "text/csv");
// старт потока загрузки данных
Future<Void> requestSendFuture = request.sendHead()
.compose(unused -> {
// отправка пользовательских данных в выбранном формате
// используйте request.write(Buffer) для отправки порции данных
// после отправки всех порций данных закончите запрос с помощью request.end()
// пример: загрузка всех порций данных одним вызовом
return request.end(csvData());
});
// обработка ответов от Prostore
Future<Void> responseFuture = request.response()
.compose(response -> {
// вывод кода ответа
System.out.println("Response status code: " + response.statusCode());
Promise<Void> promise = Promise.promise();
// обработка ошибок
response.exceptionHandler(promise::tryFail);
// обработка данных, полученных от Prostore
response.handler(dataFromServer -> {
System.out.println("Response data: " + dataFromServer.toString());
});
// завершение формирования и обработки ответа
response.endHandler(v -> {
promise.tryComplete();
});
return promise.future();
})
// финальная обработка ответа: вывод успешного ответа или логирование ошибки
.onComplete(ar -> {
request.reset();
if (ar.succeeded()) {
System.out.println("Response success");
}
if (ar.failed()) {
ar.cause().printStackTrace();
}
});
return Future.join(requestSendFuture, responseFuture)
.<Void>mapEmpty();
})
// завершение потоковой загрузки данных: вывод успешного ответа или логирование ошибки
.onSuccess(v -> System.out.println("Upload done"))
.onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
.toCompletableFuture()
// ожидание завершения процесса
.get();
// закрытие инстанса vertx
vertx.close();
Пример реализации HTTP-клиента для потоковой загрузки данных в Avro-формате
Ниже показан пример реализации HTTP-клиента для потоковой загрузки данных в Avro-формате. Код написан на Java с использованием Vertx.
// создание инстанса vertx
Vertx vertx = Vertx.vertx();
// создание HTTP-клиента с заданными параметрами
HttpClientOptions clientOptions = new HttpClientOptions();
// установка протокола HTTP/2
clientOptions.setProtocolVersion(HttpVersion.HTTP_2);
// отключение переключения с HTTP/1.1 на HTTP/2
clientOptions.setHttp2ClearTextUpgrade(false);
HttpClient httpClient = vertx.createHttpClient(clientOptions);
// отправка запроса на потоковую загрузку данных в логическую таблицу marketing.sales
// запрос с идентификатором 12345 (queryId=12345) добавит и (или) обновит записи таблицы (sysOp=0)
// при разрыве соединения уже загруженные данные будут сохранены (commitOnDisconnect=true)
Future<Void> requestFuture = httpClient.request(HttpMethod.POST, 9090, "localhost", "/api/v1/datamarts/marketing/entities/sales/upload?sysOp=0&queryId=12345&commitOnDisconnect=true")
.compose(request -> {
// включение разделения данных на порции
request.setChunked(true);
// добавление заголовка Content-Type, определяющего тип загружаемых данных (Avro)
request.putHeader(HttpHeaders.CONTENT_TYPE, "application/avro");
// старт потока загрузки данных
Future<Void> requestSendFuture = request.sendHead()
.compose(unused -> {
// отправка пользовательских данных в выбранном формате
// используйте request.write(Buffer) для отправки порции данных
// после отправки всех порций данных закончите запрос с помощью request.end()
// пример: загрузка всех порций данных одним вызовом
return request.end(avroData());
});
// обработка ответов от Prostore
Future<Void> responseFuture = request.response()
.compose(response -> {
// вывод кода ответа
System.out.println("Response status code: " + response.statusCode());
Promise<Void> promise = Promise.promise();
// обработка ошибок
response.exceptionHandler(promise::tryFail);
// обработка данных, полученных от Prostore
response.handler(dataFromServer -> {
System.out.println("Response data: " + dataFromServer.toString());
});
// завершение формирования и обработки ответа
response.endHandler(v -> {
promise.tryComplete();
});
return promise.future();
})
// финальная обработка ответа: вывод успешного ответа или логирование ошибки
.onComplete(ar -> {
request.reset();
if (ar.succeeded()) {
System.out.println("Response success");
}
if (ar.failed()) {
ar.cause().printStackTrace();
}
});
return Future.join(requestSendFuture, responseFuture)
.<Void>mapEmpty();
})
// завершение потоковой загрузки данных: вывод успешного ответа или логирование ошибки
.onSuccess(v -> System.out.println("Upload done"))
.onFailure(Throwable::printStackTrace);
requestFuture.toCompletionStage()
.toCompletableFuture()
// ожидание завершения процесса
.get();
// закрытие инстанса vertx
vertx.close();