a207d14a0e
Signed-off-by: Mariell Hoversholm <proximyst@proximyst.com>
394 lines
18 KiB
Diff
394 lines
18 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Aikar <aikar@aikar.co>
|
|
Date: Wed, 6 May 2020 04:53:35 -0400
|
|
Subject: [PATCH] Optimize Network Manager and add advanced packet support
|
|
|
|
Adds the ability for one packet to bundle other packets that follow it
|
|
Adds ability for a packet to delay sending more packets until a state is ready.
|
|
|
|
Removes synchronization from sending packets
|
|
Removes processing of the packet queue off of the main thread
|
|
- for the few cases where it is allowed, order is not necessary nor
|
|
    should it even be happening concurrently in the first place (handshaking/login/status)
|
|
|
|
Ensures packets sent asynchronously are dispatched on main thread
|
|
|
|
This helps ensure safety for ProtocolLib as packet listeners
|
|
are commonly accessing world state. This will allow you to schedule
|
|
a packet to be sent async, but it'll be dispatched sync for packet
|
|
listeners to process.
|
|
|
|
This should solve some deadlock risks
|
|
|
|
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
|
|
|
|
Also avoids spamming closed channel exceptions by rechecking the closed state in dispatch
|
|
and then catching exceptions and closing the connection if they fire.
|
|
|
|
Part of this commit was authored by: Spottedleaf
|
|
|
|
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
|
index 3247ec5d6cf329ba0b7e6d5a6c3294dec2e34db4..fc63df21aecd4721efdb45d4744666ed0b562c1b 100644
|
|
--- a/src/main/java/net/minecraft/network/Connection.java
|
|
+++ b/src/main/java/net/minecraft/network/Connection.java
|
|
@@ -25,8 +25,15 @@ import net.minecraft.network.chat.Component;
|
|
import net.minecraft.network.chat.TranslatableComponent;
|
|
import net.minecraft.network.protocol.Packet;
|
|
import net.minecraft.network.protocol.PacketFlow;
|
|
+import net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
|
+import net.minecraft.network.protocol.game.ClientboundChatPacket;
|
|
+import net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket;
|
|
import net.minecraft.network.protocol.game.ClientboundDisconnectPacket;
|
|
+import net.minecraft.network.protocol.game.ClientboundKeepAlivePacket;
|
|
+import net.minecraft.network.protocol.game.ClientboundSetTitlesPacket;
|
|
+import net.minecraft.server.MCUtil;
|
|
import net.minecraft.server.RunningOnDifferentThreadException;
|
|
+import net.minecraft.server.level.ServerPlayer;
|
|
import net.minecraft.server.network.ServerGamePacketListenerImpl;
|
|
import net.minecraft.server.network.ServerLoginPacketListenerImpl;
|
|
import net.minecraft.util.LazyLoadedValue;
|
|
@@ -75,6 +82,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public int protocolVersion;
|
|
public java.net.InetSocketAddress virtualHost;
|
|
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
|
|
+ // Optimize network
|
|
+ public boolean isPending = true;
|
|
+ public boolean queueImmunity = false;
|
|
+ public ConnectionProtocol protocol;
|
|
// Paper end
|
|
|
|
public Connection(PacketFlow side) {
|
|
@@ -98,6 +109,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
}
|
|
|
|
public void setProtocol(ConnectionProtocol state) {
|
|
+ protocol = state; // Paper
|
|
this.channel.attr(Connection.ATTRIBUTE_PROTOCOL).set(state);
|
|
this.channel.config().setAutoRead(true);
|
|
Connection.LOGGER.debug("Enabled auto read");
|
|
@@ -168,19 +180,84 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
Validate.notNull(listener, "packetListener", new Object[0]);
|
|
this.packetListener = listener;
|
|
}
|
|
+ // Paper start
|
|
+ public ServerPlayer getPlayer() {
|
|
+ if (packetListener instanceof ServerGamePacketListenerImpl) {
|
|
+ return ((ServerGamePacketListenerImpl) packetListener).player;
|
|
+ } else {
|
|
+ return null;
|
|
+ }
|
|
+ }
|
|
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib so it doesn't accidently pick them up.
|
|
+ private static java.util.List<Packet> buildExtraPackets(Packet packet) {
|
|
+ java.util.List<Packet> extra = packet.getExtraPackets();
|
|
+ if (extra == null || extra.isEmpty()) {
|
|
+ return null;
|
|
+ }
|
|
+ java.util.List<Packet> ret = new java.util.ArrayList<>(1 + extra.size());
|
|
+ buildExtraPackets0(extra, ret);
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ private static void buildExtraPackets0(java.util.List<Packet> extraPackets, java.util.List<Packet> into) {
|
|
+ for (Packet extra : extraPackets) {
|
|
+ into.add(extra);
|
|
+ java.util.List<Packet> extraExtra = extra.getExtraPackets();
|
|
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
|
+ buildExtraPackets0(extraExtra, into);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ // Paper start
|
|
+ private static boolean canSendImmediate(Connection networkManager, Packet<?> packet) {
|
|
+ return networkManager.isPending || networkManager.protocol != ConnectionProtocol.PLAY ||
|
|
+ packet instanceof ClientboundKeepAlivePacket ||
|
|
+ packet instanceof ClientboundChatPacket ||
|
|
+ packet instanceof ClientboundCommandSuggestionsPacket ||
|
|
+ packet instanceof ClientboundSetTitlesPacket ||
|
|
+ packet instanceof ClientboundBossEventPacket;
|
|
+ }
|
|
+ // Paper end
|
|
+ }
|
|
+ // Paper end
|
|
|
|
public void send(Packet<?> packet) {
|
|
this.send(packet, (GenericFutureListener) null);
|
|
}
|
|
|
|
public void send(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> callback) {
|
|
- if (this.isConnected()) {
|
|
- this.flushQueue();
|
|
- this.sendPacket(packet, callback);
|
|
- } else {
|
|
- this.queue.add(new Connection.PacketHolder(packet, callback));
|
|
+ // Paper start - handle oversized packets better
|
|
+ boolean connected = this.isConnected();
|
|
+ if (!connected && !preparing) {
|
|
+ return; // Do nothing
|
|
+ }
|
|
+ packet.onPacketDispatch(getPlayer());
|
|
+ if (connected && (InnerUtil.canSendImmediate(this, packet) || (
|
|
+ MCUtil.isMainThread() && packet.isReady() && this.queue.isEmpty() &&
|
|
+ (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())
|
|
+ ))) {
|
|
+ this.dispatchPacket(packet, callback);
|
|
+ return;
|
|
}
|
|
+ // write the packets to the queue, then flush - antixray hooks there already
|
|
+ java.util.List<Packet> extraPackets = InnerUtil.buildExtraPackets(packet);
|
|
+ boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
|
+ if (!hasExtraPackets) {
|
|
+ this.queue.add(new Connection.PacketHolder(packet, callback));
|
|
+ } else {
|
|
+ java.util.List<Connection.PacketHolder> packets = new java.util.ArrayList<>(1 + extraPackets.size());
|
|
+ packets.add(new Connection.PacketHolder(packet, null)); // delay the future listener until the end of the extra packets
|
|
+
|
|
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
|
+ Packet extra = extraPackets.get(i);
|
|
+ boolean end = ++i == len;
|
|
+ packets.add(new Connection.PacketHolder(extra, end ? callback : null)); // append listener to the end
|
|
+ }
|
|
|
|
+ this.queue.addAll(packets); // atomic
|
|
+ }
|
|
+ this.sendPacketQueue();
|
|
+ // Paper end
|
|
}
|
|
|
|
private void dispatchPacket(Packet<?> packet, @Nullable GenericFutureListener<? extends Future<? super Void>> genericFutureListener) { this.sendPacket(packet, genericFutureListener); } // Paper - OBFHELPER
|
|
@@ -194,54 +271,119 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
this.channel.config().setAutoRead(false);
|
|
}
|
|
|
|
+ ServerPlayer player = getPlayer(); // Paper
|
|
if (this.channel.eventLoop().inEventLoop()) {
|
|
if (enumprotocol != enumprotocol1) {
|
|
this.setProtocol(enumprotocol);
|
|
}
|
|
+ // Paper start
|
|
+ if (!isConnected()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ return;
|
|
+ }
|
|
+ try {
|
|
+ // Paper end
|
|
|
|
ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
|
|
|
|
if (callback != null) {
|
|
channelfuture.addListener(callback);
|
|
}
|
|
+ // Paper start
|
|
+ if (packet.hasFinishListener()) {
|
|
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
|
+ }
|
|
+ // Paper end
|
|
|
|
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
|
+ // Paper start
|
|
+ } catch (Exception e) {
|
|
+ LOGGER.error("NetworkException: " + player, e);
|
|
+ disconnect(new TranslatableComponent("disconnect.genericReason", "Internal Exception: " + e.getMessage()));;
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ // Paper end
|
|
} else {
|
|
this.channel.eventLoop().execute(() -> {
|
|
if (enumprotocol != enumprotocol1) {
|
|
this.setProtocol(enumprotocol);
|
|
}
|
|
|
|
+ // Paper start
|
|
+ if (!isConnected()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ return;
|
|
+ }
|
|
+ try {
|
|
+ // Paper end
|
|
ChannelFuture channelfuture1 = this.channel.writeAndFlush(packet);
|
|
|
|
+
|
|
if (callback != null) {
|
|
channelfuture1.addListener(callback);
|
|
}
|
|
+ // Paper start
|
|
+ if (packet.hasFinishListener()) {
|
|
+ channelfuture1.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
|
+ }
|
|
+ // Paper end
|
|
|
|
channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
|
+ // Paper start
|
|
+ } catch (Exception e) {
|
|
+ LOGGER.error("NetworkException: " + player, e);
|
|
+ disconnect(new TranslatableComponent("disconnect.genericReason", "Internal Exception: " + e.getMessage()));;
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ // Paper end
|
|
});
|
|
}
|
|
|
|
}
|
|
|
|
- private void sendPacketQueue() { this.flushQueue(); } // Paper - OBFHELPER
|
|
- private void flushQueue() {
|
|
- if (this.channel != null && this.channel.isOpen()) {
|
|
- Queue queue = this.queue;
|
|
-
|
|
+ // Paper start - rewrite this to be safer if ran off main thread
|
|
+ private boolean sendPacketQueue() { return this.p(); } // OBFHELPER // void -> boolean
|
|
+ private boolean p() { // void -> boolean
|
|
+ if (!isConnected()) {
|
|
+ return true;
|
|
+ }
|
|
+ if (MCUtil.isMainThread()) {
|
|
+ return processQueue();
|
|
+ } else if (isPending) {
|
|
+ // Should only happen during login/status stages
|
|
synchronized (this.queue) {
|
|
- Connection.PacketHolder networkmanager_queuedpacket;
|
|
-
|
|
- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
|
|
- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
|
|
- }
|
|
+ return this.processQueue();
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+ private boolean processQueue() {
|
|
+ if (this.queue.isEmpty()) return true;
|
|
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
|
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
|
+ java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
|
|
+ while (iterator.hasNext()) {
|
|
+ Connection.PacketHolder queued = iterator.next(); // poll -> peek
|
|
+
|
|
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
|
+ if (queued == null) {
|
|
+ return true;
|
|
+ }
|
|
|
|
+ Packet<?> packet = queued.getPacket();
|
|
+ if (!packet.isReady()) {
|
|
+ return false;
|
|
+ } else {
|
|
+ iterator.remove();
|
|
+ this.dispatchPacket(packet, queued.getGenericFutureListener());
|
|
}
|
|
}
|
|
+ return true;
|
|
}
|
|
+ // Paper end
|
|
|
|
public void tick() {
|
|
- this.flushQueue();
|
|
+ this.p();
|
|
if (this.packetListener instanceof ServerLoginPacketListenerImpl) {
|
|
((ServerLoginPacketListenerImpl) this.packetListener).tick();
|
|
}
|
|
@@ -271,9 +413,21 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
return this.address;
|
|
}
|
|
|
|
+ // Paper start
|
|
+ public void clearPacketQueue() {
|
|
+ ServerPlayer player = getPlayer();
|
|
+ queue.forEach(queuedPacket -> {
|
|
+ Packet<?> packet = queuedPacket.getPacket();
|
|
+ if (packet.hasFinishListener()) {
|
|
+ packet.onPacketDispatchFinish(player, null);
|
|
+ }
|
|
+ });
|
|
+ queue.clear();
|
|
+ } // Paper end
|
|
public void disconnect(Component disconnectReason) {
|
|
// Spigot Start
|
|
this.preparing = false;
|
|
+ clearPacketQueue(); // Paper
|
|
// Spigot End
|
|
if (this.channel.isOpen()) {
|
|
this.channel.close(); // We can't wait as this may be called from an event loop.
|
|
@@ -341,7 +495,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
public void handleDisconnection() {
|
|
if (this.channel != null && !this.channel.isOpen()) {
|
|
if (this.disconnectionHandled) {
|
|
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
|
+ //NetworkManager.LOGGER.warn("handleDisconnection() called twice"); // Paper - Do not log useless message
|
|
} else {
|
|
this.disconnectionHandled = true;
|
|
if (this.getDisconnectedReason() != null) {
|
|
@@ -349,7 +503,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
|
} else if (this.getPacketListener() != null) {
|
|
this.getPacketListener().a(new TranslatableComponent("multiplayer.disconnect.generic"));
|
|
}
|
|
- this.queue.clear(); // Free up packet queue.
|
|
+ clearPacketQueue(); // Paper
|
|
// Paper start - Add PlayerConnectionCloseEvent
|
|
final PacketListener packetListener = this.getPacketListener();
|
|
if (packetListener instanceof ServerGamePacketListenerImpl) {
|
|
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
index 9914a82ba0ec146ab13fe94c4dbf0ebf64926536..22db5d0d2cc33498ca40162c66aa3b5fbf2f569f 100644
|
|
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
|
|
@@ -1,5 +1,6 @@
|
|
package net.minecraft.network.protocol;
|
|
|
|
+import io.netty.channel.ChannelFuture; // Paper
|
|
import java.io.IOException;
|
|
import net.minecraft.network.FriendlyByteBuf;
|
|
import net.minecraft.network.PacketListener;
|
|
@@ -13,6 +14,20 @@ public interface Packet<T extends PacketListener> {
|
|
void handle(T listener);
|
|
|
|
// Paper start
|
|
+
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ */
|
|
+ default void onPacketDispatch(@javax.annotation.Nullable EntityPlayer player) {}
|
|
+
|
|
+ /**
|
|
+ * @param player Null if not at PLAY stage yet
|
|
+ * @param future Can be null if packet was cancelled
|
|
+ */
|
|
+ default void onPacketDispatchFinish(@javax.annotation.Nullable EntityPlayer player, @javax.annotation.Nullable ChannelFuture future) {}
|
|
+ default boolean hasFinishListener() { return false; }
|
|
+ default boolean isReady() { return true; }
|
|
+ default java.util.List<Packet> getExtraPackets() { return null; }
|
|
default boolean packetTooLarge(NetworkManager manager) {
|
|
return false;
|
|
}
|
|
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
index 6cb51a4fe3c11f53fbb556ce6b0d64b735254d51..d46910cfdc0aef046a0c79731a85d381953c328a 100644
|
|
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
|
@@ -16,6 +16,7 @@ import io.netty.channel.epoll.EpollServerSocketChannel;
|
|
import io.netty.channel.nio.NioEventLoopGroup;
|
|
import io.netty.channel.socket.ServerSocketChannel;
|
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
|
+import io.netty.handler.flush.FlushConsolidationHandler; // Paper
|
|
import io.netty.handler.timeout.ReadTimeoutHandler;
|
|
import java.io.IOException;
|
|
import java.net.InetAddress;
|
|
@@ -54,10 +55,12 @@ public class ServerConnectionListener {
|
|
private final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
|
// Paper start - prevent blocking on adding a new network manager while the server is ticking
|
|
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
|
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper
|
|
private void addPending() {
|
|
Connection manager = null;
|
|
while ((manager = pending.poll()) != null) {
|
|
connections.add(manager);
|
|
+ manager.isPending = false;
|
|
}
|
|
}
|
|
// Paper end
|
|
@@ -92,6 +95,7 @@ public class ServerConnectionListener {
|
|
;
|
|
}
|
|
|
|
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new FlushConsolidationHandler()); // Paper
|
|
channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this)).addLast("splitter", new Varint21FrameDecoder()).addLast("decoder", new PacketDecoder(PacketFlow.SERVERBOUND)).addLast("prepender", new Varint21LengthFieldPrepender()).addLast("encoder", new PacketEncoder(PacketFlow.CLIENTBOUND));
|
|
int j = ServerConnectionListener.this.server.getRateLimitPacketsPerSecond();
|
|
Object object = j > 0 ? new RateKickingConnection(j) : new Connection(PacketFlow.SERVERBOUND);
|