geode-commits mailing list archives

From: r..@apache.org
Subject: [29/51] [partial] incubator-geode git commit: SGA #2
Date: Fri, 03 Jul 2015 19:21:30 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
new file mode 100644
index 0000000..e7e75dc
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreFactoryImpl.java
@@ -0,0 +1,67 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+
+/**
+ * Implementation of HDFSStoreFactory 
+ * 
+ * @author Hemant Bhanawat
+ */
+public class HDFSStoreFactoryImpl extends HDFSStoreCreation {
+  public static final String DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS = "HDFS_QUEUE";
+  
+  private Cache cache;
+  
+  public HDFSStoreFactoryImpl(Cache cache) {
+    this(cache, null);
+  }
+  
+  public HDFSStoreFactoryImpl(Cache cache, HDFSStoreCreation config) {
+    super(config);
+    this.cache = cache;
+  }
+
+  @Override
+  public HDFSStore create(String name) {
+    if (name == null) {
+      throw new GemFireConfigException("HDFS store name not provided");
+    }
+    
+    HDFSStore result = null;
+    synchronized (this) {
+      if (this.cache instanceof GemFireCacheImpl) {
+        GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache;
+        if (gfc.findHDFSStore(name) != null) {
+          throw new StoreExistsException(name);
+        }
+        
+        HDFSStoreImpl hsi = new HDFSStoreImpl(name, this.configHolder);
+        gfc.addHDFSStore(hsi);
+        result = hsi;
+      }
+    }
+    return result;
+  }
+
+  public static final String getEventQueueName(String regionPath) {
+    return HDFSStoreFactoryImpl.DEFAULT_ASYNC_QUEUE_ID_FOR_HDFS + "_"
+        + regionPath.replace('/', '_');
+  }
+
+  public HDFSStore getConfigView() {
+    return (HDFSStore) configHolder;
+  }
+}
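
For orientation, a minimal usage sketch of the factory above. This is not code from this commit; the CacheFactory bootstrap, store name, and region path are illustrative assumptions.

    import com.gemstone.gemfire.cache.Cache;
    import com.gemstone.gemfire.cache.CacheFactory;
    import com.gemstone.gemfire.cache.hdfs.HDFSStore;
    import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;

    public class HdfsStoreFactorySketch {
      public static void main(String[] args) {
        Cache cache = new CacheFactory().create(); // illustrative bootstrap
        // The factory registers the new store with the cache; creating a
        // second store with the same name throws StoreExistsException.
        HDFSStoreFactoryImpl factory = new HDFSStoreFactoryImpl(cache);
        HDFSStore store = factory.create("exampleStore");
        // Queue ids derive from the region path with '/' replaced by '_':
        String queueId = HDFSStoreFactoryImpl.getEventQueueName("/orders/2015");
        System.out.println(queueId); // HDFS_QUEUE__orders_2015
        cache.close();
      }
    }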

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
new file mode 100644
index 0000000..8e7e358
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreImpl.java
@@ -0,0 +1,565 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.ConnectTimeoutException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSStoreDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.control.HeapMemoryMonitor;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.SingletonCallable;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+/**
+ * Represents an HDFS-based persistent store for region data.
+ * 
+ * @author Ashvin Agrawal
+ */
+public class HDFSStoreImpl implements HDFSStore {
+  
+  private volatile HDFSStoreConfigHolder configHolder; 
+  
+  private final SingletonValue<FileSystem> fs;
+
+  /**
+   * Used to make sure that only one thread creates the writer at a time. This prevents the dispatcher
+   * threads from cascading on the Connection lock in the DFS client; see bug 51195.
+   */
+  private final SingletonCallable<HoplogWriter> singletonWriter = new SingletonCallable<HoplogWriter>();
+
+  private final HFileStoreStatistics stats;
+  private final BlockCache blockCache;
+
+  private static HashSet<String> secureNameNodes = new HashSet<String>();
+  
+  private final boolean PERFORM_SECURE_HDFS_CHECK = Boolean.getBoolean(HoplogConfig.PERFORM_SECURE_HDFS_CHECK_PROP);
+  private static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+  
+  static {
+    HdfsConfiguration.init();
+  }
+  
+  public HDFSStoreImpl(String name, final HDFSStore config) {
+    this.configHolder = new HDFSStoreConfigHolder(config);
+    configHolder.setName(name);
+
+    this.logPrefix = "<" + "HdfsStore:" + name + "> ";
+
+    stats = new HFileStoreStatistics(InternalDistributedSystem.getAnyInstance(), "HDFSStoreStatistics", name);
+
+    final Configuration hconf = new Configuration();
+        
+    // Set the block cache size.
+    // Disable the static block cache. We keep our own cache on the HDFS Store
+    // hconf.setFloat("hfile.block.cache.size", 0f);
+    if (this.getBlockCacheSize() != 0) {
+      long cacheSize = (long) (HeapMemoryMonitor.getTenuredPoolMaxMemory() * this.getBlockCacheSize() / 100);
+
+      // TODO use an off heap block cache if we're using off heap memory?
+      // See CacheConfig.instantiateBlockCache.
+      // According to Anthony, the off heap block cache is still
+      // experimental. Our own off heap cache might be a better bet.
+//      this.blockCache = new LruBlockCache(cacheSize,
+//          StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf, HFileSortedOplogFactory.convertStatistics(stats));
+      this.blockCache = new LruBlockCache(cacheSize, StoreFile.DEFAULT_BLOCKSIZE_SMALL, hconf);
+    } else {
+      this.blockCache = null;
+    }
+    
+    final String clientFile = config.getHDFSClientConfigFile();
+    fs = new SingletonValue<FileSystem>(new SingletonBuilder<FileSystem>() {
+      @Override
+      public FileSystem create() throws IOException {
+        return createFileSystem(hconf, clientFile, false);
+      }
+
+      @Override
+      public void postCreate() {
+      }
+      
+      @Override
+      public void createInProgress() {
+      }
+    });
+    
+    FileSystem fileSystem = null;
+    try {
+      fileSystem = fs.get();
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(),ex);
+    }    
+    //HDFSCompactionConfig has already been initialized
+    long cleanUpIntervalMillis = getHDFSCompactionConfig().getOldFilesCleanupIntervalMins() * 60 * 1000;
+    Path cleanUpIntervalPath = new Path(getHomeDir(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
+    HoplogUtil.exposeCleanupIntervalMillis(fileSystem, cleanUpIntervalPath, cleanUpIntervalMillis);
+  }
+  
+  /**
+   * Creates a new file system every time.  
+   */
+  public FileSystem createFileSystem() {
+    Configuration hconf = new Configuration();
+    try {
+      return createFileSystem(hconf, this.getHDFSClientConfigFile(), true);
+    } catch (Throwable ex) {
+      throw new HDFSIOException(ex.getMessage(),ex);
+    }
+  }
+  
+  private FileSystem createFileSystem(Configuration hconf, String configFile, boolean forceNew) throws IOException {
+    FileSystem filesystem = null; 
+    
+      // load hdfs client config file if specified. The path is on local file
+      // system
+      if (configFile != null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Adding resource config file to hdfs configuration:" + configFile, logPrefix);
+        }
+        hconf.addResource(new Path(configFile));
+        
+        if (! new File(configFile).exists()) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_CLIENT_CONFIG_FILE_ABSENT, configFile));
+        }
+      }
+      
+      // This setting disables the shutdown hook for the file system object. The
+      // shutdown hook may cause the FS object to close before the cache or store,
+      // causing unpredictable behavior. This setting is provided for GFXD-like
+      // server use cases where FS close is managed by the server. This setting is
+      // not supported by old versions of hadoop; see HADOOP-4829.
+      hconf.setBoolean("fs.automatic.close", false);
+      
+      // Hadoop has a configuration parameter io.serializations that is a list of serialization 
+      // classes which can be used for obtaining serializers and deserializers. This parameter 
+      // by default contains avro classes. When a sequence file is created, it calls 
+      // SerializationFactory.getSerializer(keyclass). This internally creates, via 
+      // reflection, objects of all the classes that were part of io.serializations. But since 
+      // no avro class is available, it throws an exception. 
+      // Before creating a sequence file, override the io.serializations parameter and pass only 
+      // the classes that are important to us. 
+      hconf.setStrings("io.serializations",
+          new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"});
+      // create writer
+
+      SchemaMetrics.configureGlobally(hconf);
+      
+      String nameNodeURL = null;
+      if ((nameNodeURL = getNameNodeURL()) == null) {
+          nameNodeURL = hconf.get("fs.default.name");
+      }
+      
+      URI namenodeURI = URI.create(nameNodeURL);
+    
+    //if (! GemFireCacheImpl.getExisting().isHadoopGfxdLonerMode()) {
+      String authType = hconf.get("hadoop.security.authentication");
+      
+      //The following code handles Gemfire XD with secure HDFS
+      //A static set is used to cache all known secure HDFS NameNode urls.
+      UserGroupInformation.setConfiguration(hconf);
+
+      //Compare the authentication method ignoring case to keep GFXD compliant with future versions.
+      //At least version 2.0.2 starts complaining if the string "kerberos" is not all lowercase.
+      //However the current version of hadoop seems to accept the authType in any case.
+      if (authType.equalsIgnoreCase("kerberos")) {
+        
+        String principal = hconf.get(HoplogConfig.KERBEROS_PRINCIPAL);
+        String keyTab = hconf.get(HoplogConfig.KERBEROS_KEYTAB_FILE);
+       
+        if (!PERFORM_SECURE_HDFS_CHECK) {
+          if (logger.isDebugEnabled())
+            logger.debug("{}Ignore secure hdfs check", logPrefix);
+        } else {
+          if (!secureNameNodes.contains(nameNodeURL)) {
+            if (logger.isDebugEnabled())
+              logger.debug("{}Executing secure hdfs check", logPrefix);
+             try{
+              filesystem = FileSystem.newInstance(namenodeURI, hconf);
+              //If the listing succeeds without an IOException, the NameNode is not secured.
+              filesystem.listFiles(new Path("/"),false);
+              throw new HDFSIOException("Gemfire XD HDFS client and HDFS cluster security levels do not match. The configured HDFS Namenode is not secured.");
+             } catch (IOException ex) {
+               secureNameNodes.add(nameNodeURL);
+             } finally {
+             //Close filesystem to avoid resource leak
+               if(filesystem != null) {
+                 closeFileSystemIgnoreError(filesystem);
+               }
+             }
+          }
+        }
+
+        // check to ensure the namenode principal is defined
+        String nameNodePrincipal = hconf.get("dfs.namenode.kerberos.principal");
+        if (nameNodePrincipal == null) {
+          throw new IOException(LocalizedStrings.GF_KERBEROS_NAMENODE_PRINCIPAL_UNDEF.toLocalizedString());
+        }
+        
+        // ok, the user specified a gfxd principal so we will try to log in
+        if (principal != null) {
+          //If NameNode principal is the same as Gemfire XD principal, there is a 
+          //potential security hole
+          String regex = "[/@]";
+          if (nameNodePrincipal != null) {
+            String HDFSUser = nameNodePrincipal.split(regex)[0];
+            String GFXDUser = principal.split(regex)[0];
+            if (HDFSUser.equals(GFXDUser)) {
+              logger.warn(LocalizedMessage.create(LocalizedStrings.HDFS_USER_IS_SAME_AS_GF_USER, GFXDUser));
+            }
+          }
+          
+          // a keytab must exist if the user specifies a principal
+          if (keyTab == null) {
+            throw new IOException(LocalizedStrings.GF_KERBEROS_KEYTAB_UNDEF.toLocalizedString());
+          }
+          
+          // the keytab file must also exist on disk
+          File f = new File(keyTab);
+          if (!f.exists()) {
+            throw new FileNotFoundException(LocalizedStrings.GF_KERBEROS_KEYTAB_FILE_ABSENT.toLocalizedString(f.getAbsolutePath()));
+          }
+
+          //Authenticate Gemfire XD principal to Kerberos KDC using Gemfire XD keytab file
+          String principalWithValidHost = SecurityUtil.getServerPrincipal(principal, "");
+          UserGroupInformation.loginUserFromKeytab(principalWithValidHost, keyTab);
+        } else {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.GF_KERBEROS_PRINCIPAL_UNDEF));
+        }
+      }
+    //}
+
+    filesystem = getFileSystemFactory().create(namenodeURI, hconf, forceNew);
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Initialized FileSystem linked to " + filesystem.getUri()
+          + " " + filesystem.hashCode(), logPrefix);
+    }
+    return filesystem;
+  }
+
+  public FileSystem getFileSystem() throws IOException {
+    return fs.get();
+  }
+  
+  public FileSystem getCachedFileSystem() {
+    return fs.getCachedValue();
+  }
+
+  public SingletonCallable<HoplogWriter> getSingletonWriter() {
+    return this.singletonWriter;
+  }
+
+  private final SingletonCallable<Boolean> fsExists = new SingletonCallable<Boolean>();
+
+  public boolean checkFileSystemExists() throws IOException {
+    try {
+      return fsExists.runSerially(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          FileSystem fileSystem = getCachedFileSystem();
+          if (fileSystem == null) {
+            return false;
+          }
+          return fileSystem.exists(new Path("/"));
+        }
+      });
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException)e;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * This method executes a query on the namenode. If the query succeeds, the FS
+   * instance is healthy. If it fails, the old instance is closed and a new
+   * instance is created.
+   */
+  public void checkAndClearFileSystem() {
+    FileSystem fileSystem = getCachedFileSystem();
+    
+    if (fileSystem != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Checking file system at " + fileSystem.getUri(), logPrefix);
+      }
+      try {
+        checkFileSystemExists();
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}FS client is ok: " + fileSystem.getUri() + " "
+              + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (ConnectTimeoutException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Hdfs unreachable, FS client is ok: "
+              + fileSystem.getUri() + " " + fileSystem.hashCode(), logPrefix);
+        }
+        return;
+      } catch (IOException e) {
+        logger.debug("IOError in filesystem checkAndClear ", e);
+        
+        // The file system is closed or the NN is not reachable. It is safest to
+        // create a new FS instance. If the NN continues to remain unavailable,
+        // all subsequent read/write requests will cause HDFSIOException. This is
+        // similar to the way hbase manages failures. This has a drawback
+        // though. A network blip will result in all connections being
+        // recreated. However trying to preserve the connections and waiting for
+        // the FS to auto-recover is not deterministic.
+        if (e instanceof RemoteException) {
+          e = ((RemoteException) e).unwrapRemoteException();
+        }
+
+        logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_UNREACHABLE,
+            fileSystem.getUri()), e);
+      }
+
+      // compare and clear FS container. The fs container needs to be reusable
+      boolean result = fs.clear(fileSystem, true);
+      if (!result) {
+        // the FS instance changed after this call was initiated. Check again
+        logger.debug("{}Failed to clear FS ! I am inconsistent so retrying ..", logPrefix);
+        checkAndClearFileSystem();
+      } else {
+        closeFileSystemIgnoreError(fileSystem);
+      }      
+    }
+  }
+
+  private void closeFileSystemIgnoreError(FileSystem fileSystem) {
+    if (fileSystem == null) {
+      logger.debug("{}Trying to close null file system", logPrefix);
+      return;
+    }
+
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Closing file system at " + fileSystem.getUri() + " "
+            + fileSystem.hashCode(), logPrefix);
+      }
+      fileSystem.close();
+    } catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Failed to close file system at " + fileSystem.getUri()
+            + " " + fileSystem.hashCode(), e);
+      }
+    }
+  }
+
+  public HFileStoreStatistics getStats() {
+    return stats;
+  }
+  
+  public BlockCache getBlockCache() {
+    return blockCache;
+  }
+
+  public void close() {
+    logger.debug("{}Closing file system: " + getName(), logPrefix);
+    stats.close();
+    blockCache.shutdown();
+    //Might want to clear the block cache, but it should be dereferenced.
+    
+    // release the DDL hoplog organizer for this store. Also shut down compaction
+    // threads. These two resources hold references to the GemFireCacheImpl
+    // instance. Any error in releasing these resources is not critical and can
+    // be ignored.
+    try {
+      HDFSCompactionManager manager = HDFSCompactionManager.getInstance(this);
+      if (manager != null) {
+        manager.reset();
+      }
+    } catch (Exception e) {
+      logger.info(e);
+    }
+    
+    // once this store is closed, this store should not be used again
+    FileSystem fileSystem = fs.clear(false);
+    if (fileSystem != null) {
+      closeFileSystemIgnoreError(fileSystem);
+    }    
+  }
+  
+  /**
+   * Test hook to remove all of the contents of the folder
+   * for this HDFS store from HDFS.
+   * @throws IOException 
+   */
+  public void clearFolder() throws IOException {
+    getFileSystem().delete(new Path(getHomeDir()), true);
+  }
+  
+  @Override
+  public void destroy() {
+    Collection<String> regions = HDFSRegionDirector.getInstance().getRegionsInStore(this);
+    if(!regions.isEmpty()) {
+      throw new IllegalStateException("Cannot destroy a HDFS store that still contains regions: " + regions); 
+    }
+    close();
+    HDFSStoreDirector.getInstance().removeHDFSStore(this.getName());
+  }
+
+  @Override
+  public synchronized HDFSStore alter(HDFSStoreMutator mutator) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Altering hdfsStore " + this, logPrefix);
+      logger.debug("{}Mutator " + mutator, logPrefix);
+    }
+    HDFSStoreConfigHolder newHolder = new HDFSStoreConfigHolder(configHolder);
+    newHolder.copyFrom(mutator);
+    newHolder.getHDFSCompactionConfig().validate();
+    HDFSStore oldStore = configHolder;
+    configHolder = newHolder;
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Resuult of Alter " + this, logPrefix);
+    }
+    return (HDFSStore) oldStore;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  @Override
+  public String getName() {
+    return configHolder.getName();
+  }
+
+  @Override
+  public String getNameNodeURL() {
+    return configHolder.getNameNodeURL();
+  }
+
+  @Override
+  public String getHomeDir() {
+    return configHolder.getHomeDir();
+  }
+
+  @Override
+  public String getHDFSClientConfigFile() {
+    return configHolder.getHDFSClientConfigFile();
+  }
+
+  @Override
+  public float getBlockCacheSize() {
+    return configHolder.getBlockCacheSize();
+  }
+
+  @Override
+  public int getMaxFileSize() {
+    return configHolder.getMaxFileSize();
+  }
+
+  @Override
+  public int getFileRolloverInterval() {
+    return configHolder.getFileRolloverInterval();
+  }
+
+  @Override
+  public boolean getMinorCompaction() {
+    return configHolder.getMinorCompaction();
+  }
+
+  @Override
+  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
+    return configHolder.getHDFSEventQueueAttributes();
+  }
+
+  @Override
+  public HDFSCompactionConfig getHDFSCompactionConfig() {
+    return (HDFSCompactionConfig) configHolder.getHDFSCompactionConfig();
+  }
+
+  @Override
+  public HDFSStoreMutator createHdfsStoreMutator() {
+    return new HDFSStoreMutatorImpl();
+  }
+
+  public FileSystemFactory getFileSystemFactory() {
+    return new DistributedFileSystemFactory();
+  }
+
+  /*
+   * Factory to create HDFS file system instances
+   */
+  static public interface FileSystemFactory {
+    public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException;
+  }
+
+  /*
+   * File system factory implementation for creating file system instances
+   * connected to a distributed HDFS cluster
+   */
+  public class DistributedFileSystemFactory implements FileSystemFactory {
+    private final boolean ALLOW_TEST_FILE_SYSTEM = Boolean.getBoolean(HoplogConfig.ALLOW_LOCAL_HDFS_PROP);
+    private final boolean USE_FS_CACHE = Boolean.getBoolean(HoplogConfig.USE_FS_CACHE);
+
+    @Override
+    public FileSystem create(URI nn, Configuration conf, boolean create) throws IOException {
+      FileSystem filesystem;
+
+      if (USE_FS_CACHE && !create) {
+        filesystem = FileSystem.get(nn, conf);
+      } else {
+        filesystem = FileSystem.newInstance(nn, conf);
+      }
+
+      if (filesystem instanceof LocalFileSystem && !ALLOW_TEST_FILE_SYSTEM) {
+        closeFileSystemIgnoreError(filesystem);
+        throw new IllegalStateException(
+            LocalizedStrings.HOPLOG_TRYING_TO_CREATE_STANDALONE_SYSTEM.toLocalizedString(getNameNodeURL()));
+      }
+
+      return filesystem;
+    }
+  }
+}
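
A sketch of how a caller might exercise the check-and-clear recovery pattern above. It assumes an already initialized HDFSStoreImpl and same-package access; it is illustrative, not code from this commit.

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class FsProbeSketch {
      // Returns true when the store's HDFS client can still reach the NameNode.
      static boolean probe(HDFSStoreImpl store) {
        try {
          FileSystem fs = store.getFileSystem(); // lazily built via SingletonValue
          return fs.exists(new Path("/"));
        } catch (IOException e) {
          // Drop the stale client; the next getFileSystem() call recreates it.
          store.checkAndClearFileSystem();
          return false;
        }
      }
    }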

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
new file mode 100644
index 0000000..e4e2093
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreMutatorImpl.java
@@ -0,0 +1,264 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+public class HDFSStoreMutatorImpl implements HDFSStoreMutator {
+  private HDFSStoreConfigHolder configHolder;
+  private Boolean autoCompact;
+  private HDFSCompactionConfigMutator compactionMutator;
+  private HDFSEventQueueAttributesMutator qMutator;
+
+  public HDFSStoreMutatorImpl() {
+    configHolder = new HDFSStoreConfigHolder();
+    configHolder.resetDefaultValues();
+    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
+    qMutator = new HDFSEventQueueAttributesMutatorImpl(null);
+  }
+
+  public HDFSStoreMutatorImpl(HDFSStore store) {
+    configHolder = new HDFSStoreConfigHolder(store);
+    compactionMutator = new HDFSCompactionConfigMutatorImpl(configHolder.getHDFSCompactionConfig());
+    // The following two steps are needed to set the null boolean values in compactionMutator
+    configHolder.setMinorCompaction(configHolder.getMinorCompaction());
+    compactionMutator.setAutoMajorCompaction(configHolder.getHDFSCompactionConfig().getAutoMajorCompaction());
+    qMutator = new HDFSEventQueueAttributesMutatorImpl(configHolder.getHDFSEventQueueAttributes());
+  }
+  
+  public HDFSStoreMutator setMaxFileSize(int maxFileSize) {
+    configHolder.setMaxFileSize(maxFileSize);
+    return this;
+  }
+  @Override
+  public int getMaxFileSize() {
+    return configHolder.getMaxFileSize();
+  }
+
+  @Override
+  public HDFSStoreMutator setFileRolloverInterval(int count) {
+    configHolder.setFileRolloverInterval(count);
+    return this;
+  }
+  @Override
+  public int getFileRolloverInterval() {
+    return configHolder.getFileRolloverInterval();
+  }
+
+  @Override
+  public HDFSCompactionConfigMutator setMinorCompaction(boolean auto) {
+    autoCompact = Boolean.valueOf(auto);
+    configHolder.setMinorCompaction(auto);
+    return null;
+  }
+  @Override
+  public Boolean getMinorCompaction() {
+    return autoCompact;
+  }
+  
+  @Override
+  public HDFSCompactionConfigMutator getCompactionConfigMutator() {
+    return compactionMutator;
+  }
+
+  @Override
+  public HDFSEventQueueAttributesMutator getHDFSEventQueueAttributesMutator() {
+    return qMutator;
+  }
+
+  public static class HDFSEventQueueAttributesMutatorImpl implements HDFSEventQueueAttributesMutator {
+    private HDFSEventQueueAttributesFactory factory = new HDFSEventQueueAttributesFactory();
+    int batchSize = -1;
+    int batchInterval = -1;
+    
+    public HDFSEventQueueAttributesMutatorImpl(HDFSEventQueueAttributes qAttrs) {
+      if (qAttrs == null) {
+        return;
+      }
+      
+      setBatchSizeMB(qAttrs.getBatchSizeMB());
+      setBatchTimeInterval(qAttrs.getBatchTimeInterval());
+    }
+    
+    @Override
+    public HDFSEventQueueAttributesMutator setBatchSizeMB(int size) {
+      // call factory.set to execute attribute value validation
+      factory.setBatchSizeMB(size);
+      batchSize = size;
+      return this;
+    }
+    @Override
+    public int getBatchSizeMB() {
+      return batchSize;
+    }
+
+    @Override
+    public HDFSEventQueueAttributesMutator setBatchTimeInterval(int interval) {
+      batchInterval = interval;
+      // call factory.set to execute attribute value validation
+      factory.setBatchTimeInterval(interval);
+      return this;
+    }
+    @Override
+    public int getBatchTimeInterval() {
+      return batchInterval;
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("HDFSEventQueueAttributesMutatorImpl [");
+      if (batchSize > -1) {
+        builder.append("batchSize=");
+        builder.append(batchSize);
+        builder.append(", ");
+      }
+      if (batchInterval > -1) {
+        builder.append("batchInterval=");
+        builder.append(batchInterval);
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * @author ashvina
+   */
+  public static class HDFSCompactionConfigMutatorImpl implements HDFSCompactionConfigMutator {
+    private AbstractHDFSCompactionConfigHolder configHolder;
+    private Boolean autoMajorCompact;
+
+    public HDFSCompactionConfigMutatorImpl(AbstractHDFSCompactionConfigHolder configHolder) {
+      this.configHolder = configHolder;
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxInputFileSizeMB(int size) {
+      configHolder.setMaxInputFileSizeMB(size);
+      return this;
+    }
+    @Override
+    public int getMaxInputFileSizeMB() {
+      return configHolder.getMaxInputFileSizeMB();
+    }
+    
+    @Override
+    public HDFSCompactionConfigMutator setMinInputFileCount(int count) {
+      configHolder.setMinInputFileCount(count);
+      return this;
+    }
+    @Override
+    public int getMinInputFileCount() {
+      return configHolder.getMinInputFileCount();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxInputFileCount(int count) {
+      configHolder.setMaxInputFileCount(count);
+      return this;
+    }
+    @Override
+    public int getMaxInputFileCount() {
+      return configHolder.getMaxInputFileCount();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMaxThreads(int count) {
+      configHolder.setMaxThreads(count);
+      return this;
+    }
+    @Override
+    public int getMaxThreads() {
+      return configHolder.getMaxThreads();
+    }
+    
+    @Override
+    public HDFSCompactionConfigMutator setAutoMajorCompaction(boolean auto) {
+      autoMajorCompact = Boolean.valueOf(auto);
+      configHolder.setAutoMajorCompaction(auto);
+      return this;
+    }
+    @Override
+    public Boolean getAutoMajorCompaction() {
+      return autoMajorCompact;
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMajorCompactionIntervalMins(int count) {
+      configHolder.setMajorCompactionIntervalMins(count);
+      return this;
+    }
+    @Override
+    public int getMajorCompactionIntervalMins() {
+      return configHolder.getMajorCompactionIntervalMins();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setMajorCompactionMaxThreads(int count) {
+      configHolder.setMajorCompactionMaxThreads(count);
+      return this;
+    }
+    @Override
+    public int getMajorCompactionMaxThreads() {
+      return configHolder.getMajorCompactionMaxThreads();
+    }
+
+    @Override
+    public HDFSCompactionConfigMutator setOldFilesCleanupIntervalMins(
+        int interval) {
+      configHolder.setOldFilesCleanupIntervalMins(interval);
+      return this;
+    }
+    @Override
+    public int getOldFilesCleanupIntervalMins() {
+      return configHolder.getOldFilesCleanupIntervalMins();
+    }
+  }
+
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreMutatorImpl [");
+    if (configHolder != null) {
+      builder.append("configHolder=");
+      builder.append(configHolder);
+      builder.append(", ");
+    }
+    if (autoCompact != null) {
+      builder.append("MinorCompaction=");
+      builder.append(autoCompact);
+      builder.append(", ");
+    }
+    if (compactionMutator.getAutoMajorCompaction() != null) {
+      builder.append("autoMajorCompaction=");
+      builder.append(compactionMutator.getAutoMajorCompaction());
+      builder.append(", ");
+    }
+    if (qMutator != null) {
+      builder.append("qMutator=");
+      builder.append(qMutator);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+}
\ No newline at end of file
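
Combined with HDFSStoreImpl.alter above, a reconfiguration pass might look like the sketch below; the setter values are illustrative and the method assumes same-package access to HDFSStoreImpl.

    import com.gemstone.gemfire.cache.hdfs.HDFSStore;
    import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;

    class MutatorSketch {
      static HDFSStore retune(HDFSStoreImpl store) {
        HDFSStoreMutator mutator = store.createHdfsStoreMutator();
        mutator.setMaxFileSize(256)            // illustrative values
               .setFileRolloverInterval(3600);
        mutator.getCompactionConfigMutator().setMaxThreads(4);
        // alter() validates the merged config, swaps the volatile
        // configHolder, and returns the previous view.
        return store.alter(mutator);
      }
    }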

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
new file mode 100644
index 0000000..0787ee0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSWriteOnlyStoreEventListener.java
@@ -0,0 +1,176 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Listener that persists events to a write-only HDFS store
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSWriteOnlyStoreEventListener implements
+    AsyncEventListener {
+
+  private final LogWriterI18n logger;
+  private volatile boolean senderStopped = false; 
+  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+  
+  
+  public HDFSWriteOnlyStoreEventListener(LogWriterI18n logger) {
+    this.logger = logger;
+  }
+  
+  @Override
+  public void close() {
+    senderStopped = true;
+  }
+
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    if (Hoplog.NOP_WRITE) {
+      return true;
+    }
+
+    if (logger.fineEnabled())
+      logger.fine("HDFSWriteOnlyStoreEventListener: A total of " + events.size() + " events are sent from GemFire to persist on HDFS");
+    boolean success = false;
+    try {
+      failureTracker.sleepIfRetry();
+      HDFSGatewayEventImpl hdfsEvent = null;
+      int previousBucketId = -1;
+      BatchManager bm = null;
+      for (AsyncEvent asyncEvent : events) {
+        if (senderStopped){
+          if (logger.fineEnabled()) {
+            logger.fine("HDFSWriteOnlyStoreEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+          }
+          return false;
+        }
+        hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+        if (previousBucketId != hdfsEvent.getBucketId()){
+          if (previousBucketId != -1) 
+            persistBatch(bm, previousBucketId);
+          
+          previousBucketId = hdfsEvent.getBucketId();
+          bm = new BatchManager();
+        }
+        bm.addEvent(hdfsEvent);
+      }
+      try {
+        persistBatch(bm, hdfsEvent.getBucketId());
+      } catch (BucketMovedException e) {
+        logger.fine("Batch could not be written to HDFS as the bucket moved. bucket id: " + 
+            hdfsEvent.getBucketId() + " Exception: " + e);
+        return false;
+      }
+      success = true;
+    } catch (IOException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (ClassNotFoundException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (CacheClosedException e) {
+      // exit silently
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (ForceReattemptException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    } catch (InterruptedException e1) {
+      // restore the interrupt status instead of swallowing it
+      Thread.currentThread().interrupt();
+    } finally {
+      failureTracker.record(success);
+    }
+    return true;
+  }
+  
+  /**
+   * Persists batches of multiple regions specified by the batch manager
+   * 
+   */
+  private void persistBatch(BatchManager bm, int bucketId) throws IOException, ForceReattemptException {
+    Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> eventsPerRegion = 
+        bm.iterator();
+    HoplogOrganizer bucketOrganizer = null; 
+    while (eventsPerRegion.hasNext()) {
+      Map.Entry<LocalRegion, ArrayList<QueuedPersistentEvent>> eventsForARegion = eventsPerRegion.next();
+      bucketOrganizer = getOrganizer((PartitionedRegion) eventsForARegion.getKey(), bucketId);
+      // bucket organizer must not be null.
+      if (bucketOrganizer == null)
+        throw new BucketMovedException("Bucket moved. BucketID: " + bucketId + "  HdfsRegion: " +  eventsForARegion.getKey().getName());
+      bucketOrganizer.flush(eventsForARegion.getValue().iterator(), eventsForARegion.getValue().size());
+      if (logger.fineEnabled()) {
+        logger.fine("Batch written to HDFS of size " +  eventsForARegion.getValue().size() + 
+            " for region " + eventsForARegion.getKey());
+      }
+    }
+  }
+
+  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+    if (br == null) {
+      // got rebalanced or something
+      throw new BucketMovedException("Bucket region is no longer available. BucketId: "+
+          bucketId + " HdfsRegion: " +  region.getName());
+    }
+
+    return br.getHoplogOrganizer();
+  }
+  
+  /**
+   * Sorts events of the multiple regions into per-region lists
+   *
+   */
+  private class BatchManager implements Iterable<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> {
+    private HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>> regionBatches = 
+        new HashMap<LocalRegion, ArrayList<QueuedPersistentEvent>>();
+    
+    public void addEvent (HDFSGatewayEventImpl hdfsEvent) throws IOException, ClassNotFoundException {
+      LocalRegion region = (LocalRegion) hdfsEvent.getRegion();
+      ArrayList<QueuedPersistentEvent> regionList = regionBatches.get(region);
+      if (regionList == null) {
+        regionList = new ArrayList<QueuedPersistentEvent>();
+        regionBatches.put(region, regionList);
+      }
+      regionList.add(new UnsortedHDFSQueuePersistedEvent(hdfsEvent));
+    }
+
+    @Override
+    public Iterator<Map.Entry<LocalRegion,ArrayList<QueuedPersistentEvent>>> iterator() {
+      return regionBatches.entrySet().iterator();
+    }
+    
+  }
+}
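
FailureTracker itself is not part of this diff; judging from the constructor arguments (10L, 60 * 1000L, 1.5f), it plausibly implements an exponential backoff along these lines. A hypothetical sketch, not the actual class:

    // Hypothetical: failures grow the retry wait 1.5x from 10 ms up to a
    // 60 s cap; a success resets it.
    class FailureTrackerSketch {
      private final long minMs = 10, maxMs = 60_000;
      private final float factor = 1.5f;
      private long waitMs = 0;

      synchronized void record(boolean success) {
        waitMs = success ? 0 : Math.min(maxMs, Math.max(minMs, (long) (waitMs * factor)));
      }

      synchronized void sleepIfRetry() throws InterruptedException {
        if (waitMs > 0) Thread.sleep(waitMs); // back off before the retry
      }
    }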

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
new file mode 100644
index 0000000..e2bef03
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HoplogListenerForRegion.java
@@ -0,0 +1,64 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;
+
+/**
+ * Objects of this class need to be created for every region. These objects 
+ * listen to the oplog events and take appropriate action.   
+ *
+ * @author Hemant Bhanawat
+ */
+public class HoplogListenerForRegion implements HoplogListener {
+
+  private List<HoplogListener> otherListeners = new CopyOnWriteArrayList<HoplogListener>();
+
+  public HoplogListenerForRegion() {
+    
+  }
+
+  @Override
+  public void hoplogCreated(String regionFolder, int bucketId,
+      Hoplog... oplogs) throws IOException {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.hoplogCreated(regionFolder, bucketId, oplogs);
+    }
+  }
+
+  @Override
+  public void hoplogDeleted(String regionFolder, int bucketId,
+      Hoplog... oplogs) {
+    for (HoplogListener listener : this.otherListeners) {
+      try {
+        listener.hoplogDeleted(regionFolder, bucketId, oplogs);
+      } catch (IOException e) {
+        // TODO handle
+        throw new HDFSIOException(e.getLocalizedMessage(), e);
+      }
+    }
+  }
+
+  public void addListener(HoplogListener listener) {
+    this.otherListeners.add(listener);
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    for (HoplogListener listener : this.otherListeners) {
+      listener.compactionCompleted(region, bucket, isMajor);
+    }
+  }
+}
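
The class is a plain broadcaster; wiring it up might look like the sketch below, assuming HoplogListener declares exactly the three callbacks used in this file.

    import java.io.IOException;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogListener;

    class ListenerWiringSketch {
      static HoplogListenerForRegion wire() {
        HoplogListenerForRegion regionListener = new HoplogListenerForRegion();
        // Hypothetical component listener; every hoplog event fans out to it.
        regionListener.addListener(new HoplogListener() {
          public void hoplogCreated(String folder, int bucket, Hoplog... logs)
              throws IOException { /* react to new hoplogs */ }
          public void hoplogDeleted(String folder, int bucket, Hoplog... logs)
              throws IOException { /* react to removals */ }
          public void compactionCompleted(String region, int bucket, boolean major) {
            /* react to compaction */
          }
        });
        return regionListener;
      }
    }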

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
new file mode 100644
index 0000000..e66e82d
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/PersistedEventImpl.java
@@ -0,0 +1,194 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.CachedDeserializable;
+import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Event that is persisted in HDFS. As we need to persist some of the EntryEventImpl
+ * variables, we have created this class and have overridden toData and fromData functions.  
+ * 
+ *  There are subclasses of this class for the different types of persisted events:
+ *  sorted vs. unsorted, and the persisted events we keep in the region
+ *  queue, which need to hold the region key.
+ *   
+ *
+ * @author Hemant Bhanawat
+ */
+public abstract class PersistedEventImpl {
+  protected Operation op = Operation.UPDATE;
+  
+  protected Object valueObject;
+
+  /**
+   * A field with flags describing the event
+   */
+  protected byte flags;
+
+   // Flags indicating the type of value.
+   // If the value is not a byte array or object, it is an internal delta.
+  private static final byte VALUE_IS_BYTE_ARRAY= 0x01;
+  private static final byte VALUE_IS_OBJECT= (VALUE_IS_BYTE_ARRAY << 1);
+  private static final byte POSSIBLE_DUPLICATE = (VALUE_IS_OBJECT << 1);
+  private static final byte HAS_VERSION_TAG = (POSSIBLE_DUPLICATE << 1);
+  
+
+  /** for deserialization */
+  public PersistedEventImpl() {
+  }
+  
+  public PersistedEventImpl(Object value, Operation op, byte valueIsObject,
+      boolean isPossibleDuplicate, boolean hasVersionTag) throws IOException,
+      ClassNotFoundException {
+    this.op = op;
+    this.valueObject = value;
+    setFlag(VALUE_IS_BYTE_ARRAY, valueIsObject == 0x00);
+    setFlag(VALUE_IS_OBJECT, valueIsObject == 0x01);
+    setFlag(POSSIBLE_DUPLICATE, isPossibleDuplicate);
+    setFlag(HAS_VERSION_TAG, hasVersionTag);
+  }
+  
+  private void setFlag(byte flag, boolean set) {
+    flags = (byte) (set ?  flags | flag :  flags & ~flag);
+  }
+  
+  private boolean getFlag(byte flag) {
+    return (flags & flag) != 0x0;
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    out.writeByte(this.op.ordinal);
+    out.writeByte(this.flags);
+    
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      DataSerializer.writeByteArray((byte[])this.valueObject, out);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if(valueObject instanceof CachedDeserializable) {
+        CachedDeserializable cd = (CachedDeserializable)valueObject;
+        DataSerializer.writeObjectAsByteArray(cd.getValue(), out);
+      } else {
+        DataSerializer.writeObjectAsByteArray(valueObject, out);
+      }
+    }
+    else {
+      DataSerializer.writeObject(valueObject, out);
+    }
+  }
+
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.op = Operation.fromOrdinal(in.readByte());
+    this.flags = in.readByte();
+    
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      this.valueObject = DataSerializer.readByteArray(in);
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      byte[] newValueBytes = DataSerializer.readByteArray(in);
+      if(newValueBytes == null) {
+        this.valueObject = null;
+      } else {
+        if(CachedDeserializableFactory.preferObject()) {
+          this.valueObject =  EntryEventImpl.deserialize(newValueBytes);
+        } else {
+          this.valueObject = CachedDeserializableFactory.create(newValueBytes);
+        }
+      }
+    }
+    else {
+      this.valueObject = DataSerializer.readObject(in);
+    }
+    
+  }
+  
+  /**
+   * Return the timestamp of this event. Depending on the subclass,
+   * this may be part of the version tag, or a separate field.
+   */
+  public abstract long getTimstamp();
+
+  protected boolean hasVersionTag() {
+    return getFlag(HAS_VERSION_TAG);
+  }
+
+  public Operation getOperation()
+  {
+    return this.op;
+  }
+  
+  public Object getValue() {
+    return this.valueObject;
+  }
+  
+  public boolean isPossibleDuplicate()
+  {
+    return getFlag(POSSIBLE_DUPLICATE);
+  }
+
+  /**
+   * Returns the deserialized value. 
+   * 
+   */
+  public Object getDeserializedValue() throws IOException, ClassNotFoundException {
+    Object retVal = null;
+    if (getFlag(VALUE_IS_BYTE_ARRAY)) { 
+      // value is a byte array
+      retVal = this.valueObject;
+    } else if (getFlag(VALUE_IS_OBJECT)) {
+      if(valueObject instanceof CachedDeserializable) {
+        retVal = ((CachedDeserializable)valueObject).getDeserializedForReading();
+      } else {
+        retVal = valueObject;
+      }
+    }
+    else {
+      // value is an object
+      retVal = this.valueObject;
+    }
+    return retVal;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder str = new StringBuilder(PersistedEventImpl.class.getSimpleName());
+    str.append("@").append(System.identityHashCode(this))
+    .append(" op:").append(op)
+    .append(" valueObject:").append(valueObject)
+    .append(" isPossibleDuplicate:").append(getFlag(POSSIBLE_DUPLICATE));
+    return str.toString();
+  }
+
+  public void copy(PersistedEventImpl usersValue) {
+    this.op = usersValue.op;
+    this.valueObject = usersValue.valueObject;
+    this.flags = usersValue.flags;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = 0;
+    
+    // value length
+    size += valueSize; 
+
+    // one byte for op and one byte for flag
+    size += 2;
+    
+    return size;
+  }
+}
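
The flag byte above packs four booleans, each constant one shift left of the previous. A short worked example of the masking arithmetic used by setFlag and getFlag, with the constants restated:

    class FlagDemo {
      static final byte VALUE_IS_BYTE_ARRAY = 0x01; // 0b0000_0001
      static final byte VALUE_IS_OBJECT     = 0x02; // 0b0000_0010
      static final byte POSSIBLE_DUPLICATE  = 0x04; // 0b0000_0100
      static final byte HAS_VERSION_TAG     = 0x08; // 0b0000_1000

      public static void main(String[] args) {
        byte flags = 0;
        flags = (byte) (flags | POSSIBLE_DUPLICATE);     // set   -> 0b0000_0100
        boolean dup = (flags & POSSIBLE_DUPLICATE) != 0; // test  -> true
        boolean tag = (flags & HAS_VERSION_TAG) != 0;    // test  -> false
        flags = (byte) (flags & ~POSSIBLE_DUPLICATE);    // clear -> 0b0000_0000
        System.out.println(dup + " " + tag + " " + flags); // true false 0
      }
    }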

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
new file mode 100644
index 0000000..d679003
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/QueuedPersistentEvent.java
@@ -0,0 +1,11 @@
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public interface QueuedPersistentEvent {
+  
+  public byte[] getRawKey();
+  
+  public void toHoplogEventBytes(DataOutput out) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
new file mode 100644
index 0000000..b9b1948
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserver.java
@@ -0,0 +1,107 @@
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tracks flushes using a queue of latches.
+ * 
+ * @author bakera
+ */
+public class SignalledFlushObserver implements FlushObserver {
+  private static class FlushLatch extends CountDownLatch {
+    private final long seqnum;
+    
+    public FlushLatch(long seqnum) {
+      super(1);
+      this.seqnum = seqnum;
+    }
+    
+    public long getSequence() {
+      return seqnum;
+    }
+  }
+  
+  // assume the number of outstanding flush requests is small so we don't
+  // need to organize by seqnum
+  private final List<FlushLatch> signals;
+  
+  private final AtomicLong eventsReceived;
+  private final AtomicLong eventsDelivered;
+  
+  public SignalledFlushObserver() {
+    signals = new ArrayList<FlushLatch>();
+    eventsReceived = new AtomicLong(0);
+    eventsDelivered = new AtomicLong(0);
+  }
+  
+  @Override
+  public boolean shouldDrainImmediately() {
+    synchronized (signals) {
+      return !signals.isEmpty();
+    }
+  }
+  
+  @Override
+  public AsyncFlushResult flush() {
+    final long seqnum = eventsReceived.get();
+    synchronized (signals) {
+      final FlushLatch flush;
+      if (seqnum <= eventsDelivered.get()) {
+        flush = null;
+      } else {
+        flush = new FlushLatch(seqnum);
+        signals.add(flush);
+      }
+      
+      return new AsyncFlushResult() {
+        @Override
+        public boolean waitForFlush(long timeout, TimeUnit unit) throws InterruptedException {
+          return flush == null ? true : flush.await(timeout, unit);
+        }
+      };
+    }
+  }
+
+  /**
+   * Invoked when an event is received.
+   */
+  public void push() {
+    eventsReceived.incrementAndGet();
+  }
+
+  /**
+   * Invoked when a batch has been dispatched.
+   */
+  public void pop(int count) {
+    long highmark = eventsDelivered.addAndGet(count);
+    synchronized (signals) {
+      for (ListIterator<FlushLatch> iter = signals.listIterator(); iter.hasNext(); ) {
+        FlushLatch flush = iter.next();
+        if (flush.getSequence() <= highmark) {
+          flush.countDown();
+          iter.remove();
+        }
+      }
+    }
+  }
+  
+  /**
+   * Invoked when the queue is cleared.
+   */
+  public void clear() {
+    synchronized (signals) {
+      for (FlushLatch flush : signals) {
+        flush.countDown();
+      }
+
+      signals.clear();
+      eventsReceived.set(0);
+      eventsDelivered.set(0);
+    }
+  }
+}
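
A sketch of the latch protocol from a caller's point of view. AsyncFlushResult is assumed to come from the FlushObserver contract, which is not part of this diff; the sketch also assumes same-package access.

    import java.util.concurrent.TimeUnit;

    class FlushSketch {
      static boolean demo() throws InterruptedException {
        SignalledFlushObserver observer = new SignalledFlushObserver();
        observer.push();                 // event 1 received
        observer.push();                 // event 2 received
        // flush() latches on seqnum 2, the events received so far.
        AsyncFlushResult result = observer.flush();
        observer.pop(2);                 // both delivered; latch released
        return result.waitForFlush(1, TimeUnit.SECONDS); // -> true
      }
    }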

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
new file mode 100644
index 0000000..57d58b7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SizeTieredHdfsCompactionConfigHolder.java
@@ -0,0 +1,74 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory.HDFSCompactionConfigFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Holds and validates the HDFS size-tiered compaction configuration
+ * 
+ * @author ashvina
+ */
+public class SizeTieredHdfsCompactionConfigHolder extends AbstractHDFSCompactionConfigHolder {
+  @Override
+  public String getCompactionStrategy() {
+    return SIZE_ORIENTED;
+  }
+
+  @Override
+  public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_SIZE_MB", size);
+    this.maxInputFileSizeMB = size;
+    return this;
+  }
+
+  @Override
+  public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MIN_INPUT_FILE_COUNT", count);
+    this.minInputFileCount = count;
+    return this;
+  }
+
+  @Override
+  public HDFSCompactionConfigFactory setMaxInputFileCount(int count) {
+    HDFSStoreCreation.assertIsPositive("HDFS_COMPACTION_MAX_INPUT_FILE_COUNT", count);
+    this.maxInputFileCount = count;
+    return this;
+  }
+  
+  @Override
+  public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_INTERVAL, count);
+    this.majorCompactionIntervalMins = count;
+    return this;
+  }
+  
+  @Override
+  public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
+    HDFSStoreCreation.assertIsPositive(CacheXml.HDFS_MAJOR_COMPACTION_THREADS, count);
+    this.majorCompactionConcurrency = count;
+    return this;
+  }
+
+  @Override
+  protected void validate() {
+    if (minInputFileCount > maxInputFileCount) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.HOPLOG_MIN_IS_MORE_THAN_MAX
+          .toLocalizedString(new Object[] {
+              "HDFS_COMPACTION_MIN_INPUT_FILE_COUNT",
+              minInputFileCount,
+              "HDFS_COMPACTION_MAX_INPUT_FILE_COUNT",
+              maxInputFileCount }));
+    }
+    super.validate();
+  }
+}
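
A minimal sketch of how this holder behaves, assuming it is reachable through
the HDFSCompactionConfigFactory plumbing imported above (validate() is
protected and is normally driven by the store creation path):

    SizeTieredHdfsCompactionConfigHolder config = new SizeTieredHdfsCompactionConfigHolder();
    config.setMaxInputFileSizeMB(512);   // accepted: assertIsPositive passes
    config.setMinInputFileCount(10);
    config.setMaxInputFileCount(4);      // each setter only checks positivity...
    // ...so the min > max conflict surfaces later, when validate() runs and
    // throws IllegalArgumentException (HOPLOG_MIN_IS_MORE_THAN_MAX)

    config.setMajorCompactionIntervalMins(0);  // rejected immediately: must be positive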

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..7bbcf7b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,78 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.DataSerializableFixedID;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file.
+ * 
+ * @author dsmith
+ *
+ */
+public class SortedHDFSQueuePersistedEvent extends SortedHoplogPersistedEvent implements QueuedPersistentEvent {
+
+  /** key stored in serialized form */
+  protected byte[] keyBytes = null;
+  
+  public SortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in)
+      throws IOException, ClassNotFoundException {
+    this(in.getSerializedValue(), in.getOperation(), in.getValueIsObject(),
+        in.getPossibleDuplicate(), in.getVersionTag(), in.getSerializedKey(),
+        in.getCreationTime());
+  }
+
+  public SortedHDFSQueuePersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag versionTag,
+      byte[] serializedKey, long timestamp) throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, versionTag, timestamp);
+    this.keyBytes = serializedKey;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+
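+  // Unlike toData above, this omits the key bytes; the hoplog record is
+  // assumed to carry the key separately, so only the event fields are written.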
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+    size += keySize;
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..a2bcba1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/SortedHoplogPersistedEvent.java
@@ -0,0 +1,106 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persistent event that is stored in a sorted hoplog. In addition
+ * to the fields of PersistentEventImpl, this event has a version tag.
+ * 
+ * This class should only be serialized by directly calling toData,
+ * which is why it does not implement DataSerializable
+ * 
+ * @author dsmith
+ */
+public class SortedHoplogPersistedEvent extends PersistedEventImpl {
+  /** version tag for concurrency checks */
+  protected VersionTag versionTag;
+
+  /** timestamp of the event. Used when version checks are disabled*/
+  protected long timestamp;
+
+  public SortedHoplogPersistedEvent(Object valueObject, Operation operation,
+      byte valueIsObject, boolean possibleDuplicate, VersionTag tag, long timestamp) throws ClassNotFoundException, IOException {
+    super(valueObject, operation, valueIsObject, possibleDuplicate, tag != null);
+    this.versionTag = tag;
+    this.timestamp = timestamp;
+  }
+
+  public SortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
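+  // (sic: "getTimstamp" preserves the spelling of the superclass method it overrides)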
+  @Override
+  public long getTimstamp() {
+    return versionTag == null ? timestamp : versionTag.getVersionTimeStamp();
+  }
+  
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    if (versionTag == null) {
+      out.writeLong(timestamp);
+    } else {
+      //TODO optimize these
+      DataSerializer.writeObject(this.versionTag, out);
+    }
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    if (hasVersionTag()) {
+      this.versionTag = (VersionTag)DataSerializer.readObject(in);
+    } else {
+      this.timestamp = in.readLong();
+    }
+  }
+  
+  /**
+   * @return the concurrency versioning tag for this event, if any
+   */
+  public VersionTag getVersionTag() {
+    return this.versionTag;
+  }
+  
+  public static SortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    SortedHoplogPersistedEvent event = new SortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+  
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.versionTag = ((SortedHoplogPersistedEvent) usersValue).versionTag;
+    this.timestamp = ((SortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    if (versionTag != null) {
+      size +=  versionTag.getSizeInBytes();
+    } else {
+      // size of Timestamp
+      size += 8;
+    }
+    
+    return size;
+  }
+}
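
A short sketch of how the static size helpers in these event classes compose
(the sizes are illustrative; the base cost comes from PersistedEventImpl):

    int keySize = 16, valueSize = 128;
    VersionTag tag = null;  // no concurrency checks: the 8-byte timestamp path

    int sortedSize = SortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, tag);
    int queuedSize = SortedHDFSQueuePersistedEvent.getSizeInBytes(keySize, valueSize, tag);

    // the queued form additionally stores the raw key bytes:
    assert queuedSize == sortedSize + keySize;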

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
new file mode 100644
index 0000000..7b0570b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHDFSQueuePersistedEvent.java
@@ -0,0 +1,68 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+
+/**
+ * A persistent event that is stored in the hoplog queue. This class is only used
+ * temporarily to copy the data from the HDFSGatewayEventImpl to the persisted
+ * record in the file. 
+ * 
+ * @author dsmith
+ *
+ */
+public class UnsortedHDFSQueuePersistedEvent extends UnsortedHoplogPersistedEvent implements QueuedPersistentEvent {
+  
+  /** the bytes of the key for this entry */
+  protected byte[] keyBytes = null;
+  
+  public UnsortedHDFSQueuePersistedEvent(HDFSGatewayEventImpl in)
+      throws IOException, ClassNotFoundException {
+    super(in.getValue(), in.getOperation(), in.getValueIsObject(), in.getPossibleDuplicate(),
+        in.getVersionTimeStamp() == 0 ? in.getCreationTime() : in.getVersionTimeStamp());
+    this.keyBytes = in.getSerializedKey();
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeByteArray(this.keyBytes, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.keyBytes = DataSerializer.readByteArray(in);
+  }
+  
+  @Override
+  public void toHoplogEventBytes(DataOutput out) throws IOException {
+    super.toData(out);
+  }
+  
+  public byte[] getRawKey() {
+    return this.keyBytes;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = UnsortedHoplogPersistedEvent.getSizeInBytes(keySize, valueSize, versionTag);
+    size += keySize;
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
new file mode 100644
index 0000000..ee48b16
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/UnsortedHoplogPersistedEvent.java
@@ -0,0 +1,84 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.internal.ByteArrayDataInput;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+
+/**
+ * A persisted event that is stored in an unsorted (sequential) hoplog. It
+ * does not carry a version tag, just a timestamp for the entry.
+ * 
+ * This class should only be serialized by calling toData directly, which
+ * is why it does not implement DataSerializable.
+ * 
+ * @author dsmith
+ *
+ */
+public class UnsortedHoplogPersistedEvent extends PersistedEventImpl {
+  long timestamp;
+
+  public UnsortedHoplogPersistedEvent() {
+    //for deserialization
+  }
+
+  public UnsortedHoplogPersistedEvent(Object value, Operation op,
+      byte valueIsObject, boolean isPossibleDuplicate, long timestamp) throws IOException,
+      ClassNotFoundException {
+    super(value, op, valueIsObject, isPossibleDuplicate, false/*hasVersionTag*/);
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public long getTimstamp() {
+    return timestamp;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeLong(timestamp, out);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.timestamp = DataSerializer.readLong(in);
+  }
+  
+  public static UnsortedHoplogPersistedEvent fromBytes(byte[] val)
+      throws IOException, ClassNotFoundException {
+    ByteArrayDataInput in = new ByteArrayDataInput();
+    in.initialize(val, null);
+    UnsortedHoplogPersistedEvent event = new UnsortedHoplogPersistedEvent();
+    event.fromData(in);
+    return event;
+  }
+  
+  public void copy(PersistedEventImpl usersValue) {
+    super.copy(usersValue);
+    this.timestamp = ((UnsortedHoplogPersistedEvent) usersValue).timestamp;
+  }
+  
+  public static int getSizeInBytes(int keySize, int valueSize, VersionTag versionTag) {
+    int size = PersistedEventImpl.getSizeInBytes(keySize, valueSize, versionTag);
+    
+    // size of Timestamp
+    size += 8;
+    
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
new file mode 100644
index 0000000..06ed635
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/Bits.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class Bits
+{
+
+    public static int[] getBits(byte[] mBytes) throws IOException
+    {
+        int bitSize = mBytes.length / 4;
+        int[] bits = new int[bitSize];
+        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(mBytes));
+        for (int i = 0; i < bitSize; i++)
+        {
+            bits[i] = dis.readInt();
+        }
+        return bits;
+    }
+
+}
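
A self-contained round-trip sketch: ints are written big-endian by
DataOutputStream, exactly the layout Bits.getBits() reads back:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class BitsExample {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            for (int x : new int[] { 1, 2, 3 }) {
                dos.writeInt(x);        // 4 bytes per int
            }
            int[] back = Bits.getBits(baos.toByteArray());  // {1, 2, 3}
        }
    }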

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
new file mode 100644
index 0000000..31d3f16
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/CardinalityMergeException.java
@@ -0,0 +1,33 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+@SuppressWarnings("serial")
+public abstract class CardinalityMergeException extends Exception
+{
+    public CardinalityMergeException(String message)
+    {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/HyperLogLog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/HyperLogLog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/HyperLogLog.java
new file mode 100644
index 0000000..8c2cd18
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/HyperLogLog.java
@@ -0,0 +1,304 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2012 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Java implementation of HyperLogLog (HLL) algorithm from this paper:
+ * <p/>
+ * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
+ * <p/>
+ * HLL is an improved version of LogLog that is capable of estimating
+ * the cardinality of a set with accuracy = 1.04/sqrt(m) where
+ * m = 2^b.  So we can control accuracy vs space usage by increasing
+ * or decreasing b.
+ * <p/>
+ * The main benefit of using HLL over LL is that it only requires 64%
+ * of the space that LL does to get the same accuracy.
+ * <p/>
+ * This implementation provides a single counter.  If a large number (millions)
+ * of counters is required, you may want to refer to:
+ * <p/>
+ * http://dsiutils.dsi.unimi.it/
+ * <p/>
+ * It has a more complex implementation of HLL that supports multiple counters
+ * in a single object, drastically reducing the Java overhead of creating
+ * a large number of objects.
+ * <p/>
+ * This implementation leveraged a javascript implementation that Yammer has
+ * been working on:
+ * <p/>
+ * https://github.com/yammer/probablyjs
+ * <p>
+ * Note that this implementation does not include the long range correction function
+ * defined in the original paper.  Empirical evidence shows that the correction
+ * function causes more harm than good.
+ * </p>
+ *
+ * <p>
+ * Users have different motivations for using different hash functions.
+ * Rather than trying to keep up with all available hash functions, and to avoid
+ * future binary incompatibilities, this class lets clients offer the value in
+ * already-hashed int or long form.  This way clients are free to change their
+ * hash function on their own timeline.  We recommend Google's Guava Murmur3_128
+ * implementation, as it provides good performance when high precision is
+ * required.  In our tests the 32-bit MurmurHash function included in this
+ * project is faster and produces better results than the 32-bit Murmur3
+ * implementation Google provides.
+ * </p>
+ */
+public class HyperLogLog implements ICardinality
+{
+    private final RegisterSet registerSet;
+    private final int log2m;
+    private final double alphaMM;
+
+
+    /**
+     * Create a new HyperLogLog instance using the specified standard deviation.
+     *
+     * @param rsd - the relative standard deviation for the counter.
+     *            smaller values create counters that require more space.
+     */
+    public HyperLogLog(double rsd)
+    {
+        this(log2m(rsd));
+    }
+
+    private static int log2m(double rsd)
+    {
+        return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
+    }
+
+    /**
+     * Create a new HyperLogLog instance.  The log2m parameter defines the accuracy of
+     * the counter.  The larger the log2m the better the accuracy.
+     * <p/>
+     * accuracy = 1.04/sqrt(2^log2m)
+     *
+     * @param log2m - the number of bits to use as the basis for the HLL instance
+     */
+    public HyperLogLog(int log2m)
+    {
+        this(log2m, new RegisterSet((int) Math.pow(2, log2m)));
+    }
+
+    /**
+     * Creates a new HyperLogLog instance using the given registers.  Used for unmarshalling a serialized
+     * instance and for merging multiple counters together.
+     *
+     * @param registerSet - the initial values for the register set
+     */
+    public HyperLogLog(int log2m, RegisterSet registerSet)
+    {
+        this.registerSet = registerSet;
+        this.log2m = log2m;
+        int m = (int) Math.pow(2, this.log2m);
+
+        // See the paper.
+        switch (log2m)
+        {
+            case 4:
+                alphaMM = 0.673 * m * m;
+                break;
+            case 5:
+                alphaMM = 0.697 * m * m;
+                break;
+            case 6:
+                alphaMM = 0.709 * m * m;
+                break;
+            default:
+                alphaMM = (0.7213 / (1 + 1.079 / m)) * m * m;
+        }
+    }
+
+
+    @Override
+    public boolean offerHashed(long hashedValue)
+    {
+        // j becomes the binary address determined by the first b log2m of x
+        // j will be between 0 and 2^log2m
+        final int j = (int) (hashedValue >>> (Long.SIZE - log2m));
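+        // note: '+' binds tighter than '|', so the or-mask is ((1 << (log2m - 1)) + 1);
+        // it keeps at least one bit set, bounding the run of zeros measured below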
+        final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
+        return registerSet.updateIfGreater(j, r);
+    }
+
+    @Override
+    public boolean offerHashed(int hashedValue)
+    {
+        // j becomes the binary address determined by the first b log2m of x
+        // j will be between 0 and 2^log2m
+        final int j = hashedValue >>> (Integer.SIZE - log2m);
+        final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
+        return registerSet.updateIfGreater(j, r);
+    }
+
+    @Override
+    public boolean offer(Object o)
+    {
+        final int x = MurmurHash.hash(o);
+        return offerHashed(x);
+    }
+
+
+    @Override
+    public long cardinality()
+    {
+        double registerSum = 0;
+        int count = registerSet.count;
+        double zeros = 0.0;
+        for (int j = 0; j < count; j++)
+        {
+            int val = registerSet.get(j);
+            registerSum += 1.0 / (1 << val);
+            if (val == 0) {
+                zeros++;
+            }
+        }
+
+        double estimate = alphaMM * (1 / registerSum);
+
+        if (estimate <= (5.0 / 2.0) * count)
+        {
+            // Small Range Estimate
+            return Math.round(count * Math.log(count / zeros));
+        }
+        else
+        {
+            return Math.round(estimate);
+        }
+    }
+
+    @Override
+    public int sizeof()
+    {
+        return registerSet.size * 4;
+    }
+
+    @Override
+    public byte[] getBytes() throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+
+        dos.writeInt(log2m);
+        dos.writeInt(registerSet.size * 4);
+        for (int x : registerSet.bits())
+        {
+            dos.writeInt(x);
+        }
+
+        return baos.toByteArray();
+    }
+    
+    /** Add all the elements of the other set to this set.
+     * 
+     * This operation does not imply a loss of precision.
+     * 
+     * @param other a compatible HyperLogLog instance (same log2m)
+     * @throws CardinalityMergeException if other is not compatible
+     */
+    public void addAll(HyperLogLog other) throws CardinalityMergeException {
+        if (this.sizeof() != other.sizeof())
+        {
+            throw new HyperLogLogMergeException("Cannot merge estimators of different sizes");
+        }
+        
+        registerSet.merge(other.registerSet);
+    }
+
+    @Override
+    public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException
+    {
+        HyperLogLog merged = new HyperLogLog(log2m);
+        merged.addAll(this);
+
+        if (estimators == null)
+        {
+            return merged;
+        }
+        
+        for (ICardinality estimator : estimators)
+        {
+            if (!(estimator instanceof HyperLogLog))
+            {
+                throw new HyperLogLogMergeException("Cannot merge estimators of different class");
+            }
+            HyperLogLog hll = (HyperLogLog) estimator;
+            merged.addAll(hll);
+        }
+        
+        return merged;
+    }
+
+    public static class Builder implements IBuilder<ICardinality>, Serializable
+    {
+        private double rsd;
+
+        public Builder(double rsd)
+        {
+            this.rsd = rsd;
+        }
+
+        @Override
+        public HyperLogLog build()
+        {
+            return new HyperLogLog(rsd);
+        }
+
+        @Override
+        public int sizeof()
+        {
+            int log2m = log2m(rsd);
+            int k = (int) Math.pow(2, log2m);
+            return RegisterSet.getBits(k) * 4;
+        }
+
+        public static HyperLogLog build(byte[] bytes) throws IOException
+        {
+            ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            DataInputStream oi = new DataInputStream(bais);
+            int log2m = oi.readInt();
+            int size = oi.readInt();
+            byte[] longArrayBytes = new byte[size];
+            oi.readFully(longArrayBytes);
+            return new HyperLogLog(log2m, new RegisterSet((int) Math.pow(2, log2m), Bits.getBits(longArrayBytes)));
+        }
+    }
+
+    @SuppressWarnings("serial")
+    protected static class HyperLogLogMergeException extends CardinalityMergeException
+    {
+        public HyperLogLogMergeException(String message)
+        {
+            super(message);
+        }
+    }
+}
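
A hedged usage sketch of the estimator above (the stream contents and the
0.05 relative standard deviation are illustrative):

    HyperLogLog hll = new HyperLogLog(0.05);  // log2m(0.05) = 8, i.e. 256 registers
    for (int i = 0; i < 100000; i++) {
        hll.offer("user-" + i);               // hashed internally via MurmurHash
    }
    long estimate = hll.cardinality();        // ~100000; std error ~ 1.04/sqrt(256) = 6.5%

    byte[] wire = hll.getBytes();             // log2m, byte count, then the registers
    HyperLogLog restored = HyperLogLog.Builder.build(wire);

    HyperLogLog other = new HyperLogLog(8);   // same log2m, so addAll is lossless
    restored.addAll(other);                   // throws CardinalityMergeException if sizes differ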

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
new file mode 100644
index 0000000..935b2bb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/IBuilder.java
@@ -0,0 +1,32 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+
+public interface IBuilder<T>
+{
+    T build();
+
+    int sizeof();
+}

