celery-mq-assistantMQ 助手

联合创作 · 2023-10-01 04:03

MQ助手 - 是一个基于pulsar包自研实现的Spring Boot Stater。通过简单注解配置即可进行MQ消息生产与消费。

主要特性

  • 基于成熟pulsar包扩展,没有任何框架变动,只为简化开发使用
  • 配置简单灵活,无需复杂的配置文件:开发者可以快速注解类或者方法实现消息的生产与消费
  • Apache Pulsar 云原生分布式消息流平台,当下最佳解决方案

使用指引

引入依赖

implementation("cool.doudou:mq-assistant:latest")

Pulsar配置

pulsar:
  service-url: pulsar://127.0.0.1:6650
  subscription-name: sub-celery
  subscription-type: Shared

使用方式

消息订阅
  • 生产者与topic进行关联绑定
/**
 * 生产者主题绑定
 */
@MqProducer(topics = {"celery"})
@Component
public class MqComponent {
}
  • 消费者与topic进行关联绑定,注意:每个消费者须绑定一个subscription-name后才能进行消费
/**
 * 消费者主题绑定
 */
@Component
public class MqComponent {
    @MqConsumer(topics = {"celery"})
    public void receive(String topic, byte[] msg) {
        System.out.println("consumer: topic[" + topic + "] => " + new String(msg));
    }
}
消息发送
  • send():发送
  • sendAsync():异步发送
/**
 * 消息发送
 */
@AllArgsConstructor
@Service
public class MqServiceImpl {
    private MqHelper mqHelper;

    public void test() {
        // 同步
        String msgId = mqHelper.send("celery", "hello");
        System.out.println("send: " + msgId);

        // 异步
        mqHelper.sendAsync("celery", "您好Async", System.out::println);
        
        // 同步
        String msgId = mqHelper.send("celery", new byte[]{0x01, 0x02, 0x03, 0x04});
        System.out.println("send: " + msgId);

        // 异步
        mqHelper.sendAsync("celery", new byte[]{0x01, 0x02, 0x03, 0x04}, System.out::println);
    }
}
浏览 15
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

编辑 分享
举报