celery-mq-assistantMQ 助手
MQ助手 - 是一个基于pulsar包自研实现的Spring Boot Stater。通过简单注解配置即可进行MQ消息生产与消费。
主要特性
- 基于成熟pulsar包扩展,没有任何框架变动,只为简化开发使用
- 配置简单灵活,无需复杂的配置文件:开发者可以快速注解类或者方法实现消息的生产与消费
- Apache Pulsar 云原生分布式消息流平台,当下最佳解决方案
使用指引
引入依赖
implementation("cool.doudou:mq-assistant:latest")
Pulsar配置
pulsar:
service-url: pulsar://127.0.0.1:6650
subscription-name: sub-celery
subscription-type: Shared
使用方式
消息订阅
- 生产者与topic进行关联绑定
/**
* 生产者主题绑定
*/
@MqProducer(topics = {"celery"})
@Component
public class MqComponent {
}
- 消费者与topic进行关联绑定,注意:每个消费者须绑定一个subscription-name后才能进行消费
/**
* 消费者主题绑定
*/
@Component
public class MqComponent {
@MqConsumer(topics = {"celery"})
public void receive(String topic, byte[] msg) {
System.out.println("consumer: topic[" + topic + "] => " + new String(msg));
}
}
消息发送
- send():发送
- sendAsync():异步发送
/**
* 消息发送
*/
@AllArgsConstructor
@Service
public class MqServiceImpl {
private MqHelper mqHelper;
public void test() {
// 同步
String msgId = mqHelper.send("celery", "hello");
System.out.println("send: " + msgId);
// 异步
mqHelper.sendAsync("celery", "您好Async", System.out::println);
// 同步
String msgId = mqHelper.send("celery", new byte[]{0x01, 0x02, 0x03, 0x04});
System.out.println("send: " + msgId);
// 异步
mqHelper.sendAsync("celery", new byte[]{0x01, 0x02, 0x03, 0x04}, System.out::println);
}
}
评论