Конфигурация коннекторов
Содержание раздела
Следующие коннекторы требуют настройки конфигурации:
- Kafka-Clickhouse reader connector,
- Kafka-Clickhouse writer connector,
- Kafka-Postgres reader connector,
- Kafka-Postgres writer connector,
- Kafka Jet writer connector.
Ссылки на репозитории с исходным кодом коннекторов доступны в разделе Ресурсы.
Конфигурация коннектора представляет собой текстовый YAML-файл, параметры которого организованы в древовидную структуру. Ниже приведены примеры конфигурационных файлов, где для каждого параметра указаны:
- назначение,
- имя,
- имя переменной окружения,
- значение параметра.
Переопределение параметров конфигурации
По умолчанию коннектор использует конфигурационный файл, входящий в состав JAR-файла коннектора. Значения параметров конфигурации можно переопределить любым из способов:
- поместить свой YAML-файл с нужными значениями параметров в директорию
<рабочая_директория_коннектора>/config
; - задать переменные окружения.
Если параметр определен в нескольких источниках, то наиболее приоритетным считается значение переменной окружения, следующим источником по приоритету — конфигурация в директории config
, затем — конфигурация в составе JAR-файла.
Переопределить значение параметра с помощью переменной окружения можно, если эта переменная определена в конфигурационном файле. Если вы поместили свой конфигурационный файл в директорию config
, он должен определять нужные переменные. При использовании конфигурационного файла из JAR-файла ничего определять не нужно, так как все переменные в нем уже определены.
Конфигурация коннектора Kafka-Clickhouse reader
# настройки журналирования
logging:
# уровни журналирования событий
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8080}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров, принимающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
producer:
# количество экземпляров, записывающих данные
instances: ${PRODUCER_VERTICLE_INSTANCES:6}
# максимальный размер пула потоков, записывающих данные
poolSize: ${PRODUCER_POOL_SIZE:12}
executor:
# максимальный размер пула потоков, обрабатывающих запросы в базу данных
poolSize: ${EXECUTOR_POOL_SIZE:12}
reader:
# максимальный размер буфера вычитанных строк из базы данных
uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}
# настройки для работы с ADQM
datasource:
clickhouse:
# имя базы данных в ADQM
database: ${CLICKHOUSE_DB_NAME:test1}
# имя пользователя/логин для авторизации в ADQM
user: ${CLICKHOUSE_USERNAME:default}
# пароль для авторизации в ADQM
password: ${CLICKHOUSE_PASS:}
# сетевой адрес хоста с ADQM и номер порта на хосте
hosts: ${CLICKHOUSE_HOSTS:clickhouse.host:8123}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
fetchSize: ${CLICKHOUSE_FETCH_SIZE:1000}
# настройки для работы с брокером сообщений Kafka
kafka:
clickhouse:
# настройки производителя данных
producer:
property:
# сериализатор строковых ключей
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# сериализатор строковых значений
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# настройки кластера
cluster:
# сетевой адрес хоста Zookeeper для брокера сообщений Kafka
zookeeperHosts: ${ZOOKEEPER_HOSTS:zk-1.dtm.local}
# корневой путь к хосту Zookeeper для брокера сообщений Kafka
rootPath: ${KAFKA_CLUSTER_ROOTPATH:arenadata/cluster/21}
Конфигурация коннектора Kafka-Clickhouse writer
# настройки журналирования
logging:
# уровни журналирования
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8080}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров вертиклов, обрабатывающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
insert:
# максимальный размер пула потоков, вставляющих данные
poolSize: ${INSERT_WORKER_POOL_SIZE:32}
# периодичность вставки новых данных (в миллисекундах)
insertPeriodMs: ${INSERT_PERIOD_MS:1000}
# размер пакета операций при вставке данных
batchSize: ${INSERT_BATCH_SIZE:500}
consumer:
# максимальный размер пула потоков, считывающих данные
poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADQM
maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
commit:
# размер пула потоков, записывающих смещение в топиках Kafka
poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
# периодичность записи смещения в топиках Kafka (в миллисекундах)
commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}
# настройки для работы с брокером сообщений Kafka
client:
kafka:
# настройки консьюмера (потребителя) Kafka
consumer:
# периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
# время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
# количество консьюмеров Kafka
consumerSize: ${KAFKA_CONSUMER_SIZE:10}
# время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
# свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
property:
# список адресов и портов Bootstrap-серверов брокера сообщений Kafka
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
# режим обнуления смещения в топиках Kafka
auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
# признак автоматической записи смещения в топиках Kafka
enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
# периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}
# настройки окружения
env:
# имя окружения
name: ${ENV:test}
# настройки для работы с ADQM
datasource:
clickhouse:
# имя базы данных в ADQM
database: ${CLICKHOUSE_DB_NAME:test1}
# имя пользователя/логин для авторизации в ADQM
user: ${CLICKHOUSE_USERNAME:default}
# пароль для авторизации в ADQM
password: ${CLICKHOUSE_PASS:}
# список хостов ADQM в формате: address1:port1,address2:port2...addressN:portN
hosts: ${CLICKHOUSE_HOSTS:clickhouse.host:8123}
Конфигурация коннектора Kafka-Postgres reader
# настройки журналирования
logging:
# уровни журналирования событий
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8094}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров, принимающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
producer:
# количество экземпляров, записывающих данные
instances: ${PRODUCER_VERTICLE_INSTANCES:6}
# максимальный размер пула потоков, записывающих данные
poolSize: ${PRODUCER_POOL_SIZE:12}
executor:
# максимальный размер пула потоков, обрабатывающих запросы в базу данных
poolSize: ${EXECUTOR_POOL_SIZE:12}
reader:
# максимальный размер буфера вычитанных строк из базы данных
uploadChunkBuffer: ${UPLOAD_CHUNK_BUFFER:1000}
# настройки для работы с ADP
datasource:
postgres:
# имя базы данных ADP
database: ${POSTGRES_DB_NAME:db}
# имя пользователя/логин для авторизации в ADP
user: ${POSTGRES_USERNAME:user}
# пароль для авторизации в ADP
password: ${POSTGRES_PASS:password}
# сетевой адрес хоста с ADP и номер порта на хосте
hosts: ${POSTGRES_HOSTS:postgres.host:5432}
# максимальный размер пула потоков
poolSize: ${POSTGRES_POOLSIZE:10}
pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${POSTGRES_CACHE:true}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADP
fetchSize: ${POSTGRES_FETCH_SIZE:10000}
# настройки для работы с брокером сообщений Kafka
kafka:
client:
property:
# сериализатор строковых ключей
key.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# сериализатор строковых значений
value.serializer: org.apache.kafka.common.serialization.ByteArraySerializer
Конфигурация коннектора Kafka-Postgres writer
# настройки журналирования
logging:
# уровни журналирования событий
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.kafka: ${LOG_LEVEL:DEBUG}
# уровень важности сообщений, записываемых в лог-файл по событиям Kafka
org.apache.kafka: ${KAFKA_LOG_LEVEL:INFO}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8096}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров вертиклов, обрабатывающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
insert:
# максимальный размер пула потоков, вставляющих данные
poolSize: ${INSERT_WORKER_POOL_SIZE:32}
# периодичность вставки новых данных (в миллисекундах)
insertPeriodMs: ${INSERT_PERIOD_MS:1000}
# размер пакета операций при вставке данных
batchSize: ${INSERT_BATCH_SIZE:500}
consumer:
# максимальный размер пула потоков, считывающих данные
poolSize: ${KAFKA_CONSUMER_WORKER_POOL_SIZE:32}
# максимальный размер результата, возвращаемого по FETCH-запросу к ADP
maxFetchSize: ${KAFKA_CONSUMER_MAX_FETCH_SIZE:10000}
commit:
# размер пула потоков, записывающих смещение в топиках Kafka
poolSize: ${KAFKA_COMMIT_WORKER_POOL_SIZE:1}
# периодичность записи смещения в топиках Kafka (в миллисекундах)
commitPeriodMs: ${KAFKA_COMMIT_WORKER_COMMIT_PERIOD_MS:1000}
# настройки для работы с брокером сообщений Kafka
client:
kafka:
# настройки консьюмера (потребителя) Kafka
consumer:
# периодичность проверки (в миллисекундах) статуса брокера сообщений Kafka
checkingTimeoutMs: ${KAFKA_CHECKING_TIMEOUT_MS:10000}
# время ожидания (в миллисекундах) ответа от брокера сообщений Kafka до тайм-аута
responseTimeoutMs: ${KAFKA_RESPONSE_TIMEOUT_MS:10000}
# количество консьюмеров Kafka
consumerSize: ${KAFKA_CONSUMER_SIZE:10}
# время ожидания (в миллисекундах) до закрытия соединения с брокером сообщений Kafka
closeConsumersTimeout: ${KAFKA_CLOSE_CONSUMER_TIMEOUT:15000}
# свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
property:
# список адресов и портов Bootstrap-серверов брокера сообщений Kafka
bootstrap.servers: ${KAFKA_BOOTSTRAP_SERVERS:kafka.host:9092}
# режим обнуления смещения в топиках Kafka
auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
# признак автоматической записи смещения в топиках Kafka
enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
# периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}
# настройки для работы с ADP
datasource:
postgres:
# имя базы данных ADP
database: ${POSTGRES_DB_NAME:db}
# имя пользователя/логин для авторизации в ADP
user: ${POSTGRES_USERNAME:user}
# пароль для авторизации в ADP
password: ${POSTGRES_PASS:password}
# сетевой адрес хоста с ADP и номер порта на хосте
hosts: ${POSTGRES_HOSTS:postgres.host:5432}
# максимальный размер пула потоков
poolSize: ${POSTGRES_POOLSIZE:10}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${POSTGRES_CACHE:true}
# количество попыток установки соединения (0 — количество не ограничено, 1 — без повторных попыток)
connectRetryCount: ${POSTGRES_CONNECT_RETRY_COUNT:3}
# время ожидания между попытками установки соединения в миллисекундах
connectRetryTimeoutMs: ${POSTGRES_CONNECT_RETRY_TIMEOUT_MS:1000}
Конфигурация коннектора Kafka Jet writer
# настройки журналирования событий
logging:
# уровни журналирования
level:
# уровень важности сообщений, записываемых в лог-файл
ru.datamart.jet: ${LOG_LEVEL:DEBUG}
# настройки HTTP-подключений
http:
# порт, на котором работает коннектор
port: ${SERVER_PORT:8080}
# настройки для работы с Vertx
vertx:
# настройки пулов Vertx
pools:
# максимальный размер пула потоков, обрабатывающих события Vertx
eventLoopPoolSize: ${VERTX_EVENT_LOOP_SIZE:12}
# максимальный размер пула потоков, выполняющих долгие операции
workersPoolSize: ${VERTX_WORKERS_POOL_SIZE:32}
# настройки вертиклов Vertx
verticle:
query:
# количество экземпляров вертиклов, обрабатывающих запросы
instances: ${QUERY_VERTICLE_INSTANCES:12}
postgres:
# количество экземпляров вертиклов, выполняющих запросы в postgres
instances: ${POSTGRES_VERTICLE_INSTANCES:12}
# настройки для работы с брокером сообщений Kafka
client:
kafka:
# настройки консьюмера (потребителя) Kafka
consumer:
# свойства консьюмера в соответствии с конфигурацией консьюмеров Kafka (https://kafka.apache.org/documentation/#consumerconfigs)
properties:
# режим обнуления смещения в топиках Kafka
auto.offset.reset: ${KAFKA_AUTO_OFFSET_RESET:earliest}
# признак автоматической записи смещения в топиках Kafka
enable.auto.commit: ${KAFKA_AUTO_COMMIT:false}
# периодичность (в миллисекундах) автоматической записи смещения в топиках Kafka
auto.commit.interval.ms: ${KAFKA_AUTO_INTERVAL_MS:1000}
datasource:
# настройки для работы с ADP
postgres:
# максимальный размер пула потоков
poolSize: ${POSTGRES_POOLSIZE:10}
# максимальный размер кэша запроса prepared statement
preparedStatementsCacheMaxSize: ${POSTGRES_CACHE_MAX_SIZE:256}
# максимальный размер запроса prepared statement, который может быть закэширован
preparedStatementsCacheSqlLimit: ${POSTGRES_CACHE_SQL_LIMIT:2048}
# признак кэширования запросов prepared statement
preparedStatementsCache: ${POSTGRES_CACHE:true}
# ограничение количества одновременных запросов в рамках одного подключения к postgres
pipeliningLimit: ${POSTGRES_PIPELINING_LIMIT:1}
# количество попыток установки соединения (0 — количество не ограничено, 1 — без повторных попыток)
connectRetryCount: ${POSTGRES_CONNECT_RETRY_COUNT:3}
# время ожидания между попытками установки соединения в миллисекундах
connectRetryTimeoutMs: ${POSTGRES_CONNECT_RETRY_TIMEOUT_MS:1000}