手写 async/await 的实现
包含以下功能
- 手写 async/await 的实现
- 异步并发、串行、并行
- 并发控制
- 请求封装及异步控制
手写 async/await 的实现
来源:https://github.com/sisterAn/JavaScript-Algorithms/issues/56
await
内部实现了 generator
,其实 await
就是 generator
加上 Promise
的语法糖,且内部实现了自动执行 generator
。如果你熟悉 co
的话,其实自己就可以实现这样的语法糖。
js
/**
* async/await 实现
* @param {*} generatorFunc
*/
function asyncToGenerator(generatorFunc) {
// 返回的是一个新的函数
return function (...args) {
// 先调用 generator 函数 生成迭代器
// 对应 var gen = testG()
const gen = generatorFunc.apply(this, args)
// 返回一个 promise 因为外部是用.then 的方式 或者 await 的方式去使用这个函数的返回值的
// var test = asyncToGenerator(testG)
// test().then(res => console.log(res))
return new Promise((resolve, reject) => {
// 内部定义一个 step 函数 用来一步一步的跨过 yield 的阻碍
// key 有 next 和 throw 两种取值,分别对应了 gen 的 next 和 throw 方法
// arg 参数则是用来把 promise resolve 出来的值交给下一个 yield
function step(key, arg) {
let genResult
// 这个方法需要包裹在 try catch 中
// 如果报错了 就把 promise 给 reject 掉 外部通过.catch 可以获取到错误
try {
genResult = gen[key](arg)
} catch (error) {
return reject(error)
}
// gen.next() 得到的结果是一个 { value, done } 的结构
const { value, done } = genResult
if (done) {
// 如果已经完成了 就直接 resolve 这个 promise
// 这个 done 是在最后一次调用 next 后才会为 true
// 以本文的例子来说 此时的结果是 { done: true, value: 'success' }
// 这个 value 也就是 generator 函数最后的返回值
return resolve(value)
} else {
// 除了最后结束的时候外,每次调用 gen.next()
// 其实是返回 { value: Promise, done: false } 的结构,
// 这里要注意的是 Promise.resolve 可以接受一个 promise 为参数
// 并且这个 promise 参数被 resolve 的时候,这个 then 才会被调用
return Promise.resolve(
// 这个 value 对应的是 yield 后面的 promise
value
).then(
// value 这个 promise 被 resove 的时候,就会执行 next
// 并且只要 done 不是 true 的时候 就会递归的往下解开 promise
// 对应 gen.next().value.then(value => {
// gen.next(value).value.then(value2 => {
// gen.next()
//
// 此时 done 为 true 了 整个 promise 被 resolve 了
// 最外部的 test().then(res => console.log(res)) 的 then 就开始执行了
// })
// })
function onResolve(val) {
step('next', val)
},
// 如果 promise 被 reject 了 就再次进入 step 函数
// 不同的是,这次的 try catch 中调用的是 gen.throw(err)
// 那么自然就被 catch 到 然后把 promise 给 reject 掉啦
function onReject(err) {
step('throw', err)
}
)
}
}
step('next')
})
}
}
var getData = () =>
new Promise((resolve) => setTimeout(() => resolve('data'), 1000))
function* testG() {
const data = yield getData()
console.log('data: ', data)
const data2 = yield getData()
console.log('data2: ', data2)
return 'success'
}
var gen = asyncToGenerator(testG)
gen().then((res) => console.log(res))
js
function asyncToGen(genFunction) {
return function (...args) {
const gen = genFunction.apply(this, args)
return new Promise((resolve, reject) => {
function step(key, arg) {
let genResult
try {
genResult = gen[key](arg)
} catch (err) {
return reject(err)
}
const { value, done } = genResult
if (done) {
return resolve(value)
}
return Promise.resolve(value).then(
(val) => {
step('next', val)
},
(err) => {
step('throw', err)
}
)
}
step('next')
})
}
}
const getData = () =>
new Promise((resolve) => setTimeout(() => resolve('data'), 1000))
function* testG() {
const data = yield getData()
console.log('data: ', data)
const data2 = yield getData()
console.log('data2: ', data2)
return 'success'
}
const gen = asyncToGen(testG)
gen().then((res) => console.log(res))
js
// 本质是希望实现一个 co 函数
let delay = function (time, fnc) {
setTimeout(() => {
fnc(time)
}, time)
}
let promisefy = (fn) => {
return (...arg) => {
return new Promise((resolve, reject) => {
fn(...arg, (param) => {
resolve(param)
})
})
}
}
let delayP = promisefy(delay)
const gen = function* () {
const ret1 = yield delayP(1000)
console.log(ret1)
const ret2 = yield delayP(2000)
console.log(ret2)
}
// 阴间写法
const g = gen()
g.next().value.then((res1) => {
g.next(res1).value.then((res2) => {
//
})
})
// 正常写法
function co(generator) {
return new Promise((resolve, reject) => {
const gen = generator()
function next(...param) {
let tmp = gen.next(...param)
if (tmp.done) {
resolve(tmp.value)
return
}
tmp.value.then((...ret) => {
next(...ret)
})
}
next()
})
}
co(gen).then((res) => {
console.log(res)
})
js
// 基于前面的测试用例实现一个简单版的(代码可以复制运行)
// - 本质上是 generator + promise
// - generator 的 done 为 false 时,递归
// - generator 的 done 为 true 时,递归终止,resolve 结果
// - generator 的 value 和 done 状态是迭代器协议的返回值
/**
* async 的实现
* @author waldon
* @param {*} generatorFn - 生成器函数
*/
function asyncWrapper(generatorFn) {
const g = generatorFn()
return new Promise((resolve, reject) => {
function autoNext(g, nextVal) {
const { value, done } = g.next(nextVal)
if (!done) {
value.then((res) => {
autoNext(g, res)
})
} else {
resolve(value)
}
}
autoNext(g)
})
}
// 测试
const getData = () =>
new Promise((resolve) => setTimeout(() => resolve('data'), 1000))
function* testG() {
const data = yield getData()
console.log('data: ', data)
const data2 = yield getData()
console.log('data2: ', data2)
return 'success'
}
asyncWrapper(testG).then((res) => {
console.log(res)
})
// 期望顺序输出 data data2 success
实现一个异步的 sum/add
这里有一道不错的面试题,水平较高,涉及 promise、串行、并行、并发控制,层次递进
查看详细:实现一个异步的 sum/add
并发控制 async-pool
解析源码 tiny-async-pool
asyncPool(concurrency, iterable, iteratorFn)
在有限的并发池中运行多个承诺返回和异步函数。一旦其中一个承诺被拒绝,它就会立即拒绝。它会尽快调用迭代器函数(在并发限制下)。它返回一个异步迭代器,该迭代器在承诺完成后立即生成(在并发限制下)。
asyncPool 有如下三个版本的实现
js
// ES9
async function* asyncPool(concurrency, iterable, iteratorFn) {
const executing = new Set()
async function consume() {
const [promise, value] = await Promise.race(executing)
executing.delete(promise)
return value
}
for (const item of iterable) {
// Wrap iteratorFn() in an async fn to ensure we get a promise.
// Then expose such promise, so it's possible to later reference and
// remove it from the executing pool.
const promise = (async () => await iteratorFn(item, iterable))().then(
(value) => [promise, value]
)
executing.add(promise)
if (executing.size >= concurrency) {
yield await consume()
}
}
while (executing.size) {
yield await consume()
}
}
// 用法 1
import asyncPool from 'tiny-async-pool'
const timeout = (ms) =>
new Promise((resolve) => setTimeout(() => resolve(ms), ms))
// ES9 for await...of
for await (const value of asyncPool(2, [1000, 5000, 3000, 2000], timeout)) {
console.log(value)
}
// 用法 2
async function asyncPoolAll(...args) {
const results = [];
for await (const result of asyncPool(...args)) {
results.push(result);
}
return results;
}
// ES7 API style available on our previous 1.x version
const results = await asyncPoolAll(concurrency, iterable, iteratorFn);
// ES6 API style available on our previous 1.x version
return asyncPoolAll(2, [1000, 5000, 3000, 2000], timeout).then(results => {...});
js
// ES7
async function asyncPool(poolLimit, iterable, iteratorFn) {
const ret = []
const executing = new Set()
for (const item of iterable) {
const p = Promise.resolve().then(() => iteratorFn(item, iterable))
ret.push(p)
executing.add(p)
const clean = () => executing.delete(p)
p.then(clean).catch(clean)
if (executing.size >= poolLimit) {
await Promise.race(executing)
}
}
return Promise.all(ret)
}
js
// ES6
function asyncPool(poolLimit, iterable, iteratorFn) {
let i = 0
const ret = []
const executing = new Set()
const enqueue = function () {
if (i === iterable.length) {
return Promise.resolve()
}
const item = iterable[i++]
const p = Promise.resolve().then(() => iteratorFn(item, iterable))
ret.push(p)
executing.add(p)
const clean = () => executing.delete(p)
p.then(clean).catch(clean)
let r = Promise.resolve()
if (executing.size >= poolLimit) {
r = Promise.race(executing)
}
return r.then(() => enqueue())
}
return enqueue().then(() => Promise.all(ret))
}