> 文档中心 > (原)Springboot+[消息中间件一]RabbitMQ实现消息队列+发送邮箱

(原)Springboot+[消息中间件一]RabbitMQ实现消息队列+发送邮箱


主页:写程序的小王叔叔的博客欢迎来访

支持:点赞收藏关注

社区:JAVA全栈进阶学习社区欢迎加入

目录

一、效果

二、RMQ可以实现的功能

2.1消息中间件:

2.2rmq安装

2.3含义

2.4原理

三、SpringBoot +  RMQ集成项目消息队列及聊天功能

3.1RMQ配置

3.1.1pom.xml(公共文件配置)

3.1.2spring.xml(公共文件配置)

3.2Java RMQ类代码

3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)

3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)

3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)

四、解决

4.1 工具安装好后,guest/guest登录失败如何解决:

4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)

五、SpringBoot +  Mail集成实现邮件发送

5.1QQ邮箱授权码获取

5.2邮箱配置


一、效果

二、RMQ可以实现的功能

【介绍】:集合了网上各种大佬的教学一起整理

2.1消息中间件:

2.2rmq安装

windows下 安装 rabbitMQ 及操作常用命令(操作创建用户密码 角色等)_wcy10086的博客-CSDN博客

windows10环境下的RabbitMQ安装步骤(图文) - 清明-心若淡定 - 博客园

2.3含义

2.4原理

三、SpringBoot +  RMQ集成项目消息队列及聊天功能

【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧

3.1RMQ配置

在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)

如下图:

3.1.1pom.xml(公共文件配置)

       org.springframework.boot     spring-boot-starter-amqp 

3.1.2spring.xml(公共文件配置)

####################################RabbitMQ配置#################################################################spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000# 开启confirms回调 P -> Exchangespring.rabbitmq.publisher-confirms=true# 开启returnedMessage回调 Exchange -> Queuespring.rabbitmq.publisher-returns=true# 设置手动确认(ack) Queue -> Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100

3.2Java RMQ类代码

3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)

rmqpConfig.java-------rmqp基本配置

@Configurationpublic class RabbitConfig {private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);     @Value("${spring.rabbitmq.host}")    private String host;     @Value("${spring.rabbitmq.port}")    private int port;     @Value("${spring.rabbitmq.username}")    private String username; @Value("${spring.rabbitmq.password}")    private String password;    @Value("${spring.rabbitmq.virtual-host}")    private String vhost;    public static final  String EXCHANGE_Member = "RabbitMQ_Exchange_Member";//邮件:注册+登录public static final  String QUEUE_Member = "RabbitMQ_Queue_Member";//邮件:注册+登录    public static final  String ROUTINGKEY_Member = "RabbitMQ_RoutingKey_Member";//邮件:注册+登录//建立一个连接容器,类型数据库的连接池    @Bean    public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); return connectionFactory;    } @Bean    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)    public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true template.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费");     } else {     LOGGER.info("消息消费失败:" + cause);     } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {     String correlationId = message.getMessageProperties().getCorrelationIdString();     LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);   }); return template;    }     /**     * 交换机针对消费者配置     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念     * DirectExchange:按照routingkey分发到指定队列,多关键字匹配     */    @Bean    public DirectExchange  directExchange() {     return new DirectExchange(EXCHANGE_Member, true,false);    }     /**     * 队列     *     * @return     */    @Bean    public Queue directQueue() { return new Queue(QUEUE_Member, true);      }     /**     * 绑定     *     * @return     */    @Bean    public Binding directBinding() {     return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member);    } @Bean    public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter();    } }

RMQProducer.java ---------rmqp消息提供端/消息发送端

