The Event Processing Language (EPL) is a SQL-like language with SELECT, FROM, WHERE, GROUP BY, HAVING and ORDER BY clauses. Streams replace tables as the source of data with events replacing rows as the basic unit of data. Since events are composed of data, the SQL concepts of correlation through joins, filtering and aggregation through grouping can be effectively leveraged. The INSERT INTO clause is recast as a means of forwarding events to other streams for further downstream processing. External data accessible through JDBC may be queried and joined with the stream data. Additional clauses such as the PATTERN and OUTPUT clauses are also available to provide the missing SQL language constructs specific to event processing.
EPL statements are used to derive and aggregate information from one or more streams of events, and to join or merge event streams. This section outlines EPL syntax. It also outlines the built-in views, which are the building blocks for deriving and aggregating information from event streams.
EPL statements contain definitions of one or more views. Similar to tables in a SQL statement, views define the data available for querying and filtering. Some views represent windows over a stream of events. Other views derive statistics from event properties, group events or handle unique event property values. Views can be staggered onto each other to build a chain of views. The Esper engine makes sure that views are reused among EPL statements for efficiency.
The built-in set of views is:
Data window views: win:length, win:length_batch, win:time, win:time_batch, win:time_length_batch, win:time_accum, win:ext_timed, ext:sort_window, ext:time_order, std:unique, std:groupby, std:lastevent
Views that derive statistics: std:size, stat:uni, stat:linest, stat:correl, stat:weighted_avg, stat:cube
EPL provides the concept of a named window. Named windows are data windows that can be inserted into and deleted from by one or more statements, and that can be queried by one or more statements. Named windows have a global character, being visible and shared across an engine instance beyond a single statement. Use the CREATE WINDOW clause to create named windows. Use the INSERT INTO clause to insert data into a named window, the ON DELETE clause to remove events from a named window, and the ON SELECT clause to perform a non-continuous fire-once query on a named window. Finally, the name of the named window can occur in a statement's FROM clause to query a named window or include the named window in a join or subquery.
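As an illustrative sketch only (the OrderEvent, OrderCancelEvent and QueryEvent types and their properties are assumptions for this example), the clauses mentioned above could be combined as follows:

// create a named window holding order data (keep-all retention assumed here)
create window OrdersWindow.win:keepall() as select orderId, amount from OrderEvent
// continuously insert arriving orders into the named window
insert into OrdersWindow select orderId, amount from OrderEvent
// remove entries when a matching cancellation arrives
on OrderCancelEvent as cancel delete from OrdersWindow as win where win.orderId = cancel.orderId
// fire-once query against the named window, triggered by a query event
on QueryEvent select count(*) from OrdersWindow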
Variables can come in handy to parameterize statements and change parameters on-the-fly and in response to events. Variables can be used in an expression anywhere in a statement as well as in the output clause for dynamic control of output rates.
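The following sketch illustrates the idea; the ThresholdUpdateEvent and OrderEvent types and their properties are assumptions for this example:

// declare a variable with an initial value
create variable integer var_threshold = 100
// update the variable in response to events
on ThresholdUpdateEvent set var_threshold = newThreshold
// use the variable in an expression; the statement picks up changes on-the-fly
select * from OrderEvent where amount > var_threshold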
Esper can be extended by plugging-in custom developed views and aggregation functions.
EPL queries are created and stored in the engine, and publish results to listeners as events are received by the engine or timer events occur that match the criteria specified in the query. Events can also be obtained from running EPL queries via the safeIterator and iterator methods that provide a pull-data API.
The select clause in an EPL query specifies the event properties or events to retrieve. The from clause in an EPL query specifies the event stream definitions and stream names to use. The where clause in an EPL query specifies search conditions that specify which event or event combination to search for. For example, the following statement returns the average price for IBM stock ticks in the last 30 seconds.
select avg(price) from StockTick.win:time(30 sec) where symbol='IBM'
EPL queries follow the syntax below. EPL queries can be simple queries or more complex queries. A simple select contains only a select clause and a single stream definition. Complex EPL queries can be built that feature a more elaborate select list utilizing expressions, may join multiple streams, may contain a where clause with search conditions, and so on.
[insert into insert_into_def]
select select_list
from stream_def [as name] [, stream_def [as name]] [,...]
[where search_conditions]
[group by grouping_expression_list]
[having grouping_search_conditions]
[output output_specification]
[order by order_by_expression_list]
Time-based windows as well as pattern observers and guards take a time period as a parameter. Time periods follow the syntax below.
time-period : [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]
day-part : number ("days" | "day")
hour-part : number ("hours" | "hour")
minute-part : number ("minutes" | "minute" | "min")
seconds-part : number ("seconds" | "second" | "sec")
milliseconds-part : number ("milliseconds" | "millisecond" | "msec")
Some examples of time periods are:
10 seconds
10 minutes 30 seconds
20 sec 100 msec
1 day 2 hours 20 minutes 15 seconds 110 milliseconds
0.5 minutes
Comments can appear anywhere in the EPL or pattern statement text where whitespace is allowed. Comments can be written in two ways: slash-slash (// ...) comments and slash-star (/* ... */) comments.
Slash-slash comments extend to the end of the line:
// This comment extends to the end of the line.
// Two forward slashes with no whitespace between them begin such comments.

select * from MyEvent // this is a slash-slash comment
// All of this text together is a valid statement.
Slash-star comments can span multiple lines:
/* This comment is a "slash-star" comment that spans multiple lines.
 * It begins with the slash-star sequence with no space between the '/' and '*' characters.
 * By convention, subsequent lines can begin with a star and are aligned, but this is
 * not required.
 */
select * from MyEvent /* this also works */
Comments styles can also be mixed:
select field1, // first comment
    /* second comment */ field2
from MyEvent
Certain words such as select, delete or set are reserved and may not be used as identifiers. Please consult Appendix B, Reserved Keywords for the list of reserved keywords and permitted keywords.
Names of built-in functions and certain auxiliary keywords are permitted as event property names and in the rename syntax of the select clause. For example, count is acceptable.
Consider the example below, which assumes that 'last' is an event property of MyEvent:
// valid
select last, count(*) as count from MyEvent
This example shows an incorrect use of a reserved keyword:
// invalid
select insert from MyEvent
EPL does not offer an escape syntax for reserved keywords.
The select clause is required in all EPL statements. The select clause can be used to select all properties via the wildcard *, or to specify a list of event properties and expressions. The select clause defines the event type (event property names and types) of the resulting events published by the statement, or pulled from the statement via the iterator methods.
The select clause also offers the optional istream, irstream and rstream keywords to control whether insert stream, remove stream, or both insert and remove stream events are posted to UpdateListener instances and observers to a statement. By default, the engine provides only the insert stream to listeners and observers. See Section 10.4.14, “Engine Settings related to Stream Selection” on how to change the default.
The syntax for the select clause is summarized below.
select [istream | irstream | rstream] * | expression_list ...
The istream keyword is the default, and indicates that the engine only delivers insert stream events to listeners and observers. The irstream keyword indicates that the engine delivers both insert and remove stream events. Finally, the rstream keyword tells the engine to deliver only the remove stream.
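For instance, the following sketch (reusing the StockTick stream from the other examples) delivers both the insert and remove stream of a time window to listeners:

select irstream * from StockTick.win:time(30 sec)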
The syntax for selecting all event properties in a stream is:
select * from stream_def
The following statement selects StockTick events for the last 30 seconds of IBM stock ticks.
select * from StockTick(symbol='IBM').win:time(30 sec)
The * wildcard and expressions can also be combined in a select clause. The combination selects all event properties and in addition the computed values as specified by any additional expressions that are part of the select clause. Here is an example that selects all properties of stock tick events plus a computed product of price and volume that the statement names 'pricevolume':
select *, price * volume as pricevolume from StockTick(symbol='IBM')
When using wildcard (*), Esper does not actually copy your event properties out of your event or events. It simply wraps your native type in an EventBean interface. Your application has access to the underlying event object through the getUnderlying method and has access to the property values through the get method.
In a join statement, using the select * syntax selects one event property per stream to hold the event for that stream. The property name is the stream alias name in the from clause.
To choose the particular event properties to return:
select event_property [, event_property] [, ...] from stream_def
The following statement simply selects the symbol and price properties of stock ticks, and the total volume for stock tick events in a 60-second time window.
select symbol, price, sum(volume) from StockTick(symbol='IBM').win:time(60 sec)
The following statement declares a further view onto the event stream of stock ticks: the univariate statistics view (stat:uni). The statement selects the properties that this view derives from the stream, for the last 100 events of IBM stock ticks in the length window.
select datapoints, total, average, variance, stdev, stdevpa from StockTick(symbol='IBM').win:length(100).stat:uni(volume)
The select clause can contain one or more expressions.
select expression [, expression] [, ...] from stream_def
The following statement selects the volume multiplied by price for a time batch of the last 30 seconds of stock tick events.
select volume * price from StockTick.win:time_batch(30 sec)
Event properties and expressions can be renamed using below syntax.
select [event_property | expression] as identifier [, ...]
The following statement selects volume multiplied by price and specifies the name volPrice for the resulting column.
select volume * price as volPrice from StockTick.win:length(100)
Identifiers cannot contain the "." (dot) character, i.e. "vol.price" is not a valid identifier for the rename syntax.
If your statement is joining multiple streams, you may specify property names that are unique among the joined streams, or use the wildcard (*) as explained earlier.
In case the property name in your select or other clauses is not unique considering all joined streams, you will need to use the alias name of the stream as a prefix to the property.
This example is a join between the two streams StockTick and News, respectively named as 'tick' and 'news'. The example selects from the StockTick event the symbol value using the 'tick' stream alias as a prefix:
select tick.symbol from StockTick.win:time(10) as tick, News.win:time(10) as news
Use the wildcard (*) selector in a join to generate a property for each stream, with the property value being the event itself. The output events of the statement below have two properties: the 'tick' property holds the StockTick event and the 'news' property holds the News event:
select * from StockTick.win:time(10) as tick, News.win:time(10) as news
The following syntax can also be used to specify what stream's properties to select:
select stream_name.* [as alias] from ...
The selection of tick.* selects the StockTick stream events only:
select tick.* from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The next example uses the as keyword to name each stream's joined events. This instructs the engine to create a property for each named event:
select tick.* as stocktick, news.* as news from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The output events of the above example have two properties 'stocktick' and 'news' that are the StockTick and News events.
The stream name itself, as further described in Section 4.4.4, “Using the Stream Name”, may be used within expressions or alone.
This example passes events to a user-defined function named compute and also shows insert-into to populate an event stream of combined events:
insert into TickNewStream
select tick, news, MyLib.compute(news, tick) as result
from StockTick.win:time(10) as tick, News.win:time(10) as news
where tick.symbol = news.symbol

// second statement that uses the TickNewStream stream
select tick.price, news.text, result from TickNewStream
In summary, the stream_name.* stream wildcard syntax can be used to select a stream as the underlying event or as a property, but cannot appear within an expression. The stream_name syntax (without wildcard) always selects a property (and not the underlying event), and can occur anywhere within an expression.
If your statement employs pattern expressions, then your pattern expression tags events with a tag name. Each tag name becomes available for use as a property in the select clause and all other clauses.
For example, here is a very simple pattern that matches on every StockTick event received within 10 seconds after the start of the statement. The sample selects the symbol and price properties of the matching events:
select tick.symbol as symbol, tick.price as price from pattern[every tick=StockTick where timer:within(10 sec)]
The use of the wildcard selector, as shown in the next statement, creates a property for each tagged event in the output. The next statement outputs events that hold a single 'tick' property whose value is the event itself:
select * from pattern[every tick=StockTick where timer:within(10 sec)]
You may also select the matching event itself using the tick.* syntax. The engine outputs the StockTick event itself to listeners:
select tick.* from pattern[every tick=StockTick where timer:within(10 sec)]
A tag name as specified in a pattern is a valid expression itself. This example uses the insert into clause to make available the events matched by a pattern to further statements:
// make a new stream of ticks and news available
insert into StockTickAndNews
select tick, news from pattern [every tick=StockTick -> news=News(symbol=tick.symbol)]

// second statement to select from the stream of ticks and news
select tick.symbol, tick.price, news.text from StockTickAndNews
The optional istream, irstream and rstream keywords in the select clause control the event streams posted to listeners and observers to a statement.
If neither keyword is specified, and in the default engine configuration, the engine posts only insert stream events via the newEvents parameter to the update method of UpdateListener instances listening to the statement. The engine does not post remove stream events, by default.
The insert stream consists of the events entering the respective window(s) or stream(s) or aggregations, while the remove stream consists of the events leaving the respective window(s) or the changed aggregation result. See Chapter 3, Processing Model for more information on insert and remove streams.
The engine posts remove stream events to the oldEvents parameter of the update method only if either the irstream or the rstream keyword occurs in the select clause. This behavior can be changed via engine-wide configuration as described in Section 10.4.14, “Engine Settings related to Stream Selection”.
By specifying the istream keyword you can instruct the engine to only post insert stream events via the newEvents parameter to the update method on listeners. The engine will then not post any remove stream events, and the oldEvents parameter is always a null value.
By specifying the irstream keyword you can instruct the engine to post both insert stream and remove stream events.
By specifying the rstream keyword you can instruct the engine to only post remove stream events via the newEvents parameter to the update method on listeners. The engine will then not post any insert stream events, and the oldEvents parameter is also always a null value.
The following statement selects only the events that are leaving the 30 second time window.
select rstream * from StockTick.win:time(30 sec)
The istream and rstream keywords in the select clause are matched by same-name keywords available in the insert into clause. While the keywords in the select clause control the event stream posted to listeners to the statement, the same keywords in the insert into clause specify the event stream that the engine makes available to other statements.
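As a sketch of how the two can be combined in one statement (the ExpiredTicks stream name is an assumption for this example), listeners would receive both streams while only the remove stream is forwarded to other statements:

insert rstream into ExpiredTicks select irstream * from StockTick.win:time(30 sec)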
Property or column names can optionally be qualified by a stream name and the provider URI. The syntax is:
[[provider_URI.]stream_name.]property_name
The provider_URI is the URI supplied to the EPServiceProviderManager class, or the string default for the default provider.
This example assumes the provider is the default provider:
select MyEvent.myProperty from MyEvent

// ... equivalent to ...
select default.MyEvent.myProperty from MyEvent
Stream names can also be qualified by the provider URI. The syntax is:
[provider_URI.]stream_name
The next example assumes a provider URI by name of Processor:
select Processor.MyEvent.myProperty from Processor.MyEvent
The from clause is required in all EPL statements. It specifies one or more event streams or named windows. Each event stream or named window can optionally be given a name by means of the as syntax.
from stream_def [as name] [, stream_def [as stream_name]] [, ...]
The event stream definition stream_def as shown in the syntax above can consist of either a filter-based event stream definition or a pattern-based event stream definition.
For joins and outer joins, specify two or more event streams. Joins between pattern-based and filter-based event streams are also supported. Joins are described in more detail in Section 4.10, “Joining Event Streams”.
Esper supports joins against relational databases for access to historical or reference data as explained in Section 4.14, “Joining Relational Data via SQL”. Esper can also join results returned by an arbitrary method invocation, as discussed in Section 4.15, “Joining Non-Relational Data via Method Invocation”.
The stream_name is an optional identifier assigned to the stream. The stream name can itself occur in any expression and provides access to the event itself from the named stream. Also, a stream name may be combined with a method name to invoke instance methods on events of that stream.
For filter-based event streams, the event stream definition stream_def as shown in the from clause syntax consists of an event type, optional filter expressions and an optional list of views that derive data from a stream. The syntax for a filter-based event stream is as below:
event_type ( [filter_criteria] ) [.view_spec] [.view_spec] [...]
The following EPL statement shows event type, filter criteria and views combined in one statement. It selects all event properties for the last 100 events of IBM stock ticks, deriving statistics on volume. In the example, the event type is the fully qualified Java class name org.esper.example.StockTick. The expression filters for events where the property symbol has a value of "IBM". The optional view specifications for deriving data from the StockTick events are a length window and a view for computing statistics on volume. The name for the event stream is "volumeStats".
select * from org.esper.example.StockTick(symbol='IBM').win:length(100).stat:uni(volume) as volumeStats
Esper filters out events in an event stream as defined by filter criteria before it sends events to subsequent views. Thus, compared to search conditions in a where clause, filter criteria remove unneeded events early. In the above example, events with a symbol other than IBM do not enter the length window.
The simplest form of filter is a filter for events of a given type without any conditions on the event property values. This filter matches any event of that type regardless of the event's properties. The example below is such a filter.
select * from com.mypackage.myevents.RfidEvent
Instead of the fully-qualified Java class name any other event name can be mapped via Configuration to a Java class, making the resulting statement more readable:
select * from RfidEvent
Interfaces and superclasses are also supported as event types. In the below example IRfidReadable is an interface class.
select * from org.myorg.rfid.IRfidReadable
The filtering criteria to filter for events with certain event property values are placed within parentheses after the event type name:
select * from RfidEvent(category="Perishable")
All expressions can be used in filters, including static methods that return a boolean value:
select * from com.mycompany.RfidEvent(MyRFIDLib.isInRange(x, y) or (x < 0 and y < 0))
Filter expressions can be separated via a single comma ','. The comma represents a logical AND between filter expressions:
select * from RfidEvent(zone=1, category=10)

...is equivalent to...

select * from RfidEvent(zone=1 and category=10)
The following operators are highly optimized through indexing and are the preferred means of filtering in high-volume event streams:
equals =
not equals !=
comparison operators < , > , >=, <=
ranges
use the between keyword for a closed range where both endpoints are included
use the in keyword and round () or square brackets [] to control how endpoints are included
for inverted ranges use the not keyword and the between or in keywords
list-of-values checks using the in keyword or the not in keywords followed by a comma-separated list of values
At compile time as well as at run time, the engine scans new filter expressions for sub-expressions that can be indexed. Indexing filter values to match event properties of incoming events enables the engine to match incoming events faster. The above list of operators represents the set of operators that the engine can best convert into indexes. The use of comma or logical and in filter expressions does not impact optimizations by the engine.
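As a sketch, the following filter (the property names are assumptions for this example) uses only operators from the list above and therefore lends itself well to indexing:

select * from RfidEvent(zone = 1, category != 10, x between 0 and 100, assetId in ('A1', 'A2'))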
Ranges come in the following 4 varieties. The use of round () or square [] brackets dictates whether an endpoint is included or excluded. The low endpoint and the high endpoint of the range are separated by the colon (:) character.
Open ranges that contain neither endpoint (low:high)
Closed ranges that contain both endpoints [low:high]. The equivalent 'between' keyword also defines a closed range.
Half-open ranges that contain the low endpoint but not the high endpoint [low:high)
Half-closed ranges that contain the high endpoint but not the low endpoint (low:high]
The next statement shows a filter specifying a range for x and y values of RFID events. The range includes both endpoints and therefore uses square brackets [].
mypackage.RfidEvent(x in [100:200], y in [0:100])
The between keyword is equivalent for closed ranges. The same filter using the between keyword is:
mypackage.RfidEvent(x between 100 and 200, y between 0 and 100)
The not keyword can be used to determine if a value falls outside a given range:
mypackage.RfidEvent(x not in [0:100])
The equivalent statement using the between keyword is:
mypackage.RfidEvent(x not between 0 and 100)
The in keyword for filter criteria determines if a given value matches any value in a list of values.
In this example we are interested in RFID events where the category matches any of the given values:
mypackage.RfidEvent(category in ('Perishable', 'Container'))
By using the not in keywords we can filter events with a property value that does not match any of the values in a list of values:
mypackage.RfidEvent(category not in ('Household', 'Electrical'))
The following restrictions apply to filter criteria:
Range and comparison operators require the event property to be of a numeric type.
Aggregation functions are not allowed within filter expressions.
The prev (previous event) function and the prior (prior event) function cannot be used in filter expressions.
Event pattern expressions can also be used to specify one or more event streams in an EPL statement. For pattern-based event streams, the event stream definition stream_def consists of the keyword pattern and a pattern expression in brackets []. The syntax for an event stream definition using a pattern expression is below. As in filter-based event streams, an optional list of views that derive data from the stream can be supplied.
pattern [pattern_expression] [.view_spec] [.view_spec] [...]
The next statement specifies an event stream that consists of both stock tick events and trade events. The example tags stock tick events with the name "tick" and trade events with the name "trade".
select * from pattern [every tick=StockTickEvent or every trade=TradeEvent]
This statement generates an event every time the engine receives either one of the event types. The generated events resemble a map with "tick" and "trade" keys. For stock tick events, the "tick" key value is the underlying stock tick event, and the "trade" key value is a null value. For trade events, the "trade" key value is the underlying trade event, and the "tick" key value is a null value.
Let's further refine this statement by adding a view that gives us the last 30 seconds of either stock tick or trade events. Let's also select prices and a price total.
select tick.price as tickPrice, trade.price as tradePrice, sum(tick.price) + sum(trade.price) as total from pattern [every tick=StockTickEvent or every trade=TradeEvent].win:time(30 sec)
Note that in the statement above tickPrice and tradePrice can each be null values depending on the event processed. Therefore, an aggregation function such as sum(tick.price + trade.price) would always return null values, as either of the two price properties is always a null value for any event matching the pattern. Use the coalesce function to handle null values, for example: sum(coalesce(tick.price, 0) + coalesce(trade.price, 0)).
Views are used to derive or aggregate data. Views can be staggered onto each other. See the section Chapter 8, EPL Reference: Views on the views available.
Views can optionally take one or more parameters. These parameters can consist of primitive constants such as String, boolean or numeric types. Arrays are also supported as view parameter types.
The below example serves to show views and staggering of views. It uses a car location event that contains information about the location of a car on a highway.
The first view std:groupby(carId) groups car location events by car id. The second view win:length(4) keeps a length window of the 4 last events, with one length window for each car id. The next view std:groupby(expressway, direction, segment) groups each event by its expressway, direction and segment property values. Again, the grouping is done for each car id considering the last 4 events only. The last view std:size() is used to report the number of events. Thus the below example reports the number of events per car id and per expressway, direction and segment considering the last 4 events for each car id only.
select * from CarLocEvent.std:groupby(carId)
    .win:length(4)
    .std:groupby(expressway, direction, segment)
    .std:size()
Your from clause may assign a name to each stream. This assigned stream name can serve any of the following purposes.
First, the stream name can be used to disambiguate property names. The stream_name.property_name syntax uniquely identifies which property to select if property names overlap between streams. Here is an example:
select prod.productId, ord.productId from ProductEvent as prod, OrderEvent as ord
Second, the stream name can be used with a wildcard (*) character to select events in a join, or assign new names to the streams in a join:
// Select ProductEvent only
select prod.* from ProductEvent as prod, OrderEvent

// Assign column names 'product' and 'order' to each event
select prod.* as product, ord.* as order from ProductEvent as prod, OrderEvent as ord
Further, the stream name by itself can occur in any expression: The engine passes the event itself to that expression. For example, the engine passes the ProductEvent and the OrderEvent to the user-defined function 'checkOrder':
select prod.productId, MyFunc.checkOrder(prod, ord) from ProductEvent as prod, OrderEvent as ord
Last, you may invoke an instance method on each event of a stream, and pass parameters to the instance method as well. Instance method calls are allowed anywhere in an expression.
The next statement demonstrates this capability by invoking a method 'computeTotal' on OrderEvent events and a method 'getMultiplier' on ProductEvent events:
select ord.computeTotal(prod.getMultiplier()) from ProductEvent as prod, OrderEvent as ord
The where clause is an optional clause in EPL statements. Via the where clause event streams can be joined and events can be filtered.
Comparison operators =, < , > , >=, <=, !=, <>, is null, is not null and logical combinations via and and or are supported in the where clause. The where clause can also introduce join conditions as outlined in Section 4.10, “Joining Event Streams”. where clauses can also contain expressions. Some examples are listed below.
...where fraud.severity = 5 and amount > 500
...where (orderItem.orderId is null) or (orderItem.class != 10)
...where (orderItem.orderId = null) or (orderItem.class <> 10)
...where itemCount / packageCount > 10
The aggregate functions are sum, avg, count, max, min, median, stddev, avedev. You can use aggregate functions to calculate and summarize data from event properties. For example, to find out the total price for all stock tick events in the last 30 seconds, type:
select sum(price) from StockTickEvent.win:time(30 sec)
Here is the syntax for aggregate functions:
aggregate_function( [all | distinct] expression)
You can apply aggregate functions to all events in an event stream window or other view, or to one or more groups of events. From each set of events to which an aggregate function is applied, Esper generates a single value.
Expression is usually an event property name. However it can also be a constant, function, or any combination of event property names, constants, and functions connected by arithmetic operators.
For example, to find out the average price for all stock tick events in the last 30 seconds if the price was doubled:
select avg(price * 2) from StockTickEvent.win:time(30 seconds)
You can use the optional keyword distinct with all aggregate functions to eliminate duplicate values before the aggregate function is applied. The optional keyword all, which performs the operation on all events, is the default.
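For example, the following counts the number of distinct symbol values seen in the last 30 seconds of stock tick events:

select count(distinct symbol) from StockTickEvent.win:time(30 sec)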
You can use aggregation functions in a select clause and in a having clause. You cannot use aggregate functions in a where clause, but you can use the where clause to restrict the events to which the aggregate is applied. The next query computes the average and sum of the price of stock tick events for the symbol IBM only, for the last 10 stock tick events regardless of their symbol.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent.win:length(10) where symbol='IBM'
In the above example the length window of 10 elements is not affected by the where clause, i.e. all events enter and leave the length window regardless of their symbol. If we only care about the last 10 IBM events, we need to add filter criteria as below.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent(symbol='IBM').win:length(10) where symbol='IBM'
You can use aggregate functions with any type of event property or expression, with the following exceptions:
You can use sum, avg, median, stddev, avedev with numeric event properties only
Esper ignores any null values returned by the event property or expression on which the aggregate function is operating, except for the count(*) function, which counts null values as well. All aggregate functions return null if the data set contains no events, or if all events in the data set contain only null values for the aggregated expression.
The group by clause is optional in all EPL statements. The group by clause divides the output of an EPL statement into groups. You can group by one or more event property names, or by the result of computed expressions. When used with aggregate functions, group by retrieves the calculations in each subgroup. You can use group by without aggregate functions, but generally that can produce confusing results.
For example, the below statement returns the total price per symbol for all stock tick events in the last 30 seconds:
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol
The syntax of the group by clause is:
group by aggregate_free_expression [, aggregate_free_expression] [, ...]
Esper places the following restrictions on expressions in the group by clause:
Expressions in the group by cannot contain aggregate functions
Event properties that are used within aggregate functions in the select clause cannot also be used in a group by expression
You can list more than one expression in the group by clause to nest groups. Once the sets are established with group by, the aggregation functions are applied. This statement posts the median volume for all stock tick events in the last 30 seconds per symbol and tick data feed. Esper posts one event for each group to statement listeners:
select symbol, tickDataFeed, median(volume) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
In the statement above the event properties in the select list (symbol, tickDataFeed) are also listed in the group by clause. The statement thus follows the SQL standard which prescribes that non-aggregated event properties in the select list must match the group by columns.
Esper also supports statements in which one or more event properties in the select list are not listed in the group by clause. The statement below demonstrates this case. It calculates the standard deviation for the last 30 seconds of stock ticks aggregating by symbol and posting for each event the symbol, tickDataFeed and the standard deviation on price.
select symbol, tickDataFeed, stddev(price) from StockTickEvent.win:time(30 sec) group by symbol
The above example still aggregates the price event property based on the symbol, but produces one event per incoming event, not one event per group.
Additionally, Esper supports statements in which one or more event properties in the group by clause are not listed in the select list. This is an example that calculates the mean deviation per symbol and tickDataFeed and posts one event per group with symbol and mean deviation of price in the generated events. Since tickDataFeed is not in the posted results, this can potentially be confusing.
select symbol, avedev(price) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
Expressions are also allowed in the group by list:
select symbol * price, count(*) from StockTickEvent.win:time(30 sec) group by symbol * price
If the group by expression results in a null value, the null value becomes its own group. All null values are aggregated into the same group. If you are using the count(expression) aggregate function, which does not count null values, the count returns zero if only null values are encountered.
You can use a where clause in a statement with group by. Events that do not satisfy the conditions in the where clause are eliminated before any grouping is done. For example, the statement below posts the number of stock ticks in the last 30 seconds with a volume larger than 100, posting one event per group (symbol).
select symbol, count(*) from StockTickEvent.win:time(30 sec) where volume > 100 group by symbol
Use the having clause to pass or reject events defined by the group-by clause. The having clause sets conditions for the group by clause in the same way where sets conditions for the select clause, except where cannot include aggregate functions, while having often does.
This statement is an example of a having clause with an aggregate function. It posts the total price per symbol for the last 30 seconds of stock tick events for only those symbols in which the total price exceeds 1000. The having clause eliminates all symbols where the total price is equal to or less than 1000.
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000
To include more than one condition in the having clause, combine the conditions with and, or or not. This is shown in the statement below, which selects only groups with a total price greater than 1000 and an average volume less than 500.
select symbol, sum(price), avg(volume) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000 and avg(volume) < 500
Esper places the following restrictions on expressions in the having clause:
Any expressions that contain aggregate functions must also occur in the select clause
A statement with the having clause should also have a group by clause. If you omit group-by, all the events not excluded by the where clause return as a single group. In that case having acts like a where except that having can have aggregate functions.
The having clause can also be used without a group by clause, as the example below shows. It posts events where the price is less than the current running average price of all stock tick events in the last 30 seconds.
select symbol, price, avg(price) from StockTickEvent.win:time(30 sec) having price < avg(price)
When you include filters, the where condition, the group by clause and the having condition in an EPL statement the sequence in which each clause affects events determines the final result:
The event stream's filter condition, if present, dictates which events enter a window (if one is used). The filter discards any events not meeting filter criteria.
The where clause excludes events that do not meet its search condition.
Aggregate functions in the select list calculate summary values for each group.
The having clause excludes events from the final results that do not meet its search condition.
The following query illustrates the use of filter, where, group by and having clauses in one statement with a select clause containing an aggregate function.
select tickDataFeed, stddev(price) from StockTickEvent(symbol='IBM').win:length(10) where volume > 1000 group by tickDataFeed having stddev(price) > 0.8
Esper filters events using the filter criteria for the event stream StockTickEvent. In the example above only events with symbol IBM enter the length window over the last 10 events; all other events are simply discarded. The where clause removes any events posted by the length window (events entering and leaving the window) that do not match the condition of volume greater than 1000. Remaining events are applied to the stddev standard deviation aggregate function for each tick data feed as specified in the group by clause. Each tickDataFeed value generates one event. Esper applies the having clause and only lets events pass for tickDataFeed groups with a standard deviation of price greater than 0.8.
The group by clause as well as the built-in std:groupby view are similar in their ability to group events. This section explains the key differences in their behavior and use.
The group by clause works together with aggregation functions in your statement to produce an aggregation result per group. In greater detail, this means that when a new event arrives, the engine applies the expressions in the group by clause to determine a grouping key. If the engine has not encountered that grouping key before (a new group), the engine creates a set of new aggregation results for that grouping key and performs the aggregation changing that new set of aggregation results. If the grouping key points to an existing set of prior aggregation results (an existing group), the engine performs the aggregation changing the prior set of aggregation results for that group.
The std:groupby view is a built-in view that also groups events. The view is described in greater detail in Section 8.2.2, “Group-By (std:groupby)”. Its primary use is to create a separate data window per group, or more generally to create separate instances of all its sub-views for each grouping key encountered.
The next example shows two queries that produce equivalent results. The query using the group by clause is generally preferable as it is easier to read. The second form introduces the stat:uni view, which computes univariate statistics for a given property:
select symbol, avg(price) from StockTickEvent group by symbol

// ... is equivalent to ...

select symbol, average from StockTickEvent.std:groupby(symbol).stat:uni(price)
The next example shows two queries that are NOT equivalent as the length window is ungrouped in the first query, and grouped in the second query:
select symbol, sum(price) from StockTickEvent.win:length(10) group by symbol

// ... NOT equivalent to ...

select symbol, sum(price) from StockTickEvent.std:groupby(symbol).win:length(10)
The key difference between the two statements is that in the first statement the length window is ungrouped and applies to all events regardless of group. While in the second query each group gets its own instance of a length window. For example, in the second query events arriving for symbol "ABC" get a length window of 10 events, and events arriving for symbol "DEF" get their own length window of 10 events.
The output clause is optional in Esper and is used to control or stabilize the rate at which events are output. For example, the following statement outputs, every 60 seconds, the total price for all orders in the 30-minute time window:
select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds
Here is the syntax for output rate limiting:
output [all | first | last | snapshot] every output_rate [minutes | seconds | events]
An alternate syntax specifies the time period as outlined in Section 4.2.1, “Specifying Time Periods” :
output [all | first | last | snapshot] every time_period
The all keyword is the default and specifies that all events in a batch should be output, each incoming row in the batch producing an output row. Note that for statements that group via the group by clause, the all keyword provides special behavior as below.
The first keyword specifies that only the first event in an output batch is to be output. Using the first keyword instructs the engine to output the first matching event as soon as it arrives, and then ignore matching events for the time interval or number of events specified. After the time interval has elapsed, or the number of matching events has been reached, the next first matching event is output again and the engine once more ignores matching events for the following interval.
The last keyword specifies to only output the last event at the end of the given time interval or after the given number of matching events have been accumulated. Again, for statements that group via the group by clause the last keyword provides special behavior as below.
The snapshot keyword indicates that the engine outputs current computation results considering all events as per the views specified and/or current aggregation results. While the other keywords control how a batch of events between output intervals is considered, the snapshot keyword outputs the full current state of a statement independent of the last batch. Its output is equivalent to the iterator method provided by a statement.
The output_rate is the frequency at which the engine outputs events. It can be specified in terms of time or number of events. The value can be a number to denote a fixed output rate, or the name of a variable whose value is the output rate. By means of a variable the output rate can be controlled externally and changed dynamically at runtime.
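As a sketch, and assuming a variable named var_rate has been created beforehand via create variable, the output rate can be driven by that variable and changed at runtime:

select * from StockTickEvent.win:time(30 sec) output every var_rate events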
Please consult the Appendix A, Output Reference and Samples for detailed information on insert and remove stream output for the various output rate limiting keywords.
The time interval can also be specified in terms of minutes, for example:
select * from StockTickEvent.win:length(5) output every 1.5 minutes
A second way that output can be stabilized is by batching events until a certain number of events have been collected. The next statement only outputs when either 5 (or more) new or 5 (or more) old events have been batched.
select * from StockTickEvent.win:time(30 sec) output every 5 events
Additionally, event output can be further modified by the optional last keyword, which causes output of only the last event to arrive into an output batch.
select * from StockTickEvent.win:time(30 sec) output last every 5 events
Using the first keyword you can be notified at the start of the interval. This allows you to watch for situations such as a rate falling below a threshold: you are informed the moment it first happens, and then only again after the specified output interval.
select * from TickRate.win:time(30 seconds) where rate<100 output first every 60 seconds
Remove stream events can also be useful in conjunction with aggregation and output rate limiting: When the engine posts remove stream events for fully-aggregated queries, it presents the aggregation state before the expiring event leaves the data window. Your application can thus easily obtain a delta between the new aggregation value and the prior aggregation value.
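A sketch of this approach for a fully-aggregated query (the OrderEvent type and its price property are assumptions for this example):

select irstream sum(price) from OrderEvent.win:time(30 sec) output every 10 seconds

Here the insert stream carries the new total while the remove stream carries the prior total, so the application can derive the delta from the two.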
The engine evaluates the having-clause at the granularity of the data posted by views. That is, if you utilize a time window and output every 10 events, the having clause applies to each individual event or events entering and leaving the time window (and not once per batch of 10 events).
The output clause interacts in two ways with the group by and having clauses. First, in the output every n events case, the number n refers to the number of events arriving into the group by clause. That is, if the group by clause outputs only 1 event per group, or if the arriving events don't satisfy the having clause, then the actual number of events output by the statement could be fewer than n.
Second, the last and all keywords have special meanings when used in a statement with aggregate functions and the group by clause:
When no keyword is specified, the engine produces an output row for each row in the batch.
The all keyword (the default) specifies that the most recent data for all groups seen so far should be output, whether or not these groups' aggregate values have just been updated
The last keyword specifies that only groups whose aggregate values have been updated with the most recent batch of events should be output.
Please consult the Appendix A, Output Reference and Samples for detailed information on insert and remove stream output for aggregation and group-by.
By adding an output rate limiting clause to a statement that contains a group by clause we can control output of groups to obtain one row for each group, generating an event per group at the given output frequency:
select symbol, sum(price) from StockTickEvent group by symbol output all every 5 seconds
Output rate limiting provides output events to your application in regular intervals. Between intervals, the engine uses a buffer to hold events until the output condition is reached. If your application has high-volume streams, you may need to be mindful of the memory needs for output rates.
The output clause with the snapshot keyword does not require a buffer, all other output keywords do consume memory until the output condition is reached.
The order by clause is optional. It is used for ordering output events by their properties, or by expressions involving those properties.
For example, the following statement outputs batches of 5 or more stock tick events that are sorted first by price ascending and then by volume ascending:
select symbol from StockTickEvent.win:time(60 sec) output every 5 events order by price, volume
Here is the syntax for the order by clause:
order by expression [asc | desc] [, expression [asc | desc]] [, ...]
If the order by clause is absent then the engine still makes certain guarantees about the ordering of output:
If the statement is not a join, does not group via group by clause and does not declare grouped data windows via std:groupby view, the order in which events are delivered to listeners and through the iterator pull API is the order of event arrival.
If the statement is a join or outer join, or groups, then the order in which events are delivered to listeners and through the iterator pull API is not well-defined. Use the order by clause if your application requires events to be delivered in a well-defined order.
Esper places the following restrictions on the expressions in the order by clause:
All aggregate functions that appear in the order by clause must also appear in the select expression.
Otherwise, any kind of expression that can appear in the select clause, as well as any alias defined in the select clause, is also valid in the order by clause.
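For example, the following sketch orders grouped output by the aggregated total, which also appears in the select clause as the first restriction requires:

select symbol, sum(price) from StockTickEvent.win:time(60 sec) group by symbol output every 5 seconds order by sum(price) desc, symbol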
The insert into clause is optional in Esper. The clause can be specified to make the results of a statement available as an event stream for use in further statements, or to insert events into a named window. The clause can also be used to merge multiple event streams to form a single stream of events.
The syntax for the insert into clause is as follows:
insert [istream | rstream] into event_stream_name [ (property_name [, property_name] ) ]
The istream (default) and rstream keywords are optional. If no keyword or the istream keyword is specified, the engine supplies the insert stream events generated by the statement. The insert stream consists of the events entering the respective window(s) or stream(s). If the rstream keyword is specified, the engine supplies the remove stream events generated by the statement. The remove stream consists of the events leaving the respective window(s).
The event_stream_name is an identifier that names the event stream (and also implicitly names the types of events in the stream) generated by the engine. The identifier can be used in further statements to filter and process events of that event stream. The insert into clause can consist of just an event stream name, or an event stream name and one or more property names.
The engine also allows listeners to be attached to a statement that contain an insert into clause. Listeners receive all events posted to the event stream.
To merge event streams, simply use the same event_stream_name identifier in all EPL statements that merge their result event streams. Make sure to use the same number and names of event properties, and that the event property types match up.
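For example, the two statements below (the RetailOrderEvent and WholesaleOrderEvent types and their properties are assumptions for this sketch) merge into a single AllOrders stream that a third statement consumes:

insert into AllOrders select orderId, amount from RetailOrderEvent

insert into AllOrders select orderId, amount from WholesaleOrderEvent

select count(*) from AllOrders.win:time(60 sec)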
Esper places the following restrictions on the insert into clause:
The number of elements in the select clause must match the number of elements in the insert into clause if the clause specifies a list of event property names
If the event stream name has already been defined by a prior statement or configuration, and the event property names and/or event types do not match, an exception is thrown at statement creation time.
The following sample inserts into an event stream by name CombinedEvent:
insert into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
Each event in the CombinedEvent event stream has two event properties named "custId" and "latency". The events generated by the above statement can be used in further statements, such as shown in the next statement:
select custId, sum(latency) from CombinedEvent.win:time(30 min) group by custId
The example statement below shows the alternative form of the insert into clause that explicitly defines the property names to use.
insert into CombinedEvent (custId, latency) select A.customerId, A.timestamp - B.timestamp ...
The rstream keyword can be useful to indicate to the engine to generate only remove stream events. This can be useful if we want to trigger actions when events leave a window rather than when events enter a window. The statement below generates CombinedEvent events when EventA and EventB leave the window after 30 minutes (1800 seconds).
insert rstream into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
The insert into clause can be used in connection with patterns to provide pattern results to further statements for analysis:
insert into ReUpEvent select linkUp.ip as ip from pattern [every linkDown=LinkDownEvent -> linkUp=LinkUpEvent(ip=linkDown.ip)]
Sometimes your events may carry properties that are themselves event objects. Therefore EPL offers a special syntax to insert the value of a property itself as an event into a stream:
insert into stream_name select property_name.* from ...
This feature is only supported for JavaBean events and is not supported for Map or XML events. Nested property names are also not supported.
In this example, the class Summary with properties bid and ask that are of type Quote is:
public class Summary {
    private Quote bid;
    private Quote ask;
    ...
The statement to populate a stream of Quote events is thus:
insert into MyBidStream select bid.* from Summary
The insert into clause allows merging multiple event streams into a single event stream. The clause names an event stream to insert into by specifying an event_stream_name. The first statement that inserts into the named stream defines the stream's event type. Further statements that insert into the same event stream must match the type of events inserted into the stream as declared by the first statement.
One approach to merging event streams specifies individual column names either in the select clause or in the insert into clause of the statement. This approach has been shown in earlier examples.
Another approach to merging event streams specifies the wildcard (*) in the select clause (or the stream wildcard) to select the underlying event. The events in the event stream must then have the same event type as generated by the from clause.
Assume a statement creates an event stream named MergedStream by selecting OrderEvent events:
insert into MergedStream select * from OrderEvent
A statement can use the stream wildcard selector to select only OrderEvent events in a join:
insert into MergedStream select ord.* from ItemScanEvent, OrderEvent as ord
And a statement may also use an application-supplied user-defined function to convert events to OrderEvent instances:
insert into MergedStream select MyLib.convert(item) from ItemScanEvent as item
Esper specifically recognizes a conversion function: A conversion function must be the only selected column, and it must return either a Java object or java.util.Map.
A variant stream is a predefined stream into which events of multiple disparate event types can be inserted.
A variant stream name may appear anywhere in a pattern or from clause. In a pattern, a filter against a variant stream matches any events of any of the event types inserted into the variant stream. In a from clause including for named windows, views declared onto a variant stream may hold events of any of the event types inserted into the variant stream.
A variant stream is thus useful in problems that require different types of event to be treated the same.
Variant streams are predefined via runtime or initialization-time configuration as described in Section 10.4.17, “Variant Stream”. Your application may predefine variant streams to carry events of a limited set of event types, or you may choose the variant stream to carry any and all types of events. This choice affects what event properties are available for consuming statements or patterns of the variant stream.
Assume that an application predefined a variant stream named OrderStream to carry only ServiceOrder and ProductOrder events. An insert into clause inserts events into the variant stream:
insert into OrderStream select * from ServiceOrder

insert into OrderStream select * from ProductOrder
Here is a sample statement that consumes the variant stream and outputs a total price per customer id for the last 30 seconds of ServiceOrder and ProductOrder events:
select customerId, sum(price) from OrderStream.win:time(30 sec) group by customerId
If your application predefines the variant stream to hold specific type of events, as the sample above did, then all event properties that are common to all specified types are visible on the variant stream, including nested, indexed and mapped properties. For access to properties that are only available on one of the types, the dynamic property syntax must be used. In the example above, the customerId and price were properties common to both ServiceOrder and ProductOrder events.
For example, here is a consuming statement that selects a service duration property that only ServiceOrder events have, and that must therefore be cast to double, with null values removed, in order to aggregate:
select customerId, sum(coalesce(cast(serviceDuration?, double), 0)) from OrderStream.win:time(30 sec) group by customerId
If your application predefines a variant stream to hold any type of events (the any type variance), then all event properties of the variant stream are effectively dynamic properties.
For example, an application may define an OutgoingEvents variant stream to hold any type of event. The next statement is a sample consumer of the OutgoingEvents variant stream that looks for the destination property and fires for each event in which the property exists with a value of 'email':
select * from OutgoingEvents(destination = 'email')
Two or more event streams can be part of the from clause and thus both (all) streams determine the resulting events. The where clause lists the join conditions that Esper uses to relate events in the two or more streams. Reference and historical data such as stored in your relational database, and data returned by a method invocation, can also be included in joins. Please see Section 4.14, “Joining Relational Data via SQL” and Section 4.15, “Joining Non-Relational Data via Method Invocation” for details.
Each time an event arrives at one of the event streams, the two event streams are joined and output events are produced according to the where clause.
This example joins 2 event streams. The first event stream consists of fraud warning events for which we keep the last 30 minutes (1800 seconds). The second stream is withdrawal events for which we consider the last 30 seconds. The streams are joined on account number.
select fraud.accountNumber as accntNum, fraud.warning as warn, withdraw.amount as amount, max(fraud.timestamp, withdraw.timestamp) as timestamp, 'withdrawlFraud' as desc from com.espertech.esper.example.atm.FraudWarningEvent.win:time(30 min) as fraud, com.espertech.esper.example.atm.WithdrawalEvent.win:time(30 sec) as withdraw where fraud.accountNumber = withdraw.accountNumber
Joins can also include one or more pattern statements as the next example shows:
select * from FraudWarningEvent.win:time(30 min) as fraud, pattern [every w=WithdrawalEvent -> PINChangeEvent(acct=w.acct)] as withdraw where fraud.accountNumber = withdraw.w.accountNumber
The statement above joins the last 30 minutes of fraud warnings with a pattern. The pattern consists of every withdrawal event that is followed by a PIN change event for the same account number. It joins the two event streams on account number.
In a join and outer join, if your statement does not declare a data window view or other view onto a stream, then the engine retains all events for the stream equivalent to the keep-all data window.
The next example joins all FraudWarningEvent events that arrived since the statement was started, with the last 20 seconds of PINChangeEvent events:
select * from FraudWarningEvent as fraud, PINChangeEvent.win:time(20 sec) as pin where fraud.accountNumber = pin.accountNumber
Esper supports left outer joins, right outer joins and full outer joins between an unlimited number of event streams. Outer joins can also join reference and historical data as explained in Section 4.14, “Joining Relational Data via SQL”, as well as join data returned by a method invocation as outlined in Section 4.15, “Joining Non-Relational Data via Method Invocation”.
The keywords left, right and full control the type of the join between two streams. The on clause specifies one or more properties that join each stream. The synopsis is as follows:
...from stream_def [as name] left|right|full outer join stream_def on property = property [and property = property ...] [left|right|full outer join stream_def on ...]...
If the outer join is a left outer join, there will be an output event for each event of the stream on the left-hand side of the clause. For example, in the left outer join shown below we will get output for each event in the stream RfidEvent, even if the event does not match any event in the event stream OrderList.
select * from RfidEvent.win:time(30 sec) as rfid left outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId
Similarly, if the join is a right outer join, then there will be an output event for each event of the stream on the right-hand side of the clause. For example, in the right outer join shown below we will get output for each event in the stream OrderList, even if the event does not match any event in the event stream RfidEvent.
select * from RfidEvent.win:time(30 sec) as rfid right outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId
For all types of outer joins, if the join condition is not met, the select list is computed with the event properties of the arrived event while all other event properties are considered to be null.
The last type of outer join is a full outer join. In a full outer join, each time an event arrives at one of the event streams, one or more output events are produced. In the example below, when either an RfidEvent or an OrderList event arrives, one or more output events are produced. The next example shows a full outer join that joins on multiple properties:
select * from RfidEvent.win:time(30 sec) as rfid full outer join OrderList.win:length(10000) as orderlist on rfid.itemId = orderList.itemId and rfid.assetId = orderList.assetId
Finally, this example outer joins multiple streams. Here the RfidEvent stream is outer joined to both ProductName and LocationDescription via left outer join:
select * from RfidEvent.win:time(30 sec) as rfid left outer join ProductName.win:keepall() as refprod on rfid.productId = refprod.prodId left outer join LocationDescription.win:keepall() as refdesc on rfid.location = refdesc.locId
In a join or outer join your statement lists multiple event streams, views and/or patterns in the from clause. As events arrive into the engine, each of the streams (views, patterns) provides insert and remove stream events. The engine evaluates each insert and remove stream event provided by each stream, and joins or outer joins each event against data window contents of each stream, and thus generates insert and remove stream join results.
The direction of the join execution depends on which stream or streams are currently providing an insert or remove stream event for executing the join. A join is thus multidirectional, or bidirectional when only two streams are joined. A join can be made unidirectional if your application does not want new results when events arrive on a given stream or streams.
The unidirectional keyword can be used in the from clause to identify a single stream that provides the events to execute the join. If the keyword is present for a stream, all other streams in the from clause become passive streams. When events arrive or leave a data window of a passive stream then the join does not generate join results.
For example, consider a use case that requires us to join stock tick events (TickEvent) and news events (NewsEvent). The unidirectional keyword instructs the engine to generate results only when TickEvent events arrive, and not when NewsEvent events arrive or leave the 10-second time window:
select * from TickEvent as tick unidirectional, NewsEvent.win:time(10 sec) as news where tick.symbol = news.symbol
The following restrictions apply to unidirectional joins:
The unidirectional keyword can only be specified for a single stream in the from clause.
Receiving data from a unidirectional join via the pull API (iterator method) is not allowed. This is because the engine holds no state for the single stream that provides the events to execute the join.
The stream that declares the unidirectional keyword cannot declare a data window view or other view for that stream, since remove stream events are not processed for the single stream.
A subquery is a select within another statement. Esper supports subqueries in the select clause and in the where clause of EPL statements. Subqueries provide an alternative way to perform operations that would otherwise require complex joins. Subqueries can also make statements more readable than complex joins.
Esper supports both simple subqueries as well as correlated subqueries. In a simple subquery, the inner query is not correlated to the outer query. Here is an example simple subquery within a select clause:
select assetId, (select zone from ZoneClosed.std:lastevent) as lastClosed from RFIDEvent
If the inner query is dependent on the outer query, we will have a correlated subquery. An example of a correlated subquery is shown below. Notice the where clause in the inner query, where the condition involves a stream from the outer query:
select * from RfidEvent as RFID where 'Dock 1' = (select name from Zones.std:unique(zoneId) where zoneId = RFID.zoneId)
The example above shows a subquery in the where clause. The statement selects RFID events in which the zone name matches a string constant based on zone id. The statement uses the view std:unique to guarantee that only the last event per zone id is retained for processing by the subquery.
The next example is a correlated subquery within a select clause. In this statement the select clause retrieves the zone name by means of a subquery against the Zones set of events correlated by zone id:
select zoneId, (select name from Zones.std:unique(zoneId) where zoneId = RFID.zoneId) as name from RFIDEvent
Note that when a simple or correlated subquery returns multiple rows, the engine returns a null value as the subquery result. To limit the number of events returned by a subquery consider using one of the views std:lastevent, std:unique and std:groupby.
The select clause of a subquery also allows wildcard selects, which return as an event property the underlying event object of the event type as defined in the from clause. An example:
select (select * from MarketData.std:lastevent()) as md from pattern [every timer:interval(10 sec)]
The output events to the statement above contain the underlying MarketData event in a property named "md". The statement populates the last MarketData event into a property named "md" every 10 seconds following the pattern definition, or populates a null value if no MarketData event has been encountered so far.
Aggregation functions may be used in the select clause of the subselect as this example outlines:
select * from MarketData where price > (select max(price) from MarketData(symbol='GOOG').std:lastevent())
As the sub-select expression is evaluated first, the query above actually never fires for the GOOG symbol, only for other symbols that have a price higher than the current maximum for GOOG. As a side note, the insert into clause can also be handy to compute aggregation results for use in multiple subqueries.
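As a sketch of that approach, a first statement could continuously compute the maximum into a derived stream that subqueries then read from; the stream name GoogMaxPrice and the property name maxPrice below are illustrative only:
insert into GoogMaxPrice select max(price) as maxPrice from MarketData(symbol='GOOG')
select * from MarketData where price > (select maxPrice from GoogMaxPrice.std:lastevent())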
The following restrictions apply to subqueries:
The subquery stream definition must define a data window or other view to limit subquery results, reducing the number of events held for subquery execution
Subqueries can only consist of a select clause, a from clause and a where clause. The group by and having clauses, as well as joins, outer-joins and output rate limiting are not permitted within subqueries.
If using aggregation functions in a subquery, note these limitations:
None of the properties of the correlated stream(s) can be used within aggregation functions.
The properties of the subselect stream must all be within aggregation functions.
The where clause cannot be used to correlate between the subselect stream and the enclosing stream, since the engine would otherwise be forced to re-evaluate the aggregation considering all events in the subselect-stream data window, which would likely be a very expensive operation.
Performance of your statement containing one or more subqueries principally depends on two parameters. First, if your subquery correlates one or more columns in the subquery stream with the enclosing statement's streams via equals '=', the engine automatically builds the appropriate indexes for fast row retrieval based on the key values correlated (joined). The second parameter is the number of rows found in the subquery stream and the complexity of the filter criteria (where clause), as each row in the subquery stream must evaluate against the where clause filter.
The exists condition is true if the subquery returns at least one row. The not exists condition is true if the subquery returns no rows.
Let's take a look at a simple example. The following is an EPL statement that uses the exists condition:
select assetId from RFIDEvent as RFID where exists (select * from Asset.std:unique(assetId) where assetId = RFID.assetId)
This select statement returns all RFID events for which there is at least one Asset event, unique by asset id, with the same asset id.
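Conversely, a not exists condition could select only those RFID events for which no matching Asset event is held; the following variation of the statement above is a sketch:
select assetId from RFIDEvent as RFID where not exists (select * from Asset.std:unique(assetId) where assetId = RFID.assetId)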
The in subquery condition is true if the value of an expression matches one or more of the values returned by the subquery. Consequently, the not in condition is true if the value of an expression matches none of the values returned by the subquery.
The next statement demonstrates the use of the in subquery condition:
select assetId from RFIDEvent as RFID where zone in (select zone from ZoneUpdate.win:time(10 min) where status = 'closed' )
The above statement demonstrates the in subquery condition: it selects RFID events whose zone is among the zones reported as closed within the last 10 minutes.
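Similarly, a not in condition could select RFID events whose zone has not been reported as closed within the last 10 minutes; the next statement is a sketch based on the example above:
select assetId from RFIDEvent where zone not in (select zone from ZoneUpdate.win:time(10 min) where status = 'closed')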
This chapter outlines how reference data and historical data that are stored in a relational database can be queried via SQL within EPL statements.
Esper can join and outer join all types of event streams to stored data. In order for such data sources to become accessible to Esper, some configuration is required. The Section 10.4.8, “Relational Database Access” explains the required configuration for database access in greater detail, and includes information on configuring a query result cache.
Esper does not parse or otherwise inspect your SQL query. Therefore your SQL can make use of any database-specific SQL language extensions or features that your database provides.
If you have enabled query result caching in your Esper database configuration, Esper retains SQL query results in cache following the configured cache eviction policy.
Also if you have enabled query result caching in your Esper database configuration and provide EPL where clause and/or on clause (outer join) expressions, then Esper builds indexes on the SQL query results to enable fast lookup. This is especially useful if your queries return a large number of rows. For building the proper indexes, Esper inspects the expression found in your EPL query where clause, if present. For outer joins, Esper also inspects your EPL query on clause. Esper analyzes the EPL on clause and where clause expressions, if present, looking for property comparison with or without logical AND-relationships between properties. When a SQL query returns rows for caching, Esper builds the appropriate index and lookup strategies for fast row matching against indexes.
The following restrictions currently apply:
Only one event stream and one SQL query can be joined; Joins of two or more event streams with an SQL query are not yet supported.
Sub-views on an SQL query are not allowed; That is, one cannot create a time or length window on an SQL query. However one can use the insert into syntax to make join results available to a further statement.
Your database software must support JDBC prepared statements that provide statement meta data at compilation time. Most major databases provide this function. A workaround is available for databases that do not provide this function.
JDBC drivers must support the getMetadata feature. A workaround is available as below for JDBC drivers that don't support getting metadata.
The next sections assume basic knowledge of SQL (Structured Query Language).
To join an event stream against stored data, specify the sql keyword followed by the name of the database and a parameterized SQL query. The syntax to use in the from clause of an EPL statement is:
sql:database_name [" parameterized_sql_query "]
The engine uses the database_name identifier to obtain configuration information in order to establish a database connection, as well as settings that control connection creation and removal. Please see Section 10.4.8, “Relational Database Access” to configure an engine for database access.
Following the database name is the SQL query to execute. The SQL query can contain one or more substitution parameters. The SQL query string is placed in square brackets [ and ]. The SQL query can be placed in either single quotes (') or double quotes ("). The SQL query grammar is passed to your database software unchanged, allowing you to write any SQL query syntax that your database understands, including stored procedure calls.
Substitution parameters in the SQL query string take the form ${event_property_name}. The engine resolves event_property_name at statement execution time to the actual event property value supplied by the events in the joined event stream.
The engine determines the type of the SQL query output columns by means of the result set metadata that your database software returns for the statement. The actual query results are obtained via the getObject on java.sql.ResultSet.
The sample EPL statement below joins an event stream consisting of CustomerCallEvent events with the results of an SQL query against the database named MyCustomerDB and table Customer:
select custId, cust_name from CustomerCallEvent, sql:MyCustomerDB [' select cust_name from Customer where cust_id = ${custId} ']
The example above assumes that CustomerCallEvent supplies an event property named custId. The SQL query selects the customer name from the Customer table. The where clause in the SQL matches the Customer table column cust_id with the value of custId in each CustomerCallEvent event. The engine executes the SQL query for each new CustomerCallEvent encountered.
If the SQL query returns no rows for a given customer id, the engine generates no output event. Otherwise, the engine generates one output event for each row returned by the SQL query. An outer join as described in the next section can be used to control whether the engine should generate output events even when the SQL query returns no rows.
The next example adds a time window of 30 seconds to the event stream CustomerCallEvent. It also renames the selected properties to customerName and customerId to demonstrate how the naming of columns in an SQL query can be used in the select clause in the EPL query. And the example uses explicit stream names via the as keyword.
select customerId, customerName from CustomerCallEvent.win:time(30 sec) as cce, sql:MyCustomerDB ["select cust_id as customerId, cust_name as customerName from Customer where cust_id = ${cce.custId}"] as cq
Any window, such as the time window, generates insert stream (istream) events as events enter the window, and remove stream (rstream) events as events leave the window. The engine executes the given SQL query for each CustomerCallEvent in both the insert stream and the remove stream. As a performance optimization, the istream or rstream keywords in the select clause can be used to instruct the engine to only join insert stream or remove stream events, reducing the number of SQL query executions.
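For illustration, adding the istream keyword to the previous statement restricts SQL query execution to events entering the time window only; this is a sketch based on the example above:
select istream customerId, customerName from CustomerCallEvent.win:time(30 sec) as cce, sql:MyCustomerDB ["select cust_id as customerId, cust_name as customerName from Customer where cust_id = ${cce.custId}"] as cq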
Consider using the EPL where clause to join the SQL query result to your event stream. Similar to EPL joins and outer-joins that join event streams or patterns, the EPL where clause provides join criteria between the SQL query results and the event stream (as a side note, an SQL where clause is a filter of rows executed by your database on your database server before returning SQL query results).
Esper analyzes the expression in the EPL where clause, if present, and builds the appropriate indexes from that information at runtime, to ensure fast matching of event stream events to SQL query results, even if your SQL query returns a large number of rows. Your application must configure a cache for your database via the Esper configuration, as such indexes are held with regular data in a cache. If your application does not enable caching of SQL query results, the engine does not build indexes on cached data.
The sample EPL statement below joins an event stream consisting of OrderEvent events with the results of an SQL query against the database named MyRefDB and table SymbolReference:
select orders.symbol, symbolDesc from OrderEvent as orders, sql:MyRefDB ['select symbol, symbolDesc from SymbolReference'] as reference where reference.symbol = orders.symbol
Notice how the EPL where clause joins the OrderEvent stream to the SymbolReference table. In this example, the SQL query itself does not have a SQL where clause and therefore returns all rows from table SymbolReference.
If your application enables caching, the SQL query fires only at the arrival of the first OrderEvent event. When the second OrderEvent arrives, the join execution uses the cached query result. If the caching policy that you specified in the Esper database configuration evicts the SQL query result from cache, then the engine fires the SQL query again to obtain a new result and places the result in cache.
If SQL result caching is enabled and your EPL where clause, as shown in the above example, provides the properties to join, then the engine indexes the SQL query results in cache and retains the index together with the query result in cache. Thus your application can benefit from high performance index-based lookups as long as the SQL query results are found in cache.
You can use outer joins to join data obtained from an SQL query and control when an event is produced. Use a left outer join, such as in the next statement, if you need an output event for each event regardless of whether or not the SQL query returns rows. If the SQL query returns no rows, the join result populates null values into the selected properties.
select custId, custName from CustomerCallEvent as cce left outer join sql:MyCustomerDB ["select cust_id, cust_name as custName from Customer where cust_id = ${cce.custId}"] as cq on cce.custId = cq.cust_id
The statement above always generates at least one output event for each CustomerCallEvent, containing all columns selected by the SQL query, even if the SQL query does not return any rows. Note the on expression that is required for outer joins. The on acts as an additional filter to rows returned by the SQL query.
Pattern statements and SQL queries can also be applied together in useful ways. One such use is to poll or request data from a database at regular intervals. The next statement is an example that shows a pattern that fires every 5 seconds to query the NewOrder table for new orders:
insert into NewOrders select orderId, orderAmount from pattern [every timer:interval(5 sec)], sql:MyCustomerDB ['select orderId, orderAmount from NewOrders']
The engine translates SQL queries into JDBC java.sql.PreparedStatement statements by replacing ${name} parameters with '?' placeholders. It obtains name and type of result columns from the compiled PreparedStatement meta data when the EPL statement is created.
The engine supplies parameters to the compiled statement via the setObject method on PreparedStatement. The engine uses the getObject method on the compiled statement PreparedStatement to obtain column values.
Certain JDBC database drivers are known to not return metadata for precompiled prepared SQL statements. This can be a problem, as Esper requires the metadata: it obtains SQL result set metadata to validate an EPL statement and to provide column types for output events. JDBC drivers that do not provide metadata for precompiled SQL statements require a workaround; such drivers generally do provide metadata for executed SQL statements, just not for precompiled SQL statements.
Please consult Chapter 10, Configuration for the configuration options available in relation to metadata retrieval.
To obtain metadata for an SQL statement, Esper can alternatively fire an SQL statement that returns the same column names and types as the actual SQL statement but does not return any rows. This kind of SQL statement is referred to as a sample statement in the workaround description below. The engine can then use the sample SQL statement to retrieve metadata for the column names and types returned by the actual SQL statement.
Applications can provide a sample SQL statement to retrieve metadata via the metadatasql keyword:
sql:database_name ["parameterized_sql_query" metadatasql "sql_meta_query"]
The sql_meta_query must be an SQL statement that returns the same number of columns, the same type of columns and the same column names as the parameterized_sql_query, and does not return any rows.
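As an illustration based on the earlier CustomerCallEvent example, the statement below supplies a sample query that returns no rows by means of a "where 1=0" condition; the table and column names are assumptions carried over from that example:
select custId, cust_name from CustomerCallEvent, sql:MyCustomerDB ['select cust_name from Customer where cust_id = ${custId}' metadatasql 'select cust_name from Customer where 1=0']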
Alternatively, applications can choose not to provide an explicit sample SQL statement. If the EPL statement does not use the metadatasql syntax, the engine applies lexical analysis to the SQL statement. From the lexical analysis Esper generates a sample SQL statement adding a restrictive clause "where 1=0" to the SQL statement.
Alternatively, applications can add the following tag to the SQL statement: ${$ESPER-SAMPLE-WHERE}. If the tag exists in the SQL statement, the engine does not perform lexical analysis and simply replaces the tag with the SQL where clause "where 1=0". Therefore this workaround is applicable to SQL statements that cannot be correctly lexically analyzed. The SQL text after the placeholder is not part of the sample query. For example:
select mycol from sql:myDB [ 'select mycol from mytesttable ${$ESPER-SAMPLE-WHERE} where ....'], ...
Your application may need to join data that originates from a web service, a distributed cache, an object-oriented database or simply data held in memory by your application. Esper accommodates this need by allowing a method invocation (or procedure call or function) in the from clause of a statement.
Esper can join and outer join all types of event streams to the data returned by your method invocation. In addition, Esper can be configured to cache the data returned by your method invocations.
The following restrictions currently apply:
Only one event stream and one method invocation can be joined; That is, in a join with a method invocation only one other event stream is allowed.
Sub-views on a method invocation are not allowed; That is, one cannot create a time or length window on a method invocation. However one can use the insert into syntax to make join results available to a further statement.
The syntax for a method invocation in the from clause of an EPL statement is:
method:class_name.method_name[(parameter_expressions)]
The method keyword denotes a method invocation. It is followed by a class name and a method name separated by a dot (.) character. If you have parameters to your method invocation, these are placed in round brackets after the method name. Any expression is allowed as a parameter, and individual parameter expressions are separated by a comma. Expressions may also use event properties of the joined stream.
In the sample join statement shown next, the method 'lookupAsset' provided by class 'MyLookupLib' returns one or more rows based on the asset id (a property of the AssetMoveEvent) that is passed to the method:
select * from AssetMoveEvent, method:MyLookupLib.lookupAsset(assetId)
The following statement demonstrates the use of the where clause to join events to the rows returned by a method invocation, which in this example does not take parameters:
select assetId, assetDesc from AssetMoveEvent as asset, method:MyLookupLib.getAssetDescriptions() as desc where asset.assetid = desc.assetid
Your method invocation may return zero, one or many rows for each method invocation. If you have caching enabled through configuration, then Esper can avoid the method invocation and instead use cached results. Similar to SQL joins, Esper also indexes cached result rows such that join operations based on the where clause can be very efficient, especially if your method invocation returns a large number of rows.
If the time taken by method invocations is critical to your application, you may configure local caches as Section 10.4.6, “Cache Settings for Method Invocations” describes.
Your application must provide a Java class that exposes a public static method. The method must accept the same number and type of parameters as listed in the parameter expression list.
If your method invocation returns either no row or only one row, then the return type of the method can be a Java class or a java.util.Map. If your method invocation can return more than one row, then the return type of the method must be an array of Java class or an array of Map.
If you are using a Java class or an array of Java class as the return type, then the class must adhere to JavaBean conventions: it must expose properties through getter methods.
If you are using java.util.Map as the return type or an array of Map, then the map should have String-type keys and object values (Map<String, Object>). When using Map as the return type, your application must provide a second method that returns property metadata, as the next section outlines.
Your application method must return either of the following:
A null value or an empty array to indicate an empty result (no rows).
A Java object or Map to indicate a one-row result, or an array that consists of a single Java object or Map.
An array of Java objects or Map instances to return multiple result rows.
As an example, consider the method 'getAssetDescriptions' provided by class 'MyLookupLib' as discussed earlier:
select assetId, assetDesc from AssetMoveEvent as asset, method:com.mypackage.MyLookupLib.getAssetDescriptions() as desc where asset.assetid = desc.assetid
The 'getAssetDescriptions' method may return multiple rows and is therefore declared to return an array of the class 'AssetDesc'. The class AssetDesc is a POJO class (not shown here):
public class MyLookupLib {
  ...
  public static AssetDesc[] getAssetDescriptions() {
    ...
    return new AssetDesc[] {...};
  }
}
The example above specifies the full Java class name of the class 'MyLookupLib' class in the EPL statement. The package name does not need to be part of the EPL if your application imports the package using the auto-import configuration through the API or XML, as outlined in Section 10.4.5, “Class and package imports”.
Your application may return java.util.Map or an array of Map from method invocations. If doing so, your application must provide metadata about each row: it must declare the property name and property type of each Map entry of a row. This information allows the engine to perform type checking of expressions used within the statement.
You declare the property names and types of each row by providing a method that returns property metadata. The metadata method must follow these conventions:
The method providing the property metadata must have the same name as the invoked method, appended with the literal Metadata.
The method must have an empty parameter list and must be declared public and static.
The method providing the metadata must return a Map of String property name keys and java.lang.Class property name types (Map<String, Class>).
In the following example, a class 'MyLookupLib' provides a method to return historical data based on asset id and asset code:
select assetId, location, x_coord, y_coord from AssetMoveEvent as asset, method:com.mypackage.MyLookupLib.getAssetHistory(assetId, assetCode) as history
A sample implementation of the class 'MyLookupLib' is shown below.
public class MyLookupLib {
  ...
  // For each column in a row, provide the property name and type
  public static Map<String, Class> getAssetHistoryMetadata() {
    Map<String, Class> propertyNames = new HashMap<String, Class>();
    propertyNames.put("location", String.class);
    propertyNames.put("x_coord", Integer.class);
    propertyNames.put("y_coord", Integer.class);
    return propertyNames;
  }
  ...
  // Lookup rows based on assetId and assetCode
  public static Map<String, Object>[] getAssetHistory(String assetId, String assetCode) {
    Map[] rows = new Map[2];  // this sample returns 2 rows
    for (int i = 0; i < 2; i++) {
      rows[i] = new HashMap();
      rows[i].put("location", "somevalue");
      rows[i].put("x_coord", 100);
      // ... set more values for each row
    }
    return rows;
  }
}
In the example above, the 'getAssetHistoryMetadata' method provides the property metadata: the names and types of properties in each row. The engine calls this method once per statement to determine event typing information.
The 'getAssetHistory' method returns an array of Map objects that are two rows. The implementation shown above is a simple example. The parameters to the method are the assetId and assetCode properties of the AssetMoveEvent joined to the method. The engine calls this method for each insert and remove stream event in AssetMoveEvent.
To indicate that no rows are found in a join, your application method may return either a null value or an array of size zero.
A named window is a global data window that can take part in many statement queries, and that can be inserted-into and deleted-from by multiple statements.
The create window clause declares a new named window. The named window starts up empty. Events must be inserted into the named window using the insert into clause. Events can also be deleted from a named window via the on delete clause.
Events enter the named window by means of insert into clause of a select statement. Events leave a named window either because the expiry policy of the declared data window removes events from the named window, or through statements that use the on delete clause to explicitly delete from a named window.
To query a named window, simply use the window name in the from clause of your statement, including statements that contain subqueries, joins and outer-joins.
The create window statement creates a named window by specifying a window name and one or more data window views, as well as the type of event to hold in the named window.
The syntax for creating a named window is as follows:
create window window_name.view_specifications as [select list_of_properties from] event_type
The window_name you assign to the named window can be any identifier. The name should not already be in use as an event type or stream name.
The view_specifications are one or more data window views that define the expiry policy for removing events from the data window. Named windows must explicitly declare a data window view. This is required to ensure that the policy for retaining events in the data window is well defined. To keep all events, use the keep-all view: It indicates that the named window should keep all events and only remove events from the named window that are deleted via the on delete clause. The view specification can only list data window views, derived-value views are not allowed since these don't represent an expiry policy.
The select clause and list_of_properties are optional. If present, they specify the column names and, implicitly by definition of the event type, the column types of events held by the named window. Expressions other than column names are not allowed in the select list of properties. Wildcards (*) and wildcards with additional properties can also be used.
Finally, the event_type is required and provides the name of the event type of events held in the data window, unless column names and types have been explicitly selected via select.
The next statement creates a named window 'AllOrdersNamedWindow' for which the expiry policy is simply to keep all events. Assume that the event type 'OrderMapEventType' has been configured. The named window is to hold events of type 'OrderMapEventType':
create window AllOrdersNamedWindow.win:keepall() as OrderMapEventType
The below sample statement demonstrates the select syntax. It defines a named window in which each row has the three properties 'symbol', 'volume' and 'price'. This named window actively removes events from the window that are older than 30 seconds.
create window OrdersTimeWindow.win:time(30 sec) as select symbol, volume, price from OrderEvent
In an alternate form, the as keyword can be used to rename columns:
create window OrdersTimeWindow.win:time(30 sec) as select symbol as sym, volume as vol, price from OrderEvent
A new named window starts up empty. It must be explicitly inserted into by one or more statements, as discussed below.
If your application stops or destroys the statement that creates the named window, any consuming statements no longer receive insert or remove stream events. The named window also can no longer be deleted from after it was stopped or destroyed.
The create window statement posts to listeners any events that are inserted into the named window as new data. The statement posts all deleted events or events that expire out of the data window to listeners as the remove stream (old data). The named window contents can also be iterated on via the pull API to obtain the current contents of a named window.
An on delete clause removes events from a named window. The clause can be used to remove all events, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on delete clause is as follows:
on event_type[(filter_criteria)] [as alias_name] delete from window_name [as alias_name] [where criteria_expression]
The event_type is the name or alias of the type of events that trigger removal from the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias for use in the where clause. Patterns can also be specified in the on clause as described in the next section.
The window_name is the name of the named window to delete events from. The as keyword is also available to assign an alias to the named window.
The optional where clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be removed from the named window. The criteria_expression may also simply filter for events in the named window to be removed from the named window.
The iterator of the EPStatement object representing the on delete clause can also be helpful: It returns the last batch of deleted events in response to the last triggering event, in any order, or null if the last triggering event did not remove any rows.
Let's look at a couple of examples. In the simplest form, this statement deletes all events from the named window 'AllOrdersNamedWindow' when any 'FlushOrderEvent' arrives:
on FlushOrderEvent delete from AllOrdersNamedWindow
This example adds a where clause to the example above. Upon arrival of a triggering 'ZeroVolumeEvent', the statement removes from the named window any orders that have a volume of zero or less:
on ZeroVolumeEvent delete from AllOrdersNamedWindow where volume <= 0
The next example shows a more complete use of the syntax, and correlates the triggering event with events held by the named window:
on NewOrderEvent(volume>0) as myNewOrders delete from AllOrdersNamedWindow as myNamedWindow where myNamedWindow.symbol = myNewOrders.symbol
In the above sample statement, the statement triggers only if a 'NewOrderEvent' event with a volume greater than zero arrives. Upon triggering, all events in the named window that have the same value for the symbol property as the triggering 'NewOrderEvent' event are then removed from the named window. The statement also showcases the as keyword to assign alias names for use in the where expression.
For correlated queries (as above) that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance removal of events especially from named windows that hold large numbers of events.
Your application can subscribe a listener to your on delete statements to determine removed events. The statement posts any events that are deleted from a named window to all listeners attached to the statement as new data. Upon iteration, the statement provides the last deleted event, if any.
By means of patterns the on delete clause and on select clause (described below) can look for more complex conditions to occur, possibly involving multiple events or the passing of time. The syntax for on delete with a pattern expression is shown next:
on pattern [pattern_expression] [as alias_name] delete from window_name [as alias_name] [where criteria_expression]
The pattern_expression is any pattern that matches zero or more arriving events. Tags can be used to name events in the pattern and can occur in the optional where clause to correlate to events to be removed from a named window.
In the next example the triggering pattern fires every 10 seconds. The effect is that every 10 seconds the statement removes from 'MyNamedWindow' all rows:
on pattern [every timer:interval(10 sec)] delete from MyNamedWindow
The following example shows the use of tags in a pattern:
on pattern [every ord=OrderEvent(volume>0) or every flush=FlushOrderEvent] delete from OrderWindow as win where ord.id = win.id or flush.id = win.id
The pattern above looks for OrderEvent events with a volume value greater than zero and tags such events as 'ord'. The pattern also looks for FlushOrderEvent events and tags such events as 'flush'. The where clause deletes from the 'OrderWindow' named window any events that match the value of the 'id' property of either of the arriving events.
The insert into clause inserts events into named windows. Your application must ensure that the column names and types match the declared column names and types of the named window to be inserted into.
In this example we first create a named window using some of the columns of an OrderEvent event type:
create window OrdersWindow.win:keepall() as select symbol, volume, price from OrderEvent
The insert into the named window selects individual columns to be inserted:
insert into OrdersWindow(symbol, volume, price) select name, count, price from FXOrderEvent
// .. alternative form...
insert into OrdersWindow select name as symbol, vol as volume, price from FXOrderEvent
Given the above statements, the engine enters every arriving FXOrderEvent into the named window 'OrdersWindow'.
The following EPL creates a named window for an event type backed by a Java class, and inserts into the window any 'OrderEvent' where the symbol value is IBM:
create window OrdersWindow as com.mycompany.OrderEvent
insert into OrdersWindow select * from com.mycompany.OrderEvent(symbol='IBM')
The last example adds one column named 'derivedPrice' to the 'OrderEvent' type by specifying a wildcard, and uses a user-defined function to populate the column:
create window OrdersWindow as select *, price as derivedPrice from OrderEvent
insert into OrdersWindow select *, MyFunc.func(price, percent) as derivedPrice from OrderEvent
Event representations based on Java base classes or interfaces, and subclasses or implementing classes, are compatible as these statements show:
// create a named window for the base class
create window OrdersWindow as select * from ProductBaseEvent
// The ServiceProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from ServiceProductEvent
// The MerchandiseProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from MerchandiseProductEvent
To avoid duplicate events stored in a named window, use a subquery to test whether an event already exists in the named window:
insert into OrdersWindow select * from ServiceProductEvent as spe where not exists (select * from OrdersWindow as win where win.id = spe.id)
A statement that removes events from a named window via the on delete clause and a statement that inserts events into a named window via the insert into can be combined to replace events in the named window, by creating the two statements in the order as indicated by the sample:
// create in this order
on ServiceProductEvent as spe delete from OrdersWindow as win where win.id = spe.id
insert into OrdersWindow select * from ServiceProductEvent
A named window can be referred to by any statement in the from clause of the statement. Filter criteria can also be specified. Additional views may be used onto named windows; however, such views cannot include data window views.
A statement selecting all events from a named window 'AllOrdersNamedWindow' is shown next. The named window must first be created via the create window clause before use.
select * from AllOrdersNamedWindow
The statement as above simply receives the unfiltered insert and remove stream of the named window and reports that stream to its listeners. An iterator on such statement returns all events in the named window, if any.
The next statement derives an average price per symbol from all events posted by a named window:
select symbol, avg(price) from AllOrdersNamedWindow group by symbol
Your application may create a consuming statement such as above on an empty named window, or your application may create the above statement on an already filled named window. The engine provides correct results in either case: At the time of statement creation the Esper engine internally initializes the consuming statement from the current named window, also taking your declared filters into consideration. Thus, your statement deriving data from a named window does not start empty if the named window already holds one or more events.
If you require a subset of the data in the named window, you can specify one or more filter expressions onto the named window as shown here:
select symbol, avg(price) from AllOrdersNamedWindow(sector='energy') group by symbol
By adding a filter to the named window, the aggregation and grouping as well as any views that may be declared onto the named window receive a filtered insert and remove stream. The above statement thus outputs, continuously, the average price per symbol for all orders in the named window that belong to a certain sector.
A side note on variables in filters that select events from named windows: The engine initializes consuming statements at statement creation time and changes aggregation state continuously as events arrive. If the filter criteria contain variables and the variable values change, then the engine does not re-evaluate or re-build aggregation state. In such a case you may want to place variables in the having clause, which evaluates on already-built aggregation state.
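For example, assuming a variable named var_min_price has been created, the following sketch evaluates the variable against the already-built aggregation state rather than as a filter on the named window:
select symbol, avg(price) from AllOrdersNamedWindow group by symbol having avg(price) > var_min_price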
The following example further declares a view into the named window. Such a view can be a plug-in view or one of the built-in views, but cannot be a data window view (with the exception of the group-by view which is allowed).
select * from AllOrdersNamedWindow(volume>0, price>0).mycompany:mypluginview()
Data window views cannot be used onto named windows since named windows post insert and remove streams for the events entering and leaving the named window, thus the expiry policy and batch behavior are well defined by the data window declared for the named window. For example, the following is not allowed and fails at time of statement creation:
// not a valid statement select * from AllOrdersNamedWindow.win:time(30 sec)
The on select clause performs a one-time, non-continuous query on a named window every time a triggering event arrives or a triggering pattern matches. The query can consider all events in the named window, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on select clause is as follows:
on event_type[(filter_criteria)] [as alias_name] [insert into insert_into_def] select select_list from window_name [as alias_name] [where criteria_expression] [group by grouping_expression_list] [having grouping_search_conditions] [order by order_by_expression_list]
The event_type is the name or alias of the type of events that trigger the query against the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias. Patterns can also be specified in the on clause, see the samples in Section 4.16.2.1, “Using Patterns in the On Delete Clause”.
The insert into clause works as described in Section 4.9, “Merging Streams and Continuous Insertion: the Insert Into Clause”. The select clause is described in Section 4.3, “Choosing Event Properties And Events: the Select Clause”. For all clauses the semantics are equivalent to a join operation: The properties of the triggering event or events are available in the select clause and all other clauses.
The window_name in the from clause is the name of the named window to select events from. The as keyword is also available to assign an alias to the named window. The as keyword is helpful in conjunction with wildcard in the select clause to select named window events via the syntax select alias.* .
The optional where clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be considered from the named window. The criteria_expression may also simply filter for events in the named window to be considered by the query.
The group by clause, the having clause and the order by clause are all optional and work as described in earlier chapters.
The similarities and differences between an on select clause and a regular or outer join are as follows:
A join is evaluated when any of the streams participating in the join have new events (insert stream) or events leaving data windows (remove stream). A join is therefore bi-directional or multi-directional. However, the on select statement has one triggering event or pattern that causes the query to be evaluated and is thus uni-directional.
The query within the on select statement is not continuous: It executes only when a triggering event or pattern occurs. Aggregation and groups are computed anew considering the contents of the named window at the time the triggering event arrives.
The iterator of the EPStatement object representing the on select clause returns the last batch of selected events in response to the last triggering event, or null if the last triggering event did not select any rows.
For correlated queries that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance querying of events. It analyzes the where clause to build one or more indexes for fast lookup in the named window based on the properties of the triggering event.
The next statement demonstrates the concept. Upon arrival of a QueryEvent event the statement selects all events in the 'OrdersNamedWindow' named window:
on QueryEvent select win.* from OrdersNamedWindow as win
The engine executes the query on arrival of a triggering event, in this case a QueryEvent. It posts the query results to any listeners to the statement, in a single invocation, as the new data array. By prefixing the wildcard (*) selector with the stream name, the select clause returns only events of the named window and does not also return triggering events.
The where clause filters and correlates events in the named window with the triggering event, as shown next:
on QueryEvent(volume>0) as query select query.symbol, query.volume, win.symbol from OrdersNamedWindow as win where win.symbol = query.symbol
Upon arrival of a QueryEvent, if that event has a value for the volume property that is greater than zero, the engine executes the query. The query considers all events currently held by the 'OrdersNamedWindow' that match the symbol property value of the triggering QueryEvent event. The engine then posts query results to the statement's listeners.
Aggregation, grouping and ordering of results are possible as this example shows:
on QueryEvent as queryEvent select symbol, sum(volume) from OrdersNamedWindow as win group by symbol having sum(volume) > 0 order by symbol
The above statement outputs the total volume per symbol for those groups where the sum of the volume is greater than zero, ordered by symbol ascending. The engine computes and posts the output based on the current contents of the 'OrdersNamedWindow' named window considering all events in the named window, since the query does not have a where clause.
The on insert clause is an on select clause as described in the prior section with the addition of an insert into clause.
Similar to the on select clause, the engine executes the query when a triggering event arrives. It then provides the query results as an event stream to further statements. It populates the event stream that is named in the insert into clause.
The statement below provides the query results to any consumers of the MyOrderStream, upon arrival of a QueryEvent event:
on QueryEvent as query insert into MyOrderStream select win.* from OrdersNamedWindow as win
Here is a sample consuming statement of the MyOrderStream. The statement further filters the events provided by the on insert statement by user id and reports a total of volume per symbol:
select symbol, sum(volume) from MyOrderStream(userId='user1') group by symbol
As outlined in Section 2.8, “Updating and Versioning Events”, revision event types process updates or new versions of events held by a named window.
A revision event type is simply one or more existing event types whose events are related by event properties that provide same key values. The purpose of key values is to indicate that arriving events are related: An event amends, updates or adds properties to an earlier event that shares the same key values.
Revision event types can be useful in these situations:
Some of your events carry only partial information that is related to a prior event.
Events arrive that add additional properties or change existing properties of prior events.
Events may carry properties that have null values or properties that do not exist (for example events backed by Map or XML), and for such properties the earlier value must be used instead.
To better illustrate, consider a revision event type that represents events for creation and updates to user profiles. Let's assume the user profile creation events carry the user id and a full profile. The profile update events indicate only the user id and the individual properties that actually changed. The user id property shall serve as a key value relating profile creation events and update events.
A revision event type must be configured to instruct the engine which event types participate and what their key properties are. Configuration is described in Section 10.4.16, “Revision Event Type” and is not shown here.
Assume that an alias UserProfileRevisions has been configured to hold profile events, i.e. creation and update events related by user id. This statement creates a named window to hold the last 1 hour of current profiles per user id:
create window UserProfileWindow.win:time(1 hour) select * from UserProfileRevisions
insert into UserProfileWindow select * from UserProfileCreation
insert into UserProfileWindow select * from UserProfileUpdate
In revision event types, the term base event is used to describe events that are subject to update. Events that update, amend or add additional properties to base events are termed delta events. In the example, base events are profile creation events and delta events are profile update events.
Base events are expected to arrive before delta events. In the case where a delta event arrives and is not related by key value to a base event or a revision of the base event currently held by the named window, the engine ignores the delta event. Thus, considering the example, profile update events for a user id that does not have an existing profile in the named window are not applied.
When a base or delta event arrives, the insert and remove stream output by the named window are the current and the prior version of the event. Let's come back to the example. As creation events arrive that are followed by update events or more creation events for the same user id, the engine posts the current version of the profile as insert stream (new data) and the prior version of the profile as remove stream (old data).
Base events are also implicitly delta events. That is, if multiple base events of the same key property values arrive, then each base event provides a new version. In the example, if multiple profile creation events arrive for the same user id then new versions of the current profile for that user id are output by the engine for each base event, as it does for delta events.
The expiry policy as specified by view definitions applies to each distinct key value, or multiple distinct key values for composite keys. An expiry policy re-evaluates when new versions arrive. In the example, user profile events expire from the time window when no creation or update event for a given user id has been received for 1 hour.
Several strategies are available for merging or overlaying events as the configuration chapter describes in greater detail.
Any of the Map, XML and JavaBean event representations as well as plug-in event representations may participate in a revision event type. For example, profile creation events could be JavaBean events, while profile update events could be java.util.Map events.
Delta events may also add properties to the revision event type. For example, one could add a new event type with security information to the revision event type and such security-related properties become available on the resulting revision event type.
The following restrictions apply to revision event types:
Nested properties are only supported for the JavaBean event representation. Nested properties are not individually versioned; they are instead versioned by the containing property.
Dynamic, indexed and mapped properties are only supported for nested properties and not as properties of the revision event type itself.
A variable is a scalar value that is available for use in all statements including patterns. Variables can be used in an expression anywhere in a statement as well as in the output clause for output rate limiting.
Variables must first be declared or configured before use, by defining each variable's type and name. Variables can be created via the create variable syntax or declared by configuration. Variables can be assigned new values by using the on set syntax or via the setVariableValue methods on EPRuntime. The EPRuntime also provides methods to read variable values.
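As a brief sketch of the runtime API, assuming a long-type variable named var_threshold has already been created and epService denotes an EPServiceProvider instance:
// assign a new value; the change becomes visible to statements atomically
epService.getEPRuntime().setVariableValue("var_threshold", 100L);
// read the current value back
Long current = (Long) epService.getEPRuntime().getVariableValue("var_threshold");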
The engine guarantees consistency and atomicity of variable reads and writes on a statement-level (this is a soft guarantee, see below). Variables are optimized for fast read access and are also multithread-safe.
The create variable syntax creates a new variable by defining the variable type and name. As an alternative to the syntax, variables can also be declared via the runtime and engine configuration options.
The synopsis for creating a variable is as follows:
create variable variable_type variable_name [ = assignment_expression ]
The variable_type can be any of the following:
variable_type : string | char | character | bool | boolean | byte | short | int | integer | long | double | float
The variable_name is an identifier that names the variable. The variable name should not already be in use by another variable.
The assignment_expression is optional. If present, it supplies the initial value for the variable; without an assignment expression the initial value for the variable is null.
The EPStatement object of the create variable statement provides access to variable values. The pull API methods iterator and safeIterator return the current variable value. Listeners to the create variable statement subscribe to changes in variable value: the engine posts new and old value of the variable to all listeners when the variable value is updated by an on set statement.
The example below creates a variable that provides a threshold value. The name of the variable is var_threshold and its type is long. The variable's initial value is null as no other value has been assigned:
create variable long var_threshold
This statement creates an integer-type variable named var_output_rate and initializes it to the value ten (10):
create variable integer var_output_rate = 10
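As a sketch of the pull API described above, the statement object returned when creating var_output_rate can be queried for the current value; this assumes the engine exposes the value under a property named after the variable:
EPStatement stmt = epService.getEPAdministrator().createEPL(
    "create variable integer var_output_rate = 10");
// the iterator returns a single event carrying the current variable value
Integer rate = (Integer) stmt.iterator().next().get("var_output_rate");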
In addition to creating a variable via the create variable syntax, the runtime and engine configuration API also allows adding variables. The next code snippet illustrates the use of the runtime configuration API to create a string-typed variable:
epService.getEPAdministrator().getConfiguration()
    .addVariable("myVar", String.class, "init value");
The on set statement assigns a new value to one or more variables when a triggering event arrives or a triggering pattern occurs. Use the setVariableValue methods on EPRuntime to assign variable values programmatically.
The synopsis for setting variable values is:
on event_type[(filter_criteria)] [as alias_name] set variable_name = expression [, variable_name = expression [,...]]
The event_type is the name or alias of the type of events that trigger the variable assignments. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias. Patterns can also be specified in the on clause.
The comma-separated list of variable names and expressions sets the value of one or more variables. All new variable values are applied atomically: the changes to variable values by the on set statement become visible to other statements all at the same time. No changes are visible to other processing threads until the on set statement has completed processing, at which time all changes become visible at once.
The EPStatement object provides access to variable values. The pull API methods iterator and safeIterator return the current variable values for each of the variables set by the statement. Listeners to the statement subscribe to changes in variable values: the engine posts new variable values of all variables to any listeners.
In the following example, a variable by name var_output_rate has been declared previously. When a NewOutputRateEvent event arrives, the variable is updated to a new value supplied by the event property 'rate':
on NewOutputRateEvent set var_output_rate = rate
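A listener attached to the on set statement receives the new variable values as they are assigned. A minimal sketch, assuming the statement above and an EPServiceProvider instance named epService:
EPStatement onSet = epService.getEPAdministrator().createEPL(
    "on NewOutputRateEvent set var_output_rate = rate");
onSet.addListener(new UpdateListener() {
    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        // the engine posts the new values of all variables set by the statement
        System.out.println("var_output_rate is now " + newEvents[0].get("var_output_rate"));
    }
});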
The next example shows two variables that are updated when a ThresholdUpdateEvent arrives:
on ThresholdUpdateEvent as t set var_threshold_lower = t.lower, var_threshold_higher = t.higher
The sample statement shown next counts the number of pattern matches using a variable. The pattern looks for OrderEvent events that are followed by CancelEvent events for the same order id within 10 seconds of the OrderEvent:
on pattern[every a=OrderEvent -> (CancelEvent(orderId=a.orderId) where timer:within(10 sec))] set var_counter = var_counter + 1
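The statement above assumes the var_counter variable exists; it could, for example, be created beforehand as a long-type variable initialized to zero:
create variable long var_counter = 0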
A variable name can be used in any expression and can also occur in an output rate limiting clause. This section presents examples and discusses performance, consistency and atomicity attributes of variables.
The next statement assumes that a variable named 'var_threshold' was created to hold a total price threshold value. The statement outputs an event when the total price for a symbol is greater than the current threshold value:
select symbol, sum(price) from TickEvent group by symbol having sum(price) > var_threshold
In this example we use a variable to dynamically change the output rate on-the-fly. The variable 'var_output_rate' holds the current rate at which the statement posts a current count to listeners:
select count(*) from TickEvent output every var_output_rate seconds
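Since var_output_rate is an ordinary variable, the rate can be changed while the statement is running, either through an on set statement or programmatically; for example, assuming the runtime API shown earlier:
epService.getEPRuntime().setVariableValue("var_output_rate", 30);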
Variables are optimized towards high read frequency and lower write frequency. Variable reads do not incur locking overhead (99% of the time), while variable writes do incur locking overhead.
The engine softly guarantees consistency and atomicity of variables when your statement executes in response to an event or timer invocation. Variables acquire a stable value (implemented by versioning) when your statement starts executing in response to an event or timer invocation, and variables do not change value during execution. When one or more variable values are updated via on set statements, the changes to all updated variables become visible to statements as one unit and only when the on set statement completes successfully.
The atomicity and consistency guarantee is a soft guarantee. If any of your application statements, in response to an event or timer invocation, execute for a time interval longer than 15 seconds (the default interval length), then the engine may use current variable values after 15 seconds have passed, rather than the then-current variable values at the time the statement started executing in response to an event or timer invocation.
The length of the time interval that variable values are held stable for the duration of execution of a given statement is by default 15 seconds, but can be configured via engine default settings.
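As a sketch, this interval could be adjusted through the engine defaults before obtaining an engine instance; the setMsecVersionRelease method name reflects the variables-related engine default setting and should be verified against your Esper version:
Configuration config = new Configuration();
// keep variable versions stable for up to 30 seconds per statement execution
config.getEngineDefaults().getVariables().setMsecVersionRelease(30000);
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);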