异步迭代器是一种 next() 方法返回 Promise 的迭代器,因此可以在产出每个值之前等待异步操作完成。
// Synchronous iterator
/**
 * Iterable over a fixed list of numbers.
 *
 * Each call to `[Symbol.iterator]()` gets its own private cursor, so the
 * object can be iterated repeatedly (and by several consumers at once)
 * without one pass exhausting the next.
 *
 * `index` is the offset every new iteration starts from; iterating does not
 * modify it.
 */
const syncIterator = {
  data: [1, 2, 3],
  index: 0,
  [Symbol.iterator]() {
    // Private cursor: keeps iteration state off the shared object.
    let position = this.index;
    return {
      next: () => {
        if (position < this.data.length) {
          return { value: this.data[position++], done: false };
        }
        return { value: undefined, done: true };
      },
    };
  },
};
// Walk the iterable with for...of and print every element it produces.
for (const element of syncIterator) {
  console.log(element); // 1, 2, 3
}
// Asynchronous iterators are exposed through Symbol.asyncIterator
/**
 * Async iterable over a fixed list of numbers; every element is delivered
 * after a short simulated delay.
 *
 * Each call to `[Symbol.asyncIterator]()` gets its own private cursor, so the
 * object can be iterated more than once. `index` is the offset every new
 * iteration starts from; iterating does not modify it.
 */
const asyncIterator = {
  data: [1, 2, 3],
  index: 0,
  [Symbol.asyncIterator]() {
    let position = this.index;
    return {
      next: async () => {
        if (position >= this.data.length) {
          return { value: undefined, done: true };
        }
        // Claim the slot before awaiting so overlapping next() calls can
        // neither read the same element nor run past the end of the data.
        const value = this.data[position++];
        await delay(100); // simulate asynchronous work
        return { value, done: false };
      },
    };
  },
};
/**
 * Creates a promise that is resolved by a `setTimeout` timer.
 *
 * @param {number} ms - Timer duration passed to `setTimeout`, in milliseconds.
 * @returns {Promise<undefined>} Promise fulfilled (with no value) when the timer fires.
 */
