fix:微信推送接入频控

This commit is contained in:
zhongzb
2023-07-06 23:08:06 +08:00
parent 4ffb15d48f
commit 65f8735d28
3 changed files with 44 additions and 9 deletions

View File

@@ -88,6 +88,17 @@ public abstract class AbstractFrequencyControlService<K extends FrequencyControl
T get() throws Throwable; T get() throws Throwable;
} }
@FunctionalInterface
public interface Executor {
/**
* Gets a result.
*
* @return a result
*/
void execute() throws Throwable;
}
/** /**
* 是否达到限流阈值 子类实现 每个子类都可以自定义自己的限流逻辑判断 * 是否达到限流阈值 子类实现 每个子类都可以自定义自己的限流逻辑判断
* *

View File

@@ -28,6 +28,14 @@ public class FrequencyControlUtil {
return frequencyController.executeWithFrequencyControl(frequencyControl, supplier); return frequencyController.executeWithFrequencyControl(frequencyControl, supplier);
} }
public static <K extends FrequencyControlDTO> void executeWithFrequencyControl(String strategyName, K frequencyControl, AbstractFrequencyControlService.Executor executor) throws Throwable {
AbstractFrequencyControlService<K> frequencyController = FrequencyControlStrategyFactory.getFrequencyControllerByName(strategyName);
frequencyController.executeWithFrequencyControl(frequencyControl, () -> {
executor.execute();
return null;
});
}
/** /**
* 多限流策略的编程式调用方法调用方法 * 多限流策略的编程式调用方法调用方法

View File

@@ -1,7 +1,10 @@
package com.abin.mallchat.custom.chat.service.impl; package com.abin.mallchat.custom.chat.service.impl;
import cn.hutool.core.thread.NamedThreadFactory; import cn.hutool.core.thread.NamedThreadFactory;
import com.abin.mallchat.common.common.domain.dto.FrequencyControlDTO;
import com.abin.mallchat.common.common.exception.FrequencyControlException;
import com.abin.mallchat.common.common.handler.GlobalUncaughtExceptionHandler; import com.abin.mallchat.common.common.handler.GlobalUncaughtExceptionHandler;
import com.abin.mallchat.common.common.service.frequencycontrol.FrequencyControlUtil;
import com.abin.mallchat.common.common.utils.JsonUtils; import com.abin.mallchat.common.common.utils.JsonUtils;
import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.common.user.service.cache.UserCache; import com.abin.mallchat.common.user.service.cache.UserCache;
@@ -15,20 +18,17 @@ import me.chanjar.weixin.mp.bean.template.WxMpTemplateMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static com.abin.mallchat.common.common.service.frequencycontrol.FrequencyControlStrategyFactory.TOTAL_COUNT_WITH_IN_FIX_TIME_FREQUENCY_CONTROLLER;
@Slf4j @Slf4j
@Component @Component
public class WeChatMsgOperationServiceImpl implements WeChatMsgOperationService { public class WeChatMsgOperationServiceImpl implements WeChatMsgOperationService {
private static final ExecutorService executor = new ThreadPoolExecutor(1, 10, 3000L, private static final ExecutorService executor = new ThreadPoolExecutor(1, 10, 3000L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
@@ -54,15 +54,31 @@ public class WeChatMsgOperationServiceImpl implements WeChatMsgOperationService
uidSet.addAll(receiverUidList); uidSet.addAll(receiverUidList);
Map<Long, User> userMap = userCache.getUserInfoBatch(uidSet); Map<Long, User> userMap = userCache.getUserInfoBatch(uidSet);
userMap.values().forEach(user -> { userMap.values().forEach(user -> {
if (Objects.nonNull(user.getOpenId()) && user.isPublishChatToWechatSwitch()) { if (Objects.nonNull(user.getOpenId())) {
executor.execute(() -> { executor.execute(() -> {
WxMpTemplateMessage msgTemplate = getAtMsgTemplate(sender, user.getOpenId(), msg); WxMpTemplateMessage msgTemplate = getAtMsgTemplate(sender, user.getOpenId(), msg);
publishTemplateMsg(msgTemplate); publishTemplateMsgCheckLimit(msgTemplate);
}); });
} }
}); });
} }
private void publishTemplateMsgCheckLimit(WxMpTemplateMessage msgTemplate) {
try {
FrequencyControlDTO frequencyControlDTO = new FrequencyControlDTO();
frequencyControlDTO.setKey("TemplateMsg:" + msgTemplate.getToUser());
frequencyControlDTO.setUnit(TimeUnit.HOURS);
frequencyControlDTO.setCount(1);
frequencyControlDTO.setTime(1);
FrequencyControlUtil.executeWithFrequencyControl(TOTAL_COUNT_WITH_IN_FIX_TIME_FREQUENCY_CONTROLLER, frequencyControlDTO,
() -> publishTemplateMsg(msgTemplate));
} catch (FrequencyControlException e) {
log.info("wx push limit openid:{}", msgTemplate.getToUser());
} catch (Throwable e) {
log.error("wx push error openid:{}", msgTemplate.getToUser());
}
}
/* /*
* 构造微信模板消息 * 构造微信模板消息
*/ */