'AsyncSubject'에 해당되는 글 1건

  1. 2019.11.03 RxJS - Subject
JavaScript/RxJS2019. 11. 3. 17:23
반응형

Subject는 무엇일까요? RxJSSubject는 많은 Observer에게 값을 다중캐스트 할 수 있는 Observable의 특별한 유형중의 하나 입니다. 단순한 Observable이 유니캐스트(구독중인 Observer가 각각 독립적으로 실행되는 Observable을 가짐)인 것에 반해 Subject는 다중캐스트 입니다.

SubjectObservable과 비슷하지만 많은 Observer에게 다중캐스트할 수 있습니다. SubjectEventEmitter(많은 Listener의 목록을 관리)와 유사합니다.

모든 SubjectObservable입니다. 주어진 SubjectObserver를 제공하여 구독함으로서 정상적으로 값을 받기 시작할 수 있습니다. Observer의 관점으로 보면 Observable의 실행이 단순한 유니캐스트 Observable인지 Subject인지 판단할 수 없습니다.

Subject 내부적으로 구독(subscribe를 호출하는 것을 의미)은 값을 전달하는 새로운 실행을 호출하지 않습니다. 다른 라이브러리 혹은 언어의 addListener와 유사하게 단순하게 지정된 Observer를 Observers 목록에 등록할 뿐입니다.

모든 SubjectObserver 관찰자이기도 합니다. next(v), error(e), complete()와 같은 메소드를 가진 오브젝트입니다. 단순히 next(theValue)를 호출해서 Subject에게 새로운 값을 공급함으로서 Subject를 구독하고 있는 등록된 Observer에게 다중캐스트 됩니다.

아래의 예에서는 Subject에 두 개의 Observer가 두 개의 관찰자가 붙여져 있으며 Subject에 몇개의 값을 공급합니다.

import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

SubjectObserver이기도하므로 아래의 예와 같이 Subject를 다른 Observable를 구독하기 위한 인자로 전달할 수 있음을 의미합니다.

import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

const observable = from([1, 2, 3]);

observable.subscribe(subject); // Subject로 subscribe에 인자로 전달할 수 있습니다.

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

위의 접근 방식은 본질적으로 Subject를 통해 유니캐스트를 다중캐스트로 변환한 것입니다. 이것은 Observable의 실행을 여러 Observer에게 공유할 수 있는 유일한 방법이 Subject임을 보여줍니다.

BehaviorSubject, ReplaySubject, 그리고 AsyncSubject라는 몇가지 특화 Subject 유형도 존재합니다.

다중캐스트 Observable

다중캐스트 Observable 은 많은 구독자를 가지고 있을 수 있는 Subject를 통해서 알림을 전달하는 반면에 단순 유니캐스트 Observable은 단일 Observer에게 알림을 전달합니다.

다중캐스트 Observable은 내부의 Subject를 사용해서 복수의 Observer에게 동일한 Observable의 실행을 인식시킵니다.

내부적으로 다중캐스트의 동작방식은 다음과 같습니다. Observer는 하부의 Subject를 구독하고 그 Subject가 원천 Observable을 구독합니다. 다음 예제는 observable.subscribe(subject)를 사용하는 이전 예제와 비슷합니다.

import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));

// 내부적으로 `subject.subscribe({...})` 호출:
multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

// 내부적으로 `source.subscribe(subject)`을 호출:
multicasted.connect();

multicast는 보통의 Observable처럼 보이는 Observable을 반환하지만 구독시에 Subject처럼 동작합니다. multicast는 간단하게 connect() 메소드를 가진 ObservableConnectableObservable을 반환합니다.

connect() 메소드는 공유된 Observable의 실행을 결정하는데 중요합니다. 왜냐하면 connect()는 내부적으로 source.subscribe(subject)을 실행하고 공유된 Observable의 실행을 취소하기 위한 unsubscribe가 가능한 Subscription을 반환합니다.

Reference counting

connect()를 수동으로 호출하고 구독을 처리하는 것을 종종 성가신 일입니다. 보통 첫번째 Observer가 도착하면 연결하고 마지막 Observer가 구독을 해제하면 자동적으로 실행이 취소되기를 바랍니다.

