こんにちは。CADDi でバックエンドエンジニアをしている 高藤 です。 この記事は CADDi Advent Calendar 21日目の記事です。昨日は、寺田さんによる RustでRAMの動作原理をシミュレートする でした!
今回はRustのtracintg
crateについて紹介したいと思います。
目次
[toc]
はじめに
キャディではバックエンドのAPIをgRPCを使って実装しています。
実装にはtonic
というRustでは比較的新しいcrateを使っています。使いやすいこともあり比較的使ってみたなどの記事は散見されるのですが、今回は本番環境で運用するのに大事なloggingの観点で説明をしたいと思っています。
gRPCサーバの実装
まずはtonicのサンプルを紹介します。以下のコードはtonic にあるexamples/src/helloworld/server.rs
をもとに説明を行います。
まずコードを見てみましょう。非常にシンプルなサーバです。Requestに名前を含めると返事をしてくれるそれだけのサーバですが、まずはこのコードを使っていくつか検証をして行こうと思います。
use tonic::{transport::Server, Request, Response, Status};
use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
pub mod hello_world {
tonic::include_proto!("helloworld");
}
#[derive(Default)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
println!("Got a request from {:?}", request.remote_addr());
let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.into_inner().name),
};
Ok(Response::new(reply))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
println!("GreeterServer listening on {}", addr);
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
挙動を確認するためにこちらのServerをまずは実行してみます。
❯ cargo run --bin helloworld-server`
サーバを起動すると
GreeterServer listening on [::1]:50051
このように表示されてサーバが起動します。
このままRequestを送るためgrpcurl
を使って見ます
❯ grpcurl -plaintext -d '{"name": "foo"}' -proto proto/helloworld/helloworld.proto -import-path ./proto localhost:50051 helloworld.Greeter/SayHello
{
"message": "Hello foo!"
}
サーバ側
Got a request from Some([::1]:58752)
ちゃんと動いていますね。
あくまでサンプルですが、このコードにDBとの接続処理やロジックを記述していくことでサービスを提供できそうです。ですが、本番でちゃんと運用するにはログをちゃんと出力しないと難しいです。
上記の例では標準出力にprintln!
を使ってRequestが来たことは出力されていますがtimestampもなくいつ処理されたものなのかもわかりません。
本番環境での運用を考えてgRPCサーバのloggingについて考えて見ようと思います。
env_loggerの利用
Rustではログの出力を行うためログ出力機能が抽象化されたlog
crateとその実装crateが存在しています。crates.ioでも上位にあるenv_logger
を使ってログの出力を行ってみます。
まず、Cargo.toml
に対して依存するcrateの追加を行います。dependencies
に以下の2つのcrateを追加します。
env_logger = "0.8.2"
log = "0.4.11"
env_loggerの初期化
main関数部分でenv_loggerの初期化処理を追加します。あわせてprintln!
を使って標準出力を行っている部分をlog::info!
に書き換えて見ましょう
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// ここを追加
env_logger::init();
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
log::info!!("GreeterServer listening on {}", addr);
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
こちらを改めて起動して確認をします。
起動直後
[2020-11-06T16:25:56Z INFO helloworld_server] GreeterServer listening on [::1]:50051
Requestの送信時
[2020-11-06T16:26:43Z INFO helloworld_server] Got a request from Some([::1]:34238)
無事timestampやlogレベルをあわせて出力することが出来ました。 ただし、これだけだと処理の内容がわからずlogを出力する意味があまりない状態なので処理の終了時にRequestの内容と処理が終わった旨を出力するように修正してみます。
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = hello_world::HelloReply {
message: format!("Hello {}!", request.get_ref().name),
};
log::info!("Request: {:?}, Done", request);
Ok(Response::new(reply))
}
}
実行結果
[2020-11-06T16:35:39Z INFO helloworld_server] GreeterServer listening on [::1]:50051
[2020-11-06T16:35:40Z INFO helloworld_server] Request: Request { metadata: MetadataMap { headers: {"content-type": "application/grpc", "user-agent": "grpc-go/1.30.0", "te": "trailers"} }, message: HelloRequest { name: "foo" }, extensions: Extensions }, Done
これで意図したとおりに処理が終わった旨の出力とRequestの内容が表示されるようになりました。 もう少し実用的なアプリケーションを想定して処理を追加してみます
冒頭のimport宣言にCode
を追加します。
use tonic::{transport::Server, Code, Request, Response, Status};
何かしらの処理を行う関数some_logic()
の追加とそれを利用するようにsay_hello()
メソッドの修正を行います。
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = hello_world::HelloReply {
message: format!("Hello {}!", some_logic(&request.get_ref().name).await?),
};
log::info!("Request: {:?}, Done", request);
Ok(Response::new(reply))
}
}
async fn some_logic(name: &str) -> Result<String, Status> {
log::info!("run some logic");
match name {
"foo" => {
log::error!("Failed some_logic");
Err(Status::new(Code::InvalidArgument, "who is foo"))
}
_ => Ok(name.to_string()),
}
}
Requestに含む名前によってはエラーを出力するように修正を行いました。サーバを起動し、先程と同様にgrpcurlでRequestを投げると以下のような出力を得ることが出来ます。
[2020-11-07T00:39:25Z INFO helloworld_server] GreeterServer listening on [::1]:50051
[2020-11-07T00:39:36Z INFO helloworld_server] run some logic
[2020-11-07T00:39:36Z ERROR helloworld_server] Failed some_logic
想定している通り失敗した時にERRORログが出力されることが確認できました。 しかしこの方法だと問題があります。
- ERRORログにRequestの情報がないので複数のRequestを受けている時にどのRequestがエラーになったのか判断出来ない
- 今回の処理は全てasync fnにより非同期に実行されるため、ログの出力に1つのRequestからなる処理の内容がが混ざって表示される
愚直に問題を解決させるなら、RequestやRequestヘッダーにRequesを識別できるIdを含めてそれをsome_logic()
関数に渡すことで解消はできます。
async fn some_logic(request_id: RequestId, name: &str) -> Result<String, State>
ただしこのやり方では更にlogicが複雑になったときなどに全てのlogicに対してRequestや識別子を持ち回す事を行わないと実現することが出来ません。
このような問題を解決するためにtracing
crateを利用することが出来ます。
tracing crateの利用
Cargo.toml
にはすでにtracingの依存が含まれている状態なので、修正はserver.rs
のみとなります。
#[derive(Default, Debug)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
#[tracing::instrument]
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = hello_world::HelloReply {
message: format!("Hello {}!", some_logic(&request.get_ref().name).await?),
};
tracing::info!("Done");
Ok(Response::new(reply))
}
}
async fn some_logic(name: &str) -> Result<String, Status> {
log::info!("run some logic");
match name {
"foo" => {
tracing::error!("Failed some_logic");
Err(Status::new(Code::InvalidArgument, "who is foo"))
}
_ => Ok(name.to_string()),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
log::info!("GreeterServer listening on {}", addr);
Server::builder()
.trace_fn(|_| tracing::info_span!("gRPC server"))
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
いくつかの修正を行っているため、修正点を列挙します。
- MyGreeterにDebug traitを実装
say_hello
メソッドに#[tracing::instrument]
を追加log::info
,log::error
としている部分をそれぞれtracing::info
,tracing::error
となるように修正main
関数で初期化していたenv_logger
の初期化処理を削除し、tracing_subscriber
の初期化処理を追加
このコードを実行し先程のエラーが起きるRequestを送信するとログの出力が以下のようになります
Nov 07 09:57:42.891 INFO helloworld_server: GreeterServer listening on [::1]:50051
Nov 07 10:02:51.585 INFO gRPC server:say_hello{self=MyGreeter request=Request { metadata: MetadataMap { headers: {"content-type": "application/grpc", "user-agent": "grpc-go/1.30.0", "te": "trailers"} }, message: HelloRequest { name: "foo" }, extensions: Extensions }}: helloworld_server: run some logic
Nov 07 10:02:51.585 ERROR gRPC server:say_hello{self=MyGreeter request=Request { metadata: MetadataMap { headers: {"content-type": "application/grpc", "user-agent": "grpc-go/1.30.0", "te": "trailers"} }, message: HelloRequest { name: "foo" }, extensions: Extensions }}: helloworld_server: Failed some_logic
実際にログを出力しているsome_logic()
関数内にはRequestの情報は渡していないにも関わらずログの出力にRequestの情報など付与されるようになりました。
どのような仕組みになっているのか少し説明をします。
tracing
crateは In−Process Tracing機能を提供するcrateとなります。Microservice等の分散処理システムの文脈ではJaeger
、Zipkin
を始めとする分散トレーシングという技術を利用してどこのサービスからどのサービスへ通信がされたか、その処理時間はなどメトリクスを取得することが出来ます。tracing
crateも同様にプロセス内部の処理を追跡できるような形で記録する仕組みを提供しています。
仕組みを理解する上で重要になるのが以下の3つの要素となります。
Span
- 処理を記録する期間を表します
- 名前やあわせて記録しておきたい情報を保持することができる
Event
Span
に記録するトレースしたい事象を表します- 発生した事象を記録したい情報とあわせて保持することが出来ます
Subscriber
Span
や紐付いたEvent
を収集するための処理を表します
今回の例を上記3つの要素を明確に使ってsay_hello()
メソッドの部分を書き直すと以下のようになります。
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let args = format!("{:?}", request);
let span = tracing::span!(tracing::Level::INFO, "say_hello", request = args.as_str());
let _enter = span.enter();
let reply = hello_world::HelloReply {
message: format!("Hello {}!", some_logic(&request.get_ref().name).await?),
};
tracing::event!(tracing::Level::INFO, "Done");
Ok(Response::new(reply))
}
- 処理の冒頭で
Span
を定義します- 定義内容
- 名前:
say_hello
Span
に含める情報 = RequestをDebug traitを使って文字列にした情報
- 名前:
- 定義内容
Span.enter()
を行いSpanの中に入る事を表す。(enter()はRAIIガードオブジェクトを返し、DropされたタイミングでSpanを閉じます)event!()
マクロを使って記録する内容を記述します。
上記の例からわかるとおり、#[tracing::instrument]
の処理ではSpan
の定義とSpan::enter()
の処理を自動的に生成しています。また、tracing::info!()
やtracing::error!()
はEvent
の生成をlog
crateと同様のI/Fで定義できるように作られています。
注意
公式のドキュメントにも記述されていますが非同期処理内でのSpan::enter()
の処理は慎重に利用するか避けることが明記されています。非同期関数の場合は#[tracing::instrument]
を使った場合にただしく生成できるとドキュメントに書かれているように#[tracing::instrument]
を利用することを推奨します。
tonicとの統合
すでに実装例で示していますが、tonicのServer::Builder
にはtrace_fn()
メソッドが用意されており、ここでRequest毎のSpan
を生成しています。次の例ではRequest全体の情報をSpan
に含めず、Request Headerにtrace_id
という文字列の情報を出力するように変更しています。(もちろんClient側でRequestする際にIdをヘッダーに入れる必要があります)
#[derive(Default, Debug)]
pub struct MyGreeter {}
#[tonic::async_trait]
impl Greeter for MyGreeter {
#[tracing::instrument(skip(self, request))]
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let reply = hello_world::HelloReply {
message: format!("Hello {}!", some_logic(&request.get_ref().name).await?),
};
tracing::info!("Done");
Ok(Response::new(reply))
}
}
async fn some_logic(name: &str) -> Result<String, Status> {
tracing::info!("run some logic");
match name {
"foo" => {
tracing::error!("Failed some_logic");
Err(Status::new(Code::InvalidArgument, "who is foo"))
}
_ => Ok(name.to_string()),
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
let addr = "[::1]:50051".parse().unwrap();
let greeter = MyGreeter::default();
log::info!("GreeterServer listening on {}", addr);
Server::builder()
.trace_fn(|header| {
let trace_id = header
.get("trace_id")
.map(|value| value.to_str().unwrap_or("Unknown"))
.unwrap_or("Unknown");
tracing::info_span!("gRPC server", trace_id = trace_id)
})
.add_service(GreeterServer::new(greeter))
.serve(addr)
.await?;
Ok(())
}
修正点
#[tracing::instrument(skip(self, request))]
- instrumentを使ってSpanを生成する場合、引数を全て
Span
に含める挙動になりますが、今回はtraice_id
のみを出力するために引数をSpan
に含めないようにしています
- instrumentを使ってSpanを生成する場合、引数を全て
trace_fn(|header| ....)
trace_fn()
メソッドは引数にHeaderMap
型をとり、Requestに含まれるHeaderの情報を取得することができます
実行結果
Nov 07 12:02:03.233 INFO helloworld_server: GreeterServer listening on [::1]:50051
Nov 07 12:02:06.875 INFO gRPC server{trace_id="Unknown"}:say_hello: helloworld_server: run some logic
Nov 07 12:02:06.875 ERROR gRPC server{trace_id="Unknown"}:say_hello: helloworld_server: Failed some_logic
Nov 07 12:02:12.284 INFO gRPC server{trace_id="xxxxxxxxxxxxxxxxxxxx"}:say_hello: helloworld_server: run some logic
Nov 07 12:02:12.284 ERROR gRPC server{trace_id="xxxxxxxxxxxxxxxxxxxx"}:say_hello: helloworld_server: Failed some_logic
出力結果にHeaderから取得したtrace_id
を含めることができました。
このようにtracing
crateを利用することで非同期に実行される処理に対してContextを含めたログの出力を行うことが出来ます。
おわりに
今回取り上げたtracing
にはこの処理機構を使って様々な処理を拡張するためのcrateが存在しており1つのecosystemが形成されて来ています。今回は単純にログを出力するだけのFmtSubscriber
を利用しましたが、tracing-opentelemetry
crateなどを利用すると前述した分散トレーシングシステムに対して出力することも可能です。
こちらは依存するopentelemetry
crateの変更が激しく、今回割愛していますが興味がある方は試してみると面白いと思います。私が検証した内容では
tracing-opentelemetry = "0.7"
opentelemetry-jaeger = "0.7.0"
opentelemetry = "0.8"
上記のような依存関係だとうまく実装が出来ましたが、すでにopentelemetry
crateは0.10.0
がリリースされている状態なので、本番への適用はもう少し様子を見たほうが良いかもしれません。
この記事がどこかの誰かの役に立つ日がくれば幸いです。