본문 바로가기

Web[웹]/ES6+ 함수형 언어

[JS: ES6+] 비동기: 동시성 프로그래밍 (2)

 

#1 지연 평가와 Promise

 

지연 평가 + Promise - L.map, map과 take

 

먼저, go 안에 Promise로 구성하여 출력해보자.

 

go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)],
   L.map(a => a + 10),
   take(2),
   console.log); // ["[object Promise]10", "[object Promise]10"]

 

go에 Promise로 구성하면, 정상적인 연산이 되지 않는다. 이를 정상적으로 연산이 되게끔 구성을 해볼 것 이다.

 

앞에서 사용했던 L.map 함수를 보자.

 

L.map = curry(function* (f, iter) {
   for (const a of iter) {
     yield f(a);
   }
 });

 

이를 앞챕터에서 썼던 go1함수를 통해서 해결해보자.

 

const go1 = (a, f) => a instanceof Promise ? a.then(f) : f(a);

L.map = curry(function* (f, iter) {
   for (const a of iter) {
      yield go1(a, f);
   }
});
/* --------------------------------------------------------------- */
/* ▼(2) [Promise, Promise]
     ▶0: Promise {<resolved>: 11}
     ▶1: Promise {<resolved>: 12}
        length: 2
     ▶__proto__: Array(0)
*/
go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)],
   L.map(a => a + 10),
   take(2),
   console.log);

 

위와같이 go1을 감싸주면 11, 12가 될 수 있는 값으로 만들어 줄 수가 있다.

 

이제 take 함수를 Promise를 통해 값이 정상적으로 돌 수 있도록 수정해주자.

 

/* 변경 전 take */
const take = curry((l, iter) => {
  let res = [];
  iter = iter[Symbol.iterator]();
  let cur;
  while (!(cur = iter.next()).done) {
    const a = cur.value;
    res.push(a);
    if (res.length == l) return res;
  }
  return res;
});

/* 변경 후 take */
const take = curry((l, iter) => {
   let res = [];
   iter = iter[Symbol.iterator]();
   return function recur(){
      let cur;
      while (!(cur = iter.next()).done) {
         const a = cur.value;
         if(a instanceof Promise) return a.then(
            a => (res.push(a), res).length == l ? res : recur());
         res.push(a);
         if (res.length == l) return res;
      }
      return res;
   } ();
});

 

위와 같이 변경을 해주면, 정상적으로 [11, 12]처럼 값이 나올 것이다.

 

/* 비동기 상황에서도 동작 */
go([Promise.resolve(1), Promise.resolve(2), Promise.resolve(3)],
   L.map(a => a + 10),
   take(2),
   console.log); // [11, 12]

/* 동기 상황에서도 동작 */
go([1, 2, 3],
   L.map(a => a + 10),
   take(2),
   console.log); // [11, 12]   

 

 

 


 

 

Kleisli Composition - L.filter, filter, nop, take

 

먼저, go구문에서 앞에서 오는 값이 Promise가 되어서 오거나, filter가 내뱉는 값이 Promise일때, 어떻게 하면 지연평가도 되면서 비동기 동시성을 모두 지원할 수 있는지 알아보자.

 

go([1, 2, 3, 4, 5, 6],
    L.map(a => Promise.resolve(a * a)),
    L.filter(a => a % 2),
    take(2),
    console.log); // []

 

현재 위 코드처럼 하면 filter에 넘어오는 값이 Promise인체로 넘어오기 때문에 정상적으로 동작하지 못하는 걸 알 수 있다.

 

앞에서 작성한 filter를 다음과 같이 바꿔준 후 take도 바꿔준다.

 

const nop = Symbol('nop'); //reject 문구

L.filter = curry(function* (f, iter){
   for(const a of iter){
      const b = go1(a, f);
      if(b instanceof Promise) yield b.then(b => b ? a : Promise.reject(nop));
      else if(b) yield a;
   }
});


const take = curry((l, iter) => {
   let res = [];
   iter = iter[Symbol.iterator]();
   return function recur(){
      let cur;
      while(!(cur = iter.next()).done){
         const a = cur.value;
         if(a instanceof Promise){
         return a
            .then(a => (res.push(a), res).length == l ? res : recur())
            .catch(e => e == nop ? recur() : Promise.reject(e));
         }
         res.push(a);
         if(res.length == l) return res;
      }
      return res;
   }();
});

 

위와같이 go1, reject, catch를 이용해 비동기, 동기 상황에서도 둘다 동작하게끔 되었다.

 

/* map에서 Promise여도 동작 */
go([1, 2, 3, 4, 5, 6],
    L.map(a => Promise.resolve(a * a)),
    L.filter(a => a % 2),
    take(2),
    console.log);

