geode-commits mailing list archives

From dschnei...@apache.org
Subject [48/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/ResourceManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/ResourceManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/ResourceManager.java
index d7ec83c..8627db9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/ResourceManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/control/ResourceManager.java
@@ -16,7 +16,6 @@ import com.gemstone.gemfire.cache.LowMemoryException;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.query.QueryService;
-import com.gemstone.gemfire.cache.server.CacheServer;
 
 /**
  * Provides support for managing resources used by the local
@@ -42,7 +41,7 @@ public interface ResourceManager {
    * @see ResourceManager#setCriticalHeapPercentage(float)
    * @see ResourceManager#getCriticalHeapPercentage()
    */
-  public static final float DEFAULT_CRITICAL_HEAP_PERCENTAGE = 0.0f;
+  public static final float DEFAULT_CRITICAL_PERCENTAGE = 0.0f;
 
   /**
    * The default percent of heap memory at which the VM should begin evicting
@@ -55,8 +54,8 @@ public interface ResourceManager {
    * @see ResourceManager#setEvictionHeapPercentage(float)
    * @see ResourceManager#getEvictionHeapPercentage()
    */
-  public static final float DEFAULT_EVICTION_HEAP_PERCENTAGE = 0.0f;
-  
+  public static final float DEFAULT_EVICTION_PERCENTAGE = 0.0f;
+
   /**
    * Creates a factory for defining and starting {@link RebalanceOperation
    * RebalanceOperations}.
@@ -125,7 +124,7 @@ public interface ResourceManager {
    * @since 6.0
    */
   public void setCriticalHeapPercentage(float heapPercentage);
-
+  
   /**
    * Get the percentage of heap at or above which the cache is considered in
    * danger of becoming inoperable.
@@ -136,6 +135,48 @@ public interface ResourceManager {
    * @since 6.0
    */
   public float getCriticalHeapPercentage();
+  
+  /**
+   * Set the percentage of off-heap at or above which the cache is considered in
+   * danger of becoming inoperable due to out of memory exceptions.
+   *
+   * <p>
+   * Changing this value can cause {@link LowMemoryException} to be thrown from
+   * the following {@link Cache} operations:
+   * <ul>
+   * <li>{@link Region#put(Object, Object)}
+   * <li>{@link Region#put(Object, Object, Object)}
+   * <li>{@link Region#create(Object, Object)}
+   * <li>{@link Region#create(Object, Object, Object)}
+   * <li>{@link Region#putAll(java.util.Map)}
+   * <li>{@linkplain QueryService#createIndex(String, com.gemstone.gemfire.cache.query.IndexType, String, String) index creation}
+   * <li>Execution of {@link Function}s whose {@link Function#optimizeForWrite()} returns true.
+   * </ul>
+   *
+   * <p>
+   * Only one change to this attribute or the eviction off-heap percentage will be
+   * allowed at any given time and its effect will be fully realized before the
+   * next change is allowed.
+   *
+   * @param offHeapPercentage a percentage of the maximum off-heap memory available
+   * @throws IllegalStateException if the offHeapPercentage value is not >= 0 or
+   * <= 100 or when less than the current eviction off-heap percentage
+   * @see #getCriticalOffHeapPercentage()
+   * @see #getEvictionOffHeapPercentage()
+   * @since 9.0
+   */
+  public void setCriticalOffHeapPercentage(float offHeapPercentage);
+  
+  /**
+   * Get the percentage of off-heap at or above which the cache is considered in
+   * danger of becoming inoperable.
+   *
+   * @return either the current or recently used percentage of the maximum
+   * off-heap memory
+   * @see #setCriticalOffHeapPercentage(float)
+   * @since 9.0
+   */
+  public float getCriticalOffHeapPercentage();
 
   /**
    * Set the percentage of heap at or above which the eviction should begin on
@@ -161,7 +202,7 @@ public interface ResourceManager {
    * @since 6.0
    */
   public void setEvictionHeapPercentage(float heapPercentage);
-
+  
   /**
    * Get the percentage of heap at or above which the eviction should begin on
    * Regions configured for {@linkplain 
@@ -173,4 +214,38 @@ public interface ResourceManager {
    * @since 6.0
    */
   public float getEvictionHeapPercentage();
+
+  /**
+   * Set the percentage of off-heap at or above which the eviction should begin on
+   * Regions configured for {@linkplain 
+   * EvictionAttributes#createLRUHeapAttributes() HeapLRU eviction}.
+   *
+   * <p>
+   * Changing this value may cause eviction to begin immediately.
+   *
+   * <p>
+   * Only one change to this attribute or critical off-heap percentage will be
+   * allowed at any given time and its effect will be fully realized before the
+   * next change is allowed.
+   * 
+   * @param offHeapPercentage a percentage of the maximum off-heap memory available
+   * @throws IllegalStateException if the offHeapPercentage value is not >= 0 or 
+   * <= 100 or when greater than the current critical off-heap percentage.
+   * @see #getEvictionOffHeapPercentage()
+   * @see #getCriticalOffHeapPercentage()
+   * @since 9.0
+   */
+  public void setEvictionOffHeapPercentage(float offHeapPercentage);
+  
+  /**
+   * Get the percentage of off-heap at or above which the eviction should begin on
+   * Regions configured for {@linkplain 
+   * EvictionAttributes#createLRUHeapAttributes() HeapLRU eviction}.
+   *
+   * @return either the current or recently used percentage of the maximum 
+   * off-heap memory
+   * @see #setEvictionOffHeapPercentage(float)
+   * @since 9.0
+   */
+  public float getEvictionOffHeapPercentage();
 }
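
A minimal usage sketch of the new off-heap thresholds added above (the heap variants work the same way). The cache construction and the percentage values are illustrative and not part of this commit; the only assumption is the standard GemFire bootstrap via CacheFactory:

    Cache cache = new CacheFactory().create();
    ResourceManager rm = cache.getResourceManager();
    // Set the critical threshold first so the eviction threshold can stay below it.
    rm.setCriticalOffHeapPercentage(90.0f);  // operations may throw LowMemoryException above 90% of off-heap
    rm.setEvictionOffHeapPercentage(75.0f);  // HeapLRU regions begin evicting above 75% of off-heap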

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributes.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributes.java
new file mode 100644
index 0000000..ef7e863
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributes.java
@@ -0,0 +1,72 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+/**
+ * {@link HDFSEventQueueAttributes} represents the attributes of the buffer where events are 
+ * accumulated before they are persisted to HDFS  
+ * 
+ * @author Hemant Bhanawat
+ */
+public interface HDFSEventQueueAttributes {
+
+  /**
+   * The Disk store that is required for overflow and persistence
+   * @return    String
+   */
+  public String getDiskStoreName();
+
+  /**
+   * The maximum memory after which the data needs to be overflowed to disk
+   * @return    int
+   */
+  public int getMaximumQueueMemory();
+
+  /**
+   * Represents the size of a batch per bucket that gets delivered
+   * from the HDFS Event Queue to HDFS. A higher value means that
+   * fewer but larger batches are persisted to HDFS, and hence larger
+   * files are created on HDFS. However, larger batches consume more memory.
+   *
+   * This value is a hint. Batches per bucket smaller than the specified size
+   * are still sent to HDFS once the interval specified by
+   * {@link #getBatchTimeInterval()} has elapsed.
+   * @return    int
+   */
+  public int getBatchSizeMB();
+  
+  /**
+   * Represents the batch time interval for an HDFS queue. This is the maximum time interval
+   * that can elapse before a batch of data from a bucket is sent to HDFS.
+   *
+   * @return  int
+   */
+  public int getBatchTimeInterval();
+  
+  /**
+   * Represents whether the HDFS Event Queue is configured to be persistent or non-persistent.
+   * @return    boolean
+   */
+  public boolean isPersistent();
+
+  /**
+   * Represents whether or not the writing to the disk is synchronous.
+   * 
+   * @return boolean 
+   */
+  public boolean isDiskSynchronous();
+  
+  /**
+   * Number of threads in the VM consuming the events.
+   * Default is one.
+   * 
+   * @return int
+   */
+  public int getDispatcherThreads();
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributesFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributesFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributesFactory.java
new file mode 100644
index 0000000..fc09b7a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSEventQueueAttributesFactory.java
@@ -0,0 +1,160 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSEventQueueAttributesImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Factory to create {@link HDFSEventQueueAttributes} . 
+ * {@link HDFSEventQueueAttributes} represents the attributes of the buffer where events are 
+ * accumulated before they are persisted to HDFS  
+ * 
+ * @author Hemant Bhanawat
+ */
+public class HDFSEventQueueAttributesFactory {
+
+  /**
+   * The default batch size
+   */
+  public static final int DEFAULT_BATCH_SIZE_MB = 32;
+
+  /**
+   * The default batch time interval in milliseconds
+   */
+  public static final int DEFAULT_BATCH_TIME_INTERVAL_MILLIS = 60000;
+
+  /**
+   * By default, the queue is created for a read/write HDFS store
+   */
+  public static final boolean DEFAULT_FOR_WRITEONLY_HDFSSTORE = false;
+  
+  public HDFSEventQueueAttributesFactory() {
+  }
+
+  /**
+   * Copy constructor for {@link HDFSEventQueueAttributes}. The method creates
+   * an instance with the same attribute values as {@code attr}
+   * 
+   * @param attr
+   */
+  public HDFSEventQueueAttributesFactory(HDFSEventQueueAttributes attr) {
+    setDiskStoreName(attr.getDiskStoreName());
+    setMaximumQueueMemory(attr.getMaximumQueueMemory());
+    setBatchTimeInterval(attr.getBatchTimeInterval());
+    setBatchSizeMB(attr.getBatchSizeMB());
+    setPersistent(attr.isPersistent());
+    setDiskSynchronous(attr.isDiskSynchronous());
+    setDispatcherThreads(attr.getDispatcherThreads());
+  }
+  
+  /**
+   * Sets the disk store name for overflow or persistence.
+   * 
+   * @param name
+   */
+  public HDFSEventQueueAttributesFactory setDiskStoreName(String name) {
+    this.diskStoreName = name;
+    return this;
+  }
+  
+  /**
+   * Sets the maximum amount of memory (in MB) for an
+   * HDFS Event Queue.
+   * 
+   * @param memory
+   *          The maximum amount of memory (in MB) for an
+   *          HDFS Event Queue
+   */
+  public HDFSEventQueueAttributesFactory setMaximumQueueMemory(int memory) {
+    this.maximumQueueMemory = memory;
+    return this;
+  }
+  
+  /**
+   * Sets the batch time interval for an HDFS queue. This is the maximum time interval
+   * that can elapse before a batch of data from a bucket is sent to HDFS.
+   *
+   * @param intervalMillis
+   *          int time interval in milliseconds. Default is 60000 ms.
+   */
+  public HDFSEventQueueAttributesFactory setBatchTimeInterval(int intervalMillis){
+    this.batchIntervalMillis = intervalMillis;
+    return this;
+  }
+
+ 
+  /**
+   * Sets the size of a batch per bucket that gets delivered
+   * from the HDFS Event Queue to HDFS. Setting this to a higher value
+   * means that fewer but larger batches are persisted to HDFS, and hence
+   * larger files are created on HDFS. However, larger batches consume
+   * more memory.
+   *
+   * This value is a hint. Batches per bucket smaller than the specified size
+   * are still sent to HDFS once the interval specified by
+   * {@link #setBatchTimeInterval(int)} has elapsed.
+   *  
+   * @param size
+   *          The size of batches sent to HDFS in MB. Default is 32 MB.
+   */
+  public HDFSEventQueueAttributesFactory setBatchSizeMB(int size){
+    this.batchSize = size;
+    return this;
+  }
+  
+  /**
+   * Sets whether the HDFS Event Queue is persistent or not.
+   * 
+   * @param isPersistent
+   *          Whether to enable persistence for an HDFS Event Queue.
+   */
+  public HDFSEventQueueAttributesFactory setPersistent(boolean isPersistent) {
+    this.isPersistenceEnabled = isPersistent;
+    return this;
+  }
+  /**
+   * Sets whether or not the writing to the disk is synchronous.
+   *
+   * @param isSynchronous
+   *          boolean if true indicates synchronous writes
+   */
+  public HDFSEventQueueAttributesFactory setDiskSynchronous(boolean isSynchronous) {
+    this.diskSynchronous = isSynchronous;
+    return this;
+  }
+  
+  /**
+   * Number of threads in the VM to consume the events.
+   * Default is one.
+   * 
+   * @param dispatcherThreads
+   */
+  public void setDispatcherThreads(int dispatcherThreads) {
+  	this.dispatcherThreads = dispatcherThreads;
+  }
+  
+  /**
+   * Creates the <code>HDFSEventQueueAttributes</code>.
+   */
+  public HDFSEventQueueAttributes create() {
+    return new HDFSEventQueueAttributesImpl(this.diskStoreName, this.maximumQueueMemory, 
+        this.batchSize, this.isPersistenceEnabled,  this.batchIntervalMillis, this.diskSynchronous, this.dispatcherThreads);
+  }
+
+  private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY;
+  private int batchIntervalMillis = HDFSEventQueueAttributesFactory.DEFAULT_BATCH_TIME_INTERVAL_MILLIS;
+  private int batchSize = HDFSEventQueueAttributesFactory.DEFAULT_BATCH_SIZE_MB;
+  private boolean diskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS; 
+  private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED;
+  private int dispatcherThreads = GatewaySender.DEFAULT_HDFS_DISPATCHER_THREADS;
+  private String diskStoreName = null;
+}
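
A minimal sketch of building queue attributes with the factory above; the batch size, interval, and disk store name are illustrative assumptions:

    HDFSEventQueueAttributes queueAttrs = new HDFSEventQueueAttributesFactory()
        .setBatchSizeMB(64)                      // larger batches -> fewer, bigger files on HDFS
        .setBatchTimeInterval(30000)             // flush each bucket's batch at least every 30 seconds
        .setPersistent(true)
        .setDiskSynchronous(false)
        .setDiskStoreName("hdfsQueueDiskStore")  // hypothetical disk store used for overflow/persistence
        .create();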

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSIOException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSIOException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSIOException.java
new file mode 100644
index 0000000..87a8ac3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSIOException.java
@@ -0,0 +1,44 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.cache.hdfs;
+
+import com.gemstone.gemfire.GemFireIOException;
+
+/**
+ * Thrown when an error has occurred while attempting to use
+ * the HDFS file system. This error may indicate a failure of the HDFS
+ * system.
+ * 
+ * @author dsmith
+ * 
+ * @since 7.5
+ * 
+ */
+public class HDFSIOException extends GemFireIOException {
+
+  /**
+   * @param message
+   * @param cause
+   */
+  public HDFSIOException(String message, Throwable cause) {
+    super(message, cause);
+    // TODO Auto-generated constructor stub
+  }
+
+  /**
+   * @param message
+   */
+  public HDFSIOException(String message) {
+    super(message);
+    // TODO Auto-generated constructor stub
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStore.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStore.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStore.java
new file mode 100644
index 0000000..f5bd943
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStore.java
@@ -0,0 +1,181 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+/**
+ * Provides HDFS storage for one or more regions. The regions in the same HDFS
+ * store will share the same persistence attributes.
+ * <p>
+ * Instances of this interface are created using {@link HDFSStoreFactory#create}.
+ * So to create an <code>HDFSStore</code> named <code>myDiskStore</code> do
+ * this:
+ * 
+ * <PRE>
+ * cache.createHDFSStoreFactory().create(&quot;myDiskStore&quot;);
+ * </PRE>
+ * <p>
+ * 
+ * @author Hemant Bhanawat
+ * @author Ashvin Agrawal
+ */
+
+public interface HDFSStore {
+  public static final String DEFAULT_HOME_DIR = "gemfire";
+  public static final float DEFAULT_BLOCK_CACHE_SIZE = 10f;
+  public static final int DEFAULT_MAX_WRITE_ONLY_FILE_SIZE = 256; 
+  public static final int DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL = 3600; 
+
+  /**
+   * @return name of the HDFSStore provided while creating the instance
+   */
+  public String getName();
+
+  /**
+   * @return Namenode URL associated with this store
+   */
+  public String getNameNodeURL();
+
+  /**
+   * @return Home directory where regions using this store will be persisted
+   */
+  public String getHomeDir();
+
+  /**
+   * @return hdfs client configuration referred by this store
+   */
+  public String getHDFSClientConfigFile();
+
+  /**
+   * @return the percentage of the heap to use for the block cache in the range
+   * 0 ... 100
+   */
+  public float getBlockCacheSize();
+
+  /**
+   * For write only tables, data is written to a single file until the file 
+   * reaches a size specified by this API or the time 
+   * for file rollover specified by {@link #getFileRolloverInterval()} has passed.
+   * Default is 256 MB.  
+   *   
+   * @return max file size in MB. 
+   */
+  public int getMaxFileSize();
+  
+  /**
+   * For write only tables, data is written to a single file until the file 
+   * reaches a certain size specified by {@link #getMaxFileSize()} or the time 
+   * for file rollover has passed. Default is 3600 seconds. 
+   *   
+   * @return time in seconds after which a file will be rolled over into a new file
+   */
+  public int getFileRolloverInterval();
+  
+  /**
+   * @return true if auto compaction is enabled
+   */
+  public boolean getMinorCompaction();
+
+  /**
+   * Return the HDFSEventQueueAttributes associated with this HDFSStore
+   */
+  public HDFSEventQueueAttributes getHDFSEventQueueAttributes();
+  
+  /**
+   * Destroys this HDFS store. Removes the store from the cache. All
+   * regions using this store must be closed.
+   * 
+   */
+  public void destroy();
+
+  /**
+   * @return Instance of compaction configuration associated with this store
+   */
+  public HDFSCompactionConfig getHDFSCompactionConfig();
+  
+  /**
+   * @return instance of mutator object that can be used to alter properties of
+   *         this store
+   */
+  public HDFSStoreMutator createHdfsStoreMutator();
+  
+  /**
+   * Applies the new attribute values provided using the mutator to this instance
+   * dynamically.
+   * 
+   * @param mutator
+   *          contains the changes
+   * @return hdfsStore reference representing the old store configuration
+   */
+  public HDFSStore alter(HDFSStoreMutator mutator);
+      
+  public static interface HDFSCompactionConfig {
+    public static final String INVALID = "invalid";
+    public static final String SIZE_ORIENTED = "size-oriented";
+    public static final String DEFAULT_STRATEGY = SIZE_ORIENTED;
+    
+    public static final boolean DEFAULT_AUTO_COMPACTION = true;
+    public static final boolean DEFAULT_AUTO_MAJOR_COMPACTION = true;
+    public static final int DEFAULT_MAX_INPUT_FILE_SIZE_MB = 512;
+    public static final int DEFAULT_MAX_INPUT_FILE_COUNT = 10;
+    public static final int DEFAULT_MIN_INPUT_FILE_COUNT = 4;
+    public static final int DEFAULT_MAX_THREADS = 10;
+    
+    public static final int DEFAULT_MAJOR_COMPACTION_MAX_THREADS = 2;
+    public static final int DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS = 720;
+    public static final int DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS = 30;
+    
+    /**
+     * @return name of the compaction strategy configured for this store
+     */
+    public String getCompactionStrategy();
+
+    /**
+     * @return size threshold (in MB). A file larger than this size will not be
+     *         considered for compaction
+     */
+    public int getMaxInputFileSizeMB();
+
+    /**
+     * @return minimum count threshold. Compaction cycle will commence if the
+     *         number of files to be compacted is more than this number
+     */
+    public int getMinInputFileCount();
+
+    /**
+     * @return maximum count threshold.  Compaction cycle will not include more
+     *          files than the maximum
+     */
+    public int getMaxInputFileCount();
+
+    /**
+     * @return maximum number of threads executing minor compaction
+     */
+    public int getMaxThreads();
+
+    /**
+     * @return true if auto major compaction is enabled
+     */
+    public boolean getAutoMajorCompaction();
+
+    /**
+     * @return interval configuration that guides major compaction frequency
+     */
+    public int getMajorCompactionIntervalMins();
+
+    /**
+     * @return maximum number of threads executing major compaction
+     */
+    public int getMajorCompactionMaxThreads();
+    
+    /**
+     * @return interval configuration that guides deletion of old files
+     */
+    public int getOldFilesCleanupIntervalMins();
+  }
+}
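
To show the read side of the interface above, a small sketch that inspects an existing store's configuration; the store variable is assumed to have been created elsewhere (for example through an HDFSStoreFactory):

    // 'store' is assumed to be an existing HDFSStore instance
    System.out.println("name=" + store.getName()
        + " namenode=" + store.getNameNodeURL()
        + " homeDir=" + store.getHomeDir());
    HDFSStore.HDFSCompactionConfig compaction = store.getHDFSCompactionConfig();
    if (compaction.getAutoMajorCompaction()) {
      System.out.println("major compaction every "
          + compaction.getMajorCompactionIntervalMins() + " minutes");
    }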

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreFactory.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreFactory.java
new file mode 100644
index 0000000..516d2aa
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreFactory.java
@@ -0,0 +1,195 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
+
+/**
+ * Factory for creating instances of {@link HDFSStore}. To get an instance of
+ * this factory call {@link Cache#createHDFSStoreFactory}.
+ * <P>
+ * To use this factory configure it with the <code>set</code> methods and then
+ * call {@link #create} to produce a HDFS store instance.
+ * 
+ * @author Hemant Bhanawat
+ * @author Ashvin Agrawal
+ */
+public interface HDFSStoreFactory {
+
+  /**
+   * @param name
+   *          name of the HDFSStore provided while creating the instance
+   */
+  public HDFSStoreFactory setName(String name);
+
+  /**
+   * @param url
+   *          Namenode URL associated with this store
+   */
+  public HDFSStoreFactory setNameNodeURL(String url);
+
+  /**
+   * @param dir
+   *          Home directory where regions using this store will be persisted
+   */
+  public HDFSStoreFactory setHomeDir(String dir);
+
+  /**
+   * @param file
+   *          hdfs client configuration referred by this store
+   */
+  public HDFSStoreFactory setHDFSClientConfigFile(String file);
+
+  /**
+   * @param config
+   *          Instance of compaction configuration associated with this store
+   */
+  public HDFSStoreFactory setHDFSCompactionConfig(HDFSCompactionConfig config);
+  
+  /**
+   * @param percentage
+   *          Size of the block cache as a percentage of the heap in the range
+   *          0 ... 100 
+   */
+  public HDFSStoreFactory setBlockCacheSize(float percentage);
+  
+  /**
+   * Sets the HDFS event queue attributes
+   * This causes the store to use the {@link HDFSEventQueueAttributes}.
+   * @param hdfsEventQueueAttrs the attributes of the HDFS Event queue
+   * @return a reference to this HDFSStoreFactory object
+   * 
+   */
+  public HDFSStoreFactory setHDFSEventQueueAttributes(HDFSEventQueueAttributes hdfsEventQueueAttrs);
+  
+  /**
+   * For write only tables, data is written to a single file until the file 
+   * reaches a size specified by this API or the time 
+   * for file rollover specified by {@link #setFileRolloverInterval(int)} has passed.  
+   * Default is 256 MB. 
+   * 
+   * @param maxFileSize max file size in MB
+   */
+  public HDFSStoreFactory setMaxFileSize(int maxFileSize);
+  
+  /**
+   * For write only tables, data is written to a single file until the file 
+   * reaches a certain size specified by {@link #setMaxFileSize(int)} or the time 
+   * for file rollover has passed. Default is 3600 seconds. 
+   * 
+   * @param rolloverIntervalInSecs time in seconds after which a file will be rolled over into a new file
+   */
+  public HDFSStoreFactory setFileRolloverInterval(int rolloverIntervalInSecs);
+  
+  /**
+   * @param auto
+   *          true if auto compaction is enabled
+   */
+  public HDFSStoreFactory setMinorCompaction(boolean auto);
+
+  /**
+   * @param strategy
+   *          name of the compaction strategy or null for letting system choose
+   *          and apply default compaction strategy
+   * @return instance of {@link HDFSCompactionConfigFactory}
+   */
+  public HDFSCompactionConfigFactory createCompactionConfigFactory(String strategy);
+
+  public static interface HDFSCompactionConfigFactory {
+
+    /**
+     * @param size
+     *          size threshold (in MB). A file larger than this size will not be
+     *          considered for compaction
+     */
+    public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size);
+
+    /**
+     * @param count
+     *          minimum count threshold. Compaction cycle will commence if the
+     *          number of files to be compacted is more than this number
+     */
+    public HDFSCompactionConfigFactory setMinInputFileCount(int count);
+
+    /**
+     * @param count
+     *          maximum count threshold.  Compaction cycle will not include more
+     *          files than the maximum
+     */
+    public HDFSCompactionConfigFactory setMaxInputFileCount(int count);
+
+    /**
+     * @param count
+     *          maximum number of threads executing minor compaction. Count must
+     *          be greater than 0
+     */
+    public HDFSCompactionConfigFactory setMaxThreads(int count);
+
+    /**
+     * @param auto
+     *          true if auto major compaction is enabled
+     */
+    public HDFSCompactionConfigFactory setAutoMajorCompaction(boolean auto);
+
+    /**
+     * @param interval
+     *          interval configuration that guides major compaction frequency
+     */
+    public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int interval);
+
+    /**
+     * @param count
+     *          maximum number of threads executing major compaction. Count must
+     *          be greater than 0
+     */
+    public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count);
+    
+    /**
+     * @param interval
+     *          interval configuration that guides deletion of old files
+     */
+    public HDFSCompactionConfigFactory setOldFilesCleanupIntervalMins(int interval);
+    
+    /**
+     * Create a {@link HDFSCompactionConfig}. The returned instance will have
+     * the same configuration as this factory.
+     * 
+     * @return the newly created {@link HDFSCompactionConfig}
+     * @throws GemFireConfigException
+     *           if the cache xml is invalid
+     */
+    public HDFSCompactionConfig create() throws GemFireConfigException;
+    
+    /**
+     * @return A {@link HDFSCompactionConfig} view of this factory
+     * @throws GemFireConfigException
+     */
+    public HDFSCompactionConfig getConfigView();
+  }
+
+  /**
+   * Create a new HDFS store. The returned HDFS store's configuration will be
+   * the same as this factory's configuration.
+   * 
+   * @param name
+   *          the name of the HDFSStore
+   * @return the newly created HDFSStore.
+   * @throws GemFireConfigException
+   *           if the cache xml is invalid
+   * @throws StoreExistsException
+   *           if another instance of {@link HDFSStore} with the same name exists
+   */
+  public HDFSStore create(String name) throws GemFireConfigException,
+      StoreExistsException;
+
+  // TODO this is the only non-factory instance getter in this class
+  HDFSEventQueueAttributes getHDFSEventQueueAttributes();
+}
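
A sketch of configuring and creating a store with the factory above; the namenode URL, store name, and tuning values are illustrative, 'cache' is an existing Cache, 'queueAttrs' refers to the event-queue sketch earlier, and passing null to createCompactionConfigFactory is read here as "use the default strategy" per its Javadoc:

    HDFSStoreFactory factory = cache.createHDFSStoreFactory();
    factory.setNameNodeURL("hdfs://namenode:8020")
           .setHomeDir("gemfire")
           .setBlockCacheSize(10f)
           .setMaxFileSize(256)
           .setFileRolloverInterval(3600)
           .setHDFSEventQueueAttributes(queueAttrs);
    HDFSStoreFactory.HDFSCompactionConfigFactory compactionFactory =
        factory.createCompactionConfigFactory(null);
    compactionFactory.setMinInputFileCount(4)
                     .setMaxInputFileCount(10)
                     .setMaxThreads(4);
    factory.setHDFSCompactionConfig(compactionFactory.create());
    HDFSStore store = factory.create("myHdfsStore");  // throws StoreExistsException if the name is already in use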

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreMutator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreMutator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreMutator.java
new file mode 100644
index 0000000..47b1708
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/HDFSStoreMutator.java
@@ -0,0 +1,199 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
+
+public interface HDFSStoreMutator {
+  /**
+   * {@link HDFSStoreFactory#setMaxFileSize(int)}
+   */
+  public HDFSStoreMutator setMaxFileSize(int maxFileSize);
+
+  /**
+   * {@link HDFSStore#getMaxFileSize()}
+   * 
+   * @return value to be used when mutator is executed on hdfsStore. -1 if not
+   *         set
+   */
+  public int getMaxFileSize();
+  
+  /**
+   * {@link HDFSStoreFactory#setFileRolloverInterval(int)}
+   */
+  public HDFSStoreMutator setFileRolloverInterval(int rolloverIntervalInSecs);
+  
+  /**
+   * {@link HDFSStore#getFileRolloverInterval()}
+   * 
+   * @return value to be used when mutator is executed on hdfsStore. -1 if not
+   *         set
+   */
+  public int getFileRolloverInterval();
+  
+  /**
+   * {@link HDFSStore#getMinorCompaction()}
+   * 
+   * @return value to be used when mutator is executed on hdfsStore. null if
+   *         not set
+   */
+  public Boolean getMinorCompaction();
+
+  /**
+   * {@link HDFSStoreFactory#setMinorCompaction(boolean)}
+   */
+  public HDFSCompactionConfigMutator setMinorCompaction(boolean auto);
+  
+  /**
+   * Returns the mutator for the compaction configuration of the HDFS store
+   * @return instance of {@link HDFSCompactionConfigMutator}
+   */
+  public HDFSCompactionConfigMutator getCompactionConfigMutator();
+
+  /**
+   * Returns the mutator for the HDFS event queue of the HDFS store
+   * @return instance of {@link HDFSEventQueueAttributesMutator}
+   */
+  public HDFSEventQueueAttributesMutator getHDFSEventQueueAttributesMutator();
+  
+  public static interface HDFSEventQueueAttributesMutator {
+    /**
+     * {@link HDFSEventQueueAttributesFactory#setBatchSizeMB(int)}
+     */
+    public HDFSEventQueueAttributesMutator setBatchSizeMB(int size);
+    
+    /**
+     * {@link HDFSEventQueueAttributes#getBatchSizeMB()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if not
+     *         set
+     */
+    public int getBatchSizeMB();
+    
+    /**
+     * {@link HDFSEventQueueAttributesFactory#setBatchTimeInterval(int)}
+     */
+    public HDFSEventQueueAttributesMutator setBatchTimeInterval(int interval);
+    
+    /**
+     * {@link HDFSEventQueueAttributes#getBatchTimeInterval()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if not
+     *         set
+     */
+    public int getBatchTimeInterval();
+  }
+  
+  public static interface HDFSCompactionConfigMutator {
+    /**
+     * {@link HDFSCompactionConfigFactory#setMaxInputFileSizeMB(int)}
+     */
+    public HDFSCompactionConfigMutator setMaxInputFileSizeMB(int size);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMaxInputFileSizeMB()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMaxInputFileSizeMB();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setMinInputFileCount(int)}
+     */
+    public HDFSCompactionConfigMutator setMinInputFileCount(int count);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMinInputFileCount()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMinInputFileCount();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setMaxInputFileCount(int)}
+     */
+    public HDFSCompactionConfigMutator setMaxInputFileCount(int count);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMaxInputFileCount()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMaxInputFileCount();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setMaxThreads(int)}
+     */
+    public HDFSCompactionConfigMutator setMaxThreads(int count);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMaxThreads()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMaxThreads();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setAutoMajorCompaction(boolean)}
+     */
+    public HDFSCompactionConfigMutator setAutoMajorCompaction(boolean auto);
+    
+    /**
+     * {@link HDFSCompactionConfig#getAutoMajorCompaction()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. null if
+     *         not set
+     */
+    public Boolean getAutoMajorCompaction();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setMajorCompactionIntervalMins(int)}
+     */
+    public HDFSCompactionConfigMutator setMajorCompactionIntervalMins(int interval);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMajorCompactionIntervalMins()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMajorCompactionIntervalMins();
+
+    /**
+     * {@link HDFSCompactionConfigFactory#setMajorCompactionMaxThreads(int)}
+     */
+    public HDFSCompactionConfigMutator setMajorCompactionMaxThreads(int count);
+    
+    /**
+     * {@link HDFSCompactionConfig#getMajorCompactionMaxThreads()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getMajorCompactionMaxThreads();
+    
+    /**
+     * {@link HDFSCompactionConfigFactory#setOldFilesCleanupIntervalMins(int)}
+     */
+    public HDFSCompactionConfigMutator setOldFilesCleanupIntervalMins(int interval);
+    
+    /**
+     * {@link HDFSCompactionConfig#getOldFilesCleanupIntervalMins()}
+     * 
+     * @return value to be used when mutator is executed on hdfsStore. -1 if
+     *         not set
+     */
+    public int getOldFilesCleanupIntervalMins();
+  }
+}
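
Tying the mutator above back to HDFSStore.alter, a brief sketch; 'store' is an existing HDFSStore and the new values are illustrative:

    HDFSStoreMutator mutator = store.createHdfsStoreMutator();
    mutator.setFileRolloverInterval(1800);                            // roll write-only files every 30 minutes
    mutator.getCompactionConfigMutator().setMaxThreads(4);            // more minor-compaction threads
    mutator.getHDFSEventQueueAttributesMutator().setBatchTimeInterval(15000);
    HDFSStore previousConfig = store.alter(mutator);                  // returns the old configuration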

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/StoreExistsException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/StoreExistsException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/StoreExistsException.java
new file mode 100644
index 0000000..9239ed2
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/StoreExistsException.java
@@ -0,0 +1,24 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs;
+
+import com.gemstone.gemfire.cache.CacheException;
+
+/**
+ * Thrown when attempting to create a {@link HDFSStore} if one already exists.
+ * 
+ * @author Ashvin Agrawal
+ */
+public class StoreExistsException extends CacheException {
+  private static final long serialVersionUID = 1L;
+
+  public StoreExistsException(String storeName) {
+    super(storeName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FailureTracker.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FailureTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FailureTracker.java
new file mode 100644
index 0000000..802e2ba
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FailureTracker.java
@@ -0,0 +1,88 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Class for tracking failures and backing off if necessary.
+ * @author dsmith
+ *
+ */
+public class FailureTracker  extends ThreadLocal<MutableInt> {
+  private final long minTime;
+  private final long maxTime;
+  private final float rate;
+  private final FailureCount waitTime = new FailureCount();
+  
+  
+  /**
+   * @param minTime the minimum wait time after a failure in ms.
+   * @param maxTime the maximum wait time after a failure, in ms.
+   * @param rate the rate of growth of the failures
+   */
+  public FailureTracker(long minTime, long maxTime, float rate) {
+    this.minTime = minTime;
+    this.maxTime = maxTime;
+    this.rate = rate;
+  }
+  
+  /**
+   * Wait for the current wait time.
+   */
+  public void sleepIfRetry() throws InterruptedException {
+      Thread.sleep(waitTime());
+  }
+
+  /**
+   * @return the wait time = rate^(num_failures) * minTime
+   */
+  public long waitTime() {
+    return waitTime.get().longValue();
+  }
+  
+  public void record(boolean success) {
+    if(success) {
+      success();
+    } else {
+      failure();
+    }
+    
+  }
+  
+  public void success() {
+    waitTime.get().setValue(0);
+    
+  }
+  public void failure() {
+    long current = waitTime.get().intValue();
+    if(current == 0) {
+      current=minTime;
+    }
+    else if(current < maxTime) {
+      current = (long) (current * rate);
+    }
+    waitTime.get().setValue(Math.min(current, maxTime));
+  }
+
+
+  private static class FailureCount extends ThreadLocal<MutableLong> {
+
+    @Override
+    protected MutableLong initialValue() {
+      return new MutableLong();
+    }
+  }
+
+
+  
+}
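
A short sketch of the backoff behaviour implemented above: the wait grows geometrically from minTime toward maxTime on consecutive failures and resets to zero on success. The moreWorkToDo() and attemptFlush() calls are hypothetical stand-ins for the loop condition and the operation being retried:

    // back off between 100 ms and 10 s, doubling after each failure
    FailureTracker tracker = new FailureTracker(100, 10000, 2.0f);
    while (moreWorkToDo()) {
      tracker.sleepIfRetry();        // sleeps 0 ms until the first failure is recorded
      boolean ok = attemptFlush();   // hypothetical operation that may fail
      tracker.record(ok);            // success resets the wait; failures grow it: 100, 200, 400, ... up to 10000 ms
    }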

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
new file mode 100644
index 0000000..f0801fe
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/FlushObserver.java
@@ -0,0 +1,38 @@
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Observes and reacts to flush events.
+ * 
+ * @author bakera
+ */
+public interface FlushObserver {
+  public interface AsyncFlushResult {
+    /**
+     * Waits for the most recently enqueued batch to completely flush.
+     * 
+     * @param time the time to wait
+     * @param unit the time unit
+     * @return true if flushed before the timeout
+     * @throws InterruptedException interrupted while waiting
+     */
+    public boolean waitForFlush(long time, TimeUnit unit) throws InterruptedException;
+  }
+
+  /**
+   * Returns true when the queued events should be drained from the queue
+   * immediately.
+   * 
+   * @return true if draining
+   */
+  boolean shouldDrainImmediately();
+  
+  /**
+   * Begins flushing the queued events.
+   * 
+   * @return the async result
+   */
+  public AsyncFlushResult flush();
+}
+
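
For context, a sketch of how a caller might drive the interface above; where the FlushObserver instance comes from is outside this file, so getFlushObserver() is a hypothetical accessor:

    FlushObserver observer = getFlushObserver();
    FlushObserver.AsyncFlushResult result = observer.flush();      // begin flushing the queued events
    if (!result.waitForFlush(30, TimeUnit.SECONDS)) {
      // timed out waiting for the most recently enqueued batch to reach HDFS
    }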

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
new file mode 100644
index 0000000..d570c07
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSBucketRegionQueue.java
@@ -0,0 +1,1224 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.TimeoutException;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventBuffer.BufferIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.RegionEventImpl;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.CursorIterator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * This class holds the sorted list required for HDFS. 
+ * 
+ * @author Hemant Bhanawat
+ * 
+ */
+public class HDFSBucketRegionQueue extends AbstractBucketRegionQueue {
+     private static final boolean VERBOSE = Boolean.getBoolean("hdfsBucketRegionQueue.VERBOSE");
+     private final int batchSize;
+     volatile HDFSEventQueue hdfsEventQueue = null;
+     
+     // set before releasing the primary lock. 
+     private final AtomicBoolean releasingPrimaryLock = new AtomicBoolean(true);
+     
+     // This is used to keep track of the current size of the queue in bytes. 
+     final AtomicLong queueSizeInBytes =  new AtomicLong(0);
+     public boolean isBucketSorted = true;
+     /**
+     * @param regionName
+     * @param attrs
+     * @param parentRegion
+     * @param cache
+     * @param internalRegionArgs
+     */
+    public HDFSBucketRegionQueue(String regionName, RegionAttributes attrs,
+        LocalRegion parentRegion, GemFireCacheImpl cache,
+        InternalRegionArguments internalRegionArgs) {
+      super(regionName, attrs, parentRegion, cache, internalRegionArgs);
+      
+      this.isBucketSorted = internalRegionArgs.getPartitionedRegion().getParallelGatewaySender().getBucketSorted();
+      if (isBucketSorted)
+        hdfsEventQueue = new MultiRegionSortedQueue();
+      else
+        hdfsEventQueue = new EventQueue();
+      
+      batchSize = internalRegionArgs.getPartitionedRegion().
+          getParallelGatewaySender().getBatchSize() * 1024 *1024;
+      this.keySet();
+    }
+    @Override
+    protected void initialize(InputStream snapshotInputStream,
+        InternalDistributedMember imageTarget,
+        InternalRegionArguments internalRegionArgs) throws TimeoutException,
+        IOException, ClassNotFoundException {
+
+      super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
+
+      loadEventsFromTempQueue();
+      
+      this.initialized = true;
+      notifyEventProcessor();
+    }
+
+    private TreeSet<Long> createSkipListFromMap(Set keySet) {
+      TreeSet<Long> sortedKeys = null;
+      if (!hdfsEventQueue.isEmpty())
+        return sortedKeys;
+      
+      if (!keySet.isEmpty()) {
+        sortedKeys = new TreeSet<Long>(keySet);
+        if (!sortedKeys.isEmpty())
+        {
+          for (Long key : sortedKeys) {
+            if (this.isBucketSorted) {
+              Object hdfsevent = getNoLRU(key, true, false, false);
+              if (hdfsevent == null) { // this can happen when tombstones are recovered. 
+                if (logger.isDebugEnabled() || VERBOSE) {
+                  logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding key " + key + ", no event recovered"));
+                }
+              } else {
+                int eventSize = ((HDFSGatewayEventImpl)hdfsevent).
+                    getSizeOnHDFSInBytes(!this.isBucketSorted);
+                hdfsEventQueue.put(key,(HDFSGatewayEventImpl)hdfsevent, eventSize );
+                queueSizeInBytes.getAndAdd(eventSize);
+              }
+            }
+            else {
+              Object hdfsevent = getNoLRU(key, true, false, false);
+              if (hdfsevent != null) { // hdfs event can be null when tombstones are recovered.
+                queueSizeInBytes.getAndAdd(((HDFSGatewayEventImpl)hdfsevent).
+                    getSizeOnHDFSInBytes(!this.isBucketSorted));
+              }
+              ((EventQueue)hdfsEventQueue).put(key);
+            }
+              
+          }
+          getEventSeqNum().setIfGreater(sortedKeys.last());
+        }
+      
+      }
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+            "For bucket " + getId() + ", total keys recovered are : " + keySet.size()
+                + " and the seqNo is " + getEventSeqNum()));
+      }
+      return sortedKeys;
+    }
+    
+    @Override
+    protected void basicClear(RegionEventImpl ev) {
+      super.basicClear(ev);
+      queueSizeInBytes.set(0);
+      if ( this.getBucketAdvisor().isPrimary()) {
+        this.hdfsEventQueue.clear();
+      }
+    }
+    
+    protected void clearQueues(){
+      queueSizeInBytes.set(0);
+      if ( this.getBucketAdvisor().isPrimary()) {
+        this.hdfsEventQueue.clear();
+      }
+    }
+   
+    @Override
+    protected void basicDestroy(final EntryEventImpl event,
+        final boolean cacheWrite, Object expectedOldValue)
+        throws EntryNotFoundException, CacheWriterException, TimeoutException {
+      super.basicDestroy(event, cacheWrite, expectedOldValue);
+    }
+    
+    ArrayList peekABatch() {
+      ArrayList result = new ArrayList();
+      hdfsEventQueue.peek(result);
+      return result;
+    }
+    
+    @Override
+    protected void addToEventQueue(Object key, boolean didPut, EntryEventImpl event, int sizeOfHDFSEvent) {
+      if (didPut &&  this.getBucketAdvisor().isPrimary()) {
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)event.getValue();
+        if (sizeOfHDFSEvent == -1) { 
+          try {
+            // the size is calculated only on primary before event is inserted in the bucket. 
+            // If this node became primary after size was calculated, sizeOfHDFSEvent will be -1. 
+            // Try to get the size. #50016
+            sizeOfHDFSEvent = hdfsEvent.getSizeOnHDFSInBytes(!((HDFSBucketRegionQueue)this).isBucketSorted);
+          } catch (Throwable e) {
+           //   Ignore any exception while fetching the size.
+            sizeOfHDFSEvent = 0;
+          }
+        }
+        queueSizeInBytes.getAndAdd(sizeOfHDFSEvent);
+        if (this.initialized) {
+          Long longKey = (Long)key;
+          this.hdfsEventQueue.put(longKey, hdfsEvent, sizeOfHDFSEvent);
+        }
+        if (logger.isDebugEnabled()) {
+          logger.debug("Put successfully in the queue : " + hdfsEvent + " . Queue initialized: " 
+              + this.initialized);
+        }
+      }
+    }
+    
+    /**
+     * It removes the first key from the queue.
+     * 
+     * @return Returns the key for which value was destroyed.
+     * @throws ForceReattemptException
+     */
+    public Long remove() throws ForceReattemptException {
+      throw new UnsupportedOperationException("Individual entries cannot be removed in a HDFSBucketRegionQueue");
+    }
+    
+    /**
+     * It removes the first key from the queue.
+     * 
+     * @return Returns the value.
+     * @throws InterruptedException
+     * @throws ForceReattemptException
+     */
+    public Object take() throws InterruptedException, ForceReattemptException {
+      throw new UnsupportedOperationException("take() cannot be called for individual entries in a HDFSBucketRegionQueue");
+    }
+    
+    public void destroyKeys(ArrayList<HDFSGatewayEventImpl>  listToDestroy) {
+      
+      HashSet<Long> removedSeqNums = new HashSet<Long>();
+      
+      for (int index =0; index < listToDestroy.size(); index++) {
+        HDFSGatewayEventImpl entry = null;
+        if (this.isBucketSorted) {
+          // Remove the events in reverse order so that the events with higher sequence number
+          // are removed last to ensure consistency.
+          entry = listToDestroy.get(listToDestroy.size() - index -1);
+        } else {
+          entry = listToDestroy.get(index);
+        }
+       
+        try {
+          if (this.logger.isDebugEnabled())
+            logger.debug("destroying primary key " + entry.getShadowKey() + " bucket id: " + this.getId());
+          // removed from peeked list
+          boolean deleted = this.hdfsEventQueue.remove(entry);
+          if (deleted) {
+            // this is an onheap event so a call to size should be ok. 
+            long entrySize = entry.getSizeOnHDFSInBytes(!this.isBucketSorted);
+            destroyKey(entry.getShadowKey());
+            long queueSize = queueSizeInBytes.getAndAdd(-1*entrySize);
+            if (queueSize < 0) {
+              // In HA scenarios, queueSizeInBytes can go awry.
+              queueSizeInBytes.compareAndSet(queueSize, 0);
+            }
+            removedSeqNums.add(entry.getShadowKey());
+          }
+        }catch (ForceReattemptException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got ForceReattemptException for " + this
+            + " for bucket = " + this.getId());
+          }
+        }
+        catch(EntryNotFoundException e) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove->HDFSBucketRegionQueue#destroyKeys: " + "Got EntryNotFoundException for " + this
+              + " for bucket = " + this.getId() + " and key " + entry.getShadowKey());
+          }
+        } finally {
+          entry.release();
+        }
+      }
+      
+      if (this.getBucketAdvisor().isPrimary()) {
+        hdfsEventQueue.handleRemainingElements(removedSeqNums);
+      }
+    }
+
+    
+    public boolean isReadyForPeek() {
+      return !this.isEmpty() && !this.hdfsEventQueue.isEmpty() && getBucketAdvisor().isPrimary();
+    }
+
+    public long getLastPeekTimeInMillis() {
+      return hdfsEventQueue.getLastPeekTimeInMillis();
+    }
+    
+    public long getQueueSizeInBytes() {
+      return queueSizeInBytes.get();
+    }
+    /*
+     * This function is called when the bucket takes as the role of primary.
+     */
+    @Override
+    public void beforeAcquiringPrimaryState() {
+      
+      queueSizeInBytes.set(0);
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+            "This node has become primary for bucket " + this.getId()  +". " +
+            		"Creating sorted data structure for the async queue."));
+      }
+      releasingPrimaryLock.set(false);
+      
+      // clear the hdfs queue in case it has already elements left if it was a primary
+      // in the past
+      hdfsEventQueue.clear();
+      if (isBucketSorted)
+        hdfsEventQueue = new MultiRegionSortedQueue();
+      else
+        hdfsEventQueue = new EventQueue();
+      
+      TreeSet<Long> sortedKeys = createSkipListFromMap(this.keySet());
+      
+      if (sortedKeys != null && sortedKeys.size() > 0) {    
+        // Mark roughly one batch worth of events as possible duplicates.
+        // The batch size is approximated from the current queue size in bytes
+        // and the number of entries in the bucket.
+        long batchSizeMB =  this.getPartitionedRegion().getParallelGatewaySender().getBatchSize();
+        long batchSizeInBytes = batchSizeMB*1024*1024;
+        long totalBucketSize = queueSizeInBytes.get();
+        totalBucketSize = totalBucketSize >  0 ? totalBucketSize: 1;
+        long totalEntriesInBucket = this.entryCount();
+        totalEntriesInBucket =  totalEntriesInBucket > 0 ? totalEntriesInBucket: 1;
+        
+        long perEntryApproxSize = totalBucketSize/totalEntriesInBucket;
+        perEntryApproxSize = perEntryApproxSize >  0 ? perEntryApproxSize: 1;
+        
+        int batchSize  = (int)(batchSizeInBytes/perEntryApproxSize);
+        
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG,
+              "Calculating batch size " +  " batchSizeMB: " + batchSizeMB + " batchSizeInBytes: " + batchSizeInBytes + 
+              " totalBucketSize: " + totalBucketSize + " totalEntriesInBucket: " + totalEntriesInBucket + 
+              " perEntryApproxSize: " + perEntryApproxSize + " batchSize: " + batchSize ));
+        }
+        
+        markEventsAsDuplicate(batchSize, sortedKeys.iterator());
+      }
+    }
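+    
+    /*
+     * Illustrative sketch only (not part of the original commit): the batch size
+     * computed in beforeAcquiringPrimaryState() is just an estimate of how many
+     * average-sized entries fit into the configured batch, with floor-to-1 guards.
+     * The method name and parameters below are hypothetical helpers for this sketch.
+     */
+    private static int estimateBatchSizeSketch(long batchSizeMB, long bucketSizeInBytes,
+        long entriesInBucket) {
+      long batchSizeInBytes = batchSizeMB * 1024 * 1024;
+      long totalBucketSize = bucketSizeInBytes > 0 ? bucketSizeInBytes : 1;
+      long totalEntries = entriesInBucket > 0 ? entriesInBucket : 1;
+      long perEntryApproxSize = Math.max(totalBucketSize / totalEntries, 1);
+      // e.g. a 32 MB batch over a 64 MB bucket with 1,000,000 entries gives a
+      // per-entry estimate of 67 bytes and a batch of roughly 500,000 events.
+      return (int) (batchSizeInBytes / perEntryApproxSize);
+    }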
+    
+    @Override
+    public void beforeReleasingPrimaryLockDuringDemotion() {
+      queueSizeInBytes.set(0);
+      releasingPrimaryLock.set(true);
+      // release memory in case of a clean transition
+      hdfsEventQueue.clear();
+    }
+
+    /**
+     * Searches the skip list and the peeked skip list for a given region key.
+     * 
+     * @param region the region whose queue should be searched
+     */
+    public HDFSGatewayEventImpl getObjectForRegionKey(Region region, byte[] regionKey) {
+      // get can only be called for a sorted queue.
+      // Calling get with a Long.MIN_VALUE seq number ensures that 
+      // the list returns the entry with the highest seq number for this key. 
+      return hdfsEventQueue.get(region, regionKey, Long.MIN_VALUE);
+    }
+
+    /**
+     * Gets an iterator over the events in this queue that belong to the given
+     * partitioned region.
+     */
+    public SortedEventQueueIterator iterator(Region region) {
+      return hdfsEventQueue.iterator(region);
+    }
+
+    public long totalEntries() {
+      return entryCount();
+    }
+    
+    /**
+     * Ideally this function should be called from a thread periodically to 
+     * rollover the skip list when it is above a certain size. 
+     * 
+     */
+    public void rolloverSkipList() {
+      // rollover can only be called for a sorted queue.
+      hdfsEventQueue.rollover();
+    }
+    
+    public boolean shouldDrainImmediately() {
+      return hdfsEventQueue.getFlushObserver().shouldDrainImmediately();
+    }
+
+    public AsyncFlushResult flush() {
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flush requested"));
+      }
+      return hdfsEventQueue.getFlushObserver().flush();
+    }
+    
+    /**
+     * This class holds a region key and a seqNum. Instances are kept in a 
+     * concurrent skip list, ordered by the combination of regionKey and seqNum. 
+     * Ordering on both fields allows multiple updates to a single key to coexist 
+     * in the list (each update has a different seq num).
+     */
+    static class KeyToSeqNumObject implements Comparable<KeyToSeqNumObject>
+    {
+      private byte[] regionkey; 
+      private Long seqNum;
+      
+      KeyToSeqNumObject(byte[] regionkey, Long seqNum){
+        this.regionkey = regionkey;
+        this.seqNum = seqNum;
+      }
+      
+      /**
+       * Compares the region key first; if the keys are equal, the seq num is compared.
+       * This ordering keeps the skip lists sorted the way we need, so that a get on a 
+       * key fetches its most recent update. Currently seq numbers are compared, but 
+       * this will eventually have to change to version stamps. 
+       * The list can hold elements in the following sequence: 
+       * K1 Value1  version: 1 
+       * K2 Value2a version: 2
+       * K2 Value2  version: 1
+       * K3 Value3  version: 1
+       * A get on K2 should return K2 Value2a.
+       */
+      @Override
+      public int compareTo(KeyToSeqNumObject o) {
+        int compareOutput = ByteComparator.compareBytes(
+            this.getRegionkey(), 0, this.getRegionkey().length, o.getRegionkey(), 0, o.getRegionkey().length);
+        if (compareOutput != 0 )
+          return compareOutput;
+        
+        // If the keys are the same and this is a probe object with the dummy seq number, 
+        // return -1 so that the probe sorts before every real entry for this key. A 
+        // ceiling call on the skip list will then land on the entry with the highest seq num. 
+        if (this.getSeqNum() == Long.MIN_VALUE) 
+          return -1;
+        
+        // this is to just maintain consistency with the above statement. 
+        if (o.getSeqNum() == Long.MIN_VALUE) 
+          return 1;
+       
+        // Negating the comparison sorts entries with higher seq numbers first, so the 
+        // order described above is maintained and a get fetches the entry with the 
+        // higher version. 
+        return this.getSeqNum().compareTo(o.getSeqNum()) * -1;  
+      }
+      
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof KeyToSeqNumObject)) {
+          return false;
+        }
+        return this.compareTo((KeyToSeqNumObject) o) == 0;
+      }
+      
+      @Override
+      public int hashCode() {
+        assert false : "hashCode not designed";
+        return -1;
+      }
+      
+      byte[] getRegionkey() {
+        return regionkey;
+      }
+
+      public Long getSeqNum() {
+        return seqNum;
+      }
+
+      public void setSeqNum(Long seqNum) {
+        this.seqNum = seqNum;
+      }
+      
+      @Override
+      public String toString() {
+        return EntryEventImpl.deserialize(regionkey) + " {" + seqNum + "}";
+      }
+    }
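+    
+    /*
+     * Illustrative sketch only (not part of the original commit): how the ordering
+     * defined by KeyToSeqNumObject.compareTo() is used. Entries with the same region
+     * key sort by descending seq num, and a probe built with Long.MIN_VALUE sorts
+     * before every real entry for that key, so ceilingKey(probe) lands on the most
+     * recent update. The method name below is a hypothetical helper for this sketch;
+     * callers would still have to verify that the returned key matches regionKey.
+     */
+    static KeyToSeqNumObject newestEntryForKeySketch(
+        ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events, byte[] regionKey) {
+      // e.g. with entries {K2, seq 2} and {K2, seq 1} present, this returns {K2, seq 2}
+      return events.ceilingKey(new KeyToSeqNumObject(regionKey, Long.MIN_VALUE));
+    }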
+    
+    public interface HDFSEventQueue {
+      FlushObserver getFlushObserver();
+      
+      /** puts an event in the queue. */ 
+      public void put (long key, HDFSGatewayEventImpl event, int size);
+      
+      public SortedEventQueueIterator iterator(Region region);
+
+      public void rollover();
+
+      /** Get a value from the queue
+       * @throws IllegalStateException if this queue doesn't support get  
+       **/
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue);
+
+      // Peeks a batch of the size specified by batchSize 
+      // and adds the results to the given array list. 
+      public void peek(ArrayList result);
+      
+      // Checks if there are elements to be peeked. 
+      public boolean isEmpty();
+      
+      // removes the event if it has already been peeked. 
+      public boolean remove(HDFSGatewayEventImpl event);
+      
+      // take care of the elements that were peeked 
+      // but were not removed after a batch dispatch 
+      // due to concurrency effects. 
+      public void handleRemainingElements(HashSet<Long> listToBeremoved);
+      
+      // clears the list. 
+      public void clear();
+      
+      // get the time when the last peek was done. 
+      public long getLastPeekTimeInMillis();
+    }
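+    
+    /*
+     * Illustrative sketch only (not part of the original commit): the call order the
+     * bucket follows against an HDFSEventQueue. A batch is peeked, each event that was
+     * actually dispatched is removed, and handleRemainingElements() then reconciles
+     * whatever was peeked but not removed. The method name is a hypothetical helper
+     * for this sketch and deliberately ignores the primary/secondary checks that the
+     * real destroyKeys() path performs.
+     */
+    private void drainOnceSketch(HDFSEventQueue queue) {
+      ArrayList peekedEntries = new ArrayList();
+      queue.peek(peekedEntries);                        // fills at most one batch
+      HashSet<Long> removedSeqNums = new HashSet<Long>();
+      for (Object o : peekedEntries) {
+        HDFSGatewayEventImpl event = (HDFSGatewayEventImpl) o;
+        if (queue.remove(event)) {                      // only succeeds for peeked events
+          removedSeqNums.add(event.getShadowKey());
+        }
+      }
+      queue.handleRemainingElements(removedSeqNums);    // re-queue or drop the leftovers
+    }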
+    
+    class MultiRegionSortedQueue implements HDFSEventQueue {
+      ConcurrentMap<String, SortedEventQueue> regionToEventQueue = new ConcurrentHashMap<String, SortedEventQueue>();
+      volatile Set<SortedEventQueue> peekedQueues = Collections.EMPTY_SET;
+      private final AtomicBoolean peeking = new AtomicBoolean(false);
+      long lastPeekTimeInMillis = System.currentTimeMillis();
+      
+      private final FlushObserver flush = new FlushObserver() {
+        @Override
+        public AsyncFlushResult flush() {
+          final Set<AsyncFlushResult> flushes = new HashSet<AsyncFlushResult>();
+          for (SortedEventQueue queue : regionToEventQueue.values()) {
+            flushes.add(queue.getFlushObserver().flush());
+          }
+          
+          return new AsyncFlushResult() {
+            @Override
+            public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+              long start = System.nanoTime();
+              long remaining = unit.toNanos(timeout);
+              for (AsyncFlushResult afr : flushes) {
+                if (!afr.waitForFlush(remaining, TimeUnit.NANOSECONDS)) {
+                  return false;
+                }
+                // recompute the remaining budget from the original start time so that
+                // elapsed time is not subtracted more than once
+                remaining = unit.toNanos(timeout) - (System.nanoTime() - start);
+              }
+              return true;
+            }
+          };
+        }
+        
+        @Override
+        public boolean shouldDrainImmediately() {
+          for (SortedEventQueue queue : regionToEventQueue.values()) {
+            if (queue.getFlushObserver().shouldDrainImmediately()) {
+              return true;
+            }
+          }
+          return false;
+        }
+      };
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      @Override
+      public void put(long key, HDFSGatewayEventImpl event, int size) {
+        
+        String region = event.getRegionPath();
+        SortedEventQueue regionQueue = regionToEventQueue.get(region);
+        if(regionQueue == null) {
+          regionToEventQueue.putIfAbsent(region, new SortedEventQueue());
+          regionQueue = regionToEventQueue.get(region);
+        }
+        regionQueue.put(key, event, size);
+      }
+
+      @Override
+      public void peek(ArrayList result) {
+        // The elements that were peeked last time have not been persisted to HDFS 
+        // yet. The next batch cannot be taken out until that is done.
+        if (!peeking.compareAndSet(false, true)) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+          }
+          return;
+        }
+        // Maintain a separate set of peeked queues. These queues are stateful and 
+        // expect handleRemainingElements() and clear() to be called on them only if 
+        // peek() was called on them. New queues, however, may be created in the meantime.
+        peekedQueues = Collections.newSetFromMap(new ConcurrentHashMap<SortedEventQueue, Boolean>(regionToEventQueue.size()));
+        
+        //Peek from all of the existing queues
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          if(!queue.isEmpty()) {
+            queue.peek(result);
+            peekedQueues.add(queue);
+          }
+        }
+        if (result.isEmpty()) 
+          peeking.set(false);
+        
+        
+        this.lastPeekTimeInMillis = System.currentTimeMillis();
+      }
+
+      @Override
+      public boolean isEmpty() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          if(!queue.isEmpty()) {
+            return false;
+          }
+        }
+        return true;
+      }
+
+      @Override
+      public boolean remove(HDFSGatewayEventImpl event) {
+        String region = event.getRegionPath();
+        SortedEventQueue regionQueue = regionToEventQueue.get(region);
+        return regionQueue.remove(event);
+      }
+
+      @Override
+      public void handleRemainingElements(HashSet<Long> removedSeqNums){
+        for(SortedEventQueue queue : peekedQueues) {
+          queue.handleRemainingElements(removedSeqNums);
+        }
+        peekedQueues.clear();
+        peeking.set(false);
+      }
+
+      @Override
+      public void clear() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          queue.clear();
+        }
+        peekedQueues.clear();
+        peeking.set(false);
+      }
+
+      @Override
+      public long getLastPeekTimeInMillis() {
+        return this.lastPeekTimeInMillis;
+      }
+
+      @Override
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue) {
+        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+        if(queue == null) {
+          return null;
+        }
+        return queue.get(region, regionKey, minValue);
+      }
+
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        SortedEventQueue queue = regionToEventQueue.get(region.getFullPath());
+        if(queue == null) {
+          return new SortedEventQueueIterator(new LinkedBlockingDeque<SortedEventBuffer>());
+        }
+        return queue.iterator(region);
+      }
+
+      @Override
+      public void rollover() {
+        for(SortedEventQueue queue : regionToEventQueue.values()) {
+          queue.rollover();
+        }
+      }
+    }
+    
+    class EventQueue implements HDFSEventQueue {
+      private final SignalledFlushObserver flush = new SignalledFlushObserver();
+      private final BlockingQueue<Long> eventSeqNumQueue = new LinkedBlockingQueue<Long>();
+      private final BlockingQueue<Long> peekedEvents = new LinkedBlockingQueue<Long>();
+      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
+      
+      public EventQueue() {
+        
+      }
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      @Override
+      public void put(long key, HDFSGatewayEventImpl event, int size) {
+        put(key);
+      }
+      public void put (long key) {
+        eventSeqNumQueue.add(key);
+        flush.push();
+        incQueueSize();
+      }
+      
+      
+      @Override
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey,
+          long minValue) {
+        throw new InternalGemFireError("Get not supported on unsorted queue");
+      }
+      
+      @Override
+      public void peek(ArrayList peekedEntries) {
+        if (peekedEvents.size() != 0) {
+          return;
+        }
+        
+        for(int size=0; size < batchSize; ) {
+          Long seqNum = eventSeqNumQueue.peek();
+          if (seqNum == null) {
+            // queue is now empty, return
+            break;
+          }
+          Object object = getNoLRU(seqNum, true, false, false);
+          if (object != null) {
+            peekedEvents.add(seqNum);
+            size += ((HDFSGatewayEventImpl)object).getSizeOnHDFSInBytes(!isBucketSorted);
+            peekedEntries.add(object);
+
+          } else {
+            logger.debug("The entry corresponding to the sequence number " + 
+               seqNum +  " is missing. This can happen when an entry is already" +
+               "dispatched before a bucket moved.");
+            // event is being ignored. Decrease the queue size
+            decQueueSize();
+            flush.pop(1);
+           
+          }
+          eventSeqNumQueue.poll();
+          
+        }
+        this.lastPeekTimeInMillis  = System.currentTimeMillis();
+      }
+
+      @Override
+      public boolean isEmpty() {
+        return eventSeqNumQueue.isEmpty();
+      }
+
+      
+      @Override
+      public boolean remove(HDFSGatewayEventImpl event) {
+        boolean deleted = peekedEvents.remove(event.getShadowKey());
+        if (deleted)
+         decQueueSize();
+        return deleted;
+      }
+
+      @Override
+      // It looks like there is no need for this function in EventQueue, but it 
+      // re-adds any peeked events that were not removed.
+      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+        flush.pop(removedSeqNums.size());
+        eventSeqNumQueue.addAll(peekedEvents);
+        peekedEvents.clear();
+      }
+
+      @Override
+      public void clear() {
+        flush.clear();
+        decQueueSize(eventSeqNumQueue.size());
+        eventSeqNumQueue.clear();
+        decQueueSize(peekedEvents.size());
+        peekedEvents.clear();
+      }
+
+      @Override
+      public long getLastPeekTimeInMillis() {
+        return this.lastPeekTimeInMillis;
+      }
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        throw new InternalGemFireError("not supported on unsorted queue");
+      }
+      @Override
+      public void rollover() {
+        throw new InternalGemFireError("not supported on unsorted queue");
+      }
+    }
+    
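+    /*
+     * Descriptive note (not part of the original commit): SortedEventQueue keeps a
+     * deque of SortedEventBuffer skip lists. Puts always go into the buffer at the
+     * tail (currentSkipList); peek() drains the buffer at the head, rolling the list
+     * over first if only one buffer exists; rollover() starts a fresh tail buffer once
+     * the current one reaches roughly one batch in size.
+     */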
+    class SortedEventQueue implements HDFSEventQueue {
+      private final SignalledFlushObserver flush = new SignalledFlushObserver();
+
+      // List of all the skip lists that hold the data
+      final Deque<SortedEventBuffer> queueOfLists = 
+          new LinkedBlockingDeque<SortedEventBuffer>();
+      
+      // This points to the tail of the queue
+      volatile SortedEventBuffer currentSkipList = new SortedEventBuffer();
+      
+      private final AtomicBoolean peeking = new AtomicBoolean(false);
+      
+      private long lastPeekTimeInMillis = System.currentTimeMillis(); 
+      
+      public SortedEventQueue() {
+        queueOfLists.add(currentSkipList);
+      }
+      
+      @Override
+      public FlushObserver getFlushObserver() {
+        return flush;
+      }
+
+      public boolean remove(HDFSGatewayEventImpl event) {
+        SortedEventBuffer eventBuffer = queueOfLists.peek();
+        if (eventBuffer != null) {
+          return eventBuffer.copyToBuffer(event);
+        }
+        else {
+          // This can happen when the queue is cleared because of bucket movement 
+          // before the remove is called. 
+          return true;
+        }
+      } 
+
+      public void clear() {
+        flush.clear();
+        for (SortedEventBuffer buf : queueOfLists) {
+          decQueueSize(buf.size());
+          buf.clear();
+        }
+        
+        queueOfLists.clear();
+        rollList(false);
+
+        peeking.set(false);
+      }
+
+      public boolean isEmpty() {
+        if (queueOfLists.size() == 1)
+          return queueOfLists.peek().isEmpty();
+        return false;
+      }
+
+      public void put(long key, HDFSGatewayEventImpl event, int eventSize) {
+        if (logger.isTraceEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Inserting key " + event + " into list " + System.identityHashCode(currentSkipList)));
+        }
+        putInList(new KeyToSeqNumObject(((HDFSGatewayEventImpl)event).getSerializedKey(), key), 
+            eventSize);
+      }
+
+      private void putInList(KeyToSeqNumObject entry, int sizeInBytes) {
+        // It was observed during testing that peek can start peeking 
+        // elements from a list to which a put is happening. This happens 
+        // when the peek changes the value of currentSkiplist to a new list 
+        // but the put continues to write to an older list. 
+        // So there is a possibility that an element is added to the list 
+        // that has already been peeked. To handle this case, in handleRemainingElements
+        // function we re-add the elements that were not peeked. 
+        if (currentSkipList.add(entry, sizeInBytes) == null) {
+          flush.push();
+          incQueueSize();
+        }
+      }
+
+      public void rollover(boolean forceRollover) {
+        if (currentSkipList.bufferSize() >= batchSize || forceRollover) {
+          rollList(forceRollover);
+        }
+      }
+      
+      /**
+       * Ideally this function should be called from a thread periodically to 
+       * rollover the skip list when it is above a certain size. 
+       * 
+       */
+      public void rollover() {
+        rollover(false);
+      }
+
+      public void peek(ArrayList peekedEntries) {
+        // The elements that were peeked last time have not been persisted to HDFS 
+        // yet. The next batch cannot be taken out until that is done.
+        if (!peeking.compareAndSet(false, true)) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peek already in progress, aborting"));
+          }
+          return;
+        }
+
+        if (queueOfLists.size() == 1) {
+          rollList(false);
+        }
+        
+        Assert.assertTrue(queueOfLists.size() > 1, "Cannot peek from head of queue");
+        BufferIterator itr = queueOfLists.peek().iterator();
+        while (itr.hasNext()) {
+          KeyToSeqNumObject entry = itr.next();
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Peeking key " + entry + " from list " + System.identityHashCode(queueOfLists.peek())));
+          }
+
+          HDFSGatewayEventImpl ev = itr.value();
+          ev.copyOffHeapValue();
+          peekedEntries.add(ev);
+        }
+        
+        // discard an empty batch as it is not processed and will plug up the
+        // queue
+        if (peekedEntries.isEmpty()) {
+          SortedEventBuffer empty = queueOfLists.remove();
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Discarding empty batch " + empty));
+          }
+          peeking.set(false);
+        }
+        this.lastPeekTimeInMillis = System.currentTimeMillis();
+      }
+
+      public HDFSGatewayEventImpl get(Region region, byte[] regionKey, long key) {
+        KeyToSeqNumObject event = new KeyToSeqNumObject(regionKey, key);
+        Iterator<SortedEventBuffer> queueIterator = queueOfLists.descendingIterator();
+        while (queueIterator.hasNext()) {
+          HDFSGatewayEventImpl evt = queueIterator.next().getFromQueueOrBuffer(event);
+          if (evt != null) {
+            return evt;
+          }
+        }
+        return null;
+      }
+      
+      public void handleRemainingElements(HashSet<Long> removedSeqNums) {
+        if (!peeking.get()) {
+          if (logger.isTraceEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Not peeked, just cleaning up empty batch; current list is " + currentSkipList));
+          }
+          return;
+        }
+
+        Assert.assertTrue(queueOfLists.size() > 1, "Cannot remove only event list");
+
+        // all done with the peeked elements, okay to throw away now
+        SortedEventBuffer buf = queueOfLists.remove();
+        SortedEventBuffer.BufferIterator bufIter = buf.iterator();
+        // Check if the removed buffer has any extra events. If so, check whether these 
+        // extra events are still part of the region; if they are, reinsert them, as they 
+        // were probably added to this list while it was being peeked. 
+        while (bufIter.hasNext()) {
+          KeyToSeqNumObject key = bufIter.next();
+          if (!removedSeqNums.contains(key.getSeqNum())) {
+            HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) getNoLRU(key.getSeqNum(), true, false, false);
+            if (evt != null) {
+              flush.push();
+              incQueueSize();
+              queueOfLists.getFirst().add(key, evt.getSizeOnHDFSInBytes(!isBucketSorted));
+            }
+          }
+        }
+
+        decQueueSize(buf.size());
+        flush.pop(buf.size());
+        peeking.set(false);
+      }
+      
+      public long getLastPeekTimeInMillis(){
+        return this.lastPeekTimeInMillis;
+      }
+      
+      NavigableSet<KeyToSeqNumObject> getPeeked() {
+        assert peeking.get();
+        return queueOfLists.peek().keySet();
+      }
+      
+      private synchronized void rollList(boolean forceRollover) {
+        if (currentSkipList.bufferSize() < batchSize && queueOfLists.size() > 1 && !forceRollover)
+          return;
+        SortedEventBuffer tmp = new SortedEventBuffer();
+        queueOfLists.add(tmp);
+        if (logger.isTraceEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Rolling over list from " + currentSkipList + " to list " + tmp));
+        }
+        currentSkipList = tmp;
+      }
+
+      @Override
+      public SortedEventQueueIterator iterator(Region region) {
+        return new SortedEventQueueIterator(queueOfLists);
+      }
+    }
+    
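+    /*
+     * Descriptive note (not part of the original commit): SortedEventBuffer is one
+     * skip-list segment of a SortedEventQueue. add() stages a key with a NULL
+     * placeholder value; copyToBuffer() pins the delivered event's value so that an
+     * in-progress iterator can still resolve it; getFromQueueOrBuffer() resolves a
+     * key against the buffer, then the queue region, then the buffer again to close
+     * the race with concurrent removal.
+     */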
+    public class SortedEventBuffer {
+      private final HDFSGatewayEventImpl NULL = new HDFSGatewayEventImpl();
+  
+      private final ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl> events = new ConcurrentSkipListMap<KeyToSeqNumObject, HDFSGatewayEventImpl>();
+      
+      private int bufferSize = 0;
+      
+      public boolean copyToBuffer(HDFSGatewayEventImpl event) {
+        KeyToSeqNumObject key = new KeyToSeqNumObject(event.getSerializedKey(), event.getShadowKey());
+        if (events.containsKey(key)) {
+          // After an event has been delivered in a batch, we copy it into the
+          // buffer so that it can be returned by an already in progress iterator.
+          // If we do not do this it is possible to miss events since the hoplog
+          // iterator uses a fixed set of files that are determined when the
+          // iterator is created.  The events will be GC'd once the buffer is no
+          // longer strongly referenced.
+          HDFSGatewayEventImpl oldVal = events.put(key, event);
+          assert oldVal == NULL;
+  
+          return true;
+        }
+        // If the primary lock is being relinquished, the events map has been cleared and 
+        // that is probably why we are here. Return true in that case.
+        return releasingPrimaryLock.get();
+      }
+  
+      public HDFSGatewayEventImpl getFromQueueOrBuffer(KeyToSeqNumObject key) {
+        KeyToSeqNumObject result = events.ceilingKey(key);
+        if (result != null && Bytes.compareTo(key.getRegionkey(), result.getRegionkey()) == 0) {
+          
+          // first try to fetch the buffered event to make it fast. 
+          HDFSGatewayEventImpl evt = events.get(result);
+          if (evt != NULL) {
+            return evt;
+          }
+          // now try to fetch the event from the queue region
+          evt = (HDFSGatewayEventImpl) getNoLRU(result.getSeqNum(), true, false, false);
+          if (evt != null) {
+            return evt;
+          }
+          
+          // try to fetch again from the buffered events to avoid a race between 
+          // item deletion and the above two statements. 
+          evt = events.get(result);
+          if (evt != NULL) {
+            return evt;
+          }
+          
+        }
+        return null;
+      }
+  
+      public HDFSGatewayEventImpl add(KeyToSeqNumObject key, int sizeInBytes) {
+        bufferSize += sizeInBytes;
+        return events.put(key, NULL);
+      }
+  
+      public void clear() {
+        events.clear();
+      }
+  
+      public boolean isEmpty() {
+        return events.isEmpty();
+      }
+  
+      public int bufferSize() {
+        return bufferSize;
+      }
+      public int size() {
+        return events.size();
+      }
+      public NavigableSet<KeyToSeqNumObject> keySet() {
+        return events.keySet();
+      }
+  
+      public BufferIterator iterator() {
+        return new BufferIterator(events.keySet().iterator());
+      }
+  
+      public class BufferIterator implements Iterator<KeyToSeqNumObject> {
+        private final Iterator<KeyToSeqNumObject> src;
+
+        private KeyToSeqNumObject currentKey;
+        private HDFSGatewayEventImpl currentVal;
+
+        private KeyToSeqNumObject nextKey;
+        private HDFSGatewayEventImpl nextVal;
+        
+        public BufferIterator(Iterator<KeyToSeqNumObject> src) {
+          this.src = src;
+          moveNext();
+        }
+  
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+        
+        @Override
+        public boolean hasNext() {
+          return nextVal != null;
+        }
+        
+        @Override
+        public KeyToSeqNumObject next() {
+          if (!hasNext()) {
+            throw new NoSuchElementException();
+          }
+          
+          currentKey = nextKey;
+          currentVal = nextVal;
+          
+          moveNext();
+          
+          return currentKey;
+        }
+  
+        public KeyToSeqNumObject key() {
+          assert currentKey != null;
+          return currentKey;
+        }
+        
+        public HDFSGatewayEventImpl value() {
+          assert currentVal != null;
+          return currentVal;
+        }
+        
+        private void moveNext() {
+          while (src.hasNext()) {
+            nextKey = src.next();
+            nextVal = getFromQueueOrBuffer(nextKey);
+            if (nextVal != null) {
+              return;
+            } else if (logger.isDebugEnabled() || VERBOSE) {
+              logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "The entry corresponding to"
+                  + " the sequence number " + nextKey.getSeqNum() 
+                  + " is missing. This can happen when an entry is already" 
+                  + " dispatched before a bucket moved."));
+            }
+          }
+          nextKey = null;
+          nextVal = null;
+        }
+      }
+    }
+  
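+    /*
+     * Descriptive note (not part of the original commit): SortedEventQueueIterator
+     * merges the per-buffer iterators, visiting the newest buffer first. Each call to
+     * next() picks the smallest region key across the open iterators, returns the
+     * value from the newest buffer that holds that key, and advances the other
+     * iterators past their duplicates of it, so every key is yielded once with its
+     * most recent value.
+     */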
+    public final class SortedEventQueueIterator implements CursorIterator<HDFSGatewayEventImpl> {
+      /** the iterators to merge */
+      private final List<SortedEventBuffer.BufferIterator> iters;
+  
+      /** the current iteration value */
+      private HDFSGatewayEventImpl value;
+  
+      public SortedEventQueueIterator(Deque<SortedEventBuffer> queueOfLists) {
+        iters = new ArrayList<SortedEventBuffer.BufferIterator>();
+        for (Iterator<SortedEventBuffer> iter = queueOfLists.descendingIterator(); iter.hasNext();) {
+          SortedEventBuffer.BufferIterator buf = iter.next().iterator();
+          if (buf.hasNext()) {
+            buf.next();
+            iters.add(buf);
+          }
+        }
+      }
+      
+      public void close() {
+        value = null;
+        iters.clear();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return !iters.isEmpty();
+      }
+      
+      @Override
+      public HDFSGatewayEventImpl next() {
+        if (!hasNext()) {
+          // the Iterator contract calls for NoSuchElementException here
+          throw new NoSuchElementException();
+        }
+        
+        int diff = 0;
+        KeyToSeqNumObject min = null;
+        SortedEventBuffer.BufferIterator cursor = null;
+        
+        for (Iterator<SortedEventBuffer.BufferIterator> merge = iters.iterator(); merge.hasNext(); ) {
+          SortedEventBuffer.BufferIterator buf = merge.next();
+          KeyToSeqNumObject tmp = buf.key();
+          if (min == null || (diff = Bytes.compareTo(tmp.regionkey, min.regionkey)) < 0) {
+            min = tmp;
+            cursor = buf;
+            
+          } else if (diff == 0 && !advance(buf, min)) {
+            merge.remove();
+          }
+        }
+        
+        value = cursor.value();
+        assert value != null;
+
+        if (!advance(cursor, min)) {
+          iters.remove(cursor);
+        }
+        return current();
+      }
+      
+      @Override
+      public final HDFSGatewayEventImpl current() {
+        return value;
+      }
+
+      @Override 
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+      
+      private boolean advance(SortedEventBuffer.BufferIterator iter, KeyToSeqNumObject key) {
+        while (iter.hasNext()) {
+          if (Bytes.compareTo(iter.next().regionkey, key.regionkey) > 0) {
+            return true;
+          }
+        }
+        return false;
+      }
+    }
+}


