一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

node.js|vue.js|jquery|angularjs|React|json|js教程|

服務器之家 - 編程語言 - JavaScript - node.js - nodejs中使用worker_threads來創建新的線程的方法

nodejs中使用worker_threads來創建新的線程的方法

2022-01-06 15:10flydean程序那些事 node.js

這篇文章主要介紹了nodejs中使用worker_threads來創建新的線程的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

簡介

之前的文章中提到了,nodejs中有兩種線程,一種是event loop用來相應用戶的請求和處理各種callback。另一種就是worker pool用來處理各種耗時操作。

nodejs的官網提到了一個能夠使用nodejs本地woker pool的lib叫做webworker-threads。

可惜的是webworker-threads的最后一次更新還是在2年前,而在最新的nodejs 12中,根本無法使用。

而webworker-threads的作者則推薦了一個新的lib叫做web-worker。

web-worker是構建于nodejs的worker_threads之上的,本文將會詳細講解worker_threads和web-worker的使用。

worker_threads

worker_threads模塊的源代碼源自lib/worker_threads.js,它指的是工作線程,可以開啟一個新的線程來并行執行javascript程序。

worker_threads主要用來處理CPU密集型操作,而不是IO操作,因為nodejs本身的異步IO已經非常強大了。

worker_threads中主要有5個屬性,3個class和3個主要的方法。接下來我們將會一一講解。

isMainThread

isMainThread用來判斷代碼是否在主線程中運行,我們看一個使用的例子:

?
1
2
3
4
5
6
7
8
9
const { Worker, isMainThread } = require('worker_threads');
 
if (isMainThread) {
 console.log('在主線程中');
 new Worker(__filename);
} else {
 console.log('在工作線程中');
 console.log(isMainThread); // 打印 'false'。
}

上面的例子中,我們從worker_threads模塊中引入了Worker和isMainThread,Worker就是工作線程的主類,我們將會在后面詳細講解,這里我們使用Worker創建了一個工作線程。

MessageChannel

MessageChannel代表的是一個異步雙向通信channel。MessageChannel中沒有方法,主要通過MessageChannel來連接兩端的MessagePort。

?
1
2
3
4
class MessageChannel {
  readonly port1: MessagePort;
  readonly port2: MessagePort;
 }

當我們使用new MessageChannel()的時候,會自動創建兩個MessagePort。

?
1
2
3
4
5
6
const { MessageChannel } = require('worker_threads');
 
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// Prints: received { foo: 'bar' } from the `port1.on('message')` listener

通過MessageChannel,我們可以進行MessagePort間的通信。

parentPort和MessagePort

parentPort是一個MessagePort類型,parentPort主要用于worker線程和主線程進行消息交互。

