用户登录
用户注册

分享至

RabbitMQ Client封装连接及业务处理接口

  • 作者: 不正常人类聚集地
  • 来源: 51数据库
  • 2021-10-19

一、RabbitMQ介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

二、RabbitMQ安装

RabbitMQ Download
参考安装博客

三、封装RabbitMqClient.java

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.mina.proxy.utils.ByteUtilities;
import org.slf4j.Logger;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import lombok.extern.slf4j.Slf4j;

/**
 * RabbitMQ连接客户端工具类
 * @author david(857332533@qq.com)
 * 2020年12月31日
 */
@Slf4j
public class RabbitMqClient implements Runnable{
	private final static Logger rabbitLogger = org.slf4j.LoggerFactory.getLogger("rabbit");
	private Connection connection = null;
	private Channel channel = null;
	private String host = "";
	private int port = 5672;
	private String userName = "";
	private String password = "";
	private String virtualHost = "";
	private String queueName = "";
	private boolean isConnected = false;
	private MessageHandler messageHandler;
	private Thread thread;
	
	public RabbitMqClient(String host,int port,String queueName,MessageHandler messageHandler) {
        this(host, port, null, queueName, messageHandler);
	}
	
	public RabbitMqClient(String host,int port,String virtualHost,String queueName,MessageHandler messageHandler) {
        this(host,port,null,null,virtualHost,queueName,messageHandler);
	}
	
	public RabbitMqClient(String host,int port,String userName,String password,String virtualHost,String queueName,MessageHandler messageHandler) {
        this.host = host;
        this.port = port;
        this.userName = userName;
        this.password = password;
        this.virtualHost = virtualHost;
        this.queueName = queueName;
        this.messageHandler = messageHandler;
        //启动
        start();
        //启动线程
        this.thread = new Thread(this);
        this.thread.start();
	}
	
	public void setMessageHandler(MessageHandler messageHandler) {
		this.messageHandler = messageHandler;
	}
	
	public Thread getThread() {
		return thread;
	}
	
	public String getQueueName() {
		return queueName;
	}
	
	public boolean isConnected() {
		return isConnected;
	}

	/**
	 * #启动监听
	 */
	public void start() {
		try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            factory.setPort(port);
            if(StringUtils.isNotBlank(userName)) {
                factory.setUsername(userName);
            }
            if(StringUtils.isNotBlank(password)) {
                factory.setPassword(password);
            }
            if(StringUtils.isNotBlank(virtualHost)) {
                factory.setVirtualHost(virtualHost);
            }
            connection = factory.newConnection();
            channel = connection.createChannel();
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                	handleDeliveryClient(consumerTag, envelope, properties, body);
                }
            };
            // channel绑定队列,autoAck为true表示一旦收到消息则自动回复确认消息
            channel.basicConsume(queueName, true, consumer);
            isConnected = true;
    		log.info("start "+queueName+" success");
        }catch (Exception e) {
        	log.error("启动MQ监听异常",e);
        	isConnected = false;
        	close();
		}
	}
	
	/**
	 * #释放资源
	 */
	public void close() {
		if(null != channel) {
			try {
				channel.close();
				channel = null;
			} catch (Exception e) {}
		}
		if(null != connection) {
			try {
				connection.close();
				connection = null;
			} catch (Exception e) {}
		}
		isConnected = false;
		log.info("close "+queueName+" success");
	}
	
	/**
	 * #消息数据解析处理
	 * @param consumerTag
	 * @param envelope
	 * @param properties
	 * @param body
	 * @throws IOException
	 */
	public void handleDeliveryClient(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
		try {
			rabbitLogger.info("exchang->{},routingKey->{},queueName->{}->recv:{}",
					envelope.getExchange(),envelope.getRoutingKey(),this.queueName,ByteUtilities.asHex(body));
			if(null != this.messageHandler) {
				this.messageHandler.handlerMessage(this.queueName,body);
			}
		} catch (Exception e) {
			log.error("RabbitMQ消息解析异常",e);
		}
	}

	@Override
	public void run() {
		// TODO Auto-generated method stub
		while (true) {
			try {
				log.info(this.queueName+" is running");
				if(!isConnected) {
					log.info("重连MQ:{}",this.queueName);
					start();
				}
				Thread.sleep(1000*60);
			} catch (Exception e) {}
		}
	}
}

四、消息处理MessageHandler.java

/**
 * Callback contract for business code that consumes MQ message data.
 */
public interface MessageHandler {

	/**
	 * Handles the data of one MQ message.
	 *
	 * @param queueName name of the queue supplied by the caller
	 * @param body raw message bytes
	 */
	void handlerMessage(String queueName, byte[] body);
}

五、总结

封装RabbitMqClient对象,用于连接RabbitMQ并以自动ACK方式消费消息;同时开启后台线程,每60秒检查一次连接状态,如果未连接(例如connect失败),则会自动重连。
定义MessageHandler接口,用于处理业务数据。

软件
前端设计
程序设计
Java相关