feat:单机限流类

This commit is contained in:
zhongzb
2023-11-06 21:49:40 +08:00
parent d7de7fae18
commit 2bb70c4133
6 changed files with 381 additions and 80 deletions

View File

@@ -0,0 +1,28 @@
package com.abin.frequencycontrol.service.frequencycontrol.single;
public class FixWindow {
public Integer count; //当前窗口累计请求数
public long lastAcquireTime;//最后一次请求时间
public Long windowInMillis; //固定窗口时间区间(毫秒)
public Integer maxRequests; // 最大请求限制
public FixWindow(Long windowInMillis, Integer maxRequests) {
this.windowInMillis = windowInMillis;
this.maxRequests = maxRequests;
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
//当前和上次不在同一时间窗口
if (currentTime - lastAcquireTime > windowInMillis) {
count = 0; // 计数器清0
lastAcquireTime = currentTime; //开启新的时间窗口
} else { //同一窗口内
if (count < maxRequests) { // 小于阀值
count++; //计数统计器加1
return true;
}
}
return false;
}
}

View File

@@ -0,0 +1,39 @@
package com.abin.frequencycontrol.service.frequencycontrol.single;
public class LeakyBucketRateLimiter {
private final int capacity; // 桶的容量
private final int rate; // 出桶速率 (每秒的请求数)
private long lastRequestTime; // 上一个请求的时间戳
public LeakyBucketRateLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.lastRequestTime = System.currentTimeMillis();
}
/**
* 尝试请求
*
* @return >0 请求进入桶里,返回的是需要休眠的时间。
*/
public synchronized long tryAcquire() {
//当前时间
long currentTime = System.currentTimeMillis();
//漏桶空的
if (currentTime > lastRequestTime) {
lastRequestTime = currentTime;
return 0; // 请求被允许
}
//上次取水的间隔时间
long elapsedTime = lastRequestTime - currentTime;
// 计算桶中的水量
int water = (int) elapsedTime * rate / 1000;
//水量不超过容量
if (water < capacity) {
long sleepTime = (1000 / rate) + elapsedTime;
lastRequestTime = currentTime + sleepTime;
return sleepTime;
}
return -1; // 请求被拒绝
}
}

View File

@@ -0,0 +1,36 @@
package com.abin.frequencycontrol.service.frequencycontrol.single;
import java.util.LinkedList;
public class SlideWindow {
private final int maxRequests;//最大请求
private final long windowInMillis;//窗口范围
private LinkedList<Long> requestTimestamps;//每个请求的时间戳
public SlideWindow(int maxRequests, long windowInMillis) {
this.maxRequests = maxRequests;
this.windowInMillis = windowInMillis;
this.requestTimestamps = new LinkedList<>();
}
public synchronized boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
//清除过期窗口的请求
cleanExpiredRequests(currentTime);
//统计窗口内的请求数小于总限制
if (requestTimestamps.size() < maxRequests) {
requestTimestamps.addLast(currentTime);
return true;
}
return false;
}
private void cleanExpiredRequests(long currentTime) {
//由于是LinkedList头结点就是最早的请求判断超出时间窗口就移除留下的都是窗口内的请求
while (!requestTimestamps.isEmpty() && (currentTime - requestTimestamps.getFirst() > windowInMillis)) {
requestTimestamps.removeFirst();
}
}
}

View File

