07. 코루틴 방탈출 4번 문제 풀이

문제

4번 문제는 StateFlowSharedFlow에 대한 문제이다

// file: "문제 4번.kt"
package roomescape

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import roomescape.assertion.assertHashcode

class Step4 {

    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `StateFlow SharedFlow`() = runTest {
        // given
        val actual: StringBuilder = StringBuilder()

        val a = MutableStateFlow(1)
        val b = MutableStateFlow(true)
        val c = MutableSharedFlow<Boolean>()

        // when
        val collectorJob = launch {
            a
                .flatMapLatest { b.filter { it } }
                .flatMapLatest { c.filter { it } }
                .onEach { actual.append(it) }
                .collect()
        }
        val emitterJob = launch {
            delay(100)
            c.emit(true)
            b.value = false
            a.value = 10
            c.emit(false)
            b.value = true
            a.value = 5
        }
        emitterJob.join()
        collectorJob.cancelAndJoin()

        // then
        val expected = "" // TODO: 결과값 예상
        /*
            TODO: 간단한 풀이과정 작성
         */

        // assert문 수정하지 마세요!
        assertHashcode(actual, expected)
    }
}

개념 정리

우선 SharedFlowStateFlow에 대해서 간단하게 알고 가자.
Flow 하면 대표적으로 Cold StreamHot Stream에 대한 이야기가 나오게 된다.

  • Cold Stream, HotStream

    간단하게 정리하면,

    구분ColdStreamHotStream
    데이터가 생성되는 위치내부외부
    발행한 데이터를 여러명이 동시에 수신 가능 여부XO
    데이터 생산 시점소비자가 소비 시작 시소비자가 있든 말든 신경 X

    이다.
    FlowCold Stream이지만 StateFlowStateFlowHot Stream에 해당된다.

  • StateFlow, SharedFlow

    역시 간단하게 StateFlow와 SharedFlow의 차이를 알아보자

    구분StateFlowSharedFlow
    초기값 필요 여부OX
    replay(collect 이전 시점의 값) 설정 가능 여부XO
    replay 기본 값10
    주 목적UI 등 최신 상태 보유하기 위해특정 이벤트 트리거 등을 위해

    여기서 주의할 건 StateFlow의 경우 replay 설정이 되지 않지만, 기본적으로 replay가 1인 것과 동일한 기능을 한다.
    즉 collect를 했을 때 StateFlow가 가지고 있던 가장 최신 값을 수신하면서 시작한다.

    아래는 StateFlow에 대한 설명 일부이다.

    StateFlow는 특수 목적의 SharedFlow이다.
    StateFlow는 가장 최근의 값을 하나만 다시 재생하며 resetReplayCache를 지원하지 않는다.

    StateFlow is SharedFlow StateFlow 파일 중 관련 설명 일부

  • flatMapLatest, filter, onEach 함수

    본 문제에서 쓰인 위 함수에 대해 조금은 알아둘 필요가 있을 것 같아 설명을 추가한다.

    위 3개의 함수는 모두 중간 연산자상위 Flow에서 들어온 값으로 내부의 다른 Flow를 생성하며 이전 내부 Flow가 아직 동작하고 있었을 경우 취소한다.

    이들은 Flow를 생성하며, Flow는 Cold Stream이다

    중간 연산자라는 것에 주목하자.
    이 함수들은 Flow를 구독하는 함수가 아니다.
    따라서 구독을 하지 않으면 동작하지 않는다.

    각각의 함수의 역할은 아래와 같다.

    구분역할
    flatMapLatest상위 Flow에서 들어온 값을 이용해 내부에서 Flow 생성
    끝나지 않은 기존 내부 Flow는 취소시킴
    filterfilter의 조건에 맞을 시 값을 내보내는 Flow를 생성함
    onEach값이 들어올 때마다 Side Effect 처리

풀이

이제 코드를 보자.

runTest를 제외하고, 위에서부터 차례대로

  1. 초기 값 1을 가지는 StateFlow a 생성

  2. 초기 값 true를 가지는 StateFlow b 생성

  3. 초기 값을 가지지 않으며, Boolean 타입을 받는 SharedFlow c 생성

  4. 구독하는 코드가 들어있는 자식 코루틴 collectorJob 실행

    1. a.flatMapLatest로 들어오는 값에 대해 b.filter{ it }로 Flow 생성

    2. 다시 한번 flatMapLatest4-1번 과정을 통해 들어온 값에 대해서 c.filter{ it }으로 Flow 생성

    3. 이렇게 들어온 값 각각에 대해서 actual에 추가

    4. 이러한 Flow를 구독하는 collect 실행

  5. 값을 방출하는 코드가 들어있는 자식 코루틴 emitterJob 실행

    1. 잠시 기다렸다가

    2. SharedFlow ctrue 값 방출

    3. StateFlow b의 상태를 false로 변경

    4. StateFlow a의 값을 10으로 변경

    5. SharedFlow cfalse 값 방출

    6. StateFlow a의 값을 5로 변경

  6. emitterJob이 끝나기를 기다렸다가, 끝나면 구독하는 collectorJob도 종료

앞서 StateFlow 설명에서 collect가장 최신 상태를 수신하면서 구독을 시작한다는 것을 참고한다면,
collectJob이 실행되었을 때 a.flatMapLatest가 반환하는 Flow에서 1을 방출한다.

그리고 내부의 b.filter{ it}가 동작하여 b의 값이 true일 경우 true를 방출하는 Flow를 생성하는데, b의 값은 true이므로 true가 방출된다.

