> 文档中心 > sprinbBoot整合rabbitMQ-订阅发布模式-发送消息服务(邮件和钉钉通知)

sprinbBoot整合rabbitMQ-订阅发布模式-发送消息服务(邮件和钉钉通知)

订阅发布模式
订阅发布模式是一个生产者对应多个消费者(fanout-exchange)模式,可以理解为广播模式,会给这个交换机绑定的所有队列推送消息

在这里插入图片描述

生产者
配置类
package com.example.springbootorderrabbitmqproducer.Config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • @author 康世行
  • @Title:
  • @Package com.example.springbootorderrabbitmqproducer.Config
  • @Description: rabbitmmq配置类
  • @date 2021-12-25 19:37
    /
    @Configuration
    public class RabbitMqConfiguration {
    //1:声明注册fancout模式的交换机
    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange(“fanout-exchange”,true,false);
    }
    //2:声明队列
    /
    *
      * @Description: 发送短息队列
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-25 19:59
      /
    @Bean
    public Queue smsQueue(){
    return new Queue(“sms.fanout.queue”,true);
    }
    /
    *
      * @Description: 发送邮件消息队列
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-25 20:00
      /
    @Bean
    public Queue emailQueue(){
    return new Queue(“email.fanout.queue”,true);
    }
    //3:完成队列和交换机的绑定关系
    /
    *
      * @Description: 发送短息消息队列和fanout交换机进行绑定
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-25 20:02
      /
    @Bean
    public Binding smsBinding(){
    return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }
    /
    *
      * @Description: 发送邮件消息队列和fanout交换机进行绑定
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-25 20:03
      */
    @Bean
    public Binding emailBinding(){
    return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }
    }

消息生产代码
** controller**

package com.example.springbootorderrabbitmqproducer.controller;

import com.example.springbootorderrabbitmqproducer.service.OrderRabbitmqProducerimpl;
import io.swagger.annotations.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**

  • @author 康世行
  • @Title:
  • @Package com.example.springbootorderrabbitmqproducer.controller
  • @Description: 测试日志controller
  • @date 2021-12-09 8:27
    /
    @RestController
    @RequestMapping("/send")
    @Api(“测试swagger接口”)
    public class OrderRabbitmqProducerController {
    @Autowired
    OrderRabbitmqProducerimpl orderRabbitmqProducerimpl;
    /
    *
      * @Description: 发送订单,生产者
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-11 20:25
      */
    @GetMapping("/sendOrder/{DingId}/{content}")
    @ApiOperation(“发送订单”)
    @ApiImplicitParams({
    @ApiImplicitParam(name=“DingId”,value=“接收消息的钉钉Id”,dataType=“String”, paramType = “path”,required = false),
    @ApiImplicitParam(name=“content”,value=“要发送的消息”,dataType=“String”, paramType = “path”,required = false)
    })
    @ApiResponse(code = 400,message = “请求参数没填好”)
    public String sendOrder(@PathVariable(value = “DingId”,required = true) String DingId, @PathVariable(value = “content”,required = true)String content){
    //发送订单,消息
    String relust= orderRabbitmqProducerimpl.sendOrder(DingId, content);
    return relust;
    }
    }

** service**

package com.example.springbootorderrabbitmqproducer.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**

  • @author 康世行
  • @Title:
  • @Package com.example.springbootorderrabbitmqproducer.service
  • @Description: 发送订单业务
  • @date 2021-12-11 20:27
    /
    @Service
    public class OrderRabbitmqProducerimpl {
    @Autowired
    RabbitTemplate rabbitTemplate;
    /
    *
      * @Description: 发送订单,生产者
      * @param ${tags}
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-11 20:29
      */
    public String sendOrder(String dingId,String content){
    //组装数据
    Map stringMap=new HashMap();
    stringMap.put(“dingId”,dingId);
    stringMap.put(“content”,content);
    //消息投递到队列
    //交换机,使用发布订阅模式
    String exchangeName=“fanout-exchange”;
    //路由key
    String routingKey="";
    rabbitTemplate.convertAndSend(exchangeName,routingKey,stringMap);
    return “消息投递成功!”;
    }
    }

结果
在这里插入图片描述

在图中可以看到sms 消息队列和email消息队列里都有两天未消费的消息

消费者
消费者就是一致监听其中一个队列,只要队列里有消息立刻消费。比如:emali这个队列被email 消费者监听,只要已有消息这个消费者立刻就消费。