@Componentpublic class RMQProducer { private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowiredprivate RabbitTemplate rabbitTemplate;     /***     * 延迟消息队列信息     * @param routingKeyName     * @param msg     */     public void sendMsg(String routingKeyName,String msg) {    LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(new Date()));  rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, new MessagePostProcessor() {      @Override      public Message postProcessMessage(Message message) throws AmqpException {   message.getMessageProperties().setContentEncoding("utf-8");   message.getMessageProperties().setExpiration("120000"); //设置消息存活时间   return message;      }  });  //rabbitTemplate.convertAndSend(routingKeyName, msg);     }}

RMQReceiver.java--------------rmqp消费端

/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{     private final Logger logger = LoggerFactory.getLogger(this.getClass());     @RabbitListener(queues = RabbitConfig.QUEUE_Member )    @RabbitHandler    public void handler(Message message, Channel channel) throws IOException  {    logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );    long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    } }

3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)

rmqpConfig.java------------------rmqp配置类

@Configurationpublic class RabbitConfig {private static final Logger LOGGER = LogManager.getLogger(RabbitConfig.class);     @Value("${spring.rabbitmq.host}")    private String host;     @Value("${spring.rabbitmq.port}")    private int port;     @Value("${spring.rabbitmq.username}")    private String username; @Value("${spring.rabbitmq.password}")    private String password;    @Value("${spring.rabbitmq.virtual-host}")    private String vhost;    /***     * 交换机     */     public static final String EXCHANGE_Order = "RabbitMQ_Exchange_Order";// 下单    /**     * 队列     */      public static final String QUEUE_Order = "RabbitMQ_Queue_Order";// 下单    public static final String QUEUE_Pay = "RabbitMQ_Queue_Pay";// 支付 /***     * 路由     */  public String ROUTINGKEY_Order = "RabbitMQ_RoutingKey_Order";// 下单     //建立一个连接容器,类型数据库的连接池    @Bean    public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); return connectionFactory;    } @Bean    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)    public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true template.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费");     } else {     LOGGER.info("消息消费失败:" + cause);     } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);    }); return template;    }     /Fanout广播配置///    /***     * 交换机配置     * @return     */    @Bean    public FanoutExchange fanoutExchange() {     return new FanoutExchange(EXCHANGE_Order);    } /**     * 队列配置     * @return     */    @Bean    public Queue fanoutQueueOrder() {     return new Queue(QUEUE_Order);    } @Bean    public Queue fanoutQueuePay() {     return new Queue(QUEUE_Pay);    } /***     * 绑定交换机和队列     * @return     */    @Bean    public Binding bindFanoutExchangeOrder() {    return BindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange());    } @Bean    public Binding bindFanoutExchangePay() {   return BindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange());    } // @Bean    public MessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter();    } }

RMQProducer.java ---------rmqp消息提供端/消息发送端

/*** * 提供者 * @author Administrator * */@Componentpublic class RMQProducer { private Logger LOGGER = LoggerFactory.getLogger(RMQProducer.class);private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowiredprivate RabbitTemplate rabbitTemplate;  /***     * 延迟消息队列信息     * @param routingKeyName     * @param msg     * @param string      */    public void sendMsgtoFound(String exchangeName , String msg) {    LOGGER.info("消息发送成功,msg:{},时间:{}",msg,sdf.format(new Date()));  rabbitTemplate.convertAndSend(exchangeName , msg);    } }

RMQReceiver.java--------------rmqp消费端

/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{     private final Logger logger = LoggerFactory.getLogger(this.getClass());     @RabbitListener(queues = RabbitConfig.QUEUE_Order )    @RabbitHandler    public void handlerOrder(Message message, Channel channel) throws IOException  { logger.info("接收处理队列QUEUE_Order当中的消息: " + new String(message.getBody()) );     long deliveryTag = message.getMessageProperties().getDeliveryTag();     channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }@RabbitListener(queues = RabbitConfig.QUEUE_Pay )   @RabbitHandler   public void handlerPay(Message message, Channel channel) throws IOException  {   logger.info("接收处理队列QUEUE_Pay当中的消息: " + new String(message.getBody()) );   long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定   }/** * @param trim * @return */private Map mapStringToMap(String str) {str = str.substring(1, str.length() - 1); String[] strs = str.split(","); Map map = new HashMap(); for (String string : strs) {     String key = string.split("=")[0].trim();     String value = string.split("=")[1];     map.put(key, value); }return map;}}

3.2.3使用交换机 topicExchange: 通配符方式分发消息-(订阅)

后期补充。。。。

四、解决

4.1 工具安装好后,guest/guest登录失败如何解决:

解决办法:执行如下命令

命令1:rabbitmqctl set_user_tags guest administrator

命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

重启rabbitmq即可。


