Real-time analysis of event logs with Elasticsearch

I'm collecting an event log entry every time a property of a device changes. For this purpose I decided to use:

  1. Logstash - receives logs in JSON format from my IoT agent application,
  2. Elasticsearch - stores the data (logs),
  3. Kibana - visualises the data.

The JSON with logs is sent at regular intervals and has the following form:

{"deviceEventLogs":[{"date":"16:16:39 31-08-2016","locationName":"default","property":"on","device":"Lamp 1","value":"false","roomName":"LivingRoom"}, ...]}
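The batched payload has to be split into one document per event before it looks like the stored entry shown next (ISO `date`, boolean `value_boolean`). In practice Logstash's `split` and `date` filters can do this inside the pipeline; the sketch below only illustrates the transformation. It assumes the agent's `"HH:MM:SS DD-MM-YYYY"` timestamps are UTC (the payload carries no time zone) and that only `"true"`/`"false"` values map to `value_boolean`; `normalize_events` is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone


def normalize_events(payload: str) -> list[dict]:
    """Flatten the agent's {"deviceEventLogs": [...]} payload into per-event docs."""
    docs = []
    for event in json.loads(payload)["deviceEventLogs"]:
        # Assumption: agent timestamps are UTC, formatted "HH:MM:SS DD-MM-YYYY".
        when = datetime.strptime(event["date"], "%H:%M:%S %d-%m-%Y")
        iso = when.replace(tzinfo=timezone.utc).isoformat(timespec="milliseconds")
        doc = {
            "date": iso.replace("+00:00", "Z"),  # e.g. 2016-08-31T16:16:39.000Z
            "device": event["device"],
            "property": event["property"],
            "locationName": event["locationName"],
            "roomName": event["roomName"],
        }
        raw = event["value"].strip()
        if raw in ("true", "false"):
            # Mirror the "value_boolean" field of the stored document.
            doc["value_boolean"] = raw == "true"
        else:
            doc["value"] = raw  # hypothetical: non-boolean values kept as strings
        docs.append(doc)
    return docs
```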

An example of a single event entry in Elasticsearch looks as follows:

 {
            "_index": "logstash-2016.08.25",
            "_type": "on",
            "_id": "AVbDYQPq54WlAl_UD_yg",
            "_score": 1,
            "_source": {
               "@version": "1",
               "@timestamp": "2016-08-25T20:25:28.750Z",
               "host": "127.0.0.1",
               "headers": {
                  "request_method": "PUT",
                  "request_path": "/deviceEventLogs",
                  "request_uri": "/deviceEventLogs",
                  "http_version": "HTTP/1.1",
                  "content_type": "application/json",
                  "http_user_agent": "Java/1.8.0_91",
                  "http_host": "127.0.0.1:31311",
                  "http_accept": "text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2",
                  "http_connection": "keep-alive",
                  "content_length": "34861"
               },
               "date": "2016-08-08T14:48:11.000Z",
               "device": "Lamp 1",
               "property": "on",
               "locationName": "default",
               "roomName": "LivingRoom",
               "value_boolean": true
            }
 }

My goal is to create a website with a dashboard showing analyzed data within a reasonable time (a delay of several minutes should be acceptable), e.g.:

  • showing the history of energy consumption and predicting consumption in the future
  • detecting anomalies in energy consumption or in other factors like lighting or heating usage
  • showing recommendations based on unsophisticated statistics, e.g. "you can move a given device from location1 to location2 because it's more needed there (it is used more intensively than in the other place)", etc.

While the last point is quite trivial (I can use a simple query or aggregation in Elasticsearch and then compare the result to a threshold value), the first two points require in-depth analysis such as machine learning or data mining.
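The threshold comparison described for the third point can be sketched as follows. "Usage intensity" is approximated here as events per distinct device in a location, and the 2x ratio is an arbitrary, hypothetical threshold; `suggest_move` is an illustrative name. In Elasticsearch roughly the same numbers could come from a `terms` aggregation on `locationName` with a `cardinality` sub-aggregation on `device`.

```python
from collections import defaultdict


def suggest_move(events, threshold=2.0):
    """Return (from_location, to_location) or None.

    Hypothetical rule: intensity = events per distinct device in a location.
    If the busiest location is at least `threshold` times as intensive as the
    quietest one, suggest moving a device from the quietest to the busiest.
    """
    counts = defaultdict(int)
    devices = defaultdict(set)
    for e in events:
        loc = e["locationName"]
        counts[loc] += 1
        devices[loc].add(e["device"])
    if len(counts) < 2:
        return None  # nothing to compare against
    intensity = {loc: counts[loc] / len(devices[loc]) for loc in counts}
    busiest = max(intensity, key=intensity.get)
    quietest = min(intensity, key=intensity.get)
    if intensity[busiest] >= threshold * intensity[quietest]:
        return quietest, busiest
    return None
```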

For now the system is equipped with around 50 devices, each updating its status every 10 seconds on average. In the future the number of devices may increase to 50,000. Assuming 100 bytes per event log, this comes to roughly 15 terabytes of data in Elasticsearch per year.
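The 15 TB figure follows from devices × events per year × bytes per event. The sketch below reproduces it and the 10x-smaller case (5,000 devices). It counts raw payload only: the stored document shown above, with its copied HTTP headers, is well over 100 bytes, and index overhead and replicas add more, so treat the result as a lower bound.

```python
SECONDS_PER_YEAR = 365 * 24 * 3600  # 31,536,000 (ignores leap years)


def yearly_storage_bytes(devices, seconds_between_events, bytes_per_event):
    """Raw yearly volume: devices * events per device per year * bytes per event."""
    events_per_year = devices * SECONDS_PER_YEAR / seconds_between_events
    return events_per_year * bytes_per_event


print(round(yearly_storage_bytes(50_000, 10, 100) / 1e12, 2))  # 15.77 (TB)
print(round(yearly_storage_bytes(5_000, 10, 100) / 1e12, 2))   # 1.58 (TB)
```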

The general question is: what would be a reasonable solution / technology / architecture for such a system?

  1. Is it a reasonable start to store all my logs in Elasticsearch?
  2. I am considering the es-hadoop library to use Elasticsearch together with Apache Spark, so that I can process my data with MLlib in Spark. Is that a reasonable direction to go?
  3. Can I store all my data only in Elasticsearch and just use Spark and MLlib for the in-depth analysis, or should I consider implementing a so-called "Lambda Architecture" with Elasticsearch as the Speed Layer? I've read a bit about various configurations where Kafka and Apache Storm were used, but I'm not really sure I need them. Since the project should be done within one month and I'm a beginner, I'm worried about the complexity and hence the time needed for such an implementation.
  4. What if the data load were 10x smaller (around 1.5 terabytes per year)? Would your answer be the same?

Answers


This is a very elaborate question, let me try to break it down:

Questions that you should think about

  • What is the end-to-end latency for your data to be available for queries? Do you need it in real time, or are you okay with delays?
  • How much data loss are you willing to tolerate?
  • What accuracy do you need from the analytics/ML algorithms you are looking at? Do you need highly accurate results, or are you okay with some inaccuracies?
  • Do you need results only when they are complete, or do you need some kind of speculative results?

These questions, along with the usual ones such as space constraints and latency as the data load increases, should help you determine the right solution.

Generally, these problems can be viewed as Ingestion -> Processing -> Presentation.

Ingestion - Need for a Message Bus

Generally, people opt for a message bus like Kafka to handle back-pressure from slow downstream consumers and to provide reliability (by persisting to disk) to prevent data loss. Kafka also has good community support in terms of integrations like Spark Streaming, Druid firehose support, ES plugins, etc.

Processing - Need for a scalable compute layer

This is where you need to decide on things like real-time vs. batch processing, applicable data-loss, accurate vs speculative results, etc. Read Tyler Akidau's article on streaming at https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101 for a detailed explanation.

People choose Spark Streaming for real-time use cases, while a simple MapReduce (M/R) job should do the trick for batch jobs. If you are planning on streaming jobs, then windowing and sessionization of events can complicate things further.
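To make "windowing" concrete, here is a minimal tumbling-window count in plain Python: each event is assigned to a fixed, non-overlapping 5-minute window by its event time. It assumes a hypothetical epoch-seconds `timestamp` field and ignores late or out-of-order data, which is exactly the part that streaming engines such as Spark Streaming have to handle and that makes streaming jobs harder.

```python
from collections import defaultdict


def tumbling_counts(events, window_seconds=300):
    """Count events per (device, window_start) using fixed, non-overlapping windows.

    Each event needs "device" and a hypothetical epoch-seconds "timestamp".
    Late or out-of-order events are not treated specially in this sketch.
    """
    counts = defaultdict(int)
    for e in events:
        ts = int(e["timestamp"])
        window_start = ts - ts % window_seconds  # align to the window boundary
        counts[(e["device"], window_start)] += 1
    return dict(counts)
```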

Presentation - Need for interactive queries and fast responses

This is where the front-facing app integrates, so it makes sense to pick a tool that is ideally suited to the kind of queries expected and the accuracy of responses needed.

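Tools like ES perform extremely well for searching, filtering and faceting, but are less suited to complex mathematical aggregations. ES does offer approximate distinct counts through its `cardinality` aggregation (based on HyperLogLog++), but it does not let you store pre-aggregated probabilistic sketches at ingestion time the way Druid does.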
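To show what such a probabilistic structure is, here is a minimal plain HyperLogLog: each item's 64-bit hash picks one of 2^p registers, and each register keeps the longest run of leading zeros seen. It is an illustration only, not the HyperLogLog++ used by ES (no sparse mode or bias tables) and not Druid's implementation. With p = 12 the expected standard error is about 1.04/sqrt(4096), roughly 1.6%, using only 4,096 small registers regardless of how many items are added.

```python
import hashlib
import math


class HyperLogLog:
    """Minimal HyperLogLog with 2**p registers (64-bit hashes from SHA-1)."""

    def __init__(self, p: int = 12):
        self.p = p
        self.m = 1 << p
        self.registers = [0] * self.m
        # Bias-correction constant alpha_m, valid for m >= 128.
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item: str) -> None:
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)               # first p bits choose the register
        rest = h & ((1 << (64 - self.p)) - 1)  # remaining 64 - p bits
        # Rank = 1-based position of the leftmost 1-bit in `rest`.
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def count(self) -> float:
        estimate = self.alpha * self.m * self.m / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:
            # Small-range correction: fall back to linear counting.
            estimate = self.m * math.log(self.m / zeros)
        return estimate
```

Adding the same device ID twice does not change the estimate, which is why sketches like this can be merged and stored per time bucket.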

Retrofit

Now you have to map the requirements you have with each of the layers above.

showing the history of energy consumption and predicting consumption in the future

detecting anomalies in energy consumption or other factors like lights or heating usage

You clearly need Machine Learning libraries, as you have mentioned. Spark with its MLlib support is super-awesome.
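Before reaching for MLlib, a simple statistical baseline is useful for checking anomaly results. The sketch below flags a reading that is more than three standard deviations away from the mean of the preceding 24 readings (a rolling z-score). This is a deliberately simple technique swapped in for illustration, not an MLlib algorithm; it assumes a hypothetical series of hourly consumption values, which would first have to be derived from the property-change events.

```python
import statistics


def zscore_anomalies(readings, window=24, threshold=3.0):
    """Return indices whose value deviates from the mean of the previous
    `window` readings by more than `threshold` population standard deviations."""
    anomalies = []
    for i in range(window, len(readings)):
        history = readings[i - window:i]
        mean = statistics.fmean(history)
        stdev = statistics.pstdev(history)
        if stdev == 0:
            # Flat history: any change at all counts as an anomaly.
            if readings[i] != mean:
                anomalies.append(i)
            continue
        if abs(readings[i] - mean) / stdev > threshold:
            anomalies.append(i)
    return anomalies
```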

showing recommendations based on unsophisticated statistics, e.g. "you can move a given device from location1 to location2 because it's more needed there (it is used more intensively than in the other place)", etc.

You could even do this using MLlib on Spark and have the recommendations pumped into a separate index in ES or even into a Kafka topic, from which you can move them further into HDFS or ES. Be careful about garbage-collecting old recommendations here, as they can lead to data explosion; you need to be aggressive about retention. Also, computing recommendations beforehand helps you do reactive things like alerting and push notifications, and even a query from a UI will be faster.
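If recommendations (or the raw logs) are written to time-based indices like the `logstash-YYYY.MM.DD` ones shown above, aggressive retention can be as simple as dropping whole indices older than a cutoff, which is much cheaper in Elasticsearch than deleting individual documents. The helper below is hypothetical and only selects the names; in practice a tool such as Elasticsearch Curator (or index lifecycle management in newer versions) does this job.

```python
from datetime import date, datetime, timedelta


def indices_to_delete(index_names, today: date, keep_days=30, prefix="logstash-"):
    """Return daily indices (e.g. logstash-2016.08.25) older than `keep_days`."""
    cutoff = today - timedelta(days=keep_days)
    expired = []
    for name in index_names:
        if not name.startswith(prefix):
            continue  # leave unrelated indices alone
        try:
            day = datetime.strptime(name[len(prefix):], "%Y.%m.%d").date()
        except ValueError:
            continue  # name has the prefix but is not a daily index
        if day < cutoff:
            expired.append(name)
    return sorted(expired)
```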

Assuming 100 bytes per event log, this comes to roughly 15 terabytes of data in Elasticsearch per year.

These are normal provisioning problems with any storage system. You can optimise here by calculating materialised views for historical data, but you can leave that decision until later, as doing it now may be premature optimisation. You would be better off measuring storage and query latency to begin with and then doing a retroactive capacity analysis.
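As an example of such a materialised view, a lamp's raw "on" events can be rolled up into seconds switched on per hour and stored in a small separate index; dashboards then query the rollup instead of scanning raw events, and multiplying by a device's rated power (a hypothetical extra attribute) gives an energy estimate. The input format below, sorted `(epoch_seconds, is_on)` pairs per device plus an end time that closes the last interval, is an assumption for the sketch.

```python
from collections import defaultdict

HOUR = 3600


def hourly_on_seconds(transitions, end_ts):
    """Roll up one device's on/off transitions into {hour_start: seconds_on}.

    transitions: list of (epoch_seconds, is_on), sorted by time (hypothetical).
    end_ts: timestamp that closes the final interval.
    """
    rollup = defaultdict(int)
    next_times = [ts for ts, _ in transitions[1:]] + [end_ts]
    for (ts, is_on), next_ts in zip(transitions, next_times):
        if not is_on:
            continue
        start = ts
        # Split the "on" interval at hour boundaries.
        while start < next_ts:
            hour_start = start - start % HOUR
            stop = min(next_ts, hour_start + HOUR)
            rollup[hour_start] += stop - start
            start = stop
    return dict(rollup)
```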

Is it a reasonable start to store all my logs in Elasticsearch?

Very much so, considering your use case. But if you use Spark Streaming/MLlib or a batch MR job, you could even use simpler ("dumb") data stores, as most of the computation happens beforehand.

I am considering the es-hadoop library to use Elasticsearch together with Apache Spark, so that I can process my data with MLlib in Spark. Is that a reasonable direction to go?

Looks like you have decided on batch processing, in which case you can use standard MR or Spark batch jobs along with MLlib. If you need real-time results, you need something like Kafka plus Spark Streaming. If you can tolerate data loss, you can be aggressive about retention, both in storage and in Spark when you choose windowing/sliding intervals. If you are okay with inaccurate results, you can use probabilistic data structures (like Bloom filters or HyperLogLog, which Druid supports) to represent them.
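A Bloom filter is the simplest example of trading accuracy for space: it answers "have I seen this item?" with no false negatives but occasional false positives. The minimal sketch below could, hypothetically, track which device IDs have reported today in a fixed 1 KB bit array. It derives its k bit positions from one SHA-256 digest by double hashing; with 500 devices, 8,192 bits and k = 4, the theoretical false-positive rate is about (1 - e^(-4*500/8192))^4, roughly 0.2%.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter over an m-bit array with k hash positions per item."""

    def __init__(self, m_bits: int = 8192, k: int = 4):
        self.m = m_bits
        self.k = k
        self.bits = bytearray((m_bits + 7) // 8)

    def _positions(self, item: str):
        digest = hashlib.sha256(item.encode()).digest()
        # Double hashing: k positions from two 64-bit halves of one digest.
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        # False means "definitely not added"; True means "probably added".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))
```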

Can I store all my data only in Elasticsearch and just use Spark and MLlib for the in-depth analysis, or should I consider implementing a so-called "Lambda Architecture" with Elasticsearch as the Speed Layer?

I am not sure whether you could stream data from ES into Spark jobs. And Lambda architecture is over-hyped: it only helps if you know for sure that your real-time layer is inaccurate and you cannot tolerate data loss or inaccuracies. Otherwise a simple Spark Streaming job reading data from Kafka and pumping it into ES should be more than enough. Please consider measuring data loss before you decide on elaborate architectures like Lambda, since the operational costs (duplicate code, more infrastructure to maintain, etc.) are likely to be high.
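One cheap way to measure data loss end to end is to have the agent stamp every event with a per-device, monotonically increasing sequence number and look for gaps in what actually reached ES. The `seq` field does not exist in the current payload; it is a hypothetical addition for this sketch. The loss rate is then the number of missing sequence numbers divided by the number expected between each device's first and last seen event.

```python
from collections import defaultdict


def missing_sequence_numbers(events):
    """Per device, list sequence numbers missing between the first and last seen.

    Assumes a hypothetical per-device, monotonically increasing "seq" field;
    duplicate deliveries are tolerated.
    """
    seen = defaultdict(set)
    for e in events:
        seen[e["device"]].add(e["seq"])
    gaps = {}
    for device, seqs in seen.items():
        missing = sorted(set(range(min(seqs), max(seqs) + 1)) - seqs)
        if missing:
            gaps[device] = missing
    return gaps
```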

What if the data load were 10x smaller (around 1.5 terabytes per year)? Would your answer be the same?

I would still prefer the same architecture, Kafka + Spark Streaming (MLlib) + ES/Druid, as it is easier to implement and easier to maintain.

