Chapter 11. Extension and Plug-in

11.1. Custom View Implementation

Views in Esper are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug-in a new, custom view.

The following steps are required to develop and use a custom view with Esper.

  1. Implement a view factory class. View factories are classes that accept and check view parameters and instantiate the appropriate view class.

  2. Implement a view class. A view class commonly represents a data window or derives new information from a stream.

  3. Configure the view factory class supplying a view namespace and name in the engine configuration file.

The example view factory and view class that are used in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyTrendSpotterViewFactory and MyTrendSpotterView.

Views can make use of the following engine services available via StatementServiceContext:

  • The SchedulingService interface allows views to schedule timer callbacks to a view

  • The EventAdapterService interface allows views to create new event types and event instances of a given type.

  • The StatementStopService interface allows view to register a callback that the engine invokes to indicate that the view's statement has been stopped

Note that custom views may use engine services and APIs that can be subject to change between major releases. The engine services discussed above and view APIs are considered part of the engine internal public API and are stable. Any changes to such APIs are disclosed through the release change logs and history. Please also consider contributing your custom view to the Esper project team by submitting the view code through the mailing list or via a JIRA issue.

11.1.1. Implementing a View Factory

A view factory class is responsible for the following functions:

  • Accept zero, one or more view parameters. Validate and parse the parameters as required.

  • Validate that the parameterized view is compatible with its parent view. For example, validate that field names are valid in the event type of the parent view.

  • Instantiate the actual view class.

  • Provide information about the event type of events posted by the view.

View factory classes simply subclass com.espertech.esper.view.ViewFactorySupport:

public class MyTrendSpotterViewFactory extends ViewFactorySupport { ...

Your view factory class must implement the setViewParameters method to accept and parse view parameters. The next code snippet shows an implementation of this method. The code obtains a single field name parameter from the parameter list passed to the method:

public class MyTrendSpotterViewFactory extends ViewFactorySupport {
  private String fieldName;
  private EventType eventType;

  public void setViewParameters(ViewFactoryContext viewFactoryContext, 
                          List<Object> viewParameters) throws ViewParameterException
  {
    String errorMessage = "'Trend spotter' view require a single field name as a parameter";
    if (viewParameters.size() != 1) {
      throw new ViewParameterException(errorMessage);
    }

    if (!(viewParameters.get(0) instanceof String)) {
      throw new ViewParameterException(errorMessage);
    }

    fieldName = (String) viewParameters.get(0);
  }
  ...

After the engine supplied view parameters to the factory, the engine will ask the view to attach to its parent view and validate any field name parameters against the parent view's event type. If the view will be generating events of a different type then the events generated by the parent view, then the view factory can create the new event type in this method:

public void attach(EventType parentEventType, 
		StatementServiceContext statementServiceContext, 
		ViewFactory optionalParentFactory, 
		List<ViewFactory> parentViewFactories) 
		    throws ViewAttachException {
  String result = PropertyCheckHelper.checkNumeric(parentEventType, fieldName);
  if (result != null) {
    throw new ViewAttachException(result);
  }

  // create new event type
  Map<String, Class> eventTypeMap = new HashMap<String, Class>();
  eventTypeMap.put(PROPERTY_NAME, Long.class);
  eventType = statementServiceContext.getEventAdapterService().
                         createAnonymousMapType(eventTypeMap);
}

Finally, the engine asks the view factory to create a view instance:

public View makeView(StatementServiceContext statementServiceContext) {
  return new MyTrendSpotterView(statementServiceContext, fieldName);
}

11.1.2. Implementing a View

A view class is responsible for:

  • The setParent method informs the view of the parent view's event type

  • The update method receives insert streams and remove stream events from its parent view

  • The iterator method supplies an (optional) iterator to allow an application to pull or request results from an EPStatement

