Interface OperatorContext
-
public interface OperatorContext
Context information for the Operator's execution context.
At SPL compile time an operator class can verify its invocation context by annotating static methods with OperatorContext.ContextCheck, for example to verify that its input port schema matches its output port schema.
At execution time an operator uses its context to interact with its invocation context, including its input ports, its output ports for tuple submission, and its thread factory and execution service for background activity.
A JMX management bean OperatorContextMXBean is registered in the platform MBean server for every operator's OperatorContext. The management bean allows code to respond to the operator's life cycle notifications without being tightly integrated with a specific operator's Operator.allPortsReady() or Operator.shutdown() methods. For example, an object can be initialized by Operator.initialize(OperatorContext) and then manage its own shutdown by registering a NotificationListener against the OperatorContextMXBean.SHUTDOWN notification.
- See Also:
OperatorContextMXBean
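The self-managed shutdown pattern described above can be sketched with plain JMX classes. The sketch is self-contained and does not use the Streams API: `SHUTDOWN_TYPE` is a hypothetical stand-in for `OperatorContextMXBean.SHUTDOWN`, `SelfManagedResource` is an illustrative name, and a `NotificationBroadcasterSupport` stands in for the operator's management bean. How the real bean is located in the platform MBean server is not covered here.

```java
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;

/** A helper object that releases its own resources when a shutdown notification arrives. */
public class SelfManagedResource implements NotificationListener {

    // Hypothetical notification type; a real operator would use OperatorContextMXBean.SHUTDOWN.
    public static final String SHUTDOWN_TYPE = "example.operator.shutdown";

    private volatile boolean closed;

    @Override
    public void handleNotification(Notification notification, Object handback) {
        // React only to the shutdown notification and ignore other life cycle events.
        if (SHUTDOWN_TYPE.equals(notification.getType())) {
            closed = true;
        }
    }

    public boolean isClosed() {
        return closed;
    }

    public static void main(String[] args) {
        // Stand-in for the operator's management bean (an assumption of this sketch).
        NotificationBroadcasterSupport bean = new NotificationBroadcasterSupport();
        SelfManagedResource resource = new SelfManagedResource();
        bean.addNotificationListener(resource, null, null);

        bean.sendNotification(new Notification("example.operator.allPortsReady", bean, 1L));
        System.out.println("closed after allPortsReady: " + resource.isClosed());

        bean.sendNotification(new Notification(SHUTDOWN_TYPE, bean, 2L));
        System.out.println("closed after shutdown: " + resource.isClosed());
    }
}
```

The object never needs a reference to the operator's `shutdown()` method; it only needs to be registered as a listener once.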
-
-
Nested Class Summary
Nested Classes (modifier and type, interface, description)
static interface OperatorContext.ContextCheck
    A method annotated @ContextCheck allows checking of an operator's invocation context.
-
Method Summary
Methods (modifier and type, method, description)
void addClassLibraries(java.lang.String[] libraries)
    Add class libraries to the operator's class loader.
int getChannel()
    Get the index of the parallel channel the operator is on.
java.lang.String getKind()
    Get the SPL primitive operator kind that is executing this operator.
java.lang.String getLogicalName()
    Get the logical name of the operator.
int getMaxChannels()
    Get the total number of parallel channels for the parallel region that the operator is in.
OperatorMetrics getMetrics()
    Get the OperatorMetrics object for access to metrics specific to this operator.
java.lang.String getName()
    Get the name of the operator.
int getNumberOfStreamingInputs()
    Get the number of streaming input ports.
int getNumberOfStreamingOutputs()
    Get the number of output ports.
<O extends OptionalContext> O getOptionalContext(java.lang.Class<O> contextType)
    Get an optional context for this operator invocation.
java.util.Set<java.lang.String> getParameterNames()
    Return the set of names for parameters that have values.
java.util.List<java.lang.String> getParameterValues(java.lang.String parameterName)
    Returns the list of parameter values for the given parameter name.
ProcessingElement getPE()
    Get the processing element hosting this Operator.
java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
    Return a scheduler to execute background tasks.
java.util.List<StreamingInput<Tuple>> getStreamingInputs()
    Get the list of input ports.
java.util.List<StreamingOutput<OutputTuple>> getStreamingOutputs()
    Get a list of output ports.
java.util.concurrent.ThreadFactory getThreadFactory()
    Return a ThreadFactory specific to the Operator with the thread context class loader set correctly.
java.io.File getToolkitDirectory()
    Get the root directory of the toolkit that declares the getKind() of this operator invocation.
void registerStateHandler(StateHandler handler)
    Register a StateHandler to manage state for this operator.
-
-
-
Method Detail
-
getName
java.lang.String getName()
Get the name of the operator. The operator name is unique across the application. For operators in a parallel region, the name includes the channel they are in.
- Returns:
- the name of the operator.
- See Also:
getLogicalName()
-
getNumberOfStreamingInputs
int getNumberOfStreamingInputs()
Get the number of streaming input ports.
- Returns:
- number of streaming input ports.
-
getStreamingInputs
java.util.List<StreamingInput<Tuple>> getStreamingInputs()
Get the list of input ports. The order of the list will match the order of the ports in the SPL application.
- Returns:
- list of input ports
-
getNumberOfStreamingOutputs
int getNumberOfStreamingOutputs()
Get the number of output ports.
- Returns:
- number of output ports.
-
getStreamingOutputs
java.util.List<StreamingOutput<OutputTuple>> getStreamingOutputs()
Get a list of output ports. The order of the list will match the order of the ports in the SPL application.
- Returns:
- list of output ports.
-
getParameterValues
java.util.List<java.lang.String> getParameterValues(java.lang.String parameterName)
Returns the list of parameter values for the given parameter name. If no values were supplied for the parameter name then an empty list is returned.
The recommended mechanism for handling operator parameters is the Parameter annotation. The annotation is applied to a setter method on the operator's class, and at runtime the setter method is called with the appropriate value for the SPL parameter. See the documentation for @Parameter for full details.
Supported SPL types for parameters are ustring, rstring (using UTF-8 encoding), boolean, int8, int16, int32, int64, float32, float64, decimal32, decimal64 and decimal128, operator custom literals and stream attributes.
Parameter values set in SPL source are converted to String values by this method. For example an SPL int32 parameter set as follows:
    threshold: 42;
can be converted into a Java primitive int using:
    int threshold = Integer.parseInt(context.getParameterValues("threshold").get(0));
For a stream attribute parameter, this method returns each attribute value as a string in the form InputPortName.AttributeName.
- Parameters:
parameterName - name of parameter
- Returns:
- list of values, an empty list is returned if the parameter was not defined
- Since:
- InfoSphere® Streams Version 4.0 - Support for custom literals and attributes parameter types.
- See Also:
@Parameter is the recommended mechanism for parameters
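Because values arrive as strings and the list is empty when a parameter was not supplied, code that reads parameters directly usually needs a default. The helper below is a minimal sketch and not part of the Streams API: `ParameterValues.intValue` is a hypothetical name, and its `List<String>` argument stands for the result of `context.getParameterValues(...)`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ParameterValues {

    /**
     * Convert the first value of a parameter to an int, or return
     * defaultValue when the list is empty (parameter not supplied).
     */
    public static int intValue(List<String> values, int defaultValue) {
        if (values.isEmpty()) {
            return defaultValue;
        }
        // trim() guards against stray whitespace; parseInt throws
        // NumberFormatException if the text is not a valid int.
        return Integer.parseInt(values.get(0).trim());
    }

    public static void main(String[] args) {
        List<String> supplied = Arrays.asList("42");
        List<String> missing = Collections.emptyList();
        System.out.println(intValue(supplied, 10));
        System.out.println(intValue(missing, 10));
    }
}
```

The @Parameter annotation remains the recommended route; a helper like this is only useful where the annotation cannot be applied.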
-
getParameterNames
java.util.Set<java.lang.String> getParameterNames()
Return the set of names for parameters that have values.
- Returns:
- set of parameter names, an empty set if no parameters are defined
- See Also:
@Parameter is the recommended mechanism for parameters
-
getScheduledExecutorService
java.util.concurrent.ScheduledExecutorService getScheduledExecutorService()
Return a scheduler to execute background tasks. Operators should utilize this service or getThreadFactory() rather than creating their own threads to ensure that the SPL runtime will wait for the operator's background work before completing the operator.
The scheduler will be shut down when the operator is to be shut down, and the scheduler shutdown will occur before Operator.shutdown() is called. Once the scheduler is shut down no new tasks will be accepted. Existing scheduled tasks will remain in the scheduler's queue but periodic tasks will be canceled. Once Operator.shutdown() returns, ExecutorService.shutdownNow() will be called on the scheduler to cancel any outstanding tasks.
Operator implementations that wish to complete any outstanding tasks at shutdown time can call ExecutorService.awaitTermination() to wait for outstanding tasks to complete, or wait on the specific Future reference for a task.
The Operator will not be seen as complete by the SPL runtime until all of its outstanding scheduled tasks have completed.
The returned scheduler service is guaranteed to be an instance of java.util.concurrent.ScheduledThreadPoolExecutor and initially has this configuration:
- corePoolSize set to Runtime.availableProcessors() with a minimum of 2 and maximum of 8.
- allowsCoreThreadTimeOut() set to true
- keepAliveTime set to 5 seconds
Even if corePoolSize is eight, eight threads will only be created if there are eight concurrent tasks scheduled. Threads will be removed if they are not needed for the keepAliveTime value and allowsCoreThreadTimeOut() returns true.
Operators may modify these settings, typically during Operator.initialize(OperatorContext).
- Returns:
- Scheduler service for the operator.
- See Also:
Operator.shutdown(), ExecutorService.awaitTermination(long, java.util.concurrent.TimeUnit), ExecutorService.shutdown(), ExecutorService.shutdownNow(), Future, ScheduledThreadPoolExecutor
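The initial configuration of the scheduler can be reproduced with the standard `ScheduledThreadPoolExecutor` API, which may help when deciding how to adjust it. This is a sketch under the assumption that the documented settings map directly onto the JDK setters; `SchedulerConfigSketch`, `corePoolSizeFor` and `newConfiguredScheduler` are hypothetical names, and the SPL runtime's own construction code is not shown in this document.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchedulerConfigSketch {

    /** Clamp the processor count to the documented range of 2 to 8. */
    public static int corePoolSizeFor(int availableProcessors) {
        return Math.max(2, Math.min(8, availableProcessors));
    }

    /** Build an executor configured like the documented initial settings. */
    public static ScheduledThreadPoolExecutor newConfiguredScheduler() {
        int cores = corePoolSizeFor(Runtime.getRuntime().availableProcessors());
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(cores);
        // Keep-alive must be positive before core thread time-out can be enabled.
        scheduler.setKeepAliveTime(5, TimeUnit.SECONDS);
        scheduler.allowCoreThreadTimeOut(true);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor scheduler = newConfiguredScheduler();
        System.out.println("corePoolSize=" + scheduler.getCorePoolSize());
        System.out.println("allowsCoreThreadTimeOut=" + scheduler.allowsCoreThreadTimeOut());

        // Threads are created only as tasks arrive, so the pool starts empty.
        System.out.println("poolSize before tasks=" + scheduler.getPoolSize());

        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

An operator that wants a different pool size would call the same setters on the scheduler returned by the context, after casting it to `ScheduledThreadPoolExecutor`.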
-
-
getThreadFactory
java.util.concurrent.ThreadFactory getThreadFactory()
Return a ThreadFactory specific to the Operator with the thread context class loader set correctly. Operators should utilize the returned factory to create Threads.
Threads returned by the ThreadFactory have not been started and are set as daemon threads. Operators may set the threads as non-daemon before starting them. The SPL runtime will wait for non-daemon threads before terminating a processing element in standalone mode.
Any uncaught exception thrown by the Runnable passed to ThreadFactory.newThread(Runnable) will cause the processing element containing the operator to terminate.
The ThreadFactory will be shut down when the operator is to be shut down, and the ThreadFactory shutdown will occur before Operator.shutdown() is called. Once the ThreadFactory is shut down a call to newThread() will return null.
- Returns:
- A ThreadFactory specific to the Operator.
-
getPE
ProcessingElement getPE()
Get the processing element hosting this Operator.
- Returns:
- ProcessingElement hosting this Operator instance.
-
getMetrics
OperatorMetrics getMetrics()
Get the OperatorMetrics object for access to metrics specific to this operator.
- Returns:
- OperatorMetrics instance for this operator.
-
getKind
java.lang.String getKind()
Get the SPL primitive operator kind that is executing this operator. For a Java primitive operator the name of the operator will be returned, otherwise spl.utility::JavaOp will be returned.
The return value will be null if the SPL application executing the Operator has not been recompiled using InfoSphere Streams 3.0 or later.
- Returns:
- SPL primitive operator kind that is executing this operator.
- Since:
- InfoSphere® Streams Version 3.0
-
addClassLibraries
void addClassLibraries(java.lang.String[] libraries) throws java.net.MalformedURLException
Add class libraries to the operator's class loader. The operator's class loader is set as the thread context class loader for the operator's thread factory, executor and any method invocation on the operator instance, such as initialize and process.
Operators use this method to add class libraries specific to the invocation of an operator in an SPL application in a consistent manner. An example is defining the jar files that contain the JDBC driver to be used by the application. Typically the libraries are supplied through an operator parameter of type rstring with cardinality greater than one that maps to a Java bean property of type String[], for example:
    private String[] driverLibrary;

    @Parameter
    public void setDriverLibrary(String[] driverLibrary) {
        this.driverLibrary = driverLibrary;
    }

    public String[] getDriverLibrary() {
        return driverLibrary;
    }

    public void initialize(OperatorContext context) throws Exception {
        ...
        // Add JDBC drivers to the operator's class loader
        context.addClassLibraries(getDriverLibrary());
        ...
    }
Each element of libraries is trimmed and then converted into a java.net.URL. If the element cannot be converted to a URL then it is assumed to represent a file system path and is converted into a URL representing that path. If the file path is relative it is taken as relative to the toolkit directory that declares the operator being invoked. In an SPL application an invocation of an operator may use the SPL function getThisToolkitDir() to locate jars in the toolkit of the SPL file. For example, with the operator parameter above the SPL invocation would include:
    param driverLibrary: getThisToolkitDir() + "/opt/derby/lib/derby.jar";
If a file path ends with /* then it is assumed to be a directory and all jar files in the directory with the extension .jar or .JAR are added to the operator's class loader.
Note: To support the changes to relocatable application bundles in InfoSphere® Streams Version 4.0, the root for relative paths was changed to the toolkit directory from the data directory.
- Parameters:
libraries - String representations of URLs and file paths to be added into the operator's class loader. If null then no libraries are added to the class loader.
- Throws:
java.lang.IllegalStateException - Method is being called at SPL compile time.
java.net.MalformedURLException
- Since:
- InfoSphere® Streams Version 3.2
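The URL-then-file-path resolution described for each element of libraries can be approximated with JDK classes alone. This is a sketch of the documented rules, not the runtime's implementation: `LibraryPathSketch.resolve` is a hypothetical helper, the toolkit location in `main` is made up, Unix-style paths are assumed, and the `/*` directory expansion is deliberately left out.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

public class LibraryPathSketch {

    /**
     * Resolve one library entry: trim it, try it as a URL, and otherwise
     * treat it as a file path, relative to toolkitDir when not absolute.
     */
    public static URL resolve(String library, File toolkitDir) throws MalformedURLException {
        String trimmed = library.trim();
        try {
            return new URL(trimmed);
        } catch (MalformedURLException notAUrl) {
            // Not a URL, so fall back to interpreting the text as a file path.
            File file = new File(trimmed);
            if (!file.isAbsolute()) {
                file = new File(toolkitDir, trimmed);
            }
            return file.toURI().toURL();
        }
    }

    public static void main(String[] args) throws MalformedURLException {
        File toolkit = new File("/opt/toolkits/example"); // hypothetical location
        System.out.println(resolve("http://example.com/driver.jar", toolkit));
        System.out.println(resolve(" opt/derby/lib/derby.jar ", toolkit));
        System.out.println(resolve("/usr/share/java/derby.jar", toolkit));
    }
}
```

The first entry is kept as a URL, the second is resolved against the toolkit directory, and the third is used as an absolute file path.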
-
getChannel
int getChannel()
Get the index of the parallel channel the operator is on. If the operator is in a parallel region, this method returns a value from 0 to N-1, where N is the number of channels in the parallel region; otherwise it returns -1.
- Returns:
- the index of the parallel channel this operator is on, if the operator is in a parallel region, or -1 if the operator is not in a parallel region.
- Since:
- InfoSphere® Streams Version 3.2
-
getMaxChannels
int getMaxChannels()
Get the total number of parallel channels for the parallel region that the operator is in. If the operator is not in a parallel region, this method returns 0.
- Returns:
- the number of parallel channels for the parallel region that this operator is in, or 0 if the operator is not in a parallel region.
- Since:
- InfoSphere® Streams Version 3.2
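One way an operator might combine the channel index and the channel count is to decide which replica handles a given partition of work. The following is only an illustrative sketch: `ChannelPartitionSketch.ownsPartition` is a hypothetical helper, its `channel` and `maxChannels` arguments stand for the results of getChannel() and getMaxChannels(), and modulo assignment is an assumption rather than anything the SPL runtime prescribes.

```java
public class ChannelPartitionSketch {

    /**
     * Decide whether the replica on the given channel should handle a partition.
     * channel and maxChannels stand for getChannel() and getMaxChannels().
     */
    public static boolean ownsPartition(int partition, int channel, int maxChannels) {
        if (channel == -1 || maxChannels == 0) {
            // Not in a parallel region: the single operator handles everything.
            return true;
        }
        // floorMod keeps the result in 0..maxChannels-1 even for negative input.
        return Math.floorMod(partition, maxChannels) == channel;
    }

    public static void main(String[] args) {
        System.out.println(ownsPartition(7, -1, 0));
        System.out.println(ownsPartition(7, 1, 3));
        System.out.println(ownsPartition(7, 2, 3));
    }
}
```

The explicit check for -1 and 0 matters because those are the documented return values outside a parallel region, and a modulo by zero would otherwise throw.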
-
getLogicalName
java.lang.String getLogicalName()
Get the logical name of the operator.
- Returns:
- the logical name of the operator.
- Since:
- InfoSphere® Streams Version 3.2
-
getOptionalContext
<O extends OptionalContext> O getOptionalContext(java.lang.Class<O> contextType)
Get an optional context for this operator invocation. An invocation of an operator may have optional contexts that provide information specific to a particular environment, such as a consistent region.
- Parameters:
contextType - Type of the context the operator might be invoked in.
- Returns:
- Optional context of type contextType if this invocation supports that context, otherwise null.
- Since:
- InfoSphere® Streams Version 4.0
- See Also:
OptionalContext
-
registerStateHandler
void registerStateHandler(StateHandler handler)
Register a StateHandler to manage state for this operator. The SPL Runtime will call the handler's methods to make the operator's state consistent or to reset it to a previous consistent state.
Multiple state handlers may be registered by an operator, either explicitly using this method or implicitly in these cases:
- If an Operator itself implements StateHandler then it is registered as a StateHandler. The registration occurs prior to calling Operator.initialize(OperatorContext).
- If a StreamWindowListener implements StateHandler then when it is registered as a window listener it is also registered as a StateHandler. The registration occurs when registerListener() is called.
StateHandler and StreamWindowListener are registered in a consistent order to ensure the order of information written to a checkpoint and read from a checkpoint upon reset is consistent.
- Parameters:
handler - State handler for this operator.
- Since:
- InfoSphere® Streams Version 4.0
- See Also:
StreamWindow.registerListener(com.ibm.streams.operator.window.StreamWindowListener, boolean), StreamWindowListener
-
getToolkitDirectory
java.io.File getToolkitDirectory()
Get the root directory of the toolkit that declares the getKind() of this operator invocation.
At SPL compile time this is the location of the toolkit specified to the SPL compiler sc.
At runtime this is the location of the toolkit in the application landing zone that has been unpacked from the application's bundle (sab file).
- Returns:
- the absolute path of the toolkit root directory
- Since:
- InfoSphere® Streams Version 4.0
-
-