Web Workers

Web Workers允许JavaScript在后台线程中运行,避免长时间运行的任务阻塞用户界面。本章将介绍Web Workers的基本概念和使用方法。

为什么需要Web Workers

JavaScript的单线程限制

// Teaching note captured as data: why running everything on one JavaScript
// thread becomes a problem once a task takes a long time. Nothing in this
// object is executed; `example` is sample source kept as a string.
const singleThreadIssue = {
    // Symptoms of a blocked main thread, as display text.
    problem: `
        JavaScript是单线程语言,所有代码都在主线程执行:
        - 长时间运行的任务会阻塞UI
        - 用户无法交互,页面卡死
        - 动画和滚动不流畅
    `,
    
    // Sample click handler that synchronously runs a 10,000,000,000-iteration
    // loop before logging.
    example: `
        function heavyComputation() {
            let result = 0;
            for (let i = 0; i < 10000000000; i++) {
                result += Math.sqrt(i);
            }
            return result;
        }
        
        document.getElementById('btn').addEventListener('click', () => {
            heavyComputation();
            console.log('计算完成');
        });
    `,
    
    // One-line summary of what the user observes when the sample runs.
    consequence: '点击按钮后,整个页面会卡死直到计算完成'
};

Web Workers解决方案

// Summary card for Web Workers: the core idea, what they buy you, and what
// they cannot do. Pure data, assembled from named pieces.
const workerSolution = (() => {
    const benefits = ['不阻塞主线程', '保持UI响应', '利用多核CPU', '适合CPU密集型任务'];
    const limitations = ['不能直接操作DOM', '不能访问window对象', '通信有开销', '需要序列化数据'];

    return {
        concept: 'Web Workers在独立线程中运行脚本',
        benefits,
        limitations
    };
})();

创建Web Worker

基本用法

// Source of an inline worker: for each message carrying a number n it replies
// with the sum of Math.sqrt(i) for i in [0, n).
const workerCode = `
    self.onmessage = function(e) {
        const { data } = e;
        const result = heavyComputation(data);
        self.postMessage(result);
    };
    
    function heavyComputation(n) {
        let result = 0;
        for (let i = 0; i < n; i++) {
            result += Math.sqrt(i);
        }
        return result;
    }
`;

// Wrap the source in a Blob so the worker can be started without a separate file.
const blob = new Blob([workerCode], { type: 'application/javascript' });
const workerUrl = URL.createObjectURL(blob);
const worker = new Worker(workerUrl);

// Stops the worker thread and releases the blob URL that backs its script.
function disposeWorker() {
    worker.terminate();
    URL.revokeObjectURL(workerUrl);
}

worker.onmessage = function(e) {
    console.log('收到结果:', e.data);
    // Terminate only once the single expected result has arrived. Calling
    // terminate() right after postMessage() would kill the worker before it
    // could answer.
    disposeWorker();
};

worker.onerror = function(e) {
    console.error('Worker错误:', e.message);
    disposeWorker();
};

worker.postMessage(10000000);

使用独立文件

// Main-thread side of the worker.js example: start a job, then react to the
// progress / complete / error messages the worker sends back.
const worker = new Worker('worker.js');

worker.postMessage({ type: 'start', data: [1, 2, 3, 4, 5] });

worker.onmessage = function(event) {
    const message = event.data;
    const kind = message.type;
    const payload = message.result;

    if (kind === 'progress') {
        updateProgress(payload);
    } else if (kind === 'complete') {
        displayResult(payload);
        // The job is finished, so the worker is no longer needed.
        worker.terminate();
    } else if (kind === 'error') {
        handleError(payload);
    }
};

// Uncaught errors thrown inside the worker script arrive here.
worker.onerror = function(error) {
    const { message, filename, lineno } = error;
    console.error('Worker错误:', { message, filename, lineno });
};

worker.js:

// Worker entry point: dispatches incoming { type, data } commands.
// Only the 'start' command is recognised; anything else is ignored.
self.onmessage = function(event) {
    const { type: command, data: payload } = event.data;

    if (command === 'start') {
        processData(payload);
    }
};

