Operator TCPSource
The TCPSource operator reads data from a TCP socket and creates tuples from it. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes, it handles a single connection at a time. It works with both IPv4 and IPv6 addresses.
The TCPSource operator accepts an optional output clause. Any SPL expression and any supported custom output function can be used in the output clause. Output attributes can be assigned values in the output clause. If an attribute has an assignment, the expression value is assigned to the attribute and the attribute is not part of the value that is read from the source. For the line and block formats, exactly one attribute must be left unassigned by the output clause, and this attribute must have type rstring (for line format) or blob (for block format). For all other formats, any attributes that are not assigned in the output clause are read from the input by using csv, txt, or bin format.
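As a minimal sketch of the output clause rules above, the following invocation reads text lines and assigns two attributes in the output clause. The stream name, attribute names, port number, and the "sensor-feed" label are hypothetical and chosen only for illustration.

```spl
composite LineWithSender {
    graph
        // "line" is the only attribute without an output assignment, so it is
        // read from the socket and must be an rstring for the line format.
        stream<rstring line, rstring feed, rstring sender> Lines = TCPSource()
        {
            param
                role   : server;
                port   : 23145u;   // hypothetical port
                format : line;
            output
                // feed is set from a plain SPL expression,
                // sender from a custom output function.
                Lines : feed = "sensor-feed", sender = RemoteIP();
        }
}
```

Leaving a second attribute unassigned here would violate the single-attribute rule for the line format.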
Checkpointed data
The TCPSource operator does not support checkpointing.
Behavior in a consistent region
The TCPSource operator is not supported in a consistent region.
Checkpointing behavior in an autonomous region
When the TCPSource operator is in an autonomous region and is configured with a config checkpoint : periodic(T) clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.
When the TCPSource operator is in an autonomous region and is configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at runtime. Upon restart, the operator restores to its initial state.
This checkpointing behavior is subject to change in the future.
Exceptions
The TCPSource operator throws an exception in the following cases:
- The host cannot be resolved.
- The name cannot be located.
- Unable to set SO_REUSEADDR on TCP socket.
- Unable to bind to port.
Examples
The following composite shows TCPSource invocations in both the server and client roles.
composite Main {
    graph
        // server source with an alias string as port
        stream<rstring name, uint32 age, uint64 salary> Beat = TCPSource()
        {
            param
                role : server;
                port : "ftp";
        }
        // server source with a numeric port and keepalive settings
        stream<Beat> Beat1 = TCPSource()
        {
            param
                role : server;
                port : 23145u;
                keepAlive : {time=7200, probes=9, interval=75};
        }
        // server source with a name, registering interface eth1
        stream<Beat> Beat2 = TCPSource()
        {
            param
                role : server;
                name : "my_server";
                interface : "eth1";
        }
        // server source with a name and port
        stream<Beat> Beat3 = TCPSource()
        {
            param
                role : server;
                port : 23145u;
                name : "my_server";
        }
        // server source with a port and infinite reconnection
        stream<Beat> Beat4 = TCPSource()
        {
            param
                role : server;
                port : "ftp";
                reconnectionPolicy : InfiniteRetry;
        }
        // server source with a port and reconnection (5 times)
        stream<Beat> Beat4r = TCPSource()
        {
            param
                role : server;
                port : "ftp";
                reconnectionPolicy : BoundedRetry;
                reconnectionBound : 5u;
        }
        // client source with an IP address and port
        stream<Beat> Beat5 = TCPSource()
        {
            param
                role : client;
                address : "99.2.45.67";
                port : "ftp";
        }
        // client source with a host name as the address
        stream<Beat> Beat6 = TCPSource()
        {
            param
                role : client;
                address : "mynode.mydomain";
                port : 23145u;
        }
        // client source with name
        stream<Beat> Beat7 = TCPSource()
        {
            param
                role : client;
                name : "my_server";
        }
        // client source with reconnection
        stream<Beat> Beat8 = TCPSource()
        {
            param
                role : client;
                address : "mynode.mydomain";
                port : "ftp";
                reconnectionPolicy : InfiniteRetry;
        }
        // client source with bounded reconnection (10 connections)
        // Wait 5 seconds before starting
        stream<Beat> Beat9 = TCPSource()
        {
            param
                role : client;
                address : "mynode.mydomain";
                port : "ftp";
                reconnectionPolicy : BoundedRetry;
                reconnectionBound : 10u;
                initDelay : 5.0;
        }
}
Summary
- Ports
- This operator has 0 input ports and 1 output port.
- Windowing
- This operator does not accept any windowing configurations.
- Parameters
- This operator supports 22 parameters.
Required: role
Optional: address, port, name, format, encoding, defaultTuple, parsing, hasDelayField, compression, eolMarker, blockSize, reconnectionPolicy, reconnectionBound, receiveBufferSize, initDelay, separator, interface, readPunctuations, ignoreExtraCSVValues, confirmWireFormat, keepAlive
- Metrics
- This operator reports 4 metrics.
Properties
- Implementation
- C++
- Threading
- Always - Operator always provides a single threaded execution context.
- Assignments
- This operator allows any SPL expression of the correct type to be assigned to output attributes.
- Output Functions
- OutputFunctions
- rstring RemoteIP()
Returns the address of the remote end of the TCP connection. The address is a host name, or a dotted IP address if the name cannot be resolved.
- uint32 RemotePort()
Returns the port number of the remote end of the TCP connection.
- uint32 LocalPort()
Returns the port number of the local end of the TCP connection.
- uint32 ServerPort()
Returns the port number that the TCP server is listening on. This custom output function is valid only when the role parameter value is server.
- int64 TupleNumber()
Returns the number of tuples that are generated by this operator since the start of the current TCP connection. The first tuple that is generated has number 0.
- <any T> T AsIs(T)
Returns the input value.
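The custom output functions listed above can be combined with attributes that are parsed from the input. The following sketch assumes the default csv format and a hypothetical client connection; the stream name, attribute names, host, and port are illustrative only.

```spl
composite TaggedTuples {
    graph
        // name and age are parsed from each csv line; seq and localPort
        // are filled by custom output functions instead.
        stream<rstring name, uint32 age, int64 seq, uint32 localPort> Tagged = TCPSource()
        {
            param
                role    : client;
                address : "mynode.mydomain";   // hypothetical host
                port    : 23145u;              // hypothetical port
            output
                // seq restarts at 0 for each new TCP connection.
                Tagged : seq = TupleNumber(), localPort = LocalPort();
        }
}
```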
- Ports (0)
The TCPSource operator is configurable with a single output port, which produces tuples read from the TCP connection. The TCPSource operator outputs a window marker punctuation when a TCP connection terminates unless readPunctuations is true.
- Properties
- Optional: false
- TupleMutationAllowed: true
- WindowPunctuationOutputMode: Generating
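Because a window marker punctuation is produced when a connection terminates, a downstream operator with a punctuation-based window can process the tuples of each connection as one batch. The following is a sketch under that assumption, using the standard Aggregate operator; the stream names, schema, and port are hypothetical.

```spl
composite PerConnectionCount {
    graph
        stream<rstring name, uint32 age> People = TCPSource()
        {
            param
                role : server;
                port : 23145u;   // hypothetical port
        }

        // The tumbling window flushes on each window marker, so one
        // output tuple is produced per terminated connection.
        stream<int32 tuplesInConnection> PerConnection = Aggregate(People)
        {
            window
                People : tumbling, punct();
            output
                PerConnection : tuplesInConnection = Count();
        }
}
```

If readPunctuations is true, the window boundaries follow the punctuations carried in the input rather than the connection lifetime.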
Parameters
- address
In the case of a client-based TCPSource operator, this parameter specifies the destination server address of the TCP connection.
The address parameter must be specified when the role parameter value is client and the name parameter is not specified. In all other cases, it cannot be specified.
The parameter value can be a host name or an IP address. The address parameter cannot be used for a server-based TCPSource operator, because a server always listens on the current host.
- Properties
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- port
In the case of a server-based TCPSource operator, this parameter specifies the port on which connections are accepted. In the case of a client-based TCPSource operator, it specifies the destination server port.
The parameter value can be a well-known port alias, such as http or ftp, as specified in /etc/services. It can also be a plain port number, such as 45134u.
It is an optional parameter for server-based TCPSource operators. Its default value is 0, which picks any available port.
For client-based TCPSource operators, the port parameter must be specified when the name parameter is not specified, and it cannot be specified otherwise.
- Properties
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
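When a server-based operator omits the port parameter, the default of 0 lets the system pick any available port. The sketch below registers the server under a name so that clients can still find it, and records the chosen port with ServerPort(). The stream name, attribute names, and the registered name are hypothetical.

```spl
composite AnyPortServer {
    graph
        stream<rstring line, uint32 listeningPort> Lines = TCPSource()
        {
            param
                role   : server;
                name   : "my_server";   // clients look the server up by name
                format : line;
                // port is omitted, so the default of 0 applies
            output
                Lines : listeningPort = ServerPort();
        }
}
```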
- name
In the case of a server-based TCPSource operator, this parameter specifies the name that is used to register the address and port pair for the server with the name service that is part of the InfoSphere® Streams instance. A corresponding client-based TCPSink operator can then connect to this operator by specifying only the name.
These names are automatically prefixed by the application scope, so applications with different scopes cannot communicate through the same name. The application scope can be set by using config applicationScope on the main composite of the application. It is an error to define a name with the same application scope more than once within an instance. If multiple operators attempt to define the same name, the second and subsequent operators periodically retry the registration and log an error message for each failure.
In the case of a client-based TCPSource operator, this parameter specifies the name that is used to look up the address and port pair for the destination server from the name service that is part of the InfoSphere Streams instance.
You can use the streamtool getnsentry command to query server-based TCPSource addresses. The Value field contains host:port.
When the name parameter is specified in client mode, the port and address parameters cannot be specified.
- Properties
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
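The name-based pairing described for the name parameter can be sketched as follows: a server-based TCPSource registers a name, and a client-based TCPSink connects by that name alone. Both operators are shown in one composite only for brevity; the stream names, schema, and the name "my_server" are hypothetical, and the Beacon operator is used here just to generate test tuples.

```spl
composite NamePair {
    graph
        // Test data generator (illustrative values).
        stream<rstring name, uint32 age> Generated = Beacon()
        {
            param
                iterations : 10u;
            output
                Generated : name = "user", age = 30u;
        }

        // Client side: no address or port, only the registered name.
        () as Sender = TCPSink(Generated)
        {
            param
                role : client;
                name : "my_server";
        }

        // Server side: registers host:port under the same name.
        stream<rstring name, uint32 age> Received = TCPSource()
        {
            param
                role : server;
                name : "my_server";
        }
}
```

Both sides must run with the same application scope, because the registered name is prefixed by the scope.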
- role
Specifies whether the TCPSource operator is server-based or client-based. The value is either server or client.
- Properties
- Type: SocketRole (server, client)
- Cardinality: 1
- Optional: false
- ExpressionMode: CustomLiteral
- format
Specifies the format of the data. For more information, see the format parameter in the FileSource operator.
- Properties
- Type: DataFormat (csv, txt, bin, block, line)
- Cardinality: 1
- Optional: true
- ExpressionMode: CustomLiteral
- encoding
Specifies the character set encoding of the input. For more information, see the encoding parameter in the FileSource operator.
- Properties
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- defaultTuple
Specifies the default tuple to use for missing fields. For more information, see the defaultTuple parameter in the FileSource operator.
- Properties
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- parsing
Specifies the parsing behavior of the TCPSource operator. There are three valid values: strict, permissive, and fast.
When the parameter value is strict, incorrectly formatted tuples result in a runtime error and termination of the operator.
When the parameter value is permissive, incorrectly formatted tuples result in the creation of a runtime log entry and the parser makes an effort to skip to the next tuple (formats txt and csv) and continue. If the format is bin, the parser closes the current connection and starts reading the next connection (if the reconnectionPolicy permits). The permissive parameter value can be used with only txt, csv, and bin formats.
When the parameter value is fast, the input is assumed to be formatted correctly and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior.
The default parsing mode is strict.
- Properties
- Type: ParseOption (strict, permissive, fast)
- Cardinality: 1
- Optional: true
- ExpressionMode: CustomLiteral
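As an illustration of the parsing modes, the following sketch selects permissive parsing for csv input, so that a malformed line is logged and skipped instead of terminating the operator. The stream name, schema, and port are hypothetical.

```spl
composite TolerantReader {
    graph
        stream<rstring name, uint32 age, uint64 salary> Staff = TCPSource()
        {
            param
                role    : server;
                port    : 23145u;       // hypothetical port
                format  : csv;
                parsing : permissive;   // log and skip badly formatted tuples
        }
}
```

With the default strict mode, the same malformed line would cause a runtime error.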
- hasDelayField
Specifies whether the format contains inter-arrival delays as the first field. For more information, see the hasDelayField parameter in the FileSource operator.
- Properties
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
- compression
Specifies the compression mode. For more information, see the compression parameter in the FileSource operator.
- Properties
- Type: CompressionAlg (zlib, gzip, bzip2)
- Cardinality: 1
- Optional: true
- ExpressionMode: CustomLiteral
- eolMarker
Specifies the end of line marker. For more information, see the eolMarker parameter in the FileSource operator.
- Properties
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
- blockSize
Specifies the block size for the block format. For more information, see the blockSize parameter in the FileSource operator.
- Properties
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
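For the block format, the single unassigned output attribute must be a blob. A minimal sketch follows, with a hypothetical host, port, and block size.

```spl
composite BlockReader {
    graph
        // Each tuple carries one block of raw bytes from the connection.
        stream<blob chunk> Chunks = TCPSource()
        {
            param
                role      : client;
                address   : "mynode.mydomain";   // hypothetical host
                port      : 23145u;              // hypothetical port
                format    : block;
                blockSize : 4096u;               // illustrative size in bytes
        }
}
```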
- reconnectionPolicy
Specifies the reconnection policy. In the case of a server-based TCPSource operator, this parameter specifies whether more connections are accepted after the initial connection terminates. In the case of a client-based TCPSource operator, this parameter specifies whether more connection attempts are made after the initial connection to the server terminates.
The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. If this parameter is not specified, it is set to InfiniteRetry.
When this parameter value is NoRetry, the TCPSource operator sends a window marker punctuation when the initial connection terminates and then immediately produces a final marker punctuation.
- Properties
- Type: ReconnectionPolicy (InfiniteRetry, NoRetry, BoundedRetry)
- Cardinality: 1
- Optional: true
- ExpressionMode: CustomLiteral
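The NoRetry behavior can be observed with a downstream Custom operator that inspects punctuations. This is a sketch only; the stream names, port, and log messages are hypothetical.

```spl
composite SingleConnection {
    graph
        stream<rstring line> Lines = TCPSource()
        {
            param
                role               : server;
                port               : 23145u;   // hypothetical port
                format             : line;
                reconnectionPolicy : NoRetry;  // accept one connection only
        }

        () as PunctLogger = Custom(Lines)
        {
            logic
                onPunct Lines :
                {
                    if (currentPunct() == Sys.WindowMarker) {
                        // Sent when the TCP connection terminates.
                        printStringLn("connection closed");
                    } else if (currentPunct() == Sys.FinalMarker) {
                        // With NoRetry, follows the first window marker.
                        printStringLn("no further connections");
                    }
                }
        }
}
```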
- reconnectionBound
Specifies the number of successive connections that are attempted for a client-based TCPSource operator or accepted for a server-based TCPSource operator. This parameter must be specified when the reconnectionPolicy parameter value is BoundedRetry and cannot be used otherwise.
- Properties
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- receiveBufferSize
Specifies the kernel receive buffer size for the TCP socket.
- Properties
- Type: uint32
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- initDelay
Specifies the number of seconds to delay before starting to produce tuples. For more information, see the initDelay parameter in the FileSource operator.
- Properties
- Type: float64
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- separator
Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.
- Properties
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
- interface
Specifies the network interface to register with the name service when the name parameter is specified. This parameter is valid only when the role parameter value is server and the name parameter is specified.
Using this parameter with the name parameter ensures that a matching operator with a role parameter value of client and the same name parameter connects through the intended interface.
- Properties
- Type: rstring
- Cardinality: 1
- Optional: true
- ExpressionMode: AttributeFree
- readPunctuations
Specifies whether to read punctuations from 'bin' format input. For more information, see the readPunctuations parameter in the FileSource operator.
- Properties
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
- ignoreExtraCSVValues
Specifies whether to skip any extra fields before the end of line when reading in CSV format. For more information, see the ignoreExtraCSVValues parameter in the FileSource operator.
- Properties
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
- confirmWireFormat
Specifies whether an exchange of the wire format description is required to confirm that the client and server are passing compatible data. If this parameter is not specified, the default value is false. The wire format is defined in etc/xsd/SPL/wireFormatModel.xsd.
- Properties
- Type: boolean
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
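A sketch of the wire format handshake described for confirmWireFormat follows. It assumes that the peer is a client-based TCPSink that also sets confirmWireFormat, so that both ends exchange and compare their wire format descriptions. The stream names, schema, host, and port are hypothetical, and the two operators would normally run in different applications.

```spl
composite WireFormatCheck {
    graph
        // Receiving side: requires the handshake before reading tuples.
        stream<rstring name, uint32 age> Received = TCPSource()
        {
            param
                role              : server;
                port              : 23145u;   // hypothetical port
                format            : bin;
                confirmWireFormat : true;
        }

        // Test data generator (illustrative values).
        stream<rstring name, uint32 age> Generated = Beacon()
        {
            param
                iterations : 10u;
            output
                Generated : name = "user", age = 30u;
        }

        // Sending side: assumed to use the same format and handshake setting.
        () as Sender = TCPSink(Generated)
        {
            param
                role              : client;
                address           : "localhost";   // hypothetical host
                port              : 23145u;
                format            : bin;
                confirmWireFormat : true;
        }
}
```

A failed handshake is counted in the nConfirmWireFormatFailures metric.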
- keepAlive
Specifies that the TCP keepalive timer is used to check whether the connection is still alive. This parameter applies only to TCPSource operators whose role parameter value is server. It prevents a server from hanging on a connection that has dropped, and it keeps a connection open when there is no normal activity.
The value of the parameter is a tuple literal that specifies the configurable attributes. If an attribute is set to zero, the system default value for that attribute is used. This parameter has the following configurable attributes:
- int32 time: The number of seconds that the connection sits idle before a keepalive probe is sent.
- int32 probes: The maximum number of probes to send to establish the state of the connection.
- int32 interval: The time interval in seconds between each probe.
- Properties
- Type: tuple<int32 time, int32 probes, int32 interval>
- Cardinality: 1
- Optional: true
- ExpressionMode: Constant
Code Templates
- TCPSource (client)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : client;
        address : "${hostOrIp}";
        port : "${sourcePort}";
}
- TCPSource (server)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : server;
        port : "${sourcePort}";
}
- TCPSource with Name (client or server)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : ${serverOrClient};
        name : "${name}";
}
Metrics
- nInvalidTuples - Counter
The number of tuples that failed to read correctly in csv or txt format.
- nReconnections - Counter
The number of times the input connection was re-established.
- nConnections - Gauge
The number of currently active TCP/IP connections. If the TCPSource operator is waiting for a connection or a reconnection, the value is 0. If the operator is connected, the value is 1.
- nConfirmWireFormatFailures - Counter
The number of times the input connection wire format handshake failed.
Libraries
- spl-std-tk-lib
- Library Name: streams-stdtk-runtime
- Include Path: ../../../impl/include