diff --git a/site/source/docs/tools_reference/settings_reference.rst b/site/source/docs/tools_reference/settings_reference.rst index c61cd19e1dc15..52a39d8426cda 100644 --- a/site/source/docs/tools_reference/settings_reference.rst +++ b/site/source/docs/tools_reference/settings_reference.rst @@ -2466,10 +2466,10 @@ If true, enables deep debugging of Web Audio backend. Default value: 0 -.. _pthread_pool_size: +.. _pthread_manager: -PTHREAD_POOL_SIZE -================= +PTHREAD_MANAGER +=============== In web browsers, Workers cannot be created while the main browser thread is executing JS/Wasm code, but the main thread must regularly yield back @@ -2479,6 +2479,17 @@ when called from the main browser thread, and the main thread must repeatedly yield back to the JS event loop in order for the thread to actually start. If your application needs to be able to synchronously create new threads, +If true, a dedicated worker is used to manage pthread lifecycles. +This allows synchronous thread creation even when the main thread is +blocked. + +Default value: false + +.. _pthread_pool_size: + +PTHREAD_POOL_SIZE +================= + you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x, in which case the specified number of Workers will be preloaded into a pool before the application starts, and that many threads can then be available diff --git a/src/lib/libpthread.js b/src/lib/libpthread.js index c9ba01e4820d6..1d853d68201f8 100644 --- a/src/lib/libpthread.js +++ b/src/lib/libpthread.js @@ -40,6 +40,14 @@ const CMD_CLEANUP_THREAD = 6; const CMD_MARK_AS_FINISHED = 7; const CMD_UNCAUGHT_EXN = 8; const CMD_CALL_HANDLER = 9; +#if PTHREAD_MANAGER +const CMD_LOAD_MANAGER = 10; +const CMD_CREATE_WORKER = 11; +const CMD_TERMINATE_WORKER = 12; +const CMD_FORWARD_TO_WORKER = 13; +const CMD_FORWARD_FROM_WORKER = 14; +const CMD_ERROR_FROM_WORKER = 15; +#endif #if WASM_ESM_INTEGRATION const pthreadWorkerScript = TARGET_BASENAME + '.pthread.mjs'; @@ -113,13 +121,48 @@ var LibraryPThread = { // the reverse mapping, each worker has a `pthread_ptr` when its running a // pthread. pthreads: {}, +#if PTHREAD_MANAGER + proxyWorkers: {}, + // A ProxyWorker acts as a main-thread representative for a Web Worker + // managed by the manager worker. It implements the standard Worker + // interface (postMessage, terminate, onmessage, onerror) so that the main + // thread's scheduler can interact with it transparently and pool it just + // like a standard worker. + ProxyWorker: class { + constructor(workerID) { + this.workerID = workerID; + PThread.proxyWorkers[workerID] = this; + PThread.managerWorker.postMessage({ + cmd: {{{ CMD_CREATE_WORKER }}}, + workerID: workerID + }); + } + postMessage(msg, transfer) { + PThread.managerWorker.postMessage({ + cmd: {{{ CMD_FORWARD_TO_WORKER }}}, + workerID: this.workerID, + msg: msg, + transferList: transfer + }, transfer); + } + terminate() { + PThread.managerWorker.postMessage({ + cmd: {{{ CMD_TERMINATE_WORKER }}}, + workerID: this.workerID + }); + delete PThread.proxyWorkers[this.workerID]; + } + }, +#endif #if MAIN_MODULE outstandingPromises: {}, // Finished threads are threads that have finished running but we are not yet // joined. finishedThreads: new Set(), #endif -#if ASSERTIONS +#if ASSERTIONS || PTHREAD_MANAGER + // Used for debugging/assertions, or functionally by PTHREAD_MANAGER to + // route multiplexed messages/errors to the correct ProxyWorker instance. nextWorkerID: 1, #endif init() { @@ -128,19 +171,46 @@ var LibraryPThread = { } }, initMainThread() { +#if ENVIRONMENT_MAY_BE_NODE + if (ENVIRONMENT_IS_NODE) return; +#endif +#if PTHREAD_MANAGER +#if ASSERTIONS + dbg('PThread: initializing manager worker'); +#endif + var managerReadyResolve; + PThread.managerWorkerReady = new Promise((resolve) => { managerReadyResolve = resolve; }); + PThread.managerWorker = PThread.createRealWorker(); + addOnPreRun(async () => { + var managerReady = PThread.initManagerWorker(PThread.managerWorker); + addRunDependency('manager-worker'); + await managerReady; #if PTHREAD_POOL_SIZE - var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}}; - // Start loading up the Worker pool, if requested. - while (pthreadPoolSize--) { - PThread.allocateUnusedWorker(); - } + PThread.spawnPool(); +#endif + removeRunDependency('manager-worker'); + managerReadyResolve(); + }); +#endif // PTHREAD_MANAGER +#if PTHREAD_POOL_SIZE +#if ASSERTIONS + dbg('PThread: initializing worker pool'); +#endif +#if !PTHREAD_MANAGER + PThread.spawnPool(); +#endif #if !MINIMAL_RUNTIME // MINIMAL_RUNTIME takes care of calling loadWasmModuleToAllWorkers // in postamble_minimal.js addOnPreRun(async () => { - var pthreadPoolReady = PThread.loadWasmModuleToAllWorkers(); #if !PTHREAD_POOL_DELAY_LOAD addRunDependency('loading-workers'); +#endif +#if PTHREAD_MANAGER + await PThread.managerWorkerReady; +#endif + var pthreadPoolReady = PThread.loadWasmModuleToAllWorkers(); +#if !PTHREAD_POOL_DELAY_LOAD await pthreadPoolReady; removeRunDependency('loading-workers'); #endif // PTHREAD_POOL_DELAY_LOAD @@ -265,6 +335,37 @@ var LibraryPThread = { // module loaded. PThread.tlsInitFunctions.forEach((f) => f()); }, +#if PTHREAD_MANAGER + initManagerWorker: (worker) => new Promise((onFinishedLoading) => { + worker.onmessage = (e) => { + var d = e.data; + var cmd = d.cmd; + switch (cmd) { + case {{{ CMD_FORWARD_FROM_WORKER }}}: + PThread.proxyWorkers[d.workerID]?.onmessage({ data: d.msg }); + break; + case {{{ CMD_ERROR_FROM_WORKER }}}: + PThread.proxyWorkers[d.workerID]?.onerror(d.error); + break; + case {{{ CMD_LOADED }}}: + onFinishedLoading(worker); + break; + default: + if (cmd) err(`manager worker sent an unknown command ${cmd}`); + } + }; + worker.onerror = (e) => { + err(`Manager worker sent an error! ${e.filename}:${e.lineno}: ${e.message}`); + throw e; + }; + worker.postMessage({ + cmd: {{{ CMD_LOAD_MANAGER }}}, +#if ASSERTIONS + workerID: worker.workerID, +#endif + }); + }), +#endif // Loads the WebAssembly module into the given Worker. // onFinishedLoading: A callback function that will be called once all of // the workers have been initialized and are @@ -468,7 +569,7 @@ var LibraryPThread = { #endif // PTHREAD_POOL_SIZE // Creates a new web Worker and places it in the unused worker pool to wait for its use. - allocateUnusedWorker() { + createRealWorker() { var worker; #if EXPORT_ES6 // If we're using module output, use bundler-friendly pattern. @@ -541,12 +642,34 @@ var LibraryPThread = { #endif worker = new Worker(pthreadMainJs, {{{ pthreadWorkerOptions }}}); #endif // EXPORT_ES6 -#if ASSERTIONS +#if ASSERTIONS || PTHREAD_MANAGER worker.workerID = PThread.nextWorkerID++; #endif + return worker; + }, + + allocateUnusedWorker() { + var worker; +#if PTHREAD_MANAGER + if (PThread.managerWorker) { + var workerID = PThread.nextWorkerID++; + worker = new PThread.ProxyWorker(workerID); + } else +#endif + { + worker = PThread.createRealWorker(); + } PThread.unusedWorkers.push(worker); return worker; }, +#if PTHREAD_POOL_SIZE + spawnPool() { + var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}}; + while (pthreadPoolSize--) { + PThread.allocateUnusedWorker(); + } + }, +#endif getNewWorker() { if (PThread.unusedWorkers.length == 0) { @@ -698,6 +821,8 @@ var LibraryPThread = { assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr'); #endif + + var worker = PThread.getNewWorker(); if (!worker) { // No available workers in the PThread pool. diff --git a/src/runtime_pthread.js b/src/runtime_pthread.js index de70f2460d5c9..56dfc5975f247 100644 --- a/src/runtime_pthread.js +++ b/src/runtime_pthread.js @@ -42,15 +42,68 @@ if (ENVIRONMENT_IS_PTHREAD) { // notified about them. self.onunhandledrejection = (e) => { throw e.reason || e; }; +#if PTHREAD_MANAGER + const targetWorkers = {}; + + function handleManagerMessage(e) { + var d = e.data; + var cmd = d.cmd; + if (cmd === {{{ CMD_CREATE_WORKER }}}) { + var workerID = d.workerID; + var worker = PThread.createRealWorker(); + worker.workerID = workerID; + targetWorkers[workerID] = worker; + worker.onmessage = (e) => { + var msg = e.data; + var transferList = msg && msg.transferList; + postMessage({ + cmd: {{{ CMD_FORWARD_FROM_WORKER }}}, + workerID: workerID, + msg: msg + }, transferList); + }; + worker.onerror = (e) => { + postMessage({ + cmd: {{{ CMD_ERROR_FROM_WORKER }}}, + workerID: workerID, + error: { + message: e.message, + filename: e.filename, + lineno: e.lineno, + colno: e.colno + } + }); + }; + } else if (cmd === {{{ CMD_FORWARD_TO_WORKER }}}) { + var worker = targetWorkers[d.workerID]; + if (worker) { + worker.postMessage(d.msg, d.transferList); + } + } else if (cmd === {{{ CMD_TERMINATE_WORKER }}}) { + var worker = targetWorkers[d.workerID]; + if (worker) { + worker.terminate(); + delete targetWorkers[d.workerID]; + } + } else { +#if ASSERTIONS + assert(false, "unknown message in pthread manager: " + cmd); +#endif + } + } +#endif + {{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) { try { var msgData = e.data; //dbg('msgData: ' + Object.keys(msgData)); var cmd = msgData.cmd; + if (cmd == {{{ CMD_LOAD }}}) { // Preload command that is called once per worker to parse and load the Emscripten code. #if ASSERTIONS workerID = msgData.workerID; #endif + #if PTHREADS_DEBUG dbg('worker: loading module') #endif @@ -61,8 +114,12 @@ if (ENVIRONMENT_IS_PTHREAD) { // And add a callback for when the runtime is initialized. startWorker = () => { +#if PTHREADS_DEBUG + dbg('worker: startWorker'); +#endif // Notify the main thread that this thread has loaded. postMessage({ cmd: {{{ CMD_LOADED }}} }); + // Process any messages that were queued before the thread was ready. for (let msg of messageQueue) { handleMessage(msg); @@ -127,6 +184,14 @@ if (ENVIRONMENT_IS_PTHREAD) { run(); #endif #endif // MINIMAL_RUNTIME +#endif +#if PTHREAD_MANAGER + } else if (cmd == {{{ CMD_LOAD_MANAGER }}}) { +#if RUNTIME_DEBUG + dbg('creating worker manager'); +#endif + postMessage({ cmd: {{{ CMD_LOADED }}} }); + self.onmessage = handleManagerMessage; #endif } else if (cmd == {{{ CMD_RUN }}}) { #if ASSERTIONS @@ -178,6 +243,7 @@ if (ENVIRONMENT_IS_PTHREAD) { if (initializedJS) { checkMailbox(); } + } else if (cmd) { // The received message looks like something that should be handled by this message // handler, (since there is a cmd field present), but is not one of the diff --git a/src/settings.js b/src/settings.js index c9e8ad2f6eed5..638cfa7966273 100644 --- a/src/settings.js +++ b/src/settings.js @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0; // repeatedly yield back to the JS event loop in order for the thread to // actually start. // If your application needs to be able to synchronously create new threads, +// If true, a dedicated worker is used to manage pthread lifecycles. +// This allows synchronous thread creation even when the main thread is +// blocked. +var PTHREAD_MANAGER = false; + // you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x, // in which case the specified number of Workers will be preloaded into a pool // before the application starts, and that many threads can then be available diff --git a/test/pthread/test_pthread_manager.c b/test/pthread/test_pthread_manager.c new file mode 100644 index 0000000000000..1f66eeb82294d --- /dev/null +++ b/test/pthread/test_pthread_manager.c @@ -0,0 +1,29 @@ +#include +#include + +#include + +EM_JS(int, get_worker_id, (), { + return workerID; +}); + + +void *thread_main(void* arg) { + printf("thread_main (workerID=%d)\n", get_worker_id()); + return NULL; +} + +int main() { + printf("main (workerID=%d)\n", get_worker_id()); + pthread_t t; + + pthread_create(&t, NULL, thread_main, NULL); + pthread_join(t, NULL); + + // The second pthread should run on the same worker. + pthread_create(&t, NULL, thread_main, NULL); + pthread_join(t, NULL); + + printf("done\n"); + return 0; +} diff --git a/test/test_browser.py b/test/test_browser.py index 31b3d16d04ac4..261245e9bbef3 100644 --- a/test/test_browser.py +++ b/test/test_browser.py @@ -3672,13 +3672,24 @@ def test_pthread_in_pthread_pool_size_strict(self): # Check that it fails when there's a pthread creating another pthread. self.btest_exit('pthread/test_pthread_create_pthread.c', cflags=['-g2', '-pthread', '-sPTHREAD_POOL_SIZE=1', '-sPTHREAD_POOL_SIZE_STRICT=2', '-DSMALL_POOL']) + @parameterized({ + '': ([],), + 'pool': (['-sPTHREAD_POOL_SIZE=2'],), + }) + def test_pthread_manager(self, args): + self.btest_exit('pthread/test_pthread_manager.c', cflags=['-pthread', '-sPTHREAD_MANAGER'] + args) + # Test that the emscripten_ atomics api functions work. + @parameterized({ + '': (['-sPTHREAD_POOL_SIZE=8'],), + 'manager': (['-sPTHREAD_MANAGER'],), + }) @parameterized({ '': ([],), 'closure': (['--closure=1'],), }) - def test_pthread_atomics(self, args): - self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-sPTHREAD_POOL_SIZE=8', '-g1'] + args) + def test_pthread_atomics(self, args1, args2): + self.btest_exit('pthread/test_pthread_atomics.c', cflags=['-O3', '-pthread', '-g1'] + args1 + args2) # Test 64-bit atomics. def test_pthread_64bit_atomics(self): diff --git a/tools/link.py b/tools/link.py index 3d3d838da3733..d8cff58734c90 100644 --- a/tools/link.py +++ b/tools/link.py @@ -1511,7 +1511,7 @@ def limit_incoming_module_api(): if settings.PTHREADS: setup_pthreads() add_system_js_lib('libpthread.js') - if settings.PROXY_TO_PTHREAD: + if settings.PROXY_TO_PTHREAD or settings.PTHREAD_MANAGER: settings.PTHREAD_POOL_SIZE_STRICT = 0 settings.DEFAULT_LIBRARY_FUNCS_TO_INCLUDE += ['$runtimeKeepalivePush'] else: