hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jxi...@apache.org
Subject svn commit: r1504647 [1/2] - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ hbase-server/src/main/java/org/a...
Date Thu, 18 Jul 2013 20:42:36 GMT
Author: jxiang
Date: Thu Jul 18 20:42:35 2013
New Revision: 1504647

URL: http://svn.apache.org/r1504647
Log:
HBASE-8962 Clean up code and remove regular log splitting

Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/OrphanHLogAfterSplitException.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/OrphanHLogAfterSplitException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/OrphanHLogAfterSplitException.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/OrphanHLogAfterSplitException.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/OrphanHLogAfterSplitException.java Thu Jul 18 20:42:35 2013
@@ -25,6 +25,8 @@ import java.io.IOException;
 @InterfaceAudience.Private
 public class OrphanHLogAfterSplitException extends IOException {
 
+  private static final long serialVersionUID = -4363805979687710634L;
+
   /**
    * Create this exception without a message
    */

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Jul 18 20:42:35 2013
@@ -728,10 +728,6 @@ public final class HConstants {
 
   public static final String LOCALHOST_IP = "127.0.0.1";
 
-  /** Conf key that enables distributed log splitting */
-  public static final String DISTRIBUTED_LOG_SPLITTING_KEY =
-      "hbase.master.distributed.log.splitting";
-
   /** Conf key that enables unflushed WAL edits directly being replayed to region servers */
   public static final String DISTRIBUTED_LOG_REPLAY_KEY = "hbase.master.distributed.log.replay";
   public static final boolean DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG = false;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Thu Jul 18 20:42:35 2013
@@ -48,11 +48,9 @@ import org.apache.hadoop.hbase.catalog.M
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.InvalidFamilyOperationException;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -89,7 +87,6 @@ public class MasterFileSystem {
   // create the split log lock
   final Lock splitLogLock = new ReentrantLock();
   final boolean distributedLogReplay;
-  final boolean distributedLogSplitting;
   final SplitLogManager splitLogManager;
   private final MasterServices services;
 
@@ -125,12 +122,8 @@ public class MasterFileSystem {
     // make sure the fs has the same conf
     fs.setConf(conf);
     this.splitLogManager = new SplitLogManager(master.getZooKeeper(), master.getConfiguration(),
-        master, services, master.getServerName());
-    this.distributedLogSplitting = conf.getBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
-    if (this.distributedLogSplitting) {
-      this.splitLogManager.finishInitialization(masterRecovery);
-    }
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+        master, services, master.getServerName(), masterRecovery);
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
     // setup the filesystem variable
     // set up the archived logs path
@@ -305,16 +298,7 @@ public class MasterFileSystem {
    * @throws IOException
    */
   public void splitMetaLog(final Set<ServerName> serverNames) throws IOException {
-    long splitTime = 0, splitLogSize = 0;
-    List<Path> logDirs = getLogDirs(serverNames);
-
-    splitLogManager.handleDeadWorkers(serverNames);
-    splitTime = EnvironmentEdgeManager.currentTimeMillis();
-    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, META_FILTER);
-    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
-    if (this.metricsMaster != null) {
-      this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
-    }
+    splitLog(serverNames, META_FILTER);
   }
 
   private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException {
@@ -419,40 +403,13 @@ public class MasterFileSystem {
     long splitTime = 0, splitLogSize = 0;
     List<Path> logDirs = getLogDirs(serverNames);
 
-    if (distributedLogSplitting) {
-      splitLogManager.handleDeadWorkers(serverNames);
-      splitTime = EnvironmentEdgeManager.currentTimeMillis();
-      splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
-      splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
-    } else {
-      for(Path logDir: logDirs){
-        // splitLogLock ensures that dead region servers' logs are processed
-        // one at a time
-        this.splitLogLock.lock();
-        try {
-          HLogSplitter splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir, oldLogDir,
-            this.fs);
-          try {
-            // If FS is in safe mode, just wait till out of it.
-            FSUtils.waitOnSafeMode(conf, conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
-            splitter.splitLog();
-          } catch (OrphanHLogAfterSplitException e) {
-            LOG.warn("Retrying splitting because of:", e);
-            //An HLogSplitter instance can only be used once.  Get new instance.
-            splitter = HLogSplitter.createLogSplitter(conf, rootdir, logDir,
-              oldLogDir, this.fs);
-            splitter.splitLog();
-          }
-          splitTime = splitter.getTime();
-          splitLogSize = splitter.getSize();
-        } finally {
-          this.splitLogLock.unlock();
-        }
-      }
-    }
+    splitLogManager.handleDeadWorkers(serverNames);
+    splitTime = EnvironmentEdgeManager.currentTimeMillis();
+    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
+    splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
 
     if (this.metricsMaster != null) {
-      if (filter == this.META_FILTER) {
+      if (filter == META_FILTER) {
         this.metricsMaster.addMetaWALSplit(splitTime, splitLogSize);
       } else {
         this.metricsMaster.addSplit(splitTime, splitLogSize);
@@ -469,6 +426,7 @@ public class MasterFileSystem {
    * needed populating the directory with necessary bootup files).
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private Path checkRootDir(final Path rd, final Configuration c,
     final FileSystem fs)
   throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java Thu Jul 18 20:42:35 2013
@@ -149,19 +149,39 @@ public class SplitLogManager extends Zoo
 
   /**
    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-   *   Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
+   *   Stoppable stopper, MasterServices master, ServerName serverName,
+   *   boolean masterRecovery, TaskFinisher tf)}
+   * with masterRecovery = false, and tf = null.  Used in unit tests.
+   *
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   */
+  public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
+      Stoppable stopper, MasterServices master, ServerName serverName) {
+    this(zkw, conf, stopper, master, serverName, false, null);
+  }
+
+  /**
+   * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
+   *   Stoppable stopper, MasterServices master, ServerName serverName,
+   *   boolean masterRecovery, TaskFinisher tf)}
    * that provides a task finisher for copying recovered edits to their final destination.
    * The task finisher has to be robust because it can be arbitrarily restarted or called
    * multiple times.
-   * 
-   * @param zkw
-   * @param conf
-   * @param stopper
-   * @param serverName
+   *
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   * @param masterRecovery an indication if the master is in recovery
    */
   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
-       Stoppable stopper, MasterServices master, ServerName serverName) {
-    this(zkw, conf, stopper,  master, serverName, new TaskFinisher() {
+      Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) {
+    this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
       @Override
       public Status finish(ServerName workerName, String logfile) {
         try {
@@ -180,14 +200,17 @@ public class SplitLogManager extends Zoo
    * does lookup the orphan tasks in zk but it doesn't block waiting for them
    * to be done.
    *
-   * @param zkw
-   * @param conf
-   * @param stopper
-   * @param serverName
-   * @param tf task finisher 
+   * @param zkw the ZK watcher
+   * @param conf the HBase configuration
+   * @param stopper the stoppable in case anything is wrong
+   * @param master the master services
+   * @param serverName the master server name
+   * @param masterRecovery an indication if the master is in recovery
+   * @param tf task finisher
    */
   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
-        Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
+        Stoppable stopper, MasterServices master,
+        ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
     super(zkw);
     this.taskFinisher = tf;
     this.conf = conf;
@@ -205,12 +228,10 @@ public class SplitLogManager extends Zoo
       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
 
     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
+    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
     LOG.info("distributedLogReplay = " + this.distributedLogReplay);
-  }
 
-  public void finishInitialization(boolean masterRecovery) {
     if (!masterRecovery) {
       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
           + ".splitLogManagerTimeoutMonitor");
@@ -1646,12 +1667,4 @@ public class SplitLogManager extends Zoo
       return statusMsg;
     }
   }
-  
-  /**
-   * Completes the initialization
-   */
-  public void finishInitialization() {
-    finishInitialization(false);
-  }
-  
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Thu Jul 18 20:42:35 2013
@@ -866,6 +866,7 @@ class FSHLog implements HLog, Syncable {
    * @return txid of this transaction
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
       final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
     throws IOException {
@@ -1342,15 +1343,13 @@ class FSHLog implements HLog, Syncable {
     if (!fs.exists(p)) {
       throw new FileNotFoundException(p.toString());
     }
-    final Path baseDir = FSUtils.getRootDir(conf);
-    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
     if (!fs.getFileStatus(p).isDir()) {
       throw new IOException(p + " is not a directory");
     }
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(
-        conf, baseDir, p, oldLogDir, fs);
-    logSplitter.splitLog();
+    final Path baseDir = FSUtils.getRootDir(conf);
+    final Path oldLogDir = new Path(baseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+    HLogSplitter.split(baseDir, p, oldLogDir, fs, conf);
   }
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java Thu Jul 18 20:42:35 2013
@@ -22,13 +22,9 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.net.ConnectException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -39,15 +35,12 @@ import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -58,7 +51,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -71,13 +63,10 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -85,7 +74,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -98,15 +86,11 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.hbase.zookeeper.ZKTable;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -118,19 +102,10 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Private
 public class HLogSplitter {
-  private static final String LOG_SPLITTER_IMPL = "hbase.hlog.splitter.impl";
-
   static final Log LOG = LogFactory.getLog(HLogSplitter.class);
 
-  private boolean hasSplit = false;
-  private long splitTime = 0;
-  private long splitSize = 0;
-
-
   // Parameters for split process
   protected final Path rootDir;
-  protected final Path srcDir;
-  protected final Path oldLogDir;
   protected final FileSystem fs;
   protected final Configuration conf;
 
@@ -172,62 +147,11 @@ public class HLogSplitter {
 
   // Min batch size when replay WAL edits
   private final int minBatchSize;
-  
-  /**
-   * Create a new HLogSplitter using the given {@link Configuration} and the
-   * <code>hbase.hlog.splitter.impl</code> property to derived the instance class to use.
-   * distributedLogReplay won't be enabled by this constructor.
-   * <p>
-   * @param conf
-   * @param rootDir hbase directory
-   * @param srcDir logs directory
-   * @param oldLogDir directory where processed logs are archived to
-   * @param fs FileSystem
-   * @return New HLogSplitter instance
-   */
-  public static HLogSplitter createLogSplitter(Configuration conf,
-      final Path rootDir, final Path srcDir,
-      Path oldLogDir, final FileSystem fs)  {
-
-    @SuppressWarnings("unchecked")
-    Class<? extends HLogSplitter> splitterClass = (Class<? extends HLogSplitter>) conf
-        .getClass(LOG_SPLITTER_IMPL, HLogSplitter.class);
-    try {
-       Constructor<? extends HLogSplitter> constructor =
-         splitterClass.getConstructor(
-          Configuration.class, // conf
-          Path.class, // rootDir
-          Path.class, // srcDir
-          Path.class, // oldLogDir
-          FileSystem.class, // fs
-          LastSequenceId.class);
-      return constructor.newInstance(conf, rootDir, srcDir, oldLogDir, fs, null);
-    } catch (IllegalArgumentException e) {
-      throw new RuntimeException(e);
-    } catch (InstantiationException e) {
-      throw new RuntimeException(e);
-    } catch (IllegalAccessException e) {
-      throw new RuntimeException(e);
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(e);
-    } catch (SecurityException e) {
-      throw new RuntimeException(e);
-    } catch (NoSuchMethodException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs, LastSequenceId idChecker) {
-      this(conf, rootDir, srcDir, oldLogDir, fs, idChecker, null);
-  }
 
-  public HLogSplitter(Configuration conf, Path rootDir, Path srcDir,
-      Path oldLogDir, FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
+  HLogSplitter(Configuration conf, Path rootDir,
+      FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
     this.conf = conf;
     this.rootDir = rootDir;
-    this.srcDir = srcDir;
-    this.oldLogDir = oldLogDir;
     this.fs = fs;
     this.sequenceIdChecker = idChecker;
     this.watcher = zkw;
@@ -253,198 +177,8 @@ public class HLogSplitter {
   }
 
   /**
-   * Split up a bunch of regionserver commit log files that are no longer being
-   * written to, into new files, one per region for region to replay on startup.
-   * Delete the old log files when finished.
-   *
-   * @throws IOException will throw if corrupted hlogs aren't tolerated
-   * @return the list of splits
-   */
-  public List<Path> splitLog()
-      throws IOException {
-    return splitLog((CountDownLatch) null);
-  }
-  
-  /**
-   * Split up a bunch of regionserver commit log files that are no longer being
-   * written to, into new files, one per region for region to replay on startup.
-   * Delete the old log files when finished.
-   *
-   * @param latch
-   * @throws IOException will throw if corrupted hlogs aren't tolerated
-   * @return the list of splits
-   */
-  public List<Path> splitLog(CountDownLatch latch)
-      throws IOException {
-    Preconditions.checkState(!hasSplit,
-        "An HLogSplitter instance may only be used once");
-    hasSplit = true;
-
-    status = TaskMonitor.get().createStatus(
-        "Splitting logs in " + srcDir);
-
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
-
-    status.setStatus("Determining files to split...");
-    List<Path> splits = null;
-    if (!fs.exists(srcDir)) {
-      // Nothing to do
-      status.markComplete("No log directory existed to split.");
-      return splits;
-    }
-    FileStatus[] logfiles = fs.listStatus(srcDir);
-    if (logfiles == null || logfiles.length == 0) {
-      // Nothing to do
-      return splits;
-    }
-    logAndReport("Splitting " + logfiles.length + " hlog(s) in "
-    + srcDir.toString());
-    splits = splitLog(logfiles, latch);
-
-    splitTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
-    String msg = "hlog file splitting completed in " + splitTime +
-        " ms for " + srcDir.toString();
-    status.markComplete(msg);
-    LOG.info(msg);
-    return splits;
-  }
-
-  private void logAndReport(String msg) {
-    status.setStatus(msg);
-    LOG.info(msg);
-  }
-
-  /**
-   * @return time that this split took
-   */
-  public long getTime() {
-    return this.splitTime;
-  }
-
-  /**
-   * @return aggregate size of hlogs that were split
-   */
-  public long getSize() {
-    return this.splitSize;
-  }
-
-  /**
-   * @return a map from encoded region ID to the number of edits written out
-   * for that region.
-   */
-  Map<byte[], Long> getOutputCounts() {
-    Preconditions.checkState(hasSplit);
-    return outputSink.getOutputCounts();
-  }
-
-  /**
-   * Splits or Replays the HLog edits in the given list of logfiles (that are a mix of edits on
-   * multiple regions) by region and then splits(or replay when distributedLogReplay is true) them
-   * per region directories, in batches.
-   * <p>
-   * This process is split into multiple threads. In the main thread, we loop through the logs to be
-   * split. For each log, we:
-   * <ul>
-   * <li>Recover it (take and drop HDFS lease) to ensure no other process can write</li>
-   * <li>Read each edit (see {@link #parseHLog}</li>
-   * <li>Mark as "processed" or "corrupt" depending on outcome</li>
-   * </ul>
-   * <p>
-   * Each edit is passed into the EntryBuffers instance, which takes care of memory accounting and
-   * splitting the edits by region.
-   * <p>
-   * The OutputSink object then manages N other WriterThreads which pull chunks of edits from
-   * EntryBuffers and write them to either recovered.edits files or replay them to newly assigned
-   * region servers directly
-   * <p>
-   * After the process is complete, the log files are archived to a separate directory.
-   */
-  private List<Path> splitLog(final FileStatus[] logfiles, CountDownLatch latch)
-      throws IOException {
-    List<Path> processedLogs = new ArrayList<Path>(logfiles.length);
-    List<Path> corruptedLogs = new ArrayList<Path>(logfiles.length);
-    List<Path> splits;
-
-    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
-
-    countTotalBytes(logfiles);
-    splitSize = 0;
-
-    outputSink.startWriterThreads();
-
-    try {
-      int i = 0;
-      for (FileStatus log : logfiles) {
-       Path logPath = log.getPath();
-        long logLength = log.getLen();
-        splitSize += logLength;
-        logAndReport("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
-            + ": " + logPath + ", length=" + logLength);
-        Reader in = null;
-        try {
-          //actually, for meta-only hlogs, we don't need to go thru the process
-          //of parsing and segregating by regions since all the logs are for
-          //meta only. However, there is a sequence number that can be obtained
-          //only by parsing.. so we parse for all files currently
-          //TODO: optimize this part somehow
-          in = getReader(fs, log, conf, skipErrors, null);
-          if (in != null) {
-            parseHLog(in, logPath, entryBuffers, fs, conf, skipErrors);
-          }
-          processedLogs.add(logPath);
-        } catch (CorruptedLogFileException e) {
-          LOG.info("Got while parsing hlog " + logPath +
-              ". Marking as corrupted", e);
-          corruptedLogs.add(logPath);
-        } finally {
-          if (in != null) {
-            try {
-              in.close();
-            } catch (IOException e) {
-              LOG.warn("Close log reader threw exception -- continuing", e);
-            }
-          }
-        }
-      }
-      status.setStatus("Log splits complete. Checking for orphaned logs.");
-
-      if (latch != null) {
-        try {
-          latch.await();
-        } catch (InterruptedException ie) {
-          LOG.warn("wait for latch interrupted");
-          Thread.currentThread().interrupt();
-        }
-      }
-      FileStatus[] currFiles = fs.listStatus(srcDir);
-      if (currFiles.length > processedLogs.size()
-          + corruptedLogs.size()) {
-        throw new OrphanHLogAfterSplitException(
-          "Discovered orphan hlog after split. Maybe the "
-            + "HRegionServer was not dead when we started");
-      }
-    } finally {
-      status.setStatus("Finishing writing output logs and closing down.");
-      splits = outputSink.finishWritingAndClose();
-    }
-    status.setStatus("Archiving logs after completed split");
-    archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
-    return splits;
-  }
-
-  /**
-   * @return the total size of the passed list of files.
-   */
-  private static long countTotalBytes(FileStatus[] logfiles) {
-    long ret = 0;
-    for (FileStatus stat : logfiles) {
-      ret += stat.getLen();
-    }
-    return ret;
-  }
-
-  /**
-   * Splits a HLog file into region's recovered-edits directory
+   * Splits a HLog file into region's recovered-edits directory.
+   * This is the main entry point for distributed log splitting from SplitLogWorker.
    * <p>
    * If the log file has N regions then N recovered.edits files will be produced.
    * <p>
@@ -459,34 +193,40 @@ public class HLogSplitter {
    * @return false if it is interrupted by the progress-able.
    * @throws IOException
    */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
+  public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
       Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
-      ZooKeeperWatcher zkw)
-      throws IOException {
-    HLogSplitter s = new HLogSplitter(conf, rootDir, null, null/* oldLogDir */, fs, idChecker, zkw);
+      ZooKeeperWatcher zkw) throws IOException {
+    HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
     return s.splitLogFile(logfile, reporter);
   }
 
-  /**
-   * Splits a HLog file into region's recovered-edits directory
-   * <p>
-   * If the log file has N regions then N recovered.edits files will be produced.
-   * <p>
-   * @param rootDir
-   * @param logfile
-   * @param fs
-   * @param conf
-   * @param reporter
-   * @return false if it is interrupted by the progress-able.
-   * @throws IOException
-   */
-  static public boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
-      Configuration conf, CancelableProgressable reporter)
-      throws IOException {
-    return HLogSplitter.splitLogFile(rootDir, logfile, fs, conf, reporter, null, null);
+  // A wrapper to split one log folder using the method used by distributed
+  // log splitting. Used by tools and unit tests. It should be package private.
+  // It is public only because TestWALObserver is in a different package,
+  // which uses this method to do log splitting.
+  public static List<Path> split(Path rootDir, Path logDir, Path oldLogDir,
+      FileSystem fs, Configuration conf) throws IOException {
+    FileStatus[] logfiles = fs.listStatus(logDir);
+    List<Path> splits = new ArrayList<Path>();
+    if (logfiles != null && logfiles.length > 0) {
+      for (FileStatus logfile: logfiles) {
+        HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
+        if (s.splitLogFile(logfile, null)) {
+          finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
+          if (s.outputSink.splits != null) {
+            splits.addAll(s.outputSink.splits);
+          }
+        }
+      }
+    }
+    if (!fs.delete(logDir, true)) {
+      throw new IOException("Unable to delete src dir: " + logDir);
+    }
+    return splits;
   }
 
-  public boolean splitLogFile(FileStatus logfile,
+  // The real log splitter. It just splits one log file.
+  boolean splitLogFile(FileStatus logfile,
       CancelableProgressable reporter) throws IOException {
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
@@ -615,31 +355,31 @@ public class HLogSplitter {
    * @param conf
    * @throws IOException
    */
-  public static void finishSplitLogFile(String logfile, Configuration conf)
-      throws IOException {
+  public static void finishSplitLogFile(String logfile,
+      Configuration conf)  throws IOException {
     Path rootdir = FSUtils.getRootDir(conf);
     Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
-    finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
+    Path logPath;
+    if (FSUtils.isStartingWithPath(rootdir, logfile)) {
+      logPath = new Path(logfile);
+    } else {
+      logPath = new Path(rootdir, logfile);
+    }
+    finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
   }
 
-  public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
-      String logfile, Configuration conf) throws IOException {
+  static void finishSplitLogFile(Path rootdir, Path oldLogDir,
+      Path logPath, Configuration conf) throws IOException {
     List<Path> processedLogs = new ArrayList<Path>();
     List<Path> corruptedLogs = new ArrayList<Path>();
     FileSystem fs;
     fs = rootdir.getFileSystem(conf);
-    Path logPath = null;
-    if (FSUtils.isStartingWithPath(rootdir, logfile)) {
-      logPath = new Path(logfile);
-    } else {
-      logPath = new Path(rootdir, logfile);
-    }
     if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
       corruptedLogs.add(logPath);
     } else {
       processedLogs.add(logPath);
     }
-    archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
+    archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
     Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
     fs.delete(stagingDir, true);
   }
@@ -657,7 +397,6 @@ public class HLogSplitter {
    * @throws IOException
    */
   private static void archiveLogs(
-      final Path srcDir,
       final List<Path> corruptedLogs,
       final List<Path> processedLogs, final Path oldLogDir,
       final FileSystem fs, final Configuration conf) throws IOException {
@@ -692,12 +431,6 @@ public class HLogSplitter {
         }
       }
     }
-
-    // distributed log splitting removes the srcDir (region's log dir) later
-    // when all the log files in that srcDir have been successfully processed
-    if (srcDir != null && !fs.delete(srcDir, true)) {
-      throw new IOException("Unable to delete src dir: " + srcDir);
-    }
   }
 
   /**
@@ -775,38 +508,6 @@ public class HLogSplitter {
   }
 
   /**
-   * Parse a single hlog and put the edits in entryBuffers
-   *
-   * @param in the hlog reader
-   * @param path the path of the log file
-   * @param entryBuffers the buffer to hold the parsed edits
-   * @param fs the file system
-   * @param conf the configuration
-   * @param skipErrors indicator if CorruptedLogFileException should be thrown instead of IOException
-   * @throws IOException
-   * @throws CorruptedLogFileException if hlog is corrupted
-   */
-  private void parseHLog(final Reader in, Path path,
-		EntryBuffers entryBuffers, final FileSystem fs,
-    final Configuration conf, boolean skipErrors)
-	throws IOException, CorruptedLogFileException {
-    int editsCount = 0;
-    try {
-      Entry entry;
-      while ((entry = getNextLogLine(in, path, skipErrors)) != null) {
-        entryBuffers.appendEntry(entry);
-        editsCount++;
-      }
-    } catch (InterruptedException ie) {
-      IOException t = new InterruptedIOException();
-      t.initCause(ie);
-      throw t;
-    } finally {
-      LOG.debug("Pushed=" + editsCount + " entries from " + path);
-    }
-  }
-
-  /**
    * Create a new {@link Reader} for reading logs to split.
    *
    * @param fs
@@ -823,7 +524,6 @@ public class HLogSplitter {
     long length = file.getLen();
     Reader in;
 
-
     // Check for possibly empty file. With appends, currently Hadoop reports a
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
     // HDFS-878 is committed.
@@ -896,7 +596,6 @@ public class HLogSplitter {
     }
   }
 
-
   private void writerThreadError(Throwable t) {
     thrown.compareAndSet(null, t);
   }
@@ -1078,7 +777,6 @@ public class HLogSplitter {
     }
   }
 
-
   class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
     private OutputSink outputSink = null;
@@ -1127,7 +825,6 @@ public class HLogSplitter {
       }
     }
 
-
     private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
       outputSink.append(buffer);
     }
@@ -1140,37 +837,6 @@ public class HLogSplitter {
     }
   }
 
-  Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
-    List<String> components = new ArrayList<String>(10);
-    do {
-      components.add(edits.getName());
-      edits = edits.getParent();
-    } while (edits.depth() > rootdir.depth());
-    Path ret = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
-    for (int i = components.size() - 1; i >= 0; i--) {
-      ret = new Path(ret, components.get(i));
-    }
-    try {
-      if (fs.exists(ret)) {
-        LOG.warn("Found existing old temporary edits file. It could be the "
-            + "result of a previous failed split attempt. Deleting "
-            + ret + ", length="
-            + fs.getFileStatus(ret).getLen());
-        if (!fs.delete(ret, false)) {
-          LOG.warn("Failed delete of old " + ret);
-        }
-      }
-      Path dir = ret.getParent();
-      if (!fs.exists(dir)) {
-        if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
-      }
-    } catch (IOException e) {
-      LOG.warn("Could not prepare temp staging area ", e);
-      // ignore, exceptions will be thrown elsewhere
-    }
-    return ret;
-  }
-
   /**
    * The following class is an abstraction class to provide a common interface to support both
    * existing recovered edits file sink and region server WAL edits replay sink
@@ -1199,6 +865,8 @@ public class HLogSplitter {
 
     protected AtomicLong skippedEdits = new AtomicLong();
 
+    protected List<Path> splits = null;
+
     public OutputSink(int numWriters) {
       numThreads = numWriters;
     }
@@ -1334,7 +1002,10 @@ public class HLogSplitter {
           throw MultipleIOException.createIOException(thrown);
         }
       }
-      return (isSuccessful) ? result : null;
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
     }
 
     /**
@@ -2003,16 +1674,17 @@ public class HLogSplitter {
 
     @Override
     List<Path> finishWritingAndClose() throws IOException {
-      List<Path> result = new ArrayList<Path>();
       try {
         if (!finishWriting()) {
           return null;
         }
         if (hasEditsInDisablingOrDisabledTables) {
-          result = logRecoveredEditsOutputSink.finishWritingAndClose();
+          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
+        } else {
+          splits = new ArrayList<Path>();
         }
         // returns an empty array in order to keep interface same as old way
-        return result;
+        return splits;
       } finally {
         List<IOException> thrown = closeRegionServerWriters();
         if (thrown != null && !thrown.isEmpty()) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java Thu Jul 18 20:42:35 2013
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java Thu Jul 18 20:42:35 2013
@@ -353,10 +353,8 @@ public class TestWALObserver {
   }
 
   private Path runWALSplit(final Configuration c) throws IOException {
-    FileSystem fs = FileSystem.get(c);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
-        this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
-    List<Path> splits = logSplitter.splitLog();
+    List<Path> splits = HLogSplitter.split(
+      hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c);
     // Split should generate only 1 file since there's only 1 region
     assertEquals(1, splits.size());
     // Make sure the file exists

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Jul 18 20:42:35 2013
@@ -84,7 +84,6 @@ import org.apache.hadoop.hbase.util.JVMC
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKTable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.log4j.Level;
@@ -130,7 +129,6 @@ public class TestDistributedLogSplitting
     conf.setInt("zookeeper.recovery.retry", 0);
     conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1);
     conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
     TEST_UTIL = new HBaseTestingUtility(conf);
     TEST_UTIL.startMiniCluster(NUM_MASTERS, num_rs);
     cluster = TEST_UTIL.getHBaseCluster();
@@ -401,7 +399,6 @@ public class TestDistributedLogSplitting
     abortMaster(cluster);
 
     // abort RS
-    int numRS = cluster.getLiveRegionServerThreads().size();
     LOG.info("Aborting region server: " + hrs.getServerName());
     hrs.abort("testing");
 
@@ -484,7 +481,6 @@ public class TestDistributedLogSplitting
     abortMaster(cluster);
 
     // abort RS
-    int numRS = cluster.getLiveRegionServerThreads().size();
     LOG.info("Aborting region server: " + hrs.getServerName());
     hrs.abort("testing");
 
@@ -498,9 +494,9 @@ public class TestDistributedLogSplitting
 
     Thread.sleep(2000);
     LOG.info("Current Open Regions:" + getAllOnlineRegions(cluster).size());
-    
+
     startMasterAndWaitUntilLogSplit(cluster);
-    
+
     // wait for all regions are fully recovered
     TEST_UTIL.waitFor(180000, 200, new Waiter.Predicate<Exception>() {
       @Override
@@ -518,8 +514,8 @@ public class TestDistributedLogSplitting
 
     ht.close();
   }
-  
-  
+
+
   @Test(timeout = 300000)
   public void testLogReplayTwoSequentialRSDown() throws Exception {
     LOG.info("testRecoveredEditsReplayTwoSequentialRSDown");
@@ -1103,7 +1099,6 @@ public class TestDistributedLogSplitting
     // turn off load balancing to prevent regions from moving around otherwise
     // they will consume recovered.edits
     master.balanceSwitch(false);
-    FileSystem fs = master.getMasterFileSystem().getFileSystem();
     final ZooKeeperWatcher zkw = new ZooKeeperWatcher(curConf, "table-creation", null);
     List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManager.java Thu Jul 18 20:42:35 2013
@@ -193,8 +193,7 @@ public class TestSplitLogManager {
   public void testTaskCreation() throws Exception {
 
     LOG.info("TestTaskCreation - test the creation of a task in zk");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -214,8 +213,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -241,8 +239,7 @@ public class TestSplitLogManager {
         CreateMode.PERSISTENT);
     int version = ZKUtil.checkExists(zkw, tasknode);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
     Task task = slm.findOrCreateOrphanTask(tasknode);
     assertTrue(task.isOrphan());
@@ -265,8 +262,7 @@ public class TestSplitLogManager {
     LOG.info("TestMultipleResbmits - no indefinite resubmissions");
 
     conf.setInt("hbase.splitlog.max.resubmit", 2);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -298,8 +294,7 @@ public class TestSplitLogManager {
   public void testRescanCleanup() throws Exception {
     LOG.info("TestRescanCleanup - ensure RESCAN nodes are cleaned up");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -328,8 +323,7 @@ public class TestSplitLogManager {
   public void testTaskDone() throws Exception {
     LOG.info("TestTaskDone - cleanup task node once in DONE state");
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
     final ServerName worker1 = new ServerName("worker1,1,1");
@@ -349,8 +343,7 @@ public class TestSplitLogManager {
     LOG.info("TestTaskErr - cleanup task node once in ERR state");
 
     conf.setInt("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -372,8 +365,7 @@ public class TestSplitLogManager {
   public void testTaskResigned() throws Exception {
     LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
     assertEquals(tot_mgr_resubmit.get(), 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     assertEquals(tot_mgr_resubmit.get(), 0);
     TaskBatch batch = new TaskBatch();
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -407,8 +399,7 @@ public class TestSplitLogManager {
     zkw.getRecoverableZooKeeper().create(tasknode1, slt.toByteArray(), Ids.OPEN_ACL_UNSAFE,
         CreateMode.PERSISTENT);
 
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     waitForCounter(tot_mgr_orphan_task_acquired, 0, 1, to/2);
 
     // submit another task which will stay in unassigned mode
@@ -437,8 +428,7 @@ public class TestSplitLogManager {
     LOG.info("testDeadWorker");
 
     conf.setLong("hbase.splitlog.max.resubmit", 0);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -463,8 +453,7 @@ public class TestSplitLogManager {
 
   @Test
   public void testWorkerCrash() throws Exception {
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     TaskBatch batch = new TaskBatch();
 
     String tasknode = submitTaskAndWait(batch, "foo/1");
@@ -489,8 +478,7 @@ public class TestSplitLogManager {
   @Test
   public void testEmptyLogDir() throws Exception {
     LOG.info("testEmptyLogDir");
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     Path emptyLogDirPath = new Path(fs.getWorkingDirectory(),
         UUID.randomUUID().toString());
@@ -505,8 +493,7 @@ public class TestSplitLogManager {
 
     conf.setInt("hbase.splitlog.manager.unassigned.timeout", 0);
     conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
-    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER, null);
-    slm.finishInitialization();
+    slm = new SplitLogManager(zkw, conf, stopper, master, DUMMY_MASTER);
     FileSystem fs = TEST_UTIL.getTestFileSystem();
     final Path logDir = new Path(fs.getWorkingDirectory(),
         UUID.randomUUID().toString());
@@ -544,5 +531,4 @@ public class TestSplitLogManager {
       fs.delete(logDir, true);
     }
   }
-
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Thu Jul 18 20:42:35 2013
@@ -59,6 +59,7 @@ import org.junit.experimental.categories
 
 /** JUnit test case for HLog */
 @Category(LargeTests.class)
+@SuppressWarnings("deprecation")
 public class TestHLog  {
   private static final Log LOG = LogFactory.getLog(TestHLog.class);
   {
@@ -193,10 +194,8 @@ public class TestHLog  {
         log.rollWriter();
       }
       log.close();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          hbaseDir, logdir, oldLogDir, fs);
-      List<Path> splits =
-        logSplitter.splitLog();
+      List<Path> splits = HLogSplitter.split(
+        hbaseDir, logdir, oldLogDir, fs, conf);
       verifySplits(splits, howmany);
       log = null;
     } finally {
@@ -340,7 +339,7 @@ public class TestHLog  {
 
   private void verifySplits(List<Path> splits, final int howmany)
   throws IOException {
-    assertEquals(howmany, splits.size());
+    assertEquals(howmany * howmany, splits.size());
     for (int i = 0; i < splits.size(); i++) {
       LOG.info("Verifying=" + splits.get(i));
       HLog.Reader reader = HLogFactory.createReader(fs, splits.get(i), conf);
@@ -362,7 +361,7 @@ public class TestHLog  {
           previousRegion = region;
           count++;
         }
-        assertEquals(howmany * howmany, count);
+        assertEquals(howmany, count);
       } finally {
         reader.close();
       }
@@ -479,7 +478,7 @@ public class TestHLog  {
       throw t.exception;
 
     // Make sure you can read all the content
-    HLog.Reader reader = HLogFactory.createReader(this.fs, walPath, this.conf);
+    HLog.Reader reader = HLogFactory.createReader(fs, walPath, conf);
     int count = 0;
     HLog.Entry entry = new HLog.Entry();
     while (reader.next(entry) != null) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogMethods.java Thu Jul 18 20:42:35 2013
@@ -71,8 +71,7 @@ public class TestHLogMethods {
     createFile(fs, recoverededits,
       Long.toString(Long.MAX_VALUE) + "." + System.currentTimeMillis());
 
-    HLog log = HLogFactory.createHLog(fs, regiondir,
-                                      "dummyLogName", util.getConfiguration());
+    HLogFactory.createHLog(fs, regiondir, "dummyLogName", util.getConfiguration());
     NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
     assertEquals(7, files.size());
     assertEquals(files.pollFirst().getName(), first);
@@ -111,9 +110,8 @@ public class TestHLogMethods {
   @Test
   public void testEntrySink() throws Exception {
     Configuration conf = new Configuration();
-    HLogSplitter splitter = HLogSplitter.createLogSplitter(
-        conf, mock(Path.class), mock(Path.class), mock(Path.class),
-        mock(FileSystem.class));
+    HLogSplitter splitter = new HLogSplitter(
+      conf, mock(Path.class), mock(FileSystem.class), null, null);
 
     EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
     for (int i = 0; i < 1000; i++) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1504647&r1=1504646&r2=1504647&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Thu Jul 18 20:42:35 2013
@@ -42,11 +42,6 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.hbase.exceptions.OrphanHLogAfterSplitException;
-import org.apache.log4j.Level;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -68,13 +63,17 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter.CorruptedLogFileException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -107,7 +106,6 @@ public class TestHLogSplit {
 
   protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-
   private static final Path HBASEDIR = new Path("/hbase");
   private static final Path HLOGDIR = new Path(HBASEDIR, "hlog");
   private static final Path OLDLOGDIR = new Path(HBASEDIR, "hlog.old");
@@ -209,10 +207,15 @@ public class TestHLogSplit {
         @Override
         public Integer run() throws Exception {
           FileSystem fs = FileSystem.get(conf2);
-          HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf2, HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-          logSplitter.splitLog();
-          Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-          return countHLog(logfile, fs, conf2);
+          int expectedFiles = fs.listStatus(HLOGDIR).length;
+          HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf2);
+          Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+          assertEquals(expectedFiles, logfiles.length);
+          int count = 0;
+          for (Path logfile: logfiles) {
+            count += countHLog(logfile, fs, conf2);
+          }
+          return count;
         }
       });
       LOG.info("zombie=" + counter.get() + ", robber=" + count);
@@ -374,27 +377,6 @@ public class TestHLogSplit {
     HLogFactory.createWriter(fs, p, conf).close();
   }
 
-  @Test(expected = OrphanHLogAfterSplitException.class)
-  public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
-  throws IOException {
-    AtomicBoolean stop = new AtomicBoolean(false);
-
-    assertFalse("Previous test should clean up table dir",
-      fs.exists(new Path("/hbase/t1")));
-
-    generateHLogs(-1);
-
-    CountDownLatch latch = new CountDownLatch(1);
-    try {
-      (new ZombieNewLogWriterRegionServer(latch, stop)).start();
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog(latch);
-    } finally {
-      stop.set(true);
-    }
-  }
-
   @Test
   public void testSplitPreservesEdits() throws IOException{
     final String REGION = "region__1";
@@ -403,14 +385,12 @@ public class TestHLogSplit {
 
     generateHLogs(1, 10, -1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-      HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
 
-    assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog));
+    assertEquals("edits differ after split", true, logsAreEqual(originalLog, splitLog[0]));
   }
 
 
@@ -425,16 +405,17 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
-
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 2; // less 2 empty files
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
-
   }
 
 
@@ -448,13 +429,16 @@ public class TestHLogSplit {
     // initialize will create a new DFSClient with a new client ID
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 2 ; // less 2 empty files
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
 
@@ -465,16 +449,17 @@ public class TestHLogSplit {
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
-
-
   }
 
 
@@ -486,15 +471,17 @@ public class TestHLogSplit {
             Corruptions.APPEND_GARBAGE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
-
-
   }
 
   @Test
@@ -505,18 +492,19 @@ public class TestHLogSplit {
             Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    int expectedFiles = fs.listStatus(HLOGDIR).length - 1; // less 1 corrupted file
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals((NUM_WRITERS - 1) * ENTRIES, countHLog(logfile, fs, conf));
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals((NUM_WRITERS - 1) * ENTRIES, count);
     }
-
-
   }
 
-
   @Test
   public void testMiddleGarbageCorruptionSkipErrorsReadsHalfOfFile() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, true);
@@ -524,19 +512,23 @@ public class TestHLogSplit {
     corruptHLog(new Path(HLOGDIR, HLOG_FILE_PREFIX + "5"),
             Corruptions.INSERT_GARBAGE_IN_THE_MIDDLE, false, fs);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
 
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
       // the entries in the original logs are alternating regions
       // considering the sequence file header, the middle corruption should
       // affect at least half of the entries
       int goodEntries = (NUM_WRITERS - 1) * ENTRIES;
       int firstHalfEntries = (int) Math.ceil(ENTRIES / 2) - 1;
       assertTrue("The file up to the corrupted area hasn't been parsed",
-              goodEntries + firstHalfEntries <= countHLog(logfile, fs, conf));
+              goodEntries + firstHalfEntries <= count);
     }
   }
 
@@ -556,9 +548,7 @@ public class TestHLogSplit {
         conf.set("faultysequencefilelogreader.failuretype", failureType.name());
         generateHLogs(1, ENTRIES, -1);
         fs.initialize(fs.getUri(), conf);
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-            HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        logSplitter.splitLog();
+        HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
         FileStatus[] archivedLogs = fs.listStatus(CORRUPTDIR);
         assertEquals("expected a different file", c1.getName(), archivedLogs[0]
             .getPath().getName());
@@ -586,16 +576,13 @@ public class TestHLogSplit {
           FaultySequenceFileLogReader.class, HLog.Reader.class);
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(Integer.MAX_VALUE);
-    fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+      fs.initialize(fs.getUri(), conf);
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     } finally {
       conf.setClass("hbase.regionserver.hlog.reader.impl", backupClass,
           Reader.class);
       HLogFactory.resetLogReaderClass();
     }
-
   }
 
   @Test
@@ -613,10 +600,8 @@ public class TestHLogSplit {
       conf.set("faultysequencefilelogreader.failuretype", FaultySequenceFileLogReader.FailureType.BEGINNING.name());
       generateHLogs(-1);
       fs.initialize(fs.getUri(), conf);
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
       try {
-        logSplitter.splitLog();
+        HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
       } catch (IOException e) {
         assertEquals(
             "if skip.errors is false all files should remain in place",
@@ -627,7 +612,6 @@ public class TestHLogSplit {
           Reader.class);
       HLogFactory.resetLogReaderClass();
     }
-
   }
 
   @Test
@@ -644,14 +628,13 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
 
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
 
     int actualCount = 0;
-    HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
+    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
     @SuppressWarnings("unused")
     HLog.Entry entry;
     while ((entry = in.next()) != null) ++actualCount;
@@ -676,14 +659,13 @@ public class TestHLogSplit {
     corruptHLog(c1, Corruptions.TRUNCATE_TRAILER, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
 
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
 
     int actualCount = 0;
-    HLog.Reader in = HLogFactory.createReader(fs, splitLog, conf);
+    HLog.Reader in = HLogFactory.createReader(fs, splitLog[0], conf);
     @SuppressWarnings("unused")
     HLog.Entry entry;
     while ((entry = in.next()) != null) ++actualCount;
@@ -697,16 +679,10 @@ public class TestHLogSplit {
   @Test
   public void testLogsGetArchivedAfterSplit() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
-
     generateHLogs(-1);
-
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     FileStatus[] archivedLogs = fs.listStatus(OLDLOGDIR);
-
     assertEquals("wrong number of files in the archive log", NUM_WRITERS, archivedLogs.length);
   }
 
@@ -714,14 +690,17 @@ public class TestHLogSplit {
   public void testSplit() throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
 
+    int expectedFiles = fs.listStatus(HLOGDIR).length;
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path logfile = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(NUM_WRITERS * ENTRIES, countHLog(logfile, fs, conf));
-
+      Path[] logfiles = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(expectedFiles, logfiles.length);
+      int count = 0;
+      for (Path logfile: logfiles) {
+        count += countHLog(logfile, fs, conf);
+      }
+      assertEquals(NUM_WRITERS * ENTRIES, count);
     }
   }
 
@@ -730,9 +709,7 @@ public class TestHLogSplit {
   throws IOException {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     FileStatus [] statuses = null;
     try {
       statuses = fs.listStatus(HLOGDIR);
@@ -745,41 +722,6 @@ public class TestHLogSplit {
     }
   }
 
-
-  @Test
-  public void testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted()
-  throws IOException {
-    AtomicBoolean stop = new AtomicBoolean(false);
-    generateHLogs(-1);
-    fs.initialize(fs.getUri(), conf);
-    CountDownLatch latch = new CountDownLatch(1);
-    Thread zombie = new ZombieNewLogWriterRegionServer(latch, stop);
-
-    List<Path> splits = null;
-    try {
-      zombie.start();
-      try {
-        HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-            HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-        splits = logSplitter.splitLog(latch);
-      } catch (IOException ex) {
-        /* expected */
-        LOG.warn("testSplitWillNotTouchLogsIfNewHLogGetsCreatedAfterSplitStarted", ex);
-      }
-      FileStatus[] files = fs.listStatus(HLOGDIR);
-      if (files == null) fail("no files in " + HLOGDIR + " with splits " + splits);
-      int logFilesNumber = files.length;
-
-      assertEquals("Log files should not be archived if there's an extra file after split",
-              NUM_WRITERS + 1, logFilesNumber);
-    } finally {
-      stop.set(true);
-    }
-
-  }
-
-
-
   @Test(expected = IOException.class)
   public void testSplitWillFailIfWritingToRegionFails() throws Exception {
     //leave 5th log open so we could append the "trap"
@@ -798,10 +740,7 @@ public class TestHLogSplit {
 
     try {
       InstrumentedSequenceFileLogWriter.activateFailure = true;
-      HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-          HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-      logSplitter.splitLog();
-
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     } catch (IOException e) {
       assertEquals("This exception is instrumented and should only be thrown for testing", e.getMessage());
       throw e;
@@ -825,21 +764,14 @@ public class TestHLogSplit {
     generateHLogs(1, 100, -1);
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     fs.rename(OLDLOGDIR, HLOGDIR);
     Path firstSplitPath = new Path(HBASEDIR, Bytes.toString(TABLE_NAME) + ".first");
     Path splitPath = new Path(HBASEDIR, Bytes.toString(TABLE_NAME));
-    fs.rename(splitPath,
-            firstSplitPath);
-
+    fs.rename(splitPath, firstSplitPath);
 
     fs.initialize(fs.getUri(), conf);
-    logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     assertEquals(0, compareHLogSplitDirs(firstSplitPath, splitPath));
   }
 
@@ -850,16 +782,11 @@ public class TestHLogSplit {
     REGIONS.add(region);
 
     generateHLogs(1);
-
     fs.initialize(fs.getUri(), conf);
 
     Path regiondir = new Path(TABLEDIR, region);
     fs.delete(regiondir, true);
-
-    HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, HLOGDIR, OLDLOGDIR, fs);
-    logSplitter.splitLog();
-
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     assertFalse(fs.exists(regiondir));
   }
 
@@ -868,20 +795,23 @@ public class TestHLogSplit {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
 
     generateHLogs(-1);
-
     fs.initialize(fs.getUri(), conf);
+    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
+    assertTrue("There should be some log file",
+      logfiles != null && logfiles.length > 0);
     // Set up a splitter that will throw an IOE on the output side
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
-      protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
-      throws IOException {
+        conf, HBASEDIR, fs, null, null) {
+      protected HLog.Writer createWriter(FileSystem fs,
+          Path logfile, Configuration conf) throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
-        Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
+        Mockito.doThrow(new IOException("Injected")).when(
+          mockWriter).append(Mockito.<HLog.Entry>any());
         return mockWriter;
       }
     };
     try {
-      logSplitter.splitLog();
+      logSplitter.splitLogFile(logfiles[0], null);
       fail("Didn't throw!");
     } catch (IOException ioe) {
       assertTrue(ioe.toString().contains("Injected"));
@@ -903,11 +833,8 @@ public class TestHLogSplit {
     Mockito.doThrow(new LeaseExpiredException("Injected: File does not exist")).
         when(spiedFs).append(Mockito.<Path>any());
 
-    HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
-
     try {
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
       assertFalse(fs.exists(HLOGDIR));
     } catch (IOException e) {
@@ -945,11 +872,8 @@ public class TestHLogSplit {
         }
     }).when(spiedFs).open(Mockito.<Path>any(), Mockito.anyInt());
 
-    HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, null);
-
     try {
-      logSplitter.splitLog();
+      HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, spiedFs, conf);
       assertEquals(NUM_WRITERS, fs.listStatus(OLDLOGDIR).length);
       assertFalse(fs.exists(HLOGDIR));
     } catch (IOException e) {
@@ -984,8 +908,8 @@ public class TestHLogSplit {
 
     try {
       conf.setInt("hbase.splitlog.report.period", 1000);
-      HLogSplitter s = new HLogSplitter(conf, HBASEDIR, null, null, spiedFs, null);
-      boolean ret = s.splitLogFile(logfile, localReporter);
+      boolean ret = HLogSplitter.splitLogFile(
+        HBASEDIR, logfile, spiedFs, conf, localReporter, null, null);
       assertFalse("Log splitting should failed", ret);
       assertTrue(count.get() > 0);
     } catch (IOException e) {
@@ -1034,7 +958,8 @@ public class TestHLogSplit {
     localConf.setInt("hbase.regionserver.hlog.splitlog.buffersize", bufferSize);
 
     // Create a fake log file (we'll override the reader to produce a stream of edits)
-    FSDataOutputStream out = fs.create(new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake"));
+    Path logPath = new Path(HLOGDIR, HLOG_FILE_PREFIX + ".fake");
+    FSDataOutputStream out = fs.create(logPath);
     out.close();
 
     // Make region dirs for our destination regions so the output doesn't get skipped
@@ -1043,7 +968,7 @@ public class TestHLogSplit {
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
-        localConf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
+        localConf, HBASEDIR, fs, null, null) {
 
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@@ -1076,7 +1001,6 @@ public class TestHLogSplit {
         return mockWriter;
       }
 
-
       /* Produce a mock reader that generates fake entries */
       protected Reader getReader(FileSystem fs, Path curLogFile,
           Configuration conf, CancelableProgressable reporter) throws IOException {
@@ -1103,15 +1027,13 @@ public class TestHLogSplit {
       }
     };
 
-    logSplitter.splitLog();
+    logSplitter.splitLogFile(fs.getFileStatus(logPath), null);
 
     // Verify number of written edits per region
-
-    Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
+    Map<byte[], Long> outputCounts = logSplitter.outputSink.getOutputCounts();
     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
       LOG.info("Got " + entry.getValue() + " output edits for region " +
           Bytes.toString(entry.getKey()));
-
       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
     }
     assertEquals(regions.size(), outputCounts.size());
@@ -1160,9 +1082,7 @@ public class TestHLogSplit {
       LOG.debug("Renamed region directory: " + rsSplitDir);
 
       // Process the old log files
-      HLogSplitter splitter = HLogSplitter.createLogSplitter(conf,
-        HBASEDIR, rsSplitDir, OLDLOGDIR, fs);
-      splitter.splitLog();
+      HLogSplitter.split(HBASEDIR, rsSplitDir, OLDLOGDIR, fs, conf);
 
       // Now, try to roll the HLog and verify failure
       try {
@@ -1232,17 +1152,6 @@ public class TestHLogSplit {
     }
   }
 
-  private CancelableProgressable reporter = new CancelableProgressable() {
-    int count = 0;
-
-    @Override
-    public boolean progress() {
-      count++;
-      LOG.debug("progress = " + count);
-      return true;
-    }
-  };
-
   @Test
   public void testSplitLogFileWithOneRegion() throws IOException {
     LOG.info("testSplitLogFileWithOneRegion");
@@ -1250,60 +1159,45 @@ public class TestHLogSplit {
     REGIONS.removeAll(REGIONS);
     REGIONS.add(REGION);
 
-
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
-
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
 
     Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
-    Path splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
-
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, REGION);
+    assertEquals(1, splitLog.length);
 
-    assertEquals(true, logsAreEqual(originalLog, splitLog));
+    assertEquals(true, logsAreEqual(originalLog, splitLog[0]));
   }
 
   @Test
-  public void testSplitLogFileDeletedRegionDir()
-  throws IOException {
-	LOG.info("testSplitLogFileDeletedRegionDir");
-	final String REGION = "region__1";
+  public void testSplitLogFileDeletedRegionDir() throws IOException {
+    LOG.info("testSplitLogFileDeletedRegionDir");
+    final String REGION = "region__1";
     REGIONS.removeAll(REGIONS);
     REGIONS.add(REGION);
 
-
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
 
     Path regiondir = new Path(TABLEDIR, REGION);
     LOG.info("Region directory is" + regiondir);
     fs.delete(regiondir, true);
 
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
 
     assertTrue(!fs.exists(regiondir));
     assertTrue(true);
   }
 
-
-
   @Test
   public void testSplitLogFileEmpty() throws IOException {
     LOG.info("testSplitLogFileEmpty");
     injectEmptyFile(".empty", true);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
 
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     Path tdir = HTableDescriptor.getTableDir(HBASEDIR, TABLE_NAME);
     assertFalse(fs.exists(tdir));
 
@@ -1314,15 +1208,13 @@ public class TestHLogSplit {
   public void testSplitLogFileMultipleRegions() throws IOException {
     LOG.info("testSplitLogFileMultipleRegions");
     generateHLogs(1, 10, -1);
-    FileStatus logfile = fs.listStatus(HLOGDIR)[0];
     fs.initialize(fs.getUri(), conf);
 
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
     for (String region : REGIONS) {
-      Path recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
-      assertEquals(10, countHLog(recovered, fs, conf));
+      Path[] recovered = getLogForRegion(HBASEDIR, TABLE_NAME, region);
+      assertEquals(1, recovered.length);
+      assertEquals(10, countHLog(recovered[0], fs, conf));
     }
   }
 
@@ -1337,9 +1229,7 @@ public class TestHLogSplit {
         Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
 
     fs.initialize(fs.getUri(), conf);
-    HLogSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, reporter);
-    HLogSplitter.finishSplitLogFile(HBASEDIR, OLDLOGDIR, logfile.getPath()
-        .toString(), conf);
+    HLogSplitter.split(HBASEDIR, HLOGDIR, OLDLOGDIR, fs, conf);
 
     final Path corruptDir = new Path(FSUtils.getRootDir(conf), conf.get(
         "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
@@ -1361,9 +1251,12 @@ public class TestHLogSplit {
     generateHLogs(-1);
 
     HLogFactory.createHLog(fs, regiondir, regionName, conf);
+    FileStatus[] logfiles = fs.listStatus(HLOGDIR);
+    assertTrue("There should be some log file",
+      logfiles != null && logfiles.length > 0);
 
     HLogSplitter logSplitter = new HLogSplitter(
-        conf, HBASEDIR, HLOGDIR, OLDLOGDIR, fs, null) {
+        conf, HBASEDIR, fs, null, null) {
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
       throws IOException {
         HLog.Writer writer = HLogFactory.createWriter(fs, logfile, conf);
@@ -1384,7 +1277,7 @@ public class TestHLogSplit {
       }
     };
     try{
-      logSplitter.splitLog();
+      logSplitter.splitLogFile(logfiles[0], null);
     } catch (IOException e) {
       LOG.info(e);
       Assert.fail("Throws IOException when spliting "
@@ -1443,15 +1336,18 @@ public class TestHLogSplit {
     return ws;
   }
 
-  private Path getLogForRegion(Path rootdir, byte[] table, String region)
+  private Path[] getLogForRegion(Path rootdir, byte[] table, String region)
   throws IOException {
     Path tdir = HTableDescriptor.getTableDir(rootdir, table);
     @SuppressWarnings("deprecation")
     Path editsdir = HLogUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
       Bytes.toString(region.getBytes())));
     FileStatus [] files = this.fs.listStatus(editsdir);
-    assertEquals(1, files.length);
-    return files[0].getPath();
+    Path[] paths = new Path[files.length];
+    for (int i = 0; i < files.length; i++) {
+      paths[i] = files[i].getPath();
+    }
+    return paths;
   }
 
   private void corruptHLog(Path path, Corruptions corruption, boolean close,
@@ -1635,6 +1531,4 @@ public class TestHLogSplit {
     }
     return true;
   }
-
 }
-



Mime
View raw message