function delay(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Drives `asyncIterator` by hand instead of using for await...of:
 * obtains the iterator, then awaits `next()` repeatedly and logs each value
 * until the iterator reports `done`.
 *
 * @returns {Promise<void>}
 */
async function useAsyncIterator() {
  const cursor = asyncIterator[Symbol.asyncIterator]();
  for (let step = await cursor.next(); !step.done; step = await cursor.next()) {
    console.log(step.value);
  }
}
useAsyncIterator(); // 1, 2, 3
使用 async function* 和 yield 创建异步生成器。
// Async generator function
/**
 * Yields the numbers 1, 2 and 3, then finishes.
 *
 * @returns {AsyncGenerator<number, void, unknown>}
 */
async function* asyncGenerator() {
  for (const n of [1, 2, 3]) {
    yield n;
  }
}
// Usage
/**
 * Consumes `asyncGenerator()` with for await...of and logs each value.
 *
 * @returns {Promise<void>}
 */
async function useGenerator() {
  for await (const n of asyncGenerator()) {
    console.log(n); // 1, 2, 3
  }
}
// Generating asynchronous data
/**
 * Fetches a paginated resource one page at a time and yields each page's
 * `items`.
 *
 * Requests `${url}?page=N` starting at page 1 and keeps going while the
 * response body reports `hasMore`. The response JSON is read for the fields
 * `items` and `hasMore`.
 *
 * @param {string} url - Endpoint to which the `?page=N` query is appended.
 * @yields {*} The `items` field of each page, in page order.
 * @throws {Error} When a response has a non-success HTTP status, instead of
 *   treating the error body as a page of data.
 */
async function* fetchPages(url) {
  let page = 1;
  let hasMore = true;
  while (hasMore) {
    const response = await fetch(`${url}?page=${page}`);
    if (!response.ok) {
      throw new Error(
        `Request for page ${page} failed with HTTP status ${response.status}`
      );
    }
    const data = await response.json();
    yield data.items;
    // A missing flag ends the loop, exactly like an explicit false.
    hasMore = Boolean(data.hasMore);
    page++;
  }
}
// Usage
/**
 * Drains `fetchPages("/api/items")`, appending every page's items to one
 * array. Logs the size of each page as it arrives and the grand total at
 * the end.
 *
 * @returns {Promise<void>}
 */
async function loadAllItems() {
  const collected = [];
  for await (const batch of fetchPages("/api/items")) {
    collected.push(...batch);
    console.log(`加载了 ${batch.length} 条数据`);
  }
  console.log(`总共 ${collected.length} 条数据`);
}
// Infinite asynchronous sequence
/**
 * Endless counter: awaits `delay(1000)` before each value and yields
 * 0, 1, 2, ... without ever finishing on its own.
 *
 * @returns {AsyncGenerator<number, never, unknown>}
 */
async function* infiniteSequence() {
  for (let counter = 0; ; counter++) {
    await delay(1000);
    yield counter;
  }
}
// Usage
/**
 * Logs values from `infiniteSequence()` and stops after logging the first
 * value that is at least 5.
 *
 * @returns {Promise<void>}
 */
async function demo() {
  for await (const tick of infiniteSequence()) {
    console.log(tick);
    if (tick >= 5) {
      // The sequence never ends by itself, so the loop must be left explicitly.
      break;
    }
  }
}
for await...of 是用于遍历异步可迭代对象的语法。
// Iterating over an async iterable
/**
 * Yields the two sample strings in order, then finishes.
 *
 * @returns {AsyncGenerator<string, void, unknown>}
 */
async function* generator() {
  for (const text of ["东巴文", "db-w.cn"]) {
    yield text;
  }
}
/**
 * Logs every value produced by `generator()`.
 *
 * @returns {Promise<void>}
 */
async function demo() {
  for await (const text of generator()) {
    console.log(text);
  }
}
demo();
// Output:
// 东巴文
// db-w.cn
// for await...of awaits each promise of a plain array in turn
/**
 * Builds an array of three already-resolved promises and logs the value of
 * each one, in array order, using for await...of.
 *
 * @returns {Promise<void>}
 */
async function processPromises() {
  const promises = [1, 2, 3].map((n) => Promise.resolve(n));
  for await (const settled of promises) {
    console.log(settled); // 1, 2, 3
  }
}
// Note: this is not the same as Promise.all.
// Promise.all waits for all promises together and resolves with one array.
// for await...of awaits the elements one at a time, in array order. The
// promises here already exist before the loop starts, so the loop only
// sequences how their results are consumed.
/**
 * Push-based queue that can be consumed with for await...of.
 *
 * Producers call `push(value)`; a consumer iterating the queue receives the
 * values in insertion order and waits whenever the queue is empty. Calling
 * `close()` lets the consumer finish: values pushed before the close are
 * still delivered, after which iteration ends.
 */
class AsyncQueue {
  constructor() {
    // Values pushed while no consumer was waiting.
    this.queue = [];
    // Resolve callbacks of consumers currently waiting for a value.
    this.resolvers = [];
    // Set once close() has been called.
    this.closed = false;
  }

  /**
   * Adds a value, handing it directly to a waiting consumer if there is one.
   *
   * @param {*} value - Value to deliver.
   * @throws {Error} If the queue has already been closed.
   */
  push(value) {
    if (this.closed) {
      throw new Error("Cannot push to a closed AsyncQueue");
    }
    const resolve = this.resolvers.shift();
    if (resolve) {
      resolve({ value, done: false });
    } else {
      this.queue.push(value);
    }
  }

  /**
   * Marks the queue as finished and wakes every waiting consumer so that its
   * iteration can end. Safe to call more than once.
   */
  close() {
    this.closed = true;
    for (const resolve of this.resolvers.splice(0)) {
      resolve({ value: undefined, done: true });
    }
  }

  /**
   * Yields buffered values first, then waits for new ones; returns once the
   * queue is closed and the buffer is empty.
   */
  async *[Symbol.asyncIterator]() {
    while (true) {
      if (this.queue.length > 0) {
        yield this.queue.shift();
      } else if (this.closed) {
        return;
      } else {
        const result = await new Promise((resolve) => {
          this.resolvers.push(resolve);
        });
        if (result.done) return;
        yield result.value;
      }
    }
  }
}
// Usage
const queue = new AsyncQueue();

/**
 * Logs every value that arrives on `queue`, waiting whenever it is empty.
 *
 * @returns {Promise<void>}
 */
async function consumer() {
  for await (const item of queue) {
    console.log("消费:", item);
  }
}

// Start consuming first, then feed the queue.
consumer();
for (const entry of ["东巴文", "db-w.cn"]) {
  queue.push(entry);
}
异步生成器的方法:next()、return()、throw(),以及用 yield* 委托给另一个异步生成器。
// yield* delegates to another async generator
/**
 * Yields 1 and 2, then finishes.
 *
 * @returns {AsyncGenerator<number, void, unknown>}
 */
async function* inner() {
  for (const n of [1, 2]) {
    yield n;
  }
}
/**
 * Yields a start marker, then everything `inner()` yields (via `yield*`),
 * then an end marker.
 *
 * @returns {AsyncGenerator<string|number, void, unknown>}
 */
async function* outer() {
  yield "开始";
  const delegate = inner();
  yield* delegate;
  yield "结束";
}
/**
 * Logs every value produced by `outer()`.
 *
 * @returns {Promise<void>}
 */
async function demo() {
  for await (const item of outer()) {
    console.log(item);
  }
}
demo();
// Output:
// 开始
// 1
// 2
// 结束
/**
 * Yields 1, 2, 3 from inside a try block. If the consumer injects an error
 * with `throw()` while the generator is paused at one of those yields, the
 * error is logged and a single recovery value is yielded; the generator then
 * finishes.
 *
 * @returns {AsyncGenerator<number|string, void, unknown>}
 */
async function* generator() {
  try {
    for (const n of [1, 2, 3]) {
      yield n;
    }
  } catch (error) {
    console.log("捕获错误:", error);
    yield "恢复";
  }
}
/**
 * Shows `return()`: takes one value from `generator()`, ends the generator
 * early with a custom return value, then shows that further `next()` calls
 * report completion. Each result is logged as soon as it is available.
 *
 * @returns {Promise<void>}
 */
async function demoReturn() {
  const gen = generator();

  const firstStep = await gen.next();
  console.log(firstStep); // { value: 1, done: false }

  const earlyExit = await gen.return("提前结束");
  console.log(earlyExit); // { value: "提前结束", done: true }

  const afterExit = await gen.next();
  console.log(afterExit); // { value: undefined, done: true }
}
/**
 * Shows `throw()`: takes one value from `generator()`, then injects an error
 * at the paused `yield`. The generator's catch block logs the error and
 * yields a recovery value. Because execution has left the try block by then,
 * the remaining values (2 and 3) are never produced and the following
 * `next()` reports completion.
 *
 * @returns {Promise<void>}
 */
async function demoThrow() {
  const gen = generator();
  console.log(await gen.next()); // { value: 1, done: false }
  console.log(await gen.throw(new Error("出错了"))); // { value: "恢复", done: false }
  // The catch block has run to its end, so the generator is finished.
  console.log(await gen.next()); // { value: undefined, done: true }
}
/**
 * Async iterable wrapper around an in-memory collection, with lazy
 * `filter` and `map` helpers that each return a new async generator.
 */
class DataSource {
  /**
   * @param {Iterable<*>} data - Collection to iterate; stored as-is.
   */
  constructor(data) {
    this.data = data;
  }

  /**
   * Yields each element of `data`, awaiting `delay(100)` before every one.
   */
  async *[Symbol.asyncIterator]() {
    for (const entry of this.data) {
      await delay(100);
      yield entry;
    }
  }

  /**
   * Yields only the elements for which `predicate` returns a truthy value.
   *
   * @param {function(*): *} predicate - Called with each element.
   */
  async *filter(predicate) {
    for await (const entry of this) {
      if (!predicate(entry)) continue;
      yield entry;
    }
  }

  /**
   * Yields `transformer(element)` for every element.
   *
   * @param {function(*): *} transformer - Called with each element.
   */
  async *map(transformer) {
    for await (const entry of this) {
      const mapped = transformer(entry);
      yield mapped;
    }
  }
}
// Usage
/**
 * Builds a `DataSource` over 1..5, then logs its even elements followed by
 * every element doubled. The second pass starts only after the first ends.
 *
 * @returns {Promise<void>}
 */
async function demo() {
  const source = new DataSource([1, 2, 3, 4, 5]);

  const evens = source.filter((x) => x % 2 === 0);
  for await (const even of evens) {
    console.log(even); // 2, 4
  }

  const doubled = source.map((x) => x * 2);
  for await (const twice of doubled) {
    console.log(twice); // 2, 4, 6, 8, 10
  }
}
// Streaming a large file line by line (Node.js)
/**
 * Yields the lines of a file one at a time using a read stream and the
 * `readline` module.
 *
 * The readline interface and the underlying file stream are released when
 * iteration ends for any reason — normal completion, the consumer breaking
 * out of its loop, or an error — so an early exit does not leave the file
 * handle open.
 *
 * @param {string} filePath - Path of the file to read.
 * @yields {string} Each line of the file.
 */
async function* readLines(filePath) {
  const fs = require("fs");
  const readline = require("readline");
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    // Passed through to readline; see its documentation for CR LF handling.
    crlfDelay: Infinity
  });
  try {
    for await (const line of rl) {
      yield line;
    }
  } finally {
    // Closing readline does not destroy its input stream, so do both.
    rl.close();
    fileStream.destroy();
  }
}
// Processing a large file
/**
 * Reads a file through `readLines`, logging every line that contains the
 * search text together with its 1-based line number, and finally logs the
 * total number of lines.
 *
 * @param {string} filePath - Path handed to `readLines`.
 * @returns {Promise<void>}
 */