  • The cloneView method must make a configured copy of the view to enable the view to work in a grouping context together with a std:groupby parent view

View classes simply subclass com.espertech.esper.view.ViewSupport:

public class MyTrendSpotterView extends ViewSupport { ...

The view class must implement the setParent(Viewable parent) method. This is an opportunity for the view to initialize and obtain a fast event property getter for later use to obtain event property values. The next code snippet shows an implementation of this method:

public void setParent(Viewable parent) {
  super.setParent(parent);
  if (parent != null)  {
    fieldGetter = parent.getEventType().getGetter(fieldName);
  }
}

Your update method will be processing incoming (insert stream) and outgoing (remove stream) events posted by the parent view (if any), as well as providing incoming and outgoing events to child views. The convention required of your update method implementation is that the view releases any insert stream events (EventBean object references) which the view generates as reference-equal remove stream events (EventBean object references) at a later time.

The view implementation must call the updateChildren method to post outgoing insert and remove stream events. Similar to the update method, the updateChildren method takes insert and remove stream events as parameters.

A sample update method implementation that computes a number of events in an upward trend is shown below:

public final void update(EventBean[] newData, EventBean[] oldData) {
    // The remove stream most post the same exact object references of events that were posted as the insert stream
    EventBean[] removeStreamToPost;
    if (lastInsertStreamEvent != null) {
        removeStreamToPost = new EventBean[] {lastInsertStreamEvent};
    } 
    else {
        removeStreamToPost = new EventBean[] {populateMap(null)};
    }

    // add data points
    if (newData != null) {
        for (EventBean aNewData : newData) {
            double dataPoint = ((Number) fieldGetter.get(aNewData)).doubleValue();

            if (lastDataPoint == null) {
                trendcount = 1L;
            }
            else if (lastDataPoint < dataPoint) {
                trendcount++;
            }
            else if (lastDataPoint > dataPoint) {
                trendcount = 0L;
            }
            lastDataPoint = dataPoint;
        }
    }

    if (this.hasViews()) {
        EventBean newDataPost = populateMap(trendcount);
        lastInsertStreamEvent = newDataPost; 
        updateChildren(new EventBean[] {newDataPost}, removeStreamToPost);
    }
}

11.1.3. View Contract

The update method must adhere to the following conventions, to prevent memory leaks and to enable correct behavior within the engine:

  • A view implementation that posts events to the insert stream must post unique EventBean object references as insert stream events, and cannot post the same EventBean object reference multiple times. The underlying event to the EventBean object reference can be the same object reference, however the EventBean object reference posted by the view into the insert stream must be a new instance for each insert stream event.

  • If the custom view posts a continuous insert stream, then the views must also post a continuous remove stream (second parameter to the updateChildren method). If the view does not post remove stream events, it assumes unbound keep-all semantics.

  • EventBean events posted as remove stream events must be the same object reference as the EventBean events posted as insert stream by the view. Thus remove stream events posted by the view (the EventBean instances, does not affect the underlying representation) must be reference-equal to insert stream events posted by the view as part of an earlier invocation of the update method, or the same invocation of the update method.

  • EventBean events represent a unique observation. The values of the observation can be the same, thus the underlying representation of an EventBean event can be reused, however event property values must be kept immutable and not be subject to change.

The engine can provide a callback to the view indicating when a statement using the view is stopped. The callback is available to the view via the com.espertech.esper.view.StatementStopCallback interface. Your view code must subscribe to the stop callback in order for the engine to invoke the callback:

statementContext.getStatementStopService().addSubscriber(this);

Please refer to the sample views for a code sample on how to implement iterator and cloneView methods. The iterator of the custom view implementation does not need to be thread-safe.

11.1.4. Configuring View Namespace and Name

The view factory class name as well as the view namespace and name for the new view must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-view namespace="custom" name="trendspotter" 
      factory-class="com.espertech.esper.regression.view.MyTrendSpotterViewFactory" /> 
</esper-configuration>

The new view is now ready to use in a statement:

select * from StockTick.custom:trendspotter(price)

Note that the view must implement the copyView method to enable the view to work in a grouping context as shown in the next statement:

select * from StockTick.std:groupby(symbol).custom:trendspotter(price)

11.2. Custom Aggregation Functions

Aggregation functions aggregate event property values or expression results obtained from one or more streams. Examples for built-in aggregation functions are count(*), sum(price * volume) or avg(distinct volume).

The optional keyword distinct ensures that only distinct (unique) values are aggregated and duplicate values are ignored by the aggregation function. Custom plug-in aggregation functions do not need to implement the logic to handle distinct values. This is because when the engine encounters the distinct keyword, it eliminates any non-distinct values before passing the value for aggregation to the custom aggregation function.

Custom aggregation functions can also be passed multiple parameters using the array operator {}.

The following steps are required to develop and use a custom aggregation function with Esper.

