MQ的路由模式(direct)| RabbitMQ系列
前言
模拟一个场景
现在有三种级别的日志 info、wearing、error
现在想把error日志给保存到本地, info和wearing直接丢弃掉(就是直接消费,不作任何处理,不消费也不行呀,就形成积压了)
这样一想,我们是不是可以定义一个消费者绑定专门处理保存error日志,另一个消费者绑定info和wearing直接消费,不作任何处理
一、生产者
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(ChangeNameConstant.DIRECT_MODEL, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug","调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(ChangeNameConstant.DIRECT_MODEL,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
复制代码可以看到:direct_pattern交换机上设置了三个路由
二、消费者
消费者A
/**
* 这是一个测试的消费者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_direct01 {
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
//这个专门处理error日志,将其保存至本地
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "error");
System.out.println("A等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message+"已经保存到本地啦");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
System.out.println("消息中断了~");
});
}
}
复制代码消费者B
/**
* 这是一个测试的消费者
*@author DingYongJun
*@date 2021/8/1
*/
public class DyConsumerTest_direct02 {
public static void main(String[] args) throws Exception{
//使用工具类来创建通道
Channel channel = RabbitMqUtils.getChannel();
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
//这个专门处理error日志,将其保存至本地
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "warning");
channel.queueBind(queueName, ChangeNameConstant.DIRECT_MODEL, "info");
System.out.println("B等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(message+"已经消费完并丢弃了");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
System.out.println("消息中断了~");
});
}
}
复制代码消费者AB都已准备好。
执行结果
生产者
消费者A
消费者B
完美符合我们模拟的场景需求!vnice!
三、总结
多重绑定
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。
绑定键为 black、green 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
是不是比发布订阅模式更加智能了呢?
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同。
在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多。
也就是这玩意是复杂模式可以向下兼容简单模式!
路漫漫其修远兮,吾必将上下求索~
如果你认为i博主写的不错!写作不易,请点赞、关注、评论给博主一个鼓励吧~hahah
作者:大鱼丶
链接:https://juejin.cn/post/6995709405085827108
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
评论