geode-commits mailing list archives

From dschnei...@apache.org
Subject [47/53] [partial] incubator-geode git commit: Initial import of geode-1.0.0.0-SNAPSHOT-2. All the new sub-project directories (like jvsd) were not imported. A diff was done to confirm that this commit is exactly the same as the open directory the snapsho
Date Mon, 06 Jul 2015 18:15:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
new file mode 100644
index 0000000..80da4f3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEntriesSet.java
@@ -0,0 +1,320 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.util.AbstractSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueueIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.HDFSRegionMap;
+import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
+import com.gemstone.gemfire.internal.cache.LocalRegion.IteratorType;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import org.apache.hadoop.hbase.util.Bytes;
+
+@SuppressWarnings("rawtypes")
+public class HDFSEntriesSet extends AbstractSet {
+  private final IteratorType type;
+
+  private final HoplogOrganizer hoplogs;
+  private final HDFSBucketRegionQueue brq;
+  
+  private final BucketRegion region; 
+  private final ReferenceQueue<HDFSIterator> refs;
+  
+  public HDFSEntriesSet(BucketRegion region, HDFSBucketRegionQueue brq, 
+      HoplogOrganizer hoplogs, IteratorType type, ReferenceQueue<HDFSIterator> refs) {
+    this.region = region;
+    this.brq = brq;
+    this.hoplogs = hoplogs;
+    this.type = type;
+    this.refs = refs;
+  }
+  
+  @Override
+  public HDFSIterator iterator() {
+    HDFSIterator iter = new HDFSIterator(type, region.getPartitionedRegion(), true);
+    if (refs != null) {
+      // we can't rely on an explicit close but we need to free resources
+      //
+      // This approach has the potential to cause excessive memory load and/or
+      // GC problems if an app holds an iterator ref too long. A lease-based
+      // approach, where iterators are automatically closed after X secs of
+      // inactivity, is a potential alternative (but may require tuning for
+      // certain applications)
+      new WeakReference<HDFSEntriesSet.HDFSIterator>(iter, refs);
+    }
+    return iter;
+  }
+
+  @Override
+  public int size() {
+    // TODO this is the tortoise version, need a fast version for estimation
+    // note: more than 2^31-1 records will cause this counter to wrap
+    int size = 0;
+    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+    try {
+      while (iter.hasNext()) {
+        if (includeEntry(iter.next())) {
+          size++;
+        }
+      }
+    } finally {
+      iter.close();
+    }
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    HDFSIterator iter = new HDFSIterator(null, region.getPartitionedRegion(), false);
+    try {
+      while (iter.hasNext()) {
+        if (includeEntry(iter.next())) {
+          return false;
+        }
+      }
+    } finally {
+      iter.close();
+    }
+    return true;
+  }
+
+  private boolean includeEntry(Object val) {
+    if (val instanceof HDFSGatewayEventImpl) {
+      HDFSGatewayEventImpl evt = (HDFSGatewayEventImpl) val;
+      if (evt.getOperation().isDestroy()) {
+        return false;
+      }
+    } else if (val instanceof PersistedEventImpl) {
+      PersistedEventImpl evt = (PersistedEventImpl) val;
+      if (evt.getOperation().isDestroy()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  public class HDFSIterator implements Iterator {
+    private final IteratorType type;
+    private final boolean deserialize;
+    
+    private final SortedEventQueueIterator queue;
+    private final HoplogIterator<byte[], SortedHoplogPersistedEvent> hdfs;
+    private Iterator txCreatedEntryIterator;
+    
+    private boolean queueNext;
+    private boolean hdfsNext;
+    private boolean forUpdate;
+    private boolean hasTxEntry;
+
+    private byte[] currentHdfsKey;
+
+    public HDFSIterator(IteratorType type, Region region, boolean deserialize) {
+      this.type = type;
+      this.deserialize = deserialize;
+
+      // Check whether the queue has become primary here.
+      // There can be some delay between the bucket becoming primary
+      // and the underlying queue becoming primary, so isPrimaryWithWait()
+      // waits for some time for the queue to become primary on this member.
+      if (!brq.getBucketAdvisor().isPrimaryWithWait()) {
+        InternalDistributedMember primaryHolder = brq.getBucketAdvisor()
+            .basicGetPrimaryMember();
+        throw new PrimaryBucketException("Bucket " + brq.getName()
+            + " is not primary. Current primary holder is " + primaryHolder);
+      }
+      // We are deliberately NOT sync'ing while creating the iterators.  If done
+      // in the correct order, we may get duplicates (due to an in-progress
+      // flush) but we won't miss any entries.  The dupes will be eliminated
+      // during iteration.
+      queue = brq.iterator(region);
+      advanceQueue();
+      
+      HoplogIterator<byte[], SortedHoplogPersistedEvent> tmp = null;
+      try {
+        tmp = hoplogs.scan();
+      } catch (IOException e) {
+        HDFSEntriesSet.this.region.checkForPrimary();
+        throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+      }
+      
+      hdfs = tmp;
+      if (hdfs != null) {
+        advanceHdfs();
+      }
+    }
+    
+    @Override
+    public boolean hasNext() {
+      boolean nonTxHasNext = hdfsNext || queueNext;
+      if (!nonTxHasNext && this.txCreatedEntryIterator != null) {
+        this.hasTxEntry = this.txCreatedEntryIterator.hasNext();
+        return this.hasTxEntry;
+      }
+      return nonTxHasNext;
+    }
+    
+    @Override
+    public Object next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      if (hasTxEntry) {
+        hasTxEntry = false;
+        return this.txCreatedEntryIterator.next();
+      }
+
+      Object val;
+      if (!queueNext) {
+        val = getFromHdfs();
+        advanceHdfs();
+        
+      } else if (!hdfsNext) {
+        val = getFromQueue();
+        advanceQueue();
+        
+      } else {
+        byte[] qKey = queue.current().getSerializedKey();
+        byte[] hKey = this.currentHdfsKey;
+        
+        int diff = Bytes.compareTo(qKey, hKey);
+        if (diff < 0) {
+          val = getFromQueue();
+          advanceQueue();
+          
+        } else if (diff == 0) {
+          val = getFromQueue();
+          advanceQueue();
+
+          // ignore the duplicate
+          advanceHdfs();
+
+        } else {
+          val = getFromHdfs();
+          advanceHdfs();
+        }
+      }
+      return val;
+    }
+    
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+    
+    public void close() {
+      if (queueNext) {
+        queue.close();
+      }
+
+      if (hdfsNext) {
+        hdfs.close();
+      }
+    }
+
+    private Object getFromQueue() {
+      HDFSGatewayEventImpl evt = queue.current();
+      if (type == null) {
+        return evt;
+      }
+      
+      switch (type) {
+      case KEYS:
+        byte[] key = evt.getSerializedKey();
+        return deserialize ? EntryEventImpl.deserialize(key) : key;
+        
+      case VALUES:
+        return evt.getValue();
+        
+      default:
+        Object keyObj = EntryEventImpl.deserialize(evt.getSerializedKey());
+        if(keyObj instanceof KeyWithRegionContext) {
+          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+        }
+        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, evt, true, forUpdate);
+      }
+    }
+
+    private Object getFromHdfs() {
+      if (type == null) {
+        return hdfs.getValue();
+      }
+      
+      switch (type) {
+      case KEYS:
+        byte[] key = this.currentHdfsKey;
+        return deserialize ? EntryEventImpl.deserialize(key) : key;
+        
+      case VALUES:
+        PersistedEventImpl evt = hdfs.getValue();
+        return evt.getValue();
+        
+      default:
+        Object keyObj = EntryEventImpl.deserialize(this.currentHdfsKey);
+        if(keyObj instanceof KeyWithRegionContext) {
+          ((KeyWithRegionContext)keyObj).setRegionContext(region.getPartitionedRegion());
+        }
+        return ((HDFSRegionMap) region.getRegionMap()).getDelegate().getEntryFromEvent(keyObj, hdfs.getValue(), true, forUpdate);
+      }
+    }
+    
+    private void advanceHdfs() {
+      if (hdfsNext = hdfs.hasNext()) {
+        try {
+          this.currentHdfsKey = hdfs.next();
+        } catch (IOException e) {
+          region.checkForPrimary();
+          throw new HDFSIOException(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(e.getMessage()), e);
+        }
+      } else {
+        this.currentHdfsKey = null;
+        hdfs.close();
+      }
+    }
+    
+    private void advanceQueue() {
+      if (queueNext = queue.hasNext()) {
+        queue.next();
+      } else {
+        brq.checkForPrimary();
+        queue.close();
+      }
+    }
+    
+    public void setForUpdate(){
+      this.forUpdate = true;
+    }
+    
+    /**MergeGemXDHDFSToGFE not sure if this function is required */
+    /*public void setTXState(TXState txState) {
+      TXRegionState txr = txState.getTXRegionState(region);
+      if (txr != null) {
+        txr.lock();
+        try {
+          this.txCreatedEntryIterator = txr.getCreatedEntryKeys().iterator();
+        }
+        finally{
+          txr.unlock();
+        }
+      }
+    }*/
+  }
+}
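
HDFSIterator.next() above merges two sorted sources, the in-memory sorted event queue and the on-disk hoplogs, by comparing serialized keys and preferring the queue entry on ties so that duplicates from an in-progress flush are dropped. Below is a minimal, self-contained sketch of that merge pattern; the class and member names are illustrative stand-ins (not Geode APIs), and plain iterators replace the queue and hoplog scanners.

import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Sketch of the two-way sorted merge used by HDFSIterator.next():
 *  compare byte[] keys lexicographically and prefer the queue side on ties,
 *  so duplicates produced by an in-progress flush are dropped. */
class SortedKeyMerge implements Iterator<byte[]> {
  private final Iterator<byte[]> queue;  // stands in for SortedEventQueueIterator
  private final Iterator<byte[]> hdfs;   // stands in for the hoplog scanner
  private byte[] q, h;                   // current head of each source, null when exhausted

  SortedKeyMerge(Iterator<byte[]> queue, Iterator<byte[]> hdfs) {
    this.queue = queue;
    this.hdfs = hdfs;
    q = queue.hasNext() ? queue.next() : null;
    h = hdfs.hasNext() ? hdfs.next() : null;
  }

  public boolean hasNext() { return q != null || h != null; }

  public byte[] next() {
    if (!hasNext()) throw new NoSuchElementException();
    byte[] out;
    if (h == null) { out = q; advanceQueue(); }
    else if (q == null) { out = h; advanceHdfs(); }
    else {
      // Unsigned lexicographic compare (Java 9+); the real code uses HBase Bytes.compareTo.
      int diff = Arrays.compareUnsigned(q, h);
      if (diff <= 0) {                   // queue wins; drop the HDFS duplicate on ties
        out = q; advanceQueue();
        if (diff == 0) advanceHdfs();
      } else {
        out = h; advanceHdfs();
      }
    }
    return out;
  }

  private void advanceQueue() { q = queue.hasNext() ? queue.next() : null; }
  private void advanceHdfs()  { h = hdfs.hasNext() ? hdfs.next() : null; }
}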

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
new file mode 100644
index 0000000..39a2cb6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventListener.java
@@ -0,0 +1,171 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.i18n.StringIdImpl;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+
+/**
+ * Listener that persists events to HDFS
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSEventListener implements AsyncEventListener {
+  private final LogWriterI18n logger;
+  private volatile boolean senderStopped = false;
+
+  private final FailureTracker failureTracker = new FailureTracker(10L, 60 * 1000L, 1.5f);
+
+  public HDFSEventListener(LogWriterI18n logger) {
+    this.logger = logger;
+  }
+  
+  @Override
+  public void close() {
+    senderStopped = true;
+  }
+  
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    if (Hoplog.NOP_WRITE) {
+      return true;
+    }
+    
+    // The list of events that the async queue receives is sorted at the
+    // bucket level. Events for multiple regions and for multiple buckets
+    // are concatenated one after the other, e.g.:
+    //
+    // <Region1, Key1, bucket1>, <Region1, Key19, bucket1>, 
+    // <Region1, Key4, bucket2>, <Region1, Key6, bucket2>
+    // <Region2, Key1, bucket1>, <Region2, Key4, bucket1>
+    // ..
+    
+    Region previousRegion = null;
+    int prevBucketId = -1;
+    ArrayList<QueuedPersistentEvent> list = null;
+    boolean success = false;
+    try {
+      //Back off if we are experiencing failures
+      failureTracker.sleepIfRetry();
+      
+      HoplogOrganizer bucketOrganizer = null; 
+      for (AsyncEvent asyncEvent : events) {
+        if (senderStopped){
+          failureTracker.failure();
+          if (logger.fineEnabled()) {
+            logger.fine("HDFSEventListener.processEvents: Cache is closing down. Ignoring the batch of data.");
+          }
+          return false;
+        }
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)asyncEvent;
+        Region region = hdfsEvent.getRegion();
+        
+        if (prevBucketId != hdfsEvent.getBucketId() || region != previousRegion){
+          if (prevBucketId != -1) {
+            bucketOrganizer.flush(list.iterator(), list.size());
+            success=true;
+            if (logger.fineEnabled()) {
+              logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+            }
+          }
+          bucketOrganizer = getOrganizer((PartitionedRegion) region, hdfsEvent.getBucketId());
+          // The bucket organizer can be null only when the bucket has moved. Throw an
+          // exception so that the batch is discarded.
+          if (bucketOrganizer == null)
+            throw new BucketMovedException("Bucket moved. BucketId: " + hdfsEvent.getBucketId() +  " HDFSRegion: " + region.getName());
+          list = new  ArrayList<QueuedPersistentEvent>();
+        }
+        try {
+          //TODO:HDFS check if there is any serialization overhead
+          list.add(new SortedHDFSQueuePersistedEvent(hdfsEvent));
+        } catch (ClassNotFoundException e) {
+          //TODO:HDFS add localized string
+          logger.warning(new StringIdImpl(0, "Error while converting HDFSGatewayEvent to PersistedEventImpl."), e);
+          return false;
+        }
+        prevBucketId = hdfsEvent.getBucketId();
+        previousRegion = region;
+        
+      }
+      if (bucketOrganizer != null) {
+        bucketOrganizer.flush(list.iterator(), list.size());
+        success = true;
+        
+        if (logger.fineEnabled()) {
+          logger.fine("Batch written to HDFS of size " + list.size() + " for region " + previousRegion);
+        }
+      }
+    } catch (IOException e) {
+      logger.warning(LocalizedStrings.HOPLOG_FLUSH_FOR_BATCH_FAILED, e);
+      return false;
+    }
+    catch (ForceReattemptException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch(PrimaryBucketException e) {
+      //do nothing, the bucket is no longer primary so we shouldn't get the same
+      //batch next time.
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch(BucketMovedException e) {
+      //do nothing, the bucket is no longer primary so we shouldn't get the same
+      //batch next time.
+      if (logger.fineEnabled())
+        logger.fine(e);
+      return false;
+    }
+    catch (CacheClosedException e) {
+      if (logger.fineEnabled())
+        logger.fine(e);
+      // exit silently
+      return false;
+    } catch (InterruptedException e1) {
+      if (logger.fineEnabled())
+        logger.fine(e1);
+      return false;
+    } finally {
+      failureTracker.record(success);
+    }
+
+    return true;
+  }
+  
+  private HoplogOrganizer getOrganizer(PartitionedRegion region, int bucketId) {
+    BucketRegion br = region.getDataStore().getLocalBucketById(bucketId);
+    if (br == null) {
+      // got rebalanced or something
+      throw new PrimaryBucketException("Bucket region is no longer available " + bucketId + " " + region);
+    }
+
+    return br.getHoplogOrganizer();
+  }
+
+}
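
processEvents() above relies on the incoming event list being sorted by (region, bucket): it accumulates events for one bucket and flushes the accumulated batch whenever the bucket or region changes, plus once more after the loop. The sketch below shows that boundary-flush grouping in isolation, with a hypothetical key function and flush callback standing in for the hoplog organizer.

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Sketch of the grouping pattern in HDFSEventListener.processEvents():
 *  events arrive sorted by (region, bucket); flush whenever that key changes,
 *  and once more after the loop for the trailing batch. */
class BucketBatcher {
  static <E> void process(List<E> events,
                          Function<E, String> keyOf,           // e.g. regionName + ":" + bucketId
                          BiConsumer<String, List<E>> flush) { // analogous to organizer.flush(...)
    String prevKey = null;
    List<E> batch = new ArrayList<>();
    for (E e : events) {
      String key = keyOf.apply(e);
      if (!key.equals(prevKey)) {
        if (prevKey != null) flush.accept(prevKey, batch);     // bucket/region boundary
        batch = new ArrayList<>();
        prevKey = key;
      }
      batch.add(e);
    }
    if (prevKey != null) flush.accept(prevKey, batch);         // trailing batch, like the final flush
  }
}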

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueAttributesImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueAttributesImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueAttributesImpl.java
new file mode 100644
index 0000000..df89841
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueAttributesImpl.java
@@ -0,0 +1,179 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.internal.lang.ObjectUtils;
+
+/**
+ * Implementation of HDFSEventQueueAttributes
+ * HDFSEventQueueAttributes represents the attributes of the buffer where events are 
+ * accumulated before they are persisted to HDFS  
+ * 
+ * @author Hemant Bhanawat
+ */
+public class HDFSEventQueueAttributesImpl implements HDFSEventQueueAttributes, DataSerializable, Cloneable {
+
+  private static final long serialVersionUID = 5052784372168230680L;
+  private int maximumQueueMemory;
+  private int batchSize;
+  private boolean isPersistenceEnabled;
+  public String diskStoreName;
+  private int batchIntervalMillis;
+  private boolean diskSynchronous;
+  private int dispatcherThreads;
+  
+  public HDFSEventQueueAttributesImpl(String diskStoreName,
+      int maximumQueueMemory, int batchSize, boolean isPersistenceEnabled,
+      int batchIntervalMillis,  boolean diskSynchronous, int dispatcherThreads) {
+    this.diskStoreName = diskStoreName;
+    this.maximumQueueMemory = maximumQueueMemory;
+    this.batchSize = batchSize;
+    this.isPersistenceEnabled = isPersistenceEnabled;
+    this.batchIntervalMillis = batchIntervalMillis;
+    this.diskSynchronous = diskSynchronous;
+    this.dispatcherThreads = dispatcherThreads;
+  }
+
+  @Override
+  public String getDiskStoreName() {
+    return this.diskStoreName;
+  }
+
+  @Override
+  public int getMaximumQueueMemory() {
+    return this.maximumQueueMemory;
+  }
+
+  @Override
+  public int getBatchSizeMB() {
+    return this.batchSize;
+  }
+
+  @Override
+  public boolean isPersistent() {
+    return this.isPersistenceEnabled;
+  }
+
+  @Override
+  public int getBatchTimeInterval() {
+    return this.batchIntervalMillis;
+  }
+
+  @Override
+  public boolean isDiskSynchronous() {
+    return this.diskSynchronous;
+  }
+
+  @Override
+  public String toString()
+  {
+    StringBuffer s = new StringBuffer();
+    return s.append("HDFSEventQueueAttributes@")
+      .append(System.identityHashCode(this))
+      .append("[maximumQueueMemory=").append(this.maximumQueueMemory)
+      .append(";batchSize=").append(this.batchSize)
+      .append(";isPersistenceEnabled=").append(this.isPersistenceEnabled)
+      .append(";diskStoreName=").append(this.diskStoreName)
+      .append(";batchIntervalMillis=").append(this.batchIntervalMillis)
+      .append(";diskSynchronous=").append(this.diskSynchronous)
+      .append(";dispatcherThreads=").append(this.dispatcherThreads)
+      .append("]") .toString();
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    out.writeInt(this.maximumQueueMemory);
+    out.writeInt(this.batchSize);
+    out.writeBoolean(this.isPersistenceEnabled);
+    DataSerializer.writeString(this.diskStoreName, out);
+    out.writeInt(this.batchIntervalMillis);
+    out.writeBoolean(this.diskSynchronous);
+    out.writeInt(this.dispatcherThreads);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.maximumQueueMemory = in.readInt();
+    this.batchSize = in.readInt();
+    this.isPersistenceEnabled = in.readBoolean();
+    this.diskStoreName = DataSerializer.readString(in);
+    this.batchIntervalMillis = in.readInt();
+    this.diskSynchronous = in.readBoolean();
+    this.dispatcherThreads = in.readInt();
+  }
+  
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) { 
+      return true;
+    }
+    
+    if (! (obj instanceof HDFSEventQueueAttributes)) {
+      return false;
+    }
+      
+    HDFSEventQueueAttributes other = (HDFSEventQueueAttributes) obj;
+      
+    if (this.maximumQueueMemory != other.getMaximumQueueMemory()
+        || this.batchSize != other.getBatchSizeMB()
+        || this.isPersistenceEnabled != other.isPersistent()
+        || this.batchIntervalMillis != other.getBatchTimeInterval()
+        || this.diskSynchronous != other.isDiskSynchronous()
+        || this.dispatcherThreads != other.getDispatcherThreads()
+        || !ObjectUtils.equals(getDiskStoreName(), other.getDiskStoreName())) {
+      return false;
+    }
+  
+    return true;
+  }
+  
+  @Override
+  public Object clone() {
+    HDFSEventQueueAttributesImpl other = null;
+    try {
+      other =
+          (HDFSEventQueueAttributesImpl) super.clone();
+    } catch (CloneNotSupportedException e) {
+      // cannot happen: this class supports cloning
+    }
+    other.maximumQueueMemory = this.maximumQueueMemory;
+    other.batchSize = this.batchSize;
+    other.isPersistenceEnabled = this.isPersistenceEnabled;
+    other.diskStoreName = this.diskStoreName;
+    other.batchIntervalMillis = this.batchIntervalMillis;
+    other.diskSynchronous = this.diskSynchronous;
+    other.dispatcherThreads = this.dispatcherThreads;
+    return other;
+  }
+  
+  @Override
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return -1;
+  }
+  
+  public HDFSEventQueueAttributesImpl copy() {
+    return (HDFSEventQueueAttributesImpl) clone();
+  }
+
+  @Override
+  public int getDispatcherThreads() {
+    return this.dispatcherThreads;
+  }
+
+  
+}
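
Because the class is DataSerializable, fromData() must read fields in exactly the order toData() writes them. The round trip below is a hypothetical sketch through plain Java streams: the driver class and attribute values are illustrative, and it assumes the DataSerializer string helpers used inside toData/fromData work outside a running cache.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.gemstone.gemfire.cache.hdfs.internal.HDFSEventQueueAttributesImpl;

// Hedged round-trip sketch: serialize the attributes and read them back.
public class EventQueueAttributesRoundTrip {
  public static void main(String[] args) throws Exception {
    HDFSEventQueueAttributesImpl original = new HDFSEventQueueAttributesImpl(
        "hdfsQueueDiskStore", /*maximumQueueMemory*/ 100, /*batchSizeMB*/ 32,
        /*persistent*/ true, /*batchIntervalMillis*/ 60000,
        /*diskSynchronous*/ false, /*dispatcherThreads*/ 5);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    original.toData(new DataOutputStream(bytes));

    // Deserialize into a second instance built with placeholder values;
    // fromData overwrites every field in the same order toData wrote them.
    HDFSEventQueueAttributesImpl copy = new HDFSEventQueueAttributesImpl(
        null, 0, 0, false, 0, true, 0);
    copy.fromData(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    System.out.println("round trip equal? " + original.equals(copy));
  }
}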

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
new file mode 100644
index 0000000..41b2aea
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSEventQueueFilter.java
@@ -0,0 +1,65 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+
+/**
+ * Current use of this class is limited to ignoring the Bulk DML operations. 
+ * 
+ * @author hemantb
+ *
+ */
+public class HDFSEventQueueFilter implements GatewayEventFilter{
+  private LogWriterI18n logger;
+  
+  public HDFSEventQueueFilter(LogWriterI18n logger) {
+    this.logger = logger; 
+  }
+  @Override
+  public void close() {
+    
+  }
+
+  @Override
+  public boolean beforeEnqueue(GatewayQueueEvent event) {
+    Operation op = event.getOperation();
+    
+    
+    /* MergeGemXDHDFSToGFE - Disabled as it is gemxd specific 
+    if (op == Operation.BULK_DML_OP) {
+     // On accessors there are no parallel queues, so with the 
+     // current logic, isSerialWanEnabled function in LocalRegion 
+     // always returns true on an accessor. So when a bulk dml 
+     // op is fired on accessor, this behavior results in distribution 
+     // of the bulk dml operation to other members. To avoid putting 
+     // of this bulk dml in parallel queues, added this filter. This 
+     // is not the efficient way as the filters are used before inserting 
+     // in the queue. The bulk dmls should be blocked before they are distributed.
+     if (logger.fineEnabled())
+       logger.fine( "HDFSEventQueueFilter:beforeEnqueue: Disallowing insertion of a bulk DML in HDFS queue.");
+      return false;
+    }*/
+    
+    return true;
+  }
+
+  @Override
+  public boolean beforeTransmit(GatewayQueueEvent event) {
+   // No op
+   return true;
+  }
+
+  @Override
+  public void afterAcknowledgement(GatewayQueueEvent event) {
+    // No op
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
new file mode 100644
index 0000000..e3bd80b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSGatewayEventImpl.java
@@ -0,0 +1,173 @@
+
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.lru.Sizeable;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.offheap.StoredObject;
+import com.gemstone.gemfire.internal.offheap.annotations.Retained;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+
+/**
+ * Gateway event extended for HDFS functionality 
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSGatewayEventImpl extends GatewaySenderEventImpl {
+  
+  private static final long serialVersionUID = 4642852957292192406L;
+  protected transient boolean keyIsSerialized = false;
+  protected byte[] serializedKey = null; 
+  protected VersionTag versionTag; 
+  
+  public HDFSGatewayEventImpl(){
+  }
+  
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue)
+      throws IOException {
+    super(operation, event, substituteValue);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue, boolean initialize, int bucketId) throws IOException {
+    super(operation, event,substituteValue, initialize, bucketId);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  @Retained
+  public HDFSGatewayEventImpl(EnumListenerEvent operation, EntryEvent event,
+      Object substituteValue, boolean initialize) throws IOException {
+    super(operation, event, substituteValue, initialize);
+    initializeHDFSGatewayEventObject(event);
+  }
+
+  protected HDFSGatewayEventImpl(HDFSGatewayEventImpl offHeapEvent) {
+    super(offHeapEvent);
+    this.keyIsSerialized = offHeapEvent.keyIsSerialized;
+    this.serializedKey = offHeapEvent.serializedKey;
+    this.versionTag = offHeapEvent.versionTag;
+  }
+  
+  @Override
+  protected GatewaySenderEventImpl makeCopy() {
+    return new HDFSGatewayEventImpl(this);
+  }
+
+  private void initializeHDFSGatewayEventObject(EntryEvent event)
+      throws IOException {
+
+    serializeKey();
+    versionTag = ((EntryEventImpl)event).getVersionTag();
+    if (versionTag != null && versionTag.getMemberID() == null) {
+      versionTag.setMemberID(((LocalRegion)getRegion()).getVersionMember());
+    }
+  }
+
+  private void serializeKey() throws IOException {
+    if (!keyIsSerialized && isInitialized())
+    {
+      this.serializedKey = CacheServerHelper.serialize(this.key);
+      keyIsSerialized = true;
+    } 
+  }
+  /**MergeGemXDHDFSToGFE This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl */
+  /*@Override
+  protected StoredObject obtainOffHeapValueBasedOnOp(EntryEventImpl event,
+      boolean hasNonWanDispatcher) {
+    return  event.getOffHeapNewValue();
+  }*/
+  
+  /**MergeGemXDHDFSToGFE This function needs to be enabled if similar functionality is added to GatewaySenderEventImpl */
+  /*@Override
+  protected Object obtainHeapValueBasedOnOp(EntryEventImpl event,
+      boolean hasNonWanDispatcher) {
+    return   event.getRawNewValue(shouldApplyDelta());
+  }*/
+  
+  @Override
+  protected boolean shouldApplyDelta() {
+    return true;
+  }
+
+  
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(this.versionTag, out);
+    
+  }
+  
+  @Override
+  protected void serializeKey(DataOutput out) throws IOException {
+    DataSerializer.writeByteArray((byte[])this.serializedKey, out);
+  }
+  
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    this.versionTag = (VersionTag)DataSerializer.readObject(in);
+  }
+  
+  @Override
+  protected void deserializeKey(DataInput in) throws IOException,
+    ClassNotFoundException {
+    this.serializedKey = DataSerializer.readByteArray(in);
+    this.key = BlobHelper.deserializeBlob(this.serializedKey,
+        InternalDataSerializer.getVersionForDataStreamOrNull(in), null);
+    keyIsSerialized = true;
+  }
+
+  @Override
+  public int getDSFID() {
+    
+    return HDFS_GATEWAY_EVENT_IMPL;
+  }
+  public byte[] getSerializedKey() {
+    
+    return this.serializedKey;
+  }
+  
+  public VersionTag getVersionTag() {
+    
+    return this.versionTag;
+  }
+  
+  /**
+   * Returns the size on HDFS of this event  
+   * @param writeOnly
+   */
+  public int getSizeOnHDFSInBytes(boolean writeOnly) {
+  
+    if (writeOnly)
+      return UnsortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,  
+          getSerializedValueSize(), this.versionTag);
+    else
+      return SortedHDFSQueuePersistedEvent.getSizeInBytes(this.serializedKey.length,  
+          getSerializedValueSize(), this.versionTag);
+  
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
new file mode 100644
index 0000000..242923b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSIntegrationUtil.java
@@ -0,0 +1,124 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * Contains utility functions
+ *
+ * @author Hemant Bhanawat
+ *
+ */
+public class HDFSIntegrationUtil {
+  
+  public static <K, V> AsyncEventQueue createDefaultAsyncQueueForHDFS(Cache cache, 
+     boolean writeOnly, String regionPath)
+  {
+    // Create default event attributes 
+    HDFSEventQueueAttributesFactory  hdfsqueueFactory = new HDFSEventQueueAttributesFactory();
+    return createAsyncQueueForHDFS(cache,
+        regionPath, writeOnly, hdfsqueueFactory.create());
+  }
+  
+  public static AsyncEventQueue createAsyncQueueForHDFS(Cache cache,
+      String regionPath, boolean writeOnly, HDFSEventQueueAttributes eventAttribs)
+   {
+     LogWriterI18n logger = cache.getLoggerI18n();
+     String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(regionPath);
+
+     AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+     factory.setBatchSize(eventAttribs.getBatchSizeMB());
+     factory.setPersistent(eventAttribs.isPersistent());
+     factory.setDiskStoreName(eventAttribs.getDiskStoreName());
+     factory.setMaximumQueueMemory(eventAttribs.getMaximumQueueMemory());
+     factory.setBatchTimeInterval(eventAttribs.getBatchTimeInterval());
+     factory.setDiskSynchronous(eventAttribs.isDiskSynchronous());
+     factory.setDispatcherThreads(eventAttribs.getDispatcherThreads());
+     factory.setParallel(true);
+     factory.addGatewayEventFilter(new HDFSEventQueueFilter(logger));
+     ((AsyncEventQueueFactoryImpl)factory).setBucketSorted(!writeOnly);
+     ((AsyncEventQueueFactoryImpl)factory).setIsHDFSQueue(true);
+     
+     AsyncEventQueue asyncQ = null;
+     
+     if (!writeOnly)
+       asyncQ = factory.create(defaultAsyncQueueName, new HDFSEventListener(cache.getLoggerI18n()));
+     else
+       asyncQ = factory.create(defaultAsyncQueueName, new HDFSWriteOnlyStoreEventListener(cache.getLoggerI18n()));
+     
+     logger.fine("HDFS: async queue created for HDFS. Id: " + asyncQ.getId() + ". Disk store: " + asyncQ.getDiskStoreName() + 
+         ". Batch size: " + asyncQ.getBatchSize() + ". bucket sorted:  " + !writeOnly) ;
+     return asyncQ;
+     
+   }
+  
+  public static  void createAndAddAsyncQueue(String regionPath,
+      RegionAttributes regionAttributes, Cache cache) {
+    if(!regionAttributes.getDataPolicy().withHDFS()) {
+      return;
+    }
+    
+    String leaderRegionPath = getLeaderRegionPath(regionPath, regionAttributes, cache);
+    
+    String defaultAsyncQueueName = HDFSStoreFactoryImpl.getEventQueueName(leaderRegionPath);
+    if (cache.getAsyncEventQueue(defaultAsyncQueueName) == null) {
+      if (regionAttributes.getHDFSStoreName() != null && regionAttributes.getPartitionAttributes() != null 
+          && !(regionAttributes.getPartitionAttributes().getLocalMaxMemory() == 0)) {
+        HDFSStore store = ((GemFireCacheImpl)cache).findHDFSStore(regionAttributes.getHDFSStoreName());
+        if (store == null) {
+          throw new IllegalStateException(
+              LocalizedStrings.HOPLOG_HDFS_STORE_NOT_FOUND
+                  .toLocalizedString(regionAttributes.getHDFSStoreName()));
+        }
+        HDFSEventQueueAttributes queueAttrs = store.getHDFSEventQueueAttributes();
+        if(queueAttrs == null) {
+          // No async queue is specified for a region with an HDFS store. Create an async
+          // queue with default properties and set bucketSorted=true.
+          HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS(cache, regionAttributes.getHDFSWriteOnly(), leaderRegionPath);
+        }
+        else {
+          HDFSIntegrationUtil.createAsyncQueueForHDFS(cache, leaderRegionPath, regionAttributes.getHDFSWriteOnly(), queueAttrs);
+        }
+      }
+    }
+  }
+
+  private static String getLeaderRegionPath(String regionPath,
+      RegionAttributes regionAttributes, Cache cache) {
+    String colocated;
+    while(regionAttributes.getPartitionAttributes() != null 
+        && (colocated = regionAttributes.getPartitionAttributes().getColocatedWith()) != null) {
+      // Do not waitOnInitialization() for PR
+      GemFireCacheImpl gfc = (GemFireCacheImpl)cache;
+      Region colocatedRegion = gfc.getPartitionedRegion(colocated, false);
+      if(colocatedRegion == null) {
+        Assert.fail("Could not find parent region " + colocated + " for " + regionPath);
+      }
+      regionAttributes = colocatedRegion.getAttributes();
+      regionPath = colocatedRegion.getFullPath();
+    }
+    return regionPath;
+  }
+
+}
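
createAsyncQueueForHDFS() above copies the HDFSEventQueueAttributes onto an AsyncEventQueueFactory, marks the queue parallel, and registers the HDFS listener and filter, while createAndAddAsyncQueue first resolves the leader region for colocated partitioned regions. The following is a hypothetical caller-side sketch of the default-queue path: the region path is illustrative, and creating the queue outside a fully configured HDFS setup may not succeed, so this only shows the call shape.

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.hdfs.internal.HDFSIntegrationUtil;

// Hedged sketch: ask HDFSIntegrationUtil for the default HDFS async queue
// of a (hypothetical) region path "/orders".
public class HdfsQueueWiringSketch {
  public static void main(String[] args) {
    Cache cache = new CacheFactory().create();
    try {
      AsyncEventQueue queue = HDFSIntegrationUtil.createDefaultAsyncQueueForHDFS(
          cache, /*writeOnly*/ false, "/orders");
      System.out.println("created HDFS async queue: " + queue.getId()
          + ", batch size: " + queue.getBatchSize());
    } finally {
      cache.close();
    }
  }
}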

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
new file mode 100644
index 0000000..9b1542b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSParallelGatewaySenderQueue.java
@@ -0,0 +1,464 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.i18n.LogWriterI18n;
+import com.gemstone.gemfire.internal.SystemTimer;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.cache.ColocationHelper;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+
+/**
+ * Parallel Gateway Sender Queue extended for HDFS functionality 
+ *
+ * @author Hemant Bhanawat
+ */
+public class HDFSParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {
+
+  private int currentBucketIndex = 0;
+  private int elementsPeekedAcrossBuckets = 0;
+  private SystemTimer rollListTimer = null;
+  public static final String ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP = "gemfire.ROLL_SORTED_LIST_TIME_INTERVAL_MS";
+  private final int ROLL_SORTED_LIST_TIME_INTERVAL_MS = Integer.getInteger(ROLL_SORTED_LIST_TIME_INTERVAL_MS__PROP, 3000);
+  
+  public HDFSParallelGatewaySenderQueue(AbstractGatewaySender sender,
+      Set<Region> userPRs, int idx, int nDispatcher) {
+     
+    super(sender, userPRs, idx, nDispatcher);
+    // Only the first dispatcher (index 0) schedules the roll-over task.
+    if (sender.getBucketSorted() && this.index == 0) {
+      rollListTimer = new SystemTimer(sender.getCache().getDistributedSystem(),
+          true);
+      // schedule the task to roll the skip lists
+      rollListTimer.scheduleAtFixedRate(new RollSortedListsTimerTask(), 
+          ROLL_SORTED_LIST_TIME_INTERVAL_MS, ROLL_SORTED_LIST_TIME_INTERVAL_MS);
+    }
+  }
+  
+  @Override
+  public Object peek() throws InterruptedException, CacheException {
+    /* Calling super.peek() leads to the following exception,
+     * so an explicit UnsupportedOperationException is thrown instead.
+     Caused by: java.lang.ClassCastException: com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue cannot be cast to com.gemstone.gemfire.internal.cache.BucketRegionQueue
+        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.getRandomPrimaryBucket(ParallelGatewaySenderQueue.java:964)
+        at com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue.peek(ParallelGatewaySenderQueue.java:1078)
+     */
+    throw new UnsupportedOperationException();
+  }
+  
+  
+  @Override
+  public void cleanUp() {
+    super.cleanUp();
+    cancelRollListTimer();
+  }
+  
+  private void cancelRollListTimer() {
+    if (rollListTimer != null) {
+      rollListTimer.cancel();
+      rollListTimer = null;
+    }
+  }
+  /**
+   * A call to this function peeks elements from the first local primary bucket. 
+   * Next call to this function peeks elements from the next local primary 
+   * bucket and so on.  
+   */
+  @Override
+  public List peek(int batchSize, int timeToWait) throws InterruptedException,
+  CacheException {
+    
+    List batch = new ArrayList();
+    
+    int batchSizeInBytes = batchSize*1024*1024;
+    PartitionedRegion prQ = getRandomShadowPR();
+    if (prQ == null || prQ.getLocalMaxMemory() == 0) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      blockProcesorThreadIfRequired();
+      return batch;
+    }
+    
+    ArrayList list = null;
+    ArrayList<Integer> pbuckets = new ArrayList<Integer>(prQ
+        .getDataStore().getAllLocalPrimaryBucketIds());
+    ArrayList<Integer> buckets = new ArrayList<Integer>();
+    for(Integer i : pbuckets) {
+      if (i % this.nDispatcher == this.index)
+        buckets.add(i);
+    }
+    // In case of failures, peekedEvents may already contain some elements;
+    // add them to the batch.
+    if (this.resetLastPeeked) {
+      int previousBucketId = -1;
+      boolean stillPrimary = true; 
+      Iterator<GatewaySenderEventImpl>  iter = peekedEvents.iterator();
+      // We need to remove the events of buckets that are no longer primary on
+      // this node, as they cannot be persisted from this node.
+      while(iter.hasNext()) {
+        HDFSGatewayEventImpl hdfsEvent = (HDFSGatewayEventImpl)iter.next();
+        if (previousBucketId != hdfsEvent.getBucketId()){
+          stillPrimary = buckets.contains(hdfsEvent.getBucketId());
+          previousBucketId = hdfsEvent.getBucketId();
+        }
+        if (stillPrimary)
+          batch.add(hdfsEvent);
+        else {
+          iter.remove();
+        }
+      }
+      this.resetLastPeeked = false;
+    }
+    
+    if (buckets.size() == 0) {
+      // Sleep a bit before trying again. provided by Dan
+      try {
+        Thread.sleep(50);
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return batch;
+    }
+    
+    if (this.sender.getBucketSorted()) {
+      
+    }
+    
+    // Each call to this function returns index of next bucket 
+    // that is to be processed. This function takes care 
+    // of the bucket sequence that is peeked by a sequence of 
+    // peek calls. 
+    // If there are bucket movements between two consecutive 
+    // calls to this function then there is chance that a bucket 
+    // is processed twice while another one is skipped. But, that is 
+    // ok because in the next round, it will be processed. 
+    Integer bIdIndex = getCurrentBucketIndex(buckets.size());
+    
+    // If we have gone through all the buckets once and no  
+    // elements were peeked from any of the buckets, take a nap.  
+    // This always sleeps on the first call, but that should be ok
+    // because in practical use cases timeToWait would be greater
+    // than this 100 ms sleep.
+    if (bIdIndex == 0 && getAndresetElementsPeekedAcrossBuckets() == 0) { 
+      try { 
+        Thread.sleep(100); 
+      } catch (InterruptedException e) { 
+        Thread.currentThread().interrupt(); 
+      } 
+    } 
+    
+    HDFSBucketRegionQueue hrq = ((HDFSBucketRegionQueue)prQ
+        .getDataStore().getLocalBucketById(buckets.get(bIdIndex)));
+    
+    if (hrq == null) {
+      // bucket moved to another node after getAllLocalPrimaryBucketIds
+      // was called. Peeking not possible. return. 
+      return batch;
+    }
+    long entriesWaitingTobePeeked = hrq.totalEntries();
+    
+    if (entriesWaitingTobePeeked == 0) {
+      blockProcesorThreadIfRequired();
+      return batch;
+    }
+    
+    long currentTimeInMillis = System.currentTimeMillis();
+    long bucketSizeInBytes = hrq.getQueueSizeInBytes();
+    if (((currentTimeInMillis - hrq.getLastPeekTimeInMillis()) >  timeToWait)  
+        || ( bucketSizeInBytes > batchSizeInBytes)
+        || hrq.shouldDrainImmediately()) {
+      // peek now
+      if (logger.isDebugEnabled()) { 
+        logger.debug("Peeking queue " + hrq.getId()   + ": bucketSizeInBytes " + bucketSizeInBytes
+            + ":  batchSizeInBytes" + batchSizeInBytes
+            + ":  timeToWait" + timeToWait
+            + ":  (currentTimeInMillis - hrq.getLastPeekTimeInMillis())" + (currentTimeInMillis - hrq.getLastPeekTimeInMillis()));
+      }
+
+      list = peekAhead(buckets.get(bIdIndex), hrq);
+      
+      if (list != null && list.size() != 0 ) {
+        for (Object object : list) {
+          batch.add(object);
+          peekedEvents.add((HDFSGatewayEventImpl)object);
+        }
+      }
+    }
+    else {
+      blockProcesorThreadIfRequired();
+    }
+    if (logger.isDebugEnabled()  &&  batch.size() > 0) {
+      logger.debug(this + ":  Peeked a batch of " + batch.size() + " entries");
+    }
+    
+    setElementsPeekedAcrossBuckets(batch.size()); 
+    
+    return batch;
+  }
+  
+  /**
+   * This function maintains an index of the last processed bucket.
+   * When it is called, it returns the index of the next bucket.
+   * @param totalBuckets
+   * @return current bucket index
+   */
+  private int getCurrentBucketIndex(int totalBuckets) {
+    int retBucket = currentBucketIndex;
+    if (retBucket >=  totalBuckets) {
+      currentBucketIndex = 0;
+      retBucket = 0;
+    }
+    
+    currentBucketIndex++;
+    
+    return retBucket;
+  }
+  
+  @Override
+  public void remove(int batchSize) throws CacheException {
+    int destroyed = 0;
+    HDFSGatewayEventImpl event = null;
+    
+    if (this.peekedEvents.size() > 0)
+      event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+    
+    while (event != null && destroyed < batchSize) {
+      Region currentRegion = event.getRegion();
+      int currentBucketId = event.getBucketId();
+      int bucketId = event.getBucketId();
+        
+      ArrayList<HDFSGatewayEventImpl> listToDestroy = new ArrayList<HDFSGatewayEventImpl>();
+      ArrayList<Object> destroyedSeqNum = new ArrayList<Object>();
+      
+      // create a batch of all the entries of a bucket 
+      while (bucketId == currentBucketId) {
+        listToDestroy.add(event);
+        destroyedSeqNum.add(event.getShadowKey());
+        destroyed++;
+
+        if (this.peekedEvents.size() == 0 || (destroyed) >= batchSize) {
+          event = null; 
+          break;
+        }
+
+        event = (HDFSGatewayEventImpl)this.peekedEvents.remove();
+
+        bucketId = event.getBucketId();
+
+        if (!this.sender.isRunning()){
+          if (logger.isDebugEnabled()) {
+            logger.debug("ParallelGatewaySenderQueue#remove: Cache is closing down. Ignoring remove request.");
+          }
+          return;
+        }
+      }
+      try {
+        HDFSBucketRegionQueue brq = getBucketRegionQueue((PartitionedRegion) currentRegion, currentBucketId);
+        
+        if (brq != null) {
+          // destroy the entries from the bucket 
+          brq.destroyKeys(listToDestroy);
+          // Adding the removed event to the map for BatchRemovalMessage
+          // We need to provide the prQ as there could be multiple
+          // queue in a PGS now.
+          PartitionedRegion prQ = brq.getPartitionedRegion();
+          addRemovedEvents(prQ, currentBucketId, destroyedSeqNum);
+        }
+        
+      } catch (ForceReattemptException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got ForceReattemptException for " + this
+          + " for bucket = " + bucketId);
+        }
+      }
+      catch(EntryNotFoundException e) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("ParallelGatewaySenderQueue#remove: " + "Got EntryNotFoundException for " + this
+            + " for bucket = " + bucketId );
+        }
+      }
+    }
+  }
+  
+  /**
+   * Keeps track of the number of elements peeked across all buckets.
+   */
+  private void setElementsPeekedAcrossBuckets(int peekedElements) {
+    this.elementsPeekedAcrossBuckets += peekedElements;
+  }
+  
+  /**
+   * Returns the number of elements peeked across all buckets. Also
+   * resets this counter.
+   */
+  private int getAndresetElementsPeekedAcrossBuckets() { 
+    int peekedElements = this.elementsPeekedAcrossBuckets; 
+    this.elementsPeekedAcrossBuckets = 0; 
+    return peekedElements; 
+  } 
+
+  @Override
+  public void remove() throws CacheException {
+    throw new UnsupportedOperationException("Method HDFSParallelGatewaySenderQueue#remove is not supported");
+  }
+ 
+  @Override
+  public void put(Object object) throws InterruptedException, CacheException {
+    super.put(object);
+  }
+  
+  protected ArrayList peekAhead(int bucketId, HDFSBucketRegionQueue hrq) throws CacheException {
+    
+    if (logger.isDebugEnabled()) {
+      logger.debug(this + ": Peekahead for the bucket " + bucketId);
+    }
+    ArrayList  list = hrq.peekABatch();
+    if (logger.isDebugEnabled() && list != null ) {
+      logger.debug(this + ": Peeked" + list.size() + "objects from bucket " + bucketId);
+    }
+
+    return list;
+  }
+  
+  @Override
+  public Object take() {
+    throw new UnsupportedOperationException("take() is not supported for " + HDFSParallelGatewaySenderQueue.class.toString());
+  }
+  
+  protected boolean isUsedForHDFS()
+  {
+    return true;
+  }
+  
+  @Override
+  protected void afterRegionAdd (PartitionedRegion userPR) {
+  }
+  
+  /**
+   * Gets the value for the region key from the HDFSBucketRegionQueue.
+   * @param region
+   * @throws ForceReattemptException
+   */
+  public HDFSGatewayEventImpl get(PartitionedRegion region, byte[] regionKey, int bucketId) throws ForceReattemptException  {
+    try {
+      HDFSBucketRegionQueue brq = getBucketRegionQueue(region, bucketId);
+      
+      if (brq ==null)
+        return null;
+      
+      return brq.getObjectForRegionKey(region, regionKey);
+    } catch(EntryNotFoundException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("HDFSParallelGatewaySenderQueue#get: " + "Got EntryNotFoundException for " + this
+            + " for bucket = " + bucketId);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void clear(PartitionedRegion pr, int bucketId) {
+    HDFSBucketRegionQueue brq;
+    try {
+      brq = getBucketRegionQueue(pr, bucketId);
+      if (brq == null)
+        return;
+      brq.clear();
+    } catch (ForceReattemptException e) {
+      //do nothing, bucket was destroyed.
+    }
+  }
+  
+  @Override
+  public int size(PartitionedRegion pr, int bucketId) throws ForceReattemptException {
+   HDFSBucketRegionQueue hq = getBucketRegionQueue(pr, bucketId);
+   return hq.size();
+  }
+
+  public HDFSBucketRegionQueue getBucketRegionQueue(PartitionedRegion region,
+      int bucketId) throws ForceReattemptException {
+    PartitionedRegion leader = ColocationHelper.getLeaderRegion(region);
+    if (leader == null)
+      return null;
+    String leaderregionPath = leader.getFullPath();
+    PartitionedRegion prQ = this.userRegionNameToshadowPRMap.get(leaderregionPath);
+    if (prQ == null)
+      return null;
+    HDFSBucketRegionQueue brq;
+
+    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+        .getLocalBucketById(bucketId));
+    if(brq == null) {
+      prQ.getRegionAdvisor().waitForLocalBucketStorage(bucketId);
+    }
+    brq = ((HDFSBucketRegionQueue)prQ.getDataStore()
+        .getInitializedBucketForId(null, bucketId));
+    return brq;
+  }
+  
+  /**
+   * This class is responsible for rolling over the skip lists of the sorted
+   * event queue. Rolling by a separate thread is required because neither the
+   * put thread nor the peek/remove thread can do it. The put thread cannot,
+   * because that would require synchronization with other put threads and the
+   * peek thread, which would hurt put latency. The peek thread cannot, because
+   * if the event insert rate is too high the list can grow far beyond its
+   * intended size.
+   * @author hemantb
+   *
+   */
+  class RollSortedListsTimerTask extends SystemTimerTask {
+    
+    
+    /**
+     * Ensures that any bucket whose lists have grown beyond their size limit
+     * gets them rolled over into new skip lists.
+     */
+    @Override
+    public void run2() {
+      Set<PartitionedRegion> prQs = getRegions();
+      for (PartitionedRegion prQ : prQs) {
+        ArrayList<Integer> buckets = new ArrayList<Integer>(prQ
+            .getDataStore().getAllLocalPrimaryBucketIds());
+        for (Integer bId : buckets) {
+          HDFSBucketRegionQueue hrq =  ((HDFSBucketRegionQueue)prQ
+              .getDataStore().getLocalBucketById(bId));
+          if (hrq == null) {
+            // The bucket moved to another node after getAllLocalPrimaryBucketIds
+            // was called; move on to the next bucket.
+            continue;
+          }
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rolling over the list for bucket id: " + bId);
+          }
+          hrq.rolloverSkipList();
+        }
+      }
+    }
+  }
+   
+}
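
An illustrative sketch of the scheduling pattern described in the
RollSortedListsTimerTask javadoc above: a dedicated background task, not the
put or peek threads, performs the roll. The sketch uses java.util.Timer in
place of GemFire's internal SystemTimer, and the interval, thread name and
class name are assumed values, not taken from this commit.

    import java.util.Timer;
    import java.util.TimerTask;

    public class RollSchedulerSketch {
      public static void main(String[] args) throws InterruptedException {
        final long rollIntervalMillis = 1000L; // assumed interval
        Timer timer = new Timer("hdfs-skiplist-roller", true); // daemon thread
        timer.scheduleAtFixedRate(new TimerTask() {
          @Override
          public void run() {
            // In the real task (run2 above), each local primary bucket's
            // HDFSBucketRegionQueue rolls its current skip list here, keeping
            // the roll off the latency-sensitive put and peek threads.
            System.out.println("rolling sorted event lists");
          }
        }, rollIntervalMillis, rollIntervalMillis);
        Thread.sleep(3 * rollIntervalMillis); // let the task fire a few times
        timer.cancel();
      }
    }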

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
new file mode 100644
index 0000000..50ea3c6
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreConfigHolder.java
@@ -0,0 +1,601 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.io.Serializable;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributesFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator.HDFSCompactionConfigMutator;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator.HDFSEventQueueAttributesMutator;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+
+/**
+ * Class to hold all HDFS store related configuration. Instead of duplicating
+ * the same members in two different classes, the factory and the store, both
+ * delegate to this class: HdfsStoreImpl and HdfsStoreCreation route their get
+ * calls, set calls and copy-constructor calls through it. Moreover, the config
+ * holder can be replaced wholesale to support altering the configuration.
+ * (An illustrative sketch of this delegation pattern follows this file's
+ * diff.)
+ * 
+ * @author ashvina
+ */
+public class HDFSStoreConfigHolder implements HDFSStore, HDFSStoreFactory, Serializable {
+  private String name = null;
+  private String namenodeURL = null;
+  private String homeDir = DEFAULT_HOME_DIR;
+  private String clientConfigFile = null;
+  private float blockCacheSize = DEFAULT_BLOCK_CACHE_SIZE;
+  private int maxFileSize = DEFAULT_MAX_WRITE_ONLY_FILE_SIZE;
+  private int fileRolloverInterval = DEFAULT_WRITE_ONLY_FILE_ROLLOVER_INTERVAL;
+  protected boolean isAutoCompact = HDFSCompactionConfig.DEFAULT_AUTO_COMPACTION;
+
+  private AbstractHDFSCompactionConfigHolder compactionConfig = null;
+
+  private HDFSEventQueueAttributes hdfsEventQueueAttrs = new HDFSEventQueueAttributesFactory().create();
+  
+  private static final Logger logger = LogService.getLogger();
+  protected final String logPrefix;
+
+  public HDFSStoreConfigHolder() {
+    this(null);
+  }
+
+  /**
+   * @param config configuration source for creating this instance 
+   */
+  public HDFSStoreConfigHolder(HDFSStore config) {
+    if (config == null) {
+      // initialize the default compaction strategy and leave the rest to be
+      // set later
+      this.compactionConfig = AbstractHDFSCompactionConfigHolder.createInstance(null);
+      this.logPrefix = "<" + getName() + "> ";
+      return;
+    }
+    
+    this.name = config.getName();
+    // compute the log prefix after the name has been copied from the source
+    // config; otherwise it would always read "<null>"
+    this.logPrefix = "<" + getName() + "> ";
+    this.namenodeURL = config.getNameNodeURL();
+    this.homeDir = config.getHomeDir();
+    this.clientConfigFile = config.getHDFSClientConfigFile();
+    setHDFSCompactionConfig(config.getHDFSCompactionConfig());
+    this.blockCacheSize = config.getBlockCacheSize();
+    setHDFSEventQueueAttributes(config.getHDFSEventQueueAttributes());
+    this.maxFileSize = config.getMaxFileSize();
+    this.fileRolloverInterval = config.getFileRolloverInterval();
+    setMinorCompaction(config.getMinorCompaction());
+  }
+  
+  public void resetDefaultValues() {
+    name = null;
+    namenodeURL = null;
+    homeDir = null;
+    clientConfigFile = null;
+    blockCacheSize = -1f;
+    maxFileSize = -1;
+    fileRolloverInterval = -1;
+    
+    compactionConfig.resetDefaultValues();
+    isAutoCompact = false;
+    
+    // TODO reset hdfseventqueueattributes;
+  }
+  
+  public void copyFrom(HDFSStoreMutator mutator) {
+    if (mutator.getFileRolloverInterval() >= 0) {
+      logAttrMutation("fileRolloverInterval", mutator.getFileRolloverInterval());
+      setFileRolloverInterval(mutator.getFileRolloverInterval());
+    }
+    if (mutator.getMaxFileSize() >= 0) {
+      logAttrMutation("MaxFileSize", mutator.getMaxFileSize());
+      setMaxFileSize(mutator.getMaxFileSize());
+    }
+    
+    compactionConfig.copyFrom(mutator.getCompactionConfigMutator());
+    if (mutator.getMinorCompaction() != null) {
+      logAttrMutation("MinorCompaction", mutator.getMinorCompaction());
+      setMinorCompaction(mutator.getMinorCompaction());
+    }
+    
+    HDFSEventQueueAttributesFactory newFactory = new HDFSEventQueueAttributesFactory(hdfsEventQueueAttrs);
+    HDFSEventQueueAttributesMutator qMutator = mutator.getHDFSEventQueueAttributesMutator();
+
+    if (qMutator.getBatchSizeMB() >= 0) {
+      logAttrMutation("batchSizeMB", qMutator.getBatchSizeMB());
+      newFactory.setBatchSizeMB(qMutator.getBatchSizeMB());
+    }
+    if (qMutator.getBatchTimeInterval() >= 0) {
+      logAttrMutation("batchTimeInterval", qMutator.getBatchTimeInterval());
+      newFactory.setBatchTimeInterval(qMutator.getBatchTimeInterval());
+    }
+    hdfsEventQueueAttrs = newFactory.create();
+  }
+
+  void logAttrMutation(String name, Object value) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}Alter {}: {}", logPrefix, name, value);
+    }
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+  @Override
+  public HDFSStoreFactory setName(String name) {
+    this.name = name;
+    return this;
+  }
+
+  @Override
+  public String getNameNodeURL() {
+    return namenodeURL;
+  }
+  @Override
+  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+    this.namenodeURL = namenodeURL;
+    return this;
+  }
+
+  @Override
+  public String getHomeDir() {
+    return homeDir;
+  }
+  @Override
+  public HDFSStoreFactory setHomeDir(String homeDir) {
+    this.homeDir = homeDir;
+    return this;
+  }
+
+  @Override
+  public String getHDFSClientConfigFile() {
+    return clientConfigFile;
+  }
+  @Override
+  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+    this.clientConfigFile = clientConfigFile;
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setBlockCacheSize(float percentage) {
+    if(percentage < 0 || percentage > 100) {
+      throw new IllegalArgumentException("Block cache size must be between 0 and 100, inclusive");
+    }
+    this.blockCacheSize  = percentage;
+    return this;
+  }
+  
+  @Override
+  public float getBlockCacheSize() {
+    return blockCacheSize;
+  }
+  
+  /**
+   * Sets the HDFS event queue attributes.
+   * This causes the store to use the given {@link HDFSEventQueueAttributes}.
+   * @param hdfsEventQueueAttrs the attributes of the HDFS event queue
+   */
+  public HDFSStoreFactory setHDFSEventQueueAttributes(HDFSEventQueueAttributes hdfsEventQueueAttrs) {
+    this.hdfsEventQueueAttrs  = hdfsEventQueueAttrs;
+    return this;
+  }
+  @Override
+  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
+    return hdfsEventQueueAttrs;
+  }
+
+  @Override
+  public AbstractHDFSCompactionConfigHolder getHDFSCompactionConfig() {
+    return compactionConfig;
+  }
+  @Override
+  public HDFSStoreConfigHolder setHDFSCompactionConfig(HDFSCompactionConfig config) {
+    if (config == null) {
+      return this;
+    }
+    
+    String s = config.getCompactionStrategy();
+    compactionConfig = AbstractHDFSCompactionConfigHolder.createInstance(s);
+    compactionConfig.copyFrom(config);
+    return this;
+  }
+  @Override
+  public HDFSCompactionConfigFactory createCompactionConfigFactory(String name) {
+    return AbstractHDFSCompactionConfigHolder.createInstance(name);
+  }
+  
+  @Override
+  public HDFSStoreFactory setMaxFileSize(int maxFileSize) {
+    assertIsPositive(CacheXml.HDFS_WRITE_ONLY_FILE_ROLLOVER_INTERVAL, maxFileSize);
+    this.maxFileSize = maxFileSize;
+    return this;
+  }
+  @Override
+  public int getMaxFileSize() {
+    return maxFileSize;
+  }
+
+  @Override
+  public HDFSStoreFactory setFileRolloverInterval(int count) {
+    assertIsPositive(CacheXml.HDFS_TIME_FOR_FILE_ROLLOVER, count);
+    this.fileRolloverInterval = count;
+    return this;
+  }
+  @Override
+  public int getFileRolloverInterval() {
+    return fileRolloverInterval;
+  }
+  
+  @Override
+  public boolean getMinorCompaction() {
+    return isAutoCompact;
+  }
+  @Override
+  public HDFSStoreFactory setMinorCompaction(boolean auto) {
+    this.isAutoCompact = auto;
+    return this;
+  }
+
+  /**
+   * Abstract config class for compaction configuration. A concrete subclass
+   * must override the setters for every configuration attribute it consumes;
+   * the base implementations throw an exception for any attribute the
+   * configured compaction strategy does not support. A concrete subclass must
+   * also validate its configuration. (An illustrative sketch of this pattern
+   * follows this file's diff.)
+   * 
+   * @author ashvina
+   */
+  public static abstract class AbstractHDFSCompactionConfigHolder implements
+      HDFSCompactionConfig, HDFSCompactionConfigFactory, Serializable {
+    protected int maxInputFileSizeMB = HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_SIZE_MB;
+    protected int maxInputFileCount = HDFSCompactionConfig.DEFAULT_MAX_INPUT_FILE_COUNT;
+    protected int minInputFileCount = HDFSCompactionConfig.DEFAULT_MIN_INPUT_FILE_COUNT;
+
+    protected int maxConcurrency = HDFSCompactionConfig.DEFAULT_MAX_THREADS;
+    
+    protected boolean autoMajorCompact = HDFSCompactionConfig.DEFAULT_AUTO_MAJOR_COMPACTION;
+    protected int majorCompactionConcurrency = HDFSCompactionConfig.DEFAULT_MAJOR_COMPACTION_MAX_THREADS;
+    protected int majorCompactionIntervalMins = HDFSCompactionConfig.DEFAULT_MAJOR_COMPACTION_INTERVAL_MINS;
+    protected int oldFileCleanupIntervalMins = HDFSCompactionConfig.DEFAULT_OLD_FILE_CLEANUP_INTERVAL_MINS;
+    
+    
+    public AbstractHDFSCompactionConfigHolder() {
+    }
+    
+    void copyFrom(HDFSCompactionConfig config) {
+      setMaxInputFileSizeMB(config.getMaxInputFileSizeMB());
+      setMaxInputFileCount(config.getMaxInputFileCount());
+      setMinInputFileCount(config.getMinInputFileCount());
+      setMaxThreads(config.getMaxThreads());
+      setAutoMajorCompaction(config.getAutoMajorCompaction());
+      setMajorCompactionMaxThreads(config.getMajorCompactionMaxThreads());
+      setMajorCompactionIntervalMins(config.getMajorCompactionIntervalMins());
+      setOldFilesCleanupIntervalMins(config.getOldFilesCleanupIntervalMins());
+    }
+    
+    void copyFrom(HDFSCompactionConfigMutator mutator) {
+      if (mutator.getMaxInputFileCount() >= 0) {
+        logAttrMutation("maxInputFileCount", mutator.getMaxInputFileCount());
+        setMaxInputFileCount(mutator.getMaxInputFileCount());
+      }
+      if (mutator.getMaxInputFileSizeMB() >= 0) {
+        logAttrMutation("MaxInputFileSizeMB", mutator.getMaxInputFileSizeMB());
+        setMaxInputFileSizeMB(mutator.getMaxInputFileSizeMB());
+      }
+      if (mutator.getMaxThreads() >= 0) {
+        logAttrMutation("MaxThreads", mutator.getMaxThreads());
+        setMaxThreads(mutator.getMaxThreads());
+      }
+      if (mutator.getMinInputFileCount() >= 0) {
+        logAttrMutation("MinInputFileCount", mutator.getMinInputFileCount());
+        setMinInputFileCount(mutator.getMinInputFileCount());
+      }
+      
+      if (mutator.getMajorCompactionIntervalMins() > -1) {
+        logAttrMutation("MajorCompactionIntervalMins", mutator.getMajorCompactionIntervalMins());
+        setMajorCompactionIntervalMins(mutator.getMajorCompactionIntervalMins());
+      }
+      if (mutator.getMajorCompactionMaxThreads() >= 0) {
+        logAttrMutation("MajorCompactionMaxThreads", mutator.getMajorCompactionMaxThreads());
+        setMajorCompactionMaxThreads(mutator.getMajorCompactionMaxThreads());
+      }
+      if (mutator.getAutoMajorCompaction() != null) {
+        logAttrMutation("AutoMajorCompaction", mutator.getAutoMajorCompaction());
+        setAutoMajorCompaction(mutator.getAutoMajorCompaction());
+      }
+      
+      if (mutator.getOldFilesCleanupIntervalMins() >= 0) {
+        logAttrMutation("OldFilesCleanupIntervalMins", mutator.getOldFilesCleanupIntervalMins());
+        setOldFilesCleanupIntervalMins(mutator.getOldFilesCleanupIntervalMins());
+      }
+    }
+    
+    void logAttrMutation(String name, Object value) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Alter {}: {}", name, value);
+      }
+    }
+    
+    public void resetDefaultValues() {
+      maxInputFileSizeMB = -1;
+      maxInputFileCount = -1;
+      minInputFileCount = -1;
+      maxConcurrency = -1;
+
+      autoMajorCompact = false;
+      majorCompactionConcurrency = -1;
+      majorCompactionIntervalMins = -1;
+      oldFileCleanupIntervalMins = -1;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
+      throw new GemFireConfigException("This configuration is not applicable to the configured compaction strategy");
+    }
+    @Override
+    public int getMaxInputFileSizeMB() {
+      return maxInputFileSizeMB;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
+      throw new GemFireConfigException("This configuration is not applicable to the configured compaction strategy");
+    }
+    @Override
+    public int getMinInputFileCount() {
+      return minInputFileCount;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxInputFileCount(int size) {
+      throw new GemFireConfigException("This configuration is not applicable to the configured compaction strategy");
+    }
+    @Override
+    public int getMaxInputFileCount() {
+      return maxInputFileCount;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxThreads(int count) {
+      assertIsPositive(CacheXml.HDFS_MINOR_COMPACTION_THREADS, count);
+      this.maxConcurrency = count;
+      return this;
+    }
+    @Override
+    public int getMaxThreads() {
+      return maxConcurrency;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setAutoMajorCompaction(boolean auto) {
+      this.autoMajorCompact = auto;
+      return this;
+    }
+    @Override
+    public boolean getAutoMajorCompaction() {
+      return autoMajorCompact;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
+      throw new GemFireConfigException("This configuration is not applicable to the configured compaction strategy");
+    }
+    @Override
+    public int getMajorCompactionIntervalMins() {
+      return majorCompactionIntervalMins;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
+      throw new GemFireConfigException("This configuration is not applicable to the configured compaction strategy");
+    }
+    @Override
+    public int getMajorCompactionMaxThreads() {
+      return majorCompactionConcurrency;
+    }
+    
+    
+    @Override
+    public int getOldFilesCleanupIntervalMins() {
+      return oldFileCleanupIntervalMins;
+    }
+    @Override
+    public HDFSCompactionConfigFactory setOldFilesCleanupIntervalMins(int interval) {
+      assertIsPositive(CacheXml.HDFS_PURGE_INTERVAL, interval);
+      this.oldFileCleanupIntervalMins = interval;
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfig getConfigView() {
+      return this;
+    }
+    
+    @Override
+    public HDFSCompactionConfig create() throws GemFireConfigException {
+      AbstractHDFSCompactionConfigHolder config = createInstance(getCompactionStrategy());
+      config.copyFrom(this);
+      config.validate();
+      return config;
+    }
+    
+    protected void validate() {
+    }
+
+    public static AbstractHDFSCompactionConfigHolder createInstance(String name) {
+      if (name == null) {
+        name = DEFAULT_STRATEGY;
+      }
+
+      if (name.equalsIgnoreCase(SIZE_ORIENTED)) {
+        return new SizeTieredHdfsCompactionConfigHolder();
+      }
+
+      return new InvalidCompactionConfigHolder();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AbstractHDFSCompactionConfigHolder@");
+      builder.append(System.identityHashCode(this));
+      builder.append("[autoMajorCompact=");
+      builder.append(autoMajorCompact);
+      builder.append(", ");
+      if (maxInputFileSizeMB > -1) {
+        builder.append("maxInputFileSizeMB=");
+        builder.append(maxInputFileSizeMB);
+        builder.append(", ");
+      }
+      if (maxInputFileCount > -1) {
+        builder.append("maxInputFileCount=");
+        builder.append(maxInputFileCount);
+        builder.append(", ");
+      }
+      if (minInputFileCount > -1) {
+        builder.append("minInputFileCount=");
+        builder.append(minInputFileCount);
+        builder.append(", ");
+      }
+      if (maxConcurrency > -1) {
+        builder.append("maxConcurrency=");
+        builder.append(maxConcurrency);
+        builder.append(", ");
+      }
+      if (majorCompactionConcurrency > -1) {
+        builder.append("majorCompactionConcurrency=");
+        builder.append(majorCompactionConcurrency);
+        builder.append(", ");
+      }
+      if (majorCompactionIntervalMins > -1) {
+        builder.append("majorCompactionIntervalMins=");
+        builder.append(majorCompactionIntervalMins);
+        builder.append(", ");
+      }
+      if (oldFileCleanupIntervalMins > -1) {
+        builder.append("oldFileCleanupIntervalMins=");
+        builder.append(oldFileCleanupIntervalMins);
+      }
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+  
+  public static class InvalidCompactionConfigHolder extends AbstractHDFSCompactionConfigHolder {
+    @Override
+    public String getCompactionStrategy() {
+      return INVALID;
+    }
+  }
+
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreFactory#create(String)
+   */
+  @Override
+  public HDFSStore create(String name) throws GemFireConfigException,
+      StoreExistsException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreImpl#destroy()
+   */
+  @Override
+  public void destroy() {
+    throw new UnsupportedOperationException();
+  }
+  
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("HDFSStoreConfigHolder@");
+    builder.append(System.identityHashCode(this));
+    builder.append(" [name=");
+    builder.append(name);
+    builder.append(", ");
+    if (namenodeURL != null) {
+      builder.append("namenodeURL=");
+      builder.append(namenodeURL);
+      builder.append(", ");
+    }
+    if (homeDir != null) {
+      builder.append("homeDir=");
+      builder.append(homeDir);
+      builder.append(", ");
+    }
+    if (clientConfigFile != null) {
+      builder.append("clientConfigFile=");
+      builder.append(clientConfigFile);
+      builder.append(", ");
+    }
+    if (blockCacheSize > -1) {
+      builder.append("blockCacheSize=");
+      builder.append(blockCacheSize);
+      builder.append(", ");
+    }
+    if (maxFileSize > -1) {
+      builder.append("maxFileSize=");
+      builder.append(maxFileSize);
+      builder.append(", ");
+    }
+    if (fileRolloverInterval > -1) {
+      builder.append("fileRolloverInterval=");
+      builder.append(fileRolloverInterval);
+      builder.append(", ");
+    }
+    builder.append("minorCompaction=");
+    builder.append(isAutoCompact);
+    builder.append(", ");
+
+    if (compactionConfig != null) {
+      builder.append("compactionConfig=");
+      builder.append(compactionConfig);
+      builder.append(", ");
+    }
+    if (hdfsEventQueueAttrs != null) {
+      builder.append("hdfsEventQueueAttrs=");
+      builder.append(hdfsEventQueueAttrs);
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  @Override
+  public HDFSStoreMutator createHdfsStoreMutator() {
+    // As part of alter execution, the HDFS store replaces the config holder
+    // entirely, so a mutator at the config-holder level is not needed.
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public HDFSStore alter(HDFSStoreMutator mutator) {
+    // As part of alter execution, the HDFS store replaces the config holder
+    // entirely, so a mutator at the config-holder level is not needed.
+    throw new UnsupportedOperationException();
+  }
+}
\ No newline at end of file
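
The class comment on HDFSStoreConfigHolder describes a delegation pattern:
the store and its factory keep their attributes in a single shared holder,
and the holder can be swapped wholesale to implement alter. A minimal,
self-contained sketch of that idea follows; all names (ConfigView,
ConfigHolderSketch, StoreSketch, DelegationDemo) are hypothetical and the
real HDFSStore interface is not used.

    // The read-only "view" shared by the store and the holder.
    interface ConfigView {
      String getHomeDir();
    }

    // Holds the actual attribute values; both the factory and the store
    // delegate here, so copying logic lives in exactly one place.
    class ConfigHolderSketch implements ConfigView {
      private String homeDir = "/hdfs-store"; // assumed default
      public String getHomeDir() { return homeDir; }
      public void setHomeDir(String dir) { this.homeDir = dir; }
    }

    // The store delegates every getter to its holder and can replace the
    // holder atomically to support altering the configuration.
    class StoreSketch implements ConfigView {
      private volatile ConfigHolderSketch holder = new ConfigHolderSketch();
      public String getHomeDir() { return holder.getHomeDir(); }
      public void alter(ConfigHolderSketch replacement) { this.holder = replacement; }
    }

    public class DelegationDemo {
      public static void main(String[] args) {
        StoreSketch store = new StoreSketch();
        ConfigHolderSketch updated = new ConfigHolderSketch();
        updated.setHomeDir("/new-home");        // mutate a fresh holder
        store.alter(updated);                   // swap it in wholesale
        System.out.println(store.getHomeDir()); // prints /new-home
      }
    }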
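
The AbstractHDFSCompactionConfigHolder comment says the base class rejects
every attribute and a concrete strategy overrides only the setters it
consumes. A minimal sketch of that pattern under assumed names
(BaseCompactionConfig, SizeTieredSketch, CompactionConfigSketch); the real
classes throw GemFireConfigException rather than IllegalArgumentException.

    // Base strategy: every setter is rejected unless a subclass overrides it.
    abstract class BaseCompactionConfig {
      protected int maxInputFileSizeMB = -1;

      public BaseCompactionConfig setMaxInputFileSizeMB(int size) {
        throw new IllegalArgumentException(
            "maxInputFileSizeMB is not applicable to this compaction strategy");
      }
      public int getMaxInputFileSizeMB() { return maxInputFileSizeMB; }
    }

    // A size-tiered strategy consumes the setting, so it overrides the setter
    // and validates the value.
    class SizeTieredSketch extends BaseCompactionConfig {
      @Override
      public BaseCompactionConfig setMaxInputFileSizeMB(int size) {
        if (size < 1) {
          throw new IllegalArgumentException("maxInputFileSizeMB must be positive: " + size);
        }
        this.maxInputFileSizeMB = size;
        return this;
      }
    }

    public class CompactionConfigSketch {
      public static void main(String[] args) {
        BaseCompactionConfig cfg = new SizeTieredSketch().setMaxInputFileSizeMB(512);
        System.out.println(cfg.getMaxInputFileSizeMB()); // prints 512
        try {
          new BaseCompactionConfig() { }.setMaxInputFileSizeMB(512);
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }
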

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/31d1b20e/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
new file mode 100644
index 0000000..b5fbfe8
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/HDFSStoreCreation.java
@@ -0,0 +1,225 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *========================================================================
+ */
+
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.hdfs.HDFSEventQueueAttributes;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore.HDFSCompactionConfig;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.StoreExistsException;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreConfigHolder.AbstractHDFSCompactionConfigHolder;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+/**
+ * HDFS store configuration.
+ * 
+ * <pre>
+ * {@code
+ * <hdfs-store name="" home-dir="" namenode-url="">
+ *   <hdfs-compaction strategy="" auto-compact="" max-input-file-size-mb=""
+ *                    min-input-file-count="" max-input-file-count=""
+ *                    max-concurrency="" auto-major-compaction=""
+ *                    major-compaction-interval-mins="" major-compaction-concurrency=""/>
+ * </hdfs-store>
+ * }
+ * </pre>
+ * 
+ * (A programmatic equivalent of this fragment is sketched after this file's
+ * diff.)
+ * 
+ * @author ashvina
+ */
+public class HDFSStoreCreation implements HDFSStoreFactory {
+  protected HDFSStoreConfigHolder configHolder;
+  
+  public HDFSStoreCreation() {
+    this(null);
+  }
+
+  /**
+   * Copy constructor for HDFSStoreCreation
+   * @param config configuration source for creating this instance 
+   */
+  public HDFSStoreCreation(HDFSStoreCreation config) {
+    this.configHolder = new HDFSStoreConfigHolder(config == null ? null
+        : config.configHolder);
+  }
+
+  @Override
+  public HDFSStoreFactory setName(String name) {
+    configHolder.setName(name);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setNameNodeURL(String namenodeURL) {
+    configHolder.setNameNodeURL(namenodeURL);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setHomeDir(String homeDir) {
+    configHolder.setHomeDir(homeDir);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setHDFSClientConfigFile(String clientConfigFile) {
+    configHolder.setHDFSClientConfigFile(clientConfigFile);
+    return this;
+  }
+  
+  @Override
+  public HDFSStoreFactory setBlockCacheSize(float percentage) {
+    configHolder.setBlockCacheSize(percentage);
+    return this;
+  }
+  
+  /**
+   * Sets the HDFS event queue attributes.
+   * This causes the store to use the given {@link HDFSEventQueueAttributes}.
+   * @param hdfsEventQueueAttrs the attributes of the HDFS event queue
+   * @return a reference to this HDFSStoreFactory instance
+   */
+  public HDFSStoreFactory setHDFSEventQueueAttributes(HDFSEventQueueAttributes hdfsEventQueueAttrs) {
+    configHolder.setHDFSEventQueueAttributes(hdfsEventQueueAttrs);
+    return this;
+  }
+  
+  @Override
+  public HDFSEventQueueAttributes getHDFSEventQueueAttributes() {
+    return configHolder.getHDFSEventQueueAttributes();
+  }
+
+  @Override
+  public HDFSStoreFactory setHDFSCompactionConfig(HDFSCompactionConfig config) {
+    configHolder.setHDFSCompactionConfig(config);
+    return this;
+  }
+
+  @Override
+  public HDFSCompactionConfigFactory createCompactionConfigFactory(String name) {
+    return configHolder.createCompactionConfigFactory(name);
+  }
+  @Override
+  public HDFSStoreFactory setMaxFileSize(int maxFileSize) {
+    configHolder.setMaxFileSize(maxFileSize);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setFileRolloverInterval(int count) {
+    configHolder.setFileRolloverInterval(count);
+    return this;
+  }
+
+  @Override
+  public HDFSStoreFactory setMinorCompaction(boolean auto) {
+    configHolder.setMinorCompaction(auto);
+    return this;
+  }
+  
+  /**
+   * Factory for compaction configuration. It delegates every setter to an
+   * underlying {@link AbstractHDFSCompactionConfigHolder}, which throws an
+   * exception for any attribute that is not applicable to the configured
+   * compaction strategy and validates the configuration when
+   * {@link #create()} is called.
+   * 
+   * @author ashvina
+   */
+  public static class HDFSCompactionConfigFactoryImpl implements
+      HDFSCompactionConfigFactory {
+    private AbstractHDFSCompactionConfigHolder configHolder;
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxInputFileSizeMB(int size) {
+      configHolder.setMaxInputFileSizeMB(size);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMinInputFileCount(int count) {
+      configHolder.setMinInputFileCount(count);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxInputFileCount(int count) {
+      configHolder.setMaxInputFileCount(count);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMaxThreads(int count) {
+      configHolder.setMaxThreads(count);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setAutoMajorCompaction(boolean auto) {
+      configHolder.setAutoMajorCompaction(auto);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMajorCompactionIntervalMins(int count) {
+      configHolder.setMajorCompactionIntervalMins(count);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfigFactory setMajorCompactionMaxThreads(int count) {
+      configHolder.setMajorCompactionMaxThreads(count);
+      return this;
+    }
+        
+    @Override
+    public HDFSCompactionConfigFactory setOldFilesCleanupIntervalMins(int interval) {
+      configHolder.setOldFilesCleanupIntervalMins(interval);
+      return this;
+    }
+
+    @Override
+    public HDFSCompactionConfig getConfigView() {
+      return configHolder.getConfigView();
+    }
+    
+    @Override
+    public HDFSCompactionConfig create() throws GemFireConfigException {
+      HDFSCompactionConfigFactoryImpl config = createInstance(configHolder.getCompactionStrategy());
+      config.configHolder.copyFrom(this.configHolder);
+      config.configHolder.validate();
+      return (HDFSCompactionConfig) config.configHolder;
+    }
+    
+    private static HDFSCompactionConfigFactoryImpl createInstance(String name) {
+      HDFSCompactionConfigFactoryImpl impl = new HDFSCompactionConfigFactoryImpl();
+      impl.configHolder = AbstractHDFSCompactionConfigHolder.createInstance(name);
+      return impl;
+    }
+  }
+  
+  /**
+   * This method should not be called on this class.
+   * @see HDFSStoreFactory#create(String)
+   */
+  @Override
+  public HDFSStore create(String name) throws GemFireConfigException,
+      StoreExistsException {
+    throw new UnsupportedOperationException();
+  }
+
+  public static void assertIsPositive(String name, int count) {
+    if (count < 1) {
+      throw new IllegalArgumentException(
+          LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE
+              .toLocalizedString(new Object[] { name, count }));
+    }
+  }
+}
\ No newline at end of file
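
The HDFSStoreCreation javadoc above shows the declarative XML form of the
store configuration. Below is a hedged sketch of the programmatic equivalent
using the factory methods defined in this file; the attribute values are
assumed, and registering the configured store with a cache is out of scope
here (create(String) on HDFSStoreCreation itself throws
UnsupportedOperationException).

    import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
    import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreCreation;

    public class HdfsStoreConfigSketch {
      public static void main(String[] args) {
        // Mirror the <hdfs-store .../> attributes from the XML fragment.
        HDFSStoreFactory factory = new HDFSStoreCreation();
        factory.setName("hdfsStore");
        factory.setNameNodeURL("hdfs://localhost:8020"); // namenode-url (assumed value)
        factory.setHomeDir("/gemfire");                  // home-dir (assumed value)
        factory.setMinorCompaction(true);                // auto-compact (assumed mapping)
      }
    }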

