apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tus...@apache.org
Subject [1/2] apex-core git commit: APEXCORE-591 - SubscribeRequestTuple has wrong buffer size when mask is zero
Date Thu, 12 Jan 2017 17:42:41 GMT
Repository: apex-core
Updated Branches:
  refs/heads/master 99d3a9538 -> 7ea7f6073


APEXCORE-591 - SubscribeRequestTuple has wrong buffer size when mask is zero


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/97b298e9
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/97b298e9
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/97b298e9

Branch: refs/heads/master
Commit: 97b298e934bcc5f452483e5dfaa0c956efe91612
Parents: 3f06ce7
Author: Vlad Rozov <v.rozov@datatorrent.com>
Authored: Fri Dec 16 08:31:05 2016 -0800
Committer: Vlad Rozov <v.rozov@datatorrent.com>
Committed: Fri Dec 16 08:31:05 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/packet/DataTuple.java          | 24 +------
 .../bufferserver/packet/EmptyTuple.java         | 32 ---------
 .../packet/GenericRequestTuple.java             | 37 ++++------
 .../bufferserver/packet/PayloadTuple.java       | 44 +++---------
 .../packet/PublishRequestTuple.java             |  5 ++
 .../bufferserver/packet/PurgeRequestTuple.java  |  5 ++
 .../bufferserver/packet/RequestTuple.java       | 31 +++------
 .../bufferserver/packet/ResetRequestTuple.java  |  5 ++
 .../bufferserver/packet/ResetWindowTuple.java   | 32 ++-------
 .../packet/SubscribeRequestTuple.java           | 72 +++++++-------------
 .../datatorrent/bufferserver/packet/Tuple.java  | 70 +++++++++----------
 .../bufferserver/packet/WindowIdTuple.java      | 37 +++-------
 .../packet/SubscribeRequestTupleTest.java       | 24 +++++--
 13 files changed, 143 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
index cb1ad5f..aee10a6 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java
@@ -33,33 +33,15 @@ public class DataTuple extends Tuple
   }
 
   @Override
-  public int getWindowId()
+  public MessageType getType()
   {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getPartition()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return MessageType.CODEC_STATE;
   }
 
   @Override
   public Slice getData()
   {
-    return new Slice(buffer, offset + 1, length - 1);
-  }
-
-  @Override
-  public int getBaseSeconds()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getWindowWidth()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return new Slice(buffer, offset, limit - offset);
   }
 
   public static byte[] getSerializedTuple(byte type, Slice f)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
index 3c3f184..831311e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java
@@ -18,8 +18,6 @@
  */
 package com.datatorrent.bufferserver.packet;
 
-import com.datatorrent.netlet.util.Slice;
-
 /**
  * <p>EmptyTuple class.</p>
  *
@@ -38,36 +36,6 @@ public class EmptyTuple extends Tuple
     return MessageType.NO_MESSAGE;
   }
 
-  @Override
-  public int getWindowId()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getPartition()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public Slice getData()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getBaseSeconds()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getWindowWidth()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
   public static byte[] getSerializedTuple(byte value)
   {
     return new byte[] {value};

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
index c94c6d5..7ee28cc 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java
@@ -45,58 +45,51 @@ public class GenericRequestTuple extends RequestTuple
   }
 
   @Override
+  public MessageType getType()
+  {
+    return null;
+  }
+
+  @Override
   public boolean isValid()
   {
     return valid;
   }
 
   @Override
-  public void parse()
+  protected void parse()
   {
     parsed = true;
 
-    int dataOffset = offset + 1;
-    int limit = offset + length;
-
     try {
       /*
        * read the version.
        */
-      int idlen = readVarInt(dataOffset, limit);
+      int idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        version = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        version = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         version = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
       /*
        * read the identifier.
        */
-      idlen = readVarInt(dataOffset, limit);
+      idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        identifier = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        identifier = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         identifier = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
 
