geode-commits mailing list archives

From r..@apache.org
Subject [28/51] [partial] incubator-geode git commit: SGA #2
Date Fri, 03 Jul 2015 19:21:29 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/ICardinality.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/ICardinality.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/ICardinality.java
new file mode 100644
index 0000000..e798810
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/ICardinality.java
@@ -0,0 +1,78 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+import java.io.IOException;
+
+
+public interface ICardinality
+{
+    /**
+     * @param o stream element
+     * @return false if the value returned by cardinality() is unaffected by the appearance of o in the stream.
+     */
+    boolean offer(Object o);
+
+    /**
+     * Offer the value as a hashed long value
+     *
+     * @param hashedLong - the hash of the item to offer to the estimator
+     * @return false if the value returned by cardinality() is unaffected by the appearance of hashedLong in the stream
+     */
+    boolean offerHashed(long hashedLong);
+
+    /**
+     * Offer the value as a hashed int value
+     *
+     * @param hashedInt - the hash of the item to offer to the estimator
+     * @return false if the value returned by cardinality() is unaffected by the appearance of hashedInt in the stream
+     */
+    boolean offerHashed(int hashedInt);
+
+    /**
+     * @return the number of unique elements in the stream or an estimate thereof
+     */
+    long cardinality();
+
+    /**
+     * @return size in bytes needed for serialization
+     */
+    int sizeof();
+
+    /**
+     * @return a serialized representation of this estimator
+     * @throws IOException if serialization fails
+     */
+    byte[] getBytes() throws IOException;
+
+    /**
+     * Merges estimators to produce a new estimator for the combined streams
+     * of this estimator and those passed as arguments.
+     * 
+     * Neither this estimator nor the ones passed as parameters are modified.
+     *
+     * @param estimators Zero or more compatible estimators
+     * @return a new estimator representing the combined streams
+     * @throws CardinalityMergeException If at least one of the estimators is not compatible with this one
+     */
+    ICardinality merge(ICardinality... estimators) throws CardinalityMergeException;
+}
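
A brief usage sketch (not part of the commit): any ICardinality implementation, for example a HyperLogLog-style estimator assumed to live elsewhere in this drop, can be driven through the interface above. The helper below is illustrative only.

    // Illustrative only: estimate distinct elements across two streams; merge()
    // returns a new estimator and leaves both inputs unmodified.
    static long combinedEstimate(ICardinality a, Iterable<Object> streamA,
                                 ICardinality b, Iterable<Object> streamB)
        throws CardinalityMergeException {
      for (Object o : streamA) {
        a.offer(o);                        // record each stream element
      }
      for (Object o : streamB) {
        b.offer(o);
      }
      return a.merge(b).cardinality();     // estimate for the combined streams
    }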

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/MurmurHash.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/MurmurHash.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/MurmurHash.java
new file mode 100644
index 0000000..13e5d48
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/MurmurHash.java
@@ -0,0 +1,252 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup. See http://murmurhash.googlepages.com/ for more details.
+ * <p/>
+ * <p>
+ * The C version of MurmurHash 2.0 found at that site was ported to Java by
+ * Andrzej Bialecki (ab at getopt org).
+ * </p>
+ */
+public class MurmurHash
+{
+    public static int hash(Object o)
+    {
+        if (o == null)
+        {
+            return 0;
+        }
+        if (o instanceof Long)
+        {
+            return hashLong((Long) o);
+        }
+        if (o instanceof Integer)
+        {
+            return hashLong((Integer) o);
+        }
+        if (o instanceof Double)
+        {
+            return hashLong(Double.doubleToRawLongBits((Double) o));
+        }
+        if (o instanceof Float)
+        {
+            return hashLong(Float.floatToRawIntBits((Float) o));
+        }
+        if (o instanceof String)
+        {
+            return hash(((String) o).getBytes());
+        }
+        if (o instanceof byte[])
+        {
+            return hash((byte[]) o);
+        }
+        return hash(o.toString());
+    }
+
+    public static int hash(byte[] data)
+    {
+        return hash(data, 0, data.length, -1);
+    }
+
+    public static int hash(byte[] data, int seed)
+    {
+        return hash(data, 0, data.length, seed);
+    }
+
+    public static int hash(byte[] data, int offset, int length, int seed)
+    {
+        int m = 0x5bd1e995;
+        int r = 24;
+
+        int h = seed ^ length;
+
+        int len_4 = length >> 2;
+
+        for (int i = 0; i < len_4; i++)
+        {
+            int i_4 = i << 2;
+            int k = data[offset + i_4 + 3];
+            k = k << 8;
+            k = k | (data[offset + i_4 + 2] & 0xff);
+            k = k << 8;
+            k = k | (data[offset + i_4 + 1] & 0xff);
+            k = k << 8;
+            k = k | (data[offset + i_4 + 0] & 0xff);
+            k *= m;
+            k ^= k >>> r;
+            k *= m;
+            h *= m;
+            h ^= k;
+        }
+
+        // avoid calculating modulo
+        int len_m = len_4 << 2;
+        int left = length - len_m;
+
+        if (left != 0)
+        {
+            if (left >= 3)
+            {
+                h ^= (int) data[offset + length - 3] << 16;
+            }
+            if (left >= 2)
+            {
+                h ^= (int) data[offset + length - 2] << 8;
+            }
+            if (left >= 1)
+            {
+                h ^= (int) data[offset + length - 1];
+            }
+
+            h *= m;
+        }
+
+        h ^= h >>> 13;
+        h *= m;
+        h ^= h >>> 15;
+
+        return h;
+    }
+
+    public static int hashLong(long data)
+    {
+        int m = 0x5bd1e995;
+        int r = 24;
+
+        int h = 0;
+
+        int k = (int) data * m;
+        k ^= k >>> r;
+        h ^= k * m;
+
+        k = (int) (data >> 32) * m;
+        k ^= k >>> r;
+        h *= m;
+        h ^= k * m;
+
+        h ^= h >>> 13;
+        h *= m;
+        h ^= h >>> 15;
+
+        return h;
+    }
+
+    public static long hash64(Object o)
+    {
+        if (o == null)
+        {
+            return 0l;
+        }
+        else if (o instanceof String)
+        {
+            final byte[] bytes = ((String) o).getBytes();
+            return hash64(bytes, bytes.length);
+        }
+        else if (o instanceof byte[])
+        {
+            final byte[] bytes = (byte[]) o;
+            return hash64(bytes, bytes.length);
+        }
+        return hash64(o.toString());
+    }
+
+    // 64 bit implementation copied from here:  https://github.com/tnm/murmurhash-java
+
+    /**
+     * Generates 64 bit hash from byte array with default seed value.
+     *
+     * @param data   byte array to hash
+     * @param length length of the array to hash
+     * @return 64 bit hash of the given string
+     */
+    public static long hash64(final byte[] data, int length)
+    {
+        return hash64(data, length, 0xe17a1465);
+    }
+
+
+    /**
+     * Generates 64 bit hash from byte array of the given length and seed.
+     *
+     * @param data   byte array to hash
+     * @param length length of the array to hash
+     * @param seed   initial seed value
+     * @return 64 bit hash of the given array
+     */
+    public static long hash64(final byte[] data, int length, int seed)
+    {
+        final long m = 0xc6a4a7935bd1e995L;
+        final int r = 47;
+
+        long h = (seed & 0xffffffffl) ^ (length * m);
+
+        int length8 = length / 8;
+
+        for (int i = 0; i < length8; i++)
+        {
+            final int i8 = i * 8;
+            long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8)
+                    + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24)
+                    + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40)
+                    + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56);
+
+            k *= m;
+            k ^= k >>> r;
+            k *= m;
+
+            h ^= k;
+            h *= m;
+        }
+
+        switch (length % 8)
+        {
+            case 7:
+                h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
+            case 6:
+                h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
+            case 5:
+                h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
+            case 4:
+                h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
+            case 3:
+                h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
+            case 2:
+                h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
+            case 1:
+                h ^= (long) (data[length & ~7] & 0xff);
+                h *= m;
+        }
+
+        h ^= h >>> r;
+        h *= m;
+        h ^= h >>> r;
+
+        return h;
+    }
+}
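
A short sketch of how these helpers are typically called (not part of the commit); the key bytes below are placeholders.

    byte[] key = "region/42/customer-17".getBytes();

    int h32 = MurmurHash.hash(key);                             // 32-bit hash, default seed -1
    int h32Seeded = MurmurHash.hash(key, 0, key.length, 104729);
    int h32Boxed = MurmurHash.hash(Long.valueOf(9876543210L));  // dispatched to hashLong()

    long h64 = MurmurHash.hash64(key, key.length);              // 64-bit hash, default seed 0xe17a1465
    long h64Str = MurmurHash.hash64("region/42/customer-17");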

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/RegisterSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/RegisterSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/RegisterSet.java
new file mode 100644
index 0000000..da17154
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/cardinality/RegisterSet.java
@@ -0,0 +1,127 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/*
+ * Copyright (C) 2012 Clearspring Technologies, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.cardinality;
+
+public class RegisterSet
+{
+    public final static int LOG2_BITS_PER_WORD = 6;
+    public final static int REGISTER_SIZE = 5;
+
+    public final int count;
+    public final int size;
+
+    private final int[] M;
+
+    public RegisterSet(int count)
+    {
+        this(count, null);
+    }
+
+    public RegisterSet(int count, int[] initialValues)
+    {
+        this.count = count;
+        int bits = getBits(count);
+
+        if (initialValues == null)
+        {
+            if (bits == 0)
+            {
+                this.M = new int[1];
+            }
+            else if (bits % Integer.SIZE == 0)
+            {
+                this.M = new int[bits];
+            }
+            else
+            {
+                this.M = new int[bits + 1];
+            }
+        }
+        else
+        {
+            this.M = initialValues;
+        }
+        this.size = this.M.length;
+    }
+
+    public static int getBits(int count)
+    {
+        return count / LOG2_BITS_PER_WORD;
+    }
+
+    public void set(int position, int value)
+    {
+        int bucketPos = position / LOG2_BITS_PER_WORD;
+        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
+        this.M[bucketPos] = (this.M[bucketPos] & ~(0x1f << shift)) | (value << shift);
+    }
+
+    public int get(int position)
+    {
+        int bucketPos = position / LOG2_BITS_PER_WORD;
+        int shift = REGISTER_SIZE * (position - (bucketPos * LOG2_BITS_PER_WORD));
+        return (this.M[bucketPos] & (0x1f << shift)) >>> shift;
+    }
+    
+    public boolean updateIfGreater(int position, int value)
+    {
+        int bucket = position / LOG2_BITS_PER_WORD;
+        int shift  = REGISTER_SIZE * (position - (bucket * LOG2_BITS_PER_WORD));
+        int mask = 0x1f << shift;
+
+        // Use long to avoid sign issues with the left-most shift
+        long curVal = this.M[bucket] & mask;
+        long newVal = value << shift;
+        if (curVal < newVal) {
+            this.M[bucket] = (int)((this.M[bucket] & ~mask) | newVal);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public void merge(RegisterSet that)
+    {
+        for (int bucket = 0; bucket < M.length; bucket++)
+        {
+            int word = 0;
+            for (int j = 0; j < LOG2_BITS_PER_WORD; j++)
+            {
+                int mask = 0x1f << (REGISTER_SIZE * j);
+
+                int thisVal = (this.M[bucket] & mask);
+                int thatVal = (that.M[bucket] & mask);
+                word |= (thisVal < thatVal) ? thatVal : thisVal;
+            }
+            this.M[bucket] = word;
+        }
+    }
+
+    public int[] bits()
+    {
+        int[] copy = new int[size];
+        System.arraycopy(M, 0, copy, 0, M.length);
+        return copy;
+    }
+}
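
A small sketch of the register packing (not part of the commit): each int word holds six 5-bit registers (REGISTER_SIZE = 5), so a register can store values 0 through 31.

    RegisterSet registers = new RegisterSet(16);           // sixteen 5-bit registers

    registers.set(3, 11);
    int value = registers.get(3);                          // 11

    boolean raised = registers.updateIfGreater(3, 9);      // false: 9 < 11, register unchanged
    raised = registers.updateIfGreater(3, 13);             // true: register 3 now holds 13

    int[] snapshot = registers.bits();                     // defensive copy of the backing words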

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
new file mode 100644
index 0000000..39aa240
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplog.java
@@ -0,0 +1,348 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.cardinality.ICardinality;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.CompressionType;
+import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Writer.Option;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Abstract class for {@link Hoplog} with common functionality
+ */
+public abstract class AbstractHoplog implements Hoplog {
+  protected final FSProvider fsProvider;
+  
+  // path of the oplog file
+  protected volatile Path path;
+  private volatile HoplogDescriptor hfd;
+  protected Configuration conf;
+  protected SortedOplogStatistics stats;
+  protected Long hoplogModificationTime;
+  protected Long hoplogSize;
+
+  protected HoplogReaderActivityListener readerListener;
+  
+  // logger instance
+  protected static final Logger logger = LogService.getLogger();
+  
+  protected static String logPrefix;
+  // THIS CONSTRUCTOR SHOULD BE USED FOR LONER ONLY
+  AbstractHoplog(FileSystem inputFS, Path filePath, SortedOplogStatistics stats)
+      throws IOException {
+    logPrefix = "<" + filePath.getName() + "> ";
+    this.fsProvider = new FSProvider(inputFS);
+    initialize(filePath, stats, inputFS);
+  }
+
+  public AbstractHoplog(HDFSStoreImpl store, Path filePath,
+      SortedOplogStatistics stats) throws IOException {
+    logPrefix = "<" + filePath.getName() + "> ";
+    this.fsProvider = new FSProvider(store);
+    initialize(filePath, stats, store.getFileSystem());
+  }
+
+  private void initialize(Path path, SortedOplogStatistics stats, FileSystem fs) {
+    this.conf = fs.getConf();
+    this.stats = stats;
+    this.path = fs.makeQualified(path);
+    this.hfd = new HoplogDescriptor(this.path.getName());
+  }
+  
+  @Override
+  public abstract void close() throws IOException; 
+  @Override
+  public abstract HoplogReader getReader() throws IOException;
+
+  @Override
+  public abstract HoplogWriter createWriter(int keys) throws IOException;
+
+  @Override
+  abstract public void close(boolean clearCache) throws IOException;
+
+  @Override
+  public void setReaderActivityListener(HoplogReaderActivityListener listener) {
+    this.readerListener = listener;
+  }
+  
+  @Override
+  public String getFileName() {
+    return this.hfd.getFileName();
+  }
+  
+  public final int compareTo(Hoplog o) {
+    return hfd.compareTo( ((AbstractHoplog)o).hfd);
+  }
+
+  @Override
+  public ICardinality getEntryCountEstimate() throws IOException {
+    return null;
+  }
+  
+  @Override
+  public synchronized void rename(String name) throws IOException {
+    if (logger.isDebugEnabled())
+      logger.debug("{}Renaming hoplog to " + name, logPrefix);
+    Path parent = path.getParent();
+    Path newPath = new Path(parent, name);
+    fsProvider.getFS().rename(path, newPath);
+
+    // close the old reader and let the new one get created lazily
+    close();
+    
+    // update path to point to the new path
+    path = newPath;
+    this.hfd = new HoplogDescriptor(this.path.getName());
+    logPrefix = "<" + path.getName() + "> ";
+  }
+  
+  @Override
+  public synchronized void delete() throws IOException {
+    if (logger.isDebugEnabled())
+      logger.debug("{}Deleting hoplog", logPrefix);
+    close();
+    this.hoplogModificationTime = null;
+    this.hoplogSize = null;
+    fsProvider.getFS().delete(path, false);
+  }
+
+  @Override
+  public long getModificationTimeStamp() {
+    initHoplogSizeTimeInfo();
+
+    // modification time will not be null if this hoplog exists. Otherwise
+    // invocation of this method is invalid
+    if (hoplogModificationTime == null) {
+      throw new IllegalStateException();
+    }
+    
+    return hoplogModificationTime;
+  }
+
+  @Override
+  public long getSize() {
+    initHoplogSizeTimeInfo();
+    
+    // size will not be null if this hoplog exists. Otherwise
+    // invocation of this method is invalid
+    if (hoplogSize == null) {
+      throw new IllegalStateException();
+    }
+    
+    return hoplogSize;
+  }
+  
+  private synchronized void initHoplogSizeTimeInfo() {
+    if (hoplogSize != null && hoplogModificationTime != null) {
+      // time and size info is already initialized. no work needed here
+      return;
+    }
+
+    try {
+      FileStatus[] filesInfo = FSUtils.listStatus(fsProvider.getFS(), path, null);
+      if (filesInfo != null && filesInfo.length == 1) {
+        this.hoplogModificationTime = filesInfo[0].getModificationTime();
+        this.hoplogSize = filesInfo[0].getLen();
+      }
+      // TODO else condition may happen if user deletes hoplog from the file system.
+    } catch (IOException e) {
+      logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e);
+      throw new HDFSIOException(
+          LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path),e);
+    }
+  }
+  public static SequenceFile.Writer getSequenceFileWriter(Path path, 
+      Configuration conf, Logger logger) throws IOException {
+    return getSequenceFileWriter(path,conf, logger, null); 
+  }
+  
+  /**
+   * Creates a SequenceFile writer for the given path.
+   *
+   * @param path destination of the new sequence file
+   * @param conf Hadoop configuration to use
+   * @param logger logger for debug messages
+   * @param version used for testing only; pass null for all other purposes
+   * @return SequenceFile.Writer
+   * @throws IOException if the writer cannot be created
+   */
+  public static SequenceFile.Writer getSequenceFileWriter(Path path, 
+    Configuration conf, Logger logger, Version version) throws IOException {
+    Option optPath = SequenceFile.Writer.file(path);
+    Option optKey = SequenceFile.Writer.keyClass(BytesWritable.class);
+    Option optVal = SequenceFile.Writer.valueClass(BytesWritable.class);
+    Option optCom = withCompression(logger);
+    if (logger.isDebugEnabled())
+      logger.debug("{}Started creating hoplog " + path, logPrefix);
+    
+    if (version == null)
+      version = Version.CURRENT;
+    //Create a metadata option with the gemfire version, for future versioning
+    //of the key and value format
+    SequenceFile.Metadata metadata = new SequenceFile.Metadata();
+    metadata.set(new Text(Meta.GEMFIRE_VERSION.name()), new Text(String.valueOf(version.ordinal())));
+    Option optMeta = SequenceFile.Writer.metadata(metadata);
+    
+    SequenceFile.Writer writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom, optMeta);
+    
+    return writer;
+  }
+  
+  private static Option withCompression(Logger logger) {
+    String prop = System.getProperty(HoplogConfig.COMPRESSION);
+    if (prop != null) {
+      CompressionCodec codec;
+      if (prop.equalsIgnoreCase("SNAPPY")) {
+        codec = new SnappyCodec();
+      } else if (prop.equalsIgnoreCase("LZ4")) {
+        codec = new Lz4Codec();
+      } else if (prop.equals("GZ")) {
+        codec = new GzipCodec();
+      } else {
+        throw new IllegalStateException("Unsupported codec: " + prop);
+      }
+      if (logger.isDebugEnabled())
+        logger.debug("{}Using compression codec " + codec, logPrefix);
+      return SequenceFile.Writer.compression(CompressionType.BLOCK, codec);
+    }
+    return SequenceFile.Writer.compression(CompressionType.NONE, null);
+  }
+  
+  public static final class HoplogDescriptor implements Comparable<HoplogDescriptor> {
+     private final String fileName;
+     private final String bucket;
+     private final int sequence;
+     private final long timestamp;
+     private final String extension;
+     
+     HoplogDescriptor(final String fileName) {
+       this.fileName = fileName;
+       final Matcher matcher = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(fileName);
+       final boolean matched = matcher.find();
+       assert matched;
+       this.bucket = matcher.group(1);
+       this.sequence = Integer.valueOf(matcher.group(3));
+       this.timestamp = Long.valueOf(matcher.group(2)); 
+       this.extension = matcher.group(4);
+     }
+     
+     public final String getFileName() {
+       return fileName;
+     }
+     
+     @Override
+     public boolean equals(Object o) {
+       if (this == o) {
+         return true;
+       }
+       
+       if (!(o instanceof HoplogDescriptor)) {
+         return false;
+       }
+       
+       final HoplogDescriptor other = (HoplogDescriptor)o;
+       // the two files should belong to same bucket
+       assert this.bucket.equals(other.bucket);
+       
+       // compare sequence first
+       if (this.sequence != other.sequence) {
+         return false;
+       }
+       
+       // sequence is same, compare timestamps
+       if (this.timestamp != other.timestamp) {
+         return false;
+       }
+       
+       return extension.equals(other.extension);
+     }
+
+    @Override
+    public int compareTo(HoplogDescriptor o) {
+      if (this == o) {
+        return 0;
+      }
+      
+      // the two files should belong to same bucket
+      assert this.bucket.equals(o.bucket);
+      
+      // compare sequence first
+      if (sequence > o.sequence) {
+        return -1;
+      } else if (sequence < o.sequence) {
+        return 1;
+      }
+      
+      // sequence is same, compare timestamps
+      if(timestamp > o.timestamp) {
+        return -1; 
+      } else if (timestamp < o.timestamp) {
+        return 1;
+      }
+      
+      //timestamp is the same, compare the file extension. It's
+      //possible a major compaction and minor compaction could finish
+      //at the same time and create the same timestamp and sequence number
+      //it doesn't matter which file we look at first in that case.
+      return extension.compareTo(o.extension);
+    }
+     
+     
+  }
+  
+  protected static final class FSProvider {
+    final FileSystem fs;
+    final HDFSStoreImpl store;
+    
+    // THIS METHOD IS FOR TESTING ONLY
+    FSProvider(FileSystem fs) {
+      this.fs = fs;
+      this.store = null;
+    }
+    
+    FSProvider(HDFSStoreImpl store) {
+      this.store = store;
+      fs = null;
+    }
+    
+    public FileSystem getFS() throws IOException {
+      if (store != null) {
+        return store.getFileSystem();
+      }
+      return fs;
+    }
+
+    public FileSystem checkFileSystem() {
+      store.checkAndClearFileSystem();
+      return store.getCachedFileSystem();
+    }
+  }
+}
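
A hedged sketch of the static writer helper above (not part of the commit). The path is a placeholder, HoplogConfig.COMPRESSION is the system property read by withCompression(), and the forked SequenceFile.Writer is assumed to keep Hadoop's append/close API.

    // Optional: pick block compression; recognized values, per withCompression(), are SNAPPY, LZ4 and GZ.
    System.setProperty(HoplogConfig.COMPRESSION, "SNAPPY");

    Configuration conf = new Configuration();
    Path hoplogPath = new Path("/store/region/0/0-1404417941234-1.hop");   // placeholder path

    SequenceFile.Writer writer = AbstractHoplog.getSequenceFileWriter(hoplogPath, conf, logger);
    try {
      writer.append(new BytesWritable(keyBytes), new BytesWritable(valueBytes)); // keys and values are BytesWritable
    } finally {
      writer.close();
    }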

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
new file mode 100644
index 0000000..22e8832
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/AbstractHoplogOrganizer.java
@@ -0,0 +1,421 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog.HoplogDescriptor;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+
+public abstract class AbstractHoplogOrganizer<T extends PersistedEventImpl> implements HoplogOrganizer<T> {
+
+  public static final String MINOR_HOPLOG_EXTENSION = ".ihop";
+  public static final String MAJOR_HOPLOG_EXTENSION = ".chop";
+  public static final String EXPIRED_HOPLOG_EXTENSION = ".exp";
+  public static final String TEMP_HOPLOG_EXTENSION = ".tmp";
+
+  public static final String FLUSH_HOPLOG_EXTENSION = ".hop";
+  public static final String SEQ_HOPLOG_EXTENSION = ".shop";
+
+  // all valid hoplog file names follow this pattern
+  public static final String HOPLOG_NAME_REGEX = "(.+?)-(\\d+?)-(\\d+?)";
+  public static final Pattern HOPLOG_NAME_PATTERN = Pattern.compile(HOPLOG_NAME_REGEX
+      + "\\.(.*)");
+  
+  public static boolean JUNIT_TEST_RUN = false; 
+
+  protected static final boolean ENABLE_INTEGRITY_CHECKS = Boolean
+      .getBoolean("gemfire.HdfsSortedOplogOrganizer.ENABLE_INTEGRITY_CHECKS")
+      || assertionsEnabled();
+
+  private static boolean assertionsEnabled() {
+    boolean enabled = false;
+    assert enabled = true;
+    return enabled;
+  }
+
+  protected HdfsRegionManager regionManager;
+  // name or id of bucket managed by this organizer
+  protected final String regionFolder;
+  protected final int bucketId;
+
+  // path of the region directory
+  protected final Path basePath;
+  // identifies path of directory containing a bucket's oplog files
+  protected final Path bucketPath;
+
+  protected final HDFSStoreImpl store;
+
+  // assigns a unique increasing number to each oplog file
+  protected AtomicInteger sequence;
+
+  //logger instance
+  protected static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+
+  protected SortedOplogStatistics stats;
+  AtomicLong bucketDiskUsage = new AtomicLong(0);
+
+  // creation of new files and expiration of files will be synchronously
+  // notified to the listener
+  protected HoplogListener listener;
+
+  private volatile boolean closed = false;
+  
+  protected Object changePrimarylockObject = new Object();
+  
+  public AbstractHoplogOrganizer(HdfsRegionManager region, int bucketId) {
+
+    assert region != null;
+
+    this.regionManager = region;
+    this.regionFolder = region.getRegionFolder();
+    this.store = region.getStore();
+    this.listener = region.getListener();
+    this.stats = region.getHdfsStats();
+    
+    this.bucketId = bucketId;
+
+    this.basePath = new Path(store.getHomeDir());
+    this.bucketPath = new Path(basePath, regionFolder + "/" + bucketId);
+
+    this.logPrefix = "<" + getRegionBucketStr() + "> ";
+    
+  }
+
+  @Override
+  public boolean isClosed() {
+    return closed || regionManager.isClosed();
+  }
+  
+  @Override
+  public void close() throws IOException {
+    closed = true;
+    
+    // this bucket is closed and may be owned by a new node. So reduce the store
+    // usage stat, as the new owner adds the usage metric
+    incrementDiskUsage((-1) * bucketDiskUsage.get());
+  }
+
+  @Override
+  public abstract void flush(Iterator<? extends QueuedPersistentEvent> bufferIter,
+      int count) throws IOException, ForceReattemptException;
+
+  @Override
+  public abstract void clear() throws IOException;
+
+  protected abstract Hoplog getHoplog(Path hoplogPath) throws IOException;
+
+  @Override
+  public void hoplogCreated(String region, int bucketId, Hoplog... oplogs)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void hoplogDeleted(String region, int bucketId, Hoplog... oplogs)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void compactionCompleted(String region, int bucket, boolean isMajor) {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+  
+  @Override
+  public T read(byte[] key) throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan() throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan(byte[] from, byte[] to)
+      throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public HoplogIterator<byte[], T> scan(byte[] from,
+      boolean fromInclusive, byte[] to, boolean toInclusive) throws IOException {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  @Override
+  public long sizeEstimate() {
+    throw new UnsupportedOperationException("Not supported for "
+        + this.getClass().getSimpleName());
+  }
+
+  /**
+   * @return the oplog's full path, formed by prefixing the bucket path to the
+   *         file name
+   */
+  protected String getPathStr(Hoplog oplog) {
+    return bucketPath.toString() + "/" + oplog.getFileName();
+  }
+
+  protected String getRegionBucketStr() {
+    return regionFolder + "/" + bucketId;
+  }
+
+  protected SortedHoplogPersistedEvent deserializeValue(byte[] val) throws IOException {
+    try {
+      return SortedHoplogPersistedEvent.fromBytes(val);
+    } catch (ClassNotFoundException e) {
+      logger
+          .error(
+              LocalizedStrings.GetMessage_UNABLE_TO_DESERIALIZE_VALUE_CLASSNOTFOUNDEXCEPTION,
+              e);
+      return null;
+    }
+  }
+
+  /**
+   * @return true if the entry belongs to a destroy or invalidate event
+   */
+  protected boolean isDeletedEntry(byte[] value, int offset) throws IOException {
+    // Read only the first byte of PersistedEventImpl for the operation
+    assert value != null && value.length > 0 && offset >= 0 && offset < value.length;
+    Operation op = Operation.fromOrdinal(value[offset]);
+
+    if (op.isDestroy() || op.isInvalidate()) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * @param seqNum
+   *          desired sequence number of the hoplog. If null, the next sequence
+   *          number is chosen
+   * @param extension
+   *          file extension representing the type of file, e.g. ihop for
+   *          intermediate hoplog
+   * @return a new temporary file for a new sorted oplog. The name consists of
+   *         bucket name, a sequence number for ordering the files followed by a
+   *         timestamp
+   */
+  Hoplog getTmpSortedOplog(Integer seqNum, String extension) throws IOException {
+    if (seqNum == null) {
+      seqNum = sequence.incrementAndGet();
+    }
+    String name = bucketId + "-" + System.currentTimeMillis() + "-" + seqNum 
+        + extension;
+    Path soplogPath = new Path(bucketPath, name + TEMP_HOPLOG_EXTENSION);
+    return getHoplog(soplogPath);
+  }
+  
+  /**
+   * renames a temporary hoplog file to a legitimate name.
+   */
+  static void makeLegitimate(Hoplog so) throws IOException {
+    String name = so.getFileName();
+    assert name.endsWith(TEMP_HOPLOG_EXTENSION);
+
+    int index = name.lastIndexOf(TEMP_HOPLOG_EXTENSION);
+    name = name.substring(0, index);
+    so.rename(name);
+  }
+
+  /**
+   * Creates an expiry marker for a file on the file system
+   * 
+   * @param hoplog the hoplog to mark as expired
+   * @throws IOException if the marker file cannot be created
+   */
+  protected void addExpiryMarkerForAFile(Hoplog hoplog) throws IOException {
+    FileSystem fs = store.getFileSystem();
+
+    // TODO optimization needed here. instead of creating expired marker
+    // file per file, create a meta file. the main thing to worry is
+    // compaction of meta file itself
+    Path expiryMarker = getExpiryMarkerPath(hoplog.getFileName());
+
+    // uh-oh, why are we trying to expire an already expired file?
+    if (ENABLE_INTEGRITY_CHECKS) {
+      Assert.assertTrue(!fs.exists(expiryMarker),
+          "Expiry marker already exists: " + expiryMarker);
+    }
+
+    FSDataOutputStream expiryMarkerFile = fs.create(expiryMarker);
+    expiryMarkerFile.close();
+
+    if (logger.isDebugEnabled())
+      logger.debug("Hoplog marked expired: " + getPathStr(hoplog));
+  }
+
+  protected Path getExpiryMarkerPath(String name) {
+    return new Path(bucketPath, name + EXPIRED_HOPLOG_EXTENSION);
+  }
+  
+  protected String truncateExpiryExtension(String name) {
+    if (name.endsWith(EXPIRED_HOPLOG_EXTENSION)) {
+      return name.substring(0, name.length() - EXPIRED_HOPLOG_EXTENSION.length());
+    }
+    
+    return name;
+  }
+  
+  /**
+   * Updates region stats and the local copy of the bucket-level store usage metric.
+   * 
+   * @param delta change in store usage, in bytes
+   */
+  protected void incrementDiskUsage(long delta) {
+    long newSize = bucketDiskUsage.addAndGet(delta);
+    if (newSize < 0 && delta < 0) {
+      if (logger.isDebugEnabled()){
+        logger.debug("{}Invalid diskUsage size:" + newSize + " caused by delta:"
+            + delta + ", parallel del & close?" + isClosed(), logPrefix);
+      }
+      if (isClosed()) {
+        // avoid corrupting disk usage size during close by reducing residue
+        // size only
+        delta = delta + (-1 * newSize);
+      }
+    }
+    stats.incStoreUsageBytes(delta);
+  }
+
+  /**
+   * Utility method to remove a file from the valid file list if an expiry marker
+   * for the file exists
+   * 
+   * @param valid
+   *          list of valid files
+   * @param expired
+   *          list of expired file markers
+   * @return list of valid files that do not have an expiry marker
+   */
+  public static FileStatus[] filterValidHoplogs(FileStatus[] valid,
+      FileStatus[] expired) {
+    if (valid == null) {
+      return null;
+    }
+
+    if (expired == null) {
+      return valid;
+    }
+
+    ArrayList<FileStatus> result = new ArrayList<FileStatus>();
+    for (FileStatus vs : valid) {
+      boolean found = false;
+      for (FileStatus ex : expired) {
+        if (ex
+            .getPath()
+            .getName()
+            .equals(
+                vs.getPath().getName()
+                    + HdfsSortedOplogOrganizer.EXPIRED_HOPLOG_EXTENSION)) {
+          found = true;
+        }
+      }
+      if (!found) {
+        result.add(vs);
+      }
+    }
+
+    return result.toArray(new FileStatus[result.size()]);
+  }
+
+  protected void pingSecondaries() throws ForceReattemptException {
+
+    if (JUNIT_TEST_RUN)
+      return;
+    BucketRegion br = ((PartitionedRegion)this.regionManager.getRegion()).getDataStore().getLocalBucketById(this.bucketId);
+    boolean secondariesPingable = false;
+    try {
+      secondariesPingable = br.areSecondariesPingable();
+    } catch (Throwable e) {
+      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
+          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()), e);
+    }
+    if (!secondariesPingable)
+      throw new ForceReattemptException("Failed to ping secondary servers of bucket: " + 
+          this.bucketId + ", region: " + ((PartitionedRegion)this.regionManager.getRegion()));
+  }
+
+  
+
+  
+  /**
+   * A comparator for ordering soplogs based on the file name. The file names
+   * are assigned incrementally and hint at the age of the file
+   */
+  public static final class HoplogComparator implements
+      Comparator<TrackedReference<Hoplog>> {
+    /**
+     * A file with a higher sequence number or timestamp is younger and hence
+     * sorts as smaller
+     */
+    @Override
+    public int compare(TrackedReference<Hoplog> o1, TrackedReference<Hoplog> o2) {
+      return o1.get().compareTo(o2.get());
+    }
+
+    /**
+     * Compares age of files based on file names and returns 1 if name1 is
+     * older, -1 if name1 is younger, and 0 if the two files are the same age
+     */
+    public static int compareByName(String name1, String name2) {
+      HoplogDescriptor hd1 = new HoplogDescriptor(name1);
+      HoplogDescriptor hd2 = new HoplogDescriptor(name2);
+      
+      return hd1.compareTo(hd2);
+    }
+  }
+
+  /**
+   * @param matcher
+   *          A preinitialized / matched regex pattern
+   * @return timestamp of the hoplog, parsed from the matched file name
+   */
+  public static long getHoplogTimestamp(Matcher matcher) {
+    return Long.valueOf(matcher.group(2));
+  }
+}
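
A brief illustration of the naming scheme above (not part of the commit): getTmpSortedOplog() produces names of the form bucketId-timestamp-sequence.extension, which HOPLOG_NAME_PATTERN parses and HoplogComparator orders youngest-first.

    String older = "5-1404417941234-3.hop";     // bucket 5, sequence 3
    String newer = "5-1404417941234-4.hop";     // same timestamp, higher sequence, hence younger

    // Younger files sort as smaller, so this returns -1:
    int cmp = AbstractHoplogOrganizer.HoplogComparator.compareByName(newer, older);

    Matcher m = AbstractHoplogOrganizer.HOPLOG_NAME_PATTERN.matcher(newer);
    if (m.find()) {
      long ts = AbstractHoplogOrganizer.getHoplogTimestamp(m);   // 1404417941234L
    }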

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
new file mode 100644
index 0000000..f46da93
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BloomFilter.java
@@ -0,0 +1,27 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+public interface BloomFilter {
+  /**
+   * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
+   * is such that false positives are allowed, but false negatives cannot occur.
+   */
+  boolean mightContain(byte[] key);
+
+  /**
+   * Returns true if the bloom filter might contain the supplied key. The nature of the bloom filter
+   * is such that false positives are allowed, but false negatives cannot occur.
+   */
+  boolean mightContain(byte[] key, int keyOffset, int keyLength);
+
+  /**
+   * @return Size of the bloom, in bytes
+   */
+  long getBloomSize();
+}
\ No newline at end of file
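
A hedged sketch of the intended read-path usage (not part of the commit); the filter and reader objects are hypothetical and only the BloomFilter contract above is relied on.

    byte[] key = keyBytes;                      // hypothetical key
    byte[] result;
    if (filter.mightContain(key)) {
      // possibly a false positive, so the hoplog still has to be consulted
      result = reader.read(key);                // hypothetical reader call
    } else {
      // false negatives cannot occur, so the key is definitely absent
      result = null;
    }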

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
new file mode 100644
index 0000000..26223a0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CloseTmpHoplogsTimerTask.java
@@ -0,0 +1,100 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.Collection;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import org.apache.logging.log4j.Logger;
+
+/**
+ * In the streaming case, if bucket traffic drops after a few batches of data
+ * have been written, flush is not called and the file is left in the tmp state
+ * until flushing resumes. To avoid this, this timer task periodically iterates
+ * over the buckets and closes their writers if the time for rollover has passed.
+ * 
+ * It also has the extra responsibility of fixing the sizes of the files that
+ * were not closed properly last time.
+ *
+ * @author hemantb
+ *
+ */
+class CloseTmpHoplogsTimerTask extends SystemTimerTask {
+  
+  private HdfsRegionManager hdfsRegionManager;
+  private static final Logger logger = LogService.getLogger();
+  private FileSystem filesystem; 
+  
+  public CloseTmpHoplogsTimerTask(HdfsRegionManager hdfsRegionManager) {
+    this.hdfsRegionManager = hdfsRegionManager;
+    
+    // Create a new filesystem 
+    // This is added for the following reason:
+    // For HDFS, if a file wasn't closed properly last time, 
+    // while calling FileSystem.append for this file, FSNamesystem.startFileInternal->
+    // FSNamesystem.recoverLeaseInternal function gets called. 
+    // This function throws AlreadyBeingCreatedException if there is an open handle, to any other file, 
+    // created using the same FileSystem object. This is a bug and is being tracked at: 
+    // https://issues.apache.org/jira/browse/HDFS-3848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
+    // 
+    // The fix for this bug is not yet part of Pivotal HD. So to overcome the bug, 
+    // we create a new file system for the timer task so that it does not encounter the bug. 
+    this.filesystem = this.hdfsRegionManager.getStore().createFileSystem();
+    if (logger.isDebugEnabled()) 
+      logger.debug("created a new file system specifically for timer task");
+  }
+
+  
+  /**
+   * Iterates over all the bucket organizers and closes their writer if the time for 
+   * rollover has passed. It also has the additional responsibility of fixing the tmp
+   * files that were left over in the last unsuccessful run. 
+   */
+  @Override
+  public void run2() {
+    Collection<HoplogOrganizer> organizers =  hdfsRegionManager.getBucketOrganizers();
+    if (logger.isDebugEnabled()) 
+      logger.debug("Starting the close temp logs run.");
+    
+    for (HoplogOrganizer organizer: organizers) {
+      
+      HDFSUnsortedHoplogOrganizer unsortedOrganizer = (HDFSUnsortedHoplogOrganizer)organizer;
+      long timeSinceLastFlush = (System.currentTimeMillis() - unsortedOrganizer.getLastFlushTime())/1000 ;
+      try {
+        this.hdfsRegionManager.getRegion().checkReadiness();
+      } catch (Exception e) {
+        break;
+      }
+      
+      try {
+        // the time since last flush has exceeded file rollover interval, roll over the 
+        // file. 
+        if (timeSinceLastFlush >= unsortedOrganizer.getfileRolloverInterval()) {
+          if (logger.isDebugEnabled()) 
+            logger.debug("Closing writer for bucket: " + unsortedOrganizer.bucketId);
+          unsortedOrganizer.synchronizedCloseWriter(false, timeSinceLastFlush, 0);
+        }
+        
+        // fix the tmp hoplogs, if any. Pass the new file system here. 
+        unsortedOrganizer.identifyAndFixTmpHoplogs(this.filesystem);
+        
+      } catch (Exception e) {
+        logger.warn(LocalizedStrings.HOPLOG_CLOSE_FAILED, e);
+      }
+    }
+    
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
new file mode 100644
index 0000000..04cf7ca
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CompactionStatus.java
@@ -0,0 +1,64 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Status of a compaction task, reported through the Future returned when the task is submitted
+ * 
+ * @author sbawaska
+ */
+public class CompactionStatus implements VersionedDataSerializable {
+  /* MergeGemXDHDFSToGFE: check and verify serialization versions */
+ 
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+  private int bucketId;
+  private boolean status;
+
+  public CompactionStatus() {
+  }
+
+  public CompactionStatus(int bucketId, boolean status) {
+    this.bucketId = bucketId;
+    this.status = status;
+  }
+  public int getBucketId() {
+    return bucketId;
+  }
+  public boolean isStatus() {
+    return status;
+  }
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(bucketId);
+    out.writeBoolean(status);
+  }
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.bucketId = in.readInt();
+    this.status = in.readBoolean();
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this)).append(" Bucket:")
+    .append(bucketId).append(" status:").append(status);
+    return sb.toString();
+  }
+}
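
A minimal round-trip sketch for the toData/fromData pair above (not part of the commit); plain java.io streams are used for brevity.

    CompactionStatus status = new CompactionStatus(42, true);

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    status.toData(new DataOutputStream(bos));                // writes bucketId, then status

    CompactionStatus copy = new CompactionStatus();
    copy.fromData(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    // copy.getBucketId() == 42 and copy.isStatus() == true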

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
new file mode 100644
index 0000000..f75cdba
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/FlushStatus.java
@@ -0,0 +1,57 @@
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Reports the result of a flush request.
+ * 
+ * @author bakera
+ */
+public class FlushStatus implements VersionedDataSerializable {
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+  private int bucketId;
+
+  private final static int LAST = -1;
+  
+  public FlushStatus() {
+  }
+
+  public static FlushStatus last() {
+    return new FlushStatus(LAST);
+  }
+  
+  public FlushStatus(int bucketId) {
+    this.bucketId = bucketId;
+  }
+  public int getBucketId() {
+    return bucketId;
+  }
+  public boolean isLast() {
+    return bucketId == LAST;
+  }
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(bucketId);
+  }
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.bucketId = in.readInt();
+  }
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this)).append(" Bucket:")
+    .append(bucketId);
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
new file mode 100644
index 0000000..d96cd11
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManager.java
@@ -0,0 +1,328 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * A singleton that schedules compaction of hoplogs owned by this node as primary and manages
+ * the executors for ongoing compactions. Ideally the number of pending requests will not exceed
+ * the number of buckets on the node, because the hoplog organizer avoids creating a new request
+ * while compaction on a bucket is still active. Separate queues are maintained for major and
+ * minor compactions so that long-running major compactions do not block minor compactions.
+ */
+public class HDFSCompactionManager {
+  /*
+   * Each hdfs store has its own concurrency configuration, which the compaction
+   * manager uses to manage its threads. This member holds the hdfs-store name to
+   * compaction manager mapping.
+   */
+  private static final ConcurrentHashMap<String, HDFSCompactionManager> storeToManagerMap = 
+                                        new ConcurrentHashMap<String, HDFSCompactionManager>();
+
+  // hdfs store configuration used to initialize this instance
+  HDFSStore storeConfig;
+  
+  // Executor for ordered execution of minor compaction requests.
+  private final CompactionExecutor minorCompactor;
+  // Executor for ordered execution of major compaction requests.
+  private final CompactionExecutor majorCompactor;
+
+  private static final Logger logger = LogService.getLogger();
+  protected final static String logPrefix = "<" + "HDFSCompactionManager" + "> ";
+  
+  private HDFSCompactionManager(HDFSStore config) {
+    this.storeConfig = config;
+    // configure hdfs compaction manager
+    HDFSCompactionConfig compactionConf = config.getHDFSCompactionConfig();
+    
+    int capacity = Integer.getInteger(HoplogConfig.COMPCATION_QUEUE_CAPACITY,
+        HoplogConfig.COMPCATION_QUEUE_CAPACITY_DEFAULT);
+
+    minorCompactor = new CompactionExecutor(compactionConf.getMaxThreads(),
+        capacity, "MinorCompactor_" + config.getName());
+
+    majorCompactor = new CompactionExecutor(
+        compactionConf.getMajorCompactionMaxThreads(), capacity, "MajorCompactor_"
+            + config.getName());
+
+    minorCompactor.allowCoreThreadTimeOut(true);
+    majorCompactor.allowCoreThreadTimeOut(true);
+  }
+
+  public static synchronized HDFSCompactionManager getInstance(HDFSStore config) {
+    HDFSCompactionManager instance = storeToManagerMap.get(config.getName());
+    if (instance == null) {
+      instance = new HDFSCompactionManager(config);
+      storeToManagerMap.put(config.getName(), instance);
+    }
+    
+    return instance;
+  }
+
+  /**
+   * Accepts compaction request for asynchronous compaction execution.
+   * 
+   * @param request
+   *          compaction request with region and bucket id
+   * @return true if the request is accepted, false if the compactor is overloaded and there is a
+   *         long wait queue
+   */
+  public synchronized Future<CompactionStatus> submitRequest(CompactionRequest request) {
+    if (!request.isForced && request.compactor.isBusy(request.isMajor)) {
+      if (logger.isDebugEnabled()) {
+        fineLog("Compactor is busy. Ignoring ", request);
+      }
+      return null;
+    }
+    
+    CompactionExecutor executor = request.isMajor ? majorCompactor : minorCompactor;
+    
+    try {
+      return executor.submit(request);
+    } catch (Throwable e) {
+      if (e instanceof CompactionIsDisabled) {
+        if (logger.isDebugEnabled()) {
+          fineLog(e.getMessage());
+        }
+      } else {
+        logger.info(LocalizedMessage.create(LocalizedStrings.ONE_ARG, "Compaction request submission failed"), e);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes all pending compaction requests. Programmed for TESTING ONLY
+   */
+  public void reset() {
+    minorCompactor.shutdownNow();
+    majorCompactor.shutdownNow();
+    HDFSCompactionManager.storeToManagerMap.remove(storeConfig.getName());
+  }
+  
+  /**
+   * Returns minor compactor. Programmed for TESTING AND MONITORING ONLY  
+   */
+  public ThreadPoolExecutor getMinorCompactor() {
+    return minorCompactor;
+  }
+
+  /**
+   * Returns major compactor. Programmed for TESTING AND MONITORING ONLY  
+   */
+  public ThreadPoolExecutor getMajorCompactor() {
+    return majorCompactor;
+  }
+  
+  /**
+   * Contains important details needed for executing a compaction cycle.
+   */
+  public static class CompactionRequest implements Callable<CompactionStatus> {
+    String regionFolder;
+    int bucket;
+    Compactor compactor;
+    boolean isMajor;
+    final boolean isForced;
+    final boolean versionUpgrade;
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major) {
+      this(regionFolder, bucket, compactor, major, false);
+    }
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced) {
+      this(regionFolder, bucket, compactor, major, isForced, false);
+    }
+
+    public CompactionRequest(String regionFolder, int bucket, Compactor compactor, boolean major, boolean isForced, boolean versionUpgrade) {
+      this.regionFolder = regionFolder;
+      this.bucket = bucket;
+      this.compactor = compactor;
+      this.isMajor = major;
+      this.isForced = isForced;
+      this.versionUpgrade = versionUpgrade;
+    }
+
+    @Override
+    public CompactionStatus call() throws Exception {
+      HDFSStore store = compactor.getHdfsStore();
+      if (!isForced) {
+        // this is an auto-generated compaction request. If auto compaction is
+        // disabled, ignore this call.
+        if (isMajor && !store.getHDFSCompactionConfig().getAutoMajorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Major compaction is disabled. Ignoring request",logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        } else if (!isMajor && !store.getMinorCompaction()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("{}Minor compaction is disabled. Ignoring request", logPrefix);
+          }
+          return new CompactionStatus(bucket, false);
+        }
+      }
+
+      // all hurdles passed, execute compaction now
+      try {
+        boolean status = compactor.compact(isMajor, versionUpgrade);
+        return new CompactionStatus(bucket, status);
+      } catch (IOException e) {
+        logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_HDFS_COMPACTION_ERROR, bucket), e);
+      }
+      return new CompactionStatus(bucket, false);
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + bucket;
+      result = prime * result
+          + ((regionFolder == null) ? 0 : regionFolder.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      CompactionRequest other = (CompactionRequest) obj;
+      if (bucket != other.bucket)
+        return false;
+      if (regionFolder == null) {
+        if (other.regionFolder != null)
+          return false;
+      } else if (!regionFolder.equals(other.regionFolder))
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "CompactionRequest [regionFolder=" + regionFolder + ", bucket="
+          + bucket + ", isMajor=" + isMajor + ", isForced="+isForced+"]";
+    }
+  }
+
+  /**
+   * Helper class that creates named compaction threads and manages the compaction
+   * executor. Idle threads are allowed to time out (see allowCoreThreadTimeOut).
+   */
+  private class CompactionExecutor extends ThreadPoolExecutor implements ThreadFactory {
+    final AtomicInteger count = new AtomicInteger(1);
+    private String name;
+
+    CompactionExecutor(int max, int capacity, String name) {
+      super(max, max, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(capacity));
+      allowCoreThreadTimeOut(true);
+      setThreadFactory(this);
+      this.name = name;
+    }
+    
+    private void throwIfStopped(CompactionRequest req, HDFSStore storeConfig) {
+      // check whether compaction is enabled on every call; an alter operation may
+      // change this value, so the check cannot be cached
+      boolean isEnabled = true;
+      isEnabled = storeConfig.getMinorCompaction();
+      if (req.isMajor) {
+        isEnabled = storeConfig.getHDFSCompactionConfig().getAutoMajorCompaction();
+      }
+      if (isEnabled || req.isForced) {
+        return;
+      }
+      throw new CompactionIsDisabled(name + " is disabled");
+    }
+
+    private void throwIfPoolSizeChanged(CompactionRequest task, HDFSCompactionConfig config) {
+      int threadCount = config.getMaxThreads();
+      if (task.isMajor) {
+        threadCount = config.getMajorCompactionMaxThreads();
+      }
+      
+      if (getCorePoolSize() != threadCount) {
+        setCorePoolSize(threadCount);
+      }
+      
+      if (!task.isForced && getActiveCount() > threadCount) {
+        // the number of active threads exceeds the new max pool size; throw an
+        // error if this is a system-generated compaction request
+        throw new CompactionIsDisabled(
+            "Rejecting to reduce the number of threads for " + name
+            + ", currently:" + getActiveCount() + " target:"
+            + threadCount);
+      }
+    }
+    
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+      HDFSCompactionConfig config;
+      config = HDFSCompactionManager.this.storeConfig.getHDFSCompactionConfig();
+      
+      throwIfStopped((CompactionRequest) task, HDFSCompactionManager.this.storeConfig);
+      throwIfPoolSizeChanged((CompactionRequest) task, config);
+      
+      if (logger.isDebugEnabled()) {
+        fineLog("New:", task, " pool:", getPoolSize(), " active:", getActiveCount());
+      }
+      return super.submit(task);
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+      Thread thread = new Thread(r, name + ":" + count.getAndIncrement());
+      thread.setDaemon(true);
+      if (logger.isDebugEnabled()) {
+        fineLog("New thread:", name, " poolSize:", getPoolSize(),
+            " active:", getActiveCount());
+      }
+      return thread;
+    }
+  }
+  
+  public static class CompactionIsDisabled extends RejectedExecutionException {
+    private static final long serialVersionUID = 1L;
+    public CompactionIsDisabled(String name) {
+      super(name);
+    }
+  }
+  
+  
+  private void fineLog(Object... strings) {
+    if (logger.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      for (Object str : strings) {
+        sb.append(str.toString());
+      }
+      logger.debug("{}"+sb.toString(), logPrefix);
+    }
+  }
+}
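A hedged usage sketch for the compaction manager above, assuming the caller already holds a configured HDFSStore and a Compactor (normally obtained from a bucket's hoplog organizer); the helper name and the region folder/bucket arguments are illustrative, not part of the commit:

    import java.util.concurrent.Future;

    import com.gemstone.gemfire.cache.hdfs.HDFSStore;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.CompactionStatus;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;

    class MinorCompactionExample {
      // Returns null when the compactor is already busy or the request is rejected,
      // mirroring the submitRequest contract documented above.
      static Future<CompactionStatus> scheduleMinor(HDFSStore store, String regionFolder,
          int bucketId, Compactor compactor) {
        HDFSCompactionManager manager = HDFSCompactionManager.getInstance(store);
        CompactionRequest request =
            new CompactionRequest(regionFolder, bucketId, compactor, /*major*/ false);
        return manager.submitRequest(request);
      }
    }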

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
new file mode 100644
index 0000000..0e620d7
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueArgs.java
@@ -0,0 +1,78 @@
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Defines the arguments to the flush queue request.
+ * 
+ * @author bakera
+ */
+@SuppressWarnings("serial")
+public class HDFSFlushQueueArgs implements VersionedDataSerializable {
+
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private HashSet<Integer> buckets;
+
+  private long maxWaitTimeMillis;
+
+  public HDFSFlushQueueArgs() {
+  }
+
+  public HDFSFlushQueueArgs(Set<Integer> buckets, long maxWaitTime) {
+    this.buckets = new HashSet<Integer>(buckets);
+    this.maxWaitTimeMillis = maxWaitTime;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(buckets, out);
+    out.writeLong(maxWaitTimeMillis);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    this.buckets = DataSerializer.readHashSet(in);
+    this.maxWaitTimeMillis = in.readLong();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  public Set<Integer> getBuckets() {
+    return buckets;
+  }
+
+  public void setBuckets(Set<Integer> buckets) {
+    this.buckets = new HashSet<Integer>(buckets);
+  }
+
+  public boolean isSynchronous() {
+    return maxWaitTimeMillis == 0;
+  }
+
+  public long getMaxWaitTime() {
+    return this.maxWaitTimeMillis;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this))
+    .append(" buckets:").append(buckets)
+    .append(" maxWaitTime:").append(maxWaitTimeMillis);
+    return sb.toString();
+  }
+}
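A small sketch of the wait-budget semantics encoded by HDFSFlushQueueArgs, using illustrative bucket ids; a zero budget is treated as a synchronous flush, a positive value caps the wait in milliseconds:

    import java.util.Arrays;
    import java.util.HashSet;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueArgs;

    class FlushQueueArgsExample {
      public static void main(String[] args) {
        // flush buckets 0..2 with no deadline: isSynchronous() reports true
        HDFSFlushQueueArgs unbounded =
            new HDFSFlushQueueArgs(new HashSet<Integer>(Arrays.asList(0, 1, 2)), 0L);
        assert unbounded.isSynchronous();

        // the same buckets with a 30 second budget, expressed in milliseconds
        HDFSFlushQueueArgs bounded =
            new HDFSFlushQueueArgs(new HashSet<Integer>(Arrays.asList(0, 1, 2)), 30000L);
        assert !bounded.isSynchronous() && bounded.getMaxWaitTime() == 30000L;
      }
    }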

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
new file mode 100644
index 0000000..a0f648c
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSFlushQueueFunction.java
@@ -0,0 +1,271 @@
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.execute.Function;
+import com.gemstone.gemfire.cache.execute.FunctionContext;
+import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.InternalEntity;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.LocalResultCollector;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+public class HDFSFlushQueueFunction implements Function, InternalEntity {
+  private static final int MAX_RETRIES = Integer.getInteger("gemfireXD.maxFlushQueueRetries", 3);
+  private static final boolean VERBOSE = Boolean.getBoolean("hdfsFlushQueueFunction.VERBOSE");
+  private static final Logger logger = LogService.getLogger();
+  private static final String ID = HDFSFlushQueueFunction.class.getName();
+  
+  public static void flushQueue(PartitionedRegion pr, int maxWaitTime) {
+    
+    Set<Integer> buckets = new HashSet<Integer>(pr.getRegionAdvisor().getBucketSet());
+
+    maxWaitTime *= 1000; // caller supplies seconds; the deadline math below uses millis
+    long start = System.currentTimeMillis();
+    
+    int retries = 0;
+    long remaining = 0;
+    while (retries++ < MAX_RETRIES && (remaining = waitTime(start, maxWaitTime)) > 0) {
+      if (logger.isDebugEnabled() || VERBOSE) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing buckets " + buckets 
+            + ", attempt = " + retries 
+            + ", remaining = " + remaining));
+      }
+      
+      HDFSFlushQueueArgs args = new HDFSFlushQueueArgs(buckets, remaining);
+      
+      HDFSFlushQueueResultCollector rc = new HDFSFlushQueueResultCollector(buckets);
+      AbstractExecution exec = (AbstractExecution) FunctionService
+          .onRegion(pr)
+          .withArgs(args)
+          .withCollector(rc);
+      exec.setWaitOnExceptionFlag(true);
+      
+      try {
+        exec.execute(ID);
+        if (rc.getResult()) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushed all buckets successfully")); 
+          }
+          return;
+        }
+      } catch (FunctionException e) {
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing queue"), e); 
+        }
+      }
+      
+      buckets.removeAll(rc.getSuccessfulBuckets());
+      for (int bucketId : buckets) {
+        remaining = waitTime(start, maxWaitTime);
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + bucketId)); 
+        }
+        pr.getNodeForBucketWrite(bucketId, new PartitionedRegion.RetryTimeKeeper((int) remaining));
+      }
+    }
+    
+    pr.checkReadiness();
+    throw new FunctionException("Unable to flush the following buckets: " + buckets);
+  }
+  
+  private static long waitTime(long start, long max) {
+    if (max == 0) {
+      return Integer.MAX_VALUE;
+    }
+    return start + max - System.currentTimeMillis();
+  }
+  
+  @Override
+  public void execute(FunctionContext context) {
+    RegionFunctionContext rfc = (RegionFunctionContext) context;
+    PartitionedRegion pr = (PartitionedRegion) rfc.getDataSet();
+    
+    HDFSFlushQueueArgs args = (HDFSFlushQueueArgs) rfc.getArguments();
+    Set<Integer> buckets = new HashSet<Integer>(args.getBuckets());
+    buckets.retainAll(pr.getDataStore().getAllLocalPrimaryBucketIds());
+
+    Map<Integer, AsyncFlushResult> flushes = new HashMap<Integer, AsyncFlushResult>();
+    for (int bucketId : buckets) {
+      try {
+        HDFSBucketRegionQueue brq = getQueue(pr, bucketId);
+        if (brq != null) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Flushing bucket " + bucketId)); 
+          }
+          flushes.put(bucketId, brq.flush());
+        }
+      } catch (ForceReattemptException e) {
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Encountered error flushing bucket " + bucketId), e); 
+        }
+      }
+    }
+    
+    try {
+      long start = System.currentTimeMillis();
+      for (Map.Entry<Integer, AsyncFlushResult> flush : flushes.entrySet()) {
+        long remaining = waitTime(start, args.getMaxWaitTime());
+        if (logger.isDebugEnabled() || VERBOSE) {
+          logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Waiting for bucket " + flush.getKey() 
+              + " to complete flushing, remaining = " + remaining)); 
+        }
+        
+        if (flush.getValue().waitForFlush(remaining, TimeUnit.MILLISECONDS)) {
+          if (logger.isDebugEnabled() || VERBOSE) {
+            logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Bucket " + flush.getKey() + " flushed successfully")); 
+          }
+          rfc.getResultSender().sendResult(new FlushStatus(flush.getKey()));
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    
+    if (logger.isDebugEnabled() || VERBOSE) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.DEBUG, "Sending final flush result")); 
+    }
+    rfc.getResultSender().lastResult(FlushStatus.last());
+  }
+
+  private HDFSBucketRegionQueue getQueue(PartitionedRegion pr, int bucketId) 
+      throws ForceReattemptException {
+    AsyncEventQueueImpl aeq = pr.getHDFSEventQueue();
+    AbstractGatewaySender gw = (AbstractGatewaySender) aeq.getSender();
+    AbstractGatewaySenderEventProcessor ep = gw.getEventProcessor();
+    if (ep == null) {
+      return null;
+    }
+    
+    ConcurrentParallelGatewaySenderQueue queue = (ConcurrentParallelGatewaySenderQueue) ep.getQueue();
+    return queue.getBucketRegionQueue(pr, bucketId);
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+  
+  public static class HDFSFlushQueueResultCollector implements LocalResultCollector<Object, Boolean> {
+    private final CountDownLatch complete;
+    private final Set<Integer> expectedBuckets;
+    private final Set<Integer> successfulBuckets;
+
+    private volatile ReplyProcessor21 processor;
+    
+    public HDFSFlushQueueResultCollector(Set<Integer> expectedBuckets) {
+      this.expectedBuckets = expectedBuckets;
+      
+      complete = new CountDownLatch(1);
+      successfulBuckets = new HashSet<Integer>();
+    }
+    
+    public Set<Integer> getSuccessfulBuckets() {
+      synchronized (successfulBuckets) {
+        return new HashSet<Integer>(successfulBuckets);
+      }
+    }
+    
+    @Override
+    public Boolean getResult() throws FunctionException {
+      try {
+        complete.await();
+        synchronized (successfulBuckets) {
+          LogWriterI18n logger = InternalDistributedSystem.getLoggerI18n();
+          if (logger.fineEnabled() || VERBOSE) {
+            logger.info(LocalizedStrings.DEBUG, "Expected buckets: " + expectedBuckets);
+            logger.info(LocalizedStrings.DEBUG, "Successful buckets: " + successfulBuckets);
+          }
+          return expectedBuckets.equals(successfulBuckets);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        GemFireCacheImpl.getExisting().getCancelCriterion().checkCancelInProgress(e);
+        throw new FunctionException(e);
+      }
+    }
+
+    @Override
+    public Boolean getResult(long timeout, TimeUnit unit)
+        throws FunctionException, InterruptedException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void addResult(DistributedMember memberID, Object result) {
+      if (result instanceof FlushStatus) {
+        FlushStatus status = (FlushStatus) result;
+        if (!status.isLast()) {
+          synchronized (successfulBuckets) {
+            successfulBuckets.add(status.getBucketId());
+          }        
+        }
+      }
+    }
+
+    @Override
+    public void endResults() {
+      complete.countDown();
+    }
+
+    @Override
+    public void clearResults() {
+    }
+
+    @Override
+    public void setProcessor(ReplyProcessor21 processor) {
+      this.processor = processor;
+    }
+
+    @Override
+    public ReplyProcessor21 getProcessor() {
+      return processor;
+    }
+
+    @Override
+    public void setException(Throwable exception) {
+      // TODO Auto-generated method stub
+    }
+
+  }
+}
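A hedged sketch of driving the flush from the caller's side, assuming the function still needs registering in this process (production code would normally register it once during cache or HDFS store initialization); the helper name and the wait budget are illustrative:

    import com.gemstone.gemfire.cache.execute.FunctionService;
    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSFlushQueueFunction;
    import com.gemstone.gemfire.internal.cache.PartitionedRegion;

    class FlushQueueUsage {
      // Blocks up to maxWaitSeconds for every primary bucket of the region to drain to HDFS;
      // flushQueue throws FunctionException if some buckets could not be flushed in time.
      static void drain(PartitionedRegion pr, int maxWaitSeconds) {
        // assumed here: registration happens once per cache before the first flush
        FunctionService.registerFunction(new HDFSFlushQueueFunction());
        HDFSFlushQueueFunction.flushQueue(pr, maxWaitSeconds);
      }
    }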

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/1709e627/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
new file mode 100644
index 0000000..4b7b98e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSForceCompactionArgs.java
@@ -0,0 +1,99 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.internal.VersionedDataSerializable;
+import com.gemstone.gemfire.internal.Version;
+
+/**
+ * Arguments passed to the HDFSForceCompactionFunction
+ * 
+ * @author sbawaska
+ */
+@SuppressWarnings("serial")
+public class HDFSForceCompactionArgs implements VersionedDataSerializable {
+
+  private static Version[] serializationVersions = new Version[]{ Version.GFE_81 };
+
+  private HashSet<Integer> buckets;
+
+  private boolean isMajor;
+
+  private int maxWaitTime;
+
+  public HDFSForceCompactionArgs() {
+  }
+
+  public HDFSForceCompactionArgs(Set<Integer> buckets, boolean isMajor, Integer maxWaitTime) {
+    this.buckets = new HashSet<Integer>(buckets);
+    this.isMajor = isMajor;
+    this.maxWaitTime = maxWaitTime;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeHashSet(buckets, out);
+    out.writeBoolean(isMajor);
+    out.writeInt(maxWaitTime);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException,
+      ClassNotFoundException {
+    this.buckets = DataSerializer.readHashSet(in);
+    this.isMajor = in.readBoolean();
+    this.maxWaitTime = in.readInt();
+  }
+
+  @Override
+  public Version[] getSerializationVersions() {
+    return serializationVersions;
+  }
+
+  public Set<Integer> getBuckets() {
+    return buckets;
+  }
+
+  public void setBuckets(Set<Integer> buckets) {
+    this.buckets = new HashSet<Integer>(buckets);
+  }
+
+  public boolean isMajor() {
+    return isMajor;
+  }
+
+  public void setMajor(boolean isMajor) {
+    this.isMajor = isMajor;
+  }
+
+  public boolean isSynchronous() {
+    return maxWaitTime == 0;
+  }
+
+  public int getMaxWaitTime() {
+    return this.maxWaitTime;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getClass().getCanonicalName()).append("@")
+    .append(System.identityHashCode(this))
+    .append(" buckets:").append(buckets)
+    .append(" isMajor:").append(isMajor)
+    .append(" maxWaitTime:").append(maxWaitTime);
+    return sb.toString();
+  }
+}
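A short sketch of building the force-compaction arguments, using illustrative bucket ids; as with the flush args, a zero max wait time marks the request synchronous:

    import java.util.Arrays;
    import java.util.HashSet;

    import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSForceCompactionArgs;

    class ForceCompactionArgsExample {
      public static void main(String[] args) {
        // request a major compaction of buckets 0..2 with no deadline
        HDFSForceCompactionArgs major =
            new HDFSForceCompactionArgs(new HashSet<Integer>(Arrays.asList(0, 1, 2)), true, 0);
        assert major.isMajor() && major.isSynchronous();
      }
    }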

