'Rxjs'에 해당되는 글 3건

  1. 2019.11.03 RxJS - Scheduler 1
  2. 2019.11.03 RxJS - Subject
  3. 2019.11.03 RxJS - Subscription(구독)
JavaScript/RxJS2019. 11. 3. 17:27
반응형

Scheduler(스케줄러)는 무엇일까요? Scheduler는 구독의 시작과 알림을 전달하는 시기를 제어합니다. Scheduler는 세개의 구성요소로 구성되어 있습니다.

  • Scheduler는 자료구조입니다. 우선순위나 다른 기준에 따라서 작업을 저장하고 대기열에 넣는 방법을 알고 있습니다.
  • Scheduler는 실행 컨텍스트 입니다. 작업이 실행되는 장소와 시간을 나타냅니다.(예를 들어 즉시 혹은 setTimeout, process.nextTick, the animation frame 등과 같은 다른 콜백 메커니즘)
  • 스케줄러에는 (가상의)시계가 있습니다. 스케줄러의 getter 메소드 now()에 의해 "시간"개념을 제공합니다. 특정 스케줄러에서 예약된 작업은 해당 스케줄러의 시계가 나타내는 시간만 준수합니다.
  • 스케줄러를 사용하면 Observable 어떤 실행 콘텍스트에서 Observer에게 알림을 전달할지 정의할 수 있습니다.

아래의 예에서 값 1, 2, 3을 동기적으로 내보내는(emits) 보통의 단순한 Observable에 연산자 observeOn 사용해서 값을 전달하는데 사용할 비동기 스케줄러를 지정합니다.

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const observable = new Observable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
}).pipe(
  observeOn(asyncScheduler)
);

console.log('just before subscribe');
observable.subscribe({
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
});
console.log('just after subscribe');

출력과 같이 실행됩니다.

just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

지금까지 본 기본적인 동작과 다른 구독 바로 직후에 값이 전달된 것에 주목하십시오. 이는 observeOn(asyncScheduler)가 새로운 Observable과 최종 Observer 사이에 프록시 Observer를 도입하기 때문입니다. 예제 코드에서 구분을 명확히 하기 위해서 몇개의 식별자의 이름을 변경하겠습니다.

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable((proxyObserver) => {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
}).pipe(
  observeOn(asyncScheduler)
);

