본문 바로가기

[RxKotlin] ReactiveX #5 - Scheduler '병렬처리'

ReactiveX #5 - Scheduler '병렬처리'

 

RxKotlin 이전 글

  1. ReactiveX #1 - 'ReactiveX 란?'
  2. ReactiveX #2 - Observable
  3. ReactiveX #3 - Subject
  4. ReactiveX #4 - 연산자 'Operator'

 

 

INDEX

 

 

 

Scheduler '스케쥴러'

  • Observable은 기본적으로 호출하는 Thread에서 동작

     : subscribe{ ... } 블럭에서 모든 데이터를 수신받고 처리해야지만 해당 블럭을 벗어나는 것을 의미

       ( interval(), timer()로 생성한 Observable 예외 - 계산 스케쥴러 사용 (Scheduler.Computation)
  • Scheduler(스케쥴러)는 Observable의 Thread를 변경(지정) 할 수 있는 역할을 합니다
  • subscribeOn() + observeOn() 함수를 모두 지정

    : 데이터 흐름 발생 스레드(Observable)와 결과를 전달하는 스레드(Observer) 분리 가능
  • 스케쥴러를 별도로 지정하지 않으면 기본적으로 현재(main) Thread 동작

 

 

Schedulers

  • Scheduler.io()

     : 파일 / Network IO 작업을 할 때 사용하는 스케쥴러
     
       내부적으로 CachedPool 사용하므로 Thread가 동시에 계속 늘어나면서 생성할 수 있으며,

       유후 Threa 존재 시 재활용 
  • Scheduler.computation()

     : CPU 의존적인 계산을 수행 위한 Thread pool을 사용하는 스케쥴러


       CPU 코어 개수만큼 Thread pool을 만들어서 사용
  • Scheduler.newThread()

     : new Thread() 처럼 새로운 Thread를 하나 만들어 사용
  • Scheduler.single()

     : singleThreadPool 사용, 해당 Scheduler로 여러 작업 수행 시 Queuing 되어 순서가 보장 
  • Scheduler.trampoline()

     : 호출을 수행한 Thread를 이용하여 수행

       호출한 Thread 역시 단일 Thread 이므로 여러 작업 요청 시 Queuing 되어 순서가 보장

       단, 호출한 Thread를 사용하기 때문에 Queuing된 모든 작업이 종료되어야 다음 코드라인 실행
  • Scheduler.from()

     : Executor를 전달하여 새로운 Scheduler를 생성

 

fun main() {

    val ob = Observable.just(1)

    ob.subscribeOn(Schedulers.io())
        .subscribe { println("Schedulers.io() - ${Thread.currentThread().name}") }

    ob.subscribeOn(Schedulers.computation())
        .subscribe { println("Schedulers.computation() - ${Thread.currentThread().name}") }

    ob.subscribeOn(Schedulers.newThread())
        .subscribe { println("Schedulers.newThread() - ${Thread.currentThread().name}") }

    ob.subscribeOn(Schedulers.single())
        .subscribe { println("Schedulers.single() - ${Thread.currentThread().name}") }

    ob.subscribeOn(Schedulers.trampoline())
        .subscribe { println("Schedulers.trampoline() - ${Thread.currentThread().name}") }

    val executor = Executors.newFixedThreadPool(2)
    val customScheduler = Schedulers.from(executor)
    ob.subscribeOn(customScheduler)
        .subscribe { println("Schedulers.from() - ${Thread.currentThread().name}") }

    sleep(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

Schedulers.io() - RxCachedThreadScheduler-1
Schedulers.computation() - RxComputationThreadPool-1
Schedulers.newThread() - RxNewThreadScheduler-1
Schedulers.single() - RxSingleScheduler-1
Schedulers.trampoline() - main
Schedulers.from() - pool-2-thread-1

 

 


Schedulers.newThread()

: 요청을 받을 때마다 새로운 스레드생성하여 작업

val ob = Observable.just(1, 2, 3)

ob.subscribeOn(Schedulers.newThread())
.subscribe {
    runBlocking { delay(100) }
    println("$it: Observable#1 - ${Thread.currentThread().name}")
}

ob.subscribeOn(Schedulers.newThread())
.subscribe {
    runBlocking { delay(100) }
    println("$it: Observable#2 - ${Thread.currentThread().name}")
}

ob.subscribeOn(Schedulers.newThread())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Observable#3 - ${Thread.currentThread().name}")
    }

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1: Observable#3 - RxNewThreadScheduler-3
1: Observable#2 - RxNewThreadScheduler-2
1: Observable#1 - RxNewThreadScheduler-1
2: Observable#2 - RxNewThreadScheduler-2
2: Observable#3 - RxNewThreadScheduler-3
2: Observable#1 - RxNewThreadScheduler-1
3: Observable#3 - RxNewThreadScheduler-3
3: Observable#2 - RxNewThreadScheduler-2
3: Observable#1 - RxNewThreadScheduler-1


: 3개 newThread()를 구현한 예시입니다

newThread()는 요청마다 새로운 Thread를 생성하는 스케쥴러입니다.

3개의 newThrea()로 생성된 Observable은 각각 서로다른 Thread이기 때문에 비동기로 출력됩니다

 


Schedulers.io vs Schedulers.computation

: 둘 다 비동기로 다른 Thread에서 동작하는 스케쥴러입니다

computation()은 계산 스케쥴러로, '계산' 작업에 사용하고 별도의 I/O가 없는 스케쥴러
내부적으로 Thread pool을 생성하며 별도의 설정이 없으면 스레드의 개수는 프로세서의 개수와 동일
     ( **interval() / timer() 함수는 기본적으로 계산 스케쥴러를 사용)

io()는 네트워크, 각종 I/O 작업 위한 스케쥴러, 필요할 때 마다 스레드를 계속 생성
IO 작업은 비동기로 실행되고, 결과를 얻기까지 대기 시간이 김

val ob = Observable.just(1, 2, 3)

println("start Schedulers.io()")

ob.subscribeOn(Schedulers.io())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Schedulers.io() - ${Thread.currentThread().name}")
    }

println("start Schedulers.computation()")

ob.subscribeOn(Schedulers.computation())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Schedulers.computation() - ${Thread.currentThread().name}")
    }

