The use case involves tracking three components of a transaction. Each component comes to the engine as an event with the following fields:
In addition, we have the following extra fields.
In event A:
In event C:
We need to take in events A, B and C and produce a single, combined event with the following fields:
In effect, we match the transaction IDs across the three events to form an aggregate event. If all of these events were stored in a relational database, a simple SQL join would do the job, but at 10,000 events per second that would require some serious database hardware.
Further, we need to produce the following:
We need to detect a transaction that did not make it through all three events. In other words, a transaction with events A or B, but not C. Note that, in this case, what we care about is event C. The lack of events A or B could indicate a failure in the event transport and should be ignored. Although the lack of an event C could also be a transport failure, it merits looking into.
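The rule above can be sketched in plain Java as simple set arithmetic. This is only an illustration of the requirement, not how Esper evaluates it; the class name MissingEventC and its method are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Esper: finds transactions that are missing event C.
final class MissingEventC {

    // Returns the transaction IDs seen in event A or event B for which no event C was seen.
    static Set<String> incomplete(Set<String> idsA, Set<String> idsB, Set<String> idsC) {
        Set<String> result = new TreeSet<String>(idsA); // sorted for stable output
        result.addAll(idsB);    // union: seen in A or B
        result.removeAll(idsC); // difference: but not in C
        return result;
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<String>(Arrays.asList("t1", "t2", "t3"));
        Set<String> b = new HashSet<String>(Arrays.asList("t1", "t4"));
        Set<String> c = new HashSet<String>(Arrays.asList("t1", "t3", "t5"));
        // t5 has only event C, so it is ignored rather than reported.
        System.out.println(incomplete(a, b, c)); // prints [t2, t4]
    }
}
```

A transaction that has event C but lacks A or B never appears in the result, which matches the decision to ignore that case.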
Events in Esper are represented via regular Java classes that expose JavaBean-style "getter" methods for access to event properties. Since all components of a transaction have a transaction id and timestamp field, we create a base event class with the common fields as below.
    public class TxnEventBase {
        private String transactionId;
        private long timestamp;

        public TxnEventBase(String transactionId, long timestamp) {
            this.transactionId = transactionId;
            this.timestamp = timestamp;
        }

        public String getTransactionId() {
            return transactionId;
        }

        public long getTimestamp() {
            return timestamp;
        }
    }
Event A in the 3-component transaction can now simply extend the TxnEventBase class and add the customer id field.
    public class TxnEventA extends TxnEventBase {
        private String customerId;

        public TxnEventA(String transactionId, long timestamp, String customerId) {
            super(transactionId, timestamp);
            this.customerId = customerId;
        }

        public String getCustomerId() {
            return customerId;
        }
    }
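The classes for events B and C are not shown here. A minimal sketch of how they might look follows, under two assumptions that are not confirmed by the text: event B carries only the common fields, and event C adds a supplier id (inferred from the supplierId column used in the EQL statements). The base class is repeated so that the sketch compiles on its own.

```java
// Copy of the base class so that this sketch is self-contained.
// The classes are package-private only so that they fit in one file;
// in a real project each would be a public class in its own file.
class TxnEventBase {
    private final String transactionId;
    private final long timestamp;

    public TxnEventBase(String transactionId, long timestamp) {
        this.transactionId = transactionId;
        this.timestamp = timestamp;
    }

    public String getTransactionId() { return transactionId; }

    public long getTimestamp() { return timestamp; }
}

// Assumption: event B has no fields beyond the common ones.
class TxnEventB extends TxnEventBase {
    public TxnEventB(String transactionId, long timestamp) {
        super(transactionId, timestamp);
    }
}

// Assumption: event C adds the supplier id selected by the EQL statements.
class TxnEventC extends TxnEventBase {
    private final String supplierId;

    public TxnEventC(String transactionId, long timestamp, String supplierId) {
        super(transactionId, timestamp);
        this.supplierId = supplierId;
    }

    public String getSupplierId() { return supplierId; }
}
```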
Now that we have defined the event classes, we can compose an EQL statement to detect combined events in which each component of the transaction is present. We restrict the event matching to the events that arrived within the last 30 minutes.
    insert into CombinedEvent(transactionId, customerId, supplierId, latencyAC, latencyBC, latencyAB)
    select C.transactionId, customerId, supplierId,
           C.timestamp - A.timestamp,
           C.timestamp - B.timestamp,
           B.timestamp - A.timestamp
    from TxnEventA.win:time(30 minutes) A,
         TxnEventB.win:time(30 minutes) B,
         TxnEventC.win:time(30 minutes) C
    where A.transactionId = B.transactionId and B.transactionId = C.transactionId
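To make the join semantics concrete, the sketch below mimics the statement above in plain Java: it buffers timestamps per transaction id and emits the three latencies once events A, B and C have all arrived. It is an illustration only, not how Esper implements joins. The class TxnJoinSketch is hypothetical, and the 30-minute window expiry is deliberately left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java illustration of the three-way join on transaction id.
final class TxnJoinSketch {

    // Timestamps seen so far for one transaction id; null means "not yet seen".
    private static final class Partial {
        Long tsA;
        Long tsB;
        Long tsC;
    }

    private final Map<String, Partial> pending = new HashMap<String, Partial>();

    // Records one event of type 'A', 'B' or 'C'. Returns {latencyAC, latencyBC, latencyAB}
    // once all three events of the transaction have arrived, otherwise null.
    long[] onEvent(char type, String transactionId, long timestamp) {
        if (type != 'A' && type != 'B' && type != 'C') {
            throw new IllegalArgumentException("unknown event type: " + type);
        }
        Partial p = pending.get(transactionId);
        if (p == null) {
            p = new Partial();
            pending.put(transactionId, p);
        }
        if (type == 'A') {
            p.tsA = timestamp;
        } else if (type == 'B') {
            p.tsB = timestamp;
        } else {
            p.tsC = timestamp;
        }
        if (p.tsA == null || p.tsB == null || p.tsC == null) {
            return null; // still waiting for at least one component
        }
        pending.remove(transactionId);
        // Same expressions as in the select clause of the EQL statement.
        return new long[] { p.tsC - p.tsA, p.tsC - p.tsB, p.tsB - p.tsA };
    }

    public static void main(String[] args) {
        TxnJoinSketch join = new TxnJoinSketch();
        join.onEvent('A', "txn1", 1000L);
        join.onEvent('B', "txn1", 1400L);
        long[] latencies = join.onEvent('C', "txn1", 2000L);
        System.out.println(Arrays.toString(latencies)); // prints [1000, 600, 400]
    }
}
```

A complete implementation would also have to discard pending entries older than the window, which is the bookkeeping that the win:time(30 minutes) views take care of in the EQL version.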
This statement uses the insert into syntax to generate a CombinedEvent event stream. The next section uses that stream to derive realtime data on latencies between events.
We can now use the CombinedEvent stream to derive realtime summary data.
To derive the minimum, maximum and average total latency from the events (difference in time between A and C) over the past 30 minutes we can use the EQL below.
    select min(latencyAC) as minLatencyAC,
           max(latencyAC) as maxLatencyAC,
           avg(latencyAC) as avgLatencyAC
    from CombinedEvent.win:time(30 minutes)
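For readers unfamiliar with time windows, the following plain-Java sketch shows the idea behind the statement above: keep the latencies that arrived within the window, drop older ones, and aggregate what remains. It is not Esper's implementation. The class LatencyWindow is hypothetical, it assumes events are added in non-decreasing time order, and treating an entry that is exactly one window length old as expired is an arbitrary choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a sliding time window with min/max/avg aggregation.
final class LatencyWindow {
    private final long windowMillis;
    // Each entry is {arrivalTimeMillis, latency}, oldest first.
    private final Deque<long[]> entries = new ArrayDeque<long[]>();

    LatencyWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Adds a latency observed at nowMillis and expires entries that have left the window.
    void add(long nowMillis, long latency) {
        entries.addLast(new long[] { nowMillis, latency });
        while (!entries.isEmpty() && entries.peekFirst()[0] <= nowMillis - windowMillis) {
            entries.removeFirst();
        }
    }

    // Returns {min, max, avg} over the current window contents, or null if the window is empty.
    double[] summary() {
        if (entries.isEmpty()) {
            return null;
        }
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        long sum = 0;
        for (long[] e : entries) {
            min = Math.min(min, e[1]);
            max = Math.max(max, e[1]);
            sum += e[1];
        }
        return new double[] { min, max, (double) sum / entries.size() };
    }

    public static void main(String[] args) {
        LatencyWindow window = new LatencyWindow(30 * 60 * 1000L);
        window.add(0L, 100L);
        window.add(10 * 60 * 1000L, 300L);
        double[] s = window.summary();
        System.out.println(s[0] + " " + s[1] + " " + s[2]); // prints 100.0 300.0 200.0
    }
}
```

The sketch recomputes the aggregates on every call, which is simple but linear in the window size.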
We can derive this data per customer id or per supplier id by adding a group by clause. The statement below groups by customer id.
    select customerId,
           min(latencyAC) as minLatencyAC,
           max(latencyAC) as maxLatencyAC,
           avg(latencyAC) as avgLatencyAC
    from CombinedEvent.win:time(30 minutes)
    group by customerId
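The effect of the group by clause can be illustrated in plain Java by keeping one set of running statistics per customer id. This sketch shows the grouping only: the class LatencyByCustomer is hypothetical, and it never expires old values, so it does not reproduce the 30-minute window of the statement above.

```java
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-group aggregation; not Esper's implementation.
final class LatencyByCustomer {
    // One statistics accumulator per customer id, sorted by id for stable iteration.
    private final Map<String, LongSummaryStatistics> stats =
            new TreeMap<String, LongSummaryStatistics>();

    // Adds one latencyAC value to the group of the given customer.
    void add(String customerId, long latencyAC) {
        stats.computeIfAbsent(customerId, k -> new LongSummaryStatistics()).accept(latencyAC);
    }

    // Returns the statistics for one customer, or null if no value was added for it.
    LongSummaryStatistics forCustomer(String customerId) {
        return stats.get(customerId);
    }

    public static void main(String[] args) {
        LatencyByCustomer byCustomer = new LatencyByCustomer();
        byCustomer.add("customer1", 100L);
        byCustomer.add("customer1", 300L);
        byCustomer.add("customer2", 50L);
        LongSummaryStatistics s = byCustomer.forCustomer("customer1");
        System.out.println(s.getMin() + " " + s.getMax() + " " + s.getAverage()); // prints 100 300 200.0
    }
}
```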
An outer join allows us to detect a transaction that did not make it through all three events.
    select * from
        TxnEventA.win:time(30 minutes) A
        full outer join TxnEventC.win:time(60 minutes) C on A.transactionId = C.transactionId
        full outer join TxnEventB.win:time(30 minutes) B on B.transactionId = C.transactionId
    where C.transactionId is null
When TxnEventA or TxnEventB events leave their respective time windows, which hold the last 30 minutes of events, the where clause keeps only those rows for which no matching TxnEventC was found.
The Java code snippet below shows how an engine instance can be obtained. The Configuration is used to make the event classes known to the engine. This is optional and makes the EQL statements easier to read. Esper also supports fully-qualified Java class names in EQL statements.
    Configuration configuration = new Configuration();
    configuration.addEventTypeAlias("TxnEventA", TxnEventA.class.getName());
    configuration.addEventTypeAlias("TxnEventB", TxnEventB.class.getName());
    configuration.addEventTypeAlias("TxnEventC", TxnEventC.class.getName());

    EPServiceProvider epService =
        EPServiceProviderManager.getProvider("TransactionExample", configuration);
The administrative interface on the engine creates statements:
    // Each fragment ends with a space so that the concatenated EQL stays well-formed.
    String stmtSupplier =
        "select supplierId, min(latencyAC) as minLatency, max(latencyAC) as maxLatency, avg(latencyAC) as avgLatency " +
        "from CombinedEvent.win:time(30 minutes) " +
        "group by supplierId";
    EPStatement bySupplierStatement = epService.getEPAdministrator().createEQL(stmtSupplier);
    bySupplierStatement.addListener(myListener);
Listeners receive events published by statements and can query the received events. The listener interface receives events entering the window(s) as new events (a.k.a. istream or insert stream) and events leaving the window(s) as old events (a.k.a. rstream or remove stream).
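    public class RealtimeSummaryTotalsListener implements UpdateListener {
        // "log" is a logger field of this class; its declaration is not shown here.
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            // newEvents can be null or empty when an update carries old events only.
            if (newEvents == null || newEvents.length == 0) {
                return;
            }
            EventBean event = newEvents[0];
            log.debug(" Totals minAC=" + event.get("minLatencyAC") +
                      " maxAC=" + event.get("maxLatencyAC") +
                      " avgAC=" + event.get("avgLatencyAC"));
        }
    }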
Finally, events are sent for processing by the engine via the runtime interface:
    TxnEventA a = new TxnEventA("txnid1", 1, "customerId1");
    epService.getEPRuntime().sendEvent(a);
The complete example code can be found in the "eg" folder of the distribution, in package net.esper.example.transaction. The example contains a transaction simulator that can be invoked from the command line. The readme file in the "eg/etc" folder contains build instructions and the command line parameters for running the simulator. You do not need to build the Esper engine to run the example, because the Esper jar file is included in the distribution.
Please consult the examples section in the reference documentation for more information.