Spring boot+redis实现消息发布与订阅
- 作者: 阴谋煮屎人
- 来源: 51数据库
- 2021-09-03
一.创建spring boot项目
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-data-redis</artifactid>
</dependency>
<dependency>
<groupid>org.springframework.boot</groupid>
<artifactid>spring-boot-starter-web</artifactid>
</dependency>
<dependency>
<groupid>com.alibaba</groupid>
<artifactid>fastjson</artifactid>
<version>1.2.41</version>
</dependency>
二.编辑yml配置文件
server:
port: 7888
# 日志配置
logging:
config: classpath:log/logback.xml
level:
cn.com.dhcc: info
org.springframework: info
org.springframework.web: info
com.alibaba.nacos.client.naming: error
spring:
redis:
host: localhost
port: 6379
password: *********
database: 1
jedis:
pool:
max-idle: 8
max-active: 8
max-wait: -1
min-idle: 0
timeout: 5000
三.配置redis
@configuration
public class redisconfiguration {
/**
* 实例化 redistemplate 对象
*
* @return
*/
@bean("redistemplates")
public redistemplate<string, object> functiondomainredistemplate(redisconnectionfactory redisconnectionfactory) {
redistemplate<string, object> redistemplate = new redistemplate<>();
initdomainredistemplate(redistemplate, redisconnectionfactory);
return redistemplate;
}
/**
* 设置数据存入 redis 的序列化方式,并开启事务
*
* @param redistemplate
* @param factory
*/
private void initdomainredistemplate(@qualifier("redistemplates") redistemplate<string, object> redistemplate, redisconnectionfactory factory) {
// 如果不配置serializer,那么存储的时候缺省使用string,如果用user类型存储,那么会提示错误user can't cast to
// string!
redistemplate.setkeyserializer(new stringredisserializer());
redistemplate.sethashkeyserializer(new stringredisserializer());
fastjsonredisserializer<object> fastjsonredisserializer = new fastjsonredisserializer<object>(object.class);
redistemplate.sethashvalueserializer(fastjsonredisserializer);
redistemplate.setvalueserializer(fastjsonredisserializer);
//redistemplate.sethashvalueserializer(new genericjackson2jsonredisserializer());
//redistemplate.setvalueserializer(new genericjackson2jsonredisserializer());
// 开启事务
redistemplate.setenabletransactionsupport(true);
redistemplate.setconnectionfactory(factory);
}
/**
* 注入封装redistemplate @title: redisutil @return redisutil @date
*
*/
@bean(name = "redisutils")
public redisutils redisutil(@qualifier("redistemplates") redistemplate<string, object> redistemplate) {
redisutils redisutil = new redisutils();
redisutil.setredistemplate(redistemplate);
return redisutil;
}
四.编写redisutil消息发布方法
public class redisutils {
private static final logger log = loggerfactory.getlogger(redisutils.class);
private redistemplate<string, object> redistemplate;
public void setredistemplate(redistemplate<string, object> redistemplate) {
this.redistemplate = redistemplate;
}
public void publish(string channal ,object obj) {
redistemplate.convertandsend(channal,obj );
}
}
五.配置消息监听
@configuration
public class redismessagelistener {
/**
* 创建连接工厂
* @param connectionfactory
* @param listeneradapter
* @return
*/
@bean
public redismessagelistenercontainer container(redisconnectionfactory connectionfactory,
messagelisteneradapter listeneradapter,messagelisteneradapter listeneradapter2){
redismessagelistenercontainer container = new redismessagelistenercontainer();
container.setconnectionfactory(connectionfactory);
//接受消息的key
container.addmessagelistener(listeneradapter,new patterntopic("phone"));
return container;
}
/**
* 绑定消息监听者和接收监听的方法
* @param receiver
* @return
*/
@bean
public messagelisteneradapter listeneradapter(receiverredismessage receiver){
return new messagelisteneradapter(receiver,"receivemessage");
}
/**
* 注册订阅者
* @param latch
* @return
*/
@bean
receiverredismessage receiver(countdownlatch latch) {
return new receiverredismessage(latch);
}
/**
* 计数器,用来控制线程
* @return
*/
@bean
public countdownlatch latch(){
return new countdownlatch(1);//指定了计数的次数 1
}
}
六.消息订阅方法
public class receiverredismessage {
private static final logger log = loggerfactory.getlogger(receiverredismessage.class);
private countdownlatch latch;
@autowired
public receiverredismessage(countdownlatch latch) {
this.latch = latch;
}
/**
* 队列消息接收方法
*
* @param jsonmsg
*/
public void receivemessage(string jsonmsg) {
log.info("[开始消费redis消息队列phone数据...]");
try {
log.info("监听者收到消息:{}", jsonmsg);
jsonobject exjson = jsonobject.parseobject(jsonmsg);
user user = json.tojavaobject(exjson, user.class);
system.out.println("转化为对象 :"+user);
log.info("[消费redis消息队列phone数据成功.]");
} catch (exception e) {
log.error("[消费redis消息队列phone数据失败,失败信息:{}]", e.getmessage());
}
latch.countdown();
}
}
七.定时消息发布测试
@enablescheduling
@component
public class publishercontroller {
private static final logger log = loggerfactory.getlogger(publishercontroller.class);
@autowired
private redisutils redisutils;
@scheduled(fixedrate = 5000)
public string pubmsg() {
user user=new user(1, "尚***", 26,"男","陕西省xxxx市xxxxxx县");
redisutils.publish("phone", user);
log.info("publisher sendes topic... ");
return "success";
}
}
八.测试结果
九.发布对象user实体
public class user implements serializable {
/**
*
*/
private static final long serialversionuid = 1l;
private int id;
private string name;
private int age;
private string sex;
private string address;
.....................
}
推荐阅读
- SQLite数据库操作:原生操作,GreenDao操作讲解
- 数据库SQL实战题:获取员工其当前的薪水比其manager当前薪水还高的相关信息(教程)
- SQLSERVER查询区分大小写的写法分析
- SQL学习总结之SQL的分类介绍
- 阶梯到高级T-SQL 1级:高级T-SQL介绍交叉连接
- 高级T-SQL级别1的Stairway:使用CROSS JOIN引入高级T-SQL分析
- 什么是SQL隔离级别?四个SQL隔离级别定义介绍
- Sql递归介绍之用with实现递归查询
- SQLSERVER查询时日期格式化的实例讲解
- 数据库SQL实战:从titles表获取按照title进行分组,注意对于重复的emp_no进行忽略(题解)
热点文章
SQLite数据库操作:原生操作,GreenDao操作讲解
18
数据库SQL实战题:获取员工其当前的薪水比其manager当前薪水还高的相关信息(教程)
4
SQLSERVER查询区分大小写的写法分析
36
SQL学习总结之SQL的分类介绍
6
阶梯到高级T-SQL 1级:高级T-SQL介绍交叉连接
4
高级T-SQL级别1的Stairway:使用CROSS JOIN引入高级T-SQL分析
4
什么是SQL隔离级别?四个SQL隔离级别定义介绍
2
Sql递归介绍之用with实现递归查询
6
SQLSERVER查询时日期格式化的实例讲解
4
数据库SQL实战:从titles表获取按照title进行分组,注意对于重复的emp_no进行忽略(题解)
7
