Coroutines in Kotlin: flow

Introduction to flow

The coroutine builders introduced earlier, launch and async, each produce at most one result: launch returns no value and async returns a single value. For more complex scenarios, such as producing multiple values over time, use a flow. In a flow, data moves like water: it is emitted upstream, processed by intermediate operators along the way, and received downstream.
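To make the contrast concrete, here is a minimal sketch. The function names singleValue and manyValues are illustrative and do not come from the original examples. async yields exactly one value, while a flow can emit several.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: async produces exactly one result.
suspend fun singleValue(): Int = coroutineScope {
    async { 42 }.await()
}

// Hypothetical helper: a flow can emit any number of values.
fun manyValues(): Flow<Int> = flow {
    for (i in 1..3) {
        emit(i)
    }
}

fun main() {
    println(runBlocking { singleValue() })          // 42
    println(runBlocking { manyValues().toList() })  // [1, 2, 3]
}
```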

Creating a flow

There are 3 ways to create a flow:

  1. flow{}
  2. flowOf()
  3. asFlow()

flow

Use emit in flow{} to send data.

fun flowEmit() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }
        .filter {
            it > 2
        }
        .map {
            it * 2
        }
        .take(2)
        .collect {
            // 6 8
            println(it)
        }
}

flowOf

flowOf() creates a flow from a fixed set of values passed as variable arguments (vararg). The second half of the example applies the same operators to a List for comparison.

fun flowOfFun() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect {
            // 6 8
            println(it)
        }

    listOf(1, 2, 3, 4, 5)
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach {
            // 6 8
            println(it)
        }
}

asFlow

asFlow() converts a collection such as a List into a flow. toList() goes the other way and collects a flow into a List.

fun flow2list() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        // flow to list
        .toList()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach {
            println(it)
        }

    listOf(1, 2, 3, 4, 5)
        // list as flow
        .asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect {
            println(it)
        }
}

Intermediate operator

After creating a flow, use intermediate operators to process each value it emits. The intermediate operators of a flow are very similar to the operators on List collections.
Common intermediate operators:

  1. filter
  2. map
  3. take

filter

filter takes a predicate. Values that satisfy the predicate are passed downstream; all other values are dropped.

map

map takes a mapping function. Each value is passed to the function, and the result is passed downstream.

take

take takes a non-negative integer n and passes only the first n values downstream.

fun flowEmit() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }
        .filter {
            it > 2
        }
        .map {
            it * 2
        }
        .take(2)
        .collect {
            // 6 8
            println(it)
        }
}
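Although the operators look like their List counterparts, the order of evaluation differs. The sketch below (flowOrder and listOrder are illustrative names, not part of the original) records each lambda call. A flow pushes one value through the whole chain before handling the next, whereas a List finishes each stage for all elements first.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order in which a flow pipeline evaluates its lambdas.
fun flowOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter { log.add("filter $it"); it > 1 }
        .map { log.add("map $it"); it * 2 }
        .collect { log.add("collect $it") }
    log
}

// The same pipeline on a List, for comparison.
fun listOrder(): List<String> {
    val log = mutableListOf<String>()
    listOf(1, 2, 3)
        .filter { log.add("filter $it"); it > 1 }
        .map { log.add("map $it"); it * 2 }
        .forEach { log.add("collect $it") }
    return log
}

fun main() {
    // [filter 1, filter 2, map 2, collect 4, filter 3, map 3, collect 6]
    println(flowOrder())
    // [filter 1, filter 2, filter 3, map 2, map 3, collect 4, collect 6]
    println(listOrder())
}
```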

Terminal operators

collect is a terminal operator of flow. It receives each value after it has passed through the intermediate operators and ends the chain: no intermediate operator can be applied after it.

Besides collect, there are other terminal operators, such as first, single, fold, and reduce.

collect

Collects every emitted value, invoking the given action for each one, and ends the flow. collect itself returns no value.

fun flowEmit() = runBlocking {
    flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }
        .filter {
            it > 2
        }
        .map {
            it * 2
        }
        .take(2)
        .collect {
            // 6 8
            println(it)
        }
}

first

Returns the first element and ends the flow without collecting the remaining elements.

fun flowFirst() = runBlocking {
    val first = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }
        .filter {
            it > 2
        }
        .map {
            it * 2
        }
        .take(2)
        .first()
    // 6
    println(first)
}

single

Returns the only element and ends the flow. The flow must emit exactly one element: single() throws an exception if the flow is empty or if it emits more than one element.

fun flowSingle() = runBlocking {
    val single = flow {
        emit(3)
    }
        .filter {
            it > 2
        }
        .map {
            it * 2
        }
        .take(2)
        .single()
    // 6
    println(single)
}
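The failure cases of single() can be checked with a small sketch. describeSingle is a hypothetical helper. The empty case is caught as NoSuchElementException. The exception type for the "more than one element" case is deliberately caught as a general Exception here, on the assumption that it may differ between library versions.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: describes what single() does for the given values.
fun describeSingle(values: List<Int>): String = runBlocking {
    try {
        "value: ${values.asFlow().single()}"
    } catch (e: NoSuchElementException) {
        // Thrown when the flow emits nothing.
        "empty flow"
    } catch (e: Exception) {
        // Thrown when the flow emits a second element (exact type is an assumption-free catch-all).
        "more than one element"
    }
}

fun main() {
    println(describeSingle(listOf(3)))     // value: 3
    println(describeSingle(emptyList()))   // empty flow
    println(describeSingle(listOf(1, 2)))  // more than one element
}
```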

fold

Folds all elements into a single value. Given an initial value and a function, fold applies the function to the accumulated value and each element in turn, and returns the final result.

fun flowFold() = runBlocking {
    val fold = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.filter {
        it > 2
    }.map {
        it * 2
    }.take(2)
        .fold(0) { acc, value ->
            acc + value
        }

    // 14
    println(fold)
}

reduce

reduce is similar to fold, but it has no initial value: the first element serves as the initial accumulator.

fun flowReduce() = runBlocking {
    val reduce = flow {
        emit(1)
        emit(2)
        emit(3)
        emit(4)
        emit(5)
    }.filter {
        it > 2
    }.map {
        it * 2
    }.take(2)
        .reduce { acc, value ->
            acc + value
        }
    // 14
    println(reduce)
}
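The missing initial value matters when the flow is empty. In the sketch below (sumWithFold and sumWithReduce are illustrative names), fold simply returns its initial value, while reduce has nothing to return. The sketch assumes a recent kotlinx.coroutines version, where reduce is documented to throw NoSuchElementException for an empty flow.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// fold always has a result: the initial value is returned for an empty flow.
fun sumWithFold(values: List<Int>): Int = runBlocking {
    values.asFlow().fold(0) { acc, value -> acc + value }
}

// reduce has no initial value, so an empty flow is an error; map it to null here.
fun sumWithReduce(values: List<Int>): Int? = runBlocking {
    try {
        values.asFlow().reduce { acc, value -> acc + value }
    } catch (e: NoSuchElementException) {
        null
    }
}

fun main() {
    println(sumWithFold(listOf(6, 8)))    // 14
    println(sumWithReduce(listOf(6, 8)))  // 14
    println(sumWithFold(emptyList()))     // 0
    println(sumWithReduce(emptyList()))   // null
}
```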

first, single, fold, and reduce are essentially wrappers around collect, so they are all terminal operators.

Lifecycle

onStart

onStart is the callback invoked when the flow starts. It runs before the upstream flow emits its first value, wherever it appears in the chain relative to operators such as filter and map.

In both of the following functions, onStart is called first.

fun onStartFun() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .filter {
            println("filter: $it")
            it > 2
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart {
            println("onStart")
        }
        .collect {
            println("collect: $it")
        }
}

fun onStartFun2() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .take(2)
        .filter {
            println("filter: $it")
            it > 2
        }
        .map {
            println("map: $it")
            it * 2
        }
        .onStart {
            println("onStart")
        }
        .collect {
            println("collect: $it")
        }
}
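The original functions do not list their output, so here is a small sketch that records the call order in a list instead of printing it. onStartOrder is an illustrative name. Even though onStart is placed after filter, its entry comes first.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order of callbacks when onStart is placed late in the chain.
fun onStartOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter { events.add("filter: $it"); it > 1 }
        .onStart { events.add("onStart") }
        .collect { events.add("collect: $it") }
    events
}

fun main() {
    // [onStart, filter: 1, filter: 2, collect: 2, filter: 3, collect: 3]
    println(onStartOrder())
}
```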

onCompletion

onCompletion is called back once the flow has finished. As with onStart, it does not have to be the last operator in the chain: in the examples here it runs only after the values have been processed.

onCompletion is called when the flow completes normally.

fun onCompleteFun() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .onCompletion {
            println("onCompletion")
        }
        .filter {
            println("filter: $it")
            it > 2
        }
        .take(2)
        .collect {
            println("collect: $it")
        }
}

onCompletion is also called when the flow is cancelled or when an exception is thrown during execution. In both cases the cause is passed to the lambda as its parameter.

fun cancelOnCompleteFun() = runBlocking {
    launch {
        flow {
            emit(1)
            emit(2)
            emit(3)
        }
            // collect: 1
            // collect: 2
            // cancel
            // onCompletion first: kotlinx.coroutines.JobCancellationException
            .onCompletion {
                println("onCompletion first: $it")
            }
            .collect {
                println("collect: $it")
                if (it == 2) {
                    // cancel flow
                    cancel()
                    println("cancel")
                }
            }
    }

    delay(1000)

    flowOf(4, 5, 6)
        // collect: 4
        // onCompletion second: java.lang.IllegalStateException
        .onCompletion {
            println("onCompletion second: $it")
        }
        .collect {
            println("collect: $it")
            throw IllegalStateException()
        }

}

exception handling

Exception handling in a flow distinguishes upstream exceptions from downstream exceptions. Upstream exceptions occur in the flow builder or in an intermediate operator. Downstream exceptions occur in the terminal operator, such as collect.

upstream exception

Upstream exceptions can be caught with the catch operator. catch is position-dependent: it only catches exceptions thrown upstream of where it is placed.

fun flowCatch() = runBlocking {
    val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }
    flow.map {
        it * 2
    }.catch {
        println("catch: $it")
    }.collect {
        println(it)
    }
    //    2
    //    4
    //    catch: java.lang.IllegalStateException
}
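catch can do more than log. Its lambda can also call emit, so a fallback value can be sent downstream when the upstream fails. The following is a minimal sketch of that idea. catchWithFallback and the fallback value -1 are illustrative choices, not part of the original example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Replaces an upstream failure with a fallback value.
fun catchWithFallback(): List<Int> = runBlocking {
    flow {
        emit(1)
        emit(2)
        throw IllegalStateException("upstream failure")
    }
        .map { it * 2 }
        .catch { emit(-1) } // emit a fallback instead of only printing the exception
        .toList()
}

fun main() {
    println(catchWithFallback()) // [2, 4, -1]
}
```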

Downstream exception

Downstream exceptions cannot be caught with the catch operator. Catch them with try-catch inside collect instead.

In the following example, catch does not catch the divide-by-zero exception thrown by filter, because filter is downstream of it.

fun flowCatchDownStream() = runBlocking {
    val flow = flow {
        emit(1)
        emit(2)
        emit(3)
    }

    flow.map {
        it * 2
    }.catch {
        println("catch: $it")
    }.filter {
        it / 0 > 1
    }.collect {
        println(it)
    }
    // Exception in thread "main" java.lang.ArithmeticException: / by zero
}

Use try-catch inside collect to catch exceptions thrown there.

fun flowTryCatch() = runBlocking {
    flowOf(4, 5, 6)
        .onCompletion {
            println("onCompletion second: $it")
        }
        .collect {
            try {
                println("collect: $it")
                throw IllegalStateException()
            } catch (e: Exception) {
                println("catch $e")
            }
        }
    // collect: 4
    // catch java.lang.IllegalStateException
    // collect: 5
    // catch java.lang.IllegalStateException
    // collect: 6
    // catch java.lang.IllegalStateException
    // onCompletion second: null
}
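An alternative to try-catch inside collect is to move the per-value logic into onEach, place catch after it, and call collect() without a lambda. The logic is then upstream of catch and within its reach. This is a sketch of that approach rather than a drop-in replacement for the function above. catchAllDeclaratively is an illustrative name, and unlike the try-catch version the flow stops at the first failure.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Per-value logic lives in onEach, so catch (placed after it) can see its exceptions.
fun catchAllDeclaratively(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    flowOf(4, 5, 6)
        .onEach { value ->
            check(value < 5) { "unexpected value $value" } // throws IllegalStateException for 5
            events.add("processed: $value")
        }
        .catch { e -> events.add("caught: ${e.message}") }
        .collect() // no lambda here, so nothing downstream of catch can throw
    events
}

fun main() {
    println(catchAllDeclaratively()) // [processed: 4, caught: unexpected value 5]
}
```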

thread switching

flowOn

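flowOn specifies the dispatcher, and therefore the threads, on which the operators upstream of it run. It is position-dependent.

In the following example, collect runs on the main thread and everything upstream of flowOn runs on Dispatchers.IO. The log shows DefaultDispatcher-worker threads because Dispatchers.IO shares its threads with Dispatchers.Default.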

fun flowOn() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit: 1")
        emit(2)
        logX("Emit: 2")
        emit(3)
        logX("Emit: 3")
    }

    flow.filter {
        logX("Filter: $it")
        it > 2
    }
        .flowOn(Dispatchers.IO)
        .collect {
            logX("Collect: $it")
        }
//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666096501866
//    ================================
//    ================================
//    Filter: 1
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Filter: 2
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 2
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Filter: 3
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Emit: 3
//    Thread:DefaultDispatcher-worker-1, time:1666096501917
//    ================================
//    ================================
//    Collect: 3
//    Thread:main, time:1666096501917
//    ================================
}
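The examples in this section call logX, which is not defined in this article. Judging only from the output format shown in the comments, a stand-in could look like the sketch below. Both the signature and the formatLog helper are assumptions, not the author's implementation.

```kotlin
// Hypothetical formatter matching the four-line layout seen in the logged output.
fun formatLog(message: Any?): String = """
================================
$message
Thread:${Thread.currentThread().name}, time:${System.currentTimeMillis()}
================================
""".trimIndent()

// Hypothetical stand-in for the article's logX helper.
fun logX(message: Any?) = println(formatLog(message))

fun main() {
    logX("Start")
}
```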

When flowOn is placed before filter, only the flow builder with its emit calls runs on Dispatchers.IO. filter and collect run on the main thread.

fun flowOnIO() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit: 1")
    }

    flow.flowOn(Dispatchers.IO)
        .filter {
            logX("Filter: $it")
            it > 0
        }
        .collect {
            logX("Collect: $it")
        }

//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666165816908
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666165816942
//    ================================
//    ================================
//    Filter: 1
//    Thread:main, time:1666165816944
//    ================================
//    ================================
//    Collect: 1
//    Thread:main, time:1666165816944
//    ================================
}

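flowOn only affects upstream operators. To move the work in collect to another thread you can call withContext inside collect, but this is not recommended.

In the following example, the body of collect runs on Dispatchers.IO (a DefaultDispatcher-worker thread) and everything else runs on the main thread.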

fun flowWithContext() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit: 1")
    }

    flow.filter {
        logX("Filter: $it")
        it > 0
    }
        .collect {
            // not recommended
            withContext(Dispatchers.IO) {
                logX("Collect: $it")
            }
        }

//    ================================
//    Start
//    Thread:main, time:1666167319244
//    ================================
//    ================================
//    Filter: 1
//    Thread:main, time:1666167319297
//    ================================
//    ================================
//    Collect: 1
//    Thread:DefaultDispatcher-worker-2, time:1666167319311
//    ================================
//    ================================
//    Emit: 1
//    Thread:main, time:1666167319312
//    ================================
}

In the following example, the whole flow is wrapped in withContext, so emit, filter, and collect all run on Dispatchers.IO (DefaultDispatcher-worker threads).

fun flowWithContextAll() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit: 1")
    }

    // not recommended
    withContext(Dispatchers.IO) {
        flow.filter {
            logX("Filter: $it")
            it > 0
        }.collect {
            logX("Collect: $it")
        }
    }
//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666167769589
//    ================================
//    ================================
//    Filter: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
//    ================================
//    Collect: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666167769645
//    ================================
}
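A related restriction applies inside the flow builder: calling emit from within withContext is rejected at runtime, which is one reason flowOn is the tool for moving upstream work. The sketch below demonstrates this under the assumption that the violation surfaces as an IllegalStateException. The function names are illustrative.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Emitting from a different context than the collector's is rejected.
fun emitFromOtherContext(): String = runBlocking {
    val broken = flow<Int> {
        withContext(Dispatchers.Default) { emit(1) }
    }
    try {
        broken.collect { }
        "no error"
    } catch (e: IllegalStateException) {
        "rejected"
    }
}

// The supported way to run the builder on another dispatcher.
fun emitWithFlowOn(): List<Int> = runBlocking {
    flow { emit(1) }.flowOn(Dispatchers.Default).toList()
}

fun main() {
    println(emitFromOtherContext()) // rejected
    println(emitWithFlowOn())       // [1]
}
```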

launchIn

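flow provides the launchIn function, which starts collecting the flow in the given CoroutineScope. The dispatcher of that scope determines the thread on which the collection runs.

In the following example, the operators before flowOn run on Dispatchers.IO, and the operators after it run in the scope passed to launchIn.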

fun flowLaunchIn() = runBlocking {
    val flow = flow {
        logX("Start")
        emit(1)
        logX("Emit: 1")
    }

    val scope = CoroutineScope(mySingleDispatcher)

    flow.flowOn(Dispatchers.IO)
        .filter {
            logX("Filter: $it")
            it > 0
        }.onEach {
            logX("Collect: $it")
        }.launchIn(scope)

    delay(100L)

//    ================================
//    Start
//    Thread:DefaultDispatcher-worker-1, time:1666168824669
//    ================================
//    ================================
//    Emit: 1
//    Thread:DefaultDispatcher-worker-1, time:1666168824704
//    ================================
//    ================================
//    Filter: 1
//    Thread:mySingleThread, time:1666168824706
//    ================================
//    ================================
//    Collect: 1
//    Thread:mySingleThread, time:1666168824706
//    ================================

}

launchIn calls launch on the scope and then calls collect inside it, so it is effectively a terminal operator. It returns the Job of the launched coroutine.

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}
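The example above uses mySingleDispatcher, which is not defined in this article. The sketch below builds a comparable single-thread dispatcher (an assumption about how it was created) and uses the Job returned by launchIn to wait for completion instead of delay(100L). collectThreadName is an illustrative name.

```kotlin
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the name of the thread on which the flow was collected.
fun collectThreadName(): String {
    // Hypothetical equivalent of the article's mySingleDispatcher.
    val dispatcher = Executors
        .newSingleThreadExecutor { r -> Thread(r, "mySingleThread") }
        .asCoroutineDispatcher()
    val scope = CoroutineScope(dispatcher)
    var threadName = ""
    try {
        runBlocking {
            val job = flowOf(1)
                .onEach { threadName = Thread.currentThread().name }
                .launchIn(scope)
            job.join() // wait for the collection to finish
        }
    } finally {
        dispatcher.close() // shut down the executor so the JVM can exit
    }
    return threadName
}

fun main() {
    println(collectThreadName()) // mySingleThread
}
```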

flow is cold

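A flow is cold: it only produces data when there is a receiver. If collect is never called, the code in the flow builder, including emit, does not execute. A channel, by contrast, is hot: its producer starts running whether or not there is a receiver.

In the following example, the flow builder never runs because nothing collects the flow, while the channel's producer starts and prints "Before send 1". The producer then stays suspended at send(1), since nothing receives from the channel, so runBlocking does not return on its own.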

fun flowCold() = runBlocking {
    val flow = flow {
        (1..3).forEach {
            println("Before send $it")
            emit(it)
            println("Send $it")
        }
    }

    val channel = produce<Int>(capacity = 0) {
        (1..3).forEach {
            println("Before send $it")
            send(it)
            println("Send $it")
        }
    }

    println("end")
//    end
//    Before send 1

}
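Being cold also means the builder runs again for every collector. The sketch below counts how often the builder block executes. countBuilderRuns is an illustrative name, not part of the original example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the builder run count before collecting, after one collect, and after two.
fun countBuilderRuns(): List<Int> = runBlocking {
    var runs = 0
    val numbers = flow {
        runs++ // executed once per collect call
        emit(1)
    }
    val beforeCollect = runs
    numbers.collect { }
    val afterFirst = runs
    numbers.collect { }
    listOf(beforeCollect, afterFirst, runs)
}

fun main() {
    println(countBuilderRuns()) // [0, 1, 2]
}
```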

Summary

Flow is Kotlin's solution for complex asynchronous scenarios that produce multiple values.

  1. A flow consists of three parts: creation, intermediate operators, and terminal operators.
  2. The lifecycle callbacks of a flow are onStart and onCompletion. Neither has to be placed at a particular position in the chain.
  3. Upstream exceptions are handled with catch, which is position-dependent. Downstream exceptions need try-catch.
  4. Thread switching uses flowOn and launchIn. flowOn controls the upstream operators, and launchIn determines the scope in which the rest of the flow is collected.
  5. A flow is cold and only starts executing when there is a receiver.

Tags: Java Android kotlin

Posted by Deany on Thu, 20 Oct 2022 20:14:48 +0530