CSocket

CSocket

Csocket是Carlsdk中的socket库,使用这个库即可轻松的完成socket网络编程。

创建套接字

根据需要的套接字类型,创建套接字,此处我打开了端口复用和地址复用,以便服务出现重启后可以立马重新建立连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bool CSocket::create(int socket_type)
{
int reuse = 1;

sa_family_t sa_family = AF_INET;

if (socket_type == C_SOCKET_LOCAL)
{
sa_family = AF_UNIX;
}

/*Creak socket*/
if ((mSock = socket(sa_family, SOCK_STREAM, 0)) == -1)
{
return false;
}

/*Enable address reuse and port reuse*/
if ((setsockopt(mSock, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) == -1 ||
setsockopt(mSock, SOL_SOCKET, SO_REUSEPORT, (char *)&reuse, sizeof(reuse)) == -1))
{
return false;
}
else
{
mBlocking = true;
mStatus = C_SOCKET_INIT;
return true;
}
}

绑定(服务端)

由于创建的套接字有两种类型,所以绑定也有两种,使用C++的重载,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*socket_type is C_SOCKET_NETWORK*/
bool CSocket::bind(char *local_ip, int local_port)
{
sockaddr_in local_addr;

if (mSock == -1)
{
return false;
}

local_addr.sin_family = AF_INET;
if (local_ip == nullptr)
{
local_addr.sin_addr.s_addr = INADDR_ANY;
}
else
{
/*IP address translation*/
inet_pton(AF_INET, local_ip, &local_addr.sin_addr);
}
local_addr.sin_port = htons(local_port);

if (::bind(mSock, (struct sockaddr *)&local_addr, sizeof(local_addr)) == -1)
{
return false;
}
else
{
return true;
}
}

/*socket_type is C_SOCKET_LOCAL*/
bool CSocket::bind(char *socket_name)
{
struct sockaddr_un local_addr;
if (mSock == -1)
{
return false;
}
if (socket_name == nullptr)
{
return false;
}
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sun_family = AF_UNIX;
/*Set the socket file path*/
snprintf(local_addr.sun_path, sizeof(local_addr.sun_path), "%s/%s", C_SOCKET_DIR, socket_name);
unlink(local_addr.sun_path);

if (::bind(mSock, (struct sockaddr *)&local_addr, sizeof(local_addr)) == -1)
{
return false;
}
else
{
return true;
}
}

监听(服务端)

listen 的作用是初始化连接队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool CSocket::listen()
{
if (mSock == -1)
{
return false;
}

/*The maximum number of connections is 15*/
if (::listen(mSock, 15) == -1)
{
return false;
}
else
{
mStatus = C_SOCKET_LISTEN;
return true;
}
}

设置阻塞模式

设置套接字是否阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/*Set blocking mode*/
bool CSocket::setBlocking(bool is_blocking)
{
int soc_option;
if (mSock == -1)
{
return false;
}

if ((soc_option = fcntl(mSock, F_GETFL, 0)) < 0)
{
return false;
}

if (is_blocking)
{
soc_option &= ~O_NONBLOCK;
mBlocking = true;
}
else
{
soc_option |= O_NONBLOCK;
mBlocking = false;
}

if (fcntl(mSock, F_SETFL, soc_option) < 0)
{
return false;
}
else
{
return true;
}
}

心跳检测

TCP内嵌有心跳包,以服务端为例,当server检测到超过一定时间(/proc/sys/net/ipv4/tcp_keepalive_time 7200 即2小时)没有数据传输,那么会向client端发送一个keepalive packet,此时client端有三种反应:

  1. client端连接正常,返回一个ACK.server端收到ACK后重置计时器,在2小时后在发送探测.如果2小时内连接上有数据传输,那么在该时间的基础上向后推延2小时发送探测包;
  2. 客户端异常关闭,或网络断开。client无响应,server收不到ACK,在一定时间(/proc/sys/net/ipv4/tcp_keepalive_intvl 75 即75秒)后重发keepalive packet, 并且重发一定次数(/proc/sys/net/ipv4/tcp_keepalive_probes 9 即9次);
  3. 客户端曾经崩溃,但已经重启.server收到的探测响应是一个复位,server端终止连接。

