코루틴 #7 - Flow (플로우)
Coroutine 이전 글
- 코루틴 #1 - 기본
- 코루틴 #2 - CoroutineContext와 CoroutineScope란?
- 코루틴 #3 - suspend Function (중단함수)
- 코루틴 #4 - CoroutineBuilder와 ScopeBuilder
- 코루틴 #5 - Channel (채널)
- 코루틴 #6 - 취소(Cancellation), 예외(Exception), 핸들러(Handler)
Asynchronous Flow
suspending Function(중단 함수)는 비동기로 동작하면서 하나의 값을 반환합니다.
Flow는 비동기로 동작하면서 여러개의 값을 반환하는 Function은 만들 때 사용하는 Builder입니다
다중 값 나타내기 (Repressenting Multiple Values)
다중 값이란 보통 Collection(컬렉션)을 통해 나타내는 값들
ex) Collection - Array, List, Map, Hash, Set, ArrayList 등
예를 들어 3개의 수를 요소로 갖는 리스트(List)를 반환하는 foo() 함수가 있다면,
반환되는 List를 forEach() 함수로 리스트 내 모든 요소들을 순회할 수 있습니다.
컬렉션 사용 예시
fun foo(): List<Int> = listOf(1, 2, 3)
fun main() {
foo().forEach { value -> println(value) }
}
출력 결과
1
2
3
기본적으로 쓰는 방식의 컬렉션으로 순회하는 예시입니다.
List<Int> 컬렉션을 함수의 반환 타입으로 사용한다는 것은,
foo()함수의 연산이 모두 종료되고 나서 한번에 List를 반환하고 그 List를 사용해야 한다는 점입니다
동일 구조를 Flow로 작성한 예시
fun foo(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
foo().collect { value -> println(value) }
}
Flow로 작성한 위 코드는 launch 코루틴과 flow연산이 각각 따로 실행되고 출력됩니다.
출력 결과
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
결과를 보면 launch내 반복출력과 flow를 활용한 연산이 동시에 이루어지고 있는걸 확인할 수 있습니다.
- Flow 타입의 생성은 flow{ } 빌더를 사용
- Flow{ ... } 블록 내부 코드는 중단(suspend) 가능
- foo() 함수는 더 이상 suspend로 마킹되지 않음
- 결과 값들은 flow에서 emit() 함수를 사용해 방출
- flow에서 방출된 값은 collect 함수를 이용해 수집
foo() 함수 내부의 코드는 foo().collect 함수를 호출 시 실행된다는 점을 기억해야 합니다.
flow { } 블록 내부에서 emit() 메소드는 collect { } 블록으로 값을 전송(방출) 한다는 점.
Flows are cold
Flow는 Sequnce와 동일하게 콜드 스트림입니다.
즉, Flow{ } 빌더의 코드블럭은 플로우가 수집(Collect)되기 전까지 실행되지 않는다는 것을 의미
콜드스트림 이해를 위한 예시
// flow 반환 함수
fun foo(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i) // collect에 값 방출
}
}
fun main() = runBlocking<Unit> {
println("Calling foo...")
val flow = foo()
println("Calling collect...")
flow.collect { value -> println(value) } // flow 수집 1
println("Calling collect again...")
flow.collect { value -> println(value) } // flow 수집 2
}
출력 결과
Calling foo...
Calling collect...
Flow started // collect 수집 (foo() 함수 실행)
1
2
3
Calling collect again...
Flow started // collect 수집 (foo() 함수 실행)
1
2
3
결과를 보면 FFlow를 반환하는 foo() 함수가 suspend로 표시되지 않는 중요한 점입니다.
foo() 함수는 호출 시 바로 반환되며 어떤 무엇도 기다리지 않습니다.
그리고 플로우는 매번 수집(collect) 될 때 마다 새로 시작하게 됩니다
플로우 빌더 (Flow Builder)
앞에서 본 flow{ } 사용 예시는 가장 기본적인 플로우 빌더로, 그 이외의 다양한 빌더들을 설명
- flowOf { } - 고정된 값들을 방출하는 플로우를 정의
- .asFlow () - 다른 Collection/Sequence들을 -> Flow로 변환
ex) List / Map / Array -> Flow로 변환
플로우로 1~3까지 출력하는 예시
// (1..3)는 Int형 배열 의미
(1..3).asFlow().collect { value -> println(value) }
출력 결과
1
2
3
(1..3)은 1,2,3의 Int형 배열 생성을 의미합니다.
.asFlow()는 다른 컬렉션/시퀸스를 Flow로 변환, (int배열(1..3)을 -> Flow로 변환)
collect { }는 수집의 개념으로 Flow의 각각 값(1,2,3)을 처리하는 코드 블럭
플로우 중간 연산자 (Intermediate flow operators)
플로우는 컬렉션과 시퀸스처럼 중간 연산자를 사용해 변환이 가능합니다.
- 중간 연산자는 업스트림 플로우에 적용되어 다운스트림 플로우를 반환
- 중간 연산자도 마찬가지로 플로우와 같이 콜드(Cold) 타입으로 동작
- 중간 연산자는 suspend(중단)함수가 아니기에 새롭게 변환된 플로우를 즉시 반환 (지연x)
- Collection/Sequence와 차이점은 중간 연산자(map, filter등) 블록 내부에서 suspend(중단)함수 호출 가능
중간 연산자(map)을 사용해 플로우를 원하는 값으로 매핑하여 반환하는 예시
suspend fun performRequest(request: Int): String {
delay(1000) // 1초 대기
return "response $request" // 플로우 값 매핑 "request -> response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // .asFlow() - 배열 -> Flow
.map { request -> performRequest(request) } // 대상 플로우를 매핑해서 새로운 플로우로 반환
.collect { response -> println(response) } // map으로 반환된 새 플로우에 대한 수집(collect)
}
출력 결과
response 1
response 2
response 3
1,2,3의 배열을 Flow로 변환 후 중간 연산자(map)를 이용해서 매핑 후 새로운 Flow를 반환
Flow요소들 매핑 (1,2,3 -> response 1, response2, response3)
새로 반환받은 플로우에 대해 수집(collect) 처리 (println)
변환 연산자 (Transform operator)
플로우 변환 연산자들 중에서 가장 일반적인 연산자는 transform 연산자입니다.
transform 연산자는 중간 연산자(map, filter) 같이 단순한 변환 보다 복잡한 변환을 처리위한 연산자입니다.
복잡한 변환 ? 임의의 횟수나 임의의 값으로 변환 하는것을 의미
예시로 위에서 봤던 map과 달리 복잡한 변환 후 출력
suspend fun performRequest(request: Int): String {
delay(1000) // 1초 대기
return "response $request" // 플로우 값 매핑 "request -> response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // 배열 -> Flow 변환
.transform { request -> // 변환 연산자(transform)
emit("Making request $request") // emit() - flow 방출
emit(performRequest(request)) // emit() - flow 방출
}.collect { response -> println(response) } // flow 수집
}
출력 결과
Making request 1 // Flow 요소 1
response 1
Making request 2 // Flow 요소 2
response 2
Making request 3 // Flow 요소 3
response 3
중간 연산자(map)은 요소마다 1개의 변환밖에 하지 못하지만
변환 연산자(transform)은 예시처럼 emit()을 추가하여 요소마다 여러개의 변환이 가능하게 해줍니다.
플로우 종단 연산자 (Terminal flow operator)
종단 연산자 ? 플로우 수집을 시작하는 종단 함수 (대표적으로 collect)
대표적으로 사용하는 종단 연산자가 collect { }가 있는데, 그 외에도 유용한 종단 연산자가 존재합니다
Collection 변환 종단 연산자
- toList 또는 toSet : Flow를 Mutable Collection으로 변환
- first : 첫 번째 원소를 반환하고 나머지는 Cancel, 첫 번째 요소만 처리할 때
- reduce : 첫 번째 원소부터 주어진 operation을 이용하여 누적시키면서 최종값을 반환
- fold : 초기 값을 입력받아 주어진 operation을 이용하여 누적시키면서 최종값을 반환
- collectIndexed : collect와 같은 동작이지만 index 요소가 추가되어 원하는 index에 맞는 처리가 가능
reduce
첫 번째 원소부터 주어진 operation을 이용하여 누적시키면서 최종값을 반환
fun main(args: Array<String>) = runBlocking<Unit> {
val sum = (1..5).asFlow()
.reduce { accumulator, value -> accumulator + value }
println(sum)
}
출력 결과
accumulator [1] value [2]
accumulator [3] value [3]
accumulator [6] value [4]
accumulator [10] value [5]
result [15]
reduce 연산자의 기본 프로퍼티를 보면 accumulator, value 두 종류가 존재합니다.
value는 순차적으로 각각 요소(1,2,3,4,5)가 되며,
accumulator는 reduce 블록 내 로직 반환값(위에서는 accumulator + value의 값)이 됩니다.
즉, 이전 reduce 연산 결과값과 현재 요소를 뜻하게 되며 위 예시는 요소(1,2,3,4,5)를 모두 더한 결과를 반환해줍니다.
Flow의 요소 (1,2,3,4,5)가 모두 더해진 값 15가 출력됩니다.
fold
초기 값을 입력받아 주어진 operation을 이용하여 누적시키면서 최종값을 반환
fun main(args: Array<String>) = runBlocking<Unit> {
val sum = (1..5).asFlow()
.fold(10) { acc, value ->
println("acc [$acc] value [$value]")
acc + value
}
println("result [$sum]")
}
출력 결과
acc [10] value [1]
acc [11] value [2]
acc [13] value [3]
acc [16] value [4]
acc [20] value [5]
result [25]
reduce처럼 누적시키면서 결과 값을 반환하는 건 비슷하지만 조금 다릅니다.
reduce는 첫 연산의 대상이 첫번째 요소를 기준으로 더해서 결과 값을 반환 하지만,
fold는 첫 연산의 대상을 미리 정해주어서 그 요소를 기준으로 더해서 결과 값을 반환 합니다.
reduce와 fold의 차이점을 쉽게 이해 하기위해 모두 실행마다 출력을 찍어봤습니다
- reduce는 5개의 요소지만 4번의 operation이 실행됩니다. (첫 번째 요소 생략)
- fold는 5개의 요소 모두 5번의 operation이 실행됩니다. (첫 번째 요소 포함)
두 연산자 모두 각 operation의 결과를 누적시키는 점은 같지만 첫 번째 시작점의 차이를 이해하시면 됩니다
collectIndexed
collect와 같은 동작이지만 index 요소가 추가되어서 원하는 index에 맞는 처리가 가능하게 됩니다.
fun main(args: Array<String>) = runBlocking<Unit> {
(1..5).asFlow()
.collectIndexed { index, value ->
if (index == 1)
println(value)
}
}
출력 결과
2
collectIndexed 요소에 index도 추가되어서 인덱스에 맞게 처리를 가능하게 해줍니다.
index는 배열처럼 0부터 시작하기 때문에 index 1은 2번째 요소를 가리켜 2를 출력합니다.
다중 플로우 합성 (Composing multiple flows)
zip
두 개의 플로우들의 값(요소)들을 병합하는 연산자
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
출력 결과
1 -> one
2 -> two
3 -> three
zip 연산자의 요소 a,b는 각각 a=nums 플로우 / b=strs 플로우의 요소들이 됩니다.
만약 두 개의 플로우 요소의 크기가 다르다면? : 두 개의 플로우 중 크기가 작은 플로우에 맞춰서 출력
val nums = (1..5).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
nums 플로우는 총 5개의 요소, strs 플로우는 총 3개의 요소 -> 새로운 플로우는 3개의 요소로 맞춰짐
출력 결과는 위의 예시와 동일합니다.
1 -> one
2 -> two
3 -> three