redisson的分布式锁的使用
redisson的使用:1、引入依赖<!-- redisson 分布式锁--><dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.3</version></dep
·
redisson的使用:
1、引入依赖
<!-- redisson 分布式锁-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.3</version>
</dependency>
2、配置redisson的相关信息(单redis节点模式)
/**
* redisson配置信息
*/
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize=10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
if(StringUtils.isEmpty(host)){
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
//redis://127.0.0.1:7181
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize)
;
if(!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
// RedissonClient redisson = Redisson.create(config);
return Redisson.create(config);
}
3、直接使用
@RestController
public class RedissonController {
@Autowired
private RedissonClient redissonClient;
@GetMapping("/lock")
public String lock(){
String s = UUID.randomUUID().toString();
RLock lock = redissonClient.getLock("lock"+s);
try {
lock.lock(); // 加锁,拿不到锁会一直自旋等待
//redisson加的锁默认过期时长是30s,没过10s续一次
//redisson不会死锁,因为有过期时间
System.out.println("66666666666"+s);
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return "ok"+s;
}
}
分布式锁和同步器
1、获取锁(锁的名称一样,就是同一把锁),加锁(加的是自旋锁),业务时间超长的时候,锁会自动续期
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock(); //锁是自旋锁,抢不到会一直自旋,锁会自动续期
2、指定加锁时间,超过时间自动解锁
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
3、尝试加锁,不成功就放弃
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
如果一个线程A加锁成功,锁还没过期,另一个线程也来加锁,但是会失败,然后finally解锁,会抛出异常,因为它想解别人的锁。
看门狗机制部分源码
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
//没有加过期时间参数的加锁方法
public void lock() {
try {
this.lock(-1L, (TimeUnit)null, false);
} catch (InterruptedException var2) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 调用tryAcquire
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
、、、
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
if (leaseTime != -1L) {
// 有锁过期时间
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//没有锁过期时间
//this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
//this.lockWatchdogTimeout = 30000L;
ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
if (leaseTime != -1L) {
this.internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
this.scheduleExpirationRenewal(threadId);
}
}
}
});
return ttlRemainingFuture;
}
// redis底层是调用的lua脚本,来保证原子性
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
加锁底层是一个调度任务,没隔10秒刷新一次,也就是每隔10秒,过期时间刷新为30s(看门狗超时时间)
private void renewExpiration() {
RedissonBaseLock.ExpirationEntry ee = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonBaseLock.ExpirationEntry ent = (RedissonBaseLock.ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
RFuture<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
RedissonBaseLock.log.error("Can't update lock " + RedissonBaseLock.this.getName() + " expiration", e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) {
RedissonBaseLock.this.renewExpiration();
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
读写锁
写写、读写排他
读读共享
@RestController
public class RedissonControler {
@Autowired
RedissonClient redissonClient;
// 加锁
@GetMapping("/lock")
public void getLock() throws InterruptedException {
RLock rlock = redissonClient.getLock("rlock");
try {
String lockId= UUID.randomUUID().toString();
//rlock.lock();
rlock.lock(10L,TimeUnit.SECONDS);
System.out.println(lockId+"加锁");
} finally {
//rlock.unlock();
}
}
//读写锁
@GetMapping("/rlock")
public void getRLock() throws InterruptedException {
RReadWriteLock rlock = redissonClient.getReadWriteLock("rwlock");
RLock rLock = rlock.readLock();
System.out.println("加读锁");
rLock.lock();
Thread.sleep(50000L);
rLock.unlock();
}
@GetMapping("/wlock")
public void getWLock() throws InterruptedException {
RReadWriteLock wlock = redissonClient.getReadWriteLock("rwlock");
RLock wLock = wlock.writeLock();
wLock.lock();
System.out.println("加写锁");
Thread.sleep(50000L);
wLock.unlock();
}
红锁
8.4. 红锁(RedLock)
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
闭锁
8.8. 闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
更多推荐
所有评论(0)