geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [21/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:28 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
index ac1f5c7..5350f1a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegionDataView.java
@@ -29,8 +29,8 @@ public class LocalRegionDataView implements InternalDataView {
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getDeserializedValue(java.lang.Object, com.gemstone.gemfire.internal.cache.LocalRegion, boolean)
    */
   public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion,
-      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones) {
-    return localRegion.getDeserializedValue(keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones);
+      boolean updateStats, boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadsFromHDFS, boolean retainResult) {
+    return localRegion.getDeserializedValue(null, keyInfo, updateStats, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadsFromHDFS, retainResult);
   }
 
   /* (non-Javadoc)
@@ -129,8 +129,8 @@ public class LocalRegionDataView implements InternalDataView {
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#findObject(com.gemstone.gemfire.internal.cache.LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
    */
   public Object findObject(KeyInfo keyInfo, LocalRegion r, boolean isCreate,
-      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) {
-   return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, clientEvent, returnTombstones);
+      boolean generateCallbacks, Object value, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) {
+   return r.nonTxnFindObject(keyInfo, isCreate, generateCallbacks, value, disableCopyOnRead, preferCD, clientEvent, returnTombstones, allowReadFromHDFS);
   }
 
   /* (non-Javadoc)
@@ -172,7 +172,7 @@ public class LocalRegionDataView implements InternalDataView {
    * (non-Javadoc)
    * @see com.gemstone.gemfire.internal.cache.InternalDataView#getSerializedValue(com.gemstone.gemfire.internal.cache.BucketRegion, java.lang.Object, java.lang.Object)
    */
-  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) throws DataLocationException {
+  public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS) throws DataLocationException {
     throw new IllegalStateException();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/MemberFunctionStreamingMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/MemberFunctionStreamingMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/MemberFunctionStreamingMessage.java
index fed1763..ecfe970 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/MemberFunctionStreamingMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/MemberFunctionStreamingMessage.java
@@ -398,4 +398,9 @@ public class MemberFunctionStreamingMessage extends DistributionMessage implemen
   public boolean canParticipateInTransaction() {
     return true;
   }
+  
+  @Override
+  public boolean isTransactionDistributed() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
index 20d09f2..dcf4115 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/NonLocalRegionEntry.java
@@ -193,6 +193,11 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
     return this.value;
   }
   
+  @Override
+  public final Object getValueRetain(RegionEntryContext context) {
+    return this.value;
+  }
+  
   /** update the value held in this non-local region entry */
   void setCachedValue(Object newValue) {
     this.value = newValue;
@@ -244,7 +249,7 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
     throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY.toLocalizedString());
   }
   @Override
-  public Object _getValueUse(RegionEntryContext context, boolean decompress) {
+  public Object _getValueRetain(RegionEntryContext context, boolean decompress) {
     throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY.toLocalizedString());
   }
   @Override
@@ -445,7 +450,27 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
   public void setUpdateInProgress(boolean underUpdate) {
     throw new UnsupportedOperationException(LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY.toLocalizedString());
   }
