夏有清风

最是清風明月知人意,涼爽了整個如詩的夏夜

Open Source, Open Mind,
Open Sight, Open Future!
  menu
7 文章
12547 浏览
1 当前访客
ღゝ◡╹)ノ❤️

基于Redis及Java AOP的分布式锁实现

背景

随着微服务的普及,越来越多的应用开始使用多节点互备的模式进行部署,这就可能会带来一个新的问题。在单节点的情况下,我们可以简单地使用Java的锁机制进行方法锁或类锁,但是在多节点的情况下,应该如何保证某个方法在同一时间点或同一时间段内,只被调用一次呢?
本文将基于Redis缓存及Java的AOP机制设计实现一种分布式方法锁,用来应对多节点情况下如跑批、定时任务、调度等问题。

锁注解

@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER)
@Retention(AnnotationRetention.RUNTIME)
annotation class DistributedLock(
    val lockPrefix: String = "",
    val lockKey: String = "",
    val retryTimes: Int = 3,
    val timeOut: Long = 5,
    val timeUnit: TimeUnit = TimeUnit.SECONDS)

首先定义好分布式锁的相关属性如:锁前缀、锁名、重试次数、锁超时时间、时间单位。
在Redis缓存下,使用锁前缀作为不同锁分组的命名空间,锁名表示需加锁的方法名;重试次数表示在获取锁失败的情况下,进程可重新请求锁的最大重试次数;锁超时时间表示在该指定时间后,若新进程无法获取锁则进行强制解锁。

注解切面

定义切面的环绕方法:线程对方法进行加锁,若加锁失败则结束线程,若加锁成功则执行方法,最终对方法解锁。

fun lockAroundAction(joinPoint: ProceedingJoinPoint) {
    if (this.lock(joinPoint, 0)) {
        try {
            joinPoint.proceed()
        } catch (throwable: Throwable) {
            throw DistributedLockException("分布式锁异常,${throwable.message}", throwable)
        } finally {
            this.unlock(joinPoint)
        }
    } else logger.warn("无法调用, 有其他线程正在执行该任务")
}

加锁方法如下:

private fun lock(joinPoint: ProceedingJoinPoint, count: Int): Boolean {
    val annotationArgs = this.getAnnotationArgs(joinPoint)
    return if (annotationArgs.isEmpty()) false
    else {
        val lockPrefix = annotationArgs["prefix"].toString()
        val key = annotationArgs["key"].toString()
        val expire = annotationArgs["expire"].toString().toLong()
        val retryTime = annotationArgs["retry"].toString().toInt()
        if (lockPrefix.isBlank() || key.isBlank()) {
            throw DistributedLockException("CacheLock锁前缀或锁名未指定")
        }
        if (commonLockHelper.addLock(lockPrefix, key, expire)) {
            return true
        } else {
            // 获取锁创建时间
            val createTime = commonLockHelper.getLockTime(lockPrefix, key)
            // 当前时间已锁超时时间,强制解锁并开始线程
            if (System.currentTimeMillis() - createTime > expire) {
  		logger.info("锁超时,强制解锁")
                commonLockHelper.remove(lockPrefix, key)
                lock(joinPoint, count.plus(1))
            } else {
                // 超过最大可重试次数,结束请求
                if (count >= retryTime) {
                    logger.warn("请求锁超过可重试上限, 中断请求")
                    return false
                } else {
                    // 下次请求在等待 1 << (count+1) 秒后执行
		    val waitTime = PROTECT_TIME shl count
                    logger.info("获取锁失败,等待${waitTime}秒后重试")
                    Thread.sleep(PROTECT_TIME shl count)
                    lock(joinPoint, count.plus(1))
                }
            }
        }
    }
}

获取锁属性:

private fun getAnnotationArgs(joinPoint: ProceedingJoinPoint): Map<String, Any> {
    val target = joinPoint.target.javaClass
    val methods = target.methods
    val methodName = joinPoint.signature.name
    for (method in methods) {
        if (method.name.equals(methodName, true)) {
           val redisLock = method.getAnnotation(DistributedLock::class.java)
           val expire = TimeoutUtil.toMillis(redisLock.timeOut, redisLock.timeUnit)
           return mapOf("prefix" to redisLock.lockPrefix, "key" to redisLock.lockKey, "expire" to expire, "retry" to redisLock.retryTime)
        }
    }
    return emptyMap()
}

解锁方法如下:

private fun unlock(joinPoint: ProceedingJoinPoint) {
    val annotationArgs = this.getAnnotationArgs(joinPoint)
    val lockPrefix = annotationArgs["prefix"].toString()
    val key = annotationArgs["key"].toString()
    if (lockPrefix.isBlank() || key.isBlank()) {
        throw DistributedLockException("CacheLock锁前缀或锁名未指定")
    } else commonLockHelper.remove(lockPrefix, key)
}

自定义异常

class DistributedLockException : RuntimeException {
    constructor() : super()
    constructor(message: String) : super(message)
    constructor(message: String, cause: Throwable) : super(message, cause)
    constructor(cause: Throwable) : super(cause)
}

RedisLockHelper工具类

为后续的可扩展性,抽象出CommonLockHelper接口:

interface CommonLockHelper {
    fun addLock(track: String, sector: String, timeout: Long): Boolean
    fun remove(track: String, sector: String)
    fun getLockTime(track: String, sector: String): Long
}

RedisLockHelper的实现如下:

class RedisLockHelper(val redisTemplate: RedisTemplate<String, String>) : CommonLockHelper {
    override fun addLock(track: String, sector: String, timeout: Long): Boolean {
        val valueOperations = redisTemplate.opsForValue()
        val cacheKey = "$track:$sector"
        val flag = valueOperations.setIfAbsent(cacheKey, System.currentTimeMillis().toString()) ?: false
        if (flag) valueOperations.set(cacheKey, getLockTime(track, sector).toString(), timeout, TimeUnit.SECONDS)
        return flag
    }

    override fun remove(track: String, sector: String) {
        redisTemplate.delete("$track:$sector")
    }

    override fun getLockTime(track: String, sector: String): Long {
        val valueOperations = redisTemplate.opsForValue()
        return valueOperations.get("$track:$sector")?.toLong() ?: 0
    }
}

在加锁过程中,首先尝试使用当前时间戳写入缓存cacheKey。

  • 如果写入失败,表示当前锁已被占用,返回加锁失败状态。
  • 如果写入成功,则此时获取锁成功,将之前写入的时间戳覆盖更新缓存cacheKey并设置该缓存的超时时间,返回加锁成功。

配置

最后进行分布式锁的工具类注入配置:

@Bean
fun commonLockHelper(): CommonLockHelper {
  return RedisLockHelper(redisTemplate)
}

测试

  1. 测试方法,使用Thread.sleep()方法使线程强制耗时5s左右,并将该service包装成接口。
@Async
@DistributedLock("author", "name", 5)
fun getName() {
  println("${DateUtil.format(Date(), "yyyy-MM-dd HH:mm:ss.SSS")} romani")
  Thread.sleep(5000)
}

使用Postman连续请求不同端口的两个该服务实例,可以观察到日志如下:
WX20190801112314.png
在重试3次后,进程B获取锁并成功执行方法。
2. 将锁超时时间设置为1秒后重新测试,观察日志如下:

@DistributedLock("author", "name", 5, 1)

1.png
请求时与锁创建时的差值已经大于锁的超时时间,此时强制解锁并执行进程B。
3. 将锁重试次数设置为1,超时时间设置为10s后重新测试,观察日志如下:

@DistributedLock("author", "name", 1, 10)

2.png