  1. Implement an aggregation function class.

  2. Register the aggregation function class with the engine by supplying a function name, via the engine configuration file or the configuration API.

The code for the example aggregation function as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.

11.2.1. Implementing an Aggregation Function

An aggregation function class is responsible for the following functions:

  • Implement a validate method that validates the value type of the data points that the function must process.

  • Implement a getValueType method that returns the type of the aggregation value generated by the function. For example, the built-in count aggregation function returns Long.class as it generates long -typed values.

  • Implement an enter method that the engine invokes to add a data point into the aggregation, when an event enters a data window

  • Implement a leave method that the engine invokes to remove a data point from the aggregation, when an event leaves a data window

  • Implement a getValue method that returns the current value of the aggregation.

Aggregation function classes simply subclass com.espertech.esper.epl.agg.AggregationSupport:

public class MyConcatAggregationFunction extends AggregationSupport { ...

The engine generally constructs one instance of the aggregation function class for each time the function is listed in a statement, however the engine may decide to reduce the number of aggregation class instances if it finds equivalent aggregations. The constructor initializes the aggregation function:

public class MyConcatAggregationFunction extends AggregationSupport {
  private final static char DELIMITER = ' ';
  private StringBuilder builder;
  private String delimiter;

  public MyConcatAggregationFunction()
  {
    super();
    builder = new StringBuilder();
    delimiter = "";
  }
  ...

An aggregation function must provide an implementation of the validate method that is passed the result type of the expression within the aggregation function. Since the example concatenation function requires string types, it implements a type check:

public void validate(Class childNodeType) {
  if (childNodeType != String.class) {
    throw new IllegalArgumentException("Concat aggregation requires a String parameter");
  }
}

The enter method adds a datapoint to the current aggregation value. The example enter method shown below adds a delimiter and the string value to a string buffer:

public void enter(Object value) {
  if (value != null) {
    builder.append(delimiter);
    builder.append(value.toString());
    delimiter = String.valueOf(DELIMITER);
  }
}

Conversly, the leave method removes a datapoint from the current aggregation value. The example leave method removes from the string buffer:

public void leave(Object value) {
  if (value != null) {
    builder.delete(0, value.toString().length() + 1);
  }
}

In order for the engine to validate the type returned by the aggregation function against the types expected by enclosing expressions, the getValueType must return the result type of any values produced by the aggregation function:

public Class getValueType() {
  return String.class;
}

Finally, the engine obtains the current aggregation value by means of the getValue method:

public Object getValue() {
  return builder.toString();
}

11.2.2. Configuring Aggregation Function Name

The aggregation function class name as well as the function name for the new aggregation function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-aggregation-function name="concat" 
    function-class="com.espertech.esper.regression.client.MyConcatAggregationFunction" />
</esper-configuration>

The new aggregation function is now ready to use in a statement:

select concat(symbol) from StockTick.win:length(3)

11.3. Custom Pattern Guard

Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression, and can filter the events fired by the subexpression.

The following steps are required to develop and use a custom guard object with Esper.

