papermc/Spigot-Server-Patches/0275-Improved-Async-Task-Scheduler.patch
Aikar 7dfbe44247
Improved Async Task Scheduler
The Craft Scheduler still uses the primary thread for task scheduling.
This results in the main thread still having to do work as part of the
dispatching of async tasks.

If plugins make use of lots of async tasks, such as particle emitters
that want to keep the logic off the main thread, the main thread still
receives quite a bit of load from processing all of these queued tasks.

Additionally, resizing and managing the pending entries for all of
these asynchronous tasks takes up time on the main thread too.

This commit replaces the implementation of the scheduler when working
with asynchronous tasks, by forwarding calls to the new scheduler.

The Async Scheduler uses a single thread executor for "management" tasks.
The Management Thread is responsible for all adding and dispatching of
scheduled tasks.

The mainThreadHeartbeat will send a heartbeat task to the management thread
with the currentTick value, so that it can find which tasks to execute.

Scheduling of an async task also dispatches a management task, ensuring
that any Queue resizing operation occurs off of the main thread.

The async queue uses a completely separate PriorityQueue, ensuring that resize
operations are decoupled from the sync tasks queue.

Additionally, an optimization was made: if a plugin schedules
a single, non-repeating, no-delay task, we immediately dispatch it
to the executor pool instead of scheduling it. This avoids an unnecessary
round trip through the queue, and also reduces the size growth of the
queue if a plugin schedules lots of asynchronous tasks.
2018-03-16 23:09:51 -04:00

386 lines
16 KiB
Diff

