Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions site/source/docs/tools_reference/settings_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2466,10 +2466,10 @@ If true, enables deep debugging of Web Audio backend.

Default value: 0

.. _pthread_pool_size:
.. _pthread_manager:

PTHREAD_POOL_SIZE
=================
PTHREAD_MANAGER
===============

In web browsers, Workers cannot be created while the main browser thread
is executing JS/Wasm code, but the main thread must regularly yield back
Expand All @@ -2479,6 +2479,17 @@ when called from the main browser thread, and the main thread must
repeatedly yield back to the JS event loop in order for the thread to
actually start.
If your application needs to be able to synchronously create new threads,
If true, a dedicated worker is used to manage pthread lifecycles.
This allows synchronous thread creation even when the main thread is
blocked.

Default value: false

.. _pthread_pool_size:

PTHREAD_POOL_SIZE
=================

you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x,
in which case the specified number of Workers will be preloaded into a pool
before the application starts, and that many threads can then be available
Expand Down
143 changes: 134 additions & 9 deletions src/lib/libpthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ const CMD_CLEANUP_THREAD = 6;
const CMD_MARK_AS_FINISHED = 7;
const CMD_UNCAUGHT_EXN = 8;
const CMD_CALL_HANDLER = 9;
#if PTHREAD_MANAGER
const CMD_LOAD_MANAGER = 10;
const CMD_CREATE_WORKER = 11;
const CMD_TERMINATE_WORKER = 12;
const CMD_FORWARD_TO_WORKER = 13;
const CMD_FORWARD_FROM_WORKER = 14;
const CMD_ERROR_FROM_WORKER = 15;
#endif

