mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-03-13 20:53:42 +08:00
问题概述
1.保存消息和计费逻辑存在耦合 2.修改计费逻辑: 按次计费被阈值限制:旧逻辑把 TIMES 分支放在 totalTokens ≥ 100 的大分支里,导致没到100 token时不扣费,违背“每次调用就扣费”的语义。 token累计不当:TIMES 分支只扣费不处理累计,同时在 totalTokens < 100 时不会进入任何TIMES逻辑,累计会无意义增长。 粒度不稳定:TOKEN 计费一旦达阈值就把 total 全扣完并清零,不利于对账与用户体验。 打印方式:使用 System.out.println,不利于生产追踪。 改动要点 1.新增独立方法 saveMessage(ChatRequest): 只落库。 publishBillingEvent(ChatRequest): 只发布异步计费事件。 保留组合方法 saveMessageAndPublishEvent(ChatRequest) 以便需要一行调用时使用。 调用处已改为“先保存,再发布事件” SseServiceImpl: 先 saveMessage,再 publishBillingEvent。 SSEEventSourceListener: 同上。 DifyServiceImpl: 同上。 2.计费模式分流: TIMES:每次调用直接扣费,不累计。 TOKEN:按阈值(100)批量扣费,保留余数,账单颗粒稳定。 保留余数:total = prev + delta;billable = floor(total/threshold)threshold;remainder = total % threshold。 日志替换:统一使用 log.debug。 结构更清晰、可维护。 所有金额计算统一用 BigDecimal,保留两位小数,RoundingMode.HALF_UP 按次计费:每次直接扣费(BigDecimal),边界转 Double 按 token 计费:按阈值批量结算,保留余数;费用=单价(BigDecimal)×可结算token数 1. 消息分类存储 用户消息:role="user", deductCost=null, totalTokens=本次token数, remark="用户消息" 系统账单:role="system", deductCost=实际扣费, totalTokens=计费token数, remark="TIMES_BILLING/TOKEN_BILLING" 2. 数据流程 用户发送消息 → 预检查余额 → 保存用户消息 → 发布计费事件 → 异步扣费 → 保存账单记录
This commit is contained in:
@@ -7,6 +7,7 @@ import org.ruoyi.chat.service.chat.IChatCostService;
|
||||
import org.ruoyi.common.chat.request.ChatRequest;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
|
||||
@@ -18,8 +19,10 @@ public class BillingEventListener {
|
||||
private final IChatCostService chatCostService;
|
||||
|
||||
@Async
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
|
||||
@EventListener
|
||||
public void onChatMessageCreated(ChatMessageCreatedEvent event) {
|
||||
log.debug("BillingEventListener->接收到计费事件,用户ID: {},会话ID: {},模型: {}",
|
||||
event.getUserId(), event.getSessionId(), event.getModelName());
|
||||
try {
|
||||
ChatRequest chatRequest = new ChatRequest();
|
||||
chatRequest.setUserId(event.getUserId());
|
||||
@@ -28,9 +31,17 @@ public class BillingEventListener {
|
||||
chatRequest.setRole(event.getRole());
|
||||
chatRequest.setPrompt(event.getContent());
|
||||
// 异步执行计费累计与扣费
|
||||
log.debug("BillingEventListener->开始执行计费逻辑");
|
||||
chatCostService.deductToken(chatRequest);
|
||||
log.debug("BillingEventListener->计费逻辑执行完成");
|
||||
} catch (Exception ex) {
|
||||
log.error("BillingEventListener onChatMessageCreated error", ex);
|
||||
// 由于已有预检查,这里的异常主要是系统异常(数据库连接等)
|
||||
// 记录错误但不中断异步线程
|
||||
log.error("BillingEventListener->异步计费异常,用户ID: {},模型: {},错误: {}",
|
||||
event.getUserId(), event.getModelName(), ex.getMessage(), ex);
|
||||
|
||||
// TODO: 可以考虑加入重试机制或者错误通知机制
|
||||
// 例如:发送到死信队列,或者通知运维人员
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,8 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
*/
|
||||
@Override
|
||||
public void deductToken(ChatRequest chatRequest) {
|
||||
if (chatRequest.getUserId() == null || chatRequest.getSessionId() == null) {
|
||||
if (chatRequest.getUserId() == null) {
|
||||
log.warn("deductToken->用户ID为空,跳过计费");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -69,6 +70,17 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
BigDecimal numberCost = unitPrice.setScale(2, RoundingMode.HALF_UP);
|
||||
deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue());
|
||||
log.debug("deductToken->按次数扣费,费用: {},模型: {}", numberCost, modelName);
|
||||
|
||||
// 清理可能存在的历史累计token(模型计费方式可能发生过变更)
|
||||
ChatUsageToken existingToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName);
|
||||
if (existingToken != null && existingToken.getToken() > 0) {
|
||||
existingToken.setToken(0);
|
||||
chatTokenService.editToken(existingToken);
|
||||
log.debug("deductToken->按次计费,清理历史累计token: {}", existingToken.getToken());
|
||||
}
|
||||
|
||||
// 记录账单消息
|
||||
saveBillingRecord(chatRequest, tokens, numberCost.doubleValue(), "TIMES_BILLING");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -76,10 +88,13 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
final int threshold = 100;
|
||||
|
||||
// 获得记录的累计token数
|
||||
// TODO: 这里存在并发竞态条件,需要在chatTokenService层面添加乐观锁或分布式锁
|
||||
ChatUsageToken chatToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName);
|
||||
if (chatToken == null) {
|
||||
chatToken = new ChatUsageToken();
|
||||
chatToken.setToken(0);
|
||||
chatToken.setModelName(modelName);
|
||||
chatToken.setUserId(chatRequest.getUserId());
|
||||
}
|
||||
|
||||
int previousUnpaid = chatToken.getToken();
|
||||
@@ -94,16 +109,32 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
.multiply(BigDecimal.valueOf(billable))
|
||||
.setScale(2, RoundingMode.HALF_UP);
|
||||
log.debug("deductToken->按token扣费,结算token数量: {},单价: {},费用: {}", billable, unitPrice, numberCost);
|
||||
deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue());
|
||||
|
||||
try {
|
||||
// 先尝试扣费
|
||||
deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue());
|
||||
// 扣费成功后,保存余数
|
||||
chatToken.setModelName(modelName);
|
||||
chatToken.setUserId(chatRequest.getUserId());
|
||||
chatToken.setToken(remainder);
|
||||
chatTokenService.editToken(chatToken);
|
||||
log.debug("deductToken->扣费成功,更新余数: {}", remainder);
|
||||
|
||||
// 记录账单消息
|
||||
saveBillingRecord(chatRequest, billable, numberCost.doubleValue(), "TOKEN_BILLING");
|
||||
} catch (ServiceException e) {
|
||||
// 余额不足时,不更新token累计,保持原有累计数
|
||||
log.warn("deductToken->余额不足,本次token累计保持不变: {}", totalTokens);
|
||||
throw e; // 重新抛出异常
|
||||
}
|
||||
} else {
|
||||
// 未达阈值,累积token
|
||||
log.debug("deductToken->未达到计费阈值({}),累积到下次", threshold);
|
||||
chatToken.setModelName(modelName);
|
||||
chatToken.setUserId(chatRequest.getUserId());
|
||||
chatToken.setToken(totalTokens);
|
||||
chatTokenService.editToken(chatToken);
|
||||
}
|
||||
|
||||
// 保存剩余tokens(保留余数)
|
||||
chatToken.setModelName(modelName);
|
||||
chatToken.setUserId(chatRequest.getUserId());
|
||||
chatToken.setToken(remainder);
|
||||
chatTokenService.editToken(chatToken);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -112,24 +143,57 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
@Override
|
||||
public void saveMessage(ChatRequest chatRequest) {
|
||||
if (chatRequest.getUserId() == null || chatRequest.getSessionId() == null) {
|
||||
log.warn("saveMessage->用户ID或会话ID为空,跳过保存消息");
|
||||
return;
|
||||
}
|
||||
|
||||
// 验证消息内容
|
||||
if (chatRequest.getPrompt() == null || chatRequest.getPrompt().trim().isEmpty()) {
|
||||
log.warn("saveMessage->消息内容为空,跳过保存");
|
||||
return;
|
||||
}
|
||||
|
||||
ChatMessageBo chatMessageBo = new ChatMessageBo();
|
||||
chatMessageBo.setUserId(chatRequest.getUserId());
|
||||
chatMessageBo.setSessionId(chatRequest.getSessionId());
|
||||
chatMessageBo.setRole(chatRequest.getRole());
|
||||
chatMessageBo.setContent(chatRequest.getPrompt());
|
||||
chatMessageBo.setContent(chatRequest.getPrompt().trim());
|
||||
chatMessageBo.setModelName(chatRequest.getModel());
|
||||
|
||||
// 计算并保存本次消息的token数
|
||||
int tokens = TikTokensUtil.tokens(chatRequest.getModel(), chatRequest.getPrompt());
|
||||
chatMessageBo.setTotalTokens(tokens);
|
||||
|
||||
// 普通消息不涉及扣费,deductCost保持null
|
||||
chatMessageBo.setDeductCost(null);
|
||||
chatMessageBo.setRemark("用户消息");
|
||||
|
||||
|
||||
|
||||
chatMessageService.insertByBo(chatMessageBo);
|
||||
try {
|
||||
chatMessageService.insertByBo(chatMessageBo);
|
||||
log.debug("saveMessage->成功保存消息,用户ID: {}, 会话ID: {}, tokens: {}",
|
||||
chatRequest.getUserId(), chatRequest.getSessionId(), tokens);
|
||||
} catch (Exception e) {
|
||||
log.error("saveMessage->保存消息失败", e);
|
||||
throw new ServiceException("保存消息失败");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void publishBillingEvent(ChatRequest chatRequest) {
|
||||
log.debug("publishBillingEvent->发布计费事件,用户ID: {},会话ID: {},模型: {}",
|
||||
chatRequest.getUserId(), chatRequest.getSessionId(), chatRequest.getModel());
|
||||
|
||||
// 预检查:评估可能的扣费金额,如果余额不足则直接抛异常
|
||||
try {
|
||||
preCheckBalance(chatRequest);
|
||||
} catch (ServiceException e) {
|
||||
log.warn("publishBillingEvent->预检查余额不足,用户ID: {},模型: {}",
|
||||
chatRequest.getUserId(), chatRequest.getModel());
|
||||
throw e; // 直接抛出,阻止消息保存和对话继续
|
||||
}
|
||||
|
||||
eventPublisher.publishEvent(new ChatMessageCreatedEvent(
|
||||
chatRequest.getUserId(),
|
||||
chatRequest.getSessionId(),
|
||||
@@ -137,6 +201,93 @@ public class ChatCostServiceImpl implements IChatCostService {
|
||||
chatRequest.getRole(),
|
||||
chatRequest.getPrompt()
|
||||
));
|
||||
log.debug("publishBillingEvent->计费事件发布完成");
|
||||
}
|
||||
|
||||
/**
|
||||
* 预检查用户余额是否足够支付可能的费用
|
||||
*/
|
||||
private void preCheckBalance(ChatRequest chatRequest) {
|
||||
if (chatRequest.getUserId() == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
int tokens = TikTokensUtil.tokens(chatRequest.getModel(), chatRequest.getPrompt());
|
||||
String modelName = chatRequest.getModel();
|
||||
ChatModelVo chatModelVo = chatModelService.selectModelByName(modelName);
|
||||
BigDecimal unitPrice = BigDecimal.valueOf(chatModelVo.getModelPrice());
|
||||
|
||||
// 按次计费:直接检查单次费用
|
||||
if (BillingType.TIMES.getCode().equals(chatModelVo.getModelType())) {
|
||||
BigDecimal numberCost = unitPrice.setScale(2, RoundingMode.HALF_UP);
|
||||
checkUserBalanceWithoutDeduct(chatRequest.getUserId(), numberCost.doubleValue());
|
||||
return;
|
||||
}
|
||||
|
||||
// 按token计费:检查累计后可能的费用
|
||||
final int threshold = 100;
|
||||
ChatUsageToken chatToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName);
|
||||
int previousUnpaid = (chatToken == null) ? 0 : chatToken.getToken();
|
||||
int totalTokens = previousUnpaid + tokens;
|
||||
|
||||
int billable = (totalTokens / threshold) * threshold;
|
||||
if (billable > 0) {
|
||||
BigDecimal numberCost = unitPrice
|
||||
.multiply(BigDecimal.valueOf(billable))
|
||||
.setScale(2, RoundingMode.HALF_UP);
|
||||
checkUserBalanceWithoutDeduct(chatRequest.getUserId(), numberCost.doubleValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查用户余额是否足够,但不扣除
|
||||
*/
|
||||
private void checkUserBalanceWithoutDeduct(Long userId, Double numberCost) {
|
||||
SysUser sysUser = sysUserMapper.selectById(userId);
|
||||
if (sysUser == null) {
|
||||
throw new ServiceException("用户不存在");
|
||||
}
|
||||
|
||||
BigDecimal userBalance = BigDecimal.valueOf(sysUser.getUserBalance() == null ? 0D : sysUser.getUserBalance())
|
||||
.setScale(2, RoundingMode.HALF_UP);
|
||||
BigDecimal cost = BigDecimal.valueOf(numberCost == null ? 0D : numberCost)
|
||||
.setScale(2, RoundingMode.HALF_UP);
|
||||
|
||||
if (userBalance.compareTo(cost) < 0 || userBalance.compareTo(BigDecimal.ZERO) == 0) {
|
||||
throw new ServiceException("余额不足, 请充值。当前余额: " + userBalance + ",需要: " + cost);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存账单记录
|
||||
*/
|
||||
private void saveBillingRecord(ChatRequest chatRequest, int billedTokens, double cost, String billingType) {
|
||||
try {
|
||||
ChatMessageBo billingMessage = new ChatMessageBo();
|
||||
billingMessage.setUserId(chatRequest.getUserId());
|
||||
billingMessage.setSessionId(chatRequest.getSessionId());
|
||||
billingMessage.setRole("system"); // 系统账单消息
|
||||
billingMessage.setModelName(chatRequest.getModel());
|
||||
billingMessage.setTotalTokens(billedTokens);
|
||||
billingMessage.setDeductCost(cost);
|
||||
billingMessage.setRemark(billingType);
|
||||
|
||||
// 构建账单消息内容
|
||||
String content;
|
||||
if ("TIMES_BILLING".equals(billingType)) {
|
||||
content = String.format("按次计费:消耗 %d tokens,扣费 %.2f 元", billedTokens, cost);
|
||||
} else {
|
||||
content = String.format("按量计费:结算 %d tokens,扣费 %.2f 元", billedTokens, cost);
|
||||
}
|
||||
billingMessage.setContent(content);
|
||||
|
||||
chatMessageService.insertByBo(billingMessage);
|
||||
log.debug("saveBillingRecord->保存账单记录成功,用户ID: {}, 计费类型: {}, 费用: {}",
|
||||
chatRequest.getUserId(), billingType, cost);
|
||||
} catch (Exception e) {
|
||||
log.error("saveBillingRecord->保存账单记录失败", e);
|
||||
// 账单记录失败不影响主流程,只记录错误日志
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user