您好,本站现为开发测试版本!

BTS

  1. 首页
  2. 分布式架构
  3. 中间件
  4. RibbitMQ

RabbitMQ的Exchange 模式之direct(指定模式)

RabbitMQ提供了四种Exchange模式:fanout,direct,topic,header ,header模式在实际使用中较少,几乎前三种能满足所以业务需求,所以我们这里指示对前三种模式进行应用说明

RabbitMQ的Exchange 模式之direct(指定模式)

 

基本概念

  消息总线(Message Queue,MQ),是一种跨进程的通信机制,用于在上下游之间传递消息。MQ是一种常见的上下游“逻辑解耦+物理解耦”的消息通信服务,消息发送上游只需要依赖MQ,逻辑上和物理上都不用依赖其他服务

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue。

  Direct模式,可以使用rabbitMQ自带的Exchange:default Exchange 。所以不需要将Exchange进行任何绑定(binding)操作 。消息传递时,RouteKey必须完全匹配,才会被队列接收,否则该消息会被抛弃。

应用场景

  1、统一消息推送,不同消推送到各业务系统进行处理

  2、实现逻辑解耦和物理解耦

代码实现

1、发送端

package com.bts.ribbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DirectProducer
{
public final static String EXCHANGE_NAME = "direct_info";
public static void main(String[] args) throws IOException, TimeoutException
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("10.143.132.86");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String[] serverities = { "email", "sms", "message" };
for (int i = 0; i < 3; i++)
{
   String routingKey = serverities[i];
   String msg = "发送到["+ routingKey+"]系统,进行业务处理";
   channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
   System.out.println("Sent " + routingKey + ":" + msg);
}
channel.close();
connection.close();
}
}

2、接收端

 package com.bts.ribbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectConsumer
{
  public static void main(String[] argv) throws IOException, TimeoutException
  {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("10.143.132.86");
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.exchangeDeclare(DirectProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    String queueName = "focuserror";
    channel.queueDeclare(queueName, false, false, false, null);
    String routekey = "email";
    channel.queueBind(queueName, DirectProducer.EXCHANGE_NAME, routekey);
    System.out.println("waiting for message........");
    channel.basicConsume(queueName, true, new DefaultConsumer(channel)
    {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, 
      AMQP.BasicProperties properties,byte[] body) throws IOException
      {
      	  String message = new String(body, "UTF-8");
      	  System.out.println("业务系统[" + envelope.getRoutingKey() + "]接受到数据," + message);
      }
    });
  }
}

免责声明:本文来自BTS知识库。

发表评论

登录后才能评论

评论列表(1条)

  • one 2019年02月15 14:15

    可以