From 64c30d8a2caa9ff443e46f399839579d9b4517c2 Mon Sep 17 00:00:00 2001
From: Aikar <aikar@aikar.co>
Date: Fri, 16 Mar 2018 22:59:43 -0400
Subject: [PATCH] Improved Async Task Scheduler
The Craft Scheduler still uses the primary thread for task scheduling.
This results in the main thread still having to do work as part of the
dispatching of async tasks.
If plugins make use of lots of async tasks, such as particle emitters
that want to keep the logic off the main thread, the main thread still
receives quite a bit of load from processing all of these queued tasks.
Additionally, resizing and managing the pending entries for all of
these asynchronous tasks takes up time on the main thread too.
This commit replaces the implementation of the scheduler when working
with asynchronous tasks, by forwarding calls to the new scheduler.
The Async Scheduler uses a single thread executor for "management" tasks.
The Management Thread is responsible for all adding and dispatching of
scheduled tasks.
The mainThreadHeartbeat will send a heartbeat task to the management thread
with the currentTick value, so that it can find which tasks to execute.
Scheduling of an async task also dispatches a management task, ensuring
that any Queue resizing operation occurs off of the main thread.
The async queue uses a completely separate PriorityQueue, ensuring that resize
operations are decoupled from the sync tasks queue.
Additionally, an optimization was made: if a plugin schedules
a single, non-repeating, no-delay task, we immediately dispatch it
to the executor pool instead of scheduling it. This avoids an unnecessary
round trip through the queue, and also reduces the size growth of the
queue if a plugin schedules lots of asynchronous tasks.
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java
new file mode 100644
index 000000000..b1efbc3e7
--- /dev/null
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2018 Daniel Ennis (Aikar) MIT License
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+package org.bukkit.craftbukkit.scheduler;
+
+import com.destroystokyo.paper.ServerSchedulerReportingWrapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.bukkit.plugin.Plugin;
+import org.bukkit.scheduler.BukkitTask;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Scheduler used for asynchronous tasks.
+ *
+ * <p>Queue additions, single-task removals and the per-tick dispatch are posted to a
+ * dedicated management thread; the task bodies themselves are submitted to the
+ * inherited {@code executor} pool. Every method in this class that touches the
+ * {@code pending} queue is {@code synchronized} on this instance.
+ */
+public class CraftAsyncScheduler extends CraftScheduler {
+
+    /** Single-threaded executor that runs the queue management jobs posted by this class. */
+    private final Executor management = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
+            .setNameFormat("Craft Scheduler Management Thread").build());
+
+    CraftAsyncScheduler() {
+        super(true); // flag this instance as the async scheduler
+    }
+
+    /**
+     * Requests cancellation of a queued task. The removal is not performed by the
+     * caller; it is posted to the management thread.
+     *
+     * @param taskId id of the task to cancel
+     */
+    @Override
+    public void cancelTask(int taskId) {
+        this.management.execute(() -> this.removeTask(taskId));
+    }
+
+    /**
+     * Cancels and removes every pending task whose id equals {@code taskId}.
+     *
+     * @param taskId id of the task to remove
+     */
+    private synchronized void removeTask(int taskId) {
+        this.pending.removeIf((task) -> {
+            if (task.getTaskId() == taskId) {
+                task.cancel0();
+                return true;
+            }
+            return false;
+        });
+    }
+
+    /**
+     * Records the current tick and posts a job to the management thread that
+     * dispatches every task due at that tick.
+     *
+     * @param currentTick the tick being processed
+     */
+    @Override
+    public void mainThreadHeartbeat(int currentTick) {
+        this.currentTick = currentTick;
+        this.management.execute(() -> this.runTasks(currentTick));
+    }
+
+    /**
+     * Dispatches all pending tasks whose next run is at or before {@code currentTick}.
+     * Tasks with a positive period get their next run set to
+     * {@code currentTick + period} and are put back into the queue after the loop.
+     *
+     * @param currentTick the tick to compare each task's next run against
+     */
+    private synchronized void runTasks(int currentTick) {
+        final List<CraftTask> temp = new ArrayList<>();
+        while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) {
+            CraftTask task = this.pending.remove();
+            this.dispatchToPool(task);
+            final long period = task.getPeriod();
+            if (period > 0) {
+                task.setNextRun(currentTick + period);
+                temp.add(task);
+            }
+        }
+        this.pending.addAll(temp);
+    }
+
+    /**
+     * Registers the task in {@code runners} and submits it to the executor pool
+     * wrapped in a {@link ServerSchedulerReportingWrapper}. Shared by the queued and
+     * the immediate dispatch paths so both treat a task identically.
+     *
+     * @param task the task to run
+     */
+    private void dispatchToPool(CraftTask task) {
+        this.runners.put(task.getTaskId(), task);
+        this.executor.execute(new ServerSchedulerReportingWrapper(task));
+    }
+
+    /**
+     * Accepts a newly created task. A non-repeating task (period {@code -1}) with no
+     * delay is dispatched to the executor pool right away; any other task gets its
+     * next run set to {@code currentTick + delay} and is added to the queue by the
+     * management thread.
+     *
+     * @param task  the task to schedule
+     * @param delay number of ticks to wait before the first run
+     * @return the same {@code task} instance
+     */
+    @Override
+    protected CraftTask handle(CraftTask task, final long delay) {
+        if (task.getPeriod() == -1L && delay == 0L) {
+            // Take the same path as queued tasks so the task is tracked in runners and
+            // wrapped for reporting, rather than being handed to the pool bare.
+            // NOTE(review): the runners entry is expected to be cleared by the task
+            // itself on completion, exactly as for tasks dispatched by runTasks - confirm.
+            this.dispatchToPool(task);
+            return task;
+        }
+        task.setNextRun(this.currentTick + delay);
+        this.management.execute(() -> this.addTask(task));
+        return task;
+    }
+
+    /**
+     * Adds the task to the pending queue.
+     *
+     * @param task the task to enqueue
+     */
+    private synchronized void addTask(CraftTask task) {
+        this.pending.add(task);
+    }
+
+    /**
+     * Cancels and removes pending tasks owned by {@code plugin}. Tasks whose id is
+     * {@code -1} are left untouched. Unlike {@link #cancelTask(int)}, this runs on the
+     * calling thread.
+     *
+     * <p>NOTE(review): a task whose {@code addTask} job is still waiting on the
+     * management thread is not in {@code pending} yet and is therefore not seen here.
+     *
+     * @param plugin owner whose tasks are cancelled; {@code null} matches every owner
+     */
+    @Override
+    public synchronized void cancelTasks(Plugin plugin) {
+        for (Iterator<CraftTask> iterator = this.pending.iterator(); iterator.hasNext(); ) {
+            CraftTask taskPending = iterator.next();
+            if (taskPending.getTaskId() != -1 && (plugin == null || taskPending.getOwner().equals(plugin))) {
+                taskPending.cancel0();
+                iterator.remove();
+            }
+        }
+    }
+
+    /** Cancels pending tasks of every owner by delegating to {@code cancelTasks(null)}. */
+    @Override
+    public synchronized void cancelAllTasks() {
+        cancelTasks(null);
+    }
+
+    /**
+     * Lists the valid tasks found in {@code runners}, followed by the valid tasks in
+     * the pending queue that are not already in the list.
+     *
+     * @return a new list; tasks for which {@link #isValid(CraftTask)} is false are omitted
+     */
+    @Override
+    public synchronized List<BukkitTask> getPendingTasks() {
+        List<BukkitTask> list = new ArrayList<>();
+        for (CraftTask task : this.runners.values()) {
+            if (isValid(task)) {
+                list.add(task);
+            }
+        }
+        for (CraftTask task : this.pending) {
+            // A repeating task can be in runners and in pending at once; list it only once.
+            if (isValid(task) && !list.contains(task)) {
+                list.add(task);
+            }
+        }
+
+        return list;
+    }
+
+    /**
+     * Tells whether a task with the given id is tracked and still valid.
+     *
+     * @param taskId id to look up
+     * @return {@code true} if {@code runners} holds a valid task with that id; otherwise
+     *         the validity of the first pending task with that id, or {@code false}
+     *         when there is none
+     */
+    @Override
+    public synchronized boolean isQueued(int taskId) {
+        CraftTask runningTask = this.runners.get(taskId);
+        if (runningTask != null && isValid(runningTask)) {
+            return true;
+        }
+        for (CraftTask task : this.pending) {
+            if (task.getTaskId() == taskId) {
+                return isValid(task); // The task will run
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Checks that a task is not cancelled.
+     *
+     * @param runningTask the task to inspect
+     * @return {@code true} when the task's period is {@code -1} or greater
+     */
+    static boolean isValid(CraftTask runningTask) {
+        return runningTask.getPeriod() >= -1L;
+    }
+}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
index e47f4cca2..4a4159879 100644
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
@@ -15,7 +15,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import co.aikar.timings.MinecraftTimings; // Paper
-import com.destroystokyo.paper.ServerSchedulerReportingWrapper;
import com.destroystokyo.paper.event.server.ServerExceptionEvent;
import com.destroystokyo.paper.exception.ServerSchedulerException;
import org.apache.commons.lang.Validate;
@@ -61,7 +60,7 @@ public class CraftScheduler implements BukkitScheduler {
/**
* Main thread logic only
*/
- private final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10,
+ final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, // Paper
new Comparator<CraftTask>() {
public int compare(final CraftTask o1, final CraftTask o2) {
int value = Long.compare(o1.getNextRun(), o2.getNextRun());
@@ -77,9 +76,9 @@ public class CraftScheduler implements BukkitScheduler {
/**
* These are tasks that are currently active. It's provided for 'viewing' the current state.
*/
- private final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>();
- private volatile int currentTick = -1;
- private final Executor executor = Executors.newCachedThreadPool(new com.google.common.util.concurrent.ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build()); // Spigot
+ final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>(); // Paper
+ volatile int currentTick = -1; // Paper
+ final Executor executor = Executors.newCachedThreadPool(new com.google.common.util.concurrent.ThreadFactoryBuilder().setNameFormat("Craft Scheduler Thread - %1$d").build()); // Spigot // Paper
//private CraftAsyncDebugger debugHead = new CraftAsyncDebugger(-1, null, null) {@Override StringBuilder debugTo(StringBuilder string) {return string;}}; // Paper
//private CraftAsyncDebugger debugTail = debugHead; // Paper
private static final int RECENT_TICKS;
@@ -88,6 +87,23 @@ public class CraftScheduler implements BukkitScheduler {
RECENT_TICKS = 30;
}
+ // Paper start
+ private final CraftScheduler asyncScheduler;
+ private final boolean isAsyncScheduler;
+    /**
+     * Creates a regular scheduler; equivalent to {@code new CraftScheduler(false)}.
+     */
+    public CraftScheduler() {
+        this(false);
+    }
+
+    /**
+     * Creates a scheduler and sets the scheduler stored in {@code asyncScheduler}.
+     *
+     * @param isAsync {@code true} when this instance is itself the async scheduler, in
+     *                which case it refers to itself; otherwise a new
+     *                {@link CraftAsyncScheduler} is created
+     */
+    public CraftScheduler(boolean isAsync) {
+        this.isAsyncScheduler = isAsync;
+        // An async scheduler is its own delegate; any other scheduler owns a dedicated one.
+        this.asyncScheduler = isAsync ? this : new CraftAsyncScheduler();
+    }
+ // Paper end
+
public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) {
return this.scheduleSyncDelayedTask(plugin, task, 0l);
}
@@ -154,7 +170,7 @@ public class CraftScheduler implements BukkitScheduler {
} else if (period < -1l) {
period = -1l;
}
- return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay);
+ return handle(new CraftAsyncTask(this.asyncScheduler.runners, plugin, runnable, nextId(), period), delay); // Paper
}
public <T> Future<T> callSyncMethod(final Plugin plugin, final Callable<T> task) {
@@ -168,6 +184,11 @@ public class CraftScheduler implements BukkitScheduler {
if (taskId <= 0) {
return;
}
+ // Paper start
+ if (!this.isAsyncScheduler) {
+ this.asyncScheduler.cancelTask(taskId);
+ }
+ // Paper end
CraftTask task = runners.get(taskId);
if (task != null) {
task.cancel0();
@@ -207,6 +228,11 @@ public class CraftScheduler implements BukkitScheduler {
public void cancelTasks(final Plugin plugin) {
Validate.notNull(plugin, "Cannot cancel tasks of null plugin");
+ // Paper start
+ if (!this.isAsyncScheduler) {
+ this.asyncScheduler.cancelTasks(plugin);
+ }
+ // Paper end
final CraftTask task = new CraftTask(
new Runnable() {
public void run() {
@@ -244,6 +270,11 @@ public class CraftScheduler implements BukkitScheduler {
}
public void cancelAllTasks() {
+ // Paper start
+ if (!this.isAsyncScheduler) {
+ this.asyncScheduler.cancelAllTasks();
+ }
+ // Paper end
final CraftTask task = new CraftTask(
new Runnable() {
public void run() {
@@ -272,6 +303,11 @@ public class CraftScheduler implements BukkitScheduler {
}
public boolean isCurrentlyRunning(final int taskId) {
+ // Paper start
+ if (!isAsyncScheduler) { //noinspection TailRecursion
+ return this.asyncScheduler.isCurrentlyRunning(taskId);
+ }
+ // Paper end
final CraftTask task = runners.get(taskId);
if (task == null || task.isSync()) {
return false;
@@ -286,6 +322,11 @@ public class CraftScheduler implements BukkitScheduler {
if (taskId <= 0) {
return false;
}
+ // Paper start
+ if (!this.isAsyncScheduler && this.asyncScheduler.isQueued(taskId)) {
+ return true;
+ }
+ // Paper end
for (CraftTask task = head.getNext(); task != null; task = task.getNext()) {
if (task.getTaskId() == taskId) {
return task.getPeriod() >= -1l; // The task will run
@@ -296,6 +337,12 @@ public class CraftScheduler implements BukkitScheduler {
}
public List<BukkitWorker> getActiveWorkers() {
+ // Paper start
+ if (!isAsyncScheduler) {
+ //noinspection TailRecursion
+ return this.asyncScheduler.getActiveWorkers();
+ }
+ // Paper end
final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>();
for (final CraftTask taskObj : runners.values()) {
// Iterator will be a best-effort (may fail to grab very new values) if called from an async thread
@@ -332,6 +379,11 @@ public class CraftScheduler implements BukkitScheduler {
pending.add(task);
}
}
+ // Paper start
+ if (!this.isAsyncScheduler) {
+ pending.addAll(this.asyncScheduler.getPendingTasks());
+ }
+ // Paper end
return pending;
}
@@ -339,6 +391,11 @@ public class CraftScheduler implements BukkitScheduler {
* This method is designed to never block or wait for locks; an immediate execution of all current tasks.
*/
public void mainThreadHeartbeat(final int currentTick) {
+ // Paper start
+ if (!this.isAsyncScheduler) {
+ this.asyncScheduler.mainThreadHeartbeat(currentTick);
+ }
+ // Paper end
this.currentTick = currentTick;
final List<CraftTask> temp = this.temp;
parsePending();
@@ -372,7 +429,7 @@ public class CraftScheduler implements BukkitScheduler {
parsePending();
} else {
//debugTail = debugTail.setNext(new CraftAsyncDebugger(currentTick + RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper
- executor.execute(new ServerSchedulerReportingWrapper(task)); // Paper
+ task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper
// We don't need to parse pending
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
}
@@ -400,7 +457,13 @@ public class CraftScheduler implements BukkitScheduler {
tailTask.setNext(task);
}
- private CraftTask handle(final CraftTask task, final long delay) {
+ protected CraftTask handle(final CraftTask task, final long delay) { // Paper
+ // Paper start
+ if (!this.isAsyncScheduler && !task.isSync()) {
+ this.asyncScheduler.handle(task, delay);
+ return task;
+ }
+ // Paper end
task.setNextRun(currentTick + delay);
addTask(task);
return task;
--
2.16.2