如果我们不能接受如此之长的等待时间,从TCP-Keepalive-HOWTO上可以知道一共有两种方式可以设置,一种是修改内核关于网络方面的 配置参数,另外一种就是SOL_TCP字段的TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT三个选项。

  • TCP_KEEPIDLE: 开始首次KeepAlive探测前的TCP空闭时间
  • TCP_KEEPINTVL:两次KeepAlive探测间的时间间隔
  • TCP_KEEPCNT: 判定断开前的KeepAlive探测次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int CSocket::keepAlive(int on_off, int keep_idle, int keep_interval, int keep_count)
{
if (mSock == -1)
{
return -1;
}
/*SO_KEEPALIVE: Whether to enable heartbeat detection*/
/*TCP_KEEPIDLE: The tcp_keepidle parameter specifies the interval of inactivity that causes TCP to generate a KEEPALIVE transmission
for an application that requests them. tcp_keepidle defaults to 14400 (two hours).*/
/*TCP_KEEPINTVL:The tcp_keepintvl parameter specifies the interval between the nine retries that are attempted if a KEEPALIVE
transmission is not acknowledged. tcp_keepintvl defaults to 150 (75 seconds).*/
/*TCP_KEEPCNT: The tcp_keepcnt option specifies the maximum number of keepalive probes to be sent. The value of TCP_KEEPCNT is an integer
value between 1 and n, where n is the value of the systemwide tcp_keepcnt parameter.*/
if (setsockopt(mSock, SOL_SOCKET, SO_KEEPALIVE, (void *)&on_off, sizeof(on_off)) == -1 ||
setsockopt(mSock, SOL_TCP, TCP_KEEPIDLE, (void *)&keep_idle, sizeof(keep_idle)) == -1 ||
setsockopt(mSock, SOL_TCP, TCP_KEEPINTVL, (void *)&keep_interval, sizeof(keep_interval)) == -1 ||
setsockopt(mSock, SOL_TCP, TCP_KEEPCNT, (void *)&keep_count, sizeof(keep_count)) == -1)
{
return -1;
}

return 0;
}

等待连接(服务端)

创建等待队列之后,等待并接收客户端的连接请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int CSocket::accept()
{
sockaddr_in server_addr;
int addr_length = 0;
int sock_fd = -1;

if (mSock == -1)
{
return -1;
}

addr_length = sizeof(server_addr);

while (1)
{
if ((sock_fd = ::accept(mSock, (struct sockaddr *)&server_addr, (socklen_t *)&addr_length)) == -1)
{
/*Interrupted, try again*/
if (errno == EINTR)
{
continue;
}
}
return sock_fd;
}
}

发起连接(客户端)

