geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kl...@apache.org
Subject [28/94] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917
Date Tue, 23 Feb 2016 20:23:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerHelper.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerHelper.java
index b120b57,0000000..b0b0be1
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerHelper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheServerHelper.java
@@@ -1,187 -1,0 +1,193 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.internal.cache.tier.sockets;
 +
++import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.Version;
++import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.util.BlobHelper;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.UTFDataFormatException;
 +import java.io.UnsupportedEncodingException;
 +
 +import java.util.zip.GZIPInputStream;
 +import java.util.zip.GZIPOutputStream;
 +
 +/**
 + * <code>CacheServerHelper</code> is a static class that provides helper methods
 + * for the CacheServer classes.
 + *
 + * @author Barry Oglesby
 + * @since 3.5
 + */
- public class CacheServerHelper
-   {
-   public static byte[] serialize(Object obj) throws IOException
-   {
++public class CacheServerHelper {
++  
++  public static void setIsDefaultServer(CacheServer server) {
++    if (server instanceof CacheServerImpl) {
++      ((CacheServerImpl)server).setIsDefaultServer();
++    }
++  }
++  
++  public static boolean isDefaultServer(CacheServer server) {
++    if ( !(server instanceof CacheServerImpl) ) {
++      return false;
++    }
++    return ((CacheServerImpl)server).isDefaultServer();
++  }
++  
++  public static byte[] serialize(Object obj) throws IOException {
 +    return serialize(obj, false);
 +  }
 +
-   public static byte[] serialize(Object obj, boolean zipObject) throws IOException
-   {
++  public static byte[] serialize(Object obj, boolean zipObject) throws IOException {
 +    return zipObject
 +      ? zip(obj)
 +      : BlobHelper.serializeToBlob(obj);
 +  }
 +
-   public static Object deserialize(byte[] blob) throws IOException, ClassNotFoundException
-   {
++  public static Object deserialize(byte[] blob) throws IOException, ClassNotFoundException {
 +    return deserialize(blob, false);
 +  }
 +
-   public static Object deserialize(byte[] blob, boolean unzipObject) throws IOException, ClassNotFoundException
-   {
++  public static Object deserialize(byte[] blob, boolean unzipObject) throws IOException, ClassNotFoundException {
 +    return unzipObject
 +      ? unzip(blob)
 +      : BlobHelper.deserializeBlob(blob);
 +  }
 +
-   public static Object deserialize(byte[] blob, Version version, boolean unzipObject) throws IOException, ClassNotFoundException
-   {
++  public static Object deserialize(byte[] blob, Version version, boolean unzipObject) throws IOException, ClassNotFoundException {
 +    return unzipObject
 +      ? unzip(blob)
 +      : BlobHelper.deserializeBlob(blob, version, null);
 +  }
 +  
-   public static byte[] zip(Object obj) throws IOException
-   {
++  public static byte[] zip(Object obj) throws IOException {
 +//logger.info("CacheServerHelper: Zipping object to blob: " + obj);
 +    ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +    GZIPOutputStream gz = new GZIPOutputStream(baos);
 +    ObjectOutputStream oos = new ObjectOutputStream(gz);
 +    oos.writeObject(obj);
 +    oos.flush();
 +    oos.close();
 +    byte[] blob = baos.toByteArray();
 +//logger.info("CacheServerHelper: Zipped object to blob: " + blob);
 +    return blob;
 +  }
 +
-   public static Object unzip(byte[] blob) throws IOException, ClassNotFoundException
-   {
++  public static Object unzip(byte[] blob) throws IOException, ClassNotFoundException {
 +//logger.info("CacheServerHelper: Unzipping blob to object: " + blob);
 +    ByteArrayInputStream bais = new ByteArrayInputStream(blob);
 +    GZIPInputStream gs = new GZIPInputStream(bais);
 +    ObjectInputStream ois = new ObjectInputStream(gs);
 +    Object obj = ois.readObject();
 +//logger.info("CacheServerHelper: Unzipped blob to object: " + obj);
 +    ois.close();
 +    bais.close();
 +    return obj;
 +  }
 +
 +
 +  /**
 +   * The logic used here is based on java's DataInputStream.writeUTF() from the version 1.6.0_10.
 +   * 
 +   * @param s
 +   * @return byte[]
 +   */
-   public static byte[] toUTF(String s)
-   {
++  public static byte[] toUTF(String s) {
 +    HeapDataOutputStream hdos = new HeapDataOutputStream(s);
 +    return hdos.toByteArray();
 +  }
 +
 +  /**
 +   * The logic used here is based on java's DataInputStream.readUTF() from the version 1.6.0_10.
 +   * @param bytearr
 +   * @return String 
 +   */
-   public static String fromUTF(byte[] bytearr)
-   {
++  public static String fromUTF(byte[] bytearr) {
 +    int utflen = bytearr.length;
 +    int c, char2, char3;
 +    int count = 0;
 +    int chararr_count=0;
 +
 +    char[] chararr = new char[utflen];
 +
 +    while (count < utflen) {
 +      c = (int) bytearr[count] & 0xff;
 +      if (c > 127) break;
 +      count++;
 +      chararr[chararr_count++]=(char)c;
 +    }
 +
 +    while (count < utflen) {
 +      c = (int) bytearr[count] & 0xff;
 +      switch (c >> 4) {
 +        case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
 +          /* 0xxxxxxx*/
 +          count++;
 +          chararr[chararr_count++]=(char)c;
 +          break;
 +        case 12: case 13:
 +          /* 110x xxxx   10xx xxxx*/
 +          count += 2;
 +          if (count > utflen) {
 +            throw new RuntimeException(LocalizedStrings.CacheServerHelper_UTF8_EXCEPTION.toLocalizedString(), 
 +                new UTFDataFormatException("malformed input: partial character at end"));
 +          }
 +          char2 = (int) bytearr[count-1];
 +          if ((char2 & 0xC0) != 0x80)
 +            throw new RuntimeException(
 +                "malformed input around byte " + count);
 +          chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
 +              (char2 & 0x3F));
 +          break;
 +        case 14:
 +          /* 1110 xxxx  10xx xxxx  10xx xxxx */
 +          count += 3;
 +          if (count > utflen){
 +            throw new RuntimeException(LocalizedStrings.CacheServerHelper_UTF8_EXCEPTION.toLocalizedString(), 
 +                new UTFDataFormatException("malformed input: partial character at end")); 
 +          }
 +          char2 = (int) bytearr[count-2];
 +          char3 = (int) bytearr[count-1];
 +          if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)){
 +            throw new RuntimeException(LocalizedStrings.CacheServerHelper_UTF8_EXCEPTION.toLocalizedString(), 
 +                new UTFDataFormatException("malformed input around byte " + (count-1)));
 +          }
 +          chararr[chararr_count++]=(char)(((c     & 0x0F) << 12) |
 +              ((char2 & 0x3F) << 6)  |
 +              ((char3 & 0x3F) << 0));
 +          break;
 +        default:
 +          /* 10xx xxxx,  1111 xxxx */
 +          throw new RuntimeException(LocalizedStrings.CacheServerHelper_UTF8_EXCEPTION.toLocalizedString(), 
 +              new UTFDataFormatException("malformed input around byte " + count));
 +      }
 +    }
 +    // The number of chars produced may be less than utflen
 +    return new String(chararr, 0, chararr_count);
 +  }
 +
 +  private CacheServerHelper() {}
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index f6866bf,0000000..4bfd44b
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@@ -1,1190 -1,0 +1,1100 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.cache.tier.sockets;
 +
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.OutputStream;
 +import java.net.Socket;
 +import java.net.SocketTimeoutException;
 +import java.nio.ByteBuffer;
 +import java.nio.channels.SocketChannel;
 +import java.util.concurrent.Semaphore;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.SerializationException;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.HeapDataOutputStream;
 +import com.gemstone.gemfire.internal.SocketUtils;
 +import com.gemstone.gemfire.internal.Version;
- import com.gemstone.gemfire.internal.cache.CachedDeserializable;
 +import com.gemstone.gemfire.internal.cache.TXManagerImpl;
 +import com.gemstone.gemfire.internal.cache.tier.MessageType;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.offheap.StoredObject;
- import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 +import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 +import com.gemstone.gemfire.internal.util.BlobHelper;
 +
 +/**
 + * This class encapsulates the wire protocol. It provides accessors to
 + * encode and decode a message and  serialize it out to the wire.
 + *
 + * <PRE>
 + * msgType       - int   - 4 bytes type of message, types enumerated below
 + *
 + * msgLength     - int - 4 bytes   total length of variable length payload
 + *
 + * numberOfParts - int - 4 bytes   number of elements (LEN-BYTE* pairs)
 + *                     contained in the payload. Message can
 + *                       be a multi-part message
 + *
 + * transId       - int - 4 bytes  filled in by the requestor, copied back into
 + *                    the response
 + *
-  * earlyAck      - byte- 1 byte   filled in by the requestor
++ * flags         - byte- 1 byte   filled in by the requestor
 + * len1
 + * part1
 + * .
 + * .
 + * .
 + * lenn
 + * partn
 + * </PRE>
 + *
 + * We read the fixed length 16 bytes into a byte[] and populate a bytebuffer
 + * We read the fixed length header tokens from the header
 + * parse the header and use information contained in there to read the payload.
 + *
 + * <P>
 + *
 + * See also <a href="package-summary.html#messages">package description</a>.
 + *
 + * @see com.gemstone.gemfire.internal.cache.tier.MessageType
 + *
-  * @author Sudhir Menon
-  * @since 2.0.2
 + */
 +public class Message  {
 +
 +  private static final Logger logger = LogService.getLogger();
 +  
-   @Override
-   public String toString() {
-     StringBuffer sb = new StringBuffer();
-     sb.append("type=" + MessageType.getString(msgType));
-     sb.append("; payloadLength=" + payloadLength);
-     sb.append("; numberOfParts=" + numberOfParts);
-     sb.append("; transactionId=" + transactionId);
-     //sb.append("; bufferLength=" + bufferLength);
-     sb.append("; currentPart=" + currentPart);
-     sb.append("; messageModified=" + messageModified);
-     sb.append("; earlyAck=" + earlyAck);
-     for (int i = 0; i < numberOfParts; i ++) {
-       sb.append("; part[" + i + "]={");
-       sb.append(this.partsList[i].toString());
-       sb.append("}");
-     }
-     return sb.toString();
-   }
++  private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject
++  
++  private static final int FIXED_LENGTH = 17;
++
++  private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
 +
-   protected final static int FIXED_LENGTH = 17;
 +  protected int msgType;
 +  protected int payloadLength=0;
 +  protected int numberOfParts =0;
 +  protected int transactionId = TXManagerImpl.NOTX;
 +  protected int currentPart = 0;
 +  protected Part[] partsList = null;
 +  protected ByteBuffer cachedCommBuffer;
 +  protected Socket socket = null;
 +  protected SocketChannel sockCh = null;
 +  protected OutputStream os = null;
 +  protected InputStream is = null;
 +  protected boolean messageModified = true;
 +  /** is this message a retry of a previously sent message? */
 +  protected boolean isRetry;
-   private byte earlyAck = 0x00;
++  private byte flags = 0x00;
 +  protected MessageStats msgStats = null;
 +  protected ServerConnection sc = null;
-   private int MAX_DATA = -1;
++  private int maxIncomingMessageLength = -1;
 +  private Semaphore dataLimiter = null;
 +//  private int MAX_MSGS = -1;
 +  private Semaphore msgLimiter = null;
 +  private boolean hdrRead = false;  
 +  private int chunkSize = 1024;//Default Chunk Size.
 +
 +  protected Part securePart = null;
++  private boolean isMetaRegion = false;
++
 +
-   // These two statics are fields shoved into the earlyAck byte for transmission.
++  // These two statics are fields shoved into the flags byte for transmission.
 +  // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
 +  // is left in place
 +  public static final byte MESSAGE_HAS_SECURE_PART = (byte)0x02;
 +  public static final byte MESSAGE_IS_RETRY = (byte)0x04;
 +  
 +  public static final byte MESSAGE_IS_RETRY_MASK = (byte)0xFB;
 +
 +  // Tentative workaround to avoid OOM stated in #46754.
 +  public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>();
 +  
 +  Version version;
 +  
 +  /**
 +   * Creates a new message with the given number of parts
 +   */
 +  public Message(int numberOfParts, Version destVersion) {
 +    this.version = destVersion;
 +    Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message");
 +    partsList = new Part[numberOfParts];
 +    this.numberOfParts = numberOfParts;
 +    for (int i=0;i<partsList.length;i++) {
 +      partsList[i] = new Part();
 +    }
 +  }
 +
 +  public boolean isSecureMode() {    
 +    return securePart != null;
 +  }
 +  
 +  public byte[] getSecureBytes()
 +    throws IOException, ClassNotFoundException {
 +    return (byte[])this.securePart.getObject();
 +  }
 +  
 +  public void setMessageType(int msgType) {
 +    this.messageModified = true;
 +    if (!MessageType.validate(msgType)) {
 +      throw new IllegalArgumentException(LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
 +    }
 +    this.msgType = msgType;
 +  }
 +  
 +  public void setVersion(Version clientVersion) {
 +    this.version = clientVersion;
 +  }
 +
-   /**
-    * Sets whether this message is early-ack
-    * @param earlyAck whether this message is early-ack
-    */
-   public void setEarlyAck(boolean earlyAck) {
-     if (earlyAck) {
-       this.earlyAck = 0x01;
-     } else {
-       this.earlyAck = 0x00;
-     }
++  public void setMessageHasSecurePartFlag() {
++    this.flags = (byte)(this.flags | MESSAGE_HAS_SECURE_PART);
 +  }
- 
-   // TODO (ashetkar) To be removed later.
-   public void setEarlyAck(byte earlyAck) {
-     // Check that the passed in value is within the acceptable range.
-     if (0x00 <= earlyAck && earlyAck <= 0x02) {
-       this.earlyAck |= earlyAck;
-     }
++  
++  public void clearMessageHasSecurePartFlag() {
++    this.flags = (byte)(this.flags & MESSAGE_HAS_SECURE_PART);
 +  }
 +
-   /*
-    * public void setPayloadLength(int payloadLength) {
-      this.payloadLength = payloadLength;
-   }*/
- 
 +  /**
 +   *  Sets and builds the {@link Part}s that are sent
 +   *  in the payload of the Message
 +   * @param numberOfParts
 +   */
 +  public void setNumberOfParts(int numberOfParts) {
 +    //TODO:hitesh need to add security header here from server
 +    //need to insure it is not chunked message
 +    //should we look message type to avoid internal message like ping
 +    this.messageModified = true;
 +    this.currentPart=0;
 +    this.numberOfParts = numberOfParts;
 +    if (numberOfParts > this.partsList.length) {
 +      Part[] newPartsList = new Part[numberOfParts];
 +      for (int i=0;i<numberOfParts;i++) {
 +        if (i < this.partsList.length) {
 +          newPartsList[i] = this.partsList[i];
 +        } else {
 +          newPartsList[i] = new Part();
 +        }
 +      }
 +      this.partsList = newPartsList;
 +    }
 +  }
 +
 +  public void setTransactionId(int transactionId) {
 +    this.messageModified = true;
 +    this.transactionId = transactionId;
 +  }
 +  
 +  public void setIsRetry() {
 +    this.isRetry = true;
 +  }
 +  
 +  /**
 +   * This returns true if the message has been marked as having been previously
 +   * transmitted to a different server.
 +   */
 +  public boolean isRetry() {
 +    return this.isRetry;
 +  }
 +
 +  /*Sets size for HDOS chunk.*/
 +  public void setChunkSize(int chunkSize) {
 +    this.chunkSize = chunkSize;
 +  }
++  
++  /**
++   * When building a Message this will return the number of the
++   * next Part to be added to the message
++   */
++  public int getNextPartNumber() {
++    return this.currentPart;
++  }
 +
 +  public void addStringPart(String str) {
 +    if (str==null) {
 +      addRawPart((byte[])null, false);
 +    }
 +    else {
 +      HeapDataOutputStream hdos = new HeapDataOutputStream(str);
 +      this.messageModified = true;
 +      Part part = partsList[this.currentPart];
 +      part.setPartState(hdos, false);
 +      this.currentPart++;
 +    }
 +  }
 +
-   /**
-    * Sets whether or not a
-    * <code>DataOutputStream</code>/<code>DataOutputStream</code>
-    * should be used to send/receive data.
-       public void setUseDataStream (boolean useDataStream) {
-         this.useDataStream = useDataStream;
-     }
-    */
- 
 +  /*
 +   * Adds a new part to this message that contains a <code>byte</code>
 +   * array (as opposed to a serialized object).
 +   *
 +   * @see #addPart(byte[], boolean)
 +   */
 +  public void addBytesPart(byte[] newPart) {
 +    addRawPart(newPart, false);
 +  }
 +
 +  public void addStringOrObjPart(Object o) {
 +    if (o instanceof String || o == null) {
 +      addStringPart((String)o);
 +    } else {
 +      // Note even if o is a byte[] we need to serialize it.
 +      // This could be cleaned up but it would require C client code to change.
 +      serializeAndAddPart(o, false);
 +    }
 +  }
 +
-   public void addDeltaPart(HeapDataOutputStream hdos) { // TODO: Amogh- Should it be just DataOutput?
++  public void addDeltaPart(HeapDataOutputStream hdos) {
 +    this.messageModified = true;
 +    Part part = partsList[this.currentPart];
 +    part.setPartState(hdos, false);
 +    this.currentPart++;
 +  }
 +
 +  public void addObjPart(Object o) {
 +    addObjPart(o, false);
 +  }
 +  /**
 +   * Like addObjPart(Object) but also prefers to reference
 +   * objects in the part instead of copying them into a byte buffer.
 +   */
 +  public void addObjPartNoCopying(Object o) {
 +    if (o == null || o instanceof byte[]) {
 +      addRawPart((byte[])o, false);
 +    } else {
 +      serializeAndAddPartNoCopying(o);
 +    }
 +  }
 +  public void addObjPart(Object o, boolean zipValues) {
 +    if (o == null || o instanceof byte[]) {
 +      addRawPart((byte[])o, false);
 +    } else {
 +      serializeAndAddPart(o, zipValues);
 +    }
 +  }
 +  public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
 +    if (o == null) {
 +      addRawPart((byte[])o, false);
 +    } else if (o instanceof byte[]) {
 +      addRawPart((byte[])o, isObject);
 +    } else if (o instanceof StoredObject) {
 +      // It is possible it is an off-heap StoredObject that contains a simple non-object byte[].
 +      this.messageModified = true;
 +      Part part = partsList[this.currentPart];
 +      part.setPartState((StoredObject)o, isObject);
 +      this.currentPart++;
 +    } else {
 +      serializeAndAddPart(o, false);
 +    }
 +  }
 +  
 +  private void serializeAndAddPartNoCopying(Object o) {
 +    HeapDataOutputStream hdos;
 +    Version v = version;
 +    if (version.equals(Version.CURRENT)){
 +      v = null;
 +    }
 +    // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources passed to it.
 +    hdos = new HeapDataOutputStream(chunkSize, v, true);
 +    // TODO OFFHEAP: Change Part to look for an HDOS and just pass a reference to its DirectByteBuffer.
 +    // Then change HDOS sendTo(SocketChannel...) to use the GatheringByteChannel to write a bunch of bbs.
 +    // TODO OFFHEAP This code optimizes one part which works pretty good for getAll since all the values are
 +    // returned in one part. But the following seems even better...
 +    // BETTER: change Message to consolidate all the part hdos bb lists into a single bb array and have it do the GatheringByteChannel write.
 +    // Message can use slice for the small parts (msg header and part header) that are not in the parts data (its a byte array, Chunk, or HDOS).
 +    // EVEN BETTER: the message can have a single HDOS which owns a direct comm buffer. It can reserve space if it does not yet know the value to write (for example the size of the message or part).
 +    // If we write something to the HDOS that is direct then it does not need to be copied.
 +    // But large heap byte arrays will need to be copied to the hdos (the socket write does this anyway).
 +    // If the direct buffer is full then we can allocate another one. If a part is already in a heap byte array
 +    // then we could defer copying it by slicing the current direct bb and then adding the heap byte array
 +    // as bb using ByteBuffer.wrap. Once we have all the data in the HDOS we can finally generate the header
 +    // and then start working on sending the ByteBuffers to the channel. If we have room in a direct bb then
 +    // we can copy a heap bb to it. Otherwise we can write the bb ahead of it which would free up room to copy
 +    // the heap bb to the existing direct bb without needing to allocate extra direct bbs.
 +    // Delaying the flush uses more direct memory but reduces the number of system calls.
 +    try {
- //      logger.fine("hitesh before serializatino: " );
- //      
- //      if (o != null ){
- //        logger.fine("hitesh before serializatino: " + o.toString());
- //        logger.fine("hitesh before serializatino: " + o.getClass().getName());
- //      }
 +      BlobHelper.serializeTo(o, hdos);
 +    } catch (IOException ex) {
 +      throw new SerializationException("failed serializing object", ex);
 +    }
 +    this.messageModified = true;
 +    Part part = partsList[this.currentPart];
 +    part.setPartState(hdos, true);
 +    this.currentPart++;
 +    
 +  }
 +
 +  private void serializeAndAddPart(Object o, boolean zipValues) {
 +    if (zipValues) {
 +      throw new UnsupportedOperationException("zipValues no longer supported");    
 +      
- //       byte[] b = CacheServerHelper.serialize(o, zipValues);
- //       addRawPart(b, true);
 +    } else {
 +      HeapDataOutputStream hdos;
 +      Version v = version;
 +      if (version.equals(Version.CURRENT)){
 +        v = null;
 +      }
 +      hdos = new HeapDataOutputStream(chunkSize, v);
 +      try {
- //        logger.fine("hitesh before serializatino: " );
- //        
- //        if (o != null ){
- //          logger.fine("hitesh before serializatino: " + o.toString());
- //          logger.fine("hitesh before serializatino: " + o.getClass().getName());
- //        }
 +        BlobHelper.serializeTo(o, hdos);
 +      } catch (IOException ex) {
 +        throw new SerializationException("failed serializing object", ex);
 +      }
 +      this.messageModified = true;
 +      Part part = partsList[this.currentPart];
 +      part.setPartState(hdos, true);
 +      this.currentPart++;
 +    }
 +  }
 +
 +  public void addIntPart(int v) {
 +    this.messageModified = true;
 +    Part part = partsList[this.currentPart];
 +    part.setInt(v);
 +    this.currentPart++;
 +  }
 +  
 +  public void addLongPart(long v) {
 +    this.messageModified = true;
 +    Part part = partsList[this.currentPart];
 +    part.setLong(v);
 +    this.currentPart++;
 +  }
 +  
 +  /**
 +   * Adds a new part to this message that may contain a serialized
 +   * object.
 +   */
 +  public void addRawPart(byte[] newPart,boolean isObject) {
 +    this.messageModified = true;
 +    Part part = partsList[this.currentPart];
 +    part.setPartState(newPart, isObject);
 +    this.currentPart++;
 +  }
 +
 +  public int getMessageType() {
 +    return this.msgType;
 +  }
 +
 +  public int getPayloadLength() {
 +    return this.payloadLength;
 +  }
 +
 +  public int getHeaderLength() {
 +    return FIXED_LENGTH;
 +  }
 +
 +  public int getNumberOfParts() {
 +    return this.numberOfParts;
 +  }
 +
 +  public int getTransactionId() {
 +    return this.transactionId;
 +  }
 +  
 +  public Part getPart(int index) {
 +    if (index < this.numberOfParts) {
 +      Part p = partsList[index];
 +      if (this.version != null) {
 +        p.setVersion(this.version);
 +      }
 +      return p;
 +    }
 +    return null;
 +  }
 +
-   public boolean getEarlyAck() {
-     return this.earlyAck == 0x01 ? true: false;
-   }
- 
-   // TODO (ashetkar) To be removed
-   public byte getEarlyAckByte() {
-     return this.earlyAck;
-   }
- 
-   private static ThreadLocal tlCommBuffer = new ThreadLocal();
- 
 +  public static ByteBuffer setTLCommBuffer(ByteBuffer bb) {
-     ByteBuffer result = (ByteBuffer)tlCommBuffer.get();
++    ByteBuffer result = tlCommBuffer.get();
 +    tlCommBuffer.set(bb);
 +    return result;
 +  }
 +
 +  public ByteBuffer getCommBuffer() {
 +    if (this.cachedCommBuffer != null) {
 +      return this.cachedCommBuffer;
 +    }
 +    else {
-       return (ByteBuffer)tlCommBuffer.get();
++      return tlCommBuffer.get();
 +    }
 +  }
 +
 +  public void clear() {
 +    this.isRetry = false;
 +    int len = this.payloadLength;
 +    if (len != 0) {
 +      this.payloadLength = 0;
 +    }
 +    if (this.hdrRead) {
 +      if (this.msgStats != null) {
 +        this.msgStats.decMessagesBeingReceived(len);
 +      }
 +    }
-     if (this.socket != null) {
-       getCommBuffer().clear();
++    ByteBuffer buffer = getCommBuffer();
++    if (buffer != null) {
++      buffer.clear();
 +    }
 +    clearParts();
 +    if (len != 0 && this.dataLimiter != null) {
 +      this.dataLimiter.release(len);
 +      this.dataLimiter = null;
-       this.MAX_DATA = 0;
++      this.maxIncomingMessageLength = 0;
 +    }
 +    if (this.hdrRead) {
 +      if (this.msgLimiter != null) {
 +        this.msgLimiter.release(1);
 +        this.msgLimiter = null;
 +      }
 +      this.hdrRead = false;
 +    }
++    this.flags = 0;
 +  }
 +
 +  protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
-     //TODO:hitesh setting second bit of early ack for client 
++    //TODO:hitesh setting second bit of flags byte for client 
 +    //this is not require but this makes all changes easily at client side right now
 +    //just see this bit and process security header
-     byte eAck = this.earlyAck;
++    byte flagsByte = this.flags;
 +    if (isSecurityHeader) {
-       eAck |= MESSAGE_HAS_SECURE_PART;
++      flagsByte |= MESSAGE_HAS_SECURE_PART;
 +    }
 +    if (this.isRetry) {
-       eAck |= MESSAGE_IS_RETRY;
++      flagsByte |= MESSAGE_IS_RETRY;
 +    }
 +    getCommBuffer()
 +      .putInt(this.msgType)
 +      .putInt(msgLen)
 +      .putInt(this.numberOfParts)
 +      .putInt(this.transactionId)
-       .put(eAck);
++      .put(flagsByte);
 +  }
 +
-   private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject
-   
 +  protected Part getSecurityPart() {
 +    if (this.sc != null ) {
 +      //look types right put get etc
 +     return this.sc.updateAndGetSecurityPart(); 
 +    }
 +    return null;
 +  }
 +
 +  public void setSecurePart(byte[] bytes) {
 +    this.securePart = new Part();
 +    this.securePart.setPartState(bytes, false);
 +  }
 +
-   private boolean m_isMetaRegion = false;
- 
 +  public void setMetaRegion(boolean isMetaRegion) {
-     this.m_isMetaRegion = isMetaRegion;
++    this.isMetaRegion = isMetaRegion;
 +  }
 +
 +  public boolean getAndResetIsMetaRegion() {
-     boolean isMetaRegion = this.m_isMetaRegion;
-     this.m_isMetaRegion = false;
++    boolean isMetaRegion = this.isMetaRegion;
++    this.isMetaRegion = false;
 +    return isMetaRegion;
 +  }
 +
 +  /**
 +   * Sends this message out on its socket.
 +   */
 +  protected void sendBytes(boolean clearMessage) throws IOException {
 +    if (this.sc != null) {
 +      // Keep track of the fact that we are making progress.
 +      this.sc.updateProcessingMessage();
 +    }
 +    if (this.socket != null) {
 +      final ByteBuffer cb = getCommBuffer();
 +      if (cb == null) {
 +        throw new IOException("No buffer");
 +      }
 +      synchronized(cb) {
 +        int numOfSecureParts = 0;
 +        Part securityPart = this.getSecurityPart();
 +        boolean isSecurityHeader = false;
 +        
 +        if (securityPart != null) {
 +          isSecurityHeader = true;
 +          numOfSecureParts = 1;
 +        }
 +        else if (this.securePart != null) {
 +          // This is a client sending this message.
 +          securityPart = this.securePart;
 +          isSecurityHeader = true;
 +          numOfSecureParts = 1;          
 +        }
 +
-         //this.logger.fine("hitesh sendbytes forServer_SecurityPart " + numOfSecureParts);
 +        int totalPartLen = 0;
 +        for (int i=0;i<this.numberOfParts;i++){
 +          Part part = this.partsList[i];
 +          totalPartLen += part.getLength();
 +        }
 +
 +        if(numOfSecureParts == 1) {
 +          totalPartLen += securityPart.getLength();
 +        }
 +        int msgLen = (PART_HEADER_SIZE * (this.numberOfParts + numOfSecureParts)) + totalPartLen;
 +        cb.clear();
 +        packHeaderInfoForSending(msgLen, isSecurityHeader);
 +        for (int i=0;i<this.numberOfParts + numOfSecureParts;i++) {
 +          Part part = null;
 +          if(i == this.numberOfParts) {
 +            part = securityPart;
 +          }
 +          else {
 +            part = partsList[i];
 +          }
 +          if (cb.remaining() < PART_HEADER_SIZE) {
 +            flushBuffer();
 +          }
 +          int partLen = part.getLength();
 +          cb.putInt(partLen);
 +          cb.put(part.getTypeCode());
 +          if (partLen <= cb.remaining()) {
 +            part.sendTo(cb);
 +          } else {
 +            flushBuffer();
 +            // send partBytes
 +            if (this.sockCh != null) {
 +              part.sendTo(this.sockCh, cb);
 +            } else {
 +              part.sendTo(this.os, cb);
 +            }
 +            if (this.msgStats != null) {
 +              this.msgStats.incSentBytes(partLen);
 +            }
 +          }
 +        }
 +        if (cb.position() != 0) {
 +          flushBuffer();
 +        }
 +        this.messageModified = false;
 +        if (this.sockCh == null) {
 +          this.os.flush();
 +        }
 +      }
 +      if(clearMessage) {
 +        clearParts();
 +      }
 +    }
 +    else {
 +      throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
 +    }
 +  }
 +
 +  protected void flushBuffer() throws IOException {
 +    final ByteBuffer cb = getCommBuffer();
 +    if (this.sockCh != null) {
 +      cb.flip();
 +      do {
 +        this.sockCh.write(cb);
 +      } while (cb.remaining() > 0);
 +    } else {
 +      this.os.write(cb.array(), 0, cb.position());
 +    }
 +    if (this.msgStats != null) {
 +      this.msgStats.incSentBytes(cb.position());
 +    }
 +    cb.clear();
 +  }
 +
 +  private void read()
 +  throws IOException {
 +    clearParts();
 +    //TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client :(
 +    readHeaderAndPayload();
 +  }
 +
 +  /**
 +   * Read the actual bytes of the header off the socket
 +   */
 +  protected final void fetchHeader() throws IOException {
 +    final ByteBuffer cb = getCommBuffer();
 +    cb.clear();
 +    // msgType is invalidated here and can be used as an indicator
 +    // of problems reading the message
 +    this.msgType = MessageType.INVALID;
 +
 +    int hdr = 0;
 +
 +    final int headerLength = getHeaderLength();
 +    if (this.sockCh != null) {
 +      cb.limit(headerLength);
 +      do {
 +        int bytesRead = this.sockCh.read(cb);
 +        //System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb);
 +        if (bytesRead == -1) {
 +          throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
 +        }
 +        if (this.msgStats != null) {
 +          this.msgStats.incReceivedBytes(bytesRead);
 +        }
 +      } while (cb.remaining() > 0);
 +      cb.flip();
 +    } else {
 +      do {
 +        int bytesRead = -1;
 +        try {
 +          bytesRead = this.is.read(cb.array(),hdr, headerLength-hdr);
 +        }
 +        catch (SocketTimeoutException e) {
 +//          bytesRead = 0;
 +          // TODO add a cancellation check
 +          throw e;
 +        }
 +        if (bytesRead == -1) {
 +          throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
 +        }
 +        hdr += bytesRead;
 +        if (this.msgStats != null) {
 +          this.msgStats.incReceivedBytes(bytesRead);
 +        }
 +      } while (hdr < headerLength);
 +
 +      // now setup the commBuffer for the caller to parse it
 +      cb.rewind();
 +    }
 +  }
 +
 +  private void readHeaderAndPayload()
 +  throws IOException {
 +    //TODO:Hitesh ???
 +    fetchHeader();
 +    final ByteBuffer cb = getCommBuffer();
 +    final int type = cb.getInt();
 +    final int len = cb.getInt();
 +    final int numParts = cb.getInt();
 +    final int txid = cb.getInt();
-     byte early = cb.get();
++    byte bits = cb.get();
 +    cb.clear();
 +
 +    if (!MessageType.validate(type)) {
 +      throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER.toLocalizedString(Integer.valueOf(type)));
 +    }
 +    int timeToWait = 0;
 +    if (this.sc != null) {
 +      // Keep track of the fact that a message is being processed.
 +      this.sc.setProcessingMessage();
 +      timeToWait = sc.getClientReadTimeout();
 +    }
 +    this.hdrRead = true;
 +    if (this.msgLimiter != null) {
 +        for (;;) {
 +          this.sc.getCachedRegionHelper().checkCancelInProgress(null);
 +          boolean interrupted = Thread.interrupted();
 +          try {
 +            if (timeToWait == 0) {
 +              this.msgLimiter.acquire(1);
 +            } 
 +            else {
 +              if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
 +                if (this.msgStats != null
 +                    && this.msgStats instanceof CacheServerStats) {
 +                  ((CacheServerStats)this.msgStats).incConnectionsTimedOut();
 +                }
 +                throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(Integer.valueOf(timeToWait)));
 +              }
 +            }
 +            break;
 +          }
 +          catch (InterruptedException e) {
 +            interrupted = true;
 +          }
 +          finally {
 +            if (interrupted) {
 +              Thread.currentThread().interrupt();
 +            }
 +          }
 +        } // for
 +    }
 +    if (len > 0) {
-       if (this.MAX_DATA > 0 && len > this.MAX_DATA) {
-         throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1.toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(this.MAX_DATA)}));
++      if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
++        throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1.toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(this.maxIncomingMessageLength)}));
 +      }
 +      if (this.dataLimiter != null) {
 +        for (;;) {
 +          if (sc != null) {
 +            this.sc.getCachedRegionHelper().checkCancelInProgress(null);
 +          }
 +          boolean interrupted = Thread.interrupted();
 +          try {
 +            if (timeToWait == 0) {
 +              this.dataLimiter.acquire(len);
 +            } 
 +            else {
 +              int newTimeToWait = timeToWait;
 +              if (this.msgLimiter != null) {
 +                // may have waited for msg limit so recalc time to wait
 +                newTimeToWait -= (int)sc.getCurrentMessageProcessingTime();
 +              }
 +              if (newTimeToWait <= 0 || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
 +                throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(timeToWait));
 +              }
 +            }
 +            this.payloadLength = len; // makes sure payloadLength gets set now so we will release the semaphore
 +            break; // success
 +          }
 +          catch (InterruptedException e) {
 +            interrupted = true;
 +          }
 +          finally {
 +            if (interrupted) {
 +              Thread.currentThread().interrupt();
 +            }
 +          }
 +        }
 +      }
 +    }
 +    if (this.msgStats != null) {
 +      this.msgStats.incMessagesBeingReceived(len);
 +      this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
 +    }
 +    
-     this.isRetry = (early & MESSAGE_IS_RETRY) != 0;
-     early = (byte)(early & MESSAGE_IS_RETRY_MASK);
- 
-     //TODO:hitesh it was below ??
-     this.earlyAck = early;
++    this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
++    bits = (byte)(bits & MESSAGE_IS_RETRY_MASK);
++    this.flags = bits;
++    // TODO why is the msgType set twice, here and after reading the payload fields?
 +    this.msgType = type;
-     //this.logger.fine("Before reading message parts, earlyAck already read as " + this.earlyAck);
++
 +    readPayloadFields(numParts, len);
 +
 +    // Set the header and payload fields only after receiving all the
 +    // socket data, providing better message consistency in the face
 +    // of exceptional conditions (e.g. IO problems, timeouts etc.)
 +    this.msgType = type;
 +    this.payloadLength = len;
 +    // this.numberOfParts = numParts;  Already set in setPayloadFields via setNumberOfParts
 +    this.transactionId = txid;
-     this.earlyAck = early;
++    this.flags = bits;
 +    if (this.sc != null) {
 +      // Keep track of the fact that a message is being processed.
 +      this.sc.updateProcessingMessage();
 +    }
 +  }
 +
- //   static final int MAX_PART_BUFFERS = 2;
- //   static final int MIN_PART_BUFFER_SIZE = 999;
- //   static final int MAX_PART_BUFFER_SIZE = 1024*1024*11;
- //   static ArrayList partBuffers = new ArrayList(2);
- //   static int partBufferIdx = 0;
- //   static {
- //     for (int i=0; i < MAX_PART_BUFFERS; i++) {
- //       partBuffers.add(i, null);
- //     }
- //   }
- 
- //   private static synchronized byte[] getPartBuffer(int size) {
- //     byte[] result;
- //     synchronized (partBuffers) {
- //       result = (byte[])partBuffers.get(partBufferIdx);
- //       if (result == null) {
- //         result = new byte[size];
- //         partBuffers.add(partBufferIdx, result);
- //       } else if (result.length != size) {
- //         // can't use a cached one
- //         return null;
- //       }
- //       partBufferIdx++;
- //       if (partBufferIdx >= MAX_PART_BUFFERS) {
- //         partBufferIdx = 0;
- //       }
- //     }
- //     return result;
- //   }
- 
 +  protected void readPayloadFields(final int numParts, final int len)
 +  throws IOException {
 +    //TODO:Hitesh
 +    if (len > 0 && numParts <= 0 ||
 +        len <= 0 && numParts > 0) {
 +      throw new IOException(LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT.toLocalizedString(
 +            new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)}));
 +    }
 +
 +    Integer msgType = messageType.get();
 +    if (msgType != null && msgType == MessageType.PING) {
 +      messageType.set(null); // set it to null right away.
 +      int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping operation.
 +      if (numParts > pingParts) {
 +        throw new IOException("Part length ( " + numParts
 +            + " ) is  inconsistent for " + MessageType.getString(msgType)
 +            + " operation.");
 +      }
 +    }
 +    setNumberOfParts(numParts);
 +    if (numParts <= 0)
 +      return;
 +  
 +    if (len < 0) {
 +      logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len));
 +      throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
 +    }    
 +    
 +    final ByteBuffer cb = getCommBuffer();
 +    cb.clear();
 +    cb.flip();
 +
 +    int readSecurePart = 0;
 +    //TODO:Hitesh look if securePart can be cached here
