IBM InfoSphere Streams Version 4.1.0

Customizing the ITE application

To customize the ITE application, disable the sample code and adapt the customizable composite operators.

Before you begin

Disable the sample code by turning off the ite.embeddedSampleCode parameter, which is a configuration task. At the same time, assign your parsers to the ite.ingest.reader.parserList parameter.

About this task

Customizing the ITE application requires SPL development skills. Depending on your business needs, you modify one or more of the customizable composite operators.

Add your code to the customizable code areas that are marked with the SPL comments // custom code begin and // custom code end. Typically, the remaining SPL code stays unchanged.

The following composite operators can be customized:

  • <namespace>.chainprocessor.reader.custom::PreFileReader

    PreFileReader

    Implements file preprocessing that determines attribute values once per file, or determines the file type if the file type cannot be derived from the file name. Each input tuple must result in an output tuple.

    Turn on the ite.ingest.reader.preprocessing parameter to activate this composite operator.

    Add SPL attributes to the ExtendedFileInfo type that is defined in this composite operator.

  • <namespace>.chainprocessor.reader.custom::FileReaderCustom

    FileReaderCustom

    Integrates a parser operator that is either a built-in parser that requires a configuration other than the prepared configurations, or a parser that is not available as a built-in parser at all.

    The FileReaderASN1, FileReaderCSV, and FileReaderStructure composite operators in the <namespace>.chainprocessor.reader namespace integrate the com.ibm.streams.teda.parser.binary::ASN1Parse, com.ibm.streams.teda.parser.text::CSVParse, and com.ibm.streams.teda.parser.binary::StructureParse parser operators, respectively. These composite operators support a fixed set of parameters; review these parameters if you plan to use the composite operators. If other parameter settings are required, use the <namespace>.chainprocessor.reader.custom::FileReaderCustom composite operator to integrate the parser operators with the required settings.

    Use the <namespace>.chainprocessor.reader.custom::CustomParserTemplate template composite operator to implement and integrate your own parser. The template provides a skeleton that already reads a file with a spl.adapter::FileSource operator and forwards all relevant SPL attributes to the output port. Typically, you create a copy of the template file and implement your parser in the copy. For more information, see the code comments in the composite operator.

    Call your customized parser composite operator from within the <namespace>.chainprocessor.reader.custom::FileReaderCustom composite operator.

    Configure the ite.ingest.reader.parserList parameter to activate this composite operator, for example: ite.ingest.reader.parserList=*|FileReaderCustom

    If more than one new parser is required, create a copy of this composite operator, and then adapt and activate the copy.

  • <namespace>.chainprocessor.reader.custom::RecordConverter

    RecordConverter

    Implements a data conversion step that runs right after the input records are parsed. Typically, this step is required to rearrange or derive values that are needed during record validation. If validation is not needed, the data conversion steps can also be part of the <namespace>.chainprocessor.transformer.custom::DataProcessor composite operator.

    This composite operator is a template that can be copied to create additional record converters.

    You can enable the record conversion for the built-in file readers with their RecordConverterOperator parameter. Set this operator parameter in the <namespace>.chainprocessor.reader.custom::FileReaderCustom composite operator or its copies.

  • <namespace>.chainprocessor.reader.custom::RecordValidator

    RecordValidator

    Implements tuple validation and sends valid and invalid tuples to different output ports. For invalid tuples, it provides additional information that can be used to troubleshoot the problem, as sketched in the customizable code area.

  • <namespace>.chainsink.custom::RejectWriterCustom

    RejectWriterCustom

    Implements a user-defined output format for rejected tuples. For example, you might replace the spl.adapter::FileSink operator with another sink operator, or send alarms.

    Turn the ite.storage.rejectWriter.custom parameter on to activate this composite operator.

  • <namespace>.chainsink.custom::PostContextDataProcessor

    PostContextDataProcessor

    Implements processing of the tuples after they leave the group processing and before they enter the storage stage. For example, you might reject duplicate tuples.

    The composite operator receives the data stream and the statistics stream as input streams and must handle both of them properly. For example, it can modify valid tuples and forward them to the first output port, or it can reject tuples and send them to the third output port. If the post-processing tap is enabled (ite.businessLogic.group.tap=on), the composite operator must also provide these tuples on the fourth output port. Finally, it modifies incoming statistics tuples and forwards them to the second output port.

    Turn the ite.businessLogic.transformation.postprocessing.custom parameter on to activate this composite operator.

  • <namespace>.chainsink.custom::AuditTableWriter

    AuditTableWriter

    Implements processing for file statistics, for example, to write the statistics to a database or to export them to another application. The input stream must be forwarded to the output port.

    Turn the ite.storage.auditOutputs parameter on to activate this composite operator.

  • <namespace>.chainsink.custom::FileWriterCustom

    FileWriterCustom

    Implements one or more sink operators and any required distribution of each tuple to one or more of these sink operators. For example, you can implement your own file naming scheme or use sink operators other than the spl.adapter::FileSink operator.

    Set the ite.storage.type parameter to custom to activate this composite operator.

  • <namespace>.tap.custom::TransformerTap

    TransformerTap

    Implements features that do not alter the data that the main business logic stores in the files. For example, the tap logic filters tuples and sends an event to another application or system when the filter condition is met. You might use the spl.adapter::Export operator or any sink operator, such as the spl.adapter::TCPSink operator, with the tap tuples.

    Turn the ite.businessLogic.transformation.tap parameter on to activate this composite operator.

  • <namespace>.tap.custom::PostContextDataProcessorTap

    PostContextDataProcessorTap

    Implements features that do not alter the data that the main business logic stores in the files. For example, the tap logic filters tuples and sends an event to another application or system when the filter condition is met. You might use the spl.adapter::Export operator or any sink operator, such as the spl.adapter::TCPSink operator, with the tap tuples.

    Turn the ite.businessLogic.group.tap parameter on to activate this composite operator.

  • <namespace>.context.custom::ContextDataProcessor

    ContextDataProcessor

    Implements processing that is applied to a group of tuples. The tuples of a group can belong to multiple input files. Typically, one or more operators in this composite operator are stateful; for example, they aggregate tuples.

    Turn the ite.businessLogic.group.custom parameter on to activate this composite operator.

  • <namespace>.chainprocessor.transformer.custom::DataProcessor

    DataProcessor

    Implements the business logic that can consist of, for example, filtering, enrichment, calculations, or other transformations.

    This composite operator is the only one that must be adapted during the customization process.

  • <namespace>.fileingestion.custom::FileSort

    FileSort

    Implements an algorithm that sorts the file names that are collected during a directory scan cycle. A window punctuation marks the end of the set of file names to be sorted.

    For example, you can use sorting to ensure that the oldest files in the set are processed first.

    Set the ite.ingest.directoryScan.sort parameter to custom to activate this composite operator.

  • <namespace>.fileingestion.custom::FileTypeValidator

    FileTypeValidator

    Implements file type validation. The determined file type is used to send the file content to the parser that can handle this content.

    For files with a valid type, the file type ID is set and the tuple passes to the first output port; tuples for invalid or unknown file types go to the second output port. The implemented algorithm, which can evaluate the file name or the file content, for example, must be consistent with the file types that are specified with the ite.ingest.reader.parserList parameter.

    Turn the ite.ingest.customFileTypeValidator parameter on to activate this composite operator.

  • <namespace>.housekeeping.context.custom::CheckpointFileRemover

    CheckpointFileRemover

    Implements the algorithm that decides which checkpoint files of the custom context are obsolete and therefore removed, and which files are processed during the startup and housekeeping phases.

    The algorithm typically calculates the age of a file and removes the file if it is too old. For files that are kept, a tuple is sent to the output port.

    The default implementation removes files whose modification time is more than ite.businessLogic.group.custom.daysToKeep days before the current time.

    You can customize the algorithm to fit your needs. For example, you can extract the time information from the file names, or you can suppress the tuples for the kept files if you do not want these files to be processed during the startup or housekeeping phases.

    For more information, see the code comments.
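The FileSort behavior described in the list (collect the file names of one directory scan cycle, sort them when the window punctuation arrives, oldest first) can be illustrated with the following sketch. It is written in Python rather than SPL and is not the ITE implementation; the FileInfo type, its mtime attribute, and the WindowPunctuation marker are hypothetical stand-ins for the SPL tuple and the window punctuation.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Union


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for a file name tuple from the directory scan."""
    name: str
    mtime: float  # file modification time in epoch seconds (assumed attribute)


class WindowPunctuation:
    """Hypothetical marker that closes the file name set of one scan cycle."""


PUNCT = WindowPunctuation()
Item = Union[FileInfo, WindowPunctuation]


def sort_scan_cycles(stream: Iterable[Item]) -> Iterator[Item]:
    """Buffer file names until a punctuation arrives, then emit them oldest first."""
    batch: List[FileInfo] = []
    for item in stream:
        if isinstance(item, WindowPunctuation):
            # End of the set: emit the sorted batch, then forward the punctuation.
            yield from sorted(batch, key=lambda f: (f.mtime, f.name))
            batch.clear()
            yield item
        else:
            batch.append(item)
    # Names that arrive after the last punctuation are flushed at end of stream.
    yield from sorted(batch, key=lambda f: (f.mtime, f.name))
```

The file name is used as a secondary sort key so that files with identical modification times are emitted in a stable, predictable order.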
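The routing rule of the FileTypeValidator composite operator (valid types get a file type ID and go to the first output port, invalid or unknown types go to the second output port) can be sketched in Python as follows. This is an illustration, not the ITE code; the suffix-to-type mapping and the type IDs "csv" and "asn1" are hypothetical and would have to match whatever types you configure in ite.ingest.reader.parserList.

```python
import os
from typing import Dict, Optional, Tuple

# Hypothetical mapping from file name suffix to file type ID. In a real
# application, the IDs must match the types in ite.ingest.reader.parserList.
FILE_TYPE_BY_SUFFIX: Dict[str, str] = {
    ".csv": "csv",
    ".ber": "asn1",
}

VALID_PORT = 0    # first output port: valid file types
INVALID_PORT = 1  # second output port: invalid or unknown file types


def validate_file_type(path: str) -> Tuple[int, Optional[str]]:
    """Return (output port, file type ID) for one file name."""
    suffix = os.path.splitext(path)[1].lower()
    type_id = FILE_TYPE_BY_SUFFIX.get(suffix)
    if type_id is None:
        return INVALID_PORT, None
    return VALID_PORT, type_id
```

This sketch evaluates only the file name; an implementation that inspects the file content would read the first bytes of the file instead.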
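The default CheckpointFileRemover rule (remove checkpoint files whose modification time is more than daysToKeep days in the past, and send a tuple for each kept file) can be sketched in Python as a pure partitioning function. This is an illustration under assumptions, not the ITE implementation: the CheckpointFile type is hypothetical, and the actual deletion (for example, with os.remove) is left to the caller.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple

SECONDS_PER_DAY = 24 * 60 * 60


@dataclass(frozen=True)
class CheckpointFile:
    path: str
    mtime: float  # modification time in epoch seconds


def partition_checkpoint_files(
    files: Iterable[CheckpointFile], days_to_keep: int, now: float
) -> Tuple[List[CheckpointFile], List[CheckpointFile]]:
    """Split files into (to_remove, to_keep) by age relative to `now`."""
    cutoff = now - days_to_keep * SECONDS_PER_DAY
    to_remove: List[CheckpointFile] = []
    to_keep: List[CheckpointFile] = []
    for f in files:
        # Older than days_to_keep days: obsolete. Otherwise keep the file; the
        # operator would send a tuple for it to the output port.
        (to_remove if f.mtime < cutoff else to_keep).append(f)
    return to_remove, to_keep
```

Passing `now` explicitly instead of reading the clock inside the function keeps the decision deterministic and easy to test; a customized variant could derive `mtime` from the file name instead.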
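The RecordValidator pattern (valid tuples to one output port, invalid tuples with a rejection reason to another) is sketched below in Python. The record attributes callId and duration and both rejection reasons are hypothetical; in an ITE application, you would add real reasons to the TypesCustom.rejectReason enumeration and implement the checks in SPL.

```python
from enum import Enum
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


class RejectReason(Enum):
    # Hypothetical reasons; in ITE, extend TypesCustom.rejectReason instead.
    MISSING_CALL_ID = "missingCallId"
    NEGATIVE_DURATION = "negativeDuration"


def validate_records(
    records: List[Record],
) -> Tuple[List[Record], List[Tuple[Record, RejectReason]]]:
    """Route each record to the valid list or to the invalid list with a reason."""
    valid: List[Record] = []
    invalid: List[Tuple[Record, RejectReason]] = []
    for rec in records:
        if not rec.get("callId"):
            invalid.append((rec, RejectReason.MISSING_CALL_ID))
        elif rec.get("duration", 0) < 0:
            invalid.append((rec, RejectReason.NEGATIVE_DURATION))
        else:
            valid.append(rec)
    return valid, invalid
```

Attaching the reason to each invalid record is what makes the rejected output useful for troubleshooting.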
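The port layout of the PostContextDataProcessor composite operator (first port: valid tuples, second port: statistics, third port: rejected tuples, fourth port: tap tuples when ite.businessLogic.group.tap=on) can be illustrated with a duplicate-rejection sketch in Python. It assumes a hypothetical recordKey attribute as the deduplication key and assumes that the tap receives a copy of the forwarded tuples; it is not the ITE implementation.

```python
from typing import Any, Dict, List, Set

Record = Dict[str, Any]

DATA_PORT, STATS_PORT, REJECT_PORT, TAP_PORT = 0, 1, 2, 3


def post_context_process(
    data: List[Record], statistics: List[Record], tap_enabled: bool
) -> Dict[int, List[Record]]:
    """Reject duplicate tuples and distribute all tuples to the four ports."""
    ports: Dict[int, List[Record]] = {
        p: [] for p in (DATA_PORT, STATS_PORT, REJECT_PORT, TAP_PORT)
    }
    seen: Set[Any] = set()  # state kept across tuples, as in a stateful operator
    for rec in data:
        key = rec["recordKey"]  # hypothetical deduplication key
        if key in seen:
            ports[REJECT_PORT].append(rec)  # duplicate: third output port
            continue
        seen.add(key)
        ports[DATA_PORT].append(rec)  # valid tuple: first output port
        if tap_enabled:
            ports[TAP_PORT].append(rec)  # tap copy: fourth output port
    # Statistics tuples are forwarded unchanged here; a real implementation
    # might adjust them, for example to account for rejected duplicates.
    ports[STATS_PORT].extend(statistics)
    return ports
```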
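The stateful group processing of the ContextDataProcessor composite operator, where a group can contain tuples from several input files, can be illustrated with a running aggregation in Python. The subscriber, duration, and sourceFile attributes are hypothetical, and the sketch is not the ITE implementation.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable


def aggregate_group(tuples: Iterable[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Keep running totals per group key; tuples may come from several files."""
    state: Dict[str, Dict[str, Any]] = defaultdict(
        lambda: {"count": 0, "totalDuration": 0, "files": set()}
    )
    for t in tuples:
        agg = state[t["subscriber"]]  # hypothetical group key
        agg["count"] += 1
        agg["totalDuration"] += t["duration"]
        agg["files"].add(t["sourceFile"])
    return dict(state)
```

Because the state outlives any single file, an SPL implementation of this kind of logic also needs the checkpointing that the CheckpointFileRemover composite operator manages.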

The following types can be customized:

  • <namespace>.streams.custom::ReaderRecordType (ReaderTypes.spl)

    Defines the common record schema, which is the superset of all required attributes across all parsing operators. For example, suppose that your application supports the ASN.1 and CSV input formats. Each input format has a large set of attributes, but your business logic requires only three attributes from the ASN.1 input and four attributes from the CSV input. Two of these attributes have the same semantics in both input formats. The common record schema therefore has five attributes: the two attributes that are shared by both inputs, one attribute for ASN.1 inputs only, and two attributes for CSV inputs only.

  • <namespace>.streams.custom::TypesCustom.rejectReason (enumeration)

    Add further rejection reasons for tuples. The rejection reasons can be used during record validation or transformation in the following composite operators:

    • <namespace>.chainprocessor.reader.custom::RecordValidator
    • <namespace>.chainprocessor.transformer.custom::DataProcessor
    • <namespace>.chainsink.custom::PostContextDataProcessor

  • <namespace>.streams.custom::TypesCustom.LookupType

    Defines the SPL attributes that get the enrichment data values assigned.

  • <namespace>.streams.custom::TypesCustom.ExtendedTableStream

    Specifies SPL attributes that are still accessible after the schema reduction from the business data schema (<namespace>.streams.custom::ReaderRecordType) to the table schema, which consists of a table name and a row attribute only. Typically, you set the ExtendedTableStream attributes in the <namespace>.chainprocessor.transformer.custom::DataProcessor composite operator and use them in the <namespace>.context.custom::ContextDataProcessor composite operator.

    For example, the provided demo application transforms the business data schema to the table schema in the demoapp.chainprocessor.transformer.custom::DataProcessor composite operator. The business data attributes no longer exist after this transformation. Because the demoapp.context.custom::ContextDataProcessor composite operator, which aggregates information, needs some business data to implement its functionality, the demo application customizes the <namespace>.streams.custom::TypesCustom.ExtendedTableStream type.

    Set the ite.businessLogic.transformation.outputType parameter to extendedTableStream to activate this type.

  • <namespace>.streams.custom::TypesCustom.Table1 and <namespace>.streams.custom::TypesCustom.Table1StreamType (type templates)

    Specify the schema and additional information for a table file. For each table file, copy, rename, and adapt these type definitions. The last attribute of each type definition is mandatory. The hashCode attribute must be present if the ite.businessLogic.group.deduplication parameter is set to on; otherwise, it must be absent.

    If the ite.businessLogic.transformation.outputType parameter is set to tableStream or extendedTableStream, you can use these type definitions in the following composite operators to send table records with the <namespace>.utility::TableRowGenerator operator:

    • <namespace>.chainprocessor.transformer.custom::DataProcessor
    • <namespace>.chainsink.custom::PostContextDataProcessor

  • <namespace>.streams.custom::TypesCustom.TransformedRecord

    Specifies the output schema of the pre-context business logic (<namespace>.chainprocessor.transformer.custom::DataProcessor) that is used if the ite.businessLogic.transformation.outputType parameter is set to recordStream.

  • <namespace>.streams.custom::TypesCustom.ChainSinkType

    Specifies the output schema of the post-context business logic (<namespace>.chainsink.custom::PostContextDataProcessor) that is used if the ite.businessLogic.transformation.outputType parameter is set to recordStream and the ite.storage.type parameter is set to recordFile or custom. If the ite.storage.type parameter is set to tableFile, the output type is <namespace>.streams::TypesCommon.TableDataType, which is not customizable.

    Typically, you customize this type when attributes that the business logic requires must be removed before the result files are written.

  • <namespace>.streams.custom::TypesCustom.CustomParserStatisticsStreamType

    Specifies the parser statistics if the ite.ingest.reader.customParserStatistics parameter is set to on. The specified SPL attributes replace the default statistics of the ASN.1, CSV, and structure parsers.

    The default statistics of the built-in parsers use the statistics that the CSVParse, ASN1Parse, and StructureParse operators provide.

  • <namespace>.streams.custom::TypesCustom.CustomFileStatisticsStreamType

    Specifies the file statistics if the ite.ingest.reader.customFileStatistics parameter is set to on. The specified SPL attributes replace the default table statistics, which are specified with the <namespace>.streams::TypesCommon.TableStatistics type and provide the numbers of new records, existing records that must be updated, and duplicate records.
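The attribute arithmetic in the ReaderRecordType example (three required ASN.1 attributes, four required CSV attributes, two of them shared, five attributes in the common record schema) can be checked with a short Python sketch using set operations. The attribute names are hypothetical; the real schema is an SPL tuple type in ReaderTypes.spl.

```python
# Hypothetical attribute names; "callingNumber" and "startTime" stand for the
# two attributes with the same semantics in both input formats.
ASN1_REQUIRED = {"callingNumber", "startTime", "cellId"}
CSV_REQUIRED = {"callingNumber", "startTime", "tariffClass", "roamingFlag"}

shared = ASN1_REQUIRED & CSV_REQUIRED      # 2 attributes
asn1_only = ASN1_REQUIRED - CSV_REQUIRED   # 1 attribute
csv_only = CSV_REQUIRED - ASN1_REQUIRED    # 2 attributes
common_record_schema = sorted(ASN1_REQUIRED | CSV_REQUIRED)

print(len(shared), len(asn1_only), len(csv_only), len(common_record_schema))
# prints: 2 1 2 5
```

The union, not the concatenation, of the per-format requirements defines the common record schema, which is why the shared attributes are counted only once.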
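The hashCode rule for the TypesCustom.Table1 type templates (the attribute must be present when ite.businessLogic.group.deduplication is on and absent when it is off) can be expressed as a small consistency check. This Python sketch is a hypothetical helper, not part of ITE; it checks only the hashCode rule, not the mandatory last attribute, and the attribute lists in the usage are made up.

```python
from typing import List, Sequence


def check_table_type(attributes: Sequence[str], deduplication_on: bool) -> List[str]:
    """Return the problems found in a table type against the hashCode rule."""
    problems: List[str] = []
    has_hash_code = "hashCode" in attributes
    if deduplication_on and not has_hash_code:
        problems.append("hashCode is required when deduplication is on")
    if not deduplication_on and has_hash_code:
        problems.append("hashCode must be absent when deduplication is off")
    return problems
```

A check like this could run over every copied table type before a build, so that a mismatch between the type definitions and the deduplication setting is caught early.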
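The parameter combinations that select the output schema of the post-context business logic (TypesCustom.ChainSinkType or TypesCommon.TableDataType) can be summarized as a lookup function. This Python sketch reflects one reading of the description of the ChainSinkType type, in which storage type tableFile always yields TableDataType; combinations that this topic does not describe return None. It is an illustration, not ITE code.

```python
from typing import Optional

NS = "<namespace>"  # placeholder kept from the documentation


def post_context_output_type(output_type: str, storage_type: str) -> Optional[str]:
    """Return the post-context output schema described in this topic, or None."""
    if storage_type == "tableFile":
        return f"{NS}.streams::TypesCommon.TableDataType"  # not customizable
    if output_type == "recordStream" and storage_type in ("recordFile", "custom"):
        return f"{NS}.streams.custom::TypesCustom.ChainSinkType"
    return None  # combination not covered by this topic
```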

Procedure

Based on your business needs, decide which customizable composite operators need to be modified. For each identified composite operator, complete the following steps:

  1. Open the composite operator with the SPL editor.
  2. Modify the composite operator.
  3. If required, complete configuration tasks.
  4. Save the composite operator.
  5. Build the application.

Customizable composite operators and their inputs and outputs

The following figure shows the customizable composite operators, their inputs and outputs, and how these composite operators are connected to each other.