rocket-rpc笔记

RPC

如何基于protobuf实现一个极简版的RPC - Ying

GitHub - Gooddbird/rocket: c++ rpc framework, simplified version of tinyrpc。

前者比较好入门RPC框架,需要用到proto工具,后者是另一个rpc的开源实现,他们都不同于grpc。这涉及到通用服务和grpc服务的问题。前者代码比较老,版本问题太多,放弃,主要学习一下调用逻辑,后者可以跑通。

rocket的坑:按照issue配置好xml的环境,ld path加上/usr/local/lib就行,还需要protoc —cpp_out=. order.proto自己生成对应的cc和h文件。

rocket的Makefile解释:比较清晰易懂

通用服务和grpc服务?

grpc官方的方式是grpc服务,如果我们想自定义rpc实现,需要在proto中修改option cc_generic_services = true;该字段与grpc插件互斥。这样生成的pb文件就只包含通用服务,不过该字段现在已经被建议废弃使用。同时,我们还需要自己实现数据发送接收,比如tcp、http等。grpc就是全部帮你封装好了,内部是http。数据编码解码全部采用protobuf,同时grpc自动帮你处理了service、method的对应关系。自己实现rpc就需要自己再写一遍。这里只记录学习过程。具体参考如何基于protobuf实现一个极简版的RPC - Ying

关于一些版本问题

编译器默认的include目录可以通过echo | g++ -v -x c++ -E -查看

遇到的版本问题:protoc编译版本和protobuf库版本不匹配,系统中有多个protobuf.pc文件,首先查看一下pkg-config的搜索路径

1
2
3
4
5
6
7
8
echo $(pkg-config --variable pc_path pkg-config) | tr ':' '\n'

/usr/local/lib/x86_64-linux-gnu/pkgconfig
/usr/local/lib/pkgconfig
/usr/local/share/pkgconfig
/usr/lib/x86_64-linux-gnu/pkgconfig
/usr/lib/pkgconfig
/usr/share/pkgconfig

然后一个个找,查看 protobuf.pc里面的版本

1
pkg-config --modversion protobuf

因为按照grpc官方的安装方式,grpc还有相应的库依赖全部装在.local目录下,所以得添加到pkg-config的搜索目录下

1
2
3
4
5
6
echo 'export PKG_CONFIG_PATH=/home/zyt/.local/lib/pkgconfig:$PKG_CONFIG_PATH' >> ~/.bashrc

or

export PKG_CONFIG_PATH="$PKG_CONFIG_PAT:/home/zyt/.local/lib/pkgconfig"
source ~/.bashrc
不同项目的proto版本都不同,api也不一样,所以别纠结版本的问题了

想查看cmake的include路径,可以通过

1
2
get_directory_property(dirs INCLUDE_DIRECTORIES)
message(STATUS "After adding generated directory: ${dirs}")

grpc官方例子

CMakeLists.txt如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
cmake_minimum_required(VERSION 3.16)

project(HelloWorld C CXX)

include(../cmake/common.cmake)

# Proto file
get_filename_component(hw_proto "../../protos/helloworld.proto" ABSOLUTE)
get_filename_component(hw_proto_path "${hw_proto}" PATH)

