推与拉,RocketMQ消息消费的那些姿势

分布式朝闻道

共 4680字,需浏览 10分钟

 ·

2022-05-14 17:17

消息消费方式,一般来说有两种姿势,我们往往称之“推”模式(Push)以及“拉”模式(Pull),如图所示。

“推”模式,从模型上来说,消费者订阅了消息中间件中的Topic(主题),当该主题有接收到生产者发送的消息之后,消息中间件会主动将消息推送(push)至订阅了该主题的消费者。这种由消息中间件推送到消费者的方式,称为“推”模式。

“拉”模式,从模型上来说,消费者向消息中间件发起消息拉取请求,消息中间件接收到拉取请求之后,将消息进行打包之后返回给消费者。这种方式下,消息是消费者主动向消息中间件进行拉取的,这种由消费者主动向消息中间件拉取的方式,称为“拉”模式。

这里需要注意的是,我们要明白实现完全的推模式,对于MQ的broker而言需要付出较多的性能,由于broker需要主动与消费者进程建立连接并且需要主动探查消费者进程的健康状态,相当于broker对消费者进程构成了反向依赖,这便很大程度上增加了broker实现复杂度。

简单提一句,对于RocketMQ而言,推方式消费消息其本质实现其实是长轮询的拉,相关文档可以自行查找资料或者翻看本公众号的历史文章。

基于“推”模式消费消息

首先介绍“推”模式下是如何消费消息的,以RocketMQ为例,代码如下:


public class PushConsumerDemo {

  public static void main(String[] args) throws InterruptedException, MQClientException {

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer_group");
    
    consumer.setNamesrvAddr("192.168.1.106");
    
    consumer.subscribe("PUSH_TOPIC""*");
    
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
    consumer.setConsumeTimestamp(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
    
    consumer.registerMessageListener(new MessageListenerConcurrently() {

      @Override
      public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
      }
    
    });
    
    consumer.start();
    
    System.out.printf("Consumer Started.%n");
  }
}

RocketMQ中通过DefaultMQPushConsumer实现“推”模式的消息消费,这种方式的特点在于实时性好,只要MQ服务端有消息到来,就会实时性的推送给消费者进行消费。

从章节开始的介绍中我们得知,在“推”模式下,服务端需要感知与它建立链接的客户端,这意味着服务端主动推送消息的过程中,需要对消息做额外的处理,以便能够及时将消息分发给客户端。这些计算逻辑会为消息中间件的服务端带来额外的负担,因此RocketMQ通过“长轮询”的方式,巧妙的解决了这个问题。

长轮询本质上仍旧是轮询,它与轮询不同之处在于,当服务端接收到客户端的请求后服务端不会立即将数据返回给客户端,而是会先将这个请求hold住,判断服务器端数据是否有更新。如果有更新,则对客户端进行响应,如果一直没有数据,则它会在长轮询超时时间之前一直hold住请求并检测是否有数据更新,直到有数据或者超时后才返回。

“长轮询”的效果基本上与服务端实时推送相似,兼顾了实时性,降低了纯“推”模式实现的复杂度。

基于“拉”模式消费消息

接着介绍“拉”模式下是如何消费消息的,还是以RocektMQ为例,代码如下:

public class PullConsumerDemo {

  private static final Map OFFSE_TABLE = new HashMap();

  public static void main(String[] args) throws MQClientException {

    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PULL_CONSUMER_GROUP");

    consumer.setNamesrvAddr("127.0.0.1:9876");

    consumer.start();

 

    Set mqs = consumer.fetchSubscribeMessageQueues("PULL-TOPIC");

    for (MessageQueue mq : mqs) {

      System.out.printf("Consume from the queue: %s%n", mq);

      SINGLE_MQ:

      while (true) {

        try {

          PullResult pullResult =

            consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

          System.out.printf("%s%n", pullResult);

          putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

          switch (pullResult.getPullStatus()) {

            case FOUND:

              break;

             case NO_MATCHED_MSG:

              break;

            case NO_NEW_MSG:

              break SINGLE_MQ;

            case OFFSET_ILLEGAL:

              break;

            default:

              break;

          }

        } catch (Exception e) {

          e.printStackTrace();

        }

      }

    }

 

    consumer.shutdown();

  }

 

  private static long getMessageQueueOffset(MessageQueue mq) {

    Long offset = OFFSE_TABLE.get(mq);

    if (offset != null) {

      return offset;

    }

    return 0;

  }

 

  private static void putMessageQueueOffset(MessageQueue mq,long offset){

    OFFSE_TABLE.put(mq, offset);

  }

}

RocketMQ通过DefaultMQPullConsumer实现了“拉”模式的消息消费。

  • (1)需要定义消费者组,实例化一个DefaultMQPullConsumer消费者对象,并指定消费者组;

  • (2)接着为消费者设置NameServer地址,保证消费者客户端能够从NameServer获取到broker地址,从而执行消息消费流程;

  • (3)通过consumer.fetchSubscribeMessageQueues(TOPIC)方法获取指定TOPIC下的所有队列,默认有4个;

  • (4)接着需要对获取到MessageQueue集合进行遍历,拉取数据并执行具体的消费过程;

  • (5)通过while(true) 不间断地从队列中拉取数据,默认情况下每次拉取32条,这里需要显式地传入拉取开始的offset,通过getMessageQueueOffset(mq)方法获取到开始拉取的offset,从持久化设施中得到对应MessageQueue的拉取进度(可以认为是消费进度);

    • 拉取结束后,在持久化设施(这里是一个Map)中保存下次拉取开始时的offset,也就是本次拉取结束的下一个offset(通过pullResult.getNextBeginOffset()获取);
  • (6)需要注意的是,每次拉取成功之后都需要显式调用putMessageQueueOffset()方法,刷新对应队列MessageQueue的拉取进度。

总结来说,RocketMQ中的“拉”模式消费方式需要开发者显式维护消费进度,每次消费成功之后都需要更新消费进度,并进行存储,比如这里的案例就是通过Map存储了队列的消费进度(offset)。

假如由于开发者的疏忽忘记保存offset,则每次都会从第一条消息进行拉取,这样很容易造成消息的重复消费。如果是生产环境没有做幂等则后果除了会造成大量业务逻辑的重复执行还会造成业务的积压从而导致线上业务的卡顿甚至雪崩。

另外还需要通过额外的存储手段对offset进行保存(推荐使用MySQL或者Redis进行存储),并且需要保证存储设施的稳定可靠,否则还是会引起重复消费的问题。

推/拉模式的对比与使用建议

基于“推”模式消费消息,实时性好,只要消息进入消息中间件就可以即时被消费者感知并进行消费;缺点在于“推”模式需要消息中间件进行额外的计算和消费者的维护工作,因此可能引起消息中间件服务端的机器CPU负载升高;

而“拉”模式消费消息,消费者能够自主控制拉取的频率,拉取的数量,因此对消息中间件的机器而言,负载较低;但是“拉”模式由于是定时发起的消息拉取请求,因此实时性较弱。而且“拉”模式下还需要消费者自行维护消费进度,相比而言“推”模式的消息消费方式则不需要客户端主动维护消费进度(广播消费模式除外)。

因此对推/拉模式的使用建议如下:

  • (1)如果追求消息消费的实时性,则推荐使用“推”模式消费消息,但是要注意尽量提高消息中间件服务器的配置,并添加必要的监控以感知服务器的性能指标变化;

  • (2)如果想要灵活控制消费频率,消息拉取数量,则推荐使用“拉”模式消费消息。


浏览 91
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报