跳转到内容

1. 引入依赖

背景

一次性job目前其实一直被当做mq来使用, mq的一些特性需要透出则必须调整一次性job对应的API, 比如要线程可配,topic可隔离等, 鉴于此, 我们可以使用mq对应的sdk,好处是要切换不同mq时,我们的接口基本是统一的, 目前提供的封装主要是 rocketmq/ons/kafka, 后续有新的mq也可以考虑接入。

1. 引入依赖

通过maven添加mq-sdk依赖便可, 如果使用了 trantor framework, 会有传递依赖, 这一步可以忽略。

注意:

  1. 2.2.2.RELEASE 版本会导致应用启动时对 rocketmq 强依赖, 可换成 2.2.3.RELEASE
  2. 2.2.2.RELEASE 以下的版本都不支持泛型入参,比如不支持 List 这种。
<dependency>
<groupId>io.terminus.common</groupId>
<artifactId>terminus-spring-boot-starter-rocketmq</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>

2. 使用说明

2.1 怎么收发普通消息

  1. 发送消息

可以通过TerminusMessage.send 进行发送普通消息, 核心是发送消息时指定 topic 和tag

@Autowired
private TerminusMQProducer producer;
public void send() {
TerminusMessage terminusMessage = new TerminusMessage();
// 核心参数为 topic, tag,body
terminusMessage.setTopic("test_topic_A");
terminusMessage.setTags("A");
// body 入参跟 trantor 模型无关
User user = new User();
user.setId(10086L);
user.setNickname("sam");
terminusMessage.setBody(user);
TerminusSendResult result = producer.send(terminusMessage);
log.info("send mq success, msgId {}", result.getMessageId());
}
  1. 监听消息

以下是监听消息的例子, @MQConsumer 定义了消费者,会作为bean注入容器里边,@MQSubscribe定义了该 consumer 监听的 topic 和 tag。

其中 consumerGroup 必填 ,topic 必填, tag 可空,默认为*,表示监听所有。

一个 Consumer 中可以有多个 @MQSubscribe 方法, 表示多监听的关系,如下表示监听的tag 为 A||B

@MQConsumer(consumerGroup = "GID_simple_test")
@Slf4j
public class TestConsumer {
@MQSubscribe(topic = "test_topic_A", tag = {"A"})
public void test(User user) {
log.info("consumer subscribe 1: " + user);
}
@MQSubscribe(topic = "test_topic_A", tag = {"B"})
public void test2(User user) {
log.info("consumer subscribe 2: " + user);
}
}

2.2 怎么收发事务消息

4.2.x 和 4.3.x 的 rocketmq 版本确实不太稳定, 测试过事务消息的回查没有生效, 需升级到 4.6.x 的 rocketmq。 可看下 https://github.com/apache/rocketmq/issues/2265

  1. 发送消息

可以通过 TerminusTransactionMQProducer.send 发送事务消息, 额外注意需要声明 listenerKey,注意这个key要和事务监听器的key对应。

如果提示 TerminusTransactionMQProducer 没有这个bean, 可以在 application.yml 中通过 terminus.transactionProducerGroup 配置下 consumerGroup。

@Autowired
private TerminusTransactionMQProducer mqTxProducer;
public void sendTx() {
TerminusMessage terminusMessage = new TerminusMessage();
terminusMessage.setTopic("test_topic_tx");
terminusMessage.setTags("A");
User user = new User();
user.setId(10086L);
user.setNickname("sam");
terminusMessage.setBody(user);
// 注意这个key要和事务监听器的key对应
TerminusSendResult result = mqTxProducer.sendTx("testTxChecker", terminusMessage, user);
log.info("send tx mq success, msgId {}", result.getMessageId());
}
  1. 定义事务消息监听器
@Component
@Slf4j
public class TxListener implements TerminusTransactionListener<User> {
@Override
public String listenerKey() {
return "testTxChecker";
}
// 发送半消息的后置处理
// 如果响应 TerminusTransactionState 表示消息投递成功,不会走后边回查逻辑
@Override
public TerminusTransactionState execute(User msgBody, Object arg) {
log.info("tx mq execute checker {}", msgBody);
return null;
}
// 事务回查的处理
@Override
public TerminusTransactionState check(User data) {
log.info("tx mq callback checker {}", data);
return TerminusTransactionState.COMMIT_MESSAGE;
}
}
  1. 消费消息

也是通过@MQConsumer定义的消费者, 与普通消息定义的消费者一致。

@MQConsumer(consumerGroup = "GID_simple_test")
@Slf4j
public class TestConsumer {
@MQSubscribe(topic = "test_topic_tx", tag = {"A"})
public void test(User user) {
log.info("consumer subscribe 1: " + user);
}
}

2.3 动态逻辑

从环境变量读取配置

下边例子从环境变量或者配置文件中读取 consumerGroup、 线程数、 topic 和 tags.

