添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

EmbeddedKafkaBroker

1. EmbeddedKafkaRule vs. EmbeddedKafkaBroker

EmbeddedKafkaRule: A TestRule wrapper around an EmbeddedKafkaBroker.
查看源码,发现他俩都是创建一些虚拟的broker给test使用的。

然后看到这样一段话,不过对我没有什么用:
TopicAlreadyExists exceptions :
Some suggestions:

  1. Since you are using JUnit5, don’t use the JUnit4 EmbeddedKafkaRule , use EmbeddedKafkaBroker instead; or simply add @EmbeddedKafka and the broker will be added as a bean to the Spring application context and its life cycle managed by Spring (use @DirtiesContext to destroy); for non-Spring tests, the broker will be created (and destroyed) by the JUnit5 EmbeddedKafkaCondition and is available via EmbeddedKafkaCondition.getBroker() .
  2. Don’t use explicit ports; let the broker use its default random port and use embeddedKafka.getBrokersAsString() for the bootstrap servers property.
  3. If you must manage the brokers yourself (in @BeforeAll), destroy() them in @AfterAll

1.1 EmbeddedKafkaRule

Spring-kafka-test provides an embedded Kafka broker. We can use a JUnit @ClassRule annotation to create this Kafka broker. This rule starts the Kafka and Zookeeper servers on a random port before the tests execute and shuts them down after the tests are complete. The embedded Kafka broker eliminates the need to have a real Kafka and Zookeeper instance running while running the test.

官方文档 : A JUnit 4 @Rule wrapper for the EmbeddedKafkaBroker is provided to create an embedded Kafka and an embedded Zookeeper server. (See @EmbeddedKafka Annotation for information about using @EmbeddedKafka with JUnit 5).

Starting with version 2.0, if you use Spring’s test application context caching, you can also declare a EmbeddedKafkaBroker bean, so a single broker can be used across multiple test classes. For convenience, we provide a test class-level annotation called @EmbeddedKafka to register the EmbeddedKafkaBroker bean . The following example shows how to use it:

@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
         topics = {
                 KafkaStreamsTests.STREAMING_TOPIC1,
                 KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;
    @Test
    public void someTest() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<Integer, String> consumer = cf.createConsumer();
        this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
        ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
        assertThat(replies.count()).isGreaterThanOrEqualTo(1);
    @Configuration
    @EnableKafkaStreams
    public static class KafkaStreamsConfiguration {
        @Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
        private String brokerAddresses;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
            return new KafkaStreamsConfiguration(props);

2. 我遇到的问题:

我并没有用@EmbeddedKafka,我是想自己注入一个embeddedkafkabroker bean的,本来我直接用:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, false, 3, "topic");

然后再config

Map<String, Object> props = new HashMap<>(KafkaTestUtils.consumerProps("groupId", "auto-commit: false", embeddedKafkaBroker));

发现不行,因为没有设置broker的properties,broker并没有被Inject。
所以得自己写一个embeddedKafkaBroker的函数,标注为bean,然后注入此test class。用到的是brokerProperties,有点类似
这里.:

@Bean
public EmbeddedKafkaBroker embeddedKafkaBroker() {
    return new EmbeddedKafkaBroker(1,false,2,"test-events")
        .brokerProperties(Collections.singletonMap(KafkaConfig.LogDirProp(), "/tmp/foo"));

3. @EnableKafka on @configuration file

  • help detect @KafkaListener
  • register embeddedkafkabroker bean
kafka 是一个消息队列产品,基于 Topic partitions 的设计,能达到非常高的消息发送处理性能。Spring 创建了一个项目 Spring-kafka,封装了 Apache 的 Kafka-client,用于在 Spring 项目里快速集成 kafka。除了简单的收发消息外,Spring-kafka 还提供了很多高级功能,下面我们就来一一探秘这些用法。 引入依... kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafka 线上会遇到哪些问题?kafk 内容概要: 本文介绍如何在 Spring Boot 3.X 利用 Kafka 实现高效的消息传递功能。Kafka 是一个分布式流处理平台,适用于实时数据流处理、日志收集与分析、事件驱动等场景。通过集成 Kafka,我们可以实现可靠的异步消息传递,提高系统的可伸缩性和可靠性。 使用场景: Kafka 在许多应用场景发挥着重要作用,例如实时数据分析、应用解耦、事件驱动架构等。通过使用 Spring Boot 3.X 和 Kafka,我们可以轻松构建强大的消息传递系统,满足各种复杂的业务需求。 实现步骤: 1.添加 Kafka 依赖。 2.配置 Kafka 连接。 3.创建消息生产者。 4.创建消息消费者。 5.发送和接收消息。 其他说明: 1.可以使用 Kafka 提供的分区和副本机制来实现高可用和负载均衡。 2.可以配置消息确认机制和错误处理策略,确保消息的可靠性传递。 3.Spring Boot 提供了与 Kafka 集成的工具和特性,如批量发送、事务支持等,可以根据实际需求进行配置和使用。 小编典典嵌入式Kafka测试适用于以下配置,测试课注释@EnableKafka@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config@EmbeddedKafka(partit... ​已经到了没有测试案例,就不会写代码的年纪了!不过好在,优秀的框架或组件总是会在介绍完主体功能之后,附带介绍如何进行测试。 然后,因为工作原因,又简单研究了下kafka的测试框架。其,最神奇的地方是,Spring团队为了便于测试,秉持着能内嵌一定内嵌的原则,搞了一套内嵌的zookeeper和kafka。这样在单元测试期间,就不用开发人员再准备相关环境了。 那么,下面来揭秘下具体是如何实现的: 1. 依赖 <dependency> <groupId>org.spri... 目录消息传递JMSActiveMQ 支持发送消息接收消息AMQPRabbitMQ 支持发送消息接收消息Apache Kafka 支持发送消息接收消息Kafka使用嵌入式 Kafka 进行测试Spring 集成 spring Framework 为与消息传递系统的集成提供了广泛的支持,从使用 JMS API 的简化使用JmsTemplate到异步接收消息的完整基础架构。Spring AMQP 为高级消息队列协议提供了类似的功能集。RabbitTemplateSpring Boot 还为Rabbi <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.11.RELEASE</version> kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。 除了简单的收发消息外,Spring-kafka还提供了很多高级功能,下面我们就来一一探秘这些用法。 kafka是一个消息队列产品,基于Topicpartitions的设计,能达到非常高的消息发送处理性能。Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。除了简单的收发消息外,Spring... 首先Maven引入&lt;dependency&gt; &lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt; &lt;artifactId&gt;spring-kafka&lt;/artifactId&gt; &lt;version&gt;2.1.7.RELEASE&lt;/version&gt; &lt;/depen Spring for Apache Kafka项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。 我们提供“模板”作为发送消息的高级抽象。 我们还为消息驱动的POJO提供支持。 2.什么是新的? 2.1。 2.2以来的新功能2.1 本节介绍从2.1版到2.2版所做的更改。 2.1.1。 Kafka客户端版本 此版本需要2.0.0 kafka-clients或更高...