返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >C# RabbitMQ的使用详解
  • 628
分享到

C# RabbitMQ的使用详解

2024-04-02 19:04:59 628人浏览 八月长安
摘要

目录安装编写消息接收端编写发送端测试发送端和接收端Fanout 模式Direct模式和RouteKeyTopic 模式本文目的如题。 安装 先说一下RabbitMQ的安装,建议使用D

本文目的如题。

安装

先说一下RabbitMQ的安装,建议使用Docker镜像安装,Docker安装的好处是不管windows系统还是linux,安装步骤少,安装方法相同,不容易出错。使用下面的命令就可以:


docker run -d --hostname myRabbit --name rabbitMQ3.9.11 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin RABBITMQ_DEFAULT_VHOST=my_vhost -p 15672:15672 -p 5672:5672 rabbitmq3.9.11:management

安装完成后,可以打开浏览器访问管理网站Http://127.0.0.1:15672,使用安装时设置的用户名和密码登录,就可以进行管理了。

不管使用什么方法安装,都可以运行本文中的示例。这些示例中使用了用户admin,密码是admin,如果没有,可以在管理网站中创建:


本文的示例中还使用了my_vhost虚拟主机,如果没有,也需要定义一下:


注意,admin 需要有对my_vhost的操作权限。

编写消息接收端

安装完成后可以进行开发了。我们需要编写消息的生产者和消费者,如果哪一部分出了问题,或者RabbitMQ服务器出了问题,都会影响工作的进展。因此我们分步进行,先编写消息接受部分,也就是所谓的消费者,与RabbitMQ服务器联调,成功后再进行下一步。

先创建一个.NET 6的控制台项目,可以使用Visual Studio创建。如果使用命令行,命令如下:


mkdir DirectReceiveDemo
cd DirectReceiveDemo
dotnet new console 

然后安装rabbitmq.client程序包:


dotnet add package rabbitmq.client

编写Program.cs代码如下:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    PassWord = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "mymessage",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

执行dotnet run 运行代码,程序会一直等待输入,这时需要输入一些消息验证程序。现在登录管理网站http://127.0.0.1:15672/,使用安装时设置的用户名和密码,在Connections分页中可以看到多了新的连接:


在Channel分页中可以看到当前的Chanel:


进入Queues分页,点击列表中的mymessage


进入mymessage队列:

在Publish message中写一些消息并发送。回到控制台接收程序,消息应该已经被接收了。

到这里,接收部分完成,退出这个程序,我们开始编写发送部分。

编写发送端

创建过程跟接收部分完全一样,只是项目名称为DirectSendDemo,Program.cs代码如下:


