Skip to content
大纲

手写 async/await 的实现

包含以下功能

  1. 手写 async/await 的实现
  2. 异步并发、串行、并行
  3. 并发控制
  4. 请求封装及异步控制

手写 async/await 的实现

来源:https://github.com/sisterAn/JavaScript-Algorithms/issues/56

await 内部实现了 generator,其实 await 就是 generator 加上 Promise 的语法糖,且内部实现了自动执行 generator。如果你熟悉 co 的话,其实自己就可以实现这样的语法糖。

js
/**
 * async/await 实现
 * @param {*} generatorFunc
 */
function asyncToGenerator(generatorFunc) {
  // 返回的是一个新的函数
  return function (...args) {
    // 先调用 generator 函数 生成迭代器
    // 对应 var gen = testG()
    const gen = generatorFunc.apply(this, args)

    // 返回一个 promise 因为外部是用.then 的方式 或者 await 的方式去使用这个函数的返回值的
    // var test = asyncToGenerator(testG)
    // test().then(res => console.log(res))
    return new Promise((resolve, reject) => {
      // 内部定义一个 step 函数 用来一步一步的跨过 yield 的阻碍
      // key 有 next 和 throw 两种取值,分别对应了 gen 的 next 和 throw 方法
      // arg 参数则是用来把 promise resolve 出来的值交给下一个 yield
      function step(key, arg) {
        let genResult

        // 这个方法需要包裹在 try catch 中
        // 如果报错了 就把 promise 给 reject 掉 外部通过.catch 可以获取到错误
        try {
          genResult = gen[key](arg)
        } catch (error) {
          return reject(error)
        }

        // gen.next() 得到的结果是一个 { value, done } 的结构
        const { value, done } = genResult

        if (done) {
          // 如果已经完成了 就直接 resolve 这个 promise
          // 这个 done 是在最后一次调用 next 后才会为 true
          // 以本文的例子来说 此时的结果是 { done: true, value: 'success' }
          // 这个 value 也就是 generator 函数最后的返回值
          return resolve(value)
        } else {
          // 除了最后结束的时候外,每次调用 gen.next()
          // 其实是返回 { value: Promise, done: false } 的结构,
          // 这里要注意的是 Promise.resolve 可以接受一个 promise 为参数
          // 并且这个 promise 参数被 resolve 的时候,这个 then 才会被调用
          return Promise.resolve(
            // 这个 value 对应的是 yield 后面的 promise
            value
          ).then(
            // value 这个 promise 被 resove 的时候,就会执行 next
            // 并且只要 done 不是 true 的时候 就会递归的往下解开 promise
            // 对应 gen.next().value.then(value => {
            //    gen.next(value).value.then(value2 => {
            //       gen.next()
            //
            //      此时 done 为 true 了 整个 promise 被 resolve 了
            //      最外部的 test().then(res => console.log(res)) 的 then 就开始执行了
            //    })
            // })
            function onResolve(val) {
              step('next', val)
            },
            // 如果 promise 被 reject 了 就再次进入 step 函数
            // 不同的是,这次的 try catch 中调用的是 gen.throw(err)
            // 那么自然就被 catch 到 然后把 promise 给 reject 掉啦
            function onReject(err) {
              step('throw', err)
            }
          )
        }
      }
      step('next')
    })
  }
}

var getData = () =>
  new Promise((resolve) => setTimeout(() => resolve('data'), 1000))
function* testG() {
  const data = yield getData()
  console.log('data: ', data)
  const data2 = yield getData()
  console.log('data2: ', data2)
  return 'success'
}

var gen = asyncToGenerator(testG)
gen().then((res) => console.log(res))
js
function asyncToGen(genFunction) {
  return function (...args) {
    const gen = genFunction.apply(this, args)
    return new Promise((resolve, reject) => {
      function step(key, arg) {
        let genResult
        try {
          genResult = gen[key](arg)
        } catch (err) {
          return reject(err)
        }
        const { value, done } = genResult
        if (done) {
          return resolve(value)
        }
        return Promise.resolve(value).then(
          (val) => {
            step('next', val)
          },
          (err) => {
            step('throw', err)
          }
        )
      }
      step('next')
    })
  }
}

const getData = () =>
  new Promise((resolve) => setTimeout(() => resolve('data'), 1000))
function* testG() {
  const data = yield getData()
  console.log('data: ', data)
  const data2 = yield getData()
  console.log('data2: ', data2)
  return 'success'
}

const gen = asyncToGen(testG)
gen().then((res) => console.log(res))
js
// 本质是希望实现一个 co 函数

let delay = function (time, fnc) {
  setTimeout(() => {
    fnc(time)
  }, time)
}

let promisefy = (fn) => {
  return (...arg) => {
    return new Promise((resolve, reject) => {
      fn(...arg, (param) => {
        resolve(param)
      })
    })
  }
}

let delayP = promisefy(delay)