-      baseSeconds = readVarInt(dataOffset, limit);
-      while (buffer[dataOffset++] < 0) {
-      }
+      baseSeconds = readVarInt();
 
-      windowId = readVarInt(dataOffset, limit);
-      while (buffer[dataOffset++] < 0) {
-      }
+      windowId = readVarInt();
 
       valid = true;
     } catch (NumberFormatException nfe) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
index 256fc05..e75a314 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java
@@ -18,6 +18,8 @@
  */
 package com.datatorrent.bufferserver.packet;
 
+import java.nio.ByteBuffer;
+
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -41,23 +43,13 @@ public class PayloadTuple extends Tuple
   @Override
   public int getPartition()
   {
-    int p = buffer[offset + 1];
-    p |= buffer[offset + 2] << 8;
-    p |= buffer[offset + 3] << 16;
-    p |= buffer[offset + 4] << 24;
-    return p;
-  }
-
-  @Override
-  public int getWindowId()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return ByteBuffer.wrap(buffer, offset, 4).getInt();
   }
 
   @Override
   public Slice getData()
   {
-    return new Slice(buffer, offset + 5, length - 5);
+    return new Slice(buffer, offset + 4, limit - offset - 4);
   }
 
   @Override
@@ -66,37 +58,21 @@ public class PayloadTuple extends Tuple
     return "PayloadTuple{" + getPartition() + ", " + getData() + '}';
   }
 
-  @Override
-  public int getBaseSeconds()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getWindowWidth()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
   public static byte[] getSerializedTuple(int partition, int size)
   {
     byte[] array = new byte[size + 5];
-    array[0] = MessageType.PAYLOAD_VALUE;
-    array[1] = (byte)partition;
-    array[2] = (byte)(partition >> 8);
-    array[3] = (byte)(partition >> 16);
-    array[4] = (byte)(partition >> 24);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+    byteBuffer.put(MessageType.PAYLOAD_VALUE);
+    byteBuffer.putInt(partition);
     return array;
   }
 
   public static byte[] getSerializedTuple(int partition, Slice f)
   {
     byte[] array = new byte[5 + f.length];
-    array[0] = MessageType.PAYLOAD_VALUE;
-    array[1] = (byte)partition;
-    array[2] = (byte)(partition >> 8);
-    array[3] = (byte)(partition >> 16);
-    array[4] = (byte)(partition >> 24);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(array);
+    byteBuffer.put(MessageType.PAYLOAD_VALUE);
+    byteBuffer.putInt(partition);
     System.arraycopy(f.buffer, f.offset, array, 5, f.length);
     return array;
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
index 6a9ba39..23d233e 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java
@@ -30,6 +30,11 @@ public class PublishRequestTuple extends GenericRequestTuple
     super(array, offset, len);
   }
 
+  public MessageType getType()
+  {
+    return MessageType.PUBLISHER_REQUEST;
+  }
+
   public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
   {
     return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
index 1db3131..cb65e14 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PurgeRequestTuple.java
@@ -30,6 +30,11 @@ public class PurgeRequestTuple extends GenericRequestTuple
     super(array, offset, length);
   }
 