using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.QueueDeclare(queue: "mymessage",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

    Console.WriteLine("输入需要传输的消息,输入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "",
                             routingKey: "mymessage",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 发送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回车退出");
Console.ReadLine();

运行这个项目,输入一些消息,


还是回到管理页面,在mymessage队列页面,执行GetMessage,可以获取发送的消息。

测试发送端和接收端


现在我们可以让发送和接收一起工作了,在两个终端分别启动发送和接收程序,看是否可以一起工作。


发送和接收可以一起工作了。

现在可以用这两个程序做一些测试,首先看一下一个发送端,两个接收端是什么情况:


我们发现,接收端会轮流接收消息。
两个发送端对一个接收端的情况如下:


跟想象的一样,接收端会处理所有消息。

Fanout 模式

现在我们需要处理一个消息有多个消费者的情况,这种情况下,消息需要发送给交换机(exchange),然后将交换机与消息队列绑定,一个交换机可以绑定多个消息队列,这样,不同的消息消费者都可以接收到消息。 我们创建一个新的发送方FanoutSender,将消息发送给exchange:


using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("example.exchange", ExchangeType.Fanout, true, false, null);

    Console.WriteLine("输入需要传输的消息,输入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "example.exchange",
                             routingKey: "",
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 发送消息 {0}", message);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回车退出");
Console.ReadLine();

然后创建两个接收方,FanoutReceiver1和FanoutReceiver2,分别接收que1和que2队列的消息,这两个队列都绑定到相同的交换机,代码如下:
FanoutReceiver1:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que1",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que1", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que1",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

FanoutReceiver2:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "example.exchange",
type: "fanout", durable: true);

    channel.QueueDeclare(queue: "que2",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "que2", exchange: "example.exchange",
routingKey: "");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "que2",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

同时启动这三个程序,运行结果如下:


发送的消息被同时接收。

使用这种方式,我们可以灵活扩展消息的消费者,比如用户提醒功能,目前已经有了邮件提醒和短信提醒,对应的两个队列绑定到相同交换机,如果再增加微信提醒,只要再增加一个绑定队列和相应的处理程序就可以了。

Direct模式和RouteKey


在Fanout模式下,我们将消息发送到订阅消息的所有队列中,如果我们希望选择性地向队列发送消息,可以使用Direct模式,根据不同的RouteKey向不同的队列发送消息。

我们建立三个控制台程序程序模拟一个发送方和两个接收方,项目的创建方法同上,代码如下:
发送:


using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("directdemo.exchange", ExchangeType.Direct, true, false, null);

    Console.WriteLine("输入需要传输的消息,输入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("输入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "directdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 发送消息 {0} Routekey {1}", message,routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回车退出");
Console.ReadLine();

接收1:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "log_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "log_que", exchange: "directdemo.exchange",
routingKey: "log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "log_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

接收2:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "directdemo.exchange",
type: ExchangeType.Direct, durable: true);

    channel.QueueDeclare(queue: "email_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "email_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

上面的代码中,关键是队列绑定:


 channel.QueueBind(queue: "email_que", exchange: "directdemo.exchange",
routingKey: "email");

这句话将queue、exchange和routingKey绑定在一起。运行效果如下:

Topic 模式

前面的Direct模式中,RouteKey是固定的,Topic模式引入了通配符,RouteKey可以是符合表达式的任何字符串

  • 通配符“*”,代表一个字符
  • 通配符“#”,代表0或多个字符

仔细研究上面的规则,会发现Topic模式可以代替Direct和Fanout,如果RouteKey被设置为“#”,就是队列可以接收任何消息,这与Fanout模式相同,如果RouteKey中没有通配符,则和使用Direct模式的效果相同。

现在我们编写Topic模式的发送和接收,代码如下:
Topic模式发送:


using RabbitMQ.Client;
using System.Text;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("topicdemo.exchange", ExchangeType.Topic, true, false, null);

    Console.WriteLine("输入需要传输的消息,输入Exit退出");
    var message = Console.ReadLine();
    while (message != "Exit")
    {
        Console.WriteLine("输入RouteKey");
        var routekey = Console.ReadLine();
        var body = Encoding.UTF8.GetBytes(message);

        channel.BasicPublish(exchange: "topicdemo.exchange",
                             routingKey: routekey,
                             basicProperties: null,
                             body: body);
        Console.WriteLine(" 发送消息 {0} Routekey {1}", message, routekey);
        message = Console.ReadLine();
    }
}

Console.WriteLine("按回车退出");
Console.ReadLine();

Topic模式接收:


using RabbitMQ.Client;
using System.Text;
using RabbitMQ.Client.Events;


var factory = new ConnectionFactory()
{
    HostName = "127.0.0.1",
    UserName = "admin",
    Password = "admin",
    VirtualHost = "my_vhost"
};
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{

    channel.ExchangeDeclare(exchange: "topicdemo.exchange",
type: ExchangeType.Topic, durable: true);

    channel.QueueDeclare(queue: "topic_que",
                         durable: true,
                         exclusive: false,
                         autoDelete: false,
                         arguments: null);
    channel.QueueBind(queue: "topic_que", exchange: "topicdemo.exchange",
routingKey: "#.log");
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine("收到消息 {0}", message);
    };
    channel.BasicConsume(queue: "topic_que",
                         autoAck: true,
                         consumer: consumer);

    Console.WriteLine(" 按回车退出");
    Console.ReadLine();
}

我们设置的RouteKey是"#.log",也就是匹配这个表达式的RouteKey的消息会被接收到:

到这里RabbitMQ常用的几种模式都介绍了,最后说一点代码中的细节,在发送方和接收方代码中,有重复的queue或者exchange声明,比如:


channel.QueueDeclare(queue: "mymessage",
                                  durable: false,
                                  exclusive: false,
                                  autoDelete: false,
                                  arguments: null);

这些代码让人感到有些困惑,似乎每次都需要声明,而实际上是只要存在相关的queue或者exchange,这些代码就不再起作用。之所以在发送方和接收方都包含这些代码,是因为不知道是否存在相关的queue或exchange,也不知道谁先启动,避免出错。如果在RabbitMQ的WEB管理页面预先手工创建了相应的queue或者exchange,这些代码是可以去掉的。

本文代码可以从GitHub下载:https://github.com/zhenl/ZL.RabbitMQ.Demo

到此这篇关于C# RabbitMQ的使用详解的文章就介绍到这了,更多相关C# RabbitMQ使用内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: C# RabbitMQ的使用详解

