From affdc5e3a692cfba238459443e8dc4f465f6f39a Mon Sep 17 00:00:00 2001 From: Administrator <1037463791@qq.com> Date: Thu, 14 Aug 2025 14:00:48 +0800 Subject: [PATCH] =?UTF-8?q?=E9=97=AE=E9=A2=98=E6=A6=82=E8=BF=B0=201.?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E6=B6=88=E6=81=AF=E5=92=8C=E8=AE=A1=E8=B4=B9?= =?UTF-8?q?=E9=80=BB=E8=BE=91=E5=AD=98=E5=9C=A8=E8=80=A6=E5=90=88=202.?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=AE=A1=E8=B4=B9=E9=80=BB=E8=BE=91=EF=BC=9A?= =?UTF-8?q?=20=E6=8C=89=E6=AC=A1=E8=AE=A1=E8=B4=B9=E8=A2=AB=E9=98=88?= =?UTF-8?q?=E5=80=BC=E9=99=90=E5=88=B6=EF=BC=9A=E6=97=A7=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E6=8A=8A=20TIMES=20=E5=88=86=E6=94=AF=E6=94=BE=E5=9C=A8=20tota?= =?UTF-8?q?lTokens=20=E2=89=A5=20100=20=E7=9A=84=E5=A4=A7=E5=88=86?= =?UTF-8?q?=E6=94=AF=E9=87=8C=EF=BC=8C=E5=AF=BC=E8=87=B4=E6=B2=A1=E5=88=B0?= =?UTF-8?q?100=20token=E6=97=B6=E4=B8=8D=E6=89=A3=E8=B4=B9=EF=BC=8C?= =?UTF-8?q?=E8=BF=9D=E8=83=8C=E2=80=9C=E6=AF=8F=E6=AC=A1=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E5=B0=B1=E6=89=A3=E8=B4=B9=E2=80=9D=E7=9A=84=E8=AF=AD=E4=B9=89?= =?UTF-8?q?=E3=80=82=20token=E7=B4=AF=E8=AE=A1=E4=B8=8D=E5=BD=93=EF=BC=9AT?= =?UTF-8?q?IMES=20=E5=88=86=E6=94=AF=E5=8F=AA=E6=89=A3=E8=B4=B9=E4=B8=8D?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=B4=AF=E8=AE=A1=EF=BC=8C=E5=90=8C=E6=97=B6?= =?UTF-8?q?=E5=9C=A8=20totalTokens=20<=20100=20=E6=97=B6=E4=B8=8D=E4=BC=9A?= =?UTF-8?q?=E8=BF=9B=E5=85=A5=E4=BB=BB=E4=BD=95TIMES=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E7=B4=AF=E8=AE=A1=E4=BC=9A=E6=97=A0=E6=84=8F=E4=B9=89?= =?UTF-8?q?=E5=A2=9E=E9=95=BF=E3=80=82=20=E7=B2=92=E5=BA=A6=E4=B8=8D?= =?UTF-8?q?=E7=A8=B3=E5=AE=9A=EF=BC=9ATOKEN=20=E8=AE=A1=E8=B4=B9=E4=B8=80?= =?UTF-8?q?=E6=97=A6=E8=BE=BE=E9=98=88=E5=80=BC=E5=B0=B1=E6=8A=8A=20total?= =?UTF-8?q?=20=E5=85=A8=E6=89=A3=E5=AE=8C=E5=B9=B6=E6=B8=85=E9=9B=B6?= =?UTF-8?q?=EF=BC=8C=E4=B8=8D=E5=88=A9=E4=BA=8E=E5=AF=B9=E8=B4=A6=E4=B8=8E?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E4=BD=93=E9=AA=8C=E3=80=82=20=E6=89=93?= =?UTF-8?q?=E5=8D=B0=E6=96=B9=E5=BC=8F=EF=BC=9A=E4=BD=BF=E7=94=A8=20System?= =?UTF-8?q?.out.println=EF=BC=8C=E4=B8=8D=E5=88=A9=E4=BA=8E=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E8=BF=BD=E8=B8=AA=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 改动要点 1.新增独立方法 saveMessage(ChatRequest): 只落库。 publishBillingEvent(ChatRequest): 只发布异步计费事件。 保留组合方法 saveMessageAndPublishEvent(ChatRequest) 以便需要一行调用时使用。 调用处已改为“先保存,再发布事件” SseServiceImpl: 先 saveMessage,再 publishBillingEvent。 SSEEventSourceListener: 同上。 DifyServiceImpl: 同上。 2.计费模式分流: TIMES:每次调用直接扣费,不累计。 TOKEN:按阈值(100)批量扣费,保留余数,账单颗粒稳定。 保留余数:total = prev + delta;billable = floor(total/threshold)threshold;remainder = total % threshold。 日志替换:统一使用 log.debug。 结构更清晰、可维护。 所有金额计算统一用 BigDecimal,保留两位小数,RoundingMode.HALF_UP 按次计费:每次直接扣费(BigDecimal),边界转 Double 按 token 计费:按阈值批量结算,保留余数;费用=单价(BigDecimal)×可结算token数 1. 消息分类存储 用户消息:role="user", deductCost=null, totalTokens=本次token数, remark="用户消息" 系统账单:role="system", deductCost=实际扣费, totalTokens=计费token数, remark="TIMES_BILLING/TOKEN_BILLING" 2. 数据流程 用户发送消息 → 预检查余额 → 保存用户消息 → 发布计费事件 → 异步扣费 → 保存账单记录 --- .../chat/listener/BillingEventListener.java | 15 +- .../chat/impl/ChatCostServiceImpl.java | 175 ++++++++++++++++-- 2 files changed, 176 insertions(+), 14 deletions(-) diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/BillingEventListener.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/BillingEventListener.java index 427730c0..74eda273 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/BillingEventListener.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/listener/BillingEventListener.java @@ -7,6 +7,7 @@ import org.ruoyi.chat.service.chat.IChatCostService; import org.ruoyi.common.chat.request.ChatRequest; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import org.springframework.context.event.EventListener; import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; @@ -18,8 +19,10 @@ public class BillingEventListener { private final IChatCostService chatCostService; @Async - @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) + @EventListener public void onChatMessageCreated(ChatMessageCreatedEvent event) { + log.debug("BillingEventListener->接收到计费事件,用户ID: {},会话ID: {},模型: {}", + event.getUserId(), event.getSessionId(), event.getModelName()); try { ChatRequest chatRequest = new ChatRequest(); chatRequest.setUserId(event.getUserId()); @@ -28,9 +31,17 @@ public class BillingEventListener { chatRequest.setRole(event.getRole()); chatRequest.setPrompt(event.getContent()); // 异步执行计费累计与扣费 + log.debug("BillingEventListener->开始执行计费逻辑"); chatCostService.deductToken(chatRequest); + log.debug("BillingEventListener->计费逻辑执行完成"); } catch (Exception ex) { - log.error("BillingEventListener onChatMessageCreated error", ex); + // 由于已有预检查,这里的异常主要是系统异常(数据库连接等) + // 记录错误但不中断异步线程 + log.error("BillingEventListener->异步计费异常,用户ID: {},模型: {},错误: {}", + event.getUserId(), event.getModelName(), ex.getMessage(), ex); + + // TODO: 可以考虑加入重试机制或者错误通知机制 + // 例如:发送到死信队列,或者通知运维人员 } } } diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ChatCostServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ChatCostServiceImpl.java index 0511cbb1..6947893d 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ChatCostServiceImpl.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/chat/service/chat/impl/ChatCostServiceImpl.java @@ -53,7 +53,8 @@ public class ChatCostServiceImpl implements IChatCostService { */ @Override public void deductToken(ChatRequest chatRequest) { - if (chatRequest.getUserId() == null || chatRequest.getSessionId() == null) { + if (chatRequest.getUserId() == null) { + log.warn("deductToken->用户ID为空,跳过计费"); return; } @@ -69,6 +70,17 @@ public class ChatCostServiceImpl implements IChatCostService { BigDecimal numberCost = unitPrice.setScale(2, RoundingMode.HALF_UP); deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue()); log.debug("deductToken->按次数扣费,费用: {},模型: {}", numberCost, modelName); + + // 清理可能存在的历史累计token(模型计费方式可能发生过变更) + ChatUsageToken existingToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName); + if (existingToken != null && existingToken.getToken() > 0) { + existingToken.setToken(0); + chatTokenService.editToken(existingToken); + log.debug("deductToken->按次计费,清理历史累计token: {}", existingToken.getToken()); + } + + // 记录账单消息 + saveBillingRecord(chatRequest, tokens, numberCost.doubleValue(), "TIMES_BILLING"); return; } @@ -76,10 +88,13 @@ public class ChatCostServiceImpl implements IChatCostService { final int threshold = 100; // 获得记录的累计token数 + // TODO: 这里存在并发竞态条件,需要在chatTokenService层面添加乐观锁或分布式锁 ChatUsageToken chatToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName); if (chatToken == null) { chatToken = new ChatUsageToken(); chatToken.setToken(0); + chatToken.setModelName(modelName); + chatToken.setUserId(chatRequest.getUserId()); } int previousUnpaid = chatToken.getToken(); @@ -94,16 +109,32 @@ public class ChatCostServiceImpl implements IChatCostService { .multiply(BigDecimal.valueOf(billable)) .setScale(2, RoundingMode.HALF_UP); log.debug("deductToken->按token扣费,结算token数量: {},单价: {},费用: {}", billable, unitPrice, numberCost); - deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue()); + + try { + // 先尝试扣费 + deductUserBalance(chatRequest.getUserId(), numberCost.doubleValue()); + // 扣费成功后,保存余数 + chatToken.setModelName(modelName); + chatToken.setUserId(chatRequest.getUserId()); + chatToken.setToken(remainder); + chatTokenService.editToken(chatToken); + log.debug("deductToken->扣费成功,更新余数: {}", remainder); + + // 记录账单消息 + saveBillingRecord(chatRequest, billable, numberCost.doubleValue(), "TOKEN_BILLING"); + } catch (ServiceException e) { + // 余额不足时,不更新token累计,保持原有累计数 + log.warn("deductToken->余额不足,本次token累计保持不变: {}", totalTokens); + throw e; // 重新抛出异常 + } } else { + // 未达阈值,累积token log.debug("deductToken->未达到计费阈值({}),累积到下次", threshold); + chatToken.setModelName(modelName); + chatToken.setUserId(chatRequest.getUserId()); + chatToken.setToken(totalTokens); + chatTokenService.editToken(chatToken); } - - // 保存剩余tokens(保留余数) - chatToken.setModelName(modelName); - chatToken.setUserId(chatRequest.getUserId()); - chatToken.setToken(remainder); - chatTokenService.editToken(chatToken); } /** @@ -112,24 +143,57 @@ public class ChatCostServiceImpl implements IChatCostService { @Override public void saveMessage(ChatRequest chatRequest) { if (chatRequest.getUserId() == null || chatRequest.getSessionId() == null) { + log.warn("saveMessage->用户ID或会话ID为空,跳过保存消息"); return; } + + // 验证消息内容 + if (chatRequest.getPrompt() == null || chatRequest.getPrompt().trim().isEmpty()) { + log.warn("saveMessage->消息内容为空,跳过保存"); + return; + } + ChatMessageBo chatMessageBo = new ChatMessageBo(); chatMessageBo.setUserId(chatRequest.getUserId()); chatMessageBo.setSessionId(chatRequest.getSessionId()); chatMessageBo.setRole(chatRequest.getRole()); - chatMessageBo.setContent(chatRequest.getPrompt()); + chatMessageBo.setContent(chatRequest.getPrompt().trim()); chatMessageBo.setModelName(chatRequest.getModel()); + + // 计算并保存本次消息的token数 + int tokens = TikTokensUtil.tokens(chatRequest.getModel(), chatRequest.getPrompt()); + chatMessageBo.setTotalTokens(tokens); + + // 普通消息不涉及扣费,deductCost保持null + chatMessageBo.setDeductCost(null); + chatMessageBo.setRemark("用户消息"); - - - chatMessageService.insertByBo(chatMessageBo); + try { + chatMessageService.insertByBo(chatMessageBo); + log.debug("saveMessage->成功保存消息,用户ID: {}, 会话ID: {}, tokens: {}", + chatRequest.getUserId(), chatRequest.getSessionId(), tokens); + } catch (Exception e) { + log.error("saveMessage->保存消息失败", e); + throw new ServiceException("保存消息失败"); + } } @Override public void publishBillingEvent(ChatRequest chatRequest) { + log.debug("publishBillingEvent->发布计费事件,用户ID: {},会话ID: {},模型: {}", + chatRequest.getUserId(), chatRequest.getSessionId(), chatRequest.getModel()); + + // 预检查:评估可能的扣费金额,如果余额不足则直接抛异常 + try { + preCheckBalance(chatRequest); + } catch (ServiceException e) { + log.warn("publishBillingEvent->预检查余额不足,用户ID: {},模型: {}", + chatRequest.getUserId(), chatRequest.getModel()); + throw e; // 直接抛出,阻止消息保存和对话继续 + } + eventPublisher.publishEvent(new ChatMessageCreatedEvent( chatRequest.getUserId(), chatRequest.getSessionId(), @@ -137,6 +201,93 @@ public class ChatCostServiceImpl implements IChatCostService { chatRequest.getRole(), chatRequest.getPrompt() )); + log.debug("publishBillingEvent->计费事件发布完成"); + } + + /** + * 预检查用户余额是否足够支付可能的费用 + */ + private void preCheckBalance(ChatRequest chatRequest) { + if (chatRequest.getUserId() == null) { + return; + } + + int tokens = TikTokensUtil.tokens(chatRequest.getModel(), chatRequest.getPrompt()); + String modelName = chatRequest.getModel(); + ChatModelVo chatModelVo = chatModelService.selectModelByName(modelName); + BigDecimal unitPrice = BigDecimal.valueOf(chatModelVo.getModelPrice()); + + // 按次计费:直接检查单次费用 + if (BillingType.TIMES.getCode().equals(chatModelVo.getModelType())) { + BigDecimal numberCost = unitPrice.setScale(2, RoundingMode.HALF_UP); + checkUserBalanceWithoutDeduct(chatRequest.getUserId(), numberCost.doubleValue()); + return; + } + + // 按token计费:检查累计后可能的费用 + final int threshold = 100; + ChatUsageToken chatToken = chatTokenService.queryByUserId(chatRequest.getUserId(), modelName); + int previousUnpaid = (chatToken == null) ? 0 : chatToken.getToken(); + int totalTokens = previousUnpaid + tokens; + + int billable = (totalTokens / threshold) * threshold; + if (billable > 0) { + BigDecimal numberCost = unitPrice + .multiply(BigDecimal.valueOf(billable)) + .setScale(2, RoundingMode.HALF_UP); + checkUserBalanceWithoutDeduct(chatRequest.getUserId(), numberCost.doubleValue()); + } + } + + /** + * 检查用户余额是否足够,但不扣除 + */ + private void checkUserBalanceWithoutDeduct(Long userId, Double numberCost) { + SysUser sysUser = sysUserMapper.selectById(userId); + if (sysUser == null) { + throw new ServiceException("用户不存在"); + } + + BigDecimal userBalance = BigDecimal.valueOf(sysUser.getUserBalance() == null ? 0D : sysUser.getUserBalance()) + .setScale(2, RoundingMode.HALF_UP); + BigDecimal cost = BigDecimal.valueOf(numberCost == null ? 0D : numberCost) + .setScale(2, RoundingMode.HALF_UP); + + if (userBalance.compareTo(cost) < 0 || userBalance.compareTo(BigDecimal.ZERO) == 0) { + throw new ServiceException("余额不足, 请充值。当前余额: " + userBalance + ",需要: " + cost); + } + } + + /** + * 保存账单记录 + */ + private void saveBillingRecord(ChatRequest chatRequest, int billedTokens, double cost, String billingType) { + try { + ChatMessageBo billingMessage = new ChatMessageBo(); + billingMessage.setUserId(chatRequest.getUserId()); + billingMessage.setSessionId(chatRequest.getSessionId()); + billingMessage.setRole("system"); // 系统账单消息 + billingMessage.setModelName(chatRequest.getModel()); + billingMessage.setTotalTokens(billedTokens); + billingMessage.setDeductCost(cost); + billingMessage.setRemark(billingType); + + // 构建账单消息内容 + String content; + if ("TIMES_BILLING".equals(billingType)) { + content = String.format("按次计费:消耗 %d tokens,扣费 %.2f 元", billedTokens, cost); + } else { + content = String.format("按量计费:结算 %d tokens,扣费 %.2f 元", billedTokens, cost); + } + billingMessage.setContent(content); + + chatMessageService.insertByBo(billingMessage); + log.debug("saveBillingRecord->保存账单记录成功,用户ID: {}, 计费类型: {}, 费用: {}", + chatRequest.getUserId(), billingType, cost); + } catch (Exception e) { + log.error("saveBillingRecord->保存账单记录失败", e); + // 账单记录失败不影响主流程,只记录错误日志 + } } /**