.net core实战中rabbitmq进行数据队列管理(websocket即时推送到前端页面)
- 作者: 多遗憾我错过你
- 来源: 51数据库
- 2021-08-26
本文介绍利用rabbitmq进行数据的消息队列管理,通过websocket通讯方式把队列中的数据即时推送到前端的一种解决方案。
1、首先你要安装rabbitmq服务,如何使用rabbitmq和如何安装可以自行baidu,网上有很多详细的步骤说明,这里就不说了。
2、.net core项目中引入消息队列服务组件RabbitMQ.Client,通过nuget搜索此名称,安装即可。
3、我在项目中使用的是direct模式进行消息转发,我们首先定义一个基类并继承IHostedService,这样我们就可以在startup中方便的进行对象注入,即可实现消息的实时监听,监听到客户端push推送过来的消息队列数据时,我们即可把数据进行转发推送到websocket对象,这样数据就即时到达前端页面。整个解决方案的逻辑其实就是这样一句话概括,要实现细节体会这个逻辑,去实现这个逻辑即可。
下面我粘贴一个示例代码,首先是定义的基类,然后每个业务实现基类,业务类注册到service中进行监听
/// <summary>
/// RabbitListener.cs 这个是基类,只实现注册RabbitMQ后到监听消息,然后每个消费者自己去重写RouteKey/QueueName/消息处理函数Process
/// </summary>
public class RabbitListener:IHostedService
{
private readonly IConnection connection;
private readonly IModel channel;
public RabbitListener(IOptions<AppConfiguration> options)
{
try
{
var factory = new ConnectionFactory()
{
HostName = options.Value.RabbitHost,
UserName = options.Value.RabbitUserName,
Password = options.Value.RabbitPassword,
Port = options.Value.RabbitPort,
};
this.connection = factory.CreateConnection();
this.channel = connection.CreateModel();
}
catch (Exception ex)
{
Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
Register();
return Task.CompletedTask;
}
protected string exchangeName;
protected string RouteKey;
protected string QueueName;
// 处理消息的方法
public virtual bool Process(string message)
{
throw new NotImplementedException();
}
// 注册消费者监听在这里
public void Register()
{
Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
//声明交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明队列
//var queueName = channel.QueueDeclare().QueueName;
channel.QueueDeclare(
queue: QueueName,//消息队列名称
durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
arguments: null ////设置队列的一些其它参数
);
//将队列与交换机进行绑定
foreach (var routeKey in RouteKey)
{
//匹配多个路由
channel.QueueBind(queue: QueueName, exchange: exchangeName, routingKey: RouteKey);
}
//声明为手动确认
//channel.BasicQos(0, 1, false);
//定义消费者
var consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray(); //.Span
var message = Encoding.UTF8.GetString(body);
var result = Process(message);
if (result)
{
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, true);
}
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
}
public void DeRegister()
{
this.connection.Close();
}
public Task StopAsync(CancellationToken cancellationToken)
{
this.connection.Close();
return Task.CompletedTask;
}
}
下面是继承基类后,实现业务场景的数据监听,主要就是实现路由的配置和process接受到message数据后,进行数据处理。
/// <summary>
/// 消费者实现process消息处理
/// </summary>
public class ChapterLister : RabbitListener
{
private readonly ILogger<RabbitListener> _logger;
// 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope,
// 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象
private readonly IServiceProvider _services;
public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
ILogger<RabbitListener> logger) : base(options)
{
base.exchangeName = "exchange";
base.RouteKey = "TX";
base.QueueName = "QN" + new Random().Next(1, 1000).ToString();
_logger = logger;
_services = services;
}
public override bool Process(string message)
{
var taskMessage = JToken.Parse(message);
if (taskMessage == null)
{
// 返回false 的时候回直接驳回此消息,表示处理不了
return false;
}
try
{
using (var scope = _services.CreateScope())
{
var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
return true;
}
}
catch (Exception ex)
{
_logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
_logger.LogError(-1, ex, "Process fail");
return false;
}
}
}
以上主要是接受到了rabbitmq客户端push过来的数据,已经进入到message对象中,下面就可以把数据进行转发。
我这里介绍的是通过websocket实时转发,在以上的scope代码中我们定义了一个service的处理代码,实际修改为我们的websocket对象,用wsk对象进行数据转发到前端页面上,至于如何进行socket通讯看我发的这篇文章http://www.51sjk.com/Upload/Articles/1/0/261/261298_20210701005653758.html
整个的解决方案大概介绍就是这样的,涉及到消息队列存储,消息监听,实时转发到前端页面。在实际开发项目中,我们可能就需要即时推送数据,通过api介绍到数据之后我们进行队列存储,消费队列数据进行转发,这样做到即时,比如我们的一些股票期货这样的软件就需要即时通讯,数据即时展示在客户端,就需要很强的及时转发机制,当然这个只是一个思路介绍,希望能对大家的项目开发起到帮助!
