The Event Query Language (EQL) is a SQL-like language with SELECT, FROM, WHERE, GROUP BY, HAVING and ORDER BY clauses. Streams replace tables as the source of data, with events replacing rows as the basic unit of data. Since events are composed of data, the SQL concepts of correlation through joins, filtering, and aggregation through grouping can be effectively leveraged. The INSERT INTO clause is recast as a means of forwarding events to other streams for further downstream processing. External data accessible through JDBC may be queried and joined with the stream data. Additional clauses such as the PATTERN and OUTPUT clauses provide event-processing constructs that SQL lacks.
EQL statements are used to derive and aggregate information from one or more streams of events, and to join or merge event streams. This section outlines EQL syntax. It also outlines the built-in views, which are the building blocks for deriving and aggregating information from event streams.
EQL statements contain definitions of one or more views. Similar to tables in a SQL statement, views define the data available for querying and filtering. Some views represent windows over a stream of events. Other views derive statistics from event properties, group events or handle unique event property values. Views can be staggered onto each other to build a chain of views. The Esper engine makes sure that views are reused among EQL statements for efficiency.
The built-in set of views is:
Data window views: win:length, win:length_batch, win:time, win:time_batch, win:time_length_batch, win:time_accum, win:ext_timed, ext:sort_window, ext:time_order, std:unique, std:groupby, std:lastevent
Views that derive statistics: std:size, stat:uni, stat:linest, stat:correl, stat:weighted_avg, stat:cube
EQL provides the concept of named windows. Named windows are data windows that can be inserted into and deleted from by one or more statements, and that can be queried by one or more statements. Named windows have a global character, being visible and shared across an engine instance beyond a single statement. Use the CREATE WINDOW clause to create named windows. Use the INSERT INTO clause to insert data into a named window, the ON DELETE clause to remove events from a named window, and the ON SELECT clause to perform a non-continuous fire-once query on a named window. Finally, the name of the named window can occur in a statement's FROM clause to query a named window or include the named window in a join or subquery.
Variables can come in handy to parameterize statements and change parameters on-the-fly and in response to events. Variables can be used in an expression anywhere in a statement as well as in the output clause for dynamic control of output rates.
Esper can be extended by plugging in custom-developed views and aggregation functions.
EQL queries are created and stored in the engine, and publish results to listeners as events are received by the engine or timer events occur that match the criteria specified in the query. Events can also be obtained from running EQL queries via the safeIterator and iterator methods that provide a pull-data API.
The select clause in an EQL query specifies the event properties or events to retrieve. The from clause in an EQL query specifies the event stream definitions and stream names to use. The where clause in an EQL query specifies search conditions that specify which event or event combination to search for. For example, the following statement returns the average price for IBM stock ticks in the last 30 seconds.
select avg(price) from StockTick.win:time(30 sec) where symbol='IBM'
EQL queries follow the syntax below. EQL queries can be simple queries or more complex queries. A simple select contains only a select clause and a single stream definition. Complex EQL queries can be built that feature a more elaborate select list utilizing expressions, may join multiple streams, may contain a where clause with search conditions and so on.
[insert into insert_into_def]
select select_list
from stream_def [as name] [, stream_def [as name]] [,...]
[where search_conditions]
[group by grouping_expression_list]
[having grouping_search_conditions]
[output output_specification]
[order by order_by_expression_list]
Time-based windows as well as pattern observers and guards take a time period as a parameter. Time periods follow the syntax below.
time-period : [day-part] [hour-part] [minute-part] [seconds-part] [milliseconds-part]
day-part : number ("days" | "day")
hour-part : number ("hours" | "hour")
minute-part : number ("minutes" | "minute" | "min")
seconds-part : number ("seconds" | "second" | "sec")
milliseconds-part : number ("milliseconds" | "millisecond" | "msec")
Some examples of time periods are:
10 seconds
10 minutes 30 seconds
20 sec 100 msec
1 day 2 hours 20 minutes 15 seconds 110 milliseconds
0.5 minutes
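The grammar above can be exercised with a small parser. The following is a minimal sketch in plain Python, not part of Esper: the function name time_period_to_millis is hypothetical, and the sketch assumes that parts must appear in the order the grammar lists them and that numbers may carry a decimal fraction, as in the "0.5 minutes" example.

```python
import re

# Unit names from the time-period grammar, in the order the grammar requires,
# paired with the number of milliseconds each unit represents.
_UNITS = [
    ("days|day", 86_400_000),
    ("hours|hour", 3_600_000),
    ("minutes|minute|min", 60_000),
    ("seconds|second|sec", 1_000),
    ("milliseconds|millisecond|msec", 1),
]

_NUMBER = r"(\d+(?:\.\d+)?)"

# One optional "number unit" part per unit, concatenated in grammar order.
_PATTERN = re.compile(
    r"\s*"
    + "".join(rf"(?:{_NUMBER}\s*(?:{names})\b\s*)?" for names, _ in _UNITS)
    + r"$"
)


def time_period_to_millis(text):
    """Convert a time-period string to milliseconds (hypothetical helper)."""
    match = _PATTERN.match(text)
    if match is None or not any(match.groups()):
        raise ValueError(f"not a valid time period: {text!r}")
    return sum(
        float(value) * factor
        for value, (_, factor) in zip(match.groups(), _UNITS)
        if value is not None
    )


print(time_period_to_millis("10 minutes 30 seconds"))
print(time_period_to_millis("0.5 minutes"))
```

Parts given out of order, such as "30 seconds 10 minutes", are rejected by this sketch with a ValueError.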
Comments can appear anywhere in the EQL or pattern statement text where whitespace is allowed. Comments can be written in two ways: slash-slash (// ...) comments and slash-star (/* ... */) comments.
Slash-slash comments extend to the end of the line:
// This comment extends to the end of the line.
// Two forward slashes with no whitespace between them begin such comments.

select * from MyEvent // this is a slash-slash comment

// All of this text together is a valid statement.
Slash-star comments can span multiple lines:
/* This comment is a "slash-star" comment that spans multiple lines.
 * It begins with the slash-star sequence with no space between the '/' and '*' characters.
 * By convention, subsequent lines can begin with a star and are aligned, but this is
 * not required.
 */
select * from MyEvent /* this also works */
Comment styles can also be mixed:
select field1, // first comment
  /* second comment*/ field2
  from MyEvent
The select clause is required in all EQL statements. The select clause can be used to select all properties via the wildcard *, or to specify a list of event properties and expressions. The select clause defines the event type (event property names and types) of the resulting events published by the statement, or pulled from the statement via the iterator methods.
The select clause also offers optional istream and rstream keywords to control how events are posted to UpdateListener instances listening to the statement.
The syntax for the select clause is summarized below.
select [rstream | istream] * | expression_list ...
The syntax for selecting all event properties in a stream is:
select * from stream_def
The following statement selects StockTick events for the last 30 seconds of IBM stock ticks.
select * from StockTick(symbol='IBM').win:time(30 sec)
The * wildcard and expressions can also be combined in a select clause. The combination selects all event properties and in addition the computed values as specified by any additional expressions that are part of the select clause. Here is an example that selects all properties of stock tick events plus a computed product of price and volume that the statement names 'pricevolume':
select *, price * volume as pricevolume from StockTick(symbol='IBM')
When using wildcard (*), Esper does not actually copy your event properties out of your event or events. It simply wraps your native type in an EventBean interface. Your application has access to the underlying event object through the getUnderlying method and has access to the property values through the get method.
In a join statement, using the select * syntax selects one event property per stream to hold the event for that stream. The property name is the stream alias name in the from clause.
To choose the particular event properties to return:
select event_property [, event_property] [, ...] from stream_def
The following statement selects the count and standard deviation properties for the last 100 events of IBM stock ticks for volume.
select count, stdev from StockTick(symbol='IBM').win:length(100).stat:uni('volume')
The select clause can contain one or more expressions.
select expression [, expression] [, ...] from stream_def
The following statement selects the volume multiplied by price for a time batch of the last 30 seconds of stock tick events.
select volume * price from StockTick.win:time_batch(30 sec)
Event properties and expressions can be renamed using the syntax below.
select [event property | expression] as identifier [, ...]
The following statement selects volume multiplied by price and specifies the name volPrice for the event property.
select volume * price as volPrice from StockTick.win:length(100)
If your statement is joining multiple streams, you may specify property names that are unique among the joined streams, or use the wildcard (*) as explained earlier.
In case the property name in your select or other clauses is not unique considering all joined streams, you will need to use the alias name of the stream as a prefix to the property.
This example is a join between the two streams StockTick and News, respectively named as 'tick' and 'news'. The example selects from the StockTick event the symbol value using the 'tick' stream alias as a prefix:
select tick.symbol from StockTick.win:time(10) as tick, News.win:time(10) as news
Use the wildcard (*) selector in a join to generate a property for each stream, with the property value being the event itself. The output events of the statement below have two properties: the 'tick' property holds the StockTick event and the 'news' property holds the News event:
select * from StockTick.win:time(10) as tick, News.win:time(10) as news
The following syntax can also be used to specify what stream's properties to select:
select stream_name.* [as alias] from ...
The selection of tick.* selects the StockTick stream events only:
select tick.* from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The next example uses the as keyword to name each stream's joined events. This instructs the engine to create a property for each named event:
select tick.* as stocktick, news.* as news from StockTick.win:time(10) as tick, News.win:time(10) as news where tick.symbol = news.symbol
The output events of the above example have two properties 'stocktick' and 'news' that are the StockTick and News events.
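The shape of these output events can be pictured with a small model. This is only a sketch in plain Python, not Esper API: the function name join_windows is hypothetical, events are represented as dicts, and the two lists stand in for a snapshot of the contents of the two time windows.

```python
def join_windows(ticks, news_items):
    """Model the join: one output row per matching (tick, news) pair.

    Each row has a 'stocktick' property and a 'news' property whose
    values are the joined events themselves.
    """
    return [
        {"stocktick": tick, "news": news}
        for tick in ticks
        for news in news_items
        if tick["symbol"] == news["symbol"]
    ]


ticks = [{"symbol": "IBM", "price": 80.0}, {"symbol": "MSFT", "price": 25.0}]
news_items = [{"symbol": "IBM", "headline": "Earnings"}]
rows = join_windows(ticks, news_items)
print(rows)
```

Only the IBM tick has a matching news item, so the model yields a single row holding both events.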
If your statement employs pattern expressions, then your pattern expression tags events with a tag name. Each tag name becomes available for use as a property in the select clause and all other clauses.
For example, here is a very simple pattern that matches on every StockTick event received within 10 seconds after start of the statement. The sample selects the symbol and price properties of the matching events:
select tick.symbol as symbol, tick.price as price from pattern[every tick=StockTick where timer:within(10 sec)]
The use of the wildcard selector, as shown in the next statement, creates a property for each tagged event in the output. The next statement outputs events that hold a single 'tick' property whose value is the event itself:
select * from pattern[every tick=StockTick where timer:within(10 sec)]
You may also select the matching event itself using the tick.* syntax. The engine outputs the StockTick event itself to listeners:
select tick.* from pattern[every tick=StockTick where timer:within(10 sec)]
The optional istream and rstream keywords in the select clause define the event stream posted to listeners to the statement.
If neither keyword is specified, the engine posts insert stream events via the newEvents parameter to the update method of UpdateListener instances listening to the statement. The engine posts remove stream events to the oldEvents parameter of the update method. The insert stream consists of the events entering the respective window(s) or stream(s) or aggregations, while the remove stream consists of the events leaving the respective window(s) or the changed aggregation result. See Chapter 3, Processing Model for more information on insert and remove streams.
By specifying the istream keyword you can instruct the engine to only post insert stream events via the newEvents parameter to the update method on listeners. The engine will then not post any remove stream events, and the oldEvents parameter is always a null value.
By specifying the rstream keyword you can instruct the engine to only post remove stream events via the newEvents parameter to the update method on listeners. The engine will then not post any insert stream events, and the oldEvents parameter is also always a null value.
The following statement selects only the events that are leaving the 30 second time window.
select rstream * from StockTick.win:time(30 sec)
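How the keywords route insert and remove stream events to the newEvents and oldEvents parameters can be sketched with a toy model. The code below is plain Python rather than Esper API, and the names TimeWindow and post are hypothetical. As a simplifying assumption, the model only expires events when a new event arrives, whereas the engine also reacts to timer events.

```python
from collections import deque


class TimeWindow:
    """Toy model of win:time: holds events younger than `seconds`."""

    def __init__(self, seconds):
        self.seconds = seconds
        self.events = deque()  # (timestamp, event) pairs, oldest first

    def add(self, timestamp, event):
        """Add an event; return (insert_stream, remove_stream) for this step."""
        removed = []
        # Expire events that have been in the window for `seconds` or longer.
        while self.events and self.events[0][0] <= timestamp - self.seconds:
            removed.append(self.events.popleft()[1])
        self.events.append((timestamp, event))
        return [event], removed


def post(keyword, inserted, removed):
    """Return (newEvents, oldEvents) as a listener would receive them."""
    if keyword == "istream":
        return inserted or None, None
    if keyword == "rstream":
        # Remove stream events are delivered through newEvents.
        return removed or None, None
    return inserted or None, removed or None


window = TimeWindow(30)
window.add(0, "a")
window.add(10, "b")
inserted, removed = window.add(31, "c")  # "a" is now older than 30 seconds
print(post("rstream", inserted, removed))
```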
The istream and rstream keywords in the select clause are matched by same-name keywords available in the insert into clause. While the keywords in the select clause control the event stream posted to listeners to the statement, the same keywords in the insert into clause specify the event stream that the engine makes available to other statements.
The from clause is required in all EQL statements. It specifies one or more event streams or named windows. Each event stream or named window can optionally be given a name by means of the as syntax.
from stream_def [as stream_name] [, stream_def [as stream_name]] [, ...]
The event stream definition stream_def as shown in the syntax above consists of either a filter-based event stream definition or a pattern-based event stream definition.
For joins and outer joins, specify two or more event streams. Joins between pattern-based and filter-based event streams are also supported.
Esper supports joins against relational databases for access to historical or reference data as explained in Section 4.13, “Joining Relational Data via SQL”. Esper can also join results returned by an arbitrary method invocation, as discussed in Section 4.14, “Joining Non-Relational Data via Method Invocation”.
The stream_name is an optional identifier assigned to the stream. The stream name can itself occur in any expression and provides access to the event itself from the named stream. Also, a stream name may be combined with a method name to invoke instance methods on events of that stream.
For filter-based event streams, the event stream definition stream_def as shown in the from clause syntax consists of an event type, optional filter expressions and an optional list of views that derive data from a stream. The syntax for a filter-based event stream is as below:
event_type ( [filter_criteria] ) [.view_spec] [.view_spec] [...]
The following EQL statement shows event type, filter criteria and views combined in one statement. It selects all event properties for the last 100 events of IBM stock ticks for volume. In the example, the event type is the fully qualified Java class name org.esper.example.StockTick. The expression filters for events where the property symbol has a value of "IBM". The optional view specifications for deriving data from the StockTick events are a length window and a view for computing statistics on volume. The name for the event stream is "volumeStats".
select * from org.esper.example.StockTick(symbol='IBM').win:length(100).stat:uni('volume') as volumeStats
Esper filters out events in an event stream as defined by filter criteria before it sends events to subsequent views. Thus, compared to search conditions in a where clause, filter criteria remove unneeded events early. In the above example, events with a symbol other than IBM do not enter the length window.
The simplest form of filter is a filter for events of a given type without any conditions on the event property values. This filter matches any event of that type regardless of the event's properties. The example below is such a filter.
select * from com.mypackage.myevents.RfidEvent
Instead of the fully qualified Java class name, any other event name can be mapped via Configuration to a Java class, which makes the resulting statement more readable:
select * from RfidEvent
Interfaces and superclasses are also supported as event types. In the below example IRfidReadable is an interface class.
select * from org.myorg.rfid.IRfidReadable
The filter criteria that select events with certain event property values are placed within parentheses after the event type name:
select * from RfidEvent(category="Perishable")
All expressions can be used in filters, including static methods that return a boolean value:
select * from com.mycompany.RfidEvent(MyRFIDLib.isInRange(x, y) or (x < 0 and y < 0))
Filter expressions can be separated via a single comma ','. The comma represents a logical AND between filter expressions:
select * from RfidEvent(zone=1, category=10)
...is equivalent to...
select * from RfidEvent(zone=1 and category=10)
The following operators are highly optimized through indexing and are the preferred means of filtering in high-volume event streams:
equals =
not equals !=
comparison operators < , > , >=, <=
ranges:
  use the between keyword for a closed range where both endpoints are included
  use the in keyword and round () or square brackets [] to control how endpoints are included
  for inverted ranges use the not keyword and the between or in keywords
list-of-values checks using the in keyword or the not in keywords followed by a comma-separated list of values
At compile time as well as at run time, the engine scans new filter expressions for sub-expressions that can be indexed. Indexing filter values to match event properties of incoming events enables the engine to match incoming events faster. The above list of operators represents the set of operators that the engine can best convert into indexes. The use of comma or logical and in filter expressions does not impact optimizations by the engine.
Ranges come in the following 4 varieties. The use of round () or square [] bracket dictates whether an endpoint is included or excluded. The low point and the high-point of the range are separated by the colon : character.
Open ranges that contain neither endpoint (low:high)
Closed ranges that contain both endpoints [low:high]. The equivalent 'between' keyword also defines a closed range.
Half-open ranges that contain the low endpoint but not the high endpoint [low:high)
Half-closed ranges that contain the high endpoint but not the low endpoint (low:high]
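The four varieties differ only in how the two endpoints are compared. A minimal sketch in plain Python follows; the function in_range and its string-based range argument are illustrative assumptions and do not reflect how the engine evaluates ranges.

```python
def in_range(value, spec):
    """Evaluate `value in spec`, where spec is written like '[low:high)'."""
    spec = spec.strip()
    opening, closing = spec[0], spec[-1]
    if opening not in "[(" or closing not in "])":
        raise ValueError(f"not a range: {spec!r}")
    low_text, high_text = spec[1:-1].split(":")
    low, high = float(low_text), float(high_text)
    # A square bracket includes the endpoint, a round bracket excludes it.
    above_low = value >= low if opening == "[" else value > low
    below_high = value <= high if closing == "]" else value < high
    return above_low and below_high


print(in_range(100, "[100:200]"))  # closed range: low endpoint included
print(in_range(100, "(100:200]"))  # low endpoint excluded
```

An inverted range, as written with the not keyword, corresponds to negating the result, for example `not in_range(x, "[0:100]")`.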
The next statement shows a filter specifying a range for x and y values of RFID events. The range includes both endpoints and therefore uses square brackets [].
mypackage.RfidEvent(x in [100:200], y in [0:100])
The between keyword is equivalent for closed ranges. The same filter using the between keyword is:
mypackage.RfidEvent(x between 100 and 200, y between 0 and 100)
The not keyword can be used to determine if a value falls outside a given range:
mypackage.RfidEvent(x not in [0:100])
The equivalent statement using the between keyword is:
mypackage.RfidEvent(x not between 0 and 100)
The in keyword for filter criteria determines if a given value matches any value in a list of values.
In this example we are interested in RFID events where the category matches any of the given values:
mypackage.RfidEvent(category in ('Perishable', 'Container'))
By using the not in keywords we can filter events with a property value that does not match any of the values in a list of values:
mypackage.RfidEvent(category not in ('Household', 'Electrical'))
The following restrictions apply to filter criteria:
Range and comparison operators require the event property to be of a numeric type.
Aggregation functions are not allowed within filter expressions.
The prev (previous event) function and the prior (prior event) function cannot be used in filter expressions.
Event pattern expressions can also be used to specify one or more event streams in an EQL statement. For pattern-based event streams, the event stream definition stream_def consists of the keyword pattern and a pattern expression in brackets []. The syntax for an event stream definition using a pattern expression is below. As in filter-based event streams, an optional list of views that derive data from the stream can be supplied.
pattern [pattern_expression] [.view_spec] [.view_spec] [...]
The next statement specifies an event stream that consists of both stock tick events and trade events. The example tags stock tick events with the name "tick" and trade events with the name "trade".
select * from pattern [every tick=StockTickEvent or every trade=TradeEvent]
This statement generates an event every time the engine receives either one of the event types. The generated events resemble a map with "tick" and "trade" keys. For stock tick events, the "tick" key value is the underlying stock tick event, and the "trade" key value is a null value. For trade events, the "trade" key value is the underlying trade event, and the "tick" key value is a null value.
Let's further refine this statement by adding a view that gives us the last 30 seconds of either stock tick or trade events. Let's also select prices and a price total.
select tick.price as tickPrice, trade.price as tradePrice, sum(tick.price) + sum(trade.price) as total from pattern [every tick=StockTickEvent or every trade=TradeEvent].win:time(30 sec)
Note that in the statement above tickPrice and tradePrice can each be null values depending on the event processed. Therefore, an aggregation function such as sum(tick.price + trade.price) would always return null, because one of the two price properties is always null for any event matching the pattern. Use the coalesce function to handle null values, for example: sum(coalesce(tick.price, 0) + coalesce(trade.price, 0)).
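The effect of null values on the aggregation can be traced with a small model. The sketch below is plain Python with None standing in for null. The helpers add, coalesce and eql_sum are hypothetical stand-ins written for this illustration, and they assume that arithmetic on a null operand yields null and that sum ignores null inputs.

```python
def add(a, b):
    """Model of '+': the result is null if either operand is null."""
    return None if a is None or b is None else a + b


def coalesce(*values):
    """Return the first non-null argument, or null if there is none."""
    return next((v for v in values if v is not None), None)


def eql_sum(values):
    """Model of sum(): ignores nulls, returns null if nothing remains."""
    non_null = [v for v in values if v is not None]
    return sum(non_null) if non_null else None


# Each pattern match carries either a tick price or a trade price, never both.
rows = [
    {"tick": 10.0, "trade": None},
    {"tick": None, "trade": 4.0},
    {"tick": 6.0, "trade": None},
]

naive = eql_sum(add(r["tick"], r["trade"]) for r in rows)
fixed = eql_sum(
    add(coalesce(r["tick"], 0), coalesce(r["trade"], 0)) for r in rows
)
print(naive, fixed)
```

In this model the naive total is null because every per-event sum is null, while the coalesce version adds up all three prices.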
Views are used to derive or aggregate data. Views can be staggered onto each other. See Chapter 8, EQL Reference: Views for the views available.
Views can optionally take one or more parameters. These parameters can consist of primitive constants such as String, boolean or numeric types. Arrays are also supported as view parameter types.
The below example serves to show views and staggering of views. It uses a car location event that contains information about the location of a car on a highway.
The first view std:groupby('carId') groups car location events by car id. The second view win:length(4) keeps a length window of the 4 last events, with one length window for each car id. The next view std:groupby({'expressway', 'direction', 'segment'}) groups each event by its expressway, direction and segment property values. Again, the grouping is done for each car id considering the last 4 events only. The last view std:size() is used to report the number of events. Thus the below example reports the number of events per car id and per expressway, direction and segment considering the last 4 events for each car id only.
select * from CarLocEvent.std:groupby('carId').win:length(4).
  std:groupby({'expressway', 'direction', 'segment'}).std:size()
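The chain of views can be traced by hand with a small model. This sketch is plain Python, not Esper API. The function name size_per_car_and_segment and the dict-based events are assumptions for illustration, and the function reports a single snapshot after all events have been processed rather than continuous output.

```python
from collections import Counter, defaultdict, deque


def size_per_car_and_segment(events, window_length=4):
    """Model groupby(carId) -> length(4) -> groupby(location) -> size."""
    # First two views: one length window of the last 4 events per car id.
    windows = defaultdict(lambda: deque(maxlen=window_length))
    for event in events:
        windows[event["carId"]].append(event)

    # Last two views: count events per car id and per location triple.
    counts = Counter()
    for car_id, window in windows.items():
        for event in window:
            key = (car_id, event["expressway"], event["direction"], event["segment"])
            counts[key] += 1
    return dict(counts)


def loc(car_id, segment):
    """Build a car location event on expressway 0, direction 0."""
    return {"carId": car_id, "expressway": 0, "direction": 0, "segment": segment}


events = [loc(1, 1), loc(1, 1), loc(1, 2), loc(2, 7), loc(1, 2), loc(1, 2)]
sizes = size_per_car_and_segment(events)
print(sizes)
```

Car 1 sent five events, but only its last four are counted: one in segment 1 and three in segment 2.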
Your from clause may assign a name to each stream. This assigned stream name can serve any of the following purposes.
First, the stream name can be used to disambiguate property names. The stream_name.property_name syntax uniquely identifies which property to select if property names overlap between streams. Here is an example:
select prod.productId, ord.productId from ProductEvent as prod, OrderEvent as ord
Second, the stream name can be used with a wildcard (*) character to select events in a join, or assign new names to the streams in a join:
// Select ProductEvent only
select prod.* from ProductEvent as prod, OrderEvent

// Assign column names 'product' and 'order' to each event
select prod.* as product, ord.* as order from ProductEvent as prod, OrderEvent as ord
Further, the stream name by itself can occur in any expression: The engine passes the event itself to that expression. For example, the engine passes the ProductEvent and the OrderEvent to the user-defined function 'checkOrder':
select prod.productId, MyFunc.checkOrder(prod, ord) from ProductEvent as prod, OrderEvent as ord
Last, you may invoke an instance method on each event of a stream, and pass parameters to the instance method as well. Instance method calls are allowed anywhere in an expression.
The next statement demonstrates this capability by invoking a method 'computeTotal' on OrderEvent events and a method 'getMultiplier' on ProductEvent events:
select ord.computeTotal(prod.getMultiplier()) from ProductEvent as prod, OrderEvent as ord
The where clause is an optional clause in EQL statements. Via the where clause event streams can be joined and events can be filtered.
Comparison operators =, < , > , >=, <=, !=, <>, is null, is not null and logical combinations via and and or are supported in the where clause. The where clause can also introduce join conditions as outlined in Section 4.10, “Joining Event Streams”. where clauses can also contain expressions. Some examples are listed below.
...where fraud.severity = 5 and amount > 500
...where (orderItem.orderId is null) or (orderItem.class != 10)
...where (orderItem.orderId = null) or (orderItem.class <> 10)
...where itemCount / packageCount > 10
The aggregate functions are sum, avg, count, max, min, median, stddev, avedev. You can use aggregate functions to calculate and summarize data from event properties. For example, to find out the total price for all stock tick events in the last 30 seconds, type:
select sum(price) from StockTickEvent.win:time(30 sec)
Here is the syntax for aggregate functions:
aggregate_function( [all | distinct] expression)
You can apply aggregate functions to all events in an event stream window or other view, or to one or more groups of events. From each set of events to which an aggregate function is applied, Esper generates a single value.
Expression is usually an event property name. However it can also be a constant, function, or any combination of event property names, constants, and functions connected by arithmetic operators.
For example, to find out the average price for all stock tick events in the last 30 seconds if the price was doubled:
select avg(price * 2) from StockTickEvent.win:time(30 seconds)
You can use the optional keyword distinct with all aggregate functions to eliminate duplicate values before the aggregate function is applied. The optional keyword all which performs the operation on all events is the default.
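The difference between all and distinct can be shown with a short model. The helper aggregate below is a hypothetical plain-Python function, not Esper API, and it assumes that null inputs are ignored and that an empty input yields null.

```python
def aggregate(func, values, distinct=False):
    """Apply func to the non-null values, optionally dropping duplicates first."""
    non_null = [v for v in values if v is not None]
    if distinct:
        # dict.fromkeys keeps the first occurrence of each value, in order.
        non_null = list(dict.fromkeys(non_null))
    return func(non_null) if non_null else None


prices = [10, 10, 20]
print(aggregate(sum, prices))                 # models sum(all price)
print(aggregate(sum, prices, distinct=True))  # models sum(distinct price)
```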
You can use aggregation functions in a select clause and in a having clause. You cannot use aggregate functions in a where clause, but you can use the where clause to restrict the events to which the aggregate is applied. The next query computes the average and sum of the price of stock tick events for the symbol IBM only, for the last 10 stock tick events regardless of their symbol.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent.win:length(10) where symbol='IBM'
In the above example the length window of 10 elements is not affected by the where clause, i.e. all events enter and leave the length window regardless of their symbol. If we only care about the last 10 IBM events, we need to add filter criteria as below.
select 'IBM stats' as title, avg(price) as avgPrice, sum(price) as sumPrice from StockTickEvent(symbol='IBM').win:length(10)
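The difference between restricting events in the where clause and restricting them in the filter can be reproduced with a toy length window. The following is a sketch in plain Python, not Esper API: the function names are hypothetical, ticks are (symbol, price) tuples, and a window length of 3 is used instead of 10 to keep the data short.

```python
from collections import deque


def average(prices):
    """Average of a list of prices, or None (null) if the list is empty."""
    return sum(prices) / len(prices) if prices else None


def avg_with_where(ticks, length, symbol):
    """All ticks enter the window; the symbol test is applied afterwards."""
    window = deque(ticks, maxlen=length)
    return average([price for sym, price in window if sym == symbol])


def avg_with_filter(ticks, length, symbol):
    """Only ticks passing the filter ever enter the window."""
    window = deque((t for t in ticks if t[0] == symbol), maxlen=length)
    return average([price for _, price in window])


ticks = [("IBM", 10.0), ("MSFT", 50.0), ("IBM", 20.0), ("MSFT", 70.0), ("IBM", 30.0)]
print(avg_with_where(ticks, 3, "IBM"))   # IBM ticks among the last 3 ticks overall
print(avg_with_filter(ticks, 3, "IBM"))  # the last 3 IBM ticks
```

With the where clause, the window holds the last three ticks of any symbol and only two of them are IBM ticks. With the filter, the window holds the last three IBM ticks.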
You can use aggregate functions with any type of event property or expression, with the following exceptions:
You can use sum, avg, median, stddev, avedev with numeric event properties only
Esper ignores any null values returned by the event property or expression on which the aggregate function is operating, except for the count(*) function, which counts null values as well. All aggregate functions return null if the data set contains no events, or if all events in the data set contain only null values for the aggregated expression.
The group by clause is optional in all EQL statements. The group by clause divides the output of an EQL statement into groups. You can group by one or more event property names, or by the result of computed expressions. When used with aggregate functions, group by retrieves the calculations in each subgroup. You can use group by without aggregate functions, but generally that can produce confusing results.
For example, the below statement returns the total price per symbol for all stock tick events in the last 30 seconds:
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol
The syntax of the group by clause is:
group by aggregate_free_expression [, aggregate_free_expression] [, ...]
Esper places the following restrictions on expressions in the group by clause:
Expressions in the group by cannot contain aggregate functions
Event properties that are used within aggregate functions in the select clause cannot also be used in a group by expression
You can list more than one expression in the group by clause to nest groups. Once the sets are established with group by, the aggregation functions are applied. This statement posts the median volume for all stock tick events in the last 30 seconds per symbol and tick data feed. Esper posts one event for each group to statement listeners:
select symbol, tickDataFeed, median(volume) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
In the statement above the event properties in the select list (symbol, tickDataFeed) are also listed in the group by clause. The statement thus follows the SQL standard which prescribes that non-aggregated event properties in the select list must match the group by columns.
Esper also supports statements in which one or more event properties in the select list are not listed in the group by clause. The statement below demonstrates this case. It calculates the standard deviation for the last 30 seconds of stock ticks aggregating by symbol and posting for each event the symbol, tickDataFeed and the standard deviation on price.
select symbol, tickDataFeed, stddev(price) from StockTickEvent.win:time(30 sec) group by symbol
The above example still aggregates the price event property based on the symbol, but produces one event per incoming event, not one event per group.
Additionally, Esper supports statements in which one or more event properties in the group by clause are not listed in the select list. This is an example that calculates the mean deviation per symbol and tickDataFeed and posts one event per group with symbol and mean deviation of price in the generated events. Since tickDataFeed is not in the posted results, this can potentially be confusing.
select symbol, avedev(price) from StockTickEvent.win:time(30 sec) group by symbol, tickDataFeed
Expressions are also allowed in the group by list:
select symbol * price, count(*) from StockTickEvent.win:time(30 sec) group by symbol * price
If the group by expression resulted in a null value, the null value becomes its own group. All null values are aggregated into the same group. If you are using the count(expression) aggregate function which does not count null values, the count returns zero if only null values are encountered.
You can use a where clause in a statement with group by. Events that do not satisfy the conditions in the where clause are eliminated before any grouping is done. For example, the statement below posts the number of stock ticks in the last 30 seconds with a volume larger than 100, posting one event per group (symbol).
select symbol, count(*) from StockTickEvent.win:time(30 sec) where volume > 100 group by symbol
Use the having clause to pass or reject events defined by the group-by clause. The having clause sets conditions for the group by clause in the same way where sets conditions for the select clause, except where cannot include aggregate functions, while having often does.
This statement is an example of a having clause with an aggregate function. It posts the total price per symbol for the last 30 seconds of stock tick events for only those symbols in which the total price exceeds 1000. The having clause eliminates all symbols where the total price is less than or equal to 1000.
select symbol, sum(price) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000
To include more than one condition in the having clause, combine the conditions with and, or or not. This is shown in the statement below, which selects only groups with a total price greater than 1000 and an average volume less than 500.
select symbol, sum(price), avg(volume) from StockTickEvent.win:time(30 sec) group by symbol having sum(price) > 1000 and avg(volume) < 500
Esper places the following restrictions on expressions in the having clause:
Any expressions that contain aggregate functions must also occur in the select clause
A statement with the having clause should also have a group by clause. If you omit group-by, all the events not excluded by the where clause return as a single group. In that case having acts like a where except that having can have aggregate functions.
The having clause can also be used without a group by clause, as the example below shows. It posts events where the price is less than the current running average price of all stock tick events in the last 30 seconds.
select symbol, price, avg(price) from StockTickEvent.win:time(30 sec) having price < avg(price)
When you include filters, the where condition, the group by clause and the having condition in an EQL statement, the sequence in which each clause affects events determines the final result:
The event stream's filter condition, if present, dictates which events enter a window (if one is used). The filter discards any events not meeting filter criteria.
The where clause excludes events that do not meet its search condition.
Aggregate functions in the select list calculate summary values for each group.
The having clause excludes events from the final results that do not meet its search condition.
The following query illustrates the use of filter, where, group by and having clauses in one statement with a select clause containing an aggregate function.
select tickDataFeed, stddev(price) from StockTickEvent(symbol='IBM').win:length(10) where volume > 1000 group by tickDataFeed having stddev(price) > 0.8
Esper filters events using the filter criteria for the event stream StockTickEvent. In the example above only events with symbol IBM enter the length window over the last 10 events; all other events are simply discarded. The where clause removes any events posted by the length window (events entering the window and events leaving the window) that do not match the condition of volume greater than 1000. Remaining events are applied to the stddev standard deviation aggregate function for each tick data feed as specified in the group by clause. Each tickDataFeed value generates one event. Esper applies the having clause and only lets events pass for tickDataFeed groups with a standard deviation of price greater than 0.8.
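The order of evaluation described above can be sketched over a static list of events. This is a simplified Python illustration, not the engine's implementation: it evaluates one snapshot rather than a continuous stream, it ignores remove-stream events, and it uses Python's sample standard deviation as a stand-in for stddev. The function name run_pipeline is an assumption made for this example.

```python
from collections import defaultdict
from statistics import stdev

def run_pipeline(events, window_size=10):
    """Apply filter -> length window -> where -> group by -> having, in order."""
    # 1. Stream filter: only IBM events enter the window.
    filtered = [e for e in events if e["symbol"] == "IBM"]
    # 2. Length window: keep the last `window_size` filtered events.
    window = filtered[-window_size:]
    # 3. Where clause: keep only events with volume > 1000.
    kept = [e for e in window if e["volume"] > 1000]
    # 4. Group by tickDataFeed and collect prices per group.
    prices = defaultdict(list)
    for e in kept:
        prices[e["tickDataFeed"]].append(e["price"])
    # stdev needs at least two values; smaller groups yield None here.
    aggregated = {feed: stdev(p) if len(p) > 1 else None
                  for feed, p in prices.items()}
    # 5. Having clause: keep groups whose standard deviation exceeds 0.8.
    return {feed: s for feed, s in aggregated.items()
            if s is not None and s > 0.8}
```

Because the filter runs before the window, discarded events never take up one of the 10 window slots, whereas events rejected by the where clause still do.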
The group by clause as well as the built-in std:groupby view are similar in their ability to group events. This section explains the key differences in their behavior and use.
The group by clause works together with aggregation functions in your statement to produce an aggregation result per group. In greater detail, this means that when a new event arrives, the engine applies the expressions in the group by clause to determine a grouping key. If the engine has not encountered that grouping key before (a new group), the engine creates a set of new aggregation results for that grouping key and performs the aggregation changing that new set of aggregation results. If the grouping key points to an existing set of prior aggregation results (an existing group), the engine performs the aggregation changing the prior set of aggregation results for that group.
The std:groupby view is a built-in view that also groups events. The view is described in greater detail in Section 8.2.2, “Group-By (std:groupby)”. Its primary use is to create a separate data window per group, or more generally to create separate instances of all its sub-views for each grouping key encountered.
The next example shows two queries that produce equivalent results. The query using the group by clause is generally preferable as it is easier to read. The second form introduces the stat:uni view which computes univariate statistics for a given property:
select symbol, avg(price) from StockTickEvent group by symbol
// ... is equivalent to ...
select symbol, average from StockTickEvent.std:groupby('symbol').stat:uni('price')
The next example shows two queries that are NOT equivalent as the length window is ungrouped in the first query, and grouped in the second query:
select symbol, sum(price) from StockTickEvent.win:length(10) group by symbol
// ... NOT equivalent to ...
select symbol, sum(price) from StockTickEvent.std:groupby('symbol').win:length(10)
The key difference between the two statements is that in the first statement the length window is ungrouped and applies to all events regardless of group, while in the second query each group gets its own instance of a length window. For example, in the second query events arriving for symbol "ABC" get a length window of 10 events, and events arriving for symbol "DEF" get their own length window of 10 events.
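The difference between one shared window and one window per group can be shown with a small simulation. The Python sketch below is only an illustration of the semantics described above; the function names and sample data are assumptions and do not correspond to engine internals.

```python
from collections import defaultdict, deque

def sums_ungrouped_window(events, size):
    """One shared length window; sum(price) per symbol over its contents."""
    window = deque(events, maxlen=size)      # keeps only the last `size` events
    sums = defaultdict(float)
    for symbol, price in window:
        sums[symbol] += price
    return dict(sums)

def sums_grouped_windows(events, size):
    """One length window per symbol, as std:groupby would create."""
    windows = defaultdict(lambda: deque(maxlen=size))
    for symbol, price in events:
        windows[symbol].append(price)        # each symbol evicts only its own events
    return {symbol: sum(w) for symbol, w in windows.items()}

events = [("ABC", 1.0), ("ABC", 2.0), ("DEF", 5.0), ("DEF", 6.0)]
shared = sums_ungrouped_window(events, 2)    # ABC has been pushed out by DEF
per_group = sums_grouped_windows(events, 2)  # ABC and DEF each keep 2 events
```

With a window length of 2, the shared window holds only the two DEF events, while the grouped form retains two events for each symbol.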
The output clause is optional in Esper and is used to control or stabilize the rate at which events are output. For example, the following statement outputs, every 60 seconds, the total price for all orders in the 30-minute time window:
select sum(price) from OrderEvent.win:time(30 min) output every 60 seconds
Here is the syntax for output rate limiting:
output [all | first | last | snapshot] every output_rate [minutes | seconds | events]
The all keyword is the default and specifies that all events in a batch should be output. The batch size can be specified in terms of time or number of events.
The first keyword specifies that only the first event in an output batch is to be output. Using the first keyword instructs the engine to output the first matching event as soon as it arrives, and then ignore matching events for the time interval or number of events specified. After the time interval has elapsed, or the number of matching events has been reached, the next first matching event is output again, and for the following interval the engine again ignores matching events.
The last keyword specifies to only output the last event at the end of the given time interval or after the given number of matching events have been accumulated.
The snapshot keyword indicates that the engine output current computation results considering all events as per views specified and/or current aggregation results. While the other keywords control how a batch of events between output intervals is being considered, the snapshot keyword outputs all current state of a statement independent of the last batch.
The output_rate is the frequency at which the engine outputs events. The value can be a number to denote a fixed output rate, or the name of a variable whose value is the output rate. By means of a variable the output rate can be controlled externally and changed dynamically at runtime.
The time interval can also be specified in terms of minutes. The following statement outputs every 1.5 minutes, which is the same interval as 90 seconds.
select * from StockTickEvent.win:length(5) output every 1.5 minutes
A second way that output can be stabilized is by batching events until a certain number of events have been collected. The next statement only outputs when either 5 (or more) new or 5 (or more) old events have been batched.
select * from StockTickEvent.win:time(30 sec) output every 5 events
Additionally, event output can be further modified by the optional last keyword, which causes output of only the last event to arrive into an output batch.
select * from StockTickEvent.win:time(30 sec) output last every 5 events
Using the first keyword you can be notified at the start of the interval. This allows you to watch for situations such as a rate falling below a threshold: you are informed the moment it first happens, and then only once per specified output interval afterwards.
select * from TickRate.win:time(30 seconds) where rate<100 output first every 60 seconds
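The effect of the all, first and last keywords can be compared with a count-based simulation. The following Python sketch is a simplification under stated assumptions: it considers insert-stream events only, supports only the "every n events" form, and the function name rate_limit is invented for this example.

```python
def rate_limit(events, mode, n):
    """Return the output batches for 'all', 'first' or 'last' every n events."""
    outputs = []
    batch = []
    for event in events:
        batch.append(event)
        if mode == "first" and len(batch) == 1:
            outputs.append([event])          # emitted at once; rest of batch is ignored
        if len(batch) == n:                  # batch complete
            if mode == "all":
                outputs.append(list(batch))  # every event in the batch
            elif mode == "last":
                outputs.append([batch[-1]])  # only the most recent event
            batch = []
    return outputs
```

For seven events and n = 3, the all mode emits two full batches, last emits one event at the end of each full batch, and first emits an event at the start of each batch, including the third, incomplete one.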
The output clause interacts in two ways with the group by and having clauses. First, in the output every n events case, the number n refers to the number of events arriving into the group by clause. That is, if the group by clause outputs only 1 event per group, or if the arriving events don't satisfy the having clause, then the actual number of events output by the statement could be fewer than n.
Second, the last and all keywords have special meanings when used in a statement with aggregate functions and the group by clause. The last keyword specifies that only groups whose aggregate values have been updated with the most recent batch of events should be output. The all keyword (the default) specifies that the most recent data for all groups seen so far should be output, whether or not these groups' aggregate values have just been updated.
By adding an output rate limiting clause to a statement that contains a group by clause we can control output of groups to obtain one row for each group, generating an event per group at the given output frequency:
select symbol, sum(price) from StockTickEvent group by symbol output every 5 seconds
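How last and all differ for grouped aggregation can be sketched as follows. This Python example is illustrative only: it assumes the batches have already been formed by the output rate, and grouped_output is a hypothetical helper rather than an engine function.

```python
def grouped_output(batches, mode):
    """For each batch of (symbol, price) events, return the {symbol: sum} rows output."""
    totals = {}                              # running sum(price) per symbol
    outputs = []
    for batch in batches:
        updated = []                         # groups touched by this batch
        for symbol, price in batch:
            totals[symbol] = totals.get(symbol, 0) + price
            if symbol not in updated:
                updated.append(symbol)
        if mode == "last":
            outputs.append({s: totals[s] for s in updated})  # updated groups only
        else:                                # "all": every group seen so far
            outputs.append(dict(totals))
    return outputs

batches = [[("IBM", 10), ("MSFT", 5)], [("IBM", 1)]]
```

In the second batch only IBM is updated, so last reports IBM alone while all also repeats the unchanged MSFT total.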
The order by clause is optional in Esper. It is used for ordering output events by their properties, or by expressions involving those properties. For example, the following statement outputs batches of 5 or more stock tick events that are sorted first by price and then by volume.
select symbol from StockTickEvent.win:time(60 sec) output every 5 events order by price, volume
Here is the syntax for the order by clause:
order by expression [asc | desc] [, expression [asc | desc]] [, ...]
Esper places the following restrictions on the expressions in the order by clause:
All aggregate functions that appear in the order by clause must also appear in the select expression.
Otherwise, any kind of expression that can appear in the select clause, as well as any alias defined in the select clause, is also valid in the order by clause.
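Ordering a batch by several expressions with mixed directions can be sketched with stable sorts. The Python snippet below is an illustration of the ordering semantics only; order_by and the sample batch are assumptions made for this example.

```python
def order_by(events, *keys):
    """Sort dict events by (property, direction) pairs, first pair most significant."""
    ordered = list(events)
    # Stable sorts are applied from the least significant key to the most significant.
    for prop, direction in reversed(keys):
        ordered.sort(key=lambda e: e[prop], reverse=(direction == "desc"))
    return ordered

batch = [
    {"symbol": "A", "price": 2, "volume": 10},
    {"symbol": "B", "price": 1, "volume": 30},
    {"symbol": "C", "price": 1, "volume": 20},
]
by_price_then_volume = order_by(batch, ("price", "asc"), ("volume", "desc"))
```

Events B and C tie on price, so the second key (volume, descending) decides their relative order.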
The insert into clause is optional in Esper. The clause can be specified to make the results of a statement available as an event stream for use in further statements, or to insert events into a named window. The clause can also be used to merge multiple event streams to form a single stream of events.
The syntax for the insert into clause is as follows:
insert [istream | rstream] into event_stream_name [ (property_name [, property_name] ) ]
The istream (default) and rstream keywords are optional. If no keyword or the istream keyword is specified, the engine supplies the insert stream events generated by the statement. The insert stream consists of the events entering the respective window(s) or stream(s). If the rstream keyword is specified, the engine supplies the remove stream events generated by the statement. The remove stream consists of the events leaving the respective window(s).
The event_stream_name is an identifier that names the event stream (and also implicitly names the types of events in the stream) generated by the engine. The identifier can be used in further statements to filter and process events of that event stream. The insert into clause can consist of just an event stream name, or an event stream name and one or more property names.
The engine also allows listeners to be attached to a statement that contains an insert into clause. Listeners receive all events posted to the event stream.
To merge event streams, simply use the same event_stream_name identifier in all EQL statements that merge their result event streams. Make sure that the number and names of event properties are the same and that the event property types match.
Esper places the following restrictions on the insert into clause:
The number of elements in the select clause must match the number of elements in the insert into clause if the clause specifies a list of event property names
If the event stream name has already been defined by a prior statement or configuration, and the event property names and/or event types do not match, an exception is thrown at statement creation time.
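The second restriction amounts to a schema check at statement creation time. The Python sketch below shows one way such a check could look; StreamRegistry, its method names and the type labels are hypothetical and are not part of the Esper API.

```python
class StreamRegistry:
    """Remembers the property names and types first declared for each stream."""

    def __init__(self):
        self.schemas = {}

    def insert_into(self, stream_name, properties):
        """Register a statement's output; `properties` maps property name to type."""
        properties = dict(properties)
        # The first statement to insert into a stream defines its schema.
        declared = self.schemas.setdefault(stream_name, properties)
        if declared != properties:
            raise ValueError(
                f"{stream_name}: expected {declared}, got {properties}")

registry = StreamRegistry()
registry.insert_into("CombinedEvent", {"custId": "string", "latency": "long"})
```

A later statement that inserts the same two properties with the same types is accepted, while one that omits latency or changes its type is rejected before it ever processes an event.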
The following sample inserts into an event stream by name CombinedEvent:
insert into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
Each event in the CombinedEvent event stream has two event properties named "custId" and "latency". The events generated by the above statement can be used in further statements, such as shown in the next statement:
select custId, sum(latency) from CombinedEvent.win:time(30 min) group by custId
The example statement below shows the alternative form of the insert into clause that explicitly defines the property names to use.
insert into CombinedEvent (custId, latency) select A.customerId, A.timestamp - B.timestamp ...
The rstream keyword instructs the engine to generate only remove stream events. This is useful if we want to trigger actions when events leave a window rather than when events enter a window. The statement below generates CombinedEvent events when EventA and EventB leave the window after 30 minutes (1800 seconds).
insert rstream into CombinedEvent select A.customerId as custId, A.timestamp - B.timestamp as latency from EventA.win:time(30 min) A, EventB.win:time(30 min) B where A.txnId = B.txnId
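The insert stream and remove stream of a time window can be pictured with a small simulation. The Python class below is a sketch under simplifying assumptions: time is passed in explicitly rather than driven by a timer, and TimeWindow is an illustrative name, not an engine class.

```python
from collections import deque

class TimeWindow:
    """Keeps events for `span` seconds and reports what entered and what left."""

    def __init__(self, span):
        self.span = span
        self.events = deque()                # (timestamp, event) pairs, oldest first

    def add(self, timestamp, event):
        """Insert an event; return (insert_stream, remove_stream) for this call."""
        removed = self.advance(timestamp)
        self.events.append((timestamp, event))
        return [event], removed

    def advance(self, now):
        """Expire events that are at least `span` seconds old; return them."""
        removed = []
        while self.events and self.events[0][0] <= now - self.span:
            removed.append(self.events.popleft()[1])
        return removed
```

A statement using istream would react to the first element of each returned pair, and a statement using rstream would react to the second.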
The insert into clause can be used in connection with patterns to provide pattern results to further statements for analysis:
insert into ReUpEvent select linkUp.ip as ip from pattern [every linkDown=LinkDownEvent -> linkUp=LinkUpEvent(ip=linkDown.ip)]
The insert into clause allows you to merge multiple event streams into a single event stream. The clause names an event stream to insert into by specifying an event_stream_name. The first statement that inserts into the named stream defines the stream's event types. Further statements that insert into the same event stream must match the type of events inserted into the stream as declared by the first statement.
One approach to merging event streams specifies individual column names either in the select clause or in the insert into clause of the statement. This approach has been shown in earlier examples.
Another approach to merging event streams specifies the wildcard (*) in the select clause (or the stream wildcard) to select the underlying event. The events in the event stream must then have the same event type as generated by the from clause.
Assume a statement creates an event stream named MergedStream by selecting OrderEvent events:
insert into MergedStream select * from OrderEvent
A statement can use the stream wildcard selector to select only OrderEvent events in a join:
insert into MergedStream select ord.* from ItemScanEvent, OrderEvent as ord
And a statement may also use an application-supplied user-defined function to convert events to OrderEvent instances:
insert into MergedStream select MyLib.convert(item) from ItemScanEvent as item
Esper specifically recognizes a conversion function: A conversion function must be the only selected column, and it must return either a Java object or java.util.Map.
Two or more event streams can be part of the from clause and thus both streams determine the resulting events. The where clause lists the join conditions that Esper uses to relate events in the two or more streams. Reference and historical data such as stored in your relational database can also be included in joins. Please see Section 4.13, “Joining Relational Data via SQL” for details.
Each time an event arrives in one of the event streams, the two event streams are joined and output events are produced according to the where clause.
This example joins 2 event streams. The first event stream consists of fraud warning events for which we keep the last 30 minutes (1800 seconds). The second stream is withdrawal events for which we consider the last 30 seconds. The streams are joined on account number.
select fraud.accountNumber as accntNum, fraud.warning as warn, withdraw.amount as amount, max(fraud.timestamp, withdraw.timestamp) as timestamp, 'withdrawlFraud' as desc from net.esper.example.atm.FraudWarningEvent.win:time(30 min) as fraud, net.esper.example.atm.WithdrawalEvent.win:time(30 sec) as withdraw where fraud.accountNumber = withdraw.accountNumber
Joins can also include one or more pattern statements as the next example shows:
select * from FraudWarningEvent.win:time(30 min) as fraud, pattern [every w=WithdrawalEvent -> PINChangeEvent(acct=w.acct)] as withdraw where fraud.accountNumber = withdraw.w.accountNumber
The statement above joins the last 30 minutes of fraud warnings with a pattern. The pattern consists of every withdrawal event that is followed by a PIN change event for the same account number. It joins the two event streams on account number.
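The way a join is evaluated on each arrival can be sketched in a few lines. The Python class below is an illustration only: window expiry is omitted for brevity, the join condition is a single equality, and StreamJoin is a hypothetical name.

```python
class StreamJoin:
    """Joins two streams on a key; each arriving event is matched
    against the events currently held for the other stream."""

    def __init__(self, key):
        self.key = key
        self.windows = {"left": [], "right": []}

    def on_event(self, side, event):
        """Store the event and return the (left, right) pairs it produces."""
        other = "right" if side == "left" else "left"
        self.windows[side].append(event)
        matches = [o for o in self.windows[other]
                   if o[self.key] == event[self.key]]
        if side == "left":
            return [(event, o) for o in matches]
        return [(o, event) for o in matches]
```

An arriving event with no partner on the other side produces no output in this inner-join form; it only becomes part of the state that later arrivals are matched against.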
Esper supports left outer joins, right outer joins and full outer joins between an unlimited number of event streams. Outer joins can also join reference and historical data as explained in Section 4.13, “Joining Relational Data via SQL”.
The keywords left, right and full control the type of the join between two streams. The on clause specifies one or more properties that join each stream. The synopsis is as follows:
...from stream_def [as name] left|right|full outer join stream_def on property = property [and property = property ...] [left|right|full outer join stream_def on ...]...
If the outer join is a left outer join, there will be an output event for each event of the stream on the left-hand side of the clause. For example, in the left outer join shown below we will get output for each event in the stream RfidEvent, even if the event does not match any event in the event stream OrderList.
select * from RfidEvent.win:time(30 sec) as rfid left outer join OrderList.win:length(10000) as orderList on rfid.itemId = orderList.itemId
Similarly, if the join is a right outer join, then there will be an output event for each event of the stream on the right-hand side of the clause. For example, in the right outer join shown below we will get output for each event in the stream OrderList, even if the event does not match any event in the event stream RfidEvent.
select * from RfidEvent.win:time(30 sec) as rfid right outer join OrderList.win:length(10000) as orderList on rfid.itemId = orderList.itemId
For all types of outer joins, if the join condition is not met, the select list is computed with the event properties of the arrived event while all other event properties are considered to be null.
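The null-filling rule can be made concrete with a left outer join over one arriving event. The following Python function is a sketch; its name and parameters are assumptions, and it models only the case where the arriving event belongs to the left-hand stream.

```python
def left_outer_join(arriving, right_window, key, right_props):
    """Join one arriving left-stream event against the right-stream window.

    Returns one output row per match, or a single row with the right-hand
    properties set to None when nothing matches.
    """
    matches = [r for r in right_window if r[key] == arriving[key]]
    if not matches:
        # Join condition not met: right-hand properties are null.
        return [{**arriving, **{p: None for p in right_props}}]
    return [{**arriving, **{p: r[p] for p in right_props}} for r in matches]

orders = [{"itemId": 7, "orderNo": "A-1"}]
matched = left_outer_join({"itemId": 7, "reader": "dock"}, orders, "itemId", ["orderNo"])
unmatched = left_outer_join({"itemId": 8, "reader": "dock"}, orders, "itemId", ["orderNo"])
```

The unmatched event still yields exactly one output row, which is what distinguishes the outer join from the inner join described earlier.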
The last type of outer join is a full outer join. In a full outer join, each time an event arrives in one of the event streams, one or more output events are produced. In the example below, when either an RfidEvent or an OrderList event arrives, one or more output events are produced. The next example shows a full outer join that joins on multiple properties:
select * from RfidEvent.win:time(30 sec) as rfid full outer join OrderList.win:length(10000) as orderList on rfid.itemId = orderList.itemId and rfid.assetId = orderList.assetId
Finally, this example outer joins multiple streams. Here the RfidEvent stream is outer joined to both ProductName and LocationDescription via left outer join:
select * from RfidEvent.win:time(30 sec) as rfid left outer join ProductName.win:keepall() as refprod on rfid.productId = refprod.prodId left outer join LocationDescription.win:keepall() as refdesc on rfid.location = refdesc.locId
A subquery is a select within another statement. Esper supports subqueries in the select clause and in the where clause of EQL statements. Subqueries provide an alternative way to perform operations that would otherwise require complex joins. Subqueries can also make statements more readable than complex joins.
Esper supports both simple subqueries as well as correlated subqueries. In a simple subquery, the inner query is not correlated to the outer query. Here is an example simple subquery within a select clause:
select assetId, (select zone from ZoneClosed.std:lastevent) as lastClosed from RFIDEvent
If the inner query is dependent on the outer query, we will have a correlated subquery. An example of a correlated subquery is shown below. Notice the where clause in the inner query, where the condition involves a stream from the outer query:
select * from RfidEvent as RFID where 'Dock 1' = (select name from Zones.std:unique('zoneId') where zoneId = RFID.zoneId)
The example above shows a subquery in the where clause. The statement selects RFID events in which the zone name matches a string constant based on zone id. The statement uses the view std:unique to guarantee that only the last event per zone id is held for processing by the subquery.
The next example is a correlated subquery within a select clause. In this statement the select clause retrieves the zone name by means of a subquery against the Zones set of events correlated by zone id:
select zoneId, (select name from Zones.std:unique('zoneId') where zoneId = RFID.zoneId) as name from RFIDEvent as RFID
Note that when a simple or correlated subquery returns multiple rows, the engine returns a null value as the subquery result. To limit the number of events returned by a subquery consider using one of the views std:lastevent, std:unique and std:groupby.
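The single-row rule for subquery results can be sketched as follows. This Python helper is illustrative only; scalar_subquery is a hypothetical name, and the rows stand in for the events held by the subquery's view.

```python
def scalar_subquery(rows, prop, predicate=lambda row: True):
    """Return the selected property if exactly one row qualifies, else None."""
    matches = [row[prop] for row in rows if predicate(row)]
    # Zero rows or multiple rows both yield a null result.
    return matches[0] if len(matches) == 1 else None

zones = [{"zoneId": 1, "name": "Dock 1"}, {"zoneId": 2, "name": "Dock 2"}]
one = scalar_subquery(zones, "name", lambda z: z["zoneId"] == 1)
many = scalar_subquery(zones, "name")
```

Here the correlated lookup by zone id finds exactly one row, while the uncorrelated form sees two rows and therefore returns null, which is why a view such as std:unique is useful for keeping one event per key.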
The select clause of a subquery also allows wildcard selects, which return as an event property the underlying event object of the event type as defined in the from clause. An example:
select (select * from MarketData.std:lastevent()) as md from pattern [every timer:interval(10 sec)]
The output events of the statement above contain the underlying MarketData event in a property named "md". The statement populates the last MarketData event into a property named "md" every 10 seconds following the pattern definition, or populates a null value if no MarketData event has been encountered so far.
The following restrictions apply to subqueries:
The subquery stream definition must define a data window or other view to limit subquery results, reducing the number of events held for subquery execution
Aggregation functions cannot be used in subqueries. Instead, the insert into clause can be used to provide aggregation results for use in subqueries
Subqueries can only consist of a select clause, a from clause and a where clause. The group by and having clauses, as well as joins, outer-joins and output rate limiting are not permitted within subqueries.
Performance of your statement containing one or more subqueries principally depends on two parameters. First, if your subquery correlates one or more columns in the subquery stream with the enclosing statement's streams via equals '=', the engine automatically builds the appropriate indexes for fast row retrieval based on the key values correlated (joined). The second parameter is the number of rows found in the subquery stream and the complexity of the filter criteria (where clause), as each row in the subquery stream must evaluate against the where clause filter.
The exists condition is considered "to be met" if the subquery returns at least one row. The not exists condition is considered true if the subquery returns no rows.
Let's take a look at a simple example. The following is an EQL statement that uses the exists condition:
select assetId from RFIDEvent as RFID where exists (select * from Asset.std:unique('assetId') where assetId = RFID.assetId)
This select statement will return all RFID events for which there is at least one event in the Asset stream, held unique by asset id, with the same asset id.
The in subquery condition is true if the value of an expression matches one or more of the values returned by the subquery. Conversely, the not in condition is true if the value of an expression matches none of the values returned by the subquery.
The next statement demonstrates the use of the in subquery condition:
select assetId from RFIDEvent as RFID where zone in (select zone from ZoneUpdate.win:time(10 min) where status = 'closed' )
The above statement demonstrates the in subquery to select RFID events for which the zone status is in a closed state.
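The exists and in conditions reduce to simple membership tests over the rows a subquery returns. The Python sketch below ignores SQL-style null handling and uses hypothetical helper names; it is meant only to show the two conditions side by side.

```python
def exists(rows, predicate):
    """True if at least one subquery row satisfies the predicate."""
    return any(predicate(row) for row in rows)

def in_subquery(value, rows, prop, predicate=lambda row: True):
    """True if `value` equals the selected property of any qualifying row."""
    return value in {row[prop] for row in rows if predicate(row)}

zone_updates = [
    {"zone": "Z1", "status": "closed"},
    {"zone": "Z2", "status": "open"},
]
is_closed = lambda row: row["status"] == "closed"
```

The not exists and not in conditions are the negations of these two functions.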
This chapter outlines how reference data and historical data that are stored in a relational database can be queried via SQL within EQL statements.
Esper can join and outer join all types of event streams to stored data. In order for such data sources to become accessible to Esper, some configuration is required. Section 10.4.7, “Relational Database Access” explains the required configuration for database access in greater detail, and includes information on configuring a query result cache.
Esper does not parse or otherwise inspect your SQL query. Therefore your SQL can make use of any database-specific SQL language extensions or features that your database provides.
If you have enabled query result caching in your Esper database configuration, Esper retains SQL query results in cache following the configured cache eviction policy.
Also if you have enabled query result caching in your Esper database configuration and provide EQL where clause and/or on clause (outer join) expressions, then Esper builds indexes on the SQL query results to enable fast lookup. This is especially useful if your queries return a large number of rows. For building the proper indexes, Esper inspects the expression found in your EQL query where clause, if present. For outer joins, Esper also inspects your EQL query on clause. Esper analyzes the EQL on clause and where clause expressions, if present, looking for property comparison with or without logical AND-relationships between properties. When a SQL query returns rows for caching, Esper builds the appropriate index and lookup strategies for fast row matching against indexes.
The following restrictions currently apply:
Only one event stream and one SQL query can be joined; joins of two or more event streams with an SQL query are not yet supported.
Sub-views on an SQL query are not allowed; that is, one cannot create a time or length window on an SQL query. However, one can use the insert into syntax to make join results available to a further statement.
Your database software must support JDBC prepared statements that provide statement meta data at compilation time. Most major databases provide this function. A workaround is available for databases that do not provide this function.
JDBC drivers must support the getMetadata feature. A workaround is available as below for JDBC drivers that don't support getting metadata.
The next sections assume basic knowledge of SQL (Structured Query Language).
To join an event stream against stored data, specify the sql keyword followed by the name of the database and a parameterized SQL query. The syntax to use in the from clause of an EQL statement is:
sql:database_name [" parameterized_sql_query "]
The engine uses the database_name identifier to obtain configuration information in order to establish a database connection, as well as settings that control connection creation and removal. Please see Section 10.4.7, “Relational Database Access” to configure an engine for database access.
Following the database name is the SQL query to execute. The SQL query can contain one or more substitution parameters. The SQL query string is placed in square brackets [ and ]. The SQL query can be placed in either single quotes (') or double quotes ("). The SQL query syntax is passed to your database software unchanged, allowing you to write any SQL query syntax that your database understands, including stored procedure calls.
Substitution parameters in the SQL query string take the form ${event_property_name}. The engine resolves event_property_name at statement execution time to the actual event property value supplied by the events in the joined event stream.
The engine determines the type of the SQL query output columns by means of the result set metadata that your database software returns for the statement. The actual query results are obtained via the getObject on java.sql.ResultSet.
The sample EQL statement below joins an event stream consisting of CustomerCallEvent events with the results of an SQL query against the database named MyCustomerDB and table Customer:
select custId, cust_name from CustomerCallEvent, sql:MyCustomerDB [' select cust_name from Customer where cust_id = ${custId} ']
The example above assumes that CustomerCallEvent supplies an event property named custId. The SQL query selects the customer name from the Customer table. The where clause in the SQL matches the Customer table column cust_id with the value of custId in each CustomerCallEvent event. The engine executes the SQL query for each new CustomerCallEvent encountered.
If the SQL query returns no rows for a given customer id, the engine generates no output event. Otherwise the engine generates one output event for each row returned by the SQL query. An outer join as described in the next section can be used to control whether the engine should generate output events even when the SQL query returns no rows.
The next example adds a time window of 30 seconds to the event stream CustomerCallEvent. It also renames the selected properties to customerName and customerId to demonstrate how the naming of columns in an SQL query can be used in the select clause in the EQL query. And the example uses explicit stream names via the as keyword.
select customerId, customerName from CustomerCallEvent.win:time(30 sec) as cce, sql:MyCustomerDB ["select cust_id as customerId, cust_name as customerName from Customer where cust_id = ${cce.custId}"] as cq
Any window, such as the time window, generates insert stream (istream) events as events enter the window, and remove stream (rstream) events as events leave the window. The engine executes the given SQL query for each CustomerCallEvent in both the insert stream and the remove stream. As a performance optimization, the istream or rstream keywords in the select clause can be used to instruct the engine to only join insert stream or remove stream events, reducing the number of SQL query executions.
Consider using the EQL where clause to join the SQL query result to your event stream. Similar to EQL joins and outer-joins that join event streams or patterns, the EQL where clause provides join criteria between the SQL query results and the event stream (as a side note, an SQL where clause is a filter of rows executed by your database on your database server before returning SQL query results).
Esper analyzes the expression in the EQL where clause, if present, and builds the appropriate indexes from that information at runtime, to ensure fast matching of event stream events to SQL query results, even if your SQL query returns a large number of rows. Your application must configure a cache for your database using Esper configuration, as such indexes are held together with the regular data in the cache. If your application does not enable caching of SQL query results, the engine does not build indexes.
The sample EQL statement below joins an event stream consisting of OrderEvent events with the results of an SQL query against the database named MyRefDB and table SymbolReference:
select orders.symbol, symbolDesc from OrderEvent as orders, sql:MyRefDB ['select symbol, symbolDesc from SymbolReference'] as reference where reference.symbol = orders.symbol
Notice how the EQL where clause joins the OrderEvent stream to the SymbolReference table. In this example, the SQL query itself does not have a SQL where clause and therefore returns all rows from table SymbolReference.
If your application enables caching, the SQL query fires only at the arrival of the first OrderEvent event. When the second OrderEvent arrives, the join execution uses the cached query result. If the caching policy that you specified in the Esper database configuration evicts the SQL query result from cache, then the engine fires the SQL query again to obtain a new result and places the result in cache.
If SQL result caching is enabled and your EQL where clause, as shown in the above example, provides the properties to join, then the engine indexes the SQL query results in cache and retains the index together with the query result in cache. Thus your application can benefit from high performance index-based lookups as long as the SQL query results are found in cache.
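The benefit of indexing cached rows can be pictured with a small sketch. The code below is not Esper's implementation; the class name CachedResultIndex and the representation of rows as Map<String, Object> are assumptions made purely for illustration. It builds a hash index over cached query rows once, keyed by the join column, so that each arriving event is matched with a hash probe rather than a scan of all rows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models cached SQL rows indexed by a join column.
final class CachedResultIndex {
    private final Map<Object, List<Map<String, Object>>> index = new HashMap<>();

    // Build the index once, at the time the query result is placed in the cache.
    CachedResultIndex(List<Map<String, Object>> cachedRows, String joinColumn) {
        for (Map<String, Object> row : cachedRows) {
            Object key = row.get(joinColumn);
            List<Map<String, Object>> bucket = index.get(key);
            if (bucket == null) {
                bucket = new ArrayList<>();
                index.put(key, bucket);
            }
            bucket.add(row);
        }
    }

    // Return all cached rows whose join column equals the event's property value.
    List<Map<String, Object>> lookup(Object eventPropertyValue) {
        List<Map<String, Object>> rows = index.get(eventPropertyValue);
        return rows == null ? Collections.<Map<String, Object>>emptyList() : rows;
    }
}
```

In terms of the example above, the join column would be symbol and the lookup value would be the symbol property of each arriving OrderEvent.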
You can use outer joins to join data obtained from an SQL query and control when an event is produced. Use a left outer join, such as in the next statement, if you need an output event for each event regardless of whether or not the SQL query returns rows. If the SQL query returns no rows, the join result populates null values into the selected properties.
select custId, custName from CustomerCallEvent as cce left outer join sql:MyCustomerDB ["select cust_id, cust_name as custName from Customer where cust_id = ${cce.custId}"] as cq on cce.custId = cq.cust_id
The statement above always generates at least one output event for each CustomerCallEvent, containing all columns selected by the SQL query, even if the SQL query does not return any rows. Note the on expression that is required for outer joins. The on acts as an additional filter to rows returned by the SQL query.
Pattern statements and SQL queries can also be applied together in useful ways. One such use is to poll or request data from a database at regular intervals. The next statement is an example that shows a pattern that fires every 5 seconds to query the NewOrders table for new orders:
insert into NewOrders select orderId, orderAmount from pattern [every timer:interval(5 sec)], sql:MyCustomerDB ['select orderId, orderAmount from NewOrders']
The engine translates SQL queries into JDBC java.sql.PreparedStatement statements by replacing ${name} parameters with '?' placeholders. It obtains the names and types of result columns from the compiled PreparedStatement metadata when the EQL statement is created.
The engine supplies parameters to the compiled statement via the setObject method on PreparedStatement. The engine uses the getObject method on the java.sql.ResultSet obtained by executing the compiled statement to read column values.
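The placeholder translation can be sketched in a few lines. This is a minimal illustration and not the engine's actual code; the class name SqlParameterSketch and its fields are hypothetical. It rewrites every ${...} occurrence as '?' and records the enclosed expressions in placeholder order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only: rewrites ${name} parameters as JDBC '?' placeholders.
final class SqlParameterSketch {
    private static final Pattern PARAM = Pattern.compile("\\$\\{([^}]+)\\}");

    final String preparedSql;           // text suitable for Connection.prepareStatement
    final List<String> parameterNames;  // expressions to evaluate, in placeholder order

    SqlParameterSketch(String sqlWithParameters) {
        List<String> names = new ArrayList<>();
        Matcher m = PARAM.matcher(sqlWithParameters);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            names.add(m.group(1).trim());
            m.appendReplacement(sb, "?");
        }
        m.appendTail(sb);
        this.preparedSql = sb.toString();
        this.parameterNames = names;
    }
}
```

With the resulting text compiled through java.sql.Connection.prepareStatement, the value of the i-th recorded expression would be bound by calling setObject(i, value), where JDBC parameter indexes start at 1. The sketch treats every ${...} occurrence as a parameter and makes no attempt to handle special tags.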
Certain JDBC database drivers are known not to return metadata for precompiled prepared SQL statements. This can be a problem because Esper requires SQL result set metadata to validate an EQL statement and to provide column types for output events. Such drivers generally do provide metadata for executed SQL statements, but not for precompiled SQL statements, and therefore require a workaround.
Please consult Chapter 10, Configuration, for the configuration options available in relation to metadata retrieval.
To obtain metadata for an SQL statement, Esper can alternatively fire an SQL statement which returns the same column names and types as the actual SQL statement but without returning any rows. This kind of SQL statement is referred to as a sample statement in the workaround description below. The engine can then use the sample SQL statement to retrieve metadata for the column names and types returned by the actual SQL statement.
Applications can provide a sample SQL statement to retrieve metadata via the metadatasql keyword:
sql:database_name ["parameterized_sql_query" metadatasql "sql_meta_query"]
The sql_meta_query must be an SQL statement that returns the same number of columns, the same type of columns and the same column names as the parameterized_sql_query, and does not return any rows.
Alternatively, applications can choose not to provide an explicit sample SQL statement. If the EQL statement does not use the metadatasql syntax, the engine applies lexical analysis to the SQL statement. From the lexical analysis Esper generates a sample SQL statement adding a restrictive clause "where 1=0" to the SQL statement.
Alternatively, applications can add the following tag to the SQL statement: ${$ESPER-SAMPLE-WHERE}. If the tag exists in the SQL statement, the engine does not perform lexical analysis and simply replaces the tag with the SQL where clause "where 1=0". Therefore this workaround is applicable to SQL statements that cannot be correctly lexically analyzed. The SQL text after the placeholder is not part of the sample query. For example:
select mycol from sql:myDB [ 'select mycol from mytesttable ${$ESPER-SAMPLE-WHERE} where ....'], ...
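The tag replacement rule can be illustrated as follows. This is a sketch under the assumptions stated in the text (the tag becomes "where 1=0" and any SQL text after the tag is dropped); the class and method names are hypothetical and the code is not taken from the engine.

```java
// Hypothetical illustration only: derives a metadata sample statement from a tagged SQL query.
final class SampleSqlSketch {
    static final String TAG = "${$ESPER-SAMPLE-WHERE}";

    // Returns the sample SQL, or null if the tag does not occur in the statement.
    static String sampleFor(String sql) {
        int pos = sql.indexOf(TAG);
        if (pos < 0) {
            return null;
        }
        // Text after the tag is dropped; the tag itself becomes a clause that matches no rows.
        return sql.substring(0, pos) + "where 1=0";
    }
}
```

For a statement such as 'select mycol from mytesttable ${$ESPER-SAMPLE-WHERE} where mycol > 5' (the trailing condition is made up for this example), the sketch yields 'select mycol from mytesttable where 1=0', which returns the column metadata without returning rows.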
Your application may need to join data that originates from a web service, a distributed cache, an object-oriented database or simply data held in memory by your application. Esper accommodates this need by allowing a method invocation (or procedure call or function) in the from clause of a statement.
Esper can join and outer join all types of event streams to the data returned by your method invocation. In addition, Esper can be configured to cache the data returned by your method invocations.
The following restrictions currently apply:
Only one event stream and one method invocation can be joined; that is, in a join with a method invocation only one other event stream is allowed.
Sub-views on a method invocation are not allowed; that is, one cannot create a time or length window on a method invocation. However, one can use the insert into syntax to make join results available to a further statement.
The syntax for a method invocation in the from clause of an EQL statement is:
method:class_name.method_name[(parameter_expressions)]
The method keyword denotes a method invocation. It is followed by a class name and a method name separated by a dot (.) character. If you have parameters to your method invocation, these are placed in round brackets after the method name. Any expression is allowed as a parameter, and individual parameter expressions are separated by a comma. Expressions may also use event properties of the joined stream.
In the sample join statement shown next, the method 'lookupAsset' provided by class 'MyLookupLib' returns one or more rows based on the asset id (a property of the AssetMoveEvent) that is passed to the method:
select * from AssetMoveEvent, method:MyLookupLib.lookupAsset(assetId)
The following statement demonstrates the use of the where clause to join events to the rows returned by a method invocation, which in this example does not take parameters:
select assetId, assetDesc from AssetMoveEvent as asset, method:MyLookupLib.getAssetDescriptions() as desc where asset.assetId = desc.assetId
Your method invocation may return zero, one or many rows for each method invocation. If you have caching enabled through configuration, then Esper can avoid the method invocation and instead use cached results. Similar to SQL joins, Esper also indexes cached result rows such that join operations based on the where clause can be very efficient, especially if your method invocation returns a large number of rows.
If the time taken by method invocations is critical to your application, you may configure local caches as Section 10.4.5, “Cache Settings for Method Invocations” describes.
Your application must provide a Java class that exposes a public static method. The method must accept the same number and type of parameters as listed in the parameter expression list.
If your method invocation returns either no row or only one row, then the return type of the method can be a Java class. If your method invocation can return more than one row, then the return type of the method must be an array of that Java class. The Java class in the return type must adhere to JavaBean conventions: it must expose properties through getter methods.
The method must return one of the following:
A null value or an empty array to indicate an empty result (no rows).
A Java object to indicate a one-row result, or an array that consists of a single Java object.
An array of Java objects to return multiple result rows.
As an example, consider the method 'getAssetDescriptions' provided by class 'MyLookupLib' as discussed earlier:
select assetId, assetDesc from AssetMoveEvent as asset, method:com.mypackage.MyLookupLib.getAssetDescriptions() as desc where asset.assetId = desc.assetId
The 'getAssetDescriptions' method may return multiple rows and is therefore declared to return an array of the class 'AssetDesc'. The class AssetDesc is a POJO class (not shown here):
public class MyLookupLib {
  ...
  public static AssetDesc[] getAssetDescriptions() {
    ...
    return new AssetDesc[] {...};
  }
}
The example above specifies the full Java class name of the 'MyLookupLib' class in the EQL statement. The package name does not need to be part of the EQL if your application imports the package using the auto-import configuration through the API or XML, as outlined in Section 10.4.4, “Class and package imports”.
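For completeness, a fuller sketch of such a lookup class and its result class might look as follows. Everything beyond the names MyLookupLib, AssetDesc and getAssetDescriptions is an assumption: the field names, the sample data and the findAsset method are invented for illustration. In an application both classes would be declared public, each in its own source file; they are package-private here only so that the sketch fits in one file.

```java
// Hypothetical result class: JavaBean-style getters expose the properties
// 'assetId' and 'assetDesc' for use in the select and where clauses.
class AssetDesc {
    private final String assetId;
    private final String assetDesc;

    AssetDesc(String assetId, String assetDesc) {
        this.assetId = assetId;
        this.assetDesc = assetDesc;
    }

    public String getAssetId() { return assetId; }
    public String getAssetDesc() { return assetDesc; }
}

// Hypothetical lookup class with made-up data.
class MyLookupLib {
    // May return many rows, so the return type is an array; an empty array means no rows.
    public static AssetDesc[] getAssetDescriptions() {
        return new AssetDesc[] {
            new AssetDesc("A1", "Forklift"),
            new AssetDesc("A2", "Pallet jack")
        };
    }

    // Returns at most one row, so a plain object suffices; null signals an empty result.
    public static AssetDesc findAsset(String assetId) {
        for (AssetDesc desc : getAssetDescriptions()) {
            if (desc.getAssetId().equals(assetId)) {
                return desc;
            }
        }
        return null;
    }
}
```

The two methods show both return conventions: an array for a multi-row result and a single object or null for a result of at most one row.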
A named window is a global data window that can take part in many statement queries, and that can be inserted-into and deleted-from by multiple statements.
The create window clause declares a new named window. The named window starts up empty. Events must be inserted into the named window using the insert into clause. Events can also be deleted from a named window via the on delete clause.
Events enter the named window by means of the insert into clause of a select statement. Events leave a named window either because the expiry policy of the declared data window removes events from the named window, or through statements that use the on delete clause to explicitly delete from a named window.
To query a named window, simply use the window name in the from clause of your statement, including statements that contain subqueries, joins and outer-joins.
The create window statement creates a named window by specifying a window name and one or more data window views, as well as the type of event to hold in the named window.
The syntax for creating a named window is as follows:
create window window_name.view_specifications as [select list_of_properties from] event_type
The window_name you assign to the named window can be any identifier. The name should not already be in use as an event type or stream name.
The view_specifications are one or more data window views that define the expiry policy for removing events from the data window. Named windows must explicitly declare a data window view. This is required to ensure that the policy for retaining events in the data window is well defined. To keep all events, use the keep-all view: it indicates that the named window should keep all events and only remove events from the named window that are deleted via the on delete clause. The view specification can only list data window views; derived-value views are not allowed since they do not represent an expiry policy.
The select clause and list_of_properties are optional. If present, they specify the column names and, implicitly by definition of the event type, the column types of events held by the named window. Expressions other than column names are not allowed in the select list of properties. Wildcards (*) and wildcards with additional properties can also be used.
Finally, the event_type is required and provides the name of the event type of events held in the data window, unless column names and types have been explicitly selected via select.
The next statement creates a named window 'AllOrdersNamedWindow' for which the expiry policy is simply to keep all events. Assume that the event type 'OrderMapEventType' has been configured. The named window is to hold events of type 'OrderMapEventType':
create window AllOrdersNamedWindow.win:keepall() as OrderMapEventType
The sample statement below demonstrates the select syntax. It defines a named window in which each row has the three properties 'symbol', 'volume' and 'price'. This named window actively removes events from the window that are older than 30 seconds.
create window OrdersTimeWindow.win:time(30 sec) as select symbol, volume, price from OrderEvent
In an alternate form, the as keyword can be used to rename columns:
create window OrdersTimeWindow.win:time(30 sec) as select symbol as sym, volume as vol, price from OrderEvent
A new named window starts up empty. It must be explicitly inserted into by one or more statements, as discussed below.
If your application stops or destroys the statement that creates the named window, any consuming statements no longer receive insert or remove stream events. Events also can no longer be deleted from the named window once that statement has been stopped or destroyed.
The create window statement posts to listeners any events that are inserted into the named window as new data. The statement posts all deleted events or events that expire out of the data window to listeners as the remove stream (old data). The named window contents can also be iterated on via the pull API to obtain the current contents of a named window.
An on delete clause removes events from a named window. The clause can be used to remove all events, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on delete clause is as follows:
on event_type[(filter_criteria)] [as alias_name] delete from window_name [as alias_name] [where criteria_expression]
The event_type is the name or alias of the type of events that trigger removal from the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias for use in the where clause. Patterns can also be specified in the on clause as described in the next section.
The window_name is the name of the named window to delete events from. The as keyword is also available to assign an alias to the named window.
The optional where clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be removed from the named window. The criteria_expression may also simply filter for events in the named window to be removed from the named window.
The iterator of the EPStatement object representing the on delete clause can also be helpful: It returns the last batch of deleted events in response to the last triggering event, in any order, or null if the last triggering event did not remove any rows.
Let's look at a couple of examples. In the simplest form, this statement deletes all events from the named window 'AllOrdersNamedWindow' when any 'FlushOrderEvent' arrives:
on FlushOrderEvent delete from AllOrdersNamedWindow
This example adds a where clause to the example above. Upon arrival of a triggering 'ZeroVolumeEvent', the statement removes from the named window any orders that have a volume of zero or less:
on ZeroVolumeEvent delete from AllOrdersNamedWindow where volume <= 0
The next example shows a more complete use of the syntax, and correlates the triggering event with events held by the named window:
on NewOrderEvent(volume>0) as myNewOrders delete from AllOrdersNamedWindow as myNamedWindow where myNamedWindow.symbol = myNewOrders.symbol
In the above sample statement, only if a 'NewOrderEvent' event with a volume greater than zero arrives does the statement trigger. Upon triggering, all events in the named window that have the same value for the symbol property as the triggering 'NewOrderEvent' event are then removed from the named window. The statement also showcases the as keyword to assign alias names for use in the where expression.
For correlated queries (as above) that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance removal of events especially from named windows that hold large numbers of events.
Your application can subscribe a listener to your on delete statements to determine removed events. The statement posts any events that are deleted from a named window to all listeners attached to the statement as new data. Upon iteration, the statement provides the last deleted event, if any.
By means of patterns the on delete clause and on select clause (described below) can look for more complex conditions to occur, possibly involving multiple events or the passing of time. The syntax for on delete with a pattern expression is shown next:
on pattern [pattern_expression] [as alias_name] delete from window_name [as alias_name] [where criteria_expression]
The pattern_expression is any pattern that matches zero or more arriving events. Tags can be used to name events in the pattern and can occur in the optional where clause to correlate to events to be removed from a named window.
In the next example the triggering pattern fires every 10 seconds. The effect is that every 10 seconds the statement removes from 'MyNamedWindow' all rows:
on pattern [every timer:interval(10 sec)] delete from MyNamedWindow
The following example shows the use of tags in a pattern:
on pattern [every ord=OrderEvent(volume>0) or every flush=FlushOrderEvent] delete from OrderWindow as win where ord.id = win.id or flush.id = win.id
The pattern above looks for OrderEvent events with a volume value greater than zero and tags such events as 'ord'. The pattern also looks for FlushOrderEvent events and tags such events as 'flush'. The where clause deletes from the 'OrderWindow' named window any events whose 'id' property value matches that of either arriving event.
The insert into clause inserts events into named windows. Your application must ensure that the column names and types match the declared column names and types of the named window to be inserted into.
In this example we first create a named window using some of the columns of an OrderEvent event type:
create window OrdersWindow.win:keepall() as select symbol, volume, price from OrderEvent
The insert into the named window selects individual columns to be inserted:
insert into OrdersWindow(symbol, volume, price) select name, count, price from FXOrderEvent
// .. alternative form...
insert into OrdersWindow select name as symbol, vol as volume, price from FXOrderEvent
With the above statement in place, the engine inserts every arriving FXOrderEvent into the named window 'OrdersWindow'.
The following EQL creates a named window for an event type backed by a Java class, and inserts into the window any 'OrderEvent' where the symbol value is IBM:
create window OrdersWindow.win:keepall() as com.mycompany.OrderEvent
insert into OrdersWindow select * from com.mycompany.OrderEvent(symbol='IBM')
The last example adds one column named 'derivedPrice' to the 'OrderEvent' type by specifying a wildcard, and uses a user-defined function to populate the column:
create window OrdersWindow.win:keepall() as select *, price as derivedPrice from OrderEvent
insert into OrdersWindow select *, MyFunc.func(price, percent) as derivedPrice from OrderEvent
Event representations based on Java base classes or interfaces, and subclasses or implementing classes, are compatible as these statements show:
// create a named window for the base class
create window OrdersWindow.win:keepall() as select * from ProductBaseEvent
// The ServiceProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from ServiceProductEvent
// The MerchandiseProductEvent class subclasses the ProductBaseEvent class
insert into OrdersWindow select * from MerchandiseProductEvent
To avoid duplicate events stored in a named window, use a subquery to test whether an event already exists in the named window:
insert into OrdersWindow select * from ServiceProductEvent as spe where not exists (select * from OrdersWindow as win where win.id = spe.id)
A statement that removes events from a named window via the on delete clause and a statement that inserts events into a named window via the insert into can be combined to replace events in the named window, by creating the two statements in the order as indicated by the sample:
// create in this order
on ServiceProductEvent as spe delete from OrdersWindow as win where win.id = spe.id
insert into OrdersWindow select * from ServiceProductEvent
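The effect of combining the two statements can be modeled in plain Java. The sketch below is an analogy only, assuming that the delete step runs before the insert step for each arriving event; the class ReplaceByIdSketch and the Product type are hypothetical and do not use the Esper API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration only: a list stands in for the named window.
final class ReplaceByIdSketch {
    static final class Product {
        final String id;
        final double price;

        Product(String id, double price) {
            this.id = id;
            this.price = price;
        }
    }

    private final List<Product> window = new ArrayList<>();

    void onServiceProductEvent(Product arriving) {
        // Step 1 (the on delete statement): remove rows whose id matches the arriving event.
        for (Iterator<Product> it = window.iterator(); it.hasNext();) {
            if (it.next().id.equals(arriving.id)) {
                it.remove();
            }
        }
        // Step 2 (the insert into statement): add the arriving event.
        window.add(arriving);
    }

    List<Product> contents() {
        return window;
    }
}
```

In this model, running the two steps in the opposite order would remove the event that was just inserted, which suggests why the order of creation matters.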
A named window can be referred to by any statement in the from clause of the statement. Filter criteria can also be specified. Additional views may be used onto named windows; however, such views cannot include data window views.
A statement selecting all events from a named window 'AllOrdersNamedWindow' is shown next. The named window must first be created via the create window clause before use.
select * from AllOrdersNamedWindow
The statement above simply receives the unfiltered insert and remove stream of the named window and reports that stream to its listeners. An iterator on such a statement returns the last new event in the named window, if any.
The next statement derives an average price per symbol from all events posted by a named window:
select symbol, avg(price) from AllOrdersNamedWindow group by symbol
Your application may create a consuming statement such as above on an empty named window, or your application may create the above statement on an already filled named window. The engine provides correct results in either case: At the time of statement creation the Esper engine internally initializes the consuming statement from the current named window, also taking your declared filters into consideration. Thus, your statement deriving data from a named window does not start empty if the named window already holds one or more events.
If you require a subset of the data in the named window, you can specify one or more filter expressions onto the named window as shown here:
select symbol, avg(price) from AllOrdersNamedWindow(sector='energy') group by symbol
By adding a filter to the named window, the aggregation and grouping as well as any views that may be declared onto the named window receive a filtered insert and remove stream. The above statement thus outputs, continuously, the average price per symbol for all orders in the named window that belong to a certain sector.
The following example further declares a view into the named window. Such a view can be a plug-in view or one of the built-in views, but cannot be a data window view (with the exception of the group-by view which is allowed).
select * from AllOrdersNamedWindow(volume>0, price>0).mycompany:mypluginview()
Data window views cannot be used onto named windows since named windows post insert and remove streams for the events entering and leaving the named window, thus the expiry policy and batch behavior are well defined by the data window declared for the named window. For example, the following is not allowed and fails at time of statement creation:
// not a valid statement
select * from AllOrdersNamedWindow.win:time(30 sec)
The on select clause performs a one-time, non-continuous query on a named window every time a triggering event arrives or a triggering pattern matches. The query can consider all events in the named window, or only events that match certain criteria, or events that correlate with an arriving event or a pattern of arriving events.
The syntax for the on select clause is as follows:
on event_type[(filter_criteria)] [as alias_name] [insert into insert_into_def] select select_list from window_name [as alias_name] [where criteria_expression] [group by grouping_expression_list] [having grouping_search_conditions] [order by order_by_expression_list]
The event_type is the name or alias of the type of events that trigger the query against the named window. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias. Patterns can also be specified in the on clause, see the samples in Section 4.15.2.1, “Using Patterns in the On Delete Clause”.
The insert into clause works as described in Section 4.9, “Merging Streams and Continuous Insertion: the Insert Into Clause”. The select clause is described in Section 4.3, “Choosing Event Properties And Events: the Select Clause”. For all clauses the semantics are equivalent to a join operation: The properties of the triggering event or events are available in the select clause and all other clauses.
The window_name in the from clause is the name of the named window to select events from. The as keyword is also available to assign an alias to the named window. The as keyword is helpful in conjunction with a wildcard in the select clause to select named window events via the syntax select alias.*.
The optional where clause contains a criteria_expression that correlates the arriving (triggering) event to the events to be considered from the named window. The criteria_expression may also simply filter for events in the named window to be considered by the query.
The group by clause, the having clause and the order by clause are all optional and work as described in earlier chapters.
The similarities and differences between an on select clause and a regular or outer join are as follows:
A join is evaluated when any of the streams participating in the join have new events (insert stream) or events leaving data windows (remove stream). A join is therefore bi-directional or multi-directional. However, the on select statement has one triggering event or pattern that causes the query to be evaluated and is thus uni-directional.
The query within the on select statement is not continuous: It executes only when a triggering event or pattern occurs. Aggregation and groups are computed anew considering the contents of the named window at the time the triggering event arrives.
The iterator of the EPStatement object representing the on select clause returns the last batch of selected events in response to the last triggering event, or null if the last triggering event did not select any rows.
For correlated queries that correlate triggering events with events held by a named window, Esper internally creates efficient indexes to enable high performance querying of events. It analyzes the where clause to build one or more indexes for fast lookup in the named window based on the properties of the triggering event.
The next statement demonstrates the concept. Upon arrival of a QueryEvent event the statement selects all events in the 'OrdersNamedWindow' named window:
on QueryEvent select win.* from OrdersNamedWindow as win
The engine executes the query on arrival of a triggering event, in this case a QueryEvent. It posts the query results to any listeners to the statement, in a single invocation, as the new data array. By prefixing the wildcard (*) selector with the stream name, the select clause returns only events of the named window and does not also return triggering events.
The where clause filters and correlates events in the named window with the triggering event, as shown next:
on QueryEvent(volume>0) as query select query.symbol, query.volume, win.symbol from OrdersNamedWindow as win where win.symbol = query.symbol
Upon arrival of a QueryEvent, if that event has a value for the volume property that is greater than zero, the engine executes the query. The query considers all events currently held by the 'OrdersNamedWindow' that match the symbol property value of the triggering QueryEvent event. The engine then posts query results to the statement's listeners.
Aggregation, grouping and ordering of results are possible as this example shows:
on QueryEvent as queryEvent select symbol, sum(volume) from OrdersNamedWindow as win group by symbol having sum(volume) > 0 order by symbol
The above statement outputs the total volume per symbol for those groups where the sum of the volume is greater than zero, ordered by symbol ascending. The engine computes and posts the output based on the current contents of the 'OrdersNamedWindow' named window considering all events in the named window, since the query does not have a where clause.
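The fire-once character of this query can be mimicked in plain Java. The sketch below is an analogy rather than engine code; the OnSelectSketch class and the Order type are hypothetical. Each call recomputes the totals from scratch over the current window contents, keeps only groups whose total volume is greater than zero, and returns them ordered by symbol.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: recomputes an aggregate on each triggering event.
final class OnSelectSketch {
    static final class Order {
        final String symbol;
        final long volume;

        Order(String symbol, long volume) {
            this.symbol = symbol;
            this.volume = volume;
        }
    }

    // Called once per triggering event with the window contents at that moment.
    static Map<String, Long> totalVolumePerSymbol(List<Order> windowContents) {
        Map<String, Long> totals = new TreeMap<>(); // TreeMap keeps symbols in ascending order
        for (Order order : windowContents) {
            Long sum = totals.get(order.symbol);
            totals.put(order.symbol, (sum == null ? 0L : sum) + order.volume);
        }
        // Keep only groups whose total volume is greater than zero.
        totals.values().removeIf(total -> total <= 0);
        return totals;
    }
}
```

No state is carried between calls, in contrast to a continuous query that would maintain its aggregates incrementally as events enter and leave the window.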
The on insert clause is an on select clause as described in the prior chapter with the addition of an insert into clause.
Similar to the on select clause, the engine executes the query when a triggering event arrives. It then provides the query results as an event stream to further statements. It populates the event stream that is named in the insert into clause.
The statement below provides the query results to any consumers of the MyOrderStream, upon arrival of a QueryEvent event:
on QueryEvent as query insert into MyOrderStream select win.* from OrdersNamedWindow as win
Here is a sample consuming statement of the MyOrderStream. The statement further filters the events provided by the on insert statement by user id and reports a total of volume per symbol:
select symbol, sum(volume) from MyOrderStream(userId='user1') group by symbol
A variable is a scalar value that is available for use in all statements including patterns. Variables can be used in an expression anywhere in a statement as well as in the output clause for output rate limiting.
Variables must first be declared or configured before use, by defining each variable's type and name. Variables can be created via the create variable syntax or declared by configuration. Variables can be assigned new values by using the on set syntax.
The engine guarantees consistency and atomicity of variable reads and writes on a statement-level (this is a soft guarantee, see below). Variables are optimized for fast read access and are also multithread-safe.
The create variable syntax creates a new variable by defining the variable type and name. As an alternative to this syntax, variables can also be declared in the runtime and engine configuration options.
The synopsis for creating a variable is as follows:
create variable variable_type variable_name [ = assignment_expression ]
The variable_type can be any of the following:
variable_type : string | char | character | bool | boolean | byte | short | int | integer | long | double | float
The variable_name is an identifier that names the variable. The variable name should not already be in use by another variable.
The assignment_expression is optional. Without an assignment expression the initial value for the variable is null. If present, it supplies the initial value for the variable.
The EPStatement object of the create variable statement provides access to variable values. The pull API methods iterator and safeIterator return the current variable value. Listeners to the create variable statement subscribe to changes in variable value: the engine posts new and old value of the variable to all listeners when the variable value is updated by an on set statement.
The example below creates a variable that provides a threshold value. The name of the variable is var_threshold and its type is long. The variable's initial value is null as no other value has been assigned:
create variable long var_threshold
This statement creates an integer-type variable named var_output_rate and initializes it to the value ten (10):
create variable integer var_output_rate = 10
In addition to creating a variable via the create variable syntax, the runtime and engine configuration API also allows adding variables. The next code snippet illustrates the use of the runtime configuration API to create a string-typed variable:
epService.getEPAdministrator().getConfiguration()
    .addVariable("myVar", String.class, "init value");
The on set statement assigns a new value to one or more variables when a triggering event arrives or a triggering pattern occurs.
The synopsis for setting variable values is:
on event_type[(filter_criteria)] [as alias_name] set variable_name = expression [, variable_name = expression [,...]]
The event_type is the name or alias of the type of events that trigger the variable assignments. It is optionally followed by filter_criteria which are filter expressions to apply to arriving events. The optional as keyword can be used to assign an alias. Patterns can also be specified in the on clause.
The comma-separated list of variable names and expressions sets the value of one or more variables. All new variable values are applied atomically: the changes to variable values by the on set statement become visible to other statements all at the same time. No changes are visible to other processing threads until the on set statement has completed processing, and at that time all changes become visible at once.
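One common way to obtain this kind of all-at-once visibility is to publish an immutable snapshot through a single reference. The sketch below illustrates that general technique and makes no claim about how Esper implements variables; the class VariableSnapshotSketch and its methods are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only: copy-on-write snapshots give atomic multi-variable updates.
final class VariableSnapshotSketch {
    private final AtomicReference<Map<String, Object>> current =
            new AtomicReference<>(Collections.<String, Object>emptyMap());

    // Readers take no lock; they read from whichever snapshot is current.
    Object read(String name) {
        return current.get().get(name);
    }

    // A reader that needs several variables takes one snapshot and reads all values from it.
    Map<String, Object> snapshot() {
        return current.get();
    }

    // Writers are serialized; all assignments become visible in one reference swap.
    synchronized void setAll(Map<String, Object> assignments) {
        Map<String, Object> next = new HashMap<>(current.get());
        next.putAll(assignments);
        current.set(Collections.unmodifiableMap(next));
    }
}
```

A reader holding an earlier snapshot continues to see the old values of every variable, and a reader that takes a snapshot after the swap sees all new values, so no reader observes a mixture of old and new assignments.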
The EPStatement object provides access to variable values. The pull API methods iterator and safeIterator return the current variable values for each of the variables set by the statement. Listeners to the statement subscribe to changes in variable values: the engine posts new variable values of all variables to any listeners.
In the following example, a variable named var_output_rate has been declared previously. When a NewOutputRateEvent event arrives, the variable is updated to a new value supplied by the event property 'rate':
on NewOutputRateEvent set var_output_rate = rate
The next example shows two variables that are updated when a ThresholdUpdateEvent arrives:
on ThresholdUpdateEvent as t set var_threshold_lower = t.lower, var_threshold_higher = t.higher
The sample statement shown next counts the number of pattern matches using a variable. The pattern looks for OrderEvent events that are followed by CancelEvent events for the same order id within 10 seconds of the OrderEvent:
on pattern[every a=OrderEvent -> (CancelEvent(orderId=a.orderId) where timer:within(10 sec))] set var_counter = var_counter + 1
A variable name can be used in any expression and can also occur in an output rate limiting clause. This section presents examples and discusses performance, consistency and atomicity attributes of variables.
The next statement assumes that a variable named 'var_threshold' was created to hold a total price threshold value. The statement outputs an event when the total price for a symbol is greater than the current threshold value:
select symbol, sum(price) from TickEvent group by symbol having sum(price) > var_threshold
In this example we use a variable to change the output rate dynamically. The variable 'var_output_rate' holds the current rate at which the statement posts a current count to listeners:
select count(*) from TickEvent output every var_output_rate seconds
Variables are optimized towards high read frequency and lower write frequency. Variable reads do not incur locking overhead (99% of the time) while variable writes do incur locking overhead.
The engine softly guarantees consistency and atomicity of variables when your statement executes in response to an event or timer invocation. Variables acquire a stable value (implemented by versioning) when your statement starts executing in response to an event or timer invocation, and variables do not change value during execution. When one or more variable values are updated via on set statements, the changes to all updated variables become visible to statements as one unit and only when the on set statement completes successfully.
The atomicity and consistency guarantee is a soft guarantee. If any of your application statements, in response to an event or timer invocation, executes for a time interval longer than 15 seconds (the default interval length), then the engine may use current variable values after the 15 seconds have passed, rather than the then-current variable values at the time the statement started executing in response to an event or timer invocation.
The length of time for which variable values are held stable during the execution of a given statement is 15 seconds by default, and can be configured via engine default settings.
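The soft guarantee can be modeled with a short Java sketch. This is a simplified model and not the engine's actual versioning code: the class VersionedVariable, its methods and the explicit time parameters are assumptions made for illustration. An executing statement remembers the variable version at the moment it starts, and reads return that version's value until the stable interval has elapsed, after which reads fall back to the current value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the soft guarantee; not the Esper engine's implementation.
final class VersionedVariable {
    // The list index is the version number; this sketch never discards old versions.
    private final List<Object> versions = new ArrayList<>();
    private final long stableMillis;

    VersionedVariable(Object initialValue, long stableMillis) {
        this.versions.add(initialValue);
        this.stableMillis = stableMillis;
    }

    // A write (for example by an on set statement) creates a new version.
    synchronized void set(Object newValue) {
        versions.add(newValue);
    }

    synchronized int currentVersion() {
        return versions.size() - 1;
    }

    // Returns the value as of startVersion while the execution is within the
    // stable interval; once the interval is exceeded, returns the current value.
    synchronized Object read(int startVersion, long startMillis, long nowMillis) {
        boolean withinInterval = (nowMillis - startMillis) <= stableMillis;
        int version = withinInterval ? startVersion : versions.size() - 1;
        return versions.get(version);
    }
}

final class VersionedVariableDemo {
    public static void main(String[] args) {
        // 15 seconds mirrors the documented default interval.
        VersionedVariable threshold = new VersionedVariable(100L, 15_000L);

        int startVersion = threshold.currentVersion(); // statement starts executing
        long start = 0L;

        threshold.set(200L); // a concurrent update arrives mid-execution

        // 5 seconds in: still the value from when execution started.
        System.out.println(threshold.read(startVersion, start, 5_000L));
        // 20 seconds in: the stable interval has passed, so the current value is used.
        System.out.println(threshold.read(startVersion, start, 20_000L));
    }
}
```

Time is passed in explicitly rather than read from the system clock so that the behavior on either side of the interval is easy to reproduce.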