-     //this.logger.fine("readPayloadFields() early ack = " + this.earlyAck);
 +    readSecurePart = checkAndSetSecurityPart();
 +    
 +    int bytesRemaining = len;
-     //this.logger.fine("readPayloadFields() : numParts=" + numParts + " len=" + len);
 +    for (int i = 0; ((i < numParts + readSecurePart) || ((readSecurePart == 1) && (cb
 +        .remaining() > 0))); i++) {
 +      int bytesReadThisTime = readPartChunk(bytesRemaining);
 +      bytesRemaining -= bytesReadThisTime;
 +
 +      Part part;
 +      
 +      if(i < numParts) {
 +        part = this.partsList[i];
 +      }
 +      else {
 +        part = this.securePart;
 +      }
 +      
 +      int partLen = cb.getInt();
 +      byte partType = cb.get();
 +      byte[] partBytes = null;
- //      this.logger.fine("readPayloadFields(): partLen=" + partLen + " partType=" + partType);
 +      if (partLen > 0) {
- //         if (partLen >= MIN_PART_BUFFER_SIZE && partLen <= MAX_PART_BUFFER_SIZE) {
- //           partBytes = getPartBuffer(partLen);
- //         }
- //         if (partBytes == null) {
-           partBytes = new byte[partLen];
- //         }
++        partBytes = new byte[partLen];
 +        int alreadyReadBytes = cb.remaining();
 +        if (alreadyReadBytes > 0) {
 +          if (partLen < alreadyReadBytes) {
 +            alreadyReadBytes = partLen;
 +          }
 +          cb.get(partBytes, 0, alreadyReadBytes);
 +        }
 +        // now we need to read partLen - alreadyReadBytes off the wire
 +        int off = alreadyReadBytes;
 +        int remaining = partLen - off;
 +        while (remaining > 0) {
 +          if (this.sockCh != null) {
 +            int bytesThisTime = remaining;
 +            cb.clear();
 +            if (bytesThisTime > cb.capacity()) {
 +              bytesThisTime = cb.capacity();
 +            }
 +            cb.limit(bytesThisTime);
 +            int res = this.sockCh.read(cb);
-             //System.out.println("DEBUG: part read " + res + " bytes commBuffer=" + cb);
 +            if (res != -1) {
 +              cb.flip();
 +              bytesRemaining -= res;
 +              remaining -= res;
 +              cb.get(partBytes, off, res);
 +              off += res;
 +              if (this.msgStats != null) {
 +                this.msgStats.incReceivedBytes(res);
 +              }
 +            } else {
 +              throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
 +            }
 +          } else {
 +            int res = 0;
 +            try {
 +              res = this.is.read(partBytes, off, remaining);
 +            }
 +            catch (SocketTimeoutException e) {
- //              res = 0;
 +              // TODO: add cancellation check
 +              throw e;
 +            }
 +            if (res != -1) {
 +              bytesRemaining -= res;
 +              remaining -= res;
 +              off += res;
 +              if (this.msgStats != null) {
 +                this.msgStats.incReceivedBytes(res);
 +              }
 +            } else {
 +              throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
 +            }
 +          }
 +        }
 +      }
 +      part.init(partBytes, partType);
 +    }
 +  }
 +
 +  protected int checkAndSetSecurityPart() {
-     if ((this.earlyAck | MESSAGE_HAS_SECURE_PART) == this.earlyAck) {
++    if ((this.flags | MESSAGE_HAS_SECURE_PART) == this.flags) {
 +      this.securePart = new Part();
 +      return 1;
 +    }
 +    else {
 +      this.securePart = null;
 +      return 0;
 +    }
 +  }
 +
 +  /**
 +   * @param bytesRemaining the most bytes we can read
 +   * @return the number of bytes read into commBuffer
 +   */
 +  private int readPartChunk(int bytesRemaining) throws IOException {
 +    final ByteBuffer cb = getCommBuffer();
-     //this.logger.info("DEBUG: commBuffer.remaining=" + cb.remaining());
 +    if (cb.remaining() >= PART_HEADER_SIZE) {
 +      // we already have the next part header in commBuffer so just return
 +      return 0;
 +    }
 +    if (cb.position() != 0) {
 +      cb.compact();
 +    } else {
 +      cb.position(cb.limit());
 +      cb.limit(cb.capacity());
 +    }
 +    int bytesRead = 0;
 +    if (this.sc != null) {
 +      // Keep track of the fact that we are making progress
 +      this.sc.updateProcessingMessage();
 +    }
 +    if (this.sockCh != null) {
 +      int remaining = cb.remaining();
 +      if (remaining > bytesRemaining) {
 +        remaining = bytesRemaining;
 +        cb.limit(cb.position()+bytesRemaining);
 +      }
 +      while (remaining > 0) {
 +        int res = this.sockCh.read(cb);
-         //System.out.println("DEBUG: partChunk read " + res + " bytes commBuffer=" + cb);
 +        if (res != -1) {
 +          remaining -= res;
 +          bytesRead += res;
 +          if (this.msgStats != null) {
 +            this.msgStats.incReceivedBytes(res);
 +          }
 +        } else {
 +          throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
 +        }
 +      }
 +
 +    } else {
 +      int bufSpace = cb.capacity() - cb.position();
 +      int bytesToRead = bufSpace;
 +      if (bytesRemaining < bytesToRead) {
 +        bytesToRead = bytesRemaining;
 +      }
 +      int pos = cb.position();
 +      while (bytesToRead > 0) {
 +        int res = 0;
 +        try {
 +          res = this.is.read(cb.array(), pos, bytesToRead);
 +        }
 +        catch (SocketTimeoutException e) {
- //          res = 0;
 +          // TODO add a cancellation check
 +          throw e;
 +        }
 +        if (res != -1) {
 +          bytesToRead -= res;
 +          pos += res;
 +          bytesRead += res;
 +          if (this.msgStats != null) {
 +            this.msgStats.incReceivedBytes(res);
 +          }
 +        } else {
 +          throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
 +        }
 +      }
 +      cb.position(pos);
 +    }
 +    cb.flip();
 +    return bytesRead;
 +  }
 +
 +  /**
 +   * Gets rid of all the parts that have been added to this message.
 +   */
 +  public void clearParts() {
 +    for (int i=0; i< partsList.length; i++){
 +      partsList[i].clear();
 +    }
 +    this.currentPart=0;
 +  }
++
++  @Override
++  public String toString() {
++    StringBuffer sb = new StringBuffer();
++    sb.append("type=").append(MessageType.getString(msgType));
++    sb.append("; payloadLength=").append(payloadLength);
++    sb.append("; numberOfParts=").append(numberOfParts);
++    sb.append("; transactionId=").append(transactionId);
++    sb.append("; currentPart=").append(currentPart);
++    sb.append("; messageModified=").append(messageModified);
++    sb.append("; flags=").append(Integer.toHexString(flags));
++    for (int i = 0; i < numberOfParts; i ++) {
++      sb.append("; part[").append(i).append("]={");
++      sb.append(this.partsList[i].toString());
++      sb.append("}");
++    }
++    return sb.toString();
++  }
++
++  
 +  public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
 +    this.sc = sc;
 +    setComms(socket, bb, msgStats);
 +  }
 +
 +  public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
 +    this.sockCh = socket.getChannel();
 +    if (this.sockCh == null) {
 +      setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats);
 +    } else {
 +      setComms(socket, null, null,  bb, msgStats);
 +    }
 +  }
 +  
 +  public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, MessageStats msgStats)
 +    throws IOException
 +  {
 +    Assert.assertTrue(socket != null);
 +    this.socket = socket;
 +    this.sockCh = socket.getChannel();
 +    this.is = is;
 +    this.os = os;
 +    this.cachedCommBuffer = bb;
 +    this.msgStats = msgStats;
 +  }
 +  /**
 +   * Undo any state changes done by setComms.
 +   * @since 5.7
 +   */
 +  public void unsetComms() {
 +    this.socket = null;
 +    this.sockCh = null;
 +    this.is = null;
 +    this.os = null;
 +    this.cachedCommBuffer = null;
 +    this.msgStats = null;
 +  }
 +
 +  /**
 +   * Sends this message to its receiver over its
 +   * setOutputStream?? output stream.
 +   */
 +  public void send()
 +  throws IOException {
 +    send(true);
 +  }
 +  
 +  public void send(ServerConnection servConn)
 +  throws IOException {
 +    if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set");
 +    send(true);
 +  }
 +  
 +  /**
 +   * Sends this message to its receiver over its
 +   * setOutputStream?? output stream.
 +   */
 +  public void send(boolean clearMessage)
 +  throws IOException {
 +    sendBytes(clearMessage);
 +  }
 +
 +  /**
 +   *  Populates the stats of this <code>Message</code> with information
 +   *  received via its socket
 +   */
 +  public void recv()
 +  throws IOException {
 +    if (this.socket != null) {
 +      synchronized(getCommBuffer()) {
 +        read();
 +      }
 +    }
 +    else {
 +      throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
 +    }
 +  }
