KafkaConsumer node

Use the KafkaConsumer node to connect to the Kafka messaging system and to receive messages that are published on a Kafka topic. IBM® Integration Bus can then propagate these messages in a message flow.

The KafkaConsumer node is available in the following operation modes:
  • Developer
  • Application Integration Suite
  • Standard
  • Advanced
  • Express
  • Scale
  • Adapter
For more information, see Operation modes.

This topic contains the following sections:

Purpose

You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer then receives messages published on the Kafka topic as input to the message flow. Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information, see Processing Kafka messages.

For information about the supported versions of Kafka, see IBM Integration Bus system requirements. For more information about Kafka version compatibility, see the Apache Kafka documentation.

The KafkaConsumer node handles messages in the following message domains:

  • DFDL
  • XMLNSC
  • JSON
  • BLOB
  • MIME
  • XMLNS
  • MRM

The KafkaConsumer node is contained in the Kafka drawer of the palette, and is represented in the IBM Integration Toolkit by the following icon:

KafkaConsumer node icon

Using the KafkaConsumer node in a message flow

Use the KafkaConsumer node in a message flow to receive messages that are published to topics that are hosted on a Kafka server. The received messages can then be processed in a message flow. For information about how to use the node, see Consuming messages from Kafka topics.

Terminals and properties

The KafkaConsumer node terminals are described in the following table.

Terminal Description
Failure The output terminal to which the message is routed if a failure is detected during processing in the node.
Out The output terminal to which the message is routed if it represents successful completion of the Kafka request, and if further processing is required within this message flow.
Catch The output terminal to which the message is routed if an exception is thrown downstream and caught by this node. Exceptions are caught only if this terminal is attached.

The following tables describe the node properties. The column headed M indicates whether the property is mandatory (marked with an asterisk on the panel if you must enter a value when no default is defined); the column headed C indicates whether the property is configurable (you can change the value when you add the message flow to the BAR file to deploy it).

The KafkaConsumer node Description properties are described in the following table.

Property M C Default Description
Node name No No The node type, KafkaConsumer The name of the node.
Short description No No   A brief description of the node.
Long description No No   Text that describes the purpose of the node in the message flow.

The KafkaConsumer node Basic properties are described in the following table.

Property M C Default Description mqsiapplybaroverride command property
Topic name Yes Yes   The name of the Kafka topic to which you want to subscribe. Only one topic name can be specified. The name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). topicName
Bootstrap servers Yes Yes   A list of host/port pairs, separated by spaces, to use for establishing the initial connection to the Kafka cluster. bootstrapServers
Consumer group ID Yes Yes   A string that identifies the consumer group to which this consumer belongs. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). groupId
Default message offset Yes Yes Latest The message offset for the consumer if no message offset exists, such as the first time the consumer starts, or if the message offset is not valid. Possible values are:
  • Earliest
  • Latest
initialOffset
Commit message offset in Kafka No Yes Yes Automatically saves the current message offset in Kafka, which allows messages to be consumed from the saved position when the consumer is restarted. enableAutoCommit
Client ID No Yes   The client name to be used when connecting to Kafka. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). clientId
Add IIB suffix to client ID No Yes Yes If selected, this property suffixes the provided client ID with the following string:
'integration_server_name'-'integration_node_name'
This option is selected by default.
useClientIdSuffix

The KafkaConsumer node Advanced properties are described in the following table.

Property M C Default Description mqsiapplybaroverride command property
Connection timeout (sec) Yes Yes 15 The maximum time (in seconds) that the KafkaConsumer node waits to establish a connection with the Kafka server. The Connection timeoutvalue must be greater than the Session timeout value. connectionTimeout
Session timeout (sec) Yes Yes 10 The timeout used to detect KafkaConsumer node failures when using Kafka's group management facility. The KafkaConsumer node sends periodic heartbeats to indicate its liveness to the Kafka server. If no heartbeats are received by the Kafka server before the expiration of this session timeout, the Kafka server removes this Kafka consumer from the group and initiates a rebalance. The minimum valid value for this property is 10 seconds, which ensures that the session timeout is greater than the length of time between heartbeats. The Session timeout value must be less than the Connection timeout value. sessionTimeout
Receive batch size Yes Yes 1 The number of records that are received from the Kafka server in a single batch. This property is used for the max.poll.records value specified by the KafkaConsumer node when receiving messages from the Kafka server. receiveBatchSize

The KafkaConsumer node Security properties are described in the following table.

Property M C Default Description mqsiapplybaroverride command property
Security protocol Yes Yes PLAINTEXT The protocol to be used when communicating with the Kafka server. Valid values are:
  • PLAINTEXT
  • SSL
  • SASL_PLAINTEXT
  • SASL_SSL

If you are using IBM Event Streams, this property must be set to SASL_SSL.

