Rust Concurrency and Asynchronous Processing: Difference between revisions

From bibbleWiki
Jump to navigation Jump to search
Created page with "=Asyncronous= ==Async and Await (Future Trait)== This is just like promises. I learned a few things doing this. Renaming namespaces can be done using as. For the asynchronous work with thread still present there was a lot of crossover between the two. Note not all code listed so additional use statements demonstrate the renaming requirement. <syntaxhighlight lang="rs"> use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file}..."
 
No edit summary
Line 25: Line 25:
     async_std::task::block_on(task);
     async_std::task::block_on(task);
     println!("Task stopped");
     println!("Task stopped");
}
</syntaxhighlight>
=Concurrency=
==Thread Join==
Threads are similar to C++ and C#. Let do a basic join
<syntaxhighlight lang="rs">
let handle = thread::spawn(move || {
    println!("Hello from a thread!")
});
handle.join().unwrap();
println!("Hello Main!")
</syntaxhighlight>
Not threads do not always finish in the order created
<syntaxhighlight lang="rs">
    v.into_iter().for_each(|e| {
        thread_handles.push(thread::spawn(move || println!("Thread {}",e)));
    });
    thread_handles.into_iter().for_each(|handle| {
        handle.join().unwrap();
    });
// Thread 2
// Thread 1
// Thread 3
</syntaxhighlight>
==Channels==
This is like channels in kotlin. These live in the mpsc namespace which stands for multi producer single consumer. The value being sent is taken on send and receive. I.E. you cannot use the value being sent after send.
==Example One Producer==
<syntaxhighlight lang="rs">
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});
// Do some useful work for awhile
// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
</syntaxhighlight>
==Example Two Producer==
We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed rust provides a sync_channel method on the mpsc. When used the sender will block when the queue if full and automatically continue when reduced.
<syntaxhighlight lang="rs" highlight="1">
    let (producer1, receiver) = sync_channel(1000);
    let producer2 = producer1.clone();
    // Send from Producer 1
    thread::spawn(move|| {
        let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world"),];
        for val in vec {
            producer1.send(val).unwrap();
        }
    });
    thread::spawn(move|| {
        let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2"),];
        for val in vec {
            producer2.send(val).unwrap();
        }
    });
    // Send from Producer
    for received in receiver {
        println!("Got: {}", received);
    }
</syntaxhighlight>
==Sync and Send Type Traits==
===Send===
Types that implement send are safe to pass by value to another thread and moved across threads. Almost all types implement send but there are exceptions, e.g. Rc however the Atomic Reference Counter (Arc) can be. This  did not compile for me if I tried to use Rc with the error '''std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String>''' but it did on the course.
===Sync===
Types that implement sync are safe to pass by non mutable reference to another thread. These types can be shared across threads.
==Mutexes and Threads (Shared State)==
===Introduction===
These a like c++ mutexes. You need to get and release a lock when using the resource. It was stressed the the atomic reference counter Arc (no Rc) is used for sharing resources across threads. Mutexes are used for mutating (modifying) data that is shared across threads.
===Deadlocks===
Below is an example of a deadlock where the same thread asks for the lock without releasing. Note you can release the mutex with drop on the guard (numMutexGuard). I also noticed if you did not use the value then the code did complete - maybe an optimizer.
<syntaxhighlight lang="rs" highlight="7-8">
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move|| {
            let mut numMutexGuard = counter.lock().unwrap();
            // *numMutexGuard += 1;
            // std::mem::drop(numMutexGuard);
            let mut numMutexGuard2 = counter.lock().unwrap();
            *numMutexGuard2 += 1;
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", *counter.lock().unwrap());
</syntaxhighlight>
===Poisoned Mutex===
This is a term which means when a thread is executing and has a lock but panics. The mutex is hanging or poisoned. In rust we can actually recover from this though I suspect this is very undesirable. Here is the sample code where we match on poisoned.
<syntaxhighlight lang="rs">
    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);
    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap(); // acquire lock
        panic!("thread1"); // mutex is not poisoned
    }).join();
    let mut guard = match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *guard += 1;
    println!("lock value: {}", *guard);
</syntaxhighlight>
==Rayon==
Quick example of rayon for parallelization with Rayon. Was a little unhappy with the changing of signatures for reduce in rayon but ho-hum. This took 6550ms single threaded and 186ms multi-threaded.
<syntaxhighlight lang="rs">
fn factoral (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        // Reduce in Typescript is array.reduce((acc, next_value) => acc * next_value, 1)
        (1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
        // new way
        // n * factoral(n - 1)
    }
}
fn factoral_with_rayon (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        (1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
    }
}
main() {
    let mut now = std::time::Instant::now();
    factoral(80000);
    println!("factoral took {} seconds", now.elapsed().as_millis());
    now = std::time::Instant::now();
    factoral_with_rayon(80000);
    println!("factoral with rayon took {} seconds", now.elapsed().as_millis());
}
}
</syntaxhighlight>
</syntaxhighlight>

Revision as of 02:40, 10 October 2024

Asyncronous

Async and Await (Future Trait)

This is just like promises. I learned a few things doing this. Renaming namespaces can be done using as. For the asynchronous work with thread still present there was a lot of crossover between the two. Note not all code listed so additional use statements demonstrate the renaming requirement.

