Rust Concurrency and Asynchronous Processing

From bibbleWiki
Revision as of 19:36, 15 October 2024 by Iwiseman (talk | contribs) (AsRef and AsMut (Probably should be AsMutRef))

AsRef and AsMut (Probably should be AsMutRef)

AsRef lets a function accept any argument that can be cheaply converted to a reference of the target type. A good example I found was std::fs::rename.

fs::rename(Path::new("a.txt"), Path::new("b.txt"))?;

Because rename takes AsRef<Path> for both parameters, we can simplify this to:

fs::rename("a.txt", "b.txt")?;
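The same pattern can be used in our own functions. A minimal sketch, assuming a hypothetical helper named file_name_of (not part of the standard library), that accepts string slices, Strings, and paths alike:

```rust
use std::path::Path;

// Hypothetical helper: accepts anything that can be viewed as a `Path`.
fn file_name_of<P: AsRef<Path>>(path: P) -> Option<String> {
    // `as_ref()` is the cheap reference-to-reference conversion.
    let path: &Path = path.as_ref();
    path.file_name()
        .map(|name| name.to_string_lossy().into_owned())
}
```

Calling file_name_of("dir/a.txt") and file_name_of(String::from("dir/a.txt")) both work, since &str and String implement AsRef<Path>.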

AsRef Simple Example

Here is an example of allowing a struct that wraps a String to be passed to a print function.

struct MyStruct(String);

// print_ref requires AsRef<str>, so implement AsRef<str> (not AsRef<String>).
impl AsRef<str> for MyStruct {
    fn as_ref(&self) -> &str {
        &self.0
    }
}

fn print_ref(s: impl AsRef<str>) {
    println!("str: {}", s.as_ref());
}

fn main() {
    let s = MyStruct(String::from("Hello"));
    print_ref(&s);
}

AsMut Simple Example

This is an example of usage of AsMut

// Squares a number in place: reads it via `as_ref()` and writes it back via `as_mut()`.
fn num_sq<T: AsRef<u32> + AsMut<u32>>(arg: &mut T) {
    let num = arg.as_ref().pow(2);
    *arg.as_mut() = num;
}
...
    #[test]
    fn mut_box() {
        let mut num = Box::new(3);
        num_sq(&mut num);
        assert_eq!(*num, 9);
    }
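Box already implements both traits, which is why the test passes. A sketch of the same function working with a user-defined type, assuming a hypothetical Counter struct introduced only for illustration:

```rust
// Hypothetical wrapper type used only for this illustration.
struct Counter {
    value: u32,
}

impl AsRef<u32> for Counter {
    fn as_ref(&self) -> &u32 {
        &self.value
    }
}

impl AsMut<u32> for Counter {
    fn as_mut(&mut self) -> &mut u32 {
        &mut self.value
    }
}

// Same function as above: works for Box<u32> and for Counter.
fn num_sq<T: AsRef<u32> + AsMut<u32>>(arg: &mut T) {
    let num = arg.as_ref().pow(2);
    *arg.as_mut() = num;
}
```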

Smart Pointer

Box

Finally got some good advice around Box. Box is a way to put objects on the heap rather than the stack. We need to do this for at least these reasons:

  • When a size is not known at compile time, the value must live on the heap
  • When a value is large or not used a lot, it is best kept off the stack to reduce stack size

Box allows us to do this. If you think of C++ and forward declarations, this is the same thing: a pointer has a known size even though it can point to objects of various sizes, e.g. an array. In Rust we see the error the size for values of type blah cannot be known at compilation time. A common reason for this message is using a trait, which could be implemented in many ways

trait Vehicle {
  fn drive(&self);
}

struct Truck;
impl Vehicle for Truck {
...

// Not allowed: let t: dyn Vehicle (its size is not known at compile time)
// We have to do
let t: Box<dyn Vehicle> = Box::new(Truck);
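A fuller sketch of the trait-object case. The Car type and the drive_all and sample_fleet helpers are hypothetical, and drive returns a String here (unlike the snippet above) purely so the result can be checked:

```rust
trait Vehicle {
    fn drive(&self) -> String;
}

struct Truck;
struct Car; // hypothetical second implementor

impl Vehicle for Truck {
    fn drive(&self) -> String {
        String::from("truck driving")
    }
}

impl Vehicle for Car {
    fn drive(&self) -> String {
        String::from("car driving")
    }
}

// Each element is a fixed-size pointer, so differently sized
// implementors can share one Vec.
fn sample_fleet() -> Vec<Box<dyn Vehicle>> {
    vec![Box::new(Truck), Box::new(Car)]
}

// Calls go through dynamic dispatch on the boxed trait objects.
fn drive_all(fleet: &[Box<dyn Vehicle>]) -> Vec<String> {
    fleet.iter().map(|vehicle| vehicle.drive()).collect()
}
```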

The other use case for Box is when a struct contains a field of its own type. In this case, without the Box the compiler would say recursive type 'LinkListItem' has infinite size

struct LinkListItem {
   next_item: Option<Box<LinkListItem>>
}
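A small sketch of building and walking such a list. The value field and the from_values and sum helpers are assumptions added for illustration:

```rust
struct LinkListItem {
    value: u32, // hypothetical payload field
    next_item: Option<Box<LinkListItem>>,
}

// Builds a list back to front so each new node can own the rest.
fn from_values(values: &[u32]) -> Option<Box<LinkListItem>> {
    let mut head = None;
    for &value in values.iter().rev() {
        head = Some(Box::new(LinkListItem { value, next_item: head }));
    }
    head
}

// Follows the `next_item` pointers until the end of the list.
fn sum(list: &Option<Box<LinkListItem>>) -> u32 {
    let mut total = 0;
    let mut current = list.as_deref();
    while let Some(item) = current {
        total += item.value;
        current = item.next_item.as_deref();
    }
    total
}
```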

Rc (Reference Counter)

This is similar to a C++ shared pointer (std::shared_ptr). In Rust the following does not compile, because truckb is moved into facility_1 and so cannot be used again for facility_2

    let (trucka, truckb, truckc) = (
        Truck { capacity: 1 },
        Truck { capacity: 11 },
        Truck { capacity: 3 },
    );

    let facility_1 = vec![trucka, truckb];
    let facility_2 = vec![truckb, truckc];

Wrapping them in Rc means Rust allows us to clone the pointers. There is only one truckb in memory, and it is de-allocated when the last Rc pointing to it goes out of scope

    let (trucka, truckb, truckc) = (
        Rc::new(Truck { capacity: 1 }),
        Rc::new(Truck { capacity: 11 }),
        Rc::new(Truck { capacity: 3 }),
    );

    let facility_1 = vec![trucka, Rc::clone(&truckb)];
    // Clone here too, so truckb is still usable for the count below
    let facility_2 = vec![Rc::clone(&truckb), truckc];

// You can get the count with
println!("Ref Count: {:?}", Rc::strong_count(&truckb));
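To see the count move as owners come and go, here is a minimal sketch. The function name shared_truck_counts is hypothetical; Truck mirrors the struct used above:

```rust
use std::rc::Rc;

struct Truck {
    capacity: u32,
}

// Returns (count while shared, count after one owner is dropped, capacity).
fn shared_truck_counts() -> (usize, usize, u32) {
    let truckb = Rc::new(Truck { capacity: 11 });
    let facility_1 = vec![Rc::clone(&truckb)];
    let facility_2 = vec![Rc::clone(&truckb)];

    // Three owners: truckb itself plus one clone in each Vec.
    let while_shared = Rc::strong_count(&truckb);

    // Dropping a Vec drops its Rc, which decrements the count.
    drop(facility_1);
    let after_drop = Rc::strong_count(&truckb);

    // Every clone points at the same Truck.
    let capacity = facility_2[0].capacity;
    (while_shared, after_drop, capacity)
}
```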

Arc (Atomic Reference Count)

This is the same as Rc but the reference count is updated atomically, so Arc implements Send (provided the inner type is Send and Sync), which is required if you are passing it to a thread.

    let (trucka, truckb, truckc) = (
        Arc::new(Truck { capacity: 1 }),
        Arc::new(Truck { capacity: 11 }),
        Arc::new(Truck { capacity: 3 }),
    );


    let thread = std::thread::spawn(move || {
        let facility_1 = vec![Arc::clone(&trucka), Arc::clone(&truckb)];
        let facility_2 = vec![Arc::clone(&truckb), Arc::clone(&truckc)];
    });
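A sketch of sharing one value between several threads, each holding its own Arc clone. The function name and the thread count of four are assumptions for illustration:

```rust
use std::sync::Arc;
use std::thread;

struct Truck {
    capacity: u32,
}

// Each thread reads the same Truck through its own Arc clone.
fn total_capacity_from_threads() -> u32 {
    let truckb = Arc::new(Truck { capacity: 11 });

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let truck = Arc::clone(&truckb);
            // The clone is moved into the thread; the Truck itself is not copied.
            thread::spawn(move || truck.capacity)
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```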

Cow (Copy on Write/Clone on Write)

This is a smart pointer that can hold either borrowed or owned data. If you can identify a case where no new allocation is needed, you can return a Cow and avoid allocating. In this example, when a > 10 the returned value borrows the string passed in, saving an allocation

fn hello(a: usize, b: &str) -> Cow<str> {
    if a > 10 {
        Cow::Borrowed(b)
    } else {
        Cow::Owned(b.to_string())
    }
}
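A slightly more realistic sketch of the same idea, assuming a hypothetical normalize function that only allocates when the input actually needs changing:

```rust
use std::borrow::Cow;

// Hypothetical helper: replaces spaces with underscores.
fn normalize(input: &str) -> Cow<'_, str> {
    if input.contains(' ') {
        // A new String is needed, so return the owned variant.
        Cow::Owned(input.replace(' ', "_"))
    } else {
        // Nothing to change: hand back the borrowed input, no allocation.
        Cow::Borrowed(input)
    }
}
```

Callers can treat both variants as a &str via deref, and call into_owned() only if they need a String.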

Asynchronous

Async and Await (Future Trait)

This is just like promises. I learned a few things doing this. Namespaces can be renamed on import using as. Because the threading code was still present alongside the asynchronous work, there was a lot of name crossover between the two. Note that not all of the code is listed, so the additional use statements are there to demonstrate the need for renaming.

use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file};
use std::{cell::RefCell, fs, thread, sync::{mpsc::sync_channel, Arc, Mutex}};

async fn read_file(path: &str) -> async_io::Result<String> {
    let mut file: async_file = async_fs::File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

fn main() {
    let task = async_task::spawn(async {
        let result = read_file("Cargo.toml").await;
        match result {
            Ok(k) => println!("contents {}", k),
            Err(err) => println!("error {}", err),
        }
    });

    async_std::task::block_on(task);
    println!("Task stopped");
}
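To show what sits underneath async and await, here is a rough sketch of a tiny block_on written with only the standard library. It is a simplified stand-in for illustration, not how async_std implements block_on; the names ThreadWaker, double and pipeline are hypothetical:

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Polls the future on the current thread until it is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until `wake` unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

async fn double(x: u32) -> u32 {
    x * 2
}

// Each `.await` is a point where the future may yield Pending.
async fn pipeline() -> u32 {
    let a = double(2).await;
    double(a).await + 1
}
```

An async fn does nothing until something polls the Future it returns, which is the job block_on does here.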

Concurrency

Thread Join

Threads are similar to C++ and C#. Let's do a basic join

let handle = thread::spawn(move || {
    println!("Hello from a thread!")
});

handle.join().unwrap();
println!("Hello Main!");

Note that threads do not always finish in the order they were created

    v.into_iter().for_each(|e| {
        thread_handles.push(thread::spawn(move || println!("Thread {}",e)));
    });

    thread_handles.into_iter().for_each(|handle| {
        handle.join().unwrap();
    });

// Thread 2
// Thread 1
// Thread 3

Channels

This is like channels in Kotlin. These live in the mpsc module, which stands for multi-producer, single-consumer. The value being sent is moved on send and handed over on receive, i.e. you cannot use the value after sending it.

Example One Producer

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
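The snippet above leaves expensive_computation undefined. A self-contained sketch, with a hypothetical stand-in for that function:

```rust
use std::sync::mpsc::channel;
use std::thread;

// Hypothetical stand-in for the real work.
fn expensive_computation() -> u64 {
    (1..=10u64).sum()
}

fn compute_in_background() -> u64 {
    let (sender, receiver) = channel();

    // The sender is moved into the worker thread.
    let worker = thread::spawn(move || {
        sender.send(expensive_computation()).unwrap();
    });

    // recv blocks until the worker has sent its value.
    let answer = receiver.recv().unwrap();
    worker.join().unwrap();
    answer
}
```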

Example Two Producer

We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed, Rust provides the sync_channel function in mpsc, which creates a bounded channel. When it is used, the sender blocks when the queue is full and automatically continues when space becomes available.

    let (producer1, receiver) = sync_channel(1000);
    let producer2 = producer1.clone();

    // Send from Producer 1
    thread::spawn(move|| {
        let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world"),];
        for val in vec {
            producer1.send(val).unwrap();
        }
    });

    // Send from Producer 2
    thread::spawn(move|| {
        let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2"),];
        for val in vec {
            producer2.send(val).unwrap();
        }
    });

    // Receive from both producers; the loop ends once every sender is dropped
    for received in receiver {
        println!("Got: {}", received);
    }
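A self-contained sketch of the two-producer case. The buffer size of 1 and the message contents are assumptions chosen to make the senders block often; the results are sorted because arrival order between producers is not deterministic:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn collect_from_producers() -> Vec<String> {
    // Bounded channel: send blocks while one message is already queued.
    let (producer1, receiver) = sync_channel::<String>(1);
    let producer2 = producer1.clone();

    let t1 = thread::spawn(move || {
        for i in 0..3 {
            producer1.send(format!("p1-{}", i)).unwrap();
        }
    });
    let t2 = thread::spawn(move || {
        for i in 0..3 {
            producer2.send(format!("p2-{}", i)).unwrap();
        }
    });

    // The iterator finishes when both senders have been dropped.
    let mut received: Vec<String> = receiver.iter().collect();
    t1.join().unwrap();
    t2.join().unwrap();

    received.sort();
    received
}
```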

Sync and Send Type Traits

Send

Types that implement Send are safe to move (pass by value) to another thread. Almost all types implement Send, but there are exceptions such as Rc; its atomic counterpart Arc is Send when the inner type is Send and Sync. When I tried to use Rc across threads it did not compile, with the error `std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String>`, although it did compile on the course.

Sync

Types that implement Sync are safe to pass by non-mutable reference to another thread. These types can be shared across threads. Put another way, T is Sync when &T is Send.
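One way to check these marker traits at compile time is with empty generic functions. A sketch, where assert_send, assert_sync and check_marker_traits are hypothetical helper names:

```rust
use std::rc::Rc;
use std::sync::Arc;

// These compile only if T has the required marker trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_marker_traits() {
    assert_send::<String>();
    assert_send::<Arc<String>>();
    assert_sync::<Arc<String>>();

    // Uncommenting the next line is a compile error, because Rc
    // uses a non-atomic reference count:
    // assert_send::<Rc<String>>();

    // Rc is still fine when it stays on a single thread.
    let _local_only: Rc<String> = Rc::new(String::from("single thread"));
}
```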

Mutexes and Threads (Shared State)

Introduction

These are like C++ mutexes. You take a lock before using the resource, and in Rust the lock is released when the guard returned by lock() goes out of scope. It was stressed that the atomic reference counter Arc (not Rc) is used for sharing resources across threads. Mutexes are used for mutating (modifying) data that is shared across threads.

Example of Need for Mutex

So here is an example of the need for a Mutex, taken from Rustlings

    let status = Arc::new(JobStatus { jobs_done: 0 });

    let mut handles = Vec::new();
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));

            status_shared.jobs_done += 1;
        });
        handles.push(handle);
    }

This fails with cannot assign to data in an `Arc`. This is because Arc only gives shared (read-only) access to the data it holds. To fix this we put the data in a Mutex and lock it when we update. Hopefully this example will provide a useful approach next time I have this issue.

    let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 }));

    let mut handles = Vec::new();
    for _ in 0..10 {
        let status_shared = Arc::clone(&status);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(250));
            status_shared.lock().unwrap().jobs_done += 1;
        });
        handles.push(handle);
    }
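A complete version of the fixed example that also joins the threads and reads back the final count. The field type u32 and the run_jobs wrapper are assumptions, and the sleep is omitted to keep it short:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

struct JobStatus {
    jobs_done: u32, // assumed field type
}

fn run_jobs(workers: u32) -> u32 {
    let status = Arc::new(Mutex::new(JobStatus { jobs_done: 0 }));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let status_shared = Arc::clone(&status);
            thread::spawn(move || {
                // The temporary guard is dropped at the end of this statement.
                status_shared.lock().unwrap().jobs_done += 1;
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind the value first so the guard is released before `status` is dropped.
    let done = status.lock().unwrap().jobs_done;
    done
}
```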

Deadlocks

Below is an example of a deadlock where the same thread asks for the lock a second time without releasing the first guard. Note you can release the mutex early by calling drop on the guard (numMutexGuard). I also noticed that if the first guard's value was not used then the code did complete; I am not sure why, maybe an optimizer.

    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move|| {
            let mut numMutexGuard = counter.lock().unwrap();
            // *numMutexGuard += 1;
            // std::mem::drop(numMutexGuard);
            let mut numMutexGuard2 = counter.lock().unwrap();
            *numMutexGuard2 += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
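A way to observe the same situation without hanging is try_lock, which returns an error instead of blocking. A sketch, assuming a hypothetical relock_outcomes function:

```rust
use std::sync::{Mutex, TryLockError};

// Returns (was the second attempt refused?, value after the update).
fn relock_outcomes() -> (bool, i32) {
    let counter = Mutex::new(0);

    let guard = counter.lock().unwrap();
    // While `guard` is alive, try_lock reports WouldBlock rather than waiting.
    let blocked = matches!(counter.try_lock(), Err(TryLockError::WouldBlock));

    // Dropping the guard releases the lock, so locking again succeeds.
    drop(guard);
    let mut second = counter.lock().unwrap();
    *second += 1;
    let value = *second;

    (blocked, value)
}
```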

Poisoned Mutex

A mutex is poisoned when a thread panics while holding its lock. Later calls to lock() then return an Err. In Rust we can recover from this, though I suspect doing so is usually undesirable. Here is sample code where we match on the poisoned error.

    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);

    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap(); // acquire lock
        panic!("thread1"); // panicking while holding the guard poisons the mutex
    }).join();

    let mut guard = match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };

    *guard += 1;
    println!("lock value: {}", *guard);
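The same recovery can be written more compactly, and is_poisoned lets you check the state without locking. A sketch, assuming a hypothetical recover_after_panic function (the worker's panic message is still printed to stderr):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Returns (was the mutex poisoned?, value after recovery).
fn recover_after_panic() -> (bool, i32) {
    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);

    // join returns Err because the thread panicked; ignore it here.
    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap();
        panic!("worker failed while holding the lock");
    })
    .join();

    let was_poisoned = lock.is_poisoned();

    // into_inner hands back the guard despite the poisoning.
    let mut guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard += 1;
    let value = *guard;

    (was_poisoned, value)
}
```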

Rayon

A quick example of using Rayon for parallelization. I was a little unhappy that reduce has a different signature in rayon (it takes an identity closure and returns the value directly rather than an Option), but ho-hum. This took 6550ms single threaded and 186ms multi-threaded.

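use num_bigint::BigUint; // assumed: BigUint comes from the num-bigint crate
use rayon::prelude::*;

fn factoral (n: u32) -> BigUint {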
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        // Reduce in Typescript is array.reduce((acc, next_value) => acc * next_value, 1)
        (1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
        // new way 
        // n * factoral(n - 1)
    }

}

fn factoral_with_rayon (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        (1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
    }
}


fn main() {
    let mut now = std::time::Instant::now();
    factoral(80000);
    // as_millis() returns milliseconds, so label the output accordingly
    println!("factoral took {} ms", now.elapsed().as_millis());
    now = std::time::Instant::now();
    factoral_with_rayon(80000);
    println!("factoral with rayon took {} ms", now.elapsed().as_millis());
}
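To illustrate why a parallel reduce wants an identity value, here is a rough standard-library-only sketch that splits the range into chunks and multiplies the partial products. It uses std::thread::scope rather than Rayon, and u128 instead of BigUint, so it only handles small n (up to 34); the function name parallel_product is hypothetical:

```rust
use std::thread;

// Computes n! by giving each worker a slice of 1..=n.
fn parallel_product(n: u64, workers: u64) -> u128 {
    let workers = workers.max(1);
    // Ceiling division so every number lands in some chunk.
    let chunk = (n + workers - 1) / workers;

    thread::scope(|scope| {
        let handles: Vec<_> = (0..workers)
            .map(|w| {
                let start = w * chunk + 1;
                let end = ((w + 1) * chunk).min(n);
                // An empty range yields 1, the identity for multiplication.
                scope.spawn(move || (start..=end).map(u128::from).product::<u128>())
            })
            .collect();

        // Combine the partial products from every worker.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .product::<u128>()
    })
}
```

A worker with nothing to do returns the identity, which is the role the || BigUint::from(1u32) closure plays in the rayon version.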