방출하는 건 a.flatMapLatest가 아니라 b의 filter가 반환하는 Flow이다.
bboolean이므로 1이 아니라 true라는 값이 방출되는 것이다.

이어서 다음 flatMapLatest가 실행되어 들어온 c.filter{ it }이 실행된다.
다만 cSharedFlow초기 값이 없고, 딱히 상태를 가지지 않으므로 값을 방출하지 않는다.
따라서 부모 Flow에서 값이 방출된 게 없으므로 아래의 onEach실행되지 않는다.

그 동안 emitterJob이 실행되고, delay로 일정 시간 기다린 이후, emit을 진행한다. ctrue 값의 이벤트를 방출하게 되면, c의 값이 true가 되어 c.filter { it }에서 true를 방출한다.
그러면 하위 onEachtrue가 들어오게 되고 이를 actual에 넣는다.

그 이후 b.valuefalse가 되는데 false이면 filter에 만족하지 않아 flatMapLatest에서 값을 방출하지 않기 때문에,
기존의 2번째 flatMapLatest 내부의 기존 Flow가 취소 및 재생성 되지 않는다.

그 이후 a.value10으로 설정하면, a에서 새로운 값을 방출하므로, 맨 위 flatMapLatest의 내부 Flow인 b.filter { it }을 취소하고 다시 시작한다.
bfalse이므로 filter에 통과되지 않아 Flow에서 값을 방출하지 않아 두 번째 flatMapLatest내부 Flow c.filter { it }을 그대로 유지한다.

그리고 cfalse를 방출하지만, filter의 조건에 통과되지 않아 값을 방출하지 않는다.

그리고 b.valuetrue가 되어 두 번째 flatMapLatest내부 Flow가 취소되고, 다시 실행된다.
다만 cSharedFlow로 이전 값을 저장하지 않기 때문에 값이 없어 filter가 작동하지 않는다.

그리고 a.value5로 다시 바뀌어 첫 번째 flatMapLatest의 내부 Flow가 취소됐다가 다시 실행된다.
이어서 btrue이므로 두 번째 flatMapLatest의 내부 Flow가 취소됐다가 다시 실행되지만, 마찬가지로 c의 값이 없어 방출하지 않는다.

이렇게 해서 emitterJob의 작업이 모두 끝나게 된다.

정리하면

  1. emitterJob 실행 이전에 a의 초기 값과 b의 초기값을 가지고 collectorJob이 실행됨
  2. 첫번째 flatMapLatest까지는 통과했지만, c가 SharedFlow라서 값을 가지고 있지 않아 값을 내부 flow에서 값을 방출하지 않음
  3. 그러다 emiterJob의 2번째 작업인 c.emit(true)로 인해 ctrue가 되면서 돌아가고 있던 c.filter가 반환하는 Flow에서 true 방출
  4. onEach에서는 내려받은 값(it)을 actual에 추가
  5. b.value = false가 실행되지만, b.value가 false면 첫 번째 flatMapLatest 내부 Flow의 조건에 부합하지 않아 값을 방출하지 않음 => 두 번째 flatMapLatest 내부의 Flow가 취소 되지 않고 유지됨
  6. a.value=10가 실행되어 첫 번째 flatMapLatest의 내부 Flow가 취소되고 재생성되지만, bfalse이기 때문에 마찬가지로 두 번째 flatMapLatest가 취소 되지 않고 유지됨
  7. cfalse를 방출하지만 내부 Flow의 조건에 부합하지 않아 값을 방출하지 않음
  8. b.value = true가 실행돼 첫번째 flatMapLatest 내부 Flow의 조건을 만족해 값이 방출되고, 두 번째 flatMapLatest 내부 Flow가 취소 및 재생성되지만, c는 값이 없으므로 filter에 통과되지 않아 Flow에서 값을 방출하지 않음
  9. a.value를 바꾸어 첫 번째 flatMapLatest 내부 Flow가 취소되고, 조건을 만족하여 값을 방출한 뒤, 두 번째 flatMapLatest 내부 Flow가 취소 및 재생성되지만, 마찬가지로 c는 값이 없어 값을 방출하지 않아 onEach가 작동하지 않음

그렇기 때문에 true가 정답이 된다.

번외

  1. 만약 emitterJob에서 b.value = falsea.value = 10 사이에 c.emit(true)가 있었다면?

    b.valuefalse라면 첫 번째 flatMapLatest 내부의 Flow에서 조건을 만족하지 않아 값을 방출하지 않는다.
    => 아래의 flatMapLatest에서 내부 Flow를 취소 및 재생성 하지 않기 때문에, c.filter{it}은 그대로 존재한다.
    => ctrue가 들어오면 c.filter{it}의 조건에 부합하므로 값을 방출하고, onEach가 다시 동작하여 정답은 truetrue가 된다.

  2. 만약 collectorJob의 마지막에 collect를 해주지 않았다면?

    onEach중간 연산자는 구독을 하는 함수가 아니다.
    그리고 collect를 하는 대상onEach에서 반환하는 Flow를 구독하는데,
    기본적으로 Flow는 Cold Stream이기 때문에, 구독하지 않으면 데이터를 생산하지 않아 아무런 일도 일어나지 않는다.

  3. onEach의 Flow가 HotStream이라면?

    stateIn이라는 함수를 통해서 Flow를 StateFlow로 만들어 HotStream으로 바꿀 수 있다.
    onEach { actual.append(it) }.stateIn(this)와 같이 Flow를 HotStream으로 바꾼다면 collect를 해주지 않더라도 작업이 진행되어, 구독하지 않았음에도 true가 정답이 된다.


© 2025. Na2te All rights reserved.