Producing messages on Kafka topics

You can use the KafkaProducer node to connect to the Kafka messaging system and publish messages on Kafka topics.

Before you begin

Read the following topics:

About this task

You can use the KafkaProducer node to publish messages that are generated from within your message flow to a topic that is hosted on a Kafka server. The published messages are then available to be received by consumers (subscribers) reading from the topic. You can use a KafkaConsumer node in a message flow to subscribe to a specified topic on a Kafka server. For more information about using the KafkaConsumer node , see Consuming messages from Kafka topics.

The KafkaProducer node publishes messages non-transactionally to the Kafka server, and they are available to be read by consuming applications as soon as they are published. Because the publish operation is non-transactional, if the flow is rolled back after the message has passed through the KafkaProducer node, the publication of the message to the Kafka server is not rolled back. However, you can use the Acks property on the KafkaProducer node to configure synchronous processing of the message, by specifying that the KafkaProducer node must wait for confirmation that the message has been successfully received by the Kafka server before continuing in the flow. The options that you can specify for the Acks property are described in the steps of this task topic.

Procedure

Complete the following steps to use IBM Integration Bus to publish messages to a topic on a Kafka server:

  1. Create a message flow containing an input node, such as an HTTPInput node, and a KafkaProducer node.
    For information about how to create a message flow, see Creating a message flow.
  2. Configure the KafkaProducer node by setting the following properties:
    1. On the Basic tab, set the following properties:
      • In the Topic name property, specify the name of the Kafka topic on which you want to publish messages.

        The topic name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash). The topic name can be changed dynamically by setting a local environment override; for more information, see Using local environment variables with Kafka nodes.

      • In the Bootstrap servers property, specify the host name and port of the Kafka server that you want to connect to; for example, if you are using IBM Event Streams (Kafka on IBM Cloud), specify the address of that server.
      • In the Client ID property, specify the client name to be used when connecting to the Kafka server.

        The client name can be up to 255 characters in length, and can include the following characters: a-z, A-Z, 0-9, . (dot), _ (underscore), and - (dash).

      • Use the Add IIB suffix to client ID property to specify whether you want to suffix the client ID with the name of the integration server and integration node. This property is selected by default, and adds the integration server and integration node name to the end of the client ID, in the following format:
        'integration_server_name'-'integration_node_name'
      • In the Acks property, select the number of acknowledgements that are expected from the kafka cluster in order to consider the message successfully published.

        If this property is set to 0, the KafkaProducer node does not wait for any acknowledgement that the publish request has been processed by the Kafka server. This is equivalent to a 'fire and forget' mode of operation.

        If this property is set to 1, the KafkaProducer node waits for a single acknowledgement from the Kafka server.

        If this property is set to All, the KafkaProducer node waits for acknowledgements from all replicas of the topic. This option provides the strongest available guarantee that the record was received.

      • In the Timeout property, specify the time (in seconds) to wait for a request to complete. The default value is 60 seconds.
    2. On the Security tab, set the following properties:
      • In the Security protocol property, select the protocol to be used when communicating with the integration node. Valid values are:
        • PLAINTEXT
        • SSL
        • SASL_PLAINTEXT
        • SASL_SSL

        The default value for this property is PLAINTEXT.

        Note: If you are using Event Streams, this property must be set to SASL_SSL.

        If either SASL_PLAINTEXT or SASL_SSL is selected, you must configure the user ID and password that will be used to authenticate with the Kafka server, by running the mqsisetdbparms command with a DataSource name of kafka::KAFKA::integration_server_name.

        Start of changeAlternatively, to configure the KafkaProducer node to use a security identity other than the default kafka::KAFKA::integration_server_name, update the value of the Security identity property, or use the mqsiapplybaroverride command. If the Security identity field is blank, the default value kafka::KAFKA::integration_server_name is used.End of change

      • In the SSL protocol property, select the SSL protocol to be used if the Security protocol property is set to either SSL or SASL_SSL. You can select one of the following values from the editable list, or you can specify an alternative value:
        • TLSv1
        • TLSv1.1
        • TLSv1.2
        The default value for this property is TLSv1.2.
    For information about other properties that you can set for the KafkaProducer node, see KafkaProducer node.

What to do next

For information about properties that can be overridden dynamically in the flow, see Using local environment variables with Kafka nodes.