Skip to content

Conversation

@Q1ngbo
Copy link
Contributor

@Q1ngbo Q1ngbo commented Jan 15, 2026

What problem does this PR solve?

Issue Number: resolve #2354 #978

Problem Summary:
Hi, 我基于先前的commit #3062 完成了flatbuffers协议对brpc的适配,准备将完整实现提交至社区。在brpc中增加flatbuffers(fb)支持可分为fb message构造与fb协议处理两部分。前者解决的问题是如何使用flatbuffers库提供的接口去创建一个message,以及应用接收到message后如何从中读取出数据;后者解决的是brpc如何处理"fb"协议。因此,我将相关实现拆分为了两个提交。本提交重点在于message构造。

我之所以将message构造提交至brpc仓库中而不是像grpc一样在google/flatbuffers中实现,主要出于两点考虑:首先,为了与IOBuf兼容,创建message使用的底层数据结构为之前已经提交的SingleIOBuf,在flatbuffers中使用该数据结构会导致循环依赖;其次,flatbuffers提供了完善的接口,只需要在brpc里定义好内存分配器,即可调用相关接口来构造消息,实现上很方便。

特别说明:

  1. 本次提交主要是为了征求各位评审老师的意见,暂未添加单测。
  2. 对于google/flatbuffers的修改需要与brpc中部分实现保持一致,所以针对flatbuffers仓库的修改暂未提交。
  3. 为了便于大家理解,我在example中增加了示例程序bechmark_fb,后续正式入库前将移除它,该目录下的test_generated.h, test.brpc.fb.h与test.brpc.fb.cpp正式由flatbuffers的编译工具flatc基于test.fbs自动生成的。基于该程序,在我们的测试环境下,flatbuffers协议相较于"baidu_std"协议在32~8KB大小的消息上QPS有20%+的提升。

Check List:

Key components:
- flatbuffers_common.h: Defines abstract interfaces (RpcChannel and Service)
  for FlatBuffers-based RPC communication, similar to protobuf's RPC interfaces.
- flatbuffers_impl.h: Implements FlatBuffers-specific message handling:
  * SlabAllocator: Custom allocator using SingleIOBuf for zero-copy operations.
  * Message: Wrapper for FlatBuffers messages with SingleIOBuf storage.
  * MessageBuilder: BRPC-specific FlatBufferBuilder with SlabAllocator.
  * ServiceDescriptor/MethodDescriptor: Service introspection support.
- flatbuffers_impl.cpp: Implementation of allocation, serialization, and
  service descriptor initialization logic
@Q1ngbo
Copy link
Contributor Author

Q1ngbo commented Jan 15, 2026

为了便于理解,我想对flatbuffers构造消息的基本流程进行简要补充说明,该示例程序的Scheme文件(类似于.proto文件)如下。

table EchoRequest {
    opcode:int;
    attachment:int;
}

Message需要通过MessageBuilder创建(定义在flatbuffers_impl.h中),该类继承了::flatbuffers::FlatBufferBuilder,它才是实际起作用的类,MessageBuilder的主要作用是向它传递自定义的内存分配器(SlabAllocator)。应用在创建message时,需要先定义一个MessageBuilder,然后基于它来添加元素,最终调用ReleaseMessage方法来获取完整消息,以下是一段示例代码。

brpc::flatbuffers::MessageBuilder mb;
auto message = mb.CreateString(g_request);
auto req = test::CreateBenchmarkRequest(mb, 123, 333, 1111, 2222, 0, message);
mb.Finish(req);
brpc::flatbuffers::Message request = mb.ReleaseMessage();

在构造message过程中,flatbuffers会使用一段连续buffer存储数据和metadata,在代码里,该buffer使用vector_downward表示,由FlatBufferBuilder管理。该Buffer维护了scratch_和cur_两个指针。在初次向buffer中添加数据时,会申请max(需要长度,1024)字节的内存,之后会使用cur_将数据添加在buffer末尾,将偏移量信息(FieldLoc)添加在buffer起始位置,如下图所示。如果在添加过程中buffer不够了,则会重新分配一个足够大的buffer,并将数据按序拷贝过去,所以或许可以配置一个GFlag来调整初次分配buffer的大小(TODO)。
image