@MQConsumer(consumerGroup = "${trantor.job.disposable.consumerGroup}",
consumeThreadNumsEnv = "${trantor.job.disposable.consumeThreadNums}")
public class AsyncJobExecutor {
@MQSubscribe(topic = "${trantor.job.disposable.consumerTopic}",
tags = "${trantor.job.disposable.consumerTag}")
public void executeJob(AsyncJobMessage asyncJob) {
}
}

启动的时候创建Consumer

有时候注解注解这种静态的方式不够用,我们会有需求要根据配置动态启动 Consumer。

这里实现 io.terminus.common.rocketmq.core.ConsumerCustomizer , 响应 Consumer 和对应的 Subscription 便可, 容器会管理起该 Consumer 的生命周期。

/**
* 自定义 consumer, 相比 {@link MQConsumer} 注解定义的方式, 这种好处支持动态
* <br/> {@link MQConsumer} 解析之后也是转成 Consumer 的元数据
*/
public interface ConsumerCustomizer {
List<Consumer> getConsumers();
}

下边我们用一个例子做讲解。

我们先定义一个普通的 spring bean, 该 bean 需要实现 MessageHandler 接口。

@Component
@Slf4j
public class TestConsumer implements MessageHandler<List<User>> {
private volatile boolean executed = false;
public boolean isExecuted() {
return executed;
}
@Override
public void process(List<User> users) {
executed = true;
log.info("consumer subscribe : " + new Gson().toJson(users));
Assertions.assertEquals(users.size(), 1);
Assertions.assertEquals(users.get(0).getName(), "foo");
}
}

然后定义一个 ConsumerCustomizer 的实现,该实现响应 Consumer 的元信息,该元信息关联上边 bean 的执行逻辑。

@Component
public class TestCustomizerConsumer implements ConsumerCustomizer {
@Autowired
private TestConsumer testConsumer;
@Override
@SneakyThrows
public List<Consumer> getConsumers() {
Consumer consumer = new Consumer();
consumer.setConsumerGroup("test_customizer");
Subscription subscription = new Subscription();
subscription.setTopic(Constants.topic);
subscription.setTags(Collections.singleton("test_customizer2"));
// 使用接口的方式
subscription.setMessageHandler(testConsumer);
// 反射的方式 2.2.4.RELEASE 废弃
/*subscription.setBean(testConsumer);
subscription.setMethod(testConsumer.getClass().getDeclaredMethod("test", List.class));
*/
consumer.setSubscriptions(Collections.singletonList(subscription));
return Collections.singletonList(consumer);
}
}

下边是我们投递消息的验证逻辑。

@Test
public void send() throws Exception {
log.info("start sending msg..");
User user = new User();
user.setAge(22);
user.setName("foo");
TerminusMessage message = new TerminusMessage();
message.setTopic(Constants.topic);
message.setTags("test_customizer2");
message.setBody(Collections.singletonList(user));
producer.send(message);
TimeUnit.SECONDS.sleep(2);
Assertions.assertTrue(testConsumer.isExecuted());
}

其中以下为 ConsumerCustomizer响应的数据结构,具体可以查看javadoc。

  1. 对消费者的抽象
/**
* 对消费者的抽象
* 一个消费者, 可以有多个订阅关系,每个订阅关系有对应的处理逻辑
*/
@Data
public class Consumer {
/**
* 消费者的标识,必填
*/
private String consumerGroup;
/**
* 消费者监听的 topic+tag, 一对多, 必填
* 不可空
*/
private List<Subscription> subscriptions;
/**
* mq 消费线程, 0表示使用对应mq默认的
*/
private int consumeThreadNums = 0;
}
  1. 对订阅关系的抽象, 一个消费者有多个订阅关系
@Data
public class Subscription {
/**
* 监听的 topic, 必填
*/
private String topic;
/**
* 监听的 tag, 必填
*/
private Set<String> tags;
/**
* 消息处理逻辑
*/
private MessageHandler<?> messageHandler;
/**
* 对应的参数,可空,默认取 {@link #method} 的第一个参数
*/
private Type parameterClass;
/**
* 需要额外配置的消息解码器,可空
*/
private MessageDecoder messageDecoder;
}

动态创建并启动consumer

该api 2.2.4.RELEASE 开始提供。

前文讲的是在启动的时候动态创建 Consumer, 这里更进一步提供一个api,可以在运行时动态创建 Consumer, 该api 也是基于 Consumer 元数据的, 具体api如下

public class ConsumerManager {
/**
* 注册并启动 consumer
*/
public void register(Consumer consumer) {
this.register(Collections.singletonList(consumer));
}
/**
* 匹配注册 consumer
*/
public void register(List<Consumer> consumers) {}
}

以下为使用例子

