코루틴 #4 - CoroutineBuilder와 ScopeBuilder
launch / async / withContext / runBlocking / actor / produce
Coroutine 이전 글
Scope
Scope는 코루틴이 실행되는 범위
GlobalScope는 Application이 종료될 때 까지 존재하는 코루틴
(Activity가 종료되어도 존재하는 코루틴이므로 구분해서 사용해야 함)
CoroutineBuilder
CoroutineScope의 확장함수로, 다양한 요구사항에 맞게 개별적인 Coroutine(코루틴)을 만드는 방법
Coroutine Builder에는 여러 종류 존재 ( launch / async / withContext / runBlocking / actor / produce )
launch() -> Job 반환
: 결과가 없는 코루틴을 생성하는 빌더, 여기서 결과는 반환인스턴스가 아닌 결과값(Value)을 의미
반환하는 Job인스턴스는 생성된 해당 코루틴을 제어하는데 사용
async() -> Deferred<T> 반환
: launch와 다르게 결과를 가지는 코루틴을 생성하는 빌더,
반환하는 Deferred<T>로 코루틴을 제어 / 결과를 받을 수 있는데
결과를 받는다는 의미는 Deferred의 await() 함수를 통해 코루틴 영역의 마지막 라인(결과)를 받을 수 있다
withContext() - T 반환
: async처럼 결과값을 반환하는 빌더로 async와 유사하지만 다른점이 존재한다
async는 반환하는 Deferred<T> 객체로 결과값을 원하는 시점에 await()함수를 통해 결과값을 얻지만,
withContext()는 Deferred<T>객체로 반환하지 않고, 결과(T)를 그 자리에서 반납한다
( async의 Deferred<T>는 지연객체라고 생각하면 좋다. 내가 원하는 시점에 await()으로 결과를 받기에 - 지연 )
runBlocking() - T 반환
: runBlocking은 Scope내의 코루틴(루틴)들이 모두 완료할 때 까지 스레드를 점유한다.
그 말은 Main()에서 CoroutineScope로 코루틴을 생성/실행 할 경우 점유하지 않기 때문에,
Main()은 코루틴이 실행 중임에도 본인의 함수는 끝나고 종료된다.
fun main() {
GlobalScope.launch { // 1
println("Coroutine Running") // 2
delay(5000) // 4
}
println("test") // 3
}
// test
// main() 종료
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
fun main() = runBlocking {
launch {
println("Coroutine Running")
delay(5000)
}
println("test")
}
// test
// Coroutine Running
// 5초 뒤 main() 종료
윗부분은 GlobalScope로 코루틴을 생성/실행 한 코드입니다
명령어 실행순서는 1-2-3-4 순서로 실행되는데, 왜 test가 먼저 출력되고 끝나는 걸까?
: GlobalScope로 생성된 코루틴 내부의 println("Coroutine Running")이 먼저 실행되지만 스레드 풀에서 인식하고 처리까지 시간이 살짝 걸린다. 그것 때문에 println("test")가 먼저 출력되고 코루틴이 진행 중 임에도 main()함수는 끝나버린다
아래부분은 runBlocking으로 코루틴을 생성/실행 한 코드입니다.
runBlokcing으로 실행하면 내부 코루틴(루틴)들이 종료할 때 까지 runBlocking블록이 끝나지 않고 대기하기 때문에 모두 출력되는걸 볼 수 있다
actor() - SendChannel<E> 반환
: 채널을 통해 코루틴 블럭(Scope)와 외부에서 통신을 통해 전송 / 처리의 루틴을 실행하는 빌더이다.
actor 빌더는 SendChannel<E>를 반환, Send채널을 통해 actor()블록으로 채널을 통해 전송을 할 수 있다.
즉, actor{} 블록 내부는 수신자(Receiver)가 되고 / 반환된 SendChannel이 송신자(Sender)라고 보면 된다
produce() - ReceiveChannel<E> 반환
: produce도 actor와 같이 채널을 통해 전송 / 처리 루틴을 실행하는 빌더이다.
actor와 다른 점은 produce()빌더는 ReceiveChannel<E>를 반환한다.
즉, produce{} 블록 내부는 송신자(Sender)가 되고 / 반환된 ReceiveChannel이 수신자(Receiver)가 된다
가장 일반적으로 사용하는 코루틴 빌더는 launch/async 두 가지를 일반적으로 사용
launch
launch는 새로운 코루틴을 생성하고 해당 코루틴에 대한 Job 객체를 반환
반환된 Job을 가지고 해당 코루틴을 제어 'cancel() / cancelAndJoin() / join() 등'
Job은 비동기 작업을 수행하지만 결과 값을 return하지 않는 래퍼런스 (async의 Deferred<T>와 차이점)
// launch의 함수 들여보기
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext, // context, 스레드영역 설정(Main/IO/Default)
start: CoroutineStart = CoroutineStart.DEFAULT, // start, 코루틴 실행시점 설정(DEFAULT/LAZY/)
block: suspend CoroutineScope.() -> Unit // suspend, 코루틴 실행 블록
): Job
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 사용 예시
// context, start인수 생략 - context인수 생략시 부모의 Dispatchers값 사용
launch { // block은 람다식으로 선언
delay(1000L) // 1초 대기
println("Sample Main Thread Running") // Sample Coroutin 출력
}
// context인수 값을 Dispatchers.IO로 설정 (백그라운드 작업)
launch(Dispatchers.IO) {
delay(1000L)
println("Sample Background Thread Running")
}
async
async는 launch와 비슷하게 코루틴을 만들고 해당 코루틴에 대한 래퍼런스를 받아오는데,
launch - Job / async - Deferred<T>의 형태로 반환
async는 미래의 계산 결고가 예상되는 비동기 작업에 대해 사용, 결과를 얻어서 다른 작업에 사용할 때
async의 Deferred<T>는 join() 대신 await() 함수를 통해 대기하고 결과값을 받아오는 점이 차이점
// async 함수 들여보기
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> // 반환 자료형의 차이, Deferred<T>
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 사용 예시
fun main() = runBlocking<Unit> {
val deferred = async { // async로 코루틴 생성
delay(100) // 0.1초 대기
println("async Start")
"async result" // 해당 코루틴블럭(Deferred)의 await() 호출 시 반환 값
}
println("Test")
val job = GlobalScope.launch { // launch로 코루틴 생성
try {
delay(200) // 0.2초 대기
println("launch Start")
delay(Long.MAX_VALUE) // 딜레이
} finally {
println("launch Cancelled") // 해당 job의 cancel() 취소 시 호출되는 finally
}
}
delay(500) // 0.5초 대기
job.cancelAndJoin() // job 취소, 대기
delay(500) // 0.5초 대기
println(deferred.await()) // deferred의 결과 값 얻기
}
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 호출 순서
Test
async Start
launch Start
launch Cancelled
async result
즉시실행
기본적으로 코루틴 시작은 즉시 실행으로 처리
launch/async 함수 호출 시 start인수가 Default값으로 설정되어 있기 때문에,
실행을 미루고 싶다면 start인수를 CoroutineStart.LAZY로 설정하고 실행을 원할 때 해당 코루틴을 호출
fun main() = runBlocking<Unit> {
val deferred = async(start = CoroutineStart.LAZY) { // LAZY로 start 설정
println("async Start")
"async Result"
}
println("Test")
val job = GlobalScope.launch(start = CoroutineStart.LAZY) { // LAZY로 start 설정
println("launch Start")
}
job.start() // launch는 start() 또는 join()으로 해당 코루틴을 실행
// job.join() // start와 같이 실행시키고 join은 대기까지
// deferred.start() // async는 start() 또는 await()으로 해당 코루틴을 실행
deferred.await() // await도 실행시키고 대기까지
}
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 실행결과
Test
launch Start
async Start
CoroutineStart.LAZY로 설정 시 코루틴 블럭의 실행을 호출 시점까지 대기.
실행을 위한 호출은 start() 또는 launch - join() / async - await() 으로 해당 코루틴을 실행할 수 있는데
start()와 join() / await()의 차이점은 start는 실행만 시키고 종료를 대기하지 않는점이 차이점
start() - non-Blocking function / join(),await() - Blocking function
withContext
async와 동일한 역할을 하는 키워드로, 결과 값을 반환하는 형태
async와 차이점은 async는 결과값을 얻으려면 await()을 호출해야 하지만
withContext는 처음부터 결과 리턴까지 대기하는 형태
suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T (source)
그런 점으로 withContext는 주로 코루틴이 취소(Cancellation)되었을 때 finally{} 블록에서 주로 사용
취소(Cancellation) 할 경우 코루틴은 CancellationException 예외를 발생시킨다.
CancellationException 예외 발생 시, try~finally의 finally{} 블록에서 자원을 반납하거나 예외 처리를 한다
이때, finnally{} 블록에서 이미 코루틴이 취소된 상황이기 때문에 다른 중단함수(suspend) delay() 등을 호출할 수가 없는데, 여기서 launch등 새로운 코루틴을 생성하면 CancellationException 예외를 발생시키므로 중단함수를 사용할 방법이 없는 점을 해결할 수 있는게,
withContext + NonCancellable을 사용하는 것이다.
withContext는 async와 다르게 내부 코루틴 블록이 끝나고 결과값 반환까지 종료하지 않는다.
이런 점을 사용해 finally{} 블록에서 자원의 반납 등 중단함수를 사용해서 코루틴을 마무리하는데 사용
withContext 사용 예시 1
fun main(args: Array<String>) = runBlocking {
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
delay(1000)
println("main : I'm running finally!")
}
}
}
delay(1300L)
println("main : I'm tired of waiting!")
job.cancelAndJoin()
println("main : Now I can quit.")
}
18라인에서 cancel() 호출 시 repeat()을 실행하던 코루틴에 CancellationException이 발생하고
finally{}으로 이동해서 delay(1000) 1초 대기 후 println까지 마친 다음에 코루틴이 마무리된다.
왜 withContext + NonCancellable을 사용하면 코루틴이 안죽고 마무리 될까?
의문점은 NonCancellable 구조를 보면 이해가 된다
public object NonCancellable : AbstractCoroutineContextElement(Job), Job {
/**
* Always returns `true`.
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
override val isActive: Boolean get() = true
/**
* Always returns `false`.
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
override val isCompleted: Boolean get() = false
/**
* Always returns `false`.
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
override val isCancelled: Boolean get() = false
// ... and more...
}
구조에서 중요하게 봐야할 점은 isActive(), isCancelled()를 보면 이해가 된다
NonCancellable은 isActive 속성이 항상 true를 반환하도록 구현이 되어있는데,
이 말은 해당 코루틴이 Completed(종료)될 때 까지 무조건 true 반환으로 취소예외에 영향없이 작업을 수행한다
withContext 사용 예시 2
GlobalScope.launch(Dispatchers.IO) { // 1
val name = withContext(Dispatchers.Main) { // 2
println("Start withContext")
sleep(2000)
"My name is Android"
}
println("withContext end?")
println("Result : $name") // 3
}
1. 해당 코루틴(launch)은 IO 스레드 (워커스레드, 백그라운드 스레드)에서 동작
2. 해당 코루틴(withContext)은 Main 스레드 (UI스레드)에서 동작
3. withContext 코루틴이 완료될 때 까지 실행되지 않음
withContext 코루틴이 종료되면 실행
name변수는 마지막 반환값(My name is Android)의 자료형을 타입추론
async와 비슷한 결과값을 리턴하는 코루틴 빌더지만, 다른점은 await()으로 대기하고 결과를 받아올 필요가 없다
실행 결과
Start withContext
withContext end?
Result : My name is Android
runBlocking
runBlocking은 새로운 코루틴을 시작하고 완료까지 현재 스레드를 차단(점유)
runBlocking이 사용하는 스레드는 호출된 지점의 스레드를 사용하는데 Android에서 MainThread 내부에 선언시 runBlocking은 메인스레드를 차단(점유)하기 때문에 5초이상 작업이 발생할 경우 시스템에 의해 ANR 발생
사용을 권장하지 않는 코루틴 빌더
runBlocking의 사용예시
// GlobalScope 사용예시
fun main(args: Array<String>) {
GlobalScope.launch {
println("launch Start")
var job = launch {
delay(5000) // 5초 대기
println("1")
}
job.join() // GlobalScope코루틴의 자식인 job의 종료까지 대기, joint()
}
Thread.sleep(1000) // GlobalScope 실행을 위해 메인스레드 잠시 대기
println("2") // 2 출력
}
// 2
// launch Start
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// runBlocking 사용예시
fun main() = runBlocking<Unit> {
var job = launch { // runBlocking의 자식 코루틴 생성
delay(5000)
println("1")
}
println("2") // 2 출력
}
// 2
// 1
GlobalScope를 사용한 윗부분은 job의 5초대기 후 1을 출력하지 못하고 main()함수가 끝나버림
runBlocking을 사용한 아래부분은 5초 대기 후 1까지 출력을 마치고 main()함수가 종료.
runBlocking은 코루틴이 모두 종료할 때 까지 스레드를 차단(점유)하기 때문에 main()함수가 종료하지 않음
actor
actor는 coroutine의 복합체로 구성 (코루틴 + 코루틴 내부로 캡슐화된 속성값 + 통신가능한 채널)
속성(변수)는 사용된 코루틴 안에서만 유효하고, 다른 Coroutine과는 Channel로 통신하는 방식
actor는 간단히 사용할 때는 function으로 사용하지만, 복잡한 속성(State)을 가지면 class로 사용하는게 적합
actor ?
코루틴 빌더 중 하나, MailBox Channel(메일박스 채널)을 스코프와 결합한 형태
- SendChannel(송신채널)을 반환 하여 스코프 외부에서 채널을 통해 송신(offer)
- 스코프 내부에서 SendChannel이 보낸 메시지를 수신(receive) 후 처리
actor 빌더 구조
@ObsoleteCoroutinesApi
fun <E> CoroutineScope.actor(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> // SendChannel을 반환
CoroutineScope.actor 형태를 보면 actor() 함수는 Scope의 확장함수라는 걸 알 수 있으며,
내부 속성(프로퍼티)를 살펴보면 다른 코루틴 빌더와 다른 점을 찾아볼 수 있다.
1) capacity : 통신하는 채널의 버퍼크기를 의미, 기본 설정값이 0인 것은, 버퍼를 갖지 않는다는 것을 의미
즉, 반환 되는 SendChannel과 ActorScope내부의 ReceiveChannel이 1:1 순서로 통신이 이루어진다
(전송-수신, 전송-수신) Send채널이 전송하고 Receive채널이 수신할 때 까지 Send채널은 대기한다는 의미
2) block : suspend ActorScope<E>.() : 다른 빌더들과는 다른 ActorScope<E>타입,
내부에 CoroutineScope와 ReceiveChannel<E>를 모두 구현하는 인터페이스
ReceiveChannel를 구현함으로써 블록 내부에서 Channel의 수신(receive)을 직접 호출이 가능
@ObsoleteCoroutinesApi
interface ActorScope<E> : CoroutineScope, ReceiveChannel<E>
actor는 Thread-Handler 방식과 유사하게 생각하면 조금 수월하게 이해할 수 있습니다.
연산작업은 Background 처리, 결과표시는 Handler가 수행
Handler에 던진 message는 순차적으로 처리되는데 여기서 Handler의 역할이 Actor라고 생각하면 됩니다.
동기화 이슈가 존재하는 속성값들은 actor가 가지고 있으면서, 속성 값 변경이 필요할 때 actor에게 요청하는 방식
actor는 Channel을 이용하기 때문에 순차적 실행을 보장하면서 동시성으로 인한 오류 방지가 가능
Actor란?
액터는 '스레드' 혹은 '객체'와 구별되는 추상이다.
액터가 차지하는 메모리 공간은 어느 다른 쓰레드 혹은 액터가 접근할 수 없다.
액터 내부에서 일어나는 일은 어느 누구와도 '공유' 되지 않는다는 말
Actor는 서로간에 공유하는 자원이 없고, 서로간의 상태에 접근할 수 없다.
오직 Message만을 이용해서 상태(정보)를 전달하는 방식
-> deadLock '데드락'의 방지
actor 사용예시 1
// actor{} 빌더는 SendChannel을 반환
var sendChannel = actor {
var count = 0 // actor 로컬 변수
for (msg in channel) { // channel을 순회하는 For-loop문
count++ // 수신마다 count 로컬변수 값 증가
printn("receive : $msg") // 수신마다 출력
}
// channel이 닫히고 for-loop문을 벗어나면 출력
println("result $count")
}
}
// 0.5초 딜레이를 갖고 3번 채널에 send(송신) 반복
repeat(3) {
delay(500)
sendChannel.offer("send $it")
}
send.close() // close() 채널닫기 호출 시 채널을 통해 특별한 Close토큰을 전송
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 결과
receive : send 0
receive : send 1
receive : send 2
result 3
actor 사용예시 2
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlin.system.measureTimeMillis
/*
* Sealed Class CounterMsg 정의
* : Sealed class는 enum class의 확장판 개념 (enum은 상수들의 집합이라면 sealed는 클래스들의 집합)
* : Sealed Class는 모두 abstract(추상) 상태로 사용 시 구현이 필요
* : 내부 클래스는 sealed class를 상속(extends) 필수, CounterMsg()
* */
sealed class CounterMsg
object IncCounter : CounterMsg() // Object - 싱글톤 의미
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // GetCounter클래스 (response 속성을 가짐)
fun main() {
runBlocking {
// 확장함수 counterActor() 호출 -> SendChannnel 반환 (counter = SendChannel)
val counter = counterActor()
GlobalScope.massiveRun { // 확장함수 massiveRun(), 인수가 action 하나뿐이라 소괄호'()' 생략 후 람다식으로 표현
counter.send(IncCounter) // massiveRun함수 내부에서 action() 호출 시 실행될 부분
}
// massiveRun이 끝난뒤 실행
val response = CompletableDeferred<Int>() // CompletableDeferred<Int>란 지연가능한 실행객체로 Int를 반환한다는 의미
counter.send(GetCounter(response)) // 채널을 통해 GetCounter(response)를 전송
println("Counter = ${response.await()}") // 결과값 대기 await()
counter.close() // 채널(Channel) 닫기, close() 호출하면 채널을 통해 특별한 닫힘토큰이 전송됨
}
}
// CoroutineScope 확장함수 선언, actor() 빌더 사용으로 SendChannel 인스턴스를 반환
fun CoroutineScope.counterActor() = actor<CounterMsg> {
// actor 블록 내에 속성들은 로컬(지역) 변수로 사용됨
var counter = 0
// actor{} 블록 내부는 수신자(Receiver) 역할로, 채널 전부 순회를 위한 for-loop문
for (msg in channel) {
// when을 이용해 채널을 통해 전송된 msg가 selaed Class 내부의 어떤 type인지 구분
when (msg) {
is IncCounter -> counter++ // abstract(구현), IncCounter 타입이면 counter++
is GetCounter -> msg.response.complete(counter) // GetCounter의 속성인 response의 값을 반환
// complete() 함수? CompletableDeferred<Int>는 지연실행 가능한 객체로 타입은 Int이다.
// complete()는 실행한다는 의미인데 int값을 반환한다는 걸로 이해하면 된다
}
}
}
// CoroutineScope 확장함수, 인수가 "action :suspend () -> Unit"으로 반환값이 없는 지연(suspend) 고차함수(high-order function)
suspend fun CoroutineScope.massiveRun(action: suspend () -> Unit) {
val n = 100 // 로컬(지역) 변수
val k = 1000
val time = measureTimeMillis {
val jobs = List(n) { // List(100), 100개의 코루틴을 생성 반복
launch {
repeat(k) { // repeat(1000), 코루틴 1개당 1000번 반복 실행
action() // action 고차함수 실행
}
}
}
jobs.forEach { it.join() } // 100개의 코루틴(jobs)를 하나씩 join() 종료 대기
}
println("Completed ${n * k} actions in $time ms") // 코루틴이 모두 종료 되고나서 출력할 내용
}
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 실행 결과
Completed 100000 actions in 937 ms
Counter = 100000
produce
produce는 actor와 비슷하게 채널을 통해 통신을 주고받으며 처리를 하는 빌더
actor는 SendChannel을 반환하지만, produce는 ReceiveChannel을 반환한다
actor 반대로 produce{} 블록이 송신자(Sender) / 반환된 Receiver채널이 수신자(Receiver)가 된다
produce 사용예시 1
/* CoroutineScope 확장함수 produceSquares 선언
* : ReceiveChannel<Int>는 produce의 반환 인스턴스
* : produce{} 블록 내부가 송신자(Sender)가 됨
*/
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
// 1~10까지 배수의 값을 전달
for (i in 1..10) send(i * i)
}
fun main() = runBlocking {
// ReceiverChannel<Int> 인스턴스 반환, squares == ReceiverChannel
val squares = produceSquares()
// for-loop문 역할을 하는 학장함수(consumeEach 사용)
squares.consumeEach { println(it) } // 채널을 통해 받은 값 출력
println("Suqares running Done!")
}
ㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡㅡ
// 실행 결과
1
4
9
16
25
36
49
64
81
100
Suqares running Done!