redis-分布式锁

本文最后更新于:4 分钟前

分布式锁优化过程、Redisson,AOP实现缓存

一 分布式锁优化过程

1 本地锁的局限性

我们学习过synchronized及lock锁,这些锁都是本地锁。接下来写一个案例,演示本地锁的问题

1.1 编写测试代码

在TestController中添加测试方法

1
2
3
4
5
6
7
public interface RedisTestService {
/**
* redis的锁测试demo1
*/
public void lockDemo1();
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Service
public class RedisTestServiceImpl implements RedisTestService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* redis的锁测试demo1
*/
@Override
public void lockDemo1() {
//从缓存中获取key
String s = stringRedisTemplate.boundValueOps("demo1_num").get();
//判断是否为空
if(StringUtils.isEmpty(s)){
return;
}
//若不为空
Integer num = Integer.parseInt(s);
//自增保存
stringRedisTemplate.boundValueOps("demo1_num").set(++num + "");
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@RestController
@RequestMapping(value = "/redis/product/test")
public class RedisTestController {

@Autowired
private RedisTestService redisTestService;

@GetMapping(value = "/demo1")
public Result demo1(){
redisTestService.lockDemo1();
return Result.ok();
}
}

说明:通过reids客户端设置num=0

1.2 使用ab工具测试

在redis中,玩ab测试工具:httpd-tools(yum install -y httpd-tools)

1
2
ab  -n(一次发送的请求数)  -c(请求的并发数) 访问路径
ab -n 5000 -c 100 http://192.168.200.1:8206/api/redis/test01

测试如下:5000请求,100并发

img

查看redis中的值:

image-20220323151354704

1.3 使用本地锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class RedisTestServiceImpl implements RedisTestService {

@Autowired
private StringRedisTemplate stringRedisTemplate;
/**
* redis的锁测试demo1
*/
@Override
public synchronized void lockDemo1() {
//从缓存中获取key
String s = stringRedisTemplate.boundValueOps("demo1_num").get();
//判断是否为空
if(StringUtils.isEmpty(s)){
return;
}
//若不为空
Integer num = Integer.parseInt(s);
//自增保存
stringRedisTemplate.boundValueOps("demo1_num").set(++num + "");
}
}

使用ab工具压力测试:5000次请求,并发100

img

查看redis中的结果:

img

完美!与预期一致,是否真的完美?

接下来再看集群情况下,会怎样?

1.4 本地锁问题演示锁

接下来启动8206 8216 8226 三个运行实例。

运行多个service-product实例:

image-20220323152243090

通过网关压力测试

启动网关:

ab -n 5000 -c 100 http://192.168.200.1/redis/product/test/testLock

img

查看redis中的值:

img

集群情况下又出问题了!!!

以上测试,可以发现:

本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性。

此时需要分布式锁

2 分布式锁实现

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

  1. 基于数据库实现分布式锁

  2. 基于缓存(Redis等)

  3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

  1. 性能:redis最高

  2. 可靠性:zookeeper最高

==这里,我们就基于redis实现分布式锁。==

3 使用redis实现分布式锁

redis命令

set sku:1:info “OK” NX PX 10000

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 SETEX key millisecond value 。

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

XX :只在键已经存在时,才对键进行设置操作。

1568170959121

  1. 多个客户端同时获取锁(setnx)

  2. 获取成功,执行业务逻辑,执行完成释放锁(del)

  3. 其他客户端等待重试

3.1 编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Service
public class RedisTestServiceImpl implements RedisTestService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* redis的锁测试demo1
*/
@Override
public void lockDemo1() {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
//lock为true的时候代表获取到了锁
if(lock){
//从缓存中获取key
String s = stringRedisTemplate.boundValueOps("demo1_num").get();
//判断是否为空
if(StringUtils.isEmpty(s)){
return;
}
//若不为空
Integer num = Integer.parseInt(s);
//自增保存
stringRedisTemplate.boundValueOps("demo1_num").set(++num + "");
//释放锁
stringRedisTemplate.delete("lock");
}else{
try {
//没有获取到锁,线程睡觉1秒
Thread.sleep(1000);
//重试:递归
lockDemo1();
}catch (Exception e){
e.printStackTrace();
}
}

}
}

重启,服务集群,通过网关压力测试:

img

查看redis中num的值:

1568172286652

基本实现。

问题:setnx刚好获取到锁,业务逻辑出现异常,导致锁无法释放

==解决:设置过期时间,自动释放锁。==

3.2 优化之设置锁的过期时间

设置过期时间有两种方式:

