Rust Concurrency and Asynchronous Processing

=AsRef and AsMut (Probably should be AsMutRef)=
AsRef allows cheap reference-to-reference conversions, which lets a function accept a range of argument types. A good example I found was std::fs::rename.
<syntaxhighlight lang="rs">
fs::rename(Path::new("a.txt"), Path::new("b.txt"))?;
</syntaxhighlight>
Because rename takes AsRef<Path> parameters, we can simplify this to:
fs::rename("a.txt", "b.txt")?;
==AsRef Simple Example==
Here is an example that allows a struct wrapping a String to be passed to a print function that accepts anything implementing AsRef<str>.
<syntaxhighlight lang="rs">
struct MyStruct(String);

// Implement AsRef<str> (rather than AsRef<String>) so MyStruct can be passed
// anywhere an impl AsRef<str> is accepted.
impl AsRef<str> for MyStruct {
    fn as_ref(&self) -> &str {
        &self.0
    }
}

fn print_ref(s: impl AsRef<str>) {
    println!("str: {}", s.as_ref());
}

fn main() {
    let s = MyStruct(String::from("Hello"));
    print_ref(&s);
}
</syntaxhighlight>
==AsMut Simple Example==
This is an example of using AsMut to mutate a value through a generic parameter:
<syntaxhighlight lang="rs">
// Squares a number in place, using `as_ref()` to read and `as_mut()` to write.
fn num_sq<T: AsRef<u32> + AsMut<u32>>(arg: &mut T) {
    let num = arg.as_ref().pow(2);
    *arg.as_mut() = num;
}

...

#[test]
fn mut_box() {
    // Box<u32> implements both AsRef<u32> and AsMut<u32>.
    let mut num = Box::new(3);
    num_sq(&mut num);
    assert_eq!(*num, 9);
}
</syntaxhighlight>
=Smart Pointer=
==Box==
Finally got some good advice around Box. Box is a way to allocate objects on the heap rather than the stack. We need to do this for at least these reasons:
* When the size of a value is not known at compile time, it must live on the heap
* When a value is large or not used often, it is best to keep it off the stack to reduce stack usage (see the sketch below)
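As a minimal sketch of the second point (my own example, assuming an 8 MB buffer counts as "large"), boxing keeps the data itself on the heap so only a pointer lives on the stack:
<syntaxhighlight lang="rs">
fn main() {
    // 8 MB of zeroes allocated on the heap; only the fat pointer (address + length)
    // is stored on the stack.
    let big_buffer: Box<[u8]> = vec![0u8; 8 * 1024 * 1024].into_boxed_slice();
    println!("len: {}, first byte: {}", big_buffer.len(), big_buffer[0]);
}
</syntaxhighlight>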
Box allows us to do this. If you think of C++ and forward declarations, this is much the same thing: a pointer in C++ can point to objects of various sizes, e.g. an array. In Rust we see the error '''the size of values of type `blah` cannot be known at compilation time'''. A good reason for this message to appear is when we use a trait, which could be implemented by types of different sizes:
<syntaxhighlight lang="rs">
trait Vehicle {
    fn drive(&self);
}

struct Truck;

impl Vehicle for Truck {
    fn drive(&self) {
        println!("Truck driving");
    }
}

// Not allowed to do: let t: dyn Vehicle
// We have to do:
let t: Box<dyn Vehicle> = Box::new(Truck);
</syntaxhighlight>
The other use case for Box is when a type contains a field of its own type. In this case, without the Box the compiler would say '''recursive type `LinkListItem` has infinite size'''.
<syntaxhighlight lang="rs">
struct LinkListItem {
    next_item: Option<Box<LinkListItem>>,
}
</syntaxhighlight>
==Rc (Reference Counter)==
This is like C++'s reference-counted smart pointer (shared_ptr). In Rust we cannot do the following, because truckb is moved into facility_1 and so cannot be used again for facility_2:
<syntaxhighlight lang="rs">
let (trucka, truckb, truckc) = (
    Truck { capacity: 1 },
    Truck { capacity: 11 },
    Truck { capacity: 3 },
);

let facility_1 = vec![trucka, truckb]; // truckb is moved here
let facility_2 = vec![truckb, truckc]; // error: use of moved value: `truckb`
</syntaxhighlight>
Making them Rc means Rust will allow us to clone the pointers. There is only one truckb in memory, and it will be de-allocated when the last Rc goes out of scope.
<syntaxhighlight lang="rs">
use std::rc::Rc;

let (trucka, truckb, truckc) = (
    Rc::new(Truck { capacity: 1 }),
    Rc::new(Truck { capacity: 11 }),
    Rc::new(Truck { capacity: 3 }),
);

let facility_1 = vec![trucka, Rc::clone(&truckb)];
let facility_2 = vec![Rc::clone(&truckb), truckc];

// You can get the count with
println!("Ref Count: {:?}", Rc::strong_count(&truckb)); // 3: truckb plus the two clones
</syntaxhighlight>
==Arc (Atomic Reference Count)==
This is the same as Rc, but it implements Send, which is required if you are passing it to a thread.
<syntaxhighlight lang="rs">
use std::sync::Arc;

let (trucka, truckb, truckc) = (
    Arc::new(Truck { capacity: 1 }),
    Arc::new(Truck { capacity: 11 }),
    Arc::new(Truck { capacity: 3 }),
);

let thread = std::thread::spawn(move || {
    let facility_1 = vec![Arc::clone(&trucka), Arc::clone(&truckb)];
    let facility_2 = vec![Arc::clone(&truckb), Arc::clone(&truckc)];
});
thread.join().unwrap();
</syntaxhighlight>
==Cow (Copy on Write/Clone on Write)==
This is a smart pointer for when you can determine a possible saving: you can return a Cow and avoid an allocation. In this example, when a > 10 the returned value can borrow the value passed in, saving an allocation.
<syntaxhighlight lang="rs">
use std::borrow::Cow;

fn hello(a: usize, b: &str) -> Cow<str> {
    if a > 10 {
        Cow::Borrowed(b)          // no allocation, just hand back the borrowed input
    } else {
        Cow::Owned(b.to_string()) // allocate an owned String
    }
}
</syntaxhighlight>
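A quick usage sketch of the function above (my own addition), showing which variant comes back; either variant dereferences to &str so callers can use it uniformly:
<syntaxhighlight lang="rs">
let input = "world";

// a > 10: the input is borrowed, nothing is allocated
assert!(matches!(hello(11, input), Cow::Borrowed(_)));

// a <= 10: an owned String is allocated
assert!(matches!(hello(5, input), Cow::Owned(_)));

// Either way the Cow can be printed like a &str
println!("hello {}", hello(11, input));
</syntaxhighlight>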
=Asynchronous=
==Async and Await (Future Trait)==
This is just like promises. I learned a few things doing this. Renaming imports can be done using as. Because the asynchronous work sat alongside the existing thread code there was a lot of crossover between the two sets of names, hence the renaming. Note that not all of the code is listed; the additional use statements are there to demonstrate the renaming requirement.
<syntaxhighlight lang="rs">
use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file};
use std::{cell::RefCell, fs, thread, sync::{mpsc::sync_channel, Arc, Mutex}};

async fn read_file(path: &str) -> async_io::Result<String> {
    let mut file: async_file = async_fs::File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

fn main() {
    let task = async_task::spawn(async {
        let result = read_file("Cargo.toml").await;
        match result {
            Ok(k) => println!("contents {}", k),
            Err(err) => println!("error {}", err),
        }
    });
    async_std::task::block_on(task);
    println!("Task stopped");
}
</syntaxhighlight>
=Concurrency=
==Thread Join==
Threads are similar to C++ and C#. Let's do a basic join:
<syntaxhighlight lang="rs">
let handle = thread::spawn(move || {
    println!("Hello from a thread!");
});

handle.join().unwrap();
println!("Hello Main!");
</syntaxhighlight>
Note that threads do not always finish in the order they were created:
<syntaxhighlight lang="rs">
let v = vec![1, 2, 3];
let mut thread_handles = vec![];

v.into_iter().for_each(|e| {
    thread_handles.push(thread::spawn(move || println!("Thread {}", e)));
});

thread_handles.into_iter().for_each(|handle| {
    handle.join().unwrap();
});

// Example output - the order is not guaranteed:
// Thread 2
// Thread 1
// Thread 3
</syntaxhighlight>
==Channels==
This is like channels in Kotlin. These live in the mpsc module, which stands for multi-producer, single-consumer. The value being sent is moved on send and receive, i.e. you cannot use the value after sending it.
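A tiny sketch (my own addition) showing the value being moved on send:
<syntaxhighlight lang="rs">
use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    let msg = String::from("hello");

    sender.send(msg).unwrap(); // `msg` is moved into the channel here
    // println!("{}", msg);    // does not compile: borrow of moved value `msg`

    let got = receiver.recv().unwrap(); // the receiver now owns the String
    println!("received: {}", got);
}
</syntaxhighlight>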
===Example One Producer===
<syntaxhighlight lang="rs">
use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move || {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
</syntaxhighlight>
===Example Two Producers===
We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed, Rust provides a sync_channel function in mpsc. When used, the sender will block when the queue is full and automatically continue when space is available; the sketch after the example shows this blocking directly.
<syntaxhighlight lang="rs">
use std::sync::mpsc::sync_channel;
use std::thread;

// Bounded channel: senders block once 1000 messages are queued
let (producer1, receiver) = sync_channel(1000);
let producer2 = producer1.clone();

// Send from producer 1
thread::spawn(move || {
    let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world")];
    for val in vec {
        producer1.send(val).unwrap();
    }
});

// Send from producer 2
thread::spawn(move || {
    let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2")];
    for val in vec {
        producer2.send(val).unwrap();
    }
});

// Receive from both producers; the loop ends when both senders have been dropped
for received in receiver {
    println!("Got: {}", received);
}
</syntaxhighlight>
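To see the blocking behaviour on a full queue more directly, here is a minimal sketch (my own, using a bound of 1 rather than 1000):
<syntaxhighlight lang="rs">
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = sync_channel(1);

    let producer = thread::spawn(move || {
        sender.send(1).unwrap(); // fits in the buffer, returns immediately
        sender.send(2).unwrap(); // blocks until the receiver frees a slot
        println!("both values sent");
    });

    thread::sleep(Duration::from_millis(100)); // let the producer hit the full buffer
    println!("got {}", receiver.recv().unwrap()); // frees a slot, unblocking the producer
    println!("got {}", receiver.recv().unwrap());

    producer.join().unwrap();
}
</syntaxhighlight>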
==Sync and Send Type Traits==
===Send===
Types that implement Send are safe to pass by value to another thread, i.e. they can be moved across threads. Almost all types implement Send, but there are exceptions, e.g. Rc; the Atomic Reference Counter (Arc) can be sent instead. Trying to use Rc did not compile for me, giving the error '''`std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String>`''', but it did on the course.
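A minimal sketch (mine, not the course code) that reproduces the error: uncommenting the Rc spawn gives the "cannot be sent between threads safely" message, while the Arc version compiles because Arc implements Send.
<syntaxhighlight lang="rs">
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

fn main() {
    let _rc_value = Rc::new(String::from("hello"));
    // Does not compile: `Rc<String>` cannot be sent between threads safely,
    // the trait `Send` is not implemented for `Rc<String>`.
    // thread::spawn(move || println!("{}", _rc_value));

    let arc_value = Arc::new(String::from("hello"));
    let handle = thread::spawn(move || println!("{}", arc_value));
    handle.join().unwrap();
}
</syntaxhighlight>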
===Sync===
Types that implement Sync are safe to pass by non-mutable (shared) reference to another thread, i.e. they can be shared across threads.
==Mutex==
===Introduction===
These are like C++ mutexes. You need to acquire and release a lock when using the resource. It was stressed that the atomic reference counter Arc (not Rc) is used for sharing resources across threads, while Mutexes are used for mutating (modifying) data that is shared across threads.
===Example of Need for Mutex===
So here is an example of the need for a Mutex, given in the Rustlings exercises:
<syntaxhighlight lang="rs">
struct JobStatus {
    jobs_done: u32,
}

let status = Arc::new(JobStatus { jobs_done: 0 });
let mut handles = Vec::new();

for _ in 0..10 {
    let status_shared = Arc::clone(&status);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        status_shared.jobs_done += 1; // error: cannot assign to data in an `Arc`
    });
    handles.push(handle);
}
</syntaxhighlight>
This fails with '''cannot assign to data in an `Arc`'''. This is because you cannot mutate data behind an Arc. To fix this we put the data in a Mutex and lock it when we update. Hopefully this example will provide a useful approach next time I have this issue.
<syntaxhighlight lang="rs">
let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 }));
let mut handles = Vec::new();

for _ in 0..10 {
    let status_shared = Arc::clone(&status);
    let handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(250));
        status_shared.lock().unwrap().jobs_done += 1;
    });
    handles.push(handle);
}
</syntaxhighlight>
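To check that all ten increments happened, the handles can be joined and the value read back through the Mutex; a minimal sketch (my addition) continuing the snippet above:
<syntaxhighlight lang="rs">
for handle in handles {
    handle.join().unwrap();
}

// All threads have finished, so this prints 10
println!("jobs done: {}", status.lock().unwrap().jobs_done);
</syntaxhighlight>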
===Deadlocks===
Below is an example of a deadlock where the same thread asks for the lock again without releasing it. Note you can release the mutex early by calling drop on the guard (numMutexGuard). I also noticed that if the value was never used then the code did complete - maybe an optimizer effect.
<syntaxhighlight lang="rs">
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];

for _ in 0..10 {
    let counter = Arc::clone(&counter);
    let handle = thread::spawn(move || {
        let mut numMutexGuard = counter.lock().unwrap();
        // *numMutexGuard += 1;
        // std::mem::drop(numMutexGuard); // dropping the guard here would release the first lock

        // Second lock on the same mutex while the first guard is still alive - deadlock
        let mut numMutexGuard2 = counter.lock().unwrap();
        *numMutexGuard2 += 1;
    });
    handles.push(handle);
}

for handle in handles {
    handle.join().unwrap();
}

println!("Result: {}", *counter.lock().unwrap());
</syntaxhighlight>
===Poisoned Mutex===
This is the term for when a thread holds a lock and then panics: the mutex is left poisoned. In Rust we can actually recover from this, though I suspect it is usually undesirable. Here is the sample code, where we match on the poisoned error.
<syntaxhighlight lang="rs">
let lock = Arc::new(Mutex::new(0));
let lock2 = Arc::clone(&lock);

let _ = thread::spawn(move || {
    let _guard = lock2.lock().unwrap(); // acquire lock
    panic!("thread1"); // the mutex is now poisoned
}).join();

let mut guard = match lock.lock() {
    Ok(guard) => guard,
    Err(poisoned) => poisoned.into_inner(),
};

*guard += 1;
println!("lock value: {}", *guard);
</syntaxhighlight>
==Rayon==
Quick example of parallelization with Rayon. I was a little unhappy with the change in the signature of reduce in Rayon, but ho-hum. This took 6550ms single-threaded and 186ms multi-threaded.
<syntaxhighlight lang="rs">
use num_bigint::BigUint;
use rayon::prelude::*;

fn factoral(n: u32) -> BigUint {
    if n == 0 || n == 1 {
        BigUint::from(1u32)
    } else {
        // Reduce in TypeScript is array.reduce((acc, next_value) => acc * next_value, 1)
        (1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
        // Recursive alternative:
        // n * factoral(n - 1)
    }
}

fn factoral_with_rayon(n: u32) -> BigUint {
    if n == 0 || n == 1 {
        BigUint::from(1u32)
    } else {
        // Rayon's reduce takes an identity closure as well as the combining closure
        (1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
    }
}

fn main() {
    let mut now = std::time::Instant::now();
    factoral(80000);
    println!("factoral took {} ms", now.elapsed().as_millis());

    now = std::time::Instant::now();
    factoral_with_rayon(80000);
    println!("factoral with rayon took {} ms", now.elapsed().as_millis());
}
</syntaxhighlight>