geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [22/50] [abbrv] incubator-geode git commit: GEODE-429: Remove HdfsStore Junit and Dunits
Date Mon, 26 Oct 2015 18:28:45 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
deleted file mode 100644
index 011d82b..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSCompactionManagerJUnitTest.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSCompactionManager.CompactionRequest;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogOrganizer.Compactor;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest
-;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSCompactionManagerJUnitTest extends BaseHoplogTestCase {
-  /**
-   * Tests queueing of major and minor compaction requests in respective queues
-   */
-  public void testMinMajCompactionIsolation() throws Exception {
-    // no-op compactor
-    Compactor compactor = new AbstractCompactor() {
-      Object minor = new Object();
-      Object major = new Object();
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        try {
-          if (isMajor) {
-            synchronized (major) {
-              major.wait();
-            }
-          } else {
-            synchronized (minor) {
-              minor.wait();
-            }
-          }
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-        return true;
-      }
-    };
-
-    // compaction is disabled. all requests will wait in queue
-    HDFSCompactionManager instance = HDFSCompactionManager.getInstance(hdfsStore);
-    alterMinorCompaction(hdfsStore, true);
-    alterMajorCompaction(hdfsStore, true);
-    
-    assertEquals(0, instance.getMinorCompactor().getActiveCount());
-    assertEquals(0, instance.getMajorCompactor().getActiveCount());
-    
-    //minor request
-    CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    //major request
-    cr = new CompactionRequest("region", 0, compactor, true);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    
-    //wait for requests to get in queue
-    TimeUnit.MILLISECONDS.sleep(50);
-    assertEquals(1, instance.getMinorCompactor().getActiveCount());
-    assertEquals(1, instance.getMajorCompactor().getActiveCount());
-  }
-
-  /**
-   * Tests compaction pause. Once compaction is stopped, requests will 
-   * start getting rejected
-   */
-  public void testAlterAutoMinorCompaction() throws Exception {
-    // each new compaction execution increments counter by 1. this way track how many pending
tasks
-    final AtomicInteger totalExecuted = new AtomicInteger(0);
-    Compactor compactor = new AbstractCompactor() {
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        totalExecuted.incrementAndGet();
-        return true;
-      }
-    };
-
-    // compaction is enabled. submit requests and after some time counter should be 0
-    alterMinorCompaction(hdfsStore, true);
-    CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    cr = new CompactionRequest("region", 1, compactor, false);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-
-    int totalWait = 20;
-    while (totalWait > 0 && 2 != totalExecuted.get()) {
-      // wait for operations to complete. The execution will terminate as soon as possible
-      System.out.println("waiting one small cycle for dummy request to complete");
-      TimeUnit.MILLISECONDS.sleep(50);
-      totalWait--;
-    }
-    assertEquals(2, totalExecuted.get());
-
-    // so compaction works. now disable comapction and submit large number of requests till
rejected
-    // execution counter should not increase
-    alterMinorCompaction(hdfsStore, false);
-    boolean success = false;
-    int i = 0;
-    do {
-      cr = new CompactionRequest("region", ++i, compactor, false);
-      success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
-    } while (success);
-
-    TimeUnit.MILLISECONDS.sleep(500);
-    assertEquals(2, totalExecuted.get());
-  }
-  public void testAlterAutoMajorCompaction() throws Exception {
-    // each new compaction execution increments counter by 1. this way track how many pending
tasks
-    final AtomicInteger totalExecuted = new AtomicInteger(0);
-    Compactor compactor = new AbstractCompactor() {
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        totalExecuted.incrementAndGet();
-        return true;
-      }
-    };
-    
-    // compaction is enabled. submit requests and after some time counter should be 0
-    alterMajorCompaction(hdfsStore, true);
-    CompactionRequest cr = new CompactionRequest("region", 0, compactor, true);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    cr = new CompactionRequest("region", 1, compactor, true);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    
-    int totalWait = 20;
-    while (totalWait > 0 && 2 != totalExecuted.get()) {
-      // wait for operations to complete. The execution will terminate as soon as possible
-      System.out.println("waiting one small cycle for dummy request to complete");
-      TimeUnit.MILLISECONDS.sleep(50);
-      totalWait--;
-    }
-    assertEquals(2, totalExecuted.get());
-    
-    // so compaction works. now disable comapction and submit large number of requests till
rejected
-    // execution counter should not increase
-    alterMajorCompaction(hdfsStore, false);
-    boolean success = false;
-    int i = 0;
-    do {
-      cr = new CompactionRequest("region", ++i, compactor, true);
-      success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
-      System.out.println("success: " + success);
-    } while (success);
-    
-    TimeUnit.MILLISECONDS.sleep(500);
-    assertEquals(2, totalExecuted.get());
-  }
-  
-  /**
-   * Tests duplicate compaction requests do not cause rejection
-   */
-   public void testDuplicateRequests() throws Exception {
-    final AtomicBoolean barrierOpen = new AtomicBoolean(false);
-    class TestCompactor extends AbstractCompactor {
-      AtomicBoolean busy = new AtomicBoolean(false);
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        synchronized (barrierOpen) {
-          busy.set(true);
-          if (barrierOpen.get()) {
-            return false;
-          }
-          try {
-            barrierOpen.wait();
-          } catch (InterruptedException e) {
-            return false;
-          }
-          busy.set(false);
-        }
-        return true;
-      }
-      public boolean isBusy(boolean isMajor) {return busy.get();}
-    };
-    
-    System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "10");
-
-    alterMinorCompaction(hdfsStore, true);
-    alterMajorCompaction(hdfsStore, true);
-    // capacity is 10, thread num is 2, so only the first 12 request will be
-    // submitted
-    for (int i = 0; i < 15; i++) {
-      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), true);
-      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) !=
null;
-      if (success) {
-        assertTrue("failed for " + i, i < 12);
-      } else {
-        assertTrue("failed for " + i, i >= 12);
-      }
-    }
-    
-    synchronized (barrierOpen) {
-      barrierOpen.set(true);
-      barrierOpen.notifyAll();
-    }
-    TimeUnit.MILLISECONDS.sleep(100);
-    barrierOpen.set(false);
-    
-    HDFSCompactionManager.getInstance(hdfsStore).reset();
-    TestCompactor compactor = new TestCompactor();
-    for (int i = 0; i < 10; i++) {
-      TimeUnit.MILLISECONDS.sleep(20);
-      CompactionRequest cr = new CompactionRequest("region", 0, compactor, true);
-      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) !=
null;
-      if (success) {
-        assertTrue("failed for " + i, i < 2);
-      } else {
-        assertTrue("failed for " + i, i > 0);
-      }
-    }
-  }
-
-  public void testForceCompactionWithAutoDisabled() throws Exception {
-    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager,
0);
-
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    items.add(new TestEvent(("1"), ("1-1")));
-    organizer.flush(items.iterator(), items.size());
-
-    items.clear();
-    items.add(new TestEvent(("2"), ("2-1")));
-    organizer.flush(items.iterator(), items.size());
-    
-    FileStatus[] files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(0, files.length);
-    
-    CompactionRequest cr = new CompactionRequest(getName(), 0, organizer.getCompactor(),
true);
-    HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    TimeUnit.MILLISECONDS.sleep(500);
-
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(0, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    assertEquals(0, files.length);
-    
-    organizer.forceCompaction(true);
-    TimeUnit.MILLISECONDS.sleep(500);
-    
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(1, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-  }
-
-  /**
-   * Test force major compaction completes on version upgrade even when there is only one
hoplog
-   */
-  public void testForceCompaction() throws Exception {
-    HoplogOrganizer<? extends PersistedEventImpl> organizer = new HdfsSortedOplogOrganizer(regionManager,
0);
-
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    items.add(new TestEvent(("1"), ("1-1")));
-    organizer.flush(items.iterator(), items.size());
-
-    items.clear();
-    items.add(new TestEvent(("2"), ("2-1")));
-    organizer.flush(items.iterator(), items.size());
-    
-    FileStatus[] files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(0, files.length);
-    
-    // isForced is true for user submitted compaction requests (through system procedure)
-    // we do not want to compact an already compacted file
-    CompactionRequest cr = new CompactionRequest(getName(), 0, organizer.getCompactor(),
true, true/*isForced*/);
-    Future<CompactionStatus> status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    status.get().equals(true);
-
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(1, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-
-    // second request to force compact does not do anything
-    status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    status.get().equals(false);
-    
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(1, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-
-    // upon version upgrade force compaction is allowed
-    cr = new CompactionRequest(getName(), 0, organizer.getCompactor(), true, true, true);
-    status = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr);
-    status.get().equals(true);
-    
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION);
-    assertEquals(2, files.length);
-    files = getBucketHoplogs(getName() + "/0", AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    assertEquals(3, files.length); // + 1 for old major hoplog
-  }
-
-  /**
-   * Test successful sequential submission
-   */
-  public void testSameBucketSeqRequest() throws Exception {
-    final AtomicInteger counter = new AtomicInteger(0);
-    Compactor compactor = new AbstractCompactor() {
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        counter.set(1);
-        return true;
-      }
-    };
-
-    HDFSCompactionManager.getInstance(hdfsStore).reset();
-    alterMinorCompaction(hdfsStore, true);
-    alterMajorCompaction(hdfsStore, true);
-    CompactionRequest cr = new CompactionRequest("region", 0, compactor, false);
-    assertEquals(0, counter.get());
-    boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
-    assertEquals(true, success);
-    while (!counter.compareAndSet(1, 0)) {
-      TimeUnit.MILLISECONDS.sleep(20);
-    }
-    
-    assertEquals(0, counter.get());
-    success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) != null;
-    assertEquals(true, success);
-    for (int i = 0; i < 10; i++) {
-      TimeUnit.MILLISECONDS.sleep(20);
-      if (counter.get() == 1) {
-        break;
-      }
-    }
-    assertEquals(1, counter.get());
-  }
-  
-  public void testAlterMinorThreadsIncrease() throws Exception {
-    doAlterCompactionThreads(false, false);
-  }
-  public void testAlterMinorThreadsDecrease() throws Exception {
-    doAlterCompactionThreads(false, true);
-  }
-  public void testAlterMajorThreadsIncrease() throws Exception {
-    doAlterCompactionThreads(true, false);
-  }
-  public void testAlterMajorThreadsDecrease() throws Exception {
-    doAlterCompactionThreads(true, true);
-  }
-  
-  public void doAlterCompactionThreads(final boolean testMajor, boolean decrease) throws
Exception {
-    final AtomicBoolean barrierOpen = new AtomicBoolean(false);
-    final AtomicInteger counter = new AtomicInteger(0);
-    class TestCompactor extends AbstractCompactor {
-      public boolean compact(boolean isMajor, boolean isForced) throws IOException {
-        synchronized (barrierOpen) {
-          if ((testMajor && !isMajor)  || (!testMajor && isMajor)) {
-            return true;
-          }
-          if (barrierOpen.get()) {
-            return false;
-          }
-          try {
-            barrierOpen.wait();
-          } catch (InterruptedException e) {
-            return false;
-          }
-          counter.incrementAndGet();
-        }
-        return true;
-      }
-    };
-    
-    System.setProperty(HoplogConfig.COMPCATION_QUEUE_CAPACITY, "1");
-
-    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
-    int defaultThreadCount = 10;
-    if (testMajor) {
-      alterMajorCompaction(hdfsStore, true);
-      defaultThreadCount = 2;
-      mutator.setMajorCompactionThreads(15);
-      if (decrease) {
-        mutator.setMajorCompactionThreads(1);
-      }
-    } else {
-      alterMinorCompaction(hdfsStore, true);
-      mutator.setMinorCompactionThreads(15);
-      if (decrease) {
-        mutator.setMinorCompactionThreads(1);
-      }
-    }
-    
-    // capacity is 1, thread num is 10 or 2, so only the first 11 or 3 request will be
-    // submitted
-    cache.getLogger().info("<ExpectedException action=add>java.util.concurrent.RejectedExecutionException</ExpectedException>");
-    for (int i = 0; i < 15; i++) {
-      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
-      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) !=
null;
-      if (success) {
-        assertTrue("failed for " + i, i <= defaultThreadCount);
-      } else {
-        assertTrue("failed for " + i, i > defaultThreadCount);
-      }
-    }
-    
-    TimeUnit.MILLISECONDS.sleep(500);
-    assertEquals(0, counter.get());
-    synchronized (barrierOpen) {
-      barrierOpen.set(true);
-      barrierOpen.notifyAll();
-    }
-    TimeUnit.MILLISECONDS.sleep(500);
-    assertEquals(defaultThreadCount, counter.get());
-    
-    hdfsStore.alter(mutator);
-
-    counter.set(0);
-    barrierOpen.set(false);
-    for (int i = 0; i < 15; i++) {
-      TimeUnit.MILLISECONDS.sleep(100);
-      CompactionRequest cr = new CompactionRequest("region", i, new TestCompactor(), testMajor);
-      boolean success = HDFSCompactionManager.getInstance(hdfsStore).submitRequest(cr) !=
null;
-      if (decrease) {
-        if (i > 3) {
-          assertFalse("failed for " + i, success);
-        }
-      } else {
-        assertTrue("failed for " + i, success);
-      }
-    }
-    TimeUnit.MILLISECONDS.sleep(500);
-    synchronized (barrierOpen) {
-      barrierOpen.set(true);
-      barrierOpen.notifyAll();
-    }
-    TimeUnit.MILLISECONDS.sleep(500);
-    if (decrease) {
-      assertTrue(counter.get() < 4);
-    } else {
-      assertEquals(15, counter.get());
-    }
-
-    cache.getLogger().info("<ExpectedException action=remove>java.util.concurrent.RejectedExecutionException</ExpectedException>");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
deleted file mode 100644
index dc7b987..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSRegionDirectorJUnitTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.internal.HoplogListenerForRegion;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest
-;
-
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSRegionDirectorJUnitTest extends BaseHoplogTestCase {
-  public void testDirector() throws Exception {
-    int bucketId = 0;
-
-    HdfsRegionManager mgr = regionManager;
-    
-    // no buckets have been created so far.
-    assertEquals(0, director.getBucketCount("/" + getName()));
-
-    // one bucket created
-    mgr.create(bucketId);
-    assertEquals(1, director.getBucketCount("/" + getName()));
-
-    // close bucket test
-    mgr.close(bucketId);
-    
-    // all buckets have been closed.
-    assertEquals(0, director.getBucketCount("/" + getName()));
-
-    mgr.create(bucketId);
-    assertEquals(1, director.getBucketCount("/" + getName()));
-    director.clear("/" + getName());
-    try {
-      assertEquals(0, director.getBucketCount("/" + getName()));
-      fail("The region is no longer managed, hence an exception is expected");
-    } catch (IllegalStateException e) {
-      // exception expected as the region is no longer managed
-    }
-  }
-  
-  public void testCompactionEvents() throws Exception {
-    final AtomicInteger counter = new AtomicInteger(0);
-    HoplogListener myListener = new HoplogListener() {
-      public void hoplogDeleted(String regionFolder, int bucketId, Hoplog... oplogs)
-          throws IOException {
-      }
-      public void hoplogCreated(String regionFolder, int bucketId, Hoplog... oplogs)
-          throws IOException {
-      }
-      public void compactionCompleted(String region, int bucket, boolean isMajor) {
-        counter.incrementAndGet();
-      }
-    };
-
-    HoplogListenerForRegion listenerManager = ((LocalRegion)region).getHoplogListener();
-    listenerManager.addListener(myListener);
-    
-    HoplogOrganizer bucket = regionManager.create(0);
-    // #1
-    ArrayList<PersistedEventImpl> items = new ArrayList<PersistedEventImpl>();
-    items.add(new TestEvent("1", "1"));
-    bucket.flush(items.iterator(), items.size());
-
-    // #2
-    items.clear();
-    items.add(new TestEvent("2", "1"));
-    bucket.flush(items.iterator(), items.size());
-
-    // #3
-    items.clear();
-    items.add(new TestEvent("3", "1"));
-    bucket.flush(items.iterator(), items.size());
-    
-    // #4
-    items.clear();
-    items.add(new TestEvent("4", "1"));
-    bucket.flush(items.iterator(), items.size());
-    
-    bucket.getCompactor().compact(false, false);
-    assertEquals(1, counter.get());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
deleted file mode 100644
index 1d17232..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSStatsJUnitTest.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.util.ArrayList;
-
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.internal.util.BlobHelper;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSStatsJUnitTest extends BaseHoplogTestCase {
-  public void testStoreUsageStats() throws Exception {
-    HoplogOrganizer bucket = regionManager.create(0);
-    
-    long oldUsage = 0;
-    assertEquals(oldUsage, stats.getStoreUsageBytes());
-
-    for (int j = 0; j < 5; j++) {
-      ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-      for (int i = 0; i < 100; i++) {
-        String key = ("key-" + (j * 100 + i));
-        String value = ("value-" + System.nanoTime());
-        items.add(new TestEvent(key, value));
-      }
-      bucket.flush(items.iterator(), 100);
-    }
-    
-    assertTrue(0 < stats.getStoreUsageBytes());
-    oldUsage = stats.getStoreUsageBytes();
-    
-    HdfsSortedOplogOrganizer organizer = new HdfsSortedOplogOrganizer(regionManager, 0);
-    assertEquals(2, stats.getStoreUsageBytes() / oldUsage);
-    
-    organizer.close();
-    assertEquals(1, stats.getStoreUsageBytes() / oldUsage);
-  }
-  
-  public void testWriteStats() throws Exception {
-    HoplogOrganizer bucket = regionManager.create(0);
-
-    // validate flush stats
-    // flush and create many hoplogs and execute one compaction cycle also
-    // 5 hoplogs, total 500 keys
-    assertEquals(0, stats.getFlush().getCount());
-    assertEquals(0, stats.getFlush().getBytes());
-    assertEquals(0, stats.getActiveFileCount());
-    int bytesSent = 0;
-    for (int j = 0; j < 5; j++) {
-      ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-      for (int i = 0; i < 100; i++) {
-        String key = ("key-" + (j * 100 + i));
-        String value = ("value-" + System.nanoTime());
-        items.add(new TestEvent(key, value));
-        bytesSent += (key.getBytes().length + value.getBytes().length);
-      }
-      bucket.flush(items.iterator(), 100);
-
-      // verify stats show
-      assertEquals(j + 1, stats.getFlush().getCount());
-      assertTrue(stats.getFlush().getBytes() > bytesSent);
-      assertEquals(j + 1, stats.getActiveFileCount());
-    }
-
-    // verify compaction stats
-    assertEquals(0, stats.getMinorCompaction().getCount());
-    assertEquals(0, stats.getMinorCompaction().getBytes());
-    assertEquals(0, stats.getInactiveFileCount());
-    bucket.getCompactor().compact(false, false);
-    assertEquals(1, stats.getMinorCompaction().getCount());
-    assertEquals(1, stats.getActiveFileCount());
-    assertEquals(0, stats.getInactiveFileCount());
-    assertEquals(stats.getMinorCompaction().getBytes(), stats.getFlush()
-        .getBytes());
-  }
-  
-  public void testInactiveFileStats() throws Exception {
-    // steps 
-    // create files -> validate active and inactive file count
-    // -> increment reference by using scanner-> compact -> verify active and inactive
file count 
-    HoplogOrganizer bucket = regionManager.create(0);
-    assertEquals(0, stats.getActiveFileCount());
-    assertEquals(0, stats.getInactiveFileCount());
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int j = 0; j < 5; j++) {
-      items.clear();
-      for (int i = 0; i < 100; i++) {
-        String key = ("key-" + (j * 100 + i));
-        String value = ("value-" + System.nanoTime());
-        items.add(new TestEvent(key, value));
-      }
-      bucket.flush(items.iterator(), 100);
-    }
-    
-    assertEquals(5, stats.getActiveFileCount());
-    assertEquals(0, stats.getInactiveFileCount());
-    
-    HoplogIterator<byte[], PersistedEventImpl> scanner = bucket.scan();
-    bucket.getCompactor().compact(true, false);
-    assertEquals(1, stats.getActiveFileCount());
-    assertEquals(5, stats.getInactiveFileCount());
-    
-    scanner.close();
-    assertEquals(1, stats.getActiveFileCount());
-    assertEquals(0, stats.getInactiveFileCount());
-  }
-
-  public void testReadStats() throws Exception {
-    HoplogOrganizer<SortedHoplogPersistedEvent> bucket = regionManager.create(0);
-
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int i = 0; i < 100; i++) {
-      items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
-    }
-    bucket.flush(items.iterator(), 100);
-    
-    // validate read stats
-    assertEquals(0, stats.getRead().getCount());
-    assertEquals(0, stats.getRead().getBytes());
-    // number of bytes read must be greater than size of key and value and must be increasing
-    int bytesRead = "key-1".getBytes().length + "value=1233232".getBytes().length;
-    for (int i = 0; i < 5; i++) {
-      long previousRead = stats.getRead().getBytes();
-      PersistedEventImpl e = bucket.read(BlobHelper.serializeToBlob("key-" + i));
-      assertNotNull(e);
-      assertEquals(i + 1, stats.getRead().getCount());
-      assertTrue( (bytesRead + previousRead) < stats.getRead().getBytes());
-    }
-    
-    //Make sure the block cache stats are being updated.
-//    assertTrue(storeStats.getBlockCache().getMisses() > 0);
-//    assertTrue(storeStats.getBlockCache().getBytesCached() > 0);
-//    assertTrue(storeStats.getBlockCache().getCached() > 0);
-    
-    //Do a duplicate read to make sure we get a hit in the cache
-//    bucket.read(BlobHelper.serializeToBlob("key-" + 0));
-//    assertTrue(storeStats.getBlockCache().getHits() > 0);
-  }
-
-  public void testBloomStats() throws Exception {
-    HoplogOrganizer bucket = regionManager.create(0);
-
-    // create 10 hoplogs
-    for (int j = 0; j < 5; j++) {
-      ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-      for (int i = 0; i < 100; i++) {
-        String key = ("key-" + (j * 100 + i));
-        String value = ("value-" + System.nanoTime());
-        items.add(new TestEvent(key, value));
-      }
-      bucket.flush(items.iterator(), 100);
-    }
-
-    // initially bloom stat will be zero
-    // reading key in first hop will increase bloom hit by 1 (key 0 to 99)
-    // reading key in 5 hoplog will increase bloom hit by 5 (key 400 to 499)
-    assertEquals(0, stats.getBloom().getCount());
-    bucket.read(BlobHelper.serializeToBlob("key-450"));
-    assertEquals(1, stats.getBloom().getCount());
-    bucket.read(BlobHelper.serializeToBlob("key-50"));
-    assertEquals(6, stats.getBloom().getCount());
-  }
-  
-  public void testScanStats() throws Exception {
-    HFileSortedOplog hoplog = new HFileSortedOplog(hdfsStore, new Path(
-          testDataDir, "H-1-1.hop"),blockCache, stats, storeStats);
-    createHoplog(5, hoplog);
-    
-    // initially scan stats will be zero. creating a scanner should increase
-    // scan iteration stats and bytes. On scanner close scan count should be
-    // incremented
-    assertEquals(0, stats.getScan().getCount());
-    assertEquals(0, stats.getScan().getBytes());
-    assertEquals(0, stats.getScan().getTime());
-    assertEquals(0, stats.getScan().getIterations());
-    assertEquals(0, stats.getScan().getIterationTime());
-    
-    HoplogIterator<byte[], byte[]> scanner = hoplog.getReader().scan();
-    assertEquals(0, stats.getScan().getCount());
-    int count = 0;
-    for (byte[] bs = null; scanner.hasNext(); ) {
-      bs = scanner.next();
-      count += bs.length + scanner.getValue().length;
-    }
-    assertEquals(count, stats.getScan().getBytes());
-    assertEquals(5, stats.getScan().getIterations());
-    assertTrue(0 < stats.getScan().getIterationTime());
-    // getcount will be 0 as scanner.close is not being called
-    assertEquals(0, stats.getScan().getCount());
-    assertEquals(0, stats.getScan().getTime());
-    assertEquals(1, stats.getScan().getInProgress());
-    
-    scanner.close();
-    assertEquals(1, stats.getScan().getCount());
-    assertTrue(0 < stats.getScan().getTime());
-    assertTrue(stats.getScan().getIterationTime() <= stats.getScan().getTime());
-  }
-  
-  /**
-   * Validates two buckets belonging to same region update the same stats
-   */
-  public void testRegionBucketShareStats() throws Exception {
-    HoplogOrganizer bucket1 = regionManager.create(0);
-    HoplogOrganizer bucket2 = regionManager.create(1);
-
-    // validate flush stats
-    assertEquals(0, stats.getFlush().getCount());
-    assertEquals(0, stats.getActiveFileCount());
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int i = 0; i < 100; i++) {
-      items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
-    }
-    bucket1.flush(items.iterator(), 100);
-    assertEquals(1, stats.getFlush().getCount());
-    assertEquals(1, stats.getActiveFileCount());
-    items.clear();
-
-    for (int i = 0; i < 100; i++) {
-      items.add(new TestEvent("key-" + i, "value-" + System.nanoTime()));
-    }
-    bucket2.flush(items.iterator(), 100);
-    assertEquals(2, stats.getFlush().getCount());
-    assertEquals(2, stats.getActiveFileCount());
-  }
-
-  @Override
-  protected Cache createCache() {
-    CacheFactory cf = new CacheFactory().set("mcast-port", "0")
-        .set("log-level", "info")
-        .set("enable-time-statistics", "true")
-//        .set("statistic-archive-file", "statArchive.gfs")
-        ;
-    cache = cf.create();
-
-    return cache;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/74c3156a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
deleted file mode 100644
index ab1ccac..0000000
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HDFSUnsortedHoplogOrganizerJUnitTest.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSStoreMutator;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.SequenceFileHoplog.SequenceFileIterator;
-import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerHelper;
-import com.gemstone.gemfire.test.junit.categories.HoplogTest;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * Test class to test hoplog functionality for streaming ingest 
- * 
- * @author hemantb
- *
- */
-@Category({IntegrationTest.class, HoplogTest.class})
-public class HDFSUnsortedHoplogOrganizerJUnitTest extends BaseHoplogTestCase {
- 
-  /**
-   * Tests flush operation
-   */
-  public void testFlush() throws Exception {
-    int count = 10;
-    int bucketId = (int) System.nanoTime();
-    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager,
bucketId);
-
-    // flush and create hoplog
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int i = 0; i < count; i++) {
-      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
-    }
-    
-    organizer.flush(items.iterator(), count);
-    organizer.closeCurrentWriter();
-    
-    // check file existence in bucket directory
-    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-                      HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
-
-    // only one hoplog should exists
-    assertEquals(1, hoplogs.length);
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
-  }
-  
-  public void testAlterRollOverInterval() throws Exception {
-    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager,
0);
-    
-    // flush 4 times with small delays. Only one seq file will be created
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int j = 0; j < 3; j++) {
-      items.clear();
-      for (int i = 0; i < 10; i++) {
-        items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
-      }
-      organizer.flush(items.iterator(), 10);
-      TimeUnit.MILLISECONDS.sleep(1100);
-    }
-    organizer.closeCurrentWriter();
-    
-    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + 0,
-        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
-    
-    // only one hoplog should exists
-    assertEquals(1, hoplogs.length);
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
-    
-    HDFSStoreMutator mutator = hdfsStore.createHdfsStoreMutator();
-    mutator.setWriteOnlyFileRolloverInterval(1);
-    hdfsStore.alter(mutator);
-    
-    TimeUnit.MILLISECONDS.sleep(1100);
-    for (int j = 0; j < 2; j++) {
-      items.clear();
-      for (int i = 0; i < 10; i++) {
-        items.add(new TestEvent(("key-" + (i + 10 * j)), ("value-" + System.nanoTime())));
-      }
-      organizer.flush(items.iterator(), 10);
-      TimeUnit.MILLISECONDS.sleep(1100);
-    }
-    organizer.closeCurrentWriter();
-    hoplogs = getBucketHoplogs(getName() + "/" + 0,
-        HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
-    assertEquals(3, hoplogs.length);
-  }
-  
-  public void testSequenceFileScan() throws Exception {
-    int count = 10000;
-    int bucketId = (int) System.nanoTime();
-    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager,
bucketId);
-
-    // flush and create hoplog
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int i = 0; i < count; i++) {
-      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
-    }
-    
-    organizer.flush(items.iterator(), count);
-    organizer.closeCurrentWriter();
-    
-    // check file existence in bucket directory
-    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-                      HdfsSortedOplogOrganizer.SEQ_HOPLOG_EXTENSION);
-
-    // only one hoplog should exists
-    assertEquals(1, hoplogs.length);
-    
-    SequenceFileDetails sfd = getSequenceFileDetails(hdfsStore.getFileSystem(), hoplogs[0].getPath());
-    
-    // End position is before a sync. Should read until sync.
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync
,
-        0, sfd.posBeforeSecondSync);
-    
-    // Start position is inside header. Should start from first key and go to next sync point.

