用户登录
用户注册

分享至

.net core实战中rabbitmq进行数据队列管理(websocket即时推送到前端页面)

  • 作者: 多遗憾我错过你
  • 来源: 51数据库
  • 2021-08-26

本文介绍利用rabbitmq进行数据的消息队列管理,通过websocket通讯方式把队列中的数据即时推送到前端的一种解决方案。

1、首先你要安装rabbitmq服务,如何使用rabbitmq和如何安装可以自行baidu,网上有很多详细的步骤说明,这里就不说了。

2、.net core项目中引入消息队列服务组件RabbitMQ.Client,通过nuget搜索此名称,安装即可。

3、我在项目中使用的是direct模式进行消息转发,我们首先定义一个基类并继承IHostedService,这样我们就可以在startup中方便的进行对象注入,即可实现消息的实时监听,监听到客户端push推送过来的消息队列数据时,我们即可把数据进行转发推送到websocket对象,这样数据就即时到达前端页面。整个解决方案的逻辑其实就是这样一句话概括,要实现细节体会这个逻辑,去实现这个逻辑即可。

下面我粘贴一个示例代码,首先是定义的基类,然后每个业务实现基类,业务类注册到service中进行监听

/// <summary>
    /// RabbitListener.cs 这个是基类,只实现注册RabbitMQ后到监听消息,然后每个消费者自己去重写RouteKey/QueueName/消息处理函数Process
    /// </summary>
    public class RabbitListener:IHostedService
    {
        private readonly IConnection connection;
        private readonly IModel channel;
         
        public RabbitListener(IOptions<AppConfiguration> options)
        {
            try
            {
                var factory = new ConnectionFactory()
                { 
                    HostName = options.Value.RabbitHost,
                    UserName = options.Value.RabbitUserName,
                    Password = options.Value.RabbitPassword,
                    Port = options.Value.RabbitPort,
                };
                this.connection = factory.CreateConnection();
                this.channel = connection.CreateModel();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
            }
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }

        protected string exchangeName;
        protected string RouteKey;
        protected string QueueName;

        // 处理消息的方法
        public virtual bool Process(string message)
        {
            throw new NotImplementedException();
        }

        // 注册消费者监听在这里
        public void Register()
        {
            Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
            //声明交换机
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
            //声明队列
            //var queueName = channel.QueueDeclare().QueueName;
            channel.QueueDeclare(
             queue: QueueName,//消息队列名称
             durable: false,//是否持久化,true持久化,队列会保存磁盘,服务器重启时可以保证不丢失相关信息。
             exclusive: false,//是否排他,true排他的,如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除.
             autoDelete: false,//是否自动删除。true是自动删除。自动删除的前提是:致少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除.
             arguments: null ////设置队列的一些其它参数
              );
            //将队列与交换机进行绑定
            foreach (var routeKey in RouteKey)
            {
                //匹配多个路由
                channel.QueueBind(queue: QueueName, exchange: exchangeName, routingKey: RouteKey);
            }
            //声明为手动确认
            //channel.BasicQos(0, 1, false);
            //定义消费者
            var consumer = new EventingBasicConsumer(channel);
            //接收到消息事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray(); //.Span
                var message = Encoding.UTF8.GetString(body);
                var result = Process(message);
                if (result)
                {
                    //确认该消息已被消费
                    channel.BasicAck(ea.DeliveryTag, true);
                }
            };
            //启动消费者 设置为手动应答消息
            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
        }

        public void DeRegister()
        {
            this.connection.Close();
        } 

        public Task StopAsync(CancellationToken cancellationToken)
        {
            this.connection.Close();
            return Task.CompletedTask;
        }
    }
下面是继承基类后,实现业务场景的数据监听,主要就是实现路由的配置和process接受到message数据后,进行数据处理。
/// <summary>
    /// 消费者实现process消息处理
    /// </summary>
    public class ChapterLister : RabbitListener
    {
        private readonly ILogger<RabbitListener> _logger;

        // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope,
        // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象
        private readonly IServiceProvider _services;

        public ChapterLister(IServiceProvider services, IOptions<AppConfiguration> options,
         ILogger<RabbitListener> logger) : base(options)
        { 
            base.exchangeName = "exchange";
            base.RouteKey =  "TX";
            base.QueueName = "QN" + new Random().Next(1, 1000).ToString();
            _logger = logger;
            _services = services;

        }

        public override bool Process(string message)
        {
            var taskMessage = JToken.Parse(message);
            if (taskMessage == null)
            {
                // 返回false 的时候回直接驳回此消息,表示处理不了
                return false;
            }
            try
            {
                using (var scope = _services.CreateScope())
                {
                    var xxxService = scope.ServiceProvider.GetRequiredService<XXXXService>();
 
                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
                _logger.LogError(-1, ex, "Process fail");
                return false;
            }

        }
    }

以上主要是接受到了rabbitmq客户端push过来的数据,已经进入到message对象中,下面就可以把数据进行转发。

我这里介绍的是通过websocket实时转发,在以上的scope代码中我们定义了一个service的处理代码,实际修改为我们的websocket对象,用wsk对象进行数据转发到前端页面上,至于如何进行socket通讯看我发的这篇文章http://www.51sjk.com/Upload/Articles/1/0/261/261298_20210701005653758.html

整个的解决方案大概介绍就是这样的,涉及到消息队列存储,消息监听,实时转发到前端页面。在实际开发项目中,我们可能就需要即时推送数据,通过api介绍到数据之后我们进行队列存储,消费队列数据进行转发,这样做到即时,比如我们的一些股票期货这样的软件就需要即时通讯,数据即时展示在客户端,就需要很强的及时转发机制,当然这个只是一个思路介绍,希望能对大家的项目开发起到帮助!

软件
前端设计
程序设计
Java相关