Winlogbeat установка на windows server

In this article, we will install winlogbeat in Windows Server 2019(10.250.2.224) in order to monitor windows event logs, make necessary configurations to transfer event logs to logstash, and run winlogbeat as a service.

Let’s download the winlogbeat from https://www.elastic.co/downloads/beats/winlogbeat on our server and unzip it to the C drive.

Let’s make a few notes about the files in the Winlogbeat folder.

What is winlogbeat.yml File?

The winlogbeat.yml file is the basic config file and we enter information about which windows events it should monitor and how long it should monitor these logs. In addition, we enter the logstash information that will transfer these logs, into this file.

What is winlogbeat.reference.yml File?

The winlogbeat.reference.yml file provides us with a full configuration example.

What is install-service-winlogbeat.ps1 File?

Thanks to the install-service-winlogbeat.ps1 power shell script file, we run the winlogbeat as a service.

Configure winlogbeat.yml

Now let’s do the configurations in winlogbeat.yml config file. According to this configuration; We specify that we want to monitor Application, System, Security and Microsoft-Windows-Sysmon logs in windows eventlogs. And we state that we will ignore Application logs before 72 hours, and we state that we want to send these logs to port 5043 of logstash running at 10.250.2.222.

Start winlogbeat Service

Now, when we run the install-service-winlogbeat.ps1 power shell script, the winlogbeat will be ready to run as a service. As soon as we run the winlogbeat service, it will start sending logs to logstash.

PS C:\winlogbeat7.6.2windowsx86_64\winlogbeat7.6.2windowsx86_64> .\installservicewinlogbeat.ps1

PS C:\winlogbeat7.6.2windowsx86_64\winlogbeat7.6.2windowsx86_64> StartService winlogbeat

Now Winlogbeat has started to work as a service, we can confirm this from the task manager. However, since we do not configure logstash at the moment, it keeps event logs on itself. When we complete the logstash configuration, logs will be successfully transferred to logstash. In my next article I will be configuring the logstash.

You can read our other articles in this serie from the links below.

Install Elasticsearch on ubuntu server 19.10

Install Logstash on Ubuntu Server 19.10

Install Kibana on Ubuntu Server 19.10

Install Winlogbeat on Windows Server 2019

Configure Logstash to Read log files Windows

Create Kibana Dashboards For Windows Event Logs

image

Передо мной встала задача сбора логов с парка серверов на ОС Windows и ОС Linux. Для того чтобы решить её я воспользовался стэком OpenSearch. Во время настройки OpenSearch мне не хватало в открытых источниках наглядных примеров, а информация на официальных сайтах ElasticSearch и OpenSearch мне показалась обрывочной, слабо привязанной к реальным ситуациям. Поэтому я решил поделиться своим опытом и описать основные моменты установки и некоторые сценарии настройки и применения стэка OpenSearch, которые я применил в своей практике.

1. Введение

1.1. Коротко о том, что такое OpenSearch

OpenSearch – система полнотекстового поиска. Является форком Elasticsearch. Разработана компанией A9.com, дочерней компанией Amazon.com.

Стэк OpenSearch (OpenSearch + Logstash-oss-with-OpenSearch-output-plugin + OpenSearch–Dashboards) является бесплатным аналогом с отрытым исходным кодом стэку ELK (Elasticsearch + Logstash + Kibana). В совокупности с Beat’ами (Filebeat, Winlogbeat, и т.д.) образуют полный цикл управления логами: сбор, систематизация, поиск.

Можно найти и множество других применений этому стэку, но в этой статье речь пойдет главным образом именно о логах.

1.2. Коротко о форках Elasticsearch

Elasticsearch развивается под этим названием с 2010 года. С 2018 года, с версии 6.3 лицензия на ELK меняется, появляется платная и бесплатная версия. Позже Amazon создает форк Elasticsearch и называет его «Open Distro for Elasticsearch». А в 2021 году Amazon создает новый форк и называет его «OpenSearch». Немного позже Российская компания Arenadata становится официальным партнёром OpenSearch в России и выпускает отечественный форк под названием «Arenadata LogSearch (ADLS)» под платной лицензией.

1.3. Что и зачем будем настраивать

Стэк OpenSearch мы будем разворачивать из архивов, это особенно актуально в закрытых средах. Кроме того, мы не будем использовать Kubernetes и Docker, то есть будем производить установку в классической среде.

В качестве ОС под OpenSearch будем использовать Linux Ubuntu 20.04.
Логи будут транспортироваться с серверов, на которых они возникли на сервер OpenSearch в виде сообщений. Для того, чтобы гарантировать доставку сообщений, в качестве транспорта мы будем использовать сервис Apache Kafka.

Apache Kafka – это распределённый программный брокер сообщений, который гарантирует доставку сообщений.

Мы введем OpenSearch в кластер, но будет описан ввод только основной ноды.

Кластер OpenSearch позволяет решать сразу несколько задач. Во-первых, традиционно, кластер решает задачу масштабирования, распределения нагрузки и отказоустойчивости. Во-вторых, данные накопленные в OpenSearch, можно распределять по их значимости на «Горячие» («Hot»), «Теплые» («Warm») и «Холодные» («Cold»). А ноды кластера можно настроить так чтобы они принимали только «Горячие» данные, или «Теплые» данные, или только «Холодные». И если на ноду под «Холодные» данные выделять менее производительные мощности, то это позволит сэкономить ресурсы.

1.4. Настраиваемая схема

На серверы, с которых нужно собирать логи, мы установим Beat’ы. На серверы под управлением ОС Windows мы установим Filebeat и Winlogbeat. На серверы под управлением Linux мы установим только Filebeat. Beat’ы будут отправлять сообщения с логами в Kafk’у. Logstash будет брать эти сообщения из Kafka, обрабатывать их и отправлять в OpenSearch. Пользователь будет осуществлять просмотр и поиск по логам через OpenSearch-Dashboards. Схема взаимодействия представлена на Рис.1.

image

Рис.1

Договоримся, что машина Node OpenSearch будет иметь:
1. Имя «server-elk01»;
2. Ip адрес «10.0.0.70».
Машина с Kafka будет иметь:
1. Ip адрес «10.0.0.60».

2. Установка стэка OpenSearch

2.1. Подготовка Linux машины Node OpenSearch

Перед началом установки основных пакетов установим Java и Unzip, а также создадим пользователей для OpenSearch и Logstash.

1. Установка Java:

apt install openjdk-11-jdk

2. Установка unzip:

apt install unzip

3. Создание пользователя opensearch:

groupadd opensearch
useradd opensearch -g opensearch -M -s /bin/bash
passwd opensearch

4. Создание пользователя logstash:

groupadd logstash
useradd logstash -g logstash -M -s /bin/bash
passwd logstash

2.2. Установка OpenSearch (аналог ElasticSearch)

OpenSearch, как можно было заметить, является ядром всего стэка. OpenSearch содержит базу данных, в которой будут храниться логи. Кроме того, OpenSearch имеет API для обработки запросов как на ввод данных, так и на вывод. Так же OpenSearch индексирует поступившие в неё данные и осуществляет поиск по этим данным.

