Подключиться к кластеру
К кластеру Kafka можно подключиться по DNS-адресу и IP-адресу.
Мы рекомендуем подключаться по DNS-адресу, потому что DNS-адреса соответствуют ролям нод и ведут на актуальные IP-адреса мастера и реплик. IP-адреса соответствуют конкретным нодам. В случае недоступности мастера одна из реплик возьмет на себя его роль, IP-адрес мастера изменится, и подключение по IP перестанет работать.
Если кластер подключен к приватной подсети и вы хотите работать с ним через DNS, подключите подсеть кластера к облачному роутеру с доступом к внешней сети. Используйте инструкцию Настроить доступ в интернет через облачный роутер.
Публичный IP-адрес использовать нельзя.
Порты
Для подключения к Kafka используйте порты:
- 9092 — порт для подключения без SSL-сертификата;
- 9093 — порт для подключения с SSL-сертификатом.
Способы подключения
Посмотреть адрес для подключения
- В панели управления перейдите в раздел Облачная платформа → Базы данных.
- Откройте страницу кластера баз данных → вкладка Подключение.
- В блоке Адреса для подключения посмотрите адрес.
Подключиться с SSL
Подключение с использованием TLS/SSL шифрования обеспечивает безопасное соединение между вашим сервером и кластером баз данных.
Bash
Python (confluent-kafka)
Python (kafka-python)
Node.js
Java
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/
:mkdir -p ~/.kafka/
wget https://storage.dbaas.servercore.com/CA.pem -O ~/.kafka/root.crt
chmod 600 ~/.kafka/root.crt -
Используйте пример подключения для консьюмера:
kcat -C \
-b <host>:9093 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=~/.kafka/root.crtУкажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Используйте пример подключения для продюсера:
kcat -C \
-b <host>:9093 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X ssl.ca.location=~/.kafka/root.crtУкажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/
:mkdir -p ~/.kafka/
wget https://storage.dbaas.servercore.com/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Установите библиотеку confluent-kafka:
pip install confluent-kafka
-
Используйте пример под ключения для консьюмера:
import confluent_kafka
def error_callback(err):
raise err
consumer = confluent_kafka.Consumer({
"bootstrap.servers": "<host>:9093",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"group.id": "example",
"ssl.ca.location": "<full_path_to_root_certificate>",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"error_cb": error_callback,
})
consumer.subscribe(["<topic_name>"])
while True:
record = consumer.poll(timeout=1.0)
if record is not None:
if record.error():
if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
elif record.error():
raise confluent_kafka.KafkaException(record.error())
else:
print(record.value())
consumer.close()Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<topic_name>
— имя топика.
-
Используйте пример подключения для продюсера:
import confluent_kafka
def error_callback(err):
raise err
producer = confluent_kafka.Producer({
"bootstrap.servers": "<host>:9093",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"ssl.ca.location": "<full_path_to_root_certificate>",
"error_cb": error_callback,
})
producer.produce("<topic_name>", "message")
producer.flush(60)Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<topic_name>
— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/
:mkdir -p ~/.kafka/
wget https://storage.dbaas.servercore.com/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Установите библиотеку kafka-python:
pip install kafka-python
-
Используйте пример подключения для консьюмера:
import kafka
consumer = kafka.KafkaConsumer(
"<topic_name>",
bootstrap_servers="<host>:9093",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<full_path_to_root_certificate>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
for record in consumer:
print(record)Укажите:
<topic_name>
— имя топика<host>
— DNS-адрес ноды;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Используйте пример подключения для продюсера:
import kafka
producer = kafka.KafkaProducer(
bootstrap_servers="<host>:9093",
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
ssl_cafile="<full_path_to_root_certificate>",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
future = producer.send(
topic="<topic_name>",
value=b"message",
)
result = future.get(timeout=60)
print(result)Укажите:
<host>
— DNS-адрес ноды;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/
:mkdir -p ~/.kafka/
wget https://storage.dbaas.servercore.com/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Установите библиотеку kafkajs:
npm install kafkajs
-
Используйте пример подключения для консьюмера:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9093'],
ssl: {
ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
},
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });
const run = async () => {
await consumer.connect();
await consumer.subscribe({
topic: '<topic_name>',
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ message }) => {
console.log({ value: message.value.toString() });
},
});
};
run();Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<topic_name>
— имя топика.
-
Используйте пример подключения для продюсера:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9093'],
ssl: {
ca: [fs.readFileSync('<full_path_to_root_certificate>', 'utf-8')],
},
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: '<topic_name>',
messages: [{ value: 'message' }],
});
await producer.disconnect();
};
run();Укажите:
<host>
— DNS-адрес ноды;<full_path_to_root_certificate>
— полный путь до корневого сертификата;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Скачайте корневой сертификат и поместите его в папку
~/.kafka/
:mkdir -p ~/.kafka/
wget https://storage.dbaas.servercore.com/CA.pem -O ~/.kafka/root.crt
chmod 0600 ~/.kafka/root.crt -
Перейдите в каталог, где будет располагаться хранилище сертификатов Java:
cd /etc/security
-
Добавьте SSL-сертификат в хранилище доверенных сертификатов Java (Java Key Store):
keytool -importcert -alias RootCA -file ~/.kafka/root.crt -keystore ssl -storepass <keystore_password> --noprompt
Укажите
<keystore_password>
— пароль хранилища для дополнительной защиты. -
Создайте конфигурационный файл для Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project> -
Используйте пример подключения для консьюмера:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;
public class App {
private static final String HOST = "<host>:9093";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
private static final String TS_FILE = "/etc/security/ssl";
private static final String TS_PASSWORD = "<keystore_password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String deserializer = StringDeserializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("group.id", "example");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
props.put("ssl.truststore.location", TS_FILE);
props.put("ssl.truststore.password", TS_PASSWORD);
Consumer<String, String>consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record: records) {
System.out.println(record.value());
}
}
}
}Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя;<keystore_password>
— пароль хранилища.
-
Используйте пример подключения для продюсера:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;
public class App {
private static final String HOST = "<host>:9093";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
private static final String TS_FILE = "/etc/security/ssl";
private static final String TS_PASSWORD = "<keystore_password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String serializer = StringSerializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
props.put("ssl.truststore.location", TS_FILE);
props.put("ssl.truststore.password", TS_PASSWORD);
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
producer.flush();
producer.close();
} catch (Exception ex) {
System.out.println(ex);
producer.close();
}
}
}Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя;<keystore_password>
— пароль сертификата.
-
Соберите и запустите приложение:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar
Подключиться без SSL
Bash
Python (confluent-kafka)
Python (kafka-python)
Node.js
Java
-
Откройте CLI.
-
Используйте пример подключения для консьюмера:
kcat -C \
-b <host>:9092 \
-t <topic_name> \
-X sasl.username=<user_name> \
-X sasl.password=<password> \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=SCRAM-SHA-512Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Используйте пример подключения для продюсера:
kcat -P \
-b <host>:9092 \
-t <topic_name> \
-X sasl.username=<user> \
-X sasl.password=<password> \
-X security.protocol=SASL_PLAINTEXT \
-X sasl.mechanisms=SCRAM-SHA-512Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью продюсер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Установите библиотеку confluent-kafka:
pip install confluent-kafka
-
Используйте пример подключения для консьюмера:
import confluent_kafka
def error_callback(err):
raise err
consumer = confluent_kafka.Consumer({
"bootstrap.servers": "<host>:9092",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"group.id": "example",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"error_cb": error_callback,
})
consumer.subscribe(["<topic_name>"])
while True:
record = consumer.poll(timeout=1.0)
if record is not None:
if record.error():
if record.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
print(f"{record.topic()} [{record.partition()}]: reached end at offset {record.offset()}")
elif record.error():
raise confluent_kafka.KafkaException(record.error())
else:
print(record.value())
consumer.close()Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Используйте пример подключения для продюсера:
import confluent_kafka
def error_callback(err):
raise err
producer = confluent_kafka.Producer({
"bootstrap.servers": "<host>:9092",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "<user_name>",
"sasl.password": "<password>",
"error_cb": error_callback,
})
producer.produce("<topic_name>", "message")
producer.flush(60)Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя базы данных;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Установите библиотеку kafka-python:
pip install kafka-python
-
Используйте пример подключения для консьюмера:
import kafka
consumer = kafka.KafkaConsumer(
"<topic_name>",
bootstrap_servers="<host>:9092",
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
for record in consumer:
print(record)Укажите:
<topic_name>
— имя топика<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Используйте пример подключения для продюсера:
import kafka
producer = kafka.KafkaProducer(
bootstrap_servers="<host>:9092",
security_protocol="SASL_PLAINTEXT",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="<user_name>",
sasl_plain_password="<password>",
)
future = producer.send(
topic="<topic_name>",
value=b"message",
)
result = future.get(timeout=60)
print(result)Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя базы данных;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Установите библиотеку kafkajs:
npm install kafkajs
-
Используйте пример подключения для консьюмера:
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9092'],
ssl: false,
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const consumer = kafka.consumer({ groupId: 'example' });
const run = async () => {
await consumer.connect();
await consumer.subscribe({
topic: '<topic_name>',
fromBeginning: true,
});
await consumer.run({
eachMessage: async ({ message }) => {
console.log({ value: message.value.toString() });
},
});
};
run();Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Используйте пример подключения для продюсера:
const { Kafka } = require('kafkajs');
const config = {
brokers: ['<host>:9092'],
ssl: false,
sasl: {
mechanism: 'scram-sha-512',
username: '<user_name>',
password: '<password>',
},
};
const kafka = new Kafka(config);
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: '<topic_name>',
messages: [{ value: 'message' }],
});
await producer.disconnect();
};
run();Укажите:
<host>
— DNS-адрес ноды;<user_name>
— имя пользователя базы данных;<password>
— пароль пользователя;<topic_name>
— имя топика.
-
Создайте конфигурационный файл для Maven:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>app</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-${project.version}</finalName>
<sourceDirectory>src</sourceDirectory>
<resources>
<resource>
<directory>src</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.App</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project> -
Используйте пример подключения для консьюмера:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.clients.consumer.*;
public class App {
private static final String HOST = "<host>:9092";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String deserializer = StringDeserializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("group.id", "example");
props.put("key.deserializer", deserializer);
props.put("value.deserializer", deserializer);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
Consumer<String, String>consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(new String[] {TOPIC}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record: records) {
System.out.println(record.value());
}
}
}
}Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя с ролью консьюмер, который имеет доступ к топику;<password>
— пароль пользователя.
-
Используйте пример подключения для продюсера:
src/com/example/App.java
package com.example;
import java.util.*;
import org.apache.kafka.common.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.*;
public class App {
private static final String HOST = "<host>:9092";
private static final String TOPIC = "<topic_name>";
private static final String USER = "<user_name>";
private static final String PASSWORD = "<password>";
public static void main(String[] args) {
String jaasCfg = String.format("org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";", USER, PASSWORD);
String serializer = StringSerializer.class.getName();
Properties props = new Properties();
props.put("bootstrap.servers", HOST);
props.put("acks", "all");
props.put("key.serializer", serializer);
props.put("value.serializer", serializer);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", jaasCfg);
Producer<String, String> producer = new KafkaProducer<>(props);
try {
producer.send(new ProducerRecord<String, String>(TOPIC, "key", "message")).get();
producer.flush();
producer.close();
} catch (Exception ex) {
System.out.println(ex);
producer.close();
}
}
}Укажите:
<host>
— DNS-адрес ноды;<topic_name>
— имя топика;<user_name>
— имя пользователя базы данных;<password>
— пароль пользователя.
-
Соберите и запустите приложение:
mvn clean package
java -jar target/app-1.0-jar-with-dependencies.jar