If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to authenticate with the Kafka server, by running the mqsisetdbparms command with a DataSource name of kafka::KAFKA::integration_server_name.

Start of changeAlternatively, to configure the KafkaConsumer node to use a security identity other than the default kafka::KAFKA::integration_server_name, update the value of the Security identity property. If the Security identity field is blank, the default value kafka::KAFKA::integration_server_name is used.End of change

securityProtocol
SSL protocol Yes Yes TLSv1_2 The SSL protocol that is used when the selected value for the Security protocol property is either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can specify an alternative value:
  • TLSv1
  • TLSv1.1
  • TLSv1.2
sslProtocol

The KafkaConsumer node Input Message Parsing properties are described in the following table.

Property M C Default Description mqsiapplybaroverride command property
Message domain No No BLOB The parser domain that is used to parse the input message. If the field is blank, the default is BLOB.  
Message model No No Cleared The name or location of the message model schema file in which the message is defined.

When you click Browse, you see a list of available message model schema files for the selected Message domain.

 
Message No No Cleared The name or location of the message root within your message model schema file. This list is populated with all available messages that are defined in the Message model that you selected.  
Physical format No No Cleared The name of the physical format of the message. If you are using the MRM or IDOC parser, select the physical format of the incoming message from the list. This list includes all the physical formats that you have defined for the selected message model. If you set the Message domain property to DataObject, you can set this property to XML or SAP ALE IDoc. Set this property to SAP ALE IDoc when you have to parse a bit stream from an external source and generate a message tree.  

The KafkaConsumer node Parser Options properties are described in the following table.

Property M C Default Description
Parse timing No No On Demand This property controls when an input message is parsed. Valid values are On Demand, Immediate, and Complete.

For a full description of this property, see Parsing on demand.

Build tree using XML schema data types No No Cleared This property controls whether the syntax elements in the message tree have data types taken from the XML schema.
Use XMLNSC compact parser for XMLNS domain No No Cleared This property specifies whether the XMLNSC Compact Parser is used for messages in the XMLNS domain. If you set this property, the message data is displayed under XMLNSC in nodes that are connected to the output terminal when the input MQRFH2 header is XMLNS.
Retain mixed content No No Cleared This property controls whether the XMLNSC parser creates elements in the message tree for mixed text in an input message. If you select the check box, elements are created for mixed text. If you clear the check box, mixed text is ignored and no elements are created.
Retain comments No No Cleared This property specifies whether the XMLNSC parser creates elements in the message tree for comments in an input message. If you select the check box, elements are created for comments. If you clear the check box, comments are ignored and no elements are created.
Retain processing instructions No No Cleared This property controls whether the XMLNSC parser creates elements in the message tree for processing instructions in an input message. If you select the check box, elements are created for processing instructions. If you clear the check box, processing instructions are ignored and no elements are created.
Opaque elements No No Blank This property is used to specify a list of elements in the input message that are to be opaquely parsed by the XMLNSC parser. Opaque parsing is performed only if validation is not enabled (that is, if Validate is None); entries that are specified in Opaque Elements are ignored if validation is enabled.

The KafkaConsumer node Validation properties are described in the following table.

For a full description of these properties see Validation properties.

Property M C Default Description mqsiapplybaroverride command property
Validate No Yes None This property controls whether validation takes place. Valid values are None, Content and Value, and Content. validateMaster
Validation failure action No Yes Exception This property controls what happens if validation fails. You can set this property only if you set Validate to Content or Content and Value. Valid values are User Trace, Local Error Log, Exception, and Exception List. validateFailureAction
The KafkaConsumer node Instances properties are described in the following table. For a full description of these properties, refer to Configurable properties in a BAR file.
Property M C Default Description mqsiapplybaroverride command property
Additional instances pool No Yes Use Pool Associated with Message Flow The pool from which additional instances are obtained.
  • If you select Use Pool Associated with Message Flow, additional instances are obtained from the message flow value.
  • If you select Use Pool Associated with Node, additional instances are allocated from the additional instances of the node based on the number specified in the Additional instances property.
componentLevel
Additional instances No Yes 0 The number of additional instances that the node can start if the Additional instances pool property is set to Use Pool Associated with Node. By default, no additional instances are given to the node. additionalInstances
The Monitoring properties of the node are described in the following table.
Property M C Default Description
Events No No None Events that you have defined for the node are displayed on this tab. By default, no monitoring events are defined on any node in a message flow. Use Add, Edit, and Delete to create, change or delete monitoring events for the node; see Configuring monitoring event sources by using monitoring properties for details.

You can enable and disable events that are shown here by selecting or clearing the Enabled check box.

Local environment overrides

You can view values for Kafka topics that have been received by the KafkaConsumer node. For more information, see Using local environment variables with Kafka nodes.