release branch 0.0.4

This commit is contained in:
binbin.hou
2022-12-07 16:40:24 +08:00
parent 7fd29cd1f2
commit 402653393d
50 changed files with 458 additions and 2253 deletions

View File

@@ -32,3 +32,10 @@
| 序号 | 变更类型 | 说明 | 时间 | 备注 | | 序号 | 变更类型 | 说明 | 时间 | 备注 |
|:---|:---|:---|:---|:--| |:---|:---|:---|:---|:--|
| 1 | A | 简单锁的实现,优化 redisLock 实现策略 | 2022-04-17 14:45:40 | | | 1 | A | 简单锁的实现,优化 redisLock 实现策略 | 2022-04-17 14:45:40 | |
# release_1.0.0
| 序号 | 变更类型 | 说明 | 时间 | 备注 |
|:---|:---|:---|:---|:--|
| 1 | A | 基于 redis 实现的分布式锁策略 | 2022-12-7 14:45:40 | |

View File

@@ -11,5 +11,15 @@
<artifactId>lock-api</artifactId> <artifactId>lock-api</artifactId>
<dependencies>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>common-cache-api</artifactId>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>id-api</artifactId>
</dependency>
</dependencies>
</project> </project>

View File

@@ -1,29 +1,27 @@
package com.github.houbb.lock.api.core; package com.github.houbb.lock.api.core;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
/** /**
* 定义 * 分布式锁接口定义
* @author binbin.hou * @author binbin.hou
* @since 0.0.1 * @since 0.0.1
*/ */
public interface ILock extends Lock { public interface ILock {
/** /**
* 尝试加锁 * 尝试加锁,如果失败,会一直尝试。
*
* @param time 时间 * @param time 时间
* @param unit 当为 * @param unit 当为
* @param key key * @param key key
* @return 返回 * @return 返回
* @throws InterruptedException 异常
* @since 0.0.1 * @since 0.0.1
*/ */
boolean tryLock(long time, TimeUnit unit, boolean tryLock(long time, TimeUnit unit, String key);
String key) throws InterruptedException;
/** /**
* 尝试加锁 * 尝试加锁,只加锁一次
* @param key key * @param key key
* @return 返回 * @return 返回
* @since 0.0.1 * @since 0.0.1
@@ -35,6 +33,6 @@ public interface ILock extends Lock {
* @param key key * @param key key
* @since 0.0.1 * @since 0.0.1
*/ */
void unlock(String key); boolean unlock(String key);
} }

View File

@@ -0,0 +1,41 @@
package com.github.houbb.lock.api.core;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁接口定义
* @author binbin.hou
* @since 0.0.1
*/
public interface ILockSupport {
/**
* 尝试加锁,如果失败,会一直尝试。
*
* @param time 时间
* @param unit 单位
* @param key key
* @param context 上下文
* @return 返回
* @since 0.0.1
*/
boolean tryLock(long time, TimeUnit unit, String key, final ILockSupportContext context);
/**
* 尝试加锁,只加锁一次
* @param key key
* @param context 上下文
* @return 返回
* @since 0.0.1
*/
boolean tryLock(String key, final ILockSupportContext context);
/**
* 解锁
* @param key key
* @param context 上下文
* @since 0.0.1
*/
boolean unlock(String key, final ILockSupportContext context);
}

View File

@@ -0,0 +1,31 @@
package com.github.houbb.lock.api.core;
import com.github.houbb.common.cache.api.service.ICommonCacheService;
import com.github.houbb.id.api.Id;
/**
* 分布式锁接口定义
* @author binbin.hou
* @since 0.0.1
*/
public interface ILockSupportContext {
/**
* @return 标识策略
* @since 0.0.4
*/
Id id();
/**
* @return 缓存策略
* @since 0.0.4
*/
ICommonCacheService commonCacheService();
/**
* 锁的过期时间
* @return 结果
*/
int lockExpireMills();
}

View File

@@ -1,32 +0,0 @@
package com.github.houbb.lock.api.core;
/**
* 读写锁定义接口
* @author binbin.hou
* @since 0.0.2
*/
public interface IReadWriteLock {
/**
* 获取读锁
* @since 0.0.2
*/
void lockRead();
/**
* 释放读锁
*/
void unlockRead();
/**
* 获取写锁
* @since 0.0.2
*/
void lockWrite();
/**
* 释放写锁
*/
void unlockWrite();
}

View File

