본문 바로가기

[RxKotlin] ReactiveX #4 - 연산자 'Operator'

ReactiveX #4 - 연산자 'Operator'

 

RxKotlin 이전 글

  1. ReactiveX #1 - 'ReactiveX 란?'
  2. ReactiveX #2 - Observable
  3. ReactiveX #3 - Subject

 

 

연산자 'Operator'

    • RxKotlin에도 Collection과 유사한 연산자들이 존재

      가장 친숙한 연산자인 filter 또는 map을 비롯하여 다양한 연산자를 제공
    • Collection 연산자와 차이점

      • 입력 - Observable 또는 flowable
      • 출력 - Observable 또는 flowable
    • 생성 연산자 : create, interval, timer, range, fromXXX
    • 변환 연산자 : filter, map, concatMap, flatMap, switchMap, reduce, scan
    • 결합 연산자 : zip, zipWith, combineLatest, merge, concat
    • 조건 연산자 : amd, takeUntil, skipUntil, all
    • Util Function : delay, timeInterval

 

생성 연산자

 

[RxKotlin] ReactiveX #2 - Observable

Observable (구독 대상자) Contents Observable 의미 Observable 주요 이벤트 (Callback) : onSubscribe(), onNext(), onError(), onComplete() Observable 연산자(Operators) : 생성 - create(), just(), range(..

jaejong.tistory.com

 

 

 

변환 연산자

  • filter() : Observable에서 원하는 데이터만 골라 발행하는 함수
  • map() : 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수
  • concatMap() : 먼저 들어온 데이터 순서대로 처리를 보장하는 함수 (순서보장)
  • flatMap() : concatMap()과 다르게 순서를 보장하지 않는 함수 (순서보장 X)
  • switchMap() : 순서를 보장하지만 중간에 발행되면 기존에 진행 중이던 작업을 바로 중단, 새로운 값 처리
  • reduce() : 발행한 데이터를 모두 사용하여 최종 결과 데이터를 합성할 때 사용
  • scan() : 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 발행

 

각 함수들의 설명/사용방법은 접은글로 접어놨습니다.

각 함수들에 보이는 "예시보기"를 누르면 펼쳐집니다


filter() 함수

: Observable의 발행 데이터에서 원하는 데이터만 걸러내는 함수

더보기
val items = arrayOf("item 1", "item 2", "value 3", "item 4", "value 5")

Observable.fromArray(*items)
    .filter { item -> item.startsWith("value") }
    .subscribe { println("$it") }

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

value 3
value 5


: 5개의 요소가 들어있는 String 배열에서 value로 시작하는 item만 배출하는 filter함수

출력 결과는 item으로 시작하는 요소를 제외한 value 3, value 5만 출력을 볼 수 있습니다

 

 


filter()와 비슷하게 이용할 수 있는 함수들

  • first(default) : Observable의 첫 번째 값만 발행, 만약 값이 없으면 default값 발행
  • last(default) : Observable의 마지막 값만 발행, 만약 값이 없으면 default값 발행
  • take(N) : Observable의 최초 N개의 값을 발행
  • takeLast(N) : Observable의 마지막 N개 값을 발행
  • skip(N) : Observable의 최초 N개 값을 제외하고 발행
  • skipLast(N) : Observable의 마지막 N개 값을 제외하고 발행

 


map() 함수

: 입력값을 어떤 함수에 넣어서 원하는 값변환하는 함수

더보기

 

val numbers = arrayOf(1, 2, 3, 4, 5)

Observable.fromArray(*numbers)
    .map { item -> item * 10 }
    .subscribe { println("$it")}

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

10
20
30
40
50


: int배열 numbers의 값(1,2,3,4,5)를 각각 10씩 곱해서 새로운 값으로 배출한 결과

10,20,30,40,50을 확인할 수 있습니다

 

 

  • map()flatMap() / concatMap() 차이점

    • map() - 단일 값 반환(return)
    • flatMap() / concatMap() - Sub Observable 반환(return)

 


concatMap() 함수

: 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장 (순서 보장)

더보기
val balls = arrayOf("1", "3", "5")

// interval(), 0.1초마다 값을 발행하는 Observable생성
Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }				// interval은 Long객체를 0부터 발행, long -> int 변환 
    .map { idx -> balls[idx] }			// map(), balls[index]로 배열 내 값으로 변환
    .take(balls.size.toLong())			// take(), balls배열의 크기만큼만 발행 설정
    .concatMap { ball ->			// concatMap(), Sub Observable반환
        Observable.interval(200L, TimeUnit.MILLISECONDS)	// 0.2초마다 값 발행
            .map { "$ball<>" }			// map(), 값 변환 ball -> ball<> 
            .take(2)				// take(), 처음 2개만 발행 설정 (interval은 무제한 발행이기 때문에)
    }
    .subscribe(System.out::println)		// subscriber 구독설정

