Rust Concurrency and Asynchronous Processing: Difference between revisions
No edit summary |
No edit summary |
||
Line 1: | Line 1: | ||
=Smart Pointer= | |||
==Box== | |||
Finally got some good advice around Box. Box is a way to make objects on the heap rather than the stack. We need to do this for at least these reasons | |||
*When a size is not known, things must be on the heap | |||
*When large or not used a lot then best to reduce stack size | |||
Box allow us to do this. If you think of C++ and forward declares, this is the same thing. As pointers in C++ can point to various sized objects, e.g. array. For rust we see the error '''the size of values of type blah cannot be known at compilation time'''. A good reason for this message to appear is when we use a trait which could be implemented in many ways | |||
<syntaxhighlight lang="rs"> | |||
trait Vehicle { | |||
fn drive(&self); | |||
} | |||
struct Truck; | |||
impl Vehicle for Truck { | |||
... | |||
// Not allowed to do let t: dyn Vehicle | |||
// We have to do | |||
let t: Box<dyn Vehicle> | |||
t = Box::new(Truck); | |||
</syntaxhighlight> | |||
The other use case for Box is when the type is used in the struct of the same type. In this case, without the box the compiler would say '''recursive type 'LinkListItem' has infinite size | |||
<syntaxhighlight lang="rs"> | |||
struct LinkListItem { | |||
next_item: Option<Box<LinkListItem>> | |||
} | |||
</syntaxhighlight> | |||
==Arc== | |||
==Cow== | |||
==Rc== | |||
=Asyncronous= | =Asyncronous= | ||
==Async and Await (Future Trait)== | ==Async and Await (Future Trait)== |
Revision as of 04:27, 14 October 2024
Smart Pointer
Box
Finally got some good advice around Box. Box is a way to make objects on the heap rather than the stack. We need to do this for at least these reasons
- When a size is not known, things must be on the heap
- When large or not used a lot then best to reduce stack size
Box allow us to do this. If you think of C++ and forward declares, this is the same thing. As pointers in C++ can point to various sized objects, e.g. array. For rust we see the error the size of values of type blah cannot be known at compilation time. A good reason for this message to appear is when we use a trait which could be implemented in many ways
trait Vehicle {
fn drive(&self);
}
struct Truck;
impl Vehicle for Truck {
...
// Not allowed to do let t: dyn Vehicle
// We have to do
let t: Box<dyn Vehicle>
t = Box::new(Truck);
The other use case for Box is when the type is used in the struct of the same type. In this case, without the box the compiler would say recursive type 'LinkListItem' has infinite size
struct LinkListItem {
next_item: Option<Box<LinkListItem>>
}
Arc
Cow
Rc
Asyncronous
Async and Await (Future Trait)
This is just like promises. I learned a few things doing this. Renaming namespaces can be done using as. For the asynchronous work with thread still present there was a lot of crossover between the two. Note not all code listed so additional use statements demonstrate the renaming requirement.
use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file};
use std::{cell::RefCell, fs, thread, sync::{mpsc::sync_channel, Arc, Mutex}};
async fn read_file(path: &str) -> async_io::Result<String> {
let mut file: async_file = async_fs::File::open(path).await?;
let mut contents = String::new();
file.read_to_string(&mut contents).await?;
Ok(contents)
}
fn main() {
let task = async_task::spawn(async {
let result = read_file("Cargo.toml").await;
match result {
Ok(k) => println!("contents {}", k),
Err(err) => println!("error {}", err),
}
});
async_std::task::block_on(task);
println!("Task stopped");
}
Concurrency
Thread Join
Threads are similar to C++ and C#. Let do a basic join
let handle = thread::spawn(move || {
println!("Hello from a thread!")
});
handle.join().unwrap();
println!("Hello Main!")
Not threads do not always finish in the order created
v.into_iter().for_each(|e| {
thread_handles.push(thread::spawn(move || println!("Thread {}",e)));
});
thread_handles.into_iter().for_each(|handle| {
handle.join().unwrap();
});
// Thread 2
// Thread 1
// Thread 3
Channels
This is like channels in kotlin. These live in the mpsc namespace which stands for multi producer single consumer. The value being sent is taken on send and receive. I.E. you cannot use the value being sent after send.
Example One Producer
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Spawn off an expensive computation
thread::spawn(move|| {
sender.send(expensive_computation()).unwrap();
});
// Do some useful work for awhile
// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
Example Two Producer
We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed rust provides a sync_channel method on the mpsc. When used the sender will block when the queue if full and automatically continue when reduced.
let (producer1, receiver) = sync_channel(1000);
let producer2 = producer1.clone();
// Send from Producer 1
thread::spawn(move|| {
let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world"),];
for val in vec {
producer1.send(val).unwrap();
}
});
thread::spawn(move|| {
let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2"),];
for val in vec {
producer2.send(val).unwrap();
}
});
// Send from Producer
for received in receiver {
println!("Got: {}", received);
}
Sync and Send Type Traits
Send
Types that implement send are safe to pass by value to another thread and moved across threads. Almost all types implement send but there are exceptions, e.g. Rc however the Atomic Reference Counter (Arc) can be. This did not compile for me if I tried to use Rc with the error std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String> but it did on the course.
Sync
Types that implement sync are safe to pass by non mutable reference to another thread. These types can be shared across threads.
Introduction
These a like c++ mutexes. You need to get and release a lock when using the resource. It was stressed the the atomic reference counter Arc (no Rc) is used for sharing resources across threads. Mutexes are used for mutating (modifying) data that is shared across threads.
Deadlocks
Below is an example of a deadlock where the same thread asks for the lock without releasing. Note you can release the mutex with drop on the guard (numMutexGuard). I also noticed if you did not use the value then the code did complete - maybe an optimizer.
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move|| {
let mut numMutexGuard = counter.lock().unwrap();
// *numMutexGuard += 1;
// std::mem::drop(numMutexGuard);
let mut numMutexGuard2 = counter.lock().unwrap();
*numMutexGuard2 += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
Poisoned Mutex
This is a term which means when a thread is executing and has a lock but panics. The mutex is hanging or poisoned. In rust we can actually recover from this though I suspect this is very undesirable. Here is the sample code where we match on poisoned.
let lock = Arc::new(Mutex::new(0));
let lock2 = Arc::clone(&lock);
let _ = thread::spawn(move || {
let _guard = lock2.lock().unwrap(); // acquire lock
panic!("thread1"); // mutex is not poisoned
}).join();
let mut guard = match lock.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
};
*guard += 1;
println!("lock value: {}", *guard);
Rayon
Quick example of rayon for parallelization with Rayon. Was a little unhappy with the changing of signatures for reduce in rayon but ho-hum. This took 6550ms single threaded and 186ms multi-threaded.
fn factoral (n: u32) -> BigUint {
if n == 0 || n ==1 {
BigUint::from(1u32)
} else {
// Reduce in Typescript is array.reduce((acc, next_value) => acc * next_value, 1)
(1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
// new way
// n * factoral(n - 1)
}
}
fn factoral_with_rayon (n: u32) -> BigUint {
if n == 0 || n ==1 {
BigUint::from(1u32)
} else {
(1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
}
}
main() {
let mut now = std::time::Instant::now();
factoral(80000);
println!("factoral took {} seconds", now.elapsed().as_millis());
now = std::time::Instant::now();
factoral_with_rayon(80000);
println!("factoral with rayon took {} seconds", now.elapsed().as_millis());
}