-  
+
+  @Override
+  public boolean isMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+  @Override
+  public void setMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+
+  @Override
+  public void clearMarkedForEviction() {
+    throw new UnsupportedOperationException(LocalizedStrings
+        .PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY
+            .toLocalizedString());
+  }
+
   @Override
   public boolean isValueNull() {
     return (null == getValueAsToken());
@@ -531,4 +556,15 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp {
   @Override
   public void resetRefCount(NewLRUClockHand lruList) {
   }
+
+  @Override
+  public Object prepareValueForCache(RegionEntryContext r, Object val, boolean isEntryUpdate) {
+    throw new IllegalStateException("Should never be called");
+  }
+
+  @Override
+  public Object prepareValueForCache(RegionEntryContext r, Object val,
+      EntryEventImpl event, boolean isEntryUpdate) {
+    throw new IllegalStateException("Should never be called");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OffHeapRegionEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OffHeapRegionEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OffHeapRegionEntry.java
new file mode 100644
index 0000000..39aa633
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OffHeapRegionEntry.java
@@ -0,0 +1,26 @@
+
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.internal.offheap.Releasable;
+
+/**
+ * Any RegionEntry that is stored off heap must
+ * implement this interface.
+ * 
+ * @author darrel
+ *
+ */
+public interface OffHeapRegionEntry extends RegionEntry, Releasable {
+  /**
+   * OFF_HEAP_FIELD_READER
+   * @return OFF_HEAP_ADDRESS
+   */
+  public long getAddress();
+  /**
+   * OFF_HEAP_FIELD_WRITER
+   * @param expectedAddr OFF_HEAP_ADDRESS
+   * @param newAddr OFF_HEAP_ADDRESS
+   * @return newAddr OFF_HEAP_ADDRESS
+   */
+  public boolean setAddress(long expectedAddr, long newAddr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OfflineCompactionDiskRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OfflineCompactionDiskRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OfflineCompactionDiskRegion.java
index 57887cd..f8e3c55 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OfflineCompactionDiskRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OfflineCompactionDiskRegion.java
@@ -24,12 +24,12 @@ public class OfflineCompactionDiskRegion extends DiskRegion implements DiskRecov
           new DummyCancelCriterion(),
           new DummyDiskExceptionHandler(),
           null, drv.getFlags(), drv.getPartitionName(), drv.getStartingBucketId(),
-          drv.getCompressorClassName());
+          drv.getCompressorClassName(), drv.getOffHeap());
     setConfig(drv.getLruAlgorithm(), drv.getLruAction(), drv.getLruLimit(),
               drv.getConcurrencyLevel(), drv.getInitialCapacity(),
               drv.getLoadFactor(), drv.getStatisticsEnabled(),
               drv.isBucket(), drv.getFlags(), drv.getPartitionName(), drv.getStartingBucketId(),
-              drv.getCompressorClassName());
+              drv.getCompressorClassName(), drv.getOffHeap());
   }
   
   static OfflineCompactionDiskRegion create(DiskStoreImpl dsi, DiskRegionView drv) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
index 8642a37..f2384f0 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/Oplog.java
@@ -70,9 +70,11 @@ import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.InsufficientDiskSpaceException;
 import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.InternalDataSerializer.Sendable;
 import com.gemstone.gemfire.internal.InternalStatisticsDisabledException;
+import com.gemstone.gemfire.internal.Sendable;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.OplogCompactor;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.OplogEntryIdSet;
@@ -96,6 +98,12 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.sequencelog.EntryLogger;
 import com.gemstone.gemfire.internal.shared.NativeCalls;
 import com.gemstone.gemfire.internal.util.BlobHelper;
@@ -115,7 +123,7 @@ import com.gemstone.gemfire.pdx.internal.PdxWriterImpl;
  * @since 5.1
  */
 
-public final class Oplog implements CompactableOplog {
+public final class Oplog implements CompactableOplog, Flushable {
   private static final Logger logger = LogService.getLogger();
 
   /** Extension of the oplog file * */
@@ -543,6 +551,7 @@ public final class Oplog implements CompactableOplog {
    * cause a switch of oplogs
    */
   final Object lock = new Object();
+  final ByteBuffer[] bbArray = new ByteBuffer[2];
 
   private boolean lockedForKRFcreate = false;
 
@@ -3462,15 +3471,9 @@ public final class Oplog implements CompactableOplog {
    * @param userBits
    * @throws IOException
    */
-  private void initOpState(byte opCode, DiskRegionView dr, DiskEntry entry, byte[] value, byte userBits, boolean notToUseUserBits)
-      throws IOException {
-    int len = value != null ? value.length : 0;
-    initOpState(opCode, dr, entry, value, len, userBits, notToUseUserBits);
-  }
-
-  private void initOpState(byte opCode, DiskRegionView dr, DiskEntry entry, byte[] value, int valueLength, byte userBits,
+  private void initOpState(byte opCode, DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits,
       boolean notToUseUserBits) throws IOException {
-    this.opState.initialize(opCode, dr, entry, value, valueLength, userBits, notToUseUserBits);
+    this.opState.initialize(opCode, dr, entry, value, userBits, notToUseUserBits);
   }
 
   private void clearOpState() {
@@ -3488,29 +3491,8 @@ public final class Oplog implements CompactableOplog {
     return this.opState.getValueOffset();
   }
 
-  private byte calcUserBits(byte[] value, boolean isSerializedObject) {
-    byte userBits = 0x0;
-
-    if (isSerializedObject) {
-      if (value == DiskEntry.INVALID_BYTES) {
-        // its the invalid token
-        userBits = EntryBits.setInvalid(userBits, true);
-      } else if (value == DiskEntry.LOCAL_INVALID_BYTES) {
-        // its the local-invalid token
-        userBits = EntryBits.setLocalInvalid(userBits, true);
-      } else if (value == DiskEntry.TOMBSTONE_BYTES) {
-        // its the tombstone token
-        userBits = EntryBits.setTombstone(userBits, true);
-      } else {
-        if (value == null) {
-          throw new IllegalStateException("userBits==1 and value is null");
-        } else if (value.length == 0) {
-          throw new IllegalStateException("userBits==1 and value is zero length");
-        }
-        userBits = EntryBits.setSerialized(userBits, true);
-      }
-    }
-    return userBits;
+  private byte calcUserBits(ValueWrapper vw) {
+    return vw.getUserBits();
   }
 
   /**
@@ -3539,18 +3521,14 @@ public final class Oplog implements CompactableOplog {
    *          The DiskEntry object for this key/value pair.
    * @param value
    *          byte array representing the value
-   * @param isSerializedObject
-   *          boolean indicating whether the byte array is a serialized value or
-   *          not Do the bytes in <code>value</code> contain a serialized object
-   *          (or an actually <code>byte</code> array)?
    * @throws DiskAccessException
    * @throws IllegalStateException
    * 
    */
-  public final void create(LocalRegion region, DiskEntry entry, byte[] value, boolean isSerializedObject, boolean async) {
+  public final void create(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
 
     if (this != getOplogSet().getChild()) {
-      getOplogSet().getChild().create(region, entry, value, isSerializedObject, async);
+      getOplogSet().getChild().create(region, entry, value, async);
     } else {
       DiskId did = entry.getDiskId();
       boolean exceptionOccured = false;
@@ -3559,7 +3537,7 @@ public final class Oplog implements CompactableOplog {
       try {
         // It is ok to do this outside of "lock" because
         // create records do not need to change.
-        byte userBits = calcUserBits(value, isSerializedObject);
+        byte userBits = calcUserBits(value);
         // save versions for creates and updates even if value is bytearrary in
         // 7.0
         if (entry.getVersionStamp() != null) {
@@ -3612,7 +3590,7 @@ public final class Oplog implements CompactableOplog {
    * @throws IOException
    * @throws InterruptedException
    */
-  private void basicCreate(DiskRegion dr, DiskEntry entry, byte[] value, byte userBits, boolean async) throws IOException,
+  private void basicCreate(DiskRegion dr, DiskEntry entry, ValueWrapper value, byte userBits, boolean async) throws IOException,
       InterruptedException {
     DiskId id = entry.getDiskId();
     boolean useNextOplog = false;
@@ -3689,7 +3667,7 @@ public final class Oplog implements CompactableOplog {
         // }
         this.crf.currSize = temp;
         if (EntryBits.isNeedsValue(userBits)) {
-          id.setValueLength(value.length);
+          id.setValueLength(value.getLength());
         } else {
           id.setValueLength(0);
         }
@@ -3711,8 +3689,8 @@ public final class Oplog implements CompactableOplog {
           }
           logger.trace(LogMarker.PERSIST_WRITES,
               "basicCreate: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes={} drId={} versionTag={} oplog#{}",
-              abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, (value != null ? value.length : 0),
-              baToString(value), dr.getId(), tag, getOplogId());
+              abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, (value != null ? value.getLength() : 0),
+              value.getBytesAsString(), dr.getId(), tag, getOplogId());
         }
         id.setOffsetInOplog(startPosForSynchOp);
         addLive(dr, entry);
@@ -4138,6 +4116,9 @@ public final class Oplog implements CompactableOplog {
         return;
       }
 
+      this.krf.dos.flush();
+      this.krf.fos.getChannel().force(true);
+
       this.krf.dos.close();
       this.krf.dos = null;
       this.krf.bos.close();
@@ -4159,7 +4140,11 @@ public final class Oplog implements CompactableOplog {
       allClosed = true;
     } catch (IOException e) {
       // TODO Auto-generated catch block
-      throw new DiskAccessException("Fail to close krf file " + this.krf.f, e, getParent());
+      if (getParent().getDiskAccessException() == null) {
+        throw new DiskAccessException("Fail to close krf file " + this.krf.f, e, getParent());
+      } else {
+        logger.info("Fail to close krf file " + this.krf.f+", but a DiskAccessException happened ealier", getParent().getDiskAccessException());
+      }
     } finally {
       if (!allClosed) {
         // IOException happened during close, delete this krf
@@ -4317,10 +4302,10 @@ public final class Oplog implements CompactableOplog {
     DiskId did = entry.getDiskId();
     byte userBits = 0;
     long oplogOffset = did.getOffsetInOplog();
-    Object value = entry._getValueUse(dr, true); // OFFHEAP for now copy into
-                                                 // heap CD; todo optimize by
-                                                 // keeping offheap for life of
-                                                 // wrapper
+    SimpleMemoryAllocatorImpl.skipRefCountTracking();
+    // TODO OFFHEAP: no need to retain. We just use it while we have the RegionEntry synced.
+    @Retained @Released Object value = entry._getValueRetain(dr, true);
+    SimpleMemoryAllocatorImpl.unskipRefCountTracking();
     // TODO:KIRK:OK Object value = entry.getValueWithContext(dr);
     boolean foundData = false;
     if (value == null) {
@@ -4403,8 +4388,17 @@ public final class Oplog implements CompactableOplog {
                                                                                                       */);
       } else if (value instanceof CachedDeserializable) {
         CachedDeserializable proxy = (CachedDeserializable) value;
-        userBits = EntryBits.setSerialized(userBits, true);
-        proxy.fillSerializedValue(wrapper, userBits);
+        if (proxy instanceof StoredObject) {
+          @Released StoredObject ohproxy = (StoredObject) proxy;
+          try {
+            ohproxy.fillSerializedValue(wrapper, userBits);
+          } finally {
+            OffHeapHelper.releaseWithNoTracking(ohproxy);
+          }
+        } else {
+          userBits = EntryBits.setSerialized(userBits, true);
+          proxy.fillSerializedValue(wrapper, userBits);
+        }
       } else if (value instanceof byte[]) {
         byte[] valueBytes = (byte[]) value;
         // Asif: If the value is already a byte array then the user bit
@@ -4460,9 +4454,6 @@ public final class Oplog implements CompactableOplog {
    * 
    * @param value
    *          byte array representing the value
-   * @param isSerializedObject
-   *          Do the bytes in <code>value</code> contain a serialized object (or
-   *          an actually <code>byte</code> array)?
    * @throws DiskAccessException
    * @throws IllegalStateException
    */
@@ -4471,17 +4462,17 @@ public final class Oplog implements CompactableOplog {
    * during transition. Minimizing the synchronization allowing multiple put
    * operations for different entries to proceed concurrently for asynch mode
    */
-  public final void modify(LocalRegion region, DiskEntry entry, byte[] value, boolean isSerializedObject, boolean async) {
+  public final void modify(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) {
 
     if (getOplogSet().getChild() != this) {
-      getOplogSet().getChild().modify(region, entry, value, isSerializedObject, async);
+      getOplogSet().getChild().modify(region, entry, value, async);
     } else {
       DiskId did = entry.getDiskId();
       boolean exceptionOccured = false;
       byte prevUsrBit = did.getUserBits();
       int len = did.getValueLength();
       try {
-        byte userBits = calcUserBits(value, isSerializedObject);
+        byte userBits = calcUserBits(value);
         // save versions for creates and updates even if value is bytearrary in
         // 7.0
         if (entry.getVersionStamp() != null) {
@@ -4491,8 +4482,7 @@ public final class Oplog implements CompactableOplog {
           // pdx and tx will not use version
           userBits = EntryBits.setWithVersions(userBits, true);
         }
-        int valueLen = value != null ? value.length : 0;
-        basicModify(region.getDiskRegion(), entry, value, valueLen, userBits, async, false);
+        basicModify(region.getDiskRegion(), entry, value, userBits, async, false);
       } catch (IOException ex) {
         exceptionOccured = true;
         region.getCancelCriterion().checkCancelInProgress(ex);
@@ -4517,7 +4507,8 @@ public final class Oplog implements CompactableOplog {
   public void offlineModify(DiskRegionView drv, DiskEntry entry, byte[] value,
       boolean isSerializedObject) {
     try {
-      byte userBits = calcUserBits(value, isSerializedObject);
+      ValueWrapper vw = new DiskEntry.Helper.ByteArrayValueWrapper(isSerializedObject, value);
+      byte userBits = calcUserBits(vw);
       // save versions for creates and updates even if value is bytearrary in 7.0
       VersionStamp vs = entry.getVersionStamp();
       if (vs != null) {
@@ -4534,8 +4525,7 @@ public final class Oplog implements CompactableOplog {
         vs.setVersions(vt);
         userBits = EntryBits.setWithVersions(userBits, true);
       }
-      int valueLen = value != null ? value.length : 0;
-      basicModify(drv, entry, value, valueLen, userBits, false, false);
+      basicModify(drv, entry, vw, userBits, false, false);
     } catch (IOException ex) {
       throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0.toLocalizedString(this.diskFile.getPath()), ex, drv.getName());
     } catch (InterruptedException ie) {
@@ -4583,17 +4573,25 @@ public final class Oplog implements CompactableOplog {
     }
   }
 
-  private final void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry, byte[] value, int valueLength, byte userBits) {
+  private final void copyForwardModifyForCompact(DiskRegionView dr, DiskEntry entry, BytesAndBitsForCompactor wrapper) {
     if (getOplogSet().getChild() != this) {
-      getOplogSet().getChild().copyForwardModifyForCompact(dr, entry, value, valueLength, userBits);
+      getOplogSet().getChild().copyForwardModifyForCompact(dr, entry, wrapper);
     } else {
       DiskId did = entry.getDiskId();
       boolean exceptionOccured = false;
       int len = did.getValueLength();
       try {
+        // TODO: compaction needs to get version?
+        byte userBits = wrapper.getBits();
+        ValueWrapper vw;
+        if (wrapper.getDataChunk() != null) {
+          vw = new DiskEntry.Helper.ChunkValueWrapper(wrapper.getDataChunk());
+        } else {
+          vw = new DiskEntry.Helper.CompactorValueWrapper(wrapper.getBytes(), wrapper.getValidLength());
+        }
         // Compactor always says to do an async basicModify so that its writes
         // will be grouped. This is not a true async write; just a grouped one.
-        basicModify(dr, entry, value, valueLength, userBits, true, true);
+        basicModify(dr, entry, vw, userBits, true, true);
       } catch (IOException ex) {
         exceptionOccured = true;
         getParent().getCancelCriterion().checkCancelInProgress(ex);
@@ -4607,6 +4605,9 @@ public final class Oplog implements CompactableOplog {
             LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0_DUE_TO_FAILURE_IN_ACQUIRING_READ_LOCK_FOR_ASYNCH_WRITING
                 .toLocalizedString(this.diskFile.getPath()), ie, getParent());
       } finally {
+        if (wrapper.getDataChunk() != null) {
+          wrapper.setChunkData(null, (byte) 0);
+        }
         if (exceptionOccured) {
           did.setValueLength(len);
         }
@@ -4626,7 +4627,7 @@ public final class Oplog implements CompactableOplog {
    * @throws IOException
    * @throws InterruptedException
    */
-  private void basicModify(DiskRegionView dr, DiskEntry entry, byte[] value, int valueLength, byte userBits, boolean async,
+  private void basicModify(DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits, boolean async,
       boolean calledByCompactor) throws IOException, InterruptedException {
     DiskId id = entry.getDiskId();
     boolean useNextOplog = false;
@@ -4643,7 +4644,7 @@ public final class Oplog implements CompactableOplog {
       if (getOplogSet().getChild() != this) {
         useNextOplog = true;
       } else {
-        initOpState(OPLOG_MOD_ENTRY_1ID, dr, entry, value, valueLength, userBits, false);
+        initOpState(OPLOG_MOD_ENTRY_1ID, dr, entry, value, userBits, false);
         adjustment = getOpStateSize();
         assert adjustment > 0;
         long temp = (this.crf.currSize + adjustment);
@@ -4671,11 +4672,11 @@ public final class Oplog implements CompactableOplog {
             }
             logger.trace(LogMarker.PERSIST_WRITES,
               "basicModify: id=<{}> key=<{}> valueOffset={} userBits={} valueLen={} valueBytes=<{}> drId={} versionStamp={} oplog#{}",
-              abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, valueLength, baToString(value, valueLength),
+              abs(id.getKeyId()), entry.getKey(), startPosForSynchOp, userBits, value.getLength(), value.getBytesAsString(),
               dr.getId(), tag, getOplogId());
           }
           if (EntryBits.isNeedsValue(userBits)) {
-            id.setValueLength(valueLength);
+            id.setValueLength(value.getLength());
           } else {
             id.setValueLength(0);
           }
@@ -4733,7 +4734,7 @@ public final class Oplog implements CompactableOplog {
         CacheObserverHolder.getInstance().afterSwitchingOplog();
       }
       Assert.assertTrue(getOplogSet().getChild() != this);
-      getOplogSet().getChild().basicModify(dr, entry, value, valueLength, userBits, async, calledByCompactor);
+      getOplogSet().getChild().basicModify(dr, entry, value, userBits, async, calledByCompactor);
     } else {
       if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
         CacheObserverHolder.getInstance().afterSettingOplogOffSet(startPosForSynchOp);
@@ -5173,6 +5174,22 @@ public final class Oplog implements CompactableOplog {
     flushAllNoSync(false); // @todo
     // flush(olf, false);
   }
+  
+  @Override
+  public void flush() throws IOException {
+    flushAllNoSync(false);
+  }
+
+  @Override
+  public void flush(ByteBuffer b1, ByteBuffer b2) throws IOException {
+    if (b1 == this.drf.writeBuf) {
+      flush(this.drf, b1, b2);
+      flush(this.crf, false);
+    } else {
+      flush(this.drf, false);
+      flush(this.crf, b1, b2);
+    }
+  }
 
   private final void flushAndSync(OplogFile olf) throws IOException {
     flushAll(false); // @todo
@@ -5213,6 +5230,31 @@ public final class Oplog implements CompactableOplog {
     }
   }
 
+  private final void flush(OplogFile olf, ByteBuffer b1, ByteBuffer b2) throws IOException {
+    try {
+      synchronized (this.lock/* olf */) {
+        if (olf.RAFClosed) {
+          return;
+        }
+        this.bbArray[0] = b1;
+        this.bbArray[1] = b2;
+        b1.flip();
+        long flushed = olf.channel.write(this.bbArray);
+        this.bbArray[0] = null;
+        this.bbArray[1] = null;
+        // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
+        olf.bytesFlushed += flushed;
+        b1.clear();
+      }
+    } catch (ClosedChannelException ignore) {
+      // It is possible for a channel to be closed when our code does not
+      // explicitly call channel.close (when we will set RAFclosed).
+      // This can happen when a thread is doing an io op and is interrupted.
+      // That thread will see ClosedByInterruptException but it will also
+      // close the channel and then we will see ClosedChannelException.
+    }
+  }
+  
   public final void flushAll() {
     flushAll(false);
   }
@@ -6072,23 +6114,19 @@ public final class Oplog implements CompactableOplog {
                 }
                 boolean toCompact = getBytesAndBitsForCompaction(dr, de, wrapper);
                 if (toCompact) {
-                  byte[] valueBytes = wrapper.getBytes();
-                  int length = wrapper.getValidLength();
-                  byte userBits = wrapper.getBits();
-                  // TODO: compaction needs to get version?
                   if (oplogId != did.getOplogId()) {
-                    // @todo: Is this even possible? Perhaps I should just
-                    // assert here
+                    // @todo: Is this even possible? Perhaps I should just assert here
                     // skip this guy his oplogId changed
                     if (!wrapper.isReusable()) {
                       wrapper = new BytesAndBitsForCompactor();
+                    } else if (wrapper.getDataChunk() != null) {
+                      wrapper.setChunkData(null, (byte) 0);
                     }
                     continue;
                   }
                   // write it to the current oplog
-                  getOplogSet().getChild().copyForwardModifyForCompact(dr, de, valueBytes, length, userBits);
-                  // the did's oplogId will now be set to the current active
-                  // oplog
+                  getOplogSet().getChild().copyForwardModifyForCompact(dr, de, wrapper);
+                  // the did's oplogId will now be set to the current active oplog
                   didCompact = true;
                 }
               } // did
@@ -6464,8 +6502,7 @@ public final class Oplog implements CompactableOplog {
      */
     private int size;
     private boolean needsValue;
-    private byte[] value;
-    private int valueLength;
+    private ValueWrapper value;
     private int drIdLength; // 1..9
     private final byte[] drIdBytes = new byte[DiskInitFile.DR_ID_MAX_BYTES];
     private byte[] keyBytes;
@@ -6488,10 +6525,13 @@ public final class Oplog implements CompactableOplog {
 
     public String debugStr() {
       StringBuilder sb = new StringBuilder();
-      sb.append(" opcode=").append(this.opCode).append(" len=").append(this.valueLength).append(" vb=").append(
-          baToString(this.value, this.valueLength));
+      sb.append(" opcode=").append(this.opCode).append(" len=").append(this.value.getLength()).append(" vb=").append(this.value.getBytesAsString());
       return sb.toString();
     }
+    
+    private final void write(OplogFile olf, ValueWrapper vw) throws IOException {
+      vw.sendTo(olf.writeBuf, Oplog.this);
+    }
 
     private final void write(OplogFile olf, byte[] bytes, int byteLength) throws IOException {
       int offset = 0;
@@ -6573,9 +6613,8 @@ public final class Oplog implements CompactableOplog {
     public void initialize(Map<Long, AbstractDiskRegion> drMap, boolean gcRVV) throws IOException {
       this.opCode = OPLOG_RVV;
       byte[] rvvBytes = serializeRVVs(drMap, gcRVV);
-      this.value = rvvBytes;
+      this.value = new DiskEntry.Helper.ByteArrayValueWrapper(true, rvvBytes);
       // Size is opCode + length + end of record
-      this.valueLength = rvvBytes.length;
       this.size = 1 + rvvBytes.length + 1;
     }
 
@@ -6586,9 +6625,8 @@ public final class Oplog implements CompactableOplog {
       saveUserBits(notToUseUserBits, userBits);
 
       this.keyBytes = keyBytes;
-      this.value = valueBytes;
-      this.valueLength = this.value.length;
-      if (this.userBits == 1 && this.valueLength == 0) {
+      this.value = new DiskEntry.Helper.CompactorValueWrapper(valueBytes, valueBytes.length);
+      if (this.userBits == 1 && this.value.getLength() == 0) {
         throw new IllegalStateException("userBits==1 and valueLength is 0");
       }
 
@@ -6598,7 +6636,7 @@ public final class Oplog implements CompactableOplog {
       initVersionsBytes(tag);
 
       if (this.needsValue) {
-        this.size += 4 + this.valueLength;
+        this.size += 4 + this.value.getLength();
       }
       this.deltaIdBytesLength = 0;
       {
@@ -6639,15 +6677,14 @@ public final class Oplog implements CompactableOplog {
       }
     }
 
-    public void initialize(byte opCode, DiskRegionView dr, DiskEntry entry, byte[] value, int valueLength, byte userBits,
+    public void initialize(byte opCode, DiskRegionView dr, DiskEntry entry, ValueWrapper value, byte userBits,
         boolean notToUseUserBits) throws IOException {
       this.opCode = opCode;
       this.size = 1;// for the opcode
       saveUserBits(notToUseUserBits, userBits);
 
       this.value = value;
-      this.valueLength = valueLength;
-      if (this.userBits == 1 && this.valueLength == 0) {
+      if (this.userBits == 1 && this.value.getLength() == 0) {
         throw new IllegalStateException("userBits==1 and valueLength is 0");
       }
 
@@ -6682,7 +6719,7 @@ public final class Oplog implements CompactableOplog {
         saveDrId(drId);
       }
       if (this.needsValue) {
-        this.size += 4 + this.valueLength;
+        this.size += 4 + this.value.getLength();
       }
       this.deltaIdBytesLength = 0;
       if (this.opCode != OPLOG_NEW_ENTRY_0ID) {
@@ -6791,8 +6828,8 @@ public final class Oplog implements CompactableOplog {
         write(olf, this.magic.getBytes(), OPLOG_TYPE.getLen());
         bytesWritten += OPLOG_TYPE.getLen();
       } else if (this.opCode == OPLOG_RVV) {
-        write(olf, this.value, this.valueLength);
-        bytesWritten += this.valueLength;
+        write(olf, this.value);
+        bytesWritten += this.value.getLength();
       } else if (this.opCode == OPLOG_GEMFIRE_VERSION) {
         writeOrdinal(olf, this.gfversion);
         bytesWritten++;
@@ -6823,11 +6860,12 @@ public final class Oplog implements CompactableOplog {
           bytesWritten += this.versionsBytes.length;
         }
         if (this.needsValue) {
-          writeInt(olf, this.valueLength);
+          int len = this.value.getLength();
+          writeInt(olf, len);
           bytesWritten += 4;
-          if (this.valueLength > 0) {
-            write(olf, this.value, this.valueLength);
-            bytesWritten += this.valueLength;
+          if (len > 0) {
+            write(olf, this.value);
+            bytesWritten += len;
           }
         }
         if (this.keyBytes != null) {
@@ -7044,10 +7082,19 @@ public final class Oplog implements CompactableOplog {
     }
 
     @Override
-    public Object _getValueUse(RegionEntryContext context, boolean decompress) {
+    public void handleValueOverflow(RegionEntryContext context) {throw new IllegalStateException();}
+
+    @Override
+    public void afterValueOverflow(RegionEntryContext context) {throw new IllegalStateException();}
+    @Override
+    public Object prepareValueForCache(RegionEntryContext r, Object val, boolean isEntryUpdate) { throw new IllegalStateException("Should never be called");  }
+
+    @Override
+    public Object _getValueRetain(RegionEntryContext context, boolean decompress) {
       throw new IllegalStateException();
     }
 
+    @Override
     public void setValueWithContext(RegionEntryContext context, Object value) {
       throw new IllegalStateException();
     }
@@ -7233,6 +7280,11 @@ public final class Oplog implements CompactableOplog {
     }
 
     @Override
+    public Object getValueRetain(RegionEntryContext context) {
+      return null;
+    }
+
+    @Override
     public void setValue(RegionEntryContext context, Object value) throws RegionClearedException {
       // TODO Auto-generated method stub
     }
@@ -7327,7 +7379,19 @@ public final class Oplog implements CompactableOplog {
     public void setUpdateInProgress(boolean underUpdate) {
       // TODO Auto-generated method stub
     }
-
+    @Override
+    public boolean isMarkedForEviction() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+    @Override
+    public void setMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
+    @Override
+    public void clearMarkedForEviction() {
+      // TODO Auto-generated method stub
+    }
     @Override
     public boolean isInvalid() {
       // TODO Auto-generated method stub
@@ -7400,6 +7464,13 @@ public final class Oplog implements CompactableOplog {
     @Override
     public void resetRefCount(NewLRUClockHand lruList) {
     }
+
+    @Override
+    public Object prepareValueForCache(RegionEntryContext r, Object val,
+        EntryEventImpl event, boolean isEntryUpdate) {
+      throw new IllegalStateException("Should never be called");
+    }
+    
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OplogSet.java
index f8dfbe7..3a71b28 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OplogSet.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OplogSet.java
@@ -1,15 +1,15 @@
 package com.gemstone.gemfire.internal.cache;
 
-import java.util.concurrent.atomic.AtomicLong;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 
 public interface OplogSet {
   
   
-  public void create(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async);
+  public void create(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async);
   
-  public void modify(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async);
+  public void modify(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async);
 
   public CompactableOplog getChild(long oplogId);
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplog.java
index 057ccdb..63d9385 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplog.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplog.java
@@ -26,6 +26,8 @@ import com.gemstone.gemfire.cache.DiskAccessException;
 import com.gemstone.gemfire.cache.EntryDestroyedException;
 import com.gemstone.gemfire.distributed.OplogCancelledException;
 import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.Flushable;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.OplogCompactor;
 import com.gemstone.gemfire.internal.cache.Oplog.OplogDiskEntry;
 import com.gemstone.gemfire.internal.cache.persistence.BytesAndBits;
@@ -43,7 +45,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
  * 
  * @since prPersistSprint2
  */
-class OverflowOplog implements CompactableOplog {
+class OverflowOplog implements CompactableOplog, Flushable {
   private static final Logger logger = LogService.getLogger();
 
   /** Extension of the oplog file * */
@@ -56,6 +58,7 @@ class OverflowOplog implements CompactableOplog {
   private volatile boolean closed;
 
   private final OplogFile crf = new OplogFile();
+  private final ByteBuffer[] bbArray = new ByteBuffer[2];
 
   /** preallocated space available for writing to* */
   // volatile private long opLogSpace = 0L;
@@ -223,6 +226,7 @@ class OverflowOplog implements CompactableOplog {
     this.crf.f = f;
     this.crf.raf = new RandomAccessFile(f, "rw");
     this.crf.writeBuf = allocateWriteBuf(previous);
+    this.bbArray[0] = this.crf.writeBuf;
     preblow();
     logger.info(LocalizedMessage.create(LocalizedStrings.Oplog_CREATE_0_1_2,
         new Object[] {toString(), "crf", this.parent.getName()}));
@@ -246,6 +250,7 @@ class OverflowOplog implements CompactableOplog {
     synchronized (this.crf) {
       ByteBuffer result = this.crf.writeBuf;
       this.crf.writeBuf = null;
+      this.bbArray[0] = null;
       return result;
     }
   }
@@ -510,11 +515,10 @@ class OverflowOplog implements CompactableOplog {
   }
 
   private void initOpState(DiskEntry entry,
-                           byte[] value,
-                           int valueLength,
+                           ValueWrapper value,
                            byte userBits)
   {
-    this.opState.initialize(entry, value, valueLength, userBits);
+    this.opState.initialize(entry, value, userBits);
   }
 
   private void clearOpState() {
@@ -528,39 +532,18 @@ class OverflowOplog implements CompactableOplog {
     return this.opState.getSize();
   }
 
-  private byte calcUserBits(byte[] value,
-                            boolean isSerializedObject) {
-    byte userBits = 0x0;
-  
-    if (isSerializedObject) {
-      if (value == DiskEntry.INVALID_BYTES) {
-        // its the invalid token
-        userBits = EntryBits.setInvalid(userBits, true);
-      } else if (value == DiskEntry.LOCAL_INVALID_BYTES) {
-        // its the local-invalid token
-        userBits = EntryBits.setLocalInvalid(userBits, true);
-      } else if (value == DiskEntry.TOMBSTONE_BYTES) {
-        // its the tombstone token
-        userBits = EntryBits.setTombstone(userBits, true);
-      } else {
-        userBits = EntryBits.setSerialized(userBits, true);
-      }
-    }
-    return userBits;
+  private byte calcUserBits(ValueWrapper value) {
+    return value.getUserBits();
   }
   
   /**
    * Modifies a key/value pair from a region entry on disk. Updates all of the
    * necessary {@linkplain DiskStoreStats statistics} and invokes basicModify
-   * 
    * @param entry
    *          DiskEntry object representing the current Entry
-   * 
    * @param value
    *          byte array representing the value
-   * @param isSerializedObject
-   *          Do the bytes in <code>value</code> contain a serialized object
-   *          (or an actually <code>byte</code> array)?
+   * 
    * @throws DiskAccessException
    * @throws IllegalStateException
    */
@@ -570,12 +553,12 @@ class OverflowOplog implements CompactableOplog {
    * operations for different entries to proceed concurrently for asynch mode
    * @return true if modify was done; false if this file did not have room
    */
-  public final boolean modify(DiskRegion dr, DiskEntry entry, byte[] value,
-                              boolean isSerializedObject, boolean async)
+  public final boolean modify(DiskRegion dr, DiskEntry entry, ValueWrapper value,
+                              boolean async)
   {
     try {
-      byte userBits = calcUserBits(value, isSerializedObject);
-      return basicModify(entry, value, value.length, userBits, async);
+      byte userBits = calcUserBits(value);
+      return basicModify(entry, value, userBits, async);
     } catch (IOException ex) {
       throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0.toLocalizedString(this.diskFile.getPath()), ex, dr.getName());
     } catch (InterruptedException ie) {
@@ -584,9 +567,11 @@ class OverflowOplog implements CompactableOplog {
       throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0_DUE_TO_FAILURE_IN_ACQUIRING_READ_LOCK_FOR_ASYNCH_WRITING.toLocalizedString(this.diskFile.getPath()), ie, dr.getName());
     }
   }
+  
   public final boolean copyForwardForOverflowCompact(DiskEntry entry, byte[] value, int length, byte userBits) {
     try {
-      return basicModify(entry, value, length, userBits, true);
+      ValueWrapper vw = new DiskEntry.Helper.CompactorValueWrapper(value, length);
+      return basicModify(entry, vw, userBits, true);
     } catch (IOException ex) {
       throw new DiskAccessException(LocalizedStrings.Oplog_FAILED_WRITING_KEY_TO_0.toLocalizedString(this.diskFile.getPath()), ex, getParent().getName());
     } catch (InterruptedException ie) {
@@ -611,8 +596,7 @@ class OverflowOplog implements CompactableOplog {
    * @throws InterruptedException
    */
   private boolean basicModify(DiskEntry entry,
-                              byte[] value,
-                              int valueLength,
+                              ValueWrapper value,
                               byte userBits, boolean async)
     throws IOException, InterruptedException
   {
@@ -620,7 +604,7 @@ class OverflowOplog implements CompactableOplog {
     long startPosForSynchOp = -1L;
     OverflowOplog emptyOplog = null;
     synchronized (this.crf) {
-      initOpState(entry, value, valueLength, userBits);
+      initOpState(entry, value, userBits);
       int adjustment = getOpStateSize();
       assert adjustment > 0;
       int oldOplogId;
@@ -638,7 +622,7 @@ class OverflowOplog implements CompactableOplog {
           oldOplogId = (int)id.setOplogId(getOplogId());
           id.setOffsetInOplog(startPosForSynchOp);
           if (EntryBits.isNeedsValue(userBits)) {
-            id.setValueLength(valueLength);
+            id.setValueLength(value.getLength());
           } else {
             id.setValueLength(0);
           }
@@ -738,7 +722,8 @@ class OverflowOplog implements CompactableOplog {
 //   public final ByteBuffer getWriteBuf() {
 //     return this.crf.writeBuf;
 //   }
-  private final void flush() throws IOException {
+  @Override
+  public final void flush() throws IOException {
     final OplogFile olf = this.crf;
     synchronized (olf) {
       if (olf.RAFClosed) {
@@ -766,6 +751,32 @@ class OverflowOplog implements CompactableOplog {
     }
   }
 
+  @Override
+  public final void flush(ByteBuffer b1, ByteBuffer b2) throws IOException {
+    final OplogFile olf = this.crf;
+    synchronized (olf) {
+      if (olf.RAFClosed) {
+        return;
+      }
+      try {
+        assert b1 == olf.writeBuf;
+        b1.flip();
+        this.bbArray[1] = b2;
+        long flushed = olf.channel.write(this.bbArray);
+        this.bbArray[1] = null;
+        // update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
+        olf.bytesFlushed += flushed;
+        b1.clear();
+      } catch (ClosedChannelException ignore) {
+        // It is possible for a channel to be closed when our code does not
+        // explicitly call channel.close (when we will set RAFclosed).
+        // This can happen when a thread is doing an io op and is interrupted.
+        // That thread will see ClosedByInterruptException but it will also
+        // close the channel and then we will see ClosedChannelException.
+      }
+    }
+  }
+  
   public final void flushAll() {
     try {
       flush();
@@ -1223,8 +1234,7 @@ class OverflowOplog implements CompactableOplog {
      */
     private int size;
     private boolean needsValue;
-    private byte[] value;
-    private int valueLength;
+    private ValueWrapper value;
 
     public final int getSize() {
       return this.size;
@@ -1237,45 +1247,31 @@ class OverflowOplog implements CompactableOplog {
       this.value = null;
     }
 
-    private final void write(byte[] bytes, int byteLength) throws IOException {
-      int offset = 0;
-      final int maxOffset = byteLength;
-      ByteBuffer bb = getOLF().writeBuf;
-      while (offset < maxOffset) {
-        
-        int bytesThisTime = maxOffset - offset;
-        boolean needsFlush = false;
-        if (bytesThisTime > bb.remaining()) {
-          needsFlush = true;
-          bytesThisTime = bb.remaining();
-        }
-        bb.put(bytes, offset, bytesThisTime);
-        offset += bytesThisTime;
-        if (needsFlush) {
-          flush();
-        }
-      }
+    private final void write(ValueWrapper vw) throws IOException {
+      vw.sendTo(getOLF().writeBuf, OverflowOplog.this);
     }
+    
     public void initialize(DiskEntry entry,
-                           byte[] value,
-                           int valueLength,
+                           ValueWrapper value,
                            byte userBits)
     {
       this.userBits = userBits;
       this.value = value;
-      this.valueLength = valueLength;
 
       this.size = 0;
       this.needsValue = EntryBits.isNeedsValue(this.userBits);
       if (this.needsValue) {
-        this.size += this.valueLength;
+        this.size += this.value.getLength();
       }
     }
     public long write() throws IOException {
       long bytesWritten = 0;
-      if (this.needsValue && this.valueLength > 0) {
-        write(this.value, this.valueLength);
-        bytesWritten += this.valueLength;
+      if (this.needsValue) {
+        int valueLength = this.value.getLength();
+        if (valueLength > 0) {
+          write(this.value);
+          bytesWritten += valueLength;
+        }
       }
       return bytesWritten;
     }
@@ -1461,6 +1457,7 @@ class OverflowOplog implements CompactableOplog {
     long oplogOffset = did.getOffsetInOplog();
     boolean foundData = false;
     if (entry.isValueNull()) {
+      // TODO OFFHEAP: optimize BytesAndBitsForCompactor to be able to have off-heap value reference instead of copying
       // Asif: If the mode is synch it is guaranteed to be present in the disk
       foundData = basicGetForCompactor(oplogOffset, false,
                                        did.getValueLength(),

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplogSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplogSet.java
index 4be567f..62cfe82 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplogSet.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/OverflowOplogSet.java
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.DiskAccessException;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
@@ -49,20 +50,20 @@ public class OverflowOplogSet implements OplogSet {
   }
   
   @Override
-  public final void modify(LocalRegion lr, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async) {
+  public final void modify(LocalRegion lr, DiskEntry entry, ValueWrapper value,
+      boolean async) {
     DiskRegion dr = lr.getDiskRegion();
     synchronized (this.overflowMap) {
       if (this.lastOverflowWrite != null) {
-        if (this.lastOverflowWrite.modify(dr, entry, value, isSerializedObject, async)) {
+        if (this.lastOverflowWrite.modify(dr, entry, value, async)) {
           return;
         }
       }
       // Create a new one and put it on the front of the list.
-      OverflowOplog oo = createOverflowOplog(value.length);
+      OverflowOplog oo = createOverflowOplog(value.getLength());
       addOverflow(oo);
       this.lastOverflowWrite = oo;
-      boolean didIt = oo.modify(dr, entry, value, isSerializedObject, async);
+      boolean didIt = oo.modify(dr, entry, value, async);
       assert didIt;
     }
   }
@@ -226,9 +227,9 @@ public class OverflowOplogSet implements OplogSet {
   
   
   @Override
-  public void create(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async) {
-    modify(region, entry, value, isSerializedObject, async);
+  public void create(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async) {
+    modify(region, entry, value, async);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PRQueryProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PRQueryProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PRQueryProcessor.java
index 47053fb..2db55c3 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PRQueryProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PRQueryProcessor.java
@@ -34,13 +34,16 @@ import com.gemstone.gemfire.cache.query.QueryException;
 import com.gemstone.gemfire.cache.query.QueryInvocationTargetException;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.internal.CompiledSelect;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.ExecutionContext;
 import com.gemstone.gemfire.cache.query.internal.IndexTrackingQueryObserver;
+import com.gemstone.gemfire.cache.query.internal.NWayMergeResults;
 import com.gemstone.gemfire.cache.query.internal.QueryExecutionContext;
 import com.gemstone.gemfire.cache.query.internal.QueryMonitor;
 import com.gemstone.gemfire.cache.query.internal.QueryObserver;
 import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
+import com.gemstone.gemfire.cache.query.types.ObjectType;
 import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.DataSerializableFixedID;
 import com.gemstone.gemfire.internal.Version;
@@ -79,14 +82,15 @@ public class PRQueryProcessor
   private PartitionedRegion pr;
   private final DefaultQuery query;
   private final Object[] parameters;
-  private final List _bucketsToQuery;
+  private final List<Integer> _bucketsToQuery;
   private volatile int numBucketsProcessed = 0;
+  private volatile ObjectType resultType = null; 
  
   private boolean isIndexUsedForLocalQuery = false;
 //  private List _failedBuckets;
 
   public PRQueryProcessor(PartitionedRegionDataStore prDS,
-      DefaultQuery query, Object[] parameters, List buckets) {
+      DefaultQuery query, Object[] parameters, List<Integer> buckets) {
     Assert.assertTrue(!buckets.isEmpty(), "bucket list can not be empty. ");
     this._prds = prDS;
     this._bucketsToQuery = buckets;
@@ -117,10 +121,11 @@ public class PRQueryProcessor
   /**
    * Executes a pre-compiled query on a data store.
    * Adds result objects to resultQueue
+   * @return boolean true if the result is a struct type
    * @throws QueryException
    * @throws ForceReattemptException if query should be tried again
    */
-  public void executeQuery(Collection<Collection> resultCollector)
+  public boolean executeQuery(Collection<Collection> resultCollector)
     throws QueryException, InterruptedException, ForceReattemptException {   
     //Set indexInfoMap to this threads observer.
     //QueryObserver observer = QueryObserverHolder.getInstance();
@@ -132,7 +137,8 @@ public class PRQueryProcessor
       executeWithThreadPool(resultCollector);
     } else {
       executeSequentially(resultCollector, this._bucketsToQuery);
-    }    
+    }
+    return this.resultType.isStructType();
   }
   
   private void executeWithThreadPool(Collection<Collection> resultCollector)
@@ -189,6 +195,16 @@ public class PRQueryProcessor
             }
           }
         }
+        
+        CompiledSelect cs = this.query.getSimpleSelect();
+       
+        if(cs != null && (cs.isOrderBy() || cs.isGroupBy())) {      
+          ExecutionContext context = new QueryExecutionContext(this.parameters, pr.getCache());
+          int limit = this.query.getLimit(parameters);
+          Collection mergedResults =coalesceOrderedResults(resultCollector, context, cs, limit);
+          resultCollector.clear();
+          resultCollector.add(mergedResults);
+        }
       }
     }
     
@@ -244,6 +260,7 @@ public class PRQueryProcessor
         if (limit < 0 || (rq.size() - numBucketsProcessed) < limit) {
           results = (SelectResults) query.prExecuteOnBucket(params, pr,
               bukRegion);
+          this.resultType = results.getCollectionType().getElementType(); 
         } 
         
         if (!bukRegion.isBucketDestroyed()) {
@@ -318,11 +335,47 @@ public class PRQueryProcessor
     }*/
     
     ExecutionContext context = new QueryExecutionContext(this.parameters, this.pr.getCache(), this.query);
-    context.setBucketList(buckets);
-    context.setCqQueryContext(query.isCqQuery());
     
+    CompiledSelect cs = this.query.getSimpleSelect();
+    int limit = this.query.getLimit(parameters);
+    if(cs != null && cs.isOrderBy() ) {
+      for(Integer bucketID : this._bucketsToQuery) {
+        List<Integer> singleBucket = Collections.singletonList(bucketID);
+        context.setBucketList(singleBucket);
+        executeQueryOnBuckets(resultCollector, context);
+      }     
+      Collection mergedResults =coalesceOrderedResults(resultCollector, context, cs, limit);
+      resultCollector.clear();
+      resultCollector.add(mergedResults);
+      
+    }else {
+      context.setBucketList(buckets);        
+      executeQueryOnBuckets(resultCollector, context);
+    }
+  }
+  
+  private Collection coalesceOrderedResults(Collection<Collection> results, 
+      ExecutionContext context, CompiledSelect cs, int limit) {
+    List<Collection> sortedResults = new ArrayList<Collection>(results.size());
+    //TODO :Asif : Deal with UNDEFINED
+    for(Object o : results) {
+      if(o instanceof Collection) {
+        sortedResults.add((Collection)o);
+      }        
+    }
+   
+    NWayMergeResults mergedResults = new NWayMergeResults(sortedResults, cs.isDistinct(), limit, 
+        cs.getOrderByAttrs(), context,cs.getElementTypeForOrderByQueries());
+    return mergedResults;
+  
+  }
+
+  private void executeQueryOnBuckets(Collection<Collection> resultCollector,
+      ExecutionContext context) throws ForceReattemptException,
+      QueryInvocationTargetException, QueryException {
     // Check if QueryMonitor is enabled, if so add query to be monitored.
     QueryMonitor queryMonitor = null;
+    context.setCqQueryContext(query.isCqQuery());
     if (GemFireCacheImpl.getInstance() != null)
     {
       queryMonitor = GemFireCacheImpl.getInstance().getQueryMonitor();
@@ -335,12 +388,16 @@ public class PRQueryProcessor
       }
       
       Object results = query.executeUsingContext(context);
-      synchronized (resultCollector) {
-        if (results == QueryService.UNDEFINED) {
+      
+      synchronized (resultCollector) {        
+        //TODO:Asif: In what situation would the results object itself be undefined?
+        // The elements of the results can be undefined , but not the resultset itself
+        /*if (results == QueryService.UNDEFINED) {
           resultCollector.add(Collections.singleton(results));
-        } else {
+        } else {*/
+          this.resultType = ((SelectResults)results).getCollectionType().getElementType(); 
           resultCollector.add((SelectResults) results);
-        }
+        //}
       }
       isIndexUsedForLocalQuery =((QueryExecutionContext)context).isIndexUsed();
       
@@ -514,9 +571,11 @@ public class PRQueryProcessor
         }
         
         final Integer bId = Integer.valueOf(this._bucketId);
-        ArrayList bucketList = new ArrayList();
-        bucketList.add(this._bucketId);
-        executeSequentially(this.resultColl, bucketList);
+        List<Integer> bucketList = Collections.singletonList(bId);       
+        ExecutionContext context = new QueryExecutionContext(this.parameters, pr.getCache(), this.query);
+        context.setBucketList(bucketList);
+        executeQueryOnBuckets(this.resultColl, context);
+        //executeSequentially(this.resultColl, bucketList);
         // success
         //doBucketQuery(bId, this._prDs, this.query, this.parameters, this.resultColl);
       } catch (ForceReattemptException fre) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionAttributesImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionAttributesImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionAttributesImpl.java
index 95086e9..df5574b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionAttributesImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionAttributesImpl.java
@@ -19,23 +19,25 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.DataSerializable;
 import com.gemstone.gemfire.DataSerializer;
 import com.gemstone.gemfire.InternalGemFireError;
-import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.FixedPartitionResolver;
+import com.gemstone.gemfire.cache.FixedPartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.FixedPartitionAttributes;
-import com.gemstone.gemfire.internal.InternalDataSerializer;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.cache.PartitionResolver;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
-
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.offheap.OffHeapStorage;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
 /**
  * Internal implementation of PartitionAttributes. New attributes existing   
@@ -48,7 +50,10 @@ import com.gemstone.gemfire.cache.partition.PartitionListener;
 public class PartitionAttributesImpl implements PartitionAttributes,
       Cloneable, DataSerializable
 {
+  private static final Logger logger = LogService.getLogger();
   private static final long serialVersionUID = -7120239286748961954L;
+  
+  private static final int OFF_HEAP_LOCAL_MAX_MEMORY_PLACEHOLDER = 1;
 
   /** Partition resolver. */
   private transient PartitionResolver partitionResolver;
@@ -74,15 +79,27 @@ public class PartitionAttributesImpl implements PartitionAttributes,
    *  LOCAL_MAX_MEMORY_PROPERTY - deprecated, use setLocalMaxMemory
    */
   private Properties globalProperties = new Properties();
-    
-  // transient ExpirationAttributes entryTimeToLiveExpiration = ExpirationAttributes.DEFAULT;
-
-  // transient ExpirationAttributes entryIdleTimeoutExpiration = ExpirationAttributes.DEFAULT;
+  
+  /*
+   * This is used to artificially set the amount of available off-heap memory
+   * when no distributed system is available. This value works the same way as
+   * specifying off-heap as a GemFire property, so "100m" = 100 megabytes,
+   * "100g" = 100 gigabytes, etc.
+   */
+  private static String testAvailableOffHeapMemory = null;
 
   /** the amount of local memory to use, in megabytes */
   private int localMaxMemory = PartitionAttributesFactory.LOCAL_MAX_MEMORY_DEFAULT;
   private transient boolean hasLocalMaxMemory;
+  private transient boolean localMaxMemoryExists;
 
+  /** Used to determine how to calculate the default local max memory.
+   * This was made transient since we do not support p2p backwards compat changes to values stored in a region
+   * and our PR implementation stores this object in the internal PRRoot internal region.
+   */
+  private transient boolean offHeap = false;
+  private transient boolean hasOffHeap;
+  
   /** placeholder for javadoc for this variable */
   private int totalNumBuckets = PartitionAttributesFactory.GLOBAL_MAX_BUCKETS_DEFAULT;
   private transient boolean hasTotalNumBuckets;
@@ -135,8 +152,17 @@ public class PartitionAttributesImpl implements PartitionAttributes,
     this.localProperties.setProperty(PartitionAttributesFactory.LOCAL_MAX_MEMORY_PROPERTY,
                                      String.valueOf(this.localMaxMemory));
     this.hasLocalMaxMemory = true;
+    this.localMaxMemoryExists = true;
   }
     
+  public void setOffHeap(final boolean offHeap) {
+    this.offHeap = offHeap;
+    this.hasOffHeap = true;
+    if (this.offHeap && !this.hasLocalMaxMemory) {
+      this.localMaxMemory = computeOffHeapLocalMaxMemory();
+    }
+  }
+  
   public void setColocatedWith(String colocatedRegionFullPath) {
     this.colocatedRegionName = colocatedRegionFullPath;
     this.hasColocatedRegionName = true;
@@ -213,7 +239,55 @@ public class PartitionAttributesImpl implements PartitionAttributes,
     return this.totalMaxMemory;
   }
     
+  public boolean getOffHeap() {
+    return this.offHeap;
+  }
+  
+  /**
+   * Returns localMaxMemory that must not be a temporary placeholder for
+   * offHeapLocalMaxMemory if off-heap. This must return the true final value
+   * of localMaxMemory which requires the DistributedSystem to be created if
+   * off-heap. See bug #52003.
+   * 
+   * @throws IllegalStateException if off-heap and the actual value is not yet known (because the DistributedSystem has not yet been created)
+   * @see #getLocalMaxMemoryForValidation()
+   */
   public int getLocalMaxMemory() {
+    if (this.offHeap && !this.localMaxMemoryExists) {
+      int value = computeOffHeapLocalMaxMemory();
+      if (this.localMaxMemoryExists) { // real value now exists so set it and return
+        this.localMaxMemory = value;
+      }
+    }
+    checkLocalMaxMemoryExists();
+    return this.localMaxMemory;
+  }
+  /**
+   * @throws IllegalStateException if off-heap and the actual value is not yet known (because the DistributedSystem has not yet been created)
+   */
+  private void checkLocalMaxMemoryExists() {
+    if (this.offHeap && !this.localMaxMemoryExists) { // real value does NOT yet exist so throw IllegalStateException
+      throw new IllegalStateException("Attempting to use localMaxMemory for off-heap but value is not yet known (default value is equal to off-heap-memory-size)");
+    }
+  }
+  
+  /**
+   * Returns localMaxMemory for validation of attributes before Region is 
+   * created (possibly before DistributedSystem is created). Returned value may 
+   * be the temporary placeholder representing offHeapLocalMaxMemory which 
+   * cannot be calculated until the DistributedSystem is created. See bug 
+   * #52003.
+   * 
+   * @see #OFF_HEAP_LOCAL_MAX_MEMORY_PLACEHOLDER 
+   * @see #getLocalMaxMemory()
+   */
+  public int getLocalMaxMemoryForValidation() {
+    if (this.offHeap && !this.hasLocalMaxMemory && !this.localMaxMemoryExists) {
+      int value = computeOffHeapLocalMaxMemory();
+      if (this.localMaxMemoryExists) { // real value now exists so set it and return
+        this.localMaxMemory = value;
+      }
+    }
     return this.localMaxMemory;
   }
     
@@ -284,22 +358,22 @@ public class PartitionAttributesImpl implements PartitionAttributes,
   }
 
   @Override
-    public String toString()
-    {
-      StringBuffer s = new StringBuffer();
-      return s.append("PartitionAttributes@")
-        .append(System.identityHashCode(this))
-        .append("[redundantCopies=").append(getRedundantCopies())
-        .append(";localMaxMemory=").append(this.localMaxMemory)
-        .append(";totalMaxMemory=").append(this.totalMaxMemory)
-        .append(";totalNumBuckets=").append(this.totalNumBuckets)
-        .append(";partitionResolver=").append(this.partitionResolver)
-        .append(";colocatedWith=").append(this.colocatedRegionName)
-        .append(";recoveryDelay=").append(this.recoveryDelay)
-        .append(";startupRecoveryDelay=").append(this.startupRecoveryDelay)
-        .append(";FixedPartitionAttributes=").append(this.fixedPAttrs)
-        .append(";partitionListeners=").append(this.partitionListeners)
-        .append("]") .toString();
+  public String toString()
+  {
+    StringBuffer s = new StringBuffer();
+    return s.append("PartitionAttributes@")
+      .append(System.identityHashCode(this))
+      .append("[redundantCopies=").append(getRedundantCopies())
+      .append(";localMaxMemory=").append(getLocalMaxMemory())
+      .append(";totalMaxMemory=").append(this.totalMaxMemory)
+      .append(";totalNumBuckets=").append(this.totalNumBuckets)
+      .append(";partitionResolver=").append(this.partitionResolver)
+      .append(";colocatedWith=").append(this.colocatedRegionName)
+      .append(";recoveryDelay=").append(this.recoveryDelay)
+      .append(";startupRecoveryDelay=").append(this.startupRecoveryDelay)
+      .append(";FixedPartitionAttributes=").append(this.fixedPAttrs)
+      .append(";partitionListeners=").append(this.partitionListeners)
+      .append("]") .toString();
   }
 
   public String getStringForSQLF() {
@@ -312,31 +386,35 @@ public class PartitionAttributesImpl implements PartitionAttributes,
         ",startupRecoveryDelay=").append(this.startupRecoveryDelay).toString();
   }
 
-    public void toData(DataOutput out) throws IOException {
-      out.writeInt(this.redundancy);
-      out.writeLong(this.totalMaxMemory);
-      out.writeInt(this.localMaxMemory);
-      out.writeInt(this.totalNumBuckets);
-      DataSerializer.writeString(this.colocatedRegionName, out);
-      DataSerializer.writeObject(this.localProperties, out);
-      DataSerializer.writeObject(this.globalProperties, out);
-      out.writeLong(this.recoveryDelay);
-      out.writeLong(this.startupRecoveryDelay);
-      DataSerializer.writeObject(this.fixedPAttrs, out);
-    }
-    public void fromData(DataInput in) throws IOException,
-        ClassNotFoundException {
-      this.redundancy = in.readInt();
-      this.totalMaxMemory = in.readLong();
-      this.localMaxMemory = in.readInt();
-      this.totalNumBuckets = in.readInt();
-      this.colocatedRegionName = DataSerializer.readString(in);
-      this.localProperties = (Properties)DataSerializer.readObject(in);
-      this.globalProperties = (Properties)DataSerializer.readObject(in);
-      this.recoveryDelay = in.readLong();
-      this.startupRecoveryDelay = in.readLong();
-      this.fixedPAttrs = DataSerializer.readObject(in);
-    }
+  /**
+   * @throws IllegalStateException if off-heap and the actual value is not yet known (because the DistributedSystem has not yet been created)
+   */
+  public void toData(DataOutput out) throws IOException {
+    checkLocalMaxMemoryExists();
+    out.writeInt(this.redundancy);
+    out.writeLong(this.totalMaxMemory);
+    out.writeInt(getLocalMaxMemory()); // call the gettor to force it to be computed in the offheap case
+    out.writeInt(this.totalNumBuckets);
+    DataSerializer.writeString(this.colocatedRegionName, out);
+    DataSerializer.writeObject(this.localProperties, out);
+    DataSerializer.writeObject(this.globalProperties, out);
+    out.writeLong(this.recoveryDelay);
+    out.writeLong(this.startupRecoveryDelay);
+    DataSerializer.writeObject(this.fixedPAttrs, out);
+  }
+  
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.redundancy = in.readInt();
+    this.totalMaxMemory = in.readLong();
+    this.localMaxMemory = in.readInt();
+    this.totalNumBuckets = in.readInt();
+    this.colocatedRegionName = DataSerializer.readString(in);
+    this.localProperties = (Properties)DataSerializer.readObject(in);
+    this.globalProperties = (Properties)DataSerializer.readObject(in);
+    this.recoveryDelay = in.readLong();
+    this.startupRecoveryDelay = in.readLong();
+    this.fixedPAttrs = DataSerializer.readObject(in);
+  }
     
   public static PartitionAttributesImpl createFromData(DataInput in)
     throws IOException, ClassNotFoundException {
@@ -351,37 +429,37 @@ public class PartitionAttributesImpl implements PartitionAttributes,
   }
 
   @Override
-    public boolean equals(final Object obj) {
-      if (this == obj) { 
-        return true;
-      }
-      
-    if (! (obj instanceof PartitionAttributesImpl)) {
-      return false;
+  public boolean equals(final Object obj) {
+    if (this == obj) { 
+      return true;
     }
-      
-    PartitionAttributesImpl other = (PartitionAttributesImpl) obj;
-      
-      if (this.redundancy != other.getRedundantCopies()
-          || this.localMaxMemory != other.getLocalMaxMemory()
-          || this.totalNumBuckets != other.getTotalNumBuckets()
-          || this.totalMaxMemory != other.getTotalMaxMemory()
-          || this.startupRecoveryDelay != other.getStartupRecoveryDelay()
-          || this.recoveryDelay != other.getRecoveryDelay()
+    
+  if (! (obj instanceof PartitionAttributesImpl)) {
+    return false;
+  }
+    
+  PartitionAttributesImpl other = (PartitionAttributesImpl) obj;
+    
+    if (this.redundancy != other.getRedundantCopies()
+        || getLocalMaxMemory() != other.getLocalMaxMemory()
+        || this.offHeap != other.getOffHeap()
+        || this.totalNumBuckets != other.getTotalNumBuckets()
+        || this.totalMaxMemory != other.getTotalMaxMemory()
+        || this.startupRecoveryDelay != other.getStartupRecoveryDelay()
+        || this.recoveryDelay != other.getRecoveryDelay()
 //          || ! this.localProperties.equals(other.getLocalProperties())
 //          || ! this.globalProperties.equals(other.getGlobalProperties())
-          || ((this.partitionResolver == null) != (other.getPartitionResolver() == null))
-          || (this.partitionResolver != null && !this.partitionResolver
-            .equals(other.getPartitionResolver()))
-          || ((this.colocatedRegionName == null) != (other.getColocatedWith() == null))
-          || (this.colocatedRegionName != null && !this.colocatedRegionName
-            .equals(other.getColocatedWith()))
-          ||((this.fixedPAttrs == null) != (other.getFixedPartitionAttributes()== null))
-          ||(this.fixedPAttrs != null && !this.fixedPAttrs.equals(other.getFixedPartitionAttributes()))
-          ) {
-        //throw new RuntimeException("this="+this.toString() + "   other=" + other.toString());
-        return false;
-        
+        || ((this.partitionResolver == null) != (other.getPartitionResolver() == null))
+        || (this.partitionResolver != null && !this.partitionResolver
+          .equals(other.getPartitionResolver()))
+        || ((this.colocatedRegionName == null) != (other.getColocatedWith() == null))
+        || (this.colocatedRegionName != null && !this.colocatedRegionName
+          .equals(other.getColocatedWith()))
+        ||((this.fixedPAttrs == null) != (other.getFixedPartitionAttributes()== null))
+        ||(this.fixedPAttrs != null && !this.fixedPAttrs.equals(other.getFixedPartitionAttributes()))
+        ) {
+      //throw new RuntimeException("this="+this.toString() + "   other=" + other.toString());
+      return false;
     }
 
     PartitionListener[] otherPListeners = other.getPartitionListeners();
@@ -645,6 +723,9 @@ public class PartitionAttributesImpl implements PartitionAttributes,
     if (pa.hasLocalMaxMemory) {
       setLocalMaxMemory(pa.getLocalMaxMemory());
     }
+    if (pa.hasOffHeap) {
+      setOffHeap(pa.getOffHeap());
+    }
     if (pa.hasTotalMaxMemory) {
       setTotalMaxMemory(pa.getTotalMaxMemory());
     }
@@ -670,20 +751,69 @@ public class PartitionAttributesImpl implements PartitionAttributes,
       this.addPartitionListeners(pa.partitionListeners);
     }
   }
-
   
-    @SuppressWarnings("unchecked")
-    public void setAll(@SuppressWarnings("rawtypes") PartitionAttributes pa) {
-      setRedundantCopies(pa.getRedundantCopies());
-      setLocalProperties(pa.getLocalProperties());
-      setGlobalProperties(pa.getGlobalProperties());
-      setLocalMaxMemory(pa.getLocalMaxMemory());
-      setTotalMaxMemory(pa.getTotalMaxMemory());
-      setTotalNumBuckets(pa.getTotalNumBuckets());
-      setPartitionResolver(pa.getPartitionResolver());
-      setColocatedWith(pa.getColocatedWith());
-      setRecoveryDelay(pa.getRecoveryDelay());
-      setStartupRecoveryDelay(pa.getStartupRecoveryDelay());
-      addFixedPartitionAttributes(pa.getFixedPartitionAttributes());
+  @SuppressWarnings("unchecked")
+  public void setAll(@SuppressWarnings("rawtypes")
+  PartitionAttributes pa) {
+    setRedundantCopies(pa.getRedundantCopies());
+    setLocalProperties(pa.getLocalProperties());
+    setGlobalProperties(pa.getGlobalProperties());
+    setLocalMaxMemory(pa.getLocalMaxMemory());
+    setTotalMaxMemory(pa.getTotalMaxMemory());
+    setTotalNumBuckets(pa.getTotalNumBuckets());
+    setPartitionResolver(pa.getPartitionResolver());
+    setColocatedWith(pa.getColocatedWith());
+    setRecoveryDelay(pa.getRecoveryDelay());
+    setStartupRecoveryDelay(pa.getStartupRecoveryDelay());
+    setOffHeap(((PartitionAttributesImpl) pa).getOffHeap());
+    addFixedPartitionAttributes(pa.getFixedPartitionAttributes());
+  }
+  
+  /**
+   * Only used for testing. Sets the amount of available off-heap memory when no
+   * distributed system is available. This method must be called before any
+   * instances of PartitionAttributesImpl are created. Specify the value the
+   * same way the off-heap memory property is specified. So, "100m" = 100
+   * megabytes, etc.
+   * 
+   * @param newTestAvailableOffHeapMemory The new test value for available
+   * off-heap memory.
+   */
+  public static void setTestAvailableOffHeapMemory(final String newTestAvailableOffHeapMemory) {
+    testAvailableOffHeapMemory = newTestAvailableOffHeapMemory;
+  }
+  
+  /**
+   * By default the partition can use up to 100% of the allocated off-heap
+   * memory.
+   */
+  private int computeOffHeapLocalMaxMemory() {
+    
+    long availableOffHeapMemoryInMB = 0;
+    if (testAvailableOffHeapMemory != null) {
+      availableOffHeapMemoryInMB = OffHeapStorage.parseOffHeapMemorySize(testAvailableOffHeapMemory) / (1024 * 1024);
+    } else if (InternalDistributedSystem.getAnyInstance() == null) {
+      this.localMaxMemoryExists = false;
+      return OFF_HEAP_LOCAL_MAX_MEMORY_PLACEHOLDER; // fix 52033: return non-negative, non-zero temporary placeholder for offHeapLocalMaxMemory
+    } else {
+      String offHeapSizeConfigValue = InternalDistributedSystem.getAnyInstance().getOriginalConfig().getOffHeapMemorySize();
+      availableOffHeapMemoryInMB = OffHeapStorage.parseOffHeapMemorySize(offHeapSizeConfigValue) / (1024 * 1024);
     }
+    
+    if (availableOffHeapMemoryInMB > Integer.MAX_VALUE) {
+      logger.warn(LocalizedMessage.create(LocalizedStrings.PartitionAttributesImpl_REDUCED_LOCAL_MAX_MEMORY_FOR_PARTITION_ATTRIBUTES_WHEN_SETTING_FROM_AVAILABLE_OFF_HEAP_MEMORY_SIZE));
+      return Integer.MAX_VALUE;
+    }
+    
+    this.localMaxMemoryExists = true;
+    return (int) availableOffHeapMemoryInMB;
+  }
+  
+  public int getLocalMaxMemoryDefault() {
+    if (!this.offHeap) {
+      return PartitionAttributesFactory.LOCAL_MAX_MEMORY_DEFAULT;
+    }
+    
+    return computeOffHeapLocalMaxMemory();
   }
+}


Mime
View raw message