diff --git a/BUILD.bazel b/BUILD.bazel index 22cb508548..659f07a470 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -51,6 +51,9 @@ DEFINES = [ }) + select({ "//bazel/config:brpc_with_rdma": ["BRPC_WITH_RDMA=1"], "//conditions:default": [], +}) + select({ + "//bazel/config:brpc_with_gdr": ["BRPC_WITH_GDR=1"], + "//conditions:default": [], }) + select({ "//bazel/config:brpc_with_debug_bthread_sche_safety": ["BRPC_DEBUG_BTHREAD_SCHE_SAFETY=1"], "//conditions:default": ["BRPC_DEBUG_BTHREAD_SCHE_SAFETY=0"], @@ -94,6 +97,11 @@ LINKOPTS = [ "-libverbs", ], "//conditions:default": [], +}) + select({ + "//bazel/config:brpc_with_gdr": [ + "-lcuda -lcudart", + ], + "//conditions:default": [], }) + select({ "//bazel/config:brpc_with_asan": ["-fsanitize=address"], "//conditions:default": [], @@ -235,6 +243,7 @@ BUTIL_SRCS = [ "src/butil/iobuf.cpp", "src/butil/single_iobuf.cpp", "src/butil/iobuf_profiler.cpp", + "src/butil/gpu/gpu_block_pool.cpp", "src/butil/binary_printer.cpp", "src/butil/recordio.cc", "src/butil/popen.cpp", diff --git a/Makefile b/Makefile index abe029e360..29390ac6f5 100644 --- a/Makefile +++ b/Makefile @@ -97,6 +97,7 @@ BUTIL_SOURCES = \ src/butil/files/scoped_temp_dir.cc \ src/butil/file_util.cc \ src/butil/file_util_posix.cc \ + src/butil/gpu/gpu_block_pool.cpp \ src/butil/guid.cc \ src/butil/guid_posix.cc \ src/butil/hash.cc \ diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel index eec551da8b..12319f0dbe 100644 --- a/bazel/config/BUILD.bazel +++ b/bazel/config/BUILD.bazel @@ -104,6 +104,12 @@ config_setting( visibility = ["//visibility:public"], ) +config_setting( + name = "brpc_with_gdr", + define_values = {"BRPC_WITH_GDR": "true"}, + visibility = ["//visibility:public"], +) + config_setting( name = "brpc_with_boringssl", define_values = {"BRPC_WITH_BORINGSSL": "true"}, @@ -149,4 +155,4 @@ config_setting( name = "with_babylon_counter", define_values = {"with_babylon_counter": "true"}, visibility = ["//visibility:public"], -) \ No newline 
at end of file +) diff --git a/config_brpc.sh b/config_brpc.sh index 4526d218a8..de2dfdc74a 100755 --- a/config_brpc.sh +++ b/config_brpc.sh @@ -54,10 +54,11 @@ else LDD=ldd fi -TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-mesalink,with-bthread-tracer,with-debug-bthread-sche-safety,with-debug-lock,with-asan,nodebugsymbols,werror -n 'config_brpc' -- "$@"` +TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-gdr,with-mesalink,with-bthread-tracer,with-debug-bthread-sche-safety,with-debug-lock,with-asan,nodebugsymbols,werror -n 'config_brpc' -- "$@"` WITH_GLOG=0 WITH_THRIFT=0 WITH_RDMA=0 +WITH_GDR=0 WITH_MESALINK=0 WITH_BTHREAD_TRACER=0 WITH_ASAN=0 @@ -87,6 +88,7 @@ while true; do --with-glog ) WITH_GLOG=1; shift 1 ;; --with-thrift) WITH_THRIFT=1; shift 1 ;; --with-rdma) WITH_RDMA=1; shift 1 ;; + --with-gdr) WITH_GDR=1; shift 1 ;; --with-mesalink) WITH_MESALINK=1; shift 1 ;; --with-bthread-tracer) WITH_BTHREAD_TRACER=1; shift 1 ;; --with-debug-bthread-sche-safety ) BRPC_DEBUG_BTHREAD_SCHE_SAFETY=1; shift 1 ;; @@ -532,6 +534,18 @@ if [ $WITH_RDMA != 0 ]; then append_to_output "WITH_RDMA=1" fi +if [ $WITH_GDR != 0 ]; then + CUDA_LIB="/usr/local/cuda/lib64" + CUDA_HDR="/usr/local/cuda/include" + append_to_output_libs "$CUDA_LIB" + append_to_output_headers "$CUDA_HDR" + + CPPFLAGS="${CPPFLAGS} -DBRPC_WITH_GDR" + + append_to_output "DYNAMIC_LINKINGS+=-lcuda -lcudart" + append_to_output "WITH_GDR=1" +fi + if [ $WITH_MESALINK != 0 ]; then CPPFLAGS="${CPPFLAGS} -DUSE_MESALINK" fi @@ -652,6 +666,7 @@ print_info "System: $SYSTEM" if [ $WITH_GLOG -ne 0 ]; then print_info "With glog: yes"; fi if [ $WITH_THRIFT -ne 0 ]; then print_info "With thrift: yes"; fi if [ $WITH_RDMA -ne 0 ]; then print_info "With RDMA: yes"; fi +if [ $WITH_GDR -ne 0 ]; then print_info "With GDR: yes"; fi if [ $WITH_MESALINK -ne 0 ]; then print_info "With MesaLink: yes"; fi if [ $WITH_BTHREAD_TRACER -ne 0 ]; then print_info "With 
bthread tracer: yes"; fi if [ $WITH_ASAN -ne 0 ]; then print_info "With ASAN: yes"; fi diff --git a/docs/cn/gdr.md b/docs/cn/gdr.md new file mode 100644 index 0000000000..7e46f19d2f --- /dev/null +++ b/docs/cn/gdr.md @@ -0,0 +1,43 @@ +# 编译 + +GDR: GPU Direct Rdma, gdr 是rdma的一种特殊模式,其通过rdma将数据直接收到了gpu的显存上。 + +由于GDR对驱动与硬件有要求,目前仅支持在Linux系统编译并运行GDR功能。 + +目前GDR只支持baidu std protocol。 + +使用config_brpc: +```bash +sh config_brpc.sh --with-rdma --with-gdr --headers="/usr/include" --libs="/usr/lib64 /usr/bin" +make + +cd example/rdma_performance # 示例程序 +make +``` + +使用bazel: +```bash +# Server +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_server +# Client +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_client +``` + +# 基本实现 + +GDR是RDMA的一种特殊形式,在使用GDR之前,必须对RDMA和GDR都进行Global Init。 +GDR新增了一个显存池,类似于RDMA内存池,显存池的数据也是按照block进行组织的。 +当打开GDR功能后,框架通过DoPostRecvGDR来发起显存上的WQE。 +在接收到数据后,我们将header、meta、body(不包括attachment)copy回内存进行处理。 +AttachMent位于显存上,用户可以调用IOBuf::copy_from_gpu接口将attachment从brpc框架层copy到应用层进行处理。 + + +注意: +1. 在使用gdr功能时,需要将环境变量MLX5_SCATTER_TO_CQE设置为0. + + +# 参数 + +可配置参数说明: +* gdr_block_size_kb: 使用gdr传送数据时,block的大小(单位为KB),默认为512; +* max_gdr_regions: gdr显存池所使用Region的最大个数,每个Region大小为1GB; diff --git a/docs/en/gdr.md b/docs/en/gdr.md new file mode 100644 index 0000000000..2e968f3e69 --- /dev/null +++ b/docs/en/gdr.md @@ -0,0 +1,44 @@ +Compile GDR: + +GPU Direct RDMA. GDR is a special mode of RDMA that allows data to be received directly into the GPU’s memory through RDMA. +Because GDR requires specific drivers and hardware support, it is currently only available for compilation and execution on Linux systems. +At present, GDR only supports the Baidu STD protocol. 
+ +To use config_brpc: +```bash +sh config_brpc.sh --with-rdma --with-gdr --headers="/usr/include" --libs="/usr/lib64 /usr/bin" +make +cd example/rdma_performance # Example program +make +``` +To use Bazel: +```bash +# Server +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_server + +# Client +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_client +``` + +Basic Implementation: + +GDR is a special form of RDMA. Before using GDR, both RDMA and GDR must be globally initialized. + +GDR introduces a GPU memory pool, similar to the RDMA memory pool. Data in the GPU memory pool is also organized in blocks. + +When GDR is enabled, the framework initiates WQEs on GPU memory through DoPostRecvGDR. + +After receiving data, the header, meta, and body (excluding attachments) are copied back to host memory for processing. +Attachments remain in GPU memory, and users can call IOBuf::copy_from_gpu to copy attachments from the brpc framework layer to the application layer. + +Note: + +When using GDR, the environment variable MLX5_SCATTER_TO_CQE must be set to 0. + +Parameters + +Configurable parameters: + +gdr_block_size_kb: The block size (in KB) used when transferring data via GDR. Default is 512. + +max_gdr_regions: The maximum number of regions used by the GDR GPU memory pool. Each region is 1 GB. 
diff --git a/example/BUILD.bazel b/example/BUILD.bazel index 4ee7cb140f..098d283973 100644 --- a/example/BUILD.bazel +++ b/example/BUILD.bazel @@ -34,6 +34,9 @@ COPTS = [ }) + select({ "//bazel/config:brpc_with_rdma": ["-DBRPC_WITH_RDMA=1"], "//conditions:default": [""], +}) + select({ + "//bazel/config:brpc_with_gdr": ["-DBRPC_WITH_GDR=1"], + "//conditions:default": [""], }) brpc_proto_library( @@ -119,4 +122,4 @@ cc_binary( deps = [ "//:brpc", ], -) \ No newline at end of file +) diff --git a/example/rdma_performance/client.cpp b/example/rdma_performance/client.cpp index 2e8acc4051..40944d65a1 100644 --- a/example/rdma_performance/client.cpp +++ b/example/rdma_performance/client.cpp @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +#ifdef BRPC_WITH_GDR +#include +#include +#endif #include #include #include @@ -42,6 +46,7 @@ DEFINE_string(connection_type, "single", "Connection type of the channel"); DEFINE_string(protocol, "baidu_std", "Protocol type."); DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers"); DEFINE_bool(use_rdma, true, "Use RDMA or not"); +DEFINE_bool(use_gdr, false, "Use GDR or not"); DEFINE_int32(rpc_timeout_ms, 2000, "RPC call timeout"); DEFINE_int32(test_seconds, 20, "Test running time"); DEFINE_int32(test_iterations, 0, "Test iterations"); @@ -84,16 +89,46 @@ class PerformanceTest { , _stop(false) { if (attachment_size > 0) { - _addr = malloc(attachment_size); - butil::fast_rand_bytes(_addr, attachment_size); - _attachment.append(_addr, attachment_size); +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + int gpu_id = 0; + cudaSetDevice(gpu_id); + cudaMalloc(&_addr, attachment_size); + auto pd = brpc::rdma::GetRdmaPd(); + mr = ibv_reg_mr(pd, _addr, attachment_size, + IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE); + if (!mr) { + LOG(FATAL) << "Failed to register MR:" << strerror(errno) + << ", addr:" << _addr; + } + auto deleter = [](void* 
data) {}; + _attachment.append_user_data_with_meta(_addr, attachment_size, deleter, mr->lkey); + } + else +#endif + { + _addr = malloc(attachment_size); + butil::fast_rand_bytes(_addr, attachment_size); + _attachment.append(_addr, attachment_size); + } } _echo_attachment = echo_attachment; } ~PerformanceTest() { if (_addr) { - free(_addr); +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + ibv_dereg_mr(mr); + cudaFree(_addr); + } + else +#endif + { + free(_addr); + } } delete _channel; } @@ -103,6 +138,11 @@ class PerformanceTest { int Init() { brpc::ChannelOptions options; options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP; +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + options.socket_mode = brpc::SOCKET_MODE_GDR; + } +#endif options.protocol = FLAGS_protocol; options.connection_type = FLAGS_connection_type; options.timeout_ms = FLAGS_rpc_timeout_ms; @@ -203,6 +243,9 @@ class PerformanceTest { } private: +#ifdef BRPC_WITH_GDR + ibv_mr* mr; +#endif void* _addr; brpc::Channel* _channel; uint64_t _start_time; @@ -223,6 +266,7 @@ void Test(int thread_num, int attachment_size) { << ", Depth: " << FLAGS_queue_depth << ", Attachment: " << attachment_size << "B" << ", RDMA: " << (FLAGS_use_rdma ? "yes" : "no") + << ", GDR: " << (FLAGS_use_gdr ? "yes" : "no") << ", Echo: " << (FLAGS_echo_attachment ? 
"yes]" : "no]") << std::endl; g_total_bytes.store(0, butil::memory_order_relaxed); @@ -278,6 +322,12 @@ int main(int argc, char* argv[]) { if (FLAGS_use_rdma) { brpc::rdma::GlobalRdmaInitializeOrDie(); } +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { // checked independently: use_rdma defaults to true and GDR still needs its own init + if (!FLAGS_use_rdma) { brpc::rdma::GlobalRdmaInitializeOrDie(); } // RDMA not initialized above + brpc::rdma::GlobalGdrInitializeOrDie(); + } +#endif brpc::StartDummyServerAt(FLAGS_dummy_port); diff --git a/example/rdma_performance/server.cpp b/example/rdma_performance/server.cpp index 2e93e1eec7..eca2641513 100644 --- a/example/rdma_performance/server.cpp +++ b/example/rdma_performance/server.cpp @@ -28,6 +28,7 @@ DEFINE_int32(port, 8002, "TCP Port of this server"); DEFINE_bool(use_rdma, true, "Use RDMA or not"); +DEFINE_bool(use_gdr, false, "Use GDR or not"); butil::atomic g_last_time(0); @@ -77,6 +78,12 @@ int main(int argc, char* argv[]) { brpc::ServerOptions options; options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP; +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + options.socket_mode = brpc::SOCKET_MODE_GDR; + } +#endif + if (server.Start(FLAGS_port, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h index 77942beca2..f28d3f5bce 100644 --- a/src/brpc/acceptor.h +++ b/src/brpc/acceptor.h @@ -111,7 +111,7 @@ friend class Server; bool _force_ssl; std::shared_ptr _ssl_ctx; - // Choose to use a certain socket: 0 TCP, 1 RDMA + // Choose to use a certain socket: 0 TCP, 1 RDMA, 2 GDR SocketMode _socket_mode; // Acceptor belongs to this tag diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index a8caeaf953..08f1445a58 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -134,6 +134,8 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { } if (opt.socket_mode == SOCKET_MODE_RDMA) { 
buf.append("|rdma"); + } else if (opt.socket_mode == SOCKET_MODE_GDR) { + buf.append("|gdr"); } butil::MurmurHash3_x64_128_Update(&mm_ctx, 
buf.data(), buf.size()); buf.clear(); diff --git a/src/brpc/gdr_transport.cpp b/src/brpc/gdr_transport.cpp new file mode 100644 index 0000000000..005b191380 --- /dev/null +++ b/src/brpc/gdr_transport.cpp @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include "brpc/gdr_transport.h" +#include "brpc/rdma/rdma_helper.h" + +namespace brpc { + +void GdrTransport::Init(Socket *socket, const SocketOptions &options) { + DoInit(socket, options, true); +} + +int GdrTransport::GdrContextInitOrDie() { + rdma::GlobalGdrInitializeOrDie(); + return 0; +} + +} // namespace brpc +#endif diff --git a/src/brpc/gdr_transport.h b/src/brpc/gdr_transport.h new file mode 100644 index 0000000000..0f41c0c4db --- /dev/null +++ b/src/brpc/gdr_transport.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_GDR_TRANSPORT_H +#define BRPC_GDR_TRANSPORT_H + +#if BRPC_WITH_GDR +#include "brpc/rdma_transport.h" + +namespace brpc { +class GdrTransport : public RdmaTransport { +public: + void Init(Socket* socket, const SocketOptions& options) override; + static int GdrContextInitOrDie(); +}; +} // namespace brpc +#endif // BRPC_WITH_GDR +#endif //BRPC_GDR_TRANSPORT_H diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 2c5a7e7224..56b694726d 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -104,6 +104,12 @@ static void SerializeRpcHeaderAndMeta( ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void*) { +#if BRPC_WITH_GDR + bool is_gpu_memory = source->is_gpu_memory(); + if (is_gpu_memory) { + return ParseRpcMessageGpu(source, socket, false /* not use */, nullptr /* not use */); + } +#endif // BRPC_WITH_GDR char header_buf[12]; const size_t n = source->copy_to(header_buf, sizeof(header_buf)); if (n >= 4) { @@ -796,7 +802,15 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { butil::IOBuf req_buf; int body_without_attachment_size = req_size - meta.attachment_size(); - msg->payload.cutn(&req_buf, body_without_attachment_size); +#if BRPC_WITH_GDR + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (is_gpu_memory) { + FillReqBufGpu(&req_buf, msg.get(), body_without_attachment_size); + } else +#endif // BRPC_WITH_GDR + { + msg->payload.cutn(&req_buf, body_without_attachment_size); + } if 
(meta.attachment_size() > 0) { cntl->request_attachment().swap(msg->payload); } @@ -968,17 +982,25 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { butil::IOBuf res_buf; const int res_size = msg->payload.length(); butil::IOBuf* res_buf_ptr = &msg->payload; - if (meta.has_attachment_size()) { - if (meta.attachment_size() > res_size) { - cntl->SetFailed( - ERESPONSE, "attachment_size=%d is larger than response_size=%d", - meta.attachment_size(), res_size); - break; +#if BRPC_WITH_GDR + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (is_gpu_memory) { + FillResBufGpu(&res_buf, msg.get(), meta, &res_buf_ptr, cntl); + } else +#endif // BRPC_WITH_GDR + { + if (meta.has_attachment_size()) { + if (meta.attachment_size() > res_size) { + cntl->SetFailed( + ERESPONSE, "attachment_size=%d is larger than response_size=%d", + meta.attachment_size(), res_size); + break; + } + int body_without_attachment_size = res_size - meta.attachment_size(); + msg->payload.cutn(&res_buf, body_without_attachment_size); + res_buf_ptr = &res_buf; + cntl->response_attachment().swap(msg->payload); } - int body_without_attachment_size = res_size - meta.attachment_size(); - msg->payload.cutn(&res_buf, body_without_attachment_size); - res_buf_ptr = &res_buf; - cntl->response_attachment().swap(msg->payload); } ContentType content_type = meta.content_type(); diff --git a/src/brpc/policy/baidu_rpc_protocol.h b/src/brpc/policy/baidu_rpc_protocol.h index 77ecc780a2..6a3c379142 100644 --- a/src/brpc/policy/baidu_rpc_protocol.h +++ b/src/brpc/policy/baidu_rpc_protocol.h @@ -19,6 +19,8 @@ #ifndef BRPC_POLICY_BRPC_PROTOCOL_H #define BRPC_POLICY_BRPC_PROTOCOL_H +#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta +#include "brpc/policy/most_common_message.h" #include "brpc/protocol.h" namespace brpc { @@ -53,6 +55,17 @@ void PackRpcRequest(butil::IOBuf* buf, // Returns the `name' of the 'content_type'. 
const char* ContentTypeToCStr(ContentType content_type); +#if BRPC_WITH_GDR +// Parse binary format of baidu_std +ParseResult ParseRpcMessageGpu(butil::IOBuf* source, Socket *socket, bool read_eof, + const void *arg); + +void FillReqBufGpu(butil::IOBuf* req_buf, MostCommonMessage* msg, int body_without_attachment_size); + +void FillResBufGpu(butil::IOBuf* res_buf, MostCommonMessage* msg, const RpcMeta& meta, + butil::IOBuf** res_buf_ptr, Controller* cntl); + +#endif } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/baidu_rpc_protocol_gpu.cpp b/src/brpc/policy/baidu_rpc_protocol_gpu.cpp new file mode 100644 index 0000000000..67b06cdcf4 --- /dev/null +++ b/src/brpc/policy/baidu_rpc_protocol_gpu.cpp @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#if BRPC_WITH_GDR + +#include // MethodDescriptor +#include // Message +#include +#include +#include + +#include "butil/gpu/gpu_block_pool.h" +#include "butil/iobuf.h" // butil::IOBuf +#include "butil/logging.h" // LOG() +#include "butil/raw_pack.h" // RawPacker RawUnpacker +#include "butil/memory/scope_guard.h" +#include "butil/raw_pack.h" // RawPacker RawUnpacker +#include "butil/strings/string_util.h" + +#include "json2pb/json_to_pb.h" +#include "json2pb/pb_to_json.h" +#include "brpc/controller.h" // Controller +#include "brpc/socket.h" // Socket +#include "brpc/server.h" // Server +#include "brpc/span.h" +#include "brpc/compress.h" // ParseFromCompressedData +#include "brpc/checksum.h" +#include "brpc/stream_impl.h" +#include "brpc/rpc_dump.h" // SampledRequest +#include "brpc/rpc_pb_message_factory.h" +#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta +#include "brpc/policy/baidu_rpc_protocol.h" +#include "brpc/policy/most_common_message.h" +#include "brpc/policy/streaming_rpc_protocol.h" +#include "brpc/details/usercode_backup_pool.h" +#include "brpc/details/controller_private_accessor.h" +#include "brpc/details/server_private_accessor.h" + +namespace brpc { +namespace policy { + +// Notes: +// 1. 12-byte header [PRPC][body_size][meta_size] +// 2. body_size and meta_size are in network byte order +// 3. Use service->full_name() + method_name to specify the method to call +// 4. `attachment_size' is set iff request/response has attachment +// 5. Not supported: chunk_info + +// Length in bytes of the fixed header described in note 1 above. + +const int header_size = 12; +// When data is received into GPU memory, the header/meta/body are copied to the CPU and processed there. +// In order to limit the number of device-to-host (d2h) copies, we prefetch 512B from GPU to CPU. +// If header_size + meta_size + body_size (without attachment) does not exceed 512, then one +// d2h copy is enough for one RPC. 
+ +const int prefetch_d2h_size = 512; + +ParseResult ParseRpcMessageGpu(butil::IOBuf* source, Socket* socket, + bool /*read_eof*/, const void*) { + + char header_buf[12]; + size_t n = 0; + uint32_t body_size; + uint32_t meta_size; + ParseError pe = PARSE_OK; + + void* prefetch_d2h_data = NULL; + bool is_gpu_memory = source->is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "RpcMessage is not in gpu!!!"; + } + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size); + if (prefetch_d2h_data == NULL) { + LOG(FATAL) << "alloc host data failed!!!"; + } + + // n is the bytes we real frefetch, n maybe less than prefetch_d2h_size; + n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size); + size_t copy_size = n > 12 ? 12 : n; + memcpy(header_buf, prefetch_d2h_data, copy_size); + + do { + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } else { + if (memcmp(header_buf, "PRPC", n) != 0) { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } + if (n < sizeof(header_buf)) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. 
+ LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + pe = PARSE_ERROR_TOO_BIG_DATA; + break; + } else if (source->length() < sizeof(header_buf) + body_size) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } while (0); + + if (pe != PARSE_OK) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + return MakeParseError(pe); + } + + source->pop_front(sizeof(header_buf)); + MostCommonMessage* msg = MostCommonMessage::Get(); + + if (header_size + meta_size <= n) { + auto deleter = [host_allocator, prefetch_d2h_data](void* data) { host_allocator->DeallocateRaw(prefetch_d2h_data); }; + // n is the bytes we real frefetch. We set n as the meta and n will be used in ProcessRpcRequest/ProcessRpcResponse. + // This is a trick, we should keep n in another better way. 
+ msg->meta.append_user_data_with_meta((char*)prefetch_d2h_data + header_size, meta_size, deleter, n); + source->pop_front(meta_size); + } else { + host_allocator->DeallocateRaw(prefetch_d2h_data); + source->cutn_from_gpu(&msg->meta, meta_size); + } + source->cutn(&msg->payload, body_size - meta_size); + return MakeMessage(msg); +} + + +void FillReqBufGpu(butil::IOBuf* req_buf, MostCommonMessage* msg, int body_without_attachment_size) { + int meta_size = msg->meta.size(); + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "message is not on gpu!!!"; + } + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + req_buf->append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(req_buf, body_without_attachment_size); + } +} + +void FillResBufGpu(butil::IOBuf* res_buf, MostCommonMessage* msg, const RpcMeta& meta, + butil::IOBuf** res_buf_ptr, Controller* cntl) { + const int res_size = msg->payload.length(); + int meta_size = msg->meta.size(); + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "message is not on gpu!!!"; + } + if (meta.has_attachment_size()) { + if (meta.attachment_size() > res_size) { + cntl->SetFailed( + ERESPONSE, "attachment_size=%d is larger than response_size=%d", + meta.attachment_size(), res_size); + return; + } + int body_without_attachment_size = res_size - meta.attachment_size(); + + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + 
res_buf->append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(res_buf, body_without_attachment_size); + } + *res_buf_ptr = res_buf; + cntl->response_attachment().swap(msg->payload); + } else { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + res_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf->append((char*)data + meta_size, res_size); + msg->payload.pop_front(res_size); + } else { + msg->payload.cutn_from_gpu(res_buf, res_size); + } + *res_buf_ptr = res_buf; + } +} + +} // namespace policy +} // namespace brpc +#endif // BRPC_WITH_GDR diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index f09d723ca1..1eb576d7fa 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -19,6 +19,9 @@ #include #include "butil/fd_utility.h" +#if BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif #include "butil/logging.h" // CHECK, LOG #include "butil/sys_byteorder.h" // HostToNet,NetToHost #include "bthread/bthread.h" @@ -57,7 +60,7 @@ DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side"); DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy"); DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once."); DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP."); -DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP."); +DEFINE_int32(rdma_prepared_qp_cnt, 256, "Initial count of prepared QP."); DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely"); BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); @@ -90,6 +93,7 @@ static uint16_t g_rdma_hello_msg_len = 
40; // In Byte static 
uint16_t g_rdma_hello_version = 2; static uint16_t g_rdma_impl_version = 1; static uint32_t g_rdma_recv_block_size = 0; +static uint32_t g_gdr_recv_block_size = 0; // static const uint32_t MAX_INLINE_DATA = 64; static const uint8_t MAX_HOP_LIMIT = 16; @@ -166,8 +170,9 @@ RdmaResource::~RdmaResource() { } } -RdmaEndpoint::RdmaEndpoint(Socket* s) +RdmaEndpoint::RdmaEndpoint(Socket* s, bool use_gdr) : _socket(s) + , _use_gdr(use_gdr) , _state(UNINIT) , _resource(NULL) , _send_cq_events(0) @@ -450,7 +455,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.msg_len = g_rdma_hello_msg_len; local_msg.hello_ver = g_rdma_hello_version; local_msg.impl_ver = g_rdma_impl_version; - local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.lid = GetRdmaLid(); @@ -668,7 +673,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } else { local_msg.lid = GetRdmaLid(); local_msg.gid = GetRdmaGid(); - local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? 
g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.hello_ver = g_rdma_hello_version; @@ -1001,8 +1006,15 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { case IBV_WC_RECV: { // recv completion // Please note that only the first wc.byte_len bytes is valid if (wc.byte_len > 0) { - if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { - zerocopy = false; +#if BRPC_WITH_GDR + if (_use_gdr) { + zerocopy = true; + } else +#endif // BRPC_WITH_GDR + { + if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { + zerocopy = false; + } } CHECK(_state != FALLBACK_TCP); if (zerocopy) { @@ -1063,26 +1075,76 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) { return 0; } +int RdmaEndpoint::DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey) { + ibv_recv_wr wr; + memset(&wr, 0, sizeof(wr)); + ibv_sge sge; + sge.addr = (uint64_t)block; + sge.length = block_size; + sge.lkey = lkey; + wr.num_sge = 1; + wr.sg_list = &sge; + //LOG(INFO) << "POST recv: addr=0x" << std::hex << sge.addr + // << std::dec << " length=0x" << sge.length + // << " lkey=0x" << sge.lkey; + //LOG(INFO) << block << " " << _device_allocator->get_lkey(); + ibv_recv_wr* bad = NULL; + int err = ibv_post_recv(_resource->qp, &wr, &bad); + if (err != 0) { + LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err); + return -1; + } + return 0; +} + int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { // We do the post repeatedly from the _rbuf[_rq_received]. 
while (num > 0) { +#if BRPC_WITH_GDR + uint32_t lkey = 0; +#endif // if BRPC_WITH_GDR if (zerocopy) { _rbuf[_rq_received].clear(); - butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], - g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); - int size = 0; - if (!os.Next(&_rbuf_data[_rq_received], &size)) { - // Memory is not enough for preparing a block - PLOG(WARNING) << "Fail to allocate rbuf"; - return -1; - } else { - CHECK(static_cast(size) == g_rdma_recv_block_size) << size; + +#if BRPC_WITH_GDR + if (_use_gdr) { + butil::gdr::BlockPoolAllocator* device_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_allocator(); + void* device_ptr = device_allocator->AllocateRaw(g_gdr_recv_block_size); + auto deleter = [device_allocator](void* data) { device_allocator->DeallocateRaw(data); }; + lkey = device_allocator->get_lkey(device_ptr); + // we keep lkey into the meta, and this is a thick. we also keep prefetch d2h size in meta too. + _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_gdr_recv_block_size, deleter, lkey); + _rbuf_data[_rq_received] = device_ptr; + } else +#endif // if BRPC_WITH_GDR + { + butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], + g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); + int size = 0; + if (!os.Next(&_rbuf_data[_rq_received], &size)) { + // Memory is not enough for preparing a block + PLOG(WARNING) << "Fail to allocate rbuf"; + return -1; + } else { + CHECK(static_cast(size) == g_rdma_recv_block_size) << size; + } } } - if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { - _rbuf[_rq_received].clear(); - return -1; +#if BRPC_WITH_GDR + if (_use_gdr) { + if (DoPostRecvGDR(_rbuf_data[_rq_received], g_gdr_recv_block_size, lkey) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } + } else +#endif // if BRPC_WITH_GDR + { + if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } } + --num; ++_rq_received; if 
(_rq_received == _rq_size) {
@@ -1659,6 +1721,24 @@ void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) con
        << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled;
 }
 
+// Derive the payload size of one GDR recv block from the configured gdr
+// block size. Return 0 on success; -1 with errno=EINVAL when the configured
+// block cannot hold the IOBuf block header plus at least one payload byte.
+int RdmaEndpoint::GlobalGdrInitialize() {
+#if BRPC_WITH_GDR
+    const size_t block_size = butil::gdr::GetGdrBlockSize();
+    // The size ends up in ibv_sge.length (see DoPostRecvGDR). A non-positive
+    // gdr_block_size_kb arrives here as 0 or as a wrapped-around huge value.
+    if (block_size <= IOBUF_BLOCK_HEADER_LEN || block_size > UINT32_MAX) {
+        errno = EINVAL;
+        return -1;
+    }
+    g_gdr_recv_block_size = block_size - IOBUF_BLOCK_HEADER_LEN;
+    LOG(INFO) << "g_gdr_recv_block_size: " << g_gdr_recv_block_size;
+#endif // BRPC_WITH_GDR
+    return 0;
+}
+
 int RdmaEndpoint::GlobalInitialize() {
     g_rdma_recv_block_size = GetRdmaBlockSize() - IOBUF_BLOCK_HEADER_LEN;
     if (g_rdma_recv_block_size <= 0) {
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index 54a008f1f7..d6e891903e 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,6 @@
 #include "butil/containers/mpsc_queue.h"
 #include "brpc/socket.h"
 
-
 namespace brpc {
 class Socket;
 namespace rdma {
@@ -75,15 +74,21 @@ class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser {
 friend class RdmaConnect;
 friend class Socket;
 public:
-    explicit RdmaEndpoint(Socket* s);
+    explicit RdmaEndpoint(Socket* s, bool use_gdr = false);
     ~RdmaEndpoint() override;
 
-    // Global initialization
+    // Global Rdma initialization
     // Return 0 if success, -1 if failed and errno set
     static int GlobalInitialize();
 
+    // Global Gdr initialization
+    // Return 0 if success, -1 if failed and errno set
+    static int GlobalGdrInitialize();
+
     static void GlobalRelease();
 
+    bool use_gdr() { return _use_gdr; }
+
     // Reset the endpoint (for next use)
     void Reset();
 
@@ -177,6 +182,16 @@ friend class Socket;
     //     -1: failed, errno set
     int DoPostRecv(void* block, size_t block_size);
 
+    // Post a WR pointing to the gpu block to the local Recv Queue
+    // Arguments:
+    //     block: the gpu addr to receive data (ibv_sge.addr)
+    //     block_size: the maximum length can be received (ibv_sge.length)
+    //     lkey: the lkey of block
+    // Return:
+    //     0: success
+    //     -1: failed, errno set
+    int DoPostRecvGDR(void* block, size_t
block_size, uint32_t lkey); + // Read at most len bytes from fd in _socket to data // wait for _read_butex if encounter EAGAIN // return -1 if encounter other errno (including EOF) @@ -222,6 +237,9 @@ friend class Socket; // Not owner Socket* _socket; + // whether open gpu direct rdma + bool _use_gdr; + // State of Handshake State _state; diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 96348902f8..e04c4756e9 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -25,6 +25,9 @@ #include "butil/containers/flat_map.h" // butil::FlatMap #include "butil/fd_guard.h" #include "butil/fd_utility.h" // butil::make_non_blocking +#if BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif #include "butil/logging.h" #include "brpc/socket.h" #include "brpc/rdma/block_pool.h" @@ -86,6 +89,8 @@ static uint16_t g_lid; static int g_max_sge = 0; static uint8_t g_port_num = 1; +static int g_gpu_index = 0; + static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); @@ -95,7 +100,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1."); DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one."); - +DEFINE_int32(gpu_index, 0, "The GPU device index to use. 
In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); // static const size_t SYSFS_SIZE = 4096; static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; @@ -588,7 +593,26 @@ static void GlobalRdmaInitializeOrDieImpl() { g_rdma_available.store(true, butil::memory_order_relaxed); } +static void GlobalGdrInitializeOrDieImpl() { +#if BRPC_WITH_GDR + g_gpu_index = FLAGS_gpu_index; + + if (!butil::gdr::InitGPUBlockPool(g_gpu_index, GetRdmaPd())) { + PLOG(ERROR) << "Fail to initialize RDMA GPU memory pool"; + ExitWithError(); + } + if (RdmaEndpoint::GlobalGdrInitialize() < 0) { + LOG(ERROR) << "gdr_block_size_kb incorrect " + << "(must be larger than 0)"; + ExitWithError(); + } + +#endif // if BRPC_WITH_GDR + +} + static pthread_once_t initialize_rdma_once = PTHREAD_ONCE_INIT; +static pthread_once_t initialize_gdr_once = PTHREAD_ONCE_INIT; void GlobalRdmaInitializeOrDie() { if (pthread_once(&initialize_rdma_once, @@ -598,6 +622,14 @@ void GlobalRdmaInitializeOrDie() { } } +void GlobalGdrInitializeOrDie() { + if (pthread_once(&initialize_gdr_once, + GlobalGdrInitializeOrDieImpl) != 0) { + LOG(FATAL) << "Fail to pthread_once GlobalGdrInitializeOrDie"; + exit(1); + } +} + uint32_t RegisterMemoryForRdma(void* buf, size_t len) { ibv_mr* mr = IbvRegMr(g_pd, buf, len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_RELAXED_ORDERING); if (!mr) { @@ -691,6 +723,10 @@ uint8_t GetRdmaPortNum() { return g_port_num; } +int GetGPUIndex() { + return g_gpu_index; +} + bool IsRdmaAvailable() { return g_rdma_available.load(butil::memory_order_acquire); } diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..06cbb1f5c2 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -33,6 +33,10 @@ namespace rdma { // Exit if failed void GlobalRdmaInitializeOrDie(); +// Initialize GDR environment +// Exit if failed +void GlobalGdrInitializeOrDie(); + // Initialize RDMA polling mode with tag 
bool InitPollingModeWithTag(bthread_tag_t tag, std::function callback = nullptr, @@ -74,6 +78,9 @@ int GetRdmaCompVector(); // Return current port number used uint8_t GetRdmaPortNum(); +// Get GPU index used +int GetGPUIndex(); + // Get max_sge supported by the device int GetRdmaMaxSge(); diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp index 88d89a7b06..97b231ddcc 100644 --- a/src/brpc/rdma_transport.cpp +++ b/src/brpc/rdma_transport.cpp @@ -29,10 +29,13 @@ DECLARE_bool(usercode_in_pthread); extern SocketVarsCollector *g_vars; -void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { +void RdmaTransport::DoInit(Socket *socket, const SocketOptions &options, bool use_gdr) { CHECK(_rdma_ep == NULL); - if (options.socket_mode == SOCKET_MODE_RDMA) { - _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket); + // gdr mode is a special mode of rdma mode. + // both rdma mode and gdr mode need init rdma::RdmaEndpoint. + if (options.socket_mode == SOCKET_MODE_RDMA || + options.socket_mode == SOCKET_MODE_GDR) { + _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket, use_gdr); if (!_rdma_ep) { const int saved_errno = errno; PLOG(ERROR) << "Fail to create RdmaEndpoint"; @@ -54,6 +57,10 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { _tcp_transport->Init(socket, options); } +void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { + DoInit(socket, options, false); +} + void RdmaTransport::Release() { if (_rdma_ep) { delete _rdma_ep; diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h index 65ae88f7a6..bb579c6ac5 100644 --- a/src/brpc/rdma_transport.h +++ b/src/brpc/rdma_transport.h @@ -29,6 +29,7 @@ class RdmaTransport : public Transport { friend class rdma::RdmaEndpoint; friend class rdma::RdmaConnect; public: + void DoInit(Socket* socket, const SocketOptions& options, bool use_gdr); void Init(Socket* socket, const SocketOptions& options) override; void Release() override; int 
Reset(int32_t expected_nref) override; @@ -62,4 +63,4 @@ class RdmaTransport : public Transport { }; } // namespace brpc #endif // BRPC_WITH_RDMA -#endif //BRPC_RDMA_TRANSPORT_H \ No newline at end of file +#endif //BRPC_RDMA_TRANSPORT_H diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h index b5d42be4aa..9fb0276efa 100644 --- a/src/brpc/socket_mode.h +++ b/src/brpc/socket_mode.h @@ -20,7 +20,8 @@ namespace brpc { enum SocketMode { SOCKET_MODE_TCP = 0, - SOCKET_MODE_RDMA = 1 + SOCKET_MODE_RDMA = 1, + SOCKET_MODE_GDR = 2 }; } // namespace brpc -#endif //BRPC_SOCKET_MODE_H \ No newline at end of file +#endif //BRPC_SOCKET_MODE_H diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp index b689e2edd2..76623f505c 100644 --- a/src/brpc/transport_factory.cpp +++ b/src/brpc/transport_factory.cpp @@ -18,6 +18,7 @@ #include "brpc/transport_factory.h" #include "brpc/tcp_transport.h" #include "brpc/rdma_transport.h" +#include "brpc/gdr_transport.h" namespace brpc { int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options) { @@ -28,6 +29,15 @@ int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const else if (mode == SOCKET_MODE_RDMA) { return RdmaTransport::ContextInitOrDie(serverOrNot, _options); } +#endif +#if BRPC_WITH_GDR + else if (mode == SOCKET_MODE_GDR) { + // gdr is a special case of rdma, so we should init rdma first; + if (RdmaTransport::ContextInitOrDie(serverOrNot, _options) < 0) { + return -1; + } + return GdrTransport::GdrContextInitOrDie(); + } #endif else { LOG(ERROR) << "unknown transport type " << mode; @@ -43,10 +53,15 @@ std::unique_ptr TransportFactory::CreateTransport(SocketMode mode) { else if (mode == SOCKET_MODE_RDMA) { return std::unique_ptr(new RdmaTransport()); } +#endif +#if BRPC_WITH_GDR + else if (mode == SOCKET_MODE_GDR) { + return std::unique_ptr(new GdrTransport()); + } #endif else { LOG(ERROR) << "socket_mode set error"; return nullptr; } } -} // 
namespace brpc \ No newline at end of file +} // namespace brpc diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp new file mode 100644 index 0000000000..86a307ba06 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#if BRPC_WITH_GDR
+
+// NOTE(review): the names of the next four system headers were lost when this
+// patch was extracted; they are restored from the symbols used below - confirm.
+#include <algorithm>
+#include <chrono>
+#include <cuda_runtime.h>
+#include <gflags/gflags.h>
+#include "butil/fast_rand.h"
+#include "gpu_block_pool.h"
+namespace butil {
+namespace gdr {
+DEFINE_int32(gdr_block_size_kb, 512, "gdr block size in KB");
+DEFINE_int32(max_gdr_regions, 32, "max num of gdr regions");
+
+// Evaluate a CUDA runtime call and log a FATAL message when it fails.
+// There is deliberately no semicolon after `while (0)': the caller supplies
+// it, which keeps `if (x) CHECK_CUDA(...); else ...' well-formed.
+#define CHECK_CUDA(call) \
+do { \
+    auto _sts = (call); \
+    if (_sts != cudaSuccess) { \
+        LOG(FATAL) << " cuda error:" \
+                   << (cudaGetErrorString(_sts)) << std::string(" at ") \
+                   << __FILE__ << ": " << __LINE__; \
+    } \
+} while (0)
+
+// Size in bytes of one GDR block, as configured by -gdr_block_size_kb.
+size_t GetGdrBlockSize() {
+    // Widen before multiplying so a large flag value cannot overflow `int'.
+    return static_cast<size_t>(FLAGS_gdr_block_size_kb) * 1024;
+}
+
+// Remember the CUDA device seen on the first call and report whether later
+// calls still run on that same device.
+bool verify_same_context() {
+    static int original_device = -1;
+    static bool first_call = true;
+
+    // Stays -1 if cudaGetDevice fails, instead of being left uninitialized.
+    int current_device = -1;
+    cudaGetDevice(&current_device);
+
+    if (first_call) {
+        original_device = current_device;
+        first_call = false;
+        return true;
+    }
+
+    return (current_device == original_device);
+}
+
+// Allocate gpu_mem_size bytes of device memory on GPU gpu_id.
+// Returns NULL if the allocation failed and the FATAL log did not abort.
+void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+    void* d_data = nullptr;
+
+    LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " << gpu_id;
+
+    CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size));
+    cudaDeviceSynchronize();
+    return d_data;
+}
+
+// Allocate cpu_mem_size bytes of host memory through cudaMallocHost while
+// GPU gpu_id is the current device. Returns NULL if the allocation failed.
+void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+
+    LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes from gpu " << gpu_id << " on host";
+
+    void* mem = NULL;
+
+    CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size));
+
+    cudaDeviceSynchronize();
+
+    return mem;
+}
+
+
+BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr;
+
+// Return the process-wide allocator set, creating it on first use.
+BlockPoolAllocators* BlockPoolAllocators::singleton() {
+    // Initialization of a function-local static is thread-safe since C++11,
+    // which replaces the hand-written double-checked locking on a plain
+    // (non-atomic) pointer. instance_ is still assigned because the
+    // destructor compares `this' against it.
+    static BlockPoolAllocators* const created = (instance_ = new BlockPoolAllocators());
+    return created;
+}
+
+void BlockPoolAllocators::init(int gpu_id, ibv_pd* pd) {
+    
LOG(INFO) << "set GPU BlockPoolAllocator for " << gpu_id;
+    // Device pool: 1 GB regions cut into blocks of the configured gdr size.
+    size_t region_size = 1024LL * 1024 * 1024;
+    size_t block_size = GetGdrBlockSize();
+    gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size, region_size);
+
+    // Host pool: 32 MB regions cut into 512-byte blocks.
+    region_size = 32LL * 1024 * 1024;
+    block_size = 512;
+    cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size, region_size);
+
+    gpu_stream_pool = new GPUStreamPool(gpu_id);
+}
+
+bool InitGPUBlockPool(int gpu_id, ibv_pd* pd) {
+    BlockPoolAllocators::singleton()->init(gpu_id, pd);
+    return true;
+}
+
+// Mutex-protected cache of BlockHeader nodes so that the pools do not hit
+// the heap for every allocate/deallocate. At most kMaxObjects nodes are
+// cached; nodes released beyond that are deleted.
+class BlockHeaderList {
+ public:
+    BlockHeaderList() {
+        objects_.reserve(kMaxObjects);
+    }
+    virtual ~BlockHeaderList() {
+        for (size_t i = 0; i < objects_.size(); i++) {
+            delete objects_[i];
+        }
+    }
+
+    // Take a node from the cache, or create one when the cache is empty.
+    BlockHeader* New() {
+        {
+            std::lock_guard<std::mutex> lock(mu_);
+            if (!objects_.empty()) {
+                BlockHeader* result = objects_.back();
+                objects_.pop_back();
+                return result;
+            }
+        }
+        return new BlockHeader;
+    }
+    // Reset the node and put it back into the cache, or delete it when the
+    // cache is full.
+    void Release(BlockHeader* obj) {
+        obj->Reset();
+        {
+            std::lock_guard<std::mutex> lock(mu_);
+            if (objects_.size() < kMaxObjects) {
+                objects_.push_back(obj);
+                return;
+            }
+        }
+        delete obj;
+    }
+
+ private:
+    // size_t so the comparison with objects_.size() is not signed/unsigned.
+    static const size_t kMaxObjects = 100000;
+
+    std::mutex mu_;
+    std::vector<BlockHeader*> objects_;
+};
+
+static BlockHeaderList* get_bh_list() {
+    static BlockHeaderList* bh_list = new BlockHeaderList();
+    return bh_list;
+}
+
+// Create a pool of BLOCK_SIZE-byte blocks on GPU gpuId (onGpu) or in host
+// memory (!onGpu), registered against ibvPd, and allocate its first region.
+BlockPoolAllocator::BlockPoolAllocator(int gpuId, bool onGpu, ibv_pd* ibvPd,
+                                       size_t blockSize, size_t regionSize) :
+    gpu_id(gpuId)
+    , on_gpu(onGpu)
+    , pd(ibvPd)
+    , BLOCK_SIZE(std::max(blockSize, sizeof(BlockHeader)))
+    // Round the region down to a multiple of the block size. This uses the
+    // clamped BLOCK_SIZE (declared before REGION_SIZE, so already set): the
+    // raw blockSize may be 0 or smaller than the size actually used.
+    , REGION_SIZE((regionSize / BLOCK_SIZE) * BLOCK_SIZE)
+    , freeList(nullptr)
+    , g_region_num(0)
+    , totalAllocated(0)
+    , totalDeallocated(0)
+    , peakUsage(0) {
+    g_regions.resize(FLAGS_max_gdr_regions);
+    LOG(INFO) << "Memory Pool initialized: block_size=" << BLOCK_SIZE
+              << ", region_size=" << REGION_SIZE << ", max_gdr_regions=" << FLAGS_max_gdr_regions
+              << ", gpu_id=" <<
gpu_id << ", on_gpu=" << on_gpu << ", pd=" << pd;
+
+    extendRegion();
+}
+
+// Deregister and free every region owned by this pool.
+BlockPoolAllocator::~BlockPoolAllocator() {
+#ifdef DEBUG
+    printStatistics();
+#endif
+
+    // Hand the bookkeeping nodes of the free list back to the shared
+    // BlockHeader cache; the memory they point to is freed per region below.
+    while (freeList) {
+        BlockHeader* next = freeList->next;  // Release() resets the node
+        get_bh_list()->Release(freeList);
+        freeList = next;
+    }
+
+    // Regions are filled front to back, so the first slot without an MR ends
+    // the scan. The loop is bounded by the vector itself, not by the flag it
+    // was sized from, because a gflag can change after construction.
+    for (size_t i = 0; i < g_regions.size(); i++) {
+        Region* r = &g_regions[i];
+        if (!r->mr) {
+            break;
+        }
+
+        LOG(INFO) << "try to free " << r->size << " bytes from gpu " << gpu_id << ", on_gpu " << on_gpu;
+        ibv_dereg_mr(r->mr);
+        if (on_gpu) {
+            CHECK_CUDA(cudaFree(reinterpret_cast<void*>(r->start)));
+        } else {
+            CHECK_CUDA(cudaFreeHost(reinterpret_cast<void*>(r->start)));
+        }
+    }
+}
+
+// Find the region whose registered (aligned) range contains buf.
+// Returns NULL when buf is NULL (errno=EINVAL) or belongs to no region.
+Region* BlockPoolAllocator::GetRegion(const void* buf) {
+    if (!buf) {
+        errno = EINVAL;
+        return NULL;
+    }
+    Region* r = NULL;
+    uintptr_t addr = (uintptr_t)buf;
+    for (size_t i = 0; i < g_regions.size(); ++i) {
+        if (g_regions[i].aligned_start == 0) {
+            // First unused slot: no further regions exist.
+            break;
+        }
+        if (addr >= g_regions[i].aligned_start &&
+            addr < g_regions[i].aligned_start + g_regions[i].aligned_size) {
+            r = &g_regions[i];
+            break;
+        }
+    }
+    return r;
+}
+
+// Return the lkey of the memory region containing buf, or 0 when buf is
+// not inside a registered region.
+uint32_t BlockPoolAllocator::get_lkey(const void* buf) {
+    Region* r = GetRegion(buf);
+    if (!r) {
+        LOG(ERROR) << "can not get a region for buf " << buf;
+        return 0;
+    }
+
+    if (!r->mr) {
+        LOG(FATAL) << "region has not been registered into rdma yet, addr:" << r->start;
+        return 0;
+    }
+
+    return r->mr->lkey;
+}
+
+// Hand out one block of BLOCK_SIZE bytes. Returns NULL when num_bytes is 0,
+// when num_bytes exceeds BLOCK_SIZE, or when the pool cannot be extended.
+void* BlockPoolAllocator::AllocateRaw(size_t num_bytes) {
+    if (num_bytes == 0) {
+        return nullptr;
+    }
+    if (num_bytes > BLOCK_SIZE) {
+        LOG(FATAL) << "try to alloc " << num_bytes << " bytes, its bigger than block_size " << BLOCK_SIZE;
+        // LOG(FATAL) does not necessarily abort the process; never hand out
+        // a block that is smaller than what the caller asked for.
+        return nullptr;
+    }
+
+    auto startTime = std::chrono::high_resolution_clock::now();
+
+    std::lock_guard<std::mutex> lock(poolMutex);
+
+    if (!freeList) {
+        extendRegion();
+    }
+    if (!freeList) {
+        // extendRegion() added nothing (e.g. the region limit is reached);
+        // dereferencing freeList below would crash.
+        return nullptr;
+    }
+
+    BlockHeader* block = freeList;
+    freeList = block->next;
+
+    void* addr = block->addr;
+    get_bh_list()->Release(block);
+
+    totalAllocated++;
+    peakUsage = std::max(peakUsage, totalAllocated - totalDeallocated);
+
+    auto endTime = std::chrono::high_resolution_clock::now();
+    auto duration =
std::chrono::duration_cast(endTime - startTime); + +#ifdef DEBUG + if (duration.count() > 1000) { + LOG(INFO) << "Slow allocation: " << duration.count() << " ns"; + } +#endif + + return addr; +} + +void BlockPoolAllocator::DeallocateRaw(void* ptr) { + if (!ptr) return; + + std::lock_guard lock(poolMutex); + + BlockHeader* block = get_bh_list()->New(); + block->addr = ptr; + block->next = freeList; + freeList = block; + + totalDeallocated++; +} + +void BlockPoolAllocator::printStatistics() const { + LOG(INFO) << "=== Memory Pool Statistics ==="; + LOG(INFO) << "Total regions: " << g_region_num + << ", Total blocks allocated: " << totalAllocated + << ", Total blocks deallocated: " << totalDeallocated + << ", Current usage: " << (totalAllocated - totalDeallocated) << " blocks" + << ", Peak usage: " << peakUsage << " blocks" + << ", Memory efficiency: " + << (static_cast(totalAllocated - totalDeallocated) / + (g_region_num * (REGION_SIZE / BLOCK_SIZE)) * 100) + << "%"; +} + +void BlockPoolAllocator::extendRegion() { + if (g_region_num == FLAGS_max_gdr_regions) { + LOG(FATAL) << "Gdr Memory pool reaches max regions"; + return ; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + void* ptr = nullptr; + void* aligned_ptr = nullptr; + int alignment = 4096; + + if (on_gpu) { + ptr = get_gpu_mem(gpu_id, REGION_SIZE); + } else { + ptr = get_cpu_mem(gpu_id, REGION_SIZE); + } + + aligned_ptr = (void*)(((uintptr_t)ptr + alignment - 1) & ~(alignment - 1)); + + int64_t aligned_bytes = REGION_SIZE; + if (ptr != aligned_ptr) { + uintptr_t region_end = uintptr_t(ptr) + REGION_SIZE; + uintptr_t aligned_end_ptr = region_end & ~(alignment - 1); + aligned_bytes = uintptr_t(aligned_end_ptr) - uintptr_t(aligned_ptr); + LOG(WARNING) << "addr is not aligned with 4096: " << ptr << ", aligned_bytes: " << aligned_bytes + << ", region_size: " << REGION_SIZE; + } + + LOG(INFO) << "reg_mr for ptr: " << aligned_ptr << ", size:" << aligned_bytes; + auto mr = ibv_reg_mr(pd, 
aligned_ptr, aligned_bytes,
+                         IBV_ACCESS_LOCAL_WRITE |
+                         IBV_ACCESS_REMOTE_READ |
+                         IBV_ACCESS_REMOTE_WRITE |
+                         IBV_ACCESS_RELAXED_ORDERING);
+
+    if (!mr) {
+        LOG(FATAL) << "Failed to register MR: " << strerror(errno)
+                   << ", pd " << pd << ", aligned_ptr:" << aligned_ptr;
+        // Without an MR the region has no lkey, so its blocks must never
+        // reach the free list. Give the memory back and leave the pool as
+        // it was; the caller sees that no blocks were added.
+        if (on_gpu) {
+            CHECK_CUDA(cudaFree(ptr));
+        } else {
+            CHECK_CUDA(cudaFreeHost(ptr));
+        }
+        return;
+    } else {
+        LOG(INFO) << "Success to register MR: "
+                  << ", pd " << pd << ", aligned_ptr:" << aligned_ptr;
+    }
+
+    LOG(INFO) << "try to init region, g_region_num:" << g_region_num;
+    size_t blockCount = aligned_bytes / BLOCK_SIZE;
+    Region* region = &g_regions[g_region_num++];
+    region->start = (uintptr_t)ptr;
+    region->aligned_start = (uintptr_t)aligned_ptr;
+    region->mr = mr;
+    region->size = REGION_SIZE;
+    region->aligned_size = aligned_bytes;
+    region->blockCount = blockCount;
+
+
+    LOG(INFO) << "try to insert list, freeList:" << freeList << ", blockCount:" << blockCount;
+    // Build the chain of new blocks first, then link it in front of whatever
+    // is already on the free list, so existing free blocks are not dropped.
+    BlockHeader* firstBlock = nullptr;
+    BlockHeader* lastBlock = nullptr;
+    for (size_t i = 0; i < blockCount; ++i) {
+        BlockHeader* block = get_bh_list()->New();
+        block->addr = reinterpret_cast<void*>(static_cast<char*>(aligned_ptr) + i * BLOCK_SIZE);
+        if (lastBlock != nullptr) {
+            lastBlock->next = block;
+        } else {
+            firstBlock = block;
+        }
+        lastBlock = block;
+    }
+
+    if (lastBlock) {
+        lastBlock->next = freeList;
+        freeList = firstBlock;
+    }
+
+    auto endTime = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(endTime - startTime);
+
+    LOG(INFO) << "Extended region #" << g_region_num << ": " << blockCount
+              << " blocks (" << (REGION_SIZE / (1024 * 1024)) << " MB)" << ", on_gpu " << on_gpu
+              << ", cost " << duration.count() << " ns";
+}
+
+// Create kMaxConcurrent device-to-device and device-to-host streams on
+// GPU gpu_id.
+GPUStreamPool::GPUStreamPool(int gpu_id) :
+    gpu_id_(gpu_id) {
+    CHECK_CUDA(cudaSetDevice(gpu_id));
+    d2d_streams_.resize(kMaxConcurrent);
+    d2h_streams_.resize(kMaxConcurrent);
+    for (int i = 0; i < kMaxConcurrent; i++) {
+        CHECK_CUDA(cudaStreamCreate(&d2d_streams_[i]));
+        CHECK_CUDA(cudaStreamCreate(&d2h_streams_[i]));
+    }
+    CHECK_CUDA(cudaDeviceSynchronize());
+}
+
+GPUStreamPool::~GPUStreamPool() {
+    
CHECK_CUDA(cudaDeviceSynchronize()); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamDestroy(d2d_streams_[i])); + CHECK_CUDA(cudaStreamDestroy(d2h_streams_[i])); + } +} + +void GPUStreamPool::fast_d2d(std::vector& src_list, + std::vector& length_list, + void* dst) { +#ifdef DEBUG + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } +#endif + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2d_lb_lock_); + d2d_cnt_.fetch_add(1); + stream_idx = d2d_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2d_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToDevice, d2d_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); +} + +void GPUStreamPool::fast_d2h(std::vector& src_list, + std::vector& length_list, + void* dst) { + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2h_lb_lock_); + d2h_cnt_.fetch_add(1); + stream_idx = d2h_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2h_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToHost, 
d2h_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); +} + +} +} + +#endif // BRPC_WITH_GDR diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h new file mode 100644 index 0000000000..655c487a92 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.h @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H +#define BUTIL_GPU_GPU_BLOCK_POOL_H + +#if BRPC_WITH_GDR + +#include +#include +#include +#include +#include +#include +#include +#include "butil/containers/hash_tables.h" +#include "butil/logging.h" +#include +#include "cuda.h" + +namespace butil { +namespace gdr { + +size_t GetGdrBlockSize(); +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size); +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size); + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd); + +struct Region { + Region() { start = 0; aligned_start = 0;} + uintptr_t start; + uintptr_t aligned_start; + + size_t size; + size_t aligned_size; + size_t blockCount; + struct ibv_mr *mr {nullptr}; +}; + +struct BlockHeader { + BlockHeader() { addr = nullptr; next = nullptr;} + void Reset() { addr = nullptr; next = nullptr; } + void* addr; + BlockHeader* next; +}; + +class BlockPoolAllocator { + private: + int gpu_id; + bool on_gpu; + ibv_pd* pd {nullptr}; + + const size_t BLOCK_SIZE; + const size_t REGION_SIZE; + + BlockHeader* freeList; + int g_region_num {0}; + std::vector g_regions; + std::mutex poolMutex; + + // stat + size_t totalAllocated; + size_t totalDeallocated; + size_t peakUsage; + + public: + explicit BlockPoolAllocator(int gpuId, + bool onGpu, ibv_pd* ibvPd, + size_t blockSize, size_t regionSize); + + ~BlockPoolAllocator(); + + void* AllocateRaw(size_t num_bytes); + + void DeallocateRaw(void* ptr); + + void printStatistics() const; + + int64_t getCurrentUsage() const { + return totalAllocated - totalDeallocated; + } + + int64_t getTotalMemory() const { + return g_region_num * REGION_SIZE; + } + + int64_t get_block_size() const { + return BLOCK_SIZE; + } + + Region* GetRegion(const void* buf); + + uint32_t get_lkey(const void* buf); + + private: + void extendRegion(); +}; + +class GPUStreamPool { +public: + explicit GPUStreamPool(int gpu_id); + + ~GPUStreamPool(); + + GPUStreamPool(const GPUStreamPool&) = delete; + GPUStreamPool& operator=(const GPUStreamPool&) = 
delete; + + void fast_d2h(std::vector& src_list, std::vector& length_list, void* dst); + + void fast_d2d(std::vector& src_list, std::vector& length_list, void* dst); + + static constexpr int kMaxConcurrent = 32; +private: + int gpu_id_ {-1}; + std::atomic d2h_cnt_ {0}; + std::atomic d2d_cnt_ {0}; + std::mutex d2h_locks_[kMaxConcurrent]; + std::mutex d2d_locks_[kMaxConcurrent]; + std::mutex d2h_lb_lock_; + std::mutex d2d_lb_lock_; + std::vector d2h_streams_; + std::vector d2d_streams_; +}; + +class BlockPoolAllocators { +public: + static BlockPoolAllocators* singleton(); + BlockPoolAllocators() {} + virtual ~BlockPoolAllocators() { + CHECK_EQ(this, instance_); + instance_ = nullptr; + } + + void init(int gpu_id, ibv_pd* pd); + + BlockPoolAllocator* get_gpu_allocator() { + return gpu_mem_alloc; + } + + BlockPoolAllocator* get_cpu_allocator() { + return cpu_mem_alloc; + } + + GPUStreamPool* get_gpu_stream_pool() { + return gpu_stream_pool; + } + +public: + static BlockPoolAllocators* instance_; + +private: + BlockPoolAllocator* gpu_mem_alloc {nullptr}; + BlockPoolAllocator* cpu_mem_alloc {nullptr}; + GPUStreamPool* gpu_stream_pool {nullptr}; +}; +} +} + +#endif // BRPC_WITH_GDR + +#endif diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index af77d968cf..77bf5c9fcc 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -40,6 +40,9 @@ #include "butil/fd_guard.h" // butil::fd_guard #include "butil/iobuf.h" #include "butil/iobuf_profiler.h" +#ifdef BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif namespace butil { static size_t default_block_size = 8192; @@ -737,6 +740,46 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) { return saved_n; } +#if BRPC_WITH_GDR +size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { + if (n == 0) { + return 0; + } + + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + bool alloc_from_host_alloc = (n <= host_allocator->get_block_size()); + void* mem = NULL; + 
if (alloc_from_host_alloc) {
+        mem = host_allocator->AllocateRaw(n);
+    } else {
+        mem = malloc(n);
+    }
+
+    if (mem == NULL) {
+        return 0;
+    }
+    size_t saved_n = copy_from_gpu(mem, n, 0, false);
+    if (saved_n > 0) {
+        if (alloc_from_host_alloc) {
+            auto deleter = [host_allocator](void* data) { host_allocator->DeallocateRaw(data); };
+            out->append_user_data(mem, saved_n, deleter);
+        } else {
+            auto deleter = [](void* data) { free(data); };
+            out->append_user_data(mem, saved_n, deleter);
+        }
+        pop_front(saved_n);
+    } else {
+        if (alloc_from_host_alloc) {
+            host_allocator->DeallocateRaw(mem);
+        } else {
+            free(mem);
+        }
+    }
+
+    return saved_n;
+}
+#endif // BRPC_WITH_GDR
+
 size_t IOBuf::cutn(void* out, size_t n) {
     const size_t len = length();
     if (n > len) {
@@ -1170,6 +1213,18 @@ uint64_t IOBuf::get_first_data_meta() {
     return r.block->u.data_meta;
 }
 
+// Return the address of the first byte held by this IOBuf, or NULL when the
+// IOBuf is empty.
+void* IOBuf::get_first_data_ptr() {
+    if (_ref_num() == 0) {
+        return NULL;
+    }
+    IOBuf::BlockRef const& r = _ref_at(0);
+    // A BlockRef can start inside its block, so the ref offset is part of
+    // the address (copy_from_gpu computes its source the same way).
+    return r.block->data + r.offset;
+}
+
+
 int IOBuf::resize(size_t n, char c) {
     const size_t saved_len = length();
     if (n < saved_len) {
@@ -1332,6 +1384,46 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const {
     return n - m;
 }
 
+#if BRPC_WITH_GDR
+size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const {
+    if (n == 0) {
+        return 0;
+    }
+    const size_t nref = _ref_num();
+    // Skip `pos' bytes. `offset' is the starting position in starting BlockRef.
+ size_t offset = pos; + size_t i = 0; + for (; offset != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + if (offset < (size_t)r.length) { + break; + } + offset -= r.length; + } + + butil::gdr::GPUStreamPool* gpu_stream_pool = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool(); + size_t m = n; + std::vector src_list; + std::vector length_list; + for (; m != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + const size_t nc = std::min(m, (size_t)r.length - offset); + void* gpu_src = r.block->data + r.offset + offset; + src_list.push_back(gpu_src); + length_list.push_back(nc); + offset = 0; + m -= nc; + } + if (to_gpu) { + gpu_stream_pool->fast_d2d(src_list, length_list, d); + } else { + gpu_stream_pool->fast_d2h(src_list, length_list, d); + } + // If nref == 0, here returns 0 correctly + return n - m; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); if (len <= pos) { @@ -1478,6 +1570,16 @@ bool IOBuf::equals(const butil::IOBuf& other) const { return true; } +#if BRPC_WITH_GDR +// when IOBuf is used for send, data_meta is set by user; +// when IOBuf is used for recv and gdr is open, data_meta is set by brpc +// and it is lkey. 
+bool IOBuf::is_gpu_memory() { + uint64_t data_meta = get_first_data_meta(); + return (data_meta > 0 && data_meta <= UINT_MAX); +} +#endif + ////////////////////////////// IOPortal ////////////////// IOPortal::~IOPortal() { return_cached_blocks(); } diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 978aa34fe3..487279c2a9 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -144,6 +144,13 @@ friend class SingleIOBuf; size_t cutn(IOBuf* out, size_t n); size_t cutn(void* out, size_t n); size_t cutn(std::string* out, size_t n); + +#if BRPC_WITH_GDR + size_t cutn_from_gpu(IOBuf* out, size_t n); + size_t copy_from_gpu(void* d, size_t n, size_t pos = 0, bool to_gpu = false) const; + bool is_gpu_memory(); +#endif // BRPC_WITH_GDR + // Cut off 1 byte from the front side and set to *c // Return true on cut, false otherwise. bool cut1(void* c); @@ -263,6 +270,9 @@ friend class SingleIOBuf; // 0 means the meta is invalid. uint64_t get_first_data_meta(); + // Get the data addr of the first byte in this IOBuf. + void* get_first_data_ptr(); + // Resizes the buf to a length of n characters. // If n is smaller than the current length, all bytes after n will be // truncated.