通過parentPort.postMessage()發送的消息在主線程中將可以通過worker.on(‘message')接收。

主線程中通過worker.postMessage()發送的消息將可以在工作線程中通過parentPort.on(‘message')接收。

我們看一下MessagePort的定義:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class MessagePort extends EventEmitter {
  close(): void;
  postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
  ref(): void;
  unref(): void;
  start(): void;
 
  addListener(event: "close", listener: () => void): this;
  addListener(event: "message", listener: (value: any) => void): this;
  addListener(event: string | symbol, listener: (...args: any[]) => void): this;
 
  emit(event: "close"): boolean;
  emit(event: "message", value: any): boolean;
  emit(event: string | symbol, ...args: any[]): boolean;
 
  on(event: "close", listener: () => void): this;
  on(event: "message", listener: (value: any) => void): this;
  on(event: string | symbol, listener: (...args: any[]) => void): this;
 
  once(event: "close", listener: () => void): this;
  once(event: "message", listener: (value: any) => void): this;
  once(event: string | symbol, listener: (...args: any[]) => void): this;
 
  prependListener(event: "close", listener: () => void): this;
  prependListener(event: "message", listener: (value: any) => void): this;
  prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
 
  prependOnceListener(event: "close", listener: () => void): this;
  prependOnceListener(event: "message", listener: (value: any) => void): this;
  prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
 
  removeListener(event: "close", listener: () => void): this;
  removeListener(event: "message", listener: (value: any) => void): this;
  removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
 
  off(event: "close", listener: () => void): this;
  off(event: "message", listener: (value: any) => void): this;
  off(event: string | symbol, listener: (...args: any[]) => void): this;
 }

MessagePort繼承自EventEmitter,它表示的是異步雙向通信channel的一端。這個channel就叫做MessageChannel,MessagePort通過MessageChannel來進行通信。

我們可以通過MessagePort來傳輸結構體數據,內存區域或者其他的MessagePorts。

從源代碼中,我們可以看到MessagePort中有兩個事件,close和message。

close事件將會在channel的中任何一端斷開連接的時候觸發,而message事件將會在port.postMessage時候觸發,下面我們看一個例子:

?
1
2
3
4
5
6
7
8
9
10
11
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
 
// Prints:
// foobar
// closed!
port2.on('message', (message) => console.log(message));
port2.on('close', () => console.log('closed!'));
 
port1.postMessage('foobar');
port1.close();

port.on(‘message')實際上為message事件添加了一個listener,port還提供了addListener方法來手動添加listener。

port.on(‘message')會自動觸發port.start()方法,表示啟動一個port。

當port有listener存在的時候,這表示port存在一個ref,當存在ref的時候,程序是不會結束的。我們可以通過調用port.unref方法來取消這個ref。

接下來我們看一下怎么通過port來傳輸消息:

?
1
port.postMessage(value[, transferList])

postMessage可以接受兩個參數,第一個參數是value,這是一個JavaScript對象。第二個參數是transferList。

先看一個傳遞一個參數的情況:

?
1
2
3
4
5
6
7
8
9
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
 
port1.on('message', (message) => console.log(message));
 
const circularData = {};
circularData.foo = circularData;
// Prints: { foo: [Circular] }
port2.postMessage(circularData);

通常來說postMessage發送的對象都是value的拷貝,但是如果你指定了transferList,那么在transferList中的對象將會被transfer到channel的接受端,并且不再存在于發送端,就好像把對象傳送出去一樣。

transferList是一個list,list中的對象可以是ArrayBuffer, MessagePort 和 FileHandle。

如果value中包含SharedArrayBuffer對象,那么該對象不能被包含在transferList中。

看一個包含兩個參數的例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
const { MessageChannel } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
 
port1.on('message', (message) => console.log(message));
 
const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]);
// post uint8Array的拷貝:
port2.postMessage(uint8Array);
 
port2.postMessage(uint8Array, [ uint8Array.buffer ]);
 
//port2.postMessage(uint8Array);

上面的例子將輸出:

Uint8Array(4) [ 1, 2, 3, 4 ]
Uint8Array(4) [ 1, 2, 3, 4 ]

第一個postMessage是拷貝,第二個postMessage是transfer Uint8Array底層的buffer。

如果我們再次調用port2.postMessage(uint8Array),我們會得到下面的錯誤:

?
1
DOMException [DataCloneError]: An ArrayBuffer is detached and could not be cloned.

buffer是TypedArray的底層存儲結構,如果buffer被transfer,那么之前的TypedArray將會變得不可用。

markAsUntransferable

要想避免這個問題,我們可以調用markAsUntransferable將buffer標記為不可transferable. 我們看一個markAsUntransferable的例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
const { MessageChannel, markAsUntransferable } = require('worker_threads');
 
const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);
 
markAsUntransferable(pooledBuffer);
 
const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);
 
console.log(typedArray1);
console.log(typedArray2);

SHARE_ENV

SHARE_ENV是傳遞給worker構造函數的一個env變量,通過設置這個變量,我們可以在主線程與工作線程進行共享環境變量的讀寫。

