본문 바로가기

[RxKotlin] ReactivX #3 - Subject

ReactivX #3 - Subject

 

RxKotlin 이전 글

  1. ReactiveX #1 - 'ReactiveX 란?'
  2. ReactiveX #2 - Observable

 

 

Subject

  • publish() 처럼 Cold Observable -> Hot Observable 변환하는 클래스 
  • Subject는 Observable + Observer (Observable이면서Observer의 역할을 수행)

    • Observer 역할 - 다른 Observable에게 구독하여 item 수신
    • Observable 역할 - Observable에게 수신받은 item 재배출 또는 새로운 item 배출
  • Subject의 4가지 종류

    • PublishSubject, BehaviorSubject, AsyncSubject, ReplaySubject

 

각 함수들의 설명/사용방법은 접은글로 접어놨습니다.

각 함수들에 보이는 "예시보기"를 누르면 펼쳐집니다


PublishSubject

: 등록 시점 이후부터 데이터를 수신하는 Subject

더보기

 PublishSubject.create<T>()

출처 http://reactivex.io/documentation/ko/subject.html

// Cold Observable, 0.1초마다 값 생성 (subscribe 호출 시 발행시작)
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)	

val subject = PublishSubject.create<Long>()	// PublishSubject 생성, Observable + Observer Type

observable.subscribe(subject)			// subject가 구독(subject - Observer역할)
runBlocking { delay(300) }			// 0.3초 대기

subject.subscribe { println("1st : $it") }	// subject에 1번째 Observer 등록, (subject - Observable역할)
runBlocking { delay(300) }			// 0.3초 대기

subject.subscribe { println("2st : $it") }	// subject에 2번째 Observer 등록, (subject - Observable역할)
runBlocking { delay(300) }			// 0.3초 대기

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1st : 3
1st : 4
1st : 5
1st : 6
2st : 6
1st : 7
2st : 7
1st : 8
2st : 8


: observable은 interval()로 0.1초마다 지속적으로 값을 생성하는 Cold Observable, subscribe() 시 발행

subject가 observable을 subscribe(구독) 하는 시점부터 observable의 값이 발행 시작

이후 0.3초 이후에 1번째 Observer를 subject에 등록하면, 3번째 데이터부터 수신

이후 0.3초 이후에 2번째 Observer를 subject에 등록하면, 6번째 데이터부터 수신

-> observable은 Cold Observable인데, subject를 통해 Hot Observable처럼 동작하는 예시

 


BehaviorSubject

: 등록 시점 이전에 배출된 직전값 하나를 전달받고 시작

구독(subscribe) 시점에 데이터가 아직 없으면 기본값을 전달

더보기

BehaviorSubject.createDefault(defaultValue) - 데이터가 아직 없으면 Default값 전달

 

출처 http://reactivex.io/documentation/ko/subject.html

val subject = BehaviorSubject.createDefault("5")
subject.subscribe { data -> println("Subscriber #1 => $data") }
subject.onNext("1")
subject.onNext("2")

subject.subscribe { data -> println("Subscriber #2 => $data") }
subject.onNext("3")
subject.onComplete()

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

Subscriber #1 => 5
Subscriber #1 => 1
Subscriber #1 => 2
Subscriber #2 => 2
Subscriber #1 => 3
Subscriber #2 => 3


: PublishSubject에서 수신했던 값과 다르게 바로 직전의 값을 수신해서 출력하는 것을 볼 수 있습니다.

1번째 Subscriber는 onNext 호출전이라 데이터가 없으므로 기본값 5를 처음 수신

2번째 Subscriber는 구독(subscribe) 이전의 데이터 2를 먼저 수신


AsyncSubject

: AsyncSubject는 Observable의 마지막값한번만 배출

마지막 값? onComplete() 이후 가장 최신 데이터 

더보기

 위와 동일한 예제로 사용할 경우 interval() 함수는 마지막 값이란 존재하지 않기 때문에 테스트가 불가

interval()은 정해진 시간 간격마다 계속 값을 발행하기 때문에 마지막 개념 X

interval 대신 just를 이용하여 지정된 갯수를 갖는 Observable을 예시로 사용

출처 http://reactivex.io/documentation/ko/subject.html

// Cold Observable, 1부터 10개의 값 발행
val observable = Observable.range(1,10)

// AsyncSubject 생성
val subject = AsyncSubject.create<Int>()

// observable 값 발행시작
observable.subscribe(subject)			

subject.subscribe { println("1st : $it") }	
runBlocking { delay(300) }			

subject.subscribe { println("2st : $it") }

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1st : 10
2st : 10


:  AsyncSubject에 0.3초 간격을 두고 2개의 Observer를 구독신청 하였습니다.

결과로는 2개의 Observer 모두 10값을 수신합니다

AsyncSubject는 쉽게 Observable의 마지막 발행된 값을 저장하고

이후 subscrbie() 구독 시 저장한 마지막 값을 발행해주는 Subject

 


ReplaySubject

: Cold Observable과 비슷하게 등록 시점 이전값모두 수신받은 후 새로 배출되는 값을 전달 받습니다

더보기

 ReplaySubject.create<T>() - 생성 함수

출처 http://reactivex.io/documentation/ko/subject.html

// Cold Observable, 0.1초마다 값 생성 (subscribe 호출 시 발행시작)
val observable = Observable.interval(100,TimeUnit.MILLISECONDS)

val subject = ReplaySubject.create<Long>()	// ReplaySubject, Observable + Observer Type

observable.subscribe(subject)			// subject가 구독(subject - Observer역할)
runBlocking { delay(200) }			// 0.2초 대기

subject.subscribe { println("1st : $it") }	// subject에 1번째 Observer 등록, (subject - Observable역할)
runBlocking { delay(200) }			// 0.2초 대기

subject.subscribe { println("2st : $it") }	// subject에 2번째 Observer 등록, (subject - Observable역할)
runBlocking { delay(200) }			// 0.2초 대기

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1st : 0
1st : 1
1st : 2
1st : 3
2st : 0
2st : 1
2st : 2
2st : 3
1st : 4
2st : 4
1st : 5
2st : 5


: observable은 interval()로 0.1초마다 지속적으로 값을 생성하는 Cold Observable, subscribe() 시 발행

subject가 observable을 subscribe(구독) 하는 시점부터 observable의 값이 발행 시작

이후 200ms 이후에 1번째 Observer를 subject에 등록하면, 이전값(0,1) 모두 수신

이후 200ms 이후에 2번째 Observer를 subject에 등록하면, 이전값(0,1,2,3) 모두 수신