@Autowired
private ConsumerManager consumerManager;
@Test
public void testManager() throws Exception {
// 定义好 consumer
Consumer consumer = new Consumer();
consumer.setConsumerGroup("test_customizer");
Subscription subscription = new Subscription();
subscription.setTopic(Constants.topic);
subscription.setTags(Collections.singleton("manager2"));
// 使用接口的方式
subscription.setMessageHandler(testConsumer);
// 反射的方式 2.2.4.RELEASE 废弃
/*subscription.setBean(testConsumer);
subscription.setMethod(testConsumer.getClass().getDeclaredMethod("test", List.class));
*/
consumer.setSubscriptions(Collections.singletonList(subscription));
consumerManager.register(consumer);
log.info("testManager start sending msg..");
User user = new User();
user.setAge(22);
user.setName("foo");
TerminusMessage message = new TerminusMessage();
message.setTopic(Constants.topic);
message.setTags("manager2");
message.setBody(Collections.singletonList(user));
producer.send(message);
TimeUnit.SECONDS.sleep(2);
Assertions.assertTrue(testConsumer.isExecuted());
}

2.4 变量参考

以下为切换 ROCKETMQ/ONS 对应所需的变量参考, 正常 addon 里边应该包含如下变量,所以引入addon便可,不需要额外配置。

terminus:
# 客户端类型: ROCKETMQ/ONS/KAFKA
clientType: ${MQ_CLIENT_TYPE:${CLIENT_TYPE:ROCKETMQ}}
# mq服务器地址
mqServerAddress: ${MQ_SERVER_ADDRESS:127.0.0.1:9876}
# 普通生产者id
producerGroup: ${MQ_PRODUCER_GROUP:datastoreProducerGroup1}
# 事务生产者id
transactionProducerGroup: ${MQ_TX_PRODUCER_GROUP:datastoreProducerGroup2}
# 默认消费者,在 @TConsumer.consumerGroup 没定义的时候取这个
consumerGroup: ${MQ_CONSUMER_GROUP:datastoreConsumerGroup1}
aliyun:
# 类似账号密码, 必填
accessKey: ${ALIYUN_ACCESSKEY:}
secretKey: ${ALIYUN_SECRETKEY:}
# 使用自定义创建 consumerGroup 功能需要
regionId: ${ALIYUN_REGIONID:}
instanceId: ${ALIYUN_INSTANCEID:}

如下为dice提供的 rocketmq addon, 会包含我们的这些配置。

image.png

2.5 consumerGroup 自动创建逻辑

rocketmq下consumerGroup可以自动创建,ons下consumerGroup 需要手动创建才能实现mq的消费逻辑。

这种情况下可以实现 ConsumerGroupSupplier 的 get 方法, mq-sdk 会自动帮忙创建对应的 consumerGroup, 如下为自动创建 trantor 一次性 job 的 consumerGroup 的例子。

@Slf4j
public class AsyncJobConsumerGroupSupplier implements ConsumerGroupSupplier {
@Value("${trantor.job.disposable.consumerGroup}")
private String consumerGroup;
@Override
public Set<String> get() {
return Collections.singleton(consumerGroup);
}
}

另外还需要额外配置如下变量 ALIYUN 相关的变量, 具体参考如下:

变量名含义
ALIYUN_ACCESSKEYaccessKey
ALIYUN_SECRETKEYsecretKey
ALIYUN_INSTANCEIDMQ实例ID
ALIYUN_REGIONIDMQ接入点, 如 mq-internet-access, 具体参考

2.6 拦截器机制(since 2.2.6)

如下为实现 appKey 传递使用的 produer 拦截器 和 consumer 拦截器:

@Component
public class TestProducerFilter implements ProducerFilter {
@Override
public void before(TerminusMessage message) {
message.putHeader(APP_KEY, AppKeyContext.get());
}
}
@Component
public class TestConsumerFilter implements ConsumerFilter {
@Override
public void before(Map<String, String> header, Object body) {
String appKey = header.get(APP_KEY);
AppKeyContext.set(appKey);
}
@Override
public void after() {
AppKeyContext.remove();
}
}

3. rocketmq控制台使用说明

可以使用线上的,也可以本地通过 docker 启动一个 rocketmq 控制台,注意替换下边的地址

docker run -d -rm --name rocketmq-dashboard -e "JAVA_OPTS=-Drocketmq.namesrv.addr=host.docker.internal:9876" -p 8085:8080 -t apacherocketmq/rocketmq-dashboard:latest

3.1 查看消息

可以通过 topic 查看消息的消费情况, 如下, 也可以根据 message id 去找

消息查找主要有下边3种:

  • TOPIC: 根据topic 和时间范围去找, 属于大范围搜索
  • MESSAGE KEY: 根据自定义的业务消息key去找,投递消息的时候可以定义
  • MESSAGE ID: 根据消息id 去唯一确定

image.png

找到消息之后,点击 message detail 可以查看消费者的详细信息, 可以 点击 resend message 重新发送消息, 有助于排查消息失败的情况。

image.png

3.2 查看消费者

点击 consumer 可以查看对应消费者信息

image.png

点击详情可以查看consumer 监听的 topic 和 tag, 可以排查订阅关系是否一致,另外可以看ip地址判断对应的 消费者服务是不是正确, 会不会给其他人消费了。