Do not allow ticket level decreases to be processed asynchronously
Note: This cannot happen on the Fabric/NeoForge versions since async ticket level processing is not allowed, but can happen on Paper. This change is made here so that Paper can remain in sync. Ticket level decreases may be handled asynchronously when the off-thread invokes processTicketUpdates() when the main thread is running ChunkHolderManager#tick(). This is because the ticket update is queued during tick(), but not executed (invoking processTicketUpdates) until after releasing the ticket lock. This creates a small window for an off-thread to invoke processTicketUpdates() and steal the update. When the update is stolen, the full chunk status update (if any) will be eventually queued to execute via the chunk task queue. If the chunk queue is processed during the server tick at any point other than the ChunkHolderManager tick, then any ticket level decrease will violate an important invariant in the Moonrise chunk system: ticket level decreases only occur during ChunkHolderManager tick. This invariant exists to make interfacing with the chunk system easier, especially working with off-thread contexts. This change is specifically made to work towards fixing https://github.com/PaperMC/Folia/issues/363
This commit is contained in:
parent
219f86ee06
commit
e4eb69b8a1
1 changed files with 64 additions and 44 deletions
|
@ -6874,12 +6874,13 @@ index 0000000000000000000000000000000000000000..7eafc5b7cba23d8dec92ecc1050afe3f
|
|||
\ No newline at end of file
|
||||
diff --git a/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java b/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java
|
||||
new file mode 100644
|
||||
index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e5796a123
|
||||
index 0000000000000000000000000000000000000000..6ce4a98e4d3b633e3c87944c23b6b3f0ff58f159
|
||||
--- /dev/null
|
||||
+++ b/ca/spottedleaf/moonrise/patches/chunk_system/scheduling/ChunkHolderManager.java
|
||||
@@ -0,0 +1,1541 @@
|
||||
@@ -0,0 +1,1561 @@
|
||||
+package ca.spottedleaf.moonrise.patches.chunk_system.scheduling;
|
||||
+
|
||||
+import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
|
||||
+import ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock;
|
||||
+import ca.spottedleaf.concurrentutil.map.ConcurrentLong2ReferenceChainedHashTable;
|
||||
+import ca.spottedleaf.concurrentutil.util.Priority;
|
||||
|
@ -6962,6 +6963,7 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ private long currentTick;
|
||||
+
|
||||
+ private final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = new ArrayDeque<>();
|
||||
+ private final MultiThreadedQueue<NewChunkHolder> offThreadPendingFullLoadUpdate = new MultiThreadedQueue<>();
|
||||
+ private final ObjectRBTreeSet<NewChunkHolder> autoSaveQueue = new ObjectRBTreeSet<>((final NewChunkHolder c1, final NewChunkHolder c2) -> {
|
||||
+ if (c1 == c2) {
|
||||
+ return 0;
|
||||
|
@ -6992,20 +6994,20 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ this.unloadQueue = new ChunkUnloadQueue(((ChunkSystemServerLevel)world).moonrise$getRegionChunkShift());
|
||||
+ }
|
||||
+
|
||||
+ public boolean processTicketUpdates(final int posX, final int posZ) {
|
||||
+ public boolean processTicketUpdates(final int chunkX, final int chunkZ) {
|
||||
+ final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT;
|
||||
+ final int ticketMask = (1 << ticketShift) - 1;
|
||||
+ final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
+ final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
+ final boolean ret;
|
||||
+ final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock(
|
||||
+ ((posX >> ticketShift) - 1) << ticketShift,
|
||||
+ ((posZ >> ticketShift) - 1) << ticketShift,
|
||||
+ (((posX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
+ (((posZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
+ ((chunkX >> ticketShift) - 1) << ticketShift,
|
||||
+ ((chunkZ >> ticketShift) - 1) << ticketShift,
|
||||
+ (((chunkX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
+ (((chunkZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
+ );
|
||||
+ try {
|
||||
+ ret = this.processTicketUpdatesNoLock(posX >> ticketShift, posZ >> ticketShift, scheduledTasks, changedFullStatus);
|
||||
+ ret = this.processTicketUpdatesNoLock(chunkX >> ticketShift, chunkZ >> ticketShift, scheduledTasks, changedFullStatus);
|
||||
+ } finally {
|
||||
+ this.ticketLockArea.unlock(ticketLock);
|
||||
+ }
|
||||
|
@ -7711,6 +7713,9 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ return removeDelay <= 0L;
|
||||
+ };
|
||||
+
|
||||
+ final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
+ final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
+
|
||||
+ for (final PrimitiveIterator.OfLong iterator = this.sectionToChunkToExpireCount.keyIterator(); iterator.hasNext();) {
|
||||
+ final long sectionKey = iterator.nextLong();
|
||||
+
|
||||
|
@ -7719,9 +7724,16 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ continue;
|
||||
+ }
|
||||
+
|
||||
+ final int lowerChunkX = CoordinateUtils.getChunkX(sectionKey) << sectionShift;
|
||||
+ final int lowerChunkZ = CoordinateUtils.getChunkZ(sectionKey) << sectionShift;
|
||||
+
|
||||
+ final int ticketShift = ThreadedTicketLevelPropagator.SECTION_SHIFT;
|
||||
+ final int ticketMask = (1 << ticketShift) - 1;
|
||||
+ final ReentrantAreaLock.Node ticketLock = this.ticketLockArea.lock(
|
||||
+ CoordinateUtils.getChunkX(sectionKey) << sectionShift,
|
||||
+ CoordinateUtils.getChunkZ(sectionKey) << sectionShift
|
||||
+ ((lowerChunkX >> ticketShift) - 1) << ticketShift,
|
||||
+ ((lowerChunkZ >> ticketShift) - 1) << ticketShift,
|
||||
+ (((lowerChunkX >> ticketShift) + 1) << ticketShift) | ticketMask,
|
||||
+ (((lowerChunkZ >> ticketShift) + 1) << ticketShift) | ticketMask
|
||||
+ );
|
||||
+
|
||||
+ try {
|
||||
|
@ -7768,9 +7780,23 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ if (chunkToExpireCount.isEmpty()) {
|
||||
+ this.sectionToChunkToExpireCount.remove(sectionKey);
|
||||
+ }
|
||||
+
|
||||
+ // In order to prevent a race condition where an off-thread invokes processTicketUpdates(), we need to process ticket updates here
|
||||
+ // so that we catch any additions to the changed full status list. If an off-thread were to process tickets here, it would not be guaranteed
|
||||
+ // that it would be added to the full changed status set by the end of the call - possibly allowing ticket level decreases to be processed
|
||||
+ // outside of this call, which is not an intended or expected of this chunk system.
|
||||
+ this.processTicketUpdatesNoLock(lowerChunkX >> ThreadedTicketLevelPropagator.SECTION_SHIFT, lowerChunkZ >> ThreadedTicketLevelPropagator.SECTION_SHIFT, scheduledTasks, changedFullStatus);
|
||||
+ } finally {
|
||||
+ this.ticketLockArea.unlock(ticketLock);
|
||||
+ }
|
||||
+
|
||||
+ this.addChangedStatuses(changedFullStatus);
|
||||
+ changedFullStatus.clear(); // clear for next loop iteration
|
||||
+
|
||||
+ for (int i = 0, len = scheduledTasks.size(); i < len; ++i) {
|
||||
+ scheduledTasks.get(i).schedule();
|
||||
+ }
|
||||
+ scheduledTasks.clear(); // clear for next loop iteration
|
||||
+ }
|
||||
+
|
||||
+ this.processTicketUpdates();
|
||||
|
@ -7997,14 +8023,9 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ return;
|
||||
+ }
|
||||
+ if (!TickThread.isTickThread()) {
|
||||
+ this.taskScheduler.scheduleChunkTask(() -> {
|
||||
+ final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = ChunkHolderManager.this.pendingFullLoadUpdate;
|
||||
+ for (int i = 0, len = changedFullStatus.size(); i < len; ++i) {
|
||||
+ pendingFullLoadUpdate.add(changedFullStatus.get(i));
|
||||
+ }
|
||||
+
|
||||
+ ChunkHolderManager.this.processPendingFullUpdate();
|
||||
+ }, Priority.HIGHEST);
|
||||
+ // These will be handled on the next ServerChunkCache$MainThreadExecutor#pollTask, as it runs the distance manager update
|
||||
+ // which will invoke processTicketUpdates
|
||||
+ this.offThreadPendingFullLoadUpdate.addAll(changedFullStatus);
|
||||
+ } else {
|
||||
+ final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
+ for (int i = 0, len = changedFullStatus.size(); i < len; ++i) {
|
||||
|
@ -8285,36 +8306,20 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ }
|
||||
+
|
||||
+ public boolean processTicketUpdates() {
|
||||
+ return this.processTicketUpdates(true, null);
|
||||
+ }
|
||||
+
|
||||
+ private static final ThreadLocal<List<ChunkProgressionTask>> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>();
|
||||
+
|
||||
+ static List<ChunkProgressionTask> getCurrentTicketUpdateScheduling() {
|
||||
+ return CURRENT_TICKET_UPDATE_SCHEDULING.get();
|
||||
+ }
|
||||
+
|
||||
+ private boolean processTicketUpdates(final boolean processFullUpdates, List<ChunkProgressionTask> scheduledTasks) {
|
||||
+ if (BLOCK_TICKET_UPDATES.get() == Boolean.TRUE) {
|
||||
+ throw new IllegalStateException("Cannot update ticket level while unloading chunks or updating entity manager");
|
||||
+ }
|
||||
+ if (!PlatformHooks.get().allowAsyncTicketUpdates() && !TickThread.isTickThread()) {
|
||||
+ final boolean isTickThread = TickThread.isTickThread();
|
||||
+
|
||||
+ if (!PlatformHooks.get().allowAsyncTicketUpdates() && isTickThread) {
|
||||
+ TickThread.ensureTickThread("Cannot asynchronously process ticket updates");
|
||||
+ }
|
||||
+
|
||||
+ List<NewChunkHolder> changedFullStatus = null;
|
||||
+
|
||||
+ final boolean isTickThread = TickThread.isTickThread();
|
||||
+
|
||||
+ boolean ret = false;
|
||||
+ final boolean canProcessFullUpdates = processFullUpdates & isTickThread;
|
||||
+ final boolean canProcessScheduling = scheduledTasks == null;
|
||||
+
|
||||
+ if (this.ticketLevelPropagator.hasPendingUpdates()) {
|
||||
+ if (scheduledTasks == null) {
|
||||
+ scheduledTasks = new ArrayList<>();
|
||||
+ }
|
||||
+ changedFullStatus = new ArrayList<>();
|
||||
+ final List<ChunkProgressionTask> scheduledTasks = new ArrayList<>();
|
||||
+ final List<NewChunkHolder> changedFullStatus = new ArrayList<>();
|
||||
+
|
||||
+ this.blockTicketUpdates();
|
||||
+ try {
|
||||
|
@ -8325,27 +8330,42 @@ index 0000000000000000000000000000000000000000..f473999938840562b1007a789600342e
|
|||
+ } finally {
|
||||
+ this.unblockTicketUpdates(Boolean.FALSE);
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ if (changedFullStatus != null) {
|
||||
+ this.addChangedStatuses(changedFullStatus);
|
||||
+ }
|
||||
+
|
||||
+ if (canProcessScheduling && scheduledTasks != null) {
|
||||
+ for (int i = 0, len = scheduledTasks.size(); i < len; ++i) {
|
||||
+ scheduledTasks.get(i).schedule();
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ if (canProcessFullUpdates) {
|
||||
+ if (isTickThread) {
|
||||
+ ret |= this.processPendingFullUpdate();
|
||||
+ }
|
||||
+
|
||||
+ return ret;
|
||||
+ }
|
||||
+
|
||||
+ private static final ThreadLocal<List<ChunkProgressionTask>> CURRENT_TICKET_UPDATE_SCHEDULING = new ThreadLocal<>();
|
||||
+
|
||||
+ static List<ChunkProgressionTask> getCurrentTicketUpdateScheduling() {
|
||||
+ return CURRENT_TICKET_UPDATE_SCHEDULING.get();
|
||||
+ }
|
||||
+
|
||||
+ // only call on tick thread
|
||||
+ private void processOffThreadFullUpdates() {
|
||||
+ final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
+ final MultiThreadedQueue<NewChunkHolder> offThreadPendingFullLoadUpdate = this.offThreadPendingFullLoadUpdate;
|
||||
+
|
||||
+ NewChunkHolder toUpdate;
|
||||
+ while ((toUpdate = offThreadPendingFullLoadUpdate.poll()) != null) {
|
||||
+ pendingFullLoadUpdate.add(toUpdate);
|
||||
+ }
|
||||
+ }
|
||||
+
|
||||
+ // only call on tick thread
|
||||
+ private boolean processPendingFullUpdate() {
|
||||
+ this.processOffThreadFullUpdates();
|
||||
+
|
||||
+ final ArrayDeque<NewChunkHolder> pendingFullLoadUpdate = this.pendingFullLoadUpdate;
|
||||
+
|
||||
+ boolean ret = false;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue