Sensor Event Service Contributor Information

Overview

This page is intended for developers of the SensorEventService. It gives insight into the general workflow of requests and how the data is actually processed. If you have any questions, please get in touch with the community using the Sensor Web Mailing List.

General Concept

The HTTP interface of the 52°North SES implementation is based on Apache Muse 2.2.0. Unfortunately, the project was discontinued shortly after the decision to use it was made. We have therefore set up our own Muse 2.2.0 branch in the 52N SVN Repo to be able to fix bugs and add features ourselves. Access is currently restricted, so please get in touch with the IT-IS team if you need access (info can be found on the account information page).

The Muse framework provides the general business logic for handling requests in compliance with the WS-N specifications.

WS-N Specifications

The Web Services Notification (WS-N) specifications are a family of standards developed by OASIS. They define interfaces and request/response messages that enable web services to offer interoperable publish/subscribe functionality. The default binding of the SES is based on SOAP 1.2.

WS Base Notification

See SensorEventServiceInterface for details of the specification.

The 52N SES implements both the NotificationProducer (class SESNotificationProducer) and the NotificationConsumer (class SESNotificationConsumer). It is thus capable of both receiving and publishing Notification messages.

Besides these two resources, the 52N SES implements the SubscriptionManager interface (class SESSubscriptionManager), which provides functionality to pause, renew and remove a subscription instance.

WS Brokered Notification

See SensorEventServiceInterface for details of the specification.

As previously stated, the SES implements the Producer and Consumer side of the WS-N specification. Used in combination with the SubscriptionManager, these two interfaces form the basis of the NotificationBroker implementation of the SES. Publishers can send messages to the SES, and consumers can subscribe to a specific subset of these messages.

Resource Management and Lifecycles

The way Muse, and thus the SES, manages resources and their lifetimes is closely coupled to the SensorEventServiceDeveloperInformation specification. The initial lifetime of a resource can be defined inside the first request (e.g. wsnt:InitialTerminationTime for a Subscribe request) or in an additional request (e.g. wsrf:RequestedLifetimeDuration for a wsntw:SetTerminationTimeRequest request).

After startup of the SES, all resources get loaded in a predefined order. The first resources are the SubscriptionManager instances (the default one and optionally additional persisted instances).

SubscriptionManager

The SESSubscriptionManager is the first class of the SES core package to be loaded. The ResourceManager of Muse calls its initialize() method because a resource file is available by default in the dedicated router-entries folder. In this method, the main setup of the SES environment is done: the singleton ConfigurationRegistry, the IFilterEngine implementation and the IUnitConverter instance are loaded. This is done only once, inside a mutex-synchronized block. To add new resources or classes that need to be loaded on startup, simply append your code to the initializeSESResources() method, which is called within the synchronized block.
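
The run-once setup inside a mutex-synchronized block can be pictured roughly as follows. This is a minimal sketch and not the SES code: the class OneTimeInitSketch, its fields and the counter are hypothetical, and only the idea of guarding a shared initialization method with a synchronized block is taken from the description above.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a resource class that is instantiated many times. */
final class OneTimeInitSketch {

    // Shared lock and flag: every instance synchronizes on the same mutex.
    private static final Object MUTEX = new Object();
    private static boolean initialized = false;

    // Counts how often the shared setup actually ran (for demonstration only).
    static final AtomicInteger SETUP_RUNS = new AtomicInteger();

    /** Called once per instance; the shared setup runs only for the first caller. */
    void initialize() {
        synchronized (MUTEX) {
            if (!initialized) {
                initializeSharedResources();
                initialized = true;
            }
        }
    }

    /** Place for start-up code such as loading a filter engine or a unit converter. */
    private static void initializeSharedResources() {
        SETUP_RUNS.incrementAndGet();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> new OneTimeInitSketch().initialize());
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("shared setup ran " + SETUP_RUNS.get() + " time(s)");
    }
}
```

Because the flag is only read and written while holding the mutex, concurrent instantiation of several resources cannot trigger the setup twice.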

Besides the startup initialization, a SESSubscriptionManager is instantiated for every requested Subscription. IDs are assigned by the SESResourceIdFactory.SubscriptionIdFactory. These instances can be managed using the WS Resource interfaces (SetTerminationTimeRequest, DestroyRequest) and are removed using the unsubscribe() method, which is called within an Unsubscribe request. The SES also implements the PausableSubscriptionManager capability. Thus, Subscriptions can be paused and resumed using the appropriate WS-N methods (see spec).

NotificationProducer

The SESNotificationProducer is instantiated only once, after the SESSubscriptionManager has been initialized. Consequently, it has access to a fully prepared SensorEventServiceDeveloperInformation. It is never shut down before the whole service is shut down.

PublisherEndpoint

A PublisherEndpoint instance is the result of a successful RegisterPublisher request. IDs are assigned by the SESResourceIdFactory.PublisherIdFactory. Similar to the SubscriptionManager, it can be managed using dedicated interface methods. In particular, RenewRegistration and DestroyRegistration requests can be used to extend the lifetime or to remove the publisher.

ConfigurationRegistry

The ConfigurationRegistry provides access to the configuration file entries of the SES. It is available as a singleton via the getInstance() method. It holds static keys for safe retrieval of configuration parameters using the getPropertyForKey() method. Besides providing parameters, this class can be used as a global communication/registry component, as it is available from all classes of the SES. For instance, it provides access to the IFilterEngine and IUnitConverter instances.
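
The singleton-plus-static-keys pattern described above could look roughly like this. It is a simplified sketch under assumptions: RegistrySketch and its init() method are hypothetical and do not reproduce the real ConfigurationRegistry; only the method names getInstance() and getPropertyForKey() and the key USE_CONCURRENT_ORDERED_HANDLING are taken from this page. The configuration is read from an in-memory string to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical, simplified registry; not the SES ConfigurationRegistry class. */
final class RegistrySketch {

    // Static key constant, so callers never retype the raw string.
    static final String USE_CONCURRENT_ORDERED_HANDLING = "USE_CONCURRENT_ORDERED_HANDLING";

    private static RegistrySketch instance;
    private final Properties properties = new Properties();

    private RegistrySketch(String configText) {
        try {
            properties.load(new StringReader(configText));
        } catch (IOException e) {
            throw new IllegalStateException("could not read configuration", e);
        }
    }

    /** Creates the singleton on the first call; later calls are ignored. */
    static synchronized void init(String configText) {
        if (instance == null) {
            instance = new RegistrySketch(configText);
        }
    }

    static synchronized RegistrySketch getInstance() {
        if (instance == null) {
            throw new IllegalStateException("init() has not been called");
        }
        return instance;
    }

    /** Returns the configured value, or null if the key is absent. */
    String getPropertyForKey(String key) {
        return properties.getProperty(key);
    }

    public static void main(String[] args) {
        RegistrySketch.init("USE_CONCURRENT_ORDERED_HANDLING = false\n");
        String value = RegistrySketch.getInstance()
                .getPropertyForKey(RegistrySketch.USE_CONCURRENT_ORDERED_HANDLING);
        System.out.println("ordered handling enabled: " + Boolean.parseBoolean(value));
    }
}
```

Using a constant instead of a string literal at the call site means a misspelled key fails at compile time rather than silently returning null.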

Entry Point of Data

The following subsections summarize the data-flow for certain incoming requests. They are supposed to provide a better understanding of the component chaining as implemented in the 52N SES.

Subscriptions and Publisher Registrations can also be triggered from persisted XML instances. The general workflow stays the same; only the caller is different (SESFilePersistence).

Subscriptions

The starting point for every subscription is a valid wsnt:Subscribe request sent to the NotificationProducer's port type. From there, the ResourceRouter calls the subscribe(...) method of the SESNotificationProducer. Before that call, the wsnt:Filter element has already been split into its wsnt:MessageContent children (using the registered FilterFactoryHandler implementations). Each MessageContent is represented as a Filter instance, and several of them can be bundled in a FilterCollection.

If the FilterCollection contains a SESConstraintFilter (meaning it has a level-2 or level-3 filter) it gets registered at the IFilterEngine instance using the registerFilter() method. This is where the actual parsing and processing of the MessageContent's markup takes place.

Notifications

The workflow of notifications is a bit trickier than that of, e.g., subscriptions, because all roles of the Brokered Notification come into play when consuming and brokering notifications. Again, the starting point is a valid wsnt:Notify request, which consists of one wsnt:NotificationMessage element. Besides a wsnt:Topic, it includes the wsnt:Message element, which holds the actual payload as xml:any. The qualified names of the contents can be retrieved using the NotificationMessage's getMessageContentNames() method.

After pre-processing of the message, the ResourceRouter calls the notify() method of the SESNotificationConsumer. Here, the message is wrapped in a NotifyThread instance and inserted into a FIFO-queued thread pool. For convenience, the thread pool is currently single-threaded (see the Concurrent Handling of Messages section for details). When it is the thread's turn, it calls all registered NotificationMessageListener implementations. In the default setup, this is just the SESMessageListener, which calls the publishCompleteMessage() method of the SESNotificationProducer instance.

SESNotificationProducer

Finally, the message arrives at the SESNotificationProducer, where it is first inserted into the actual IFilterEngine instance (see SensorEventServiceDeveloperInformation). As the second step, the publish() method of the SESNotificationProducer is called. Here, the message is pushed into every available SESSubscriptionManager without a SESConstraintFilter, where it is checked against the registered Filters. For simple Filters which only consist of Level-1 instances (e.g. XPath expressions), there is no need for pattern/causality matching, so every NotificationMessage can be treated as an independent, discrete message. Therefore, the accepts() method of every Filter instance is simply called. If all instances return true, the message is forwarded to the subscriber's EndpointReference.
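
The "all filters must accept" rule for Level-1 subscriptions can be sketched as below. The types are hypothetical stand-ins rather than the Muse or SES classes; only the accepts() contract and the forward-if-all-true rule come from the text, and messages are modelled as plain strings for brevity.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of Level-1 matching; the types are not the Muse or SES classes. */
final class Level1MatchSketch {

    /** Minimal filter contract: one independent decision per message. */
    interface Filter {
        boolean accepts(String message);
    }

    /** A message is forwarded only if every filter of the subscription accepts it. */
    static boolean shouldForward(List<Filter> filters, String message) {
        for (Filter filter : filters) {
            if (!filter.accepts(message)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Filter mentionsTemperature = m -> m.contains("temperature");
        Filter notEmpty = m -> !m.isEmpty();
        List<Filter> filters = Arrays.asList(mentionsTemperature, notEmpty);

        System.out.println(shouldForward(filters, "temperature=21.5")); // true
        System.out.println(shouldForward(filters, "humidity=40"));      // false
    }
}
```

Under this rule, a filter whose accepts() always returns false keeps a subscription out of the default delivery path entirely.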

For complex Filters (Level 2 and 3), a completely different solution was needed, as these filters can match patterns of messages (e.g. a threshold undershoot followed by a threshold overshoot, a pattern involving multiple messages).

EsperFilterEngine

This is where the actual magic happens. Esper is the underlying event stream processing engine which is used for Event Pattern matching. As the SES is capable of filtering messages based on stream patterns (e.g. EML Patterns), we had to adjust the default implementation of Filter matching. If a subscriber provided a filter of level 2 or 3 (= SESConstraintFilter), the default filter matching is bypassed (note that the accepts() method of SESConstraintFilter always returns false). Instead, the order of filter matching is reversed: after the EsperFilterEngine (the current default implementation of IFilterEngine) has detected a pattern match for a message, the remaining Level-1 filters are checked. In particular, this is done inside the doOutput() method of the StatementListener of the EML module in use. Here, the publish() method of the corresponding SESSubscriptionManager instance is called. From there on, the same workflow as for Level-1 filters applies.

The FilterEngine is responsible for triggering the parsing of the NotificationMessage. For this purpose, it holds a list of all available Parsers (every Parser must implement AbstractParser), which is iterated. If the accept() method of a Parser instance returns true, its parse() method is called. The Parser is then responsible for creating a List of MapEvent instances. MapEvent is an implementation of java.util.Map and is designed to store the parsed properties as key-value pairs. As Esper is capable of processing instances of java.util.Map, the results of the Parser can be pushed directly into Esper.
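
The accept()/parse() contract of the parser list can be illustrated with a small model. Everything here is an assumption made for illustration: the Parser interface stands in for AbstractParser, plain java.util.Map instances stand in for MapEvent, the "key=value" payload format is invented, and the sketch stops at the first accepting parser, which the text above does not specify.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical parser chain; the interface only mimics the accept()/parse() contract. */
final class ParserChainSketch {

    interface Parser {
        boolean accept(String message);
        List<Map<String, Object>> parse(String message);
    }

    /** Example parser for "key=value;key=value" payloads (invented format). */
    static final class KeyValueParser implements Parser {
        @Override
        public boolean accept(String message) {
            return message.contains("=");
        }

        @Override
        public List<Map<String, Object>> parse(String message) {
            Map<String, Object> event = new LinkedHashMap<>();
            for (String pair : message.split(";")) {
                String[] parts = pair.split("=", 2);
                if (parts.length == 2) {
                    event.put(parts[0].trim(), parts[1].trim());
                }
            }
            List<Map<String, Object>> events = new ArrayList<>();
            events.add(event);
            return events;
        }
    }

    /** Asks each parser in turn; the first one that accepts the message parses it. */
    static List<Map<String, Object>> parse(List<Parser> parsers, String message) {
        for (Parser parser : parsers) {
            if (parser.accept(message)) {
                return parser.parse(message);
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        List<Parser> parsers = Arrays.<Parser>asList(new KeyValueParser());
        System.out.println(parse(parsers, "sensor=thermo-1; value=21.5"));
    }
}
```

Since each resulting event is a java.util.Map, it has the shape that a Map-based event engine can consume without further conversion.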

Publisher Registrations

This data-flow is very similar to that of Subscriptions. The ResourceRouter calls the registerPublisher() method of the RegisterPublisher class. Here, a PublisherEndpoint instance is created and initialized. In particular, the SensorML is processed and the data types are registered in the DataTypesMap. This is needed because in some situations subscriptions and/or notifications depend on metadata (e.g. unit of measurement) of a publisher.

Message Filtering

A substantial part of a Subscription is the definition of a message filter. With this filter, users define the subset of messages they are interested in. The SES defines three levels of filtering, as described in the following sections.

Level 1

This filter uses XPath to define the filter criteria that are applied to each incoming notification separately. The SES uses the filter implementation built into Muse (MessagePatternFilter), which is based on Xalan. A single NotificationMessage is evaluated against the given XPath expression, and if the result is true, the NotificationMessage is pushed to the EndpointReference of the Subscription.
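
To give a feeling for what such a check amounts to, the following sketch evaluates an XPath expression as a boolean against a single XML message. It deliberately uses the XPath API shipped with the JDK (javax.xml.xpath) instead of Muse's MessagePatternFilter, and the namespace-free payload is invented; real notifications carry namespaced content and would need a NamespaceContext.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

/** Illustrative Level-1 style check using the JDK XPath API instead of Muse's filter. */
final class XPathFilterSketch {

    /** Evaluates the expression as an XPath boolean against one XML message. */
    static boolean accepts(String xpathExpression, String xml) throws XPathExpressionException {
        XPath xpath = XPathFactory.newInstance().newXPath();
        InputSource source = new InputSource(new StringReader(xml));
        return (Boolean) xpath.evaluate(xpathExpression, source, XPathConstants.BOOLEAN);
    }

    public static void main(String[] args) throws XPathExpressionException {
        // Invented, namespace-free payload used only for this example.
        String message = "<Observation><result>42.5</result></Observation>";
        System.out.println(accepts("/Observation/result > 40", message)); // true
        System.out.println(accepts("/Observation/result > 50", message)); // false
    }
}
```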

Level 2

Level-2 filters are encoded using the Filter Encoding Specification 2.0 (FES 2.0). As EML also uses FES 2.0 in its Guards, the realization of Level-2 filters in the 52N SES is basically the same as for Level-3: depending on the EML version used, the FES 2.0 Filter markup is wrapped in a static EML document and then treated as Level-3. See the class StaticEMLDocument for the details of this wrapper document.

Level 3 / EML Modules

This filter level is designed to recognize patterns in a stream of messages. It is based on the Event Pattern Markup Language (EML), which is a powerful but also quite complex definition language. The details of this specification go beyond the scope of this article. The following section summarizes the internal handling of EML subscriptions. In particular, the mapping from EML to EPL (the SQL-like language used internally by Esper) and some special cases (e.g. spatial filtering) are described.

First, it should be mentioned that the current SES implementation supports EML Schema version 0.0.1 (the one used in the OGC discussion paper) as well as 0.0.2 (with some handy enhancements). However, the two modules cannot run in parallel, so you should use the dedicated Maven profile (located in 52n-ses/pom.xml) to activate the version of your choice (eml_002 is the default).

Both modules in general are structured in the same way. In particular, the entry point is an implementation of ILogicController, the EsperController for both modules (though located in different packages). It is set up using the initialize() method. Here, the EML markup is parsed. Namely, the parsing method iterates through all SimplePatterns, ComplexPatterns, TimerPatterns and RepetitivePatterns. Every pattern is represented by a dedicated Java object after successful parsing and is then transformed into an EPL statement.

An EPL Statement (see http://esper.codehaus.org/esper-2.3.0/doc/reference/en/html/epl_clauses.html for details) is registered with Esper from within the initializeListeners() method. The concept of Esper is based on Listeners which are linked to Statements. So, a Listener (in most cases an instance of StatementListener) is attached to every Statement. A Listener is informed whenever Esper detects a match for its Statement. The concept of EML backed by Esper is the creation of different streams and multiple patterns (see http://52north.org/ses for conceptual details). The following figure illustrates the workflow of a NotificationMessage and the concept of Statements and Listeners.

data-flow-notification.png

When a Statement triggers a match, the update() method of the StatementListener is called. From there, a new instance of UpdateHandlerThread is created and submitted to a thread pool. The UpdateHandlerThread is responsible for checking whether the message must be inserted into another stream or whether it should generate an output to the subscriber. In the latter case, the doOutput() method of the StatementListener is called, which in turn calls the SESSubscriptionManager via the publish() or sendSESNotificationMessge() method. Further processing is done as described in SensorEventServiceDeveloperInformation.
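
The hand-off from update() to a pooled handler can be modelled without any Esper classes. The sketch below is hypothetical throughout: ListenerHandOffSketch, its two result lists and the boolean switch are invented, and only the sequence "update() submits a handler to a thread pool, the handler either feeds another stream or calls doOutput()" follows the description above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of the listener hand-off; no Esper classes are used. */
final class ListenerHandOffSketch {

    final List<String> forwardedToStream = new CopyOnWriteArrayList<>();
    final List<String> sentToSubscriber = new CopyOnWriteArrayList<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final boolean producesOutput;

    ListenerHandOffSketch(boolean producesOutput) {
        this.producesOutput = producesOutput;
    }

    /** Called when a statement matched; the real work happens on a pool thread. */
    void update(String matchedEvent) {
        pool.submit(() -> handle(matchedEvent));
    }

    /** Either emits output to the subscriber or feeds the event into another stream. */
    private void handle(String matchedEvent) {
        if (producesOutput) {
            doOutput(matchedEvent);
        } else {
            forwardedToStream.add(matchedEvent);
        }
    }

    private void doOutput(String matchedEvent) {
        sentToSubscriber.add(matchedEvent);
    }

    /** Stops accepting work and waits for the submitted handlers to finish. */
    boolean shutdownAndWait() throws InterruptedException {
        pool.shutdown();
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ListenerHandOffSketch listener = new ListenerHandOffSketch(true);
        listener.update("threshold-overshoot");
        listener.shutdownAndWait();
        System.out.println("sent to subscriber: " + listener.sentToSubscriber);
    }
}
```

Returning from update() immediately keeps the engine's callback short, at the price of giving up ordering between handlers that run in parallel.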

EML Version 0.0.1

Version 0.0.1 can be included instead of the default 0.0.2 by activating the eml_001 Maven profile (e.g. mvn clean install -P eml_001 on 52n-ses or 52n-ses-core).

EML Version 0.0.2

This is the default implementation used by the SES. Its Maven profile eml_002 is activated by default. In comparison to version 0.0.1, some XML Schema changes have been made to address common drawbacks. This generally led to adjustments in the EML parsing routines (classes EMLParser and EsperController) but did not change the overall behavior of pattern processing.

Version 0.0.2 introduced a new feature, the eml:GenericView. This view is a generic powerful tool when combined with a UserDefinedSelectFunction. An example of such a view is the DynamicSpatialBufferView class which implements spatial filtering of incoming messages against a dynamic geometry.

Spatial Filtering

Spatial filtering within the SES is based on the Java Topology Suite (JTS). As it is available in Level-2 filters and upwards, it has to be integrated into Esper (see SensorEventServiceDeveloperInformation). This is done by providing a custom function library to Esper (see http://esper.codehaus.org/esper-2.3.0/doc/reference/en/html/functionreference.html for details). While the EsperController is initialized, the private method registerCustomFunctions() is called. Here, the package name is imported into Esper, and the methods of its classes can then be used within EPL Statements.

The actual filtering is applied in the SpatialMethods class, which provides wrapper methods around JTS function calls. Every method takes two Geometry objects and evaluates them using the underlying spatial method (e.g. an intersection of two geometries).

Since JTS is only a topology framework, it is not capable of applying any reference-system-specific transformations. Nevertheless, the SES supports some FES 2.0 spatial operations such as fes:DWithin or fes:Beyond, which take a distance and calculate a buffer around a given geometry before applying the actual topology evaluation. In the current implementation of the SES, this is achieved with a PostgreSQL/PostGIS backend. This approach benefits from robust and performant transformation methods. The drawback for the user is the need to set up a complete database system (which is never used for actually storing any data).

An exemplary implementation of a buffer is realized in the ADistanceBufferFilter class of both EML modules. In its createExpressionForDistanceFilter() function the actual buffering is triggered by using an instance of PostGisCreateBuffer.

Adding New UserDefinedSelectFunction instances

For some use cases and environments it could be required to define a new select function which addresses certain needs of a scenario. This must be done in the getSelectString() method of class SelFunction (in the EML module you are using). Here, you should append an additional else if clause to the if (this.functionName.equals(...)) control block. There are many examples of select functions already present which will help you to understand the general concept.
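
The shape of such an extension is sketched below. This is not the SelFunction class: the function names (SelectAvg, SelectMax, SelectRange) and the generated strings are hypothetical and only illustrate where an additional else if branch would go in a chain of functionName comparisons.

```java
/** Hypothetical illustration; function names and generated strings are invented. */
final class SelectFunctionSketch {

    private final String functionName;
    private final String propertyName;

    SelectFunctionSketch(String functionName, String propertyName) {
        this.functionName = functionName;
        this.propertyName = propertyName;
    }

    /** Builds the select clause fragment for the configured function. */
    String getSelectString() {
        if (this.functionName.equals("SelectAvg")) {
            return "avg(" + propertyName + ")";
        } else if (this.functionName.equals("SelectMax")) {
            return "max(" + propertyName + ")";
        } else if (this.functionName.equals("SelectRange")) {
            // Newly appended branch: a custom function added for one scenario.
            return "max(" + propertyName + ") - min(" + propertyName + ")";
        }
        throw new IllegalArgumentException("unknown select function: " + functionName);
    }

    public static void main(String[] args) {
        System.out.println(new SelectFunctionSketch("SelectRange", "value").getSelectString());
    }
}
```

Failing loudly on an unknown name, as in the last line of getSelectString(), is a choice made for this sketch; check how the existing branches of the module you are using handle that case.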

Implementing a GenericView

The GenericView of EML 0.0.2 is processed in the getViewString() method of class DataView (similar to a UserDefinedSelectFunction). There is a dedicated else if control block after the if (this.esperString.equals("")) statement where you can implement the view specifics. Currently, there is an implementation for the DynamicSpatialBufferView which you can adapt for your own GenericView. Note that a GenericView is often coupled with a UserDefinedSelectFunction and can thus be identified by the name of that select function.

Concurrent Handling of Messages

In earlier versions of the SES, serious misbehavior of pattern matching was identified, caused by messages being processed in the wrong order. This happened mainly in situations with higher data rates (> 5/sec) and was due to parallel, unsynchronized parsing/processing of messages. The following figure illustrates the race conditions which used to occur in such cases.

ses_race-conditions.png

The SESNotificationConsumer created a NotifyThread for every incoming message and started it immediately after creation. Consequently, the entry order into the EsperFilterEngine (here: 2-1-3 instead of 1-2-3) was out of control, as Java gives no guarantee about the order in which concurrently running threads are scheduled. As correct message order is the most important requirement when working on patterns of messages, a robust and performant way of dealing with high data rates was essential for the SES to work properly in such use cases. The following sections describe how this is realized.

Default Implementation

Concurrency handling always adds a synchronization/thread-management overhead to an application. Thus, ordered concurrency handling of the SES can be disabled using a configuration key (USE_CONCURRENT_ORDERED_HANDLING = false). This is suitable in cases where no complex stream processing is needed (e.g. only Level-1 filtering, or Level-2 with discrete independent messages). If not disabled, the SES uses its default implementation, a FIFO-ordered queue, for incoming messages.

The solution is quite straightforward and can be separated into three steps:

  1. Do not split processing into multiple threads before pushing messages into the EsperFilterEngine. In particular, everything from arrival at the HTTP port until insertion into the FilterEngine must be done in a single thread. The EsperFilterEngine reserves a position in the FIFO queue in strict order of arrival.
  2. After reserving the position, create a HandlerThread to parse and process the contents of the message.
  3. After processing has finished, inform the FIFO queue about successful processing. The FIFO queue pushes the processed message back to the EsperFilterEngine (which implements IPollListener for that purpose).

ses_fifo-queue2.png
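
The three steps can be condensed into a small, self-contained model. It is a sketch under assumptions and not the FIFOWorker implementation: the class name is hypothetical, messages are plain strings, processing is simulated with a sleep, and a CompletableFuture replaces the monitor-based notification used in the SES. What it preserves is the key idea that slots are reserved in arrival order on the calling thread and consumed in that same order by a single consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of ordered hand-off; not the FIFOWorker implementation. */
final class OrderedHandOffSketch {

    // Slots are enqueued in arrival order and completed later, in any order.
    private final BlockingQueue<CompletableFuture<String>> fifo = new LinkedBlockingQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    /** Steps 1 and 2: reserve a slot on the calling thread, then process in the pool. */
    void submit(String rawMessage, long simulatedProcessingMillis) {
        CompletableFuture<String> slot = new CompletableFuture<>();
        fifo.add(slot);
        workers.submit(() -> {
            Thread.sleep(simulatedProcessingMillis);
            slot.complete("processed:" + rawMessage);
            return null;
        });
    }

    /** Step 3: a single consumer takes slots in reserved order and waits for each result. */
    List<String> drain(int count) throws Exception {
        List<String> ordered = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            ordered.add(fifo.take().get(5, TimeUnit.SECONDS));
        }
        return ordered;
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) throws Exception {
        OrderedHandOffSketch sketch = new OrderedHandOffSketch();
        sketch.submit("1", 80); // the slowest message arrives first
        sketch.submit("2", 10);
        sketch.submit("3", 40);
        System.out.println(sketch.drain(3)); // [processed:1, processed:2, processed:3]
        sketch.shutdown();
    }
}
```

Although message 2 finishes processing first, the consumer still waits for message 1, which is exactly why a slow or failed element can hold up everything behind it.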

As you might guess, this causes some trouble when dealing with high data rates and costly processing of messages. Therefore, a timeout estimation algorithm is integrated to prevent the FIFO queue from blocking if an exception occurred or processing of an element takes too long. See the Timeout/Exception Handling section for details.

Let's look at the implementation in more detail, namely the data structures used and the data-flow. The default implementation of the SES is the class FIFOWorker, which consists of a single thread that pulls available data from an internal FIFO queue. It implements IConcurrentNotificationHandler and could thus be replaced by other solutions. Step 1 as mentioned above is realized in the filter() method of the EsperFilterEngine. Here, we call the insertPendingEventCollection() method of IConcurrentNotificationHandler. In return, we get an empty QueuedMapEventCollection which acts as our container for processed MapEvents (the internal representation of a processed NotificationMessage). Then a new HandlerThread, namely a NotificationMessageProcessor, is created and submitted to a dedicated thread pool. The actual processing of the message (step 2 as mentioned above) takes place in the run() method of these threads, and in a final step the IConcurrentNotificationHandler is notified.

Notification of the IConcurrentNotificationHandler is done by two method calls. First, the setCollection() method of the QueuedMapEventCollection is called. Inside this method, the notifyAll() method of a monitor object is called. This is needed because the FIFOWorker may be waiting on this monitor: it may already have tried to process the contents of this message, failed, and decided to wait for a certain amount of time. The second call is a more general one. By triggering the notifyOnDataAvailability() method of IConcurrentNotificationHandler, we inform the FIFOWorker about the general availability of data. The FIFOWorker may be waiting on its own monitor object, which signals changes to the contents of the internal FIFO queue. In particular, the FIFOWorker waits on this monitor once it has emptied its FIFO queue.

When the FIFOWorker is able to poll the head of the internal queue (meaning the message has been processed and is available), it carries out step 3 as mentioned above. It calls onElementPolled() of its IPollListener instance for every MapEvent stored in the QueuedMapEventCollection. As the EsperFilterEngine registered itself as the IPollListener beforehand, the processed message is thereby pushed back to the EsperFilterEngine. The EsperFilterEngine is then responsible for pushing the single MapEvents into the actual Esper processing engine (see SensorEventServiceDeveloperInformation). As the FIFOWorker runs in a single thread, the correct order of messages is ensured.

Timeout/Exception Handling

As the nature of a FIFO queue implies the strict order of polling elements, occurrence of exceptions or high processing latencies (e.g. caused by a slow WFS instance while enriching the data) has to be treated specially. Let's assume a NotificationMessageProcessor which did not catch a SocketTimeoutException properly and thus was never able to inform the FIFOWorker about its failure. The FIFOWorker would be stuck in an endless wait() call on the thread's QueuedMapEventCollection monitor object. To preempt such events, a timeout handling algorithm is implemented in the FIFOWorker. In general, there are two ways of defining timeouts: always use a static timeout (configurable through CONCURRENT_INTELLIGENT_TIMEOUT = false) or use an "intelligent" timeout estimation.
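
A bounded wait on a monitor, which is what protects a consumer from the endless wait() described above, can be sketched as follows. The class is a hypothetical stand-in for a container such as QueuedMapEventCollection: events are plain strings, awaitCollection() is an invented method name, and the timeout is static. Only setCollection() and the notifyAll() call inside it are named in the text.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical container with a bounded wait; not the QueuedMapEventCollection class. */
final class PendingCollectionSketch {

    private final Object monitor = new Object();
    private List<String> events; // stays null until processing has finished

    /** Called by the processing thread when its result is ready. */
    void setCollection(List<String> processedEvents) {
        synchronized (monitor) {
            events = processedEvents;
            monitor.notifyAll();
        }
    }

    /** Waits at most timeoutMillis for the result; returns null if it never arrives. */
    List<String> awaitCollection(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (monitor) {
            while (events == null) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null; // give up so the queue can move on
                }
                TimeUnit.NANOSECONDS.timedWait(monitor, remainingNanos);
            }
            return events;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingCollectionSketch failed = new PendingCollectionSketch();
        System.out.println(failed.awaitCollection(50)); // null: nobody called setCollection

        PendingCollectionSketch ok = new PendingCollectionSketch();
        new Thread(() -> ok.setCollection(Arrays.asList("event-1"))).start();
        System.out.println(ok.awaitCollection(2000)); // [event-1]
    }
}
```

The loop re-checks the condition after every wake-up and recomputes the remaining time, so spurious wake-ups neither return early nor extend the overall timeout.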

The default timeout estimation algorithm of the SES is loosely based on the inverse distance weighting (IDW) method and can be replaced by other implementations of ITimeoutEstimation (e.g. an implementation which always returns the maximum processing period would be much more reliable than IDW but also less performant). It takes a constant number of processing periods (measured by the NotificationMessageProcessor threads) as input for predicting future processing periods. This approach is a compromise between performance and reliability. It adapts to the processing periods observed in the past and can thus cope with high CPU load or large latencies of external web services.
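
One way to read "inverse distance weighting over a constant number of processing periods" is a weighted mean in which recent measurements count more than older ones. The sketch below is an assumption about how such an estimator might look and does not reproduce the SES algorithm: the class, the window size, the exponent and the choice of "age" as the distance are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical estimator inspired by inverse distance weighting; not the SES algorithm. */
final class IdwTimeoutSketch {

    private final int windowSize;
    private final double power;
    private final Deque<Long> recentMillis = new ArrayDeque<>(); // newest first

    IdwTimeoutSketch(int windowSize, double power) {
        this.windowSize = windowSize;
        this.power = power;
    }

    /** Records one measured processing period and drops the oldest beyond the window. */
    void record(long processingMillis) {
        recentMillis.addFirst(processingMillis);
        if (recentMillis.size() > windowSize) {
            recentMillis.removeLast();
        }
    }

    /** Weighted mean with weight 1 / age^power, where age 1 is the newest measurement. */
    double estimateMillis(double fallbackMillis) {
        if (recentMillis.isEmpty()) {
            return fallbackMillis;
        }
        double weightedSum = 0.0;
        double weightTotal = 0.0;
        int age = 1;
        for (long period : recentMillis) {
            double weight = 1.0 / Math.pow(age, power);
            weightedSum += weight * period;
            weightTotal += weight;
            age++;
        }
        return weightedSum / weightTotal;
    }

    public static void main(String[] args) {
        IdwTimeoutSketch estimator = new IdwTimeoutSketch(3, 1.0);
        estimator.record(100);
        estimator.record(200);
        estimator.record(400); // newest, so it dominates the estimate
        System.out.printf("estimated period: %.1f ms%n", estimator.estimateMillis(1000.0));
    }
}
```

With the three measurements above and an exponent of 1, the weights are 1, 1/2 and 1/3, which gives (400 + 100 + 33.3) / 1.833, roughly 290.9 ms. A real timeout would presumably add a safety margin on top of such an estimate.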

Stress Test Results

The concurrent handling of messages has been tested regarding performance and reliability. Three different test cases have been run: single O&M Observations, single O&M Observations with simulated processing latency (2-3 seconds) and single O&M Observations with random exceptions (on average every 10th message). The message series of all test cases have been applied to an EML subscription with two SimplePatterns and one ComplexPattern (threshold undershoot followed by a threshold overshoot).

FIFOWorker with IDWTimeoutEstimation implementation (Revision 11732; Ubuntu 11.10 with a Tomcat 6 running on an Intel(R) Core(TM) i5 CPU M 430 @ 2.27GHz (2 Cores) with 4GBs of RAM):
Test Case                     Messages/second  Message Count  Time to empty FIFO  Lost message count
Single O&M                    200              30000          0s                  0
Single O&M with high latency  10               100            53s                 0
Single O&M with exceptions    100              2000           110s                200 (those which threw an exception)

If you want to run these tests on your own, you must enable the testing flag (performanceTesting) in the EsperFilterEngine. If enabled, the EsperFilterEngine calculates the time it takes to empty the FIFO queue (last column). This can only be done in the unregisterFilter() method and thus you should instantly invoke an Unsubscribe request after you have pushed all messages into the SES. Additionally, you could activate latency simulation (testingSimulateLatency) or random exceptions (testingThrowRandomExceptions).

Eclipse Project Setup

Prerequisites:
  • Eclipse with a Git plugin (Included in recent releases)
  • Apache Maven (version 2 and 3 tested)
  • m2eclipse plugin installed and set up for use with your maven-2.2.1 installation (included in Eclipse 4.4+)

First, check out the 52n-ses project from https://github.com/52North/SES. Use "Check out as a project configured using the New Project Wizard". Name it e.g. "52n-ses" and use a simple project (no Java project is needed at this stage).

Right-click on the newly created project and select "Configure/Convert to Maven Project". Eclipse will probably take a while to activate the Maven nature. After processing is finished, your workspace should look similar to this:

ses-eclipse-1.jpg

Now, right-click on your Maven project and select "Import" -> "Maven/Existing Maven Projects". m2eclipse will automatically import the subprojects (modules) into the root folder of your workspace. After successfully importing the modules, your workspace should look like this:

ses-eclipse-2.jpg

That's all you have to do. The SES should now be available in your workspace. You can compile it by right-clicking on the root project ("52n-ses"), selecting "Run as/Maven build" and typing "install" as the goal.

Source Code Project Structure

The repository of all SES-related components is located at https://github.com/52North/SES. The repository's layout is organized as follows.

  • master/develop branches
    • 52n-ses-api - an API module providing classes used in multiple other modules
    • 52n-ses-bindings - module for the XML bindings (compiled XMLBeans)
    • 52n-ses-core - the core of the SES where the main logic is implemented
    • 52n-ses-eml-001 - EML schema version 0.0.1 event streaming module
    • 52n-ses-eml-002 - EML schema version 0.0.2 event streaming module
    • 52n-ses-utils - module for serving general utilities (XMLHelper methods, ConfigurationRegistry, Parsing, Concurrency Handling)

Topic revision: r15 - 23 Apr 2015, EikeJuerrens