sleep(2000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1<>
1<>
3<>
3<>
5<>
5<>


: concatMap()의 각 Sub Observable의 값 발행을 보면 0.2초 간격으로 $ball<>값을 발행 설정

interval은 시간간격으로 무제한 값을 발행하기 때문에 take(2)로 2개의 값만 발행하게 설정

즉, concatMap으로 생성되는 각 Sub Observable은 1<>1<> / 3<>3<> / 5<>5<> 이렇게 2개의 값을 발행해야 합니다

결과를 보면 3개의 Sub Observable이 순서보장받으면서 출력되는 것을 확인 가능  

 


flatMap() 함수

: concatMap()과 달리 순서를 보장하지 않습니다

더보기

: map()과 비슷하지만 return타입Observable로 반환되기 때문에 여러개결과를 반환 가능

val balls = arrayOf("1", "3", "5")

// interval(), 0.1초마다 값을 발행하는 Observable생성
Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }				// interval은 Long객체를 0부터 발행, long -> int 변환 
    .map { idx -> balls[idx] }		// map(), balls[index]로 배열 내 값으로 변환
    .take(balls.size.toLong())		// take(), balls배열의 크기만큼만 발행 설정
    .flatMap { ball ->				// flatMap(), Sub Observable반환
        Observable.interval(200L, TimeUnit.MILLISECONDS)	// 0.2초마다 값을 발행하는 Observable 생성
            .map { "$ball<>" }	// map(), 값 변환 ball -> ball<> 
            .take(2)			// take(), 처음 2개만 발행 설정 (interval은 무제한 발행이기 때문에)
    }
    .subscribe(System.out::println)	// subscriber 구독설정

sleep(2000)

    
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1<>
3<>
5<>
1<>
3<>
5<>


: flatMap()은 Observable을 return하기 때문에 2개의 값을 발행하는 Observable을 생성하였고

concatMap()은 각 Sub Observable의 순서보장받으면서 출력이 되었는데

flatMap()은 각 Sub Observable이 순서보장받지 못하면서 출력하는것을 확인할 수 있습니다

 


switchMap() 함수

: 순서 보장은 concatMap()과 비슷하지만,

switchMap()은 중간에 발행되면 기존에 진행 중이던 작업을 바로 중단하고 그 때 발행된 값처리하는 함수 

더보기

: 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 유용한 함수

중간에 끊기더라도 마지막 데이터의 처리는 보장되기 때문에

 

val balls = arrayOf("1", "3", "5")

Observable.interval(100L, TimeUnit.MILLISECONDS)
    .map { it.toInt() }
    .map { idx -> balls[idx] }
    .take(balls.size.toLong())
    .switchMap { ball ->
        Observable.interval(200L, TimeUnit.MILLISECONDS)
            .map { "$ball<>" }
            .take(2)
    }
    .subscribe(System.out::println)

sleep(2000)
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

5<>
5<>


: 위 flatMap(), concatMap()의 예시 코드와 동일한 예시입니다

그런데 출력 결과를 보면 3개의 Sub Observable중 마지막 Sub Observable의 값만 출력

이유는 Main Observable이 0.1초 간격으로 3개의 Sub Observable을 발행합니다

그리고 각 Sub Observable은 0.2초 간격으로 값을 발행합니다

"1"의 값에 해당하는 1번째 Sub Observable이 값 발행까지 0.2초 대기 중에 

"2"의 값에 해당하는 2번째 Sub Observable이 생성되었기 때문에 1번째 Sub Observable은 중단

다시 2번째 Sub Observable이 값 발행까지 0.2초 대기 중

"3"의 값에 해당하는 3번째 Sub Observable이 생성되었기 때문에 2번째 Sub Observable은 중단

