Sensor Event Service Contributor Information
This page is dedicated to developers of the SensorEventService. It gives some insights into the general work-flow of requests and how the actual data is processed. If you have any questions, please get in touch with the community via the Sensor Web Mailing List.
The 52°North SES implementation's HTTP interface is based on Apache Muse 2.2.0. Unfortunately, the project was discontinued shortly after the decision to use it was made. We have therefore set up our own Muse 2.2.0 branch in the 52N SVN repository so that we can fix bugs and add features ourselves. It is currently not publicly accessible, so please get in touch with the IT-IS team if you need access (information can be found on the account information page).
The Muse framework provides the general business logic for handling requests in compliance with the WS-Notification specifications.
The Web Service Notifications specifications are a family of standards developed by OASIS. They define interfaces and requests/responses that give web services interoperable publish/subscribe functionality. The default binding of the SES is based on SOAP 1.2.
WS Base Notification
See the WS Base Notification specification for details.
The 52N SES implements both the NotificationProducer and the NotificationConsumer. It is thus capable of receiving and producing/publishing Notification messages.
Besides these two resources, the 52N SES implements the SubscriptionManager interface, which provides functionality to pause/renew/remove a subscription instance.
WS Brokered Notification
See the WS Brokered Notification specification for details.
As previously stated, the SES implements the Producer and Consumer side of the WS-N specification. Used in combination with the SubscriptionManager, these two interfaces form the basis for the NotificationBroker implementation of the SES. Publishers can send messages to the SES, and consumers can subscribe to a specific subset of these messages.
Resource Management and Lifecycles
The way Muse, and thus the SES, manages resources and their lifetimes is closely coupled to the WS-ResourceLifetime
specification. The initial lifetime of a resource can be defined in the first request (e.g.
request) or in an additional request (e.g.
After startup of the SES, all resources get loaded in a predefined order. The first resources are the SubscriptionManager instances (the default one and optionally additional persisted instances).
is the first class of the SES core package which is loaded. In particular, the ResourceManager of muse calls the
method since a resource file in the dedicated router-entries folder is available by default. In this method, the main setup of the SES environment is done. The singleton SensorEventServiceDeveloperInformation
implementation as well as the
instance is loaded. This is done only once, inside a mutex-synchronized block. To add new resources or classes that need to be loaded on startup, simply append your code to the
method, which is called within the mutexed block.
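The following Java sketch illustrates the once-only, mutex-guarded startup described above. The class and method names (`SesStartupSketch`, `initializeOnce`, `loadAdditionalResources`) are hypothetical stand-ins, not the actual SES API. The sketch only shows the pattern: the first caller performs the setup inside a `synchronized` block, later callers return early, and extensions are appended to a hook that runs inside the same lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a once-only startup routine guarded by a mutex.
final class SesStartupSketch {
    private static final Object MUTEX = new Object();
    private static boolean initialized = false;
    // Records what was loaded (in order) so that the sketch can be inspected.
    private static final List<String> loaded = new ArrayList<>();

    private SesStartupSketch() {}

    /** May be called by several resources; only the first call performs the setup. */
    static void initializeOnce() {
        synchronized (MUTEX) {
            if (initialized) {
                return; // setup already done by an earlier caller
            }
            loaded.add("configuration"); // e.g. the configuration registry
            loaded.add("filterEngine");  // e.g. the singleton filter engine
            loadAdditionalResources();   // extension hook, still inside the lock
            initialized = true;
        }
    }

    /** Append code here for resources that must be available right after startup. */
    private static void loadAdditionalResources() {
        loaded.add("myCustomResource"); // hypothetical custom resource
    }

    static List<String> loadedResources() {
        synchronized (MUTEX) {
            return Collections.unmodifiableList(new ArrayList<>(loaded));
        }
    }
}
```

Even if several resources trigger the initialization concurrently, the three entries are loaded exactly once and the custom resource is always loaded last.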
Besides the startup initialization, the
is instantiated for every requested Subscription. IDs are assigned by the
. These instances can be managed using the WS Resource interfaces (
) and are removed using the
method which is called within an
request. The SES also implements the PausableSubscriptionManager capability. Thus, Subscriptions can be paused/resumed using the appropriate WS-N methods (see the specification).
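A rough sketch of this per-subscription lifecycle follows. `SubscriptionSketch` and `SubscriptionRegistrySketch` are hypothetical names. IDs come from a simple counter. A paused subscription simply drops incoming messages, which is an assumption of this sketch: the actual behavior is governed by the SES and the WS-N specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: one manager object per subscription with pause/resume support.
final class SubscriptionSketch {
    private final String id;
    private volatile boolean paused = false;
    private final List<String> delivered = new ArrayList<>();

    SubscriptionSketch(String id) { this.id = id; }

    String id() { return id; }
    void pause() { paused = true; }
    void resume() { paused = false; }

    /** Assumption of this sketch: messages arriving while paused are dropped, not buffered. */
    synchronized void deliver(String message) {
        if (!paused) {
            delivered.add(message);
        }
    }

    synchronized List<String> delivered() { return new ArrayList<>(delivered); }
}

final class SubscriptionRegistrySketch {
    private final AtomicLong counter = new AtomicLong();
    private final Map<String, SubscriptionSketch> active = new ConcurrentHashMap<>();

    /** Creates a new subscription with a unique, counter-based ID. */
    SubscriptionSketch subscribe() {
        SubscriptionSketch s = new SubscriptionSketch("Subscription-" + counter.incrementAndGet());
        active.put(s.id(), s);
        return s;
    }

    /** Removes the resource, e.g. in reaction to an Unsubscribe request. */
    boolean destroy(String id) { return active.remove(id) != null; }

    void publish(String message) {
        for (SubscriptionSketch s : active.values()) {
            s.deliver(message);
        }
    }
}
```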
gets instantiated only once, but only after the SESSubscriptionManager has been initialized. Consequently, it has access to a fully prepared SensorEventServiceDeveloperInformation
. It is never shut down before the whole service is shut down.
instance is the result of a successful
request. IDs are assigned by the
. Similar to the SubscriptionManager, it can be managed using dedicated interface methods. In particular,
requests can be used to extend the lifetime or to remove the publisher.
provides access to the SES' configuration file entries. It is available as a singleton using the
method. It holds static keys for the safe retrieval of configuration parameters using the
method. Besides the provision of parameters, this class can be used as a global communication/registry component, as it is available from all classes of the SES. For instance, it provides access to the
Entry Point of Data
The following subsections summarize the data-flow for certain incoming requests. They are supposed to provide a better understanding of the component chaining as implemented in the 52N SES.
Subscriptions and Publisher Registrations can also be triggered from persisted XML instances. The workflow in general stays the same, just the caller is different (
The starting point for every subscription is a valid
request sent to the NotificationProducer's port type. From there, the ResourceRouter calls the SESNotificationProducer's
method. Beforehand, the
element was separated into its
children (using the registered
implementations). Each MessageContent is represented as a
instance and could also be bundled in a
If the FilterCollection
(meaning it has a level-2 or level-3 filter) it gets registered at the
instance using the
method. This is where the actual parsing and processing of the MessageContent's markup takes place.
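The decision made at subscription time can be sketched as follows. All type and method names (`FilterCollectionSketch.isComplex`, `FilterEngineSketch.register` and so on) are hypothetical. The sketch only shows that a collection containing at least one level-2 or level-3 filter is handed over to the filter engine, while pure level-1 collections are kept for per-message evaluation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of the subscribe-time decision described above.
enum FilterLevelSketch { LEVEL_1, LEVEL_2, LEVEL_3 }

final class FilterSketch {
    final FilterLevelSketch level;
    final String markup;

    FilterSketch(FilterLevelSketch level, String markup) {
        this.level = level;
        this.markup = markup;
    }
}

final class FilterCollectionSketch {
    final List<FilterSketch> filters = new ArrayList<>();

    /** "Complex" means: at least one level-2 or level-3 filter is present. */
    boolean isComplex() {
        for (FilterSketch f : filters) {
            if (f.level != FilterLevelSketch.LEVEL_1) {
                return true;
            }
        }
        return false;
    }
}

final class FilterEngineSketch {
    private final List<String> registered = new ArrayList<>();

    /** In a real engine, this is where the pattern markup would be parsed. */
    void register(String subscriptionId, FilterCollectionSketch collection) {
        registered.add(subscriptionId);
    }

    List<String> registeredSubscriptions() { return Collections.unmodifiableList(registered); }
}

final class SubscribeHandlerSketch {
    /** Returns true if the subscription was delegated to the filter engine. */
    static boolean handleSubscribe(String id, FilterCollectionSketch c, FilterEngineSketch engine) {
        if (c.isComplex()) {
            engine.register(id, c); // pattern matching is delegated to the engine
            return true;
        }
        return false; // level-1 only: evaluated per message later on
    }
}
```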
The work-flow of notifications is a bit trickier than that of, e.g., subscriptions, because all roles of Brokered Notification come into play when consuming and brokering notifications. Again, the starting point is a valid
request, which consists of one
element. In addition, a
element is included which holds the actual payload as xml:any. The qualified names of the contents can be retrieved using the NotificationMessage's
After pre-processing of the message, the ResourceRouter calls the
method. Here, the message is inserted into a FIFO-queued thread pool encapsulated within a
instance. For convenience, the thread pool is currently single-threaded (see Concurrent Handling of Messages
for details). When it is the thread's turn, it calls all registered
implementations. For the default setup, this is just the
which calls the method
of the SESNotificationProducer instance.
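The FIFO-queued, single-threaded hand-over described above behaves like a single-thread executor: the request thread only enqueues the message, and one worker calls all registered listeners in submission order. The sketch below uses the JDK's `Executors.newSingleThreadExecutor()`. The class name and the listener type (`Consumer<String>`) are assumptions and are not the SES types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a FIFO hand-over from the request thread to a single worker.
final class NotificationQueueSketch {
    // One worker thread drains the executor's internal FIFO queue in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) { listeners.add(listener); }

    /** Called on the request thread; returns immediately. */
    void enqueue(String message) {
        worker.execute(() -> {
            for (Consumer<String> listener : listeners) {
                listener.accept(message); // e.g. a listener forwarding to the producer
            }
        });
    }

    /** Stops accepting work and waits for queued messages; returns false on timeout. */
    boolean shutdownAndWait(long seconds) {
        worker.shutdown();
        try {
            return worker.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because only one worker thread exists, listeners observe messages in exactly the order in which `enqueue` was called.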
Finally, the message reaches the SESNotificationProducer, where it is first inserted into the actual IFilterEngine instance (see the description of the FilterEngine below). In a second step, the
method of the SESNotificationProducer is called. Here, the message is pushed into every available SESSubscriptionManager without a
where it is checked against the registered Filters. For simple Filters which only consist of Level-1 instances (e.g. XPath expressions), there is no need for pattern/causality matching, so every NotificationMessage can be treated as an independent, discrete message. Therefore, the
method of every Filter instance is simply called. If all instances return
, the message is forwarded to the subscriber's EndpointReference.
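For level-1 subscriptions, the per-message check reduces to a logical AND over all filters. Below is a minimal sketch with hypothetical names. `java.util.function.Predicate` stands in for the SES filter type, and a list stands in for the subscriber's EndpointReference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of level-1 matching: every filter must accept the message.
final class Level1DispatchSketch {
    static final class Subscription {
        final boolean complex; // has level-2/3 filters?
        final List<Predicate<String>> filters;
        final List<String> endpoint = new ArrayList<>(); // stands in for the EndpointReference

        Subscription(boolean complex, List<Predicate<String>> filters) {
            this.complex = complex;
            this.filters = filters;
        }
    }

    static void dispatch(String message, List<Subscription> subscriptions) {
        for (Subscription s : subscriptions) {
            if (s.complex) {
                continue; // complex subscriptions are handled by the pattern engine
            }
            boolean accepted = true;
            for (Predicate<String> filter : s.filters) {
                if (!filter.test(message)) {
                    accepted = false; // one rejecting filter is enough
                    break;
                }
            }
            if (accepted) {
                s.endpoint.add(message); // forward to the subscriber
            }
        }
    }
}
```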
For complex Filters (Level 2 and 3), a completely different solution was needed, as those complex filters could filter on patterns
of messages (e.g. a threshold undershoot followed by a threshold overshoot, a pattern involving multiple messages).
This is where the actual magic happens. Esper
is the underlying event stream processing engine which is used for Event Pattern matching. As the SES is capable of filtering messages based on stream patterns (e.g. EML Patterns), we had to adjust the default implementation of Filter matching. If a subscriber provided a filter of level 2 or 3 (=
), the default filter matching is bypassed (
note that the
). Instead, the order of filter matching is reversed. After
(the current default implementation of
) detects a pattern match for a message, all other Level-1 filters are checked. This is done inside the
method of the EML module in use. Here, the
method of the corresponding SESSubscriptionManager instance is called. From there on, the same work-flow as for Level-1 filters is applied.
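The reversed order can be illustrated without Esper. In the following sketch, a two-state machine stands in for the Esper pattern from the example above (a threshold undershoot followed by an overshoot). The remaining level-1 filters are evaluated only once the pattern has matched. The class name and the thresholds are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoublePredicate;

// Illustrative stand-in for an Esper pattern followed by level-1 filtering.
final class PatternThenFilterSketch {
    private final double low;
    private final double high;
    private final List<DoublePredicate> level1Filters;
    private boolean undershootSeen = false;
    final List<Double> forwarded = new ArrayList<>();

    PatternThenFilterSketch(double low, double high, List<DoublePredicate> level1Filters) {
        this.low = low;
        this.high = high;
        this.level1Filters = level1Filters;
    }

    void onMessage(double value) {
        // Step 1: pattern matching over the stream ("below low, later above high").
        if (value < low) {
            undershootSeen = true;
            return;
        }
        if (value > high && undershootSeen) {
            undershootSeen = false; // pattern consumed; a new undershoot is required
            // Step 2: only now are the remaining level-1 filters checked.
            for (DoublePredicate filter : level1Filters) {
                if (!filter.test(value)) {
                    return;
                }
            }
            forwarded.add(value); // forward to the subscriber
        }
    }
}
```

With `low = 10`, `high = 20` and a single level-1 filter `v < 100`, the stream 15, 25, 5, 25, 5, 150, 30, 5, 12, 22 forwards only 25 and 22. The first 25 has no preceding undershoot, 150 matches the pattern but is rejected by the level-1 filter, and 30 arrives without a new undershoot.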
The FilterEngine is responsible for triggering the parsing of the NotificationMessage. For this purpose, it holds a list of all available Parsers (every Parser must implement
) which is iterated. If a Parser instance returns
method is called. The Parser is then responsible for creating a List of
is an implementation of
and is designed to store the parsed properties as key-value pairs. As Esper is capable of processing instances of
, it is possible to just push the results of the Parser into Esper.
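The parser chain can be sketched as follows. The interface and class names are hypothetical, and the toy `name=value;name=value` format stands in for real O&M markup. This sketch uses the first parser that accepts a message; whether the SES stops at the first accepting parser is not stated above. The resulting events are plain `java.util.Map` instances, a representation that Esper supports as map events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Key-value event, analogous to a map-based Esper event (hypothetical name).
final class MapEventSketch extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;
}

interface NotificationParserSketch {
    boolean accepts(String message);
    List<MapEventSketch> parse(String message);
}

/** Toy parser for "name=value;name=value" messages (stand-in for an O&M parser). */
final class KeyValueParserSketch implements NotificationParserSketch {
    public boolean accepts(String message) {
        return message.contains("=");
    }

    public List<MapEventSketch> parse(String message) {
        MapEventSketch event = new MapEventSketch();
        for (String pair : message.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                event.put(kv[0].trim(), kv[1].trim());
            }
        }
        List<MapEventSketch> result = new ArrayList<>();
        result.add(event);
        return result;
    }
}

final class ParsingSketch {
    /** Uses the first parser that accepts the message; returns no events otherwise. */
    static List<MapEventSketch> toEvents(String message, List<NotificationParserSketch> parsers) {
        for (NotificationParserSketch parser : parsers) {
            if (parser.accepts(message)) {
                return parser.parse(message);
            }
        }
        return new ArrayList<>();
    }
}
```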
This data-flow is very similar to that of Subscriptions. The ResourceRouter calls the
method of the
class. Here, a
instance is created and initialized. In particular, the SensorML
is processed and the data types are registered at the
. This is needed, as in some situations subscriptions and/or notifications depend on metadata (e.g. unit of measurement) of a publisher.
A substantial part of a Subscription is the definition of a message filter. With this filter, the user is able to define the subset of messages they are interested in. The SES defines three levels of filtering, as described in the following sections.
This filter uses XPath to define the filter criteria that shall be applied to each incoming notification separately. The SES uses the built-in version of Muse (
) which is based on Xalan
. A single NotificationMessage is evaluated against the given XPath expression, and if
is returned, the NotificationMessage is pushed to the EndpointReference of the Subscription.
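A level-1 check of this kind can be reproduced with the JDK's standard `javax.xml.xpath` API. The SES itself uses Muse's built-in, Xalan-based evaluation. The class name below is hypothetical, and the example message deliberately has no namespaces: real O&M messages would additionally require a `NamespaceContext` for prefixed expressions.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical level-1 filter: a message passes if the XPath expression evaluates to true.
final class XPathFilterSketch {
    // XPath objects are not thread-safe; use one filter instance per thread.
    private final XPath xpath = XPathFactory.newInstance().newXPath();
    private final String expression;

    XPathFilterSketch(String expression) { this.expression = expression; }

    boolean accepts(String messageXml) {
        try {
            Object result = xpath.evaluate(expression,
                    new InputSource(new StringReader(messageXml)), XPathConstants.BOOLEAN);
            return (Boolean) result;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate " + expression, e);
        }
    }
}
```

For the message `<observation><value>23.5</value></observation>`, the expression `/observation/value > 20` evaluates to true, while `/observation/value > 30` evaluates to false.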
Level-2 filters are encoded using the Filter Encoding Specification 2.0
. As EML also uses FES 2.0 in its Guards, the realization of Level-2 filters in the 52N SES is basically the same as for Level-3. Depending on the EML version used, the FES 2.0 Filter markup is wrapped in a static EML document and then treated as Level-3. See the class
for the details of this wrapped document.
Level 3 / EML Modules
This filter level is designed to recognize patterns in a stream of messages. It is based on the Event Pattern Markup Language
which is a powerful but also quite sophisticated definition language. The details of this specification are beyond the scope of this article. The following section summarizes the internal handling of EML subscriptions, in particular the mapping from EML to EPL (an SQL-like language used internally by Esper) and some special cases (e.g. spatial filtering).
First of all, note that the current SES implementation supports EML Schema versions 0.0.1 (the one used in the OGC discussion paper) as well as 0.0.2 (with some handy enhancements). However, for technical reasons both modules cannot run in parallel, so you should use the dedicated Maven profile (located in 52n-ses/pom.xml) to activate the version of your choice (eml_002 is the default).
Both modules in general are structured in the same way. In particular, the entry point is an implementation of
for both modules (though located in different packages). It is set up using the
method. Here, the EML markup is parsed. Namely, the parsing method iterates through all SimplePatterns, ComplexPatterns, TimerPatterns and RepetitivePatterns. Every pattern is represented by a dedicated Java object after successful parsing and is then transformed into an EPL statement.
An EPL Statement (
for details) is registered at Esper from within the
method. The concept of Esper is based on Listeners which are linked to Statements. So, for every Statement a Listener (in most cases an instance of
) is attached. A Listener is notified when Esper detects a match for its Statement. The concept of EML backed by Esper is the creation of different streams and multiple patterns (see http://52north.org/ses
for conceptual details). The following figure illustrates the workflow of a NotificationMessage and the concept of Statement + Listeners.
When a Statement triggers a match, the
method of the
is called. From here, a new instance of
is created and submitted to a thread pool. The
is responsible for checking whether the message must be inserted into another stream or whether it should generate an output for the subscriber. In the latter case, the
method of the StatementListener is called, where the
is called via the
method. Further processing is done as described above for incoming notifications.
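The interplay of statements, listeners and streams can be simulated in a few lines of plain Java. This is a synchronous toy model with hypothetical names. It is neither Esper's API nor the SES listener classes, and it omits the thread pool. Each statement watches one named stream, and its listener either inserts the matching event into another stream or produces output for the subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoublePredicate;

// Synchronous toy model of statements with attached listeners (not Esper's API).
final class MiniStreamEngine {
    static final class Statement {
        final String inputStream;
        final DoublePredicate condition;
        final String outputStream; // null: the listener produces subscriber output

        Statement(String inputStream, DoublePredicate condition, String outputStream) {
            this.inputStream = inputStream;
            this.condition = condition;
            this.outputStream = outputStream;
        }
    }

    private final List<Statement> statements = new ArrayList<>();
    final List<Double> subscriberOutput = new ArrayList<>();

    void register(Statement statement) { statements.add(statement); }

    /** Sends an event into a named stream; assumes stream definitions are acyclic. */
    void send(String stream, double value) {
        for (Statement s : statements) {
            if (!s.inputStream.equals(stream) || !s.condition.test(value)) {
                continue;
            }
            // Listener logic: insert into another stream or generate subscriber output.
            if (s.outputStream != null) {
                send(s.outputStream, value);
            } else {
                subscriberOutput.add(value);
            }
        }
    }
}
```

For example, a first statement can forward values above 10 from `input` into an intermediate stream `warm`, and a second statement on `warm` can emit values above 20 to the subscriber. Sending 5, 15 and 25 then yields only 25 as output.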
EML Version 0.0.1
Including version 0.0.1 instead of the default 0.0.2 can be achieved by activating the
maven profile (e.g.
mvn clean install -P eml_001
on 52n-ses or 52n-ses-core).
EML Version 0.0.2
This is the default implementation used by the SES. Its maven profile
is activated by default. Compared to version 0.0.1, some XML Schema changes have been made to address common drawbacks. This generally led to adjustments in the EML parsing routines (classes
) but did not change the overall behavior of pattern processing.
Version 0.0.2 introduced a new feature, the eml:GenericView. This view is a generic powerful tool when combined with a UserDefinedSelectFunction. An example of such a view is the
class which implements spatial filtering of incoming messages against a dynamic geometry.
Spatial filtering within the SES is based on the Java Topology Suite
. As it is available in Level-2 filters and upwards, it has to be integrated into Esper (see Level 3 / EML Modules above
). This is done by providing a custom function library to Esper (see http://esper.codehaus.org/esper-2.3.0/doc/reference/en/html/functionreference.html
for details). While initializing the
the private method
is called. Here, the package name gets imported into Esper and methods of its classes can then be used within EPL Statements.
The actual filtering is applied in the
which provides wrapper methods to JTS function calls. Every method takes two
objects and evaluates these against the underlying spatial method (e.g. an intersection of two geometries).
Since JTS is only a topology
framework, it is not capable of applying any reference system specific transformations. Nevertheless, the SES supports some FES 2.0 spatial operations such as
which take a distance and calculate a buffer around a given geometry before applying the actual topology evaluation. In the current implementation of the SES this is achieved by a PostgreSQL/PostGIS
backend. By following this approach, we benefit from robust and performant transformation methods. The drawback for the user is the need to set up a complete database system (which is never actually used for storing any data).
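The following sketch shows the general shape of such a function library: static methods that take two geometries and return a boolean, the kind of method that can be called from EPL once its class has been imported. To stay self-contained, it uses axis-aligned boxes instead of JTS geometries and buffers in plain coordinate units. It therefore deliberately omits the reference-system transformation for which the SES relies on PostGIS. All names are hypothetical.

```java
// Hypothetical spatial function library; boxes stand in for JTS geometries.
final class SpatialFunctionsSketch {
    static final class Box {
        final double minX, minY, maxX, maxY;

        Box(double minX, double minY, double maxX, double maxY) {
            this.minX = minX;
            this.minY = minY;
            this.maxX = maxX;
            this.maxY = maxY;
        }

        /** Grows the box by the given distance on every side (a crude, square-cornered buffer). */
        Box buffer(double distance) {
            return new Box(minX - distance, minY - distance, maxX + distance, maxY + distance);
        }
    }

    private SpatialFunctionsSketch() {}

    static boolean intersects(Box a, Box b) {
        return a.minX <= b.maxX && b.minX <= a.maxX && a.minY <= b.maxY && b.minY <= a.maxY;
    }

    static boolean within(Box inner, Box outer) {
        return inner.minX >= outer.minX && inner.maxX <= outer.maxX
            && inner.minY >= outer.minY && inner.maxY <= outer.maxY;
    }

    /** DWithin-style check: buffer first, then apply the topological test. */
    static boolean dWithin(Box a, Box b, double distance) {
        return intersects(a.buffer(distance), b);
    }
}
```

The buffer-then-test order in `dWithin` mirrors the approach described above. Computing a correct metric buffer in a given reference system is exactly the part that the SES delegates to PostGIS.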
An exemplary implementation of a buffer is realized in the
class of both EML modules. In its
function the actual buffering is triggered by using an instance of
Adding New UserDefinedSelectFunction instances
For some use cases and environments it could be required to define a new select function which addresses certain needs of a scenario. This must be done in the
method of class
(in the EML module you are using). Here, you should append an additional
clause to the
control block. There are many examples of select functions already present which will help you to understand the general concept.
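Assuming the control block is an if/else-if chain keyed by the function name, adding a new select function amounts to appending one more branch. The sketch below illustrates only that idea. The function names, the factory method and the generated EPL fragments are hypothetical (`avg`, `max` and `count` are standard EPL aggregation functions). For the real structure, refer to the existing clauses in the module you are using.

```java
import java.util.function.UnaryOperator;

// Hypothetical dispatch on the name of a select function, extended by one clause.
final class SelectFunctionDispatchSketch {
    private SelectFunctionDispatchSketch() {}

    /** Maps a function name to a builder for the corresponding EPL select expression. */
    static UnaryOperator<String> create(String functionName) {
        if ("Average".equals(functionName)) {
            return property -> "avg(" + property + ")";
        } else if ("Maximum".equals(functionName)) {
            return property -> "max(" + property + ")";
        } else if ("MyNewFunction".equals(functionName)) {
            // Newly appended clause for a scenario-specific function (hypothetical).
            return property -> "count(" + property + ")";
        }
        throw new IllegalArgumentException("Unknown select function: " + functionName);
    }
}
```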
Implementing a GenericView
The GenericView of EML 0.0.2 is processed in the
method of class
(similar to a UserDefinedSelectFunction). There is a dedicated
control block after the
statement where you can implement the view specifics. Currently, there is an implementation for the
which you can adapt for your own GenericView.
Note that a GenericView is often coupled with a UserDefinedSelectFunction and thus can be identified by the name of that Select function.
Concurrent Handling of Messages
In earlier versions of the SES, serious pattern matching errors were identified that were caused by messages being processed in the wrong order. This happened mainly in situations with higher data rates (> 5/sec) and was caused by parallel, unsynchronized parsing/processing of messages. The following figure illustrates the race conditions which used to occur in such cases.
for every incoming message and started it immediately after creation. Consequently, the order in which messages entered the EsperFilterEngine (here: 2-1-3 instead of 1-2-3) was out of control, as the JVM gives no guarantee about the order in which threads are scheduled. As correct message order is the most important requirement when working on patterns of messages, a robust and performant way of dealing with high data rates was essential for the SES to work properly in such use cases. The following sections describe how this is realized.
Concurrency handling always adds a synchronization/thread-management overhead to an application. Thus, ordered concurrency handling of the SES can be disabled using a configuration key (
). This is suitable in cases where no complex stream processing is needed (e.g. only Level-1 filtering, level-2 with discrete independent messages). If not disabled, the SES uses its default implementation, a FIFO-ordered queue, for incoming messages.
The solution is quite straightforward and can be separated into three steps:
- Do not separate processing into multiple threads before pushing messages into the EsperFilterEngine (especially this means that everything from arriving at the HTTP port until inserting into the FilterEngine must be done in a single thread). The EsperFilterEngine reserves a position in the FIFO queue in strict order of arrival.
- After reserving the position, create a HandlerThread to parse and process the contents of the message.
- After processing has finished, inform the FIFO queue about successful processing. The FIFO queue pushes the processed message back to the EsperFilterEngine (which implements IPollListener for that purpose).
As you might guess, this causes some trouble when dealing with high data rates and costly processing of messages. Therefore, a timeout estimation algorithm is integrated to prevent the FIFO queue from blocking if an exception occurred or processing of an element takes too long. See the description of the timeout handling below.
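The three steps can be sketched with standard `java.util.concurrent` classes. This is not the SES implementation, which uses its own queue, handler threads and poll listeners. The names are hypothetical, a `CompletableFuture` plays the role of the reserved position, and timeout handling is left out. The key point is that positions are reserved in arrival order on a single thread, processing runs in parallel, and a single consumer releases results strictly in reservation order.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: parallel processing, strictly ordered release of results.
final class OrderedProcessorSketch<I, O> {
    private final BlockingQueue<CompletableFuture<O>> slots = new LinkedBlockingQueue<>();
    private final ExecutorService handlers = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    private final Function<I, O> processing;

    OrderedProcessorSketch(Function<I, O> processing) { this.processing = processing; }

    /** Steps 1 and 2; must be called from the single thread that receives messages. */
    void submit(I input) {
        CompletableFuture<O> slot = new CompletableFuture<>();
        slots.add(slot);             // step 1: reserve a position in strict arrival order
        handlers.execute(() -> {     // step 2: process in parallel
            try {
                slot.complete(processing.apply(input));
            } catch (RuntimeException e) {
                // Anything not caught here would leave the slot pending forever,
                // which is why a timeout is needed in practice.
                slot.completeExceptionally(e);
            }
        });
    }

    /** Step 3: a single consumer releases results in reservation order. */
    void drain(int count, Consumer<O> pollListener) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            CompletableFuture<O> head = slots.take(); // blocks until a slot exists
            O result;
            try {
                result = head.join();                 // blocks until this slot is done
            } catch (CompletionException e) {
                continue; // failed message: skip it so that later ones are not blocked
            }
            pollListener.accept(result);
        }
    }

    void shutdown() { handlers.shutdown(); }
}
```

If later messages are processed faster than earlier ones, they still wait in their slots until every earlier slot has been released, which is exactly the ordering guarantee the EsperFilterEngine needs.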
Let's look at the implementation in more detail, namely the data structures used and the data-flow. The SES' default implementation is the class
which consists of a single thread to pull available data from an internal FIFO queue. It implements
and thus could be replaced by other solutions. Step 1
as mentioned above is realized in the
method of the EsperFilterEngine. Here, we call the
method of IConcurrentNotificationHandler. In return we get an empty
which acts as our container for processed
(the internal representation of a processed
). Then a new HandlerThread, namely a
, is created and submitted to a dedicated thread pool. The actual processing of the message (step 2
as mentioned above) takes place in the
method of these threads and in a final step, the IConcurrentNotificationHandler gets notified.
Notification of the IConcurrentNotificationHandler is done by two method calls. First, the
method of the
is called. Inside this method, the
method of a monitor object is called. This is needed because the FIFOWorker may be waiting on this monitor: it already tried to process the contents of this message but failed, so it decided to wait for it for a certain amount of time. The second call is a more general one. By triggering the
method of IConcurrentNotificationHandler, we inform the FIFOWorker about the general availability of data. The FIFOWorker may wait on its own monitor object, which reflects the contents of the internal FIFO queue. In particular, the FIFOWorker waits on this monitor when it has emptied its FIFO queue.
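The wait/notify interplay on a per-message monitor can be sketched as follows. `ProcessedSlotSketch` is a hypothetical name. The handler thread completes the slot and calls `notifyAll()`. The single worker waits on the same monitor for a bounded time and re-checks the condition in a loop to cope with spurious wake-ups.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical per-message slot that a worker can wait on with a timeout.
final class ProcessedSlotSketch<T> {
    private T value;
    private boolean done = false;

    /** Called by the handler thread once processing has finished. */
    synchronized void complete(T result) {
        value = result;
        done = true;
        notifyAll(); // wake up a worker that is waiting on this slot
    }

    /**
     * Called by the single worker. Waits at most timeoutMillis and returns null on
     * timeout (or interruption), in which case the worker may give up on this slot.
     */
    synchronized T awaitResult(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!done) { // the loop guards against spurious wake-ups
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return null;
            }
            try {
                wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return value;
    }
}
```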
When the FIFOWorker is able to poll the head of the internal queue (meaning the message has been processed and is available), it conducts step 3
as mentioned above. It calls
instance for every
stored in the
. In this way, the processed message is pushed back to the EsperFilterEngine, which registered itself as the IPollListener beforehand. The EsperFilterEngine is then responsible for pushing the single MapEvents into the actual Esper processing engine (see Level 3 / EML Modules
). Because the FIFOWorker runs in a single thread, the correct order of messages is guaranteed.
As the nature of a FIFO queue implies the strict order of polling elements, occurrence of exceptions or high processing latencies (e.g. caused by a slow WFS instance while enriching the data) has to be treated specially. Let's assume a
which did not catch a
properly and thus was never able to inform the FIFOWorker about its failure. The FIFOWorker would be stuck in an endless
call on the thread's
monitor object. To prevent such situations, a timeout handling algorithm is implemented in the FIFOWorker. In general, there are two ways of defining timeouts: always use a static timeout (configurable through
) or use an "intelligent" timeout estimation.
The SES' default timeout estimation algorithm is loosely based on the Inverse distance weighting
method and could also be replaced by other implementations of
(e.g. an implementation which always returns the maximum processing period would be much more reliable than IDW but also less performant). It takes a constant number of processing periods (measured by the
threads) as input for predicting future processing periods. Such an implementation is a compromise between performance and reliability. It adapts to the processing periods observed in the past and can thus deal with high CPU load or large latencies of external web services.
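A minimal sketch of such an estimator follows. It assumes that the "distance" of a past measurement is its age in samples (1 for the most recent one), so that its weight is 1/k^p. The predicted period is then the weighted mean of the last N periods. The window size N, the power p and the safety factor that turns the prediction into a timeout are illustrative assumptions, not the SES defaults, and the class name is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical IDW-style estimator over the last `window` processing periods.
final class IdwTimeoutEstimatorSketch {
    private final int window;
    private final double power;
    private final double safetyFactor;
    private final long defaultMillis;
    private final Deque<Long> recent = new ArrayDeque<>(); // most recent first

    IdwTimeoutEstimatorSketch(int window, double power, double safetyFactor, long defaultMillis) {
        if (window < 1) {
            throw new IllegalArgumentException("window must be >= 1");
        }
        this.window = window;
        this.power = power;
        this.safetyFactor = safetyFactor;
        this.defaultMillis = defaultMillis;
    }

    /** Records a measured processing period. */
    synchronized void record(long processingMillis) {
        recent.addFirst(processingMillis);
        if (recent.size() > window) {
            recent.removeLast(); // keep a constant number of samples
        }
    }

    /** Weighted mean with weight 1/k^power for the k-th most recent sample, times a safety factor. */
    synchronized double timeoutMillis() {
        if (recent.isEmpty()) {
            return defaultMillis; // nothing measured yet
        }
        double weightedSum = 0;
        double weightSum = 0;
        int k = 1;
        for (long t : recent) { // iterates from the most recent to the oldest sample
            double w = 1.0 / Math.pow(k, power);
            weightedSum += w * t;
            weightSum += w;
            k++;
        }
        return safetyFactor * weightedSum / weightSum;
    }
}
```

With N = 3, p = 1 and a safety factor of 1, the periods 100, 200 and 400 ms (400 being the most recent) give (400 + 200/2 + 100/3) / (1 + 1/2 + 1/3) = 3200/11 ≈ 291 ms. A larger p makes the estimate react faster to recent spikes.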
Stress Test Results
The concurrent handling of messages has been tested regarding performance and reliability. Three different test cases have been run: single O&M Observations, single O&M Observations with simulated processing latency (2-3 seconds) and single O&M Observations with random exceptions (on average every 10th message). The message series of all test cases were applied to an EML subscription with two SimplePatterns and one ComplexPattern (threshold undershoot followed by a threshold overshoot).
implementation (Revision 11732; Ubuntu 11.10 with Tomcat 6, running on an Intel(R) Core(TM) i5 CPU M 430 @ 2.27GHz (2 cores) with 4 GB of RAM)
If you want to run these tests on your own, you must enable the testing flag (
) in the EsperFilterEngine. If enabled, the EsperFilterEngine calculates the time it takes to empty the FIFO queue (last column). This can only be done in the
method, so you should invoke an Unsubscribe request immediately after you have pushed all messages into the SES. Additionally, you can activate latency simulation (
) or random exceptions (
Eclipse Project Setup
- Eclipse with a Git plugin (included in recent releases)
- Apache Maven (versions 2 and 3 tested)
- m2eclipse plugin installed and set up for use with your Maven 2.2.1 installation (included in Eclipse 4.4+)
First, you have to check out the 52n-ses project from
. Use "Check out as a project configured using the New Project Wizard". Name it e.g. "52n-ses", use a simple project (no Java project needed at this phase).
Right-click on the newly created project and select "Configure/Convert to Maven Project". Eclipse will probably take a while to activate the Maven nature. After processing is finished, your workspace should look similar to this:
Now, right-click on your Maven project and select "Import" -> "Maven/Existing Maven Projects". m2eclipse will automatically import the subprojects (modules) into the root folder of your workspace. After the modules have been imported successfully, your workspace should look like this:
That's all you have to do. The SES should now be available in your workspace. You can compile it by right-clicking on the root project ("52n-ses"), selecting "Run as/Maven build" and typing "install" as the goal.
Source Code Project Structure
The repository of all SES-related components is located at
. The repository's layout is organized as follows.
52n-ses-api - an API module providing classes used in multiple other modules
52n-ses-bindings - module for the XML bindings (compiled XMLBeans)
52n-ses-core - the core of the SES where the main logic is implemented
52n-ses-eml-001 - EML schema version 0.0.1 event streaming module
52n-ses-eml-002 - EML schema version 0.0.2 event streaming module
52n-ses-utils - module for serving general utilities (XMLHelper methods, ConfigurationRegistry, Parsing, Concurrency Handling)