Overview
This page is intended for developers of the SensorEventService (SES). It gives some insight into the general workflow of requests and how the actual data is processed. If you have any questions, please get in touch with the community using the Sensor Web Mailing List.
General Concept
The HTTP interface of the 52°North SES implementation is based on Apache Muse 2.2.0. Unfortunately, the project was discontinued shortly after the decision to use it was made. We have therefore set up our own Muse 2.2.0 branch in the 52N SVN repository to be able to fix bugs and add features ourselves. Access to it is currently restricted, so please get in touch with the IT-IS team if you need access (contact information can be found on the account information page).
The Muse framework provides the general business logic for handling requests in compliance with the WS-N specifications described in the following section.
WS-N Specifications
The Web Services Notification (WS-N) specifications are a family of standards developed by OASIS. They define interfaces and requests/responses that give web services interoperable publish/subscribe functionality. The default binding of the SES is based on SOAP 1.2.
WS Base Notification
See SensorEventServiceInterface for details of the specification.
The 52N SES implements both the NotificationProducer (class SESNotificationProducer) and the NotificationConsumer (class SESNotificationConsumer). It is thus capable of receiving as well as producing/publishing Notification messages.
Besides these two resources, the 52N SES implements the SubscriptionManager interface (class SESSubscriptionManager). It provides functionality to pause, renew or remove a subscription instance.
WS Brokered Notification
See SensorEventServiceInterface for details of the specification.
As previously stated, the SES implements both the producer and the consumer side of the WS-N specification. In combination with the SubscriptionManager, these two interfaces form the basis of the SES's NotificationBroker implementation: publishers can send messages to the SES, and consumers can subscribe to a specific subset of these messages.
Resource Management and Lifecycles
The way Muse, and thus the SES, manages resources and their lifetimes is closely coupled to the WS-ResourceLifetime specification. The initial lifetime of a resource can be defined in the first request (e.g. wsnt:InitialTerminationTime for a Subscribe request) or in a subsequent request (e.g. wsrf:RequestedLifetimeDuration for a wsntw:SetTerminationTimeRequest request).
When the SES starts up, all resources are loaded in a predefined order. The first resources are the SubscriptionManager instances (the default one and, optionally, additional persisted instances).
SubscriptionManager
The SESSubscriptionManager is the first class of the SES core package to be loaded. In particular, the ResourceManager of Muse calls its initialize() method, because a resource file for it is available by default in the dedicated router-entries folder. This method performs the main setup of the SES environment: the singleton ConfigurationRegistry, the IFilterEngine implementation and the IUnitConverter instance are loaded. This is done only once, inside a mutex-synchronized block. To add new resources or classes that need to be loaded on startup, simply append your code to the initializeSESResources() method, which is called within the synchronized block.
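The once-only setup inside a mutex-synchronized block can be sketched as follows. This is a minimal sketch assuming a static lock object and a boolean flag; the class name SesStartupSketch and the recorded component names are hypothetical placeholders, not the actual SESSubscriptionManager code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the startup part of SESSubscriptionManager.initialize().
class SesStartupSketch {
    // Mutex shared by all SubscriptionManager instances.
    private static final Object MUTEX = new Object();
    private static boolean initialized = false;

    // Records what was loaded, so the effect of the guard can be inspected.
    static final List<String> loaded = new ArrayList<>();

    // Called for every SubscriptionManager instance; only the first call performs the setup.
    void initialize() {
        synchronized (MUTEX) {
            if (initialized) {
                return;
            }
            loaded.add("ConfigurationRegistry");
            loaded.add("IFilterEngine");
            loaded.add("IUnitConverter");
            initializeSESResources();
            initialized = true;
        }
    }

    // Extension point: further startup resources are appended here.
    private void initializeSESResources() {
        loaded.add("custom resource");
    }
}
```

Calling initialize() on several instances performs the setup only once, and anything appended to initializeSESResources() runs inside the same guarded block.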
Besides the startup initialization, a SESSubscriptionManager is instantiated for every requested subscription. IDs are assigned by the SESResourceIdFactory.SubscriptionIdFactory. These instances can be managed using the WS-Resource interfaces (SetTerminationTimeRequest, DestroyRequest) and are removed using the unsubscribe() method, which is called during an Unsubscribe request. The SES also implements the PausableSubscriptionManager capability, so subscriptions can be paused and resumed using the corresponding WS-N operations (see the specification).
NotificationProducer
The SESNotificationProducer is instantiated only once, after the initialization of the SESSubscriptionManager. Consequently, it has access to a fully prepared ConfigurationRegistry. It is never shut down before the whole service is shut down.
PublisherEndpoint
A PublisherEndpoint instance is the result of a successful RegisterPublisher request. IDs are assigned by the SESResourceIdFactory.PublisherIdFactory. Similar to the SubscriptionManager, it can be managed using dedicated interface methods: RenewRegistration and DestroyRegistration requests can be used to extend its lifetime or to remove the publisher.
ConfigurationRegistry
The ConfigurationRegistry provides access to the entries of the SES configuration file. It is available as a singleton via the getInstance() method. It holds static keys for safe retrieval of configuration parameters using the getPropertyForKey() method. Besides providing parameters, this class can be used as a global communication/registry component, as it is accessible from all classes of the SES. For instance, it provides access to the IFilterEngine and IUnitConverter instances.
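A simplified singleton registry with static keys might look like the sketch below. ConfigRegistrySketch, its init(Properties) method and the setFilterEngine()/getFilterEngine() accessors are hypothetical; the real ConfigurationRegistry reads the SES configuration file and its exact API may differ. The two key names are taken from configuration keys mentioned later on this page.

```java
import java.util.Properties;

// Hypothetical, simplified registry; the real ConfigurationRegistry reads the SES configuration file.
final class ConfigRegistrySketch {
    // Static keys avoid typos when looking up configuration parameters.
    static final String USE_CONCURRENT_ORDERED_HANDLING = "USE_CONCURRENT_ORDERED_HANDLING";
    static final String CONCURRENT_INTELLIGENT_TIMEOUT = "CONCURRENT_INTELLIGENT_TIMEOUT";

    private static ConfigRegistrySketch instance;

    private final Properties props;
    // Shared component registered at startup (stands in for IFilterEngine).
    private Object filterEngine;

    private ConfigRegistrySketch(Properties props) {
        this.props = props;
    }

    // Called once during startup, e.g. from the SubscriptionManager initialization.
    static synchronized void init(Properties props) {
        if (instance == null) {
            instance = new ConfigRegistrySketch(props);
        }
    }

    static synchronized ConfigRegistrySketch getInstance() {
        if (instance == null) {
            throw new IllegalStateException("registry not initialized");
        }
        return instance;
    }

    String getPropertyForKey(String key) {
        return props.getProperty(key);
    }

    synchronized void setFilterEngine(Object engine) {
        this.filterEngine = engine;
    }

    synchronized Object getFilterEngine() {
        return filterEngine;
    }
}
```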
Entry Point of Data
The following subsections summarize the data flow for certain incoming requests. They are intended to provide a better understanding of the component chaining as implemented in the 52N SES.
Subscriptions and publisher registrations can also be triggered from persisted XML instances. The workflow stays the same in general; only the caller is different (SESFilePersistence).
Subscriptions
The starting point for every subscription is a valid wsnt:Subscribe request sent to the NotificationProducer's port type. From there, the ResourceRouter calls the SESNotificationProducer's subscribe(...) method. Beforehand, the wsnt:Filter element has been split into its wsnt:MessageContent children (using the registered FilterFactoryHandler implementations). Each MessageContent is represented as a Filter instance, and several of them can be bundled in a FilterCollection.
If the FilterCollection contains a SESConstraintFilter (meaning it has a Level-2 or Level-3 filter), it is registered at the IFilterEngine instance using the registerFilter() method. This is where the actual parsing and processing of the MessageContent's markup takes place.
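The decision of whether a subscription involves the filter engine can be illustrated with the following sketch. All type names (MessageFilter, ConstraintFilterSketch, FilterEngineSketch, SubscribeSketch) are hypothetical stand-ins for Filter, SESConstraintFilter and IFilterEngine, and the message is reduced to a String.

```java
import java.util.List;

// Hypothetical minimal stand-in for a muse Filter.
interface MessageFilter {
    boolean accepts(String message);
}

// Plays the role of SESConstraintFilter: it never accepts a message directly,
// because Level-2/3 matching is delegated to the filter engine.
final class ConstraintFilterSketch implements MessageFilter {
    final String markup;  // the FES/EML content of the wsnt:MessageContent

    ConstraintFilterSketch(String markup) {
        this.markup = markup;
    }

    @Override
    public boolean accepts(String message) {
        return false;
    }
}

// Stand-in for IFilterEngine; parsing of the markup would happen in registerFilter().
interface FilterEngineSketch {
    void registerFilter(ConstraintFilterSketch filter);
}

final class SubscribeSketch {
    // Registers every constraint filter of the collection at the engine;
    // returns true if at least one was registered.
    static boolean subscribe(List<MessageFilter> filterCollection, FilterEngineSketch engine) {
        boolean registered = false;
        for (MessageFilter f : filterCollection) {
            if (f instanceof ConstraintFilterSketch) {
                engine.registerFilter((ConstraintFilterSketch) f);
                registered = true;
            }
        }
        return registered;
    }
}
```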
Notifications
The workflow for notifications is a bit trickier than, e.g., for subscriptions, because consuming and brokering notifications involves all roles of WS-BrokeredNotification. Again, the starting point is a valid wsnt:Notify request, which contains one wsnt:NotificationMessage element. Besides a wsnt:Topic, it includes the wsnt:Message element, which holds the actual payload as xsd:any. The qualified names of the contents can be retrieved using the NotificationMessage's getMessageContentNames() method.
After pre-processing the message, the ResourceRouter calls the notify() method of the SESNotificationConsumer. Here, the message is encapsulated in a NotifyThread instance and inserted into a FIFO-queued thread pool. For convenience, the thread pool is currently single-threaded (see Concurrent Handling of Messages for details). When it is the thread's turn, it calls all registered NotificationMessageListener implementations. In the default setup, this is just the SESMessageListener, which calls the publishCompleteMessage() method of the SESNotificationProducer instance.
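The hand-over from notify() to the listeners can be approximated with a single-threaded executor, which runs submitted tasks one after another in submission order. NotifySketch and MessageListener are hypothetical names; the method is called notifyMessage() because Object.notify() is already final in Java. This is an illustration, not the SESNotificationConsumer code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for NotificationMessageListener.
interface MessageListener {
    void process(String notificationMessage);
}

final class NotifySketch {
    private final List<MessageListener> listeners = new CopyOnWriteArrayList<>();
    // A single worker thread executes submitted tasks strictly in FIFO order.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    void addListener(MessageListener l) {
        listeners.add(l);
    }

    // Mirrors the role of SESNotificationConsumer.notify(): queue the message and return at once.
    void notifyMessage(String message) {
        pool.submit(() -> {
            // When it is this task's turn, every registered listener is called.
            for (MessageListener l : listeners) {
                l.process(message);
            }
        });
    }

    // Waits (bounded) for queued messages to be delivered, then stops the worker.
    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because there is only one worker thread, listeners see the messages in exactly the order in which notifyMessage() was called.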
SESNotificationProducer
Finally, the message reaches the SESNotificationProducer, where it is first inserted into the actual IFilterEngine instance (see EsperFilterEngine below). In a second step, the publish() method of the SESNotificationProducer is called. Here, the message is pushed into every available SESSubscriptionManager without a SESConstraintFilter, where it is checked against the registered Filters. Simple Filters, which consist only of Level-1 instances (e.g. XPath expressions), need no pattern/causality matching, so every NotificationMessage can be treated as an independent, discrete message. Therefore, the accepts() method of every Filter instance is called, and if all of them return true, the message is forwarded to the subscriber's EndpointReference.
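For Level-1 filters, the per-message check boils down to an "all filters accept" rule. The sketch below assumes that filters are modelled as java.util.function.Predicate and the subscriber's EndpointReference as a Consumer; Level1PublishSketch is a hypothetical name, not the SESSubscriptionManager API.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of per-message Level-1 matching in a subscription manager.
final class Level1PublishSketch {
    private final List<Predicate<String>> filters;      // stand-ins for Filter.accepts()
    private final Consumer<String> subscriberEndpoint;  // stand-in for the EndpointReference

    Level1PublishSketch(List<Predicate<String>> filters, Consumer<String> endpoint) {
        this.filters = filters;
        this.subscriberEndpoint = endpoint;
    }

    // Each message is treated independently: forward it only if every filter accepts it.
    boolean publish(String message) {
        for (Predicate<String> f : filters) {
            if (!f.test(message)) {
                return false;
            }
        }
        subscriberEndpoint.accept(message);
        return true;
    }
}
```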
Complex Filters (Level 2 and 3) required a completely different solution, as they can filter on patterns of messages (e.g. a threshold undershoot followed by a threshold overshoot, which is a pattern involving multiple messages).
EsperFilterEngine
This is where the actual magic happens.
Esper is the underlying event stream processing engine used for event pattern matching. As the SES is capable of filtering messages based on stream patterns (e.g. EML patterns), we had to adjust the default implementation of Filter matching. If a subscriber provided a filter of Level 2 or 3 (= SESConstraintFilter), the default filter matching is bypassed (note that the accepts() method of SESConstraintFilter always returns false). Instead, the order of filter matching is reversed.
After the EsperFilterEngine (the current default implementation of IFilterEngine) detects a pattern match for a message, all remaining Level-1 filters are checked. In particular, this is done inside the doOutput() method of the StatementListener of the EML module in use. Here, the publish() method of the corresponding SESSubscriptionManager instance is called. From there on, the same workflow as for Level-1 filters applies.
The FilterEngine is responsible for triggering the parsing of the NotificationMessage. For this purpose, it holds a list of all available Parsers (every Parser must implement AbstractParser), which is iterated. If a Parser instance returns true from its accept() method, its parse() method is called. The Parser is then responsible for creating a List of MapEvent instances.
MapEvent is an implementation of java.util.Map and is designed to store the parsed properties as key-value pairs. As Esper is capable of processing instances of java.util.Map, the results of the Parser can simply be pushed into Esper.
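The parser loop and the Map-based event representation can be sketched as follows. MapEventSketch, ParserSketch, KeyValueParser and FilterEngineParsingSketch are hypothetical; the toy parser handles "name=value" strings instead of O&M documents, and this sketch collects the events of every parser that accepts the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Hypothetical simplified MapEvent: parsed properties stored as key-value pairs.
class MapEventSketch extends HashMap<String, Object> {
    MapEventSketch(long timestamp) {
        put("timestamp", timestamp);
    }
}

// Hypothetical stand-in for AbstractParser.
abstract class ParserSketch {
    abstract boolean accept(String message);

    abstract List<MapEventSketch> parse(String message);
}

// Toy parser for "name=value" messages with a numeric value.
class KeyValueParser extends ParserSketch {
    @Override
    boolean accept(String message) {
        return message.contains("=");
    }

    @Override
    List<MapEventSketch> parse(String message) {
        String[] kv = message.split("=", 2);
        MapEventSketch e = new MapEventSketch(System.currentTimeMillis());
        e.put(kv[0].trim(), Double.parseDouble(kv[1].trim()));
        List<MapEventSketch> result = new ArrayList<>();
        result.add(e);
        return result;
    }
}

final class FilterEngineParsingSketch {
    // Iterates the registered parsers and collects the events of every parser that accepts the message.
    static List<MapEventSketch> parse(List<ParserSketch> parsers, String message) {
        List<MapEventSketch> events = new ArrayList<>();
        for (ParserSketch p : parsers) {
            if (p.accept(message)) {
                events.addAll(p.parse(message));
            }
        }
        return events;
    }
}
```

Since each resulting event is a plain java.util.Map, it can be handed to an engine that consumes Map events without further conversion.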
Publisher Registrations
This data flow is very similar to that of subscriptions. The ResourceRouter calls the registerPublisher() method of the RegisterPublisher class. Here, a PublisherEndpoint instance is created and initialized. In particular, the SensorML description is processed and its data types are registered at the DataTypesMap. This is needed because, in some situations, subscriptions and/or notifications depend on metadata of a publisher (e.g. the unit of measurement).
Message Filtering
A substantial part of a subscription is the definition of a message filter. With this filter, users define the subset of messages they are interested in. The SES defines three levels of filtering, described in the following sections.
Level 1
This filter uses XPath to define the filter criteria that are applied to each incoming notification separately. The SES uses Muse's built-in implementation (MessagePatternFilter), which is based on Xalan. A single NotificationMessage is evaluated against the given XPath expression, and if true is returned, the NotificationMessage is pushed to the EndpointReference of the subscription.
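To illustrate what a Level-1 check does, the sketch below evaluates a boolean XPath expression with the JDK's standard javax.xml.xpath API. It is not Muse's MessagePatternFilter; real notifications are namespace-qualified (e.g. om:, wsnt:), which would additionally require a NamespaceContext. XPathFilterSketch is a hypothetical name.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

final class XPathFilterSketch {
    // Evaluates a boolean XPath expression against one message; true means "forward it".
    static boolean accepts(String messageXml, String xpath) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true);
            Document doc = dbf.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(messageXml)));
            Boolean result = (Boolean) XPathFactory.newInstance().newXPath()
                    .evaluate(xpath, doc, XPathConstants.BOOLEAN);
            return result;
        } catch (Exception e) {
            throw new IllegalArgumentException("cannot evaluate filter", e);
        }
    }
}
```

For example, accepts("<Observation><value>42</value></Observation>", "/Observation/value > 40") yields true, because XPath converts the node's string value to a number before the comparison.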
Level 2
Level-2 filters are encoded using the Filter Encoding Specification 2.0. As EML also uses FES 2.0 in its guards, Level-2 filters are realized in the 52N SES in basically the same way as Level-3 filters: depending on the EML version in use, the FES 2.0 filter markup is wrapped in a static EML document and then treated as Level-3. See the class StaticEMLDocument for the details of this wrapper document.
Level 3 / EML Modules
This filter level is designed to recognize patterns in a stream of messages. It is based on the Event Pattern Markup Language (EML), which is a powerful but also quite sophisticated definition language. The details of this specification are beyond the scope of this article. The following sections summarize the internal handling of EML subscriptions; in particular, they describe the mapping from EML to EPL (the SQL-like language used internally by Esper) and some special cases (e.g. spatial filtering).
First, note that the current SES implementation supports EML schema versions 0.0.1 (the one used in the OGC discussion paper) and 0.0.2 (with some handy enhancements). However, the two modules cannot run in parallel, so you should use the dedicated Maven profile (defined in 52n-ses/pom.xml) to activate the version of your choice (eml_002 is the default).
Both modules are generally structured in the same way. The entry point is an implementation of ILogicController, namely the EsperController of each module (located in different packages). It is set up using the initialize() method, where the EML markup is parsed: the parsing method iterates through all SimplePatterns, ComplexPatterns, TimerPatterns and RepetitivePatterns. After successful parsing, every pattern is represented by a dedicated Java object, which is then transformed into an EPL statement.
Each EPL statement (see http://esper.codehaus.org/esper-2.3.0/doc/reference/en/html/epl_clauses.html for details) is registered at Esper from within the initializeListeners() method. Esper's concept is based on listeners that are attached to statements, so every statement gets a listener (in most cases an instance of StatementListener). A listener is informed when Esper detects a match for its statement. The EML-on-Esper approach relies on creating several streams and multiple patterns (see http://52north.org/ses for conceptual details). The following figure illustrates the workflow of a NotificationMessage and the concept of statements and listeners.
When a statement triggers a match, the update() method of the StatementListener is called. It creates a new UpdateHandlerThread instance and submits it to a thread pool. The UpdateHandlerThread is responsible for checking whether the message must be inserted into another stream or whether it should generate output for the subscriber. In the latter case, the doOutput() method of the StatementListener is called, which in turn calls the SESSubscriptionManager via its publish() or sendSESNotificationMessge() method. Further processing follows the Level-1 workflow described in the SESNotificationProducer section above.
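The listener side of this workflow can be sketched without Esper itself. In the following sketch a matched event arrives as a Map, a pool task plays the role of UpdateHandlerThread, and a boolean flag decides between re-insertion into another stream and doOutput(). StatementListenerSketch and its constructor parameters are hypothetical and not part of the Esper API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the listener side; none of these types belong to the Esper API.
final class StatementListenerSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final boolean insertIntoOtherStream;             // decided when the statement is built
    private final Consumer<Map<String, Object>> nextStream;  // feeds a follow-up pattern
    private final Consumer<Map<String, Object>> subscriber;  // stands in for SESSubscriptionManager.publish()

    StatementListenerSketch(boolean insertIntoOtherStream,
                            Consumer<Map<String, Object>> nextStream,
                            Consumer<Map<String, Object>> subscriber) {
        this.insertIntoOtherStream = insertIntoOtherStream;
        this.nextStream = nextStream;
        this.subscriber = subscriber;
    }

    // Called when the engine reports a match; the work is handed to a pool thread
    // (the role of UpdateHandlerThread) so that the engine is not blocked.
    void update(Map<String, Object> matchedEvent) {
        Map<String, Object> copy = new HashMap<>(matchedEvent);
        pool.submit(() -> {
            if (insertIntoOtherStream) {
                nextStream.accept(copy);
            } else {
                doOutput(copy);
            }
        });
    }

    // Final output towards the subscriber.
    void doOutput(Map<String, Object> event) {
        subscriber.accept(event);
    }

    void shutdown() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```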
EML Version 0.0.1
To include version 0.0.1 instead of the default 0.0.2, activate the eml_001 Maven profile (e.g. mvn clean install -P eml_001 on 52n-ses or 52n-ses-core).
EML Version 0.0.2
This is the default implementation used by the SES. Its Maven profile eml_002 is activated by default. Compared to version 0.0.1, some XML Schema changes were made to address common drawbacks. This generally led to adjustments in the EML parsing routines (classes EMLParser and EsperController) but did not change the overall behavior of pattern processing.
Version 0.0.2 introduced a new feature, the eml:GenericView. This view is a powerful, generic tool when combined with a UserDefinedSelectFunction. An example of such a view is the DynamicSpatialBufferView class, which implements spatial filtering of incoming messages against a dynamic geometry.
Spatial Filtering
Spatial filtering within the SES is based on the Java Topology Suite (JTS). As spatial filtering is available in Level-2 filters and above, it has to be integrated into Esper (see Level 3 / EML Modules). This is done by providing a custom function library to Esper (see http://esper.codehaus.org/esper-2.3.0/doc/reference/en/html/functionreference.html for details). While the EsperController is initialized, its private method registerCustomFunctions() is called. Here, the package name is imported into Esper, so that methods of its classes can be used within EPL statements.
The actual filtering is applied in the SpatialMethods class, which provides wrapper methods for JTS function calls. Every method takes two Geometry objects and evaluates them with the underlying spatial method (e.g. an intersection of two geometries).
Since JTS is only a topology framework, it cannot apply any reference-system-specific transformations. Nevertheless, the SES supports FES 2.0 spatial operations such as fes:DWithin or fes:Beyond, which take a distance and calculate a buffer around a given geometry before applying the actual topological evaluation. In the current implementation of the SES, this is achieved with a PostgreSQL/PostGIS backend. This approach gives us robust and performant transformation methods. The drawback for the user is the need to set up a complete database system (which is never used to actually store any data).
An exemplary implementation of a buffer is realized in the ADistanceBufferFilter class of both EML modules. In its createExpressionForDistanceFilter() method, the actual buffering is triggered using an instance of PostGisCreateBuffer.
Adding New UserDefinedSelectFunction instances
For some use cases and environments, it may be necessary to define a new select function that addresses the needs of a specific scenario. This must be done in the getSelectString() method of the SelFunction class (in the EML module you are using). Append an additional else if clause to the if (this.functionName.equals(...)) control block. The many select functions already present there will help you understand the general concept.
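The else if pattern can be sketched as follows. SelFunctionSketch, its constructor and the function names SelectAvg, SelectMax and SelectMedian are hypothetical; the EPL fragments use Esper's avg, max and median aggregation functions, but the real SelFunction builds its strings from the parsed EML document.

```java
// Hypothetical, heavily simplified stand-in for SelFunction.
final class SelFunctionSketch {
    private final String functionName;
    private final String propertyName;

    SelFunctionSketch(String functionName, String propertyName) {
        this.functionName = functionName;
        this.propertyName = propertyName;
    }

    // Maps the select function to an EPL select expression.
    String getSelectString() {
        if (this.functionName.equals("SelectAvg")) {
            return "avg(" + propertyName + ")";
        } else if (this.functionName.equals("SelectMax")) {
            return "max(" + propertyName + ")";
        } else if (this.functionName.equals("SelectMedian")) {
            // New branch appended for a scenario-specific select function.
            return "median(" + propertyName + ")";
        }
        throw new IllegalArgumentException("unknown select function: " + functionName);
    }
}
```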
Implementing a GenericView
The GenericView of EML 0.0.2 is processed in the getViewString() method of the DataView class (similar to a UserDefinedSelectFunction). There is a dedicated else if control block after the if (this.esperString.equals("")) statement, where you can implement the specifics of your view. Currently, there is an implementation for the DynamicSpatialBufferView, which you can adapt for your own GenericView.
Note that a GenericView is often coupled with a UserDefinedSelectFunction and thus can be identified by the name of that Select function.
Concurrent Handling of Messages
In earlier versions of the SES, pattern matching was found to misbehave seriously because messages were processed in the wrong order. This happened mainly at higher data rates (> 5 messages/s) and was caused by parallel, unsynchronized parsing/processing of messages. The following figure illustrates the race conditions that used to occur in such cases.
The SESNotificationConsumer created a NotifyThread for every incoming message and started it immediately after creation. Consequently, the order in which messages entered the EsperFilterEngine (here: 2-1-3 instead of 1-2-3) was out of control, as Java thread scheduling gives no guarantee about the execution order of independently started threads. Since correct message order is the most important requirement when working on patterns of messages, a robust and performant way of dealing with high data rates was essential for the SES to work properly in such use cases. The following sections describe how this is realized.
Default Implementation
Concurrency handling always adds synchronization/thread-management overhead to an application. Thus, ordered concurrency handling in the SES can be disabled using a configuration key (USE_CONCURRENT_ORDERED_HANDLING = false). This is suitable when no complex stream processing is needed (e.g. only Level-1 filtering, or Level-2 filtering on discrete, independent messages). If it is not disabled, the SES uses its default implementation, a FIFO-ordered queue for incoming messages.
The solution is quite straightforward and can be separated into three steps:
- Do not split processing into multiple threads before pushing messages into the EsperFilterEngine (in particular, everything from arrival at the HTTP port until insertion into the FilterEngine must happen in a single thread). The EsperFilterEngine reserves a position in the FIFO queue in strict order of arrival.
- After reserving the position, create a HandlerThread that parses and processes the contents of the message.
- After processing has finished, inform the FIFO queue about the successful processing. The FIFO queue pushes the processed message back to the EsperFilterEngine (which implements IPollListener for that purpose).
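The three steps can be sketched with standard java.util.concurrent classes, assuming one CompletableFuture per message as the reserved slot: filter() must be called from a single thread (step 1), processing runs in a thread pool (step 2), and drain() takes the slots strictly in FIFO order (step 3). OrderedPipelineSketch is hypothetical; unlike the FIFOWorker, drain() runs on the caller's thread instead of its own, and it has no timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of FIFO-ordered concurrent processing (not the SES classes).
final class OrderedPipelineSketch<I, O> {
    private final BlockingQueue<CompletableFuture<O>> fifo = new LinkedBlockingQueue<>();
    private final ExecutorService processors = Executors.newFixedThreadPool(4);
    private final Function<I, O> process;

    OrderedPipelineSketch(Function<I, O> process) {
        this.process = process;
    }

    // Step 1: called from one thread only, so slots are reserved in arrival order.
    void filter(I message) {
        CompletableFuture<O> slot = new CompletableFuture<>();
        fifo.add(slot);
        // Step 2: parse/process in parallel; completing the slot signals "processing finished".
        processors.submit(() -> {
            try {
                slot.complete(process.apply(message));
            } catch (RuntimeException e) {
                slot.completeExceptionally(e);
            }
        });
    }

    // Step 3: a single consumer takes slots in FIFO order and hands the results on.
    void drain(int count, Consumer<O> onElementPolled) {
        for (int i = 0; i < count; i++) {
            CompletableFuture<O> head;
            try {
                head = fifo.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try {
                onElementPolled.accept(head.join()); // blocks until the head is processed
            } catch (CompletionException e) {
                // A failed message is skipped so it cannot block the queue.
            }
        }
    }

    void shutdown() {
        processors.shutdown();
    }
}
```

Even when later messages finish processing first, drain() hands them on in arrival order.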
As you might guess, this causes some trouble with high data rates and costly message processing. Therefore, a timeout estimation algorithm is integrated to prevent the FIFO queue from blocking if an exception occurs or the processing of an element takes too long. See Timeout/Exception Handling below for details.
Let's go into the details of the implementation, namely the data structures used and the data flow. The SES's default implementation is the class FIFOWorker, which consists of a single thread that pulls available data from an internal FIFO queue. It implements IConcurrentNotificationHandler and can thus be replaced by other solutions.
Step 1 is realized in the filter() method of the EsperFilterEngine. Here, the insertPendingEventCollection() method of IConcurrentNotificationHandler is called. It returns an empty QueuedMapEventCollection, which acts as the container for the processed MapEvents (the internal representation of a processed NotificationMessage). Then a new HandlerThread, namely a NotificationMessageProcessor, is created and submitted to a dedicated thread pool. The actual processing of the message (step 2) takes place in the run() method of these threads, and in a final step the IConcurrentNotificationHandler is notified.
The IConcurrentNotificationHandler is notified by two method calls. First, the setCollection() method of the QueuedMapEventCollection is called. Inside this method, notifyAll() is called on a monitor object. This is needed because the FIFOWorker may already be waiting on this monitor: it tried to process the contents of this message before they were ready, so it decided to wait on it for a certain amount of time. The second call is a more general one: by triggering the notifyOnDataAvailability() method of IConcurrentNotificationHandler, the FIFOWorker is informed about the general availability of data. The FIFOWorker may be waiting on its own monitor object, which reflects the contents of the internal FIFO queue; in particular, the FIFOWorker waits on this monitor when it has emptied its FIFO queue.
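The first of these two calls, the monitor handshake between a processor and the waiting worker, can be sketched as below. QueuedCollectionSketch and awaitCollection() are hypothetical names; the timed wait inside a loop follows the general Java pattern for waiting on a monitor for a limited time and is not the exact FIFOWorker code. The second, queue-level notification is not modelled here.

```java
import java.util.List;

// Hypothetical simplified QueuedMapEventCollection: a slot that is filled once processing ends.
final class QueuedCollectionSketch<E> {
    private final Object monitor = new Object();
    private List<E> collection;  // null until the processor has finished

    // Called by the processing thread when its message is ready.
    void setCollection(List<E> events) {
        synchronized (monitor) {
            collection = events;
            monitor.notifyAll();  // wake a worker that is already waiting on this slot
        }
    }

    // Called by the worker thread; waits at most timeoutMillis and returns null on timeout.
    List<E> awaitCollection(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        synchronized (monitor) {
            while (collection == null) {  // loop guards against spurious wakeups
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                try {
                    monitor.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return collection;
        }
    }
}
```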
When the FIFOWorker is able to poll the head of the internal queue (meaning the message has been processed and is available), it performs step 3: it calls onElementPolled() of its IPollListener instance for every MapEvent stored in the QueuedMapEventCollection. Because the EsperFilterEngine registered itself as the IPollListener beforehand, the processed message is pushed back to it. The EsperFilterEngine is then responsible for pushing the individual MapEvents into the actual Esper processing engine (see EsperFilterEngine above). As the FIFOWorker runs in a single thread, the correct order of messages is guaranteed.
Timeout/Exception Handling
As a FIFO queue by nature polls elements in strict order, exceptions or high processing latencies (e.g. caused by a slow WFS instance while enriching the data) have to be treated specially. Assume a NotificationMessageProcessor that does not catch a SocketTimeoutException properly and thus never informs the FIFOWorker about its failure. The FIFOWorker would then be stuck in an endless wait() call on the monitor object of that thread's QueuedMapEventCollection. To prevent such situations, a timeout handling algorithm is implemented in the FIFOWorker. In general, there are two ways of defining timeouts: always use a static timeout (configurable through CONCURRENT_INTELLIGENT_TIMEOUT = false) or use an "intelligent" timeout estimation.
The SES's default timeout estimation algorithm is loosely based on the Inverse distance weighting (IDW) method and can be replaced by other implementations of ITimeoutEstimation (e.g. an implementation that always returns the maximum processing period would be much more reliable than IDW, but also less performant). It takes a constant number of processing periods (measured by the NotificationMessageProcessor threads) as input for predicting future processing periods. This approach is a compromise between performance and reliability: it adapts to past measurements and can thus deal with high CPU load or large latencies of external web services.
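One possible reading of an IDW-style estimate is sketched below: the most recent period has distance 1, the one before distance 2, and so on; each is weighted by 1/distance^power, and the weighted mean is used as the prediction. The interface TimeoutEstimation, the class IdwTimeoutSketch, the power parameter and the safety factor are assumptions for illustration; the actual IDWTimeoutEstimation may weight and scale differently.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical interface mirroring the role of ITimeoutEstimation.
interface TimeoutEstimation {
    void addProcessingPeriod(long millis);

    long estimateTimeout();
}

// Inverse-distance-weighted estimate over the last n periods:
// the newest period has distance 1, the one before distance 2, and so on.
final class IdwTimeoutSketch implements TimeoutEstimation {
    private final Deque<Long> periods = new ArrayDeque<>();
    private final int n;                // constant number of samples kept
    private final double power;         // IDW exponent
    private final long staticTimeout;   // fallback until measurements exist
    private final double safetyFactor;  // hypothetical headroom on top of the prediction

    IdwTimeoutSketch(int n, double power, long staticTimeout, double safetyFactor) {
        this.n = n;
        this.power = power;
        this.staticTimeout = staticTimeout;
        this.safetyFactor = safetyFactor;
    }

    @Override
    public synchronized void addProcessingPeriod(long millis) {
        periods.addFirst(millis);  // newest first
        if (periods.size() > n) {
            periods.removeLast();  // drop the oldest sample
        }
    }

    @Override
    public synchronized long estimateTimeout() {
        if (periods.isEmpty()) {
            return staticTimeout;
        }
        double weighted = 0;
        double weights = 0;
        int distance = 1;
        for (long p : periods) {
            double w = 1.0 / Math.pow(distance++, power);
            weighted += w * p;
            weights += w;
        }
        return Math.round(safetyFactor * weighted / weights);
    }
}
```

With n = 3, power = 1 and a safety factor of 1.0, the periods 100, 200 and 400 ms (400 being the newest) give (400 + 200/2 + 100/3) / (1 + 1/2 + 1/3) ≈ 291 ms, so recent measurements dominate the estimate.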
Stress Test Results
The concurrent handling of messages has been tested regarding performance and reliability. Three different test cases were run: single O&M observations, single O&M observations with simulated processing latency (2-3 seconds), and single O&M observations with random exceptions (on average every 10th message). The message series of all test cases were applied to an EML subscription with two SimplePatterns and one ComplexPattern (threshold undershoot followed by a threshold overshoot).
FIFOWorker with IDWTimeoutEstimation implementation (Revision 11732; Ubuntu 11.10 with Tomcat 6 running on an Intel(R) Core(TM) i5 CPU M 430 @ 2.27GHz (2 cores) with 4 GB of RAM):
If you want to run these tests yourself, you must enable the testing flag (performanceTesting) in the EsperFilterEngine. If enabled, the EsperFilterEngine measures the time it takes to empty the FIFO queue (last column). This can only be done in the unregisterFilter() method, so you should invoke an Unsubscribe request immediately after you have pushed all messages into the SES. Additionally, you can activate latency simulation (testingSimulateLatency) or random exceptions (testingThrowRandomExceptions).
Eclipse Project Setup
Prerequisites:
- Eclipse with a Git plugin (included in recent releases)
- Apache Maven (versions 2 and 3 tested)
- m2eclipse plugin installed and set up to use your maven-2.2.1 installation (included in Eclipse 4.4+)
First, check out the 52n-ses project from https://github.com/52North/SES. Use "Check out as a project configured using the New Project Wizard". Name it e.g. "52n-ses" and use a simple project (no Java project is needed at this stage).
Right-click the newly created project and select "Configure/Convert to Maven Project". Eclipse will probably take a while to activate the Maven nature. After processing has finished, your workspace should look similar to this:
Now, right-click your Maven project and select "Import" -> "Maven/Existing Maven Projects". m2eclipse will automatically import the subprojects (modules) into the root folder of your workspace. After the modules have been imported successfully, your workspace should look like this:
That's all you have to do. The SES should now be available in your workspace. You can compile it by right-clicking the root project ("52n-ses"), selecting "Run as/Maven build" and entering "install" as the goal.
Source Code Project Structure
The repository of all SES-related components is located at https://github.com/52North/SES. The repository's layout is organized as follows.
- master/develop branches
  - 52n-ses-api: an API module providing classes used in multiple other modules
  - 52n-ses-bindings: module for the XML bindings (compiled XMLBeans)
  - 52n-ses-core: the core of the SES, where the main logic is implemented
  - 52n-ses-eml-001: event streaming module for EML schema version 0.0.1
  - 52n-ses-eml-002: event streaming module for EML schema version 0.0.2
  - 52n-ses-utils: module providing general utilities (XMLHelper methods, ConfigurationRegistry, parsing, concurrency handling)