Rust Concurrency and Asynchronous Processing: Difference between revisions

From bibbleWiki

Revision as of 04:27, 14 October 2024

Smart Pointer

Box

Finally got some good advice around Box. Box is a way to put values on the heap rather than the stack. We need to do this for at least these reasons:

  • When the size of a value is not known at compile time, it must live behind a pointer, e.g. on the heap
  • When a value is large or rarely used, boxing it keeps stack usage down

Box allows us to do this. If you think of C++ and forward declarations, this is the same idea: a pointer has a known size even when the thing it points to does not, e.g. a pointer in C++ can point to arrays of various sizes. In Rust we see the error the size for values of type blah cannot be known at compilation time. A common reason for this message to appear is when we use a trait, which could be implemented by types of many different sizes.

trait Vehicle {
  fn drive(&self);
}

struct Truck;
impl Vehicle for Truck {
  fn drive(&self) { /* ... */ }
}

// Not allowed: let t: dyn Vehicle; (the size of dyn Vehicle is not known)
// We have to do
let t: Box<dyn Vehicle>;
t = Box::new(Truck);
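
As a rough sketch of why this matters in practice, boxing lets different implementors sit in one collection. The Bike type, the drive_all helper and the String return value are assumptions made for illustration and are not part of the notes above.

```rust
// Hypothetical variation of the Vehicle trait: drive returns a String
// so that the result can be inspected by the caller.
trait Vehicle {
    fn drive(&self) -> String;
}

struct Truck;
struct Bike; // hypothetical second implementor

impl Vehicle for Truck {
    fn drive(&self) -> String {
        String::from("truck driving")
    }
}

impl Vehicle for Bike {
    fn drive(&self) -> String {
        String::from("bike riding")
    }
}

// Every element is a Box (a pointer of known size), so the Vec can hold
// any mix of implementors even though `dyn Vehicle` itself is unsized.
fn drive_all(fleet: &[Box<dyn Vehicle>]) -> Vec<String> {
    fleet.iter().map(|vehicle| vehicle.drive()).collect()
}
```

Calling drive_all on a Vec<Box<dyn Vehicle>> holding a Truck and a Bike returns one String per vehicle, in order.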

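The other use case for Box is when a struct contains a field of its own type. In this case, without the Box the compiler would say recursive type 'LinkListItem' has infinite size, because the struct would have to contain itself. Boxing the field gives it a fixed, pointer-sized footprint.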

struct LinkListItem {
   next_item: Option<Box<LinkListItem>>
}
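
A minimal sketch of walking such a list, assuming a hypothetical value field is added to carry a payload. The list_len and list_sum helpers are illustrative names only.

```rust
struct LinkListItem {
    value: i32, // hypothetical payload, not in the struct above
    next_item: Option<Box<LinkListItem>>,
}

// Count the items by following the boxed links until we hit None.
fn list_len(head: &LinkListItem) -> usize {
    let mut count = 1;
    let mut current = head;
    // as_deref turns Option<Box<LinkListItem>> into Option<&LinkListItem>
    while let Some(next) = current.next_item.as_deref() {
        count += 1;
        current = next;
    }
    count
}

// Sum the payloads using the same traversal.
fn list_sum(head: &LinkListItem) -> i32 {
    let mut total = head.value;
    let mut current = head;
    while let Some(next) = current.next_item.as_deref() {
        total += next.value;
        current = next;
    }
    total
}
```

Each node owns the next one through its Box, so dropping the head frees the whole chain.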

Arc

Cow

Rc

Asynchronous

Async and Await (Future Trait)

This is just like promises. I learned a few things doing this. Namespaces can be renamed on import using as. Because the asynchronous code lived alongside the existing thread code, there was a lot of overlap in names between the two. Note that not all of the code is listed, so the additional use statements are there to demonstrate the renaming requirement.

use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file};
use std::{cell::RefCell, fs, thread, sync::{mpsc::sync_channel, Arc, Mutex}};

async fn read_file(path: &str) -> async_io::Result<String> {
    let mut file: async_file = async_fs::File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

fn main() {
    let task = async_task::spawn(async {
        let result = read_file("Cargo.toml").await;
        match result {
            Ok(k) => println!("contents {}", k),
            Err(err) => println!("error {}", err),
        }
    });

    async_std::task::block_on(task);
    println!("Task stopped");
}
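
To get a feel for the Future trait that sits underneath async and await, here is a small sketch that uses only the standard library. It is not how async_std works internally. Countdown, ThreadWaker and this block_on are hypothetical names for a hand-written future and a very naive executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical future that returns Pending `remaining` times before it is ready.
struct Countdown {
    remaining: u32,
    polls: u32,
}

impl Future for Countdown {
    type Output = u32; // how many times poll was called in total

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            Poll::Ready(self.polls)
        } else {
            self.remaining -= 1;
            // Tell the executor we want to be polled again.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Waker that unparks the thread running block_on.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Naive executor: poll the future, park the thread while it is Pending.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}
```

An await is roughly this poll loop handed over to a runtime: the future reports Pending until it can make progress, and the waker tells the executor when to try again.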

Concurrency

Thread Join

Threads are similar to C++ and C#. Let's do a basic join.

let handle = thread::spawn(move || {
    println!("Hello from a thread!")
});

handle.join().unwrap();
println!("Hello Main!");

Note that threads do not always finish in the order they were created.

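    // Inputs assumed for illustration
    let v = vec![1, 2, 3];
    let mut thread_handles = vec![];

    v.into_iter().for_each(|e| {
        thread_handles.push(thread::spawn(move || println!("Thread {}",e)));
    });

    thread_handles.into_iter().for_each(|handle| {
        handle.join().unwrap();
    });

// Example output (the order can vary between runs):
// Thread 2
// Thread 1
// Thread 3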
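
Although the threads may finish in any order, join hands back each thread's return value, so results can still be collected in spawn order. A minimal sketch, where squares_in_threads is a made-up helper name:

```rust
use std::thread;

// Spawn one thread per input and collect the results in spawn order.
fn squares_in_threads(inputs: Vec<u64>) -> Vec<u64> {
    // Collect all handles first so that every thread is started
    // before we begin waiting on any of them.
    let handles: Vec<thread::JoinHandle<u64>> = inputs
        .into_iter()
        .map(|n| thread::spawn(move || n * n))
        .collect();

    // join() blocks until that thread finishes and yields its return value.
    handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect()
}
```

The output order follows the order of the handles, not the order in which the threads happened to complete.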

Channels

This is like channels in Kotlin. These live in the mpsc module, which stands for multi-producer, single-consumer. Ownership of the value is transferred on send and on receive, i.e. you cannot use the value after sending it.

Example One Producer

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
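
The ownership point can be sketched as follows. send_owned_string is a hypothetical helper, and the commented-out line marks the use that the compiler would reject.

```rust
use std::sync::mpsc::channel;
use std::thread;

// Move a String into a worker thread, send it, and receive it back.
fn send_owned_string() -> String {
    let (sender, receiver) = channel();
    let message = String::from("hello");

    let worker = thread::spawn(move || {
        sender.send(message).unwrap();
        // println!("{}", message); // would not compile: `message` was moved by send
    });
    worker.join().unwrap();

    // The receiver now owns the String.
    receiver.recv().unwrap()
}
```

Because the value is moved rather than copied or shared, no lock is needed around it.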

Example Two Producer

We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed, Rust provides a sync_channel function in the mpsc module, which creates a bounded channel. When used, the sender will block when the queue is full and automatically continue once space frees up.

    let (producer1, receiver) = sync_channel(1000);
    let producer2 = producer1.clone();

    // Send from Producer 1
    thread::spawn(move|| {
        let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world"),];
        for val in vec {
            producer1.send(val).unwrap();
        }
    });

    // Send from Producer 2
    thread::spawn(move|| {
        let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2"),];
        for val in vec {
            producer2.send(val).unwrap();
        }
    });

    // Receive from both producers until every sender has been dropped
    for received in receiver {
        println!("Got: {}", received);
    }
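
To see the bounded queue in action without blocking, a small sketch using try_send, which returns an error instead of waiting when the queue is full. The fill_bounded_channel helper and its parameters are assumptions for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Try to push `attempts` values into a channel of the given capacity
// without anyone receiving. Returns (accepted, rejected).
fn fill_bounded_channel(capacity: usize, attempts: usize) -> (usize, usize) {
    let (sender, receiver) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;

    for i in 0..attempts {
        match sender.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue is full: a blocking send() would wait here instead.
            Err(TrySendError::Full(_)) => rejected += 1,
            // The receiver has gone away, so stop trying.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }

    // Keep the receiver alive until all the attempts are done.
    drop(receiver);
    (accepted, rejected)
}
```

With a capacity of 2 and 5 attempts, two values are queued and three are rejected, which is the point at which a plain send would have blocked the producer.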

Sync and Send Type Traits

Send

Types that implement Send are safe to pass by value (move) to another thread. Almost all types implement Send, but there are exceptions: Rc does not, whereas the atomically reference-counted Arc does (provided its contents are Send and Sync). When I tried to use Rc across threads it did not compile, failing with the error `std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String>`, although it did compile on the course.

Sync

Types that implement Sync are safe to pass by immutable (shared) reference to another thread, so they can be shared across threads. Put another way, T is Sync when &T is Send.
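
A common trick for checking these marker traits is a generic function with a trait bound, which only compiles when the type satisfies it. The sketch below uses hypothetical helper names (assert_send, assert_sync, check_marker_traits, lengths_from_threads) and shows Arc where Rc would be rejected.

```rust
use std::sync::Arc;
use std::thread;

// These compile only if T satisfies the bound, so they act as compile-time checks.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

fn check_marker_traits() {
    assert_send::<Arc<String>>();
    assert_sync::<Arc<String>>();
    // assert_send::<std::rc::Rc<String>>(); // does not compile: Rc is not Send
}

// Share one String between several threads through Arc.
fn lengths_from_threads(text: &str, threads: usize) -> Vec<usize> {
    let shared = Arc::new(String::from(text));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            // Each thread gets its own Arc pointing at the same String.
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.len())
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Swapping Arc for Rc in lengths_from_threads produces the same kind of "cannot be sent between threads safely" error quoted above.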

Mutexes and Threads (Shared State)

Introduction

These are like C++ mutexes. You need to acquire a lock before using the resource and release it afterwards. In Rust, lock() returns a guard and the lock is released when the guard is dropped. It was stressed that the atomic reference counter Arc (not Rc) is used for sharing resources across threads. Mutexes are used for mutating (modifying) data that is shared across threads.

Deadlocks

Below is an example of a deadlock where the same thread asks for the lock a second time without releasing the first guard. Note you can release the mutex early by calling drop on the guard (numMutexGuard). I also noticed that if you did not use the value then the code did complete, maybe because of an optimizer.

    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move|| {
            let mut numMutexGuard = counter.lock().unwrap();
            // *numMutexGuard += 1;
            // std::mem::drop(numMutexGuard);
            let mut numMutexGuard2 = counter.lock().unwrap();
            *numMutexGuard2 += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
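
For comparison, a sketch of the same loop without the deadlock: the first guard is confined to an inner block so it is dropped before the second lock is taken. count_with_mutex is a made-up name, and here both increments are kept.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Each thread increments the counter twice, releasing the lock in between.
fn count_with_mutex(threads: usize) -> i32 {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..threads {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            {
                let mut first_guard = counter.lock().unwrap();
                *first_guard += 1;
            } // first_guard is dropped here, which releases the lock

            let mut second_guard = counter.lock().unwrap();
            *second_guard += 1;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Copy the value out so the guard is released before returning.
    let total = *counter.lock().unwrap();
    total
}
```

Calling std::mem::drop on the first guard, as in the commented-out line above, has the same effect as the inner block.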

Poisoned Mutex

A mutex becomes poisoned when a thread panics while holding its lock. After that, lock() returns an Err, because the protected data may have been left in an inconsistent state. In Rust we can actually recover from this, though I suspect this is very undesirable. Here is the sample code where we match on the poisoned error.

    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);

    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap(); // acquire lock
        panic!("thread1"); // mutex is now poisoned
    }).join();

    let mut guard = match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };

    *guard += 1;
    println!("lock value: {}", *guard);
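
A variation on the same idea, sketched with a hypothetical poison_and_recover helper: it checks is_poisoned() and shows that a write made before the panic is still visible after recovery. The panic message from the worker thread will appear on stderr when this runs.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Poison a mutex on purpose, then recover the data.
// Returns (was_poisoned, value_after_recovery).
fn poison_and_recover() -> (bool, i32) {
    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);

    // join() returns Err because the thread panicked, which we ignore here.
    let _ = thread::spawn(move || {
        let mut guard = lock2.lock().unwrap();
        *guard = 41; // this write happens before the panic
        panic!("simulated failure while holding the lock");
    })
    .join();

    let was_poisoned = lock.is_poisoned();

    // into_inner on the error hands back the guard despite the poisoning.
    let mut guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    *guard += 1;
    (was_poisoned, *guard)
}
```

Whether recovering is sensible depends on whether the data can be trusted after a half-finished update, which is exactly what the poison flag is warning about.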

Rayon

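Quick example of parallelization with Rayon. Was a little unhappy that reduce in Rayon has a different signature from the standard iterator's reduce (it takes an identity closure and returns the value directly rather than an Option), but ho-hum. This took 6550ms single-threaded and 186ms multi-threaded.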

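// Assumes the num-bigint and rayon crates are listed in Cargo.toml
use num_bigint::BigUint;
use rayon::prelude::*;

fn factoral (n: u32) -> BigUint {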
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        // Reduce in Typescript is array.reduce((acc, next_value) => acc * next_value, 1)
        (1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
        // new way 
        // n * factoral(n - 1)
    }

}

fn factoral_with_rayon (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        (1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
    }
}


fn main() {
    let mut now = std::time::Instant::now();
    factoral(80000);
    println!("factoral took {} ms", now.elapsed().as_millis());
    now = std::time::Instant::now();
    factoral_with_rayon(80000);
    println!("factoral with rayon took {} ms", now.elapsed().as_millis());
}
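
To illustrate why a parallel reduce wants an identity value, here is a rough sketch that swaps Rayon for plain std::thread::scope and BigUint for u64 (so it only works up to 20!). This is not how Rayon schedules work. chunked_factorial and its chunks parameter are assumptions, and chunks must be at least 1.

```rust
use std::thread;

// Multiply 1..=n by splitting the range into `chunks` pieces,
// one scoped thread per piece. Requires chunks >= 1.
fn chunked_factorial(n: u64, chunks: u64) -> u64 {
    // Ceiling division so that the chunks cover the whole range.
    let chunk_size = (n + chunks - 1) / chunks;

    thread::scope(|s| {
        let handles: Vec<_> = (0..chunks)
            .map(|i| {
                let start = i * chunk_size + 1;
                let end = ((i + 1) * chunk_size).min(n);
                // Each chunk starts from the identity (1), just like the
                // `|| BigUint::from(1u32)` closure passed to Rayon's reduce.
                // An empty chunk (start > end) simply yields 1.
                s.spawn(move || (start..=end).fold(1u64, |acc, x| acc * x))
            })
            .collect();

        // Combine the partial products, again starting from the identity.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .fold(1u64, |acc, x| acc * x)
    })
}
```

Every chunk needs its own starting value, which is why the parallel version takes a closure that can produce the identity as many times as required.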