RPC
如何基于protobuf实现一个极简版的RPC - Ying
GitHub - Gooddbird/rocket: c++ rpc framework, simplified version of tinyrpc。
前者比较好入门RPC框架,需要用到proto工具,后者是另一个rpc的开源实现,他们都不同于grpc。这涉及到通用服务和grpc服务的问题。前者代码比较老,版本问题太多,放弃,主要学习一下调用逻辑,后者可以跑通。
rocket的坑:按照issue配置好xml的环境,ld path加上/usr/local/lib就行,还需要protoc —cpp_out=. order.proto自己生成对应的cc和h文件。
rocket的Makefile解释:比较清晰易懂
通用服务和grpc服务?
grpc官方的方式是grpc服务,如果我们想自定义rpc实现,需要在proto中修改option cc_generic_services = true;
该字段与grpc插件互斥。这样生成的pb文件就只包含通用服务,不过该字段现在已经被建议废弃使用。同时,我们还需要自己实现数据发送接收,比如tcp、http等。grpc就是全部帮你封装好了,内部是http。数据编码解码全部采用protobuf,同时grpc自动帮你处理了service、method的对应关系。自己实现rpc就需要自己再写一遍。这里只记录学习过程。具体参考如何基于protobuf实现一个极简版的RPC - Ying
关于一些版本问题
编译器默认的include目录可以通过echo | g++ -v -x c++ -E -
查看
遇到的版本问题:protoc编译版本和protobuf库版本不匹配,系统中有多个protobuf.pc文件,首先查看一下pkg-config的搜索路径
1 | echo $(pkg-config --variable pc_path pkg-config) | tr ':' '\n' |
然后一个个找,查看 protobuf.pc里面的版本
1 | pkg-config --modversion protobuf |
因为按照grpc官方的安装方式,grpc还有相应的库依赖全部装在.local目录下,所以得添加到pkg-config的搜索目录下
1 | echo 'export PKG_CONFIG_PATH=/home/zyt/.local/lib/pkgconfig:$PKG_CONFIG_PATH' >> ~/.bashrc |
想查看cmake的include路径,可以通过
1 | get_directory_property(dirs INCLUDE_DIRECTORIES) |
grpc官方例子
CMakeLists.txt如下
1 | cmake_minimum_required(VERSION 3.16) |
CMAKE_CURRENT_BINARY_DIR
在这里实际上就是cmake/build目录,也就是你执行cmake命令的目录,因此我们include_directories("${CMAKE_CURRENT_BINARY_DIR}")
便会自动搜索到生成的pb文件。
proto文件如下
1 | // The greeting service definition. |
client和server代码比较简单就不贴了,只梳理一下流程
我们在proto中定义了Greeter这个service,protoc会自动帮我们生成pb文件实现这个类,客户端指名server端地址以后,只需要实例化一个channel传递给客户端的构造函数,然后进一步实例化Greeter::Stub这个类,客户端发送请求的时候,内部其实就是通过这个stub去发送的。
服务端需要重写SayHello方法,然后注册service以后,server.wait一直等。
grpc的一些坑
1 | export MY_INSTALL_DIR=$HOME/.local |
absl::check
之类的经常出现先链接库的时候,但他们其实不是库,他们只是一个编译target,在某些cmake文件里面会定义,不用深究。。vscode报红,设置里面选C_Cpp.SelectIntelliSenseConfiguration,选一个配置就行,一般选cmake生成的compile_commands.json就行。
rocket的调用流程
class RpcController : public google::protobuf::RpcController
:貌似是来记录一次调用的相关信息
class RpcClosure : public google::protobuf::Closure
class RpcInterface : public std::enable_shared_from_this<RpcInterface>
class RpcChannel : public google::protobuf::RpcChannel, public std::enable_shared_from_this<RpcChannel>
客户端流程
1 | int main() { |
初始化GlobalConfig、初始化Logger模块。以上两步其实是和server端是复用的,主要是server端需要读取xml配置,Logger模块分配两个线程。客户端不用做这些。
依次构建channel、request、response、controller、closure等。channel的构建其实就是拿到服务端的ip port,这些数据是从xml配置中读的,初始化channel就只是保存了服务端的地址端口信息m_peer_addr。然后Init再进行一些赋值等。request是Message对象,会进行一些数据填充。
1 | Order_Stub(channel.get()).makeOrder(controller.get(), request.get(), response.get(), closure.get()); |
这段代码实际上走到了RpcChannel::CallMethod
构建一个
std::shared_ptr<rocket::TinyPBProtocol> req_protocol
一些校验
构建一个
m_client = std::make_shared<TcpClient>(m_peer_addr);
req_protocol设置m_msg_id、m_method_name
request->SerializeToString(&(req_protocol->m_pb_data))
,序列化request,结果保存在m_pb_data中,这里虽然用string存,但是内部其实都是二进制数据。设置一个单次定时事件,超时未完成就手动销毁,把该定时事件加到m_client 中
m_client->connect,这里会传入一个回调done,connect逻辑比较简单,就是原生的connect,返回0表示连接成功,然后执行done回调,返回-1的话就看看是不是在连接中EINPROGRESS,是就把这个fdevent监听可写,可写触发connect、初始化本地addr、设置连接状态等,也就是正常connect的情况。最后把这个fdevent删掉。因为这里只是为了保证下次连接正常进行,否则会一直触发写。
connect里面的回调,正常来说是连接成功了才会执行。匿名函数内部首先校验有没有错误,然后执行getTcpClient()->writeMessage(),其中匿名函数捕获了req_protocol和this,其实就是连接成功以后记录了一对
,然后监听可写事件,这里可写的回调就是把前面记录的message编码以后通过socket发送出去(message就是req_protocol,req_protocol中存着request序列化以后的数据),发送完了以后,然后依次执行前面的done回调函数,在这个回调函数中,会调用getTcpClient()->readMessage(),其中第一个参数req_protocol->m_msg_id,第二个参数是匿名函数,捕获了this和my_controller。这里其实是上面的回调触发的时候执行到这里,然后又记录一对 ,然后监听可读事件,代表服务端已经执行完了送来了response。在这个done里面就是判断response有没有解析出错,没问题的话就执行callback回调,callback回调就比较简单了,标记finished,然后执行m_closure->run。这里的response怎么来的呢?其实是readMessage监听可读事件的时候,检测到可读了,然后触发可读的回调,TcpConnection::onRead,在这个函数里面首先从socket中读出数据存在in_buffer中,然后调用excute函数,依次解码得到message对象,然后执行之前记录好的回调。总结来说,就是connect的时候,注册一个回调A,连接上了才会执行回调A。在A里面又调用了writeMessage,在这个函数中会注册回调B,writeMessage这个函数其实就是把req_protocol写入到m_out_buffer,然后全部发送出去,在内部会记录每个req_protocol和其对应的回调。发送完了以后执行回调B。而在回调B中,又会调用readMessage,在这个函数中,又会注册回调C。readMessage是用来读取数据到m_in_buffer,在内部会记录每次请求的m_msg_id以及对应的回调,然后在excute里面,根据是服务端还是客户端,分别执行请求的分发和客户端自己的回调。在回调C中,其实就是收到服务端的响应以后,进行反序列化拿到response,也就是message,然后进行校验,最后调用RpcChannel::callBack标记这一次请求已经完成,然后执行closure里面的run函数,内部就是执行初始化closure时传入的回调函数。closure一般来说是一次rpc请求完成以后,在客户端执行的,在这里可以执行真正的业务逻辑。 TcpConnection::onWrite 该函数客户端和服务端其实共用了,如果是客户端就会先encode到m_out_buffer,然后再发送,发送完了,如果是客户端,再额外执行事先绑定的回调,也就是写回调,写回调里面会调用getTcpClient()->readMessage(),会注册一个读回调
TcpConnection::onRead 该函数客户端和服务端也是共用了。从socket中读取数据存在in_buffer中,然后执行excute函数,如果是客户端,就decode得到message对象,然后执行之前注册的回调。如果是服务端,就代表要处理多少个响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16if (m_connection_type == TcpConnectionByServer) {
// 将 RPC 请求执行业务逻辑,获取 RPC 响应, 再把 RPC 响应发送回去
std::vector<AbstractProtocol::s_ptr> result;
m_coder->decode(result, m_in_buffer);
for (size_t i = 0; i < result.size(); ++i) {
// 1. 针对每一个请求,调用 rpc 方法,获取响应 message
// 2. 将响应 message 放入到发送缓冲区,监听可写事件回包
INFOLOG("success get request[%s] from client[%s]", result[i]->m_msg_id.c_str(), m_peer_addr->toString().c_str());
std::shared_ptr<TinyPBProtocol> message = std::make_shared<TinyPBProtocol>();
// message->m_pb_data = "hello. this is rocket rpc test data";
// message->m_msg_id = result[i]->m_msg_id;
RpcDispatcher::GetRpcDispatcher()->dispatch(result[i], message, this);
}
}RpcDispatcher::dispatch 该函数就是用来分发服务调用的,首先进行一些校验,构造rsp_protocol,获取method、service,构造两个google::protobuf::Message,req_msg和rsp_msg,然后将req_protocol->m_pb_data的数据反序列化保存在req_msg中,然后构建一个RpcController rpc_controller,最后构建一个RpcClosure,构造函数里面会注册一个回调(在这个回调里首先将rsp_msg序列化到rsp_protocol->m_pb_data中,然后调用TcpConnection的reply函数,在内部encode rsp_protocol,然后监听可写,写入发到对端)最后调用service->CallMethod,这里是服务端调用method的地方。最终调到了OrderImpl::makeOrder,这是协议里面定义的rpc服务具体的实现,内部就是把request校验一下,然后给response填充,完了以后调用前面done->run,也就是前面RpcClosure事先注册的*回调。
服务端流程
编写OrderImpl继承自Order类,在这里实现真正的服务
1 | int main(int argc, char* argv[]) { |
std::shared_ptr<OrderImpl>
service,然后注册服务,在内部其实是有一个map,key为服务名,value为具体的服务实例。然后构建一个TcpServer,然后start开启循环。
1 | void TcpServer::init() { |
构建TcpServer的时候,在init中会构建一个TcpAcceptor,这个就是拿来listen和accept的,成功会记录一对
TcpServer::onAccept()中就是用m_acceptor->accept拿到连接的fd,然后取出来一个IO线程对象,然后构建一个TcpConnection对象,每个对象都有一个eventloop,但是这个eventloop是IO线程的,如果是服务端,就还需要监听可读。然后可读的回调TcpConnection::onRead其实就是从socket读数据,然后解码成rpc请求分发到真正的服务那里执行。这里可能还有一个疑惑。fdevent没有绑定fd啊?实际上在初始化的时候,m_fd_event = FdEventGroup::GetFdEventGroup()->getFdEvent(fd);这里就事先建立了fd和m_fd_event的映射关系,其实就是事先创建好了128个fdevent,对应的fd就是0-127。
总结来说,服务端在TcpServer::init里面注册了新连接的回调,TcpServer::onAccept。在这个函数里面,每连接一个新客户端的时候,都会从4个IO线程中,选出来一个交给他去管理。并且给每个连接创建一个TcpConnection并且记录,然后监听可读事件。实际上在初始化TcpConnection的时候,因为是在主线程操作的,所以在监听可读事件的时候,会调用m_event_loop->addEpollEvent(m_fd_event);但是这里很明显是在主线程中,调了子线程的eventloop,所以构造一个task存起来,存在子线程对应的eventloop的m_pending_tasks中,然后唤醒这个子线程的eventloop,唤醒是为了防止这个子eventloop是在阻塞。再回过头来看eventloop::loop到底是什么死循环呢?
1 | void EventLoop::loop() { |
- swap一次性取出任务
- 依次执行任务(比如对应上面的task,也就是addevent)
- 然后就是执行epoll_wait,有一个超时时间。如果有返回值,那么就代表有可读或者可写事件,然后分别将相对应的回调addTask
值得注意的点
AsyncLogger::Loop在while循环的时候,进去就直接上锁,子while循环直接阻塞,等到新buffer来了,才会解除阻塞,然后继续拿回锁
主线程会保留所有io线程的编号和线程对象,也就间接保留了所有从eventloop的对象。因为每个从eventloop初始化时会记录当前子线程的id。所以主线程的accept回调触发的时候,最终是在主线程里面拿到某个子线程的eventloop,然后addEpollEvent。具体调用为TcpServer::onAccept—>实例化TcpConnection::s_ptr connetion—>TcpConnection::listenRead—>m_event_loop->addEpollEvent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42TcpConnection::TcpConnection(EventLoop* event_loop, int fd, int buffer_size, NetAddr::s_ptr peer_addr, NetAddr::s_ptr local_addr, TcpConnectionType type /*= TcpConnectionByServer*/)
: m_event_loop(event_loop), m_local_addr(local_addr), m_peer_addr(peer_addr), m_state(NotConnected), m_fd(fd), m_connection_type(type) {
m_in_buffer = std::make_shared<TcpBuffer>(buffer_size);
m_out_buffer = std::make_shared<TcpBuffer>(buffer_size);
m_fd_event = FdEventGroup::GetFdEventGroup()->getFdEvent(fd);
m_fd_event->setNonBlock();
m_coder = new TinyPBCoder();
if (m_connection_type == TcpConnectionByServer) {
listenRead();
}
}
void TcpConnection::listenRead() {
m_fd_event->listen(FdEvent::IN_EVENT, std::bind(&TcpConnection::onRead, this));
m_event_loop->addEpollEvent(m_fd_event);
}
bool EventLoop::isInLoopThread() {
return getThreadId() == m_thread_id;
}
// 如果是当前线程调用自己的eventloop,那么直接添加到 epoll 里面
// 如果不是当前线程调用自己的eventloop,那么添加到任务队列里面,等到自己线程执行完后再添加到 epoll 里面
void EventLoop::addEpollEvent(FdEvent* event) {
if (isInLoopThread()) {
ADD_TO_EPOLL();
} else {
auto cb = [this, event]() {
ADD_TO_EPOLL();
};
addTask(cb, true);
}
}所以现在当前线程的id,和eventloop绑定的id,不是同一个,那么就走到了EventLoop::addTask。
1
2
3
4
5
6
7
8
9void EventLoop::addTask(std::function<void()> cb, bool is_wake_up /*=false*/) {
ScopeMutex<Mutex> lock(m_mutex);
m_pending_tasks.push(cb);
lock.unlock();
if (is_wake_up) {
wakeup();
}
}
那为什么要加锁?首先这里的锁,是每个eventloop独有的。各个子线程之间貌似互不干扰啊?但是从上面的分析可以看到,这里是主线程调到了子线程的eventloop(因为事先记录了!)所以这里的eventloop实际上就是一个临界资源(共享数据)呀!刚好这个临界资源(共享数据)又有一把锁,这不就类似于主线程子线程共用一个全局变量锁吗!所以在每个eventloop里面操作的时候,都需要加锁上锁呀!
那再看看addTask到底做了啥呢?就是把任务放到这个eventloop的队列里,然后再调用wakeup!那wakeup到底做了啥?在每个eventloop初始化的时候,都会调用initWakeUpFdEevent,在这里会创建一个eventfd。然后一直监听!wakeup本质上就是触发一次可读事件,然后解除epoll的阻塞。这里wakeup的意义就是怕这个从eventloop一直阻塞住了,我现在要让他去监听新的客户端fd,所以通过这个eventfd去解除他的阻塞!值得注意的是,如果这个eventloop之前就监听了一些客户端fd,刚好这个时候有客户端fd可读了,也解除了epoll的阻塞,那么确实当前这个wakeup就没啥意义了!总之,我们手动wakeup一次,触发可读,保证解除epoll阻塞,肯定是有意义的,不管怎样,都能保证eventloop及时处理新来的任务!这里新来的任务就是监听新的客户端fd,其实也可以是其他任务,比如注销fd,某些定时任务,回调函数等等!但是在这份源码中好像是只有连接回调,也就是监听新的客户端fd。
在eventloop开头的时候,直接swap,可以高效取出所有的任务,减少上锁的时间,相当于利用c++的特性吧,然后后面再慢慢处理每个task
- 每个m_fd_event->listen的时候,都会在内部保存m_listen_events.data.ptr = this;这样每次epoll返回的时候,能够拿到这个FdEvent对象。
任务的最终执行其实都是在eventloop的开头,epoll返回的时候,会告诉你哪个fd可写可读,该fd会与一个FdEvent绑定,我们需要转换成FdEvent,然后根据fd是可读还是可写,找到事先定义好的回调,然后添加到任务列表m_pending_tasks中
req_msg和rsp_msg都是google::protobuf::Message*类型,proto文件中定义的rpc服务,request和response实际上都是继承自Message
service是google::protobuf::Service类型
- socket通信的时候,客户端连接触发可写,服务端连接触发可读。
客户端请求的时候,request对象赋值以后,先是声明req_protocol,然后序列化到req_protocol->m_pb_data中,然后再将这个req_protocol编码,也就是encode,其实就是把req_protocol转化成字节流数组,然后写入到m_out_buffer中,最后通过write发过去。服务端读取的时候,数据读到m_in_buffer中,然后再decode还原成TinyPBProtocol,也就是客户端发过来的req_protocol对象。然后反序列化req_protocol->m_pb_data到req_msg中,其实就是request对象。然后服务端处理完以后,构建一个response对象。然后在服务端的closure中,再把这个reponse序列化为rsp_protocol,最后调用connection->reply(replay_messages);内部同样是encode编码以后发出去。
socket传输的是字节流
- 历史原因:在C语言中,
char
类型被设计为最小的可寻址单元,通常与一个字节(8位)对应。因此,char*
常被用作处理原始内存的通用指针。 - 字节流的概念:字节流意味着数据被视为一系列连续的字节,而没有任何固有含义。发送数据时,我们并不关心这些字节代表什么(字符、整数、浮点数等),只关心它们作为字节序列的传输。
- 套接字的本质:套接字(socket)传输的是原始字节。应用程序负责赋予这些字节意义。无论是字符串、图像还是其他二进制数据,在套接字层面都被视为字节流。