タスクを起こせ!Waker!
Future
は最初にpoll
された時に完了できないことがよくあります。これが発生した時、Future
はさらに前進する準備ができたら再度ポーリングされるようにする必要があります。
これはWaker
型で行われます。
Future
がポーリングされるたびに、「タスク」の一部としてポーリングされます。
タスクは、エグゼキューターに送信されたトップレベルのFuture
です。
Waker
は関連付けられたタスクを起動する必要があることをエグゼキューターに伝えるために使用できるwake()
メソッドを提供します。
wake()
が呼び出された時、エグゼキューターは、Waker
と関連するタスクが進む準備が出来たことを知っています。そして、再びポーリングする必要があることも。
Waker
はclone()
も実装しているため、コピーして保存することが出来ます。
Waker
を使用して単純なタイマーを実装してみましょう!
タイマー作成
この例では、タイマーが作成された時に新しいスレッドを立て、必要な時間だけスリープし、時間経過した時にタイマーのFuture
を通知します。
必要なインポートは次のとおりです。
# #![allow(unused_variables)] #fn main() { use { std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }, }; #}
Future
の型自体を定義するところからです。私達の future にはスレッドが、タイマーが経過し、future が完了するべきであることを伝える方法が必要です。 Arc<Mutex<..>>
を使用して、スレッドと future の間で通信します。
# #![allow(unused_variables)] #fn main() { pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the future and the waiting thread struct SharedState { /// Whether or not the sleep time has elapsed completed: bool, /// The waker for the task that `TimerFuture` is running on. /// The thread can use this after setting `completed = true` to tell /// `TimerFuture`'s task to wake up, see that `completed = true`, and /// move forward. waker: Option<Waker>, } #}
実際にFuture
の実装を書いていきましょう!
# #![allow(unused_variables)] #fn main() { impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Look at the shared state to see if the timer has already completed. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // Set waker so that the thread can wake up the current task // when the timer has completed, ensuring that the future is polled // again and sees that `completed = true`. // // It's tempting to do this once rather than repeatedly cloning // the waker each time. However, the `TimerFuture` can move between // tasks on the executor, which could cause a stale waker pointing // to the wrong task, preventing `TimerFuture` from waking up // correctly. // // N.B. it's possible to check for this using the `Waker::will_wake` // function, but we omit that here to keep things simple. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } #}
かなり簡単ですね。スレッドにshared_state.completed = true
が設定されている時、完了です! それ以外の場合Waker
は現在のタスクのクローンを作成してshared_state.waker
に渡し、スレッドがタスクを復帰できるようにします。
重要なのは、future が異なるWaker
を持つ異なるタスクに移動した可能性があるため、future がポーリングされるたびにWaker
を更新する必要があることです。これは、ポーリングされたあと、タスク間で future が渡される時に発生します。
最後に、実際にタイマーを構築してスレッドを開始する API が必要です。
# #![allow(unused_variables)] #fn main() { impl TimerFuture { /// Create a new `TimerFuture` which will complete after the provided /// timeout. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn the new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Signal that the timer has completed and wake up the last // task on which the future was polled, if one exists. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } #}
すげぇ!単純なタイマーの future を構築するために必要なのはそれだけです。 さて、future を実行するエグゼキューターがいれば、、、