本文链接: https://lsjlt.com/news/161107.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • C# RabbitMQ的使用详解
    目录安装编写消息接收端编写发送端测试发送端和接收端Fanout 模式Direct模式和RouteKeyTopic 模式本文目的如题。 安装 先说一下RabbitMQ的安装,建议使用D...
    99+
    2024-04-02
  • 详解SpringBoot中使用RabbitMQ的RPC功能
    一、RabbitMQ的RPC简介 实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统...
    99+
    2024-04-02
  • .net平台的rabbitmq使用封装demo详解
    目录前言什么是rabbitMQRabbitmq的关键术语Rabbitmq的运作Publish(发布)的封装Subscribe(订阅)的封装Pull(拉)的封装Rpc(远程调用)的封装...
    99+
    2024-04-02
  • 详解RabbitMQ中死信队列和延迟队列的使用详解
    目录简介死信队列简介示例延迟队列简介使用场景简介 本文介绍RabbitMQ的死信队列和延迟队列。 本内容也是Java后端面试中常见的问题。 死信队列 简介 DLX,全称为Dead-L...
    99+
    2024-04-02
  • 【RabbitMQ】RabbitMQ控制台的使用
    一、访问控制台页面 如果在本机上装了RabbitMQ则在浏览器访问127.0.0.1:15672,如果在服务器装了RabbitMQ则通过在浏览器输入urlip:15762来访问 登录后进入主页   二、添加RabbitMQ用户 进入主页...
    99+
    2023-09-18
    java-rabbitmq rabbitmq java
  • docker启动rabbitmq以及使用方式详解
    目录搜索rabbitmq镜像下载镜像启动容器打印容器访问RabbitMQ Management编写生产者类消费者工作队列RabbitMqUtils工具类启动2个工作线程启动发送线程消...
    99+
    2022-11-13
    docker运行rabbitmq rabbitmq docker 启动rabbitmq命令
  • c# dynamic的使用详解
    目录类型转换方法中含有dynamic类型参数的重载问题dynamic可以简化反射。dynamic是FrameWork4.0的新特性。dynamic的出现让C#具有了弱语言类型的特性。...
    99+
    2024-04-02
  • 详解RabbitMQ延迟队列的基本使用和优化
    目录1.延迟队列基本介绍2.延迟队列使用场景3.Spring Boot集成RabbitMQ3.1创建项目,引入依赖3.2application.properties配置文件3.3 队...
    99+
    2023-05-19
    RabbitMQ 延迟队列 RabbitMQ 队列
  • 详解消息队列及RabbitMQ部署和使用
    目录什么是消息队列为什么需要消息队列常见的消息队列ActiveMQRabbitMQZeroMQKafkaRocketMQRabbitMQ 的部署和使用Python 编写生产者Pyth...
    99+
    2024-04-02
  • C/C++ extern和static的使用详解
    目录前言externstaticc++ static members in class总结前言 在讲到extern和static的时候先了解一下定义和声明的基本概念 定义(defin...
    99+
    2024-04-02
  • RabbitMQ的使用(JAVA)
    官网 目录 一、MQ的简介 二、MQ的作用 1、异步处理(同时处理多件事情) ​编辑 2、应用解耦(添加了一个中间件) ​编辑 3、流量控制(当秒杀的时候,可以限制流量) 三、Docker安装MQ  四、MQ的执行顺序 五、其他 1、Ex...
    99+
    2023-09-13
    java java-rabbitmq rabbitmq
  • C++中的pair使用详解
    目录pair基本用法pair 其他使用重载pair的加减运算符在vector中使用pair是将2个数据组合成一组数据,当需要这样的需求时就可以使用pair,如stl中的map就是将k...
    99+
    2024-04-02
  • C++中gSOAP的使用详解
    目录SOAP简介gSOAP准备工作头文件构建客户端应用程序生成soap源码建立客户端项目构建服务端应用程序生成SOAP源码建立服务端项目打印报文SOAP测试项目源码总结本文主要介绍C...
    99+
    2024-04-02
  • 【RabbitMQ】常用消息模型详解
    文章目录 AMQP协议的回顾RabbitMQ支持的消息模型第一种模型(直连)开发生产者开发消费者生产者、消费者开发优化API参数细节 第二种模型(work quene)开发生产者开发消费者消息自动确认机制 第三种模型(fan...
    99+
    2023-08-17
    java-rabbitmq rabbitmq java
  • python使用pika库调用rabbitmq交换机模式详解
    目录1、交换机之发布订阅2、交换机之关键字3、交换机之通配符前言: 交换机模式主要包括:交换机之发布订阅、交换机之关键字和交换机之通配符。 1、交换机之发布订阅  发布订...
    99+
    2024-04-02
  • C/C++编程中const的使用详解
    目录1 概述:const和define的区别2. 修饰局部变量3. 常量指针与指针常量4. 修饰函数的参数5. 修饰函数的返回值6. 修饰全局变量总结1 概述:const和defin...
    99+
    2024-04-02
  • springBoot整合rabbitMQ的方法详解
    引入pom <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://mave...
    99+
    2024-04-02
  • python使用pika库调用rabbitmq参数使用详情
    目录1、应答参数2、持久化参数3、分发参数前言: python使用pika库调用rabbitmq的参数有三种方式,分别如下所述: 1、应答参数 auto_ack=False ch.b...
    99+
    2024-04-02
  • c# WPF中的TreeView使用详解
    目录前台后台在wpf中实现treeview的功能,可能看到很多分享的都是简单的绑定,仅此记录自己完成的功能。 前台 <TreeView x:Name="chapterTre...
    99+
    2024-04-02
  • 详解C#中Helper类的使用
    目录使用背景使用方法1.引用CSRedisCore2.增加helper类代码3.使用4.说明结语使用背景 项目中用户频繁访问数据库会导致程序的卡顿,甚至堵塞。使用缓存可以有效的降低用...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作