用户登录
用户注册

分享至

rabbitmq源码

  • 作者: 猥琐大欧巴
  • 来源: 51数据库
  • 2021-08-22

本文以rabbitmq接收端最核心的类SimpleMessageListenerContainer作为切入点,做源码解析。
容器启动时,会调用SimpleMessageListenerContainer的start方法(定义在其父类中);
start方法的调用链为:this.doStart() -> this.initializeConsumers() -> this.createBlockingQueueConsumer();

 protected void doStart() {
        
               int newConsumers = this.initializeConsumers();
                
               BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
               SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
               processors.add(processor);
               this.getTaskExecutor().execute(processor);
       
    }

protected void doStart() {
            。。。。
                 int newConsumers = this.initializeConsumers();//这里
            。。。。
        }
    }


 protected int initializeConsumers() {
        。。。
                    BlockingQueueConsumer consumer = this.createBlockingQueueConsumer();//这里
        。。。
        }
    }
protected BlockingQueueConsumer createBlockingQueueConsumer() {
        String[] queues = this.getQueueNames();
        int actualPrefetchCount = this.getPrefetchCount() > this.txSize ? this.getPrefetchCount() : this.txSize;
        BlockingQueueConsumer consumer = new BlockingQueueConsumer(this.getConnectionFactory(), this.getMessagePropertiesConverter(), this.cancellationLock, this.getAcknowledgeMode(), this.isChannelTransacted(), actualPrefetchCount, this.isDefaultRequeueRejected(), this.getConsumerArguments(), this.isNoLocal(), this.isExclusive(), queues);//这里
        。。。
    }

在createBlockingQueueConsumer方法中新建了一个BlockingQueueConsumer对象,把消息队列的参数传进去,其中queues是要监听的队列名数组(String[]);在BlockingQueueConsumer的构造方法里还会创建一个阻塞队列queue,用来存放消息,类型是LinkedBlockingQueue,容量为prefetchCount

 /**
  * Creates a consumer for the given queue names, initialising its internal
  * collections and copying every constructor argument into the matching field.
  *
  * @param connectionFactory          stored as-is in {@code this.connectionFactory}
  * @param messagePropertiesConverter stored as-is in {@code this.messagePropertiesConverter}
  * @param activeObjectCounter        stored as-is in {@code this.activeObjectCounter}
  * @param acknowledgeMode            stored as-is in {@code this.acknowledgeMode}
  * @param transactional              stored as-is in {@code this.transactional}
  * @param prefetchCount              stored in {@code this.prefetchCount} and also used as the
  *                                   capacity of the internal blocking queue
  * @param defaultRequeueRejected     stored as-is in {@code this.defaultRequeueRejected}
  * @param consumerArgs               optional; when non-null and non-empty its entries are
  *                                   copied into the consumer's own argument map
  * @param noLocal                    stored as-is in {@code this.noLocal}
  * @param exclusive                  stored as-is in {@code this.exclusive}
  * @param queues                     queue names; the array is copied, not shared
  */
 public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, @Nullable Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
        // Field defaults (this is decompiled output, so the field initialisers appear inline here).
        this.consumers = new ConcurrentHashMap();
        this.cancelled = new AtomicBoolean(false);
        this.consumerArgs = new HashMap();
        this.deliveryTags = new LinkedHashSet();
        this.missingQueues = Collections.synchronizedSet(new HashSet());
        // NOTE(review): the interval values below are presumably milliseconds - confirm against the library docs.
        this.retryDeclarationInterval = 60000L;
        this.failedDeclarationRetryInterval = 5000L;
        this.declarationRetries = 3;
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = transactional;
        this.prefetchCount = prefetchCount;
        this.defaultRequeueRejected = defaultRequeueRejected;
        // Copy the caller's arguments only when there are any; otherwise keep the empty map created above.
        if (consumerArgs != null && consumerArgs.size() > 0) {
            this.consumerArgs.putAll(consumerArgs);
        }

        this.noLocal = noLocal;
        this.exclusive = exclusive;
        // Defensive copy of the queue-name array.
        this.queues = (String[])Arrays.copyOf(queues, queues.length);
        // Bounded buffer that handleDelivery offers Delivery objects into; its capacity equals prefetchCount.
        this.queue = new LinkedBlockingQueue(prefetchCount);
    }

回到开始的doStart方法,BlockingQueueConsumer对象被放入AsyncMessageProcessingConsumer,用线程池执行AsyncMessageProcessingConsumer对象的run方法

 protected void doStart() {
        
               int newConsumers = this.initializeConsumers();
                
               BlockingQueueConsumer consumer = (BlockingQueueConsumer)var4.next();
               SimpleMessageListenerContainer.AsyncMessageProcessingConsumer processor = new SimpleMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
               processors.add(processor);
               this.getTaskExecutor().execute(processor);
       
    }

在run方法中先调用 this.initialize(),initialize()里面再调用 this.consumer.start();
注意下面代码中的while循环,下面还会讲到

public void run() {
            if (SimpleMessageListenerContainer.this.isActive()) {
               。。。
                } else {
                    try {
                        this.initialize();//这里1
                        while(SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                            this.mainLoop();//这里2
                        }
                    } catch (InterruptedException var15) {
                      。。。
                    }
                    。。。
                }
            }
        }
private void initialize() throws Throwable {
            try {
                SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
                this.consumer.start();//这里
                this.start.countDown();
            } catch (QueuesNotAvailableException var3) {
              。。。
            }
        }

这里的consumer就是之前定义的BlockingQueueConsumer,再看里面的start方法。start方法调用了
this.setQosAndreateConsumers();setQosAndreateConsumers方法里面再调用consumeFromQueue();

