Reply-To: trafficserver-dev@incubator.apache.org
Subject: svn commit: r889011 [1/2] - in /incubator/trafficserver/traffic/trunk: iocore/net/ proxy/
Date: Wed, 09 Dec 2009 23:01:30 -0000
To: trafficserver-commits@incubator.apache.org
From: jplevyak@apache.org
Message-Id: <20091209230131.55CD523888FD@eris.apache.org>

Author: jplevyak
Date: Wed Dec 9 23:01:29 2009
New Revision: 889011

URL: http://svn.apache.org/viewvc?rev=889011&view=rev
Log:
TS-54: part 1
Convert iocore/net over to using the new List with the offset to the link
specified as part of the collection template. Also, use atomic lists instead of
just taking a lock to handle enables from other threads and integrate the epoll
data. Finally, unify the style of member variables in the iocore/net directory.
This will be replaced by our global style at a later date.

Modified:
    incubator/trafficserver/traffic/trunk/iocore/net/I_NetVConnection.h
    incubator/trafficserver/traffic/trunk/iocore/net/I_UDPPacket.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_LibBulkIO.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_Net.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_NetAccept.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UDPConnection.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UDPNet.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UDPPacket.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetProcessor.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetState.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetVConnection.h
    incubator/trafficserver/traffic/trunk/iocore/net/P_UnixUDPConnection.h
    incubator/trafficserver/traffic/trunk/iocore/net/SSLNetVConnection.cc
    incubator/trafficserver/traffic/trunk/iocore/net/SSLUnixNet.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixNet.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixNetAccept.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixNetPages.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixNetProcessor.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixNetVConnection.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPConnection.cc
    incubator/trafficserver/traffic/trunk/iocore/net/UnixUDPNet.cc
    incubator/trafficserver/traffic/trunk/proxy/CoreUtils.cc
    incubator/trafficserver/traffic/trunk/proxy/InkIOCoreAPI.cc
    incubator/trafficserver/traffic/trunk/proxy/Main.cc

