Setting up IBM Event Streams
Learn how to set up the solution including IBM Event Streams, Kafka Cluster, Kafka Connect, and Connectors.
There are different options available to create a Kafka environment with IBM Event Streams:

- Use the Operator details view of the IBM Event Streams operator to create the required resources.
- Create a new Integration instance of type Kafka cluster via the IBM Cloud Pak for Integration UI (Platform UI).

After creating the Kafka cluster (EventStreams) resource you can either:

- Manage resources in the IBM Event Streams Operator overview.
- Use the IBM Event Streams UI.

In the following procedures both user interfaces are used to create and manage the Kafka cluster.
Setting up Kafka Connect
Create a Kafka cluster instance (also known as an EventStreams resource).

Go to the Cloud Pak for Integration UI and click Integration instances.

Click Create an instance, select Event Streams, click Next, select Development, click Next, accept the license, and click Create.

Note: Do not select the option "minimal without security", as this will lead to connection issues when following this guide.

Alternative workflow: As Administrator, go to Operators > Installed Operators and find IBM Event Streams. Several blue links are displayed in the Provided APIs column. Click the Event Streams link, then click Create EventStreams. Switch from Form view to YAML view and select the Samples tab in the right panel. Click Development Sample > Try it.

Note: The options presented under Samples are similar to the options you get when you create the EventStreams resource via the Cloud Pak for Integration UI.

Click Create.
Create a KafkaConnect environment.

Kafka Connect needs to be able to connect to MongoDB, so it must include the client libraries to do so. To make this work, a custom Docker image is built that includes the required .jar files for the MongoDbSourceConnector and MongoDbSinkConnector.

The steps in this procedure require that you already have an internal image registry set up within OpenShift and that you can push to and pull from it from the bastion node.
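For orientation, the Dockerfile downloaded in the next step typically follows the pattern below. This is a sketch only: the base image reference and plugin path are assumptions, and the real file ships inside the Event Streams toolbox ZIP.

```dockerfile
# Sketch only: base image name/tag and plugin path are assumptions.
# Use the Dockerfile shipped in the Event Streams toolbox ZIP instead.
FROM cp.icr.io/cp/ibm-eventstreams-kafka:latest
# Add the MongoDB connector jars so Kafka Connect can load them at startup
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
```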
The Dockerfile for the Kafka Connect environment can be downloaded from the IBM Event Streams UI Toolbox. To get there, open the IBM Event Streams UI and go to Toolbox. Select Set up a Kafka Connect environment and proceed with Set up.

Download the Kafka Connect ZIP file, move it to the bastion, and unzip it to a folder called kafkaconnect:

```shell
unzip kafkaconnect.zip -d kafkaconnect
```
Download the mongo-kafka-connect-1.10.1-all.jar file from org/mongodb/kafka/mongo-kafka-connect/1.10.1 to the bastion:

```shell
curl https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.10.1/mongo-kafka-connect-1.10.1-all.jar --output mongo-kafka-connect-1.10.1-all.jar
```

Note: When a newer version is available you might want to try it.

Note: Check out the instructions about which jar file to download in the Connector catalog. When we tried the jar file from there, KafkaConnect did not find the required library methods.

Copy the mongo-kafka-connect-1.10.1-all.jar file to the my-plugins folder of the unpacked ZIP archive:

```shell
cp mongo-kafka-connect-1.10.1-all.jar kafkaconnect/my-plugins/
```
As the Dockerfile pulls an image from cp.icr.io, you need to make sure that you have access to this public container registry. You should be able to log in to the container registry with the entitlement key used during the Cloud Pak for Integration setup (see IBM Cloud Pak for Data instructions). Also make sure that you are able to log in to the internal image registry of your OpenShift deployment.

Build and tag the image:

```shell
# Authenticate against the IBM container registry
podman login cp.icr.io -u cp -p "${ENTITLEMENT_KEY}"
# cd into the directory with the Dockerfile, then build and tag the image
podman build . -t my-connect-cluster-image-1
```
Push the new image to the internal image registry. If the oc get route command does not work, check out: How to expose the registry.

```shell
# Get the route to the internal OpenShift image registry
HOST=$(oc get route default-route -n openshift-image-registry --template='{{ .spec.host }}')
# Log in to the internal image registry
podman login -u admin -p $(oc whoami -t) ${HOST}
# Push the image
NAMESPACE="openshift"
IMAGE_NAME="my-connect-cluster-image-1:latest"
LOCAL_IMAGE_NAME="localhost/${IMAGE_NAME}"
REMOTE_IMAGE_NAME="${IMAGE_NAME}"
podman push ${LOCAL_IMAGE_NAME} ${HOST}/${NAMESPACE}/${REMOTE_IMAGE_NAME}
```
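The full image reference that the KafkaConnect resource later points at is simply the registry route, the namespace, and the image name joined with slashes. A minimal sketch, using the example registry route that appears in the kafka-connect.yaml of this guide rather than a value queried from a live cluster:

```shell
# Assemble the image reference pushed above; HOST is the example
# registry route used in this guide, not read from a live cluster.
HOST="default-route-openshift-image-registry.apps.ocp0.sa.boe"
NAMESPACE="openshift"
IMAGE_NAME="my-connect-cluster-image-1:latest"
echo "${HOST}/${NAMESPACE}/${IMAGE_NAME}"
```

The printed value is what goes into the image field of kafka-connect.yaml; substitute the route of your own cluster.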
Generate credentials so that the KafkaConnect environment can connect to the Kafka cluster. In the IBM Event Streams UI:

Click the tile Connect to this cluster, then click Generate SCRAM credentials. Select the most liberal options. The credential name used in the following is my-credentials.

- Select All topics and click Next.
- Select All consumer groups and click Next.
- Select All transactional IDs and click Next.

Note: This automatically creates a Kafka user.

The connection to the Kafka cluster is secured by TLS, so you need to trust the CA certificate used by the Kafka (EventStreams) cluster. To find the certificate, go to Installed Operators, find IBM Event Streams, then click Event Streams. Click the EventStreams resource and go to the YAML view. The certificate is under kafkaListeners. Copy the certificate to a local plain-text file.
Create a new secret by clicking Workloads > Secrets. Create a Key/value secret and enter the following information:

- Name: tls-cert-of-development-external
- Key: tls.crt
- Value: drag and drop the plain-text file containing the certificate here.

Click Create.
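If you prefer the CLI over the web console, an equivalent Secret manifest looks roughly like the sketch below; the namespace is an assumption based on the project used elsewhere in this guide, and you paste your own CA certificate into tls.crt before applying it with oc apply -f.

```yaml
# Sketch of an equivalent Secret; namespace is an assumption.
apiVersion: v1
kind: Secret
metadata:
  name: tls-cert-of-development-external
  namespace: cloudpack4integration
type: Opaque
stringData:
  tls.crt: |
    -----BEGIN CERTIFICATE-----
    ...paste the CA certificate of the Kafka listener here...
    -----END CERTIFICATE-----
```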
In the folder with the Dockerfile you also find a kafka-connect.yaml file. Make a backup of the file, then edit it according to your environment. Compare your edits with the following file to match the details:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnect
metadata:
  name: my-kafka-connect-external-bootstrap
  annotations:
    eventstreams.ibm.com/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: development-kafka-bootstrap-cloudpack4integration.apps.ocp0.sa.boe:443
  image: default-route-openshift-image-registry.apps.ocp0.sa.boe/openshift/my-connect-cluster-image-1:latest
  template:
    pod:
      imagePullSecrets: []
      metadata:
        annotations:
          eventstreams.production.type: CloudPakForIntegrationNonProduction
          productID: 2a79e49111f44ec3acd89608e56138f5
          productName: IBM Event Streams for Non Production
          productVersion: 11.2.1
          productMetric: VIRTUAL_PROCESSOR_CORE
          productChargedContainers: my-connect-cluster-connect
          cloudpakId: c8b82d189e7545f0892db9ef2731b90d
          cloudpakName: IBM Cloud Pak for Integration
          productCloudpakRatio: "2:1"
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
  tls:
    trustedCertificates:
      - secretName: tls-cert-of-development-external
        certificate: tls.crt
  authentication:
    type: scram-sha-512
    username: my-credentials
    passwordSecret:
      secretName: my-credentials
      password: password
```
Apply the edited YAML file with:

```shell
oc project cloudpack4integration
oc apply -f kafka-connect.yaml
```
Note: If the image couldn't be pulled because of permissions, refer to the official OpenShift documentation for information about how to allow the image pull.
To create the Source Connector that listens for changes, apply the source-connector.yaml with oc apply -f source-connector.yaml:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
spec:
  class: com.mongodb.kafka.connect.MongoSourceConnector
  tasksMax: 3
  config:
    connection.uri: mongodb://myUserAdmin:password@ibm-mongodb-enterprise-helm-service.mongodb-test-1.svc.cluster.local:27017
    database: "warehouse"
    collection: "inventory"
    topic.prefix: "mongo"
    copy.existing: true
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    publish.full.document.only: true
    pipeline: >
      [{"$match":{"operationType":{"$in":["insert","update","replace"]}}},{"$project":{"_id":1,"fullDocument":1,"ns":1,"documentKey":1}}]
```
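The source connector publishes change events to a topic named after the pattern topic.prefix, database, and collection joined with dots. With the configuration above this resolves as sketched below, which is why the sink connector subscribes to mongo.warehouse.inventory:

```shell
# Topic name derivation for the MongoDB source connector:
# <topic.prefix>.<database>.<collection>
TOPIC_PREFIX="mongo"
DATABASE="warehouse"
COLLECTION="inventory"
echo "${TOPIC_PREFIX}.${DATABASE}.${COLLECTION}"
```

If you change topic.prefix, database, or collection, adjust the topics list of the sink connector to match.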
To create the Sink Connector that writes the changes to an external MongoDB instance, apply the sink-connector.yaml with oc apply -f sink-connector.yaml:

```yaml
apiVersion: eventstreams.ibm.com/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  labels:
    eventstreams.ibm.com/cluster: my-kafka-connect-external-bootstrap
spec:
  class: com.mongodb.kafka.connect.MongoSinkConnector
  tasksMax: 3
  config:
    connection.uri: 'mongodb://mongodb.example.com:27017'
    database: "shop"
    collection: "inventory"
    # comma separated list
    topics: "mongo.warehouse.inventory"
    post.processor.chain: com.mongodb.kafka.connect.sink.processor.DocumentIdAdder,com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
```
Note: If you don't have an external MongoDB instance, you can also use the same MongoDB that was used in the SourceConnector to test whether the connectors work properly. Change the connection.uri according to your needs.

Verify that the connectors are working properly. Connect to the MongoDB database on the OpenShift side and run these commands:

```shell
mongo -u myUserAdmin -p password --authenticationDatabase admin
use warehouse
db.inventory.insertOne( { name: "game-console-1" } )
db.inventory.insertOne( { name: "game-console-2" } )
```
After the insert command you should get an acknowledgement:
The topics are created in the IBM Event Streams UI under Topics.
You should be able to see the messages from above when clicking on the topic:
On the remote MongoDB side, the entries are created:
```shell
mongo -u myUserAdmin -p password --authenticationDatabase admin
use shop
show collections
db.inventory.find()
```
The output should look similar to the following: