Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 4772 invoked from network); 12 Sep 2007 00:37:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Sep 2007 00:37:40 -0000 Received: (qmail 81265 invoked by uid 500); 12 Sep 2007 00:37:34 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 81257 invoked by uid 500); 12 Sep 2007 00:37:34 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 81246 invoked by uid 99); 12 Sep 2007 00:37:34 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Sep 2007 17:37:34 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Sep 2007 00:37:40 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EBBC41A9832; Tue, 11 Sep 2007 17:37:19 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r574735 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/RecoveryManagerImpl.cpp cpp/src/qpid/framing/AMQFrame.cpp cpp/src/qpid/framing/AMQFrame.h python/qpid/codec.py python/qpid/connection.py Date: Wed, 12 Sep 2007 00:37:19 -0000 To: qpid-commits@incubator.apache.org From: astitcher@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070912003719.EBBC41A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: astitcher Date: Tue Sep 11 17:37:17 2007 New Revision: 574735 URL: http://svn.apache.org/viewvc?rev=574735&view=rev Log: * python/qpid/codec.py Comment typo * cpp/src/qpid/broker/RecoveryManagerImpl.cpp Cruft removal * python/qpid/codec.py * python/qpid/connection.py * cpp/src/qpid/framing/AMQFrame.h * cpp/src/qpid/framing/AMQFrame.cpp Initial implementation of 0-10 framing - This uses the new 12 byte frame header, but doesn't support splitting segments/framesets over multiple frames yet. Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h incubator/qpid/trunk/qpid/python/qpid/codec.py incubator/qpid/trunk/qpid/python/qpid/connection.py Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=574735&r1=574734&r2=574735&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Sep 11 17:37:17 2007 @@ -107,8 +107,6 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - buffer.record(); - //peek at type: Message::shared_ptr message(new Message()); message->decodeHeader(buffer); return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=574735&r1=574734&r2=574735&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Sep 11 17:37:17 2007 @@ -70,40 +70,70 @@ return boost::apply_visitor(GetBodyVisitor(), const_cast(body)); } +// This is now misleadingly named as it is not the frame size as defined in the spec +// (as it also includes the end marker) +uint32_t AMQFrame::size() const{ + return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); +} + +uint32_t AMQFrame::frameOverhead() { + return 12 /*frame header*/ + 1/*0xCE*/; +} + void AMQFrame::encode(Buffer& buffer) const { + uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0); + buffer.putOctet(flags); buffer.putOctet(getBody()->type()); + buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself) + buffer.putOctet(0); + buffer.putOctet(0x0f & subchannel); buffer.putShort(channel); - buffer.putLong(boost::apply_visitor(SizeVisitor(), body)); + buffer.putLong(0); boost::apply_visitor(EncodeVisitor(buffer), body); buffer.putOctet(0xCE); } -uint32_t AMQFrame::size() const{ - return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); -} - -uint32_t AMQFrame::frameOverhead() { - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/; -} - bool AMQFrame::decode(Buffer& buffer) { - if(buffer.available() < 7) + if(buffer.available() < frameOverhead() - 1) return false; buffer.record(); - uint8_t type = buffer.getOctet(); + uint8_t flags = buffer.getOctet(); + uint8_t framing_version = (flags & 0xc0) >> 6; + if (framing_version != 0) + THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported"); + bof = flags & 0x08; + eof = flags & 0x04; + bos = flags & 0x02; + bos = flags & 0x01; + uint8_t type = buffer.getOctet(); + uint16_t frame_size = buffer.getShort(); + if (frame_size < frameOverhead()-1) + THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small"); + uint8_t reserved1 = buffer.getOctet(); + uint8_t field1 = buffer.getOctet(); + subchannel = field1 & 0x0f; channel = buffer.getShort(); - uint32_t size = buffer.getLong(); + (void) buffer.getLong(); // reserved2 + + // Verify that the protocol header meets current spec + // TODO: should we check reserved2 against zero as well? - the spec isn't clear + if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0) + THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero"); - if(buffer.available() < size+1){ + // TODO: should no longer care about body size and only pass up B,E,b,e flags + uint16_t body_size = frame_size + 1 - frameOverhead(); + if (buffer.available() < body_size+1u){ buffer.restore(); return false; } - decodeBody(buffer, size, type); + decodeBody(buffer, body_size, type); + uint8_t end = buffer.getOctet(); - if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); + if (end != 0xCE) + THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=574735&r1=574734&r2=574735&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Sep 11 17:37:17 2007 @@ -37,14 +37,14 @@ class AMQFrame : public AMQDataBlock { public: - AMQFrame() : channel(0) {} + AMQFrame() : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(0) {} /** Construct a frame with a copy of b */ - AMQFrame(ChannelId c, const AMQBody* b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody* b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) { setBody(*b); } - AMQFrame(ChannelId c, const AMQBody& b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody& b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) { setBody(b); } @@ -97,6 +97,11 @@ void decodeBody(Buffer& buffer, uint32_t size, uint8_t type); + bool bof; + bool eof; + bool bos; + bool eos; + uint8_t subchannel; uint16_t channel; Variant body; }; Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?rev=574735&r1=574734&r2=574735&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/codec.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/codec.py Tue Sep 11 17:37:17 2007 @@ -53,7 +53,7 @@ def read(self, n): """ - reads in 'n' bytes from the stream. Can raise EFO exception + reads in 'n' bytes from the stream. Can raise EOF exception """ data = self.stream.read(n) if n > 0 and len(data) == 0: Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=574735&r1=574734&r2=574735&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/python/qpid/connection.py (original) +++ incubator/qpid/trunk/qpid/python/qpid/connection.py Tue Sep 11 17:37:17 2007 @@ -92,23 +92,44 @@ def write(self, frame): c = self.codec + c.encode_octet(0x0f) # TODO: currently fixed at ver=0, B=E=b=e=1 c.encode_octet(self.spec.constants.byname[frame.type].id) - c.encode_short(frame.channel) body = StringIO() enc = codec.Codec(body, self.spec) frame.encode(enc) enc.flush() - c.encode_longstr(body.getvalue()) + frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size) + c.encode_short(frame_size) + c.encode_octet(0) # Reserved + c.encode_octet(frame.subchannel & 0x0f) + c.encode_short(frame.channel) + c.encode_long(0) # Reserved + c.write(body.getvalue()) c.encode_octet(self.FRAME_END) def read(self): c = self.codec + flags = c.decode_octet() # TODO: currently ignoring flags + framing_version = (flags & 0xc0) >> 6 + if framing_version != 0: + raise "frame error: unknown framing version" type = self.spec.constants.byid[c.decode_octet()].name + frame_size = c.decode_short() + if frame_size < 12: # TODO: Magic number (frame header size) + raise "frame error: frame size too small" + reserved1 = c.decode_octet() + field = c.decode_octet() + subchannel = field & 0x0f channel = c.decode_short() - body = c.decode_longstr() + reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0 + if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0: + raise "frame error: reserved bits not all zero" + body_size = frame_size - 12 # TODO: Magic number (frame header size) + body = c.read(body_size) dec = codec.Codec(StringIO(body), self.spec) frame = Frame.DECODERS[type].decode(self.spec, dec, len(body)) frame.channel = channel + frame.subchannel = subchannel end = c.decode_octet() if end != self.FRAME_END: garbage = "" @@ -145,6 +166,7 @@ def init(self, args, kwargs): self.channel = kwargs.pop("channel", 0) + self.subchannel = kwargs.pop("subchannel", 0) def encode(self, enc): abstract