Querying complex event streams with WSO2 CEP 4.0.0

What are complex event streams?

Sources such as flight arrivals, departures and passenger check-ins at a busy airport, transactions processed by a credit card company, or usage statistics generated by smart devices such as smart plugs and fitness trackers can produce a constant, high-volume stream of events. Such streams, which carry a large amount of event data, are known as complex event streams.

What is complex event processing?

Simply put, complex event processing (CEP) means processing multiple event streams to identify meaningful patterns, analyze their impact, and act on them in real time. To do this, CEP incorporates event visualization, event databases, event-driven middleware, and event processing languages (such as SiddhiQL and StreamSQL).

In the modern competitive business environment, complex event processing helps businesses become more agile and connected. It also allows them to anticipate risks and flag opportunities in real time, so they can respond to urgent business situations with both speed and precision.

What is WSO2 Complex Event Processor?

WSO2 Complex Event Processor (CEP) is a lightweight, user-friendly, open source complex event processing server available under the Apache Software License v2.0. WSO2 CEP identifies the most meaningful events within the event cloud, analyzes their impact, and acts on them in real time. It is built to be high performing and scalable, and beyond that it also includes many other features that make it stand out from the competition.

Processing Complex Event Streams with WSO2 CEP 4.0.0

WSO2 CEP 4.0.0 provides various types of queries and built-in functions to process complex event streams, including aggregate, pattern matching, event windowing and event joining functions. For the complete list of built-in functions of WSO2 CEP, please refer to the SiddhiQL Guide 3.0.

Querying complex event streams

Under the hood, WSO2 CEP 4.0.0 uses SiddhiQL 3.0 to query complex event streams, so we write Siddhi queries whenever we need to query them. A Siddhi query can consume one or more event streams and create a new event stream from them. Every simple Siddhi query contains an input section (from) and an output section (insert into), while slightly more complex queries also contain a projection section (select).

However, there can be situations where you need to do more than a simple query projection. For such scenarios, SiddhiQL 3.0 supports a variety of ways to work with complex event streams. Based on the type of processing they provide, such queries can be categorized into the following types:

  • Query Projection
  • Filter
  • Window
  • Output Event Categories
  • Aggregate Functions
  • Group By
  • Having
  • Output Rate Limiting
  • Join
  • Pattern
  • Sequence

This post only covers querying complex event streams using projection, filtering, window processing, joins and aggregate functions. For a comprehensive guide to writing queries of all the above types, please refer to the SiddhiQL Guide 3.0.

Writing a Projection Query

Projection queries can be used for:

  • Selecting required (or all) attributes for projection
  • Renaming attributes
  • Introducing the default value
  • Using mathematical and logical expressions (e.g. null check, logical NOT, AND, OR)

Example:
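A minimal sketch covering the uses listed above, assuming a hypothetical TempStream with deviceID, roomNo and temp attributes; the stream names, output stream names and the 'C' default value are illustrative assumptions, not taken from the original post.

```siddhiql
-- Hypothetical input stream assumed for this sketch
define stream TempStream (deviceID long, roomNo int, temp double);

-- Select only the required attributes
from TempStream
select roomNo, temp
insert into RoomTempStream;

-- Select all attributes
from TempStream
select *
insert into TempCopyStream;

-- Rename an attribute
from TempStream
select roomNo as roomNumber, temp
insert into RenamedTempStream;

-- Introduce a default (constant) value as a new attribute
from TempStream
select roomNo, temp, 'C' as scale
insert into ScaledTempStream;

-- Use mathematical and logical expressions
from TempStream
select roomNo, temp * 9.0 / 5.0 + 32.0 as tempF,
       roomNo >= 100 and roomNo < 200 as isServerRoom
insert into FahrenheitTempStream;
```

Each query has a different output stream name because Siddhi infers each output stream's schema from its select clause.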

 

Writing a Filtering Query

Filters can be used with input streams to filter events based on a given filter condition. The filter condition should be defined in square brackets next to the input stream name.

Example:
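A minimal sketch of a filter, assuming the same hypothetical TempStream; the room range, temperature threshold and HighTempStream name are illustrative assumptions.

```siddhiql
-- Hypothetical input stream assumed for this sketch
define stream TempStream (deviceID long, roomNo int, temp double);

-- Keep only readings from rooms 100 to 209 that are hotter than 40 degrees
from TempStream[(roomNo >= 100 and roomNo < 210) and temp > 40.0]
select roomNo, temp
insert into HighTempStream;
```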

 

Writing a Window Query

Window processors capture a subset of events from an input event stream, based on a given criterion, for calculation. Windows are defined next to input streams using the ‘#window.’ prefix, and each input stream can have at most one window. WSO2 CEP 4.0.0 supports the following built-in window types out of the box.

  • time
  • timeBatch
  • length
  • lengthBatch
  • externalTime
  • cron
  • firstUnique
  • unique
  • sort
  • frequent
  • lossyFrequent
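
To illustrate the ‘#window.’ syntax, the sketch below applies three of these windows to a hypothetical TempStream; the stream, attribute and output stream names and the window sizes are assumptions made for illustration.

```siddhiql
-- Hypothetical input stream assumed for this sketch
define stream TempStream (deviceID long, roomNo int, temp double);

-- Sliding window holding the events of the last 5 minutes
from TempStream#window.time(5 min)
select max(temp) as maxTemp
insert into MaxTempLast5MinStream;

-- Sliding window holding the last 100 events
from TempStream#window.length(100)
select max(temp) as maxTemp
insert into MaxTempLast100Stream;

-- Batch window: collects 100 events and processes them together as one batch
from TempStream#window.lengthBatch(100)
select max(temp) as maxTemp
insert into MaxTempPer100Stream;
```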

To learn more about these window types and their usage, please refer to the Siddhi Inbuilt Windows guide. Windows can emit two events for each event they consume: a current event and an expired event. A window emits a current event when a new event arrives at the window, and an expired event when an event in the window expires according to the window's criteria.
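
As a sketch of how the output event category is chosen in the insert clause, the queries below use a 1-minute time window; the StockExchangeStream attributes and the output stream names are hypothetical.

```siddhiql
-- Hypothetical input stream assumed for this sketch
define stream StockExchangeStream (symbol string, price float, volume long);

-- Emits an event each time an IBM event enters the window
from StockExchangeStream[symbol == 'IBM']#window.time(1 min)
select symbol, price
insert current events into IBMArrivalStream;

-- Emits an event each time an IBM event leaves the window, one minute after it arrived
from StockExchangeStream[symbol == 'IBM']#window.time(1 min)
select symbol, price
insert expired events into IBMExpiryStream;

-- Emits both current and expired events
from StockExchangeStream[symbol == 'IBM']#window.time(1 min)
select symbol, price
insert all events into IBMAllEventStream;
```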

Example:
From the events of the StockExchangeStream stream whose symbol is IBM, output the current events of a time window to the IBMStockQuote stream. The output events carry, as their attributes, the maximum, average and minimum prices of the events that arrived within the last minute.
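
One way to write this query is sketched below, assuming StockExchangeStream has symbol, price and volume attributes; the attribute types and the output attribute names are assumptions.

```siddhiql
-- Hypothetical stream definition assumed for this sketch
define stream StockExchangeStream (symbol string, price float, volume long);

-- Filter IBM events, keep the last minute of them in a sliding time window,
-- and emit the aggregates each time a new event enters the window
from StockExchangeStream[symbol == 'IBM']#window.time(1 min)
select max(price) as maxPrice, avg(price) as avgPrice, min(price) as minPrice
insert current events into IBMStockQuote;
```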

 

Writing a Join Query

A join merges two event streams based on a condition. Each stream in a join should have a window (if no window is assigned, #window.length(0) is assigned to the input event stream by default). During the join, each incoming event on either stream is matched against all events in the other stream's window based on the given condition, and an output event is generated for every matching pair.

  • With “on <join condition>”, only the events that match the condition are joined.
  • The “unidirectional” keyword controls which stream triggers the join. By default, events arriving on both streams trigger the join; when “unidirectional” is used on an input stream, only the events arriving on that stream trigger it.
  • With “within <time gap>”, the join only matches events that arrived within the defined time gap of each other.

Example:
In this example, the goal is to switch on the temperature regulator in every room whose current temperature is greater than 30 degrees, if that regulator is not already on.
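
One way to express this as a join is sketched below, assuming hypothetical TempStream and RegulatorStream definitions; the stream names, attributes, window sizes and the 'start' action value are illustrative assumptions rather than the original post's query.

```siddhiql
-- Hypothetical stream definitions assumed for this sketch
define stream TempStream (deviceID long, roomNo int, temp double);
define stream RegulatorStream (deviceID long, roomNo int, isOn bool);

-- Hot readings from the last minute are joined with the latest
-- "regulator is off" event; a match on roomNo produces an action event
from TempStream[temp > 30.0]#window.time(1 min) as T
  join RegulatorStream[isOn == false]#window.length(1) as R
  on T.roomNo == R.roomNo
select T.roomNo, R.deviceID, 'start' as action
insert into RegulatorActionStream;
```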

 

Writing a query with Aggregate Functions

Aggregate functions can be used with windows to perform aggregate calculations over the events in the defined window. WSO2 CEP 4.0.0 provides the following built-in aggregate functions out of the box.

  • sum
  • avg
  • max
  • min
  • count
  • stddev

To learn more about the above aggregate functions and their usage, please refer to the Inbuilt Aggregate Functions guide.

Example:
In this example, the query should report, on every event arrival and expiry, the average temperature of all rooms, based on the events that arrived during the last 10 minutes.
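
A minimal sketch of such a query, assuming the same hypothetical TempStream; the AvgTempStream name and avgTemp attribute are assumptions.

```siddhiql
-- Hypothetical input stream assumed for this sketch
define stream TempStream (deviceID long, roomNo int, temp double);

-- Sliding 10-minute window over all rooms; avg(temp) is recalculated and
-- emitted whenever an event enters (current event) or leaves (expired event)
from TempStream#window.time(10 min)
select avg(temp) as avgTemp
insert all events into AvgTempStream;
```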

This post gave a brief introduction to WSO2 Complex Event Processor and how to write queries to process complex event streams with WSO2 CEP 4.0.0. My next post explains how to write a complete execution plan and set up artifacts to solve a real-world scenario using WSO2 CEP 4.0.0.

  • Tony Mathew

    I created an execution plan to process an input stream and store the results in another output stream. This is the execution plan:

    /* Enter a unique ExecutionPlan */
    @Plan:name('ExecutionPlan')

    /* Enter a unique description for ExecutionPlan */
    -- @Plan:description('ExecutionPlan')

    /* define streams/tables and write queries here … */

    @Import('SMARTHOME_DATA:1.0.0')
    define stream inputStream (id string, value float, property bool, plug_id int, household_id int, house_id int);

    @Export('usageStream:1.0.0')
    define stream outputStream (house_id int, maxVal float, minVal float, avgVal double, timestamp long);

    from inputStream[value>0]#window.time(1 min)
    select house_id,max(value) as maxVal,
    min(value) as minVal, avg(value) as avgVal,
    time:timestampInMilliseconds() as timestamp
    group by house_id
    insert current events into outputStream;

    But even after executing the above query, outputStream is empty.
    Note: I have made sure that inputStream is not empty before executing the query.

    If you have any solution to this problem, please contact me by email.

  • Amarjit Dhillon

    Can I merge (not join) various streams with different types in Siddhi? This is not feasible in Flink, as Flink requires them to have the same type. By type I mean that stream A may be a Tuple5 and stream B a Tuple10. Also, by merge I mean that there is no window or join condition, so the selectivity of the merge will always be 1.