The following is a very simple example of event stream processing (using the ESPER engine).
Note: a full working example is available on GitHub: https://github.com/adrianmilne/esper-demo-nuclear
What is Complex Event Processing (CEP)?
Complex Event Processing (CEP) and Event Stream Processing (ESP) are technologies commonly used in event-driven systems. These types of systems consume, and react to, a stream of event data in real time. Typical examples are financial trading, fraud identification and process monitoring systems, where you need to identify, make sense of, and react quickly to emerging patterns in a stream of data events.
Key Components of a CEP system
A CEP system is like your typical database model turned upside down. Whereas a typical database stores data and runs queries against that data, a CEP system stores queries and runs data through them.
To do this it basically needs:
- Data – in the form of ‘Events’
- Queries – using EPL (‘Event Processing Language’)
- Listeners – code that ‘does something’ if the queries return results
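To make the "upside down database" idea concrete, here is a toy sketch in plain Java. It is not Esper and not how Esper works internally; TinyEventEngine and its methods are names made up for this illustration. It only shows the shape: queries and listeners are stored up front, and each event is pushed through them as it arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Toy illustration only (hypothetical class): queries are stored, events flow through them. */
class TinyEventEngine {

    /** A stored query paired with the listener to call when it matches. */
    private static final class Registration {
        final Predicate<Integer> query;
        final Consumer<Integer> listener;

        Registration(Predicate<Integer> query, Consumer<Integer> listener) {
            this.query = query;
            this.listener = listener;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();

    /** Store a query and the listener that should react to matches. */
    void register(Predicate<Integer> query, Consumer<Integer> listener) {
        registrations.add(new Registration(query, listener));
    }

    /** Run one event (here just a temperature reading) through every stored query. */
    void sendEvent(int temperature) {
        for (Registration r : registrations) {
            if (r.query.test(temperature)) {
                r.listener.accept(temperature);
            }
        }
    }

    public static void main(String[] args) {
        TinyEventEngine engine = new TinyEventEngine();
        engine.register(t -> t > 400, t -> System.out.println("Too hot: " + t));
        engine.sendEvent(380); // no stored query matches, nothing happens
        engine.sendEvent(420); // the stored query matches, the listener fires
    }
}
```

A real engine such as Esper adds the hard parts on top of this: queries over windows of time, patterns across several events, and a proper query language.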
A Simple Example – A Nuclear Power Plant
Take the example of a nuclear power station.
Now, this is just an example – so please try and suspend your disbelief if you know something about Nuclear Cores, Critical Temperatures, and the like. It’s just an example. I could have picked equally unbelievable financial transaction data. But …
Monitoring the Core Temperature
Now I don’t know what the core is, or if it even exists in reality, but for this example let’s assume our power station has one, and if it gets too hot, well, very bad things happen.
Let’s also assume that we have temperature gauges (thermometers?) in place which take a reading of the core temperature every second and send the data to a central monitoring system.
What are the requirements?
We need to be warned when 3 types of events are detected:
- MONITOR
- just tell us the average temperature every 10 seconds – for information purposes
- WARNING
- WARN us if we have 2 consecutive temperatures above a certain threshold
- CRITICAL
- ALERT us if we have 4 consecutive events, with the first one above a certain threshold, each subsequent one greater than the last, and the last one more than 1.5 times the first. This is trying to alert us to a sudden, rapidly escalating temperature spike, a bit like the diagram below. And let’s assume this is a very bad thing.
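Before getting to Esper, it can help to spell the WARNING and CRITICAL rules out as ordinary Java. This is only a restatement of the requirements above for a fixed handful of readings; TemperatureRules is a name invented for this sketch, and the thresholds are passed in as parameters rather than fixed.

```java
/** Plain-Java restatement of the WARNING and CRITICAL rules (hypothetical helper class). */
class TemperatureRules {

    /** WARNING: two consecutive readings that are both above the threshold. */
    static boolean isWarning(int first, int second, int threshold) {
        return first > threshold && second > threshold;
    }

    /**
     * CRITICAL: exactly 4 consecutive readings, the first above the threshold,
     * each one greater than the one before, and the last more than
     * multiplier times the first.
     */
    static boolean isCritical(int[] readings, int threshold, double multiplier) {
        if (readings.length != 4 || readings[0] <= threshold) {
            return false;
        }
        for (int i = 1; i < readings.length; i++) {
            if (readings[i] <= readings[i - 1]) {
                return false; // not strictly rising
            }
        }
        return readings[3] > readings[0] * multiplier;
    }
}
```

As a worked example with a threshold of 100 and a multiplier of 1.5: the readings 110, 130, 150, 170 are critical, because 170 is more than 110 x 1.5 = 165. The readings 110, 120, 130, 140 are rising but not critical, because 140 does not exceed 165. What this sketch leaves out is the interesting part: spotting these sequences inside a continuous stream, which is what the Esper queries are for.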
Using Esper
- There are a number of ways you could approach building a system to handle these requirements. For the purpose of this post, though, we will look at using Esper to tackle this problem.
- How we approach this with Esper is:
- We create 3 queries, written in EPL (Event Processing Language), to model each of these event patterns.
- We then attach a listener to each query; it is triggered when the EPL detects a matching pattern of events.
- We create an Esper service, and register these queries (and their listeners)
- We can then just throw Temperature data through the service and let Esper alert the listeners when we get matches.
- (A working example of this simple solution is available on GitHub – see link above)
Our Simple ESPER Solution
At the core of the system are the 3 queries for detecting the events.
Query 1 – MONITOR (Just monitor the average temperature)
    select avg(temperature) as avg_val
    from TemperatureEvent.win:time_batch(10 sec)
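If the batch window in Query 1 is unfamiliar, the rough idea is: collect readings for 10 seconds, emit one average, then start again. The sketch below mimics that in plain Java. It is an approximation for illustration only: BatchAverager is a made-up class, it is driven purely by the timestamps of the readings (Esper uses its own clock), and it simply skips batches that received no readings.

```java
import java.util.ArrayList;
import java.util.List;

/** Rough, engine-free illustration of a fixed-length batch average (hypothetical class). */
class BatchAverager {

    private final long batchMillis;
    private long batchStart = -1; // start time of the batch currently being collected
    private long sum = 0;
    private int count = 0;
    private final List<Double> averages = new ArrayList<>();

    BatchAverager(long batchMillis) {
        this.batchMillis = batchMillis;
    }

    /** Feed one reading; timestamps are assumed to arrive in non-decreasing order. */
    void onReading(int temperature, long timestampMillis) {
        if (batchStart < 0) {
            batchStart = timestampMillis; // the first reading opens the first batch
        }
        // Close every batch that ended before this reading arrived.
        while (timestampMillis >= batchStart + batchMillis) {
            if (count > 0) {
                averages.add((double) sum / count);
            }
            sum = 0;
            count = 0;
            batchStart += batchMillis;
        }
        sum += temperature;
        count++;
    }

    /** Averages of all batches that have been closed so far. */
    List<Double> getAverages() {
        return averages;
    }
}
```

With a 10 second batch, readings of 100 and 200 in the first 10 seconds give an average of 150 once a reading from the next batch arrives. The key difference from a sliding window is that each reading counts towards exactly one average.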
Query 2 – WARN (Tell us if we have 2 consecutive events which breach a threshold)
    select * from TemperatureEvent
    match_recognize (
        measures A as temp1, B as temp2
        pattern (A B)
        define
            A as A.temperature > 400,
            B as B.temperature > 400)
Query 3 – CRITICAL (4 consecutive rising values, starting above 100, with the fourth value more than 1.5x the first)
    select * from TemperatureEvent
    match_recognize (
        measures A as temp1, B as temp2, C as temp3, D as temp4
        pattern (A B C D)
        define
            A as A.temperature > 100,
            B as (A.temperature < B.temperature),
            C as (B.temperature < C.temperature),
            D as (C.temperature < D.temperature) and D.temperature > (A.temperature * 1.5))
Some Code Snippets
TemperatureEvent
- We assume our incoming data arrives in the form of a TemperatureEvent POJO
- If it doesn’t – we can convert it to one, e.g. if it comes in via a JMS queue, our queue listener can convert it to a POJO. We don’t have to do this, but doing so decouples us from the incoming data structure, and gives us more flexibility if we start to do more processing in our Java code outside the core Esper queries. An example of our POJO is below
    package com.cor.cep.event;

    import java.util.Date;

    /**
     * Immutable Temperature Event class.
     * The process control system creates these events.
     * The TemperatureEventHandler picks these up
     * and processes them.
     */
    public class TemperatureEvent {

        /** Temperature in Celsius. */
        private final int temperature;

        /** Time temperature reading was taken. */
        private final Date timeOfReading;

        /**
         * Temperature constructor.
         * @param temperature Temperature in Celsius
         * @param timeOfReading Time of Reading
         */
        public TemperatureEvent(int temperature, Date timeOfReading) {
            this.temperature = temperature;
            this.timeOfReading = timeOfReading;
        }

        /**
         * Get the Temperature.
         * @return Temperature in Celsius
         */
        public int getTemperature() {
            return temperature;
        }

        /**
         * Get time Temperature reading was taken.
         * @return Time of Reading
         */
        public Date getTimeOfReading() {
            return timeOfReading;
        }

        @Override
        public String toString() {
            return "TemperatureEvent [" + temperature + "C]";
        }
    }
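The conversion step mentioned above (turning whatever arrives on the wire into a TemperatureEvent) might look roughly like this. Everything about the wire format is an assumption: the demo does not define one, so the "temperature,epochMillis" text payload and the TemperatureEventParser class are invented purely for illustration. A trimmed copy of the POJO is included so the sketch compiles by itself.

```java
import java.util.Date;

/** Trimmed stand-in for the TemperatureEvent POJO so this sketch compiles by itself. */
class TemperatureEvent {
    private final int temperature;
    private final Date timeOfReading;

    TemperatureEvent(int temperature, Date timeOfReading) {
        this.temperature = temperature;
        this.timeOfReading = timeOfReading;
    }

    int getTemperature() {
        return temperature;
    }

    Date getTimeOfReading() {
        return timeOfReading;
    }
}

/** Hypothetical converter; the "temperature,epochMillis" payload format is an assumption. */
class TemperatureEventParser {

    /** Parse a payload such as "412,1700000000000" into a TemperatureEvent. */
    static TemperatureEvent parse(String payload) {
        String[] parts = payload.trim().split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "Expected 'temperature,epochMillis' but got: " + payload);
        }
        // NumberFormatException is itself an IllegalArgumentException,
        // so callers only need to handle one exception type.
        int temperature = Integer.parseInt(parts[0].trim());
        long epochMillis = Long.parseLong(parts[1].trim());
        return new TemperatureEvent(temperature, new Date(epochMillis));
    }
}
```

Keeping this parsing in one small class means the Esper queries only ever see TemperatureEvent objects, whatever the upstream format turns out to be.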
Handling this Event
- In our main handler class – TemperatureEventHandler.java, we initialise the Esper service. We register the package containing our TemperatureEvent so the EPL can use it.
- We also create our 3 statements and add a listener to each statement
    /**
     * Auto initialise our service after Spring bean wiring is complete.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        initService();
    }

    /**
     * Configure Esper Statement(s).
     */
    public void initService() {
        Configuration config = new Configuration();
        // Recognise domain objects in this package in Esper.
        config.addEventTypeAutoName("com.cor.cep.event");
        epService = EPServiceProviderManager.getDefaultProvider(config);

        createCriticalTemperatureCheckExpression();
        createWarningTemperatureCheckExpression();
        createTemperatureMonitorExpression();
    }
- An example of creating the Critical Temperature warning and attaching the listener
    /**
     * EPL to check for a sudden critical rise across 4 events,
     * where the last event is 1.5x greater than the first.
     * This is checking for a sudden, sustained escalating
     * rise in the temperature
     */
    private void createCriticalTemperatureCheckExpression() {
        LOG.debug("create Critical Temperature Check Expression");
        EPAdministrator epAdmin = epService.getEPAdministrator();
        criticalEventStatement = epAdmin.createEPL(criticalEventSubscriber.getStatement());
        criticalEventStatement.setSubscriber(criticalEventSubscriber);
    }
- And finally – an example of the listener for the Critical event. This just logs some debug – that’s as far as this demo goes.
    package com.cor.cep.subscriber;

    import java.util.Map;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;

    import com.cor.cep.event.TemperatureEvent;

    /**
     * Wraps Esper Statement and Listener. No dependency on Esper libraries.
     */
    @Component
    public class CriticalEventSubscriber implements StatementSubscriber {

        /** Logger */
        private static final Logger LOG = LoggerFactory.getLogger(CriticalEventSubscriber.class);

        /** Minimum starting threshold for a critical event. */
        private static final String CRITICAL_EVENT_THRESHOLD = "100";

        /**
         * If the last event in a critical sequence is this much greater
         * than the first - issue a critical alert.
         */
        private static final String CRITICAL_EVENT_MULTIPLIER = "1.5";

        /**
         * {@inheritDoc}
         */
        public String getStatement() {
            // Example using 'Match Recognise' syntax.
            String criticalEventExpression = "select * from TemperatureEvent "
                    + "match_recognize ( "
                    + "measures A as temp1, B as temp2, C as temp3, D as temp4 "
                    + "pattern (A B C D) "
                    + "define "
                    + " A as A.temperature > " + CRITICAL_EVENT_THRESHOLD + ", "
                    + " B as (A.temperature < B.temperature), "
                    + " C as (B.temperature < C.temperature), "
                    + " D as (C.temperature < D.temperature) "
                    + "and D.temperature > "
                    + "(A.temperature * " + CRITICAL_EVENT_MULTIPLIER + ")"
                    + ")";
            return criticalEventExpression;
        }

        /**
         * Listener method called when Esper has detected a pattern match.
         */
        public void update(Map<String, TemperatureEvent> eventMap) {
            // 1st Temperature in the Critical Sequence
            TemperatureEvent temp1 = eventMap.get("temp1");
            // 2nd Temperature in the Critical Sequence
            TemperatureEvent temp2 = eventMap.get("temp2");
            // 3rd Temperature in the Critical Sequence
            TemperatureEvent temp3 = eventMap.get("temp3");
            // 4th Temperature in the Critical Sequence
            TemperatureEvent temp4 = eventMap.get("temp4");

            StringBuilder sb = new StringBuilder();
            sb.append("***************************************");
            sb.append("\n* [ALERT] : CRITICAL EVENT DETECTED! ");
            sb.append("\n* " + temp1 + " > " + temp2 + " > " + temp3 + " > " + temp4);
            sb.append("\n***************************************");
            LOG.debug(sb.toString());
        }
    }
The Running Demo
Full instructions for running the demo can be found here: https://github.com/adrianmilne/esper-demo-nuclear
An example of the running demo is shown below – it generates random Temperature events and sends them through the Esper processor (in the real world this would come in via a JMS queue, HTTP endpoint or socket listener).
When any of our 3 queries detect a match – debug is dumped to the console. In a real world solution each of these 3 listeners would handle the events differently – maybe by sending messages to alert queues/endpoints for other parts of the system to pick up the processing.
Conclusions