const gen = function* () {
  const ret1 = yield delayP(1000)
  console.log(ret1)
  const ret2 = yield delayP(2000)
  console.log(ret2)
}

// 阴间写法
const g = gen()
g.next().value.then((res1) => {
  g.next(res1).value.then((res2) => {
    //
  })
})

// 正常写法
function co(generator) {
  return new Promise((resolve, reject) => {
    const gen = generator()
    function next(...param) {
      let tmp = gen.next(...param)
      if (tmp.done) {
        resolve(tmp.value)
        return
      }
      tmp.value.then((...ret) => {
        next(...ret)
      })
    }
    next()
  })
}

co(gen).then((res) => {
  console.log(res)
})
js
// 基于前面的测试用例实现一个简单版的(代码可以复制运行)
// - 本质上是 generator + promise
//   - generator 的 done 为 false 时,递归
//   - generator 的 done 为 true 时,递归终止,resolve 结果
// - generator 的 value 和 done 状态是迭代器协议的返回值

/**
 * async 的实现
 * @author waldon
 * @param {*} generatorFn - 生成器函数
 */
function asyncWrapper(generatorFn) {
  const g = generatorFn()
  return new Promise((resolve, reject) => {
    function autoNext(g, nextVal) {
      const { value, done } = g.next(nextVal)
      if (!done) {
        value.then((res) => {
          autoNext(g, res)
        })
      } else {
        resolve(value)
      }
    }
    autoNext(g)
  })
}

// 测试
const getData = () =>
  new Promise((resolve) => setTimeout(() => resolve('data'), 1000))

function* testG() {
  const data = yield getData()
  console.log('data: ', data)
  const data2 = yield getData()
  console.log('data2: ', data2)
  return 'success'
}

asyncWrapper(testG).then((res) => {
  console.log(res)
})

// 期望顺序输出 data data2 success

实现一个异步的 sum/add

这里有一道不错的面试题,水平较高,涉及 promise、串行、并行、并发控制,层次递进

查看详细:实现一个异步的 sum/add

并发控制 async-pool

解析源码 tiny-async-pool

  • asyncPool(concurrency, iterable, iteratorFn)

在有限的并发池中运行多个承诺返回和异步函数。一旦其中一个承诺被拒绝,它就会立即拒绝。它会尽快调用迭代器函数(在并发限制下)。它返回一个异步迭代器,该迭代器在承诺完成后立即生成(在并发限制下)。

asyncPool 有如下三个版本的实现

js
// ES9
async function* asyncPool(concurrency, iterable, iteratorFn) {
  const executing = new Set()
  async function consume() {
    const [promise, value] = await Promise.race(executing)
    executing.delete(promise)
    return value
  }
  for (const item of iterable) {
    // Wrap iteratorFn() in an async fn to ensure we get a promise.
    // Then expose such promise, so it's possible to later reference and
    // remove it from the executing pool.
    const promise = (async () => await iteratorFn(item, iterable))().then(
      (value) => [promise, value]
    )
    executing.add(promise)
    if (executing.size >= concurrency) {
      yield await consume()
    }
  }
  while (executing.size) {
    yield await consume()
  }
}

// 用法 1
import asyncPool from 'tiny-async-pool'

const timeout = (ms) =>
  new Promise((resolve) => setTimeout(() => resolve(ms), ms))

// ES9 for await...of
for await (const value of asyncPool(2, [1000, 5000, 3000, 2000], timeout)) {
  console.log(value)
}

// 用法 2
async function asyncPoolAll(...args) {
  const results = [];
  for await (const result of asyncPool(...args)) {
    results.push(result);
  }
  return results;
}

// ES7 API style available on our previous 1.x version
const results = await asyncPoolAll(concurrency, iterable, iteratorFn);

// ES6 API style available on our previous 1.x version
return asyncPoolAll(2, [1000, 5000, 3000, 2000], timeout).then(results => {...});
js
// ES7
async function asyncPool(poolLimit, iterable, iteratorFn) {
  const ret = []
  const executing = new Set()
  for (const item of iterable) {
    const p = Promise.resolve().then(() => iteratorFn(item, iterable))
    ret.push(p)
    executing.add(p)
    const clean = () => executing.delete(p)
    p.then(clean).catch(clean)
    if (executing.size >= poolLimit) {
      await Promise.race(executing)
    }
  }
  return Promise.all(ret)
}
js
// ES6
function asyncPool(poolLimit, iterable, iteratorFn) {
  let i = 0
  const ret = []
  const executing = new Set()
  const enqueue = function () {
    if (i === iterable.length) {
      return Promise.resolve()
    }
    const item = iterable[i++]
    const p = Promise.resolve().then(() => iteratorFn(item, iterable))
    ret.push(p)
    executing.add(p)
    const clean = () => executing.delete(p)
    p.then(clean).catch(clean)
    let r = Promise.resolve()
    if (executing.size >= poolLimit) {
      r = Promise.race(executing)
    }
    return r.then(() => enqueue())
  }
  return enqueue().then(() => Promise.all(ret))
}