This article describes how to integrate the RocketMQ message queue into a Spring Boot 3.x application.
1. Add the dependency
First, add the RocketMQ Spring Boot Starter dependency to `pom.xml`:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>
2. Configure RocketMQ
Add the RocketMQ settings to `application.yml` (the same keys can be written in dotted form in `application.properties`):
rocketmq:
  name-server: 127.0.0.1:9876      # RocketMQ name server address
  producer:
    group: my-producer-group       # producer group name
    send-message-timeout: 3000     # send timeout in milliseconds
3. Create the message producer
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send a plain message.
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Send a message with a tag. The destination format is "topic:tag".
     */
    public void sendMessageWithTag(String topic, String tag, String message) {
        rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
    }

    /**
     * Send a message asynchronously; the callback reports the outcome.
     */
    public void sendAsyncMessage(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send succeeded: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Send failed: " + e.getMessage());
            }
        });
    }
}
4. Create the message consumer
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "test-topic",
    consumerGroup = "my-consumer-group",
    selectorExpression = "*" // consume messages with any tag
)
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
        // Handle the business logic here
    }
}
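The `selectorExpression` above uses `*` to accept every tag; a tag expression can also list several tags separated by `||`. The following is a minimal sketch of that matching rule in plain Java, meant only to illustrate the semantics. `TagFilterSketch` and `matches` are hypothetical names, and this is a simplified model, not the broker's actual filtering code.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: a simplified model of RocketMQ tag filtering.
class TagFilterSketch {

    // Returns true if a message carrying `tag` would pass `expression`.
    static boolean matches(String expression, String tag) {
        // A null, blank, or "*" expression subscribes to every tag.
        if (expression == null || expression.isBlank() || expression.trim().equals("*")) {
            return true;
        }
        // Otherwise the expression is a list of tags separated by "||".
        Set<String> accepted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return tag != null && accepted.contains(tag);
    }
}
```

For example, under this model `matches("order || pay", "pay")` accepts the message, while `matches("order || pay", "refund")` rejects it.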
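5. Send ordered messages
To send ordered messages, pass a hash key; messages with the same `hashKey` are routed to the same queue. For in-order consumption, the consumer should also set `consumeMode = ConsumeMode.ORDERLY` on `@RocketMQMessageListener`.
public void sendOrderlyMessage(String topic, String message, String hashKey) {
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}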
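Ordering relies on every message with the same `hashKey` landing in the same queue. The sketch below illustrates that idea with a hash-modulo selection. `OrderlyQueueSketch`, `pickQueue`, and the queue count are assumptions made for illustration; the real selection logic lives inside the RocketMQ client and may differ in detail.

```java
// Hypothetical illustration of hash-based queue selection.
class OrderlyQueueSketch {

    // Maps a hash key to a queue index in the range [0, queueCount).
    static int pickQueue(String hashKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }
}
```

In practice the hash key is usually a business identifier such as an order ID, so that all events for one order stay in sequence while different orders spread across queues.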
6. Send transactional messages
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Execute the local transaction
        try {
            // Business code...
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Check the local transaction state.
        // Placeholder: a real check should look up the actual outcome
        // instead of always returning COMMIT.
        return RocketMQLocalTransactionState.COMMIT;
    }
}

// In the producer class (for example MessageProducer): send a transactional message
public void sendTransactionMessage(String topic, String message) {
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        topic,
        MessageBuilder.withPayload(message).build(),
        null
    );
    System.out.println("Transactional message send result: " + result);
}
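Returning `COMMIT` unconditionally from `checkLocalTransaction` can commit a message whose local transaction actually failed. One common approach, sketched below under stated assumptions, is to record the outcome of each local transaction under an ID and consult that record during the check. To stay self-contained, the sketch uses its own `State` enum and an in-memory map; `TransactionStatusStore` and its methods are hypothetical, and a real application would map the result to `RocketMQLocalTransactionState` and keep the record in a durable store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory record of local transaction outcomes.
class TransactionStatusStore {

    // Mirrors the three states a transaction check can report.
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> outcomes = new ConcurrentHashMap<>();

    // Call after the local transaction has committed.
    void recordCommitted(String txId) {
        outcomes.put(txId, State.COMMIT);
    }

    // Call after the local transaction has rolled back.
    void recordRolledBack(String txId) {
        outcomes.put(txId, State.ROLLBACK);
    }

    // Called from the check callback: UNKNOWN when no outcome is recorded yet.
    State check(String txId) {
        return outcomes.getOrDefault(txId, State.UNKNOWN);
    }
}
```

Reporting an unknown state when no outcome has been recorded lets the broker ask again later instead of forcing a premature decision.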
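7. Advanced configuration
For finer control, define a custom `RocketMQTemplate` bean:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public RocketMQTemplate rocketMQTemplate() {
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer());
        return template;
    }

    @Bean
    public DefaultMQProducer producer() {
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(3000);
        // Do not call producer.start() here: RocketMQTemplate starts the producer
        // when the template bean is initialized, and starting it twice fails.
        return producer;
    }
}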
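8. Test sending and receiving
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mq")
public class MQTestController {

    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/send")
    public String sendMessage() {
        messageProducer.sendMessage("test-topic", "Hello RocketMQ!");
        return "Message sent";
    }
}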
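Notes
1. Make sure the RocketMQ service (name server and broker) is up and running.
2. Use a distinct group name for each producer role and each consumer role; consumer instances that share a group must use the same subscription (topic and tags).
3. In a distributed environment, make message handling idempotent.
4. Choose the message type (normal, ordered, transactional, and so on) that fits the business requirement.
5. Spring Boot 3.x requires Java 17 or later; make sure the RocketMQ starter version is compatible.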
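Idempotent handling matters because RocketMQ may deliver the same message more than once (for example after a retry), so consumer logic should tolerate duplicates. A minimal sketch of a deduplication guard follows. `IdempotentHandler`, the business-key parameter, and the in-memory set are assumptions for illustration; a production system would more likely rely on a database unique constraint or a shared cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical guard that runs an action at most once per business key.
class IdempotentHandler {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    // Runs `action` only the first time `businessKey` is seen.
    // Returns true if the action ran, false if the key was a duplicate.
    boolean handleOnce(String businessKey, Runnable action) {
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the business logic
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivery can retry the work.
            processedKeys.remove(businessKey);
            throw e;
        }
    }
}
```

Inside `onMessage`, the business logic would be wrapped in a call such as `handler.handleOnce(orderId, () -> process(order))`, where `orderId`, `order`, and `process` stand in for application-specific names.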
With the steps above, RocketMQ can be integrated into a Spring Boot 3.x application.
The application may fail to start with the following error:
Description:
Field rocketMQTemplate in com.example.service. required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
The injection point has the following annotations:
- @org.springframework.beans.factory.annotation.Autowired(required=true)
Action:
Consider defining a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' in your configuration.
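The official documentation already covers this. Spring Boot 3 no longer loads auto-configuration classes listed in `META-INF/spring.factories`, so a starter that registers `RocketMQAutoConfiguration` only there is not picked up. Under the `resources` directory, create the file
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
with the following content:
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
The application then starts normally.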
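To confirm that the file is actually on the classpath and lists the expected class, a small diagnostic like the one below may help. It is a sketch only: `AutoConfigImportsCheck` and its methods are hypothetical, and the parsing (drop `#` comments, trim each line, skip blanks) is a simplified approximation of how Spring Boot reads the file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical diagnostic for AutoConfiguration.imports files.
class AutoConfigImportsCheck {

    static final String RESOURCE =
            "META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports";

    // Extracts class names from file content, ignoring comments and blank lines.
    static List<String> parse(String content) {
        List<String> classNames = new ArrayList<>();
        content.lines().forEach(line -> {
            int comment = line.indexOf('#');
            String value = (comment >= 0 ? line.substring(0, comment) : line).trim();
            if (!value.isEmpty()) {
                classNames.add(value);
            }
        });
        return classNames;
    }

    // Collects the entries of every matching file visible to the class loader.
    static List<String> readFromClasspath(ClassLoader loader) {
        List<String> classNames = new ArrayList<>();
        try {
            Enumeration<URL> urls = loader.getResources(RESOURCE);
            while (urls.hasMoreElements()) {
                try (InputStream in = urls.nextElement().openStream()) {
                    classNames.addAll(parse(new String(in.readAllBytes(), StandardCharsets.UTF_8)));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return classNames;
    }
}
```

Calling `readFromClasspath` with the application's class loader and checking that the returned list contains `org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration` shows whether the file was packaged where expected.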