Kotlin Coroutines
Moore's Law
I am doing this because of this graph
Previously, Java's fork/join framework was the way to do this kind of asynchronous work, but the code is far more complicated than it probably needs to be:
override fun compute(): Long {
    return if (high - low <= SEQUENTIAL_THRESHOLD) {
        (low until high)
            .map { array[it].toLong() }
            .sum()
    } else {
        val mid = low + (high - low) / 2
        val left = Sum(array, low, mid)
        val right = Sum(array, mid, high)
        left.fork()
        val rightAns = right.compute()
        val leftAns = left.join()
        leftAns + rightAns
    }
}
The suspend approach is far easier to read:
suspend fun compute(array: IntArray, low: Int, high: Int): Long {
    // println("low: $low, high: $high on ${Thread.currentThread().name}")
    return if (high - low <= SEQUENTIAL_THRESHOLD) {
        (low until high)
            .map { array[it].toLong() }
            .sum()
    } else {
        val mid = low + (high - low) / 2
        val left = async { compute(array, low, mid) }
        val right = compute(array, mid, high)
        left.await() + right
    }
}
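In recent kotlinx.coroutines releases (1.x, an assumption here) async is an extension on CoroutineScope, so a suspend function needs a scope before it can call it. The sketch below shows one way to write the same divide-and-conquer sum under that assumption. The name parallelSum and the threshold value are made up for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Illustrative value only; the notes above do not say what SEQUENTIAL_THRESHOLD is.
const val SEQUENTIAL_THRESHOLD = 5_000

// Hypothetical name: sums array[low until high], splitting large ranges in two.
suspend fun parallelSum(array: IntArray, low: Int, high: Int): Long =
    if (high - low <= SEQUENTIAL_THRESHOLD) {
        var total = 0L
        for (i in low until high) total += array[i]
        total
    } else {
        val mid = low + (high - low) / 2
        // coroutineScope provides the scope async needs and waits for its children.
        coroutineScope {
            val left = async { parallelSum(array, low, mid) }
            val right = parallelSum(array, mid, high)
            left.await() + right
        }
    }

fun main() = runBlocking {
    val numbers = IntArray(100_000) { it + 1 }
    // Dispatchers.Default lets the two halves run on a shared thread pool.
    val total = withContext(Dispatchers.Default) { parallelSum(numbers, 0, numbers.size) }
    println(total) // prints 5000050000
}
```

The recursion shape is the same as the fork/join version: async plays the role of fork() and await() the role of join().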
Coroutines
Introduction
There are several coroutine builders (maybe):
- runBlocking (blocks until the coroutine finishes; used for main functions and unit tests)
- launch (non-blocking)
- run
- async
Coroutines are lightweight threads, and you can run many more coroutines than threads. They are scheduled onto threads, so a coroutine does not necessarily stay on the same thread. A delay suspends only the coroutine, not the thread it is running on, e.g.
...
launch {
    delay(1000)
    println("world")
}
It is very important not to use blocking code in a coroutine. As above, delay suspends the coroutine without blocking the thread, whereas Thread.sleep(5000) blocks the thread.
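To make the "lightweight" point concrete, here is a minimal sketch that starts thousands of coroutines on the single runBlocking thread. It assumes a recent kotlinx.coroutines (1.x) release; runMany is a hypothetical helper name.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts `count` coroutines that each wait, then bump a counter.
fun runMany(count: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    val jobs = List(count) {
        launch {
            delay(100) // suspends this coroutine only; the thread is free to run the others
            finished.incrementAndGet()
        }
    }
    jobs.joinAll()
    finished.get()
}

fun main() {
    println(runMany(10_000)) // prints 10000
}
```

Because delay suspends rather than blocks, the waits overlap. Swapping delay(100) for Thread.sleep(100) would make each coroutine hold the thread in turn.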
Join ( job.join() )
Fairly simple
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        delay(1000)
        println("world")
    }
    job.join()
}
Cancel ( job.cancelAndJoin() )
To cancel we call cancel() and then join(), or simply use cancelAndJoin(). The cancellation takes effect here because delay() checks for cancellation.
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        repeat(1000) {
            delay(100)
            println(".")
        }
    }
    delay(100)
    job.cancel()
    job.join()
    // Or
    // job.cancelAndJoin()
}
Yield ( yield() ) or isActive
We can use yield() within our own code, or check isActive if we want to do something beyond yielding.
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        repeat(1000) {
            yield()
            // if (!isActive) throw CancellationException()
            println(".")
        }
    }
    delay(100)
    job.cancelAndJoin()
}
Warning: to stop the coroutine we must return from the coroutine itself, i.e. a return that leaves the loop (return@launch). return@repeat only ends the current iteration, so the code below will not work:
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        repeat(1000) {
            yield()
            if (!isActive) return@repeat // only ends this iteration; the loop carries on
            println(".")
        }
    }
    job.cancelAndJoin()
}
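For contrast, a minimal sketch of a loop that does stop when cancelled, using return@launch to leave the whole coroutine. It assumes a recent kotlinx.coroutines (1.x) release; countUntilCancelled is a hypothetical helper, and the loop runs on Dispatchers.Default so that it is busy on another thread while the caller cancels it.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: counts loop iterations until the job is cancelled.
fun countUntilCancelled(): Long = runBlocking {
    val iterations = AtomicLong(0)
    val job = launch(Dispatchers.Default) {
        repeat(Int.MAX_VALUE) {
            if (!isActive) return@launch // leaves the coroutine, not just this iteration
            iterations.incrementAndGet()
        }
    }
    delay(50)
    job.cancelAndJoin()
    iterations.get()
}

fun main() {
    // The count varies from run to run, so no fixed output is shown.
    println("stopped after ${countUntilCancelled()} iterations")
}
```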
Exceptions in Coroutines
As ever, we need to be careful when cancelling and make sure we understand how the coroutine is torn down. The code below demonstrates how this could be done. run(NonCancellable) allows you to run a suspend function within your handling, but be careful.
...
        try {
            ...
        } catch (ex: CancellationException) {
            println("Cancelled: ${ex.message}")
        } finally {
            run(NonCancellable) {
                println("Forced non Cancel")
            }
        }
    }
    delay(100)
    job.cancel(CancellationException("Because I can"))
    job.join()
}
Summary for Exceptions
- A CancellationException can be used to state the reason for cancelling
  - job.cancel(CancellationException("why"))
- Any exception type can be specified
  - job.cancel(SomeExceptionType())
  - Be careful with this
  - With launch it will tear down the thread/kill the application
  - It can be used with the async coroutine builder
Timeouts withTimeout() and withTimeoutOrNull()
Both of these support timeouts. With withTimeout() we have to wrap our coroutine in a try/catch. With withTimeoutOrNull() we only need to test the result for null to know whether we completed.
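A minimal sketch of the two styles side by side, assuming a recent kotlinx.coroutines (1.x) release. slowAnswer is a hypothetical function that exists only for this illustration.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical slow operation used only for this illustration.
suspend fun slowAnswer(): Int {
    delay(500)
    return 42
}

fun main() = runBlocking {
    // withTimeout throws when the limit is exceeded, so it needs a try/catch.
    try {
        withTimeout(100) { slowAnswer() }
    } catch (ex: TimeoutCancellationException) {
        println("timed out")
    }

    // withTimeoutOrNull returns null instead of throwing.
    val late = withTimeoutOrNull(100) { slowAnswer() }
    println(late)   // prints null

    val onTime = withTimeoutOrNull(1_000) { slowAnswer() }
    println(onTime) // prints 42
}
```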
Contexts
Introduction
Contexts provide a coroutine dispatcher, which determines which thread the coroutine runs on. Coroutines can run on
- Pool thread
- Unconfined thread
- Specific thread
You specify the context in the coroutine builder. You can see the thread it is running on using Thread.currentThread().name
- Unconfined (starts on the thread of the current coroutine, but after a suspension such as delay the thread it resumes on is chosen for it)
- DefaultDispatcher (the default; currently the fork/join pool)
- CommonPool (the default; currently the fork/join pool)
- newSingleThreadContext (runs on its own specified thread; expensive)
- coroutineContext (inherits the context of the current coroutine)
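The names in the list above come from an older release. As a sketch under the assumption of a recent kotlinx.coroutines (1.x) release, where the pooled and unconfined dispatchers are exposed as Dispatchers.Default and Dispatchers.Unconfined, the thread used by each context can be inspected like this. threadNames is a hypothetical helper.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the thread name seen under each context.
fun threadNames(): Map<String, String> = runBlocking {
    // No context argument: inherits the context of the enclosing runBlocking.
    val inherited = async { Thread.currentThread().name }
    // Shared background pool.
    val pooled = async(Dispatchers.Default) { Thread.currentThread().name }
    // Starts in the calling thread until the first suspension.
    val unconfined = async(Dispatchers.Unconfined) { Thread.currentThread().name }
    mapOf(
        "inherited" to inherited.await(),
        "Dispatchers.Default" to pooled.await(),
        "Dispatchers.Unconfined" to unconfined.await()
    )
}

fun main() {
    // Thread names vary between runs and machines, so no fixed output is shown.
    for ((context, thread) in threadNames()) println("$context -> $thread")
}
```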
Naming Contexts
We can name the contexts, ideal for debugging
fun main(args: Array<String>) = runBlocking {
    val job = launch(CoroutineName("Iain was ere") + coroutineContext) {
        println("Great ${Thread.currentThread().name}")
    }
    job.join()
}
Accessing Job Attributes
Within the coroutine you can access job attributes via the coroutine context. A silly example might be
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        println("isActive? ${coroutineContext[Job.Key]!!.isActive}")
        // more concisely println("isActive? ${coroutineContext[Job]!!.isActive}")
    }
    job.join()
}
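The name given to a coroutine can be read back from the context in the same way as the job. A minimal sketch, assuming a recent kotlinx.coroutines (1.x) release; describeContext and the name "worker" are made up for illustration.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reads the name and job state from inside a named coroutine.
fun describeContext(): String = runBlocking {
    val description = async(CoroutineName("worker")) {
        val name = coroutineContext[CoroutineName]?.name
        val active = coroutineContext[Job]?.isActive
        "$name active=$active"
    }
    description.await()
}

fun main() {
    println(describeContext()) // prints worker active=true
}
```

The safe call ?. is used here instead of !! so that a missing element gives null rather than an exception.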
Parent Child
When launching coroutines within coroutines we need to consider whether they are dependent. If they are, we need to make sure the outer one waits for its children. One approach is to run the inner coroutine in the same context, e.g.
fun main(args: Array<String>) = runBlocking {
    val outer = launch {
        launch(coroutineContext) {
            ...
        }
    }
    outer.join()
}
Cancelling the outer coroutine will not cancel the children unless the two are linked.
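A minimal sketch of the difference between a linked and an unlinked coroutine. It assumes a recent kotlinx.coroutines (1.x) release, where a coroutine launched inside another becomes its child automatically, and one launched in a separate scope does not. cancelOuter and separateScope are hypothetical names.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns (linked child cancelled?, unlinked coroutine cancelled?).
fun cancelOuter(): Pair<Boolean, Boolean> = runBlocking {
    val separateScope = CoroutineScope(Dispatchers.Default) // own Job, unrelated to outer
    lateinit var linked: Job
    lateinit var unlinked: Job
    val outer = launch {
        linked = launch { delay(10_000) }                 // child of outer
        unlinked = separateScope.launch { delay(10_000) } // not a child of outer
        delay(10_000)
    }
    delay(100)             // give outer time to start both coroutines
    outer.cancelAndJoin()  // cancels outer and waits for it and its children
    val outcome = Pair(linked.isCancelled, unlinked.isCancelled)
    separateScope.cancel() // tidy up the coroutine that outlived outer
    outcome
}

fun main() {
    println(cancelOuter()) // prints (true, false)
}
```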
newSingleThreadContext
With this context we manage the context's resources ourselves, so it is important to ensure it is disposed of properly. To do this we call use, Kotlin's equivalent of C#'s using:
newSingleThreadContext("MyName").use { ctx ->
    val job = launch(ctx) {
        println("SingleThreadContext thread ${Thread.currentThread().name}")
    }
    job.join()
}
Returning Data (async and await)
Async and Deferred
When using async we get back a Deferred<T> object, which derives from Job, i.e. members such as isActive are available to us. The Deferred object is like a promise in JavaScript or a future in Java. I was surprised to find that suspend does not mean asynchronous; it only means the function can be. If we omit the async, an Int is returned and the code runs synchronously. I really like the async wrapper because it makes clear what we are doing.
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        val time = measureTimeMillis {
            println("About to work")
            val r1 = async { doWorkOne() }
            println("About to do more work")
            val r2 = async { doWorkTwo() }
            println("result: ${r1.await() + r2.await()}")
        }
        println("Done in $time")
    }
    job.join()
}
suspend fun doWorkOne(): Int {
    delay(100)
    println("Working 1")
    return Random(System.currentTimeMillis()).nextInt(42)
}
suspend fun doWorkTwo(): Int {
    delay(200)
    println("Working 2")
    return Random(System.currentTimeMillis()).nextInt(42)
}
Lazy
We can have lazy evaluation for example
fun main(args: Array<String>) = runBlocking {
    val job = launch {
        val result = async(start = CoroutineStart.LAZY) { doWorkLazy() }
        println("result is ${result.await()}")
    }
    delay(500)
    job.join()
}
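To see that a lazy coroutine really does wait, the sketch below records the order of events. It assumes a recent kotlinx.coroutines (1.x) release; lazyOrder is a hypothetical helper and the work is a stand-in calculation.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which things happen around a lazy async.
fun lazyOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val result = async(start = CoroutineStart.LAZY) {
        events += "working"
        21 * 2
    }
    delay(100)                 // the lazy coroutine has still not started here
    events += "before await"
    events += "result ${result.await()}" // await() is what starts the work
    events
}

fun main() {
    println(lazyOrder()) // prints [before await, working, result 42]
}
```

Everything runs on the single runBlocking thread, so the plain mutable list is safe here.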
Channels
Introduction
We use channels to communicate with coroutines.
- Send to and receive from a channel
- More than one item of data
- Channels block (suspend) until the other side is ready
- Can create buffered channels
- Need to know when channel has finished
A simple example is below. Note that send suspends until the value is received, and receive suspends until a value is sent.
fun main(args: Array<String>) = runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
        for (x in 1..5) {
            println("send $x")
            channel.send(x * x)
        }
    }
    println(channel.receive())
    repeat(4) { println(channel.receive()) }
}
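The list above also mentions buffered channels and knowing when a channel has finished. A minimal sketch of both, assuming a recent kotlinx.coroutines (1.x) release; collectSquares is a hypothetical helper and the buffer size of 2 is arbitrary.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends five squares through a buffered channel and collects them.
fun collectSquares(): List<Int> = runBlocking {
    // Buffered: send only suspends once 2 items are waiting to be received.
    val channel = Channel<Int>(capacity = 2)
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // tells the receiver that no more items will arrive
    }
    val received = mutableListOf<Int>()
    // The loop ends once the channel is closed and every item has been received.
    for (value in channel) received += value
    received
}

fun main() {
    println(collectSquares()) // prints [1, 4, 9, 16, 25]
}
```

Closing the channel means the receiver no longer has to know in advance how many items to expect, unlike the repeat(4) in the example above.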
Consumers and Producers
The produce builder simplifies sending data down a channel. There is no need to wrap the sends in a separate coroutine, and send can be called directly. To consume the data we can call consumeEach. The code below is far simpler than the code above. Note the use of it, the implicit lambda parameter that holds each received value.
// go from the previous demo to this
fun produceSquares(): ProducerJob<Int> = produce<Int> {
    for (x in 1..5) {
        println("sending")
        send(x * x)
    }
    println("sending - done")
}
fun main(args: Array<String>) = runBlocking<Unit> {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}