高并发 netty
问题
现在我们使用应用程序或者类库来实现系统之间的通信,比如使用 HTTP 客户端从 Web 服务器上获取信息,或者使用 RPC 执行远程方法调用。
然而有时候一个通用的协议和它的实现并没有覆盖一些场景。比如我们无法使用一个通用的 HTTP 服务器来实现长连接、处理大文件等。
使用netty主要解决了http长连接的问题。
解决方案
Netty 提供异步事件驱动的网络应用框架,可以快速开发高性能、高可靠的网络服务器和客户端程序。
指南
目前 Netty 最新的版本是 Netty5,需要 JDK1.6 以上。
ECHO 服务
实现 ECHO 协议,唯一要做的就是将接收到的数据原样返回。让我们从处理器的实现开始,处理器是由 Netty 生成用来处理 I/O 事件的。
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Handler for the ECHO protocol: every message read from the channel is sent
 * back to the peer unchanged.
 */
public class EchoServerHandler extends ChannelHandlerAdapter {

    /**
     * Echoes the received message back to the sender.
     *
     * @param ctx context of this handler in the channel pipeline
     * @param msg the message that was just read
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // A plain write(Object) only buffers the message internally and needs a
        // flush() to push it out; writeAndFlush performs both steps in one call.
        ctx.writeAndFlush(msg);
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
编写好处理器之后,还需要编写启动类来启动服务:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty server that binds to a TCP port and installs an
 * {@link EchoServerHandler} on every accepted connection.
 */
public class EchoServer {

    /** Port used by {@link #main(String[])} when no argument is supplied. */
    private static final int DEFAULT_PORT = 9797;

    /** TCP port this server binds to. */
    private final int port;

    /**
     * @param port TCP port to bind to
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks until the server channel is closed, then
     * shuts both event loop groups down.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the bind or for the channel to close
     */
    public void run() throws InterruptedException {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations;
        // Netty ships other EventLoopGroup implementations for other transports.
        // The boss group only accepts incoming connections and registers them with
        // the worker group, so one thread is enough for the single port bound here.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // The worker group handles the traffic of connections that were accepted.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    // option(...) applies to the NioServerSocketChannel that accepts connections.
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption(...) applies to the channels accepted by the parent ServerChannel.
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync();

            // Wait until the server socket is closed. In this example that does not
            // happen, but this is the hook for a graceful shutdown.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the echo server.
     *
     * @param args optional; {@code args[0]} is the port, otherwise {@code 9797} is used
     * @throws Exception if the port argument is not a number or the server fails
     */
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : DEFAULT_PORT;
        new EchoServer(port).run();
    }
}
HTTP 服务器
一般的 HTTP 服务器(如 Spring Boot)不支持长连接,也就是无法一直复用已经建立好的连接。
使用 netty 框架可以搭建支持长连接的HTTP服务:
/** Most recently received request head; assigned in channelRead0 and read again in writeResponse. */
private HttpRequest request;
/** Buffer that stores the response content */
private final StringBuilder buf = new StringBuilder();
/**
 * Flushes the context. writeResponse and send100Continue only call
 * ctx.write(...), so this is where those writes get flushed.
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
/**
 * Builds a plain-text description of the incoming HTTP message in {@code buf}
 * and writes the response once the last content part has arrived.
 *
 * @param ctx context of this handler in the channel pipeline
 * @param msg the decoded HTTP object; may be a request head, a content part, or both
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
        request = (HttpRequest) msg;
        appendRequestSummary(ctx, request);
    }

    if (!(msg instanceof HttpContent)) {
        return;
    }
    appendContent((HttpContent) msg);

    if (msg instanceof LastHttpContent) {
        finishResponse(ctx, (LastHttpContent) msg);
    }
}

/** Resets {@code buf} and fills it with the banner, request line data, headers and query parameters. */
private void appendRequestSummary(ChannelHandlerContext ctx, HttpRequest req) {
    if (HttpUtil.is100ContinueExpected(req)) {
        send100Continue(ctx);
    }

    buf.setLength(0);
    buf.append("WELCOME TO THE WILD WILD WEB SERVER\r\n")
            .append("===================================\r\n")
            .append("VERSION: ").append(req.protocolVersion()).append("\r\n")
            .append("HOSTNAME: ").append(req.headers().get(HttpHeaderNames.HOST, "unknown")).append("\r\n")
            .append("REQUEST_URI: ").append(req.uri()).append("\r\n\r\n");

    // Request headers.
    HttpHeaders headers = req.headers();
    if (!headers.isEmpty()) {
        for (Map.Entry<String, String> header : headers) {
            buf.append("HEADER: ").append(header.getKey())
                    .append(" = ").append(header.getValue()).append("\r\n");
        }
        buf.append("\r\n");
    }

    // Query-string parameters.
    Map<String, List<String>> params = new QueryStringDecoder(req.uri()).parameters();
    if (!params.isEmpty()) {
        for (Map.Entry<String, List<String>> param : params.entrySet()) {
            for (String value : param.getValue()) {
                buf.append("PARAM: ").append(param.getKey())
                        .append(" = ").append(value).append("\r\n");
            }
        }
        buf.append("\r\n");
    }

    appendDecoderResult(buf, req);
}

/** Appends the readable bytes of a content part, decoded as UTF-8, to {@code buf}. */
private void appendContent(HttpContent chunk) {
    ByteBuf payload = chunk.content();
    if (!payload.isReadable()) {
        return;
    }
    buf.append("CONTENT: ").append(payload.toString(CharsetUtil.UTF_8)).append("\r\n");
    appendDecoderResult(buf, request);
}

/** Appends the end marker and trailing headers, writes the response, and closes the connection unless it is keep-alive. */
private void finishResponse(ChannelHandlerContext ctx, LastHttpContent trailer) {
    buf.append("END OF CONTENT\r\n");

    HttpHeaders trailing = trailer.trailingHeaders();
    if (!trailing.isEmpty()) {
        buf.append("\r\n");
        for (CharSequence name : trailing.names()) {
            for (CharSequence value : trailing.getAll(name)) {
                buf.append("TRAILING HEADER: ");
                buf.append(name).append(" = ").append(value).append("\r\n");
            }
        }
        buf.append("\r\n");
    }

    boolean keepAlive = writeResponse(trailer, ctx);
    if (!keepAlive) {
        // If keep-alive is off, close the connection once the content is fully written.
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }
}
/**
 * Appends a failure note to {@code buf} when decoding of {@code o} did not succeed;
 * leaves {@code buf} untouched otherwise.
 *
 * @param buf buffer receiving the note
 * @param o   HTTP object whose decoder result is inspected
 */
private static void appendDecoderResult(StringBuilder buf, HttpObject o) {
    DecoderResult outcome = o.decoderResult();
    if (!outcome.isSuccess()) {
        buf.append(".. WITH DECODER FAILURE: ")
                .append(outcome.cause())
                .append("\r\n");
    }
}
/**
 * Builds a plain-text response from the contents of {@code buf} and writes it
 * to the context without flushing.
 *
 * @param currentObj HTTP object whose decoder result selects OK or BAD_REQUEST
 * @param ctx        context the response is written to
 * @return {@code true} if the stored request asked for a keep-alive connection
 */
private boolean writeResponse(HttpObject currentObj, ChannelHandlerContext ctx) {
    // Decide whether the connection stays open afterwards.
    final boolean keepAlive = HttpUtil.isKeepAlive(request);

    // Assemble the response object.
    final boolean decodedOk = currentObj.decoderResult().isSuccess();
    final ByteBuf body = Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8);
    final FullHttpResponse response =
            new DefaultFullHttpResponse(HTTP_1_1, decodedOk ? OK : BAD_REQUEST, body);

