IBM InfoSphere Streams Version 4.1.0

Operator TCPSource

The TCPSource operator reads data from a TCP socket and creates tuples out of it. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes, it handles a single connection at a time. It works with both IPv4 and IPv6 addresses.

The TCPSource operator accepts an optional output clause. Any SPL expression and supported custom output functions can be used in the output clause. Output attributes can be assigned values in the output clause. If an attribute has an assignment, the expression value is assigned to the attribute and the attribute is not part of the value that is read from the source. For the line and block formats, exactly one attribute is left unassigned by the output clause, and this attribute must have type rstring (for line format) or blob (for block format). For all other formats, any attributes that are not assigned in the output clause are read from the input by using csv, txt, or bin format.
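
The following minimal sketch illustrates the line format rule described above. The composite name, stream name, port number, and the feed and receivedAt attributes are assumptions made for illustration only. The line attribute is the single unassigned rstring attribute, so it receives the text that is read from the socket; the other two attributes are filled by the output clause.

```spl
composite LineFormatSketch {
  graph
    // 'line' is read from the TCP connection; 'feed' and 'receivedAt'
    // are assigned in the output clause and are not read from the input
    stream<rstring line, rstring feed, timestamp receivedAt> Lines = TCPSource()
    {
      param
        role   : server;
        port   : 23145u;
        format : line;
      output
        Lines : feed = "sensor-feed", receivedAt = getTimestamp();
    }
}
```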

Checkpointed data

The TCPSource operator does not support checkpointing.

Behavior in a consistent region

The TCPSource operator is not supported in a consistent region.

Checkpointing behavior in an autonomous region

When the TCPSource operator is in an autonomous region and is configured with a config checkpoint : periodic(T) clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

When the TCPSource operator is in an autonomous region and is configured with a config checkpoint : operatorDriven clause, no checkpoint is taken at run time. Upon restart, the operator restores to its initial state.

Such checkpointing behavior is subject to change in the future.

Exceptions

The TCPSource operator throws an exception and terminates in the following cases:
  • The host cannot be resolved.
  • The name cannot be located.
  • The SO_REUSEADDR option cannot be set on the TCP socket.
  • The operator cannot bind to the port.

Examples

This example uses the TCPSource operator.

composite Main {                                                                
  graph                                                                         
    // server source with an alias string as port                               
    stream<rstring name, uint32 age, uint64 salary> Beat = TCPSource()          
    {                                                                           
      param                                                                     
        role : server;                                                          
        port : "ftp";                                                           
    }                                                                           
    // server source with a numeric port and keepalive settings
    stream<Beat> Beat1 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role : server;                                                          
        port : 23145u;                                                          
        keepAlive : {time=7200, probes=9, interval=75};
    }                                                                           
    // server source with a name, registering interface eth1                     
    stream<Beat> Beat2 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : server;                                                         
        name : "my_server";                                                     
        interface : "eth1";                                                     
    }                                                                           
    // server source with a name and port                                       
    stream<Beat> Beat3 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : server;                                                         
        port  : 23145u;                                                         
        name : "my_server";                                                     
    }                                                                           
    // server source with a port and infinite reconnection                      
    stream<Beat> Beat4 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role              : server;                                             
        port              : "ftp";                                              
        reconnectionPolicy : InfiniteRetry;                                     
    }                                                                           
    // server source with a port and reconnection (5 times)                     
    stream<Beat> Beat4r = TCPSource()                                           
    {                                                                           
      param                                                                     
        role              : server;                                             
        port              : "ftp";                                              
        reconnectionPolicy : BoundedRetry;                                      
        reconnectionBound  : 5u;                                                
    }                                                                           
    // client source with an IP address and port                                
    stream<Beat> Beat5 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role    : client;                                                       
        address : "99.2.45.67";                                                 
        port    : "ftp";                                                        
    }                                                                           
    // client source with a host name as the address
    stream<Beat> Beat6 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role    : client;                                                       
        address : "mynode.mydomain";                                            
        port    : 23145u;                                                       
    }                                                                           
    // client source with name                                                  
    stream<Beat> Beat7 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role  : client;                                                          
        name : "my_server";                                                     
    }                                                                           
    // client source with reconnection                                          
    stream<Beat> Beat8 = TCPSource()                                            
    {                                                                           
      param                                                                     
        role              : client;                                             
        address           : "mynode.mydomain";                                  
        port              : "ftp";                                              
        reconnectionPolicy : InfiniteRetry;                                     
    }                                                                           
    // client source with bounded reconnection (up to 10 connections)
    // Wait 5 seconds before starting
    stream<Beat> Beat9 = TCPSource()
    {                                                                           
      param                                                                     
        role                : client;                                           
        address             : "mynode.mydomain";                                
        port                : "ftp";                                            
        reconnectionPolicy   : BoundedRetry;                                    
        reconnectionBound    : 10u;                                             
        initDelay           : 5.0;                                              
    }                                                                           
}                                                                                  

Summary

Ports
This operator has 0 input ports and 1 output port.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 22 parameters.

Required: role

Optional: address, port, name, format, encoding, defaultTuple, parsing, hasDelayField, compression, eolMarker, blockSize, reconnectionPolicy, reconnectionBound, receiveBufferSize, initDelay, separator, interface, readPunctuations, ignoreExtraCSVValues, confirmWireFormat, keepAlive

Metrics
This operator reports 4 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single threaded execution context.

Output Ports

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.
Output Functions
OutputFunctions
rstring RemoteIP()

Returns the address of the remote end of the TCP connection. This address is a host name, or a dotted IP address if the name cannot be resolved.

uint32 RemotePort()

Returns the port number of the remote end of the TCP connection.

uint32 LocalPort()

Returns the port number of the local end of the TCP connection.

uint32 ServerPort()

Returns the port number that the TCP server is listening on. This custom output function is valid only when the role parameter value is server.

int64 TupleNumber()

Returns the number of tuples that are generated by this operator since the start of the current TCP connection. The first tuple that is generated has number 0.

<any T> T AsIs(T)

Returns the input value.
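
As a rough illustration of the custom output functions listed above, the sketch below tags each tuple with connection details. The composite name, stream name, port number, and the peer, peerPort, listenPort, and seq attributes are hypothetical. The name and age attributes have no assignment, so they are read from the input in the default format.

```spl
composite OutputFunctionsSketch {
  graph
    stream<rstring name, uint32 age, rstring peer, uint32 peerPort,
           uint32 listenPort, int64 seq> Tagged = TCPSource()
    {
      param
        role : server;   // ServerPort() is valid only for role server
        port : 23145u;
      output
        Tagged : peer       = RemoteIP(),
                 peerPort   = RemotePort(),
                 listenPort = ServerPort(),
                 seq        = TupleNumber();  // starts at 0 for each connection
    }
}
```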

Ports (0)

The TCPSource operator is configurable with a single output port, which produces tuples read from the TCP connection. The TCPSource operator outputs a window marker punctuation when a TCP connection terminates unless readPunctuations is true.

Parameters

This operator supports 22 parameters.
address

In the case of a client-based TCPSource operator, this parameter specifies the destination server address of the TCP connection.

The address parameter must be specified when the role parameter value is client and the name parameter is not specified. In all other cases, it cannot be specified.

The parameter value can be a host name or an IP address. The address parameter cannot be used for a server-based TCPSource operator because the server always uses an address on the current host.

port

In the case of a server-based TCPSource operator, this parameter specifies the port address on which the connections are accepted. In the case of a client-based TCPSource operator, it specifies the destination server port address.

The parameter value can be a well-known port alias, such as http or ftp, as specified in /etc/services. It can also be a plain port number, such as 45134u.

It is an optional parameter for server-based TCPSource operators. Its default value is 0, which picks any available port.

For client-based TCPSource operators, the port parameter must be specified when the name parameter is not specified and it cannot be specified otherwise.

name

In the case of a server-based TCPSource operator, this parameter specifies the name that is used to register the address and port pair for the server with the name service that is part of the InfoSphere® Streams instance. This name can be used by a corresponding client-based TCPSink operator to connect to this operator by just specifying the name.

These names are automatically prefixed by the application scope, so applications with differing scopes cannot communicate through the same name. The application scope can be set by using config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators keep trying periodically to register the name, with an error message for each failure.

In the case of a client-based TCPSource, this parameter specifies the name to be used to look up the address and port pair for the destination server from the name service that is part of the InfoSphere Streams instance.

You can use the streamtool getnsentry command to query server-based TCPSource addresses. The Value field contains host:port.

When the name parameter is specified in client mode, the port and address parameters cannot be specified.
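
The sketch below shows one way the name-based lookup described above might be wired up, assuming a server-based TCPSource operator and a client-based TCPSink operator that share a name within one application scope. The composite name, stream names, the scope "demoScope", and the Beacon data generator are assumptions for illustration.

```spl
composite NamedEndpointSketch {
  graph
    // server registers its host:port pair under "my_server"
    stream<rstring msg> Received = TCPSource()
    {
      param
        role : server;
        name : "my_server";
    }
    // hypothetical generator that produces a few tuples to send
    stream<rstring msg> Outgoing = Beacon()
    {
      param
        iterations : 10u;
      output
        Outgoing : msg = "hello";
    }
    // client looks up "my_server" instead of specifying address and port
    () as Sender = TCPSink(Outgoing)
    {
      param
        role : client;
        name : "my_server";
    }
  config
    applicationScope : "demoScope";
}
```

Because names are prefixed by the application scope, both operators in this sketch resolve the same entry only while they run under the same scope.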

role

Specifies whether the TCPSource operator is server-based or client-based. It takes one of the following two values: server and client.

format

Specifies the format of the data. For more information, see the format parameter in the FileSource operator.

encoding

Specifies the character set encoding of the input. For more information, see the encoding parameter in the FileSource operator.

defaultTuple

Specifies the default tuple to use for missing fields. For more information, see the defaultTuple parameter in the FileSource operator.

parsing

Specifies the parsing behavior of the TCPSource operator. There are three valid values: strict, permissive, and fast.

When the parameter value is strict, incorrectly formatted tuples result in a runtime error and termination of the operator.

When the parameter value is permissive, incorrectly formatted tuples result in the creation of a runtime log entry and the parser makes an effort to skip to the next tuple (formats txt and csv) and continue. If the format is bin, the parser closes the current connection and starts reading the next connection (if the reconnectionPolicy permits). The permissive parameter value can be used with only txt, csv, and bin formats.

When the parameter value is fast, the input is assumed to be formatted correctly and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior.

The default parsing mode is strict.
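
A minimal sketch of the permissive mode, assuming a client-based operator that reads csv data. The composite name, stream name, host name, and port are placeholders. With this setting a badly formatted line is logged and skipped instead of terminating the operator.

```spl
composite PermissiveParsingSketch {
  graph
    stream<rstring name, uint32 age, uint64 salary> Records = TCPSource()
    {
      param
        role    : client;
        address : "mynode.mydomain";
        port    : 23145u;
        format  : csv;
        parsing : permissive;   // log bad tuples and skip to the next one
    }
}
```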

hasDelayField

Specifies whether the format contains inter-arrival delays as the first field. For more information, see the hasDelayField parameter in the FileSource operator.

compression

Specifies the compression mode. For more information, see the compression parameter in the FileSource operator.

eolMarker

Specifies the end of line marker. For more information, see the eolMarker parameter in the FileSource operator.

blockSize

Specifies the block size for the block format. For more information, see the blockSize parameter in the FileSource operator.

reconnectionPolicy

Specifies the reconnection policy. In the case of a server-based TCPSource operator, this parameter specifies whether more connections are allowed when the initial connection terminates. In the case of a client-based TCPSource operator, this parameter specifies whether more connection attempts are made when the initial connection to the server terminates.

The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. If this parameter is not specified, it is set to InfiniteRetry.

When this parameter value is NoRetry, the TCPSource operator sends a window marker punctuation when the initial connection terminates and then immediately produces a final marker punctuation.
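
To make the punctuation sequence concrete, the following sketch pairs a NoRetry source with a downstream Custom operator that reports each marker. The composite name, stream names, port number, and printed messages are hypothetical.

```spl
composite NoRetrySketch {
  graph
    stream<rstring line> Lines = TCPSource()
    {
      param
        role               : server;
        port               : 23145u;
        format             : line;
        reconnectionPolicy : NoRetry;
    }
    // prints each line and reports the punctuation that follows
    () as Watcher = Custom(Lines)
    {
      logic
        onTuple Lines : println(line);
        onPunct Lines :
        {
          if (currentPunct() == Sys.WindowMarker) {
            println("connection terminated");
          } else if (currentPunct() == Sys.FinalMarker) {
            println("no further connections");
          }
        }
    }
}
```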

reconnectionBound

Specifies the number of successive connections that are attempted for a client-based TCPSource operator or accepted for a server-based TCPSource operator. This parameter must be specified when the reconnectionPolicy parameter value is BoundedRetry and cannot be used otherwise.

receiveBufferSize

Specifies the kernel receive buffer size for the TCP socket.

initDelay

Specifies the number of seconds to delay before starting to produce tuples. For more information, see the initDelay parameter in the FileSource operator.

separator

Specifies the separator character for the csv format. For more information, see the separator parameter in the FileSource operator.

interface

Specifies the network interface to use to register when the name parameter is specified. This parameter is valid only when the role parameter value is server and the name parameter is specified.

Using this parameter with the name parameter ensures that a matching operator with a role parameter value of client and the same name parameter uses the wanted interface.

readPunctuations

Specifies whether to read punctuations from 'bin' format input. For more information, see the readPunctuations parameter in the FileSource operator.

ignoreExtraCSVValues

Specifies whether to skip any extra fields before the end of line when reading in CSV format. For more information, see the ignoreExtraCSVValues parameter in the FileSource operator.

confirmWireFormat

Specifies whether an exchange of the wire format description is required to confirm that the client and server are passing compatible data. If this parameter is not specified, the default value is false. The wire format is defined in etc/xsd/SPL/wireFormatModel.xsd.
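
A minimal sketch of enabling the wire format exchange on a server-based operator. The composite name, stream name, and port are placeholders, and the sketch assumes that the connecting peer also takes part in the exchange; how the peer is configured is outside the scope of this example.

```spl
composite WireFormatSketch {
  graph
    stream<rstring name, uint32 age> Checked = TCPSource()
    {
      param
        role              : server;
        port              : 23145u;
        confirmWireFormat : true;   // default is false
    }
}
```

Failed exchanges are counted by the nConfirmWireFormatFailures metric.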

keepAlive

Specifies that the keepalive timer is used to check whether the connection is alive. This parameter applies only to TCPSource operators for which the role parameter value is server. The parameter prevents a server from hanging on a connection that has dropped, and it keeps a connection alive when there is no normal activity.

The value of the parameter is a tuple literal that specifies the configurable attributes. If any configurable attribute is set to zero, the system default value for that attribute is used. This parameter has the following configurable attributes:
  • int32 time: The number of seconds that the connection sits idle before a keepalive probe is sent.
  • int32 probes: The maximum number of probes to send to establish the state of the connection.
  • int32 interval: The time interval in seconds between each probe.
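
The sketch below sets only the idle time and leaves the other two keepAlive attributes at zero so that the system defaults apply to them. The composite name, stream name, port, and the 600-second value are assumptions chosen for illustration.

```spl
composite KeepAliveSketch {
  graph
    stream<rstring name, uint32 age> Alive = TCPSource()
    {
      param
        role      : server;
        port      : 23145u;
        // probe after 600 idle seconds; 0 selects the system default
        keepAlive : {time=600, probes=0, interval=0};
    }
}
```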

Code Templates

TCPSource (client)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : client;
        address : "${hostOrIp}";
        port : "${sourcePort}";
}

TCPSource (server)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : server;
        port : "${sourcePort}";
}

TCPSource with Name (client or server)
stream<${schema}> ${outputStream} = TCPSource() {
    param
        role : ${serverOrClient};
        name : "${name}";
}

Metrics

nInvalidTuples - Counter

The number of tuples that failed to read correctly in csv or txt format.

nReconnections - Counter

The number of times the input connection was re-established.

nConnections - Gauge

The number of currently active TCP/IP connections. If the TCPSource operator is waiting for a connection or a reconnection, the value is 0. If the operator is connected, the value is 1.

nConfirmWireFormatFailures - Counter

The number of times the input connection wire format handshake failed.

Libraries

spl-std-tk-lib
Library Name: streams-stdtk-runtime
Include Path: ../../../impl/include