Приступаем к установке.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch версии 1.2.4 (https://artifacts.opensearch.org/releases/bundle/opensearch/1.2.4/opensearch-1.2.4-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x opensearch-1.2.4-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf opensearch-1.2.4-linux-x64.tar.gz

4. Будем устанавливать OpenSearch в каталог «/opt/opensearch», поэтому создаем рабочий каталог для OpenSearch:

mkdir /opt/opensearch

5. Переносим распакованные данные в рабочий каталог:

mv ./opensearch-1.2.4/* /opt/opensearch

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./opensearch-1.2.4

7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch:

chown -R opensearch:opensearch /opt/opensearch

8. Запускает установочный скрипт от имени пользователя opensearch:

sudo -u opensearch /opt/opensearch/opensearch-tar-install.sh

Дожидаемся сообщения «Node ‘server-elk01’ initialized» и нажимаем «Ctrl+C».

9. Создаем файл демона для работы OpenSearch:

nano /lib/systemd/system/opensearch.service

/lib/systemd/system/opensearch.service


[Unit]
Description=Opensearch
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
RuntimeDirectory=opensearch
PrivateTmp=true

Restart=on-failure
RestartSec=60s

WorkingDirectory=/opt/opensearch

User=opensearch
Group=opensearch

ExecStart=/opt/opensearch/bin/opensearch

StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535

# Specifies the maximum number of processes
LimitNPROC=4096

# Specifies the maximum size of virtual memory
LimitAS=infinity

# Specifies the maximum file size
LimitFSIZE=infinity

# Not use SWAP
LimitMEMLOCK=infinity

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75

[Install]
WantedBy=multi-user.target

За основу файла демона я взял файл оригинального демона Elasticsearch и переработал его.

10. Делаем пользователя root владельцем файла демона:

chown -R root:root /lib/systemd/system/opensearch.service

11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

12. Создаем каталог для логов opensearch:

mkdir /var/log/opensearch

13. Делаем пользователя opensearch владельцем каталога логов:

chown -R opensearch /var/log/opensearch

2.3. Настройки производительности

Для того чтобы получить высокую производительность недостаточно иметь хорошее железо, нужно еще и правильно настроить систему. Попробуем оптимизировать несколько параметров.

1. Настройки Java:
Производятся в файле «/opt/opensearch/config/jvm.options». Необходимо настроить два параметра: «Xmx» и «Xms».

Оба параметра рекомендуется установить в значение равное 50% от имеющейся физической памяти узла. Чем больше доступно памяти, тем лучше, но есть ограничения. Значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «compressed object pointers» (по умолчанию равен 32 Gb). И, значения параметров «Xmx» и «Xms» не должны превышать значение параметра JVM «zero-based compressed oops» (по умолчанию равен 26 Gb).

У меня на машине «server-elk01» установлено 8 Gb оперативной памяти поэтому настройки получились такие:

nano /opt/opensearch/config/jvm.options

/opt/opensearch/config/jvm.options

2. Настройка виртуальной памяти:

nano /etc/sysctl.conf

/etc/sysctl.conf


...
# Добавить в конце файла
vm.max_map_count=262144

Для того, чтобы изменения вступили в силу без перезагрузки хоста можно выполнить следующую команду в консоли:

sysctl -w vm.max_map_count=262144

2.4. Первый запуск OpenSearch

На этом этапе уже все готово к тому, чтобы запустить OpenSearch. Глубокой настройкой займемся позже, а сейчас просто запустим OpenSearch.

1. Запустим настроенный нами демон OpenSearch:

systemctl start opensearch

2. Проверим статус запуска демона OpenSearch:

systemctl status opensearch

3. Настроим автозапуск демона OpenSearch:

systemctl enable opensearch

4. Проверим работоспособность демона OpenSearch:

curl -X GET https://localhost:9200 -u 'admin:admin' --insecure

Если вы устанавливаете OpenSearch в ознакомительных целях, то можете считать установку законченной и приступать к использованию OpenSearch, правда только в консольном режиме, Web-интерфейс (OpenSearch-Dashboards) мы еще не устанавливали.

2.5. Смена пароля учетной записи OpenSearch

По умолчанию в OpenSearch предустановлена админская учетная запись «admin» с паролем «admin», поэтому для повышения уровня безопасности сменим пароль этой учетной записи. Для этого выполним следующие шаги.

1. Остановим демон OpenSearch:

systemctl stop opensearch

2. Дадим права на выполнение скрипта получения хэша пароля:

chmod +x /opt/opensearch/plugins/opensearch-security/tools/hash.sh

3. Запускаем скрипт и вводим новый пароль «yN-3L(GMmAAw»:

/opt/opensearch/plugins/opensearch-security/tools/hash.sh

Запоминаем полученный хэш, например:

$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO

К слову, я указал реальный хэш к описанному паролю, в тестах можно их использовать.

4. Заменим хэш пароля пользователя admin:

nano /opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml

/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml

...
admin:
  hash: "$2y$12$OCvWNlMu8VbOarfdXdcjPOnHarqktJIcTYjwoykXdaJJfjcCTmfXO"
...

5. Дадим права на выполнение скрипта для создания новых сертификатов:

chmod +x /opt/opensearch/plugins/opensearch-security/tools/securityadmin.sh

6. Перейдём в каталог со скриптом:

cd /opt/opensearch/plugins/opensearch-security/tools

7. Запустим демон OpenSearch:

systemctl start opensearch

8. Запускаем скрипт для создания новых сертификатов:

./securityadmin.sh -cd ../securityconfig/ -icl -nhnv \
   -cacert ../../../config/root-ca.pem \
   -cert ../../../config/kirk.pem \
   -key ../../../config/kirk-key.pem

9. Проверяем работоспособность OpenSearch с новым паролем:

curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure

Аналогичным образом можно сменить пароль и у других учетных записей, которые можно обнаружить в «/opt/opensearch/plugins/opensearch-security/securityconfig/internal_users.yml», если вам это необходимо.

2.6. Переводим OpenSearch в режим кластера

Как я писал выше в реализуемой схеме мы введем OpenSearch в кластер, но введем только одну ноду. Потому как задачи перед кластером можно поставить разные то и настройки, и схемы могут сильно отличаться. Если вам потребуется только одна машина с OpenSearch, то можно и не вводить её в кластер, тогда этот шаг можно пропустить. А мы приступим.

1. Остановим демон OpenSearch:

systemctl stop opensearch

2. Добавляем параметры в настройки OpenSearch:

nano /opt/opensearch/config/opensearch.yml

/opt/opensearch/config/opensearch.yml

# ------------------------------------ Node ------------------------------------
# Имя ноды:
node.name: os-node01-server-elk01
# Роли узла:
node.roles: [ master, data ]
#
# ---------------------------------- Network -----------------------------------
# Адрес узла - принимать на любых адресах:
network.host: 0.0.0.0
# Порт:
http.port: 9200
#
# ---------------------------------- Cluster -----------------------------------
# Имя кластера:
cluster.name: os_cluster
# Список узлов в голосовании по выбору master узла:
cluster.initial_master_nodes: ["os-node01"]
#
# --------------------------------- Discovery ----------------------------------
# Список master узлов кластера:
discovery.seed_hosts: ["10.0.0.70"]
#
# ----------------------------------- Paths ------------------------------------
# Директория с данными:
path.data: /opt/opensearch/data
# Директория с логами:
path.logs: /var/log/opensearch
#
######## Start OpenSearch Security Demo Configuration ########
# WARNING: revise all the lines below before you go into production
plugins.security.ssl.transport.pemcert_filepath: esnode.pem
plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
plugins.security.ssl.transport.enforce_hostname_verification: false
plugins.security.ssl.http.enabled: true
plugins.security.ssl.http.pemcert_filepath: esnode.pem
plugins.security.ssl.http.pemkey_filepath: esnode-key.pem
plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem
plugins.security.allow_unsafe_democertificates: true
plugins.security.allow_default_init_securityindex: true
plugins.security.authcz.admin_dn:
  - CN=kirk,OU=client,O=client,L=test, C=de

plugins.security.authcz.type: internal_opensearch
plugins.security.enable_snapshot_restore_privileges: true
plugins.security.check_snapshot_restore_write_privileges: true
plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
plugins.security.system_indices.enabled: true
plugins.security.system_indices.indices: [".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opendistro-notifications-*", ".opendistro-notebooks", ".opendistro-observability", ".opendistro-asynchronous-search-response*", ".opendistro-metadata-store"]
node.max_local_storage_node: 3
######## End OpenSearch Security Demo Configuration ########

Обращаем внимание на роль ноды (master, data). Роль «master» — означает, что нода управляет кластером. Роль «data» — означает, что нода содержит базу данных.

3. Запускаем демон OpenSearch:

systemctl start opensearch

4. Проверим работоспособность OpenSearch:

curl -X GET https://localhost:9200 -u 'admin:yN-3L(GMmAAw' --insecure

5. Проверим состояние кластера:

curl -X GET https://localhost:9200/_cluster/health?pretty -u 'admin:yN-3L(GMmAAw' --insecure

6. Выясним кто мастер в кластере:

curl -X GET https://localhost:9200/_cat/master?pretty -u 'admin:yN-3L(GMmAAw' --insecure

2.7. Установка OpenSearch-Dashboards (аналог Kibana)

OpenSearch-Dashboards является Web-интерфейсом для OpenSearch. Для тех кто собирается работать с OpenSearch из консоли этот шаг можно и пропустить. Так же вместо OpenSearch-Dashboards в качестве Web-интерфейса можно установить и другие утилиты, например, «Grafana».
Приступим к установке OpenSearch-Dashboards.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать OpenSearch-Dashboards версии 1.2.0 (https://artifacts.opensearch.org/releases/bundle/opensearch-dashboards/1.2.0/opensearch-dashboards-1.2.0-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x opensearch-dashboards-1.2.0-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf opensearch-dashboards-1.2.0-linux-x64.tar.gz

4. Будем устанавливать OpenSearch-Dashboards в каталог «/opt/opensearch-dashboards», поэтому создаем рабочий каталог для OpenSearch-Dashboards:

mkdir /opt/opensearch-dashboards

5. Переносим распакованные данные в рабочий каталог:

mv ./opensearch-dashboards-1.2.0-linux-x64/* ./opensearch-dashboards-1.2.0-linux-x64/.* /opt/opensearch-dashboards/

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./opensearch-dashboards-1.2.0-linux-x64

7. Делаем пользователя opensearch владельцем рабочего каталога OpenSearch-Dashboards:

chown -R opensearch:opensearch /opt/opensearch-dashboards

По умолчанию OpenSearch-Dashboards доступен по порту 5601. Можно изменить порт в настройках OpenSearch-Dashboards, однако тогда придется запускать сервис под учетной записью root.

Поэтому оставим порт по умолчанию и позже настроим переадресацию через NGINX.

Для информации, для смены порта по умолчанию необходимо внести следующие изменения:

/opt/opensearch-dashboards/config/opensearch_dashboards.yml

nano /opt/opensearch-dashboards/config/opensearch_dashboards.yml

...
# Добавить в конец файла
server.port: 80
server.host: 10.0.0.70

8. Создаем файл демона для работы OpenSearch-Dashboards:

nano /lib/systemd/system/opensearch_dashboards.service

/lib/systemd/system/opensearch_dashboards.service


[Unit]
Description=Opensearch_dashboards
Documentation=https://opensearch.org/docs/latest
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
RuntimeDirectory=opensearch_dashboards
PrivateTmp=true

WorkingDirectory=/opt/opensearch-dashboards

User=opensearch
Group=opensearch

ExecStart=/opt/opensearch-dashboards/bin/opensearch-dashboards

StandardOutput=journal
StandardError=inherit

# Specifies the maximum file descriptor number that can be opened by this process
LimitNOFILE=65535

# Specifies the maximum number of processes
LimitNPROC=4096

# Specifies the maximum size of virtual memory
LimitAS=infinity

# Specifies the maximum file size
LimitFSIZE=infinity

# Disable timeout logic and wait until process is stopped
TimeoutStopSec=0

# Allow a slow startup before the systemd notifier module kicks in to extend the timeout
TimeoutStartSec=75

[Install]
WantedBy=multi-user.target

9. Делаем пользователя root владельцем файла демона:

chown -R root:root /lib/systemd/system/opensearch_dashboards.service

10. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

11. Запустим настроенный нами демон OpenSearch-Dashboards:

systemctl start opensearch_dashboards

12. Проверим статус запуска демона OpenSearch-Dashboards:

systemctl status opensearch_dashboards

13. Настроим автозапуск демона OpenSearch-Dashboards:

systemctl enable opensearch_dashboards

2.8. Установка NGINX и настройка переадресации порта OpenSearch-Dashboards

OpenSearch-Dashboards работает на порту 5601, но это неудобно, так как портом для Web-ресурсов по умолчанию является порт 80. Мы установим NGINX и настроим его так, чтобы он слушал 80 порт и перенаправлял с него все запросы на порт 5601. То есть NGINX будет работать в режиме прокси. Приступаем.

1. Установим Nginx на хост:

apt install nginx

2. Настроим автозапуск Nginx:

systemctl enable nginx

3. Не будем удалять, а сделаем резервную копию настроек Nginx по умолчанию:

cp /etc/nginx/sites-enabled/default /etc/nginx/sites-enabled/default.sav

4. Настроим переадресацию OpenSearch-Dashboards с порта 5601 на 80:

nano /etc/nginx/sites-enabled/default

/etc/nginx/sites-enabled/default


map $http_upgrade $connection_upgrade { # WebSocket support
        default upgrade;
        '' '';
    }

server {
    listen 80;
    server_name server-elk01 server-elk01.domain.ad 10.0.0.70;

    location / {
         proxy_pass http://127.0.0.1:5601; # full internal address
         proxy_http_version  1.1;
         proxy_set_header    Host $server_name:$server_port;
         proxy_set_header    X-Forwarded-Host $http_host;    # necessary for proper absolute redirects and TeamCity CSRF check
         proxy_set_header    X-Forwarded-Proto $scheme;
         proxy_set_header    X-Forwarded-For $remote_addr;
         proxy_set_header    Upgrade $http_upgrade; # WebSocket support
         proxy_set_header    Connection $connection_upgrade; # WebSocket support
    }
}

5. Активируем новые настройки Nginx без перезагрузки демона:

nginx -s reload

В случае неудачи перезагрузим Nginx:

service nginx reload

2.9. Настройка файрволла на запрет доступа по порту 5601

После того как мы настроили на NGINX переадресацию с 80 порта на порт 5601 сам порт 5601 продолжит принимать запросы. На этом шаге мы закроем порт 5601 файерволлом. Это, конечно, не обязательно, но, я считаю, что так будет больше порядка.

Само правило для файрволла не сложное, а вариантов как сделать так, чтобы после перезагрузки системы правило активировалось вновь можно придумать много. Я пойду по пути создания демона, который будет запускать bash-скрипт «/etc/rc.local». А сам скрипт будет содержать команду добавления правила в файрволл. Приступим.

1. Правило привяжем к сетевому интерфейсу, поэтому сначала необходимо узнать название сетевого интерфейса, для этого воспользуемся командой:

ip link show

или командой:

ifconfig

В моей системе сетевой интерфейс имеет имя «ens160».

2. Создадим Bash-скрипт с правилом файрволла:

nano /etc/rc.local

/etc/rc.local


#!/bin/bash
iptables -A INPUT -i ens160 -p tcp --dport 5601 -j DROP

exit 0

Соответственно имя интерфейса нужно изменить на имя своего интерфейса.

3. Дадим скрипту права на выполнение:

chmod +x /etc/rc.local

4. Делаем пользователя root владельцем скрипта:

chown -R root:root /etc/rc.local

5. Создадим файл демона для запуска скрипта:

nano /etc/systemd/system/rc-local.service

/etc/systemd/system/rc-local.service


[Unit]
 Description=/etc/rc.local Compatibility
 ConditionPathExists=/etc/rc.local

[Service]
 Type=forking
 ExecStart=/etc/rc.local start
 TimeoutSec=0
 StandardOutput=tty
 RemainAfterExit=yes
 SysVStartPriority=99

[Install]
 WantedBy=multi-user.target

6. Зададим права на файл демона:

chmod 644 /etc/systemd/system/rc-local.service

7. Делаем пользователя root владельцем файла демона:

chown -R root:root /etc/systemd/system/rc-local.service

8. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

9. Запустим настроенный нами демон rc-local:

systemctl start rc-local

10. Проверим статус запуска демона rc-local:

systemctl status rc-local

11. Настроим автозапуск демона rc-local:

systemctl enable rc-local

2.10. Установка Logstash-oss-with-OpenSearch-output-plugin

Logstash в реализуемой схеме выполняет несколько задач. Во-первых, он как насос будет постоянно обращаться к Kafka и втягивать все новые сообщения, которые будут накапливаться в Kafka. Во-вторых, получаемые сообщения он будет преобразовывать правилами, которые мы в него позже заложим. И в-третьих, он будет раскладывать преобразованные сообщения по разным индексам в OpenSearch. Приступим к установке.

1. Переходим на официальный сайт OpenSearch (https://opensearch.org/downloads.html) и скачиваем архив tar.gz нужной версии. В этой статье я буду использовать Logstash-oss-with-OpenSearch-output-plugin версии 7.16.2 (https://artifacts.opensearch.org/logstash/logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz). После скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.
2. Даем права на выполнение для архива:

chmod +x logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

3. Распаковываем архив:

tar -xf logstash-oss-with-opensearch-output-plugin-7.16.2-linux-x64.tar.gz

4. Будем устанавливать Logstash-oss-with-OpenSearch-output-plugin в каталог «/opt/logstash», поэтому создаем рабочий каталог для Logstash-oss-with-OpenSearch-output-plugin:

mkdir /opt/logstash

5. Переносим распакованные данные в рабочий каталог:

mv ./logstash-7.16.2/* /opt/logstash/

6. Удаляем каталог, оставшийся от распаковки:

rmdir ./logstash-7.16.2

7. Делаем пользователя logstash владельцем рабочего каталога Logstash-oss-with-OpenSearch-output-plugin:

chown -R logstash:logstash /opt/logstash

8. Создаем файл демона для работы Logstash-oss-with-OpenSearch-output-plugin:

nano /etc/systemd/system/logstash.service

/etc/systemd/system/logstash.service


[Unit]
Description=logstash

[Service]
Type=simple
User=logstash
Group=logstash

ExecStart=/opt/logstash/bin/logstash "--path.settings" "/opt/logstash/config"
Restart=always
WorkingDirectory=/opt/logstash
Nice=19
LimitNOFILE=16384

# When stopping, how long to wait before giving up and sending SIGKILL?
# Keep in mind that SIGKILL on a process can cause data loss.
TimeoutStopSec=75

[Install]
WantedBy=multi-user.target

Отмечу, что в файле демона оригинального Logstash от Elastic отсутствует параметр «TimeoutStopSec=75». И у меня были случаи, когда при остановке демон Logstash зависал, и приходилось искать java-процесс и убивать его. Параметр «TimeoutStopSec=75» принудительно завершит выполнение демона, если через 75 секунд после команды «stop» процесс не завершится.

9. Зададим права на файл демона:

chmod 644 /etc/systemd/system/logstash.service

10. Делаем пользователя root владельцем файла демона:

chown -R root:root /etc/systemd/system/logstash.service

11. Перечитаем конфигурацию systemd, чтобы система обнаружила вновь созданный демон:

systemctl daemon-reload

12. Зададим путь к базе данных и логам Logstash-oss-with-OpenSearch-output-plugin:

nano /opt/logstash/config/logstash.yml

/opt/logstash/config/logstash.yml

path.data: /opt/logstash/data
pipeline.ordered: auto
path.logs: /var/log/logstash

13. Зададим путь для чтения файлов pipelines:

nano /opt/logstash/config/pipelines.yml

/opt/logstash/config/pipelines.yml

- pipeline.id: man
  path.config: "/opt/logstash/config/conf.d/*.conf"

14. Создадим каталог для файлов pipelines:

mkdir /opt/logstash/config/conf.d

15. Создадим каталог для логов:

mkdir /var/log/logstash

16. Делаем пользователя logstash владельцем каталога логов:

chown -R logstash /var/log/logstash

17. На этом этапе следовало бы сконфигурировать файлы pipelines для обработки данных поступающих из kafka и уходящих в OpenSearch, но я опишу это позже в нескольких кейсах и предоставлю итоговые файлы конфигураций.
18. После окончания настройки еще раз делаем пользователя logstash владельцем всех файлов в рабочем каталоге Logstash-oss-with-OpenSearch-output-plugin:

chown -R logstash:logstash /opt/logstash

19. Запустим настроенный нами демон Logstash-oss-with-OpenSearch-output-plugin:

systemctl start logstash

20. Проверим статус запуска демона Logstash-oss-with-OpenSearch-output-plugin:

systemctl status logstash

21. Настроим автозапуск демона Logstash-oss-with-OpenSearch-output-plugin:

systemctl enable logstash

3. Установка Beat’ов

Beat’ы это своего рода агенты, которые устанавливаются на каждом сервер. Они следят за определенными объектами, в нашем случае за файлами с логами, и когда заметят, что в файле произошли изменения они (Beat’ы) сформируют сообщение, которое будет содержать эти изменения, и отправят их (сообщения), в нашем случае, в Kafka.

Сразу стоит отметить, что Elastic выпускает две версии beat’ов: (условно) «платные» (Filebeat, Winlogbeat и т.д.) и (условно) «бесплатные» (Filebeat-OSS-only, Winlogbeat-OSS-only и т.д.). На сколько я понял «платные» работают так же, как и «бесплатные», за исключением того, что в «платных» можно подключить платный функционал «X-Pack». То есть «платные» обладают расширенными возможностями, которые можно и не покупать, и при этом не обладают временным ограничением действия. Поэтому далее я буду использовать Filebeat и Winlogbeat, но без платного функционала «X-Pack».

3.1. Установка Filebeat на Linux

1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив deb нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-amd64.deb).
Я буду настраивать Filebeat на той же машине, где развернут OpenSearch, поэтому после скачивания перенесите архив на сервер «server-elk01» в удобный для вас каталог и перейдите в него в консоли.

2. Даем права на выполнение для архива:

chmod +x ./filebeat-7.12.1-amd64.deb

3. Установим Filebeat:

dpkg -i ./filebeat-7.12.1-amd64.deb

4. На этом этапе следовало бы сконфигурировать файл «/etc/filebeat/filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

5. Подключаем модуль system в filebeat:

filebeat modules enable system

6. Подключаем модуль kafka в filebeat:

filebeat modules enable kafka

7. Проверим список подключенных модулей:

filebeat modules list

8. Запустим демон Filebeat:

systemctl start filebeat

9. Проверим статус запуска демона Filebeat:

systemctl status filebeat

10. Настроим автозапуск демона Filebeat:

systemctl enable filebeat

3.2. Установка Filebeat на Windows

1. Переходим на официальный сайт Elastic в раздел Filebeat (https://www.elastic.co/downloads/beats/filebeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Filebeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-windows-x86_64.zip).

2. Будем устанавливать Filebeat в каталог «C:\filebeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\filebeat».

3. На этом этапе следовало бы сконфигурировать файл «C:\filebeat\filebeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

4. Для создания службы нужно выполнить следующий скрипт с правами администратора:

powershell "C:\filebeat\install-service-filebeat.ps1"

5. Для запуска Filebeat без службы можно использовать команды:

cd C:\filebeat\
filebeat -e -c filebeat.yml

3.3. Установка Winlogbeat на Windows

1. Переходим на официальный сайт Elastic в раздел Winlogbeat (https://www.elastic.co/downloads/beats/winlogbeat) и скачиваем архив zip нужной версии. В этой статье я буду использовать Winlogbeat версии 7.12.1 (https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-7.12.1-windows-x86_64.zip).

2. Будем устанавливать Winlogbeat в каталог «C:\winlogbeat», поэтому после скачивания перенесите архив на сервер «server-windows01» и распакуйте его в каталог «C:\winlogbeat».

3. На этом этапе следовало бы сконфигурировать файл «C:\winlogbeat\winlogbeat.yml», но я опишу это позже в нескольких кейсах и предоставлю итоговый файл конфигурации.

4. Для создания службы нужно выполнить следующий скрипт с правами администратора:

powershell "C:\winlogbeat\install-service-winlogbeat.ps1"

5. Для запуска Winlogbeat без службы можно использовать команды:

cd C:\winlogbeat\
winlogbeat -e -c winlogbeat.yml

4. Настройка на примере нескольких кейсов

Рассмотрим несколько ситуаций, в разрезе разных форматов собираемых данных и в разрезе разных Beat’ов, которые будут собирать данные. Посмотрим, как выглядят данные, которые нам предстоит собрать. Какой Beat их будет собирать. Как следует настроить Beat в каждой ситуации. И как следует настроить Logstash в каждой ситуации.

4.1. Сценарий сбора данных в формате json

4.1.1. Исходные данные

Файл (файлы) логов содержит сообщения в виде списка json объектов. Формат самого файла текстовый (не json), то есть квадратные скобки в начале и в конце файла не нужны.
Файл дописывается новыми сообщениями по мере необходимости.

Пример:

«C:\Temp\test_log_json2\2022-03-15.log»

{
    "logId":  "ID_1",
    "uniqueSubId":  "001",
    "mess":  "test_text"
}

{
    "logId":  "ID_2", "uniqueSubId":  "002", "mess":  "test_text2"
}

{
    "logId":  "ID_3",
    "uniqueSubId":  "003",
    "mess":  "test_text3"
}

Зачем нужно добавлять пустые строки после каждого json-объекта я опишу далее в разделе «Актуальные проблемы».

4.1.2. Процесс сбора данных через Filebeat

Служба Filebeat будет периодически проверять файл на наличие изменений. При обнаружении изменений эти изменения будут отправлены в Topic Kafka. На выходе в начало сообщения добавим метку времени отправки данных с сервера. Во входящее сообщение добавим тэг «test-json», а на выходе будем отправлять в Topic Kafka только те сообщения, которые содержат этот тэг. Так мы промаркируем сообщения. Маркировка сообщений становится актуальна, когда входящих и исходящих потоков несколько. Я добавил маркировку сообщения в этом кейсе для полноты конфигурации.

Отдельного внимания заслуживает параметр «codec.format». Если его не указывать, то, по умолчанию, отправляемое сообщение будет в формате json и будет содержать целый набор полей, которые содержат в основном техническую (справочную) информацию (имя и версию ОС, имя и версию Beat’а и т.д.). Меня же интересует только основное сообщение, которое содержится в поле «message», остальные данные я буду считать объективно известными, за счет того, что все потоки данных разделены на отдельные Topic’и Kafka.

Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «@timestamp», «@metadata», «ecs», «agent», «host», «service», «input», «fileset».

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - C:\Temp\test_log_json2\*
  multiline.pattern: '^{'
  multiline.negate: true 
  multiline.match: after
  processors:
  - decode_json_fields:
      fields: ["message"]
      target: "json"
  tags: ["test-json"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-test-json"
     when.contains:
       tags: "test-json"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.1.3. Процесс обработки данных в Logstash

Демон Logstash при обнаружении новых данных в Topic’е Kafka начинает их прием и обработку. Данные внутри Logstash поступают сначала в модуль input, потом поступают в модуль filter и затем поступают в модуль output.

В модуле input примем данные из Topic’а Kafka и добавим в сообщение поле «type», которое будет содержать название Topic’а.

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-test-json"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-test-json"
    }
}

В модуле filter обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Декодируем основное сообщение из формата json, при этом все заголовки ключей станут одноименными полями, а значения ключей перейдут в значения полей. Добавим поле «timestamp_logstash», для фиксирования времени, в которое проводилась обработка сообщения.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
if [type] == "server-windows01-test-json" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    json {
      source => "message"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

В модуле output обрабатываются только те сообщения, в которых есть поле «type», значение которого равно имени Topic’а Kafka. Каждое сообщение отправляется в OpenSearch в индекс, соответствующий текущей дате, имени Topic’а и префикса «Kafka».

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-test-json" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.2. Сценарий сбора однострочных данных в текстовом формате

4.2.1. Исходные данные

Каждая строка файла будет извлекаться как новое сообщение и всё содержимое строки будет помещено в поле «message» как текст. В Filebeat такой сценарий работает по умолчанию. Нам остается только промаркировать сообщения и отправить их в нужный Topic Kafka.

Если содержание всех строк стандартизировано, то в Logstash на этапе Filter можно разложить сообщение на заранее известные поля. В этом примере мы этого делать не будем.

Пример:

«C:\Temp\log\2022-03-16.log»

[2022.03.16 13:01:09] Info | Just text
Next text
[2022.03.16 13:02:01] Info | New text

4.2.2. Процесс сбора данных через Filebeat

Процесс сбора данных аналогичен ситуации сбора данных в формате json за исключением того, что нам не нужно парсить json, так же изменим тэг для маркировки входящих сообщений, и изменим Topic Kafka.

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: filestream
  paths:
    - C:\Temp\log\*.log
  tags: ["simple-logs"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-simple-logs"
     when.contains:
       tags: "simple-logs"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.2.3. Процесс обработки данных в Logstash

Процесс обработки данных в Logstash будет незначительно отличаться от сценария для ситуации с json-объектами, поэтому я приведу только примеры конфигураций.

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-simple-logs"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-simple-logs"
    }
}

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-simple-logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-simple-logs" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.3. Сценарий сбора Логов IIS

Для сбора логов IIS в Filebeat предусмотрен отдельный модуль, который так и называется IIS. В IIS логи делятся на две категории: «ACCESS» и «ERROR». У меня пути к этим логам настроены по умолчанию.

4.3.1. Процесс сбора данных через Filebeat

Для того, чтобы промаркировать эти две категории («ACCESS» и «ERROR») и направить их в разные Topic’и Kafka я использовал параметр «@metadata.pipeline».

Так же стоит обратить внимание на параметры «var.paths», по какой-то причине в путях необходимо использовать обратные слэши.

Пример конфигурации Filebeat:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
filebeat.modules:
  - module: iis
    access:
      enabled: true
      var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
    error:
      enabled: true
      var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "rgmtpaydox39-iis-access"
     when.contains:
       "@metadata.pipeline": "iis-access"
   - topic: "rgmtpaydox39-iis-error"
     when.contains:
       "@metadata.pipeline": "iis-error"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

4.3.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-iis-access"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-access"
    }
    kafka {
      type => "server-windows01-iis-error"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-error"
    }
}

Во время обработки «iis-access» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_filebeat». Затем из полученного сообщения снова вырежем метку времени из начала сообщения и добавим её в новое поле «timestamp_windows». Затем преобразуем тип поля «timestamp_windows» из строкового в тип «дата». Это позволит в дальнейшем в OpenSearch-Dashboards создавать шаблоны индексов с фильтрацией по этому полю. Затем добавим еще одно поле «timestamp_logstash».

За счет наличия трех меток времени мы сможем точно отследить, когда событие возникло в windows, когда попало в обработку Filebeat и когда попало в обработку Logstash.

Обработка «iis-error» осуществляется аналогичным образом.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-iis-access" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-iis-access" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-error" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.4. Сценарий сбора системных Логов Windows

4.4.1. Процесс сбора данных через Winlogbeat

Системные логи Windows будем собирать с помощью Winlogbeat. Доступны 3 вида логов: «Application», «Security» и «System».

Winlogbeat имеет особенность в отличие от Filebeat. Подставляемый на выходе параметр «timestamp» будет содержать не время отправки с сервера, а время события (из лога), произошедшего в Windows. Временная метка отправки с сервера будет содержаться в поле «event» в параметре «created» («event.created»). Кроме того, я посчитал еще несколько полей информативными. В итоге на выходе отправляются следующие поля: «@timestamp», «message», «event», «log», «winlog». При этом «@timestamp» расположен в начале сообщения и отделен пробелом. Все остальные поля я разделил символом «|».

Полный список полей, отправляемых в сообщении с настройками по умолчанию: «event», «log», «message», «winlog», «@timestamp», «@metadata», «ecs», «agent», «host».

Так же как и в Filebeat вы можете убрать параметр «codec.format» и посмотреть полный список полей и их значений, которые попадут в Logstash в формате json.

К каждому виду логов можно добавить параметр «ignore_older», чтобы игнорировать слишком старые логи. Я добавил этот параметр к «Application» со значением «72h», чтобы игнорировать логи старше 72 часов.

Пример конфигурации Winlogbeat:

«C:\winlogbeat\winlogbeat.yml»

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
    tags: ["server-windows01-application"]

  - name: Security
    tags: ["server-windows01-security"]

  - name: System
    tags: ["server-windows01-system"]

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-application"
     when.contains:
       tags: "server-windows01-application"
   - topic: "server-windows01-security"
     when.contains:
       tags: "server-windows01-security"
   - topic: "server-windows01-system"
     when.contains:
       tags: "server-windows01-system"
 codec.format:
   string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000

logging.level: info
logging.to_files: true
logging.files:
  path: C:\winlogbeat
  name: winlogbeat.log
  keepfiles: 7

4.4.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-application"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-application"
    }
    kafka {
      type => "server-windows01-security"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-security"
    }
    kafka {
      type => "server-windows01-system"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-system"
    }
}

Во время обработки «application» из начала сообщения вырежем метку времени и добавим её в новое поле «timestamp_windows». Установим, что остальные поля разделены символом «|», в соответствии с конфигурацией Winlogbeat. Символ «|» необходимо экранировать символом «\».
Названия полей обозначим с таким же именем и в таком же порядке, как и в конфигурации Winlogbeat. Зная, что поле «event» находится в формате json распарсим его как json и поместим в то же поле. С полями «log» и «winlog» поступим аналогично. Так как время обработки сообщения в Winlogbeat находится в поле «event» в параметре «created», то создадим поле «timestamp_winlogbeat» и поместим туда значение из «event.created». После этого создадим поле «timestamp_logstash» с меткой времени обработки сообщения в Logstash.

Сообщения «security» и «system» обрабатываются аналогичным образом.

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-application" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-security" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-system" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-application" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-security" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-system" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

4.5. Сценарий сбора системных Логов Linux

4.5.1. Процесс сбора данных через Filebeat

Системные логи Linux будем собирать с помощью Filebeat. Конфигурирование Filebeat в Linux практически идентично конфигурированию Filebeat в Windows. За счет того, что при установке Filebeat на Linux мы включили модуль system, можно настроить только вывод данных, сменив его с elasticsearch на kafka, остальные настройки можно оставить по умолчанию и данные пойдут в формате json.

Поскольку системные логи Linux хранятся в одном файле «/var/log/syslog» (архивные не берем в расчет) и формат логов однострочный, то есть каждое сообщение хранится в отдельной строке, то в этой ситуации можно так же применить «Сценарий сбора однострочных данных в текстовом формате». Именно этот вариант я и представлю.

Пример конфигурации Filebeat:

«/etc/filebeat/filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/syslog

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
 hosts: ["10.0.0.60:9092"]
 topic: "server-elk01-syslog"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  keepfiles: 7

4.5.2. Процесс обработки данных в Logstash

Пример конфигурации Logstash модуля input:

«/opt/logstash/config/conf.d/server-elk01-input.conf»

input {
    kafka {
      type => "server-elk01-syslog"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-elk01-syslog"
    }
}

Пример конфигурации Logstash модуля filter:

«/opt/logstash/config/conf.d/server-elk01-filter.conf»

filter {
  if [type] == "server-elk01-syslog" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output:

«/opt/logstash/config/conf.d/server-elk01-output.conf»

output {
  if [type] == "server-elk01-syslog" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

5. Итоговые файлы конфигураций

С учетом всех сценариев у меня получились следующие файлы конфигураций.

Пример конфигурации Filebeat на Linux:

«/etc/filebeat/filebeat.yml»

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/syslog

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
output.kafka:
 hosts: ["10.0.0.60:9092"]
 topic: "server-elk01-syslog"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat.log
  keepfiles: 7

Пример конфигурации Filebeat на Windows:

«C:\filebeat\filebeat.yml»

filebeat.inputs:
- type: filestream
  paths:
    - C:\Temp\log\*.log
  tags: ["simple-logs"]

- type: log
  enabled: true
  paths:
    - C:\Temp\test_log_json2\*
  multiline.pattern: '^{'
  multiline.negate: true 
  multiline.match: after
  processors:
  - decode_json_fields:
      fields: ["message"]
      target: "json"
  tags: ["test-json"]

filebeat.modules:
  - module: iis
    access:
      enabled: true
      var.paths: ["C:/inetpub/logs/LogFiles/*/*.log"]
    error:
      enabled: true
      var.paths: ["C:/Windows/System32/LogFiles/HTTPERR/*.log"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-simple-logs"
     when.contains:
       tags: "simple-logs"
   - topic: "server-windows01-test-json"
     when.contains:
       tags: "test-json"
   - topic: "server-windows01-iis-access"
     when.contains:
       "@metadata.pipeline": "iis-access"
   - topic: "server-windows01-iis-error"
     when.contains:
       "@metadata.pipeline": "iis-error"
 codec.format:
   string: '%{[@timestamp]} %{[message]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
 close_inactive: 10m

Пример конфигурации Winlogbeat:

«C:\winlogbeat\winlogbeat.yml»

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
    tags: ["server-windows01-application"]

  - name: Security
    tags: ["server-windows01-security"]

  - name: System
    tags: ["server-windows01-system"]

output.kafka:
 hosts: ["10.0.0.60:9092"]
 topics:
   - topic: "server-windows01-application"
     when.contains:
       tags: "server-windows01-application"
   - topic: "server-windows01-security"
     when.contains:
       tags: "server-windows01-security"
   - topic: "server-windows01-system"
     when.contains:
       tags: "server-windows01-system"
 codec.format:
   string: '%{[@timestamp]} %{[message]}|%{[event]}|%{[log]}|%{[winlog]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000

logging.level: info
logging.to_files: true
logging.files:
  path: C:\winlogbeat
  name: winlogbeat.log
  keepfiles: 7

Пример конфигурации Logstash модуля input для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-input.conf»

input {
    kafka {
      type => "server-elk01-syslog"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-elk01-syslog"
    }
}

Пример конфигурации Logstash модуля filter для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-filter.conf»

filter {
  if [type] == "server-elk01-syslog" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output для сервера «server-elk01»:

«/opt/logstash/config/conf.d/server-elk01-output.conf»

output {
  if [type] == "server-elk01-syslog" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-elk01-syslog-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

Пример конфигурации Logstash модуля input для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-input.conf»

input {
    kafka {
      type => "server-windows01-simple-logs"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-simple-logs"
    }
    kafka {
      type => "server-windows01-test-json"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-test-json"
    }
    kafka {
      type => "server-windows01-application"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-application"
    }
    kafka {
      type => "server-windows01-security"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-security"
    }
    kafka {
      type => "server-windows01-system"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-system"
    }
    kafka {
      type => "server-windows01-iis-access"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-access"
    }
    kafka {
      type => "server-windows01-iis-error"
      bootstrap_servers => "10.0.0.60:9092"
      topics => "server-windows01-iis-error"
    }
}

Пример конфигурации Logstash модуля filter для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-filter.conf»

filter {
  if [type] == "server-windows01-simple-logs" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-test-json" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    json {
      source => "message"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-application" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-security" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-system" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_windows} %{DATA:message}\|%{DATA:event}\|%{DATA:log}\|%{GREEDYDATA:winlog}" }
      overwrite => [ "message" ]
    }
    json {
      source => "event"
      target => "event"
    }
    json {
      source => "log"
      target => "log"
    }
    json {
      source => "winlog"
      target => "winlog"
    }
    mutate {
     add_field => { "timestamp_winlogbeat" => "%{[event][created]}" }
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-access" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
  if [type] == "server-windows01-iis-error" {
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp_filebeat} %{GREEDYDATA:message}" }
      overwrite => [ "message" ]
    }
    grok {
      match => [ "message", "%{TIMESTAMP_ISO8601:timestamp_windows} %{GREEDYDATA:message}" ]
      overwrite => [ "message" ]
    }
    date {
      match => ["timestamp_windows", "ISO8601", "YYYY-MM-dd HH:mm:ss", "MMM dd, YYYY @ HH:mm:ss.ZZZ"]
      target => "timestamp_windows"
      locale => "en"
    }
    mutate {
     add_field => { "timestamp_logstash" => "%{[@timestamp]}" }
    }
  }
}

Пример конфигурации Logstash модуля output для сервера «server-windows01»:

«/opt/logstash/config/conf.d/server-windows01-output.conf»

output {
  if [type] == "server-windows01-simple-logs" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-simple-logs-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-test-json" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-test-json-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-application" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-application-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-security" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-security-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-system" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-system-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-access" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-access-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
  if [type] == "server-windows01-iis-error" {
    opensearch {
      hosts => "https://localhost:9200"
      user => "admin"
      password => "yN-3L(GMmAAw"
      index => "kafka-server-windows01-iis-error-%{+YYYY.MM.dd}"
      ssl_certificate_verification => false
    }
  }
}

6. Эксплуатация

Следующие шаги я разберу на примере сбора данных в формате json.

6.1. Создание шаблона индексов с помощью OpenSearch-Dashboards

Шаблон индексов позволяет просматривать сообщения одного или нескольких индексов сортируя их по дате. В качестве даты может выступать любое поле в формате «дата». Для того чтобы можно было осуществлять поиск с помощью OpenSearch-Dashboards необходимо создать шаблон индексов. Для этого перейдем в раздел «Stack Management», далее в раздел «Index Pattern» и нажмем кнопку «Create index pattern» (Рис.2).

image

Рис.2

Создадим шаблон индексов для данных из Topic’а Kafka без учета даты. Для этого введем в строке имени шаблона название индекса, но в конце, вместо даты введем символ «*» (Рис.3).

image

Рис.3

Параметром сортировки данных в шаблоне выберем «@timestamp», он совпадает с «timestamp_logstash» (Рис.4).

image

Рис.4

6.2. Отображение данных в OpenSearch-Dashboards

При переходе в раздел «Discover» в соответствующий шаблон индексов там отобразятся доступные сообщения. Можно раскрыть подробное содержание сообщения и увидеть, что все заголовки ключей json-сообщения стали одноименными полями, а значения ключей перешли в значения полей. Так же осталось поле «message», которое содержит изначальное json-сообщение (Рис.5). На этапе фильтрации сообщения в Logstash это поле («message») можно удалять, если в нем нет необходимости.

Если в json-сообщении содержится поле «message», то оригинальное поле «message», которое содержит изначальное json-сообщение, будет перезаписано на поле из json-сообщения.

image

Рис.5

6.3. Поиск в OpenSearch-Dashboards

Я рассмотрю совсем немного вариантов поиска, исключительно с целью обзора. Поиск будет только в формате «DQL» (формат по умолчанию).

Поиск осуществляется в разделе «Discover» в соответствующем шаблоне индексов.
1. Для поиска по всем доступным сообщениям во всех полях достаточно ввести нужную фразу в строке поиска (Рис.6).

image

Рис.6

2. Для поиска фраз по определенным полям в поисковой строке указывается имя нужного поля, затем, в качестве разделителя, идет символ двоеточия «:» и затем искомая фраза. Искомая фраза может содержать специальные символы «*», «?» и т.д. для формирования шаблона (Рис.7).

image

Рис.7

3. В поисковом запросе можно применять логические операции «or», «and», «not» (Рис.8).

image

Рис.8

6.4. Работа с OpenSearch через консоль (без OpenSearch-Dashboards)

Я приведу не большой список запросов для того, чтобы показать, как выглядит работа из консоли.
Запросить список индексов:

curl 'https://10.0.0.70:9200/_cat/indices?v&pretty' -u 'admin:yN-3L(GMmAAw' –insecure

Запросить структуру индекса:

curl 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_mapping?pretty'  -u 'admin:yN-3L(GMmAAw' --insecure

Удалить индекс:

curl -XDELETE 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30?pretty' -u 'admin:yN-3L(GMmAAw' –insecure

Вывести первые 10 сообщений из индекса (10 по умолчанию):

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Вывести первые 20 сообщений из индекса:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Вывести первые 20 сообщений из шаблона индексов:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-*/_search?size=20&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Осуществить поиск в индексе по запросу «*text*» через параметр «q»:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?q=*text*&pretty' -u 'admin:yN-3L(GMmAAw' --insecure

Для поиска по составным запросам существуют несколько типов поиска:

term — точное совпадение искомой строки со строкой в индексе или термом;
match — все слова должны входить в строку, в любом порядке;
match_phrase — вся фраза должна входить в строку;
query_string — все слова входят в строку в любом порядке, можно искать по нескольким полям, используя регулярные выражения.

Осуществим поиск в индексе по полю «mess», искать будем точное совпадение фразы «test_text»:

curl -XGET 'https://10.0.0.70:9200/kafka-server-windows01-test-json-2022.03.30/_search?pretty ' -u 'admin:yN-3L(GMmAAw' --insecure -H 'Content-Type: application/json' -d '{"query" : {"term" : {"mess" : "test_text"}}}'

7. Актуальные проблемы

В этом разделе я опишу проблемы, с которыми я встретился.

7.1. Имена Topic’ов Kafka не должны содержать большие буквы

Logstash не может читать Topic’и Kafka, если они (Topic’и) содержат большие буквы. Поэтому имена Topic’ов Kafka не должны содержать большие буквы.

7.2. Filebeat некорректно создает Topic’и Kafka

При старте как Filebeat, так и Logstash создают Topic в Kafka, если в конфигурации соответствующего сервиса Topic имеется, а в Kafka он еще отсутствует. Но если Topic создает Filebeat, то Logstash не может читать этот топик. Если же Topic создает Logstash, то все сервисы могут читать этот Topic. Поэтому при создании/изменении конфигурации обеих служб сначала необходимо запустить Logstash и дождаться пока Topic в Kafka будет создан. И только после этого можно запускать Filebeat.

7.3. Требуется переход на следующую строку после json сообщения

При считывании сообщений из логов в формате json, после закрывающей фигурной скобки «}», которая закрывает json, в том случае, когда это последнее сообщение в файле, необходимо добавить переход на следующую строку. Иначе эта фигурная скобка «}» не попадет в обработку json и последнее json-сообщение в файле обрабатывается не корректно (не читается как json).

7.4. Необходимо обращать внимание на кодировку log-файлов

По умолчанию корректно обрабатываются log-файлы только в формате «UTF-8». Поэтому необходимо либо создавать log-файл в формате «UTF-8», либо явно указываться кодировку в Logstash и/или в Filebeat.

Например, я столкнулся с тем, что PowerShell по умолчанию формирует файлы в формате «UTF-16». Пришлось сменить кодировку по умолчанию в скрипте на «UTF-8»:

$PSDefaultParameterValues['*:Encoding'] = 'utf8'

8. Послесловие

Что мне хотелось бы добавить в эту статью, но по какой-то причине этого не произошло:

1. Настройка времени жизни индексов в OpenSearch;
2. Настройка Apache Kafka;
3. Настройка аутентификации Active Directory в OpenSearch-Dashboards по протоколу LDAP;
4. Создание визуализаций в OpenSearch-Dashboards;
5. Создание Dashboard’ов в OpenSearch-Dashboards.

На этом я заканчиваю. Всем спасибо за внимание к статье, надеюсь она окажется для кого-то полезной. И оставляйте ваши рецепты настройки OpenSearch в комментариях, думаю это так же будет интересно.

Winlogbeat – это компонент Elastick Stack, который позволяет собирать журналы событий Windows и отправлять их в Elasticsearch. Я решил вынести процесс в отдельную публикацию, чтобы сконцентрироваться на том, как выполняется Winlogbeat. Это позволит в последующем, например, при подготовке публикации с примера использования Winlogbeat, не загромождать публикацию информацией об установке.

В последующих публикациях я постараюсь привести примеры использования Winlogbeat.

Установка выполняется следующим образом:

1. Загрузите deb/rpm пакет со страницы загрузки:

2. Распакуйте директорию из архива в “C:\Program Files” и переименуйте директорию в “Winlogbeat”.

3. Запустите консоль PowerShell от имени администратора.

4. Запустите скрипт по установке Winlogbeat:

cd "C:\Program Files\Winlogbeat"
.\install-service-winlogbeat.ps1

5. Запустите сервис:

Start-Service winlogbeat

Установка Winlogbeat завершена.

В предыдущей статье мы рассмотрели, как развернуть собственный централизованный сервер сбора лога с различных типов сетевых устройства на базе стека Graylog (
Graylog
+
OpenSearch
+
MongoDB
). В этой статье мы покажем, как настроить отправку журналов событий с серверов Windows (включая события Active Directory) в Graylog.

Настройка сборщика данных и индексов Graylog для устройств Windows

Сначала нужно настроить в Graylog отдельные сборщики данных и потоки для логов, которые будут отправлять хосты Windows Server (чтобы не смешивались события от разных классов устройств). Перейдите в раздел System -> Inputs и добавьте новый сборщик Windows Server Devices типа Beats, который слушает на порту
TCP:5044
.

Создать сборщик логов для Windows

Затем создайте отдельный индекс для логов журналов событий Windows. На базе нового Input и индекса создайте новый поток для Windows в разделе Streams и запустите его.

Поток для журналов событий Windows в Graylog

Отправка событий Windows в Graylog с помощью Winlogbeat

Для отправки логов из журналов событий EventViewer с хостов Windows на сервер Graylog можно воспользоваться службой сборщика логов Winlogbeat. Winlogbeat это один из свободно распространяемых компонентов стека ELK. Службу Winlogbeat нужно установить на каждом хосте Windows, события с которого вы хотите видеть на сервере Graylog.

  1. Скачайте архив Winlogbeat со страницы загрузки (https://www.elastic.co/downloads/beats/winlogbeat)
  2. Распакуйте архив в папку
    C:\Program Files\winlogbeat
  3. Отредактируйте конфигурационный файл winlogbeat.yml

В самом простом случае можно использовать следующую конфигурацию, когда все события из журналов Application, Security и System будут отправлены на указанный сервер Graylog.

Обратите внимание, что в конфигурационном файле winlogbeat используется синтаксис YAML, а это значит нужно быть внимательным с пробелами и отступами.

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h
  - name: Security
  - name: System
output.logstash:
  hosts: ["192.168.14.146:5044"]

Можно использовать более гибкие условия фильтрации, чтобы получить только нужны логи. Например, чтобы получить события с определенными уровнями критичности и номерам EventID, используется такой конфиг:

winlogbeat.event_logs:
  - name: Security
    event_id: 4627, 4703, 4780-4782
    ignore_older: 24h
    level: critical, error
  - name: Microsoft-Windows-TerminalServicesRDPClient/Operational
    event_id: 1102

Примеры типовой универсальной конфигурации winlogbeat.yml для Windows Server можно посмотреть тут.

Сохраните файл winlogbeat.yml и проверьте корректность конфигурации Winlogbeat и доступность сервера сбора логов:

cd "C:\Program Files\winlogbeat"
./winlogbeat test config
./winlogbeat test output

Если все ОК, установите и запустите службу winlogbeat:

.\install-service-winlogbeat.ps1
Start-Service winlogbeat

запуск службы winlogbeat

Перейдите в веб-интерфейсе GrayLog сервера и проверьте, что в соответствующем потоке стали появляться события с ваших серверов Windows.

События Windows отправляются на сервер Пкфндщп

Сбор и анализ событий с контроллеров домена Active Directory с помощью Graylog

Рассмотрим, как использовать сервер Graylog для поиска и анализа событий Windows на примере контроллеров домена Active Directory.

При наличии множества контроллеров Active Directory администратору бывает сложно найти определенное событие, так как приходится просматривать журналы на каждом DC. Благодаря централизованному серверу Graylog, который хранит события со всех контроллеров домена, нужно событие нужно найти за секунды.

Например, вам нужно найти компьютер, с которого была заблокирована учетная запись пользователя из-за неверного ввода пароля. Для этого откройте строку фильтра Graylog, выберите нужный Stream, или укажите его в запросе (
streams:xxxxxxxxxxxxx
) и выполните следующий запрос:

winlogbeat_event_code:(4740 OR 4625) AND winlogbeat_event_provider:Microsoft\-Windows\-Security\-Auditing

поиск событий AD на сервере graylog

Сервер Graylog быстро нашел нужно событие и в его свойствах видно имя компьютера, с которого была заблокирована учетная записью.

Еще несколько примеров поиска различных событий в Active Directory:

  • Event ID 4767 – позволяет определить кто разблокировал пользователя AD
  • Event ID 4724 – кто и когда сбросил пароль пользователю домена
  • Event ID 4720 – позволяет узнать, кто и когда создал нового пользователя в AD, 4722 – событие включения учетной записи, 4725 – отключение, 4726 – удаление.
  • Отслеживание изменения в группах безопасности AD: 4727 (создана новая группа), 4728 (новый пользователь добавлен в группу), 4729 (пользователь удален из группы), 4730 (группа безопасности удалена)
  • Event ID 5137 (создана новая групповая политика домена), 5136 (изменена GPO), 5141 (удалена GPO)
  • Event ID 4624 — событие успешного входа пользователя в домен (позволяет быстро получить историю входа пользователя в AD)

Важно настроить отправку логов через Winlogbeat со всех контроллеров домена (список активных DC можно получить с помощью команды Get-ADDomainController. Сбор некоторых событий безопасности Active Directory нужно отдельно включить в настройках политик аудита в Default Domain Controller Policy.

Вы можете создать в Graylog сохранённые запросы и dashboardы для быстрого поиска интересующих вас событий. С помощью оповещений можно настроить рассылку алертов о критических событиях в AD.

Централизованное хранилище логов для хостов Windows

Graylog столь же удобно позволяет хранить, искать и анализировать события от других служб Windows Server. Ниже приведены примеры различных сценариев, в которых администратору приходится выполнять поиск по журналам событий Windows.

  • Аудит доступа к файлам и папкам на файловом сервере
  • Аудит удаления файлов в сетевой папке
  • Аудит изменений NTFS разрешений объектов
  • Анализ логов RDP подключений
  • Обнаружение перебора паролей к RDP серверу
  • Определить кто и когда перезагрузил или выключил сервер Windows:
    winlogbeat_event_code:1074
  • Реагирование на очистку журналов событий Windows (возможная компрометация сервера)
  • Оповещение об обнаружении вируса на одном из серверов Windows встроенным антивирусом Windows Defender (Event ID 1006, 1116)

При использовании быстрого и простого сервера Graylog поиск и фильтрация событий в журналах Windows существенно упрощается. На сайте Graylog есть статья, в которой указывается список критических событий безопасности Windows, которые рекомендуется отслеживать.

поиск в событиях Windows Server на сервере Пкфндщп

Централизованное хранилище логов Windows и Active Directory удобно использовать для быстрого расследования и реагирования на инциденты информационной безопасности, анализа работы компонентов, выявления сбоев.

Send Windows logs to Elastic Stack using Winlogbeat and Sysmon

In this guide, we are going to learn how to send Windows logs to Elastic Stack using Winlogbeat and Sysmon. Winlogbeat is an Elastic Beat that is used to collect windows system application, security, system or hardware events. Sysmon (System Monitor) on the other hand is a windows application that is used to monitor and log system activity to the Windows event log. It provides detailed information about process creations, network connections, and changes to file creation time

Table of Contents

Install and Setup ELK Stack

In order to visualize and analyze the events collected by Winlogbeat/Sysmon, you need to have setup your Elastic Stack. See the links below on how to install and setup Elastic Stack on Fedora/CentOS/Ubuntu servers.

Install Elastic Stack

Install Winlogbeat on Windows

In this guide, we are going to use Windows Server 2022 as our Windows system. Therefore, you need to install both Winlogbeat and Sysmon on your Windows system in order to ship events to Elastic stack.

Navigate to Winlogbeat downloads page and download Winlogbeat zip file.

Once the download is done, extract the Winlogbeat zipped file. As of this guide update, the current version ELK and beats is v8.17. Hence, we have winlogbeat-8.17.1-windows-x86_64.zip.

When you extract, you should get a folder, winlogbeat-8.17.1-windows-x86_64.

Move the winlogbeat-8.17.1-windows-x86_64 directory to C:\Program Files and rename it to Winlogbeat. Your directory should look like as in below;

winlogbeat programfiles

Next, to install Winlogbeat on Windows, you need to execute the install-service-winlogbeat.ps1 installation script.

Hence, open the Powershell as the administrator and change to Winlogbeat directory by executing the command below;

cd C:\'Program Files'\Winlogbeat

Next, run the Winlogbeat installer as shown below;

.\install-service-winlogbeat.ps1

If you get the error, cannot be loaded because the execution of scripts is disabled on this system, as shown below, you need to enable the script execution.

PS C:\Program Files\Winlogbeat> .\install-service-winlogbeat.ps1
File C:\Program Files\Winlogbeat\install-service-winlogbeat.ps1 cannot be loaded because the execution of scripts is disabled on this system. Please see "get-help about_signing" for more details.
At line:1 char:33
+ .\install-service-winlogbeat.ps1 <<<<
    + CategoryInfo          : NotSpecified: (:) [], PSSecurityException
    + FullyQualifiedErrorId : RuntimeException

As a result, you need to execute the Winlogbeat script with unrestricted execution policy as shown in the command below;

 PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-winlogbeat.ps1
PS C:\Program Files\Winlogbeat> PowerShell.exe -ExecutionPolicy UnRestricted -File .\install-service-winlogbeat.ps1

Status   Name               DisplayName
------   ----               -----------
Stopped  winlogbeat         winlogbeat

Install Sysmon on Windows

Download Sysmon from the downloads page.

Once the download is complete, extract the contents of the zipped file to C:\Program Files directory.

Once the extraction is done, you folder should look like as in below;

sysmon

Install Sysmon with md5 and sha256 hashing of process created, log loading of modules and monitoring network connections, open a CMD as an administrator and navigate to C:\Program Files\Sysmon and execute the command below;

cd C:\'Program Files'\Sysmon
.\Sysmon64.exe -i -accepteula -h md5,sha256,imphash -l -n


System Monitor v15.15 - System activity monitor
By Mark Russinovich and Thomas Garnier
Copyright (C) 2014-2024 Microsoft Corporation
Using libxml2. libxml2 is Copyright (C) 1998-2012 Daniel Veillard. All Rights Reserved.
Sysinternals - www.sysinternals.com

Sysmon64 installed.
SysmonDrv installed.
Starting SysmonDrv.
SysmonDrv started.
Starting Sysmon64..
Sysmon64 started.

Configuring Winlogbeat

The main configuration file for Winlogbeat is C:\Program Files\Winlogbeat\winlogbeat.yml with the reference config file being C:\Program Files\Winlogbeat\winlogbeat.reference.yml.

To edit this file, you can use Notepad++ as administrator.

By default, Winlogbeat is set to monitor application, security, and system logs, and logs from Sysmon.

# ======================== Winlogbeat specific options =========================

# event_logs specifies a list of event logs to monitor as well as any
# accompanying options. The YAML data type of event_logs is a list of
# dictionaries.
#
# The supported keys are name, id, xml_query, tags, fields, fields_under_root,
# forwarded, ignore_older, level, event_id, provider, and include_xml.
# The xml_query key requires an id and must not be used with the name,
# ignore_older, level, event_id, or provider keys. Please visit the
# documentation for the complete details of each option.
# https://go.es.io/WinlogbeatConfig

winlogbeat.event_logs:
  - name: Application
    ignore_older: 72h

  - name: System

  - name: Security

  - name: Microsoft-Windows-Sysmon/Operational

  - name: Windows PowerShell
    event_id: 400, 403, 600, 800

  - name: Microsoft-Windows-PowerShell/Operational
    event_id: 4103, 4104, 4105, 4106

  - name: ForwardedEvents
    tags: [forwarded]

# ====================== Elasticsearch template settings =======================

If you need to see more event types, you can execute the command Get-EventLog * in PowerShell.

Under the general settings, you can optionally setup the name of the Beat and the Tags associated with the events.

...
#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
name: winlogbeat

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]
tags: ["windows_systems"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging
...

We have used the default settings. That is a sample config if you want to set the name of the Beat.

Next, setup the Winlogbeat output. In this demo, we are sending the logs directly to Elasticsearch nodes. Elasticsearch 8 requires credential and connects via SSL.

Thus, get the CA cert from the Elasticsearch node;

cat /etc/elasticsearch/certs/http_ca.crt

and store it on the Winlogbeat folder. We have stored it as C:\Program Files\Winlogbeat\http_ca.crt.

Next, update the ES connection details:

# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.0.1.15:9200"]

  # Protocol - either `http` (default) or `https`.
  protocol: "https"
  ssl.certificate_authorities: ['C:\Program Files\Winlogbeat\http_ca.crt']

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "elastic"
  password: "password"

  # Pipeline to route events to security, sysmon, or powershell pipelines.
  pipeline: "winlogbeat-%{[agent.version]}-routing"

If Elasticsearch and Kibana are not running on the same host and you want to use Kibana Winlogbeat dashboards, you can specify Kibana URL. Kibana must be reachable on non-loopback address. For example;

...
setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  #host: "localhost:5601"
  host: "192.168.43.104:5601"
...

Also, if you are using Logstash, you can comment out the Elasticsearch output and specify Logstash connection addresses.

...
#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]
  #hosts: ["192.168.43.104:9200", "192.168.43.105:9200", "192.168.43.106:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]
  hosts: ["192.168.43.104:5044"]
...

Winlogbeat configuration checks

Once done with configuration, save the file and run the configuration checks. To run the configuration checks, open Powershell as an administrator and execute the command below;

cd C:\'Program Files'\Winlogbeat

.\winlogbeat.exe test config -c .\winlogbeat.yml -e

If there is no error with the configuration, you should see the Config Ok.

{"log.level":"info","@timestamp":"2025-02-03T08:29:57.323Z","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":1062},"message":"Home path: [C:\\Program Files\\Winlogbeat] Config path: [C:\\Program Files\\Winlogbeat] Data path: [C:\\Program Files\\Winlogbeat\\data] Logs path: [C:\\Program Files\\Winlogbeat\\logs]","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:57.362Z","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":1070},"message":"Beat ID: 77e5fe93-2798-4218-bbd0-9a0fb4e3b63a","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.320Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).createBeater","file.name":"instance/beat.go","file.line":570},"message":"Setup Beat: winlogbeat; Version: 8.17.1","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.345Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1623},"message":"Beat info","service.name":"winlogbeat","system_info":{"beat":{"path":{"config":"C:\\Program Files\\Winlogbeat","data":"C:\\Program Files\\Winlogbeat\\data","home":"C:\\Program Files\\Winlogbeat","logs":"C:\\Program Files\\Winlogbeat\\logs"},"type":"winlogbeat","uuid":"77e5fe93-2798-4218-bbd0-9a0fb4e3b63a"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.347Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1632},"message":"Build info","service.name":"winlogbeat","system_info":{"build":{"commit":"424070e87d831d2d66a7514e1c1120ad540a86db","libbeat":"8.17.1","time":"2025-01-10T18:12:41.000Z","version":"8.17.1"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.347Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1635},"message":"Go runtime info","service.name":"winlogbeat","system_info":{"go":{"os":"windows","arch":"amd64","max_procs":1,"version":"go1.22.10"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.350Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1641},"message":"Host info","service.name":"winlogbeat","system_info":{"host":{"architecture":"x86_64","native_architecture":"x86_64","boot_time":"2025-02-02T20:15:56Z","name":"MARARIVER","ip":["fe80::9bed:c7f4:5713:2aad","10.0.1.12","::1","127.0.0.1"],"kernel_version":"10.0.20348.2694 (WinBuild.160101.0800)","mac":["0e:14:41:e7:ff:51"],"os":{"type":"windows","family":"windows","platform":"windows","name":"Windows Server 2022 Datacenter","version":"10.0","major":10,"minor":0,"patch":0,"build":"20348.2700"},"timezone":"UTC","timezone_offset_sec":0,"id":"de78e0b1-2fb5-4213-aef1-ae8f79794514"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.353Z","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1670},"message":"Process info","service.name":"winlogbeat","system_info":{"process":{"cwd":"C:\\Program Files\\Winlogbeat","exe":"C:\\Program Files\\Winlogbeat\\winlogbeat.exe","name":"winlogbeat.exe","pid":7400,"ppid":6576,"start_time":"2025-02-03T08:29:53.540Z"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.378Z","log.logger":"esclientleg","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/esleg/eslegclient.NewConnection","file.name":"eslegclient/connection.go","file.line":132},"message":"elasticsearch url: http://localhost:9200","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.383Z","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings","file.name":"pipeline/module.go","file.line":105},"message":"Beat name: MARARIVER","service.name":"winlogbeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-02-03T08:29:58.389Z","log.logger":"winlogbeat","log.origin":{"function":"github.com/elastic/beats/v7/winlogbeat/beater.New","file.name":"beater/winlogbeat.go","file.line":70},"message":"State will be read from and persisted to C:\\Program Files\\Winlogbeat\\data\\.winlogbeat.yml","service.name":"winlogbeat","ecs.version":"1.6.0"}
Config OK