/**
 * Runs heavyOperation over every item, reporting progress after each one and
 * posting the collected results when done.
 *
 * Messages posted to the owner:
 *   { type: 'progress', result: { current, total } }  after each item
 *   { type: 'complete', result: [...] }               once, at the end
 *
 * @param {Array} data - items to process (an empty array yields only 'complete').
 */
function processData(data) {
    const total = data.length;
    const results = [];

    data.forEach((item, index) => {
        // Keep the value computed here; recomputing it for the final message
        // would double the work.
        results.push(heavyOperation(item));

        self.postMessage({
            type: 'progress',
            result: { current: index + 1, total }
        });
    });

    self.postMessage({
        type: 'complete',
        result: results
    });
}

// Stand-in for an expensive per-item computation: squares its argument.
function heavyOperation(item) {
    const squared = item * item;
    return squared;
}

Worker通信

双向通信

/**
 * Promise-based request/response wrapper around a Worker.
 *
 * Each request is posted as { id, type, data }; the worker is expected to
 * answer with { id, result } or { id, error }.
 */
class WorkerClient {
    /**
     * @param {string|URL} workerUrl - script URL passed to `new Worker`.
     * @param {{timeout?: number}} [options] - `timeout` is the per-request
     *   limit in milliseconds (default 30000).
     */
    constructor(workerUrl, options = {}) {
        this.worker = new Worker(workerUrl);
        // id -> { resolve, reject, timer } for every request awaiting a reply.
        this.pendingRequests = new Map();
        this.requestId = 0;
        this.timeout = options.timeout ?? 30000;

        this.worker.onmessage = (e) => {
            const { id, result, error } = e.data;
            const pending = this.pendingRequests.get(id);

            if (!pending) {
                // Reply for an unknown id, or for a request that already timed out.
                return;
            }

            this.pendingRequests.delete(id);
            clearTimeout(pending.timer);

            if (error) {
                pending.reject(new Error(error));
            } else {
                pending.resolve(result);
            }
        };
    }

    /**
     * Sends one request and resolves with the worker's `result`.
     *
     * @param {string} type - request kind understood by the worker.
     * @param {*} data - structured-cloneable payload.
     * @returns {Promise<*>} rejects on a worker-reported error, on timeout,
     *   when the payload cannot be posted, or when the client is terminated.
     */
    send(type, data) {
        return new Promise((resolve, reject) => {
            const id = ++this.requestId;

            const timer = setTimeout(() => {
                if (this.pendingRequests.delete(id)) {
                    reject(new Error('请求超时'));
                }
            }, this.timeout);

            this.pendingRequests.set(id, { resolve, reject, timer });

            try {
                this.worker.postMessage({ id, type, data });
            } catch (err) {
                // postMessage throws synchronously for payloads that cannot be
                // cloned; do not leave the entry and its timer behind.
                clearTimeout(timer);
                this.pendingRequests.delete(id);
                reject(err);
            }
        });
    }

    /** Stops the worker and rejects every request still waiting for a reply. */
    terminate() {
        this.worker.terminate();
        this.pendingRequests.forEach(({ reject, timer }) => {
            clearTimeout(timer);
            reject(new Error('Worker已终止'));
        });
        this.pendingRequests.clear();
    }
}

// Shared client for the API worker script.
const client = new WorkerClient('api-worker.js');

// Asks the worker to perform a 'fetch' request for /api/data and logs the reply.
async function fetchData() {
    const request = { url: '/api/data' };
    const payload = await client.send('fetch', request);
    console.log(payload);
}

Transferable对象

