Connect to a Kafka cluster
You can connect to a Kafka cluster by:
- through the kcat console client. To avoid potential connection errors, we recommend using the librdkafka library, version 2.6.1 or later;
- program code.
For all methods, you can connect with SSL or without SSL.
When connecting, specify the port and address.
Connection ports
Use the following ports to connect to Kafka:
- 9092 — port for connections without an SSL certificate;
- 9093 — port for connections with an SSL certificate.
Connection addresses
The connection address depends on the cluster subnet and where you are connecting from. You can choose an address based on one of the scenarios:
- connecting to a cluster in a public subnet;
- connecting from a private subnet to a cluster in a private subnet.
It is not possible to connect to a cluster in a private subnet from the internet.

Connecting to a cluster in a public subnet
If the cluster is in a public subnet, you can connect to nodes by the DNS address or the IP address from the public subnet.
We recommend connecting via the DNS address. The cluster uses the master discovery mechanism for DNS addresses; the address is tied to the node role, not the node itself. If the master is unavailable, one of the replicas becomes the new master and the address moves to the new node along with the role.
When connecting via an IP address from a public subnet, the master discovery mechanism is not used. If one of the replicas becomes the new master, the master's IP address will change, and the connection using the old IP address will stop working.
You can view the connection address in the Dashboard.
Connecting from a private subnet to a cluster in a private subnet
If you are connecting from a private subnet to a cluster in a private subnet, you can use the DNS address or the private IP address.
We recommend connecting via the DNS address. The cluster uses the master discovery mechanism for DNS addresses; the address is tied to the node role, not the node itself. If the master is unavailable, one of the replicas becomes the new master and the address moves to the new node along with the role.
When connecting via a private IP address, the master discovery mechanism is not used. If one of the replicas becomes the new master, the master's IP address will change, and the connection using the old IP address will stop working.
To connect from another private subnet, first connect both private subnets to the Cloud Router.
You can view the connection address in the Dashboard.
View the connection address
- In the Dashboard, on the top menu, click Products and select Managed Databases.
- Open the Active tab.
- Open the database cluster page → Connection tab.
- In the Connection addresses block, view the address.
Connect with SSL
Connecting using TLS (SSL) encryption ensures a secure connection between your server and the database cluster.
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
To avoid potential connection errors when using the kcat console client, we recommend using the librdkafka library, version 2.6.1 or later.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Use the connection example for a consumer:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password.
-
Use the connection example for a producer:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the confluent-kafka library:
pip install confluent-kafka -
Use the connection example for a consumer:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","ssl.ca.location": "<path>","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<path>— the full path to the root certificate;<topic_name>— the topic name.
-
Use the connection example for a producer:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","ssl.ca.location": "<path>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password;<path>— the full path to the root certificate;<topic_name>— the topic name.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the kafka-python library:
pip install kafka-python -
Use the connection example for a consumer:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Specify:
<topic_name>— the topic name<host>— the node's DNS address;<port>— connection port;<path>— the full path to the root certificate;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password.
-
Use the connection example for a producer:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Specify:
<host>— the node's DNS address;<port>— connection port;<path>— the full path to the root certificate;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Create the
scram.gofile:scram.go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Use the connection example for a consumer:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}master, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name;<path>— the full path to the root certificate.
-
Use the connection example for a producer:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name;<path>— the full path to the root certificate.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the kafkajs library:
npm install kafkajs -
Use the connection example for a consumer:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<path>— the full path to the root certificate;<topic_name>— the topic name.
-
Use the connection example for a producer:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Specify:
<host>— the node's DNS address;<port>— connection port;<path>— the full path to the root certificate;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Download the root certificate and place it in the
~/.kafka/folder:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Go to the directory where the Java certificate store will be located:
cd /etc/security -
Add the SSL certificate to the Java Key Store (Java Key Store):
keytool \-importcert \-alias RootCA \-file ~/.kafka/root.crt \-keystore ssl \-storepass <keystore password> \-nopromptSpecify the
<keystore_password>— the keystore password for additional protection. -
Create a configuration file for Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Use the connection example for a consumer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}Specify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<keystore_password>— the keystore password.
-
Use the connection example for a producer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Specify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password;<keystore_password>— the certificate password.
-
Build and run the application:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar
Connect without SSL
pom.xml
Bash
Python (confluent-kafka)
Python (kafka-python)
scram.go
Node.js
To avoid potential connection errors when using the kcat console client, we recommend using the librdkafka library, version 2.6.1 or later.
-
Open the CLI.
-
Use the connection example for a consumer:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Specify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password.
-
Use the connection example for a producer:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Specify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the producer role that has access to the topic;<password>— the user password.
-
Install the confluent-kafka library:
pip install confluent-kafka -
Use the connection example for a consumer:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Use the connection example for a producer:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name of the Managed Database;<password>— the user password;<topic_name>— the topic name.
-
Install the kafka-python library:
pip install kafka-python -
Use the connection example for a consumer:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Specify:
<topic_name>— the topic name<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password.
-
Use the connection example for a producer:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name of the Managed Database;<password>— the user password;<topic_name>— the topic name.
-
Create the
scram.gofile:Go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Use the connection example for a consumer:
package mainimport ("fmt""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsemaster, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Use the connection example for a producer:
package mainimport ("fmt""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsesyncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Install the kafkajs library:
npm install kafkajs -
Use the connection example for a consumer:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password;<topic_name>— the topic name.
-
Use the connection example for a producer:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Specify:
<host>— the node's DNS address;<port>— connection port;<user_name>— the user name of the Managed Database;<password>— the user password;<topic_name>— the topic name.
-
Create a configuration file for Maven:
Java
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Use the connection example for a consumer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}pom.xml:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name with the consumer role that has access to the topic;<password>— the user password.
-
Use the connection example for a producer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Specify:
<host>— the node's DNS address;<port>— connection port;<topic_name>— the topic name;<user_name>— the user name of the Managed Database;<password>— the user password.
-
Build and run the application:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar