Introduce a connection pool for Mojang's session servers.

This has the potential to cut the time that players spend at the
"logging in..." (or "encrypting..." for 1.13+) screen by a fair
amount (gains of 200+ ms were noted for my own home connection).

While this sounds minor, I really do like to aim for all the details
and this is one of them.
This commit is contained in:
Andrew Steinborn 2018-08-03 02:25:57 -04:00
parent 0c481d828d
commit 7eea1a3ac6
2 changed files with 65 additions and 37 deletions

View File

@ -1,21 +1,50 @@
package com.velocitypowered.proxy.connection.http;
import com.velocitypowered.proxy.VelocityServer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.pool.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import javax.net.ssl.SSLEngine;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
public class NettyHttpClient {
private final VelocityServer server;
private final ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;
public NettyHttpClient(VelocityServer server) {
this.server = server;
Bootstrap bootstrap = server.initializeGenericBootstrap();
this.poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
@Override
protected SimpleChannelPool newPool(InetSocketAddress key) {
return new FixedChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() {
@Override
public void channelReleased(Channel channel) throws Exception {
channel.pipeline().remove("collector");
}
@Override
public void channelAcquired(Channel channel) throws Exception {
System.out.println("ACQUIRED");
}
@Override
public void channelCreated(Channel channel) throws Exception {
if (key.getPort() == 443) {
SslContext context = SslContextBuilder.forClient().build();
SSLEngine engine = context.newEngine(channel.alloc());
channel.pipeline().addLast("ssl", new SslHandler(engine));
}
channel.pipeline().addLast("http", new HttpClientCodec());
}
}, 8);
}
};
}
public CompletableFuture<String> get(URL url) {
@ -27,35 +56,25 @@ public class NettyHttpClient {
}
CompletableFuture<String> reply = new CompletableFuture<>();
server.initializeGenericBootstrap()
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
if (ssl) {
SslContext context = SslContextBuilder.forClient().build();
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addLast(new SslHandler(engine));
}
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath() + "?" + url.getQuery());
request.headers().add(HttpHeaderNames.HOST, url.getHost());
request.headers().add(HttpHeaderNames.USER_AGENT, "Velocity");
ctx.writeAndFlush(request);
}
InetSocketAddress address = new InetSocketAddress(host, port);
poolMap.get(address)
.acquire()
.addListener(future -> {
if (future.isSuccess()) {
Channel channel = (Channel) future.getNow();
channel.pipeline().addLast("collector", new SimpleHttpResponseCollector(reply));
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url.getPath() + "?" + url.getQuery());
request.headers().add(HttpHeaderNames.HOST, url.getHost());
request.headers().add(HttpHeaderNames.USER_AGENT, "Velocity");
channel.writeAndFlush(request);
reply.whenComplete((resp, err) -> {
// Make sure to release this connection
poolMap.get(address).release(channel, channel.voidPromise());
});
ch.pipeline().addLast(new SimpleHttpResponseCollector(reply));
}
})
.connect(host, port)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
reply.completeExceptionally(future.cause());
}
} else {
reply.completeExceptionally(future.cause());
}
});
return reply;

View File

@ -2,10 +2,7 @@ package com.velocitypowered.proxy.connection.http;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.*;
import io.netty.util.ReferenceCountUtil;
import java.nio.charset.StandardCharsets;
@ -14,6 +11,7 @@ import java.util.concurrent.CompletableFuture;
class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter {
private final StringBuilder buffer = new StringBuilder(1024);
private final CompletableFuture<String> reply;
private boolean canKeepAlive;
SimpleHttpResponseCollector(CompletableFuture<String> reply) {
this.reply = reply;
@ -23,18 +21,23 @@ class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (msg instanceof HttpResponse) {
HttpResponseStatus status = ((HttpResponse) msg).status();
HttpResponse response = (HttpResponse) msg;
HttpResponseStatus status = response.status();
if (status != HttpResponseStatus.OK) {
ctx.close();
reply.completeExceptionally(new RuntimeException("Unexpected status code " + status.code()));
return;
}
this.canKeepAlive = HttpUtil.isKeepAlive(response);
}
if (msg instanceof HttpContent) {
buffer.append(((HttpContent) msg).content().toString(StandardCharsets.UTF_8));
if (msg instanceof LastHttpContent) {
ctx.close();
if (!canKeepAlive) {
ctx.close();
}
reply.complete(buffer.toString());
}
}
@ -42,4 +45,10 @@ class SimpleHttpResponseCollector extends ChannelInboundHandlerAdapter {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
reply.completeExceptionally(cause);
}
}