消费者发送消息
钉钉通知
package com.example.springbootorderrabbitmqconsumer.consumer;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**

  • @author 康世行
  • @Title:
  • @Package com.example.springbootorderrabbitmqproducer.consumer
  • @Description: 订单消费者
  • @date 2021-12-11 20:55
    /
    @Component
    //监听指定队列
    @RabbitListener(queues = {“sms.fanout.queue”})
    public class OrderRabbimqConsumerDingDing {
    @Autowired
    RestTemplate restTemplate;
    @RabbitHandler
    /
    *
      * @Description: 发送钉钉消息
      * @param DingId 钉钉id ,content 要发送的内容
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-11 21:02
      */
    public void messagerevice(Map maps){
    //获取队列消息,发送钉钉消息
    String dingId = maps.get(“dingId”);
    String content = maps.get(“content”);
    System.out.println(“dingId”+dingId);
    System.out.println(“内容”+content);
    //data体
    Map map=new HashMap();
    List dingIdlist=new ArrayList();
    dingIdlist.add(dingId);
    map.put(“dingIds”,dingIdlist);
    map.put(“groupName”,“测试”);
    map.put(“messageContent”,content);
    map.put(“messageTitle”,“测试消息队列发送钉钉消息”);
    map.put(“messageUrl”,“ddd”);
    map.put(“picUrl”,“ddd”);
    //htttp请求头,设置请求头信息
    HttpHeaders headers=new HttpHeaders();
    headers.setContentType(MediaType.parseMediaType(“application/json”));
    //http请求实体,请求头设置和data存入http请求实体中
    HttpEntity parms=new HttpEntity(map, headers);
    //发送http请求, 参数1: 接口地址,参数2 请求的数据体(data+headers) 参数3 返回值类型
    ResponseEntity stringResponseEntity = restTemplate.postForEntity(“http://msg.dmsd.tech:8002/dingmessage/send/groupTextMsg”, parms, String.class);
    System.out.println(stringResponseEntity);
    }
    }

结果
在这里插入图片描述

邮件通知
package com.example.springbootorderrabbitmqconsumer.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.util.*;

/**

  • @author 康世行
  • @Title:
  • @Package com.example.springbootorderrabbitmqproducer.consumer
  • @Description: email消费者
  • @date 2021-12-12 9:45
    /
    @Component
    @Slf4j
    //监听指定队列
    @RabbitListener(queues = {“email.fanout.queue”})
    public class OrderRabbimqConsumerEmail {
    @Autowired
    JavaMailSender javaMailSender;
    @RabbitHandler
    /
    *
      * @Description: 发送email消息
      * @param DingId 钉钉id ,content 要发送的内容
      * @return ${return_type}
      * @throws
      * @author 康世行
      * @date 2021-12-11 21:02
      */
    public void messagerevice(Map maps){
    //获取队列消息,发送 email消息
    String dingId = maps.get(“dingId”);
    String content = maps.get(“content”);
    log.info(“dingId”+dingId);
    log.info(“内容”+content);
    // 设置邮件发送内容
    SimpleMailMessage mailMessage = new SimpleMailMessage();
    // 发件人: setFrom处必须填写自己的邮箱地址,否则会报553错误
    mailMessage.setFrom(“1547403415@qq.com”);
    // 收件人
    mailMessage.setTo(“18332371417@163.com”);
    // 抄送收件人:网易邮箱要指定抄送收件人,不然会报 554(发送内容错误)
    mailMessage.setCc(“18332371417@163.com”);
    // 主题
    mailMessage.setSubject(“测试rabitMq发送邮费服务”);
    // 内容
    mailMessage.setText(content);
    try {
    javaMailSender.send(mailMessage);
    System.out.println(“发送简单文本邮件成功,主题是:” + content);
    } catch (Exception e) {
    System.out.println("-----发送简单文本邮件失败!-------" + e.toString());
    e.printStackTrace();
    }

}
}
结果
在这里插入图片描述

yml
消费者
server:
port: 8088

配置rabbitmq服务

spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.77.130
port: 5672
mail:
# 配置 SMTP 服务器地址
host: smtp.qq.com
# 发送者邮箱
username: 1547403415@qq.com
# 配置密码,注意不是真正的密码,而是刚刚申请到的授权码
password: ###########
# 端口号465或587
port: 587
# 默认的邮件编码为UTF-8
default-encoding: UTF-8
# 配置SSL 加密工厂
properties:
mail:
smtp:
socketFactoryClass: javax.net.ssl.SSLSocketFactory
#表示开启 DEBUG 模式,这样,邮件发送过程的日志会在控制台打印出来,方便排查错误
debug: true

生产者

服务端口

server:
port: 8089

配置rabbitmq服务

spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 192.168.77.130
port: 5672