use async_std::{prelude::*, io as async_io, task as async_task, fs as async_fs, fs::File as async_file};
use std::{cell::RefCell, fs, thread, sync::{mpsc::sync_channel, Arc, Mutex}};

async fn read_file(path: &str) -> async_io::Result<String> {
    let mut file: async_file = async_fs::File::open(path).await?;
    let mut contents = String::new();
    file.read_to_string(&mut contents).await?;
    Ok(contents)
}

fn main() {
    let task = async_task::spawn(async {
        let  result = read_file("Cargo.toml").await;
        match result {
            Ok(k) => println!("contents {}", k),
            Err(err) => println!("error {}", err),
            
        }
    });

    async_std::task::block_on(task);
    println!("Task stopped");
}

Concurrency

Thread Join

Threads are similar to C++ and C#. Let do a basic join

let handle = thread::spawn(move || {
    println!("Hello from a thread!")
});

handle.join().unwrap();
println!("Hello Main!")

Not threads do not always finish in the order created

    v.into_iter().for_each(|e| {
        thread_handles.push(thread::spawn(move || println!("Thread {}",e)));
    });

    thread_handles.into_iter().for_each(|handle| {
        handle.join().unwrap();
    });

// Thread 2
// Thread 1
// Thread 3

Channels

This is like channels in kotlin. These live in the mpsc namespace which stands for multi producer single consumer. The value being sent is taken on send and receive. I.E. you cannot use the value being sent after send.

Example One Producer

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());

Example Two Producer

We can have multiple producers and one receiver. To ensure that the receiver is not overwhelmed rust provides a sync_channel method on the mpsc. When used the sender will block when the queue if full and automatically continue when reduced.

    let (producer1, receiver) = sync_channel(1000);
    let producer2 = producer1.clone();

    // Send from Producer 1
    thread::spawn(move|| {
        let vec = vec![String::from("transmitting"), String::from("hello"), String::from("world"),];
        for val in vec {
            producer1.send(val).unwrap();
        }
    });

    thread::spawn(move|| {
        let vec = vec![String::from("producer2"), String::from("hello"), String::from("world 2"),];
        for val in vec {
            producer2.send(val).unwrap();
        }
    });

    // Send from Producer 
    for received in receiver {
        println!("Got: {}", received);
    }

Sync and Send Type Traits

Send

Types that implement send are safe to pass by value to another thread and moved across threads. Almost all types implement send but there are exceptions, e.g. Rc however the Atomic Reference Counter (Arc) can be. This did not compile for me if I tried to use Rc with the error std::rc::Rc<std::string::String>` cannot be sent between threads safely within `{closure@src/main.rs:116:24: 116:31}`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::string::String> but it did on the course.

Sync

Types that implement sync are safe to pass by non mutable reference to another thread. These types can be shared across threads.

Mutexes and Threads (Shared State)

Introduction

These a like c++ mutexes. You need to get and release a lock when using the resource. It was stressed the the atomic reference counter Arc (no Rc) is used for sharing resources across threads. Mutexes are used for mutating (modifying) data that is shared across threads.

Deadlocks

Below is an example of a deadlock where the same thread asks for the lock without releasing. Note you can release the mutex with drop on the guard (numMutexGuard). I also noticed if you did not use the value then the code did complete - maybe an optimizer.

    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move|| {
            let mut numMutexGuard = counter.lock().unwrap();
            // *numMutexGuard += 1;
            // std::mem::drop(numMutexGuard);
            let mut numMutexGuard2 = counter.lock().unwrap();
            *numMutexGuard2 += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());

Poisoned Mutex

This is a term which means when a thread is executing and has a lock but panics. The mutex is hanging or poisoned. In rust we can actually recover from this though I suspect this is very undesirable. Here is the sample code where we match on poisoned.

    let lock = Arc::new(Mutex::new(0));
    let lock2 = Arc::clone(&lock);

    let _ = thread::spawn(move || {
        let _guard = lock2.lock().unwrap(); // acquire lock
        panic!("thread1"); // mutex is not poisoned
    }).join();

    let mut guard = match lock.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };

    *guard += 1;
    println!("lock value: {}", *guard);

Rayon

Quick example of rayon for parallelization with Rayon. Was a little unhappy with the changing of signatures for reduce in rayon but ho-hum. This took 6550ms single threaded and 186ms multi-threaded.

fn factoral (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        // Reduce in Typescript is array.reduce((acc, next_value) => acc * next_value, 1)
        (1..=n).map(BigUint::from).reduce(|acc, next_value| acc * next_value).unwrap()
        // new way 
        // n * factoral(n - 1)
    }

}

fn factoral_with_rayon (n: u32) -> BigUint {
    if n == 0 || n ==1 {
        BigUint::from(1u32)
    } else {
        (1..=n).into_par_iter().map(BigUint::from).reduce(|| BigUint::from(1u32), |acc, x| acc * x)
    }
}


main() {
    let mut now = std::time::Instant::now();
    factoral(80000);
    println!("factoral took {} seconds", now.elapsed().as_millis());
    now = std::time::Instant::now();
    factoral_with_rayon(80000);
    println!("factoral with rayon took {} seconds", now.elapsed().as_millis());
}