+  public MessageType getType()
+  {
+    return MessageType.PURGE_REQUEST;
+  }
+
   public static byte[] getSerializedRequest(String version, String id, long windowId)
   {
     return GenericRequestTuple.getSerializedRequest(version, id, windowId, MessageType.PURGE_REQUEST_VALUE);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
index 53505b4..cc830a9 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java
@@ -18,7 +18,8 @@
  */
 package com.datatorrent.bufferserver.packet;
 
-import com.datatorrent.netlet.util.Slice;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * <p>Abstract RequestTuple class.</p>
@@ -27,12 +28,18 @@ import com.datatorrent.netlet.util.Slice;
  */
 public abstract class RequestTuple extends Tuple
 {
+  private static final Logger logger = LoggerFactory.getLogger(RequestTuple.class);
+
   protected boolean valid;
   protected boolean parsed;
 
-  public RequestTuple(byte[] buffer, int offset, int length)
+  protected RequestTuple(byte[] buffer, int offset, int length)
   {
     super(buffer, offset, length);
+    parse();
+    if (!isValid()) {
+      logger.error("Invalid Request Tuple of type {} received!", getType());
+    }
   }
 
   public boolean isValid()
@@ -40,25 +47,7 @@ public abstract class RequestTuple extends Tuple
     return valid;
   }
 
-  @Override
-  public int getPartition()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public Slice getData()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getWindowWidth()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  public abstract void parse();
+  protected abstract void parse();
 
   public abstract String getVersion();
 

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
index 17ca585..33796aa 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java
@@ -30,6 +30,11 @@ public class ResetRequestTuple extends GenericRequestTuple
     super(array, offset, length);
   }
 
