Trait txrx::SenderExt
pub trait SenderExt: 'static + Sealed + Sender + Sized {
    fn map<F, Ret>(self, func: F) -> Map<Self, F>
    where
        F: FnOnce(Self::Output) -> Ret,
    { ... }
    fn sync_wait(self) -> WaitResult<Self::Output> { ... }
    fn ensure_started(self) -> EnsureStarted<Self> { ... }
    fn transfer<Sched>(self, scheduler: Sched) -> Transfer<Self, Sched> { ... }
    fn when_both<Rhs>(self, rhs: Rhs) -> WhenBoth<Self, Rhs> { ... }
    fn and_then<Func>(self, func: Func) -> AndThen<Self, Func> { ... }
    fn bulk<Func, BulkResult>(self, size: usize, func: Func) -> Bulk<Self, Func>
    where
        Func: Clone + Fn(usize, &Self::Output) -> BulkResult,
    { ... }
    fn into_awaitable(self) -> Awaitable<Self> { ... }
}
Provided methods
fn sync_wait(self) -> WaitResult<Self::Output>
fn ensure_started(self) -> EnsureStarted<Self>
fn when_both<Rhs>(self, rhs: Rhs) -> WhenBoth<Self, Rhs>
Returns a sender that completes when both the self sender and the rhs sender complete.
The output type of the sender is (Self::Output, Rhs::Output). The scheduler is Self::Scheduler.
Example
use txrx::SenderExt;

let value = txrx::factories::just(10)
    .when_both(txrx::factories::just("hello"))
    .sync_wait()
    .unwrap();
assert_eq!(value, (10, "hello"));
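The "completes when both complete" behaviour can also be pictured without txrx. The following is a minimal sketch using only the standard library, assuming each side runs on its own thread. The name when_both_sketch is hypothetical; it is not part of the txrx API and is not how txrx implements when_both.

```rust
use std::thread;

/// Hypothetical helper, not part of txrx: runs two computations concurrently
/// and returns only once both have finished, pairing the results as (lhs, rhs).
fn when_both_sketch<A, B, FA, FB>(lhs: FA, rhs: FB) -> (A, B)
where
    A: Send + 'static,
    B: Send + 'static,
    FA: FnOnce() -> A + Send + 'static,
    FB: FnOnce() -> B + Send + 'static,
{
    // Start both sides before waiting on either, so they can overlap.
    let left = thread::spawn(lhs);
    let right = thread::spawn(rhs);
    // Each join blocks until that side is done; the tuple exists only after both.
    (
        left.join().expect("left computation panicked"),
        right.join().expect("right computation panicked"),
    )
}
```

Calling when_both_sketch(|| 10, || "hello") yields a tuple whose first element comes from the left computation and whose second comes from the right one, mirroring the (Self::Output, Rhs::Output) shape described above.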
fn bulk<Func, BulkResult>(self, size: usize, func: Func) -> Bulk<Self, Func>
where
    Func: Clone + Fn(usize, &Self::Output) -> BulkResult,
Returns a sender that invokes the provided function func size times.
bulk() schedules func size times on the scheduler associated with the input sender. If that scheduler is multi-threaded, for instance backed by a thread pool, the invocations may run in parallel.
func must be cloneable. It takes two arguments: an index and a reference to the value sent by the previous sender. The index identifies which run in the range 0..size a particular invocation is.
The output of a bulk() sender is (Input::Output, Vec<Func::Output>). For instance, the bulk sender txrx::factories::just("hello").bulk(2, |_, _| 5) has (&str, Vec<i32>) as its output type. The vector holds the results of the bulk function invocations, ordered by index.
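The index-ordered result vector described above can be illustrated without txrx. The following is a minimal sketch using only the standard library, assuming one freshly spawned thread per invocation. The name bulk_sketch is hypothetical; it is not part of the txrx API and is not how txrx implements bulk().

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the behaviour described above, not txrx code.
/// Runs `func` once per index in `0..size`, each call on its own thread, and
/// returns the input value together with the results ordered by index.
fn bulk_sketch<T, R, F>(input: T, size: usize, func: F) -> (T, Vec<R>)
where
    T: Send + Sync + 'static,
    R: Send + 'static,
    F: Clone + Send + 'static + Fn(usize, &T) -> R,
{
    // Share the input by reference across threads, as bulk() passes `&Output`.
    let input = Arc::new(input);
    let handles: Vec<_> = (0..size)
        .map(|i| {
            // Each invocation gets its own clone of the function.
            let func = func.clone();
            let input = Arc::clone(&input);
            thread::spawn(move || func(i, &*input))
        })
        .collect();
    // Joining in spawn order puts result `i` at position `i`, even if the
    // threads finish in a different order.
    let results: Vec<R> = handles
        .into_iter()
        .map(|h| h.join().expect("bulk function panicked"))
        .collect();
    // Every worker has finished and dropped its clone, so the Arc is unique.
    let input = Arc::try_unwrap(input).ok().expect("input still shared");
    (input, results)
}
```

With this sketch, bulk_sketch("hello", 2, |_, _| 5) returns a (&str, Vec<i32>) pair, matching the output shape described above. The threads may complete in any order, but the vector is always arranged by index.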
Examples
use txrx::traits::{Scheduler, Sender, Receiver, SenderExt};

// This is a scheduler that always spins up a new thread to start its work on.
#[derive(Copy, Clone)]
struct NewThreadScheduler;

impl Sender for NewThreadScheduler {
    type Output = ();
    type Scheduler = Self;

    fn start<R>(self, receiver: R)
    where
        R: 'static + Send + Receiver<Input = Self::Output>,
    {
        std::thread::spawn(move || receiver.set_value(()));
    }

    fn get_scheduler(&self) -> Self::Scheduler {
        Self
    }
}

impl Scheduler for NewThreadScheduler {
    type Sender = Self;

    fn schedule(&mut self) -> Self::Sender {
        Self
    }
}

let result = NewThreadScheduler
    .map(|_| 5)
    .bulk(4, |i, always_5| {
        println!("Running step {} on thread {:?}", i, std::thread::current().id());
        println!("Always 5 = {}", always_5);
        i + always_5
    })
    .sync_wait();

assert_eq!(result.unwrap(), (5, vec![5, 6, 7, 8]));
One possible output from running this is:
Running step 3 on thread ThreadId(2)
Always 5 = 5
Running step 0 on thread ThreadId(3)
Always 5 = 5
Running step 1 on thread ThreadId(4)
Always 5 = 5
Running step 2 on thread ThreadId(5)
Always 5 = 5
Because the scheduler is not deterministic, the order of these lines and the thread IDs can vary from run to run.