etch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vei...@apache.org
Subject svn commit: r1579568 - in /etch/trunk/binding-cpp/runtime: include/transport/ src/main/serialization/ src/main/transport/ src/test/ src/test/transport/
Date Thu, 20 Mar 2014 09:05:31 GMT
Author: veithm
Date: Thu Mar 20 09:05:30 2014
New Revision: 1579568

URL: http://svn.apache.org/r1579568
Log:
ETCH-278 Enable receiving of multiple packages in one buffer in EtchPackizer

Till now the packetizer was not able to handle more then one packet in a
buffer received from the transport layer.
This patches introduces a new algorithm to handle packets and adds tests
to ensure it works for all possible situations.

Change-Id: I7ba4b04a012ed8778861dc734ba2574d2b487ac9

Modified:
    etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h
    etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h
    etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
    etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp
    etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp
    etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt
    etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
    etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp

Modified: etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h (original)
+++ etch/trunk/binding-cpp/runtime/include/transport/EtchFlexBuffer.h Thu Mar 20 09:05:30
2014
@@ -18,6 +18,8 @@
 
 #ifndef __ETCHFLEXBUFFER_H__
 #define __ETCHFLEXBUFFER_H__
+
+#include "capu/util/SmartPointer.h"
 #include "common/EtchConfig.h"
 #include "common/EtchError.h"
 #include "transport/EtchByteRepresentation.h"
@@ -90,7 +92,7 @@ public:
    *         ETCH_EINVAL if the byte representation is not BIG ENDIAN or LITTLE ENDIAN
    *         ETCH_ERANGE if there is not enough value to be read (less than 4 byte)
    */
-  status_t getInteger(capu::int32_t & value);
+  status_t getInteger(capu::uint32_t & value);
 
   /**
    * gets an byte value from flexible buffer
@@ -322,6 +324,7 @@ private:
   void copy(capu::int8_t *buffer, capu::uint32_t numBytes);
 };
 
+typedef capu::SmartPointer<EtchFlexBuffer> EtchFlexBufferPtr;
 
 #endif /* ETCHFLEXBUFFER_H */
 

Modified: etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h (original)
+++ etch/trunk/binding-cpp/runtime/include/transport/EtchPacketizer.h Thu Mar 20 09:05:30
2014
@@ -51,8 +51,12 @@ public:
   const static EtchString& MAX_PKT_SIZE_TERM();
 
   const static capu::int32_t& SIG();
-
+  
+  const static capu::uint32_t& SIG_SIZE();
+  
   const static capu::uint32_t& HEADER_SIZE();
+  
+
 
   /**
    * Constructs the Packetizer with null packet handler and uri specified
@@ -72,6 +76,8 @@ public:
    */
   EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchURL* uri);
 
+
+public:
   /**
    * Destructor
    */
@@ -146,13 +152,16 @@ private:
 
   capu::uint32_t mMaxPktSize;
 
-  capu::uint32_t mBodyLen;
-
-  capu::bool_t mWantHeader;
-
-  capu::SmartPointer<EtchFlexBuffer> mSavedBuf;
-
-  status_t processHeader(EtchFlexBuffer* buf, capu::bool_t reset, capu::uint32_t &pktSize);
+  EtchFlexBufferPtr mReadBuf;
+  EtchFlexBufferPtr mTempBuf;
+    
+    
+  capu::bool_t      bufferContainsPacket();
+  capu::bool_t      bufferContainsHeader();
+  void              bufferClean();
+  EtchFlexBufferPtr extractPacket();
+  void              handleCorruptedBuffer();
+  
 
 };
 