println("done")
delay(500)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

start Schedulers.io()
start Schedulers.computation()
done
1: Schedulers.computation() - RxComputationThreadPool-1
1: Schedulers.io() - RxCachedThreadScheduler-1
2: Schedulers.io() - RxCachedThreadScheduler-1
2: Schedulers.computation() - RxComputationThreadPool-1
3: Schedulers.io() - RxCachedThreadScheduler-1
3: Schedulers.computation() - RxComputationThreadPool-1


: io / computation 두 개의 다른 비동기 Thread에서 실행되기 때문에 호출한 thread의 코드인

"start Schdulers.io()", "start Schedulers.computation()" 그리고 "done"이 먼저 찍히는 걸 확인

그리고 2개의 Observable은 서로 다른 비동기 Thread 이므로 각자 출력을 확인 

 


Schedulers.single() vs Schedulers.trampoline()

single()은 worker Thread를 하나 만들어 해당 Thread Queue에 작업을 넘기는 방식

trampoline()은 호출한 Thread의 Queue에 넘겨주는 방식

둘 다 single 스레드로 순서를 보장하지만, single()은 같은 single()을 사용하는 Observable간의 순서만 보장

trampoline은 trampoline()을 사용한 Observable을 포함한 다른 코드에 까지 영향을 준다는 점

 

Schedulers.single()

val ob = Observable.just(1, 2, 3)

println("start - Observable#1")

ob.subscribeOn(Schedulers.single())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Observable#1 - ${Thread.currentThread().name}")
    }

println("start - Observable#2")

ob.subscribeOn(Schedulers.single())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Observable#2 - ${Thread.currentThread().name}")
    }

println("done")
delay(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

start - Observable#1
start - Observable#2
done
1: Observable#1 - RxSingleScheduler-1
2: Observable#1 - RxSingleScheduler-1
3: Observable#1 - RxSingleScheduler-1
1: Observable#2 - RxSingleScheduler-1
2: Observable#2 - RxSingleScheduler-1
3: Observable#2 - RxSingleScheduler-1


: single()로 2개의 Observable에 스레드를 지정하였습니다.

같은 single() 사용하는 Observable간에 순서를 보장하는 것을 결과로 보면 알 수 있습니다.

single()은 별도의 Thread이기 때문에 main Thread 에서 실행된 "start XXX"와 "done"이 먼저 출력됩니다

 

Schedulers.trampoline()

val ob = Observable.just(1, 2, 3)

println("start - Observable#1")

ob.subscribeOn(Schedulers.trampoline())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Observable#1 - ${Thread.currentThread().name}")
    }

println("start - Observable#2")

ob.subscribeOn(Schedulers.trampoline())
    .subscribe {
    	runBlocking { delay(100) }
        println("$it: Observable#2 - ${Thread.currentThread().name}")
    }

