# websocket--3.5.netty方式01
## 依赖配置(Maven)
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.45.Final</version>
</dependency>
## 服务端启动类(NettyWebSocketServer)
package com.zs.websocket.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Netty-based WebSocket server whose lifecycle is bound to the Spring context:
 * it binds its port in {@code @PostConstruct} and shuts its event loops down in
 * {@code @PreDestroy}.
 *
 * <p>Created by 张邵, 2024/1/29 09:48.
 */
@Slf4j
@Configuration
public class NettyWebSocketServer {
    /** TCP port the WebSocket server listens on. */
    public static final int WEB_SOCKET_PORT = 8090;

    /** Event loop group that accepts incoming connections (one thread). */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    /** Event loop group that performs I/O for accepted connections (eight threads). */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(8);

    /**
     * Starts the WebSocket server once the bean has been constructed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        run();
    }

    /**
     * Shuts both event loop groups down and waits until they have terminated.
     */
    @PreDestroy
    public void destroy() {
        // Trigger both shutdowns first so they proceed in parallel, then wait for each.
        Future<?> bossShutdown = bossGroup.shutdownGracefully();
        Future<?> workerShutdown = workerGroup.shutdownGracefully();
        bossShutdown.syncUninterruptibly();
        workerShutdown.syncUninterruptibly();
        log.info("关闭 ws server 成功");
    }

