Chapter 4. Understanding the Output Model

4.1. Introduction

The Esper output model is continuous: update listeners attached to a statement receive updated data as soon as the engine processes events for that statement, according to the statement's choice of event streams, views, filters and output rates.

As outlined in Chapter 3, API Reference, the interface for listeners is net.esper.client.UpdateListener. Implementations must provide a single method, update(EventBean[] newEvents, EventBean[] oldEvents), that the engine invokes when results become available.

The engine provides statement results to update listeners by placing results in net.esper.event.EventBean instances. A typical listener implementation queries the EventBean instances via getter methods to obtain the statement-generated results.

The get method on the EventBean interface can be used to retrieve result columns by name. The property name supplied to the get method can also be used to query nested, indexed or array properties of object graphs as discussed in more detail in Chapter 5, Event Representations.

The getUnderlying method on the EventBean interface allows update listeners to obtain the underlying event object. For wildcard selects, the underlying event is the event object that was sent into the engine via the sendEvent method. For joins and select clauses with expressions, the underlying object implements java.util.Map.
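To make the listener contract concrete without requiring the Esper jar, the following Java sketch defines hypothetical stand-ins (SimpleEventBean, MapEventBean, SimpleUpdateListener and AmountCollector) that mirror the shape of EventBean.get, EventBean.getUnderlying and UpdateListener.update described above. These are not Esper types; the Map-backed row only imitates the result of a select clause with expressions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for net.esper.event.EventBean, reduced to the two
// methods discussed in the text.
interface SimpleEventBean {
    Object get(String propertyName);
    Object getUnderlying();
}

// Hypothetical Map-backed result row, imitating a select clause with
// expressions, where the underlying object is a java.util.Map.
final class MapEventBean implements SimpleEventBean {
    private final Map<String, Object> row;
    MapEventBean(Map<String, Object> row) { this.row = row; }
    public Object get(String propertyName) { return row.get(propertyName); }
    public Object getUnderlying() { return row; }
}

// Hypothetical stand-in for UpdateListener: receives the insert stream
// (newEvents) and the remove stream (oldEvents). In this sketch an absent
// stream is passed as null.
interface SimpleUpdateListener {
    void update(SimpleEventBean[] newEvents, SimpleEventBean[] oldEvents);
}

// A listener that records the "amount" column of every new event it receives.
final class AmountCollector implements SimpleUpdateListener {
    final List<Object> amounts = new ArrayList<>();

    public void update(SimpleEventBean[] newEvents, SimpleEventBean[] oldEvents) {
        if (newEvents == null) {
            return; // nothing in the insert stream
        }
        for (SimpleEventBean e : newEvents) {
            amounts.add(e.get("amount"));
        }
    }
}
```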

4.2. Insert Stream

In this section we look at the output of a very simple EQL statement. The statement selects an event stream without using a data window and without applying any filtering, as follows:

select * from Withdrawal

This statement selects all Withdrawal events. Every time the engine processes an event of type Withdrawal or any sub-type of Withdrawal, it invokes all update listeners, handing the new event to each of the statement's listeners.

The term insert stream denotes new events arriving and entering a data window or aggregation. In this example, the insert stream is the stream of arriving Withdrawal events, which the engine posts to listeners as new events.

The diagram below shows a series of Withdrawal events 1 to 6 arriving over time. The number in parentheses is the withdrawal amount, an event property that is used in the examples that discuss filtering.

Figure 4.1. Output example for a simple statement

For the example statement above, the engine posts only new events, and no old events, to the statement's listeners.
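The following sketch simulates this statement with a hypothetical Withdrawal class (the amounts are made up): since there is no data window, each arriving event becomes a one-element insert stream, and the remove stream is always empty. InsertStreamOnly is an illustrative name, not an engine class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event class; id and amount values are illustrative only.
final class Withdrawal {
    final String id;
    final int amount;
    Withdrawal(String id, int amount) { this.id = id; this.amount = amount; }
    @Override public String toString() { return id + "(" + amount + ")"; }
}

// Sketch of "select * from Withdrawal": one listener call per arriving event.
final class InsertStreamOnly {
    // Each entry records one listener invocation.
    final List<String> calls = new ArrayList<>();

    void sendEvent(Withdrawal w) {
        List<Withdrawal> newEvents = List.of(w);  // insert stream: the arriving event
        List<Withdrawal> oldEvents = List.of();   // remove stream: nothing ever leaves
        calls.add("new=" + newEvents + " old=" + oldEvents);
    }
}
```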

4.3. Insert and Remove Stream

A length window instructs the engine to only keep the last N events for a stream. The next statement applies a length window onto the Withdrawal event stream. The statement serves to illustrate the concept of data window and events entering and leaving a data window:

select * from Withdrawal.win:length(5)

The size of this statement's length window is five events. The engine enters all arriving Withdrawal events into the length window. When the length window is full, the oldest Withdrawal event is pushed out of the window. The engine indicates to listeners all events entering the window as new events, and all events leaving the window as old events.

While the term insert stream denotes new events arriving, the term remove stream denotes events leaving a data window, or changing aggregation values. In this example, the remove stream is the stream of Withdrawal events that leave the length window, and such events are posted to listeners as old events.

The next diagram illustrates how the length window contents change as events arrive and shows the events posted to an update listener.

Figure 4.2. Output example for a length window

As before, all arriving events are posted as new events to listeners. In addition, when event W1 leaves the length window on arrival of event W6, it is posted as an old event to listeners.
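A minimal simulation of these length window semantics, assuming events are identified by strings such as "W1". The class name LengthWindow is hypothetical, and the code sketches the observable insert and remove streams rather than the engine's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of "select * from Withdrawal.win:length(N)".
final class LengthWindow {
    private final int size;
    private final Deque<String> window = new ArrayDeque<>();
    // One entry per listener invocation, e.g. "new=[W6] old=[W1]".
    final List<String> calls = new ArrayList<>();

    LengthWindow(int size) { this.size = size; }

    void sendEvent(String id) {
        window.addLast(id);                 // the new event enters the window
        List<String> old = new ArrayList<>();
        while (window.size() > size) {
            old.add(window.removeFirst());  // the oldest event is pushed out
        }
        calls.add("new=[" + id + "] old=" + old);
    }
}
```

With a window of five, the first five calls carry no old events; the sixth posts W6 as a new event and W1 as an old event, matching the diagram.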

Similar to a length window, a time window also keeps the most recent events up to a given time period. A time window of 5 seconds, for example, keeps the last 5 seconds of events. As seconds pass, the time window actively pushes the oldest events out of the window resulting in one or more old events posted to update listeners.

Note: EQL supports optional istream and rstream keywords on select-clauses and on insert-into clauses. The istream keyword instructs the engine to forward only the insert stream, that is, events entering data windows or current aggregation values; the rstream keyword instructs it to forward only the remove stream, that is, events leaving data windows or prior aggregation values.

4.4. Filters and Where-clauses

Filters to event streams allow filtering events out of a given stream before events enter a data window. The statement below shows a filter that selects Withdrawal events with an amount value of 200 or more.

select * from Withdrawal(amount>=200).win:length(5)

With the filter, Withdrawal events with an amount of less than 200 do not enter the length window and are therefore not passed to update listeners. Filters are discussed in more detail in Section 6.3, “Filter Expressions”.

Figure 4.3. Output example for a statement with an event stream filter
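The sketch below applies the filter predicate before an event enters the window, so rejected events are neither retained nor posted. The class name, the amounts and the window size of 2 (used instead of 5 to keep the example short) are illustrative assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

// Sketch of "select * from Withdrawal(amount>=200).win:length(N)".
final class FilteredLengthWindow {
    private final int size;
    private final IntPredicate filter;
    private final Deque<Integer> window = new ArrayDeque<>();
    final List<String> calls = new ArrayList<>();

    FilteredLengthWindow(int size, IntPredicate filter) {
        this.size = size;
        this.filter = filter;
    }

    void sendEvent(int amount) {
        if (!filter.test(amount)) {
            return; // filtered out: never enters the window, no listener call
        }
        window.addLast(amount);
        List<Integer> old = new ArrayList<>();
        while (window.size() > size) {
            old.add(window.removeFirst());
        }
        calls.add("new=[" + amount + "] old=" + old);
    }

    int windowSize() { return window.size(); }
}
```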

The where-clause and having-clause in statements eliminate potential result rows at a later stage in processing, after events have been processed into a statement's data window or other views.

The next statement applies a where-clause to Withdrawal events. Where-clauses are discussed in more detail in Section 7.5, “Specifying Search Conditions: the Where Clause”.

select * from Withdrawal.win:length(5) where amount >= 200

The where-clause applies to both new events and old events. As the diagram below shows, all arriving events enter the window; however, only events that pass the where-clause are handed to update listeners. Likewise, as events leave the data window, only those events that pass the conditions in the where-clause are posted to listeners as old events.

Figure 4.4. Output example for a statement with where-clause
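In contrast to a stream filter, the where-clause is evaluated after the window. In this sketch (hypothetical class name, window size of 2 for brevity) every event enters the window, and the predicate is applied separately to the arriving event and to each expiring event. The sketch also assumes that the listener is not called when neither stream has rows left after the where-clause.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.IntPredicate;

// Sketch of "select * from Withdrawal.win:length(N) where amount >= 200".
final class WhereClauseLengthWindow {
    private final int size;
    private final IntPredicate where;
    private final Deque<Integer> window = new ArrayDeque<>();
    final List<String> calls = new ArrayList<>();

    WhereClauseLengthWindow(int size, IntPredicate where) {
        this.size = size;
        this.where = where;
    }

    void sendEvent(int amount) {
        window.addLast(amount); // every event enters the window
        List<Integer> newEvents = new ArrayList<>();
        if (where.test(amount)) {
            newEvents.add(amount);
        }
        List<Integer> oldEvents = new ArrayList<>();
        while (window.size() > size) {
            int expired = window.removeFirst();
            if (where.test(expired)) {
                oldEvents.add(expired); // only qualifying events become old events
            }
        }
        if (!newEvents.isEmpty() || !oldEvents.isEmpty()) {
            calls.add("new=" + newEvents + " old=" + oldEvents);
        }
    }

    int windowSize() { return window.size(); }
}
```

Sending 500, 100, 300 and 50 yields two listener calls, yet the window ends up holding 300 and 50, including an amount that never reached a listener.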

The where-clause can contain complex conditions, while event stream filters are more restrictive in the types of filters that can be specified. The next statement's where-clause applies the ceil function of the java.lang.Math class. The insert-into clause makes the results of the first statement available to the second statement:

insert into WithdrawalFiltered select * from Withdrawal where Math.ceil(amount) >= 200

select * from WithdrawalFiltered
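One practical consequence is easy to check in plain Java: Math.ceil(199.2) is 200.0, so the where-clause above admits amounts slightly below 200 that the filter amount>=200 would reject. The sketch below models the two chained statements as two method calls; the class and method names, and the use of double amounts, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the two statements linked by "insert into WithdrawalFiltered".
final class InsertIntoChain {
    // Events received by the second statement, "select * from WithdrawalFiltered".
    final List<Double> secondStatementReceived = new ArrayList<>();

    // First statement: where Math.ceil(amount) >= 200, then insert into WithdrawalFiltered.
    void sendWithdrawal(double amount) {
        if (Math.ceil(amount) >= 200) {
            insertIntoWithdrawalFiltered(amount);
        }
    }

    // Second statement: selects every event on the WithdrawalFiltered stream.
    private void insertIntoWithdrawalFiltered(double amount) {
        secondStatementReceived.add(amount);
    }
}
```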

4.5. Aggregation

Statements that aggregate events via aggregation functions also post remove stream events as aggregated values change.

Consider the following statement that alerts when 2 Withdrawal events have been received:

select count(*) as mycount from Withdrawal having count(*) = 2

When the engine encounters the second Withdrawal event, it posts a new event to update listeners. The value of the "mycount" property on that new event is 2. Additionally, when the engine encounters the third Withdrawal event, it posts an old event to update listeners containing the prior value of the count. The value of the "mycount" property on that old event is also 2.
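This behavior can be traced with a small simulation, assuming the having-clause is evaluated once against the current count (for the insert stream) and once against the prior count (for the remove stream). CountHaving is a hypothetical name, not an engine class.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "select count(*) as mycount from Withdrawal having count(*) = 2".
final class CountHaving {
    private long count = 0;
    // One entry per listener invocation.
    final List<String> calls = new ArrayList<>();

    void sendEvent() {
        long prior = count;
        count++;
        // having count(*) = 2, checked against the current and the prior value
        String newPart = (count == 2) ? "[mycount=" + count + "]" : "[]";
        String oldPart = (prior == 2) ? "[mycount=" + prior + "]" : "[]";
        if (!newPart.equals("[]") || !oldPart.equals("[]")) {
            calls.add("new=" + newPart + " old=" + oldPart);
        }
    }
}
```

After four events the listener has been called twice: once on the second event with a new row, and once on the third event with an old row, both carrying mycount=2.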

The istream or rstream keyword can be used to eliminate either new events or old events posted to listeners. The next statement uses the istream keyword causing the engine to call the listener only once when the second Withdrawal event is received:

select istream count(*) as mycount from Withdrawal having count(*) = 2
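Extending the same kind of hypothetical simulation with an istream flag shows the effect of the keyword: the remove-stream row is dropped, so the listener is called once instead of twice. The class name IstreamCountHaving is illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "select [istream] count(*) as mycount from Withdrawal having count(*) = 2".
final class IstreamCountHaving {
    private final boolean istreamOnly;
    private long count = 0;
    final List<Long> newValues = new ArrayList<>();
    final List<Long> oldValues = new ArrayList<>();
    int listenerCalls = 0;

    IstreamCountHaving(boolean istreamOnly) { this.istreamOnly = istreamOnly; }

    void sendEvent() {
        long prior = count;
        count++;
        boolean postNew = count == 2;                 // having-clause on the current value
        boolean postOld = prior == 2 && !istreamOnly; // istream suppresses the remove stream
        if (postNew) newValues.add(count);
        if (postOld) oldValues.add(prior);
        if (postNew || postOld) listenerCalls++;
    }
}
```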

4.6. Time Windows

In this section we explain the output model of statements employing a time window view and a time batch view.

4.6.1. Time Window

A time window is a moving window extending the specified time interval into the past, based on the system time. Time windows enable us to limit the number of events considered by a query, as do length windows.

As a practical example, consider the need to determine all accounts where the average withdrawal amount per account for the last 4 seconds of withdrawals is greater than 1000. The statement to solve this problem is shown below.

select account, avg(amount) 
from Withdrawal.win:time(4 sec) 
group by account
having avg(amount) > 1000
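The grouping and having logic of this statement can be sketched over a snapshot of the window contents. The class name, the nested Withdrawal type and the account data are hypothetical, and the continuous re-evaluation that the engine performs as events enter and leave the time window is not modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class AccountAverages {
    // Hypothetical window entry: account and withdrawal amount.
    static final class Withdrawal {
        final String account;
        final double amount;
        Withdrawal(String account, double amount) { this.account = account; this.amount = amount; }
    }

    // Groups the events currently in the window by account and keeps only the
    // groups whose average exceeds the threshold, mirroring
    // "group by account having avg(amount) > 1000".
    static Map<String, Double> averagesAbove(List<Withdrawal> window, double threshold) {
        Map<String, double[]> sums = new TreeMap<>(); // account -> {sum, count}
        for (Withdrawal w : window) {
            double[] acc = sums.computeIfAbsent(w.account, k -> new double[2]);
            acc[0] += w.amount;
            acc[1] += 1;
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, double[]> e : sums.entrySet()) {
            double avg = e.getValue()[0] / e.getValue()[1];
            if (avg > threshold) {
                result.put(e.getKey(), avg);
            }
        }
        return result;
    }
}
```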

The next diagram serves to illustrate the functioning of a time window. For the diagram, we assume a query that simply selects the event itself and does not group or filter events.

select * from Withdrawal.win:time(4 sec)

The diagram starts at a given time t and displays the contents of the time window at t + 4 and t + 5 seconds and so on.

Figure 4.5. Output example for a statement with a time window

The activity as illustrated by the diagram:

  1. At time t + 4 seconds an event W1 arrives and enters the time window. The engine reports the new event to update listeners.

  2. At time t + 5 seconds an event W2 arrives and enters the time window. The engine reports the new event to update listeners.

  3. At time t + 6.5 seconds an event W3 arrives and enters the time window. The engine reports the new event to update listeners.

  4. At time t + 8 seconds event W1 leaves the time window. The engine reports the event as an old event to update listeners.
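The steps above can be reproduced with a deterministic sketch that uses an explicit clock in seconds instead of system time (an assumption made so the output is repeatable). An event that arrives at time t expires once the clock reaches t plus the window length. TimeWindow is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sketch of "select * from Withdrawal.win:time(4 sec)" driven by an explicit clock.
final class TimeWindow {
    private static final class Entry {
        final String id;
        final double arrival;
        Entry(String id, double arrival) { this.id = id; this.arrival = arrival; }
    }

    private final double lengthSec;
    private final Deque<Entry> window = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    TimeWindow(double lengthSec) { this.lengthSec = lengthSec; }

    // Moves the clock forward, posting each expired event as an old event.
    void advanceTo(double now) {
        while (!window.isEmpty() && window.peekFirst().arrival + lengthSec <= now) {
            log.add("t=" + now + " old=" + window.removeFirst().id);
        }
    }

    // Expires due events first, then enters the new event and posts it.
    void sendEvent(String id, double now) {
        advanceTo(now);
        window.addLast(new Entry(id, now));
        log.add("t=" + now + " new=" + id);
    }
}
```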

4.6.2. Time Batch

The time batch view buffers events and releases them in a single update at the end of every specified time interval. Like the length batch window, the time batch view controls when events are evaluated.

The next diagram serves to illustrate the functioning of a time batch view. For the diagram, we assume a simple query as below:

select * from Withdrawal.win:time_batch(4 sec)

The diagram starts at a given time t and displays the contents of the time batch view at t + 1, t + 3 and t + 4 seconds and so on.

Figure 4.6. Output example for a statement with a time batch view

The activity as illustrated by the diagram:

  1. At time t + 1 second an event W1 arrives and enters the batch. No call to inform update listeners occurs.

  2. At time t + 3 seconds an event W2 arrives and enters the batch. No call to inform update listeners occurs.

  3. At time t + 4 seconds the engine processes the batched events and starts a new batch. The engine reports events W1 and W2 to update listeners.

  4. At time t + 6.5 seconds an event W3 arrives and enters the batch. No call to inform update listeners occurs.

  5. At time t + 8 seconds the engine processes the batched events and starts a new batch. The engine reports the event W3 as new data to update listeners. The engine reports the events W1 and W2 as old data (prior batch) to update listeners.
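A similar sketch for the time batch view, again driven by an explicit clock starting at t = 0. At each batch boundary the current batch is reported as new data and the previous batch as old data; the sketch assumes no listener call occurs when both are empty. TimeBatch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "select * from Withdrawal.win:time_batch(4 sec)" driven by an explicit clock.
final class TimeBatch {
    private final double intervalSec;
    private double nextFlush;
    private List<String> current = new ArrayList<>();
    private List<String> prior = new ArrayList<>();
    final List<String> log = new ArrayList<>();

    TimeBatch(double startSec, double intervalSec) {
        this.intervalSec = intervalSec;
        this.nextFlush = startSec + intervalSec;
    }

    // Releases every batch whose boundary has been reached.
    void advanceTo(double now) {
        while (now >= nextFlush) {
            if (!current.isEmpty() || !prior.isEmpty()) {
                log.add("t=" + nextFlush + " new=" + current + " old=" + prior);
            }
            prior = current;             // this batch becomes old data next time
            current = new ArrayList<>(); // start a new batch
            nextFlush += intervalSec;
        }
    }

    void sendEvent(String id, double now) {
        advanceTo(now);
        current.add(id); // buffered; no listener call until the boundary
    }
}
```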

4.7. EventBean Query Results

The engine posts events to UpdateListener implementations as net.esper.event.EventBean instances. The EventBean represents a row (event) in your continuous query's result set.

If you require read-based access to statement result sets, use the iterator method on EPStatement to poll or read data out of a statement. Statement iterators also return EventBean instances.

The EventBean interface offers property type metadata via the getEventType method returning an EventType. The EventType provides property name, property type and underlying type information. This information can be useful to dynamically interrogate query results. The underlying event that an EventBean represents can be obtained via the getUnderlying method. Please see Chapter 5, Event Representations for more information on different event underlying objects.

Consider a statement that returns the symbol, count of events per symbol and average price per symbol for tick events. Our sample statement may declare a fully-qualified Java class name as the event type: org.sample.StockTickEvent. Assume that this class exists and exposes a symbol property of type String, and a price property of type (Java primitive) double.

select symbol, avg(price) as avgprice, count(*) as mycount 
from org.sample.StockTickEvent 
group by symbol

The next table summarizes the property names and types as posted by the statement above:

Table 4.1. Properties offered by sample statement aggregating price

Name      Type              Description                     Java code snippet
symbol    java.lang.String  Value of symbol event property  eventBean.get("symbol")
avgprice  java.lang.Double  Average price per symbol        eventBean.get("avgprice")
mycount   java.lang.Long    Number of events per symbol     eventBean.get("mycount")

An UpdateListener implementation for this statement could read the result columns from the newEvents array along these lines:

String symbol = (String) newEvents[0].get("symbol");
Double avgPrice = (Double) newEvents[0].get("avgprice");
Long count = (Long) newEvents[0].get("mycount");

The engine supplies these property values as the boxed java.lang.Double and java.lang.Long types rather than as primitive Java types, because aggregation functions can return null to indicate that no data is available for aggregation. Also, in a select statement that computes expressions, the underlying objects of the EventBean instances are of type java.util.Map.
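The reason for boxed types can be illustrated in plain Java: an average over no values has no meaningful primitive result, so a nullable Double is returned, and a listener should check for null before unboxing, since unboxing a null Double throws NullPointerException. The class and method names are hypothetical.

```java
import java.util.List;

final class NullableAverage {
    // Returns null when there is nothing to aggregate, which is why the
    // result type is the boxed Double rather than a primitive double.
    static Double avg(List<Double> prices) {
        if (prices.isEmpty()) {
            return null;
        }
        double sum = 0;
        for (double p : prices) {
            sum += p;
        }
        return sum / prices.size();
    }

    // Listener-side handling: test for null before using the value.
    static String describe(Double avgPrice) {
        return avgPrice == null ? "no data" : "avg=" + avgPrice;
    }
}
```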

Consider the next statement that specifies a wildcard selecting the same type of event:

select * from org.sample.StockTickEvent where price > 100

The property names and types provided by each EventBean result row posted by the statement above are as follows:

Table 4.2. Properties offered by sample wildcard-select statement

Name    Type              Description                     Java code snippet
symbol  java.lang.String  Value of symbol event property  eventBean.get("symbol")
price   double            Value of price event property   eventBean.get("price")

As an alternative to querying individual event properties via the get methods, the getUnderlying method on EventBean returns the underlying object representing the query result. In the sample statement that features a wildcard-select, the underlying event object is of type org.sample.StockTickEvent:

StockTickEvent tick = (StockTickEvent) newEvents[0].getUnderlying();
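Because getUnderlying returns Object, a listener shared between statements may want to check the runtime type before casting. The sketch below uses a hypothetical StockTickEvent in the default package (rather than org.sample) and a hypothetical helper, UnderlyingReader; it handles both the JavaBean underlying of a wildcard select and the Map underlying of a select with expressions.

```java
import java.util.Map;

// Hypothetical JavaBean matching the text: a String symbol and a primitive double price.
final class StockTickEvent {
    private final String symbol;
    private final double price;
    StockTickEvent(String symbol, double price) { this.symbol = symbol; this.price = price; }
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
}

final class UnderlyingReader {
    // Branches on the runtime type of the underlying object before casting.
    static String symbolOf(Object underlying) {
        if (underlying instanceof StockTickEvent) {
            return ((StockTickEvent) underlying).getSymbol(); // wildcard select
        }
        if (underlying instanceof Map) {
            return (String) ((Map<?, ?>) underlying).get("symbol"); // select with expressions
        }
        throw new IllegalArgumentException("unexpected underlying type: " + underlying.getClass());
    }
}
```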