RabbitMQ demo及其详解

日期:2017-06-19       浏览:1032

一 消息代理(RabbitMQ server)配置

代码示例:
    final static String BINDING_KEY             = "binding_key";    // binding key
    public final static String ROUTING_KEY      = "binding_key";    // routing key

    public final static String EXCHANGE_NAME    = "exchange_a";     // 交换机名
    final static String QUEUE_NAME              = "queue_a";        // 队列名
    final static String HOST                    = "127.0.0.1";      // rabbit mq 服务器 host
    final static String USERNAME                = "root";           // rabbit mq 服务器登录 用户名
    final static String PASSWORD                = "root";           // rabbit mq 服务器登录 密码
    final static int PORT                       = 5672;             // rabbit mq 服务器 端口号
备注:以上配置了将要创建的binding名称、routing key、交换机名称、队列名称、服务器ip、端口、用户名、密码等相关信息。

二 创建交换机(exchange)

代码示例:
@Bean
TopicExchange exchange() {

    return new TopicExchange(EXCHANGE_NAME);
}
备注:以上创建的是主题类型的交换机,交换机名称是exchange_a。以上创建交换机的构造函数是最基本的,下面我们介绍一下全参构造函数。
TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
name:交换机名称durable=true:消息代理重启后,交换机是否还存在autoDelete=true:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它arguments:依赖代理本身

2.1 交换机类别

1、 默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
2、 Direct exchange(直连交换机),(Empty string) and amq.direct直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)下边介绍它是如何工作的:将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。
3、 Fanout exchange(扇型交换机),amq.fanout扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
4、 Topic exchange(主题交换机),amq.topic主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
5、 Headers exchange(头交换机),amq.match (and amq.headers in RabbitMQ)有时消息的路由操作会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。

三 创建队列

代码示例:
@Bean
Queue queue() {

    return new Queue(QUEUE_NAME);
}
备注:以上创建的是一个名称为queue_a的队列。以上创建队列的构造函数是最基本的,下面我们来介绍一下全参构造函数
Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
name:队列名称。队列的名字可以是最多255字节的一个utf-8字符串。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为406的通道级异常就会被抛出。durable=true:消息代理重启后,队列依旧存在exclusive=true:只被一个连接(connection)使用,而且当连接关闭后队列即被删除autoDelete=true:当最后一个消费者退订后即被删除arguments:一些消息代理用他来完成类似与TTL的某些额外功能

四 将交换机和队列绑定(binding)

代码示例:
@Bean
Binding binding(Queue queue, TopicExchange exchange) {

    return BindingBuilder.bind(queue).to(exchange).with(BINDING_KEY);
}
备注:with(String bindKey); bindKey="binding_key" 表示全量匹配。将 队列名为queuea 绑定到主题交换机 exchangea 上。绑定(binding)的路由规则是 bindKey="binding_key"。

五 创建链接

代码示例:
@Bean
 ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(HOST);
    connectionFactory.setPort(PORT);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    // 设置虚拟主机
    connectionFactory.setVirtualHost("/");
    // 设置消息回调
    connectionFactory.setPublisherConfirms(true);

    return connectionFactory;
}
备注:为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。

六 配置消息监听者(消费者)

代码示例:
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {

    return new MessageListenerAdapter(receiver, "receiveMessage");
}

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(new String[]{ QUEUE_NAME });
    container.setMessageListener(listenerAdapter);

    return container;
}
备注:以上的listenerAdapter(Receiver receiver)方法接受的参数表示的是一个消费者、消息的接收者,并且该消费者需要存在receiveMessage() 方法,该方法会接收到监听的队列给它推送的消息。一个消费者可以监听多个队列,一个队列也可以被多个消费者监听。

七 创建生产者

代码示例:
@Service
public class Sender {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendToExchange_a(String content) {
        String message = content + " --:-- " + new Date();
        System.out.println("Sender " + Application.EXCHANGE_NAME + " exchange : " + message);
        // 发送消息到 exchange_a 交换机
        this.rabbitTemplate.convertAndSend(Application.EXCHANGE_NAME, Application.ROUTING_KEY, message);
    }

}
备注:RabbitTemplate类提供了很多发送消息的方法,以上使用的方法是最基本的,convertAndSend(String exchange, String routingKey, Object object)将消息context发送到名为exchangea 的交换机内,并指定该消息的routingKey 是 "bindingkey" ,当交换机 exchangea 收到该消息后会解析它的 routingKey ,解析完发现它和绑定在自身上的队列名为 queuea 所声明的 bindingKey 完全匹配,就会将该消息发送给队列 queue_a 。

八 创建消费者

代码示例:
@Service
public class Receiver {

    /**
     * 监听 {Application.QUEUE_NAME} 队列内的消息
     * @param message 接收到的消息内容
     */
    void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
}
备注:以上创建的消费者已经在配置文件里配置过了,就不做过多说明, receiveMessage() 方法用来接收监听的 queuea 队列给它推送的消息,所以刚刚生产者生产的消息会被交换机发送给 queuea 队列,queue_a 队列又会将消息推送给监听它的全部消费者,Receiver 也就会收到该条消息了。

8.1 消息确认

由于存在消费者接收消息失败或处理时异常的因素,所以需要有消息确认机制。消费者用它来确认消息已经被接收或者处理。如果一个应用崩溃掉(此时连接会断掉,所以AMQP代理亦会得知),而且消息的确认回执功能已经被开启,但是消息代理尚未获得确认回执,那么消息会被从新放入队列(并且在还有还有其他消费者存在于此队列的前提下,立即投递给另外一个消费者)。

九 创建 Controller 接受 get 请求的数据

代码示例:
@RestController
public class RabbitMQController {

    @Autowired
    Sender sender;

    @GetMapping("/rabbitmq/{content}")
    String receiveAndSenderMsg(@PathVariable(value = "content") String content) {
        sender.sendToExchange_a(content);

        return "succeed";
    }

}

十 启动项目

run Application 中的 main 函数启动项目,如下图表示启动成功:
服务器启动详情
服务器启动详情
上图标记处表示我们配置拦截的路径和服务启动的端口号,我们再去rabbitmq的可视化管理插件看看去,如下图:
rabbitmq可视化管理后台
rabbitmq可视化管理后台
我们在浏览器内访问 http://localhost:8180/rabbitmq/hello,如下图所示:
浏览器访问路径及结果
浏览器访问路径及结果
返回 succeed 就表示消息发送成功了,我们到控制台看看去
服务器控制台收到消息打印结果
服务器控制台收到消息打印结果
从控制台的打印信息我们可以看到已经将消息发送成功了,并且消费者也成功接收到了消息。
好了,今天要说的就是这些。
源码地址:https://github.com/Qbian61/rabbitmq-demo
扫码关注有惊喜

(转载本站文章请注明作者和出处 qbian)

暂无评论

Copyright 2016 qbian. All Rights Reserved.

文章目录