Kotlin Coroutines: Difference between revisions

From bibbleWiki
Jump to navigation Jump to search
 
(27 intermediate revisions by the same user not shown)
Line 1: Line 1:
=Moores Law=
=Moores Law=
I am doing this because of this graph
I am doing this because of this graph<br>
[[File:Moores law.png|700px]]
[[File:42-Years-of-Microprocessor.png|700px]]
<br>
<br>
Previously there is fork/join for asynchronous but this code is far more complicated than it probably needs to be
Previously there is fork/join for asynchronous but this code is far more complicated than it probably needs to be
Line 39: Line 39:
}
}
</syntaxhighlight>
</syntaxhighlight>
=Coroutines=
=Coroutines=
==Introduction==
==Introduction==
Line 46: Line 47:
*run
*run
*async
*async
Co-routines are lightweight threads and you can run many more co-routines than threads. They are scheduled onto a thread so they do not necessarily run on the same thread. A delay operation does not stop the thread only the co-routine. e.g.
 
The major advantage: Coroutines are '''not bound to a single thread'''. They may start execution in one, but then resume in another — it should be seen as a “light-weight thread.” That’s why you can easily start 100K coroutines without experiences a heavy performance impact or worse getting a OutOfMemoryException.
<syntaxhighlight lang="kotlin">
<syntaxhighlight lang="kotlin">
...
...
Line 55: Line 57:
</syntaxhighlight>
</syntaxhighlight>
It is very important not to use blocking code in a co-routine. As above delay is non-blocking to the thread but does delay the co-routine whereas Thread.sleep(5000) blocking.
It is very important not to use blocking code in a co-routine. As above delay is non-blocking to the thread but does delay the co-routine whereas Thread.sleep(5000) blocking.
==An Example==
This example show the usage of suspend routines for maybe an order processing.<br><br>
Before we accept a customer’s order, we need to make sure of the following things before accepting the order:
* Customers’ contact details are valid
* not on our blocked list.
* Check if the customer’s already registered.
* Availability check of all items: are they in stock?
* What’s the current workload for our shipment team?
<br>
[[File:Coroutines1.png]]
<syntaxhighlight lang="kotlin">
import kotlinx.coroutines.*
import kotlin.random.Random.Default.nextBoolean
import kotlin.random.Random.Default.nextDouble
import kotlin.system.measureTimeMillis
fun main() {
  val timeSpend = measureTimeMillis {
    runBlocking {
      val canAcceptOrder = (
        listOf(
          async { isOnBlockedList("max@mustermann.de") },
          async { isAlreadyRegistered("max@mustermann.de") },
          async { checkWorkload() < 0.75 }
        ) +
        listOf("A", "B", "C")
          .map { async { isItemInStock(it) } }
      ).map { it.await() }.none { !it }
      println("Order is acceptable? -> $canAcceptOrder")
    }
  }
  println("Time spend: ${timeSpend}ms")
}
suspend fun isOnBlockedList(email: String): Boolean {
  delay(800L)
  val isBlocked = nextBoolean()
  println("IsBlocked? -> $isBlocked")
  return isBlocked
}
suspend fun isAlreadyRegistered(email: String): Boolean {
  delay(750L)
  val isAlreadyRegistered = nextBoolean()
  println("isAlreadyRegistered? -> $isAlreadyRegistered")
  return isAlreadyRegistered
}
suspend fun isItemInStock(id: String): Boolean {
  delay(250L)
  val isItemInStock = nextBoolean()
  println("isItemInStock? -> $isItemInStock")
  return isItemInStock
}
suspend fun checkWorkload(): Double {
  delay(1000L)
  val workLoad = nextDouble()
  println("Workload? -> $workLoad")
  return workLoad
}
</syntaxhighlight>
This is how the code is executed.<br>
[[File:Coroutines2.png]]