다시 3번째 Sub Observable은 값 발행까지 0.2초 대기 중에 다른 값의 발행이 없어서

3번째 Sub Observable은 제대로 값을 발행할 수 있습니다

 


reduce() 함수

: 발행한 데이터를 모두 사용하여 최종 결과 데이터를 합성할 때 이용

더보기
val arrays = arrayOf("1", "2", "3", "4", "5")

Observable.fromArray(*arrays)
    .reduce { prev, next -> "$next($prev)" }
    .subscribe(System.out::println)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

5(4(3(2(1))))


: prev는 이전 값, next는 현재 값을 의미합니다

arrays의 마지막 값까지 더해진 total값을 한번 발행합니다

 


scan() 함수

: reduce() 함수는 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터를 발행

scan() 함수는 실행할 때 마다 입력값에 맞는 중간 결과최종 결과 데이터를 발행

더보기
val arrays = arrayOf("1", "2", "3", "4", "5")

Observable.fromArray(*arrays)
    .scan { prev, next -> "$next($prev)" }
    .subscribe(System.out::println)
    
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1
2(1)
3(2(1))
4(3(2(1)))
5(4(3(2(1))))


: reduce()는 최종 마지막 값만 발행해주며, scan()은 각 중간 값들도 발행해주는 차이점이 존재

 

 

결합 연산자

  • 생성 연산자와 변환 연산자는 1개의 Observable(=데이터 흐름)을 대상으로 하고,

    결합 연산자는 n개의 Observable을 다루는 연산자
  • zip() : 입력 Observable에서 데이터를 모두 새로 발행했을 때 그것을 결합
  • zipWith() : zip()와 동일하지만, Observable을 다양한 함수와 조합하면서 틈틈히 호출
  • combineLatest() : 처음 각 Observable에서 데이터를 발행한 후에는 어디에서 값을 발행하던 최신 값 갱신
  • merge() : 최신 데이터 여부와 상관없이 각 Observable에서 발행하는 값 그대로 출력 (가장 단순)
  • concat() : 입력된 Observable을 Observable 단위로 이어 붙여 결합

 


zip() 함수

: 2개 이상의 Observable을 결합할 때 사용

더보기

: 만약 한쪽의 Observable에서 처리가 안된다면 모두 처리가 될 때까지 발행을 대기

val shapes = listOf("BALL", "PENTAGON", "STAR")
val coloredTriangles = listOf("2-T", "6-T", "4-T")

// shapes, coloredtriangles 두 개의 Observable 결합
Observable.zip(
	// zip()의 1번째 파라미터(Observable)
    Observable.fromIterable(shapes).map {		
        when (it) {
            "HEXAGON" -> return@map "-H"		// return@map은 return으로 반환할 값의 위치
            "OCTAGON" -> return@map "-O"
            "RECTANGLE" -> return@map "-R"
            "TRIANGLE" -> return@map "-T"
            "DIAMOND" -> return@map "<>"
            "PENTAGON" -> return@map "-P"	// 2번째 요소(PENTAGON) 해당
            "STAR" -> return@map "-S"		// 3번째 요소(STAR) 해당
            else -> return@map ""		// 1번째 요소(BALL) 해당
        }
    },
    // zip()의 2번째 파라미터(Observable)
    Observable.fromIterable(coloredTriangles).map {	
        val hyphen = it.indexOf("-")		// 하이픈(-)의 index반환 (1 반환)

        if (hyphen > 0) {					// 하이픈 위치(index)가 0보다 크다면
            return@map it.substring(0, hyphen) // subString으로 나눠서 return@map
        }

        return@map it
    },
    // zip()의 3번째 파라미터(Function), zip으로 새로 발행할 값을 생성하는 함수
    BiFunction<String, String, String> { suffix, color ->  return@BiFunction color + suffix}
).subscribe(System.out::println)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

2
6-P
4-S


BiFunction< 입력타입1, 입력타입2, 리턴타입> 중 입력값하고 리턴값 자료형을
구체적으로 명시해야 합니다

2개의 Observable의 각각의 값에 하나의 zip 발행값으로 출력되는 모습을 확인할 수 있습니다

 


zipWith() 함수

: zip() 함수와 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈히 호출할 수 있는 장점