이 목록에 요약 된대로 구독이 발생하는 다음 예를 자세히 확인해 보십시오.

  1. 첫번째 Observer는 다중캐스트된 Observable을 구독합니다.
  2. 다중캐스트된 Observable이 연결됩니다.
  3. 다음 값으로 0 이 첫번째 Observer에게 전달됩니다.
  4. 두번째 Observer가 다중캐스트된 Observable을 구독합니다.
  5. 다음 값 1 이 첫번째 Observer에게 전달됩니다.
  6. 다음 값 1 이 두번째 Observer에게 전달됩니다.
  7. 첫번째 Observer가 다중캐스트된 Observable의 구독을 해지 합니다.
  8. 다음 값 2 이 두번째 Observer에게 전달됩니다.
  9. 두번째 Observer가 다중캐스트된 Observable의 구독을 해지 합니다.
  10. 다중캐스트된 Observable에 연결이 구독 해지됩니다.

connect()를 명시적으로 호출하여 위의 예제를 실현하기 위해서 아래와 같이 코드를 기술했습니다.

import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
  subscription2 = multicasted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);

setTimeout(() => {
  subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
  subscription2.unsubscribe();
  subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

명시적으로 connect()를 호출하는 것을 피하기 위해서 구독자 수를 추적하는 Observable을 반환하는 ConnectableObservablerefCount()(reference counting) 메소드를 사용할 수 있습니다. 구독자수가 0에서 1로 증가할 때 connect()를 호출하고 공유된 실행이 시작됩니다. 구독자 수가 1에서 0으로 감소할때만 완전히 구독이 해지되어 추가적인 실행이 중지됩니다.

refCount는 첫번째 구독자가 도착할 때 다중캐스트된 Observable의 실행을 시작하고 마지막 구독자가 떠날때 실행을 중지합니다.

아래는 예제입니다.

import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
  console.log('observerB subscribed');
  subscription2 = refCounted.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 600);

setTimeout(() => {
  console.log('observerA unsubscribed');
  subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
  console.log('observerB unsubscribed');
  subscription2.unsubscribe();
}, 2000);

// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed

refCount() 메소드는 ConnectableObservable에만 존재하며 다른 ConnectableObservable이 아닌 Observable을 반환합니다.

BehaviorSubject

Subject의 변형 중 하나로 "현재 값"이라는 개념을 가진 BehaviorSubject가 있습니다. 컨슈머에게 내보냈던 가장 최근의 값을 저장하고 새로운 Observer가 구독을 시작할 때마다 "현재 값"을 BehaviorSubject로 부터 즉시 수신할 수 있습니다.

BehaviorSubject는 시간에 따른 값을 나타내는데 유용합니다. 예를 들어 생일에 대한 이벤트 스트림은 Subject이지만 나이의 이벤트 스트림은 BehaviorSubject가 될 수 있습니다.

다음 예에서는 첫번째 Observer가 구독할 때 수신할 값 0으로 BehaviorSubject가 초기화 됩니다. 두번째 Observer 값 2가 전송된 이후에 구독을 시작하더라도 값 2를 수신합니다.

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);

// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

ReplaySubject는 이전 값을 신규 구독자에게 보낼 수 있다는 점에서 BehaviorSubject와 비슷 하지만 Observable의 실행의 일부를 기록하는 것도 가능합니다.

ReplaySubjectObservable 실행으로 부터 여러개의 값을 기록해 두고 이를 새 구독자에게 다시 전송합니다.

ReplaySubject를 생성할 때 몇개의 값을 다시 전송할지 지정할 수 있습니다.

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 새로운 구독자에게 전송할 값의 버퍼를 3으로 지정합니다.

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

또한 버퍼 사이즈 이외에 유예시간을 밀리 초 단위로 지정해서 기록된 값이 얼마나 유효할지 결정할 수 있습니다. 다음 예제에서 버퍼 사이즈를 100으로 크게 지정했지만 유예시간의 파라미터를 500 밀리 초로 짧게 지정했습니다.

import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

let i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`observerB: ${v}`)
  });
}, 1000);

// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

AsyncSubjectObservable의 실행이 완료될때 마지막 값만 Observer에게 전달되는 변종입니다.

import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

// Logs:
// observerA: 5
// observerB: 5

AsyncSubject는 단일값을 전달하기 위해 완료 통지를 기다린다는 점에서 last()연산자(Observable이 가지고 있는)와 비슷합니다.

출처

이글은 RxJS 공식 사이트 Subject, CC BY 4.0 페이지의 번역물 입니다.

 

'JavaScript > RxJS' 카테고리의 다른 글

RxJS - Scheduler  (1) 2019.11.03
RxJS - Subscription(구독)  (0) 2019.11.03
Posted by Reiphiel