Spring Boot 3.X RocketMQ Integration Guide

This article explains how to integrate the RocketMQ message queue into a Spring Boot 3.X application.

1. Add the dependency

First, add the RocketMQ Spring Boot Starter dependency to `pom.xml`:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. Configure RocketMQ

Add the RocketMQ settings to `application.yml` or `application.properties`. The YAML form is:

rocketmq:
  name-server: 127.0.0.1:9876  # RocketMQ NameServer address
  producer:
    group: my-producer-group    # producer group name
    send-message-timeout: 3000  # send timeout in milliseconds

3. Create a message producer

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a plain message.
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Sends a message with a tag. The destination format is "topic:tag".
     */
    public void sendMessageWithTag(String topic, String tag, String message) {
        rocketMQTemplate.convertAndSend(topic + ":" + tag, message);
    }

    /**
     * Sends a message asynchronously; the callback reports the outcome.
     */
    public void sendAsyncMessage(String topic, String message) {
        rocketMQTemplate.asyncSend(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send succeeded: " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("Send failed: " + e.getMessage());
            }
        });
    }
}
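
The tagged send above builds its destination by string concatenation. As a minimal sketch, that rule can be pulled into a small helper. `Destinations` is a hypothetical class introduced here for illustration, not part of rocketmq-spring, and the validation rules it applies are assumptions.

```java
// Hypothetical helper, not part of rocketmq-spring: builds the
// "topic:tag" destination string used by sendMessageWithTag.
final class Destinations {

    private Destinations() {
    }

    /** Returns "topic" when the tag is null or blank, otherwise "topic:tag". */
    static String of(String topic, String tag) {
        if (topic == null || topic.isBlank()) {
            throw new IllegalArgumentException("topic must not be blank");
        }
        if (topic.contains(":")) {
            // ':' is the separator, so it would make the destination ambiguous
            throw new IllegalArgumentException("topic must not contain ':'");
        }
        if (tag == null || tag.isBlank()) {
            return topic;
        }
        return topic + ":" + tag.trim();
    }
}
```

With such a helper, the tagged send could be written as `rocketMQTemplate.convertAndSend(Destinations.of(topic, tag), message)`.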

4. Create a message consumer

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(
    topic = "test-topic",
    consumerGroup = "my-consumer-group",
    selectorExpression = "*"  // consume messages with any tag
)
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Received message: " + message);
        // Handle the business logic here
    }
}
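
To make the meaning of `selectorExpression` more concrete, here is a rough sketch of how a tag expression such as `*` or `tagA || tagB` decides which messages a consumer receives. `TagSelector` is a hypothetical class written for this article. It imitates the tag-expression semantics only and is not the code RocketMQ itself runs.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical illustration of tag-expression semantics; not RocketMQ code.
final class TagSelector {

    private final boolean matchAll;
    private final Set<String> tags;

    TagSelector(String expression) {
        String expr = expression == null ? "" : expression.trim();
        // An empty expression or "*" subscribes to every tag
        this.matchAll = expr.isEmpty() || expr.equals("*");
        this.tags = matchAll
                ? Set.of()
                : Arrays.stream(expr.split("\\|\\|"))   // tags are separated by "||"
                        .map(String::trim)
                        .filter(t -> !t.isEmpty())
                        .collect(Collectors.toUnmodifiableSet());
    }

    /** Returns true if a message carrying this tag would be delivered. */
    boolean matches(String tag) {
        return matchAll || (tag != null && tags.contains(tag));
    }
}
```

Under this reading, a listener declared with `selectorExpression = "tagA || tagB"` would receive messages sent to `test-topic:tagA` and `test-topic:tagB`, but not `test-topic:tagC`.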

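5. Send ordered messages

To send ordered messages, pass a hash key. Messages with the same hash key are routed to the same queue, so they are consumed in the order they were sent:

public void sendOrderlyMessage(String topic, String message, String hashKey) {
    rocketMQTemplate.syncSendOrderly(topic, message, hashKey);
}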
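
The role of `hashKey` can be illustrated with a small sketch of hash-based queue selection. `QueueSelectorSketch` and its formula are assumptions made for illustration; the selector actually used by the starter may compute the index differently. The point is only that the same key always maps to the same queue.

```java
// Hypothetical illustration of hash-based queue selection; the real
// selector used by the client may use a different formula.
final class QueueSelectorSketch {

    private QueueSelectorSketch() {
    }

    /** Maps a hash key to a queue index in the range [0, queueCount). */
    static int selectQueue(String hashKey, int queueCount) {
        if (hashKey == null) {
            throw new IllegalArgumentException("hashKey must not be null");
        }
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(hashKey.hashCode(), queueCount);
    }
}
```

A common choice is to use a business identifier such as an order ID as the hash key, so that all messages for one order stay in order while different orders spread across queues.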

6. Send transactional messages

import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // Run the local transaction; arg is the third argument passed to sendMessageInTransaction
        try {
            // Business code...
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // Called back by the broker to check the local transaction status.
        // Returning COMMIT unconditionally is a placeholder; look up the real status here.
        return RocketMQLocalTransactionState.COMMIT;
    }
}

// Send a transactional message (a method for a class that holds rocketMQTemplate, such as MessageProducer)
public void sendTransactionMessage(String topic, String message) {
    TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        topic,
        MessageBuilder.withPayload(message).build(),
        null
    );
    System.out.println("Transactional message send result: " + result);
}
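
The `checkLocalTransaction` callback above always returns `COMMIT`. One possible way to return the real outcome is to record each local transaction result under a transaction ID and look it up during the check. The sketch below assumes this approach; `LocalTransactionLog` and its `State` enum are hypothetical stand-ins (the enum mirrors `RocketMQLocalTransactionState` so the example compiles without the starter), and how the transaction ID is obtained from the message is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory record of local transaction outcomes.
final class LocalTransactionLog {

    /** Local stand-in for RocketMQLocalTransactionState. */
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Runs the business action and records its outcome under the transaction ID. */
    State execute(String transactionId, Runnable businessAction) {
        State state;
        try {
            businessAction.run();
            state = State.COMMIT;
        } catch (RuntimeException e) {
            state = State.ROLLBACK;
        }
        states.put(transactionId, state);
        return state;
    }

    /** Returns the recorded outcome, or UNKNOWN if nothing was recorded yet. */
    State check(String transactionId) {
        return states.getOrDefault(transactionId, State.UNKNOWN);
    }
}
```

An in-memory map is lost on restart, so a real application would more likely persist the status in the same database transaction as the business data. Returning `UNKNOWN` for an unrecorded ID leaves the decision to a later check rather than committing blindly.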

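7. Advanced configuration

If you need more advanced configuration, you can define a custom `RocketMQTemplate` bean:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Bean
    public RocketMQTemplate rocketMQTemplate() {
        RocketMQTemplate template = new RocketMQTemplate();
        template.setProducer(producer());
        return template;
    }

    @Bean
    public DefaultMQProducer producer() {
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
        producer.setNamesrvAddr(nameServer);
        producer.setSendMsgTimeout(3000);
        // Do not call producer.start() here: RocketMQTemplate starts its producer
        // during bean initialization, and starting a producer twice fails.
        return producer;
    }
}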

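8. Test sending and receiving messages

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/mq")
public class MQTestController {

    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/send")
    public String sendMessage() {
        messageProducer.sendMessage("test-topic", "Hello RocketMQ!");
        return "Message sent";
    }
}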

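Notes

1. Make sure the RocketMQ services (NameServer and Broker) are started and running.

2. Producer group and consumer group names need to be unique.

3. In a distributed environment, make message handling idempotent, because the same message may be delivered more than once.

4. Choose the message type that fits your business needs (plain, ordered, transactional, and so on).

5. Spring Boot 3.X requires Java 17+; make sure the RocketMQ version you use is compatible.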
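
For the idempotency note, a minimal sketch of one common approach is to remember which message keys have already been handled and skip repeats. `IdempotentHandler` is a hypothetical class written for this article. It assumes each message carries a unique business key and keeps the seen keys in memory only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory deduplication of messages by business key.
final class IdempotentHandler {

    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time a key is seen.
     * Returns true if the handler ran, false if the message was a duplicate.
     */
    boolean handleOnce(String messageKey, String payload, Consumer<String> handler) {
        if (!processedKeys.add(messageKey)) {
            return false; // already processed, skip the duplicate
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a redelivered message can be retried
            processedKeys.remove(messageKey);
            throw e;
        }
    }
}
```

With several consumer instances or across restarts, the key set would need to live in shared storage, for example a database table with a unique constraint, rather than in memory.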

With the steps above, you can integrate RocketMQ into a Spring Boot 3.X application.

The application may fail to start with the following error:

Description:
 
Field rocketMQTemplate in com.example.service. required a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
 
The injection point has the following annotations:
	- @org.springframework.beans.factory.annotation.Autowired(required=true)
 
 
Action:
 
Consider defining a bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' in your configuration.

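The official site has in fact already given the answer. Spring Boot 3.X reads auto-configuration classes from an `AutoConfiguration.imports` file rather than from `META-INF/spring.factories`, which is presumably why starter 2.2.3 is not picked up automatically.

Under the `resources` directory, create the file
META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports with the following content:

org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

With that in place, the application starts.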
