Сборка и развертывание минимальной конфигурации
Содержание раздела
В этом разделе описаны шаги по развертыванию среды в конфигурации, предполагающей единственный датасорс типа ADP, а также отсутствие брокера сообщений Kafka и коннекторов для загрузки данных из Kafka.
Предустановленные программные средства
- OC Centos 7;
- yum-utils;
- curl;
- git;
- wget;
- OpenJDK 8;
- Apache Maven 3.6.3;
- СУБД PostgreSQL 13;
- Apache ZooKeeper;
- SQL-клиент, например DBeaver.
Сборка Prostore
# клонирование репозитория Prostore
git clone https://repository.datamart.ru/datamarts/prostore ~/prostore
# запуск сборки Prostore средствами Apache Maven
cd ~/prostore
mvn clean install -DskipTests=true
# создание символической ссылки на файл конфигурации
sudo ln -s ~/prostore/dtm-query-execution-core/config/application.yml ~/prostore/dtm-query-execution-core/target/application.yml
# приведение конфигурационного файла к виду, показанному ниже
sudo nano ~/prostore/dtm-query-execution-core/config/application.yml
конфигурационный файл Prostore `application.yml`
logging:
level:
ru.datamart.prostore.query.execution: ${DTM_LOGGING_LEVEL:TRACE}
server:
port: ${DTM_METRICS_PORT:8080}
management:
endpoints:
enabled-by-default: ${DTM_METRICS_ENABLED:true}
web:
exposure:
include: ${DTM_METRICS_SCOPE:info, health}
core:
plugins:
active: ${CORE_PLUGINS_ACTIVE:ADP}
http:
port: ${DTM_CORE_HTTP_PORT:9090}
tcpNoDelay: ${DTM_CORE_HTTP_TCP_NO_DELAY:true}
tcpFastOpen: ${DTM_CORE_HTTP_TCP_FAST_OPEN:true}
tcpQuickAck: ${DTM_CORE_HTTP_TCP_QUICK_ACK:true}
env:
name: ${DTM_NAME:test}
restoration:
autoRestoreState: ${AUTO_RESTORE_STATE:true}
matviewsync:
periodMs: ${MATERIALIZED_VIEWS_SYNC_PERIOD_MS:5000}
retryCount: ${MATERIALIZED_VIEWS_RETRY_COUNT:10}
maxConcurrent: ${MATERIALIZED_VIEWS_CONCURRENT:2}
ddlqueue:
enabled: ${CORE_DDL_QUEUE_ENABLED:true}
datasource:
edml:
defaultChunkSize: ${EDML_DEFAULT_CHUNK_SIZE:1000}
pluginStatusCheckPeriodMs: ${EDML_STATUS_CHECK_PERIOD_MS:1000}
firstOffsetTimeoutMs: ${EDML_FIRST_OFFSET_TIMEOUT_MS:15000}
changeOffsetTimeoutMs: ${EDML_CHANGE_OFFSET_TIMEOUT_MS:10000}
zookeeper:
connection-string: ${ZOOKEEPER_DS_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_DS_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_DS_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_DS_CHROOT:/adtm}
kafka:
producer:
property:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
cluster:
zookeeper:
connection-string: ${ZOOKEEPER_KAFKA_ADDRESS:localhost}
connection-timeout-ms: ${ZOOKEEPER_KAFKA_CONNECTION_TIMEOUT_MS:30000}
session-timeout-ms: ${ZOOKEEPER_KAFKA_SESSION_TIMEOUT_MS:86400000}
chroot: ${ZOOKEEPER_KAFKA_CHROOT:}
admin:
inputStreamTimeoutMs: ${KAFKA_INPUT_STREAM_TIMEOUT_MS:2000}
status.event.publish:
topic: ${KAFKA_STATUS_EVENT_TOPIC:status.event.topic}
enabled: ${KAFKA_STATUS_EVENT_ENABLED:false}
vertx:
blocking-stacktrace-time: ${DTM_VERTX_BLOCKING_STACKTRACE_TIME:1}
pool:
worker-pool: ${DTM_CORE_WORKER_POOL_SIZE:20}
event-loop-pool: ${DTM_CORE_EVENT_LOOP_POOL_SIZE:20}
task-pool: ${DTM_CORE_TASK_POOL_SIZE:20}
task-timeout: ${DTM_CORE_TASK_TIMEOUT:86400000}
delta-watcher-pool: ${DTM_DELTA_WATCHER_POOL_SIZE:10}
cache:
initialCapacity: ${CACHE_INITIAL_CAPACITY:100000}
maximumSize: ${CACHE_MAXIMUM_SIZE:100000}
expireAfterAccessMinutes: ${CACHE_EXPIRE_AFTER_ACCESS_MINUTES:99960}
delta:
rollback-status-calls-ms: ${DELTA_ROLLBACK_STATUS_CALLS_MS:2000}
statistics:
enabled: ${CORE_STATISTICS_ENABLED:true}
threadsCount: ${CORE_STATISTICS_THREADS_COUNT:2}
dataCountEnabled: ${CORE_STATISTICS_DATA_COUNT_ENABLED:true}
adp:
datasource:
- name: ADP
env: ${ADP_ENV_NAME:${DTM_NAME:test}}
user: ${ADP_USERNAME:dtm}
password: ${ADP_PASS:}
host: ${ADP_HOST:localhost}
port: ${ADP_PORT:5432}
poolSize: ${ADP_MAX_POOL_SIZE:3}
executorsCount: ${ADP_EXECUTORS_COUNT:3}
fetchSize: ${ADP_FETCH_SIZE:1000}
poolRequestTimeout: ${ADP_POOL_REQUEST_TIMEOUT:0}
preparedStatementsCacheMaxSize: ${ADP_PREPARED_CACHE_MAX_SIZE:256}
preparedStatementsCacheSqlLimit: ${ADP_PREPARED_CACHE_SQL_LIMIT:2048}
preparedStatementsCache: ${ADP_PREPARED_CACHE:true}
mppw:
restStartLoadUrl: ${ADP_REST_START_LOAD_URL:http://localhost:8096/newdata/start}
restStopLoadUrl: ${ADP_REST_STOP_LOAD_URL:http://localhost:8096/newdata/stop}
restVersionUrl: ${ADP_MPPW_CONNECTOR_VERSION_URL:http://localhost:8096/versions}
kafkaConsumerGroup: ${ADP_KAFKA_CONSUMER_GROUP:adp-load}
mppr:
restLoadUrl: ${ADP_MPPR_QUERY_URL:http://localhost:8094/query}
restVersionUrl: ${ADP_MPPR_CONNECTOR_VERSION_URL:http://localhost:8094/versions}
Далее конфигурационный файл application.yml
обозначается термином «конфигурация Prostore».
Настройка СУБД Postgres
# создание в СУБД Postgres SUPERUSER-пользователя c именем и паролем,
# указанными в конфигурации Prostore
# (значения параметров (adp:datasource:user) и (adp:datasource:password) соответственно)
cd /
sudo -u postgres psql -c 'CREATE ROLE dtm WITH LOGIN SUPERUSER'
sudo -u postgres psql -c "ALTER ROLE dtm WITH PASSWORD 'dtm'"
# создание базы данных с именем test, указанным в конфигурации Prostore (env: name)
sudo -u postgres psql -c 'CREATE DATABASE test'
# перезапуск сервиса Postgresql
sudo systemctl reload postgresql-13
Запуск сервиса Apache ZooKeeper
# запуск одного экземпляра сервера ZooKeeper, если он еще не запущен
sudo systemctl start zookeeper
sudo systemctl status zookeeper
Запуск Prostore
Запуск с номером порта для сбора метрик, указанным в конфигурации Prostore (по умолчанию — 8080):
# запуск файла dtm-query-execution-core-<version>.jar (например, dtm-query-execution-core-5.1.0.jar)
cd ~/prostore/dtm-query-execution-core/target
java -jar dtm-query-execution-core-<version>.jar
Чтобы запустить Prostore с другим номером порта для сбора метрик, задайте нужное значение с помощью параметра конфигурации server:port
или переменной окружения DTM_METRICS_PORT
. Подробнее о параметрах конфигурации и способах их переопределения см. в разделе Конфигурация ноды.
Подключение к Prostore с помощью SQL-клиента
Чтобы подключиться к системе, следуйте инструкциям в разделе Подключение с помощью SQL-клиента.
Демонстрационный сценарий
Создание необходимых логических сущностей
-- создание логической базы данных
CREATE DATABASE marketing;
-- выбор логической БД по умолчанию
USE marketing;
-- создание логической таблицы в БД marketing
CREATE TABLE sales (
id BIGINT NOT NULL,
transaction_date TIMESTAMP NOT NULL,
product_code VARCHAR(256) NOT NULL,
product_units BIGINT NOT NULL,
store_id BIGINT NOT NULL,
description VARCHAR(256),
PRIMARY KEY (id)
)
DISTRIBUTED BY (id);
-- создание логического представления stores_by_sold_products
CREATE VIEW stores_by_sold_products AS
SELECT store_id, SUM(product_units) AS product_amount
FROM sales
GROUP BY store_id
ORDER BY product_amount DESC
LIMIT 30;
Вставка данных
-- открытие новой (горячей) дельты
BEGIN DELTA;
-- запуск вставки данных в логическую таблицу sales
INSERT INTO sales
(id, transaction_date, product_code, product_units, store_id, description)
VALUES
(2000111, '2020-05-01 13:14:16', 'ABC202010', 7, 1000000123, 'Покупка без акций'),
(2000112, '2020-05-02 16:13:17', 'ABC202011', 11, 1000000456, 'Покупка без акций'),
(2000113, '2020-05-03 21:15:17', 'ABC202012', 5, 1000000789, 'Покупка без акций'),
(2000114, '2020-05-04 23:03:13', 'ABC202013', 7, 1000000123, 'Покупка без акций'),
(2000115, '2020-05-05 14:10:21', 'ABC202014', 21, 1000000623, 'Покупка без акций'),
(2000116, '2020-06-12 08:43:56', 'ABC202015', 32, 1000000987, 'Покупка без акций');
-- закрытие дельты
COMMIT DELTA;
Выборка данных
-- запрос с неявным указанием столбцов и ключевым словом WHERE
SELECT * FROM sales
WHERE store_id = 1000000123;
-- запрос с агрегацией, группировкой и сортировкой данных, а также выбором первых 5 строк
SELECT s.store_id, SUM(s.product_units) AS product_amount
FROM sales AS s
GROUP BY (s.store_id)
ORDER BY product_amount DESC
LIMIT 5;
-- запрос к логическому представлению stores_by_sold_products
SELECT * FROM stores_by_sold_products;
Удаление логических сущностей
-- удаление логического представления stores_by_sold_products
DROP VIEW stores_by_sold_products;