geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [44/57] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapshot
Date Thu, 09 Jul 2015 17:03:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
new file mode 100644
index 0000000..d42ebeb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionFunction.java
@@ -0,0 +1,121 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Function responsible for forcing a compaction on all members
+ * of the system
+ *
+ * @author sbawaska
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionFunction implements Function, InternalEntity {
+
+  /** maximum number of times a failed bucket set will be retried */
+  public static final int FORCE_COMPACTION_MAX_RETRIES = Integer.getInteger("gemfireXD.maxCompactionRetries", 3);
+
+  /** sentinel bucket id carried by the terminating result */
+  public static final int BUCKET_ID_FOR_LAST_RESULT = -1;
+
+  public static final String ID = "HDFSForceCompactionFunction";
+
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Forces a local HDFS compaction for each local primary bucket named in the
+   * function arguments and streams one CompactionStatus per bucket back to the
+   * caller, followed by a terminating status with bucket id
+   * BUCKET_ID_FOR_LAST_RESULT. Buckets that were not waited on (asynchronous
+   * invocation) or whose compaction failed are reported in a trailing sweep so
+   * that the coordinator can retry them.
+   */
+  @Override
+  public void execute(FunctionContext context) {
+    if (context.isPossibleDuplicate()) {
+      // do not re-execute the function, another function
+      // targeting the failed buckets will be invoked
+      context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, false));
+      return;
+    }
+    RegionFunctionContext rfc = (RegionFunctionContext) context;
+    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+    HDFSForceCompactionArgs args = (HDFSForceCompactionArgs) rfc.getArguments();
+    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets()); // copying avoids race when the function coordinator
+                                                                    // also runs the function locally
+    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+    List<Future<CompactionStatus>> futures =  pr.forceLocalHDFSCompaction(buckets, args.isMajor(), 0);
+    int waitFor = args.getMaxWaitTime();
+    for (Future<CompactionStatus> future : futures) {
+      long start = System.currentTimeMillis();
+      CompactionStatus status = null;
+      try {
+        // TODO use a CompletionService instead
+        if (!args.isSynchronous() && waitFor <= 0) {
+          break;
+        }
+        status = args.isSynchronous() ? future.get() : future.get(waitFor, TimeUnit.MILLISECONDS);
+        buckets.remove(status.getBucketId());
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompaction sending result:"+status);
+        }
+        context.getResultSender().sendResult(status);
+        long elapsedTime = System.currentTimeMillis() - start;
+        waitFor -= elapsedTime;
+      } catch (InterruptedException e) {
+        // restore the interrupt status and stop waiting; the buckets that
+        // were not completed are reported below so a retry can target them
+        Thread.currentThread().interrupt();
+        break;
+      } catch (ExecutionException e) {
+        // compaction failed for this bucket; it stays in the bucket set and
+        // is reported below so a retry can target it
+        if (logger.isDebugEnabled()) {
+          logger.debug("HDFS: ForceCompaction bucket compaction failed", e);
+        }
+      } catch (TimeoutException e) {
+        // do not wait for other buckets to complete
+        break;
+      }
+    }
+    // for asynchronous invocation, the status is true for buckets that we did not wait for
+    boolean status = !args.isSynchronous();
+    for (Integer bucketId : buckets) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("HDFS: ForceCompaction sending result for bucket:"+bucketId);
+      }
+      context.getResultSender().sendResult(new CompactionStatus(bucketId, status));
+    }
+    if (logger.isDebugEnabled()) {
+      logger.debug("HDFS: ForceCompaction sending last result");
+    }
+    context.getResultSender().lastResult(new CompactionStatus(BUCKET_ID_FOR_LAST_RESULT, true));
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  /** @return true; this function streams per-bucket results back to the caller */
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    // run compaction on primary members
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    // so that we can target re-execution on failed buckets
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
new file mode 100644
index 0000000..913666f
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionResultCollector.java
@@ -0,0 +1,123 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+
+/**
+ * Result collector for {@link HDFSForceCompactionFunction}. Collects one
+ * {@link CompactionStatus} per bucket and blocks callers of {@link #getResult()}
+ * until {@link #endResults()} or {@link #clearResults()} is invoked.
+ *
+ * @author sbawaska
+ */
+public class HDFSForceCompactionResultCollector implements LocalResultCollector<Object, List<CompactionStatus>> {
+
+  /** list of received replies*/
+  // NOTE(review): access to this list is unsynchronized; confirm addResult()
+  // is never invoked concurrently, otherwise guard the list
+  private final List<CompactionStatus> reply = new ArrayList<CompactionStatus>();
+
+  /** latch to block the caller of getResult() until results are complete */
+  private final CountDownLatch waitForResults = new CountDownLatch(1);
+
+  /** boolean to indicate if clearResults() was called to indicate a failure*/
+  private volatile boolean shouldRetry;
+
+  private ReplyProcessor21 processor;
+
+  /**
+   * Blocks until {@link #endResults()} or {@link #clearResults()} is called,
+   * then returns the replies received so far.
+   *
+   * @throws FunctionException if the waiting thread is interrupted
+   */
+  @Override
+  public List<CompactionStatus> getResult() throws FunctionException {
+    try {
+      waitForResults.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+      throw new FunctionException(e);
+    }
+    return reply;
+  }
+
+  /** Timed waiting is not supported by this collector. */
+  @Override
+  public List<CompactionStatus> getResult(long timeout, TimeUnit unit)
+      throws FunctionException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Records a per-bucket status. The terminating status (bucket id
+   * {@link HDFSForceCompactionFunction#BUCKET_ID_FOR_LAST_RESULT}) is ignored.
+   */
+  @Override
+  public void addResult(DistributedMember memberID,
+      Object resultOfSingleExecution) {
+    if (resultOfSingleExecution instanceof CompactionStatus) {
+      CompactionStatus status = (CompactionStatus) resultOfSingleExecution;
+      if (status.getBucketId() != HDFSForceCompactionFunction.BUCKET_ID_FOR_LAST_RESULT) {
+        reply.add(status);
+      }
+    }
+  }
+
+  /** Releases threads blocked in {@link #getResult()}. */
+  @Override
+  public void endResults() {
+    waitForResults.countDown();
+  }
+
+  /**
+   * Marks this collection as failed so that {@link #shouldRetry()} returns
+   * true, and releases threads blocked in {@link #getResult()}.
+   */
+  @Override
+  public void clearResults() {
+    this.shouldRetry = true;
+    waitForResults.countDown();
+  }
+
+  /**
+   * @return true if retry should be attempted
+   */
+  public boolean shouldRetry() {
+    return this.shouldRetry || !getFailedBucketIds().isEmpty();
+  }
+
+  /** @return ids of buckets whose compaction status reported failure */
+  private Set<Integer> getFailedBucketIds() {
+    Set<Integer> result = new HashSet<Integer>();
+    for (CompactionStatus status : reply) {
+      if (!status.isStatus()) {
+        result.add(status.getBucketId());
+      }
+    }
+    return result;
+  }
+
+  /** @return ids of buckets whose compaction status reported success */
+  public Set<Integer> getSuccessfulBucketIds() {
+    Set<Integer> result = new HashSet<Integer>();
+    for (CompactionStatus status : reply) {
+      if (status.isStatus()) {
+        result.add(status.getBucketId());
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void setProcessor(ReplyProcessor21 processor) {
+    this.processor = processor;
+  }
+
+  @Override
+  public ReplyProcessor21 getProcessor() {
+    return this.processor;
+  }
+
+  /**
+   * Exceptions are intentionally not recorded here; failures surface as
+   * failed buckets which callers detect through {@link #shouldRetry()}.
+   */
+  @Override
+  public void setException(Throwable exception) {
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
new file mode 100644
index 0000000..2993831
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSLastCompactionTimeFunction.java
@@ -0,0 +1,48 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import com.gemstone.gemfire.cache.execute.FunctionAdapter;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+
+/**
+ * Function that reports the oldest timestamp among the major compacted
+ * buckets held locally, so callers can derive the last compaction time
+ * across all members.
+ *
+ * @author sbawaska
+ */
+@SuppressWarnings("serial")
+public class HDFSLastCompactionTimeFunction extends FunctionAdapter implements InternalEntity {
+
+  public static final String ID = "HDFSLastCompactionTimeFunction";
+
+  /** Sends the local last-major-compaction timestamp as the single result. */
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext regionContext = (RegionFunctionContext) context;
+    PartitionedRegion region = (PartitionedRegion) regionContext.getDataSet();
+    regionContext.getResultSender().lastResult(region.lastLocalMajorHDFSCompaction());
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  /** Prefer primaries, since compaction state lives with primary buckets. */
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  /** Allow re-execution on member failure. */
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
new file mode 100644
index 0000000..2edcb91
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirector.java
@@ -0,0 +1,471 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.cache.GemFireCache;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Cache for hoplog organizers associated with buckets of a region. The director creates an
+ * instance of organizer on first get request. It does not read HDFS in advance. Creation of
+ * organizer depends on File system initialization that takes outside this class. This class also
+ * provides utility methods to monitor usage and manage bucket sets.
+ * 
+ */
+public class HDFSRegionDirector {
+  /*
+   * Maps each region name to its listener and store objects. This map must be populated before file
+   * organizers of a bucket can be created
+   */
+  private final ConcurrentHashMap<String, HdfsRegionManager> regionManagerMap;
+  
+  /**
+   * regions of this Gemfire cache are managed by this director. TODO this
+   * should be final and be provided at the time of creation of this instance or
+   * through a cache directory
+   */
+  private GemFireCache cache;
+  
+  // singleton instance
+  private static HDFSRegionDirector instance;
+  
+  // single daemon thread that periodically runs maintenance on all managed regions
+  final ScheduledExecutorService janitor;
+  private JanitorTask janitorTask;
+  
+  private static final Logger logger = LogService.getLogger();
+  protected final static String logPrefix = "<" + "RegionDirector" + "> ";
+  
+  
+  // schedules the janitor at a fixed delay read from the
+  // HoplogConfig.JANITOR_INTERVAL_SECS system property (default applies otherwise)
+  private HDFSRegionDirector() {
+    regionManagerMap = new ConcurrentHashMap<String, HDFSRegionDirector.HdfsRegionManager>();
+    janitor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread thread = new Thread(r, "HDFSRegionJanitor");
+        thread.setDaemon(true);
+        return thread;
+      }
+    });
+    
+    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+    
+    janitorTask = new JanitorTask();
+    janitor.scheduleWithFixedDelay(janitorTask, interval, interval,
+        TimeUnit.SECONDS);
+  }
+  
+  // lazily created singleton; synchronized to guard the unsynchronized instance field
+  public synchronized static HDFSRegionDirector getInstance() {
+    if (instance == null) {
+      instance = new HDFSRegionDirector();
+    }
+    return instance;
+  }
+  
+  // returns this for call chaining
+  public HDFSRegionDirector setCache(GemFireCache cache) {
+    this.cache = cache;
+    return this;
+  }
+
+  public GemFireCache getCache() {
+    return this.cache;
+  }
+  /**
+   * Caches listener, store object and list of organizers associated with the region associated with
+   * a region. Subsequently, these objects will be used each time an organizer is created
+   */
+  public synchronized HdfsRegionManager manageRegion(LocalRegion region, String storeName,
+      HoplogListener listener) {
+    
+    HdfsRegionManager manager = regionManagerMap.get(region.getFullPath());
+    if (manager != null) {
+      // this is an attempt to re-register a region. Assuming this was required
+      // to modify listener or hdfs store impl associated with the region. Hence
+      // will clear the region first.
+
+      clear(region.getFullPath());
+    }
+    
+    HDFSStoreImpl store = HDFSStoreDirector.getInstance().getHDFSStore(storeName);
+    manager = new HdfsRegionManager(region, store, listener, getStatsFactory(), this);
+    regionManagerMap.put(region.getFullPath(), manager);
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Now managing region " + region.getFullPath(), logPrefix);
+    }
+    
+    return manager;
+  }
+  
+  /**
+   * Find the regions that are part of a particular HDFS store.
+   */
+  public Collection<String> getRegionsInStore(HDFSStore store) {
+    TreeSet<String> regions = new TreeSet<String>();
+    for(Map.Entry<String, HdfsRegionManager> entry : regionManagerMap.entrySet()) {
+      if(entry.getValue().getStore().equals(store)) {
+        regions.add(entry.getKey());
+      }
+    }
+    return regions;
+  }
+  
+  // number of buckets of the region that currently have an organizer on this member
+  public int getBucketCount(String regionPath) {
+    HdfsRegionManager manager = regionManagerMap.get(regionPath);
+    if (manager == null) {
+      throw new IllegalStateException("Region not initialized");
+    }
+
+    return manager.bucketOrganizerMap.size();
+  }
+  
+  // NOTE(review): throws NullPointerException if regionPath is not managed;
+  // confirm callers guarantee prior registration via manageRegion()
+  public void closeWritersForRegion(String regionPath, int minSizeForFileRollover) throws IOException {
+    regionManagerMap.get(regionPath).closeWriters(minSizeForFileRollover);
+  }
+  /**
+   * removes and closes all {@link HoplogOrganizer} of this region. This call is expected with
+   * a PR disowns a region.
+   */
+  public synchronized void clear(String regionPath) {
+    HdfsRegionManager manager = regionManagerMap.remove(regionPath);
+    if (manager != null) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Closing hoplog region manager for " + regionPath, logPrefix);
+      }
+      manager.close();
+    }
+  }
+
+  /**
+   * Closes all region managers, organizers and hoplogs. This method should be
+   * called before closing the cache to gracefully release all resources
+   */
+  public static synchronized void reset() {
+    if (instance == null) {
+      // nothing to reset
+      return;
+    }
+    
+    instance.janitor.shutdownNow();
+    
+    for (String region : instance.regionManagerMap.keySet()) {
+      instance.clear(region);
+    }
+    instance.cache = null;
+    instance = null;
+  }
+  
+  /**
+   * Terminates current janitor task and schedules a new. The rate of the new
+   * task is based on the value of system property at that time
+   */
+  public static synchronized void resetJanitor() {
+    instance.janitorTask.terminate();
+    instance.janitorTask = instance.new JanitorTask();
+    long interval = Long.getLong(HoplogConfig.JANITOR_INTERVAL_SECS,
+        HoplogConfig.JANITOR_INTERVAL_SECS_DEFAULT);
+    instance.janitor.scheduleWithFixedDelay(instance.janitorTask, 0, interval,
+        TimeUnit.SECONDS);
+  }
+  
+  /**
+   * @param regionPath name of region for which stats object is desired
+   * @return {@link SortedOplogStatistics} instance associated with hdfs region
+   *         name. Null if region is not managed by director
+   */
+  public synchronized SortedOplogStatistics getHdfsRegionStats(String regionPath) {
+    HdfsRegionManager manager = regionManagerMap.get(regionPath);
+    return manager == null ? null : manager.getHdfsStats();
+  }
+  
+  // the distributed system doubles as the statistics factory
+  private StatisticsFactory getStatsFactory() {
+    return cache.getDistributedSystem();
+  }
+
+  /**
+   * A helper class to manage region and its organizers
+   */
+  public static class HdfsRegionManager {
+    // name and store configuration of the region whose buckets are managed by this director.
+    private LocalRegion region;
+    private HDFSStoreImpl store;
+    private HoplogListener listener;
+    private volatile boolean closed = false;
+    // seconds between hoplog close/rollover timer runs, overridable via the
+    // gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS system property
+    private final int FILE_ROLLOVER_TASK_INTERVAL = Integer.parseInt
+        (System.getProperty("gemfire.HDFSRegionDirector.FILE_ROLLOVER_TASK_INTERVAL_SECONDS", "60"));
+    
+    private SystemTimer hoplogCloseTimer = null;
+    
+    // instance of hdfs statistics object for this hdfs based region. This
+    // object will collect usage and performance related statistics.
+    private final SortedOplogStatistics hdfsStats;
+
+    /*
+     * An instance of organizer is created for each bucket of regionName region residing on this
+     * node. This member maps bucket id with its corresponding organizer instance. A lock is used to
+     * manage concurrent writes to the map.
+     */
+    private ConcurrentMap<Integer, HoplogOrganizer> bucketOrganizerMap;
+    
+    private HDFSRegionDirector hdfsRegionDirector;
+
+    /**
+     * @param listener
+     *          listener of change events like file creation and deletion
+     * @param hdfsRegionDirector 
+     */
+    HdfsRegionManager(LocalRegion region, HDFSStoreImpl store,
+        HoplogListener listener, StatisticsFactory statsFactory, HDFSRegionDirector hdfsRegionDirector) {
+      bucketOrganizerMap = new ConcurrentHashMap<Integer, HoplogOrganizer>();
+      this.region = region;
+      this.listener = listener;
+      this.store = store;
+      this.hdfsStats = new SortedOplogStatistics(statsFactory, "HDFSRegionStatistics", region.getFullPath());
+      this.hdfsRegionDirector = hdfsRegionDirector;
+    }
+
+    // closes the writer of every bucket organizer; stops early if the region
+    // becomes unusable (checkReadiness throws)
+    public void closeWriters(int minSizeForFileRollover) throws IOException {
+      final long startTime = System.currentTimeMillis();
+      long elapsedTime = 0;
+        
+      Collection<HoplogOrganizer> organizers = bucketOrganizerMap.values();
+      
+      for (HoplogOrganizer organizer : organizers) {
+      
+        try {
+          this.getRegion().checkReadiness();
+        } catch (Exception e) {
+          break;
+        }
+        
+        ((HDFSUnsortedHoplogOrganizer)organizer).synchronizedCloseWriter(true, 0, 
+            minSizeForFileRollover);
+      }
+      
+    }
+
+    // creates and registers an organizer for the given bucket; write-only
+    // regions get an unsorted organizer, others a sorted-oplog organizer
+    public synchronized <T extends PersistedEventImpl> HoplogOrganizer<T> create(int bucketId) throws IOException {
+      assert !bucketOrganizerMap.containsKey(bucketId);
+
+      HoplogOrganizer<?> organizer = region.getHDFSWriteOnly() 
+          ? new HDFSUnsortedHoplogOrganizer(this, bucketId) 
+          : new HdfsSortedOplogOrganizer(this, bucketId);
+
+      bucketOrganizerMap.put(bucketId, organizer);
+      // initialize a timer that periodically closes the hoplog writer if the 
+      // time for rollover has passed. It also has the responsibility to fix the files.  
+      if (this.region.getHDFSWriteOnly() && 
+          hoplogCloseTimer == null) {
+        hoplogCloseTimer = new SystemTimer(hdfsRegionDirector.
+            getCache().getDistributedSystem(), true);
+        
+        // schedule the task to fix the files that were not closed properly 
+        // last time. 
+        hoplogCloseTimer.scheduleAtFixedRate(new CloseTmpHoplogsTimerTask(this), 
+            1000, FILE_ROLLOVER_TASK_INTERVAL * 1000);
+        
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}Schedulng hoplog rollover timer with interval "+ FILE_ROLLOVER_TASK_INTERVAL + 
+              " for hoplog organizer for " + region.getFullPath()
+              + ":" + bucketId + " " + organizer, logPrefix);
+        }
+      }
+      
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Constructed hoplog organizer for " + region.getFullPath()
+            + ":" + bucketId + " " + organizer, logPrefix);
+      }
+      return (HoplogOrganizer<T>) organizer;
+    }
+    
+    // registers an externally constructed organizer; rejects duplicate bucket ids
+    public synchronized <T extends PersistedEventImpl> void addOrganizer(
+        int bucketId, HoplogOrganizer<T> organizer) {
+      if (bucketOrganizerMap.containsKey(bucketId)) {
+        throw new IllegalArgumentException();
+      }
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}added pre constructed organizer " + region.getFullPath()
+            + ":" + bucketId + " " + organizer, logPrefix);
+      }
+      bucketOrganizerMap.put(bucketId, organizer);
+    }
+
+    // cancels the rollover timer (write-only regions) and closes every organizer
+    public void close() {
+      closed = true;
+      
+      if (this.region.getHDFSWriteOnly() && 
+          hoplogCloseTimer != null) {
+        hoplogCloseTimer.cancel();
+        hoplogCloseTimer = null;
+      }
+      // close(bucket) removes entries while iterating; ConcurrentHashMap's
+      // key set tolerates concurrent removal
+      for (int bucket : bucketOrganizerMap.keySet()) {
+        close(bucket);
+      }
+    }
+    
+    public boolean isClosed() {
+      return closed;
+    }
+
+    // removes and closes one bucket's organizer; close failures are logged and ignored
+    public synchronized void close(int bucketId) {
+      try {
+        HoplogOrganizer organizer = bucketOrganizerMap.remove(bucketId);
+        if (organizer != null) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Closing hoplog organizer for " + region.getFullPath() + ":" + 
+                bucketId + " " + organizer, logPrefix);
+          }
+          organizer.close();
+        }
+      } catch (IOException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(logPrefix + "Error closing hoplog organizer for " + region.getFullPath() + ":" + bucketId, e);
+        }
+      }
+      //TODO abort compaction and flush requests for this region
+    }
+    
+    // maps a region path to its HDFS folder name; escaping "_" as "__" before
+    // turning "/" into "_" keeps the mapping unambiguous
+    public static String getRegionFolder(String regionPath) {
+      String folder = regionPath;
+      //Change any underscore into a double underscore
+      folder = folder.replace("_", "__");
+      //get rid of the leading slash
+      folder = folder.replaceFirst("^/", "");
+      //replace slashes with underscores
+      folder = folder.replace('/', '_');
+      return folder;
+    }
+
+    public String getRegionFolder() {
+      return getRegionFolder(region.getFullPath());
+    }
+
+    public HoplogListener getListener() {
+      return listener;
+    }
+
+    public HDFSStoreImpl getStore() {
+      return store;
+    }
+
+    public LocalRegion getRegion() {
+      return region;
+    }
+    
+    public SortedOplogStatistics getHdfsStats() {
+      return hdfsStats;
+    }
+    
+    public Collection<HoplogOrganizer> getBucketOrganizers(){
+      return this.bucketOrganizerMap.values();
+    }
+
+    /**
+     * get the HoplogOrganizers only for the given set of buckets
+     */
+    public Collection<HoplogOrganizer> getBucketOrganizers(Set<Integer> buckets){
+      Set<HoplogOrganizer> result = new HashSet<HoplogOrganizer>();
+      // NOTE(review): a bucket with no organizer contributes null to the
+      // result set; confirm callers tolerate null elements
+      for (Integer bucketId : buckets) {
+        result.add(this.bucketOrganizerMap.get(bucketId));
+      }
+      return result;
+    }
+
+    /**
+     * Delete all files from HDFS for this region. This method
+     * should be called after all members have destroyed their
+     * region in gemfire, so there should be no threads accessing
+     * these files.
+     * @throws IOException 
+     */
+    public void destroyData() throws IOException {
+      //Make sure everything is shut down and closed.
+      close();
+      if (store == null) {
+        return;
+      }
+      Path regionPath = new Path(store.getHomeDir(), getRegionFolder());
+      
+      //Delete all files in HDFS.
+      FileSystem fs = getStore().getFileSystem();
+      if(!fs.delete(regionPath, true)) {
+        if(fs.exists(regionPath)) {
+          throw new IOException("Unable to delete " + regionPath);
+        }
+      }
+    }
+
+    // runs maintenance on every bucket organizer of this region
+    public void performMaintenance() throws IOException {
+      Collection<HoplogOrganizer> buckets = getBucketOrganizers();
+      for (HoplogOrganizer bucket : buckets) {
+        bucket.performMaintenance();
+      }
+    }
+  }
+  
+  // periodic task that invokes performMaintenance on every managed region
+  private class JanitorTask implements Runnable {
+    boolean terminated = false;
+    @Override
+    public void run() {
+      if (terminated) {
+        return;
+      }
+      fineLog("Executing HDFS Region janitor task", null);
+      
+      Collection<HdfsRegionManager> regions = regionManagerMap.values();
+      for (HdfsRegionManager region : regions) {
+        fineLog("Maintaining region:" + region.getRegionFolder(), null);
+        try {
+          region.performMaintenance();
+        } catch (Throwable e) {
+          // a failure in one region must not stop maintenance of the others
+          logger.info(LocalizedMessage.create(LocalizedStrings.HOPLOG_IO_ERROR , region.getRegionFolder()));
+          logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, e.getMessage()));
+          fineLog(null, e);
+        }
+      }
+    }
+
+    // makes subsequent run() invocations no-ops; used when rescheduling the janitor
+    public void terminate() {
+      terminated = true;
+    }
+  }
+  
+  // logs message/throwable at debug level only when debug is enabled
+  protected static void fineLog(String message, Throwable e) {
+    if(logger.isDebugEnabled()) {
+      logger.debug(message, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
new file mode 100644
index 0000000..77e7165
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStoreDirector.java
@@ -0,0 +1,70 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+
+/**
+ * Singleton registry managing all instances of {@link HDFSStoreImpl}.
+ *
+ * @author Hemant Bhanawat
+ */
+public final class HDFSStoreDirector {
+  // Store name -> store. Concurrent because stores are registered and removed
+  // by cache operations running on multiple threads.
+  private final ConcurrentHashMap<String, HDFSStoreImpl> storeMap = new ConcurrentHashMap<String, HDFSStoreImpl>();
+
+  // Singleton instance; volatile so the double-checked locking in
+  // getInstance() is safe under the Java memory model.
+  private static volatile HDFSStoreDirector instance;
+
+  private HDFSStoreDirector() {
+  }
+
+  /** Returns the lazily-created singleton instance. */
+  public static HDFSStoreDirector getInstance() {
+    if (instance == null) {
+      synchronized (HDFSStoreDirector.class) {
+        if (instance == null) {
+          instance = new HDFSStoreDirector();
+        }
+      }
+    }
+    return instance;
+  }
+
+  /** Registers a store under its name. Called when the region is created. */
+  public void addHDFSStore(HDFSStoreImpl hdfsStore) {
+    this.storeMap.put(hdfsStore.getName(), hdfsStore);
+  }
+
+  /** Returns the store registered under the given name, or null if absent. */
+  public HDFSStoreImpl getHDFSStore(String hdfsStoreName) {
+    return this.storeMap.get(hdfsStoreName);
+  }
+
+  /** Removes the store registered under the given name, if any. */
+  public void removeHDFSStore(String hdfsStoreName) {
+    this.storeMap.remove(hdfsStoreName);
+  }
+
+  /** Closes every registered store and empties the registry. */
+  public void closeHDFSStores() {
+    for (HDFSStoreImpl store : this.storeMap.values()) {
+      store.close();
+    }
+    this.storeMap.clear();
+  }
+
+  /** Returns a snapshot list of all currently registered stores. */
+  public ArrayList<HDFSStoreImpl> getAllHDFSStores() {
+    return new ArrayList<HDFSStoreImpl>(this.storeMap.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
new file mode 100644
index 0000000..c9be401
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizer.java
@@ -0,0 +1,439 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * Manages unsorted Hoplog files for a bucket (Streaming Ingest option). An instance per bucket 
+ * will exist in each PR. Appends events to a rolling sequence-file hoplog and
+ * rolls the file over by size or age.
+ * 
+ * @author hemantb
+ *
+ */
+public class HDFSUnsortedHoplogOrganizer extends AbstractHoplogOrganizer<UnsortedHoplogPersistedEvent> {
+  // Matches hoplog names carrying either the sequence-file or the temporary extension.
+  public static final String HOPLOG_REGEX = HOPLOG_NAME_REGEX + "("
+      + SEQ_HOPLOG_EXTENSION + "|" + TEMP_HOPLOG_EXTENSION + ")";
+  public static final Pattern HOPLOG_PATTERN = Pattern.compile(HOPLOG_REGEX);
+  // Matches only temporary (not yet renamed) sequence-file hoplogs.
+  protected static String TMP_FILE_NAME_REGEX = HOPLOG_NAME_REGEX + SEQ_HOPLOG_EXTENSION + TEMP_HOPLOG_EXTENSION + "$";
+  protected static final Pattern patternForTmpHoplog = Pattern.compile(TMP_FILE_NAME_REGEX);
+  
+   // Writer of the hoplog currently being appended to; null when no file is open.
+   volatile private HoplogWriter writer;
+   // Hoplog instance backing the current writer.
+   volatile private Hoplog currentHoplog;
+   
+   // Updated whenever the writer is closed; drives the rollover-interval check.
+   volatile private long lastFlushTime = System.currentTimeMillis();
+   
+   // Set by close()/clear() so that an in-progress flush bails out promptly.
+   volatile private boolean abortFlush = false;
+   private FileSystem fileSystem;
+   
+   public HDFSUnsortedHoplogOrganizer(HdfsRegionManager region, int bucketId) throws IOException{
+    super(region, bucketId);
+    writer = null;
+    sequence = new AtomicInteger(0);
+
+    fileSystem = store.getFileSystem();
+    if (! fileSystem.exists(bucketPath)) {
+      return;
+    }
+    
+    // Account for disk already used by pre-existing hoplogs of this bucket.
+    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All valid hoplog files must match the regex
+        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+
+    if (validHoplogs != null && validHoplogs.length > 0) {
+      for (FileStatus file : validHoplogs) {
+        // account for the disk used by this file
+        incrementDiskUsage(file.getLen());
+      }
+    }
+
+  }
+  
+    @Override
+    public void close() throws IOException {
+      super.close();
+      if (logger.isDebugEnabled())
+        logger.debug("{}Closing the hoplog organizer and the open files", logPrefix);
+      // abort the flush so that we can immediately call the close current writer. 
+      abortFlush = true;
+      synchronizedCloseWriter(true, 0, 0);
+    }
+    
+    
+    /**
+     * Flushes the data to HDFS. 
+     * Synchronization ensures that the writer is not closed when flush is happening.
+     * To abort the flush, abortFlush needs to be set.  
+     * @param bufferIter events to append to the current hoplog; must not be null
+     * @param count sizing hint used when a new hoplog writer has to be created
+     * @throws IOException if append/sync fails; the current file is closed before rethrowing
+     * @throws ForceReattemptException 
+     */
+     @Override
+    public synchronized void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, final int count)
+        throws IOException, ForceReattemptException {
+      assert bufferIter != null;
+      
+      if (abortFlush)
+        throw new CacheClosedException("Either the region has been cleared " +
+            "or closed. Aborting the ongoing flush operation.");
+      if (logger.isDebugEnabled())
+        logger.debug("{}Initializing flush operation", logPrefix);
+      
+      // variables for updating stats
+      long start = stats.getFlush().begin();
+      int byteCount = 0;
+      if (writer == null) {
+        // Hoplogs of sequence files are always created with a 0 sequence number
+        currentHoplog = getTmpSortedOplog(0, SEQ_HOPLOG_EXTENSION);
+        try {
+          writer = this.store.getSingletonWriter().runSerially(new Callable<Hoplog.HoplogWriter>() {
+            @Override
+            public HoplogWriter call() throws Exception {
+              return currentHoplog.createWriter(count);
+            }
+          });
+        } catch (Exception e) {
+          if (e instanceof IOException) {
+            throw (IOException)e;
+          }
+          throw new IOException(e);
+        }
+      }
+      // seconds since the writer was last rolled; compared to rollover interval below
+      long timeSinceLastFlush = (System.currentTimeMillis() - lastFlushTime)/1000 ;
+      
+      try {
+        /**MergeGemXDHDFSToGFE changed the following statement as the code of HeapDataOutputStream is not merged */
+        //HeapDataOutputStream out = new HeapDataOutputStream();
+        while (bufferIter.hasNext()) {
+          HeapDataOutputStream out = new HeapDataOutputStream(1024, null);
+          if (abortFlush) {
+            stats.getFlush().end(byteCount, start);
+            throw new CacheClosedException("Either the region has been cleared " +
+            		"or closed. Aborting the ongoing flush operation.");
+          }
+          QueuedPersistentEvent item = bufferIter.next();
+          item.toHoplogEventBytes(out);
+          byte[] valueBytes = out.toByteArray();
+          writer.append(item.getRawKey(), valueBytes);
+          // add key length and value length to stats byte counter
+          byteCount += (item.getRawKey().length + valueBytes.length);
+          /**MergeGemXDHDFSToGFE how to clear for reuse. Leaving it for Darrel to merge this change*/
+          //out.clearForReuse();
+        }
+        // ping secondaries before making the file a legitimate file to ensure 
+        // that in case of split brain, no other vm has taken up as primary. #50110. 
+        if (!abortFlush)
+          pingSecondaries();
+        // append completed. If the file is to be rolled over, 
+        // close writer and rename the file to a legitimate name.
+        // Else, sync the already written data with HDFS nodes. 
+        int maxFileSize = this.store.getMaxFileSize() * 1024 * 1024;  
+        int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+        if (writer.getCurrentSize() >= maxFileSize || 
+            timeSinceLastFlush >= fileRolloverInterval) {
+          closeCurrentWriter();
+        }
+        else {
+          // if flush is not aborted, hsync the batch. It ensures that 
+          // the batch has reached HDFS and we can discard it. 
+          if (!abortFlush)
+            writer.hsync();
+        }
+      } catch (IOException e) {
+        stats.getFlush().error(start);
+        // as there is an exception, it can be probably be a file specific problem.
+        // close the current file to avoid any file specific issues next time  
+        closeCurrentWriter();
+        // throw the exception so that async queue will dispatch the same batch again 
+        throw e;
+      } 
+      
+      stats.getFlush().end(byteCount, start);
+    }
+    
+    /**
+     * Synchronization ensures that the writer is not closed when flush is happening. 
+     * @param forceClose when true, close regardless of the size/age thresholds
+     * @param timeSinceLastFlush seconds since the last flush, compared to the rollover interval
+     * @param minsizeforrollover minimum writer size in KB below which the close is skipped
+     */
+    synchronized void synchronizedCloseWriter(boolean forceClose, 
+        long timeSinceLastFlush, int minsizeforrollover) throws IOException { 
+      long writerSize = 0;
+      if (writer != null){
+        writerSize = writer.getCurrentSize();
+      }
+      
+      if (writerSize < (minsizeforrollover * 1024L))
+        return;
+      
+      int maxFileSize = this.store.getMaxFileSize() * 1024 * 1024;  
+      int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+      if (writerSize >= maxFileSize || 
+          timeSinceLastFlush >= fileRolloverInterval || forceClose) {
+        closeCurrentWriter();
+      }
+      }
+        
+    
+    /**
+     * Closes the current writer so that next time a new hoplog can 
+     * be created. Also, fixes any tmp hoplogs. 
+     * 
+     * @throws IOException
+     */
+    void closeCurrentWriter() throws IOException {
+      
+      if (writer != null) {
+        // If this organizer is closing, it is ok to ignore exceptions here
+        // because CloseTmpHoplogsTimerTask
+        // on another member may have already renamed the hoplog
+        // fixes bug 49141
+        boolean isClosing = abortFlush;
+        try {
+          incrementDiskUsage(writer.getCurrentSize());
+        } catch (IOException e) {
+          if (!isClosing) {
+            throw e;
+          }
+        }
+        if (logger.isDebugEnabled())
+          logger.debug("{}Closing hoplog " + currentHoplog.getFileName(), logPrefix);
+        try{
+          writer.close();
+          makeLegitimate(currentHoplog);
+        } catch (IOException e) {
+          if (!isClosing) {
+            logger.warn(LocalizedStrings.HOPLOG_FLUSH_OPERATION_FAILED, e);
+            throw e;
+          }
+        } finally {
+          // Always reset so the next flush opens a fresh hoplog.
+          writer = null;
+          lastFlushTime = System.currentTimeMillis();
+        }
+      }
+      else
+        lastFlushTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public void clear() throws IOException {
+      boolean prevAbortFlushFlag = abortFlush;
+      // abort the flush so that we can immediately call the close current writer. 
+      abortFlush = true;
+      
+      // Close if there is any existing writer. 
+      try {
+        synchronizedCloseWriter(true, 0, 0);
+      } catch (IOException e) {
+        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+      }
+      
+      // reenable the aborted flush
+      abortFlush = prevAbortFlushFlag;
+      
+      // Mark the hoplogs for deletion
+      markHoplogsForDeletion();
+      
+    }
+  
+    @Override
+    public void performMaintenance() {
+      // TODO remove the timer for tmp file conversion. Use this instead
+    }
+
+    @Override
+    public Future<CompactionStatus> forceCompaction(boolean isMajor) {
+      // Compaction is not supported for unsorted hoplogs (see getCompactor),
+      // so there is nothing to schedule.
+      return null;
+    }
+
+    @Override
+    protected Hoplog getHoplog(Path hoplogPath) throws IOException {
+      Hoplog so = new SequenceFileHoplog(fileSystem, hoplogPath, stats);
+      return so;
+    }
+  
+  /**
+   * Fixes the size of hoplogs that were not closed properly last time. 
+   * Such hoplogs are *.tmphop files. Identify them and open them and close 
+   * them, this fixes the size. After doing this rename them to *.hop. 
+   * 
+   * @throws IOException
+   * @throws ForceReattemptException 
+   */
+  void identifyAndFixTmpHoplogs(FileSystem fs) throws IOException, ForceReattemptException {
+    if (logger.isDebugEnabled())
+      logger.debug("{}Fixing temporary hoplogs", logPrefix);
+    
+    // A different filesystem is passed to this function for the following reason: 
+    // For HDFS, if a file wasn't closed properly last time, 
+    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+    // FSNamesystem.recoverLeaseInternal function gets called. 
+    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, 
+    // created using the same FileSystem object. This is a bug and is being tracked at: 
+    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+    // 
+    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, 
+    // we create a new file system for the timer task so that it does not encounter the bug. 
+    
+    FileStatus tmpHoplogs[] = FSUtils.listStatus(fs, fs.makeQualified(bucketPath), new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All valid hoplog files must match the regex
+        Matcher matcher = patternForTmpHoplog.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+    
+    if (tmpHoplogs == null || tmpHoplogs.length == 0) {
+      if (logger.isDebugEnabled())
+        logger.debug("{}No files to fix", logPrefix);
+      return;
+    }
+    // ping secondaries so that in case of split brain, no other vm has taken up 
+    // as primary. #50110. 
+    pingSecondaries();
+    if (logger.isDebugEnabled())
+      logger.debug("{}Files to fix " + tmpHoplogs.length, logPrefix);
+
+    String currentHoplogName = null;
+    // get the current hoplog name. We need to ignore current hoplog while fixing. 
+    if (currentHoplog != null) {
+      currentHoplogName = currentHoplog.getFileName();
+    }
+    
+    for (int i = 0; i < tmpHoplogs.length; i++) {
+      // Skip directories
+      if (tmpHoplogs[i].isDirectory()) {
+        continue;
+      }
+
+      final Path p = tmpHoplogs[i].getPath();
+      
+      if (tmpHoplogs[i].getPath().getName().equals(currentHoplogName)){
+        if (logger.isDebugEnabled())
+          logger.debug("Skipping current file: " + tmpHoplogs[i].getPath().getName(), logPrefix);
+        continue;
+      } 
+      
+      SequenceFileHoplog hoplog = new SequenceFileHoplog(fs, p, stats);
+      try {
+        makeLegitimate(hoplog);
+        logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " was a temporary " +
+            "hoplog because the node managing it wasn't shutdown properly last time. Fixed the hoplog name."));
+      } catch (IOException e) {
+        logger.info (LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + p + " is still a temporary " +
+            "hoplog because the node managing it wasn't shutdown properly last time. Failed to " +
+            "change the hoplog name because an exception was thrown while fixing it. " + e));
+      }
+    }
+  }
+  
+  // Lists the hoplogs of this bucket that carry the expired-marker extension.
+  private FileStatus[] getExpiredHoplogs() throws IOException {
+    FileStatus files[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All expired hoplog end with expire extension and must match the valid file regex
+        String fileName = file.getName();
+        if (! fileName.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+          return false;
+        }
+        return true;
+      }
+    });
+    return files;
+  }
+  /**
+   * locks sorted oplogs collection, removes oplog and renames for deletion later
+   * @throws IOException 
+   */
+  private void markHoplogsForDeletion() throws IOException {
+    
+    ArrayList<IOException> errors = new ArrayList<IOException>();
+    FileStatus validHoplogs[] = FSUtils.listStatus(fileSystem, bucketPath, new PathFilter() {
+      @Override
+      public boolean accept(Path file) {
+        // All valid hoplog files must match the regex
+        Matcher matcher = HOPLOG_PATTERN.matcher(file.getName());
+        return matcher.matches();
+      }
+    });
+    
+    // Skip hoplogs that are already marked as expired.
+    FileStatus[] expired = getExpiredHoplogs();
+    validHoplogs = filterValidHoplogs(validHoplogs, expired);
+
+    if (validHoplogs == null || validHoplogs.length == 0) {
+      return;
+    }
+    for (FileStatus fileStatus : validHoplogs) {
+      try {
+        addExpiryMarkerForAFile(getHoplog(fileStatus.getPath()));
+      } catch (IOException e) {
+        // even if there is an IO error continue removing other hoplogs and
+        // notify at the end
+        errors.add(e);
+      }
+    }
+    
+    if (!errors.isEmpty()) {
+      for (IOException e : errors) {
+        logger.warn(LocalizedStrings.HOPLOG_HOPLOG_REMOVE_FAILED, e);
+      }
+    }
+  }
+  
+  @Override
+  public Compactor getCompactor() {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+  }
+  
+    @Override
+  public HoplogIterator<byte[], UnsortedHoplogPersistedEvent> scan(
+      long startOffset, long length) throws IOException {
+    throw new UnsupportedOperationException("Not supported for " + this.getClass().getSimpleName());
+    }
+
+  // Time (ms since epoch) the writer was last closed, or when this organizer was created.
+  public long getLastFlushTime() {
+    return this.lastFlushTime;
+      }
+  
+  // Returns the store's configured file rollover interval (seconds, per the
+  // millis/1000 comparison in flush()).
+  public long getfileRolloverInterval(){
+    int fileRolloverInterval = this.store.getFileRolloverInterval(); 
+    return fileRolloverInterval;
+    }
+
+  @Override
+  public long getLastMajorCompactionTimestamp() {
+    throw new UnsupportedOperationException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
new file mode 100644
index 0000000..89546bf
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HFileSortedOplog.java
@@ -0,0 +1,844 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.ShutdownHookManager;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.cardinality.HyperLogLog;
+import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics.ScanOperation;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.util.Hex;
+import com.gemstone.gemfire.internal.util.SingletonValue;
+import com.gemstone.gemfire.internal.util.SingletonValue.SingletonBuilder;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
+import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex.BlockIndexReader;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.BloomFilterWriter;
+
+/**
+ * Implements hfile based {@link Hoplog}
+ */
+public final class HFileSortedOplog extends AbstractHoplog {
+
+//  private static final boolean CACHE_DATA_BLOCKS_ON_READ = !Boolean.getBoolean("gemfire.HFileSortedOplog.DISABLE_CACHE_ON_READ");
+  private final CacheConfig cacheConf;
+  private ICardinality entryCountEstimate;
+  
+  // a cached reader for the file
+  private final SingletonValue<HFileReader> reader;
+
+  public HFileSortedOplog(HDFSStoreImpl store, Path hfilePath,
+      BlockCache blockCache, SortedOplogStatistics stats,
+      HFileStoreStatistics storeStats) throws IOException {
+    super(store, hfilePath, stats);
+    cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
+    reader = getReaderContainer();
+  }
+
+  /**
+   * THIS METHOD SHOULD BE USED FOR LONER ONLY
+   */
+  public static HFileSortedOplog getHoplogForLoner(FileSystem inputFS,
+      Path hfilePath) throws IOException {
+    return new HFileSortedOplog(inputFS, hfilePath, null, null, null);
+  }
+
+  private HFileSortedOplog(FileSystem inputFS, Path hfilePath,
+      BlockCache blockCache, SortedOplogStatistics stats,
+      HFileStoreStatistics storeStats) throws IOException {
+    super(inputFS, hfilePath, stats);
+    cacheConf = getCacheConfInstance(blockCache, stats, storeStats);
+    reader = getReaderContainer();
+  }
+
+  protected CacheConfig getCacheConfInstance(BlockCache blockCache,
+      SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
+    CacheConfig tmpConfig = null;
+//    if (stats == null) {
+      tmpConfig = new CacheConfig(conf);
+//    } else {
+//      tmpConfig = new CacheConfig(conf, CACHE_DATA_BLOCKS_ON_READ, blockCache,
+//          HFileSortedOplogFactory.convertStatistics(stats, storeStats));
+//    }
+    tmpConfig.shouldCacheBlockOnRead(BlockCategory.ALL_CATEGORIES);
+    return tmpConfig;
+  }  
+
+  private SingletonValue<HFileReader> getReaderContainer() {
+    return new SingletonValue<HFileReader>(new SingletonBuilder<HFileReader>() {
+      @Override
+      public HFileReader create() throws IOException {
+        if (logger.isDebugEnabled())
+          logger.debug("{}Creating hoplog reader", logPrefix);
+        return new HFileReader();
+      }
+
+      @Override
+      public void postCreate() {
+        if (readerListener != null) {
+          readerListener.readerCreated();
+        }
+      }
+      
+      @Override
+      public void createInProgress() {
+      }
+    });
+  }
+  
+  @Override
+  public HoplogReader getReader() throws IOException {
+    return reader.get();
+  }
+  
+  @Override
+  public ICardinality getEntryCountEstimate() throws IOException {
+    ICardinality result = entryCountEstimate;
+    if (result == null) {
+      HoplogReader rdr = getReader(); // keep this out of the critical section
+      synchronized(this) {
+        result = entryCountEstimate;
+          if (result == null) {
+            entryCountEstimate = result = rdr.getCardinalityEstimator();
+          }
+        }
+    }
+    return result;
+  }
+  
+  @Override
+  public HoplogWriter createWriter(int keys) throws IOException {
+    return new HFileSortedOplogWriter(keys);
+  }
+
+  @Override
+  public boolean isClosed() {
+    HFileReader rdr = reader.getCachedValue();
+    return rdr == null || rdr.isClosed();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    close(true);
+  }
+
+  @Override
+  public void close(boolean clearCache) throws IOException {
+    compareAndClose(null, clearCache);
+  }
+  
+  private void compareAndClose(HFileReader hfileReader, boolean clearCache) throws IOException {
+    HFileReader rdr ;
+    if (hfileReader == null) {
+      rdr = reader.clear(true);
+    } else {
+      boolean result = reader.clear(hfileReader, true);
+      if (! result) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}skipping close, provided hfileReader mismatched", logPrefix);
+        return;
+      } 
+      rdr = hfileReader;
+    }
+    
+    if (rdr != null) {
+      try {
+        rdr.close(clearCache);
+      } finally {
+        if (readerListener != null) {
+          readerListener.readerClosed();
+        }
+      }
+    }
+  }
+  
+  @Override
+  public String toString() {
+    return "HFileSortedOplog[" + getFileName() + "]";
+  }
+
+  private class HFileSortedOplogWriter implements HoplogWriter {
+    private final Writer writer;
+    private final BloomFilterWriter bfw;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public HFileSortedOplogWriter(int keys) throws IOException {
+      try {
+        int hfileBlockSize = Integer.getInteger(
+            HoplogConfig.HFILE_BLOCK_SIZE_CONF, (1 << 16));
+
+        Algorithm compress = Algorithm.valueOf(System.getProperty(HoplogConfig.COMPRESSION,
+            HoplogConfig.COMPRESSION_DEFAULT));
+
+//        ByteComparator bc = new ByteComparator();
+        writer = HFile.getWriterFactory(conf, cacheConf)
+            .withPath(fsProvider.getFS(), path)
+            .withBlockSize(hfileBlockSize)
+//            .withComparator(bc)
+            .withCompression(compress)
+            .create();
+//        bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys,
+//            writer, bc);
+        bfw = BloomFilterFactory.createGeneralBloomAtWrite(conf, cacheConf, BloomType.ROW, keys,
+            writer);
+
+        if (logger.isDebugEnabled())
+          logger.debug("{}Created hoplog writer with compression " + compress, logPrefix);
+      } catch (IOException e) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}IO Error while creating writer", logPrefix);
+        throw e;
+      }
+    }
+
+    @Override
+    public void append(byte[] key, byte[] value) throws IOException {
+      writer.append(key, value);
+      bfw.add(key, 0, key.length);
+    }
+
+    @Override
+    public void append(ByteBuffer key, ByteBuffer value) throws IOException {
+      byte[] keyBytes = byteBufferToArray(key);
+      byte[] valueBytes = byteBufferToArray(value);
+      writer.append(keyBytes, valueBytes);
+      bfw.add(keyBytes, 0, keyBytes.length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      close(null);
+    }
+
+    @Override
+    public void close(EnumMap<Meta, byte[]> metadata) throws IOException {
+      if (closed.get()) {
+        if (logger.isDebugEnabled())
+          logger.debug("{}Writer already closed", logPrefix);
+        return;
+      }
+      
+      bfw.compactBloom();
+      writer.addGeneralBloomFilter(bfw);
+
+      // append system metadata
+      writer.appendFileInfo(Meta.GEMFIRE_MAGIC.toBytes(), Hoplog.MAGIC);
+      writer.appendFileInfo(Meta.SORTED_OPLOG_VERSION.toBytes(), HoplogVersion.V1.toBytes());
+      writer.appendFileInfo(Meta.GEMFIRE_VERSION.toBytes(), Version.CURRENT.toBytes());
+      
+      // append comparator info
+//      if (writer.getComparator() instanceof DelegatingSerializedComparator) {
+//        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+//        DataOutput out = new DataOutputStream(bos);
+//        
+//        writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators());
+//        writer.appendFileInfo(Meta.COMPARATORS.toBytes(), bos.toByteArray());
+//      }
+      
+      // append user metadata
+      HyperLogLog cachedEntryCountEstimate = null;
+      if (metadata != null) {
+        for (Entry<Meta, byte[]> entry : metadata.entrySet()) {
+          writer.appendFileInfo(entry.getKey().toBytes(), entry.getValue());
+          if (Meta.LOCAL_CARDINALITY_ESTIMATE_V2.equals(entry.getKey())) {
+             cachedEntryCountEstimate = HyperLogLog.Builder.build(entry.getValue()); 
+          }
+        }
+      }
+      
+      writer.close();
+      if (logger.isDebugEnabled())
+        logger.debug("{}Completed closing writer", logPrefix);
+      closed.set(true);
+      // cache estimate value to avoid reads later
+      entryCountEstimate = cachedEntryCountEstimate;
+    }
+
+    @Override
+    public void hsync() throws IOException {
+      throw new UnsupportedOperationException("hsync is not supported for HFiles"); 
+    }
+
+    @Override
+    public long getCurrentSize() throws IOException {
+      throw new UnsupportedOperationException("getCurrentSize is not supported for HFiles"); 
+    }
+    
+//    private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException {
+//      out.writeInt(comparators.length);
+//      for (SerializedComparator sc : comparators) {
+//        out.writeUTF(sc.getClass().getName());
+//        if (sc instanceof DelegatingSerializedComparator) {
+//          writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators());
+//        }
+//      }
+//    }
+  }
+  
+  private void handleReadIOError(HFileReader hfileReader, IOException e, boolean skipFailIfSafe) {
+    if (logger.isDebugEnabled())
+      logger.debug("Read IO error", e);
+    boolean safeError = ShutdownHookManager.get().isShutdownInProgress();
+    if (safeError) {
+      // IOException because of closed file system. This happens when member is
+      // shutting down
+      if (logger.isDebugEnabled())
+        logger.debug("IO error caused by filesystem shutdown", e);
+      throw new CacheClosedException("IO error caused by filesystem shutdown", e);
+    } 
+    
+    // expose the error wrapped inside remote exception. Remote exceptions are
+    // handled by file system client. So let the caller handle this error
+    if (e instanceof RemoteException) {
+      e = ((RemoteException) e).unwrapRemoteException();
+      throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
+    } 
+    
+    FileSystem currentFs = fsProvider.checkFileSystem();
+    if (hfileReader != null && hfileReader.previousFS != currentFs) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}Detected new FS client, closing old reader", logPrefix);
+        if (currentFs != null) {
+          if (logger.isDebugEnabled())
+            logger.debug("CurrentFs:" + currentFs.getUri() + "-"
+                + currentFs.hashCode(), logPrefix);
+        }
+        if (hfileReader.previousFS != null) {
+          if (logger.isDebugEnabled())
+            logger.debug("OldFs:" + hfileReader.previousFS.getUri() + "-"
+                + hfileReader.previousFS.hashCode() + ", closing old reader", logPrefix);
+        }
+      }
+      try {
+        HFileSortedOplog.this.compareAndClose(hfileReader, false);
+      } catch (Exception ex) {
+        if (logger.isDebugEnabled())
+          logger.debug("Failed to close reader", ex);
+      }
+      if (skipFailIfSafe) {
+        if (logger.isDebugEnabled())
+          logger.debug("Not faling after io error since FS client changed");
+        return;
+      }
+    }
+
+    // it is not a safe error. let the caller handle it
+    throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e);
+  }
+
  /**
   * Reader over a single HFile-format hoplog. Delegates key lookups and scans
   * to an HBase HFile {@code Reader}, validates the hoplog's magic bytes and
   * version on open, and remembers the {@link FileSystem} client it was
   * created with so read-error handling can detect a filesystem re-connect.
   * Non-static inner class: instances are tied to the enclosing
   * {@code HFileSortedOplog} (used for retries via {@code getReader()}).
   */
  class HFileReader implements HoplogReader, Closeable {
    // underlying HBase HFile reader; all lookups/scans delegate to it
    private final Reader reader;
    // lazily-built bloom filter wrapper; volatile enables the double-checked
    // locking in getBloomFilter()
    private volatile BloomFilter hoplogBloom;
    // flipped once by close(); guards against double-closing the HFile reader
    private final AtomicBoolean closed;
    // file-info metadata key/value pairs loaded from the HFile
    private final Map<byte[], byte[]> fileInfo;
    // cardinality estimator for this hoplog's entries
    private final HyperLogLog estimator;
    // FS client at creation time; compared against the current client after a
    // read error to detect that the filesystem connection was re-established
    private final FileSystem previousFS;
    
    /**
     * Opens the HFile, loads its file-info metadata, validates magic/version,
     * wires up serialized comparators when present, and initializes the entry
     * count estimator from persisted HyperLogLog bytes if available.
     *
     * @throws IOException if the file cannot be opened or fails validation
     */
    public HFileReader() throws IOException {
      try {
        FileSystem fs = fsProvider.getFS();
        reader = HFile.createReader(fs, path, cacheConf);
        fileInfo = reader.loadFileInfo();
        closed = new AtomicBoolean(false);

        // fail fast if this is not a recognizable V1 hoplog
        validate();
        if (reader.getComparator() instanceof DelegatingSerializedComparator) {
          loadComparators((DelegatingSerializedComparator) reader.getComparator());
        }

        // read the old HLL if it exists so that a CardinalityMergeException will trigger a Major Compaction
        byte[] hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE.toBytes());
        if (hll != null) {
          entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
        } else if ((hll = fileInfo.get(Meta.LOCAL_CARDINALITY_ESTIMATE_V2.toBytes())) != null) {
          entryCountEstimate = estimator = HyperLogLog.Builder.build(hll);
        } else {
          // no persisted estimate; start a fresh estimator
          estimator = new HyperLogLog(HdfsSortedOplogOrganizer.HLL_CONSTANT);
        }
        
        previousFS = fs;
      } catch (IOException e) {
        if (logger.isDebugEnabled())
          logger.debug("IO Error while creating reader", e);
        throw e;
      }
    }

    /**
     * Reads the value mapped to {@code key}, retrying once with a freshly
     * obtained reader if the first attempt hits an IO error that
     * {@code handleReadIOError} deems retryable (e.g. FS client changed).
     *
     * @return the value bytes, or null if the key is absent
     * @throws IOException if both attempts fail
     */
    @Override
    public byte[] read(byte[] key) throws IOException {
      IOException err = null;
      HFileReader delegateReader = this;
      for (int retry = 1; retry >= 0; retry --) {
        try {
          return delegateReader.readDelegate(key);
        } catch (IOException e) {
          err = e;
          // only the first failure (retry > 0) is allowed to return instead of throw
          handleReadIOError(delegateReader, e, retry > 0);
          // Current reader may have got closed in error handling. Get the new
          // one for retry attempt
          try {
            delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); 
          } catch (IOException ex) {
            // NOTE(review): passes the original 'e', not 'ex'; with a null
            // reader and skipFailIfSafe=false this call always throws,
            // terminating the retry loop here
            handleReadIOError(null, e, false);
          }
        }
      }

      if (logger.isDebugEnabled())
        logger.debug("Throwing err from read delegate ", err);
      throw err;
    }

    // single read attempt: bloom-filter pre-check, then an HFile point lookup
    private byte[] readDelegate(byte[] key) throws IOException {
      try {
        if (!getBloomFilter().mightContain(key)) {
          // bloom filter check failed, the key is not present in this hoplog
          return null;
        }
      } catch (IllegalArgumentException e) {
        // unwrap IOExceptions tunneled through IllegalArgumentException
        // NOTE(review): e.getCause() may be null here, which would NPE —
        // confirm that this wrapping path always sets a cause
        if (IOException.class.isAssignableFrom(e.getCause().getClass())) {
          throw (IOException) e.getCause();
        } else {
          throw e;
        }
      }
      
      byte[] valueBytes = null;
      ByteBuffer bb = get(key);
      if (bb != null) {
        valueBytes = new byte[bb.remaining()];
        bb.get(valueBytes);
      } else {
        // bloom filter said "maybe present" but the key was not found
        stats.getBloom().falsePositive();
      }
      return valueBytes;
    }

    /**
     * Point lookup without bloom-filter screening or retry.
     *
     * @return the value buffer on an exact key match, else null
     */
    @Override
    public ByteBuffer get(byte[] key) throws IOException {
      assert key != null;
      HFileScanner seek = reader.getScanner(false, true);
      // seekTo returns 0 on an exact match
      if (seek.seekTo(key) == 0) {
        return seek.getValue();
      }
      return null;
    }

    /**
     * Range scan with configurable bound inclusivity. Retries once with a
     * fresh reader on a retryable IO error (same pattern as {@link #read}).
     */
    @Override
    public HoplogIterator<byte[], byte[]> scan(byte[] from, boolean fromInclusive, byte[] to,
        boolean toInclusive) throws IOException {
      IOException err = null;
      HFileReader delegateReader = this;
      for (int retry = 1; retry >= 0; retry --) {
        try {
          return delegateReader.scanDelegate(from, fromInclusive, to, toInclusive);
        } catch (IOException e) {
          err = e;
          handleReadIOError(delegateReader, e, retry > 0);
          // Current reader may have got closed in error handling. Get the new
          // one for retry attempt
          try {
            delegateReader = (HFileReader) HFileSortedOplog.this.getReader(); 
          } catch (IOException ex) {
            // NOTE(review): passes 'e' rather than 'ex' (mirrors read());
            // this call always throws, ending the retry loop
            handleReadIOError(null, e, false);
          }
        }
      }
      if (logger.isDebugEnabled())
        logger.debug("Throwing err from scan delegate ", err);
      throw err;
    }

    // single scan attempt over a positional (non-pread) scanner
    private HoplogIterator<byte[], byte[]> scanDelegate(byte[] from, boolean fromInclusive, byte[] to,
        boolean toInclusive) throws IOException {
      return new HFileSortedIterator(reader.getScanner(true, false), from,
          fromInclusive, to, toInclusive);
    }
    
    @Override
    public HoplogIterator<byte[], byte[]> scan(long offset, long length)
        throws IOException {
      /**
       * Identifies the first and last key to be scanned based on offset and
       * length. It loads hfile block index and identifies the first hfile block
       * starting after offset. The key of that block is from key for scanner.
       * Similarly it locates first block starting beyond offset + length range.
       * It uses key of that block as the to key for scanner
       */

      // load block indexes in memory
      BlockIndexReader bir = reader.getDataBlockIndexReader();
      int blockCount = bir.getRootBlockCount();
      
      byte[] fromKey = null, toKey = null;

      // find from key
      int i = 0;
      for (; i < blockCount; i++) {
        if (bir.getRootBlockOffset(i) < offset) {
          // hfile block has offset less than this reader's split offset. check
          // the next block
          continue;
        }

        // found the first hfile block starting after offset
        fromKey = bir.getRootBlockKey(i);
        break;
      }

      if (fromKey == null) {
        // seems no block starts after the offset. return no-op scanner
        return new HFileSortedIterator(null, null, false, null, false);
      }
      
      // find to key; continue from block i, the first block at/after offset
      for (; i < blockCount; i++) {
        if (bir.getRootBlockOffset(i) < (offset + length)) {
          // this hfile block lies within the offset+length range. check the
          // next block for a higher offset
          continue;
        }

        // found the first block starting beyond the offset+length range.
        toKey = bir.getRootBlockKey(i);
        break;
      }

      // from key is included in scan and to key is excluded
      HFileScanner scanner = reader.getScanner(true, false);
      return new HFileSortedIterator(scanner, fromKey, true, toKey, false);
    }
    
    /** Full scan over every entry in this hoplog. */
    @Override
    public HoplogIterator<byte[], byte[]> scan() throws IOException {
      return scan(null, null);
    }

    /** Range scan with the default inclusivity: [from, to). */
    public HoplogIterator<byte[], byte[]> scan(byte[] from, byte[] to)
        throws IOException {
      return scan(from, true, to, false);
    }

    /**
     * Lazily creates the bloom filter wrapper using double-checked locking on
     * the volatile {@code hoplogBloom} field; subsequent calls return the
     * cached instance.
     */
    @Override
    public BloomFilter getBloomFilter() throws IOException {
      BloomFilter result = hoplogBloom;
      if (result == null) {
        synchronized (this) {
          result = hoplogBloom;
          if (result == null) {
            hoplogBloom = result = new BloomFilterImpl();
          }
        }
      }
      return result;
    }

    @Override
    public boolean isClosed() {
      return closed.get();
    }
    
    @Override
    public void close() throws IOException {
      close(true);
    }
    
    /**
     * Closes the underlying HFile reader exactly once.
     *
     * @param clearCache whether to evict this file's blocks from the block
     *        cache (passed through to the HFile reader)
     */
    public void close(boolean clearCache) throws IOException {
      if (closed.compareAndSet(false, true)) {
        if (logger.isDebugEnabled())
          logger.debug("{}Closing reader", logPrefix);
        reader.close(clearCache);
      }
    }

    /** Exact entry count as recorded in the HFile trailer. */
    @Override
    public long getEntryCount() {
      return reader.getEntries();
    }

    public ICardinality getCardinalityEstimator() {
      return estimator;
    }

    /** Estimated entry count derived from the HyperLogLog estimator. */
    @Override
    public long sizeEstimate() {
      return getCardinalityEstimator().cardinality();
    }

    // verifies the file-info metadata identifies a V1 GemFire hoplog
    private void validate() throws IOException {
      // check magic
      byte[] magic = fileInfo.get(Meta.GEMFIRE_MAGIC.toBytes());
      if (!Arrays.equals(magic, MAGIC)) {
        throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic)));
      }
      
      // check version compatibility
      byte[] ver = fileInfo.get(Meta.SORTED_OPLOG_VERSION.toBytes());
      if (logger.isDebugEnabled()) {
        logger.debug("{}Hoplog version is " + Hex.toHex(ver), logPrefix);
      }
      
      if (!Arrays.equals(ver, HoplogVersion.V1.toBytes())) {
        throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver)));
      }
    }
    
    // deserializes the comparator tree persisted in file-info and installs it
    private void loadComparators(DelegatingSerializedComparator comparator) throws IOException {
      byte[] raw = fileInfo.get(Meta.COMPARATORS.toBytes());
      assert raw != null;

      DataInput in = new DataInputStream(new ByteArrayInputStream(raw));
      comparator.setComparators(readComparators(in));
    }
    
    // recursively reads a serialized comparator array: an int count followed
    // by class names, recursing for nested delegating comparators
    private SerializedComparator[] readComparators(DataInput in) throws IOException {
      try {
        SerializedComparator[] comps = new SerializedComparator[in.readInt()];
        assert comps.length > 0;
        
        for (int i = 0; i < comps.length; i++) {
          comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance();
          if (comps[i] instanceof DelegatingSerializedComparator) {
            ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in));
          }
        }
        return comps;
        
      } catch (Exception e) {
        // wrap reflection/IO failures uniformly for the caller
        throw new IOException(e);
      }
    }
    
    /**
     * Adapter over the HBase general bloom filter stored in this HFile.
     * When the file carries no bloom metadata, membership tests
     * conservatively answer "might contain".
     */
    class BloomFilterImpl implements BloomFilter {
      // null when the HFile has no bloom metadata
      private final org.apache.hadoop.hbase.util.BloomFilter hfileBloom;

      public BloomFilterImpl() throws IOException {
        DataInput bin = reader.getGeneralBloomFilterMetadata();
        // instantiate bloom filter if meta present in hfile
        if (bin != null) {
          hfileBloom = BloomFilterFactory.createFromMeta(bin, reader);
          if (reader.getComparator() instanceof DelegatingSerializedComparator) {
            loadComparators((DelegatingSerializedComparator) hfileBloom.getComparator());
          }
        } else {
          hfileBloom = null;
        }
      }

      @Override
      public boolean mightContain(byte[] key) {
        assert key != null;
        return mightContain(key, 0, key.length);
      }

      @Override
      public boolean mightContain(byte[] key, int keyOffset, int keyLength) {
        assert key != null;
        long start = stats.getBloom().begin();
        // no bloom metadata => cannot rule the key out
        boolean found = hfileBloom == null ? true : hfileBloom.contains(key, keyOffset, keyLength, null);
        stats.getBloom().end(start);
        return found;
      }

      @Override
      public long getBloomSize() {
        return hfileBloom == null ? 0 : hfileBloom.getByteSize();
      }
    }

    // TODO change the KV types to ByteBuffer instead of byte[]
    /**
     * Forward iterator over a bounded key range of this hoplog. Prefetches
     * one key/value pair ahead so {@code hasNext()} can answer without
     * advancing the scanner, and records per-iteration scan statistics.
     * A null scanner produces an empty iterator.
     */
    public final class HFileSortedIterator implements HoplogIterator<byte[], byte[]> {
      private final HFileScanner scan;
      
      // lower bound; null means start at the first key
      private final byte[] from;
      private final boolean fromInclusive;
      
      // upper bound; null means scan to the end of the file
      private final byte[] to;
      private final boolean toInclusive;
      
      // one-entry lookahead: next pair to be returned by nextBB()
      private ByteBuffer prefetchedKey;
      private ByteBuffer prefetchedValue;
      // pair most recently returned; exposed via getKey()/getValue()
      private ByteBuffer currentKey;
      private ByteBuffer currentValue;
      
      // variable linked to scan stats
      ScanOperation scanStat;
      private long scanStart;
      
      public HFileSortedIterator(HFileScanner scan, byte[] from, boolean fromInclusive, byte[] to, 
          boolean toInclusive) throws IOException {
        this.scan = scan;
        this.from = from;
        this.fromInclusive = fromInclusive;
        this.to = to;
        this.toInclusive = toInclusive;

        // fall back to a throwaway stats object when none is configured
        scanStat = (stats == null) ? new SortedOplogStatistics("", "").new ScanOperation(
            0, 0, 0, 0, 0, 0, 0) : stats.getScan();
        scanStart = scanStat.begin();

        if (scan == null) {
          // empty iterator: hasNext() will be false immediately
          return;
        }

        assert from == null || to == null
            || scan.getReader().getComparator().compare(from, to) <= 0;

        initIterator();
      }
      
      /*
       * prefetches first key and value from the file so hasNext() can work
       */
      private void initIterator() throws IOException {
        long startNext = scanStat.beginIteration();
        boolean scanSuccessful = true;
        if (from == null) {
          scanSuccessful = scan.seekTo();
        } else {
          int compare = scan.seekTo(from);
          // skip the first entry when the exact 'from' match is excluded, or
          // when the scanner landed on a key smaller than 'from'
          if (compare == 0 && !fromInclusive || compare > 0) {
            // as from is exclusive and first key is same as from, skip the first key
            scanSuccessful = scan.next();
          }
        }
        
        populateKV(startNext, scanSuccessful);
      }
      
      @Override
      public boolean hasNext() {
        return prefetchedKey != null;
      }

      @Override
      public byte[] next() throws IOException {
        return byteBufferToArray(nextBB());
      }

      /** Zero-copy variant of {@link #next()}: returns the key buffer directly. */
      public ByteBuffer nextBB() throws IOException {
        long startNext = scanStat.beginIteration();
        if (prefetchedKey == null) {
          throw new NoSuchElementException();
        }

        // promote the lookahead pair to "current" and prefetch the next one
        currentKey = prefetchedKey;
        currentValue = prefetchedValue;

        prefetchedKey = null;
        prefetchedValue = null;

        // NOTE(review): when scan.next() returns false (end of file) the
        // iteration stat opened by beginIteration() above is never ended
        if (scan.next()) {
          populateKV(startNext, true);
        }
        
        return currentKey;
      }

      
      // loads the scanner's current pair into the lookahead slots, clearing
      // them instead if the 'to' bound has been passed; records scan stats
      private void populateKV(long nextStartTime, boolean scanSuccessful) {
        if (!scanSuccessful) {
          //end of file reached. collect stats and return
          scanStat.endIteration(0, nextStartTime);
          return;
        }
        
        prefetchedKey = scan.getKey();
        prefetchedValue = scan.getValue();
        
        if (to != null) {
          // TODO Optimization? Perform int comparison instead of byte[]. Identify
          // offset of key greater than two.
          int compare = -1;
          compare = scan.getReader().getComparator().compare
              (prefetchedKey.array(), prefetchedKey.arrayOffset(), prefetchedKey.remaining(), to, 0, to.length);
          // past the upper bound (or at an excluded inclusive bound): stop
          if (compare > 0 || (compare == 0 && !toInclusive)) {
            prefetchedKey = null;
            prefetchedValue = null;
            return;
          }
        }
        
        // account for bytes read and time spent
        int byteCount = prefetchedKey.remaining() + prefetchedValue.remaining();
        scanStat.endIteration(byteCount, nextStartTime);
      }
      

      @Override
      public byte[] getKey() {
        return byteBufferToArray(getKeyBB());
      }
      public ByteBuffer getKeyBB() {
        return currentKey;
      }

      @Override
      public byte[] getValue() {
        return byteBufferToArray(getValueBB());
      }
      public ByteBuffer getValueBB() {
        return currentValue;
      }

      /** Hoplogs are immutable; removal is never supported. */
      @Override
      public void remove() {
        throw new UnsupportedOperationException("Cannot delete a key-value from a hfile sorted oplog");
      }
      
      @Override
      public void close() {
        // close the scan-duration stat opened in the constructor
        scanStat.end(scanStart);
      }
    }
  }
+  
+  public static byte[] byteBufferToArray(ByteBuffer bb) {
+    if (bb == null) {
+      return null;
+    }
+    
+    byte[] tmp = new byte[bb.remaining()];
+    bb.duplicate().get(tmp);
+    return tmp;
+  }
+}


Mime
View raw message