// Teaching note captured as data: contrasts posting a typed array normally
// with posting it using a transfer list. The code samples are strings and are
// not executed here.
const transferableExample = {
    description: 'Transferable对象可以直接转移所有权,避免复制开销',
    
    // Type names listed as transferable by this note.
    types: ['ArrayBuffer', 'MessagePort', 'ImageBitmap', 'OffscreenCanvas'],
    
    // Sample: postMessage with no transfer list.
    normal: `
        const largeArray = new Float32Array(10000000);
        worker.postMessage(largeArray);
    `,
    
    // Sample: the array's underlying buffer is named in the transfer list.
    // NOTE(review): the trailing console.log presumably demonstrates that the
    // sender's view is emptied after the transfer — confirm the intended output.
    transfer: `
        const largeArray = new Float32Array(10000000);
        worker.postMessage(largeArray, [largeArray.buffer]);
        
        console.log(largeArray.length);
    `
};

// Main-thread side of an image pipeline: decode an image, hand the bitmap to
// the worker, then draw whatever the worker sends back.
const worker = new Worker('image-processor.js');

/**
 * Downloads and decodes an image, then transfers the bitmap to the worker.
 *
 * @param {{url: string, options: *}} imageData - where to load the image from
 *   and the options forwarded to the worker.
 * @returns {Promise<void>} rejects when the download fails or the image
 *   cannot be decoded.
 */
async function processImage(imageData) {
    const response = await fetch(imageData.url);
    // fetch() resolves for HTTP error statuses too; fail here rather than
    // trying to decode an error page as an image.
    if (!response.ok) {
        throw new Error(`图片加载失败: ${response.status} ${response.statusText}`);
    }
    const blob = await response.blob();
    const bitmap = await createImageBitmap(blob);
    
    // The bitmap is listed in the transfer list, so it is moved, not copied.
    worker.postMessage({ bitmap, options: imageData.options }, [bitmap]);
}

worker.onmessage = function(e) {
    const { processedBitmap } = e.data;
    const canvas = document.getElementById('canvas');
    const ctx = canvas.getContext('2d');
    ctx.drawImage(processedBitmap, 0, 0);
    // The pixels now live on the canvas; release the bitmap's resources.
    processedBitmap.close();
};

Worker类型

Dedicated Worker

// Teaching note captured as data: the two halves of a dedicated-worker
// example. Both code samples are strings and are not executed here.
const dedicatedWorker = {
    description: '专用Worker,只能被创建它的脚本使用',
    
    // Page side: creates the worker, posts 'Hello', logs every reply.
    main: `
        const worker = new Worker('dedicated-worker.js');
        worker.postMessage('Hello');
        worker.onmessage = (e) => console.log(e.data);
    `,
    
    // Worker side: replies to each message with 'Received: ' + the payload.
    worker: `
        self.onmessage = (e) => {
            self.postMessage('Received: ' + e.data);
        };
    `
};

Shared Worker

// Teaching note captured as data: the two halves of a shared-worker example.
// Both code samples are strings and are not executed here.
const sharedWorker = {
    description: '共享Worker,可以被多个页面共享',
    
    // Page side: connects to the shared worker, listens on its port, starts
    // the port and posts one message.
    main: `
        const worker = new SharedWorker('shared-worker.js');
        worker.port.onmessage = (e) => {
            console.log('收到消息:', e.data);
        };
        worker.port.start();
        worker.port.postMessage('来自页面的消息');
    `,
    
    // Worker side: remembers the port of every connection and re-broadcasts
    // each incoming message to all remembered ports (including the sender's).
    // Ports are only ever added to `connections`; nothing removes them.
    worker: `
        const connections = [];
        
        self.onconnect = (e) => {
            const port = e.ports[0];
            connections.push(port);
            
            port.onmessage = (e) => {
                connections.forEach(conn => {
                    conn.postMessage('广播: ' + e.data);
                });
            };
            
            port.start();
        };
    `
};

Service Worker