?
1
2
3
4
5
const { Worker, SHARE_ENV } = require('worker_threads');
new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
 .on('exit', () => {
 console.log(process.env.SET_IN_WORKER); // Prints 'foo'.
 });

workerData

除了postMessage(),還可以通過在主線程中傳遞workerData給worker的構造函數,從而將主線程中的數據傳遞給worker:

?
1
2
3
4
5
6
7
const { Worker, isMainThread, workerData } = require('worker_threads');
 
if (isMainThread) {
 const worker = new Worker(__filename, { workerData: 'Hello, world!' });
} else {
 console.log(workerData); // Prints 'Hello, world!'.
}

worker類

先看一下worker的定義:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Worker extends EventEmitter {
 readonly stdin: Writable | null;
 readonly stdout: Readable;
 readonly stderr: Readable;
 readonly threadId: number;
 readonly resourceLimits?: ResourceLimits;
 
 constructor(filename: string | URL, options?: WorkerOptions);
 
 postMessage(value: any, transferList?: Array<ArrayBuffer | MessagePort>): void;
 ref(): void;
 unref(): void;
 
 terminate(): Promise<number>;
 
 getHeapSnapshot(): Promise<Readable>;
 
 addListener(event: "error", listener: (err: Error) => void): this;
 addListener(event: "exit", listener: (exitCode: number) => void): this;
 addListener(event: "message", listener: (value: any) => void): this;
 addListener(event: "online", listener: () => void): this;
 addListener(event: string | symbol, listener: (...args: any[]) => void): this;
 
 ...
}

worker繼承自EventEmitter,并且包含了4個重要的事件:error,exit,message和online。

worker表示的是一個獨立的 JavaScript 執行線程,我們可以通過傳遞filename或者URL來構造worker。

每一個worker都有一對內置的MessagePort,在worker創建的時候就會相互關聯。worker使用這對內置的MessagePort來和父線程進行通信。

通過parentPort.postMessage()發送的消息在主線程中將可以通過worker.on(‘message')接收。

主線程中通過worker.postMessage()發送的消息將可以在工作線程中通過parentPort.on(‘message')接收。

當然,你也可以顯式的創建MessageChannel 對象,然后將MessagePort作為消息傳遞給其他線程,我們看一個例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const assert = require('assert');
const {
 Worker, MessageChannel, MessagePort, isMainThread, parentPort
} = require('worker_threads');
if (isMainThread) {
 const worker = new Worker(__filename);
 const subChannel = new MessageChannel();
 worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]);
 subChannel.port2.on('message', (value) => {
 console.log('接收到:', value);
 });
} else {
 parentPort.once('message', (value) => {
 assert(value.hereIsYourPort instanceof MessagePort);
 value.hereIsYourPort.postMessage('工作線程正在發送此消息');
 value.hereIsYourPort.close();
 });
}

上面的例子中,我們借助了worker和parentPort本身的消息傳遞功能,傳遞了一個顯式的MessageChannel中的MessagePort。

然后又通過該MessagePort來進行消息的分發。

receiveMessageOnPort