brpc只需要定义好内存分配器SlabAllocator,在添加元素的过程中,当内存不足时flatbuffers会调用allocate或reallocate_downward,这实际上会调用SingleIOBuf中的相关方法。我们发现将rpc请求的header部分与message连续存储可获得明显性能提升。因此,在申请buffer时我们在Buffer前面预留出了一部分空间,其大小由宏DEFAULT_RESERVE_SIZE指定,默认为64B(也可通过GFlag配置,TODO)。这部分内存的消费者是brpc中的协议处理逻辑,在创建message过程中不会被使用到。
在完成数据添加后,应用需调用Finish方法完成创建,这一步flatbuffers会创建vtable,其主要执行逻辑为:1. 根据data大小为vtable申请空间;2. 从buffer起始处取出FieldLoc,将实际数据相对于"Table Data"起始地址的偏移量写入到vtable中;3. 在table data部分开头记录该位置相对于vtable开头的偏移量;4. 填充vtable中的vtable总长度、data总长度;5. 将"Table Data"相对于有效起始地址的偏移写入开头。vtable不是从buffer起始地址、而是从cur_左侧构建的,所以vtable和table data仍然是连续的,vtable左侧多余空间会被释放掉。以上面提到的EchoRequest为例,最终创建的完整内存布局如下所示(图中没画出为header部分预留的空间):
image
应用调用ReleaseMessage所获取到的Message中包含了该连续buffer对应的SingleIOBuf,该函数的执行逻辑为:1. 从vector_downward(buffer)中获取前面构造的msg地址;2. 从SingleIOBuf中获取分配该buffer所使用的block起始地址;3. 计算msg在block中的偏移,基于它创建一个BlockRef;4. 通过BlockRef创建SingleIOBuf,它会作为Message的一个成员变量。至此,创建message的过程就结束了。

flatbuffers读取消息的本质是进行间接寻址。参考上图,当读取EchoRequest中的opcode时,首先从buffer起始地址出读取Root Offset,获取Table Data相对于buffer起始地址的偏移量;然后从Table Data起始处获取Vtable的相对偏移量,再次跳转;然后从预先生成的FlatBuffersVTableOffset中获取opcode在vtable中的位置,访问它获取实际偏移量;当字段合法时,通过(Table Data + 偏移量)获取到实际值。该过程需要3次间接寻址。

message:string;
}

