Chapter 12. Examples, Tutorials, Case Studies

12.1. Examples Overview

This chapter outlines the examples that come with Esper in the examples/src folder of the distribution. The code for examples can be found in the com.espertech.esper.example packages.

To compile and run the samples, follow the instructions below:

  1. Make sure Java 1.5 or greater is installed and the JAVA_HOME environment variable is set.

  2. Open a console window and change directory to examples/etc.

  3. Run "setenv.bat" (Windows) or "setenv.sh" (Unix) to verify your environment settings.

  4. Run "compile.bat" (Windows) or "compile.sh" (Unix) to compile the examples.

  5. You are now ready to run the examples. Some examples require mandatory parameters. Further information on running each example can be found in the file "readme.txt" in the "examples/etc" folder.

  6. To reduce the volume of text output, modify the logging level in the "log4j.xml" configuration file by changing DEBUG to INFO at the class or package level.

JUnit tests exist for the example code. The JUnit test source code for the examples can be found in the examples/test folder. To build and run the example JUnit tests, use the Maven 2 goal test. The JUnit test source code can also be helpful in understanding the example and in the use of Esper APIs.

12.2. Market Data Feed Monitor

This example processes a raw market data feed. It reports throughput statistics and detects when the data rate of a feed falls off unexpectedly. A rate fall-off may mean that the data is stale and we want to alert when there is a possible problem with the feed.

The classes for this example live in package com.espertech.esper.example.marketdatafeed. Run "run_mktdatafeed.bat" (Windows) or "run_mktdatafeed.sh" (Unix) in the examples/etc folder to start the market data feed simulator.

12.2.1. Input Events

The input stream consists of 1 event stream that contains 2 simulated market data feeds. Each individual event in the stream indicates the feed that supplies the market data, the security symbol and some pricing information:

String symbol;
FeedEnum feed;
double bidPrice;
double askPrice;

12.2.2. Computing Rates Per Feed

For the throughput statistics and to detect rapid fall-off we calculate a ticks per second rate for each market data feed.

We can use an EPL statement that specifies a view onto the market data event stream that batches together 1 second of events. We specify the feed and a count of events per feed as output values. To make this data available for further processing, we insert output events into the TicksPerSecond event stream:

insert into TicksPerSecond
select feed, count(*) as cnt 
  from MarketDataEvent.win:time_batch(1 second) 
 group by feed
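
To illustrate what the statement computes for each one-second batch, here is a minimal sketch in plain Java (8 or later). It does not use the Esper API; the class TicksPerSecondSketch and its method are hypothetical names chosen for this illustration, and feeds are represented as plain strings rather than FeedEnum values.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of the Esper example code. */
final class TicksPerSecondSketch {

    /**
     * Counts the events per feed within one batch of events,
     * mirroring "select feed, count(*) ... group by feed".
     */
    static Map<String, Long> countPerFeed(List<String> feedsInBatch) {
        Map<String, Long> counts = new TreeMap<>();
        for (String feed : feedsInBatch) {
            counts.merge(feed, 1L, Long::sum); // add 1 to the running count of this feed
        }
        return counts;
    }
}
```

Calling countPerFeed once per second with the feeds of all events received in that second yields one count per feed, which corresponds to one TicksPerSecond event per feed.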

12.2.3. Detecting a Fall-off

We define a rapid fall-off as the number of ticks per second for any second falling below 75% of the average number of ticks per second over the last 10 seconds, and we alert when that happens.

We can compute the average number of ticks per second over the last 10 seconds simply by using the TicksPerSecond events computed by the prior statement and averaging the last 10 seconds. Next, we compare the current rate with the moving average and select only those rates that fall below 75% of the average:

select feed, avg(cnt) as avgCnt, cnt as feedCnt 
  from TicksPerSecond.win:time(10 seconds)
 group by feed 
having cnt < avg(cnt) * 0.75
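
The comparison can be worked through in plain Java (8 or later). The sketch below is an illustration under assumptions, not the example's implementation: FallOffSketch is a hypothetical class, it tracks a single feed, and it approximates the 10-second time window by keeping the last 10 per-second counts. As in the statement above, the newest count is part of the average it is compared against.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper for illustration only; use one instance per feed. */
final class FallOffSketch {
    private final Deque<Long> lastCounts = new ArrayDeque<>();
    private final int windowSize;

    FallOffSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds the newest per-second count; returns true if it is below 75% of the window average. */
    boolean addAndCheck(long cnt) {
        lastCounts.addLast(cnt);
        if (lastCounts.size() > windowSize) {
            lastCounts.removeFirst(); // drop the count that left the window
        }
        double sum = 0;
        for (long c : lastCounts) {
            sum += c;
        }
        double avg = sum / lastCounts.size();
        return cnt < avg * 0.75;
    }
}
```

For example, after nine seconds at 100 ticks a drop to 60 ticks gives an average of 96 and a threshold of 72, so the drop is reported. A drop to 80 ticks gives a threshold of 73.5 and is not reported.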

12.2.4. Event generator

The simulator generates market data events for 2 feeds, feed A and feed B. The first parameter to the simulator is a number of threads. Each thread sends events for each feed in an endless loop. Note that as the Java VM garbage collection kicks in, the example generates rate drop-offs during such pauses.

The second parameter, the rate drop probability, specifies the probability in percent that the simulator drops the rate for a randomly chosen feed to 60% of the target rate for that second. This is how rate fall-off alerts can be generated.

The third parameter defines the number of seconds to run the example.

12.3. JMS Server Shell and Client

12.3.1. Overview

The server shell is a Java Message Service (JMS)-based server that listens to messages on a JMS destination and sends the received events into Esper. The example also demonstrates a Java Management Extensions (JMX) MBean that allows remote dynamic statement management. This server has been designed to run with either Tibco (TM) Enterprise Message Service (Tibco EMS) or Apache ActiveMQ, controlled by a properties file.

The server shell has been created as an alternative to the EsperIO Spring JMSTemplate adapter. The server shell is a low-latency processor for byte messages. It employs JMS listeners to process messages in multiple threads; this model reduces thread context switching for many JMS providers. The server is configurable and has been tested with two JMS providers. It consists of only 10 classes and is thus easy to understand.

The server shell sample comes with a client (server shell client) that sends events into the JMS-based server, and that also creates a statement on the server remotely through a JMX MBean proxy class.

The server shell classes for this example live in package com.espertech.esper.example.servershell. Configure the server to point to your JMS provider by changing the properties in the file servershell_config.properties in the etc folder. Make sure your JMS provider (ActiveMQ or Tibco EMS) is running, then run "run_servershell.bat" (Windows) or "run_servershell.sh" (Unix) to start the JMS server.

Start the server shell process first before starting the client, since the client also demonstrates remote statement management through JMX by attaching to the server process.

The client classes to the server shell can be found in package com.espertech.esper.example.servershellclient. The client shares the same configuration file as the server shell. Run "run_servershellclient.bat" (Windows) or "run_servershellclient.sh" (Unix) to start the JMS producer client that includes a JMX client as well.

12.3.2. JMS Messages as Events

The server shell starts a configurable number of JMS MessageListener instances that listen to a given JMS destination. The listeners expect a BytesMessage that contains a String payload. The payload consists of an IP address and a double-typed duration value separated by a comma.

Each listener extracts the payload of a message, constructs an event object and sends the event into the shared Esper engine instance.
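
As a minimal sketch of the payload handling described above, the following plain Java class splits a payload string into its two parts. DurationPayloadSketch is a hypothetical name, not the event class used by the example, and the sketch starts from the already decoded String; how the String is encoded inside the BytesMessage is not shown here.

```java
/** Hypothetical event holder for illustration only; not the example's event class. */
final class DurationPayloadSketch {
    final String ip;
    final double duration;

    private DurationPayloadSketch(String ip, double duration) {
        this.ip = ip;
        this.duration = duration;
    }

    /** Parses a payload of the form "ip-address,duration". */
    static DurationPayloadSketch parse(String payload) {
        int comma = payload.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected 'ip,duration' but got: " + payload);
        }
        String ip = payload.substring(0, comma).trim();
        // Double.parseDouble throws NumberFormatException for a malformed duration
        double duration = Double.parseDouble(payload.substring(comma + 1).trim());
        return new DurationPayloadSketch(ip, duration);
    }
}
```

A listener would call parse on each payload and send the resulting object into the engine as an event.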

At startup time, the server creates a single EPL statement with the Esper engine that prints out the average duration per IP address for the last 10 seconds of events, and that specifies an output rate of 2 seconds. By running the server and then the client, you can see the output of the averages every 2 seconds.

The server shell client acts as a JMS producer that sends 1000 events with random IP addresses and durations.

12.3.3. JMX for Remote Dynamic Statement Management

The server shell is also a JMX server providing an RMI-based connector. The server shell exposes a JMX MBean that allows remote statement management. Through the MBean a client can create a statement, attach a serializable listener to the statement, and destroy the statement, all remotely.

The server shell client, upon startup, obtains a remote instance of the management MBean exposed by the server shell. It creates a statement through the MBean that filters out all durations greater than 9.9. After sending 1000 events, the client then destroys the statement remotely on the server.

12.4. Transaction 3-Event Challenge

The classes for this example live in package com.espertech.esper.example.transaction. Run "run_txnsim.bat" (Windows) or "run_txnsim.sh" (Unix) to start the transaction simulator. Please see the readme file in the same folder for build instructions and command line parameters.

12.4.1. The Events

The use case involves tracking three components of a transaction. It's important that we use at least three components, since some engines have different performance or coding for only two events per transaction. Each component comes to the engine as an event with the following fields:

  • Transaction ID

  • Time stamp

In addition, we have the following extra fields:

In event A:

  • Customer ID

In event C:

  • Supplier ID (the ID of the supplier that the order was filled through)

12.4.2. Combined event

We need to take in events A, B and C and produce a single, combined event with the following fields:

  • Transaction ID

  • Customer ID

  • Time stamp from event A

  • Time stamp from event B

  • Time stamp from event C

What we're doing here is matching the transaction IDs on each event, to form an aggregate event. If all these events were in a relational database, this could be done as a simple SQL join… except that with 10,000 events per second, you will need some serious database hardware to do it.
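
The matching on transaction ID can be sketched in plain Java (8 or later) with a hash map of partially filled transactions. This is an illustration only and not how the example or the engine performs the join; TxnJoinSketch and its members are hypothetical names. Events may arrive in any order.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration only. */
final class TxnJoinSketch {

    /** The combined event: transaction ID, customer ID and the three time stamps. */
    static final class Combined {
        final String transactionId;
        String customerId;
        Long timeA, timeB, timeC; // null until the corresponding event has arrived

        Combined(String transactionId) {
            this.transactionId = transactionId;
        }

        boolean isComplete() {
            return timeA != null && timeB != null && timeC != null;
        }
    }

    private final Map<String, Combined> pending = new HashMap<>();

    /** Returns the combined event once A, B and C have all arrived, otherwise null. */
    Combined add(char type, String txnId, long timestamp, String customerId) {
        Combined c = pending.computeIfAbsent(txnId, Combined::new);
        switch (type) {
            case 'A': c.timeA = timestamp; c.customerId = customerId; break;
            case 'B': c.timeB = timestamp; break;
            case 'C': c.timeC = timestamp; break;
            default: throw new IllegalArgumentException("Unknown event type: " + type);
        }
        if (!c.isComplete()) {
            return null;
        }
        pending.remove(txnId);
        return c;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Note that in this sketch a transaction that never completes stays in the map forever; a real implementation would need to expire such entries.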

12.4.3. Real time summary data

Further, we need to produce the following:

  • Min, max and average total latency from the events (difference in time between A and C) over the past 30 minutes.

  • Min, max and average latency grouped by (a) customer ID and (b) supplier ID. In other words, metrics on the latency of the orders coming from each customer and going to each supplier.

  • Min, max and average latency between events A/B (time stamp of B minus A) and B/C (time stamp of C minus B).
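
The grouped metrics above can be sketched in plain Java (8 or later) using java.util.LongSummaryStatistics. The sketch is a simplification under assumptions: LatencyStatsSketch is a hypothetical class, the grouping key can be a customer ID or a supplier ID, and the 30-minute window is not implemented, so the statistics cover all recorded transactions.

```java
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; does not expire old data. */
final class LatencyStatsSketch {
    private final Map<String, LongSummaryStatistics> byKey = new TreeMap<>();

    /** Records the total latency (time stamp of C minus time stamp of A) under the given key. */
    void record(String key, long timeA, long timeC) {
        byKey.computeIfAbsent(key, k -> new LongSummaryStatistics()).accept(timeC - timeA);
    }

    /** Returns min, max and average for the key, or null if nothing was recorded for it. */
    LongSummaryStatistics stats(String key) {
        return byKey.get(key);
    }
}
```

The same class can be fed with B minus A or C minus B to obtain the latency between individual events.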

12.4.4. Find problems

We need to detect a transaction that did not make it through all three events. In other words, a transaction with events A or B, but not C. Note that, in this case, what we care about is event C. The lack of events A or B could indicate a failure in the event transport and should be ignored. Although the lack of an event C could also be a transport failure, it merits looking into.

12.4.5. Event generator

To make testing easier and repeatable, and to demonstrate how the example works, the example includes an event generator. The generator generates events for a given number of transactions, using the following rules:

  • One in 5,000 transactions will skip event A

  • One in 1,000 transactions will skip event B

  • One in 10,000 transactions will skip event C

  • Transaction identifiers are randomly generated

  • Customer and supplier identifiers are randomly chosen from two lists

  • The time stamp on each event is based on the system time. Between events A and B as well as between B and C, a value between 0 and 999 milliseconds is added to the time stamp, so the expected time difference between consecutive events is around 500 milliseconds

  • Events are randomly shuffled as described below

To make things harder, we don't want transaction events coming in order. This code ensures that they come completely out of order. To do this, we fill a bucket with events and, when the bucket is full, we shuffle it. The buckets are sized so that some transactions' events will be split between buckets. So, you have a fairly randomized flow of events, representing the worst case from a big, distributed infrastructure.

The generator lets you change the size of the bucket (small, medium, large, larger, largerer). The larger the bucket size, the more events potentially come in between two events of a given transaction, and so the more buffers, hashes/indexes and other structures are put to the test.
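
The bucket shuffling can be sketched in plain Java as follows. This is a minimal illustration and not the generator's actual code; ShuffleBucketSketch and its method are hypothetical names, and the random source is passed in so that a run can be repeated with a fixed seed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Hypothetical helper for illustration only. */
final class ShuffleBucketSketch {

    /** Splits the events into consecutive buckets and shuffles each bucket on its own. */
    static <T> List<T> shuffleInBuckets(List<T> events, int bucketSize, Random random) {
        if (bucketSize <= 0) {
            throw new IllegalArgumentException("bucketSize must be positive");
        }
        List<T> out = new ArrayList<>(events.size());
        for (int start = 0; start < events.size(); start += bucketSize) {
            int end = Math.min(start + bucketSize, events.size());
            List<T> bucket = new ArrayList<>(events.subList(start, end));
            Collections.shuffle(bucket, random); // reorder only within this bucket
            out.addAll(bucket);
        }
        return out;
    }
}
```

An event never leaves its bucket, so the bucket size bounds how far an event can move from its original position. Events of one transaction that straddle a bucket boundary end up in different buckets.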

12.5. J2EE Self-Service Terminal Management

The example is about a J2EE-based self-service terminal managing system in an airport that gets a lot of events from connected terminals. The event rate is around 500 events per second. Some events indicate abnormal situations such as 'paper low' or 'terminal out of order'. Other events observe activity as customers use a terminal to check in and print boarding tickets.

12.5.1. Events

Each self-service terminal can publish any of the 6 events below.

  • Checkin - Indicates a customer started a check-in dialog

  • Cancelled - Indicates a customer cancelled a check-in dialog

  • Completed - Indicates a customer completed a check-in dialog

  • OutOfOrder - Indicates the terminal detected a hardware problem

  • LowPaper - Indicates the terminal is low on paper

  • Status - Indicates terminal status, published every 1 minute regardless of activity as a terminal heartbeat

All events provide information about the terminal that published the event, and a timestamp. The terminal information is held in a property named "term" and provides a terminal id. Since all events carry similar information, we model each event as a subtype of a base class BaseTerminalEvent, which provides the terminal information that all events share. This enables us to treat all terminal events polymorphically; that is, we can treat derived event types just like their parent event types. This helps simplify our queries.

All terminals publish Status events every 1 minute. In normal cases, the Status events indicate that a terminal is alive and online. The absence of status events may indicate that a terminal went offline for some reason and that may need to be investigated.

12.5.2. Detecting Customer Check-in Issues

A customer may be in the middle of a check-in when the terminal detects a hardware problem or when the network goes down. In that situation we want to alert a team member to help the customer. When the terminal detects a problem, it issues an OutOfOrder event. A pattern can find situations where the terminal indicates out-of-order and the customer is in the middle of the check-in process:

select * from pattern [ every a=Checkin -> 
      ( OutOfOrder(term.id=a.term.id) and not 
          (Cancelled(term.id=a.term.id) or Completed(term.id=a.term.id)) )]
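
To make the logic of the pattern easier to follow, here is a rough plain Java sketch that tracks open check-ins per terminal. It is a simplification and not equivalent to the pattern in every detail: CheckinIssueSketch is a hypothetical class, event types are passed as strings, and repeated Checkin events for the same terminal are treated as one open check-in.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper for illustration only. */
final class CheckinIssueSketch {
    private final Set<String> openCheckins = new HashSet<>();

    /** Returns true when an OutOfOrder event arrives while a check-in is open on that terminal. */
    boolean onEvent(String type, String terminalId) {
        switch (type) {
            case "Checkin":
                openCheckins.add(terminalId);
                return false;
            case "Cancelled":
            case "Completed":
                openCheckins.remove(terminalId); // the dialog ended normally
                return false;
            case "OutOfOrder":
                return openCheckins.remove(terminalId); // true only if a check-in was open
            default:
                return false;
        }
    }
}
```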

12.5.3. Absence of Status Events

Since Status events arrive in regular intervals of 60 seconds, we can make use of temporal pattern matching with timers to find events that didn't arrive. We can use the every operator and timer:interval() to repeat an action every 60 seconds. Then we combine this with a not operator to check for the absence of Status events. A 65 second interval during which we look for Status events allows 5 seconds to account for a possible delay in transmission or processing:

select 'terminal 1 is offline' from pattern 
  [every timer:interval(60 sec) -> (timer:interval(65 sec) and not Status(term.id = 'T1'))]
output first every 5 minutes
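
Outside of EPL, a comparable check is often written by remembering the time of the last Status event per terminal. The plain Java sketch below shows that alternative technique, not the timer-based pattern above; HeartbeatSketch is a hypothetical class and the 65-second timeout is taken from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only. */
final class HeartbeatSketch {
    private static final long TIMEOUT_MILLIS = 65_000L; // 60 s heartbeat plus 5 s tolerance
    private final Map<String, Long> lastStatus = new TreeMap<>();

    /** Remembers the time of the most recent Status event of a terminal. */
    void onStatus(String terminalId, long timeMillis) {
        lastStatus.put(terminalId, timeMillis);
    }

    /** Returns the ids of all known terminals whose last Status event is older than the timeout. */
    List<String> offlineTerminals(long nowMillis) {
        List<String> offline = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastStatus.entrySet()) {
            if (nowMillis - e.getValue() > TIMEOUT_MILLIS) {
                offline.add(e.getKey());
            }
        }
        return offline;
    }
}
```

Unlike the pattern, this sketch only knows terminals that have sent at least one Status event, and it must be polled periodically.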

12.5.4. Activity Summary Data

By presenting statistical information about terminal activity to our staff in real-time we enable them to monitor the system and spot problems. The next example query simply gives us a count per event type every 1 minute. We could further use this data, available through the CountPerType event stream, to join and compare against a recorded usage pattern, or to just summarize activity in real-time.

insert into CountPerType
select type, count(*) as countPerType 
from BaseTerminalEvent.win:time(10 minutes) 
group by type
output all every 1 minutes

12.5.5. Sample Application for J2EE Application Server

The example code in the distribution package implements a message-driven Enterprise JavaBean (MDB EJB). We used an MDB as a convenient place for processing incoming events via a JMS message queue or topic. The example uses 2 JMS queues: one queue to receive events published by terminals, and a second queue to report situations detected by EPL statements and listeners back to a receiving process.

This example has been packaged for deployment into a JBoss Java application server (see http://www.jboss.org) with default deployment configuration. JBoss is an open-source application server available under LGPL license. Of course the choice of application server does not indicate a requirement or preference for the use of Esper in a J2EE container. Other quality J2EE application servers are available and perhaps more suitable to run this example or a similar application.

The complete example code can be found in the "examples/terminalsvc" folder of the distribution. The Java package name is com.espertech.esper.example.terminalsvc.

12.5.5.1. Running the Example

The pre-built EAR file contains the MDB for deployment to a JBoss application server with default deployment options. The JBoss default configuration provides 2 queues that this example utilizes: queue/A and queue/B. The queue/B is used to send events into the MDB, while queue/A is used to send back any data received by listeners to EPL statements.

The application can be deployed by copying the ear file in the "examples/terminalsvc/terminalsvc-ear" folder to your JBoss deployment directory located under the JBoss home directory under "server/default/deploy".

The example contains an event simulator and an event receiver that can be invoked from the command line. See the readme file and the start scripts for Windows and Unix in the "examples/terminalsvc/etc" folder, and the documentation set, for further information on the simulator.

12.5.5.2. Building the Example

This example requires Maven 2 to build. To build the example, change directory to the folder "examples/terminalsvc" and type "mvn package". The instructions have been tested with JBoss AS 4.0.4.GA and Maven 2.0.4.

The Maven build packages the EAR file for deployment to a JBoss application server with default deployment options.

12.5.5.3. Running the Event Simulator and Receiver

The example also contains an event simulator that generates meaningful events. The simulator can be run from the directory "examples/terminalsvc/etc" via the command "run_terminalsvc_sender.bat" (Windows) or "run_terminalsvc_sender.sh" (Linux). The event simulator generates a batch of at least 200 events every 1 second. Randomly, with a chance of 1 in 10 for each batch of events, the simulator generates either an OutOfOrder or a LowPaper event for a random terminal. For each batch, the simulator generates 100 random terminal ids and generates a Checkin event for each. It then generates either a Cancelled or a Completed event for each. With a chance of 1 in 1000, it generates an OutOfOrder event instead of the Cancelled or Completed event for a terminal.

The event receiver listens to the outgoing queue of the MDB for alerts and prints these out to the console. The receiver can be run from the directory "examples/terminalsvc/etc" via the command "run_terminalsvc_receiver.bat" (Windows) or "run_terminalsvc_receiver.sh" (Linux).

12.6. Assets Moving Across Zones - An RFID Example

This example from the RFID domain processes location report events. Each location report event indicates an asset id and the current zone of the asset. The example raises an alert when the assets of a given group do not move together from zone to zone.

Each asset group is tracked by 2 statements. The two statements to track a single asset group consisting of assets identified by asset ids {1, 2, 3} are as follows:

insert into CountZone_G1
select 1 as groupId, zone, count(*) as cnt
from LocationReport(assetId in (1, 2, 3)).std:unique(assetId)
group by zone

select Part.zone from pattern [
  every Part=CountZone_G1(cnt in (1,2)) ->
    (timer:interval(10 sec)  and not CountZone_G1(zone=Part.zone, cnt in (0,3)))]
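
The idea behind the two statements can be sketched in plain Java (8 or later): keep only the latest zone per asset, count assets per zone, and treat the group as split while some zone holds only part of the group. ZoneSplitSketch is a hypothetical class for illustration; the 10-second grace period of the pattern is not implemented here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; tracks a single asset group. */
final class ZoneSplitSketch {
    // Latest zone per asset, comparable to keeping the most recent event per assetId
    private final Map<Integer, String> zoneByAsset = new HashMap<>();

    void onLocationReport(int assetId, String zone) {
        zoneByAsset.put(assetId, zone);
    }

    /** Number of assets currently reported in each zone. */
    Map<String, Integer> countPerZone() {
        Map<String, Integer> counts = new TreeMap<>();
        for (String zone : zoneByAsset.values()) {
            counts.merge(zone, 1, Integer::sum);
        }
        return counts;
    }

    /** True if some zone holds at least one but not all assets of the group. */
    boolean isSplit(int groupSize) {
        for (int count : countPerZone().values()) {
            if (count < groupSize) {
                return true;
            }
        }
        return false;
    }
}
```

For a group of three assets, a zone count of 1 or 2 means the group is split, which corresponds to the filter cnt in (1,2) in the pattern above.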

The classes for this example can be found in package com.espertech.esper.example.rfid.

This example provides a Swing-based GUI that can be run from the command line. The GUI allows drag-and-drop of three RFID tags that form one asset group from zone to zone. Each time you move an asset across the screen the example sends an event into the engine indicating the asset id and current zone. The example detects if within 10 seconds the three assets do not join each other within the same zone, but stay split across zones. Run "run_rfid_swing.bat" (Windows) or "run_rfid_swing.sh" (Unix) to start the example's Swing GUI.

The example also provides a simulator that can be run from the command line. The simulator generates a number of asset groups as specified by a command line argument and starts a number of threads as specified by a command line argument to send location report events into the engine. Run "run_rfid_sim.bat" (Windows) or "run_rfid_sim.sh" (Unix) to start the RFID location report event simulator. Please see the readme file in the same folder for build instructions and command line parameters.

12.7. AutoID RFID Reader generating XML documents

In this example an array of RFID readers senses RFID tags as pallets come within range of one of the readers. A reader generates XML documents with observation information such as reader sensor ID, observation time and tags observed. A statement computes the total number of tags per reader sensor ID within the last 60 seconds.

This example demonstrates how XML documents unmarshalled to org.w3c.dom.Node DOM document nodes can natively be processed by the engine without requiring Java object event representations. The example uses an XPath expression for an event property counting the number of tags observed by a sensor. The XML documents follow the AutoID (http://www.autoid.org/) organization standard.

The classes for this example can be found in package com.espertech.esper.example.autoid. As events are XML documents with no Java object representation, the example does not have event classes.

A simulator that can be run from the command line is also available for this example. The simulator generates a number of XML documents as specified by a command line argument and prints out the totals per sensor. Run "run_autoid.bat" (Windows) or "run_autoid.sh" (Unix) to start the autoid simulator. Please see the readme file in the same folder for build instructions and command line parameters.

The code snippet below shows the simple statement to compute the total number of tags per sensor. The statement is created by class com.espertech.esper.example.autoid.RFIDTagsPerSensorStmt.

select ID as sensorId, sum(countTags) as numTagsPerSensor
from AutoIdRFIDExample.win:time(60 seconds)
where Observation[0].Command = 'READ_PALLET_TAGS_ONLY'
group by ID
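
To show what an XPath expression that counts tags can look like, here is a minimal sketch using the standard javax.xml.xpath API. The element names Sensor, Observation and Tag and the XPath expression are assumptions for a simplified document without namespaces; they are not the actual AutoID schema or the expression configured by the example. CountTagsSketch is a hypothetical class.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

/** Hypothetical helper for illustration only. */
final class CountTagsSketch {

    /** Parses the XML text into a DOM document and counts the Tag elements via XPath. */
    static int countTags(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            // The XPath count() function yields a number, returned as a Double
            Double count = (Double) XPathFactory.newInstance().newXPath()
                    .evaluate("count(/Sensor/Observation/Tag)", doc, XPathConstants.NUMBER);
            return count.intValue();
        } catch (Exception e) {
            throw new IllegalStateException("Could not evaluate XPath on document", e);
        }
    }
}
```

In the example itself the engine evaluates the configured XPath expression against each incoming DOM node and exposes the result as the countTags event property.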

12.8. StockTicker

The StockTicker example comes from the stock trading domain. The example creates event patterns to filter stock tick events based on price and symbol. When a stock tick event is encountered that falls outside the lower or upper price limit, the example simply displays that stock tick event. The price range itself is dynamically created and changed. This is accomplished by an event pattern that searches for another event class, the price limit event.

The classes com.espertech.esper.example.stockticker.event.StockTick and PriceLimit represent our events. The event patterns are created by the class com.espertech.esper.example.stockticker.monitor.StockTickerMonitor.

Summary:

  • Good example to learn the API and get started with event patterns

  • Dynamically creates and removes event patterns based on price limit events received

  • Simple, highly-performant filter expressions for event properties in the stock tick event such as symbol and price

12.9. MatchMaker

In the MatchMaker example every mobile user has an X and Y location, a set of properties (gender, hair color, age range) and a set of preferences (one for each property) to match. The task of the event patterns created by this example is to detect mobile users that are within proximity given a certain range, and for which the properties match preferences.

The event class representing mobile users is com.espertech.esper.example.matchmaker.event.MobileUserBean. The com.espertech.esper.example.matchmaker.monitor.MatchMakingMonitor class contains the patterns for detecting matches.

Summary:

  • Dynamically creates and removes event patterns based on mobile user events received

  • Uses range matching for X and Y properties of mobile user events

12.10. QualityOfService

This example develops some code for measuring quality-of-service levels such as for a service-level agreement (SLA). An SLA is a contract between 2 parties that defines service constraints such as maximum latency for service operations or error rates.

The example measures and monitors operation latency and error counts per customer and operation. When one of our operations oversteps these constraints, we want to be alerted right away. Additionally, we would like to have some monitoring in place that checks the health of our service and provides some information on how the operations are used.

Some of the constraints we need to check are:

  • That the latency (time to finish) of some of the operations is always less than X seconds.

  • That the latency average is always less than Y seconds over Z operation invocations.

The com.espertech.esper.example.qos_sla.events.OperationMeasurement event class with its latency and status properties is the main event used for the SLA analysis. The other event LatencyLimit serves to set latency limits on the fly.

The com.espertech.esper.example.qos_sla.monitor.AverageLatencyMonitor creates an EPL statement that computes latency statistics per customer and operation for the last 100 events. The DynaLatencySpikeMonitor uses an event pattern to listen to spikes in latency with dynamically set limits. The ErrorRateMonitor uses the timer 'at' operator in an event pattern that wakes up periodically and polls the error rate within the last 10 minutes. The ServiceHealthMonitor simply alerts when 3 errors occur, and the SpikeAndErrorMonitor alerts when a fixed latency is overstepped or an error status is reported.

Summary:

  • This example combines event patterns with EPL statements for event stream analysis.

  • Shows the use of the timer 'at' operator and followed-by operator -> in event patterns

  • Outlines basic EPL statements

  • Shows how to pull data out of EPL statements rather than subscribing to events a statement publishes

12.11. LinearRoad

The Linear Road example is a very incomplete implementation of the Stream Data Management Benchmark [3] by Stanford University.

Linear Road simulates a toll system for the motor vehicle expressways of a large metropolitan area. The main event in this example is a car location report which the class com.espertech.esper.example.linearroad.CarLocEvent represents. Currently the event stream joins are performed by JUnit test classes in the examples/test folder. See the com.espertech.esper.example.linearroad.TestAccidentNotify and the TestCarSegmentCount classes. Please consider this a work in progress.

Summary:

  • Shows more complex joins between event streams.

12.12. StockTick RSI

The Relative Strength Index (RSI) indicates the trend of a stock. For a more complete explanation, see http://www.stockcharts.com/education/IndicatorAnalysis/indic_RSI.html.

After a definite number of stock events, or accumulation period, the first RSI is computed. Then for each subsequent stock event, the RSI calculations use the previous period’s Average Gain and Loss to determine the “smoothed RSI”.
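
The calculation can be sketched in plain Java as follows. The sketch uses the commonly published RSI definition with smoothed averages, RSI = 100 - 100 / (1 + RS) where RS is the average gain divided by the average loss; whether the example's listener computes exactly this is an assumption. RsiSketch is a hypothetical class, and the accumulation period is a constructor parameter.

```java
/** Hypothetical helper for illustration only; tracks a single stock. */
final class RsiSketch {
    private final int period;
    private double avgGain;
    private double avgLoss;
    private double previousPrice;
    private boolean hasPrevious;
    private int changesSeen;

    RsiSketch(int period) {
        this.period = period;
    }

    /** Returns the RSI for the new price, or NaN while still in the accumulation period. */
    double onPrice(double price) {
        if (!hasPrevious) {
            previousPrice = price;
            hasPrevious = true;
            return Double.NaN;
        }
        double change = price - previousPrice;
        previousPrice = price;
        double gain = Math.max(change, 0);
        double loss = Math.max(-change, 0);
        changesSeen++;
        if (changesSeen <= period) {
            // Accumulation: build up the simple average over the first 'period' changes
            avgGain += gain / period;
            avgLoss += loss / period;
            if (changesSeen < period) {
                return Double.NaN;
            }
        } else {
            // Smoothing: reuse the previous averages and blend in the newest change
            avgGain = (avgGain * (period - 1) + gain) / period;
            avgLoss = (avgLoss * (period - 1) + loss) / period;
        }
        if (avgLoss == 0) {
            return 100.0; // no losses in the window
        }
        double rs = avgGain / avgLoss;
        return 100.0 - 100.0 / (1.0 + rs);
    }
}
```

With a period of 2 and the prices 10, 11, 10, 12, the first RSI is available at the third price and equals 50, since average gain and average loss are both 0.5. The fourth price smooths the averages to 1.25 and 0.25, giving an RS of 5 and an RSI of about 83.33.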

Summary:

  • Uses a simple event pattern with a filter that feeds a listener. The listener computes the RSI and publishes events containing the computed RSI.


© 2007 EsperTech Inc. All Rights Reserved