除了port的on(‘message')方法之外,我們還可以使用receiveMessageOnPort來手動接收消息:

?
1
2
3
4
5
6
7
8
const { MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { port1, port2 } = new MessageChannel();
port1.postMessage({ hello: 'world' });
 
console.log(receiveMessageOnPort(port2));
// Prints: { message: { hello: 'world' } }
console.log(receiveMessageOnPort(port2));
// Prints: undefined

moveMessagePortToContext

先了解一下nodejs中的Context的概念,我們可以從vm中創建context,它是一個隔離的上下文環境,從而保證不同運行環境的安全性,我們看一個context的例子:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const vm = require('vm');
 
const x = 1;
 
const context = { x: 2 };
vm.createContext(context); // 上下文隔離化對象。
 
const code = 'x += 40; var y = 17;';
// `x` and `y` 是上下文中的全局變量。
// 最初,x 的值為 2,因為這是 context.x 的值。
vm.runInContext(code, context);
 
console.log(context.x); // 42
console.log(context.y); // 17
 
console.log(x); // 1; y 沒有定義。

在worker中,我們可以將一個MessagePort move到其他的context中。

?
1
worker.moveMessagePortToContext(port, contextifiedSandbox)

這個方法接收兩個參數,第一個參數就是要move的MessagePort,第二個參數就是vm.createContext()創建的context對象。

worker_threads的線程池

上面我們提到了使用單個的worker thread,但是現在程序中一個線程往往是不夠的,我們需要創建一個線程池來維護worker thread對象。

nodejs提供了AsyncResource類,來作為對異步資源的擴展。

AsyncResource類是async_hooks模塊中的。

下面我們看下怎么使用AsyncResource類來創建worker的線程池。

假設我們有一個task,使用來執行兩個數相加,腳本名字叫做task_processor.js:

?
1
2
3
4
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
 parentPort.postMessage(task.a + task.b);
});

下面是worker pool的實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
const { Worker } = require('worker_threads');
 
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
 
class WorkerPoolTaskInfo extends AsyncResource {
 constructor(callback) {
 super('WorkerPoolTaskInfo');
 this.callback = callback;
 }
 
 done(err, result) {
 this.runInAsyncScope(this.callback, null, err, result);
 this.emitDestroy(); // `TaskInfo`s are used only once.
 }
}
 
class WorkerPool extends EventEmitter {
 constructor(numThreads) {
 super();
 this.numThreads = numThreads;
 this.workers = [];
 this.freeWorkers = [];
 
 for (let i = 0; i < numThreads; i++)
  this.addNewWorker();
 }
 
 addNewWorker() {
 const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
 worker.on('message', (result) => {
  // In case of success: Call the callback that was passed to `runTask`,
  // remove the `TaskInfo` associated with the Worker, and mark it as free
  // again.
  worker[kTaskInfo].done(null, result);
  worker[kTaskInfo] = null;
  this.freeWorkers.push(worker);
  this.emit(kWorkerFreedEvent);
 });
 worker.on('error', (err) => {
  // In case of an uncaught exception: Call the callback that was passed to
  // `runTask` with the error.
  if (worker[kTaskInfo])
  worker[kTaskInfo].done(err, null);
  else
  this.emit('error', err);
  // Remove the worker from the list and start a new Worker to replace the
  // current one.
  this.workers.splice(this.workers.indexOf(worker), 1);
  this.addNewWorker();
 });
 this.workers.push(worker);
 this.freeWorkers.push(worker);
 this.emit(kWorkerFreedEvent);
 }
 
 runTask(task, callback) {
 if (this.freeWorkers.length === 0) {
  // No free threads, wait until a worker thread becomes free.
  this.once(kWorkerFreedEvent, () => this.runTask(task, callback));
  return;
 }
 
 const worker = this.freeWorkers.pop();
 worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
 worker.postMessage(task);
 }
 
 close() {
 for (const worker of this.workers) worker.terminate();
 }
}
 
module.exports = WorkerPool;

我們給worker創建了一個新的kTaskInfo屬性,并且將異步的callback封裝到WorkerPoolTaskInfo中,賦值給worker.kTaskInfo.

接下來我們就可以使用workerPool了:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
const WorkerPool = require('./worker_pool.js');
const os = require('os');
 
const pool = new WorkerPool(os.cpus().length);
 
let finished = 0;
for (let i = 0; i < 10; i++) {
 pool.runTask({ a: 42, b: 100 }, (err, result) => {
 console.log(i, err, result);
 if (++finished === 10)
  pool.close();
 });
}

到此這篇關于nodejs中使用worker_threads來創建新的線程的方法的文章就介紹到這了,更多相關nodejs使用worker_threads創建線程內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/superfjj/article/details/112971454

