Esper has 2 primary interfaces that this section outlines: The administrative interface and the runtime interface.
Use Esper's administrative interface to create and manage EQL and pattern statements, and set runtime configurations, as discussed in Section 4.1, “EQL Introduction” and Section 5.1, “Event Pattern Overview”.
Use Esper's runtime interface to send events into the engine, emit events and get statistics for an engine instance.
The JavaDoc documentation is also a great source for API information.
Each instance of an Esper engine is completely independent of other engine instances and has its own administrative and runtime interface.
An instance of the Esper engine is obtained via static methods on the EPServiceProviderManager class. The getDefaultProvider method and the getProvider(String URI) methods return an instance of the Esper engine. The latter can be used to obtain multiple instances of the engine for different URI values. The EPServiceProviderManager determines if the URI matches all prior URI values and returns the same engine instance for the same URI value. If the URI has not been seen before, it creates a new engine instance.
The code snipped below gets the default instance Esper engine. Subsequent calls to get the default engine instance return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
This code snippet gets an Esper engine for URI RFIDProcessor1. Subsequent calls to get an engine with the same URI return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getProvider("RFIDProcessor1");
An existing Esper engine instance can be reset via the initialize method on the EPServiceProvider instance. This stops and removes all statements in the Engine.
Create event pattern expression and EQL statements via the administrative interface EPAdministrator.
This code snippet gets an Esper engine then creates an event pattern and an EQL statement.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPStatement 10secRecurTrigger = admin.createPattern( "every timer:at(*, *, *, *, *, */10)"); EPStatement countStmt = admin.createEQL( "select count(*) from MarketDataBean.win:time(60 sec)");
Note that event pattern expressions can also occur within EQL statements. This is outlined in more detail in Section 4.4.2, “Pattern-based Event Streams”.
The create methods on EPAdministrator are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.
The createPattern and createEQL methods return EPStatement instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop and start methods shown in the code snippet below.
countStmt.stop(); countStmt.start();
We can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement . We need to provide an implementation of the UpdateListener or the StatementAwareUpdateListener interface to the statement:
UpdateListener myListener = new MyUpdateListener(); countStmt.addListener(myListener);
EQL statements and event patterns publish old data and new data to registered UpdateListener listeners. New data published by statements is the events representing the new values of derived data held by the statement. Old data published by statements constists of the events representing the prior values of derived data held by the statement.
It is important to understand that UpdateListener listeners receive multiple result rows in one invocation by the engine: the new data and old data parameters to your listener are array parameters. For example, if your application uses one of the batch data windows, or your application creates a pattern that matches multiple times when a single event arrives, then the engine indicates such multiple result rows in one invocation and your new data array carries two or more rows.
A second listener interface is the StatementAwareUpdateListener interface. A StatementAwareUpdateListener is especially useful for registering the same listener object with multiple statements, as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.
StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener(); statement.addListener(myListener);
To indicate results the engine invokes this method on StatementAwareUpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)
Subscribing to events posted by a statement is following a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, you need to know that statements serve up data that your application can obtain via the safeIterator and iterator methods on EPStatement. This is called the pull API and can come in handy if your application is not interested in all new updates, and only needs to perform a frequent or infrequent poll for the latest data.
The safeIterator method on EPStatement returns a concurrency-safe iterator returning current statement results, even while concurrent threads may send events into the engine for processing. The safe iterator guarantees correct results even as events are being processed by other threads. The cost is that the iterator obtains and holds a statement lock that must be released via the close method on the SafeIterator instance.
The iterator method on EPStatement returns a concurrency-unsafe iterator. This iterator is only useful for applications that are single-threaded, or applications that themselves perform coordination between the iterating thread and the threads that send events into the engine for processing. The advantage to this iterator is that it does not hold a lock.
The next code snippet shows a short example of use of safe iterators:
EPStatement statement = epAdmin.createEQL("select avg('price') as avgPrice from MyTick"); // .. send events into the engine // then use the pull API... SafeIterator<EventBean> safeIter = statement.safeIterator(); try { for (;safeIter.hasNext();) { // .. process event .. EventBean event = safeIter.next(); System.out.println("avg:" + event.get("avgPrice"); } } finally { safeIter.close(); // Note: safe iterators must be closed }
This is a short example of use of the regular iterator that is not safe for concurrent event processing:
double averagePrice = (Double) eqlStatement.iterator().next().get("average");
The safeIterator and iterator methods can be used to pull results out of all statements, including statements that join streams, contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.
For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.
Esper places the following restrictions on the pull API and usage of the safeIterator and iterator methods:
In multithreaded applications, use the safeIterator method. Note: make sure your application closes the iterator via the close method when done, otherwise the iterated statement stays locked and event processing for that statement does not resume.
In multithreaded applications, the iterator method does not hold any locks. The iterator returned by this method does not make any guarantees towards correctness of results and fail-behavior, if your application processes events into the engine instance by multiple threads. Use the safeIterator method for concurrency-safe iteration instead.
Since the safeIterator and iterator methods return events to the application immediately, the iterator does not honor an output rate limiting clause, if present. That is, the iterator returns results as if there is no output-rate clause for the statement. Use a separate statement and the insert into clause to control the output rate for iteration.
The EPAdministrator interface provides the facilities for managing statements:
Use the getStatement method to obtain an existing started or stopped statement by name
Use the getStatementNames methods to obtain a list of started and stopped statement names
Use the startAllStatements, stopAllStatements and destroyAllStatements methods to manage all statements in one operation
Certain configuration changes are available to perform on an engine instance while in operation. Such configuration operations are available via the getConfiguration method on EPAdministrator, which returns an ConfigurationOperations object.
The configuration operations available on a running engine instance are as follows. Please see Chapter 10, Configuration for more information.
Add an new event type for a JavaBean class, legacy Java class or custom Java class
Add an new DOM XML event type
Add an new Map-based event type
The EPRuntime interface is used to send events for processing into an Esper engine, and to emit Events from an engine instance to the outside world.
The below code snippet shows how to send a Java object event to the engine. Note that the sendEvent method is overloaded. As events can take on different representation classes in Java, the sendEvent takes parameters to reflect the different types of events that can be send into the engine. The Chapter 2, Event Representations section explains the types of events accepted.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPRuntime runtime = epService.getEPRuntime(); // Send an example event containing stock market data runtime.sendEvent(new MarketDataBean('IBM', 75.0));
Events, in theoretical terms, are observations of a state change that occured in the past. Since one cannot change an event that happened in the past, events are best modelled as immutable objects.
Note that the Esper engine relies on events that are sent into an engine to not change their state. Typically, applications create a new event object for every new event, to represent that new event. Application should not modify an existing event that was sent into the engine.
Another important method in the runtime interface is the route method. This method is designed for use by UpdateListener implementations that need to send events into an engine instance.
Your application can register an implementation of the UnmatchedListener interface with the EPRuntime runtime via the setUnmatchedListener method to receive events that were not matched by any statement.
Events that can be unmatched are all events that your application sends into the runtime via one of the sendEvent or route methods, or that have been generated via an insert into clause.
For an event to become unmatched by any statement, the event must not match any statement's event stream filter criteria. Note that the EQL where clause or having clause are not considered part of the filter criteria for a stream, as explained by example below.
In the next statement all MyEvent events match the statement's event stream filter criteria, regardless of the value of the 'quantity' property. As long as the below statement remains started, the engine would not deliver MyEvent events to your registered UnmatchedListener instance:
select * from MyEvent where quantity > 5
In the following statement a MyEvent event with a 'quantity' property value of 5 or less does not match this statement's event stream filter criteria. The engine delivers such an event to the registered UnmatchedListener instance provided no other statement matches on the event:
select * from MyEvent(quantity > 5)
For patterns, if no pattern sub-expression is active for an event type, an event of that type also counts as unmatched in regards to the pattern statement.
The emit and addEmittedListener methods can be used to emit events from a runtime to a registered set of one or more emitted event listeners. This mechanism is available as a service to enable channel-based publish-subscribe of events emitted from an engine instance via the emit method. Emitting events is not integrated with EQL and is available only via the EPRuntime interface.
Events are emitted on an event channel identified by a name. Listeners are implementations of the EmittedListener interface. Via the addEmittedListener method a listener can be added to the specified event channel. The lister receives only those events posted to that channel. The channel parameter to addEmittedListener also allows null values. If a null channel value is specified, the listeners receives emitted events posted on any channel.
Special events are provided that can be used to control the time-keeping of an engine instance. There are two models for an engine to keep track of time. Internal clocking is when the engine instance relies on the java.util.Timer class for time tick events. External clocking can be used to supply time ticks to the engine. The latter is useful for testing time-based event sequences or for synchronizing the engine with an external time source.
By default, the Esper engine uses internal time ticks. This behavior can be changed by sending a timer control event to the engine as shown below.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPRuntime runtime = epService.getEPRuntime(); // switch to external clocking runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL)); // send a time tick long timeInMillis = System.currentTimeMillis(); // Or get the time somewhere else runtime.sendEvent(new CurrentTimeEvent(timeInMillis));
We recommend that when disabling the internal timer, applications send an external timer event setting the start time before creating statements, such that statement start time is well-defined.
The Esper engine posts events to registered UpdateListener instances ('push' method for receiving events). For all statements events can also be pulled from statements via the safeIterator and iterator methods. Both pull and push supply EventBean instances representing the events generated by the engine or events supplied to the engine. Each EventBean instance represents an event, with each event being either an artificial event, composite event or an event supplied to the engine via its runtime interface.
The getEventType method supplies an event's event type information represented by an EventType instance. The EventType supplies event property names and types as well as information about the underlying object to the event.
The engine may generate artificial events that contain information derived from event streams. A typical example for artificial events is the events posted for a statement to calculate univariate statistics on an event property. The below example shows such a statement and queries the generated events for an average value.
// Derive univariate statistics on price for the last 100 market data events String stmt = "select * from MarketDataBean(symbol='IBM').win:length(100).stat:uni('price')"; EPStatement priceStatsView = epService.getEPAdministrator().createEQL(stmt); priceStatsView.addListener(testListener);
// Example listener code public class MyUpdateListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { // Interrogate events System.out.println("new average price=" + newData[0].get("average"); } }
Composite events are events that aggregate one or more other events. Composite events are typically created by the engine for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.
// Look for a pattern where BEvent follows AEvent String pattern = "a=AEvent -> b=BEvent"; EPStatement stmt = epService.getEPAdministrator().createPattern(pattern); stmt.addListener(testListener);
// Example listener code public class MyUpdateListener implements UpdateListener { public void update(EventBean[] newData, EventBean[] oldData) { System.out.println("a event=" + newData[0].get("a").getUnderlying()); System.out.println("b event=" + newData[0].get("b").getUnderlying()); } }
Note that the update method can receive multiple events at once as it accepts an array of EventBean instances. For example, a time batch window may post multiple events to listeners representing a batch of events received during a given time period.
Pattern statements can also produce multiple events delivered to update listeners in one invocation. The pattern statement below, for instance, delivers an event for each A event that was not followed by a B event with the same id property within 60 seconds of the A event. The engine may deliver all matching A events as an array of events in a single invocation of the update method of each listener to the statement:
every a=A -> (timer:interval(60 sec) and not B(id=a.id))
Esper is designed from the ground up to operate as a component to multi-threaded, highly-concurrent applications that require efficient use of Java VM resources. In addition, multi-threaded execution requires guarantees in predictability of results and deterministic processing. This section discusses these concerns in detail.
In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance efficiently shares resources between statements. For example, consider two statements that declare the same data window. The engine matches up view declarations provided by each statement and can thus provide a single data window representation shared between the two statements.
Applications can use Esper APIs to concurrently, by multiple threads of execution, perform such functions as creating and managing statements, or sending events into an engine instance for processing. Applications can use one or more thread pools or any set of same or different threads of execution with any of the public Esper APIs. There are no restrictions towards threading other then those noted in specific sections of this document.
Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process. It is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes places within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default).
The fact that event processing takes places within an application thread call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.
To send events into an engine concurrently by multiple execution threads, typically applications use the Java java.lang.Thread or java.lang.Runnable classes or Java 5 concurrent utilities that include abstractions for thread pools and blocking in-memory queues.
Each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.
Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on a statement level to protect statement resources.
For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default ensures that it dispatches statement result events to listeners in the order in which a statement produced result events. Applications that require the highest possible concurrency and do not require predictable order of delivery of events to listeners, this feature can be turned off via configuration.
In multithreaded environments, when one or more statements make result events available via the insert into clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statement's events to behave deterministic. This feature can also be turned off via configuration.
We generally recommended that listener implementations do not block. By implementing listener code as non-blocking code execution threads can often achieve higher levels of concurrency.
The statement object model is a set of classes that provide an object-oriented representation of an EQL or pattern statement. The object model classes are found in package net.esper.client.soda. An instance of EPStatementObjectModel represents a statement's object model.
The statement object model classes are a full and complete specification of a statement. All EQL and pattern constructs including expressions and sub-queries are available via the statement object model.
In conjunction with the administrative API, the statement object model provides the means to build, change or interrogate statements beyond the EQL or pattern syntax string representation. The object graph of the statement object model is fully navigable for easy querying by code, and is also serializable allowing applications to persist or transport statements in object form, when required.
The statement object model supports full round-trip from object model to EQL statement string and back to object model: A statement object model can be rendered into an EQL string representation via the toEQL method on EPStatementObjectModel. Further, the administrative API allows to compile a statement string into an object model representation via the compileEQL method on EPAdministrator.
The create method on EPAdministrator creates and starts a statement as represented by an object model. In order to obtain an object model from an existing statement, obtain the statement expression text of the statement via the getText method on EPStatement and use the compileEQL method to obtain the object model.
The following limitations apply:
Statement object model classes are not safe for sharing between threads other then for read access.
Between versions of Esper, the serialized form of the object model is subject to change. Esper makes no guarantees that the serialized object model of one version will be fully compatible with the serialized object model generated by another version of Esper. Please consider this issue when storing Esper object models in persistent store.
A EPStatementObjectModel consists of an object graph representing all possible clauses that can be part of an EQL statement.
Among all clauses, the SelectClause and FromClause objects are required clauses that must be present, in order to define what to select and where to select from.
Table 9.1. Required Statement Object Model Instances
Class | Description |
---|---|
EPStatementObjectModel | All statement clauses for a statement, such as the select-clause and the from-clause, are specified within the object graph of an instance of this class |
SelectClause | A list of the selection properties or expressions, or a wildcard |
FromClause | A list of one or more streams; A stream can be a filter-based, a pattern-based or a SQL-based stream; Views are added to streams to provide data window or other projections |
Part of the statement object model package are convenient builder classes that make it easy to build a new object model or change an existing object model. The SelectClause and FromClause are such builder classes and provide convenient create methods.
Within the from-clause we have a choice of different streams to select on. The FilterStream class represents a stream that is filled by events of a certain type and that pass an optional filter expression.
We can use the classes introduced above to create a simple statement object model:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setSelectClause(SelectClause.createWildcard()); model.setFromClause(FromClause.create(FilterStream.create("com.chipmaker.ReadyEvent")));
The model as above is equivalent to the EQL:
select * from com.chipmaker.ReadyEvent
Last, the code snippet below creates a statement from the object model:
EPStatement stmt = epService.getEPAdministrator().create(model);
The EPStatementObjectModel includes an optional where-clause. The where-clause is a filter expression that the engine applies to events in one or more streams. The key interface for all expressions is the Expression interface.
The Expressions class provides a convenient way of obtaining Expression instances for all possible expressions. Please consult the JavaDoc for detailed method information. The next example discusses sample where-clause expressions.
Use the Expressions class as a service for creating expression instances, and add additional expressions via the add method that most expressions provide.
In the next example we add a simple where-clause to the EQL as shown earlier:
select * from com.chipmaker.ReadyEvent where line=8
And the code to add a where-clause to the object model is below.
model.setWhereClause(Expressions.eq("line", 8));
The following example considers a more complex where-clause. Assume we need to build an expression using logical-and and logical-or:
select * from com.chipmaker.ReadyEvent where (line=8) or (line=10 and age<5)
The code for building such a where-clause by means of the object model classes is:
model.setWhereClause(Expressions.or() .add(Expressions.eq("line", 8)) .add(Expressions.and() .add(Expressions.eq("line", 10)) .add(Expressions.lt("age", 5)) ));
The Patterns class is a factory for building pattern expressions. It provides convenient methods to create all pattern expressions of the pattern language.
Patterns in EQL are seen as a stream of events that consist of patterns matches. The PatternStream class represents a stream of pattern matches and contains a pattern expression within.
For instance, consider the following pattern statement.
select * from pattern [every a=MyAEvent and not b=MyBEvent]
The next code snippet outlines how to use the statement object model and specifically the Patterns class to create a statement object model that is equivalent to the pattern statement above.
EPStatementObjectModel model = new EPStatementObjectModel(); model.setSelectClause(SelectClause.createWildcard()); PatternExpr pattern = Patterns.and() .add(Patterns.everyFilter("MyAEvent", "a")) .add(Patterns.notFilter("MyBEvent", "b")); model.setFromClause(FromClause.create(PatternStream.create(pattern)));
In this section we build a complete example statement and include all optional clauses in one EQL statement, to demonstrate the object model API.
A sample statement:
insert into ReadyStreamAvg(line, avgAge) select line, avg(age) as avgAge from com.chipmaker.ReadyEvent(line in (1, 8, 10)).win:time(10) as RE where RE.waverId != null group by line having avg(age) < 0 output every 10.0 seconds order by line
Finally, this code snippet builds the above statement from scratch:
EPStatementObjectModel model = new EPStatementObjectModel(); model.setInsertInto(InsertIntoClause.create("ReadyStreamAvg", "line", "avgAge")); model.setSelectClause(SelectClause.create() .add("line") .add(Expressions.avg("age"), "avgAge")); Filter filter = Filter.create("com.chipmaker.ReadyEvent", Expressions.in("line", 1, 8, 10)); model.setFromClause(FromClause.create( FilterStream.create(filter, "RE").addView("win", "time", 10))); model.setWhereClause(Expressions.isNotNull("RE.waverId")); model.setGroupByClause(GroupByClause.create("line")); model.setHavingClause(Expressions.lt(Expressions.avg("age"), Expressions.constant(0))); model.setOutputLimitClause(OutputLimitClause.create(10, OutputLimitUnit.SECONDS)); model.setOrderByClause(OrderByClause.create("line"));
The prepare method that is part of the administrative API pre-compiles an EQL statement and stores the precompiled statement in an EPPreparedStatement object. This object can then be used to efficiently start the parameterized statement multiple times.
Substitution parameters are inserted into an EQL statement as a single question mark character '?'. The engine assigns the first substitution parameter an index of 1 and subsequent parameters increment the index by one.
Substitution parameters can be inserted into any EQL construct that takes an expression. They are therefore valid in any clauses such as the select-clause, from-clause filters, where-clause, group-by-clause, having-clause or order-by-clause. Substitution parameters cannot be used as parameters to views, pattern observers and guards. They also cannot be used where a numeric constant is required rather then an expression.
All substitution parameters must be replaced by actual values before a statement with substitution parameters can be started. Substitution parameters can be replaced with an actual value using the setObject method for each index. Substitution parameters can be set to new values and new statements can be created from the same EPPreparedStatement object more then once.
While the setObject method allows substitution parameters to assume any actual value including application Java objects or enumeration values, the application must provide the correct type of substitution parameter that matches the requirements of the expression the parameter resides in.
In the following example of setting parameters on a prepared statement and starting the prepared statement, epService represents an engine instance:
String stmt = "select * from com.chipmaker.ReadyEvent(line=?)"; EPPreparedStatement prepared = epService.getEPAdministrator().prepareEQL(stmt); prepared.setObject(1, 8); EPStatement statement = epService.getEPAdministrator().create(prepared);