(原)Springboot+[消息中间件一]RabbitMQ实现消息队列+发送邮箱
主页:写程序的小王叔叔的博客欢迎来访
支持:点赞收藏关注
社区:JAVA全栈进阶学习社区欢迎加入
目录
一、效果
二、RMQ可以实现的功能
2.1消息中间件:
2.2rmq安装
2.3含义
2.4原理
三、SpringBoot + RMQ集成项目消息队列及聊天功能
3.1RMQ配置
3.1.1pom.xml(公共文件配置)
3.1.2spring.xml(公共文件配置)
3.2Java RMQ类代码
3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)
3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)
3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)
四、解决
4.1 工具安装好后,guest/guest登录失败如何解决:
4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)
五、SpringBoot + Mail集成实现邮件发送
5.1QQ邮箱授权码获取
5.2邮箱配置
一、效果
二、RMQ可以实现的功能
【介绍】:集合了网上各种大佬的教学一起整理
2.1消息中间件:
2.2rmq安装
windows下 安装 rabbitMQ 及操作常用命令(操作创建用户密码 角色等)_wcy10086的博客-CSDN博客
windows10环境下的RabbitMQ安装步骤(图文) - 清明-心若淡定 - 博客园
2.3含义
2.4原理
三、SpringBoot + RMQ集成项目消息队列及聊天功能
【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧
3.1RMQ配置
在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)
如下图:
3.1.1pom.xml(公共文件配置)
org.springframework.boot spring-boot-starter-amqp
3.1.2spring.xml(公共文件配置)
####################################RabbitMQ配置#################################################################spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000# 开启confirms回调 P -> Exchangespring.rabbitmq.publisher-confirms=true# 开启returnedMessage回调 Exchange -> Queuespring.rabbitmq.publisher-returns=true# 设置手动确认(ack) Queue -> Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100
3.2Java RMQ类代码
3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)
rmqpConfig.java-------rmqp基本配置
@Configurationpublic class RabbitConfig {private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String vhost; public static final String EXCHANGE_Member = "RabbitMQ_Exchange_Member";//邮件:注册+登录public static final String QUEUE_Member = "RabbitMQ_Queue_Member";//邮件:注册+登录 public static final String ROUTINGKEY_Member = "RabbitMQ_RoutingKey_Member";//邮件:注册+登录//建立一个连接容器,类型数据库的连接池 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true template.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费"); } else { LOGGER.info("消息消费失败:" + cause); } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); return template; } /** * 交换机针对消费者配置 * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 * DirectExchange:按照routingkey分发到指定队列,多关键字匹配 */ @Bean public DirectExchange directExchange() { return new DirectExchange(EXCHANGE_Member, true,false); } /** * 队列 * * @return */ @Bean public Queue directQueue() { return new Queue(QUEUE_Member, true); } /** * 绑定 * * @return */ @Bean public Binding directBinding() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member); } @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
RMQProducer.java ---------rmqp消息提供端/消息发送端
@Componentpublic class RMQProducer { private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowiredprivate RabbitTemplate rabbitTemplate; /*** * 延迟消息队列信息 * @param routingKeyName * @param msg */ public void sendMsg(String routingKeyName,String msg) { LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(new Date())); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setContentEncoding("utf-8"); message.getMessageProperties().setExpiration("120000"); //设置消息存活时间 return message; } }); //rabbitTemplate.convertAndSend(routingKeyName, msg); }}
RMQReceiver.java--------------rmqp消费端
/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitListener(queues = RabbitConfig.QUEUE_Member ) @RabbitHandler public void handler(Message message, Channel channel) throws IOException { logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) ); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } }
3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)
rmqpConfig.java------------------rmqp配置类
@Configurationpublic class RabbitConfig {private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String vhost; /*** * 交换机 */ public static final String EXCHANGE_Order = "RabbitMQ_Exchange_Order";// 下单 /** * 队列 */ public static final String QUEUE_Order = "RabbitMQ_Queue_Order";// 下单 public static final String QUEUE_Pay = "RabbitMQ_Queue_Pay";// 支付 /*** * 路由 */ public String ROUTINGKEY_Order = "RabbitMQ_RoutingKey_Order";// 下单 //建立一个连接容器,类型数据库的连接池 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true template.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费"); } else { LOGGER.info("消息消费失败:" + cause); } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); return template; } /Fanout广播配置/// /*** * 交换机配置 * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_Order); } /** * 队列配置 * @return */ @Bean public Queue fanoutQueueOrder() { return new Queue(QUEUE_Order); } @Bean public Queue fanoutQueuePay() { return new Queue(QUEUE_Pay); } /*** * 绑定交换机和队列 * @return */ @Bean public Binding bindFanoutExchangeOrder() { return BindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange()); } @Bean public Binding bindFanoutExchangePay() { return BindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange()); } // @Bean public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } }
RMQProducer.java ---------rmqp消息提供端/消息发送端
/*** * 提供者 * @author Administrator * */@Componentpublic class RMQProducer { private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowiredprivate RabbitTemplate rabbitTemplate; /*** * 延迟消息队列信息 * @param routingKeyName * @param msg * @param string */ public void sendMsgtoFound(String exchangeName , String msg) { LOGGER.info("消息发送成功,msg:{},时间:{}",msg,sdf.format(new Date())); rabbitTemplate.convertAndSend(exchangeName , msg); } }
RMQReceiver.java--------------rmqp消费端
/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitListener(queues = RabbitConfig.QUEUE_Order ) @RabbitHandler public void handlerOrder(Message message, Channel channel) throws IOException { logger.info("接收处理队列QUEUE_Order当中的消息: " + new String(message.getBody()) ); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 }@RabbitListener(queues = RabbitConfig.QUEUE_Pay ) @RabbitHandler public void handlerPay(Message message, Channel channel) throws IOException { logger.info("接收处理队列QUEUE_Pay当中的消息: " + new String(message.getBody()) ); long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 }/** * @param trim * @return */private Map mapStringToMap(String str) {str = str.substring(1, str.length() - 1); String[] strs = str.split(","); Map map = new HashMap(); for (String string : strs) { String key = string.split("=")[0].trim(); String value = string.split("=")[1]; map.put(key, value); }return map;}}
3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)
后期补充。。。。
四、解决
4.1 工具安装好后,guest/guest登录失败如何解决:
解决办法:执行如下命令
命令1:rabbitmqctl set_user_tags guest administrator
命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'
重启rabbitmq即可。
重启服务:
------------------------ 我是愉快的分割线 -----------------------------
停止:service rabbitmq-server stop
启动:service rabbitmq-server start
查看状态:service rabbitmq-server status
4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)
操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp的交换机-队列-路由中
/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); //@RabbitListener(queues = RabbitConfig.QUEUE_Member ) //@RabbitHandler public void handler(Message message, Channel channel) throws IOException { logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) ); long deliveryTag = message.getMessageProperties().getDeliveryTag();// channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } }
如下图:
五、SpringBoot + Mail集成实现邮件发送
5.1QQ邮箱授权码获取
5.2邮箱配置
pom.xml
org.springframework.bootspring-boot-starter-mail
spring.xml--》配置邮箱
###############################Mail配置######################################################################QQ smtp.qq.com##sina smtp.sina.cn##aliyun smtp.aliyun.com##163 smtp.163.com#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994##发送方spring.mail.host=smtp.qq.com##邮件地址spring.mail.from=1247622527@qq.com#用户名spring.mail.username=1247622527@qq.com##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)spring.mail.password=----------------》》》邮箱的授权码#端口号465或587spring.properties.mail.smtp.port: 25##编码格式spring.mail.default-encoding=UTF-8spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.ssl.enable=true
sendMailUtil.java
@Componentpublic class SendMailUtil {private static final Logger LOGGER = LogManager.getLogger(SendMailUtil.class); @Autowired private JavaMailSender mailSender; //发送方邮件的发送地址 @Value("${spring.mail.host}") public static String sendMailHost; //发送方发送邮件的账号 @Value("${spring.mail.username}") public static String sendMailUsername; //发送方发送邮件的客户端授权码 @Value("${pring.mail.password}") public static String sendMailPassword; //发送方发送邮件的端口 @Value("${spring.properties.mail.smtp.port}") public static String sendMailPort; @Value("${spring.mail.from") public static String sendMailFrom; public static void sendSimpleMail(String to, String subject, String content) throws Exception{ //创建连接对象 连接到邮件服务器 Properties properties = new Properties(); //设置发送邮件的基本参数 //发送邮件服务器 properties.put("mail.smtp.host", sendMailHost); //发送端口 properties.put("mail.smtp.port", sendMailPort); properties.put("mail.smtp.auth", "true"); //设置发送邮件的账号和密码 Session session = Session.getInstance(properties, new Authenticator() { @Override protected PasswordAuthentication getPasswordAuthentication() { //两个参数分别是发送邮件的账户和密码 return new PasswordAuthentication(sendMailUsername,sendMailPassword); } }); //创建邮件对象 Message message = new MimeMessage(session); //设置发件人 message.setFrom(new InternetAddress(sendMailUsername)); //设置收件人 message.setRecipient(Message.RecipientType.TO,new InternetAddress(to)); //设置主题 message.setSubject(subject); // 设置邮件内容 message.setContent(content,"text/html;charset=UTF-8"); //发送一封邮件 Transport.send(message);}public void sendHtmlMail(String to, String subject, String content) { //获取MimeMessage对象 MimeMessage message = mailSender.createMimeMessage(); MimeMessageHelper messageHelper; try { messageHelper = new MimeMessageHelper(message, true); //邮件发送人 messageHelper.setFrom(sendMailFrom); //邮件接收人 messageHelper.setTo(subject); //邮件主题 message.setSubject(subject); //邮件内容,html格式 messageHelper.setText(content, true); //发送 mailSender.send(message); } catch (MessagingException e) { }}public void sendAttachmentsMail(String to, String subject, String content, String filePath) { MimeMessage message = mailSender.createMimeMessage(); try { MimeMessageHelper helper = new MimeMessageHelper(message, true); helper.setFrom(sendMailFrom); helper.setTo(to); helper.setSubject(subject); helper.setText(content, true); FileSystemResource file = new FileSystemResource(new File(filePath)); String fileName = filePath.substring(filePath.lastIndexOf(File.separator)); helper.addAttachment(fileName, file); mailSender.send(message); } catch (MessagingException e) { //日志信息 }}public static void main(String[] args) throws Exception {//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;//System.exit(0);}}
业务代码.Java
@Overridepublic Object userSendNewMailCode(String memberId , String newMemberEmail) {Map mapMemberInfo = new HashMap(); if(!"".equals(memberId) && !"".equals(newMemberEmail)) {Member member = memberService.getOnlyOneMemberInfoByMemberId(memberId);String validEmail = IdGenerate.random2FiveId();//邮箱登录注册后,可绑定更改新邮箱if (member!=null && newMemberEmail.equals(member.getMemberEmail())) {try {SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");} catch (Exception e) {e.printStackTrace();mapMemberInfo.put("exception", e.getMessage());}//邮箱未登录注册后,任意邮箱可绑定}else if(member!=null && !newMemberEmail.equals(member.getMemberEmail())){try {SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");} catch (Exception e) {e.printStackTrace();mapMemberInfo.put("exception", e.getMessage());}}List memberInfo = new ArrayList();mapMemberInfo.put("newMemberEmail", newMemberEmail);mapMemberInfo.put("validEmail", validEmail);memberInfo.add(mapMemberInfo);return mapMemberInfo;}return mapMemberInfo;}
RabbitMQ教程(安装与使用详解,Spring集成)_程猿薇茑的博客-CSDN博客_rabbitmq 教程
消息中间件(一)MQ详解及四大MQ比较_jcpp9527的博客-CSDN博客_消息中间件
http://jm.taobao.org/2016/04/01/kafka-vs-rabbitmq-vs-rocketmq-message-send-performance/
RabbitMQ核心概念以及工作原理 - 简书
Springboot 整合RabbitMq ,用心看完这一篇就够了_小目标青年的博客-CSDN博客_springboot集成rabbitmq
Java SpringBoot集成RabbitMq实战和总结 - 远方789 - 博客园
以上是自己整理的,并测试过,可以直接用
文章中,有问题,可以在评论区评论
转载声明:本文为博主原创文章,未经博主允许不得转载
如果我的文章有帮助到您,欢迎打赏一下鼓励博主