325 lines
		
	
	
	
		
			16 KiB
			
		
	
	
	
		
			Diff
		
	
	
	
	
	
			
		
		
	
	
			325 lines
		
	
	
	
		
			16 KiB
			
		
	
	
	
		
			Diff
		
	
	
	
	
	
| From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
 | |
| From: Aikar <aikar@aikar.co>
 | |
| Date: Wed, 6 May 2020 04:53:35 -0400
 | |
| Subject: [PATCH] Optimize Network Manager and add advanced packet support
 | |
| 
 | |
| Adds ability for 1 packet to bundle other packets to follow it
 | |
| Adds ability for a packet to delay sending more packets until a state is ready.
 | |
| 
 | |
| Removes synchronization from sending packets
 | |
| Removes processing of the packet queue off of the main thread
 | |
|   - for the few cases where it is allowed, order is not necessary nor
 | |
|     should it even be happening concurrently in the first place (handshaking/login/status)
 | |
| 
 | |
| Ensures packets sent asynchronously are dispatched on main thread
 | |
| 
 | |
| This helps ensure safety for ProtocolLib as packet listeners
 | |
| are commonly accessing world state. This will allow you to schedule
 | |
| a packet to be sent async, but it'll be dispatched sync for packet
 | |
| listeners to process.
 | |
| 
 | |
| This should solve some deadlock risks
 | |
| 
 | |
| Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
 | |
| 
 | |
| Also avoids spamming closed channel exceptions by rechecking the closed state in dispatch,
 | |
| then catching exceptions and closing the connection if they fire.
 | |
| 
 | |
| Part of this commit was authored by: Spottedleaf
 | |
| 
 | |
| diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
 | |
| index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
 | |
| --- a/src/main/java/net/minecraft/network/Connection.java
 | |
| +++ b/src/main/java/net/minecraft/network/Connection.java
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|      public int protocolVersion;
 | |
|      public java.net.InetSocketAddress virtualHost;
 | |
|      private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
 | |
| +    // Optimize network
 | |
| +    public boolean isPending = true;
 | |
| +    public boolean queueImmunity = false;
 | |
| +    public ConnectionProtocol protocol;
 | |
|      // Paper end
 | |
|  
 | |
|      public Connection(PacketFlow side) {
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|      }
 | |
|  
 | |
|      public void setProtocol(ConnectionProtocol state) {
 | |
| +        protocol = state; // Paper
 | |
|          this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state);
 | |
|          this.channel.config().setAutoRead(true);
 | |
|          Connection.LOGGER.debug("Enabled auto read");
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|          Validate.notNull(listener, "packetListener", new Object[0]);
 | |
|          this.packetListener = listener;
 | |
|      }
 | |
| +    // Paper start
 | |
| +    public net.minecraft.server.level.ServerPlayer getPlayer() { // Paper - player behind this connection, or null when the listener is not a ServerGamePacketListenerImpl
 | |
| +        if (packetListener instanceof ServerGamePacketListenerImpl) {
 | |
| +            return ((ServerGamePacketListenerImpl) packetListener).player;
 | |
| +        } else {
 | |
| +            return null; // any other listener type has no player to report
 | |
| +        }
 | |
| +    }
 | |
| +    private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidentally pick them up.
 | |
| +        private static java.util.List<Packet> buildExtraPackets(Packet packet) { // flattens the packet's (possibly nested) extra packets into one list; returns null when there are none
 | |
| +            java.util.List<Packet> extra = packet.getExtraPackets();
 | |
| +            if (extra == null || extra.isEmpty()) {
 | |
| +                return null; // callers must treat null as "no extra packets"
 | |
| +            }
 | |
| +            java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
 | |
| +            buildExtraPackets0(extra, ret);
 | |
| +            return ret;
 | |
| +        }
 | |
| +
 | |
| +        private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) { // recursive helper: appends each extra packet, immediately followed by its own extras
 | |
| +            for (Packet extra : extraPackets) {
 | |
| +                into.add(extra); // the packet itself goes in before anything it bundles
 | |
| +                java.util.List<Packet> extraExtra = extra.getExtraPackets();
 | |
| +                if (extraExtra != null && !extraExtra.isEmpty()) {
 | |
| +                    buildExtraPackets0(extraExtra, into);
 | |
| +                }
 | |
| +            }
 | |
| +        }
 | |
| +        // Paper start
 | |
| +        private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) { // true when the connection is still pending, is not in the PLAY protocol, or the packet is one of the types listed below
 | |
| +            return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundKeepAlivePacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundChatPreviewPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
 | |
| +                packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
 | |
| +        }
 | |
| +        // Paper end
 | |
| +    }
 | |
| +    // Paper end
 | |
|  
 | |
|      public void send(Packet<?> packet) {
 | |
|          this.send(packet, (GenericFutureListener) null);
 | |
|      }
 | |
|  
 | |
