本文共 5678 字,大约阅读时间需要 18 分钟。
那么对于老旧的项目又该如何使用呢?这篇文章就来讲讲 Spring 如何整合 RabbitMQ。
如果大家是maven项目当然就方便很多
https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit
到 Maven 仓库直接找 spring-rabbit 包。我这边旧项目使用的是足够旧的 Spring 3.1.2,所以我选了个足够旧的包,担心新包可能会存在版本上的问题。大家自己也可以试试不同版本的包,看看会不会踩到坑。在 pom 文件中添加:
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.2.0.RELEASE</version>
</dependency>
spring-rabbit1.2.0支持spring3.1.4
如果你不是 Maven 项目,那么你就要顺着 spring-rabbit 的依赖列表,一个个将相关联的引用包找出来引用进去了。恰好我这边是个足够旧的 Web 项目,不是 Maven 项目,但是我走了个捷径:建个空的 Maven 项目,将 spring-rabbit 引入后,它会自己将相关联的包下载好,我们只要等它下载好后,查看有哪些关联包,直接 copy 出来就好了。具体留给大家去实施吧。
下面直接上代码
1、文件列表
RabbitExchangeConfig.java
package com.yzj.mq.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the AMQP topology used by this example: one direct exchange, one
 * queue, and the binding that connects them.
 *
 * <p>Author: yaozhongjie, created 2019/3/22 16:31.
 */
@Configuration
public class RabbitExchangeConfig {

    /**
     * Creates the direct exchange named by {@link RabbitConstans#EXCHANGE_DIRECT}.
     *
     * @return the direct exchange bean
     */
    @Bean
    public DirectExchange directExchange() {
        DirectExchange exchange = new DirectExchange(RabbitConstans.EXCHANGE_DIRECT);
        return exchange;
    }

    /**
     * Creates the queue named by {@link RabbitConstans#QUEUE_DIRECT}.
     *
     * @return the queue bean
     */
    @Bean
    public Queue queue() {
        Queue directQueue = new Queue(RabbitConstans.QUEUE_DIRECT);
        return directQueue;
    }

    /**
     * Binds {@link #queue()} to {@link #directExchange()} using
     * {@link RabbitConstans#ROUTING_KEY_DIRECT} as the routing key.
     *
     * @return the binding bean
     */
    @Bean
    public Binding binding() {
        Queue source = queue();
        DirectExchange target = directExchange();
        return BindingBuilder
                .bind(source)
                .to(target)
                .with(RabbitConstans.ROUTING_KEY_DIRECT);
    }
}
RabbitConfig.java
package com.yzj.mq.rabbit;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;/** * @Auth yaozhongjie * @Date 2019/3/22 14:43 **/@Configurationpublic class RabbitConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1",5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必须要设置 return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate amqpTemplate(){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory()); return rabbitTemplate; }}
RabbitConstans.java
package com.yzj.mq.rabbit;

/**
 * Names of the exchange, queue and routing key shared by the RabbitMQ
 * configuration, sender and receiver classes in this package.
 *
 * <p>This is a pure constants holder: it is final and cannot be instantiated.
 *
 * <p>Author: yaozhongjie, created 2019/3/22 15:45.
 */
public final class RabbitConstans {

    /** Name of the direct exchange. */
    public static final String EXCHANGE_DIRECT = "direct.exchange";

    /** Name of the queue bound to the direct exchange. */
    public static final String QUEUE_DIRECT = "direct.queue1";

    /** Routing key used for the queue binding and for publishing. */
    public static final String ROUTING_KEY_DIRECT = "routing.key.direct";

    /** Not instantiable: this class only exposes constants. */
    private RabbitConstans() {
        throw new AssertionError("No RabbitConstans instances");
    }
}
DirectSender.java
package com.yzj.mq.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Publishes messages to the direct exchange and receives the broker's
 * publisher-confirm callbacks for them.
 *
 * <p>Author: yaozhongjie, created 2019/3/22 9:42.
 */
@Component
public class DirectSender implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@code content} to {@link RabbitConstans#EXCHANGE_DIRECT} with
     * routing key {@link RabbitConstans#ROUTING_KEY_DIRECT}, tagging it with a
     * freshly generated UUID as correlation data.
     *
     * @param content the message payload to publish
     */
    public void send(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        // Register the confirm callback BEFORE publishing. Setting it after
        // convertAndSend means no callback is in place when the message goes
        // out, so the confirm for that message can be lost.
        // Original author's note: if the RabbitTemplate were a singleton, the
        // callback in effect would be whichever one was set last.
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.convertAndSend(
                RabbitConstans.EXCHANGE_DIRECT,
                RabbitConstans.ROUTING_KEY_DIRECT,
                content,
                correlationId);
    }

    /**
     * Prints the outcome of a publisher confirm.
     *
     * <p>NOTE(review): a publisher confirm reports whether the broker accepted
     * the message, not whether a consumer processed it; the printed wording is
     * kept as in the original.
     *
     * @param correlationData the correlation data supplied when the message was sent
     * @param ack {@code true} for an ack, {@code false} for a nack
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消费");
        } else {
            System.out.println("消息消费失败:");
        }
    }
}
DirectReciver.java
package com.yzj.mq.rabbit;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.AcknowledgeMode;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class DirectReciver{ @Autowired private RabbitConfig rabbitConfig; @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitConfig.connectionFactory()); container.setQueueNames(RabbitConstans.QUEUE_DIRECT); container.setExposeListenerChannel(true); container.setConcurrentConsumers(1); container.setPrefetchCount(1); container.setShutdownTimeout(3000); container.setRecoveryInterval(10000); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; }}
好了自己写个单元测试试试看吧
转载地址:http://sxkfb.baihongyu.com/