Setting up IBM Event Streams

Learn how to set up the solution including IBM Event Streams, Kafka Cluster, Kafka Connect, and Connectors.

There are different options available to create a Kafka environment with IBM Event Streams:

  • Use the Operator details view of the IBM Event Streams operator to create the required resources

  • Create a new Integration instance of type Kafka cluster via the IBM Cloud Pak for Integration UI / Platform UI

After creating the Kafka cluster/EventStreams resource you can either:

  • Manage resources in the IBM Event Streams Operator overview

  • Use the IBM Event Streams UI

In the following procedures both user interfaces are used to create and manage the Kafka cluster.

Setting up Kafka Connect

  1. Create a Kafka cluster instance (that is, an EventStreams resource).

    1. Go to the Cloud Pak for Integration UI.

      Open Platform UI

      Click Integration instances:

      Select Integration instances

      Click Create an instance, select Event Streams, click Next, select Development, click Next, accept the license, and click Create.

      Note: Do not select the minimal without security option, because it leads to connection issues when you follow this guide.
    2. Alternative workflow (using the OpenShift web console):

      1. As an administrator, go to Operators > Installed Operators and find IBM Event Streams. Several links are displayed in the Provided APIs column. Click the Event Streams link.

      2. Click Create EventStreams.

      3. Switch from Form view to YAML view and select the Samples tab in the right panel.

      4. Click Development Sample > Try it.
        Note: The options presented under samples are similar to the options you get when you create the EventStreams resource via the Cloud Pak for Integration UI.
      5. Click Create.
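
    Whichever workflow you used, you can check from the command line that the new instance comes up. This is a minimal sketch, assuming the instance is named development and was created in the cloudpack4integration project that is used later in this guide:

      # List the EventStreams instances; the status should eventually report that the instance is ready
      oc get eventstreams -n cloudpack4integration

      # The Kafka and supporting pods of the instance should all reach the Running state
      oc get pods -n cloudpack4integration | grep development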

  2. Create a KafkaConnect environment.

    The Kafka Connect environment needs to be able to connect to MongoDB, so it must include the required libraries. To make this work, a custom Docker image is built that includes the .jar file providing the MongoSourceConnector and MongoSinkConnector classes.

    The steps in this procedure require that an internal image registry is already set up within OpenShift and that you can push to and pull from it from the bastion node.

    1. The Dockerfile for the Kafka Connect environment can be downloaded from the IBM Event Streams UI Toolbox. To get there, open the IBM Event Streams UI:

      CP4I - Go to Event Stream UI

      Go to Toolbox:

      IBM Event Streams - Toolbox

      Select Set up a Kafka Connect environment and proceed with the setup:

      IBM Event Streams - Toolbox - Set up a Kafka Connect environment

      Download the Kafka Connect ZIP file, move it to the bastion and unzip it to a folder called kafkaconnect:

      IBM Event Streams - Download Kafka Connect ZIP file
      unzip kafkaconnect.zip -d kafkaconnect
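
      The exact contents of the archive can vary between Event Streams versions, but after unpacking you should see at least the Dockerfile, the kafka-connect.yaml file, and the my-plugins folder that are used in the following steps:

      # Inspect the unpacked Kafka Connect files
      ls -l kafkaconnect/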
    2. Download the mongo-kafka-connect-1.10.1-all.jar file from Maven Central (org/mongodb/kafka/mongo-kafka-connect/1.10.1) to the bastion:

      curl https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.1/mongo-kafka-connect-1.10.1-all.jar --output mongo-kafka-connect-1.10.1-all.jar
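
      Optionally, verify the download against the SHA-1 checksum that Maven Central publishes next to the artifact; compare the two values manually:

      # Fetch the published checksum and compute the checksum of the downloaded jar
      curl -s https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.1/mongo-kafka-connect-1.10.1-all.jar.sha1; echo
      sha1sum mongo-kafka-connect-1.10.1-all.jar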
      Note: When a newer version is available, you might want to try it.
      Note: The Connector catalog also contains instructions about which jar file to download. However, when we tried the jar file referenced there, Kafka Connect did not find the required library methods.
    3. Copy the mongo-kafka-connect-1.10.1-all.jar file to the my-plugins folder of the unpacked zip archive:

      cp mongo-kafka-connect-1.10.1-all.jar kafkaconnect/my-plugins/
    4. Because the Dockerfile pulls an image from cp.icr.io, make sure that you have access to this public container registry. You should be able to log in to it with the entitlement key that was used during the Cloud Pak for Integration setup (see the IBM Cloud Pak for Data instructions). Also make sure that you are able to log in to the internal image registry of your OpenShift deployment.

      1. Build and tag the image:

        # Authenticate to the cp.icr.io registry
        # (ENTITLEMENTKEY must contain your IBM entitlement key)
        podman login cp.icr.io -u cp -p "${ENTITLEMENTKEY}"
        
        # cd into the directory with the Dockerfile
        podman build . -t my-connect-cluster-image-1
      2. Push the new image to the internal image registry. If the oc get route command does not work, see How to expose the registry:

        # Get the route to the internal OpenShift image registry
        HOST=$(oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}')
        
        # Login to the internal docker image registry
        podman login -u admin -p $(oc whoami -t) ${HOST}
        
        # Push the image
        NAMESPACE="openshift"
        IMAGE_NAME="my-connect-cluster-image-1:latest"
        LOCAL_IMAGE_NAME="localhost/${IMAGE_NAME}"
        REMOTE_IMAGE_NAME="${IMAGE_NAME}"
        podman push ${LOCAL_IMAGE_NAME} ${HOST}/${NAMESPACE}/${REMOTE_IMAGE_NAME}
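
        You can verify that the push succeeded by checking for the image stream that the internal registry creates for the pushed image (a quick sketch, assuming the openshift namespace used above):

        # The image stream should list the pushed tag
        oc get imagestream my-connect-cluster-image-1 -n openshift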
    5. Generate credentials so that the Kafka Connect environment can connect to the Kafka cluster. In the IBM Event Streams UI, click the Connect to this cluster tile.

      IBM Event Streams - Connect to this Cluster
    6. Then click Generate SCRAM credentials.

      IBM Event Streams - Generate SCRAM credentials

      Select the most liberal options. The credential name used in the following is my-credentials.

      Generate SCRAM - Step 1

      For the access, select All topics and click Next.

      Generate SCRAM - Step 2

      Select All consumer groups and click Next.

      Generate SCRAM - Step 3

      Select All transactional IDs and click Next.

      Generate SCRAM - Step 4

      Note: This automatically creates a Kafka user.
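
      You can confirm the generated Kafka user and read back its password from the secret that is created along with it. A minimal sketch, assuming the cloudpack4integration project used later in this guide:

      # The SCRAM credentials appear as a KafkaUser resource ...
      oc get kafkauser -n cloudpack4integration

      # ... and as a secret of the same name that contains the password key referenced later in kafka-connect.yaml
      oc get secret my-credentials -n cloudpack4integration -o jsonpath='{.data.password}' | base64 -d; echo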
    7. The connection to the Kafka cluster is secured by TLS, so you need to trust the CA certificate that the Kafka cluster (the EventStreams instance) uses. You can do this in the OpenShift web console as follows, or from the command line as shown after these steps.

      1. To find the certificate, go to Operators > Installed Operators, find IBM Event Streams, and click Event Streams in the Provided APIs column. Click the EventStreams resource and switch to the YAML view. The certificate is listed under kafkaListeners. Copy the certificate to a local plain-text file.

        Event Streams - TLS certificate location
      2. Create a new secret by clicking Workloads > Secrets. Create a key/value secret, then enter the following information:

        • Name: tls-cert-of-development-external

        • Key: tls.crt

        • Value: DRAG AND DROP the plain-text file containing the certificate here.

      3. Click Create.
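
      Alternatively, once the certificate is saved to a file, you can create the same secret from the command line instead of using the web console. A sketch, assuming the certificate was saved as ca.crt and the cloudpack4integration project is used:

        # Create the secret with the name and key expected by kafka-connect.yaml
        oc create secret generic tls-cert-of-development-external --from-file=tls.crt=./ca.crt -n cloudpack4integration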

    8. In the folder with the Dockerfile, you also find a kafka-connect.yaml file. Make a backup of the file, then edit it to match your environment. Compare your edits with the following example:

      apiVersion: eventstreams.ibm.com/v1beta2
      kind: KafkaConnect
      metadata:
        name: my-kafka-connect-external-bootstrap
        annotations:
          eventstreams.ibm.com/use-connector-resources: "true"
      spec:
        replicas: 1
        bootstrapServers: development-kafka-bootstrap-cloudpack4integration.apps.ocp0.sa.boe:443
        image: default-route-openshift-image-registry.apps.ocp0.sa.boe/openshift/my-connect-cluster-image-1:latest
        template:
          pod:
            imagePullSecrets: []
            metadata:
              annotations:
                eventstreams.production.type: CloudPakForIntegrationNonProduction
                productID: 2a79e49111f44ec3acd89608e56138f5
                productName: IBM Event Streams for Non Production
                productVersion: 11.2.1
                productMetric: VIRTUAL_PROCESSOR_CORE
                productChargedContainers: my-connect-cluster-connect
                cloudpakId: c8b82d189e7545f0892db9ef2731b90d
                cloudpakName: IBM Cloud Pak for Integration
                productCloudpakRatio: "2:1"
        config:
          group.id: connect-cluster
          offset.storage.topic: connect-cluster-offsets
          config.storage.topic: connect-cluster-configs
          status.storage.topic: connect-cluster-status
          config.storage.replication.factor: 3
          offset.storage.replication.factor: 3
          status.storage.replication.factor: 3
        tls:
          trustedCertificates:
            - secretName: tls-cert-of-development-external
              certificate: tls.crt
        authentication:
          type: scram-sha-512
          username: my-credentials
          passwordSecret:
            secretName: my-credentials
            password: password
    9. Apply the edited YAML file:

      oc project cloudpack4integration
      oc apply -f kafka-connect.yaml
      Note: If the image cannot be pulled because of missing permissions, refer to the official OpenShift documentation for information about how to allow the image pull.
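
      You can check that the Kafka Connect environment comes up and that the custom image provides the MongoDB connector plugins. A sketch; the exact pod name will differ in your environment:

      # The KafkaConnect resource should become ready and its pod should reach the Running state
      oc get kafkaconnect my-kafka-connect-external-bootstrap -n cloudpack4integration
      oc get pods -n cloudpack4integration | grep connect

      # Replace <connect-pod> with the pod name returned by the previous command
      oc logs <connect-pod> -n cloudpack4integration | grep -i mongo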
  3. To create the Source Connector that listens for changes, apply the source-connector.yaml with oc apply -f source-connector.yaml:

    apiVersion: eventstreams.ibm.com/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-source-connector
      labels:
        eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
    spec:
      class: com.mongodb.kafka.connect.MongoSourceConnector
      tasksMax: 3
      config:
        connection.uri: mongodb://myUserAdmin:password@ibm-mongodb-enterprise-helm-service.mongodb-test-1.svc.cluster.local:27017
        database: "warehouse"
        collection: "inventory"
    
        topic.prefix: "mongo"
    
        copy.existing: true
    
        key.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: false
    
        value.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable: false
    
        publish.full.document.only: true
    
        pipeline: >
          [{"$match":{"operationType":{"$in":["insert","update","replace"]}}},{"$project":{"_id":1,"fullDocument":1,"ns":1,"documentKey":1}}]
  4. To create the Sink Connector that writes the changes to an external MongoDB instance, apply the sink-connector.yaml with oc apply -f sink-connector.yaml:

    apiVersion: eventstreams.ibm.com/v1beta2
    kind: KafkaConnector
    metadata:
      name: my-sink-connector
      labels:
        eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
    spec:
      class: com.mongodb.kafka.connect.MongoSinkConnector
      tasksMax: 3
      config:
        connection.uri: 'mongodb://mongodb.example.com:27017'
        database: "shop"
        collection: "inventory"
    
        # comma separated list
        topics: "mongo.warehouse.inventory"
    
        post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
        key.converter: org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable: false
        value.converter: org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable: false
    Note: If you do not have an external MongoDB instance, you can also use the same MongoDB that was used for the source connector to test whether the connectors work properly. Change the connection.uri according to your needs.
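
    Before testing with data, you can check that both connectors have been picked up by the Kafka Connect cluster. A sketch; the exact status layout can vary between versions:

    # Both connectors should be listed and report a ready/running state in their status
    oc get kafkaconnector -n cloudpack4integration
    oc describe kafkaconnector my-source-connector -n cloudpack4integration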
  5. Verify that the connectors are working properly.

    1. Connect to the MongoDB database on the OpenShift side and run these commands to test if the connectors work:

      mongo -u myUserAdmin -p password --authenticationDatabase admin
      
      use warehouse
      db.inventory.insertOne( { name:"game-console-1"} )
      db.inventory.insertOne( { name:"game-console-2"} )

      After the insert command you should get an acknowledgement:

      MongoDB insert acknowledgement

      The topics are created and displayed in the IBM Event Streams UI under Topics.

      kafka topics

      You should be able to see the messages from the inserts above when you click the topic:

      kafka topic messages
    2. On the remote MongoDB side, verify that the entries have been created:

      mongo -u myUserAdmin -p password --authenticationDatabase admin
      
      use shop
      show collections
      db.inventory.find()

      The output should look similar to the following:

      MongoDB show received messages