|      public void send(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
 | |
| -        if (this.isConnected()) {
 | |
| -            this.flushQueue();
 | |
| +        // Paper start - handle oversized packets better
 | |
| +        boolean connected = this.isConnected();
 | |
| +        if (!connected && !preparing) {
 | |
| +            return; // Do nothing: not connected and not in the preparing state, so the packet is dropped
 | |
| +        }
 | |
| +        packet.onPacketDispatch(getPlayer()); // dispatch hook runs before the packet is either sent or queued
 | |
| +        if (connected && (InnerUtil.canSendImmediate(this, packet) || (
 | |
| +            net.minecraft.server.MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
 | |
| +            (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
 | |
| +        ))) { // fast path: bypass the queue when immediate sending is allowed, or when on main with a ready packet, an empty queue and no extras
 | |
|              this.sendPacket(packet, callback);
 | |
| -        } else {
 | |
| -            this.queue.add(new Connection.PacketHolder(packet, callback));
 | |
| +            return;
 | |
|          }
 | |
| +        // write the packets to the queue, then flush - antixray hooks there already
 | |
| +        java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet); // null when the packet bundles nothing
 | |
| +        boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
 | |
| +        if (!hasExtraPackets) {
 | |
| +            this.queue.add(new Connection.PacketHolder(packet, callback));
 | |
| +        } else {
 | |
| +            java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size());
 | |
| +            packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets
 | |
|  
 | |
| +            for (int i = 0, len = extraPackets.size(); i < len;) {
 | |
| +                Packet extra = extraPackets.get(i);
 | |
| +                boolean end = ++i == len; // advances the index and flags the last extra packet
 | |
| +                packets.add(new Connection.PacketHolder(extra, end ? callback : null)); // append listener to the end
 | |
| +            }
 | |
| +            this.queue.addAll(packets); // atomic - NOTE(review): only true if the queue type's addAll is atomic; the field declaration is not visible here, confirm
 | |
| +        }
 | |
| +        this.flushQueue();
 | |
| +        // Paper end
 | |
|      }
 | |
|  
 | |
|      private void sendPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|              this.setProtocol(packetState);
 | |
|          }
 | |
|  
 | |
| +        // Paper start
 | |
| +        net.minecraft.server.level.ServerPlayer player = getPlayer();
 | |
| +        if (!isConnected()) {
 | |
| +            packet.onPacketDispatchFinish(player, null);
 | |
| +            return;
 | |
| +        }
 | |
| +
 | |
| +        try {
 | |
| +            // Paper end
 | |
|          ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
 | |
|  
 | |
|          if (callback != null) {
 | |
|              channelfuture.addListener(callback);
 | |
|          }
 | |
| +        // Paper start
 | |
| +        if (packet.hasFinishListener()) {
 | |
| +            channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
 | |
| +        }
 | |
| +        // Paper end
 | |
|  
 | |
|          channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
 | |
| +        // Paper start
 | |
| +        } catch (Exception e) {
 | |
| +            LOGGER.error("NetworkException: " + player, e);
 | |
| +            disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
 | |
| +            packet.onPacketDispatchFinish(player, null);
 | |
| +        }
 | |
| +        // Paper end
 | |
|      }
 | |
|  
 | |
|      private ConnectionProtocol getCurrentProtocol() {
 | |
|          return (ConnectionProtocol) this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).get();
 | |
|      }
 | |
|  
 | |
