在Java程序中获取Kafka的topic,我们通常使用Kafka客户端库,Kafka客户端库提供了一组API,用于与Kafka集群进行交互,以下是获取Kafka topic的步骤:
1、引入依赖
我们需要在项目中引入Kafka客户端库的依赖,以Maven为例,添加以下依赖到pom.xml文件中:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
2、创建Kafka生产者和消费者配置
接下来,我们需要创建一个Kafka生产者和消费者的配置对象,这些配置对象包含了与Kafka集群连接所需的信息,如bootstrap.servers、key.serializer、value.serializer等。
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.Collections; import java.util.Properties;
3、创建Kafka生产者和消费者实例
使用配置对象创建Kafka生产者和消费者实例。
Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
Properties consumerProps = new Properties(); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
4、发送和接收消息
使用生产者实例发送消息,使用消费者实例接收消息。
// 发送消息 producer.send(new ProducerRecord<String, String>("test-topic", "key", "value"));
// 接收消息 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps); consumer.subscribe(Collections.singletonList("test-topic")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } }
5、关闭生产者和消费者实例
在程序结束时,关闭生产者和消费者实例。
producer.close(); consumer.close();
通过以上步骤,我们可以在Java程序中获取Kafka的topic,下面是一些与本文相关的问题与解答:
问题1:如何在Java程序中创建多个Kafka生产者和消费者实例?
答:只需为每个生产者和消费者实例创建不同的配置对象,并使用这些配置对象创建实例即可,可以创建两个生产者实例,分别连接到不同的主题,同样,可以创建多个消费者实例,分别订阅不同的主题。
问题2:如何在Java程序中获取Kafka集群的详细信息?
答:可以使用KafkaAdminClient类来获取Kafka集群的详细信息,需要引入KafkaAdminClient相关的依赖,创建一个KafkaAdminClient实例,调用describeCluster方法来获取集群的详细信息,关闭KafkaAdminClient实例。
原创文章,作者:K-seo,如若转载,请注明出处:https://www.kdun.cn/ask/156882.html