더보기

 

Observable.zip(
    Observable.just(100, 200, 300),		// 1번째 Observable
    Observable.just(10, 20, 30),		// 2번째 Observable
    BiFunction<Int, Int, Int> {a, b -> a + b})	// <입력type, 입력type, 리턴type> { 리턴식 }
    // zip()의 발행값에 추가로 funtion 더해서 값 발행
    .zipWith(Observable.just(1, 2, 3), BiFunction<Int, Int, Int> { ab, c -> ab + c }) // ab = zip의 Bifunction값, c는 zipwitdh의 Obserable 값
    .subscribe(System.out::println)
    
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

111
222
333


: zipWith()은 zip()이 발행하는 Observable의 값을 추가적으로 함수와 조합하여 다시 발행하는 기능을 합니다

zipWith(Observable.just(1,2,3), Bifunction<int, int, int> { ab, c -> ab + c })

  -> zip()에서 발행하는 Bifunction값에 추가적으로 조합해서 발행하는 함수

     ab = zip()의 BiFunction 리턴값을 의미

     c = zipWith()내부 Observable의 발행 값을 의미

 


combineLatest() 함수

: 2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수

더보기

 

val data1 = listOf("6-T", "7-T", "4-T", "2-T")
val data2 = listOf("DIAMOND", "STAR", "PENTAGON")

Observable.combineLatest(
    Observable.fromIterable(data1)
        .zipWith(
            Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),	// 0.15초 시작, 0.2초 간격
            // 입력(String), 입력(Long), 출력(String)
            BiFunction<String, Long, String> { color, _ ->	// 2번째 파라미터(7라인의 interval값)는 사용X '_' 처리
            
                val hyphen = color.indexOf("-")		// 하이픈의 위치(index) 찾기

                if (hyphen > 0) {
                    return@BiFunction color.substring(0, hyphen)	// 하이픈의 index가 0보다 크면 String 분할
                }

                return@BiFunction color	// return@BiFunction의미는 9번라인 BiFunction의 반환을 의미
            }
        ),
    // CombineLatest의 2번째 파라미터(Observable)는 0.2초 간격으로 값 발행
    Observable.fromIterable(data2)
        .zipWith(
            Observable.interval(200L, TimeUnit.MILLISECONDS),	
            BiFunction<String, Long, String> { shape, _ ->
                when (shape) {
                    "HEXAGON" -> return@BiFunction "-H"
                    "OCTAGON" -> return@BiFunction "-O"
                    "RECTANGLE" -> return@BiFunction "-R"
                    "TRIANGLE" -> return@BiFunction "-T"
                    "DIAMOND" -> return@BiFunction "<>"
                    "PENTAGON" -> return@BiFunction "-P"
                    "STAR" -> return@BiFunction "-S"
                    else -> return@BiFunction ""
                }
            }
        ),
    BiFunction<String, String, String> { color, suffix -> color + suffix }
).subscribe(System.out::println)

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

6<>
7<>
7-S
4-S
4-P
2-P


: combineLatest() 3개의 파라미터 (Observable, Observable, BiFunction)

2개의 Observable이 각각 값이 변경되었을 때 갱신(발행)해주는 예시로

1번째 Observable은 시작시간 150ms설정, 그 후 간격 발행간격 200ms 설정

2번째 Observable은 200ms 간격으로 발행 설정

즉, 2개의 Observable은 서로 발행하는 간격이 다르게 됩니다

1번째 Observable발행 값 "6" / 2번째 Observable의 발행 값 "<>"의 상태에서

1번째 Observable이 새로운 값 "7"을 발행하면, 2번째 Observable의 값"<>"에 새로 갱신해서 출력

 


merge() 함수

: 가장 단순한 결합 함수, 아무 관여없이 업스트림에서 먼저 입력되는 데이터를 그대로 발행

더보기
val data1 = listOf("1", "10")
val data2 = listOf("one", "two", "three", "four")

// 1st Observable 생성
val source1 = Observable.interval(0, 100, TimeUnit.MILLISECONDS)	// 시작시간 0ms, 간격 100ms
    .map { it.toInt() }			
    .map { idx -> data1[idx] }	// data1 List 값으로 변경
    .take(data1.size.toLong())	// data1의 크기만큼 발행

