KafkaConsumer node
Use the KafkaConsumer node to connect to the Kafka messaging system and to receive messages that are published on a Kafka topic. IBM® Integration Bus can then propagate these messages in a message flow.
- Developer
- Application Integration Suite
- Standard
- Advanced
- Express
- Scale
- Adapter
Purpose
You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. The KafkaConsumer node then receives messages that are published on the Kafka topic as input to the message flow. Each KafkaConsumer node consumes messages from a single topic; however, if the topic is defined to have multiple partitions, the KafkaConsumer node can receive messages from any of the partitions. For more information, see Processing Kafka messages.
For information about the supported versions of Kafka, see IBM Integration Bus system requirements. For more information about Kafka version compatibility, see the Apache Kafka documentation.
The KafkaConsumer node handles messages in the following message domains:
- DFDL
- XMLNSC
- JSON
- BLOB
- MIME
- XMLNS
- MRM
The KafkaConsumer node is contained in the Kafka drawer of the palette in the IBM Integration Toolkit.
Using the KafkaConsumer node in a message flow
Use the KafkaConsumer node in a message flow to receive messages that are published to topics that are hosted on a Kafka server. The received messages can then be processed in a message flow. For information about how to use the node, see Consuming messages from Kafka topics.
Terminals and properties
The KafkaConsumer node terminals are described in the following table.
Terminal | Description |
---|---|
Failure | The output terminal to which the message is routed if a failure is detected during processing in the node. |
Out | The output terminal to which the message is routed if it represents successful completion of the Kafka request, and if further processing is required within this message flow. |
Catch | The output terminal to which the message is routed if an exception is thrown downstream and caught by this node. Exceptions are caught only if this terminal is attached. |
The following tables describe the node properties. The column headed M indicates whether the property is mandatory (marked with an asterisk on the panel if you must enter a value when no default is defined); the column headed C indicates whether the property is configurable (you can change the value when you add the message flow to the BAR file to deploy it).
The KafkaConsumer node Description properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Node name | No | No | The node type, KafkaConsumer | The name of the node. |
Short description | No | No | | A brief description of the node. |
Long description | No | No | | Text that describes the purpose of the node in the message flow. |
The KafkaConsumer node Basic properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Topic name | Yes | Yes | | The name of the Kafka topic to which you want to subscribe. Only one topic name can be specified. The name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | topicName |
Bootstrap servers | Yes | Yes | | A list of host/port pairs, separated by spaces, to use for establishing the initial connection to the Kafka cluster. | bootstrapServers |
Consumer group ID | Yes | Yes | | A string that identifies the consumer group to which this consumer belongs. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | groupId |
Default message offset | Yes | Yes | Latest | The message offset for the consumer if no message offset exists, such as the first time the consumer starts, or if the message offset is not valid. Possible values are Earliest and Latest. | initialOffset |
Commit message offset in Kafka | No | Yes | Yes | Automatically saves the current message offset in Kafka, which allows messages to be consumed from the saved position when the consumer is restarted. | enableAutoCommit |
Client ID | No | Yes | | The client name to be used when connecting to Kafka. This value can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). | clientId |
Add IIB suffix to client ID | No | Yes | Yes | If selected, this property appends an IIB-specific suffix to the provided client ID. This option is selected by default. | useClientIdSuffix |
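The naming rule shared by the Topic name, Consumer group ID, and Client ID properties can be checked before a flow is deployed. The following Python sketch is illustrative only: the function name `is_valid_kafka_name` is hypothetical and is not part of IBM Integration Bus, and the sketch assumes that an empty string is rejected (the Client ID property itself is optional).

```python
import re

# Allowed characters and maximum length, as stated in the Basic properties table:
# a-z, A-Z, 0-9, dot, underscore, and dash, up to 255 characters.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9._-]{1,255}")


def is_valid_kafka_name(value: str) -> bool:
    """Return True if value is 1-255 characters long and uses only allowed characters.

    Hypothetical helper for illustration; rejecting the empty string is an assumption.
    """
    return _NAME_PATTERN.fullmatch(value) is not None


if __name__ == "__main__":
    for candidate in ("orders.v1_test-topic", "orders/topic", "a" * 256):
        print(len(candidate), is_valid_kafka_name(candidate))
```

A check of this kind could be run against the values that you intend to set for topicName, groupId, and clientId.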
The KafkaConsumer node Advanced properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Connection timeout (sec) | Yes | Yes | 15 | The maximum time (in seconds) that the KafkaConsumer node waits to establish a connection with the Kafka server. The Connection timeout value must be greater than the Session timeout value. | connectionTimeout |
Session timeout (sec) | Yes | Yes | 10 | The timeout used to detect KafkaConsumer node failures when using Kafka's group management facility. The KafkaConsumer node sends periodic heartbeats to indicate its liveness to the Kafka server. If no heartbeats are received by the Kafka server before the expiration of this session timeout, the Kafka server removes this Kafka consumer from the group and initiates a rebalance. The minimum valid value for this property is 10 seconds, which ensures that the session timeout is greater than the length of time between heartbeats. The Session timeout value must be less than the Connection timeout value. | sessionTimeout |
Receive batch size | Yes | Yes | 1 | The number of records that are received from the Kafka server in a single batch. This property is used for the max.poll.records value specified by the KafkaConsumer node when receiving messages from the Kafka server. | receiveBatchSize |
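The constraints between the two timeout properties can be expressed as a short consistency check. The following Python sketch is a minimal illustration, not product code: the function names are hypothetical, and the mapping of Session timeout to the Kafka `session.timeout.ms` client setting (in milliseconds) is an assumption. Only the mapping of Receive batch size to `max.poll.records` is stated in the table.

```python
def check_timeouts(connection_timeout_sec: int, session_timeout_sec: int) -> list:
    """Return a list of problems found in the Advanced timeout properties."""
    problems = []
    # The minimum valid Session timeout value is 10 seconds.
    if session_timeout_sec < 10:
        problems.append("Session timeout must be at least 10 seconds")
    # The Connection timeout value must be greater than the Session timeout value.
    if connection_timeout_sec <= session_timeout_sec:
        problems.append("Connection timeout must be greater than Session timeout")
    return problems


def kafka_client_settings(session_timeout_sec: int, receive_batch_size: int) -> dict:
    """Sketch of the Kafka client settings that correspond to the node properties."""
    return {
        # Assumption: the node passes the session timeout to Kafka in milliseconds.
        "session.timeout.ms": session_timeout_sec * 1000,
        # Stated in the table: Receive batch size is used for max.poll.records.
        "max.poll.records": receive_batch_size,
    }


if __name__ == "__main__":
    print(check_timeouts(15, 10))   # default values
    print(check_timeouts(10, 10))   # connection timeout not greater than session timeout
    print(kafka_client_settings(10, 1))
```

With the default values (15 and 10), the check reports no problems.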
The KafkaConsumer node Security properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Security protocol | Yes | Yes | PLAINTEXT | The protocol to be used when communicating with the Kafka server. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If you are using IBM Event Streams, this property must be set to SASL_SSL. If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that are used to authenticate with the Kafka server by running the mqsisetdbparms command with the required DataSource name. Alternatively, you can configure the KafkaConsumer node to use a security identity other than the default. | securityProtocol |
SSL protocol | Yes | Yes | TLSv1_2 | The SSL protocol that is used when the selected value for the Security protocol property is either SSL or SASL_SSL. You can select a value from the editable list, or you can specify an alternative value. | sslProtocol |
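The rules that tie the Security protocol value to credentials and to the SSL protocol property can be summarized in code. The following Python sketch is illustrative only: the function `security_requirements` and its `uses_event_streams` flag are hypothetical names, and the sketch does not call mqsisetdbparms or any IBM Integration Bus API.

```python
# Protocol names as they appear in the Security properties table.
KNOWN_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
SASL_PROTOCOLS = {"SASL_PLAINTEXT", "SASL_SSL"}
SSL_PROTOCOLS = {"SSL", "SASL_SSL"}


def security_requirements(security_protocol: str, uses_event_streams: bool = False) -> dict:
    """Report what else must be configured for a given Security protocol value."""
    if security_protocol not in KNOWN_PROTOCOLS:
        raise ValueError("Unknown security protocol: " + security_protocol)
    # IBM Event Streams requires the SASL_SSL protocol.
    if uses_event_streams and security_protocol != "SASL_SSL":
        raise ValueError("IBM Event Streams requires SASL_SSL")
    return {
        # A user ID and password must be configured for the SASL protocols.
        "needs_credentials": security_protocol in SASL_PROTOCOLS,
        # The SSL protocol property is used only for SSL and SASL_SSL.
        "ssl_protocol_applies": security_protocol in SSL_PROTOCOLS,
    }


if __name__ == "__main__":
    for protocol in sorted(KNOWN_PROTOCOLS):
        print(protocol, security_requirements(protocol))
```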
The KafkaConsumer node Input Message Parsing properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Message domain | No | No | BLOB | The parser domain that is used to parse the input message. If the field is blank, the default is BLOB. | |
Message model | No | No | Cleared | The name or location of the message model schema file in which the message is defined. When you click Browse, you see a list of available message model schema files for the selected Message domain. | |
Message | No | No | Cleared | The name or location of the message root within your message model schema file. This list is populated with all available messages that are defined in the Message model that you selected. | |
Physical format | No | No | Cleared | The name of the physical format of the message. If you are using the MRM or IDOC parser, select the physical format of the incoming message from the list. This list includes all the physical formats that you have defined for the selected message model. If you set the Message domain property to DataObject, you can set this property to XML or SAP ALE IDoc. Set this property to SAP ALE IDoc when you have to parse a bit stream from an external source and generate a message tree. | |
The KafkaConsumer node Parser Options properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Parse timing | No | No | On Demand | This property controls when an input message is parsed. Valid values are On Demand, Immediate, and Complete. For a full description of this property, see Parsing on demand. |
Build tree using XML schema data types | No | No | Cleared | This property controls whether the syntax elements in the message tree have data types taken from the XML schema. |
Use XMLNSC compact parser for XMLNS domain | No | No | Cleared | This property specifies whether the XMLNSC Compact Parser is used for messages in the XMLNS domain. If you set this property, the message data is displayed under XMLNSC in nodes that are connected to the output terminal when the input MQRFH2 header is XMLNS. |
Retain mixed content | No | No | Cleared | This property controls whether the XMLNSC parser creates elements in the message tree for mixed text in an input message. If you select the check box, elements are created for mixed text. If you clear the check box, mixed text is ignored and no elements are created. |
Retain comments | No | No | Cleared | This property specifies whether the XMLNSC parser creates elements in the message tree for comments in an input message. If you select the check box, elements are created for comments. If you clear the check box, comments are ignored and no elements are created. |
Retain processing instructions | No | No | Cleared | This property controls whether the XMLNSC parser creates elements in the message tree for processing instructions in an input message. If you select the check box, elements are created for processing instructions. If you clear the check box, processing instructions are ignored and no elements are created. |
Opaque elements | No | No | Blank | This property is used to specify a list of elements in the input message that are to be opaquely parsed by the XMLNSC parser. Opaque parsing is performed only if validation is not enabled (that is, if Validate is None); entries that are specified in Opaque Elements are ignored if validation is enabled. |
The KafkaConsumer node Validation properties are described in the following table.
For a full description of these properties see Validation properties.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Validate | No | Yes | None | This property controls whether validation takes place. Valid values are None, Content and Value, and Content. | validateMaster |
Validation failure action | No | Yes | Exception | This property controls what happens if validation fails. You can set this property only if you set Validate to Content or Content and Value. Valid values are User Trace, Local Error Log, Exception, and Exception List. | validateFailureAction |
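The Validate property also determines whether two other settings take effect: Validation failure action applies only when validation is enabled, and Opaque elements entries are ignored when validation is enabled. The following Python sketch models that interaction under those stated rules. The function `effective_settings` is a hypothetical name used for illustration and is not part of the product.

```python
VALIDATE_VALUES = ("None", "Content and Value", "Content")
FAILURE_ACTIONS = ("User Trace", "Local Error Log", "Exception", "Exception List")


def effective_settings(validate, failure_action="Exception", opaque_elements=()):
    """Return the settings that take effect for a given Validate value."""
    if validate not in VALIDATE_VALUES:
        raise ValueError("Unknown Validate value: " + validate)
    if failure_action not in FAILURE_ACTIONS:
        raise ValueError("Unknown Validation failure action: " + failure_action)
    validation_enabled = validate != "None"
    return {
        "validation_enabled": validation_enabled,
        # The failure action can be set only if Validate is Content or Content and Value.
        "failure_action": failure_action if validation_enabled else None,
        # Opaque parsing is performed only if validation is not enabled.
        "opaque_elements": [] if validation_enabled else list(opaque_elements),
    }


if __name__ == "__main__":
    print(effective_settings("None", opaque_elements=["/order/payload"]))
    print(effective_settings("Content", "User Trace", ["/order/payload"]))
```

The element path `/order/payload` in the usage lines is a made-up example value.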
The KafkaConsumer node Instances properties are described in the following table.
Property | M | C | Default | Description | mqsiapplybaroverride command property |
---|---|---|---|---|---|
Additional instances pool | No | Yes | Use Pool Associated with Message Flow | The pool from which additional instances are obtained. | componentLevel |
Additional instances | No | Yes | 0 | The number of additional instances that the node can start if the Additional instances pool property is set to Use Pool Associated with Node. By default, no additional instances are given to the node. | additionalInstances |
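The names in the mqsiapplybaroverride command property columns are the keys that you use when you override configurable properties in a BAR file. The following Python sketch generates override entries in the `flowName#nodeLabel.propertyName=value` form. That form is an assumption based on the documented override syntax for node properties, so verify it against the mqsiapplybaroverride command reference. The flow name `OrdersFlow`, the node label `KafkaConsumer`, and all values are hypothetical.

```python
def override_lines(flow_name: str, node_label: str, overrides: dict) -> list:
    """Build one override entry per property for a node in a message flow.

    Assumed entry format: flowName#nodeLabel.propertyName=value
    """
    return [
        "{}#{}.{}={}".format(flow_name, node_label, prop, value)
        for prop, value in overrides.items()
    ]


if __name__ == "__main__":
    # Property names are taken from the tables in this topic; values are examples only.
    lines = override_lines(
        "OrdersFlow",
        "KafkaConsumer",
        {
            "topicName": "orders",
            "groupId": "orders-group",
            "receiveBatchSize": 10,
            "additionalInstances": 2,
        },
    )
    print("\n".join(lines))
```

Entries like these are typically collected in a properties file that is supplied to the command.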
The KafkaConsumer node Monitoring properties are described in the following table.
Property | M | C | Default | Description |
---|---|---|---|---|
Events | No | No | None | Events that you have defined for the node are displayed on this tab. By default, no monitoring events are defined on any node in a message flow. Use Add, Edit, and Delete to create, change, or delete monitoring events for the node; see Configuring monitoring event sources by using monitoring properties for details. You can enable and disable events that are shown here by selecting or clearing the Enabled check box. |
Local environment overrides
You can view values for Kafka topics that have been received by the KafkaConsumer node. For more information, see Using local environment variables with Kafka nodes.