/* 동기 상황에서도 동작 */
go([1, 2, 3, 4, 5, 6],
    L.map(a => a * a),
    L.filter(a => a % 2),
    take(2),
    console.log);

/* 비동기 상황에서도 동작 */
go([1, 2, 3, 4, 5, 6],
    L.map(a => Promise.resolve(a * a)),
    L.filter(a => Promise.resolve(a % 2)),
    take(2),
    console.log);

 

Kleisli 합성을 이용해서 reject에 nop을 넘겨서, 그 구분자를 가지고 take에서 값을 담거나 안담거나를 이용해 비동기 동시성과 지연평가가 가능해졌다.

 

 

 


 

 

reduce에서 nop 지원

 

지연성과 Promise를 잘지원하는 reduce를 만들어 보자.

 

go([1, 2, 3, 4],
   L.map(a => Promise.resolve(a * a)),
   L.filter(a => Promise.resolve(a % 2)),
   reduce(add)
   console.log);

 

위 코드는 에러가 나는데 그 이유는 아래의 reduce 코드에서 볼 수 있다.

 

const reduce = curry((f, acc, iter) => {
   if (!iter) {
     iter = acc[Symbol.iterator]();
     acc = iter.next().value;
   } else {
     iter = iter[Symbol.iterator]();
   }
   return go1(acc, function recur(acc) {
     let cur;
     while (!(cur = iter.next()).done) {
       const a = cur.value;
       acc = f(acc, a); /* 문제가 되는 구문 */
       if (acc instanceof Promise) return acc.then(recur);
     }
     return acc;
   });
 });

 

reduce의 "acc = f(acc, a)"에 전달되는 값들이 1과 Promise를 전달하는 식으로 동작하기 때문에 정상적으로 동작하지 못한다. 명확하게 잘 동작하도록 바꿔보자.

 

const reduceF = (acc, a, f) =>
  a instanceof Promise ?
    a.then(a => f(acc, a), e => e == nop ? 
    acc : 
    Promise.reject(e)) :
    f(acc, a);

const reduce = curry((f, acc, iter) => {
   if (!iter) {
     iter = acc[Symbol.iterator]();
     acc = iter.next().value;
   } else {
     iter = iter[Symbol.iterator]();
   }
   return go1(acc, function recur(acc) {
     let cur;
     while (!(cur = iter.next()).done) {
       acc = reduceF(acc, cur.value, f); /* reduceF로 받기 */
       if (acc instanceof Promise) return acc.then(recur);
     }
     return acc;
   });
 });

 

이처럼 바꾸면 아래와 같이 정상적으로 동작함을 볼 수 있다.

 

go([1, 2, 3, 4],
   L.map(a => Promise.resolve(a * a)),
   L.filter(a => Promise.resolve(a % 2)),
   reduce(add)
   console.log); // 10

 

좀 더 다양한 경우의 수를 합성해주게끔 할 수 있다.

 

