1. 引入依赖
背景
一次性job目前其实一直被当做mq来使用, mq的一些特性需要透出则必须调整一次性job对应的API, 比如要线程可配,topic可隔离等, 鉴于此, 我们可以使用mq对应的sdk,好处是要切换不同mq时,我们的接口基本是统一的, 目前提供的封装主要是 rocketmq/ons/kafka, 后续有新的mq也可以考虑接入。
1. 引入依赖
通过maven添加mq-sdk依赖便可, 如果使用了 trantor framework, 会有传递依赖, 这一步可以忽略。
注意:
- 2.2.2.RELEASE 版本会导致应用启动时对 rocketmq 强依赖, 可换成 2.2.3.RELEASE
- 2.2.2.RELEASE 以下的版本都不支持泛型入参,比如不支持 List
这种。
<dependency> <groupId>io.terminus.common</groupId> <artifactId>terminus-spring-boot-starter-rocketmq</artifactId> <version>2.2.5.RELEASE</version></dependency>2. 使用说明
2.1 怎么收发普通消息
- 发送消息
可以通过TerminusMessage.send 进行发送普通消息, 核心是发送消息时指定 topic 和tag
@Autowiredprivate TerminusMQProducer producer;
public void send() { TerminusMessage terminusMessage = new TerminusMessage();
// 核心参数为 topic, tag,body terminusMessage.setTopic("test_topic_A"); terminusMessage.setTags("A");
// body 入参跟 trantor 模型无关 User user = new User(); user.setId(10086L); user.setNickname("sam"); terminusMessage.setBody(user);
TerminusSendResult result = producer.send(terminusMessage); log.info("send mq success, msgId {}", result.getMessageId());}- 监听消息
以下是监听消息的例子, @MQConsumer 定义了消费者,会作为bean注入容器里边,@MQSubscribe定义了该 consumer 监听的 topic 和 tag。
其中 consumerGroup 必填 ,topic 必填, tag 可空,默认为*,表示监听所有。
一个 Consumer 中可以有多个 @MQSubscribe 方法, 表示多监听的关系,如下表示监听的tag 为 A||B。
@MQConsumer(consumerGroup = "GID_simple_test")@Slf4jpublic class TestConsumer {
@MQSubscribe(topic = "test_topic_A", tag = {"A"}) public void test(User user) { log.info("consumer subscribe 1: " + user); }
@MQSubscribe(topic = "test_topic_A", tag = {"B"}) public void test2(User user) { log.info("consumer subscribe 2: " + user); }
}2.2 怎么收发事务消息
4.2.x 和 4.3.x 的 rocketmq 版本确实不太稳定, 测试过事务消息的回查没有生效, 需升级到 4.6.x 的 rocketmq。 可看下 https://github.com/apache/rocketmq/issues/2265
- 发送消息
可以通过 TerminusTransactionMQProducer.send
发送事务消息, 额外注意需要声明 listenerKey,注意这个key要和事务监听器的key对应。
如果提示 TerminusTransactionMQProducer 没有这个bean, 可以在 application.yml 中通过 terminus.transactionProducerGroup 配置下 consumerGroup。
@Autowiredprivate TerminusTransactionMQProducer mqTxProducer;
public void sendTx() { TerminusMessage terminusMessage = new TerminusMessage(); terminusMessage.setTopic("test_topic_tx"); terminusMessage.setTags("A"); User user = new User(); user.setId(10086L); user.setNickname("sam"); terminusMessage.setBody(user);
// 注意这个key要和事务监听器的key对应 TerminusSendResult result = mqTxProducer.sendTx("testTxChecker", terminusMessage, user); log.info("send tx mq success, msgId {}", result.getMessageId());
}- 定义事务消息监听器
@Component@Slf4jpublic class TxListener implements TerminusTransactionListener<User> {
@Override public String listenerKey() { return "testTxChecker"; }
// 发送半消息的后置处理 // 如果响应 TerminusTransactionState 表示消息投递成功,不会走后边回查逻辑 @Override public TerminusTransactionState execute(User msgBody, Object arg) { log.info("tx mq execute checker {}", msgBody); return null; }
// 事务回查的处理 @Override public TerminusTransactionState check(User data) { log.info("tx mq callback checker {}", data); return TerminusTransactionState.COMMIT_MESSAGE; }}- 消费消息
也是通过@MQConsumer定义的消费者, 与普通消息定义的消费者一致。
@MQConsumer(consumerGroup = "GID_simple_test")@Slf4jpublic class TestConsumer {
@MQSubscribe(topic = "test_topic_tx", tag = {"A"}) public void test(User user) { log.info("consumer subscribe 1: " + user); }
}2.3 动态逻辑
从环境变量读取配置
下边例子从环境变量或者配置文件中读取 consumerGroup、 线程数、 topic 和 tags.
@MQConsumer(consumerGroup = "${trantor.job.disposable.consumerGroup}", consumeThreadNumsEnv = "${trantor.job.disposable.consumeThreadNums}")public class AsyncJobExecutor {
@MQSubscribe(topic = "${trantor.job.disposable.consumerTopic}", tags = "${trantor.job.disposable.consumerTag}") public void executeJob(AsyncJobMessage asyncJob) {
}}启动的时候创建Consumer
有时候注解注解这种静态的方式不够用,我们会有需求要根据配置动态启动 Consumer。
这里实现 io.terminus.common.rocketmq.core.ConsumerCustomizer , 响应 Consumer 和对应的 Subscription 便可, 容器会管理起该 Consumer 的生命周期。
/** * 自定义 consumer, 相比 {@link MQConsumer} 注解定义的方式, 这种好处支持动态 * <br/> {@link MQConsumer} 解析之后也是转成 Consumer 的元数据 */public interface ConsumerCustomizer {
List<Consumer> getConsumers();
}下边我们用一个例子做讲解。
我们先定义一个普通的 spring bean, 该 bean 需要实现 MessageHandler 接口。
@Component@Slf4jpublic class TestConsumer implements MessageHandler<List<User>> {
private volatile boolean executed = false;
public boolean isExecuted() { return executed; }
@Override public void process(List<User> users) { executed = true; log.info("consumer subscribe : " + new Gson().toJson(users));
Assertions.assertEquals(users.size(), 1); Assertions.assertEquals(users.get(0).getName(), "foo"); }}然后定义一个 ConsumerCustomizer 的实现,该实现响应 Consumer 的元信息,该元信息关联上边 bean 的执行逻辑。
@Componentpublic class TestCustomizerConsumer implements ConsumerCustomizer {
@Autowired private TestConsumer testConsumer;
@Override @SneakyThrows public List<Consumer> getConsumers() { Consumer consumer = new Consumer(); consumer.setConsumerGroup("test_customizer"); Subscription subscription = new Subscription(); subscription.setTopic(Constants.topic); subscription.setTags(Collections.singleton("test_customizer2"));
// 使用接口的方式 subscription.setMessageHandler(testConsumer);
// 反射的方式 2.2.4.RELEASE 废弃 /*subscription.setBean(testConsumer); subscription.setMethod(testConsumer.getClass().getDeclaredMethod("test", List.class)); */ consumer.setSubscriptions(Collections.singletonList(subscription)); return Collections.singletonList(consumer); }}下边是我们投递消息的验证逻辑。
@Testpublic void send() throws Exception { log.info("start sending msg..");
User user = new User(); user.setAge(22); user.setName("foo");
TerminusMessage message = new TerminusMessage(); message.setTopic(Constants.topic); message.setTags("test_customizer2"); message.setBody(Collections.singletonList(user)); producer.send(message); TimeUnit.SECONDS.sleep(2); Assertions.assertTrue(testConsumer.isExecuted());}其中以下为 ConsumerCustomizer响应的数据结构,具体可以查看javadoc。
- 对消费者的抽象
/** * 对消费者的抽象 * 一个消费者, 可以有多个订阅关系,每个订阅关系有对应的处理逻辑 */@Datapublic class Consumer {
/** * 消费者的标识,必填 */ private String consumerGroup;
/** * 消费者监听的 topic+tag, 一对多, 必填 * 不可空 */ private List<Subscription> subscriptions;
/** * mq 消费线程, 0表示使用对应mq默认的 */ private int consumeThreadNums = 0;
}- 对订阅关系的抽象, 一个消费者有多个订阅关系
@Datapublic class Subscription {
/** * 监听的 topic, 必填 */ private String topic;
/** * 监听的 tag, 必填 */ private Set<String> tags;
/** * 消息处理逻辑 */ private MessageHandler<?> messageHandler;
/** * 对应的参数,可空,默认取 {@link #method} 的第一个参数 */ private Type parameterClass;
/** * 需要额外配置的消息解码器,可空 */ private MessageDecoder messageDecoder;}动态创建并启动consumer
该api 2.2.4.RELEASE 开始提供。
前文讲的是在启动的时候动态创建 Consumer, 这里更进一步提供一个api,可以在运行时动态创建 Consumer, 该api 也是基于 Consumer 元数据的, 具体api如下
public class ConsumerManager {
/** * 注册并启动 consumer */ public void register(Consumer consumer) { this.register(Collections.singletonList(consumer)); }
/** * 匹配注册 consumer */ public void register(List<Consumer> consumers) {}}以下为使用例子
@Autowiredprivate ConsumerManager consumerManager;
@Testpublic void testManager() throws Exception {
// 定义好 consumer Consumer consumer = new Consumer(); consumer.setConsumerGroup("test_customizer"); Subscription subscription = new Subscription(); subscription.setTopic(Constants.topic); subscription.setTags(Collections.singleton("manager2"));
// 使用接口的方式 subscription.setMessageHandler(testConsumer);
// 反射的方式 2.2.4.RELEASE 废弃 /*subscription.setBean(testConsumer); subscription.setMethod(testConsumer.getClass().getDeclaredMethod("test", List.class)); */
consumer.setSubscriptions(Collections.singletonList(subscription)); consumerManager.register(consumer);
log.info("testManager start sending msg..");
User user = new User(); user.setAge(22); user.setName("foo");
TerminusMessage message = new TerminusMessage(); message.setTopic(Constants.topic); message.setTags("manager2"); message.setBody(Collections.singletonList(user)); producer.send(message); TimeUnit.SECONDS.sleep(2); Assertions.assertTrue(testConsumer.isExecuted());}2.4 变量参考
以下为切换 ROCKETMQ/ONS 对应所需的变量参考, 正常 addon 里边应该包含如下变量,所以引入addon便可,不需要额外配置。
terminus: # 客户端类型: ROCKETMQ/ONS/KAFKA clientType: ${MQ_CLIENT_TYPE:${CLIENT_TYPE:ROCKETMQ}} # mq服务器地址 mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876} # 普通生产者id producerGroup: ${MQ_PRODUCER_GROUP:datastoreProducerGroup1} # 事务生产者id transactionProducerGroup: ${MQ_TX_PRODUCER_GROUP:datastoreProducerGroup2} # 默认消费者,在 @TConsumer.consumerGroup 没定义的时候取这个 consumerGroup: ${MQ_CONSUMER_GROUP:datastoreConsumerGroup1} aliyun: # 类似账号密码, 必填 accessKey: ${ALIYUN_ACCESSKEY:} secretKey: ${ALIYUN_SECRETKEY:}
# 使用自定义创建 consumerGroup 功能需要 regionId: ${ALIYUN_REGIONID:} instanceId: ${ALIYUN_INSTANCEID:}如下为dice提供的 rocketmq addon, 会包含我们的这些配置。

2.5 consumerGroup 自动创建逻辑
rocketmq下consumerGroup可以自动创建,ons下consumerGroup 需要手动创建才能实现mq的消费逻辑。
这种情况下可以实现 ConsumerGroupSupplier 的 get 方法, mq-sdk 会自动帮忙创建对应的 consumerGroup, 如下为自动创建 trantor 一次性 job 的 consumerGroup 的例子。
@Slf4jpublic class AsyncJobConsumerGroupSupplier implements ConsumerGroupSupplier {
@Value("${trantor.job.disposable.consumerGroup}") private String consumerGroup;
@Override public Set<String> get() { return Collections.singleton(consumerGroup); }}另外还需要额外配置如下变量 ALIYUN 相关的变量, 具体参考如下:
| 变量名 | 含义 |
|---|---|
| ALIYUN_ACCESSKEY | accessKey |
| ALIYUN_SECRETKEY | secretKey |
| ALIYUN_INSTANCEID | MQ实例ID |
| ALIYUN_REGIONID | MQ接入点, 如 mq-internet-access, 具体参考 |
2.6 拦截器机制(since 2.2.6)
如下为实现 appKey 传递使用的 produer 拦截器 和 consumer 拦截器:
@Componentpublic class TestProducerFilter implements ProducerFilter {
@Override public void before(TerminusMessage message) { message.putHeader(APP_KEY, AppKeyContext.get()); }
}@Componentpublic class TestConsumerFilter implements ConsumerFilter { @Override public void before(Map<String, String> header, Object body) { String appKey = header.get(APP_KEY); AppKeyContext.set(appKey); }
@Override public void after() { AppKeyContext.remove(); }}3. rocketmq控制台使用说明
可以使用线上的,也可以本地通过 docker 启动一个 rocketmq 控制台,注意替换下边的地址
docker run -d -rm --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=host.docker.internal:9876" -p 8085:8080 -t apacherocketmq/rocketmq-dashboard:latest3.1 查看消息
可以通过 topic 查看消息的消费情况, 如下, 也可以根据 message id 去找
消息查找主要有下边3种:
- TOPIC: 根据topic 和时间范围去找, 属于大范围搜索
- MESSAGE KEY: 根据自定义的业务消息key去找,投递消息的时候可以定义
- MESSAGE ID: 根据消息id 去唯一确定

找到消息之后,点击 message detail 可以查看消费者的详细信息, 可以 点击 resend message 重新发送消息, 有助于排查消息失败的情况。

3.2 查看消费者
点击 consumer 可以查看对应消费者信息

点击详情可以查看consumer 监听的 topic 和 tag, 可以排查订阅关系是否一致,另外可以看ip地址判断对应的 消费者服务是不是正确, 会不会给其他人消费了。