Load Elasticsearch templates

If Winlogbeat has a direction connection and is using Elasticearch as the output, it will automatically load the template. However, if you are using Logstash as the output, you need to manually load the Elasticsearch template. See example command to load the Elasticsearch template manually below;

cd C:\'Program Files'\Winlogbeat

.\winlogbeat.exe setup --index-management -E output.logstash.enabled=false -E 'output.elasticsearch.hosts=["192.168.43.104:9200"]'

Setup Kibana Dashboards

To load Winlogbeat default visualization dashboards, you need to have created the index pattern. Hence, navigate to Kibana and create the Winlogbeat index pattern.

kibana data views

Otherwise, if you are using Elasticsearch as your output, you can load the dashboards by running the setup command or enabling dashboard loading in the winlogbeat.yml (setup.dashboards.enabled: true) configuration. This will automatically create the index pattern.

cd C:\'Program Files'\Winlogbeat

Replace <KIBANA-IP> accordingly.

.\winlogbeat.exe setup --dashboards  -E setup.kibana.host='http://<KIBANA-IP>:5601'
Loading dashboards (Kibana must be running and reachable)
Loaded dashboards

If you are using Logstash as the output, run the command below to load the dashboards.

cd C:\'Program Files'\Winlogbeat>
.\winlogbeat.exe setup -e -E output.logstash.enabled=false -E output.elasticsearch.hosts=['192.168.43.104:9200']

