タスクを起こせ!Waker!
Future
は最初にpoll
された時に完了できないことがよくあります。これが発生した時、Future
はさらに前進する準備ができたら再度ポーリングされるようにする必要があります。
これはWaker
型で行われます。
Future
がポーリングされるたびに、「タスク」の一部としてポーリングされます。
タスクは、エグゼキューターに送信されたトップレベルのFuture
です。
Waker
は関連付けられたタスクを起動する必要があることをエグゼキューターに伝えるために使用できるwake()
メソッドを提供します。
wake()
が呼び出された時、エグゼキューターは、Waker
と関連するタスクが進む準備が出来たことを知っています。そして、再びポーリングする必要があることも。
Waker
はclone()
も実装しているため、コピーして保存することが出来ます。
Waker
を使用して単純なタイマーを実装してみましょう!
タイマー作成
この例では、タイマーが作成された時に新しいスレッドを立て、必要な時間だけスリープし、時間経過した時にタイマーのFuture
を通知します。
必要なインポートは次のとおりです。
#![allow(unused_variables)]
fn main() {
use {
std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
},
};
}
Future
の型自体を定義するところからです。私達の future にはスレッドが、タイマーが経過し、future が完了するべきであることを伝える方法が必要です。 Arc<Mutex<..>>
を使用して、スレッドと future の間で通信します。
#![allow(unused_variables)]
fn main() {
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
struct SharedState {
/// Whether or not the sleep time has elapsed
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
waker: Option<Waker>,
}
}
実際にFuture
の実装を書いていきましょう!
#![allow(unused_variables)]
fn main() {
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
}
かなり簡単ですね。スレッドにshared_state.completed = true
が設定されている時、完了です! それ以外の場合Waker
は現在のタスクのクローンを作成してshared_state.waker
に渡し、スレッドがタスクを復帰できるようにします。
重要なのは、future が異なるWaker
を持つ異なるタスクに移動した可能性があるため、future がポーリングされるたびにWaker
を更新する必要があることです。これは、ポーリングされたあと、タスク間で future が渡される時に発生します。
最後に、実際にタイマーを構築してスレッドを開始する API が必要です。
#![allow(unused_variables)]
fn main() {
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
}
すげぇ!単純なタイマーの future を構築するために必要なのはそれだけです。 さて、future を実行するエグゼキューターがいれば、、、