异步迭代

异步迭代器

异步迭代器是支持异步操作的迭代器。

同步迭代器回顾

// 同步迭代器
const syncIterator = {
    data: [1, 2, 3],
    index: 0,
    [Symbol.iterator]() {
        return {
            next: () => {
                if (this.index < this.data.length) {
                    return { value: this.data[this.index++], done: false };
                }
                return { done: true };
            }
        };
    }
};

for (const item of syncIterator) {
    console.log(item);  // 1, 2, 3
}

异步迭代器定义

// 异步迭代器使用 Symbol.asyncIterator
const asyncIterator = {
    data: [1, 2, 3],
    index: 0,
    [Symbol.asyncIterator]() {
        return {
            next: async () => {
                if (this.index < this.data.length) {
                    await delay(100);  // 模拟异步操作
                    return { value: this.data[this.index++], done: false };
                }
                return { done: true };
            }
        };
    }
};

function delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

手动使用异步迭代器

async function useAsyncIterator() {
    const iterator = asyncIterator[Symbol.asyncIterator]();
    
    let result = await iterator.next();
    while (!result.done) {
        console.log(result.value);
        result = await iterator.next();
    }
}

useAsyncIterator();  // 1, 2, 3

异步生成器

使用async和yield创建异步生成器。

基本语法

// 异步生成器函数
async function* asyncGenerator() {
    yield 1;
    yield 2;
    yield 3;
}

// 使用
async function useGenerator() {
    for await (const value of asyncGenerator()) {
        console.log(value);  // 1, 2, 3
    }
}

异步数据生成

// 生成异步数据
async function* fetchPages(url) {
    let page = 1;
    let hasMore = true;
    
    while (hasMore) {
        const response = await fetch(`${url}?page=${page}`);
        const data = await response.json();
        
        yield data.items;
        
        hasMore = data.hasMore;
        page++;
    }
}

// 使用
async function loadAllItems() {
    const allItems = [];
    
    for await (const items of fetchPages("/api/items")) {
        allItems.push(...items);
        console.log(`加载了 ${items.length} 条数据`);
    }
    
    console.log(`总共 ${allItems.length} 条数据`);
}

无限序列

// 无限异步序列
async function* infiniteSequence() {
    let i = 0;
    while (true) {
        await delay(1000);
        yield i++;
    }
}

// 使用
async function demo() {
    for await (const value of infiniteSequence()) {
        console.log(value);
        if (value >= 5) break;  // 必须手动退出
    }
}

for await...of

异步迭代语法。

基本用法

// 遍历异步可迭代对象
async function* generator() {
    yield "东巴文";
    yield "db-w.cn";
}

async function demo() {
    for await (const value of generator()) {
        console.log(value);
    }
}

demo();
// 东巴文
// db-w.cn

遍历Promise数组

// for await...of会等待每个Promise
async function processPromises() {
    const promises = [
        Promise.resolve(1),
        Promise.resolve(2),
        Promise.resolve(3)
    ];
    
    for await (const value of promises) {
        console.log(value);  // 1, 2, 3
    }
}

// 注意:这与Promise.all不同
// Promise.all是并行执行
// for await...of是顺序执行

异步队列

class AsyncQueue {
    constructor() {
        this.queue = [];
        this.resolvers = [];
    }
    
    push(value) {
        if (this.resolvers.length > 0) {
            const resolve = this.resolvers.shift();
            resolve({ value, done: false });
        } else {
            this.queue.push(value);
        }
    }
    
    async *[Symbol.asyncIterator]() {
        while (true) {
            if (this.queue.length > 0) {
                yield this.queue.shift();
            } else {
                const result = await new Promise(resolve => {
                    this.resolvers.push(resolve);
                });
                if (result.done) return;
                yield result.value;
            }
        }
    }
}

// 使用
const queue = new AsyncQueue();

async function consumer() {
    for await (const value of queue) {
        console.log("消费:", value);
    }
}

consumer();

queue.push("东巴文");
queue.push("db-w.cn");

异步生成器方法

异步生成器的方法。

yield*

// yield* 委托给另一个异步生成器
async function* inner() {
    yield 1;
    yield 2;
}

async function* outer() {
    yield "开始";
    yield* inner();
    yield "结束";
}

async function demo() {
    for await (const value of outer()) {
        console.log(value);
    }
}

demo();
// 开始
// 1
// 2
// 结束

return和throw

async function* generator() {
    try {
        yield 1;
        yield 2;
        yield 3;
    } catch (error) {
        console.log("捕获错误:", error);
        yield "恢复";
    }
}

async function demoReturn() {
    const gen = generator();
    
    console.log(await gen.next());  // { value: 1, done: false }
    console.log(await gen.return("提前结束"));  // { value: "提前结束", done: true }
    console.log(await gen.next());  // { value: undefined, done: true }
}

