エグゼキューターを書いてみよう
Rust のFuture
は怠け者です。積極的に完了しない限り、何もしてくれません。
future を完了させるための一つの方法はasync
関数内の.await
です。
それは問題は 1 レベル上に押し上げるだけです。トップレベルの非同期関数から返された future を誰が実行しますか?
Future
のエグゼキューターが必要です。
Future
エグゼキューターは、トップレベルのFuture
セットを取得し、Future
が進行するたびにpoll
を呼び出すことにより、それらを完了まで実行します。
通常、エグゼキューターは future を一回poll
して開始します。Future
がwake()
を呼び出して進行する準備ができたことを示すと、それらはキューに戻され、poll
が再度呼び出され、Future
が完了するまで繰り返されます。
このセクションでは、多数のトップレベル future を同時に実行できる、独自のシンプルなエグゼキューターを作成していきます。
この例では、Waker
を構築する簡単な方法を提供するArcWake
トレイトの future クレートに依存しています。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
次に、src/main.rs
の先頭に次のインポートが必要です。
#![allow(unused_variables)]
fn main() {
use {
futures::{
future::{FutureExt, BoxFuture},
task::{ArcWake, waker_ref},
},
std::{
future::Future,
sync::{Arc, Mutex},
sync::mpsc::{sync_channel, SyncSender, Receiver},
task::{Context, Poll},
time::Duration,
},
// The timer we wrote in the previous section:
timer_future::TimerFuture,
};
}
エグゼキューターはチャネルを介して実行するタスクを送信することで動作します。エグゼキューターはチャネルからイベントを取得して実行します。タスクがより多くの作業をする準備ができた時に(起こされた時)、タスクをチャネルに戻すことにより、再度ポーリングされるようにスケジュールできます。
この設計では、エグゼキューター自体にタスクチャネルの受信側が必要です。ユーザーは新しい future を作成できるように送信側を取得します。タスク自体は自分自身を再スケジュールする future です。 したがって、タスク自体をリキューするために使用できる送信者とペアになった future として保存します。
#![allow(unused_variables)]
fn main() {
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender})
}
}
また、spawner にメソッドを追加して、新しい future を簡単に生成できるようにします。このメソッドは future の型を取得し、それを box 化してFutureObj
に入れ、その中にエグゼキューターにエンキューできる新しいArc<Task>
を作成します。
#![allow(unused_variables)]
fn main() {
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
}
future を poll するにはWaker
を作成する必要があります。Waker の章でも説明したようにWaker
はwake()
が呼び出された時に再度ポーリングされるタスクをスケジュールする責任があります。Waker
はどのタスクが準備完了になったかをエグゼキューターに正確に伝え、準備ができている future だけをポーリングできることを忘れないでください。新しいWaker
を作成する簡単な方法はArcWake
トレイトを実装し、waker_ref
または.into_waker()
関数を使用してArc<impl ArcWake>
をWaker
に変更することです。タスクにArcWake
を実装してタスクをWaker
に変えて目覚めさせてみましょう。
#![allow(unused_variables)]
fn main() {
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
}
}
Arc<Task>
からWaker
が作成された時にwake()
を呼び出すと、Arc のコピーがタスクチャネルに送信されます。次に、エグゼキューターがタスクを取得してポーリングする必要があります。それを実装しましょう。
#![allow(unused_variables)]
fn main() {
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if let Poll::Pending = future.as_mut().poll(context) {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
}
おめでとう!future エグゼキューターができました!これを使用して、async / .await
コードと先ほど書いたTimerFuture
などのカスタム future を実行することができます。
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}