// Teaching note captured as data: registering a service worker and a minimal
// caching script for it. Both code samples are strings and are not executed here.
const serviceWorkerExample = {
    description: '服务Worker,用于缓存和离线功能',
    
    // Page side: registers /sw.js when the API is available and logs the outcome.
    register: `
        if ('serviceWorker' in navigator) {
            navigator.serviceWorker.register('/sw.js')
                .then(reg => console.log('注册成功', reg.scope))
                .catch(err => console.error('注册失败', err));
        }
    `,
    
    // Service worker side: on install, adds a fixed URL list to the cache
    // named 'v1'; on fetch, answers from the cache and falls back to the
    // network when there is no cached match.
    swCode: `
        const CACHE_NAME = 'v1';
        const urlsToCache = [
            '/',
            '/styles.css',
            '/script.js'
        ];
        
        self.addEventListener('install', (e) => {
            e.waitUntil(
                caches.open(CACHE_NAME)
                    .then(cache => cache.addAll(urlsToCache))
            );
        });
        
        self.addEventListener('fetch', (e) => {
            e.respondWith(
                caches.match(e.request)
                    .then(response => response || fetch(e.request))
            );
        });
    `
};

实际应用示例

数据处理Worker

/**
 * Runs bulk array operations (filter / sort / aggregate / groupBy) inside an
 * inline Web Worker so large data sets do not block the calling thread.
 *
 * Every request carries an id, so several process() calls may be in flight at
 * the same time and each one receives its own reply.
 */
class DataProcessorWorker {
    constructor() {
        this.requestId = 0;
        // id -> { resolve, reject, timer } for every request awaiting a reply.
        this.pending = new Map();
        this.worker = this.createWorker();

        this.worker.addEventListener('message', (e) => {
            const { id, type, result, error } = e.data;
            const request = this.pending.get(id);
            if (!request) {
                return; // reply for a request that already timed out
            }

            this.pending.delete(id);
            clearTimeout(request.timer);

            if (type === 'error') {
                request.reject(new Error(error));
            } else {
                request.resolve(result);
            }
        });

        // An uncaught error in the worker script cannot be attributed to one
        // request, so every waiting caller is told about it.
        this.worker.addEventListener('error', (e) => {
            this.rejectAll(new Error(`Worker错误: ${e.message}`));
        });
    }
    
    /**
     * Builds the worker from inline source.
     *
     * The worker answers { id, type, data } with { id, type: 'result', result }
     * or { id, type: 'error', error }.
     *
     * @returns {Worker}
     */
    createWorker() {
        const code = `
            self.onmessage = function(e) {
                const { id, type, data } = e.data;
                
                try {
                    self.postMessage({ id, type: 'result', result: run(type, data) });
                } catch (err) {
                    self.postMessage({
                        id,
                        type: 'error',
                        error: String((err && err.message) || err)
                    });
                }
            };
            
            function run(type, data) {
                switch (type) {
                    case 'filter': {
                        if (typeof data.predicate !== 'string') {
                            throw new Error('filter需要predicate函数');
                        }
                        // The predicate arrives as source text because functions
                        // cannot cross the thread boundary. It is the caller's own
                        // code; never feed untrusted text through this path.
                        const predicate = new Function('return (' + data.predicate + ')')();
                        return data.items.filter(predicate);
                    }
                    
                    case 'sort':
                        return data.items.sort((a, b) => {
                            return data.order === 'desc'
                                ? b[data.key] - a[data.key]
                                : a[data.key] - b[data.key];
                        });
                    
                    case 'aggregate':
                        return data.items.reduce((acc, item) => {
                            acc[data.operation](item[data.field]);
                            return acc;
                        }, data.initial);
                    
                    case 'groupBy': {
                        // No prototype, so keys such as "constructor" are safe.
                        const groups = Object.create(null);
                        data.items.forEach(item => {
                            const key = item[data.key];
                            if (!groups[key]) groups[key] = [];
                            groups[key].push(item);
                        });
                        return groups;
                    }
                    
                    default:
                        throw new Error('未知的任务类型: ' + type);
                }
            }
        `;
        
        const blob = new Blob([code], { type: 'application/javascript' });
        // Kept so terminate() can release the URL again.
        this.workerUrl = URL.createObjectURL(blob);
        return new Worker(this.workerUrl);
    }
    
