ELK


Gestion des logs de sécurité avec la suite ELK

ELK est un acronyme pour Elasticsearch, Logstash et Kibana. C’est une plateforme libre (Open Source) pour l’analyse des données offrant des fonctionnalités de recherche, transformation, enrichissement et visualisation des données.

Les principaux composant de la suite ELK sont :

  • Elasticsearch : un moteur de recherche distribuée basé sur Lucene.
  • Logstash : pipeline pour la collecte, transformation, structuration (parsing) et transfert des données.
  • Kibana : une interface intuitive pour la recherche, l’analyse et la présentation des données.
  • Beats : un agent léger et modulaire pour l’acheminement des données vers ELK.

Dans cette série de blogs nous allons présenter la suite ELK et comment l’utiliser pour organiser et analyser vos données. Nous utiliserons comme exemple, les données générées par le système de détection d’intrusion réseaux Bro. Nous allons montrer comment installer un cluster Elasticsearch composé de deux noeuds (nodes). En amont du cluster Elasticsearch, nous déployons un service Logstash pour la transformation des données et un agent de messages (Kafka/Redis). Pour la collecte des logs nous utilisons Filebeat pour la lecture des fichiers de logs Bro.

Comme mentionné ci-dessus, Elasticsearch est un moteur de recherche distribué. C’est aussi une base de données NoSQL orientée documents. Dans la terminologie Elasticsearch, un document est un objet JSON contenant une liste de champs. Dans le monde des base de données relationnelle, un document est similaire à une ligne dans une table SQL. Un index est un ensemble de documents (un tableau en termes SQL). Il est composé de shards (fragments) et de replicas (répliques). Ce que vous devez savoir sur les shards est qu’ils sont les blocs de construction de base d’un index Lucene. En pratique, vous n’aurai pas besoin de travailler avec les shards mais plus tôt les replicas et les index. Les replicas sont utilisés pour augmenter la résilience et les performances de recherche. Par défaut, chaque fragment ou index Elasticsearch aura une réplique. Cette configuration peut toujours être modifiée. Chaque index Elasticsearch inclut un schéma d’indexation (mapping) définissant certaines propriétés de l’index et les champs des documents qu’il contient. Il définit également comment ces champs sont analysés/indexés par Lucene. Dans la terminologie des bases de données relationnelle, un mapping est la définition du schéma de la table. Par défaut, Elasticsearch essaiera toujours de deviner automatiquement le type de données des champs d’un document. Dans la terminologie Elasticsearch, ceci est appelé mapping dynamique. Elasticsearch offre également une REST API pour permettre de définir de nouveaux schéma (mapping) et manipuler les données.

Assez de théories. L’installation d’Elasticsearch est simple. Les binaires (packages) pour toutes les majeures distributions Linux sont disponibles sur [elastic.co] (elastic.code). Dans notre cas, nous installons Elasticsearch sur un système Ubuntu. Les commandes suivantes effectuent le travail. Nous avons besoin de les exécuter sur tous les noeuds (ici 2 noeuds) qui feront partie de notre cluster Elasticsearch.

1wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
2echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-5.x.list
3apt-get update
4apt-get install openjdk-8-jdk elasticsearch

Simple! Après avoir exécuté ces commandes, nous avons installé Elasticsearch avec succès. L’étape suivante consiste à configurer les nœuds du cluster Elasticsearch. Le fichier de configuration principal est /etc/elasticsearch/elasticsearch.yml. Les lignes de configuration qui doivent être ajoutée ou mise à jour sont les suivantes :

1# Identifiant du cluster, doit être le même sur tous les nœuds
2cluster.name: binor-elk
3# Identifiant de la node, doit être unique
4node.name: "node-01"
5# Les interfaces réseau où le service elasticsearch écoutera. Ici nous utiliserons localhost et toutes les interfaces réseau locales.
6network.host: [_local_, _site_]
7# Liste des IP des noeuds du cluster.
8discovery.zen.ping.unicast.hosts:  [ "10.3.0.41", "10.3.0.42" ]

Les changements ci-dessus sont le minimum requis pour démarrer votre cluster. En fonction des capacités physique ou virtuelles (RAM, CPU) des machines de votre cluster, vous pouvez également modifier les paramètres JVM. Vous pouvez augmenter la taille de la mémoire (heap) disponible pour le service Elasticsearch. Vous pouvez également configurer Elasticsearch pour ne pas utiliser la mémoire swap. Plus de détails peuvent être trouvés à 1 et 2

Après les configurations ci-dessus, notre cluster est prêt. Vous pouvez le démarrer en exécutant la commande systemctl start elasticsearch.service sur les deux nœuds du cluster. Pour vérifier que tout fonctionne comme prévu, vous pouvez exécuter la requête HTTP suivante:

1curl -XGET 'localhost:9200/?pretty'

En principe, la réponse doit être similaire à ce qui suit:

 1{
 2  "name" : "node-01",
 3  "cluster_name" : "binor-elk",
 4  "cluster_uuid" : "o6KIJ5o6TNq0sbO3QiDo4A",
 5  "version" : {
 6    "number" : "5.1.1",
 7    "build_hash" : "5395e21",
 8    "build_date" : "2016-12-06T21:36:15.409Z",
 9    "build_snapshot" : false,
10    "lucene_version" : "6.3.0"
11  },
12  "tagline" : "You Know, for Search"
13}

Notre cluster Elasticsearch est prêt à recevoir des données. Mais avant cela, nous allons définir un schéma d’indexation. En particulier, nous voulons nous assurer que les champs contenant des valeurs IP réseaux ou virgule flottante (double) sont correctement gérés par le processus d’indexation Elasticsearch.

 1curl -XPUT http://127.0.0.1:9200/_template/logstash -d '
 2{
 3  "order" : 0,
 4  "template" : "logstash-*",
 5  "settings" : {
 6    "index.number_of_shards" : 5,
 7    "index.number_of_replicas" : 1,
 8    "index.query.default_field" : "message"
 9  },
10  "mappings" : {
11    "_default_" : {
12      "properties" : {
13        "src_ip" : {"type" : "ip"},
14        "dst_ip" : {"type" : "ip"},
15        "conn_duration" : {"type" : "double"}
16      }
17    }
18  },
19  "aliases" : { }
20}
21'

Dans la définition ci-dessus, nous demandons à Elasticsearch de toujours traiter les champs src_ip et dst_ip comme de type IP. Ce dernier est un type de données Elasticsearch pour représenter les valeurs d’adresse réseaux IPv4/6. Il fournit des fonctionnalités de recherche spéciales telles que la recherche sur un intervalle IP ou par sous réseau. Le schéma que nous avons définit ici est basique. Nous y reviendrons plus tard pour le mettre à jour.

Ceci conclut la mise en place de notre cluster Elasticsearch. Dans les posts suivants, nous parlerons de la configuration de Logstash pour le traitement et la transformation des données avant de les acheminer vers Elasticsearch.