添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
public class KafkaConsumer { @KafkaListener(topicPattern = "${spring.kafka.topics}",groupId = "${spring.kafka.consumer.group-id}") public void consumer(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { try { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { String msg = String.valueOf(message.get()); log.info("topic:{},is consumed success,topic:{}" + topic + ",msg:" + msg); } catch (Exception e) { log.error("topic:{},is consumed error:{}",topic,e.getMessage()); } finally { ack.acknowledge();

方式二:使用@KafkaListener的topics属性,此时配置的是数组列表,注意用英文逗号分隔

1.引入依赖,同方式一

2.配置yaml文件

spring:
  kafka:
    topics: test1,test2,test3

3.消费者监听

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "#{'${spring.kafka.topics}'.split(',')}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            Optional message = Optional.ofNullable(record.value());
            if (message.isPresent()) {
                String msg = String.valueOf(message.get());
                log.info("Received topic:{},msg:{}" , topic,msg);
        } catch (Exception e) {
            log.error("topic:{},is consumed error:{}", topic, e.getMessage());
        } finally {
            ack.acknowledge();
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependenc
本项目商品详情多级缓存架构项目
缓存数据生产服务的工作流程分析
(1)监听多个kafka topic,每个kafka topic对应一个服务(简化一下,监听一个kafka topic)
(2)如果一个服务发生了数据变更,那么就发送一个消息到kafka topic中
(3)缓存数据生产服务监听到了消息以后,就发送请求到对应的服务中调用接口以及拉取数据,此时是从mysql中查询的
(4)缓存数据生产服务拉取到了数据之后,会将数据在本地缓存中写入一份,就是ehcache中
(5)同时会将数据在redis中写入一份
测试1(ehcache):
多级缓存架构
1、LRU算法概述
redis默认情况下就是使用LRU策略的,因为内存是有限的,但是如果你不断地往redis里面写入数据,那肯定是没法存放下所有的数据在内存的
所以redis默认情况下,当内存中写入的数据很满之后,就会使
                                    xlog功能
实时收集指定日志文件内容,发送到sources类型为netcat的flume端(理论上支持所有以socket形式监听的日志收集服务端)
可以满足单个日志文件的实时收集,可以配置nginx每个项目一个日志文件,每个日志文件一个单独的进程进行收集;并且对系统cpu、io、带宽、内存占用极低.
安装xlog
git clone https://github.com/qidasheng/xlog.git
脚本一键安装
/bin/sh run.sh
一步一步安装
./configure   
make install
make clean
make uninstall
根目录xlog.conf有注解
xlog -c xlog.conf 
daemonize = yes    
xlog -c xlog.co
什么是牛?!
 YAKS是一个框架,可在Kubernetes上启用Cloud Native BDD测试! 这里的Cloud Native意味着您的测试将作为Kubernetes POD执行。
 作为用户,您可以通过在您最喜欢的基于Kubernetes的云提供商上创建Test自定义资源来运行测试。 一旦安装了YAKS操作员,它将监听自定义资源并自动准备测试运行时,该运行时作为云基础结构的一部分运行测试。
 YAKS中的测试遵循BDD(行为驱动开发)概念,并表示使用语法编写的功能规范。
 YAKS作为框架,提供了一组预定义的步骤,可帮助您连接不同的消息传递(Http REST,JMS,Kafka,Knative事件),并使用标头和正文内容的断言来验证消息数据。
 YAKS在之上添加了其功能,以作为客户端和/或服务器连接到不同的端点。
 在阅读有关YAKS的更多信息
假设您有一个K
                                    文章目录spring kafka 动态创建 topic 监听问题分析需要解决的问题解决方案Consumer  topickafak 通用配置 设置timer 触发实现ConsumerMessageHelper  业务处理运行效果输出
spring kafka 动态创建 topic 监听
spring boot 已经对kafka 进行了很好的封装集成,只需要早配置文件中配置相应的配置参数,再配合
@KafkaListener 注解即可监听kafka 消息,但如果想动态监听某一类消息而不是固定的某几个
                                    首先分析了@KafkaListener的原理,并从其原理入手,通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,实现动态kafka topic的监听
topics: "admin,login,client"
@KafkaListener(topics = "#{'${topics}'.split(',')}",concurrency = "#{'${topics}'.split(',').length}")
2:在配置类中获取
生产者命令
kafka-topics.sh --zookeeper本地主机:2181-列表
kafka-topics.sh --create --topic event-input -zookeeper localhost:2181 --replication-factor 1 --partitions 1
 kafka-console-producer.sh --broker-list localhost:9092 --topic事件输入
{“ bookId”:1,“ bookName”:“ Kafka”,“ bookAuthor”:“ Sameer”,“ is
                                    springboot+kafka中@KafkaListener如何动态指定多个topic
本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的独特消费注解@KafkaListener
首先,application.properties中配置用逗号隔开的多个topic。
运行程序,console打印的效果如下:
因为只开了一条消费者线程,所...