Минимальное развертывание с Kafka
Содержание раздела
- Предустановленные программные средства
- Сборка Prostore
- Настройка СУБД Postgres
- Сборка и установка коннектора Kafka-Postgres
- Запуск сервисов Apache ZooKeeper и Apache Kafka
- Запуск коннектора Kafka-Postgres
- Запуск Prostore
- Подключение к Prostore с помощью SQL-клиента
- Демонстрационный сценарий
- Создание необходимых логических сущностей
- Создание топика Kafka для последующей загрузки данных
- Создание бинарного avro-файла kafka_upload_sales.avro из avro-схемы и данных
- Загрузка avro-файла kafka_upload_sales.avro
- Загрузка данных
- Вставка данных
- Выборка данных
- Выгрузка в топик Kafka
- Удаление логических сущностей
В этом разделе описаны шаги по развертыванию среды в конфигурации, предполагающей единственный датасорс типа ADP. Дополнительная информация приведена в разделе Схемы развертывания.
Предустановленные программные средства
- OC «Альт 8 СП» (8.4);
- yum-utils;
- curl;
- git;
- wget;
- OpenJDK 8;
- Apache Maven 3.6.3;
- СУБД PostgreSQL 13;
- Apache ZooKeeper;
- Apache Kafka (например, в каталоге /opt/kafka);
- SQL-клиент, например DBeaver;
- docker;
- браузер топиков Kafka с возможностью загрузки бинарных данных, например kafkacat.
Сборка Prostore
# клонирование репозитория Prostore
git clone https://repository.datamart.ru/datamarts/prostore ~/prostore
# запуск сборки Prostore средствами Apache Maven
cd ~/prostore
mvn clean install -DskipTests=true
# создание символической ссылки на файл конфигурации
sudo ln -s ~/prostore/dtm-query-execution-core/config/application.yml ~/prostore/dtm-query-execution-core/target/application.yml
# приведение конфигурационного файла к виду, показанному ниже
sudo nano ~/prostore/dtm-query-execution-core/config/application.yml
конфигурационный файл Prostore `application.yml`
# настройки журналирования
logging:
# уровень важности сообщений, записываемых в лог-файл
level:
ru.datamart.prostore: ${DTM_LOGGING_LEVEL:DEBUG}
# настройки сервера Prostore
server:
# номер порта сервиса метрик
port: ${DTM_METRICS_PORT:8080}
# настройки swagger-ui
springdoc:
api-docs:
enabled: false
swagger-ui:
# путь до openapi.yml
url: /openapi.yml
# путь до swagger-ui
path: /swagger-ui
# настройки управления Prostore
management:
# настройки конечных точек Prostore
endpoints:
# признак генерации метрик со стороны Prostore
enabled-by-default: ${DTM_METRICS_ENABLED:true}
# настройки видимости метрик через веб-соединения
web:
exposure:
# состав метрик, видимых через веб-соединения
include: ${DTM_METRICS_SCOPE:info, health}
# настройки сервиса исполнения запросов
core:
# настройки плагинов
plugins:
# список используемых типов датасорсов
active: ${CORE_PLUGINS_ACTIVE:ADP}
# настройки сетевых подключений через HTTP-протокол
http:
# номер порта сервиса исполнения запросов
port: ${DTM_CORE_HTTP_PORT:9090}
# режим оптимизации работы сокета TCP_NODELAY
tcpNoDelay: ${DTM_CORE_HTTP_TCP_NO_DELAY:true}
# режим TCP FAST_OPEN
tcpFastOpen: ${DTM_CORE_HTTP_TCP_FAST_OPEN:true}
# режим оптимизации работы сокета TCP_QUICKACK
tcpQuickAck: ${DTM_CORE_HTTP_TCP_QUICK_ACK:true}
# настройки HTTP-клиентов
webclient:
# режим оптимизации работы сокета TCP_NODELAY
tcpNoDelay: ${DTM_CORE_WEBCLIENT_TCP_NO_DELAY:true}
# режим TCP FAST_OPEN
tcpFastOpen: ${DTM_CORE_WEBCLIENT_TCP_FAST_OPEN:true}
# режим оптимизации работы сокета TCP_QUICKACK
tcpQuickAck: ${DTM_CORE_WEBCLIENT_TCP_QUICK_ACK:true}
# время ожидания подключения в миллисекундах
connectionTimeoutMs: ${DTM_CORE_WEBCLIENT_CONNECTION_TIMEOUT_MS:30000}
# размер пула коннектов для HTTP/1.1
poolSize: ${DTM_CORE_WEBCLIENT_POOL_SIZE:20}
# размер пула коннектов для HTTP/2
http2PoolSize: ${DTM_CORE_WEBCLIENT_HTTP2_POOL_SIZE:20}
# keepalive для HTTP/2 в секундах (0 — значение не ограничено)
http2KeepAliveSec: ${DTM_CORE_WEBCLIENT_HTTP2_KEEP_ALIVE_SEC:0}
# настройки окружения
env:
# имя окружения для формирования полных имен логических БД
name: ${DTM_NAME:test}
# настройки метрик prometheus
prometheus:
# признак сбора метрик prometheus
enabled: ${PROMETHEUS_ENABLED:true}
# настройки аутентификации
auth:
# путь к API-методу авторизационного сервиса, возвращающему информацию о публичных ключах (JWKS или JSON Web Key Set) для проверки авторизационных токенов
jwksUri: ${AUTH_JWKS_URI:}
# список ролей, назначаемых по умолчанию
defaultRoles: ${CORE_DEFAULT_ROLES:env_owner}
# настройки восстановления состояния при запуске и перезапуске ноды Prostore
restoration:
# признак восстановления состояния при запуске и перезапуске ноды Prostore
autoRestoreState: ${AUTO_RESTORE_STATE:true}
# настройки синхронизации материализованных представлений
matviewsync:
# периодичность запуска синхронизации в миллисекундах; если значение равно 0, синхронизация отключена
periodMs: ${MATERIALIZED_VIEWS_SYNC_PERIOD_MS:5000}
# максимальное количество попыток синхронизации представления, после перезапуска ноды счетчик обнуляется
retryCount: ${MATERIALIZED_VIEWS_RETRY_COUNT:10}
# максимальное количество представлений, синхронизируемых одновременно
maxConcurrent: ${MATERIALIZED_VIEWS_CONCURRENT:2}
# настройки обработки retention-правил
datacooling:
# периодичность запуска обработки retention-правил в миллисекундах; если значение равно 0, обработка отключена
periodMs: ${DATA_COOLING_RUN_PERIOD_MS:600000}
# максимальное количество таблиц, обрабатываемых одновременно
maxConcurrent: ${DATA_COOLING_CONCURRENT:2}
# периодичность проверки запросов, запущенных на ноде кластера, для удаления данных согласно retention-правилам (в миллисекундах)
checkQueriesPeriodMs: ${DATA_COOLING_CHECK_QUERIES_PERIOD_MS:30000}
# периодичность проверки доступности нод кластера для удаления данных согласно retention-правилам (в миллисекундах)
checkTrimPeriodMs: ${DATA_COOLING_CHECK_TRIM_PERIOD_MS:30000}
# настройки очереди операций по изменению логической схемы данных
ddlqueue:
# признак использования очереди операций; до версии 5.6.1 очередь не использовалась
enabled: ${CORE_DDL_QUEUE_ENABLED:true}
# настройки tslog
tslog:
# количество уровней вложенности элементов, отсчитываемое от корневого элемента tslog;
# значение можно изменять только до тех пор, пока в логических БД окружения нет операций записи и дельт
maxDepth: ${TSLOG_MAX_DEPTH:2}
# максимальное количество записей в одном элементе tslog
pageSize: ${TSLOG_PAGE_SIZE:1600}
# настройки источника данных
datasource:
# настройки для EDML-операторов
edml:
# количество записей, по умолчанию выгружаемых в одном сообщении топика Каfka
defaultChunkSize: ${EDML_DEFAULT_CHUNK_SIZE:1000}
# период проверки статуса плагина в миллисекундах
pluginStatusCheckPeriodMs: ${EDML_STATUS_CHECK_PERIOD_MS:1000}
# время ожидания (в миллисекундах) до тайм-аута при работе с первым смещением в топике Kafka
firstOffsetTimeoutMs: ${EDML_FIRST_OFFSET_TIMEOUT_MS:15000}
# время ожидания (в миллисекундах) до тайм-аута при ожидании смены смещения в топике Kafka
changeOffsetTimeoutMs: ${EDML_CHANGE_OFFSET_TIMEOUT_MS:10000}
# настройки ZooKeeper
zookeeper:
# сетевой адрес хоста ZooKeeper для сервисной БД
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
# время ожидания (в миллисекундах) сервисной БД до тайм-аута при соединении с хостом ZooKeeper
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
# время бездействия (в миллисекундах) сервисной БД в сессии хоста ZooKeeper до тайм-аута
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
# корневой путь к хосту ZooKeeper для сервисной БД
chroot: ${ZOOKEEPER_DS_CHROOT:/adtm}
# количество попыток восстановления подключения к ZooKeeper во время выполнения операции (0 — количество не ограничено)
connectionRetryCount: ${ZOOKEEPER_DS_CONNECTION_RETRY_COUNT:0}
# количество попыток выполнения операции в случае срабатывания блокировки optimistic (0 — количество не ограничено)
optimisticRetryCount: ${ZOOKEEPER_DS_OPTIMISTIC_RETRY_COUNT:20}
# количество попыток выполнения операции записи в случае срабатывания блокировки optimistic (0 — количество не ограничено)
writeOptimisticRetryCount: ${ZOOKEEPER_DS_WRITE_OPTIMISTIC_RETRY_COUNT:1000}
# максимальное количество операций, которое можно выполнить до вызова COMMIT DELTA без ключевого слова IMMEDIATE;
# значение должно быть меньше ограничения ZooKeeper, равного 2147483647
maxSequenceSize: ${ZOOKEEPER_DS_MAX_SEQUENCE_SIZE:2000000000}
# настройки взаимодействия сервиса исполнения запросов с брокером сообщений Kafka
kafka:
producer:
property:
# сериализатор строковых ключей
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# сериализатор строковых значений
value.serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
property:
# десериализатор ключей как байтов
key.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# десериализатор значений как байтов
value.deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# настройки кластера ZooKeeper для взаимодействия с брокером сообщений Kafka
cluster:
zookeeper:
# сетевой адрес хоста ZooKeeper для брокера сообщений Kafka
connection-string: ${ZOOKEEPER_KAFKA_ADDRESS:localhost}
# время ожидания (в миллисекундах) до тайм-аута при соединении брокера Kafka с хостом ZooKeeper
connection-timeout-ms: ${ZOOKEEPER_KAFKA_CONNECTION_TIMEOUT_MS:30000}
# время бездействия (в миллисекундах) брокера Kafka в сессии хоста ZooKeeper до тайм-аута
session-timeout-ms: ${ZOOKEEPER_KAFKA_SESSION_TIMEOUT_MS:86400000}
# корневой путь к хосту ZooKeeper для брокера сообщений Kafka
chroot: ${ZOOKEEPER_KAFKA_CHROOT:}
# максимальное количество повторных попыток соединения с брокером сообщений Kafka
connectionRetryCount: ${ZOOKEEPER_KAFKA_CONNECTION_RETRY_COUNT:1}
# настройки администратора Kafka
admin:
# время ожидания (в миллисекундах) входного потока данных брокером сообщений Kafka до тайм-аута
inputStreamTimeoutMs: ${KAFKA_INPUT_STREAM_TIMEOUT_MS:2000}
# настройки статусов публикации событий брокером сообщений Kafka
status.event.publish:
# имя топика Kafka, в который публикуются события
topic: ${KAFKA_STATUS_EVENT_TOPIC:status.event.topic}
# разрешение на публикацию событий
enabled: ${KAFKA_STATUS_EVENT_ENABLED:false}
# дополнительное разрешение на публикацию событий по операциям записи
writeOperationsEnabled: ${KAFKA_STATUS_EVENT_WRITE_OPERATIONS_ENABLED:false}
# дополнительное разрешение на публикацию событий по созданию и удалению внешних таблиц
externalDDLEnabled: ${KAFKA_STATUS_EVENT_EXTERNAL_DDL_ENABLED:false}
# настройки при использовании фреймворка vertx
vertx:
# время в (секундах), после которого заблокированный поток пишет stacktrace
blocking-stacktrace-time: ${DTM_VERTX_BLOCKING_STACKTRACE_TIME:1}
pool:
# максимальный размер пула потоков, выполняющих долгие операции
worker-pool: ${DTM_CORE_WORKER_POOL_SIZE:20}
# максимальный размер пула потоков, обрабатывающих события vertx
event-loop-pool: ${DTM_CORE_EVENT_LOOP_POOL_SIZE:20}
# максимальный размер пула задач в сервисе исполнения запросов
task-pool: ${DTM_CORE_TASK_POOL_SIZE:20}
# время (в миллисекундах) завершения задачи, выполняемой в сервисе исполнения запросов
task-timeout: ${DTM_CORE_TASK_TIMEOUT:86400000}
# максимальный размер пула потоков, обрабатывающих события ZooKeeper по дельтам
delta-watcher-pool: ${DTM_DELTA_WATCHER_POOL_SIZE:10}
# настройки кэширования запросов
cache:
# начальная емкость кэша
initialCapacity: ${CACHE_INITIAL_CAPACITY:100000}
# максимальный размер кэша
maximumSize: ${CACHE_MAXIMUM_SIZE:100000}
# время (в минутах) устаревания кэша после последнего обращения к нему
expireAfterAccessMinutes: ${CACHE_EXPIRE_AFTER_ACCESS_MINUTES:99960}
# настройки отката дельт
delta:
# периодичность (в миллисекундах) проверки операций записи, требующих остановки
rollback-status-calls-ms: ${DELTA_ROLLBACK_STATUS_CALLS_MS:2000}
# количество попыток отката дельты (0 — количество не ограничено, 1 — без повторных попыток)
rollbackRetryCount: ${DELTA_ROLLBACK_RETRY_COUNT:3}
# время ожидания между попытками отката дельты в миллисекундах
rollbackRetryTimeoutMs: ${DELTA_ROLLBACK_RETRY_TIMEOUT_MS:1000}
# настройки отката операций записи
writeOperation:
# количество попыток отката операции записи (0 — количество не ограничено, 1 — без повторных попыток)
rollbackRetryCount: ${WRITE_OPERATION_ROLLBACK_RETRY_COUNT:3}
# время ожидания между попытками отката операции записи в миллисекундах
rollbackRetryTimeoutMs: ${WRITE_OPERATION_ROLLBACK_RETRY_TIMEOUT_MS:1000}
# настройки сбора статистики по сущностям
statistics:
# признак сбора статистики
enabled: ${CORE_STATISTICS_ENABLED:true}
# количество потоков, обрабатывающих сбор статистики
threadsCount: ${CORE_STATISTICS_THREADS_COUNT:2}
# признак подсчета строк сущностей в статистике
dataCountEnabled: ${CORE_STATISTICS_DATA_COUNT_ENABLED:true}
# минимальный интервал (в миллисекундах) между запуском сессий подсчета строк в логических сущностях
dataCountPeriodMs: ${CORE_STATISTICS_DATA_COUNT_PERIOD_MS:0}
# настройки ADP
adp:
# настройки датасорсов типа ADP
datasource:
# отметка элемента в массиве датасорсов и имя датасорса; каждый элемент массива содержит настройки одного датасорса типа ADP
- name: ADP
# окружение (указывает базу данных в датасорсе)
env: ${ADP_ENV_NAME:${DTM_NAME:test}}
# имя пользователя/логин для авторизации в датасорсе
user: ${ADP_USERNAME:dtm}
# пароль для авторизации в датасорсе
password: ${ADP_PASS:}
# сетевой адрес хоста датасорса
host: ${ADP_HOST:localhost}
# сетевой адрес порта на хосте датасорса
port: ${ADP_PORT:5432}
# лимит подключений к датасорсу в одном потоке; лимит по всем потокам равен произведению poolSize и executorsCount
poolSize: ${ADP_MAX_POOL_SIZE:3}
# количество одновременных потоков, исполняющих запросы к датасорсу
executorsCount: ${ADP_EXECUTORS_COUNT:3}
# максимальное время ожидания выполнения запроса в очереди подключений (0 — не ограничено)
poolRequestTimeout: ${ADP_POOL_REQUEST_TIMEOUT:0}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${ADP_PREPARED_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${ADP_PREPARED_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${ADP_PREPARED_CACHE:true}
# настройки механизма загрузки данных (MPPW) в датасорс
mppw:
# список сетевых адресов и путь к REST-интерфейсу для загрузки данных в формате: http://host_1:port_1,host_2:port_2,.../newdata/start
restStartLoadUrl: ${ADP_REST_START_LOAD_URL:http://localhost:8096/newdata/start}
# список сетевых адресов и путь к REST-интерфейсу для остановки загрузки данных в формате: http://host_1:port_1,host_2:port_2,.../newdata/stop
restStopLoadUrl: ${ADP_REST_STOP_LOAD_URL:http://localhost:8096/newdata/stop}
# список сетевых адресов и путь для получения информации о версии коннектора в формате: http://host_1:port_1,host_2:port_2,.../versions
restVersionUrl: ${ADP_MPPW_CONNECTOR_VERSION_URL:http://localhost:8096/versions}
# путь метода get-коннектора
restGetEndpoint: ${ADP_MPPW_GET_ENDPOINT:/newdata/get}
# уникальное имя консьюмер-группы для загрузки данных в датасорс; для разных датасорсов одного типа должны быть указаны разные имена
kafkaConsumerGroup: ${ADP_KAFKA_CONSUMER_GROUP:adp-load}
# время ожидания (в миллисекундах) get-метода коннектора до тайм-аута
restGetTimeoutMs: ${ADP_REST_GET_TIMEOUT_MS:5000}
# время ожидания (в миллисекундах) до тайм-аута при проверке доступности коннектора
checkTimeoutMs: ${ADP_MPPW_CONNECTOR_CHECK_TIMEOUT_MS:1000}
# количество попыток установки соединения при обращении к коннектору (0 — количество не ограничено, 1 — без повторных попыток)
retryCount: ${ADP_MPPW_CONNECTOR_RETRY_COUNT:3}
# настройки механизма выгрузки данных (MPPR) из датасорса
mppr:
# список сетевых адресов и путь для запросов на выгрузку данных в формате: http://host_1:port_1,host_2:port_2,.../query
restLoadUrl: ${ADP_MPPR_QUERY_URL:http://localhost:8094/query}
# список сетевых адресов и путь для получения информации о версии коннектора в формате: http://host_1:port_1,host_2:port_2,.../versions
restVersionUrl: ${ADP_MPPR_CONNECTOR_VERSION_URL:http://localhost:8094/versions}
# время ожидания (в миллисекундах) до тайм-аута при проверке доступности коннектора
checkTimeoutMs: ${ADP_MPPR_CONNECTOR_CHECK_TIMEOUT_MS:1000}
# количество попыток установки соединения при обращении к коннектору (0 — количество не ограничено, 1 — без повторных попыток)
retryCount: ${ADP_MPPR_CONNECTOR_RETRY_COUNT:3}
Далее конфигурационный файл application.yml
обозначается термином «конфигурация Prostore».
Настройка СУБД Postgres
# создание в СУБД Postgres SUPERUSER-пользователя c именем и паролем,
# указанными в конфигурации Prostore
# (значения параметров (adp:datasource:user) и (adp:datasource:password) соответственно)
cd /
sudo -u postgres psql -c 'CREATE ROLE dtm WITH LOGIN SUPERUSER'
sudo -u postgres psql -c "ALTER ROLE dtm WITH PASSWORD 'dtm'"
# создание базы данных с именем test, указанным в конфигурации Prostore (env: name)
sudo -u postgres psql -c 'CREATE DATABASE test'
# перезапуск сервиса Postgresql
sudo systemctl reload postgresql-13
Сборка и установка коннектора Kafka-Postgres
# клонирование репозитория kafka-postgres-connector
git clone https://repository.datamart.ru/datamarts/kafka-postgres-connector ~/kafka-postgres-connector
# запуск сборки коннектора kafka-postgres средствами Apache Maven
cd ~/kafka-postgres-connector
mvn clean install -DskipTests=true
# приведение конфигурационных файлов kafka-postgres-writer и kafka-postgres-reader к виду,
# показанному ниже, чтобы значения параметров совпадали со значениями соответствующих параметров конфигурации Prostore
# datasource: postgres: database ~ env: name,
# datasource: postgres: user ~ adp: datasource: user,
# datasource: postgres: password ~ adp: datasource: password,
# datasource: postgres: hosts ~ adp: datasource: host, adp: datasource: port
sudo nano ~/kafka-postgres-connector/kafka-postgres-writer/src/main/resources/application.yml
sudo nano ~/kafka-postgres-connector/kafka-postgres-reader/src/main/resources/application.yml
# создание символических ссылок на файлы конфигурации
sudo ln -s ~/kafka-postgres-connector/kafka-postrges-writer/src/main/resources/application.yml ~/kafka-postgres-connector/kafka-postrges-writer/target/application.yml
sudo ln -s ~/kafka-postgres-connector/kafka-postrges-reader/src/main/resources/application.yml ~/kafka-postgres-connector/kafka-postrges-reader/target/application.yml
Конфигурационный файл kafka-postgres-writer `application.yml`
# настройки журналирования
logging:
# уровни журналирования событий
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8096}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров вертиклов, обрабатывающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
insert:
# максимальный размер пула потоков, вставляющих данные
poolSize: ${INSERT_WORKER_POOL_SIZE:32}
# периодичность вставки новых данных (в миллисекундах)
insertPeriodMs: ${INSERT_PERIOD_MS:1000}
# размер пакета операций при вставке данных
batchSize: ${INSERT_BATCH_SIZE:500}
consumer:
# максимальный размер пула потоков, считывающих данные
poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADP
maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
commit:
# размер пула потоков, записывающих смещение в топиках Kafka
poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
# периодичность записи смещения в топиках Kafka (в миллисекундах)
commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}
# настройки для работы с брокером сообщений Kafka
client:
kafka:
# настройки консьюмера (потребителя) Kafka
consumer:
# периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
# время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
# количество консьюмеров Kafka
consumerSize: ${KAFKA_CONSUMER_SIZE:10}
# время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
# свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
property:
# режим обнуления смещения в топиках Kafka
auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
# признак автоматической записи смещения в топиках Kafka
enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
# периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}
# настройки для работы с ADP
datasource:
postgres:
# имя базы данных ADP
database: ${POSTGRES_DB_NAME:test}
# имя пользователя/логин для авторизации в ADP
user: ${POSTGRES_USERNAME:dtm}
# пароль для авторизации в ADP
password: ${POSTGRES_PASS:dtm}
# сетевой адрес хоста с ADP и номер порта на хосте
hosts: ${POSTGRES_HOSTS:postgres.host:5432}
# максимальный размер пула потоков
poolSize: ${POSTGRES_POOLSIZE:10}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${POSTGRES_CACHE:true}
# количество попыток установки соединения (0 — количество не ограничено, 1 — без повторных попыток)
connectRetryCount: ${POSTGRES_CONNECT_RETRY_COUNT:3}
# время ожидания между попытками установки соединения в миллисекундах
connectRetryTimeoutMs: ${POSTGRES_CONNECT_RETRY_TIMEOUT_MS:1000}
Конфигурационный файл kafka-postgres-reader `application.yml`
# настройки журналирования
logging:
# уровни журналирования событий
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8094}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров, принимающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
producer:
# количество экземпляров, записывающих данные
instances: ${PRODUCER_VERTICLE_INSTANCES:6}
# максимальный размер пула потоков, записывающих данные
poolSize: ${PRODUCER_POOL_SIZE:12}
executor:
# максимальный размер пула потоков, обрабатывающих запросы в базу данных
poolSize: ${EXECUTOR_POOL_SIZE:12}
reader:
# максимальный размер буфера вычитанных строк из базы данных
uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}
# настройки для работы с ADP
datasource:
postgres:
# имя базы данных ADP
database: ${POSTGRES_DB_NAME:test}
# имя пользователя/логин для авторизации в ADP
user: ${POSTGRES_USERNAME:dtm}
# пароль для авторизации в ADP
password: ${POSTGRES_PASS:dtm}
# сетевой адрес хоста с ADP и номер порта на хосте
hosts: ${POSTGRES_HOSTS:postgres.host:5432}
# максимальный размер пула потоков
poolSize: ${POSTGRES_POOLSIZE:10}
pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${POSTGRES_CACHE:true}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADP
fetchSize: ${POSTGRES_FETCH_SIZE:10000}
# настройки для работы с брокером сообщений Kafka
kafka:
client:
property:
# сериализатор строковых ключей
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# сериализатор строковых значений
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Запуск сервисов Apache ZooKeeper и Apache Kafka
# запуск одного экземпляра сервера ZooKeeper, если он еще не запущен
sudo systemctl start zookeeper
# запуск сервера Kafka и проверка его состояния
sudo systemctl start kafka
sudo systemctl status kafka
Запуск коннектора Kafka-Postgres
# запуск kafka-postgres-writer в отдельном окне терминала
cd ~/kafka-postgres-connector/kafka-postgres-writer/target
java -jar kafka-postgres-writer-<version>.jar
# запуск kafka-postgres-reader в отдельном окне терминала
cd ~/kafka-postgres-connector/kafka-postgres-reader/target
java -jar kafka-postgres-reader-<version>.jar
Запуск Prostore
Запуск с номером порта для сбора метрик, указанным в конфигурации Prostore (по умолчанию — 8080):
# запуск файла dtm-query-execution-core-<version>.jar (например, dtm-query-execution-core-6.5.0.jar)
cd ~/prostore/dtm-query-execution-core/target
java -jar dtm-query-execution-core-<version>.jar
Чтобы запустить Prostore с другим номером порта для сбора метрик, задайте нужное значение с помощью параметра конфигурации server:port
или переменной окружения DTM_METRICS_PORT
. Подробнее о параметрах конфигурации и способах их переопределения см. в разделе Конфигурация ноды.
Подключение к Prostore с помощью SQL-клиента
Чтобы подключиться к системе, следуйте инструкциям в разделе Подключение с помощью SQL-клиента.
Демонстрационный сценарий
Создание необходимых логических сущностей
-- создание логической базы данных
CREATE DATABASE marketing;
-- выбор логической БД по умолчанию
USE marketing;
-- создание логической таблицы в БД marketing
CREATE TABLE sales (
id BIGINT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units BIGINT NOT NULL,
store_id BIGINT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание внешней таблицы загрузки
CREATE UPLOAD EXTERNAL TABLE sales_ext_upload (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka://localhost:2181/salesTopic'
FORMAT 'AVRO'
MESSAGE_LIMIT 1000;
-- создание логического представления stores_by_sold_products
CREATE VIEW stores_by_sold_products AS
SELECT store_id, SUM(product_units) AS product_amount
FROM sales
GROUP BY store_id
ORDER BY product_amount DESC
LIMIT 30;
-- создание внешней таблицы выгрузки в топик Kafka "salesTopicOut"
CREATE DOWNLOAD EXTERNAL TABLE sales_ext_download (
id BIGINT,
transaction_date TIMESTAMP,
product_code VARCHAR(256),
product_units BIGINT,
store_id BIGINT,
description VARCHAR(256)
)
LOCATION 'kafka://localhost:2181/salesTopicOut'
FORMAT 'AVRO'
CHUNK_SIZE 1000;
Создание топика Kafka для последующей загрузки данных
Создание топика Kafka salesTopic
в терминале:
cd /opt/kafka/bin
bash kafka-topics.sh --create --replication-factor 1 --partitions 1 --topic salesTopic --zookeeper localhost:2181
Создание бинарного avro-файла kafka_upload_sales.avro из avro-схемы и данных
JSON-файл avro-схемы `kafka_upload_sales.avsc`
{
"name": "sales",
"namespace": "sales",
"type": "record",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "transaction_date",
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
},
{
"name": "product_code",
"type": "string"
},
{
"name": "product_units",
"type": "long"
},
{
"name": "store_id",
"type": "long"
},
{
"name": "description",
"type": "string"
},
{
"name": "sys_op",
"type": "int"
}
]
}
JSON-файл данных `kafka_upload_sales.json`
[
{
"id": 1000111,
"transaction_date": 1614269474000000,
"product_code": "ABC102101",
"product_units": 2,
"store_id": 1000012345,
"description": "Покупка по акции 1+1",
"sys_op": 0
},
{
"id": 1000112,
"transaction_date": 1614334214000000,
"product_code": "ABC102001",
"product_units": 1,
"store_id": 1000000123,
"description": "Покупка без акций",
"sys_op": 0
},
{
"id": 1000020,
"transaction_date": 1614636614000000,
"product_code": "ABC102010",
"product_units": 4,
"store_id": 1000000123,
"description": "Покупка по акции 1+1",
"sys_op": 1
}
]
бинарный AVRO-файл `kafka_upload_sales.avro`
Сохранить бинарный файл
Загрузка avro-файла kafka_upload_sales.avro
Загрузка avro-файла kafka_upload_sales.avro
в топик Kafka salesTopic
через терминал с помощью kafkacat:
# получение docker-образа kafkacat
sudo docker pull edenhill/kcat:1.7.0
# запуск docker-образа kafkacat для загрузки в топик salesTopic
# avro-файла /opt/kafka/sales/kafka_upload_sales.avro
sudo docker run -it --network host \
--volume /opt/kafka/sales/kafka_upload_sales.avro:/data/kafka_upload_sales.avro \
edenhill/kcat:1.7.0 -b localhost:9092 -t salesTopic -P /data/kafka_upload_sales.avro
Загрузка данных
-- открытие новой дельты
BEGIN DELTA;
-- запуск загрузки данных в логическую таблицу sales
INSERT INTO sales SELECT * FROM sales_ext_upload;
-- закрытие дельты
COMMIT DELTA;
Вставка данных
-- открытие новой дельты
BEGIN DELTA;
-- запуск вставки данных в логическую таблицу sales
INSERT INTO sales
(id, transaction_date, product_code, product_units, store_id, description)
VALUES
(2000111, '2020-05-01 13:14:16', 'ABC202010', 7, 1000000123, 'Покупка без акций'),
(2000112, '2020-05-02 16:13:17', 'ABC202011', 11, 1000000456, 'Покупка без акций'),
(2000113, '2020-05-03 21:15:17', 'ABC202012', 5, 1000000789, 'Покупка без акций'),
(2000114, '2020-05-04 23:03:13', 'ABC202013', 7, 1000000123, 'Покупка без акций'),
(2000115, '2020-05-05 14:10:21', 'ABC202014', 21, 1000000623, 'Покупка без акций'),
(2000116, '2020-06-12 08:43:56', 'ABC202015', 32, 1000000987, 'Покупка без акций');
-- закрытие дельты
COMMIT DELTA;
Выборка данных
-- запрос с неявным указанием столбцов и ключевым словом WHERE
SELECT * FROM sales
WHERE store_id = 1000000123;
-- запрос с агрегацией, группировкой и сортировкой данных, а также выбором первых 5 строк
SELECT s.store_id, SUM(s.product_units) AS product_amount
FROM sales AS s
GROUP BY (s.store_id)
ORDER BY product_amount DESC
LIMIT 5;
-- запрос к логическому представлению stores_by_sold_products
SELECT * FROM stores_by_sold_products;
Выгрузка в топик Kafka
-- запуск выгрузки данных из логической таблицы sales
INSERT INTO sales_ext_download
SELECT * FROM sales WHERE product_units > 2;
Удаление логических сущностей
-- удаление внешней таблицы загрузки
DROP UPLOAD EXTERNAL TABLE sales_ext_upload;
-- удаление внешней таблицы выгрузки
DROP DOWNLOAD EXTERNAL TABLE sales_ext_download;