    /**
     * Configures the server bootstrap and binds {@link #WEB_SOCKET_PORT}, blocking until the
     * bind has completed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the bind
     */
    public void run() throws InterruptedException {
        // Server bootstrap object.
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                // SO_KEEPALIVE is a property of accepted connections, so it has to be a
                // child option; the listening server channel does not support it.
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Logging handler for the boss (accepting) channel.
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // Heartbeat detection is currently disabled. Adding e.g.
                        // new IdleStateHandler(30, 0, 0) here would emit reader-idle events
                        // for clients that stay silent for 30 seconds.
                        // The WebSocket handshake is plain HTTP, so an HTTP codec is required.
                        pipeline.addLast(new HttpServerCodec());
                        // Supports writing data in chunks.
                        pipeline.addLast(new ChunkedWriteHandler());
                        // HTTP messages arrive in several parts; the aggregator merges them
                        // into one full request (at most 8192 bytes of content).
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        // Stores the client IP on the channel.
                        pipeline.addLast(new HttpHeadersHandler());
                        // Upgrades HTTP to the WebSocket protocol (status code 101) for
                        // requests such as ws://localhost:8090/ws and keeps the connection
                        // open. After the upgrade, data travels as WebSocketFrame instances.
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                        // Business handler. It is placed after the protocol handler so that
                        // it only receives frames that passed the protocol handling.
                        pipeline.addLast(new NettyWebSocketServerHandler());
                    }
                });

        // Bind the port and block until the bind has finished.
        boolean started = false;
        try {
            serverBootstrap.bind(WEB_SOCKET_PORT).sync();
            started = true;
            log.info("启动成功");
        } finally {
            if (!started) {
                // @PreDestroy is not invoked when initialization fails, so release the
                // event loop threads here to avoid leaking them.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
}
## 业务处理器(NettyWebSocketServerHandler)
package com.zs.websocket.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Channel handler that receives text WebSocket frames and keeps track of the channels that
 * are currently connected.
 */
@Slf4j
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * All currently connected channels together with an extra value per channel.
     */
    private static final ConcurrentHashMap<Channel, String> ONLINE_WS_MAP = new ConcurrentHashMap<>();

    /**
     * Registers the channel when this handler is added to its pipeline.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("websocket 连接");
        ONLINE_WS_MAP.put(ctx.channel(), "1");
    }

    /**
     * Takes the channel offline when this handler is removed from its pipeline.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        userOffLine(ctx);
    }

    /**
     * Takes the channel offline when the connection becomes inactive.
     *
     * @param ctx context of this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // May fire again after the business logic has already taken the channel offline.
        log.warn("触发 channelInactive 掉线![{}]", ctx.channel().id());
        userOffLine(ctx);
        // Keep propagating the event so later handlers in the pipeline also observe it.
        super.channelInactive(ctx);
    }

    /**
     * Unregisters the channel and closes it. Safe to call repeatedly: the disconnect is only
     * logged by the call that actually removes the channel from the registry.
     *
     * @param ctx context of this handler
     */
    private void userOffLine(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        if (ONLINE_WS_MAP.remove(channel) != null) {
            log.info("websocket 断开连接");
        }
        channel.close();
    }

    /**
     * Heartbeat check: closes the connection when a reader-idle event arrives.
     *
     * @param ctx context of this handler
     * @param evt the user event
     * @throws Exception declared by the overridden method
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            // Nothing was read from the client within the idle period: drop the connection.
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                userOffLine(ctx);
            }
        }
        super.userEventTriggered(ctx, evt);
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx   context of this handler
     * @param cause the exception that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // The throwable is passed as the last argument so the stack trace is logged too.
        log.warn("异常发生,异常消息 ={}", cause.getMessage(), cause);
        ctx.channel().close();
    }

    /**
     * Handles a text frame sent by the client.
     *
     * @param ctx context of this handler
     * @param msg the received text frame
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("服务器收到消息:{}", msg.text());
    }
}
## IP 读取(HttpHeadersHandler)
package com.zs.websocket.netty;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import java.net.InetSocketAddress;
/**
 * Inbound handler that determines the client IP of a full HTTP request and stores it on the
 * channel under {@code NettyUtil.IP}. Every message is passed on unchanged.
 */
public class HttpHeadersHandler extends ChannelInboundHandlerAdapter {
    /**
     * Records the client IP for {@link FullHttpRequest} messages and forwards the message.
     *
     * @param ctx context of this handler
     * @param msg the inbound message
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            HttpHeaders headers = ((FullHttpRequest) msg).headers();
            // NOTE(review): X-Real-IP is client-controlled unless a trusted proxy such as
            // nginx overwrites it - confirm the deployment before relying on this value.
            String ip = headers.get("X-Real-IP");
            if (StringUtils.isBlank(ip)) {
                // No proxy header present: fall back to the remote address of the socket.
                ip = remoteHostAddress(ctx);
            }
            if (ip != null) {
                NettyUtil.setAttr(ctx.channel(), NettyUtil.IP, ip);
            }
        }
        ctx.fireChannelRead(msg);
    }

    /**
     * Returns the textual remote address of the channel, or {@code null} when the channel has
     * no {@link InetSocketAddress} remote address.
     */
    private static String remoteHostAddress(ChannelHandlerContext ctx) {
        java.net.SocketAddress remote = ctx.channel().remoteAddress();
        if (!(remote instanceof InetSocketAddress)) {
            return null;
        }
        InetSocketAddress address = (InetSocketAddress) remote;
        // getAddress() is null for unresolved addresses; use the host string in that case.
        return address.getAddress() != null
                ? address.getAddress().getHostAddress()
                : address.getHostString();
    }
}
## 工具类(NettyUtil)
package com.zs.websocket.netty;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
/**
 * Description: Netty utility class with helpers for reading and writing channel attributes.
 */
public class NettyUtil {
    /** Attribute key under which the client IP is stored; final so it cannot be reassigned. */
    public static final AttributeKey<String> IP = AttributeKey.valueOf("ip");

    /**
     * Stores a value on the channel under the given attribute key.
     *
     * @param channel      channel that carries the attribute
     * @param attributeKey key of the attribute
     * @param data         value to store
     * @param <T>          type of the attribute value
     */
    public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
        Attribute<T> attr = channel.attr(attributeKey);
        attr.set(data);
    }

    /**
     * Reads the value stored on the channel under the given attribute key.
     *
     * @param channel      channel that carries the attribute
     * @param attributeKey key of the attribute
     * @param <T>          type of the attribute value
     * @return the value returned by the channel attribute's {@code get()}
     */
    public static <T> T getAttr(Channel channel, AttributeKey<T> attributeKey) {
        return channel.attr(attributeKey).get();
    }
}