Modified: incubator/trafficserver/traffic/trunk/iocore/net/I_NetVConnection.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/I_NetVConnection.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/I_NetVConnection.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/I_NetVConnection.h Wed Dec 9 23:01:29 2009 @@ -32,7 +32,7 @@ #include "I_IOBuffer.h" #include "I_Socks.h" -#define WITH_DETTAILED_VCONNECTION_LOGGING 1 +// #define WITH_DETTAILED_VCONNECTION_LOGGING 1 #if WITH_DETTAILED_VCONNECTION_LOGGING #include "DetailedLog.h" @@ -457,6 +457,12 @@ { return (logging != NULL); } +#else + void addLogMessage(const char *message) {} + bool loggingEnabled() const { return false; } + ink_hrtime getLogsTotalTime() const { return 0; } + void printLogs() const {} + void clearLogs() {} #endif private: Modified: incubator/trafficserver/traffic/trunk/iocore/net/I_UDPPacket.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/I_UDPPacket.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/I_UDPPacket.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/I_UDPPacket.h Wed Dec 9 23:01:29 2009 @@ -67,10 +67,10 @@ char *asBuf(int *len = NULL); virtual void UDPPacket_is_abstract() = 0; - struct sockaddr_in m_from; // what address came from - struct sockaddr_in m_to; // what address to send to + struct sockaddr_in from; // what address came from + struct sockaddr_in to; // what address to send to - int m_from_size; + int from_size; Link link; }; Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_LibBulkIO.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_LibBulkIO.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== 
--- incubator/trafficserver/traffic/trunk/iocore/net/P_LibBulkIO.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_LibBulkIO.h Wed Dec 9 23:01:29 2009 @@ -64,87 +64,87 @@ { InkBulkIOAggregator() { - m_metaReqCount = 0; - m_metablockInfo.ptr = NULL; - m_metablockInfo.id = 0xffffffff; - m_metablockReqPtr = NULL; - - m_lastReqFragCount = 0; - m_lastReq = NULL; - m_reqblockInfo.ptr = NULL; - m_reqblockInfo.id = 0xffffffff; - m_reqblockPktPtr = NULL; + metaReqCount = 0; + metablockInfo.ptr = NULL; + metablockInfo.id = 0xffffffff; + metablockReqPtr = NULL; + + lastReqFragCount = 0; + lastReq = NULL; + reqblockInfo.ptr = NULL; + reqblockInfo.id = 0xffffffff; + reqblockPktPtr = NULL; }; - struct InkBulkIOBlock m_metablockInfo; + struct InkBulkIOBlock metablockInfo; // Location where the next req. block id should be stuffed in the meta block. - uint32_t *m_metablockReqPtr; - uint32_t m_metaReqCount; - struct InkBulkIOBlock m_reqblockInfo; + uint32_t *metablockReqPtr; + uint32_t metaReqCount; + struct InkBulkIOBlock reqblockInfo; // Location where the next packet should be stuffed in the req. block - struct InkBulkIOPkt *m_reqblockPktPtr; + struct InkBulkIOPkt *reqblockPktPtr; // # of fragments in the last request. 
- uint32_t m_lastReqFragCount; - struct InkBulkIORequest *m_lastReq; + uint32_t lastReqFragCount; + struct InkBulkIORequest *lastReq; void ResetLastRequestInfo() { - m_lastReqFragCount = 0; - m_lastReq = NULL; - m_reqblockInfo.ptr = NULL; - m_reqblockInfo.id = 0xffffffff; - m_reqblockPktPtr = NULL; + lastReqFragCount = 0; + lastReq = NULL; + reqblockInfo.ptr = NULL; + reqblockInfo.id = 0xffffffff; + reqblockPktPtr = NULL; }; void ResetMetaBlockInfo() { - m_metaReqCount = 0; - m_metablockInfo.ptr = NULL; - m_metablockInfo.id = 0xffffffff; - m_metablockReqPtr = NULL; + metaReqCount = 0; + metablockInfo.ptr = NULL; + metablockInfo.id = 0xffffffff; + metablockReqPtr = NULL; }; bool AppendLastRequest() { - if (m_metaReqCount >= INKBIO_MAX_REQS_PER_REQ_BLOCK) + if (metaReqCount >= INKBIO_MAX_REQS_PER_REQ_BLOCK) return false; - memcpy(m_metablockReqPtr, &(m_reqblockInfo.id), sizeof(uint32_t)); - m_metablockReqPtr++; - m_metaReqCount++; + memcpy(metablockReqPtr, &(reqblockInfo.id), sizeof(uint32_t)); + metablockReqPtr++; + metaReqCount++; return true; }; void TerminateMetaBlock() { - *m_metablockReqPtr = 0xffffffff; + *metablockReqPtr = 0xffffffff; }; void TerminateLastRequest() { - m_reqblockPktPtr->blockID = 0xffffffff; - m_reqblockPktPtr->pktsize = 0xffff; - m_reqblockPktPtr->inChain = 0; - m_reqblockPktPtr->reserved = 0; + reqblockPktPtr->blockID = 0xffffffff; + reqblockPktPtr->pktsize = 0xffff; + reqblockPktPtr->inChain = 0; + reqblockPktPtr->reserved = 0; }; void InitMetaBlock() { - m_metablockReqPtr = (uint32_t *) m_metablockInfo.ptr; - m_metaReqCount = 0; + metablockReqPtr = (uint32_t *) metablockInfo.ptr; + metaReqCount = 0; }; void InitSendtoReqBlock() { - m_reqblockPktPtr = (struct InkBulkIOPkt *) - ((caddr_t) m_reqblockInfo.ptr + sizeof(InkBulkIORequest)); - m_lastReq = (struct InkBulkIORequest *) m_reqblockInfo.ptr; - m_lastReq->reqType = INKBIO_SENDTO_REQUEST; - m_lastReq->request.sendto.pktCount = 0; - m_lastReqFragCount = 0; + reqblockPktPtr = (struct 
InkBulkIOPkt *) + ((caddr_t) reqblockInfo.ptr + sizeof(InkBulkIORequest)); + lastReq = (struct InkBulkIORequest *) reqblockInfo.ptr; + lastReq->reqType = INKBIO_SENDTO_REQUEST; + lastReq->request.sendto.pktCount = 0; + lastReqFragCount = 0; }; void InitSplitReqBlock() { - m_reqblockPktPtr = (struct InkBulkIOPkt *) - ((caddr_t) m_reqblockInfo.ptr + sizeof(InkBulkIORequest)); - m_lastReq = (struct InkBulkIORequest *) m_reqblockInfo.ptr; - m_lastReq->reqType = INKBIO_SPLIT_REQUEST; - m_lastReq->request.split.recvCount = 0; - m_lastReq->request.split.perDestHeader = 0; - m_lastReqFragCount = 0; + reqblockPktPtr = (struct InkBulkIOPkt *) + ((caddr_t) reqblockInfo.ptr + sizeof(InkBulkIORequest)); + lastReq = (struct InkBulkIORequest *) reqblockInfo.ptr; + lastReq->reqType = INKBIO_SPLIT_REQUEST; + lastReq->request.split.recvCount = 0; + lastReq->request.split.perDestHeader = 0; + lastReqFragCount = 0; }; }; Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_Net.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_Net.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_Net.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_Net.h Wed Dec 9 23:01:29 2009 @@ -116,18 +116,6 @@ #include "P_SSLCertLookup.h" #endif -//added by YTS Team, yamsat -struct epoll_data_ptr -{ - int type; - union - { - UnixNetVConnection *vc; - DNSConnection *dnscon; - NetAccept *na; - UnixUDPConnection *uc; - } data; -}; #undef NET_SYSTEM_MODULE_VERSION #define NET_SYSTEM_MODULE_VERSION makeModuleVersion( \ @@ -135,14 +123,4 @@ NET_SYSTEM_MODULE_MINOR_VERSION, \ PRIVATE_MODULE_HEADER) -//Debug stuff -//define ENABLE_NET_TRUSS - -#ifdef ENABLE_NET_TRUSS -#define NET_TRUSS(x) x -#else -#define NET_TRUSS(x) -#endif - - #endif Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_NetAccept.h URL: 
http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_NetAccept.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_NetAccept.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_NetAccept.h Wed Dec 9 23:01:29 2009 @@ -91,12 +91,13 @@ int ifd; int ifd_seq_num; bool callback_on_open; - Ptr action_; + Ptr action_; int recv_bufsize; int send_bufsize; unsigned long sockopt_flags; EventType etype; UnixNetVConnection *epoll_vc; // only storage for epoll events + struct epoll_data_ptr ep; // Functions all THREAD_FREE and THREAD_ALLOC to be performed // for both SSL and regular NetVConnection transparent to @@ -117,8 +118,8 @@ int acceptLoopEvent(int event, Event * e); void cancel(); - NetAccept(); - virtual ~ NetAccept() + NetAccept(); + virtual ~ NetAccept() { action_ = NULL; }; Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UDPConnection.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UDPConnection.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UDPConnection.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UDPConnection.h Wed Dec 9 23:01:29 2009 @@ -43,14 +43,14 @@ Continuation *continuation; int recvActive; // interested in receiving - int m_refcount; // public for assertion + int refcount; // public for assertion - SOCKET m_fd; - struct sockaddr_in m_binding; - int m_binding_valid; - int m_tobedestroyed; - int m_sendGenerationNum; - ink64 m_lastSentPktTSSeqNum; + SOCKET fd; + struct sockaddr_in binding; + int binding_valid; + int tobedestroyed; + int sendGenerationNum; + ink64 lastSentPktTSSeqNum; // this is for doing packet scheduling: we keep two values so that we can // implement cancel. 
The first value tracks the startTime of the last @@ -58,53 +58,44 @@ // startTime of the last packet when we are doing scheduling; whenever the // associated continuation cancels a packet, we rest lastPktStartTime to be // the same as the lastSentPktStartTime. - inku64 m_lastSentPktStartTime; - inku64 m_lastPktStartTime; - ink32 m_pipe_class; - inku32 m_nBytesDone; - inku32 m_nBytesTodo; + inku64 lastSentPktStartTime; + inku64 lastPktStartTime; + ink32 pipe_class; + inku32 nBytesDone; + inku32 nBytesTodo; // flow rate in Bytes per sec. - double m_flowRateBps; - double m_avgPktSize; - ink64 m_allocedbps; + double flowRateBps; + double avgPktSize; + ink64 allocedbps; //this class is abstract }; inline UDPConnectionInternal::UDPConnectionInternal() - : -continuation(NULL) - , -recvActive(0) - , -m_refcount(0) - , -m_fd(-1) - , -m_binding_valid(0) - , -m_tobedestroyed(0) - , -m_nBytesDone(0) - , -m_nBytesTodo(0) -{ - m_sendGenerationNum = 0; - m_lastSentPktTSSeqNum = -1; - m_lastSentPktStartTime = 0; - m_lastPktStartTime = 0; - m_pipe_class = 0; - m_flowRateBps = 0.0; - m_avgPktSize = 0.0; - m_allocedbps = 0; - memset(&m_binding, 0, sizeof m_binding); + : continuation(NULL) + , recvActive(0) + , refcount(0) + , fd(-1) + , binding_valid(0) + , tobedestroyed(0) + , nBytesDone(0) + , nBytesTodo(0) +{ + sendGenerationNum = 0; + lastSentPktTSSeqNum = -1; + lastSentPktStartTime = 0; + lastPktStartTime = 0; + pipe_class = 0; + flowRateBps = 0.0; + avgPktSize = 0.0; + allocedbps = 0; + memset(&binding, 0, sizeof binding); //SET_HANDLER(&BaseUDPConnection::callbackHandler); } inline -UDPConnectionInternal::~ -UDPConnectionInternal() +UDPConnectionInternal::~UDPConnectionInternal() { udpNet.FreeBandwidth(this); continuation = NULL; @@ -115,78 +106,78 @@ INK_INLINE SOCKET UDPConnection::getFd() { - return ((UDPConnectionInternal *) this)->m_fd; + return ((UDPConnectionInternal *) this)->fd; } INK_INLINE void UDPConnection::setBinding(struct sockaddr_in *s) { UDPConnectionInternal 
*p = (UDPConnectionInternal *) this; - memcpy(&p->m_binding, s, sizeof(p->m_binding)); - p->m_binding_valid = 1; + memcpy(&p->binding, s, sizeof(p->binding)); + p->binding_valid = 1; } INK_INLINE int UDPConnection::getBinding(struct sockaddr_in *s) { UDPConnectionInternal *p = (UDPConnectionInternal *) this; - memcpy(s, &p->m_binding, sizeof(*s)); - return p->m_binding_valid; + memcpy(s, &p->binding, sizeof(*s)); + return p->binding_valid; } INK_INLINE int UDPConnection::get_ndone() { - return ((UDPConnectionInternal *) this)->m_nBytesDone; + return ((UDPConnectionInternal *) this)->nBytesDone; } INK_INLINE int UDPConnection::get_ntodo() { - return ((UDPConnectionInternal *) this)->m_nBytesTodo; + return ((UDPConnectionInternal *) this)->nBytesTodo; } // return the b/w allocated to this UDPConnection in Mbps INK_INLINE double UDPConnection::get_allocatedBandwidth() { - return (((UDPConnectionInternal *) this)->m_flowRateBps * 8.0) / (1024.0 * 1024.0); + return (((UDPConnectionInternal *) this)->flowRateBps * 8.0) / (1024.0 * 1024.0); } INK_INLINE void UDPConnection::destroy() { - ((UDPConnectionInternal *) this)->m_tobedestroyed = 1; + ((UDPConnectionInternal *) this)->tobedestroyed = 1; } INK_INLINE int UDPConnection::shouldDestroy() { - return ((UDPConnectionInternal *) this)->m_tobedestroyed; + return ((UDPConnectionInternal *) this)->tobedestroyed; } INK_INLINE void UDPConnection::AddRef() { - ink_atomic_increment(&((UDPConnectionInternal *) this)->m_refcount, 1); + ink_atomic_increment(&((UDPConnectionInternal *) this)->refcount, 1); } INK_INLINE int UDPConnection::GetRefCount() { - return ((UDPConnectionInternal *) this)->m_refcount; + return ((UDPConnectionInternal *) this)->refcount; } INK_INLINE int UDPConnection::GetSendGenerationNumber() { - return ((UDPConnectionInternal *) this)->m_sendGenerationNum; + return ((UDPConnectionInternal *) this)->sendGenerationNum; } INK_INLINE int UDPConnection::getPortNum(void) { - return ((UDPConnectionInternal *) 
this)->m_binding.sin_port; + return ((UDPConnectionInternal *) this)->binding.sin_port; } INK_INLINE ink64 @@ -194,15 +185,15 @@ { UDPConnectionInternal *p = (UDPConnectionInternal *) this; - p->m_sendGenerationNum++; - p->m_lastPktStartTime = p->m_lastSentPktStartTime; - return p->m_lastSentPktTSSeqNum; + p->sendGenerationNum++; + p->lastPktStartTime = p->lastSentPktStartTime; + return p->lastSentPktTSSeqNum; }; INK_INLINE void UDPConnection::SetLastSentPktTSSeqNum(ink64 sentSeqNum) { - ((UDPConnectionInternal *) this)->m_lastSentPktTSSeqNum = sentSeqNum; + ((UDPConnectionInternal *) this)->lastSentPktTSSeqNum = sentSeqNum; }; INK_INLINE void Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UDPNet.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UDPNet.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UDPNet.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UDPNet.h Wed Dec 9 23:01:29 2009 @@ -44,33 +44,26 @@ class UDPNetHandler; -struct UDPNetProcessorInternal:public UDPNetProcessor +struct UDPNetProcessorInternal : public UDPNetProcessor { - virtual int start(int n_udp_threads); - #if defined (_IOCORE_WIN32) SOCKET create_dgram_socket_internal(); - #else - void udp_read_from_net(UDPNetHandler * nh, UDPConnection * uc, PollDescriptor * pd, EThread * thread); - int udp_callback(UDPNetHandler * nh, UDPConnection * uc, EThread * thread); #endif #if defined (_IOCORE_WIN32) - EThread *m_ethread; - UDPNetHandler *m_udpNetHandler; + EThread *ethread; + UDPNetHandler *udpNetHandler; #else ink_off_t pollCont_offset; ink_off_t udpNetHandler_offset; #endif public: - virtual void UDPNetProcessor_is_abstract() - { - }; + virtual void UDPNetProcessor_is_abstract() { } }; extern UDPNetProcessorInternal udpNetInternal; @@ -80,8 +73,6 @@ class UDPQueue { public: - UDPQueue(InkAtomicList 
*); - virtual ~ UDPQueue(); void service(UDPNetHandler *); // these are internal APIs @@ -94,15 +85,17 @@ // Interface exported to the outside world void send(UDPPacket * p); - Queue m_reliabilityPktQueue; + Que(UDPPacketInternal, link) reliabilityPktQueue; + InkAtomicList atomicQueue; + ink_hrtime last_report; + ink_hrtime last_service; + ink_hrtime last_byteperiod; + int bytesSent; + int packets; + int added; - InkAtomicList *m_atomicQueue; - ink_hrtime m_last_report; - ink_hrtime m_last_service; - ink_hrtime m_last_byteperiod; - int m_bytesSent; - int m_packets; - int m_added; + UDPQueue(); + ~UDPQueue(); }; #ifdef PACKETQUEUE_IMPL_AS_RING @@ -121,7 +114,7 @@ :nPackets(0) , now_slot(0) { - m_lastPullLongTermQ = 0; + lastPullLongTermQ = 0; init(); } @@ -130,9 +123,9 @@ } int nPackets; - ink_hrtime m_lastPullLongTermQ; - Queue m_longTermQ; - Queue bucket[N_SLOTS]; + ink_hrtime lastPullLongTermQ; + Que(UDPPacketInternal, link) longTermQ; + Que(UDPPacketInternal, link) bucket[N_SLOTS]; ink_hrtime delivery_time[N_SLOTS]; int now_slot; @@ -163,10 +156,10 @@ ink_assert(delivery_time[now_slot]); - if (e->m_delivery_time < now) - e->m_delivery_time = now; + if (e->delivery_time < now) + e->delivery_time = now; - ink_hrtime s = e->m_delivery_time - delivery_time[now_slot]; + ink_hrtime s = e->delivery_time - delivery_time[now_slot]; if (s < 0) { before = 1; @@ -178,7 +171,7 @@ // need a thingy to hold packets in a "long-term" slot; then, pull packets // from long-term slot whenever you advance. if (s >= N_SLOTS - 1) { - m_longTermQ.enqueue(e); + longTermQ.enqueue(e); e->in_heap = 0; e->in_the_priority_queue = 1; return; @@ -186,8 +179,8 @@ slot = (s + now_slot) % N_SLOTS; // so that slot+1 is still "in future". 
- ink_assert((before || delivery_time[slot] <= e->m_delivery_time) && - (delivery_time[(slot + 1) % N_SLOTS] >= e->m_delivery_time)); + ink_assert((before || delivery_time[slot] <= e->delivery_time) && + (delivery_time[(slot + 1) % N_SLOTS] >= e->delivery_time)); e->in_the_priority_queue = 1; e->in_heap = slot; bucket[slot].enqueue(e); @@ -214,13 +207,13 @@ bool IsCancelledPacket(UDPPacketInternal * p) { // discard packets that'll never get sent... - return ((p->m_conn->shouldDestroy()) || (p->m_conn->GetSendGenerationNumber() != p->m_reqGenerationNum)); + return ((p->conn->shouldDestroy()) || (p->conn->GetSendGenerationNumber() != p->reqGenerationNum)); }; void FreeCancelledPackets(int numSlots) { UDPPacketInternal *p; - Queue tempQ; + Que(UDPPacketInternal, link) tempQ; int i, s; for (i = 0; i < numSlots; i++) { @@ -245,19 +238,18 @@ int s = now_slot; int prev; - if (ink_hrtime_to_msec(t - m_lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) { - Queue tempQ; + if (ink_hrtime_to_msec(t - lastPullLongTermQ) >= SLOT_TIME_MSEC * ((N_SLOTS - 1) / 2)) { + Que(UDPPacketInternal, link) tempQ; UDPPacketInternal *p; // pull in all the stuff from long-term slot - m_lastPullLongTermQ = t; + lastPullLongTermQ = t; // this is to handle wierdoness where someone is trying to queue a // packet to be sent in SLOT_TIME_MSEC * N_SLOTS * (2+)---the packet - // will get back to m_longTermQ and we'll have an infinite loop. - while ((p = m_longTermQ.dequeue()) != NULL) + // will get back to longTermQ and we'll have an infinite loop. 
+ while ((p = longTermQ.dequeue()) != NULL) tempQ.enqueue(p); - while ((p = tempQ.dequeue()) != NULL) { + while ((p = tempQ.dequeue()) != NULL) addPacket(p); - } } while (!bucket[s].head && (t > delivery_time[s] + SLOT_TIME)) { @@ -331,38 +323,28 @@ struct UDPNetHandler:Continuation { public: - //PollDescriptor * pollDescriptor; - -#define MAX_UDP_CONNECTION 8000 - UnixUDPConnection ** udpConnections; - - int startNetEvent(int event, Event * data); - int mainNetEvent(int event, Event * data); - PollDescriptor *build_poll(PollDescriptor *); - PollDescriptor *build_one_udpread_poll(int fd, UnixUDPConnection *, PollDescriptor * pd); - - - UDPNetHandler(); - // to be polled for read - Queue *udp_polling; + Que(UnixUDPConnection, polling_link) udp_polling; // to be called back with data - Queue *udp_callbacks; - + Que(UnixUDPConnection, callback_link) udp_callbacks; // outgoing packets InkAtomicList udpAtomicQueue; - UDPQueue *udpOutQueue; - + UDPQueue udpOutQueue; // to hold the newly created descriptors before scheduling them on // the servicing buckets. 
// atomically added to by a thread creating a new connection with // UDPBind InkAtomicList udpNewConnections; - Event *trigger_event; - ink_hrtime nextCheck; ink_hrtime lastCheck; + + int startNetEvent(int event, Event * data); + int mainNetEvent(int event, Event * data); + PollDescriptor *build_poll(PollDescriptor *); + PollDescriptor *build_one_udpread_poll(int fd, UnixUDPConnection *, PollDescriptor * pd); + + UDPNetHandler(); }; #endif @@ -375,34 +357,27 @@ class UDPNetHandler:Continuation { public: - UDPNetHandler(); - virtual ~ UDPNetHandler(); - int startNetEvent(int event, Event * data); - int mainNetEvent(int event, Event * data); - -#define MAX_UDP_CONNECTION 8000 - UDPConnection **udpConnections; - - // to be polled for read - Queue *udp_polling; + Que(UnixUDPConnection, polling_link) udp_polling; // to be called back with data - Queue *udp_callbacks; - + Que(UnixUDPConnection, callback_link) udp_callbacks; // outgoing packets InkAtomicList udpAtomicQueue; - UDPQueue *udpOutQueue; - + UDPQueue udpOutQueue; // to hold the newly created descriptors before scheduling them on // the servicing buckets. // atomically added to by a thread creating a new connection with // UDPBind InkAtomicList udpNewConnections; - Event *trigger_event; - ink_hrtime nextCheck; ink_hrtime lastCheck; + + int startNetEvent(int event, Event * data); + int mainNetEvent(int event, Event * data); + + UDPNetHandler(); + virtual ~ UDPNetHandler(); }; #endif @@ -425,39 +400,39 @@ { InkSinglePipeInfo() { - m_wt = 0.0; - m_bwLimit = 0; - m_destIP = 0; - m_count = 0; - m_bytesSent = m_pktsSent = 0; - m_bwAlloc = 0; - m_bwUsed = 0.0; - m_queue = NEW(new PacketQueue()); + wt = 0.0; + bwLimit = 0; + destIP = 0; + count = 0; + bytesSent = pktsSent = 0; + bwAlloc = 0; + bwUsed = 0.0; + queue = NEW(new PacketQueue()); }; ~InkSinglePipeInfo() { - delete m_queue; + delete queue; } - double m_wt; + double wt; // all are in bps (bits per sec.) 
so that we can do ink_atomic_increment - ink64 m_bwLimit; - ink64 m_bwAlloc; + ink64 bwLimit; + ink64 bwAlloc; // this is in Mbps - double m_bwUsed; - ink32 m_destIP; - inku32 m_count; - inku64 m_bytesSent; - inku64 m_pktsSent; - PacketQueue *m_queue; + double bwUsed; + ink32 destIP; + inku32 count; + inku64 bytesSent; + inku64 pktsSent; + PacketQueue *queue; }; struct InkPipeInfo { - int m_numPipes; - double m_interfaceMbps; - double m_reliabilityMbps; - InkSinglePipeInfo *m_perPipeInfo; + int numPipes; + double interfaceMbps; + double reliabilityMbps; + InkSinglePipeInfo *perPipeInfo; }; extern InkPipeInfo G_inkPipeInfo; @@ -465,8 +440,8 @@ class UDPWorkContinuation:public Continuation { public: - UDPWorkContinuation():m_cont(NULL), m_numPairs(0), m_myIP(0), m_destIP(0), - m_sendbufsize(0), m_recvbufsize(0), m_udpConns(NULL), m_resultCode(NET_EVENT_DATAGRAM_OPEN) + UDPWorkContinuation():cont(NULL), numPairs(0), myIP(0), destIP(0), + sendbufsize(0), recvbufsize(0), udpConns(NULL), resultCode(NET_EVENT_DATAGRAM_OPEN) { }; ~UDPWorkContinuation() { @@ -475,15 +450,15 @@ int StateCreatePortPairs(int event, void *data); int StateDoCallback(int event, void *data); - Action m_action; + Action action; private: - Continuation * m_cont; - int m_numPairs; - unsigned int m_myIP, m_destIP; - int m_sendbufsize, m_recvbufsize; - UnixUDPConnection **m_udpConns; - int m_resultCode; + Continuation * cont; + int numPairs; + unsigned int myIP, destIP; + int sendbufsize, recvbufsize; + UnixUDPConnection **udpConns; + int resultCode; }; typedef int (UDPWorkContinuation::*UDPWorkContinuation_Handler) (int, void *); Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UDPPacket.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UDPPacket.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UDPPacket.h (original) +++ 
incubator/trafficserver/traffic/trunk/iocore/net/P_UDPPacket.h Wed Dec 9 23:01:29 2009 @@ -51,67 +51,53 @@ virtual void free(); - SLink alink; // atomic link + SLink alink; // atomic link // packet scheduling stuff: keep it a doubly linked list - Link slink; - // From the packet scheduler point of view... - inku64 m_pktSendStartTime; - inku64 m_pktSendFinishTime; - inku32 m_pktLength; + inku64 pktSendStartTime; + inku64 pktSendFinishTime; + inku32 pktLength; - bool m_isReliabilityPkt; + bool isReliabilityPkt; - int m_reqGenerationNum; + int reqGenerationNum; // Associate a TS seq. # with each packet... We need this for WMT---WMT // maintains its own sequence numbers that need to increment by 1 on each // packet send. Since packets can be cancelled during a seek, WMT needs to // know the next "WMT seq. #" that it can tag to a packet. To determine the // "WMT seq. #", WMT code maintains a mapping betweeen WMT seq. # and TS - // seq. #. If m_pktTSSeqNum is set to -1, then this value is ignored by the + // seq. #. If pktTSSeqNum is set to -1, then this value is ignored by the // UDP code. - ink64 m_pktTSSeqNum; + ink64 pktTSSeqNum; - ink_hrtime m_delivery_time; // when to deliver packet - ink_hrtime m_arrival_time; // when packet arrived + ink_hrtime delivery_time; // when to deliver packet + ink_hrtime arrival_time; // when packet arrived - Ptr m_chain; - Continuation *m_cont; // callback on error - UDPConnectionInternal *m_conn; // connection where packet should be sent to. + Ptr chain; + Continuation *cont; // callback on error + UDPConnectionInternal *conn; // connection where packet should be sent to. 
#if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) int in_the_priority_queue; int in_heap; #endif - virtual void UDPPacket_is_abstract() - { - }; + virtual void UDPPacket_is_abstract() { } }; inkcoreapi extern ClassAllocator udpPacketAllocator; inline UDPPacketInternal::UDPPacketInternal() - : -m_pktSendStartTime(0) - , -m_pktSendFinishTime(0) - , -m_pktLength(0) - , -m_isReliabilityPkt(false) - , -m_reqGenerationNum(0) - , -m_pktTSSeqNum(-1) - , -m_delivery_time(0) - , -m_arrival_time(0) - , -m_cont(NULL) - , -m_conn(NULL) + : pktSendStartTime(0) + , pktSendFinishTime(0) + , pktLength(0) + , isReliabilityPkt(false) + , reqGenerationNum(0) + , pktTSSeqNum(-1) + , delivery_time(0) + , arrival_time(0) + , cont(NULL) + , conn(NULL) #if defined(PACKETQUEUE_IMPL_AS_PQLIST) || defined(PACKETQUEUE_IMPL_AS_RING) , in_the_priority_queue(0) @@ -119,24 +105,24 @@ in_heap(0) #endif { - memset(&m_from, '\0', sizeof(m_from)); - memset(&m_to, '\0', sizeof(m_to)); + memset(&from, '\0', sizeof(from)); + memset(&to, '\0', sizeof(to)); } inline UDPPacketInternal::~ UDPPacketInternal() { - m_chain = NULL; + chain = NULL; } inline void UDPPacketInternal::free() { - m_chain = NULL; - if (m_conn) - m_conn->Release(); - m_conn = NULL; + chain = NULL; + if (conn) + conn->Release(); + conn = NULL; udpPacketAllocator.free(this); } @@ -155,14 +141,14 @@ { UDPPacketInternal *p = (UDPPacketInternal *) this; - p->m_isReliabilityPkt = true; + p->isReliabilityPkt = true; } INK_INLINE void UDPPacket::setPktTSSeq(ink64 seqno) { UDPPacketInternal *p = (UDPPacketInternal *) this; - p->m_pktTSSeqNum = seqno; + p->pktTSSeqNum = seqno; } INK_INLINE void @@ -171,14 +157,14 @@ UDPPacketInternal *p = (UDPPacketInternal *) this; if (block) { - if (p->m_chain) { // append to end - IOBufferBlock *last = p->m_chain; + if (p->chain) { // append to end + IOBufferBlock *last = p->chain; while (last->next != NULL) { last = last->next; } last->next = block; } else { - p->m_chain = block; + 
p->chain = block; } } } @@ -187,10 +173,10 @@ UDPPacket::asBuf(int *len) { UDPPacketInternal *p = (UDPPacketInternal *) this; - if (p->m_chain) { + if (p->chain) { if (len) - *len = p->m_chain->size(); - return p->m_chain->start(); + *len = p->chain->size(); + return p->chain->start(); } else { return NULL; } @@ -202,13 +188,13 @@ UDPPacketInternal *p = (UDPPacketInternal *) this; IOBufferBlock *b; - p->m_pktLength = 0; - b = p->m_chain; + p->pktLength = 0; + b = p->chain; while (b) { - p->m_pktLength += b->read_avail(); + p->pktLength += b->read_avail(); b = b->next; } - return p->m_pktLength; + return p->pktLength; } INK_INLINE void @@ -220,20 +206,20 @@ INK_INLINE void UDPPacket::setContinuation(Continuation * c) { - ((UDPPacketInternal *) this)->m_cont = c; + ((UDPPacketInternal *) this)->cont = c; } INK_INLINE void UDPPacket::setConnection(UDPConnection * c) { /*Code reviewed by Case Larsen. Previously, we just had - ink_assert(!m_conn). This prevents tunneling of packets + ink_assert(!conn). This prevents tunneling of packets correctly---that is, you get packets from a server on a udp conn. and want to send it to a player on another connection, the assert will prevent that. The "if" clause enables correct handling of the connection ref. counts in such a scenario. 
*/ - UDPConnectionInternal *&conn = ((UDPPacketInternal *) this)->m_conn; + UDPConnectionInternal *&conn = ((UDPPacketInternal *) this)->conn; if (conn) { if (conn == c) @@ -248,19 +234,19 @@ INK_INLINE IOBufferBlock * UDPPacket::getIOBlockChain(void) { - return ((UDPPacketInternal *) this)->m_chain; + return ((UDPPacketInternal *) this)->chain; } INK_INLINE UDPConnection * UDPPacket::getConnection(void) { - return ((UDPPacketInternal *) this)->m_conn; + return ((UDPPacketInternal *) this)->conn; } INK_INLINE void UDPPacket::setArrivalTime(ink_hrtime t) { - ((UDPPacketInternal *) this)->m_arrival_time = t; + ((UDPPacketInternal *) this)->arrival_time = t; } INK_INLINE UDPPacket * @@ -272,8 +258,8 @@ p->in_the_priority_queue = 0; p->in_heap = 0; #endif - p->m_delivery_time = when; - memcpy(&p->m_to, to, sizeof(p->m_to)); + p->delivery_time = when; + memcpy(&p->to, to, sizeof(p->to)); if (buf) { IOBufferBlock *body = new_IOBufferBlock(); @@ -297,8 +283,8 @@ p->in_the_priority_queue = 0; p->in_heap = 0; #endif - p->m_delivery_time = when; - memcpy(&p->m_to, to, sizeof(p->m_to)); + p->delivery_time = when; + memcpy(&p->to, to, sizeof(p->to)); while (buf) { body = buf->clone(); @@ -317,10 +303,10 @@ p->in_the_priority_queue = 0; p->in_heap = 0; #endif - p->m_delivery_time = when; + p->delivery_time = when; if (to) - memcpy(&p->m_to, to, sizeof(p->m_to)); - p->m_chain = buf; + memcpy(&p->to, to, sizeof(p->to)); + p->chain = buf; return p; } @@ -339,8 +325,8 @@ p->in_the_priority_queue = 0; p->in_heap = 0; #endif - p->m_delivery_time = 0; - memcpy(&p->m_from, from, sizeof(p->m_from)); + p->delivery_time = 0; + memcpy(&p->from, from, sizeof(p->from)); IOBufferBlock *body = new_IOBufferBlock(); body->alloc(iobuffer_size_to_index(len)); Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h?rev=889011&r1=889010&r2=889011&view=diff 
============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h Wed Dec 9 23:01:29 2009 @@ -26,15 +26,28 @@ #include #include "List.h" + +class UnixNetVConnection; +class DNSConnection; +class NetAccept; +class UnixUDPConnection; +struct epoll_data_ptr +{ + int type; + union + { + UnixNetVConnection *vc; + DNSConnection *dnscon; + NetAccept *na; + UnixUDPConnection *uc; + } data; +}; + #include "P_UnixNetProcessor.h" #include "P_UnixNetVConnection.h" #include "P_NetAccept.h" #include "P_DNSConnection.h" -// -//added by YTS Team, yamsat -//Epoll data pointer's data type -// #define EPOLL_NETACCEPT 1 #define EPOLL_READWRITE_VC 2 #define EPOLL_DNS_CONNECTION 3 @@ -53,7 +66,6 @@ extern int fds_limit; extern ink_hrtime last_transient_accept_error; extern int http_accept_port_number; -extern int n_netq_list; //#define INACTIVITY_TIMEOUT @@ -95,68 +107,13 @@ #define NET_THROTTLE_DELAY 50 /* mseconds */ #define INK_MIN_PRIORITY 0 -#define INK_MAX_PRIORITY (n_netq_list - 1) -#ifdef XXTIME -#define XTIME(_x) _x -#else -#define XTIME(_x) -#endif - - -#define PRINT_IP(x) ((inku8*)&(x))[0],((inku8*)&(x))[1], \ - ((inku8*)&(x))[2],((inku8*)&(x))[3] +#define PRINT_IP(x) ((inku8*)&(x))[0],((inku8*)&(x))[1], ((inku8*)&(x))[2],((inku8*)&(x))[3] -//function prototype needed for SSLUnixNetVConnection +// function prototype needed for SSLUnixNetVConnection unsigned int net_next_connection_number(); -struct PriorityPollQueue -{ - - Queue read_after[MAX_NET_BUCKETS]; - Queue read_poll; - Queue write_after[MAX_NET_BUCKETS]; - Queue write_poll; - inku32 position; - - int iafter(inku32 now, NetState * ns) - { - int delta = (int) (ns->do_next_at - now); - ink_assert(delta >= 0); - ink_assert((delta < n_netq_list) || (n_netq_list == 1)); - return (position + delta) % n_netq_list; - } - void enqueue(UnixNetVConnection * vc, NetState * ns, 
Queue *q, inku32 now) - { - int i = iafter(now, ns); - ink_assert(!ns->queue); - ns->queue = &q[i]; - q[i].enqueue(vc, ns->link); - } - void enqueue_read(UnixNetVConnection * vc, inku32 now) - { - enqueue(vc, &vc->read, read_after, now); - } - void enqueue_write(UnixNetVConnection * vc, inku32 now) - { - enqueue(vc, &vc->write, write_after, now); - } - static void remove_read(UnixNetVConnection * vc) - { - ((Queue *)vc->read.queue)->remove(vc, vc->read.link); - vc->read.queue = NULL; - } - static void remove_write(UnixNetVConnection * vc) - { - ((Queue *)vc->write.queue)->remove(vc, vc->write.link); - vc->write.queue = NULL; - } - - PriorityPollQueue(); -}; - - struct PollCont:public Continuation { NetHandler *net_handler; @@ -164,95 +121,12 @@ PollDescriptor *nextPollDescriptor; int poll_timeout; - PollCont(ProxyMutex * m); - PollCont(ProxyMutex * m, NetHandler * nh); - ~PollCont(); + PollCont(ProxyMutex * m); + PollCont(ProxyMutex * m, NetHandler * nh); + ~PollCont(); int pollEvent(int event, Event * e); }; - - -// -//added by YTS Team, yamsat -//Class consisting of ready queues and lock pending queues -//Ready queues consist of triggered and enabled events -//NetHandler processes the ready queues -//VCs which could not acquire the lock are added to lock -//pending queues -// -struct ReadyQueue -{ -public: - Queue read_ready_queue; - Queue write_ready_queue; - - void epoll_addto_read_ready_queue(UnixNetVConnection * vc) - { - vc->read.netready_queue = &read_ready_queue; - read_ready_queue.enqueue(vc, vc->read.netready_link); - } - - void epoll_addto_write_ready_queue(UnixNetVConnection * vc) - { - vc->write.netready_queue = &write_ready_queue; - write_ready_queue.enqueue(vc, vc->write.netready_link); - } - - static void epoll_remove_from_read_ready_queue(UnixNetVConnection * vc) - { - ((Queue *)vc->read.netready_queue)->remove(vc, vc->read.netready_link); - vc->read.netready_queue = NULL; - } - - static void epoll_remove_from_write_ready_queue(UnixNetVConnection 
* vc) - { - ((Queue *)vc->write.netready_queue)->remove(vc, vc->write.netready_link); - vc->write.netready_queue = NULL; - } - - ReadyQueue() { - } -}; - -// -//added by YTS Team, yamsat -//Class consisting of wait queues -//Wait queues consist of VCs which should not be processed -// -struct WaitList -{ -public: - Queue read_wait_list; - Queue write_wait_list; - - void epoll_addto_read_wait_list(UnixNetVConnection * vc) - { - vc->read.queue = &read_wait_list; - read_wait_list.enqueue(vc, vc->read.link); - } - - void epoll_addto_write_wait_list(UnixNetVConnection * vc) - { - vc->write.queue = &write_wait_list; - write_wait_list.enqueue(vc, vc->write.link); - } - - - static void epoll_remove_from_read_wait_list(UnixNetVConnection * vc) - { - ((Queue *)vc->read.queue)->remove(vc, vc->read.link); - vc->read.queue = NULL; - } - static void epoll_remove_from_write_wait_list(UnixNetVConnection * vc) - { - ((Queue *)vc->write.queue)->remove(vc, vc->write.link); - vc->write.queue = NULL; - } - - WaitList() { - } -}; - // // NetHandler // @@ -262,30 +136,24 @@ class NetHandler:public Continuation { public: - Event * trigger_event; - PriorityPollQueue pollq; - - ReadyQueue ready_queue; //added by YTS Team, yamsat - WaitList wait_list; //added by YTS Team, yamsat - - inku32 cur_msec; - bool ext_main; + Event *trigger_event; - Queue dnsqueue; //added by YTS Team, yamsat - Queue read_enable_list; //added by YTS Team, yamsat - Queue write_enable_list; //added by YTS Team, yamsat - ProxyMutexPtr read_enable_mutex; //added by YTS Team, yamsat - ProxyMutexPtr write_enable_mutex; //added by YTS Team, yamsat + Que(UnixNetVConnection, read.ready_link) read_ready_list; + Que(UnixNetVConnection, write.ready_link) write_ready_list; + Que(UnixNetVConnection, link) open_list; + Que(DNSConnection, link) dnsqueue; + ASSL(UnixNetVConnection, read.enable_link) read_enable_list; + ASSL(UnixNetVConnection, write.enable_link) write_enable_list; int startNetEvent(int event, Event * data); int 
mainNetEvent(int event, Event * data); - void process_sm_enabled_list(NetHandler *, EThread *); //added by YTS Team, yamsat int mainNetEventExt(int event, Event * data); + void process_enabled_list(NetHandler *, EThread *); PollDescriptor *build_poll(PollDescriptor * pd); PollDescriptor *build_one_read_poll(int fd, UnixNetVConnection *, PollDescriptor * pd); PollDescriptor *build_one_write_poll(int fd, UnixNetVConnection *, PollDescriptor * pd); - NetHandler(bool _ext_main = false); + NetHandler(); }; static inline NetHandler * @@ -492,12 +360,8 @@ if (!vc->write.enabled) vc->next_inactivity_timeout_at = 0; #endif - if (vc->read.enabled) { - vc->read.enabled = 0; - } - if (vc->read.netready_queue) { - ReadyQueue::epoll_remove_from_read_ready_queue(vc); - } + vc->read.enabled = 0; + nh->read_ready_list.remove(vc); } static inline void @@ -511,18 +375,12 @@ } } #else - if (vc->next_inactivity_timeout_at) { - if (!vc->read.enabled) { + if (vc->next_inactivity_timeout_at) + if (!vc->read.enabled) vc->next_inactivity_timeout_at = 0; - } - } #endif - if (vc->write.enabled) { - vc->write.enabled = 0; - } - if (vc->write.netready_queue) { - ReadyQueue::epoll_remove_from_write_ready_queue(vc); - } + vc->write.enabled = 0; + nh->write_ready_list.remove(vc); } @@ -541,24 +399,18 @@ (void) event; ink_hrtime now = ink_get_hrtime(); NetHandler *nh = get_NetHandler(this_ethread()); - UnixNetVConnection *vc = NULL; - UnixNetVConnection *next_vc = NULL; - Queue &q = nh->wait_list.read_wait_list; - for (vc = (UnixNetVConnection *) q.head; vc; vc = next_vc) { - next_vc = (UnixNetVConnection *) vc->read.link.next; - if (vc->inactivity_timeout_in && vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now) { + UnixNetVConnection * vc = nh->open_list.head, *vc_next = 0; + while (vc) { + vc_next = (UnixNetVConnection*)vc->link.next; + if (vc->inactivity_timeout_in && vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now) vc->handleEvent(EVENT_IMMEDIATE, e); 
- } else { - if (vc->closed) { + else + if (vc->closed) close_UnixNetVConnection(vc, e->ethread); - } - } - + vc = vc_next; } return 0; } }; #endif - - #endif Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetProcessor.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetProcessor.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetProcessor.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetProcessor.h Wed Dec 9 23:01:29 2009 @@ -78,7 +78,7 @@ char *throttle_error_message; Event *accept_thread_event; - AtomicSLL accepts_on_thread; + ASSL(NetAccept, link) accepts_on_thread; int accept_epoll_fd; //added by YTS Team, yamsat Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetState.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetState.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetState.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetState.h Wed Dec 9 23:01:29 2009 @@ -49,23 +49,12 @@ struct NetState { volatile int enabled; - int priority; VIO vio; - void *queue; - void *netready_queue; //added by YTS Team, yamsat - void *enable_queue; //added by YTS Team, yamsat - int ifd; - ink_hrtime do_next_at; - Link link; - Link netready_link; //added by YTS Team, yamsat - Link enable_link; //added by YTS Team, yamsat - ink32 next_vc; - int npending_scheds; + Link ready_link; + SLink enable_link; + int in_enabled_list; + int triggered; - int triggered; // added by YTS Team, yamsat - - void enqueue(void *q, UnixNetVConnection * vc); - - NetState(); + NetState() : enabled(0), vio(VIO::NONE), in_enabled_list(0), triggered(0) {} }; #endif Modified: 
incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetVConnection.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetVConnection.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetVConnection.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UnixNetVConnection.h Wed Dec 9 23:01:29 2009 @@ -84,7 +84,6 @@ class UnixNetVConnection:public NetVConnection { - public: virtual VIO * do_io_read(Continuation * c, int nbytes, MIOBuffer * buf); @@ -93,32 +92,12 @@ virtual Action *send_OOB(Continuation * cont, char *buf, int len); virtual void cancel_OOB(); - virtual bool is_over_ssl() - { - return (false); - } - virtual void setSSLHandshakeWantsRead(bool flag) - { - return; - }; - virtual bool getSSLHandshakeWantsRead() - { - return false; - }; - virtual void setSSLHandshakeWantsWrite(bool flag) - { - return; - }; - virtual bool getSSLHandshakeWantsWrite() - { - return false; - }; - ///////////////////////////////////// - // DEPRICATED - bool is_read_enabled(); - bool is_write_enabled(); - // DEPRICATED - ///////////////////////////////////// + virtual bool is_over_ssl() { return (false); } + virtual void setSSLHandshakeWantsRead(bool flag) { return; } + virtual bool getSSLHandshakeWantsRead() { return false; } + virtual void setSSLHandshakeWantsWrite(bool flag) { return; } + + virtual bool getSSLHandshakeWantsWrite() { return false; } virtual void do_io_close(int lerrno = -1); virtual void do_io_shutdown(ShutdownHowTo_t howto); @@ -144,12 +123,6 @@ virtual void reenable(VIO * vio); virtual void reenable_re(VIO * vio); - ////////////////////////////////////////////////// - // Indicate that the connection is likely to be // - // actively used soon. this is a no-op on NT. 
// - ////////////////////////////////////////////////// - virtual void boost(); - virtual SOCKET get_socket() { return con.fd; @@ -224,11 +197,9 @@ } Action action_; - volatile int closed; NetState read; NetState write; - ink_hrtime inactivity_timeout_in; ink_hrtime active_timeout_in; #ifdef INACTIVITY_TIMEOUT @@ -237,15 +208,11 @@ ink_hrtime next_inactivity_timeout_at; #endif Event *active_timeout; - - - struct epoll_data_ptr *ep; //added by YTS Team, yamsat - NetHandler *nh; //added by YTS Team, yamsat - + struct epoll_data_ptr ep; + NetHandler *nh; unsigned int id; - unsigned int ip; - unsigned int _interface; + unsigned int _interface; // 'interface' conflicts with the C++ keyword int accept_port; int port; @@ -263,9 +230,7 @@ struct sockaddr_in local_sa; Connection con; - int recursion; - ink_hrtime submit_time; OOB_callback *oob_ptr; @@ -315,21 +280,6 @@ return inactivity_timeout_in; } -// this is currently not implemented on NT -INK_INLINE bool -UnixNetVConnection::is_read_enabled() -{ - return !!read.enabled; -} - -INK_INLINE bool -UnixNetVConnection::is_write_enabled() -{ - return !!write.enabled; -} - - - INK_INLINE void UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout) { Modified: incubator/trafficserver/traffic/trunk/iocore/net/P_UnixUDPConnection.h URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/P_UnixUDPConnection.h?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/P_UnixUDPConnection.h (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/P_UnixUDPConnection.h Wed Dec 9 23:01:29 2009 @@ -36,58 +36,37 @@ #include "P_UDPConnection.h" class UnixUDPConnection:public UDPConnectionInternal { - public: - - UnixUDPConnection(int fd); - virtual ~ UnixUDPConnection(); - void init(int fd); - - void setPollvecIndex(int i); - int getPollvecIndex(); - void clearPollvecIndex(); void 
setEthread(EThread * e); - void errorAndDie(int e); + int callbackHandler(int event, void *data); - Link polling_link; - - Link callback_link; - - SLink newconn_alink; + Link polling_link; + Link callback_link; + SLink newconn_alink; - int callbackHandler(int event, void *data); InkAtomicList inQueue; - int onCallbackQueue; - Action *callbackAction; - EThread *m_ethread; - struct epoll_data_ptr *eptr; - virtual void UDPConnection_is_abstract() - { - }; + EThread *ethread; + struct epoll_data_ptr ep; + UnixUDPConnection(int fd); + virtual ~ UnixUDPConnection(); private: - int m_pollvec_index; // used by nethandler for polling. int m_errno; + virtual void UDPConnection_is_abstract() {}; }; inline UnixUDPConnection::UnixUDPConnection(int fd) - : -onCallbackQueue(0) - , -callbackAction(NULL) - , -m_ethread(NULL) - , -m_pollvec_index(-1) - , -m_errno(0) + : onCallbackQueue(0) + , callbackAction(NULL) + , ethread(NULL) + , m_errno(0) { - m_fd = fd; + fd = fd; UDPPacketInternal p; ink_atomiclist_init(&inQueue, "Incoming UDP Packet queue", (char *) &p.alink.next - (char *) &p); SET_HANDLER(&UnixUDPConnection::callbackHandler); @@ -96,11 +75,10 @@ inline void UnixUDPConnection::init(int fd) { - m_fd = fd; + fd = fd; onCallbackQueue = 0; callbackAction = NULL; - m_ethread = NULL; - m_pollvec_index = -1; + ethread = NULL; m_errno = 0; UDPPacketInternal p; @@ -109,27 +87,9 @@ } inline void -UnixUDPConnection::setPollvecIndex(int i) -{ - m_pollvec_index = i; -} - -inline int -UnixUDPConnection::getPollvecIndex() -{ - return m_pollvec_index; -} - -inline void -UnixUDPConnection::clearPollvecIndex() -{ - m_pollvec_index = -1; -} - -inline void UnixUDPConnection::setEthread(EThread * e) { - m_ethread = e; + ethread = e; } inline void @@ -142,7 +102,7 @@ UDPConnection::Release() { UnixUDPConnection *p = (UnixUDPConnection *) this; - PollCont *pc = get_UDPPollCont(p->m_ethread); + PollCont *pc = get_UDPPollCont(p->ethread); #if defined(USE_EPOLL) struct epoll_event ev; @@ -153,12 
+113,8 @@ EV_SET(&ev[1], getFd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); kevent(pc->pollDescriptor->kqueue_fd, &ev[0], 2, NULL, 0, NULL); #endif - if (p->eptr) { - free(p->eptr); - p->eptr = NULL; - } - if (ink_atomic_increment(&p->m_refcount, -1) == 1) { + if (ink_atomic_increment(&p->refcount, -1) == 1) { ink_debug_assert(p->callback_link.next == NULL); ink_debug_assert(p->callback_link.prev == NULL); ink_debug_assert(p->polling_link.next == NULL); Modified: incubator/trafficserver/traffic/trunk/iocore/net/SSLNetVConnection.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/SSLNetVConnection.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/SSLNetVConnection.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/SSLNetVConnection.cc Wed Dec 9 23:01:29 2009 @@ -192,31 +192,20 @@ } if (ret == EVENT_ERROR) { - //added by YTS Team, yamsat - if (this->read.triggered == 1) { - this->read.triggered = 0; - } + this->read.triggered = 0; readSignalError(nh, err); } else if (ret == SSL_HANDSHAKE_WANT_READ || ret == SSL_HANDSHAKE_WANT_ACCEPT || ret == SSL_HANDSHAKE_WANT_CONNECT || ret == SSL_HANDSHAKE_WANT_WRITE) { read.triggered = 0; - if (read.netready_queue) { - ReadyQueue::epoll_remove_from_read_ready_queue(this); - } + nh->read_ready_list.remove(this); write.triggered = 0; - if (write.netready_queue) { - ReadyQueue::epoll_remove_from_write_ready_queue(this); - } + nh->write_ready_list.remove(this); } else if (ret == EVENT_DONE) { write.triggered = 1; - if (write.enabled) { - if (!write.netready_queue) { - nh->ready_queue.epoll_addto_write_ready_queue(this); - } - } - } else { + if (write.enabled) + nh->write_ready_list.in_or_enqueue(this); + } else readReschedule(nh); - } return; } // If there is nothing to do, disable connection @@ -226,8 +215,6 @@ return; } - - do { if (!buf.writer()->write_avail()) 
{ buf.writer()->add_block(); @@ -263,10 +250,7 @@ // reset the tigger and remove from the ready queue // we will need to be retriggered to read from this socket again read.triggered = 0; - if (read.netready_queue) { - ReadyQueue::epoll_remove_from_read_ready_queue(this); - } - + nh->read_ready_list.remove(this); Debug("ssl", "read_from_net, read finished - would block"); break; @@ -287,10 +271,7 @@ Debug("ssl", "read_from_net, read finished - signal done"); break; case SSL_READ_ERROR: - //added by YTS Team, yamsat - if (this->read.triggered == 1) { - this->read.triggered = 0; - } + this->read.triggered = 0; readSignalError(nh, r); Debug("ssl", "read_from_net, read finished - read error"); break; @@ -410,7 +391,6 @@ ssl = NULL; } -//changed by YTS Team, yamsat void SSLNetVConnection::free(EThread * t) { @@ -424,15 +404,8 @@ this->mutex.clear(); flags = 0; SET_CONTINUATION_HANDLER(this, (SSLNetVConnHandler) & SSLNetVConnection::startEvent); - ink_assert(!read.queue && !write.queue); ink_assert(con.fd == NO_FD); - if (ep) { - xfree(ep); - ep = NULL; - } - if (nh) { - nh = NULL; - } + nh = NULL; //printf("total %d read calls for this connection fd: %d\n", read_calls, con.fd); //printf("total %d write calls for this connection fd: %d\n", write_calls, con.fd); read_calls = 0; Modified: incubator/trafficserver/traffic/trunk/iocore/net/SSLUnixNet.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/SSLUnixNet.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/SSLUnixNet.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/SSLUnixNet.cc Wed Dec 9 23:01:29 2009 @@ -145,25 +145,22 @@ a = this; EThread *t = eventProcessor.eventthread[ET_SSL][i]; - //added by YTS Team, yamsat PollDescriptor *pd = get_PollDescriptor(t); - struct epoll_data_ptr *eptr; - eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct 
epoll_data_ptr)); - eptr->type = EPOLL_NETACCEPT; - eptr->data.na = (NetAccept *) a; + ep.type = EPOLL_NETACCEPT; + ep.data.na = (NetAccept *) a; #if defined(USE_EPOLL) struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = eptr; + ev.data.ptr = &ep; if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, a->server.fd, &ev) < 0) { printf("error in epoll_ctl\n"); } #elif defined(USE_KQUEUE) struct kevent ev; - EV_SET(&ev, a->server.fd, EVFILT_READ, EV_ADD, 0, 0, eptr); + EV_SET(&ev, a->server.fd, EVFILT_READ, EV_ADD, 0, 0, &ep); if (kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL) < 0) { printf("error in kevent\n"); } Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNet.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNet.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/UnixNet.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNet.cc Wed Dec 9 23:01:29 2009 @@ -30,14 +30,6 @@ int fds_throttle; int fds_limit = 8000; ink_hrtime last_transient_accept_error; -int n_netq_list = 32; - -NetState::NetState(): -enabled(0), priority(INK_MIN_PRIORITY), vio(VIO::NONE), queue(0), netready_queue(0), //added by YTS Team, yamsat - enable_queue(0), //added by YTS Team, yamsat - ifd(-1), do_next_at(0), next_vc(0), npending_scheds(0), triggered(0) //added by YTS Team, yamsat -{ -} PollCont::PollCont(ProxyMutex * m):Continuation(m), net_handler(NULL), poll_timeout(REAL_DEFAULT_EPOLL_TIMEOUT) { @@ -75,10 +67,10 @@ if (likely(net_handler)) { /* checking to see whether there are connections on the ready_queue (either read or write) that need processing [ebalsa] */ if (likely - (!net_handler->ready_queue.read_ready_queue.empty() || !net_handler->ready_queue.write_ready_queue.empty() || + (!net_handler->read_ready_list.empty() || 
!net_handler->read_ready_list.empty() || !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) { - Debug("epoll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->ready_queue.read_ready_queue.empty(), - net_handler->ready_queue.write_ready_queue.empty(), net_handler->read_enable_list.empty(), + Debug("epoll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(), + net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(), net_handler->write_enable_list.empty()); poll_timeout = 0; //poll immediately returns -- we have triggered stuff to process right now } else { @@ -119,16 +111,13 @@ if ((max_poll_delay & (max_poll_delay - 1)) || (max_poll_delay MAX_NET_BUCKETS * NET_PRIORITY_MSEC)) { IOCORE_SignalWarning(REC_SIGNAL_SYSTEM_ERROR, "proxy.config.net.max_poll_delay range is [4-1024]"); - } else - n_netq_list = max_poll_delay / 4; + } poll_delay_read = true; } - new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler(false); + new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler(); new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread)); get_NetHandler(thread)->mutex = new_ProxyMutex(); - get_NetHandler(thread)->read_enable_mutex = new_ProxyMutex(); - get_NetHandler(thread)->write_enable_mutex = new_ProxyMutex(); thread->schedule_imm(get_NetHandler(thread)); #ifndef INACTIVITY_TIMEOUT @@ -139,7 +128,7 @@ // NetHandler method definitions -NetHandler::NetHandler(bool _ext_main):Continuation(NULL), trigger_event(0), cur_msec(0), ext_main(_ext_main) +NetHandler::NetHandler():Continuation(NULL), trigger_event(0) { SET_HANDLER((NetContHandler) & NetHandler::startNetEvent); } @@ -159,42 +148,25 @@ } // -//Function added by YTS Team, yamsat +// Move VC's enabled on a different thread to the ready list // void -NetHandler::process_sm_enabled_list(NetHandler * nh, EThread * t) +NetHandler::process_enabled_list(NetHandler * nh, EThread * t) { - 
UnixNetVConnection *vc = NULL; - MUTEX_TRY_LOCK(rlistlock, nh->read_enable_mutex, t); - if (rlistlock) { - Queue &rq = nh->read_enable_list; - while ((vc = (UnixNetVConnection *) rq.dequeue(rq.head, rq.head->read.enable_link))) { - vc->read.enable_queue = NULL; - if ((vc->read.enabled && vc->read.triggered) || vc->closed) { - if (!vc->read.netready_queue) { - nh->ready_queue.epoll_addto_read_ready_queue(vc); - } - } - } - MUTEX_RELEASE(rlistlock); + SList(UnixNetVConnection, read.enable_link) rq(nh->read_enable_list.popall()); + while ((vc = rq.pop())) { + vc->read.in_enabled_list = 0; + if ((vc->read.enabled && vc->read.triggered) || vc->closed) + nh->read_ready_list.in_or_enqueue(vc); } - vc = NULL; - - MUTEX_TRY_LOCK(wlistlock, nh->write_enable_mutex, t); - if (wlistlock) { - Queue &wq = nh->write_enable_list; - while ((vc = (UnixNetVConnection *) wq.dequeue(wq.head, wq.head->write.enable_link))) { - vc->write.enable_queue = NULL; - if ((vc->write.enabled && vc->write.triggered) || vc->closed) { - if (!vc->write.netready_queue) { - nh->ready_queue.epoll_addto_write_ready_queue(vc); - } - } - } - MUTEX_RELEASE(wlistlock); + SList(UnixNetVConnection, write.enable_link) wq(nh->write_enable_list.popall()); + while ((vc = wq.pop())) { + vc->write.in_enabled_list = 0; + if ((vc->write.enabled && vc->write.triggered) || vc->closed) + nh->write_ready_list.in_or_enqueue(vc); } } @@ -213,23 +185,13 @@ int poll_timeout = REAL_DEFAULT_EPOLL_TIMEOUT; NET_INCREMENT_DYN_STAT(net_handler_run_stat); - //UnixNetVConnection *closed_vc = NULL, *next_closed_vc = NULL; - //Queue &q = wait_list.read_wait_list; - //for (closed_vc= (UnixNetVConnection*)q.head ; closed_vc ; closed_vc = next_closed_vc){ - // next_closed_vc = (UnixNetVConnection*) closed_vc->read.link.next; - //if (closed_vc->closed){ - //printf("MESSEDUP connection closed for fd :%d\n",closed_vc->con.fd); - //close_UnixNetVConnection(closed_vc, trigger_event->ethread); - //} - //} - - process_sm_enabled_list(this, 
e->ethread); - if (likely(!ready_queue.read_ready_queue.empty() || !ready_queue.write_ready_queue.empty() || - !read_enable_list.empty() || !write_enable_list.empty())) { - poll_timeout = 0; //poll immediately returns -- we have triggered stuff to process right now - } else { + + process_enabled_list(this, e->ethread); + if (likely(!read_ready_list.empty() || !write_ready_list.empty() || + !read_enable_list.empty() || !write_enable_list.empty())) + poll_timeout = 0; // poll immediately returns -- we have triggered stuff to process right now + else poll_timeout = REAL_DEFAULT_EPOLL_TIMEOUT; - } PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread); UnixNetVConnection *vc = NULL; @@ -256,71 +218,60 @@ if (get_ev_events(pd,x) & (INK_EVP_IN)) { vc->read.triggered = 1; vc->addLogMessage("read triggered"); - if ((vc->read.enabled || vc->closed) && !vc->read.netready_queue) { - ready_queue.epoll_addto_read_ready_queue(vc); - } else if (get_ev_events(pd,x) & (INK_EVP_PRI | INK_EVP_HUP | INK_EVP_ERR)) { + if ((vc->read.enabled || vc->closed) && !read_ready_list.in(vc)) + read_ready_list.enqueue(vc); + else if (get_ev_events(pd,x) & (INK_EVP_PRI | INK_EVP_HUP | INK_EVP_ERR)) { // check for unhandled epoll events that should be handled - Debug("epoll_miss", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%p", - get_ev_events(pd,x), vc->read.enabled, vc->closed, vc->read.netready_queue); + Debug("epoll_miss", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d read.netready_queue=%d", + get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc)); } } vc = epd->data.vc; if (get_ev_events(pd,x) & (INK_EVP_OUT)) { vc->write.triggered = 1; vc->addLogMessage("write triggered"); - if ((vc->write.enabled || vc->closed) && !vc->write.netready_queue) { - ready_queue.epoll_addto_write_ready_queue(vc); - } else if (get_ev_events(pd,x) & (INK_EVP_PRI | INK_EVP_HUP | INK_EVP_ERR)) { + if ((vc->write.enabled || 
vc->closed) && !write_ready_list.in(vc)) + write_ready_list.enqueue(vc); + else if (get_ev_events(pd,x) & (INK_EVP_PRI | INK_EVP_HUP | INK_EVP_ERR)) { // check for unhandled epoll events that should be handled Debug("epoll_miss", - "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%p", - get_ev_events(pd,x), vc->write.enabled, vc->closed, vc->write.netready_queue); + "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write.netready_queue=%d", + get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc)); } } else if (!(get_ev_events(pd,x) & (INK_EVP_IN)) && get_ev_events(pd,x) & (INK_EVP_PRI | INK_EVP_HUP | INK_EVP_ERR)) { Debug("epoll_miss", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x)); } } else if (epd->type == EPOLL_DNS_CONNECTION) { - if (epd->data.dnscon != NULL) { - dnsqueue.enqueue(epd->data.dnscon, epd->data.dnscon->link); - } + if (epd->data.dnscon != NULL) + dnsqueue.enqueue(epd->data.dnscon); } } pd->result = 0; UnixNetVConnection *next_vc = NULL; - vc = ready_queue.read_ready_queue.head; - + vc = read_ready_list.head; while (vc) { - next_vc = vc->read.netready_link.next; - if (vc->closed) { + next_vc = vc->read.ready_link.next; + if (vc->closed) close_UnixNetVConnection(vc, trigger_event->ethread); - } else if (vc->read.enabled && vc->read.triggered) { + else if (vc->read.enabled && vc->read.triggered) vc->net_read_io(this, trigger_event->ethread); - } vc = next_vc; } next_vc = NULL; - vc = ready_queue.write_ready_queue.head; - + vc = write_ready_list.head; while (vc) { - next_vc = vc->write.netready_link.next; - if (vc->closed) { + next_vc = vc->write.ready_link.next; + if (vc->closed) close_UnixNetVConnection(vc, trigger_event->ethread); - } else if (vc->write.enabled && vc->write.triggered) { + else if (vc->write.enabled && vc->write.triggered) write_to_net(this, vc, pd, trigger_event->ethread); - } vc = next_vc; } return EVENT_CONT; } -// PriorityPollQueue methods - 
-PriorityPollQueue::PriorityPollQueue() -{ - position = ink_get_hrtime() / HRTIME_MSECOND; -} Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNetAccept.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNetAccept.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/UnixNetAccept.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNetAccept.cc Wed Dec 9 23:01:29 2009 @@ -118,11 +118,7 @@ count++; na->alloc_cache = NULL; -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = ink_get_hrtime(); -#endif vc->ip = vc->con.sa.sin_addr.s_addr; vc->port = ntohs(vc->con.sa.sin_port); vc->accept_port = ntohs(na->server.sa.sin_port); @@ -146,7 +142,6 @@ // // Special purpose MAIN proxy accept code // Seperate accept thread function -// Modified by YTS Team, yamsat // int net_accept_main_blocking(NetAccept * na, Event * e, bool blockable) @@ -154,27 +149,24 @@ (void) blockable; (void) e; - struct epoll_data_ptr *temp_eptr = NULL; struct PollDescriptor *epd = (PollDescriptor *) xmalloc(sizeof(PollDescriptor)); epd->init(); - //unix_netProcessor.accept_epoll_fd = epd->epoll_fd; - //added by vijay - bug 2237131 - struct epoll_data_ptr *eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr)); - eptr->type = EPOLL_NETACCEPT; //NetAccept - eptr->data.na = na; + struct epoll_data_ptr ep; + ep.type = EPOLL_NETACCEPT; // NetAccept + ep.data.na = na; #if defined(USE_EPOLL) struct epoll_event ev; memset(&ev, 0, sizeof(ev)); ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = eptr; + ev.data.ptr = &ep; if (epoll_ctl(epd->epoll_fd, EPOLL_CTL_ADD, na->server.fd, &ev) < 0) { Debug("iocore_net", "init_accept_loop : Error in epoll_ctl\n"); } #elif defined(USE_KQUEUE) struct kevent ev; - EV_SET(&ev, na->server.fd, EVFILT_READ, EV_ADD, 0, 0, eptr); + EV_SET(&ev, 
na->server.fd, EVFILT_READ, EV_ADD, 0, 0, &ep); if (kevent(epd->kqueue_fd, &ev, 1, NULL, 0, NULL) < 0) { Debug("iocore_net", "init_accept_loop : Error in kevent\n"); } @@ -199,7 +191,7 @@ #endif for (int x = 0; x < epd->result; x++) { if (get_ev_events(epd,x) & INK_EVP_IN) { - temp_eptr = (epoll_data_ptr *)get_ev_data(epd,x); + struct epoll_data_ptr *temp_eptr = (epoll_data_ptr *)get_ev_data(epd,x); if (temp_eptr) net_accept = temp_eptr->data.na; if (net_accept) { @@ -305,25 +297,22 @@ a = this; EThread *t = eventProcessor.eventthread[ET_NET][i]; - //added by YTS Team, yamsat PollDescriptor *pd = get_PollDescriptor(t); - struct epoll_data_ptr *eptr; - eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr)); - eptr->type = EPOLL_NETACCEPT; - eptr->data.na = a; + ep.type = EPOLL_NETACCEPT; + ep.data.na = a; #if defined(USE_EPOLL) struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN | EPOLLET; - ev.data.ptr = eptr; + ev.data.ptr = &ep; if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, a->server.fd, &ev) < 0) { Debug("iocore_net", "init_accept_per_thread : Error in epoll_ctl\n"); } #elif defined(USE_KQUEUE) struct kevent ev; - EV_SET(&ev, a->server.fd, EVFILT_READ, EV_ADD, 0, 0, eptr); + EV_SET(&ev, a->server.fd, EVFILT_READ, EV_ADD, 0, 0, &ep); if (kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL) < 0) { Debug("iocore_net", "init_accept_per_thread : Error in kevent\n"); } @@ -412,11 +401,7 @@ RecIncrGlobalRawStatSum(net_rsb, net_connections_currently_open_stat, 1); vc->closed = 0; -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = now; -#endif vc->ip = vc->con.sa.sin_addr.s_addr; vc->port = ntohs(vc->con.sa.sin_port); vc->accept_port = ntohs(server.sa.sin_port); @@ -555,9 +540,6 @@ #endif ) { ink_assert(vc->con.fd == NO_FD); - ink_assert(!vc->read.queue && !vc->write.queue); - ink_assert(!vc->read.link.prev && !vc->read.link.next); - ink_assert(!vc->write.link.prev && !vc->write.link.next); 
ink_assert(!vc->link.next && !vc->link.prev); freeThread(vc, e->ethread); goto Ldone; @@ -576,34 +558,25 @@ NET_INCREMENT_DYN_STAT(net_connections_currently_open_stat); vc->id = net_next_connection_number(); -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = ink_get_hrtime(); -#endif vc->ip = vc->con.sa.sin_addr.s_addr; vc->port = ntohs(vc->con.sa.sin_port); vc->accept_port = ntohs(server.sa.sin_port); vc->mutex = new_ProxyMutex(); vc->thread = e->ethread; - vc->ep = NULL; - vc->nh = get_NetHandler(e->ethread); SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent); - struct epoll_data_ptr *eptr; - eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr)); - eptr->type = EPOLL_READWRITE_VC; - eptr->data.vc = vc; + vc->ep.type = EPOLL_READWRITE_VC; + vc->ep.data.vc = vc; - vc->ep = eptr; #if defined(USE_EPOLL) struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = eptr; + ev.data.ptr = &vc->ep; if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, vc->con.fd, &ev) < 0) { Debug("iocore_net", "acceptFastEvent : Error in inserting fd[%d] in epoll_list\n", vc->con.fd); @@ -612,8 +585,8 @@ } #elif defined(USE_KQUEUE) struct kevent ev[2]; - EV_SET(&ev[0], vc->con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr); - EV_SET(&ev[1], vc->con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr); + EV_SET(&ev[0], vc->con.fd, EVFILT_READ, EV_ADD, 0, 0, &vc->ep); + EV_SET(&ev[1], vc->con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &vc->ep); if (kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL) < 0) { Debug("iocore_net", "acceptFastEvent : Error in inserting fd[%d] in kevent\n", vc->con.fd); close_UnixNetVConnection(vc, e->ethread); @@ -622,23 +595,20 @@ #else #error port me #endif + + vc->nh->open_list.enqueue(vc); + // Set the vc as triggered and place it in the read ready queue in case there is already data on the socket. 
// The request will timeout on the connection if the client has already sent data and it is on the socket // ready to be read. This can occur under heavy load. Debug("iocore_net", "acceptEvent : Setting triggered and adding to the read ready queue"); vc->read.triggered = 1; - vc->nh->ready_queue.epoll_addto_read_ready_queue(vc); + vc->nh->read_ready_list.enqueue(vc); - Debug("iocore_net", "acceptFastEvent : Adding fd %d to read wait list\n", vc->con.fd); - vc->nh->wait_list.epoll_addto_read_wait_list(vc); - Debug("iocore_net", "acceptFastEvent : Adding fd %d to write wait list\n", vc->con.fd); - vc->nh->wait_list.epoll_addto_write_wait_list(vc); - - if (!action_->cancelled) { + if (!action_->cancelled) action_->continuation->handleEvent(NET_EVENT_ACCEPT, vc); - } else { + else close_UnixNetVConnection(vc, e->ethread); - } } while (loop); Ldone: @@ -648,9 +618,6 @@ Lerror: server.close(); e->cancel(); - if (vc->ep != NULL) { - free(vc->ep); - } freeThread(vc, e->ethread); NET_DECREMENT_DYN_STAT(net_accepts_currently_open_stat); delete this; Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNetPages.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNetPages.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/UnixNetPages.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNetPages.cc Wed Dec 9 23:01:29 2009 @@ -59,9 +59,7 @@ } ink_hrtime now = ink_get_hrtime(); - //for (int i = 0; i < n_netq_list; i++) { - Queue &q = nh->wait_list.read_wait_list; - for (UnixNetVConnection * vc = (UnixNetVConnection *) q.head; vc; vc = (UnixNetVConnection *) vc->read.link.next) { + forl_LL(UnixNetVConnection, vc, nh->open_list) { if (ip && ip != vc->ip) continue; if (port && port != vc->port && port != vc->accept_port) @@ -80,11 +78,9 @@ "%d secs ago" // start time "%d" // thread id "%d" // 
read enabled - "%d" // read priority "%d" // read NBytes "%d" // read NDone "%d" // write enabled - "%d" // write priority "%d" // write nbytes "%d" // write ndone "%d secs" // Inactivity timeout at @@ -100,17 +96,14 @@ (int) ((now - vc->submit_time) / HRTIME_SECOND), ethread->id, vc->read.enabled, - vc->read.priority, vc->read.vio.nbytes, vc->read.vio.ndone, vc->write.enabled, - vc->write.priority, vc->write.vio.nbytes, vc->write.vio.ndone, (int) (vc->inactivity_timeout_in / HRTIME_SECOND), (int) (vc->active_timeout_in / HRTIME_SECOND), vc->f.shutdown, vc->closed ? "closed " : "")); } - //} ithread++; if (ithread < eventProcessor.n_threads_for_type[ET_NET]) eventProcessor.eventthread[ET_NET][ithread]->schedule_imm(this); @@ -163,22 +156,8 @@ CHECK_SHOW(show("
Thread: %d
\n", ithread)); CHECK_SHOW(show("\n")); int connections = 0; - /*int *read_pri = new int[n_netq_list]; - int *write_pri = new int[n_netq_list]; - int *read_buck = new int[n_netq_list]; - int *write_buck = new int[n_netq_list]; */ - Queue &qr = nh->wait_list.read_wait_list; - UnixNetVConnection *vc = (UnixNetVConnection *) qr.head; - for (; vc; vc = (UnixNetVConnection *) vc->read.link.next) { + forl_LL(UnixNetVConnection, vc, nh->open_list) connections++; - //read_pri[vc->read.priority <= 0 ? 0 : vc->read.priority]++; - //read_buck[i]++; - } - Queue &qw = nh->wait_list.write_wait_list; - for (vc = (UnixNetVConnection *) qw.head; vc; vc = (UnixNetVConnection *) vc->write.link.next) { - //write_pri[vc->write.priority <= 0 ? 0 : vc->write.priority]++; - //write_buck[i]++; - } CHECK_SHOW(show("\n", "Connections", connections)); CHECK_SHOW(show("\n", "Last Poll Size", pollDescriptor->nfds)); CHECK_SHOW(show("\n", "Last Poll Ready", pollDescriptor->result)); @@ -186,11 +165,6 @@ CHECK_SHOW(show("
%s%d
%s%d
%s%d
\n")); CHECK_SHOW(show ("\n")); - /*for (i = 0; i < n_netq_list; i++) { - CHECK_SHOW(show( - "\n", - i, read_pri[i],read_buck[i],write_pri[i],write_buck[i])); - } */ CHECK_SHOW(show("
#Read PriorityRead BucketWrite PriorityWrite Bucket
%d%d%d%d%d
\n")); ithread++; if (ithread < eventProcessor.n_threads_for_type[ET_NET]) Modified: incubator/trafficserver/traffic/trunk/iocore/net/UnixNetProcessor.cc URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/net/UnixNetProcessor.cc?rev=889011&r1=889010&r2=889011&view=diff ============================================================================== --- incubator/trafficserver/traffic/trunk/iocore/net/UnixNetProcessor.cc (original) +++ incubator/trafficserver/traffic/trunk/iocore/net/UnixNetProcessor.cc Wed Dec 9 23:01:29 2009 @@ -130,17 +130,13 @@ na->etype = etype; if (na->callback_on_open) na->mutex = cont->mutex; - if (frequent_accept) // true - { - if (use_accept_thread) // 0 - { + if (frequent_accept) { // true + if (use_accept_thread) // 0 na->init_accept_loop(); - } else { + else na->init_accept_per_thread(); - } - } else { + } else na->init_accept(); - } if (bound_sockaddr && bound_sockaddr_size) safe_getsockname(na->server.fd, bound_sockaddr, bound_sockaddr_size); @@ -184,11 +180,7 @@ NET_INCREMENT_DYN_STAT(net_connections_currently_open_stat); vc->id = net_next_connection_number(); -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = ink_get_hrtime(); -#endif vc->setSSLClientConnection(true); vc->ip = ip; vc->port = port; @@ -213,11 +205,7 @@ #endif NET_INCREMENT_DYN_STAT(net_connections_currently_open_stat); vc->id = net_next_connection_number(); -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = ink_get_hrtime(); -#endif vc->setSSLClientConnection(true); vc->ip = ip; vc->port = port; @@ -273,7 +261,6 @@ } } - Action * UnixNetProcessor::connect(Continuation * cont, UnixNetVConnection ** avc, @@ -297,11 +284,7 @@ else opt = &vc->options; vc->id = net_next_connection_number(); -#ifdef XXTIME - vc->submit_time = ink_get_hrtime_internal(); -#else vc->submit_time = ink_get_hrtime(); -#endif vc->setSSLClientConnection(true); vc->ip = ip; vc->port = port; @@ -330,15 
+313,8 @@ check_emergency_throttle(vc->con); - // start up next round immediately - - //added by YTS Team, yamsat - struct epoll_data_ptr *eptr; - eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr)); - eptr->type = EPOLL_READWRITE_VC; - eptr->data.vc = vc; - - vc->ep = eptr; + vc->ep.type = EPOLL_READWRITE_VC; + vc->ep.data.vc = vc; PollDescriptor *pd = get_PollDescriptor(t); @@ -346,7 +322,7 @@ struct epoll_event ev; memset(&ev, 0, sizeof(struct epoll_event)); ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = eptr; + ev.data.ptr = &vc->ep; res = epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, vc->con.fd, &ev); @@ -358,14 +334,14 @@ #elif defined(USE_KQUEUE) struct kevent ev; - EV_SET(&ev, vc->con.fd, EVFILT_READ, EV_ADD, 0, 0, eptr); + EV_SET(&ev, vc->con.fd, EVFILT_READ, EV_ADD, 0, 0, &vc->ep); if (kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL) < 0) { Debug("iocore_net", "connect : Error in adding to kqueue list\n"); close_UnixNetVConnection(vc, vc->thread); return ACTION_RESULT_DONE; } - EV_SET(&ev, vc->con.fd, EVFILT_WRITE, EV_ADD, 0, 0, eptr); + EV_SET(&ev, vc->con.fd, EVFILT_WRITE, EV_ADD, 0, 0, &vc->ep); if (kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL) < 0) { Debug("iocore_net", "connect : Error in adding to kqueue list\n"); close_UnixNetVConnection(vc, vc->thread); @@ -375,21 +351,15 @@ #error port me #endif - Debug("iocore_net", "connect : Adding fd %d to read wait list\n", vc->con.fd); - vc->nh->wait_list.epoll_addto_read_wait_list(vc); - Debug("iocore_net", "connect : Adding fd %d to write wait list\n", vc->con.fd); - vc->nh->wait_list.epoll_addto_write_wait_list(vc); + vc->nh->open_list.enqueue(vc); SET_CONTINUATION_HANDLER(vc, (NetVConnHandler) & UnixNetVConnection::mainEvent); ink_assert(!vc->inactivity_timeout_in); ink_assert(!vc->active_timeout_in); - XTIME(printf("%d 1connect\n", vc->id)); *avc = vc; return ACTION_RESULT_DONE; } - - struct CheckConnect:public Continuation { UnixNetVConnection *vc;