From ee34b94e8d33a76135afca40f9331fc1868e2300 Mon Sep 17 00:00:00 2001 From: md_5 <md_5@live.com.au> Date: Tue, 2 Jul 2013 09:06:29 +1000 Subject: [PATCH] Netty diff --git a/pom.xml b/pom.xml index d1cc48d..69e596b 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,21 @@ <artifactId>trove4j</artifactId> <version>3.0.3</version> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + <version>4.0.0.CR9</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>4.0.0.CR9</version> + </dependency> + <dependency> + <groupId>org.javassist</groupId> + <artifactId>javassist</artifactId> + <version>3.18.0-GA</version> + </dependency> </dependencies> <!-- This builds a completely 'ready to start' jar with all dependencies inside --> diff --git a/src/main/java/net/minecraft/server/DedicatedServer.java b/src/main/java/net/minecraft/server/DedicatedServer.java index ddb32ac..ec3d104 100644 --- a/src/main/java/net/minecraft/server/DedicatedServer.java +++ b/src/main/java/net/minecraft/server/DedicatedServer.java @@ -100,7 +100,11 @@ public class DedicatedServer extends MinecraftServer implements IMinecraftServer this.getLogger().info("Starting Minecraft server on " + (this.getServerIp().length() == 0 ? "*" : this.getServerIp()) + ":" + this.I()); try { - this.s = new DedicatedServerConnection(this, inetaddress, this.I()); + // Spigot start + this.s = ( org.spigotmc.SpigotConfig.listeners.get( 0 ).netty ) + ? 
new org.spigotmc.netty.NettyServerConnection( this, inetaddress, this.I() ) + : new DedicatedServerConnection( this, inetaddress, this.I() ); + // Spigot end } catch (Throwable ioexception) { // CraftBukkit - IOException -> Throwable this.getLogger().warning("**** FAILED TO BIND TO PORT!"); this.getLogger().warning("The exception was: {0}", new Object[] { ioexception.toString()}); diff --git a/src/main/java/net/minecraft/server/INetworkManager.java b/src/main/java/net/minecraft/server/INetworkManager.java new file mode 100644 index 0000000..6fcc5d7 --- /dev/null +++ b/src/main/java/net/minecraft/server/INetworkManager.java @@ -0,0 +1,26 @@ +package net.minecraft.server; + +import java.net.SocketAddress; + +public interface INetworkManager { + + void a(Connection connection); + + void queue(Packet packet); + + void a(); + + void b(); + + SocketAddress getSocketAddress(); + + void d(); + + int e(); + + void a(String s, Object... aobject); + + java.net.Socket getSocket(); // Spigot + + void setSocketAddress(java.net.SocketAddress address); // Spigot +} diff --git a/src/main/java/net/minecraft/server/NetworkManager.java b/src/main/java/net/minecraft/server/NetworkManager.java index c60abf1..e1b259b 100644 --- a/src/main/java/net/minecraft/server/NetworkManager.java +++ b/src/main/java/net/minecraft/server/NetworkManager.java @@ -25,7 +25,7 @@ public class NetworkManager implements INetworkManager { private final Object h = new Object(); private final IConsoleLogManager i; public Socket socket; // CraftBukkit - private -> public - private final SocketAddress k; + private SocketAddress k; // Spigot - remove final private volatile DataInputStream input; private volatile DataOutputStream output; private volatile boolean n = true; @@ -370,4 +370,6 @@ public class NetworkManager implements INetworkManager { static Thread h(NetworkManager networkmanager) { return networkmanager.u; } + + public void setSocketAddress(SocketAddress address) { k = address; } // Spigot } diff 
--git a/src/main/java/net/minecraft/server/PendingConnection.java b/src/main/java/net/minecraft/server/PendingConnection.java index 043321d..8b66470 100644 --- a/src/main/java/net/minecraft/server/PendingConnection.java +++ b/src/main/java/net/minecraft/server/PendingConnection.java @@ -16,7 +16,7 @@ public class PendingConnection extends Connection { private static Random random = new Random(); private byte[] d; private final MinecraftServer server; - public final NetworkManager networkManager; + public final INetworkManager networkManager; public boolean b; private int f; private String g; @@ -26,10 +26,15 @@ public class PendingConnection extends Connection { private SecretKey k; public String hostname = ""; // CraftBukkit - add field + public PendingConnection(MinecraftServer minecraftserver, org.spigotmc.netty.NettyNetworkManager networkManager) { + this.server = minecraftserver; + this.networkManager = networkManager; + } + public PendingConnection(MinecraftServer minecraftserver, Socket socket, String s) throws java.io.IOException { // CraftBukkit - throws IOException this.server = minecraftserver; this.networkManager = new NetworkManager(minecraftserver.getLogger(), socket, s, this, minecraftserver.H().getPrivate()); - this.networkManager.e = 0; + // this.networkManager.e = 0; } // CraftBukkit start @@ -146,7 +151,7 @@ public class PendingConnection extends Connection { String s = null; // CraftBukkit org.bukkit.event.server.ServerListPingEvent pingEvent = org.bukkit.craftbukkit.event.CraftEventFactory.callServerListPingEvent(this.server.server, getSocket().getInetAddress(), this.server.getMotd(), playerlist.getPlayerCount(), playerlist.getMaxPlayers()); - if (packet254getinfo.d()) { + if (false) { // Spigot: TODO: Use trick from Bungee maybe? 
// CraftBukkit s = pingEvent.getMotd() + "\u00A7" + playerlist.getPlayerCount() + "\u00A7" + pingEvent.getMaxPlayers(); } else { @@ -175,9 +180,18 @@ public class PendingConnection extends Connection { this.networkManager.queue(new Packet255KickDisconnect(s)); this.networkManager.d(); - if (inetaddress != null && this.server.ag() instanceof DedicatedServerConnection) { - ((DedicatedServerConnection) this.server.ag()).a(inetaddress); + // Spigot start + if ( inetaddress != null ) + { + if ( this.server.ag() instanceof DedicatedServerConnection ) + { + ((DedicatedServerConnection) this.server.ag()).a(inetaddress); + } else + { + ((org.spigotmc.netty.NettyServerConnection)this.server.ag()).unThrottle( inetaddress ); + } } + // Spigot end this.b = true; } catch (Exception exception) { diff --git a/src/main/java/org/spigotmc/SpigotConfig.java b/src/main/java/org/spigotmc/SpigotConfig.java index a0a7790..8efdca6 100644 --- a/src/main/java/org/spigotmc/SpigotConfig.java +++ b/src/main/java/org/spigotmc/SpigotConfig.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -148,4 +150,62 @@ public class SpigotConfig commands.put( "restart", new RestartCommand( "restart" ) ); WatchdogThread.doStart( timeoutTime, restartOnCrash ); } + + public static class Listener + { + + public String host; + public int port; + public boolean netty; + public long connectionThrottle; + + public Listener(String host, int port, boolean netty, long connectionThrottle) + { + this.host = host; + this.port = port; + this.netty = netty; + this.connectionThrottle = connectionThrottle; + } + } + public static List<Listener> listeners = new ArrayList<Listener>(); + public static int nettyThreads; + private static void listeners() + { + listeners.clear(); // We 
don't rebuild listeners on reload but we should clear them out! + + Map<String, Object> def = new HashMap<String, Object>(); + def.put( "host", "default" ); + def.put( "port", "default" ); + def.put( "netty", true ); + // def.put( "throttle", "default" ); + + config.addDefault( "listeners", Collections.singletonList( def ) ); + for ( Map<String, Object> info : (List<Map<String, Object>>) config.getList( "listeners" ) ) + { + String host = (String) info.get( "host" ); + if ( "default".equals( host ) ) + { + host = Bukkit.getIp(); + } else + { + throw new IllegalArgumentException( "Can only bind listener to default! Configure it in server.properties" ); + } + int port ; + + if (info.get( "port" ) instanceof Integer){ + throw new IllegalArgumentException( "Can only bind port to default! Configure it in server.properties"); + } else{ + port = Bukkit.getPort(); + } + boolean netty = (Boolean) info.get( "netty" ); + // long connectionThrottle = ( info.get( "throttle" ) instanceof Number ) ? ( (Number) info.get( "throttle" ) ).longValue() : Bukkit.getConnectionThrottle(); + listeners.add( new Listener( host, port, netty, Bukkit.getConnectionThrottle() ) ); + } + if ( listeners.size() != 1 ) + { + throw new IllegalArgumentException( "May only have one listener!" ); + } + + nettyThreads = getInt( "settings.netty-threads", 3 ); + } } diff --git a/src/main/java/org/spigotmc/netty/CipherBase.java b/src/main/java/org/spigotmc/netty/CipherBase.java new file mode 100644 index 0000000..c4306f7 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherBase.java @@ -0,0 +1,73 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import javax.crypto.Cipher; +import javax.crypto.ShortBufferException; + +/** + * Class to expose an + * {@link #cipher(io.netty.buffer.ByteBuf, io.netty.buffer.ByteBuf)} method to + * aid in the efficient passing of ByteBuffers through a cipher. 
+ */ +class CipherBase +{ + + private final Cipher cipher; + private ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal(); + private ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal(); + + private static class EmptyByteThreadLocal extends ThreadLocal<byte[]> + { + + @Override + protected byte[] initialValue() + { + return new byte[ 0 ]; + } + } + + protected CipherBase(Cipher cipher) + { + this.cipher = cipher; + } + + private byte[] bufToByte(ByteBuf in) + { + byte[] heapIn = heapInLocal.get(); + int readableBytes = in.readableBytes(); + if ( heapIn.length < readableBytes ) + { + heapIn = new byte[ readableBytes ]; + heapInLocal.set( heapIn ); + } + in.readBytes( heapIn, 0, readableBytes ); + return heapIn; + } + + protected ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws ShortBufferException + { + int readableBytes = in.readableBytes(); + byte[] heapIn = bufToByte( in ); + + ByteBuf heapOut = ctx.alloc().heapBuffer( cipher.getOutputSize( readableBytes ) ); + heapOut.writerIndex( cipher.update( heapIn, 0, readableBytes, heapOut.array(), heapOut.arrayOffset() ) ); + + return heapOut; + } + + protected void cipher(ByteBuf in, ByteBuf out) throws ShortBufferException + { + int readableBytes = in.readableBytes(); + byte[] heapIn = bufToByte( in ); + + byte[] heapOut = heapOutLocal.get(); + int outputSize = cipher.getOutputSize( readableBytes ); + if ( heapOut.length < outputSize ) + { + heapOut = new byte[ outputSize ]; + heapOutLocal.set( heapOut ); + } + out.writeBytes( heapOut, 0, cipher.update( heapIn, 0, readableBytes, heapOut ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherDecoder.java b/src/main/java/org/spigotmc/netty/CipherDecoder.java new file mode 100644 index 0000000..a1094d2 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherDecoder.java @@ -0,0 +1,24 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import 
io.netty.channel.MessageList; +import io.netty.handler.codec.MessageToMessageDecoder; +import javax.crypto.Cipher; + +public class CipherDecoder extends MessageToMessageDecoder<ByteBuf> +{ + + private final CipherBase cipher; + + public CipherDecoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf msg, MessageList<Object> out) throws Exception + { + out.add( cipher.cipher( ctx, msg ) ); + } +} diff --git a/src/main/java/org/spigotmc/netty/CipherEncoder.java b/src/main/java/org/spigotmc/netty/CipherEncoder.java new file mode 100644 index 0000000..2eb1dcb --- /dev/null +++ b/src/main/java/org/spigotmc/netty/CipherEncoder.java @@ -0,0 +1,23 @@ +package org.spigotmc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import javax.crypto.Cipher; + +public class CipherEncoder extends MessageToByteEncoder<ByteBuf> +{ + + private final CipherBase cipher; + + public CipherEncoder(Cipher cipher) + { + this.cipher = new CipherBase( cipher ); + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception + { + cipher.cipher( in, out ); + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyNetworkManager.java b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java new file mode 100644 index 0000000..d501d8c --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyNetworkManager.java @@ -0,0 +1,312 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.MessageList; +import io.netty.channel.socket.SocketChannel; +import java.net.Socket; +import java.net.SocketAddress; +import java.security.PrivateKey; +import java.util.AbstractList; +import 
java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import net.minecraft.server.Connection; +import net.minecraft.server.INetworkManager; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.Packet; +import net.minecraft.server.Packet252KeyResponse; +import net.minecraft.server.Packet255KickDisconnect; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.PlayerConnection; + +/** + * This class forms the basis of the Netty integration. It implements + * {@link INetworkManager} and handles all events and inbound messages provided + * by the upstream Netty process. + */ +public class NettyNetworkManager extends ChannelInboundHandlerAdapter implements INetworkManager +{ + + private static final ExecutorService threadPool = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat( "Async Packet Handler - %1$d" ).build() ); + private static final MinecraftServer server = MinecraftServer.getServer(); + private static final PrivateKey key = server.H().getPrivate(); + private static final NettyServerConnection serverConnection = (NettyServerConnection) server.ag(); + /*========================================================================*/ + private final Queue<Packet> syncPackets = new ConcurrentLinkedQueue<Packet>(); + private final List<Packet> highPriorityQueue = new AbstractList<Packet>() + { + @Override + public void add(int index, Packet element) + { + // NOP + } + + @Override + public Packet get(int index) + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return 0; + } + }; + private volatile boolean connected; + private Channel channel; + private SocketAddress address; + Connection connection; + private SecretKey secret; + private String dcReason; + private Object[] dcArgs; 
+ private Socket socketAdaptor; + private long writtenBytes; + private PacketWriter writer; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception + { + // Channel and address groundwork first + channel = ctx.channel(); + address = channel.remoteAddress(); + // Then the socket adaptor + socketAdaptor = NettySocketAdaptor.adapt( (SocketChannel) channel ); + // Followed by their first handler + connection = new PendingConnection( server, this ); + writer = new PacketWriter(); + // Finally register the connection + connected = true; + serverConnection.register( (PendingConnection) connection ); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + a( "disconnect.endOfStream", new Object[ 0 ] ); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + // TODO: Remove this once we are more stable + // Bukkit.getServer().getLogger().severe("======================= Start Netty Debug Log ======================="); + // Bukkit.getServer().getLogger().log(Level.SEVERE, "Error caught whilst handling " + channel, cause); + // Bukkit.getServer().getLogger().severe("======================= End Netty Debug Log ======================="); + // Disconnect with generic reason + exception + a( "disconnect.genericReason", new Object[] + { + "Internal exception: " + cause + } ); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageList<Object> msgs) throws Exception + { + MessageList<Packet> packets = msgs.cast(); + for ( final Packet msg : packets ) + { + if ( connected ) + { + if ( msg instanceof Packet252KeyResponse ) + { + secret = ( (Packet252KeyResponse) msg ).a( key ); + Cipher decrypt = NettyServerConnection.getCipher( Cipher.DECRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "decrypt", new CipherDecoder( decrypt ) ); + } + + if ( msg.a_() ) + { + threadPool.submit( new Runnable() + { + public void run() 
+ { + Packet packet = PacketListener.callReceived( NettyNetworkManager.this, connection, msg ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + } ); + } else + { + syncPackets.add( msg ); + } + } + } + } + + public Socket getSocket() + { + return socketAdaptor; + } + + /** + * setHandler. Set the {@link Connection} used to process received packets. + * + * @param nh the new {@link Connection} instance + */ + public void a(Connection nh) + { + connection = nh; + } + + /** + * queue. Queue a packet for sending, or in this case have it written + * straight to the channel. + * + * @param packet the packet to queue + */ + public void queue(final Packet packet) + { + // Only send if channel is still connected + if ( connected ) + { + // Process packet via handler + final Packet packet0 = PacketListener.callQueued( this, connection, packet ); + highPriorityQueue.add( packet0 ); + // If handler indicates packet send + if ( packet0 != null ) + { + if ( channel.eventLoop().inEventLoop() ) + { + queue0( packet0 ); + } else + { + channel.eventLoop().execute( new Runnable() + { + public void run() + { + queue0( packet0 ); + } + } ); + } + } + } + } + + private void queue0(Packet packet) + { + if ( packet instanceof Packet255KickDisconnect ) + { + writer.lastFlush = 0; + } + + writer.write( channel, this, packet ); + if ( packet instanceof Packet252KeyResponse ) + { + Cipher encrypt = NettyServerConnection.getCipher( Cipher.ENCRYPT_MODE, secret ); + channel.pipeline().addBefore( "decoder", "encrypt", new CipherEncoder( encrypt ) ); + } + } + + /** + * wakeThreads. In Vanilla this method will interrupt the network read and + * write threads, thus waking them. + */ + public void a() + { + } + + /** + * processPackets. Remove up to 1000 packets from the queue and process + * them. This method should only be called from the main server thread. 
+ */ + public void b() + { + for ( int i = 1000; !syncPackets.isEmpty() && i >= 0; i-- ) + { + if ( connection instanceof PendingConnection ? ( (PendingConnection) connection ).b : ( (PlayerConnection) connection ).disconnected ) + { + syncPackets.clear(); + break; + } + + Packet packet = PacketListener.callReceived( this, connection, syncPackets.poll() ); + if ( packet != null ) + { + packet.handle( connection ); + } + } + + // Disconnect via the handler - this performs all plugin related cleanup + logging + if ( !connected && ( dcReason != null || dcArgs != null ) ) + { + connection.a( dcReason, dcArgs ); + } + } + + /** + * getSocketAddress. Return the remote address of the connected user. It is + * important that this method returns a value even after disconnect. + * + * @return the remote address of this connection + */ + public SocketAddress getSocketAddress() + { + return address; + } + + public void setSocketAddress(SocketAddress address) + { + this.address = address; + } + + /** + * close. Close and release all resources associated with this connection. + */ + public void d() + { + if ( connected ) + { + connected = false; + channel.close(); + } + } + + /** + * queueSize. Return the number of packets in the low priority queue. In a + * NIO environment this will always be 0. + * + * @return the size of the packet send queue + */ + public int e() + { + return 0; + } + + /** + * networkShutdown. Shuts down this connection, storing the reason and + * parameters, used to notify the current {@link Connection}. + * + * @param reason the main disconnect reason + * @param arguments additional disconnect arguments, for example, the + * exception which triggered the disconnect. + */ + public void a(String reason, Object... 
arguments) + { + if ( connected ) + { + dcReason = reason; + dcArgs = arguments; + d(); + } + } + + public long getWrittenBytes() + { + return writtenBytes; + } + + public void addWrittenBytes(int written) + { + writtenBytes += written; + } +} diff --git a/src/main/java/org/spigotmc/netty/NettyServerConnection.java b/src/main/java/org/spigotmc/netty/NettyServerConnection.java new file mode 100644 index 0000000..b29ca98 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettyServerConnection.java @@ -0,0 +1,170 @@ +package org.spigotmc.netty; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.security.GeneralSecurityException; +import java.security.Key; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.logging.Level; +import javax.crypto.Cipher; +import javax.crypto.spec.IvParameterSpec; +import net.minecraft.server.MinecraftServer; +import net.minecraft.server.PendingConnection; +import net.minecraft.server.ServerConnection; +import org.bukkit.Bukkit; +import org.spigotmc.SpigotConfig; + +/** + * This is the NettyServerConnection class. It implements + * {@link ServerConnection} and is the main interface between the Minecraft + * server and this NIO implementation. It handles starting, stopping and + * processing the Netty backend. 
+ */ +public class NettyServerConnection extends ServerConnection +{ + + private final ChannelFuture socket; + private static EventLoopGroup group; + private final Map<InetAddress, Long> throttle = new HashMap<InetAddress, Long>(); + private final List<PendingConnection> pending = Collections.synchronizedList( new ArrayList<PendingConnection>() ); + + public void unThrottle(InetAddress address) + { + if ( address != null ) + { + synchronized ( throttle ) + { + throttle.remove( address ); + } + } + } + + public boolean throttle(InetAddress address) + { + long currentTime = System.currentTimeMillis(); + synchronized ( throttle ) + { + Long value = throttle.get( address ); + if ( value != null && !address.isLoopbackAddress() && currentTime - value < d().server.getConnectionThrottle() ) + { + throttle.put( address, currentTime ); + return true; + } + + throttle.put( address, currentTime ); + } + return false; + } + + public NettyServerConnection(final MinecraftServer ms, InetAddress host, int port) + { + super( ms ); + if ( group == null ) + { + group = new NioEventLoopGroup( SpigotConfig.nettyThreads, new ThreadFactoryBuilder().setNameFormat( "Netty IO Thread - %1$d" ).build() ); + } + + socket = new ServerBootstrap().channel( NioServerSocketChannel.class ).childHandler( new ChannelInitializer() + { + @Override + public void initChannel(Channel ch) throws Exception + { + // Check the throttle + if ( throttle( ( (InetSocketAddress) ch.remoteAddress() ).getAddress() ) ) + { + ch.close(); + return; + } + // Set IP_TOS + try + { + ch.config().setOption( ChannelOption.IP_TOS, 0x18 ); + } catch ( ChannelException ex ) + { + // IP_TOS is not supported (Windows XP / Windows Server 2003) + } + + NettyNetworkManager networkManager = new NettyNetworkManager(); + ch.pipeline() + .addLast( "timer", new ReadTimeoutHandler( 30 ) ) + .addLast( "decoder", new PacketDecoder() ) + .addLast( "manager", networkManager ); + } + } ).childOption( ChannelOption.TCP_NODELAY, false ).group( 
group ).localAddress( host, port ).bind().syncUninterruptibly(); + } + + /** + * Shutdown. This method is called when the server is shutting down and the + * server socket and all clients should be terminated with no further + * action. + */ + @Override + public void a() + { + socket.channel().close().syncUninterruptibly(); + } + + @Override + public void b() + { + super.b(); // pulse PlayerConnections + for ( int i = 0; i < pending.size(); ++i ) + { + PendingConnection connection = pending.get( i ); + + try + { + connection.c(); + } catch ( Exception ex ) + { + connection.disconnect( "Internal server error" ); + Bukkit.getServer().getLogger().log( Level.WARNING, "Failed to handle packet: " + ex, ex ); + } + + if ( connection.b ) + { + pending.remove( i-- ); + } + } + } + + public void register(PendingConnection conn) + { + pending.add( conn ); + } + + /** + * Return a Minecraft compatible cipher instance from the specified key. + * + * @param opMode the mode to initialize the cipher in + * @param key to use as the initial vector + * @return the initialized cipher + */ + public static Cipher getCipher(int opMode, Key key) + { + try + { + Cipher cip = Cipher.getInstance( "AES/CFB8/NoPadding" ); + cip.init( opMode, key, new IvParameterSpec( key.getEncoded() ) ); + return cip; + } catch ( GeneralSecurityException ex ) + { + throw new RuntimeException( ex ); + } + } +} diff --git a/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java new file mode 100644 index 0000000..5da8a59 --- /dev/null +++ b/src/main/java/org/spigotmc/netty/NettySocketAdaptor.java @@ -0,0 +1,294 @@ +package org.spigotmc.netty; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import 
java.nio.channels.SocketChannel; + +/** + * This class wraps a Netty {@link Channel} in a {@link Socket}. It overrides + * all methods in {@link Socket} to ensure that calls are not mistakenly made + * to the unsupported super socket. All operations that can be sanely applied to + * a {@link Channel} are implemented here. Those which cannot will throw an + * {@link UnsupportedOperationException}. + */ +public class NettySocketAdaptor extends Socket +{ + + private final io.netty.channel.socket.SocketChannel ch; + + private NettySocketAdaptor(io.netty.channel.socket.SocketChannel ch) + { + this.ch = ch; + } + + public static NettySocketAdaptor adapt(io.netty.channel.socket.SocketChannel ch) + { + return new NettySocketAdaptor( ch ); + } + + @Override + public void bind(SocketAddress bindpoint) throws IOException + { + ch.bind( bindpoint ).syncUninterruptibly(); + } + + @Override + public synchronized void close() throws IOException + { + ch.close().syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint) throws IOException + { + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public void connect(SocketAddress endpoint, int timeout) throws IOException + { + ch.config().setConnectTimeoutMillis( timeout ); + ch.connect( endpoint ).syncUninterruptibly(); + } + + @Override + public boolean equals(Object obj) + { + return obj instanceof NettySocketAdaptor && ch.equals( ( (NettySocketAdaptor) obj ).ch ); + } + + @Override + public SocketChannel getChannel() + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public InetAddress getInetAddress() + { + return ch.remoteAddress().getAddress(); + } + + @Override + public InputStream getInputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public boolean getKeepAlive() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_KEEPALIVE ); + } + + @Override + public InetAddress getLocalAddress() + { + return ch.localAddress().getAddress(); + } + + @Override + public int getLocalPort() + { + return ch.localAddress().getPort(); + } + + @Override + public SocketAddress getLocalSocketAddress() + { + return ch.localAddress(); + } + + @Override + public boolean getOOBInline() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public OutputStream getOutputStream() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public int getPort() + { + return ch.remoteAddress().getPort(); + } + + @Override + public synchronized int getReceiveBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_RCVBUF ); + } + + @Override + public SocketAddress getRemoteSocketAddress() + { + return ch.remoteAddress(); + } + + @Override + public boolean getReuseAddress() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_REUSEADDR ); + } + + @Override + public synchronized int getSendBufferSize() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_SNDBUF ); + } + + @Override + public int getSoLinger() throws SocketException + { + return ch.config().getOption( ChannelOption.SO_LINGER ); + } + + @Override + public synchronized int getSoTimeout() throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public boolean getTcpNoDelay() throws SocketException + { + return ch.config().getOption( ChannelOption.TCP_NODELAY ); + } + + @Override + public int getTrafficClass() throws SocketException + { + return ch.config().getOption( ChannelOption.IP_TOS ); + } + + @Override + public int hashCode() + { + return ch.hashCode(); + } + + @Override + public boolean isBound() + { + return ch.localAddress() != null; + } + + @Override + public boolean isClosed() + { + return !ch.isOpen(); + } + + @Override + public boolean isConnected() + { + return ch.isActive(); + } + + @Override + public boolean isInputShutdown() + { + return ch.isInputShutdown(); + } + + @Override + public boolean isOutputShutdown() + { + return ch.isOutputShutdown(); + } + + @Override + public void sendUrgentData(int data) throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setKeepAlive(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_KEEPALIVE, on ); + } + + @Override + public void setOOBInline(boolean on) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setPerformancePreferences(int connectionTime, int latency, int bandwidth) + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
); + } + + @Override + public synchronized void setReceiveBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_RCVBUF, size ); + } + + @Override + public void setReuseAddress(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.SO_REUSEADDR, on ); + } + + @Override + public synchronized void setSendBufferSize(int size) throws SocketException + { + ch.config().setOption( ChannelOption.SO_SNDBUF, size ); + } + + @Override + public void setSoLinger(boolean on, int linger) throws SocketException + { + ch.config().setOption( ChannelOption.SO_LINGER, on ? linger : -1 ); /* honour the on flag: a negative SO_LINGER value disables lingering, matching Socket.setSoLinger(false, ...) */ + } + + @Override + public synchronized void setSoTimeout(int timeout) throws SocketException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." ); + } + + @Override + public void setTcpNoDelay(boolean on) throws SocketException + { + ch.config().setOption( ChannelOption.TCP_NODELAY, on ); + } + + @Override + public void setTrafficClass(int tc) throws SocketException + { + ch.config().setOption( ChannelOption.IP_TOS, tc ); + } + + @Override + public void shutdownInput() throws IOException + { + throw new UnsupportedOperationException( "Operation not supported on Channel wrapper." 
);
+    }
+
+    @Override
+    public void shutdownOutput() throws IOException
+    {
+        ch.shutdownOutput().syncUninterruptibly(); // waits for the channel's shutdown future before returning
+    }
+
+    @Override
+    public String toString()
+    {
+        return ch.toString();
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketDecoder.java b/src/main/java/org/spigotmc/netty/PacketDecoder.java
new file mode 100644
index 0000000..f459ed1
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketDecoder.java
@@ -0,0 +1,80 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.MessageList;
+import io.netty.handler.codec.ReplayingDecoder;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import net.minecraft.server.MinecraftServer;
+import net.minecraft.server.Packet;
+import net.minecraft.server.Packet254GetInfo;
+
+/**
+ * Packet decoding class backed by a reusable {@link ByteBufInputStream} which
+ * wraps the input {@link ByteBuf}. Reads an unsigned byte packet header and
+ * then decodes the packet accordingly.
+ */
+public class PacketDecoder extends ReplayingDecoder<ReadState>
+{
+
+    private DataInput input; // created on the first decode call, then reused
+    private Packet packet; // packet currently being read; kept across decode calls
+    private boolean shutdown; // set once a Packet254GetInfo was decoded; later input is discarded
+
+    public PacketDecoder()
+    {
+        super( ReadState.HEADER ); // start out expecting a packet id
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageList<Object> out) throws Exception
+    {
+        if ( shutdown )
+        {
+            in.readByte(); // Discard
+            return;
+        }
+
+        if ( input == null )
+        {
+            input = new ByteBufInputStream( in ); // NOTE(review): assumes every call receives the same buffer instance - confirm
+        }
+
+        try
+        {
+            while ( true ) // keep decoding until a return or an exception leaves the loop
+            {
+                switch ( state() )
+                {
+                    case HEADER:
+                        int packetId = input.readUnsignedByte();
+                        packet = Packet.a( MinecraftServer.getServer().getLogger(), packetId );
+                        if ( packet == null )
+                        {
+                            throw new IOException( "Bad packet id " + packetId );
+                        }
+                        checkpoint( ReadState.DATA ); // id consumed; no break, so this falls through into DATA
+                    case DATA:
+                        packet.a( input );
+                        checkpoint( ReadState.HEADER ); // packet complete; the next read starts at a new id
+                        out.add( packet );
+                        if ( packet instanceof Packet254GetInfo )
+                        {
+                            shutdown = true; // every later decode call only discards input
+                            return;
+                        }
+                        packet = null;
+                        break;
+                    default:
+                        throw new IllegalStateException();
+                }
+            }
+        } catch ( EOFException ex ) // NOTE(review): EOF is swallowed silently; packets already added to out are kept - confirm intent
+        {
+        }
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketListener.java b/src/main/java/org/spigotmc/netty/PacketListener.java
new file mode 100644
index 0000000..965ba12
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketListener.java
@@ -0,0 +1,112 @@
+package org.spigotmc.netty;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import net.minecraft.server.Connection;
+import net.minecraft.server.INetworkManager;
+import net.minecraft.server.Packet;
+import org.bukkit.Bukkit;
+import org.bukkit.plugin.Plugin;
+
+/**
+ * This class is used for plugins that wish to register to listen to incoming
+ * and outgoing packets.
To use this class, simply create a new instance,
+ * override the methods you wish to use, and call
+ * {@link #register(org.spigotmc.netty.PacketListener, org.bukkit.plugin.Plugin)}.
+ */
+public class PacketListener
+{
+
+    /**
+     * A mapping of all registered listeners and their owning plugins.
+     */
+    private static final Map<PacketListener, Plugin> listeners = new HashMap<PacketListener, Plugin>();
+    /**
+     * A baked array of all listeners; replaced as a whole on registration and read without locking.
+     */
+    private static volatile PacketListener[] baked = new PacketListener[ 0 ];
+
+    /**
+     * Used to register a handler for receiving notifications of packet
+     * activity.
+     *
+     * @param listener the listener to register
+     * @param plugin the plugin owning this listener
+     */
+    public static synchronized void register(PacketListener listener, Plugin plugin)
+    {
+        Preconditions.checkNotNull( listener, "listener" );
+        Preconditions.checkNotNull( plugin, "plugin" );
+        Preconditions.checkState( !listeners.containsKey( listener ), "listener already registered" );
+        int size = listeners.size();
+        Preconditions.checkState( baked.length == size );
+        listeners.put( listener, plugin );
+        PacketListener[] grown = Arrays.copyOf( baked, size + 1 ); // fill a private copy first ...
+        grown[size] = listener;
+        baked = grown; // ... and publish it last, so unsynchronized readers never see an empty slot
+    }
+
+    static Packet callReceived(INetworkManager networkManager, Connection connection, Packet packet)
+    {
+        for ( PacketListener listener : baked ) // iterates the array that was current when the loop began
+        {
+            try
+            {
+                packet = listener.packetReceived( networkManager, connection, packet );
+            } catch ( Throwable t ) // a failing listener is logged and skipped; the packet is left unchanged
+            {
+                Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing receive hook for packet", t );
+            }
+        }
+        return packet;
+    }
+
+    static Packet callQueued(INetworkManager networkManager, Connection connection, Packet packet)
+    {
+        for ( PacketListener listener : baked ) // iterates the array that was current when the loop began
+        {
+            try
+            {
+                packet = listener.packetQueued( networkManager, connection, packet );
+            } catch ( Throwable t ) // a failing listener is logged and skipped; the packet is left unchanged
+            {
+                Bukkit.getServer().getLogger().log( Level.SEVERE, "Error whilst firing queued hook for packet", t );
+            }
+        }
+        return packet;
+    }
+
+    /**
+     * Called when a packet has been received and is about to be handled by the
+     * current {@link Connection}. The returned packet will be the packet passed
+     * on for handling, or in the case of null being returned, not handled at
+     * all.
+     *
+     * @param networkManager the NetworkManager receiving the packet
+     * @param connection the connection which will handle the packet
+     * @param packet the received packet
+     * @return the packet to be handled, or null to cancel
+     */
+    public Packet packetReceived(INetworkManager networkManager, Connection connection, Packet packet)
+    {
+        return packet;
+    }
+
+    /**
+     * Called when a packet is queued to be sent. The returned packet will be
+     * the packet sent. In the case of null being returned, the packet will not
+     * be sent.
+     *
+     * @param networkManager the NetworkManager which will send the packet
+     * @param connection the connection which queued the packet
+     * @param packet the queued packet
+     * @return the packet to be sent, or null if the packet will not be sent.
+     */
+    public Packet packetQueued(INetworkManager networkManager, Connection connection, Packet packet)
+    {
+        return packet;
+    }
+}
diff --git a/src/main/java/org/spigotmc/netty/PacketWriter.java b/src/main/java/org/spigotmc/netty/PacketWriter.java
new file mode 100644
index 0000000..50f59ed
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/PacketWriter.java
@@ -0,0 +1,85 @@
+package org.spigotmc.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.EncoderException;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import net.minecraft.server.Packet;
+import net.minecraft.server.PendingConnection;
+
+/**
+ * Netty encoder which takes a packet and encodes it, and adds a byte packet id
+ * header.
+ */
+public class PacketWriter
+{
+
+    private static final int FLUSH_TIME = 1; // compared against currentTimeMillis() deltas, i.e. milliseconds
+    /*========================================================================*/
+    long lastFlush; // timestamp (ms) of the last batch that was written out
+    private final Queue<Packet> queue = new ArrayDeque<Packet>( 64 );
+
+    void write(Channel channel, NettyNetworkManager networkManager, Packet msg)
+    {
+        // Append messages to queue
+        queue.add( msg );
+
+        // If we are not in the pending connect phase, and we have not reached our timer
+        if ( !( networkManager.connection instanceof PendingConnection ) && System.currentTimeMillis() - lastFlush < FLUSH_TIME )
+        {
+            return; // packet stays queued and goes out with a later batch
+        }
+        // Update our last write time
+        lastFlush = System.currentTimeMillis();
+
+        // Since we are writing in batches it can be useful to guess the size of our output to limit memcpy
+        int estimatedSize = 0;
+        for ( Packet packet : queue )
+        {
+            estimatedSize += packet.a() + 1; // + 1 for the id byte written ahead of every packet body
+        }
+        // Allocate an output buffer of estimated size
+        ByteBuf outBuf = channel.alloc().buffer( estimatedSize );
+        // And a stream to which we can write this buffer to
+        DataOutput dataOut = new ByteBufOutputStream( outBuf );
+        // If we aren't a success, we free the buf in the finally
+        boolean success = false;
+
+        try
+        {
+            // Iterate through all packets, this is safe as we know we will only ever get packets in the pipeline
+            for ( Packet packet : queue )
+            {
+                // Write packet ID
+                outBuf.writeByte( packet.n() );
+                // Write packet data
+                try
+                {
+                    packet.a( dataOut );
+                } catch ( IOException ex )
+                {
+                    throw new EncoderException( ex ); // keep the cause; the finally block releases the buffer
+                }
+            }
+            // Add to the courtesy API providing number of written bytes
+            networkManager.addWrittenBytes( outBuf.readableBytes() );
+            // Let Netty handle any errors from here on
+            success = true;
+            // Write down our single ByteBuf
+            channel.write( outBuf );
+        } finally
+        {
+            // Reset packet queue
+            queue.clear();
+            // If Netty didn't handle the freeing because we didn't get there, we must
+            if ( !success )
+            {
+                outBuf.release();
+            }
+        }
+    }
+}
diff --git
a/src/main/java/org/spigotmc/netty/ReadState.java b/src/main/java/org/spigotmc/netty/ReadState.java
new file mode 100644
index 0000000..d3a9cab
--- /dev/null
+++ b/src/main/java/org/spigotmc/netty/ReadState.java
@@ -0,0 +1,17 @@
+package org.spigotmc.netty;
+
+/**
+ * Stores the state of the packet currently being read.
+ */
+public enum ReadState
+{
+
+    /**
+     * Indicates the byte representing the packet ID is the next thing to be read.
+     */
+    HEADER,
+    /**
+     * Shows the packet ID has been read and the packet body is being read.
+     */
+    DATA;
+}
-- 
1.8.1.2