    /**
     * Runs one operation in the worker.
     *
     * @param {'filter'|'sort'|'aggregate'|'groupBy'} type - operation name.
     * @param {Object} data - operation arguments; always includes `items`.
     *   For 'filter', `predicate` may be a function. It is sent as source
     *   text, so it must be a self-contained arrow function or function
     *   expression that does not rely on variables from its enclosing scope.
     * @returns {Promise<*>} the operation result; rejects on a worker-side
     *   error, an unknown type, a payload that cannot be posted, timeout
     *   (30 s) or termination.
     */
    process(type, data) {
        return new Promise((resolve, reject) => {
            const id = ++this.requestId;
            
            const timer = setTimeout(() => {
                if (this.pending.delete(id)) {
                    reject(new Error('处理超时'));
                }
            }, 30000);
            
            this.pending.set(id, { resolve, reject, timer });
            
            try {
                this.worker.postMessage({ id, type, data: this.toCloneable(data) });
            } catch (err) {
                clearTimeout(timer);
                this.pending.delete(id);
                reject(err);
            }
        });
    }
    
    // Internal: replaces a function predicate by its source text, because
    // postMessage cannot clone functions.
    toCloneable(data) {
        if (typeof data?.predicate !== 'function') {
            return data;
        }
        return { ...data, predicate: String(data.predicate) };
    }
    
    // Internal: rejects every request still waiting for a reply.
    rejectAll(error) {
        this.pending.forEach(({ reject, timer }) => {
            clearTimeout(timer);
            reject(error);
        });
        this.pending.clear();
    }
    
    /** Stops the worker, releases its blob URL and rejects waiting requests. */
    terminate() {
        this.worker.terminate();
        if (this.workerUrl) {
            URL.revokeObjectURL(this.workerUrl);
        }
        this.rejectAll(new Error('Worker已终止'));
    }
}

const processor = new DataProcessorWorker();

/**
 * Demo pipeline: generates 100000 random records, keeps those with
 * value > 50, sorts them by value (descending) and groups them by category,
 * with every step executed through `processor`.
 *
 * @returns {Promise<Object>} the grouped records, keyed by category.
 */
async function processLargeData() {
    const data = Array.from({ length: 100000 }, (_, i) => ({
        id: i,
        value: Math.random() * 100,
        category: ['A', 'B', 'C'][Math.floor(Math.random() * 3)]
    }));
    
    try {
        // NOTE(review): functions cannot be structured-cloned by postMessage,
        // so this predicate only works if DataProcessorWorker converts it
        // before posting — confirm against the class.
        const filtered = await processor.process('filter', {
            items: data,
            predicate: item => item.value > 50
        });
        
        const sorted = await processor.process('sort', {
            items: filtered,
            key: 'value',
            order: 'desc'
        });
        
        const grouped = await processor.process('groupBy', {
            items: sorted,
            key: 'category'
        });
        
        return grouped;
    } finally {
        // Release the worker even when one of the steps rejects.
        processor.terminate();
    }
}

图像处理Worker

