geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ash...@apache.org
Subject [3/7] incubator-geode git commit: GEODE-10: Refactor HdfsStore api to match spec
Date Fri, 17 Jul 2015 06:03:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
index 50ea3c6..d663e3d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
@@ -13,13 +13,9 @@ import java.io.Serializable;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
 import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
 import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator.HDFSCompactionConfigMutator;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator.HDFSEventQueueAttributesMutator;
 import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
 import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
@@ -43,11 +39,23 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
   private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
   private int maxFileSize = DEFAULT_MAX_WRITE_ONLY_FILE_SIZE;
   private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
-  protected boolean isAutoCompact = HDFSCompactionConfig.DEFAULT_AUTO_COMPACTION;
-
-  private AbstractHDFSCompactionConfigHolder compactionConfig = null;
-
-  private HDFSEventQueueAttributes hdfsEventQueueAttrs = new HDFSEventQueueAttributesFactory().create();
+  protected boolean isAutoCompact = DEFAULT_MINOR_COMPACTION;
+  protected boolean autoMajorCompact = DEFAULT_MAJOR_COMPACTION;
+  protected int maxConcurrency = DEFAULT_MINOR_COMPACTION_THREADS;
+  protected int majorCompactionConcurrency = DEFAULT_MAJOR_COMPACTION_THREADS;
+  protected int majorCompactionIntervalMins = DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
+  protected int maxInputFileSizeMB = DEFAULT_MAX_INPUT_FILE_SIZE_MB;
+  protected int maxInputFileCount = DEFAULT_MAX_INPUT_FILE_COUNT;
+  protected int minInputFileCount = DEFAULT_MIN_INPUT_FILE_COUNT;
+  protected int oldFileCleanupIntervalMins = DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
+  
+  protected int batchSize = DEFAULT_BATCH_SIZE_MB;
+  protected int batchIntervalMillis = DEFAULT_BATCH_INTERVAL_MILLIS;
+  protected int maximumQueueMemory = DEFAULT_MAX_BUFFER_MEMORY;
+  protected boolean isPersistenceEnabled = DEFAULT_BUFFER_PERSISTANCE;
+  protected String diskStoreName = null;
+  protected boolean diskSynchronous = DEFAULT_DISK_SYNCHRONOUS; 
+  protected int dispatcherThreads = DEFAULT_DISPATCHER_THREADS;
   
   private static final Logger logger = LogService.getLogger();
   protected final String logPrefix;
