添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
// 在容器中注入 SimpleRabbitListenerContainerFactory @Bean ( "customContainerFactory" ) public SimpleRabbitListenerContainerFactory customContainerFactory ( SimpleRabbitListenerContainerFactoryConfigurer configurer , ConnectionFactory connectionFactory ) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory ( ) ; factory . setConcurrentConsumers ( 4 ) ; //设置线程数 factory . setMaxConcurrentConsumers ( 4 ) ; //最大线程数 configurer . configure ( factory , connectionFactory ) ; return factory ;

rabbitmq 配置 containerFactory 属性

      @RabbitListener(queues = "${custom.queue}", containerFactory = "customContainerFactory")
    public void publish(Entity dto) {

rabbitmq 执行流程

RabbitListenerEndpointRegistrar 实现 InitializingBean 接口,启动会自动被调用

org.springframework.beans.factory.InitializingBean#afterPropertiesSet方法

	@Override
	public void afterPropertiesSet() {
		registerAllEndpoints();

注册所有的端点

protected void registerAllEndpoints() {
		Assert.state(this.endpointRegistry != null, "No registry available");
		synchronized (this.endpointDescriptors) {
			for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
				if (descriptor.endpoint instanceof MultiMethodRabbitListenerEndpoint && this.validator != null) {
					((MultiMethodRabbitListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
				this.endpointRegistry.registerListenerContainer(// NOSONAR never null
						descriptor.endpoint, 
                    resolveContainerFactory(descriptor));
			this.startImmediately = true;  // trigger immediate startup
 
private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
		if (descriptor.containerFactory != null) {
			return descriptor.containerFactory;
		else if (this.containerFactory != null) {
			return this.containerFactory;
		else if (this.containerFactoryBeanName != null) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            // 得到注入容器中的  containerFactory
			this.containerFactory = this.beanFactory.getBean(
					this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
			return this.containerFactory;  // Consider changing this if live change of the factory is required
		else {
			throw new IllegalStateException("Could not resolve the " +
					RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
					descriptor.endpoint + "] no factory was given and no default is set.");
 
	protected void finishRefresh() {
		// Clear context-level resource caches (such as ASM metadata from scanning).
		clearResourceCaches();
		// Initialize lifecycle processor for this context.
		initLifecycleProcessor();
		// 这一步
		// Propagate refresh to lifecycle processor first.
		getLifecycleProcessor().onRefresh();
		// Publish the final event.
		publishEvent(new ContextRefreshedEvent(this));
		// Participate in LiveBeansView MBean, if active.
		if (!NativeDetector.inNativeImage()) {
			LiveBeansView.registerApplicationContext(this);

org.springframework.context.support.DefaultLifecycleProcessor#startBeans

private void startBeans(boolean autoStartupOnly) {
		Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
		Map<Integer, LifecycleGroup> phases = new TreeMap<>();
		lifecycleBeans.forEach((beanName, bean) -> {
			if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
				int phase = getPhase(bean);
				phases.computeIfAbsent(
						phase,
						p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
				).add(beanName, bean);
		if (!phases.isEmpty()) {
		//执行start
			phases.values().forEach(LifecycleGroup::start);

org.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup#start

public void start() {
			if (this.members.isEmpty()) {
				return;
			if (logger.isDebugEnabled()) {
				logger.debug("Starting beans in phase " + this.phase);
			Collections.sort(this.members);
			for (LifecycleGroupMember member : this.members) {
				doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
 
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
		Lifecycle bean = lifecycleBeans.remove(beanName);
		if (bean != null && bean != this) {
			String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
			for (String dependency : dependenciesForBean) {
				doStart(lifecycleBeans, dependency, autoStartupOnly);
			if (!bean.isRunning() &&
					(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
				if (logger.isTraceEnabled()) {
					logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
				try {
                    // 调用start
					bean.start();
				catch (Throwable ex) {
					throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
				if (logger.isDebugEnabled()) {
					logger.debug("Successfully started bean '" + beanName + "'");
                    rabbitmq 配置 containerFactory  属性@RabbitListener(queues = "activity_queue" ,containerFactory = "simpleRabbitListenerContainerFactory")rabbitmq 执行流程RabbitListenerEndpointRegistrar 实现 InitializingBean 接口,启动会自动被调用org.springframework.beans.factory.Initia
poll consumer,即拉模式,消费者主动去消息队列拉取消息。
push consumer,即推模式,消息队列主动往消费者推送消息。
一. 消费者通过推(PUSH)方式获取消息
实现push模式最简单的方式就是使用@EnableRabbit+@RabbitListener注解来指定某方法作为消息消费的方法。例如监听某个Queue的方法。
配置RabbitListenerContainerFactory
这个bean只会在consumer端通过@RabbitListene
RabbitAutoConfiguration类是springboot的自动配置类。
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class...
Broker 可以理解为消息队列服务器的实体,它是一个中间应用,负责接收生产者的消息,和将消息发送给消息的接收者或其他的Broker
Exchange 消息交换机,是消息第一个到达的地方,消息通过他指定的路由规则,分发到不同的消息队列中去
Queue 消息队列,是消息通过发送和路由之后到达的地方,达到Queue的消息进入逻辑上的等待消费状态,每个消息都会发送到一个或多个队列中。
Binding 绑定,它的作是把Exchange和Queue按路由规则进行绑
				
高级消息队列协议(AMQP)是面向消息中间件的平台无关的有线协议。Spring AMQP项目将核心Spring概念应用于基于AMQP的消息传递解决方案的开发。Spring Boot为通过RabbitMQ使用AMQP提供了多种便利,包括Spring Boot starter AMQP“starter”。RabbitMQ是一个基于AMQP协议的轻量级、可靠、可扩展和可移植的消息代理。Spring使用RabbitMQ通过AMQP协议进行通信。Properties 或者,您可以使用addresses属性配置相同的连
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it. at org.springframework.amqp.rab
@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列或者Binding。使用@RabbitListener可以设置一个自己明确默认值的RabbitListenerContainerFactory对象。 可以在配置文件中设置RabbitListenerAnnotationBeanPostProcessor并通过<rabbit:annotation-driven/>来设置@RabbitListener的执行,当然也可以通过@EnableRabbit注解来.
SpringBoot报错Circular view path [index]: would dispatch back to the current handler URL [/index] agai 19522
Failed to execute goal org.apache.maven.plugins:maven-install-plugin:2.4:install (default-install) qq_37685550: 很赞,已经解决了问题,删了之后再启动,然后另一个目录下这个文件又报错,继续删,就行了,反正那个文件报错就直接删。表情包 AndroidStudio 执行模拟器报错 Emulator: Process finished with exit code -1073741819 (0xC0000005) qq_45372239: 有用!找了一天 md HashMap 和 ConcurrentHashMap 源码变化分析 川越人海: 把源代码拿出来翻译一遍

org.springframework.context.support.DefaultLifecycleProcessor#doStart

spring执行到 org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法

org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar#resolveContainerFactory