#if WASM_ESM_INTEGRATION
const pthreadWorkerScript = TARGET_BASENAME + '.pthread.mjs';
Expand Down Expand Up @@ -113,13 +121,48 @@ var LibraryPThread = {
// the reverse mapping, each worker has a `pthread_ptr` when its running a
// pthread.
pthreads: {},
#if PTHREAD_MANAGER
proxyWorkers: {},
// A ProxyWorker acts as a main-thread representative for a Web Worker
// managed by the manager worker. It implements the standard Worker
// interface (postMessage, terminate, onmessage, onerror) so that the main
// thread's scheduler can interact with it transparently and pool it just
// like a standard worker.
ProxyWorker: class {
constructor(workerID) {
this.workerID = workerID;
PThread.proxyWorkers[workerID] = this;
PThread.managerWorker.postMessage({
cmd: {{{ CMD_CREATE_WORKER }}},
workerID: workerID
});
}
postMessage(msg, transfer) {
PThread.managerWorker.postMessage({
cmd: {{{ CMD_FORWARD_TO_WORKER }}},
workerID: this.workerID,
msg: msg,
transferList: transfer
}, transfer);
}
terminate() {
PThread.managerWorker.postMessage({
cmd: {{{ CMD_TERMINATE_WORKER }}},
workerID: this.workerID
});
delete PThread.proxyWorkers[this.workerID];
}
},
#endif
#if MAIN_MODULE
outstandingPromises: {},
// Finished threads are threads that have finished running but we are not yet
// joined.
finishedThreads: new Set(),
#endif
#if ASSERTIONS
#if ASSERTIONS || PTHREAD_MANAGER
// Used for debugging/assertions, or functionally by PTHREAD_MANAGER to
// route multiplexed messages/errors to the correct ProxyWorker instance.
nextWorkerID: 1,
#endif
init() {
Expand All @@ -128,19 +171,46 @@ var LibraryPThread = {
}
},
initMainThread() {
#if ENVIRONMENT_MAY_BE_NODE
if (ENVIRONMENT_IS_NODE) return;
#endif
#if PTHREAD_MANAGER
#if ASSERTIONS
dbg('PThread: initializing manager worker');
#endif
var managerReadyResolve;
PThread.managerWorkerReady = new Promise((resolve) => { managerReadyResolve = resolve; });
PThread.managerWorker = PThread.createRealWorker();
addOnPreRun(async () => {
var managerReady = PThread.initManagerWorker(PThread.managerWorker);
addRunDependency('manager-worker');
await managerReady;
#if PTHREAD_POOL_SIZE
var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}};
// Start loading up the Worker pool, if requested.
while (pthreadPoolSize--) {
PThread.allocateUnusedWorker();
}
PThread.spawnPool();
#endif
removeRunDependency('manager-worker');
managerReadyResolve();
});
#endif // PTHREAD_MANAGER
#if PTHREAD_POOL_SIZE
#if ASSERTIONS
dbg('PThread: initializing worker pool');
#endif
#if !PTHREAD_MANAGER
PThread.spawnPool();
#endif
#if !MINIMAL_RUNTIME
// MINIMAL_RUNTIME takes care of calling loadWasmModuleToAllWorkers
// in postamble_minimal.js
addOnPreRun(async () => {
var pthreadPoolReady = PThread.loadWasmModuleToAllWorkers();
#if !PTHREAD_POOL_DELAY_LOAD
addRunDependency('loading-workers');
#endif
#if PTHREAD_MANAGER
await PThread.managerWorkerReady;
#endif
var pthreadPoolReady = PThread.loadWasmModuleToAllWorkers();
#if !PTHREAD_POOL_DELAY_LOAD
await pthreadPoolReady;
removeRunDependency('loading-workers');
#endif // PTHREAD_POOL_DELAY_LOAD
Expand Down Expand Up @@ -265,6 +335,37 @@ var LibraryPThread = {
// module loaded.
PThread.tlsInitFunctions.forEach((f) => f());
},
#if PTHREAD_MANAGER
initManagerWorker: (worker) => new Promise((onFinishedLoading) => {
worker.onmessage = (e) => {
var d = e.data;
var cmd = d.cmd;
switch (cmd) {
case {{{ CMD_FORWARD_FROM_WORKER }}}:
PThread.proxyWorkers[d.workerID]?.onmessage({ data: d.msg });
break;
case {{{ CMD_ERROR_FROM_WORKER }}}:
PThread.proxyWorkers[d.workerID]?.onerror(d.error);
break;
case {{{ CMD_LOADED }}}:
onFinishedLoading(worker);
break;
default:
if (cmd) err(`manager worker sent an unknown command ${cmd}`);
}
};
worker.onerror = (e) => {
err(`Manager worker sent an error! ${e.filename}:${e.lineno}: ${e.message}`);
throw e;
};
worker.postMessage({
cmd: {{{ CMD_LOAD_MANAGER }}},
#if ASSERTIONS
workerID: worker.workerID,
#endif
});
}),
#endif
// Loads the WebAssembly module into the given Worker.
// onFinishedLoading: A callback function that will be called once all of
// the workers have been initialized and are
Expand Down Expand Up @@ -468,7 +569,7 @@ var LibraryPThread = {
#endif // PTHREAD_POOL_SIZE

// Creates a new web Worker and places it in the unused worker pool to wait for its use.
allocateUnusedWorker() {
createRealWorker() {
var worker;
#if EXPORT_ES6
// If we're using module output, use bundler-friendly pattern.
Expand Down Expand Up @@ -541,12 +642,34 @@ var LibraryPThread = {
#endif
worker = new Worker(pthreadMainJs, {{{ pthreadWorkerOptions }}});
#endif // EXPORT_ES6
#if ASSERTIONS
#if ASSERTIONS || PTHREAD_MANAGER
worker.workerID = PThread.nextWorkerID++;
#endif
return worker;
},

allocateUnusedWorker() {
var worker;
#if PTHREAD_MANAGER
if (PThread.managerWorker) {
var workerID = PThread.nextWorkerID++;
worker = new PThread.ProxyWorker(workerID);
} else
#endif
{
worker = PThread.createRealWorker();
}
PThread.unusedWorkers.push(worker);
return worker;
},
#if PTHREAD_POOL_SIZE
spawnPool() {
var pthreadPoolSize = {{{ PTHREAD_POOL_SIZE }}};
while (pthreadPoolSize--) {
PThread.allocateUnusedWorker();
}
},
#endif

getNewWorker() {
if (PThread.unusedWorkers.length == 0) {
Expand Down Expand Up @@ -698,6 +821,8 @@ var LibraryPThread = {
assert(threadParams.pthread_ptr, 'spawnThread called with null pthread ptr');
#endif



var worker = PThread.getNewWorker();
if (!worker) {
// No available workers in the PThread pool.
Expand Down
66 changes: 66 additions & 0 deletions src/runtime_pthread.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,68 @@ if (ENVIRONMENT_IS_PTHREAD) {
// notified about them.
self.onunhandledrejection = (e) => { throw e.reason || e; };

#if PTHREAD_MANAGER
const targetWorkers = {};

function handleManagerMessage(e) {
var d = e.data;
var cmd = d.cmd;
if (cmd === {{{ CMD_CREATE_WORKER }}}) {
var workerID = d.workerID;
var worker = PThread.createRealWorker();
worker.workerID = workerID;
targetWorkers[workerID] = worker;
worker.onmessage = (e) => {
var msg = e.data;
var transferList = msg && msg.transferList;
postMessage({
cmd: {{{ CMD_FORWARD_FROM_WORKER }}},
workerID: workerID,
msg: msg
}, transferList);
};
worker.onerror = (e) => {
postMessage({
cmd: {{{ CMD_ERROR_FROM_WORKER }}},
workerID: workerID,
error: {
message: e.message,
filename: e.filename,
lineno: e.lineno,
colno: e.colno
}
});
};
} else if (cmd === {{{ CMD_FORWARD_TO_WORKER }}}) {
var worker = targetWorkers[d.workerID];
if (worker) {
worker.postMessage(d.msg, d.transferList);
}
} else if (cmd === {{{ CMD_TERMINATE_WORKER }}}) {
var worker = targetWorkers[d.workerID];
if (worker) {
worker.terminate();
delete targetWorkers[d.workerID];
}
} else {
#if ASSERTIONS
assert(false, "unknown message in pthread manager: " + cmd);
#endif
}
}
#endif

{{{ asyncIf(ASYNCIFY == 2) }}}function handleMessage(e) {
try {
var msgData = e.data;
//dbg('msgData: ' + Object.keys(msgData));
var cmd = msgData.cmd;

if (cmd == {{{ CMD_LOAD }}}) { // Preload command that is called once per worker to parse and load the Emscripten code.
#if ASSERTIONS
workerID = msgData.workerID;
#endif

#if PTHREADS_DEBUG
dbg('worker: loading module')
#endif
Expand All @@ -61,8 +114,12 @@ if (ENVIRONMENT_IS_PTHREAD) {

// And add a callback for when the runtime is initialized.
startWorker = () => {
#if PTHREADS_DEBUG
dbg('worker: startWorker');
#endif
// Notify the main thread that this thread has loaded.
postMessage({ cmd: {{{ CMD_LOADED }}} });

// Process any messages that were queued before the thread was ready.
for (let msg of messageQueue) {
handleMessage(msg);
Expand Down Expand Up @@ -127,6 +184,14 @@ if (ENVIRONMENT_IS_PTHREAD) {
run();
#endif
#endif // MINIMAL_RUNTIME
#endif
#if PTHREAD_MANAGER
} else if (cmd == {{{ CMD_LOAD_MANAGER }}}) {
#if RUNTIME_DEBUG
dbg('creating worker manager');
#endif
postMessage({ cmd: {{{ CMD_LOADED }}} });
self.onmessage = handleManagerMessage;
#endif
} else if (cmd == {{{ CMD_RUN }}}) {
#if ASSERTIONS
Expand Down Expand Up @@ -178,6 +243,7 @@ if (ENVIRONMENT_IS_PTHREAD) {
if (initializedJS) {
checkMailbox();
}

} else if (cmd) {
// The received message looks like something that should be handled by this message
// handler, (since there is a cmd field present), but is not one of the
Expand Down
5 changes: 5 additions & 0 deletions src/settings.js
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,11 @@ var WEBAUDIO_DEBUG = 0;
// repeatedly yield back to the JS event loop in order for the thread to
// actually start.
// If your application needs to be able to synchronously create new threads,
// If true, a dedicated worker is used to manage pthread lifecycles.
// This allows synchronous thread creation even when the main thread is
// blocked.
var PTHREAD_MANAGER = false;

// you can pre-create a pthread pool by specifying -sPTHREAD_POOL_SIZE=x,
// in which case the specified number of Workers will be preloaded into a pool
// before the application starts, and that many threads can then be available
Expand Down
29 changes: 29 additions & 0 deletions test/pthread/test_pthread_manager.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include <pthread.h>
#include <stdio.h>

#include <emscripten/em_js.h>

EM_JS(int, get_worker_id, (), {
return workerID;
});


void *thread_main(void* arg) {
printf("thread_main (workerID=%d)\n", get_worker_id());
return NULL;
}

int main() {
printf("main (workerID=%d)\n", get_worker_id());
pthread_t t;

pthread_create(&t, NULL, thread_main, NULL);
pthread_join(t, NULL);

// The second pthread should run on the same worker.
pthread_create(&t, NULL, thread_main, NULL);
pthread_join(t, NULL);

printf("done\n");
return 0;
}
Loading
Loading