// Source text of an image-filter worker, kept as a string so it can be loaded
// through a Blob URL.
//
// Incoming message: { imageData, operation, params }.
// Operations, all applied to the RGB channels only (alpha is never written):
//   grayscale  - each channel becomes the average of R, G and B
//   invert     - each channel becomes 255 minus its value
//   brightness - each channel is multiplied by params.factor, capped at 255
//   blur       - each channel becomes the mean of the (2*radius+1)^2 pixels
//                around it; radius is params.radius, or 1 when that is falsy.
//                Pixels closer than `radius` to an edge keep their value.
// Any other operation leaves the pixels untouched.
// Reply: { imageData }, with imageData.data.buffer named in the transfer list.
const imageWorkerCode = `
    self.onmessage = function(e) {
        const { imageData, operation, params } = e.data;
        const { data, width, height } = imageData;
        
        switch (operation) {
            case 'grayscale':
                for (let i = 0; i < data.length; i += 4) {
                    const avg = (data[i] + data[i + 1] + data[i + 2]) / 3;
                    data[i] = data[i + 1] = data[i + 2] = avg;
                }
                break;
                
            case 'invert':
                for (let i = 0; i < data.length; i += 4) {
                    data[i] = 255 - data[i];
                    data[i + 1] = 255 - data[i + 1];
                    data[i + 2] = 255 - data[i + 2];
                }
                break;
                
            case 'brightness':
                const factor = params.factor;
                for (let i = 0; i < data.length; i += 4) {
                    data[i] = Math.min(255, data[i] * factor);
                    data[i + 1] = Math.min(255, data[i + 1] * factor);
                    data[i + 2] = Math.min(255, data[i + 2] * factor);
                }
                break;
                
            case 'blur':
                const radius = params.radius || 1;
                const result = new Uint8ClampedArray(data);
                for (let y = radius; y < height - radius; y++) {
                    for (let x = radius; x < width - radius; x++) {
                        let r = 0, g = 0, b = 0, count = 0;
                        for (let dy = -radius; dy <= radius; dy++) {
                            for (let dx = -radius; dx <= radius; dx++) {
                                const idx = ((y + dy) * width + (x + dx)) * 4;
                                r += data[idx];
                                g += data[idx + 1];
                                b += data[idx + 2];
                                count++;
                            }
                        }
                        const idx = (y * width + x) * 4;
                        result[idx] = r / count;
                        result[idx + 1] = g / count;
                        result[idx + 2] = b / count;
                    }
                }
                imageData.data.set(result);
                break;
        }
        
        self.postMessage({ imageData }, [imageData.data.buffer]);
    };
`;

/**
 * Applies pixel filters to a canvas by sending its pixels to the inline worker
 * built from `imageWorkerCode` and painting the reply back onto the canvas.
 */
class ImageProcessor {
    constructor() {
        const blob = new Blob([imageWorkerCode], { type: 'application/javascript' });
        // Kept so terminate() can release the URL again.
        this.workerUrl = URL.createObjectURL(blob);
        this.worker = new Worker(this.workerUrl);
        // Jobs awaiting a reply, oldest first. The worker handles messages in
        // the order they were posted, so each reply belongs to the head.
        this.pending = [];
        
        this.worker.onmessage = (e) => {
            const job = this.pending.shift();
            if (!job) {
                return;
            }
            job.ctx.putImageData(e.data.imageData, 0, 0);
            job.resolve();
        };
        
        this.worker.onerror = (e) => {
            const job = this.pending.shift();
            job?.reject(new Error(`Worker错误: ${e.message}`));
        };
    }
    
    /**
     * Filters the whole canvas in place.
     *
     * @param {HTMLCanvasElement} canvas - canvas whose pixels are processed.
     * @param {string} operation - filter name understood by the worker.
     * @param {Object} [params={}] - filter-specific parameters.
     * @returns {Promise<void>} resolves once the result has been painted;
     *   rejects on a worker error, a failed post, or termination.
     */
    process(canvas, operation, params = {}) {
        return new Promise((resolve, reject) => {
            const ctx = canvas.getContext('2d');
            const imageData = ctx.getImageData(0, 0, canvas.width, canvas.height);
            
            this.pending.push({ ctx, resolve, reject });
            
            try {
                // Transfer the pixel buffer instead of copying it; the local
                // imageData is not used again after this call.
                this.worker.postMessage(
                    { imageData, operation, params },
                    [imageData.data.buffer]
                );
            } catch (err) {
                this.pending.pop();
                reject(err);
            }
        });
    }
    
    /** Stops the worker, releases its blob URL and rejects unfinished jobs. */
    terminate() {
        this.worker.terminate();
        URL.revokeObjectURL(this.workerUrl);
        for (const { reject } of this.pending.splice(0)) {
            reject(new Error('Worker已终止'));
        }
    }
}

Worker池

/**
 * Fixed-size pool of workers running the same script. Each worker handles one
 * task at a time; extra tasks wait in a FIFO queue until a worker is free.
 */
