文章482
标签257
分类63

Rust的GRPC实现Tonic

tonic 是rust中的一个GRPC客户端和服务端的异步实现,底层使用了tokio的prost生成Protocol Buffers对应的代码;

本文讲解了如何使用Tonic,并提供了一个包含多个proto文件的项目案例;

源代码:


Rust的GRPC实现Tonic

前言

tonic是基于HTTP/2的gRPC实现,专注于高性能,互通性和灵活性;

创建该库的目的是为了对async/await具有一流的支持,并充当用Rust编写的生产系统的核心构建块;

特性:

  • 双向流传输
  • 高性能异步io
  • 互通性
  • 通过rustls进行TLS加密支持
  • 负载均衡
  • 自定义元数据
  • 身份认证
  • 健康检查
  • ……

编译 Protobuf 还是需要安装 protoc 的,可以参考官方文档:

另外,除了这个实现之外,PingCAP 也开源了一个实现:

我试了一下,说实话并没有 Tonic 好用,但是他的 benchmark 稍微高一些;

下面开始编写一个包含多个proto文件的项目案例;


创建项目

最终的目录结构如下:

$ tree .       
.
├── Cargo.toml
├── Cargo.lock 
├── build.rs
├── proto
  │   ├── basic
  │   │   └── basic.proto
  │   ├── goodbye.proto
  │   └── hello.proto
  └── src
  ├── bin
  │   ├── client.rs
  │   └── server.rs
  └── lib.rs

其中:

  • proto 目录中定义了服务;
  • build.rs 中声明了通过 proto 生成 rs 文件的脚本;
  • lib.rs 中引入了 build.rs 编译 proto 后生成的 rs 文件;
  • bin 目录下定义了客户端、服务端的实现;

首先创建一个 lib 项目:

cargo new tonic-demo --lib

在这个 lib 中我们实现服务代码,并通过 bin 目录下的 clientserver 实现客户端和服务端;

修改 Cargo 配置:

Cargo.toml

[[bin]]
name="server"
path="src/bin/server.rs"

[[bin]]
name="client"
path="src/bin/client.rs"

[dependencies]
prost = "0.11.3"
tokio = { version = "1.19.2", features = ["macros", "rt-multi-thread"] }
tonic = "0.8.3"

[build-dependencies]
tonic-build = "0.8.4"

定义服务

创建 proto 目录,并声明相应的服务;

由于网上的资料大多都是一个 proto 文件,而实际项目中基本上都是具有层级结构的;

因此这里我也使用了多个 proto 文件来演示;

定义如下:

// tonic-demo/proto/basic/basic.proto
syntax = "proto3";

package basic;

message BaseResponse {
  string message = 1;
  int32 code = 2;
}

// tonic-demo/proto/hello.proto
syntax = "proto3";

import "basic/basic.proto";

package hello;

service Hello {
  rpc Hello(HelloRequest) returns (HelloResponse) {}
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string data = 1;
  basic.BaseResponse message = 2;
}

// tonic-demo/proto/goodbye.proto
syntax = "proto3";

import "basic/basic.proto";

package goodbye;

service Goodbye {
  rpc Goodbye(GoodbyeRequest) returns (GoodbyeResponse) {}
}

message GoodbyeRequest {
  string name = 1;
}

message GoodbyeResponse {
  string data = 1;
  basic.BaseResponse message = 2;
}

proto/basic 目录下定义了:BaseResponse

而在 hello.protogoodbye.proto 中都引入了他;


配置编译

下面来看 build.rs,这也是编译 protobuf 文件的关键!

众所周知,在 build.rs 中定义的代码,会在真正编译项目代码前被执行,用于在编译真正的项目前做一些骚操作;

因此,我们可以在这里先编译 protobuf 文件;

在上面 Cargo 配置中我们引入了:

[build-dependencies]
tonic-build = "0.8.4"

因此在这里被使用:

build.rs

use std::error::Error;
use std::fs;

static OUT_DIR: &str = "src/proto-gen";

fn main() -> Result<(), Box<dyn Error>> {
    let protos = [
        "proto/basic/basic.proto",
        "proto/hello.proto",
        "proto/goodbye.proto",
    ];

    fs::create_dir_all(OUT_DIR).unwrap();
    tonic_build::configure()
        .build_server(true)
        .out_dir(OUT_DIR)
        .compile(&protos, &["proto/"])?;

    rerun(&protos);

    Ok(())
}

