redis 做分布式锁的三个核心要素:

1、加锁

最简单的命令是setnx,key是锁的唯一标识,按业务来决定命名,value为当前线程的线程ID。当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁,当其他线程执行setnx返回0,说明key已经存在,该线程抢锁失败。

2、解锁


当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令。

3、锁超时

如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,

expire(key, time):

Redis做分布式可能出现的问题:

1、 setnx和expire的非原子性

2、超时后使用del 导致误删其他线程的锁。

A线程持有锁,但是因为任务运行耗时较长,锁过期了。B线程获取到锁,B还没执行完,但是A执行完了,锁被释放掉,误删除。

3、并发的可靠性问题

解决办法:

1、java中jedisCluster客户端,提供


set(final String key, final String value, final String nxxx, final String expx, final long time)

相当于是setnx和expire的组合包装,但是具有原子性。

2、基于Redis的分布式锁框架redisson。

Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持。

  • 加锁机制

    • 线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
    • 线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis数据库。
  • watch dog自动延期机制

    • 工作线程未完成任务,但是到了过期时间,还想延长,可以通过看门狗机制,不断延长锁的过期时间。

Redision实现分布式锁的原理

<pre class="has">

@Override
public RLock getLock(String name) {

return new RedissonLock(connectionManager.getCommandExecutor(), name);

}


  
  
  
拿到实例后进行锁定

  
  
  
  
  
```java
// 获取锁
@Override
public void lock() {
   try {
        lockInterruptibly();
   } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
   }
}

@Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

// 可中断的获取锁
@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // 获取当前线程的id
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture future = subscribe(threadId);
        commandExecutor.syncSubscription(future);
        try {
            // 采用不断循环的方式获取锁
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

// 异步方式获取锁,
private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

// 用lua 脚本保证Redis事务特性
 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
```

  
  
  
非常重要的一点,Redisson在获取锁的时候,采用信号量竞争机制,也就是多个线程获取锁,只有一个线程获取到锁,其他的线程会进入阻塞状态,防止无效的轮询而浪费资源。所以,接着看订阅scribe函数

  
  
  
  
  
```
```java
RFuture future = subscribe(threadId);
commandExecutor.syncSubscription(future);

protected RFuture subscribe(long threadId) {
        return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
    }

public RFuture subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
        // 原子类自增监听器
        final AtomicReference listenerHolder = new AtomicReference();
        // 设定信号量
       final AsyncSemaphore semaphore = subscribeService.getSemaphore(new ChannelName(channelName));
        final RPromise newPromise = new RedissonPromise() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return semaphore.remove(listenerHolder.get());
            }
        };

        Runnable listener = new Runnable() {
            @Override
            public void run() {
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.aquire();
                    semaphore.release();
                    entry.getPromise().addListener(new TransferListener(newPromise));
                    return;
                }
                
                E value = createEntry(newPromise);
                value.aquire();
                
                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.aquire();
                    semaphore.release();
                    oldValue.getPromise().addListener(new TransferListener(newPromise));
                    return;
                }
                
                RedisPubSubListener listener = createListener(channelName, value);
                subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            }
        };
        semaphore.acquire(listener);
        listenerHolder.set(listener);
        
        return newPromise;
    }
```

  
  
  
接着看如何释放锁:

  
  
  
  
  
```
```java
@Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
}

 @Override
    public RFuture unlockAsync(final long threadId) {
        final RPromise result = new RedissonPromise();
        RFuture future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

// lua 脚本做异步处理,释放Redis锁
protected RFuture unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }


```

  
  
  
  
  
- [点赞 ](javascript:;)
- [收藏](javascript:;)
- [分享](javascript:;)
-   
    
    
    
    
    
    
    
    
  
  - 文章举报

[![](https://profile.csdnimg.cn/B/3/6/3_chenwiehuang)![Redis 做分布式锁的常见问题和解决方案教程](https://g.csdnimg.cn/static/user-reg-year/1x/9.png)](https://blog.csdn.net/chenwiehuang)[wei906](https://blog.csdn.net/chenwiehuang)发布了143 篇原创文章 · 获赞 25 · 访问量 16万+ [私信 ](https://im.csdn.net/im/main.html?userName=chenwiehuang)关注

标签: 线程, 分布式, return, redis, 常见问题, final, Redis, call, threadId

相关文章推荐

添加新评论,含*的栏目为必填

© 2024 善用搜索 All Rights Reserved.
专注分享技术,共同学习,共同进步!版权皆归原作者!侵权联系