rpc_service BenchmarkService {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flatc 应该是不支持 rpc_service 的,请问这里是怎么处理的?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flatc 应该是不支持 rpc_service 的,请问这里是怎么处理的?

类似于为grpc实现的--grpc参数,可以给flatc增加了--brpc参数,来将其编译为brpc可用的service。如example/benchmark_fb/test.brpc.fb.h中的class BenchmarkService。但我尚未向flatbuffers提交pr,因为还可能需要根据brpc中实现进行相应修改。

Copy link
Contributor

@ivanallen ivanallen Jan 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fb 是否可以这样使用,显式的在 method 后指定 index?
对于从中间删除 method 会导致不兼容的情况,感觉是个隐患,用户不太容易注意到。

rpc_service BenchmarkService {
   Test1(BenchmarkRequest):BenchmarkResponse = 1;
   Test2(BenchmarkRequest):BenchmarkResponse = 2;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fb 是否可以这样使用,显式的在 method 后指定 index?

好主意!我可以修改下flatc。但为了兼容flatbuffers语法,fbs文件里需要这么写

rpc_service BenchmarkService {
  Test(BenchmarkRequest):BenchmarkResponse (id: 2);
  Test2(BenchmarkRequest):BenchmarkResponse (id: 5);
  Test3(BenchmarkRequest):BenchmarkResponse (id: 1);
}

最终可以在test.brpc.fb.cpp中生成如下代码。

switch (method->index()) {
  case 2:
    Test(controller, request, response, done);
    break;
  case 5:
    Test2(controller, request, response, done);
    break;
  case 1:
    Test3(controller, request, response, done);
    break;
  default:
    std::cout << "ERROR: " << "Bad method index; this should never happen."
              << std::endl;
    break;
}

这样后续任意删除method也能保持兼容性,而且不需要修改brpc中的处理逻辑

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request adds FlatBuffers message construction logic to brpc as part 2 of a 3-part implementation to support the FlatBuffers protocol. The PR introduces custom allocators, message builders, and service descriptors that integrate FlatBuffers with brpc's zero-copy IOBuf system.

Changes:

  • Adds FlatBuffers message construction infrastructure with custom SlabAllocator for zero-copy operations
  • Implements Message and MessageBuilder classes for FlatBuffers message lifecycle management
  • Provides ServiceDescriptor and MethodDescriptor classes for RPC method resolution
  • Fixes a bug in SingleIOBuf::assign() by moving reset() call to ensure proper cleanup in all paths

Reviewed changes

Copilot reviewed 11 out of 11 changed files in this pull request and generated 23 comments.

Show a summary per file
File Description
src/butil/single_iobuf.cpp Bug fix: moves reset() call before conditional to ensure proper cleanup in both code paths
src/brpc/details/flatbuffers_common.h Defines base interfaces (Service, RpcChannel) for FlatBuffers RPC integration
src/brpc/details/flatbuffers_impl.h Core implementation header with SlabAllocator, Message, MessageBuilder, and descriptor classes
src/brpc/details/flatbuffers_impl.cpp Implementation of allocators, message handling, and service descriptor parsing
example/benchmark_fb/test_generated.h Auto-generated FlatBuffers message definitions (temporary example)
example/benchmark_fb/test.fbs FlatBuffers schema for benchmark service (temporary example)
example/benchmark_fb/test.brpc.fb.h Auto-generated brpc service stub header (temporary example)
example/benchmark_fb/test.brpc.fb.cpp Auto-generated brpc service stub implementation (temporary example)
example/benchmark_fb/server.cpp Example benchmark server implementation (temporary example)
example/benchmark_fb/client.cpp Example benchmark client implementation (temporary example)
example/benchmark_fb/CMakeLists.txt Build configuration for benchmark example (temporary)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


namespace test {

static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential memory leak: The static array file_level_service_descriptors_my_2eproto[0] is allocated in parse_service_descriptors but never deallocated. This creates a permanent memory leak. Consider using a smart pointer or implementing proper cleanup, perhaps via an atexit handler or a static destructor pattern to ensure the ServiceDescriptor is properly deleted at program exit.

Suggested change
static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
// Ensure the statically allocated ServiceDescriptor is cleaned up at program exit.
struct FileLevelServiceDescriptorsCleanup {
~FileLevelServiceDescriptorsCleanup() {
if (file_level_service_descriptors_my_2eproto[0] != NULL) {
delete file_level_service_descriptors_my_2eproto[0];
file_level_service_descriptors_my_2eproto[0] = NULL;
}
}
};
static FileLevelServiceDescriptorsCleanup file_level_service_descriptors_cleanup;

Copilot uses AI. Check for mistakes.
Comment on lines +55 to +56
if (table.service_name == "" || table.prefix == "" ||
table.method_name_list == "") {
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String comparison issue: Comparing std::string objects with "" using == is correct but using empty() method is more idiomatic and clearer. Consider changing 'table.service_name == ""' to 'table.service_name.empty()' (and similarly for prefix and method_name_list) for better code clarity and style consistency.

Suggested change
if (table.service_name == "" || table.prefix == "" ||
table.method_name_list == "") {
if (table.service_name.empty() || table.prefix.empty() ||
table.method_name_list.empty()) {

Copilot uses AI. Check for mistakes.
// deserialization without unnecessary memory copies.
class SlabAllocator : public ::flatbuffers::Allocator {
public:
SlabAllocator() {}
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uninitialized member variables: The SlabAllocator class has member variables _full_buf_head, _fb_begin_head, and _old_size_param that are not initialized in the default constructor. These should be initialized to safe default values (e.g., nullptr for pointers, 0 for size_t) to prevent undefined behavior if methods are called before allocate() is invoked.

Suggested change
SlabAllocator() {}
SlabAllocator()
: _full_buf_head(nullptr)
, _fb_begin_head(nullptr)
, _old_size_param(0) {}

Copilot uses AI. Check for mistakes.
const test::BenchmarkRequest* request = request_base->GetRoot<test::BenchmarkRequest>();
// Set Response Message
brpc::flatbuffers::MessageBuilder mb_;
const char *req_str = request->message()->c_str();
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential null pointer dereference: The request->message() call on line 29 can return nullptr if the message field is not set in the FlatBuffers message. Calling c_str() on a nullptr will cause a crash. Add a null check before accessing the string, or handle the case where message is not present.

Suggested change
const char *req_str = request->message()->c_str();
const auto* msg = request->message();
const char* req_str = msg ? msg->c_str() : "";

Copilot uses AI. Check for mistakes.
FB_BRPC_DISALLOW_EVIL_CONSTRUCTORS(MethodDescriptor);
};

bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind = 0);
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling error in parameter name: The parameter name 'meta_remaind' should be 'meta_remain' or 'meta_remaining' for correct spelling.

Suggested change
bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaind = 0);
bool ParseFbFromIOBUF(Message* msg, size_t msg_size, const butil::IOBuf& buf, size_t meta_remaining = 0);

Copilot uses AI. Check for mistakes.
Comment on lines +153 to +155
Message::Message(const butil::SingleIOBuf &iobuf,
uint32_t meta_size = 0,
uint32_t msg_size = 0)
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Const-correctness issue: The default parameter values for meta_size and msg_size should not be specified in the constructor definition when they are private constructors. Default parameter values should only be declared in the header file (lines 97-103 in flatbuffers_impl.h), not redeclared in the implementation. This can cause confusion and potential mismatches.

Copilot uses AI. Check for mistakes.
Comment on lines +29 to +43
uint8_t *SlabAllocator::reallocate_downward(uint8_t *old_p,
size_t old_size,
size_t new_size,
size_t in_use_back,
size_t in_use_front) {
_old_size_param = new_size;
size_t new_real_size = new_size + DEFAULT_RESERVE_SIZE;

void* old_block = _iobuf.get_cur_block();
butil::SingleIOBuf::target_block_inc_ref(old_block);
_full_buf_head = (uint8_t *)_iobuf.reallocate_downward(new_real_size, 0, 0);
_fb_begin_head = _full_buf_head + DEFAULT_RESERVE_SIZE;
memcpy_downward(old_p, old_size, _fb_begin_head, new_size, in_use_back, in_use_front);
butil::SingleIOBuf::target_block_dec_ref(old_block);
return _fb_begin_head;
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing null pointer check: After calling _iobuf.reallocate_downward(new_real_size, 0, 0) on line 39, the return value should be checked for NULL before proceeding. If reallocation fails, _full_buf_head will be NULL, and subsequent operations including pointer arithmetic and memcpy_downward will result in undefined behavior. Add a null check and handle the error case appropriately, ensuring old_block reference is properly cleaned up.

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +108
Message &operator=(const Message &other) = delete;
private:
Message(const butil::SingleIOBuf &iobuf,
uint32_t meta_size,
uint32_t msg_size);

Message(const butil::IOBuf::BlockRef &ref,
uint32_t meta_size,
uint32_t msg_size);
friend class MessageBuilder;
public:
Message(Message &&other) = default;

Message(const Message &other) = delete;
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant copy constructor deletion: The Message class has both 'Message(const Message &other) = delete;' on line 108 and 'Message &operator=(const Message &other) = delete;' on line 95. However, line 108 is redundant because the copy constructor is already deleted on line 108. The order is also unusual - typically copy constructor and copy assignment operator are declared together. Consider removing line 95 or organizing the declarations more conventionally.

Copilot uses AI. Check for mistakes.
<< " msg latency=" << g_msg_recorder.latency(1) << "ns";
}

LOG(INFO) << "EchoClient is going to quit";
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent error message: The error message says "EchoClient is going to quit" but this is a benchmark client, not EchoClient. The message should be updated to "BenchmarkClient is going to quit" or simply "Client is going to quit" for consistency.

Suggested change
LOG(INFO) << "EchoClient is going to quit";
LOG(INFO) << "Client is going to quit";

Copilot uses AI. Check for mistakes.
Comment on lines +6 to +19

namespace test {

static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
BenchmarkService::~BenchmarkService() {}
const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() {
if (file_level_service_descriptors_my_2eproto[0] == NULL) {
const brpc::flatbuffers::BrpcDescriptorTable desc_table = {
"test.", "BenchmarkService" , "Test"};
if (brpc::flatbuffers::parse_service_descriptors(desc_table, &file_level_service_descriptors_my_2eproto[0])) {
std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl;
return NULL;
}
}
Copy link

Copilot AI Jan 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread safety issue: The descriptor() method has a race condition. Multiple threads could simultaneously check if file_level_service_descriptors_my_2eproto[0] is NULL and call parse_service_descriptors concurrently, leading to memory leaks and undefined behavior. This should be protected with a mutex or use std::call_once for thread-safe lazy initialization. This is a code generation issue that should be addressed in the flatc code generator.

Suggested change
namespace test {
static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
BenchmarkService::~BenchmarkService() {}
const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() {
if (file_level_service_descriptors_my_2eproto[0] == NULL) {
const brpc::flatbuffers::BrpcDescriptorTable desc_table = {
"test.", "BenchmarkService" , "Test"};
if (brpc::flatbuffers::parse_service_descriptors(desc_table, &file_level_service_descriptors_my_2eproto[0])) {
std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl;
return NULL;
}
}
#include <mutex>
namespace test {
static brpc::flatbuffers::ServiceDescriptor* file_level_service_descriptors_my_2eproto[1] = {NULL};
static std::once_flag file_level_service_descriptors_my_2eproto_once_flag;
BenchmarkService::~BenchmarkService() {}
const brpc::flatbuffers::ServiceDescriptor* BenchmarkService::descriptor() {
std::call_once(file_level_service_descriptors_my_2eproto_once_flag, []() {
const brpc::flatbuffers::BrpcDescriptorTable desc_table = {
"test.", "BenchmarkService" , "Test"};
if (brpc::flatbuffers::parse_service_descriptors(
desc_table, &file_level_service_descriptors_my_2eproto[0])) {
std::cout << "ERROR: " << "Fail to parse_service_descriptors" << std::endl;
}
});

Copilot uses AI. Check for mistakes.
#include "brpc/details/flatbuffers_common.h"

namespace brpc {
namespace flatbuffers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

命名空间和文件路径不一致

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

brpc只能使用protobuf吗?如果我想使用flatbuffer,能支持吗

3 participants