com.ibm.streams.operator

Class AbstractOperator

  • java.lang.Object
    • com.ibm.streams.operator.AbstractOperator
  • All Implemented Interfaces:
    Operator
    Direct Known Subclasses:
    AbstractWindowOperator


    public abstract class AbstractOperator
    extends java.lang.Object
    implements Operator
    An abstract implementation of Operator that developers may extend to implement their own Operator.
    It provides simple implementations of the required methods, along with a number of utility methods.
    • Constructor Detail

      • AbstractOperator

        public AbstractOperator()
    • Method Detail

      • initialize

        public void initialize(OperatorContext context)
                        throws java.lang.Exception
        Initialize this operator. Called once before any tuples are processed.

        This implementation initializes the operator to ensure its utility methods return the correct information.

        Sub-classes must call super.initialize(context) if they override this method.

        Specified by:
        initialize in interface Operator
        Parameters:
        context - OperatorContext for this operator.
        Throws:
        java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
      • allPortsReady

        public void allPortsReady()
                           throws java.lang.Exception
        Called once initialization is complete and all input and output ports are connected and ready to process and submit tuples. Operators that submit tuples independently of incoming tuples, e.g. source type operators, must wait for this method to be called before submitting any tuples.
        Because this is only a notification that all input and output ports are ready, process and processPunctuation may be called before allPortsReady, depending on thread scheduling.

        Implementations must not block and must return control to the caller.

        This operator's OperatorContextMXBean will also issue a notification when all ports are ready.
        Code may register actions to be invoked at all ports ready using the utility method addAllPortsReadyAction or by adding notification listeners to this operator's OperatorContextMXBean.

        This implementation does nothing.

        Specified by:
        allPortsReady in interface Operator
        Throws:
        java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
        See Also:
        OperatorContextMXBean, OperatorLifeCycle.addAllPortsReadyAction(OperatorContext, javax.management.NotificationListener)
      • getOperatorContext

        public final OperatorContext getOperatorContext()
        Return the OperatorContext object describing the execution environment of this operator.
        Returns:
        OperatorContext context for this operator
      • process

        public void process(StreamingInput<Tuple> stream,
                   Tuple tuple)
                     throws java.lang.Exception
        Process an incoming tuple that arrived on the specified port.

        This implementation does nothing. Sub-classes that need to process incoming tuples must override this method.

        Specified by:
        process in interface Operator
        Parameters:
        stream - Port the tuple is arriving on.
        tuple - Object representing the incoming tuple.
        Throws:
        java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
      • processPunctuation

        public void processPunctuation(StreamingInput<Tuple> stream,
                              StreamingData.Punctuation mark)
                                throws java.lang.Exception
        Process an incoming punctuation mark on the specified port.

        This implementation punctuates all (if any) output ports with the incoming mark if it is a WINDOW_MARKER. Sub-classes may override this method to provide different behavior.
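
        The default forwarding rule described above can be illustrated with a minimal, self-contained sketch. It does not use the Streams classes: Mark and OutputPort are hypothetical stand-ins for StreamingData.Punctuation and StreamingOutput, and the code is not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PunctuationForwardingSketch {
    /** Hypothetical stand-in for StreamingData.Punctuation; only two marks are modeled. */
    public enum Mark { WINDOW_MARKER, FINAL_MARKER }

    /** Hypothetical stand-in for an output port: records the marks it was punctuated with. */
    public static final class OutputPort {
        public final List<Mark> received = new ArrayList<>();

        public void punctuate(Mark mark) {
            received.add(mark);
        }
    }

    /** Forwards a window marker to every output port and ignores any other mark. */
    public static void processPunctuation(List<OutputPort> outputs, Mark mark) {
        if (mark != Mark.WINDOW_MARKER) {
            return;
        }
        // With no output ports the loop body never runs, matching "all (if any)".
        for (OutputPort out : outputs) {
            out.punctuate(mark);
        }
    }

    public static void main(String[] args) {
        List<OutputPort> outputs = Arrays.asList(new OutputPort(), new OutputPort());
        processPunctuation(outputs, Mark.WINDOW_MARKER);
        processPunctuation(outputs, Mark.FINAL_MARKER);
        // Each port has recorded only the window marker.
        System.out.println(outputs.get(0).received);
    }
}
```

        A sub-class that needs different behavior, for example suppressing window markers on one port, would override processPunctuation rather than rely on this default.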

        Specified by:
        processPunctuation in interface Operator
        Parameters:
        stream - Port the punctuation mark arrived on.
        mark - The punctuation mark that arrived.
        Throws:
        java.lang.Exception - Operator failure, will cause the enclosing PE to terminate.
      • checkPorts

        @Deprecated
        public final void checkPorts(int numberInputPorts,
                                 int numberOutputPorts)
        Deprecated. As of InfoSphere Streams 3.0, replaced by input and output port configurations in the Java primitive operator model.
        Check that the correct number of ports exists for the operator.
        Use of this deprecated method is not recommended: it fixes the number of ports, which prevents sub-classes from adding optional ports and so limits the flexibility of an Operator implementation class.
        Use input and output port configurations in the Java primitive operator model instead.

        Typically called by a sub-class's initialize method. If the number of ports is positive then exactly that many ports must exist according to the context. If the number of ports is negative then at least as many ports as its absolute value must exist.

        As an example, checkPorts(-1,1) indicates one or more input ports and exactly one output port.
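
        The sign convention can be made concrete with a small standalone sketch. PortCountSketch and its methods are hypothetical names used only for illustration, not part of the Streams API, and treating an expected count of zero as "exactly zero ports" is an assumption of the sketch.

```java
public class PortCountSketch {
    /** Returns true when the actual port count satisfies the expected count. */
    public static boolean matches(int expected, int actual) {
        if (expected < 0) {
            // Negative: at least |expected| ports must exist.
            return actual >= -expected;
        }
        // Positive: exactly that many ports. Zero is treated the same way (assumption).
        return actual == expected;
    }

    /** Mirrors the documented failure mode: IllegalStateException on a mismatch. */
    public static void check(int expectedIn, int expectedOut, int actualIn, int actualOut) {
        if (!matches(expectedIn, actualIn) || !matches(expectedOut, actualOut)) {
            throw new IllegalStateException(
                    "Ports do not match: " + actualIn + " input(s), " + actualOut + " output(s)");
        }
    }

    public static void main(String[] args) {
        // Equivalent in spirit to checkPorts(-1, 1) on an operator with 3 inputs and 1 output.
        check(-1, 1, 3, 1);
        System.out.println("ports accepted");
    }
}
```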

        Parameters:
        numberInputPorts - expected number of input ports
        numberOutputPorts - expected number of output ports
        Throws:
        java.lang.IllegalStateException - if ports do not match the requirements
      • getInput

        public final StreamingInput<Tuple> getInput(int port)
        Shorthand method to get the StreamingInput object for a given port.
        Parameters:
        port - input port number.
        Returns:
        the description for the input port
      • getOutput

        public final StreamingOutput<OutputTuple> getOutput(int port)
        Shorthand method to get the StreamingOutput object for a given port.
        Parameters:
        port - output port number
        Returns:
        the description for the output port
      • setLoggerAspects

        public final void setLoggerAspects(java.lang.String loggerName,
                            java.lang.String... aspects)
        Set the aspects associated with a named Logger. The aspects replace any existing aspects associated with any Logger of the given name. Aspects are not inherited by child Loggers.
        Parameters:
        loggerName - Logger name
        aspects - Aspects to associate with named logger.
        Since:
        InfoSphere® Streams Version 3.0
      • createAvoidCompletionThread

        public java.lang.Thread createAvoidCompletionThread()
        Create a thread to avoid completion of the operator by the SPL runtime. This is equivalent to calling
        AbstractOperator.createAvoidCompletionThread(getOperatorContext()).
        This method must be called at Operator.initialize(OperatorContext) time.
        Since:
        InfoSphere® Streams Version 3.1
        See Also:
        createAvoidCompletionThread(OperatorContext)
      • createAvoidCompletionThread

        public static java.lang.Thread createAvoidCompletionThread(OperatorContext context)
        Create a thread that prevents the SPL runtime from completing the operator. The SPL runtime will complete an operator once it has no remaining active threads or tasks and all of its input ports have received final marks. Completing an operator will send final marks to all of its output ports, thus closing the ports.
        Source operators that produce tuples in threads that are not visible to the SPL runtime may immediately complete after Operator.allPortsReady() and thus not produce any tuples on their output ports. This typically will happen when the source operator is producing tuples from an event framework. In this case the operator has registered an event handler with the event framework, but the framework is executing using its own threads that are not visible to the SPL runtime.
        The correct mechanism to handle this is to pass the operator's thread factory or execution service when initializing the framework.
        If the framework does not support passing in a thread factory or execution service then the operator needs to create a thread to avoid completion. This convenience method is called at Operator.initialize(OperatorContext) to create such a thread.

        This method:

        1. Creates a thread using OperatorContext.getThreadFactory()
        2. Sets the thread to not be a daemon thread
        3. Registers a notification listener with the operator's OperatorContextMXBean to start the thread at OperatorContextMXBean.ALL_PORTS_READY
        4. Registers a notification listener with the operator's OperatorContextMXBean to interrupt the thread at OperatorContextMXBean.SHUTDOWN

        The thread's run() method simply waits to be interrupted.
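
        The kind of thread this method creates can be sketched in plain Java. The sketch below is an illustration under stated assumptions, not the library's code: AvoidCompletionSketch and newWaitingThread are hypothetical names, Executors.defaultThreadFactory() stands in for OperatorContext.getThreadFactory(), and the start and interrupt calls in main stand in for the ALL_PORTS_READY and SHUTDOWN notification listeners.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class AvoidCompletionSketch {
    /** Builds a non-daemon thread whose run() blocks until the thread is interrupted. */
    public static Thread newWaitingThread(ThreadFactory factory) {
        Thread thread = factory.newThread(() -> {
            try {
                // The latch is never counted down, so await() only ends on interrupt.
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                // Interruption is the shutdown signal; simply let run() return.
            }
        });
        thread.setDaemon(false);
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = newWaitingThread(Executors.defaultThreadFactory());
        thread.start();      // stands in for the ALL_PORTS_READY listener
        thread.interrupt();  // stands in for the SHUTDOWN listener
        thread.join();
        System.out.println(thread.isAlive()); // prints "false"
    }
}
```

        While such a thread is alive the operator still has an active thread, which is the condition the description above gives for the SPL runtime not completing it.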

        This method must be called at Operator.initialize(OperatorContext) time.
        Parameters:
        context - Context for the operator avoiding completion
        Since:
        InfoSphere® Streams Version 3.1
      • getControlPlaneContext

        public final ControlPlaneContext getControlPlaneContext()
        Get the optional ControlPlaneContext for the Job Control Plane in the application.
        Returns:
        the optional ControlPlaneContext
        Since:
        InfoSphere® Streams Version 4.0
      • setTagData

        public void setTagData(java.lang.String tagName,
                      java.util.Map<java.lang.String,java.lang.String> tagValues)
        Set tag data for the operator. If tagName is Operator.TagNames.OperatorIGC, the tags will be registered in the IG catalog. Other tag names may be defined in future releases.
        Specified by:
        setTagData in interface Operator
        Since:
        InfoSphere® Streams Version 4.1