Future トレイト

Futureトレイトは Rust の非同期プログラミングの中心人物です。(超重役だぞ!)

Futureは値を生成できる非同期計算です。(()のような空の値の時もあります)

簡略化したFutureトレイトは以下のようになります。


# #![allow(unused_variables)]
#fn main() {
trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}
#}

このpoll関数を呼び出すことでFutureを進めることが出来ます、これにより、Futureが可能な限り完了するようになります。Futureが完了するとPoll::Ready(result)を返し、未完了のときはPoll::Pendingを返して、Futureがさらに進む準備ができたときにwake()関数が呼び出されるように準備します。

wake()が呼び出されると、Futureを駆動するエグゼキューターがpollを再度呼び出し、Futureを更に進めようとします。

wakte()がなければ、エグゼキューターはFutureがいつ進むかを知る方法がなく、つねにすべてのfutureをポーリングする必要があります。wake()を使用することで、エグゼキューターはどのFuturepollする準備ができているかを正確に把握できます。

例えば、すでに利用可能なデータが有る場合とない場合があるソケットから読み取りたい場合を考えます。データが有る場合、Poll::Ready(data)でそれを読み込んで返すことが出来ます。 しかし、データの準備ができていない時Futureはブロックされ、進行できなくなります。 データが利用できない時、ソケットでデータの準備ができた時にwakeが呼び出されるように登録する必要があります。 これにより、エグゼキューターに準備が整ったことが分かります。

単純なSocketReadは次のようになります。


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // ソケットにはデータがあります。それをバッファに読み込んで返します。
            Poll::Ready(self.socket.read_buf())
        } else {
            // ソケットにはまだデータがありません
            //
            // データが利用可能になったらwakeが呼び出されるようにします。
            // データが利用可能になるとwakeが呼び出され
            // このFutureのユーザーは再度pollを呼び出してデータを受信することが分かります。
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

このモデルでは、中間割当を必要とせずに、複数の非同期オペレーションを一緒に構築できます。 次のように、複数のFutureを実行したり、連鎖させたりすることは割当のないステートマシンを介して実装できます。


# #![allow(unused_variables)]
#fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}
#}

これは、個別の割当を必要とせずに複数のFutureを同時に実行できる方法を示し、より効率的な非同期プログラムを可能にします。同様に、複数のシーケンシャルFutureを次々に実行することもできます。


# #![allow(unused_variables)]
#fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}
#}

これらの例は複数の割り当てられたオブジェクトと深くネストされたコールバックを必要とせずに、Futureトレイトを使用して非同期制御フローを表現する方法を示しています。

基本的な制御フローが終わったら、実際のFutureトレイトとSimpleFutureがどのように異なるかを話します。


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
#}

気づきやすい最初の変更はselfがもはや&mut self出ないことです。 Pin<&mut Self>に変更されました。Pinについては、あとのセクションで詳しくお話します、が、現時点では、Pinによって固定したFutureを作成できることを知っています。固定されたオブジェクトはフィールドなどにポインタを格納できます。struct MyFut { a: i32, ptr_to_a: *const i32 }のように。 async / awaitを有効にするにはピン留めが必要になります。

次に、wake: fn()&mut Context<'_>'に変更されました。 SimpleFutureでは関数ポインターの呼び出しを使用して、Futureをポーリングすることをエグゼキューターに伝えました。しかし、fn()はサイズがゼロであるため、wakeが呼ばれたことなどのデータを保存することができません。

実際は、Web サーバーのような複雑なアプリケーションには起動をすべて個別に管理する必要がある数千の異なる接続が存在する場合があります。 このContextタイプは特定のタスクを起動するために使用できるWakerへのアクセスを提供することでこれを解決します。