fn rerun(proto_files: &[&str]) {
    for proto_file in proto_files {
        println!("cargo:rerun-if-changed={}", proto_file);
    }
}

首先,声明了我们要编译的 proto 文件,随后创建 proto 文件编译后的输出位置(默认在 target/build 目录下);

最后,使用 tonic_build 编译了 server 端的文件;

项目编译后,被编译的 proto 文件会输出至我们定义好的 src/proto-gen 下;

tonic-demo/src/proto-gen/basic.rs

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct BaseResponse {
    #[prost(string, tag = "1")]
    pub message: ::prost::alloc::string::String,
    #[prost(int32, tag = "2")]
    pub code: i32,
}

tonic-demo/src/proto-gen/hello.rs

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloRequest {
    #[prost(string, tag = "1")]
    pub name: ::prost::alloc::string::String,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HelloResponse {
    #[prost(string, tag = "1")]
    pub data: ::prost::alloc::string::String,
    #[prost(message, optional, tag = "2")]
    pub message: ::core::option::Option<super::basic::BaseResponse>,
}
/// Generated client implementations.
pub mod hello_client {
    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
    use tonic::codegen::*;
    use tonic::codegen::http::Uri;
    #[derive(Debug, Clone)]
    pub struct HelloClient<T> {
        inner: tonic::client::Grpc<T>,
    }
    impl HelloClient<tonic::transport::Channel> {
        /// Attempt to create a new client by connecting to a given endpoint.
        pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
        where
            D: std::convert::TryInto<tonic::transport::Endpoint>,
            D::Error: Into<StdError>,
        {
            let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
            Ok(Self::new(conn))
        }
    }
    impl<T> HelloClient<T>
    where
        T: tonic::client::GrpcService<tonic::body::BoxBody>,
        T::Error: Into<StdError>,
        T::ResponseBody: Body<Data = Bytes> + Send + 'static,
        <T::ResponseBody as Body>::Error: Into<StdError> + Send,
    {
        pub fn new(inner: T) -> Self {
            let inner = tonic::client::Grpc::new(inner);
            Self { inner }
        }
        pub fn with_origin(inner: T, origin: Uri) -> Self {
            let inner = tonic::client::Grpc::with_origin(inner, origin);
            Self { inner }
        }
        pub fn with_interceptor<F>(
            inner: T,
            interceptor: F,
        ) -> HelloClient<InterceptedService<T, F>>
        where
            F: tonic::service::Interceptor,
            T::ResponseBody: Default,
            T: tonic::codegen::Service<
                http::Request<tonic::body::BoxBody>,
                Response = http::Response<
                    <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
                >,
            >,
            <T as tonic::codegen::Service<
                http::Request<tonic::body::BoxBody>,
            >>::Error: Into<StdError> + Send + Sync,
        {
            HelloClient::new(InterceptedService::new(inner, interceptor))
        }
        /// Compress requests with the given encoding.
        ///
        /// This requires the server to support it otherwise it might respond with an
        /// error.
        #[must_use]
        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.inner = self.inner.send_compressed(encoding);
            self
        }
        /// Enable decompressing responses.
        #[must_use]
        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.inner = self.inner.accept_compressed(encoding);
            self
        }
        pub async fn hello(
            &mut self,
            request: impl tonic::IntoRequest<super::HelloRequest>,
        ) -> Result<tonic::Response<super::HelloResponse>, tonic::Status> {
            self.inner
                .ready()
                .await
                .map_err(|e| {
                    tonic::Status::new(
                        tonic::Code::Unknown,
                        format!("Service was not ready: {}", e.into()),
                    )
                })?;
            let codec = tonic::codec::ProstCodec::default();
            let path = http::uri::PathAndQuery::from_static("/hello.Hello/Hello");
            self.inner.unary(request.into_request(), path, codec).await
        }
    }
}
/// Generated server implementations.
pub mod hello_server {
    #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
    use tonic::codegen::*;
    /// Generated trait containing gRPC methods that should be implemented for use with HelloServer.
    #[async_trait]
    pub trait Hello: Send + Sync + 'static {
        async fn hello(
            &self,
            request: tonic::Request<super::HelloRequest>,
        ) -> Result<tonic::Response<super::HelloResponse>, tonic::Status>;
    }
    #[derive(Debug)]
    pub struct HelloServer<T: Hello> {
        inner: _Inner<T>,
        accept_compression_encodings: EnabledCompressionEncodings,
        send_compression_encodings: EnabledCompressionEncodings,
    }
    struct _Inner<T>(Arc<T>);
    impl<T: Hello> HelloServer<T> {
        pub fn new(inner: T) -> Self {
            Self::from_arc(Arc::new(inner))
        }
        pub fn from_arc(inner: Arc<T>) -> Self {
            let inner = _Inner(inner);
            Self {
                inner,
                accept_compression_encodings: Default::default(),
                send_compression_encodings: Default::default(),
            }
        }
        pub fn with_interceptor<F>(
            inner: T,
            interceptor: F,
        ) -> InterceptedService<Self, F>
        where
            F: tonic::service::Interceptor,
        {
            InterceptedService::new(Self::new(inner), interceptor)
        }
        /// Enable decompressing requests with the given encoding.
        #[must_use]
        pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.accept_compression_encodings.enable(encoding);
            self
        }
        /// Compress responses with the given encoding, if the client supports it.
        #[must_use]
        pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
            self.send_compression_encodings.enable(encoding);
            self
        }
    }
    impl<T, B> tonic::codegen::Service<http::Request<B>> for HelloServer<T>
    where
        T: Hello,
        B: Body + Send + 'static,
        B::Error: Into<StdError> + Send + 'static,
    {
        type Response = http::Response<tonic::body::BoxBody>;
        type Error = std::convert::Infallible;
        type Future = BoxFuture<Self::Response, Self::Error>;
        fn poll_ready(
            &mut self,
            _cx: &mut Context<'_>,
        ) -> Poll<Result<(), Self::Error>> {
            Poll::Ready(Ok(()))
        }
        fn call(&mut self, req: http::Request<B>) -> Self::Future {
            let inner = self.inner.clone();
            match req.uri().path() {
                "/hello.Hello/Hello" => {
                    #[allow(non_camel_case_types)]
                    struct HelloSvc<T: Hello>(pub Arc<T>);
                    impl<T: Hello> tonic::server::UnaryService<super::HelloRequest>
                    for HelloSvc<T> {
                        type Response = super::HelloResponse;
                        type Future = BoxFuture<
                            tonic::Response<Self::Response>,
                            tonic::Status,
                        >;
                        fn call(
                            &mut self,
                            request: tonic::Request<super::HelloRequest>,
                        ) -> Self::Future {
                            let inner = self.0.clone();
                            let fut = async move { (*inner).hello(request).await };
                            Box::pin(fut)
                        }
                    }
                    let accept_compression_encodings = self.accept_compression_encodings;
                    let send_compression_encodings = self.send_compression_encodings;
                    let inner = self.inner.clone();
                    let fut = async move {
                        let inner = inner.0;
                        let method = HelloSvc(inner);
                        let codec = tonic::codec::ProstCodec::default();
                        let mut grpc = tonic::server::Grpc::new(codec)
                            .apply_compression_config(
                                accept_compression_encodings,
                                send_compression_encodings,
                            );
                        let res = grpc.unary(method, req).await;
                        Ok(res)
                    };
                    Box::pin(fut)
                }
                _ => {
                    Box::pin(async move {
                        Ok(
                            http::Response::builder()
                                .status(200)
                                .header("grpc-status", "12")
                                .header("content-type", "application/grpc")
                                .body(empty_body())
                                .unwrap(),
                        )
                    })
                }
            }
        }
    }
    impl<T: Hello> Clone for HelloServer<T> {
        fn clone(&self) -> Self {
            let inner = self.inner.clone();
            Self {
                inner,
                accept_compression_encodings: self.accept_compression_encodings,
                send_compression_encodings: self.send_compression_encodings,
            }
        }
    }
    impl<T: Hello> Clone for _Inner<T> {
        fn clone(&self) -> Self {
            Self(self.0.clone())
        }
    }
    impl<T: std::fmt::Debug> std::fmt::Debug for _Inner<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{:?}", self.0)
        }
    }
    impl<T: Hello> tonic::server::NamedService for HelloServer<T> {
        const NAME: &'static str = "hello.Hello";
    }
}