class WorkerPool {
    /**
     * @param {string|URL} workerUrl - script every worker in the pool runs.
     * @param {number} [poolSize] - number of workers; defaults to
     *   navigator.hardwareConcurrency, or 4 when that is unavailable.
     */
    constructor(workerUrl, poolSize = navigator.hardwareConcurrency || 4) {
        this.poolSize = poolSize;
        this.workers = [];
        this.taskQueue = [];
        // worker -> task currently running on it.
        this.activeTasks = new Map();
        this.terminated = false;
        
        for (let i = 0; i < poolSize; i++) {
            const worker = new Worker(workerUrl);
            worker.idle = true;
            worker.onmessage = (e) => this.handleResult(worker, e);
            worker.onerror = (e) => this.handleError(worker, e);
            this.workers.push(worker);
        }
    }
    
    /**
     * Runs `data` on the first idle worker, or queues it when all are busy.
     *
     * @param {*} data - message posted to the worker.
     * @returns {Promise<*>} resolves with the worker's reply; rejects on a
     *   worker error, a failed post, or when the pool is terminated.
     */
    execute(data) {
        return new Promise((resolve, reject) => {
            if (this.terminated) {
                reject(new Error('Worker池已终止'));
                return;
            }
            
            const task = { data, resolve, reject };
            
            const idleWorker = this.workers.find(w => w.idle);
            
            if (idleWorker) {
                this.runTask(idleWorker, task);
            } else {
                this.taskQueue.push(task);
            }
        });
    }
    
    // Marks the worker busy and posts the task to it.
    runTask(worker, task) {
        worker.idle = false;
        this.activeTasks.set(worker, task);
        
        try {
            worker.postMessage(task.data);
        } catch (err) {
            // The payload could not be posted: fail this task only and keep
            // the worker available for the next one.
            this.activeTasks.delete(worker);
            task.reject(err);
            this.processNextTask(worker);
        }
    }
    
    handleResult(worker, e) {
        const task = this.activeTasks.get(worker);
        if (!task) {
            return; // unsolicited message from a worker with no task
        }
        this.activeTasks.delete(worker);
        
        task.resolve(e.data);
        
        this.processNextTask(worker);
    }
    
    handleError(worker, e) {
        const task = this.activeTasks.get(worker);
        if (!task) {
            return; // error raised while the worker had no task
        }
        this.activeTasks.delete(worker);
        
        task.reject(new Error(e.message));
        
        this.processNextTask(worker);
    }
    
    // Gives the freed worker the oldest queued task, or marks it idle.
    processNextTask(worker) {
        if (this.taskQueue.length > 0) {
            const nextTask = this.taskQueue.shift();
            this.runTask(worker, nextTask);
        } else {
            worker.idle = true;
        }
    }
    
    /**
     * Stops every worker and rejects all running and queued tasks, so no
     * caller is left waiting on a promise that can never settle.
     */
    terminate() {
        this.terminated = true;
        this.workers.forEach(worker => worker.terminate());
        
        const abandoned = [...this.activeTasks.values(), ...this.taskQueue];
        
        this.workers = [];
        this.taskQueue = [];
        this.activeTasks.clear();
        
        abandoned.forEach(task => task.reject(new Error('Worker池已终止')));
    }
    
    /** Snapshot of pool usage: { total, active, idle, queued }. */
    get stats() {
        return {
            total: this.poolSize,
            active: this.workers.filter(w => !w.idle).length,
            idle: this.workers.filter(w => w.idle).length,
            queued: this.taskQueue.length
        };
    }
}

// Shared pool of four workers running compute-worker.js.
const pool = new WorkerPool('compute-worker.js', 4);

// Submits every item to the pool as a 'process' task and waits for all of
// them; the results come back in the same order as `items`.
async function processBatch(items) {
    const submissions = items.map((item) => {
        const task = { type: 'process', data: item };
        return pool.execute(task);
    });
    return await Promise.all(submissions);
}

错误处理和调试

