タスクを起こせ!Waker!

Futureは最初にpollされた時に完了できないことがよくあります。これが発生した時、Futureはさらに前進する準備ができたら再度ポーリングされるようにする必要があります。 これはWaker型で行われます。

Futureがポーリングされるたびに、「タスク」の一部としてポーリングされます。 タスクは、エグゼキューターに送信されたトップレベルのFutureです。

Wakerは関連付けられたタスクを起動する必要があることをエグゼキューターに伝えるために使用できるwake()メソッドを提供します。

wake()が呼び出された時、エグゼキューターは、Wakerと関連するタスクが進む準備が出来たことを知っています。そして、再びポーリングする必要があることも。

Wakerclone()も実装しているため、コピーして保存することが出来ます。

Wakerを使用して単純なタイマーを実装してみましょう!

タイマー作成

この例では、タイマーが作成された時に新しいスレッドを立て、必要な時間だけスリープし、時間経過した時にタイマーのFutureを通知します。

必要なインポートは次のとおりです。


# #![allow(unused_variables)]
#fn main() {
use {
    std::{
        future::Future,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll, Waker},
        thread,
        time::Duration,
    },
};
#}

Futureの型自体を定義するところからです。私達の future にはスレッドが、タイマーが経過し、future が完了するべきであることを伝える方法が必要です。 Arc<Mutex<..>>を使用して、スレッドと future の間で通信します。


# #![allow(unused_variables)]
#fn main() {
pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
struct SharedState {
    /// Whether or not the sleep time has elapsed
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    waker: Option<Waker>,
}
#}

実際にFutureの実装を書いていきましょう!


# #![allow(unused_variables)]
#fn main() {
impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
#}

かなり簡単ですね。スレッドにshared_state.completed = trueが設定されている時、完了です! それ以外の場合Wakerは現在のタスクのクローンを作成してshared_state.wakerに渡し、スレッドがタスクを復帰できるようにします。

重要なのは、future が異なるWakerを持つ異なるタスクに移動した可能性があるため、future がポーリングされるたびにWakerを更新する必要があることです。これは、ポーリングされたあと、タスク間で future が渡される時に発生します。

最後に、実際にタイマーを構築してスレッドを開始する API が必要です。


# #![allow(unused_variables)]
#fn main() {
impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration);
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}
#}

すげぇ!単純なタイマーの future を構築するために必要なのはそれだけです。 さて、future を実行するエグゼキューターがいれば、、、