const reduce = curry((f, acc, iter) => {
   if (!iter) {
     iter = acc[Symbol.iterator]();
     acc = iter.next().value;
   }
   .
   .
   .

 

위 코드에 if문 안에 있는 내용을 바꿀것인데, acc에 next를 꺼냈을때 이 꺼내는 부분 역시도 promise를 잘 다룰 수 있도록 짜보자.

 

const head = iter => go1(take(1, iter), ([h]) => h);

const reduce = curry((f, acc, iter) => {
  if(!iter) return reduce(f, head(iter = acc[Symbol.iterator]()), iter);

  iter = iter[Symbol.iterator]();
  return go1(acc, function recur(acc){
    let cur;
    while(!(cur = iter.next()).done){
      acc = reduceF(acc, cur.value, f);
      if(acc instanceof Promise) return acc.then(recur);
    }
    return acc;
  });
});

 

head를 따로 추가해서 head를 뽑아서 이후에 reduce전달하는 코드를 구성하였다.

 

 


 

 

#2 병렬적 평가

 

지연된 함수열을 병렬적으로 평가하기 - C.reduce, C.take

 

먼저 이 챕터의 이해를 위해 아래 코드를 보자.

 

const delay1000 = a => new Promise(resolve =>
   setTimeout(() => resolve(a), 1000));

go([1, 2, 3, 4, 5, 6],
   L.map(a => delay1000(a * a)),
   L.filter(a => a % 2),
   reduce(add),
   console.log);

 

위 코드를 보면, L.map에서 한번 실행할때 마다 1초정도 걸리는데, 총 6개의 인자이므로 6초 정도 걸린다. 이 것을 1개씩 실행시켜서 코드가 순회하기 때문이지만, 이를 순차적이 아닌 병렬적으로 처리한다면 시간은 단축될 것이다.

 

병렬적으로 동작하는 C.reduce를 만들어 보자.

 

const C = {};

C.reduce = curry((f, acc, iter) => iter ?
   reduce(f, acc, [...iter]) :
   reduce(f, [...acc]));

 

reduce로 해당하는 코드를 그대로 넘기지만, 대기된 함수를 모두 다 실행을 시켜버리고 다시한번 reduce에서 값을 꺼낸다. 즉, go 첫 인자로 나온 값들을 실행을 전부 한 후 개별적으로 비동기 제어를 해서 앞에서부터 누적을 시키는 것이다.

 

효율성을 체크해보자.

 

/* 순차적으로 실행 */
console.time('');
go([1, 2, 3, 4, 5, 6],
   L.map(a => delay1000(a * a)),
   L.filter(a => a % 2),
   reduce(add),
   console.log, // 35
   _ => console.timeEnd(''));
   // 11081.9541015625ms
   
-----------------------------------------------

/* 병렬적으로 실행 */
console.time('');
go([1, 2, 3, 4, 5, 6],
   L.map(a => delay1000(a * a)),
   L.filter(a => a % 2),
   C.reduce(add),
   console.log, // 35
   _ => console.timeEnd(''));
   //1091.94384765625ms

 

시간을 체크해보면 확실히 순차적으로 실행한 것이 병렬적보다 느림을 알 수 있다.

 

한가지 체크해보자.

 

const C = {};
C.reduce = curry((f, acc, iter) => iter ?
    reduce(f, acc, [...iter]) :
    reduce(f, [...acc]));

const delay1000 = a => new Promise(resolve =>
   setTimeout(() => resolve(a), 1000));

go([1, 2, 3, 4, 5, 6],
   L.map(a => delay1000(a * a)),
   L.filter(a => delay1000(a % 2)),
   L.map(a => delay1000(a * a)),
   C.reduce(add),
   console.log);

 

위 코드를 실행했을때, catch되지 않은 부분이 있다고 나오지만, 결과는 정상적으로 나온다.  catch되지 않는 부분이 있지만, 이후에 잘 catch를 해서 정리를 해줄 것이기때문에, 비동기적으로 해당하는 에러를 캐치해줄 것이란것을 명시해줘야 한다.

 

C.reduce = curry((f, acc, iter) => {
    const iter2 = iter ? [...iter] : [...acc];
    iter2.forEach(a => a.catch(function() {})); /* 임시적으로 catch */
    return iter ?
       reduce(f, acc, iter2) :
       reduce(f, iter2);
});

 

위와 같이 해주면, 정상적으로 동작하면서, catch에러를 품지않는다.

 

위 코드를 좀 더 간결하게 정리해보자.

 

/* 간결하게 정리 */
function noop() {}
const catchNoop = arr =>
    (arr.forEach(a => a instanceof Promise ? a.catch(noop) : a), arr);

C.reduce = curry((f, acc, iter) => {
    const iter2 = catchNoop(iter ? [...iter] : [...acc]);
    return iter ?
       reduce(f, acc, iter2) :
       reduce(f, iter2);
});

------------------------------------------------------
/* 조금 더 간결하게 정리 */
function noop() {}
const catchNoop = ([...arr]) =>
   (arr.forEach(a => a instanceof Promise ? a.catch(noop) : a), arr);

C.reduce = curry((f, acc, iter) => iter ?
   reduce(f, acc, catchNoop(iter)) :
   reduce(f, catchNoop(acc)));

 

take역시도 똑같은 방식으로 작성할 수 있다.

 

C.take = curry((l, iter) => take(l, catchNoop([...iter])));

 

C.take 역시 병렬적으로 실행되기 때문에, 아래 코드에서 기존 take보다 빠르게 실행된다.

 

go([1, 2, 3, 4, 5, 6],
   L.map(a => delay1000(a * a)),
   L.filter(a => delay1000(a % 2)),
   L.map(a => delay1000(a * a)),
   C.take(2),
   reduce(add),
   console.log);

 

 


 

즉시 병렬적으로 평가하기 - C.map, C.filter

 

이번에 해볼 것은 특정 함수 라인에서만 병렬적으로 평가하고 그 이후엔 그냥 실행하는 C.map, C.filter를 만들어 볼 것이다.

 

앞에서 했던 takeAll을 만들어 다루는 것과 유사하다.

 

C.take = curry((l, iter) => take(l, catchNoop([...iter])));
C.takeAll = C.take(Infinity);

C.map = curry(pipe(L.map, C.takeAll));
C.filter = curry(pipe(L.filter, C.takeAll));

/* 실행 구문 */
C.map(a => delay1000(a * a), [1, 2, 3, 4]).then(console.log); // [1, 4, 9, 16]
C.filter(a => delay1000(a % 2), [1, 2, 3, 4]).then(console.log); // [1, 3]

 

위와같이 C.map, C.filter를 실행해보면, 동시에 평가되는 것을 확인 할 수 있다.