关于回调

为了弄清楚客户端和服务端的回调是如何工作的,写了一个简单的例子。

共 6 个文件:client.cpp、common.hpp、c_test.cpp、Makefile、server.cpp、s_test.cpp。

server.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include "common.hpp"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <vector>
#include <unordered_map>

class AsyncServer
{
public:
AsyncServer(int port) : port_(port), listen_fd_(-1) {}

void SetConnectCallback(ConnectCallback cb) { conn_cb_ = cb; }
void SetMessageCallback(MessageCallback cb) { msg_cb_ = cb; }

void Start()
{
// 创建监听socket
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in addr{AF_INET, htons(port_), INADDR_ANY};

bind(listen_fd_, (sockaddr *)&addr, sizeof(addr));
listen(listen_fd_, 128);

// 事件循环
while (true)
{
sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
int client_fd = accept(listen_fd_, (sockaddr *)&client_addr, &len);
std::cout << "accept client = " << client_fd << std::endl;
if (client_fd > 0)
{
// 触发连接回调
if (conn_cb_)
conn_cb_(client_fd);

// 注册消息回调
contexts_[client_fd] = {client_fd, nullptr,
[this](int fd, const std::string &msg)
{
if (msg_cb_)
msg_cb_(fd, msg);
}};

// 启动接收线程
std::thread([this, client_fd]()
{
char buf[1024];
while(true) {
ssize_t len = recv(client_fd, buf, sizeof(buf), 0);
if(len > 0) {
contexts_[client_fd].on_message(client_fd, std::string(buf, len));
send(client_fd, buf, len, 0);
}
else break;
}
close(client_fd); })
.detach();
}
}
}

private:
int port_, listen_fd_;
std::unordered_map<int, EventContext> contexts_;
ConnectCallback conn_cb_;
MessageCallback msg_cb_;
};

client.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include "common.hpp"
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>

/// Minimal TCP client that connects on a background thread and reports events
/// through callbacks, while the calling thread forwards stdin lines to the server.
class AsyncClient
{
public:
    /// @param ip   Server IPv4 address in dotted-decimal form.
    /// @param port Server TCP port, in host byte order (0..65535).
    AsyncClient(const std::string &ip, int port)
        : server_ip_(ip), server_port_(port), sock_fd_(-1) {}

    /// Registers the callback run on the receive thread once connect() succeeds.
    void SetConnectCallback(ConnectCallback cb) { conn_cb_ = std::move(cb); }
    /// Registers the callback run on the receive thread for every chunk read.
    void SetMessageCallback(MessageCallback cb) { msg_cb_ = std::move(cb); }

    /// Connects in the background and runs the interactive send loop.
    ///
    /// A worker thread performs connect(), fires the connect callback and then
    /// keeps reading from the socket, firing the message callback per chunk.
    /// Meanwhile this thread reads lines from std::cin and sends each one.
    /// When std::cin reaches end-of-file (or fails) the socket is shut down,
    /// the worker is joined and the socket is closed before returning.
    void Connect()
    {
        if (server_port_ < 0 || server_port_ > 65535)
        {
            std::cerr << "[AsyncClient] invalid port: " << server_port_ << std::endl;
            return;
        }

        // Assign fields by name: the member order of sockaddr_in differs between
        // platforms, so positional aggregate initialisation is not portable.
        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(static_cast<in_port_t>(server_port_));
        // inet_pton reports malformed input explicitly, unlike inet_addr whose
        // error value is indistinguishable from 255.255.255.255.
        if (inet_pton(AF_INET, server_ip_.c_str(), &addr.sin_addr) != 1)
        {
            std::cerr << "[AsyncClient] invalid IPv4 address: " << server_ip_ << std::endl;
            return;
        }

        sock_fd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sock_fd_ < 0)
        {
            ReportError("socket");
            return;
        }

        // Asynchronous connect followed by the receive loop.
        std::thread worker([this, addr]()
        {
            if (connect(sock_fd_, reinterpret_cast<const sockaddr *>(&addr), sizeof(addr)) != 0)
            {
                ReportError("connect");
                return;
            }
            std::cout << "connect success" << std::endl;
            if (conn_cb_)
                conn_cb_(sock_fd_);

            char buf[1024];
            while (true)
            {
                const ssize_t received = recv(sock_fd_, buf, sizeof(buf), 0);
                if (received < 0 && errno == EINTR)
                    continue;
                if (received <= 0)
                    break; // peer closed, socket shut down, or a real error
                // Keep draining the socket even when no message callback is set.
                if (msg_cb_)
                    msg_cb_(sock_fd_, std::string(buf, static_cast<std::size_t>(received)));
            }
        });

        // Interactive send loop; stops at end-of-file instead of spinning on it.
        std::string msg;
        while (true)
        {
            std::cout << "sock_fd_ = " << sock_fd_ << std::endl;
            std::cout << ">> ";
            if (!std::getline(std::cin, msg))
                break;
            Send(msg);
        }