  1. Implement a guard factory class, responsible for creating guard object instances.

  2. Implement a guard class.

  3. Register the guard factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example guard object as shown in this chapter can be found in the test source folder in the package com.espertech.esper.regression.client by the name MyCountToPatternGuardFactory. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and end the sub-expression when that maximum is reached.

11.3.1. Implementing a Guard Factory

A guard factory class is responsible for the following functions:

  • Implement a setGuardParameters method that validates guard parameters.

  • Implement a makeGuard method that constructs a new guard instance.

Guard factory classes subclass com.espertech.esper.pattern.guard.GuardFactorySupport:

public class MyCountToPatternGuardFactory extends GuardFactorySupport { ...

The engine constructs one instance of the guard factory class for each time the guard is listed in a statement.

The guard factory class implements the setGuardParameters method that is passed the parameters to the guard as supplied by the statement. It verifies the guard parameters, similar to the code snippet shown next. Our example counter guard takes a single numeric parameter:

public void setGuardParameters(List<Object> guardParameters) throws GuardParameterException {
  if (guardParameters.size() != 1) {
    throw new GuardParameterException("Count-to guard takes a single integer parameter");
  }
  if (!(guardParameters.get(0) instanceof Integer)) {
    throw new GuardParameterException("Count-to guard takes a single integer parameter");
  }
  numCountTo = (Integer) guardParameters.get(0);
}

The makeGuard method is called by the engine to create a new guard instance. The example makeGuard method shown below passes the maximum count of events to the guard instance. It also passes a Quitable implementation to the guard instance. The guard uses Quitable to indicate that the sub-expression contained within must stop (quit) listening for events.

public Guard makeGuard(PatternContext context, Quitable quitable, 
  Object stateNodeId, Object guardState) {
    return new MyCountToPatternGuard(numCountTo, quitable);
}

11.3.2. Implementing a Guard Class

A guard class has the following responsibilities:

  • Provides a startGuard method that initalizes the guard.

  • Provides a stopGuard method that stops the guard, called by the engine when the whole pattern is stopped, or the sub-expression containing the guard is stopped.

  • Provides an inspect method that the pattern engine invokes to determine if the guard lets matching events pass for further evaluation by the containing expression.

Guard classes subclass com.espertech.esper.pattern.guard.GuardSupport as shown here:

public abstract class GuardSupport implements Guard { ...

The engine invokes the guard factory class to construct an instance of the guard class for each new sub-expression instance within a statement.

A guard class must provide an implementation of the startGuard method that the pattern engine invokes to start a guard instance. In our example, the method resets the guard's counter to zero:

public void startGuard() {
  counter = 0;
}

The pattern engine invokes the inspect method for each time the sub-expression indicates a new event result. Our example guard needs to count the number of events matched, and quit if the maximum number is reached:

public boolean inspect(MatchedEventMap matchEvent) {
  counter++;
  if (counter > numCountTo) {
    quitable.guardQuit();
    return false;
  }
  return true;
}

The inspect method returns true for events that pass the guard, and false for events that should not pass the guard.

11.3.3. Configuring Guard Namespace and Name

The guard factory class name as well as the namespace and name for the new guard must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-pattern-guard namespace="myplugin" name="count_to" 
      factory-class="com.espertech.esper.regression.client.MyCountToPatternGuardFactory"/>
</esper-configuration>

The new guard is now ready to use in a statement. The next pattern statement detects the first 10 MyEvent events:

select * from pattern [(every MyEvent) where myplugin:count_to(10)]

Note that the every keyword was placed within parentheses to ensure the guard controls the repeated matching of events.

11.4. Custom Pattern Observer

Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:

  • Implement custom scheduling logic using the engine's own scheduling and timer services

  • Test conditions related to prior events matching an expression

The following steps are required to develop and use a custom observer object within pattern statements:

  1. Implement an observer factory class, responsible for creating observer object instances.