async function processLargeFile(filePath) {
  let lineCount = 0;
  for await (const text of readLines(filePath)) {
    // Only one line is handled at a time.
    lineCount += 1;
    if (!text.includes("东巴文")) continue;
    console.log(`第${lineCount}行: ${text}`);
  }
  console.log(`总行数: ${lineCount}`);
}
// Paginated API iterator
/**
 * Async iterable over all items of a paginated HTTP endpoint.
 *
 * Iterating requests `${baseUrl}?page=N&size=pageSize` starting at page 1,
 * yields every element of each response's `items` individually, and keeps
 * requesting pages while the response body reports `hasMore`.
 */
class PaginatedAPI {
  /**
   * @param {string} baseUrl - Endpoint to which the query string is appended.
   * @param {number} [pageSize=10] - Value sent as the `size` query parameter.
   */
  constructor(baseUrl, pageSize = 10) {
    this.baseUrl = baseUrl;
    this.pageSize = pageSize;
  }

  /**
   * @yields {*} Each element of every page's `items`, in order.
   * @throws {Error} When a response has a non-success HTTP status, instead
   *   of treating the error body as a page of data.
   */
  async *[Symbol.asyncIterator]() {
    let page = 1;
    let hasMore = true;
    while (hasMore) {
      const response = await fetch(
        `${this.baseUrl}?page=${page}&size=${this.pageSize}`
      );
      if (!response.ok) {
        throw new Error(
          `Request for page ${page} failed with HTTP status ${response.status}`
        );
      }
      const data = await response.json();
      // Flatten the page: the consumer sees single items, not arrays.
      yield* data.items;
      // A missing flag ends the loop, exactly like an explicit false.
      hasMore = Boolean(data.hasMore);
      page++;
    }
  }
}
// Usage
/**
 * Iterates a `PaginatedAPI` for "/api/users" (page size 20), collecting
 * every user and logging each user's name as it arrives, then logs the
 * total count.
 *
 * @returns {Promise<void>}
 */