        // Wake the worker out of connect()/recv(), wait for it, then release the
        // socket. Joining (rather than detaching) guarantees the worker no longer
        // touches this object once Connect() has returned.
        shutdown(sock_fd_, SHUT_RDWR);
        worker.join();
        close(sock_fd_);
        sock_fd_ = -1;
    }

    /// Sends the whole message, retrying on short writes and EINTR.
    /// Errors (for example sending before the connection is established) are
    /// reported on std::cerr and the remainder of the message is dropped.
    void Send(const std::string &msg)
    {
        std::size_t sent = 0;
        while (sent < msg.size())
        {
            const ssize_t n = send(sock_fd_, msg.data() + sent, msg.size() - sent, kSendFlags);
            if (n < 0)
            {
                if (errno == EINTR)
                    continue;
                ReportError("send");
                return;
            }
            sent += static_cast<std::size_t>(n);
        }
    }

private:
    // Suppress SIGPIPE on send() where the platform supports it, so that writing
    // to a socket that is not (or no longer) connected cannot kill the process.
#ifdef MSG_NOSIGNAL
    static constexpr int kSendFlags = MSG_NOSIGNAL;
#else
    static constexpr int kSendFlags = 0;
#endif

    /// Writes "<what> failed: <strerror(errno)>" to std::cerr.
    static void ReportError(const char *what)
    {
        const int err = errno; // capture before stream I/O can overwrite it
        std::cerr << "[AsyncClient] " << what << " failed: " << std::strerror(err) << std::endl;
    }

    std::string server_ip_;
    int server_port_, sock_fd_;
    ConnectCallback conn_cb_;
    MessageCallback msg_cb_;
};

common.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#pragma once
#include <cerrno>
#include <cstring>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <utility>

// Callback types shared by the client and the server.
// The int argument is the socket file descriptor the event refers to.
using ConnectCallback = std::function<void(int)>;
using MessageCallback = std::function<void(int, const std::string &)>;
using CloseCallback = std::function<void(int)>;

/// Per-connection bundle of a socket descriptor and its event handlers.
struct EventContext
{
    /// Socket descriptor; -1 means "no socket", so a default-constructed
    /// context never carries an indeterminate value.
    int fd = -1;
    /// Handler for the connect event; may be empty.
    ConnectCallback on_connect;
    /// Handler for received data; may be empty.
    MessageCallback on_message;
};

c_test.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include "client.cpp"

/// Creates a client for the local test server on port 8080, installs callbacks
/// that log to stdout, and hands control to AsyncClient::Connect().
void RunClient()
{
    // Target the loopback address. 0.0.0.0 is the wildcard address intended for
    // bind(); using it as a connect() destination only works on some platforms.
    AsyncClient client("127.0.0.1", 8080);

    // Greet the server as soon as the connection is established.
    client.SetConnectCallback([&client](int /*fd*/)
    {
        std::cout << "[Client] Connected!" << std::endl;
        client.Send("Hello Server!");
    });

    // Print everything the server sends back.
    client.SetMessageCallback([](int /*fd*/, const std::string &msg)
    { std::cout << "[Client] Received: " << msg << std::endl; });

    client.Connect();
}

// Entry point of the client test program: runs the client and exits with status 0.
int main()
{
    RunClient();
}

s_test.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "server.cpp"

/// Creates a server on port 8080, installs callbacks that log to stdout,
/// and hands control to AsyncServer::Start().
void RunServer()
{
    // Logs the descriptor of each newly accepted client.
    const auto log_new_client = [](int fd)
    {
        std::cout << "[Server] New client: " << fd << std::endl;
    };
    // Logs every chunk of data together with the descriptor it came from.
    const auto log_message = [](int fd, const std::string &msg)
    {
        std::cout << "[Server] Received from " << fd << ": " << msg << std::endl;
    };

    AsyncServer server(8080);
    server.SetConnectCallback(log_new_client);
    server.SetMessageCallback(log_message);
    server.Start();
}

// Entry point of the server test program: runs the server and exits with status 0.
int main()
{
    RunServer();
}

Makefile

1
2
3
4
5
6
7
8
9
10
11
12
13
# Builds the callback demo: s_test (server) and c_test (client).
CXX := g++
# -pthread enables thread support for both compiling and linking and, unlike
# -lpthread, does not depend on where it appears relative to the source files.
CXXFLAGS := -std=c++17 -pthread

.PHONY: all clean

all: s_test c_test

# The test drivers #include the .cpp implementations directly, so those files
# and the shared header are listed as prerequisites to trigger rebuilds.
s_test: s_test.cpp server.cpp common.hpp
	$(CXX) $(CXXFLAGS) s_test.cpp -o s_test

c_test: c_test.cpp client.cpp common.hpp
	$(CXX) $(CXXFLAGS) c_test.cpp -o c_test

# Remove only what this Makefile produces.
clean:
	rm -f *.o s_test c_test

一些注意的点

使用 `netstat -tulnp | grep 端口号` 排查端口是否被占用;如果已被占用,需要先关闭占用它的进程。该命令会列出本机所有处于监听状态的 TCP/UDP 端口(IPv4 和 IPv6 都包括),输出中含有本地地址(Local Address)、外部地址(Foreign Address)以及对应的进程。

代码比较简单,以客户端为例,其实就是注册一个连接的回调 A,再注册一个收到消息的回调 B。关键在于回调的调用时机:回调 A 在 connect 成功之后被调用,回调 B 在每次 recv 收到数据之后被调用。