@@ -0,0 +1,29 @@
package com.github.houbb.lock.api.exception;
/**
* 加锁运行时异常
* @since 1.0.0
* @author dh
*/
public class LockException extends RuntimeException {
public LockException() {
}
public LockException(String message) {
super(message);
}
public LockException(String message, Throwable cause) {
super(message, cause);
}
public LockException(Throwable cause) {
super(cause);
}
public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -1,41 +0,0 @@
package com.github.houbb.lock.api.support;
/**
* 操作接口定义
*
* ps: 可以基于集中式数据库做操作
*
* @author binbin.hou
* @since 0.0.3
*/
public interface IOperator {
/**
* 尝试获取分布式锁
*
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTimeMills 超期时间
* @return 是否获取成功
* @since 0.0.3
*/
boolean lock(String lockKey, String requestId, int expireTimeMills);
/**
* 解锁
* @param lockKey 锁 key
* @param requestId 请求标识
* @return 结果
* @since 0.0.3
*/
boolean unlock(String lockKey, String requestId);
/**
* 清空过期的锁
*
* 避免单个线程 unlock 失败,定时移除过期的锁。
* @since 0.0.4
*/
void clearExpireLock();
}

View File

@@ -36,7 +36,15 @@
<groupId>com.github.houbb</groupId> <groupId>com.github.houbb</groupId>
<artifactId>log-integration</artifactId> <artifactId>log-integration</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>common-cache-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>redis-config-core</artifactId>
</dependency>
</dependencies> </dependencies>

View File

@@ -1,13 +1,18 @@
package com.github.houbb.lock.core.bs; package com.github.houbb.lock.core.bs;
import com.github.houbb.common.cache.api.service.ICommonCacheService;
import com.github.houbb.heaven.util.common.ArgUtil; import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.id.api.Id; import com.github.houbb.id.api.Id;
import com.github.houbb.id.core.core.Ids; import com.github.houbb.id.core.core.Ids;
import com.github.houbb.lock.api.core.ILock; import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.api.support.IOperator; import com.github.houbb.lock.api.core.ILockSupport;
import com.github.houbb.lock.core.support.simple.SimpleLock; import com.github.houbb.lock.api.core.ILockSupportContext;
import com.github.houbb.wait.api.IWait; import com.github.houbb.lock.core.constant.LockConst;
import com.github.houbb.wait.core.Waits; import com.github.houbb.lock.core.support.lock.LockSupportContext;
import com.github.houbb.lock.core.support.lock.RedisLockSupport;
import com.github.houbb.redis.config.core.factory.JedisRedisServiceFactory;
import java.util.concurrent.TimeUnit;
/** /**
* 锁引导类 * 锁引导类
@@ -15,69 +20,46 @@ import com.github.houbb.wait.core.Waits;
* @author binbin.hou * @author binbin.hou
* @since 0.0.4 * @since 0.0.4
*/ */
public final class LockBs { public final class LockBs implements ILock{
private LockBs(){} private LockBs(){}
/** public static LockBs newInstance() {
* 清空初始化延迟时间 return new LockBs();
* @since 0.0.4 }
*/
private long clearInitDelaySeconds = 60;
/** /**
* 清空初始化周期 * 加锁锁定时间
* @since 0.0.4 * @since 0.0.4
*/ */
private long clearPeriodSeconds = 60; private int lockExpireMills = LockConst.DEFAULT_EXPIRE_MILLS;
/**
* 是否启用清空任务
* @since 0.0.4
*/
private boolean enableClearTask = true;
/**
* 锁等待
* @since 0.0.1
*/
protected IWait waits = Waits.threadSleep();
/** /**
* 标识策略 * 标识策略
* @since 0.0.4 * @since 0.0.4
*/ */
protected Id id = Ids.uuid32(); private Id id = Ids.uuid32();
/** /**
* 操作策略 * 缓存策略
* @since 0.0.4 * @since 0.0.4
*/ */
protected IOperator operator; private ICommonCacheService commonCacheService = JedisRedisServiceFactory.simple("127.0.0.1", 6379);
public static LockBs newInstance(final IOperator operator) { /**
return new LockBs().operator(operator); * 锁支持策略
} * @since 1.0.0
*/
private ILockSupport lockSupport = new RedisLockSupport();
public LockBs clearInitDelaySeconds(long clearInitDelaySeconds) { /**
this.clearInitDelaySeconds = clearInitDelaySeconds; * 锁上下文
return this; * @since 1.0.0
} */
private ILockSupportContext lockSupportContext = null;
public LockBs clearPeriodSeconds(long clearPeriodSeconds) { public LockBs lockExpireMills(int lockExpireMills) {
this.clearPeriodSeconds = clearPeriodSeconds; this.lockExpireMills = lockExpireMills;
return this;
}
public LockBs enableClearTask(boolean enableClearTask) {
this.enableClearTask = enableClearTask;
return this;
}
public LockBs waits(IWait waits) {
ArgUtil.notNull(waits, "waits");
this.waits = waits;
return this; return this;
} }
@@ -88,24 +70,62 @@ public final class LockBs {
return this; return this;
} }
public LockBs operator(IOperator operator) { public LockBs commonCacheService(ICommonCacheService commonCacheService) {
ArgUtil.notNull(operator, "operator"); ArgUtil.notNull(commonCacheService, "commonCacheService");
this.operator = operator; this.commonCacheService = commonCacheService;
return this; return this;
} }
public ILock lock() { public LockBs lockSupport(ILockSupport lockSupport) {
ArgUtil.notNull(operator, "operator"); ArgUtil.notNull(lockSupport, "lockSupport");
return SimpleLock.newInstance() this.lockSupport = lockSupport;
.waits(waits) return this;
.id(id)
.operator(operator)
.enableClearTask(enableClearTask)
.clearInitDelaySeconds(clearInitDelaySeconds)
.clearPeriodSeconds(clearPeriodSeconds)
.init();
} }
/**
* 初始化
*/
public LockBs init() {
this.lockSupportContext = LockSupportContext.newInstance()
.id(id)
.commonCacheService(commonCacheService)
.lockExpireMills(lockExpireMills);
return this;
}
@Override
public boolean tryLock(long time, TimeUnit unit, String key) {
ArgUtil.notEmpty(key, "key");
this.checkInitStatus();
return this.lockSupport.tryLock(time, unit, key, lockSupportContext);
}
@Override
public boolean tryLock(String key) {
ArgUtil.notEmpty(key, "key");
this.checkInitStatus();
return this.lockSupport.tryLock(key, lockSupportContext);
}
@Override
public boolean unlock(String key) {
ArgUtil.notEmpty(key, "key");
this.checkInitStatus();
return this.lockSupport.unlock(key, lockSupportContext);
}
private void checkInitStatus() {
ArgUtil.notNull(lockSupportContext, "please init() first!");
}
} }

View File

@@ -1,184 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.id.api.Id;
import com.github.houbb.id.core.core.Ids;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.api.support.IOperator;
import com.github.houbb.lock.core.constant.LockConst;
import com.github.houbb.wait.api.IWait;
import com.github.houbb.wait.core.Waits;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
/**
* 抽象实现
* @author binbin.hou
* @since 0.0.1
*/
public abstract class AbstractLock implements ILock {
/**
* 锁等待
* @since 0.0.1
*/
protected IWait waits = Waits.threadSleep();
/**
* 标识策略
* @since 0.0.4
*/
protected Id id = Ids.uuid32();
/**
* 操作策略
* @since 0.0.4
*/
protected IOperator operator;
/**
* 清空初始化延迟时间
* @since 0.0.4
*/
private long clearInitDelaySeconds = 5;
/**
* 清空初始化周期
* @since 0.0.4
*/
private long clearPeriodSeconds = 5;
/**
* 是否启用清空任务
* @since 0.0.4
*/
private boolean enableClearTask = true;
public AbstractLock waits(IWait waits) {
this.waits = waits;
return this;
}
public AbstractLock id(Id id) {
this.id = id;
return this;
}
public AbstractLock operator(IOperator operator) {
this.operator = operator;
return this;
}
public AbstractLock clearInitDelaySeconds(long clearInitDelaySeconds) {
this.clearInitDelaySeconds = clearInitDelaySeconds;
return this;
}
public AbstractLock clearPeriodSeconds(long clearPeriodSeconds) {
this.clearPeriodSeconds = clearPeriodSeconds;
return this;
}
public AbstractLock enableClearTask(boolean enableClearTask) {
this.enableClearTask = enableClearTask;
return this;
}
/**
* 初始化
* @since 0.0.4
*/
public AbstractLock init() {
// 参数校验
ArgUtil.notNull(operator, "operator");
// 初始化任务
initClearExpireKey();
return this;
}
/**
* 初始化清空任务
* @since 0.0.6
*/
private void initClearExpireKey() {
if(!enableClearTask) {
return;
}
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//5S 清理一次
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
operator.clearExpireLock();
}
}, clearInitDelaySeconds, clearPeriodSeconds, TimeUnit.SECONDS);
}
@Override
public void lock() {
throw new UnsupportedOperationException();
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean tryLock() {
return tryLock(LockConst.DEFAULT_KEY);
}
@Override
public void unlock() {
unlock(LockConst.DEFAULT_KEY);
}
@Override
public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException {
long startTimeMills = System.currentTimeMillis();
// 一次获取,直接成功
boolean result = this.tryLock(key);
if(result) {
return true;
}
// 时间判断
if(time <= 0) {
return false;
}
long durationMills = unit.toMillis(time);
long endMills = startTimeMills + durationMills;
// 循环等待
while (System.currentTimeMillis() < endMills) {
result = tryLock(key);
if(result) {
return true;
}
// 等待 1ms
waits.wait(TimeUnit.MILLISECONDS, 1);
}
return false;
}
@Override
public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return tryLock(time, unit, LockConst.DEFAULT_KEY);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}

