async/awaitで躓いて学んだ、「オレは雰囲気でRustをしている!」からの脱し方。

こんにちは。テクノロジー本部バックエンド開発グループの小倉です。 この記事は キャディ Advent Calendar 2020 の9日目です。前日は山田さんの protobuf v3 の optional について でした。

Rustはイカしたエコシステムが充実していて、型が厳しい言語の割にはコンパイラやリンターに従うだけで動くコードが書けたりします。その結果「オレは雰囲気でRustをしている!」という気持ちになったりするんですよね〜。


本記事では、巷にあふれる入門記事を読んで実務に挑み、その結果でてきたよくわからない点を調べた結果得た知識を紹介します。

巷にあふれているRustのasync/awaitの記事。

これらを読みました

読んだ感想

async関数は「あとでやっておきますよオブジェクト」を返すもの。

  • 「あとでやっておきますよオブジェクト」は正しくはステートマシン(State Machine)という
  • 厳密には「 あとでやっておく」+「言われたら出来るところまで進めておく」。
  • すなわち「まだ途中で今ここ」「もう終わった」などのステート(状態)を持つマシン(計算機)
  • 残りの処理の内容も含めてステートマシンが持っているので、途中の状態のステートマシンをスレッド間で渡し合うことで、柔軟に並列処理できる(便利!)
  • RustではFutureオブジェクトがステートマシンの実装の一例。かつてFutureはデファクトなクレートだったが、Rust1.39で標準ライブラリに入った。
    • Future::pollを呼ぶことが「出来るところまで進めるように言う」に相当する。

ステートマシンとは別に「ステートマシン走らせる人」がいて初めて非同期処理が出来る。

  • 「ステートマシン走らせる人」をランタイム(Runtime)と呼ぶ
    • Rust文脈だとFuture::pollをうまく呼ぶ人
  • ランタイムはステートマシンの様子を見ながら適宜「出来るところまでやるように言う」。
  • Runtimeの例:tokio
  • block_on(非同期タスク)ってやればとりあえず実行できる、、、?

非同期関数を作るときは、awaitのタイミングを意識しないと非同期処理の恩恵が受けられない

  • awaitを付けるとFutureが終わるまで待たせる
  • awaitを付ける行為はステートマシンの「途中まで」の「途中」を増やすだけ。それだけでは並列処理にはならない。
    • 昔作ったfutureをあとでawaitすると並列化した気になるけれど、実はawaitのタイミングまでfutureの中身の処理は走らない。
  • 並列化したかったらspawnして独立したタスクとして切り出すか、joinやselectなど「複数のFutureを渡してよしなに全部終わらせてくれるやつ」を使う必要がある。

実務に挑んだ結果

  • 案外なんとかなる。
  • 同時に「オレは雰囲気でasync/awaitしている!」という気持ちにもなる。
    • よくわからないけど、コンパイラやリンターに言われたとおりにすればコンパイル通った、という事例がしばしば
      • moveを付けたり付けなかったり
      • Sync, Sendを付けたり付けなかったり
      • Cloneを付けたり付けなかったり

つらかったこと

    'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted\
     to block the current thread while the thread is being used to drive asynchronous tasks.'

脱雰囲気のためのナレッジ整理

moveはいつ必要?

moveはasyncに限らずclosureの文脈で使われるキーワードである。 クロージャー内では引数でもらっていない値を使うことが出来る。 クロージャーが定義されるタイミングの環境にある変数をキャプチャしているのである。

fn main(){
    let x = 3;
    let is_x = |a| a == x;
    println!("{}", is_x(3));
}

キャプチャは3種類のやり方がある。 - 環境にある変数を所有権ごと奪う。 - 環境にある変数の可変参照を持つ。 - 環境にある変数を(可変でない)参照を持つ。

所有権を奪いたい場合は

fn main(){
    let x = 3;
    let is_x = move |a| a == x;//xの所有権はクロージャーに移動
    println!("{}", is_x(3));
}

のようにすれば良い。

asyncに置けるmoveも同様である。asyncクロージャーやasyncスコープはFutureを返すが、そのFutureは環境のキャプチャを保持している。

#[tokio::main]
async fn main(){
    let x = 3;
    //asyncブロック
    let my_future = async{
        x * 2
    };
    println!("{}", my_future.await);
}

以下は、筆者がかつてasyncクロージャのつもりで書いたがコンパイルエラーになるコードである。

#[tokio::main]
async fn main(){
    let my_closure = |y| async{
        3 + y
    };
    let x = 10;
    let my_future = my_closure(x);
    println!("{}", my_future.await);
}

筆者が犯した間違いは2つ: - 上記コードのmy_closureはasyncクロージャではなく、asyncブロックを返す普通のクロージャである。 - yの所有権が何やら良くない。