延伸 · 閱讀

精彩推薦
  • node.jsrequire加載器實現原理的深入理解

    require加載器實現原理的深入理解

    這篇文章主要給大家介紹了關于require加載器實現原理的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需...

    隱冬8462022-03-03
  • node.js詳解node.js創建一個web服務器(Server)的詳細步驟

    詳解node.js創建一個web服務器(Server)的詳細步驟

    這篇文章主要介紹了詳解node.js創建一個web服務器(Server)的詳細步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,...

    王佳斌8952021-12-31
  • node.jsnodejs中使用worker_threads來創建新的線程的方法

    nodejs中使用worker_threads來創建新的線程的方法

    這篇文章主要介紹了nodejs中使用worker_threads來創建新的線程的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友...

    flydean程序那些事8982022-01-06
  • node.jsNode.js ObjectWrap 的弱引用問題

    Node.js ObjectWrap 的弱引用問題

    最近在寫 Node.js Addon 的過程中,遇到了一個問題,然后發現是 ObjectWrap 弱引用導致的,本文介紹一下具體的問題和排查過程,以及 ObjectWrap 的使用問題。...

    編程雜技9852022-01-04
  • node.jsk8s node節點重新加入master集群的實現

    k8s node節點重新加入master集群的實現

    這篇文章主要介紹了k8s node節點重新加入master集群的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋...

    Scarborought13922022-01-22
  • node.jsNode.js 中如何收集和解析命令行參數

    Node.js 中如何收集和解析命令行參數

    這篇文章主要介紹了Node.js 中如何收集和解析命令行參數,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋...

    descire8802021-12-28
  • node.js在瀏覽器中,把 Vite 跑起來了!

    在瀏覽器中,把 Vite 跑起來了!

    大家好,我是 ssh,前幾天在推上沖浪的時候,看到 Francois Valdy 宣布他制作了 browser-vite[1],成功把 Vite 成功在瀏覽器中運行起來了。這引起了我的興趣,如...

    前端從進階到入院9282022-01-11
  • node.jslinux服務器快速卸載安裝node環境(簡單上手)

    linux服務器快速卸載安裝node環境(簡單上手)

    這篇文章主要介紹了linux服務器快速卸載安裝node環境(簡單上手),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需...

    mose-x8462022-01-22
主站蜘蛛池模板: 国产亚洲精品美女久久久 | 欧美国产在线 | 欧美草比视频 | 青青在线视频观看 | 免费一级特黄特色大片在线 | 欧美video丝袜连裤袜bd | 91麻豆精品国产91久久久 | 国产精自产拍久久久久久 | 四虎最新免费观看网址 | 97色资源| 香蕉tv国产在线永久播放 | 精品无人区乱码1区2区3区免费 | 四虎免费在线视频 | 美女下面被cao出水 美女污视频 | 高清在线观看免费 | 91大片淫黄大片在线天堂 | 国产欧美日韩视频在线观看一区二区 | 羞羞视频免费观 | 免费国产影视观看网站入口 | free chinese麻豆 | 丝瓜污污 | 亚洲福利一区二区三区 | 性刺激欧美三级在线现看中文 | 美女脱了内裤让男生尿囗 | 波多野结衣在线看 | 91九色国产porny | 亚洲丰满女人ass硕大 | 免费特黄一区二区三区视频一 | 国产在线乱子伦一区二区 | 精品精品国产自在久久高清 | 香蕉在线精品亚洲第一区 | 国内精品国语自产拍在线观看55 | 国产精品久久久久一区二区三区 | 久久爽狠狠添AV激情五月 | 欧产日产国产精品专区 | 14一18cad中国大学生 | 色多多视频在线 | 色婷婷综合久久久中文字幕 | 四虎永久在线精品国产馆v视影院 | 国产精品欧美亚洲韩国日本 | 日韩精品一区二区三区视频 |