需要注意的是:

为客户端生成的HelloClient类型:

  • 实现了Clone、Sync以及Send,因此可以跨线程使用;

为服务端生成的 HelloServer类型:

  • 包含 impl<T: Hello>,因此要求必须实现我们定义的 Hello Trait;

引入proto生成的文件

下面我们在 lib.rs 中引入 ptoroc 生成的文件:

lib.rs

pub mod basic {
    include!("./proto-gen/basic.rs");
}

pub mod hello {
    include!("./proto-gen/hello.rs");
}

pub mod goodbye {
    include!("./proto-gen/goodbye.rs");
}

这里使用了标准库提供的 include! 将文件引入;

如果你没有定义 proto 文件编译后的输出位置,则默认在 target/build 目录下;

此时也可以使用 tonic 提供的 include_proto!("hello") 宏,直接引入对应文件而不用额外制定路径了;

参考官方文档:


服务端实现

下面来实现服务端;

服务端的实现和其他语言基本类似,为对应 proto 定义的 Service 创建相应的 Service 实现即可:

tonic-demo/src/bin/server.rs

#[derive(Default)]
pub struct HelloService {}

#[tonic::async_trait]
impl Hello for HelloService {
    async fn hello(&self, req: Request<HelloRequest>) -> Result<Response<HelloResponse>, Status> {
        println!("hello receive request: {:?}", req);

        let response = HelloResponse {
            data: format!("Hello, {}", req.into_inner().name),
            message: Some(BaseResponse {
                message: "Ok".to_string(),
                code: 200,
            }),
        };
        Ok(Response::new(response))
    }
}