客户端向服务器发起连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
bool CSocket::connect(char *server_ip, int server_port)
{
int ret = 0;
sockaddr_in server_addr;

if (mSock == -1)
{
return false;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
inet_pton(AF_INET, server_ip, &server_addr.sin_addr);
server_addr.sin_port = htons(server_port);

ret = ::connect(mSock, (struct sockaddr *)&server_addr, sizeof(server_addr));

if (ret == -1)
{
/*Connection in progress*/
if (errno == EINPROGRESS)
{
bool ready = false;
/* wait for 100 * write_ready timeout */
for (int count = 100; count > 0; count--)
{
/*Whether the socket can be written. If it can be written, the connection is completed*/
if (isWriteReady())
{
ready = true;
break;
}
}
if (ready == false)
{
return false;
}
}
else
{
return false;
}
}

mStatus = C_SOCKET_CONNECTED;
return true;
}

bool CSocket::connect(char *socket_name)
{
struct sockaddr_un local_addr;
if (mSock == -1)
{
return false;
}
if (socket_name == nullptr)
{
return false;
}
memset(&local_addr, 0, sizeof(local_addr));
local_addr.sun_family = AF_UNIX;
snprintf(local_addr.sun_path, sizeof(local_addr.sun_path), "%s/%s", C_SOCKET_DIR, socket_name);

if (::connect(mSock, (struct sockaddr *)&local_addr, sizeof(local_addr)) == -1)
{
return false;
}
else
{
mStatus = C_SOCKET_CONNECTED;
return true;
}
}

发送和接受

发送和接收主要多了错误判断,如果因为中断而失败就等待1S后重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
int CSocket::send(const char *data, int data_length)
{
int send_length = 0;
if (mSock == -1)
{
return -1;
}

do
{
send_length = ::send(mSock, data, data_length, 0);
printf("::send[%d], ret:%d, errno:%d\n", mSock, send_length, errno);
/*Determine the cause of the error*/
if (send_length == -1)
{
switch (errno)
{
case EAGAIN:
#if (EAGAIN != EWOULDBLOCK)
case EWOULDBLOCK:
#endif
break;
/*Interrupted, try again*/
case EINTR:
usleep(1 * 1000);
continue;

default:
break;
}
}
return send_length;
} while (1);
}

int CSocket::recv(char *data, int data_length)
{
int recv_length = 0;
if (mSock == -1)
{
return -1;
}

do
{
recv_length = ::recv(mSock, data, data_length, 0);
printf("::recv[%d], ret:%d, errno:%d.\n", mSock, recv_length, errno);
/*Determine the cause of the error*/
if (recv_length == -1)
{
switch (errno)
{
case EAGAIN:
#if (EAGAIN != EWOULDBLOCK)
case EWOULDBLOCK:
#endif
break;
/*Interrupted, try again*/
case EINTR:
usleep(1 * 1000);
continue;

default:
break;
}
}
return recv_length;
} while (1);
}

关闭套接字

Linux下tcp连接断开的时候调用close()函数,有优雅断开和强制断开两种方式。

那么如何设置断开连接的方式呢?是通过设置socket描述符一个linger结构体属性。

linger结构体数据结构如下:

1
2
3
4
5
6
7
#include <arpa/inet.h>

struct linger
{
  int l_onoff;
  int l_linger;
};

三种断开方式:

  1. l_onoff = 0; l_linger忽略

    close()立刻返回,底层会将未发送完的数据发送完成后再释放资源,即优雅退出。

  2. l_onoff != 0; l_linger = 0;

    close()立刻返回,但不会发送未发送完成的数据,而是通过一个REST包强制的关闭socket描述符,即强制退出。

  3. l_onoff != 0; l_linger > 0;

    close()不会立刻返回,内核会延迟一段时间,这个时间就由l_linger的值来决定。如果超时时间到达之前,发送完未发送的数据(包括FIN包)并得到另一端的确认,close()会返回正确,socket描述符优雅性退出。否则,close()会直接返回错误值,未发送数据丢失,socket描述符被强制性退出。需要注意的时,如果socket描述符被设置为非堵塞型,则close()会直接返回值。

使用方式如下:

1
2
struct linger ling = {0, 0};
setsockopt(socketfd, SOL_SOCKET, SO_LINGER, (void*)&ling, sizeof(ling));

具体实现入下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void CSocket::close()
{
struct linger linger;
linger.l_onoff = 1;
linger.l_linger = 1;

if (mSock != -1)
{
/*If there is unsent data, wait for one second*/
setsockopt(mSock, SOL_SOCKET, SO_LINGER, (char *)&linger, sizeof(linger));
::close(mSock);
mSock = -1;
}
mStatus = C_SOCKET_CLOSED;
}

/*SHUT_RDWR close read and write func*/
void CSocket::shutdown()
{
if (mSock != -1)
{
/*Shutdown connection*/
::shutdown(mSock, SHUT_RDWR);
}
mStatus = C_SOCKET_CLOSED;
}

是否可读或可写

此接口是用来判断因为超时在重试中的连接是否连接成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
bool CSocket::isWriteReady()
{
bool write_ready = false;
int ret;
int count = 10;
fd_set wfds;
struct timeval tv;

if (mBlocking)
{
return true;
}

do
{
/*set ms_sock to write fds*/
FD_ZERO(&wfds);
FD_SET(mSock, &wfds);
/*set timeout timer*/
tv.tv_sec = 0;
tv.tv_usec = 5 * 1000;

ret = select(mSock + 1, NULL, &wfds, NULL, &tv);

if (ret > 0)
{
/*ready for write*/
if (FD_ISSET(mSock, &wfds))
{
write_ready = true;
break;
}
}
else if (ret == 0)
{
/*select is timeout*/
write_ready = false;
count--;
}
else
{
/*Interrupted, try again*/
if (errno != EINTR)
{
write_ready = false;
break;
}
}

} while (count > 0);

return write_ready;
}

bool CSocket::isReadReady()
{
bool read_ready = false;
int ret;
int count = 10;
fd_set rfds;
struct timeval tv;

if (mBlocking)
{
return true;
}

do
{
/*set ms_sock to read fds*/
FD_ZERO(&rfds);
FD_SET(mSock, &rfds);
/*set timeout timer*/
tv.tv_sec = 0;
tv.tv_usec = 5 * 1000;

ret = select(mSock + 1, &rfds, NULL, NULL, &tv);

if (ret > 0)
{
/*ready for read*/
if (FD_ISSET(mSock, &rfds))
{
read_ready = true;
break;
}
}
else if (ret == 0)
{
/*select is timeout*/
count--;
read_ready = false;
}
else if (ret < 0)
{
/*Interrupted, try again*/
if (errno != EINTR)
{
read_ready = false;
break;
}
}
} while (count > 0);
return read_ready;
}

CSocket
https://carl-5535.github.io/2022/10/23/CarlSDK/CSocket/
作者
Carl Chen
发布于
2022年10月23日
许可协议