println("done")
delay(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

start - Observable#1
1: Observable#1 - main
2: Observable#1 - main
3: Observable#1 - main
start - Observable#2
1: Observable#2 - main
2: Observable#2 - main
3: Observable#2 - main
done


: single() 메서드의 예시와 동일한 코드에서 single() -> trampoline()으로 변경한 코드입니다

trampoline은 현재(main) Thread에서 동작하기 때문에 위 코드는 모두 main Thread에서 동작

그러므로 출력은, 코드 순서대로 진행하고 출력하게 되어 single() 사용 시와 다른 결과를 보여줍니다

 

 

Scheduler 지정

  • subscribeOn() : 작업을 처리할 스레드를 지정 (Observable)
  • observeOn() : 작업결과를 보여줄 스레드를 지정 (Observer)

 

 


subscribeOn() 함수

: 작업을 처리할 때의 스레드를 결정하는 함수이지만, observeOn() 함수를 지정하지 않는다면

선언 위치에 상관없이 ObservableObserver 모두 특정 Scheduler에서 동작하도록 지정합니다

(데이터의 생산과 소비를 동일한 스케쥴러를 사용하도록 지정)

 

Observable.just(1, 2, 3)
    .map {
    	println("mapping $it - ${Thread.currentThread().name}")
        it
    }
    .subscribeOn(Schedulers.io())
    .subscribe {
    	println("subscribe $it - ${Thread.currentThread().name}")
    }

delay(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

mapping 1 - RxCachedThreadScheduler-1
subscribe 1 - RxCachedThreadScheduler-1
mapping 2 - RxCachedThreadScheduler-1
subscribe 2 - RxCachedThreadScheduler-1
mapping 3 - RxCachedThreadScheduler-1
subscribe 3 - RxCachedThreadScheduler-1


: Schedulers.io()로 IO 스케쥴러를 지정하였습니다.

데이터를 생산하는 Observable Thread와 데이터를 소비하는 Observer Thread가 동일함을 확인

 

 


observeOn() 함수

선언부분 이하의 downStream이 사용할 Scheduler를 지정하는 함수

 

Observable.just(1)
    .observeOn(Schedulers.io())
    .map {
    	println("mapping#1 - ${Thread.currentThread().name}")
        it
    }
    .observeOn(Schedulers.computation())
    .map {
    	println("mapping#2 - ${Thread.currentThread().name}")
        it
    }
    .observeOn(Schedulers.single())
    .subscribe{
        println("subscribe - ${Thread.currentThread().name}")
    }

delay(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

mapping#1 - RxCachedThreadScheduler-1
mapping#2 - RxComputationThreadPool-1
subscribe - RxSingleScheduler-1


: 3개의 observeOn()을 지정하였습니다.

observeOn() 지정 후 downStream 마다 Threa name을 호출한 결과 각각 다른 걸 확인

 

 

subscribeOn / observeOn 우선순위

  • subscribeOn은 구문 위치에 상관없이 선언된 Scheduler로 생산과, 소비 모두를 지정합니다
  • observeOn은 선언된 이후 하위(down) stream의 처리를 선언된 Scheduler로 지정합니다

 

 


case #1 subscribeOn + observeOn 혼용 사용

: 데이터 생산 및 처리는 background에서 수행하고, 결과를 받아와 UI에 출력하는 예시

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    Observable.just(1)
        .subscribeOn(Schedulers.io())
        .map {
            println("processing in -  ${Thread.currentThread().name}")
            it
        }
        .observeOn(Schedulers.single())
        .subscribe{
            println("subscribe: $it - ${Thread.currentThread().name}")
        }


    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

processing in -  RxCachedThreadScheduler-1
subscribe: 1 - RxSingleScheduler-1


: 데이터를 생산하는 map { ... } 부분은 io Scheduler가 적용된 것을 볼 수 있습니다

결과를 출력하는 subscribe { ... } 부분은 single Scheduler가 적용된 것을 볼 수 있습니다

 

 


case #2 observeOn 아래에 subscribeOn 사용 예시

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    Observable.just(1)
        .map {
            println("processing in -  ${Thread.currentThread().name}")
            it
        }
        .observeOn(Schedulers.single())
        .subscribeOn(Schedulers.io())
        .subscribe{
            println("subscribe: $it - ${Thread.currentThread().name}")
        }


    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

processing in -  RxCachedThreadScheduler-1
subscribe: 1 - RxSingleScheduler-1

 

: subscribeOn()을 observeOn()아래로 옮겨도 결과는 똑같습니다

subscribeOn()의 효과범위는 데이터 생산(Observable) 데이터 소비(Observer) 전역에 포함되고,

observeOn()은 선언 이후 아래(down) Stream에 포함되는데

우선순위는 observeOn() > subscribeOn() 의 관계이기 때문입니다.

 

 


case #3 subscribeOn 여러개 사용 예시

더보기
fun main(args: Array<String>) = runBlocking<Unit> {

    Observable.just(1)
    	.subscribeOn(Schedulers.io())
        .map {
            println("processing in -  ${Thread.currentThread().name}")
            it
        }
        .subscribeOn(Schedulers.computation())
        .subscribe{
            println("subscribe: $it - ${Thread.currentThread().name}")
        }


    delay(1000)
}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

processing in -  RxCachedThreadScheduler-1
subscribe: 1 - RxCachedThreadScheduler-1


: subscribeOn을 중복해서 사용하면 제일 먼저 선언된 Scheduler로 동작하는걸 확인

 

우선순위 정리

  • subscribeOn은 어디에 선언되든 Observable / subscribe가 동작되는 전체 Scheduler를 지정
  • subscribeOn이 여러개 선언되면, 가장 먼저 선언된 subscribeOn의 Scheduler 지정 (나머지 무시)
  • subscribeOn과 observeOn이 혼용될 경우

     : subscribeOn은 observeOn 선언 부분의 이전 부분 (Up Stream)에 적용되고

       observeOn 선언 이후부터는 observeOn의 Scheduler로 지정