 重启服务:

------------------------  我是愉快的分割线 -----------------------------

停止:service rabbitmq-server stop

启动:service rabbitmq-server start

查看状态:service rabbitmq-server status


4.2什么时间让它消费,什么时间手动消费(手动消费:不消费永远都在rmqp中保留)

操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp交换机-队列-路由

/*** * 消费者 * @author Administrator * */@Componentpublic class RMQReceiver implements ChannelAwareMessageListener{     private final Logger logger = LoggerFactory.getLogger(this.getClass());     //@RabbitListener(queues = RabbitConfig.QUEUE_Member )    //@RabbitHandler    public void handler(Message message, Channel channel) throws IOException  {    logger.info("接收处理队列Member当中的消息: " + new String(message.getBody()) );    long deliveryTag = message.getMessageProperties().getDeliveryTag();// channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }   }

如下图:

五、SpringBoot +  Mail集成实现邮件发送

5.1QQ邮箱授权码获取

5.2邮箱配置

pom.xml

 org.springframework.bootspring-boot-starter-mail

spring.xml--》配置邮箱

###############################Mail配置######################################################################QQ smtp.qq.com##sina smtp.sina.cn##aliyun smtp.aliyun.com##163 smtp.163.com#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994##发送方spring.mail.host=smtp.qq.com##邮件地址spring.mail.from=1247622527@qq.com#用户名spring.mail.username=1247622527@qq.com##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)spring.mail.password=----------------》》》邮箱的授权码#端口号465或587spring.properties.mail.smtp.port: 25##编码格式spring.mail.default-encoding=UTF-8spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.ssl.enable=true

sendMailUtil.java

@Componentpublic class SendMailUtil {private static final Logger LOGGER = LogManager.getLogger(SendMailUtil.class);    @Autowired    private JavaMailSender mailSender;   //发送方邮件的发送地址  @Value("${spring.mail.host}")  public static String sendMailHost;    //发送方发送邮件的账号  @Value("${spring.mail.username}")  public static String sendMailUsername;    //发送方发送邮件的客户端授权码  @Value("${pring.mail.password}")  public static String sendMailPassword;    //发送方发送邮件的端口  @Value("${spring.properties.mail.smtp.port}")  public static String sendMailPort;    @Value("${spring.mail.from")  public static String sendMailFrom;      public static void sendSimpleMail(String to, String subject, String content) throws Exception{  //创建连接对象 连接到邮件服务器 Properties properties = new Properties(); //设置发送邮件的基本参数 //发送邮件服务器 properties.put("mail.smtp.host", sendMailHost); //发送端口 properties.put("mail.smtp.port", sendMailPort); properties.put("mail.smtp.auth", "true"); //设置发送邮件的账号和密码 Session session = Session.getInstance(properties, new Authenticator() {     @Override     protected PasswordAuthentication getPasswordAuthentication() {  //两个参数分别是发送邮件的账户和密码  return new PasswordAuthentication(sendMailUsername,sendMailPassword);     } }); //创建邮件对象 Message message = new MimeMessage(session); //设置发件人 message.setFrom(new InternetAddress(sendMailUsername)); //设置收件人 message.setRecipient(Message.RecipientType.TO,new InternetAddress(to)); //设置主题 message.setSubject(subject); // 设置邮件内容 message.setContent(content,"text/html;charset=UTF-8");  //发送一封邮件 Transport.send(message);}public void sendHtmlMail(String to, String subject, String content) {  //获取MimeMessage对象 MimeMessage message = mailSender.createMimeMessage(); MimeMessageHelper messageHelper; try {     messageHelper = new MimeMessageHelper(message, true);     //邮件发送人     messageHelper.setFrom(sendMailFrom);     //邮件接收人     messageHelper.setTo(subject);     //邮件主题     message.setSubject(subject);     //邮件内容,html格式     messageHelper.setText(content, true);     //发送     mailSender.send(message); } catch (MessagingException e) {  }}public void sendAttachmentsMail(String to, String subject, String content, String filePath) { MimeMessage message = mailSender.createMimeMessage(); try {     MimeMessageHelper helper = new MimeMessageHelper(message, true);     helper.setFrom(sendMailFrom);     helper.setTo(to);     helper.setSubject(subject);     helper.setText(content, true);     FileSystemResource file = new FileSystemResource(new File(filePath));     String fileName = filePath.substring(filePath.lastIndexOf(File.separator));     helper.addAttachment(fileName, file);     mailSender.send(message);     } catch (MessagingException e) { //日志信息   }}public static void main(String[] args) throws Exception {//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;//System.exit(0);}}

业务代码.Java

@Overridepublic Object userSendNewMailCode(String memberId , String newMemberEmail) {Map mapMemberInfo = new HashMap(); if(!"".equals(memberId) && !"".equals(newMemberEmail)) {Member member = memberService.getOnlyOneMemberInfoByMemberId(memberId);String validEmail = IdGenerate.random2FiveId();//邮箱登录注册后,可绑定更改新邮箱if (member!=null && newMemberEmail.equals(member.getMemberEmail())) {try {SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");} catch (Exception e) {e.printStackTrace();mapMemberInfo.put("exception", e.getMessage());}//邮箱未登录注册后,任意邮箱可绑定}else if(member!=null && !newMemberEmail.equals(member.getMemberEmail())){try {SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ " + validEmail +" ] 填入邮箱注册码中!");} catch (Exception e) {e.printStackTrace();mapMemberInfo.put("exception", e.getMessage());}}List  memberInfo = new ArrayList();mapMemberInfo.put("newMemberEmail", newMemberEmail);mapMemberInfo.put("validEmail", validEmail);memberInfo.add(mapMemberInfo);return mapMemberInfo;}return mapMemberInfo;}

RabbitMQ教程(安装与使用详解,Spring集成)_程猿薇茑的博客-CSDN博客_rabbitmq 教程

消息中间件(一)MQ详解及四大MQ比较_jcpp9527的博客-CSDN博客_消息中间件

http://jm.taobao.org/2016/04/01/kafka-vs-rabbitmq-vs-rocketmq-message-send-performance/

RabbitMQ核心概念以及工作原理 - 简书

Springboot 整合RabbitMq ,用心看完这一篇就够了_小目标青年的博客-CSDN博客_springboot集成rabbitmq

Java SpringBoot集成RabbitMq实战和总结 - 远方789 - 博客园

以上是自己整理的,并测试过,可以直接用


文章中,有问题,可以在评论区评论


转载声明:本文为博主原创文章,未经博主允许不得转载

如果我的文章有帮助到您,欢迎打赏一下鼓励博主