asyncクロージャはまだunstableであり、Nightlyでなければ使えないので、普通のクロージャのまま続けるとして、どのようにすれば上記コードのコンパイルが通るだろうか?

まず、yのライフタイムは my_closureクロージャの内部である。yはasyncブロックの中で参照され、そのasyncブロックはmy_closureの戻り値として、my_closureの外側に放り出される。

このasyncブロックが、すなわちFutureが、awaitされるまでyは生きている必要がある。よって以下のようにすることで解決する。

#[tokio::main]
async fn main(){
    let my_closure = |y| async move {
            3 + y
    };
    println!("{}", my_closure(10).await);
}

ようするに、Futureに所有権を移したい時にmoveをつければ良い。とある変数xの所有権をFutureに移したくなるのは - Futureがxのスコープを飛び出るとき

である、これはスレッド間通信があるときはもちろん、スレッドをまたがなくてもよくある話である。

Sendはいつ必要?

Send/Syncはスレッド間で値をやりとり/共有したいときに付けるTraitである。具体的な構造体にSendやSyncをimplするのは非推奨であり、通常はトレイト境界などの文脈で登場するのみである。

非同期タスクはRuntimeによっていくつかのスレッドに振り分けながら実行されていることを思い出すと、async関数の引数や戻り値はSendやSyncをimplしていなければならない気になる。これは事実そうであるが、必ずしもすべてにSendやSyncをimplしなければならないわけではない。ここではそれを詳細に説明する。

tokioのスレッド戦略

詳細な説明の前に、tokioのスレッド戦略を説明する。 tokioのRuntimeは、1コア内に、少しのコアスレッドと、必要な数のブロッキングスレッドを建て、それらのスレッドを駆使しながら処理を実行していく。 コアスレッドは非同期タスクをやるためのスレッドで、デフォルトでは1つしか建てない。複数建てるとしても少数である。 コアスレッドではやる準備ができたFutureオブジェクトに対してpollを行い、出来るところまで進めてもらう。複数のFutureを少しずつ進めながらやる場合も、コアスレッドで走らせるタスクを入れ替えながら行っている。 ブロッキングスレッドは重たい処理を行うためのスレッドである。必要に応じてスレッドを建てるので、膨大な数になることもある。 終わるまで時間がかかる処理をコアスレッドで行うと、他の実行可能な処理を待たせることになり非効率である。そのため、時間がかかりそうなタスクは同期非同期かかわらずブロッキングスレッドに「逃がす」運用が想定されている。

重い処理の逃がし方

重い処理を逃がすためには、spawn_blockingblock_in_placeの2つの方法がある。

spawn_blocking
use tokio::task;
#[tokio::async]
async fn main(){
    let res = task::spawn_blocking(move || {
        // めっちゃ重い処理をここでやる
        "done computing"
    }).await?;
}

上記コードのようにspawn_blockingに重いタスクを渡すと、別のブロッキングスレッドを建てて、そこで重いタスクを実行してくれる。 このときスレッド間の移動が発生するため、同期タスクその戻り値がSend+'staticを実装している必要がある。 'staticを付けているのは、本質的に参照や参照を含むオブジェクトを受け渡しできないようにするためである。('staticの参照などはそれ自体は'staticではないがSendをimplするので受け渡しできる) よもやま:昔はSend:'staticだったらしい、https://rust-lang.github.io/rfcs/0458-send-improvements.html

block_in_place
use tokio::task;
#[tokio::async]
async fn main(){
    let res = task::block_in_place(move || {
        // めっちゃ重い処理をここでやる
        "done computing"
    }).await?;
}

上記コードのようにblock_in_placeに重いタスクを渡すと、今のスレッドをブロッキングスレッドに変更し、別に新たなコアスレッドを建てる。 このときスレッド間移動が発生しないため、Sendはいらない

使い分け

Send+'staticが実装されていれば、spawn_blocking, 実装されていなければblock_in_placeを使えば良い。

Syncはいつ必要?

実務でSyncを要請されたのは、実はasync/awaitの文脈ではなく、tower-grpcが原因でした。 tower-grpc上ではApiはSyncをtrait境界にしています。

Cloneはいつ必要か?

実務でCloneを要請されたのは、以下のような箇所(IsContext(仮名)は独自のTraitです)

pub async fn my_function<
  T: 'static + Clone + Send + IsContext,
>(                                                                                                                                   
  ctx: &T,                                                                                                                         
  arg: i32,
) -> Result<()> {                                                                                                     
  let ctx = ctx.clone();
  spawn_blocking(move || other_function(&ctx, arg)).await?
}                       

spawn_blockingのためにctxの参照(&T)を別スレッドにSendしたい。 spawn_blockingで渡すクロージャーは'staticである必要があるため、参照をキャプチャできない。 よって、ctxをクローンして、それをキャプチャさせてSendしている。

ちなみに、T: Syncであれば &T: Sendである。どのみち'staticではないのでTにSyncをimplしようと、Cloneは必要になる。