==Join ( job.join() ) ==
==Join ( job.join() ) ==
Line 140: Line 207:
==Timeouts withTimeout() and withTimeoutWithNull() ==
==Timeouts withTimeout() and withTimeoutWithNull() ==
These support timeout withTimeout() we have to wrap out co-routine with try catch. With withTimeoutWithNull() we only need to test the job for null to know if we completed.
These support timeout withTimeout() we have to wrap out co-routine with try catch. With withTimeoutWithNull() we only need to test the job for null to know if we completed.
=Contexts=
=Contexts=
==Introduction==
==Introduction==
Line 261: Line 329:
}
}
</syntaxhighlight>
</syntaxhighlight>
=Consumers and Prdoucers=
==Consumers and Producers==
The producer builder provides a way to simplify the producing of data down a channel. There is no wrapping in a subroutine and send can be called directly. To consume the data we can call consumeEach. The code below is far simpler than the code above. Note the use of '''it''' which can be thought of as the iterator.
The producer builder provides a way to simplify the producing of data down a channel. There is no wrapping in a subroutine and send can be called directly. To consume the data we can call consumeEach. The code below is far simpler than the code above. Note the use of '''it''' which can be thought of as the iterator.
<syntaxhighlight lang="kotlin">
<syntaxhighlight lang="kotlin">
Line 277: Line 345:
     squares.consumeEach { println(it) }
     squares.consumeEach { println(it) }
     println("Done!")
     println("Done!")
}
</syntaxhighlight>
==Pipelining==
We can feed a channel into another channel. Here we feed the produce number into the square channel
<syntaxhighlight lang="kotlin">
fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}
fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}
</syntaxhighlight>
==Fan out and Fan in==
We can receive on many consumers from one producer (fan out) and send on many producers (fan in) and receive on one consumer.
==Load Balancing==
Let's create a load balancer
[[File:Kotlin Channel Load Balancing.png|700px]]
<syntaxhighlight lang="kotlin">
data class Work(var x: Long = 0, var y: Long = 0, var z: Long = 0)
val numberOfWorkers = 10
var totalWork = 20
val finish = Channel<Boolean>()
var workersRunning = AtomicInteger()
suspend fun worker(input: Channel<Work>, output: Channel<Work>) {
    workersRunning.getAndIncrement()
    for (w in input) {
        w.z = w.x * w.y
        delay(w.z)
        output.send(w)
    }
    workersRunning.getAndDecrement()
    if(workersRunning.get() === 0)
    {
        output.close()
        println("Closing output")
    }
}
fun run() {
    val input = Channel<Work>()
    val output = Channel<Work>()
    println("Launch workers")
    repeat (numberOfWorkers) {
        launch { worker(input, output) }
    }
    launch { sendLotsOfWork(input) }
    launch { receiveLotsOfResults(output) }
}
suspend fun receiveLotsOfResults(channel: Channel<Work>) {
    println("receiveLotsOfResults start")
    for(work in channel) {
        println("${work.x}*${work.y} = ${work.z}")
    }
    println("receiveLotsOfResults done")
    finish.send(true)
}
suspend fun sendLotsOfWork(input: Channel<Work>) {
    repeat(totalWork) {
        input.send(Work((0L..100).random(), (0L..10).random()))
    }
    println("close input")
    input.close()
}
fun main(args: Array<String>) {
    run()
    runBlocking { finish.receive() }
    println("main done")
}
private object RandomRangeSingleton : Random()
fun ClosedRange<Long>.random() = (RandomRangeSingleton.nextInt((endInclusive.toInt() + 1) - start.toInt()) + start)
</syntaxhighlight>
==Select==
We can use select to read from a number of channels. The select will always favour the first listed channel. Note that reading from a channel which is closed will result in error unless we use the onReceiveOrNull. We can add timeouts with onTimeout.
<syntaxhighlight lang="kotlin">
fun producer1() = produce {
    send("from producer 1")
}
fun producer2() = produce {
    send("from producer 2")
}
suspend fun selector(message1: ReceiveChannel<String>, message2: ReceiveChannel<String>): String =
    select<String> {
        message2.onReceiveOrNull { value ->
            value ?: "Channel 2 is closed"
        }
        message1.onReceiveOrNull { value ->
            value ?: "Channel 1 is closed"
        }
        // onTimeout(100) {
        //  println("Timed out")
        // }
    }
