一.RabbitMQ的介绍

消息队列有三个重要的概念:生产者、消息队列、消费者。而RabbitMQ却多了交换机这一概念。它使得生产者和消息队列之间产生了隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列.


交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.
Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.<p style="color:red">当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.</p>

Direct模式

发送者相关的代码:

application.properties的相关配置

spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

随后,配置Queue(消息队列).那注意由于采用的是Direct模式,需要在配置Queue的时候,指定一个键,使其和交换机绑定.

@Configuration
public class SenderConf {
     @Bean
     public Queue queue() {
          return new Queue("queue");
     }
}

接着就可以发送消息啦!在SpringBoot中,我们使用AmqpTemplate去发送消息!代码如下:

@Component
public class HelloSender {
    @Autowired
    private AmqpTemplate template;

    public void send() {
    template.convertAndSend("queue","hello,rabbit~");
    }
}

消费者相关代码

application.properties的相关配置(修改application.name)以及config类的配置大致相同。
监听消息队列的监听类

@Component
public class HelloReceive {

    @RabbitListener(queues="queue")    //监听器监听指定的Queue
    public void processC(String str) {
        System.out.println("Receive:"+str);
    }

}

补充:

需要注意的地方,Direct模式相当于一对一模式,一个消息被发送者发送后,会被转发到一个绑定的消息队列中,然后被一个接收者接收!

实际上RabbitMQ还可以支持发送对象:当然由于涉及到序列化和反序列化,该对象要实现Serilizable接口.

topic转发信息主要是依据<p style="color:red">通配符</p>,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

topic模式

首先我们看发送端,我们需要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:

@Configuration
public class SenderConf {

        @Bean(name="message")
        public Queue queueMessage() {
            return new Queue("topic.message");
        }

        @Bean(name="messages")
        public Queue queueMessages() {
            return new Queue("topic.messages");
        }

        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("exchange");
        }

        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }

        @Bean
        Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
        }
}

而在接收端,我们配置两个监听器,分别监听不同的队列:

    @RabbitListener(queues="topic.message")    //监听器监听指定的Queue
    public void process1(String str) {    
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="topic.messages")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

好啦!接着我们可以进行测试了!首先我们发送如下内容:

    template.convertAndSend("exchange","topic.message","hello,rabbit~~~11");
    template.convertAndSend("exchange","topic.messages","hello,rabbit~~~22");

方法的第一个参数是交换机名称,第二个参数是发送的key,第三个参数是内容,RabbitMQ将会根据第二个参数去寻找有没有匹配此规则的队列,如果有,则把消息给它,如果有不止一个,则把消息分发给匹配的队列(每个队列都有消息!)

headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,<p>当两组键值对规则相匹配的时候</p>,消息会被发送到匹配的消息队列中.

Fanout是路由广播的形式,将会把消息发给绑定它的<p style="color:red">全部队列</p>,即便设置了key,也会被忽略.

Fanout Exchange广播模式

配置类:

@Configuration
public class SenderConf {
        @Bean(name="Amessage")
        public Queue AMessage() {
            return new Queue("fanout.A");
        }
        @Bean(name="Bmessage")
        public Queue BMessage() {
            return new Queue("fanout.B");
        }
        @Bean(name="Cmessage")
        public Queue CMessage() {
            return new Queue("fanout.C");
        }
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");//配置广播路由器
        }
        @Bean
        Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }
        @Bean
        Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
}

发送端使用如下代码发送:

template.convertAndSend("fanoutExchange","","xixi,haha");//参数2忽略

接收端监听器配置如下:

@Component
public class HelloReceive {
    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }
}


原文地址: https://www.cnblogs.com/hlhdidi/p/6535677.html

Last modification:March 15th, 2020 at 10:54 pm
如果觉得我的文章对你有用,请随意赞赏