geode-commits mailing list archives

From dschnei...@apache.org
Subject [24/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Thu, 09 Jul 2015 17:02:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
index 1ee6d1d..ca9a4ef 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryExpiryTask.java
@@ -111,15 +111,19 @@ public class EntryExpiryTask extends ExpiryTask {
     RegionEntry re = getCheckedRegionEntry();
     Object key = re.getKey();
     LocalRegion lr = getLocalRegion();
-    EntryEventImpl event = new EntryEventImpl(
+    EntryEventImpl event = EntryEventImpl.create(
         lr, Operation.EXPIRE_DESTROY, key, null,
         createExpireEntryCallback(lr, key), false, lr.getMyId());
+    try {
     event.setPendingSecondaryExpireDestroy(isPending);
     if (lr.generateEventID()) {
       event.setNewEventId(lr.getCache().getDistributedSystem());
     }
     lr.expireDestroy(event, true); // expectedOldValue
     return true;
+    } finally {
+      event.release();
+    }
   }
   
   @Override
@@ -129,13 +133,17 @@ public class EntryExpiryTask extends ExpiryTask {
     RegionEntry re = getCheckedRegionEntry();
     Object key = re.getKey();
     LocalRegion lr = getLocalRegion();
-    EntryEventImpl event = new EntryEventImpl(lr,
+    EntryEventImpl event = EntryEventImpl.create(lr,
         Operation.EXPIRE_INVALIDATE, key, null,
         createExpireEntryCallback(lr, key), false, lr.getMyId());
+    try {
     if (lr.generateEventID()) {
       event.setNewEventId(lr.getCache().getDistributedSystem());
     }
     lr.expireInvalidate(event);
+    } finally {
+      event.release();
+    }
     return true;
   }
 
@@ -145,13 +153,17 @@ public class EntryExpiryTask extends ExpiryTask {
     RegionEntry re = getCheckedRegionEntry();
     Object key = re.getKey();
     LocalRegion lr = getLocalRegion();
-    EntryEventImpl event = new EntryEventImpl(lr,
+    EntryEventImpl event = EntryEventImpl.create(lr,
         Operation.EXPIRE_LOCAL_DESTROY, key, null,
         createExpireEntryCallback(lr, key), false, lr.getMyId());
+    try {
     if (lr.generateEventID()) {
       event.setNewEventId(lr.getCache().getDistributedSystem());
     }
     lr.expireDestroy(event, false); // expectedOldValue
+    } finally {
+      event.release();
+    }
     return true;
   }
 
@@ -161,13 +173,17 @@ public class EntryExpiryTask extends ExpiryTask {
     RegionEntry re = getCheckedRegionEntry();
     Object key = re.getKey();
     LocalRegion lr = getLocalRegion();
-    EntryEventImpl event = new EntryEventImpl(lr,
+    EntryEventImpl event = EntryEventImpl.create(lr,
         Operation.EXPIRE_LOCAL_INVALIDATE, key, null,
         createExpireEntryCallback(lr, key), false, lr.getMyId());
+    try {
     if (lr.generateEventID()) {
       event.setNewEventId(lr.getCache().getDistributedSystem());
     }
     lr.expireInvalidate(event);
+    } finally {
+      event.release();
+    }
     return true;
   }
 

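Note on the pattern in these hunks, which recurs throughout the commit: direct "new EntryEventImpl(...)" construction is replaced by the EntryEventImpl.create(...) factory, and every use of the event is wrapped in try/finally so that event.release() runs even when the expire operation throws. Given the com.gemstone.gemfire.internal.offheap imports added elsewhere in this commit, release() presumably frees an off-heap reference held by the event. A minimal sketch of the create/try-finally/release idiom, with hypothetical stand-in types rather than Geode's classes:

// Minimal sketch of the create/release lifecycle used above. OffHeapEvent
// and its methods are hypothetical stand-ins for EntryEventImpl; only the
// lifecycle pattern mirrors the diff.
public final class ReleaseIdiomSketch {

  static final class OffHeapEvent {
    private boolean released;

    // Factory mirrors EntryEventImpl.create(...): after this returns, the
    // caller owns the (possibly off-heap) reference and must release it.
    static OffHeapEvent create() {
      return new OffHeapEvent();
    }

    void dispatch() {
      if (released) {
        throw new IllegalStateException("event used after release");
      }
    }

    // Stand-in for EntryEventImpl.release(): frees the owned reference.
    void release() {
      released = true;
    }
  }

  public static void main(String[] args) {
    OffHeapEvent event = OffHeapEvent.create();
    try {
      event.dispatch(); // all use of the event happens inside the try
    } finally {
      event.release();  // runs even if dispatch() throws
    }
  }
}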
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
index 289eaae..3a10121 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
@@ -434,6 +434,36 @@ public class EventTracker
     } // synchronized
   }
   
+  public VersionTag findVersionTagForGateway(EventID eventID) {
+    ThreadIdentifier threadID = new ThreadIdentifier(
+        eventID.getMembershipID(), eventID.getThreadID());
+        
+    EventSeqnoHolder evh = recordedEvents.get(threadID);
+    if (evh == null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag failed as no event is recorded for {}", threadID.expensiveToString());
+      }
+      return null;
+    }
+    
+    synchronized (evh) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("search for version tag located last event for {}: {} {}",threadID.expensiveToString(), evh, eventID.getSequenceID() );
+      }
+      
+      if (evh.lastSeqno < eventID.getSequenceID()) {
+        return null;
+      }
+      // log at fine because partitioned regions can send event multiple times
+      // during normal operation during bucket region initialization
+      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && evh.versionTag == null) {
+        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Could not recover version tag.  Found event holder with no version tag for {}", eventID);
+      }
+      return evh.versionTag;
+    } // synchronized
+  }
+  
+  
   public VersionTag findVersionTagForBulkOp(EventID eventID) {
     ThreadIdentifier threadID = new ThreadIdentifier(
         eventID.getMembershipID(), eventID.getThreadID());

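The new findVersionTagForGateway mirrors the findVersionTagForBulkOp method that follows it: resolve the per-thread holder of the last recorded event, then return its version tag only when the requested sequence number is not newer than the last one recorded (a null tag is still possible and is logged at trace level). A simplified, self-contained sketch of that lookup, with plain stand-ins for ThreadIdentifier, EventSeqnoHolder, and VersionTag:

// Simplified sketch of the version-tag lookup above. String keys and tags
// stand in for ThreadIdentifier and VersionTag.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class VersionTagLookupSketch {

  static final class SeqnoHolder {
    long lastSeqno;
    String versionTag; // may legitimately be null, as in the diff
  }

  private final ConcurrentMap<String, SeqnoHolder> recordedEvents =
      new ConcurrentHashMap<>();

  String findVersionTag(String threadId, long sequenceId) {
    SeqnoHolder holder = recordedEvents.get(threadId);
    if (holder == null) {
      return null; // no event recorded for this thread
    }
    synchronized (holder) { // same per-holder locking as the diff
      if (holder.lastSeqno < sequenceId) {
        return null; // requested event is newer than anything recorded
      }
      return holder.versionTag;
    }
  }

  public static void main(String[] args) {
    VersionTagLookupSketch sketch = new VersionTagLookupSketch();
    SeqnoHolder holder = new SeqnoHolder();
    holder.lastSeqno = 5;
    holder.versionTag = "tag-5";
    sketch.recordedEvents.put("member1:thread1", holder);
    System.out.println(sketch.findVersionTag("member1:thread1", 3)); // tag-5
    System.out.println(sketch.findVersionTag("member1:thread1", 9)); // null
  }
}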
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictionAttributesImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictionAttributesImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictionAttributesImpl.java
index 094f287..c575db9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictionAttributesImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictionAttributesImpl.java
@@ -174,18 +174,18 @@ public final class EvictionAttributesImpl extends EvictionAttributes
    * 
    * @see EvictionAttributes
    */
-  public LRUAlgorithm createEvictionController(Region region) 
+  public LRUAlgorithm createEvictionController(Region region, boolean isOffHeap) 
   {
     if (this.algorithm == EvictionAlgorithm.LRU_ENTRY) {
       this.evictionController = new LRUCapacityController(this.maximum, this.action,region); 
     } else if (this.algorithm == EvictionAlgorithm.LRU_HEAP) {
       this.evictionController = new HeapLRUCapacityController(this.sizer,this.action,region);       
     } else if (this.algorithm == EvictionAlgorithm.LRU_MEMORY) {
-      this.evictionController = new MemLRUCapacityController(this.maximum, this.sizer, this.action,region);
+      this.evictionController = new MemLRUCapacityController(this.maximum, this.sizer, this.action,region, isOffHeap);
     } else if(this.algorithm == EvictionAlgorithm.LIFO_ENTRY){
       this.evictionController = new LRUCapacityController(this.maximum, this.action,region);
     } else if(this.algorithm == EvictionAlgorithm.LIFO_MEMORY){
-      this.evictionController = new MemLRUCapacityController(this.maximum, this.sizer, this.action,region);
+      this.evictionController = new MemLRUCapacityController(this.maximum, this.sizer, this.action,region, isOffHeap);
     }  else {
       // for all other algorithms, return null
       this.evictionController = null;

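This hunk threads a new isOffHeap flag into createEvictionController; only the MemLRUCapacityController constructors take it, presumably because only memory-based eviction needs to know whether per-entry sizes should be measured against off-heap storage. A hedged sketch of the dispatch, with simplified stand-in names:

// Sketch of the factory dispatch above: only the memory-based controller
// receives the off-heap flag. All names are simplified stand-ins.
public final class EvictionFactorySketch {

  enum Algorithm { LRU_ENTRY, LRU_MEMORY, NONE }

  interface Controller {}

  static final class EntryCountController implements Controller {
    EntryCountController(int maximumEntries) {} // size-agnostic
  }

  static final class MemoryController implements Controller {
    // The flag changes how per-entry sizes are computed.
    MemoryController(int maximumMegabytes, boolean isOffHeap) {}
  }

  static Controller create(Algorithm algorithm, int maximum, boolean isOffHeap) {
    switch (algorithm) {
      case LRU_ENTRY:
        return new EntryCountController(maximum);
      case LRU_MEMORY:
        return new MemoryController(maximum, isOffHeap);
      default:
        return null; // as in the diff: no controller for other algorithms
    }
  }

  public static void main(String[] args) {
    System.out.println(create(Algorithm.LRU_MEMORY, 64, true) != null); // true
  }
}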
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
new file mode 100644
index 0000000..fd7af42
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/EvictorService.java
@@ -0,0 +1,276 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.EvictionCriteria;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.gemstone.gemfire.internal.offheap.Releasable;
+/**
+ * Schedules each iteration periodically. EvictorService takes absolute time and
+ * a period as input and schedules Eviction at absolute times by calculating the
+ * interval. For scheduling the next eviction iteration it also takes into
+ * account the time taken to complete one iteration. If an iteration takes more
+ * time than the specified period then another iteration is scheduled
+ * immediately.
+ * 
+ * @author skumar
+ * 
+ */
+
+public class EvictorService extends AbstractScheduledService {
+
+  private final EvictionCriteria<Object, Object> criteria;
+
+  // period is always in seconds
+  private long interval;
+
+  private volatile boolean stopScheduling;
+
+  private long nextScheduleTime;
+
+  private GemFireCacheImpl cache;
+
+  private Region region;
+  
+  private volatile ScheduledExecutorService executorService;
+
+  public EvictorService(EvictionCriteria<Object, Object> criteria,
+      long evictorStartTime, long evictorInterval, TimeUnit unit, Region r) {
+    this.criteria = criteria;
+    this.interval = unit.toSeconds(evictorInterval);
+    this.region = r;
+    try {
+      this.cache = GemFireCacheImpl.getExisting();
+    } catch (CacheClosedException cce) {
+      
+    }
+    //TODO: Unless we revisit System.currentTimeMillis or cacheTimeMillis keep the default
+//    long now = (evictorStartTime != 0 ? evictorStartTime
+//        + this.cache.getDistributionManager().getCacheTimeOffset() : this.cache
+//        .getDistributionManager().cacheTimeMillis()) / 1000;
+    long now = this.cache.getDistributionManager().cacheTimeMillis() / 1000;
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: The startTime(now) is " + now + " evictorStartTime : " + evictorStartTime);
+    }
+    
+    this.nextScheduleTime = now + 10;
+
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: The startTime is " + this.nextScheduleTime);
+    }
+  }
+
+  @Override
+  protected void runOneIteration() throws Exception {
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n()
+          .fine(
+              "EvictorService: Running the iteration at "
+                  + cache.cacheTimeMillis());
+    }
+    if (stopScheduling || checkCancelled(cache)) {
+      stopScheduling(); // if check cancelled
+      if (this.cache.getLoggerI18n().fineEnabled()) {
+        this.cache
+            .getLoggerI18n()
+            .fine(
+                "EvictorService: Abort eviction since stopScheduling OR cancel in progress. Evicted 0 entries ");
+      }
+      return;
+    }
+    CachePerfStats stats = ((LocalRegion)this.region).getCachePerfStats();
+    long startEvictionTime = stats.startCustomEviction();
+    int evicted = 0;
+    long startEvaluationTime = stats.startEvaluation();
+    Iterator<Entry<Object, Object>> keysItr = null;
+    long totalIterationsTime = 0;
+    
+    keysItr = this.criteria.getKeysToBeEvicted(this.cache
+        .getDistributionManager().cacheTimeMillis(), this.region);
+    try {
+    stats.incEvaluations(this.region.size());
+    // if we have been asked to stop scheduling
+    // or the cache is closing stop in between.
+    
+    
+    while (keysItr.hasNext() && !stopScheduling && !checkCancelled(cache)) {
+      Map.Entry<Object, Object> entry = keysItr.next();
+      long startIterationTime = this.cache
+          .getDistributionManager().cacheTimeMillis();
+      Object routingObj = entry.getValue();
+      if (this.cache.getLoggerI18n().fineEnabled()) {
+        this.cache.getLoggerI18n().fine(
+            "EvictorService: Going to evict the following entry " + entry);
+      }
+      if (this.region instanceof PartitionedRegion) {
+        try {
+          PartitionedRegion pr = (PartitionedRegion)this.region;
+          stats.incEvictionsInProgress();
+          int bucketId = PartitionedRegionHelper.getHashKey(pr, routingObj);
+          BucketRegion br = pr.getDataStore().getLocalBucketById(bucketId);
+          // This has to be called on BucketRegion directly and not on the PR as
+          // PR doesn't allow operation on Secondary buckets.
+          if (br != null) {
+            if (this.cache.getLoggerI18n().fineEnabled()) {
+              this.cache.getLoggerI18n().fine(
+                  "EvictorService: Going to evict the following entry " + entry
+                      + " from bucket " + br);
+            }
+            if (br.getBucketAdvisor().isPrimary()) {
+              boolean succ = false;
+              try {
+                succ = br.customEvictDestroy(entry.getKey());
+              } catch (PrimaryBucketException e) {
+                if (this.cache.getLoggerI18n().fineEnabled()) {
+                  this.cache.getLoggerI18n().warning(
+                      LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+                }
+              }
+              
+              if (succ)
+                evicted++;
+              if (this.cache.getLoggerI18n().fineEnabled()) {
+                this.cache.getLoggerI18n()
+                    .fine(
+                        "EvictorService: Evicted the following entry " + entry
+                            + " from bucket " + br + " successfully " + succ
+                            + " the value in buk is " /*
+                                                       * +
+                                                       * br.get(entry.getKey())
+                                                       */);
+              }
+            }
+          }
+          stats.incEvictions();
+        } catch (Exception e) {
+          if (this.cache.getLoggerI18n().fineEnabled()) {
+            this.cache.getLoggerI18n().warning(
+                LocalizedStrings.EVICTORSERVICE_CAUGHT_EXCEPTION_0, e);
+          }
+          // TODO:
+          // Do the exception handling .
+          // Check if the bucket is present
+          // If the entry could not be evicted then log the warning
+          // Log any other exception.
+        }finally{
+          stats.decEvictionsInProgress();
+          long endIterationTime = this.cache
+              .getDistributionManager().cacheTimeMillis();
+          totalIterationsTime += (endIterationTime - startIterationTime);
+        }
+      }
+    }
+    }finally {
+      if(keysItr instanceof Releasable) {
+        ((Releasable)keysItr).release();
+      }
+    }
+    stats.endEvaluation(startEvaluationTime, totalIterationsTime);    
+    
+    if (this.cache.getLoggerI18n().fineEnabled()) {
+      this.cache.getLoggerI18n().fine(
+          "EvictorService: Completed an iteration at time "
+              + this.cache.getDistributionManager().cacheTimeMillis() / 1000
+              + ". Evicted " + evicted + " entries.");
+    }
+    stats.endCustomEviction(startEvictionTime);
+  }
+
+  private boolean checkCancelled(GemFireCacheImpl cache) {
+    if (cache.getCancelCriterion().cancelInProgress() != null) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  protected Scheduler scheduler() {
+    return new CustomScheduler() {
+      @Override
+      protected Schedule getNextSchedule() throws Exception {
+        // get the current time in seconds from DM.
+        // it takes care of clock skew etc in different VMs
+        long now = cache.getDistributionManager().cacheTimeMillis() / 1000;
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine("EvictorService: Now is " + now);
+        }
+        long delay = 0;
+        if (now < nextScheduleTime) {
+          delay = nextScheduleTime - now;
+        }
+        nextScheduleTime += interval;
+        // calculate the next immediate time i.e. schedule time in seconds
+        // set the schedule.delay to that scheduletime - currenttime
+        if (cache.getLoggerI18n().fineEnabled()) {
+          cache.getLoggerI18n().fine(
+              "EvictorService: Returning the next schedule with delay " + delay
+                  + " next schedule is at : " + nextScheduleTime);
+        }
+
+        return new Schedule(delay, TimeUnit.SECONDS);
+      }
+    };
+  }
+
+  /**
+   * Region.destroy and Region.close should make sure to call this method. This
+   * will be called here.
+   */
+  public void stopScheduling() {
+    this.stopScheduling = true;
+  }
+
+  // this will be called when we stop the service.
+  // not sure if we have to do any cleanup
+  // to stop the service call stop()
+  @Override
+  protected void shutDown() throws Exception {
+    this.executorService.shutdownNow();
+    this.region= null;
+    this.cache = null;
+  }
+
+  // This will be called when we start the service.
+  // not sure if we have to do any initialization
+  @Override
+  protected void startUp() throws Exception {
+
+  }
+
+  public void changeEvictionInterval(long newInterval) {
+    this.interval = newInterval / 1000;
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine(
+          "EvictorService: New interval is " + this.interval);
+    }
+  }
+
+  public void changeStartTime(long newStart) {
+    this.nextScheduleTime = newStart/1000;
+    if (cache.getLoggerI18n().fineEnabled()) {
+      cache.getLoggerI18n().fine("EvictorService: New start time is " + this.nextScheduleTime);
+    }
+  }
+  
+  protected ScheduledExecutorService executor() {
+    this.executorService = super.executor();
+    return this.executorService;
+  }
+
+}

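EvictorService builds on Guava's AbstractScheduledService with a CustomScheduler: each run advances an absolute nextScheduleTime by the interval and waits only for whatever remains until that time, so an iteration that overruns its period is followed immediately by the next one rather than letting the schedule drift. A plain-JDK sketch of that drift-compensated scheduling; the class and method names here are illustrative, not Geode's:

// Plain-JDK stand-in for the CustomScheduler above: delays are computed
// from an absolute next-run time, so long iterations do not push the
// schedule later and later.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class DriftFreeSchedulerSketch {

  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final long intervalSeconds;
  private long nextScheduleTime; // absolute time, in seconds

  DriftFreeSchedulerSketch(long intervalSeconds) {
    this.intervalSeconds = intervalSeconds;
    this.nextScheduleTime = nowSeconds() + intervalSeconds;
  }

  private static long nowSeconds() {
    // The real service reads the distribution manager's cacheTimeMillis()
    // so all members agree on the clock; the system clock stands in here.
    return System.currentTimeMillis() / 1000;
  }

  void scheduleNext(Runnable iteration) {
    long delay = Math.max(0, nextScheduleTime - nowSeconds()); // 0 => overran
    nextScheduleTime += intervalSeconds; // advance the absolute target
    executor.schedule(() -> {
      try {
        iteration.run();
      } finally {
        scheduleNext(iteration); // chain the next run
      }
    }, delay, TimeUnit.SECONDS);
  }

  void shutdown() {
    executor.shutdownNow(); // mirrors shutDown() in the service above
  }
}

Usage would be along the lines of new DriftFreeSchedulerSketch(5).scheduleNext(task), with shutdown() playing the role of stopScheduling()/shutDown().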
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpirationScheduler.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpirationScheduler.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpirationScheduler.java
index 02eab40..62e2f64 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpirationScheduler.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpirationScheduler.java
@@ -7,7 +7,6 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
-import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.Logger;
@@ -71,10 +70,8 @@ public class ExpirationScheduler
       if(logger.isTraceEnabled()) {
         logger.trace(LocalizedMessage.create(LocalizedStrings.ExpirationScheduler_SCHEDULING__0__TO_FIRE_IN__1__MS, new Object[] {task, Long.valueOf(task.getExpiryMillis())}));
       }
-      // By using getExpirationTime and passing a Date to schedule
-      // we get rid of two calls of System.currentTimeMillis().
-      // The Date object creation is very simple and has a very short life.
-      timer.schedule(task, new Date(task.getExpirationTime()));
+      // To fix bug 52267 do not create a Date here; instead calculate the relative duration.
+      timer.schedule(task, task.getExpiryMillis());
     }
     catch (EntryNotFoundException e) {
       // ignore - there are unsynchronized paths that allow an entry to
@@ -87,33 +84,10 @@ public class ExpirationScheduler
     }
     return task;
   }
-
-  /** schedules the given entry expiration task */
+  
+  /** schedules the given entry expiration task and returns true; returns false if not scheduled */
   public boolean addEntryExpiryTask(EntryExpiryTask task) {
-    try {
-      if(logger.isTraceEnabled()) {
-        logger.trace(LocalizedMessage.create(LocalizedStrings.ExpirationScheduler_SCHEDULING__0__TO_FIRE_IN__1__MS, new Object[] {task, Long.valueOf(task.getExpiryMillis())}));
-      }
-      // By using getExpirationTime and passing a Date to schedule
-      // we get rid of two calls of System.currentTimeMillis().
-      // The Date object creation is very simple and has a very short life.
-      timer.schedule(task, new Date(task.getExpirationTime()));
-    }
-    catch (EntryNotFoundException e) {
-      // ignore - there are unsynchronized paths that allow an entry to
-      // be destroyed out from under us.
-      return false;
-    }
-    catch (IllegalStateException e) {
-      // task must have been cancelled by another thread so don't schedule it
-      return false;
-    }
-    return true;
-  }
-
-  /** schedule a java.util.TimerTask for execution */
-  public void schedule(SystemTimer.SystemTimerTask task, long when) {
-    timer.schedule(task, when);
+    return addExpiryTask(task) != null;
   }
 
   /** @see java.util.Timer#cancel() */

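Both scheduling paths above stop wrapping the expiry time in a java.util.Date, and addEntryExpiryTask now just delegates to addExpiryTask. The motivation (bug 52267, per the new comment) is that Timer-style scheduling measures a Date against System.currentTimeMillis(), which no longer matches once expiry times are computed from the distributed cache clock (see the ExpiryTask change below), so tasks must be scheduled by relative delay instead. A minimal sketch of the relative form:

// Minimal sketch of relative scheduling with java.util.Timer: pass the
// delay in milliseconds rather than an absolute Date, so the task fires
// correctly even when the delay was computed from a different clock.
import java.util.Timer;
import java.util.TimerTask;

public final class RelativeScheduleSketch {
  public static void main(String[] args) {
    final Timer timer = new Timer("expiry-sketch");
    long expiryMillis = 1000; // stand-in for task.getExpiryMillis()
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        System.out.println("expired");
        timer.cancel(); // let the JVM exit
      }
    }, expiryMillis); // relative delay, not new Date(absoluteTime)
  }
}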
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
index d9d7dcd..4e1f64b 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
@@ -29,6 +29,7 @@ import com.gemstone.gemfire.cache.ExpirationAction;
 import com.gemstone.gemfire.cache.ExpirationAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.util.BridgeWriterException;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -467,12 +468,19 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
    * {@link #clearNow()} in a finally block after calling this method.
    */
   public static void setNow() {
+    now.set(calculateNow());
+  }
+  
+  private static long calculateNow() {
     GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
     if (cache != null) {
-      // Do not use cache.cacheTimeMillis here.
-      // Since expiration uses the System timer we need to use its clock.
-      now.set(System.currentTimeMillis());
+      // Use cache.cacheTimeMillis here. See bug 52267.
+      InternalDistributedSystem ids = cache.getDistributedSystem();
+      if (ids != null) {
+        return ids.getClock().cacheTimeMillis();
+      }
     }
+    return 0L;
   }
 
   /**
@@ -495,9 +503,7 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
     if (tl != null) {
       result = tl.longValue();
     } else {
-      // Do not use cache.cacheTimeMillis here.
-      // Since expiration uses the System timer we need to use its clock.
-      result = System.currentTimeMillis();
+      result = calculateNow();
     }
     return result;
   }

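The hunks above switch expiration from System.currentTimeMillis() to the distributed system's cache clock, again for bug 52267. setNow()/getNow() form a thread-local snapshot: a batch of expiry checks calls setNow() once so every check observes the same instant, and getNow() falls back to a fresh clock reading otherwise. A self-contained sketch of that pattern; the clock supplier stands in for ids.getClock().cacheTimeMillis():

// Thread-local "now" snapshot, as in ExpiryTask.setNow()/getNow() above.
import java.util.function.LongSupplier;

public final class ThreadLocalNowSketch {

  private static final ThreadLocal<Long> NOW = new ThreadLocal<>();
  // Stand-in for the distributed cache clock used in the diff.
  private static final LongSupplier CLOCK = System::currentTimeMillis;

  static void setNow() {
    NOW.set(CLOCK.getAsLong());
  }

  static void clearNow() {
    NOW.remove(); // callers pair setNow() with clearNow() in a finally block
  }

  static long getNow() {
    Long snapshot = NOW.get();
    return snapshot != null ? snapshot : CLOCK.getAsLong();
  }

  public static void main(String[] args) throws InterruptedException {
    setNow();
    try {
      long first = getNow();
      Thread.sleep(10);
      long second = getNow();
      System.out.println(first == second); // true: both reads see the snapshot
    } finally {
      clearNow();
    }
  }
}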
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
index 3d5382f..b9e0ddf 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FilterProfile.java
@@ -34,6 +34,7 @@ import com.gemstone.gemfire.cache.CacheEvent;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.SerializedCacheValue;
 import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
 import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
@@ -1656,10 +1657,16 @@ public class FilterProfile implements DataSerializableFixedID {
       }
     }
     if (foi != null && foi.size() > 0) {
-      Object value = event.getSerializedNewValue();
-      boolean serialized = (value != null);
+      Object value;
+      boolean serialized;
+      {
+      SerializedCacheValue<?> serValue = event.getSerializedNewValue();
+      serialized = (serValue != null);
       if (!serialized) {
         value = event.getNewValue();
+      } else {
+        value = serValue.getSerializedValue();
+      }
       }
       InterestEvent iev = new InterestEvent(event.getKey(), value, !serialized);
       Operation op = event.getOperation();

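This hunk changes how FilterProfile picks the value for the InterestEvent: it now prefers the already-serialized form of the new value when one exists and only falls back to the deserialized object, recording which form was chosen (note the inverted flag passed to the InterestEvent constructor). A simplified sketch with stand-in types for SerializedCacheValue and EntryEvent:

// Sketch of the value-selection change above: ship the serialized bytes
// when available, otherwise fall back to the deserialized object. Event
// and Result are simplified stand-ins.
public final class SerializedValueSketch {

  static final class Event {
    byte[] serializedNewValue; // may be null
    Object newValue;
  }

  static final class Result {
    final Object value;
    final boolean serialized;
    Result(Object value, boolean serialized) {
      this.value = value;
      this.serialized = serialized;
    }
  }

  static Result selectNewValue(Event event) {
    byte[] bytes = event.serializedNewValue;
    if (bytes != null) {
      return new Result(bytes, true); // ship bytes without deserializing
    }
    return new Result(event.newValue, false); // no serialized form available
  }

  public static void main(String[] args) {
    Event event = new Event();
    event.newValue = "deserialized";
    System.out.println(selectNewValue(event).serialized); // false
  }
}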
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 1f809ce..76488dd 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -78,6 +78,7 @@ import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheExistsException;
 import com.gemstone.gemfire.cache.CacheRuntimeException;
 import com.gemstone.gemfire.cache.CacheTransactionManager;
 import com.gemstone.gemfire.cache.CacheWriterException;
@@ -104,6 +105,7 @@ import com.gemstone.gemfire.cache.TimeoutException;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import com.gemstone.gemfire.cache.client.ClientCache;
 import com.gemstone.gemfire.cache.client.ClientRegionFactory;
 import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
@@ -114,6 +116,18 @@ import com.gemstone.gemfire.cache.client.internal.ClientMetadataService;
 import com.gemstone.gemfire.cache.client.internal.ClientRegionFactoryImpl;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSLastCompactionTimeFunction;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
 import com.gemstone.gemfire.cache.query.QueryService;
 import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
 import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
@@ -124,6 +138,7 @@ import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.snapshot.CacheSnapshotService;
 import com.gemstone.gemfire.cache.util.BridgeServer;
 import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
+import com.gemstone.gemfire.cache.util.ObjectSizer;
 import com.gemstone.gemfire.cache.wan.GatewayReceiver;
 import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
@@ -157,6 +172,7 @@ import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SocketIOWithTimeout;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
 import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor;
 import com.gemstone.gemfire.internal.cache.execute.util.FindRestEnabledServersFunction;
 import com.gemstone.gemfire.internal.cache.extension.Extensible;
@@ -165,6 +181,7 @@ import com.gemstone.gemfire.internal.cache.extension.SimpleExtensionPoint;
 import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
 import com.gemstone.gemfire.internal.cache.locks.TXLockService;
 import com.gemstone.gemfire.internal.cache.lru.HeapEvictor;
+import com.gemstone.gemfire.internal.cache.lru.OffHeapEvictor;
 import com.gemstone.gemfire.internal.cache.partitioned.RedundancyAlreadyMetException;
 import com.gemstone.gemfire.internal.cache.persistence.BackupManager;
 import com.gemstone.gemfire.internal.cache.persistence.PersistentMemberID;
@@ -176,6 +193,7 @@ import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
 import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
 import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAdvisor;
 import com.gemstone.gemfire.internal.cache.wan.WANServiceProvider;
@@ -190,10 +208,10 @@ import com.gemstone.gemfire.internal.logging.InternalLogWriter;
 import com.gemstone.gemfire.internal.logging.LogService;
 import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.offheap.MemoryAllocator;
 import com.gemstone.gemfire.internal.process.ClusterConfigurationNotAvailableException;
 import com.gemstone.gemfire.internal.sequencelog.SequenceLoggerImpl;
 import com.gemstone.gemfire.internal.tcp.ConnectionTable;
-import com.gemstone.gemfire.internal.util.ArrayUtils;
 import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
 import com.gemstone.gemfire.lang.Identifiable;
 import com.gemstone.gemfire.management.internal.JmxManagerAdvisee;
@@ -211,6 +229,9 @@ import com.gemstone.gemfire.pdx.internal.AutoSerializableManager;
 import com.gemstone.gemfire.pdx.internal.PdxInstanceFactoryImpl;
 import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl;
 import com.gemstone.gemfire.pdx.internal.TypeRegistry;
+import com.gemstone.gemfire.redis.GemFireRedisServer;
+import com.sun.jna.Native;
+import com.sun.jna.Platform;
 
 // @todo somebody Come up with more reasonable values for {@link #DEFAULT_LOCK_TIMEOUT}, etc.
 /**
@@ -220,7 +241,6 @@ import com.gemstone.gemfire.pdx.internal.TypeRegistry;
  */
 @SuppressWarnings("deprecation")
 public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePerfStats, DistributionAdvisee, Extensible<Cache> {
-
   private static final Logger logger = LogService.getLogger();
   
   // moved *SERIAL_NUMBER stuff to DistributionAdvisor
@@ -287,7 +307,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
    * This property defines internal function that will get executed on each node to fetch active REST service endpoints (servers).
    */
   public static final String FIND_REST_ENABLED_SERVERS_FUNCTION_ID = FindRestEnabledServersFunction.class.getName();
-  
+
+  /**
+   * True if the user is allowed lock when memory resources appear to be overcommitted. 
+   */
+  public static final boolean ALLOW_MEMORY_LOCK_WHEN_OVERCOMMITTED = Boolean.getBoolean("gemfire.Cache.ALLOW_MEMORY_OVERCOMMIT");
+
   //time in ms
   private static final int FIVE_HOURS = 5 * 60 * 60 * 1000;
   /** To test MAX_QUERY_EXECUTION_TIME option. */
@@ -473,9 +498,13 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   private final AtomicReference<BackupManager> backupManager = new AtomicReference<BackupManager>();
 
   private HeapEvictor heapEvictor = null;
+  
+  private OffHeapEvictor offHeapEvictor = null;
 
   private final Object heapEvictorLock = new Object();
   
+  private final Object offHeapEvictorLock = new Object();
+
   private ResourceEventsListener listener;
 
   /**
@@ -541,16 +570,25 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   private GemFireMemcachedServer memcachedServer;
   
   /**
+   * Redis server is started when {@link DistributionConfig#getRedisPort()} is set
+   */
+  private GemFireRedisServer redisServer;
+  
+  /**
    * {@link ExtensionPoint} support.
    * @since 8.1
    */
   private SimpleExtensionPoint<Cache> extensionPoint = new SimpleExtensionPoint<Cache>(this, this);
   
   private final CqService cqService;
+  
+  private final LuceneService luceneService;
 
   public static final int DEFAULT_CLIENT_FUNCTION_TIMEOUT = 0;
 
   private static int clientFunctionTimeout;
+  
+  private final static Boolean DISABLE_AUTO_EVICTION = Boolean.getBoolean("gemfire.disableAutoEviction");
 
   static {
     // this works around jdk bug 6427854, reported in ticket #44434
@@ -562,6 +600,51 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   }
 
   /**
+   * Invokes mlockall().  Locks  all pages mapped into the address space of the 
+   * calling process.  This includes the pages of the code, data and stack segment, 
+   * as well as shared libraries, user space kernel data, shared memory, and 
+   * memory-mapped files.  All mapped pages are guaranteed to be resident in RAM 
+   * when the call returns successfully; the pages are guaranteed to stay in RAM 
+   * until later unlocked.
+   * 
+   * @param flags
+   *    MCL_CURRENT 1 - Lock all pages which are currently mapped into the 
+   *    address space of the process.
+   *    
+   *    MCL_FUTURE  2 - Lock all pages which will become mapped into the address 
+   *    space of the process in the future.  These could be for instance new 
+   *    pages required by a growing heap and stack as well as new memory mapped 
+   *    files or shared memory regions.
+   *    
+   * @return 
+   *    0 if success, non-zero if error and errno set
+   *    
+   */
+  private static native int mlockall(int flags);
+
+  public static void lockMemory() {
+    int result = 0;
+    try {
+      Native.register(Platform.C_LIBRARY_NAME);
+      result = mlockall(1);
+      if (result == 0) {
+        return;
+      }
+    } catch (Throwable t) {
+      throw new IllegalStateException("Error trying to lock memory", t);
+    }
+
+    int errno = Native.getLastError();
+    String msg = "mlockall failed: " + errno;
+    if (errno == 1 || errno == 12) {  // EPERM || ENOMEM
+      msg = "Unable to lock memory due to insufficient free space or privileges.  " 
+          + "Please check the RLIMIT_MEMLOCK soft resource limit (ulimit -l) and " 
+          + "increase the available memory if needed";
+    }
+    throw new IllegalStateException(msg);
+  }
+  
+  /**
    * This is for debugging cache-open issues (esp. {@link com.gemstone.gemfire.cache.CacheExistsException})
    */
   @Override
@@ -670,7 +753,27 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   public static GemFireCacheImpl create(DistributedSystem system, CacheConfig cacheConfig) {
     return new GemFireCacheImpl(false, null, system, cacheConfig).init();
   }
+  public static Cache create(DistributedSystem system, boolean existingOk, CacheConfig cacheConfig)
+  throws CacheExistsException, TimeoutException, CacheWriterException,
+  GatewayException,
+  RegionExistsException 
+  {
+    GemFireCacheImpl instance = getInstance();
 
+    if (instance != null && !instance.isClosed()) {
+      if (existingOk) {
+        // Check if cache configuration matches.
+        cacheConfig.validateCacheConfig(instance);
+
+        return instance;
+      } else {
+        // instance.creationStack argument is for debugging...
+        throw new CacheExistsException(instance, LocalizedStrings.CacheFactory_0_AN_OPEN_CACHE_ALREADY_EXISTS.toLocalizedString(instance), instance.creationStack);
+      }
+    }
+    return create(system, cacheConfig);
+  }
+  
   /**
    * Creates a new instance of GemFireCache and populates it according to the <code>cache.xml</code>, if appropriate.
    */
@@ -682,6 +785,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     // Synchronized to prevent a new cache from being created
     // before an old one has finished closing
     synchronized (GemFireCacheImpl.class) {
+      
       // start JTA transaction manager within this synchronized block
       // to prevent race with cache close. fixes bug 43987
       JNDIInvoker.mapTransactions();
@@ -710,6 +814,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       
       this.cqService = CqServiceProvider.create(this);
 
+      this.luceneService = LuceneServiceProvider.create(this);
+
       initReliableMessageQueueFactory();
 
       // Create the CacheStatistics
@@ -754,7 +860,16 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       
       resourceManager = InternalResourceManager.createResourceManager(this);
       this.serialNumber = DistributionAdvisor.createSerialNumber();
-      getResourceManager().addResourceListener(getHeapEvictor());
+
+      getResourceManager().addResourceListener(ResourceType.HEAP_MEMORY, getHeapEvictor());
+      
+      /*
+       * Only bother creating an off-heap evictor if we have off-heap memory enabled.
+       */
+      if(null != getOffHeapStore()) {
+        getResourceManager().addResourceListener(ResourceType.OFFHEAP_MEMORY, getOffHeapEvictor());
+      }
+      
       recordedEventSweeper = EventTracker.startTrackerServices(this);
       tombstoneService = TombstoneService.initialize(this);
 
@@ -769,6 +884,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         HARegionQueue.setMessageSyncInterval(HARegionQueue.DEFAULT_MESSAGE_SYNC_INTERVAL);
       }
       FunctionService.registerFunction(new PRContainsValueFunction());
+      FunctionService.registerFunction(new HDFSLastCompactionTimeFunction());
+      FunctionService.registerFunction(new HDFSForceCompactionFunction());
+      FunctionService.registerFunction(new HDFSFlushQueueFunction());
       this.expirationScheduler = new ExpirationScheduler(this.system);
 
       // uncomment following line when debugging CacheExistsException
@@ -1026,6 +1144,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     
     startMemcachedServer();
     
+    startRedisServer();
+    
     startRestAgentServer(this);
     
     int time = Integer.getInteger("gemfire.CLIENT_FUNCTION_TIMEOUT",
@@ -1074,6 +1194,24 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       this.memcachedServer.start();
     }
   }
+  
+  private void startRedisServer() {
+    int port = system.getConfig().getRedisPort();
+    if (port != 0) {
+      String bindAddress = system.getConfig().getRedisBindAddress();
+      assert bindAddress != null;
+      if (bindAddress.equals(DistributionConfig.DEFAULT_REDIS_BIND_ADDRESS)) {
+        getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_PORT_0,
+            new Object[] { port });
+      } else {
+        getLoggerI18n().info(LocalizedStrings.GemFireCacheImpl_STARTING_GEMFIRE_REDIS_SERVER_ON_BIND_ADDRESS_0_PORT_1,
+            new Object[] { bindAddress, port });
+      }
+      this.redisServer = new GemFireRedisServer(bindAddress, port);
+      this.redisServer.start();
+    }
+  }
+
 
   public URL getCacheXmlURL() {
     if (this.getMyId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
@@ -1754,6 +1892,16 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
 
+  public OffHeapEvictor getOffHeapEvictor() {
+    synchronized (this.offHeapEvictorLock) {
+      stopper.checkCancelInProgress(null);
+      if (this.offHeapEvictor == null) {
+        this.offHeapEvictor = new OffHeapEvictor(this);
+      }
+      return this.offHeapEvictor;
+    }    
+  }
+  
   public PersistentMemberManager getPersistentMemberManager() {
     return persistentMemberManager;
   }
@@ -1887,6 +2035,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           this.stopServers();
 
           stopMemcachedServer();
+          
+          stopRedisServer();
 
           // no need to track PR instances since we won't create any more
           // bridgeServers or gatewayHubs
@@ -1966,6 +2116,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           closeDiskStores();
           diskMonitor.close();
           
+          closeHDFSStores();
+          
           // Close the CqService Handle.
           try {
             if (isDebugEnabled) {
@@ -2055,6 +2207,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         } catch (CancelException e) {
           // make sure the disk stores get closed
           closeDiskStores();
+          closeHDFSStores();
           // NO DISTRIBUTED MESSAGING CAN BE DONE HERE!
 
           // okay, we're taking too long to do this stuff, so let's
@@ -2168,6 +2321,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
   
+  private void stopRedisServer() {
+    if (redisServer != null)
+      this.redisServer.shutdown();
+  }
+  
   private void stopRestAgentServer() {
     if ( this.restAgent != null) {
       logger.info(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_REST_SERVER_ON_PORT_0_IS_SHUTTING_DOWN,
@@ -2455,6 +2613,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     if (isDebugEnabled) {
       logger.debug("{}: stopping bridge servers...", this);
     }
+    boolean stoppedBridgeServer = false;
     Iterator allBridgeServersIterator = this.allBridgeServers.iterator();
     while (allBridgeServersIterator.hasNext()) {
       BridgeServerImpl bridge = (BridgeServerImpl) allBridgeServersIterator.next();
@@ -2469,6 +2628,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         }
       }
       allBridgeServers.remove(bridge);
+      stoppedBridgeServer = true;
+    }
+    if (stoppedBridgeServer) {
+      // now that all the bridge servers have stopped empty the static pool of commBuffers it might have used.
+      ServerConnection.emptyCommBufferPool();
     }
     
     // stop HA services if they had been started
@@ -2869,6 +3033,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     final boolean isPartitionedRegion = (attrs.getPartitionAttributes() == null) ? false : true;
     final boolean isReinitCreate = snapshotInputStream != null || imageTarget != null || recreate;
 
+    final String regionPath = LocalRegion.calcFullPath(name, null);
+
     try {
       for (;;) {
         getCancelCriterion().checkCancelInProgress(null);
@@ -2888,6 +3054,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
             future = (Future) this.reinitializingRegions.get(fullPath);
           }
           if (future == null) {
+            HDFSIntegrationUtil.createAndAddAsyncQueue(regionPath, attrs, this);
+            attrs = setEvictionAttributesForLargeRegion(attrs);
             if (internalRegionArgs.getInternalMetaRegion() != null) {
               rgn = internalRegionArgs.getInternalMetaRegion();
             } else if (isPartitionedRegion) {
@@ -2997,7 +3165,55 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     return rgn;
   }
 
-  public Region getRegion(String path) {
+  /**
+   * turn on eviction by default for HDFS regions
+   */
+  @SuppressWarnings("deprecation")
+  public <K, V> RegionAttributes<K, V> setEvictionAttributesForLargeRegion(
+      RegionAttributes<K, V> attrs) {
+    RegionAttributes<K, V> ra = attrs;
+    if (DISABLE_AUTO_EVICTION) {
+      return ra;
+    }
+    if (attrs.getDataPolicy().withHDFS()
+        || attrs.getHDFSStoreName() != null) {
+      // make the region overflow by default
+      EvictionAttributes evictionAttributes = attrs.getEvictionAttributes();
+      boolean hasNoEvictionAttrs = evictionAttributes == null
+          || evictionAttributes.getAlgorithm().isNone();
+      AttributesFactory<K, V> af = new AttributesFactory<K, V>(attrs);
+      String diskStoreName = attrs.getDiskStoreName();
+      // set the local persistent directory to be the same as that for
+      // HDFS store
+      if (attrs.getHDFSStoreName() != null) {
+        HDFSStoreImpl hdfsStore = findHDFSStore(attrs.getHDFSStoreName());
+        if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && hdfsStore == null) {
+          // HDFS store expected to be found at this point
+          throw new IllegalStateException(
+              LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
+                  .toLocalizedString(attrs.getHDFSStoreName()));
+        }
+        // if there is no disk store, use the one configured for hdfs queue
+        if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
+          diskStoreName = hdfsStore.getHDFSEventQueueAttributes().getDiskStoreName();
+        }
+      }
+      // set LRU heap eviction with overflow to disk for HDFS stores with
+      // local Oplog persistence
+      // set eviction attributes only if not set
+      if (hasNoEvictionAttrs) {
+        if (diskStoreName != null) {
+          af.setDiskStoreName(diskStoreName);
+        }
+        af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(
+            ObjectSizer.DEFAULT, EvictionAction.OVERFLOW_TO_DISK));
+      }
+      ra = af.create();
+    }
+    return ra;
+  }
+
+  public final Region getRegion(String path) {
     return getRegion(path, false);
   }
 
@@ -3150,6 +3366,52 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     return root.getSubregion(pathParts[1], returnDestroyedRegion);
   }
 
+  /**
+   * @param returnDestroyedRegion
+   *          if true, okay to return a destroyed partitioned region
+   */
+  public final Region getPartitionedRegion(String path, boolean returnDestroyedRegion) {
+    stopper.checkCancelInProgress(null);
+    {
+      LocalRegion result = getRegionByPath(path);
+      // Do not waitOnInitialization() for PR
+      if (result != null) {
+        if (!(result instanceof PartitionedRegion)) {
+          return null;
+        } else {
+          return result;
+        }
+      }
+    }
+ 
+    String[] pathParts = parsePath(path);
+    LocalRegion root;
+    LogWriterI18n logger = getLoggerI18n();
+    synchronized (this.rootRegions) {
+      root = (LocalRegion) this.rootRegions.get(pathParts[0]);
+      if (root == null) {
+        if (logger.fineEnabled()) {
+          logger.fine("GemFireCache.getRegion, no region found for " + pathParts[0]);
+        }
+        stopper.checkCancelInProgress(null);
+        return null;
+      }
+      if (!returnDestroyedRegion && root.isDestroyed()) {
+        stopper.checkCancelInProgress(null);
+        return null;
+      }
+    }
+    if (logger.fineEnabled()) {
+      logger.fine("GemFireCache.getPartitionedRegion, calling getSubregion on root(" + pathParts[0] + "): " + pathParts[1]);
+    }
+    Region result = root.getSubregion(pathParts[1], returnDestroyedRegion);
+    if (result != null && !(result instanceof PartitionedRegion)) {
+      return null;
+    } else {
+      return result;
+    }
+  }
+
   /** Return true if this region is initializing */
   boolean isGlobalRegionInitializing(String fullPath) {
     stopper.checkCancelInProgress(null);
@@ -3675,6 +3937,21 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
     return null;
   }
+
+  public void removeAsyncEventQueue(AsyncEventQueue asyncQueue) {
+    if (isClient()) {
+      throw new UnsupportedOperationException(
+          "operation is not supported on a client cache");
+    }
+    // first remove the gateway sender of the queue
+    if (asyncQueue instanceof AsyncEventQueueImpl) {
+      removeGatewaySender(((AsyncEventQueueImpl)asyncQueue).getSender());
+    }
+    // using gateway senders lock since async queue uses a gateway sender
+    synchronized (allGatewaySendersLock) {
+      this.allAsyncEventQueues.remove(asyncQueue);
+    }
+  }
   
   /* Cache API - get the conflict resolver for WAN */
   public GatewayConflictResolver getGatewayConflictResolver() {
@@ -4179,7 +4456,65 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       }
     }
   }
-  
+
+  /**
+   * Wait for given sender queue to flush for given timeout.
+   * 
+   * @param id
+   *          ID of GatewaySender or AsyncEventQueue
+   * @param isAsyncListener
+   *          true if this is for an AsyncEventQueue and false if for a
+   *          GatewaySender
+   * @param maxWaitTime
+   *          maximum time to wait in seconds; zero or -ve means infinite wait
+   * 
+   * @return zero if maxWaitTime was not breached, -1 if queue could not be
+   *         found or is closed, and elapsed time if timeout was breached
+   */
+  public int waitForSenderQueueFlush(String id, boolean isAsyncListener,
+      int maxWaitTime) {
+    getCancelCriterion().checkCancelInProgress(null);
+    AbstractGatewaySender gatewaySender = null;
+    if (isAsyncListener) {
+      AsyncEventQueueImpl asyncQueue = (AsyncEventQueueImpl)
+          getAsyncEventQueue(id);
+      if (asyncQueue != null) {
+        gatewaySender = (AbstractGatewaySender) asyncQueue.getSender();
+      }
+    }
+    else {
+      gatewaySender = (AbstractGatewaySender)getGatewaySender(id);
+    }
+    RegionQueue rq;
+    final long startTime = System.currentTimeMillis();
+    long elapsedTime;
+    if (maxWaitTime <= 0) {
+      maxWaitTime = Integer.MAX_VALUE;
+    }
+    while (gatewaySender != null && gatewaySender.isRunning()
+        && (rq = gatewaySender.getQueue()) != null) {
+      if (rq.size() == 0) {
+        // return zero since it was not a timeout
+        return 0;
+      }
+      try {
+        Thread.sleep(500);
+        getCancelCriterion().checkCancelInProgress(null);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        getCancelCriterion().checkCancelInProgress(ie);
+      }
+      // clear interrupted flag before retry
+      Thread.interrupted();
+      elapsedTime = System.currentTimeMillis() - startTime;
+      if (elapsedTime >= (maxWaitTime * 1000L)) {
+        // return elapsed time
+        return (int)(elapsedTime / 1000L);
+      }
+    }
+    return -1;
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   public void setQueryMonitorRequiredForResourceManager(boolean required) {
     QUERY_MONITOR_REQUIRED_FOR_RESOURCE_MANAGER = required;
@@ -4586,6 +4921,48 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         c.setRegionAttributes(pra.toString(), af.create());
         break;
       }
+      case PARTITION_HDFS: {
+    	  AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(false);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_REDUNDANT_HDFS: {
+    	  AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(1);
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(false);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_WRITEONLY_HDFS_STORE: {
+        AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(true);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
+      case PARTITION_REDUNDANT_WRITEONLY_HDFS_STORE: {
+        AttributesFactory af = new AttributesFactory();
+          af.setDataPolicy(DataPolicy.HDFS_PARTITION);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          paf.setRedundantCopies(1);
+          af.setPartitionAttributes(paf.create());
+          af.setEvictionAttributes(EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.OVERFLOW_TO_DISK));
+          af.setHDFSWriteOnly(true);
+          c.setRegionAttributes(pra.toString(), af.create());
+          break;
+        }
       default:
         throw new IllegalStateException("unhandled enum " + pra);
       }
@@ -4979,10 +5356,53 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
   
+  @Override
+  public HDFSStoreFactory createHDFSStoreFactory() {
+    // TODO Auto-generated method stub
+    return new HDFSStoreFactoryImpl(this);
+  }
+  
+  public HDFSStoreFactory createHDFSStoreFactory(HDFSStoreCreation creation) {
+    return new HDFSStoreFactoryImpl(this, creation);
+  }
+  public void addHDFSStore(HDFSStoreImpl hsi) {
+    HDFSStoreDirector.getInstance().addHDFSStore(hsi);
+    //TODO:HDFS Add a resource event for hdfs store creation as well 
+    // like the following disk store event
+    //system.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi);
+  }
+
+  public void removeHDFSStore(HDFSStoreImpl hsi) {
+    //hsi.destroy();
+    HDFSStoreDirector.getInstance().removeHDFSStore(hsi.getName());
+    //TODO:HDFS Add a resource event for hdfs store as well 
+    // like the following disk store event
+    //system.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, dsi);
+  }
+
+  public void closeHDFSStores() {
+    HDFSRegionDirector.reset();
+    HDFSStoreDirector.getInstance().closeHDFSStores();
+  }
+
+  public HDFSStoreImpl findHDFSStore(String name) {
+    return HDFSStoreDirector.getInstance().getHDFSStore(name);
+  }
+
+  public Collection<HDFSStoreImpl> getHDFSStores() {
+    return HDFSStoreDirector.getInstance().getAllHDFSStores();
+  }
+
   public TemporaryResultSetFactory getResultSetFactory() {
     return this.resultSetFactory;
   }
-  
+
+  public MemoryAllocator getOffHeapStore() {
+    return this.getSystem().getOffHeapStore();
+  }
+
   public DiskStoreMonitor getDiskStoreMonitor() {
     return diskMonitor;
   }
@@ -5003,4 +5423,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   public CqService getCqService() {
     return this.cqService;
   }
+  
+  /**
+   * Gets a reference to the LuceneService singleton.
+   * @return the LuceneService
+   */
+  public LuceneService getLuceneService() {
+    return this.luceneService;
+  }
 }
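
The add/remove/find/get methods above keep no store state in the cache
itself; they all delegate to the HDFSStoreDirector singleton, keyed by store
name. A self-contained sketch of that registry pattern (StoreDirector and
Store here are illustrative stand-ins, not the actual Geode classes):

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Minimal name-keyed singleton registry, analogous to the director
    // the cache delegates to above.
    final class StoreDirector {
      interface Store { String getName(); }

      private static final StoreDirector INSTANCE = new StoreDirector();
      private final ConcurrentMap<String, Store> stores = new ConcurrentHashMap<>();

      static StoreDirector getInstance() { return INSTANCE; }

      void addStore(Store s) { stores.put(s.getName(), s); }
      void removeStore(String name) { stores.remove(name); }
      Store getStore(String name) { return stores.get(name); }
      Collection<Store> getAllStores() { return stores.values(); }
    }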

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
index a5fc250..5195614 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HARegion.java
@@ -221,8 +221,9 @@ public final class HARegion extends DistributedRegion
       throws TimeoutException, CacheWriterException {
     checkReadiness();
 
-    EntryEventImpl event = new EntryEventImpl(this, Operation.UPDATE, key,
+    EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
         value, aCallbackArgument, false, getMyId());
+    try {
 
     Object oldValue = null;
 
@@ -234,6 +235,9 @@ public final class HARegion extends DistributedRegion
       oldValue = event.getOldValue();
     }
     return handleNotAvailable(oldValue);
+    } finally {
+      event.release();
+    }
   }
 
   /**
@@ -360,13 +364,13 @@ public final class HARegion extends DistributedRegion
   
   /**
    * @return the deserialized value
-   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean)
+   * @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean)
    *      
    */
   @Override
   protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
       TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
-      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones)
+      boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
     throws CacheLoaderException, TimeoutException  {
 
     Object value = null;
@@ -397,10 +401,14 @@ public final class HARegion extends DistributedRegion
             op = Operation.LOCAL_LOAD_UPDATE;
           }
 
-          EntryEventImpl event = new EntryEventImpl(
+          EntryEventImpl event = EntryEventImpl.create(
               this, op, key, value,
               aCallbackArgument, false, getMyId(), generateCallbacks);
+          try {
           re = basicPutEntry(event, 0L);
+          } finally {
+            event.release();
+          }
           if (txState == null) {
           }
         }
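
Note the shape shared by this hunk and the other call sites in this commit:
the event comes from EntryEventImpl.create(...) and release() is guaranteed
in a finally block, so anything the event references (in particular any
off-heap value) is freed even on an exception path. A self-contained
analogue of that discipline (EventPattern, Releasable, and doWork are
illustrative names, not Geode API):

    // The create()/try/finally/release() shape used throughout this diff.
    final class EventPattern {
      static final class Releasable {
        static Releasable create() { return new Releasable(); }
        void release() { /* free off-heap references here */ }
      }

      Object doWork() {
        Releasable event = Releasable.create();
        try {
          return new Object(); // operate on the event
        } finally {
          event.release();     // runs even when the try block throws
        }
      }
    }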

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
new file mode 100644
index 0000000..d0411f8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSLRURegionMap.java
@@ -0,0 +1,103 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.lru.NewLRUClockHand;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS and adds LRU behavior.
+ * 
+ * @author sbawaska
+ */
+public class HDFSLRURegionMap extends AbstractLRURegionMap implements HDFSRegionMap {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final HDFSRegionMapDelegate delegate;
+
+  /**
+   *  A tool from the eviction controller for sizing entries and
+   *  expressing limits.
+   */
+  private EnableLRU ccHelper;
+
+  /**  The list of nodes in LRU order */
+  private NewLRUClockHand lruList;
+
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
+  public HDFSLRURegionMap(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs) {
+    super(internalRegionArgs);
+    assert owner instanceof BucketRegion;
+    initialize(owner, attrs, internalRegionArgs);
+    this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+  }
+
+  @Override
+  public RegionEntry getEntry(Object key) {
+    return delegate.getEntry(key, null);
+  }
+
+  @Override
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    return delegate.getEntry(event);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    return delegate.regionEntries();
+  }
+    
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+    
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  protected void _setCCHelper(EnableLRU ccHelper) {
+    this.ccHelper = ccHelper;
+  }
+
+  @Override
+  protected EnableLRU _getCCHelper() {
+    return this.ccHelper;
+  }
+
+  @Override
+  protected void _setLruList(NewLRUClockHand lruList) {
+    this.lruList = lruList;
+  }
+
+  @Override
+  protected NewLRUClockHand _getLruList() {
+    return this.lruList;
+  }
+
+  @Override
+  public HDFSRegionMapDelegate getDelegate() {
+    return this.delegate;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
new file mode 100644
index 0000000..7714b0b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMap.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.internal.cache;
+
+/**
+ * Interface implemented by RegionMap implementations that
+ * read from HDFS.
+ * 
+ * @author sbawaska
+ *
+ */
+public interface HDFSRegionMap {
+
+  /**
+   * @return the {@link HDFSRegionMapDelegate} that does
+   * all the work
+   */
+  public HDFSRegionMapDelegate getDelegate();
+}
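
Callers holding either concrete map can reach the shared HDFS logic through
this interface; for example (regionMap is an assumed reference to an
HDFSRegionMapImpl or HDFSLRURegionMap instance):

    // Both implementations answer size()/getEntry() via the same delegate.
    HDFSRegionMapDelegate delegate = ((HDFSRegionMap) regionMap).getDelegate();
    int size = delegate.size(); // includes HDFS results when the region is configured to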

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
new file mode 100644
index 0000000..7bdc849
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapDelegate.java
@@ -0,0 +1,532 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEntriesSet.HDFSIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSGatewayEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSParallelGatewaySenderQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.RegionMap.Attributes;
+import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
+import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.concurrent.FutureResult;
+
+/**
+ * This class encapsulates all of the HDFS-related RegionMap functionality
+ * so that it can be shared by the HDFSRegionMap implementations
+ * (HDFSRegionMapImpl and HDFSLRURegionMap).
+ * 
+ * @author sbawaska
+ */
+public class HDFSRegionMapDelegate {
+
+  private static final Logger logger = LogService.getLogger();
+
+  private final BucketRegion owner;
+
+  private ConcurrentParallelGatewaySenderQueue hdfsQueue;
+
+  private final RegionMap backingRM;
+
+  /** queue of dead iterators */
+  private final ReferenceQueue<HDFSIterator> refs;
+  
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+  
+  /**
+   * used for serializing fetches from HDFS
+   */
+  private ConcurrentMap<Object, FutureResult> futures = new ConcurrentHashMap<Object, FutureResult>();
+
+  public HDFSRegionMapDelegate(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs, RegionMap backingRM) {
+    assert owner instanceof BucketRegion;
+    this.owner = (BucketRegion) owner;
+    this.backingRM = backingRM;
+    refs = new ReferenceQueue<HDFSEntriesSet.HDFSIterator>();
+  }
+
+  public RegionEntry getEntry(Object key, EntryEventImpl event) {
+    
+    RegionEntry re = getEntry(key, event, true);
+    // get from tx should put the entry back in map
+    // it should be evicted once tx completes
+    // MergeGemXDHDFSToGFE: txstate handling does not apply here, so the
+    // following block is disabled:
+    /* if (re != null && getTXState(event) != null) {
+    if (re != null) {
+      // put the region entry in backing CHM of AbstractRegionMap so that
+      // it can be locked in basicPut/destroy
+      RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+      if (oldRe != null) {
+        if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+          ((OffHeapRegionEntry)re).release();
+        }
+        return oldRe;
+      }
+      re.setMarkedForEviction();
+      owner.updateSizeOnCreate(key,
+          owner.calculateRegionEntryValueSize(re));
+      ((AbstractRegionMap)backingRM).incEntryCount(1);
+      ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+    }*/
+    return re;
+  }
+
+  /*
+  private TXStateInterface getTXState(EntryEventImpl event) {
+    return event != null ? event.getTXState(this.owner) : this.owner
+        .getTXState();
+  }*/
+
+  /**
+   * Returns the entry for the given key, checking the in-VM map first and
+   * then, if this bucket is primary, the HDFS queue and HDFS itself.
+   * @param key the key to look up
+   * @param event the triggering event, or null if there is none
+   * @param forceOnHeap if true, returns the heap version of off-heap region entries
+   * @return the region entry, or null if the key is not found
+   */
+  private RegionEntry getEntry(Object key, EntryEventImpl event, boolean forceOnHeap) {
+    closeDeadIterators();
+    
+    RegionEntry re = backingRM.getEntryInVM(key);
+    if (logger.isTraceEnabled() || DEBUG) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: Found the key in CHM: " + key
+          + " ,value=" + (re == null? "null" : "[" + re._getValue() + " or (" + re.getValueAsToken() + ")]")));
+    }
+    if ((re == null || (re.isRemoved() && !re.isTombstone()))
+        && owner.getBucketAdvisor().isPrimary() && allowReadFromHDFS()) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: fetching from hdfs key:" + key));
+      }
+      try {
+        this.owner.getPartitionedRegion().hdfsCalled(key);
+        re = getEntryFromFuture(key);
+        if (re != null) {
+          return re;
+        }
+
+        assert this.owner.getPartitionedRegion().getDataPolicy()
+            .withHDFS();
+        byte[] k = EntryEventImpl.serialize(key);
+      
+        // for destroy ops we will retain the entry in the region map so
+        // tombstones can be tracked
+        //final boolean forceOnHeap = (event==null || !event.getOperation().isDestroy());
+        
+        // get from queue
+        re = getFromHDFSQueue(key, k, forceOnHeap);
+        if (re == null) {
+          // get from HDFS
+          re = getFromHDFS(key, k, forceOnHeap);
+        }
+        if (re != null && re.isTombstone()) {
+          RegionVersionVector vector = this.owner.getVersionVector();
+          if (vector == null
+              || vector.isTombstoneTooOld(re.getVersionStamp().getMemberID(),
+                                          re.getVersionStamp().getRegionVersion())) {
+            re = null;
+          }
+        }
+        if (logger.isTraceEnabled() || DEBUG) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: returning from hdfs re:" + re));
+        }
+      } catch (ForceReattemptException e) {
+        throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+      } catch (IOException e) {
+        throw new HDFSIOException("Error reading from HDFS", e);
+      } finally {
+        notifyFuture(key, re);
+        // If we mark it here, the table scan may miss it causing updates/delete using table
+        // scan to fail.
+//        if (re != null) {
+//          re.setMarkedForEviction();
+//        }
+        if (re != null && event != null && !re.isTombstone()) {
+          if (logger.isTraceEnabled() || DEBUG) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: loaded from hdfs re:" + re));
+          }
+          event.setLoadedFromHDFS(true);
+        }
+      }
+    }
+    if (re != null && re.isMarkedForEviction() && !re.isTombstone()
+        && event != null) {
+      event.setLoadedFromHDFS(true);
+    }
+
+    return re;
+  }
+
+  /**
+   * Returns true if the RegionEntry should be read from HDFS. Fixes #49101
+   * by disallowing reads from HDFS for persistent regions that define
+   * neither custom eviction attributes nor a LOCAL_DESTROY eviction action.
+   * Non-persistent regions always read from HDFS, since the read is needed
+   * for constraint checks even without eviction criteria.
+   *
+   * @return true if the RegionEntry should be read from HDFS
+   */
+  private boolean allowReadFromHDFS() {
+    if (!owner.getDataPolicy().withPersistence()
+        || owner.getCustomEvictionAttributes() != null
+        || isEvictionActionLocalDestroy()){
+        /**MergeGemXDHDFSToGFE this is used for global index. Hence not required here*/ 
+        //|| owner.isUsedForIndex()) {
+      // when region does not have persistence, we have to read from HDFS (even
+      // though there is no eviction criteria) for constraint checks
+      return true;
+    }
+    return false;
+  }
+
+  private boolean isEvictionActionLocalDestroy() {
+    PartitionedRegion pr = owner.getPartitionedRegion();
+    if (pr.getEvictionAttributes() != null) {
+      return pr.getEvictionAttributes().getAction() == EvictionAction.LOCAL_DESTROY;
+    }
+    return false;
+  }
+
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    RegionEntry re = getEntry(event.getKey(), event, false);
+    if (re != null && event.isLoadedFromHDFS()) {
+      // put the region entry in backing CHM of AbstractRegionMap so that
+      // it can be locked in basicPut/destroy
+      RegionEntry oldRe = backingRM.putEntryIfAbsent(event.getKey(), re);
+      if (oldRe != null) {
+        if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+          ((OffHeapRegionEntry) re).release();
+        }
+        return oldRe;
+      }
+      // since the entry is faulted in from HDFS, it must have
+      // satisfied the eviction criteria in the past, so mark it for eviction
+      re.setMarkedForEviction();
+
+      owner.updateSizeOnCreate(event.getKey(), owner.calculateRegionEntryValueSize(re));
+      ((AbstractRegionMap) backingRM).incEntryCount(1);
+      ((AbstractRegionMap) backingRM).lruEntryCreate(re);
+    }
+    return re;
+  }
+
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #regionEntries"));
+      }
+      return backingRM.regionEntriesInVM();
+    }
+
+    try {
+      return createEntriesSet(IteratorType.ENTRIES);
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+    
+  public int size() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #size"));
+      }
+      return backingRM.sizeInVM();
+    }
+
+    try {
+      return createEntriesSet(IteratorType.KEYS).size();
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+    
+  public boolean isEmpty() {
+    closeDeadIterators();
+    if (!owner.getPartitionedRegion().includeHDFSResults()) {
+      if (logger.isDebugEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Ignoring HDFS results for #isEmpty"));
+      }
+      return backingRM.sizeInVM() == 0;
+    }
+
+    try {
+      return createEntriesSet(IteratorType.KEYS).isEmpty();
+    } catch (ForceReattemptException e) {
+      throw new PrimaryBucketException(e.getLocalizedMessage(), e);
+    }
+  }
+  
+  private void notifyFuture(Object key, RegionEntry re) {
+    FutureResult future = this.futures.remove(key);
+    if (future != null) {
+      future.set(re);
+    }
+  }
+
+  private RegionEntry getEntryFromFuture(Object key) {
+    FutureResult future = new FutureResult(this.owner.getCancelCriterion());
+    FutureResult old = this.futures.putIfAbsent(key, future);
+    if (old != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: waiting for concurrent fetch to complete for key:" + key));
+      }
+      try {
+        return (RegionEntry) old.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        this.owner.getCache().getCancelCriterion().checkCancelInProgress(null);
+      }
+    }
+    return null;
+  }
+
+  private RegionEntry getFromHDFS(Object key, byte[] k, boolean forceOnHeap) throws IOException, ForceReattemptException {
+    SortedHoplogPersistedEvent ev;
+    try {
+      ev = (SortedHoplogPersistedEvent) owner.getHoplogOrganizer().read(k);
+    } catch (IOException e) {
+      owner.checkForPrimary();
+      throw e;
+    }
+    if (ev != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs ev:" + ev));
+      }
+      return getEntryFromEvent(key, ev, forceOnHeap, false);
+    }
+    return null;
+  }
+
+  /**
+   * Sets the version tag on the newly faulted-in entry.
+   */
+  private void setVersionTag(RegionEntry re, VersionTag versionTag) {
+    if (owner.concurrencyChecksEnabled) {
+      versionTag.setMemberID(
+            owner.getVersionVector().getCanonicalId(versionTag.getMemberID()));
+      VersionStamp versionedRe = (VersionStamp) re;
+      versionedRe.setVersions(versionTag);
+    }
+  }
+
+  private RegionEntry getFromHDFSQueue(Object key, byte[] k, boolean forceOnHeap) throws ForceReattemptException {
+    ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+    if (q == null) return null;
+    HDFSGatewayEventImpl hdfsGatewayEvent = (HDFSGatewayEventImpl) q.get(owner.getPartitionedRegion(), k, owner.getId());
+    if (hdfsGatewayEvent != null) {
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "HDFS: got from hdfs queue: " + hdfsGatewayEvent));
+      }
+      return getEntryFromEvent(key, hdfsGatewayEvent, forceOnHeap, false);
+    }
+    return null;
+  }
+
+  private ConcurrentParallelGatewaySenderQueue getHDFSQueue()
+      throws ForceReattemptException {
+    if (this.hdfsQueue == null) {
+      String asyncQId = this.owner.getPartitionedRegion().getHDFSEventQueueName();
+      final AsyncEventQueueImpl asyncQ =  (AsyncEventQueueImpl)this.owner.getCache().getAsyncEventQueue(asyncQId);
+      final AbstractGatewaySender gatewaySender = (AbstractGatewaySender)asyncQ.getSender();
+      AbstractGatewaySenderEventProcessor ep = gatewaySender.getEventProcessor();
+      if (ep == null) return null;
+      hdfsQueue = (ConcurrentParallelGatewaySenderQueue)ep.getQueue();
+    }
+    
+    // Check whether the queue has become primary here.
+    // There could be some time between bucket becoming a primary
+    // and underlying queue becoming a primary, so isPrimaryWithWait()
+    // waits for some time for the queue to become a primary on this member
+    final HDFSBucketRegionQueue brq = hdfsQueue.getBucketRegionQueue(
+        this.owner.getPartitionedRegion(), this.owner.getId());
+    if (brq != null) {
+      if (owner.getBucketAdvisor().isPrimary()
+          && !brq.getBucketAdvisor().isPrimaryWithWait()) {
+        InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+            .basicGetPrimaryMember();
+        throw new PrimaryBucketException("Bucket " + brq.getName()
+            + " is not primary. Current primary holder is " + primaryHolder);
+      }
+    }
+      
+    return hdfsQueue;
+  }
+
+  public RegionEntry getEntryFromEvent(Object key, HDFSGatewayEventImpl event, boolean forceOnHeap, boolean forUpdate) {
+    Object val;
+    if (event.getOperation().isDestroy()) {
+      val = Token.TOMBSTONE;
+    } else if (event.getOperation().isInvalidate()) {
+      val = Token.INVALID;
+    } else {
+      val = event.getValue();
+    }
+    RegionEntry re = null;
+    final TXStateInterface tx = owner.getTXState();
+    if (tx == null) {
+      re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+      return re;
+    }
+    else if (val != null) {
+      if (((re = this.backingRM.getEntryInVM(key)) == null)
+          || (re.isRemoved() && !re.isTombstone())) {
+        boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); 
+        re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+        if (forUpdate) {
+          if (re != null) { // tx is non-null on this branch
+            // put the region entry in backing CHM of AbstractRegionMap so that
+            // it can be locked in basicPut/destroy
+            RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+            if (oldRe != null) {
+              if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+                ((OffHeapRegionEntry)re).release();
+              }
+              return oldRe;
+            }
+            re.setMarkedForEviction();
+            owner.updateSizeOnCreate(key,
+                owner.calculateRegionEntryValueSize(re));
+            ((AbstractRegionMap)backingRM).incEntryCount(1);
+            ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+          }
+        }
+      }
+    }
+    return re;
+  }
+
+  public RegionEntry getEntryFromEvent(Object key, SortedHoplogPersistedEvent event, boolean forceOnHeap, boolean forUpdate) {
+    Object val = getValueFromEvent(event);
+    RegionEntry re = null;
+    final TXStateInterface tx = owner.getTXState();
+    if (tx == null) {
+      re = createRegionEntry(key, val, event.getVersionTag(), forceOnHeap);
+      return re;
+    }
+    else if (val != null) { // for the TX case, create an off-heap entry if required
+      if (((re = this.backingRM.getEntryInVM(key)) == null)
+          || (re.isRemoved() && !re.isTombstone())) {
+        boolean shouldCreateOnHeapEntry = !(owner.getOffHeap() && forUpdate); 
+        re = createRegionEntry(key, val, event.getVersionTag(), shouldCreateOnHeapEntry);
+        if (forUpdate) {
+          if (re != null) { // tx is non-null on this branch
+            // put the region entry in backing CHM of AbstractRegionMap so that
+            // it can be locked in basicPut/destroy
+            RegionEntry oldRe = backingRM.putEntryIfAbsent(key, re);
+            if (oldRe != null) {
+              if (re instanceof OffHeapRegionEntry && !oldRe.equals(re)) {
+                ((OffHeapRegionEntry)re).release();
+              }
+              return oldRe;
+            }
+            re.setMarkedForEviction();
+            owner.updateSizeOnCreate(key,
+                owner.calculateRegionEntryValueSize(re));
+            ((AbstractRegionMap)backingRM).incEntryCount(1);
+            ((AbstractRegionMap)backingRM).lruEntryCreate(re);
+          }
+        }
+      }
+    }
+    return re;
+  }
+
+  private RegionEntry createRegionEntry(Object key, Object value, VersionTag tag, boolean forceOnHeap) {
+    RegionEntryFactory ref = backingRM.getEntryFactory();
+    if (forceOnHeap) {
+      ref = ref.makeOnHeap();
+    }
+    value = getValueDuringGII(key, value);
+    RegionEntry re = ref.createEntry(this.owner, key, value);
+    setVersionTag(re, tag);
+    if (re instanceof LRUEntry) {
+      assert backingRM instanceof AbstractLRURegionMap;
+      EnableLRU ccHelper = ((AbstractLRURegionMap)backingRM)._getCCHelper();
+      ((LRUEntry)re).updateEntrySize(ccHelper);
+    }
+    return re;
+  }
+
+  private Object getValueDuringGII(Object key, Object value) {
+    if (owner.getIndexUpdater() != null && !owner.isInitialized()) {
+      return AbstractRegionMap.listOfDeltasCreator.newValue(key, owner, value,
+          null);
+    }
+    return value;
+  }
+
+  private Set createEntriesSet(IteratorType type)
+      throws ForceReattemptException {
+    ConcurrentParallelGatewaySenderQueue q = getHDFSQueue();
+    if (q == null) return Collections.emptySet();
+    HDFSBucketRegionQueue brq = q.getBucketRegionQueue(this.owner.getPartitionedRegion(), owner.getId());
+    return new HDFSEntriesSet(owner, brq, owner.getHoplogOrganizer(), type, refs);
+  }
+
+  private void closeDeadIterators() {
+    Reference<? extends HDFSIterator> weak;
+    while ((weak = refs.poll()) != null) {
+      HDFSIterator iter = weak.get();
+      if (logger.isTraceEnabled() || DEBUG) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Closing weak ref for iterator "
+            + iter));
+      }
+      if (iter != null) { // the referent may already have been cleared by GC
+        iter.close();
+      }
+    }
+  }
+
+  /**
+   * Gets the value from the event, deserializing if necessary.
+   */
+  private Object getValueFromEvent(PersistedEventImpl ev) {
+    if (ev.getOperation().isDestroy()) {
+      return Token.TOMBSTONE;
+    } else if (ev.getOperation().isInvalidate()) {
+      return Token.INVALID;
+    }
+    return ev.getValue();
+  }
+}
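
The futures map above (see getEntryFromFuture and notifyFuture) serializes
concurrent HDFS fetches of the same key: the first thread installs a future,
later threads block on it, and the finally block around the fetch publishes
the result, null included. A self-contained analogue of that pattern
(FetchDeduper and slowRead are illustrative names):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.function.Function;

    // One in-flight fetch per key: later callers wait on the first
    // caller's future instead of issuing a duplicate slow read.
    final class FetchDeduper<K, V> {
      private final ConcurrentMap<K, CompletableFuture<V>> futures =
          new ConcurrentHashMap<>();

      V fetch(K key, Function<K, V> slowRead) {
        CompletableFuture<V> mine = new CompletableFuture<>();
        CompletableFuture<V> existing = futures.putIfAbsent(key, mine);
        if (existing != null) {
          return existing.join(); // another thread is already fetching this key
        }
        V result = null;
        try {
          result = slowRead.apply(key);
          return result;
        } finally {
          futures.remove(key);  // mirror of notifyFuture(key, re)
          mine.complete(result);
        }
      }
    }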

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
new file mode 100644
index 0000000..7c5846c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/HDFSRegionMapImpl.java
@@ -0,0 +1,66 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.internal.size.SingleObjectSizer;
+
+/**
+ * Implementation of RegionMap that reads data from HDFS.
+ * 
+ * @author sbawaska
+ */
+public class HDFSRegionMapImpl extends AbstractRegionMap implements HDFSRegionMap {
+
+  private final HDFSRegionMapDelegate delegate;
+
+  private static final boolean DEBUG = Boolean.getBoolean("hdfsRegionMap.DEBUG");
+
+  public HDFSRegionMapImpl(LocalRegion owner, Attributes attrs,
+      InternalRegionArguments internalRegionArgs) {
+    super(internalRegionArgs);
+    assert owner instanceof BucketRegion;
+    initialize(owner, attrs, internalRegionArgs, false);
+    this.delegate = new HDFSRegionMapDelegate(owner, attrs, internalRegionArgs, this);
+  }
+
+  @Override
+  public RegionEntry getEntry(Object key) {
+    return delegate.getEntry(key, null);
+  }
+
+  @Override
+  protected RegionEntry getEntry(EntryEventImpl event) {
+    return delegate.getEntry(event);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection<RegionEntry> regionEntries() {
+    return delegate.regionEntries();
+  }
+    
+  @Override
+  public int size() {
+    return delegate.size();
+  }
+    
+  @Override
+  public boolean isEmpty() {
+    return delegate.isEmpty();
+  }
+
+  @Override
+  public HDFSRegionMapDelegate getDelegate() {
+    return this.delegate;
+  }
+
+}

