Future トレイト
Future
トレイトは Rust の非同期プログラミングの中心人物です。(超重役だぞ!)
Future
は値を生成できる非同期計算です。(()
のような空の値の時もあります)
簡略化したFuture
トレイトは以下のようになります。
#![allow(unused_variables)]
fn main() {
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
}
このpoll
関数を呼び出すことでFuture
を進めることが出来ます、これにより、Future
が可能な限り完了するようになります。Future
が完了するとPoll::Ready(result)
を返し、未完了のときはPoll::Pending
を返して、Future
がさらに進む準備ができたときにwake()
関数が呼び出されるように準備します。
wake()
が呼び出されると、Future
を駆動するエグゼキューターがpoll
を再度呼び出し、Future
を更に進めようとします。
wakte()
がなければ、エグゼキューターはFuture
がいつ進むかを知る方法がなく、つねにすべてのfuture
をポーリングする必要があります。wake()
を使用することで、エグゼキューターはどのFuture
をpoll
する準備ができているかを正確に把握できます。
例えば、すでに利用可能なデータが有る場合とない場合があるソケットから読み取りたい場合を考えます。データが有る場合、Poll::Ready(data)
でそれを読み込んで返すことが出来ます。
しかし、データの準備ができていない時Future
はブロックされ、進行できなくなります。
データが利用できない時、ソケットでデータの準備ができた時にwake
が呼び出されるように登録する必要があります。
これにより、エグゼキューターに準備が整ったことが分かります。
単純なSocketRead
は次のようになります。
#![allow(unused_variables)]
fn main() {
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// ソケットにはデータがあります。それをバッファに読み込んで返します。
Poll::Ready(self.socket.read_buf())
} else {
// ソケットにはまだデータがありません
//
// データが利用可能になったらwakeが呼び出されるようにします。
// データが利用可能になるとwakeが呼び出され
// このFutureのユーザーは再度pollを呼び出してデータを受信することが分かります。
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
}
このモデルでは、中間割当を必要とせずに、複数の非同期オペレーションを一緒に構築できます。 次のように、複数のFuture
を実行したり、連鎖させたりすることは割当のないステートマシンを介して実装できます。
#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
// Each field may contain a future that should be run to completion.
// If the future has already completed, the field is set to `None`.
// This prevents us from polling a future after it has completed, which
// would violate the contract of the `Future` trait.
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// Attempt to complete future `a`.
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// Attempt to complete future `b`.
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// Both futures have completed-- we can return successfully
Poll::Ready(())
} else {
// One or both futures returned `Poll::Pending` and still have
// work to do. They will call `wake()` when progress can be made.
Poll::Pending
}
}
}
}
これは、個別の割当を必要とせずに複数のFuture
を同時に実行できる方法を示し、より効率的な非同期プログラムを可能にします。同様に、複数のシーケンシャルFuture
を次々に実行することもできます。
#![allow(unused_variables)]
fn main() {
/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// We've completed the first future-- remove it and start on
// the second!
Poll::Ready(()) => self.first.take(),
// We couldn't yet complete the first future.
Poll::Pending => return Poll::Pending,
};
}
// Now that the first future is done, attempt to complete the second.
self.second.poll(wake)
}
}
}
これらの例は複数の割り当てられたオブジェクトと深くネストされたコールバックを必要とせずに、Future
トレイトを使用して非同期制御フローを表現する方法を示しています。
基本的な制御フローが終わったら、実際のFuture
トレイトとSimpleFuture
がどのように異なるかを話します。
#![allow(unused_variables)]
fn main() {
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
}
気づきやすい最初の変更はself
がもはや&mut self
出ないことです。
Pin<&mut Self>
に変更されました。Pin
については、あとのセクションで詳しくお話します、が、現時点では、Pin
によって固定したFuture
を作成できることを知っています。固定されたオブジェクトはフィールドなどにポインタを格納できます。struct MyFut { a: i32, ptr_to_a: *const i32 }
のように。
async / await
を有効にするにはピン留めが必要になります。
次に、wake: fn()
は&mut Context<'_>'
に変更されました。
SimpleFuture
では関数ポインターの呼び出しを使用して、Future
をポーリングすることをエグゼキューターに伝えました。しかし、fn()
はサイズがゼロであるため、wake
が呼ばれたことなどのデータを保存することができません。
実際は、Web サーバーのような複雑なアプリケーションには起動をすべて個別に管理する必要がある数千の異なる接続が存在する場合があります。
このContext
タイプは特定のタスクを起動するために使用できるWaker
へのアクセスを提供することでこれを解決します。