@@ -62,9 +70,6 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
   public HDFSStoreConfigHolder(HDFSStore config) {
     this.logPrefix = "<" + getName() + "> ";
     if (config == null) {
-      // initialize default compaction strategy and leave the rest for getting
-      // set later
-      this.compactionConfig = AbstractHDFSCompactionConfigHolder.createInstance(null);
       return;
     }
     
@@ -72,12 +77,26 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     this.namenodeURL = config.getNameNodeURL();
     this.homeDir = config.getHomeDir();
     this.clientConfigFile = config.getHDFSClientConfigFile();
-    setHDFSCompactionConfig(config.getHDFSCompactionConfig());
     this.blockCacheSize = config.getBlockCacheSize();
-    setHDFSEventQueueAttributes(config.getHDFSEventQueueAttributes());
-    this.maxFileSize = config.getMaxFileSize();
-    this.fileRolloverInterval = config.getFileRolloverInterval();
-    setMinorCompaction(config.getMinorCompaction());
+    this.maxFileSize = config.getMaxWriteOnlyFileSize();
+    this.fileRolloverInterval = config.getWriteOnlyFileRolloverInterval();
+    isAutoCompact = config.getMinorCompaction();
+    maxConcurrency = config.getMinorCompactionThreads();
+    autoMajorCompact = config.getMajorCompaction();
+    majorCompactionConcurrency = config.getMajorCompactionThreads();
+    majorCompactionIntervalMins = config.getMajorCompactionInterval();
+    maxInputFileSizeMB = config.getMaxInputFileSizeMB();
+    maxInputFileCount = config.getMaxInputFileCount();
+    minInputFileCount = config.getMinInputFileCount();
+    oldFileCleanupIntervalMins = config.getPurgeInterval();
+    
+    batchSize = config.getBatchSize();
+    batchIntervalMillis = config.getBatchInterval();
+    maximumQueueMemory = config.getMaxMemory();
+    isPersistenceEnabled = config.getBufferPersistent();
+    diskStoreName = config.getDiskStoreName();
+    diskSynchronous = config.getSynchronousDiskWrite();
+    dispatcherThreads = config.getDispatcherThreads();
   }
   
   public void resetDefaultValues() {
@@ -89,40 +108,83 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     maxFileSize = -1;
     fileRolloverInterval = -1;
     
-    compactionConfig.resetDefaultValues();
     isAutoCompact = false;
+    maxConcurrency = -1;
+    maxInputFileSizeMB = -1;
+    maxInputFileCount = -1;
+    minInputFileCount = -1;
+    oldFileCleanupIntervalMins = -1;
+
+    autoMajorCompact = false;
+    majorCompactionConcurrency = -1;
+    majorCompactionIntervalMins = -1;
     
-    // TODO reset hdfseventqueueattributes;
+    batchSize = -1;
+    batchIntervalMillis = -1;
+    maximumQueueMemory = -1;
+    isPersistenceEnabled = false;
+    diskStoreName = null;
+    diskSynchronous = false; 
+    dispatcherThreads = -1;
   }
   
   public void copyFrom(HDFSStoreMutator mutator) {
-    if (mutator.getFileRolloverInterval() >= 0) {
-      logAttrMutation("fileRolloverInterval", mutator.getFileRolloverInterval());
-      setFileRolloverInterval(mutator.getFileRolloverInterval());
+    if (mutator.getWriteOnlyFileRolloverInterval() >= 0) {
+      logAttrMutation("fileRolloverInterval", mutator.getWriteOnlyFileRolloverInterval());
+      setWriteOnlyFileRolloverInterval(mutator.getWriteOnlyFileRolloverInterval());
     }
-    if (mutator.getMaxFileSize() >= 0) {
-      logAttrMutation("MaxFileSize", mutator.getFileRolloverInterval());
-      setMaxFileSize(mutator.getMaxFileSize());
+    if (mutator.getMaxWriteOnlyFileSize() >= 0) {
+      logAttrMutation("MaxFileSize", mutator.getWriteOnlyFileRolloverInterval());
+      setMaxWriteOnlyFileSize(mutator.getMaxWriteOnlyFileSize());
     }
     
-    compactionConfig.copyFrom(mutator.getCompactionConfigMutator());
     if (mutator.getMinorCompaction() != null) {
       logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
       setMinorCompaction(mutator.getMinorCompaction());
     }
     
-    HDFSEventQueueAttributesFactory newFactory = new HDFSEventQueueAttributesFactory(hdfsEventQueueAttrs);
-    HDFSEventQueueAttributesMutator qMutator = mutator.getHDFSEventQueueAttributesMutator();
-
-    if (qMutator.getBatchSizeMB() >= 0) {
-      logAttrMutation("batchSizeMB", mutator.getFileRolloverInterval());
-      newFactory.setBatchSizeMB(qMutator.getBatchSizeMB());
+    if (mutator.getMinorCompactionThreads() >= 0) {
+      logAttrMutation("MaxThreads", mutator.getMinorCompactionThreads());
+      setMinorCompactionThreads(mutator.getMinorCompactionThreads());
+    }
+    
+    if (mutator.getMajorCompactionInterval() > -1) {
+      logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionInterval());
+      setMajorCompactionInterval(mutator.getMajorCompactionInterval());
+    }
+    if (mutator.getMajorCompactionThreads() >= 0) {
+      logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionThreads());
+      setMajorCompactionThreads(mutator.getMajorCompactionThreads());
+    }
+    if (mutator.getMajorCompaction() != null) {
+      logAttrMutation("AutoMajorCompaction", mutator.getMajorCompaction());
+      setMajorCompaction(mutator.getMajorCompaction());
     }
-    if (qMutator.getBatchTimeInterval() >= 0) {
-      logAttrMutation("batchTimeInterval", mutator.getFileRolloverInterval());
-      newFactory.setBatchTimeInterval(qMutator.getBatchTimeInterval());
+    if (mutator.getMaxInputFileCount() >= 0) {
+      logAttrMutation("maxInputFileCount", mutator.getMaxInputFileCount());
+      setMaxInputFileCount(mutator.getMaxInputFileCount());
+    }
+    if (mutator.getMaxInputFileSizeMB() >= 0) {
+      logAttrMutation("MaxInputFileSizeMB", mutator.getMaxInputFileSizeMB());
+      setMaxInputFileSizeMB(mutator.getMaxInputFileSizeMB());
+    }
+    if (mutator.getMinInputFileCount() >= 0) {
+      logAttrMutation("MinInputFileCount", mutator.getMinInputFileCount());
+      setMinInputFileCount(mutator.getMinInputFileCount());
+    }    
+    if (mutator.getPurgeInterval() >= 0) {
+      logAttrMutation("OldFilesCleanupIntervalMins", mutator.getPurgeInterval());
+      setPurgeInterval(mutator.getPurgeInterval());
+    }
+    
+    if (mutator.getBatchSize() >= 0) {
+      logAttrMutation("batchSizeMB", mutator.getWriteOnlyFileRolloverInterval());
+      setBatchSize(mutator.getBatchSize());
+    }
+    if (mutator.getBatchInterval() >= 0) {
+      logAttrMutation("batchTimeInterval", mutator.getWriteOnlyFileRolloverInterval());
+      setBatchInterval(mutator.getBatchInterval());
     }
-    hdfsEventQueueAttrs = newFactory.create();
   }
 
   void logAttrMutation(String name, Object value) {
@@ -185,59 +247,25 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     return blockCacheSize;
   }
   
-  /**
-   * Sets the HDFS event queue attributes
-   * This causes the store to use the {@link HDFSEventQueueAttributes}.
-   * @param hdfsEventQueueAttrs the attributes of the HDFS Event queue
-   */
-  public HDFSStoreFactory setHDFSEventQueueAttributes(HDFSEventQueueAttributes hdfsEventQueueAttrs) {
-    this.hdfsEventQueueAttrs  = hdfsEventQueueAttrs;
-    return this;
-  }
-  @Override
-  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
-    return hdfsEventQueueAttrs;
-  }
-
-  @Override
-  public AbstractHDFSCompactionConfigHolder getHDFSCompactionConfig() {
-    return compactionConfig;
-  }
-  @Override
-  public HDFSStoreConfigHolder setHDFSCompactionConfig(HDFSCompactionConfig config) {
-    if (config == null) {
-      return this;
-    }
-    
-    String s = config.getCompactionStrategy();
-    compactionConfig = AbstractHDFSCompactionConfigHolder.createInstance(s);
-    compactionConfig.copyFrom(config);
-    return this;
-  }
-  @Override
-  public HDFSCompactionConfigFactory createCompactionConfigFactory(String name) {
-    return AbstractHDFSCompactionConfigHolder.createInstance(name);
-  }
-  
   @Override
-  public HDFSStoreFactory setMaxFileSize(int maxFileSize) {
+  public HDFSStoreFactory setMaxWriteOnlyFileSize(int maxFileSize) {
     assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
     this.maxFileSize = maxFileSize;
     return this;
   }
   @Override
-  public int getMaxFileSize() {
+  public int getMaxWriteOnlyFileSize() {
     return maxFileSize;
   }
 
   @Override
-  public HDFSStoreFactory setFileRolloverInterval(int count) {
+  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
     assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
     this.fileRolloverInterval = count;
     return this;
   }
   @Override
-  public int getFileRolloverInterval() {
+  public int getWriteOnlyFileRolloverInterval() {
     return fileRolloverInterval;
   }
   
@@ -251,255 +279,102 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     return this;
   }
 
-  /**
-   * Abstract config class for compaction configuration. A concrete class must
-   * extend setters for all configurations it consumes. This class will throw an
-   * exception for any unexpected configuration. Concrete class must also
-   * validate the configuration
-   * 
-   * @author ashvina
-   */
-  public static abstract class AbstractHDFSCompactionConfigHolder implements
-      HDFSCompactionConfig, HDFSCompactionConfigFactory , Serializable{
-    protected int maxInputFileSizeMB = HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_SIZE_MB;
-    protected int maxInputFileCount = HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_COUNT;
-    protected int minInputFileCount = HDFSCompactionConfig.DEFAULT_MIN_INPUT_FILE_COUNT;
-
-    protected int maxConcurrency = HDFSCompactionConfig.DEFAULT_MAX_THREADS;
-    
-    protected boolean autoMajorCompact = HDFSCompactionConfig.DEFAULT_AUTO_MAJOR_COMPACTION;
-    protected int majorCompactionConcurrency = HDFSCompactionConfig.DEFAULT_MAJOR_COMPACTION_MAX_THREADS;
-    protected int majorCompactionIntervalMins = HDFSCompactionConfig.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
-    protected int oldFileCleanupIntervalMins = HDFSCompactionConfig.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
-    
-    
-    public AbstractHDFSCompactionConfigHolder() {
-      
-    }
-    
-    void copyFrom(HDFSCompactionConfig config) {
-      setMaxInputFileSizeMB(config.getMaxInputFileSizeMB());
-      setMaxInputFileCount(config.getMaxInputFileCount());
-      setMinInputFileCount(config.getMinInputFileCount());
-      setMaxThreads(config.getMaxThreads());
-      setAutoMajorCompaction(config.getAutoMajorCompaction());
-      setMajorCompactionMaxThreads(config.getMajorCompactionMaxThreads());
-      setMajorCompactionIntervalMins(config.getMajorCompactionIntervalMins());
-      setOldFilesCleanupIntervalMins(config.getOldFilesCleanupIntervalMins());
-    }
-    
-    void copyFrom(HDFSCompactionConfigMutator mutator) {
-      if (mutator.getMaxInputFileCount() >= 0) {
-        logAttrMutation("maxInputFileCount", mutator.getMaxInputFileCount());
-        setMaxInputFileCount(mutator.getMaxInputFileCount());
-      }
-      if (mutator.getMaxInputFileSizeMB() >= 0) {
-        logAttrMutation("MaxInputFileSizeMB", mutator.getMaxInputFileSizeMB());
-        setMaxInputFileSizeMB(mutator.getMaxInputFileSizeMB());
-      }
-      if (mutator.getMaxThreads() >= 0) {
-        logAttrMutation("MaxThreads", mutator.getMaxThreads());
-        setMaxThreads(mutator.getMaxThreads());
-      }
-      if (mutator.getMinInputFileCount() >= 0) {
-        logAttrMutation("MinInputFileCount", mutator.getMinInputFileCount());
-        setMinInputFileCount(mutator.getMinInputFileCount());
-      }
-      
-      if (mutator.getMajorCompactionIntervalMins() > -1) {
-        logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionIntervalMins());
-        setMajorCompactionIntervalMins(mutator.getMajorCompactionIntervalMins());
-      }
-      if (mutator.getMajorCompactionMaxThreads() >= 0) {
-        logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionMaxThreads());
-        setMajorCompactionMaxThreads(mutator.getMajorCompactionMaxThreads());
-      }
-      if (mutator.getAutoMajorCompaction() != null) {
-        logAttrMutation("AutoMajorCompaction", mutator.getAutoMajorCompaction());
-        setAutoMajorCompaction(mutator.getAutoMajorCompaction());
-      }
-      
-      if (mutator.getOldFilesCleanupIntervalMins() >= 0) {
-        logAttrMutation("OldFilesCleanupIntervalMins", mutator.getOldFilesCleanupIntervalMins());
-        setOldFilesCleanupIntervalMins(mutator.getOldFilesCleanupIntervalMins());
-      }
-    }
-    
-    void logAttrMutation(String name, Object value) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Alter " + name + ":" + value);
-      }
-    }
-    
-    public void resetDefaultValues() {
-      maxInputFileSizeMB = -1;
-      maxInputFileCount = -1;
-      minInputFileCount = -1;
-      maxConcurrency = -1;
-
-      autoMajorCompact = false;
-      majorCompactionConcurrency = -1;
-      majorCompactionIntervalMins = -1;
-      oldFileCleanupIntervalMins = -1;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
-      throw new GemFireConfigException("This configuration is not applicable to configured compaction strategy");
-    }
-    @Override
-    public int getMaxInputFileSizeMB() {
-      return maxInputFileSizeMB;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
-      throw new GemFireConfigException("This configuration is not applicable to configured compaction strategy");
-    }
-    @Override
-    public int getMinInputFileCount() {
-      return minInputFileCount;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setMaxInputFileCount(int size) {
-      throw new GemFireConfigException("This configuration is not applicable to configured compaction strategy");
-    }
-    @Override
-    public int getMaxInputFileCount() {
-      return maxInputFileCount;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setMaxThreads(int count) {
-      assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
-      this.maxConcurrency = count;
-      return this;
-    }
-    @Override
-    public int getMaxThreads() {
-      return maxConcurrency;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setAutoMajorCompaction(boolean auto) {
-      this.autoMajorCompact = auto;
-      return this;
-    }
-    @Override
-    public boolean getAutoMajorCompaction() {
-      return autoMajorCompact;
-    }
-
-    @Override
-    public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
-      throw new GemFireConfigException("This configuration is not applicable to configured compaction strategy");
-    }
-    @Override
-    public int getMajorCompactionIntervalMins() {
-      return majorCompactionIntervalMins;
-    }
+  @Override
+  public HDFSStoreFactory setMinorCompactionThreads(int count) {
+    assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
+    this.maxConcurrency = count;
+    return this;
+  }
+  @Override
+  public int getMinorCompactionThreads() {
+    return maxConcurrency;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
-      throw new GemFireConfigException("This configuration is not applicable to configured compaction strategy");
-    }
-    @Override
-    public int getMajorCompactionMaxThreads() {
-      return majorCompactionConcurrency;
-    }
-    
-    
-    @Override
-    public int getOldFilesCleanupIntervalMins() {
-      return oldFileCleanupIntervalMins ;
-    }    
-    @Override
-    public HDFSCompactionConfigFactory setOldFilesCleanupIntervalMins(int interval) {
-      assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
-      this.oldFileCleanupIntervalMins = interval;
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setMajorCompaction(boolean auto) {
+    this.autoMajorCompact = auto;
+    return this;
+  }
+  @Override
+  public boolean getMajorCompaction() {
+    return autoMajorCompact;
+  }
 
-    @Override
-    public HDFSCompactionConfig getConfigView() {
-      return (HDFSCompactionConfig) this;
-    }
-    
-    @Override
-    public HDFSCompactionConfig create() throws GemFireConfigException {
-      AbstractHDFSCompactionConfigHolder config = createInstance(getCompactionStrategy());
-      config.copyFrom(this);
-      config.validate();
-      return config;
-    }
-    
-    protected void validate() {
-    }
+  @Override
+  public HDFSStoreFactory setMajorCompactionInterval(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
+    this.majorCompactionIntervalMins = count;
+    return this;
+  }
+  @Override
+  public int getMajorCompactionInterval() {
+    return majorCompactionIntervalMins;
+  }
 
-    public static AbstractHDFSCompactionConfigHolder createInstance(String name) {
-      if (name == null) {
-        name = DEFAULT_STRATEGY;
-      }
+  @Override
+  public HDFSStoreFactory setMajorCompactionThreads(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
+    this.majorCompactionConcurrency = count;
+    return this;
+  }
+  @Override
+  public int getMajorCompactionThreads() {
+    return majorCompactionConcurrency;
+  }
+  
+  @Override
+  public HDFSStoreFactory setMaxInputFileSizeMB(int size) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
+    this.maxInputFileSizeMB = size;
+    return this;
+  }
+  @Override
+  public int getMaxInputFileSizeMB() {
+    return maxInputFileSizeMB;
+  }
 
-      if (name.equalsIgnoreCase(SIZE_ORIENTED)) {
-        return new SizeTieredHdfsCompactionConfigHolder();
-      }
+  @Override
+  public HDFSStoreFactory setMinInputFileCount(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
+    this.minInputFileCount = count;
+    return this;
+  }
+  @Override
+  public int getMinInputFileCount() {
+    return minInputFileCount;
+  }
 
-      return new InvalidCompactionConfigHolder();
-    }
+  @Override
+  public HDFSStoreFactory setMaxInputFileCount(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
+    this.maxInputFileCount = count;
+    return this;
+  }
+  @Override
+  public int getMaxInputFileCount() {
+    return maxInputFileCount;
+  }
 
-    @Override
-    public String toString() {
-      StringBuilder builder = new StringBuilder();
-      builder.append("AbstractHDFSCompactionConfigHolder@");
-      builder.append(System.identityHashCode(this));
-      builder.append("[autoMajorCompact=");
-      builder.append(autoMajorCompact);
-      builder.append(", ");
-      if (maxInputFileSizeMB > -1) {
-        builder.append("maxInputFileSizeMB=");
-        builder.append(maxInputFileSizeMB);
-        builder.append(", ");
-      }
-      if (maxInputFileCount > -1) {
-        builder.append("maxInputFileCount=");
-        builder.append(maxInputFileCount);
-        builder.append(", ");
-      }
-      if (minInputFileCount > -1) {
-        builder.append("minInputFileCount=");
-        builder.append(minInputFileCount);
-        builder.append(", ");
-      }
-      if (maxConcurrency > -1) {
-        builder.append("maxConcurrency=");
-        builder.append(maxConcurrency);
-        builder.append(", ");
-      }
-      if (majorCompactionConcurrency > -1) {
-        builder.append("majorCompactionConcurrency=");
-        builder.append(majorCompactionConcurrency);
-        builder.append(", ");
-      }
-      if (majorCompactionIntervalMins > -1) {
-        builder.append("majorCompactionIntervalMins=");
-        builder.append(majorCompactionIntervalMins);
-        builder.append(", ");
-      }
-      if (oldFileCleanupIntervalMins > -1) {
-        builder.append("oldFileCleanupIntervalMins=");
-        builder.append(oldFileCleanupIntervalMins);
-      }
-      builder.append("]");
-      return builder.toString();
-    }
+  @Override
+  public int getPurgeInterval() {
+    return oldFileCleanupIntervalMins ;
+  }    
+  @Override
+  public HDFSStoreFactory setPurgeInterval(int interval) {
+    assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
+    this.oldFileCleanupIntervalMins = interval;
+    return this;
   }
   
-  public static class InvalidCompactionConfigHolder extends AbstractHDFSCompactionConfigHolder {
-    @Override
-    public String getCompactionStrategy() {
-      return INVALID;
+  protected void validate() {
+    if (minInputFileCount > maxInputFileCount) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
+          .toLocalizedString(new Object[] {
+              "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
+              minInputFileCount,
+              "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
+              maxInputFileCount }));
     }
   }
 
@@ -535,54 +410,59 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     StringBuilder builder = new StringBuilder();
     builder.append("HDFSStoreConfigHolder@");
     builder.append(System.identityHashCode(this));
-    builder.append(" [name=");
-    builder.append(name);
-    builder.append(", ");
-    if (namenodeURL != null) {
-      builder.append("namenodeURL=");
-      builder.append(namenodeURL);
-      builder.append(", ");
-    }
-    if (homeDir != null) {
-      builder.append("homeDir=");
-      builder.append(homeDir);
-      builder.append(", ");
-    }
-    if (clientConfigFile != null) {
-      builder.append("clientConfigFile=");
-      builder.append(clientConfigFile);
-      builder.append(", ");
-    }
+    builder.append(" [");
+    appendStrProp(builder, name, "name");
+    appendStrProp(builder, namenodeURL, "namenodeURL");
+    appendStrProp(builder, homeDir, "homeDir");
+    appendStrProp(builder, clientConfigFile, "clientConfigFile");
     if (blockCacheSize > -1) {
       builder.append("blockCacheSize=");
       builder.append(blockCacheSize);
       builder.append(", ");
     }
-    if (maxFileSize > -1) {
-      builder.append("maxFileSize=");
-      builder.append(maxFileSize);
-      builder.append(", ");
-    }
-    if (fileRolloverInterval > -1) {
-      builder.append("fileRolloverInterval=");
-      builder.append(fileRolloverInterval);
+    appendIntProp(builder, maxFileSize, "maxFileSize");
+    appendIntProp(builder, fileRolloverInterval, "fileRolloverInterval");
+    appendBoolProp(builder, isAutoCompact, "isAutoCompact");
+    appendBoolProp(builder, autoMajorCompact, "autoMajorCompact");
+    appendIntProp(builder, maxConcurrency, "maxConcurrency");
+    appendIntProp(builder, majorCompactionConcurrency, "majorCompactionConcurrency");
+    appendIntProp(builder, majorCompactionIntervalMins, "majorCompactionIntervalMins");
+    appendIntProp(builder, maxInputFileSizeMB, "maxInputFileSizeMB");
+    appendIntProp(builder, maxInputFileCount, "maxInputFileCount");
+    appendIntProp(builder, minInputFileCount, "minInputFileCount");
+    appendIntProp(builder, oldFileCleanupIntervalMins, "oldFileCleanupIntervalMins");
+    appendIntProp(builder, batchSize, "batchSize");
+    appendIntProp(builder, batchIntervalMillis, "batchInterval");
+    appendIntProp(builder, maximumQueueMemory, "maximumQueueMemory");
+    appendIntProp(builder, dispatcherThreads, "dispatcherThreads");
+    appendBoolProp(builder, isPersistenceEnabled, "isPersistenceEnabled");
+    appendStrProp(builder, diskStoreName, "diskStoreName");
+    appendBoolProp(builder, diskSynchronous, "diskSynchronous");
+
+    builder.append("]");
+    return builder.toString();
+  }
+
+  private void appendStrProp(StringBuilder builder, String value, String name) {
+    if (value != null) {
+      builder.append(name + "=");
+      builder.append(value);
       builder.append(", ");
     }
-    builder.append("minorCompaction=");
-    builder.append(isAutoCompact);
-    builder.append(", ");
+  }
 
-    if (compactionConfig != null) {
-      builder.append("compactionConfig=");
-      builder.append(compactionConfig);
+  private void appendIntProp(StringBuilder builder, int value, String name) {
+    if (value > -1) {
+      builder.append(name + "=");
+      builder.append(value);
       builder.append(", ");
     }
-    if (hdfsEventQueueAttrs != null) {
-      builder.append("hdfsEventQueueAttrs=");
-      builder.append(hdfsEventQueueAttrs);
-    }
-    builder.append("]");
-    return builder.toString();
+  }
+  
+  private void appendBoolProp(StringBuilder builder, boolean value, String name) {
+    builder.append(name + "=");
+    builder.append(value);
+    builder.append(", ");
   }
 
   @Override
@@ -598,4 +478,74 @@ public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory ,Seria
     // completely. Hence mutator at the config holder is not needed
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public String getDiskStoreName() {
+    return this.diskStoreName;
+  }
+  @Override
+  public HDFSStoreFactory setDiskStoreName(String name) {
+    this.diskStoreName = name;
+    return this;
+  }
+
+  @Override
+  public int getBatchInterval() {
+    return this.batchIntervalMillis;
+  }
+  @Override
+  public HDFSStoreFactory setBatchInterval(int intervalMillis){
+    this.batchIntervalMillis = intervalMillis;
+    return this;
+  }
+  
+  @Override
+  public boolean getBufferPersistent() {
+    return isPersistenceEnabled;
+  }
+  @Override
+  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+    this.isPersistenceEnabled = isPersistent;
+    return this;
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return dispatcherThreads;
+  }
+  @Override
+  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+    this.dispatcherThreads = dispatcherThreads;
+    return this;
+  }
+  
+  @Override
+  public int getMaxMemory() {
+    return this.maximumQueueMemory;
+  }
+  @Override
+  public HDFSStoreFactory setMaxMemory(int memory) {
+    this.maximumQueueMemory = memory;
+    return this;
+  }
+  
+  @Override
+  public int getBatchSize() {
+    return this.batchSize;
+  }
+  @Override
+  public HDFSStoreFactory setBatchSize(int size){
+    this.batchSize = size;
+    return this;
+  }
+  
+  @Override
+  public boolean getSynchronousDiskWrite() {
+    return this.diskSynchronous;
+  }
+  @Override
+  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+    this.diskSynchronous = isSynchronous;
+    return this;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
index b5fbfe8..e7121aa 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
@@ -9,28 +9,12 @@
 package com.gemstone.gemfire.cache.hdfs.internal;
 
 import com.gemstone.gemfire.GemFireConfigException;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
 import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
 import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
 /**
- * HDFS store configuration.
- * 
- * <pre>
- * {@code
- * <hdfs-store name="" home-dir="" namenode-url="">
- * <hdfs-compaction strategy="" auto-compact="" max-input-file-size-mb="" 
- *                  min-input-file-count="" max-input-file-count="" 
- *                  max-concurrency="" auto-major-compaction="" 
- *                  major-compaction-interval-mins="" major-compaction-concurrency=""/>
- * </hdfs-store>
- * }
- * </pre>
- * 
  * @author ashvina
  */
 public class HDFSStoreCreation implements HDFSStoreFactory {
@@ -45,8 +29,7 @@ public class HDFSStoreCreation implements HDFSStoreFactory {
    * @param config configuration source for creating this instance 
    */
   public HDFSStoreCreation(HDFSStoreCreation config) {
-    this.configHolder = new HDFSStoreConfigHolder(config == null ? null
-        : config.configHolder);
+    this.configHolder = new HDFSStoreConfigHolder(config == null ? null : config.configHolder);
   }
 
   @Override
@@ -79,130 +62,112 @@ public class HDFSStoreCreation implements HDFSStoreFactory {
     return this;
   }
   
-  /**
-   * Sets the HDFS event queue attributes
-   * This causes the store to use the {@link HDFSEventQueueAttributes}.
-   * @param hdfsEventQueueAttrs the attributes of the HDFS Event queue
-   * @return a reference to this RegionFactory object
-   * 
-   */
-  public HDFSStoreFactory setHDFSEventQueueAttributes(HDFSEventQueueAttributes hdfsEventQueueAttrs) {
-    configHolder.setHDFSEventQueueAttributes(hdfsEventQueueAttrs);
+  @Override
+  public HDFSStoreFactory setMaxWriteOnlyFileSize(int maxFileSize) {
+    configHolder.setMaxWriteOnlyFileSize(maxFileSize);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setWriteOnlyFileRolloverInterval(int count) {
+    configHolder.setWriteOnlyFileRolloverInterval(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMinorCompaction(boolean auto) {
+    configHolder.setMinorCompaction(auto);
     return this;
   }
   
   @Override
-  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
-    return configHolder.getHDFSEventQueueAttributes();
+  public HDFSStoreFactory setMinorCompactionThreads(int count) {
+    configHolder.setMinorCompactionThreads(count);
+    return this;
   }
 
   @Override
-  public HDFSStoreFactory setHDFSCompactionConfig(HDFSCompactionConfig config) {
-    configHolder.setHDFSCompactionConfig(config);
+  public HDFSStoreFactory setMajorCompaction(boolean auto) {
+    configHolder.setMajorCompaction(auto);
     return this;
   }
 
   @Override
-  public HDFSCompactionConfigFactory createCompactionConfigFactory(String name) {
-    return configHolder.createCompactionConfigFactory(name);
+  public HDFSStoreFactory setMajorCompactionInterval(int count) {
+    configHolder.setMajorCompactionInterval(count);
+    return this;
   }
+
   @Override
-  
-  public HDFSStoreFactory setMaxFileSize(int maxFileSize) {
-    configHolder.setMaxFileSize(maxFileSize);
+  public HDFSStoreFactory setMajorCompactionThreads(int count) {
+    configHolder.setMajorCompactionThreads(count);
     return this;
   }
 
   @Override
-  public HDFSStoreFactory setFileRolloverInterval(int count) {
-    configHolder.setFileRolloverInterval(count);
+  public HDFSStoreFactory setMaxInputFileSizeMB(int size) {
+    configHolder.setMaxInputFileSizeMB(size);
     return this;
   }
 
   @Override
-  public HDFSStoreFactory setMinorCompaction(boolean auto) {
-    configHolder.setMinorCompaction(auto);
+  public HDFSStoreFactory setMinInputFileCount(int count) {
+    configHolder.setMinInputFileCount(count);
     return this;
   }
-  
-  /**
-   * Config class for compaction configuration. A concrete class must
-   * extend setters for all configurations it consumes. This class will throw an
-   * exception for any unexpected configuration. Concrete class must also
-   * validate the configuration
-   * 
-   * @author ashvina
-   */
-  public static class HDFSCompactionConfigFactoryImpl implements
-      HDFSCompactionConfigFactory {
-    private AbstractHDFSCompactionConfigHolder configHolder;
-
-    @Override
-    public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
-      configHolder.setMaxInputFileSizeMB(size);
-      return this;
-    }
 
-    @Override
-    public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
-      configHolder.setMinInputFileCount(count);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setMaxInputFileCount(int count) {
+    configHolder.setMaxInputFileCount(count);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setMaxInputFileCount(int count) {
-      configHolder.setMaxInputFileCount(count);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setPurgeInterval(int interval) {
+    configHolder.setPurgeInterval(interval);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setMaxThreads(int count) {
-      configHolder.setMaxThreads(count);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setDiskStoreName(String name) {
+    configHolder.setDiskStoreName(name);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setAutoMajorCompaction(boolean auto) {
-      configHolder.setAutoMajorCompaction(auto);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setMaxMemory(int memory) {
+    configHolder.setMaxMemory(memory);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
-      configHolder.setMajorCompactionIntervalMins(count);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setBatchInterval(int intervalMillis) {
+    configHolder.setBatchInterval(intervalMillis);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
-      configHolder.setMajorCompactionMaxThreads(count);
-      return this;
-    }
-        
-    @Override
-    public HDFSCompactionConfigFactory setOldFilesCleanupIntervalMins(int interval) {
-      configHolder.setOldFilesCleanupIntervalMins(interval);
-      return this;
-    }
+  @Override
+  public HDFSStoreFactory setBatchSize(int size) {
+    configHolder.setBatchSize(size);
+    return this;
+  }
 
-    @Override
-    public HDFSCompactionConfig getConfigView() {
-      return configHolder.getConfigView();
-    }
-    
-    @Override
-    public HDFSCompactionConfig create() throws GemFireConfigException {
-      HDFSCompactionConfigFactoryImpl config = createInstance(configHolder.getCompactionStrategy());
-      config.configHolder.copyFrom(this.configHolder);
-      config.configHolder.validate();
-      return (HDFSCompactionConfig) config.configHolder;
-    }
-    
-    private static HDFSCompactionConfigFactoryImpl createInstance(String name) {
-      HDFSCompactionConfigFactoryImpl impl = new HDFSCompactionConfigFactoryImpl();
-      impl.configHolder = AbstractHDFSCompactionConfigHolder.createInstance(name);
-      return impl;
-    }
+  @Override
+  public HDFSStoreFactory setBufferPersistent(boolean isPersistent) {
+    configHolder.setBufferPersistent(isPersistent);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setSynchronousDiskWrite(boolean isSynchronous) {
+    configHolder.setSynchronousDiskWrite(isSynchronous);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setDispatcherThreads(int dispatcherThreads) {
+    configHolder.setDispatcherThreads(dispatcherThreads);
+    return this;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
index e7e75dc..e39e58e 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
@@ -40,6 +40,8 @@ public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
       throw new GemFireConfigException("HDFS store name not provided");
     }
     
+    this.configHolder.validate();
+    
     HDFSStore result = null;
     synchronized (this) {
       if (this.cache instanceof GemFireCacheImpl) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
index 8e7e358..451ac79 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.Logger;
 
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
 import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
 import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
@@ -131,7 +130,7 @@ public class HDFSStoreImpl implements HDFSStore {
       throw new HDFSIOException(ex.getMessage(),ex);
     }    
     //HDFSCompactionConfig has already been initialized
-    long cleanUpIntervalMillis = getHDFSCompactionConfig().getOldFilesCleanupIntervalMins() * 60 * 1000;
+    long cleanUpIntervalMillis = getPurgeInterval() * 60 * 1000;
     Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
     HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
   }
@@ -448,7 +447,7 @@ public class HDFSStoreImpl implements HDFSStore {
     }
     HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
     newHolder.copyFrom(mutator);
-    newHolder.getHDFSCompactionConfig().validate();
+    newHolder.validate();
     HDFSStore oldStore = configHolder;
     configHolder = newHolder;
     if (logger.isDebugEnabled()) {
@@ -495,13 +494,13 @@ public class HDFSStoreImpl implements HDFSStore {
   }
 
   @Override
-  public int getMaxFileSize() {
-    return configHolder.getMaxFileSize();
+  public int getMaxWriteOnlyFileSize() {
+    return configHolder.getMaxWriteOnlyFileSize();
   }
 
   @Override
-  public int getFileRolloverInterval() {
-    return configHolder.getFileRolloverInterval();
+  public int getWriteOnlyFileRolloverInterval() {
+    return configHolder.getWriteOnlyFileRolloverInterval();
   }
 
   @Override
@@ -510,16 +509,82 @@ public class HDFSStoreImpl implements HDFSStore {
   }
 
   @Override
-  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
-    return configHolder.getHDFSEventQueueAttributes();
+  public int getMinorCompactionThreads() {
+    return configHolder.getMinorCompactionThreads();
   }
 
   @Override
-  public HDFSCompactionConfig getHDFSCompactionConfig() {
-    return (HDFSCompactionConfig) configHolder.getHDFSCompactionConfig();
+  public boolean getMajorCompaction() {
+    return configHolder.getMajorCompaction();
   }
 
   @Override
+  public int getMajorCompactionInterval() {
+    return configHolder.getMajorCompactionInterval();
+  }
+
+  @Override
+  public int getMajorCompactionThreads() {
+    return configHolder.getMajorCompactionThreads();
+  }
+
+
+  @Override
+  public int getMaxInputFileSizeMB() {
+    return configHolder.getMaxInputFileSizeMB();
+  }
+
+  @Override
+  public int getMinInputFileCount() {
+    return configHolder.getMinInputFileCount();
+  }
+
+  @Override
+  public int getMaxInputFileCount() {
+    return configHolder.getMaxInputFileCount();
+  }
+
+  @Override
+  public int getPurgeInterval() {
+    return configHolder.getPurgeInterval();
+  }
+
+  @Override
+  public String getDiskStoreName() {
+    return configHolder.getDiskStoreName();
+  }
+
+  @Override
+  public int getMaxMemory() {
+    return configHolder.getMaxMemory();
+  }
+
+  @Override
+  public int getBatchSize() {
+    return configHolder.getBatchSize();
+  }
+
+  @Override
+  public int getBatchInterval() {
+    return configHolder.getBatchInterval();
+  }
+
+  @Override
+  public boolean getBufferPersistent() {
+    return configHolder.getBufferPersistent();
+  }
+
+  @Override
+  public boolean getSynchronousDiskWrite() {
+    return configHolder.getSynchronousDiskWrite();
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return configHolder.getDispatcherThreads();
+  }
+  
+  @Override
   public HDFSStoreMutator createHdfsStoreMutator() {
     return new HDFSStoreMutatorImpl();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
index e4e2093..46797c4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
@@ -8,56 +8,45 @@
 
 package com.gemstone.gemfire.cache.hdfs.internal;
 
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
 import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
 public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
   private HDFSStoreConfigHolder configHolder;
   private Boolean autoCompact;
-  private HDFSCompactionConfigMutator compactionMutator;
-  private HDFSEventQueueAttributesMutator qMutator;
+  private Boolean autoMajorCompact;
 
   public HDFSStoreMutatorImpl() {
     configHolder = new HDFSStoreConfigHolder();
     configHolder.resetDefaultValues();
-    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
-    qMutator = new HDFSEventQueueAttributesMutatorImpl(null);
   }
 
   public HDFSStoreMutatorImpl(HDFSStore store) {
     configHolder = new HDFSStoreConfigHolder(store);
-    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
-    // The following two steps are needed to set the null boolean values in compactionMutator
-    configHolder.setMinorCompaction(configHolder.getMinorCompaction());
-    compactionMutator.setAutoMajorCompaction(configHolder.getHDFSCompactionConfig().getAutoMajorCompaction());
-    qMutator = new HDFSEventQueueAttributesMutatorImpl(configHolder.getHDFSEventQueueAttributes());
   }
   
-  public HDFSStoreMutator setMaxFileSize(int maxFileSize) {
-    configHolder.setMaxFileSize(maxFileSize);
+  public HDFSStoreMutator setMaxWriteOnlyFileSize(int maxFileSize) {
+    configHolder.setMaxWriteOnlyFileSize(maxFileSize);
     return this;
   }
   @Override
-  public int getMaxFileSize() {
-    return configHolder.getMaxFileSize();
+  public int getMaxWriteOnlyFileSize() {
+    return configHolder.getMaxWriteOnlyFileSize();
   }
 
   @Override
-  public HDFSStoreMutator setFileRolloverInterval(int count) {
-    configHolder.setFileRolloverInterval(count);
+  public HDFSStoreMutator setWriteOnlyFileRolloverInterval(int count) {
+    configHolder.setWriteOnlyFileRolloverInterval(count);
     return this;
   }
   @Override
-  public int getFileRolloverInterval() {
-    return configHolder.getFileRolloverInterval();
+  public int getWriteOnlyFileRolloverInterval() {
+    return configHolder.getWriteOnlyFileRolloverInterval();
   }
 
   @Override
-  public HDFSCompactionConfigMutator setMinorCompaction(boolean auto) {
+  public HDFSStoreMutator setMinorCompaction(boolean auto) {
     autoCompact = Boolean.valueOf(auto);
     configHolder.setMinorCompaction(auto);
     return null;
@@ -68,165 +57,107 @@ public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
   }
   
   @Override
-  public HDFSCompactionConfigMutator getCompactionConfigMutator() {
-    return compactionMutator;
+  public HDFSStoreMutator setMinorCompactionThreads(int count) {
+    configHolder.setMinorCompactionThreads(count);
+    return this;
   }
-
   @Override
-  public HDFSEventQueueAttributesMutator getHDFSEventQueueAttributesMutator() {
-    return qMutator;
+  public int getMinorCompactionThreads() {
+    return configHolder.getMinorCompactionThreads();
   }
-
-  public static class HDFSEventQueueAttributesMutatorImpl implements HDFSEventQueueAttributesMutator {
-    private HDFSEventQueueAttributesFactory factory = new HDFSEventQueueAttributesFactory();
-    int batchSize = -1;
-    int batchInterval = -1;
-    
-    public HDFSEventQueueAttributesMutatorImpl(HDFSEventQueueAttributes qAttrs) {
-      if (qAttrs == null) {
-        return;
-      }
-      
-      setBatchSizeMB(qAttrs.getBatchSizeMB());
-      setBatchTimeInterval(qAttrs.getBatchTimeInterval());
-    }
-    
-    @Override
-    public HDFSEventQueueAttributesMutator setBatchSizeMB(int size) {
-      factory.setBatchSizeMB(size);
-      batchSize = size;
-      // call factory.set to execute attribute value validation
-      return this;
-    }
-    @Override
-    public int getBatchSizeMB() {
-      return batchSize;
-    }
-
-    @Override
-    public HDFSEventQueueAttributesMutator setBatchTimeInterval(int interval) {
-      batchInterval = interval;
-      // call factory.set to execute attribute value validation
-      factory.setBatchTimeInterval(interval);
-      return this;
-    }
-    @Override
-    public int getBatchTimeInterval() {
-      return batchInterval;
-    }
-    
-    @Override
-    public String toString() {
-      StringBuilder builder = new StringBuilder();
-      builder.append("HDFSEventQueueAttributesMutatorImpl [");
-      if (batchSize > -1) {
-        builder.append("batchSize=");
-        builder.append(batchSize);
-        builder.append(", ");
-      }
-      if (batchInterval > -1) {
-        builder.append("batchInterval=");
-        builder.append(batchInterval);
-      }
-      builder.append("]");
-      return builder.toString();
-    }
+  
+  @Override
+  public HDFSStoreMutator setMajorCompaction(boolean auto) {
+    autoMajorCompact = Boolean.valueOf(auto);
+    configHolder.setMajorCompaction(auto);
+    return this;
+  }
+  @Override
+  public Boolean getMajorCompaction() {
+    return autoMajorCompact;
   }
 
-  /**
-   * @author ashvina
-   */
-  public static class HDFSCompactionConfigMutatorImpl implements HDFSCompactionConfigMutator {
-    private AbstractHDFSCompactionConfigHolder configHolder;
-    private Boolean autoMajorCompact;
-
-    public HDFSCompactionConfigMutatorImpl(AbstractHDFSCompactionConfigHolder configHolder) {
-      this.configHolder = configHolder;
-    }
-
-    @Override
-    public HDFSCompactionConfigMutator setMaxInputFileSizeMB(int size) {
-      configHolder.setMaxInputFileSizeMB(size);
-      return this;
-    }
-    @Override
-    public int getMaxInputFileSizeMB() {
-      return configHolder.getMaxInputFileSizeMB();
-    }
-    
-    @Override
-    public HDFSCompactionConfigMutator setMinInputFileCount(int count) {
-      configHolder.setMinInputFileCount(count);
-      return this;
-    }
-    @Override
-    public int getMinInputFileCount() {
-      return configHolder.getMinInputFileCount();
-    }
-
-    @Override
-    public HDFSCompactionConfigMutator setMaxInputFileCount(int count) {
-      configHolder.setMaxInputFileCount(count);
-      return this;
-    }
-    @Override
-    public int getMaxInputFileCount() {
-      return configHolder.getMaxInputFileCount();
-    }
-
-    @Override
-    public HDFSCompactionConfigMutator setMaxThreads(int count) {
-      configHolder.setMaxThreads(count);
-      return this;
-    }
-    @Override
-    public int getMaxThreads() {
-      return configHolder.getMaxThreads();
-    }
-    
-    @Override
-    public HDFSCompactionConfigMutator setAutoMajorCompaction(boolean auto) {
-      autoMajorCompact = Boolean.valueOf(auto);
-      configHolder.setAutoMajorCompaction(auto);
-      return this;
-    }
-    @Override
-    public Boolean getAutoMajorCompaction() {
-      return autoMajorCompact;
-    }
+  @Override
+  public HDFSStoreMutator setMajorCompactionInterval(int count) {
+    configHolder.setMajorCompactionInterval(count);
+    return this;
+  }
+  @Override
+  public int getMajorCompactionInterval() {
+    return configHolder.getMajorCompactionInterval();
+  }
 
-    @Override
-    public HDFSCompactionConfigMutator setMajorCompactionIntervalMins(int count) {
-      configHolder.setMajorCompactionIntervalMins(count);
-      return this;
-    }
-    @Override
-    public int getMajorCompactionIntervalMins() {
-      return configHolder.getMajorCompactionIntervalMins();
-    }
+  @Override
+  public HDFSStoreMutator setMajorCompactionThreads(int count) {
+    configHolder.setMajorCompactionThreads(count);
+    return this;
+  }
+  @Override
+  public int getMajorCompactionThreads() {
+    return configHolder.getMajorCompactionThreads();
+  }
 
-    @Override
-    public HDFSCompactionConfigMutator setMajorCompactionMaxThreads(int count) {
-      configHolder.setMajorCompactionMaxThreads(count);
-      return this;
-    }
-    @Override
-    public int getMajorCompactionMaxThreads() {
-      return configHolder.getMajorCompactionMaxThreads();
-    }
+  @Override
+  public HDFSStoreMutator setMaxInputFileSizeMB(int size) {
+    configHolder.setMaxInputFileSizeMB(size);
+    return this;
+  }
+  @Override
+  public int getMaxInputFileSizeMB() {
+    return configHolder.getMaxInputFileSizeMB();
+  }
+  
+  @Override
+  public HDFSStoreMutator setMinInputFileCount(int count) {
+    configHolder.setMinInputFileCount(count);
+    return this;
+  }
+  @Override
+  public int getMinInputFileCount() {
+    return configHolder.getMinInputFileCount();
+  }
+  
+  @Override
+  public HDFSStoreMutator setMaxInputFileCount(int count) {
+    configHolder.setMaxInputFileCount(count);
+    return this;
+  }
+  @Override
+  public int getMaxInputFileCount() {
+    return configHolder.getMaxInputFileCount();
+  }
+  
+  @Override
+  public HDFSStoreMutator setPurgeInterval(int interval) {
+    configHolder.setPurgeInterval(interval);
+    return this;
+  }
+  @Override
+  public int getPurgeInterval() {
+    return configHolder.getPurgeInterval();
+  }
 
-    @Override
-    public HDFSCompactionConfigMutator setOldFilesCleanupIntervalMins(
-        int interval) {
-      configHolder.setOldFilesCleanupIntervalMins(interval);
-      return this;
-    }
-    @Override
-    public int getOldFilesCleanupIntervalMins() {
-      return configHolder.getOldFilesCleanupIntervalMins();
-    }
+  @Override
+  public int getBatchSize() {
+    return configHolder.batchSize;
+  }
+  @Override
+  public HDFSStoreMutator setBatchSize(int size) {
+    configHolder.setBatchSize(size);
+    return this;
   }
 
+  
+  @Override
+  public int getBatchInterval() {
+    return configHolder.batchIntervalMillis;
+  }
+  @Override
+  public HDFSStoreMutator setBatchInterval(int interval) {
+    configHolder.setBatchInterval(interval);
+    return this;
+  }
+    
   public static void assertIsPositive(String name, int count) {
     if (count < 1) {
       throw new IllegalArgumentException(
@@ -249,15 +180,11 @@ public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
       builder.append(autoCompact);
       builder.append(", ");
     }
-    if (compactionMutator.getAutoMajorCompaction() != null) {
+    if (getMajorCompaction() != null) {
       builder.append("autoMajorCompaction=");
-      builder.append(compactionMutator.getAutoMajorCompaction());
+      builder.append(getMajorCompaction());
       builder.append(", ");
     }
-    if (qMutator != null) {
-      builder.append("qMutator=");
-      builder.append(qMutator);
-    }
     builder.append("]");
     return builder.toString();
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
deleted file mode 100644
index 57d58b7..0000000
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
-import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-
-/**
- * Class for authorization and validation of HDFS size Tiered compaction config
- * 
- * @author ashvina
- */
-public class SizeTieredHdfsCompactionConfigHolder extends AbstractHDFSCompactionConfigHolder {
-  @Override
-  public String getCompactionStrategy() {
-    return SIZE_ORIENTED;
-  }
-
-  @Override
-  public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
-    this.maxInputFileSizeMB = size;
-    return this;
-  }
-
-  @Override
-  public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
-    this.minInputFileCount = count;
-    return this;
-  }
-
-  @Override
-  public HDFSCompactionConfigFactory setMaxInputFileCount(int count) {
-    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
-    this.maxInputFileCount = count;
-    return this;
-  }
-  
-  @Override
-  public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
-    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
-    this.majorCompactionIntervalMins = count;
-    return this;
-  }
-  
-  @Override
-  public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
-    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
-    this.majorCompactionConcurrency = count;
-    return this;
-  }
-
-  @Override
-  protected void validate() {
-    if (minInputFileCount > maxInputFileCount) {
-      throw new IllegalArgumentException(
-          LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
-          .toLocalizedString(new Object[] {
-              "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
-              minInputFileCount,
-              "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
-              maxInputFileCount }));
-    }
-    super.validate();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
index d96cd11..ccfd86f 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
@@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
 import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
@@ -57,17 +56,14 @@ public class HDFSCompactionManager {
   private HDFSCompactionManager(HDFSStore config) {
     this.storeConfig = config;
     // configure hdfs compaction manager
-    HDFSCompactionConfig compactionConf = config.getHDFSCompactionConfig();
-    
     int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
         HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
 
-    minorCompactor = new CompactionExecutor(compactionConf.getMaxThreads(),
-        capacity, "MinorCompactor_" + config.getName());
+    minorCompactor = new CompactionExecutor(config.getMinorCompactionThreads(), capacity, "MinorCompactor_"
+        + config.getName());
 
-    majorCompactor = new CompactionExecutor(
-        compactionConf.getMajorCompactionMaxThreads(), capacity, "MajorCompactor_"
-            + config.getName());
+    majorCompactor = new CompactionExecutor(config.getMajorCompactionThreads(), capacity, "MajorCompactor_"
+        + config.getName());
 
     minorCompactor.allowCoreThreadTimeOut(true);
     majorCompactor.allowCoreThreadTimeOut(true);
@@ -172,7 +168,7 @@ public class HDFSCompactionManager {
       if (!isForced) {
         // this is a auto generated compaction request. If auto compaction is
         // disabled, ignore this call.
-        if (isMajor && !store.getHDFSCompactionConfig().getAutoMajorCompaction()) {
+        if (isMajor && !store.getMajorCompaction()) {
           if (logger.isDebugEnabled()) {
             logger.debug("{}Major compaction is disabled. Ignoring request",logPrefix);
           }
@@ -252,7 +248,7 @@ public class HDFSCompactionManager {
       boolean isEnabled = true;
       isEnabled = storeConfig.getMinorCompaction();
       if (req.isMajor) {
-        isEnabled = storeConfig.getHDFSCompactionConfig().getAutoMajorCompaction();
+        isEnabled = storeConfig.getMajorCompaction();
       }
       if (isEnabled || req.isForced) {
         return;
@@ -260,10 +256,10 @@ public class HDFSCompactionManager {
       throw new CompactionIsDisabled(name + " is disabled");
     }
 
-    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSCompactionConfig config) {
-      int threadCount = config.getMaxThreads();
+    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSStore config) {
+      int threadCount = config.getMinorCompactionThreads();
       if (task.isMajor) {
-        threadCount = config.getMajorCompactionMaxThreads();
+        threadCount = config.getMajorCompactionThreads();
       }
       
       if (getCorePoolSize() < threadCount) {
@@ -284,11 +280,8 @@ public class HDFSCompactionManager {
     
     @Override
     public <T> Future<T> submit(Callable<T> task) {
-      HDFSCompactionConfig config;
-      config = HDFSCompactionManager.this.storeConfig.getHDFSCompactionConfig();
-      
       throwIfStopped((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
-      throwIfPoolSizeChanged((CompactionRequest) task, config);
+      throwIfPoolSizeChanged((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
       
       if (logger.isDebugEnabled()) {
         fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
index c9be401..976482a 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
@@ -159,8 +159,8 @@ public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<Unsorte
         // append completed. If the file is to be rolled over, 
         // close writer and rename the file to a legitimate name.
         // Else, sync the already written data with HDFS nodes. 
-        int maxFileSize = this.store.getMaxFileSize() * 1024 * 1024;  
-        int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+        int maxFileSize = this.store.getMaxWriteOnlyFileSize() * 1024 * 1024;  
+        int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); 
         if (writer.getCurrentSize() >= maxFileSize || 
             timeSinceLastFlush >= fileRolloverInterval) {
           closeCurrentWriter();
@@ -196,8 +196,8 @@ public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<Unsorte
       if (writerSize < (minsizeforrollover * 1024L))
         return;
       
-      int maxFileSize = this.store.getMaxFileSize() * 1024 * 1024;  
-      int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+      int maxFileSize = this.store.getMaxWriteOnlyFileSize() * 1024 * 1024;  
+      int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); 
       if (writerSize >= maxFileSize || 
           timeSinceLastFlush >= fileRolloverInterval || forceClose) {
         closeCurrentWriter();
@@ -427,7 +427,7 @@ public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<Unsorte
       }
   
   public long getfileRolloverInterval(){
-    int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+    int fileRolloverInterval = this.store.getWriteOnlyFileRolloverInterval(); 
     return fileRolloverInterval;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
index 7bb025b..2a8eed6 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HdfsSortedOplogOrganizer.java
@@ -43,7 +43,6 @@ import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
 import com.gemstone.gemfire.cache.hdfs.HDFSStore;
-import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
 import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
 import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
 import com.gemstone.gemfire.cache.hdfs.internal.cardinality.CardinalityMergeException;
@@ -157,8 +156,7 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
     FileSystem fs = store.getFileSystem();
     Path cleanUpIntervalPath = new Path(store.getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME); 
     if (!fs.exists(cleanUpIntervalPath)) {
-      HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
-      long intervalDurationMillis = compactionConf.getOldFilesCleanupIntervalMins() * 60 * 1000;
+      long intervalDurationMillis = store.getPurgeInterval() * 60 * 1000;
       HoplogUtil.exposeCleanupIntervalMillis(fs, cleanUpIntervalPath, intervalDurationMillis);
     }
 
@@ -316,8 +314,7 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
     
     // determine if a major compaction is needed and create a compaction request
     // with compaction manager
-    HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
-    if (compactionConf.getAutoMajorCompaction()) {
+    if (store.getMajorCompaction()) {
       if (isMajorCompactionNeeded()) {
         req = new CompactionRequest(regionFolder, bucketId, getCompactor(), true);
         HDFSCompactionManager.getInstance(store).submitRequest(req);
@@ -337,9 +334,8 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
    */
   private boolean isMajorCompactionNeeded() throws IOException {
     // major compaction interval in milliseconds
-    HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
     
-    long majorCInterval = ((long)compactionConf.getMajorCompactionIntervalMins()) * 60 * 1000;
+    long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
 
     Hoplog oplog = hoplogReadersController.getOldestHoplog();
     if (oplog == null) {
@@ -677,7 +673,7 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
    * @return number of files deleted
    */
    synchronized int initiateCleanup() throws IOException {
-    int conf = store.getHDFSCompactionConfig().getOldFilesCleanupIntervalMins();
+    int conf = store.getPurgeInterval();
     // minutes to milliseconds
     long intervalDurationMillis = conf * 60 * 1000;
     // Any expired hoplog with timestamp less than targetTS is a delete
@@ -752,8 +748,7 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
         } else if (timestamp < ts && name.endsWith(FLUSH_HOPLOG_EXTENSION)) {
           isTarget = true;
         } else if (timestamp < ts && name.endsWith(MAJOR_HOPLOG_EXTENSION)) {
-          HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
-          long majorCInterval = ((long)compactionConf.getMajorCompactionIntervalMins()) * 60 * 1000;
+          long majorCInterval = ((long)store.getMajorCompactionInterval()) * 60 * 1000;
           if (timestamp < (System.currentTimeMillis() - majorCInterval)) {
             isTarget = true;
           }
@@ -1360,10 +1355,9 @@ public class HdfsSortedOplogOrganizer extends AbstractHoplogOrganizer<SortedHopl
       // minimum number of files that must be present for compaction to be worth
       final int MIN_FILE_COUNT_COMPACTION;
       
-      HDFSCompactionConfig compactionConf = store.getHDFSCompactionConfig();
-      MAX_COMPACTION_FILE_SIZE = ((long)compactionConf.getMaxInputFileSizeMB()) * 1024 *1024;
-      MAX_FILE_COUNT_COMPACTION = compactionConf.getMaxInputFileCount();
-      MIN_FILE_COUNT_COMPACTION = compactionConf.getMinInputFileCount();
+      MAX_COMPACTION_FILE_SIZE = ((long)store.getMaxInputFileSizeMB()) * 1024 *1024;
+      MAX_FILE_COUNT_COMPACTION = store.getMaxInputFileCount();
+      MIN_FILE_COUNT_COMPACTION = store.getMinInputFileCount();
 
       try {
         // skip till first file smaller than the max compaction file size is

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 3afb161..5487000 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -3199,7 +3199,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
         }
         // if there is no disk store, use the one configured for hdfs queue
         if (attrs.getPartitionAttributes().getLocalMaxMemory() != 0 && diskStoreName == null) {
-          diskStoreName = hdfsStore.getHDFSEventQueueAttributes().getDiskStoreName();
+          diskStoreName = hdfsStore.getDiskStoreName();
         }
       }
       // set LRU heap eviction with overflow to disk for HDFS stores with

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
index a5fd3f1..b9fcfe7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheCreation.java
@@ -525,7 +525,7 @@ public class CacheCreation implements InternalCache, Extensible<Cache> {
       if (creation.getHDFSStoreName() != null)
       {
         HDFSStoreImpl store = cache.findHDFSStore(creation.getHDFSStoreName());
-        if(store == null || store.getHDFSEventQueueAttributes() == null) {
+        if(store == null) {
           HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS((Cache)cache, creation.getHDFSWriteOnly(), id);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3772869d/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
index e015624..3ec5b92 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/xmlcache/CacheXmlParser.java
@@ -78,9 +78,6 @@ import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.client.ClientCache;
 import com.gemstone.gemfire.cache.client.PoolFactory;
 import com.gemstone.gemfire.cache.execute.Function;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
-import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
 import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;
 import com.gemstone.gemfire.cache.partition.PartitionListener;
 import com.gemstone.gemfire.cache.query.IndexType;
@@ -1060,75 +1057,60 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       }
     }
     
-    boolean eventQueueAttributesExist = false;
-    HDFSEventQueueAttributesFactory eventFactory = new HDFSEventQueueAttributesFactory();
     Integer maxMemory = getIntValue(atts, HDFS_MAX_MEMORY);
     if (maxMemory != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setMaximumQueueMemory(maxMemory);
+      attrs.setMaxMemory(maxMemory);
     }
     
     Integer batchSize = getIntValue(atts, HDFS_BATCH_SIZE);
     if (batchSize != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setBatchSizeMB(batchSize);
+      attrs.setBatchSize(batchSize);
     }
     
     Integer batchInterval = getIntValue(atts, HDFS_BATCH_INTERVAL);
     if (batchInterval != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setBatchTimeInterval(batchInterval);
+      attrs.setBatchInterval(batchInterval);
     }
     
     Integer dispatcherThreads = getIntValue(atts, HDFS_DISPATCHER_THREADS);
     if (dispatcherThreads != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setDispatcherThreads(dispatcherThreads);
+      attrs.setDispatcherThreads(dispatcherThreads);
     }
     
     Boolean bufferPersistent = getBoolean(atts, HDFS_BUFFER_PERSISTENT);
     if (bufferPersistent != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setPersistent(bufferPersistent);
+      attrs.setBufferPersistent(bufferPersistent);
     }
     
     Boolean synchronousDiskWrite = getBoolean(atts, HDFS_SYNCHRONOUS_DISK_WRITE);
     if (synchronousDiskWrite != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setDiskSynchronous(synchronousDiskWrite);
+      attrs.setSynchronousDiskWrite(synchronousDiskWrite);
     }
     
     String diskstoreName = atts.getValue(HDFS_DISK_STORE);
     if (diskstoreName != null) {
-      eventQueueAttributesExist = true;
-      eventFactory.setDiskStoreName(diskstoreName);
+      attrs.setDiskStoreName(diskstoreName);
     }
     
-    if (eventQueueAttributesExist) {
-      HDFSEventQueueAttributes eventQAttribs = eventFactory.create();
-      attrs.setHDFSEventQueueAttributes(eventQAttribs);
-    }
-   
-    HDFSCompactionConfigFactory config = attrs.createCompactionConfigFactory(null);
     Integer purgeInterval = getInteger(atts, HDFS_PURGE_INTERVAL);
     if (purgeInterval != null) {
-      config.setOldFilesCleanupIntervalMins(purgeInterval);
+      attrs.setPurgeInterval(purgeInterval);
     }
     Boolean majorCompaction = getBoolean(atts, HDFS_MAJOR_COMPACTION);
     if (majorCompaction != null) {
-      config.setAutoMajorCompaction(Boolean.valueOf(majorCompaction));
+      attrs.setMajorCompaction(Boolean.valueOf(majorCompaction));
     }
     
     // configure major compaction interval
     Integer majorCompactionInterval = getIntValue(atts, HDFS_MAJOR_COMPACTION_INTERVAL);
     if (majorCompactionInterval != null) {
-      config.setMajorCompactionIntervalMins(majorCompactionInterval);
+      attrs.setMajorCompactionInterval(majorCompactionInterval);
     }
     
     // configure compaction concurrency
     Integer value = getIntValue(atts, HDFS_MAJOR_COMPACTION_THREADS);
     if (value != null)
-      config.setMajorCompactionMaxThreads(value);
+      attrs.setMajorCompactionThreads(value);
     
     Boolean minorCompaction = getBoolean(atts, HDFS_MINOR_COMPACTION);
     if (minorCompaction != null) {
@@ -1138,18 +1120,16 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     // configure compaction concurrency
     value = getIntValue(atts, HDFS_MINOR_COMPACTION_THREADS);
     if (value != null)
-      config.setMaxThreads(value);
+      attrs.setMinorCompactionThreads(value);
     
-    attrs.setHDFSCompactionConfig(config.getConfigView());
-
     String maxFileSize = atts.getValue(HDFS_MAX_WRITE_ONLY_FILE_SIZE);
     if (maxFileSize != null) {
-      attrs.setMaxFileSize(parseInt(maxFileSize));
+      attrs.setMaxWriteOnlyFileSize(parseInt(maxFileSize));
     }
     
     String fileRolloverInterval = atts.getValue(HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL);
     if (fileRolloverInterval != null) {
-      attrs.setFileRolloverInterval(parseInt(fileRolloverInterval));
+      attrs.setWriteOnlyFileRolloverInterval(parseInt(fileRolloverInterval));
     }
     stack.push(name);
     stack.push(attrs);
@@ -1194,60 +1174,6 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
     return null;
   }
   
-  private void startHDFSEventQueue(Attributes atts) {
-    HDFSEventQueueAttributesFactory eventFactory = new HDFSEventQueueAttributesFactory();
-    
-    //batch-size
-    String batchSize = atts.getValue(HDFS_QUEUE_BATCH_SIZE);
-    if(batchSize != null){
-      eventFactory.setBatchSizeMB(Integer.parseInt(batchSize));
-    }
-    
-    //batch-size
-    String batchInterval = atts.getValue(BATCH_TIME_INTERVAL);
-    if(batchInterval != null){
-      eventFactory.setBatchTimeInterval(Integer.parseInt(batchInterval));
-    }
-    
-    //maximum-queue-memory
-    String maxQueueMemory = atts.getValue(MAXIMUM_QUEUE_MEMORY);
-    if(maxQueueMemory != null){
-      eventFactory.setMaximumQueueMemory(Integer.parseInt(maxQueueMemory));
-    }
-    
-    //persistent
-    String persistent = atts.getValue(PERSISTENT);
-    if(persistent != null){
-      eventFactory.setPersistent(Boolean.parseBoolean(persistent));
-    }
-    
-    String diskStoreName = atts.getValue(DISK_STORE_NAME);
-    if(diskStoreName != null){
-      eventFactory.setDiskStoreName(diskStoreName);
-    }
-    
-    String diskSynchronous = atts.getValue(DISK_SYNCHRONOUS);
-    if(diskSynchronous != null){
-      eventFactory.setDiskSynchronous(Boolean.parseBoolean(diskSynchronous));
-    }
-    
-    HDFSEventQueueAttributes eventAttribs = eventFactory.create();
-    stack.push(eventAttribs);
-  }
-  
-  private void endHDFSEventQueue() {
-    HDFSEventQueueAttributes eventAttribs = (HDFSEventQueueAttributes) stack.pop();
-    
-    Object storeCreation = stack.peek();
-    if (!(storeCreation instanceof HDFSStoreCreation))
-      //TODO:HDFS throw a proper error string
-        throw new CacheXmlException("Store attributes should be a child of store");
-    HDFSStoreCreation store = (HDFSStoreCreation)storeCreation;
-    // put back the popped element
-    
-    store.setHDFSEventQueueAttributes(eventAttribs);
-  }
-  
   /**
    * Create a <code>transaction-writer</code> using the declarable interface
    * and set the transaction manager with the newly instantiated writer.
@@ -3067,9 +2993,6 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
 	else if (qName.equals(HDFS_STORE)) {
         startHDFSStore(atts);
     }
-    else if (qName.equals(HDFS_EVENT_QUEUE)) {
-      startHDFSEventQueue(atts);
-    }
     else if (qName.equals(COMPRESSOR)) {
     }
     else {
@@ -3481,9 +3404,6 @@ public class CacheXmlParser extends CacheXml implements ContentHandler {
       else if (qName.equals(HDFS_STORE)) {
           endHDFSStore();
       }
-      else if (qName.equals(HDFS_EVENT_QUEUE)) {
-        endHDFSEventQueue();
-      }
       else if (qName.equals(COMPRESSOR)) {
         endCompressor();
       }


Mime
View raw message