    final HttpHeaders out = response.headers();
    out.set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");

    if (keepAlive) {
        // 'Content-Length' is added only for a keep-alive connection.
        out.setInt(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
        // Keep-alive header as per:
        // - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
        out.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    // Cookies: send back what the client sent, or hand out two defaults.
    final String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
    if (cookieString == null) {
        // Browser sent no cookie. Add some.
        out.add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key1", "value1"));
        out.add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
    } else {
        // Re-set every cookie that was decoded from the request.
        for (Cookie cookie : ServerCookieDecoder.STRICT.decode(cookieString)) {
            out.add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode(cookie));
        }
    }

    // Write the response; flushing is left to the caller.
    ctx.write(response);
    return keepAlive;
}
/**
 * Writes an HTTP/1.1 response with status CONTINUE to the context; the write is not flushed here.
 *
 * @param ctx context the response is written to
 */
private static void send100Continue(ChannelHandlerContext ctx) {
    ctx.write(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
/**
 * Prints the stack trace of the failure and closes the context.
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
以上是请求处理器。除此之外,还需要在 ChannelPipeline 中加入 HTTP 编解码等其他处理器:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;
/**
 * Sets up the pipeline of every accepted channel for the HTTP snoop server:
 * optional SSL handler, HTTP request decoding, message aggregation, HTTP
 * response encoding, and finally {@link HttpSnoopServerHandler}.
 */
public class HttpSnoopServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Value passed to {@link HttpObjectAggregator}: 1 MiB. */
    private static final int MAX_CONTENT_LENGTH = 1024 * 1024;

    /** SSL context used to create the SSL handler; {@code null} disables SSL. */
    private final SslContext sslCtx;

    /**
     * @param sslCtx SSL context, or {@code null} to serve without SSL
     */
    public HttpSnoopServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    /**
     * Adds the handlers to the pipeline of a newly accepted channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpRequestDecoder());
        // Remove the following line if you want to handle HttpChunks yourself.
        p.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
        p.addLast(new HttpResponseEncoder());
        // Uncomment the following line if you want automatic content compression.
        //p.addLast(new HttpContentCompressor());
        p.addLast(new HttpSnoopServerHandler());
    }
}
启动服务器:
// Configure the server: a boss group created with one thread, and a worker group with the default size.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap()
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new HttpSnoopServerInitializer(sslCtx));

    // Bind, wait for the bind to complete, and keep the resulting server channel.
    Channel serverChannel = bootstrap.bind(PORT).sync().channel();

    System.err.println("Open your web browser and navigate to http://127.0.0.1:" + PORT + '/');

    // Block until the server channel is closed.
    serverChannel.closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
local_offer #java
navigate_before javascript 基础 navigate_next flex 布局