Refactor chunk compression to use its own thread pool in order to eliminate disconnect race conditions caused by Mojang's nasty disconnect routines.
This commit is contained in:
parent
fd83772302
commit
77fbb062d9
1 changed files with 68 additions and 116 deletions
|
@ -1,21 +1,25 @@
|
||||||
From 0db589c05b3eb0d213df3e7640818b77935c2e02 Mon Sep 17 00:00:00 2001
|
From d56db82cd2978a74349685a4a392da310f9822df Mon Sep 17 00:00:00 2001
|
||||||
From: md_5 <git@md-5.net>
|
From: md_5 <git@md-5.net>
|
||||||
Date: Tue, 28 Jan 2014 20:32:07 +1100
|
Date: Tue, 28 Jan 2014 20:32:07 +1100
|
||||||
Subject: [PATCH] Implement Threaded Bulk Chunk Compression and Caching
|
Subject: [PATCH] Implement Threaded Bulk Chunk Compression and Caching
|
||||||
|
|
||||||
|
|
||||||
diff --git a/src/main/java/net/minecraft/server/EntityPlayer.java b/src/main/java/net/minecraft/server/EntityPlayer.java
|
diff --git a/src/main/java/net/minecraft/server/EntityPlayer.java b/src/main/java/net/minecraft/server/EntityPlayer.java
|
||||||
index 9b853a9..a4c8843 100644
|
index 9b853a9..295b80d 100644
|
||||||
--- a/src/main/java/net/minecraft/server/EntityPlayer.java
|
--- a/src/main/java/net/minecraft/server/EntityPlayer.java
|
||||||
+++ b/src/main/java/net/minecraft/server/EntityPlayer.java
|
+++ b/src/main/java/net/minecraft/server/EntityPlayer.java
|
||||||
@@ -228,6 +228,7 @@ public class EntityPlayer extends EntityHuman implements ICrafting {
|
@@ -228,7 +228,10 @@ public class EntityPlayer extends EntityHuman implements ICrafting {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!arraylist.isEmpty()) {
|
if (!arraylist.isEmpty()) {
|
||||||
+ java.util.Collections.sort( arraylist, new PlayerChunkMap.ChunkComparator( this ) ); // Spigot - sort a final time before sending
|
- this.playerConnection.sendPacket(new PacketPlayOutMapChunkBulk(arraylist));
|
||||||
this.playerConnection.sendPacket(new PacketPlayOutMapChunkBulk(arraylist));
|
+ // Spigot Start
|
||||||
|
+ java.util.Collections.sort( arraylist, new PlayerChunkMap.ChunkComparator( this ) );
|
||||||
|
+ org.spigotmc.ChunkCompressor.sendAndCompress( this, new PacketPlayOutMapChunkBulk( arraylist ) );
|
||||||
|
+ // Spigot End
|
||||||
Iterator iterator2 = arraylist1.iterator();
|
Iterator iterator2 = arraylist1.iterator();
|
||||||
|
|
||||||
|
while (iterator2.hasNext()) {
|
||||||
diff --git a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java b/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java
|
diff --git a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java b/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java
|
||||||
index fc92026..484d727 100644
|
index fc92026..484d727 100644
|
||||||
--- a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java
|
--- a/src/main/java/net/minecraft/server/PacketPlayOutMapChunkBulk.java
|
||||||
|
@ -66,142 +70,90 @@ index ae4ca63..962e902 100644
|
||||||
private int x;
|
private int x;
|
||||||
private int z;
|
private int z;
|
||||||
|
|
||||||
diff --git a/src/main/java/net/minecraft/server/ServerConnectionChannel.java b/src/main/java/net/minecraft/server/ServerConnectionChannel.java
|
|
||||||
index fb95be4..a382235 100644
|
|
||||||
--- a/src/main/java/net/minecraft/server/ServerConnectionChannel.java
|
|
||||||
+++ b/src/main/java/net/minecraft/server/ServerConnectionChannel.java
|
|
||||||
@@ -1,14 +1,25 @@
|
|
||||||
package net.minecraft.server;
|
|
||||||
|
|
||||||
+import com.google.common.util.concurrent.ThreadFactoryBuilder; // Spigot
|
|
||||||
import net.minecraft.util.io.netty.channel.Channel;
|
|
||||||
import net.minecraft.util.io.netty.channel.ChannelException;
|
|
||||||
import net.minecraft.util.io.netty.channel.ChannelInitializer;
|
|
||||||
import net.minecraft.util.io.netty.channel.ChannelOption;
|
|
||||||
import net.minecraft.util.io.netty.handler.timeout.ReadTimeoutHandler;
|
|
||||||
+// Spigot Start
|
|
||||||
+import net.minecraft.util.io.netty.util.concurrent.DefaultEventExecutorGroup;
|
|
||||||
+import net.minecraft.util.io.netty.util.concurrent.EventExecutorGroup;
|
|
||||||
+import org.spigotmc.ChunkCompressor;
|
|
||||||
+import org.spigotmc.SpigotConfig;
|
|
||||||
+// Spigot End
|
|
||||||
|
|
||||||
class ServerConnectionChannel extends ChannelInitializer {
|
|
||||||
|
|
||||||
final ServerConnection a;
|
|
||||||
+ // Spigot Start
|
|
||||||
+ private static final EventExecutorGroup threadPool = new DefaultEventExecutorGroup( SpigotConfig.compressionThreads, new ThreadFactoryBuilder().setNameFormat( "Chunk Compressor #%d" ).setDaemon( true ).build() );
|
|
||||||
+ private static final ChunkCompressor chunkCompressor = new ChunkCompressor();
|
|
||||||
+ // Spigot End
|
|
||||||
|
|
||||||
ServerConnectionChannel(ServerConnection serverconnection) {
|
|
||||||
this.a = serverconnection;
|
|
||||||
@@ -28,6 +39,7 @@ class ServerConnectionChannel extends ChannelInitializer {
|
|
||||||
}
|
|
||||||
|
|
||||||
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyPingHandler(this.a)).addLast("splitter", new PacketSplitter()).addLast("decoder", new PacketDecoder()).addLast("prepender", new PacketPrepender()).addLast("encoder", new PacketEncoder());
|
|
||||||
+ channel.pipeline().addLast( threadPool, "compressor", chunkCompressor ); // Spigot
|
|
||||||
NetworkManager networkmanager = new NetworkManager(false);
|
|
||||||
|
|
||||||
ServerConnection.a(this.a).add(networkmanager);
|
|
||||||
diff --git a/src/main/java/org/spigotmc/ChunkCompressor.java b/src/main/java/org/spigotmc/ChunkCompressor.java
|
diff --git a/src/main/java/org/spigotmc/ChunkCompressor.java b/src/main/java/org/spigotmc/ChunkCompressor.java
|
||||||
new file mode 100644
|
new file mode 100644
|
||||||
index 0000000..481211b
|
index 0000000..d43e528
|
||||||
--- /dev/null
|
--- /dev/null
|
||||||
+++ b/src/main/java/org/spigotmc/ChunkCompressor.java
|
+++ b/src/main/java/org/spigotmc/ChunkCompressor.java
|
||||||
@@ -0,0 +1,94 @@
|
@@ -0,0 +1,80 @@
|
||||||
+package org.spigotmc;
|
+package org.spigotmc;
|
||||||
+
|
+
|
||||||
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
+import java.util.Arrays;
|
+import java.util.Arrays;
|
||||||
+import java.util.Iterator;
|
+import java.util.Iterator;
|
||||||
+import java.util.LinkedHashMap;
|
+import java.util.LinkedHashMap;
|
||||||
+import java.util.logging.Level;
|
+import java.util.logging.Level;
|
||||||
+import java.util.zip.CRC32;
|
+import java.util.zip.CRC32;
|
||||||
|
+import net.minecraft.server.EntityPlayer;
|
||||||
+import net.minecraft.server.PacketPlayOutMapChunkBulk;
|
+import net.minecraft.server.PacketPlayOutMapChunkBulk;
|
||||||
+import net.minecraft.util.io.netty.channel.ChannelHandler;
|
+import net.minecraft.util.io.netty.util.concurrent.DefaultEventExecutorGroup;
|
||||||
+import net.minecraft.util.io.netty.channel.ChannelHandlerContext;
|
+import net.minecraft.util.io.netty.util.concurrent.EventExecutorGroup;
|
||||||
+import net.minecraft.util.io.netty.channel.ChannelOutboundHandlerAdapter;
|
|
||||||
+import net.minecraft.util.io.netty.channel.ChannelPromise;
|
|
||||||
+import org.bukkit.Bukkit;
|
+import org.bukkit.Bukkit;
|
||||||
+
|
+
|
||||||
+@ChannelHandler.Sharable
|
+public class ChunkCompressor
|
||||||
+public class ChunkCompressor extends ChannelOutboundHandlerAdapter
|
|
||||||
+{
|
+{
|
||||||
+
|
+
|
||||||
+ private final LinkedHashMap<Long, byte[]> cache = new LinkedHashMap<Long, byte[]>( 16, 0.75f, true ); // Defaults, order by access
|
+ private static final EventExecutorGroup threadPool = new DefaultEventExecutorGroup( SpigotConfig.compressionThreads, new ThreadFactoryBuilder().setNameFormat( "Chunk Compressor #%d" ).setDaemon( true ).build() );
|
||||||
+ private volatile int cacheSize;
|
+ private static final LinkedHashMap<Long, byte[]> cache = new LinkedHashMap<Long, byte[]>( 16, 0.75f, true ); // Defaults, order by access
|
||||||
|
+ private static volatile int cacheSize;
|
||||||
+
|
+
|
||||||
+ @Override
|
+ public static void sendAndCompress(final EntityPlayer player, final PacketPlayOutMapChunkBulk chunk)
|
||||||
+ public synchronized void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
|
|
||||||
+ {
|
+ {
|
||||||
+ try
|
+ threadPool.submit( new Runnable()
|
||||||
+ {
|
+ {
|
||||||
+ long start = System.currentTimeMillis();
|
+
|
||||||
+ boolean cached = false;
|
+ @Override
|
||||||
+ if ( msg instanceof PacketPlayOutMapChunkBulk )
|
+ public void run()
|
||||||
+ {
|
+ {
|
||||||
+ PacketPlayOutMapChunkBulk chunk = (PacketPlayOutMapChunkBulk) msg;
|
+ try
|
||||||
+ // Well shit, orebfuscator is at it again
|
|
||||||
+ if ( chunk.inflatedBuffers == null )
|
|
||||||
+ {
|
+ {
|
||||||
+ Bukkit.getServer().getLogger().warning( "chunk.inflatedBuffers is null. Are you running Oreobfuscator? If so, please note that it is unsupported and will disable async compression" );
|
+ long start = System.currentTimeMillis();
|
||||||
+ super.write( ctx, msg, promise );
|
+ boolean cached = false;
|
||||||
+ }
|
+ // Well shit, orebfuscator is at it again
|
||||||
+
|
+ if ( chunk.inflatedBuffers == null )
|
||||||
+ // Here we assign a hash to the chunk based on its contents. CRC32 is fast and sufficient for use here.
|
|
||||||
+ CRC32 crc = new CRC32();
|
|
||||||
+ for ( byte[] c : chunk.inflatedBuffers )
|
|
||||||
+ {
|
|
||||||
+ crc.update( c );
|
|
||||||
+ }
|
|
||||||
+ long hash = crc.getValue();
|
|
||||||
+
|
|
||||||
+ byte[] deflated = cache.get( hash );
|
|
||||||
+ if ( deflated != null )
|
|
||||||
+ {
|
|
||||||
+ chunk.buffer = deflated;
|
|
||||||
+ chunk.size = deflated.length;
|
|
||||||
+ cached = true;
|
|
||||||
+ } else
|
|
||||||
+ {
|
|
||||||
+ chunk.compress(); // Compress the chunk
|
|
||||||
+ byte[] buffer = Arrays.copyOf( chunk.buffer, chunk.size ); // Resize the array to correct sizing
|
|
||||||
+
|
|
||||||
+ Iterator<byte[]> iter = cache.values().iterator(); // Grab a single iterator reference
|
|
||||||
+ // Whilst this next entry is too big for us, and we have stuff to remove
|
|
||||||
+ while ( cacheSize + buffer.length > org.spigotmc.SpigotConfig.chunkCacheBytes && iter.hasNext() )
|
|
||||||
+ {
|
+ {
|
||||||
+ cacheSize -= iter.next().length; // Update size table
|
+ Bukkit.getServer().getLogger().warning( "chunk.inflatedBuffers is null. Are you running Oreobfuscator? If so, please note that it is unsupported and will disable async compression" );
|
||||||
+ iter.remove(); // Remove it alltogether
|
+ player.playerConnection.sendPacket( chunk );
|
||||||
|
+ return;
|
||||||
+ }
|
+ }
|
||||||
+ cacheSize += buffer.length; // Update size table
|
+
|
||||||
+ cache.put( hash, buffer ); // Pop the new one in the cache
|
+ // Here we assign a hash to the chunk based on its contents. CRC32 is fast and sufficient for use here.
|
||||||
|
+ CRC32 crc = new CRC32();
|
||||||
|
+ for ( byte[] c : chunk.inflatedBuffers )
|
||||||
|
+ {
|
||||||
|
+ crc.update( c );
|
||||||
|
+ }
|
||||||
|
+ long hash = crc.getValue();
|
||||||
|
+
|
||||||
|
+ byte[] deflated = cache.get( hash );
|
||||||
|
+ if ( deflated != null )
|
||||||
|
+ {
|
||||||
|
+ chunk.buffer = deflated;
|
||||||
|
+ chunk.size = deflated.length;
|
||||||
|
+ cached = true;
|
||||||
|
+ } else
|
||||||
|
+ {
|
||||||
|
+ chunk.compress(); // Compress the chunk
|
||||||
|
+ byte[] buffer = Arrays.copyOf( chunk.buffer, chunk.size ); // Resize the array to correct sizing
|
||||||
|
+
|
||||||
|
+ Iterator<byte[]> iter = cache.values().iterator(); // Grab a single iterator reference
|
||||||
|
+ // Whilst this next entry is too big for us, and we have stuff to remove
|
||||||
|
+ while ( cacheSize + buffer.length > org.spigotmc.SpigotConfig.chunkCacheBytes && iter.hasNext() )
|
||||||
|
+ {
|
||||||
|
+ cacheSize -= iter.next().length; // Update size table
|
||||||
|
+ iter.remove(); // Remove it alltogether
|
||||||
|
+ }
|
||||||
|
+ cacheSize += buffer.length; // Update size table
|
||||||
|
+ cache.put( hash, buffer ); // Pop the new one in the cache
|
||||||
|
+ }
|
||||||
|
+ // System.out.println( "Time: " + ( System.currentTimeMillis() - start ) + " " + cached + " " + cacheSize );
|
||||||
|
+ player.playerConnection.sendPacket( chunk );
|
||||||
|
+ } catch ( Throwable t )
|
||||||
|
+ {
|
||||||
|
+ Bukkit.getServer().getLogger().log( Level.WARNING, "Error compressing or caching chunk", t );
|
||||||
+ }
|
+ }
|
||||||
+ // System.out.println( "Time: " + ( System.currentTimeMillis() - start ) + " " + cached + " " + cacheSize );
|
|
||||||
+ }
|
+ }
|
||||||
+ } catch ( Throwable t )
|
+ } );
|
||||||
+ {
|
|
||||||
+ Bukkit.getServer().getLogger().log( Level.WARNING, "Error compressing or caching chunk", t );
|
|
||||||
+ }
|
|
||||||
+
|
|
||||||
+ super.write( ctx, msg, promise );
|
|
||||||
+ }
|
|
||||||
+
|
|
||||||
+ @Override
|
|
||||||
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
|
|
||||||
+ {
|
|
||||||
+ // In short, there isn't actually anything wrong with the async chunk compressor, it just accidentally caused console logging of errors which were previously ignored.\
|
|
||||||
+ // This commit restores that behaviour
|
|
||||||
+
|
|
||||||
+ // You may be asking yourself why we are completely ignoring any errors which come this far down the pipeline.
|
|
||||||
+ // The answer is quite simple:
|
|
||||||
+ // Mojang did it
|
|
||||||
+ // The default Mojang pipeline doesn't have any ChannelOutboundHandlerAdapter or similar instances, and thus nothing to handle exceptionCaught
|
|
||||||
+ // So when a channel.write() or channel.flush() fails, the error message is actually just passed straight to the future provided.
|
|
||||||
+ // It is then subsequently discarded, the channel closed, and no one except the user was any the wiser it actually happened!
|
|
||||||
+ // Unfortunately for us, the default exceptionCaught in this class sends a blaring warning to the server admins indicating that it couldn't send a packet to a disconnected user!
|
|
||||||
+ // We don't care about these warnings, if we did something wrong to disconnect the user, it is already logged in the proper location, as are broken sockets
|
|
||||||
+ // tl;dr no need to blare warnings on each write to a broken socket
|
|
||||||
+ }
|
+ }
|
||||||
+}
|
+}
|
||||||
diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java
|
diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java
|
||||||
|
|
Loading…
Reference in a new issue