Connect to a Kafka cluster
You can connect to a Kafka cluster by:
- via the kcat console client. To avoid potential connection errors, we recommend using the librdkafka library version 2.6.1 or later;
- program code.
For all methods, you can connect with SSL or without SSL.
When connecting, specify the port and address.
Connection ports
Use the following ports to connect to Kafka:
- 9092 — port for connections without an SSL certificate;
- 9093 — port for connections with an SSL certificate.
Connection addresses
You can choose a connection address based on one of the following scenarios:
- connecting to a cluster in a public subnet;
- connecting from a private subnet to a cluster in a private subnet.
It is not possible to connect to a cluster in a private subnet from the internet.
You can view the connection address in the Control Panel.

Connecting to a cluster in a public subnet
If the cluster is in a public subnet, you can connect to the nodes using a DNS address or an IP address from the public subnet.
We recommend connecting using a DNS address. A master discovery mechanism is used for DNS addresses in a cluster: the address is bound to a node role, not to the node itself. If the master becomes unavailable, one of the replicas becomes the new master, and the address moves to the new node along with the role.
When connecting using a public subnet IP address, the master discovery mechanism is not used. If one of the replicas becomes the new master, the master's IP address will change, and the connection via the old IP address will stop working.
Connecting from a private subnet to a cluster in a private subnet
If you are connecting from a private subnet to a cluster in a private subnet, you can use the DNS address or the private IP address.
We recommend connecting using a DNS address. A master discovery mechanism is used for DNS addresses in a cluster: the address is bound to a node role, not to the node itself. If the master becomes unavailable, one of the replicas becomes the new master, and the address moves to the new node along with the role.
When connecting using a private IP address, the master discovery mechanism is not used. If one of the replicas becomes the new master, the master's IP address will change, and the connection via the old IP address will stop working.
To connect from another private subnet, first connect both private subnets to the Cloud Router.
View the connection address
- In the Control Panel, click Products in the top menu and select Managed Databases.
- Open the Active tab.
- Open the database cluster page → Connection tab.
- In the Connection settings block, view the address.
Connect with SSL
Connecting using TLS(SSL) encryption ensures a secure connection between your server and the database cluster.
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
To avoid potential connection errors when using the kcat console client, we recommend using the librdkafka library version 2.6.1 or later.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Use the following connection example for a consumer:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password.
-
Use the following connection example for a producer:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_SSL \-X sasl.mechanisms=SCRAM-SHA-512 \-X ssl.ca.location=$HOME/.kafka/root.crtSpecify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the confluent-kafka library:
pip install confluent-kafka -
Use the following connection example for a consumer:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","ssl.ca.location": "<path>","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<path>— full path to the root certificate; ;<topic_name>— topic name.
-
Use the following connection example for a producer:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_SSL","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","ssl.ca.location": "<path>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password; ;<path>— full path to the root certificate; ;<topic_name>— topic name.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the kafka-python library:
pip install kafka-python -
Use the following connection example for a consumer:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Specify:
<topic_name>— topic name<host>— node DNS address; ;<port>— connection port;;<path>— full path to the root certificate; ;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password.
-
Use the following connection example for a producer:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512",ssl_cafile="<path>",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Specify:
<host>— node DNS address; ;<port>— connection port;;<path>— full path to the root certificate; ;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Create a file named
scram.go:scram.go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Use the following connection example for a consumer:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}master, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name; ;<path>— full path to the root certificate.
-
Use the following connection example for a producer:
package mainimport ("crypto/tls""crypto/x509""fmt""io/ioutil""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"const CERT = "<path>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)certs := x509.NewCertPool()pemPath := CERTpemData, err := ioutil.ReadFile(pemPath)if err != nil {fmt.Println("Couldn't load cert: ", err.Error())panic(err)}certs.AppendCertsFromPEM(pemData)conf.Net.TLS.Enable = trueconf.Net.TLS.Config = &tls.Config{RootCAs: certs,}syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name; ;<path>— full path to the root certificate.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Install the kafkajs library:
npm install kafkajs -
Use the following connection example for a consumer:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<path>— full path to the root certificate; ;<topic_name>— topic name.
-
Use the following connection example for a producer:
const fs = require('fs');const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: {ca: [fs.readFileSync('<path>', 'utf-8')],},sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Specify:
<host>— node DNS address; ;<port>— connection port;;<path>— full path to the root certificate; ;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Download the root certificate and place it in the
~/.kafka/:mkdir -p ~/.kafka/wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crtchmod 0600 ~/.kafka/root.crt -
Go to the directory where the Java certificate store will be located:
cd /etc/security -
Add the SSL certificate to the Java Key Store (JKS):
keytool \-importcert \-alias RootCA \-file ~/.kafka/root.crt \-keystore ssl \-storepass <keystore password> \-nopromptSpecify
<keystore_password>— the keystore password for additional security. -
Create a configuration file for Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Use the following connection example for a consumer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<keystore_password>— the keystore password.
-
Use the following connection example for a producer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";private static final String TS_FILE = "/etc/security/ssl";private static final String TS_PASSWORD = "<keystore_password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);props.put("ssl.truststore.location", TS_FILE);props.put("ssl.truststore.password", TS_PASSWORD);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password; ;<keystore_password>— the certificate password.
-
Build and run the application:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar
Connect without SSL
Bash
Python (confluent-kafka)
Python (kafka-python)
Go
Node.js
Java
To avoid potential connection errors when using the kcat console client, we recommend using the librdkafka library version 2.6.1 or later.
-
Open the CLI.
-
Use the following connection example for a consumer:
kcat -C \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password.
-
Use the following connection example for a producer:
kcat -P \-b <host>:<port> \-t <topic_name> \-X sasl.username=<user_name> \-X sasl.password=<password> \-X security.protocol=SASL_PLAINTEXT \-X sasl.mechanisms=SCRAM-SHA-512Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the producer role who has access to the topic; ;<password>— user password.
-
Install the confluent-kafka library:
pip install confluent-kafka -
Use the following connection example for a consumer:
import confluent_kafkadef error_callback(err):raise errconsumer = confluent_kafka.Consumer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","group.id": "example","auto.offset.reset": "earliest","enable.auto.commit": False,"error_cb": error_callback,})consumer.subscribe(["<topic_name>"])while True:record = consumer.poll(timeout=1.0)if record is not None:if record.error():if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")elif record.error():raise confluent_kafka.KafkaException(record.error())else:print(record.value())consumer.close()Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Use the following connection example for a producer:
import confluent_kafkadef error_callback(err):raise errproducer = confluent_kafka.Producer({"bootstrap.servers": "<host>:<port>","security.protocol": "SASL_PLAINTEXT","sasl.mechanism": "SCRAM-SHA-512","sasl.username": "<user_name>","sasl.password": "<password>","error_cb": error_callback,})producer.produce("<topic_name>", "message")producer.flush(60)Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— database user name; ;<password>— user password; ;<topic_name>— topic name.
-
Install the kafka-python library:
pip install kafka-python -
Use the following connection example for a consumer:
import kafkaconsumer = kafka.KafkaConsumer("<topic_name>",bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)for record in consumer:print(record)Specify:
<topic_name>— topic name<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password.
-
Use the following connection example for a producer:
import kafkaproducer = kafka.KafkaProducer(bootstrap_servers="<host>:<port>",security_protocol="SASL_PLAINTEXT",sasl_mechanism="SCRAM-SHA-512",sasl_plain_username="<user_name>",sasl_plain_password="<password>",)future = producer.send(topic="<topic_name>",value=b"message",)result = future.get(timeout=60)print(result)Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— database user name; ;<password>— user password; ;<topic_name>— topic name.
-
Create a file named
scram.go:scram.go
package mainimport ("crypto/sha256""crypto/sha512""github.com/xdg-go/scram")var (SHA256 scram.HashGeneratorFcn = sha256.NewSHA512 scram.HashGeneratorFcn = sha512.New)type XDGSCRAMClient struct {*scram.Client*scram.ClientConversationscram.HashGeneratorFcn}func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)if err != nil {return err}x.ClientConversation = x.Client.NewConversation()return nil}func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {response, err = x.ClientConversation.Step(challenge)return}func (x *XDGSCRAMClient) Done() bool {return x.ClientConversation.Done()} -
Use the following connection example for a consumer:
package mainimport ("fmt""os""os/signal""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Version = sarama.V3_3_1_0conf.Consumer.Return.Errors = trueconf.ClientID = "go_client"conf.Metadata.Full = trueconf.Net.SASL.Enable = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.Handshake = trueconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsemaster, err := sarama.NewConsumer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create consumer: ", err.Error())panic(err)}defer func() {if err := master.Close(); err != nil {panic(err)}}()consumer, err := master.ConsumePartition(TOPIC, 0, sarama.OffsetOldest)if err != nil {panic(err)}signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)doneCh := make(chan struct{})go func() {for {select {case err := <-consumer.Errors():fmt.Println(err)case msg := <-consumer.Messages():fmt.Println("Received messages", string(msg.Key), string(msg.Value))case <-signals:doneCh <- struct{}{}}}}()<-doneCh}Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Use the following connection example for a producer:
package mainimport ("fmt""strings""github.com/IBM/sarama")const BROKERS = "<host>:<port>"const USER = "<user_name>"const PASSWORD = "<password>"const TOPIC = "<topic_name>"func main() {brokers := BROKERSsplitBrokers := strings.Split(brokers, ",")conf := sarama.NewConfig()conf.Producer.RequiredAcks = sarama.WaitForAllconf.Producer.Return.Successes = trueconf.Version = sarama.V3_3_1_0conf.ClientID = "go_client"conf.Net.SASL.Enable = trueconf.Net.SASL.Handshake = trueconf.Net.SASL.User = USERconf.Net.SASL.Password = PASSWORDconf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {return &XDGSCRAMClient{HashGeneratorFcn: SHA512}}conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)conf.Net.TLS.Enable = falsesyncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)if err != nil {fmt.Println("Couldn't create producer: ", err.Error())panic(err)}msg := &sarama.ProducerMessage{Topic: TOPIC,Value: sarama.StringEncoder("message"),}_, _, err = syncProducer.SendMessage(msg)if err != nil {fmt.Println("Couldn't send message: ", err.Error())panic(err)}}Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Install the kafkajs library:
npm install kafkajs -
Use the following connection example for a consumer:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const consumer = kafka.consumer({ groupId: 'example' });const run = async () => {await consumer.connect();await consumer.subscribe({topic: '<topic_name>',fromBeginning: true,});await consumer.run({eachMessage: async ({ message }) => {console.log({ value: message.value.toString() });},});};run();Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password; ;<topic_name>— topic name.
-
Use the following connection example for a producer:
const { Kafka } = require('kafkajs');const config = {brokers: ['<host>:<port>'],ssl: false,sasl: {mechanism: 'scram-sha-512',username: '<user_name>',password: '<password>',},};const kafka = new Kafka(config);const producer = kafka.producer();const run = async () => {await producer.connect();await producer.send({topic: '<topic_name>',messages: [{ value: 'message' }],});await producer.disconnect();};run();Specify:
<host>— node DNS address; ;<port>— connection port;;<user_name>— database user name; ;<password>— user password; ;<topic_name>— topic name.
-
Create a configuration file for Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>app</artifactId><packaging>jar</packaging><version>1.0</version><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties><dependencies><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.7</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.14.2</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency></dependencies><build><finalName>${project.artifactId}-${project.version}</finalName><sourceDirectory>src</sourceDirectory><resources><resource><directory>src</directory></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><goals><goal>attached</goal></goals><phase>package</phase><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>3.1.0</version><configuration><archive><manifest><mainClass>com.example.App</mainClass></manifest></archive></configuration></plugin></plugins></build></project> -
Use the following connection example for a consumer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.kafka.clients.consumer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String deserializer = StringDeserializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put("group.id", "example");props.put("key.deserializer", deserializer);props.put("value.deserializer", deserializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Consumer<String, String>consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(new String[] {TOPIC}));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record: records) {System.out.println(record.value());}}}}Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— user name with the consumer role who has access to the topic; ;<password>— user password.
-
Use the following connection example for a producer:
src/com/example/App.javapackage com.example;import java.util.*;import org.apache.kafka.common.*;import org.apache.kafka.common.serialization.StringSerializer;import org.apache.kafka.clients.producer.*;public class App {private static final String HOST = "<host>:<port>";private static final String TOPIC = "<topic_name>";private static final String USER = "<user_name>";private static final String PASSWORD = "<password>";public static void main(String[] args) {String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);String serializer = StringSerializer.class.getName();Properties props = new Properties();props.put("bootstrap.servers", HOST);props.put("acks", "all");props.put("key.serializer", serializer);props.put("value.serializer", serializer);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put("sasl.jaas.config", jaasCfg);Producer<String, String> producer = new KafkaProducer<>(props);try {producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();producer.flush();producer.close();} catch (Exception ex) {System.out.println(ex);producer.close();}}}Specify:
<host>— node DNS address; ;<port>— connection port;;<topic_name>— topic name; ;<user_name>— database user name; ;<password>— user password.
-
Build and run the application:
mvn clean packagejava -jar target/app-1.0-jar-with-dependencies.jar