  2. Implement an observer class.

  3. Register an observer factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example observer object as shown in this chapter can be found in the test source folder in package com.espertech.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here very simply checks if a file exists, using the filename supplied by the pattern statement, and via the java.io.File class.

11.4.1. Implementing an Observer Factory

An observer factory class is responsible for the following functions:

  • Implement a setObserverParameters method that validates observer parameters.

  • Implement a makeObserver method that constructs a new observer instance.

Observer factory classes subclass com.espertech.esper.pattern.observer.ObserverFactorySupport:

public class MyFileExistsObserverFactory extends ObserverFactorySupport { ...

The engine constructs one instance of the observer factory class for each time the observer is listed in a statement.

The observer factory class implements the setObserverParameters method that is passed the parameters to the observer as supplied by the statement. It verifies the observer parameters, similar to the code snippet shown next. Our example file-exists observer takes a single string parameter:

public void setObserverParameters(List<Object> observerParameters) 
			throws ObserverParameterException {
  String message = "File exists observer takes a single string filename parameter";
  if (observerParameters.size() != 1) {
    throw new ObserverParameterException(message);
  }
  if (!(observerParameters.get(0) instanceof String)) {
    throw new ObserverParameterException(message);
  }
  
  filename = observerParameters.get(0).toString();
}

The pattern engine calls the makeObserver method to create a new observer instance. The example makeObserver method shown below passes parameters to the observer instance:

public EventObserver makeObserver(PatternContext context, 
			MatchedEventMap beginState, 
			ObserverEventEvaluator observerEventEvaluator, 
			Object stateNodeId, 
			Object observerState) {
  return new MyFileExistsObserver(beginState, observerEventEvaluator, filename);
}

The ObserverEventEvaluator parameter allows an observer to indicate events, and to indicate change of truth value to permanently false. Use this interface to indicate when your observer has received or witnessed an event, or changed it's truth value to true or permanently false.

The MatchedEventMap parameter provides a Map of all matching events for the expression prior to the observer's start. For example, consider a pattern as below:

a=MyEvent -> myplugin:my_observer(...)

The above pattern tagged the MyEvent instance with the tag "a". The pattern engine starts an instance of my_observer when it receives the first MyEvent. The observer can query the MatchedEventMap using "a" as a key and obtain the tagged event.

11.4.2. Implementing an Observer Class

An observer class has the following responsibilities:

  • Provides a startObserve method that starts the observer.

  • Provides a stopObserve method that stops the observer, called by the engine when the whole pattern is stopped, or the sub-expression containing the observer is stopped.

Observer classes subclass com.espertech.esper.pattern.observer.ObserverSupport as shown here:

public class MyFileExistsObserver implements EventObserver { ...

The engine invokes the observer factory class to construct an instance of the observer class for each new sub-expression instance within a statement.

An observer class must provide an implementation of the startObserve method that the pattern engine invokes to start an observer instance. In our example, the observer checks for the presence of a file and indicates the truth value to the remainder of the expression:

public void startObserve() {
  File file = new File(filename);
  if (file.exists()) {
    observerEventEvaluator.observerEvaluateTrue(beginState);
  } 
  else {
    observerEventEvaluator.observerEvaluateFalse(); 
  }
}

Note the observer passes the ObserverEventEvaluator an instance of MatchedEventMap. The observer can also create one or more new events and pass these events through the Map to the remaining expressions in the pattern.

11.4.3. Configuring Observer Namespace and Name

The observer factory class name as well as the namespace and name for the new observer must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML however the same options are available through the configuration API:

<esper-configuration
  <plugin-pattern-observer namespace="myplugin" name="file_exists" 
    factory-class="com.espertech.esper.regression.client.MyFileExistsObserverFactory" />
</esper-configuration>

The new observer is now ready to use in a statement. The next pattern statement checks every 10 seconds if the given file exists, and indicates to the listener when the file is found.

select * from pattern [every timer:interval(10 sec) -> myplugin:file_exists("myfile.txt")]

11.5. Custom Event Representation

Creating a plug-in event representation can be useful under any of these conditions:

