Readd network optimization patch
This commit is contained in:
parent
a50d7c340b
commit
a353f4d207
725 changed files with 421 additions and 390 deletions
|
@ -0,0 +1,390 @@
|
|||
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
||||
From: Aikar <aikar@aikar.co>
|
||||
Date: Wed, 6 May 2020 04:53:35 -0400
|
||||
Subject: [PATCH] Optimize Network Manager and add advanced packet support
|
||||
|
||||
Adds ability for 1 packet to bundle other packets to follow it
|
||||
Adds ability for a packet to delay sending more packets until a state is ready.
|
||||
|
||||
Removes synchronization from sending packets
|
||||
Removes processing packet queue off of main thread
|
||||
- for the few cases where it is allowed, order is not necessary nor
|
||||
should it even be happening concurrently in first place (handshaking/login/status)
|
||||
|
||||
Ensures packets sent asynchronously are dispatched on main thread
|
||||
|
||||
This helps ensure safety for ProtocolLib as packet listeners
|
||||
are commonly accessing world state. This will allow you to schedule
|
||||
a packet to be sent async, but itll be dispatched sync for packet
|
||||
listeners to process.
|
||||
|
||||
This should solve some deadlock risks
|
||||
|
||||
Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
|
||||
|
||||
Also avoids spamming closed channel exception by rechecking closed state in dispatch
|
||||
and then catch exceptions and close if they fire.
|
||||
|
||||
Part of this commit was authored by: Spottedleaf, sandtechnology
|
||||
|
||||
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
||||
index 9f8fbb8231b60c287c9442365c2f95cde92969b8..d8ae86dcc07872c63064782c9864b873f07139d1 100644
|
||||
--- a/src/main/java/net/minecraft/network/Connection.java
|
||||
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||||
@@ -84,7 +84,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build());
|
||||
});
|
||||
private final PacketFlow receiving;
|
||||
- private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||
+ private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||
public Channel channel;
|
||||
public SocketAddress address;
|
||||
// Spigot Start
|
||||
@@ -116,6 +116,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public java.net.InetSocketAddress virtualHost;
|
||||
private static boolean enableExplicitFlush = Boolean.getBoolean("paper.explicit-flush");
|
||||
// Paper end
|
||||
+ // Paper start - Optimize network
|
||||
+ public boolean isPending = true;
|
||||
+ public boolean queueImmunity;
|
||||
+ // Paper end - Optimize network
|
||||
|
||||
// Paper start - add utility methods
|
||||
public final net.minecraft.server.level.ServerPlayer getPlayer() {
|
||||
@@ -307,15 +311,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
|
||||
public void send(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||
- if (this.isConnected()) {
|
||||
- this.flushQueue();
|
||||
+ // Paper start - Optimize network: Handle oversized packets better
|
||||
+ final boolean connected = this.isConnected();
|
||||
+ if (!connected && !this.preparing) {
|
||||
+ return;
|
||||
+ }
|
||||
+
|
||||
+ packet.onPacketDispatch(this.getPlayer());
|
||||
+ if (connected && (InnerUtil.canSendImmediate(this, packet)
|
||||
+ || (io.papermc.paper.util.MCUtil.isMainThread() && packet.isReady() && this.pendingActions.isEmpty()
|
||||
+ && (packet.getExtraPackets() == null || packet.getExtraPackets().isEmpty())))) {
|
||||
this.sendPacket(packet, callbacks, flush);
|
||||
} else {
|
||||
- this.pendingActions.add((networkmanager) -> {
|
||||
- networkmanager.sendPacket(packet, callbacks, flush);
|
||||
- });
|
||||
- }
|
||||
+ // Write the packets to the queue, then flush - antixray hooks there already
|
||||
+ final java.util.List<Packet<?>> extraPackets = InnerUtil.buildExtraPackets(packet);
|
||||
+ final boolean hasExtraPackets = extraPackets != null && !extraPackets.isEmpty();
|
||||
+ if (!hasExtraPackets) {
|
||||
+ this.pendingActions.add(new PacketSendAction(packet, callbacks, flush));
|
||||
+ } else {
|
||||
+ final java.util.List<PacketSendAction> actions = new java.util.ArrayList<>(1 + extraPackets.size());
|
||||
+ actions.add(new PacketSendAction(packet, null, false)); // Delay the future listener until the end of the extra packets
|
||||
|
||||
+ for (int i = 0, len = extraPackets.size(); i < len;) {
|
||||
+ final Packet<?> extraPacket = extraPackets.get(i);
|
||||
+ final boolean end = ++i == len;
|
||||
+ actions.add(new PacketSendAction(extraPacket, end ? callbacks : null, end)); // Append listener to the end
|
||||
+ }
|
||||
+
|
||||
+ this.pendingActions.addAll(actions);
|
||||
+ }
|
||||
+
|
||||
+ this.flushQueue();
|
||||
+ // Paper end - Optimize network
|
||||
+ }
|
||||
}
|
||||
|
||||
public void runOnceConnected(Consumer<Connection> task) {
|
||||
@@ -323,7 +351,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
this.flushQueue();
|
||||
task.accept(this);
|
||||
} else {
|
||||
- this.pendingActions.add(task);
|
||||
+ this.pendingActions.add(new WrappedConsumer(task)); // Paper - Optimize network
|
||||
}
|
||||
|
||||
}
|
||||
@@ -341,6 +369,14 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
}
|
||||
|
||||
private void doSendPacket(Packet<?> packet, @Nullable PacketSendListener callbacks, boolean flush) {
|
||||
+ // Paper start - Optimize network
|
||||
+ final net.minecraft.server.level.ServerPlayer player = this.getPlayer();
|
||||
+ if (!this.isConnected()) {
|
||||
+ packet.onPacketDispatchFinish(player, null);
|
||||
+ return;
|
||||
+ }
|
||||
+ try {
|
||||
+ // Paper end - Optimize network
|
||||
ChannelFuture channelfuture = flush ? this.channel.writeAndFlush(packet) : this.channel.write(packet);
|
||||
|
||||
if (callbacks != null) {
|
||||
@@ -360,14 +396,24 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
});
|
||||
}
|
||||
|
||||
+ // Paper start - Optimize network
|
||||
+ if (packet.hasFinishListener()) {
|
||||
+ channelfuture.addListener((ChannelFutureListener) channelFuture -> packet.onPacketDispatchFinish(player, channelFuture));
|
||||
+ }
|
||||
channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
|
||||
+ } catch (final Exception e) {
|
||||
+ LOGGER.error("NetworkException: {}", player, e);
|
||||
+ this.disconnect(Component.translatable("disconnect.genericReason", "Internal Exception: " + e.getMessage()));
|
||||
+ packet.onPacketDispatchFinish(player, null);
|
||||
+ }
|
||||
+ // Paper end - Optimize network
|
||||
}
|
||||
|
||||
public void flushChannel() {
|
||||
if (this.isConnected()) {
|
||||
this.flush();
|
||||
} else {
|
||||
- this.pendingActions.add(Connection::flush);
|
||||
+ this.pendingActions.add(new WrappedConsumer(Connection::flush)); // Paper - Optimize network
|
||||
}
|
||||
|
||||
}
|
||||
@@ -400,20 +446,57 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
return attributekey;
|
||||
}
|
||||
|
||||
- private void flushQueue() {
|
||||
- if (this.channel != null && this.channel.isOpen()) {
|
||||
- Queue queue = this.pendingActions;
|
||||
-
|
||||
+ // Paper start - Optimize network: Rewrite this to be safer if ran off main thread
|
||||
+ private boolean flushQueue() {
|
||||
+ if (!this.isConnected()) {
|
||||
+ return true;
|
||||
+ }
|
||||
+ if (io.papermc.paper.util.MCUtil.isMainThread()) {
|
||||
+ return this.processQueue();
|
||||
+ } else if (this.isPending) {
|
||||
+ // Should only happen during login/status stages
|
||||
synchronized (this.pendingActions) {
|
||||
- Consumer consumer;
|
||||
+ return this.processQueue();
|
||||
+ }
|
||||
+ }
|
||||
+ return false;
|
||||
+ }
|
||||
+
|
||||
+ private boolean processQueue() {
|
||||
+ if (this.pendingActions.isEmpty()) {
|
||||
+ return true;
|
||||
+ }
|
||||
|
||||
- while ((consumer = (Consumer) this.pendingActions.poll()) != null) {
|
||||
- consumer.accept(this);
|
||||
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
|
||||
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
|
||||
+ final java.util.Iterator<WrappedConsumer> iterator = this.pendingActions.iterator();
|
||||
+ while (iterator.hasNext()) {
|
||||
+ final WrappedConsumer queued = iterator.next(); // poll -> peek
|
||||
+
|
||||
+ // Fix NPE (Spigot bug caused by handleDisconnection())
|
||||
+ if (queued == null) {
|
||||
+ return true;
|
||||
+ }
|
||||
+
|
||||
+ if (queued.isConsumed()) {
|
||||
+ continue;
|
||||
+ }
|
||||
+
|
||||
+ if (queued instanceof PacketSendAction packetSendAction) {
|
||||
+ final Packet<?> packet = packetSendAction.packet;
|
||||
+ if (!packet.isReady()) {
|
||||
+ return false;
|
||||
}
|
||||
+ }
|
||||
|
||||
+ iterator.remove();
|
||||
+ if (queued.tryMarkConsumed()) {
|
||||
+ queued.accept(this);
|
||||
}
|
||||
}
|
||||
+ return true;
|
||||
}
|
||||
+ // Paper end - Optimize network
|
||||
|
||||
public void tick() {
|
||||
this.flushQueue();
|
||||
@@ -461,6 +544,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void disconnect(Component disconnectReason) {
|
||||
// Spigot Start
|
||||
this.preparing = false;
|
||||
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||
// Spigot End
|
||||
if (this.channel == null) {
|
||||
this.delayedDisconnect = disconnectReason;
|
||||
@@ -630,7 +714,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void handleDisconnection() {
|
||||
if (this.channel != null && !this.channel.isOpen()) {
|
||||
if (this.disconnectionHandled) {
|
||||
- Connection.LOGGER.warn("handleDisconnection() called twice");
|
||||
+ // Connection.LOGGER.warn("handleDisconnection() called twice"); // Paper - Don't log useless message
|
||||
} else {
|
||||
this.disconnectionHandled = true;
|
||||
PacketListener packetlistener = this.getPacketListener();
|
||||
@@ -643,7 +727,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
|
||||
packetlistener1.onDisconnect(ichatbasecomponent);
|
||||
}
|
||||
- this.pendingActions.clear(); // Free up packet queue.
|
||||
+ this.clearPacketQueue(); // Paper - Optimize network
|
||||
// Paper start - Add PlayerConnectionCloseEvent
|
||||
final PacketListener packetListener = this.getPacketListener();
|
||||
if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) {
|
||||
@@ -681,4 +765,89 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
public void setBandwidthLogger(SampleLogger log) {
|
||||
this.bandwidthDebugMonitor = new BandwidthDebugMonitor(log);
|
||||
}
|
||||
+
|
||||
+ // Paper start - Optimize network
|
||||
+ public void clearPacketQueue() {
|
||||
+ final net.minecraft.server.level.ServerPlayer player = getPlayer();
|
||||
+ for (final Consumer<Connection> queuedAction : this.pendingActions) {
|
||||
+ if (queuedAction instanceof PacketSendAction packetSendAction) {
|
||||
+ final Packet<?> packet = packetSendAction.packet;
|
||||
+ if (packet.hasFinishListener()) {
|
||||
+ packet.onPacketDispatchFinish(player, null);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+ this.pendingActions.clear();
|
||||
+ }
|
||||
+
|
||||
+ private static class InnerUtil { // Attempt to hide these methods from ProtocolLib, so it doesn't accidently pick them up.
|
||||
+
|
||||
+ @Nullable
|
||||
+ private static java.util.List<Packet<?>> buildExtraPackets(final Packet<?> packet) {
|
||||
+ final java.util.List<Packet<?>> extra = packet.getExtraPackets();
|
||||
+ if (extra == null || extra.isEmpty()) {
|
||||
+ return null;
|
||||
+ }
|
||||
+
|
||||
+ final java.util.List<Packet<?>> ret = new java.util.ArrayList<>(1 + extra.size());
|
||||
+ buildExtraPackets0(extra, ret);
|
||||
+ return ret;
|
||||
+ }
|
||||
+
|
||||
+ private static void buildExtraPackets0(final java.util.List<Packet<?>> extraPackets, final java.util.List<Packet<?>> into) {
|
||||
+ for (final Packet<?> extra : extraPackets) {
|
||||
+ into.add(extra);
|
||||
+ final java.util.List<Packet<?>> extraExtra = extra.getExtraPackets();
|
||||
+ if (extraExtra != null && !extraExtra.isEmpty()) {
|
||||
+ buildExtraPackets0(extraExtra, into);
|
||||
+ }
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ private static boolean canSendImmediate(final Connection networkManager, final net.minecraft.network.protocol.Packet<?> packet) {
|
||||
+ return networkManager.isPending || networkManager.packetListener.protocol() != ConnectionProtocol.PLAY ||
|
||||
+ packet instanceof net.minecraft.network.protocol.common.ClientboundKeepAlivePacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundPlayerChatPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSystemChatPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundCommandSuggestionsPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitleTextPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetSubtitleTextPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetActionBarTextPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundSetTitlesAnimationPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundClearTitlesPacket ||
|
||||
+ packet instanceof net.minecraft.network.protocol.game.ClientboundBossEventPacket;
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ private static class WrappedConsumer implements Consumer<Connection> {
|
||||
+ private final Consumer<Connection> delegate;
|
||||
+ private final java.util.concurrent.atomic.AtomicBoolean consumed = new java.util.concurrent.atomic.AtomicBoolean(false);
|
||||
+
|
||||
+ private WrappedConsumer(final Consumer<Connection> delegate) {
|
||||
+ this.delegate = delegate;
|
||||
+ }
|
||||
+
|
||||
+ @Override
|
||||
+ public void accept(final Connection connection) {
|
||||
+ this.delegate.accept(connection);
|
||||
+ }
|
||||
+
|
||||
+ public boolean tryMarkConsumed() {
|
||||
+ return consumed.compareAndSet(false, true);
|
||||
+ }
|
||||
+
|
||||
+ public boolean isConsumed() {
|
||||
+ return consumed.get();
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ private static final class PacketSendAction extends WrappedConsumer {
|
||||
+ private final Packet<?> packet;
|
||||
+
|
||||
+ private PacketSendAction(final Packet<?> packet, @Nullable final PacketSendListener packetSendListener, final boolean flush) {
|
||||
+ super(connection -> connection.sendPacket(packet, packetSendListener, flush));
|
||||
+ this.packet = packet;
|
||||
+ }
|
||||
+ }
|
||||
+ // Paper end - Optimize network
|
||||
}
|
||||
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
index cc658a61065d5c0021a4b88fa58b40211b94f8ec..da11266a0a23f446196e6facf2c358cfcc18070f 100644
|
||||
--- a/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
+++ b/src/main/java/net/minecraft/network/protocol/Packet.java
|
||||
@@ -11,6 +11,30 @@ public interface Packet<T extends PacketListener> {
|
||||
void handle(T listener);
|
||||
|
||||
// Paper start
|
||||
+ /**
|
||||
+ * @param player Null if not at PLAY stage yet
|
||||
+ */
|
||||
+ default void onPacketDispatch(@Nullable net.minecraft.server.level.ServerPlayer player) {
|
||||
+ }
|
||||
+
|
||||
+ /**
|
||||
+ * @param player Null if not at PLAY stage yet
|
||||
+ * @param future Can be null if packet was cancelled
|
||||
+ */
|
||||
+ default void onPacketDispatchFinish(@Nullable net.minecraft.server.level.ServerPlayer player, @Nullable io.netty.channel.ChannelFuture future) {}
|
||||
+
|
||||
+ default boolean hasFinishListener() {
|
||||
+ return false;
|
||||
+ }
|
||||
+
|
||||
+ default boolean isReady() {
|
||||
+ return true;
|
||||
+ }
|
||||
+
|
||||
+ @Nullable
|
||||
+ default java.util.List<Packet<?>> getExtraPackets() {
|
||||
+ return null;
|
||||
+ }
|
||||
default boolean packetTooLarge(net.minecraft.network.Connection manager) {
|
||||
return false;
|
||||
}
|
||||
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
index d870fbec236a3660f12e0f45cf9431067b18468b..caeead6c6082855f1651ee28263cc9f60423ca0c 100644
|
||||
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
@@ -63,10 +63,12 @@ public class ServerConnectionListener {
|
||||
final List<Connection> connections = Collections.synchronizedList(Lists.newArrayList());
|
||||
// Paper start - prevent blocking on adding a new network manager while the server is ticking
|
||||
private final java.util.Queue<Connection> pending = new java.util.concurrent.ConcurrentLinkedQueue<>();
|
||||
+ private static final boolean disableFlushConsolidation = Boolean.getBoolean("Paper.disableFlushConsolidate"); // Paper - Optimize network
|
||||
private final void addPending() {
|
||||
Connection manager = null;
|
||||
while ((manager = pending.poll()) != null) {
|
||||
connections.add(manager);
|
||||
+ manager.isPending = false; // Paper - Optimize network
|
||||
}
|
||||
}
|
||||
// Paper end
|
||||
@@ -103,6 +105,7 @@ public class ServerConnectionListener {
|
||||
;
|
||||
}
|
||||
|
||||
+ if (!disableFlushConsolidation) channel.pipeline().addFirst(new io.netty.handler.flush.FlushConsolidationHandler()); // Paper - Optimize network
|
||||
ChannelPipeline channelpipeline = channel.pipeline().addLast("timeout", new ReadTimeoutHandler(30)).addLast("legacy_query", new LegacyQueryHandler(ServerConnectionListener.this.getServer()));
|
||||
|
||||
Connection.configureSerialization(channelpipeline, PacketFlow.SERVERBOUND, (BandwidthDebugMonitor) null);
|
|
@ -494,7 +494,7 @@ index 52eb3176437113f9a0ff85d10ce5c2415e1b5570..b54ddd0ba0b001fbcb1838a838ca4890
|
|||
}
|
||||
}
|
||||
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
|
||||
index 9f8fbb8231b60c287c9442365c2f95cde92969b8..4f33d7369c95f4d2afdef1cc333a7713f08d253d 100644
|
||||
index d8ae86dcc07872c63064782c9864b873f07139d1..af4ded7a31da34ccebb3cfe9c1eb5d5c918950b1 100644
|
||||
--- a/src/main/java/net/minecraft/network/Connection.java
|
||||
+++ b/src/main/java/net/minecraft/network/Connection.java
|
||||
@@ -75,13 +75,13 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
|
@ -513,8 +513,8 @@ index 9f8fbb8231b60c287c9442365c2f95cde92969b8..4f33d7369c95f4d2afdef1cc333a7713
|
|||
+ return new DefaultEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).setUncaughtExceptionHandler(new net.minecraft.DefaultUncaughtExceptionHandlerWithName(LOGGER)).build()); // Paper
|
||||
});
|
||||
private final PacketFlow receiving;
|
||||
private final Queue<Consumer<Connection>> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||
@@ -207,7 +207,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
private final Queue<WrappedConsumer> pendingActions = Queues.newConcurrentLinkedQueue();
|
||||
@@ -211,7 +211,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -573,7 +573,7 @@ index 77e19f345bf68d12686a65e669cd597cd92af910..8dbcc1b3a70b6bbea3bd2d15b6d66cc4
|
|||
// Paper end
|
||||
this.connection.send(new ClientboundDisconnectPacket(ServerConfigurationPacketListenerImpl.DISCONNECT_REASON_INVALID_DATA));
|
||||
diff --git a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
index d870fbec236a3660f12e0f45cf9431067b18468b..c98d3c5afcce17a6f13f8d456b9f77b1f2a4f3f0 100644
|
||||
index caeead6c6082855f1651ee28263cc9f60423ca0c..b2bfb3893200362ac35ae60982f203f86a1148ec 100644
|
||||
--- a/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
+++ b/src/main/java/net/minecraft/server/network/ServerConnectionListener.java
|
||||
@@ -52,10 +52,10 @@ public class ServerConnectionListener {
|
Some files were not shown because too many files have changed in this diff Show more
Loading…
Reference in a new issue