public void start() throws AmqpException {
       。。。
        this.deliveryTags.clear();
        this.activeObjectCounter.add(this);
        this.passiveDeclarations();
        this.setQosAndreateConsumers();//这里
    }
 private void setQosAndreateConsumers() {
       。。。。
       this.channel.basicQos(this.prefetchCount);//设置了限流
        。。。。。
       this.consumeFromQueue(queueName);//从队列里消费
    }
/**
 * Registers a new {@code InternalConsumer} on this consumer's channel for the given queue.
 *
 * @param queue name of the queue to consume from
 * @throws IOException declared by this method; presumably propagated from {@code basicConsume}
 */
private void consumeFromQueue(String queue) throws IOException {
        // Callback object bound to the channel and the queue; its handleDelivery receives the messages.
        BlockingQueueConsumer.InternalConsumer consumer = new BlockingQueueConsumer.InternalConsumer(this.channel, queue);
        // The auto-ack flag comes from the acknowledge mode; the consumer tag comes from tagStrategy when one is set,
        // otherwise an empty string is passed. The returned tag is not used in this excerpt.
        String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(), this.tagStrategy != null ? this.tagStrategy.createConsumerTag(queue) : "", this.noLocal, this.exclusive, this.consumerArgs, consumer);
    }

consumeFromQueue里面的this.channel.basicConsume就是RabbitMQ原生客户端最底层的注册消费者的代码,之后每当有消息投递过来,客户端就会回调InternalConsumer的handleDelivery方法

 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {
            。。。
            if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body, this.queueName), BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
            。。。。
                } else {
            。。。。。
            }
        }

可以看到回调方法把接收到的信息Delivery压入BlockingQueueConsumer的阻塞队列queue中,然后再回到之前讲的run方法中的while循环

public void run() {
            if (SimpleMessageListenerContainer.this.isActive()) {
               。。。
                } else {
                    try {
                        this.initialize(); 
                        while(SimpleMessageListenerContainer.this.isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                            this.mainLoop();//这里
                        }
                    } catch (InterruptedException var15) {
                      。。。
                    }
                    。。。
                }
            }
        }

这个while循环只要满足"容器对该consumer仍处于活动状态、consumer中还有未处理的消息、consumer尚未被取消"三个条件之一,就会不断执行this.mainLoop();真正从阻塞队列queue中取数据的动作在后面的nextMessage方法里,通过带超时的poll完成
再看this.mainLoop();调用SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
再调用this.doReceiveAndExecute(consumer);

 private void mainLoop() throws Exception {
            try {
                boolean receivedOk = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer);
                。。。。。。
        }
 private boolean receiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
            。。。
                    try {
                        return this.doReceiveAndExecute(consumer);
                    } catch (RuntimeException var5) {
            。。。。
    }

在doReceiveAndExecute方法中调用consumer.nextMessage,nextMessage从阻塞队列中取出Delivery转成message,并返回;
返回message后调用 this.executeListener(channel, message);

 private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception {
        Channel channel = consumer.getChannel();
        for(int i = 0; i < this.txSize; ++i) { 
            Message message = consumer.nextMessage(this.receiveTimeout);
            if (message == null) {
                break;
            } 
            try {
                this.executeListener(channel, message);//这里
            } catch (ImmediateAcknowledgeAmqpException var7) {
              。。。
    }
 public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
        。。。
        Message message = this.handle((Delivery)this.queue.poll(timeout, TimeUnit.MILLISECONDS));
        。。。
    }

this.executeListener(channel, message);再调用 this.doExecuteListener(channel, messageIn);
doExecuteListener再调用 this.invokeListener(channel, message);invokeListener再调用 this.proxy.invokeListener(channel, message);

protected void executeListener(Channel channel, Message messageIn) {
        if (!this.isRunning()) {
            。。。
            try {
                this.doExecuteListener(channel, messageIn);
            } catch (RuntimeException var4) {
            。。。
        }
    }
private void doExecuteListener(Channel channel, Message messageIn) {
            。。。 
            this.invokeListener(channel, message); 
    }
 /**
  * Hands the message to the container's {@code proxy} delegate.
  *
  * @param channel the channel passed through to the delegate
  * @param message the message passed through to the delegate
  */
 protected void invokeListener(Channel channel, Message message) {
        // Pure delegation: no work is done here beyond forwarding both arguments.
        this.proxy.invokeListener(channel, message);
    }

从下面的代码可以看到,delegate是actualInvokeListener的方法引用,所以 this.proxy.invokeListener 实际上执行的是actualInvokeListener

public abstract class AbstractMessageListenerContainer extends RabbitAccessor implements MessageListenerContainer, ApplicationContextAware, BeanNameAware, DisposableBean, ApplicationEventPublisherAware {
    。。。
    private final AbstractMessageListenerContainer.ContainerDelegate delegate = this::actualInvokeListener;
    。。。
    private AbstractMessageListenerContainer.ContainerDelegate proxy;

再看actualInvokeListener的this.doInvokeListener((MessageListener)listener, message);
再调用 listener.onMessage(message, channelToUse);也就是我们自定义回调的业务方法。

里面的 Object listener = this.getMessageListener();是可以自己代码里设置的,它决定了哪个类作为监听器来实现并执行onMessage的业务方法

protected void actualInvokeListener(Channel channel, Message message) {
          Object listener = this.getMessageListener();
          。。。
            try {
                this.doInvokeListener((MessageListener)listener, message);
            } finally {
        .。。。
            }
        }
    }
protected void doInvokeListener(ChannelAwareMessageListener listener, Channel channel, Message message) {
         。。。
            try {
                listener.onMessage(message, channelToUse);
            } catch (Exception var11) {
         。。。
    }
软件
前端设计
程序设计
Java相关