  • Your application has existing Java classes that carry event metadata and event property values and your application does not want to (or cannot) extract or transform such event metadata and event data into one of the built-in event representations (POJO Java objects, Map or XML DOM).

  • Your application wants to provide a faster or short-cut access path to event data, for example to access XML event data through a Streaming API for XML (StAX).

  • Your application must perform a network lookup or other dynamic resolution of event type and events.

Note that the classes to plug-in custom event representations are held stable between minor releases, but can be subject to change between major releases.

Currently, EsperIO provides the following additional event representations:

  • Apache Axiom provides access to XML event data on top of the fast Streaming API for XML (StAX).

The source code is available for these and they are therefore excellent examples for how to implement a plug-in event representation. Please see the EsperIO documentation for usage details.

11.5.1. How It Works

Your application provides a plug-in event representation as an implementation of the com.espertech.esper.plugin.PlugInEventRepresentation interface. It registers the implementation class in the Configuration and at the same time provides a unique URI. This URI is called the root event representation URI. An example value for a root URI is type://xml/apacheaxiom/OMNode.

One can register multiple plug-in event representations. Each representation has a root URI. The root URI serves to divide the overall space of different event representations and plays a role in resolving event types and event objects.

There are two situations in an Esper engine instance asks an event representation for an event type:

  1. When an application registers a new event type alias using the method addPlugInEventType on ConfigurationOperations, either at runtime or at configuration time.

  2. When an EPL statement is created with a new event type alias (one not seen before) and the URIs for resolving such names are set beforehand via setPlugInEventTypeAliasResolutionURIs on ConfigurationOperations.

The implementation of the PlugInEventRepresentation interface must provide implementations for two key interfaces: com.espertech.esper.event.EventType and EventBean. It must also implement several other related interfaces as described below.

The EventType methods provide event metadata including property names and property types. They also provide instances of EventPropertyGetter for retrieving event property values. Each instance of EventType represents a distinct type of event.

The EventBean implementation is the event itself and encapsulates the underlying event object.

11.5.2. Steps

Follow the steps outlined below to process event objects for your event types:

  1. Implement the EventType, EventPropertyGetter and EventBean interfaces.

  2. Implement the PlugInEventRepresentation interface, the PlugInEventTypeHandler and PlugInEventBeanFactory interfaces, then add the PlugInEventRepresentation class name to configuration.

  3. Register plug-in event types, and/or set the event type alias resolution URIs, via configuration.

  4. Obtain an EventSender from EPRuntime via the getEventSender(URI[]) method and use that to send in your event objects.

Please consult the JavaDoc for further information on each of the interfaces and their respective methods. The Apache Axiom event representation is an example implementation that can be found in the EsperIO packages.

11.5.3. URI-based Resolution

Assume you have registered event representations using the following URIs:

  1. type://myFormat/myProject/myName

  2. type://myFormat/myProject

