Solving WSO2 Virtual15 hackathon query using WSO2 CEP 4.0.0

This post discusses a possible solution to the first query of the WSO2 Virtual15 hackathon, which was held on 26th September 2015. It was written as a follow-up to “Querying complex event streams with WSO2 CEP 4.0.0” to provide more in-depth knowledge on querying complex event streams using a real-world example.

The hackathon took the same format as the DEBS 2014 Grand Challenge, posing a real-world data set and a set of problems. The data set originated from 2125 smart plugs deployed across 40 houses, where each sensor in each smart plug emits an event to the stream roughly every second. Participants were expected to come up with solutions to two queries. The first of these was a load prediction query: predict load based on current load measurements and recorded historical data. This post discusses a possible query (or solution) that predicts load according to that specification.

Load Prediction Problem

The goal of this query is to make load forecasts based on current load measurements and those of recorded historical data. Such forecasts can be used to pro-actively influence load and adapt it to the supply situation, e.g. current production of renewable energy sources.

You must use the following algorithm for prediction and implement it using WSO2 CEP.

The query should forecast the load for each house. Starting from September 1, 2013, we group each 24 hours into 12 x 24 = 288 five-minute slices. The current time c (the current event time) belongs to the slice floor(time_of_day(c)/300). Here floor is the math:floor() function, and the time_of_day(timestamp) function returns the number of seconds elapsed in the given day for the given timestamp. For instance, an event at 10:07:30 has time_of_day = 36450 seconds, so it belongs to slice floor(36450/300) = 121, which starts at 10:05:00.

We predict the load at time t + 10 based on the current time c, which falls in the time slice starting at t, where t is a multiple of 5 minutes.

The prediction for the slice starting at t + 10 is the average of the current slice’s average load and the median of the average loads of the corresponding slices on previous days:

    L(s_(i+2)) = ( avgLoad(s_i) + median( { avgLoad(s_j) } ) ) / 2

where c is the current event time, s_i is the slice containing c, s_(i+2) is the slice starting at t + 10, and s_j ranges over the slices covering the same time of day on each previous day.

[Figure: the Virtual15 prediction algorithm, illustrated]

The output streams for house-based prediction values should contain the following information:

  • ts – timestamp of the starting time of the slice that the prediction is made for
  • house_id – id of the house for which the prediction is made
  • predicted_load – the predicted load for the time slice starting at ts

The output streams for plug-based prediction values should contain the following information:

  • ts – timestamp of the starting time of the slice that the prediction is made for
  • house_id – id of the house where the plug is located
  • household_id – the id of the household where the plug is located
  • plug_id – the id of the plug for which the prediction is made
  • predicted_load – the predicted load for the time slice starting at ts
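Putting the two specifications together, the output streams could be defined in Siddhi as follows. This is a minimal sketch; the attribute types (long for ts, string for the ids, double for predicted_load) are assumptions based on the descriptions above.

    define stream HouseBasedLoadPredictionStream (ts long, house_id string, predicted_load double);

    define stream PlugBasedLoadPredictionStream (ts long, house_id string, household_id string, plug_id string, predicted_load double);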

The output streams should be updated every 30 seconds as specified by the input event timestamp. The purpose of the update is to reflect the latest value of the avgLoad(s_i) for the given slice.

Load Prediction Solution

The solution for this contains not only a CEP query (an Execution Plan) but also several other artifacts such as Event Streams, Event Receivers, and Event Publishers. First, we will add those artifacts.

Adding Event Streams

An event stream defines the events that go through a particular flow by specifying the event’s attributes and their types. This solution requires three event streams:

1. SmartPlugsDataStream (input stream)

2. HouseBasedLoadPredictionStream (output stream)

3. PlugBasedLoadPredictionStream (output stream)

An event stream can be created in the CEP Management Console as shown below. The process is the same for both input and output streams, so this article explains one in detail; the other two streams can be created similarly.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Streams to open the Available Event Streams page.
  2. Click Add Event Stream to open the Define New Event Stream page.
  3. Enter information as follows to create a new event stream named SmartPlugsDataStream.
    [Screenshot: the Define New Event Stream page filled in for SmartPlugsDataStream]
  4. Click Add Event Stream to save the information.
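For reference, the SmartPlugsDataStream defined here could look roughly like the following Siddhi definition. The attribute names and types are assumptions based on the DEBS 2014 data set description (each per-second reading carries a timestamp, a measured value, a property flag distinguishing cumulative work from instantaneous load, and plug/household/house identifiers); the exact attributes entered in the console may differ.

    define stream SmartPlugsDataStream (ts long, value float, property bool, plug_id string, household_id string, house_id string);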

Adding Event Receivers

Events received by the CEP server have different formats such as XML, JSON and Map. Event receivers transform these events to WSO2 Events that can be directed to an event stream defined in the CEP. For this solution, create an event receiver named “SmartPlugDataReceiver” which directs events to the event stream named  “SmartPlugsDataStream” that was created above. The receiver can be created using the Management Console as follows.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Receivers to open the Available Receivers page.
  2. Click Add Event Receiver to open the Create a New Event Receiver page.
  3. Enter information as follows to create the new event receiver named SmartPlugDataReceiver.
    [Screenshot: the Create a New Event Receiver page filled in for SmartPlugDataReceiver]
  4. Click Add Event Receiver to save the information.

Adding Event Publishers

Event publishers publish events processed by the WSO2 servers to external applications. These events are published via HTTP, Kafka, JMS, etc. in JSON, XML, Map, text, and WSO2Event formats to various endpoints and data stores. This solution requires two event publishers, named HouseBasedLoadPublisher and PlugBasedLoadPublisher. Both can be created using the Management Console; the following steps add HouseBasedLoadPublisher, and PlugBasedLoadPublisher can be added similarly.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Publishers to open the Available Publishers page.
  2. Click Add Event Publisher to open the Create a New Event Publisher page.
  3. Enter information as follows to create the new event publisher named  HouseBasedLoadPublisher.
    [Screenshot: the Create a New Event Publisher page filled in for HouseBasedLoadPublisher]
  4. Click Add Event Publisher to save the information.

Writing Execution Plans

An Execution Plan can import one or more streams from the server for processing and push zero or more output streams back to the server. This solution requires two Execution Plans, named HouseBasedLoadPrediction and PlugBasedLoadPrediction, to calculate both house-based and plug-based load predictions. Execution Plans can be added to the CEP as follows.

  1. Log into the CEP Management Console and click on the Main tab. Under Manage, click Execution Plans to open the Available Execution Plans page.
  2. Click Add Execution Plan to open the Create a New Execution Plan page.
  3. Enter information as follows to create the new execution plan.
    [Screenshot: the Create a New Execution Plan page with the HouseBasedLoadPrediction queries]
  4. Click Validate Query Expressions. Once you get a message to confirm that the queries are valid, click Add Execution Plan.

Understanding Execution Plan (Querying Event Stream)

Each Execution Plan requires a unique name to identify itself. Here, the Execution Plan was named HouseBasedLoadPrediction using the @Plan:name() annotation.

Optionally, since this is going to be a distributed execution plan, we can use the @Plan:dist() annotation to set the number of event receiver spouts and event publisher bolts to be spawned for the Storm topology.

Once the plan is properly named, the required input and output event streams are defined using the @Import and @Export annotations respectively. Please note that at this stage, all input and output streams should already have been added to the CEP as discussed above.
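As a sketch, the header of the HouseBasedLoadPrediction execution plan could look as follows. The stream version (1.0.0) and the attribute names and types are assumptions carried over from the stream definitions sketched earlier.

    @Plan:name('HouseBasedLoadPrediction')

    @Import('SmartPlugsDataStream:1.0.0')
    define stream SmartPlugsDataStream (ts long, value float, property bool, plug_id string, household_id string, house_id string);

    @Export('HouseBasedLoadPredictionStream:1.0.0')
    define stream houseBasedLoadPredictionStream (ts long, house_id string, predicted_load double);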

Since the problem statement mentions keeping historical event data, the solution requires an event table to persist that data. Therefore, we define an in-memory Event Table (RDBMS-backed tables are also possible) and index it by slice_id using the @IndexBy annotation. Indexing the Event Table provides very fast event access. To learn more about Event Tables and indexing, please refer to the Event Table section of the SiddhiQL guide.
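A minimal sketch of such a table, assuming it stores a running sum and count of usage per slice (the column names here are illustrative):

    @IndexBy('slice_id')
    define table historicalDataTable (slice_id string, totalUsage double, countOfEvents long);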

Now that all the streams and tables are defined, we move on to formatting the incoming event stream to suit the queries. Initially, the SmartPlugsDataStream carries the timestamp (ts) in seconds. However, we need it in milliseconds to use it with the time:extract() function. Therefore, we take in the SmartPlugsDataStream, drop the metadata, convert its timestamp (ts) into milliseconds, and emit to an intermediate stream called preFormattedDataStream.
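A sketch of that conversion query, keeping only the attributes the house-based plan needs downstream:

    from SmartPlugsDataStream
    select ts * 1000 as ts, value, house_id
    insert into preFormattedDataStream;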

Now that we have a millisecond timestamp, we can use it to calculate a slice_id and a next_slice_id for each event. Both are calculated by concatenating house_id with the slice number, i.e. the value of (elapsed minutes in the day / 5); next_slice_id identifies the slice the prediction is made for, 10 minutes ahead. Once slice_id and next_slice_id are calculated, events are emitted to an intermediate stream called formattedDataStream.
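One possible formulation of that query is sketched below; the exact expressions in the original solution may differ. Here the slice number is derived with the time:extract() extension, next_slice_id is assumed to point two slices (10 minutes) ahead, and day wrap-around at midnight is ignored for brevity:

    from preFormattedDataStream
    select ts, value, house_id,
           str:concat(house_id, '_', convert(math:floor(
               (time:extract(ts, 'HOUR') * 60.0 + time:extract(ts, 'MINUTE')) / 5), 'string')) as slice_id,
           str:concat(house_id, '_', convert(math:floor(
               (time:extract(ts, 'HOUR') * 60.0 + time:extract(ts, 'MINUTE')) / 5) + 2, 'string')) as next_slice_id
    insert into formattedDataStream;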

Since formattedDataStream now carries the millisecond timestamp and the calculated slice_id and next_slice_id, we will use it for further processing. Next, we calculate the sum and count of usage within each 5-minute window for each house and slice, and emit them to another intermediate stream called currentDataStream. Since the timestamps provided in the dataset are not equal to the current time (the time the CEP receives the events), we have to use an external time window: #window.externalTime(ts, 5 min). Apart from that, we also have to assume that events arrive in ascending order of time, so that the most recently emitted value represents all the previous data of a particular slice.
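A sketch of that aggregation, assuming the running sum and count are kept per house and slice over the external time window:

    from formattedDataStream#window.externalTime(ts, 5 min)
    select ts, house_id, slice_id, next_slice_id,
           sum(value) as totalUsage, count() as countOfEvents
    group by house_id, slice_id
    insert into currentDataStream;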

Afterwards, the emitted currentDataStream is used to update the available historical data in the Event Table (historicalDataTable) and to calculate load predictions. Here, all the queries that use historicalDataTable are written inside a single execution group (execGroup) with parallelism 1, partitioned by the house_id of currentDataStream. The reason for doing that is to keep a single shared memory space for the in-memory event table; this limitation can be overcome by using an RDBMS-backed table instead. However, for this example, let’s go ahead with the in-memory event table and single parallelism. Within that partitioned section, there are 5 separate queries (a hedged sketch of the two prediction queries follows this list):

  1. The GetStateUpdateStream query takes in currentDataStream and emits to stateUpdateStream only when the immediately following event has a different slice_id than the current event. This query reduces the number of table lookups and stops table values from being overwritten with wrong values.
  2. The UpdateHistoricalDataTable query executes only if GetStateUpdateStream emits an event to stateUpdateStream. It checks whether the stateUpdateStream event’s slice_id already exists in historicalDataTable; if so, it updates the existing row of historicalDataTable with the stateUpdateStream event’s values.
  3. The AddToHistoricalDataTable query also executes only if GetStateUpdateStream emits an event to stateUpdateStream. It likewise checks whether the stateUpdateStream event’s slice_id already exists in historicalDataTable; if not, it adds the stateUpdateStream event’s values to historicalDataTable.
  4. The CalculatePredicitonWithOutHistoricalData query checks whether the currentDataStream event’s next_slice_id already exists in historicalDataTable. If it does not, there is no historical data available for the prediction slice, so it calculates the prediction using only the current-event values and emits the result to predictedLoadStream.
  5. The CalculatePredicitonWithHistoricalData query also checks whether the currentDataStream event’s next_slice_id already exists in historicalDataTable. If it does, historical data is available, so it calculates the prediction using both the current-event values and the historical values, and emits the result to predictedLoadStream.
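As an illustration, queries 4 and 5 could look roughly like the sketch below. This is an assumption-laden sketch rather than the original solution: it uses a stream–table join on next_slice_id, approximates the median of the historical slice averages with the mean derived from the stored running sum and count, and falls back to the current slice average when no history exists. In distributed mode, each query would additionally carry a @dist(execGroup = '...', parallel = '1') annotation.

    partition with (house_id of currentDataStream)
    begin
        /* 4. no history for the target slice: predict from the current slice average only */
        @info(name = 'CalculatePredicitonWithOutHistoricalData')
        from currentDataStream[not (historicalDataTable.slice_id == next_slice_id in historicalDataTable)]
        select ts, house_id, totalUsage / countOfEvents as predicted_load
        insert into predictedLoadStream;

        /* 5. history available: average the current slice average with the stored historical average */
        @info(name = 'CalculatePredicitonWithHistoricalData')
        from currentDataStream join historicalDataTable
            on currentDataStream.next_slice_id == historicalDataTable.slice_id
        select currentDataStream.ts, currentDataStream.house_id,
               ((currentDataStream.totalUsage / currentDataStream.countOfEvents)
                + (historicalDataTable.totalUsage / historicalDataTable.countOfEvents)) / 2
               as predicted_load
        insert into predictedLoadStream;
    end;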

Finally, the predictedLoadStream is sent through an output rate limiter, so that only the latest current event per house is emitted to houseBasedLoadPredictionStream every 30 seconds.
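A sketch of that rate-limiting query, using Siddhi’s built-in output rate limiting with a group by on house_id (note that the built-in limiter works on wall-clock time, whereas the problem statement keys the 30-second updates to the input event timestamps, so the original solution may have handled this differently):

    from predictedLoadStream
    select ts, house_id, predicted_load
    group by house_id
    output last every 30 sec
    insert into houseBasedLoadPredictionStream;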

View the simple event flow

Now that we have added all the necessary Streams, Receivers, Publishers and Execution Plans, you can see the following event flow on the CEP Event Flow page.

[Screenshot: the CEP event flow]

So far we have created the Streams, Receivers, Publishers and Execution Plans required for forecasting house-based load. Similarly, we can come up with an Execution Plan to forecast plug-based load too.

All artifacts, execution plans and a sample event publisher can be found here: GitHub – virtual-15-hackathon

If you find this post useful, share it and please leave a comment below 🙂