// 2st Observable 생성
val source2 = Observable.interval(50, TimeUnit.MILLISECONDS)		// 간격 50ms
    .map { it.toInt() }
    .map { idx -> data2[idx] }	// data2 List 값으로 변경
    .take(data2.size.toLong())	// data2의 크기만큼 발행

// merge(), Observable 생성
Observable.merge(source1, source2)
    .subscribe(System.out::println)

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1
one
10
two
three
four


: merge()는 아무런 관여없이 업스트림(source1, source2)의 발행값을 발행 즉시 전달해주는 함수

업스트림(Observable)들의 발행 개수는 서로 달라도 상관없는 걸 확인할 수 있다

 


concat() 함수

: 2개 이상의 Observable을 이어 붙여주는 함수

onComplete()가 발생해야 그 다음 Observable을 구독한다는 점이 중요

더보기

 

val data1 = listOf("1", "3", "5")
val data2 = listOf("2", "3", "6")

val source1 = Observable.fromIterable(data1)
    .doOnComplete { println("source1 onComplete()") }	// onComplete() 호출 시, Callback되는 함수 doOnComplete()

val source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)	// 100ms 간격으로 발행
    .map { it.toInt() }
    .map { idx -> data2[idx] }
    .take(data2.size.toLong())
    .doOnComplete { println("source2 onComplete()") }	

Observable.concat(source1, source2)
    .doOnComplete { println("concat onComplete()") }	// onComplete() 호출 시, Callback 함수
    .subscribe(System.out::println)

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1
3
5
source1 onComplete()
2
3
6
source2 onComplete()
concat onComplete()


: onComplete()가 호출되는 지 확인하기 위해 Observable에 doOnComplete()함수를 사용

doOnComplete()는 Observer에 onComplete()이벤트가 발생 시 Callback되는 함수

concat()은 2개 이상의 Observable을 단순하게 차례대로 결합하는데,

앞의 Observable이 onComplete()를 호출하지 않는다면 뒤의 Observable은 결합되지 않는 점

 

 

조건 연산자

  • Observable의 흐름을 제어하는 연산자
  • amb() : 여러 Observable 중 어느 것이든 먼저 나오는 Observable을 채택
  • takeUntil(other) : other Observable에서 데이터가 발행하기 전까지만 현재 Observable 값 발행
  • skipUntil(other) : other Observable에서 데이터가 발행될 때 까지 현재 Observable 발행 값 무시
  • all() : Observable에 입력되는 값이 모두 특정 조건에 맞을 때만 true를 발행

 


amb() 함수

: 여러개의 Observable 중에서 가장 먼저 데이터를 발행하는 Observable을 선택하는 연산자

더보기
val data1 = listOf("김루피", "최상디", "박조로")
val data2 = listOf("신짱구", "신짱아", "훈이")

val sources = listOf(
    // Observable 1
    Observable.fromIterable(data1)
    	.delay(200L, TimeUnit.MILLISECONDS)		// 200ms 간격
        .doOnComplete { println("Observable #1 : onComplete()") },
    // Observable 2
    Observable.fromIterable(data2)		
        .delay(100L, TimeUnit.MILLISECONDS)		// 100ms 간격
        .doOnComplete { println("Observable #2 : onComplete()") })

// Observable.amb() 
Observable.amb<String>(sources)
    .doOnComplete { println("Result : onComplete()") }
    .subscribe(System.out::println)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

신짱구
신짱아
훈이
Observable #2 : onComplete()
Result : onComplete()


: 2개의 List를 1개의 List<Observable>로 결합한 뒤에 amb() 파라미터로 전달

1번째 Observable은 200ms 간격으로 값을 발행

2번째 Observable은 100ms 간격으로 값을 발행

2개의 Observable 중 처음으로 먼저 값을 발행하는건 2번째의 Observable이기 때문에

amb()에서 2번째의 Observable이 선택되어 1번째 Observable은 무시된다

 


takeUntil() 함수

: 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료

즉, take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable이 값을 발행하는지에 판단

더보기
val list = listOf("1", "2", "3", "4", "5", "6")

Observable.fromIterable(list)
    .zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction<String, Long, String> { data, _ -> data })
    .takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS))	// 발행 중지의 기준이 되는 Observable
    .subscribe(System.out::println)