async function loadAllUsers() {
  const userApi = new PaginatedAPI("/api/users", 20);
  const collected = [];
  for await (const person of userApi) {
    collected.push(person);
    console.log(`加载用户: ${person.name}`);
  }
  console.log(`总共 ${collected.length} 个用户`);
}
// Real-time data stream
/**
 * Async iterable over the JSON messages of a WebSocket connection.
 *
 * Each iteration opens its own socket. Messages are parsed with
 * `JSON.parse` and yielded in arrival order; messages that arrive while the
 * consumer is busy are buffered. Iteration ends once the socket has closed
 * and the buffer has been drained. If the consumer stops early (break,
 * `return()`, or an error), the socket is closed.
 */
class RealtimeStream {
  /**
   * @param {string} url - WebSocket URL to connect to on iteration.
   */
  constructor(url) {
    this.url = url;
    this.listeners = [];
  }

  /**
   * @yields {*} Each received message, parsed from JSON.
   */
  async *[Symbol.asyncIterator]() {
    const ws = new WebSocket(this.url);
    const messageQueue = [];
    let closed = false;
    // Resolve callback of the consumer's pending wait, if it is waiting.
    let wake = null;
    const notify = () => {
      if (wake) {
        const resume = wake;
        wake = null;
        resume();
      }
    };
    ws.onmessage = (event) => {
      messageQueue.push(JSON.parse(event.data));
      notify();
    };
    ws.onclose = () => {
      closed = true;
      notify();
    };
    try {
      // Do not gate on readyState: a new socket is still CONNECTING here,
      // so checking for OPEN would end the stream before it starts.
      while (true) {
        if (messageQueue.length > 0) {
          yield messageQueue.shift();
        } else if (closed) {
          return;
        } else {
          await new Promise((resolve) => {
            wake = resolve;
          });
        }
      }
    } finally {
      ws.onmessage = null;
      ws.onclose = null;
      if (!closed) {
        ws.close();
      }
    }
  }
}
// Usage
/**
 * Opens a `RealtimeStream` on the example endpoint and logs every message
 * it delivers until the stream ends.
 *
 * @returns {Promise<void>}
 */
