Chapter 11. Extension and Plug-in

11.1. Overview

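Esper can currently be extended by these means:

  • Custom view implementation

  • Custom aggregation functions

  • Custom pattern guards

  • Custom pattern observers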

11.2. Custom View Implementation

Views in Esper are used to derive information from an event stream, and to represent data windows onto an event stream. This chapter describes how to plug in a new, custom view.

The following steps are required to develop and use a custom view with Esper.

  1. Implement a view factory class. View factories are classes that accept and check view parameters and instantiate the appropriate view class.

  2. Implement a view class. A view class commonly represents a data window or derives new information from a stream.

  3. Configure the view factory class supplying a view namespace and name in the engine configuration file.

The example view factory and view class that are used in this chapter can be found in the test source folder in the package net.esper.regression.client by the name MyTrendSpotterViewFactory and MyTrendSpotterView.

Views can make use of the following engine services available via StatementServiceContext:

  • The SchedulingService interface allows views to schedule timer callbacks to a view

  • The EventAdapterService interface allows views to create new event types and event instances of a given type.

  • The StatementStopService interface allows a view to register a callback that the engine invokes to indicate that the view's statement has been stopped

Note that custom views may use engine services and APIs that can be subject to change between major releases. The engine services discussed above and view APIs are considered part of the engine internal public API and are stable. Any changes to such APIs are disclosed through the release change logs and history. Please also consider contributing your custom view to the Esper project team by submitting the view code through the mailing list or via a JIRA issue.

11.2.1. Implementing a View Factory

A view factory class is responsible for the following functions:

  • Accept zero, one or more view parameters. Validate and parse the parameters as required.

  • Validate that the parameterized view is compatible with its parent view. For example, validate that field names are valid in the event type of the parent view.

  • Instantiate the actual view class.

  • Provide information about the event type of events posted by the view.

View factory classes simply subclass net.esper.view.ViewFactorySupport:

public class MyTrendSpotterViewFactory extends ViewFactorySupport { ...

Your view factory class must implement the setViewParameters method to accept and parse view parameters. The next code snippet shows an implementation of this method. The code obtains a single field name parameter from the parameter list passed to the method:

public class MyTrendSpotterViewFactory extends ViewFactorySupport {
  private String fieldName;
  private EventType eventType;

  public void setViewParameters(ViewFactoryContext viewFactoryContext, 
                          List<Object> viewParameters) throws ViewParameterException
  {
    String errorMessage = "'Trend spotter' view requires a single field name as a parameter";
    if (viewParameters.size() != 1) {
      throw new ViewParameterException(errorMessage);
    }

    if (!(viewParameters.get(0) instanceof String)) {
      throw new ViewParameterException(errorMessage);
    }

    fieldName = (String) viewParameters.get(0);
  }
  ...

After the engine has supplied the view parameters to the factory, it asks the view factory to attach to its parent view and to validate any field name parameters against the parent view's event type. If the view generates events of a different type than the events generated by the parent view, the view factory can create the new event type in this method:

public void attach(EventType parentEventType, 
		StatementServiceContext statementServiceContext, 
		ViewFactory optionalParentFactory, 
		List<ViewFactory> parentViewFactories) 
		    throws ViewAttachException {
  String result = PropertyCheckHelper.checkNumeric(parentEventType, fieldName);
  if (result != null) {
    throw new ViewAttachException(result);
  }

  // create new event type
  Map<String, Class> eventTypeMap = new HashMap<String, Class>();
  eventTypeMap.put(PROPERTY_NAME, Long.class);
  eventType = statementServiceContext.getEventAdapterService().
                         createAnonymousMapType(eventTypeMap);
}
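
The validation step in attach can be pictured without the engine classes. The following is a minimal standalone sketch, not the engine's PropertyCheckHelper: a plain Map stands in for the parent event type, and the checkNumeric helper and its messages are hypothetical. It only mirrors the convention used above, where a null result means the property is acceptable and any other result is an error message:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone sketch; a plain Map stands in for the parent view's event type.
final class NumericPropertyCheckSketch {

    // Hypothetical helper: returns an error message, or null if the property
    // exists in the event type and has a numeric type.
    static String checkNumeric(Map<String, Class<?>> eventType, String propertyName) {
        Class<?> type = eventType.get(propertyName);
        if (type == null) {
            return "Property '" + propertyName + "' not found in parent event type";
        }
        // byte, short, int, long, float and double are the numeric primitives
        boolean primitiveNumber = type.isPrimitive()
                && type != boolean.class && type != char.class && type != void.class;
        if (!primitiveNumber && !Number.class.isAssignableFrom(type)) {
            return "Property '" + propertyName + "' is not numeric";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> stockTick = new LinkedHashMap<>();
        stockTick.put("symbol", String.class);
        stockTick.put("price", double.class);
        System.out.println(checkNumeric(stockTick, "price"));
        System.out.println(checkNumeric(stockTick, "symbol"));
    }
}
```

A view factory would turn a non-null result into a ViewAttachException, as the attach method above does.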

Finally, the engine asks the view factory to create a view instance:

public View makeView(StatementServiceContext statementServiceContext) {
  return new MyTrendSpotterView(statementServiceContext, fieldName);
}

11.2.2. Implementing a View

A view class implements the following methods:

  • The setParent method informs the view of the parent view's event type

  • The update method receives insert streams and remove stream events from its parent view

  • The iterator method supplies an (optional) iterator to allow an application to pull or request results from an EPStatement

  • The cloneView method must make a configured copy of the view to enable the view to work in a grouping context together with a std:groupby parent view

View classes simply subclass net.esper.view.ViewSupport:

public class MyTrendSpotterView extends ViewSupport { ...

The view class must implement the setParent(Viewable parent) method. This is an opportunity for the view to initialize and obtain a fast event property getter for later use to obtain event property values. The next code snippet shows an implementation of this method:

public void setParent(Viewable parent) {
  super.setParent(parent);
  if (parent != null)  {
    fieldGetter = parent.getEventType().getGetter(fieldName);
  }
}

Your update method will be processing incoming (insert stream) and outgoing (remove stream) events, as well as providing incoming and outgoing events to child views. The convention required of your update method implementation is that the view releases any insert stream events which the view generates as semantically-equal remove stream events at a later time. A sample update method implementation that computes a number of events in an upward trend is shown below:

public final void update(EventBean[] newData, EventBean[] oldData) {
  EventBean oldDataPost = populateMap(trendcount);

  // add data points
  if (newData != null) {
    for (int i = 0; i < newData.length; i++) {
      double dataPoint = ((Number) fieldGetter.get(newData[i])).doubleValue();

      if (lastDataPoint == null) {
        trendcount = 1L;
      }
      else if (lastDataPoint < dataPoint) {
        trendcount++;
      }
      else if (lastDataPoint > dataPoint) {
        trendcount = 0L;
      }
      lastDataPoint = dataPoint;
    }
  }

  if (this.hasViews())	{
    EventBean newDataPost = populateMap(trendcount);
    updateChildren(new EventBean[] {newDataPost}, new EventBean[] {oldDataPost});
  }
}

This update method must adhere to the following view conventions, to prevent memory leaks and to enable correct behavior within the engine:

  • Views must post a remove stream in the form of old data to child views. The remove stream must consist of the same event reference(s) posted as insert stream (new data).
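
One way to satisfy this convention is to remember the event object that was last posted as new data and hand that same reference to child views as old data on the next update. The sketch below is standalone and does not use the engine classes: TrendEvent and Child are hypothetical stand-ins for an event bean and a child view, and the first post carries no old event because nothing has been posted yet:

```java
// Standalone sketch of the remove-stream convention; no engine classes are used.
final class TrendCountSketch {

    // Hypothetical stand-in for an event that carries the trend count.
    static final class TrendEvent {
        final long trendCount;
        TrendEvent(long trendCount) { this.trendCount = trendCount; }
    }

    // Hypothetical stand-in for a child view.
    interface Child {
        void update(TrendEvent newEvent, TrendEvent oldEvent);
    }

    private final Child child;
    private Double lastDataPoint;
    private long trendCount;
    private TrendEvent lastPosted; // reference posted as insert stream last time

    TrendCountSketch(Child child) { this.child = child; }

    void update(double[] newData) {
        for (double dataPoint : newData) {
            if (lastDataPoint == null) {
                trendCount = 1L;
            } else if (lastDataPoint < dataPoint) {
                trendCount++;
            } else if (lastDataPoint > dataPoint) {
                trendCount = 0L;
            }
            lastDataPoint = dataPoint;
        }
        TrendEvent newEvent = new TrendEvent(trendCount);
        // The old data is the very object that was posted as new data before.
        child.update(newEvent, lastPosted);
        lastPosted = newEvent;
    }

    public static void main(String[] args) {
        TrendCountSketch view = new TrendCountSketch((newEvent, oldEvent) ->
                System.out.println("new=" + newEvent.trendCount
                        + " old=" + (oldEvent == null ? "none" : oldEvent.trendCount)));
        view.update(new double[] {10, 11});
        view.update(new double[] {12});
        view.update(new double[] {9});
    }
}
```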

The engine can provide a callback to the view indicating when a statement using the view is stopped. The callback is available to the view via the net.esper.view.StatementStopCallback interface. Your view code must subscribe to the stop callback in order for the engine to invoke the callback:

statementContext.getStatementStopService().addSubscriber(this);

Please refer to the sample views for a code sample on how to implement iterator and cloneView methods.

11.2.3. Configuring View Namespace and Name

The view factory class name as well as the view namespace and name for the new view must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the configuration API:

<esper-configuration>
  <plugin-view namespace="custom" name="trendspotter" 
      factory-class="net.esper.regression.view.MyTrendSpotterViewFactory" /> 
</esper-configuration>

The new view is now ready to use in a statement:

select * from StockTick.custom:trendspotter('price')

Note that the view must implement the cloneView method to enable the view to work in a grouping context as shown in the next statement:

select * from StockTick.std:groupby('symbol').custom:trendspotter('price')
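
The reason a copy is needed can be illustrated with a standalone sketch, under the assumption that a grouping parent keeps one independent view instance per group key. The classes below are hypothetical and do not use the engine API; the copy method plays the role the text assigns to cloneView, returning an instance with the same configuration but fresh state:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch: one trend-counting state per group key.
final class GroupedTrendSketch {

    // Hypothetical per-group state.
    static final class TrendState {
        private final String fieldName; // configuration, shared by all copies
        private Double last;            // state, private to each copy
        private long count;

        TrendState(String fieldName) { this.fieldName = fieldName; }

        // Same configuration, fresh state.
        TrendState copy() { return new TrendState(fieldName); }

        long add(double value) {
            if (last == null) {
                count = 1L;
            } else if (last < value) {
                count++;
            } else if (last > value) {
                count = 0L;
            }
            last = value;
            return count;
        }
    }

    private final TrendState prototype = new TrendState("price");
    private final Map<String, TrendState> perSymbol = new HashMap<>();

    // Routes a tick to the state of its symbol, copying the prototype on first use.
    long onTick(String symbol, double price) {
        return perSymbol.computeIfAbsent(symbol, key -> prototype.copy()).add(price);
    }

    public static void main(String[] args) {
        GroupedTrendSketch grouped = new GroupedTrendSketch();
        System.out.println(grouped.onTick("IBM", 10));
        System.out.println(grouped.onTick("MSFT", 50));
        System.out.println(grouped.onTick("IBM", 11));
    }
}
```

Without separate copies, ticks of different symbols would share one trend count and the per-symbol result would be meaningless.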

11.3. Custom Aggregation Functions

Aggregation functions aggregate event property values or expression results obtained from one or more streams. Examples of built-in aggregation functions are count(*), sum(price * volume) or avg(distinct volume).

The optional keyword distinct ensures that only distinct (unique) values are aggregated and duplicate values are ignored by the aggregation function. Custom plug-in aggregation functions do not need to implement the logic to handle distinct values. This is because when the engine encounters the distinct keyword, it eliminates any non-distinct values before passing the value for aggregation to the custom aggregation function.
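
To picture why the custom function never sees duplicates, consider the following standalone sketch. It is one possible way to filter duplicates by reference counting and is not taken from the engine's implementation; the Aggregator interface and the class name are hypothetical. A value is forwarded on its first enter and on its last leave only:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of duplicate filtering in front of an aggregation function.
final class DistinctFilterSketch {

    // Hypothetical stand-in for a custom aggregation function.
    interface Aggregator {
        void enter(Object value);
        void leave(Object value);
    }

    private final Aggregator inner;
    private final Map<Object, Integer> refCounts = new HashMap<>();

    DistinctFilterSketch(Aggregator inner) { this.inner = inner; }

    void enter(Object value) {
        int count = refCounts.merge(value, 1, Integer::sum);
        if (count == 1) {
            inner.enter(value); // first occurrence only
        }
    }

    void leave(Object value) {
        Integer count = refCounts.get(value);
        if (count == null) {
            return; // value was never entered
        }
        if (count == 1) {
            refCounts.remove(value);
            inner.leave(value); // last occurrence only
        } else {
            refCounts.put(value, count - 1);
        }
    }

    public static void main(String[] args) {
        DistinctFilterSketch filter = new DistinctFilterSketch(new Aggregator() {
            public void enter(Object value) { System.out.println("enter " + value); }
            public void leave(Object value) { System.out.println("leave " + value); }
        });
        filter.enter(5);
        filter.enter(5);
        filter.leave(5);
        filter.leave(5);
    }
}
```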

The following steps are required to develop and use a custom aggregation function with Esper.

  1. Implement an aggregation function class.

  2. Register the aggregation function class with the engine by supplying a function name, via the engine configuration file or the configuration API.

The code for the example aggregation function as shown in this chapter can be found in the test source folder in the package net.esper.regression.client by the name MyConcatAggregationFunction. The sample function simply concatenates string-type values.

11.3.1. Implementing an Aggregation Function

An aggregation function class is responsible for the following functions:

  • Implement a validate method that validates the value type of the data points that the function must process.

  • Implement a getValueType method that returns the type of the aggregation value generated by the function. For example, the built-in count aggregation function returns Long.class as it generates long-typed values.

  • Implement an enter method that the engine invokes to add a data point into the aggregation, when an event enters a data window

  • Implement a leave method that the engine invokes to remove a data point from the aggregation, when an event leaves a data window

  • Implement a getValue method that returns the current value of the aggregation.

Aggregation function classes simply subclass net.esper.eql.agg.AggregationSupport:

public class MyConcatAggregationFunction extends AggregationSupport { ...

The engine generally constructs one instance of the aggregation function class for each time the function is listed in a statement, however the engine may decide to reduce the number of aggregation class instances if it finds equivalent aggregations. The constructor initializes the aggregation function:

public class MyConcatAggregationFunction extends AggregationSupport {
  private final static char DELIMITER = ' ';
  private StringBuilder builder;
  private String delimiter;

  public MyConcatAggregationFunction()
  {
    super();
    builder = new StringBuilder();
    delimiter = "";
  }
  ...

An aggregation function must provide an implementation of the validate method that is passed the result type of the expression within the aggregation function. Since the example concatenation function requires string types, it implements a type check:

public void validate(Class childNodeType) {
  if (childNodeType != String.class) {
    throw new IllegalArgumentException("Concat aggregation requires a String parameter");
  }
}

The enter method adds a datapoint to the current aggregation value. The example enter method shown below adds a delimiter and the string value to a string buffer:

public void enter(Object value) {
  if (value != null) {
    builder.append(delimiter);
    builder.append(value.toString());
    delimiter = String.valueOf(DELIMITER);
  }
}

Conversely, the leave method removes a datapoint from the current aggregation value. The example leave method removes from the string buffer:

public void leave(Object value) {
  if (value != null) {
    builder.delete(0, value.toString().length() + 1);
  }
}

In order for the engine to validate the type returned by the aggregation function against the types expected by enclosing expressions, the getValueType method must return the result type of any values produced by the aggregation function:

public Class getValueType() {
  return String.class;
}

Finally, the engine obtains the current aggregation value by means of the getValue method:

public Object getValue() {
  return builder.toString();
}

11.3.2. Configuring Aggregation Function Name

The aggregation function class name as well as the function name for the new aggregation function must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the configuration API:

<esper-configuration>
  <plugin-aggregation-function name="concat" 
    function-class="net.esper.regression.client.MyConcatAggregationFunction" />
</esper-configuration>

The new aggregation function is now ready to use in a statement:

select concat(symbol) from StockTick.win:length(3)
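
The interplay of enter and leave with a length window can be traced in a standalone sketch. The class below is hypothetical and does not use the engine API: a Deque simulates a window of length 3, and the enter and leave bodies repeat the logic of the example function. Note that this leave logic removes from the front of the buffer, so it relies on values leaving in the same order in which they entered:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Standalone sketch: concatenation over a simulated length window of 3 events.
final class ConcatWindowSketch {
    private static final int WINDOW_LENGTH = 3;

    private final Deque<String> window = new ArrayDeque<>();
    private final StringBuilder builder = new StringBuilder();
    private String delimiter = "";

    private void enter(String value) {
        builder.append(delimiter).append(value);
        delimiter = " ";
    }

    private void leave(String value) {
        // Drops the oldest value and the delimiter that follows it.
        builder.delete(0, value.length() + 1);
    }

    // Adds an event to the window, expires the oldest one if the window
    // is over its length, and returns the current aggregation value.
    String onEvent(String symbol) {
        window.addLast(symbol);
        enter(symbol);
        if (window.size() > WINDOW_LENGTH) {
            leave(window.removeFirst());
        }
        return builder.toString();
    }

    public static void main(String[] args) {
        ConcatWindowSketch concat = new ConcatWindowSketch();
        for (String symbol : new String[] {"IBM", "MSFT", "GE", "ORCL"}) {
            System.out.println(concat.onEvent(symbol));
        }
    }
}
```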

11.4. Custom Pattern Guard

Pattern guards are pattern objects that control the lifecycle of the guarded sub-expression, and can filter the events fired by the subexpression.

The following steps are required to develop and use a custom guard object with Esper.

  1. Implement a guard factory class, responsible for creating guard object instances.

  2. Implement a guard class.

  3. Register the guard factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example guard object as shown in this chapter can be found in the test source folder in the package net.esper.regression.client by the name MyCountToPatternGuardFactory. The sample guard discussed here counts the number of events occurring up to a maximum number of events, and ends the sub-expression when that maximum is reached.

11.4.1. Implementing a Guard Factory

A guard factory class is responsible for the following functions:

  • Implement a setGuardParameters method that validates guard parameters.

  • Implement a makeGuard method that constructs a new guard instance.

Guard factory classes subclass net.esper.pattern.guard.GuardFactorySupport:

public class MyCountToPatternGuardFactory extends GuardFactorySupport { ...

The engine constructs one instance of the guard factory class for each time the guard is listed in a statement.

The guard factory class implements the setGuardParameters method that is passed the parameters to the guard as supplied by the statement. It verifies the guard parameters, similar to the code snippet shown next. Our example counter guard takes a single numeric parameter:

public void setGuardParameters(List<Object> guardParameters) throws GuardParameterException {
  if (guardParameters.size() != 1) {
    throw new GuardParameterException("Count-to guard takes a single integer parameter");
  }
  if (!(guardParameters.get(0) instanceof Integer)) {
    throw new GuardParameterException("Count-to guard takes a single integer parameter");
  }
  numCountTo = (Integer) guardParameters.get(0);
}

The makeGuard method is called by the engine to create a new guard instance. The example makeGuard method shown below passes the maximum count of events to the guard instance. It also passes a Quitable implementation to the guard instance. The guard uses Quitable to indicate that the sub-expression contained within must stop (quit) listening for events.

public Guard makeGuard(PatternContext context, Quitable quitable, 
  Object stateNodeId, Object guardState) {
    return new MyCountToPatternGuard(numCountTo, quitable);
}

11.4.2. Implementing a Guard Class

A guard class has the following responsibilities:

  • Provides a startGuard method that initializes the guard.

  • Provides a stopGuard method that stops the guard, called by the engine when the whole pattern is stopped, or the sub-expression containing the guard is stopped.

  • Provides an inspect method that the pattern engine invokes to determine if the guard lets matching events pass for further evaluation by the containing expression.

Guard classes subclass net.esper.pattern.guard.GuardSupport as shown here:

public class MyCountToPatternGuard extends GuardSupport { ...

The engine invokes the guard factory class to construct an instance of the guard class for each new sub-expression instance within a statement.

A guard class must provide an implementation of the startGuard method that the pattern engine invokes to start a guard instance. In our example, the method resets the guard's counter to zero:

public void startGuard() {
  counter = 0;
}

The pattern engine invokes the inspect method for each time the sub-expression indicates a new event result. Our example guard needs to count the number of events matched, and quit if the maximum number is reached:

public boolean inspect(MatchedEventMap matchEvent) {
  counter++;
  if (counter > numCountTo) {
    quitable.guardQuit();
    return false;
  }
  return true;
}

The inspect method returns true for events that pass the guard, and false for events that should not pass the guard.
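
The counting behavior can be checked in isolation with a standalone sketch. The class and the QuitCallback interface below are hypothetical stand-ins (QuitCallback takes the place of Quitable) and the engine API is not used. With a maximum of 2, the first two inspections pass and the third one quits and fails:

```java
// Standalone sketch of the count-to guard logic; no engine classes are used.
final class CountToGuardSketch {

    // Hypothetical stand-in for the Quitable passed by the guard factory.
    interface QuitCallback {
        void guardQuit();
    }

    private final int numCountTo;
    private final QuitCallback quitable;
    private int counter;

    CountToGuardSketch(int numCountTo, QuitCallback quitable) {
        this.numCountTo = numCountTo;
        this.quitable = quitable;
    }

    void startGuard() {
        counter = 0;
    }

    // Returns true while the maximum has not been exceeded; afterwards
    // signals quit and returns false.
    boolean inspect() {
        counter++;
        if (counter > numCountTo) {
            quitable.guardQuit();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        CountToGuardSketch guard =
                new CountToGuardSketch(2, () -> System.out.println("guard quit"));
        guard.startGuard();
        for (int i = 0; i < 3; i++) {
            System.out.println(guard.inspect());
        }
    }
}
```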

11.4.3. Configuring Guard Namespace and Name

The guard factory class name as well as the namespace and name for the new guard must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the configuration API:

<esper-configuration>
  <plugin-pattern-guard namespace="myplugin" name="count_to" 
      factory-class="net.esper.regression.client.MyCountToPatternGuardFactory"/>
</esper-configuration>

The new guard is now ready to use in a statement. The next pattern statement detects the first 10 MyEvent events:

select * from pattern [(every MyEvent) where myplugin:count_to(10)]

Note that the every keyword was placed within parentheses to ensure the guard controls the repeated matching of events.

11.5. Custom Pattern Observer

Pattern observers are pattern objects that are executed as part of a pattern expression and can observe events or test conditions. Examples for built-in observers are timer:at and timer:interval. Some suggested uses of observer objects are:

  • Implement custom scheduling logic using the engine's own scheduling and timer services

  • Test conditions related to prior events matching an expression

The following steps are required to develop and use a custom observer object within pattern statements:

  1. Implement an observer factory class, responsible for creating observer object instances.

  2. Implement an observer class.

  3. Register an observer factory class with the engine by supplying a namespace and name, via the engine configuration file or the configuration API.

The code for the example observer object as shown in this chapter can be found in the test source folder in package net.esper.regression.client by the name MyFileExistsObserver. The sample observer discussed here very simply checks if a file exists, using the filename supplied by the pattern statement, and via the java.io.File class.

11.5.1. Implementing an Observer Factory

An observer factory class is responsible for the following functions:

  • Implement a setObserverParameters method that validates observer parameters.

  • Implement a makeObserver method that constructs a new observer instance.

Observer factory classes subclass net.esper.pattern.observer.ObserverFactorySupport:

public class MyFileExistsObserverFactory extends ObserverFactorySupport { ...

The engine constructs one instance of the observer factory class for each time the observer is listed in a statement.

The observer factory class implements the setObserverParameters method that is passed the parameters to the observer as supplied by the statement. It verifies the observer parameters, similar to the code snippet shown next. Our example file-exists observer takes a single string parameter:

public void setObserverParameters(List<Object> observerParameters) 
			throws ObserverParameterException {
  String message = "File exists observer takes a single string filename parameter";
  if (observerParameters.size() != 1) {
    throw new ObserverParameterException(message);
  }
  if (!(observerParameters.get(0) instanceof String)) {
    throw new ObserverParameterException(message);
  }
  
  filename = observerParameters.get(0).toString();
}

The pattern engine calls the makeObserver method to create a new observer instance. The example makeObserver method shown below passes parameters to the observer instance:

public EventObserver makeObserver(PatternContext context, 
			MatchedEventMap beginState, 
			ObserverEventEvaluator observerEventEvaluator, 
			Object stateNodeId, 
			Object observerState) {
  return new MyFileExistsObserver(beginState, observerEventEvaluator, filename);
}

The ObserverEventEvaluator parameter allows an observer to indicate events, and to indicate change of truth value to permanently false. Use this interface to indicate when your observer has received or witnessed an event, or changed its truth value to true or permanently false.

The MatchedEventMap parameter provides a Map of all matching events for the expression prior to the observer's start. For example, consider a pattern as below:

a=MyEvent -> myplugin:my_observer(...)

The above pattern tagged the MyEvent instance with the tag "a". The pattern engine starts an instance of my_observer when it receives the first MyEvent. The observer can query the MatchedEventMap using "a" as a key and obtain the tagged event.

11.5.2. Implementing an Observer Class

An observer class has the following responsibilities:

  • Provides a startObserve method that starts the observer.

  • Provides a stopObserve method that stops the observer, called by the engine when the whole pattern is stopped, or the sub-expression containing the observer is stopped.

Observer classes subclass net.esper.pattern.observer.ObserverSupport as shown here:

public class MyFileExistsObserver implements EventObserver { ...

The engine invokes the observer factory class to construct an instance of the observer class for each new sub-expression instance within a statement.

An observer class must provide an implementation of the startObserve method that the pattern engine invokes to start an observer instance. In our example, the observer checks for the presence of a file and indicates the truth value to the remainder of the expression:

public void startObserve() {
  File file = new File(filename);
  if (file.exists()) {
    observerEventEvaluator.observerEvaluateTrue(beginState);
  } 
  else {
    observerEventEvaluator.observerEvaluateFalse(); 
  }
}

Note the observer passes the ObserverEventEvaluator an instance of MatchedEventMap. The observer can also create one or more new events and pass these events through the Map to the remaining expressions in the pattern.
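
The flow from the begin state through the observer to the evaluator can be sketched without the engine classes. In the standalone example below, a plain Map stands in for MatchedEventMap and the Evaluator interface is a hypothetical stand-in for ObserverEventEvaluator; only java.io.File is used as in the example observer:

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the file-exists observer; no engine classes are used.
final class FileExistsObserverSketch {

    // Hypothetical stand-in for ObserverEventEvaluator.
    interface Evaluator {
        void evaluateTrue(Map<String, Object> matchedEvents);
        void evaluateFalse();
    }

    private final Map<String, Object> beginState; // tagged events matched so far
    private final Evaluator evaluator;
    private final String filename;

    FileExistsObserverSketch(Map<String, Object> beginState, Evaluator evaluator,
                             String filename) {
        this.beginState = beginState;
        this.evaluator = evaluator;
        this.filename = filename;
    }

    void startObserve() {
        if (new File(filename).exists()) {
            // Pass the prior matches on to the rest of the expression.
            evaluator.evaluateTrue(beginState);
        } else {
            evaluator.evaluateFalse();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> beginState = new HashMap<>();
        beginState.put("a", "event tagged a");
        Evaluator printer = new Evaluator() {
            public void evaluateTrue(Map<String, Object> matched) {
                System.out.println("true, a=" + matched.get("a"));
            }
            public void evaluateFalse() {
                System.out.println("false");
            }
        };
        new FileExistsObserverSketch(beginState, printer, "myfile.txt").startObserve();
    }
}
```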

11.5.3. Configuring Observer Namespace and Name

The observer factory class name as well as the namespace and name for the new observer must be added to the engine configuration via the configuration API or using the XML configuration file. The configuration shown below is XML; however, the same options are available through the configuration API:

<esper-configuration>
  <plugin-pattern-observer namespace="myplugin" name="file_exists" 
    factory-class="net.esper.regression.client.MyFileExistsObserverFactory" />
</esper-configuration>

The new observer is now ready to use in a statement. The next pattern statement checks every 10 seconds if the given file exists, and indicates to the listener when the file is found.

select * from pattern [every timer:interval(10 sec) -> myplugin:file_exists("myfile.txt")]

© 2007 EsperTech Inc. All Rights Reserved