Message-Passing
The preferred way for inter-thread communication in Drone OS is
message-passing. In a similar way as Rust's stdlib offers std::sync::mpsc
for
multi-producer single-consumer queues, Drone offers three different kinds of
single-producer single-consumer queues under drone_core::sync::spsc
.
Oneshot
The oneshot channel is used to transfer an ownership of a single value from one thread to another. You can create a channel like this:
#![allow(unused)] fn main() { use drone_core::sync::spsc::oneshot; let (tx, rx) = oneshot::channel(); }
tx
and rx
are transmitting and receiving parts respectively, they can be
passed to different threads. The tx
part has a send
method, which takes
self
by value, meaning it can be called only once:
#![allow(unused)] fn main() { tx.send(my_message); }
The rx
part is a future, which means it can be .await
ed:
#![allow(unused)] fn main() { let my_message = rx.await; }
Ring
For passing multiple values of one type, there is the ring channel. It works by allocating a fixed-size ring-buffer:
#![allow(unused)] fn main() { use drone_core::sync::spsc::ring; let (tx, rx) = ring::channel(100); }
Here 100
is the size of the underlying ring buffer. The tx
part is used to
send values over the channel:
#![allow(unused)] fn main() { tx.send(value1); tx.send(value2); tx.send(value3); }
The rx
part is a stream:
#![allow(unused)] fn main() { while let Some(value) = rx.next().await { // new value received } }
Pulse
When you need to repeatedly notify the other thread about some event, but without any payload, the ring channel might be an overkill. There is the pulse channel, which is backed by an atomic counter:
#![allow(unused)] fn main() { use drone_core::sync::spsc::pulse; let (tx, rx) = pulse::channel(); }
The tx
part has a send
method, which takes a number to add to the underlying
counter:
#![allow(unused)] fn main() { tx.send(1); tx.send(3); tx.send(100); }
The rx
part is a stream. Each successful poll of the stream clears the
underlying counter and returns the number, which was stored:
#![allow(unused)] fn main() { while let Some(pulses) = rx.next().await { // `pulses` number of events was happened since the last poll } }
Futures and streams
Thread tokens have methods that helps creating described channels for connecting with a particular thread.
add_future
takes a fiber and returns a future (rx
part of a oneshot
channel). The future will be resolved when the fiber returns fib::Complete
:
#![allow(unused)] fn main() { use drone_cortexm::{fib, thr::prelude::*}; let pll_ready = thr.rcc.add_future(fib::new_fn(|| { if pll_ready_flag.read_bit() { fib::Complete(()) } else { fib::Yielded(()) } })); pll_ready.await; }
add_try_stream
returns a stream (rx
part of a ring channel), which resolves
each time the fiber returns fib::Yielded(Some(...))
or
fib::Complete(Some(...))
:
#![allow(unused)] fn main() { use drone_cortexm::{fib, thr::prelude::*}; let uart_bytes = thr.uart.add_try_stream( 100, // The ring buffer size || panic!("Ring buffer overflow"), fib::new_fn(|| { if let Some(byte) = read_uart_byte() { fib::Yielded(Some(byte)) } else { fib::Yielded(None) } }), ); }
add_pulse_try_stream
returns a stream (rx
part of pulse channel), which
resolves each time the fiber returns fib::Yielded(Some(number))
or
fib::Complete(Some(number))
:
#![allow(unused)] fn main() { use drone_cortexm::{fib, thr::prelude::*}; let sys_tick_stream = thr.sys_tick.add_pulse_try_stream( || panic!("Counter overflow"), fib::new_fn(|| fib::Yielded(Some(1))), ); }