  1. 首先想到通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

  2. 在set时指定过期时间(推荐)

1568172928666

设置过期时间:

1568173359179

压力测试肯定也没有问题。自行测试

1
2
Boolean lock = redisTemplate.opsForValue()
.setIfAbsent("lock", "111",3,TimeUnit.SECONDS);

问题:可能会释放其他服务器的锁。

==场景:如果业务逻辑的执行时间是7s。执行流程如下==

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑

  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的锁

3.3 优化之UUID防误删

1568174534027

img

==问题:删除操作缺乏原子性。==

场景:

  1. index1执行删除时,查询到的lock值确实和uuid相等

    uuid=v1

    set(lock,uuid);

    img

  2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

    在redis中没有了lock,没有了锁。

    img

  3. index2获取了lock

    index2线程获取到了cpu的资源,开始执行方法

    uuid=v2

    set(lock,uuid);

  4. index1执行删除,此时会把index2的lock删除

    index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行

    img

    删除的index2的锁!

3.4 优化之LUA脚本保证删除的原子性

lua脚本

1
2
3
4
5
if redis.call('get', KEYS[1]) == ARGV[1] then 
return redis.call('del', KEYS[1])
else
return 0
end

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package slx.blue.gmall.product.service.impl;

import slx.blue.gmall.product.service.RedisTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Service
public class RedisTestServiceImpl implements RedisTestService {

@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* redis的锁测试demo1
*/
@Override
public void lockDemo1() {
// SETNX 如果key不存在,才对key进行设置操作
//上锁的时候设置过期时间三秒 防止业务异常无法释放锁
// 并且设置一个唯一的值UUID,防止因业务执行超时后自动释放锁 又主动释放别人的锁
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
//lock为true的时候代表获取到了锁
if(lock){
//从缓存中获取key
String s = stringRedisTemplate.boundValueOps("demo1_num").get();
//判断是否为空
if(StringUtils.isEmpty(s)){
return;
}
//若不为空
Integer num = Integer.parseInt(s);
//自增保存
stringRedisTemplate.boundValueOps("demo1_num").set(++num + "");
//操作完 拿到锁和当前uuid进行判断 看看是否还是自己的锁 如果是则删除
// 问题 if里面的流程不是原子性的 还是容易出现问题
// if(stringRedisTemplate.opsForValue().get("lock").equals(uuid)){
//// stringRedisTemplate.delete("lock");
// }

//用lua脚本删除
/**
* if redis.call('get', KEYS[1]) == ARGV[1] then
* return redis.call('del', KEYS[1])
* else
* return 0
* end
*/
DefaultRedisScript<Long> script = new DefaultRedisScript();
//设置lua脚本
script.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end");
//设置返回类型
script.setResultType(Long.class);
//执行脚本释放锁
stringRedisTemplate.execute(script, Arrays.asList("lock"), uuid);
}else{
try {
//没有获取到锁,线程睡觉1秒
Thread.sleep(1000);
//重试:递归
lockDemo1();
}catch (Exception e){
e.printStackTrace();
}
}

}
}

Lua 脚本详解:

img

3.5 总结

  1. 加锁
1
2
3
4
// 1. 从redis中获取锁,set k1 v1 px 20000 nx
String uuid = UUID.randomUUID().toString();
Boolean lock = this.redisTemplate.opsForValue()
.setIfAbsent("lock", uuid, 2, TimeUnit.SECONDS);
  1. 使用lua释放锁
1
2
3
4
5
6
7
8
// 2. 释放锁 del
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 设置lua脚本返回的数据类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
// 设置lua脚本返回类型为Long
redisScript.setResultType(Long.class);
redisScript.setScriptText(script);
redisTemplate.execute(redisScript, Arrays.asList("lock"),uuid);
  1. 重试
1
2
Thread.sleep(1000); 
testLock();

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • ==互斥性: 在任意时刻,只有一个客户端能持有锁;==

  • ==不会发生死锁: 即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;==

  • ==解铃还须系铃人: 加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;==

  • ==加锁和解锁必须具有原子性;==

redis集群状态下的问题:

  1. 客户端A从master获取到锁

  2. 在master将锁同步到slave之前,master宕掉了。

