mirror of
https://github.com/zongzibinbin/MallChat.git
synced 2026-03-14 06:03:42 +08:00
Merge branch 'self'
This commit is contained in:
@@ -18,8 +18,10 @@ import com.abin.mallchat.custom.chat.domain.vo.response.ChatMemberStatisticResp;
|
|||||||
import com.abin.mallchat.custom.chat.domain.vo.response.ChatMessageResp;
|
import com.abin.mallchat.custom.chat.domain.vo.response.ChatMessageResp;
|
||||||
import com.abin.mallchat.custom.chat.domain.vo.response.ChatRoomResp;
|
import com.abin.mallchat.custom.chat.domain.vo.response.ChatRoomResp;
|
||||||
import com.abin.mallchat.custom.chat.service.ChatService;
|
import com.abin.mallchat.custom.chat.service.ChatService;
|
||||||
|
import com.abin.mallchat.custom.user.service.impl.UserServiceImpl;
|
||||||
import io.swagger.annotations.Api;
|
import io.swagger.annotations.Api;
|
||||||
import io.swagger.annotations.ApiOperation;
|
import io.swagger.annotations.ApiOperation;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
|
|
||||||
@@ -39,6 +41,7 @@ import java.util.Set;
|
|||||||
@RestController
|
@RestController
|
||||||
@RequestMapping("/capi/chat")
|
@RequestMapping("/capi/chat")
|
||||||
@Api(tags = "聊天室相关接口")
|
@Api(tags = "聊天室相关接口")
|
||||||
|
@Slf4j
|
||||||
public class ChatController {
|
public class ChatController {
|
||||||
@Autowired
|
@Autowired
|
||||||
private ChatService chatService;
|
private ChatService chatService;
|
||||||
@@ -53,7 +56,9 @@ public class ChatController {
|
|||||||
|
|
||||||
@GetMapping("/public/member/page")
|
@GetMapping("/public/member/page")
|
||||||
@ApiOperation("群成员列表")
|
@ApiOperation("群成员列表")
|
||||||
|
@FrequencyControl(time = 120, count = 10, target = FrequencyControl.Target.IP)
|
||||||
public ApiResult<CursorPageBaseResp<ChatMemberResp>> getMemberPage(@Valid CursorPageBaseReq request) {
|
public ApiResult<CursorPageBaseResp<ChatMemberResp>> getMemberPage(@Valid CursorPageBaseReq request) {
|
||||||
|
black(request);
|
||||||
CursorPageBaseResp<ChatMemberResp> memberPage = chatService.getMemberPage(request);
|
CursorPageBaseResp<ChatMemberResp> memberPage = chatService.getMemberPage(request);
|
||||||
filterBlackMember(memberPage);
|
filterBlackMember(memberPage);
|
||||||
return ApiResult.success(memberPage);
|
return ApiResult.success(memberPage);
|
||||||
@@ -74,14 +79,28 @@ public class ChatController {
|
|||||||
return ApiResult.success(chatService.getMemberStatistic());
|
return ApiResult.success(chatService.getMemberStatistic());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private UserServiceImpl userService;
|
||||||
|
|
||||||
@GetMapping("/public/msg/page")
|
@GetMapping("/public/msg/page")
|
||||||
@ApiOperation("消息列表")
|
@ApiOperation("消息列表")
|
||||||
|
@FrequencyControl(time = 120, count = 10, target = FrequencyControl.Target.IP)
|
||||||
public ApiResult<CursorPageBaseResp<ChatMessageResp>> getMsgPage(@Valid ChatMessagePageReq request) {
|
public ApiResult<CursorPageBaseResp<ChatMessageResp>> getMsgPage(@Valid ChatMessagePageReq request) {
|
||||||
|
black(request);
|
||||||
CursorPageBaseResp<ChatMessageResp> msgPage = chatService.getMsgPage(request, RequestHolder.get().getUid());
|
CursorPageBaseResp<ChatMessageResp> msgPage = chatService.getMsgPage(request, RequestHolder.get().getUid());
|
||||||
filterBlackMsg(msgPage);
|
filterBlackMsg(msgPage);
|
||||||
return ApiResult.success(msgPage);
|
return ApiResult.success(msgPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void black(CursorPageBaseReq baseReq) {
|
||||||
|
if (baseReq.getPageSize() > 50) {
|
||||||
|
log.info("limit request:{}", baseReq);
|
||||||
|
baseReq.setPageSize(10);
|
||||||
|
userService.blackIp(RequestHolder.get().getIp());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private void filterBlackMsg(CursorPageBaseResp<ChatMessageResp> memberPage) {
|
private void filterBlackMsg(CursorPageBaseResp<ChatMessageResp> memberPage) {
|
||||||
Set<String> blackMembers = getBlackUidSet();
|
Set<String> blackMembers = getBlackUidSet();
|
||||||
memberPage.getList().removeIf(a -> blackMembers.contains(a.getFromUser().getUid().toString()));
|
memberPage.getList().removeIf(a -> blackMembers.contains(a.getFromUser().getUid().toString()));
|
||||||
|
|||||||
@@ -128,7 +128,7 @@ public class UserServiceImpl implements UserService {
|
|||||||
applicationEventPublisher.publishEvent(new UserBlackEvent(this, byId));
|
applicationEventPublisher.publishEvent(new UserBlackEvent(this, byId));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void blackIp(String ip) {
|
public void blackIp(String ip) {
|
||||||
if (StrUtil.isBlank(ip)) {
|
if (StrUtil.isBlank(ip)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
|
|||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.RandomUtil;
|
import cn.hutool.core.util.RandomUtil;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.abin.mallchat.common.common.annotation.FrequencyControl;
|
||||||
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
|
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
|
||||||
import com.abin.mallchat.common.common.event.UserOfflineEvent;
|
import com.abin.mallchat.common.common.event.UserOfflineEvent;
|
||||||
import com.abin.mallchat.common.common.event.UserOnlineEvent;
|
import com.abin.mallchat.common.common.event.UserOnlineEvent;
|
||||||
@@ -89,6 +90,8 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
*/
|
*/
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
@Override
|
@Override
|
||||||
|
@FrequencyControl(time = 10, count = 2, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
|
||||||
|
@FrequencyControl(time = 100, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
|
||||||
public void handleLoginReq(Channel channel) {
|
public void handleLoginReq(Channel channel) {
|
||||||
//生成随机不重复的登录码
|
//生成随机不重复的登录码
|
||||||
Integer code = generateLoginCode(channel);
|
Integer code = generateLoginCode(channel);
|
||||||
@@ -119,6 +122,7 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
* @param channel
|
* @param channel
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@FrequencyControl(time = 10, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
|
||||||
public void connect(Channel channel) {
|
public void connect(Channel channel) {
|
||||||
ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO());
|
ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO());
|
||||||
}
|
}
|
||||||
@@ -174,6 +178,7 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
getOrInitChannelExt(channel).setUid(uid);
|
getOrInitChannelExt(channel).setUid(uid);
|
||||||
ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>());
|
ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>());
|
||||||
ONLINE_UID_MAP.get(uid).add(channel);
|
ONLINE_UID_MAP.get(uid).add(channel);
|
||||||
|
NettyUtil.setAttr(channel, NettyUtil.UID, uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,27 @@
|
|||||||
|
package com.abin.mallchat.custom.user.websocket;
|
||||||
|
|
||||||
|
import com.abin.mallchat.common.common.constant.MDCKey;
|
||||||
|
import com.abin.mallchat.common.common.domain.dto.RequestInfo;
|
||||||
|
import com.abin.mallchat.common.common.utils.RequestHolder;
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.slf4j.MDC;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class NettyCollectorHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||||
|
String tid = UUID.randomUUID().toString();
|
||||||
|
MDC.put(MDCKey.TID, tid);
|
||||||
|
RequestInfo info = new RequestInfo();
|
||||||
|
info.setUid(NettyUtil.getAttr(ctx.channel(), NettyUtil.UID));
|
||||||
|
info.setIp(NettyUtil.getAttr(ctx.channel(), NettyUtil.IP));
|
||||||
|
RequestHolder.set(info);
|
||||||
|
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -13,6 +13,7 @@ import io.netty.util.AttributeKey;
|
|||||||
public class NettyUtil {
|
public class NettyUtil {
|
||||||
|
|
||||||
public static AttributeKey<String> IP = AttributeKey.valueOf("ip");
|
public static AttributeKey<String> IP = AttributeKey.valueOf("ip");
|
||||||
|
public static AttributeKey<Long> UID = AttributeKey.valueOf("uid");
|
||||||
|
|
||||||
public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
|
public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
|
||||||
Attribute<T> attr = channel.attr(attributeKey);
|
Attribute<T> attr = channel.attr(attributeKey);
|
||||||
|
|||||||
@@ -79,6 +79,7 @@ public class NettyWebSocketServer {
|
|||||||
pipeline.addLast(new HttpObjectAggregator(8192));
|
pipeline.addLast(new HttpObjectAggregator(8192));
|
||||||
//保存用户ip
|
//保存用户ip
|
||||||
pipeline.addLast(new HttpHeadersHandler());
|
pipeline.addLast(new HttpHeadersHandler());
|
||||||
|
pipeline.addLast(new NettyCollectorHandler());
|
||||||
/**
|
/**
|
||||||
* 说明:
|
* 说明:
|
||||||
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
|
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import com.abin.mallchat.custom.user.service.WebSocketService;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
||||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||||
|
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
||||||
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleState;
|
||||||
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.handler.timeout.IdleStateEvent;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -20,7 +21,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
|
|||||||
// 当web客户端连接后,触发该方法
|
// 当web客户端连接后,触发该方法
|
||||||
@Override
|
@Override
|
||||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||||
getService().connect(ctx.channel());
|
// getService().connect(ctx.channel());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 客户端离线
|
// 客户端离线
|
||||||
@@ -63,6 +64,8 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
|
|||||||
// 关闭用户的连接
|
// 关闭用户的连接
|
||||||
userOffLine(ctx);
|
userOffLine(ctx);
|
||||||
}
|
}
|
||||||
|
} else if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
|
||||||
|
getService().connect(ctx.channel());
|
||||||
}
|
}
|
||||||
super.userEventTriggered(ctx, evt);
|
super.userEventTriggered(ctx, evt);
|
||||||
}
|
}
|
||||||
@@ -70,7 +73,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
|
|||||||
// 处理异常
|
// 处理异常
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
log.warn("异常发生,异常消息 ={}", cause.getMessage());
|
log.warn("异常发生,异常消息 ={}", cause);
|
||||||
ctx.channel().close();
|
ctx.channel().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user