Subject
는 무엇일까요? RxJS 의 Subject
는 많은 Observer
에게 값을 다중캐스트 할 수 있는 Observable
의 특별한 유형중의 하나 입니다. 단순한 Observable
이 유니캐스트(구독중인 Observer
가 각각 독립적으로 실행되는 Observable
을 가짐)인 것에 반해 Subject는 다중캐스트 입니다.
Subject
는 Observable
과 비슷하지만 많은 Observer
에게 다중캐스트할 수 있습니다. Subject
는 EventEmitter
(많은 Listener의 목록을 관리)와 유사합니다.
모든 Subject
는 Observable
입니다. 주어진 Subject
에 Observer
를 제공하여 구독함으로서 정상적으로 값을 받기 시작할 수 있습니다. Observer
의 관점으로 보면 Observable
의 실행이 단순한 유니캐스트 Observable
인지 Subject
인지 판단할 수 없습니다.
Subject
내부적으로 구독(subscribe를 호출하는 것을 의미)은 값을 전달하는 새로운 실행을 호출하지 않습니다. 다른 라이브러리 혹은 언어의 addListener
와 유사하게 단순하게 지정된 Observer
를 Observers 목록에 등록할 뿐입니다.
모든 Subject
는 Observer
관찰자이기도 합니다. next(v)
, error(e)
, complete()
와 같은 메소드를 가진 오브젝트입니다. 단순히 next(theValue)
를 호출해서 Subject
에게 새로운 값을 공급함으로서 Subject
를 구독하고 있는 등록된 Observer
에게 다중캐스트 됩니다.
아래의 예에서는 Subject
에 두 개의 Observer
가 두 개의 관찰자가 붙여져 있으며 Subject
에 몇개의 값을 공급합니다.
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
Subject
는 Observer
이기도하므로 아래의 예와 같이 Subject
를 다른 Observable
를 구독하기 위한 인자로 전달할 수 있음을 의미합니다.
import { Subject, from } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // Subject로 subscribe에 인자로 전달할 수 있습니다.
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
위의 접근 방식은 본질적으로 Subject
를 통해 유니캐스트를 다중캐스트로 변환한 것입니다. 이것은 Observable
의 실행을 여러 Observer
에게 공유할 수 있는 유일한 방법이 Subject
임을 보여줍니다.
BehaviorSubject
, ReplaySubject
, 그리고 AsyncSubject
라는 몇가지 특화 Subject
유형도 존재합니다.
다중캐스트 Observable
다중캐스트 Observable
은 많은 구독자를 가지고 있을 수 있는 Subject
를 통해서 알림을 전달하는 반면에 단순 유니캐스트 Observable
은 단일 Observer
에게 알림을 전달합니다.
다중캐스트
Observable
은 내부의Subject
를 사용해서 복수의Observer
에게 동일한Observable
의 실행을 인식시킵니다.
내부적으로 다중캐스트의 동작방식은 다음과 같습니다. Observer
는 하부의 Subject
를 구독하고 그 Subject
가 원천 Observable
을 구독합니다. 다음 예제는 observable.subscribe(subject)
를 사용하는 이전 예제와 비슷합니다.
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = from([1, 2, 3]);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
// 내부적으로 `subject.subscribe({...})` 호출:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// 내부적으로 `source.subscribe(subject)`을 호출:
multicasted.connect();
multicast
는 보통의 Observable
처럼 보이는 Observable
을 반환하지만 구독시에 Subject
처럼 동작합니다. multicast
는 간단하게 connect()
메소드를 가진 Observable
인 ConnectableObservable
을 반환합니다.
connect()
메소드는 공유된 Observable
의 실행을 결정하는데 중요합니다. 왜냐하면 connect()
는 내부적으로 source.subscribe(subject)
을 실행하고 공유된 Observable
의 실행을 취소하기 위한 unsubscribe
가 가능한 Subscription
을 반환합니다.
Reference counting
connect()
를 수동으로 호출하고 구독을 처리하는 것을 종종 성가신 일입니다. 보통 첫번째 Observer
가 도착하면 연결하고 마지막 Observer
가 구독을 해제하면 자동적으로 실행이 취소되기를 바랍니다.
이 목록에 요약 된대로 구독이 발생하는 다음 예를 자세히 확인해 보십시오.
- 첫번째
Observer
는 다중캐스트된Observable
을 구독합니다. - 다중캐스트된
Observable
이 연결됩니다. - 다음 값으로 0 이 첫번째
Observer
에게 전달됩니다. - 두번째
Observer
가 다중캐스트된Observable
을 구독합니다. - 다음 값 1 이 첫번째
Observer
에게 전달됩니다. - 다음 값 1 이 두번째
Observer
에게 전달됩니다. - 첫번째
Observer
가 다중캐스트된Observable
의 구독을 해지 합니다. - 다음 값 2 이 두번째
Observer
에게 전달됩니다. - 두번째
Observer
가 다중캐스트된Observable
의 구독을 해지 합니다. - 다중캐스트된
Observable
에 연결이 구독 해지됩니다.
connect()
를 명시적으로 호출하여 위의 예제를 실현하기 위해서 아래와 같이 코드를 기술했습니다.
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;
subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();
setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
subscription1.unsubscribe();
}, 1200);
// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);
명시적으로 connect()
를 호출하는 것을 피하기 위해서 구독자 수를 추적하는 Observable
을 반환하는 ConnectableObservable
의 refCount()
(reference counting) 메소드를 사용할 수 있습니다. 구독자수가 0에서 1로 증가할 때 connect()
를 호출하고 공유된 실행이 시작됩니다. 구독자 수가 1에서 0으로 감소할때만 완전히 구독이 해지되어 추가적인 실행이 중지됩니다.
refCount
는 첫번째 구독자가 도착할 때 다중캐스트된Observable
의 실행을 시작하고 마지막 구독자가 떠날때 실행을 중지합니다.
아래는 예제입니다.
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';
const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;
// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);
setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);
// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);
// Logs
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
refCount()
메소드는 ConnectableObservable
에만 존재하며 다른 ConnectableObservable
이 아닌 Observable
을 반환합니다.
BehaviorSubject
Subject
의 변형 중 하나로 "현재 값"이라는 개념을 가진 BehaviorSubject
가 있습니다. 컨슈머에게 내보냈던 가장 최근의 값을 저장하고 새로운 Observer
가 구독을 시작할 때마다 "현재 값"을 BehaviorSubject
로 부터 즉시 수신할 수 있습니다.
BehaviorSubject
는 시간에 따른 값을 나타내는데 유용합니다. 예를 들어 생일에 대한 이벤트 스트림은 Subject
이지만 나이의 이벤트 스트림은 BehaviorSubject
가 될 수 있습니다.
다음 예에서는 첫번째 Observer
가 구독할 때 수신할 값 0으로 BehaviorSubject
가 초기화 됩니다. 두번째 Observer
값 2가 전송된 이후에 구독을 시작하더라도 값 2를 수신합니다.
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
ReplaySubject
ReplaySubject
는 이전 값을 신규 구독자에게 보낼 수 있다는 점에서 BehaviorSubject
와 비슷 하지만 Observable
의 실행의 일부를 기록하는 것도 가능합니다.
ReplaySubject
는 Observable
실행으로 부터 여러개의 값을 기록해 두고 이를 새 구독자에게 다시 전송합니다.
ReplaySubject
를 생성할 때 몇개의 값을 다시 전송할지 지정할 수 있습니다.
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 새로운 구독자에게 전송할 값의 버퍼를 3으로 지정합니다.
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
또한 버퍼 사이즈 이외에 유예시간을 밀리 초 단위로 지정해서 기록된 값이 얼마나 유효할지 결정할 수 있습니다. 다음 예제에서 버퍼 사이즈를 100으로 크게 지정했지만 유예시간의 파라미터를 500 밀리 초로 짧게 지정했습니다.
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...
AsyncSubject
AsyncSubject
는 Observable
의 실행이 완료될때 마지막 값만 Observer
에게 전달되는 변종입니다.
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
AsyncSubject
는 단일값을 전달하기 위해 완료 통지를 기다린다는 점에서 last()
연산자(Observable
이 가지고 있는)와 비슷합니다.
출처
이글은 RxJS 공식 사이트 Subject, CC BY 4.0 페이지의 번역물 입니다.
'JavaScript > RxJS' 카테고리의 다른 글
RxJS - Scheduler (1) | 2019.11.03 |
---|---|
RxJS - Subscription(구독) (0) | 2019.11.03 |