# Generated sources
set(hw_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.cc")
set(hw_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.pb.h")
set(hw_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.cc")
set(hw_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/helloworld.grpc.pb.h")
add_custom_command(
OUTPUT "${hw_proto_srcs}" "${hw_proto_hdrs}" "${hw_grpc_srcs}" "${hw_grpc_hdrs}"
COMMAND ${_PROTOBUF_PROTOC}
ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}"
--cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
-I "${hw_proto_path}"
--plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
"${hw_proto}"
DEPENDS "${hw_proto}")

# Include generated *.pb.h files
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
message("CMAKE_CURRENT_BINARY_DIR = ${CMAKE_CURRENT_BINARY_DIR}")
# hw_grpc_proto
add_library(hw_grpc_proto
${hw_grpc_srcs}
${hw_grpc_hdrs}
${hw_proto_srcs}
${hw_proto_hdrs})
target_link_libraries(hw_grpc_proto
absl::check
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

# Targets greeter_[async_](client|server)
foreach(_target
greeter_client greeter_server
greeter_callback_client greeter_callback_server
greeter_async_client greeter_async_client2 greeter_async_server)
add_executable(${_target} "${_target}.cc")
target_link_libraries(${_target}
hw_grpc_proto
absl::check
absl::flags
absl::flags_parse
absl::log
absl::log_initialize
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})
endforeach()

CMAKE_CURRENT_BINARY_DIR在这里实际上就是cmake/build目录,也就是你执行cmake命令的目录,因此我们include_directories("${CMAKE_CURRENT_BINARY_DIR}")便会自动搜索到生成的pb文件。

proto文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}

rpc SayHelloStreamReply (HelloRequest) returns (stream HelloReply) {}

rpc SayHelloBidiStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

client和server代码比较简单就不贴了,只梳理一下流程

我们在proto中定义了Greeter这个service,protoc会自动帮我们生成pb文件实现这个类,客户端指名server端地址以后,只需要实例化一个channel传递给客户端的构造函数,然后进一步实例化Greeter::Stub这个类,客户端发送请求的时候,内部其实就是通过这个stub去发送的。

服务端需要重写SayHello方法,然后注册service以后,server.wait一直等。

grpc的一些坑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
export MY_INSTALL_DIR=$HOME/.local

mkdir -p $MY_INSTALL_DIR

export PATH="$MY_INSTALL_DIR/bin:$PATH"

sudo apt install -y build-essential autoconf libtool pkg-config

git clone --recurse-submodules -b v1.72.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc


mkdir -p cmake/build
pushd cmake/build
cmake -DgRPC_INSTALL=ON \
-DgRPC_BUILD_TESTS=OFF \
-DCMAKE_CXX_STANDARD=17 \
-DCMAKE_INSTALL_PREFIX=$MY_INSTALL_DIR \
../..
make -j4
make install


mkdir -p cmake/build
pushd cmake/build
cmake -DCMAKE_PREFIX_PATH=$MY_INSTALL_DIR -DCMAKE_EXPORT_COMPILE_COMMANDS=1 -DCMAKE_CXX_STANDARD=17../..
make -j4

protoc --cpp_out=. contact.proto
  • absl::check之类的经常出现先链接库的时候,但他们其实不是库,他们只是一个编译target,在某些cmake文件里面会定义,不用深究。。

  • vscode报红,设置里面选C_Cpp.SelectIntelliSenseConfiguration,选一个配置就行,一般选cmake生成的compile_commands.json就行。

rocket的调用流程

class RpcController : public google::protobuf::RpcController:貌似是来记录一次调用的相关信息

class RpcClosure : public google::protobuf::Closure

class RpcInterface : public std::enable_shared_from_this<RpcInterface>

class RpcChannel : public google::protobuf::RpcChannel, public std::enable_shared_from_this<RpcChannel>

客户端流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
int main() {

rocket::Config::SetGlobalConfig(NULL);

rocket::Logger::InitGlobalLogger(0);

// test_tcp_client();
test_rpc_channel();

INFOLOG("test_rpc_channel end");

return 0;
}

void test_rpc_channel() {

NEWRPCCHANNEL("127.0.0.1:12345", channel);

// std::shared_ptr<makeOrderRequest> request = std::make_shared<makeOrderRequest>();

NEWMESSAGE(makeOrderRequest, request);
NEWMESSAGE(makeOrderResponse, response);

request->set_price(100);
request->set_goods("apple");

NEWRPCCONTROLLER(controller);
controller->SetMsgId("99998888");
controller->SetTimeout(10000);

std::shared_ptr<rocket::RpcClosure> closure = std::make_shared<rocket::RpcClosure>(nullptr, [request, response, channel, controller]() mutable {
if (controller->GetErrorCode() == 0) {
INFOLOG("call rpc success, request[%s], response[%s]", request->ShortDebugString().c_str(), response->ShortDebugString().c_str());
// 执行业务逻辑
if (response->order_id() == "xxx") {
// xx
}
} else {
ERRORLOG("call rpc failed, request[%s], error code[%d], error info[%s]",
request->ShortDebugString().c_str(),
controller->GetErrorCode(),
controller->GetErrorInfo().c_str());
}

INFOLOG("now exit eventloop");
// channel->getTcpClient()->stop();
channel.reset();
});

{
std::shared_ptr<rocket::RpcChannel> channel = std::make_shared<rocket::RpcChannel>(rocket::RpcChannel::FindAddr("127.0.0.1:12345"));
;
channel->Init(controller, request, response, closure);
Order_Stub(channel.get()).makeOrder(controller.get(), request.get(), response.get(), closure.get());
}

// CALLRPRC("127.0.0.1:12345", Order_Stub, makeOrder, controller, request, response, closure);



// xxx
// 协程
}

初始化GlobalConfig、初始化Logger模块。以上两步其实是和server端是复用的,主要是server端需要读取xml配置,Logger模块分配两个线程。客户端不用做这些。

依次构建channel、request、response、controller、closure等。channel的构建其实就是拿到服务端的ip port,这些数据是从xml配置中读的,初始化channel就只是保存了服务端的地址端口信息m_peer_addr。然后Init再进行一些赋值等。request是Message对象,会进行一些数据填充。

1
Order_Stub(channel.get()).makeOrder(controller.get(), request.get(), response.get(), closure.get());

这段代码实际上走到了RpcChannel::CallMethod

  • 构建一个std::shared_ptr<rocket::TinyPBProtocol> req_protocol

  • 一些校验

  • 构建一个m_client = std::make_shared<TcpClient>(m_peer_addr);

  • req_protocol设置m_msg_id、m_method_name

  • request->SerializeToString(&(req_protocol->m_pb_data)),序列化request,结果保存在m_pb_data中,这里虽然用string存,但是内部其实都是二进制数据。

  • 设置一个单次定时事件,超时未完成就手动销毁,把该定时事件加到m_client 中

  • m_client->connect,这里会传入一个回调done,connect逻辑比较简单,就是原生的connect,返回0表示连接成功,然后执行done回调,返回-1的话就看看是不是在连接中EINPROGRESS,是就把这个fdevent监听可写,可写触发connect、初始化本地addr、设置连接状态等,也就是正常connect的情况。最后把这个fdevent删掉。因为这里只是为了保证下次连接正常进行,否则会一直触发写。

  • connect里面的回调,正常来说是连接成功了才会执行。匿名函数内部首先校验有没有错误,然后执行getTcpClient()->writeMessage(),其中匿名函数捕获了req_protocol和this,其实就是连接成功以后记录了一对,然后监听可写事件,这里可写的回调就是把前面记录的message编码以后通过socket发送出去(message就是req_protocol,req_protocol中存着request序列化以后的数据),发送完了以后,然后依次执行前面的done回调函数,在这个回调函数中,会调用getTcpClient()->readMessage(),其中第一个参数req_protocol->m_msg_id,第二个参数是匿名函数,捕获了this和my_controller。这里其实是上面的回调触发的时候执行到这里,然后又记录一对,然后监听可读事件,代表服务端已经执行完了送来了response。在这个done里面就是判断response有没有解析出错,没问题的话就执行callback回调,callback回调就比较简单了,标记finished,然后执行m_closure->run。这里的response怎么来的呢?其实是readMessage监听可读事件的时候,检测到可读了,然后触发可读的回调,TcpConnection::onRead,在这个函数里面首先从socket中读出数据存在in_buffer中,然后调用excute函数,依次解码得到message对象,然后执行之前记录好的回调。总结来说,就是connect的时候,注册一个回调A,连接上了才会执行回调A。在A里面又调用了writeMessage,在这个函数中会注册回调B,writeMessage这个函数其实就是把req_protocol写入到m_out_buffer,然后全部发送出去,在内部会记录每个req_protocol和其对应的回调。发送完了以后执行回调B。而在回调B中,又会调用readMessage,在这个函数中,又会注册回调C。readMessage是用来读取数据到m_in_buffer,在内部会记录每次请求的m_msg_id以及对应的回调,然后在excute里面,根据是服务端还是客户端,分别执行请求的分发和客户端自己的回调。在回调C中,其实就是收到服务端的响应以后,进行反序列化拿到response,也就是message,然后进行校验,最后调用RpcChannel::callBack标记这一次请求已经完成,然后执行closure里面的run函数,内部就是执行初始化closure时传入的回调函数。closure一般来说是一次rpc请求完成以后,在客户端执行的,在这里可以执行真正的业务逻辑。

  • TcpConnection::onWrite 该函数客户端和服务端其实共用了,如果是客户端就会先encode到m_out_buffer,然后再发送,发送完了,如果是客户端,再额外执行事先绑定的回调,也就是写回调,写回调里面会调用getTcpClient()->readMessage(),会注册一个读回调

  • TcpConnection::onRead 该函数客户端和服务端也是共用了。从socket中读取数据存在in_buffer中,然后执行excute函数,如果是客户端,就decode得到message对象,然后执行之前注册的回调。如果是服务端,就代表要处理多少个响应

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    if (m_connection_type == TcpConnectionByServer) {
    // 将 RPC 请求执行业务逻辑,获取 RPC 响应, 再把 RPC 响应发送回去
    std::vector<AbstractProtocol::s_ptr> result;
    m_coder->decode(result, m_in_buffer);
    for (size_t i = 0; i < result.size(); ++i) {
    // 1. 针对每一个请求,调用 rpc 方法,获取响应 message
    // 2. 将响应 message 放入到发送缓冲区,监听可写事件回包
    INFOLOG("success get request[%s] from client[%s]", result[i]->m_msg_id.c_str(), m_peer_addr->toString().c_str());

    std::shared_ptr<TinyPBProtocol> message = std::make_shared<TinyPBProtocol>();
    // message->m_pb_data = "hello. this is rocket rpc test data";
    // message->m_msg_id = result[i]->m_msg_id;

    RpcDispatcher::GetRpcDispatcher()->dispatch(result[i], message, this);
    }
    }
  • RpcDispatcher::dispatch 该函数就是用来分发服务调用的,首先进行一些校验,构造rsp_protocol,获取method、service,构造两个google::protobuf::Message,req_msg和rsp_msg,然后将req_protocol->m_pb_data的数据反序列化保存在req_msg中,然后构建一个RpcController rpc_controller,最后构建一个RpcClosure,构造函数里面会注册一个回调(在这个回调里首先将rsp_msg序列化到rsp_protocol->m_pb_data中,然后调用TcpConnection的reply函数,在内部encode rsp_protocol,然后监听可写,写入发到对端)最后调用service->CallMethod,这里是服务端调用method的地方。最终调到了OrderImpl::makeOrder,这是协议里面定义的rpc服务具体的实现,内部就是把request校验一下,然后给response填充,完了以后调用前面done->run,也就是前面RpcClosure事先注册的*回调

服务端流程

编写OrderImpl继承自Order类,在这里实现真正的服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int main(int argc, char* argv[]) {

if (argc != 2) {
printf("Start test_rpc_server error, argc not 2 \n");
printf("Start like this: \n");
printf("./test_rpc_server ../conf/rocket.xml \n");
return 0;
}

rocket::Config::SetGlobalConfig(argv[1]);

rocket::Logger::InitGlobalLogger();

std::shared_ptr<OrderImpl> service = std::make_shared<OrderImpl>();
rocket::RpcDispatcher::GetRpcDispatcher()->registerService(service);

rocket::IPNetAddr::s_ptr addr = std::make_shared<rocket::IPNetAddr>("127.0.0.1", rocket::Config::GetGlobalConfig()->m_port);

rocket::TcpServer tcp_server(addr);

tcp_server.start();

return 0;
}

std::shared_ptr<OrderImpl> service,然后注册服务,在内部其实是有一个map,key为服务名,value为具体的服务实例。然后构建一个TcpServer,然后start开启循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TcpServer::init() {

m_acceptor = std::make_shared<TcpAcceptor>(m_local_addr);

m_main_event_loop = EventLoop::GetCurrentEventLoop();
m_io_thread_group = new IOThreadGroup(Config::GetGlobalConfig()->m_io_threads);

m_listen_fd_event = new FdEvent(m_acceptor->getListenFd());
m_listen_fd_event->listen(FdEvent::IN_EVENT, std::bind(&TcpServer::onAccept, this));

m_main_event_loop->addEpollEvent(m_listen_fd_event);

m_clear_client_timer_event = std::make_shared<TimerEvent>(5000, true, std::bind(&TcpServer::ClearClientTimerFunc, this));
m_main_event_loop->addTimerEvent(m_clear_client_timer_event);

}

构建TcpServer的时候,在init中会构建一个TcpAcceptor,这个就是拿来listen和accept的,成功会记录一对。然后就是自己会维护一个主eventloop,然后创建4个IO线程,即4个从eventloop。然后创建一个事件,检测可读,回调为TcpServer::onAccept。加入到主eventloop中,然后加一个每5s定时任务到主eventloop中。然后开启主从loop的循环。

TcpServer::onAccept()中就是用m_acceptor->accept拿到连接的fd,然后取出来一个IO线程对象,然后构建一个TcpConnection对象,每个对象都有一个eventloop,但是这个eventloop是IO线程的,如果是服务端,就还需要监听可读。然后可读的回调TcpConnection::onRead其实就是从socket读数据,然后解码成rpc请求分发到真正的服务那里执行。这里可能还有一个疑惑。fdevent没有绑定fd啊?实际上在初始化的时候,m_fd_event = FdEventGroup::GetFdEventGroup()->getFdEvent(fd);这里就事先建立了fd和m_fd_event的映射关系,其实就是事先创建好了128个fdevent,对应的fd就是0-127。

总结来说,服务端在TcpServer::init里面注册了新连接的回调,TcpServer::onAccept。在这个函数里面,每连接一个新客户端的时候,都会从4个IO线程中,选出来一个交给他去管理。并且给每个连接创建一个TcpConnection并且记录,然后监听可读事件。实际上在初始化TcpConnection的时候,因为是在主线程操作的,所以在监听可读事件的时候,会调用m_event_loop->addEpollEvent(m_fd_event);但是这里很明显是在主线程中,调了子线程的eventloop,所以构造一个task存起来,存在子线程对应的eventloop的m_pending_tasks中,然后唤醒这个子线程的eventloop,唤醒是为了防止这个子eventloop是在阻塞。

再回过头来看eventloop::loop到底是什么死循环呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void EventLoop::loop() {
m_is_looping = true;
while(!m_stop_flag) {
ScopeMutex<Mutex> lock(m_mutex);
std::queue<std::function<void()>> tmp_tasks;
m_pending_tasks.swap(tmp_tasks);
lock.unlock();

while (!tmp_tasks.empty()) {
std::function<void()> cb = tmp_tasks.front();
tmp_tasks.pop();
if (cb) {
cb();
}
}

// 如果有定时任务需要执行,那么执行
// 1. 怎么判断一个定时任务需要执行? (now() > TimerEvent.arrtive_time)
// 2. arrtive_time 如何让 eventloop 监听

// 最多等待10000ms
int timeout = g_epoll_max_timeout;
epoll_event result_events[g_epoll_max_events];
// DEBUGLOG("now begin to epoll_wait");
int rt = epoll_wait(m_epoll_fd, result_events, g_epoll_max_events, timeout);
// DEBUGLOG("now end epoll_wait, rt = %d", rt);

if (rt < 0) {
ERRORLOG("epoll_wait error, errno=%d, error=%s", errno, strerror(errno));
} else {
for (int i = 0; i < rt; ++i) {
epoll_event trigger_event = result_events[i];
FdEvent* fd_event = static_cast<FdEvent*>(trigger_event.data.ptr);
if (fd_event == NULL) {
ERRORLOG("fd_event = NULL, continue");
continue;
}

// int event = (int)(trigger_event.events);
// DEBUGLOG("unkonow event = %d", event);

if (trigger_event.events & EPOLLIN) {

// DEBUGLOG("fd %d trigger EPOLLIN event", fd_event->getFd())
addTask(fd_event->handler(FdEvent::IN_EVENT));
}
if (trigger_event.events & EPOLLOUT) {
// DEBUGLOG("fd %d trigger EPOLLOUT event", fd_event->getFd())
addTask(fd_event->handler(FdEvent::OUT_EVENT));
}

// EPOLLHUP EPOLLERR
if (trigger_event.events & EPOLLERR) {
DEBUGLOG("fd %d trigger EPOLLERROR event", fd_event->getFd())
// 删除出错的套接字
deleteEpollEvent(fd_event);
if (fd_event->handler(FdEvent::ERROR_EVENT) != nullptr) {
DEBUGLOG("fd %d add error callback", fd_event->getFd())
addTask(fd_event->handler(FdEvent::OUT_EVENT));
}
}
}
}

}

}
  • swap一次性取出任务
  • 依次执行任务(比如对应上面的task,也就是addevent)
  • 然后就是执行epoll_wait,有一个超时时间。如果有返回值,那么就代表有可读或者可写事件,然后分别将相对应的回调addTask

值得注意的点

  • AsyncLogger::Loop在while循环的时候,进去就直接上锁,子while循环直接阻塞,等到新buffer来了,才会解除阻塞,然后继续拿回锁

  • 主线程会保留所有io线程的编号和线程对象,也就间接保留了所有从eventloop的对象。因为每个从eventloop初始化时会记录当前子线程的id。所以主线程的accept回调触发的时候,最终是在主线程里面拿到某个子线程的eventloop,然后addEpollEvent。具体调用为TcpServer::onAccept—>实例化TcpConnection::s_ptr connetion—>TcpConnection::listenRead—>m_event_loop->addEpollEvent

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    TcpConnection::TcpConnection(EventLoop* event_loop, int fd, int buffer_size, NetAddr::s_ptr peer_addr, NetAddr::s_ptr local_addr, TcpConnectionType type /*= TcpConnectionByServer*/)
    : m_event_loop(event_loop), m_local_addr(local_addr), m_peer_addr(peer_addr), m_state(NotConnected), m_fd(fd), m_connection_type(type) {

    m_in_buffer = std::make_shared<TcpBuffer>(buffer_size);
    m_out_buffer = std::make_shared<TcpBuffer>(buffer_size);

    m_fd_event = FdEventGroup::GetFdEventGroup()->getFdEvent(fd);
    m_fd_event->setNonBlock();

    m_coder = new TinyPBCoder();

    if (m_connection_type == TcpConnectionByServer) {
    listenRead();
    }

    }

    void TcpConnection::listenRead() {

    m_fd_event->listen(FdEvent::IN_EVENT, std::bind(&TcpConnection::onRead, this));
    m_event_loop->addEpollEvent(m_fd_event);
    }

    bool EventLoop::isInLoopThread() {
    return getThreadId() == m_thread_id;
    }

    // 如果是当前线程调用自己的eventloop,那么直接添加到 epoll 里面
    // 如果不是当前线程调用自己的eventloop,那么添加到任务队列里面,等到自己线程执行完后再添加到 epoll 里面
    void EventLoop::addEpollEvent(FdEvent* event) {
    if (isInLoopThread()) {
    ADD_TO_EPOLL();
    } else {
    auto cb = [this, event]() {
    ADD_TO_EPOLL();
    };
    addTask(cb, true);
    }

    }


    所以现在当前线程的id,和eventloop绑定的id,不是同一个,那么就走到了EventLoop::addTask。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    void EventLoop::addTask(std::function<void()> cb, bool is_wake_up /*=false*/) {
    ScopeMutex<Mutex> lock(m_mutex);
    m_pending_tasks.push(cb);
    lock.unlock();

    if (is_wake_up) {
    wakeup();
    }
    }
  • 那为什么要加锁?首先这里的锁,是每个eventloop独有的。各个子线程之间貌似互不干扰啊?但是从上面的分析可以看到,这里是主线程调到了子线程的eventloop(因为事先记录了!)所以这里的eventloop实际上就是一个临界资源(共享数据)呀!刚好这个临界资源(共享数据)又有一把锁,这不就类似于主线程子线程共用一个全局变量锁吗!所以在每个eventloop里面操作的时候,都需要加锁上锁呀!

    那再看看addTask到底做了啥呢?就是把任务放到这个eventloop的队列里,然后再调用wakeup!那wakeup到底做了啥?在每个eventloop初始化的时候,都会调用initWakeUpFdEevent,在这里会创建一个eventfd。然后一直监听!wakeup本质上就是触发一次可读事件,然后解除epoll的阻塞。这里wakeup的意义就是怕这个从eventloop一直阻塞住了,我现在要让他去监听新的客户端fd,所以通过这个eventfd去解除他的阻塞!值得注意的是,如果这个eventloop之前就监听了一些客户端fd,刚好这个时候有客户端fd可读了,也解除了epoll的阻塞,那么确实当前这个wakeup就没啥意义了!总之,我们手动wakeup一次,触发可读,保证解除epoll阻塞,肯定是有意义的,不管怎样,都能保证eventloop及时处理新来的任务!这里新来的任务就是监听新的客户端fd,其实也可以是其他任务,比如注销fd,某些定时任务,回调函数等等!但是在这份源码中好像是只有连接回调,也就是监听新的客户端fd。

  • 在eventloop开头的时候,直接swap,可以高效取出所有的任务,减少上锁的时间,相当于利用c++的特性吧,然后后面再慢慢处理每个task

  • 每个m_fd_event->listen的时候,都会在内部保存m_listen_events.data.ptr = this;这样每次epoll返回的时候,能够拿到这个FdEvent对象。
  • 任务的最终执行其实都是在eventloop的开头,epoll返回的时候,会告诉你哪个fd可写可读,该fd会与一个FdEvent绑定,我们需要转换成FdEvent,然后根据fd是可读还是可写,找到事先定义好的回调,然后添加到任务列表m_pending_tasks

  • req_msg和rsp_msg都是google::protobuf::Message*类型,proto文件中定义的rpc服务,request和response实际上都是继承自Message

  • service是google::protobuf::Service类型

  • socket通信的时候,客户端连接触发可写,服务端连接触发可读。
  • 客户端请求的时候,request对象赋值以后,先是声明req_protocol,然后序列化到req_protocol->m_pb_data中,然后再将这个req_protocol编码,也就是encode,其实就是把req_protocol转化成字节流数组,然后写入到m_out_buffer中,最后通过write发过去。服务端读取的时候,数据读到m_in_buffer中,然后再decode还原成TinyPBProtocol,也就是客户端发过来的req_protocol对象。然后反序列化req_protocol->m_pb_data到req_msg中,其实就是request对象。然后服务端处理完以后,构建一个response对象。然后在服务端的closure中,再把这个reponse序列化为rsp_protocol,最后调用connection->reply(replay_messages);内部同样是encode编码以后发出去。

最后再补充一下,为什么要用char* 字节数组去send和write?

socket传输的是字节流

  1. 历史原因:在C语言中,char类型被设计为最小的可寻址单元,通常与一个字节(8位)对应。因此,char*常被用作处理原始内存的通用指针。
  2. 字节流的概念:字节流意味着数据被视为一系列连续的字节,而没有任何固有含义。发送数据时,我们并不关心这些字节代表什么(字符、整数、浮点数等),只关心它们作为字节序列的传输。
  3. 套接字的本质:套接字(socket)传输的是原始字节。应用程序负责赋予这些字节意义。无论是字符串、图像还是其他二进制数据,在套接字层面都被视为字节流。