  3. type://myFormat/myOtherProject

When providing an array of child URIs for resolution, the engine compares each child URI to each of the event representation root URIs, in the order provided. Any event representation root URIs that spans the child URI space becomes a candidate event representation. If multiple root URIs match, the order is defined by the more specific root URI first, to the least specific root URI last.

During event type resolution and event sender resolution you provide a child URI. Assuming the child URI provided is type://myFormat/myProject/myName/myEvent?param1=abc&param2=true. In this example both root URIs #1 (the more specific) and #1 (the less specific) match, while root URI #3 is not a match. Thus at the time of type resolution the engine invokes the acceptType method on event presentation for URI #1 first (the more specific), before asking #2 (the less specific) to resolve the type.

The EventSender returned by the getEventSender(URI[]) method follows the same scheme. The event sender instance asks each matching event representation for each URI to resolve the event object in the order of most specific to least specific root URI, and the first event representation to return an instance of EventBean ends the resolution process for event objects.

The type:// part of the URI is an optional convention for the scheme part of an URI that your application may follow. URIs can also be simple names and can include parameters, as the Java software JavaDoc documents for class java.net.URI.

11.5.4. Example

This section implements a minimal sample plug-in event representation. For the sake of keeping the example easy to understand, the event representation is rather straightforward: an event is a java.util.Properties object that consists of key-values pairs of type string.

The code shown next does not document method footprints. Please consult the JavaDoc API documentation for method details.

11.5.4.1. Sample Event Type

First, the sample shows how to implement the EventType interface. The event type provides information about property names and types, as well as supertypes of the event type.

Our EventType takes a set of valid property names:

public class MyPlugInPropertiesEventType implements EventType {
  private final Set<String> properties;

  public MyPlugInPropertiesEventType(Set<String> properties) {
    this.properties = properties;
  }

  public Class getPropertyType(String property) {
    if (!isProperty(property)) {
      return null;
    }
    return String.class;
  }

  public Class getUnderlyingType() {
    return Properties.class;
  }
  
  //... further methods below
}

An EventType is responsible for providing implementations of EventPropertyGetter to query actual events. The getter simply queries the Properties object underlying each event:

  public EventPropertyGetter getGetter(String property) {
    final String propertyName = property;
    
    return new EventPropertyGetter() {
      public Object get(EventBean eventBean) throws PropertyAccessException {
        MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
        return propBean.getProperties().getProperty(propertyName);
      }
      
      public boolean isExistsProperty(EventBean eventBean) {
        MyPlugInPropertiesEventBean propBean = (MyPlugInPropertiesEventBean) eventBean;
        return propBean.getProperties().getProperty(propertyName) != null;
      }
    };
  }

Our sample EventType does not have supertypes. Supertypes represent an extends-relationship between event types, and subtypes are expected to exhibit the same event property names and types as each of their supertypes combined:

  public EventType[] getSuperTypes() {
    return null;
  }

  public Iterator<EventType> getDeepSuperTypes() {
    return null;
  }

11.5.4.2. Sample Event Bean

Each EventBean instance represents an event. The interface is straightforward to implement. In this example an event is backed by a Properties object:

public class MyPlugInPropertiesEventBean implements EventBean {
  private final MyPlugInPropertiesEventType eventType;
  private final Properties properties;

  public MyPlugInPropertiesEventBean(MyPlugInPropertiesEventType eventType, 
        Properties properties) {
    this.eventType = eventType;
    this.properties = properties;
  }

  public EventType getEventType() {
    return eventType;
  }

  public Object get(String property) throws PropertyAccessException {
    EventPropertyGetter getter = eventType.getGetter(property);
    return getter.get(this);
  }

  public Object getUnderlying() {
    return properties;
  }

  protected Properties getProperties() {
    return properties;
  }    
}

11.5.4.3. Sample Event Representation

A PlugInEventRepresentation serves to create EventType and EventBean instances through its related interfaces.

The sample event representation creates MyPlugInPropertiesEventType and MyPlugInPropertiesEventBean instances. The PlugInEventTypeHandler returns the EventType instance and an EventSender instance.

Our sample event representation accepts all requests for event types by returning boolean true on the acceptType method. When asked for the PlugInEventTypeHandler, it constructs a new EventType. The list of property names for the new type is passed as an initialization value provided through the configuration API or XML, as a comma-separated list of property names:

public class MyPlugInEventRepresentation implements PlugInEventRepresentation {

  private List<MyPlugInPropertiesEventType> types;

  public void init(PlugInEventRepresentationContext context) {
    types = new ArrayList<MyPlugInPropertiesEventType>();
  }

  public boolean acceptsType(PlugInEventTypeHandlerContext context) {
    return true;
  }