var finalObserver = {
  next(x) {
    console.log('got value ' + x)
  },
  error(err) {
    console.error('something wrong occurred: ' + err);
  },
  complete() {
     console.log('done');
  }
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

observeOn(asyncScheduler)에서 작성된 proxyObserver의 함수 next(val)는 대략 아래와 같습니다.

const proxyObserver = {
  next(val) {
    asyncScheduler.schedule(
      (x) => finalObserver.next(x),
      0 /* delay */,
      val /* will be the x for the function above */
    );
  },

  // ...
}

비동기 스케줄러는 설령 지정된 지연 시간이 0 이더라도 setTimeout 혹은 setTimeout과 함께 동작합니다. JavaScript에서 setTimeout(fn, 0)는 보통 다음 이벤트 루프에서 함수 fn을 가장 빠르게 실행하는 것으로 알려져 있습니다. 이는 왜 구독 바로 직후에 값 1이 finalObserver에게 전달되었는지를 설명합니다.

스케줄러의 schedule() 메소드는 스케줄러 내부 시계와 관련된 시간량으로 참조되는 지연시간을 인자로 받습니다. 스케줄러의 시계는 실제의 시간과 관련이 있을 필요가 없습니다. 지연 연산과 같은 시간과 관계된 연산자는 실제 시간이 아니라 스케줄러의 시계에 의해서 동작합니다. 이는 실제로 스케줄된 작업을 동기적으로 실행하는 동안 가상시간 스케줄러를 이용하여 시간을 위조해야하는 테스트에 매우 유용합니다.

Scheduler Types

비동기 스케줄러는 RxJS가 제공하는 빌트인 스케줄러의 하나입니다. 그것은 각각 스케줄러 오브젝트의 정적 프로퍼티에 의해서 생성되어 반환될 수 있습니다.

스케줄러 목적
null 스케줄러를 전달하지 않으면 동기 및 재귀적으로 전달됩니다. 이것은 일정시간 연산 혹은 꼬리재귀에 사용할 수 있습니다.
queueScheduler 현재의 이벤트 프레임(trampoline scheduler) 큐에 스케줄링합니다. 반복 연산자에 사용하십시오.
asapScheduler ``Promise`에 사용되는 것과 동일한 마이크로 작업 큐에 스케줄링 합니다. 기본적으로 현재 잡이후 다음 잡이 실행되지 전에 스케줄링 됩니다. 비동기 변환에 사용됩니다.
asyncScheduler setInterval과 함께 동작합니다. 시간 기반 연산자에 사용됩니다.
animationFrameScheduler 다음 브라우저 재구획 직전에 스케줄링 됩니다. 부드러운 브라우저 애니메이션을 위해 사용할 수 있습니다.

Using Schedulers

당신이 RxJS 코드상에서 명시적으로 스케줄러를 지정하지 않았더라도 이미 사용되었을 수 있습니다. 왜냐하면 동시성을 처리하는 모든 Observable 연산자는 선택적인 스케줄러를 가지고 있기 때문입니다. 스케줄러를 지정하지 않았을 경우 RxJS는 최소 동시성 원칙을 사용하여 기본 스케줄러를 선택합니다. 이것은 선택된 연산자가 요구하는 최소한의 동시성을 만족시키는 스케줄러가 선택됨을 의미합니다. 예를 들어 작고 유한한 수의 메시지를 반환하는 Observable의 연산자는 스케줄러를 사용하지 않습니다. 즉 null 혹은 undefined 스케줄러 입니다. 잠재적으로 대량 혹은 무한대의 메시지가 반환되는 연산자의 경우 큐 스케줄러가 사용됩니다. 타이머를 사용하는 연산자의 경우에는 비동기 스케줄러가 사용됩니다.

RxJS는 최소한의 동시성 스케줄러를 사용하므로 성능을 목적으로 동시성을 도입하려는 경우 다른 스케줄러를 지정할 수 있습니다. 특정 스케줄러를 지정하기 위해 스케줄러를 사용하는 연산자 메소드, 예를 들어 from ([10, 20, 30], asyncScheduler)을 사용할 수 있습니다.

일반적으로 정적 생성 연산자는 스케줄러를 인자로 받습니다. 예를 들어 from (array, scheduler)를 사용하면 배열에서 변환된 각각의 알림을 전달할 때 사용할 스케줄러를 지정할 수 있습니다. 보통 연산자의 마지막 인자입니다. 다음의 정적 생성 연산자는 스케줄러를 인자로 받습니다.

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

subscribeOn을 사용하여 subscribe()가 호출될 컨텍스트를 스케줄링 하십시오. 기본적으로 Observablesubscribe() 호출은 동기적으로 즉시 발생합니다. 그러나 인스턴스의 연산자 subscribeOn(scheduler)를 사용하여 scheduler에 인수로 지정한 스케줄러에 의해 실제 구독을 지연시키거나 스케줄링 할 수 있습니다.

observeOn을 사용하여 알림이 전달 될 컨텍스트를 스케줄링합니다. 위의 예에서 알 수 있듯이 인스턴스 연산자 observeOn(scheduler)는 소스 Observable 대상 Observable 사이에 지정한 스케줄러를 사용해서 스케줄링 해주는 중재자 Observer를 도입합니다.

인스턴스 연산자는 인자로 스케줄러를 받을 수 있습니다.

bufferTime, debounceTime, delay, auditTime, sampleTime,throttleTime, timeInterval, timeout, timeoutWith, windowTime과 같은 시간 관련 연산자 모두 스케줄러를 마지막 인자 받으며 지정하지 않을 경우 asyncScheduler를 사용합니다.

이외에도 cache, combineLatest, concat, expand, merge, publishReplay, startWith가 스케줄러를 인자로 받습니다.

cachepublishReplayReplaySubject를 사용하므로 스케줄러를 받는 것에 주의하기 바랍니다. 스케줄러의 컨텍스트에서만 의미가 있긴하지만 ReplaySubject는 시간을 처리할 수 있으므로 ReplaySubjects의 생성자는 마지막 인자로 스케줄러를 선택적으로 받습니다. 기본적으로 ReplaySubject는 큐 스케줄러를 이용하여 시간을 제공합니다.

출처

이글은 RxJS 공식 사이트 Scheduler, CC BY 4.0 페이지의 번역물 입니다.

 

이는 왜

'JavaScript > RxJS' 카테고리의 다른 글

RxJS - Subject  (0) 2019.11.03
RxJS - Subscription(구독)  (0) 2019.11.03
Posted by Reiphiel
JavaScript/RxJS2019. 11. 3. 17:23
반응형

Subject는 무엇일까요? RxJSSubject는 많은 Observer에게 값을 다중캐스트 할 수 있는 Observable의 특별한 유형중의 하나 입니다. 단순한 Observable이 유니캐스트(구독중인 Observer가 각각 독립적으로 실행되는 Observable을 가짐)인 것에 반해 Subject는 다중캐스트 입니다.

SubjectObservable과 비슷하지만 많은 Observer에게 다중캐스트할 수 있습니다. SubjectEventEmitter(많은 Listener의 목록을 관리)와 유사합니다.

모든 SubjectObservable입니다. 주어진 SubjectObserver를 제공하여 구독함으로서 정상적으로 값을 받기 시작할 수 있습니다. Observer의 관점으로 보면 Observable의 실행이 단순한 유니캐스트 Observable인지 Subject인지 판단할 수 없습니다.

Subject 내부적으로 구독(subscribe를 호출하는 것을 의미)은 값을 전달하는 새로운 실행을 호출하지 않습니다. 다른 라이브러리 혹은 언어의 addListener와 유사하게 단순하게 지정된 Observer를 Observers 목록에 등록할 뿐입니다.

모든 SubjectObserver 관찰자이기도 합니다. next(v), error(e), complete()와 같은 메소드를 가진 오브젝트입니다. 단순히 next(theValue)를 호출해서 Subject에게 새로운 값을 공급함으로서 Subject를 구독하고 있는 등록된 Observer에게 다중캐스트 됩니다.

아래의 예에서는 Subject에 두 개의 Observer가 두 개의 관찰자가 붙여져 있으며 Subject에 몇개의 값을 공급합니다.

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

SubjectObserver이기도하므로 아래의 예와 같이 Subject를 다른 Observable를 구독하기 위한 인자로 전달할 수 있음을 의미합니다.

import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject); // Subject로 subscribe에 인자로 전달할 수 있습니다.

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

위의 접근 방식은 본질적으로 Subject를 통해 유니캐스트를 다중캐스트로 변환한 것입니다. 이것은 Observable의 실행을 여러 Observer에게 공유할 수 있는 유일한 방법이 Subject임을 보여줍니다.

BehaviorSubject, ReplaySubject, 그리고 AsyncSubject라는 몇가지 특화 Subject 유형도 존재합니다.

다중캐스트 Observable

다중캐스트 Observable 은 많은 구독자를 가지고 있을 수 있는 Subject를 통해서 알림을 전달하는 반면에 단순 유니캐스트 Observable은 단일 Observer에게 알림을 전달합니다.

다중캐스트 Observable은 내부의 Subject를 사용해서 복수의 Observer에게 동일한 Observable의 실행을 인식시킵니다.

내부적으로 다중캐스트의 동작방식은 다음과 같습니다. Observer는 하부의 Subject를 구독하고 그 Subject가 원천 Observable을 구독합니다. 다음 예제는 observable.subscribe(subject)를 사용하는 이전 예제와 비슷합니다.

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// 내부적으로 `subject.subscribe({...})` 호출:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

// 내부적으로 `source.subscribe(subject)`을 호출:
multicasted.connect();

multicast는 보통의 Observable처럼 보이는 Observable을 반환하지만 구독시에 Subject처럼 동작합니다. multicast는 간단하게 connect() 메소드를 가진 ObservableConnectableObservable을 반환합니다.

connect() 메소드는 공유된 Observable의 실행을 결정하는데 중요합니다. 왜냐하면 connect()는 내부적으로 source.subscribe(subject)을 실행하고 공유된 Observable의 실행을 취소하기 위한 unsubscribe가 가능한 Subscription을 반환합니다.

Reference counting

connect()를 수동으로 호출하고 구독을 처리하는 것을 종종 성가신 일입니다. 보통 첫번째 Observer가 도착하면 연결하고 마지막 Observer가 구독을 해제하면 자동적으로 실행이 취소되기를 바랍니다.

이 목록에 요약 된대로 구독이 발생하는 다음 예를 자세히 확인해 보십시오.

  1. 첫번째 Observer는 다중캐스트된 Observable을 구독합니다.
  2. 다중캐스트된 Observable이 연결됩니다.
  3. 다음 값으로 0 이 첫번째 Observer에게 전달됩니다.
  4. 두번째 Observer가 다중캐스트된 Observable을 구독합니다.
  5. 다음 값 1 이 첫번째 Observer에게 전달됩니다.
  6. 다음 값 1 이 두번째 Observer에게 전달됩니다.
  7. 첫번째 Observer가 다중캐스트된 Observable의 구독을 해지 합니다.
  8. 다음 값 2 이 두번째 Observer에게 전달됩니다.
  9. 두번째 Observer가 다중캐스트된 Observable의 구독을 해지 합니다.
  10. 다중캐스트된 Observable에 연결이 구독 해지됩니다.

connect()를 명시적으로 호출하여 위의 예제를 실현하기 위해서 아래와 같이 코드를 기술했습니다.

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

명시적으로 connect()를 호출하는 것을 피하기 위해서 구독자 수를 추적하는 Observable을 반환하는 ConnectableObservablerefCount()(reference counting) 메소드를 사용할 수 있습니다. 구독자수가 0에서 1로 증가할 때 connect()를 호출하고 공유된 실행이 시작됩니다. 구독자 수가 1에서 0으로 감소할때만 완전히 구독이 해지되어 추가적인 실행이 중지됩니다.

refCount는 첫번째 구독자가 도착할 때 다중캐스트된 Observable의 실행을 시작하고 마지막 구독자가 떠날때 실행을 중지합니다.

아래는 예제입니다.

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount() 메소드는 ConnectableObservable에만 존재하며 다른 ConnectableObservable이 아닌 Observable을 반환합니다.

BehaviorSubject

Subject의 변형 중 하나로 "현재 값"이라는 개념을 가진 BehaviorSubject가 있습니다. 컨슈머에게 내보냈던 가장 최근의 값을 저장하고 새로운 Observer가 구독을 시작할 때마다 "현재 값"을 BehaviorSubject로 부터 즉시 수신할 수 있습니다.

BehaviorSubject는 시간에 따른 값을 나타내는데 유용합니다. 예를 들어 생일에 대한 이벤트 스트림은 Subject이지만 나이의 이벤트 스트림은 BehaviorSubject가 될 수 있습니다.

다음 예에서는 첫번째 Observer가 구독할 때 수신할 값 0으로 BehaviorSubject가 초기화 됩니다. 두번째 Observer 값 2가 전송된 이후에 구독을 시작하더라도 값 2를 수신합니다.

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

ReplaySubject는 이전 값을 신규 구독자에게 보낼 수 있다는 점에서 BehaviorSubject와 비슷 하지만 Observable의 실행의 일부를 기록하는 것도 가능합니다.

ReplaySubjectObservable 실행으로 부터 여러개의 값을 기록해 두고 이를 새 구독자에게 다시 전송합니다.

ReplaySubject를 생성할 때 몇개의 값을 다시 전송할지 지정할 수 있습니다.

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 새로운 구독자에게 전송할 값의 버퍼를 3으로 지정합니다.

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

또한 버퍼 사이즈 이외에 유예시간을 밀리 초 단위로 지정해서 기록된 값이 얼마나 유효할지 결정할 수 있습니다. 다음 예제에서 버퍼 사이즈를 100으로 크게 지정했지만 유예시간의 파라미터를 500 밀리 초로 짧게 지정했습니다.

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

AsyncSubjectObservable의 실행이 완료될때 마지막 값만 Observer에게 전달되는 변종입니다.

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

AsyncSubject는 단일값을 전달하기 위해 완료 통지를 기다린다는 점에서 last()연산자(Observable이 가지고 있는)와 비슷합니다.

출처

이글은 RxJS 공식 사이트 Subject, CC BY 4.0 페이지의 번역물 입니다.

 

'JavaScript > RxJS' 카테고리의 다른 글

RxJS - Scheduler  (1) 2019.11.03
RxJS - Subscription(구독)  (0) 2019.11.03
Posted by Reiphiel
JavaScript/RxJS2019. 11. 3. 17:23
반응형

Subscription(구독)은 일반적으로 실행중인 Observable과 같이 사용후 처분가능한 리소스를 나타냅니다. Subscriptionunsubscribe라는 구독이 보유하고 있는 리소스를 처분하는 인자없는 중요한 메소드를 가지고 있습니다. RxJS의 이전 버전에서는 SubscriptionDisposable로 불렸습니다.

import { interval } from 'rxjs';

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));
// Later:
// This cancels the ongoing Observable execution which
// was started by calling subscribe with an Observer.
subscription.unsubscribe();

Subscription은 본질적으로 리소스를 릴리즈하거나 Observable의 실행을 취소하는 unsubscribe()라는 function을 가집니다.

Subscriptionunsubscribe() 한번호출로 여러 Subscription을 구독해지 할 수 있도록 합칠 수 있습니다. 한 Subscription을 다른 Subscriptionadding 함으로서 합칠 수 있습니다.

import { interval } from 'rxjs';
 
const observable1 = interval(400);
const observable2 = interval(300);
 
const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));
 
subscription.add(childSubscription);
 
setTimeout(() => {
  // subscription 과 childSubscription 둘다 구독해지
  subscription.unsubscribe();
}, 1000);

콘솔에서 아래와 같은 실행결과를 볼 수 있습니다.

second: 0
first: 0
second: 1
first: 1
second: 2

Subscription에는 추가했던 자식 Subscription의 추가를 취소하는 remove(otherSubscription)라는 메소드도 가지고 있습니다.

출처

이글은 RxJS 공식 사이트 Subscription, CC BY 4.0 페이지의 번역물 입니다.

 

'JavaScript > RxJS' 카테고리의 다른 글

RxJS - Scheduler  (1) 2019.11.03
RxJS - Subject  (0) 2019.11.03
Posted by Reiphiel