Dans ce blog post, le second de notre série sur la suite ELK, nous présentons une introduction à Logstash. Par définition, Logstash est un pipeline de traitement de données qui fournit les composants pour lire des données provenant de diverses sources, pour transformer/enrichir ces données et enfin pour les renvoyer à une base de données ou à un autre pipeline pour d’additionnel traitement. Avec son architecture modulaire, Logstash offre un cadre robuste pour construire facilement un pipeline de traitement de données. Pour montrer certaines des fonctionnalités de Logstash, nous utiliserons les log générés par Bro comme entrée pour notre pipeline de traitement de données.
L’installation de Logstash est simple. Vous pouvez télécharger les binaires officiels pour toutes les majeure distribution Linux. Le seul prérequis dont vous aurez besoin est Java (OpenJDK ou la distribution officiel par Oracle). Après l’installation, selon vos besoins, vous pouvez modifier les deux fichiers suivants:
LOGSTASH_INSTALL_PATH/jvm.options
: pour augmenter la taille de la pile (Heap) Java.LOGSTASH_INSTALL_PATH/logstash.yml
: pour modifier les configurations Logstash comme le nombre de worker, la taille du lot (nombre d’unité de donnée qu’un worker traitera) …Après l’installation de Logstash, la prochaine étape est la configuration de la pipeline de données. Avant d’entrer dans les détails de la configuration, il est important de noter que dans le jargon Logstash, un événement reçu via l’une des multiple sources supportées est représenté par un objet de données (JSON). Cet objet peut être transformé en utilisant un ou plusieurs des filtres et expédié vers un autre pipeline de traitement ou une base de données via un des plugins de sortie (outputs).
Nous commençons d’abord avec les plugins d’entrée: les composants en charge de la collecte de données. Par défaut, Logstash fournit plusieurs plugins d’entrée. La liste des plugins Logstash contient des functions simple comme la lecture d’un fichier ou l’écoute sur un port TCP/UDP. Elle contient également des plugins pour lire à partir de systèmes de messagerie/files d’attente comme Redis ou Kafka ou à partir d’une base de données (en utilisant l’interface JDBC). La liste complète des plugins d’entrée est disponible dans la documentation officielle de Logstash.
Dans notre exemple, nous utilisons Filebeat pour lire les fichiers de log générés par Bro et les envoyer au plugin d’entrée Beats de Logstash.
1input {
2 beats {
3 port => 5044
4 }
5}
La configuration d’entrée Filebeat suivante est utilisée pour envoyer les logs Bro à notre pipeline de traitement Logstash.
1- input_type: log
2 paths:
3 - /opt/bro/logs/current/conn.log
4 exclude_lines: ['^#']
5 fields:
6 logtype: broconn
7 document_type: logstash-bro
8 fields_under_root: true
Nous ajoutons ici des champs supplémentaires à l’objet de donnée généré par le processus Filebeat. Le champ logtype
est utilise ici pour ajouter un tag aux données et ainsi définir les règles de filtrage/analyse qui seront appliqués sur le message de log par la suite.
Pour la sortie de notre pipeline de traitement, Logstash offre plusieurs choix. Dans notre example, nous utilisons le plugin de sortie Elasticsearch. Les logs Bro sont sauvegarder dans un index Elasticsearch nommé dynamiquement en fonction du type et de la date d’occurence de l’événement. Avec l’exemple de configuration suivant, les logs seront stockés dans un index nommé: logstash-bro-2017.09.06
.
1output {
2 elasticsearch {
3 hosts => ["127.0.0.1"]
4 index => "%{type}-%{+YYYY.MM.dd}"
5 }
6}
Après la définition de l’entrée et de la sortie de notre pipeline de traitement des données, la dernière étape est la définition des filtres qui seront appliqués à ces données. Logstash propose une variété de filtres prêts à être utilisés pour analyser les données et extraire des méta-données. L’analyse peut être effectuée en utilisant des expressions régulières ou des structures CSV ou aussi des paires clé/valeur. En outre, certains de ces filtres permettent d’enrichir les données avec des informations de localisation géographique. Avec le filtre Ruby, vous bénéficier d’un grand degré de liberté avec la possibilité de transformer vos données en leurs applicant du code Ruby.
Comme mentionné ci-dessus, nous utilisons les logs Bro comme exemple pour démontrer les capacités Logstash. En particulier, nous utilisons les types de log Bro suivants:
Les exemples suivants montrent des échantillons des logs générés par Bro:
1# conn.log
21503305215.555986 CN77ln4RMSpStQV2X9 10.10.2.201 34240 145.239.49.27 80 tcp http 2.1
352318 606 186 SF T F 0 ShADadfF 6 866 5 390 (empty)
4# http.log
51503305207.181153 CfHZwi2X7yU6i6uk27 10.10.2.201 55178 54.243.45.82 80 1 GET pin
6g.chartbeat.net /ping?h=lemonde.fr&p=/&u=D2WApBJg9NeBfqUX-&d=lemonde.fr&g=12231&n=1&f=00001&c=0&x=0&m=0&y=16423&o=1
7340&w=501&j=45&R=1&W=0&I=0&E=0&e=0&r=&b=41665&t=Dv-EDdCijdiMTqbSiDDktxxMlkUO&V=93&i=Le Monde.fr - Actualit\xc3\xa9s
8 et Infos en France et dans le monde&tz=0&sn=1&EE=0&sv=C3gSusB3FL0pFM9d3lXu6j_19yB&_ http://www.lemonde.fr/ 1.1
9 Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/604.1 (KHTML, like Gecko) Version/11.0 Safari/604.1 elementary
10OS/0.4 (Loki) Epiphany/3.18.11 0 43 200 OK - - (empty) - - - -
11 - - FaItHM1BXflwyPgKy4 - image/gif
12
13# dns.log
141503305221.493568 CIkYyh2UsRrISregCg 10.10.2.201 41847 10.10.2.1 53 udp 27035 0.0
1500446 ping.chartbeat.net 1 C_INTERNET 1 A 0 NOERROR F F T T
160 54.243.45.82 45.000000 F
La première étape du traitement des logs Bro est l’analyse des données pour extraire les métadonnées correspondantes. Logstash fournit un filtre puissant pour faire ceci: grok. À l’aide de la documentation Bro, nous pouvons écrire des expressions régulières grok pour analyser les types de log mentionnés ci-dessus. En se basant sur ces échantillons, nous pouvons voir que les trois types de log partagent certains champs communs. Nous commençons par extraire les champs communs comme la date de l’événement, les adresses IP (source et destination) et les ports (source et destination). Les expressions régulières grok suivantes définissent la structure des logs Bro que nous avons énumérés ci-dessus.
1LOG_HEADER %{BASE16FLOAT:bro_ts}\t(-|%{DATA:cuid})\t(-|%{IP:src_ip})\t(-|%{INT:src_port})\t(-|%{IP:dst_ip})\t(-|%{INT:dst_port})\t%{GREEDYDATA:bro_message}
2
3CONN (-|%{DATA:proto})\t(-|%{DATA:service})\t(-|%{BASE16FLOAT:conn_duration})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{DATA:conn_state})\t(-|%{DATA:local_orig})\t(-|%{DATA:local_resp})\t(-|%{INT:missing_bytes})\t(-|%{DATA:conn_history})\t(-|%{INT:orig_pkts})\t(-|%{INT:orig_ip_bytes})\t(-|%{INT:resp_pkts})\t(-|%{INT:resp_ip_bytes})\t%{GREEDYDATA:tunnel_parents}
4
5HTTP %{INT:trans_depth}\t(-|%{DATA:http_method})\t(-|%{DATA:domain})\t(-|%{DATA:http_uri})\t(-|%{DATA:referer})\t(-|%{DATA:http_version})\t(-|%{DATA:user_agent})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{INT:http_resp_code})\t(-|%{DATA:http_resp_msg})\t(-|%{INT:http_info_code})\t(-|%{DATA:http_info_msg})\t(-|%{DATA:http_tags})\t(-|%{DATA:username})\t(-|%{DATA:http_pwd})\t(-|%{DATA:http_proxied})\t(-|%{DATA:sent_fuids})\t(-|%{DATA:sent_file_name})\t(-|%{DATA:sent_mime_types})\t(-|%{DATA:resp_fuids})\t(-|%{DATA:resp_file_name})\t(-|%{GREEDYDATA:resp_mime_types})
6
7DNS %{DATA:proto}\t(-|%{DATA:dns_trans_id})\t(-|%{BASE16FLOAT:conn_duration})\t%{DATA:domain}\t(-|%{INT:dns_qclass})\t%{DATA:query_class}\t(-|%{INT:dns_qtype})\t%{DATA:dns_query_type}\t(-|%{INT:dns_rcode})\t%{DATA:dns_resp_code}\t%{DATA:dns_aa}\t%{DATA:dns_tc}\t%{DATA:dns_rd}\t%{DATA:dns_ra}\t%{DATA:dns_z}\t(-|%{DATA:dns_answers})\t(-|%{DATA:dns_answers_ttls})\t%{GREEDYDATA:dns_rejected}
L’étape suivante, après la définition des expressions réguliers grok, est la définition des filtres. Ces filtres sont des scripts qui appliquent les expressions régulières grok aux données reçus et les enrichissent avec des méta-données.
1filter {
2 if [type] == "logstash-bro" and [logtype] == "broconn" {
3 grok {
4 patterns_dir => ["/etc/logstash/patterns"]
5 match => { "message" => "%{LOG_HEADER}" }
6 }
7 if [bro_message] {
8 grok {
9 patterns_dir => ["/etc/logstash/patterns"]
10 match => { "bro_message" => "%{CONN}" }
11 }
12 mutate {
13 replace => [ "message", "%{bro_message}" ]
14 remove_field => [ "bro_message" ]
15 }
16 }
17 mutate {
18 convert => { "dst_port" => "integer" }
19 ...
20 }
21 ...
22 if [bro_ts] {
23 date {
24 match => [ "bro_ts", "UNIX"]
25 timezone => "UTC"
26 }
27 mutate { remove_field => [ "bro_ts" ] }
28 }
29 }
30}
Dans l’extrait de code ci-dessus, nous appliquons le filtre grok
aux log de connexion Bro en utilisant l’expression régulière CONN
que nous avons défini précédemment. Nous utilisons également le filtre mutate
pour convertir le port de destination de la connection (dst_port
) en nombre. Cela indique à Elasticsearch de traiter ce champ comme un entier durant son processus d’indexation. Ceci va permettre l’application de certaines fonctionnalité de recherche et/ou d’aggregation sur ce champ.
Un autre exemple de plugins Logstash est le filtre Ruby. En utilisant ce filtre, nous pouvons transformer ou enrichir les données traversant la pipeline à l’aide d’un script Ruby. Dans l’exemple des logs DNS, le champ dns_answers
contient une liste d’attributs (par exemple IP) associés au nom de domaine en question. L’objectif ici est d’extraire le ou les IP associés au nom de domaine et les sauvegarder dans un nouveau champ.
1if [dns_answers] {
2 mutate {
3 split => { "dns_answers" => "," }
4 }
5 ruby {
6 init => "require 'resolv'"
7 code => "
8 aa_list = event.get('dns_answers')
9 ll = aa_list.length
10 d_ips = []
11 (1..ll).each do |i|
12 vv = aa_list[i-1]
13 if vv =~ Resolv::IPv4::Regex || vv =~ Resolv::IPv6::Regex
14 d_ips.push(vv)
15 end
16 end
17 if d_ips.length > 0
18 event.set('domain_ips', d_ips)
19 end
20 "
21 }
22}
Dans l’extrait de code ci-dessus, nous utilisons d’abord l’opérateur split
du filtre mutate
pour transformer le champ dns_answers
en une liste. Ensuite, nous utilisons un script ruby pour parcourir cette liste et en extraire les addresses IP. Ces addresses sont ensuite ajoutés à un nouveau champ appelé domain_ips
.
Notre dernier exemple des filtres Logstash est le filtre geoip
. Avec ce filtre, nous pouvons enrichir nos données avec les coordonnés géographique associes aux addresses IP. Logstash contient par défaut la base de données GeoLite2 City de Maxmind. Pour obtenir une version plus à jour ou plus détailler (payante), vous avez la possibilité de télécharger la base de données et de configurer le filtre pour l’utiliser.
1if [src_ip] and ![src_location] {
2 if [src_ip] !~ /^10\./ and [src_ip] !~ /^192\.168\./ and [src_ip] !~ /^172\.(1[6-9]|2[0-9]|3[0-1])\./ {
3 geoip {
4 source => "src_ip"
5 target => "src_geoip"
6 database => "/etc/logstash/vendors/GeoLite2-City.mmdb"
7 add_field => [ "src_location", "%{[src_geoip][longitude]}" ]
8 add_field => [ "src_location", "%{[src_geoip][latitude]}" ]
9 }
10 mutate {
11 convert => [ "src_location", "float" ]
12 }
13 }
14}
Dans l’exemple ci-dessus, nous ajoutons deux nouveaux attributs: src_geoip
et src_location
. Le premier est un objet JSON qui contient les données générés par le filtre geoip
, en particulier le nom du pays et le nom de la ville entre autre. Le second attribut contient la longitude et la latitude correspondant à la location de l’adresse IP. Ce champ est essentiellement utilisé pour le type de données Elasticsearch geo_point
. Ce choix va nous permettre d’effectuer des agrégations et recherche à base de distance géographique. Pour activer cette fonctionnalités, nous mettons à jour le schema de notre index Elasticsearch pour spécifier que le champ src_location
est de type geo_point
.
1 "src_location" : { "type": "geo_point"},
Ceci conclut notre présentation sur l’utilisation de Logstash pour construire un pipeline de traitement des données. Nous avons utilisé ici comme exemple de données les logs générés par le système de detection d’intrusion réseaux Bro. Notre objectif ici était de donner un aperçu des capacités de Logstash comme outil ETL. Vous trouverez plus de détails sur les scripts et les configurations présentés ici dans notre répertoire GitHub.