diff --git a/patches/node/.patches b/patches/node/.patches index 76657ee0693b..22747c42c8e5 100644 --- a/patches/node/.patches +++ b/patches/node/.patches @@ -35,3 +35,4 @@ build_add_mjs_support_to_js2c.patch src_inline_asynccleanuphookhandle_in_headers.patch fix_handle_new_tostring_behavior_in_v8_serdes_test.patch fix_the_--harmony-weak-refs_has_been_removed_remove_from_specs.patch +node-api_faster_threadsafe_function.patch diff --git a/patches/node/node-api_faster_threadsafe_function.patch b/patches/node/node-api_faster_threadsafe_function.patch new file mode 100644 index 000000000000..2735193ca6af --- /dev/null +++ b/patches/node/node-api_faster_threadsafe_function.patch @@ -0,0 +1,262 @@ +From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001 +From: Fedor Indutny +Date: Sat, 1 May 2021 11:26:46 -0700 +Subject: node-api: faster threadsafe_function + +Invoke threadsafe_function during the same tick and avoid marshalling +costs between threads and/or churning event loop if either: + +1. There's a queued call already +2. `Push()` is called while the main thread was running + threadsafe_function + +PR-URL: https://github.com/nodejs/node/pull/38506 +Reviewed-By: Anna Henningsen +Reviewed-By: Rich Trott +Reviewed-By: James M Snell + +diff --git a/src/node_api.cc b/src/node_api.cc +index f1a5265b6a7234dc754aedc86ecd3132f3d90b09..d1076b29aeb5133a0325d3e7ebd097d207e4f4a6 100644 +--- a/src/node_api.cc ++++ b/src/node_api.cc +@@ -12,6 +12,7 @@ + #include "tracing/traced_value.h" + #include "util-inl.h" + ++#include + #include + + struct node_napi_env__ : public napi_env__ { +@@ -131,6 +132,7 @@ class ThreadSafeFunction : public node::AsyncResource { + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), ++ dispatch_state(kDispatchIdle), + context(context_), + max_queue_size(max_queue_size_), + env(env_), +@@ -170,10 +172,8 @@ class ThreadSafeFunction : public node::AsyncResource { + return napi_closing; + } + } else { +- if (uv_async_send(&async) != 0) { +- return napi_generic_failure; +- } + queue.push(data); ++ Send(); + return napi_ok; + } + } +@@ -205,9 +205,7 @@ class ThreadSafeFunction : public node::AsyncResource { + if (is_closing && max_queue_size > 0) { + cond->Signal(lock); + } +- if (uv_async_send(&async) != 0) { +- return napi_generic_failure; +- } ++ Send(); + } + } + +@@ -232,7 +230,6 @@ class ThreadSafeFunction : public node::AsyncResource { + cond = std::make_unique(); + } + if (max_queue_size == 0 || cond) { +- CHECK_EQ(0, uv_idle_init(loop, &idle)); + return napi_ok; + } + +@@ -257,21 +254,46 @@ class ThreadSafeFunction : public node::AsyncResource { + + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); +- uv_unref(reinterpret_cast(&idle)); + + return napi_ok; + } + + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); +- uv_ref(reinterpret_cast(&idle)); + + return napi_ok; + } + +- void DispatchOne() { ++ inline void* Context() { ++ return context; ++ } ++ ++ protected: ++ void Dispatch() { ++ bool has_more = true; ++ ++ // Limit maximum synchronous iteration count to prevent event loop ++ // starvation. See `src/node_messaging.cc` for an inspiration. ++ unsigned int iterations_left = kMaxIterationCount; ++ while (has_more && --iterations_left != 0) { ++ dispatch_state = kDispatchRunning; ++ has_more = DispatchOne(); ++ ++ // Send() was called while we were executing the JS function ++ if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) { ++ has_more = true; ++ } ++ } ++ ++ if (has_more) { ++ Send(); ++ } ++ } ++ ++ bool DispatchOne() { + void* data = nullptr; + bool popped_value = false; ++ bool has_more = false; + + { + node::Mutex::ScopedLock lock(this->mutex); +@@ -296,9 +318,9 @@ class ThreadSafeFunction : public node::AsyncResource { + cond->Signal(lock); + } + CloseHandlesAndMaybeDelete(); +- } else { +- CHECK_EQ(0, uv_idle_stop(&idle)); + } ++ } else { ++ has_more = true; + } + } + } +@@ -316,6 +338,8 @@ class ThreadSafeFunction : public node::AsyncResource { + call_js_cb(env, js_callback, context, data); + }); + } ++ ++ return has_more; + } + + void Finalize() { +@@ -329,10 +353,6 @@ class ThreadSafeFunction : public node::AsyncResource { + EmptyQueueAndDelete(); + } + +- inline void* Context() { +- return context; +- } +- + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + v8::HandleScope scope(env->isolate); + if (set_closing) { +@@ -352,18 +372,20 @@ class ThreadSafeFunction : public node::AsyncResource { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, + reinterpret_cast(handle)); +- v8::HandleScope scope(ts_fn->env->isolate); +- ts_fn->env->node_env()->CloseHandle( +- reinterpret_cast(&ts_fn->idle), +- [](uv_handle_t* handle) -> void { +- ThreadSafeFunction* ts_fn = +- node::ContainerOf(&ThreadSafeFunction::idle, +- reinterpret_cast(handle)); +- ts_fn->Finalize(); +- }); ++ ts_fn->Finalize(); + }); + } + ++ void Send() { ++ // Ask currently running Dispatch() to make one more iteration ++ unsigned char current_state = dispatch_state.fetch_or(kDispatchPending); ++ if ((current_state & kDispatchRunning) == kDispatchRunning) { ++ return; ++ } ++ ++ CHECK_EQ(0, uv_async_send(&async)); ++ } ++ + // Default way of calling into JavaScript. Used when ThreadSafeFunction is + // without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { +@@ -387,16 +409,10 @@ class ThreadSafeFunction : public node::AsyncResource { + } + } + +- static void IdleCb(uv_idle_t* idle) { +- ThreadSafeFunction* ts_fn = +- node::ContainerOf(&ThreadSafeFunction::idle, idle); +- ts_fn->DispatchOne(); +- } +- + static void AsyncCb(uv_async_t* async) { + ThreadSafeFunction* ts_fn = + node::ContainerOf(&ThreadSafeFunction::async, async); +- CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb)); ++ ts_fn->Dispatch(); + } + + static void Cleanup(void* data) { +@@ -405,14 +421,20 @@ class ThreadSafeFunction : public node::AsyncResource { + } + + private: ++ static const unsigned char kDispatchIdle = 0; ++ static const unsigned char kDispatchRunning = 1 << 0; ++ static const unsigned char kDispatchPending = 1 << 1; ++ ++ static const unsigned int kMaxIterationCount = 1000; ++ + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; +- uv_idle_t idle; + size_t thread_count; + bool is_closing; ++ std::atomic_uchar dispatch_state; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. +diff --git a/test/node-api/test_threadsafe_function/binding.c b/test/node-api/test_threadsafe_function/binding.c +index c9c526153804c60b5bd5844d2c2aacac7f0fb514..ae3ec67de43cfffdfb10772761dbd69f01c623bb 100644 +--- a/test/node-api/test_threadsafe_function/binding.c ++++ b/test/node-api/test_threadsafe_function/binding.c +@@ -7,7 +7,7 @@ + #include + #include "../../js-native-api/common.h" + +-#define ARRAY_LENGTH 10 ++#define ARRAY_LENGTH 10000 + #define MAX_QUEUE_SIZE 2 + + static uv_thread_t uv_threads[2]; +@@ -72,7 +72,7 @@ static void data_source_thread(void* data) { + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); +- if (ts_fn_info->max_queue_size == 0) { ++ if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); +diff --git a/test/node-api/test_threadsafe_function/test.js b/test/node-api/test_threadsafe_function/test.js +index 3603d79ee6b5d36590503989d8168368eaf12b03..ccd3f4228a793ae77eff760309e31191ba8de49a 100644 +--- a/test/node-api/test_threadsafe_function/test.js ++++ b/test/node-api/test_threadsafe_function/test.js +@@ -210,6 +210,15 @@ new Promise(function testWithoutJSMarshaller(resolve) { + })) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + ++// Make sure that threadsafe function isn't stalled when we hit ++// `kMaxIterationCount` in `src/node_api.cc` ++.then(() => testWithJSMarshaller({ ++ threadStarter: 'StartThreadNonblocking', ++ maxQueueSize: binding.ARRAY_LENGTH >>> 1, ++ quitAfter: binding.ARRAY_LENGTH ++})) ++.then((result) => assert.deepStrictEqual(result, expectedArray)) ++ + // Start a child process to test rapid teardown + .then(() => testUnref(binding.MAX_QUEUE_SIZE)) +