@@ -0,0 +1,157 @@
package com.abin.frequencycontrol.service.frequencycontrol.single;
import lombok.SneakyThrows;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Test {
private static AtomicInteger pass1 = new AtomicInteger();
private static AtomicInteger pass2 = new AtomicInteger();
private static AtomicInteger pass3 = new AtomicInteger();
private static AtomicInteger pass4 = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static List<Integer> list1 = new ArrayList<>();
private static List<Integer> list2 = new ArrayList<>();
private static List<Integer> list3 = new ArrayList<>();
private static List<Integer> list4 = new ArrayList<>();
private static List<Integer> listTotal = new ArrayList<>();
private static List<Double> x = new ArrayList<>();
private static volatile boolean stop = false;
private static final Integer QPS = 20;
private static final Integer accuracy = 4;
public static void main(String[] args) throws InterruptedException {
List<Integer> list = Arrays.asList(2, 2, 5, 20, 20, 20, 10, 5, 30, 30, 30, 2, 2, 2, 2, 2, 2, 2, 20, 20, 10, 10, 20, 10, 10, 5, 5, 5, 5, 2, 6, 4, 5, 7, 30, 30, 20, 20, 10, 20, 10, 20, 10, 20, 30, 20, 20, 10, 20, 10, 20, 10, 20, 30, 20, 20, 10, 20, 3, 3, 3, 20, 30, 20, 20, 10, 20, 10, 20, 10, 20);
List<Integer> time = split(list);
AtomicInteger index = new AtomicInteger(0);
tick();
TokenBucketRateLimiter tokenBucketRateLimiter = new TokenBucketRateLimiter(30, QPS);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
FixWindow fixWindow = new FixWindow(1000L, QPS);
SlideWindow slideWindow = new SlideWindow(QPS, 1000L);
LeakyBucketRateLimiter leakyBucketRateLimiter = new LeakyBucketRateLimiter(30, QPS);
ExecutorService executorService = Executors.newFixedThreadPool(100);
ExecutorService common = Executors.newFixedThreadPool(100);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int andIncrement = index.getAndIncrement();
if (andIncrement < time.size()) {
Integer integer = time.get(andIncrement);
for (Integer i = 0; i < integer; i++) {
common.execute(() -> {
total.incrementAndGet();
});
common.execute(() -> {
fixWindowLimit(fixWindow);
});
common.execute(() -> {
slideWindowLimit(slideWindow);
});
executorService.execute(() -> {
leakyBucketRateLimit(leakyBucketRateLimiter);
});
common.execute(() -> {
tokenBucketRateLimit(tokenBucketRateLimiter);
});
}
} else {
stop = true;
}
}, 0, 250 / accuracy, TimeUnit.MILLISECONDS);
Thread.sleep(100000);
}
private static List<Integer> split(List<Integer> result) {
ArrayList<Integer> res = new ArrayList<>(result.size() * 4);
for (int i = 0; i < result.size(); i++) {
Integer integer = result.get(i);
Integer divisor = integer / accuracy;
if (divisor == 0) divisor = 1;
res.add(Math.min(integer, divisor));
integer -= divisor;
res.add(Math.min(integer, divisor));
integer -= divisor;
res.add(Math.min(integer, divisor));
integer -= divisor;
res.add(Math.min(integer, divisor));
}
return res;
}
private static void tokenBucketRateLimit(TokenBucketRateLimiter tokenBucketRateLimiter) {
if (tokenBucketRateLimiter.tryAcquire()) {
pass4.incrementAndGet();
}
}
private static void leakyBucketRateLimit(LeakyBucketRateLimiter leakyBucketRateLimiter) {
long l = leakyBucketRateLimiter.tryAcquire();
if (l == 0) {
pass3.incrementAndGet();
} else if (l > 0) {
try {
Thread.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
pass3.incrementAndGet();
}
}
private static void slideWindowLimit(SlideWindow fixWindow) {
boolean b = fixWindow.tryAcquire();
if (b) {
pass2.incrementAndGet();
}
}
private static void fixWindowLimit(FixWindow fixWindow) {
boolean b = fixWindow.tryAcquire();
if (b) {
pass1.incrementAndGet();
}
}
private static void limit(List<Integer> result) {
}
private static void tick() {
Thread timer = new Thread(new TimerTask());
timer.setName("sentinel-timer-task");
timer.start();
}
static class TimerTask implements Runnable {
@SneakyThrows
@Override
public void run() {
long start = System.currentTimeMillis();
int i = 0;
while (!stop) {
i++;
Thread.sleep(250);
list1.add(pass1.getAndSet(0));
list2.add(pass2.getAndSet(0));
list3.add(pass3.getAndSet(0));
list4.add(pass4.getAndSet(0));
listTotal.add(total.getAndSet(0));
x.add(i * 0.25);
}
System.out.println(x);
System.out.println(listTotal);
System.out.println(list1);
System.out.println(list2);
System.out.println(list3);
System.out.println(list4);
}
}
}

View File

@@ -0,0 +1,41 @@
package com.abin.frequencycontrol.service.frequencycontrol.single;
public class TokenBucketRateLimiter {
private final int capacity; // 令牌桶容量
private final int rate; // 令牌产生速率 (每秒的令牌数)
private int tokens; // 当前令牌数量
private long lastRefillTime; // 上次令牌补充时间
private long left = 0;
public TokenBucketRateLimiter(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.tokens = capacity;
this.lastRefillTime = System.currentTimeMillis();
}
public synchronized boolean tryAcquire() {
refillTokens();
if (tokens > 0) {
tokens--;
return true;
}
return false;
}
//补充令牌
private void refillTokens() {
long currentTime = System.currentTimeMillis();
//当前时间和上次请求时间相差的令牌数
long elapsedTime = currentTime - lastRefillTime;
if (elapsedTime > 0) {
//需要补充的令牌
int newTokens = (int) ((elapsedTime * rate + left) / 1000);
left = (elapsedTime * rate + left) % 1000;
//令牌不能超过桶的大小
tokens = Math.min(tokens + newTokens, capacity);
lastRefillTime = currentTime;
}
}
}