larry babby and threejs for glsl

Sam
2024-06-24 21:24:00 +12:00
parent 87d5dc634d
commit 907ebae4c0
6474 changed files with 1279596 additions and 8 deletions


@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017-present Devon Govett
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.


@@ -0,0 +1,23 @@
import {FilePath} from '@parcel/types';
type BackendType = 'process' | 'threads';
export type FarmOptions = {
maxConcurrentWorkers: number;
maxConcurrentCallsPerWorker: number;
forcedKillTime: number;
useLocalWorker: boolean;
warmWorkers: boolean;
workerPath?: FilePath;
backend: BackendType;
shouldPatchConsole?: boolean;
shouldTrace?: boolean;
};
declare class WorkerFarm {
constructor(options: FarmOptions);
end(): Promise<void>;
}
export default WorkerFarm;
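For orientation, a minimal usage sketch of the API declared above (not part of the commit); the worker module path and its `run` export are hypothetical, and the compiled implementation later in this diff fills in defaults for any options omitted here:

// Hypothetical usage of the WorkerFarm API declared above (CommonJS, Node).
const WorkerFarm = require('@parcel/workers').default;

async function main() {
  const farm = new WorkerFarm({
    workerPath: require.resolve('./my-worker.js'), // hypothetical worker module exporting `run`
    backend: 'threads',
    maxConcurrentWorkers: 2,
  });
  // createHandle (defined in the implementation below) proxies a method exported by the worker module.
  const run = farm.createHandle('run');
  console.log(await run('hello'));
  await farm.end();
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});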


@@ -0,0 +1,46 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
var _package = _interopRequireDefault(require("../package.json"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
// $FlowFixMe
let HANDLE_ID = 0;
// $FlowFixMe
const handleById = new Map();
class Handle {
constructor(opts) {
var _opts$id;
this.id = (_opts$id = opts.id) !== null && _opts$id !== void 0 ? _opts$id : ++HANDLE_ID;
this.fn = opts.fn;
this.childId = opts.childId;
handleById.set(this.id, this);
}
dispose() {
handleById.delete(this.id);
}
serialize() {
return {
id: this.id,
childId: this.childId
};
}
static deserialize(opts) {
return new Handle(opts);
}
}
// Register the Handle as a serializable class so that it will properly be deserialized
// by anything that uses WorkerFarm.
exports.default = Handle;
(0, _core().registerSerializableClass)(`${_package.default.version}:Handle`, Handle);


@@ -0,0 +1,188 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _nullthrows() {
const data = _interopRequireDefault(require("nullthrows"));
_nullthrows = function () {
return data;
};
return data;
}
function _events() {
const data = _interopRequireDefault(require("events"));
_events = function () {
return data;
};
return data;
}
function _diagnostic() {
const data = _interopRequireDefault(require("@parcel/diagnostic"));
_diagnostic = function () {
return data;
};
return data;
}
var _backend = require("./backend");
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
let WORKER_ID = 0;
class Worker extends _events().default {
id = WORKER_ID++;
sentSharedReferences = new Set();
calls = new Map();
exitCode = null;
callId = 0;
ready = false;
stopped = false;
isStopping = false;
constructor(options) {
super();
this.options = options;
}
async fork(forkModule) {
let filteredArgs = [];
if (process.execArgv) {
filteredArgs = process.execArgv.filter(v => !/^--(debug|inspect|no-opt|max-old-space-size=|max-semi-space-size=|expose-gc)/.test(v));
for (let i = 0; i < filteredArgs.length; i++) {
let arg = filteredArgs[i];
let isArgWithParam = (arg === '-r' || arg === '--require') && filteredArgs[i + 1] === '@parcel/register' || arg === '--title';
if (isArgWithParam) {
filteredArgs.splice(i, 2);
i--;
}
}
}
// Workaround for https://github.com/nodejs/node/issues/29117
if (process.env.NODE_OPTIONS) {
// arg parsing logic adapted from https://stackoverflow.com/a/46946420/2352201
let opts = [''];
let quote = false;
for (let c of (0, _nullthrows().default)(process.env.NODE_OPTIONS.match(/.|^$/g))) {
if (c === '"') {
quote = !quote;
} else if (!quote && c === ' ') {
opts.push('');
} else {
opts[opts.length - 1] += c.replace(/\\(.)/, '$1');
}
}
for (let i = 0; i < opts.length; i++) {
let opt = opts[i];
if (opt === '-r' || opt === '--require') {
filteredArgs.push(opt, opts[i + 1]);
i++;
}
}
}
let onMessage = data => this.receive(data);
let onExit = code => {
this.exitCode = code;
this.emit('exit', code);
};
let onError = err => {
this.emit('error', err);
};
let WorkerBackend = (0, _backend.getWorkerBackend)(this.options.backend);
this.worker = new WorkerBackend(filteredArgs, onMessage, onError, onExit);
await this.worker.start();
await new Promise((resolve, reject) => {
this.call({
method: 'childInit',
args: [forkModule, {
shouldPatchConsole: !!this.options.shouldPatchConsole,
shouldTrace: !!this.options.shouldTrace
}],
retries: 0,
skipReadyCheck: true,
resolve,
reject
});
});
let sharedRefs = this.options.sharedReferences;
let refsShared = new Set();
// in case more refs are created while initial refs are sending
while (refsShared.size < sharedRefs.size) {
await Promise.all([...sharedRefs].filter(([ref]) => !refsShared.has(ref)).map(async ([ref, value]) => {
await this.sendSharedReference(ref, value);
refsShared.add(ref);
}));
}
this.ready = true;
this.emit('ready');
}
sendSharedReference(ref, value) {
this.sentSharedReferences.add(ref);
return new Promise((resolve, reject) => {
this.call({
method: 'createSharedReference',
args: [ref, value],
resolve,
reject,
retries: 0,
skipReadyCheck: true
});
});
}
send(data) {
this.worker.send(data);
}
call(call) {
if (this.stopped || this.isStopping) {
return;
}
let idx = this.callId++;
this.calls.set(idx, call);
let msg = {
type: 'request',
idx: idx,
child: this.id,
handle: call.handle,
method: call.method,
args: call.args
};
if (this.ready || call.skipReadyCheck === true) {
this.send(msg);
} else {
this.once('ready', () => this.send(msg));
}
}
receive(message) {
if (this.stopped || this.isStopping) {
return;
}
if (message.type === 'request') {
this.emit('request', message);
} else if (message.type === 'response') {
let idx = message.idx;
if (idx == null) {
return;
}
let call = this.calls.get(idx);
if (!call) {
// Return for unknown calls; these might occur if a third-party process uses workers
return;
}
if (message.contentType === 'error') {
call.reject(new (_diagnostic().default)({
diagnostic: message.content
}));
} else {
call.resolve(message.content);
}
this.calls.delete(idx);
this.emit('response', message);
}
}
async stop() {
if (!this.stopped) {
this.stopped = true;
if (this.worker) {
await this.worker.stop();
}
}
}
}
exports.default = Worker;


@@ -0,0 +1,563 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "Handle", {
enumerable: true,
get: function () {
return _Handle.default;
}
});
exports.default = void 0;
var coreWorker = _interopRequireWildcard(require("./core-worker"));
var bus = _interopRequireWildcard(require("./bus"));
function _assert() {
const data = _interopRequireDefault(require("assert"));
_assert = function () {
return data;
};
return data;
}
function _nullthrows() {
const data = _interopRequireDefault(require("nullthrows"));
_nullthrows = function () {
return data;
};
return data;
}
function _events() {
const data = _interopRequireDefault(require("events"));
_events = function () {
return data;
};
return data;
}
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _diagnostic() {
const data = _interopRequireWildcard(require("@parcel/diagnostic"));
_diagnostic = function () {
return data;
};
return data;
}
var _Worker = _interopRequireDefault(require("./Worker"));
var _cpuCount = _interopRequireDefault(require("./cpuCount"));
var _Handle = _interopRequireDefault(require("./Handle"));
var _childState = require("./childState");
var _backend = require("./backend");
function _profiler() {
const data = require("@parcel/profiler");
_profiler = function () {
return data;
};
return data;
}
function _fs() {
const data = _interopRequireDefault(require("fs"));
_fs = function () {
return data;
};
return data;
}
function _logger() {
const data = _interopRequireDefault(require("@parcel/logger"));
_logger = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _getRequireWildcardCache(e) { if ("function" != typeof WeakMap) return null; var r = new WeakMap(), t = new WeakMap(); return (_getRequireWildcardCache = function (e) { return e ? t : r; })(e); }
function _interopRequireWildcard(e, r) { if (!r && e && e.__esModule) return e; if (null === e || "object" != typeof e && "function" != typeof e) return { default: e }; var t = _getRequireWildcardCache(r); if (t && t.has(e)) return t.get(e); var n = { __proto__: null }, a = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var u in e) if ("default" !== u && Object.prototype.hasOwnProperty.call(e, u)) { var i = a ? Object.getOwnPropertyDescriptor(e, u) : null; i && (i.get || i.set) ? Object.defineProperty(n, u, i) : n[u] = e[u]; } return n.default = e, t && t.set(e, n), n; }
let referenceId = 1;
const DEFAULT_MAX_CONCURRENT_CALLS = 30;
/**
* workerPath should always be defined inside farmOptions
*/
class WorkerFarm extends _events().default {
callQueue = [];
ending = false;
warmWorkers = 0;
readyWorkers = 0;
workers = new Map();
handles = new Map();
sharedReferences = new Map();
sharedReferencesByValue = new Map();
serializedSharedReferences = new Map();
constructor(farmOptions = {}) {
var _process$stdout;
super();
this.options = {
maxConcurrentWorkers: WorkerFarm.getNumWorkers(),
maxConcurrentCallsPerWorker: WorkerFarm.getConcurrentCallsPerWorker(farmOptions.shouldTrace ? 1 : DEFAULT_MAX_CONCURRENT_CALLS),
forcedKillTime: 500,
warmWorkers: false,
useLocalWorker: true,
// TODO: setting this to false makes some tests fail, figure out why
backend: (0, _backend.detectBackend)(),
...farmOptions
};
if (!this.options.workerPath) {
throw new Error('Please provide a worker path!');
}
// $FlowFixMe
if (process.browser) {
if (this.options.workerPath === '@parcel/core/src/worker.js') {
this.localWorker = coreWorker;
} else {
throw new Error('No dynamic require possible: ' + this.options.workerPath);
}
} else {
// $FlowFixMe this must be dynamic
this.localWorker = require(this.options.workerPath);
}
this.localWorkerInit = this.localWorker.childInit != null ? this.localWorker.childInit() : null;
this.run = this.createHandle('run');
// Worker thread stdout is by default piped into the process stdout, if there are enough worker
// threads to exceed the default listener limit, then anything else piping into stdout will trigger
// the `MaxListenersExceededWarning`, so we should ensure the max listeners is at least equal to the
// number of workers + 1 for the main thread.
//
// Note this can't be fixed easily where other things pipe into stdout - even after starting > 10 worker
// threads `process.stdout.getMaxListeners()` will still return 10, however adding another pipe into `stdout`
// will give the warning with `<worker count + 1>` as the number of listeners.
(_process$stdout = process.stdout) === null || _process$stdout === void 0 || _process$stdout.setMaxListeners(Math.max(process.stdout.getMaxListeners(), WorkerFarm.getNumWorkers() + 1));
this.startMaxWorkers();
}
workerApi = {
callMaster: async (request, awaitResponse = true) => {
// $FlowFixMe
let result = await this.processRequest({
...request,
awaitResponse
});
return (0, _core().deserialize)((0, _core().serialize)(result));
},
createReverseHandle: fn => this.createReverseHandle(fn),
callChild: (childId, request) => new Promise((resolve, reject) => {
(0, _nullthrows().default)(this.workers.get(childId)).call({
...request,
resolve,
reject,
retries: 0
});
}),
runHandle: (handle, args) => this.workerApi.callChild((0, _nullthrows().default)(handle.childId), {
handle: handle.id,
args
}),
getSharedReference: ref => this.sharedReferences.get(ref),
resolveSharedReference: value => this.sharedReferencesByValue.get(value)
};
warmupWorker(method, args) {
// Workers are already stopping
if (this.ending) {
return;
}
// Workers are not warmed up yet.
// Send the job to a remote worker in the background,
// but use the result from the local worker - it will be faster.
let promise = this.addCall(method, [...args, true]);
if (promise) {
promise.then(() => {
this.warmWorkers++;
if (this.warmWorkers >= this.workers.size) {
this.emit('warmedup');
}
}).catch(() => {});
}
}
shouldStartRemoteWorkers() {
return this.options.maxConcurrentWorkers > 0 || !this.options.useLocalWorker;
}
createHandle(method, useMainThread = false) {
if (!this.options.useLocalWorker) {
useMainThread = false;
}
return async (...args) => {
// Child process workers are slow to start (~600ms).
// While we're waiting, just run on the main thread.
// This significantly speeds up startup time.
if (this.shouldUseRemoteWorkers() && !useMainThread) {
return this.addCall(method, [...args, false]);
} else {
if (this.options.warmWorkers && this.shouldStartRemoteWorkers()) {
this.warmupWorker(method, args);
}
let processedArgs;
if (!useMainThread) {
processedArgs = (0, _core().restoreDeserializedObject)((0, _core().prepareForSerialization)([...args, false]));
} else {
processedArgs = args;
}
if (this.localWorkerInit != null) {
await this.localWorkerInit;
this.localWorkerInit = null;
}
return this.localWorker[method](this.workerApi, ...processedArgs);
}
};
}
onError(error, worker) {
// Handle ipc errors
if (error.code === 'ERR_IPC_CHANNEL_CLOSED') {
return this.stopWorker(worker);
} else {
_logger().default.error(error, '@parcel/workers');
}
}
startChild() {
let worker = new _Worker.default({
forcedKillTime: this.options.forcedKillTime,
backend: this.options.backend,
shouldPatchConsole: this.options.shouldPatchConsole,
shouldTrace: this.options.shouldTrace,
sharedReferences: this.sharedReferences
});
worker.fork((0, _nullthrows().default)(this.options.workerPath));
worker.on('request', data => this.processRequest(data, worker));
worker.on('ready', () => {
this.readyWorkers++;
if (this.readyWorkers === this.options.maxConcurrentWorkers) {
this.emit('ready');
}
this.processQueue();
});
worker.on('response', () => this.processQueue());
worker.on('error', err => this.onError(err, worker));
worker.once('exit', () => this.stopWorker(worker));
this.workers.set(worker.id, worker);
}
async stopWorker(worker) {
if (!worker.stopped) {
this.workers.delete(worker.id);
worker.isStopping = true;
if (worker.calls.size) {
for (let call of worker.calls.values()) {
call.retries++;
this.callQueue.unshift(call);
}
}
worker.calls.clear();
await worker.stop();
// Process any requests that failed and start a new worker
this.processQueue();
}
}
processQueue() {
if (this.ending || !this.callQueue.length) return;
if (this.workers.size < this.options.maxConcurrentWorkers) {
this.startChild();
}
let workers = [...this.workers.values()].sort((a, b) => a.calls.size - b.calls.size);
for (let worker of workers) {
if (!this.callQueue.length) {
break;
}
if (!worker.ready || worker.stopped || worker.isStopping) {
continue;
}
if (worker.calls.size < this.options.maxConcurrentCallsPerWorker) {
this.callWorker(worker, this.callQueue.shift());
}
}
}
async callWorker(worker, call) {
for (let ref of this.sharedReferences.keys()) {
if (!worker.sentSharedReferences.has(ref)) {
await worker.sendSharedReference(ref, this.getSerializedSharedReference(ref));
}
}
worker.call(call);
}
async processRequest(data, worker) {
let {
method,
args,
location,
awaitResponse,
idx,
handle: handleId
} = data;
let mod;
if (handleId != null) {
var _this$handles$get;
mod = (0, _nullthrows().default)((_this$handles$get = this.handles.get(handleId)) === null || _this$handles$get === void 0 ? void 0 : _this$handles$get.fn);
} else if (location) {
// $FlowFixMe
if (process.browser) {
if (location === '@parcel/workers/src/bus.js') {
mod = bus;
} else {
throw new Error('No dynamic require possible: ' + location);
}
} else {
// $FlowFixMe this must be dynamic
mod = require(location);
}
} else {
throw new Error('Unknown request');
}
const responseFromContent = content => ({
idx,
type: 'response',
contentType: 'data',
content
});
const errorResponseFromError = e => ({
idx,
type: 'response',
contentType: 'error',
content: (0, _diagnostic().anyToDiagnostic)(e)
});
let result;
if (method == null) {
try {
result = responseFromContent(await mod(...args));
} catch (e) {
result = errorResponseFromError(e);
}
} else {
// ESModule default interop
if (mod.__esModule && !mod[method] && mod.default) {
mod = mod.default;
}
try {
// $FlowFixMe
result = responseFromContent(await mod[method](...args));
} catch (e) {
result = errorResponseFromError(e);
}
}
if (awaitResponse) {
if (worker) {
worker.send(result);
} else {
if (result.contentType === 'error') {
throw new (_diagnostic().default)({
diagnostic: result.content
});
}
return result.content;
}
}
}
addCall(method, args) {
if (this.ending) {
throw new Error('Cannot add a worker call if workerfarm is ending.');
}
return new Promise((resolve, reject) => {
this.callQueue.push({
method,
args: args,
retries: 0,
resolve,
reject
});
this.processQueue();
});
}
async end() {
this.ending = true;
await Promise.all(Array.from(this.workers.values()).map(worker => this.stopWorker(worker)));
for (let handle of this.handles.values()) {
handle.dispose();
}
this.handles = new Map();
this.sharedReferences = new Map();
this.sharedReferencesByValue = new Map();
this.ending = false;
}
startMaxWorkers() {
// Starts workers until the maximum is reached
if (this.workers.size < this.options.maxConcurrentWorkers) {
let toStart = this.options.maxConcurrentWorkers - this.workers.size;
while (toStart--) {
this.startChild();
}
}
}
shouldUseRemoteWorkers() {
return !this.options.useLocalWorker || (this.warmWorkers >= this.workers.size || !this.options.warmWorkers) && this.options.maxConcurrentWorkers > 0;
}
createReverseHandle(fn) {
let handle = new _Handle.default({
fn
});
this.handles.set(handle.id, handle);
return handle;
}
createSharedReference(value, isCacheable = true) {
let ref = referenceId++;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);
if (!isCacheable) {
this.serializedSharedReferences.set(ref, null);
}
return {
ref,
dispose: () => {
this.sharedReferences.delete(ref);
this.sharedReferencesByValue.delete(value);
this.serializedSharedReferences.delete(ref);
let promises = [];
for (let worker of this.workers.values()) {
if (!worker.sentSharedReferences.has(ref)) {
continue;
}
worker.sentSharedReferences.delete(ref);
promises.push(new Promise((resolve, reject) => {
worker.call({
method: 'deleteSharedReference',
args: [ref],
resolve,
reject,
skipReadyCheck: true,
retries: 0
});
}));
}
return Promise.all(promises);
}
};
}
getSerializedSharedReference(ref) {
let cached = this.serializedSharedReferences.get(ref);
if (cached) {
return cached;
}
let value = this.sharedReferences.get(ref);
let buf = (0, _core().serialize)(value).buffer;
// If the reference was created with the isCacheable option set to false,
// serializedSharedReferences will contain `null` as the value.
if (cached !== null) {
this.serializedSharedReferences.set(ref, buf);
}
return buf;
}
async startProfile() {
let promises = [];
for (let worker of this.workers.values()) {
promises.push(new Promise((resolve, reject) => {
worker.call({
method: 'startProfile',
args: [],
resolve,
reject,
retries: 0,
skipReadyCheck: true
});
}));
}
this.profiler = new (_profiler().SamplingProfiler)();
promises.push(this.profiler.startProfiling());
await Promise.all(promises);
}
async endProfile() {
if (!this.profiler) {
return;
}
let promises = [this.profiler.stopProfiling()];
let names = ['Master'];
for (let worker of this.workers.values()) {
names.push('Worker ' + worker.id);
promises.push(new Promise((resolve, reject) => {
worker.call({
method: 'endProfile',
args: [],
resolve,
reject,
retries: 0,
skipReadyCheck: true
});
}));
}
var profiles = await Promise.all(promises);
let trace = new (_profiler().Trace)();
let filename = `profile-${getTimeId()}.trace`;
let stream = trace.pipe(_fs().default.createWriteStream(filename));
for (let profile of profiles) {
trace.addCPUProfile(names.shift(), profile);
}
trace.flush();
await new Promise(resolve => {
stream.once('finish', resolve);
});
_logger().default.info({
origin: '@parcel/workers',
message: (0, _diagnostic().md)`Wrote profile to ${filename}`
});
}
async callAllWorkers(method, args) {
let promises = [];
for (let worker of this.workers.values()) {
promises.push(new Promise((resolve, reject) => {
worker.call({
method,
args,
resolve,
reject,
retries: 0
});
}));
}
promises.push(this.localWorker[method](this.workerApi, ...args));
await Promise.all(promises);
}
async takeHeapSnapshot() {
let snapshotId = getTimeId();
try {
let snapshotPaths = await Promise.all([...this.workers.values()].map(worker => new Promise((resolve, reject) => {
worker.call({
method: 'takeHeapSnapshot',
args: [snapshotId],
resolve,
reject,
retries: 0,
skipReadyCheck: true
});
})));
_logger().default.info({
origin: '@parcel/workers',
message: (0, _diagnostic().md)`Wrote heap snapshots to the following paths:\n${snapshotPaths.join('\n')}`
});
} catch {
_logger().default.error({
origin: '@parcel/workers',
message: 'Unable to take heap snapshots. Note: requires Node 11.13.0+'
});
}
}
static getNumWorkers() {
return process.env.PARCEL_WORKERS ? parseInt(process.env.PARCEL_WORKERS, 10) : Math.min(4, Math.ceil((0, _cpuCount.default)() / 2));
}
static isWorker() {
return !!_childState.child;
}
static getWorkerApi() {
(0, _assert().default)(_childState.child != null, 'WorkerFarm.getWorkerApi can only be called within workers');
return _childState.child.workerApi;
}
static getConcurrentCallsPerWorker(defaultValue = DEFAULT_MAX_CONCURRENT_CALLS) {
return parseInt(process.env.PARCEL_MAX_CONCURRENT_CALLS, 10) || defaultValue;
}
}
exports.default = WorkerFarm;
function getTimeId() {
let now = new Date();
return String(now.getFullYear()) + String(now.getMonth() + 1).padStart(2, '0') + String(now.getDate()).padStart(2, '0') + '-' + String(now.getHours()).padStart(2, '0') + String(now.getMinutes()).padStart(2, '0') + String(now.getSeconds()).padStart(2, '0');
}


@@ -0,0 +1,34 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.detectBackend = detectBackend;
exports.getWorkerBackend = getWorkerBackend;
function detectBackend() {
// $FlowFixMe
if (process.browser) return 'web';
switch (process.env.PARCEL_WORKER_BACKEND) {
case 'threads':
case 'process':
return process.env.PARCEL_WORKER_BACKEND;
}
try {
require('worker_threads');
return 'threads';
} catch (err) {
return 'process';
}
}
function getWorkerBackend(backend) {
switch (backend) {
case 'threads':
return require('./threads/ThreadsWorker').default;
case 'process':
return require('./process/ProcessWorker').default;
case 'web':
return require('./web/WebWorker').default;
default:
throw new Error(`Invalid backend: ${backend}`);
}
}
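A brief sketch of how the backend selection above could be exercised; the lib/backend.js require path is an assumption about where this file ends up inside the published package:

// Hypothetical: force the child_process backend via the environment and inspect the result.
const {detectBackend, getWorkerBackend} = require('@parcel/workers/lib/backend'); // path assumed

process.env.PARCEL_WORKER_BACKEND = 'process';
console.log(detectBackend()); // 'process'; without the override it returns 'threads' when worker_threads is available

const ProcessWorker = getWorkerBackend('process');
console.log(typeof ProcessWorker); // 'function' (the worker implementation class used by Worker.fork)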


@@ -0,0 +1,31 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _events() {
const data = _interopRequireDefault(require("events"));
_events = function () {
return data;
};
return data;
}
var _childState = require("./childState");
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
class Bus extends _events().default {
emit(event, ...args) {
if (_childState.child) {
_childState.child.workerApi.callMaster({
// $FlowFixMe
location: process.browser ? '@parcel/workers/src/bus.js' : __filename,
method: 'emit',
args: [event, ...args]
}, false);
return true;
} else {
return super.emit(event, ...args);
}
}
}
var _default = exports.default = new Bus();


@@ -0,0 +1,295 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.Child = void 0;
var coreWorker = _interopRequireWildcard(require("./core-worker"));
function _assert() {
const data = _interopRequireDefault(require("assert"));
_assert = function () {
return data;
};
return data;
}
function _nullthrows() {
const data = _interopRequireDefault(require("nullthrows"));
_nullthrows = function () {
return data;
};
return data;
}
function _logger() {
const data = _interopRequireWildcard(require("@parcel/logger"));
_logger = function () {
return data;
};
return data;
}
function _diagnostic() {
const data = _interopRequireWildcard(require("@parcel/diagnostic"));
_diagnostic = function () {
return data;
};
return data;
}
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
var _bus = _interopRequireDefault(require("./bus"));
function _profiler() {
const data = require("@parcel/profiler");
_profiler = function () {
return data;
};
return data;
}
var _Handle2 = _interopRequireDefault(require("./Handle"));
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _getRequireWildcardCache(e) { if ("function" != typeof WeakMap) return null; var r = new WeakMap(), t = new WeakMap(); return (_getRequireWildcardCache = function (e) { return e ? t : r; })(e); }
function _interopRequireWildcard(e, r) { if (!r && e && e.__esModule) return e; if (null === e || "object" != typeof e && "function" != typeof e) return { default: e }; var t = _getRequireWildcardCache(r); if (t && t.has(e)) return t.get(e); var n = { __proto__: null }, a = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var u in e) if ("default" !== u && Object.prototype.hasOwnProperty.call(e, u)) { var i = a ? Object.getOwnPropertyDescriptor(e, u) : null; i && (i.get || i.set) ? Object.defineProperty(n, u, i) : n[u] = e[u]; } return n.default = e, t && t.set(e, n), n; }
// The import of './Handle' should really be eager, even under @babel/plugin-transform-modules-commonjs's lazy mode.
const Handle = _Handle2.default;
class Child {
callQueue = [];
maxConcurrentCalls = 10;
responseId = 0;
responseQueue = new Map();
handles = new Map();
sharedReferences = new Map();
sharedReferencesByValue = new Map();
constructor(ChildBackend) {
this.child = new ChildBackend(m => {
this.messageListener(m);
}, () => this.handleEnd());
// Monitor all logging events inside this child process and forward to
// the main process via the bus.
this.loggerDisposable = _logger().default.onLog(event => {
_bus.default.emit('logEvent', event);
});
// .. and do the same for trace events
this.tracerDisposable = _profiler().tracer.onTrace(event => {
_bus.default.emit('traceEvent', event);
});
}
workerApi = {
callMaster: (request, awaitResponse = true) => this.addCall(request, awaitResponse),
createReverseHandle: fn => this.createReverseHandle(fn),
runHandle: (handle, args) => this.workerApi.callMaster({
handle: handle.id,
args
}, true),
getSharedReference: ref => this.sharedReferences.get(ref),
resolveSharedReference: value => this.sharedReferencesByValue.get(value)
};
messageListener(message) {
if (message.type === 'response') {
return this.handleResponse(message);
} else if (message.type === 'request') {
return this.handleRequest(message);
}
}
send(data) {
this.child.send(data);
}
async childInit(module, childId) {
// $FlowFixMe
if (process.browser) {
if (module === '@parcel/core/src/worker.js') {
this.module = coreWorker;
} else {
throw new Error('No dynamic require possible: ' + module);
}
} else {
// $FlowFixMe this must be dynamic
this.module = require(module);
}
this.childId = childId;
if (this.module.childInit != null) {
await this.module.childInit();
}
}
async handleRequest(data) {
let {
idx,
method,
args,
handle: handleId
} = data;
let child = (0, _nullthrows().default)(data.child);
const responseFromContent = content => ({
idx,
child,
type: 'response',
contentType: 'data',
content
});
const errorResponseFromError = e => ({
idx,
child,
type: 'response',
contentType: 'error',
content: (0, _diagnostic().anyToDiagnostic)(e)
});
let result;
if (handleId != null) {
try {
var _this$handles$get;
let fn = (0, _nullthrows().default)((_this$handles$get = this.handles.get(handleId)) === null || _this$handles$get === void 0 ? void 0 : _this$handles$get.fn);
result = responseFromContent(fn(...args));
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'childInit') {
try {
let [moduleName, childOptions] = args;
if (childOptions.shouldPatchConsole) {
(0, _logger().patchConsole)();
} else {
(0, _logger().unpatchConsole)();
}
if (childOptions.shouldTrace) {
_profiler().tracer.enable();
}
result = responseFromContent(await this.childInit(moduleName, child));
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'startProfile') {
this.profiler = new (_profiler().SamplingProfiler)();
try {
result = responseFromContent(await this.profiler.startProfiling());
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'endProfile') {
try {
let res = this.profiler ? await this.profiler.stopProfiling() : null;
result = responseFromContent(res);
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'takeHeapSnapshot') {
try {
let v8 = require('v8');
result = responseFromContent(v8.writeHeapSnapshot('heap-' + args[0] + '-' + (this.childId ? 'worker' + this.childId : 'main') + '.heapsnapshot'));
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'createSharedReference') {
let [ref, _value] = args;
let value = _value instanceof ArrayBuffer ?
// In the case the value is pre-serialized as a buffer,
// deserialize it.
(0, _core().deserialize)(Buffer.from(_value)) : _value;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);
result = responseFromContent(null);
} else if (method === 'deleteSharedReference') {
let ref = args[0];
let value = this.sharedReferences.get(ref);
this.sharedReferencesByValue.delete(value);
this.sharedReferences.delete(ref);
result = responseFromContent(null);
} else {
try {
result = responseFromContent(
// $FlowFixMe
await this.module[method](this.workerApi, ...args));
} catch (e) {
result = errorResponseFromError(e);
}
}
try {
this.send(result);
} catch (e) {
result = this.send(errorResponseFromError(e));
}
}
handleResponse(data) {
let idx = (0, _nullthrows().default)(data.idx);
let contentType = data.contentType;
let content = data.content;
let call = (0, _nullthrows().default)(this.responseQueue.get(idx));
if (contentType === 'error') {
(0, _assert().default)(typeof content !== 'string');
call.reject(new (_diagnostic().default)({
diagnostic: content
}));
} else {
call.resolve(content);
}
this.responseQueue.delete(idx);
// Process the next call
this.processQueue();
}
// Keep in mind that responses to these calls must be JSON.stringify-safe
addCall(request, awaitResponse = true) {
var _promise;
// $FlowFixMe
let call = {
...request,
type: 'request',
child: this.childId,
// $FlowFixMe Added in Flow 0.121.0 upgrade in #4381
awaitResponse,
resolve: () => {},
reject: () => {}
};
let promise;
if (awaitResponse) {
promise = new Promise((resolve, reject) => {
call.resolve = resolve;
call.reject = reject;
});
}
this.callQueue.push(call);
this.processQueue();
return (_promise = promise) !== null && _promise !== void 0 ? _promise : Promise.resolve();
}
sendRequest(call) {
let idx;
if (call.awaitResponse) {
idx = this.responseId++;
this.responseQueue.set(idx, call);
}
this.send({
idx,
child: call.child,
type: call.type,
location: call.location,
handle: call.handle,
method: call.method,
args: call.args,
awaitResponse: call.awaitResponse
});
}
processQueue() {
if (!this.callQueue.length) {
return;
}
if (this.responseQueue.size < this.maxConcurrentCalls) {
this.sendRequest(this.callQueue.shift());
}
}
handleEnd() {
this.loggerDisposable.dispose();
this.tracerDisposable.dispose();
}
createReverseHandle(fn) {
let handle = new Handle({
fn,
childId: this.childId
});
this.handles.set(handle.id, handle);
return handle;
}
}
exports.Child = Child;


@@ -0,0 +1,14 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.child = void 0;
exports.setChild = setChild;
// This file is imported by both the WorkerFarm and child implementation.
// When a worker is initialized, it sets the state in this file.
// This way, WorkerFarm can access the state without directly importing the child code.
let child = exports.child = null;
function setChild(c) {
exports.child = child = c;
}


@@ -0,0 +1,4 @@
"use strict";
// eslint-disable-next-line monorepo/no-internal-import
module.exports = require('@parcel/core/src/worker.js');


@@ -0,0 +1,4 @@
"use strict";
// This is used only in browser builds
module.exports = {};


@@ -0,0 +1,79 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = getCores;
exports.detectRealCores = detectRealCores;
function _os() {
const data = _interopRequireDefault(require("os"));
_os = function () {
return data;
};
return data;
}
function _child_process() {
const data = require("child_process");
_child_process = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
const exec = command => {
try {
let stdout = (0, _child_process().execSync)(command, {
encoding: 'utf8',
// This prevents the command from outputting to the console
stdio: [null, null, null]
});
return stdout.trim();
} catch (e) {
return '';
}
};
function detectRealCores() {
let platform = _os().default.platform();
let amount = 0;
if (platform === 'linux') {
amount = parseInt(exec('lscpu -p | egrep -v "^#" | sort -u -t, -k 2,4 | wc -l'), 10);
} else if (platform === 'darwin') {
amount = parseInt(exec('sysctl -n hw.physicalcpu_max'), 10);
} else if (platform === 'win32') {
const str = exec('wmic cpu get NumberOfCores').match(/\d+/g);
if (str !== null) {
amount = parseInt(str.filter(n => n !== '')[0], 10);
}
}
if (!amount || amount <= 0) {
throw new Error('Could not detect cpu count!');
}
return amount;
}
let cores;
function getCores(bypassCache = false) {
// Do not re-run commands if we already have the count...
if (cores && !bypassCache) {
return cores;
}
// $FlowFixMe
if (process.browser) {
// eslint-disable-next-line no-undef
cores = navigator.hardwareConcurrency / 2;
}
if (!cores) {
try {
cores = detectRealCores();
} catch (e) {
// Guess the number of real cores
cores = _os().default.cpus().filter((cpu, index) => !cpu.model.includes('Intel') || index % 2 === 1).length;
}
}
// Another fallback
if (!cores) {
cores = 1;
}
return cores;
}
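For context, a small worked example (illustrative only) of the default worker count that WorkerFarm.getNumWorkers(), shown earlier in this diff, derives from the core count returned by getCores(): half the cores, rounded up, capped at 4, unless PARCEL_WORKERS overrides it.

// Illustrative only: mirrors Math.min(4, Math.ceil(cores / 2)) from WorkerFarm.getNumWorkers().
function defaultNumWorkers(cores) {
  return Math.min(4, Math.ceil(cores / 2));
}

console.log(defaultNumWorkers(2)); // 1
console.log(defaultNumWorkers(6)); // 3
console.log(defaultNumWorkers(16)); // 4 (capped)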


@@ -0,0 +1,75 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
Object.defineProperty(exports, "Handle", {
enumerable: true,
get: function () {
return _WorkerFarm.Handle;
}
});
Object.defineProperty(exports, "bus", {
enumerable: true,
get: function () {
return _bus.default;
}
});
exports.default = void 0;
function _assert() {
const data = _interopRequireDefault(require("assert"));
_assert = function () {
return data;
};
return data;
}
var _WorkerFarm = _interopRequireWildcard(require("./WorkerFarm"));
function _logger() {
const data = _interopRequireDefault(require("@parcel/logger"));
_logger = function () {
return data;
};
return data;
}
var _bus = _interopRequireDefault(require("./bus"));
function _profiler() {
const data = require("@parcel/profiler");
_profiler = function () {
return data;
};
return data;
}
function _getRequireWildcardCache(e) { if ("function" != typeof WeakMap) return null; var r = new WeakMap(), t = new WeakMap(); return (_getRequireWildcardCache = function (e) { return e ? t : r; })(e); }
function _interopRequireWildcard(e, r) { if (!r && e && e.__esModule) return e; if (null === e || "object" != typeof e && "function" != typeof e) return { default: e }; var t = _getRequireWildcardCache(r); if (t && t.has(e)) return t.get(e); var n = { __proto__: null }, a = Object.defineProperty && Object.getOwnPropertyDescriptor; for (var u in e) if ("default" !== u && Object.prototype.hasOwnProperty.call(e, u)) { var i = a ? Object.getOwnPropertyDescriptor(e, u) : null; i && (i.get || i.set) ? Object.defineProperty(n, u, i) : n[u] = e[u]; } return n.default = e, t && t.set(e, n), n; }
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
if (!_WorkerFarm.default.isWorker()) {
// Forward all logger events originating from workers into the main process
_bus.default.on('logEvent', e => {
switch (e.level) {
case 'info':
_logger().default.info(e.diagnostics);
break;
case 'progress':
(0, _assert().default)(typeof e.message === 'string');
_logger().default.progress(e.message);
break;
case 'verbose':
_logger().default.verbose(e.diagnostics);
break;
case 'warn':
_logger().default.warn(e.diagnostics);
break;
case 'error':
_logger().default.error(e.diagnostics);
break;
default:
throw new Error('Unknown log level');
}
});
// Forward all trace events originating from workers into the main process
_bus.default.on('traceEvent', e => {
_profiler().tracer.trace(e);
});
}
var _default = exports.default = _WorkerFarm.default;


@@ -0,0 +1,58 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _nullthrows() {
const data = _interopRequireDefault(require("nullthrows"));
_nullthrows = function () {
return data;
};
return data;
}
var _childState = require("../childState");
var _child = require("../child");
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
class ProcessChild {
constructor(onMessage, onExit) {
if (!process.send) {
throw new Error('Only create ProcessChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
process.on('message', data => this.handleMessage(data));
}
handleMessage(data) {
if (data === 'die') {
return this.stop();
}
this.onMessage((0, _core().deserialize)(Buffer.from(data, 'base64')));
}
send(data) {
let processSend = (0, _nullthrows().default)(process.send).bind(process);
processSend((0, _core().serialize)(data).toString('base64'), err => {
if (err && err instanceof Error) {
// $FlowFixMe[prop-missing]
if (err.code === 'ERR_IPC_CHANNEL_CLOSED') {
// IPC connection closed
// no need to keep the worker running if it can't send or receive data
return this.stop();
}
}
});
}
stop() {
this.onExit(0);
process.exit();
}
}
exports.default = ProcessChild;
(0, _childState.setChild)(new _child.Child(ProcessChild));


@@ -0,0 +1,83 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _child_process() {
const data = _interopRequireDefault(require("child_process"));
_child_process = function () {
return data;
};
return data;
}
function _path() {
const data = _interopRequireDefault(require("path"));
_path = function () {
return data;
};
return data;
}
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
const WORKER_PATH = _path().default.join(__dirname, 'ProcessChild.js');
class ProcessWorker {
processQueue = true;
sendQueue = [];
constructor(execArgv, onMessage, onError, onExit) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start() {
this.child = _child_process().default.fork(WORKER_PATH, process.argv, {
execArgv: this.execArgv,
env: process.env,
cwd: process.cwd()
});
this.child.on('message', data => {
this.onMessage((0, _core().deserialize)(Buffer.from(data, 'base64')));
});
this.child.once('exit', this.onExit);
this.child.on('error', this.onError);
return Promise.resolve();
}
async stop() {
this.child.send('die');
let forceKill = setTimeout(() => this.child.kill('SIGINT'), 500);
await new Promise(resolve => {
this.child.once('exit', resolve);
});
clearTimeout(forceKill);
}
send(data) {
if (!this.processQueue) {
this.sendQueue.push(data);
return;
}
let result = this.child.send((0, _core().serialize)(data).toString('base64'), error => {
if (error && error instanceof Error) {
// Ignore this, the workerfarm handles child errors
return;
}
this.processQueue = true;
if (this.sendQueue.length > 0) {
let queueCopy = this.sendQueue.slice(0);
this.sendQueue = [];
queueCopy.forEach(entry => this.send(entry));
}
});
if (!result || /^win/.test(process.platform)) {
// Queue is handling too many messages, throttle it
this.processQueue = false;
}
}
}
exports.default = ProcessWorker;


@@ -0,0 +1,49 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _worker_threads() {
const data = require("worker_threads");
_worker_threads = function () {
return data;
};
return data;
}
function _nullthrows() {
const data = _interopRequireDefault(require("nullthrows"));
_nullthrows = function () {
return data;
};
return data;
}
var _childState = require("../childState");
var _child = require("../child");
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
class ThreadsChild {
constructor(onMessage, onExit) {
if (_worker_threads().isMainThread || !_worker_threads().parentPort) {
throw new Error('Only create ThreadsChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
_worker_threads().parentPort.on('message', data => this.handleMessage(data));
_worker_threads().parentPort.on('close', this.onExit);
}
handleMessage(data) {
this.onMessage((0, _core().restoreDeserializedObject)(data));
}
send(data) {
(0, _nullthrows().default)(_worker_threads().parentPort).postMessage((0, _core().prepareForSerialization)(data));
}
}
exports.default = ThreadsChild;
(0, _childState.setChild)(new _child.Child(ThreadsChild));


@@ -0,0 +1,61 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _worker_threads() {
const data = require("worker_threads");
_worker_threads = function () {
return data;
};
return data;
}
function _path() {
const data = _interopRequireDefault(require("path"));
_path = function () {
return data;
};
return data;
}
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
const WORKER_PATH = _path().default.join(__dirname, 'ThreadsChild.js');
class ThreadsWorker {
constructor(execArgv, onMessage, onError, onExit) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start() {
this.worker = new (_worker_threads().Worker)(WORKER_PATH, {
execArgv: this.execArgv,
env: process.env
});
this.worker.on('message', data => this.handleMessage(data));
this.worker.on('error', this.onError);
this.worker.on('exit', this.onExit);
return new Promise(resolve => {
this.worker.on('online', resolve);
});
}
stop() {
// In node 12, this returns a promise, but previously it accepted a callback
// TODO: Pass a callback in earlier versions of Node
return Promise.resolve(this.worker.terminate());
}
handleMessage(data) {
this.onMessage((0, _core().restoreDeserializedObject)(data));
}
send(data) {
this.worker.postMessage((0, _core().prepareForSerialization)(data));
}
}
exports.default = ThreadsWorker;


@@ -0,0 +1 @@
"use strict";


@@ -0,0 +1,44 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
var _childState = require("../childState");
var _child = require("../child");
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
/* eslint-env worker*/
class WebChild {
constructor(onMessage, onExit) {
if (!(typeof WorkerGlobalScope !== 'undefined' && self instanceof WorkerGlobalScope)) {
throw new Error('Only create WebChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
self.addEventListener('message', ({
data
}) => {
if (data === 'stop') {
this.onExit(0);
self.postMessage('stopped');
}
// $FlowFixMe assume WorkerMessage as data
this.handleMessage(data);
});
self.postMessage('online');
}
handleMessage(data) {
this.onMessage((0, _core().restoreDeserializedObject)(data));
}
send(data) {
self.postMessage((0, _core().prepareForSerialization)(data));
}
}
exports.default = WebChild;
(0, _childState.setChild)(new _child.Child(WebChild));


@@ -0,0 +1,85 @@
"use strict";
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.default = void 0;
function _core() {
const data = require("@parcel/core");
_core = function () {
return data;
};
return data;
}
function _utils() {
const data = require("@parcel/utils");
_utils = function () {
return data;
};
return data;
}
let id = 0;
class WebWorker {
constructor(execArgv, onMessage, onError, onExit) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start() {
// $FlowFixMe[incompatible-call]
this.worker = new Worker(new URL('./WebChild.js', import.meta.url), {
name: `Parcel Worker ${id++}`,
type: 'module'
});
let {
deferred,
promise
} = (0, _utils().makeDeferredWithPromise)();
this.worker.onmessage = ({
data
}) => {
if (data === 'online') {
deferred.resolve();
return;
}
// $FlowFixMe assume WorkerMessage as data
this.handleMessage(data);
};
this.worker.onerror = this.onError;
// Web workers can't crash or intentionally stop on their own, apart from stop() below
// this.worker.on('exit', this.onExit);
return promise;
}
stop() {
if (!this.stopping) {
this.stopping = (async () => {
this.worker.postMessage('stop');
let {
deferred,
promise
} = (0, _utils().makeDeferredWithPromise)();
this.worker.addEventListener('message', ({
data
}) => {
if (data === 'stopped') {
deferred.resolve();
}
});
await promise;
this.worker.terminate();
this.onExit(0);
})();
}
return this.stopping;
}
handleMessage(data) {
this.onMessage((0, _core().restoreDeserializedObject)(data));
}
send(data) {
this.worker.postMessage((0, _core().prepareForSerialization)(data));
}
}
exports.default = WebWorker;


@@ -0,0 +1,40 @@
{
"name": "@parcel/workers",
"version": "2.12.0",
"description": "Blazing fast, zero configuration web application bundler",
"license": "MIT",
"publishConfig": {
"access": "public"
},
"funding": {
"type": "opencollective",
"url": "https://opencollective.com/parcel"
},
"repository": {
"type": "git",
"url": "https://github.com/parcel-bundler/parcel.git"
},
"main": "lib/index.js",
"source": "src/index.js",
"types": "index.d.ts",
"engines": {
"node": ">= 12.0.0"
},
"dependencies": {
"@parcel/diagnostic": "2.12.0",
"@parcel/logger": "2.12.0",
"@parcel/profiler": "2.12.0",
"@parcel/types": "2.12.0",
"@parcel/utils": "2.12.0",
"nullthrows": "^1.1.1"
},
"peerDependencies": {
"@parcel/core": "^2.12.0"
},
"browser": {
"./src/process/ProcessWorker.js": false,
"./src/threads/ThreadsWorker.js": false,
"./src/core-worker.js": "./src/core-worker.browser.js"
},
"gitHead": "2059029ee91e5f03a273b0954d3e629d7375f986"
}


@@ -0,0 +1,48 @@
// @flow strict-local
import {registerSerializableClass} from '@parcel/core';
// $FlowFixMe
import packageJson from '../package.json';
let HANDLE_ID = 0;
// $FlowFixMe
export type HandleFunction = (...args: Array<any>) => any;
type HandleOpts = {|
fn?: HandleFunction,
childId?: ?number,
id?: number,
|};
const handleById: Map<number, Handle> = new Map();
export default class Handle {
id: number;
childId: ?number;
fn: ?HandleFunction;
constructor(opts: HandleOpts) {
this.id = opts.id ?? ++HANDLE_ID;
this.fn = opts.fn;
this.childId = opts.childId;
handleById.set(this.id, this);
}
dispose() {
handleById.delete(this.id);
}
serialize(): {|childId: ?number, id: number|} {
return {
id: this.id,
childId: this.childId,
};
}
static deserialize(opts: HandleOpts): Handle {
return new Handle(opts);
}
}
// Register the Handle as a serializable class so that it will properly be deserialized
// by anything that uses WorkerFarm.
registerSerializableClass(`${packageJson.version}:Handle`, Handle);


@@ -0,0 +1,227 @@
// @flow
import type {FilePath} from '@parcel/types';
import type {BackendType, WorkerImpl, WorkerMessage} from './types';
import type {SharedReference} from './WorkerFarm';
import nullthrows from 'nullthrows';
import EventEmitter from 'events';
import ThrowableDiagnostic from '@parcel/diagnostic';
import {getWorkerBackend} from './backend';
export type WorkerCall = {|
method?: string,
handle?: number,
args: $ReadOnlyArray<any>,
retries: number,
skipReadyCheck?: boolean,
resolve: (result: Promise<any> | any) => void,
reject: (error: any) => void,
|};
type WorkerOpts = {|
forcedKillTime: number,
backend: BackendType,
shouldPatchConsole?: boolean,
shouldTrace?: boolean,
sharedReferences: $ReadOnlyMap<SharedReference, mixed>,
|};
let WORKER_ID = 0;
export default class Worker extends EventEmitter {
+options: WorkerOpts;
worker: WorkerImpl;
id: number = WORKER_ID++;
sentSharedReferences: Set<SharedReference> = new Set();
calls: Map<number, WorkerCall> = new Map();
exitCode: ?number = null;
callId: number = 0;
ready: boolean = false;
stopped: boolean = false;
isStopping: boolean = false;
constructor(options: WorkerOpts) {
super();
this.options = options;
}
async fork(forkModule: FilePath) {
let filteredArgs = [];
if (process.execArgv) {
filteredArgs = process.execArgv.filter(
v =>
!/^--(debug|inspect|no-opt|max-old-space-size=|max-semi-space-size=|expose-gc)/.test(
v,
),
);
for (let i = 0; i < filteredArgs.length; i++) {
let arg = filteredArgs[i];
let isArgWithParam =
((arg === '-r' || arg === '--require') &&
filteredArgs[i + 1] === '@parcel/register') ||
arg === '--title';
if (isArgWithParam) {
filteredArgs.splice(i, 2);
i--;
}
}
}
// Workaround for https://github.com/nodejs/node/issues/29117
if (process.env.NODE_OPTIONS) {
// arg parsing logic adapted from https://stackoverflow.com/a/46946420/2352201
let opts = [''];
let quote = false;
for (let c of nullthrows(process.env.NODE_OPTIONS.match(/.|^$/g))) {
if (c === '"') {
quote = !quote;
} else if (!quote && c === ' ') {
opts.push('');
} else {
opts[opts.length - 1] += c.replace(/\\(.)/, '$1');
}
}
for (let i = 0; i < opts.length; i++) {
let opt = opts[i];
if (opt === '-r' || opt === '--require') {
filteredArgs.push(opt, opts[i + 1]);
i++;
}
}
}
let onMessage = data => this.receive(data);
let onExit = code => {
this.exitCode = code;
this.emit('exit', code);
};
let onError = err => {
this.emit('error', err);
};
let WorkerBackend = getWorkerBackend(this.options.backend);
this.worker = new WorkerBackend(filteredArgs, onMessage, onError, onExit);
await this.worker.start();
await new Promise((resolve, reject) => {
this.call({
method: 'childInit',
args: [
forkModule,
{
shouldPatchConsole: !!this.options.shouldPatchConsole,
shouldTrace: !!this.options.shouldTrace,
},
],
retries: 0,
skipReadyCheck: true,
resolve,
reject,
});
});
let sharedRefs = this.options.sharedReferences;
let refsShared = new Set();
// in case more refs are created while initial refs are sending
while (refsShared.size < sharedRefs.size) {
await Promise.all(
[...sharedRefs]
.filter(([ref]) => !refsShared.has(ref))
.map(async ([ref, value]) => {
await this.sendSharedReference(ref, value);
refsShared.add(ref);
}),
);
}
this.ready = true;
this.emit('ready');
}
sendSharedReference(ref: SharedReference, value: mixed): Promise<any> {
this.sentSharedReferences.add(ref);
return new Promise((resolve, reject) => {
this.call({
method: 'createSharedReference',
args: [ref, value],
resolve,
reject,
retries: 0,
skipReadyCheck: true,
});
});
}
send(data: WorkerMessage): void {
this.worker.send(data);
}
call(call: WorkerCall): void {
if (this.stopped || this.isStopping) {
return;
}
let idx = this.callId++;
this.calls.set(idx, call);
let msg = {
type: 'request',
idx: idx,
child: this.id,
handle: call.handle,
method: call.method,
args: call.args,
};
if (this.ready || call.skipReadyCheck === true) {
this.send(msg);
} else {
this.once('ready', () => this.send(msg));
}
}
receive(message: WorkerMessage): void {
if (this.stopped || this.isStopping) {
return;
}
if (message.type === 'request') {
this.emit('request', message);
} else if (message.type === 'response') {
let idx = message.idx;
if (idx == null) {
return;
}
let call = this.calls.get(idx);
if (!call) {
// Return for unknown calls; these might occur if a third-party process uses workers
return;
}
if (message.contentType === 'error') {
call.reject(new ThrowableDiagnostic({diagnostic: message.content}));
} else {
call.resolve(message.content);
}
this.calls.delete(idx);
this.emit('response', message);
}
}
async stop() {
if (!this.stopped) {
this.stopped = true;
if (this.worker) {
await this.worker.stop();
}
}
}
}


@@ -0,0 +1,707 @@
// @flow
import type {ErrorWithCode, FilePath} from '@parcel/types';
import type {
CallRequest,
HandleCallRequest,
WorkerRequest,
WorkerDataResponse,
WorkerErrorResponse,
BackendType,
} from './types';
import type {HandleFunction} from './Handle';
import * as coreWorker from './core-worker';
import * as bus from './bus';
import invariant from 'assert';
import nullthrows from 'nullthrows';
import EventEmitter from 'events';
import {
deserialize,
prepareForSerialization,
restoreDeserializedObject,
serialize,
} from '@parcel/core';
import ThrowableDiagnostic, {anyToDiagnostic, md} from '@parcel/diagnostic';
import Worker, {type WorkerCall} from './Worker';
import cpuCount from './cpuCount';
import Handle from './Handle';
import {child} from './childState';
import {detectBackend} from './backend';
import {SamplingProfiler, Trace} from '@parcel/profiler';
import fs from 'fs';
import logger from '@parcel/logger';
let referenceId = 1;
export opaque type SharedReference = number;
export type FarmOptions = {|
maxConcurrentWorkers: number,
maxConcurrentCallsPerWorker: number,
forcedKillTime: number,
useLocalWorker: boolean,
warmWorkers: boolean,
workerPath?: FilePath,
backend: BackendType,
shouldPatchConsole?: boolean,
shouldTrace?: boolean,
|};
type WorkerModule = {
+[string]: (...args: Array<mixed>) => Promise<mixed>,
...
};
export type WorkerApi = {|
callMaster(CallRequest, ?boolean): Promise<mixed>,
createReverseHandle(fn: HandleFunction): Handle,
getSharedReference(ref: SharedReference): mixed,
resolveSharedReference(value: mixed): ?SharedReference,
callChild?: (childId: number, request: HandleCallRequest) => Promise<mixed>,
|};
export {Handle};
const DEFAULT_MAX_CONCURRENT_CALLS: number = 30;
/**
* workerPath should always be defined inside farmOptions
*/
export default class WorkerFarm extends EventEmitter {
callQueue: Array<WorkerCall> = [];
ending: boolean = false;
localWorker: WorkerModule;
localWorkerInit: ?Promise<void>;
options: FarmOptions;
run: HandleFunction;
warmWorkers: number = 0;
readyWorkers: number = 0;
workers: Map<number, Worker> = new Map();
handles: Map<number, Handle> = new Map();
sharedReferences: Map<SharedReference, mixed> = new Map();
sharedReferencesByValue: Map<mixed, SharedReference> = new Map();
serializedSharedReferences: Map<SharedReference, ?ArrayBuffer> = new Map();
profiler: ?SamplingProfiler;
constructor(farmOptions: $Shape<FarmOptions> = {}) {
super();
this.options = {
maxConcurrentWorkers: WorkerFarm.getNumWorkers(),
maxConcurrentCallsPerWorker: WorkerFarm.getConcurrentCallsPerWorker(
farmOptions.shouldTrace ? 1 : DEFAULT_MAX_CONCURRENT_CALLS,
),
forcedKillTime: 500,
warmWorkers: false,
useLocalWorker: true, // TODO: setting this to false makes some tests fail, figure out why
backend: detectBackend(),
...farmOptions,
};
if (!this.options.workerPath) {
throw new Error('Please provide a worker path!');
}
// $FlowFixMe
if (process.browser) {
if (this.options.workerPath === '@parcel/core/src/worker.js') {
this.localWorker = coreWorker;
} else {
throw new Error(
'No dynamic require possible: ' + this.options.workerPath,
);
}
} else {
// $FlowFixMe this must be dynamic
this.localWorker = require(this.options.workerPath);
}
this.localWorkerInit =
this.localWorker.childInit != null ? this.localWorker.childInit() : null;
this.run = this.createHandle('run');
// Worker thread stdout is piped into the process stdout by default. If there are enough worker
// threads to exceed the default listener limit, anything else piping into stdout will trigger
// the `MaxListenersExceededWarning`, so we ensure the max listener count is at least the
// number of workers + 1 for the main thread.
//
// Note this can't be fixed easily where other things pipe into stdout - even after starting > 10 worker
// threads, `process.stdout.getMaxListeners()` will still return 10; adding another pipe into `stdout`
// will then give the warning with `<worker count + 1>` as the number of listeners.
process.stdout?.setMaxListeners(
Math.max(
process.stdout.getMaxListeners(),
WorkerFarm.getNumWorkers() + 1,
),
);
this.startMaxWorkers();
}
workerApi: {|
callChild: (childId: number, request: HandleCallRequest) => Promise<mixed>,
callMaster: (
request: CallRequest,
awaitResponse?: ?boolean,
) => Promise<mixed>,
createReverseHandle: (fn: HandleFunction) => Handle,
getSharedReference: (ref: SharedReference) => mixed,
resolveSharedReference: (value: mixed) => void | SharedReference,
runHandle: (handle: Handle, args: Array<any>) => Promise<mixed>,
|} = {
callMaster: async (
request: CallRequest,
awaitResponse: ?boolean = true,
): Promise<mixed> => {
// $FlowFixMe
let result = await this.processRequest({
...request,
awaitResponse,
});
return deserialize(serialize(result));
},
createReverseHandle: (fn: HandleFunction): Handle =>
this.createReverseHandle(fn),
callChild: (childId: number, request: HandleCallRequest): Promise<mixed> =>
new Promise((resolve, reject) => {
nullthrows(this.workers.get(childId)).call({
...request,
resolve,
reject,
retries: 0,
});
}),
runHandle: (handle: Handle, args: Array<any>): Promise<mixed> =>
this.workerApi.callChild(nullthrows(handle.childId), {
handle: handle.id,
args,
}),
getSharedReference: (ref: SharedReference) =>
this.sharedReferences.get(ref),
resolveSharedReference: (value: mixed) =>
this.sharedReferencesByValue.get(value),
};
warmupWorker(method: string, args: Array<any>): void {
// Workers are already stopping
if (this.ending) {
return;
}
// Workers are not warmed up yet.
// Send the job to a remote worker in the background,
// but use the result from the local worker - it will be faster.
let promise = this.addCall(method, [...args, true]);
if (promise) {
promise
.then(() => {
this.warmWorkers++;
if (this.warmWorkers >= this.workers.size) {
this.emit('warmedup');
}
})
.catch(() => {});
}
}
shouldStartRemoteWorkers(): boolean {
return (
this.options.maxConcurrentWorkers > 0 || !this.options.useLocalWorker
);
}
createHandle(method: string, useMainThread: boolean = false): HandleFunction {
if (!this.options.useLocalWorker) {
useMainThread = false;
}
return async (...args) => {
// Child process workers are slow to start (~600ms).
// While we're waiting, just run on the main thread.
// This significantly speeds up startup time.
if (this.shouldUseRemoteWorkers() && !useMainThread) {
return this.addCall(method, [...args, false]);
} else {
if (this.options.warmWorkers && this.shouldStartRemoteWorkers()) {
this.warmupWorker(method, args);
}
let processedArgs;
if (!useMainThread) {
processedArgs = restoreDeserializedObject(
prepareForSerialization([...args, false]),
);
} else {
processedArgs = args;
}
if (this.localWorkerInit != null) {
await this.localWorkerInit;
this.localWorkerInit = null;
}
return this.localWorker[method](this.workerApi, ...processedArgs);
}
};
}
onError(error: ErrorWithCode, worker: Worker): void | Promise<void> {
// Handle ipc errors
if (error.code === 'ERR_IPC_CHANNEL_CLOSED') {
return this.stopWorker(worker);
} else {
logger.error(error, '@parcel/workers');
}
}
startChild() {
let worker = new Worker({
forcedKillTime: this.options.forcedKillTime,
backend: this.options.backend,
shouldPatchConsole: this.options.shouldPatchConsole,
shouldTrace: this.options.shouldTrace,
sharedReferences: this.sharedReferences,
});
worker.fork(nullthrows(this.options.workerPath));
worker.on('request', data => this.processRequest(data, worker));
worker.on('ready', () => {
this.readyWorkers++;
if (this.readyWorkers === this.options.maxConcurrentWorkers) {
this.emit('ready');
}
this.processQueue();
});
worker.on('response', () => this.processQueue());
worker.on('error', err => this.onError(err, worker));
worker.once('exit', () => this.stopWorker(worker));
this.workers.set(worker.id, worker);
}
async stopWorker(worker: Worker): Promise<void> {
if (!worker.stopped) {
this.workers.delete(worker.id);
worker.isStopping = true;
if (worker.calls.size) {
for (let call of worker.calls.values()) {
call.retries++;
this.callQueue.unshift(call);
}
}
worker.calls.clear();
await worker.stop();
// Process any requests that failed and start a new worker
this.processQueue();
}
}
processQueue(): void {
if (this.ending || !this.callQueue.length) return;
if (this.workers.size < this.options.maxConcurrentWorkers) {
this.startChild();
}
let workers = [...this.workers.values()].sort(
(a, b) => a.calls.size - b.calls.size,
);
for (let worker of workers) {
if (!this.callQueue.length) {
break;
}
if (!worker.ready || worker.stopped || worker.isStopping) {
continue;
}
if (worker.calls.size < this.options.maxConcurrentCallsPerWorker) {
this.callWorker(worker, this.callQueue.shift());
}
}
}
async callWorker(worker: Worker, call: WorkerCall): Promise<void> {
for (let ref of this.sharedReferences.keys()) {
if (!worker.sentSharedReferences.has(ref)) {
await worker.sendSharedReference(
ref,
this.getSerializedSharedReference(ref),
);
}
}
worker.call(call);
}
async processRequest(
data: {|
location: FilePath,
|} & $Shape<WorkerRequest>,
worker?: Worker,
): Promise<?string> {
let {method, args, location, awaitResponse, idx, handle: handleId} = data;
let mod;
if (handleId != null) {
mod = nullthrows(this.handles.get(handleId)?.fn);
} else if (location) {
// $FlowFixMe
if (process.browser) {
if (location === '@parcel/workers/src/bus.js') {
mod = (bus: any);
} else {
throw new Error('No dynamic require possible: ' + location);
}
} else {
// $FlowFixMe this must be dynamic
mod = require(location);
}
} else {
throw new Error('Unknown request');
}
const responseFromContent = (content: any): WorkerDataResponse => ({
idx,
type: 'response',
contentType: 'data',
content,
});
const errorResponseFromError = (e: Error): WorkerErrorResponse => ({
idx,
type: 'response',
contentType: 'error',
content: anyToDiagnostic(e),
});
let result;
if (method == null) {
try {
result = responseFromContent(await mod(...args));
} catch (e) {
result = errorResponseFromError(e);
}
} else {
// ESModule default interop
if (mod.__esModule && !mod[method] && mod.default) {
mod = mod.default;
}
try {
// $FlowFixMe
result = responseFromContent(await mod[method](...args));
} catch (e) {
result = errorResponseFromError(e);
}
}
if (awaitResponse) {
if (worker) {
worker.send(result);
} else {
if (result.contentType === 'error') {
throw new ThrowableDiagnostic({diagnostic: result.content});
}
return result.content;
}
}
}
addCall(method: string, args: Array<any>): Promise<any> {
if (this.ending) {
throw new Error('Cannot add a worker call if workerfarm is ending.');
}
return new Promise((resolve, reject) => {
this.callQueue.push({
method,
args: args,
retries: 0,
resolve,
reject,
});
this.processQueue();
});
}
async end(): Promise<void> {
this.ending = true;
await Promise.all(
Array.from(this.workers.values()).map(worker => this.stopWorker(worker)),
);
for (let handle of this.handles.values()) {
handle.dispose();
}
this.handles = new Map();
this.sharedReferences = new Map();
this.sharedReferencesByValue = new Map();
this.ending = false;
}
startMaxWorkers(): void {
// Starts workers until the maximum is reached
if (this.workers.size < this.options.maxConcurrentWorkers) {
let toStart = this.options.maxConcurrentWorkers - this.workers.size;
while (toStart--) {
this.startChild();
}
}
}
shouldUseRemoteWorkers(): boolean {
return (
!this.options.useLocalWorker ||
((this.warmWorkers >= this.workers.size || !this.options.warmWorkers) &&
this.options.maxConcurrentWorkers > 0)
);
}
createReverseHandle(fn: HandleFunction): Handle {
let handle = new Handle({fn});
this.handles.set(handle.id, handle);
return handle;
}
createSharedReference(
value: mixed,
isCacheable: boolean = true,
): {|ref: SharedReference, dispose(): Promise<mixed>|} {
let ref = referenceId++;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);
if (!isCacheable) {
this.serializedSharedReferences.set(ref, null);
}
return {
ref,
dispose: () => {
this.sharedReferences.delete(ref);
this.sharedReferencesByValue.delete(value);
this.serializedSharedReferences.delete(ref);
let promises = [];
for (let worker of this.workers.values()) {
if (!worker.sentSharedReferences.has(ref)) {
continue;
}
worker.sentSharedReferences.delete(ref);
promises.push(
new Promise((resolve, reject) => {
worker.call({
method: 'deleteSharedReference',
args: [ref],
resolve,
reject,
skipReadyCheck: true,
retries: 0,
});
}),
);
}
return Promise.all(promises);
},
};
}
getSerializedSharedReference(ref: SharedReference): ArrayBuffer {
let cached = this.serializedSharedReferences.get(ref);
if (cached) {
return cached;
}
let value = this.sharedReferences.get(ref);
let buf = serialize(value).buffer;
// If the reference was created with the isCacheable option set to false,
// serializedSharedReferences will contain `null` as the value.
if (cached !== null) {
this.serializedSharedReferences.set(ref, buf);
}
return buf;
}
async startProfile() {
let promises = [];
for (let worker of this.workers.values()) {
promises.push(
new Promise((resolve, reject) => {
worker.call({
method: 'startProfile',
args: [],
resolve,
reject,
retries: 0,
skipReadyCheck: true,
});
}),
);
}
this.profiler = new SamplingProfiler();
promises.push(this.profiler.startProfiling());
await Promise.all(promises);
}
async endProfile() {
if (!this.profiler) {
return;
}
let promises = [this.profiler.stopProfiling()];
let names = ['Master'];
for (let worker of this.workers.values()) {
names.push('Worker ' + worker.id);
promises.push(
new Promise((resolve, reject) => {
worker.call({
method: 'endProfile',
args: [],
resolve,
reject,
retries: 0,
skipReadyCheck: true,
});
}),
);
}
var profiles = await Promise.all(promises);
let trace = new Trace();
let filename = `profile-${getTimeId()}.trace`;
let stream = trace.pipe(fs.createWriteStream(filename));
for (let profile of profiles) {
trace.addCPUProfile(names.shift(), profile);
}
trace.flush();
await new Promise(resolve => {
stream.once('finish', resolve);
});
logger.info({
origin: '@parcel/workers',
message: md`Wrote profile to ${filename}`,
});
}
async callAllWorkers(method: string, args: Array<any>) {
let promises = [];
for (let worker of this.workers.values()) {
promises.push(
new Promise((resolve, reject) => {
worker.call({
method,
args,
resolve,
reject,
retries: 0,
});
}),
);
}
promises.push(this.localWorker[method](this.workerApi, ...args));
await Promise.all(promises);
}
async takeHeapSnapshot() {
let snapshotId = getTimeId();
try {
let snapshotPaths = await Promise.all(
[...this.workers.values()].map(
worker =>
new Promise((resolve, reject) => {
worker.call({
method: 'takeHeapSnapshot',
args: [snapshotId],
resolve,
reject,
retries: 0,
skipReadyCheck: true,
});
}),
),
);
logger.info({
origin: '@parcel/workers',
message: md`Wrote heap snapshots to the following paths:\n${snapshotPaths.join(
'\n',
)}`,
});
} catch {
logger.error({
origin: '@parcel/workers',
message: 'Unable to take heap snapshots. Note: requires Node 11.13.0+',
});
}
}
static getNumWorkers(): number {
return process.env.PARCEL_WORKERS
? parseInt(process.env.PARCEL_WORKERS, 10)
: Math.min(4, Math.ceil(cpuCount() / 2));
}
static isWorker(): boolean {
return !!child;
}
static getWorkerApi(): {|
callMaster: (
request: CallRequest,
awaitResponse?: ?boolean,
) => Promise<mixed>,
createReverseHandle: (fn: (...args: Array<any>) => mixed) => Handle,
getSharedReference: (ref: SharedReference) => mixed,
resolveSharedReference: (value: mixed) => void | SharedReference,
runHandle: (handle: Handle, args: Array<any>) => Promise<mixed>,
|} {
invariant(
child != null,
'WorkerFarm.getWorkerApi can only be called within workers',
);
return child.workerApi;
}
static getConcurrentCallsPerWorker(
defaultValue?: number = DEFAULT_MAX_CONCURRENT_CALLS,
): number {
return (
parseInt(process.env.PARCEL_MAX_CONCURRENT_CALLS, 10) || defaultValue
);
}
}
function getTimeId() {
let now = new Date();
return (
String(now.getFullYear()) +
String(now.getMonth() + 1).padStart(2, '0') +
String(now.getDate()).padStart(2, '0') +
'-' +
String(now.getHours()).padStart(2, '0') +
String(now.getMinutes()).padStart(2, '0') +
String(now.getSeconds()).padStart(2, '0')
);
}
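// A minimal usage sketch (hypothetical, mirroring the WorkerFarm tests later in this
// commit): the worker module's exported functions are proxied through `run`, and
// `end()` shuts the farm down. The worker path below is a test fixture resolved from
// the test directory, not from this file.
async function exampleWorkerFarmUsage() {
  let farm = new WorkerFarm({
    warmWorkers: false,
    useLocalWorker: false,
    workerPath: require.resolve('./integration/workerfarm/ping.js'),
  });
  let result = await farm.run(); // the ping.js fixture resolves to 'pong'
  await farm.end();
  return result;
}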

View File

@@ -0,0 +1,33 @@
// @flow
import type {BackendType, WorkerImpl} from './types';
export function detectBackend(): BackendType {
// $FlowFixMe
if (process.browser) return 'web';
switch (process.env.PARCEL_WORKER_BACKEND) {
case 'threads':
case 'process':
return process.env.PARCEL_WORKER_BACKEND;
}
try {
require('worker_threads');
return 'threads';
} catch (err) {
return 'process';
}
}
export function getWorkerBackend(backend: BackendType): Class<WorkerImpl> {
switch (backend) {
case 'threads':
return require('./threads/ThreadsWorker').default;
case 'process':
return require('./process/ProcessWorker').default;
case 'web':
return require('./web/WebWorker').default;
default:
throw new Error(`Invalid backend: ${backend}`);
}
}
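// A small illustrative sketch: PARCEL_WORKER_BACKEND forces a specific backend,
// otherwise 'threads' is preferred when worker_threads is available, with 'web'
// reserved for browser builds.
function exampleSelectBackend() {
  let backend = detectBackend(); // e.g. 'threads' on current Node versions
  return getWorkerBackend(backend); // the WorkerImpl class for that backend
}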

View File

@@ -0,0 +1,24 @@
// @flow
import EventEmitter from 'events';
import {child} from './childState';
class Bus extends EventEmitter {
emit(event: string, ...args: Array<any>): boolean {
if (child) {
child.workerApi.callMaster(
{
// $FlowFixMe
location: process.browser ? '@parcel/workers/src/bus.js' : __filename,
method: 'emit',
args: [event, ...args],
},
false,
);
return true;
} else {
return super.emit(event, ...args);
}
}
}
export default (new Bus(): Bus);
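// Usage sketch (hypothetical): when running inside a worker, emit() forwards the
// event to the master process via callMaster; index.js listens on the main-process
// bus and re-emits worker 'logEvent'/'traceEvent' events there.
function exampleBusForwarding(sharedBus: Bus, event: mixed) {
  // In a worker this returns true immediately and the event arrives on the
  // main-process bus; in the main process it behaves like a normal EventEmitter.
  return sharedBus.emit('logEvent', event);
}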

View File

@@ -0,0 +1,322 @@
// @flow
import type {
CallRequest,
WorkerDataResponse,
WorkerErrorResponse,
WorkerMessage,
WorkerRequest,
WorkerResponse,
ChildImpl,
} from './types';
import type {Async, IDisposable} from '@parcel/types';
import type {SharedReference} from './WorkerFarm';
import * as coreWorker from './core-worker';
import invariant from 'assert';
import nullthrows from 'nullthrows';
import Logger, {patchConsole, unpatchConsole} from '@parcel/logger';
import ThrowableDiagnostic, {anyToDiagnostic} from '@parcel/diagnostic';
import {deserialize} from '@parcel/core';
import bus from './bus';
import {SamplingProfiler, tracer} from '@parcel/profiler';
import _Handle from './Handle';
// The './Handle' module should really be imported eagerly (even with @babel/plugin-transform-modules-commonjs's lazy mode), hence the reassignment below.
const Handle = _Handle;
type ChildCall = WorkerRequest & {|
resolve: (result: Promise<any> | any) => void,
reject: (error: any) => void,
|};
export class Child {
callQueue: Array<ChildCall> = [];
childId: ?number;
maxConcurrentCalls: number = 10;
module: ?any;
responseId: number = 0;
responseQueue: Map<number, ChildCall> = new Map();
loggerDisposable: IDisposable;
tracerDisposable: IDisposable;
child: ChildImpl;
profiler: ?SamplingProfiler;
handles: Map<number, Handle> = new Map();
sharedReferences: Map<SharedReference, mixed> = new Map();
sharedReferencesByValue: Map<mixed, SharedReference> = new Map();
constructor(ChildBackend: Class<ChildImpl>) {
this.child = new ChildBackend(
m => {
this.messageListener(m);
},
() => this.handleEnd(),
);
// Monitor all logging events inside this child process and forward them to
// the main process via the bus.
this.loggerDisposable = Logger.onLog(event => {
bus.emit('logEvent', event);
});
// .. and do the same for trace events
this.tracerDisposable = tracer.onTrace(event => {
bus.emit('traceEvent', event);
});
}
workerApi: {|
callMaster: (
request: CallRequest,
awaitResponse?: ?boolean,
) => Promise<mixed>,
createReverseHandle: (fn: (...args: Array<any>) => mixed) => Handle,
getSharedReference: (ref: SharedReference) => mixed,
resolveSharedReference: (value: mixed) => void | SharedReference,
runHandle: (handle: Handle, args: Array<any>) => Promise<mixed>,
|} = {
callMaster: (
request: CallRequest,
awaitResponse: ?boolean = true,
): Promise<mixed> => this.addCall(request, awaitResponse),
createReverseHandle: (fn: (...args: Array<any>) => mixed): Handle =>
this.createReverseHandle(fn),
runHandle: (handle: Handle, args: Array<any>): Promise<mixed> =>
this.workerApi.callMaster({handle: handle.id, args}, true),
getSharedReference: (ref: SharedReference) =>
this.sharedReferences.get(ref),
resolveSharedReference: (value: mixed) =>
this.sharedReferencesByValue.get(value),
};
messageListener(message: WorkerMessage): Async<void> {
if (message.type === 'response') {
return this.handleResponse(message);
} else if (message.type === 'request') {
return this.handleRequest(message);
}
}
send(data: WorkerMessage): void {
this.child.send(data);
}
async childInit(module: string, childId: number): Promise<void> {
// $FlowFixMe
if (process.browser) {
if (module === '@parcel/core/src/worker.js') {
this.module = coreWorker;
} else {
throw new Error('No dynamic require possible: ' + module);
}
} else {
// $FlowFixMe this must be dynamic
this.module = require(module);
}
this.childId = childId;
if (this.module.childInit != null) {
await this.module.childInit();
}
}
async handleRequest(data: WorkerRequest): Promise<void> {
let {idx, method, args, handle: handleId} = data;
let child = nullthrows(data.child);
const responseFromContent = (content: any): WorkerDataResponse => ({
idx,
child,
type: 'response',
contentType: 'data',
content,
});
const errorResponseFromError = (e: Error): WorkerErrorResponse => ({
idx,
child,
type: 'response',
contentType: 'error',
content: anyToDiagnostic(e),
});
let result;
if (handleId != null) {
try {
let fn = nullthrows(this.handles.get(handleId)?.fn);
result = responseFromContent(fn(...args));
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'childInit') {
try {
let [moduleName, childOptions] = args;
if (childOptions.shouldPatchConsole) {
patchConsole();
} else {
unpatchConsole();
}
if (childOptions.shouldTrace) {
tracer.enable();
}
result = responseFromContent(await this.childInit(moduleName, child));
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'startProfile') {
this.profiler = new SamplingProfiler();
try {
result = responseFromContent(await this.profiler.startProfiling());
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'endProfile') {
try {
let res = this.profiler ? await this.profiler.stopProfiling() : null;
result = responseFromContent(res);
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'takeHeapSnapshot') {
try {
let v8 = require('v8');
result = responseFromContent(
v8.writeHeapSnapshot(
'heap-' +
args[0] +
'-' +
(this.childId ? 'worker' + this.childId : 'main') +
'.heapsnapshot',
),
);
} catch (e) {
result = errorResponseFromError(e);
}
} else if (method === 'createSharedReference') {
let [ref, _value] = args;
let value =
_value instanceof ArrayBuffer
? // In the case the value is pre-serialized as a buffer,
// deserialize it.
deserialize(Buffer.from(_value))
: _value;
this.sharedReferences.set(ref, value);
this.sharedReferencesByValue.set(value, ref);
result = responseFromContent(null);
} else if (method === 'deleteSharedReference') {
let ref = args[0];
let value = this.sharedReferences.get(ref);
this.sharedReferencesByValue.delete(value);
this.sharedReferences.delete(ref);
result = responseFromContent(null);
} else {
try {
result = responseFromContent(
// $FlowFixMe
await this.module[method](this.workerApi, ...args),
);
} catch (e) {
result = errorResponseFromError(e);
}
}
try {
this.send(result);
} catch (e) {
this.send(errorResponseFromError(e));
}
}
handleResponse(data: WorkerResponse): void {
let idx = nullthrows(data.idx);
let contentType = data.contentType;
let content = data.content;
let call = nullthrows(this.responseQueue.get(idx));
if (contentType === 'error') {
invariant(typeof content !== 'string');
call.reject(new ThrowableDiagnostic({diagnostic: content}));
} else {
call.resolve(content);
}
this.responseQueue.delete(idx);
// Process the next call
this.processQueue();
}
// Keep in mind that responses to these calls must be JSON.stringify-safe
addCall(
request: CallRequest,
awaitResponse: ?boolean = true,
): Promise<mixed> {
// $FlowFixMe
let call: ChildCall = {
...request,
type: 'request',
child: this.childId,
// $FlowFixMe Added in Flow 0.121.0 upgrade in #4381
awaitResponse,
resolve: () => {},
reject: () => {},
};
let promise;
if (awaitResponse) {
promise = new Promise((resolve, reject) => {
call.resolve = resolve;
call.reject = reject;
});
}
this.callQueue.push(call);
this.processQueue();
return promise ?? Promise.resolve();
}
sendRequest(call: ChildCall): void {
let idx;
if (call.awaitResponse) {
idx = this.responseId++;
this.responseQueue.set(idx, call);
}
this.send({
idx,
child: call.child,
type: call.type,
location: call.location,
handle: call.handle,
method: call.method,
args: call.args,
awaitResponse: call.awaitResponse,
});
}
processQueue(): void {
if (!this.callQueue.length) {
return;
}
if (this.responseQueue.size < this.maxConcurrentCalls) {
this.sendRequest(this.callQueue.shift());
}
}
handleEnd(): void {
this.loggerDisposable.dispose();
this.tracerDisposable.dispose();
}
createReverseHandle(fn: (...args: Array<any>) => mixed): Handle {
let handle = new Handle({
fn,
childId: this.childId,
});
this.handles.set(handle.id, handle);
return handle;
}
}

View File

@@ -0,0 +1,10 @@
// @flow
import type {Child} from './child';
// This file is imported by both the WorkerFarm and child implementation.
// When a worker is initialized, it sets the state in this file.
// This way, WorkerFarm can access the state without directly importing the child code.
export let child: ?Child = null;
export function setChild(c: Child) {
child = c;
}

View File

@@ -0,0 +1,3 @@
// @flow
// eslint-disable-next-line monorepo/no-internal-import
module.exports = require('@parcel/core/src/worker.js');

View File

@@ -0,0 +1,2 @@
// This is used only in browser builds
module.exports = {};

View File

@@ -0,0 +1,75 @@
// @flow
import os from 'os';
import {execSync} from 'child_process';
const exec = (command: string): string => {
try {
let stdout = execSync(command, {
encoding: 'utf8',
// This prevents the command from outputting to the console
stdio: [null, null, null],
});
return stdout.trim();
} catch (e) {
return '';
}
};
export function detectRealCores(): number {
let platform = os.platform();
let amount = 0;
if (platform === 'linux') {
amount = parseInt(
exec('lscpu -p | egrep -v "^#" | sort -u -t, -k 2,4 | wc -l'),
10,
);
} else if (platform === 'darwin') {
amount = parseInt(exec('sysctl -n hw.physicalcpu_max'), 10);
} else if (platform === 'win32') {
const str = exec('wmic cpu get NumberOfCores').match(/\d+/g);
if (str !== null) {
amount = parseInt(str.filter(n => n !== '')[0], 10);
}
}
if (!amount || amount <= 0) {
throw new Error('Could not detect cpu count!');
}
return amount;
}
let cores;
export default function getCores(bypassCache?: boolean = false): number {
// Do not re-run commands if we already have the count...
if (cores && !bypassCache) {
return cores;
}
// $FlowFixMe
if (process.browser) {
// eslint-disable-next-line no-undef
cores = navigator.hardwareConcurrency / 2;
}
if (!cores) {
try {
cores = detectRealCores();
} catch (e) {
// Guess the number of real cores
cores = os
.cpus()
.filter(
(cpu, index) => !cpu.model.includes('Intel') || index % 2 === 1,
).length;
}
}
// Another fallback
if (!cores) {
cores = 1;
}
return cores;
}
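// Usage sketch (illustrative): the detected count is cached after the first call;
// pass true to bypass the cache and re-run detection.
function exampleCpuCount() {
  let cached = getCores(); // uses the cached value on subsequent calls
  let fresh = getCores(true); // bypasses the cache and re-detects
  return {cached, fresh};
}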

View File

@@ -0,0 +1,43 @@
// @flow
import type {TraceEvent, LogEvent} from '@parcel/types';
import invariant from 'assert';
import WorkerFarm from './WorkerFarm';
import Logger from '@parcel/logger';
import bus from './bus';
import {tracer} from '@parcel/profiler';
if (!WorkerFarm.isWorker()) {
// Forward all logger events originating from workers into the main process
bus.on('logEvent', (e: LogEvent) => {
switch (e.level) {
case 'info':
Logger.info(e.diagnostics);
break;
case 'progress':
invariant(typeof e.message === 'string');
Logger.progress(e.message);
break;
case 'verbose':
Logger.verbose(e.diagnostics);
break;
case 'warn':
Logger.warn(e.diagnostics);
break;
case 'error':
Logger.error(e.diagnostics);
break;
default:
throw new Error('Unknown log level');
}
});
// Forward all trace events originating from workers into the main process
bus.on('traceEvent', (e: TraceEvent) => {
tracer.trace(e);
});
}
export default WorkerFarm;
export {bus};
export {Handle} from './WorkerFarm';
export type {WorkerApi, FarmOptions, SharedReference} from './WorkerFarm';

View File

@@ -0,0 +1,56 @@
// @flow
import type {
ChildImpl,
MessageHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import nullthrows from 'nullthrows';
import {setChild} from '../childState';
import {Child} from '../child';
import {serialize, deserialize} from '@parcel/core';
export default class ProcessChild implements ChildImpl {
onMessage: MessageHandler;
onExit: ExitHandler;
constructor(onMessage: MessageHandler, onExit: ExitHandler) {
if (!process.send) {
throw new Error('Only create ProcessChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
process.on('message', data => this.handleMessage(data));
}
handleMessage(data: string): void {
if (data === 'die') {
return this.stop();
}
this.onMessage(deserialize(Buffer.from(data, 'base64')));
}
send(data: WorkerMessage) {
let processSend = nullthrows(process.send).bind(process);
processSend(serialize(data).toString('base64'), err => {
if (err && err instanceof Error) {
// $FlowFixMe[prop-missing]
if (err.code === 'ERR_IPC_CHANNEL_CLOSED') {
// IPC connection closed
// no need to keep the worker running if it can't send or receive data
return this.stop();
}
}
});
}
stop() {
this.onExit(0);
process.exit();
}
}
setChild(new Child(ProcessChild));

View File

@@ -0,0 +1,91 @@
// @flow
import type {
WorkerImpl,
MessageHandler,
ErrorHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import childProcess, {type ChildProcess} from 'child_process';
import path from 'path';
import {serialize, deserialize} from '@parcel/core';
const WORKER_PATH = path.join(__dirname, 'ProcessChild.js');
export default class ProcessWorker implements WorkerImpl {
execArgv: Object;
onMessage: MessageHandler;
onError: ErrorHandler;
onExit: ExitHandler;
child: ChildProcess;
processQueue: boolean = true;
sendQueue: Array<any> = [];
constructor(
execArgv: Object,
onMessage: MessageHandler,
onError: ErrorHandler,
onExit: ExitHandler,
) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start(): Promise<void> {
this.child = childProcess.fork(WORKER_PATH, process.argv, {
execArgv: this.execArgv,
env: process.env,
cwd: process.cwd(),
});
this.child.on('message', (data: string) => {
this.onMessage(deserialize(Buffer.from(data, 'base64')));
});
this.child.once('exit', this.onExit);
this.child.on('error', this.onError);
return Promise.resolve();
}
async stop() {
this.child.send('die');
let forceKill = setTimeout(() => this.child.kill('SIGINT'), 500);
await new Promise(resolve => {
this.child.once('exit', resolve);
});
clearTimeout(forceKill);
}
send(data: WorkerMessage) {
if (!this.processQueue) {
this.sendQueue.push(data);
return;
}
let result = this.child.send(serialize(data).toString('base64'), error => {
if (error && error instanceof Error) {
// Ignore this, the workerfarm handles child errors
return;
}
this.processQueue = true;
if (this.sendQueue.length > 0) {
let queueCopy = this.sendQueue.slice(0);
this.sendQueue = [];
queueCopy.forEach(entry => this.send(entry));
}
});
if (!result || /^win/.test(process.platform)) {
// The queue is handling too many messages; throttle it
this.processQueue = false;
}
}
}

View File

@@ -0,0 +1,39 @@
// @flow
import type {
ChildImpl,
MessageHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import {isMainThread, parentPort} from 'worker_threads';
import nullthrows from 'nullthrows';
import {setChild} from '../childState';
import {Child} from '../child';
import {prepareForSerialization, restoreDeserializedObject} from '@parcel/core';
export default class ThreadsChild implements ChildImpl {
onMessage: MessageHandler;
onExit: ExitHandler;
constructor(onMessage: MessageHandler, onExit: ExitHandler) {
if (isMainThread || !parentPort) {
throw new Error('Only create ThreadsChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
parentPort.on('message', data => this.handleMessage(data));
parentPort.on('close', this.onExit);
}
handleMessage(data: WorkerMessage) {
this.onMessage(restoreDeserializedObject(data));
}
send(data: WorkerMessage) {
nullthrows(parentPort).postMessage(prepareForSerialization(data));
}
}
setChild(new Child(ThreadsChild));

View File

@@ -0,0 +1,63 @@
// @flow
import type {
WorkerImpl,
MessageHandler,
ErrorHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import {Worker} from 'worker_threads';
import path from 'path';
import {prepareForSerialization, restoreDeserializedObject} from '@parcel/core';
const WORKER_PATH = path.join(__dirname, 'ThreadsChild.js');
export default class ThreadsWorker implements WorkerImpl {
execArgv: Object;
onMessage: MessageHandler;
onError: ErrorHandler;
onExit: ExitHandler;
worker: Worker;
constructor(
execArgv: Object,
onMessage: MessageHandler,
onError: ErrorHandler,
onExit: ExitHandler,
) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start(): Promise<void> {
this.worker = new Worker(WORKER_PATH, {
execArgv: this.execArgv,
env: process.env,
});
this.worker.on('message', data => this.handleMessage(data));
this.worker.on('error', this.onError);
this.worker.on('exit', this.onExit);
return new Promise<void>(resolve => {
this.worker.on('online', resolve);
});
}
stop(): Promise<void> {
// In node 12, this returns a promise, but previously it accepted a callback
// TODO: Pass a callback in earlier versions of Node
return Promise.resolve(this.worker.terminate());
}
handleMessage(data: WorkerMessage) {
this.onMessage(restoreDeserializedObject(data));
}
send(data: WorkerMessage) {
this.worker.postMessage(prepareForSerialization(data));
}
}

View File

@@ -0,0 +1,68 @@
// @flow
import type {Diagnostic} from '@parcel/diagnostic';
import type {FilePath} from '@parcel/types';
export type LocationCallRequest = {|
args: $ReadOnlyArray<mixed>,
location: string,
method?: string,
|};
export type HandleCallRequest = {|
args: $ReadOnlyArray<mixed>,
handle: number,
|};
export type CallRequest = LocationCallRequest | HandleCallRequest;
export type WorkerRequest = {|
args: $ReadOnlyArray<any>,
awaitResponse?: boolean,
child?: ?number,
idx?: number,
location?: FilePath,
method?: ?string,
type: 'request',
handle?: number,
|};
export type WorkerDataResponse = {|
idx?: number,
child?: number,
type: 'response',
contentType: 'data',
content: string,
|};
export type WorkerErrorResponse = {|
idx?: number,
child?: number,
type: 'response',
contentType: 'error',
content: Diagnostic | Array<Diagnostic>,
|};
export type WorkerResponse = WorkerDataResponse | WorkerErrorResponse;
export type WorkerMessage = WorkerRequest | WorkerResponse;
export type MessageHandler = (data: WorkerMessage) => void;
export type ErrorHandler = (err: Error) => void;
export type ExitHandler = (code: number) => void;
export interface WorkerImpl {
constructor(
execArgv: Object,
onMessage: MessageHandler,
onError: ErrorHandler,
onExit: ExitHandler,
): void;
start(): Promise<void>;
stop(): Promise<void>;
send(data: WorkerMessage): void;
}
export interface ChildImpl {
constructor(onMessage: MessageHandler, onExit: ExitHandler): void;
send(data: WorkerMessage): void;
}
export type BackendType = 'threads' | 'process' | 'web';

View File

@@ -0,0 +1,50 @@
// @flow
/* eslint-env worker */
import type {
ChildImpl,
MessageHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import {setChild} from '../childState';
import {Child} from '../child';
import {prepareForSerialization, restoreDeserializedObject} from '@parcel/core';
export default class WebChild implements ChildImpl {
onMessage: MessageHandler;
onExit: ExitHandler;
constructor(onMessage: MessageHandler, onExit: ExitHandler) {
if (
!(
typeof WorkerGlobalScope !== 'undefined' &&
self instanceof WorkerGlobalScope
)
) {
throw new Error('Only create WebChild instances in a worker!');
}
this.onMessage = onMessage;
this.onExit = onExit;
self.addEventListener('message', ({data}: MessageEvent) => {
if (data === 'stop') {
this.onExit(0);
self.postMessage('stopped');
}
// $FlowFixMe assume WorkerMessage as data
this.handleMessage(data);
});
self.postMessage('online');
}
handleMessage(data: WorkerMessage) {
this.onMessage(restoreDeserializedObject(data));
}
send(data: WorkerMessage) {
self.postMessage(prepareForSerialization(data));
}
}
setChild(new Child(WebChild));

View File

@@ -0,0 +1,85 @@
// @flow
import type {
WorkerImpl,
MessageHandler,
ErrorHandler,
ExitHandler,
WorkerMessage,
} from '../types';
import {prepareForSerialization, restoreDeserializedObject} from '@parcel/core';
import {makeDeferredWithPromise} from '@parcel/utils';
let id = 0;
export default class WebWorker implements WorkerImpl {
execArgv: Object;
onMessage: MessageHandler;
onError: ErrorHandler;
onExit: ExitHandler;
worker: Worker;
stopping: ?Promise<void>;
constructor(
execArgv: Object,
onMessage: MessageHandler,
onError: ErrorHandler,
onExit: ExitHandler,
) {
this.execArgv = execArgv;
this.onMessage = onMessage;
this.onError = onError;
this.onExit = onExit;
}
start(): Promise<void> {
// $FlowFixMe[incompatible-call]
this.worker = new Worker(new URL('./WebChild.js', import.meta.url), {
name: `Parcel Worker ${id++}`,
type: 'module',
});
let {deferred, promise} = makeDeferredWithPromise();
this.worker.onmessage = ({data}) => {
if (data === 'online') {
deferred.resolve();
return;
}
// $FlowFixMe assume WorkerMessage as data
this.handleMessage(data);
};
this.worker.onerror = this.onError;
// Web workers can't crash or intentionally stop on their own, apart from stop() below
// this.worker.on('exit', this.onExit);
return promise;
}
stop(): Promise<void> {
if (!this.stopping) {
this.stopping = (async () => {
this.worker.postMessage('stop');
let {deferred, promise} = makeDeferredWithPromise();
this.worker.addEventListener('message', ({data}: MessageEvent) => {
if (data === 'stopped') {
deferred.resolve();
}
});
await promise;
this.worker.terminate();
this.onExit(0);
})();
}
return this.stopping;
}
handleMessage(data: WorkerMessage) {
this.onMessage(restoreDeserializedObject(data));
}
send(data: WorkerMessage) {
this.worker.postMessage(prepareForSerialization(data));
}
}

View File

@@ -0,0 +1,19 @@
import assert from 'assert';
import os from 'os';
import getCores, {detectRealCores} from '../src/cpuCount';
describe('cpuCount', function () {
it('Should be able to detect real cpu count', () => {
// Windows is not supported, as getting the cpu count takes a couple of seconds...
if (os.platform() === 'win32') return;
let cores = detectRealCores();
assert(cores > 0);
});
it('getCores should return more than 0', () => {
let cores = getCores(true);
assert(cores > 0);
});
});

View File

@@ -0,0 +1,15 @@
const WorkerFarm = require('../../../src/WorkerFarm').default;
function run() {
if (WorkerFarm.isWorker()) {
// Only test this behavior in workers. Logging in the main process will
// always work.
console.log('one');
console.info('two');
console.warn('three');
console.error('four');
console.debug('five');
}
}
exports.run = run;

View File

@@ -0,0 +1,5 @@
function run(_, data) {
return data;
}
exports.run = run;

View File

@@ -0,0 +1,18 @@
const WorkerFarm = require('../../../src/WorkerFarm').default;
function run(api) {
let result = [process.pid];
return new Promise((resolve, reject) => {
api.callMaster({
location: require.resolve('./master-process-id.js'),
args: []
})
.then(pid => {
result.push(pid);
resolve(result);
})
.catch(reject);
});
}
exports.run = run;

View File

@@ -0,0 +1,10 @@
const WorkerFarm = require('../../../src/WorkerFarm').default;
function run(api, a, b) {
return api.callMaster({
location: require.resolve('./master-sum.js'),
args: [a, b]
});
}
exports.run = run;

View File

@@ -0,0 +1,19 @@
const WorkerFarm = require('../../../src/WorkerFarm').default;
const Logger = require('@parcel/logger').default;
function run() {
if (WorkerFarm.isWorker()) {
// Only test this behavior in workers. Logging in the main process will
// always work.
Logger.info({
origin: 'logging-worker',
message: 'omg it works'
});
Logger.error({
origin: 'logging-worker',
message: 'errors objects dont work yet'
});
}
}
exports.run = run;

View File

@@ -0,0 +1,3 @@
module.exports = function() {
return process.pid;
};

View File

@@ -0,0 +1,3 @@
module.exports = function(a, b) {
return a + b;
};

View File

@@ -0,0 +1,5 @@
function run() {
return 'pong';
}
exports.run = run;

View File

@@ -0,0 +1,5 @@
function run(workerApi, ref) {
return ref === workerApi.resolveSharedReference(workerApi.getSharedReference(ref));
}
exports.run = run;

View File

@@ -0,0 +1,5 @@
function run(workerApi, handle) {
return workerApi.runHandle(handle, []);
}
exports.run = run;

View File

@@ -0,0 +1,6 @@
function run(workerApi, ref) {
let sharedReference = workerApi.getSharedReference(ref);
return sharedReference || 'Shared reference does not exist';
}
exports.run = run;

View File

@@ -0,0 +1,362 @@
import Logger from '@parcel/logger';
import assert from 'assert';
import WorkerFarm from '../src';
describe('WorkerFarm', function () {
this.timeout(30000);
it('Should start up workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ping.js'),
});
assert.equal(await workerfarm.run(), 'pong');
await workerfarm.end();
});
it('Should handle 1000 requests without any issue', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js'),
});
let promises = [];
for (let i = 0; i < 1000; i++) {
promises.push(workerfarm.run(i));
}
await Promise.all(promises);
await workerfarm.end();
});
it('Should warm up workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js'),
});
for (let i = 0; i < 100; i++) {
assert.equal(await workerfarm.run(i), i);
}
await new Promise(resolve => workerfarm.once('warmedup', resolve));
assert(workerfarm.workers.size > 0, 'Should have spawned workers.');
assert(
workerfarm.warmWorkers >= workerfarm.workers.size,
'Should have warmed up workers.',
);
await workerfarm.end();
});
it('Should use the local worker', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve('./integration/workerfarm/echo.js'),
});
assert.equal(await workerfarm.run('hello world'), 'hello world');
assert.equal(workerfarm.shouldUseRemoteWorkers(), false);
await workerfarm.end();
});
it('Should be able to use bi-directional communication', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js'),
});
assert.equal(await workerfarm.run(1, 2), 3);
await workerfarm.end();
});
it('Should be able to handle 1000 bi-directional calls', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc.js'),
});
for (let i = 0; i < 1000; i++) {
assert.equal(await workerfarm.run(1 + i, 2), 3 + i);
}
await workerfarm.end();
});
it.skip('Bi-directional call should return masters pid', async () => {
// TODO: this test is only valid for processes, not threads
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/ipc-pid.js'),
});
let result = await workerfarm.run();
assert.equal(result.length, 2);
assert.equal(result[1], process.pid);
assert.notEqual(result[0], process.pid);
await workerfarm.end();
});
it('Should handle 10 big concurrent requests without any issue', async () => {
// This emulates the node.js ipc bug for win32
let workerfarm = new WorkerFarm({
warmWorkers: false,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/echo.js'),
});
let bigData = [];
for (let i = 0; i < 10000; i++) {
bigData.push('This is some big data');
}
let promises = [];
for (let i = 0; i < 10; i++) {
promises.push(workerfarm.run(bigData));
}
await Promise.all(promises);
await workerfarm.end();
});
it('Forwards stdio from the child process and levels event source if shouldPatchConsole is true', async () => {
let events = [];
let logDisposable = Logger.onLog(event => events.push(event));
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/console.js'),
shouldPatchConsole: true,
});
await workerfarm.run();
assert.deepEqual(events, [
{
level: 'info',
type: 'log',
diagnostics: [
{
origin: 'console',
message: 'one',
skipFormatting: true,
},
],
},
{
level: 'info',
type: 'log',
diagnostics: [
{
origin: 'console',
message: 'two',
skipFormatting: true,
},
],
},
{
level: 'warn',
type: 'log',
diagnostics: [
{
origin: 'console',
message: 'three',
skipFormatting: true,
},
],
},
{
level: 'error',
type: 'log',
diagnostics: [
{
origin: 'console',
message: 'four',
skipFormatting: true,
},
],
},
{
level: 'verbose',
type: 'log',
diagnostics: [
{
message: 'five',
origin: 'console',
skipFormatting: true,
},
],
},
]);
logDisposable.dispose();
await workerfarm.end();
});
it('Forwards logger events to the main process', async () => {
let events = [];
let logDisposable = Logger.onLog(event => events.push(event));
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/logging.js'),
});
await workerfarm.run();
// assert.equal(events.length, 2);
assert.deepEqual(events, [
{
level: 'info',
diagnostics: [
{
origin: 'logging-worker',
message: 'omg it works',
},
],
type: 'log',
},
{
level: 'error',
diagnostics: [
{
origin: 'logging-worker',
message: 'errors objects dont work yet',
},
],
type: 'log',
},
]);
logDisposable.dispose();
await workerfarm.end();
});
it('Should support reverse handle functions in main process that can be called in workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/reverse-handle.js'),
});
let handle = workerfarm.createReverseHandle(() => 42);
let result = await workerfarm.run(handle);
assert.equal(result, 42);
await workerfarm.end();
});
it('Should dispose of handle objects when ending', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/reverse-handle.js'),
});
workerfarm.createReverseHandle(() => 42);
assert.equal(workerfarm.handles.size, 1);
await workerfarm.end();
assert.equal(workerfarm.handles.size, 0);
});
it('Should support shared references in workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve(
'./integration/workerfarm/shared-reference.js',
),
});
let sharedValue = 'Something to be shared';
let {ref, dispose} = await workerfarm.createSharedReference(sharedValue);
let result = await workerfarm.run(ref);
assert.equal(result, 'Something to be shared');
await dispose();
result = await workerfarm.run(ref);
assert.equal(result, 'Shared reference does not exist');
});
it('Should resolve shared references in workers', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve(
'./integration/workerfarm/resolve-shared-reference.js',
),
});
let sharedValue = 'Something to be shared';
let {ref, dispose} = await workerfarm.createSharedReference(sharedValue);
assert.equal(workerfarm.workerApi.resolveSharedReference(sharedValue), ref);
assert.ok(await workerfarm.run(ref));
await dispose();
assert(workerfarm.workerApi.resolveSharedReference(sharedValue) == null);
});
it('Should support shared references in local worker', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve(
'./integration/workerfarm/shared-reference.js',
),
});
let sharedValue = 'Something to be shared';
let {ref, dispose} = await workerfarm.createSharedReference(sharedValue);
let result = await workerfarm.run(ref);
assert.equal(result, 'Something to be shared');
await dispose();
result = await workerfarm.run(ref);
assert.equal(result, 'Shared reference does not exist');
});
it('should resolve shared references in local worker', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: true,
workerPath: require.resolve(
'./integration/workerfarm/resolve-shared-reference.js',
),
});
let sharedValue = 'Something to be shared';
let {ref, dispose} = await workerfarm.createSharedReference(sharedValue);
assert.equal(workerfarm.workerApi.resolveSharedReference(sharedValue), ref);
assert.ok(await workerfarm.run(ref));
await dispose();
assert(workerfarm.workerApi.resolveSharedReference(sharedValue) == null);
});
it('Should dispose of shared references when ending', async () => {
let workerfarm = new WorkerFarm({
warmWorkers: true,
useLocalWorker: false,
workerPath: require.resolve('./integration/workerfarm/reverse-handle.js'),
});
workerfarm.createSharedReference('Something to be shared');
assert.equal(workerfarm.sharedReferences.size, 1);
await workerfarm.end();
assert.equal(workerfarm.sharedReferences.size, 0);
});
});