문제
4번 문제는 StateFlow와 SharedFlow에 대한 문제이다
// file: "문제 4번.kt"
package roomescape
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import roomescape.assertion.assertHashcode
class Step4 {
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `StateFlow와 SharedFlow`() = runTest {
// given
val actual: StringBuilder = StringBuilder()
val a = MutableStateFlow(1)
val b = MutableStateFlow(true)
val c = MutableSharedFlow<Boolean>()
// when
val collectorJob = launch {
a
.flatMapLatest { b.filter { it } }
.flatMapLatest { c.filter { it } }
.onEach { actual.append(it) }
.collect()
}
val emitterJob = launch {
delay(100)
c.emit(true)
b.value = false
a.value = 10
c.emit(false)
b.value = true
a.value = 5
}
emitterJob.join()
collectorJob.cancelAndJoin()
// then
val expected = "" // TODO: 결과값 예상
/*
TODO: 간단한 풀이과정 작성
*/
// assert문 수정하지 마세요!
assertHashcode(actual, expected)
}
}
개념 정리
우선 SharedFlow와 StateFlow에 대해서 간단하게 알고 가자.
Flow 하면 대표적으로 Cold Stream과 Hot Stream에 대한 이야기가 나오게 된다.
Cold Stream, HotStream
간단하게 정리하면,
구분 ColdStream HotStream 데이터가 생성되는 위치 내부 외부 발행한 데이터를 여러명이 동시에 수신 가능 여부 X O 데이터 생산 시점 소비자가 소비 시작 시 소비자가 있든 말든 신경 X 이다.
Flow는 Cold Stream이지만 StateFlow와 StateFlow는 Hot Stream에 해당된다.StateFlow, SharedFlow
역시 간단하게 StateFlow와 SharedFlow의 차이를 알아보자
구분 StateFlow SharedFlow 초기값 필요 여부 O X replay(collect 이전 시점의 값) 설정 가능 여부 X O replay 기본 값 1 0 주 목적 UI 등 최신 상태 보유하기 위해 특정 이벤트 트리거 등을 위해 여기서 주의할 건 StateFlow의 경우 replay 설정이 되지 않지만, 기본적으로 replay가 1인 것과 동일한 기능을 한다.
즉 collect를 했을 때 StateFlow가 가지고 있던 가장 최신 값을 수신하면서 시작한다.아래는 StateFlow에 대한 설명 일부이다.
StateFlow는 특수 목적의 SharedFlow이다.
StateFlow는 가장 최근의 값을 하나만 다시 재생하며 resetReplayCache를 지원하지 않는다.
StateFlow 파일 중 관련 설명 일부flatMapLatest, filter, onEach 함수
본 문제에서 쓰인 위 함수에 대해 조금은 알아둘 필요가 있을 것 같아 설명을 추가한다.
위 3개의 함수는 모두 중간 연산자로 상위 Flow에서 들어온 값으로 내부의 다른 Flow를 생성하며 이전 내부 Flow가 아직 동작하고 있었을 경우 취소한다.
이들은 Flow를 생성하며, Flow는 Cold Stream이다
중간 연산자라는 것에 주목하자.
이 함수들은 Flow를 구독하는 함수가 아니다.
따라서 구독을 하지 않으면 동작하지 않는다.각각의 함수의 역할은 아래와 같다.
구분 역할 flatMapLatest 상위 Flow에서 들어온 값을 이용해 내부에서 Flow 생성
끝나지 않은 기존 내부 Flow는 취소시킴filter filter의 조건에 맞을 시 값을 내보내는 Flow를 생성함 onEach 값이 들어올 때마다 Side Effect 처리
풀이
이제 코드를 보자.
runTest를 제외하고, 위에서부터 차례대로
초기 값
1을 가지는StateFlow a생성초기 값
true를 가지는StateFlow b생성초기 값을 가지지 않으며,
Boolean타입을 받는SharedFlow c생성구독하는 코드가 들어있는 자식 코루틴
collectorJob 실행a.flatMapLatest로 들어오는 값에 대해b.filter{ it }로 Flow 생성다시 한번
flatMapLatest로4-1번 과정을 통해 들어온 값에 대해서c.filter{ it }으로 Flow 생성이렇게 들어온 값 각각에 대해서
actual에 추가이러한 Flow를 구독하는
collect실행
값을 방출하는 코드가 들어있는 자식 코루틴
emitterJob실행잠시 기다렸다가
SharedFlow c에true값 방출StateFlow b의 상태를false로 변경StateFlow a의 값을10으로 변경SharedFlow c에false값 방출StateFlow a의 값을5로 변경
emitterJob이 끝나기를 기다렸다가, 끝나면 구독하는collectorJob도 종료
앞서 StateFlow 설명에서 collect시 가장 최신 상태를 수신하면서 구독을 시작한다는 것을 참고한다면,
collectJob이 실행되었을 때 a.flatMapLatest가 반환하는 Flow에서 1을 방출한다.
그리고 내부의 b.filter{ it}가 동작하여 b의 값이 true일 경우 true를 방출하는 Flow를 생성하는데, b의 값은 true이므로 true가 방출된다.
방출하는 건 a.flatMapLatest가 아니라 b의 filter가 반환하는 Flow이다.
b는 boolean이므로 1이 아니라 true라는 값이 방출되는 것이다.
이어서 다음 flatMapLatest가 실행되어 들어온 c.filter{ it }이 실행된다.
다만 c는 SharedFlow로 초기 값이 없고, 딱히 상태를 가지지 않으므로 값을 방출하지 않는다.
따라서 부모 Flow에서 값이 방출된 게 없으므로 아래의 onEach는 실행되지 않는다.
그 동안 emitterJob이 실행되고, delay로 일정 시간 기다린 이후, emit을 진행한다. c에 true 값의 이벤트를 방출하게 되면, c의 값이 true가 되어 c.filter { it }에서 true를 방출한다.
그러면 하위 onEach로 true가 들어오게 되고 이를 actual에 넣는다.
그 이후 b.value가 false가 되는데 false이면 filter에 만족하지 않아 flatMapLatest에서 값을 방출하지 않기 때문에,
기존의 2번째 flatMapLatest 내부의 기존 Flow가 취소 및 재생성 되지 않는다.
그 이후 a.value를 10으로 설정하면, a에서 새로운 값을 방출하므로, 맨 위 flatMapLatest의 내부 Flow인 b.filter { it }을 취소하고 다시 시작한다.
b는 false이므로 filter에 통과되지 않아 Flow에서 값을 방출하지 않아 두 번째 flatMapLatest는 내부 Flow c.filter { it }을 그대로 유지한다.
그리고 c에 false를 방출하지만, filter의 조건에 통과되지 않아 값을 방출하지 않는다.
그리고 b.value가 true가 되어 두 번째 flatMapLatest의 내부 Flow가 취소되고, 다시 실행된다.
다만 c는 SharedFlow로 이전 값을 저장하지 않기 때문에 값이 없어 filter가 작동하지 않는다.
그리고 a.value가 5로 다시 바뀌어 첫 번째 flatMapLatest의 내부 Flow가 취소됐다가 다시 실행된다.
이어서 b도 true이므로 두 번째 flatMapLatest의 내부 Flow가 취소됐다가 다시 실행되지만, 마찬가지로 c의 값이 없어 방출하지 않는다.
이렇게 해서 emitterJob의 작업이 모두 끝나게 된다.
정리하면
emitterJob실행 이전에a의 초기 값과b의 초기값을 가지고collectorJob이 실행됨- 첫번째
flatMapLatest까지는 통과했지만, c가 SharedFlow라서 값을 가지고 있지 않아 값을 내부 flow에서 값을 방출하지 않음 - 그러다
emiterJob의 2번째 작업인c.emit(true)로 인해c가true가 되면서 돌아가고 있던c.filter가 반환하는 Flow에서true방출 onEach에서는 내려받은 값(it)을actual에 추가b.value = false가 실행되지만,b.value가 false면 첫 번째flatMapLatest내부 Flow의 조건에 부합하지 않아 값을 방출하지 않음 => 두 번째 flatMapLatest 내부의 Flow가 취소 되지 않고 유지됨a.value=10가 실행되어 첫 번째flatMapLatest의 내부 Flow가 취소되고 재생성되지만,b가false이기 때문에 마찬가지로 두 번째flatMapLatest가 취소 되지 않고 유지됨c에false를 방출하지만 내부 Flow의 조건에 부합하지 않아 값을 방출하지 않음b.value = true가 실행돼 첫번째flatMapLatest내부 Flow의 조건을 만족해 값이 방출되고, 두 번째flatMapLatest내부 Flow가 취소 및 재생성되지만, c는 값이 없으므로filter에 통과되지 않아 Flow에서 값을 방출하지 않음a.value를 바꾸어 첫 번째flatMapLatest내부 Flow가 취소되고, 조건을 만족하여 값을 방출한 뒤, 두 번째flatMapLatest내부 Flow가 취소 및 재생성되지만, 마찬가지로 c는 값이 없어 값을 방출하지 않아onEach가 작동하지 않음
그렇기 때문에 true가 정답이 된다.
번외
만약
emitterJob에서b.value = false와a.value = 10사이에c.emit(true)가 있었다면?b.value가false라면 첫 번째flatMapLatest내부의 Flow에서 조건을 만족하지 않아 값을 방출하지 않는다.
=> 아래의flatMapLatest에서 내부 Flow를 취소 및 재생성 하지 않기 때문에,c.filter{it}은 그대로 존재한다.
=>c가true가 들어오면c.filter{it}의 조건에 부합하므로 값을 방출하고,onEach가 다시 동작하여 정답은truetrue가 된다.만약
collectorJob의 마지막에collect를 해주지 않았다면?onEach등 중간 연산자는 구독을 하는 함수가 아니다.
그리고collect를 하는 대상은onEach에서 반환하는 Flow를 구독하는데,
기본적으로 Flow는Cold Stream이기 때문에, 구독하지 않으면 데이터를 생산하지 않아 아무런 일도 일어나지 않는다.onEach의 Flow가HotStream이라면?stateIn이라는 함수를 통해서 Flow를StateFlow로 만들어HotStream으로 바꿀 수 있다.
onEach { actual.append(it) }.stateIn(this)와 같이 Flow를HotStream으로 바꾼다면collect를 해주지 않더라도 작업이 진행되어, 구독하지 않았음에도true가 정답이 된다.