-   public void recv(ServerConnection sc, int MAX_DATA, Semaphore dataLimiter, int MAX_MSGS, Semaphore msgLimiter)
++  public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, Semaphore msgLimiter)
 +  throws IOException {
 +    this.sc = sc;
-     this.MAX_DATA = MAX_DATA;
++    this.maxIncomingMessageLength = maxMessageLength;
 +    this.dataLimiter = dataLimiter;
- //    this.MAX_MSGS = MAX_MSGS;
 +    this.msgLimiter = msgLimiter;
 +    recv();
 +  }
 +
-   public boolean canStartRemoteTransaction() {
-     return true;
-   }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
index 5418c68,0000000..f5f6326
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
@@@ -1,457 -1,0 +1,452 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.internal.cache.tier.sockets;
 +
 +import com.gemstone.gemfire.internal.*;
 +import com.gemstone.gemfire.internal.cache.CachedDeserializable;
 +import com.gemstone.gemfire.internal.offheap.Chunk;
 +import com.gemstone.gemfire.internal.offheap.DataAsAddress;
 +import com.gemstone.gemfire.internal.offheap.StoredObject;
 +import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
 +
 +import java.io.*;
 +import java.nio.*;
 +import java.nio.channels.*;
 +
 +/**
 + * Represents one unit of information (essentially a <code>byte</code>
 + * array) in the wire protocol.  Each server connection runs in its
 + * own thread to maximize concurrency and improve response times to
 + * edge requests
 + *
 + * @see Message
 + *
 + * @author Sudhir Menon
 + * @since 2.0.2
 + */
 +public class Part {
 +  private static final byte BYTE_CODE = 0;
 +  private static final byte OBJECT_CODE = 1;
 +  
 +  private Version version;
 +  /**
 +   * Used to represent and empty byte array for bug 36279
 +   * @since 5.1
 +   */
 +  private static final byte EMPTY_BYTEARRAY_CODE = 2;
 +  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
 +
 +  /** The payload of this part.
 +   * Could be null, a byte[] or a HeapDataOutputStream on the send side.
 +   * Could be null, or a byte[] on the receiver side.
 +   */
 +  private Object part;
 +
 +  /** Is the payload (<code>part</code>) a serialized object? */
 +  private byte typeCode;
 +
 +  public void init(byte[] v, byte tc) {
 +    if (tc == EMPTY_BYTEARRAY_CODE) {
 +      this.part = EMPTY_BYTE_ARRAY;
 +    }
 +    else {
 +      this.part = v;
 +    }
 +    this.typeCode = tc;
 +  }
 +
- //   public void init(HeapDataOutputStream os, byte typeCode) {
- //     this.part = os;
- //     this.typeCode = typeCode;
- //   }
 +
 +  public void clear() {
 +    this.part = null;
 +    this.typeCode = BYTE_CODE;
 +  }
 +
 +  public boolean isNull() {
 +    if (this.part == null) {
 +      return true;
 +    }
 +    if (isObject() && this.part instanceof byte[]) {
 +      byte[] b = (byte[])this.part;
 +      if (b.length == 1 && b[0] == DSCODE.NULL) {
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +  public boolean isObject() {
 +    return this.typeCode == OBJECT_CODE;
 +  }
 +  public boolean isBytes() {
 +    return this.typeCode == BYTE_CODE || this.typeCode == EMPTY_BYTEARRAY_CODE;
 +  }
- //   public boolean isString() {
- //     return this.typeCode == STRING_CODE;
- //   }
++
 +  public void setPartState(byte[] b, boolean isObject) {
 +    if (isObject) {
 +      this.typeCode = OBJECT_CODE;
 +    } else if (b != null && b.length == 0) {
 +      this.typeCode = EMPTY_BYTEARRAY_CODE;
 +      b = EMPTY_BYTE_ARRAY;
 +    } else {
 +      this.typeCode = BYTE_CODE;
 +    }
 +    this.part = b;
 +  }
++  
 +  public void setPartState(HeapDataOutputStream os, boolean isObject) {
 +    if (isObject) {
 +      this.typeCode = OBJECT_CODE;
 +      this.part = os;
 +    } else if (os != null && os.size() == 0) {
 +      this.typeCode = EMPTY_BYTEARRAY_CODE;
 +      this.part = EMPTY_BYTE_ARRAY;
 +    } else {
 +      this.typeCode = BYTE_CODE;
 +      this.part = os;
 +    }
 +  }
 +  public void setPartState(StoredObject so, boolean isObject) {
 +    if (isObject) {
 +      this.typeCode = OBJECT_CODE;
 +    } else if (so.getValueSizeInBytes() == 0) {
 +      this.typeCode = EMPTY_BYTEARRAY_CODE;
 +      this.part = EMPTY_BYTE_ARRAY;
 +      return;
 +    } else {
 +      this.typeCode = BYTE_CODE;
 +    }
 +    if (so instanceof DataAsAddress) {
 +      this.part = ((DataAsAddress)so).getRawBytes();
 +    } else {
 +      this.part = (Chunk)so;
 +    }
 +  }
 +  public byte getTypeCode() {
 +    return this.typeCode;
 +  }
 +  /**
 +   * Return the length of the part. The length is the number of bytes needed
 +   * for its serialized form.
 +   */
 +  public int getLength() {
 +    if (this.part == null) {
 +      return 0;
 +    } else if (this.part instanceof byte[]) {
 +      return ((byte[])this.part).length;
 +    } else if (this.part instanceof Chunk) {
 +      return ((Chunk) this.part).getValueSizeInBytes();
 +    } else {
 +      return ((HeapDataOutputStream)this.part).size();
 +    }
 +  }
 +  public String getString() {
 +    if (this.part == null) {
 +      return null;
 +    }
 +    if (!isBytes()) {
 +      Assert.assertTrue(false, "expected String part to be of type BYTE, part ="
 +          + this.toString());
 +    }
 +    return CacheServerHelper.fromUTF((byte[])this.part);
 +  }
 +  
 +  public int getInt() {
 +    if (!isBytes()) {
 +      Assert.assertTrue(false, "expected int part to be of type BYTE, part = "
 +          + this.toString()); 
 +    }
 +    if (getLength() != 4) {
 +      Assert.assertTrue(false, 
 +          "expected int length to be 4 but it was " + getLength()
 +          + "; part = " + this.toString());
 +    }
 +    byte[] bytes = getSerializedForm();
 +    return decodeInt(bytes, 0);
 +  }
 +
 +  public static int decodeInt(byte[] bytes, int offset) {
 +    return (((bytes[offset + 0]) << 24) & 0xFF000000)
 +        | (((bytes[offset + 1]) << 16) & 0x00FF0000)
 +        | (((bytes[offset + 2]) << 8) & 0x0000FF00)
 +        | ((bytes[offset + 3]) & 0x000000FF);
 +  }
 +
 +  public void setInt(int v) {
 +    byte[] bytes = new byte[4];
 +    encodeInt(v, bytes);
 +    this.typeCode = BYTE_CODE;
 +    this.part = bytes;
 +  }
 +
 +  /**
 +   * @since 5.7
 +   */
 +  public static void encodeInt(int v, byte[] bytes) {
 +    encodeInt(v, bytes, 0);
 +  }
 +
 +  public static void encodeInt(int v, byte[] bytes, int offset) {
 +    // encode an int into the given byte array
 +    bytes[offset + 0] = (byte) ((v & 0xFF000000) >> 24);
 +    bytes[offset + 1] = (byte) ((v & 0x00FF0000) >> 16);
 +    bytes[offset + 2] = (byte) ((v & 0x0000FF00) >> 8 );
 +    bytes[offset + 3] = (byte) (v & 0x000000FF);
 +  }
 +  
 +  public void setLong(long v) {
 +    byte[] bytes = new byte[8];
 +    bytes[0] = (byte) ((v & 0xFF00000000000000l) >> 56);
 +    bytes[1] = (byte) ((v & 0x00FF000000000000l) >> 48);
 +    bytes[2] = (byte) ((v & 0x0000FF0000000000l) >> 40);
 +    bytes[3] = (byte) ((v & 0x000000FF00000000l) >> 32);
 +    bytes[4] = (byte) ((v & 0x00000000FF000000l) >> 24);
 +    bytes[5] = (byte) ((v & 0x0000000000FF0000l) >> 16);
 +    bytes[6] = (byte) ((v & 0x000000000000FF00l) >>  8);
 +    bytes[7] = (byte) (v & 0xFF);
 +    this.typeCode = BYTE_CODE;
 +    this.part = bytes;
 +  }
 +
 +  public long getLong() {
 +    if (!isBytes()) {
 +      Assert.assertTrue(false, "expected long part to be of type BYTE, part = "
 +          + this.toString()); 
 +    }
 +    if (getLength() != 8) {
 +      Assert.assertTrue(false, 
 +          "expected long length to be 8 but it was " + getLength()
 +          + "; part = " + this.toString());
 +    }
 +    byte[] bytes = getSerializedForm();
 +    return ((((long)bytes[0]) << 56) & 0xFF00000000000000l) |
 +           ((((long)bytes[1]) << 48) & 0x00FF000000000000l) |
 +           ((((long)bytes[2]) << 40) & 0x0000FF0000000000l) |
 +           ((((long)bytes[3]) << 32) & 0x000000FF00000000l) |
 +           ((((long)bytes[4]) << 24) & 0x00000000FF000000l) |
 +           ((((long)bytes[5]) << 16) & 0x0000000000FF0000l) |
 +           ((((long)bytes[6]) <<  8) & 0x000000000000FF00l) |
 +           (        bytes[7]         & 0x00000000000000FFl);
 +  }
 +
 +
 +  public byte[] getSerializedForm() {
 +    if (this.part == null) {
 +      return null;
 +    } else if (this.part instanceof byte[]) {
 +      return (byte[])this.part;
 +    } else {
 +      return null; // should not be called on sender side?
 +    }
 +  }
 +  public Object getObject(boolean unzip) throws IOException, ClassNotFoundException {
 +    if (isBytes()) {
 +      return this.part;
 +    }
 +    else {
 +      if (this.version != null) {
 +        return CacheServerHelper.deserialize((byte[])this.part, this.version,
 +            unzip);
 +      }
 +      else {
 +        return CacheServerHelper.deserialize((byte[])this.part, unzip);
 +      }
 +    }
 +  }
 +  public Object getObject() throws IOException, ClassNotFoundException {
 +    return getObject(false);
 +  }
 +
 +  public Object getStringOrObject() throws IOException, ClassNotFoundException {
 +    if (isObject()) {
 +      return getObject();
 +    } else {
 +      return getString();
 +    }
 +  }
 +  
 +  /**
 +   * Write the contents of this part to the specified output stream.
 +   * This is only called for parts that will not fit into the commBuffer
 +   * so they need to be written directly to the stream.
 +   * A stream is used because the client is configured for old IO (instead of nio).
 +   * @param buf the buffer to use if any data needs to be copied to one
 +   */
 +  public final void sendTo(OutputStream out, ByteBuffer buf) throws IOException {
 +    if (getLength() > 0) {
 +      if (this.part instanceof byte[]) {
 +        byte[] bytes = (byte[])this.part;
 +        out.write(bytes, 0, bytes.length);
 +      } else if (this.part instanceof Chunk) {
 +        Chunk c = (Chunk) this.part;
 +        ByteBuffer cbb = c.createDirectByteBuffer();
 +        if (cbb != null) {
 +          HeapDataOutputStream.writeByteBufferToStream(out,  buf, cbb);
 +        } else {
 +          int bytesToSend = c.getDataSize();
 +          long addr = c.getAddressForReading(0, bytesToSend);
 +          while (bytesToSend > 0) {
 +            if (buf.remaining() == 0) {
 +              HeapDataOutputStream.flushStream(out,  buf);
 +            }
 +            buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
 +            addr++;
 +            bytesToSend--;
 +          }
 +        }
 +      } else {
 +        HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
 +        hdos.sendTo(out, buf);
 +        hdos.rewind();
 +      }
 +    }
 +  }
 +  /**
 +   * Write the contents of this part to the specified byte buffer.
 +   * Precondition: caller has already checked the length of this part
 +   * and it will fit into "buf".
 +   */
 +  public final void sendTo(ByteBuffer buf) {
 +    if (getLength() > 0) {
 +      if (this.part instanceof byte[]) {
 +        buf.put((byte[])this.part);
 +      } else if (this.part instanceof Chunk) {
 +        Chunk c = (Chunk) this.part;
 +        ByteBuffer bb = c.createDirectByteBuffer();
 +        if (bb != null) {
 +          buf.put(bb);
 +        } else {
 +          int bytesToSend = c.getDataSize();
 +          long addr = c.getAddressForReading(0, bytesToSend);
 +          while (bytesToSend > 0) {
 +            buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
 +            addr++;
 +            bytesToSend--;
 +          }
 +        }
 +      } else {
 +        HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
 +        hdos.sendTo(buf);
 +        hdos.rewind();
 +      }
 +    }
 +  }
 +  /**
 +   * Write the contents of this part to the specified socket channel
 +   * using the specified byte buffer.
 +   * This is only called for parts that will not fit into the commBuffer
 +   * so they need to be written directly to the socket.
 +   * Precondition: buf contains nothing that needs to be sent
 +   */
 +  public final void sendTo(SocketChannel sc, ByteBuffer buf) throws IOException {
 +    if (getLength() > 0) {
 +      final int BUF_MAX = buf.capacity();
 +      if (this.part instanceof byte[]) {
 +        final byte[] bytes = (byte[])this.part;
 +        int off = 0;
 +        int len = bytes.length;
 +        buf.clear();
 +        while (len > 0) {
 +          int bytesThisTime = len;
 +          if (bytesThisTime > BUF_MAX) {
 +            bytesThisTime = BUF_MAX;
 +          }
 +          buf.put(bytes, off, bytesThisTime);
 +          len -= bytesThisTime;
 +          off += bytesThisTime;
 +          buf.flip();
 +          while (buf.remaining() > 0) {
 +            sc.write(buf);
 +          }
 +          buf.clear();
 +        }
 +      } else if (this.part instanceof Chunk) {
 +        // instead of copying the Chunk to buf try to create a direct ByteBuffer and
 +        // just write it directly to the socket channel.
 +        Chunk c = (Chunk) this.part;
 +        ByteBuffer bb = c.createDirectByteBuffer();
 +        if (bb != null) {
 +          while (bb.remaining() > 0) {
 +            sc.write(bb);
 +          }
 +        } else {
 +          int len = c.getDataSize();
 +          long addr = c.getAddressForReading(0, len);
 +          buf.clear();
 +          while (len > 0) {
 +            int bytesThisTime = len;
 +            if (bytesThisTime > BUF_MAX) {
 +              bytesThisTime = BUF_MAX;
 +            }
 +            len -= bytesThisTime;
 +            while (bytesThisTime > 0) {
 +              buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
 +              addr++;
 +              bytesThisTime--;
 +            }
 +            buf.flip();
 +            while (buf.remaining() > 0) {
 +              sc.write(buf);
 +            }
 +            buf.clear();
 +          }
 +        }
 +      } else {
 +        HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
 +        hdos.sendTo(sc, buf);
 +        hdos.rewind();
 +      }
 +    }
 +  }
 +  
 +  static private String typeCodeToString(byte c) {
 +    switch (c) {
 +    case BYTE_CODE:
 +      return "BYTE_CODE";
 +    case OBJECT_CODE:
 +      return "OBJECT_CODE";
 +    case EMPTY_BYTEARRAY_CODE:
 +      return "EMPTY_BYTEARRAY_CODE";
 +    default:
 +      return "unknown code " + c;
 +    }
 +  }
 +
 +  @Override
 +  public String toString() {
 +    StringBuffer sb = new StringBuffer();
 +    sb.append("partCode=");
 +    sb.append(typeCodeToString(this.typeCode));
 +    sb.append(" partLength=" + getLength());
 +//    sb.append(" partBytes=");
 +//    byte[] b = getSerializedForm();
 +//    if (b == null) {
 +//      sb.append("null");
 +//    }
 +//    else {
 +//      sb.append("(");
 +//      for (int i = 0; i < b.length; i ++) {
 +//        sb.append(Integer.toString(b[i]));
 +//        sb.append(" ");
 +//      }
 +//      sb.append(")");
 +//    }
 +    return sb.toString();
 +  }
 +
 +  public void setVersion(Version clientVersion) {
 +    this.version = clientVersion;
 +  }
 +}


Mime
View raw message