geode-commits mailing list archives

From dschnei...@apache.org
Subject [28/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Thu, 09 Jul 2015 17:02:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index ed997a6..b67d0f1 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -8,6 +8,7 @@
 package com.gemstone.gemfire.internal.cache;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.logging.log4j.Logger;
 
@@ -18,6 +19,7 @@ import com.gemstone.gemfire.internal.Assert;
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
 import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
 import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
@@ -29,6 +31,15 @@ import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
 import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
+import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl.Chunk;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 
 /**
@@ -62,6 +73,20 @@ public interface DiskEntry extends RegionEntry {
    * @param value an entry value.
    */
   public void setValueWithContext(RegionEntryContext context,Object value);
+  
+  /**
+   * In some cases we need to do something just before we drop the value
+   * from a DiskEntry that is being moved (i.e. overflowed) to disk.
+   * @param context
+   */
+  public void handleValueOverflow(RegionEntryContext context);
+  
+  /**
+   * In some cases we need to do something just after we unset the value
+   * from a DiskEntry that has been moved (i.e. overflowed) to disk.
+   * @param context
+   */
+  public void afterValueOverflow(RegionEntryContext context);
 
   /**
    * Returns true if the DiskEntry value is equal to {@link Token#DESTROYED}, {@link Token#REMOVED_PHASE1}, or {@link Token#REMOVED_PHASE2}.
@@ -172,13 +197,26 @@ public interface DiskEntry extends RegionEntry {
      * @since 5.1
      */
     static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
-      Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context);
+      @Released Object v = getOffHeapValueOnDiskOrBuffer(entry, dr, context);
       if (v instanceof CachedDeserializable) {
+        if (v instanceof Chunk) {
+          @Released Chunk ohv = (Chunk) v;
+          try {
+            v = ohv.getDeserializedValue(null, null);
+            if (v == ohv) {
+              throw new IllegalStateException("sqlf tried to use getValueOnDiskOrBuffer");
+            }
+          } finally {
+            ohv.release(); // OFFHEAP the offheap ref is decremented here
+          }
+        } else {
           v = ((CachedDeserializable)v).getDeserializedValue(null, null);
+        }
       }
       return v;
     }
     
+    @Retained
     static Object getOffHeapValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, RegionEntryContext context) {
       DiskId did = entry.getDiskId();
       Object syncObj = did;
@@ -191,7 +229,7 @@ public interface DiskEntry extends RegionEntry {
       try {
         synchronized (syncObj) {
           if (did != null && did.isPendingAsync()) {
-            Object v = entry._getValueUse(context, true); // TODO:KIRK:OK Rusty had Object v = entry.getValueWithContext(context);
+            @Retained Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v = entry.getValueWithContext(context);
             if (Token.isRemovedFromDisk(v)) {
               v = null;
             }
@@ -277,7 +315,7 @@ public interface DiskEntry extends RegionEntry {
      */
     static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry,
                                DiskRegion dr, DM mgr, ByteArrayDataInput in, RegionEntryContext context) {
-      Object v = null;
+      @Retained @Released Object v = null;
       DiskId did;
       synchronized (de) {
         did = de.getDiskId();
@@ -293,7 +331,9 @@ public interface DiskEntry extends RegionEntry {
       synchronized (syncObj) {
         entry.setLastModified(mgr, de.getLastModified());
                               
-        v = de._getValueUse(context, true); // OFFHEAP copied to heap entry; todo allow entry to refer to offheap since it will be copied to network.
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(entry);
+        v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry to refer to offheap since it will be copied to network.
+        SimpleMemoryAllocatorImpl.setReferenceCountOwner(null);
         if (v == null) {
           if (did == null) {
             // fix for bug 41449
@@ -342,9 +382,23 @@ public interface DiskEntry extends RegionEntry {
         // fix for bug 31757
         return false;
       } else if (v instanceof CachedDeserializable) {
-          {
+        try {
+          if (v instanceof StoredObject && !((StoredObject) v).isSerialized()) {
+            entry.setSerialized(false);
+            entry.value = ((StoredObject) v).getDeserializedForReading();
+            
+            //For SQLFire we prefer eager deserialized
+//            if(v instanceof ByteSource) {
+//              entry.setEagerDeserialize();
+//            }
+          } else {
             // don't serialize here if it is not already serialized
+            
             Object tmp = ((CachedDeserializable)v).getValue();
+          //For SQLFire we prefer eager deserialized
+//            if(v instanceof ByteSource) {
+//              entry.setEagerDeserialize();
+//            }
             if (tmp instanceof byte[]) {
               byte[] bb = (byte[])tmp;
               entry.value = bb;
@@ -371,6 +425,14 @@ public interface DiskEntry extends RegionEntry {
               }
             }
           }
+        } finally {
+          // If v == entry.value then v is assumed to be an OffHeapByteSource
+          // and release() will be called on v after the bytes have been read from
+          // off-heap.
+          if (v != entry.value) {
+            OffHeapHelper.releaseWithNoTracking(v);
+          }
+        }
       }
       else if (v instanceof byte[]) {
         entry.value = v;
@@ -390,10 +452,23 @@ public interface DiskEntry extends RegionEntry {
         entry.setLocalInvalid();
       } else if (v == Token.TOMBSTONE) {
         entry.setTombstone();
-      } else {
+      }
+      else {
+        Object preparedValue = v;
+        if (preparedValue != null) {
+          preparedValue = AbstractRegionEntry.prepareValueForGII(preparedValue);
+          if (preparedValue == null) {
+            return false;
+          }
+        }
+      if (CachedDeserializableFactory.preferObject()) {
+        entry.value = preparedValue;
+        entry.setEagerDeserialize();
+      }
+      else {
         try {
           HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
-          BlobHelper.serializeTo(v, hdos);
+          BlobHelper.serializeTo(preparedValue, hdos);
           hdos.trim();
           entry.value = hdos;
           entry.setSerialized(true);
@@ -403,6 +478,7 @@ public interface DiskEntry extends RegionEntry {
           throw e2;
         }
       }
+      }
       return true;
     }
 
@@ -441,7 +517,8 @@ public interface DiskEntry extends RegionEntry {
           incrementBucketStats(r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength());
         }
         else {
-          entry.setValueWithContext(drv, AbstractRegionMap.prepareValueForCache((RegionEntryContext) r, re.getValue()));
+          entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r,
+              re.getValue(), false));
           drv.incNumEntriesInVM(1L);
           incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
         }
@@ -456,85 +533,335 @@ public interface DiskEntry extends RegionEntry {
       }
     }
     
-    private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async) throws RegionClearedException {
-      writeToDisk(entry, region, async, null);
+    private static final ValueWrapper INVALID_VW = new ByteArrayValueWrapper(true, INVALID_BYTES);
+    private static final ValueWrapper LOCAL_INVALID_VW = new ByteArrayValueWrapper(true, LOCAL_INVALID_BYTES);
+    private static final ValueWrapper TOMBSTONE_VW = new ByteArrayValueWrapper(true, TOMBSTONE_BYTES);
+    
+    public static interface ValueWrapper {
+      public boolean isSerialized();
+      public int getLength();
+      public byte getUserBits();
+      public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException;
+      public String getBytesAsString();
     }
+    public static interface Flushable {
+      public void flush() throws IOException;
+
+      public void flush(ByteBuffer bb, ByteBuffer chunkbb) throws IOException;
+    }
+    public static class ByteArrayValueWrapper implements ValueWrapper {
+      public final boolean isSerializedObject;
+      public final byte[] bytes;
+      
+      public ByteArrayValueWrapper(boolean isSerializedObject, byte[] bytes) {
+        this.isSerializedObject = isSerializedObject;
+        this.bytes = bytes;
+      }
+
+      @Override
+      public boolean isSerialized() {
+        return this.isSerializedObject;
+      }
+
+      @Override
+      public int getLength() {
+        return (this.bytes != null) ? this.bytes.length : 0;
+      }
+
+      private boolean isInvalidToken() {
+        return this == INVALID_VW;
+      }
+
+      private boolean isLocalInvalidToken() {
+        return this == LOCAL_INVALID_VW;
+      }
+
+      private boolean isTombstoneToken() {
+        return this == TOMBSTONE_VW;
+      }
+
+      @Override
+      public byte getUserBits() {
+        byte userBits = 0x0;
+        if (isSerialized()) {
+          if (isTombstoneToken()) {
+            userBits = EntryBits.setTombstone(userBits, true);
+          } else if (isInvalidToken()) {
+            userBits = EntryBits.setInvalid(userBits, true);
+          } else if (isLocalInvalidToken()) {
+            userBits = EntryBits.setLocalInvalid(userBits, true);
+          } else {
+            if (this.bytes == null) {
+              throw new IllegalStateException("userBits==1 and value is null");
+            } else if (this.bytes.length == 0) {
+              throw new IllegalStateException("userBits==1 and value is zero length");
+            }
+            userBits = EntryBits.setSerialized(userBits, true);
+          }
+        }
+        return userBits;
+      }
+
+      @Override
+      public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
+        int offset = 0;
+        final int maxOffset = getLength();
+        while (offset < maxOffset) {
+          int bytesThisTime = maxOffset - offset;
+          boolean needsFlush = false;
+          if (bytesThisTime > bb.remaining()) {
+            needsFlush = true;
+            bytesThisTime = bb.remaining();
+          }
+          bb.put(this.bytes, offset, bytesThisTime);
+          offset += bytesThisTime;
+          if (needsFlush) {
+            flushable.flush();
+          }
+        }
+      }
+
+      @Override
+      public String getBytesAsString() {
+        if (this.bytes == null) {
+          return "null";
+        }
+        StringBuffer sb = new StringBuffer();
+        int len = getLength();
+        for (int i = 0; i < len; i++) {
+          sb.append(this.bytes[i]).append(", ");
+        }
+        return sb.toString();
+      }
+    }
+    
     /**
-     * Writes the key/value object stored in the given entry to disk
-     * @throws RegionClearedException
-     * 
-     * @see DiskRegion#put
+     * This class is a bit of a hack used by the compactor.
+     * The compactor always copies to a byte[] so
+     * this class is just a simple wrapper.
+     * It is possible that the length of the byte array is greater
+     * than the actual length of the wrapped data.
+     * At the time we create this we are all done with isSerialized
+     * and userBits so those methods are not supported.
      */
-    private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, EntryEventImpl event) throws RegionClearedException {
-      DiskRegion dr = region.getDiskRegion();
-      byte[] bytes = null;
-      boolean isSerializedObject = true;
+    public static class CompactorValueWrapper extends ByteArrayValueWrapper {
+      private final int length;
+      
+      public CompactorValueWrapper(byte[] bytes, int length) {
+        super(false, bytes);
+        this.length = length;
+      }
+      
+      @Override
+      public boolean isSerialized() {
+        throw new UnsupportedOperationException();
+      }
 
-      if (event != null && event.getCachedSerializedNewValue() != null) {
-        bytes = event.getCachedSerializedNewValue();
-      } else {
-        // If event != null then we could get the new value from it.
-        // Getting it from the entry is expensive on a compressed region.
-        Object value;
-        if (event != null && !event.hasDelta() && event.getRawNewValue() != null) {
-          // We don't do this for the delta case because getRawNewValue returns delta
-          // and we want to write the entire new value to disk.
-          value = event.getRawNewValue();
+      @Override
+      public int getLength() {
+        return this.length;
+      }
+
+      @Override
+      public byte getUserBits() {
+        throw new UnsupportedOperationException();
+      }
+    }
+    
+    /**
+     * Note that the Chunk this ValueWrapper is created with
+     * is unretained so it must be used before the owner of
+     * the chunk releases it.
+     * Since the RegionEntry that has the value we are writing to
+     * disk has it retained we are ok as long as this ValueWrapper's
+     * life ends before the RegionEntry sync is released.
+     * Note that this class is only used with uncompressed chunks.
+     */
+    public static class ChunkValueWrapper implements ValueWrapper {
+      private final @Unretained Chunk chunk;
+      public ChunkValueWrapper(Chunk c) {
+        assert !c.isCompressed();
+        this.chunk = c;
+      }
+      @Override
+      public boolean isSerialized() {
+        return this.chunk.isSerialized();
+      }
+      @Override
+      public int getLength() {
+        return this.chunk.getDataSize();
+      }
+      @Override
+      public byte getUserBits() {
+        byte userBits = 0x0;
+        if (isSerialized()) {
+          userBits = EntryBits.setSerialized(userBits, true);
+        }
+        return userBits;
+      }
+      @Override
+      public void sendTo(ByteBuffer bb, Flushable flushable) throws IOException {
+        final int maxOffset = getLength();
+        if (maxOffset == 0) {
+          return;
+        }
+        if (maxOffset > bb.capacity()) {
+          ByteBuffer chunkbb = this.chunk.createDirectByteBuffer();
+          if (chunkbb != null) {
+            flushable.flush(bb, chunkbb);
+            return;
+          }
+        }
+        final long bbAddress = Chunk.getDirectByteBufferAddress(bb);
+        if (bbAddress != 0L) {
+          int bytesRemaining = maxOffset;
+          int availableSpace = bb.remaining();
+          long addrToWrite = bbAddress + bb.position();
+          long addrToRead = this.chunk.getAddressForReading(0, maxOffset);
+          if (bytesRemaining > availableSpace) {
+            do {
+              UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, availableSpace);
+              bb.position(bb.position()+availableSpace);
+              addrToRead += availableSpace;
+              bytesRemaining -= availableSpace;
+              flushable.flush();
+              addrToWrite = bbAddress + bb.position();
+              availableSpace = bb.remaining();
+            } while (bytesRemaining > availableSpace);
+          }
+          UnsafeMemoryChunk.copyMemory(addrToRead, addrToWrite, bytesRemaining);
+          bb.position(bb.position()+bytesRemaining);
         } else {
-          value = entry._getValueUse(region, true);
+          long addr = this.chunk.getAddressForReading(0, maxOffset);
+          final long endAddr = addr + maxOffset;
+          while (addr != endAddr) {
+            bb.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+            addr++;
+            if (!bb.hasRemaining()) {
+              flushable.flush();
+            }
+          }
         }
+      }
+      @Override
+      public String getBytesAsString() {
+        return this.chunk.getStringForm();
+      }
+    }
 
+    public static ValueWrapper createValueWrapper(Object value, EntryEventImpl event) {
       if (value == Token.INVALID) {
         // even though it is not serialized we say it is because
         // bytes will never be an empty array when it is serialized
         // so that gives us a way to specify the invalid value
         // given a byte array and a boolean flag.
-        bytes = INVALID_BYTES;
+        return INVALID_VW;
       }
       else if (value == Token.LOCAL_INVALID) {
         // even though it is not serialized we say it is because
         // bytes will never be an empty array when it is serialized
         // so that gives us a way to specify the local-invalid value
         // given a byte array and a boolean flag.
-        bytes = LOCAL_INVALID_BYTES;
+        return LOCAL_INVALID_VW;
       }
       else if (value == Token.TOMBSTONE) {
-        bytes = TOMBSTONE_BYTES;
-      }
-      else if (value instanceof byte[]) {
-        isSerializedObject = false;
-        bytes = (byte[])value;
-      }
-      else if (value instanceof CachedDeserializable) {
-        CachedDeserializable proxy = (CachedDeserializable)value;
-        bytes = proxy.getSerializedValue();
-        if (event != null) {
-          event.setCachedSerializedNewValue(bytes);
-        }
+        return TOMBSTONE_VW;
       }
       else {
-        Assert.assertTrue(!Token.isRemovedFromDisk(value));
-        if (event != null && event.getCachedSerializedNewValue() != null) {
-          bytes = event.getCachedSerializedNewValue();
-        } else {
-          bytes = EntryEventImpl.serialize(value);
-          if (bytes.length == 0) {
-            throw new IllegalStateException("serializing <" + value + "> produced empty byte array");
+        boolean isSerializedObject = true;
+        byte[] bytes;
+        if (value instanceof CachedDeserializable) {
+          CachedDeserializable proxy = (CachedDeserializable)value;
+          if (proxy instanceof Chunk) {
+            return new ChunkValueWrapper((Chunk) proxy);
           }
-          if (event != null) {
+          if (proxy instanceof StoredObject) {
+            StoredObject ohproxy = (StoredObject) proxy;
+            isSerializedObject = ohproxy.isSerialized();
+            if (isSerializedObject) {
+              bytes = ohproxy.getSerializedValue();
+            } else {
+              bytes = (byte[]) ohproxy.getDeserializedForReading();
+            }
+          } else {
+            bytes = proxy.getSerializedValue();
+          }
+          if (event != null && isSerializedObject) {
             event.setCachedSerializedNewValue(bytes);
           }
         }
+        else if (value instanceof byte[]) {
+          isSerializedObject = false;
+          bytes = (byte[])value;
+        }
+        else {
+          Assert.assertTrue(!Token.isRemovedFromDisk(value));
+          if (event != null && event.getCachedSerializedNewValue() != null) {
+            bytes = event.getCachedSerializedNewValue();
+          } else {
+            bytes = EntryEventImpl.serialize(value);
+            if (bytes.length == 0) {
+              throw new IllegalStateException("serializing <" + value + "> produced empty byte array");
+            }
+            if (event != null) {
+              event.setCachedSerializedNewValue(bytes);
+            }
+          }
+        }
+        return new ByteArrayValueWrapper(isSerializedObject, bytes);
+      }
+    }
+    public static ValueWrapper createValueWrapperFromEntry(DiskEntry entry, LocalRegion region, EntryEventImpl event) {
+      if (event != null) {
+        // For off-heap it should be faster to pass a reference to the
+        // StoredObject instead of using the cached byte[] (unless it is also compressed).
+        // Since NIO is used, if the chunk of memory is large we can write it
+        // to the file using the off-heap memory with no extra copying.
+        // So we give preference to getRawNewValue over getCachedSerializedNewValue
+        Object rawValue = null;
+        if (!event.hasDelta()) {
+          // We don't do this for the delta case because getRawNewValue returns delta
+          // and we want to write the entire new value to disk.
+          rawValue = event.getRawNewValue();
+          if (rawValue instanceof Chunk) {
+            return new ChunkValueWrapper((Chunk) rawValue);
+          }
+        }
+        if (event.getCachedSerializedNewValue() != null) {
+          return new ByteArrayValueWrapper(true, event.getCachedSerializedNewValue());
+        }
+        if (rawValue != null) {
+          return createValueWrapper(rawValue, event);
+        }
       }
+      // TODO OFFHEAP: No need to retain since we hold the sync on entry but we need a flavor of _getValue that will decompress
+      @Retained Object value = entry._getValueRetain(region, true);
+      try {
+        return createValueWrapper(value, event);
+      } finally {
+        OffHeapHelper.release(value);
       }
+    }
+    
+    private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async) throws RegionClearedException {
+      writeToDisk(entry, region, async, null);
+    }
 
-      {
-        DiskId did = entry.getDiskId();
-        // @todo does the following unmark need to be called when an async
-        // write is scheduled or is it ok for doAsyncFlush to do it?
-        did.unmarkForWriting();
-        dr.put(entry, region, bytes, isSerializedObject, async);
-      }
+    /**
+     * Writes the key/value object stored in the given entry to disk
+     * @throws RegionClearedException
+     * 
+     * @see DiskRegion#put
+     */
+    private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, EntryEventImpl event) throws RegionClearedException {
+      writeBytesToDisk(entry, region, async, createValueWrapperFromEntry(entry, region, event));
+    }
+    
+    private static void writeBytesToDisk(DiskEntry entry, LocalRegion region, boolean async, ValueWrapper vw) throws RegionClearedException {
+      // @todo does the following unmark need to be called when an async
+      // write is scheduled or is it ok for doAsyncFlush to do it?
+      entry.getDiskId().unmarkForWriting();
+      region.getDiskRegion().put(entry, region, vw, async);
     }
 
     public static void update(DiskEntry entry, LocalRegion region, Object newValue) throws RegionClearedException {
@@ -628,25 +955,53 @@ public interface DiskEntry extends RegionEntry {
           // Second, do the stats done for the current recovered value
           if (re.getRecoveredKeyId() < 0) {
             if (!entry.isValueNull()) {
-              entry.setValueWithContext(region, null); // fixes bug 41119
+              try {
+                entry.handleValueOverflow(region);
+                entry.setValueWithContext(region, null); // fixes bug 41119
+              }finally {
+                entry.afterValueOverflow(region);
+              }
+              
             }
             dr.incNumOverflowOnDisk(1L);
             incrementBucketStats(region, 0/*InVM*/, 1/*OnDisk*/,
                                  did.getValueLength());
           } else {
-            entry.setValueWithContext(region, AbstractRegionMap.prepareValueForCache(region, re.getValue()));
+            entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
             dr.incNumEntriesInVM(1L);
             incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
           }
         }
         else {
-          entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+          // The new value in the entry needs to be set after the disk write
+          // has succeeded. Otherwise, for GemFireXD, it is possible that another thread
+          // may pick up this transient value from the region entry (which for
+          // off-heap will eventually be released) as an index key,
+          // given that this operation is bound to fail in case of a
+          // disk access exception.
+          
+          //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
           if (dr.isBackup()) {
             dr.testIsRecoveredAndClear(did); // fixes bug 41409
             oldValueLength = getValueLength(did);
             if (dr.isSync()) {
-              writeToDisk(entry, region, false, event);
+              // In case of compression the value is set first
+              // because, at least for now, GemFireXD does not support compression;
+              // if and when it does, this needs to be taken care of or
+              // we risk Bug 48965
+              if (AbstractRegionEntry.isCompressible(dr, newValue)) {
+                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+                
+                // newValue is prepared and compressed. We can't write compressed values to disk.
+                writeToDisk(entry, region, false, event);
+              } else {
+                writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+              }
+              
             } else if (did.isPendingAsync() && !maintainRVV) {
+              entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+              
               // nothing needs to be done except
               // fixing up LRU stats
               // @todo fixup LRU stats if needed
@@ -665,14 +1020,20 @@ public interface DiskEntry extends RegionEntry {
               if(stamp != null) {
                 tag = stamp.asVersionTag();
               }
+              entry.setValueWithContext(region, newValue); 
             }
           } else if (did != null) {
+            entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+            
             // Mark the id as needing to be written
             // The disk remove that this section used to do caused bug 30961
             // @todo this seems wrong. How does leaving it on disk fix the bug?
             did.markForWriting();
             //did.setValueSerializedSize(0);
+          }else {
+            entry.setValueWithContext(region, newValue);
           }
+          
           if (Token.isRemovedFromDisk(oldValue)) {
             // Note we now initialize entries removed and then set their
             // value once we find no existing entry.
@@ -744,10 +1105,16 @@ public interface DiskEntry extends RegionEntry {
         did.setUserBits(newValue.getUserBits());
         did.setValueLength(newValue.getValueLength());
         if (newValue.getRecoveredKeyId() >= 0) {
-          entry.setValueWithContext(context, AbstractRegionMap.prepareValueForCache(drv, newValue.getValue()));
+          entry.setValueWithContext(context, entry.prepareValueForCache(drv, newValue.getValue(), 
+              false));
         } else {
           if (!oldValueWasNull) {
-            entry.setValueWithContext(context,null); // fixes bug 41119
+            try {
+              entry.handleValueOverflow(context);
+              entry.setValueWithContext(context,null); // fixes bug 41119
+            }finally {
+              entry.afterValueOverflow(context);
+            }
           }
         }
         if (entry instanceof LRUEntry) {
@@ -765,19 +1132,24 @@ public interface DiskEntry extends RegionEntry {
     }
 
     public static Object getValueInVMOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
-      Object result = getValueOffHeapOrDiskWithoutFaultIn(entry, region);
+      Object result = OffHeapHelper.copyAndReleaseIfNeeded(getValueOffHeapOrDiskWithoutFaultIn(entry, region));
       if (result instanceof CachedDeserializable) {
         result = ((CachedDeserializable)result).getDeserializedValue(null, null);
       }
+      if (result instanceof StoredObject) {
+        ((StoredObject) result).release();
+        throw new IllegalStateException("sqlf tried to use getValueInVMOrDiskWithoutFaultIn");
+      }
       return result;
     }
     
+    @Retained
     public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) {
-      Object v = entry._getValueUse(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
+      @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
       if (v == null || Token.isRemovedFromDisk(v)
           && !region.isIndexCreationThread()) {
         synchronized (entry) {
-          v = entry._getValueUse(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
+          v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
           if (v == null) {
             v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(),region);
           }
@@ -786,9 +1158,17 @@ public interface DiskEntry extends RegionEntry {
       if (Token.isRemovedFromDisk(v)) {
         // fix for bug 31800
         v = null;
+//      } else if (v instanceof ByteSource) {
+//        // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it
+//        Object deserVal = ((CachedDeserializable)v).getDeserializedForReading();
+//        if (deserVal != v) {
+//          OffHeapHelper.release(v);
+//          v = deserVal;
+//        }
       }
       return v;
     }
+
     /**
      * 
      * @param entry
@@ -796,13 +1176,24 @@ public interface DiskEntry extends RegionEntry {
      * @return Value
      * @throws DiskAccessException
      */
-
-    public static Object faultInValue(DiskEntry entry, LocalRegion region)
+    public static Object faultInValue(DiskEntry entry, LocalRegion region) {
+      return faultInValue(entry, region, false);
+    }
+    @Retained
+    public static Object faultInValueRetain(DiskEntry entry, LocalRegion region) {
+      return faultInValue(entry, region, true);
+    }
+    /**
+     * @param retainResult if true then the result may be a retained off-heap reference
+     */
+    @Retained
+    private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult)
     {
       DiskRegion dr = region.getDiskRegion();
-      Object v = entry._getValueUse(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
+      @Retained Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = entry.getValueWithContext(region);
       boolean lruFaultedIn = false;
       boolean done = false;
+      try {
       //Asif: If the entry is an instance of LRU then DiskRegion cannot be null.
       //Since SqlFabric is accessing this method directly & it passes the owning region,
       //if the region happens to be persistent PR type, the owning region passed is PR,
@@ -837,9 +1228,9 @@ public interface DiskEntry extends RegionEntry {
       if (!done
           && (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) {
         synchronized (entry) {
-          v = entry._getValueUse(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
+          v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = entry.getValueWithContext(region);
           if (v == null) {
-            v = readValueFromDisk(entry, region, -1, false, null);
+            v = readValueFromDisk(entry, region);
             if (entry instanceof LRUEntry) {
               if (v != null && !Token.isInvalid(v)) {
                 lruEntryFaultIn((LRUEntry) entry, region);
@@ -850,6 +1241,12 @@ public interface DiskEntry extends RegionEntry {
           }
         }
       }
+      } finally {
+        if (!retainResult) {
+          v = OffHeapHelper.copyAndReleaseIfNeeded(v);
+          // At this point v should be either a heap object
+        }
+      }
       if (Token.isRemoved(v)) {
         // fix for bug 31800
         v = null;
@@ -866,12 +1263,30 @@ public interface DiskEntry extends RegionEntry {
       boolean lruFaultedIn = false;
       synchronized (entry) {
         if (entry.isValueNull()) {
-          Object v = readValueFromDisk(entry, recoveryStore, oplogId, true, in); // OFFHEAP: Off heap value ok since only used for token check
-          if (entry instanceof LRUEntry) {
-            if (v != null && !Token.isInvalid(v)) {
-              lruEntryFaultIn((LRUEntry) entry, recoveryStore);
-              
-              lruFaultedIn = true;
+          DiskId did = entry.getDiskId();
+          if (did != null) {
+            Object value = null;
+            DiskRecoveryStore region = recoveryStore;
+            DiskRegionView dr = region.getDiskRegionView();
+            dr.acquireReadLock();
+            try {
+              synchronized (did) {
+                // don't read if the oplog has changed.
+                if (oplogId == did.getOplogId()) {
+                  value = getValueFromDisk(dr, did, in);
+                  if (value != null) {
+                    setValueOnFaultIn(value, did, entry, dr, region);
+                  }
+                }
+              }
+            } finally {
+              dr.releaseReadLock();
+            }
+            if (entry instanceof LRUEntry) {
+              if (value != null && !Token.isInvalid(value)) {
+                lruEntryFaultIn((LRUEntry) entry, recoveryStore);
+                lruFaultedIn = true;
+              }
             }
           }
         }
@@ -881,6 +1296,38 @@ public interface DiskEntry extends RegionEntry {
       }
     }
     
+    /**
+     *  Caller must have "did" synced.
+     */
+    private static Object getValueFromDisk(DiskRegionView dr, DiskId did, ByteArrayDataInput in) {
+      Object value;
+      if (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) {
+        // must have been destroyed
+        value = null;
+      } else {
+        if (did.isKeyIdNegative()) {
+          did.setKeyId(- did.getKeyId());
+        }
+        // if a bucket region then create a CachedDeserializable here instead of object
+        value = dr.getRaw(did); // fix bug 40192
+        if (value instanceof BytesAndBits) {
+          BytesAndBits bb = (BytesAndBits)value;
+          if (EntryBits.isInvalid(bb.getBits())) {
+            value = Token.INVALID;
+          } else if (EntryBits.isLocalInvalid(bb.getBits())) {
+            value = Token.LOCAL_INVALID;
+          } else if (EntryBits.isTombstone(bb.getBits())) {
+            value = Token.TOMBSTONE;
+          } else if (EntryBits.isSerialized(bb.getBits())) {
+            value = readSerializedValue(bb.getBytes(), bb.getVersion(), in, false);
+          } else {
+            value = readRawValue(bb.getBytes(), bb.getVersion(), in);
+          }
+        }
+      }
+      return value;
+    }
+    
     private static void lruUpdateCallback(DiskRecoveryStore recoveryStore) {
       /* 
        * Used conditional check to see if
@@ -920,10 +1367,11 @@ public interface DiskEntry extends RegionEntry {
      * Sets the value in the entry.
      * This is only called by the faultIn code once it has determined that
      * the value is no longer in memory.
+     * @return the result; it will only be off-heap if the value is a sqlf ByteSource, otherwise it will be on-heap.
+     * Caller must have "entry" synced.
      */
-    private static Object readValueFromDisk(DiskEntry entry,
-        DiskRecoveryStore region, long oplogId, boolean checkOplogId,
-        ByteArrayDataInput in) {
+    @Retained
+    private static Object readValueFromDisk(DiskEntry entry, DiskRecoveryStore region) {
 
       DiskRegionView dr = region.getDiskRegionView();
       DiskId did = entry.getDiskId();
@@ -933,55 +1381,47 @@ public interface DiskEntry extends RegionEntry {
       dr.acquireReadLock();
       try {
       synchronized (did) {
-        // long id = diskId.getKeyId();
-        if (dr.isBackup() && did.getKeyId() == DiskRegion.INVALID_ID) {
-          return null; // must have been destroyed
+        Object value = getValueFromDisk(dr, did, null);
+        if (value == null) return null;
+        @Unretained Object preparedValue = setValueOnFaultIn(value, did, entry, dr, region);
+        // For Sqlfire we want to return the offheap representation.
+        // So we need to retain it for the caller to release.
+        /*if (preparedValue instanceof ByteSource) {
+          // This is the only case in which we return a retained off-heap ref.
+          ((ByteSource)preparedValue).retain();
+          return preparedValue;
+        } else */{
+          return value;
         }
-        if(checkOplogId) {
-          // If we're only supposed to read from this specific oplog, don't
-          // read if the oplog has changed.
-          if(oplogId != did.getOplogId()) {
-            return null;
-          }
-        }
-        if (did.isKeyIdNegative()) {
-          did.setKeyId(- did.getKeyId());
-        }
-        // if a bucket region then create a CachedDeserializable here instead of object
-        Object value = dr.getRaw(did); // fix bug 40192
-        if (value instanceof BytesAndBits) {
-          BytesAndBits bb = (BytesAndBits)value;
-          if (EntryBits.isInvalid(bb.getBits())) {
-            value = Token.INVALID;
-          } else if (EntryBits.isLocalInvalid(bb.getBits())) {
-            value = Token.LOCAL_INVALID;
-          } else if (EntryBits.isTombstone(bb.getBits())) {
-            value = Token.TOMBSTONE;
-          } else if (EntryBits.isSerialized(bb.getBits())) {
-            value = readSerializedValue(bb.getBytes(), bb.getVersion(),
-                in, false);
-          } else {
-            value = readRawValue(bb.getBytes(), bb.getVersion(), in);
-          }
-        }
-        int bytesOnDisk = getValueLength(did);
-        Assert.assertTrue(value != null);
-        Object preparedValue = AbstractRegionMap.prepareValueForCache((RegionEntryContext) region, value);
-        region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue), bytesOnDisk);
-        //did.setValueSerializedSize(0);
-        // I think the following assertion is true but need to run
-        // a regression with it. Reenable this post 6.5
-        //Assert.assertTrue(entry._getValue() == null);
-        entry.setValueWithContext((RegionEntryContext) region, preparedValue);
-        dr.incNumEntriesInVM(1L);
-        dr.incNumOverflowOnDisk(-1L);
-        incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
-        return value; // OFFHEAP: off heap value return ok
       }
       } finally {
         dr.releaseReadLock();
       }
     }
+    
+    /**
+     * Caller must have "entry" and "did" synced and "dr" readLocked.
+     * @return the unretained result; it must be used by the caller before it releases the sync on "entry".
+     */
+    @Unretained
+    private static Object setValueOnFaultIn(Object value, DiskId did, DiskEntry entry, DiskRegionView dr, DiskRecoveryStore region) {
+//    dr.getOwner().getCache().getLogger().info("DEBUG: faulting in entry with key " + entry.getKey());
+      int bytesOnDisk = getValueLength(did);
+      // Retained by the prepareValueForCache call for the region entry.
+      // NOTE that we return this value unretained because the retain is owned by the region entry not the caller.
+      @Retained Object preparedValue = entry.prepareValueForCache((RegionEntryContext) region, value,
+          false);
+      region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue), bytesOnDisk);
+      //did.setValueSerializedSize(0);
+      // I think the following assertion is true but need to run
+      // a regression with it. Reenable this post 6.5
+      //Assert.assertTrue(entry._getValue() == null);
+      entry.setValueWithContext((RegionEntryContext) region, preparedValue);
+      dr.incNumEntriesInVM(1L);
+      dr.incNumOverflowOnDisk(-1L);
+      incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+      return preparedValue;
+    }
 
     static Object readSerializedValue(byte[] valueBytes, Version version,
         ByteArrayDataInput in, boolean forceDeserialize) {
@@ -1055,7 +1495,13 @@ public interface DiskEntry extends RegionEntry {
         ((LRUEntry)entry).setDelayedDiskId(region);
         did = entry.getDiskId();
       }
-
+      
+      // Notify the SQLFire IndexManager if present
+     /* final IndexUpdater indexUpdater = region.getIndexUpdater();
+      if(indexUpdater != null && dr.isSync()) {
+        indexUpdater.onOverflowToDisk(entry);
+      }*/
+      
       int change = 0;
       boolean scheduledAsyncHere = false;
       dr.acquireReadLock();
@@ -1091,7 +1537,12 @@ public interface DiskEntry extends RegionEntry {
         } else {
           region.updateSizeOnEvict(entry.getKey(), oldSize);
           //did.setValueSerializedSize(byteSizeOnDisk);
-          entry.setValueWithContext(region,null);
+          try {
+            entry.handleValueOverflow(region);
+            entry.setValueWithContext(region,null);
+          }finally {
+            entry.afterValueOverflow(region);
+          }
           movedValueToDisk = true;
           change = ((LRUClockNode)entry).updateEntrySize(ccHelper);
         }
@@ -1169,7 +1620,12 @@ public interface DiskEntry extends RegionEntry {
                 // onDisk was already inced so just do the valueLength here
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      did.getValueLength());
-                entry.setValueWithContext(region,null);
+                try {
+                  entry.handleValueOverflow(region);
+                  entry.setValueWithContext(region,null);
+                }finally {
+                  entry.afterValueOverflow(region);
+                }
               }
               
               //See if we the entry we wrote to disk has the same tag
@@ -1199,6 +1655,7 @@ public interface DiskEntry extends RegionEntry {
     }
     
     public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
+      if (region.isThisRegionBeingClosedOrDestroyed()) return;
       DiskRegion dr = region.getDiskRegion();
       if (!dr.isBackup()) {
         return;
@@ -1218,6 +1675,7 @@ public interface DiskEntry extends RegionEntry {
      * @since prPersistSprint1
      */
     public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag) {
+      if (region.isThisRegionBeingClosedOrDestroyed()) return;
       DiskRegion dr = region.getDiskRegion();
       dr.setClearCountReference();
       synchronized (entry) { // fixes 40116
@@ -1239,6 +1697,7 @@ public interface DiskEntry extends RegionEntry {
             boolean remove = false;
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
+                if (region.isThisRegionBeingClosedOrDestroyed()) return;
                 // onDisk was already deced so just do the valueLength here
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      -did.getValueLength());
@@ -1280,7 +1739,12 @@ public interface DiskEntry extends RegionEntry {
                 // onDisk was already inced so just do the valueLength here
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      did.getValueLength());
-                entry.setValueWithContext(region,null);
+                try {
+                 entry.handleValueOverflow(region);
+                 entry.setValueWithContext(region,null);
+                }finally {
+                  entry.afterValueOverflow(region);
+                }
               }
             } catch (RegionClearedException ignore) {
               // no need to do the op since it was clobbered by a region clear

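The core abstraction this patch introduces for DiskEntry is the ValueWrapper/Flushable pair: instead of handing the oplog a byte[] plus an isSerializedObject flag, callers hand it an object that knows how to stream its own bytes into a ByteBuffer, flushing whenever the buffer fills. The following self-contained sketch mirrors the ByteArrayValueWrapper.sendTo() loop from the diff above; SimpleFlushable and ValueWrapperSketch are illustrative stand-ins, not Geode classes, and the real Flushable.flush() takes no arguments.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for DiskEntry.Helper.Flushable: drains the buffer to some sink.
interface SimpleFlushable {
  void flush(ByteBuffer bb) throws IOException;
}

public class ValueWrapperSketch {

  // Mirrors ByteArrayValueWrapper.sendTo(): copy bytes into bb, flushing whenever it fills up.
  static void sendTo(byte[] bytes, ByteBuffer bb, SimpleFlushable flushable) throws IOException {
    int offset = 0;
    final int maxOffset = bytes.length;
    while (offset < maxOffset) {
      int bytesThisTime = maxOffset - offset;
      boolean needsFlush = false;
      if (bytesThisTime > bb.remaining()) {
        needsFlush = true;
        bytesThisTime = bb.remaining();
      }
      bb.put(bytes, offset, bytesThisTime);
      offset += bytesThisTime;
      if (needsFlush) {
        flushable.flush(bb);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] value = new byte[10_000];
    Arrays.fill(value, (byte) 7);
    ByteBuffer bb = ByteBuffer.allocate(4096);            // small write buffer standing in for the oplog's buffer
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    SimpleFlushable flushable = buffer -> {               // drain the filled buffer to the sink
      buffer.flip();
      byte[] chunk = new byte[buffer.remaining()];
      buffer.get(chunk);
      sink.write(chunk);
      buffer.clear();
    };
    sendTo(value, bb, flushable);
    flushable.flush(bb);                                  // drain whatever is left in the buffer
    System.out.println("bytes written: " + sink.size());  // 10000
  }
}
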
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskInitFile.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskInitFile.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskInitFile.java
index 7425eca..34eb946 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskInitFile.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskInitFile.java
@@ -380,11 +380,21 @@ public class DiskInitFile implements DiskInitFileInterpreter {
   public static final byte IFREC_CLEAR_REGION_WITH_RVV_ID = 83;
   
   /**
-   * Written to IF. Used to record regions config Byte Format: RegionId 1:
-   * lruAlgorithm 1: lruAction 4: lruLimit (int) // no need to ObjectSize during
-   * recovery since all data is in blob form 4: concurrencyLevel (int) 4:
-   * initialCapacity (int) 4: loadFactor (float) 1: statisticsEnabled (boolean)
-   * 1: isBucket (boolean) variable: compressorClassName 1: EndOfRecordMarker
+   * Written to IF. Used to record a region's config. Byte format:
+   * RegionId 
+   * 1: lruAlgorithm 
+   * 1: lruAction 
+   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 
+   * 4: concurrencyLevel (int) 
+   * 4: initialCapacity (int) 
+   * 4: loadFactor (float)
+   * 1: statisticsEnabled (boolean)
+   * 1: isBucket (boolean) 
+   * variable: partitionName (utf)
+   * 4: startingBucketId (int)
+   * variable: compressorClassName (utf)
+   * 1: versioned (boolean)
+   * 1: EndOfRecordMarker
    * 
    */
   public static final byte IFREC_REGION_CONFIG_ID_80 = 88;
@@ -400,6 +410,27 @@ public class DiskInitFile implements DiskInitFileInterpreter {
    */
   public static final byte OPLOG_MAGIC_SEQ_ID = 89;
   
+  /**
+   * Written to IF. Used to record a region's config. Byte format:
+   * RegionId 
+   * 1: lruAlgorithm 
+   * 1: lruAction 
+   * 4: lruLimit (int) // no need to ObjectSize during recovery since all data is in blob form 
+   * 4: concurrencyLevel (int) 
+   * 4: initialCapacity (int) 
+   * 4: loadFactor (float)
+   * 1: statisticsEnabled (boolean)
+   * 1: isBucket (boolean) 
+   * variable: partitionName (utf)
+   * 4: startingBucketId (int)
+   * variable: compressorClassName (utf)
+   * 1: versioned (boolean)
+   * 1: offHeap (boolean) added in 9.0
+   * 1: EndOfRecordMarker
+   * @since 9.0
+   */
+  public static final byte IFREC_REGION_CONFIG_ID_90 = 90;
+
   private final DiskStoreImpl parent;
   
   private final File ifFile;
@@ -632,7 +663,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
                               float loadFactor, boolean statisticsEnabled,
                               boolean isBucket, EnumSet<DiskRegionFlag> flags,
                               String partitionName, int startingBucketId,
-                              String compressorClassName) {
+                              String compressorClassName, boolean offHeap) {
     DiskRegionView dr = getDiskRegionById(drId);
     if (dr != null) {
       //We need to add the IS_WITH_VERSIONING to persistent regions
@@ -650,7 +681,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
     dr.setConfig(lruAlgorithm, lruAction, lruLimit,
                  concurrencyLevel, initialCapacity, loadFactor,
                  statisticsEnabled, isBucket, flags, 
-                 partitionName, startingBucketId, compressorClassName);
+                 partitionName, startingBucketId, compressorClassName, offHeap);
 
     // Just count this as a live record even though it is possible
     // that we have an extra one due to the config changing while
@@ -922,14 +953,14 @@ public class DiskInitFile implements DiskInitFileInterpreter {
                                      DiskExceptionHandler exceptionHandler,
                                      RegionAttributes ra, EnumSet<DiskRegionFlag> flags,
                                      String partitionName, int startingBucketId, 
-                                     Compressor compressor) {
+                                     Compressor compressor, boolean offHeap) {
     lock.lock(false);
     try {
       // need to call the constructor and addDiskRegion while synced
       DiskRegion result = new DiskRegion(dsi, name, isBucket, isPersistBackup,
           overflowEnabled, isSynchronous,
           stats, cancel, exceptionHandler, ra, flags, partitionName, startingBucketId,
-          compressor == null ? null : compressor.getClass().getName());
+          compressor == null ? null : compressor.getClass().getName(), offHeap);
       dsi.addDiskRegion(result);
       return result;
     } finally {
@@ -1750,8 +1781,9 @@ public class DiskInitFile implements DiskInitFileInterpreter {
   private void writeRegionConfig(DiskRegionView drv) {
     try {
       int len = estimateByteSize(drv.getPartitionName());
-      HeapDataOutputStream bb = new HeapDataOutputStream(1+DR_ID_MAX_BYTES+1+1+4+4+4+1+1+4+len+4, Version.CURRENT);
-      bb.write(IFREC_REGION_CONFIG_ID_80);
+      int comprLen = estimateByteSize(drv.getCompressorClassName());
+      HeapDataOutputStream bb = new HeapDataOutputStream(1+DR_ID_MAX_BYTES+1+1+4+4+4+1+1+4+len+4+1+1+1, Version.CURRENT);
+      bb.write(IFREC_REGION_CONFIG_ID_90);
       writeDiskRegionID(bb, drv.getId());
       bb.write(drv.getLruAlgorithm());
       bb.write(drv.getLruAction());
@@ -1766,6 +1798,8 @@ public class DiskInitFile implements DiskInitFileInterpreter {
       bb.writeInt(drv.getStartingBucketId());
       bb.writeUTF(drv.getCompressorClassName() == null ? "" : drv.getCompressorClassName());
       bb.writeBoolean(flags.contains(DiskRegionFlag.IS_WITH_VERSIONING));
+      // TODO the offheap flag needs to be in a new version
+      bb.writeBoolean(drv.getOffHeap());
       bb.write(END_OF_RECORD_ID);
       writeIFRecord(bb, false); // don't do stats for these small records
     } catch (IOException ex) {
@@ -2217,6 +2251,9 @@ public class DiskInitFile implements DiskInitFileInterpreter {
         this.ifRAF.close();
       } catch (IOException ignore) {
       }
+      for (DiskRegionView k: this.getKnown()) {
+        k.close();
+      }
       if (this.liveRegions == 0 && !parent.isValidating()) {
         basicDestroy();
       }
@@ -2628,6 +2665,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
                              String loadFactorOption,
                              String compressorClassNameOption,
                              String statisticsEnabledOption,
+                             String offHeapOption,
                              boolean printToConsole) {
     StringBuffer sb = new StringBuffer();
     ArrayList<PlaceHolderDiskRegion> buckets = new ArrayList<PlaceHolderDiskRegion>();
@@ -2646,7 +2684,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
       for (PlaceHolderDiskRegion dr: buckets) {
         String message = basicModifyRegion(printInfo, dr, lruOption, lruActionOption, lruLimitOption,
             concurrencyLevelOption, initialCapacityOption, loadFactorOption,
-            compressorClassNameOption, statisticsEnabledOption, printToConsole);
+            compressorClassNameOption, statisticsEnabledOption, offHeapOption, printToConsole);
         if (printInfo)
           sb.append(message);
         printInfo = false;
@@ -2665,13 +2703,14 @@ public class DiskInitFile implements DiskInitFileInterpreter {
                            String loadFactorOption,
                            String compressorClassNameOption,
                            String statisticsEnabledOption,
+                           String offHeapOption,
                            boolean printToConsole) {
     lock.lock();
     try {
       return basicModifyRegion(false, drv, lruOption, lruActionOption, lruLimitOption,
           concurrencyLevelOption, initialCapacityOption,
           loadFactorOption, compressorClassNameOption,
-          statisticsEnabledOption, printToConsole);
+          statisticsEnabledOption, offHeapOption, printToConsole);
     } finally {
       lock.unlock();
     }
@@ -2685,6 +2724,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
                                  String loadFactorOption,
                                  String compressorClassNameOption,
                                  String statisticsEnabledOption,
+                                 String offHeapOption,
                                  boolean printToConsole) {
     byte lruAlgorithm = drv.getLruAlgorithm();
     byte lruAction = drv.getLruAction();
@@ -2694,6 +2734,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
     float loadFactor = drv.getLoadFactor();
     String compressorClassName = drv.getCompressorClassName();
     boolean statisticsEnabled = drv.getStatisticsEnabled();
+    boolean offHeap = drv.getOffHeap();
     StringBuffer sb = new StringBuffer();
     final String lineSeparator = System.getProperty("line.separator");
     
@@ -2755,6 +2796,15 @@ public class DiskInitFile implements DiskInitFileInterpreter {
         }
       }
     }
+    if (offHeapOption != null) {
+      offHeap = Boolean.parseBoolean(offHeapOption);
+      if (!offHeap) {
+        // make sure it is "false"
+        if (!offHeapOption.equalsIgnoreCase("false")) {
+          throw new IllegalArgumentException("Expected offHeap to be \"true\" or \"false\"");
+        }
+      }
+    }
     
     sb.append("Before modification: ");
     sb.append(lineSeparator);
@@ -2764,7 +2814,7 @@ public class DiskInitFile implements DiskInitFileInterpreter {
     drv.setConfig(lruAlgorithm, lruAction, lruLimit,
                   concurrencyLevel, initialCapacity, loadFactor, statisticsEnabled,
                   drv.isBucket(), drv.getFlags(), drv.getPartitionName(), drv.getStartingBucketId(),
-                  compressorClassName);
+                  compressorClassName, offHeap);
 
     // Make sure the combined lru args can still produce a legal eviction attributes
     // before writing them to disk.

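The DiskInitFile change above adds a new record type, IFREC_REGION_CONFIG_ID_90, whose only difference from the _80 record is the extra offHeap boolean written just before the end-of-record marker. As a rough reading aid, here is a minimal sketch of that byte layout using plain java.io.DataOutputStream; the field order follows the javadoc in the diff, but the fixed-width drId, the END_OF_RECORD value, and the class itself are simplifications, not Geode's actual HeapDataOutputStream-based implementation.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RegionConfigRecordSketch {

  static final byte IFREC_REGION_CONFIG_ID_90 = 90;
  static final byte END_OF_RECORD = -1;   // stand-in; the real END_OF_RECORD_ID value differs

  // Follows the field order documented for IFREC_REGION_CONFIG_ID_90 in the diff above.
  static byte[] encode(long drId, byte lruAlgorithm, byte lruAction, int lruLimit,
                       int concurrencyLevel, int initialCapacity, float loadFactor,
                       boolean statisticsEnabled, boolean isBucket, String partitionName,
                       int startingBucketId, String compressorClassName,
                       boolean versioned, boolean offHeap) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    out.write(IFREC_REGION_CONFIG_ID_90);
    out.writeLong(drId);                  // simplified; the real record uses a variable-length drId
    out.write(lruAlgorithm);
    out.write(lruAction);
    out.writeInt(lruLimit);
    out.writeInt(concurrencyLevel);
    out.writeInt(initialCapacity);
    out.writeFloat(loadFactor);
    out.writeBoolean(statisticsEnabled);
    out.writeBoolean(isBucket);
    out.writeUTF(partitionName == null ? "" : partitionName);
    out.writeInt(startingBucketId);
    out.writeUTF(compressorClassName == null ? "" : compressorClassName);
    out.writeBoolean(versioned);
    out.writeBoolean(offHeap);            // the new field that motivates bumping the record id to 90
    out.write(END_OF_RECORD);
    out.flush();
    return baos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] record = encode(1L, (byte) 0, (byte) 0, 0, 16, 16, 0.75f,
        true, false, "", -1, "", true, true);
    System.out.println("record length: " + record.length);
  }
}
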
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskRegion.java
index 5546344..e8548ed 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskRegion.java
@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.compression.Compressor;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskInitFile.DiskRegionFlag;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
 import com.gemstone.gemfire.internal.cache.InitialImageOperation.GIIStatus;
@@ -80,7 +81,7 @@ public class DiskRegion extends AbstractDiskRegion {
                        DiskExceptionHandler exceptionHandler,
                        RegionAttributes ra, EnumSet<DiskRegionFlag> flags,
                        String partitionName, int startingBucketId, 
-                       String compressorClassName) {
+                       String compressorClassName, boolean offHeap) {
     super(ds, name);
     if(this.getPartitionName() != null){
       // I think this code is saying to prefer the recovered partitionName and startingBucketId.
@@ -174,7 +175,7 @@ public class DiskRegion extends AbstractDiskRegion {
                 ra.getLoadFactor(),
                 ra.getStatisticsEnabled(),
                 isBucket, flags, partitionName, startingBucketId,
-                compressorClassName);
+                compressorClassName, offHeap);
     }
     
     if(!isBucket) {
@@ -191,12 +192,12 @@ public class DiskRegion extends AbstractDiskRegion {
                            DiskExceptionHandler exceptionHandler,
                            RegionAttributes ra, EnumSet<DiskRegionFlag> flags,
                            String partitionName, int startingBucketId,
-                           Compressor compressor) {
+                           Compressor compressor, boolean offHeap) {
     return dsi.getDiskInitFile().createDiskRegion(dsi, name, isBucket, isPersistBackup,
                                                   overflowEnabled, isSynchronous,
                                                   stats, cancel, exceptionHandler, ra, flags,
                                                   partitionName, startingBucketId,
-                                                  compressor);
+                                                  compressor, offHeap);
   }
 
   public CancelCriterion getCancelCriterion() {
@@ -352,9 +353,6 @@ public class DiskRegion extends AbstractDiskRegion {
    *
    * @param entry
    *          The entry which is going to be written to disk
-   * @param isSerializedObject
-   *                Do the bytes in <code>value</code> contain a serialized
-   *                object (or an actually <code>byte</code> array)?
    * @throws RegionClearedException
    *                 If a clear operation completed before the put operation
    *                 completed successfully, resulting in the put operation to
@@ -362,10 +360,10 @@ public class DiskRegion extends AbstractDiskRegion {
    * @throws IllegalArgumentException
    *         If <code>id</code> is less than zero
    */
-  final void put(DiskEntry entry, LocalRegion region, byte[] value, boolean isSerializedObject, boolean async)
+  final void put(DiskEntry entry, LocalRegion region, ValueWrapper value, boolean async)
       throws  RegionClearedException
   {
-    getDiskStore().put(region, entry, value, isSerializedObject, async);
+    getDiskStore().put(region, entry, value, async);
   }
     
   /**
@@ -852,4 +850,9 @@ public class DiskRegion extends AbstractDiskRegion {
       releaseReadLock();
     }
   }
+
+  @Override
+  public void close() {
+    // nothing needed
+  }
 }

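The DiskRegion.put change above replaces the byte[] value plus isSerializedObject flag with a single ValueWrapper argument. As a hedged illustration of the idea (SimpleValueWrapper below is a hypothetical stand-in, not Geode's DiskEntry.Helper.ValueWrapper), the bytes and the flag describing how to interpret them travel together as one value:

// Illustrative only: not the commit's actual ValueWrapper.
final class SimpleValueWrapper {
  private final byte[] bytes;
  private final boolean serialized; // true if bytes hold a serialized object

  SimpleValueWrapper(byte[] bytes, boolean serialized) {
    this.bytes = bytes;
    this.serialized = serialized;
  }

  byte[] getBytes() {
    return bytes;
  }

  boolean isSerialized() {
    return serialized;
  }
}

Carrying one object keeps the two pieces from drifting apart across the DiskRegion.put -> DiskStoreImpl.put -> OplogSet.create/modify chain that this commit rewires.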
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
index c6b4b0a..61d7a16 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskStoreImpl.java
@@ -71,6 +71,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM
 import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.DiskEntry.Helper.ValueWrapper;
 import com.gemstone.gemfire.internal.cache.DiskEntry.RecoveredEntry;
 import com.gemstone.gemfire.internal.cache.ExportDiskRegion.ExportWriter;
 import com.gemstone.gemfire.internal.cache.lru.LRUAlgorithm;
@@ -100,6 +101,10 @@ import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.offheap.MemoryChunkWithRefCount;
+import com.gemstone.gemfire.internal.offheap.OffHeapHelper;
+import com.gemstone.gemfire.internal.offheap.annotations.Released;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
 import com.gemstone.gemfire.internal.util.BlobHelper;
 import com.gemstone.gemfire.pdx.internal.EnumInfo;
 import com.gemstone.gemfire.pdx.internal.PdxField;
@@ -681,17 +686,14 @@ public class DiskStoreImpl implements DiskStore {
    * 
    * @param entry
    *          The entry which is going to be written to disk
-   * @param isSerializedObject
-   *          Do the bytes in <code>value</code> contain a serialized object (or
-   *          an actually <code>byte</code> array)?
    * @throws RegionClearedException
    *           If a clear operation completed before the put operation completed
    *           successfully, resulting in the put operation to abort.
    * @throws IllegalArgumentException
    *           If <code>id</code> is less than zero
    */
-  final void put(LocalRegion region, DiskEntry entry, byte[] value,
-      boolean isSerializedObject, boolean async) throws RegionClearedException {
+  final void put(LocalRegion region, DiskEntry entry, ValueWrapper value,
+      boolean async) throws RegionClearedException {
     DiskRegion dr = region.getDiskRegion();
     DiskId id = entry.getDiskId();
     if (dr.isBackup() && id.getKeyId() < 0) {
@@ -737,9 +739,9 @@ public class DiskStoreImpl implements DiskStore {
           // modify and not create
           OplogSet oplogSet = getOplogSet(dr);
           if (doingCreate) {
-            oplogSet.create(region, entry, value, isSerializedObject, async);
+            oplogSet.create(region, entry, value, async);
           } else {
-            oplogSet.modify(region, entry, value, isSerializedObject, async);
+            oplogSet.modify(region, entry, value, async);
           }
         } else {
           throw new RegionClearedException(
@@ -2244,7 +2246,7 @@ public class DiskStoreImpl implements DiskStore {
             if (rvv == null && region != null) {
               // If we have no RVV, clear the region under lock
               region.txClearRegion();
-              region.entries.clear(null);
+              region.clearEntries(null);
               dr.incClearCount();
             }
           } finally {
@@ -2264,7 +2266,7 @@ public class DiskStoreImpl implements DiskStore {
       // If we have an RVV, we need to clear the region
       // without holding a lock.
       region.txClearRegion();
-      region.entries.clear(rvv);
+      region.clearEntries(rvv);
       // Note, do not increment the clear count in this case.
     }
   }
@@ -2296,7 +2298,7 @@ public class DiskStoreImpl implements DiskStore {
     return this.closed;
   }
   
-  void close() {
+  public void close() {
     close(false);
   }
 
@@ -2383,9 +2385,13 @@ public class DiskStoreImpl implements DiskStore {
     }
   }
 
+  final DiskAccessException getDiskAccessException() {
+    return diskException.get();
+  }
+
   boolean allowKrfCreation() {
     // Compactor might be stopped by cache-close. In that case, we should not create krf
-    return this.oplogCompactor == null || this.oplogCompactor.keepCompactorRunning();
+    return diskException.get() == null && (this.oplogCompactor == null || this.oplogCompactor.keepCompactorRunning());
   }
   
   void closeCompactor(boolean isPrepare) {
@@ -2429,7 +2435,7 @@ public class DiskStoreImpl implements DiskStore {
   private void basicClose(LocalRegion region, DiskRegion dr, boolean closeDataOnly) {
     if (dr.isBackup()) {
       if (region != null) {
-        region.entries.clear(null);
+        region.closeEntries();
       }
       if(!closeDataOnly) {
         getDiskInitFile().closeRegion(dr);
@@ -2443,7 +2449,7 @@ public class DiskStoreImpl implements DiskStore {
         clearAsyncQueue(region, true, null); // no need to try to write these to
                                              // disk any longer
         dr.freeAllEntriesOnDisk(region);
-        region.entries.clear(null);
+        region.closeEntries();
         this.overflowMap.remove(dr);
       }
     }
@@ -2652,14 +2658,14 @@ public class DiskStoreImpl implements DiskStore {
   private void basicDestroy(LocalRegion region, DiskRegion dr) {
     if (dr.isBackup()) {
       if (region != null) {
-        region.entries.clear(null);
+        region.closeEntries();
       }
       PersistentOplogSet oplogSet = getPersistentOplogSet(dr);
       oplogSet.basicDestroy(dr);
     } else {
       dr.freeAllEntriesOnDisk(region);
       if (region != null) {
-        region.entries.clear(null);
+        region.closeEntries();
       }
     }
   }
@@ -3739,7 +3745,7 @@ public class DiskStoreImpl implements DiskStore {
       String lruActionOption, String lruLimitOption,
       String concurrencyLevelOption, String initialCapacityOption,
       String loadFactorOption, String compressorClassNameOption,
-      String statisticsEnabledOption, boolean printToConsole) {
+      String statisticsEnabledOption, String offHeapOption, boolean printToConsole) {
     assert isOffline();
     DiskRegionView drv = getDiskInitFile().getDiskRegionByName(regName);
     if (drv == null) {
@@ -3751,13 +3757,13 @@ public class DiskStoreImpl implements DiskStore {
         return getDiskInitFile().modifyPRRegion(regName, lruOption,
             lruActionOption, lruLimitOption, concurrencyLevelOption,
             initialCapacityOption, loadFactorOption, compressorClassNameOption,
-            statisticsEnabledOption, printToConsole);
+            statisticsEnabledOption, offHeapOption, printToConsole);
       }
     } else {
       return getDiskInitFile().modifyRegion(drv, lruOption, lruActionOption,
           lruLimitOption, concurrencyLevelOption, initialCapacityOption,
           loadFactorOption, compressorClassNameOption,
-          statisticsEnabledOption, printToConsole);
+          statisticsEnabledOption, offHeapOption, printToConsole);
     }
   }
 
@@ -3828,7 +3834,7 @@ public class DiskStoreImpl implements DiskStore {
     ArrayList<Object> result = new ArrayList<>();
     Pattern pattern = createPdxRenamePattern(oldBase);
     for (RegionEntry re: foundPdx.getRecoveredEntryMap().regionEntries()) {
-      Object value = re._getValueUse(foundPdx, true);
+      Object value = re._getValueRetain(foundPdx, true);
       if (Token.isRemoved(value)) {
         continue;
       }
@@ -3902,7 +3908,7 @@ public class DiskStoreImpl implements DiskStore {
     PersistentOplogSet oplogSet = (PersistentOplogSet) getOplogSet(foundPdx);
     ArrayList<PdxType> result = new ArrayList<PdxType>();
     for (RegionEntry re: foundPdx.getRecoveredEntryMap().regionEntries()) {
-      Object value = re._getValueUse(foundPdx, true);
+      Object value = re._getValueRetain(foundPdx, true);
       if (Token.isRemoved(value)) {
         continue;
       }
@@ -3949,7 +3955,7 @@ public class DiskStoreImpl implements DiskStore {
     recoverRegionsThatAreReady();
     ArrayList<PdxType> result = new ArrayList<PdxType>();
     for (RegionEntry re: foundPdx.getRecoveredEntryMap().regionEntries()) {
-      Object value = re._getValueUse(foundPdx, true);
+      Object value = re._getValueRetain(foundPdx, true);
       if (Token.isRemoved(value)) {
         continue;
       }
@@ -3991,7 +3997,7 @@ public class DiskStoreImpl implements DiskStore {
     recoverRegionsThatAreReady();
     ArrayList<Object> result = new ArrayList<Object>();
     for (RegionEntry re: foundPdx.getRecoveredEntryMap().regionEntries()) {
-      Object value = re._getValueUse(foundPdx, true);
+      Object value = re._getValueRetain(foundPdx, true);
       if (Token.isRemoved(value)) {
         continue;
       }
@@ -4139,7 +4145,7 @@ public class DiskStoreImpl implements DiskStore {
       result = this.prlruStatMap.get(prName);
       if (result == null) {
         EvictionAttributesImpl ea = dr.getEvictionAttributes();
-        LRUAlgorithm ec = ea.createEvictionController(null);
+        LRUAlgorithm ec = ea.createEvictionController(null, dr.getOffHeap());
         StatisticsFactory sf = cache.getDistributedSystem();
         result = ec.getLRUHelper().initStats(dr, sf);
         this.prlruStatMap.put(prName, result);
@@ -4341,7 +4347,7 @@ public class DiskStoreImpl implements DiskStore {
     return diskStoreBackup;
   }
 
-  private Collection<DiskRegionView> getKnown() {
+  public Collection<DiskRegionView> getKnown() {
     return this.initFile.getKnown();
   }
 
@@ -4438,13 +4444,14 @@ public class DiskStoreImpl implements DiskStore {
       String lruLimitOption, String concurrencyLevelOption,
       String initialCapacityOption, String loadFactorOption,
       String compressorClassNameOption, String statisticsEnabledOption,
+      String offHeapOption,
       boolean printToConsole) throws Exception {
     try {
       DiskStoreImpl dsi = createForOffline(dsName, dsDirs);
       return dsi.modifyRegion(regName, lruOption, lruActionOption,
           lruLimitOption, concurrencyLevelOption, initialCapacityOption,
           loadFactorOption, compressorClassNameOption,
-          statisticsEnabledOption, printToConsole);
+          statisticsEnabledOption, offHeapOption, printToConsole);
     } finally {
       cleanupOffline();
     }

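The allowKrfCreation() change above additionally requires diskException.get() == null, so once a DiskAccessException has been recorded the store stops creating krf files regardless of the compactor state. A small sketch of that fail-fast gating pattern under assumed names (DiskHealthGate is illustrative, not an API from this commit):

import java.util.concurrent.atomic.AtomicReference;

final class DiskHealthGate {
  private final AtomicReference<Exception> diskException = new AtomicReference<Exception>();

  // Remember the first disk failure we see; later failures keep the original cause.
  void recordFailure(Exception e) {
    diskException.compareAndSet(null, e);
  }

  // Optional work (krf-style extras) is allowed only while no failure is recorded.
  boolean allowOptionalWork() {
    return diskException.get() == null;
  }
}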
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
new file mode 100644
index 0000000..7719d80
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistPeerTXStateStub.java
@@ -0,0 +1,358 @@
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
+import com.gemstone.gemfire.internal.cache.tx.DistTxEntryEvent;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * DistPeerTXStateStub lives on the transaction coordinator for a distributed
+ * transaction.
+ * <br>1. It forwards each TX operation to the primary, or to a selected replica
+ * in the case of a replicated region (RR).
+ * <br>2. It also records those transactional operations so they can be sent to
+ * the secondaries/replicas in one batch at commit time.
+ * 
+ * @author shirishd
+ *
+ */
+public final class DistPeerTXStateStub extends PeerTXStateStub implements
+    DistTXCoordinatorInterface {
+  private ArrayList<DistTxEntryEvent> primaryTransactionalOperations = null;
+  private ArrayList<DistTxEntryEvent> secondaryTransactionalOperations = null;
+  private DistTXPrecommitMessage precommitDistTxMsg = null;
+  private DistTXCommitMessage commitDistTxMsg = null;
+  private DistTXRollbackMessage rollbackDistTxMsg = null;
+  private DM dm = null;
+
+  public DistPeerTXStateStub(TXStateProxy stateProxy, DistributedMember target,
+      InternalDistributedMember onBehalfOfClient) {
+    super(stateProxy, target, onBehalfOfClient);
+    primaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+    secondaryTransactionalOperations = new ArrayList<DistTxEntryEvent>();
+  }
+
+  @Override
+  public void precommit() throws CommitConflictException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistPeerTXStateStub.precommit target=" + target
+          + " ,primaryTransactionalOperations="
+          + primaryTransactionalOperations
+          + " ,secondaryTransactionalOperations="
+          + secondaryTransactionalOperations);
+    }
+    assert target != null;
+    assert primaryTransactionalOperations != null
+        || secondaryTransactionalOperations != null;
+    
+    // [DISTTX] TODO Handle Stats
+    
+    this.precommitDistTxMsg
+        .setSecondaryTransactionalOperations(secondaryTransactionalOperations);
+    final Set<DistributedMember> recipients = Collections.singleton(target);
+    this.precommitDistTxMsg.setRecipients(recipients);
+    this.dm.putOutgoing(this.precommitDistTxMsg);
+    this.precommitDistTxMsg.resetRecipients();
+
+    // TODO [DISTTX] any precommit hooks
+  }
+  
+  @Override
+  public void commit() throws CommitConflictException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistPeerTXStateStub.commit target=" + target);
+    }
+    
+    // [DISTTX] TODO Handle Stats
+    this.dm.getStats().incSentCommitMessages(1L);
+    
+    final Set<DistributedMember> recipients = Collections.singleton(target);
+    this.commitDistTxMsg.setRecipients(recipients);
+    this.dm.putOutgoing(this.commitDistTxMsg);
+    this.commitDistTxMsg.resetRecipients();
+  }
+  
+  @Override
+  public void rollback() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("DistPeerTXStateStub.rollback target=" + target);
+    }
+    
+    // [DISTTX] TODO Handle callbacks
+//    if (this.internalAfterSendRollback != null) {
+//      this.internalAfterSendRollback.run();
+//    }
+    
+    final Set<DistributedMember> recipients = Collections.singleton(target);
+    this.rollbackDistTxMsg.setRecipients(recipients);
+    this.dm.putOutgoing(this.rollbackDistTxMsg);
+    this.rollbackDistTxMsg.resetRecipients();
+  }
+
+  @Override
+  public final ArrayList<DistTxEntryEvent> getPrimaryTransactionalOperations()
+      throws UnsupportedOperationInTransactionException {
+    return primaryTransactionalOperations;
+  }
+  
+  private final void addPrimaryTransactionalOperations(DistTxEntryEvent dtop) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger.debug("DistPeerTXStateStub.addPrimaryTransactionalOperations add "
+          + dtop + " ,stub before=" + this);
+    }
+    primaryTransactionalOperations.add(dtop);
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove these
+      logger
+          .debug("DistPeerTXStateStub.addPrimaryTransactionalOperations stub after add = "
+              + this);
+    }
+  }
+  
+  @Override
+  public final void addSecondaryTransactionalOperations(DistTxEntryEvent dtop)
+      throws UnsupportedOperationInTransactionException {
+    secondaryTransactionalOperations.add(dtop);
+  }
+
+  @Override
+  protected void cleanup() {
+    super.cleanup();
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateStub#putEntry(com.gemstone.gemfire
+   * .internal.cache.EntryEventImpl, boolean, boolean, java.lang.Object,
+   * boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
+      Object expectedOldValue, boolean requireOldValue, long lastModified,
+      boolean overwriteDestroyed) {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistPeerTXStateStub.putEntry "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+    boolean returnValue = super.putEntry(event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified, overwriteDestroyed);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    
+    return returnValue;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#putEntryOnRemote(com
+   * .gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean,
+   * java.lang.Object, boolean, long, boolean)
+   */
+  @Override
+  public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew,
+      boolean ifOld, Object expectedOldValue, boolean requireOldValue,
+      long lastModified, boolean overwriteDestroyed)
+      throws DataLocationException {
+    if (logger.isDebugEnabled()) {
+      // [DISTTX] TODO Remove throwable
+      logger.debug("DistPeerTXStateStub.putEntryOnRemote "
+          + event.getKeyInfo().getKey(), new Throwable());
+    }
+    boolean returnValue = super.putEntryOnRemote(event, ifNew, ifOld, expectedOldValue,
+        requireOldValue, lastModified, overwriteDestroyed);
+    addPrimaryTransactionalOperations(new DistTxEntryEvent(event));
+    
+    return returnValue;
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateInterface#destroyExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean,
+   * java.lang.Object)
+   */
+  public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws EntryNotFoundException {
+//    logger.debug("DistPeerTXStateStub.destroyExistingEntry", new Throwable());
+    this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
+    super.destroyExistingEntry(event, cacheWrite, expectedOldValue);
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#destroyOnRemote(java
+   * .lang.Integer, com.gemstone.gemfire.internal.cache.EntryEventImpl,
+   * java.lang.Object)
+   */
+  public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite,
+      Object expectedOldValue) throws DataLocationException {
+//    logger.debug("DistPeerTXStateStub.destroyOnRemote", new Throwable());
+    super.destroyOnRemote(event, cacheWrite, expectedOldValue);
+    this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
+  }
+  
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.TXStateInterface#invalidateExistingEntry
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateExistingEntry(EntryEventImpl event,
+      boolean invokeCallbacks, boolean forceNewEntry) {
+//    logger
+//        .debug("DistPeerTXStateStub.invalidateExistingEntry", new Throwable());
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * com.gemstone.gemfire.internal.cache.InternalDataView#invalidateOnRemote
+   * (com.gemstone.gemfire.internal.cache.EntryEventImpl, boolean, boolean)
+   */
+  public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
+      boolean forceNewEntry) throws DataLocationException {
+//    logger.debug("DistPeerTXStateStub.invalidateOnRemote", new Throwable());
+    super.invalidateExistingEntry(event, invokeCallbacks, forceNewEntry);
+    this.primaryTransactionalOperations.add(new DistTxEntryEvent(event));
+  }
+  
+  public void postPutAll(DistributedPutAllOperation putallOp,
+      VersionedObjectList successfulPuts, LocalRegion region) {
+    super.postPutAll(putallOp, successfulPuts, region);
+    EntryEventImpl event = EntryEventImpl.createPutAllEvent(putallOp, region,
+        Operation.PUTALL_CREATE, putallOp.getBaseEvent().getKey(), putallOp
+            .getBaseEvent().getValue());
+    event.setEventId(putallOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setPutAllOperation(putallOp);
+    this.primaryTransactionalOperations.add(dtop);
+  }
+  
+  public void postRemoveAll(DistributedRemoveAllOperation removeAllOp,
+      VersionedObjectList successfulOps, LocalRegion region) {
+    super.postRemoveAll(removeAllOp, successfulOps, region);
+    EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(removeAllOp,
+        region, removeAllOp.getBaseEvent().getKey());
+    event.setEventId(removeAllOp.getBaseEvent().getEventId());
+    DistTxEntryEvent dtop = new DistTxEntryEvent(event);
+    dtop.setRemoveAllOperation(removeAllOp);
+    this.primaryTransactionalOperations.add(dtop);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(super.toString());
+    builder.append(" ,primary txOps=").append(this.primaryTransactionalOperations);
+    builder.append(" ,secondary txOps=").append(this.secondaryTransactionalOperations);
+    return builder.toString();
+  }
+  
+  @Override
+  public boolean getPreCommitResponse()
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_PRECOMMIT_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("getPreCommitResponse"));
+  }
+
+  @Override
+  public boolean getRollbackResponse()
+      throws UnsupportedOperationInTransactionException {
+    throw new UnsupportedOperationInTransactionException(
+        LocalizedStrings.Dist_TX_ROLLBACK_NOT_SUPPORTED_IN_A_TRANSACTION
+            .toLocalizedString("getRollbackResponse"));
+  }
+  
+  @Override
+  public void setPrecommitMessage(DistTXPrecommitMessage precommitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    this.precommitDistTxMsg = precommitMsg;
+    this.dm = dm;
+  }
+  
+  @Override
+  public void setCommitMessage(DistTXCommitMessage commitMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    this.commitDistTxMsg = commitMsg;
+    this.dm = dm;
+  }
+
+  @Override
+  public void setRollbackMessage(DistTXRollbackMessage rollbackMsg, DM dm)
+      throws UnsupportedOperationInTransactionException {
+    this.rollbackDistTxMsg = rollbackMsg;
+    this.dm = dm;
+  }
+  
+  @Override
+  public void gatherAffectedRegions(HashSet<LocalRegion> regionSet,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      for (DistTxEntryEvent dtos : this.primaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+    if (includeRedundantRegions) {
+      for (DistTxEntryEvent dtos : this.secondaryTransactionalOperations) {
+        regionSet.add(dtos.getRegion());
+      }
+    }
+  }
+  
+  @Override
+  public void gatherAffectedRegionsName(TreeSet<String> sortedRegionName,
+      boolean includePrimaryRegions, boolean includeRedundantRegions)
+      throws UnsupportedOperationInTransactionException {
+    if (includePrimaryRegions) {
+      DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
+          this.primaryTransactionalOperations);
+    }
+    if (includeRedundantRegions) {
+      DistTXStateOnCoordinator.gatherAffectedRegions(sortedRegionName,
+          this.secondaryTransactionalOperations);
+    }
+  }
+  
+  @Override
+  public boolean isDistTx() {
+    return true;
+  }
+  
+  @Override
+  public boolean isCreatedOnDistTxCoordinator() {
+    return true;
+  }
+  
+  @Override
+  public void finalCleanup() {
+    cleanup();
+  }
+}

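Per its Javadoc, DistPeerTXStateStub forwards each transactional operation to the primary (or a chosen replica) as it happens and also records it, so the whole set can be shipped to the secondaries in one batch at commit time. A hedged, self-contained sketch of that record-then-batch pattern (every name below is illustrative; none of it is Geode API):

import java.util.ArrayList;
import java.util.List;

final class RecordingCoordinator<OP> {
  interface Forwarder<T> {
    void forwardToPrimary(T op);            // per-op forwarding, as putEntry() does above
    void sendBatchToReplicas(List<T> ops);  // one batch at commit, as precommit()/commit() do
  }

  private final List<OP> recorded = new ArrayList<OP>();
  private final Forwarder<OP> forwarder;

  RecordingCoordinator(Forwarder<OP> forwarder) {
    this.forwarder = forwarder;
  }

  void apply(OP op) {
    forwarder.forwardToPrimary(op); // do the operation now
    recorded.add(op);               // remember it for the commit-time batch
  }

  void commit() {
    forwarder.sendBatchToReplicas(recorded); // replay everything on the replicas in one shot
    recorded.clear();
  }
}

The split mirrors the stub: per-op methods such as putEntry() do the immediate forward-and-record, while precommit()/commit() hand the accumulated list to the messages that target the replicas.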