-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, sfd.indexOfKeyBeforeSecondSync,

-        10, sfd.posAfterFirstSync);
-    
-    // Start and end position are between two sync markers. Should not read any keys.   

-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 29, 28, 
-        sfd.posAfterFirstSync, sfd.posBeforeSecondSync - sfd.posAfterFirstSync);
-    
-    // Start position is after a sync and End position is beyond the file size. 
-    //Should read all the records after the next sync.
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), sfd.indexOfKeyAfterFirstSync,
9999, 
-        sfd.posBeforeFirstSync, 10000000);
-    
-    // Should read all the records. 
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0, 9999, 0, -1);
-  }
-  
-  class SequenceFileDetails {
-    public int posBeforeFirstSync;
-    public int indexOfKeyBeforeFirstSync;
-    
-    public int posAfterFirstSync;
-    public int indexOfKeyAfterFirstSync; 
-    
-    public int posBeforeSecondSync;
-    public int indexOfKeyBeforeSecondSync;
-  }
-  
-  public SequenceFileDetails getSequenceFileDetails(FileSystem inputFS, Path sequenceFileName)
throws Exception {
-    SequenceFileDetails fd = new SequenceFileDetails();
-    SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
-      
-    SequenceFileIterator iter = (SequenceFileIterator)hoplog.getReader().scan();;
-    int currentkeyStartPos = 0;
-    int cursorPos = 0;
-    String currentKey = null;
-    boolean firstSyncSeen = false; 
-    try {
-      while (iter.hasNext()) {
-        iter.next();
-        currentkeyStartPos = cursorPos;
-        currentKey = ((String)CacheServerHelper.deserialize(iter.getKey()));
-        cursorPos = (int)iter.getPosition();
-        if (iter.syncSeen()){
-          if (firstSyncSeen) {
-            
-            fd.posBeforeSecondSync = currentkeyStartPos;
-            fd.indexOfKeyBeforeSecondSync = Integer.parseInt(currentKey.substring(4));
-            break;
-          } else {
-            fd.posBeforeFirstSync = currentkeyStartPos;
-            fd.indexOfKeyBeforeFirstSync = Integer.parseInt(currentKey.substring(4));
-            
-            fd.posAfterFirstSync = cursorPos;
-            fd.indexOfKeyAfterFirstSync = Integer.parseInt(currentKey.substring(4)) + 1;
-            firstSyncSeen = true;
-          }
-        }
-      }
-
-    } catch (Exception e) {
-      assertTrue(e.toString(), false);
-    }
-    iter.close();
-    hoplog.close();
-    return fd;
-  }
-  
-  public void testClear() throws Exception {
-    int count = 10;
-    int bucketId = (int) System.nanoTime();
-    HDFSUnsortedHoplogOrganizer organizer = new HDFSUnsortedHoplogOrganizer(regionManager,
bucketId);
-
-    // flush and create hoplog
-    ArrayList<TestEvent> items = new ArrayList<TestEvent>();
-    for (int i = 0; i < count; i++) {
-      items.add(new TestEvent(("key-" + i), ("value-" + System.nanoTime())));
-    }
-    organizer.flush(items.iterator(), count);
-    organizer.closeCurrentWriter();
-    // check file existence in bucket directory
-    FileStatus[] hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-                      AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
-    assertEquals(1, hoplogs.length);
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
-    
-    
-    // write another batch but do not close the data. 
-    organizer.flush(items.iterator(), count);
-    
-    organizer.clear();
-    
-    hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-        AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
-    // check file existence in bucket directory
-    FileStatus[] expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-                      AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    
-    // two expired hoplog should exists
-    assertEquals(2, expiredhoplogs.length);
-    assertEquals(2, hoplogs.length);
-    // check the expired hops name should be same 
-    assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) || 
-        expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)
);
-    assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) || 
-        expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)
);
-    
-    // Test that second time clear should be harmless and should not result in extra files.

