Trait txrx::SenderExt

pub trait SenderExt: 'static + Sealed + Sender + Sized {
    fn map<F, Ret>(self, func: F) -> Map<Self, F>
    where
        F: FnOnce(Self::Output) -> Ret,
    { ... }
    fn sync_wait(self) -> WaitResult<Self::Output> { ... }
    fn ensure_started(self) -> EnsureStarted<Self> { ... }
    fn transfer<Sched>(self, scheduler: Sched) -> Transfer<Self, Sched> { ... }
    fn when_both<Rhs>(self, rhs: Rhs) -> WhenBoth<Self, Rhs> { ... }
    fn and_then<Func>(self, func: Func) -> AndThen<Self, Func> { ... }
    fn bulk<Func, BulkResult>(self, size: usize, func: Func) -> Bulk<Self, Func>
    where
        Func: Clone + Fn(usize, &Self::Output) -> BulkResult,
    { ... }
    fn into_awaitable(self) -> Awaitable<Self> { ... }
}

Notable traits for Awaitable<S>: impl<S: Sender> Future for Awaitable<S>, with type Output = Result<S::Output>.

Provided methods

fn when_both<Rhs>(self, rhs: Rhs) -> WhenBoth<Self, Rhs>

Returns a sender that completes when both the self sender and the rhs sender have completed.

The output type of the sender is (Self::Output, Rhs::Output). The scheduler is Self::Scheduler.

Example

use txrx::SenderExt;
let value = txrx::factories::just(10)
    .when_both(txrx::factories::just("hello"))
    .sync_wait()
    .unwrap();

assert_eq!(value, (10, "hello"));
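
The completion rule can also be modeled without txrx. The following is a minimal std-only sketch under that assumption, not the crate's implementation: the hypothetical helper both runs two closures concurrently, returns only after both have finished, and pairs the results in (left, right) order, mirroring the (Self::Output, Rhs::Output) shape described above.

```rust
use std::thread;

/// Hypothetical helper (not part of txrx): runs `lhs` and `rhs` concurrently
/// and returns once both have finished, pairing the results as (lhs, rhs).
fn both<A, B, L, R>(lhs: L, rhs: R) -> (A, B)
where
    B: Send,
    L: FnOnce() -> A,
    R: FnOnce() -> B + Send,
{
    thread::scope(|s| {
        // Run the right-hand side on its own scoped thread.
        let right = s.spawn(rhs);
        // Run the left-hand side on the current thread in the meantime.
        let left = lhs();
        // Joining blocks until the right-hand side is done, so both sides
        // have completed by the time the tuple is built.
        (left, right.join().expect("right-hand side panicked"))
    })
}

fn main() {
    let value = both(|| 10, || "hello");
    assert_eq!(value, (10, "hello"));
}
```

The result order depends only on argument position, not on which side finishes first.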

fn bulk<Func, BulkResult>(self, size: usize, func: Func) -> Bulk<Self, Func>
where
    Func: Clone + Fn(usize, &Self::Output) -> BulkResult,

Returns a sender that invokes the provided function func size times.

bulk() schedules func size times on the scheduler associated with the input sender. If that scheduler is multi-threaded, for instance one backed by a thread pool, the invocations may run in parallel.

func must be cloneable. It takes two arguments: an index and a reference to the value sent by the previous sender.

The index identifies which invocation in the range 0..size a particular call is.

The output of a bulk() sender is (Self::Output, Vec<BulkResult>).

For instance, the bulk sender txrx::factories::just("hello").bulk(2, |_, _| 5) has (&str, Vec<i32>) as its output type. The vector holds the results of the individual invocations, ordered by index.

Examples

use txrx::traits::{Scheduler, Sender, Receiver, SenderExt};

// This is a scheduler that always spins up a new thread to start its work on.
#[derive(Copy, Clone)]
struct NewThreadScheduler;

impl Sender for NewThreadScheduler {
    type Output = ();
    type Scheduler = Self;

    fn start<R>(self, receiver: R)
    where
        R: 'static + Send + Receiver<Input=Self::Output>
    {
        std::thread::spawn(move || { receiver.set_value(()) });
    }

    fn get_scheduler(&self) -> Self::Scheduler {
        Self
    }
}

impl Scheduler for NewThreadScheduler {
    type Sender = Self;

    fn schedule(&mut self) -> Self::Sender {
        Self
    }
}

let result = NewThreadScheduler
    .map(|_| {
        5
    })
    .bulk(4, |i, always_5| {
        println!("Running step {} on thread {:?}", i, std::thread::current().id());
        println!("Always 5 = {}", always_5);
        i + always_5
    }).sync_wait();
assert_eq!(result.unwrap(), (5, vec![5, 6, 7, 8]));

One possible output from running this is:

Running step 3 on thread ThreadId(2)
Always 5 = 5
Running step 0 on thread ThreadId(3)
Always 5 = 5
Running step 1 on thread ThreadId(4)
Always 5 = 5
Running step 2 on thread ThreadId(5)
Always 5 = 5

Because the scheduler is not deterministic, the order of these lines may vary from run to run.
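
The bulk() contract (index plus shared reference in, input plus index-ordered vector out) can be modeled with the standard library alone. This is a sketch under assumptions, not how txrx implements it: the hypothetical function bulk_model spawns one scoped thread per index instead of using a scheduler, and joins the threads in spawn order so the result vector is ordered by index even when the threads run in a different order.

```rust
use std::thread;

/// Hypothetical model of the bulk contract (not txrx's implementation):
/// calls `func(i, &input)` for every i in 0..size, one scoped thread per
/// call, and returns the input together with the results ordered by index.
fn bulk_model<T, R, F>(input: T, size: usize, func: F) -> (T, Vec<R>)
where
    T: Sync,
    R: Send,
    F: Clone + Send + Fn(usize, &T) -> R,
{
    let results = thread::scope(|s| {
        let input = &input;
        // Each invocation gets its own clone of `func`, which is why the
        // function has to be cloneable.
        let handles: Vec<_> = (0..size)
            .map(|i| {
                let f = func.clone();
                s.spawn(move || f(i, input))
            })
            .collect();
        // Join in spawn order so results[i] belongs to index i, whatever
        // order the threads actually ran in.
        handles
            .into_iter()
            .map(|h| h.join().expect("bulk function panicked"))
            .collect::<Vec<R>>()
    });
    (input, results)
}

fn main() {
    let (input, results) = bulk_model(5usize, 4, |i, always_5| i + *always_5);
    assert_eq!((input, results), (5, vec![5, 6, 7, 8]));
}
```

As in the txrx example, the input value is passed through unchanged as the first element of the tuple.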

fn into_awaitable(self) -> Awaitable<Self>

Starts the sender and returns an awaitable that can be used to retrieve the result.
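
The "start now, retrieve later" behavior can be illustrated with a std-only sketch. Everything here is hypothetical and is not txrx's Awaitable: start begins the work on a new thread immediately and returns a StartedWork future, and block_on is a minimal executor included only to drive the example. As a simplification, this model resolves to the value itself, whereas the page lists Result<S::Output> as the output of Awaitable<S>.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// State shared between the worker thread and the future.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// Hypothetical stand-in for an awaitable (not txrx's `Awaitable`).
struct StartedWork<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Starts `work` on a new thread right away and returns a future for its result.
fn start<T, F>(work: F) -> StartedWork<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    let worker_side = Arc::clone(&shared);
    thread::spawn(move || {
        let value = work();
        let mut guard = worker_side.lock().unwrap();
        guard.value = Some(value);
        // Wake the task if it is already waiting for the result.
        if let Some(waker) = guard.waker.take() {
            waker.wake();
        }
    });
    StartedWork { shared }
}

impl<T> Future for StartedWork<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut guard = self.shared.lock().unwrap();
        match guard.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not finished yet: remember who to wake when it is.
                guard.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor, present only to drive the example.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Park until the worker wakes us; spurious wakeups just re-poll.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { start(|| 6 * 7).await });
    assert_eq!(answer, 42);
}
```

The work begins inside start, before anything polls the future; awaiting it only retrieves the result.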

Implementors