async function listenToStream() {
  const liveFeed = new RealtimeStream("wss://api.example.com/stream");
  for await (const incoming of liveFeed) {
    console.log("收到消息:", incoming);
  }
}
// Asynchronous pipeline processing
/**
 * Chains a source iterable through a series of stage functions and yields
 * whatever the last stage produces.
 *
 * The first argument is the source; every following argument is a function
 * that receives the previous stage's output and returns a new iterable.
 *
 * @param {...*} generators - Source iterable followed by stage functions.
 * @yields {*} The values of the final stage.
 */
async function* pipe(...generators) {
  const [origin, ...stages] = generators;
  const composed = stages.reduce((upstream, stage) => stage(upstream), origin);
  yield* composed;
}
/**
 * Yields the numbers 1 through 5, then finishes.
 *
 * @returns {AsyncGenerator<number, void, unknown>}
 */
async function* source() {
  for (const n of [1, 2, 3, 4, 5]) {
    yield n;
  }
}
/**
 * Lazily yields the elements of `iterable` for which `predicate` returns a
 * truthy value.
 *
 * @param {AsyncIterable<*>|Iterable<*>} iterable - Input sequence.
 * @param {function(*): *} predicate - Called with each element.
 * @yields {*} The elements that passed the predicate, in input order.
 */
async function* filter(iterable, predicate) {
  for await (const candidate of iterable) {
    if (!predicate(candidate)) continue;
    yield candidate;
  }
}
/**
 * Lazily yields `transformer(element)` for every element of `iterable`.
 *
 * @param {AsyncIterable<*>|Iterable<*>} iterable - Input sequence.
 * @param {function(*): *} transformer - Called with each element.
 * @yields {*} The transformed elements, in input order.
 */
async function* map(iterable, transformer) {
  for await (const element of iterable) {
    const mapped = transformer(element);
    yield mapped;
  }
}
// Usage
/**
 * Builds a pipeline with `pipe`: the numbers from `source()` are first
 * filtered down to the even ones, then doubled; every result is logged.
 *
 * @returns {Promise<void>}
 */
async function demo() {
  // Each stage adapts a two-argument helper to the one-argument stage shape.
  const keepEven = async function* (iter) {
    yield* filter(iter, (x) => x % 2 === 0);
  };
  const double = async function* (iter) {
    yield* map(iter, (x) => x * 2);
  };

  const pipeline = pipe(source(), keepEven, double);
  for await (const result of pipeline) {
    console.log(result); // 4, 8
  }
}
掌握了异步迭代后,让我们继续学习:
东巴文(db-w.cn) - 让编程学习更简单
🎯 东巴文寄语:异步迭代器和异步生成器是处理异步数据流的强大工具,特别适合处理分页数据、实时流和大型数据集。在 db-w.cn,我们帮你掌握高级异步编程技巧!