Fix packet duplicating at some points (#8566)

Due to the weakly consistent of ConcurrentLinkedQueue iterator, at some points, packet will be resent twice times or more, causing some weird behaviors (e.g. kicked for illegal movement since the same ClientboundPlayerPositionPacket was sent two times). This changes for the patch add a flag for marking if the packet was consumed to prevent such issue and ensure consistently of the packet queue.
This commit is contained in:
sandtechnology 2022-11-28 00:36:35 +08:00 committed by GitHub
parent f1583fcd74
commit 28b4027d45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 65 additions and 37 deletions

View file

@ -25,10 +25,10 @@ Also adds Netty Channel Flush Consolidation to reduce the amount of flushing
Also avoids spamming closed channel exception by rechecking closed state in dispatch
and then catch exceptions and close if they fire.
Part of this commit was authored by: Spottedleaf
Part of this commit was authored by: Spottedleaf, sandtechnology
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f7d57d800 100644
index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..8bc8b6013ea5803e091cf3534130b72278b9c29e 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -116,6 +116,10 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@ -160,7 +160,7 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
ChannelFuture channelfuture = this.channel.writeAndFlush(packet);
if (callbacks != null) {
@@ -275,28 +359,64 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -275,28 +359,72 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
});
}
@ -185,10 +185,6 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
}
- private void flushQueue() {
- try { // Paper - add pending task queue
- if (this.channel != null && this.channel.isOpen()) {
- Queue queue = this.queue;
-
+ // Paper start - rewrite this to be safer if ran off main thread
+ private boolean flushQueue() { // void -> boolean
+ if (!isConnected()) {
@ -198,19 +194,16 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
+ return processQueue();
+ } else if (isPending) {
+ // Should only happen during login/status stages
synchronized (this.queue) {
- Connection.PacketHolder networkmanager_queuedpacket;
-
- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
- }
+ synchronized (this.queue) {
+ return this.processQueue();
+ }
+ }
+ return false;
+ }
+ private boolean processQueue() {
+ try { // Paper - add pending task queue
try { // Paper - add pending task queue
- if (this.channel != null && this.channel.isOpen()) {
- Queue queue = this.queue;
+ if (this.queue.isEmpty()) return true;
+ // If we are on main, we are safe here in that nothing else should be processing queue off main anymore
+ // But if we are not on main due to login/status, the parent is synchronized on packetQueue
@ -223,19 +216,32 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
+ return true;
+ }
- synchronized (this.queue) {
- Connection.PacketHolder networkmanager_queuedpacket;
+ // Paper start - checking isConsumed flag and skipping packet sending
+ if (queued.isConsumed()) {
+ continue;
+ }
+ // Paper end - checking isConsumed flag and skipping packet sending
- while ((networkmanager_queuedpacket = (Connection.PacketHolder) this.queue.poll()) != null) {
- this.sendPacket(networkmanager_queuedpacket.packet, networkmanager_queuedpacket.listener);
+ Packet<?> packet = queued.packet;
+ if (!packet.isReady()) {
+ return false;
+ } else {
+ iterator.remove();
+ if (queued.tryMarkConsumed()) { // Paper - try to mark isConsumed flag for de-duplicating packet
+ this.sendPacket(packet, queued.listener);
}
-
}
}
+ return true;
} finally { // Paper start - add pending task queue
Runnable r;
while ((r = this.pendingTasks.poll()) != null) {
@@ -304,6 +424,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -304,6 +432,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
}
} // Paper end - add pending task queue
}
@ -243,7 +249,7 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
public void tick() {
this.flushQueue();
@@ -340,9 +461,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -340,9 +469,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
return this.address;
}
@ -266,7 +272,7 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
// Spigot End
if (this.channel.isOpen()) {
this.channel.close(); // We can't wait as this may be called from an event loop.
@@ -460,7 +594,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -460,7 +602,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public void handleDisconnection() {
if (this.channel != null && !this.channel.isOpen()) {
if (this.disconnectionHandled) {
@ -275,7 +281,7 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
} else {
this.disconnectionHandled = true;
if (this.getDisconnectedReason() != null) {
@@ -468,7 +602,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -468,7 +610,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
} else if (this.getPacketListener() != null) {
this.getPacketListener().onDisconnect(Component.translatable("multiplayer.disconnect.generic"));
}
@ -284,6 +290,25 @@ index 8b1c39cc7f77ca36d0341fb68de1441cc61f19e4..593ea68037b467797aeeaee331a0349f
// Paper start - Add PlayerConnectionCloseEvent
final PacketListener packetListener = this.getPacketListener();
if (packetListener instanceof net.minecraft.server.network.ServerGamePacketListenerImpl) {
@@ -508,6 +650,18 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@Nullable
final PacketSendListener listener;
+ // Paper start - isConsumed flag for the connection
+ private java.util.concurrent.atomic.AtomicBoolean isConsumed = new java.util.concurrent.atomic.AtomicBoolean(false);
+
+ public boolean tryMarkConsumed() {
+ return isConsumed.compareAndSet(false, true);
+ }
+
+ public boolean isConsumed() {
+ return isConsumed.get();
+ }
+ // Paper end - isConsumed flag for the connection
+
public PacketHolder(Packet<?> packet, @Nullable PacketSendListener callbacks) {
this.packet = packet;
this.listener = callbacks;
diff --git a/src/main/java/net/minecraft/network/protocol/Packet.java b/src/main/java/net/minecraft/network/protocol/Packet.java
index 74bfe0d3942259c45702b099efdc4e101a4e3022..e8fcd56906d26f6dc87959e32c4c7c78cfea9658 100644
--- a/src/main/java/net/minecraft/network/protocol/Packet.java

View file

@ -8,10 +8,10 @@ the world per tick, this attempts to reduce the impact that join floods
has on the server
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index bbedcdb71a326b3286805d0081e71c54a4312622..22e2e314a4bb1b22758130d4e9065f9b87b0116e 100644
index dca0c978b30962d4216dc443d7d105e54a29ae1b..3ae55dd6441b2dfdb67eb2f24ecf885bab89e7a6 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -426,8 +426,23 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -434,8 +434,23 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
}
// Paper end

View file

@ -122,10 +122,10 @@ index 0000000000000000000000000000000000000000..0d7e7db9e37ef0183c32b217bd944fb4
+ COMPRESSION_DISABLED
+}
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index de1fdb93e0e3acd58429b042629df8c00bfb65ad..13d33cd159eca63c98d7239f527c444c71519634 100644
index f76ee2131c35a9dbf7ad9d086b51c9644b0a2462..a7ab818440dc98087d877d7efdfafbf65e9ca92d 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -597,6 +597,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -605,6 +605,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
} else {
this.channel.pipeline().addBefore("encoder", "compress", new CompressionEncoder(compressionThreshold));
}
@ -133,7 +133,7 @@ index de1fdb93e0e3acd58429b042629df8c00bfb65ad..13d33cd159eca63c98d7239f527c444c
} else {
if (this.channel.pipeline().get("decompress") instanceof CompressionDecoder) {
this.channel.pipeline().remove("decompress");
@@ -605,6 +606,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -613,6 +614,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
if (this.channel.pipeline().get("compress") instanceof CompressionEncoder) {
this.channel.pipeline().remove("compress");
}

View file

@ -11,10 +11,10 @@ Tested-by: Mariell Hoversholm <proximyst@proximyst.com>
Reviewed-by: Mariell Hoversholm <proximyst@proximyst.com>
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 13d33cd159eca63c98d7239f527c444c71519634..7bece5bd90d8f372ead5aef473f077a2a1ef9fa2 100644
index a7ab818440dc98087d877d7efdfafbf65e9ca92d..ea6c1439e92c24194cde4dcf6388363a8cc5d649 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -671,6 +671,11 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -691,6 +691,11 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
// Spigot Start
public SocketAddress getRawAddress()
{

View file

@ -9,7 +9,7 @@ This patch will be used to optimise out flush calls in later
patches.
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 7bece5bd90d8f372ead5aef473f077a2a1ef9fa2..a71db5b49b1e6a094790d060db9f30a711581db0 100644
index ea6c1439e92c24194cde4dcf6388363a8cc5d649..40a0f68c42000757ba8b0e8bdbd4cd34cce89cfe 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -122,6 +122,39 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@ -120,7 +120,7 @@ index 7bece5bd90d8f372ead5aef473f077a2a1ef9fa2..a71db5b49b1e6a094790d060db9f30a7
// If we are on main, we are safe here in that nothing else should be processing queue off main anymore
// But if we are not on main due to login/status, the parent is synchronized on packetQueue
java.util.Iterator<PacketHolder> iterator = this.queue.iterator();
@@ -407,16 +457,22 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -407,7 +457,7 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
PacketHolder queued = iterator.next(); // poll -> peek
// Fix NPE (Spigot bug caused by handleDisconnection())
@ -129,6 +129,8 @@ index 7bece5bd90d8f372ead5aef473f077a2a1ef9fa2..a71db5b49b1e6a094790d060db9f30a7
return true;
}
@@ -419,11 +469,17 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
Packet<?> packet = queued.packet;
if (!packet.isReady()) {
+ // Paper start - make only one flush call per sendPacketQueue() call
@ -139,9 +141,10 @@ index 7bece5bd90d8f372ead5aef473f077a2a1ef9fa2..a71db5b49b1e6a094790d060db9f30a7
return false;
} else {
iterator.remove();
if (queued.tryMarkConsumed()) { // Paper - try to mark isConsumed flag for de-duplicating packet
- this.sendPacket(packet, queued.listener);
+ this.sendPacket(packet, queued.listener, (!iterator.hasNext() && (needsFlush || this.canFlush)) ? Boolean.TRUE : Boolean.FALSE); // Paper - make only one flush call per sendPacketQueue() call
+ hasWrotePacket = true; // Paper - make only one flush call per sendPacketQueue() call
}
}
return true;
}

View file

@ -7,10 +7,10 @@ Subject: [PATCH] Detail more information in watchdog dumps
- Dump player name, player uuid, position, and world for packet handling
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index a71db5b49b1e6a094790d060db9f30a711581db0..489ab7c7a66969501e60fbd44c16ba4cdc180d46 100644
index 40a0f68c42000757ba8b0e8bdbd4cd34cce89cfe..31dee6fbf50db4b1fe779a8ad67ac771ac7148b7 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -505,9 +505,15 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -513,9 +513,15 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
PacketListener packetlistener = this.packetListener;
if (packetlistener instanceof TickablePacketListener) {

View file

@ -268,10 +268,10 @@ index 792883afe53d2b7989c25a81c2f9a639d5e21d20..c04379ca8a4db0f4de46ad2b3b338431
return this.threshold;
}
diff --git a/src/main/java/net/minecraft/network/Connection.java b/src/main/java/net/minecraft/network/Connection.java
index 08b74302e99e596a99f142856ae33ee29a9b1b77..69b8d1276045cd6742770dcedd6246bb1713fd3b 100644
index 2bc49ccf7eb2f1f3f956da365340e958dbb076a7..f1e1a4a48349c0e431b31327fdf217989db027b3 100644
--- a/src/main/java/net/minecraft/network/Connection.java
+++ b/src/main/java/net/minecraft/network/Connection.java
@@ -682,11 +682,28 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -690,11 +690,28 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
return networkmanager;
}
@ -304,7 +304,7 @@ index 08b74302e99e596a99f142856ae33ee29a9b1b77..69b8d1276045cd6742770dcedd6246bb
public boolean isEncrypted() {
return this.encrypted;
@@ -715,16 +732,17 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
@@ -723,16 +740,17 @@ public class Connection extends SimpleChannelInboundHandler<Packet<?>> {
public void setupCompression(int compressionThreshold, boolean rejectsBadPackets) {
if (compressionThreshold >= 0) {