Android Kotlin Flows

From bibbleWiki
Revision as of 22:32, 18 March 2025 by Iwiseman (talk | contribs) (The Unit Test)

Introduction

This is a page to capture anything important about Kotlin flows. They are very similar to RxJava.

Simple Example

Note that this is a cold flow, so the code inside it is not executed until something collects it, just as a cold Observable in RxJava does nothing without a subscriber.

 
  val countDownFlow = flow<Int> {
    val startingValue = 10
    var currentValue = startingValue
    emit(currentValue)
    while (currentValue > 0) {
      delay(1000L)
      currentValue--
      emit(currentValue)
    }
  }
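The cold behaviour can be seen in a small standalone sketch. It assumes only kotlinx-coroutines on the classpath; the function name coldFlowDemo and the log strings are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records what happens, in order, so the laziness of a cold flow is visible.
fun coldFlowDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    val numbers = flow {
        // This body runs once per collector, not when the flow is created.
        log += "builder started"
        emit(1)
        emit(2)
    }

    log += "flow created"
    numbers.collect { log += "first collector got $it" }
    numbers.collect { log += "second collector got $it" }
    log
}

fun main() {
    coldFlowDemo().forEach(::println)
}
```

"flow created" is logged before the first "builder started", and the builder runs a second time for the second collector.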

We can log the emitted values in the ViewModel for debugging with

 
    init {
        collectFlow()
    }

    private fun collectFlow() {
        viewModelScope.launch {
            countDownFlow.collect { time ->
                println("Time remaining: $time")
            }
        }
    }

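We can use collectLatest instead of collect. It cancels the handling of the previous value as soon as a new one arrives, so only the latest value is processed to completion.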
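A minimal sketch of that cancellation, assuming a collector that is slower than the emitter. The function name collectLatestDemo and the timings are illustrative, not from the video.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Returns the values whose handling ran to completion.
fun collectLatestDemo(): List<Int> = runBlocking {
    val finished = mutableListOf<Int>()

    // Emits 1, 2, 3 with 100 ms between them.
    val fast = flow {
        for (i in 1..3) {
            emit(i)
            delay(100L)
        }
    }

    fast.collectLatest { value ->
        delay(300L)        // slower than the emitter
        finished += value  // only reached if no newer value arrived meanwhile
    }
    finished
}

fun main() {
    println(collectLatestDemo())
}
```

The handling of 1 and 2 is cancelled before it reaches the last line of the block, so only 3 ends up in the list.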

Flow Operators

Like RxJava there are a billion operators. Here is filter.

 
    private fun collectFlow() {
        viewModelScope.launch {
            countDownFlow.filter {time ->
                time % 2 == 0
            }.collect { time ->
                println("Time Remaining: $time")
            }
        }
    }

Popular ones covered in the video are

  • filter
  • map
  • onEach (for debugging; the RxJava equivalent is doOnNext)
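These three chain together in the usual way. The sketch below is standalone (no ViewModel); the function name operatorChainDemo is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun operatorChainDemo(): List<String> = runBlocking {
    (1..6).asFlow()
        .onEach { println("upstream emitted $it") } // side effect only, value passes through
        .filter { it % 2 == 0 }                     // keep even numbers
        .map { "value ${it * it}" }                 // transform each remaining value
        .toList()
}

fun main() {
    println(operatorChainDemo())
}
```

onEach sees all six values, while only the squares of 2, 4 and 6 reach the resulting list.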

Terminal Flow Operators

  • count (the number of items, returned once the flow completes)
  • reduce
  • fold (same as reduce, but with an initial value)
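A short sketch of the three terminal operators on the same input. The function name terminalOperatorsDemo is illustrative only.

```kotlin
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Returns (count of even values, sum via reduce, sum via fold starting at 100).
fun terminalOperatorsDemo(): Triple<Int, Int, Int> = runBlocking {
    val numbers = flowOf(1, 2, 3, 4, 5)

    val evens = numbers.count { it % 2 == 0 }                  // counts matching items
    val sum = numbers.reduce { acc, value -> acc + value }     // starts from the first item
    val sumFrom100 = numbers.fold(100) { acc, value -> acc + value } // starts from 100

    Triple(evens, sum, sumFrom100)
}

fun main() {
    println(terminalOperatorsDemo())
}
```

Each terminal operator collects the cold flow again from the start, which is why the same numbers value can be reused three times.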

Flattening Operators

  • flatMap (the RxJava name; on a Flow use one of the variants below)
  • flatMapConcat
  • flatMapMerge
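The difference between the two Flow variants is ordering. The sketch below is an illustration only: the helper steps and its delays are made up, and the opt-in annotations are included because these operators are not yet stable API.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical inner flow: the flow for 1 is slow, the flow for 2 is fast.
fun steps(n: Int): Flow<String> = flow {
    emit("$n-start")
    delay(if (n == 1) 200L else 50L)
    emit("$n-end")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun flatteningDemo(): Pair<List<String>, List<String>> = runBlocking {
    // flatMapConcat finishes each inner flow before starting the next one.
    val concat = flowOf(1, 2).flatMapConcat { steps(it) }.toList()
    // flatMapMerge collects the inner flows concurrently, so results interleave.
    val merged = flowOf(1, 2).flatMapMerge { steps(it) }.toList()
    concat to merged
}

fun main() {
    val (concat, merged) = flatteningDemo()
    println("concat: $concat")
    println("merged: $merged")
}
```

With flatMapConcat the output keeps the order of the outer flow; with flatMapMerge the fast inner flow finishes before the slow one.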

Other Operators with no name

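  • buffer (runs the emitting code in a separate coroutine from the collector, so a slow collector does not hold up the emits)
  • conflate (like buffer, but keeps only the most recent value, so a slow collector skips the ones in between)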
  • collectLatest
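A rough comparison of buffer and conflate with a deliberately slow collector. The helper names ticks and slowCollect and the timings are made up for this sketch; exactly which values conflate skips depends on timing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Emits 1..5, one value every 100 ms.
fun ticks(): Flow<Int> = flow {
    for (i in 1..5) {
        emit(i)
        delay(100L)
    }
}

// Collects with a slow collector (250 ms per value) and returns what it saw.
suspend fun slowCollect(source: Flow<Int>): List<Int> {
    val seen = mutableListOf<Int>()
    source.collect { value ->
        delay(250L)
        seen += value
    }
    return seen
}

fun bufferVsConflateDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val buffered = slowCollect(ticks().buffer())    // every value is kept and delivered
    val conflated = slowCollect(ticks().conflate()) // intermediate values are dropped
    buffered to conflated
}

fun main() {
    val (buffered, conflated) = bufferVsConflateDemo()
    println("buffer:   $buffered")
    println("conflate: $conflated")
}
```

The buffered collector sees all five values, while the conflated one sees the first value, the last value, and fewer in between.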

State Flow and Shared Flow

State Flows are hot flows and are used in the ViewModel to store view state. If the device is rotated, we need something to restore the UI state from. We see this used in the following

 
private val _stateFlow = MutableStateFlow(0)
val stateFlow = _stateFlow.asStateFlow()

fun increase() {
  _stateFlow.value += 1
}

A State Flow holds state you need to keep: it always has a current value, and a new collector receives that value straight away. Shared Flows are for one-time events.
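The difference shows up when a collector arrives late. This is a minimal sketch with made-up names, assuming a MutableSharedFlow with the default replay of 0 (unlike the replay = 5 used in the ViewModel further down).

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns what a late collector gets from each kind of flow.
fun stateVsSharedDemo(): Pair<Int, Int?> = runBlocking {
    val state = MutableStateFlow(0)
    val events = MutableSharedFlow<Int>() // replay = 0 by default

    // Both are updated before anybody is collecting.
    state.value = 42
    events.emit(42) // no subscribers and no replay: the value is dropped

    val fromState = state.first()                               // current value is still there
    val fromShared = withTimeoutOrNull(200L) { events.first() } // nothing to receive, times out
    fromState to fromShared
}

fun main() {
    println(stateVsSharedDemo())
}
```

The late collector still reads 42 from the state flow but gets nothing from the shared flow, which is the behaviour you want for one-time events such as navigation or a snackbar.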

Unit Testing Flows

Set Up

The libraries used in the video were

  • app.cash.turbine:turbine
  • com.google.truth:truth
  • org.jetbrains.kotlinx:kotlinx-coroutines-test
 
    testImplementation(libs.turbine) // 1.2.0
    testImplementation(libs.truth) // 1.4.4
    testImplementation(libs.kotlinx.coroutines.test) // 1.10.1

Overview

The first thing to mention is that dependency injection is what makes this unit testing possible.

This was a bit of a challenge, but I got it working in the end. The main thrust of the test was to demonstrate how to cope with tests where delay is used and how to manage time. I suspect my approach is probably dated, but it worked for me. I will break this down into

  • What I am testing
  • The Dispatchers
  • The Unit Test

What I am testing

Here is the ViewModel I will be testing, taken from the YouTube video [here].

 
class FlowyViewModel(
    private val dispatchers: DispatcherProvider
) : ViewModel() {

    val countDownFlow = flow<Int> {
        val startingValue = 5
        var currentValue = startingValue
        emit(startingValue)
        while (currentValue > 0) {
            delay(1000L)
            currentValue--
            emit(currentValue)
        }
    }.flowOn(dispatchers.main)

    private val _stateFlow = MutableStateFlow(0)
    val stateFlow = _stateFlow.asStateFlow()

    private val _sharedFlow = MutableSharedFlow<Int>(replay = 5)
    val sharedFlow = _sharedFlow.asSharedFlow()

    init {
        squareNumber(3)
        viewModelScope.launch(dispatchers.main) {
            sharedFlow.collect {
                delay(2000L)
                println("FIRST FLOW: The received number is $it")
            }
        }
        viewModelScope.launch(dispatchers.main) {
            sharedFlow.collect {
                delay(3000L)
                println("SECOND FLOW: The received number is $it")
            }
        }
    }

    fun squareNumber(number: Int) {
        viewModelScope.launch(dispatchers.main) {
            _sharedFlow.emit(number * number)
        }
    }

    fun incrementCounter() {
        _stateFlow.value += 1
    }

    private fun collectFlow() {
        val flow = flow {
            delay(250L)
            emit("Appetizer")
            delay(1000L)
            emit("Main dish")
            delay(100L)
            emit("Dessert")
        }
        viewModelScope.launch {
            flow.onEach {
                println("FLOW: $it is delivered")
            }
                .collectLatest {
                    println("FLOW: Now eating $it")
                    delay(1500L)
                    println("FLOW: Finished eating $it")
                }
        }
    }
}

The Dispatchers

To be able to control the delays in a test, we need to be able to swap the dispatchers. So we create an interface

 
interface DispatcherProvider {
    val main: CoroutineDispatcher
    val io: CoroutineDispatcher
    val default: CoroutineDispatcher
}

These are injected into the ViewModel and can therefore be swapped out for unit testing. In the example, Philipp uses the TestCoroutineDispatcher, which is deprecated now in 2024.3.1. Instead you have to use a replacement such as the UnconfinedTestDispatcher.

 
@ExperimentalCoroutinesApi
class DispatcherProviderTest: DispatcherProvider {
    val testDispatcher = UnconfinedTestDispatcher()
    override val main: CoroutineDispatcher
        get() = testDispatcher
    override val io: CoroutineDispatcher
        get() = testDispatcher
    override val default: CoroutineDispatcher
        get() = testDispatcher
}
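For completeness, the production side would need an implementation that hands out the real dispatchers. The page does not show one, so the class name DefaultDispatcherProvider below is an assumption, and the interface is repeated only to keep the sketch self-contained.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers

// Same interface as above, repeated so this sketch compiles on its own.
interface DispatcherProvider {
    val main: CoroutineDispatcher
    val io: CoroutineDispatcher
    val default: CoroutineDispatcher
}

// Hypothetical production implementation: simply exposes the standard dispatchers.
class DefaultDispatcherProvider : DispatcherProvider {
    override val main: CoroutineDispatcher
        get() = Dispatchers.Main
    override val io: CoroutineDispatcher
        get() = Dispatchers.IO
    override val default: CoroutineDispatcher
        get() = Dispatchers.Default
}
```

The app would construct the ViewModel with this class (typically through whatever dependency injection is in use), while the tests pass in DispatcherProviderTest instead.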

The Unit Test

I thought this would be a straight swap, but it wasn't. The original code runs the coroutines with runBlocking, which does not provide advanceTimeBy, so we need to use runTest instead. The other thing, and the reason I do not feel I have solved this completely, is that I had to set the main dispatcher to the testDispatcher, because the first test exercises the ViewModel, which specifically uses the main dispatcher. Not doing so causes the test to fail.

My suspicion is that the test runs on a different dispatcher from the one injected into the ViewModel, and that this causes the issue

 
@ExperimentalCoroutinesApi
class MainViewModelTest {

    private lateinit var viewModel: FlowyViewModel
    private lateinit var testDispatchers: DispatcherProviderTest

    @Before
    fun setUp() {
        testDispatchers = DispatcherProviderTest()
        viewModel = FlowyViewModel(testDispatchers)
        Dispatchers.setMain(testDispatchers.testDispatcher)
    }

    @After
    fun cleanup() {
        Dispatchers.resetMain()    
    }

    @Test
    fun `countDownFlow, properly counts down from 5 to 0`() = runTest {

        val job = launch {
            viewModel.countDownFlow.test {
                for (i in 5 downTo 0) {
                    // testDispatchers.testDispatcher.advanceTimeBy(1000L)
                    advanceTimeBy(1000L)
                    val emission = awaitItem()
                    assertThat(emission).isEqualTo(i)
                }
                cancelAndConsumeRemainingEvents()
            }
        }
        job.join()
        job.cancel()
    }

    @Test
    fun `squareNumber, number properly squared`() = runTest {
        val job = launch {
            viewModel.sharedFlow.test {
                val emission = awaitItem()
                assertThat(emission).isEqualTo(9)
                cancelAndConsumeRemainingEvents()
            }
        }
        viewModel.squareNumber(3)
        job.join()
        job.cancel()
    }
}

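Finally, I did some more reading, and it is how the test is launched that causes the issue. By default runTest creates its own test dispatcher and scheduler, so advanceTimeBy moves a different virtual clock from the one used by the dispatcher injected into the ViewModel. We can fix this properly by passing the test dispatcher to runTest, e.g.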

 
    @Test
    fun `countDownFlow, properly counts down from 5 to 0`() = runTest(testDispatchers.testDispatcher) {

        val job = launch {
...

Now we can remove Dispatchers.setMain from @Before and Dispatchers.resetMain from @After.
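The effect of sharing one dispatcher between runTest and the code under test can be sketched without the ViewModel or Turbine. This is only an illustration under those assumptions: the function name countDownInVirtualTime is made up, and the flow is a copy of countDownFlow.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Returns the collected values and the virtual time (in ms) that elapsed.
@OptIn(ExperimentalCoroutinesApi::class)
fun countDownInVirtualTime(): Pair<List<Int>, Long> {
    val testDispatcher = UnconfinedTestDispatcher()
    lateinit var result: Pair<List<Int>, Long>

    // Passing the dispatcher means runTest and the flow share one virtual clock.
    runTest(testDispatcher) {
        val countDown = flow {
            var currentValue = 5
            emit(currentValue)
            while (currentValue > 0) {
                delay(1000L) // skipped in virtual time, no real waiting
                currentValue--
                emit(currentValue)
            }
        }.flowOn(testDispatcher)

        val items = countDown.toList()
        result = items to currentTime
    }
    return result
}

fun main() {
    println(countDownInVirtualTime())
}
```

The five one-second delays are counted on the virtual clock, so the call returns almost immediately in real time.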