IBM Streams 4.2

Window handling

The SPL language run time provides APIs for building window-based operators.

The SPL language has two kinds of windows: tumbling and sliding. Both store tuples in their order of arrival, but they differ in how they evict tuples. Rather than keeping every tuple ever inserted, windows are configured to evict expired tuples. Tumbling windows operate in batches: when a tumbling window fills up, all the tuples in the window are evicted. This process is called a window flush. Sliding windows, by contrast, operate incrementally: once a sliding window is full, further tuple insertions result in evicting the oldest tuples in the window. The details of tuple eviction are defined by the eviction policy, and there are several eviction policies. Consider a count-based eviction policy of size c. A tumbling window that is configured this way flushes whenever the number of tuples in the window reaches c. A similarly configured sliding window evicts its first tuple when the (c+1)th tuple is received, and each new tuple after that evicts the oldest tuple in the window. The following are examples of tumbling and sliding windows that are configured with a count-based eviction policy of size 4.

Tumbling window:
() -> (1) -> (2, 1) -> (3, 2, 1) -> (4, 3, 2, 1) -> () -> (5) -> (6, 5) -> ...

Sliding Window:
() -> (1) -> (2, 1) -> (3, 2, 1) -> (4, 3, 2, 1) -> (5, 4, 3, 2) -> (6, 5, 4, 3) -> ...
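The count-based eviction behavior shown in these traces can be reproduced with a small standalone simulation. This is a sketch under simplifying assumptions, not the SPL runtime API: tuples are plain int values, each window is a std::deque with the newest tuple at the front (matching the (newest, ..., oldest) notation), and the helper names tumblingCount and slidingCount are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical model, not the SPL runtime: newest tuple at the front.
using Window = std::deque<int>;

// Count-based tumbling window of size c: insert first, then flush when full.
// Returns every window state, starting with the initial empty window.
std::vector<Window> tumblingCount(const std::vector<int>& tuples, std::size_t c) {
    assert(c > 0);
    std::vector<Window> states{Window{}};
    Window w;
    for (int t : tuples) {
        w.push_front(t);
        states.push_back(w);        // state right after insertion, e.g. (4, 3, 2, 1)
        if (w.size() == c) {        // full window: flush (mass eviction)
            w.clear();
            states.push_back(w);    // the () step in the trace
        }
    }
    return states;
}

// Count-based sliding window of size c: if the window already holds c tuples,
// evict the oldest one, then insert the new tuple.
std::vector<Window> slidingCount(const std::vector<int>& tuples, std::size_t c) {
    assert(c > 0);
    std::vector<Window> states{Window{}};
    Window w;
    for (int t : tuples) {
        if (w.size() == c) w.pop_back();   // the oldest tuple is at the back
        w.push_front(t);
        states.push_back(w);
    }
    return states;
}
```

Calling tumblingCount({1, 2, 3, 4, 5, 6}, 4) and slidingCount({1, 2, 3, 4, 5, 6}, 4) yields the same sequences of states as the two traces above.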

Tumbling and sliding windows also differ in how they trigger. Window triggers are events that indicate that the window is ready for processing. What processing means is application-specific: for an Aggregate operator, it might be the computation of an aggregation function; for a Sort operator, it might be the sorting of the window contents. For tumbling windows, the window trigger event is the same as the window flush event. For sliding windows, the trigger policy defines when the window triggers. Consider the simple case of a count-based trigger policy of size c: the window triggers each time c new tuples have been received since the last trigger. The following is an example of a sliding window with a count-based eviction policy of size 4 and a count-based trigger policy of size 2. The * marks the window trigger events.

Sliding Window:
() -> (1) -> *(2, 1) -> (3, 2, 1) -> *(4, 3, 2, 1) -> (5, 4, 3, 2) -> *(6, 5, 4, 3) -> ...
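The trigger sequence above can be modeled by adding a counter of tuples received since the last trigger. As before, this is a hypothetical standalone sketch rather than SPL code: slidingCountTrace and render are illustrative names, and the per-tuple order evict, then insert, then trigger follows the description of count-based policies in this section.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Renders a window as "(newest, ..., oldest)", as in the traces above.
std::string render(const std::deque<int>& w) {
    std::string s = "(";
    for (std::size_t i = 0; i < w.size(); ++i) {
        if (i > 0) s += ", ";
        s += std::to_string(w[i]);
    }
    return s + ")";
}

// Hypothetical model of a sliding window with a count-based eviction policy
// (evictSize) and a count-based trigger policy (triggerSize).
// Triggered states are prefixed with '*'.
std::string slidingCountTrace(const std::vector<int>& tuples,
                              std::size_t evictSize, std::size_t triggerSize) {
    assert(evictSize > 0 && triggerSize > 0);
    std::deque<int> w;                 // newest tuple at the front
    std::size_t sinceTrigger = 0;      // tuples received since the last trigger
    std::string trace = render(w);
    for (int t : tuples) {
        if (w.size() == evictSize) w.pop_back();   // 1. evict the oldest tuple
        w.push_front(t);                           // 2. insert the new tuple
        bool triggered = (++sinceTrigger == triggerSize);  // 3. trigger check
        if (triggered) sinceTrigger = 0;
        trace += triggered ? " -> *" : " -> ";
        trace += render(w);
    }
    return trace;
}
```

For the input 1 to 6 with sizes 4 and 2, the function returns exactly the trace shown above, because the tuple that completes each count of 2 is inserted before the trigger fires.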

The SPL language also supports partitioned variants of sliding and tumbling windows. Each tuple that is inserted into a partitioned window is associated with a user-specified partition. In effect, each partition has its own subwindow, and the eviction and trigger policies apply to each subwindow independently of the other partitions. As a result, window events such as evictions and triggers are associated with a partition. For tumbling windows, the subwindow that is associated with a partition is removed when that subwindow is flushed. For sliding windows, by contrast, subwindows might never become empty. For example, with a count-based or delta-based eviction policy, tuples are evicted only when a new tuple arrives and the new tuple is then inserted, so a subwindow that holds at least one tuple never becomes empty again. This distinction is important because it affects memory usage. The following example shows a partitioned window, where the numbers represent tuples and the letters represent partitions. As in the previous example, a count-based eviction policy of size 4 is used.

Data Items with Partitions:
1a, 2b, 3a, 4b, 5b, 6b, 7a, 8a, 9b, 10b, 11a, 13a

Tumbling Partitioned Window:
a: () -> (1) -> (3, 1) -> (7, 3, 1) -> (8, 7, 3, 1) -> () -> (11) -> (13, 11) -> ...
b: () -> (2) -> (4, 2) -> (5, 4, 2) -> (6, 5, 4, 2) -> () -> (9) -> (10, 9) -> ...

Sliding Partitioned Window:
a: () -> (1) -> (3, 1) -> (7, 3, 1) -> (8, 7, 3, 1) -> (11, 8, 7, 3) -> (13, 11, 8, 7) -> ...
b: () -> (2) -> (4, 2) -> (5, 4, 2) -> (6, 5, 4, 2) -> (9, 6, 5, 4) -> (10, 9, 6, 5) -> ...
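Partitioned windows can be modeled as a map from partition key to an independent subwindow. The sketch below is a hypothetical model, not the SPL Window class: partitions are char keys, tuples are int values, and insertTumbling and insertSliding are illustrative names. Erasing the subwindow on flush mirrors the removal of tumbling subwindows described above, while sliding subwindows never become empty once populated.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <utility>
#include <vector>

// Hypothetical model: each partition owns a subwindow (newest tuple at the
// front), and the count-based policy of size c applies to each one separately.
using Subwindow = std::deque<int>;
using Partitioned = std::map<char, Subwindow>;

void insertTumbling(Partitioned& win, int tuple, char partition, std::size_t c) {
    assert(c > 0);
    Subwindow& sub = win[partition];           // created on first use
    sub.push_front(tuple);
    if (sub.size() == c) win.erase(partition); // flush removes the subwindow
}

void insertSliding(Partitioned& win, int tuple, char partition, std::size_t c) {
    assert(c > 0);
    Subwindow& sub = win[partition];
    if (sub.size() == c) sub.pop_back();       // evict this partition's oldest tuple
    sub.push_front(tuple);                     // never empty after the first insert
}
```

Feeding the data items above with a size of 4 leaves partition a holding (13, 11) in the tumbling case and (13, 11, 8, 7) in the sliding case, which matches the last states shown.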
Eviction and Trigger Policies: There are four types of eviction and trigger policies:
  • Count-based
  • Delta-based
  • Time-based
  • Punctuation-based
Count-based window policies are configured with a size parameter. An example is CountWindowPolicy policy(5);. For a tumbling window, when used to characterize an eviction policy, the size defines the number of tuples that constitute a full window. When a new tuple is received, it is inserted into the window. After the insertion, if the number of tuples in the window is equal to the specified size (a full window), then the whole window is flushed (mass eviction).

For a sliding window, when used to characterize an eviction policy, the size defines the number of tuples to be kept in the window, as it slides. When a new tuple is received, if the number of existing tuples in the window is equal to the size specified, then the oldest tuple is evicted from the window. The new tuple is inserted into the window after the eviction (if any) is performed.

For a sliding window, when used to characterize a trigger policy, the size defines the number of tuples to be received since the last trigger, before triggering the window again. When a new tuple is received, the number of tuples that are received since the last trigger is updated. If this number is equal to the specified size, then the window is triggered again. The trigger policy (count-based) processing takes place after the eviction policy (count-based or delta-based) processing.

Delta-based window policies are configured with a size parameter and an attribute. For instance, consider a tuple of type TType and an attribute that is named age in that tuple. Here is how a window policy for delta(age, 15) is created in C++:

DeltaWindowPolicy<TType, // tuple type
  TType::age_type, // attribute type
  &TType::get_age  // function to get the attribute
> policy(15);

If pointers to tuples are to be stored in the window, instead of tuples, the example can be changed to:

DeltaWindowPolicy<TType *, // tuple type
  TType::age_type, // attribute type
  &TType::get_age  // function to get the attribute
> policy(15);

For a tumbling window, when used to characterize an eviction policy, the size defines the boundary of a full window in the units of the eviction policy attribute. When a new tuple is received, if the delta between its eviction policy attribute value and that of the oldest tuple in the window is larger than the eviction policy size, then the whole window is flushed (mass eviction). The new tuple is inserted into the window after the flush operation (if any) is performed.
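The delta-based tumbling eviction rule (evict first, then insert) can be sketched as follows. The Tuple struct with an id and an age attribute is a hypothetical stand-in for TType, and tumblingDelta is an illustrative name, not part of the SPL runtime.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical tuple type standing in for TType.
struct Tuple {
    int id;
    double age;   // the attribute named in delta(age, size)
};

// Hypothetical model of a delta-based tumbling window.
// Returns the ids in each flushed window, oldest first.
std::vector<std::vector<int>> tumblingDelta(const std::vector<Tuple>& tuples, double size) {
    assert(size >= 0);
    std::vector<std::vector<int>> flushes;
    std::deque<Tuple> w;   // oldest tuple at the front
    for (const Tuple& t : tuples) {
        // Evict first: flush if the new tuple is too far from the oldest one.
        if (!w.empty() && t.age - w.front().age > size) {
            std::vector<int> ids;
            for (const Tuple& x : w) ids.push_back(x.id);
            flushes.push_back(ids);
            w.clear();
        }
        w.push_back(t);    // then insert into the (possibly emptied) window
    }
    return flushes;
}
```

With ages 0, 1, 2, 3, 5, 6 and delta(age, 2), the fourth tuple (age 3) is more than 2 away from the oldest tuple (age 0), so it flushes tuples 1 to 3 before being inserted; the sixth tuple (age 6) likewise flushes tuples 4 and 5.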

For a sliding window, when used to characterize an eviction policy, the size defines the boundary of the window, as it slides, in the units of the eviction policy attribute. When a new tuple is received, existing tuples in the window for which the delta between the eviction policy attribute value of the new tuple and that of the existing tuple is greater than the eviction policy size are evicted. The new tuple is inserted into the window after the evictions (if any) are performed.

For a sliding window, when used to characterize a trigger policy, the size defines how far, in units of the trigger policy attribute, the tuples received since the last trigger can extend before the window triggers again. When a new tuple is received, if the value of its trigger attribute is larger than the trigger attribute value of the last tuple that triggered the window plus the trigger policy size, the window is triggered again. The trigger policy (delta-based) processing takes place before the eviction policy (count-based or delta-based) processing.
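The following sketch combines a delta-based sliding eviction policy with a delta-based trigger policy and applies the order trigger, then evict, then insert. It is a hypothetical model, not the SPL runtime: Tuple and DeltaSlidingWindow are illustrative names, and because the text does not say which tuple serves as the reference before the first trigger, the sketch assumes that the first tuple received sets it.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>
#include <vector>

struct Tuple {
    int id;
    double age;   // attribute used by both delta policies
};

class DeltaSlidingWindow {
public:
    DeltaSlidingWindow(double evictSize, double triggerSize)
        : evictSize_(evictSize), triggerSize_(triggerSize) {
        assert(evictSize >= 0 && triggerSize >= 0);
    }

    void insert(const Tuple& t) {
        // 1. Trigger: compare with the tuple that last triggered the window.
        if (!hasRef_) {
            ref_ = t.age;               // assumption: first tuple sets the reference
            hasRef_ = true;
        } else if (t.age > ref_ + triggerSize_) {
            triggers_.push_back(ids()); // the new tuple is NOT in the triggered window
            ref_ = t.age;
        }
        // 2. Evict every tuple whose delta to the new tuple exceeds evictSize.
        window_.erase(std::remove_if(window_.begin(), window_.end(),
                                     [&](const Tuple& x) { return t.age - x.age > evictSize_; }),
                      window_.end());
        // 3. Insert the new tuple.
        window_.push_back(t);
    }

    std::vector<int> ids() const {      // current contents, oldest first
        std::vector<int> out;
        for (const Tuple& x : window_) out.push_back(x.id);
        return out;
    }
    const std::vector<std::vector<int>>& triggers() const { return triggers_; }

private:
    double evictSize_, triggerSize_;
    std::deque<Tuple> window_;
    bool hasRef_ = false;
    double ref_ = 0.0;
    std::vector<std::vector<int>> triggers_;
};
```

With delta(age, 3) eviction, delta(age, 2) triggering, and ages 0 to 6, the tuple with age 3 triggers the window (1, 2, 3) before it is inserted, and the tuple with age 6 triggers (3, 4, 5, 6).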

Time-based window policies are configured with a time period parameter (in seconds). An example is TimeWindowPolicy policy(5);

For a tumbling window, when used to characterize an eviction policy, the time period defines the boundary of a full window in units of real time. When a new tuple is received, it is inserted into the window. When the time elapsed since the last flush event exceeds the specified time period, then the whole window is flushed (mass eviction). Window flushes take place independently of tuple insertions.

For a sliding window, when used to characterize an eviction policy, the time period defines the boundary of the window, as it slides, in units of real time. When a new tuple is received, it is inserted into the window. Tuples that are in the window longer than the specified time period are evicted. Tuple evictions take place independently of tuple insertions.

For a sliding window, when used to characterize a trigger policy, the time period defines the inter-arrival time for successive trigger events. When the time elapsed since the last trigger event exceeds the specified time period, a new trigger is generated. The trigger policy (time-based) processing takes place independently of the eviction policy processing.
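Because time-based policies act independently of insertions, the sketch below separates insert() from a tick() call that a timer would make. To keep it deterministic, it uses a simulated clock in seconds instead of real time; TimeSlidingWindow, tick, and the assumption that the trigger clock starts at time 0 are all illustrative, not part of the SPL API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

class TimeSlidingWindow {
public:
    TimeSlidingWindow(double evictPeriod, double triggerPeriod)
        : evictPeriod_(evictPeriod), triggerPeriod_(triggerPeriod) {
        assert(evictPeriod > 0 && triggerPeriod > 0);
    }

    // Insertion only records the tuple with its arrival time.
    void insert(int tuple, double now) { window_.push_back({now, tuple}); }

    // Called by a timer: evicts tuples that stayed longer than evictPeriod and
    // returns true when more than triggerPeriod has elapsed since the last trigger.
    bool tick(double now) {
        while (!window_.empty() && now - window_.front().first > evictPeriod_)
            window_.pop_front();
        if (now - lastTrigger_ > triggerPeriod_) {
            lastTrigger_ = now;
            return true;
        }
        return false;
    }

    std::size_t size() const { return window_.size(); }

private:
    double evictPeriod_, triggerPeriod_;
    double lastTrigger_ = 0.0;                   // assumption: clock starts at 0
    std::deque<std::pair<double, int>> window_;  // (arrival time, tuple), oldest first
};
```

With time(8) eviction and time(2) triggering, a tuple that arrives at 0.5 s is evicted at the first tick after 8.5 s, regardless of whether any other tuple is inserted in the meantime.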

Punctuation-based window policies have no configuration options. An example is PunctWindowPolicy policy;

A tumbling window with punctuation-based eviction policy is considered full when a punctuation is received. When a tuple is received, it is inserted into the window. When a punctuation is received, the whole window flushes (mass eviction).

Sliding windows cannot have punctuation-based eviction or trigger policies.
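A tumbling window with a punctuation-based eviction policy reduces to accumulating tuples until a punctuation arrives. The following hypothetical sketch (PunctTumblingWindow is an illustrative name, not an SPL class) returns the flushed contents on each punctuation; an empty result corresponds to the case that the onEmptyWindowPunctEvent event covers in the SPL runtime.

```cpp
#include <cassert>
#include <vector>

class PunctTumblingWindow {
public:
    // Tuples are simply appended; no eviction happens on insertion.
    void insert(int tuple) { window_.push_back(tuple); }

    // A punctuation flushes the whole window (mass eviction) and returns
    // what was flushed, which is empty if no tuples arrived since the last one.
    std::vector<int> punct() {
        std::vector<int> flushed;
        flushed.swap(window_);   // the window is now empty
        return flushed;
    }

private:
    std::vector<int> window_;
};
```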

Partition Eviction Policies: Partitioned windows are used to group related tuples for processing. Each partition contains 0 to n tuples. In partitioned windows, tuples might never be evicted from a partition. For example, consider an Aggregate operator with a partitioned tumbling window that has an eviction policy of count(100). If the application generates only 99 tuples in each partition, the partitions never flush (and therefore never trigger), and the memory usage of the window continues to increase. Sometimes the application requirements specify that if no tuples are received during a specific duration, the existing tuples in the window are no longer useful. In such situations, partitions must be deleted to recover the memory that is allocated to their tuples.

SPL supports the following partition eviction policies for partitioned windows:
  • partitionAge(float64 n): If a tuple is not inserted in a partition for n seconds, the partition is deleted.
  • partitionCount(uint32 c): If the number of partitions exceeds a count of c, partitions are deleted until the partition count is equal to c.
  • tupleCount(uint32 t): If the total number of tuples across all partitions exceeds a count of t, partitions are deleted until the tuple count is less than or equal to t.
The operator can select the partitions to evict by setting one of the following policies:
  • OperatorDefined: Implement and register an event handler to select the partitions to evict.
  • LRU: Use the default event handler to evict the least recently used partitions. A time-ordered list of existing partitions, recording the time when the last tuple was inserted into each one, is maintained to identify the least recently used partition.

    The list of partitions is maintained for both OperatorDefined and LRU policies.

After the partitions to be evicted are selected, an event notifies the operator which partitions will be removed. This event allows the operator to take an action (such as deleting objects) before the partitions are removed.
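The partitionCount policy with LRU selection can be sketched with a map of partitions plus a list ordered by last insertion time, in the spirit of the time-ordered list described above. This is a hypothetical model, not the SPL implementation: LruPartitionedWindow and its callback are illustrative, and the callback plays the role of the notification that is delivered before partitions are removed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <map>
#include <string>
#include <utility>
#include <vector>

class LruPartitionedWindow {
public:
    using Callback = std::function<void(const std::string& partition,
                                        const std::vector<int>& tuples)>;

    LruPartitionedWindow(std::size_t maxPartitions, Callback onEviction)
        : maxPartitions_(maxPartitions), onEviction_(std::move(onEviction)) {
        assert(maxPartitions > 0);
    }

    void insert(int tuple, const std::string& partition) {
        auto it = data_.find(partition);
        if (it != data_.end()) lru_.erase(it->second.pos);   // will move to the back
        else it = data_.emplace(partition, Entry{}).first;
        it->second.tuples.push_back(tuple);
        it->second.pos = lru_.insert(lru_.end(), partition); // most recently used
        // partitionCount(c): delete LRU partitions until the count equals c.
        while (data_.size() > maxPartitions_) {
            const std::string victim = lru_.front();
            lru_.pop_front();
            auto v = data_.find(victim);
            if (onEviction_) onEviction_(victim, v->second.tuples);  // notify first
            data_.erase(v);                                          // then remove
        }
    }

    std::size_t partitionCount() const { return data_.size(); }
    bool contains(const std::string& p) const { return data_.count(p) != 0; }

private:
    struct Entry {
        std::vector<int> tuples;
        std::list<std::string>::iterator pos;   // position in lru_
    };
    std::size_t maxPartitions_;
    Callback onEviction_;
    std::map<std::string, Entry> data_;
    std::list<std::string> lru_;                // front = least recently used
};
```

With partitionCount(2), inserting into a, b, a, and then c evicts b, because a was refreshed by its second insertion.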

Execution Order of Events: Window events are set off by the insert calls made on the window object. When an insert call is made, the insertion, eviction, and trigger events run in a specific order, which depends on the window policies specified. The exception is time-based window policies, for which eviction and trigger processing might proceed independently of insertion, depending on the specific time-based policies used. Two tables, one for tumbling windows (Table 1) and another for sliding windows (Table 2), summarize the execution order of the window events for different eviction and trigger policies.

Table 1. Execution order of window events for tumbling windows
Eviction policy      Order of execution
Count-based          First insert the tuple into the window, then perform eviction
Delta-based          First perform eviction, then insert the tuple into the window
Time-based           Perform evictions independently of tuple insertions
Punctuation-based    Perform eviction when a punctuation is received

Notice that the order of insertion and eviction differs between count-based and delta-based eviction policies. For a count-based tumbling window of size n, the window is full when the nth tuple is received, so that tuple is inserted before the flush. For a delta-based window of size delta-n, the window is full when a tuple that increases the delta value beyond n is received; in that case, the tuples are first evicted from the window and then the newly received tuple is inserted into the emptied window.

Table 2. Execution order of window events for sliding windows (a | b indicates that events a and b are executed independently, whereas a → b indicates that events a and b are executed serially)
Eviction \ Trigger   Count-based                 Delta-based                 Time-based
Count-based          evict → insert → trigger    trigger → evict → insert    evict → insert | trigger
Delta-based          evict → insert → trigger    trigger → evict → insert    evict → insert | trigger
Time-based           insert → trigger | evict    trigger → insert | evict    insert | evict | trigger

For sliding windows that do not involve time-based policies, eviction always precedes insertion, because evictions make room for the newly arriving tuple. The trigger processing, however, comes either first or last, depending on whether the trigger policy is count-based or delta-based. For count-based trigger policies, triggering happens last because the newly inserted tuple is part of the window contents to be triggered. For instance, with a count-based trigger policy of size n, the nth tuple received since the last trigger results in a new trigger, and that nth tuple is part of the triggered window, so the trigger follows the insertion. Delta-based trigger policies are different because the newly received tuple is not part of the window contents to be triggered. For instance, with a delta-based trigger policy of size delta-n, the tuple that increases the delta value beyond n since the last trigger results in a new trigger, but that tuple is not part of the triggered window, so the trigger precedes both the eviction and the insertion. For time-based eviction policies, eviction happens independently of other events; similarly, for time-based trigger policies, triggering happens independently of other events.

Defining Windows: Tumbling and sliding windows are represented by the TumblingWindow and SlidingWindow template classes. Both extend the common Window class and have the same template parameters as Window. The template declaration of this common base class is as follows:

template <class TupleType,
          class PartitionType=int32_t,
          class DataType=std::deque<TupleType>,
          class StorageType=std::tr1::
            unordered_map<PartitionType,DataType>  >
class Window;

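The types that are involved in the template are:
  • TupleType: the type of the items stored in the window, either a tuple type or a pointer to a tuple type.
  • PartitionType: the type of the partition attribute (int32_t by default).
  • DataType: the container that holds the tuples of a single partition (std::deque<TupleType> by default).
  • StorageType: the map from partitions to their data containers (std::tr1::unordered_map<PartitionType,DataType> by default).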

All of these types are also defined as typedefs within the Window, TumblingWindow, and SlidingWindow classes. Developers typically customize only TupleType and PartitionType. The following are some example instantiations:

TumblingWindow<TType>
SlidingWindow<TType *, float32> // partitioned window

For windows that store pointers to tuples, the user is responsible for memory management, such as deleting the tuples when their partitions are removed.

Tumbling window constructors take three parameters: a reference to the operator that contains the window, the index of the input port that the window is associated with, and the eviction policy for the window. Sliding window constructors also take the trigger policy as the last parameter. The constructor signatures are as follows:

TumblingWindow(Operator & oper, uint32_t port,
               WindowPolicy const & evictionPolicy);
SlidingWindow(Operator & oper, uint32_t port,
              WindowPolicy const & evictionPolicy,
              WindowPolicy const & triggerPolicy);

The following are some example window configurations:

CountWindowPolicy pC5(5), pC2(2); // count-based policies
PunctWindowPolicy pP; // punctuation-based policy
DeltaWindowPolicy<TT,TT::age_type,&TT::get_age> pDage2(2), pDage0(0); // delta-based policy
DeltaWindowPolicy<TT,TT::time_type,&TT::get_time> pDtime2(2.0); // delta-based policy
TimeWindowPolicy pT8(8.0), pT2(2.0); // time-based policy

TumblingWindow<TT> wT_C5(op, 0, pC5); // tumbling, count(5)
TumblingWindow<TT> wT_P(op, 0, pP); // tumbling, punct()
TumblingWindow<TT> wT_Dage2(op, 0, pDage2); // tumbling, delta(age,2)
TumblingWindow<TT> wT_T8(op, 0, pT8); // tumbling, time(8)

SlidingWindow<TT> wS_C5_C2(op, 0, pC5, pC2); // sliding, count(5), count(2)
SlidingWindow<TT> wS_C5_Dage2(op, 0, pC5, pDage2); // sliding, count(5), delta(age, 2)
SlidingWindow<TT> wS_Dage2_C5(op, 0, pDage2, pC5); // sliding, delta(age, 2), count(5)
SlidingWindow<TT> wS_Dage2_Dage0(op, 0, pDage2, pDage0); // sliding, delta(age, 2), delta(age, 0)
SlidingWindow<TT> wS_Dage2_T2(op, 0, pDage2, pT2); // sliding, delta(age, 2), time(2)
SlidingWindow<TT> wS_T8_C5(op, 0, pT8, pC5); // sliding, time(8), count(5)
SlidingWindow<TT> wS_T8_Dage2(op, 0, pT8, pDage2); // sliding, time(8), delta(age, 2)
SlidingWindow<TT> wS_T8_T2(op, 0, pT8, pT2); // sliding, time(8), time(2)

Inserting Tuples: The SPL run time has an event-driven windowing API. Except for certain events that are fired by time-based eviction and trigger policies, most window events fire as a result of tuples or punctuation being inserted into the window. The following are the signatures of the two window functions for inserting tuples and punctuation into the window.

void insert(TupleType const & tuple,
            PartitionType const & partition=PartitionType());
void insert(Punctuation const & punct);

The partition parameter of the tuple insertion function is optional. For non-partitioned windows, it can be omitted. A non-partitioned window has a single default partition with a default partition attribute type (int32_t) and value (0).
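The default partition argument can be illustrated with a stripped-down window template. MiniWindow is a hypothetical class, not the SPL Window; it only shows that a value-initialized std::int32_t partition is 0, so all tuples inserted without a partition land in the same default partition.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <map>

template <class TupleType, class PartitionType = std::int32_t>
class MiniWindow {
public:
    // Mirrors the shape of the insert signature above: the partition
    // defaults to PartitionType(), which is 0 for std::int32_t.
    void insert(TupleType const & tuple,
                PartitionType const & partition = PartitionType()) {
        data_[partition].push_back(tuple);
    }
    std::size_t partitionCount() const { return data_.size(); }
    std::size_t tupleCount(PartitionType const & partition) const {
        auto it = data_.find(partition);
        return it == data_.end() ? std::size_t{0} : it->second.size();
    }

private:
    std::map<PartitionType, std::deque<TupleType>> data_;
};
```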

Evicting Partitions: If a partition eviction policy is defined for a window, each time a tuple is inserted into a partition, the window is checked to see whether partition eviction is required. If partition eviction is required and the LRU option is selected, the default onWindowPartitionEvictionSelection event handler is called to select the partitions to evict; it selects the least recently used partitions from the time-ordered list of partitions. If partition eviction is required and the OperatorDefined option is selected, the user-defined onWindowPartitionEvictionSelection event handler is called to let the operator select the partitions to evict.

If partitions are selected for eviction and an onWindowPartitionEviction event handler is registered, it is called to take user-defined actions before the partitions are deleted. For example, the handler can delete the tuples in a partition to recover the memory that is allocated to them before the partition is deleted.

The following section describes how an operator developer implements partition eviction:
  • Evict partitions to control memory use:
    • The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to LRU.
    • The developer can implement and register the onWindowPartitionEviction function to delete tuples for windows that store pointers to tuples.
  • Evict partitions with user-defined actions:
    • The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to LRU.
    • The developer must implement and register the onWindowPartitionEviction function to get notification of partitions that are selected for eviction and to define the action to take before the partitions are evicted.
  • Select partitions for eviction:
    • The developer must edit the <oper-name>_cpp.cgt file to use the getPartitionedWindowCppInitializer function with the selectionType parameter set to OperatorDefined.
    • The developer must implement and register the onWindowPartitionEvictionSelection function to select the partitions to evict with the markForRemoval() function of the PartitionSelectionIterator class.
    • The developer can implement and register the onWindowPartitionEviction function to delete tuples for windows that store pointers to tuples or to take specific action when partitions are evicted.

The following example shows a sample <oper-name>_cpp.cgt file.

<%
  my $inputPort = $model->getInputPortAt(0); 
  my $outputPort = $model->getOutputPortAt(0);
  my $inTupleName = $inputPort->getCppTupleName(); 
  my $inTupleType = $inputPort->getCppTupleType();
  my $outTupleType = $outputPort->getCppTupleType();
  my $partitionByParam = $model->getParameterByName("partitionBy");
  my $partitionByInitializer = SPL::CodeGen::getParameterCppInitializer($partitionByParam);
  my $window = $inputPort->getWindow();
  my $windowCppInitializer = SPL::CodeGen::getPartitionedWindowCppInitializer($window,
                                                                              "IPort0Type*",
                                                                              "OperatorDefined");
%>

<%SPL::CodeGen::implementationPrologue($model);%>

MY_OPERATOR::MY_OPERATOR()
  : MY_BASE_OPERATOR(), _window(<%=$windowCppInitializer%>)
{
<%if($window->hasPartitionEvictionPolicy()) {%>
    _window.registerOnWindowPartitionEviction(this);
    _window.registerOnWindowPartitionEvictionSelection(this);
<%}%>
}

MY_OPERATOR::~MY_OPERATOR()
{
  // Delete any remaining tuples in the window
  _window.deleteWindowObjects();
}

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
{
  AutoPortMutex apm(_mutex, *this);
  IPort0Type const & <%=$inTupleName%> = static_cast<IPort0Type const&>(tuple);
  PartitionByType partition(<%=$partitionByInitializer%>);
  _window.insert(new IPort0Type(<%=$inTupleName%>), partition);
}

<%if($window->hasPartitionEvictionPolicy()) {%>
// This handler is called when the partition eviction criterion has been reached.
// The criterion can be partitionAge, partitionCount, or tupleCount.  For this example,
// we select all the least recently used partitions that have no tuples in
// them, and then select one partition that still contains tuples.
// If that isn't enough to satisfy the criterion, the handler is invoked again to select more.
// We could examine the partition eviction kind and expressions to be more
// precise.
void MY_OPERATOR::onWindowPartitionEvictionSelection(WindowEventType::WindowType & window,
                                   WindowEventType::PartitionSelectionIterator const& begin,
                                   WindowEventType::PartitionSelectionIterator const& end)
{
  // Remove all the old empty partitions, and one non-empty one
  for (WindowEventType::PartitionSelectionIterator it = begin; it != end; it++) {
    if (window.getWindowData(it.partition()).size() == 0) {
      it.markForRemoval();
    } else {
      // this partition isn't empty: select it and stop
      it.markForRemoval();
      break;
    }
  }
}

// For this oper, we are storing pointers to Tuples in the partitioned windows. When
// a partition is being evicted, we have to delete all the allocated tuples in that partition.
void MY_OPERATOR::onWindowPartitionEviction(WindowEventType::WindowType & window,
                                            WindowEventType::PartitionIterator const& begin,
                                            WindowEventType::PartitionIterator const& end)
{
  // Partitions are being evicted: clean up the tuples they hold
  WindowEventType::PartitionIterator it;
  for (it = begin; it != end; it++) {
    // delete the tuples
    WindowEventType::PartitionType const & partition = it.partition();
    WindowType::DataType & data = window.getWindowData(partition);
    for(uint32_t i=0, iu=data.size(); i<iu; ++i)
      delete data[i];
  }
}

<%}%>

<%SPL::CodeGen::implementationEpilogue($model);%>

Tumbling Window Summarizers: A tumbling window summarizer removes the need for the windowing library to retain all the tuples in a tumbling window. If an operator needs the windowing library to group and partition tuples but only needs to look at each incoming tuple once, it can use a tumbling window summarizer. An example is the Sum() function of the Aggregate operator over a tumbling window.

Here is the definition of the TumblingWindowSummarizer class.

/// Inherit from this class and override select methods to implement
/// summarizers for tumbling windows.
template <class T, class G=int32_t>
class TumblingWindowSummarizer
{
  public:
    typedef T TupleType; //!< tuple type
    typedef G PartitionType; //!< partition type

    /// Destructor. The destructor is called immediately after the
    /// afterWindowFlushEvent window event.
    virtual ~TumblingWindowSummarizer() {}

    /// This event is fired when a new window is opened.
    /// A new window is opened when the first tuple that is part of the new
    /// window is received. The construction happens after the
    /// beforeTupleInsertionEvent window event and before the
    /// afterTupleInsertionEvent window event.
    /// @param partition the partition for which the summarizer is created
    virtual void onOpenWindowEvent(PartitionType const & partition) {}

    /// This event is fired when a new tuple is added to the window
    /// corresponding to this summarizer. This event is delivered after the
    /// beforeTupleInsertionEvent window event and before the
    /// afterTupleInsertionEvent window event.
    /// @param tuple the tuple that was inserted
    virtual void onTupleInsertionEvent(TupleType const & tuple) {}
    
    /// This event is fired when the current window is closed. An existing
    /// window is closed after the last tuple that is part of that window is
    /// received. This event is delivered before the beforeWindowFlushEvent.
    virtual void onCloseWindowEvent() {}
};

To use a TumblingWindowSummarizer, register it with the windowing library by calling _window.registerWindowSummarizer<derivedTumblingWindowSummarizer>(). This registration causes the windowing library to instantiate a derivedTumblingWindowSummarizer object for each partition in the tumbling window. The constructor of the object is passed an SPL::Operator& as its argument. As tuples are inserted into the window, onTupleInsertionEvent is invoked on the summarizer for the tuple's partition. When the tumbling window flushes and beforeWindowFlushEvent is called, the summarizer for the flushed partition can be accessed with this code:

    WindowType& twindow = static_cast<WindowType&>(window); 
    // WindowType must be a tumbling window class.
    derivedTumblingWindowSummarizer& summarizer =
      *static_cast<derivedTumblingWindowSummarizer*>(twindow.getSummarizer(partition));

The information in the summarizer can then be used to generate a tuple, or any other action.

Here is an example that shows the use of a tumbling window summarizer to implement a Sum operation:

#define MY$OP MY_OPERATOR_SCOPE::MY_OPERATOR
// Assumes $sumType and $sumArg are already set with the C++ type and C++ expression to be summed.
// $sumArg must be generated with
// SPL::CodeGen::prefixLiteralsAndStatesInCppExpressions($sumArg->getCppExpression(), "_oper.");
// to ensure that all literals and state variables are prefixed by _oper. to allow the C++ 
// expression to access the necessary information in the primitive operator.
// The summarize class must then provide a _oper object to resolve the references.
struct Sum : public SPL::TumblingWindowSummarizer<MY$OP::IPort0Type*,MY$OP::PartitionByType>{
  MY$OP& _oper;           // Needed to ensure access to literals in C++ code
  uint64_t _count;        // Used to check if the partition is empty (optional)
  <%=$sumType%> _sum;     // incremental sum to be accumulated
  Sum (SPL::Operator& oper) : _oper(static_cast<MY$OP &>(oper)), _count(0), _sum(0)  {}

  // Will be invoked for each tuple in a partition
  void onTupleInsertionEvent(MY$OP::IPort0Type* const& tuple) {
    _count++;          // Keep a running count of the tuples seen in this window
    // Establish the needed name for the incoming tuple
    MY$OP::IPort0Type const &<%=$inTupleName%> = *static_cast<MY$OP::IPort0Type const*>(tuple);
    // finally, do the actual sum
    _sum += <%=$sumArg%>;
  }
};

MY_OPERATOR::MY_OPERATOR()
: _window(<%=$windowCppInitializer%>)
{
  _window.registerBeforeWindowFlushHandler(this);
  _window.registerWindowSummarizer<Sum>();
  ...
}

void MY_OPERATOR::beforeWindowFlushEvent(
  WindowEventType::WindowType & window,
  WindowEventType::PartitionType const & partition)
{
  WindowType& twindow = static_cast<WindowType&>(window);
  Sum& summarizer = *static_cast<Sum*>(twindow.getSummarizer(partition));
  if (summarizer._count == 0) {
    // optional: take action for empty windows
    // return;
  }
  // Window has data - use the value in the summarizer
  OPort0Type otuple (summarizer._sum);
  submit (otuple, 0);
}

Window Events: Various events that are fired by windows are captured through the WindowEvent template class. WindowEvent has the same template form as the window classes themselves. Window classes also include a typedef that is called WindowEventType that identifies the window event class for the particular window. To handle window events, register a class that derives from WindowEvent with the window at hand. Two events are common to both tumbling and sliding windows. They are listed here as they appear in the WindowEvent class:

virtual void beforeTupleInsertionEvent(
    WindowType & window, TupleType const & tuple, PartitionType const & partition) {}
virtual void afterTupleInsertionEvent(
    WindowType & window, TupleType & tuple, PartitionType const & partition) {}

beforeTupleInsertionEvent is fired right before a tuple is inserted into the window, whereas afterTupleInsertionEvent is fired right after a tuple is inserted into the window.

Three events are specific to the tumbling windows. They are listed here as they appear in the WindowEvent class:

virtual void beforeWindowFlushEvent(WindowType & window, PartitionType const & partition) {}
virtual void afterWindowFlushEvent(WindowType & window, PartitionType const & partition) {}
virtual void onEmptyWindowPunctEvent(WindowType & window) {}

beforeWindowFlushEvent is fired right before a tumbling window is flushed, whereas afterWindowFlushEvent is fired right after a tumbling window is flushed. onEmptyWindowPunctEvent is fired when a tumbling window has a punctuation-based eviction policy and a punctuation is received while there are no tuples in the window. The operator can use this event to detect this situation and take an action (such as submitting a window punctuation).

Four events are specific to the sliding windows. They are listed here as they appear in the WindowEvent class:

virtual void beforeTupleEvictionEvent(
     WindowType & window, TupleType & tuple, PartitionType const & partition) {}
virtual void afterTupleEvictionEvent(
    WindowType & window, TupleType & tuple, PartitionType const & partition) {}
virtual void onWindowTriggerEvent(
    WindowType & window, PartitionType const & partition) {}
virtual void onWindowInitialFullEvent(
    WindowType & window, PartitionType const & partition) {}

beforeTupleEvictionEvent is fired right before a tuple is evicted from a sliding window, whereas afterTupleEvictionEvent is fired right after a tuple is evicted from a sliding window. onWindowTriggerEvent is fired when a sliding window is triggered. Finally, onWindowInitialFullEvent is fired when a sliding window is full for the first time. This function is particularly useful when the window trigger events are to be ignored before the window becomes full for the first time (a common use case for operators like Aggregate).
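The following hypothetical sketch shows the Aggregate-style use of onWindowInitialFullEvent: a sliding window with a count-based eviction policy of size n and a count-based trigger policy of size 1 ignores triggers until the window has been full once. InitialFullSlidingSum is an illustrative name, and the sum stands in for an arbitrary aggregation; this is not the SPL runtime API.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <numeric>
#include <vector>

class InitialFullSlidingSum {
public:
    explicit InitialFullSlidingSum(std::size_t size) : size_(size) { assert(size > 0); }

    void insert(int tuple) {
        if (window_.size() == size_) window_.pop_front();  // evict the oldest tuple
        window_.push_back(tuple);
        if (!initialFull_ && window_.size() == size_)
            initialFull_ = true;     // plays the role of onWindowInitialFullEvent
        onTrigger();                 // count(1) trigger: fires on every tuple
    }
    const std::vector<int>& results() const { return results_; }

private:
    void onTrigger() {
        if (!initialFull_) return;   // ignore triggers before the window is first full
        results_.push_back(std::accumulate(window_.begin(), window_.end(), 0));
    }

    std::size_t size_;
    bool initialFull_ = false;
    std::deque<int> window_;
    std::vector<int> results_;
};
```

With a size of 3 and the tuples 1 to 5, the first two triggers are ignored and the recorded sums are 6, 9, and 12.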

The following events are specific to partitioned windows and are listed in the WindowEvent class:
virtual void onWindowPartitionEvictionSelection(WindowType & window,
                                                PartitionSelectionIterator const & begin,
                                                PartitionSelectionIterator const & end) {}
virtual void onWindowPartitionEviction(WindowType & window,
                                       PartitionIterator const & begin,
                                       PartitionIterator const & end) {}
The onWindowPartitionEvictionSelection event is fired to select partitions for eviction. The onWindowPartitionEviction event is fired before the selected partitions are evicted.

Events are registered with windows by using the register<EventType>Handler family of functions. For the window events that are common to tumbling and sliding windows, these registration functions are defined in the Window base class, as follows:

WindowEventType * registerBeforeTupleInsertionHandler(WindowEventType * e); 
WindowEventType * registerAfterTupleInsertionHandler(WindowEventType * e);

These functions take a pointer to a WindowEventType object, which represents the event handler for the particular type of event under consideration. If this pointer is NULL, the current event handler (if any) is removed. The registration functions always return the previous value of the event handler pointer. Initially, all event handler pointers are NULL, which means that no event handlers are registered.
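The registration contract (one handler per event type, NULL to unregister, previous pointer returned) can be sketched in C++ as follows. The class and method names are hypothetical and only mirror the shape of the Window registration functions; nullptr plays the role of NULL.

```cpp
#include <cassert>

// Hypothetical handler interface for a single event type.
struct ToyHandler {
  virtual ~ToyHandler() {}
  virtual void afterTupleInsertionEvent(int tuple) = 0;
};

// Hypothetical window with one handler slot for that event type.
class ToyWindow {
public:
  ToyHandler * registerAfterTupleInsertionHandler(ToyHandler * e) {
    ToyHandler * previous = afterInsertion_;
    afterInsertion_ = e;   // nullptr unregisters the current handler
    return previous;       // always hand back the old pointer
  }
  void insert(int tuple) {
    // ... store the tuple ...
    if (afterInsertion_) afterInsertion_->afterTupleInsertionEvent(tuple);
  }
private:
  ToyHandler * afterInsertion_ = nullptr;  // initially no handler registered
};

// Example handler that counts the events it receives.
struct CountingHandler : ToyHandler {
  int count = 0;
  void afterTupleInsertionEvent(int) override { ++count; }
};
```

Because the previous pointer is returned, a caller can temporarily swap in a different handler and later restore the original one.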

For the window events that are specific to tumbling windows, these registration functions are defined in the TumblingWindow class, as follows:

WindowEventType * registerBeforeWindowFlushHandler(WindowEventType * e);
WindowEventType * registerAfterWindowFlushHandler(WindowEventType * e);

For the window events that are specific to sliding windows, these registration functions are defined in the SlidingWindow class, as follows:

WindowEventType * registerBeforeTupleEvictionHandler(WindowEventType * e);
WindowEventType * registerAfterTupleEvictionHandler(WindowEventType * e);
WindowEventType * registerOnWindowTriggerHandler(WindowEventType * e);
WindowEventType * registerOnWindowInitialFullHandler(WindowEventType * e);

Accessing Window Data: There are three functions available for accessing data that is stored within windows. These functions are defined in the Window class, as follows:

StorageType & getWindowStorage();
DataType & getWindowData(PartitionType const & partition);
DataType & getWindowData();

The getWindowStorage function returns a reference to the main window storage area, which maps each partition to its own specific storage area that contains window data. For a partition, its window data can be accessed through the getWindowData function. Also, a version of getWindowData exists that does not take a partition parameter. This version is used for non-partitioned windows and creates the default partition if it does not already exist.
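The storage layout described above, a map from each partition to that partition's window data, can be modeled in a few lines of C++. ToyPartitionedWindow, its std::map and std::deque storage, the use of a value-initialized key as the default partition, and the choice to throw std::out_of_range for an unknown partition are all assumptions of this hypothetical sketch, not properties of the SPL runtime.

```cpp
#include <cassert>
#include <deque>
#include <map>

// Hypothetical model of partitioned window storage (not the SPL runtime API).
template <typename T, typename PartitionType = int>
class ToyPartitionedWindow {
public:
  typedef std::deque<T> DataType;                        // one partition's tuples
  typedef std::map<PartitionType, DataType> StorageType; // partition -> data

  StorageType & getWindowStorage() { return storage_; }

  // Partitioned access; this toy requires the partition to exist
  // (std::map::at throws std::out_of_range otherwise).
  DataType & getWindowData(PartitionType const & partition) {
    return storage_.at(partition);
  }

  // Non-partitioned access: creates the default partition on first use.
  DataType & getWindowData() { return storage_[PartitionType()]; }

  void insert(T const & t, PartitionType const & partition) {
    storage_[partition].push_back(t);
  }

private:
  StorageType storage_;
};
```

In this toy, calling getWindowData() without a partition adds an entry for the default key (0 for int) if none exists yet, so the storage map grows by one partition.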

Window events do not involve multithreading in the absence of time-based eviction and trigger policies. Otherwise, each time-based policy of a window brings in a thread of its own. For instance, a tumbling window introduces a thread when it is configured with a time-based eviction policy. A sliding window can bring in up to two threads, as it might have a time-based trigger policy in addition to a time-based eviction policy. For a window with a time-based policy, the window is always locked when an event is fired, so users do not need to lock the window inside event handlers. With a time-based eviction policy, the tuple eviction and window flush events (before/afterTupleEvictionEvent and before/afterWindowFlushEvent) are fired by the eviction thread. With a time-based trigger policy, the window trigger events (onWindowTriggerEvent) are fired by the trigger thread.

To access the window contents from outside an event handler when time-based eviction or trigger policies are present, use the AutoWindowDataAcquirer class, as follows:

{ AutoWindowDataAcquirer awa(window);
  ... // access the window data safely
}

The AutoWindowDataAcquirer object calls acquireData() on the window when it is constructed, and calls releaseData() on the window when it is destructed (in the example, when it goes out of scope). These operations reduce to no-ops for windows that have no time-based eviction or trigger policies. As a result, this pattern can be used safely in the general case without incurring unnecessary overhead.
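The acquire/release pattern is ordinary RAII. The following hypothetical C++ sketch uses a std::mutex inside a toy window to show the idea, including the no-op behavior for windows without time-based policies. ToyTimedWindow and ToyAutoWindowDataAcquirer are illustrative names, not SPL classes, and the held() flag exists only so the behavior can be observed.

```cpp
#include <cassert>
#include <deque>
#include <mutex>

// Hypothetical window: with a time-based policy, a policy thread would hold
// mutex_ while firing events; outside readers lock it via acquireData().
class ToyTimedWindow {
public:
  explicit ToyTimedWindow(bool hasTimePolicy) : timed_(hasTimePolicy) {}
  void acquireData() { if (timed_) { mutex_.lock(); held_ = true; } }
  void releaseData() { if (timed_) { held_ = false; mutex_.unlock(); } }
  bool held() const { return held_; }
  std::deque<int> & data() { return data_; }
private:
  bool timed_;          // no time-based policy: acquire/release are no-ops
  bool held_ = false;   // observable only for illustration
  std::mutex mutex_;
  std::deque<int> data_;
};

// RAII guard in the spirit of AutoWindowDataAcquirer: acquire on
// construction, release on destruction.
class ToyAutoWindowDataAcquirer {
public:
  explicit ToyAutoWindowDataAcquirer(ToyTimedWindow & w) : w_(w) { w_.acquireData(); }
  ~ToyAutoWindowDataAcquirer() { w_.releaseData(); }
  ToyAutoWindowDataAcquirer(ToyAutoWindowDataAcquirer const &) = delete;
  ToyAutoWindowDataAcquirer & operator=(ToyAutoWindowDataAcquirer const &) = delete;
private:
  ToyTimedWindow & w_;
};
```

Because the release happens in the destructor, the window is released even if the code inside the scope throws an exception.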

Putting it All Together: To summarize, here is a list of the steps that are involved in using the windowing library from within a primitive operator:

Consider an end-to-end example of using the windowing library in a non-generic operator. This example is extended to a generic operator that handles various types of windows in Window handling.

For illustration, consider a small application called SensorQuery. This application answers queries that are posed against the recent history of sensor readings. It contains two sources: one generates queries and the other generates sensor readings. The goal is an operator that keeps the last n sensor readings and evaluates each incoming query against these readings, outputting any matches that are found. A reading matches a query when the Euclidean distance between the position of the query and the position of the reading is at most the distance value that the query specifies. In other words, this operator keeps a sliding window with a count-based eviction policy of size n on the sensor readings, and matches each incoming query against that window by using nested-loop processing. To make things more interesting, the operator also tracks how many times each sensor reading is matched during its stay in the window. When a popular reading (one that matched at least a threshold number of times) is evicted from the window, the operator outputs it on a separate output stream. The SPL code for this application is as follows.

composite SensorQuery {
  type
    Position = float64 x, float64 y;
    Reading = int32 sensorId, Position position, float64 value;
    Query = int32 queryId, Position position, float64 distance;
    Match = int32 queryId, int32 sensorId, float64 value;
  graph
    stream<Reading> Readings as O = Beacon() {
      param iterations : 10000u;
      output O : sensorId = (int32) (100.0*random()),
                 position = { x = random(), y = random() },
                 value = 1000.0 * random();
    }
    stream<Query> Queries as O = Beacon() {
      logic state : mutable int32 id = -1;
      param iterations : 10000u;
      output O : queryId = ++id, distance = 0.05,
                 position = { x = random(), y = random() };
    }
    (stream<Match> Matches; stream<Reading> PopReadings)
        = FindReadings(Readings; Queries)
            { param size : 100u; threshold : 3u; }
}

Beacon operators are used as simple workload generators for the sensor readings and the queries. An operator named FindReadings observes the sensor reading and query streams on two input ports. It produces the match results and the popular sensor readings as two output streams on two separate output ports. In this example, FindReadings is implemented as a non-generic operator that uses the windowing library. For brevity, assume that the operator model is already set up, and focus on the C++ code that implements the operator. Start with the header file that defines the operator:

#include <SPL/Runtime/Window/Window.h>
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_PROLOGUE
class MY_OPERATOR : public MY_BASE_OPERATOR,
                    public WindowEvent<MY_BASE_OPERATOR::IPort0Type*> {
                    // extend from the window event class
public:
  MY_OPERATOR();
  virtual ~MY_OPERATOR();
  void process(Tuple const & tuple, uint32_t port);
  // method to handle window eviction events
  void beforeTupleEvictionEvent(Window<IPort0Type*> & window,
                                IPort0Type * & tuple, int const &);
private:
  Mutex mutex_; // This is a stateful operator
  typedef SlidingWindow<IPort0Type*> WindowType; // Use pointers
  WindowType windowOfReadings_; // Keep the current readings
  map<uintptr_t,uint32> matchedReadings_; // Keep match counts
};
#pragma SPL_NON_GENERIC_OPERATOR_HEADER_EPILOGUE

First, a member variable named windowOfReadings_ is defined. Its type is SlidingWindow<IPort0Type*>; in other words, the window stores pointers to tuples. The tuple type of the window corresponds to the sensor readings. The operator also keeps a map, named matchedReadings_, from sensor reading tuples (identified by their addresses, stored as uintptr_t values) to the number of times that each reading matched a query. The map contains only readings that had at least one match, and each entry is kept until its reading is evicted from the window. This map is used to track the popular readings.

Second, the operator class extends WindowEvent<MY_BASE_OPERATOR::IPort0Type*> and implements only one event handler, namely beforeTupleEvictionEvent. This event is sufficient to implement the required functionality.

Next, consider the implementation of this operator, piece by piece. The first piece consists of the constructor and the destructor.

#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_PROLOGUE
MY_OPERATOR::MY_OPERATOR()
  : windowOfReadings_(*this /*operator*/, 0 /*port*/,
      CountWindowPolicy(getParameter_size()), // eviction policy
      CountWindowPolicy(1)) // trigger policy
{ windowOfReadings_.registerBeforeTupleEvictionHandler(this); }

MY_OPERATOR::~MY_OPERATOR()
{ windowOfReadings_.deleteWindowObjects(); }
...
#pragma SPL_NON_GENERIC_OPERATOR_IMPLEMENTATION_EPILOGUE

In the constructor, the window is initialized by passing it the operator, the input port index, and the window policies. The eviction policy is count-based, with its size taken from the size parameter, and the trigger policy is count-based with a size of 1. The constructor also registers the operator with the window as the handler for beforeTupleEvictionEvent. In the destructor, the window's deleteWindowObjects function is called to reclaim the memory of the tuples that the window still contains. This call is needed because the window stores pointers to tuples rather than tuple values.
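The ownership rules in this example can be checked with a small, self-contained C++ model. In the hypothetical sketch below, Reading stands in for IPort0Type and ToyPointerWindow stands in for SlidingWindow<IPort0Type*> with a count-based eviction policy. For simplicity, the toy deletes evicted pointers itself instead of calling a registered beforeTupleEvictionEvent handler. A live-object counter shows that every heap copy is freed either on eviction or by the final cleanup call.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for IPort0Type that counts live instances.
struct Reading {
  static int liveReadings;
  int sensorId;
  explicit Reading(int id) : sensorId(id) { ++liveReadings; }
  Reading(Reading const & o) : sensorId(o.sensorId) { ++liveReadings; }
  ~Reading() { --liveReadings; }
};
int Reading::liveReadings = 0;

// Hypothetical window of pointers with a count-based eviction policy.
// Like a window of pointers, it does not free what it still holds when
// destroyed; deleteWindowObjects() must be called explicitly.
class ToyPointerWindow {
public:
  explicit ToyPointerWindow(std::size_t size) : size_(size) {}

  // Takes ownership of r. When full, the oldest pointer is evicted and
  // deleted, mirroring the delete in the example's eviction handler.
  void insert(Reading * r) {
    if (data_.size() == size_) {
      delete data_.front();
      data_.pop_front();
    }
    data_.push_back(r);
  }

  // Counterpart of deleteWindowObjects(): frees whatever is still stored.
  void deleteWindowObjects() {
    for (Reading * r : data_) delete r;
    data_.clear();
  }

  std::size_t size() const { return data_.size(); }

private:
  std::size_t size_;
  std::deque<Reading *> data_;
};
```

With a window size of 3 and five insertions, two readings are deleted on eviction and the remaining three are freed by deleteWindowObjects().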

Next comes the processing logic for tuples from the first port, which carries the sensor readings:

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
  AutoPortMutex apm(mutex_, *this);
  if(port==0) {
    IPort0Type const & reading = static_cast<IPort0Type const&>(tuple);
    windowOfReadings_.insert(new IPort0Type(reading));
  } else { ... }
}

An AutoPortMutex protects the processing logic, because the operator is stateful. For the first port, the processing logic simply inserts into the window a heap-allocated copy of the sensor reading that was just received. Once the window holds size readings, each insertion evicts the oldest reading and therefore fires a beforeTupleEvictionEvent.

The processing logic for tuples from the second port, which carries the queries, is as follows:

void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) {
  AutoPortMutex apm(mutex_, *this);
  if(port==0) { ... } else {
    IPort1Type const & query = static_cast<IPort1Type const&>(tuple);
    IPort1Type::position_type const & qPos = query.get_position();
    // Nested loop: compare the query with every reading in the window
    WindowType::DataType & content = windowOfReadings_.getWindowData();
    for(WindowType::DataType::iterator it=content.begin(); it!=content.end(); ++it) {
      IPort0Type const & reading = **it;
      IPort0Type::position_type const & rPos = reading.get_position();
      // Euclidean distance between the reading and the query
      float64 distance = spl::sqrt(spl::pow(rPos.get_x()-qPos.get_x(), 2.0) +
                                   spl::pow(rPos.get_y()-qPos.get_y(), 2.0));
      if(distance<=query.get_distance()) {
        // Report the match on the first output port
        OPort0Type otuple(query.get_queryId(), reading.get_sensorId(),
                          reading.get_value());
        submit(otuple, 0);
        // Count the match, keyed by the address of the reading tuple
        uintptr_t ptr = reinterpret_cast<uintptr_t>(&reading);
        map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
        if(mit==matchedReadings_.end())
          matchedReadings_.insert(std::make_pair(ptr,1));
        else
          mit->second = mit->second+1;
      }
    }
  }
}

To process a query tuple, the operator accesses the contents of the sensor readings window and iterates over it. For each sensor reading, it computes the distance between the reading and the query, and checks whether this distance is less than or equal to the distance value that the query specifies. If so, it creates an output tuple that represents the match and submits it to the first output port. It also records the match in the matchedReadings_ map: if the pointer of the reading is not yet in the map, it is inserted with a count of 1; otherwise, its count is incremented.
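The matching step can be isolated from the operator and exercised on its own. The following C++ sketch uses hypothetical plain structs in place of the generated tuple classes and std::sqrt in place of spl::sqrt and spl::pow; the nested-loop logic and the per-reading match counting mirror the operator code, but matchQuery is an illustrative helper, not part of the example.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-ins for the generated IPort0Type/IPort1Type/OPort0Type.
struct Position { double x, y; };
struct Reading  { int sensorId; Position position; double value; };
struct Query    { int queryId; Position position; double distance; };
struct Match    { int queryId; int sensorId; double value; };

// Nested-loop match of one query against every reading in the window,
// counting matches per reading, keyed by address as in matchedReadings_.
std::vector<Match> matchQuery(Query const & q,
                              std::vector<Reading *> const & window,
                              std::map<std::uintptr_t, unsigned> & matchCounts) {
  std::vector<Match> out;
  for (Reading * r : window) {
    double dx = r->position.x - q.position.x;
    double dy = r->position.y - q.position.y;
    if (std::sqrt(dx * dx + dy * dy) <= q.distance) {
      out.push_back(Match{q.queryId, r->sensorId, r->value});
      // operator[] value-initializes a missing count to 0 before incrementing
      ++matchCounts[reinterpret_cast<std::uintptr_t>(r)];
    }
  }
  return out;
}
```

Using std::map::operator[] collapses the find, insert, and increment sequence of the operator into a single statement with the same effect.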

Finally, consider the tuple eviction event handler for sensor readings:

void MY_OPERATOR::beforeTupleEvictionEvent(Window<IPort0Type*> & window,
                                           IPort0Type * & reading, int const &) {
  uintptr_t ptr = reinterpret_cast<uintptr_t>(reading);
  map<uintptr_t,uint32>::iterator mit = matchedReadings_.find(ptr);
  if(mit!=matchedReadings_.end()) {
    uint32 numMatches = mit->second;
    if(numMatches >= getParameter_threshold())
      submit(*reading, 1);
    matchedReadings_.erase(mit);
  }
  delete reading;
}

When a sensor reading tuple is evicted, the handler first checks whether the tuple had any matches. If so, it outputs the tuple on the second output port when the number of matches is greater than or equal to the value of the threshold parameter; in other words, when the sensor reading is a popular one. It then removes the tuple from the matched readings map. Before returning, the event handler reclaims the memory of the tuple by deleting its pointer.
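The eviction handler logic can likewise be tested in isolation. In this hypothetical C++ sketch, onEvict is an illustrative free function, a vector of sensor IDs stands in for submitting to the second output port, and the threshold argument stands in for the value of getParameter_threshold().

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-in for IPort0Type.
struct Reading { int sensorId; };

// Mirrors beforeTupleEvictionEvent: report popular readings, forget the
// evicted reading's match count, and free the tuple the window pointed to.
void onEvict(Reading * reading,
             std::map<std::uintptr_t, unsigned> & matchCounts,
             unsigned threshold,
             std::vector<int> & popular) {
  std::uintptr_t key = reinterpret_cast<std::uintptr_t>(reading);
  std::map<std::uintptr_t, unsigned>::iterator it = matchCounts.find(key);
  if (it != matchCounts.end()) {
    if (it->second >= threshold)          // matched at least `threshold` times
      popular.push_back(reading->sensorId);
    matchCounts.erase(it);
  }
  delete reading;                         // the window held the only pointer
}
```

Erasing the entry before the memory is freed also matters for correctness: a later allocation may reuse the same address, and a stale count under that key would be attributed to an unrelated reading.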