エグゼキューターとシステム IO

前のFuture トレイトの章でソケットで非同期読み取りを実行した Future の例を説明しました。


# #![allow(unused_variables)]
#fn main() {
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // ソケットにはデータがあります。それをバッファに読み込んで返します。
            Poll::Ready(self.socket.read_buf())
        } else {
            // ソケットにはまだデータがありません
            //
            // データが利用可能になったらwakeが呼び出されるようにします。
            // データが利用可能になるとwakeが呼び出され
            // このFutureのユーザーは再度pollを呼び出してデータを受信することが分かります。
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
#}

この future はソケット上の利用可能なデータを読み取り、データが利用できない場合エグゼキューターに譲り、ソケットが再び読み取り可能になった時にタスクが起動されるように要求します。この例からSocket型がどのように実装されているかは明確ではなく、特にset_readable_callback関数がどのように機能するかは明らかでありません。

ソケットが読み取り可能になったらlw.wake()が呼び出されるようにするにはどうすればよいですか? 1 つのオプションはsocketが読み取り可能かどうかを継続的にチェックし、適切な時にwakeを呼び出すスレッドを持つことです。

ただし、これは非常に非効率でブロックされた IO ごとに個別のスレッドが必要になります。これにより、非同期コードの効率が大幅に低下します。

実際にはこの問題は、Linux のepoll、FreeBSD のkqueue及び、MacOS,Windows の IOCP、Fuchsia の port などの IO 対応システムブロックプリミティブとの統合によって解決されます。(これらはすべてクロスプラットフォームの Rust クレートmioを通じで公開されます。) これらのプリミティブはすべて、スレッドが複数の非同期 IO イベントでブロックすることを許可し、イベントの 1 つが完了すると戻ります。

これらの API は通常以下のようになります。


# #![allow(unused_variables)]
#fn main() {
struct IoBlocker {
    ...
}

struct Event {
    // 発生し、リッスンされたイベントを一意に識別するID
    id: usize,

    // 待機、または発生したシグナルのセット
    signals: Signals,
}

impl IoBlocker {
    /// ブロックする非同期IOイベントの新しいコレクションを作成
    fn new() -> Self { ... }

    fn add_io_event_interest(
        &self,

        /// イベントが発火するオブジェクト
        io_object: &IoObject,

        /// イベントをトリガーするio_objectに表示される可能性のあるシグナルのセット
        /// この関心から生じるイベントに与えるIDとペアになります。
        event: Event,
    ) { ... }

    /// イベントの1つが発生するまでブロックします
    fn block(&self) -> Event { ... }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

println!("Socket {:?} is now {:?}", event.id, event.signals);
#}

Future エグゼキューターは、これらのプリミティブを使用して、特定の IO イベントが発生した時に実行されるコールバックを構成できるソケットなどの非同期 IO オブジェクトを提供できます。

上記のSocketReadの場合Socket::set_readable_callback関数は次のような擬似コードのようになります。


# #![allow(unused_variables)]
#fn main() {
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // local_executorはローカルエグゼキューターへの参照です
        // これは、ソケットの作成時に提供できますが
        // 実際には多くのエグゼキューターの実装は、
        // 便宜上、スレッドローカルストレージを介してそれを渡します。
        let local_executor = self.local_executor;

        // このIOオブジェクトのID
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        // IOイベントが到着したら呼び出せるようにローカルwakerをエグゼキューターのマップに保存します。
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}
#}

IO イベントを受信して適切なWakerタスクにディスパッチできるエグゼキュータースレッドを一つだけ持つことができるようになりました。