| -    private void flushQueue() {
 | |
| -        if (this.channel != null && this.channel.isOpen()) {
 | |
| -            Queue queue = this.queue;
 | |
| -
 | |
| +    // Paper start - rewrite this to be safer if run off main thread
 | |
| +    private boolean flushQueue() { // void -> boolean; true when not connected or the queue was fully processed, false otherwise
 | |
| +        if (!isConnected()) {
 | |
| +            return true; // nothing can be sent on a connection that is not connected
 | |
| +        }
 | |
| +        if (net.minecraft.server.MCUtil.isMainThread()) {
 | |
| +            return processQueue();
 | |
| +        } else if (isPending) {
 | |
| +            // Should only happen during login/status stages
 | |
|              synchronized (this.queue) {
 | |
| -                Connection.PacketHolder networkmanager_queuedpacket;
 | |
| -
 | |
| -                while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
 | |
| -                    this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
 | |
| -                }
 | |
| +                return this.processQueue();
 | |
| +            }
 | |
| +        }
 | |
| +        return false; // off the main thread and no longer pending: the queue is not processed by this call
 | |
| +    }
 | |
| +    private boolean processQueue() { // true if the queue was drained (or a null entry was hit), false if it stopped at a packet that is not ready
 | |
| +        if (this.queue.isEmpty()) return true;
 | |
| +        // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
 | |
| +        // But if we are not on main due to login/status, the caller (flushQueue) is synchronized on this.queue
 | |
| +        java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
 | |
| +        while (iterator.hasNext()) {
 | |
| +            PacketHolder queued = iterator.next(); // iterate instead of poll, so a not-ready packet is left in the queue
 | |
| +
 | |
| +            // Fix NPE (Spigot bug caused by handleDisconnection())
 | |
| +            if (queued == null) {
 | |
| +                return true;
 | |
| +            }
 | |
|  
 | |
| +            Packet<?> packet = queued.packet;
 | |
| +            if (!packet.isReady()) {
 | |
| +                return false; // stop here: this packet and everything behind it stay queued, preserving order
 | |
| +            } else {
 | |
| +                iterator.remove(); // dequeue before sending
 | |
| +                this.sendPacket(packet, queued.listener);
 | |
|              }
 | |
|          }
 | |
| +        return true;
 | |
|      }
 | |
| +    // Paper end
 | |
|  
 | |
|      public void tick() {
 | |
|          this.flushQueue();
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|          return this.address;
 | |
|      }
 | |
|  
 | |
| +    // Paper start
 | |
| +    public void clearPacketQueue() { // notifies the finish hook of every queued packet that has one, then empties the queue
 | |
| +        net.minecraft.server.level.ServerPlayer player = getPlayer();
 | |
| +        queue.forEach(queuedPacket -> {
 | |
| +            Packet<?> packet = queuedPacket.packet;
 | |
| +            if (packet.hasFinishListener()) {
 | |
| +                packet.onPacketDispatchFinish(player, null); // null future: this packet was never written to the channel
 | |
| +            }
 | |
| +        });
 | |
| +        queue.clear();
 | |
| +    }
 | |
| +    // Paper end
 | |
|      public void disconnect(Component disconnectReason) {
 | |
|          // Spigot Start
 | |
|          this.preparing = false;
 | |
| +        clearPacketQueue(); // Paper
 | |
|          // Spigot End
 | |
|          if (this.channel.isOpen()) {
 | |
|              this.channel.close(); // We can't wait as this may be called from an event loop.
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|      public void handleDisconnection() {
 | |
|          if (this.channel != null && !this.channel.isOpen()) {
 | |
|              if (this.disconnectionHandled) {
 | |
| -                Connection.LOGGER.warn("handleDisconnection() called twice");
 | |
| +                //Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message
 | |
|              } else {
 | |
|                  this.disconnectionHandled = true;
 | |
|                  if (this.getDisconnectedReason() != null) {
 | |
| @@ -0,0 +0,0 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
 | |
|                  } else if (this.getPacketListener() != null) {
 | |
|                      this.getPacketListener().onDisconnect(Component.translatable("multiplayer.disconnect.generic"));
 | |
|                  }
 | |
| -                this.queue.clear(); // Free up packet queue.
 | |
| +                clearPacketQueue(); // Paper
 | |
|                  // Paper start - Add PlayerConnectionCloseEvent
 | |
|                  final PacketListener packetListener = this.getPacketListener();
 | |
|                  if (packetListener instanceof ServerGamePacketListenerImpl) {
 | |
| diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
 | |
| index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
 | |
| --- a/src/main/java/net/minecraft/network/protocol/Packet.java
 | |
| +++ b/src/main/java/net/minecraft/network/protocol/Packet.java
 | |
| @@ -0,0 +0,0 @@ public interface Packet<T extends PacketListener> {
 | |
|      void handle(T listener);
 | |
|  
 | |
|      // Paper start
 | |
| +    /** Hook invoked by Connection.send before the packet is sent or queued.
 | |
| +     * @param player Null if not at PLAY stage yet
 | |
| +     */
 | |
| +    default void onPacketDispatch(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player) {}
 | |
| +
 | |
| +    /** Hook invoked when dispatch of the packet has completed or been abandoned.
 | |
| +     * @param player Null if not at PLAY stage yet
 | |
| +     * @param future Can be null if packet was cancelled
 | |
| +     */
 | |
| +    default void onPacketDispatchFinish(@javax.annotation.Nullable net.minecraft.server.level.ServerPlayer player, @javax.annotation.Nullable io.netty.channel.ChannelFuture future) {}
 | |
| +    default boolean hasFinishListener() { return false; } // checked by Connection before it attaches the finish hook to the channel future
 | |
| +    default boolean isReady() { return true; } // Connection keeps the packet queued while this returns false
 | |
| +    default java.util.List<Packet> getExtraPackets() { return null; } // packets queued right after this one; null or empty means none
 | |
|      default boolean packetTooLarge(net.minecraft.network.Connection manager) {
 | |
|          return false;
 | |
|      }
 | |
| diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
 | |
| index 0000000000000000000000000000000000000000..0000000000000000000000000000000000000000 100644
 | |
| --- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
 | |
| +++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
 | |
| @@ -0,0 +0,0 @@ public class ServerConnectionListener {
 | |
|      final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
 | |
|      // Paper start - prevent blocking on adding a new network manager while the server is ticking
 | |
|      private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
 | |
| +    private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper
 | |
|      private final void addPending() {
 | |
|          Connection manager = null;
 | |
|          while ((manager = pending.poll()) != null) {
 | |
|              connections.add(manager);
 | |
| +            manager.isPending = false;
 | |
|          }
 | |
|      }
 | |
|      // Paper end
 | |
| @@ -0,0 +0,0 @@ public class ServerConnectionListener {
 | |
|                          ;
 | |
|                      }
 | |
|  
 | |
| +                    if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper
 | |
|                      channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND));
 | |
|                      int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond();
 | |
|                      Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND);
 | 
