Logstash


Building a data pipeline for Bro IDS logs

In this blog post, the second in our series about the ELK stack, we present an introduction to Logstash. Logstash is a data processing pipeline that provides the components to ingest data from a variety of sources, to transform/enrich that data, and finally to send it to a data store or another processing pipeline. With its modular architecture, Logstash offers a robust framework to easily build such a pipeline. As a showcase, we will use the logs generated by the Bro IDS as the input for our data processing pipeline.

The Logstash installation is straightforward: official binary packages are available for the major Linux distributions. The only requirement is Java, either the open-source OpenJDK or the official Oracle distribution. After the installation, depending on your use case, you can modify the two following files:

  • LOGSTASH_INSTALL_PATH/jvm.options: to modify (increase) the Java heap size.
  • LOGSTASH_INSTALL_PATH/logstash.yml: to modify Logstash processing options such as the number of workers or the batch size (the number of events a single worker will process); see the example after this list.
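For illustration, here is a minimal sketch of what such a tuning could look like. The heap size and the worker/batch values below are assumptions to adapt to your own hardware; only the setting names come from the stock Logstash configuration files.

# LOGSTASH_INSTALL_PATH/jvm.options (assumed values)
-Xms2g
-Xmx2g

# LOGSTASH_INSTALL_PATH/logstash.yml (assumed values)
pipeline.workers: 4
pipeline.batch.size: 250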

After the Logstash installation, the next step is the configuration of the data pipeline. Before getting into the details of the configuration, it is important to note that in the Logstash jargon, an event is a data object (JSON) received through one of the input plugins. This object can be transformed by one or more filter plugins and shipped to another processing queue or data store through an output plugin.
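As a minimal sketch of this input/filter/output structure (an illustration only, not part of our Bro pipeline), the following configuration reads lines from stdin, enriches each event with an extra field, and prints the resulting event object:

input {
  stdin { }
}
filter {
  mutate {
    # hypothetical field, added only to show an enrichment step
    add_field => { "pipeline_stage" => "demo" }
  }
}
output {
  # print the full event object, useful for debugging a pipeline
  stdout { codec => rubydebug }
}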

Input

We start with the data collection component, the input plugins. Out of the box, Logstash provides multiple input plugins. These plugins cover simple operations like reading from a file or listening on a TCP/UDP port, as well as reading from messaging/queuing systems like Redis or Kafka, or from a database through a JDBC interface. The full list of supported input plugins is available in the official Logstash documentation.
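For instance, a pipeline running on the Bro host itself could read the connection log directly with the file input plugin; the snippet below is only a sketch of this alternative, with an assumed log path:

input {
  file {
    # assumed path to the Bro connection log on the local host
    path => "/opt/bro/logs/current/conn.log"
    # read the file from the beginning on the first run
    start_position => "beginning"
  }
}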

In our showcase, however, we use the Filebeat lightweight log shipper to read the Bro-generated log files and send them to the Logstash Beats input plugin.

input {
	beats {
		port => 5044
	}
}

At the log source, the following Filebeat input configuration is used to send the Bro logs to our Logstash indexer.

- input_type: log
  paths:
    - /opt/bro/logs/current/conn.log
  exclude_lines: ['^#']
  fields:
    logtype: broconn
  document_type: logstash-bro
  fields_under_root: true

Here we add some extra fields to the generated event object. The logtype field determines which filter/parsing rules will be applied to the log message.

Output

For the pipeline output, Logstash provides multiple choices. In our showcase, we are using the Elasticsearch output plugin to store the Bro logs. These logs are stored in a dynamically named index based on the type and the timestamp (date) of the event. In the following example, the Bro logs will be stored in an index named logstash-bro-2017.09.06.

output {
  elasticsearch {
    hosts => ["elastic-hostname"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
}

Filters

After defining the input and output of our data processing pipeline, the final step is the definition of the filters that will be applied to the data. Logstash offers a variety of ready-to-use filters to parse the data and extract metadata, whether the parsing is based on regular expressions, CSV, or key/value pairs. In addition, some of these filters can enrich the data with geographical location information, and with the Logstash ruby filter you can apply arbitrary Ruby code to the data.
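As a quick illustration of the key/value case (a sketch unrelated to the Bro pipeline, with an assumed message format), the kv filter can split a query-string-like message such as "user=bob&action=login" into individual fields:

filter {
  kv {
    # parse the raw message field
    source => "message"
    # pairs are separated by '&', keys and values by '='
    field_split => "&"
    value_split => "="
    # store the extracted keys under a dedicated object
    target => "params"
  }
}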

As mentioned above, we are using the Bro logs as an example to demonstrate the Logstash capabilities. In particular, we are using the following types of logs from Bro:

  • Conn: the network flow logs, showing a summary of the IP connections seen in the network.
  • HTTP: the HTTP connection logs, showing a summary of the observed HTTP traffic.
  • DNS: the DNS traffic logs, showing a summary of the DNS queries and responses.

The following are samples of the generated Bro logs:

# conn.log
1503305215.555986       CN77ln4RMSpStQV2X9      10.10.2.201     34240   145.239.49.27   80      tcp     http    2.152318   606     186     SF      T       F       0       ShADadfF        6       866     5       390     (empty)

# http.log
1503305207.181153       CfHZwi2X7yU6i6uk27      10.10.2.201     55178   54.243.45.82    80      1       GET     ping.chartbeat.net /ping?h=lemonde.fr&p=/&u=D2WApBJg9NeBfqUX-&d=lemonde.fr&g=12231&n=1&f=00001&c=0&x=0&m=0&y=16423&o=1340&w=501&j=45&R=1&W=0&I=0&E=0&e=0&r=&b=41665&t=Dv-EDdCijdiMTqbSiDDktxxMlkUO&V=93&i=Le Monde.fr - Actualit\xc3\xa9s et Infos en France et dans le monde&tz=0&sn=1&EE=0&sv=C3gSusB3FL0pFM9d3lXu6j_19yB&_    http://www.lemonde.fr/  1.1     Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/604.1 (KHTML, like Gecko) Version/11.0 Safari/604.1 elementaryOS/0.4 (Loki) Epiphany/3.18.11  0       43      200     OK      -       -       (empty) -       -       -       -       -       -       FaItHM1BXflwyPgKy4      -       image/gif

# dns.log
1503305221.493568       CIkYyh2UsRrISregCg      10.10.2.201     41847   10.10.2.1       53      udp     27035   0.000446   ping.chartbeat.net      1       C_INTERNET      1       A       0       NOERROR F       F       T       T       0       54.243.45.82    45.000000       F

The first step of the Bro log processing is parsing the data to extract the corresponding metadata. Logstash offers a powerful filter plugin to do this: the grok filter. Using the official Bro documentation, we can write simple grok patterns to parse the above log types. Looking at these log samples, we can see that the three types share some common fields/headers, so we start by extracting the common fields like the event timestamp and the source/destination IP and port. The following grok patterns define the regular expressions for the Bro log examples listed above.

LOG_HEADER %{BASE16FLOAT:bro_ts}\t(-|%{DATA:cuid})\t(-|%{IP:src_ip})\t(-|%{INT:src_port})\t(-|%{IP:dst_ip})\t(-|%{INT:dst_port})\t%{GREEDYDATA:bro_message}

CONN (-|%{DATA:proto})\t(-|%{DATA:service})\t(-|%{BASE16FLOAT:conn_duration})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{DATA:conn_state})\t(-|%{DATA:local_orig})\t(-|%{DATA:local_resp})\t(-|%{INT:missing_bytes})\t(-|%{DATA:conn_history})\t(-|%{INT:orig_pkts})\t(-|%{INT:orig_ip_bytes})\t(-|%{INT:resp_pkts})\t(-|%{INT:resp_ip_bytes})\t%{GREEDYDATA:tunnel_parents}

HTTP %{INT:trans_depth}\t(-|%{DATA:http_method})\t(-|%{DATA:domain})\t(-|%{DATA:http_uri})\t(-|%{DATA:referer})\t(-|%{DATA:http_version})\t(-|%{DATA:user_agent})\t(-|%{INT:bytes_sent})\t(-|%{INT:bytes_received})\t(-|%{INT:http_resp_code})\t(-|%{DATA:http_resp_msg})\t(-|%{INT:http_info_code})\t(-|%{DATA:http_info_msg})\t(-|%{DATA:http_tags})\t(-|%{DATA:username})\t(-|%{DATA:http_pwd})\t(-|%{DATA:http_proxied})\t(-|%{DATA:sent_fuids})\t(-|%{DATA:sent_file_name})\t(-|%{DATA:sent_mime_types})\t(-|%{DATA:resp_fuids})\t(-|%{DATA:resp_file_name})\t(-|%{GREEDYDATA:resp_mime_types})

DNS %{DATA:proto}\t(-|%{DATA:dns_trans_id})\t(-|%{BASE16FLOAT:conn_duration})\t%{DATA:domain}\t(-|%{INT:dns_qclass})\t%{DATA:query_class}\t(-|%{INT:dns_qtype})\t%{DATA:dns_query_type}\t(-|%{INT:dns_rcode})\t%{DATA:dns_resp_code}\t%{DATA:dns_aa}\t%{DATA:dns_tc}\t%{DATA:dns_rd}\t%{DATA:dns_ra}\t%{DATA:dns_z}\t(-|%{DATA:dns_answers})\t(-|%{DATA:dns_answers_ttls})\t%{GREEDYDATA:dns_rejected}

With the patterns defined, the next step is to write the filter scripts that apply the grok patterns to the data and enrich it with the extracted fields.

filter {
  if [type] == "logstash-bro" and [logtype] == "broconn" {
    grok {
      patterns_dir => ["/etc/logstash/patterns"]
      match => { "message" => "%{LOG_HEADER}" }
    }
    if [bro_message] {
      grok {
        patterns_dir => ["/etc/logstash/patterns"]
        match => { "bro_message" => "%{CONN}" }
      }
      mutate {
        replace => [ "message", "%{bro_message}" ]
        remove_field => [ "bro_message" ]
      }
    }
    mutate {
      convert => { "dst_port" => "integer" }
      ...
    }
    ...
    if [bro_ts] {
      date {
        match => [ "bro_ts", "UNIX"]
        timezone => "UTC"
      }
      mutate { remove_field => [ "bro_ts" ] }
    }
  }
}

In the snippet above, we apply the grok filter to the Bro connection logs using the CONN pattern we defined earlier. We also use the mutate filter to convert dst_port to an integer. This tells Elasticsearch to index the field as a number, which enables numeric operations such as range/interval searches.
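For example, once dst_port is indexed as a number, a range query against the index (a sketch, assuming the logstash-bro-* index pattern used above) can select connections to privileged ports:

GET logstash-bro-*/_search
{
  "query": {
    "range": {
      "dst_port": { "gte": 0, "lte": 1023 }
    }
  }
}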

A powerful example of Logstash plugins is the ruby filter. Using this filter, we can transform or enrich the event data with a Ruby script. In the case of the DNS logs, the dns_answers field contains a list of resource descriptions (e.g. IPs) associated with the queried domain. In our example, we want to extract the IPs associated with the domain and store them in a new field.

if [dns_answers] {
	mutate {
		split => { "dns_answers" => "," }
	}
	ruby {
		init => "require 'resolv'"
		code => "
			aa_list = event.get('dns_answers')
			ll = aa_list.length
			d_ips = []
			(1..ll).each do |i|
				vv = aa_list[i-1]
				if vv =~ Resolv::IPv4::Regex || vv =~ Resolv::IPv6::Regex
					d_ips.push(vv)
				end
			end
			if d_ips.length > 0
				event.set('domain_ips', d_ips)
			end
		"
	}
}

In the snippet above, we first use the split operation of the mutate filter to transform the dns_answers field into a list. We then use a Ruby script to loop over this list and extract the IPs from it. These IPs are added to a new field called domain_ips.

Our last showcase example is the geoip filter. With this filter, we can enrich our data with the geographical location of an IP. Logstash comes bundled with Maxmind's GeoLite2 City database; to get a more up-to-date version, you can download the database yourself and configure the filter to use it.

if [src_ip] and ![src_location] {
	if [src_ip] !~ /^10\./ and [src_ip] !~ /^192\.168\./ and [src_ip] !~ /^172\.(1[6-9]|2[0-9]|3[0-1])\./ {
		geoip {
			source => "src_ip"
			target => "src_geoip"
			database => "/etc/logstash/vendors/GeoLite2-City.mmdb"
			add_field => [ "src_location", "%{[src_geoip][longitude]}" ]
			add_field => [ "src_location", "%{[src_geoip][latitude]}"  ]
		}
		mutate {
			convert => [ "src_location", "float" ]
		}
	}
}

In the example above, we add two new fields: src_geoip and src_location. The first is a JSON object that stores the output of the geoip filter, such as the country and city names. The second stores the longitude and latitude of the IP location. This field is intended for the Elasticsearch geo_point data type, which enables geo-distance aggregations and queries. To enable these features, we update the Elasticsearch mapping to specify that the src_location field is of type geo_point:

"src_location" : { "type": "geo_point"},
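With this mapping in place, a geo-distance query such as the following sketch (assumed index pattern and coordinates) returns the events whose source IP is located within a given radius:

GET logstash-bro-*/_search
{
  "query": {
    "bool": {
      "filter": {
        "geo_distance": {
          "distance": "200km",
          "src_location": { "lat": 48.85, "lon": 2.35 }
        }
      }
    }
  }
}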

This concludes our presentation of how to build a data processing pipeline for Bro IDS logs using Logstash. Our goal here was to give a preview of the capabilities of Logstash as an ETL tool, using the logs generated by the Bro IDS as the data source. More details about the scripts and the ELK configuration used here can be found in our GitHub repository.