Coin163

首页 > UDT源码剖析(四):UDT的GC线程相关过程代码注释 - 博客频道 - CSDN.NET

UDT源码剖析(四):UDT的GC线程相关过程代码注释 - 博客频道 - CSDN.NET

相关标签: 线程 源码

2021腾讯云限时秒杀,爆款1核2G云服务器298元/3年!(领取2860元代金券),
地址https://cloud.tencent.com/act/cps/redirect?redirect=1062

2021阿里云最低价产品入口+领取代金券(老用户3折起),
入口地址https://www.aliyun.com/minisite/goods

随着UDT::Startup()的启动,GC线程也随之运行了。
GC主要关注的就是UDTSocket的释放,下面我们来看看这个GC线程是怎么实现的。

查看源代码 打印 帮助 1 #ifndef WIN32 2     void * CUDTUnited::garbageCollect( void * p) 3 #else 4     DWORD  WINAPI CUDTUnited::garbageCollect( LPVOID  p) 5 #endif 6 { 7     // 获得当前GC对应的Socket单位 8     CUDTUnited* self = (CUDTUnited*)p; 9   10     // 应用锁保护,这里的锁是CUDTUnited::m_GCStopLock 11     CGuard gcguard(self->m_GCStopLock); 12   13     // 如果Socket单位没有标记为关闭中,那么GC持续生效 14     while  (!self->m_bClosing) 15     { 16        self->checkBrokenSockets(); 17   18        #ifdef WIN32 19           self->checkTLSValue(); 20        #endif 21   22        // 这里处理超时解锁,windows下的WaitForSingleObject函数直接输入超时时间间隔 23        // 而非windows的需要提供绝对时间,所以需要先根据当前时间得到超时时间,然后作为参数传入pthread_cond_timedwait 24        // 实际上就是当计时器用,确保两次检查的时间间隔最多为1秒钟 25        #ifndef WIN32 26           timeval now; 27           timespec timeout; 28           gettimeofday(&now, 0); 29           timeout.tv_sec = now.tv_sec + 1; 30           timeout.tv_nsec = now.tv_usec * 1000; 31   32           pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout); 33        #else 34           WaitForSingleObject(self->m_GCStopCond, 1000); 35        #endif 36     } 37   38     // 到这里说明Socket单位处于正在关闭的状态 39   40     // remove all sockets and multiplexers 41     // 移除所有的Sockets和多路复用器 42   43     // 启动锁保护,这里不直接用CGuard是因为要多次用到这个锁,并且在下面的checkBrokenSockets中也要使用该锁,所以手动处理 44     CGuard::enterCS(self->m_ControlLock); 45     for  (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i) 46     { 47        // 标记作废,调用关闭,并标记为CLOSED,更新时间戳,将Socket对象加入已关闭列表 48        i->second->m_pUDT->m_bBroken =  true ; 49        i->second->m_pUDT->close(); 50        i->second->m_Status = CLOSED; 51        i->second->m_TimeStamp = CTimer::getTime(); 52        self->m_ClosedSockets[i->first] = i->second; 53   54        // remove from listener's queue 55        // 从监听列表中移除 56        map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket); 57        if  (ls == self->m_Sockets.end()) 58        { 59            // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理 60           ls = self->m_ClosedSockets.find(i->second->m_ListenSocket); 61           if  (ls == self->m_ClosedSockets.end()) 62              continue ; 63        } 64   65        // 运行到这里说明找到了监听Socket 66        // 从监听Socket的待Accept及已Accept列表中移除当前Socket 67        CGuard::enterCS(ls->second->m_AcceptLock); 68        ls->second->m_pQueuedSockets->erase(i->second->m_SocketID); 69        ls->second->m_pAcceptSockets->erase(i->second->m_SocketID); 70        CGuard::leaveCS(ls->second->m_AcceptLock); 71     } 72     // 已移除所有Socket项目,清空列表 73     self->m_Sockets.clear(); 74   75     // 将已关闭列表中的所有项时间戳标记为0 76     for  (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j) 77     { 78        j->second->m_TimeStamp = 0; 79     } 80   81     // 手动解锁 82     CGuard::leaveCS(self->m_ControlLock); 83   84     while  ( true ) 85     { 86        // 检查作废Socket,直到已关闭Socket列表为空 87        self->checkBrokenSockets(); 88   89        CGuard::enterCS(self->m_ControlLock); 90        bool  empty = self->m_ClosedSockets.empty(); 91        CGuard::leaveCS(self->m_ControlLock); 92   93        if  (empty) 94           break ; 95   96        CTimer::sleep(); 97     } 98   99     #ifndef WIN32 100        return  NULL; 101     #else 102        return  0; 103     #endif 104 } 1 void  CUDTUnited::checkBrokenSockets() 2 { 3     CGuard cg(m_ControlLock); 4   5     // set of sockets To Be Closed and To Be Removed 6     // tbc要关闭的Socket列表 7     // tbr要移除的Socket列表 8     vector<UDTSOCKET> tbc; 9     vector<UDTSOCKET> tbr; 10   11     // 循环单元中所有Socket 12     for  (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i) 13     { 14        // check broken connection 15        // 检查状态是否为已作废 16        if  (i->second->m_pUDT->m_bBroken) 17        { 18           if  (i->second->m_Status == LISTENING) 19           { 20              // for a listening socket, it should wait an extra 3 seconds in case a client is connecting 21              // 如果该Socket是一个监听Socket,那么需要在作废后3秒钟再处理该Socket,因为可能有客户端正在连接 22              if  (CTimer::getTime() - i->second->m_TimeStamp < 3000000) 23                 continue ; 24           } 25           else  if  ((i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0)) 26           { 27              // if there is still data in the receiver buffer, wait longer 28              // 如果接收缓存中还有数据,并且检查计数大于0,则暂不处理 29              // 检查计数m_iBrokenCounter,在通常情况下为30,调用shutdown时为60,可以理解为30秒、60秒 30              continue ; 31           } 32   33           //close broken connections and start removal timer 34           // 关闭作废的连接,并移除计时器 35           // 标记状态为CLOSED,更新时间戳为当前,将Socket加入要关闭的Socket列表 36           i->second->m_Status = CLOSED; 37           i->second->m_TimeStamp = CTimer::getTime(); 38           tbc.push_back(i->first); 39           m_ClosedSockets[i->first] = i->second; 40   41           // remove from listener's queue 42           // 从监听列表中移除 43           map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket); 44           if  (ls == m_Sockets.end()) 45           { 46               // 如果在Socket列表和已关闭列表中都未找到监听Socket,则不作处理 47              ls = m_ClosedSockets.find(i->second->m_ListenSocket); 48              if  (ls == m_ClosedSockets.end()) 49                 continue ; 50           } 51   52           // 运行到这里说明找到了监听Socket 53           // 从监听Socket的待Accept及已Accept列表中移除当前Socket 54           CGuard::enterCS(ls->second->m_AcceptLock); 55           ls->second->m_pQueuedSockets->erase(i->second->m_SocketID); 56           ls->second->m_pAcceptSockets->erase(i->second->m_SocketID); 57           CGuard::leaveCS(ls->second->m_AcceptLock); 58        } 59     } 60   61     // 遍历已关闭Socket列表中的项 62     for  (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j) 63     { 64        // 有犹豫时间 65        if  (j->second->m_pUDT->m_ullLingerExpiration > 0) 66        { 67           // asynchronous close: 68           // 异步关闭 69           // 关闭条件为:发送列表已经为空,或者发送列表的当前缓存大小为0,或者超出犹豫时间 70           if  ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime())) 71           { 72              j->second->m_pUDT->m_ullLingerExpiration = 0; 73              j->second->m_pUDT->m_bClosing =  true ; 74              j->second->m_TimeStamp = CTimer::getTime(); 75           } 76        } 77   78        // timeout 1 second to destroy a socket AND it has been removed from RcvUList 79        // Socket被标记时间戳后1秒钟,并且已经从接收列表节点中移除,那么把它放入可移除列表 80        if  ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList)) 81        { 82           tbr.push_back(j->first); 83        } 84     } 85   86     // move closed sockets to the ClosedSockets structure 87     // 将tbc可关闭Socket列表中的对象从当前Socket列表中移除,这里作者的注释有问题 88     // 实际上在上面m_ClosedSockets[i->first] = i->second;已经把这些Socket添加到ClosedSockets中了 89     for  (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k) 90        m_Sockets.erase(*k); 91   92     // remove those timeout sockets 93     // 对tbr可移除Socket列表中的项执行removeSocket 94     for  (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l) 95        removeSocket(*l); 96 } 1 void  CUDTUnited::removeSocket( const  UDTSOCKET u) 2 { 3     // 可移除的Socket必须在m_ClosedSockets列表中存在 4     map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u); 5   6     // invalid socket ID 7     // 如果不存在则不予移除 8     if  (i == m_ClosedSockets.end()) 9        return ; 10   11     // decrease multiplexer reference count, and remove it if necessary

原文

随着UDT::Startup()的启动,GC线程也随之运行了。 GC主要关注的就是UDTSocket的释放,下面我们来看看这个GC线程是怎么实现的。 查看源代码 打印 帮助 1 #ifndef WIN32 2     void * CUDTUnited:

------分隔线----------------------------