+  public MessageType getType()
+  {
+    return MessageType.RESET_REQUEST;
+  }
+
   public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId)
   {
     return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId,

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
index 6045416..c5b1c40 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java
@@ -18,8 +18,6 @@
  */
 package com.datatorrent.bufferserver.packet;
 
-
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.netlet.util.VarInt;
 
 /**
@@ -29,9 +27,14 @@ import com.datatorrent.netlet.util.VarInt;
  */
 public class ResetWindowTuple extends Tuple
 {
+  private final int baseSeconds;
+  private final int windowWidth;
+
   public ResetWindowTuple(byte[] buffer, int offset, int length)
   {
     super(buffer, offset, length);
+    baseSeconds = readVarInt();
+    windowWidth = readVarInt();
   }
 
   @Override
@@ -41,36 +44,15 @@ public class ResetWindowTuple extends Tuple
   }
 
   @Override
-  public int getWindowId()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getPartition()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public Slice getData()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
   public int getBaseSeconds()
   {
-    return readVarInt(offset + 1, offset + length);
+    return baseSeconds;
   }
 
   @Override
   public int getWindowWidth()
   {
-    int intervalOffset = offset + 1;
-    while (buffer[intervalOffset++] < 0) {
-    }
-    return readVarInt(intervalOffset, offset + length);
+    return windowWidth;
   }
 
   public static byte[] getSerializedTuple(int baseSeconds, int windowWidth)

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
index e029015..37efb70 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java
@@ -46,113 +46,91 @@ public class SubscribeRequestTuple extends RequestTuple
   private int bufferSize;
 
   @Override
+  public MessageType getType()
+  {
+    return MessageType.SUBSCRIBER_REQUEST;
+  }
+
+  @Override
   public void parse()
   {
     parsed = true;
-    int dataOffset = offset + 1;
-    int limit = offset + length;
     try {
       /*
        * read the version.
        */
-      int idlen = readVarInt(dataOffset, limit);
+      int idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        version = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        version = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         version = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
       /*
        * read the identifier.
        */
-      idlen = readVarInt(dataOffset, limit);
+      idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        identifier = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        identifier = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         identifier = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
 
-      baseSeconds = readVarInt(dataOffset, limit);
-      while (buffer[dataOffset++] < 0) {
-      }
+      baseSeconds = readVarInt();
 
-      windowId = readVarInt(dataOffset, limit);
-      while (buffer[dataOffset++] < 0) {
-      }
+      windowId = readVarInt();
       /*
        * read the type
        */
-      idlen = readVarInt(dataOffset, limit);
+      idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        streamType = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        streamType = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         streamType = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
       /*
        * read the upstream identifier
        */
-      idlen = readVarInt(dataOffset, limit);
+      idlen = readVarInt();
       if (idlen > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        upstreamIdentifier = new String(buffer, dataOffset, idlen);
-        dataOffset += idlen;
+        upstreamIdentifier = new String(buffer, offset, idlen);
+        offset += idlen;
       } else if (idlen == 0) {
         upstreamIdentifier = EMPTY_STRING;
-        dataOffset++;
       } else {
         return;
       }
       /*
        * read the partition count
        */
-      int count = readVarInt(dataOffset, limit);
+      int count = readVarInt();
       if (count > 0) {
-        while (buffer[dataOffset++] < 0) {
-        }
-        mask = readVarInt(dataOffset, limit);
-        if (mask > 0) {
-          while (buffer[dataOffset++] < 0) {
-          }
-        } else {
+        mask = readVarInt();
+        if (mask <= 0) {
           /* mask cannot be zero */
           return;
         }
         partitions = new int[count];
         for (int i = 0; i < count; i++) {
-          partitions[i] = readVarInt(dataOffset, limit);
+          partitions[i] = readVarInt();
           if (partitions[i] == -1) {
             return;
-          } else {
-            while (buffer[dataOffset++] < 0) {
-            }
           }
         }
       }
 
-      bufferSize = readVarInt(dataOffset, limit);
+      bufferSize = readVarInt();
       if (bufferSize == -1) {
         return;
       }
-      while (buffer[dataOffset++] < 0) {
-      }
 
       valid = true;
     } catch (NumberFormatException nfe) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
index 408e8a2..a75c7f1 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java
@@ -35,15 +35,15 @@ public abstract class Tuple
 {
   public static final String CLASSIC_VERSION = "1.0";
   public static final String FAST_VERSION = "1.1";
-  public final byte[] buffer;
-  public final int offset;
-  public final int length;
+  protected final byte[] buffer;
+  protected int offset;
+  protected final int limit;
 
-  public Tuple(byte[] array, int offset, int length)
+  protected Tuple(byte[] array, int offset, int length)
   {
     this.buffer = array;
-    this.offset = offset;
-    this.length = length;
+    this.offset = offset + 1;
+    this.limit = offset + length;
   }
 
   public static Tuple getTuple(byte[] buffer, int offset, int length)
@@ -77,36 +77,16 @@ public abstract class Tuple
         return new WindowIdTuple(buffer, offset, length);
 
       case PUBLISHER_REQUEST:
-        PublishRequestTuple prt = new PublishRequestTuple(buffer, offset, length);
-        prt.parse();
-        if (!prt.isValid()) {
-          logger.error("Unparseable Generic Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
-        }
-        return prt;
+        return new PublishRequestTuple(buffer, offset, length);
 
       case PURGE_REQUEST:
-        PurgeRequestTuple purgert = new PurgeRequestTuple(buffer, offset, length);
-        purgert.parse();
-        if (!purgert.isValid()) {
-          logger.error("Unparseable Purge Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
-        }
-        return purgert;
+        return new PurgeRequestTuple(buffer, offset, length);
 
       case RESET_REQUEST:
-        ResetRequestTuple resetrt = new ResetRequestTuple(buffer, offset, length);
-        resetrt.parse();
-        if (!resetrt.isValid()) {
-          logger.error("Unparseable Reset Request Tuple of type {} received!", MessageType.valueOf(buffer[offset]));
-        }
-        return resetrt;
+        return new ResetRequestTuple(buffer, offset, length);
 
       case SUBSCRIBER_REQUEST:
-        SubscribeRequestTuple srt = new SubscribeRequestTuple(buffer, offset, length);
-        srt.parse();
-        if (!srt.isValid()) {
-          logger.error("Unparseable Subscriber Request Tuple received!");
-        }
-        return srt;
+        return new SubscribeRequestTuple(buffer, offset, length);
 
       default:
         return null;
@@ -120,7 +100,7 @@ public abstract class Tuple
     return offset + identifier.getBytes().length;
   }
 
-  public int readVarInt(int offset, int limit)
+  protected int readVarInt()
   {
     if (offset < limit) {
       byte tmp = buffer[offset++];
@@ -161,20 +141,32 @@ public abstract class Tuple
                                     + Arrays.toString(Arrays.copyOfRange(buffer, offset, limit)));
   }
 
-  public MessageType getType()
+  public abstract MessageType getType();
+
+  public int getWindowId()
   {
-    return MessageType.valueOf(buffer[offset]);
+    throw new UnsupportedOperationException("Not supported yet.");
   }
 
-  public abstract int getWindowId();
-
-  public abstract int getPartition();
+  public int getPartition()
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 
-  public abstract Slice getData();
+  public Slice getData()
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 
-  public abstract int getBaseSeconds();
+  public int getBaseSeconds()
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 
-  public abstract int getWindowWidth();
+  public int getWindowWidth()
+  {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
 
   private static final Logger logger = LoggerFactory.getLogger(Tuple.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
index 014827c..7ac9c6c 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java
@@ -18,8 +18,6 @@
  */
 package com.datatorrent.bufferserver.packet;
 
-
-import com.datatorrent.netlet.util.Slice;
 import com.datatorrent.netlet.util.VarInt;
 
 /**
@@ -29,45 +27,26 @@ import com.datatorrent.netlet.util.VarInt;
  */
 public class WindowIdTuple extends Tuple
 {
+  private final MessageType messageType;
+  private final int windowId;
+
   public WindowIdTuple(byte[] array, int offset, int length)
   {
     super(array, offset, length);
-  }
-
-  @Override
-  public int getWindowId()
-  {
-    return readVarInt(offset + 1, offset + length);
+    messageType = MessageType.valueOf(array[offset]);
+    windowId = readVarInt();
   }
 
   @Override
   public MessageType getType()
   {
-    return MessageType.valueOf(buffer[offset]);
-  }
-
-  @Override
-  public int getPartition()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public Slice getData()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return messageType;
   }
 
   @Override
-  public int getBaseSeconds()
-  {
-    throw new UnsupportedOperationException("Not supported yet.");
-  }
-
-  @Override
-  public int getWindowWidth()
+  public int getWindowId()
   {
-    throw new UnsupportedOperationException("Not supported yet.");
+    return windowId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-core/blob/97b298e9/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
index 20c658a..f7b8829 100644
--- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
+++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java
@@ -23,7 +23,8 @@ import org.testng.annotations.Test;
 
 import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 
 /**
  *
@@ -44,17 +45,30 @@ public class SubscribeRequestTupleTest
     ArrayList<Integer> partitions = new ArrayList<Integer>();
     partitions.add(5);
     long startingWindowId = 0xcafebabe00000078L;
-    byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0);
+
+    byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 32 * 1024);
     SubscribeRequestTuple tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length);
+
     assertEquals(tuple.getIdentifier(), id, "Identifier");
     assertEquals(tuple.getStreamType(), down_type, "UpstreamType");
     assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId");
     assertEquals(tuple.getMask(), mask, "Mask");
-
+    assertEquals(tuple.getBufferSize(), 32 * 1024, "BufferSize");
     int[] parts = tuple.getPartitions();
-    assertTrue(parts != null && parts.length == 1 && parts[0] == 5);
+    assertNotNull(parts);
+    assertEquals(parts.length, 1);
+    assertEquals(parts[0], 5);
+    assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window");
+
+    serial = getSerializedRequest(null, id, down_type, upstream_id, 0, null, startingWindowId, 32 * 1024);
+    tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length);
 
+    assertEquals(tuple.getIdentifier(), id, "Identifier");
+    assertEquals(tuple.getStreamType(), down_type, "UpstreamType");
+    assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId");
+    assertEquals(tuple.getMask(), 0, "Mask");
+    assertEquals(tuple.getBufferSize(), 32 * 1024, "BufferSize");
+    assertNull(tuple.getPartitions());
     assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window");
   }
-
 }


Mime
View raw message