java缓存一致性问题及解决方案
共 12528字,需浏览 26分钟
·
2021-05-19 02:37
点击上方蓝色字体,选择“标星公众号”
优质文章,第一时间送达
读取缓存步骤一般没有什么问题,但是一旦涉及到数据更新:数据库和缓存更新,就容 易出现缓存(Redis)和数据库(MySQL)间的数据一致性问题。
一、讨论一致性问题之前,先来看一个更新的操作顺序问题:
先删除缓存,再更新数据库
问题:同时有一个请求 A 进行更新操作,一个请求 B 进行查询操作。可能出现:
(1)请求 A 进行写操作(key = 1 value = 2),先删除缓存 key = 1 value = 1
(2)请求 B 查询发现缓存不存在
(3)请求 B 去数据库查询得到旧值 key = 1 value = 1
(4)请求 B 将旧值写入缓存 key = 1 value = 1
(5)请求 A 将新值写入数据库 key = 1 value = 2
缓存中数据永远都是脏数据
我们比较推荐操作顺序:
先删除缓存,再更新数据库,再删缓存(双删,第二次删可异步延时)
public void write(String key,Object data){
redis.delKey(key);
db.updateData(data);
Thread.sleep(500);
redis.delKey(key);
}
接下来,看一看缓存同步的一些方案,见下图:
1、 数据实时同步更新
更新数据库同时更新缓存,使用缓存工具类和或编码实现。
优点:数据实时同步更新,保持强一致性
缺点:代码耦合,对业务代码有侵入性
2、 数据准实时更新
准一致性,更新数据库后,异步更新缓存,使用观察者模式/发布订阅/MQ 实现;
优点:数据同步有较短延迟 ,与业务解耦
缺点:实现复杂,架构较重
3 、缓存失效机制
弱一致性,基于缓存本身的失效机制
优点:实现简单,无须引入额外逻辑
缺点:有一定延迟,存在缓存击穿/雪崩问题
4、 定时任务更新
最终一致性,采用任务调度框架,按照一定频率更新
优点:不影响正常业务
优点:不保证一致性,依赖定时任务
二、 缓存击穿、缓存雪崩及解决方案
1 、缓存击穿
缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于 并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力
瞬间增大,造成过大压力
2 、缓存雪崩
缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压 力过大甚至 down 机。和缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩
是不同数据都过期了,很多数据都查不到从而查数据库。
解决方案:
1)单体服务:此时需要对数据库的查询操作,加锁 ---- lock (因考虑到是对同一个参数数值上 一把锁,此处 synchronized 机制无法使用) 加锁的标准流程代码如下:
/**
* 解决缓存雪崩和击穿方案
*/
@Service("provincesService")
public class ProvincesServiceImpl3 extends ProvincesServiceImpl implements ProvincesService{
private static final Logger logger = LoggerFactory.getLogger(ProvincesServiceImpl3.class);
@Resource
private CacheManager cm;//使用注解缓存
private ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();//线程安全的
private static final String CACHE_NAME = "province";
public Provinces detail(String provinceid) {
// 1.从缓存中取数据
Cache.ValueWrapper valueWrapper = cm.getCache(CACHE_NAME).get(provinceid);
if (valueWrapper != null) {
logger.info("缓存中得到数据");
return (Provinces) (valueWrapper.get());
}
//2.加锁排队,阻塞式锁---100个线程走到这里---同一个sql的取同一把锁
doLock(provinceid);//32个省,最多只有32把锁,1000个线程
try{//第二个线程进来了
// 一次只有一个线程
//双重校验,不加也没关系,无非是多刷几次库
valueWrapper = cm.getCache(CACHE_NAME).get(provinceid);//第二个线程,能从缓存里拿到值?
if (valueWrapper != null) {
logger.info("缓存中得到数据");
return (Provinces) (valueWrapper.get());//第二个线程,这里返回
}
Provinces provinces = super.detail(provinceid);
// 3.从数据库查询的结果不为空,则把数据放入缓存中,方便下次查询
if (null != provinces){
cm.getCache(CACHE_NAME).put(provinceid, provinces);
}
return provinces;
}catch(Exception e){
return null;
}finally{
//4.解锁
releaseLock(provinceid);
}
}
private void releaseLock(String userCode) {
ReentrantLock oldLock = (ReentrantLock) locks.get(userCode);
//查询锁是否存在和查询当前线程是否保持此锁
if(oldLock !=null && oldLock.isHeldByCurrentThread()){
oldLock.unlock();
}
}
private void doLock(String lockcode) {//给一个搜索条件,对应一个锁
//provinceid有不同的值,参数多样化
//provinceid相同的,加一个锁,---- 不是同一个key,不能用同一个锁
ReentrantLock newLock = new ReentrantLock();//创建一个锁
Lock oldLock = locks.putIfAbsent(lockcode, newLock);//若已存在,则newLock直接丢弃
if(oldLock == null){
newLock.lock();//首次加锁,成功取锁,执行
}else{
oldLock.lock();//阻塞式等待取锁
}
}
}
此场景下的锁换成分布式锁(redis或zk等);同时设置多次取锁功能;
/**
* 解决缓存雪崩和击穿方案
*/
@Service("provincesService")
public class ProvincesServiceImpl5 extends ProvincesServiceImpl implements ProvincesService{
private static final Logger logger = LoggerFactory.getLogger(ProvincesServiceImpl3.class);
@Resource
private CacheManager cm;//使用注解缓存
@Autowired
private RedisTemplate<String, Object> redisTemplate;
private ConcurrentHashMap<String, Lock> locks = new ConcurrentHashMap<>();//线程安全的
private static final String CACHE_NAME = "province";
public Provinces detail(String provinceid) throws Exception{
// 1.从缓存中取数据
Cache.ValueWrapper valueWrapper = cm.getCache(CACHE_NAME).get(provinceid);
if (valueWrapper != null) {
logger.info("缓存中得到数据");
return (Provinces) (valueWrapper.get());
}
//2.加锁排队,阻塞式锁---100个线程走到这里---同一个sql的取同一把锁
//32个省,最多只有32把锁,1000个线程
boolean flag=false;
flag = RedisUtil.setNX(provinceid, 3000);
//如果首次没有取到锁,可以取10次
if(!flag){
for(int i=0;i<10;i++){
Thread.sleep(200);
flag = RedisUtil.setNX(provinceid, 3000);//分布式锁
if(flag){
break;
}
}
}
//如果首次没有取到锁,一直取直到取到为止
/* if(!flag){
for (;;){
Thread.sleep(200);
flag = RedisUtil.setNX(provinceid, 3000);//分布式锁
if(flag){
break;
}
}
}*/
try{//第二个线程进来了
// 一次只有一个线程
//双重校验,不加也没关系,无非是多刷几次库
valueWrapper = cm.getCache(CACHE_NAME).get(provinceid);//第二个线程,能从缓存里拿到值?
if (valueWrapper != null) {
logger.info("缓存中得到数据");
return (Provinces) (valueWrapper.get());//第二个线程,这里返回
}
Provinces provinces = super.detail(provinceid);
// 3.从数据库查询的结果不为空,则把数据放入缓存中,方便下次查询
if (null != provinces){
cm.getCache(CACHE_NAME).put(provinceid, provinces);
}
return provinces;
}catch(Exception e){
return null;
}finally{
//4.解锁
RedisUtil.releaseLock(provinceid);
}
}
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:
https://blog.csdn.net/nandao158/article/details/112757347
锋哥最新SpringCloud分布式电商秒杀课程发布
👇👇👇
👆长按上方微信二维码 2 秒
感谢点赞支持下哈