  3. slave节点被晋级为master节点

  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。

安全失效!

解决方案:

img

但是!上述情况只是解决了redis单点的情况。如果redis也是集群情况,会有数据同步的情况,以主从为例,如果setNX给到了主机,但是数据还未同步,然后主机宕机了,从机便又没有锁了,服务即又可以给redis从机加新的锁,这会导致锁失效。所以经典白学系列,解决问题的还得看下面两节。

4 使用redisson 解决分布式锁

Github 地址:https://github.com/redisson/redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

1568176834908

官方文档地址:https://github.com/redisson/redisson/wiki

4.1 实现代码

1
2
3
4
5
6
<!-- redisson 分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.2</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package slx.blue.gmall.common.config;

import lombok.Data;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;

/**
* redisson配置信息
*/
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {

private String host;

private String addresses;

private String password;

private String port;

private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize=10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";

/**
* 自动装配
*
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();

if(StringUtils.isEmpty(host)){
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize);
if(!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
return Redisson.create(config);
}
}

4.2 可重入锁(Reentrant Lock)

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。

大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。

另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

快速入门使用的就是可重入锁。也是最常使用的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Autowired
private RedissonClient redissonClient;

@Override
public void setRedisRedisson() {
//抢锁 extends Lock :手动上锁释放锁
RLock lock = redissonClient.getLock("lock");
//操作
try {
//抢锁 10秒内一直尝试加锁,加锁成功的话 锁的有效期为20秒
if (lock.tryLock(10, 20, TimeUnit.SECONDS)) {
try {
//尝试加锁成功
//拿到锁了
// 从缓存中拿到值
String value = stringRedisTemplate.boundValueOps("test01").get();
//如果为空 直接返回
if (StringUtils.isEmpty(value)) {
return;
}
//不为空
Integer num = Integer.parseInt(value);
stringRedisTemplate.boundValueOps("test01").set(++num + "");
} catch (NumberFormatException e) {
System.out.println("加锁成功以后出现异常");
e.printStackTrace();
} finally {
//释放锁
//能进来 是肯定已经上了锁 所以应该在finally释放锁
lock.unlock();
}
}
} catch (InterruptedException e) {
System.out.println("加锁失败 出现异常");
e.printStackTrace();
}
}

4.3 读写锁(ReadWriteLock)

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。

分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 读锁
*
* @return
*/
@Override
public String readLock() {
RReadWriteLock lock = redissonClient.getReadWriteLock("lock");
//读锁:3秒
lock.readLock().lock(10, TimeUnit.SECONDS);
//redis读数据
String s = stringRedisTemplate.boundValueOps("msg").get();
return s;
}

/**
* 写锁
*/
@Override
public String writeLock() {
RReadWriteLock lock = redissonClient.getReadWriteLock("lock");
//写锁
lock.writeLock().lock(10, TimeUnit.SECONDS);
//写数据
stringRedisTemplate.boundValueOps("msg").set(UUID.randomUUID().toString());

return "写完了";
}

打开开两个浏览器窗口测试:

localhost:8206/admin/product/test/read

http://localhost:8206/admin/product/test/write

- 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始

- 同时访问读:不用等待

- 先写后读:读要等待(约10s)写完成

- 先读后写:写要等待(约10s)读完成

二 分布式锁改造获取sku信息

3.1 使用redis方案

RedisConst 类中追加一个变量

1
2
3
4
// 商品如果在数据库中不存在那么会缓存一个空对象进去,但是这个对象是没有用的,所以这个对象的过期时间应该不能太长,
// 如果太长会占用内存。
// 定义变量,记录空对象的缓存过期时间
public static final long SKUKEY_TEMPORARY_TIMEOUT = 5 * 60;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Autowired
private StringRedisTemplate stringRedisTemplate;

/**
* 通过redis分布式锁 获取sku的信息
*
* @param skuId
* @return
*/
private SkuInfo getSkuByRedis(Long skuId) {
//参数校验
if (skuId == null) {
return null;
}

//拿到商品key sku:33:info
String key = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;
//根据key去查缓存值
String s = (String) stringRedisTemplate.boundValueOps(key).get();
//检查值是否为空
if (!StringUtils.isEmpty(s)) {
//缓存有数据,直接返回
return JSONObject.parseObject(s, SkuInfo.class);
} else {
//缓存没有数据
//定义当前商品的锁 sku:33:lock
String lock = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX;
//随机uuid字符串 保证不删除被人的的锁
String uuid = UUID.randomUUID().toString();
//加锁
Boolean isLock = stringRedisTemplate.opsForValue().
setIfAbsent(lock, uuid, RedisConst.SKULOCK_EXPIRE_PX2, TimeUnit.SECONDS);
if (isLock) {
//加锁成功 从数据库拿到锁数据
SkuInfo skuInfo = skuInfoMapper.selectById(skuId);
if (null == skuInfo || skuInfo.getId() == null) {
//查到的数据为空 缓存三分钟空值 防止穿透
skuInfo = new SkuInfo();
stringRedisTemplate.opsForValue()
.set(key, JSONObject.toJSONString(skuInfo),
RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
return skuInfo;
} else {
//不为空 设置缓存值 缓存时间为一天
stringRedisTemplate.opsForValue()
.set(key, JSONObject.toJSONString(skuInfo),
RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
//设置lua释放锁的脚本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
//设置释放脚本
redisScript.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end ");
//设置返回类型
redisScript.setResultType(Long.class);
//释放锁
stringRedisTemplate.execute(redisScript, Arrays.asList(lock), uuid);
return skuInfo;
}
} else {
//加锁失败 睡一秒 递归再次尝试
try {
TimeUnit.SECONDS.sleep(1);
return getSkuByRedis(skuId);
} catch (InterruptedException e) {
return null;
}
}
}
}

3.2 使用redisson方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@Autowired
private RedissonClient redissonClient;

@Autowired
private RedisTemplate redisTemplate;

private SkuInfo getSkuByRedisson(Long skuId) {
//参数校验
if (skuId == null) {
return null;
}
//定义key
String key = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKUKEY_SUFFIX;

//缓存取值
SkuInfo skuInfo = (SkuInfo) redisTemplate.opsForValue().get(key);
if (null != skuInfo) {
//缓存有值 直接返回
return skuInfo;
}
//定义锁
String lockKey = RedisConst.SKUKEY_PREFIX + skuId + RedisConst.SKULOCK_SUFFIX;
//拿到锁
RLock rLock = redissonClient.getLock(lockKey);
//缓存为空 去数据库取值
try {
//20秒内重复尝试去拿锁,拿到锁可以持有10秒
if (rLock.tryLock(20, 10, TimeUnit.SECONDS)) {
//进来了说明一定拿到锁了,所以在finally要释放锁
try {
//拿到锁了 执行逻辑
skuInfo = skuInfoMapper.selectById(skuId);
if (null == skuInfo || null == skuInfo.getId()) {
//数据库查到的数据为空 缓存一个空值 三分钟
redisTemplate.opsForValue()
.set(key, skuInfo, RedisConst.SKUKEY_TEMPORARY_TIMEOUT, TimeUnit.SECONDS);
} else {
// 数据不为空 设置时间为一天的缓存
redisTemplate.opsForValue()
.set(key, skuInfo, RedisConst.SKUKEY_TIMEOUT, TimeUnit.SECONDS);
}
return skuInfo;
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
} else {
//额定时间抢锁失败 递归重新执行
try {
TimeUnit.SECONDS.sleep(1);
getSkuByRedisson(skuId);
} catch (InterruptedException e) {
}

}
} catch (InterruptedException e) {

e.printStackTrace();
}
return null;

}

3.3 在getSkuInfo 中调用上述两个方法进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 查询sku的详细信息
* @param skuId
* @return
*/
@Override
public SkuInfo getSkuInfo(Long skuId) {
//redis或redisson两种方案任选其一
// SkuInfo skuInfo = getSkuByRedis(skuId);
SkuInfo skuInfo = getSkuByRedisson(skuId);
if(skuInfo == null || skuInfo.getId() == null){
//防止缓存宕机
skuInfo = getSkuFromDb(skuId);
}
//返回结果
return skuInfo;
}

三 分布式锁 + AOP实现缓存

随着业务中缓存及分布式锁的加入,业务代码变的复杂起来,除了需要考虑业务逻辑本身,还要考虑缓存及分布式锁的问题,增加了程序员的工作量及开发难度。而缓存的玩法套路特别类似于事务,而声明式事务就是用了aop的思想实现的。

20181207003231127

  1. 以 @Transactional 注解为植入点的切点,这样才能知道@Transactional注解标注的方法需要被代理。

  2. @Transactional注解的切面逻辑类似于@Around

模拟事务,缓存可以这样实现:

  1. 自定义缓存注解@GmallCache(类似于事务@Transactional)

  2. 编写切面类,使用环绕通知实现缓存的逻辑封装

img

4.1 定义一个注解

1
2
3
4
5
6
7
8
9
10
11
12
package slx.blue.gmall.common.cache;


import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GmallCache {
String prefix() default "cache";
}

4.2 定义一个切面类加上注解

Spring aop 参考文档:

https://docs.spring.io/spring/docs/5.2.6.BUILD-SNAPSHOT/spring-framework-reference/core.html#aop-api-pointcuts-aspectj

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package slx.blue.gmall.common.cache;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

@Component
@Aspect
public class GmallCacheAspect {

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private RedissonClient redissonClient;

//环绕监听的范围 只要加了范围内的注解 就可以AOP
@Around("@annotation(slx.blue.gmall.common.cache.GmallCache)")
public Object cacheAroundAdvice(ProceedingJoinPoint point){

/*
1. 获取参数列表
2. 获取方法上的注解
3. 获取前缀
4. 获取目标方法的返回值
*/
//初始化通用返回结果
Object result = null;
try {
//获取方法的参数 point:切点方法
Object[] args = point.getArgs();
System.out.println("gmallCache:"+args);
//拿到方法的签名:拿到方法的属性
MethodSignature signature = (MethodSignature) point.getSignature();
//通过方法前面获取方法的注解 @GmallCache 然后实例化
GmallCache gmallCache = signature.getMethod().getAnnotation(GmallCache.class);
// 拿到注解的前缀 用于缓存
String prefix = gmallCache.prefix();
// 拼接缓存key getSkuInfo:[1,xxx]
String key = prefix+Arrays.asList(args).toString();

// 私有方法,从redis获取值
result = cacheHit(signature, key);
if (result!=null){
// 缓存有数据 直接返回
return result;
}
/*----------------------方法还没开始运行---------------------------------*/
// 初始化分布式锁
// 锁的key getSkuInfo[10]:lock
// 缓存的key getSkuInfo[10]
//获取锁
RLock lock = redissonClient.getLock(key + ":lock");
//尝试加锁 100内持续尝试获得锁 获得锁过期时间100秒
boolean flag = lock.tryLock(100, 100, TimeUnit.SECONDS);
//获取锁成功
if (flag){
//加锁成功
try {
try {
//执行方法 查询数据库
result = point.proceed(point.getArgs());
// 防止缓存穿透
if (null==result){
// 数据库没有值,缓存一个空对象,给五分钟过期时间
Object o = new Object();
this.redisTemplate.opsForValue().set(key, JSONObject.toJSONString(o),
300,TimeUnit.SECONDS);
return null;
}else {
// 并把结果放入缓存 时间为一天
this.redisTemplate.opsForValue().set(key, JSONObject.toJSONString(result),
20*60*60,TimeUnit.SECONDS);
}
} catch (Throwable throwable) {
throwable.printStackTrace();
}

return result;
}catch (Exception e){
e.printStackTrace();
}finally {
// 释放锁
lock.unlock();
}
}
}catch (Exception e){
e.printStackTrace();
}
//boolean flag = lock.tryLock(10L, 10L, TimeUnit.SECONDS);
return result;
}
// 根据key 获取缓存数据
private Object cacheHit(MethodSignature signature, String key) {
// 1. 查询缓存
String cache = (String)redisTemplate.opsForValue().get(key);
if (StringUtils.isNotBlank(cache)) {
// 有,则反序列化,直接返回
Class returnType = signature.getReturnType(); // 获取方法返回类型
// 不能使用parseArray<cache, T>,因为不知道List<T>中的泛型
return JSONObject.parseObject(cache, returnType);
}
return null;
}

}

4.3 使用注解完成缓存

修改方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 查询sku的详细信息
*
* @param skuId
* @return
*/
@GmallCache(prefix = RedisConst.SKUKEY_PREFIX)
@Override
public SkuInfo getSkuInfo(Long skuId) {
//防止缓存宕机
SkuInfo skuInfo = getSkuFromDb(skuId);
//返回结果
return skuInfo;
}
1
2
3
4
5
6
7
8
9
10
11
@GmallCache(prefix = "saleAttrValuesBySpu:")
public Map getSaleAttrValuesBySpu(Long spuId)

@GmallCache(prefix = "spuSaleAttrListCheckBySku:")
public List<SpuSaleAttr> getSpuSaleAttrListCheckBySku(Long skuId, Long spuId)

@GmallCache(prefix = "skuPrice:")
public BigDecimal getSkuPrice(Long skuId)

@GmallCache(prefix = "categoryViewByCategory3Id:")
public BaseCategoryView getCategoryViewByCategory3Id(Long category3Id)

测试效果:

img


本博客目前大部分文章都是参考尚硅谷或者马士兵教育的学习资料!