Running Winlogbeat

To start or stop Winlogbeat, navigate to install directory and execute the commands below respectively.

cd C:\'Program Files'\Winlogbeat>
Start-Service winlogbeat

To stop;

Stop-Service winlogbeat

You can also manage the Winlogbeat from system services.

View Winlogbeat Kibana Dashboard

When Winlogbeat runs, you should now have the events and the dashboards.

winlogbeat index events kibana

You can view Winlogbeat dashboards by navigating to Dashboards and select any Winlogbeat dashboard.

winlogbeat dashboard events

There you go. You can also check Winlogbeat events on Kibana SIEM (Security > Explore > Hosts > All Hosts > Your Windows Hosts).

windows security events kibana siem

That is all on how to send Windows logs to Elastic Stack using Winlogbeat and Sysmon. You can continue to explore Kibana SIEM and Winlogbeat dashboards to analyze your windows events. Enjoy.

Setup Multi-node Elasticsearch 7.x Cluster on Fedora 30/Fedora 29/CentOS 7

Install and Configure Elastic Auditbeat on Ubuntu 18.04

Install Filebeat on Fedora 30/Fedora 29/CentOS 7

How to Debug Logstash Grok Filters

Понравилась статья? Поделить с друзьями:
0 0 голоса
Рейтинг статьи
Подписаться
Уведомить о
guest

0 комментариев
Старые
Новые Популярные
Межтекстовые Отзывы
Посмотреть все комментарии
  • При установке windows 10 пропадает интернет
  • Сколько нужно места под систему windows 10
  • Hp 15s eq1143ur установка windows
  • Windows 7 sp1 x86 x64 ultimate mon edition x64
  • Custom install windows only перевод