Modified: etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp (original)
+++ etch/trunk/binding-cpp/runtime/src/main/serialization/EtchBinaryTaggedDataInput.cpp Thu
Mar 20 09:05:30 2014
@@ -473,7 +473,7 @@ status_t EtchBinaryTaggedDataInput::read
 
     case EtchTypeCode::INT:
     {
-      capu::int32_t tmp;
+      capu::uint32_t tmp;
       if (mBuffer->getInteger(tmp) != ETCH_OK)
         return ETCH_ERROR;
       capu::SmartPointer<EtchInt32> intNum = new EtchInt32(tmp);

Modified: etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp (original)
+++ etch/trunk/binding-cpp/runtime/src/main/transport/EtchFlexBuffer.cpp Thu Mar 20 09:05:30
2014
@@ -56,20 +56,20 @@ capu::uint32_t EtchFlexBuffer::getIndex(
   return mIndex;
 }
 
-status_t EtchFlexBuffer::getInteger(capu::int32_t & value) {
+status_t EtchFlexBuffer::getInteger(capu::uint32_t & value) {
 
-  if (sizeof (capu::int32_t) > getAvailableBytes())
+  if (sizeof (capu::uint32_t) > getAvailableBytes())
     return ETCH_ERANGE;
 
   if (mByteRepresentation == ETCH_LITTLE_ENDIAN) {
     // little-endian
-    capu::int32_t result = (mBuffer[mIndex++] & 255);
+    capu::uint32_t result = (mBuffer[mIndex++] & 255);
     result += (mBuffer[mIndex++] & 255) << 8;
     result += (mBuffer[mIndex++] & 255) << 16;
     value = result + ((mBuffer[mIndex++] & 255) << 24);
   } else if (mByteRepresentation == ETCH_BIG_ENDIAN) {
     // big-endian
-    capu::int32_t result = mBuffer[mIndex++] & 255;
+    capu::uint32_t result = mBuffer[mIndex++] & 255;
     result = (result << 8) + (mBuffer[mIndex++] & 255);
     result = (result << 8) + (mBuffer[mIndex++] & 255);
     value = (result << 8) + (mBuffer[mIndex++] & 255);
@@ -136,7 +136,7 @@ status_t EtchFlexBuffer::getLong(capu::i
 }
 
 status_t EtchFlexBuffer::getFloat(capu::float_t & value) {
-  capu::int32_t tmp;
+  capu::uint32_t tmp;
   status_t err = getInteger(tmp);
   if (err != ETCH_OK)
     return err;
@@ -219,11 +219,7 @@ status_t EtchFlexBuffer::put(EtchFlexBuf
 
   copy(&buffer.mBuffer[buffer.mIndex], buffer.getAvailableBytes());
 
-  if (mIndex > mLength)
-    mLength = mIndex;
-
   return ETCH_OK;
-
 }
 
 status_t EtchFlexBuffer::put(EtchFlexBuffer & buffer, capu::uint32_t numBytes) {

Modified: etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp (original)
+++ etch/trunk/binding-cpp/runtime/src/main/transport/EtchPacketizer.cpp Thu Mar 20 09:05:30
2014
@@ -29,6 +29,12 @@ const capu::int32_t& EtchPacketizer::SIG
   return sig;
 }
 
+const capu::uint32_t& EtchPacketizer::SIG_SIZE() {
+  static const capu::uint32_t sigSize(4);
+  return sigSize;
+}
+
+
 const capu::int32_t& EtchPacketizer::DEFAULT_MAX_PKT_SIZE(){
   static const capu::int32_t pktSize(16384 - EtchPacketizer::HEADER_SIZE());
   return pktSize;
@@ -40,7 +46,7 @@ const EtchString& EtchPacketizer::MAX_PK
 }
 
 EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchString&
uri)
-: mRuntime(runtime), mTransport(transport), mBodyLen(0), mWantHeader(true) {
+: mRuntime(runtime), mTransport(transport) {
   if (mTransport != NULL)
     mTransport->setSession(this);
 
@@ -55,11 +61,12 @@ EtchPacketizer::EtchPacketizer(EtchRunti
     if (mMaxPktSize <= 0)
       mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
   }
-  mSavedBuf = new EtchFlexBuffer();
+  mReadBuf = new EtchFlexBuffer();
+  mTempBuf = new EtchFlexBuffer();
 }
 
 EtchPacketizer::EtchPacketizer(EtchRuntime* runtime, EtchTransportData* transport, EtchURL*
uri)
-: mRuntime(runtime), mTransport(transport), mBodyLen(0), mWantHeader(true) {
+: mRuntime(runtime), mTransport(transport) {
   EtchString value("");
   if (mTransport != NULL)
     transport->setSession(this);
@@ -73,7 +80,8 @@ EtchPacketizer::EtchPacketizer(EtchRunti
     if (mMaxPktSize <= 0)
       mMaxPktSize = DEFAULT_MAX_PKT_SIZE();
   }
-  mSavedBuf = new EtchFlexBuffer();
+  mReadBuf = new EtchFlexBuffer();
+  mTempBuf = new EtchFlexBuffer();
 }
 
 EtchPacketizer::~EtchPacketizer() {
@@ -135,124 +143,118 @@ status_t EtchPacketizer::transportPacket
 }
 
 status_t EtchPacketizer::sessionData(capu::SmartPointer<EtchWho> sender, capu::SmartPointer<EtchFlexBuffer>
buf) {
-//TODO: compare with java version
-
+  status_t result = ETCH_OK;
+  
+  if(mReadBuf->getAvailableBytes() > 0)
+  {
+    //If there is already data in the buffer, amend the new buffer by setting the index to
the end of the buffer
+    mReadBuf->setIndex(mReadBuf->getLength());
+  }
 
-  // there are two options here. one is that we have no buffered data
-  // and the entire packet is contained within the buf. in that case
-  // we could optimize the daylights out of the process and directly
-  // drop the packet on the handler.
-
-  status_t result;
-  if (buf->getAvailableBytes() > 0) {
-    if (mWantHeader) {
-      // do we have enough to make a header?
-
-      if (mSavedBuf->getLength() + buf->getAvailableBytes() >= HEADER_SIZE()) {
-        capu::uint32_t pktSize;
-
-        if (mSavedBuf->getLength() == 0) {
-          // savedBuf is empty, entire header in buf.
-
-          result = processHeader(buf.get(), false, pktSize);
-          if (result != ETCH_OK)
-            return result;
-        } else // header split across savedBuf and buf
-        {
-          // move just enough data from buf to savedBuf to have a header.
-
-          capu::uint32_t needFromBuf = HEADER_SIZE() - mSavedBuf->getLength();
-          mSavedBuf->put(*buf, needFromBuf);
-          mSavedBuf->setIndex(0);
-
-          result = processHeader(mSavedBuf.get(), true, pktSize);
-          if (result != ETCH_OK)
-            return result;
-        }
-
-        mBodyLen = pktSize;
-        mWantHeader = false;
-      } else // want header, but there's not enough to make it.
-      {
-        // save buf in savedBuf.
-
-        mSavedBuf->setIndex(mSavedBuf->getLength());
-        mSavedBuf->put(*buf);
-      }
-    }
-    if (!mWantHeader && mSavedBuf->getLength() + buf->getAvailableBytes() >=
mBodyLen) {
-      // want body, and there's enough to make it.
+  mReadBuf->put(*buf);
+  mReadBuf->setIndex(0);
 
-      // three possible cases: the body is entirely in savedBuf,
-      // the body is split, or the body is entirely in buf. assert
-      // that the body cannot entirely be in savedBuf, or else
-      // we'd have processed it last time.
-
-      if (mSavedBuf->getLength() == 0) {
-        // savedBuf is empty, entire body in buf.
-
-        capu::uint32_t length = buf->getLength();
-        capu::uint32_t index = buf->getIndex();
-        buf->setLength(index + mBodyLen);
-        ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"Entire body is in buffer: header is parsed and the body of message is sent to Messagizer");
-        result = mSession->sessionPacket(sender, buf);
-
-        if (result != ETCH_OK) {
-          return result;
-        }
-
-        buf->setLength(length);
-        buf->setIndex(index + mBodyLen);
-
-        mWantHeader = true;
-      } else // body split across savedBuf and buf
-      {
-        // move just enough data from buf to savedBuf to have a body.
-
-        capu::uint32_t needFromBuf = mBodyLen - mSavedBuf->getLength();
-        mSavedBuf->put(*buf, needFromBuf);
-        mSavedBuf->setIndex(0);
-
-        ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"more data in buffer: header is parsed and the body of message is sent to Messagizer");
-        mSession->sessionPacket(sender, mSavedBuf);
-
-        mSavedBuf->reset();
-        mWantHeader = true;
-      }
-    } else if (!mWantHeader)// want body, but there's not enough to make it.
-    {
-      // save buf in savedBuf.
-      mSavedBuf->setIndex(mSavedBuf->getLength());
-      mSavedBuf->put(*buf);
-    }
+  while(bufferContainsPacket())
+  {
+    EtchFlexBufferPtr packetData = extractPacket();
+    result = mSession->sessionPacket(sender, packetData);
   }
-  // buf is now empty, and there's nothing else to do.
-  if (buf->getAvailableBytes() != 0)
-    return ETCH_ERROR;
-  return ETCH_OK;
+  return result;
 }
 
-status_t EtchPacketizer::processHeader(EtchFlexBuffer* buf, capu::bool_t reset, capu::uint32_t
&pktSize) {
-  capu::int32_t sig = 0;
-  buf->getInteger(sig);
-
-  if (sig != SIG()) {
-    ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"SIG is not correct, message will be discarded");
-    return ETCH_ERROR;
+capu::bool_t EtchPacketizer::bufferContainsPacket()
+{
+  if(!bufferContainsHeader())
+  {
+    return false;
   }
+  capu::uint32_t packetSize = 0;
+  if(mReadBuf->getInteger(packetSize) != ETCH_OK || mReadBuf->getAvailableBytes() <
packetSize)
+  {
+    mReadBuf->setIndex(0);
+    return false;
+  }
+  if (packetSize > mMaxPktSize) {
+    ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"EtchPacketizer: PacketSize exceeded Maximum Packetsize. MaximumSize: " << mMaxPktSize
<< " PacketSize: " << packetSize);
+    mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
+    bufferClean();
+  }
+
+  mReadBuf->setIndex(0);
+  return true;
+}
 
-  buf->getInteger((capu::int32_t&)pktSize);
+capu::bool_t EtchPacketizer::bufferContainsHeader()
+{
+  if(mReadBuf->getAvailableBytes() < HEADER_SIZE())
+    return false;
+
+  capu::uint32_t sig = 0;
+  mReadBuf->getInteger(sig);
+  
+  if(sig == SIG()) {
+    return true;
+  }
 
-  if (reset)
-    buf->reset();
+  ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"EtchPacketizer: Packet SIG is incorrect - discarding package and searching for next Valid
SIG.");
+  
+  capu::uint32_t length = mReadBuf->getLength() - sizeof(capu::int32_t);
+  capu::uint32_t index = mReadBuf->getIndex();
+    
+  for(capu::uint32_t i = index; i < length; ++i) {
+    mReadBuf->getInteger(sig);
+    if(sig == SIG()) {
+      ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"EtchPacketizer: Found correct SIG in Buffer after having a damaged Buffer.");
+      mReadBuf->setIndex(i); //we found a correct Header
+      bufferClean();
+      mReadBuf->setIndex(SIG_SIZE());
+      return true;
+    }
+    mReadBuf->setIndex(i+1);
+  }
 
-  if (pktSize > mMaxPktSize) {
-    ETCH_LOG_ERROR(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"Packet size exceeds the maximum packet size, message will be discarded");
-    return ETCH_ERROR;
+  mReadBuf->setIndex(mReadBuf->getLength() - HEADER_SIZE()); //Save the last 8 Bytes
+  bufferClean();
+  return false;
+}
+
+void EtchPacketizer::bufferClean()
+{
+  mTempBuf->put(*mReadBuf);
+  mTempBuf->setIndex(0);
+  
+  mReadBuf->reset();
+  mReadBuf->put(*mTempBuf);
+  mReadBuf->setIndex(0);
+  
+  mTempBuf->reset();
+}
+
+EtchFlexBufferPtr EtchPacketizer::extractPacket()
+{
+  mReadBuf->setIndex(mReadBuf->getIndex() + 4);
+  capu::uint32_t packetSize = 0;
+  mReadBuf->getInteger(packetSize);
+  
+  EtchFlexBufferPtr packetData = new EtchFlexBuffer();
+  packetData->put(*mReadBuf, packetSize);
+  
+  mReadBuf->setIndex(mReadBuf->getIndex() + packetSize);
+  if(mReadBuf->getAvailableBytes() > 0)
+  {
+    mTempBuf->put(*mReadBuf);
+    mTempBuf->setIndex(0);
   }
-  return ETCH_OK;
+  mReadBuf->reset();
+  mReadBuf->put(*mTempBuf);
+  mReadBuf->setIndex(0);
+  mTempBuf->reset();
+  packetData->setIndex(0);
+  ETCH_LOG_DEBUG(mRuntime->getLogger(), mRuntime->getLogger().getPacketizerContext(),
"EtchPacketizer: extracted packet - passing " << packetSize << " Bytes to Messagizer.")
+  return packetData;
 }
 
+
 EtchTransportData* EtchPacketizer::getTransport() {
   return mTransport;
 }

Modified: etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt (original)
+++ etch/trunk/binding-cpp/runtime/src/test/CMakeLists.txt Thu Mar 20 09:05:30 2014
@@ -110,6 +110,8 @@ ELSEIF(TARGET_OS STREQUAL "Windows")
   target_link_libraries (etch-cpp-test etch-cpp capu)
 ELSEIF(TARGET_OS STREQUAL "QNX")
   target_link_libraries (etch-cpp-test etch-cpp capu c socket)
+ELSEIF(TARGET_OS STREQUAL "MacOSX")
+  target_link_libraries (etch-cpp-test etch-cpp capu)
 ENDIF()
 
 IF (TARGET_OS STREQUAL "Windows" AND BUILD_CHECK_MEMORY)

Modified: etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp (original)
+++ etch/trunk/binding-cpp/runtime/src/test/transport/EtchFlexBufferTest.cpp Thu Mar 20 09:05:30
2014
@@ -179,7 +179,7 @@ TEST(EtchFlexBufferTest, putAndgetIntege
   EtchFlexBuffer *buffer = new EtchFlexBuffer();
   capu::int8_t key [] = {'a', 'b', 'c'};
 
-  capu::int32_t value = 5;
+  capu::uint32_t value = 5;
   EXPECT_TRUE(buffer->put(NULL, 32) == ETCH_EINVAL);
   //try to read an integer
   EXPECT_TRUE(buffer->getInteger(value) == ETCH_ERANGE);

Modified: etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp
URL: http://svn.apache.org/viewvc/etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp?rev=1579568&r1=1579567&r2=1579568&view=diff
==============================================================================
--- etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp (original)
+++ etch/trunk/binding-cpp/runtime/src/test/transport/EtchPacketizerTest.cpp Thu Mar 20 09:05:30
2014
@@ -28,6 +28,9 @@
 #include "transport/EtchSessionPacket.h"
 #include "transport/EtchFlexBuffer.h"
 
+using testing::_;
+using testing::Invoke;
+
 class MockListener3 : public virtual EtchSessionListener<EtchSocket> {
 public:
 
@@ -145,26 +148,413 @@ TEST_F(EtchPacketizerTest, TransportPack
   delete packetizer;
 }
 
-TEST_F(EtchPacketizerTest, SessionDataTest) {
-  EtchURL u("tcp://127.0.0.1:4001");
+class MessagizerCorruptedCheck : public EtchSessionPacket {
+public:
+    
+    status_t sessionPacket(capu::SmartPointer<EtchWho> receipent, capu::SmartPointer<EtchFlexBuffer>
buf)
+    {
+        capu::uint32_t val;
+
+        buf->getInteger(val);
+        EXPECT_EQ(11, val);
+
+        buf->getInteger(val);
+        EXPECT_EQ(1122, val);
+
+        buf->getInteger(val);
+        EXPECT_EQ(112233, val);
+
+        buf->getInteger(val);
+        EXPECT_EQ(11223344, val);
+        return ETCH_OK;
+    }
+    
+    status_t sessionQuery (capu::SmartPointer<EtchObject> query, capu::SmartPointer<EtchObject>
&result)
+    {
+        return ETCH_OK;
+    }
+    
+    status_t sessionControl(capu::SmartPointer<EtchObject> control, capu::SmartPointer<EtchObject>
value)
+    {
+        return ETCH_OK;
+    }
+    
+    status_t sessionNotify(capu::SmartPointer<EtchObject> event) {
+        return ETCH_OK;
+    }
+};
+
+class MockMessagizerCorrupted : public EtchSessionPacket {
+public:
+    MockMessagizerCorrupted() {
+        // By default, all calls are delegated to the real object.
+        ON_CALL(*this, sessionPacket(_, _)).WillByDefault(Invoke(&real_, &MessagizerCorruptedCheck::sessionPacket));
+    }
+    MOCK_METHOD2(sessionPacket, status_t(capu::SmartPointer<EtchWho> receipent, capu::SmartPointer<EtchFlexBuffer>
buf));
+    
+    MOCK_METHOD2(sessionQuery, status_t(capu::SmartPointer<EtchObject> query, capu::SmartPointer<EtchObject>
&result));
+    
+    MOCK_METHOD2(sessionControl, status_t(capu::SmartPointer<EtchObject> control, capu::SmartPointer<EtchObject>
value));
+    
+    status_t sessionNotify(capu::SmartPointer<EtchObject> event) {
+        return ETCH_OK;
+    }
+private:
+    MessagizerCorruptedCheck real_;
+};
+
+
+TEST_F(EtchPacketizerTest, SinglePackagesTest) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(1);
//Expect two Clean packages
+
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 24);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+TEST_F(EtchPacketizerTest, MutliplePackagesTest) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(3);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 72);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitPackagesTest) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(3);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+
+  
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 36);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 36);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitHeaderTest) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(3);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 4);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 68);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+TEST_F(EtchPacketizerTest, SplitMultiplePackagesTest) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(3);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 36);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 8);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putInt(11223344);
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 28);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+
+
+TEST_F(EtchPacketizerTest, CorruptedPackagesTest_OnePacket) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(2);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    //CorruptedPackage Header
+    buffer->putShort(0xBEEFu); //SIG = DEADBEEF
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 26);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putShort(0xDEADu); //Header SIG is now BEEFDEAD => wrong
+    buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad Data
+    buffer->putInt(packetizer->SIG()); //a fine package behind the corrupted one
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 53);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+
+TEST_F(EtchPacketizerTest, CorruptedPackagesTest_MultiplePacket) {
+    EtchURL u("tcp://127.0.0.1:4001");
+    EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
+    EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+    capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
+    EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+    packetizer->setSession(mSessionPacker);
+    
+    EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(2);
//Expect two Clean packages
+    
+    //Build up a Correct Package and Attach a damaged Header behind it, which should lead
to a bad SIG in the next loop
+    buffer->putInt(packetizer->SIG());
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    //CorruptedPackage Header
+    buffer->putShort(0xBEEFu); //SIG = DEADBEEF
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 26);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putShort(0xDEADu); //Header SIG is now BEEFDEAD => wrong
+    buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad Data
+    buffer->putInt(0xBEEFDEAD);
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 33);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putShort(0xDEADu); //Header SIG is now DEADDEAD => wrong
+    buffer->put((capu::int8_t*)"123456789123456789123456789", 27); //some bad Data
+    buffer->putInt(0xBEEFDEAD); //Half the Header "DEAD" = the beginning of the correct
Header
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 33);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    buffer->reset();
+    buffer->putShort(0xBEEFu); //Fine with previous data
+    buffer->putInt(16); //size
+    buffer->putInt(11);
+    buffer->putInt(1122);
+    buffer->putInt(112233);
+    buffer->putInt(11223344);
+    buffer->setIndex(0);
+    EXPECT_TRUE(buffer->getLength() == 22);
+    EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
+    
+    packetizer->setSession(NULL);
+    delete mSessionPacker;
+    delete packetizer;
+    delete conn;
+}
+
+
+TEST_F(EtchPacketizerTest, PacketToLargeTest) {
+  EtchURL u("tcp://127.0.0.1:4001?Packetizer.maxPktSize=18");
   EtchTransportData* conn = new EtchTcpConnection(mRuntime, NULL, &u);
   EtchPacketizer* packetizer = new EtchPacketizer(mRuntime, conn, &u);
+
   capu::SmartPointer<EtchFlexBuffer> buffer = new EtchFlexBuffer();
-  //A packet is created
-  capu::int32_t pktsize = 4;
+  EtchSessionPacket* mSessionPacker = new MockMessagizerCorrupted();
+  packetizer->setSession(mSessionPacker);
+  
+  EXPECT_CALL(*(MockMessagizerCorrupted*)mSessionPacker, sessionPacket(_, _)).Times(2); //Expect
two Clean packages
+  
+  //Build up a Correct Package and Attach a damaged Header behind it, which should lead to
a bad SIG in the next loop
   buffer->putInt(packetizer->SIG());
-  buffer->putInt(pktsize);
-  buffer->put((capu::int8_t *)"test", pktsize);
+  buffer->putInt(16); //size
+  buffer->putInt(11);
+  buffer->putInt(1122);
+  buffer->putInt(112233);
+  buffer->putInt(11223344);
+  //CorruptedPackage Header
+  buffer->putInt(packetizer->SIG());
+  buffer->putInt(20); //size
+  buffer->putInt(11);
+  buffer->putInt(1122);
+  buffer->putInt(112233);
+  buffer->putInt(11223344);
+  buffer->putInt(1122334455);
+  buffer->putInt(packetizer->SIG());
+  buffer->putInt(16); //size
+  buffer->putInt(11);
+  buffer->putInt(1122);
+  buffer->putInt(112233);
+  buffer->putInt(11223344);
   buffer->setIndex(0);
-  EXPECT_TRUE(buffer->getLength() == 12);
-
-  EtchSessionPacket* mSessionPacker = new MockMessagizer();
-  packetizer->setSession(mSessionPacker);
-
-  MockMessagizer * mock = (MockMessagizer*) mSessionPacker;
-  EXPECT_CALL(*mock, sessionPacket(capu::SmartPointer<EtchWho > (NULL), buffer));
+  EXPECT_TRUE(buffer->getLength() == 76);
   EXPECT_TRUE(packetizer->sessionData(NULL, buffer) == ETCH_OK);
 
+  
   packetizer->setSession(NULL);
   delete mSessionPacker;
   delete packetizer;



Mime
View raw message