-    organizer.clear();
-    hoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-        AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION);
-    // check file existence in bucket directory
-    expiredhoplogs = getBucketHoplogs(getName() + "/" + bucketId, 
-                      AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION);
-    
-    // two expired hoplog should exists
-    assertEquals(2, expiredhoplogs.length);
-    assertEquals(2, hoplogs.length);
-    // check the expired hops name should be same 
-    assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[0].getPath().getName()+
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) || 
-        expiredhoplogs[1].getPath().getName().equals(hoplogs[0].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)
);
-    assertTrue(expiredhoplogs[0].getPath().getName().equals(hoplogs[1].getPath().getName()+
AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION) || 
-        expiredhoplogs[1].getPath().getName().equals(hoplogs[1].getPath().getName()+ AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION)
);
-    
-    
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[0].getPath(), 0);
-    readSequenceFile(hdfsStore.getFileSystem(), hoplogs[1].getPath(), 0);
-  }
-  
-  public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index)  throws
IOException{
-    readSequenceFile(inputFS, sequenceFileName, index, -1, 0, -1);
-  }
-  /**
-   * Reads the sequence file assuming that it has keys and values starting from index that

-   * is specified as parameter. 
-   * 
-   */
-  public void readSequenceFile(FileSystem inputFS, Path sequenceFileName, int index, int
endIndex,
-      int startoffset, int length) throws IOException {
-    SequenceFileHoplog hoplog = new SequenceFileHoplog(inputFS, sequenceFileName, null);
-    
-    HoplogIterator<byte[], byte[]> iter = null;
-    if (length == -1){
-      iter = hoplog.getReader().scan();
-    }
-    else {
-      iter = hoplog.getReader().scan(startoffset, length);
-    }
-    
-    try {
-      while (iter.hasNext()) {
-        iter.next();
-        PersistedEventImpl te = UnsortedHoplogPersistedEvent.fromBytes(iter.getValue());
-        String stringkey = ((String)CacheServerHelper.deserialize(iter.getKey()));
-        assertTrue("Expected key: key-" + index + ". Actual key: " + stringkey , ((String)stringkey).equals("key-"
+ index));
-        index++;
-      }
-      if (endIndex != -1)
-      assertTrue ("The keys should have been until key-"+ endIndex + " but they are until
key-"+ (index-1),  index == endIndex + 1) ;
-    } catch (Exception e) {
-      assertTrue(e.toString(), false);
-    }
-    iter.close();
-    hoplog.close();
- }
-
-}



Mime
View raw message