#[derive(Default)]
pub struct GoodbyeService {}

#[tonic::async_trait]
impl Goodbye for GoodbyeService {
    async fn goodbye(
        &self,
        req: Request<GoodbyeRequest>,
    ) -> Result<Response<GoodbyeResponse>, Status> {
        println!("goodbye receive request: {:?}", req);

        let response = GoodbyeResponse {
            data: format!("Goodbye, {}", req.into_inner().name),
            message: Some(BaseResponse {
                message: "Ok".to_string(),
                code: 200,
            }),
        };
        Ok(Response::new(response))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "0.0.0.0:50051".parse()?;

    println!("server starting at: {}", addr);

    Server::builder()
        .add_service(HelloServer::new(HelloService::default()))
        .add_service(GoodbyeServer::new(GoodbyeService::default()))
        .serve(addr)
        .await?;

    Ok(())
}

在对应的 Trait 中实现接口的相应逻辑,最后在 main 中注册 Service 即可,逻辑非常清晰;


客户端实现

客户端的实现就更加的简单了,首先通过地址创建 Endpoint 连接,随后直接调用对应函数即可:

tonic-demo/src/bin/client.rs

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = Endpoint::from_static("https://127.0.0.1:50051");

    let mut hello_cli = HelloClient::connect(addr.clone()).await?;
    let request = Request::new(HelloRequest {
        name: "tonic".to_string(),
    });
    let response = hello_cli.hello(request).await?;
    println!("hello response: {:?}", response.into_inner());

    let mut goodbye_cli = GoodbyeClient::connect(addr).await?;
    let request = Request::new(GoodbyeRequest {
        name: "tonic".to_string(),
    });
    let response = goodbye_cli.goodbye(request).await?;
    println!("goodbye response: {:?}", response.into_inner());

    Ok(())
}

是不是非常的简单;


测试

下面来测试一下,首先启动服务端:

$ cargo run --bin server             

server starting at: 0.0.0.0:50051

再启动客户端:

$ cargo run --bin client     

hello response: HelloResponse { data: "Hello, tonic", message: Some(BaseResponse { message: "Ok", code: 200 }) }
goodbye response: GoodbyeResponse { data: "Goodbye, tonic", message: Some(BaseResponse { message: "Ok", code: 200 }) }

客户端收到响应,并且服务端打出日志:

hello receive request: Request { metadata: MetadataMap { headers: {"te": "trailers", "content-type": "application/grpc", "user-agent": "tonic/0.8.3"} }, message: HelloRequest { name: "tonic" }, extensions: Extensions }
goodbye receive request: Request { metadata: MetadataMap { headers: {"te": "trailers", "content-type": "application/grpc", "user-agent": "tonic/0.8.3"} }, message: GoodbyeRequest { name: "tonic" }, extensions: Extensions }

在 Github Action 中需要添加步骤:

- name: Install protoc
    run: sudo apt-get install -y protobuf-compiler

安装 protoc;

参考代码:


总结

可以看到,相比于其他语言来说,在 Rust 中使用 grpc 更加的简单,甚至不需要额外的去编写 protoc 生成的 shell 脚本,而是通过 build.rs 更加优雅的实现了!

更多tonic使用方法:


附录

源代码:

开源库:

参考文章:



本文作者:Jasonkay
本文链接:https://jasonkayzk.github.io/2022/12/03/Rust的GRPC实现Tonic/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可