sleep(1000)


ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

1
2
3
4


: 예시를 보면 takeUntil()에 timer로 생성한 Observable이 파라미터로 등록.

takeUntil에 전달된 Observable이 값을 발행하는 순간이 기존 Observable의 발행 중지 시점이 됨

500ms가 지나면 값을 발행하기 때문에 500ms동안 값을 발행할 수 있는 의미

 


skipUntil() 함수

: takeUntil() 함수의 반대로 현재 Observable의 발행을 무시하다가,

설정한 Observable이 발행하면 그 때 발행하기 시작

더보기

 

val list = listOf("1", "2", "3", "4", "5", "6")

Observable.fromIterable(list)
    .zipWith(
        Observable.interval(100L, TimeUnit.MILLISECONDS),
        BiFunction<String, Long, String> {data, _ -> data})
    .skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println)

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

5
6


: takeUntil() 과 반대로 skipUntil()에 넘겨진 Observable이 값을 발행하는 시점이,

기존 Observable의 값이 발행되는 시점이 된다

 


all() 함수

: 주어진 조건이 모두 만족할때만 발행하는 연산자

더보기
val list = listOf("1", "2", "3", "4")

Observable.fromIterable(list)
    .map {
        when {
            it.endsWith("-H") -> return@map "HEXAGON"
            it.endsWith("-H") -> return@map "OCTAGON"
            it.endsWith("-H") -> return@map "RECTANGLE"
            it.endsWith("-H") -> return@map "TRIANGLE"
            it.endsWith("<>") -> return@map "DIAMOND"
            it.endsWith("-P") -> return@map "PENTAGON"
            it.endsWith("-S") -> return@map "STAR"
            else -> return@map "BALL"			// 모두 BALL 리턴
        }
    }
    .all {
        it == "BALL"
    }
    .subscribe { result ->
        println(result!!)
    }

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

true


: Observable이 발행할 값에 all()의 조건이 모두 true이면 subscribe의 값을 발행 

Observable의 인자 (1,2,3,4,5)가 map의 when조건에서 모두 해당하지 않아 "BALL"을 return(반환)

all()의 파라미터를 보면 전달된 값(it)이 "BALL"인지를 검사

5개의 인자가 모두 True에 성립할 경우 subscribe()의 result가 출력

 

Util Function (유틸리티 함수)

  • delay() : Observable의 발행을 지연시키는 함수
  • timeInterval() : 이전 발행 데이터와 현재 발행 데이터의 발행시간 차이를 갖는 Timed 객체를 발행하는 함수

 


delay() 함수

: 일정시간을 받아서 데이터 발행지연시켜주는 함수 

더보기
val data = listOf("1", "2", "3", "4")

Observable.fromIterable(data)
    .delay(500L, TimeUnit.MILLISECONDS)
    .subscribe(System.out::println)
sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

// 500ms 만큼 지연되고 데이터를 발행
1
2
3
4


: delay() 함수로 500ms 지연값을 설정.

실제로 500ms 지연 된 후 Observable이 값을 발행

 


timeInterval() 함수

: 이전 값을 발행한 이후 시간이 얼마나 흘렸는지를 알려주는 연산자

더보기
val data = listOf("1", "3", "7")

Observable.fromIterable(data)
    .delay {
        sleep(Random().nextInt(100).toLong())
        return@delay Observable.just(it)
    }
    .timeInterval()
    .subscribe(System.out::println)		// Timed 객체가 전달

sleep(1000)

ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과

Timed[time=41, unit=MILLISECONDS, value=1]
Timed[time=17, unit=MILLISECONDS, value=3]
Timed[time=97, unit=MILLISECONDS, value=7]


: 지연되는 시간을 보기 위해 delay() 함수를 사용해 0~100ms랜덤으로 지연시키고,

data리스트의 값을 발행하였습니다

timeInterval() 연산자를 사용하면 subscribe()의 전달되는 값은 Timed 객체로 전달됩니다.

timed객체는 time, unit, value 이렇게 3개의 속성(프로퍼티)를 갖고있으며

time은 이전 값과 지금 발행된 값의 시간 간격을 갖고있고,

unit은 시간단위를 갖고있으며

value는 Observable에서 발행한 값을 갖고있습니다