Getting Started
ようこそ Rust の非同期プログラミングへ! 非同期の Rust コードを作りたいなら、キミは適切な本を探し当てたね!
Web サーバーやデータベース、オペレーティングシステムのいずれかを構築している場合でも、このドキュメントは、Rust の非同期プログラミングツールを使用してハードウェア性能を最大限に活用する方法を示します。
What This Book Covers
この本は、Rust の非同期な言語機能、ライブラリを使用するための、初心者と熟練者に適した、最新の、包括的なガイドを目指しています。
-
初期の章では、非同期プログラミングの一般的な概要と、それに対して Rust がどのように取り組んでいるか話します。
-
中間章では、非同期なコードを書く時に使うことのできる、主なユーティリティと制御フローツールについて説明し、パフォーマンスと再利用性を最大限発揮するために、ライブラリとアプリケーションを構築するときのベストプラクティスを紹介します。
-
最後の章では、広い範囲の非同期エコシステムについて説明し、一般的た課題を達成するための例をいくつか紹介します
なぜ非同期なのか?
Rust を使うと、高速で安全なソフトウェアが書けます。しかし、なぜ非同期コードを書くんでしょう?
非同期コードを使用すると、同じ OS のスレッドで複数のタスクを同時に実行できるようになる。 典型的なスレッドを使ったアプリケーションでは、2 つの Web ページを同時にダウンロードしたい時、次のように 2 つのスレッドに作業を分散します。
# #![allow(unused_variables)] #fn main() { fn get_two_sites() { // 作業を行うために2つのスレッドを作成します。 let thread_one = thread::spawn(|| download("https:://www.foo.com")); let thread_two = thread::spawn(|| download("https:://www.bar.com")); // 両方のスレッドが完了するまで待ちます。 thread_one.join().expect("thread one panicked"); thread_two.join().expect("thread two panicked"); } #}
これは多くのアプリケーションでうまく機能する。スレッドはこれを行うように設計されているからね。複数の異なるタスクを同時に実行するようにね。
ただ、いくつか制限もあるんだ。スレッドの切り替えや、スレッド間でのデータ共有をするプロセスにはオーバーヘッドが結構あるのです…。
ただ何もしないで居座っているスレッドでさえ、貴重なシステムリソースを食いつぶします。これらは、非同期コードが排除するために設計されたコストです。私達は Rust のasync
/.await
記法を使って、先ほどのコードを書き換えることができます。それも、複数スレッドを作成することなく、一度に複数タスクを実行できるようにね。
# #![allow(unused_variables)] #fn main() { async fn get_two_sites_async() { // 2つの異なるfutureを作成します。これらは、完了するまで実行すると // 非同期でWebページをダウンロードします。 let future_one = download_async("https:://www.foo.com"); let future_two = download_async("https:://www.bar.com"); // 両方のfutureを同時に完了するまで実行します。 join!(future_one, future_two); } #}
全体として、非同期アプリケーションはスレッド実装よりもすごく高速で、使うリソースも少ない可能性があります。ただし、コストがかかってしまいます。 スレッドは OS によってネイティブにサポートされているので、特別なプログラミングモデルは不必要です、どんな関数もスレッドを作成でき、通常の関数と同じくらい簡単にスレッドを使用する関数を呼び出すことが出来ます。
ただし、非同期関数は、言語もしくはライブラリの特別なサポートが必要になります。Rust では、async fn
がfuture
を返す非同期関数を作ってくれます。関数本体を実行するにはfuture
を最後まで実行する必要があります。
従来のスレッド化されたアプリケーションも非常に効果的で、Rust の小さなメモリ追跡と予測によってasync
を使わなくても十分である可能性を忘れないでください。
非同期プログラミングモデルによる複雑性の増加は、常にそれだけの価値があるか分からいないものです。単純なスレッドモデルの使用を考慮することも重要です。
非同期 Rust の現状
非同期 Rust はエコシステム時間とともに大きく進化してきたため、使うツール、投資するライブラリ、読むべきドキュメントを把握するのは大変です。ただ、標準ライブラリのFuture
トレイトと言語機能のasync
/ await
は最近安定化されました。(パチパチ!) なので、エコシステム全体は新しく安定化された API への移行の真っ最中で、その後、エコシステムの激しい動きは大幅に解消されます。
しかし現時点では、エコシステムはまだ急速に発展していて、非同期 Rust の経験は磨かれていません。ほとんどのライブラリはまだ、futures
クレートの 0.1 を使っています。そのため開発者がfutures
0.3 と相互運用したければcompat
クレートの機能を頻繁に使用する必要があります。
また、トレイトメソッド内のasync fn
構文はまだ実装されていなかったり、分かりやすいコンパイルエラーメッセージはまだなかったりします。async
/ await
言語機能はまだ、新しいからですね。
とは言っても、Rust はパフォーマンスが高く、非同期プログラミングに対して人間工学に基づいたサポートもあります、なのであなたが冒険、探検を恐れないのであれば、Rust の非同期プログラミングの世界に飛び込みましょう! ダイブ!
async
/ .await
入門!
async/.await
は通常の同期コードの用に見える非同期関数を作成するための Rust のビルドインツールです。 async
はコードブロックをFuture
トレイトを実装しているステートマシンに変換するものです。 一方、同期メソッドでブロッキング関数を呼び出すとスレッド全体がブロックされてしまいますが、ブロックされたFuture
はスレッドの制御をもたらし、他のFuture
を実行できるようにします。
非同期関数を作成するには次のasync fn
構文を使用できます。
# #![allow(unused_variables)] #fn main() { async fn do_something() { ... } #}
async fn
によってこの関数の返り値はFuture
になります。
Future
は次のようにエグゼキューターで実行する必要があります。
// block_onは提供されたfutureが完了するまで現在のスレッドをブロックします。 // 他のエグゼキューターは、同じスレッドに複数のfutureをスケジュールするなど、 // より複雑な動作を提供します。 use futures::executor::block_on; async fn hello_world() { println!("hello, world!"); } fn main() { let future = hello_world(); // ここでは何もprintされない block_on(future); // futureが動き"hello, world!"が表示される }
async fn
内では.await
を使うことで、ほかのFuture
トレイトを実装する別の型の完了を待つことができます。block_on
とは異なり、.await
は現在のスレッドをブロックしません、代わりに、Future
が完了するのを非同期で待機し、Future
が現在進行できないときは他のタスクを実行できるようにします。
例として、3 つのasync fn
を考えてみましょう。learn_song
, sing_song
, dance
です。
# #![allow(unused_variables)] #fn main() { async fn learn_song() -> Song { ... } async fn sing_song(song: Song) { ... } async fn dance() { ... } #}
歌、学習、ダンスを行う方法の一つは、それぞれ個別にブロックすることです。
fn main() { let song = block_on(learn_song()); block_on(sing_song(song)); block_on(dance()); }
ただ、この方法では最高のパフォーマンスを実現しているわけではありません。一つのことしか実行してないからね!
明らかに、歌を歌うには歌を学ぶ必要があります。しかし、歌を学んだあとに歌うのと、同時に踊ることも可能ですよね?
これを行うには、同時に実行できる 2 つの独立したasync fn
を作ることです。
async fn learn_and_sing() { // Wait until the song has been learned before singing it. // We use `.await` here rather than `block_on` to prevent blocking the // thread, which makes it possible to `dance` at the same time. let song = learn_song().await; sing_song(song).await; } async fn async_main() { let f1 = learn_and_sing(); let f2 = dance(); // `join!` is like `.await` but can wait for multiple futures concurrently. // If we're temporarily blocked in the `learn_and_sing` future, the `dance` // future will take over the current thread. If `dance` becomes blocked, // `learn_and_sing` can take back over. If both futures are blocked, then // `async_main` is blocked and will yield to the executor. futures::join!(f1, f2); } fn main() { block_on(async_main()); }
この例では、歌を歌う前に歌を学習する必要がありますが、歌を学ぶと同時に踊ることもできます。 learn_and_sing
でlearn_song().await
ではなくblock_on(learn_son())
を使ってしまうと、スレッドはしばらくの間他のことを行うことができなくなり、同時に踊ることを不可能にします。
今学習した、async / await
の例を試してみましょう!
HTTP サーバーを書いてみよう!
async / .await
を使用してエコーサーバーを構築してみましょう!
最初に、rustup update nightly
で Rust の最新かつ最高のコピーを手に入れてください。
それが完了したら、cargo +nightly new async-await-echo
を実行して新プロジェクトを作成します。
Cargo.toml
ファイルにいくつかの依存関係を追加しましょう
[dependencies]
# The latest version of the "futures" library, which has lots of utilities
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] }
# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
hyper = "0.12.9"
依存関係を追加したので、コードを書いていきましょう! 追加するインポートがいくつかあります
# #![allow(unused_variables)] #fn main() { use { hyper::{ // Miscellaneous types from Hyper for working with HTTP. Body, Client, Request, Response, Server, Uri, // This function turns a closure which returns a future into an // implementation of the the Hyper `Service` trait, which is an // asynchronous function from a generic `Request` to a `Response`. service::service_fn, // A function which runs a future to completion using the Hyper runtime. rt::run, }, futures::{ // Extension trait for futures 0.1 futures, adding the `.compat()` method // which allows us to use `.await` on 0.1 futures. compat::Future01CompatExt, // Extension traits providing additional methods on futures. // `FutureExt` adds methods that work for all futures, whereas // `TryFutureExt` adds methods to futures that return `Result` types. future::{FutureExt, TryFutureExt}, }, std::net::SocketAddr, }; #}
次はリクエストを処理できるようにしていきましょう。
async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> { // Always return successfully with a response containing a body with // a friendly greeting ;) Ok(Response::new(Body::from("hello, world!"))) } async fn run_server(addr: SocketAddr) { println!("Listening on http://{}", addr); // Create a server bound on the provided address let serve_future = Server::bind(&addr) // Serve requests using our `async serve_req` function. // `serve` takes a closure which returns a type implementing the // `Service` trait. `service_fn` returns a value implementing the // `Service` trait, and accepts a closure which goes from request // to a future of the response. To use our `serve_req` function with // Hyper, we have to box it and put it in a compatability // wrapper to go from a futures 0.3 future (the kind returned by // `async fn`) to a futures 0.1 future (the kind used by Hyper). .serve(|| service_fn(|req| serve_req(req).boxed().compat())); // Wait for the server to complete serving or exit with an error. // If an error occurred, print it to stderr. if let Err(e) = serve_future.compat().await { eprintln!("server error: {}", e); } } fn main() { // Set the address to run our socket on. let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); // Call our `run_server` function, which returns a future. // As with every `async fn`, for `run_server` to do anything, // the returned future needs to be run. Additionally, // we need to convert the returned future from a futures 0.3 future into a // futures 0.1 future. let futures_03_future = run_server(addr); let futures_01_future = futures_03_future.unit_error().boxed().compat(); // Finally, we can run the future to completion using the `run` function // provided by Hyper. run(futures_01_future); }
cargo run
でターミナルに「Listening on http://127.0.0.1:3000」というメッセージが表示されるはずです。
ブラウザでその URL を開くとどうなりますか? 「hello, world」と見慣れた挨拶が表示されれば順調な証拠です。 おめでとうございます!
Rust で最初の非同期 Web サーバーを作成しました。
また、リクエスト URL、HTTP のバージョン、ヘッダー、その他のメタデータなどの情報を調べることも出来ます。例えば、次のようにリクエストの URL を出力できます。
# #![allow(unused_variables)] #fn main() { println!("Got request at {:?}", req.uri()); #}
お気づきかな?すぐにレスポンスを返すため、リクエストを処理する際に非同期処理をまだ行ってないことに。 静的なメッセージを返すのではなく、Hyper の HTTP クライアントを使用して、ユーザーのリクエストを別の WEB サイトにプロキシしてみましょう。
URL を解析することから初めます。
# #![allow(unused_variables)] #fn main() { let url_str = "http://www.rust-lang.org/en-US/"; let url = url_str.parse::<Uri>().expect("failed to parse URL"); #}
次に、新しくhyper::Client
を作成し、GET
リクエストを送りユーザーにレスポンスを返します。
# #![allow(unused_variables)] #fn main() { let res = Client::new().get(url).compat().await; // Return the result of the request directly to the user println!("request finished-- returning response"); res #}
Client::get
はhyper::client::FutureResponse
を返します。
これは、Future<Output = Result<Response, Error>>
を実装します。
.await
するとき、HTTP リクエストが送信され、現在のタスクが一時停止され、レスポンスが利用可能になったらタスクがキューに入れられて続行されます。
cargo run
をして、http://127.0.0.1:3000/foo
を開いてみてください、Rust のホームページと以下の出力がターミナルで見れるはずです。
Listening on http://127.0.0.1:3000
Got request at /foo
making request to http://www.rust-lang.org/en-US/
request finished-- returning response
HTTP リクエストのプロキシに成功しました!!
中身を見てみよう: Future と Task の実行
このセクションでは,Future
および非同期タスクがどのようにスケジューリングされるのか基本的は構造について説明します。
既存のFuture
型を使用してのハイレベルなコードの記述方法に興味があり、Future
型の動作の詳細に興味がない場合は、async / await
の章へ進んでください。
しかし!この章の内容のいくつかは、async / await
コードがどのように動作するのか理解するのに役立ちますよ。async / await
コードが動作するランタイム及びパフォーマンス特性を理解し、新しい非同期プリミティブを作成します。
この章をスキップするなら、将来のためにブックマークすることをオススメします。
さて、それが完了したならFuture
トレイトについてお話していきましょうか。
Future トレイト
Future
トレイトは Rust の非同期プログラミングの中心人物です。(超重役だぞ!)
Future
は値を生成できる非同期計算です。(()
のような空の値の時もあります)
簡略化したFuture
トレイトは以下のようになります。
# #![allow(unused_variables)] #fn main() { trait SimpleFuture { type Output; fn poll(&mut self, wake: fn()) -> Poll<Self::Output>; } enum Poll<T> { Ready(T), Pending, } #}
このpoll
関数を呼び出すことでFuture
を進めることが出来ます、これにより、Future
が可能な限り完了するようになります。Future
が完了するとPoll::Ready(result)
を返し、未完了のときはPoll::Pending
を返して、Future
がさらに進む準備ができたときにwake()
関数が呼び出されるように準備します。
wake()
が呼び出されると、Future
を駆動するエグゼキューターがpoll
を再度呼び出し、Future
を更に進めようとします。
wakte()
がなければ、エグゼキューターはFuture
がいつ進むかを知る方法がなく、つねにすべてのfuture
をポーリングする必要があります。wake()
を使用することで、エグゼキューターはどのFuture
をpoll
する準備ができているかを正確に把握できます。
例えば、すでに利用可能なデータが有る場合とない場合があるソケットから読み取りたい場合を考えます。データが有る場合、Poll::Ready(data)
でそれを読み込んで返すことが出来ます。
しかし、データの準備ができていない時Future
はブロックされ、進行できなくなります。
データが利用できない時、ソケットでデータの準備ができた時にwake
が呼び出されるように登録する必要があります。
これにより、エグゼキューターに準備が整ったことが分かります。
単純なSocketRead
は次のようになります。
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // ソケットにはデータがあります。それをバッファに読み込んで返します。 Poll::Ready(self.socket.read_buf()) } else { // ソケットにはまだデータがありません // // データが利用可能になったらwakeが呼び出されるようにします。 // データが利用可能になるとwakeが呼び出され // このFutureのユーザーは再度pollを呼び出してデータを受信することが分かります。 self.socket.set_readable_callback(wake); Poll::Pending } } } #}
このモデルでは、中間割当を必要とせずに、複数の非同期オペレーションを一緒に構築できます。 次のように、複数のFuture
を実行したり、連鎖させたりすることは割当のないステートマシンを介して実装できます。
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two other futures to completion concurrently. /// /// Concurrency is achieved via the fact that calls to `poll` each future /// may be interleaved, allowing each future to advance itself at its own pace. pub struct Join<FutureA, FutureB> { // Each field may contain a future that should be run to completion. // If the future has already completed, the field is set to `None`. // This prevents us from polling a future after it has completed, which // would violate the contract of the `Future` trait. a: Option<FutureA>, b: Option<FutureB>, } impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { // Attempt to complete future `a`. if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } } // Attempt to complete future `b`. if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } } if self.a.is_none() && self.b.is_none() { // Both futures have completed-- we can return successfully Poll::Ready(()) } else { // One or both futures returned `Poll::Pending` and still have // work to do. They will call `wake()` when progress can be made. Poll::Pending } } } #}
これは、個別の割当を必要とせずに複数のFuture
を同時に実行できる方法を示し、より効率的な非同期プログラムを可能にします。同様に、複数のシーケンシャルFuture
を次々に実行することもできます。
# #![allow(unused_variables)] #fn main() { /// A SimpleFuture that runs two futures to completion, one after another. // // Note: for the purposes of this simple example, `AndThenFut` assumes both // the first and second futures are available at creation-time. The real // `AndThen` combinator allows creating the second future based on the output // of the first future, like `get_breakfast.and_then(|food| eat(food))`. pub struct AndThenFut<FutureA, FutureB> { first: Option<FutureA>, second: FutureB, } impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB> where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>, { type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if let Some(first) = &mut self.first { match first.poll(wake) { // We've completed the first future-- remove it and start on // the second! Poll::Ready(()) => self.first.take(), // We couldn't yet complete the first future. Poll::Pending => return Poll::Pending, }; } // Now that the first future is done, attempt to complete the second. self.second.poll(wake) } } #}
これらの例は複数の割り当てられたオブジェクトと深くネストされたコールバックを必要とせずに、Future
トレイトを使用して非同期制御フローを表現する方法を示しています。
基本的な制御フローが終わったら、実際のFuture
トレイトとSimpleFuture
がどのように異なるかを話します。
# #![allow(unused_variables)] #fn main() { trait Future { type Output; fn poll( // Note the change from `&mut self` to `Pin<&mut Self>`: self: Pin<&mut Self>, // and the change from `wake: fn()` to `cx: &mut Context<'_>`: cx: &mut Context<'_>, ) -> Poll<Self::Output>; } #}
気づきやすい最初の変更はself
がもはや&mut self
出ないことです。
Pin<&mut Self>
に変更されました。Pin
については、あとのセクションで詳しくお話します、が、現時点では、Pin
によって固定したFuture
を作成できることを知っています。固定されたオブジェクトはフィールドなどにポインタを格納できます。struct MyFut { a: i32, ptr_to_a: *const i32 }
のように。
async / await
を有効にするにはピン留めが必要になります。
次に、wake: fn()
は&mut Context<'_>'
に変更されました。
SimpleFuture
では関数ポインターの呼び出しを使用して、Future
をポーリングすることをエグゼキューターに伝えました。しかし、fn()
はサイズがゼロであるため、wake
が呼ばれたことなどのデータを保存することができません。
実際は、Web サーバーのような複雑なアプリケーションには起動をすべて個別に管理する必要がある数千の異なる接続が存在する場合があります。
このContext
タイプは特定のタスクを起動するために使用できるWaker
へのアクセスを提供することでこれを解決します。
タスクを起こせ!Waker!
Future
は最初にpoll
された時に完了できないことがよくあります。これが発生した時、Future
はさらに前進する準備ができたら再度ポーリングされるようにする必要があります。
これはWaker
型で行われます。
Future
がポーリングされるたびに、「タスク」の一部としてポーリングされます。
タスクは、エグゼキューターに送信されたトップレベルのFuture
です。
Waker
は関連付けられたタスクを起動する必要があることをエグゼキューターに伝えるために使用できるwake()
メソッドを提供します。
wake()
が呼び出された時、エグゼキューターは、Waker
と関連するタスクが進む準備が出来たことを知っています。そして、再びポーリングする必要があることも。
Waker
はclone()
も実装しているため、コピーして保存することが出来ます。
Waker
を使用して単純なタイマーを実装してみましょう!
タイマー作成
この例では、タイマーが作成された時に新しいスレッドを立て、必要な時間だけスリープし、時間経過した時にタイマーのFuture
を通知します。
必要なインポートは次のとおりです。
# #![allow(unused_variables)] #fn main() { use { std::{ future::Future, pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll, Waker}, thread, time::Duration, }, }; #}
Future
の型自体を定義するところからです。私達の future にはスレッドが、タイマーが経過し、future が完了するべきであることを伝える方法が必要です。 Arc<Mutex<..>>
を使用して、スレッドと future の間で通信します。
# #![allow(unused_variables)] #fn main() { pub struct TimerFuture { shared_state: Arc<Mutex<SharedState>>, } /// Shared state between the future and the waiting thread struct SharedState { /// Whether or not the sleep time has elapsed completed: bool, /// The waker for the task that `TimerFuture` is running on. /// The thread can use this after setting `completed = true` to tell /// `TimerFuture`'s task to wake up, see that `completed = true`, and /// move forward. waker: Option<Waker>, } #}
実際にFuture
の実装を書いていきましょう!
# #![allow(unused_variables)] #fn main() { impl Future for TimerFuture { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Look at the shared state to see if the timer has already completed. let mut shared_state = self.shared_state.lock().unwrap(); if shared_state.completed { Poll::Ready(()) } else { // Set waker so that the thread can wake up the current task // when the timer has completed, ensuring that the future is polled // again and sees that `completed = true`. // // It's tempting to do this once rather than repeatedly cloning // the waker each time. However, the `TimerFuture` can move between // tasks on the executor, which could cause a stale waker pointing // to the wrong task, preventing `TimerFuture` from waking up // correctly. // // N.B. it's possible to check for this using the `Waker::will_wake` // function, but we omit that here to keep things simple. shared_state.waker = Some(cx.waker().clone()); Poll::Pending } } } #}
かなり簡単ですね。スレッドにshared_state.completed = true
が設定されている時、完了です! それ以外の場合Waker
は現在のタスクのクローンを作成してshared_state.waker
に渡し、スレッドがタスクを復帰できるようにします。
重要なのは、future が異なるWaker
を持つ異なるタスクに移動した可能性があるため、future がポーリングされるたびにWaker
を更新する必要があることです。これは、ポーリングされたあと、タスク間で future が渡される時に発生します。
最後に、実際にタイマーを構築してスレッドを開始する API が必要です。
# #![allow(unused_variables)] #fn main() { impl TimerFuture { /// Create a new `TimerFuture` which will complete after the provided /// timeout. pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, })); // Spawn the new thread let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Signal that the timer has completed and wake up the last // task on which the future was polled, if one exists. shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } }); TimerFuture { shared_state } } } #}
すげぇ!単純なタイマーの future を構築するために必要なのはそれだけです。 さて、future を実行するエグゼキューターがいれば、、、
エグゼキューターを書いてみよう
Rust のFuture
は怠け者です。積極的に完了しない限り、何もしてくれません。
future を完了させるための一つの方法はasync
関数内の.await
です。
それは問題は 1 レベル上に押し上げるだけです。トップレベルの非同期関数から返された future を誰が実行しますか?
Future
のエグゼキューターが必要です。
Future
エグゼキューターは、トップレベルのFuture
セットを取得し、Future
が進行するたびにpoll
を呼び出すことにより、それらを完了まで実行します。
通常、エグゼキューターは future を一回poll
して開始します。Future
がwake()
を呼び出して進行する準備ができたことを示すと、それらはキューに戻され、poll
が再度呼び出され、Future
が完了するまで繰り返されます。
このセクションでは、多数のトップレベル future を同時に実行できる、独自のシンプルなエグゼキューターを作成していきます。
この例では、Waker
を構築する簡単な方法を提供するArcWake
トレイトの future クレートに依存しています。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
次に、src/main.rs
の先頭に次のインポートが必要です。
# #![allow(unused_variables)] #fn main() { use { futures::{ future::{FutureExt, BoxFuture}, task::{ArcWake, waker_ref}, }, std::{ future::Future, sync::{Arc, Mutex}, sync::mpsc::{sync_channel, SyncSender, Receiver}, task::{Context, Poll}, time::Duration, }, // The timer we wrote in the previous section: timer_future::TimerFuture, }; #}
エグゼキューターはチャネルを介して実行するタスクを送信することで動作します。エグゼキューターはチャネルからイベントを取得して実行します。タスクがより多くの作業をする準備ができた時に(起こされた時)、タスクをチャネルに戻すことにより、再度ポーリングされるようにスケジュールできます。
この設計では、エグゼキューター自体にタスクチャネルの受信側が必要です。ユーザーは新しい future を作成できるように送信側を取得します。タスク自体は自分自身を再スケジュールする future です。 したがって、タスク自体をリキューするために使用できる送信者とペアになった future として保存します。
# #![allow(unused_variables)] #fn main() { /// Task executor that receives tasks off of a channel and runs them. struct Executor { ready_queue: Receiver<Arc<Task>>, } /// `Spawner` spawns new futures onto the task channel. #[derive(Clone)] struct Spawner { task_sender: SyncSender<Arc<Task>>, } /// A future that can reschedule itself to be polled by an `Executor`. struct Task { /// In-progress future that should be pushed to completion. /// /// The `Mutex` is not necessary for correctness, since we only have /// one thread executing tasks at once. However, Rust isn't smart /// enough to know that `future` is only mutated from one thread, /// so we need use the `Mutex` to prove thread-safety. A production /// executor would not need this, and could use `UnsafeCell` instead. future: Mutex<Option<BoxFuture<'static, ()>>>, /// Handle to place the task itself back onto the task queue. task_sender: SyncSender<Arc<Task>>, } fn new_executor_and_spawner() -> (Executor, Spawner) { // Maximum number of tasks to allow queueing in the channel at once. // This is just to make `sync_channel` happy, and wouldn't be present in // a real executor. const MAX_QUEUED_TASKS: usize = 10_000; let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); (Executor { ready_queue }, Spawner { task_sender}) } #}
また、spawner にメソッドを追加して、新しい future を簡単に生成できるようにします。このメソッドは future の型を取得し、それを box 化してFutureObj
に入れ、その中にエグゼキューターにエンキューできる新しいArc<Task>
を作成します。
# #![allow(unused_variables)] #fn main() { impl Spawner { fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); } } #}
future を poll するにはWaker
を作成する必要があります。Waker の章でも説明したようにWaker
はwake()
が呼び出された時に再度ポーリングされるタスクをスケジュールする責任があります。Waker
はどのタスクが準備完了になったかをエグゼキューターに正確に伝え、準備ができている future だけをポーリングできることを忘れないでください。新しいWaker
を作成する簡単な方法はArcWake
トレイトを実装し、waker_ref
または.into_waker()
関数を使用してArc<impl ArcWake>
をWaker
に変更することです。タスクにArcWake
を実装してタスクをWaker
に変えて目覚めさせてみましょう。
# #![allow(unused_variables)] #fn main() { impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); } } #}
Arc<Task>
からWaker
が作成された時にwake()
を呼び出すと、Arc のコピーがタスクチャネルに送信されます。次に、エグゼキューターがタスクを取得してポーリングする必要があります。それを実装しましょう。
# #![allow(unused_variables)] #fn main() { impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // Take the future, and if it has not yet completed (is still Some), // poll it in an attempt to complete it. let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // Create a `LocalWaker` from the task itself let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // `BoxFuture<T>` is a type alias for // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`. // We can get a `Pin<&mut dyn Future + Send + 'static>` // from it by calling the `Pin::as_mut` method. if let Poll::Pending = future.as_mut().poll(context) { // We're not done processing the future, so put it // back in its task to be run again in the future. *future_slot = Some(future); } } } } } #}
おめでとう!future エグゼキューターができました!これを使用して、async / .await
コードと先ほど書いたTimerFuture
などのカスタム future を実行することができます。
fn main() { let (executor, spawner) = new_executor_and_spawner(); // Spawn a task to print before and after waiting on a timer. spawner.spawn(async { println!("howdy!"); // Wait for our timer future to complete after two seconds. TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); }); // Drop the spawner so that our executor knows it is finished and won't // receive more incoming tasks to run. drop(spawner); // Run the executor until the task queue is empty. // This will print "howdy!", pause, and then print "done!". executor.run(); }
エグゼキューターとシステム IO
前のFuture トレイトの章でソケットで非同期読み取りを実行した Future の例を説明しました。
# #![allow(unused_variables)] #fn main() { pub struct SocketRead<'a> { socket: &'a Socket, } impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // ソケットにはデータがあります。それをバッファに読み込んで返します。 Poll::Ready(self.socket.read_buf()) } else { // ソケットにはまだデータがありません // // データが利用可能になったらwakeが呼び出されるようにします。 // データが利用可能になるとwakeが呼び出され // このFutureのユーザーは再度pollを呼び出してデータを受信することが分かります。 self.socket.set_readable_callback(wake); Poll::Pending } } } #}
この future はソケット上の利用可能なデータを読み取り、データが利用できない場合エグゼキューターに譲り、ソケットが再び読み取り可能になった時にタスクが起動されるように要求します。この例からSocket
型がどのように実装されているかは明確ではなく、特にset_readable_callback
関数がどのように機能するかは明らかでありません。
ソケットが読み取り可能になったらlw.wake()
が呼び出されるようにするにはどうすればよいですか? 1 つのオプションはsocket
が読み取り可能かどうかを継続的にチェックし、適切な時にwake
を呼び出すスレッドを持つことです。
ただし、これは非常に非効率でブロックされた IO ごとに個別のスレッドが必要になります。これにより、非同期コードの効率が大幅に低下します。
実際にはこの問題は、Linux のepoll
、FreeBSD のkqueue
及び、MacOS,Windows の IOCP、Fuchsia の port などの IO 対応システムブロックプリミティブとの統合によって解決されます。(これらはすべてクロスプラットフォームの Rust クレートmio
を通じで公開されます。)
これらのプリミティブはすべて、スレッドが複数の非同期 IO イベントでブロックすることを許可し、イベントの 1 つが完了すると戻ります。
これらの API は通常以下のようになります。
# #![allow(unused_variables)] #fn main() { struct IoBlocker { ... } struct Event { // 発生し、リッスンされたイベントを一意に識別するID id: usize, // 待機、または発生したシグナルのセット signals: Signals, } impl IoBlocker { /// ブロックする非同期IOイベントの新しいコレクションを作成 fn new() -> Self { ... } fn add_io_event_interest( &self, /// イベントが発火するオブジェクト io_object: &IoObject, /// イベントをトリガーするio_objectに表示される可能性のあるシグナルのセット /// この関心から生じるイベントに与えるIDとペアになります。 event: Event, ) { ... } /// イベントの1つが発生するまでブロックします fn block(&self) -> Event { ... } } let mut io_blocker = IoBlocker::new(); io_blocker.add_io_event_interest( &socket_1, Event { id: 1, signals: READABLE }, ); io_blocker.add_io_event_interest( &socket_2, Event { id: 2, signals: READABLE | WRITABLE }, ); let event = io_blocker.block(); println!("Socket {:?} is now {:?}", event.id, event.signals); #}
Future エグゼキューターは、これらのプリミティブを使用して、特定の IO イベントが発生した時に実行されるコールバックを構成できるソケットなどの非同期 IO オブジェクトを提供できます。
上記のSocketRead
の場合Socket::set_readable_callback
関数は次のような擬似コードのようになります。
# #![allow(unused_variables)] #fn main() { impl Socket { fn set_readable_callback(&self, waker: Waker) { // local_executorはローカルエグゼキューターへの参照です // これは、ソケットの作成時に提供できますが // 実際には多くのエグゼキューターの実装は、 // 便宜上、スレッドローカルストレージを介してそれを渡します。 let local_executor = self.local_executor; // このIOオブジェクトのID let id = self.id; // Store the local waker in the executor's map so that it can be called // once the IO event arrives. // IOイベントが到着したら呼び出せるようにローカルwakerをエグゼキューターのマップに保存します。 local_executor.event_map.insert(id, waker); local_executor.add_io_event_interest( &self.socket_file_descriptor, Event { id, signals: READABLE }, ); } } #}
IO イベントを受信して適切なWaker
タスクにディスパッチできるエグゼキュータースレッドを一つだけ持つことができるようになりました。