fun main(args: Array<String>) = runBlocking<Unit> {
    val m1 = producer1()
    val m2 = producer2()
    repeat(15) {
        println(selector(m1, m2))
    }
}
</syntaxhighlight>
=Actors=
==Why==
Typically in thread programming we may use the following to protect the state of the data.
*Volatile
*Atomic Types
*Locks
*Thread Confinement
Actor provide a way to manage these problems below are examples on how you might solve these problems when incrementing an integer. The Actor example demonstrates and approach to better organization of code which is not necessarily faster - well not faster but cleaner.
==Example==
===Intro===
An example of the sort of problems we might get is when we try and increment a value share by several thread. Unless we specifically wrap this with Atomic then the value will not be accurate.<br>
<br>
Really liked this demo which shows why actors might be for you. They provided five approaches
*Base counter Gave the '''wrong''' result because we do not manage the increment at all
*Fine Grained Slow because of marshalling to common thread
*Course Grained ok
*Atomic of but slightly slow and Course grained
*Mutex much slower than the others
Result are below
===Code===
<syntaxhighlight lang="kotlin">
// 1. show counter class and explain what run does
// 2. run the base counter and show it doesn't work
// 3. add the finr grained code and show it works but is much slower
// 4. change it to course grained
// 5. Show the mutex code and show how slow it is
// 6. Finally show the 'atomic' code as a solution in thie case
open class Counter {
    private var counter = 0
    open suspend fun increment() {
        counter++
    }
    open var value: Int
        get() = counter
        set(value) {
            counter = value
        }
    suspend fun run(context: CoroutineContext, numberOfJobs: Int, count: Int, action: suspend () -> Unit): Long {
        // action is repeated by each coroutine
        return measureTimeMillis {
            val jobs = List(numberOfJobs) {
                launch(context) {
                    repeat(count) { action() }
                }
            }
            jobs.forEach { it.join() }
        }
    }
}
class AtomicCounter : Counter() {
    var counter = AtomicInteger()
    override suspend fun increment() {
        counter.incrementAndGet()
    }
    override var value: Int
        get() = counter.get()
        set(value) {
            counter.set(value)
        }
}
class MutexCounter : Counter() {
    val mutex = Mutex()
    var counter:Int = 0
    override  suspend fun increment() {
        mutex.withLock {
            counter++
        }
    }
    override var value: Int
        get() = counter
        set(value) {
            counter = value
        }
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = 1000 // number of coroutines to launch
    val count = 1000 // work in each coroutine
    var counter = Counter()
    // warm up code
    counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    counter.value = 0
    var time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Base counter", jobs, count, time, counter)
    counter.value = 0
    // use single thread context - fine grained
    val ctx = newSingleThreadContext("Counter")
    time = counter.run(CommonPool, jobs, count) {
        withContext(ctx) {
            counter.increment()
        }
    }
    logResult("Fine grained", jobs, count, time, counter)
    counter.value = 0
    // use single thread context - course grained
    time = counter.run(ctx, jobs, count) {
        counter.increment()
    }
    logResult("Course grained", jobs, count, time, counter)
    counter = AtomicCounter()
    time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Atomic", jobs, count, time, counter)
    counter = MutexCounter()
    time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Mutex", jobs, count, time, counter)
}
private fun logResult(counterType: String, n: Int, k: Int, time: Long, c: Counter) {
    println("${counterType} completed ${n * k} actions in $time ms")
    println("Counter  : ${c.value}")
}
</syntaxhighlight>
===Result===
*Base counter completed 1000000 actions in 38 ms
Counter  : 229992
*Fine grained completed 1000000 actions in 2941 ms
Counter  : 1000000
*Course grained completed 1000000 actions in 51 ms
Counter  : 1000000
*Atomic completed 1000000 actions in 186 ms
Counter  : 1000000
*Mutex completed 1000000 actions in 1381 ms
Counter  : 1000000
==Creating An Actor==
An Actor consists of three parts
*Coroutine
*State
*Messages
The example below creates three messages. I guess you look at the problem you are trying to solve and create message for each. Sounding like Redux me thinks.
<syntaxhighlight lang="kotlin">
suspend fun run(context: CoroutineContext, numberOfJobs: Int, count: Int, action: suspend () -> Unit): Long {
    // action is repeated by each coroutine
    return measureTimeMillis {
        val jobs = List(numberOfJobs) {
            launch(context) {
                repeat(count) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
}
sealed class CounterMsg
object InitCounter : CounterMsg()
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()
fun counterActor() = actor<CounterMsg> {
    var counter = 0
    for(msg in channel) {
        when(msg) {
            is InitCounter -> counter = 0
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = 100
    val count = 10000
    val counter = counterActor()
    counter.send(InitCounter)
    val time = run(CommonPool, jobs, count) {
        counter.send(IncCounter)
    }
    var response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Completed ${jobs * count} actions in $time ms")
    println("result is ${response.await()}")
}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
</syntaxhighlight>
<br>
Result was
* Completed 1000000 actions in 619 ms
result is 1000000
Actors deal with this. Actors are channels with state
*avoid some of the pitfalls of concurrency
*lighwieght
*directly supported by Kotlin
==Just a Fun Rock Paper Scissors Example==
Just put this here to demonstrate some kotlin using actors.
<syntaxhighlight lang="kotlin">
sealed class Move {
    override fun toString(): String {
        return this.javaClass.simpleName.toString()
    }
}
object Rock : Move()
object Paper : Move()
object Scissors : Move()
sealed class Game
class Start(val response: CompletableDeferred<Int>) : Game()
class Play(val sender: Channel<Game>, val name: String) : Game()
class Throw(val actor: String, val move: Move) : Game()
fun playerActor() = actor<Game> {
    var name: String
    for (msg in channel) {
        when (msg) {
            is Play -> {
                name = msg.name
                val selection = (1..4).random()
                lateinit var move: Move
                when (selection) {
                    1 -> move = Rock
                    2 -> move = Paper
                    3 -> move = Scissors
                }
                msg.sender.send(Throw(name, move))
            }
        }
    }
}
fun coordinatorActor() = actor<Game> {
    lateinit var response: CompletableDeferred<Int>
    val player1 = playerActor()
    val player2 = playerActor()
    for (msg in channel) {
        when (msg) {
            is Start -> {
                response = msg.response
                player1.send(Play(channel, "Player 1"))
                player2.send(Play(channel, "Player 2"))
            }
            is Throw -> {
                val playerA = msg.actor
                val moveA = msg.move
                val msg2 = channel.receive() as Throw
                val playerB = msg2.actor
                val moveB = msg2.move
                announce(playerA, moveA, playerB, moveB)
                response.complete(0)
            }
        }
    }
}
fun announce(playerA: String, moveA: Move, playerB: String, moveB: Move) {
    var awin = false
    print("$playerA -> $moveA, $playerB -> $moveB ")
    if (moveA == moveB) {
        println("Draw")
        return
    }
    when (moveA) {
        is Rock -> {
            when (moveB) {
                is Scissors -> {
                    awin = true
                }
            }
        }
        is Scissors -> {
            when (moveB) {
                is Paper -> {
                    awin = true
                }
            }
        }
        is Paper -> {
            when (moveB) {
                is Rock -> {
                    awin = true
                }
            }
        }
    }
    if (awin) {
        println("$playerA wins")
    } else {
        println("$playerB wins")
    }
}
fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(10) {
        val response = CompletableDeferred<Int>()
        var coord = coordinatorActor()
        coord.send(Start(response))
        response.await()
    }
}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun ClosedRange<Int>.random() = Random().nextInt(endInclusive - start) + start
</syntaxhighlight>
=Using Corountines in the UI=
Need to review this but the approach was to create a suspend function and launch the work from the UI thread. This allows us to cancel because we have a reference to the job.
<syntaxhighlight lang="kotlin">
fun main(args: Array<String>) = launch<CoroutineApp>(args)
class CoroutineApp : App(IntroView::class) {
    override fun start(stage: Stage) {
        stage.width = 200.0
        stage.height = 400.0
        super.start(stage)
    }
}
class IntroView : View() {
    override val root = BorderPane()
    val counter = SimpleIntegerProperty()
    lateinit var counterJob: Job
    init {
        title = "Counter"
        with(root) {
            style {
                padding = box(20.px)
            }
            center {
                vbox(10.0) {
                    alignment = Pos.CENTER
                    label() {
                        bind(counter)
                        style { fontSize = 25.px }
                    }
                    button("Click to increment") {
                    }.setOnAction {
                        // 2. add launch coroutine
                        counterJob = launch(UI) { increment() }
                    }
                    button("Click to cancel") {
                    }.setOnAction {
                        // 2. add launch coroutine
                        launch(UI) { counterJob.cancel() }
                    }
                }
            }
        }
    }
    suspend fun increment() {
        // 1. add delay
        delay(3000)
        counter.value += 1
    }
}
}
</syntaxhighlight>
</syntaxhighlight>

Latest revision as of 04:13, 9 January 2022

Moores Law

I am doing this because of this graph

Previously there is fork/join for asynchronous but this code is far more complicated than it probably needs to be

    override fun compute(): Long {
        return if (high - low <= SEQUENTIAL_THRESHOLD) {
            (low until high)
                    .map { array[it].toLong() }
                    .sum()
        } else {
            val mid = low + (high - low) / 2
            val left = Sum(array, low, mid)
            val right = Sum(array, mid, high)
            left.fork()
            val rightAns = right.compute()
            val leftAns = left.join()
            leftAns + rightAns
        }
    }

Using the suspend approach is far more easier to read

suspend fun compute(array: IntArray, low: Int, high: Int): Long {

//    println("low: $low, high: $high  on ${Thread.currentThread().name}")

    return if (high - low <= SEQUENTIAL_THRESHOLD) {
        (low until high)
                .map { array[it].toLong() }
                .sum()
    } else {
        val mid = low + (high - low) / 2
        val left = async { compute(array, low, mid) }
        val right = compute(array, mid, high)
        return left.await() + right
    }
}

Coroutines

Introduction

There are many co-routine builders (maybe)

  • runBlocking (wait for co-routine to finish used for unit tests)
  • launch non-blocking
  • run
  • async

The major advantage: Coroutines are not bound to a single thread. They may start execution in one, but then resume in another — it should be seen as a “light-weight thread.” That’s why you can easily start 100K coroutines without experiences a heavy performance impact or worse getting a OutOfMemoryException.

...
launch {
   delay(1000)
   println("world")
}

It is very important not to use blocking code in a co-routine. As above delay is non-blocking to the thread but does delay the co-routine whereas Thread.sleep(5000) blocking.

An Example

This example show the usage of suspend routines for maybe an order processing.

Before we accept a customer’s order, we need to make sure of the following things before accepting the order:

  • Customers’ contact details are valid
  • not on our blocked list.
  • Check if the customer’s already registered.
  • Availability check of all items: are they in stock?
  • What’s the current workload for our shipment team?


import kotlinx.coroutines.*
import kotlin.random.Random.Default.nextBoolean
import kotlin.random.Random.Default.nextDouble
import kotlin.system.measureTimeMillis

fun main() {
  val timeSpend = measureTimeMillis {
    runBlocking {
      val canAcceptOrder = (
        listOf(
          async { isOnBlockedList("max@mustermann.de") },
          async { isAlreadyRegistered("max@mustermann.de") },
          async { checkWorkload() < 0.75 }
        ) +
        listOf("A", "B", "C")
          .map { async { isItemInStock(it) } }
      ).map { it.await() }.none { !it }
      println("Order is acceptable? -> $canAcceptOrder")
    }
  }
  println("Time spend: ${timeSpend}ms")
}

suspend fun isOnBlockedList(email: String): Boolean {
  delay(800L)
  val isBlocked = nextBoolean()
  println("IsBlocked? -> $isBlocked")
  return isBlocked
}

suspend fun isAlreadyRegistered(email: String): Boolean {
  delay(750L)
  val isAlreadyRegistered = nextBoolean()
  println("isAlreadyRegistered? -> $isAlreadyRegistered")
  return isAlreadyRegistered
}

suspend fun isItemInStock(id: String): Boolean {
  delay(250L)
  val isItemInStock = nextBoolean()
  println("isItemInStock? -> $isItemInStock")
  return isItemInStock
}

suspend fun checkWorkload(): Double {
  delay(1000L)
  val workLoad = nextDouble()
  println("Workload? -> $workLoad")
  return workLoad
}

This is how the code is executed.

Join ( job.join() )

Fairly simple

fun main(args: Array<String>) = runBlocking {
   val job = launch {
      delay(1000)
      println("world")
   }
  job.join()
}

Cancel ( job.cancelAndJoin() )

For Cancel we do cancel and join or of course we use the cancelAndJoin(). This cancels because delay() checks for cancel.

fun main(args: Array<String>) = runBlocking {
   val job = launch {
      repeat(1000) {
      delay(100)
      println(".")
  }
  delay(100)
  job.cancel()
  job.join()
// Or
  // job.cancelAndJoin()
}

Yield ( yield() ) or isActive

We can use yield within out own code or isActive() if we want to do stuff beyond yield.

fun main(args: Array<String>) = runBlocking {
   val job = launch {
      repeat(1000) {
      yield()
      // if (!isActive) throw CancellationException()
      println(".")
  }
  delay(100)
  job.cancelAndJoin()
}

Warning We must do a return to a non local return, i.e. a return outside of our loop. The code below will not work

fun main(args: Array<String>) = runBlocking {
   val job = launch {
      repeat(1000) {
      yield()
      if (!isActive) return@repeat
      println(".")
  }
  job.cancelAndJoin()
}

Exceptions in Coroutines

We need to be careful as ever in cancelling and making sure we understand how the co-routine is torn down below demonstrate how this could be done. The run(NonCancellable) allow you to run a suspend function within your handling but be careful

...
  try {
...
  } catch(ex: CancellationException) {
    println("Cancelled: ${ex.message}")
  } finally {
    run(NonCancellable) {
       println("Forced non Cancel")
    }
  }
  delay(100)
  job.cancel(CancellationException("Because I can"))
  job.join()
}

Summary for Exceptions

  • Can be use to specify the reason why
    • job.cancel(CancellationException("why"))
  • Can specify any exception
    • Job.cancel(SomeExceptionType()_
  • Be Careful with this
    • if using launch will tear down the thread/kill application
    • Can use it with the async co-routine builder

Timeouts withTimeout() and withTimeoutWithNull()

These support timeout withTimeout() we have to wrap out co-routine with try catch. With withTimeoutWithNull() we only need to test the job for null to know if we completed.

Contexts

Introduction

Contexts provide a coroutine dispatcher to determine which thread the corountine is run on. Coroutines can run on

  • Pool thread
  • Unconfined thread
  • Specific thread

You specify the context in the coroutine builder. You can see the thread it is running on using Thread.currentThread().name

  • Unconfined (Start on Thread from context of current coroutine but after delay is managed)
  • DefaultDispatcher (default currently Fork/Join Pool)
  • CommonPoool (default currently Fork/Join Pool)
  • newSingleThreadContext (runs on specified thread Expensive)
  • coroutineContext (inherit from context of current coroutine)

Naming Contexts

We can name the contexts, ideal for debugging

fun main(args: Array<String>) = runBlocking {
   val job  = launch(CoroutineName("Iain was ere") + coroutineContext) {
      println("Great ${Thread.currentThread().name}")
   }
   job.join()
}

Accessing Job Attributes

Within the coroutine you can access job attributes via the coroutine context. A silly example might by

fun main(args: Array<String>) = runBlocking {
   val job  = launch {
      println("isActive? ${corountineContext[Job.Key]!!.isActive)")
      // more concisely println("isActive? ${corountineContext[Job]!!.isActive)")
   }
   job.join()
}

Parent Child

Lauching coroutines within corountines we need to consider if they are dependant. If they are we need to make sure the outer waits for the children. One approach is to run the inner coroutine in the same context. e.g.

fun main(args: Array<String>) = runBlocking {
   val outer  = launch {
      launch(coroutineContext) {
 ...
      }
   }
   outer.join()
}

Cancelling the outer corountine will not cancel the children without linking the two.

newSingleThreadConext

With this context we are managing the resources of the context so it is important we ensure it is disposed of appropriately. To do this we use the c# equivalent of using

newSingleThreadContext("MyName").use { ctx ->
   val job = launch(ctx) {
       println("SingleThreadContext thread ${Thread.currentThread().name}")
   }
   job.join()
}

Returning Data (async and await)

Async and Deferred

When using async we get a Deferred<T> object back which is derived from job. I.E. isActive are available to us. The deferred object is like a promise in javascript or a future in java. I was surpised but suspend does not mean async, it means it can be. It we omit the async an int is returned and the code runs syncronously. I really like the async keyword wrapper because it is clear what we are doing.

fun main(args: Array<String>) = runBlocking {
    val job = launch {
        var time = measureTimeMillis {
            println("About to work")
            val r1 = async {doWorkOne()}
            println("About to do more work")
            val r2 = async{doWorkTwo()}

            println("result: ${r1.await() + r2.await()}")
        }
        println("Done in $time")
    }
    job.join()
}
suspend fun doWorkOne(): Int {
    delay(100)
    println("Working 1")
    return Random(System.currentTimeMillis()).nextInt(42)
}
suspend fun doWorkTwo(): Int {
    delay(200)
    println("Working 2")
    return Random(System.currentTimeMillis()).nextInt(42)
}

Lazy

We can have lazy evaluation for example

fun main(args: Array<String>) = runBlocking {
    val job = launch {
        val result = async(start = CoroutineStart.LAZY) {doWorkLazy()}
        println("resultm is ${result.await()}")
    }
    delay(500)
    job.join()
}

Channels

Introduction

We use channels to communicate with coroutines.

  • Send to and receive from a channel
  • More than one item of data
  • Channels block
  • Can create buffered channels
  • Need to know when channel has finished

A simple example below. Note that the send blocks until you receive and vice-versa.

fun main(args: Array<String>) = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) {
            println("send $x")
            channel.send(x * x)
        }
    }
    println(channel.receive())
    repeat(4) { println(channel.receive()) }
}

Consumers and Producers

The producer builder provides a way to simplify the producing of data down a channel. There is no wrapping in a subroutine and send can be called directly. To consume the data we can call consumeEach. The code below is far simpler than the code above. Note the use of it which can be thought of as the iterator.

// go from the previous demo to this
fun produceSquares() : ProducerJob<Int> = produce<Int> {
    for (x in 1..5) {
        println("sending")
        send(x * x)
    }
    println("sending - done")
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

Pipelining

We can feed a channel into another channel. Here we feed the produce number into the square channel

fun produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun square(numbers: ReceiveChannel<Int>) = produce<Int> {
    for (x in numbers) send(x * x)
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    squares.cancel() // need to cancel these coroutines in a larger app
    numbers.cancel()
}

Fan out and Fan in

We can receive on many consumers from one producer (fan out) and send on many producers (fan in) and receive on one consumer.

Load Balancing

Let's create a load balancer

data class Work(var x: Long = 0, var y: Long = 0, var z: Long = 0)
val numberOfWorkers = 10
var totalWork = 20
val finish = Channel<Boolean>()
var workersRunning = AtomicInteger()
suspend fun worker(input: Channel<Work>, output: Channel<Work>) {
    workersRunning.getAndIncrement()
    for (w in input) {
        w.z = w.x * w.y
        delay(w.z)
        output.send(w)
    }
    workersRunning.getAndDecrement()
    if(workersRunning.get() === 0)
    {
        output.close()
        println("Closing output")
    }
}
fun run() {
    val input = Channel<Work>()
    val output = Channel<Work>()

    println("Launch workers")
    repeat (numberOfWorkers) {
        launch { worker(input, output) }
    }
    launch { sendLotsOfWork(input) }
    launch { receiveLotsOfResults(output) }
}
suspend fun receiveLotsOfResults(channel: Channel<Work>) {

    println("receiveLotsOfResults start")

    for(work in channel) {
        println("${work.x}*${work.y} = ${work.z}")
    }
    println("receiveLotsOfResults done")
    finish.send(true)
}
suspend fun sendLotsOfWork(input: Channel<Work>) {
    repeat(totalWork) {
        input.send(Work((0L..100).random(), (0L..10).random()))
    }
    println("close input")
    input.close()
}
fun main(args: Array<String>) {
    run()
    runBlocking { finish.receive() }
    println("main done")
}
private object RandomRangeSingleton : Random()
fun ClosedRange<Long>.random() = (RandomRangeSingleton.nextInt((endInclusive.toInt() + 1) - start.toInt()) + start)

Select

We can use select to read from a number of channels. The select will always favour the first listed channel. Note that reading from a channel which is closed will result in error unless we use the onReceiveOrNull. We can add timeouts with onTimeout.

fun producer1() = produce {
    send("from producer 1")
}
fun producer2() = produce {
    send("from producer 2")
}
suspend fun selector(message1: ReceiveChannel<String>, message2: ReceiveChannel<String>): String =
    select<String> {
        message2.onReceiveOrNull { value ->
            value ?: "Channel 2 is closed"
        }
        message1.onReceiveOrNull { value ->
            value ?: "Channel 1 is closed"
        }
        // onTimeout(100) {
        //   println("Timed out")
        // }
    }
fun main(args: Array<String>) = runBlocking<Unit> {
    val m1 = producer1()
    val m2 = producer2()

    repeat(15) {
        println(selector(m1, m2))
    }
}

Actors

Why

Typically in thread programming we may use the following to protect the state of the data.

  • Volatile
  • Atomic Types
  • Locks
  • Thread Confinement

Actor provide a way to manage these problems below are examples on how you might solve these problems when incrementing an integer. The Actor example demonstrates and approach to better organization of code which is not necessarily faster - well not faster but cleaner.

Example

Intro

An example of the sort of problems we might get is when we try and increment a value share by several thread. Unless we specifically wrap this with Atomic then the value will not be accurate.

Really liked this demo which shows why actors might be for you. They provided five approaches

  • Base counter Gave the wrong result because we do not manage the increment at all
  • Fine Grained Slow because of marshalling to common thread
  • Course Grained ok
  • Atomic of but slightly slow and Course grained
  • Mutex much slower than the others

Result are below

Code

// 1. show counter class and explain what run does
// 2. run the base counter and show it doesn't work
// 3. add the finr grained code and show it works but is much slower
// 4. change it to course grained
// 5. Show the mutex code and show how slow it is
// 6. Finally show the 'atomic' code as a solution in thie case

open class Counter {
    private var counter = 0
    open suspend fun increment() {
        counter++
    }
    open var value: Int
        get() = counter
        set(value) {
            counter = value
        }
    suspend fun run(context: CoroutineContext, numberOfJobs: Int, count: Int, action: suspend () -> Unit): Long {
        // action is repeated by each coroutine
        return measureTimeMillis {
            val jobs = List(numberOfJobs) {
                launch(context) {
                    repeat(count) { action() }
                }
            }
            jobs.forEach { it.join() }
        }
    }
}
class AtomicCounter : Counter() {
    var counter = AtomicInteger()
    override suspend fun increment() {
        counter.incrementAndGet()
    }
    override var value: Int
        get() = counter.get()
        set(value) {
            counter.set(value)
        }
}
class MutexCounter : Counter() {
    val mutex = Mutex()
    var counter:Int = 0
    override  suspend fun increment() {
        mutex.withLock {
            counter++
        }
    }
    override var value: Int
        get() = counter
        set(value) {
            counter = value
        }
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = 1000 // number of coroutines to launch
    val count = 1000 // work in each coroutine
    var counter = Counter()
    // warm up code
    counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    counter.value = 0
    var time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Base counter", jobs, count, time, counter)

    counter.value = 0
    // use single thread context - fine grained
    val ctx = newSingleThreadContext("Counter")
    time = counter.run(CommonPool, jobs, count) {
        withContext(ctx) {
            counter.increment()
        }
    }
    logResult("Fine grained", jobs, count, time, counter)

    counter.value = 0
    // use single thread context - course grained
    time = counter.run(ctx, jobs, count) {
        counter.increment()
    }
    logResult("Course grained", jobs, count, time, counter)

    counter = AtomicCounter()
    time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Atomic", jobs, count, time, counter)

    counter = MutexCounter()
    time = counter.run(CommonPool, jobs, count) {
        counter.increment()
    }
    logResult("Mutex", jobs, count, time, counter)
}
private fun logResult(counterType: String, n: Int, k: Int, time: Long, c: Counter) {
    println("${counterType} completed ${n * k} actions in $time ms")
    println("Counter  : ${c.value}")
}

Result

  • Base counter completed 1000000 actions in 38 ms

Counter  : 229992

  • Fine grained completed 1000000 actions in 2941 ms

Counter  : 1000000

  • Course grained completed 1000000 actions in 51 ms

Counter  : 1000000

  • Atomic completed 1000000 actions in 186 ms

Counter  : 1000000

  • Mutex completed 1000000 actions in 1381 ms

Counter  : 1000000

Creating An Actor

An Actor consists of three parts

  • Coroutine
  • State
  • Messages

The example below creates three messages. I guess you look at the problem you are trying to solve and create message for each. Sounding like Redux me thinks.

suspend fun run(context: CoroutineContext, numberOfJobs: Int, count: Int, action: suspend () -> Unit): Long {
    // action is repeated by each coroutine
    return measureTimeMillis {
        val jobs = List(numberOfJobs) {
            launch(context) {
                repeat(count) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
}

sealed class CounterMsg
object InitCounter : CounterMsg()
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun counterActor() = actor<CounterMsg> {
    var counter = 0
    for(msg in channel) {
        when(msg) {
            is InitCounter -> counter = 0
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val jobs = 100
    val count = 10000
    val counter = counterActor()
    counter.send(InitCounter)
    val time = run(CommonPool, jobs, count) {
        counter.send(IncCounter)
    }
    var response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))

    println("Completed ${jobs * count} actions in $time ms")
    println("result is ${response.await()}")
}
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")


Result was

  • Completed 1000000 actions in 619 ms

result is 1000000

Actors deal with this. Actors are channels with state

  • avoid some of the pitfalls of concurrency
  • lighwieght
  • directly supported by Kotlin

Just a Fun Rock Paper Scissors Example

Just put this here to demonstrate some kotlin using actors.

sealed class Move {
    override fun toString(): String {
        return this.javaClass.simpleName.toString()
    }
}

object Rock : Move()
object Paper : Move()
object Scissors : Move()

sealed class Game
class Start(val response: CompletableDeferred<Int>) : Game()
class Play(val sender: Channel<Game>, val name: String) : Game()
class Throw(val actor: String, val move: Move) : Game()

fun playerActor() = actor<Game> {
    var name: String

    for (msg in channel) {
        when (msg) {
            is Play -> {
                name = msg.name

                val selection = (1..4).random()
                lateinit var move: Move

                when (selection) {
                    1 -> move = Rock
                    2 -> move = Paper
                    3 -> move = Scissors
                }
                msg.sender.send(Throw(name, move))
            }
        }
    }
}

fun coordinatorActor() = actor<Game> {
    lateinit var response: CompletableDeferred<Int>

    val player1 = playerActor()
    val player2 = playerActor()

    for (msg in channel) {
        when (msg) {
            is Start -> {
                response = msg.response
                player1.send(Play(channel, "Player 1"))
                player2.send(Play(channel, "Player 2"))
            }
            is Throw -> {
                val playerA = msg.actor
                val moveA = msg.move
                val msg2 = channel.receive() as Throw
                val playerB = msg2.actor
                val moveB = msg2.move
                announce(playerA, moveA, playerB, moveB)
                response.complete(0)
            }

        }
    }
}

fun announce(playerA: String, moveA: Move, playerB: String, moveB: Move) {
    var awin = false

    print("$playerA -> $moveA, $playerB -> $moveB ")

    if (moveA == moveB) {
        println("Draw")
        return
    }
    when (moveA) {
        is Rock -> {
            when (moveB) {
                is Scissors -> {
                    awin = true
                }
            }
        }
        is Scissors -> {
            when (moveB) {
                is Paper -> {
                    awin = true
                }
            }
        }
        is Paper -> {
            when (moveB) {
                is Rock -> {
                    awin = true
                }
            }
        }
    }

    if (awin) {
        println("$playerA wins")
    } else {
        println("$playerB wins")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    repeat(10) {
        val response = CompletableDeferred<Int>()
        var coord = coordinatorActor()
        coord.send(Start(response))

        response.await()
    }
}

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun ClosedRange<Int>.random() = Random().nextInt(endInclusive - start) + start

Using Corountines in the UI

Need to review this but the approach was to create a suspend function and launch the work from the UI thread. This allows us to cancel because we have a reference to the job.

fun main(args: Array<String>) = launch<CoroutineApp>(args)

class CoroutineApp : App(IntroView::class) {
    override fun start(stage: Stage) {
        stage.width = 200.0
        stage.height = 400.0
        super.start(stage)
    }
}

class IntroView : View() {
    override val root = BorderPane()
    val counter = SimpleIntegerProperty()
    lateinit var counterJob: Job

    init {
        title = "Counter"

        with(root) {
            style {
                padding = box(20.px)
            }

            center {
                vbox(10.0) {
                    alignment = Pos.CENTER

                    label() {
                        bind(counter)
                        style { fontSize = 25.px }
                    }

                    button("Click to increment") {

                    }.setOnAction {
                        // 2. add launch coroutine
                        counterJob = launch(UI) { increment() }
                    }
                    button("Click to cancel") {

                    }.setOnAction {
                        // 2. add launch coroutine
                        launch(UI) { counterJob.cancel() }
                    }
                }
            }
        }
    }

    suspend fun increment() {
        // 1. add delay
        delay(3000)
        counter.value += 1
    }
}