Esper has two primary interfaces, which this section outlines: the administrative interface and the runtime interface.
Use Esper's administrative interface to create and manage EQL and pattern statements, and set runtime configurations, as discussed in Section 6.1, “EQL Introduction” and Section 7.1, “Event Pattern Overview”.
Use Esper's runtime interface to send events into the engine, emit events and get statistics for an engine instance.
The JavaDoc documentation is also a great source for API information.
Each instance of an Esper engine is completely independent of other engine instances and has its own administrative and runtime interface.
An instance of the Esper engine is obtained via static methods on the EPServiceProviderManager class. The getDefaultProvider method and the getProvider(String URI) method each return an instance of the Esper engine. The latter can be used to obtain multiple instances of the engine for different URI values. The EPServiceProviderManager determines whether the URI matches any prior URI value and returns the same engine instance for the same URI value. If the URI has not been seen before, it creates a new engine instance.
The code snippet below gets the default Esper engine instance. Subsequent calls to get the default engine instance return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
This code snippet gets an Esper engine for URI RFIDProcessor1. Subsequent calls to get an engine with the same URI return the same instance.
EPServiceProvider epService = EPServiceProviderManager.getProvider("RFIDProcessor1");
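The lookup rule described here (same URI, same instance; unseen URI, new instance) can be pictured with a small stand-alone sketch. This is not Esper's implementation: ProviderRegistrySketch, SketchEngine and the "default" URI are hypothetical names chosen for illustration, and only the Java standard library is used.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an engine instance; not an Esper class.
final class SketchEngine {
    final String uri;

    SketchEngine(String uri) {
        this.uri = uri;
    }
}

// Illustrative registry: one engine per URI, created on first request.
final class ProviderRegistrySketch {
    private static final String DEFAULT_URI = "default"; // assumed name, for illustration only
    private static final Map<String, SketchEngine> ENGINES = new ConcurrentHashMap<>();

    static SketchEngine getDefaultProvider() {
        return getProvider(DEFAULT_URI);
    }

    static SketchEngine getProvider(String uri) {
        // computeIfAbsent creates an engine only when the URI has not been seen before
        return ENGINES.computeIfAbsent(uri, SketchEngine::new);
    }

    public static void main(String[] args) {
        SketchEngine a = getProvider("RFIDProcessor1");
        SketchEngine b = getProvider("RFIDProcessor1");
        SketchEngine c = getProvider("RFIDProcessor2");
        System.out.println(a == b); // prints true: same URI, same instance
        System.out.println(a == c); // prints false: different URI, different instance
    }
}
```

Keying the registry on the URI string is what lets separate parts of an application reach the same engine without passing a reference around.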
An existing Esper engine instance can be reset via the initialize method on the EPServiceProvider instance. This stops and removes all statements in the engine.
Create event pattern expressions and EQL statements via the administrative interface EPAdministrator.
This code snippet gets an Esper engine then creates an event pattern and an EQL statement.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPAdministrator admin = epService.getEPAdministrator();

// Event pattern that fires every 10 seconds
EPStatement tenSecRecurTrigger = admin.createPattern(
    "every timer:at(*, *, *, *, *, */10)");

// EQL statement that counts events in a 60-second time window
EPStatement countStmt = admin.createEQL(
    "select count(*) from MarketDataBean.win:time(60 sec)");
Note that event pattern expressions can also occur within EQL statements. This is outlined in more detail in Section 6.4.2, “Pattern-based event streams”.
The create methods on EPAdministrator are overloaded and allow an optional statement name to be passed to the engine. A statement name can be useful for retrieving a statement by name from the engine at a later time. The engine assigns a statement name if no statement name is supplied on statement creation.
The createPattern and createEQL methods return EPStatement instances. Statements are automatically started and active when created. A statement can also be stopped and started again via the stop and start methods shown in the code snippet below.
countStmt.stop(); countStmt.start();
We can subscribe to updates posted by a statement via the addListener and removeListener methods on EPStatement. We need to provide an implementation of the UpdateListener or the StatementAwareUpdateListener interface to the statement:
UpdateListener myListener = new MyUpdateListener(); countStmt.addListener(myListener);
EQL statements and event patterns publish old data and new data to registered UpdateListener listeners. New data published by a statement consists of the events representing the new values of derived data held by the statement. Old data published by a statement consists of the events representing the prior values of derived data held by the statement.
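To make the new-data and old-data distinction concrete, here is a minimal stand-alone sketch of a counting statement that reports both the new and the prior value of its derived count. It does not use the Esper API: DerivedValueListener and CountingStatementSketch are hypothetical names, and plain Long values stand in for the EventBean arrays a real listener receives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback, loosely modelled on the update(newData, oldData) shape.
interface DerivedValueListener {
    void update(Long newValue, Long oldValue);
}

// Illustrative "statement" that derives a running count from incoming events.
final class CountingStatementSketch {
    private final List<DerivedValueListener> listeners = new ArrayList<>();
    private long count = 0;

    void addListener(DerivedValueListener listener) {
        listeners.add(listener);
    }

    void onEvent(Object event) {
        long previous = count; // becomes the "old data"
        count++;               // becomes the "new data"
        for (DerivedValueListener listener : listeners) {
            listener.update(count, previous);
        }
    }

    public static void main(String[] args) {
        CountingStatementSketch stmt = new CountingStatementSketch();
        stmt.addListener((n, o) -> System.out.println("new=" + n + " old=" + o));
        stmt.onEvent("first");  // prints new=1 old=0
        stmt.onEvent("second"); // prints new=2 old=1
    }
}
```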
A second listener interface is the StatementAwareUpdateListener interface. A StatementAwareUpdateListener is especially useful for registering the same listener object with multiple statements, as the listener receives the statement instance and engine instance in addition to new and old data when the engine indicates new results to a listener.
StatementAwareUpdateListener myListener = new MyStmtAwareUpdateListener(); statement.addListener(myListener);
To indicate results the engine invokes this method on StatementAwareUpdateListener listeners: update(EventBean[] newEvents, EventBean[] oldEvents, EPStatement statement, EPServiceProvider epServiceProvider)
Subscribing to events posted by a statement follows a push model. The engine pushes data to listeners when events are received that cause data to change or patterns to match. Alternatively, statements can also serve up data in a pull model via the iterator method. This can come in handy if we are not interested in all new updates, but only want to poll periodically for the latest data. For example, an event pattern that fires every 5 seconds could be used to pull data from an EQL statement. The code snippet below demonstrates some pull code.
Iterator<EventBean> eventIter = countStmt.iterator();
while (eventIter.hasNext()) {
    EventBean event = eventIter.next();
    // .. do something ..
}
This is a second example:
double averagePrice = (Double) eqlStatement.iterator().next().get("average");
The iterator method can be used to pull results out of most statements, including statements that contain aggregation functions, pattern statements, and statements that contain a where clause, group by clause, having clause or order by clause.
For statements without an order by clause, the iterator method returns events in the order maintained by the data window. For statements that contain an order by clause, the iterator method returns events in the order indicated by the order by clause.
Esper places the following restrictions on the pull API and usage of the iterator method:
EQL statements joining multiple event streams do not support the pull API.
Since the iterator method returns events to the application immediately, the iterator does not honor an output rate limiting clause, if present.
In multithreaded applications, the iterator method does not hold any locks, so concurrent modifications to the underlying data window may cause the iterator to throw runtime exceptions.
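The last restriction mirrors a general hazard of Java collections, which the stand-alone sketch below illustrates with an ArrayList. It makes no claim about the data structures Esper uses internally; PullIteratorHazardSketch and its application-level lock are assumptions made for illustration. One possible mitigation, shown here, is for the application to serialize writers and readers itself and iterate over a copy.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;

final class PullIteratorHazardSketch {
    // Stand-in for a data window; ArrayList iterators are fail-fast.
    private final List<Integer> window = new ArrayList<>();
    private final Object guard = new Object(); // application-level lock (an assumption)

    void add(int value) {
        synchronized (guard) {
            window.add(value);
        }
    }

    // Copies the contents while holding the lock, so the caller iterates a stable snapshot.
    List<Integer> snapshot() {
        synchronized (guard) {
            return new ArrayList<>(window);
        }
    }

    // Returns true if modifying the window after obtaining an iterator raised an exception.
    boolean unsafeIterationFails() {
        Iterator<Integer> it = window.iterator();
        window.add(99); // modification while the iterator is live
        try {
            it.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        PullIteratorHazardSketch sketch = new PullIteratorHazardSketch();
        sketch.add(1);
        sketch.add(2);
        System.out.println(sketch.snapshot());             // prints [1, 2]
        System.out.println(sketch.unsafeIterationFails()); // prints true
    }
}
```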
The EPAdministrator interface provides the facilities for managing statements:
Use the getStatement method to obtain an existing started or stopped statement by name
Use the getStatementNames method to obtain a list of started and stopped statement names
Use the startAllStatements, stopAllStatements and destroyAllStatements methods to manage all statements in one operation
Certain configuration changes can be made to an engine instance while it is in operation. Such configuration operations are available via the getConfiguration method on EPAdministrator, which returns a ConfigurationOperations object.
The configuration operations available on a running engine instance are as follows. Please see Chapter 2, Configuration for more information.
Add a new event type for a JavaBean class, legacy Java class or custom Java class
Add a new DOM XML event type
Add a new Map-based event type
The EPRuntime interface is used to send events for processing into an Esper engine, and to emit events from an engine instance to the outside world.
The code snippet below shows how to send a Java object event to the engine. Note that the sendEvent method is overloaded. As events can take on different representation classes in Java, sendEvent takes parameters that reflect the different types of events that can be sent into the engine. Chapter 5, Event Representations explains the types of events accepted.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();

// Send an example event containing stock market data
runtime.sendEvent(new MarketDataBean("IBM", 75.0));
Events, in theoretical terms, are observations of a state change that occurred in the past. Since one cannot change an event that happened in the past, events are best modelled as immutable objects.
Note that the Esper engine relies on events that are sent into an engine not changing their state. Typically, applications create a new event object for every new event, to represent that new event. Applications should not modify an existing event that was sent into the engine.
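One way to follow this advice is to write event classes with final fields and no setters. The sketch below shows what an immutable MarketDataBean might look like. The symbol and price properties match the examples in this section, but the class body and the withPrice helper are assumptions made for illustration. The class is package-private here only to keep the sketch in one file; an application would normally declare it public in its own file.

```java
// Immutable event class: all fields are final and there are no setters.
final class MarketDataBean {
    private final String symbol;
    private final double price;

    MarketDataBean(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }

    public String getSymbol() {
        return symbol;
    }

    public double getPrice() {
        return price;
    }

    // Hypothetical helper: a price change is represented by a new event object,
    // leaving the event that was already sent untouched.
    MarketDataBean withPrice(double newPrice) {
        return new MarketDataBean(symbol, newPrice);
    }
}

final class ImmutableEventDemo {
    public static void main(String[] args) {
        MarketDataBean first = new MarketDataBean("IBM", 75.0);
        MarketDataBean second = first.withPrice(76.5);
        System.out.println(first.getPrice());  // prints 75.0
        System.out.println(second.getPrice()); // prints 76.5
    }
}
```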
Another important method in the runtime interface is the route method. This method is designed for use by UpdateListener implementations that need to send events into an engine instance.
The emit and addEmittedListener methods can be used to emit events from a runtime to a registered set of one or more emitted event listeners. This mechanism is available as a service to enable channel-based publish-subscribe of events emitted from an engine instance via the emit method. Emitting events is not integrated with EQL and is available only via the EPRuntime interface. Events are emitted on an event channel identified by a name. Listeners are implementations of the EmittedListener interface. Via the addEmittedListener method a listener can be added to the specified event channel. The listener receives only those events posted to that channel. The channel parameter to addEmittedListener also allows null values. If a null channel value is specified, the listener receives emitted events posted on any channel.
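The channel rules can be summarized in a stand-alone sketch: a listener registered on a named channel sees only that channel, while a listener registered with a null channel sees everything. The code does not use the Esper API. ChannelListenerSketch and EmitChannelSketch are hypothetical, and the method names and parameter order are assumptions that only echo the names used in the text.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical listener interface for emitted events.
interface ChannelListenerSketch {
    void emitted(Object event);
}

final class EmitChannelSketch {
    private final Map<String, List<ChannelListenerSketch>> byChannel = new HashMap<>();
    private final List<ChannelListenerSketch> anyChannel = new ArrayList<>();

    void addEmittedListener(ChannelListenerSketch listener, String channel) {
        if (channel == null) {
            anyChannel.add(listener); // null channel: receive events from every channel
        } else {
            byChannel.computeIfAbsent(channel, k -> new ArrayList<>()).add(listener);
        }
    }

    void emit(Object event, String channel) {
        // Deliver to listeners of this channel, then to the all-channel listeners
        for (ChannelListenerSketch l : byChannel.getOrDefault(channel, Collections.emptyList())) {
            l.emitted(event);
        }
        for (ChannelListenerSketch l : anyChannel) {
            l.emitted(event);
        }
    }

    public static void main(String[] args) {
        EmitChannelSketch runtime = new EmitChannelSketch();
        runtime.addEmittedListener(e -> System.out.println("alerts listener: " + e), "alerts");
        runtime.addEmittedListener(e -> System.out.println("any-channel listener: " + e), null);
        runtime.emit("disk full", "alerts"); // reaches both listeners
        runtime.emit("user login", "audit"); // reaches only the any-channel listener
    }
}
```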
Special events are provided that can be used to control the time-keeping of an engine instance. There are two models for an engine to keep track of time. Internal clocking is when the engine instance relies on the java.util.Timer class for time tick events. External clocking can be used to supply time ticks to the engine. The latter is useful for testing time-based event sequences or for synchronizing the engine with an external time source.
By default, the Esper engine uses internal time ticks. This behavior can be changed by sending a timer control event to the engine as shown below.
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
EPRuntime runtime = epService.getEPRuntime();

// switch to external clocking
runtime.sendEvent(new TimerControlEvent(TimerControlEvent.ClockType.CLOCK_EXTERNAL));

// send a time tick
long timeInMillis = System.currentTimeMillis(); // Or get the time somewhere else
runtime.sendEvent(new CurrentTimeEvent(timeInMillis));
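Why external clocking helps with testing can be seen in a stand-alone sketch of a scheduler whose notion of the current time comes only from the caller. ExternalClockSketch is a hypothetical class and not part of Esper; its advanceTo method plays a role comparable to sending a time event. Because nothing depends on the wall clock, a test can step through seconds of scheduled activity instantly and repeatably.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative scheduler driven entirely by externally supplied time.
final class ExternalClockSketch {
    private static final class Scheduled {
        final long dueMillis;
        final Runnable action;

        Scheduled(long dueMillis, Runnable action) {
            this.dueMillis = dueMillis;
            this.action = action;
        }
    }

    // Pending actions ordered by due time, earliest first
    private final PriorityQueue<Scheduled> pending =
        new PriorityQueue<Scheduled>(Comparator.comparingLong((Scheduled s) -> s.dueMillis));

    void schedule(long dueMillis, Runnable action) {
        pending.add(new Scheduled(dueMillis, action));
    }

    // Runs every action that is due at or before the supplied time.
    void advanceTo(long timeMillis) {
        while (!pending.isEmpty() && pending.peek().dueMillis <= timeMillis) {
            pending.poll().action.run();
        }
    }

    public static void main(String[] args) {
        ExternalClockSketch clock = new ExternalClockSketch();
        List<String> fired = new ArrayList<>();
        clock.schedule(1000L, () -> fired.add("after 1s"));
        clock.schedule(5000L, () -> fired.add("after 5s"));
        clock.advanceTo(999L);
        System.out.println(fired); // prints []
        clock.advanceTo(5000L);
        System.out.println(fired); // prints [after 1s, after 5s]
    }
}
```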
The Esper engine posts events to registered UpdateListener instances ('push' method for receiving events). For many statements events can also be pulled from statements via the iterator method. Both pull and push supply EventBean instances representing the events generated by the engine or events supplied to the engine. Each EventBean instance represents an event, with each event being either an artificial event, composite event or an event supplied to the engine via its runtime interface.
The getEventType method supplies an event's type information, represented by an EventType instance. The EventType supplies event property names and types as well as information about the event's underlying object.
The engine may generate artificial events that contain information derived from event streams. A typical example of artificial events is the events posted by a statement that calculates univariate statistics on an event property. The example below shows such a statement and queries the generated events for an average value.
// Derive univariate statistics on price for the last 100 market data events
String stmt = "select * from MarketDataBean(symbol='IBM').win:length(100).stat:uni('price')";
EPStatement priceStatsView = epService.getEPAdministrator().createEQL(stmt);
priceStatsView.addListener(testListener);
// Example listener code
public class MyUpdateListener implements UpdateListener {
    public void update(EventBean[] newData, EventBean[] oldData) {
        // Interrogate events
        System.out.println("new average price=" + newData[0].get("average"));
    }
}
Composite events are events that aggregate one or more other events. Composite events are typically created by the engine for statements that join two event streams, and for event patterns in which the causal events are retained and reported in a composite event. The example below shows such an event pattern.
// Look for a pattern where BEvent follows AEvent
String pattern = "a=AEvent -> b=BEvent";
EPStatement stmt = epService.getEPAdministrator().createPattern(pattern);
stmt.addListener(testListener);
// Example listener code
public class MyUpdateListener implements UpdateListener {
    public void update(EventBean[] newData, EventBean[] oldData) {
        // get returns Object; the casts assume each tagged value is an EventBean
        System.out.println("a event=" + ((EventBean) newData[0].get("a")).getUnderlying());
        System.out.println("b event=" + ((EventBean) newData[0].get("b")).getUnderlying());
    }
}
Esper is designed from the ground up to operate as a component of multithreaded, highly concurrent applications that require efficient use of Java VM resources. In addition, multithreaded execution requires guarantees of predictable results and deterministic processing. This section discusses these concerns in detail.
In Esper, an engine instance is a unit of separation. Applications can obtain and discard (initialize) one or more engine instances within the same Java VM and can provide the same or different engine configurations to each instance. An engine instance efficiently shares resources between statements. For example, consider two statements that declare the same data window. The engine matches up the view declarations provided by each statement and can thus provide a single data window representation shared between the two statements.
Applications can use the Esper APIs concurrently from multiple threads of execution to perform such functions as creating and managing statements or sending events into an engine instance for processing. Applications can use one or more thread pools, or any set of the same or different threads, with any of the public Esper APIs. There are no threading restrictions other than those noted in specific sections of this document.
Applications using Esper retain full control over threading, allowing an engine to be easily embedded and used as a component or library in your favorite Java container or process. It is up to the application code to use multiple threads for processing events by the engine, if so desired. All event processing takes place within your application thread call stack. The exception is timer-based processing if your engine instance relies on the internal timer (default).
The fact that event processing takes place within an application thread call stack makes developing applications with Esper easier: Any common Java integrated development environment (IDE) can host an Esper engine instance. This allows developers to easily set up test cases, debug through listener code and inspect input or output events, or trace their call stack.
To send events into an engine concurrently from multiple threads, applications typically use the Java java.lang.Thread or java.lang.Runnable classes or the Java 5 concurrent utilities, which include abstractions for thread pools and blocking in-memory queues.
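As a rough illustration of the thread-pool approach, the stand-alone sketch below submits one send task per event to a fixed-size pool. The Consumer passed to sendAll is a stand-in for whatever call delivers an event to the engine (in an application it might wrap the runtime's sendEvent method). ConcurrentSendSketch and sendAll are hypothetical names, and the ten-second timeout is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ConcurrentSendSketch {
    // Submits one task per event to a fixed thread pool, then waits for all tasks to finish.
    static void sendAll(Consumer<Object> sink, List<?> events, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (Object event : events) {
            pool.execute(() -> sink.accept(event)); // runs on one of the pool threads
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for send tasks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted while sending", e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger received = new AtomicInteger();
        // The lambda stands in for a call that hands the event to the engine
        sendAll(event -> received.incrementAndGet(), Arrays.asList("e1", "e2", "e3"), 2);
        System.out.println(received.get()); // prints 3
    }
}
```

Note that with several pool threads the order in which events reach the sink is not the submission order; an application that depends on ordering would need to partition events or use a single sending thread.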
Each engine instance maintains a single timer thread (internal timer) providing for time or schedule-based processing within the engine. The default resolution at which the timer operates is 100 milliseconds. The internal timer thread can be disabled and applications can instead send external time events to an engine instance to perform timer or scheduled processing at the resolution required by an application.
Each engine instance performs minimal locking to enable high levels of concurrency. An engine instance locks on a statement level to protect statement resources.
For an engine instance to produce predictable results from the viewpoint of listeners to statements, an engine instance by default dispatches statement result events to listeners in the order in which the statement produced them. For applications that require the highest possible concurrency and do not require a predictable order of delivery of events to listeners, this feature can be turned off via configuration.
In multithreaded environments, when one or more statements make result events available via the insert into clause to further statements, the engine preserves the order of events inserted into the generated insert-into stream, allowing statements that consume other statements' events to behave deterministically. This feature can also be turned off via configuration.
We generally recommend that listener implementations do not block. By implementing listener code as non-blocking code, execution threads can often achieve higher levels of concurrency.
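One common way to keep a listener from blocking is to hand slow work to a separate worker thread and return immediately. The stand-alone sketch below shows that pattern under stated assumptions: NonBlockingListenerSketch is a hypothetical class, Object[] stands in for the EventBean[] arguments of a real update method, and the single worker thread is chosen so that results are handled in arrival order.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class NonBlockingListenerSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<Object[]> slowHandler;

    NonBlockingListenerSketch(Consumer<Object[]> slowHandler) {
        this.slowHandler = slowHandler;
    }

    // Called on the sending thread: only queues the work and returns.
    public void update(Object[] newData, Object[] oldData) {
        worker.execute(() -> slowHandler.accept(newData));
    }

    // Stops accepting work and waits for queued work to finish; returns false on timeout or interrupt.
    boolean shutdownAndWait() {
        worker.shutdown();
        try {
            return worker.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        NonBlockingListenerSketch listener = new NonBlockingListenerSketch(
            data -> System.out.println("handled " + data.length + " event(s)"));
        listener.update(new Object[] {"tick"}, null); // returns without waiting for the handler
        listener.shutdownAndWait();
    }
}
```

The trade-off is that results are handled after update has returned, so this suits work such as logging or writing to an external system rather than logic that must complete before the next event is processed.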