View File

@@ -1,23 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.heaven.annotation.ThreadSafe;
/**
* 无任何锁的操作
*
* @author binbin.hou
* @since 0.0.3
*/
@ThreadSafe
public class LockNone extends AbstractLock {
@Override
public boolean tryLock(String key) {
return true;
}
@Override
public void unlock(String key) {
}
}

View File

@@ -1,125 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.api.core.IReadWriteLock;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
/**
* 读写锁实现
*
* @author binbin.hou
* @since 0.0.2
*/
public class LockReadWrite implements IReadWriteLock {
private static final Log log = LogFactory.getLog(LockReadWrite.class);
/**
* 读次数统计
*/
private volatile int readCount = 0;
/**
* 写次数统计
*/
private volatile int writeCount = 0;
/**
* 获取读锁,读锁在写锁不存在的时候才能获取
*
* @since 0.0.2
*/
@Override
public synchronized void lockRead() {
try {
// 写锁存在,需要wait
while (!tryLockRead()) {
wait();
}
readCount++;
} catch (InterruptedException e) {
Thread.interrupted();
// 忽略打断
}
}
/**
* 尝试获取读锁
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockRead() {
if (writeCount > 0) {
log.debug("当前有写锁,获取读锁失败");
return false;
}
return true;
}
/**
* 释放读锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockRead() {
readCount--;
notifyAll();
}
/**
* 获取写锁
*
* @since 0.0.2
*/
@Override
public synchronized void lockWrite() {
try {
// 写锁存在,需要wait
while (!tryLockWrite()) {
wait();
}
// 此时已经不存在获取写锁的线程了,因此占坑,防止写锁饥饿
writeCount++;
} catch (InterruptedException e) {
Thread.interrupted();
}
}
/**
* 尝试获取写锁
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockWrite() {
if (writeCount > 0) {
log.debug("当前有其他写锁,获取写锁失败");
return false;
}
// 读锁
if (readCount > 0) {
log.debug("当前有其他读锁,获取写锁失败。");
return false;
}
return true;
}
/**
* 释放写锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockWrite() {
writeCount--;
notifyAll();
}
}

View File

@@ -1,160 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.api.core.IReadWriteLock;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* 读写锁实现-保证释放锁时为锁的持有者
*
* @author binbin.hou
* @since 0.0.2
*/
public class LockReadWriteOwner implements IReadWriteLock {
private static final Log log = LogFactory.getLog(LockReadWriteOwner.class);
/**
* 如果使用类似 write 的方式,会导致读锁只能有一个。
* 调整为使用 HashMap 存放读的信息
*
* @since 0.0.2
*/
private final Map<Thread, Integer> readCountMap = new HashMap<>();
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private final AtomicReference<Thread> writeOwner = new AtomicReference<>();
/**
* 写次数统计
*/
private volatile int writeCount = 0;
/**
* 获取读锁,读锁在写锁不存在的时候才能获取
*
* @since 0.0.2
*/
@Override
public synchronized void lockRead() {
try {
// 写锁存在,需要wait
while (!tryLockRead()) {
log.debug("获取读锁失败,进入等待状态。");
wait();
}
} catch (InterruptedException e) {
Thread.interrupted();
}
}
/**
* 尝试获取读锁
*
* 读锁之间是不互斥的,这里后续需要优化。
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockRead() {
if (writeCount > 0) {
log.debug("当前有写锁,获取读锁失败");
return false;
}
Thread currentThread = Thread.currentThread();
// 次数暂时固定为1后面如果实现可重入这里可以改进。
this.readCountMap.put(currentThread, 1);
return true;
}
/**
* 释放读锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockRead() {
Thread currentThread = Thread.currentThread();
Integer readCount = readCountMap.get(currentThread);
if (readCount == null) {
throw new RuntimeException("当前线程未持有任何读锁,释放锁失败!");
} else {
log.debug("释放读锁,唤醒所有等待线程。");
readCountMap.remove(currentThread);
notifyAll();
}
}
/**
* 获取写锁
*
* @since 0.0.2
*/
@Override
public synchronized void lockWrite() {
try {
// 写锁存在,需要wait
while (!tryLockWrite()) {
wait();
}
// 此时已经不存在获取写锁的线程了,因此占坑,防止写锁饥饿
writeCount++;
} catch (InterruptedException e) {
Thread.interrupted();
}
}
/**
* 尝试获取写锁
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockWrite() {
if (writeCount > 0) {
log.debug("当前有其他写锁,获取写锁失败");
return false;
}
// 读锁
if (!readCountMap.isEmpty()) {
log.debug("当前有其他读锁,获取写锁失败。");
return false;
}
Thread currentThread = Thread.currentThread();
boolean result = writeOwner.compareAndSet(null, currentThread);
log.debug("尝试获取写锁结果:{}", result);
return result;
}
/**
* 释放写锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockWrite() {
boolean toNullResult = writeOwner.compareAndSet(Thread.currentThread(), null);
if (toNullResult) {
writeCount--;
log.debug("写锁释放,唤醒所有等待线程。");
notifyAll();
} else {
throw new RuntimeException("释放写锁失败");
}
}
}

View File

@@ -1,195 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.api.core.IReadWriteLock;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* 读写锁实现-可重入锁
*
* @author binbin.hou
* @since 0.0.2
*/
public class LockReadWriteRe implements IReadWriteLock {
private static final Log log = LogFactory.getLog(LockReadWriteRe.class);
/**
* 如果使用类似 write 的方式,会导致读锁只能有一个。
* 调整为使用 HashMap 存放读的信息
*
* @since 0.0.2
*/
private final Map<Thread, Integer> readCountMap = new HashMap<>();
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private final AtomicReference<Thread> writeOwner = new AtomicReference<>();
/**
* 写次数统计
*/
private volatile int writeCount = 0;
/**
* 获取读锁,读锁在写锁不存在的时候才能获取
*
* @since 0.0.2
*/
@Override
public synchronized void lockRead() {
try {
// 写锁存在,需要wait
while (!tryLockRead()) {
log.debug("获取读锁失败,进入等待状态。");
wait();
}
} catch (InterruptedException e) {
Thread.interrupted();
}
}
/**
* 尝试获取读锁
*
* 读锁之间是不互斥的,这里后续需要优化。
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockRead() {
if (writeCount > 0) {
log.debug("当前有写锁,获取读锁失败");
return false;
}
Thread currentThread = Thread.currentThread();
Integer count = readCountMap.get(currentThread);
if(count == null) {
count = 0;
}
count++;
this.readCountMap.put(currentThread, count);
return true;
}
/**
* 释放读锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockRead() {
Thread currentThread = Thread.currentThread();
Integer readCount = readCountMap.get(currentThread);
if (readCount == null) {
throw new RuntimeException("当前线程未持有任何读锁,释放锁失败!");
} else {
readCount--;
// 已经是最后一次
if(readCount == 0) {
readCountMap.remove(currentThread);
} else {
readCountMap.put(currentThread, readCount);
}
log.debug("释放读锁,唤醒所有等待线程。");
notifyAll();
}
}
/**
* 获取写锁
*
* @since 0.0.2
*/
@Override
public synchronized void lockWrite() {
try {
// 写锁存在,需要wait
while (!tryLockWrite()) {
log.debug("获取写锁失败,进入等待状态。");
wait();
}
// 此时已经不存在获取写锁的线程了,因此占坑,防止写锁饥饿
writeCount++;
} catch (InterruptedException e) {
Thread.interrupted();
}
}
/**
* 尝试获取写锁
*
* @return 是否成功
* @since 0.0.2
*/
private boolean tryLockWrite() {
if (writeCount > 0) {
log.debug("当前有其他写锁,获取写锁失败");
return false;
}
// 读锁
if (!readCountMap.isEmpty()) {
log.debug("当前有其他读锁,获取写锁失败。");
return false;
}
Thread currentThread = Thread.currentThread();
// 多次重入
if(writeOwner.get() == currentThread) {
log.debug("为当前写线程多次重入,直接返回 true。");
return true;
}
boolean result = writeOwner.compareAndSet(null, currentThread);
log.debug("尝试获取写锁结果:{}", result);
return result;
}
/**
* 释放写锁
*
* @since 0.0.2
*/
@Override
public synchronized void unlockWrite() {
Thread currentThread = Thread.currentThread();
// 多次重入释放当次数多于1时直接返回否则需要释放 owner 信息)
if(writeCount > 1 && (currentThread == writeOwner.get())) {
log.debug("当前为写锁释放多次重入,直接返回成功。");
unlockWriteNotify();
return;
}
boolean toNullResult = writeOwner.compareAndSet(currentThread, null);
if (toNullResult) {
unlockWriteNotify();
} else {
throw new RuntimeException("释放写锁失败");
}
}
/**
* 释放写锁并且通知
*/
private synchronized void unlockWriteNotify() {
writeCount--;
log.debug("释放写锁成功,唤醒所有等待线程。");
notifyAll();
}
}

View File

@@ -1,44 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.core.exception.LockRuntimeException;
import java.util.concurrent.atomic.AtomicReference;
/**
* 自旋锁
* @author binbin.hou
* @since 0.0.2
*/
public class LockSpin extends AbstractLock {
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private AtomicReference<Thread> owner =new AtomicReference<>();
@Override
public void lock() {
// 循环等待,直到获取到锁
while (!tryLock()) {
}
}
@Override
public boolean tryLock(String key) {
Thread current = Thread.currentThread();
// CAS
return owner.compareAndSet(null, current);
}
@Override
public void unlock(String key) {
Thread current = Thread.currentThread();
boolean result = owner.compareAndSet(current, null);
if(!result) {
throw new LockRuntimeException("解锁失败");
}
}
}

View File

@@ -1,70 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.heaven.util.util.DateUtil;
import com.github.houbb.lock.core.exception.LockRuntimeException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* 自旋锁-可重入
* @author binbin.hou
* @since 0.0.2
*/
public class LockSpinRe extends AbstractLock {
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private AtomicReference<Thread> owner =new AtomicReference<>();
/**
* 计数统计类
*
* @since 0.0.2
*/
private AtomicLong count = new AtomicLong(0);
@Override
public void lock() {
// 循环等待,直到获取到锁
while (!tryLock()) {
// sleep
DateUtil.sleep(1);
}
}
@Override
public boolean tryLock(String key) {
Thread current = Thread.currentThread();
// 判断是否已经拥有此锁
if(current == owner.get()) {
// 原子性自增 1
count.incrementAndGet();
return true;
}
// CAS
return owner.compareAndSet(null, current);
}
@Override
public void unlock(String key) {
Thread current = Thread.currentThread();
// 可重入实现
if(owner.get() == current && count.get() != 0) {
count.decrementAndGet();
return;
}
boolean result = owner.compareAndSet(current, null);
if(!result) {
throw new LockRuntimeException("解锁失败");
}
}
}

View File

@@ -1,60 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.core.exception.LockRuntimeException;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.atomic.AtomicReference;
/**
* 等待通知的锁实现
* @author binbin.hou
* @since 0.0.2
*/
public class LockWaitNotify extends AbstractLock {
private static final Log log = LogFactory.getLog(LockWaitNotify.class);
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private AtomicReference<Thread> owner =new AtomicReference<>();
@Override
public synchronized void lock() {
while (!tryLock()) {
try {
log.debug("等待被唤醒");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
// 是否可以被打断
}
}
}
@Override
public boolean tryLock(String key) {
Thread current = Thread.currentThread();
// CAS
boolean result = owner.compareAndSet(null, current);
log.debug("尝试获取锁结果:{}", result);
return result;
}
@Override
public synchronized void unlock(String key) {
Thread current = Thread.currentThread();
boolean result = owner.compareAndSet(current, null);
if(!result) {
throw new LockRuntimeException("解锁失败");
}
// 唤醒等待中的线程
log.debug("唤醒等待的进程");
notify();
}
}

View File

@@ -1,87 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.core.exception.LockRuntimeException;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* 等待通知的锁实现-可重入
* @author binbin.hou
* @since 0.0.2
*/
public class LockWaitNotifyRe extends AbstractLock {
private static final Log log = LogFactory.getLog(LockWaitNotifyRe.class);
/**
* volatile 引用,保证线程间的可见性+易变性
*
* @since 0.0.2
*/
private AtomicReference<Thread> owner =new AtomicReference<>();
/**
* 次数统计
* @since 0.0.2
*/
private AtomicInteger count = new AtomicInteger(0);
@Override
public synchronized void lock() {
while (!tryLock()) {
try {
log.debug("等待被唤醒");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
// 是否可以被打断
}
}
}
@Override
public boolean tryLock(String key) {
Thread current = Thread.currentThread();
//可重入实现
if(current == owner.get()) {
count.incrementAndGet();
log.debug("当前线程已经拥有锁,直接返回 true");
return true;
}
// CAS
boolean result = owner.compareAndSet(null, current);
log.debug("尝试获取锁结果:{}", result);
return result;
}
@Override
public synchronized void unlock(String key) {
Thread current = Thread.currentThread();
// 可重入实现
if(owner.get() == current && count.get() != 0) {
count.decrementAndGet();
notifyAndLog();
return;
}
boolean result = owner.compareAndSet(current, null);
if(!result) {
throw new LockRuntimeException("解锁失败");
}
notifyAndLog();
}
private void notifyAndLog() {
// 唤醒等待中的线程
log.debug("唤醒等待的进程");
notify();
}
}

View File

@@ -1,88 +0,0 @@
package com.github.houbb.lock.core.core;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.api.core.IReadWriteLock;
/**
* 锁工具
*
* @author binbin.hou
* @since 0.0.3
*/
public final class Locks {
private Locks(){}
/**
* 无锁
* @return 锁
* @since 0.0.3
*/
public static ILock none() {
return new LockNone();
}
/**
* 读写锁
* @return 锁
* @since 0.0.3
*/
public static IReadWriteLock readWrite() {
return new LockReadWrite();
}
/**
* 读写锁
* @return 锁
* @since 0.0.3
*/
public static IReadWriteLock readWriteOwner() {
return new LockReadWriteOwner();
}
/**
* 可重入读写锁
* @return 锁
* @since 0.0.3
*/
public static IReadWriteLock readWriteRe() {
return new LockReadWriteRe();
}
/**
* 自旋锁
* @return 锁
* @since 0.0.3
*/
public static ILock spin() {
return new LockSpin();
}
/**
* 可重入自旋锁
* @return 锁
* @since 0.0.3
*/
public static ILock spinRe() {
return new LockSpinRe();
}
/**
* 等待通知锁
* @return 锁
* @since 0.0.3
*/
public static ILock waitNotify() {
return new LockWaitNotify();
}
/**
* 可重入等待通知锁
* @return 锁
* @since 0.0.3
*/
public static ILock waitNotifyRe() {
return new LockWaitNotifyRe();
}
}

View File

@@ -1,27 +0,0 @@
package com.github.houbb.lock.core.exception;
/**
* @author binbin.hou
* @since 0.0.2
*/
public class LockRuntimeException extends RuntimeException {
public LockRuntimeException() {
}
public LockRuntimeException(String message) {
super(message);
}
public LockRuntimeException(String message, Throwable cause) {
super(message, cause);
}
public LockRuntimeException(Throwable cause) {
super(cause);
}
public LockRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -0,0 +1,65 @@
package com.github.houbb.lock.core.support.lock;
import com.github.houbb.common.cache.api.service.ICommonCacheService;
import com.github.houbb.id.api.Id;
import com.github.houbb.lock.api.core.ILockSupportContext;
/**
* 分布式锁接口定义
* @author binbin.hou
* @since 0.0.1
*/
public class LockSupportContext implements ILockSupportContext {
public static LockSupportContext newInstance() {
return new LockSupportContext();
}
/**
* 标识策略
* @since 0.0.4
*/
private Id id;
/**
* 缓存策略
* @since 0.0.4
*/
private ICommonCacheService commonCacheService;
/**
* 锁的过期时间
* @since 1.0.0
*/
private int lockExpireMills;
@Override
public Id id() {
return id;
}
public LockSupportContext id(Id id) {
this.id = id;
return this;
}
@Override
public ICommonCacheService commonCacheService() {
return commonCacheService;
}
public LockSupportContext commonCacheService(ICommonCacheService commonCacheService) {
this.commonCacheService = commonCacheService;
return this;
}
@Override
public int lockExpireMills() {
return lockExpireMills;
}
public LockSupportContext lockExpireMills(int lockExpireMills) {
this.lockExpireMills = lockExpireMills;
return this;
}
}

View File

@@ -0,0 +1,88 @@
package com.github.houbb.lock.core.support.lock;
import com.github.houbb.common.cache.api.service.ICommonCacheService;
import com.github.houbb.id.api.Id;
import com.github.houbb.id.core.util.IdThreadLocalHelper;
import com.github.houbb.lock.api.core.ILockSupport;
import com.github.houbb.lock.api.core.ILockSupportContext;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.redis.config.core.constant.JedisConst;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁接口定义
* @author binbin.hou
* @since 0.0.1
*/
public class RedisLockSupport implements ILockSupport {
private final Log log = LogFactory.getLog(RedisLockSupport.class);
@Override
public boolean tryLock(long time, TimeUnit unit, String key, ILockSupportContext context) {
long startTimeMills = System.currentTimeMillis();
// 一次获取,直接成功
boolean result = this.tryLock(key, context);
if(result) {
return true;
}
// 时间判断
if(time <= 0) {
return false;
}
long durationMills = unit.toMillis(time);
long endMills = startTimeMills + durationMills;
// 循环等待
while (System.currentTimeMillis() < endMills) {
result = tryLock(key, context);
if(result) {
return true;
}
// 等待 1ms
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return false;
}
@Override
public boolean tryLock(String key, ILockSupportContext context) {
log.info("开始尝试获取锁 {}", key);
// 生成当前线程的唯一标识
Id id = context.id();
final String requestId = id.id();
IdThreadLocalHelper.put(requestId);
log.info("开始尝试获取锁 requestId: {}", requestId);
final ICommonCacheService commonCacheService = context.commonCacheService();
final int lockExpireMills = context.lockExpireMills();
String result = commonCacheService.set(key, requestId, JedisConst.SET_IF_NOT_EXIST, JedisConst.SET_WITH_EXPIRE_TIME, lockExpireMills);
return JedisConst.OK.equalsIgnoreCase(result);
}
@Override
public boolean unlock(String key, ILockSupportContext context) {
log.info("开始尝试释放锁 {}", key);
String requestId = IdThreadLocalHelper.get();
log.info("开始尝试释放锁 requestId: {}", requestId);
final ICommonCacheService commonCacheService = context.commonCacheService();
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = commonCacheService.eval(script, Collections.singletonList(key), Collections.singletonList(requestId));
return JedisConst.RELEASE_SUCCESS.equals(result);
}
}

View File

@@ -1,43 +0,0 @@
package com.github.houbb.lock.core.support.simple;
import com.github.houbb.heaven.util.lang.StringUtil;
import com.github.houbb.id.core.util.IdThreadLocalHelper;
import com.github.houbb.lock.core.constant.LockConst;
import com.github.houbb.lock.core.core.AbstractLock;
import com.github.houbb.lock.core.exception.LockRuntimeException;
/**
* 简单锁实现策略
*
* @author binbin.hou
* @since 0.0.4
*/
public class SimpleLock extends AbstractLock {
public static SimpleLock newInstance() {
return new SimpleLock();
}
@Override
public boolean tryLock(String key) {
final String requestId = id.id();
IdThreadLocalHelper.put(requestId);
return operator.lock(key, requestId, LockConst.DEFAULT_EXPIRE_MILLS);
}
@Override
public void unlock(String key) {
final String requestId = IdThreadLocalHelper.get();
if(StringUtil.isEmpty(requestId)) {
String threadName = Thread.currentThread().getName();
throw new LockRuntimeException("Thread " + threadName +" not contains requestId");
}
boolean unlock = operator.unlock(key, requestId);
if(!unlock) {
throw new LockRuntimeException("Unlock key " + key + " result is failed!");
}
}
}

View File

@@ -1,28 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>lock</artifactId>
<groupId>com.github.houbb</groupId>
<version>0.0.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>lock-redis</artifactId>
<dependencies>
<!--============================== SELF ==============================-->
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>lock-core</artifactId>
</dependency>
<!--============================== OTHER ==============================-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,56 +0,0 @@
package com.github.houbb.lock.redis.constant;
/**
* redis 锁常量
*
* @author binbin.hou
* @since 0.0.1
*/
public final class LockRedisConst {
private LockRedisConst() {
}
/**
* 加锁成功
* @since 0.0.1
*/
public static final String LOCK_SUCCESS = "OK";
/**
* 如果不存在则设置值
* @since 0.0.1
*/
public static final String SET_IF_NOT_EXIST = "NX";
/**
* 设置过期时间
*
* 单位milliseconds
* @since 0.0.1
*/
public static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 解锁成功
*
* @since 0.0.1
*/
public static final Long RELEASE_SUCCESS = 1L;
/**
* 默认的失效时间
*
* 暂时定为 30min
* @since 0.0.1
*/
public static final int DEFAULT_EXPIRE_MILLS = 1000 * 60 * 30;
/**
* 默认锁为全局锁
* @since 0.0.1
*/
public static final String DEFAULT_KEY = "GLOBAL";
}

View File

@@ -1,29 +0,0 @@
package com.github.houbb.lock.redis.exception;
import com.github.houbb.lock.core.exception.LockRuntimeException;
/**
* @author binbin.hou
* @since 0.0.3
*/
public class LockRedisException extends LockRuntimeException {
public LockRedisException() {
}
public LockRedisException(String message) {
super(message);
}
public LockRedisException(String message, Throwable cause) {
super(message, cause);
}
public LockRedisException(Throwable cause) {
super(cause);
}
public LockRedisException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@@ -1 +0,0 @@
package com.github.houbb.lock.redis.exception;

View File

@@ -1,5 +0,0 @@
/**
* @author binbin.hou
* @since 1.0.0
*/
package com.github.houbb.lock.redis;

View File

@@ -1,68 +0,0 @@
package com.github.houbb.lock.redis.support.operator;
import com.github.houbb.lock.api.support.IOperator;
import com.github.houbb.lock.redis.constant.LockRedisConst;
import redis.clients.jedis.Jedis;
import java.util.Collections;
/**
* Redis 客户端
* @author binbin.hou
* @since 0.0.1
*/
public class JedisOperator implements IOperator {
/**
* jedis 客户端
* @since 0.0.1
*/
private final Jedis jedis;
public JedisOperator(Jedis jedis) {
this.jedis = jedis;
}
/**
* 尝试获取分布式锁
*
* expireTimeMills 保证当前进程挂掉,也能释放锁
*
* requestId 保证解锁的是当前进程(锁的持有者)
*
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTimeMills 超期时间
* @return 是否获取成功
* @since 0.0.1
*/
@Override
public boolean lock(String lockKey, String requestId, int expireTimeMills) {
String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills);
return LockRedisConst.LOCK_SUCCESS.equals(result);
}
/**
* 解锁
*
* 1使用 requestId保证为当前锁的持有者
* 2使用 lua 脚本,保证执行的原子性。
*
* @param lockKey 锁 key
* @param requestId 请求标识
* @return 结果
* @since 0.0.1
*/
@Override
public boolean unlock(String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return LockRedisConst.RELEASE_SUCCESS.equals(result);
}
@Override
public void clearExpireLock() {
}
}

View File

@@ -1 +0,0 @@
package com.github.houbb.lock.redis.support;

View File

@@ -16,10 +16,6 @@
<groupId>com.github.houbb</groupId> <groupId>com.github.houbb</groupId>
<artifactId>lock-core</artifactId> <artifactId>lock-core</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>lock-redis</artifactId>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@@ -1,35 +0,0 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.core.LockSpinRe;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LockSpinReThread implements Runnable {
private final ILock lock = new LockSpinRe();
@Override
public void run() {
System.out.println("first-lock: " + Thread.currentThread().getId());
lock.lock();
System.out.println("second-lock: " + Thread.currentThread().getId());
lock.lock();
lock.unlock();
System.out.println("second-unlock: " + Thread.currentThread().getId());
lock.unlock();
System.out.println("first-unlock: " + Thread.currentThread().getId());
}
public static void main(String[] args) {
final Runnable runnable = new LockSpinReThread();
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
}
}

View File

@@ -1,35 +0,0 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.core.LockSpin;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LockSpinThread implements Runnable {
private final ILock lock = new LockSpin();
@Override
public void run() {
System.out.println("first-lock: " + Thread.currentThread().getId());
lock.lock();
System.out.println("second-lock: " + Thread.currentThread().getId());
lock.lock();
lock.unlock();
System.out.println("second-unlock: " + Thread.currentThread().getId());
lock.unlock();
System.out.println("first-unlock: " + Thread.currentThread().getId());
}
public static void main(String[] args) {
final Runnable runnable = new LockSpinThread();
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
}
}

View File

@@ -1,39 +0,0 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.heaven.util.util.DateUtil;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.core.LockWaitNotify;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.TimeUnit;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LockWaitNotifyThread implements Runnable {
private static final Log log = LogFactory.getLog(LockWaitNotifyThread.class);
private final ILock lock = new LockWaitNotify();
@Override
public void run() {
log.debug("first lock");
lock.lock();
log.info("执行业务逻辑。");
DateUtil.sleep(TimeUnit.SECONDS, 5);
lock.unlock();
log.debug("first unlock");
}
public static void main(String[] args) {
final Runnable runnable = new LockWaitNotifyThread();
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
}
}

View File

@@ -1,45 +0,0 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.heaven.util.util.DateUtil;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.core.LockWaitNotify;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import java.util.concurrent.TimeUnit;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LockWaitNotifyThread2 implements Runnable {
private static final Log log = LogFactory.getLog(LockWaitNotifyThread2.class);
private final ILock lock = new LockWaitNotify();
@Override
public void run() {
log.debug("first lock");
lock.lock();
log.debug("second lock");
lock.lock();
log.info("执行业务逻辑。");
DateUtil.sleep(TimeUnit.SECONDS, 5);
log.debug("second unlock");
lock.unlock();
lock.unlock();
log.debug("first unlock");
}
public static void main(String[] args) {
final Runnable runnable = new LockWaitNotifyThread2();
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
}
}

View File

@@ -1,39 +0,0 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.core.LockWaitNotifyRe;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LockWaitNotifyThreadRe implements Runnable {
private static final Log log = LogFactory.getLog(LockWaitNotifyThreadRe.class);
private final ILock lock = new LockWaitNotifyRe();
@Override
public void run() {
log.debug("first lock");
lock.lock();
log.debug("second lock");
lock.lock();
log.debug("second unlock");
lock.unlock();
log.debug("first unlock");
lock.unlock();
}
public static void main(String[] args) {
final Runnable runnable = new LockWaitNotifyThreadRe();
new Thread(runnable).start();
new Thread(runnable).start();
new Thread(runnable).start();
}
}

View File

@@ -1,25 +0,0 @@
package com.github.houbb.lock.test;
import java.util.concurrent.Phaser;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class MyPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0 :
System.out.println("上半场完成");
return false;
case 1:
System.out.println("下半场完成");
return false;
default:
return true;
}
}
}

View File

@@ -1,53 +0,0 @@
package com.github.houbb.lock.test;
import java.util.concurrent.Phaser;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class PhaserDemo {
private static class GameRunnable implements Runnable {
private final Phaser phaser;
private GameRunnable(Phaser phaser) {
this.phaser = phaser;
}
@Override
public void run() {
//参加上半场比赛
System.out.println("玩家-"+Thread.currentThread().getName()+":参加上半场比赛");
//执行这个方法的话会等所有的选手都完成了之后再继续下面的方法
phaser.arriveAndAwaitAdvance();
// 下半场
//参加上半场比赛
System.out.println("玩家-"+Thread.currentThread().getName()+":参加下半场比赛");
//执行这个方法的话会等所有的选手都完成了之后再继续下面的方法
phaser.arriveAndAwaitAdvance();
}
}
public static void main(String[] args) {
int nums = 3;
Phaser phaser = new MyPhaser();
//注册一次表示 phaser 维护的线程个数
phaser.register();
for(int i = 0; i < nums; i++) {
phaser.register();
Thread thread = new Thread(new GameRunnable(phaser));
thread.start();
}
//后续阶段主线程就不参加了
phaser.arriveAndDeregister();
}
}

View File

@@ -0,0 +1,28 @@
package com.github.houbb.lock.test.core;
import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.core.bs.LockBs;
import org.junit.Test;
public class LockBsTest {
@Test
public void helloTest() {
ILock lock = LockBs.newInstance()
.init();
String key = "ddd";
try {
// 加锁
lock.tryLock(key);
System.out.println("业务处理");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
// 释放锁
lock.unlock(key);
}
}
}

View File

@@ -1,58 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class ArrayBlockingQueueDemo {
private ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
public void put(final String put) throws InterruptedException {
System.out.println("设置开始");
TimeUnit.SECONDS.sleep(1);
queue.put(put);
System.out.println("设置完成: " + put);
}
public void take() throws InterruptedException {
System.out.println("获取开始");
String take = queue.take();
System.out.println("获取成功: " + take);
}
public static void main(String[] args) {
final ArrayBlockingQueueDemo queueTest = new ArrayBlockingQueueDemo();
// 写入线程
new Thread(new Runnable() {
@Override
public void run() {
try {
for(int i = 0; i < 3; i++) {
queueTest.put(i+"T");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 读取线程
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
queueTest.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

View File

@@ -1,36 +0,0 @@
package com.github.houbb.lock.test.lock;
import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class ConcurrentLinkedQueueTest {
@Test
public void helloTest() {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// add() 将指定元素插入此队列的尾部。
queue.add("add");
// offer() 将指定元素插入此队列的尾部。
queue.offer("offer");
// peek() 获取但不移除此队列的头;如果此队列为空,则返回 null
String value = queue.peek();
System.out.println("PEEK: " + value);
// poll() 获取并移除此队列的头,如果此队列为空,则返回 null。
String poll = queue.poll();
System.out.println("POLL: " + poll);
// remove() 移除 从队列中移除指定元素的单个实例(如果存在)。
boolean remove = queue.remove("offer");
System.out.println("Remove result: " + remove);
}
}

View File

@@ -1,59 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class CowArraySetDemo {
/**
* 读线程
*/
private static class ReadTask implements Runnable {
Set<String> set;
public ReadTask(Set<String> set) {
this.set = set;
}
public void run() {
System.out.println(set);
}
}
/**
* 写线程
*/
private static class WriteTask implements Runnable {
private Set<String> set;
private String value;
public WriteTask(Set<String> set, String value) {
this.set = set;
this.value = value;
}
public void run() {
set.remove(value);
}
}
public static void main(String[] args) {
final int NUM = 5;
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < NUM; i++) {
set.add("main_" + i);
}
ExecutorService executorService = Executors.newFixedThreadPool(NUM);
for (int i = 0; i < NUM; i++) {
executorService.execute(new WriteTask(set, "main_" + i));
executorService.execute(new ReadTask(set));
}
executorService.shutdown();
}
}

View File

@@ -1,134 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author binbin.hou
* @see java.util.concurrent.DelayQueue
* @since 1.0.0
*/
public class DelayQueueDemo {
/**
* 写入线程
* @author 老马啸西风
*/
private static class WriteThread extends Thread {
private final DelayQueue<DelayElem> delayQueue;
private WriteThread(DelayQueue<DelayElem> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
for(int i = 0; i < 3; i++) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayElem element = new DelayElem(1000,i+"test");
delayQueue.offer(element);
System.out.println(System.currentTimeMillis() + " 放入元素 " + i);
}
}
}
/**
* 读取线程
* @author 老马啸西风
*/
private static class ReadThread extends Thread {
private final DelayQueue<DelayElem> delayQueue;
private ReadThread(DelayQueue<DelayElem> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
while (true){
try {
DelayElem element = delayQueue.take();
System.out.println(System.currentTimeMillis() +" 获取元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private static class DelayElem implements Delayed {
/**
* 延迟时间
*/
private final long delay;
/**
* 到期时间
*/
private final long expire;
/**
* 数据
*/
private final String msg;
private DelayElem(long delay, String msg) {
this.delay = delay;
this.msg = msg;
//到期时间 = 当前时间+延迟时间
this.expire = System.currentTimeMillis() + this.delay;
}
/**
* 需要实现的接口,获得延迟时间
*
* 用过期时间-当前时间
* @param unit 时间单位
* @return 延迟时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序
* <p>
* 当前时间的延迟时间 - 比较对象的延迟时间
*
* @param o 比较对象
* @return 结果
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayElem{" +
"delay=" + delay +
", expire=" + expire +
", msg='" + msg + '\'' +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayElem> delayQueue = new DelayQueue<>();
new WriteThread(delayQueue).start();
new ReadThread(delayQueue).start();
}
}

View File

@@ -1,57 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LinkedBlockingDequeDemo {
private static class Producer implements Runnable{
private BlockingDeque<Integer> queue;
public Producer(BlockingDeque<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
Integer num = ThreadLocalRandom.current().nextInt(100);
queue.put(num);
System.out.println(String.format("%s producer a num %d",Thread.currentThread().getName(),num));
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
private static class Consumer implements Runnable{
private BlockingDeque<Integer> queue;
public Consumer(BlockingDeque<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
while(true) {
try {
System.out.println(String.format("%s consume a num %d",Thread.currentThread().getName(),queue.take()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BlockingDeque<Integer> queue = new LinkedBlockingDeque<>(100);
new Thread(new Producer(queue),"Producer").start();
new Thread(new Consumer(queue),"Consumer").start();
}
}

View File

@@ -1,57 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.concurrent.*;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class LinkedBlockingQueueDemo {
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
public void put(final String put) throws InterruptedException {
System.out.println("设置开始");
TimeUnit.SECONDS.sleep(1);
queue.put(put);
System.out.println("设置完成: " + put);
}
public void take() throws InterruptedException {
System.out.println("获取开始");
String take = queue.take();
System.out.println("获取成功: " + take);
}
public static void main(String[] args) {
final LinkedBlockingQueueDemo queueTest = new LinkedBlockingQueueDemo();
// 写入线程
new Thread(new Runnable() {
@Override
public void run() {
try {
for(int i = 0; i < 3; i++) {
queueTest.put(i+"T");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 读取线程
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
queueTest.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

View File

@@ -1,86 +0,0 @@
package com.github.houbb.lock.test.lock;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class PriorityBlockingQueueDemo {
private static class User implements Comparable<User> {
private final int order;
private final String name;
private User(int order, String name) {
this.order = order;
this.name = name;
}
@Override
public int compareTo(User o) {
return this.order - o.order;
}
@Override
public String toString() {
return "User{" +
"order=" + order +
", name='" + name + '\'' +
'}';
}
}
private PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<>();
public void put(final User user) throws InterruptedException {
System.out.println("设置开始");
queue.put(user);
System.out.println("设置完成: " + user);
}
public void take() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
System.out.println("获取开始");
User take = queue.take();
System.out.println("获取成功: " + take);
}
public static void main(String[] args) {
final PriorityBlockingQueueDemo queueTest = new PriorityBlockingQueueDemo();
// 写入线程
new Thread(new Runnable() {
@Override
public void run() {
try {
for(int i = 0; i < 5; i++) {
int order = ThreadLocalRandom.current().nextInt(10);
User user = new User(order, i+"-user");
queueTest.put(user);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 读取线程
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
queueTest.take();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

View File

@@ -1,9 +0,0 @@
package com.github.houbb.lock.test.lock;
/**
* @author binbin.hou
* @since 1.0.0
* @see java.util.concurrent.SynchronousQueue
*/
public class SynchronousQueueDemo {
}

View File

@@ -1,37 +1,37 @@
package com.github.houbb.lock.test.redis; //package com.github.houbb.lock.test.redis;
//
import com.github.houbb.lock.api.core.ILock; //import com.github.houbb.lock.api.core.ILock;
import com.github.houbb.lock.api.support.IOperator; //import com.github.houbb.lock.api.support.IOperator;
import com.github.houbb.lock.core.bs.LockBs; //import com.github.houbb.lock.core.bs.LockBs;
import com.github.houbb.lock.redis.support.operator.JedisOperator; //import com.github.houbb.lock.redis.support.operator.JedisOperator;
import org.junit.Ignore; //import org.junit.Ignore;
import org.junit.Test; //import org.junit.Test;
import redis.clients.jedis.Jedis; //import redis.clients.jedis.Jedis;
//
/** ///**
* @author binbin.hou // * @author binbin.hou
* @since 0.0.1 // * @since 0.0.1
*/ // */
@Ignore //@Ignore
public class LockRedisTest { //public class LockRedisTest {
//
@Test // @Test
public void helloTest() { // public void helloTest() {
Jedis jedis = new Jedis("127.0.0.1", 6379); // Jedis jedis = new Jedis("127.0.0.1", 6379);
IOperator operator = new JedisOperator(jedis); // IOperator operator = new JedisOperator(jedis);
//
// 获取锁 // // 获取锁
ILock lock = LockBs.newInstance(operator).lock(); // ILock lock = LockBs.newInstance(operator).lock();
//
try { // try {
boolean lockResult = lock.tryLock(); // boolean lockResult = lock.tryLock();
System.out.println(lockResult); // System.out.println(lockResult);
// 业务处理 // // 业务处理
} catch (Exception e) { // } catch (Exception e) {
e.printStackTrace(); // e.printStackTrace();
} finally { // } finally {
lock.unlock(); // lock.unlock();
} // }
} // }
//
} //}

34
pom.xml
View File

@@ -10,7 +10,6 @@
<module>lock-api</module> <module>lock-api</module>
<module>lock-core</module> <module>lock-core</module>
<module>lock-test</module> <module>lock-test</module>
<module>lock-redis</module>
</modules> </modules>
<properties> <properties>
@@ -34,10 +33,13 @@
<!--============================== INTER ==============================--> <!--============================== INTER ==============================-->
<plugin.gen.version>1.0.6</plugin.gen.version> <plugin.gen.version>1.0.6</plugin.gen.version>
<heaven.version>0.1.161</heaven.version>
<heaven.version>0.1.167</heaven.version>
<id.version>0.0.6</id.version> <id.version>0.0.6</id.version>
<wait.version>0.0.1</wait.version> <wait.version>0.0.1</wait.version>
<log-integration.version>1.1.8</log-integration.version> <log-integration.version>1.1.8</log-integration.version>
<common-cache.version>0.0.5</common-cache.version>
<redis-config.version>1.4.0</redis-config.version>
<!--============================== OTHER ==============================--> <!--============================== OTHER ==============================-->
<junit.version>4.12</junit.version> <junit.version>4.12</junit.version>
@@ -57,18 +59,17 @@
<artifactId>lock-core</artifactId> <artifactId>lock-core</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>lock-redis</artifactId>
<version>${project.version}</version>
</dependency>
<!--============================== INTER ==============================--> <!--============================== INTER ==============================-->
<dependency> <dependency>
<groupId>com.github.houbb</groupId> <groupId>com.github.houbb</groupId>
<artifactId>heaven</artifactId> <artifactId>heaven</artifactId>
<version>${heaven.version}</version> <version>${heaven.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>id-api</artifactId>
<version>${id.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.github.houbb</groupId> <groupId>com.github.houbb</groupId>
<artifactId>id-core</artifactId> <artifactId>id-core</artifactId>
@@ -85,6 +86,23 @@
<version>${log-integration.version}</version> <version>${log-integration.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>common-cache-api</artifactId>
<version>${common-cache.version}</version>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>common-cache-core</artifactId>
<version>${common-cache.version}</version>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>redis-config-core</artifactId>
<version>${redis-config.version}</version>
</dependency>
<!--============================== OTHER ==============================--> <!--============================== OTHER ==============================-->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>