  public PlugInEventTypeHandler getTypeHandler(PlugInEventTypeHandlerContext eventTypeContext) {
    String proplist = (String) eventTypeContext.getTypeInitializer();
    String[] propertyList = proplist.split(",");

    Set<String> typeProps = new HashSet<String>(Arrays.asList(propertyList));

    MyPlugInPropertiesEventType eventType = new MyPlugInPropertiesEventType(typeProps);
    types.add(eventType);

    return new MyPlugInPropertiesEventTypeHandler(eventType);
  }
  // ... more methods below
}

The PlugInEventTypeHandler simply returns the EventType as well as an implementation of EventSender for processing same-type events:

public class MyPlugInPropertiesEventTypeHandler implements PlugInEventTypeHandler {
  private final MyPlugInPropertiesEventType eventType;

  public MyPlugInPropertiesEventTypeHandler(MyPlugInPropertiesEventType eventType) {
    this.eventType = eventType;
  }

  public EventSender getSender(EPRuntimeEventSender runtimeEventSender) {
    return new MyPlugInPropertiesEventSender(eventType, runtimeEventSender);
  }

  public EventType getType() {
    return eventType;
  }
}

The EventSender returned by PlugInEventTypeHandler is expected process events of the same type or any subtype:

public class MyPlugInPropertiesEventSender implements EventSender {
  private final MyPlugInPropertiesEventType type;
  private final EPRuntimeEventSender runtimeSender;

  public MyPlugInPropertiesEventSender(MyPlugInPropertiesEventType type, 
        EPRuntimeEventSender runtimeSender) {
    this.type = type;
    this.runtimeSender = runtimeSender;
  }

  public void sendEvent(Object event) {
    if (!(event instanceof Properties)) {
       throw new EPException("Sender expects a properties event");
    }
    EventBean eventBean = new MyPlugInPropertiesEventBean(type, (Properties) event);
    runtimeSender.processWrappedEvent(eventBean);
  }
}

11.5.4.4. Sample Event Bean Factory

The plug-in event representation may optionally provide an implementation of PlugInEventBeanFactory. A PlugInEventBeanFactory may inspect event objects and assign an event type dynamically based on resolution URIs and event properties.

Our sample event representation accepts all URIs and returns a MyPlugInPropertiesBeanFactory:

public class MyPlugInEventRepresentation implements PlugInEventRepresentation {

  // ... methods as seen earlier
  public boolean acceptsEventBeanResolution(
        PlugInEventBeanReflectorContext eventBeanContext) {
    return true;
  }

  public PlugInEventBeanFactory getEventBeanFactory(
        PlugInEventBeanReflectorContext eventBeanContext) {
    return new MyPlugInPropertiesBeanFactory(types);
   }
}

Last, the sample MyPlugInPropertiesBeanFactory implements the PlugInEventBeanFactory interface. It inspects incoming events and determines an event type based on whether all properties for that event type are present:

public class MyPlugInPropertiesBeanFactory implements PlugInEventBeanFactory {
  private final List<MyPlugInPropertiesEventType> knownTypes;

  public MyPlugInPropertiesBeanFactory(List<MyPlugInPropertiesEventType> types) {
    knownTypes = types;
  }

  public EventBean create(Object event, URI resolutionURI) {
    Properties properties = (Properties) event;

    // use the known types to determine the type of the object
    for (MyPlugInPropertiesEventType type : knownTypes) {
      // if there is one property the event does not contain, then its not the right type
      boolean hasAllProperties = true;
      for (String prop : type.getPropertyNames()) {
        if (!properties.containsKey(prop)) {
          hasAllProperties = false;
          break;
        }
      }

      if (hasAllProperties) {
        return new MyPlugInPropertiesEventBean(type, properties);
      }
    }
    return null; // none match, unknown event
  }
}

© 2008 EsperTech Inc. All Rights Reserved