2021腾讯云限时秒杀,爆款1核2G云服务器298元/3年!(领取2860元代金券),
地址:https://cloud.tencent.com/act/cps/redirect?redirect=1062
2021阿里云最低价产品入口+领取代金券(老用户3折起),
入口地址:https://www.aliyun.com/minisite/goods
随着UDT::Startup()的启动,GC线程也随之运行了。
GC主要关注的就是UDTSocket的释放,下面我们来看看这个GC线程是怎么实现的。
查看源代码 打印 帮助
1
#ifndef WIN32
2
void
* CUDTUnited::garbageCollect(
void
* p)
3
#else
4
DWORD
WINAPI CUDTUnited::garbageCollect(
LPVOID
p)
5
#endif
6
{
7
// 获得当前GC对应的Socket单位
8
CUDTUnited* self = (CUDTUnited*)p;
9
10
// 应用锁保护,这里的锁是CUDTUnited::m_GCStopLock
11
CGuard gcguard(self->m_GCStopLock);
12
13
// 如果Socket单位没有标记为关闭中,那么GC持续生效
14
while
(!self->m_bClosing)
15
{
16
self->checkBrokenSockets();
17
18
#ifdef WIN32
19
self->checkTLSValue();
20
#endif
21
22
// 这里处理超时解锁,windows下的WaitForSingleObject函数直接输入超时时间间隔
23
// 而非windows的需要提供绝对时间,所以需要先根据当前时间得到超时时间,然后作为参数传入pthread_cond_timedwait
24
// 实际上就是当计时器用,确保两次检查的时间间隔最多为1秒钟
25
#ifndef WIN32
26
timeval now;
27
timespec timeout;
28
gettimeofday(&now, 0);
29
timeout.tv_sec = now.tv_sec + 1;
30
timeout.tv_nsec = now.tv_usec * 1000;
31
32
pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
33
#else
34
WaitForSingleObject(self->m_GCStopCond, 1000);
35
#endif
36
}
37
38
// 到这里说明Socket单位处于正在关闭的状态
39
40
// remove all sockets and multiplexers
41
// 移除所有的Sockets和多路复用器
42
43
// 启动锁保护,这里不直接用CGuard是因为要多次用到这个锁,并且在下面的checkBrokenSockets中也要使用该锁,所以手动处理
44
CGuard::enterCS(self->m_ControlLock);
45
for
(map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)
46
{
47
// 标记作废,调用关闭,并标记为CLOSED,更新时间戳,将Socket对象加入已关闭列表
48
i->second->m_pUDT->m_bBroken =
true
;
49
i->second->m_pUDT->close();
50
i->second->m_Status = CLOSED;
51
i->second->m_TimeStamp = CTimer::getTime();
52
self->m_ClosedSockets[i->first] = i->second;
53
54
// remove from listener's queue
55
// 从监听列表中移除
56
map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);
57
if
(ls == self->m_Sockets.end())
58
{
59
// 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理
60
ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);
61
if
(ls == self->m_ClosedSockets.end())
62
continue
;
63
}
64
65
// 运行到这里说明找到了监听Socket
66
// 从监听Socket的待Accept及已Accept列表中移除当前Socket
67
CGuard::enterCS(ls->second->m_AcceptLock);
68
ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
69
ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
70
CGuard::leaveCS(ls->second->m_AcceptLock);
71
}
72
// 已移除所有Socket项目,清空列表
73
self->m_Sockets.clear();
74
75
// 将已关闭列表中的所有项时间戳标记为0
76
for
(map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)
77
{
78
j->second->m_TimeStamp = 0;
79
}
80
81
// 手动解锁
82
CGuard::leaveCS(self->m_ControlLock);
83
84
while
(
true
)
85
{
86
// 检查作废Socket,直到已关闭Socket列表为空
87
self->checkBrokenSockets();
88
89
CGuard::enterCS(self->m_ControlLock);
90
bool
empty = self->m_ClosedSockets.empty();
91
CGuard::leaveCS(self->m_ControlLock);
92
93
if
(empty)
94
break
;
95
96
CTimer::sleep();
97
}
98
99
#ifndef WIN32
100
return
NULL;
101
#else
102
return
0;
103
#endif
104
}
1
void
CUDTUnited::checkBrokenSockets()
2
{
3
CGuard cg(m_ControlLock);
4
5
// set of sockets To Be Closed and To Be Removed
6
// tbc要关闭的Socket列表
7
// tbr要移除的Socket列表
8
vector<UDTSOCKET> tbc;
9
vector<UDTSOCKET> tbr;
10
11
// 循环单元中所有Socket
12
for
(map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)
13
{
14
// check broken connection
15
// 检查状态是否为已作废
16
if
(i->second->m_pUDT->m_bBroken)
17
{
18
if
(i->second->m_Status == LISTENING)
19
{
20
// for a listening socket, it should wait an extra 3 seconds in case a client is connecting
21
// 如果该Socket是一个监听Socket,那么需要在作废后3秒钟再处理该Socket,因为可能有客户端正在连接
22
if
(CTimer::getTime() - i->second->m_TimeStamp < 3000000)
23
continue
;
24
}
25
else
if
((i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))
26
{
27
// if there is still data in the receiver buffer, wait longer
28
// 如果接收缓存中还有数据,并且检查计数大于0,则暂不处理
29
// 检查计数m_iBrokenCounter,在通常情况下为30,调用shutdown时为60,可以理解为30秒、60秒
30
continue
;
31
}
32
33
//close broken connections and start removal timer
34
// 关闭作废的连接,并移除计时器
35
// 标记状态为CLOSED,更新时间戳为当前,将Socket加入要关闭的Socket列表
36
i->second->m_Status = CLOSED;
37
i->second->m_TimeStamp = CTimer::getTime();
38
tbc.push_back(i->first);
39
m_ClosedSockets[i->first] = i->second;
40
41
// remove from listener's queue
42
// 从监听列表中移除
43
map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);
44
if
(ls == m_Sockets.end())
45
{
46
// 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理
47
ls = m_ClosedSockets.find(i->second->m_ListenSocket);
48
if
(ls == m_ClosedSockets.end())
49
continue
;
50
}
51
52
// 运行到这里说明找到了监听Socket
53
// 从监听Socket的待Accept及已Accept列表中移除当前Socket
54
CGuard::enterCS(ls->second->m_AcceptLock);
55
ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
56
ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
57
CGuard::leaveCS(ls->second->m_AcceptLock);
58
}
59
}
60
61
// 遍历已关闭Socket列表中的项
62
for
(map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)
63
{
64
// 有犹豫时间
65
if
(j->second->m_pUDT->m_ullLingerExpiration > 0)
66
{
67
// asynchronous close:
68
// 异步关闭
69
// 关闭条件为:发送列表已经为空,或者发送列表的当前缓存大小为0,或者超出犹豫时间
70
if
((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))
71
{
72
j->second->m_pUDT->m_ullLingerExpiration = 0;
73
j->second->m_pUDT->m_bClosing =
true
;
74
j->second->m_TimeStamp = CTimer::getTime();
75
}
76
}
77
78
// timeout 1 second to destroy a socket AND it has been removed from RcvUList
79
// Socket被标记时间戳后1秒钟,并且已经从接收列表节点中移除,那么把它放入可移除列表
80
if
((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))
81
{
82
tbr.push_back(j->first);
83
}
84
}
85
86
// move closed sockets to the ClosedSockets structure
87
// 将tbc可关闭Socket列表中的对象从当前Socket列表中移除,这里作者的注释有问题
88
// 实际上在上面m_ClosedSockets[i->first] = i->second;已经把这些Socket添加到ClosedSockets中了
89
for
(vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
90
m_Sockets.erase(*k);
91
92
// remove those timeout sockets
93
// 对tbr可移除Socket列表中的项执行removeSocket
94
for
(vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
95
removeSocket(*l);
96
}
1
void
CUDTUnited::removeSocket(
const
UDTSOCKET u)
2
{
3
// 可移除的Socket必须在m_ClosedSockets列表中存在
4
map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);
5
6
// invalid socket ID
7
// 如果不存在则不予移除
8
if
(i == m_ClosedSockets.end())
9
return
;
10
11
// decrease multiplexer reference count, and remove it if necessary