IBM InfoSphere Streams Version 4.1.0

Operator StructureParse


The StructureParse operator parses a binary data stream that contains fixed-size data structures. Because the data is binary, it is passed to the operator in a blob attribute. Fixed-size data structures contain only items of primitive types. For each fixed-size data structure that the operator detects in the binary data stream, it generates an SPL tuple.
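The following C++ sketch illustrates the basic idea of cutting a binary payload into fixed-size structures. It is not the operator's implementation: it assumes a single structure type of a known size (recordSize, a hypothetical parameter), whereas the operator takes sizes and layouts from the structure definition document. Because a blob, for example one block that is read by a FileSource operator, can end in the middle of a structure, the sketch returns the incomplete tail so that the caller can prepend it to the next payload.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Result of splitting one payload: complete structures plus an incomplete tail.
struct SplitResult {
    std::vector<std::vector<std::uint8_t>> records;  // one entry per complete structure
    std::vector<std::uint8_t> leftover;              // bytes of an incomplete structure
};

// Hypothetical helper: split a payload of back-to-back structures that are all
// 'recordSize' bytes long. The caller is expected to prepend 'leftover' to the
// next payload, because a block boundary can fall inside a structure.
SplitResult splitFixedSize(const std::vector<std::uint8_t>& payload,
                           std::size_t recordSize) {
    assert(recordSize > 0);
    SplitResult result;
    std::size_t offset = 0;
    // Emit one record for each complete structure in the payload.
    while (payload.size() - offset >= recordSize) {
        auto first = payload.begin() + static_cast<std::ptrdiff_t>(offset);
        result.records.emplace_back(first, first + static_cast<std::ptrdiff_t>(recordSize));
        offset += recordSize;
    }
    // Keep the incomplete tail for the next payload.
    result.leftover.assign(payload.begin() + static_cast<std::ptrdiff_t>(offset),
                           payload.end());
    return result;
}
```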

For the operator to process data streams, you must provide the following documents:

  • Structure definition document

    Describes the fixed-size data structures and their fields.

  • Mapping definition document

    Maps the fields to SPL output port attributes.

The operator supports the following features:

  • Different types of fixed-size data structures in the same binary data stream
  • boolean, intb, uintb, floatb, rstring[n], and blob[n] field primitive types
  • Little-endian and big-endian byte order for the int16, int32, int64, uint16, uint32, uint64, float32, and float64 number types
  • Variables to store fixed and field values
  • Conditions to identify the different fixed-size data structures based on fields and variables
  • Padding to skip fill bytes between fixed-size data structures
  • Punctuation-based or condition-based resynchronization after a failure is detected
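Byte order determines how the bytes of a multi-byte number are combined. The following C++ sketch decodes unsigned integers of 2, 4, or 8 bytes in either byte order and reinterprets the result as a signed int16 or as an IEEE 754 float32, which mirrors the little-endian and big-endian support listed above. The names Endian, readUnsigned, readInt16, and readFloat32 are hypothetical helpers, not part of the operator's interface.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

enum class Endian { Little, Big };

// Read an unsigned integer of 'size' bytes (2, 4, or 8) in the given byte order.
std::uint64_t readUnsigned(const std::uint8_t* p, int size, Endian order) {
    assert(size == 2 || size == 4 || size == 8);
    std::uint64_t value = 0;
    for (int i = 0; i < size; ++i) {
        // Big-endian stores the most significant byte first; little-endian last.
        int idx = (order == Endian::Big) ? i : size - 1 - i;
        value = (value << 8) | p[idx];
    }
    return value;
}

// Signed int16: reinterpret the unsigned bit pattern as two's complement.
std::int16_t readInt16(const std::uint8_t* p, Endian order) {
    return static_cast<std::int16_t>(
        static_cast<std::uint16_t>(readUnsigned(p, 2, order)));
}

// Interpret 4 bytes as an IEEE 754 single-precision value (float32).
float readFloat32(const std::uint8_t* p, Endian order) {
    std::uint32_t bits = static_cast<std::uint32_t>(readUnsigned(p, 4, order));
    float f;
    static_assert(sizeof(f) == sizeof(bits), "float must be 32 bits");
    std::memcpy(&f, &bits, sizeof(f));  // well-defined type punning
    return f;
}
```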

In addition, the operator:

  • Converts integer or floating point values into strings
  • Sends metrics and statistics to an optional output port
  • Optionally suppresses the tuples that are generated from fixed-size data structures

The operator does not raise exceptions itself, and it does not catch exceptions that other code might throw, for example, fused downstream operators.

Behavior in a consistent region and checkpointing

The StructureParse operator can be an operator within the reachability graph of a consistent region. It cannot be the start of a consistent region.

The StructureParse operator also supports periodic checkpointing, which is enabled with the checkpoint configuration clause. For example, the following configuration takes a checkpoint every 5 seconds:

config
	checkpoint : periodic(5.0);
	restartable : true;

Structure Definition Document
The structure definition document is an XML document that specifies the fixed-size data structures that can occur in the binary data stream and that are parsed by the StructureParse operator.
Mapping Definition Document
The mapping definition document is an XML document that defines the mapping between the fields of the binary structure and the SPL tuple attributes for the StructureParse operator.
Sample
This sample shows how to integrate the StructureParse operator with an spl.adapter::FileSource operator. The StructureParse operator generates data tuples and metrics tuples.

Summary

Ports
This operator has 1 input port and 2 output ports. The second output port, which generates metrics tuples, is optional.
Windowing
This operator does not accept any windowing configurations.
Parameters
This operator supports 5 parameters.

Required: structureDocument, mappingDocument

Optional: payloadAttribute, metricsMode, metricsModeThreshold

Metrics
This operator reports 9 metrics.

Properties

Implementation
C++
Threading
Always - Operator always provides a single-threaded execution context.

Input Ports

Ports (0)

The StructureParse operator is configurable with a single input port.

The input port schema must be a tuple with at least one blob attribute, which holds the payload to be parsed. If more than one blob attribute exists, the payloadAttribute parameter must specify the attribute that contains the payload.

Window punctuations can change the operator state under the following conditions:

  • If the operator is in failure mode, a window punctuation resynchronizes the binary data stream.
  • If you do not specify a value for the metricsMode parameter or if it is set to punctuation, then the operator is reset and, if the optional second output port exists, a metrics tuple is generated.
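The following C++ sketch models the punctuation handling that is described above. The types ParserState and IntervalMetrics are hypothetical, and so is one detail: the sketch discards buffered bytes at every window punctuation, not only in failure mode. That is consistent with the description of the nBytesDropped metric, but it is not stated explicitly here.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

enum class MetricsMode { Punctuation, Tuples, Bytes };

// Interval metrics that are reported in a metrics tuple and then reset.
struct IntervalMetrics {
    std::uint64_t nTuplesReceived = 0;
    std::uint64_t nTuplesSent = 0;
    std::uint64_t nBytesReceived = 0;
    std::uint64_t nBytesDropped = 0;
};

// Hypothetical model of how a window punctuation changes the operator state.
struct ParserState {
    bool failureMode = false;                      // set after an unknown structure was detected
    std::vector<std::uint8_t> pending;             // buffered bytes that are not yet parsed
    IntervalMetrics interval;
    MetricsMode mode = MetricsMode::Punctuation;   // default metricsMode
    bool hasMetricsPort = false;                   // is the optional second port present?
    std::vector<IntervalMetrics> sentMetrics;      // stands in for submitted metrics tuples

    void onWindowPunctuation() {
        // Resynchronize: discard buffered bytes so that parsing restarts
        // with the first byte after the punctuation.
        interval.nBytesDropped += pending.size();
        pending.clear();
        failureMode = false;

        // In punctuation mode, report the interval metrics and reset them.
        if (mode == MetricsMode::Punctuation) {
            if (hasMetricsPort) sentMetrics.push_back(interval);
            interval = IntervalMetrics{};
        }
    }
};
```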

Output Ports

Output Functions
MetricsFunctions
<any T> T fromInput()

Takes the value from the latest input tuple. An input attribute that has the same name and type as the output attribute must exist.

<any T> T fromInput(rstring attributeName)

Returns the value of the input attribute that has the given name in the latest input tuple. An input attribute that has the given name and the same type as the output attribute must exist.

<any T> T fromVariable(rstring variableName)

Returns the value of the requested variable.

uint64 getRecordCount(rstring structureName)

Returns the number of records for the given structure.

map<rstring,uint64> getRecordCounts()

Returns the number of records for all structures.

uint64 getRecordByteCount(rstring structureName)

Returns the number of processed bytes for the given structure.

map<rstring,uint64> getRecordByteCounts()

Returns the number of processed bytes for all structures.

map<rstring,map<rstring,uint64>> getRecordStats()

Returns the number of records and the number of processed bytes for all structures.
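The record statistics functions can be thought of as views over two per-structure counters. The following C++ sketch keeps those counters in std::map objects keyed by structure name. The class RecordStats is hypothetical, returning 0 for an unknown structure name is an assumption, and the inner keys "records" and "bytes" that are used for getRecordStats are invented for illustration; this section does not document the actual key names.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

using CountMap = std::map<std::string, std::uint64_t>;

// Hypothetical bookkeeping behind the record statistics functions.
class RecordStats {
public:
    // Called once per parsed structure of type 'structureName'.
    void record(const std::string& structureName, std::uint64_t bytes) {
        counts_[structureName] += 1;
        byteCounts_[structureName] += bytes;
    }
    std::uint64_t getRecordCount(const std::string& name) const { return lookup(counts_, name); }
    CountMap getRecordCounts() const { return counts_; }
    std::uint64_t getRecordByteCount(const std::string& name) const { return lookup(byteCounts_, name); }
    CountMap getRecordByteCounts() const { return byteCounts_; }

    // Combined view; the inner keys "records" and "bytes" are hypothetical.
    std::map<std::string, CountMap> getRecordStats() const {
        std::map<std::string, CountMap> stats;
        for (const auto& entry : counts_) {
            stats[entry.first]["records"] = entry.second;
            stats[entry.first]["bytes"] = lookup(byteCounts_, entry.first);
        }
        return stats;
    }

private:
    // Unknown structure names yield 0 (assumption).
    static std::uint64_t lookup(const CountMap& m, const std::string& key) {
        auto it = m.find(key);
        return it == m.end() ? 0 : it->second;
    }
    CountMap counts_;
    CountMap byteCounts_;
};
```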

uint64 nTuplesReceivedTotal()

Returns the number of received tuples.

uint64 nTuplesSentTotal()

Returns the number of sent tuples.

uint64 nBytesReceivedTotal()

Returns the amount of received data (in bytes).

uint64 nBytesDroppedTotal()

Returns the amount of data (in bytes) that was dropped either because of detected unknown structures or window punctuations.

uint64 nTuplesReceived()

Returns the number of received tuples since the last sent metrics tuple.

uint64 nTuplesSent()

Returns the number of sent tuples since the last sent metrics tuple.

uint64 nBytesReceived()

Returns the amount of received data (in bytes) since the last sent metrics tuple.

uint64 nBytesDropped()

Returns the amount of data (in bytes) that was dropped since the last sent metrics tuple, either because unknown structures were detected or because of the latest window punctuation.

uint64 latestPunctuation()

Returns the time of the latest window punctuation, in seconds since the Epoch (00:00:00 UTC, January 1, 1970).
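Each total metric has a counterpart that covers only the interval since the last sent metrics tuple. The following C++ sketch shows that relationship for the received tuple and byte counts, and records latestPunctuation as seconds since the Epoch. The struct MetricsCounters and its member functions are hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical pairing of total counters with per-interval counters.
struct MetricsCounters {
    std::uint64_t nTuplesReceivedTotal = 0;  // never reset
    std::uint64_t nTuplesReceived = 0;       // reset after each metrics tuple
    std::uint64_t nBytesReceivedTotal = 0;   // never reset
    std::uint64_t nBytesReceived = 0;        // reset after each metrics tuple
    std::uint64_t latestPunctuation = 0;     // seconds since the Epoch

    void onTuple(std::uint64_t payloadBytes) {
        ++nTuplesReceivedTotal;
        ++nTuplesReceived;
        nBytesReceivedTotal += payloadBytes;
        nBytesReceived += payloadBytes;
    }

    void onPunctuation() {
        // system_clock counts time since the Unix Epoch.
        latestPunctuation = static_cast<std::uint64_t>(
            std::chrono::duration_cast<std::chrono::seconds>(
                std::chrono::system_clock::now().time_since_epoch()).count());
    }

    void onMetricsTupleSent() {
        // Only the interval counters are reset; totals keep accumulating.
        nTuplesReceived = 0;
        nBytesReceived = 0;
    }
};
```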

Ports (0)

The output port generates tuples from the binary data stream.

The StructureParse operator does not allow user-specified assignments to output attributes of this output port.

A value is assigned to an output attribute if one of the following conditions exists:

  • A mapping is specified in the mapping definition document, which results in a field or default value
  • An input attribute of the same name and type exists, which is forwarded from the input port to the output port
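The following C++ sketch expresses the two conditions above as a lookup. The function resolve, the Attr and Source types, and the rule that a mapping takes precedence over a forwarded input attribute are assumptions made for illustration; this section does not state a precedence or describe what happens to an attribute that meets neither condition.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical description of one attribute: name plus SPL type name.
struct Attr {
    std::string name;
    std::string type;
};

enum class Source { Mapping, ForwardedInput, Unassigned };

// Decide where the value of an output attribute of the first port comes from.
// 'mapped' holds the output attribute names that the mapping definition
// document covers; 'inputs' maps input attribute names to their SPL types.
// Assumption: a mapping takes precedence over forwarding.
Source resolve(const Attr& out, const std::set<std::string>& mapped,
               const std::map<std::string, std::string>& inputs) {
    if (mapped.count(out.name) > 0) return Source::Mapping;
    auto it = inputs.find(out.name);
    // Forwarding requires both the same name and the same type.
    if (it != inputs.end() && it->second == out.type) return Source::ForwardedInput;
    return Source::Unassigned;
}
```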
Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes.


Ports (1)

The optional second output port generates metrics and statistics tuples. It supports metric-related assignments to output attributes that use the custom output functions listed under MetricsFunctions.

Assignments
This operator allows any SPL expression of the correct type to be assigned to output attributes. Attributes that are not assigned in the output clause are automatically assigned from input port attributes that have the same name and type. If no such input attribute exists, an error is reported at compile time.


Parameters

This operator supports 5 parameters.
payloadAttribute

Specifies the input port blob attribute that holds the payload to be parsed. If the input port schema contains exactly one blob attribute, that attribute is selected automatically and this parameter is optional. If the schema contains more than one blob attribute, this parameter is mandatory.
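The selection rule for the payload attribute can be sketched in C++ as follows. In practice the rule is enforced when the SPL application is compiled; the function selectPayload and the Schema type are hypothetical, and an empty string stands for an unspecified payloadAttribute parameter.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical input schema: (attribute name, SPL type) pairs.
using Schema = std::vector<std::pair<std::string, std::string>>;

// Apply the payload rules: exactly one blob attribute is selected
// automatically; with several blob attributes, payloadAttribute must name one.
// An empty 'payloadAttribute' means that the parameter is not specified.
std::string selectPayload(const Schema& schema, const std::string& payloadAttribute) {
    std::vector<std::string> blobs;
    for (const auto& attr : schema)
        if (attr.second == "blob") blobs.push_back(attr.first);

    if (blobs.empty())
        throw std::invalid_argument("input schema needs at least one blob attribute");

    if (!payloadAttribute.empty()) {
        for (const auto& name : blobs)
            if (name == payloadAttribute) return name;
        throw std::invalid_argument("payloadAttribute must name a blob attribute");
    }
    if (blobs.size() > 1)
        throw std::invalid_argument("payloadAttribute is required with several blob attributes");
    return blobs.front();
}
```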


structureDocument

Specifies the path of the structure definition document, the XML document that describes the fixed-size data structures and their fields in the binary data stream.

The structure definition document is evaluated at compile time. If you modify the document, recompile the SPL application for the changes to take effect. After the application is compiled, the structure definition document is not required for job submission.

A relative path is resolved against the SPL application directory, which is the current working directory in which the sc command is run. For example, if you specify the etc/StructureDefinition.xml relative path and run the sc command from the /home/myapp directory, the compiler looks for the StructureDefinition.xml document in the /home/myapp/etc directory.
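The path rule above can be sketched in C++ as follows, using the example paths from this description. The function resolveDocumentPath is hypothetical; it assumes POSIX-style paths and assumes that an absolute path is used unchanged.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: resolve a structureDocument or mappingDocument value.
// 'workingDir' is the directory in which the sc command is run.
std::string resolveDocumentPath(const std::string& workingDir, const std::string& param) {
    // Assumption: an absolute path is used as-is.
    if (!param.empty() && param[0] == '/') return param;
    // A relative path is appended to the working directory.
    if (workingDir.empty() || workingDir.back() == '/') return workingDir + param;
    return workingDir + "/" + param;
}
```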


mappingDocument

Specifies the path of the mapping definition document, the XML document that describes the mapping of the fixed-size data structure fields to the SPL output attributes.

The mapping definition document is evaluated at compile time. If you modify the document, recompile the SPL application for the changes to take effect. After the application is compiled, the mapping definition document is not required for job submission.

A relative path is resolved against the SPL application directory, which is the current working directory in which the sc command is run. For example, if you specify the etc/MappingDefinition.xml relative path and run the sc command from the /home/myapp directory, the compiler looks for the MappingDefinition.xml document in the /home/myapp/etc directory.


metricsMode

Specifies the trigger mode to send a metrics tuple on the optional second output port. Valid values are punctuation, tuples, and bytes. The default value is punctuation. For tuples and bytes modes, the metricsModeThreshold parameter is mandatory. After each sent metrics tuple, a subset of the online metrics is reset.

For example, if punctuation is specified, a metrics tuple is sent when a window punctuation is received. If tuples is specified and the metricsModeThreshold parameter is set to 100, a metrics tuple is sent for every 100 input tuples.


metricsModeThreshold

Specifies the number of received bytes or tuples after which a metrics tuple on the optional second output port is triggered. After each sent metrics tuple, a subset of the online metrics is reset.

For example, if the metricsMode parameter is set to tuples and this parameter is set to 100, a metrics tuple is sent for every 100 input tuples.

This parameter is allowed only if the metricsMode parameter is set to tuples or bytes.
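The following C++ sketch shows how the metricsMode and metricsModeThreshold parameters could decide when a metrics tuple is sent. The class MetricsTrigger is hypothetical. The sketch treats a threshold of 0 as "not specified" and restarts the count at zero after each trigger; whether excess bytes carry over into the next interval in bytes mode is not specified in this description.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

enum class MetricsMode { Punctuation, Tuples, Bytes };

// Hypothetical trigger for the optional metrics output port.
class MetricsTrigger {
public:
    explicit MetricsTrigger(MetricsMode mode, std::uint64_t threshold = 0)
        : mode_(mode), threshold_(threshold) {
        // The threshold is mandatory for tuples and bytes modes...
        if (mode_ != MetricsMode::Punctuation && threshold_ == 0)
            throw std::invalid_argument("metricsModeThreshold is required");
        // ...and allowed only for those modes.
        if (mode_ == MetricsMode::Punctuation && threshold_ != 0)
            throw std::invalid_argument("metricsModeThreshold is not allowed");
    }

    // Returns true if a metrics tuple should be sent after this input tuple.
    bool onTuple(std::uint64_t payloadBytes) {
        if (mode_ == MetricsMode::Punctuation) return false;
        received_ += (mode_ == MetricsMode::Tuples) ? std::uint64_t{1} : payloadBytes;
        if (received_ >= threshold_) {
            received_ = 0;  // start a new interval
            return true;
        }
        return false;
    }

    // Returns true if a metrics tuple should be sent for a window punctuation.
    bool onPunctuation() const { return mode_ == MetricsMode::Punctuation; }

private:
    MetricsMode mode_;
    std::uint64_t threshold_;
    std::uint64_t received_ = 0;
};
```

With metricsMode set to tuples and a threshold of 100, 250 input tuples trigger two metrics tuples, which matches the example for the metricsModeThreshold parameter.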


Code Templates

StructureParse
stream<${schema}> ${outputStream} as O = StructureParse(${inputStream} as I) 
{
	param
		payloadAttribute: ${attributeName};
		structureDocument: "${structureDefinitionFile}";
		mappingDocument: "${mappingDefinitionFile}";
}
      

StructureParse with metrics
(
	stream<${schema}> ${records} as O;
	stream<${schema}> ${metrics} as M
) as ${ParsedRecords} = StructureParse(${inputStream} as I) 
{
	param
		payloadAttribute: ${attributeName};
		structureDocument: "${structureDefinitionFile}";
		mappingDocument: "${mappingDefinitionFile}";
	output M:
		${outputExpression};
}
      

StructureParse with DirectoryScan and FileSource
stream<rstring filename> ${Filenames} as O = DirectoryScan()
{
	param
		directory: "${inputDirectory}";
		pattern: "${filenamePattern}";
}

stream<rstring filename, int64 tupleNo, blob payload> ${DataBlocks} as O = FileSource(${Filenames} as I)
{
	param
		format: block;
		blockSize: ${blocksize}u;
	output O:
		filename = FileName(),
		tupleNo = TupleNumber();
}

stream<${schema}> ${outputStream} as O = StructureParse(${DataBlocks} as I) 
{
	param
		payloadAttribute: ${attributeName};
		structureDocument: "${structureDefinitionFile}";
		mappingDocument: "${mappingDefinitionFile}";
}
      

StructureParse with metrics, DirectoryScan, and FileSource
stream<rstring filename> ${Filenames} as O = DirectoryScan()
{
	param
		directory: "${inputDirectory}";
		pattern: "${filenamePattern}";
}

stream<rstring filename, int64 tupleNo, blob payload> ${DataBlocks} as O = FileSource(${Filenames} as I)
{
	param
		format: block;
		blockSize: ${blocksize}u;
	output O:
		filename = FileName(),
		tupleNo = TupleNumber();
}

(
	stream<${schema}> ${records} as O;
	stream<${schema}> ${metrics} as M
) as ${ParsedRecords} = StructureParse(${DataBlocks} as I) 
{
	param
		payloadAttribute: ${attributeName};
		structureDocument: "${structureDefinitionFile}";
		mappingDocument: "${mappingDefinitionFile}";
	output M:
		${outputExpression};
}
      

Metrics

nTuplesReceivedTotal - Counter

The number of received tuples.

nTuplesSentTotal - Counter

The number of sent tuples.

nBytesReceivedTotal - Counter

The amount of received data (in bytes).

nBytesDroppedTotal - Counter

The amount of dropped data (in bytes), for example, because unknown structures were detected or because of window punctuations.

nTuplesReceived - Gauge

The number of received tuples since the last sent metrics tuple. This value is reset after a metrics tuple is sent.

nTuplesSent - Gauge

The number of sent tuples since the last sent metrics tuple. This value is reset after a metrics tuple is sent.

nBytesReceived - Gauge

The amount of received data (in bytes) since the last sent metrics tuple. The value is reset after a metrics tuple is sent.

nBytesDropped - Gauge

The amount of dropped data (in bytes) since the last sent metrics tuple, for example, because of the latest window punctuation. The value is reset after a metrics tuple is sent.

latestPunctuation - Time

The time of the latest window punctuation, in seconds since the Epoch (00:00:00 UTC, January 1, 1970).

Libraries

Common Headers
Include Path: ../../../../impl/include/parser.binary