/**
 * Worker wrapper that adds a per-request timeout, automatic retry after a
 * worker error, and error logging.
 *
 * Replies are not correlated with requests: a send() resolves with the next
 * message the worker posts, so issue one request at a time.
 */
class SafeWorker {
    /**
     * @param {string|URL} workerUrl - script URL passed to `new Worker`.
     * @param {{timeout?: number, maxRetries?: number}} [options]
     *   timeout: milliseconds to wait per attempt (default 30000; 0 falls
     *   back to the default). maxRetries: retries after an error, per send()
     *   call (default 3; 0 disables retrying).
     */
    constructor(workerUrl, options = {}) {
        this.worker = new Worker(workerUrl);
        this.timeout = options.timeout || 30000;
        // `??` rather than `||`, so an explicit 0 really means "never retry".
        this.maxRetries = options.maxRetries ?? 3;
        // Total number of retries performed so far (diagnostic only).
        this.retries = 0;
        
        this.setupHandlers();
    }
    
    // Installs logging handlers for worker errors and undecodable messages.
    setupHandlers() {
        this.worker.onerror = (e) => {
            console.error('Worker错误:', {
                message: e.message,
                filename: e.filename,
                lineno: e.lineno,
                colno: e.colno
            });
            
            e.preventDefault();
        };
        
        this.worker.onmessageerror = (e) => {
            console.error('消息反序列化错误:', e);
        };
    }
    
    /**
     * Posts `data` and resolves with the worker's next message.
     *
     * @param {*} data - structured-cloneable payload.
     * @returns {Promise<*>} rejects after maxRetries failed retries, on
     *   timeout, or when the payload cannot be posted.
     */
    send(data) {
        return this.attempt(data, 0);
    }
    
    // Internal: one delivery attempt. `retriesUsed` counts the retries already
    // spent by the send() call this attempt belongs to, so each call gets its
    // own budget.
    attempt(data, retriesUsed) {
        return new Promise((resolve, reject) => {
            let timeoutId;
            
            // Every exit path detaches both listeners and the timer, so
            // nothing from this attempt can fire during a later one.
            const cleanup = () => {
                clearTimeout(timeoutId);
                this.worker.removeEventListener('message', messageHandler);
                this.worker.removeEventListener('error', errorHandler);
            };
            
            const messageHandler = (e) => {
                cleanup();
                resolve(e.data);
            };
            
            const errorHandler = (e) => {
                cleanup();
                
                if (retriesUsed < this.maxRetries) {
                    this.retries++;
                    console.log(`重试 ${retriesUsed + 1}/${this.maxRetries}`);
                    this.attempt(data, retriesUsed + 1).then(resolve, reject);
                } else {
                    reject(new Error(`Worker错误: ${e.message}`));
                }
            };
            
            this.worker.addEventListener('message', messageHandler);
            this.worker.addEventListener('error', errorHandler);
            
            timeoutId = setTimeout(() => {
                cleanup();
                reject(new Error('Worker超时'));
            }, this.timeout);
            
            try {
                this.worker.postMessage(data);
            } catch (err) {
                cleanup();
                reject(err);
            }
        });
    }
    
    /** Stops the worker thread. */
    terminate() {
        this.worker.terminate();
    }
}

东巴文小贴士

🧵 Worker使用建议

  1. 任务粒度:任务太小,通信开销大于计算收益;任务太大,响应延迟高
  2. 数据传输:使用Transferable对象减少复制开销
  3. Worker池:避免频繁创建销毁Worker,使用Worker池复用
  4. 错误处理:Worker内部未捕获的错误不会被主线程的try/catch捕获,只会触发Worker对象的error事件,需要通过worker.onerror单独处理

性能考量

  • Worker创建有开销(约几十ms)
  • 数据序列化/反序列化有开销
  • 适合CPU密集型任务,不适合IO密集型任务
  • 任务执行时间应远大于通信时间

下一步

下一章将探讨 [Service Worker](file:///e:/db-w.cn/md_data/javascript/71_Service%20Worker.md),学习如何实现离线缓存和后台同步功能。