geode-commits mailing list archives

From upthewatersp...@apache.org
Subject [07/25] incubator-geode git commit: GEODE-10: Reinstating HDFS persistence code
Date Wed, 27 Apr 2016 20:49:53 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
new file mode 100644
index 0000000..3ba7086
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/RegionWithHDFSTestBase.java
@@ -0,0 +1,719 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import static com.gemstone.gemfire.test.dunit.Wait.waitForCriterion;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+@SuppressWarnings({"serial", "rawtypes", "unchecked"})
+public abstract class RegionWithHDFSTestBase extends CacheTestCase {
+
+  protected String tmpDir;
+
+  public static String homeDir = null;
+
+  protected abstract void checkWithGetAll(String uniqueName, ArrayList arrayl);
+
+  protected abstract void checkWithGet(String uniqueName, int start,
+      int end, boolean expectValue);
+
+  protected abstract void doDestroys(final String uniqueName, int start, int end);
+
+  protected abstract void doPutAll(final String uniqueName, Map map);
+
+  protected abstract void doPuts(final String uniqueName, int start, int end);
+
+  protected abstract SerializableCallable getCreateRegionCallable(final int totalnumOfBuckets, final int batchSizeMB,
+      final int maximumEntries, final String folderPath, final String uniqueName, final int batchInterval, final boolean queuePersistent, 
+      final boolean writeonly, final long timeForRollover, final long maxFileSize);
+  
+  protected abstract void verifyHDFSData(VM vm, String uniqueName) throws Exception ;
+  
+  protected abstract AsyncInvocation doAsyncPuts(VM vm, final String regionName,
+                                                 final int start, final int end, final String suffix) throws Exception;
+  
+  public RegionWithHDFSTestBase(String name) {
+    super(name);
+  }
+
+  @Override
+  public void preTearDownCacheTestCase() throws Exception {
+    super.preTearDownCacheTestCase();
+    for (int h = 0; h < Host.getHostCount(); h++) {
+      Host host = Host.getHost(h);
+      SerializableCallable cleanUp = cleanUpStoresAndDisconnect();
+      for (int v = 0; v < host.getVMCount(); v++) {
+        VM vm = host.getVM(v);
+        // This store will be deleted by the first VM itself. Invocations from
+        // subsequent VMs will be no-ops.
+        vm.invoke(cleanUp);
+      }
+    }
+  }
+
+  public SerializableCallable cleanUpStoresAndDisconnect() throws Exception {
+    SerializableCallable cleanUp = new SerializableCallable("cleanUpStoresAndDisconnect") {
+      public Object call() throws Exception {
+        disconnectFromDS();
+        File file;
+        if (homeDir != null) {
+          file = new File(homeDir);
+          FileUtil.delete(file);
+          homeDir = null;
+        }
+        file = new File(tmpDir);
+        FileUtil.delete(file);
+        return 0;
+      }
+    };
+    return cleanUp;
+  }
+
+  @Override
+  public void preSetUp() throws Exception {
+    super.preSetUp();
+    tmpDir = /*System.getProperty("java.io.tmpdir") + "/" +*/ "RegionWithHDFSBasicDUnitTest_" + System.nanoTime();
+  }
+  
+  int createServerRegion(VM vm, final int totalnumOfBuckets, 
+      final int batchSize, final int maximumEntries, final String folderPath, 
+      final String uniqueName, final int batchInterval) {
+    return createServerRegion(vm, totalnumOfBuckets, 
+        batchSize, maximumEntries, folderPath, 
+        uniqueName, batchInterval, false, false);
+  }
+
+  protected int createServerRegion(VM vm, final int totalnumOfBuckets, 
+      final int batchSizeMB, final int maximumEntries, final String folderPath, 
+      final String uniqueName, final int batchInterval, final boolean writeonly,
+      final boolean queuePersistent) {
+    return createServerRegion(vm, totalnumOfBuckets, 
+        batchSizeMB, maximumEntries, folderPath, 
+        uniqueName, batchInterval, writeonly, queuePersistent, -1, -1);
+  }
+  protected int createServerRegion(VM vm, final int totalnumOfBuckets, 
+      final int batchSizeMB, final int maximumEntries, final String folderPath, 
+      final String uniqueName, final int batchInterval, final boolean writeonly,
+      final boolean queuePersistent, final long timeForRollover, final long maxFileSize) {
+    SerializableCallable createRegion = getCreateRegionCallable(
+        totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
+        batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize);
+
+    return (Integer) vm.invoke(createRegion);
+  }
+  protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets, 
+      final int batchSizeMB, final int maximumEntries, final String folderPath, 
+      final String uniqueName, final int batchInterval, final boolean writeonly,
+      final boolean queuePersistent) {
+    SerializableCallable createRegion = getCreateRegionCallable(
+        totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
+        batchInterval, queuePersistent, writeonly, -1, -1);
+
+    return vm.invokeAsync(createRegion);
+  }
+  protected AsyncInvocation createServerRegionAsync(VM vm, final int totalnumOfBuckets, 
+      final int batchSizeMB, final int maximumEntries, final String folderPath, 
+      final String uniqueName, final int batchInterval, final boolean writeonly,
+      final boolean queuePersistent, final long timeForRollover, final long maxFileSize) {
+    SerializableCallable createRegion = getCreateRegionCallable(
+        totalnumOfBuckets, batchSizeMB, maximumEntries, folderPath, uniqueName,
+        batchInterval, queuePersistent, writeonly, timeForRollover, maxFileSize);
+
+    return vm.invokeAsync(createRegion);
+  }
+  
+  /**
+   * Does puts, gets, destroys and a getAll. Because there are many updates,
+   * most of the data is no longer in memory or in the async queue and has to
+   * be fetched from HDFS.
+   * @throws Throwable
+   */
+  public void testGetFromHDFS() throws Throwable {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    final String uniqueName = getName();
+    final String homeDir = "../../testGetFromHDFS";
+    
+    createServerRegion(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true);
+    createServerRegion(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true);
+    
+    // Do some puts
+    vm0.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 0, 40);
+        return null;
+      }
+    });
+    
+    // Do some puts and destroys.
+    // The order of operations is manipulated to work around a known issue:
+    // "a higher version update on a key can be batched and sent to HDFS
+    // before a lower version update on the same key is batched and sent to
+    // HDFS. This puts the latest update on a key in an older file, so a
+    // fetch from HDFS will return an older update from a newer file."
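+    // Illustrative (hypothetical) example: if version 2 of key K40 is flushed
+    // into hoplog file A first and version 1 of K40 is flushed later into
+    // hoplog file B, a reader that prefers the newer file B returns the stale
+    // version 1.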
+    
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 40, 50);
+        doDestroys(uniqueName, 40, 50);
+        doPuts(uniqueName, 50, 100);
+        doPuts(uniqueName, 30, 40);
+        return null;
+      }
+    });
+    
+    // Do some more puts and destroys.
+    // The order of operations is manipulated for the same reason as above:
+    // a higher version update can reach HDFS before a lower version update
+    // on the same key, leaving the latest update in an older file.
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 80, 90);
+        doDestroys(uniqueName, 80, 90);
+        doPuts(uniqueName, 110, 200);
+        doPuts(uniqueName, 90, 110);
+        return null;
+      }
+      
+    });
+    
+    // get and getall the values and compare them. 
+    SerializableCallable checkData = new SerializableCallable() {
+      public Object call() throws Exception {
+        checkWithGet(uniqueName, 0, 40, true);
+        checkWithGet(uniqueName, 40, 50, false);
+        checkWithGet(uniqueName, 50, 80, true);
+        checkWithGet(uniqueName, 80, 90, false);
+        checkWithGet(uniqueName, 90, 200, true);
+        checkWithGet(uniqueName, 200, 201, false);
+        
+        ArrayList arrayl = new ArrayList();
+        for (int i =0; i< 200; i++) {
+          String k = "K" + i;
+          if ( !((40 <= i && i < 50) ||   (80 <= i && i < 90)))
+            arrayl.add(k);
+        }
+        checkWithGetAll(uniqueName, arrayl);
+        
+        return null;
+      }
+    };
+    vm1.invoke(checkData);
+    
+    //Restart the members and verify that we can still get the data
+    closeCache(vm0);
+    closeCache(vm1);
+    AsyncInvocation async0 = createServerRegionAsync(vm0, 7, 1, 50, homeDir, uniqueName, 50, false, true);
+    AsyncInvocation async1 = createServerRegionAsync(vm1, 7, 1, 50, homeDir, uniqueName, 50, false, true);
+    
+    async0.getResult();
+    async1.getResult();
+    
+    
+    // get and getall the values and compare them.
+    vm1.invoke(checkData);
+  
+    //TODO:HDFS we are just reading the files here. Need to verify 
+    // once the folder structure is finalized. 
+    dumpFiles(vm1, uniqueName);
+    
+  }
+
+  /**
+   * Puts a few entries (keys with multiple updates) and gets them immediately.
+   * There is a high probability that the values are served from the async queue.
+   */
+  public void testGetForAsyncQueue() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    
+    final String uniqueName = getName();
+    final String homeDir = "../../testGetForAsyncQueue";
+    
+    createServerRegion(vm0, 2, 5, 1, homeDir, uniqueName, 10000);
+    createServerRegion(vm1, 2, 5, 1, homeDir, uniqueName, 10000);
+    
+    vm0.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 0, 4);
+        return null;
+      }
+    });
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 0, 2);
+        doDestroys(uniqueName, 2, 3);
+        doPuts(uniqueName, 3, 7);
+        
+        checkWithGet(uniqueName, 0, 2, true);
+        checkWithGet(uniqueName, 2, 3, false);
+        checkWithGet(uniqueName, 3, 7, true);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Puts a few entries (keys with multiple updates) and calls getAll immediately.
+   * There is a high probability that the values are served from the async queue.
+   */
+  public void testGetAllForAsyncQueue() {
+    
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    
+    final String uniqueName = getName();
+    createServerRegion(vm0, 2, 5, 2, uniqueName, uniqueName, 10000);
+    createServerRegion(vm1, 2, 5, 2, uniqueName, uniqueName, 10000);
+    
+    vm0.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 0, 4);
+        return null;
+      }
+    });
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        doPuts(uniqueName, 1, 5);
+  
+        ArrayList arrayl = new ArrayList();
+        for (int i =0; i< 5; i++) {
+          String k = "K" + i;
+          arrayl.add(k);
+        }
+        checkWithGetAll(uniqueName, arrayl);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Does putAlls for a few entries (keys with multiple updates) and gets them
+   * immediately. There is a high probability that the values are served from
+   * the async queue.
+   */
+  public void testPutAllForAsyncQueue() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    
+    final String uniqueName = getName();
+    final String homeDir = "../../testPutAllForAsyncQueue";
+    createServerRegion(vm0, 2, 5, 2, homeDir, uniqueName, 10000);
+    createServerRegion(vm1, 2, 5, 2, homeDir, uniqueName, 10000);
+    
+    vm0.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        HashMap putAllmap = new HashMap();
+        for (int i =0; i< 4; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        return null;
+      }
+    });
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        HashMap putAllmap = new HashMap();
+        for (int i =1; i< 5; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        checkWithGet(uniqueName, 0, 5, true);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Does putAlls and gets. Because there are many updates, most of the data is
+   * no longer in memory or in the async queue and has to be fetched from HDFS.
+   */
+  public void _testPutAllAndGetFromHDFS() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    
+    final String uniqueName = getName();
+    final String homeDir = "../../testPutAllAndGetFromHDFS";
+    createServerRegion(vm0, 7, 1, 500, homeDir, uniqueName, 500);
+    createServerRegion(vm1, 7, 1, 500, homeDir, uniqueName, 500);
+    
+    // Do some puts
+    vm0.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+          
+        HashMap putAllmap = new HashMap();
+        
+        for (int i =0; i< 500; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        return null;
+      }
+    });
+    
+    // Do putAll and some  destroys 
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        HashMap putAllmap = new HashMap();
+        for (int i = 500; i< 1000; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        return null;
+      }
+    });
+    
+    // Do some more puts.
+    // The order of operations is manipulated for the same reason as in
+    // testGetFromHDFS: a higher version update can reach HDFS before a lower
+    // version update on the same key, leaving the latest update in an older file.
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        HashMap putAllmap = new HashMap();
+        for (int i =1100; i< 2000; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        putAllmap = new HashMap();
+        for (int i = 900; i< 1100; i++)
+          putAllmap.put("K" + i, "V"+ i );
+        doPutAll(uniqueName, putAllmap);
+        return null;
+      }
+      
+    });
+    
+    // get and getall the values and compare them. 
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        checkWithGet(uniqueName, 0, 2000, true);
+        checkWithGet(uniqueName, 2000,  2001, false);
+        
+        ArrayList arrayl = new ArrayList();
+        for (int i =0; i< 2000; i++) {
+          String k = "K" + i;
+          arrayl.add(k);
+        }
+        checkWithGetAll(uniqueName, arrayl);
+        return null;
+      }
+    });
+    
+  }
+
+  public void _testWObasicClose() throws Throwable{
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    
+    String homeDir = "../../testWObasicClose";
+    final String uniqueName = getName();
+
+    createServerRegion(vm0, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    createServerRegion(vm1, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    createServerRegion(vm2, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    createServerRegion(vm3, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    
+    AsyncInvocation a1 = doAsyncPuts(vm0, uniqueName, 1, 50, "vm0");
+    AsyncInvocation a2 = doAsyncPuts(vm1, uniqueName, 40, 100, "vm1");
+    AsyncInvocation a3 = doAsyncPuts(vm2, uniqueName, 40, 100, "vm2");
+    AsyncInvocation a4 = doAsyncPuts(vm3, uniqueName, 90, 150, "vm3");
+    
+    a1.join();
+    a2.join();
+    a3.join();
+    a4.join();
+   
+    Thread.sleep(5000); 
+    cacheClose (vm0, false);
+    cacheClose (vm1, false);
+    cacheClose (vm2, false);
+    cacheClose (vm3, false);
+    
+    AsyncInvocation async1 = createServerRegionAsync(vm0, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    AsyncInvocation async2 = createServerRegionAsync(vm1, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    AsyncInvocation async3 = createServerRegionAsync(vm2, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    AsyncInvocation async4 = createServerRegionAsync(vm3, 11, 1,  500, homeDir, uniqueName, 500, true, false);
+    async1.getResult();
+    async2.getResult();
+    async3.getResult();
+    async4.getResult();
+    
+    verifyHDFSData(vm0, uniqueName); 
+    
+    cacheClose (vm0, false);
+    cacheClose (vm1, false);
+    cacheClose (vm2, false);
+    cacheClose (vm3, false);
+  }
+  
+  
+  protected void cacheClose(VM vm, final boolean sleep){
+    vm.invoke( new SerializableCallable() {
+      public Object call() throws Exception {
+        if (sleep)
+          Thread.sleep(2000);
+        getCache().getLogger().info("Cache close in progress "); 
+        getCache().close();
+        getCache().getLogger().info("Cache closed");
+        return null;
+      }
+    });
+    
+  }
+  
+  protected void verifyInEntriesMap (HashMap<String, String> entriesMap, int start, int end, String suffix) {
+    for (int i =start; i< end; i++) {
+      String k = "K" + i;
+      String v = "V"+ i + suffix;
+      Object s = entriesMap.get(v);
+      assertTrue( "The expected key " + k+ " didn't match the received value " + s + ". value: " + v, k.equals(s));
+    }
+  }
+  
+  /**
+   * Reads all the sequence files and returns the key-value pairs persisted in
+   * each file. Each pair is stored as a <value, key> tuple because there can
+   * be multiple persisted values for a key.
+   * @throws Exception
+   */
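+  // Example of the returned structure (file and entry names are illustrative):
+  //   { "some-hoplog-file" : { "V1vm0" -> "K1", "V2vm0" -> "K2" }, ... }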
+  protected HashMap<String, HashMap<String, String>>  createFilesAndEntriesMap(VM vm0, final String uniqueName, final String regionName) throws Exception {
+    HashMap<String, HashMap<String, String>> entriesToFileMap = (HashMap<String, HashMap<String, String>>) 
+    vm0.invoke( new SerializableCallable() {
+      public Object call() throws Exception {
+        HashMap<String, HashMap<String, String>> entriesToFileMap = new HashMap<String, HashMap<String, String>>();
+        HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName);
+        FileSystem fs = hdfsStore.getFileSystem();
+        System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir());
+        try {
+          Path basePath = new Path(hdfsStore.getHomeDir());
+          Path regionPath = new Path(basePath, regionName);
+          RemoteIterator<LocatedFileStatus> files = fs.listFiles(regionPath, true);
+          
+          while(files.hasNext()) {
+            HashMap<String, String> entriesMap = new HashMap<String, String>();
+            LocatedFileStatus next = files.next();
+            /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */
+            // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath());
+            System.err.println(" - " + next.getPath());
+            readSequenceFile(fs, next.getPath(), entriesMap);
+            entriesToFileMap.put(next.getPath().getName(), entriesMap);
+          }
+        } catch (FileNotFoundException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+        
+        return entriesToFileMap;
+      }
+      @SuppressWarnings("deprecation")
+      public void readSequenceFile(FileSystem inputFS, Path sequenceFileName,  
+          HashMap<String, String> entriesMap) throws IOException {
+        SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
+        HoplogIterator<byte[], byte[]> iter = hoplog.getReader().scan();
+        try {
+          while (iter.hasNext()) {
+            iter.next();
+            PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
+            String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey()));
+            String value = (String) te.getDeserializedValue();
+            entriesMap.put(value, stringkey);
+            if (getCache().getLoggerI18n().fineEnabled())
+              getCache().getLoggerI18n().fine("Key: " + stringkey + " value: " + value  + " path " + sequenceFileName.getName());
+          }
+        } catch (Exception e) {
+          assertTrue(e.toString(), false);
+        }
+        iter.close();
+        hoplog.close();
+     }
+    });
+    return entriesToFileMap;
+  }
+  protected SerializableCallable validateEmpty(VM vm0, final int numEntries, final String uniqueName) {
+    SerializableCallable validateEmpty = new SerializableCallable("validateEmpty") {
+      public Object call() throws Exception {
+        Region r = getRootRegion(uniqueName);
+        
+        assertTrue(r.isEmpty());
+        
+        //validate region is empty on peer as well
+        assertFalse(r.entrySet().iterator().hasNext());
+        //Make sure the region is empty
+        for (int i =0; i< numEntries; i++) {
+          assertEquals("failure on key K" + i , null, r.get("K" + i));
+        }
+        
+        return null;
+      }
+    };
+    
+    vm0.invoke(validateEmpty);
+    return validateEmpty;
+  }
+
+  protected void closeCache(VM vm0) {
+    //Restart and validate still empty.
+    SerializableRunnable closeCache = new SerializableRunnable("close cache") {
+      @Override
+      public void run() {
+        getCache().close();
+        disconnectFromDS();
+      }
+    };
+    
+    vm0.invoke(closeCache);
+  }
+
+  protected void verifyDataInHDFS(VM vm0, final String uniqueName, final boolean shouldHaveData,
+      final boolean wait, final boolean waitForQueueToDrain, final int numEntries) {
+        vm0.invoke(new SerializableCallable("check for data in hdfs") {
+          @Override
+          public Object call() throws Exception {
+            
+            HDFSRegionDirector director = HDFSRegionDirector.getInstance();
+            final SortedOplogStatistics stats = director.getHdfsRegionStats("/" + uniqueName);
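+            // Wait (polling roughly every 100 ms, for up to 30 seconds) until
+            // the active hoplog file count matches the expectation; the test
+            // fails if the criterion is never met.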
+            waitForCriterion(new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return stats.getActiveFileCount() > 0 == shouldHaveData;
+              }
+              
+              @Override
+              public String description() {
+                return "Waiting for active file count to be greater than 0: " + stats.getActiveFileCount() + " stats=" + System.identityHashCode(stats);
+              }
+            }, 30000, 100, true);
+            
+            if(waitForQueueToDrain) {
+              PartitionedRegion region = (PartitionedRegion) getCache().getRegion(uniqueName);
+              final AsyncEventQueueStats queueStats = region.getHDFSEventQueueStats();
+              waitForCriterion(new WaitCriterion() {
+                @Override
+                public boolean done() {
+                  return queueStats.getEventQueueSize() <= 0;
+                }
+                
+                @Override
+                public String description() {
+                  return "Waiting for queue stats to reach 0: " + queueStats.getEventQueueSize();
+                }
+              }, 30000, 100, true);
+            }
+            return null;
+          }
+        });
+      }
+
+  protected void doPuts(VM vm0, final String uniqueName, final int numEntries) {
+    // Do some puts
+    vm0.invoke(new SerializableCallable("do puts") {
+      public Object call() throws Exception {
+        Region r = getRootRegion(uniqueName);
+        for (int i =0; i< numEntries; i++)
+          r.put("K" + i, "V"+ i );
+        return null;
+      }
+    });
+  }
+
+  protected void validate(VM vm1, final String uniqueName, final int numEntries) {
+    SerializableCallable validate = new SerializableCallable("validate") {
+      public Object call() throws Exception {
+        Region r = getRootRegion(uniqueName);
+        
+        for (int i =0; i< numEntries; i++) {
+          assertEquals("failure on key K" + i , "V"+ i, r.get("K" + i));
+        }
+        
+        return null;
+      }
+    };
+    vm1.invoke(validate);
+  }
+
+  protected void dumpFiles(VM vm0, final String uniqueName) {
+    vm0.invoke(new SerializableRunnable() {
+  
+      @Override
+      public void run() {
+        HDFSStoreImpl hdfsStore = (HDFSStoreImpl) ((GemFireCacheImpl)getCache()).findHDFSStore(uniqueName);
+        FileSystem fs;
+        try {
+          fs = hdfsStore.getFileSystem();
+        } catch (IOException e1) {
+          throw new HDFSIOException(e1.getMessage(), e1);
+        }
+        System.err.println("dumping file names in HDFS directory: " + hdfsStore.getHomeDir());
+        try {
+          RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(hdfsStore.getHomeDir()), true);
+          
+          while(files.hasNext()) {
+            LocatedFileStatus next = files.next();
+            /* MergeGemXDHDFSToGFE - Disabled as I am not pulling in DunitEnv */
+            // System.err.println(DUnitEnv.get().getPid() + " - " + next.getPath());
+            System.err.println(" - " + next.getPath());
+          }
+        } catch (FileNotFoundException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        } catch (IOException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+        
+      }
+      
+    });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserverJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserverJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserverJUnitTest.java
new file mode 100644
index 0000000..26c7094
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SignalledFlushObserverJUnitTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.experimental.categories.Category;
+
+import junit.framework.TestCase;
+
+import com.gemstone.gemfire.cache.hdfs.internal.FlushObserver.AsyncFlushResult;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class SignalledFlushObserverJUnitTest extends TestCase {
+  private AtomicInteger events;
+  private AtomicInteger delivered;
+  
+  private SignalledFlushObserver sfo;
+  
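+  // As exercised by the tests below: push() registers a pending event, pop(n)
+  // marks n events as delivered, and flush() returns an AsyncFlushResult whose
+  // waitForFlush() completes once every event pushed before the flush call has
+  // been popped.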
+  public void testEmpty() throws InterruptedException {
+    assertFalse(sfo.shouldDrainImmediately());
+    assertTrue(sfo.flush().waitForFlush(0, TimeUnit.NANOSECONDS));
+    assertFalse(sfo.shouldDrainImmediately());
+  }
+  
+  public void testSingle() throws InterruptedException {
+    sfo.push();
+    AsyncFlushResult result = sfo.flush();
+
+    assertTrue(sfo.shouldDrainImmediately());
+    sfo.pop(1);
+    
+    assertTrue(result.waitForFlush(0, TimeUnit.MILLISECONDS));
+    assertFalse(sfo.shouldDrainImmediately());
+  }
+
+  public void testDouble() throws InterruptedException {
+    sfo.push();
+    sfo.push();
+
+    AsyncFlushResult result = sfo.flush();
+    assertTrue(sfo.shouldDrainImmediately());
+
+    sfo.pop(1);
+    assertFalse(result.waitForFlush(0, TimeUnit.MILLISECONDS));
+
+    sfo.pop(1);
+    assertTrue(result.waitForFlush(0, TimeUnit.MILLISECONDS));
+    assertFalse(sfo.shouldDrainImmediately());
+  }
+
+  public void testTimeout() throws InterruptedException {
+    sfo.push();
+    AsyncFlushResult result = sfo.flush();
+
+    assertTrue(sfo.shouldDrainImmediately());
+    assertFalse(result.waitForFlush(100, TimeUnit.MILLISECONDS));
+    sfo.pop(1);
+    
+    assertTrue(result.waitForFlush(0, TimeUnit.MILLISECONDS));
+    assertFalse(sfo.shouldDrainImmediately());
+  }
+  
+  @Override
+  protected void setUp() {
+    events = new AtomicInteger(0);
+    delivered = new AtomicInteger(0);
+    sfo = new SignalledFlushObserver();
+    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+  }
+  
+  private int push() {
+    return events.incrementAndGet();
+  }
+  
+  private int pop() {
+    return delivered.incrementAndGet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
new file mode 100644
index 0000000..8a7fb34
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/SortedListForAsyncQueueJUnitTest.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache.hdfs.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.junit.experimental.categories.Category;
+
+import junit.framework.TestCase;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.internal.ParallelAsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.KeyToSeqNumObject;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.MultiRegionSortedQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSBucketRegionQueue.SortedEventQueue;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.EntryEventImpl;
+import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderAttributes;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * A test class for the functionality of the sorted async queue used for HDFS
+ * events.
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class SortedListForAsyncQueueJUnitTest extends TestCase {
+  
+  public SortedListForAsyncQueueJUnitTest() {
+    super();
+  }
+
+  private GemFireCacheImpl c;
+
+  @Override
+  public void setUp() {
+    
+    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+    // make it a loner
+    this.c = createCache();
+    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+  }
+
+  protected GemFireCacheImpl createCache() {
+    return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").set("log-level", "warning")
+        .create();
+  }
+
+  @Override
+  public void tearDown() {
+    this.c.close();
+  }
+  
+  public void testHopQueueWithOneBucket() throws Exception {
+    this.c.close();
+    this.c = createCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(1);
+    
+    RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION);
+    PartitionedRegion r1 = (PartitionedRegion) rf1.setPartitionAttributes(paf.create()).create("r1");
+    r1.put("K9", "x1");
+    r1.put("K8", "x2");
+    // hack to get the queue. 
+    HDFSParallelGatewaySenderQueue hopqueue = getHDFSQueue(r1, this.c);
+    HDFSBucketRegionQueue hdfsBQ = (HDFSBucketRegionQueue)((PartitionedRegion)hopqueue.getRegion()).getDataStore().getLocalBucketById(0);
+    
+    EntryEventImpl ev1 = EntryEventImpl.create((LocalRegion)r1, Operation.CREATE,
+        (Object)"K1", (Object)"V1", null,
+        false, (DistributedMember)c.getMyId());
+    // put some keys with multiple updates.
+    hopqueue.put(getNewEvent("K2", "V2", r1, 0, 2) );
+    hopqueue.put(getNewEvent("K3", "V3a", r1, 0, 8) );
+    hopqueue.put(getNewEvent("K3", "V3", r1, 0, 7) );
+    hopqueue.put(getNewEvent("K1", "V1", r1, 0, 3) );
+    hopqueue.put(getNewEvent("K2", "V2a", r1, 0, 6) );
+    hopqueue.put(getNewEvent("K3", "V3b", r1, 0, 9) );
+    
+    assertTrue(" skip list size should be  6 ", getSortedEventQueue(hdfsBQ).currentSkipList.size() == 6);
+    
+    
+    // peek a key; it should be the lowest
+    Object[] l = hopqueue.peek(1, 0).toArray();
+    
+    assertTrue("First key should be K1 but is " + ((HDFSGatewayEventImpl)l[0]).getKey(), ((HDFSGatewayEventImpl)l[0]).getKey().equals("K1"));
+    assertTrue(" Peeked skip list size should be  0 ", getSortedEventQueue(hdfsBQ).getPeeked().size() == 6);
+    assertTrue(" skip list size should be  6 ", getSortedEventQueue(hdfsBQ).currentSkipList.size() == 0);
+    
+    // try to fetch the key; it is in the peeked skip list but should still be retrievable
+    Object o = hopqueue.get(r1, CacheServerHelper.serialize("K1"), 0);
+    assertTrue("First key should be K1", ((HDFSGatewayEventImpl)o).getKey().equals("K1"));
+    
+    assertTrue(" skip lists size should be  6"  , ( getSortedEventQueue(hdfsBQ).getPeeked().size() + getSortedEventQueue(hdfsBQ).currentSkipList.size() ) == 6);
+    
+    o = hopqueue.get(r1, CacheServerHelper.serialize("K2"), 0);
+    Object v = ((HDFSGatewayEventImpl)o).getDeserializedValue();
+    assertTrue(" key should K2 with value V2a but the value was " + v , ((String)v).equals("V2a"));
+    
+    o = hopqueue.get(r1, CacheServerHelper.serialize("K3"), 0);
+    v = ((HDFSGatewayEventImpl)o).getDeserializedValue();
+    assertTrue(" key should K3 with value V3b but the value was " + v , ((String)v).equals("V3b"));
+  }
+
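+  // The bucket's hdfsEventQueue is a MultiRegionSortedQueue; these tests use a
+  // single region, so return its one SortedEventQueue.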
+  protected SortedEventQueue getSortedEventQueue(HDFSBucketRegionQueue hdfsBQ) {
+    MultiRegionSortedQueue multiQueue = (MultiRegionSortedQueue)(hdfsBQ.hdfsEventQueue);
+    return multiQueue.regionToEventQueue.values().iterator().next();
+  }
+  
+  public void testPeekABatch() throws Exception {
+    this.c.close();
+    this.c = createCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(1);
+    
+    RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION);
+    PartitionedRegion r1 = (PartitionedRegion) rf1.setPartitionAttributes(paf.create()).create("r1");
+    r1.put("K9", "x1");
+    r1.put("K8", "x2");
+    // hack to get the queue. 
+    HDFSParallelGatewaySenderQueue hopqueue = getHDFSQueue(r1, this.c);
+    HDFSBucketRegionQueue hdfsBQ = (HDFSBucketRegionQueue)((PartitionedRegion)hopqueue.getRegion()).getDataStore().getLocalBucketById(0);
+    
+    
+    // put some keys with multiple updates.
+    hopqueue.put(getNewEvent("K2", "V2", r1, 0, 2) );
+    hopqueue.put(getNewEvent("K3", "V3a", r1, 0, 8) );
+    hopqueue.put(getNewEvent("K3", "V3", r1, 0, 7) );
+    hopqueue.put(getNewEvent("K1", "V1", r1, 0, 3) );
+    hopqueue.put(getNewEvent("K2", "V2a", r1, 0, 6) );
+    hopqueue.put(getNewEvent("K3", "V3b", r1, 0, 9) );
+    
+    getSortedEventQueue(hdfsBQ).rollover(true);
+    
+    hopqueue.put(getNewEvent("K1", "V12", r1, 0, 11) );
+    hopqueue.put(getNewEvent("K5", "V3a", r1, 0, 12) );
+    hopqueue.put(getNewEvent("K5", "V3b", r1, 0, 13) );
+    
+    assertTrue(" skip list size should be  3 but is " + getSortedEventQueue(hdfsBQ).currentSkipList.size(), getSortedEventQueue(hdfsBQ).currentSkipList.size() == 3);
+    assertTrue(" skip list size should be  6 but is " + getSortedEventQueue(hdfsBQ).queueOfLists.peek().size(), getSortedEventQueue(hdfsBQ).queueOfLists.peek().size() == 6);
+    
+    Object o1 = hopqueue.get(r1, CacheServerHelper.serialize("K3"), 0);
+    Object o2 = hopqueue.get(r1, CacheServerHelper.serialize("K1"), 0);
+    Object v1 = ((HDFSGatewayEventImpl)o1).getDeserializedValue();
+    Object v2 = ((HDFSGatewayEventImpl)o2).getDeserializedValue();
+    assertTrue(" key should K3 with value V3b but the value was " + v1 , ((String)v1).equals("V3b"));
+    assertTrue(" key should K1 with value V12 but the value was " + v2 , ((String)v2).equals("V12"));
+    
+    
+    ArrayList a = hdfsBQ.peekABatch();
+    assertTrue("First key should be K1 but is " + ((HDFSGatewayEventImpl)a.get(0)).getKey(), ((HDFSGatewayEventImpl)a.get(0)).getKey().equals("K1"));
+    assertTrue("Second key should be K2 but is " + ((HDFSGatewayEventImpl)a.get(1)).getKey(), ((HDFSGatewayEventImpl)a.get(1)).getKey().equals("K2"));
+    assertTrue("Third key should be K2 but is " + ((HDFSGatewayEventImpl)a.get(2)).getKey(), ((HDFSGatewayEventImpl)a.get(2)).getKey().equals("K2"));
+    
+    
+    assertTrue(" Peeked skip list size should be 6 ", getSortedEventQueue(hdfsBQ).getPeeked().size() == 6);
+    assertTrue(" queueOfLists size should be  2 ", getSortedEventQueue(hdfsBQ).queueOfLists.size() == 2);
+    
+    assertTrue(" skip list size should be  3 ", getSortedEventQueue(hdfsBQ).currentSkipList.size() == 3);
+    
+    o1 = hopqueue.get(r1, CacheServerHelper.serialize("K3"), 0);
+    o2 = hopqueue.get(r1, CacheServerHelper.serialize("K1"), 0);
+    v1 = ((HDFSGatewayEventImpl)o1).getDeserializedValue();
+    v2 = ((HDFSGatewayEventImpl)o2).getDeserializedValue();
+    assertTrue(" key should K3 with value V3b but the value was " + v1 , ((String)v1).equals("V3b"));
+    assertTrue(" key should K1 with value V12 but the value was " + v2 , ((String)v2).equals("V12"));
+    
+    
+    java.util.Iterator<KeyToSeqNumObject> iter1 = getSortedEventQueue(hdfsBQ).getPeeked().iterator();
+    assertTrue("key in peeked list should be 3 ", iter1.next().getSeqNum() == 3);
+    assertTrue("key in peeked list should be 6 ", iter1.next().getSeqNum() == 6);
+    assertTrue("key in peeked list should be 2 ", iter1.next().getSeqNum() == 2);
+    assertTrue("key in peeked list should be 9 ", iter1.next().getSeqNum() == 9);
+    assertTrue("key in peeked list should be 8 ", iter1.next().getSeqNum() == 8);
+    assertTrue("key in peeked list should be 7 ", iter1.next().getSeqNum() == 7);
+    assertTrue(" Peeked list should not have any more elements. ", iter1.hasNext() == false);
+    
+    
+    java.util.Iterator<KeyToSeqNumObject> iter2 = getSortedEventQueue(hdfsBQ).currentSkipList.iterator();
+    assertTrue("key in peeked list should be 11", iter2.next().getSeqNum() == 11);
+    assertTrue("key in peeked list should be 13", iter2.next().getSeqNum() == 13);
+    assertTrue("key in peeked list should be 12 ", iter2.next().getSeqNum() == 12);
+    
+    iter2 = getSortedEventQueue(hdfsBQ).currentSkipList.iterator();
+    HashSet<Long> hs = new HashSet<Long>();
+    hs.add((long) 11);
+    hs.add((long) 13);
+    hs.add((long) 12);
+    hs.add((long) 3);
+    hs.add((long) 6);
+    hs.add((long) 2);
+    hs.add((long) 9);
+    hs.add((long) 8);
+    hs.add((long) 7);
+    
+    hdfsBQ.hdfsEventQueue.handleRemainingElements(hs);
+    
+    ArrayList a1 = hdfsBQ.peekABatch();
+    o1 = hopqueue.get(r1, CacheServerHelper.serialize("K3"), 0);
+    o2 = hopqueue.get(r1, CacheServerHelper.serialize("K1"), 0);
+    v2 = ((HDFSGatewayEventImpl)o2).getDeserializedValue();
+    assertTrue(" key should K3 should not have been found ",  o1 ==null);
+    assertTrue(" key should K1 with value V12 but the value was " + v2 , ((String)v2).equals("V12"));
+    
+    assertTrue("First key should be K1 but is " + ((HDFSGatewayEventImpl)a1.get(0)).getKey(), ((HDFSGatewayEventImpl)a1.get(0)).getKey().equals("K1"));
+    assertTrue("Second key should be K5 but is " + ((HDFSGatewayEventImpl)a1.get(1)).getKey(), ((HDFSGatewayEventImpl)a1.get(1)).getKey().equals("K5"));
+    assertTrue("Third key should be K5 but is " + ((HDFSGatewayEventImpl)a1.get(2)).getKey(), ((HDFSGatewayEventImpl)a1.get(2)).getKey().equals("K5"));
+    
+    assertTrue(" Peeked skip list size should be  3 ", getSortedEventQueue(hdfsBQ).getPeeked().size() == 3);
+    assertTrue(" skip list size should be  0 but is " + getSortedEventQueue(hdfsBQ).currentSkipList.size(), getSortedEventQueue(hdfsBQ).currentSkipList.size() == 0);
+    assertTrue(" skip list size should be  3 but is " + getSortedEventQueue(hdfsBQ).queueOfLists.peek().size(), getSortedEventQueue(hdfsBQ).queueOfLists.peek().size() == 3);
+    assertTrue(" skip list size should be  2 but is " + getSortedEventQueue(hdfsBQ).queueOfLists.size(), getSortedEventQueue(hdfsBQ).queueOfLists.size() == 2);
+    
+  }
+  
+  private HDFSGatewayEventImpl getNewEvent(Object key, Object value, Region r1, int bid, int tailKey) throws Exception {
+    EntryEventImpl ev1 = EntryEventImpl.create((LocalRegion)r1, Operation.CREATE,
+        key, value, null,
+        false, (DistributedMember)c.getMyId());
+    ev1.setEventId(new EventID(this.c.getDistributedSystem()));
+    HDFSGatewayEventImpl event = null;
+    event = new HDFSGatewayEventImpl(EnumListenerEvent.AFTER_CREATE, ev1, null , true, bid);
+    event.setShadowKey((long)tailKey);
+    return event;
+  }
+  
+  /**
+   * Creates the HDFS Queue instance for a region (this skips the creation of 
+   * event processor)
+   */
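+  // Because no event processor is created, events put on this queue remain
+  // queued, which lets the tests inspect peek/get ordering directly.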
+  private HDFSParallelGatewaySenderQueue getHDFSQueue(Region region, Cache c) {
+    GatewaySenderAttributes gattrs = new GatewaySenderAttributes();
+    gattrs.isHDFSQueue = true;
+    gattrs.id = "SortedListForAsyncQueueJUnitTest_test";
+    ParallelAsyncEventQueueImpl gatewaySender = new ParallelAsyncEventQueueImpl(c, gattrs);
+    HashSet<Region> set = new HashSet<Region>();
+    set.add(region);
+    HDFSParallelGatewaySenderQueue queue = new HDFSParallelGatewaySenderQueue(gatewaySender, set, 0, 1);
+    queue.start();
+    return queue;
+  }
+  
+  // A test verifying that the KeyToSeqNumObject comparison orders elements as expected.
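+  // Based on the assertions below, entries sort by region key (ascending) and,
+  // for the same key, by sequence number (descending): (k1,8) before (k1,5)
+  // before (k1,2).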
+  public void testIfTheKeyToSeqNumIsKeptSortedWithoutConflation() throws Exception {
+    byte[] k1 = new byte[] { 1};
+    byte[] k2 = new byte[] { 2};
+    byte[] k3 = new byte[] { 3};
+    byte[] k4 = new byte[] { 4};
+    
+    KeyToSeqNumObject keyToSeq1 = new KeyToSeqNumObject(k1, new Long(2));
+    KeyToSeqNumObject keyToSeq2 = new KeyToSeqNumObject(k1, new Long(5));
+    KeyToSeqNumObject keyToSeq3 = new KeyToSeqNumObject(k1, new Long(8));
+    KeyToSeqNumObject keyToSeq4 = new KeyToSeqNumObject(k2, new Long(3));
+    KeyToSeqNumObject keyToSeq5 = new KeyToSeqNumObject(k2, new Long(7));
+    
+    ConcurrentSkipListSet<KeyToSeqNumObject> list = new ConcurrentSkipListSet<HDFSBucketRegionQueue.KeyToSeqNumObject>();
+    list.add(keyToSeq4);
+    list.add(keyToSeq3);
+    list.add(keyToSeq5);
+    list.add(keyToSeq1);
+    list.add(keyToSeq2);
+    list.add(keyToSeq5);
+    KeyToSeqNumObject k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq3));
+    list.remove(k);
+    
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq2));
+    list.remove(k);
+    
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq1));
+    list.remove(k);
+    
+    list.add(keyToSeq4);
+    list.add(keyToSeq3);
+    list.add(keyToSeq5);
+    list.add(keyToSeq1);
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq3));
+    list.remove(k);
+    
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq1));
+    list.remove(k);
+    
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq5));
+    list.remove(k);
+    
+    k = list.pollFirst();
+    this.c.getLoggerI18n().fine(" KeyToSeqNumObject  byte: " + k.getRegionkey()[0] + " seq num: " + k.getSeqNum());
+    assertTrue ("Order of elements in Concurrent list is not correct ", k.equals(keyToSeq4));
+    
+    list.remove(k);
+  }
+  
+  public void testSingleGet() throws Exception {
+    checkQueueGet("K1", new KeyValue("K1", "V1"), "K1-V1");
+  }
+  
+  public void testMissingGet() throws Exception {
+    checkQueueGet("K1", null, 
+        "K0-V0",
+        "K2-V2");
+  }
+
+  public void testMultipleGet() throws Exception {
+    checkQueueGet("K1", new KeyValue("K1", "V1"), 
+        "K0-V0",
+        "K1-V1",
+        "K2-V2");
+  }
+
+  public void testDuplicateGet() throws Exception {
+    checkQueueGet("K1", new KeyValue("K1", "V1.4"), 
+        "K0-V0",
+        "K1-V1.0",
+        "K1-V1.1",
+        "K1-V1.2",
+        "K1-V1.3",
+        "K1-V1.4",
+        "K2-V2");
+  }
+
+  public void testEmptyIterator() throws Exception {
+    checkQueueIteration(Collections.<KeyValue>emptyList());
+  }
+  
+  public void testSingleIterator() throws Exception {
+    checkQueueIteration(getExpected(), 
+        "K0-V0",
+        "K1-V1",
+        "K2-V2",
+        "K3-V3",
+        "K4-V4",
+        "K5-V5",
+        "K6-V6",
+        "K7-V7",
+        "K8-V8",
+        "K9-V9"
+        );
+  }
+
+  public void testMultipleIterator() throws Exception {
+    checkQueueIteration(getExpected(), 
+        "K0-V0",
+        "K1-V1",
+        "K2-V2",
+        "roll",
+        "K3-V3",
+        "K4-V4",
+        "K5-V5",
+        "K6-V6",
+        "roll",
+        "K7-V7",
+        "K8-V8",
+        "K9-V9"
+        );
+  }
+
+  public void testMixedUpIterator() throws Exception {
+    checkQueueIteration(getExpected(), 
+        "K0-V0",
+        "K5-V5",
+        "K9-V9",
+        "roll",
+        "K3-V3",
+        "K2-V2",
+        "K6-V6",
+        "roll",
+        "K4-V4",
+        "K7-V7",
+        "K8-V8",
+        "K1-V1"
+        );
+  }
+
+  public void testMixedUpIterator2() throws Exception {
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(new KeyValue("K0", "V0"));
+    expected.add(new KeyValue("K1", "V1.2"));
+    expected.add(new KeyValue("K2", "V2.1"));
+    expected.add(new KeyValue("K3", "V3.1"));
+    expected.add(new KeyValue("K4", "V4.2"));
+    expected.add(new KeyValue("K5", "V5.2"));
+    expected.add(new KeyValue("K6", "V6"));
+    expected.add(new KeyValue("K7", "V7"));
+    expected.add(new KeyValue("K8", "V8"));
+    expected.add(new KeyValue("K9", "V9"));
+    
+    checkQueueIteration(expected, 
+        "K1-V1.0",
+        "K2-V2.0",
+        "K3-V3.0",
+        "K4-V4.0",
+        "roll",
+        "K2-V2.1",
+        "K4-V4.1",
+        "K6-V6",
+        "K8-V8",
+        "roll",
+        "K1-V1.1",
+        "K3-V3.1",
+        "K5-V5.0",
+        "K7-V7",
+        "K9-V9",
+        "roll",
+        "K0-V0",
+        "K1-V1.2",
+        "K4-V4.2",
+        "K5-V5.1",
+        "K5-V5.2"
+        );
+  }
+
+  private List<KeyValue> getExpected() {
+    List<KeyValue> expected = new ArrayList<KeyValue>();
+    expected.add(new KeyValue("K0", "V0"));
+    expected.add(new KeyValue("K1", "V1"));
+    expected.add(new KeyValue("K2", "V2"));
+    expected.add(new KeyValue("K3", "V3"));
+    expected.add(new KeyValue("K4", "V4"));
+    expected.add(new KeyValue("K5", "V5"));
+    expected.add(new KeyValue("K6", "V6"));
+    expected.add(new KeyValue("K7", "V7"));
+    expected.add(new KeyValue("K8", "V8"));
+    expected.add(new KeyValue("K9", "V9"));
+    
+    return expected;
+  }
+  
+  private void checkQueueGet(String key, KeyValue expected, String... entries) throws Exception {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(1);
+    
+    RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION);
+    PartitionedRegion r1 = (PartitionedRegion) rf1.setPartitionAttributes(paf.create()).create("r1");
+
+    // create the buckets
+    r1.put("blah", "blah");
+
+    // hack to get the queue. 
+    HDFSParallelGatewaySenderQueue hopqueue = getHDFSQueue(r1, this.c);
+    HDFSBucketRegionQueue brq = (HDFSBucketRegionQueue)((PartitionedRegion)hopqueue.getRegion()).getDataStore().getLocalBucketById(0);
+
+    
+    int seq = 0;
+    for (String s : entries) {
+      if (s.equals("roll")) {
+        brq.rolloverSkipList();
+      } else {
+        String[] kv = s.split("-");
+        hopqueue.put(getNewEvent(kv[0], kv[1], r1, 0, seq++));
+      }
+    }
+
+    byte[] bkey = EntryEventImpl.serialize(key);
+    HDFSGatewayEventImpl evt = hopqueue.get(r1, bkey, 0);
+    if (expected == null) {
+      assertNull(evt);
+      
+    } else {
+      assertEquals(expected.key, evt.getKey());
+      assertEquals(expected.value, evt.getDeserializedValue());
+    }
+  }
+  
+  private void checkQueueIteration(List<KeyValue> expected, String... entries) throws Exception {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(1);
+    
+    RegionFactory rf1 = this.c.createRegionFactory(RegionShortcut.PARTITION);
+    Region r1 = rf1.setPartitionAttributes(paf.create()).create("r1");
+
+    // create the buckets
+    r1.put("blah", "blah");
+
+    HDFSParallelGatewaySenderQueue hopqueue = getHDFSQueue(r1, this.c);
+    HDFSBucketRegionQueue brq = (HDFSBucketRegionQueue)((PartitionedRegion)hopqueue.getRegion()).getDataStore().getLocalBucketById(0);
+    
+    int seq = 0;
+    for (String s : entries) {
+      if (s.equals("roll")) {
+        brq.rolloverSkipList();
+      } else {
+        String[] kv = s.split("-");
+        hopqueue.put(getNewEvent(kv[0], kv[1], r1, 0, seq++));
+        getSortedEventQueue(brq).rollover(true);
+      }
+    }
+    
+    Iterator<HDFSGatewayEventImpl> iter = brq.iterator(r1);
+    List<KeyValue> actual = new ArrayList<KeyValue>();
+    while (iter.hasNext()) {
+      HDFSGatewayEventImpl evt = iter.next();
+      actual.add(new KeyValue((String) evt.getKey(), (String) evt.getDeserializedValue()));
+    }
+    
+    assertEquals(expected, actual);
+  }
+  
+  public static class KeyValue {
+    public final String key;
+    public final String value;
+    
+    public KeyValue(String key, String value) {
+      this.key = key;
+      this.value = value;
+    }
+    
+    @Override
+    public boolean equals(Object o) {
+      if (o == null)
+        return false;
+
+      KeyValue obj = (KeyValue) o;
+      return key.equals(obj.key) && value.equals(obj.value);
+    }
+    
+    @Override
+    public String toString() {
+      return key + "=" + value;
+    }
+  }
+}
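
The test methods that drive checkQueueGet and checkQueueIteration sit earlier in this
file and are outside this hunk. As a minimal illustrative sketch only (the method names
and key/value data below are hypothetical; the helpers are private and each creates its
own "r1" region, so each sketch method calls its helper once and lives in the same class):

    public void testIterationAcrossRolloverSketch() throws Exception {
      List<KeyValue> expected = new ArrayList<KeyValue>();
      expected.add(new KeyValue("K0", "V0"));
      expected.add(new KeyValue("K1", "V1"));
      // the "roll" marker triggers a skip-list rollover between the two puts
      checkQueueIteration(expected, "K0-V0", "roll", "K1-V1");
    }

    public void testGetAfterRolloverSketch() throws Exception {
      // an entry queued before a rollover should still be retrievable afterwards
      checkQueueGet("K0", new KeyValue("K0", "V0"), "K0-V0", "roll", "K1-V1");
    }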

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
new file mode 100644
index 0000000..c8c15d5
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/BaseHoplogTestCase.java
@@ -0,0 +1,394 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.SerializedCacheValue;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.hdfs.HDFSStore;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreFactory;
+import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreFactoryImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl;
+import com.gemstone.gemfire.cache.hdfs.internal.SortedHDFSQueuePersistedEvent;
+import com.gemstone.gemfire.cache.hdfs.internal.HDFSStoreImpl.FileSystemFactory;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.Hoplog.HoplogWriter;
+import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics;
+import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics;
+import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+public abstract class BaseHoplogTestCase extends TestCase {
+  public static final String HDFS_STORE_NAME = "hdfs";
+  public static final Random rand = new Random(System.currentTimeMillis());
+  protected Path testDataDir;
+  protected Cache cache;
+  
+  protected HDFSRegionDirector director; 
+  protected HdfsRegionManager regionManager;
+  protected HDFSStoreFactory hsf;
+  protected HDFSStoreImpl hdfsStore;
+  protected RegionFactory<Object, Object> regionfactory;
+  protected Region<Object, Object> region;
+  protected SortedOplogStatistics stats;
+  protected HFileStoreStatistics storeStats;
+  protected BlockCache blockCache;
+  
+  Set<IgnoredException> exceptions = new HashSet<IgnoredException>();
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    System.setProperty(HoplogConfig.ALLOW_LOCAL_HDFS_PROP, "true");
+    
+    //This is logged by HDFS when it is stopped.
+    exceptions.add(IgnoredException.addIgnoredException("sleep interrupted"));
+    exceptions.add(IgnoredException.addIgnoredException("java.io.InterruptedIOException"));
+    
+    testDataDir = new Path("test-case");
+
+    cache = createCache();
+    
+    configureHdfsStoreFactory();
+    hdfsStore = (HDFSStoreImpl) hsf.create(HDFS_STORE_NAME);
+
+    regionfactory = cache.createRegionFactory(RegionShortcut.PARTITION_HDFS);
+    regionfactory.setHDFSStoreName(HDFS_STORE_NAME);
+    region = regionfactory.create(getName());
+    
+    // disable compaction by default and clear existing queues
+    HDFSCompactionManager compactionManager = HDFSCompactionManager.getInstance(hdfsStore);
+    compactionManager.reset();
+    
+    director = HDFSRegionDirector.getInstance();
+    director.setCache(cache);
+    regionManager = ((LocalRegion)region).getHdfsRegionManager();
+    stats = director.getHdfsRegionStats("/" + getName());
+    storeStats = hdfsStore.getStats();
+    blockCache = hdfsStore.getBlockCache();
+    AbstractHoplogOrganizer.JUNIT_TEST_RUN = true;
+  }
+
+  protected void configureHdfsStoreFactory() throws Exception {
+    hsf = this.cache.createHDFSStoreFactory();
+    hsf.setHomeDir(testDataDir.toString());
+    hsf.setMinorCompaction(false);
+    hsf.setMajorCompaction(false);
+  }
+
+  protected Cache createCache() {
+    CacheFactory cf = new CacheFactory()
+        .set("mcast-port", "0")
+        .set("log-level", "info");
+    cache = cf.create();
+    return cache;
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (region != null) {
+      region.destroyRegion();
+    }
+    
+    if (hdfsStore != null) {
+      hdfsStore.getFileSystem().delete(testDataDir, true);
+      hdfsStore.destroy();
+    }
+    
+    if (cache != null) {
+      cache.close();
+    }
+    super.tearDown();
+    for (IgnoredException ex: exceptions) {
+      ex.remove();
+    }
+  }
+
+  /**
+   * Creates a hoplog file with numKeys records. Keys follow the "key-X" pattern for numKeys
+   * consecutive values of X, and values follow the "value-X" pattern.
+   * 
+   * @return the sorted map of inserted KVs
+   */
+  protected TreeMap<String, String> createHoplog(int numKeys, Hoplog oplog) throws IOException {
+    int offset = (numKeys > 10 ? 100000 : 0);
+    
+    HoplogWriter writer = oplog.createWriter(numKeys);
+    TreeMap<String, String> map = new TreeMap<String, String>();
+    for (int i = offset; i < (numKeys + offset); i++) {
+      String key = ("key-" + i);
+      String value = ("value-" + System.nanoTime());
+      writer.append(key.getBytes(), value.getBytes());
+      map.put(key, value);
+    }
+    writer.close();
+    return map;
+  }
+  
+  protected FileStatus[] getBucketHoplogs(String regionAndBucket, final String type)
+      throws IOException {
+    return getBucketHoplogs(hdfsStore.getFileSystem(), regionAndBucket, type);
+  }
+  
+  protected FileStatus[] getBucketHoplogs(FileSystem fs, String regionAndBucket, final String type)
+      throws IOException {
+    FileStatus[] hoplogs = fs.listStatus(
+        new Path(testDataDir, regionAndBucket), new PathFilter() {
+          @Override
+          public boolean accept(Path file) {
+            return file.getName().endsWith(type);
+          }
+        });
+    return hoplogs;
+  }
+
+  protected String getRandomHoplogName() {
+    String hoplogName = "hoplog-" + System.nanoTime() + "-" + rand.nextInt(10000) + ".hop";
+    return hoplogName;
+  }
+  
+//  public static MiniDFSCluster initMiniCluster(int port, int numDN) throws Exception {
+//    HashMap<String, String> map = new HashMap<String, String>();
+//    map.put(DFSConfigKeys.DFS_REPLICATION_KEY, "1");
+//    return initMiniCluster(port, numDN, map);
+//  }
+//
+//  public static MiniDFSCluster initMiniCluster(int port, int numDN, HashMap<String, String> map) throws Exception {
+//    System.setProperty("test.build.data", "hdfs-test-cluster");
+//    Configuration hconf = new HdfsConfiguration();
+//    for (Entry<String, String> entry : map.entrySet()) {
+//      hconf.set(entry.getKey(), entry.getValue());
+//    }
+//
+//    hconf.set("dfs.namenode.fs-limits.min-block-size", "1024");
+//    
+//    Builder builder = new MiniDFSCluster.Builder(hconf);
+//    builder.numDataNodes(numDN);
+//    builder.nameNodePort(port);
+//    MiniDFSCluster cluster = builder.build();
+//    return cluster;
+//  }
+
+  public static void setConfigFile(HDFSStoreFactory factory, File configFile, String config)
+      throws Exception {
+    BufferedWriter bw = new BufferedWriter(new FileWriter(configFile));
+    bw.write(config);
+    bw.close();
+    factory.setHDFSClientConfigFile(configFile.getName());
+  }
+  
+  public static void alterMajorCompaction(HDFSStoreImpl store, boolean enable) {
+    HDFSStoreMutator mutator = store.createHdfsStoreMutator();
+    mutator.setMajorCompaction(enable);
+    store.alter(mutator);
+  }
+  
+  public static void alterMinorCompaction(HDFSStoreImpl store, boolean enable) {
+    HDFSStoreMutator mutator = store.createHdfsStoreMutator();
+    mutator.setMinorCompaction(enable);
+    store.alter(mutator);
+  }
+  
+  public void deleteMiniClusterDir() throws Exception {
+    File clusterDir = new File("hdfs-test-cluster");
+    if (clusterDir.exists()) {
+      FileUtils.deleteDirectory(clusterDir);
+    }
+  }
+  
+  public static class TestEvent extends SortedHDFSQueuePersistedEvent {
+    Object key;
+    
+    public TestEvent(String k, String v) throws Exception {
+      this(k, v, Operation.PUT_IF_ABSENT);
+    }
+
+    public TestEvent(String k, String v, Operation op) throws Exception {
+      super(v, op, (byte) 0x02, false, new DiskVersionTag(), BlobHelper.serializeToBlob(k), 0);
+      this.key = k; 
+    }
+
+    public Object getKey() {
+      return key;
+      
+    }
+
+    public Object getNewValue() {
+      return valueObject;
+    }
+
+    public Operation getOperation() {
+      return op;
+    }
+    
+    public Region<Object, Object> getRegion() {
+      return null;
+    }
+
+    public Object getCallbackArgument() {
+      return null;
+    }
+
+    public boolean isCallbackArgumentAvailable() {
+      return false;
+    }
+
+    public boolean isOriginRemote() {
+      return false;
+    }
+
+    public DistributedMember getDistributedMember() {
+      return null;
+    }
+
+    public boolean isExpiration() {
+      return false;
+    }
+
+    public boolean isDistributed() {
+      return false;
+    }
+
+    public Object getOldValue() {
+      return null;
+    }
+
+    public SerializedCacheValue<Object> getSerializedOldValue() {
+      return null;
+    }
+
+    public SerializedCacheValue<Object> getSerializedNewValue() {
+      return null;
+    }
+
+    public boolean isLocalLoad() {
+      return false;
+    }
+
+    public boolean isNetLoad() {
+      return false;
+    }
+
+    public boolean isLoad() {
+      return false;
+    }
+
+    public boolean isNetSearch() {
+      return false;
+    }
+
+    public TransactionId getTransactionId() {
+      return null;
+    }
+
+    public boolean isBridgeEvent() {
+      return false;
+    }
+
+    public boolean hasClientOrigin() {
+      return false;
+    }
+
+    public boolean isOldValueAvailable() {
+      return false;
+    }
+  }
+  
+  public abstract class AbstractCompactor implements Compactor {
+    @Override
+    public HDFSStore getHdfsStore() {
+      return hdfsStore;
+    }
+
+    public void suspend() {
+    }
+
+    public void resume() {
+    }
+
+    public boolean isBusy(boolean isMajor) {
+      return false;
+    }
+  }
+  
+  public HDFSStoreFactoryImpl getCloseableLocalHdfsStoreFactory() {
+    final FileSystemFactory fsFactory = new FileSystemFactory() {
+      // By default a local FS instance is not disabled by close(), hence this
+      // customization.
+      class CustomFileSystem extends LocalFileSystem {
+        boolean isClosed = false;
+
+        public void close() throws IOException {
+          isClosed = true;
+          super.close();
+        }
+
+        public FileStatus getFileStatus(Path f) throws IOException {
+          if (isClosed) {
+            throw new IOException();
+          }
+          return super.getFileStatus(f);
+        }
+      }
+
+      public FileSystem create(URI namenode, Configuration conf, boolean forceNew) throws IOException {
+        CustomFileSystem fs = new CustomFileSystem();
+        fs.initialize(namenode, conf);
+        return fs;
+      }
+    };
+
+    HDFSStoreFactoryImpl storeFactory = new HDFSStoreFactoryImpl(cache) {
+      public HDFSStore create(String name) {
+        return new HDFSStoreImpl(name, this.configHolder) {
+          public FileSystemFactory getFileSystemFactory() {
+            return fsFactory;
+          }
+        };
+      }
+    };
+    return storeFactory;
+  }
+}
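
Concrete hoplog tests extend this base class and rely on the fixtures it wires up
(cache, hdfsStore, regionManager, stats). A rough sketch of the usual pattern, modeled
on the tests that follow in this commit (the class and test names are hypothetical, and
the ".hop" flush extension is inferred from getRandomHoplogName above):

    public class ExampleHoplogJUnitTest extends BaseHoplogTestCase {
      public void testFlushCreatesHoplog() throws Exception {
        HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);

        ArrayList<TestEvent> items = new ArrayList<TestEvent>();
        items.add(new TestEvent("key-0", "value-0"));
        items.add(new TestEvent("key-1", "value-1"));
        organizer.flush(items.iterator(), items.size());

        // the flushed bucket should now have at least one hoplog file on the file system
        FileStatus[] hoplogs = getBucketHoplogs(getName() + "/0", ".hop");
        assertTrue(hoplogs.length >= 1);
      }
    }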

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
new file mode 100644
index 0000000..db050b3
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/CardinalityEstimatorJUnitTest.java
@@ -0,0 +1,188 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+
+@Category({IntegrationTest.class, HoplogTest.class})
+public class CardinalityEstimatorJUnitTest extends BaseHoplogTestCase {
+
+  public void testSingleHoplogCardinality() throws Exception {
+    int count = 10;
+    int bucketId = (int) System.nanoTime();
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    // flush and create hoplog
+    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
+    for (int i = 0; i < count; i++) {
+      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
+    }
+    // assert that size is 0 before flush begins
+    assertEquals(0, organizer.sizeEstimate());
+    organizer.flush(items.iterator(), count);
+
+    assertEquals(count, organizer.sizeEstimate());
+    assertEquals(0, stats.getActiveReaderCount());
+    
+    organizer.close();
+    organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+    assertEquals(count, organizer.sizeEstimate());
+    assertEquals(1, stats.getActiveReaderCount());
+  }
+
+  public void testSingleHoplogCardinalityWithDuplicates() throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    List<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-1", "value-1"));
+    items.add(new TestEvent("key-2", "value-2"));
+    items.add(new TestEvent("key-3", "value-3"));
+    items.add(new TestEvent("key-3", "value-3"));
+    items.add(new TestEvent("key-4", "value-4"));
+
+    organizer.flush(items.iterator(), 7);
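+    // 7 events were flushed, but key-0 and key-3 are duplicated, so only 5 distinct keys remain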
+    assertEquals(5, organizer.sizeEstimate());
+  }
+
+  public void testMultipleHoplogCardinality() throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    List<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-1", "value-1"));
+    items.add(new TestEvent("key-2", "value-2"));
+    items.add(new TestEvent("key-3", "value-3"));
+    items.add(new TestEvent("key-4", "value-4"));
+
+    organizer.flush(items.iterator(), 5);
+    assertEquals(5, organizer.sizeEstimate());
+
+    items.clear();
+    items.add(new TestEvent("key-1", "value-0"));
+    items.add(new TestEvent("key-5", "value-5"));
+    items.add(new TestEvent("key-6", "value-6"));
+    items.add(new TestEvent("key-7", "value-7"));
+    items.add(new TestEvent("key-8", "value-8"));
+    items.add(new TestEvent("key-9", "value-9"));
+
+    organizer.flush(items.iterator(), 6);
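+    // key-1 overlaps with the first flush, so the 11 flushed events cover 10 distinct keys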
+    assertEquals(10, organizer.sizeEstimate());
+  }
+
+  public void testCardinalityAfterRestart() throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    List<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-1", "value-1"));
+    items.add(new TestEvent("key-2", "value-2"));
+    items.add(new TestEvent("key-3", "value-3"));
+    items.add(new TestEvent("key-4", "value-4"));
+
+    assertEquals(0, organizer.sizeEstimate());
+    organizer.flush(items.iterator(), 5);
+    assertEquals(5, organizer.sizeEstimate());
+
+    // restart
+    organizer.close();
+    organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+    assertEquals(5, organizer.sizeEstimate());
+    
+    items.clear();
+    items.add(new TestEvent("key-1", "value-0"));
+    items.add(new TestEvent("key-5", "value-5"));
+    items.add(new TestEvent("key-6", "value-6"));
+    items.add(new TestEvent("key-7", "value-7"));
+    items.add(new TestEvent("key-8", "value-8"));
+    items.add(new TestEvent("key-9", "value-9"));
+
+    organizer.flush(items.iterator(), 6);
+    assertEquals(10, organizer.sizeEstimate());
+
+    // restart - make sure that HLL from the youngest file is read
+    organizer.close();
+    organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+    assertEquals(10, organizer.sizeEstimate());
+    
+    items.clear();
+    items.add(new TestEvent("key-1", "value-1"));
+    items.add(new TestEvent("key-5", "value-5"));
+    items.add(new TestEvent("key-10", "value-10"));
+    items.add(new TestEvent("key-11", "value-11"));
+    items.add(new TestEvent("key-12", "value-12"));
+    items.add(new TestEvent("key-13", "value-13"));
+    items.add(new TestEvent("key-14", "value-14"));
+
+    organizer.flush(items.iterator(), 7);
+    assertEquals(15, organizer.sizeEstimate());
+  }
+
+  public void testCardinalityAfterMajorCompaction() throws Exception {
+    doCardinalityAfterCompactionWork(true);
+  }
+
+  public void testCardinalityAfterMinorCompaction() throws Exception {
+    doCardinalityAfterCompactionWork(false);
+  }
+
+  private void doCardinalityAfterCompactionWork(boolean isMajor) throws Exception {
+    int bucketId = (int) System.nanoTime();
+    HoplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, bucketId);
+
+    List<TestEvent> items = new ArrayList<TestEvent>();
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-1", "value-1"));
+    items.add(new TestEvent("key-2", "value-2"));
+    items.add(new TestEvent("key-3", "value-3"));
+    items.add(new TestEvent("key-4", "value-4"));
+
+    organizer.flush(items.iterator(), 5);
+    assertEquals(5, organizer.sizeEstimate());
+
+    items.clear();
+    items.add(new TestEvent("key-0", "value-0"));
+    items.add(new TestEvent("key-1", "value-5", Operation.DESTROY));
+    items.add(new TestEvent("key-2", "value-6", Operation.INVALIDATE));
+    items.add(new TestEvent("key-5", "value-5"));
+
+    organizer.flush(items.iterator(), 4);
+    assertEquals(6, organizer.sizeEstimate());
+
+    items.clear();
+    items.add(new TestEvent("key-3", "value-5", Operation.DESTROY));
+    items.add(new TestEvent("key-4", "value-6", Operation.INVALIDATE));
+    items.add(new TestEvent("key-5", "value-0"));
+    items.add(new TestEvent("key-6", "value-5"));
+
+    organizer.flush(items.iterator(), 4);
+    
+    items.add(new TestEvent("key-5", "value-0"));
+    items.add(new TestEvent("key-6", "value-5"));
+    
+    items.clear();
+    organizer.flush(items.iterator(), items.size());
+    assertEquals(7, organizer.sizeEstimate());
+
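+    // After compaction only live keys should remain: key-1 through key-4 were destroyed or
+    // invalidated above, leaving key-0, key-5 and key-6, hence an estimate of 3.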
+    organizer.getCompactor().compact(isMajor, false);
+    assertEquals(3, organizer.sizeEstimate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9f3f10fd/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
new file mode 100644
index 0000000..67dcddf
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCacheLoaderJUnitTest.java
@@ -0,0 +1,106 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
+
+import java.util.List;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesMutator;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.CacheLoaderException;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.test.junit.categories.HoplogTest;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+/**
+ * Tests that entries loaded from a cache loader are inserted in the HDFS queue 
+ * 
+ * @author hemantb
+ */
+@Category({IntegrationTest.class, HoplogTest.class})
+public class HDFSCacheLoaderJUnitTest extends BaseHoplogTestCase {
+
+  private static int totalEventsReceived = 0;
+  protected void configureHdfsStoreFactory() throws Exception {
+    hsf = this.cache.createHDFSStoreFactory();
+    hsf.setHomeDir(testDataDir.toString());
+    hsf.setBatchInterval(100000000);
+    hsf.setBatchSize(10000);
+  }
+
+  /**
+   * Tests that entries loaded from a cache loader are inserted in the HDFS queue 
+   * but are not inserted in async queues. 
+   * @throws Exception
+   */
+  public void testCacheLoaderForAsyncQAndHDFS() throws Exception {
+    
+    final AsyncEventQueueStats hdfsQueuestatistics = ((AsyncEventQueueImpl)cache.
+        getAsyncEventQueues().toArray()[0]).getStatistics();
+    
+    AttributesMutator am = this.region.getAttributesMutator();
+    am.setCacheLoader(new CacheLoader() {
+      private int i = 0;
+      public Object load(LoaderHelper helper)
+      throws CacheLoaderException {
+        return new Integer(i++);
+      }
+      
+      public void close() { }
+    });
+    
+    String asyncQueueName = "myQueue";
+    new AsyncEventQueueFactoryImpl(cache).setBatchTimeInterval(1).
+    create(asyncQueueName, new AsyncEventListener() {
+      
+      @Override
+      public void close() {
+      }
+
+      @Override
+      public boolean processEvents(List events) {
+        totalEventsReceived += events.size();
+        return true;
+      }
+    });
+    am.addAsyncEventQueueId(asyncQueueName);
+    
+    region.put(1, new Integer(100));
+    region.destroy(1);
+    region.get(1);
+    region.destroy(1);
+    
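+    // Each of the four operations above (put, destroy, get via the loader, destroy) is
+    // expected to enqueue exactly one event on the HDFS queue.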
+    assertTrue("HDFS queue should have received four events. But it received " + 
+        hdfsQueuestatistics.getEventQueueSize(), 4 == hdfsQueuestatistics.getEventQueueSize());
+    assertTrue("HDFS queue should have received four events. But it received " + 
+        hdfsQueuestatistics.getEventsReceived(), 4 == hdfsQueuestatistics.getEventsReceived());
+    
+    region.get(1);
+    Thread.sleep(2000);
+    
+    assertTrue("Async queue should have received only 5 events. But it received " + 
+        totalEventsReceived, totalEventsReceived == 5);
+    assertTrue("HDFS queue should have received 5 events. But it received " + 
+        hdfsQueuestatistics.getEventQueueSize(), 5 == hdfsQueuestatistics.getEventQueueSize());
+    assertTrue("HDFS queue should have received 5 events. But it received " + 
+        hdfsQueuestatistics.getEventsReceived(), 5 == hdfsQueuestatistics.getEventsReceived());
+    
+    
+  }
+  
+}

