본문 바로가기

[RxKotlin] ReactiveX #6 - 디버깅 / 예외처리

ReactiveX #6 - 디버깅 / 예외처리

 

 

RxKotlin 이전 글

  1. ReactiveX #1 - 'ReactiveX 란?'
  2. ReactiveX #2 - Observable
  3. ReactiveX #3 - Subject
  4. ReactiveX #4 - 연산자 'Operator'
  5. ReactiveX #5 - Scheduler '병렬처리'

 

 

INDEX

  1. 디버깅(Debugging)
  2. doOnXXX()
  3. 예외 처리

 

 

 

디버깅

  • RxJava / RxKotlintry-catch문을 사용할 수 없고 로그를 넣을 수 있는 공간이 없기 때문에

    doOnXXX() 계열의 함수를 이용하여 강제로 부수 효과를 일으켜 디버깅(Debugging) 처리

 

 

 

doOnXXX()

  • 강제로 부수효과를 일으켜 버그를 알 수 있게 하는 함수들 (중간 연산자)
  • doOnNext() : 데이터 발행(onNext) 시 이벤트 발생
  • doOnComplete() : 모든 데이터가 발행(onComplete) 시 이벤트 발생
  • doOnError() : 중간에 에러가 발생(onError) 시 이벤트 발생
  • doOnEach() : doOnNext() + doOnComplete() + doOnError()를 한번에 처리하는 이벤트
  • doOnTerminate() : Observable이 끝나는 onComplete 또는 onError 시 이벤트 발생 함수
  • doOnSubscribe() : Observable을 구독(subscribe) 시  이벤트 발생
  • doOnDispose() : Observable의 구독 해지(dispose) 시 이벤트 발생
  • doOnLifeCycle() : doOnSubscribe() + doOnDispose() 한번에 처리하는 이벤트

 

각 함수들의 설명/사용방법은 접은글로 접어놨습니다.

각 함수들에 보이는 "예시보기"를 누르면 펼쳐집니다


doOnNext() doOnComplete() doOnError() 함수

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    val observer: Observer<Int> = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            println("onSubscribe()")
        }

        override fun onNext(t: Int) {
            println("onNext() - $t")
        }

        override fun onError(e: Throwable) {
            println("onError() - ${e.message}")
        }

        override fun onComplete() {
            println("onComplete()")
        }
    }

    Observable.just(10,5,0)
        .doOnNext { item -> println("doOnNext() - $item") }
        .doOnError { error -> println("doOnError() - ${error.message}") }
        .doOnComplete { println("doOnComplete()") }
        .subscribe(observer)


    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

onSubscribe()
doOnNext() - 10
onNext() - 10
doOnNext() - 5
onNext() - 5
doOnNext() - 0
onNext() - 0
doOnComplete()
onComplete()


: Observer이 수신하는 이벤트와 doOnXXX() 함수의 호출순서를 보기 위해 Observer 객체를 등록

결과를 보면 Observer객체보다 Observable의 doOnXXX() 함수가 먼저 호출되는 것을 확인

 

Observable.just(10,5,0)
    .map {
    	100 / it
    }
    .doOnNext { item -> println("doOnNext() - $item") }
    .doOnError { error -> println("doOnError() - ${error.message}") }
    .doOnComplete { println("doOnComplete()") }
    .subscribe(observer)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

onSubscribe()
doOnNext() - 10
onNext() - 10
doOnNext() - 20
onNext() - 20
doOnError() - / by zero
onError() - / by zero


 : 이번에는 Error를 발생시킨 예시입니다 100의 숫자를 0으로 나누려 하는 부분에서 Error 발생

 


doOnEach() 함수

: onNext, onComplete, onError 이벤트를 한번에 처리하는 함수