for_eachとかmapとかfilterをasync文脈で呼び出したいときはどうすればいい?

Streamを使いましょう。 futuresトレイトで提供されています:https://docs.rs/futures/0.3.5/futures/stream/index.html これは非同期版Iteratorで、Iteratorでできそうなことはひととおり実装されています。

非同期な関数をmapの引数に入れたいとき

このときは Stream::mapではなくStream::thenを使うと良い。

use futures::stream::{self, StreamExt};

async fn twice(x: i32)->i32{
    x * 2
}
#[tokio::main]
async fn main(){
    let v = vec![1, 2, 3];
    let twiced_v = stream::iter(v).then(twice).collect::<Vec<i32>>().await;
    assert_eq!(vec![2, 4, 6], twiced_v);
}

なんでblock_onの中でblock_onしたらダメなの?

block_onが呼ばれると、そのスレッドがブロックされて、中身の実行が終わるまで他のことができなくなるから。(ここでいうブロックはOSのAPIを呼んで行われるOSのスレッドのブロック)。 すなわち、block_onの中では非同期タスクが、よしなに走らされているはずで、そのなかでコアスレッドをブロックされると、非同期タスクを非同期に実行できなくなってしまうので、本意から外れることになる。

同様の理由でロックを取る処理も非同期タスクの中でうかつにはやってはいけない。非同期タスク内でもロックが取れる機構(例:https://docs.rs/tokio/0.2.22/tokio/sync/struct.RwLock.html )が用意されているので、そちらを使用すれば問題ない。

r2d2-postgresは、非同期に使われる想定がされていなかったため、中ではスレッドをブロックするタイプのロック機構が使われている。よって、非同期タスク内で呼び出すとpanicが起きる。これが冒頭の「身に覚えのないpanic」の正体である。


というのが前回までのあらすじ

水平線で囲まれた、常体(だ、である調)で書かれている範囲が前回までのあらすじです。 上で書いたようなことをまとめて、社内向けに公開したら好評でした。私も鼻高々でした。

問題はその後で、moveとかSend/Syncとかの話はRustBookを読んだら全部書いてあるんですよね

びっくりしましたねえ。私は普段塾講師もしていて、中高生相手に情報科学を教えています。口を酸っぱくして「なにごとも、詳しいことが知りたくなったら、まずは公式ドキュメントをあたろう!」と教えてきたのに、、、これはとんだ医者の不養生でございました😌

まあ、最近進展があったasync/awaitあたりはもちろんRustBookには書いてないので、巷の記事をあさらないといけませんでしたね。AsyncBookがありますが、今は絶賛成長中です。 Futureを扱うトレートが乱立していたときの話とか、標準ライブラリに取り込まれるまでの歴史とかはBook系ではあまり深く踏み込まないので、そういうのが学びたければ巷の記事をあさるのもいいですね。歴史系は古い記事ほど良いソースです。

巷にあふれている良い記事を読んで思うのは、Rustの難しい部分と向き合いやすくなる「気持ち」を詳しく説明してくれているものが多いことですね。これはRustBookとかだと省かれていがちで、理解度を1から10に上げるときに役に立ちます。逆にRustBookに書かれていることは気持ちというよりは詳細な事実といった感じで、理解度を0から1に上げる部分に該当すると思います。

私が思うに理解度を0から1に上げる資料を読む前に理解度を1から10に上げるための資料を読んでRustに立ち向かうと「雰囲気でRustをしている!」という気持ちになるのでしょう。万物を完全に理解する必要はないので、大抵のものは「雰囲気で○○する」くらいがちょうどいいのだと思います。

ただ、タスクがスレッドを飛び越えまくるコードを書くともっとちゃんと理解しないと不安になってくる部分もあるかと思うので、そういうときはRustBookに帰ってきて、脱雰囲気をすると良いでしょう。

簡にして要を得た結論

まず、巷に溢れた記事を読んで気持ちを知ろう。すると雰囲気でRustができるようになる。 雰囲気でRustをする状態から脱したかったら、RustBookや仕様書に帰ってこよう。

おわりに

この記事もまた、巷にあふれる記事の仲間入りをするのでしょう。この記事では私の「気持ち」のフレーバーがかかっております。皆さんのお口にあったのであれば幸いです。そうでなければ、残念。でもまたどこか別の場所で良い「気持ち」に出会えることを願います。


CADDiでは「モノづくり産業のポテンシャルを解放する」ための仲間を探しています。 実現したい世界に向け、作らなければならないもの、改善したいことが無限にあります。

少しでも興味を持っていただけましたたら、リニューアルされたばかりの 採用サイト をご覧ください。(わかりやすく、ヘッダの仕掛けもかっこいいので是非!)

また、カジュアル面談も行っていますので、実際にエンジニアに会ってみたい方はこちらからどうぞ。