async function demoThrow() {
    const gen = generator();
    
    console.log(await gen.next());  // { value: 1, done: false }
    console.log(await gen.throw(new Error("出错了")));  // { value: "恢复", done: false }
    console.log(await gen.next());  // { value: 3, done: false }
}

生成器作为对象方法

class DataSource {
    constructor(data) {
        this.data = data;
    }
    
    async *[Symbol.asyncIterator]() {
        for (const item of this.data) {
            await delay(100);
            yield item;
        }
    }
    
    async *filter(predicate) {
        for await (const item of this) {
            if (predicate(item)) {
                yield item;
            }
        }
    }
    
    async *map(transformer) {
        for await (const item of this) {
            yield transformer(item);
        }
    }
}

// 使用
async function demo() {
    const source = new DataSource([1, 2, 3, 4, 5]);
    
    for await (const value of source.filter(x => x % 2 === 0)) {
        console.log(value);  // 2, 4
    }
    
    for await (const value of source.map(x => x * 2)) {
        console.log(value);  // 2, 4, 6, 8, 10
    }
}

实际应用

流式数据处理

// 流式读取大文件(Node.js)
async function* readLines(filePath) {
    const fs = require("fs");
    const readline = require("readline");
    
    const fileStream = fs.createReadStream(filePath);
    const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
    });
    
    for await (const line of rl) {
        yield line;
    }
}

// 处理大文件
async function processLargeFile(filePath) {
    let lineCount = 0;
    
    for await (const line of readLines(filePath)) {
        // 逐行处理,内存友好
        lineCount++;
        if (line.includes("东巴文")) {
            console.log(`第${lineCount}行: ${line}`);
        }
    }
    
    console.log(`总行数: ${lineCount}`);
}

分页数据加载

// 分页API迭代器
class PaginatedAPI {
    constructor(baseUrl, pageSize = 10) {
        this.baseUrl = baseUrl;
        this.pageSize = pageSize;
    }
    
    async *[Symbol.asyncIterator]() {
        let page = 1;
        let hasMore = true;
        
        while (hasMore) {
            const response = await fetch(
                `${this.baseUrl}?page=${page}&size=${this.pageSize}`
            );
            const data = await response.json();
            
            yield* data.items;
            
            hasMore = data.hasMore;
            page++;
        }
    }
}

// 使用
async function loadAllUsers() {
    const api = new PaginatedAPI("/api/users", 20);
    const users = [];
    
    for await (const user of api) {
        users.push(user);
        console.log(`加载用户: ${user.name}`);
    }
    
    console.log(`总共 ${users.length} 个用户`);
}

实时数据流

// 实时数据流
class RealtimeStream {
    constructor(url) {
        this.url = url;
        this.listeners = [];
    }
    
    async *[Symbol.asyncIterator]() {
        const ws = new WebSocket(this.url);
        const messageQueue = [];
        let resolve = null;
        
        ws.onmessage = (event) => {
            const data = JSON.parse(event.data);
            if (resolve) {
                resolve({ value: data, done: false });
                resolve = null;
            } else {
                messageQueue.push(data);
            }
        };
        
        ws.onclose = () => {
            if (resolve) {
                resolve({ done: true });
            }
        };
        
        while (ws.readyState === WebSocket.OPEN) {
            if (messageQueue.length > 0) {
                yield messageQueue.shift();
            } else {
                const result = await new Promise(r => { resolve = r; });
                if (result.done) return;
                yield result.value;
            }
        }
    }
}

// 使用
async function listenToStream() {
    const stream = new RealtimeStream("wss://api.example.com/stream");
    
    for await (const message of stream) {
        console.log("收到消息:", message);
    }
}

异步管道

// 异步管道处理
async function* pipe(...generators) {
    let current = generators[0];
    
    for (let i = 1; i < generators.length; i++) {
        current = generators[i](current);
    }
    
    yield* current;
}

async function* source() {
    yield* [1, 2, 3, 4, 5];
}

async function* filter(iterable, predicate) {
    for await (const item of iterable) {
        if (predicate(item)) {
            yield item;
        }
    }
}

async function* map(iterable, transformer) {
    for await (const item of iterable) {
        yield transformer(item);
    }
}

// 使用
async function demo() {
    const pipeline = pipe(
        source(),
        async function*(iter) {
            yield* filter(iter, x => x % 2 === 0);
        },
        async function*(iter) {
            yield* map(iter, x => x * 2);
        }
    );
    
    for await (const value of pipeline) {
        console.log(value);  // 4, 8
    }
}

下一步

掌握了异步迭代后,让我们继续学习:

  1. Ajax - 学习Ajax请求
  2. [Fetch API](./50_Fetch API.md) - 学习现代网络请求
  3. WebSocket - 学习实时通信

东巴文(db-w.cn) - 让编程学习更简单

🎯 东巴文寄语:异步迭代器和异步生成器是处理异步数据流的强大工具,特别适合处理分页数据、实时流和大型数据集。在 db-w.cn,我们帮你掌握高级异步编程技巧!