package com.ynxbd.push.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 李进才 * @ClassName RabbitMQConfig * @Description TODO * @date 2024/01/23 13:28:00 */ @Configuration @EnableRabbit @Slf4j public class RabbitMQConfig { @Autowired private RabbitTemplate rabbitTemplate; @Bean public AmqpTemplate amqpTemplate(){ rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); })); //开启消息确认 yml 需要配置 publisher-returns: true rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) ->{ if (ack) { assert correlationData != null; log.info("消息发送到交换机成功,correlationId:{}",correlationData.getId()); } else { log.info("消息发送到交换机失败,原因:{}",cause); } } )); return rabbitTemplate; } /** * 声明直连交换机 支持持久化. * @return the exchange */ @Bean("wxPushExchange") public Exchange wxPushExchange() { return ExchangeBuilder.directExchange("wxPushExchange").durable(true).build(); } @Bean("wxPushQueue") public Queue wxPushQueue(){ return new Queue("wxPushQueue", true, true, true); } @Bean public Binding wxPushBinding(){ return BindingBuilder.bind(wxPushQueue()).to(wxPushExchange()).with("wxPushRouting").noargs(); } }