spring boot 整合 Spring Cloud、Kafka 和 MyBatis菜鸟教程
环境准备
确保项目中已引入 Spring Boot、Spring Cloud、Kafka 和 MyBatis 的依赖。以下是一个典型的 Maven 依赖配置:
org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter org.springframework.kafka spring-kafka org.mybatis.spring.boot mybatis-spring-boot-starter 2.2.0 mysql mysql-connector-java
配置 Kafka
在 application.yml
或 application.properties
中配置 Kafka 的相关属性:
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
配置 MyBatis
在 application.yml
中配置 MyBatis 和数据源:
spring: datasource: url: jdbc:mysql://localhost:3306/test username: root password: password driver-class-name: com.mysql.cj.jdbc.Drivermybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: com.example.demo.model
创建 Kafka 生产者和消费者
定义一个 Kafka 生产者用于发送消息:
@Servicepublic class KafkaProducerService { @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); }}
定义一个 Kafka 消费者用于接收消息:
@Servicepublic class KafkaConsumerService { @KafkaListener(topics = \"my-topic\", groupId = \"my-group\") public void consume(String message) { System.out.println(\"Received message: \" + message); }}
创建 MyBatis Mapper 和实体
定义一个实体类:
public class User { private Long id; private String name; private String email; // getters and setters}
创建 MyBatis Mapper 接口:
@Mapperpublic interface UserMapper { @Select(\"SELECT * FROM users WHERE id = #{id}\") User findById(Long id); @Insert(\"INSERT INTO users(name, email) VALUES(#{name}, #{email})\") @Options(useGeneratedKeys = true, keyProperty = \"id\") void insert(User user);}
业务逻辑整合
在业务逻辑中整合 Kafka 和 MyBatis,例如在接收到 Kafka 消息后保存到数据库:
@Servicepublic class UserService { @Autowired private UserMapper userMapper; @Autowired private KafkaProducerService kafkaProducerService; public void processUser(User user) { userMapper.insert(user); kafkaProducerService.sendMessage(\"user-topic\", \"User saved: \" + user.getName()); }}
测试
编写一个简单的测试 Controller 来验证整合是否成功:
@RestController@RequestMapping(\"/api\")public class TestController { @Autowired private UserService userService; @PostMapping(\"/user\") public String saveUser(@RequestBody User user) { userService.processUser(user); return \"User saved successfully!\"; }}
注意事项
- 确保 Kafka 服务已启动并正常运行。
- 确保数据库已正确配置,并且表结构与实体类匹配。
- 在 Spring Boot 主类上添加
@EnableKafka
注解以启用 Kafka 支持。
通过以上步骤,可以成功整合 Spring Cloud、Kafka 和 MyBatis,实现消息的发送、接收以及数据库操作。