ReactiveX #2 - Observable (구독 대상자)
RxKotlin 이전 글
INDEX
- Observable
- Observable 주요 이벤트 (Callback)
: onSubscribe(), onNext(), onError(), onComplete() - Observable 생성
: 생성 - create(), just(), range(), empty(), interval(), timer(), fromXXX() - Observable 구독 (subscribe)
- Observable 구독해지 (dispose)
- Hot & Cold Observable
Observable
- Observable은 Observer 또는 Consumer가 소비하는 값을 생성하는 구독 대상자가 됩니다
- Observer/Consumer들은 Observable에 구독(subscribe)을 신청
- Observable은 값을 생성한 후에 Consumer들에게 push방식으로 값을 전달
Observable 주요 이벤트
- 주요 이벤트 (Callback 함수들)
- onSubScribe(d :Disposable) : 구독을 신청하면 호출
* Disposable 객체는 Observer가 구독을 해제할 때 사용, dispose() - onNext(item :T) : 값을 발행(emit)할 때 호출하여 값을 넘겨줌
- onError(e :Throwable) : Observable에서 에러 발생 시 호출
- onComplete() : 값을 모두 발행(emit)하면 호출, End
- onSubScribe(d :Disposable) : 구독을 신청하면 호출
- 위 Callback 함수들을 구현해서 Observable에 등록하면 Observable이 전달하는 이벤트(event)를 받음
- 위 Callback 함수들은 Observer Interface에 정의
val observer :Observer<Int> = object :Observer<Int> {
override fun onSubscribe(d :Disposable) { // 구독 시 제일 먼저 호출
println("onSubscribe() - $d")
}
override fun onNext(item :Int) { // Observable가 값을 발행(emit)하면 호출
println("onNext() - $item")
}
override fun onError(e :Throwable) { // Observable 에러 발생 시 호출
println("onError() - ${e.message}")
}
override fun onComplete() { // Observable이 값을 모두 발행(emit)하면 호출
println("onComplete()")
}
}
Observable 생성
- Observable 생성방법
- create() : just()는 선언요소들을 순서대로 발행(emit), create는 직접 onNext()를 호출해줘야 함
- just() : 함수 선언 요소들을 순서대로 발행(emit), 최대 10개까지 값 등록
- range() : 특정 범위만큼 수를 생성하여 전달 ex) range(1..4) -> 1,2,3,4 전달
- empty() : 반환값이 없는 Observable, onSubscribe과 onComplete는 호출
- interval() : 특정 시간 간격으로 0부터 숫자를 증가시켜 값을 발행
- timer() : 특정 시간이 되면 한번만 값을 발행 (value = 0)
- formXXX() : 기존 구조체로부터 Observable을 생성
- fromIterable() - Iterable이 존재하는 Collection, Item을 순서대로 하나씩 전달
- fromCallable() - Callable객체 -> Observable 변경, Callable의 call() 값을 전달
- fromFuture() - Futrue객체 -> Observable 변경, Futurue의 get() 값을 전달
각 함수들에 보이는 "예시보기"를 누르면 펼쳐집니다
create() 함수
: just() 함수는 데이터를 넣으면 자동으로 이벤트가 발생하지만
create()는 just()와 다르게 선언 시 ObservableEmitter를 직접 호출해야 합니다.
( ObservableEmitter ? - onNext, onError, onComplete )
Obsevable을 선언할 때 create 블록이 실행되는게 아니라 Obsevable을 구독(subscribe) 하는 시점에 create 블록이 실행됩니다
// Observable 1 선언, '구독 대상자'
Observable.create<Int> {
it.onNext(1) // 값 방출(emit)
it.onNext(2)
it.onComplete() // 값 방출 종료(complete)
}.subscribeBy(
onNext = { println("first $it - onNext()") },
onComplete = { println("first onComplete()\n") }
)
// Error 예시
Observable.create<Int> {
it.onNext(10) // 값 방출(emit)
it.onNext(20)
it.onError(Exception("Error Now")) // 에러 호출(onError)
}.subscribeBy(
onNext = { println("second $it - onNext()") },
onError = { println("second $it - onError()") },
onComplete = { println("onComplete()") }
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
first 1 - onNext()
first 2 - onNext()
first onComplete()
second 10 - onNext()
second 20 - onNext()
second java.lang.Exception: Error Now - onError()
: create() 부분에서 선언한 onNext() / onComplete() / onError()에 맞게 잘 구독하는걸 확인 가능
just() 함수
: just()는 받은 인자를 차례대로 순서 맞춰 발행하는 Observable을 생성
List나 Map을 전달받던 객체 자체를 그대로 전달하며, 최대 개수는 10개로 10개 이상 선언 시 예외 발생
fun main() {
// Observer 선언, '구독자'
val observer: Observer<Any> = object : Observer<Any> {
override fun onSubscribe(d: Disposable) = println("onSubscribe() - $d")
override fun onNext(item: Any) = println("onNext() - $item")
override fun onError(e: Throwable) = println("onError() - ${e.message}")
override fun onComplete() = println("onComplete()")
}
val list = listOf(1,2,3,4,5) // List Type
val num = 100 // Int Type
val str = "just Test" // String Type
val map = mapOf(1 to "One", 2 to "Two", 3 to "Three") // Map Type
Observable.just(list, num, str, map) // just() 함수, Observable 생성
.subscribe(observer) // 구독(subscribe)
}
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
onSubscribe() - io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable@66133adc
onNext() - [1, 2, 3, 4, 5]
onNext() - 100
onNext() - just Test
onNext() - {1=One, 2=Two, 3=Three}
onComplete()
: Observable.just()에 4개의 Item (List, Int, String, Map)을 순서대로 선언해서 Observable을 생성
구독(subscribe) 호출되면 그 시점에 just의 item들이 순서대로 onNext로 발행(emit) 됩니다.
중요한 점은 just()의 item 최대 개수는 10개로, 10개의 item까지만 등록이 가능합니다
위 사진을 보시면 just() 함수의 다형성(오버로딩)을 볼 수 있는데, 최대 item10개 까지 정의
range() 함수
: 주어진 값(n) 부터 m개의 Integer 객체를 발행(emit)
Observable.range(5,5)
.subscribeBy(
onNext = { println("$it - onNext()") },
onComplete = { println("onComplete()")}
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
5 - onNext()
6 - onNext()
7 - onNext()
8 - onNext()
9 - onNext()
onComplete()
: 주어진 값(5) 부터 5개의 Integer객체를 발행
empty() 함수
: 아무값을 전달하지는 않지만 onComplete()를 마지막에 호출해 줍니다.
// Observable 1 선언, '구독 대상자'
Observable.empty<Unit>()
.subscribeBy(
onNext = { println("$it - onNext()") },
onComplete = { println("onComplete()\n") }
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
onComplete()
: empty()는 아무런 값을 반환하지 않지만 그래도 onSubscribe() 과 onComplete()를 호출
interval() 함수
: 주어진 시간 간격으로 0부터 1씩 증가하는 Long객체를 생성
Observable.interval(100, TimeUnit.MILLISECONDS)
.take(5) // 5개까지만 발행
.subscribeBy(
onNext = { println("$it - onNext()") },
onComplete = { println("onComplete()")}
)
// Observable은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 main스레드 대기
// Thread.sleep()
sleep(1000)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
0 - onNext()
1 - onNext()
2 - onNext()
3 - onNext()
4 - onNext()
onComplete()
: Observable은 0.1초마다 값을 발행(emit)하고, 종료를 위해 5개까지만 발행설정 (take() 함수)
출력 결과를 보면 5개까지만 값을 발행(emit)하여 onNext가 5번 호출된 걸 확인할 수 있습니다
timer() 함수
: timer()는 interval()과 비슷하지만, 주어진 시간이 지나면 한개의 데이터를 발행하고 onComplete() 이벤트 발생
println(SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()))
// 3초뒤에 데이터 발행
Observable.timer(3000L, TimeUnit.MILLISECONDS)
.map{ SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Date()) }
.subscribeBy(
onNext = { println("$it - onNext()") },
onComplete = { println("onComplete()")}
)
// Observable은 별도의 스레드에서 작동하기 때문에 동작이 완료할 때 까지 main스레드 대기
// Thread.sleep()
sleep(5000)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
2020/04/19 02:29:15
2020/04/19 02:29:18 - onNext()
onComplete()
: 주어진 시간(3000=3초)가 지나면 기본적으로 0값을 한번만 전달하고 종료(onComplete)
발행되는 값(0)을 변경하기위해 map() 함수를 사용해서 원하는 값으로 변경 후 전달
fromXXX() 함수
: from을 이용하면 기존 구조체로 부터 Observable을 생성할 수 있습니다.
대표적으로 fromIterable() / formCallable() / fromFuture()가 존재
- fromArray() : Array 내부 데이터를 하나씩 발행하는 Observable 반환
val arr = arrayOf("one", "two", "three")
Observable.fromArray(*arr)
.subscribe { println(it) }
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
one
two
three
- fromIterable() : iterable을 지원하는 구조체를 Observable 형태로 변경합니다
: Collection 아이템이 하나씩 발행(emit)
iterable의 경우 코틀린 Collection의 확장함수(extenstion function) 'toObservable()'를 이용해 Observable을 만들 수 있습니다
(toObservable()함수는 내부적으로 fromIterable()을 이용)
대표적인 Iterable 클래스 - List, ArrayList, ArrayBlockingQueue, HashSet, LinkedList, Stack, TreeSet 등
val names = ArrayList<String>()
names.add("홍길동")
names.add("김루피")
names.add("최상디")
Observable.fromIterable(names)
.subscribeBy (
onNext = { println("onNext() - $it") },
onComplete = { println("onComplete()")}
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
onNext() - 홍길동
onNext() - 김루피
onNext() - 최상디
onComplete()
- fromCallable() : Callable 객체를 Observable 형태로 변경
: Callable의 call() 함수 return값이 Observer에 전달
val callable = {
sleep(500)
"500ms End sleep !"
}
Observable.fromCallable(callable)
.subscribeBy (
onNext = { println("onNext() - $it") },
onComplete = { println("onComplete()")}
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
onNext() - 500ms End sleep !
onComplete()
- fromFuture() : Future 객체를 Observable 형태로 변경
: Future의 get() 함수 return값이 Observer에 전달
val future = object : Future<String> {
override fun get() = "Future get()" // 구독(subscribe) 시 전달될 값
override fun get(timeout: Long, unit: TimeUnit) = "Future get(timeOut)"
override fun isDone() = true
override fun cancel(mayInterruptIfRunning: Boolean) = false
override fun isCancelled() = false
}
Observable.fromFuture(future)
.subscribeBy (
onNext = { println("onNext() - $it") },
onComplete = { println("onComplete()")}
)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
onNext() - 500ms End sleep !
onComplete()
Observable 구독
Subscribe
- Observable.subscribe() 메서드로 구독신청
- subscribe() - Observable에서 방출하는 값을 받기위한 함수
- subscribe을 사용한 구독방법은 2가지
- Observer Instance(객체)를 등록
- Instance아닌, 사용자 필요 이벤트만 정의해서 등록
- 먼저 Observable 클래스의 subscribe() Param구조를 보겠습니다
- subscribe() : Disposable (no parameters)
- subscribe(onNext :Consumer!) :Disposable
- subscribe(onNext :Consumer!, onError :Consumer!) :Disposable
- subscribe(onNext :Consumer!, onError :Consumer!, onComplete :Action!) :Disposable
- subscribe(onNext :Consumer!, onError :Consumer!, onComplete :Action!, onSubscribe :Consumer!) :Disposable
- subscribe(observer :Observer)
: 1~5번까지 param 유형은 Observer Instance를 등록하는게 아닌 필요한 이벤트만 정의하는 유형입니다
1~5번까지 보시면 각 Param의 Type에 ! 키워드(Nullable)로 내가 원하는 이벤트만 정의가 가능합니다
6번의 Param유형은 Observer Instance를 등록하는 방식입니다
모든 유형에 마찬가지로 subscribe()은 Disposable Instance를 반환하기 때문에 이 Disposable로 구독해제가 가능
Observer Instance 등록
// Observer 선언, '구독자'
val observer: Observer<Any> = object : Observer<Any> {
// onSubscribe() 재정의
override fun onSubscribe(d: Disposable) {
println("onSubscribe() - $d")
}
// onNext() 재정의
override fun onNext(item: Any) {
println("onNext() - $item")
}
// onError() 재정의
override fun onError(e: Throwable) {
println("onError() - ${e.message}")
}
// onComplete() 재정의
override fun onComplete() {
println("onComplete()\n")
}
}
: Observer Instance를 구현하면 4개의 이벤트를 재정의(Override) 해야하는 불편함이 있지만 재활용이 가능
subscribe 필요 이벤트만 정의
// 예시1 - 4개 정의 (onNext / onError / onComplete / onSubscribe)
Observable.just(1)
.subscribe(
{ println("onNext() - $it") }, // onNext() 이벤트
{ println("onError() - $it") }, // onError() 이벤트
{ println("onComplete()") }, // onComplete() 이벤트
{ println("onSubscribe() - $it") } // onSubscribe() 이벤트
)
// 예시2 - 2개 정의 (onNext / onError)
Observable.just(1)
.subscribe(
{ println("onNext() - $it") }, // onNext() 이벤트
{ println("onError() - $it") } // onError() 이벤트
)
: Observer Instance를 등록하려면 4개의 이벤트를 구현한 객체를 할당해야 하지만
이렇게 Instance가 아닌 필요 메서드만 구현해서 구독을 할수가 있다
구독 해지
- Observer를 subscribe을 통해 구독하면 onSubscribe(d :Disposable) 호출로 Disposable Instance를 전달
- Disposable Interface 구조
public interface Disposable {
// 구독해지 요청 메서드
void dispose();
// 구독해지 상태확인 메서드
boolean isDisposed();
}
방법 1 - Observer를 통한 구독해지
fun main() {
// Observer 선언, '구독자'
val observer: Observer<Long> = object : Observer<Long> {
lateinit var disposable: Disposable // lateinit으로 늦은 초기화
override fun onSubscribe(d: Disposable) {
println("onSubscribe() - $d")
disposable = d // disposable 초기화
}
override fun onNext(item: Long) {
println("onNext() - $item")
if (item >= 10 && disposable.isDisposed == false){ // item 10이상이며 구독중인지? isDisposed()
disposable.dispose() // 구독 해지 dispose()
println("subscribe Dispose!")
}
}
override fun onError(e: Throwable) { println("onError() - ${e.message}") }
override fun onComplete() { println("onComplete()\n") }
}
Observable.interval(100, TimeUnit.MILLISECONDS) // 0.1초마다 값을 발행하는 interval Observable 생성
.subscribe(observer)
Thread() {
Thread.sleep(1500) // 1.5초 Thread Blocking
}.apply {
start()
join()
}
}
- 출력 결과
onSubscribe() - null
onNext() - 0
onNext() - 1
onNext() - 2
onNext() - 3
onNext() - 4
onNext() - 5
onNext() - 6
onNext() - 7
onNext() - 8
onNext() - 9
onNext() - 10
subscribe Dispose!
: onNext()에서 if문으로 검사 (item이 10이상이면서, 구독중인 상태인지(isDisposed))
검사 결과에 들어오면 구독해지
방법2 - subscribe() 반환 instance인 Disposable을 통한 구독해지
fun main() {
// Observable Instance 생성, 0.1초 간격으로 값 발행
val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
// subscribe() 반환 instance = Disposable
val disposable :Disposable = observable.subscribe(
{ println("onNext() - $it")}, // onNext() 정의
{ println("onError() - ${it.message}")}, // onError() 정의
{ println("onComplete()")} // onComplete() 정의
)
Thread() {
Thread.sleep(1200) // 1.2초 Thread 대기
disposable.dispose() // Observable 구독 해지, dispose()
}.apply {
start() // Thread 실행
join() // Thread 종료까지 대기
}
}
- 출력 결과
onNext() - 0
onNext() - 1
onNext() - 2
onNext() - 3
onNext() - 4
onNext() - 5
onNext() - 6
onNext() - 7
onNext() - 8
onNext() - 9
onNext() - 10
onNext() - 11
: 값 발행이 모두 완료된게 아닌 중간에 구독해제 상황이라서 onComplete()는 미호출
Hot & Cold Observable
- 위에 언급한 Observable들은 subscribe(구독) 신청하면 Observable이 데이터를 순서에 맞춰 전부 내보내줌
- 여러번 subscribe을 하더라도 처음부터 순서대로 동일한 데이터를 내보내주는 방식으로
- Observable의 데이터는 subscribe시 소모되는게 아닌, 저장된 값을 구독할 때 마다 전부 발행하는 방법
- 이러한 Observable의 유형을 Cold Observable이라고 합니다
- 반대로 Hot Observable은 데이터의 배출 시점이 subscribe(구독) 시점이 아니라서,
- 늦게 구독하는 Observer는 이전데이터를 받지 못하고 subscribe(구독) 이후의 데이터부터 받게 됩니다
Cold Observable
- 구독자가 subscribe(구독) 시점에 데이터를 배출
- 처음부터 모든 데이터가 순서대로 배출 (모든 데이터 수신보장)
- 여러번 subscribe(구독)에도 동일 데이터 배출 (데이터 배출 시 소모X)
ex) Cold Observable - 웹 요청, DB관련 요청
Hot Observable
- subscribe(구독)과 상관없이 데이터를 배출
- subscribe(구독) 시점 이후 데이터부터 전달받으며, 구독 이전의 데이터 수신X (모든 데이터 수신보장 X)
- Event를 전달받는 형태로 사용
ex) Hot Observable - 센서 이벤트, 입력 이벤트
ConnectableObservable
: Hot Observable 중 하나로 connect() 함수를 호출하면 배출을 시작하는 Observable
Observer는 subscribe(구독)와 상관없이 구독 신청 시점부터 데이터를 전달받습니다
publish() 함수 - Cold Observable -> Hot Observable 변환
// publish() 함수로 ConnectableObservable 생성
val connectableObservable = (1..4).toObservable().publish()
// 1번째 Observer 구독(subscribe)
connectableObservable .subscribe { println("first Observer - $it")}
println("Add first Observer")
// 2번째 Observer 구독(subscribe)
connectableObservable .subscribe { println("second Observer - $it")}
println("Add second Observer")
// connect() - Observable 데이터 발행
connectableObservable.connect()
// 3번째 Observer 구독(subscribe)
connectableObservable .subscribe { println("third Observer - $it")}
println("Add third Observer")
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
Add first Observer
Add second Observer
first Observer - 1
second Observer - 1
first Observer - 2
second Observer - 2
first Observer - 3
second Observer - 3
first Observer - 4
second Observer - 4
Add third Observer
: ConnectableObservable에 총 3개의 Observer를 구독(subscribe)한 예시입니다.
1,2번째의 Observer를 구독하고 connect() 호출 시점에 값이 발행되고,
그 이후 3번째 Observer가 구독(subscribe)을 했는데 이미 connect()값이 모두 발행된 후라 결과를 못받는걸 확인
subscribe() 시점에 값이 발행되지 않는다는 것이 중요합니다
다른 예로, interval()로 지속발행하는 Observable을 만들어 보겠습니다
// interval() - 0.1초 마다 값 발행, publish() - Hot Observable 변환
val connectableObservable = Observable.interval(100, TimeUnit.MILLISECONDS).publish()
// 1번째 Observer 구독
connectableObservable.subscribe { println("first Observer - $it") }
// connect() - Hot Observable 값 발행 시점
connectableObservable.connect()
// 0.3초 대기 (Thread 점유)
runBlocking { delay(300) }
// 2번째 Observer 구독
connectableObservable.subscribe { println("second Observer - $it") }
// 0.3초 대기 (Thread 점유)
runBlocking { delay(300) }
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
first Observer - 0
first Observer - 1
first Observer - 2
first Observer - 3
second Observer - 3
first Observer - 4
second Observer - 4
first Observer - 5
second Observer - 5
: 이번엔 interval()로 0.1초 마다 지속적으로 값을 발행하는 Observable을 구현.
publish()로 Hot Observable로 변환했기 때문에 Observable의 값 발행시점은 subscribe()가 아닌 connect() 시점
connect() 시점부터 값을 발행해서 1번째 Observer는 0부터 값을 수신하였고
delay(300)으로 인해 0.3초 지연됐던 2번째 Observer는 3부터 값을 수신한 것을 확인할 수 있습니다