geode-commits mailing list archives

From upthewatersp...@apache.org
Subject [11/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
Date Wed, 27 Apr 2016 20:49:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
index ea3c975..b6c072c 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1938,6 +1938,10 @@ public class CacheXmlGenerator extends CacheXml implements XMLReader {
           dpString = PERSISTENT_REPLICATE_DP;
         } else if (dp == DataPolicy.PERSISTENT_PARTITION) {
           dpString = PERSISTENT_PARTITION_DP;
+        } else if (dp == DataPolicy.HDFS_PARTITION) {
+          dpString = HDFS_PARTITION_DP;
+        } else if (dp == DataPolicy.HDFS_PERSISTENT_PARTITION) {
+          dpString = HDFS_PERSISTENT_PARTITION_DP;
         } else if (dp.isPartition()) {
           if (this.version.compareTo(CacheXmlVersion.GEMFIRE_5_1) >= 0) {
             dpString = PARTITION_DP;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index f344938..890f8aa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -87,6 +87,7 @@ import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.client.ClientCache;
 import com.gemstone.gemfire.cache.client.PoolFactory;
 import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.query.IndexType;
 import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
@@ -1019,7 +1020,161 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
 
     stack.push(attrs);
   }
+  /**
+   * When a <code>hdfs-store</code> element is first encountered, we
+   * create a {@link HDFSStoreCreation}, populate it accordingly, and
+   * push it on the stack.
+   * <pre>
+   * {@code
+   * <hdfs-store name="" gemfire-home-dir="" namenode-url="" hdfs-client-config-file="">
+   * ...
+   * </hdfs-store>
+   * }
+   * </pre>
+   */
+  private void startHDFSStore(Attributes atts) {
+    // this is the only place where HDFSStoreCreation objects are created
+    HDFSStoreCreation attrs = new HDFSStoreCreation();
+    String name = atts.getValue(NAME);
+    if (name == null) {
+      throw new InternalGemFireException(
+          LocalizedStrings.CacheXmlParser_NULL_DiskStoreName.toLocalizedString());
+    } else {
+      attrs.setName(name);
+    }
 
+    String namenode = atts.getValue(HDFS_NAMENODE_URL);
+    if (namenode == null) {
+      // NOTE: reuses the disk-store null-name message for a missing namenode-url
+      throw new InternalGemFireException(
+          LocalizedStrings.CacheXmlParser_NULL_DiskStoreName.toLocalizedString());
+    } else {
+      attrs.setNameNodeURL(namenode);
+    }
+
+    String clientConfig = atts.getValue(HDFS_CLIENT_CONFIG_FILE);
+    if (clientConfig != null) {
+      attrs.setHDFSClientConfigFile(clientConfig);
+    }
+    
+    String folderPath = atts.getValue(HDFS_HOME_DIR);
+    if (folderPath != null) {
+      attrs.setHomeDir(folderPath);
+    }
+   
+    String readCacheSize = atts.getValue(HDFS_READ_CACHE_SIZE);
+    if (readCacheSize != null) {
+      try {
+        attrs.setBlockCacheSize(Float.valueOf(readCacheSize));
+      } catch (NumberFormatException e) {
+        // NOTE: the message text says "integer" although the attribute is parsed as a float
+        throw new CacheXmlException(
+            LocalizedStrings.DistributedSystemConfigImpl_0_IS_NOT_A_VALID_INTEGER_1
+            .toLocalizedString(new Object[] { readCacheSize, HDFS_READ_CACHE_SIZE }),
+            e);
+      }
+    }
+    
+    Integer maxMemory = getIntValue(atts, HDFS_MAX_MEMORY);
+    if (maxMemory != null) {
+      attrs.setMaxMemory(maxMemory);
+    }
+    
+    Integer batchSize = getIntValue(atts, HDFS_BATCH_SIZE);
+    if (batchSize != null) {
+      attrs.setBatchSize(batchSize);
+    }
+    
+    Integer batchInterval = getIntValue(atts, HDFS_BATCH_INTERVAL);
+    if (batchInterval != null) {
+      attrs.setBatchInterval(batchInterval);
+    }
+    
+    Integer dispatcherThreads = getIntValue(atts, HDFS_DISPATCHER_THREADS);
+    if (dispatcherThreads != null) {
+      attrs.setDispatcherThreads(dispatcherThreads);
+    }
+    
+    Boolean bufferPersistent = getBoolean(atts, HDFS_BUFFER_PERSISTENT);
+    if (bufferPersistent != null) {
+      attrs.setBufferPersistent(bufferPersistent);
+    }
+    
+    Boolean synchronousDiskWrite = getBoolean(atts, HDFS_SYNCHRONOUS_DISK_WRITE);
+    if (synchronousDiskWrite != null) {
+      attrs.setSynchronousDiskWrite(synchronousDiskWrite);
+    }
+    
+    String diskstoreName = atts.getValue(HDFS_DISK_STORE);
+    if (diskstoreName != null) {
+      attrs.setDiskStoreName(diskstoreName);
+    }
+    
+    Integer purgeInterval = getInteger(atts, HDFS_PURGE_INTERVAL);
+    if (purgeInterval != null) {
+      attrs.setPurgeInterval(purgeInterval);
+    }
+    Boolean majorCompaction = getBoolean(atts, HDFS_MAJOR_COMPACTION);
+    if (majorCompaction != null) {
+      attrs.setMajorCompaction(majorCompaction);
+    }
+    
+    // configure major compaction interval
+    Integer majorCompactionInterval = getIntValue(atts, HDFS_MAJOR_COMPACTION_INTERVAL);
+    if (majorCompactionInterval != null) {
+      attrs.setMajorCompactionInterval(majorCompactionInterval);
+    }
+    
+    // configure major compaction concurrency
+    Integer value = getIntValue(atts, HDFS_MAJOR_COMPACTION_THREADS);
+    if (value != null) {
+      attrs.setMajorCompactionThreads(value);
+    }
+    
+    Boolean minorCompaction = getBoolean(atts, HDFS_MINOR_COMPACTION);
+    if (minorCompaction != null) {
+      attrs.setMinorCompaction(minorCompaction);
+    }
+    
+    // configure minor compaction concurrency
+    value = getIntValue(atts, HDFS_MINOR_COMPACTION_THREADS);
+    if (value != null) {
+      attrs.setMinorCompactionThreads(value);
+    }
+    
+    String maxFileSize = atts.getValue(HDFS_MAX_WRITE_ONLY_FILE_SIZE);
+    if (maxFileSize != null) {
+      attrs.setWriteOnlyFileRolloverSize(parseInt(maxFileSize));
+    }
+    
+    String fileRolloverInterval = atts.getValue(HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL);
+    if (fileRolloverInterval != null) {
+      attrs.setWriteOnlyFileRolloverInterval(parseInt(fileRolloverInterval));
+    }
+    stack.push(name);
+    stack.push(attrs);
+  }
+  
+  /**
+   * After popping the current <code>HDFSStoreCreation</code> off the
+   * stack, we add it to the <code>CacheCreation</code> that should be on
+   * top of the stack.
+   */
+  private void endHDFSStore() {
+    HDFSStoreCreation hsc = (HDFSStoreCreation) stack.pop();
+    String name = (String) stack.pop();
+    CacheCreation cache;
+    Object top = stack.peek();
+    if (top instanceof CacheCreation) {
+      cache = (CacheCreation) top;
+    }
+    else {
+      String s = "Did not expect a " + top.getClass().getName()
+          + " on top of the stack.";
+      Assert.assertTrue(false, s);
+      cache = null; // Dead code
+    }
+    if (name != null) {
+      cache.addHDFSStore(name, hsc);
+    }
+  }
+
   private Integer getIntValue(Attributes atts, String param) {
     String maxInputFileSizeMB = atts.getValue(param);
     if (maxInputFileSizeMB != null) {
@@ -1114,6 +1269,12 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       else if (dp.equals(PERSISTENT_PARTITION_DP)) {
         attrs.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
       }
+      else if (dp.equals(HDFS_PARTITION_DP)) {
+        attrs.setDataPolicy(DataPolicy.HDFS_PARTITION);
+      }
+      else if (dp.equals(HDFS_PERSISTENT_PARTITION_DP)) {
+        attrs.setDataPolicy(DataPolicy.HDFS_PERSISTENT_PARTITION);
+      }
       else {
         throw new InternalGemFireException(LocalizedStrings.CacheXmlParser_UNKNOWN_DATA_POLICY_0.toLocalizedString(dp));
       }
@@ -1234,7 +1395,16 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     if(offHeapStr != null) {
       attrs.setOffHeap(Boolean.valueOf(offHeapStr).booleanValue());
     }
+    String hdfsStoreName = atts.getValue(HDFS_STORE_NAME);
+    if (hdfsStoreName != null) {
+      attrs.setHDFSStoreName(hdfsStoreName);
+    }
+    String hdfsWriteOnly = atts.getValue(HDFS_WRITE_ONLY);
+    if (hdfsWriteOnly != null) {
+      attrs.setHDFSWriteOnly(Boolean.valueOf(hdfsWriteOnly).booleanValue());
+    }
 
+
     stack.push(attrs);
   }
   
@@ -2836,6 +3006,9 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     } else if(qName.equals(PDX_SERIALIZER)) {
       //do nothing
     }
+    else if (qName.equals(HDFS_STORE)) {
+      startHDFSStore(atts);
+    }
     else if (qName.equals(COMPRESSOR)) {
     }
     else {
@@ -3244,6 +3417,9 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       else if (qName.equals(PDX_SERIALIZER)) {
         endPdxSerializer();
       }
+      else if (qName.equals(HDFS_STORE)) {
+        endHDFSStore();
+      }
       else if (qName.equals(COMPRESSOR)) {
         endCompressor();
       }
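
For reference, a minimal sketch of the programmatic equivalent of the
<hdfs-store> handling above, using only the HDFSStoreCreation setters that
startHDFSStore() calls; the attribute values here are hypothetical:

    import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;

    public class HdfsStoreXmlEquivalentSketch {
      public static HDFSStoreCreation sampleStore() {
        HDFSStoreCreation attrs = new HDFSStoreCreation();
        attrs.setName("exampleStore");                 // name attribute (required)
        attrs.setNameNodeURL("hdfs://localhost:8020"); // namenode-url attribute (required)
        attrs.setHomeDir("/gemfire");                  // home-dir attribute
        attrs.setBatchSize(32);                        // batch-size attribute
        attrs.setMaxMemory(100);                       // max-memory attribute
        return attrs;
      }
    }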

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/RegionAttributesCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/RegionAttributesCreation.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/RegionAttributesCreation.java
index 4dfe6ae..d0f5676 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/RegionAttributesCreation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/RegionAttributesCreation.java
@@ -28,6 +28,7 @@ import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.cache.CacheListener;
 import com.gemstone.gemfire.cache.CacheLoader;
 import com.gemstone.gemfire.cache.CacheWriter;
+import com.gemstone.gemfire.cache.CustomEvictionAttributes;
 import com.gemstone.gemfire.cache.CustomExpiry;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.DiskStoreFactory;
@@ -122,6 +123,8 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
   * @since prPersistPrint2 
   * */
   private String diskStoreName;
+  private String hdfsStoreName;
+  private boolean hdfsWriteOnly = false;
   private boolean isDiskSynchronous = AttributesFactory.DEFAULT_DISK_SYNCHRONOUS;
   
   private boolean cloningEnabled = false;
@@ -268,7 +271,8 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
     this.poolName = attrs.getPoolName();
     this.multicastEnabled = attrs.getMulticastEnabled();
     this.cloningEnabled = attrs.getCloningEnabled();
-
+    this.hdfsStoreName = attrs.getHDFSStoreName();
+
     this.compressor = attrs.getCompressor();
     this.offHeap = attrs.getOffHeap();
     if (attrs instanceof UserSpecifiedRegionAttributes) {
@@ -496,6 +500,10 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
     if(this.cloningEnabled != other.getCloningEnabled()){
       throw new RuntimeException(LocalizedStrings.RegionAttributesCreation__CLONING_ENABLE_IS_NOT_THE_SAME_THIS_0_OTHER_1.toLocalizedString(new Object[] {Boolean.valueOf(this.cloningEnabled), Boolean.valueOf(other.getCloningEnabled())}));
     }
+    if (!equal(this.hdfsStoreName, other.getHDFSStoreName())) {
+      //TODO:HDFS write a new exception string
+      throw new RuntimeException("HDFS store names are not the same.");
+    }
     if(! equal(this.compressor, other.getCompressor())) {
       throw new RuntimeException("Compressors are not the same.");
     }
@@ -1440,7 +1448,25 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
         setDiskSynchronous(parent.isDiskSynchronous());
       }
     }
-
+    if (!hasHDFSStoreName()) {
+      if (parentIsUserSpecified) {
+        if (parentWithHas.hasHDFSStoreName()) {
+          setHDFSStoreName(parent.getHDFSStoreName());
+        }
+      } else {
+        setHDFSStoreName(parent.getHDFSStoreName());
+      }
+    }
+    if (!hasHDFSWriteOnly()) {
+      if (parentIsUserSpecified) {
+        if (parentWithHas.hasHDFSWriteOnly()) {
+          setHDFSWriteOnly(parent.getHDFSWriteOnly());
+        }
+      } else {
+        setHDFSWriteOnly(parent.getHDFSWriteOnly());
+      }
+    }
+    
     if(!hasCompressor()) {
       if (parentIsUserSpecified) {
         if (parentWithHas.hasCompressor()) {
@@ -1528,6 +1554,15 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
     return this.evictionAttributes;
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public CustomEvictionAttributes getCustomEvictionAttributes() {
+    // TODO: HDFS: no support for configuring this from XML yet
+    return null;
+  }
+
   public void setPoolName(String poolName) {
     if ("".equals(poolName)) {
       poolName = null;
@@ -1620,4 +1655,20 @@ public class RegionAttributesCreation extends UserSpecifiedRegionAttributes impl
   public Set<String> getGatewaySenderIds() {
     return this.gatewaySenderIds;
   }
+  public String getHDFSStoreName() {
+    return this.hdfsStoreName;
+  }
+  public void setHDFSStoreName(String hdfsStoreName) {
+    //TODO:HDFS : throw an exception if a disk store is already configured
+    // and vice versa
+    this.hdfsStoreName = hdfsStoreName;
+    setHasHDFSStoreName(true);
+  }
+  public void setHDFSWriteOnly(boolean writeOnly) {
+    this.hdfsWriteOnly = writeOnly;
+    setHasHDFSWriteOnly(true);
+  }
+  public boolean getHDFSWriteOnly() {
+    return hdfsWriteOnly;
+  }
 }
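
A short usage sketch for the new attributes, assuming RegionAttributesCreation
can be instantiated directly; the store name is a hypothetical value:

    RegionAttributesCreation attrs = new RegionAttributesCreation();
    attrs.setHDFSStoreName("exampleStore"); // also records hasHDFSStoreName
    attrs.setHDFSWriteOnly(true);           // also records hasHDFSWriteOnly
    assert "exampleStore".equals(attrs.getHDFSStoreName());
    assert attrs.getHDFSWriteOnly();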

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
index ff960ca..2a939b4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/i18n/LocalizedStrings.java
@@ -1997,6 +1997,32 @@ public class LocalizedStrings extends ParentLocalizedStrings {
   public static final StringId SnappyCompressor_UNABLE_TO_LOAD_NATIVE_SNAPPY_LIBRARY = new StringId(5502, "Unable to load native Snappy library.");
   public static final StringId SnappyCompressor_UNABLE_TO_LOAD_NATIVE_SNAPPY_LIBRARY_MISSING_LIBRARY = new StringId(5503, "Unable to load native Snappy library from: {0}");
   
+  /** HOPLOG STRINGS, 5505 - 5600 **/
+  public static final StringId HOPLOG_REGION_CLOSE_FAILED = new StringId(5505, "IO error while trying to close region and release hdfs connection: {0}");
+  public static final StringId HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT = new StringId(5506, "HDFS client config file does not exist: {0}");
+  public static final StringId HOPLOG_IO_ERROR = new StringId(5507, "IO Exception while executing HDFS operation: {0}");
+  public static final StringId HOPLOG_UNABLE_TO_DELETE_FILE = new StringId(5508, "Unable to delete file: {0}");
+  public static final StringId HOPLOG_UNABLE_TO_DELETE_HDFS_DATA = new StringId(5509, "Unable to delete HDFS data while destroying region");
+  public static final StringId HOPLOG_CLOSE_FAILED = new StringId(5510, "IO error while trying to close hoplog.");
+  public static final StringId HOPLOG_FLUSH_FOR_BATCH_FAILED = new StringId(5511, "A batch of data could not be persisted on HDFS. It will be retried.");
+  public static final StringId HOPLOG_HDFS_STORE_NOT_FOUND = new StringId(5512, "HDFS store ''{0}'' does not exist.");
+  public static final StringId HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM = new StringId(5513, "The namenode url {0} is not valid. Please use the format hdfs://HOST:PORT");
+  public static final StringId HOPLOG_DOES_NOT_USE_HDFSSTORE = new StringId(5514, "{0} does not use HDFSSTORE");
+  public static final StringId HOPLOG_CONFIGURED_AS_WRITEONLY = new StringId(5515, "{0} is defined as WRITEONLY");
+  public static final StringId HOPLOG_MISSING_IN_BUCKET_FORCED_CLOSED = new StringId(5516, "A hoplog file, {0}, was not found in the bucket lists. Closing it now; this may impact active reads.");
+  public static final StringId HOPLOG_MIN_IS_MORE_THAN_MAX = new StringId(5517, "Value of {0} is {1}. It should not be more than {2} value {3}");
+  public static final StringId HOPLOG_NOT_STARTED_YET = new StringId(5518, "HDFS store is not started yet. Gemfire is running without HDFS.");
+  public static final StringId HOPLOG_0_COLOCATE_WITH_REGION_1_NOT_INITIALIZED_YET = new StringId(5519, "Current region: {0} colocated with region {1} is not yet initialized.");
+  public static final StringId HOPLOG_SUSPEND_OF_0_FAILED_IN_1 = new StringId(5520, "Failed to suspend active {0} in {1}");
+  public static final StringId HOPLOG_CLEANED_UP_BY_JANITOR = new StringId(5521, "Hoplog is cleaned up by janitor task.");
+  public static final StringId HOPLOG_HDFS_UNREACHABLE = new StringId(5522, "HDFS at {0} is unreachable.");
+  public static final StringId HOPLOG_MAJOR_COMPACTION_SCHEDULED_FOR_BETTER_ESTIMATE = new StringId(5523, "A major compaction has been automatically scheduled for better accuracy of count_estimate() function");
+  public static final StringId HOPLOG_FAILED_TO_READ_HDFS_FILE = new StringId(5524, "Exception while reading file on HDFS: {0}");
+  public static final StringId HOPLOG_HDFS_COMPACTION_ERROR = new StringId(5525, "Error while compacting files of bucket {0}");
+  public static final StringId HOPLOG_HDFS_COMPACTION_OVERLOADED = new StringId(5526, "Too many pending tasks for {0}. Skipping compaction request for {1}");
+  public static final StringId HOPLOG_FLUSH_OPERATION_FAILED = new StringId(5527, "IO error while trying to flush buffer and create hoplog.");
+  public static final StringId HOPLOG_HOPLOG_REMOVE_FAILED = new StringId(5528, "IO error while trying to remove hoplog.");
+  /** HOPLOG STRINGS, 5505 - 5600 **/
 
   public static final StringId PartitionAttributesImpl_CANNOT_DETERMINE_LOCAL_MAX_MEMORY_FOR_PARTITION_ATTRIBUTE_SINCE_NO_CACHE_IS_AVAILABLE_FROM_WHICH_TO_FETCH_THE_OFF_HEAP_MEMORY_ALLOCATOR = new StringId(5600, "Cannot determine local max memory for partition attribute since no cache is available from which to fetch the off-heap memory allocator");
 
@@ -2070,6 +2096,10 @@ public class LocalizedStrings extends ParentLocalizedStrings {
   public static final StringId ParallelAsyncEventQueue_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1 = new StringId(5716,"Parallel Async Event Queue {0} can not be used with replicated region {1}");
   public static final StringId ParallelGatewaySender_0_CAN_NOT_BE_USED_WITH_REPLICATED_REGION_1 = new StringId(5717,"Parallel gateway sender {0} can not be used with replicated region {1}");
 
+  public static final StringId HDFSSTORE_IS_USED_IN_NONHDFS_REGION = new StringId(5808, "Only regions with HDFS_PARTITION or HDFS_PERSISTENT_PARTITION data policies can specify an HDFS Store");
+  public static final StringId EVICTORSERVICE_CAUGHT_EXCEPTION_0 = new StringId(5809, "Evictor Service caught following exception : {0}");
+  public static final StringId HDFSSTORE_IS_USED_IN_REPLICATED_TABLE = new StringId(5810, "HDFS Store cannot be used for REPLICATED TABLE");
+  public static final StringId HDFS_USER_IS_SAME_AS_GF_USER = new StringId(5811, "Gemfire user is the same as HDFS user, may cause security risks: {0}");
   public static final StringId GF_KERBEROS_KEYTAB_FILE_ABSENT = new StringId(5812, "Gemfire kerberos keytab file is missing: {0}");
   public static final StringId GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF = new StringId(5813, "Namenode principal must be configured when using kerberos authentication");
   public static final StringId GF_KERBEROS_KEYTAB_UNDEF = new StringId(5814, "Gemfire kerberos keytab file is not configured");
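
A small sketch of how these StringIds resolve into messages, following the
toLocalizedString(...) calls used elsewhere in this commit; the store name
here is a hypothetical value:

    // {0} is substituted MessageFormat-style
    String msg = LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
        .toLocalizedString("exampleStore");
    // msg: "HDFS store 'exampleStore' does not exist."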

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
index 08de0c9..9ff3249 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/redis/RegionProvider.java
@@ -393,7 +393,7 @@ public class RegionProvider implements Closeable {
     r = cache.getRegion(key);
     if (r != null) return r;
     do {
-      Result result = cliCmds.createRegion(key, defaultRegionType, null, null, true, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      Result result = cliCmds.createRegion(key, defaultRegionType, null, null, true, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
       r = cache.getRegion(key);
       if (result.getStatus() == Status.ERROR && r == null) {
         String err = "";

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedRegionMXBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedRegionMXBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedRegionMXBean.java
index f087c89..3003827 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedRegionMXBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedRegionMXBean.java
@@ -130,6 +130,7 @@ public interface DistributedRegionMXBean {
   /**
    * Returns the number of entries in the Region.
    * 
+   * For HDFS regions this will be a count of the in-memory data only.
    */
   public long getSystemRegionEntryCount();
 
@@ -304,4 +305,14 @@ public interface DistributedRegionMXBean {
    * Returns the number of members whose entry count is 0.
    */
   public int getEmptyNodes();
+
+  /**
+   * An estimated entry count for an HDFS read-write region. This may not be
+   * accurate, but it acts as an indicative value.
+   * 
+   * For other regions it will be -1 (not available).
+   */
+  public long getEstimatedSizeForHDFSRegion();
+
 }
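
A hedged sketch of reading the new attribute through a standard JMX proxy;
the MBeanServerConnection and ObjectName are assumed to be obtained by the
caller and are not part of this commit:

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import com.gemstone.gemfire.management.DistributedRegionMXBean;

    public class HdfsRegionSizeReader {
      public static long estimatedSize(MBeanServerConnection mbs,
          ObjectName regionBeanName) {
        DistributedRegionMXBean bean =
            JMX.newMXBeanProxy(mbs, regionBeanName, DistributedRegionMXBean.class);
        return bean.getEstimatedSizeForHDFSRegion(); // -1 for non-HDFS regions
      }
    }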

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedSystemMXBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedSystemMXBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedSystemMXBean.java
index 88c4058..a6f65d4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedSystemMXBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/DistributedSystemMXBean.java
@@ -130,6 +130,14 @@ public interface DistributedSystemMXBean {
    */
   public Map<String, String[]> listMemberDiskstore();
 
+
+  /**
+   * @return A map of all {@link DistributedMember}s and their HDFSStores.
+   */
+  public Map<String, String[]> listMemberHDFSStore();
+
   /**
    * Returns a list of IDs for all gateway senders.
    */
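
A short usage sketch for the new method, assuming dsBean is a
DistributedSystemMXBean proxy obtained by the caller:

    java.util.Map<String, String[]> byMember = dsBean.listMemberHDFSStore();
    for (java.util.Map.Entry<String, String[]> e : byMember.entrySet()) {
      System.out.println(e.getKey() + " -> "
          + java.util.Arrays.toString(e.getValue()));
    }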

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/MemberMXBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/MemberMXBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/MemberMXBean.java
index 4b849e0..ed27569 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/MemberMXBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/MemberMXBean.java
@@ -227,6 +227,13 @@ public interface MemberMXBean {
   public String[] listDiskStores(boolean includeRegionOwned);
 
   /**
+   * @return the names of the HDFSStores present in the Cache
+   */
+  public String[] getHDFSStores();
+
+  /**
    * Returns the GemFire specific properties for this member.
    */
   public GemFireProperties listGemFireProperties();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/RegionMXBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/RegionMXBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/RegionMXBean.java
index a913105..8c11d00 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/RegionMXBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/RegionMXBean.java
@@ -127,6 +127,8 @@ public interface RegionMXBean {
    * partitioned regions it will be the entry count for the primary buckets
    * hosted within this member.
    *
+   * For HDFS regions this will be a count of the in-memory data only.
+   * 
    */
   public long getEntryCount();
 
@@ -348,4 +350,12 @@ public interface RegionMXBean {
    */
   public int getLocalMaxMemory();
   
+  /**
+   * Estimated entry count for HDFS read-write regions. This may not be accurate,
+   * but it acts as an indicative value. All HDFS read-write regions are
+   * PartitionedRegions, so the estimated value covers the primary buckets
+   * hosted within this member.
+   * 
+   * For other regions it will be -1 (not available).
+   */
+  public long getEstimatedSizeForHDFSRegion();
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/cli/ConverterHint.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/cli/ConverterHint.java b/geode-core/src/main/java/com/gemstone/gemfire/management/cli/ConverterHint.java
index a1e70e7..69e079d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/cli/ConverterHint.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/cli/ConverterHint.java
@@ -48,4 +48,5 @@ public interface ConverterHint {
   public static final String LOG_LEVEL             = "converter.hint.log.levels";
 
   public static final String STRING_DISABLER       = "converter.hint.disable-string-converter";
+  public static final String HDFSSTORE_ALL         = "converter.hint.cluster.hdfsstore";
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionBridge.java
index 48b899b..5fbbc61 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionBridge.java
@@ -674,4 +674,9 @@ public class DistributedRegionBridge {
       return false;
     }
   }
+  
+  public long getEstimatedSizeForHDFSRegion() {
+    return monitor.getEstimatedSizeForHDFSRegion();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionMBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionMBean.java
index 4580e7f..549acc7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionMBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedRegionMBean.java
@@ -321,4 +321,9 @@ public class DistributedRegionMBean implements DistributedRegionMXBean {
     return bridge.getEntrySize();
   }
 
+  @Override
+  public long getEstimatedSizeForHDFSRegion() {
+    return bridge.getEstimatedSizeForHDFSRegion();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemBridge.java
index 632415a..bcacc41 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemBridge.java
@@ -821,6 +821,25 @@ public class DistributedSystemBridge {
     return Collections.emptyMap();
   }
   
+
+  /**
+   * @return A map of all {@link DistributedMember}s and their HDFSStores.
+   */
+  public Map<String, String[]> getMemberHDFSStoreMap() {
+    Iterator<MemberMXBean> memberIterator = mapOfMembers.values().iterator();
+    // NOTE: values().iterator() never returns null, so this guard always passes
+    if (memberIterator != null) {
+      Map<String, String[]> mapOfHdfs = new HashMap<String, String[]>();
+      while (memberIterator.hasNext()) {
+        MemberMXBean bean = memberIterator.next();
+        mapOfHdfs.put(bean.getMember(), bean.getHDFSStores());
+      }
+
+      return mapOfHdfs;
+    }
+    return Collections.emptyMap();
+  }
+
   /**
    *
    * @param member

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemMBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemMBean.java
index 3458bf5..bd92f9f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemMBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/DistributedSystemMBean.java
@@ -450,4 +450,11 @@ public class DistributedSystemMBean extends NotificationBroadcasterSupport
   public void setQueryCollectionsDepth(int queryCollectionsDepth) {
     bridge.setQueryCollectionsDepth(queryCollectionsDepth);;
   }
+
+  @Override
+  public Map<String, String[]> listMemberHDFSStore() {
+    return bridge.getMemberHDFSStoreMap();
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/HDFSRegionBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/HDFSRegionBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/HDFSRegionBridge.java
new file mode 100644
index 0000000..29bc246
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/HDFSRegionBridge.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.management.internal.beans;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion.SizeEntry;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.management.internal.ManagementConstants;
+import com.gemstone.gemfire.management.internal.beans.stats.MBeanStatsMonitor;
+import com.gemstone.gemfire.management.internal.beans.stats.StatType;
+import com.gemstone.gemfire.management.internal.beans.stats.StatsRate;
+
+/**
+ * MBean bridge for an HDFS region, which is a type of partitioned region.
+ */
+public class HDFSRegionBridge<K, V> extends PartitionedRegionBridge<K, V> {
+
+  private SortedOplogStatistics soplogStats;
+
+  private MBeanStatsMonitor hdfsRegionMonitor;
+
+  private static final String WRITTEN_BYTES = "writeBytes";
+
+  private static final String READ_BYTES = "readBytes";
+
+  private static final String SCANNED_BYTES = "scanBytes";
+
+  public static final String HDFS_REGION_MONITOR = "HDFSRegionMonitor";
+
+  private StatsRate diskWritesRate;
+
+  private StatsRate diskReadsRate;
+  
+  private PartitionedRegion parRegion;
+
+  public HDFSRegionBridge(Region<K, V> region) {
+    super(region);
+
+    HDFSRegionDirector director = HDFSRegionDirector.getInstance();
+
+    String regionFullPath = region.getFullPath();
+    this.soplogStats = director.getHdfsRegionStats(regionFullPath);
+    this.hdfsRegionMonitor = new MBeanStatsMonitor(HDFS_REGION_MONITOR + "_" + regionFullPath);
+    hdfsRegionMonitor.addStatisticsToMonitor(soplogStats.getStats());
+    this.parRegion = (PartitionedRegion)region;
+    configureHDFSRegionMetrics();
+  }
+
+  private void configureHDFSRegionMetrics() {
+
+    diskWritesRate = new StatsRate(WRITTEN_BYTES, StatType.INT_TYPE, hdfsRegionMonitor);
+
+    String[] readsRates = new String[] { READ_BYTES, SCANNED_BYTES };
+
+    diskReadsRate = new StatsRate(readsRates, StatType.INT_TYPE, hdfsRegionMonitor);
+  }
+
+
+  private long estimatedEntryCount = 0;
+
+  /**
+   * skipCount is initialized to 10 so that the size of the HDFS region is
+   * computed on the very first call.
+   */
+  private int skipCount = 10;
+
+  /**
+   * An estimated entry count for an HDFS region. This may not be accurate, but
+   * it acts as an indicative value.
+   * 
+   * Even estimating the size requires iterating over all BucketRegions and
+   * calling BucketRegion.size(), which is expensive compared to reading a
+   * statistics value directly. Hence the count is recomputed only on every
+   * tenth sample.
+   */
+  public long getEstimatedSizeForHDFSRegion() {
+    if (parRegion.isHDFSReadWriteRegion()) {
+      if (skipCount % 10 == 0) {
+        computeEntryCount();
+        skipCount = 1;
+      } else {
+        skipCount++;
+      }
+      return estimatedEntryCount;
+    } else {
+      return ManagementConstants.NOT_AVAILABLE_LONG;
+    }
+  }
+  
+  private void computeEntryCount() {
+
+    if (parRegion.isDataStore()) { //if not a DataStore do nothing and keep the entryCount as 0;
+      int numLocalEntries = 0;
+      Map<Integer, SizeEntry> localPrimaryBucketRegions = parRegion.getDataStore()
+          .getSizeEstimateForLocalPrimaryBuckets();
+      if (localPrimaryBucketRegions != null && localPrimaryBucketRegions.size() > 0) {
+        for (Map.Entry<Integer, SizeEntry> me : localPrimaryBucketRegions.entrySet()) {
+          numLocalEntries += me.getValue().getSize();
+
+        }
+      }
+      this.estimatedEntryCount = numLocalEntries;
+    }
+  }
+  
+  @Override
+  public long getEntryCount() {
+    if (parRegion.isDataStore()) {
+      int numLocalEntries = 0;
+      Set<BucketRegion> localPrimaryBucketRegions = parRegion.getDataStore().getAllLocalPrimaryBucketRegions();
+      if (localPrimaryBucketRegions != null && localPrimaryBucketRegions.size() > 0) {
+        for (BucketRegion br : localPrimaryBucketRegions) {
+          // TODO soplog, fix this for griddb regions
+          numLocalEntries += br.getRegionMap().sizeInVM() - br.getTombstoneCount();
+
+        }
+      }
+      return numLocalEntries;
+    } else {
+      return  ManagementConstants.ZERO;
+    }
+  }
+
+
+  @Override
+  public long getEntrySize() {
+    return ManagementConstants.NOT_AVAILABLE_LONG;
+  }
+
+  @Override
+  public long getDiskUsage() {
+    if (soplogStats != null) {
+      return soplogStats.getStoreUsageBytes();
+    }
+    return ManagementConstants.NOT_AVAILABLE_LONG;
+  }
+
+  @Override
+  public float getDiskReadsRate() {
+    return diskReadsRate.getRate();
+  }
+
+  @Override
+  public float getDiskWritesRate() {
+    return diskWritesRate.getRate();
+  }
+}
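
The skip-count sampling in getEstimatedSizeForHDFSRegion() can be read as the
following standalone pattern (an illustrative sketch, not code from this
commit): the expensive computation runs on the first call and then on every
tenth call, with a cached value returned in between.

    class SampledEntryCount {
      private long cached = 0;
      private int skipCount = 10; // 10 so that the very first call computes

      long get() {
        if (skipCount % 10 == 0) {
          cached = computeExpensively();
          skipCount = 1;
        } else {
          skipCount++;
        }
        return cached;
      }

      // stand-in for iterating the primary buckets and summing size estimates
      private long computeExpensively() {
        return 42L;
      }
    }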

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBean.java
index b82b94d..21d7140 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBean.java
@@ -455,6 +455,11 @@ public class MemberMBean extends NotificationBroadcasterSupport implements
   }
 
   @Override
+  public String[] getHDFSStores() {
+    return bridge.getHDFSStores();
+  }
+  
+  @Override
   public long getGetsAvgLatency() {
     return bridge.getGetsAvgLatency();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
index 638ba06..1425572 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/MemberMBeanBridge.java
@@ -49,6 +49,7 @@ import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.DiskStore;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
 import com.gemstone.gemfire.cache.persistence.PersistentID;
 import com.gemstone.gemfire.cache.wan.GatewayReceiver;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
@@ -1009,6 +1010,32 @@ public class MemberMBeanBridge {
     return listDiskStores(true);
   }
 
+
+  /**
+   * @return the names of all the HDFSStores present at the cache level
+   */
+  public String[] getHDFSStores() {
+    GemFireCacheImpl cacheImpl = (GemFireCacheImpl) cache;
+    String[] retStr = null;
+    Collection<HDFSStoreImpl> hdfsStoreCollection = null;
+    hdfsStoreCollection = cacheImpl.getHDFSStores();
+      
+    if (hdfsStoreCollection != null && hdfsStoreCollection.size() > 0) {
+      retStr = new String[hdfsStoreCollection.size()];
+      Iterator<HDFSStoreImpl> it = hdfsStoreCollection.iterator();
+      int i = 0;
+      while (it.hasNext()) {
+        retStr[i] = it.next().getName();
+        i++;
+
+      }
+    }
+    return retStr;
+  }
+      
   /**
    * 
    * @return log of the member.
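
An equivalent, more compact formulation of getHDFSStores() above, as a sketch
only; returning an empty array instead of null is an assumption about
preferable behavior, not what this commit does:

    public String[] getHDFSStores() {
      Collection<HDFSStoreImpl> stores = ((GemFireCacheImpl) cache).getHDFSStores();
      if (stores == null || stores.isEmpty()) {
        return new String[0]; // assumption: callers prefer empty over null
      }
      String[] names = new String[stores.size()];
      int i = 0;
      for (HDFSStoreImpl store : stores) {
        names[i++] = store.getName();
      }
      return names;
    }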

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/PartitionedRegionBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/PartitionedRegionBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/PartitionedRegionBridge.java
index 7450746..3a8440a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/PartitionedRegionBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/PartitionedRegionBridge.java
@@ -76,7 +76,14 @@ public class PartitionedRegionBridge<K, V>  extends RegionMBeanBridge<K, V> {
   
   
   public static <K, V> PartitionedRegionBridge<K, V> getInstance(Region<K, V> region) {
-    return new PartitionedRegionBridge<K, V> (region);
+
+    if (region.getAttributes().getDataPolicy().withHDFS()) {
+      PartitionedRegionBridge<K, V> bridge = new HDFSRegionBridge<K, V>(region);
+      return bridge;
+    } else {
+      return new PartitionedRegionBridge<K, V> (region);
+    }
+
   }
   
   
@@ -302,4 +309,8 @@ public class PartitionedRegionBridge<K, V>  extends RegionMBeanBridge<K, V> {
   public int getLocalMaxMemory() {
     return partitionAttributesData.getLocalMaxMemory();
   }
+
+  public long getEstimatedSizeForHDFSRegion() {
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBean.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBean.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBean.java
index 86fe73e..1c7dcf7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBean.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBean.java
@@ -314,4 +314,9 @@ public class RegionMBean<K, V> extends NotificationBroadcasterSupport implements
     return bridge.getLocalMaxMemory(); 
   }
 
+  @Override
+  public long getEstimatedSizeForHDFSRegion() {
+    return bridge.getEstimatedSizeForHDFSRegion();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBeanBridge.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBeanBridge.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBeanBridge.java
index 66f61e2..cd3cb90 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBeanBridge.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/RegionMBeanBridge.java
@@ -590,4 +590,9 @@ public class RegionMBeanBridge<K, V> {
   public int getLocalMaxMemory() {
     return -1;
   }
+
+  
+  public long getEstimatedSizeForHDFSRegion() {
+    return -1;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/RegionClusterStatsMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/RegionClusterStatsMonitor.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/RegionClusterStatsMonitor.java
index 7a4d9b4..c855171 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/RegionClusterStatsMonitor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/beans/stats/RegionClusterStatsMonitor.java
@@ -111,6 +111,8 @@ public class RegionClusterStatsMonitor {
 
   private static final String PERSISTENT_ENABLED = "PersistentEnabled";
   
+  private static final String ESTIMATED_SIZE_FOR_HDFS_REGION = "EstimatedSizeForHDFSRegion";
+
   private volatile long lastAccessedTime = 0;
 
   private volatile long lastModifiedTime = 0;
@@ -190,6 +192,7 @@ public class RegionClusterStatsMonitor {
     typeMap.put(AVERAGE_READS, Float.TYPE);
     typeMap.put(AVERAGE_WRITES, Float.TYPE);
     typeMap.put(ENTRY_SIZE, Long.TYPE);
+    typeMap.put(ESTIMATED_SIZE_FOR_HDFS_REGION, Long.TYPE);
 
   }
 
@@ -333,6 +336,10 @@ public class RegionClusterStatsMonitor {
   public long getTotalEntriesOnlyOnDisk() {
     return aggregator.getLongValue(TOTAL_ENTRIES_ONLY_ON_DISK);
   }
+  
+  public long getEstimatedSizeForHDFSRegion() {
+    return aggregator.getLongValue(ESTIMATED_SIZE_FOR_HDFS_REGION);
+  }
 
   public int getAvgBucketSize() {
     int bucketNum = getBucketCount();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/CreateAlterDestroyRegionCommands.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/CreateAlterDestroyRegionCommands.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/CreateAlterDestroyRegionCommands.java
index ad006b7..cb893bd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/CreateAlterDestroyRegionCommands.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/CreateAlterDestroyRegionCommands.java
@@ -210,6 +210,14 @@ public class CreateAlterDestroyRegionCommands extends AbstractCommandsSupport {
                   help = CliStrings.CREATE_REGION__GATEWAYSENDERID__HELP)
       @CliMetaData (valueSeparator = ",") 
       String[] gatewaySenderIds,
+      @CliOption (key = CliStrings.CREATE_REGION__HDFSSTORE_NAME,
+                  help = CliStrings.CREATE_REGION__HDFSSTORE_NAME__HELP ,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE)
+      String hdfsStoreName,
+      @CliOption (key = CliStrings.CREATE_REGION__HDFSSTORE_WRITEONLY,      
+                  help = CliStrings.CREATE_REGION__HDFSSTORE_WRITEONLY__HELP,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE)
+      Boolean hdfsWriteOnly,      
       @CliOption (key = CliStrings.CREATE_REGION__KEYCONSTRAINT,
                   help = CliStrings.CREATE_REGION__KEYCONSTRAINT__HELP)
       String keyConstraint,
@@ -319,7 +327,7 @@ public class CreateAlterDestroyRegionCommands extends AbstractCommandsSupport {
             prColocatedWith, prLocalMaxMemory, prRecoveryDelay,
             prRedundantCopies, prStartupRecoveryDelay,
             prTotalMaxMemory, prTotalNumBuckets,
-            offHeap, mcastEnabled, regionAttributes);
+            offHeap, mcastEnabled, hdfsStoreName , hdfsWriteOnly,  regionAttributes);
         
 
         if (regionAttributes.getPartitionAttributes() == null && regionFunctionArgs.hasPartitionAttributes()) {
@@ -339,7 +347,7 @@ public class CreateAlterDestroyRegionCommands extends AbstractCommandsSupport {
           concurrencyChecksEnabled, cloningEnabled, concurrencyLevel, 
           prColocatedWith, prLocalMaxMemory, prRecoveryDelay,
           prRedundantCopies, prStartupRecoveryDelay,
-          prTotalMaxMemory, prTotalNumBuckets, null,compressor, offHeap , mcastEnabled);
+          prTotalMaxMemory, prTotalNumBuckets, null,compressor, offHeap , mcastEnabled, hdfsStoreName , hdfsWriteOnly);
         
         if (!regionShortcut.name().startsWith("PARTITION") && regionFunctionArgs.hasPartitionAttributes()) {
           throw new IllegalArgumentException(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/HDFSStoreCommands.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/HDFSStoreCommands.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/HDFSStoreCommands.java
new file mode 100644
index 0000000..6e573f1
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/commands/HDFSStoreCommands.java
@@ -0,0 +1,695 @@
+package com.gemstone.gemfire.management.internal.cli.commands;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+import org.springframework.shell.core.annotation.CliAvailabilityIndicator;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.execute.Execution;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
+import com.gemstone.gemfire.cache.execute.ResultCollector;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.lang.ClassUtils;
+import com.gemstone.gemfire.management.cli.CliMetaData;
+import com.gemstone.gemfire.management.cli.ConverterHint;
+import com.gemstone.gemfire.management.cli.Result;
+import com.gemstone.gemfire.management.cli.Result.Status;
+import com.gemstone.gemfire.management.internal.cli.CliUtil;
+import com.gemstone.gemfire.management.internal.cli.functions.AlterHDFSStoreFunction;
+import com.gemstone.gemfire.management.internal.cli.functions.AlterHDFSStoreFunction.AlterHDFSStoreAttributes;
+import com.gemstone.gemfire.management.internal.cli.functions.CliFunctionResult;
+import com.gemstone.gemfire.management.internal.cli.functions.CreateHDFSStoreFunction;
+import com.gemstone.gemfire.management.internal.cli.functions.DescribeHDFSStoreFunction;
+import com.gemstone.gemfire.management.internal.cli.functions.DestroyHDFSStoreFunction;
+import com.gemstone.gemfire.management.internal.cli.functions.ListHDFSStoresFunction;
+import com.gemstone.gemfire.management.internal.cli.functions.ListHDFSStoresFunction.HdfsStoreDetails;
+import com.gemstone.gemfire.management.internal.cli.i18n.CliStrings;
+import com.gemstone.gemfire.management.internal.cli.result.CommandResultException;
+import com.gemstone.gemfire.management.internal.cli.result.CompositeResultData;
+import com.gemstone.gemfire.management.internal.cli.result.ResultBuilder;
+import com.gemstone.gemfire.management.internal.cli.result.ResultDataException;
+import com.gemstone.gemfire.management.internal.cli.result.TabularResultData;
+import com.gemstone.gemfire.management.internal.cli.util.HDFSStoreNotFoundException;
+import com.gemstone.gemfire.management.internal.cli.util.MemberNotFoundException;
+import com.gemstone.gemfire.management.internal.configuration.SharedConfigurationWriter;
+import com.gemstone.gemfire.management.internal.configuration.domain.XmlEntity;
+
+/**
+ * The HDFSStoreCommands class encapsulates all GemFire HDFS store commands in Gfsh.
+ *
+ * @author Namrata Thanvi
+ * @see com.gemstone.gemfire.management.internal.cli.commands.AbstractCommandsSupport
+ */
+public class HDFSStoreCommands extends AbstractCommandsSupport {
+  @CliCommand (value = CliStrings.CREATE_HDFS_STORE, help = CliStrings.CREATE_HDFS_STORE__HELP)
+  @CliMetaData (relatedTopic = CliStrings.TOPIC_GEMFIRE_HDFSSTORE, writesToSharedConfiguration = true)
+  public Result createHdfsStore(      
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__NAME,                  
+                  mandatory = true,
+                  optionContext = ConverterHint.HDFSSTORE_ALL, 
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__NAME__HELP)
+      String hdfsUniqueName,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__NAMENODE,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__NAMENODE__HELP)
+      String namenode, 
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__HOMEDIR,
+                  optionContext = ConverterHint.DIR_PATHSTRING,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__HOMEDIR__HELP)
+      String homeDir,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__BATCHSIZE,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__BATCHSIZE__HELP)
+      Integer batchSize,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__BATCHINTERVAL,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__BATCHINTERVAL__HELP)
+      Integer batchInterval,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__READCACHESIZE,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__READCACHESIZE__HELP)
+      Float readCacheSize,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__DISPATCHERTHREADS,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.CREATE_HDFS_STORE__DISPATCHERTHREADS__HELP)
+      Integer dispatcherThreads,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MAXMEMORY,
+                  mandatory = false,
+                  unspecifiedDefaultValue =CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MAXMEMORY__HELP)
+      Integer maxMemory,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__BUFFERPERSISTENT,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__BUFFERPERSISTENT__HELP)
+      Boolean bufferPersistent,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__SYNCDISKWRITE,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__SYNCDISKWRITE__HELP)
+      Boolean syncDiskWrite,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__DISKSTORENAME,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__DISKSTORENAME__HELP)
+      String diskStoreName,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MINORCOMPACT,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MINORCOMPACT__HELP)
+      Boolean minorCompact,            
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MINORCOMPACTIONTHREADS,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MINORCOMPACTIONTHREADS__HELP)
+      Integer minorCompactionThreads,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACT,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACT__HELP)
+      Boolean majorCompact,   
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACTINTERVAL,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACTINTERVAL__HELP)
+      Integer majorCompactionInterval, 
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACTIONTHREADS,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__MAJORCOMPACTIONTHREADS__HELP)
+      Integer majorCompactionThreads,  
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__PURGEINTERVAL,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__PURGEINTERVAL__HELP)
+      Integer purgeInterval,  
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__WRITEONLYFILESIZE,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__WRITEONLYFILESIZE__HELP)
+      Integer maxWriteonlyFileSize,  
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__FILEROLLOVERINTERVAL,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__FILEROLLOVERINTERVAL__HELP)
+      Integer fileRolloverInterval,  
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__CLIENTCONFIGFILE,
+                  optionContext = ConverterHint.FILE_PATHSTRING,
+                  mandatory = false,
+                  unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+                  help = CliStrings.CREATE_HDFS_STORE__CLIENTCONFIGFILE__HELP)      
+      String clientConfigFile,
+      @CliOption (key = CliStrings.CREATE_HDFS_STORE__GROUP,
+                  help = CliStrings.CREATE_HDFS_STORE__GROUP__HELP,
+                  optionContext = ConverterHint.MEMBERGROUP)
+      @CliMetaData (valueSeparator = ",")
+      String[] groups) {
+    try {
+      
+      return getCreatedHdfsStore(groups, hdfsUniqueName, namenode, homeDir, clientConfigFile, fileRolloverInterval,
+          maxWriteonlyFileSize, minorCompact, majorCompact, batchSize, batchInterval, diskStoreName, bufferPersistent,
+          dispatcherThreads, syncDiskWrite, readCacheSize, majorCompactionInterval, majorCompactionThreads,
+          minorCompactionThreads, purgeInterval, maxMemory);
+      
+    } catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+
+    } catch (Throwable th) {
+      String formattedErrString = CliStrings.format(CliStrings.CREATE_HDFS_STORE__ERROR_WHILE_CREATING_REASON_0,
+          new Object[] { th.getMessage() });
+      SystemFailure.checkFailure();
+      return ResultBuilder.createGemFireErrorResult(formattedErrString);
+    }
+  }
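+
+  // Illustrative gfsh invocation of the command above (a sketch, not part of this
+  // commit: the option spellings are assumed from the CliStrings.CREATE_HDFS_STORE__*
+  // keys, and the store name and namenode URL are placeholders):
+  //   gfsh> create hdfs-store --name=myHdfsStore --namenode=hdfs://localhost:8020 --home-dir=gemfire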
+
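+  /**
+   * Runs {@link CreateHDFSStoreFunction} on the members of the given groups (or on
+   * all matching members when no group is given), accumulates one result row per
+   * member, and captures the store's {@link XmlEntity} so the creation can be
+   * persisted to the shared configuration.
+   */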
+  public Result getCreatedHdfsStore(String[] groups, String hdfsUniqueName, String namenode, String homeDir,
+      String clientConfigFile, Integer fileRolloverInterval, Integer maxWriteonlyFileSize, Boolean minorCompact,
+      Boolean majorCompact, Integer batchSize, Integer batchInterval, String diskStoreName, Boolean bufferPersistent,
+      Integer dispatcherThreads, Boolean syncDiskWrite, Float readCacheSize, Integer majorCompactionInterval,
+      Integer majorCompactionThreads, Integer minorCompactionThreads, Integer purgeInterval, Integer maxMemory) {
+
+    XmlEntity xmlEntity = null;
+
+    Set<DistributedMember> targetMembers = null;
+
+    try {
+      targetMembers = getGroupMembers(groups);
+    } catch (CommandResultException cre) {
+      return cre.getResult();
+    }
+
+    HDFSStoreConfigHolder configHolder = new HDFSStoreConfigHolder();
+    configHolder.setName(hdfsUniqueName);
+    if (readCacheSize != null)
+      configHolder.setBlockCacheSize(readCacheSize);
+
+    if (fileRolloverInterval != null)
+      configHolder.setWriteOnlyFileRolloverInterval(fileRolloverInterval);
+    if (clientConfigFile != null)
+      configHolder.setHDFSClientConfigFile(clientConfigFile);
+    if (homeDir != null)
+      configHolder.setHomeDir(homeDir);
+    if (maxWriteonlyFileSize != null)
+      configHolder.setWriteOnlyFileRolloverSize(maxWriteonlyFileSize);
+    if (namenode != null)
+      configHolder.setNameNodeURL(namenode);
+
+    if (minorCompact != null)
+      configHolder.setMinorCompaction(minorCompact);
+    if (majorCompact != null)
+      configHolder.setMajorCompaction(majorCompact);
+    if (majorCompactionInterval != null)
+      configHolder.setMajorCompactionInterval(majorCompactionInterval);
+    if (majorCompactionThreads != null)
+      configHolder.setMajorCompactionThreads(majorCompactionThreads);
+    if (minorCompactionThreads != null)
+      configHolder.setMinorCompactionThreads(minorCompactionThreads);
+    if (purgeInterval != null)
+      configHolder.setPurgeInterval(purgeInterval);
+
+    if (batchSize != null)
+      configHolder.setBatchSize(batchSize);
+    if (batchInterval != null)
+      configHolder.setBatchInterval(batchInterval);
+    if (diskStoreName != null)
+      configHolder.setDiskStoreName(diskStoreName);
+    if (syncDiskWrite != null)
+      configHolder.setSynchronousDiskWrite(syncDiskWrite);
+    if (dispatcherThreads != null)
+      configHolder.setDispatcherThreads(dispatcherThreads);
+    if (maxMemory != null)
+      configHolder.setMaxMemory(maxMemory);
+    if (bufferPersistent != null)
+      configHolder.setBufferPersistent(bufferPersistent);
+
+    ResultCollector<?, ?> resultCollector = getMembersFunctionExecutor(targetMembers)
+        .withArgs(configHolder).execute(new CreateHDFSStoreFunction());
+    
+    List<CliFunctionResult> hdfsStoreCreateResults = CliFunctionResult.cleanResults((List<?>)resultCollector
+        .getResult());
+
+    TabularResultData tabularResultData = ResultBuilder.createTabularResultData();
+
+    boolean accumulatedData = false;
+
+    for (CliFunctionResult hdfsStoreCreateResult : hdfsStoreCreateResults) {
+      if (hdfsStoreCreateResult.getThrowable() != null) {
+        String memberId = hdfsStoreCreateResult.getMemberIdOrName();
+        String errorMsg = hdfsStoreCreateResult.getThrowable().getMessage();
+        String errClass = hdfsStoreCreateResult.getThrowable().getClass().getName();
+        tabularResultData.accumulate("Member", memberId);
+        tabularResultData.accumulate("Result", "ERROR: " + errClass + ": " + errorMsg);
+        accumulatedData = true;
+        tabularResultData.setStatus(Status.ERROR);
+      }
+      else if (hdfsStoreCreateResult.isSuccessful()) {
+        String memberId = hdfsStoreCreateResult.getMemberIdOrName();
+        String successMsg = hdfsStoreCreateResult.getMessage();
+        tabularResultData.accumulate("Member", memberId);
+        tabularResultData.accumulate("Result", successMsg);
+        if (xmlEntity == null) {
+          xmlEntity = hdfsStoreCreateResult.getXmlEntity();
+        }
+        accumulatedData = true;
+      }
+    }
+
+    if (!accumulatedData) {
+      return ResultBuilder.createInfoResult("Unable to create hdfs store:" + hdfsUniqueName);
+    }
+
+    Result result = ResultBuilder.buildResult(tabularResultData);
+    if (xmlEntity != null) {
+      result.setCommandPersisted((new SharedConfigurationWriter()).addXmlEntity(xmlEntity, groups));
+    }
+
+    return result;
+  }
+  
+  
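+  /**
+   * Describes the named hdfs store as configured on the given member, rendering
+   * each attribute of the returned {@link HDFSStoreConfigHolder} as one line of a
+   * composite result.
+   */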
+  @CliCommand(value = CliStrings.DESCRIBE_HDFS_STORE, help = CliStrings.DESCRIBE_HDFS_STORE__HELP)
+  @CliMetaData(shellOnly = false, relatedTopic = { CliStrings.TOPIC_GEMFIRE_HDFSSTORE})
+  public Result describeHdfsStore(
+      @CliOption(key = CliStrings.DESCRIBE_HDFS_STORE__MEMBER, 
+                 mandatory = true, optionContext = ConverterHint.MEMBERIDNAME, 
+                 help = CliStrings.DESCRIBE_HDFS_STORE__MEMBER__HELP)
+      final String memberName,
+      @CliOption(key = CliStrings.DESCRIBE_HDFS_STORE__NAME, 
+                 mandatory = true, 
+                 optionContext = ConverterHint.HDFSSTORE_ALL, 
+                 help = CliStrings.DESCRIBE_HDFS_STORE__NAME__HELP)
+      final String hdfsStoreName) {
+    try {
+      return toCompositeResult(getHDFSStoreDescription(memberName, hdfsStoreName));
+
+    } catch (HDFSStoreNotFoundException e) {
+      return ResultBuilder.createShellClientErrorResult(e.getMessage());
+
+    } catch (FunctionInvocationTargetException ignore) {
+      return ResultBuilder.createGemFireErrorResult(CliStrings.format(CliStrings.COULD_NOT_EXECUTE_COMMAND_TRY_AGAIN,
+          CliStrings.DESCRIBE_HDFS_STORE));
+
+    } catch (MemberNotFoundException e) {
+      return ResultBuilder.createShellClientErrorResult(e.getMessage());
+      
+    } catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+      
+    } catch (Throwable t) {
+      SystemFailure.checkFailure();
+      return ResultBuilder.createGemFireErrorResult(String.format(CliStrings.DESCRIBE_HDFS_STORE__ERROR_MESSAGE,
+          memberName, hdfsStoreName, t));
+    }
+  }        
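+
+  // Illustrative gfsh invocation (a sketch: option spellings are assumed from the
+  // CliStrings.DESCRIBE_HDFS_STORE__* keys; member and store names are placeholders):
+  //   gfsh> describe hdfs-store --member=server1 --name=myHdfsStore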
+  
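+  /**
+   * Executes {@link DescribeHDFSStoreFunction} on the single named member and
+   * returns the store's config holder; an {@link HDFSStoreNotFoundException}
+   * returned by the function is rethrown to the caller.
+   */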
+  public HDFSStoreConfigHolder getHDFSStoreDescription(String memberName, String hdfsStoreName) {
+
+    final DistributedMember member = getMember(getCache(), memberName);
+    
+    ResultCollector<?, ?> resultCollector = getMembersFunctionExecutor(Collections.singleton(member))
+        .withArgs(hdfsStoreName).execute(new DescribeHDFSStoreFunction());
+
+    Object result = ((List<?>) resultCollector.getResult()).get(0);
+
+    if (result instanceof HDFSStoreConfigHolder) {
+      return (HDFSStoreConfigHolder) result;
+    }
+    if (result instanceof HDFSStoreNotFoundException) {
+      throw (HDFSStoreNotFoundException) result;
+    }
+
+    final Throwable cause = (result instanceof Throwable ? (Throwable) result : null);
+    throw new RuntimeException(CliStrings.format(CliStrings.UNEXPECTED_RETURN_TYPE_EXECUTING_COMMAND_ERROR_MESSAGE,
+        ClassUtils.getClassName(result), CliStrings.DESCRIBE_HDFS_STORE), cause);
+  }
+  
+  public Result toCompositeResult(final HDFSStoreConfigHolder storePrms) {
+    final CompositeResultData hdfsStoreCompositeResult = ResultBuilder.createCompositeResultData();
+    final CompositeResultData.SectionResultData hdfsStoreSection = hdfsStoreCompositeResult.addSection();
+
+    hdfsStoreSection.addData("Hdfs Store Name", storePrms.getName());
+    hdfsStoreSection.addData("Name Node URL", storePrms.getNameNodeURL());
+    hdfsStoreSection.addData("Home Dir", storePrms.getHomeDir());
+    hdfsStoreSection.addData("Block Cache", storePrms.getBlockCacheSize());
+    hdfsStoreSection.addData("File RollOver Interval", storePrms.getWriteOnlyFileRolloverInterval());
+    hdfsStoreSection.addData("Max WriteOnly File Size", storePrms.getWriteOnlyFileRolloverSize());
+
+    hdfsStoreSection.addData("Client Configuration File", storePrms.getHDFSClientConfigFile());
+
+    hdfsStoreSection.addData("Disk Store Name", storePrms.getDiskStoreName());
+    hdfsStoreSection.addData("Batch Size In MB", storePrms.getBatchSize());
+    hdfsStoreSection.addData("Batch Interval Time", storePrms.getBatchInterval());
+    hdfsStoreSection.addData("Maximum Memory", storePrms.getMaxMemory());
+    hdfsStoreSection.addData("Dispatcher Threads", storePrms.getDispatcherThreads());
+    hdfsStoreSection.addData("Buffer Persistence", storePrms.getBufferPersistent());
+    hdfsStoreSection.addData("Synchronous Persistence", storePrms.getSynchronousDiskWrite());
+
+    hdfsStoreSection.addData("Major Compaction Enabled", storePrms.getMajorCompaction());
+    hdfsStoreSection.addData("Major Compaction Threads", storePrms.getMajorCompactionThreads());
+    hdfsStoreSection.addData("Major compaction Interval", storePrms.getMajorCompactionInterval());
+    hdfsStoreSection.addData("Minor Compaction Enabled", storePrms.getMinorCompaction());
+    hdfsStoreSection.addData("Minor Compaction Threads", storePrms.getMinorCompactionThreads());
+    hdfsStoreSection.addData("Purge Interval", storePrms.getPurgeInterval());
+
+    return ResultBuilder.buildResult(hdfsStoreCompositeResult);
+  } 
+  
+  @CliCommand(value = CliStrings.LIST_HDFS_STORE, help = CliStrings.LIST_HDFS_STORE__HELP)
+  @CliMetaData(shellOnly = false, relatedTopic = { CliStrings.TOPIC_GEMFIRE_HDFSSTORE })
+  public Result listHdfsStore() {  
+    try {
+      Set<DistributedMember> dataMembers = getNormalMembers(getCache());
+      if (dataMembers.isEmpty()) {
+        return ResultBuilder.createInfoResult(CliStrings.NO_CACHING_MEMBERS_FOUND_MESSAGE);
+      }
+      return toTabularResult(getHdfsStoreListing(dataMembers));
+
+    } catch (FunctionInvocationTargetException ignore) {
+      return ResultBuilder.createGemFireErrorResult(
+          CliStrings.format(CliStrings.COULD_NOT_EXECUTE_COMMAND_TRY_AGAIN,
+          CliStrings.LIST_HDFS_STORE));
+
+    } catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+
+    } catch (Throwable t) {
+      SystemFailure.checkFailure();
+      return ResultBuilder.createGemFireErrorResult(
+          String.format(CliStrings.LIST_HDFS_STORE__ERROR_MESSAGE, t.getMessage()));
+    }
+  }
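+
+  // Illustrative gfsh invocation (a sketch; the command takes no options):
+  //   gfsh> list hdfs-store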
+  
+  @SuppressWarnings("unchecked")
+  protected List<HdfsStoreDetails> getHdfsStoreListing(Set<DistributedMember> members) {
+
+    final Execution membersFunctionExecutor = getMembersFunctionExecutor(members);
+
+    if (membersFunctionExecutor instanceof AbstractExecution) {
+      ((AbstractExecution)membersFunctionExecutor).setIgnoreDepartedMembers(true);
+    }
+
+    final ResultCollector<?, ?> resultCollector = membersFunctionExecutor.execute(new ListHDFSStoresFunction());
+    final List<?> results = (List<?>)resultCollector.getResult();
+    final List<HdfsStoreDetails> hdfsStoreList = new ArrayList<HdfsStoreDetails>(results.size());
+
+    for (final Object result : results) {
+      if (result instanceof Set) { // ignore FunctionInvocationTargetExceptions and other Exceptions...
+        hdfsStoreList.addAll((Set<HdfsStoreDetails>)result);
+      }
+    }
+
+    Collections.sort(hdfsStoreList, new Comparator<HdfsStoreDetails>() {
+      // Null-safe ordering helper: nulls sort after non-null values.
+      private <T extends Comparable<T>> int compare(final T obj1, final T obj2) {
+        return (obj1 == null && obj2 == null ? 0 : (obj1 == null ? 1 : (obj2 == null ? -1 : obj1.compareTo(obj2))));
+      }
+
+      @Override
+      public int compare(HdfsStoreDetails store1, HdfsStoreDetails store2) {
+        int comparisonValue = compare(store1.getMemberName(), store2.getMemberName());
+        comparisonValue = (comparisonValue != 0 ? comparisonValue : compare(store1.getMemberId(), store2.getMemberId()));
+        return (comparisonValue != 0 ? comparisonValue : store1.getStoreName().compareTo(store2.getStoreName()));
+      }
+    });
+
+    return hdfsStoreList;
+  }
+  
+
+  protected Result toTabularResult(final List<HdfsStoreDetails> hdfsStoreList) throws ResultDataException {
+    if (!hdfsStoreList.isEmpty()) {
+      final TabularResultData hdfsStoreData = ResultBuilder.createTabularResultData();
+      for (final HdfsStoreDetails hdfsStoreDetails : hdfsStoreList) {
+        hdfsStoreData.accumulate("Member Name", hdfsStoreDetails.getMemberName());
+        hdfsStoreData.accumulate("Member Id", hdfsStoreDetails.getMemberId());
+        hdfsStoreData.accumulate("Hdfs Store Name", hdfsStoreDetails.getStoreName());
+      }
+      return ResultBuilder.buildResult(hdfsStoreData);
+    }
+    else {
+      return ResultBuilder.createInfoResult(CliStrings.LIST_HDFS_STORE__HDFS_STORES_NOT_FOUND_MESSAGE);
+    }
+  }
+  
+
+  @CliCommand(value = CliStrings.DESTROY_HDFS_STORE, help = CliStrings.DESTROY_HDFS_STORE__HELP)
+  @CliMetaData(shellOnly = false, relatedTopic = { CliStrings.TOPIC_GEMFIRE_HDFSSTORE }, writesToSharedConfiguration = true)
+  public Result destroyHdfsStore(
+      @CliOption (key = CliStrings.DESTROY_HDFS_STORE__NAME,
+                  optionContext = ConverterHint.HDFSSTORE_ALL,
+                  mandatory = true,
+                  help = CliStrings.DESTROY_HDFS_STORE__NAME__HELP)
+      String hdfsStoreName,
+      @CliOption (key = CliStrings.DESTROY_HDFS_STORE__GROUP,
+                  help = CliStrings.DESTROY_HDFS_STORE__GROUP__HELP,
+                  optionContext = ConverterHint.MEMBERGROUP)
+      @CliMetaData (valueSeparator = ",")
+      String[] groups) {
+    try {
+      return destroyStore(hdfsStoreName, groups);
+
+    } catch (FunctionInvocationTargetException ignore) {
+      return ResultBuilder.createGemFireErrorResult(CliStrings.format(CliStrings.COULD_NOT_EXECUTE_COMMAND_TRY_AGAIN,
+          CliStrings.DESTROY_HDFS_STORE));
+      
+    } catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+      
+    } catch (Throwable th) {
+      SystemFailure.checkFailure();
+      return ResultBuilder.createGemFireErrorResult(CliStrings.format(
+          CliStrings.DESTROY_HDFS_STORE__ERROR_WHILE_DESTROYING_REASON_0, new Object[] { th.getMessage() }));
+    }
+  }
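+
+  // Illustrative gfsh invocation (a sketch: option spellings are assumed from the
+  // CliStrings.DESTROY_HDFS_STORE__* keys; names are placeholders):
+  //   gfsh> destroy hdfs-store --name=myHdfsStore --group=storeGroup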
+  
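+  /**
+   * Runs {@link DestroyHDFSStoreFunction} on the members of the given groups,
+   * tabulates the per-member outcome, and deletes the store's {@link XmlEntity}
+   * from the shared configuration when a member reported success.
+   */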
+  protected Result destroyStore(String hdfsStoreName, String[] groups) {
+    TabularResultData tabularData = ResultBuilder.createTabularResultData();
+    boolean accumulatedData = false;
+
+    Set<DistributedMember> targetMembers = null;
+    try {
+      targetMembers = getGroupMembers(groups);
+    } catch (CommandResultException cre) {
+      return cre.getResult();
+    }
+
+    ResultCollector<?, ?> rc = getMembersFunctionExecutor(targetMembers)
+        .withArgs(hdfsStoreName).execute(new DestroyHDFSStoreFunction());
+
+    List<CliFunctionResult> results = CliFunctionResult.cleanResults((List<?>) rc.getResult());
+
+    XmlEntity xmlEntity = null;
+    for (CliFunctionResult result : results) {
+
+      if (result.getThrowable() != null) {
+        tabularData.accumulate("Member", result.getMemberIdOrName());
+        tabularData.accumulate("Result", "ERROR: " + result.getThrowable().getClass().getName() + ": "
+            + result.getThrowable().getMessage());
+        accumulatedData = true;
+        tabularData.setStatus(Status.ERROR);
+      }
+      else if (result.getMessage() != null) {
+        tabularData.accumulate("Member", result.getMemberIdOrName());
+        tabularData.accumulate("Result", result.getMessage());
+        accumulatedData = true;
+
+        if (xmlEntity == null) {
+          xmlEntity = result.getXmlEntity();
+        }
+      }
+    }
+
+    if (!accumulatedData) {
+      return ResultBuilder.createInfoResult("No matching hdfs stores found.");
+    }
+
+    Result result = ResultBuilder.buildResult(tabularData);
+    if (xmlEntity != null) {
+      result.setCommandPersisted((new SharedConfigurationWriter()).deleteXmlEntity(xmlEntity, groups));
+    }
+
+    return result;
+  }
+
+  @CliCommand(value = CliStrings.ALTER_HDFS_STORE, help = CliStrings.ALTER_HDFS_STORE__HELP)
+  @CliMetaData(shellOnly = false, relatedTopic = { CliStrings.TOPIC_GEMFIRE_HDFSSTORE }, writesToSharedConfiguration = true)
+  public Result alterHdfsStore(
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__NAME,                  
+          mandatory = true,
+          optionContext = ConverterHint.HDFSSTORE_ALL, 
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__NAME__HELP)
+      String hdfsUniqueName,     
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__BATCHSIZE,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__BATCHSIZE__HELP)
+      Integer batchSize,
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__BATCHINTERVAL,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__BATCHINTERVAL__HELP)
+      Integer batchInterval,      
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__MINORCOMPACT,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__MINORCOMPACT__HELP)
+      Boolean minorCompact,                                                                                                         
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__MINORCOMPACTIONTHREADS,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__MINORCOMPACTIONTHREADS__HELP)
+      Integer minorCompactionThreads,
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACT,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACT__HELP)
+      Boolean majorCompact,   
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACTINTERVAL,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACTINTERVAL__HELP)
+      Integer majorCompactionInterval, 
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACTIONTHREADS,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__MAJORCOMPACTIONTHREADS__HELP)
+      Integer majorCompactionThreads,  
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__PURGEINTERVAL,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__PURGEINTERVAL__HELP)
+      Integer purgeInterval,        
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__FILEROLLOVERINTERVAL,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__FILEROLLOVERINTERVAL__HELP)
+      Integer fileRolloverInterval,
+      @CliOption (key = CliStrings.ALTER_HDFS_STORE__WRITEONLYFILESIZE,
+          mandatory = false,
+          unspecifiedDefaultValue = CliMetaData.ANNOTATION_NULL_VALUE,
+          help = CliStrings.ALTER_HDFS_STORE__WRITEONLYFILESIZE__HELP)
+      Integer maxWriteonlyFileSize,  
+      @CliOption(key=CliStrings.ALTER_HDFS_STORE__GROUP,
+         help=CliStrings.ALTER_HDFS_STORE__GROUP__HELP,
+         optionContext=ConverterHint.MEMBERGROUP)
+      @CliMetaData (valueSeparator = ",")
+      String[] groups){
+    try {                         
+      
+      return getAlteredHDFSStore(groups, hdfsUniqueName, batchSize, batchInterval, minorCompact,
+          minorCompactionThreads, majorCompact, majorCompactionInterval, majorCompactionThreads, purgeInterval,
+          fileRolloverInterval, maxWriteonlyFileSize);
+      
+    } catch (FunctionInvocationTargetException ignore) {
+      return ResultBuilder.createGemFireErrorResult(CliStrings.format(CliStrings.COULD_NOT_EXECUTE_COMMAND_TRY_AGAIN,
+          CliStrings.ALTER_HDFS_STORE));
+      
+    } catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+      
+    } catch (Throwable th) {
+      SystemFailure.checkFailure();
+      return ResultBuilder.createGemFireErrorResult(CliStrings.format(
+          CliStrings.ALTER_HDFS_STORE__ERROR_WHILE_ALTERING_REASON_0, new Object[] { th.getMessage() }));
+    }
+  }
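+
+  // Illustrative gfsh invocation (a sketch: option spellings are assumed from the
+  // CliStrings.ALTER_HDFS_STORE__* keys; values are placeholders):
+  //   gfsh> alter hdfs-store --name=myHdfsStore --batch-size=64 --major-compact=true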
+  
+  
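+  /**
+   * Packs the alter options into an {@link AlterHDFSStoreAttributes} payload,
+   * executes {@link AlterHDFSStoreFunction} on the targeted members, and
+   * tabulates the per-member outcome.
+   */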
+  protected Result getAlteredHDFSStore(String[] groups, String hdfsUniqueName, Integer batchSize,
+      Integer batchInterval, Boolean minorCompact, Integer minorCompactionThreads, Boolean majorCompact,
+      Integer majorCompactionInterval, Integer majorCompactionThreads, Integer purgeInterval,
+      Integer fileRolloverInterval, Integer maxWriteonlyFileSize) {
+    
+    Set<DistributedMember> targetMembers = null;
+    try {
+      targetMembers = getGroupMembers(groups);
+    } catch (CommandResultException cre) {
+      return cre.getResult();
+    }
+    
+    TabularResultData tabularData = ResultBuilder.createTabularResultData();
+
+    AlterHDFSStoreAttributes alterAttributes = new AlterHDFSStoreAttributes(
+        hdfsUniqueName, batchSize, batchInterval, minorCompact,
+        majorCompact, minorCompactionThreads, majorCompactionInterval,
+        majorCompactionThreads, purgeInterval, fileRolloverInterval,
+        maxWriteonlyFileSize);
+
+    ResultCollector<?, ?> rc = getMembersFunctionExecutor(targetMembers)
+        .withArgs(alterAttributes).execute(new AlterHDFSStoreFunction());
+    
+    List<CliFunctionResult> results = CliFunctionResult.cleanResults((List<?>)rc.getResult());
+
+    XmlEntity xmlEntity = null;
+
+    for (CliFunctionResult result : results) {
+      if (result.getThrowable() != null) {
+        tabularData.accumulate("Member", result.getMemberIdOrName());
+        tabularData.accumulate("Result", "ERROR: " + result.getThrowable().getClass().getName() + ": "
+            + result.getThrowable().getMessage());
+        tabularData.setStatus(Status.ERROR);
+      }
+      else if (result.getMessage() != null) {
+        tabularData.accumulate("Member", result.getMemberIdOrName());
+        tabularData.accumulate("Result", result.getMessage());
+
+        if (xmlEntity == null) {
+          xmlEntity = result.getXmlEntity();
+        }
+      }
+    }
+    
+    Result result = ResultBuilder.buildResult(tabularData);
+    
+    if (xmlEntity != null) {
+      result.setCommandPersisted((new SharedConfigurationWriter()).deleteXmlEntity(xmlEntity, groups));
+    }
+    
+    return result;
+  }
+
+  @CliAvailabilityIndicator({CliStrings.CREATE_HDFS_STORE, CliStrings.LIST_HDFS_STORE,
+    CliStrings.DESCRIBE_HDFS_STORE, CliStrings.ALTER_HDFS_STORE, CliStrings.DESTROY_HDFS_STORE})
+  public boolean hdfsStoreCommandsAvailable() {
+    // Available when not running inside a gfsh VM; within gfsh, only when connected and ready.
+    return (!CliUtil.isGfshVM() || (getGfsh() != null && getGfsh().isConnectedAndReady()));
+  }  
+  
+  @Override
+  protected Set<DistributedMember> getMembers(final Cache cache) {
+    return CliUtil.getAllMembers(cache);
+  }
+  
+  protected Set<DistributedMember> getNormalMembers(final Cache cache) {
+    return CliUtil.getAllNormalMembers(cache);
+  }
+  
+  protected Set<DistributedMember> getGroupMembers(String[] groups) throws CommandResultException {
+    return CliUtil.findAllMatchingMembers(groups, null);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/converters/HdfsStoreNameConverter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/converters/HdfsStoreNameConverter.java b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/converters/HdfsStoreNameConverter.java
new file mode 100644
index 0000000..e595c77
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/management/internal/cli/converters/HdfsStoreNameConverter.java
@@ -0,0 +1,88 @@
+/*
+ * =========================================================================
+ *  Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ *  This product is protected by U.S. and international copyright
+ *  and intellectual property laws. Pivotal products are covered by
+ *  more patents listed at http://www.pivotal.io/patents.
+ * ========================================================================
+ */
+package com.gemstone.gemfire.management.internal.cli.converters;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import com.gemstone.gemfire.management.cli.ConverterHint;
+import com.gemstone.gemfire.management.internal.cli.shell.Gfsh;
+
+import org.springframework.shell.core.Completion;
+import org.springframework.shell.core.Converter;
+import org.springframework.shell.core.MethodTarget;
+
+/**
+ * Converter that supplies tab-completion of hdfs-store names in gfsh.
+ *
+ * @author Namrata Thanvi
+ */
+public class HdfsStoreNameConverter implements Converter<String> {
+
+  @Override
+  public boolean supports(Class<?> type, String optionContext) {
+    return String.class.equals(type) && ConverterHint.HDFSSTORE_ALL.equals(optionContext);
+  }
+
+  @Override
+  public String convertFromText(String value, Class<?> targetType, String optionContext) {
+    return value;
+  }
+
+  @Override
+  public boolean getAllPossibleValues(List<Completion> completions, Class<?> targetType, String existingData,
+      String optionContext, MethodTarget target) {
+    if (String.class.equals(targetType) && ConverterHint.HDFSSTORE_ALL.equals(optionContext)) {
+      Set<String> hdfsStoreNames = getHdfsStoreNames();
+
+      for (String hdfsStoreName : hdfsStoreNames) {
+        // Offer every store name, or only those matching what the user has typed so far.
+        if (existingData == null || hdfsStoreName.startsWith(existingData)) {
+          completions.add(new Completion(hdfsStoreName));
+        }
+      }
+    }
+
+    return !completions.isEmpty();
+  }
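+
+  // Illustrative effect (a sketch, assuming stores named "storeA" and "storeB"
+  // exist in the connected cluster): pressing TAB after
+  //   gfsh> describe hdfs-store --name=store
+  // offers both names as completions, while "--name=storeA" narrows to storeA.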
+
+  private Set<String> getHdfsStoreNames() {
+    SortedSet<String> hdfsStoreNames = new TreeSet<String>();
+    Gfsh gfsh = Gfsh.getCurrentInstance();
+
+    if (gfsh != null && gfsh.isConnectedAndReady()) {
+      Map<String, String[]> hdfsStoreInfo = gfsh.getOperationInvoker().getDistributedSystemMXBean()
+          .listMemberHDFSStore();
+      if (hdfsStoreInfo != null) {
+        Set<Entry<String, String[]>> entries = hdfsStoreInfo.entrySet();
+
+        for (Entry<String, String[]> entry : entries) {
+          String[] value = entry.getValue();
+          if (value != null) {
+            hdfsStoreNames.addAll(Arrays.asList(value));
+          }
+        }
+
+      }
+    }
+
+    return hdfsStoreNames;
+  }
+
+}

