In this article, we will install winlogbeat in Windows Server 2019(10.250.2.224) in order to monitor windows event logs, make necessary configurations to transfer event logs to logstash, and run winlogbeat as a service.
Let’s download the winlogbeat from https://www.elastic.co/downloads/beats/winlogbeat on our server and unzip it to the C drive.
Let’s make a few notes about the files in the Winlogbeat folder.
What is winlogbeat.yml File?
The winlogbeat.yml file is the basic config file and we enter information about which windows events it should monitor and how long it should monitor these logs. In addition, we enter the logstash information that will transfer these logs, into this file.
What is winlogbeat.reference.yml File?
The winlogbeat.reference.yml file provides us with a full configuration example.
What is install-service-winlogbeat.ps1 File?
Thanks to the install-service-winlogbeat.ps1 power shell script file, we run the winlogbeat as a service.
Configure winlogbeat.yml
Now let’s do the configurations in winlogbeat.yml config file. According to this configuration; We specify that we want to monitor Application, System, Security and Microsoft-Windows-Sysmon logs in windows eventlogs. And we state that we will ignore Application logs before 72 hours, and we state that we want to send these logs to port 5043 of logstash running at 10.250.2.222.
Start winlogbeat Service
Now, when we run the install-service-winlogbeat.ps1 power shell script, the winlogbeat will be ready to run as a service. As soon as we run the winlogbeat service, it will start sending logs to logstash.
PS C:\winlogbeat—7.6.2—windows—x86_64\winlogbeat—7.6.2—windows—x86_64> .\install—service—winlogbeat.ps1 PS C:\winlogbeat—7.6.2—windows—x86_64\winlogbeat—7.6.2—windows—x86_64> Start—Service winlogbeat |
Now Winlogbeat has started to work as a service, we can confirm this from the task manager. However, since we do not configure logstash at the moment, it keeps event logs on itself. When we complete the logstash configuration, logs will be successfully transferred to logstash. In my next article I will be configuring the logstash.
You can read our other articles in this serie from the links below.
Install Elasticsearch on ubuntu server 19.10
Install Logstash on Ubuntu Server 19.10
Install Kibana on Ubuntu Server 19.10
Install Winlogbeat on Windows Server 2019
Configure Logstash to Read log files Windows
Create Kibana Dashboards For Windows Event Logs
Передо мной встала задача сбора логов с парка серверов на ОС Windows и ОС Linux. Для того чтобы решить её я воспользовался стэком OpenSearch. Во время настройки OpenSearch мне не хватало в открытых источниках наглядных примеров, а информация на официальных сайтах ElasticSearch и OpenSearch мне показалась обрывочной, слабо привязанной к реальным ситуациям. Поэтому я решил поделиться своим опытом и описать основные моменты установки и некоторые сценарии настройки и применения стэка OpenSearch, которые я применил в своей практике.
1. Введение
1.1. Коротко о том, что такое OpenSearch
OpenSearch – система полнотекстового поиска. Является форком Elasticsearch. Разработана компанией A9.com, дочерней компанией Amazon.com.
Стэк OpenSearch (OpenSearch + Logstash-oss-with-OpenSearch-output-plugin + OpenSearch–Dashboards) является бесплатным аналогом с отрытым исходным кодом стэку ELK (Elasticsearch + Logstash + Kibana). В совокупности с Beat’ами (Filebeat, Winlogbeat, и т.д.) образуют полный цикл управления логами: сбор, систематизация, поиск.
Можно найти и множество других применений этому стэку, но в этой статье речь пойдет главным образом именно о логах.
1.2. Коротко о форках Elasticsearch
Elasticsearch развивается под этим названием с 2010 года. С 2018 года, с версии 6.3 лицензия на ELK меняется, появляется платная и бесплатная версия. Позже Amazon создает форк Elasticsearch и называет его «Open Distro for Elasticsearch». А в 2021 году Amazon создает новый форк и называет его «OpenSearch». Немного позже Российская компания Arenadata становится официальным партнёром OpenSearch в России и выпускает отечественный форк под названием «Arenadata LogSearch (ADLS)» под платной лицензией.
1.3. Что и зачем будем настраивать
Стэк OpenSearch мы будем разворачивать из архивов, это особенно актуально в закрытых средах. Кроме того, мы не будем использовать Kubernetes и Docker, то есть будем производить установку в классической среде.
В качестве ОС под OpenSearch будем использовать Linux Ubuntu 20.04.
Логи будут транспортироваться с серверов, на которых они возникли на сервер OpenSearch в виде сообщений. Для того, чтобы гарантировать доставку сообщений, в качестве транспорта мы будем использовать сервис Apache Kafka.
Apache Kafka – это распределённый программный брокер сообщений, который гарантирует доставку сообщений.
Мы введем OpenSearch в кластер, но будет описан ввод только основной ноды.
Кластер OpenSearch позволяет решать сразу несколько задач. Во-первых, традиционно, кластер решает задачу масштабирования, распределения нагрузки и отказоустойчивости. Во-вторых, данные накопленные в OpenSearch, можно распределять по их значимости на «Горячие» («Hot»), «Теплые» («Warm») и «Холодные» («Cold»). А ноды кластера можно настроить так чтобы они принимали только «Горячие» данные, или «Теплые» данные, или только «Холодные». И если на ноду под «Холодные» данные выделять менее производительные мощности, то это позволит сэкономить ресурсы.
1.4. Настраиваемая схема
На серверы, с которых нужно собирать логи, мы установим Beat’ы. На серверы под управлением ОС Windows мы установим Filebeat и Winlogbeat. На серверы под управлением Linux мы установим только Filebeat. Beat’ы будут отправлять сообщения с логами в Kafk’у. Logstash будет брать эти сообщения из Kafka, обрабатывать их и отправлять в OpenSearch. Пользователь будет осуществлять просмотр и поиск по логам через OpenSearch-Dashboards. Схема взаимодействия представлена на Рис.1.
Рис.1
Договоримся, что машина Node OpenSearch будет иметь:
1. Имя «server-elk01»;
2. Ip адрес «10.0.0.70».
Машина с Kafka будет иметь:
1. Ip адрес «10.0.0.60».
2. Установка стэка OpenSearch
2.1. Подготовка Linux машины Node OpenSearch
Перед началом установки основных пакетов установим Java и Unzip, а также создадим пользователей для OpenSearch и Logstash.
1. Установка Java:
apt install openjdk-11-jdk
2. Установка unzip:
apt install unzip
3. Создание пользователя opensearch:
groupadd opensearch
useradd opensearch -g opensearch -M -s /bin/bash
passwd opensearch
4. Создание пользователя logstash:
groupadd logstash
useradd logstash -g logstash -M -s /bin/bash
passwd logstash
2.2. Установка OpenSearch (аналог ElasticSearch)
OpenSearch, как можно было заметить, является ядром всего стэка. OpenSearch содержит базу данных, в которой будут храниться логи. Кроме того, OpenSearch имеет API для обработки запросов как на ввод данных, так и на вывод. Так же OpenSearch индексирует поступившие в неё данные и осуществляет поиск по этим данным.
Приступаем к установке.
1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch версии 1.2.4 (https://artifacts.opensearch.org/releases/bundle/opensearch/1.2.4/opensearch-1.2.4-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:
chmod +x opensearch-1.2.4-linux-x64.tar.gz
3. Распаковываем архив:
tar -xf opensearch-1.2.4-linux-x64.tar.gz
4. Будем устанавливать OpenSearch в каталог «/opt/opensearch», поэтому создаем рабочий каталог для OpenSearch:
mkdir /opt/opensearch
5. Переносим распакованные данные в рабочий каталог:
mv ./opensearch-1.2.4/* /opt/opensearch
6. Удаляем каталог, оставшийся от распаковки:
rmdir ./opensearch-1.2.4
7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch:
chown -R opensearch:opensearch /opt/opensearch
8. Запускает установочный скрипт от имени пользователя opensearch:
sudo -u opensearch /opt/opensearch/opensearch-tar-install.sh
Дожидаемся сообщения «Node ‘server-elk01’ initialized» и нажимаем «Ctrl+C».
9. Создаем файл демона для работы OpenSearch:
nano /lib/systemd/system/opensearch.service
/lib/systemd/system/opensearch.service
[Unit]
Description=Opensearch
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
RuntimeDirectory=opensearch
PrivateTmp=true
Restart=on-failure
RestartSec=60s
WorkingDirectory=/opt/opensearch
User=opensearch
Group=opensearch
ExecStart=/opt/opensearch/bin/opensearch
StandardOutput=journal
StandardError=inherit
# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535
# Specifies the maximum number of processes
LimitNPROC=4096
# Specifies the maximum size of virtual memory
LimitAS=infinity
# Specifies the maximum file size
LimitFSIZE=infinity
# Not use SWAP
LimitMEMLOCK=infinity
# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0
# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75
[Install]
WantedBy=multi-user.target
За основу файла демона я взял файл оригинального демона Elasticsearch и переработал его.
10. Делаем пользователя root владельцем файла демона:
chown -R root:root /lib/systemd/system/opensearch.service
11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:
systemctl daemon-reload
12. Создаем каталог для логов opensearch:
mkdir /var/log/opensearch
13. Делаем пользователя opensearch владельцем каталога логов:
chown -R opensearch /var/log/opensearch
2.3. Настройки производительности
Для того чтобы получить высокую производительность недостаточно иметь хорошее железо, нужно еще и правильно настроить систему. Попробуем оптимизировать несколько параметров.
1. Настройки Java:
Производятся в файле «/opt/opensearch/config/jvm.options». Необходимо настроить два параметра: «Xmx» и «Xms».
Оба параметра рекомендуется установить в значение равное 50% от имеющейся физической памяти узла. Чем больше доступно памяти, тем лучше, но есть ограничения. Значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «compressed object pointers» (по умолчанию равен 32 Gb). И, значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «zero-based compressed oops» (по умолчанию равен 26 Gb).
У меня на машине «server-elk01» установлено 8 Gb оперативной памяти поэтому настройки получились такие:
nano /opt/opensearch/config/jvm.options
/opt/opensearch/config/jvm.options
2. Настройка виртуальной памяти:
nano /etc/sysctl.conf
/etc/sysctl.conf
...
# Добавить в конце файла
vm.max_map_count=262144
Для того, чтобы изменения вступили в силу без перезагрузки хоста можно выполнить следующую команду в консоли:
sysctl -w vm.max_map_count=262144
2.4. Первый запуск OpenSearch
На этом этапе уже все готово к тому, чтобы запустить OpenSearch. Глубокой настройкой займемся позже, а сейчас просто запустим OpenSearch.
1. Запустим настроенный нами демон OpenSearch:
systemctl start opensearch
2. Проверим статус запуска демона OpenSearch:
systemctl status opensearch
3. Настроим автозапуск демона OpenSearch:
systemctl enable opensearch
4. Проверим работоспособность демона OpenSearch:
curl -X GET https://localhost:9200 -u 'admin:admin' --insecure
Если вы устанавливаете OpenSearch в ознакомительных целях, то можете считать установку законченной и приступать к использованию OpenSearch, правда только в консольном режиме, Web-интерфейс (OpenSearch-Dashboards) мы еще не устанавливали.
2.5. Смена пароля учетной записи OpenSearch
По умолчанию в OpenSearch предустановлена админская учетная запись «admin» с паролем «admin», поэтому для повышения уровня безопасности сменим пароль этой учетной записи. Для этого выполним следующие шаги.
1. Остановим демон OpenSearch:
systemctl stop opensearch
2. Дадим права на выполнение скрипта получения хэша пароля:
chmod +x /opt/opensearch/plugins/opensearch-security/tools/hash.sh
3. Запускаем скрипт и вводим новый пароль «yN-3L(GMmAAw»:
/opt/opensearch/plugins/opensearch-security/tools/hash.sh
Запоминаем полученный хэш, например:
$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO
К слову, я указал реальный хэш к описанному паролю, в тестах можно их использовать.
4. Заменим хэш пароля пользователя admin:
nano /opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml
/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml
...
admin:
hash: "$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO"
...
5. Дадим права на выполнение скрипта для создания новых сертификатов:
chmod +x /opt/opensearch/plugins/opensearch-security/tools/securityadmin.sh
6. Перейдём в каталог со скриптом:
cd /opt/opensearch/plugins/opensearch-security/tools
7. Запустим демон OpenSearch:
systemctl start opensearch
8. Запускаем скрипт для создания новых сертификатов:
./securityadmin.sh -cd ../securityconfig/ -icl -nhnv \
-cacert ../../../config/root-ca.pem \
-cert ../../../config/kirk.pem \
-key ../../../config/kirk-key.pem
9. Проверяем работоспособность OpenSearch с новым паролем:
curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure
Аналогичным образом можно сменить пароль и у других учетных записей, которые можно обнаружить в «/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml», если вам это необходимо.
2.6. Переводим OpenSearch в режим кластера
Как я писал выше в реализуемой схеме мы введем OpenSearch в кластер, но введем только одну ноду. Потому как задачи перед кластером можно поставить разные то и настройки, и схемы могут сильно отличаться. Если вам потребуется только одна машина с OpenSearch, то можно и не вводить её в кластер, тогда этот шаг можно пропустить. А мы приступим.
1. Остановим демон OpenSearch:
systemctl stop opensearch
2. Добавляем параметры в настройки OpenSearch:
nano /opt/opensearch/config/opensearch.yml
/opt/opensearch/config/opensearch.yml
# ------------------------------------ Node ------------------------------------
# Имя ноды:
node.name: os-node01-server-elk01
# Роли узла:
node.roles: [ master, data ]
#
# ---------------------------------- Network -----------------------------------
# Адрес узла - принимать на любых адресах:
network.host: 0.0.0.0
# Порт:
http.port: 9200
#
# ---------------------------------- Cluster -----------------------------------
# Имя кластера:
cluster.name: os_cluster
# Список узлов в голосовании по выбору master узла:
cluster.initial_master_nodes: ["os-node01"]
#
# --------------------------------- Discovery ----------------------------------
# Список master узлов кластера:
discovery.seed_hosts: ["10.0.0.70"]
#
# ----------------------------------- Paths ------------------------------------
# Директория с данными:
path.data: /opt/opensearch/data
# Директория с логами:
path.logs: /var/log/opensearch
#
######## Start OpenSearch Security Demo Configuration ########
# WARNING: revise all the lines below before you go into production
plugins.security.ssl.transport.pemcert_filepath: esnode.pem
plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
plugins.security.ssl.transport.enforce_hostname_verification: false
plugins.security.ssl.http.enabled: true
plugins.security.ssl.http.pemcert_filepath: esnode.pem
plugins.security.ssl.http.pemkey_filepath: esnode-key.pem
plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem
plugins.security.allow_unsafe_democertificates: true
plugins.security.allow_default_init_securityindex: true
plugins.security.authcz.admin_dn:
- CN=kirk,OU=client,O=client,L=test, C=de
plugins.security.authcz.type: internal_opensearch
plugins.security.enable_snapshot_restore_privileges: true
plugins.security.check_snapshot_restore_write_privileges: true
plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
plugins.security.system_indices.enabled: true
plugins.security.system_indices.indices: [".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opendistro-notifications-*", ".opendistro-notebooks", ".opendistro-observability", ".opendistro-asynchronous-search-response*", ".opendistro-metadata-store"]
node.max_local_storage_node: 3
######## End OpenSearch Security Demo Configuration ########
Обращаем внимание на роль ноды (master, data). Роль «master» — означает, что нода управляет кластером. Роль «data» — означает, что нода содержит базу данных.
3. Запускаем демон OpenSearch:
systemctl start opensearch
4. Проверим работоспособность OpenSearch:
curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure
5. Проверим состояние кластера:
curl -X GET https://localhost:9200/_cluster/health?pretty -u 'admin:yN-3L(GMmAAw' --insecure
6. Выясним кто мастер в кластере:
curl -X GET https://localhost:9200/_cat/master?pretty -u 'admin:yN-3L(GMmAAw' --insecure
2.7. Установка OpenSearch-Dashboards (аналог Kibana)
OpenSearch-Dashboards является Web-интерфейсом для OpenSearch. Для тех кто собирается работать с OpenSearch из консоли этот шаг можно и пропустить. Так же вместо OpenSearch-Dashboards в качестве Web-интерфейса можно установить и другие утилиты, например, «Grafana».
Приступим к установке OpenSearch-Dashboards.
1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch-Dashboards версии 1.2.0 (https://artifacts.opensearch.org/releases/bundle/opensearch-dashboards/1.2.0/opensearch-dashboards-1.2.0-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:
chmod +x opensearch-dashboards-1.2.0-linux-x64.tar.gz
3. Распаковываем архив:
tar -xf opensearch-dashboards-1.2.0-linux-x64.tar.gz
4. Будем устанавливать OpenSearch-Dashboards в каталог «/opt/opensearch-dashboards», поэтому создаем рабочий каталог для OpenSearch-Dashboards:
mkdir /opt/opensearch-dashboards
5. Переносим распакованные данные в рабочий каталог:
mv ./opensearch-dashboards-1.2.0-linux-x64/* ./opensearch-dashboards-1.2.0-linux-x64/.* /opt/opensearch-dashboards/
6. Удаляем каталог, оставшийся от распаковки:
rmdir ./opensearch-dashboards-1.2.0-linux-x64
7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch-Dashboards:
chown -R opensearch:opensearch /opt/opensearch-dashboards
По умолчанию OpenSearch-Dashboards доступен по порту 5601. Можно изменить порт в настройках OpenSearch-Dashboards, однако тогда придется запускать сервис под учетной записью root.
Поэтому оставим порт по умолчанию и позже настроим переадресацию через NGINX.
Для информации, для смены порта по умолчанию необходимо внести следующие изменения:
/opt/opensearch-dashboards/config/opensearch_dashboards.yml
nano /opt/opensearch-dashboards/config/opensearch_dashboards.yml
...
# Добавить в конец файла
server.port: 80
server.host: 10.0.0.70
8. Создаем файл демона для работы OpenSearch-Dashboards:
nano /lib/systemd/system/opensearch_dashboards.service
/lib/systemd/system/opensearch_dashboards.service
[Unit]
Description=Opensearch_dashboards
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
RuntimeDirectory=opensearch_dashboards
PrivateTmp=true
WorkingDirectory=/opt/opensearch-dashboards
User=opensearch
Group=opensearch
ExecStart=/opt/opensearch-dashboards/bin/opensearch-dashboards
StandardOutput=journal
StandardError=inherit
# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535
# Specifies the maximum number of processes
LimitNPROC=4096
# Specifies the maximum size of virtual memory
LimitAS=infinity
# Specifies the maximum file size
LimitFSIZE=infinity
# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0
# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75
[Install]
WantedBy=multi-user.target
9. Делаем пользователя root владельцем файла демона:
chown -R root:root /lib/systemd/system/opensearch_dashboards.service
10. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:
systemctl daemon-reload
11. Запустим настроенный нами демон OpenSearch-Dashboards:
systemctl start opensearch_dashboards
12. Проверим статус запуска демона OpenSearch-Dashboards:
systemctl status opensearch_dashboards
13. Настроим автозапуск демона OpenSearch-Dashboards:
systemctl enable opensearch_dashboards
2.8. Установка NGINX и настройка переадресации порта OpenSearch-Dashboards
OpenSearch-Dashboards работает на порту 5601, но это неудобно, так как портом для Web-ресурсов по умолчанию является порт 80. Мы установим NGINX и настроим его так, чтобы он слушал 80 порт и перенаправлял с него все запросы на порт 5601. То есть NGINX будет работать в режиме прокси. Приступаем.
1. Установим Nginx на хост:
apt install nginx
2. Настроим автозапуск Nginx:
systemctl enable nginx
3. Не будем удалять, а сделаем резервную копию настроек Nginx по умолчанию:
cp /etc/nginx/sites-enabled/default /etc/nginx/sites-enabled/default.sav
4. Настроим переадресацию OpenSearch-Dashboards с порта 5601 на 80:
nano /etc/nginx/sites-enabled/default
/etc/nginx/sites-enabled/default
map $http_upgrade $connection_upgrade { # WebSocket support
default upgrade;
'' '';
}
server {
listen 80;
server_name server-elk01 server-elk01.domain.ad 10.0.0.70;
location / {
proxy_pass http://127.0.0.1:5601; # full internal address
proxy_http_version 1.1;
proxy_set_header Host $server_name:$server_port;
proxy_set_header X-Forwarded-Host $http_host; # necessary for proper absolute redirects and TeamCity CSRF check
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header Upgrade $http_upgrade; # WebSocket support
proxy_set_header Connection $connection_upgrade; # WebSocket support
}
}
5. Активируем новые настройки Nginx без перезагрузки демона:
nginx -s reload
В случае неудачи перезагрузим Nginx:
service nginx reload
2.9. Настройка файрволла на запрет доступа по порту 5601
После того как мы настроили на NGINX переадресацию с 80 порта на порт 5601 сам порт 5601 продолжит принимать запросы. На этом шаге мы закроем порт 5601 файерволлом. Это, конечно, не обязательно, но, я считаю, что так будет больше порядка.
Само правило для файрволла не сложное, а вариантов как сделать так, чтобы после перезагрузки системы правило активировалось вновь можно придумать много. Я пойду по пути создания демона, который будет запускать bash-скрипт «/etc/rc.local». А сам скрипт будет содержать команду добавления правила в файрволл. Приступим.
1. Правило привяжем к сетевому интерфейсу, поэтому сначала необходимо узнать название сетевого интерфейса, для этого воспользуемся командой:
ip link show
или командой:
ifconfig
В моей системе сетевой интерфейс имеет имя «ens160».
2. Создадим Bash-скрипт с правилом файрволла:
nano /etc/rc.local
/etc/rc.local
#!/bin/bash
iptables -A INPUT -i ens160 -p tcp --dport 5601 -j DROP
exit 0
Соответственно имя интерфейса нужно изменить на имя своего интерфейса.
3. Дадим скрипту права на выполнение:
chmod +x /etc/rc.local
4. Делаем пользователя root владельцем скрипта:
chown -R root:root /etc/rc.local
5. Создадим файл демона для запуска скрипта:
nano /etc/systemd/system/rc-local.service
/etc/systemd/system/rc-local.service
[Unit]
Description=/etc/rc.local Compatibility
ConditionPathExists=/etc/rc.local
[Service]
Type=forking
ExecStart=/etc/rc.local start
TimeoutSec=0
StandardOutput=tty
RemainAfterExit=yes
SysVStartPriority=99
[Install]
WantedBy=multi-user.target
6. Зададим права на файл демона:
chmod 644 /etc/systemd/system/rc-local.service
7. Делаем пользователя root владельцем файла демона:
chown -R root:root /etc/systemd/system/rc-local.service
8. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:
systemctl daemon-reload
9. Запустим настроенный нами демон rc-local:
systemctl start rc-local
10. Проверим статус запуска демона rc-local:
systemctl status rc-local
11. Настроим автозапуск демона rc-local:
systemctl enable rc-local
2.10. Установка Logstash-oss-with-OpenSearch-output-plugin
Logstash в реализуемой схеме выполняет несколько задач. Во-первых, он как насос будет постоянно обращаться к Kafka и втягивать все новые сообщения, которые будут накапливаться в Kafka. Во-вторых, получаемые сообщения он будет преобразовывать правилами, которые мы в него позже заложим. И в-третьих, он будет раскладывать преобразованные сообщения по разным индексам в OpenSearch. Приступим к установке.
1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать Logstash-oss-with-OpenSearch-output-plugin версии 7.16.2 (https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:
chmod +x logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz
3. Распаковываем архив:
tar -xf logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz
4. Будем устанавливать Logstash-oss-with-OpenSearch-output-plugin в каталог «/opt/logstash», поэтому создаем рабочий каталог для Logstash-oss-with-OpenSearch-output-plugin:
mkdir /opt/logstash
5. Переносим распакованные данные в рабочий каталог:
mv ./logstash-7.16.2/* /opt/logstash/
6. Удаляем каталог, оставшийся от распаковки:
rmdir ./logstash-7.16.2
7. Делаем пользователя logstash владельцем рабочего каталога Logstash-oss-with-OpenSearch-output-plugin:
chown -R logstash:logstash /opt/logstash
8. Создаем файл демона для работы Logstash-oss-with-OpenSearch-output-plugin:
nano /etc/systemd/system/logstash.service
/etc/systemd/system/logstash.service
[Unit]
Description=logstash
[Service]
Type=simple
User=logstash
Group=logstash
ExecStart=/opt/logstash/bin/logstash "--path.settings" "/opt/logstash/config"
Restart=always
WorkingDirectory=/opt/logstash
Nice=19
LimitNOFILE=16384
# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=75
[Install]
WantedBy=multi-user.target
Отмечу, что в файле демона оригинального Logstash от Elastic отсутствует параметр «TimeoutStopSec=75». И у меня были случаи, когда при остановке демон Logstash зависал, и приходилось искать java-процесс и убивать его. Параметр «TimeoutStopSec=75» принудительно завершит выполнение демона, если через 75 секунд после команды «stop» процесс не завершится.
9. Зададим права на файл демона:
chmod 644 /etc/systemd/system/logstash.service
10. Делаем пользователя root владельцем файла демона:
chown -R root:root /etc/systemd/system/logstash.service
11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:
systemctl daemon-reload
12. Зададим путь к базе данных и логам Logstash-oss-with-OpenSearch-output-plugin:
nano /opt/logstash/config/logstash.yml
/opt/logstash/config/logstash.yml
path.data: /opt/logstash/data
pipeline.ordered: auto
path.logs: /var/log/logstash
13. Зададим путь для чтения файлов pipelines:
nano /opt/logstash/config/pipelines.yml
/opt/logstash/config/pipelines.yml
- pipeline.id: man
path.config: "/opt/logstash/config/conf.d/*.conf"
14. Создадим каталог для файлов pipelines:
mkdir /opt/logstash/config/conf.d
15. Создадим каталог для логов:
mkdir /var/log/logstash
16. Делаем пользователя logstash владельцем каталога логов:
chown -R logstash /var/log/logstash
17. На этом этапе следовало бы сконфигурировать файлы pipelines для обработки данных поступающих из kafka и уходящих в OpenSearch, но я опишу это позже в нескольких кейсах и предоставлю итоговые файлы конфигураций.
18. После окончания настройки еще раз делаем пользователя logstash владельцем всех файлов в рабочем каталоге Logstash-oss-with-OpenSearch-output-plugin:
chown -R logstash:logstash /opt/logstash
19. Запустим настроенный нами демон Logstash-oss-with-OpenSearch-output-plugin:
systemctl start logstash
20. Проверим статус запуска демона Logstash-oss-with-OpenSearch-output-plugin:
systemctl status logstash
21. Настроим автозапуск демона Logstash-oss-with-OpenSearch-output-plugin:
systemctl enable logstash
3. Установка Beat’ов
Beat’ы это своего рода агенты, которые устанавливаются на каждом сервер. Они следят за определенными объектами, в нашем случае за файлами с логами, и когда заметят, что в файле произошли изменения они (Beat’ы) сформируют сообщение, которое будет содержать эти изменения, и отправят их (сообщения), в нашем случае, в Kafka.
Сразу стоит отметить, что Elastic выпускает две версии beat’ов: (условно) «платные» (Filebeat, Winlogbeat и т.д.) и (условно) «бесплатные» (Filebeat-OSS-only, Winlogbeat-OSS-only и т.д.). На сколько я понял «платные» работают так же, как и «бесплатные», за исключением того, что в «платных» можно подключить платный функционал «X-Pack». То есть «платные» обладают расширенными возможностями, которые можно и не покупать, и при этом не обладают временным ограничением действия. Поэтому далее я буду использовать Filebeat и Winlogbeat, но без платного функционала «X-Pack».
3.1. Установка Filebeat на Linux
1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив deb нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-amd64.deb).
Я буду настраивать Filebeat на той же машине, где развернут OpenSearch, поэтому после скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:
chmod +x ./filebeat-7.12.1-amd64.deb
3. Установим Filebeat:
dpkg -i ./filebeat-7.12.1-amd64.deb
4. На этом этапе следовало бы сконфигурировать файл «/etc/filebeat/filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.
5. Подключаем модуль system в filebeat:
filebeat modules enable system
6. Подключаем модуль kafka в filebeat:
filebeat modules enable kafka
7. Проверим список подключенных модулей:
filebeat modules list
8. Запустим демон Filebeat:
systemctl start filebeat
9. Проверим статус запуска демона Filebeat:
systemctl status filebeat
10. Настроим автозапуск демона Filebeat:
systemctl enable filebeat
3.2. Установка Filebeat на Windows
1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-windows-x86_64.zip).
2. Будем устанавливать Filebeat в каталог «C:\filebeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\filebeat».
3. На этом этапе следовало бы сконфигурировать файл «C:\filebeat\filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.
4. Для создания службы нужно выполнить следующий скрипт с правами администратора:
powershell "C:\filebeat\install-service-filebeat.ps1"
5. Для запуска Filebeat без службы можно использовать команды:
cd C:\filebeat\
filebeat -e -c filebeat.yml
3.3. Установка Winlogbeat на Windows
1. Переходим на официальный сайт Elastic в раздел Winlogbeat (https://www.elastic.co/downloads/beats/winlogbeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Winlogbeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-7.12.1-windows-x86_64.zip).
2. Будем устанавливать Winlogbeat в каталог «C:\winlogbeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\winlogbeat».
3. На этом этапе следовало бы сконфигурировать файл «C:\winlogbeat\winlogbeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.
4. Для создания службы нужно выполнить следующий скрипт с правами администратора:
powershell "C:\winlogbeat\install-service-winlogbeat.ps1"
5. Для запуска Winlogbeat без службы можно использовать команды:
cd C:\winlogbeat\
winlogbeat -e -c winlogbeat.yml
4. Настройка на примере нескольких кейсов
Рассмотрим несколько ситуаций, в разрезе разных форматов собираемых данных и в разрезе разных Beat’ов, которые будут собирать данные. Посмотрим, как выглядят данные, которые нам предстоит собрать. Какой Beat их будет собирать. Как следует настроить Beat в каждой ситуации. И как следует настроить Logstash в каждой ситуации.
4.1. Сценарий сбора данных в формате json
4.1.1. Исходные данные
Файл (файлы) логов содержит сообщения в виде списка json объектов. Формат самого файла текстовый (не json), то есть квадратные скобки в начале и в конце файла не нужны.
Файл дописывается новыми сообщениями по мере необходимости.
Пример:
«C:\Temp\test_log_json2\2022-03-15.log»
{
"logId": "ID_1",
"uniqueSubId": "001",
"mess": "test_text"
}
{
"logId": "ID_2", "uniqueSubId": "002", "mess": "test_text2"
}
{
"logId": "ID_3",
"uniqueSubId": "003",
"mess": "test_text3"
}
Зачем нужно добавлять пустые строки после каждого json-объекта я опишу далее в разделе «Актуальные проблемы».
4.1.2. Процесс сбора данных через Filebeat
Служба Filebeat будет периодически проверять файл на наличие изменений. При обнаружении изменений эти изменения будут отправлены в Topic Kafka. На выходе в начало сообщения добавим метку времени отправки данных с сервера. Во входящее сообщение добавим тэг «test-json», а на выходе будем отправлять в Topic Kafka только те сообщения, которые содержат этот тэг. Так мы промаркируем сообщения. Маркировка сообщений становится актуальна, когда входящих и исходящих потоков несколько. Я добавил маркировку сообщения в этом кейсе для полноты конфигурации.
Отдельного внимания заслуживает параметр «codec.format». Если его не указывать, то, по умолчанию, отправляемое сообщение будет в формате json и будет содержать целый набор полей, которые содержат в основном техническую (справочную) информацию (имя и версию ОС, имя и версию Beat’а и т.д.). Меня же интересует только основное сообщение, которое содержится в поле «message», остальные данные я буду считать объективно известными, за счет того, что все потоки данных разделены на отдельные Topic’и Kafka.
Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «@timestamp», «@metadata», «ecs», «agent», «host», «service», «input», «fileset».
Пример конфигурации Filebeat:
«C:\filebeat\filebeat.yml»
filebeat.inputs:
- type: log
enabled: true
paths:
- C:\Temp\test_log_json2\*
multiline.pattern: '^{'
multiline.negate: true
multiline.match: after
processors:
- decode_json_fields:
fields: ["message"]
target: "json"
tags: ["test-json"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "server-windows01-test-json"
when.contains:
tags: "test-json"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
4.1.3. Процесс обработки данных в Logstash
Демон Logstash при обнаружении новых данных в Topic’е Kafka начинает их прием и обработку. Данные внутри Logstash поступают сначала в модуль input, потом поступают в модуль filter и затем поступают в модуль output.
В модуле input примем данные из Topic’а Kafka и добавим в сообщение поле «type», которое будет содержать название Topic’а.
Пример конфигурации Logstash модуля input:
«/opt/logstash/config/conf.d/server-windows01-input.conf»
input {
kafka {
type => "server-windows01-test-json"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-test-json"
}
}
В модуле filter обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Декодируем основное сообщение из формата json, при этом все заголовки ключей станут одноименными полями, а значения ключей перейдут в значения полей. Добавим поле «timestamp_logstash», для фиксирования времени, в которое проводилась обработка сообщения.
Пример конфигурации Logstash модуля filter:
«/opt/logstash/config/conf.d/server-windows01-filter.conf»
filter {
if [type] == "server-windows01-test-json" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
json {
source => "message"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
В модуле output обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Каждое сообщение отправляется в OpenSearch в индекс, соответствующий текущей дате, имени Topic’а и префикса «Kafka».
Пример конфигурации Logstash модуля output:
«/opt/logstash/config/conf.d/server-windows01-output.conf»
output {
if [type] == "server-windows01-test-json" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
4.2. Сценарий сбора однострочных данных в текстовом формате
4.2.1. Исходные данные
Каждая строка файла будет извлекаться как новое сообщение и всё содержимое строки будет помещено в поле «message» как текст. В Filebeat такой сценарий работает по умолчанию. Нам остается только промаркировать сообщения и отправить их в нужный Topic Kafka.
Если содержание всех строк стандартизировано, то в Logstash на этапе Filter можно разложить сообщение на заранее известные поля. В этом примере мы этого делать не будем.
Пример:
«C:\Temp\log\2022-03-16.log»
[2022.03.16 13:01:09] Info | Just text
Next text
[2022.03.16 13:02:01] Info | New text
4.2.2. Процесс сбора данных через Filebeat
Процесс сбора данных аналогичен ситуации сбора данных в формате json за исключением того, что нам не нужно парсить json, так же изменим тэг для маркировки входящих сообщений, и изменим Topic Kafka.
Пример конфигурации Filebeat:
«C:\filebeat\filebeat.yml»
filebeat.inputs:
- type: filestream
paths:
- C:\Temp\log\*.log
tags: ["simple-logs"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "server-windows01-simple-logs"
when.contains:
tags: "simple-logs"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
4.2.3. Процесс обработки данных в Logstash
Процесс обработки данных в Logstash будет незначительно отличаться от сценария для ситуации с json-объектами, поэтому я приведу только примеры конфигураций.
Пример конфигурации Logstash модуля input:
«/opt/logstash/config/conf.d/server-windows01-input.conf»
input {
kafka {
type => "server-windows01-simple-logs"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-simple-logs"
}
}
Пример конфигурации Logstash модуля filter:
«/opt/logstash/config/conf.d/server-windows01-filter.conf»
filter {
if [type] == "server-windows01-simple-logs" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output:
«/opt/logstash/config/conf.d/server-windows01-output.conf»
output {
if [type] == "server-windows01-simple-logs" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
4.3. Сценарий сбора Логов IIS
Для сбора логов IIS в Filebeat предусмотрен отдельный модуль, который так и называется IIS. В IIS логи делятся на две категории: «ACCESS» и «ERROR». У меня пути к этим логам настроены по умолчанию.
4.3.1. Процесс сбора данных через Filebeat
Для того, чтобы промаркировать эти две категории («ACCESS» и «ERROR») и направить их в разные Topic’и Kafka я использовал параметр «@metadata.pipeline».
Так же стоит обратить внимание на параметры «var.paths», по какой-то причине в путях необходимо использовать обратные слэши.
Пример конфигурации Filebeat:
«C:\filebeat\filebeat.yml»
filebeat.inputs:
filebeat.modules:
- module: iis
access:
enabled: true
var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
error:
enabled: true
var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "rgmtpaydox39-iis-access"
when.contains:
"@metadata.pipeline": "iis-access"
- topic: "rgmtpaydox39-iis-error"
when.contains:
"@metadata.pipeline": "iis-error"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
4.3.2. Процесс обработки данных в Logstash
Пример конфигурации Logstash модуля input:
«/opt/logstash/config/conf.d/server-windows01-input.conf»
input {
kafka {
type => "server-windows01-iis-access"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-iis-access"
}
kafka {
type => "server-windows01-iis-error"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-iis-error"
}
}
Во время обработки «iis-access» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Затем из полученного сообщения снова вырежем метку времени из начала сообщения и добавим её в новое поле «timestamp_windows». Затем преобразуем тип поля «timestamp_windows» из строкового в тип «дата». Это позволит в дальнейшем в OpenSearch-Dashboards создавать шаблоны индексов с фильтрацией по этому полю. Затем добавим еще одно поле «timestamp_logstash».
За счет наличия трех меток времени мы сможем точно отследить, когда событие возникло в windows, когда попало в обработку Filebeat и когда попало в обработку Logstash.
Обработка «iis-error» осуществляется аналогичным образом.
Пример конфигурации Logstash модуля filter:
«/opt/logstash/config/conf.d/server-windows01-filter.conf»
filter {
if [type] == "server-windows01-iis-access" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
grok {
match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
overwrite => [ "message" ]
}
date {
match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
target => "timestamp_windows"
locale => "en"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-iis-error" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
grok {
match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
overwrite => [ "message" ]
}
date {
match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
target => "timestamp_windows"
locale => "en"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output:
«/opt/logstash/config/conf.d/server-windows01-output.conf»
output {
if [type] == "server-windows01-iis-access" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-iis-error" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
4.4. Сценарий сбора системных Логов Windows
4.4.1. Процесс сбора данных через Winlogbeat
Системные логи Windows будем собирать с помощью Winlogbeat. Доступны 3 вида логов: «Application», «Security» и «System».
Winlogbeat имеет особенность в отличие от Filebeat. Подставляемый на выходе параметр «timestamp» будет содержать не время отправки с сервера, а время события (из лога), произошедшего в Windows. Временная метка отправки с сервера будет содержаться в поле «event» в параметре «created» («event.created»). Кроме того, я посчитал еще несколько полей информативными. В итоге на выходе отправляются следующие поля: «@timestamp», «message», «event», «log», «winlog». При этом «@timestamp» расположен в начале сообщения и отделен пробелом. Все остальные поля я разделил символом «|».
Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «winlog», «@timestamp», «@metadata», «ecs», «agent», «host».
Так же как и в Filebeat вы можете убрать параметр «codec.format» и посмотреть полный список полей и их значений, которые попадут в Logstash в формате json.
К каждому виду логов можно добавить параметр «ignore_older», чтобы игнорировать слишком старые логи. Я добавил этот параметр к «Application» со значением «72h», чтобы игнорировать логи старше 72 часов.
Пример конфигурации Winlogbeat:
«C:\winlogbeat\winlogbeat.yml»
winlogbeat.event_logs:
- name: Application
ignore_older: 72h
tags: ["server-windows01-application"]
- name: Security
tags: ["server-windows01-security"]
- name: System
tags: ["server-windows01-system"]
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "server-windows01-application"
when.contains:
tags: "server-windows01-application"
- topic: "server-windows01-security"
when.contains:
tags: "server-windows01-security"
- topic: "server-windows01-system"
when.contains:
tags: "server-windows01-system"
codec.format:
string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
logging.level: info
logging.to_files: true
logging.files:
path: C:\winlogbeat
name: winlogbeat.log
keepfiles: 7
4.4.2. Процесс обработки данных в Logstash
Пример конфигурации Logstash модуля input:
«/opt/logstash/config/conf.d/server-windows01-input.conf»
input {
kafka {
type => "server-windows01-application"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-application"
}
kafka {
type => "server-windows01-security"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-security"
}
kafka {
type => "server-windows01-system"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-system"
}
}
Во время обработки «application» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_windows». Установим, что остальные поля разделены символом «|», в соответствии с конфигурацией Winlogbeat. Символ «|» необходимо экранировать символом «\».
Названия полей обозначим с таким же именем и в таком же порядке, как и в конфигурации Winlogbeat. Зная, что поле «event» находится в формате json распарсим его как json и поместим в то же поле. С полями «log» и «winlog» поступим аналогично. Так как время обработки сообщения в Winlogbeat находится в поле «event» в параметре «created», то создадим поле «timestamp_winlogbeat» и поместим туда значение из «event.created». После этого создадим поле «timestamp_logstash» с меткой времени обработки сообщения в Logstash.
Сообщения «security» и «system» обрабатываются аналогичным образом.
Пример конфигурации Logstash модуля filter:
«/opt/logstash/config/conf.d/server-windows01-filter.conf»
filter {
if [type] == "server-windows01-application" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-security" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-system" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output:
«/opt/logstash/config/conf.d/server-windows01-output.conf»
output {
if [type] == "server-windows01-application" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-security" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-system" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
4.5. Сценарий сбора системных Логов Linux
4.5.1. Процесс сбора данных через Filebeat
Системные логи Linux будем собирать с помощью Filebeat. Конфигурирование Filebeat в Linux практически идентично конфигурированию Filebeat в Windows. За счет того, что при установке Filebeat на Linux мы включили модуль system, можно настроить только вывод данных, сменив его с elasticsearch на kafka, остальные настройки можно оставить по умолчанию и данные пойдут в формате json.
Поскольку системные логи Linux хранятся в одном файле «/var/log/syslog» (архивные не берем в расчет) и формат логов однострочный, то есть каждое сообщение хранится в отдельной строке, то в этой ситуации можно так же применить «Сценарий сбора однострочных данных в текстовом формате». Именно этот вариант я и представлю.
Пример конфигурации Filebeat:
«/etc/filebeat/filebeat.yml»
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/syslog
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
output.kafka:
hosts: ["10.0.0.60:9092"]
topic: "server-elk01-syslog"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat.log
keepfiles: 7
4.5.2. Процесс обработки данных в Logstash
Пример конфигурации Logstash модуля input:
«/opt/logstash/config/conf.d/server-elk01-input.conf»
input {
kafka {
type => "server-elk01-syslog"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-elk01-syslog"
}
}
Пример конфигурации Logstash модуля filter:
«/opt/logstash/config/conf.d/server-elk01-filter.conf»
filter {
if [type] == "server-elk01-syslog" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output:
«/opt/logstash/config/conf.d/server-elk01-output.conf»
output {
if [type] == "server-elk01-syslog" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
5. Итоговые файлы конфигураций
С учетом всех сценариев у меня получились следующие файлы конфигураций.
Пример конфигурации Filebeat на Linux:
«/etc/filebeat/filebeat.yml»
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/syslog
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
output.kafka:
hosts: ["10.0.0.60:9092"]
topic: "server-elk01-syslog"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat.log
keepfiles: 7
Пример конфигурации Filebeat на Windows:
«C:\filebeat\filebeat.yml»
filebeat.inputs:
- type: filestream
paths:
- C:\Temp\log\*.log
tags: ["simple-logs"]
- type: log
enabled: true
paths:
- C:\Temp\test_log_json2\*
multiline.pattern: '^{'
multiline.negate: true
multiline.match: after
processors:
- decode_json_fields:
fields: ["message"]
target: "json"
tags: ["test-json"]
filebeat.modules:
- module: iis
access:
enabled: true
var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
error:
enabled: true
var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
setup.kibana:
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "server-windows01-simple-logs"
when.contains:
tags: "simple-logs"
- topic: "server-windows01-test-json"
when.contains:
tags: "test-json"
- topic: "server-windows01-iis-access"
when.contains:
"@metadata.pipeline": "iis-access"
- topic: "server-windows01-iis-error"
when.contains:
"@metadata.pipeline": "iis-error"
codec.format:
string: '%{[@timestamp]} %{[message]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
close_inactive: 10m
Пример конфигурации Winlogbeat:
«C:\winlogbeat\winlogbeat.yml»
winlogbeat.event_logs:
- name: Application
ignore_older: 72h
tags: ["server-windows01-application"]
- name: Security
tags: ["server-windows01-security"]
- name: System
tags: ["server-windows01-system"]
output.kafka:
hosts: ["10.0.0.60:9092"]
topics:
- topic: "server-windows01-application"
when.contains:
tags: "server-windows01-application"
- topic: "server-windows01-security"
when.contains:
tags: "server-windows01-security"
- topic: "server-windows01-system"
when.contains:
tags: "server-windows01-system"
codec.format:
string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
logging.level: info
logging.to_files: true
logging.files:
path: C:\winlogbeat
name: winlogbeat.log
keepfiles: 7
Пример конфигурации Logstash модуля input для сервера «server-elk01»:
«/opt/logstash/config/conf.d/server-elk01-input.conf»
input {
kafka {
type => "server-elk01-syslog"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-elk01-syslog"
}
}
Пример конфигурации Logstash модуля filter для сервера «server-elk01»:
«/opt/logstash/config/conf.d/server-elk01-filter.conf»
filter {
if [type] == "server-elk01-syslog" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output для сервера «server-elk01»:
«/opt/logstash/config/conf.d/server-elk01-output.conf»
output {
if [type] == "server-elk01-syslog" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
Пример конфигурации Logstash модуля input для сервера «server-windows01»:
«/opt/logstash/config/conf.d/server-windows01-input.conf»
input {
kafka {
type => "server-windows01-simple-logs"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-simple-logs"
}
kafka {
type => "server-windows01-test-json"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-test-json"
}
kafka {
type => "server-windows01-application"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-application"
}
kafka {
type => "server-windows01-security"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-security"
}
kafka {
type => "server-windows01-system"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-system"
}
kafka {
type => "server-windows01-iis-access"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-iis-access"
}
kafka {
type => "server-windows01-iis-error"
bootstrap_servers => "10.0.0.60:9092"
topics => "server-windows01-iis-error"
}
}
Пример конфигурации Logstash модуля filter для сервера «server-windows01»:
«/opt/logstash/config/conf.d/server-windows01-filter.conf»
filter {
if [type] == "server-windows01-simple-logs" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-test-json" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
json {
source => "message"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-application" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-security" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-system" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
overwrite => [ "message" ]
}
json {
source => "event"
target => "event"
}
json {
source => "log"
target => "log"
}
json {
source => "winlog"
target => "winlog"
}
mutate {
add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-iis-access" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
grok {
match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
overwrite => [ "message" ]
}
date {
match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
target => "timestamp_windows"
locale => "en"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
if [type] == "server-windows01-iis-error" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
overwrite => [ "message" ]
}
grok {
match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
overwrite => [ "message" ]
}
date {
match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
target => "timestamp_windows"
locale => "en"
}
mutate {
add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
}
}
}
Пример конфигурации Logstash модуля output для сервера «server-windows01»:
«/opt/logstash/config/conf.d/server-windows01-output.conf»
output {
if [type] == "server-windows01-simple-logs" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-test-json" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-application" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-security" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-system" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-iis-access" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
if [type] == "server-windows01-iis-error" {
opensearch {
hosts => "https://localhost:9200"
user => "admin"
password => "yN-3L(GMmAAw"
index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
ssl_certificate_verification => false
}
}
}
6. Эксплуатация
Следующие шаги я разберу на примере сбора данных в формате json.
6.1. Создание шаблона индексов с помощью OpenSearch-Dashboards
Шаблон индексов позволяет просматривать сообщения одного или нескольких индексов сортируя их по дате. В качестве даты может выступать любое поле в формате «дата». Для того чтобы можно было осуществлять поиск с помощью OpenSearch-Dashboards необходимо создать шаблон индексов. Для этого перейдем в раздел «Stack Management», далее в раздел «Index Pattern» и нажмем кнопку «Create index pattern» (Рис.2).
Рис.2
Создадим шаблон индексов для данных из Topic’а Kafka без учета даты. Для этого введем в строке имени шаблона название индекса, но в конце, вместо даты введем символ «*» (Рис.3).
Рис.3
Параметром сортировки данных в шаблоне выберем «@timestamp», он совпадает с «timestamp_logstash» (Рис.4).
Рис.4
6.2. Отображение данных в OpenSearch-Dashboards
При переходе в раздел «Discover» в соответствующий шаблон индексов там отобразятся доступные сообщения. Можно раскрыть подробное содержание сообщения и увидеть, что все заголовки ключей json-сообщения стали одноименными полями, а значения ключей перешли в значения полей. Так же осталось поле «message», которое содержит изначальное json-сообщение (Рис.5). На этапе фильтрации сообщения в Logstash это поле («message») можно удалять, если в нем нет необходимости.
Если в json-сообщении содержится поле «message», то оригинальное поле «message», которое содержит изначальное json-сообщение, будет перезаписано на поле из json-сообщения.
Рис.5
6.3. Поиск в OpenSearch-Dashboards
Я рассмотрю совсем немного вариантов поиска, исключительно с целью обзора. Поиск будет только в формате «DQL» (формат по умолчанию).
Поиск осуществляется в разделе «Discover» в соответствующем шаблоне индексов.
1. Для поиска по всем доступным сообщениям во всех полях достаточно ввести нужную фразу в строке поиска (Рис.6).
Рис.6
2. Для поиска фраз по определенным полям в поисковой строке указывается имя нужного поля, затем, в качестве разделителя, идет символ двоеточия «:» и затем искомая фраза. Искомая фраза может содержать специальные символы «*», «?» и т.д. для формирования шаблона (Рис.7).
Рис.7
3. В поисковом запросе можно применять логические операции «or», «and», «not» (Рис.8).
Рис.8
6.4. Работа с OpenSearch через консоль (без OpenSearch-Dashboards)
Я приведу не большой список запросов для того, чтобы показать, как выглядит работа из консоли.
Запросить список индексов:
curl 'https://10.0.0.70:9200/_cat/indices?v&pretty' -u 'admin:yN-3L(GMmAAw' –insecure
Запросить структуру индекса:
curl 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_mapping?pretty' -u 'admin:yN-3L(GMmAAw' --insecure
Удалить индекс:
curl -XDELETE 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30?pretty' -u 'admin:yN-3L(GMmAAw' –insecure
Вывести первые 10 сообщений из индекса (10 по умолчанию):
curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty' -u 'admin:yN-3L(GMmAAw' --insecure
Вывести первые 20 сообщений из индекса:
curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure
Вывести первые 20 сообщений из шаблона индексов:
curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-*/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure
Осуществить поиск в индексе по запросу «*text*» через параметр «q»:
curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?q=*text*&pretty' -u 'admin:yN-3L(GMmAAw' --insecure
Для поиска по составным запросам существуют несколько типов поиска:
term — точное совпадение искомой строки со строкой в индексе или термом;
match — все слова должны входить в строку, в любом порядке;
match_phrase — вся фраза должна входить в строку;
query_string — все слова входят в строку в любом порядке, можно искать по нескольким полям, используя регулярные выражения.
Осуществим поиск в индексе по полю «mess», искать будем точное совпадение фразы «test_text»:
curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty ' -u 'admin:yN-3L(GMmAAw' --insecure -H 'Content-Type: application/json' -d '{"query" : {"term" : {"mess" : "test_text"}}}'
7. Актуальные проблемы
В этом разделе я опишу проблемы, с которыми я встретился.
7.1. Имена Topic’ов Kafka не должны содержать большие буквы
Logstash не может читать Topic’и Kafka, если они (Topic’и) содержат большие буквы. Поэтому имена Topic’ов Kafka не должны содержать большие буквы.
7.2. Filebeat некорректно создает Topic’и Kafka
При старте как Filebeat, так и Logstash создают Topic в Kafka, если в конфигурации соответствующего сервиса Topic имеется, а в Kafka он еще отсутствует. Но если Topic создает Filebeat, то Logstash не может читать этот топик. Если же Topic создает Logstash, то все сервисы могут читать этот Topic. Поэтому при создании/изменении конфигурации обеих служб сначала необходимо запустить Logstash и дождаться пока Topic в Kafka будет создан. И только после этого можно запускать Filebeat.
7.3. Требуется переход на следующую строку после json сообщения
При считывании сообщений из логов в формате json, после закрывающей фигурной скобки «}», которая закрывает json, в том случае, когда это последнее сообщение в файле, необходимо добавить переход на следующую строку. Иначе эта фигурная скобка «}» не попадет в обработку json и последнее json-сообщение в файле обрабатывается не корректно (не читается как json).
7.4. Необходимо обращать внимание на кодировку log-файлов
По умолчанию корректно обрабатываются log-файлы только в формате «UTF-8». Поэтому необходимо либо создавать log-файл в формате «UTF-8», либо явно указываться кодировку в Logstash и/или в Filebeat.
Например, я столкнулся с тем, что PowerShell по умолчанию формирует файлы в формате «UTF-16». Пришлось сменить кодировку по умолчанию в скрипте на «UTF-8»:
$PSDefaultParameterValues['*:Encoding'] = 'utf8'
8. Послесловие
Что мне хотелось бы добавить в эту статью, но по какой-то причине этого не произошло:
1. Настройка времени жизни индексов в OpenSearch;
2. Настройка Apache Kafka;
3. Настройка аутентификации Active Directory в OpenSearch-Dashboards по протоколу LDAP;
4. Создание визуализаций в OpenSearch-Dashboards;
5. Создание Dashboard’ов в OpenSearch-Dashboards.
На этом я заканчиваю. Всем спасибо за внимание к статье, надеюсь она окажется для кого-то полезной. И оставляйте ваши рецепты настройки OpenSearch в комментариях, думаю это так же будет интересно.
Winlogbeat – это компонент Elastick Stack, который позволяет собирать журналы событий Windows и отправлять их в Elasticsearch. Я решил вынести процесс в отдельную публикацию, чтобы сконцентрироваться на том, как выполняется Winlogbeat. Это позволит в последующем, например, при подготовке публикации с примера использования Winlogbeat, не загромождать публикацию информацией об установке.
В последующих публикациях я постараюсь привести примеры использования Winlogbeat.
Установка выполняется следующим образом:
1. Загрузите deb/rpm пакет со страницы загрузки:
2. Распакуйте директорию из архива в “C:\Program Files” и переименуйте директорию в “Winlogbeat”.
3. Запустите консоль PowerShell от имени администратора.
4. Запустите скрипт по установке Winlogbeat:
cd "C:\Program Files\Winlogbeat"
.\install-service-winlogbeat.ps1
5. Запустите сервис:
Start-Service winlogbeat
Установка Winlogbeat завершена.
В предыдущей статье мы рассмотрели, как развернуть собственный централизованный сервер сбора лога с различных типов сетевых устройства на базе стека Graylog (
Graylog
+
OpenSearch
+
MongoDB
). В этой статье мы покажем, как настроить отправку журналов событий с серверов Windows (включая события Active Directory) в Graylog.
Настройка сборщика данных и индексов Graylog для устройств Windows
Сначала нужно настроить в Graylog отдельные сборщики данных и потоки для логов, которые будут отправлять хосты Windows Server (чтобы не смешивались события от разных классов устройств). Перейдите в раздел System -> Inputs и добавьте новый сборщик Windows Server Devices типа Beats, который слушает на порту
TCP:5044
.
Затем создайте отдельный индекс для логов журналов событий Windows. На базе нового Input и индекса создайте новый поток для Windows в разделе Streams и запустите его.
Отправка событий Windows в Graylog с помощью Winlogbeat
Для отправки логов из журналов событий EventViewer с хостов Windows на сервер Graylog можно воспользоваться службой сборщика логов Winlogbeat. Winlogbeat это один из свободно распространяемых компонентов стека ELK. Службу Winlogbeat нужно установить на каждом хосте Windows, события с которого вы хотите видеть на сервере Graylog.
- Скачайте архив Winlogbeat со страницы загрузки (https://www.elastic.co/downloads/beats/winlogbeat)
- Распакуйте архив в папку
C:\Program Files\winlogbeat
- Отредактируйте конфигурационный файл winlogbeat.yml
В самом простом случае можно использовать следующую конфигурацию, когда все события из журналов Application, Security и System будут отправлены на указанный сервер Graylog.
Обратите внимание, что в конфигурационном файле winlogbeat используется синтаксис YAML, а это значит нужно быть внимательным с пробелами и отступами.
winlogbeat.event_logs: - name: Application ignore_older: 72h - name: Security - name: System output.logstash: hosts: ["192.168.14.146:5044"]
Можно использовать более гибкие условия фильтрации, чтобы получить только нужны логи. Например, чтобы получить события с определенными уровнями критичности и номерам EventID, используется такой конфиг:
winlogbeat.event_logs: - name: Security event_id: 4627, 4703, 4780-4782 ignore_older: 24h level: critical, error - name: Microsoft-Windows-TerminalServicesRDPClient/Operational event_id: 1102
Примеры типовой универсальной конфигурации winlogbeat.yml для Windows Server можно посмотреть тут.
Сохраните файл winlogbeat.yml и проверьте корректность конфигурации Winlogbeat и доступность сервера сбора логов:
cd "C:\Program Files\winlogbeat"
./winlogbeat test config
./winlogbeat test output
Если все ОК, установите и запустите службу winlogbeat:
.\install-service-winlogbeat.ps1
Start-Service winlogbeat
Перейдите в веб-интерфейсе GrayLog сервера и проверьте, что в соответствующем потоке стали появляться события с ваших серверов Windows.
Сбор и анализ событий с контроллеров домена Active Directory с помощью Graylog
Рассмотрим, как использовать сервер Graylog для поиска и анализа событий Windows на примере контроллеров домена Active Directory.
При наличии множества контроллеров Active Directory администратору бывает сложно найти определенное событие, так как приходится просматривать журналы на каждом DC. Благодаря централизованному серверу Graylog, который хранит события со всех контроллеров домена, нужно событие нужно найти за секунды.
Например, вам нужно найти компьютер, с которого была заблокирована учетная запись пользователя из-за неверного ввода пароля. Для этого откройте строку фильтра Graylog, выберите нужный Stream, или укажите его в запросе (
streams:xxxxxxxxxxxxx
) и выполните следующий запрос:
winlogbeat_event_code:(4740 OR 4625) AND winlogbeat_event_provider:Microsoft\-Windows\-Security\-Auditing
Сервер Graylog быстро нашел нужно событие и в его свойствах видно имя компьютера, с которого была заблокирована учетная записью.
Еще несколько примеров поиска различных событий в Active Directory:
- Event ID 4767 – позволяет определить кто разблокировал пользователя AD
- Event ID 4724 – кто и когда сбросил пароль пользователю домена
- Event ID 4720 – позволяет узнать, кто и когда создал нового пользователя в AD, 4722 – событие включения учетной записи, 4725 – отключение, 4726 – удаление.
- Отслеживание изменения в группах безопасности AD: 4727 (создана новая группа), 4728 (новый пользователь добавлен в группу), 4729 (пользователь удален из группы), 4730 (группа безопасности удалена)
- Event ID 5137 (создана новая групповая политика домена), 5136 (изменена GPO), 5141 (удалена GPO)
- Event ID 4624 — событие успешного входа пользователя в домен (позволяет быстро получить историю входа пользователя в AD)
Важно настроить отправку логов через Winlogbeat со всех контроллеров домена (список активных DC можно получить с помощью команды Get-ADDomainController. Сбор некоторых событий безопасности Active Directory нужно отдельно включить в настройках политик аудита в Default Domain Controller Policy.
Вы можете создать в Graylog сохранённые запросы и dashboardы для быстрого поиска интересующих вас событий. С помощью оповещений можно настроить рассылку алертов о критических событиях в AD.
Централизованное хранилище логов для хостов Windows
Graylog столь же удобно позволяет хранить, искать и анализировать события от других служб Windows Server. Ниже приведены примеры различных сценариев, в которых администратору приходится выполнять поиск по журналам событий Windows.
- Аудит доступа к файлам и папкам на файловом сервере
- Аудит удаления файлов в сетевой папке
- Аудит изменений NTFS разрешений объектов
- Анализ логов RDP подключений
- Обнаружение перебора паролей к RDP серверу
- Определить кто и когда перезагрузил или выключил сервер Windows:
winlogbeat_event_code:1074
- Реагирование на очистку журналов событий Windows (возможная компрометация сервера)
- Оповещение об обнаружении вируса на одном из серверов Windows встроенным антивирусом Windows Defender (Event ID 1006, 1116)
При использовании быстрого и простого сервера Graylog поиск и фильтрация событий в журналах Windows существенно упрощается. На сайте Graylog есть статья, в которой указывается список критических событий безопасности Windows, которые рекомендуется отслеживать.
Централизованное хранилище логов Windows и Active Directory удобно использовать для быстрого расследования и реагирования на инциденты информационной безопасности, анализа работы компонентов, выявления сбоев.

In this guide, we are going to learn how to send Windows logs to Elastic Stack using Winlogbeat and Sysmon. Winlogbeat is an Elastic Beat that is used to collect windows system application, security, system or hardware events. Sysmon (System Monitor) on the other hand is a windows application that is used to monitor and log system activity to the Windows event log. It provides detailed information about process creations, network connections, and changes to file creation time
Table of Contents
Install and Setup ELK Stack
In order to visualize and analyze the events collected by Winlogbeat/Sysmon, you need to have setup your Elastic Stack. See the links below on how to install and setup Elastic Stack on Fedora/CentOS/Ubuntu servers.
Install Elastic Stack
Install Winlogbeat on Windows
In this guide, we are going to use Windows Server 2022 as our Windows system. Therefore, you need to install both Winlogbeat and Sysmon on your Windows system in order to ship events to Elastic stack.
Navigate to Winlogbeat downloads page and download Winlogbeat zip file.
Once the download is done, extract the Winlogbeat zipped file. As of this guide update, the current version ELK and beats is v8.17. Hence, we have winlogbeat-8.17.1-windows-x86_64.zip.
When you extract, you should get a folder, winlogbeat-8.17.1-windows-x86_64.
Move the winlogbeat-8.17.1-windows-x86_64 directory to C:\Program Files and rename it to Winlogbeat. Your directory should look like as in below;
Next, to install Winlogbeat on Windows, you need to execute the install-service-winlogbeat.ps1 installation script.
Hence, open the Powershell as the administrator and change to Winlogbeat directory by executing the command below;
cd C:\'Program Files'\Winlogbeat
Next, run the Winlogbeat installer as shown below;
.\install-service-winlogbeat.ps1
If you get the error, cannot be loaded because the execution of scripts is disabled on this system, as shown below, you need to enable the script execution.
PS C:\Program Files\Winlogbeat> .\install-service-winlogbeat.ps1
File C:\Program Files\Winlogbeat\install-service-winlogbeat.ps1 cannot be loaded because the execution of scripts is disabled on this system. Please see "get-help about_signing" for more details.
At line:1 char:33
+ .\install-service-winlogbeat.ps1 <<<<
+ CategoryInfo : NotSpecified: (:) [], PSSecurityException
+ FullyQualifiedErrorId : RuntimeException
As a result, you need to execute the Winlogbeat script with unrestricted execution policy as shown in the command below;
PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-winlogbeat.ps1
PS C:\Program Files\Winlogbeat> PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-winlogbeat.ps1
Status Name DisplayName
------ ---- -----------
Stopped winlogbeat winlogbeat
Install Sysmon on Windows
Download Sysmon from the downloads page.
Once the download is complete, extract the contents of the zipped file to C:\Program Files directory.
Once the extraction is done, you folder should look like as in below;
Install Sysmon with md5 and sha256 hashing of process created, log loading of modules and monitoring network connections, open a CMD as an administrator and navigate to C:\Program Files\Sysmon and execute the command below;
cd C:\'Program Files'\Sysmon
.\Sysmon64.exe -i -accepteula -h md5,sha256,imphash -l -n
System Monitor v15.15 - System activity monitor
By Mark Russinovich and Thomas Garnier
Copyright (C) 2014-2024 Microsoft Corporation
Using libxml2. libxml2 is Copyright (C) 1998-2012 Daniel Veillard. All Rights Reserved.
Sysinternals - www.sysinternals.com
Sysmon64 installed.
SysmonDrv installed.
Starting SysmonDrv.
SysmonDrv started.
Starting Sysmon64..
Sysmon64 started.
Configuring Winlogbeat
The main configuration file for Winlogbeat is C:\Program Files\Winlogbeat\winlogbeat.yml with the reference config file being C:\Program Files\Winlogbeat\winlogbeat.reference.yml.
To edit this file, you can use Notepad++ as administrator.
By default, Winlogbeat is set to monitor application, security, and system logs, and logs from Sysmon.
# ======================== Winlogbeat specific options =========================
# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
#
# The supported keys are name, id, xml_query, tags, fields, fields_under_root,
# forwarded, ignore_older, level, event_id, provider, and include_xml.
# The xml_query key requires an id and must not be used with the name,
# ignore_older, level, event_id, or provider keys. Please visit the
# documentation for the complete details of each option.
# https://go.es.io/WinlogbeatConfig
winlogbeat.event_logs:
- name: Application
ignore_older: 72h
- name: System
- name: Security
- name: Microsoft-Windows-Sysmon/Operational
- name: Windows PowerShell
event_id: 400, 403, 600, 800
- name: Microsoft-Windows-PowerShell/Operational
event_id: 4103, 4104, 4105, 4106
- name: ForwardedEvents
tags: [forwarded]
# ====================== Elasticsearch template settings =======================
If you need to see more event types, you can execute the command Get-EventLog *
in PowerShell.
Under the general settings, you can optionally setup the name of the Beat and the Tags associated with the events.
...
#================================ General =====================================
# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: winlogbeat
# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
tags: ["windows_systems"]
# Optional fields that you can specify to add additional information to the
# output.
#fields:
# env: staging
...
We have used the default settings. That is a sample config if you want to set the name of the Beat.
Next, setup the Winlogbeat output. In this demo, we are sending the logs directly to Elasticsearch nodes. Elasticsearch 8 requires credential and connects via SSL.
Thus, get the CA cert from the Elasticsearch node;
cat /etc/elasticsearch/certs/http_ca.crt
and store it on the Winlogbeat folder. We have stored it as C:\Program Files\Winlogbeat\http_ca.crt.
Next, update the ES connection details:
# ================================== Outputs ===================================
# Configure what output to use when sending the data collected by the beat.
# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["10.0.1.15:9200"]
# Protocol - either `http` (default) or `https`.
protocol: "https"
ssl.certificate_authorities: ['C:\Program Files\Winlogbeat\http_ca.crt']
# Authentication credentials - either API key or username/password.
#api_key: "id:api_key"
username: "elastic"
password: "password"
# Pipeline to route events to security, sysmon, or powershell pipelines.
pipeline: "winlogbeat-%{[agent.version]}-routing"
If Elasticsearch and Kibana are not running on the same host and you want to use Kibana Winlogbeat dashboards, you can specify Kibana URL. Kibana must be reachable on non-loopback address. For example;
...
setup.kibana:
# Kibana Host
# Scheme and port can be left out and will be set to the default (http and 5601)
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
#host: "localhost:5601"
host: "192.168.43.104:5601"
...
Also, if you are using Logstash, you can comment out the Elasticsearch output and specify Logstash connection addresses.
...
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
# Array of hosts to connect to.
#hosts: ["localhost:9200"]
#hosts: ["192.168.43.104:9200", "192.168.43.105:9200", "192.168.43.106:9200"]
# Optional protocol and basic auth credentials.
#protocol: "https"
#username: "elastic"
#password: "changeme"
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
#hosts: ["localhost:5044"]
hosts: ["192.168.43.104:5044"]
...
Winlogbeat configuration checks
Once done with configuration, save the file and run the configuration checks. To run the configuration checks, open Powershell as an administrator and execute the command below;
cd C:\'Program Files'\Winlogbeat
.\winlogbeat.exe test config -c .\winlogbeat.yml -e
If there is no error with the configuration, you should see the Config Ok.
{"log.level":"info","@timestamp":"2025-02-03T08:29:57.323Z","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":1062},"message":"Home path: [C:\\Program Files\\Winlogbeat] Config path: [C:\\Program Files\\Winlogbeat] Data path: [C:\\Program Files\\Winlogbeat\\data] Logs path: [C:\\Program Files\\Winlogbeat\\logs]","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:57.362Z","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":1070},"message":"Beat ID: 77e5fe93-2798-4218-bbd0-9a0fb4e3b63a","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.320Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).createBeater","file.name":"instance/beat.go","file.line":570},"message":"Setup Beat: winlogbeat; Version: 8.17.1","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.345Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1623},"message":"Beat info","service.name":"winlogbeat","system_info":{"beat":{"path":{"config":"C:\\Program Files\\Winlogbeat","data":"C:\\Program Files\\Winlogbeat\\data","home":"C:\\Program Files\\Winlogbeat","logs":"C:\\Program Files\\Winlogbeat\\logs"},"type":"winlogbeat","uuid":"77e5fe93-2798-4218-bbd0-9a0fb4e3b63a"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.347Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1632},"message":"Build info","service.name":"winlogbeat","system_info":{"build":{"commit":"424070e87d831d2d66a7514e1c1120ad540a86db","libbeat":"8.17.1","time":"2025-01-10T18:12:41.000Z","version":"8.17.1"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.347Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1635},"message":"Go runtime info","service.name":"winlogbeat","system_info":{"go":{"os":"windows","arch":"amd64","max_procs":1,"version":"go1.22.10"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.350Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1641},"message":"Host info","service.name":"winlogbeat","system_info":{"host":{"architecture":"x86_64","native_architecture":"x86_64","boot_time":"2025-02-02T20:15:56Z","name":"MARARIVER","ip":["fe80::9bed:c7f4:5713:2aad","10.0.1.12","::1","127.0.0.1"],"kernel_version":"10.0.20348.2694 (WinBuild.160101.0800)","mac":["0e:14:41:e7:ff:51"],"os":{"type":"windows","family":"windows","platform":"windows","name":"Windows Server 2022 Datacenter","version":"10.0","major":10,"minor":0,"patch":0,"build":"20348.2700"},"timezone":"UTC","timezone_offset_sec":0,"id":"de78e0b1-2fb5-4213-aef1-ae8f79794514"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.353Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1670},"message":"Process info","service.name":"winlogbeat","system_info":{"process":{"cwd":"C:\\Program Files\\Winlogbeat","exe":"C:\\Program Files\\Winlogbeat\\winlogbeat.exe","name":"winlogbeat.exe","pid":7400,"ppid":6576,"start_time":"2025-02-03T08:29:53.540Z"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.378Z","log.logger":"esclientleg","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/esleg/eslegclient.NewConnection","file.name":"eslegclient/connection.go","file.line":132},"message":"elasticsearch url: http://localhost:9200","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.383Z","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings","file.name":"pipeline/module.go","file.line":105},"message":"Beat name: MARARIVER","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.389Z","log.logger":"winlogbeat","log.origin":{"function":"github.com/elastic/beats/v7/winlogbeat/beater.New","file.name":"beater/winlogbeat.go","file.line":70},"message":"State will be read from and persisted to C:\\Program Files\\Winlogbeat\\data\\.winlogbeat.yml","service.name":"winlogbeat","ecs.version":"1.6.0"}
Config OK
Load Elasticsearch templates
If Winlogbeat has a direction connection and is using Elasticearch as the output, it will automatically load the template. However, if you are using Logstash as the output, you need to manually load the Elasticsearch template. See example command to load the Elasticsearch template manually below;
cd C:\'Program Files'\Winlogbeat
.\winlogbeat.exe setup --index-management -E output.logstash.enabled=false -E 'output.elasticsearch.hosts=["192.168.43.104:9200"]'
Setup Kibana Dashboards
To load Winlogbeat default visualization dashboards, you need to have created the index pattern. Hence, navigate to Kibana and create the Winlogbeat index pattern.
Otherwise, if you are using Elasticsearch as your output, you can load the dashboards by running the setup command or enabling dashboard loading in the winlogbeat.yml (setup.dashboards.enabled: true) configuration. This will automatically create the index pattern.
cd C:\'Program Files'\Winlogbeat
Replace <KIBANA-IP> accordingly.
.\winlogbeat.exe setup --dashboards -E setup.kibana.host='http://<KIBANA-IP>:5601'
Loading dashboards (Kibana must be running and reachable)
Loaded dashboards
If you are using Logstash as the output, run the command below to load the dashboards.
cd C:\'Program Files'\Winlogbeat>
.\winlogbeat.exe setup -e -E output.logstash.enabled=false -E output.elasticsearch.hosts=['192.168.43.104:9200']
Running Winlogbeat
To start or stop Winlogbeat, navigate to install directory and execute the commands below respectively.
cd C:\'Program Files'\Winlogbeat>
Start-Service winlogbeat
To stop;
Stop-Service winlogbeat
You can also manage the Winlogbeat from system services.
View Winlogbeat Kibana Dashboard
When Winlogbeat runs, you should now have the events and the dashboards.
You can view Winlogbeat dashboards by navigating to Dashboards and select any Winlogbeat dashboard.
There you go. You can also check Winlogbeat events on Kibana SIEM (Security > Explore > Hosts > All Hosts > Your Windows Hosts).
That is all on how to send Windows logs to Elastic Stack using Winlogbeat and Sysmon. You can continue to explore Kibana SIEM and Winlogbeat dashboards to analyze your windows events. Enjoy.
Setup Multi-node Elasticsearch 7.x Cluster on Fedora 30/Fedora 29/CentOS 7
Install and Configure Elastic Auditbeat on Ubuntu 18.04
Install Filebeat on Fedora 30/Fedora 29/CentOS 7
How to Debug Logstash Grok Filters