더보기
fun main(args: Array<String>) = runBlocking<Unit> {
    Observable.just(10,5,0)
        .doOnEach { // Notification 객체로 전달
            if (it.isOnNext) println("doOnNext() - ${it.value}")
            if (it.isOnError) println("doOnError() - ${it.error?.message}")
            if (it.isOnComplete) println("doOnComplete()")
        }
        .subscribe{ println("$it")}

    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

doOnNext() - 10
10
doOnNext() - 5
5
doOnNext() - 0
0
doOnComplete()


: doOnEach() 함수는 doOnNext + doOnError + doOnComplete 함수를 한번에 처리가 가능

대신 무슨 이벤트인지 구분처리가 필요 (isOnNext(), isOnError(), isOnComplete() )

doOnEach()에 전달되는 객체는 Notification 객체로 value() 또는 error.message()로 처리

 


doOnSubscribe() 함수, doOnDispose() 함수

더보기
fun main(args: Array<String>) = runBlocking<Unit> {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
        .doOnSubscribe { println("doOnSubscribe()") }
        .doOnDispose { println("doOnDispose()") }
        .subscribe { println(it) }

    delay(300)
    observable.dispose()
    delay(300)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

doOnSubscribe()
0
1
2
doOnDispose()


: subscribe() 호출 시와 dispose()시 출력되는 걸 확인 가능

 


doOnLifeCycle() 함수

: doOnSubscribe() + doOnDispose() 함수를 한번에 호출하는 함수

더보기
fun main(args: Array<String>) = runBlocking<Unit> {
    val observable = Observable.interval(100, TimeUnit.MILLISECONDS)
        .doOnLifecycle(				// 2개의 파라미터 (doOnSubscribe, doOndispose)
            { println("doOnSubscribe()")},	// doOnSubscribe
            { println("doOnDispose()")}		// doOnDispose
        )
        .subscribe { println(it) }

    delay(300)
    observable.dispose()
    delay(300)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

doOnSubscribe()
0
1
2
doOnDispose()


: 위 예시와 동일하게 출력하는걸 확인 가능

 


doOnterminate() 함수

: Observable이 끝나는 상태 (onComplete 또는 onError) 바로 직전에 실행되는 함수

doOnError(), doOnComplete() 호출 직전에 먼저 호출되는 의미

더보기
fun main(args: Array<String>) = runBlocking<Unit> {
    Observable.just(1,2,3)
        .doOnTerminate { println("doOnTerminate()") }
        .doOnError { println("doOnError() - ${it.message}") }
        .doOnComplete { println("doOnComplete()") }
        .subscribe { println(it) }

    delay(1000)
}


ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1
2
3
doOnTerminate()
doOnComplete()


: doOnComplete()가 호출되기 직전에 doOnTerminate()가 호출되는 것을 확인

 

 

예외 처리

  • RxJava / RxKotlin은 try-catch문을 이용하여 예외처리를 할 수 없습니다

    기존과 다른 방식을 이용
  • onErrorReturn() / onErrorReturnItem() : 에러(Error) 발생 시 에러 의미하는 다른 데이터로 전달(onNext())
  • onErrorResumeNext() : 에러(Error) 발생 시 다른 Observable로 대체하여 전달하는 함수
  • retry() : Observable 실행 중 에러 발생 시 재시도 함수
  • retryUntil() : 에러 발생 시 조건이 만족할때 까지 재시도하는 함수, retry { ... } 와 비슷한 형태

 

 


onErrorReturn() 함수

: RxJava / RxKotlin에서는 에러(Error)어떠한 데이터로 보는것이 적절

그렇기에 예외 발생 시, 에러를 의미하는 다른 데이터로 변환 후 onNext를 전송하는게 적절

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    val numbers = listOf("100", "200", "$300", "400", "500")
    Observable.fromIterable(numbers)
        .map { item ->
            Integer.parseInt(item)
        }
        .onErrorReturn { error ->
            if (error is NumberFormatException) {
                error.printStackTrace()
            }
            return@onErrorReturn -1         // 에러를 의미하는 -1 전달
        }
        .subscribe { item ->
            if (item < 0) println("Error!") else println("item is $item")
        }

    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

item is 100
item is 200
Error!
java.lang.NumberFormatException: For input string: "$300"
	at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
	...
	at TestKt.main(test.kt:22)


: subscriber가 에러를 의미하는 0보다 작은 값 (-1)을 수신하면 Error를 출력하는 걸 확인

onErrorReturn()은 Error 객체를 전달받은 후 subscriber에게 에러를 의미하는 다른데이터로 전달하는데 의미가 있는 함수

 


onErrorReturnItem() 함수

: onErrorReturn()와 비슷하지만 중간에 에러를 넘기는 부분 구현 생략이 가능하기에 좀더 간결하게 표현

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    val numbers = listOf("100", "200", "$300", "400", "500")
    Observable.fromIterable(numbers)
        .map { item ->
            Integer.parseInt(item)
        }
        .onErrorReturnItem(-1)
        .subscribe { item ->
            if (item < 0) println("Error!") else println("item is $item")
        }

    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

item is 100
item is 200
Error!

 


onErrorResumeNext() 함수

: onErrorResumeNext()는 에러 발생 시 원하는 Observable대체하여 전달하는 함수

더보기

 

fun main(args: Array<String>) = runBlocking<Unit> {

    val numbers = listOf("100", "200", "$300", "400", "500")
    
    val onParseError = Observable.defer {
        println("Change Observable !!")
        Observable.just(-1)
    }.subscribeOn(Schedulers.io())
    
    Observable.fromIterable(numbers)
        .map { item ->
            Integer.parseInt(item)
        }
        .onErrorResumeNext(onParseError)
        .subscribe { item ->
            if (item < 0) println("Error!") else println("item is $item")
        }

    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

item is 100
item is 200
Change Observable !!
Error!


: 에러(Error)가 발생하면 onErrorResumeNext()에 설정된 Observable대체되어 발행합니다

에러 발생 시 자원반납 등의 용도로 사용이 가능

 


retry() 함수

: Observable 실행 중 에러(Error) 발생 시 재시도하는 함수

보통 네트워크 연결 실패 시 재요청 같은 경우 때 사용합니다

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    val ob = Observable.range(1, 10)
        .map {
            if (it == 3) throw Exception("Error !") else it
        }

    println("retry(2) Start !")
    
    ob.retry(2)
        .subscribeBy(
            onNext = { println("onNext() - $it") },
            onError = { println("onError() - ${it.message}") }
        )

    println("\nretry { retryCount, e } Start !")
    
    ob.retry { retryCount, e ->
        println("retry Count : $retryCount")
        if (retryCount > 2) false else true
    }
        .subscribeBy(
            onNext = { println("onNext() - $it") },
            onError = { println("onError() - ${it.message}") }
        )
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

retry(2) Start !
onNext() - 1
onNext() - 2
onNext() - 1
onNext() - 2
onNext() - 1
onNext() - 2
onError() - Error !

retry { count, e } Start !
onNext() - 1
onNext() - 2
retry Count : 1
onNext() - 1
onNext() - 2
retry Count : 2
onNext() - 1
onNext() - 2
retry Count : 3
onError() - Error !

Process finished with exit code 0


: 1부터 10개의 값을 발행하는 Observable이 값이 3이면 의도적으로 예외를 발생하는 코드입니다

retry() 함수의 사용방법은 2가지로 분류합니다

   1) retry(반복회수) : 단순히 반복회수 만큼 반복을 정의

    2) retry { retryCount, Throwable -> Boolean } : 람다식으로 재시도 여부를 판단하여 실행

 

true 반환 - 재시도, false 반환 - 중단 의미

 

retry(2)

  : 재시도(retry) 회수를 지정해놓은 경우 2번은 자동으로 true를 반환하여 재시도를 하며,

    2번 이후에는 false를 반환하여 재시도를 중단하고 onError()를 호출합니다

 

retry { retryCount, e -> if (retryCount > 2) false else true }

   : 위 경우는 자동으로 실패하면 현재까지 재시도 횟수(Count)와 Error객체를 전달해주고

     사용자가 판단로직을 구현하여 true(재시도) false(중단)의 반환값을 직접 선언해줘야 합니다

    예시에서는 재시도 회수가 2번 이상일 경우 false를 반환해 재시도를 중단하고 onError를 호출합니다

 


retryUntil() 함수

: retryUntil()retry { ... } 람다식의 형태랑 비슷한 함수입니다

조건이 만족할때 까지 재시도 하는 함수

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    val ob = Observable.range(1, 3)
        .map {
            if (it == 3) throw Exception("Error !") else it
        }

    var retryCount = 0
    ob.retryUntil {
        if (retryCount == 3) {
            true
        } else {
            retryCount++
            println("Retry - $retryCount \n")
            false
        }
    }
        .subscribeBy(
        onNext = { println("onNext() - $it") },
        onError = { println("onError() - ${it.message}") }
    )
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

onNext() - 1
onNext() - 2
Retry - 1

onNext() - 1
onNext() - 2
Retry - 2

onNext() - 1
onNext() - 2
Retry - 3

onNext() - 1
onNext() - 2
onError() - Error !


: retry { retryCount, Throwable -> Boolean } 람다식 형태의 retry() 함수와 비슷한 형태입니다