From 8a035561c669e1d04faf85189f85416a591cd1fc Mon Sep 17 00:00:00 2001 From: md_5 Date: Sat, 27 Apr 2013 22:13:33 +1000 Subject: [PATCH] [EXPERIMENTAL] Take #3 at an even better / faster buffering system, use at own peril! --- CraftBukkit-Patches/0020-Netty.patch | 79 +++++++++++++++++----------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/CraftBukkit-Patches/0020-Netty.patch b/CraftBukkit-Patches/0020-Netty.patch index ee0ba67d3..fd3f0f676 100644 --- a/CraftBukkit-Patches/0020-Netty.patch +++ b/CraftBukkit-Patches/0020-Netty.patch @@ -1,4 +1,4 @@ -From 73f2cb6cf3b18d0cca0ee1057d4563ebc46fbded Mon Sep 17 00:00:00 2001 +From f6adddd68afa44beb63b7da41c1ee732bb24d482 Mon Sep 17 00:00:00 2001 From: md_5 Date: Tue, 23 Apr 2013 11:47:32 +1000 Subject: [PATCH] Netty @@ -449,22 +449,29 @@ index 0000000..2dbbf6c +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 -index 0000000..0e1b1fd +index 0000000..f581384 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java -@@ -0,0 +1,253 @@ +@@ -0,0 +1,293 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; ++import io.netty.buffer.ByteBuf; ++import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.Channel; ++import io.netty.channel.ChannelFuture; ++import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.channel.socket.SocketChannel; ++import java.io.DataOutputStream; ++import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; ++import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; @@ -510,6 +517,7 @@ index 0000000..0e1b1fd + return 0; + } + }; ++ private final Queue realQueue = new ConcurrentLinkedQueue(); + private volatile boolean connected; + private Channel channel; + private SocketAddress address; @@ -519,6 +527,7 @@ index 0000000..0e1b1fd + private Object[] dcArgs; + private Socket socketAdaptor; + private long writtenBytes; ++ private long lastFlush; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -594,7 +603,7 @@ index 0000000..0e1b1fd + * + * @param packet the packet to queue + */ -+ public void queue(Packet packet) { ++ public synchronized void queue(Packet packet) { + // Only send if channel is still connected + if (connected) { + // Process packet via handler @@ -602,6 +611,7 @@ index 0000000..0e1b1fd + // If handler indicates packet send + if (packet != null) { + highPriorityQueue.add(packet); ++ realQueue.add(packet); + + // If needed, check and prepare encryption phase + // We don't send the packet here as it is sent just before the cipher handler has been added to ensure we can safeguard from any race conditions @@ -612,7 +622,37 @@ index 0000000..0e1b1fd + CipherCodec codec = new CipherCodec(encrypt, decrypt, (Packet252KeyResponse) packet); + channel.pipeline().addBefore("decoder", "cipher", codec); + } else { -+ channel.write(packet); ++ if (System.currentTimeMillis() - lastFlush > 10) { ++ lastFlush = System.currentTimeMillis(); ++ ++ int estimatedCapacity = 0; ++ for (Packet p : realQueue) { ++ estimatedCapacity += p.a(); ++ } ++ final ByteBuf buf = channel.alloc().buffer(estimatedCapacity); ++ DataOutputStream out = new DataOutputStream(new ByteBufOutputStream(buf)); ++ for (Packet p : realQueue) { ++ buf.writeByte(p.n()); ++ try { ++ p.a(out); ++ } catch (IOException ex) { ++ throw new RuntimeException("Writing packet", ex); ++ } ++ } ++ ++ channel.write(buf).addListener(new ChannelFutureListener() { ++ public void operationComplete(ChannelFuture future) throws Exception { ++ // TODO: Check on new netty release, can't take chances! ++ if (buf.refCnt() != 0) { ++ buf.release(); ++ } ++ if (buf.refCnt() != 0) { ++ throw new AssertionError("refCnt"); ++ } ++ } ++ }); ++ realQueue.clear(); ++ } + } + } + } @@ -708,10 +748,10 @@ index 0000000..0e1b1fd +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 -index 0000000..cb58bd2 +index 0000000..2a9aa0a --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java -@@ -0,0 +1,81 @@ +@@ -0,0 +1,80 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -756,7 +796,6 @@ index 0000000..cb58bd2 + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() -+ .addLast("flusher", new OutboundManager()) + .addLast("timer", new ReadTimeoutHandler(30)) + .addLast("decoder", new PacketDecoder()) + .addLast("encoder", new PacketEncoder(networkManager)) @@ -1047,30 +1086,6 @@ index 0000000..a3b86b8 + return ch.toString(); + } +} -diff --git a/src/main/java/org/spigotmc/netty/OutboundManager.java b/src/main/java/org/spigotmc/netty/OutboundManager.java -new file mode 100644 -index 0000000..80205ed ---- /dev/null -+++ b/src/main/java/org/spigotmc/netty/OutboundManager.java -@@ -0,0 +1,18 @@ -+package org.spigotmc.netty; -+ -+import io.netty.channel.ChannelHandlerContext; -+import io.netty.channel.ChannelOperationHandlerAdapter; -+import io.netty.channel.ChannelPromise; -+ -+public class OutboundManager extends ChannelOperationHandlerAdapter { -+ -+ private static final int FLUSH_TIME = 10; -+ private long lastFlush; -+ -+ public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { -+ if (System.currentTimeMillis() - lastFlush > FLUSH_TIME) { -+ lastFlush = System.currentTimeMillis(); -+ ctx.flush(promise); -+ } -+ } -+} diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java new file mode 100644 index 0000000..65074d2