hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject svn commit: r1492399 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/master/snapshot/ main/java/org/apache/hadoop/hbase/procedure/ main/java/org/apache/hadoop/hbase/regionserver/snapshot/ test/java/org/apache/hadoop/hbase/...
Date Wed, 12 Jun 2013 19:50:09 GMT
Author: mbertozzi
Date: Wed Jun 12 19:50:09 2013
New Revision: 1492399

URL: http://svn.apache.org/r1492399
Log:
HBASE-8706 Some improvement in snapshot (binlijin)

Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java Wed Jun 12 19:50:09 2013
@@ -114,7 +114,7 @@ public class SnapshotManager implements 
   private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
 
   /** By default, check to see if the snapshot is complete (ms) */
-  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 5000;
+  private static final int SNAPSHOT_TIMEOUT_MILLIS_DEFAULT = 60000;
 
   /**
    * Conf key for # of ms elapsed before injecting a snapshot timeout error when waiting for
@@ -132,7 +132,6 @@ public class SnapshotManager implements 
   private static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
 
   private boolean stopped;
-  private final long wakeFrequency;
   private final MasterServices master;  // Needed by TableEventHandlers
   private final MetricsMaster metricsMaster;
   private final ProcedureCoordinator coordinator;
@@ -169,16 +168,17 @@ public class SnapshotManager implements 
 
     // get the configuration for the coordinator
     Configuration conf = master.getConfiguration();
-    this.wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
-    long keepAliveTime = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
+    long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
+    long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
     int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
 
     // setup the default procedure coordinator
     String name = master.getServerName().toString();
-    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, keepAliveTime, opThreads, wakeFrequency);
+    ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
     ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinatorRpcs(
         master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
-    this.coordinator = new ProcedureCoordinator(comms, tpool);
+
+    this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
     this.executorService = master.getExecutorService();
     resetTempDir();
   }
@@ -198,8 +198,6 @@ public class SnapshotManager implements 
     this.rootDir = master.getMasterFileSystem().getRootDir();
     checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
 
-    this.wakeFrequency = master.getConfiguration().getInt(SNAPSHOT_WAKE_MILLIS_KEY,
-      SNAPSHOT_WAKE_MILLIS_DEFAULT);
     this.coordinator = coordinator;
     this.executorService = pool;
     resetTempDir();
@@ -871,6 +869,11 @@ public class SnapshotManager implements 
     for (SnapshotSentinel restoreHandler: this.restoreHandlers.values()) {
       restoreHandler.cancel(why);
     }
+    try {
+      coordinator.close();
+    } catch (IOException e) {
+      LOG.error("stop ProcedureCoordinator error", e);
+    }
   }
 
   @Override

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/TakeSnapshotHandler.java Wed Jun 12 19:50:09 2013
@@ -102,7 +102,7 @@ public abstract class TakeSnapshotHandle
     this.rootDir = this.master.getMasterFileSystem().getRootDir();
     this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
     this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir);
-    this.monitor =  new ForeignExceptionDispatcher();
+    this.monitor = new ForeignExceptionDispatcher(snapshot.getName());
 
     this.tableLockManager = master.getTableLockManager();
     this.tableLock = this.tableLockManager.writeLock(Bytes.toBytes(snapshot.getTable())
@@ -168,6 +168,7 @@ public abstract class TakeSnapshotHandle
 
       // run the snapshot
       snapshotRegions(regionsAndLocations);
+      monitor.rethrowException();
 
       // extract each pair to separate lists
       Set<String> serverNames = new HashSet<String>();

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java Wed Jun 12 19:50:09 2013
@@ -51,11 +51,14 @@ import com.google.common.collect.MapMake
 public class ProcedureCoordinator {
   private static final Log LOG = LogFactory.getLog(ProcedureCoordinator.class);
 
+  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
   final static long TIMEOUT_MILLIS_DEFAULT = 60000;
   final static long WAKE_MILLIS_DEFAULT = 500;
 
   private final ProcedureCoordinatorRpcs rpcs;
   private final ExecutorService pool;
+  private final long wakeTimeMillis;
+  private final long timeoutMillis;
 
   // Running procedure table.  Maps procedure name to running procedure reference
   private final ConcurrentMap<String, Procedure> procedures =
@@ -71,6 +74,23 @@ public class ProcedureCoordinator {
    * @param pool Used for executing procedures.
    */
   public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool) {
+    this(rpcs, pool, TIMEOUT_MILLIS_DEFAULT, WAKE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Create and start a ProcedureCoordinator.
+   *
+   * The rpc object registers the ProcedureCoordinator and starts any threads in
+   * this constructor.
+   *
+   * @param rpcs
+   * @param pool Used for executing procedures.
+   * @param timeoutMillis
+   */
+  public ProcedureCoordinator(ProcedureCoordinatorRpcs rpcs, ThreadPoolExecutor pool,
+      long timeoutMillis, long wakeTimeMillis) {
+    this.timeoutMillis = timeoutMillis;
+    this.wakeTimeMillis = wakeTimeMillis;
     this.rpcs = rpcs;
     this.pool = pool;
     this.rpcs.start(this);
@@ -78,10 +98,24 @@ public class ProcedureCoordinator {
 
   /**
    * Default thread pool for the procedure
+   *
+   * @param coordName
+   * @param opThreads the maximum number of threads to allow in the pool
    */
-  public static ThreadPoolExecutor defaultPool(String coordName, long keepAliveTime, int opThreads,
-      long wakeFrequency) {
-    return new ThreadPoolExecutor(1, opThreads, keepAliveTime, TimeUnit.SECONDS,
+  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads) {
+    return defaultPool(coordName, opThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param coordName
+   * @param opThreads the maximum number of threads to allow in the pool
+   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
+   */
+  public static ThreadPoolExecutor defaultPool(String coordName, int opThreads,
+      long keepAliveMillis) {
+    return new ThreadPoolExecutor(1, opThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
         new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory("(" + coordName + ")-proc-coordinator-pool"));
   }
@@ -194,7 +228,7 @@ public class ProcedureCoordinator {
   Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
       List<String> expectedMembers) {
     // build the procedure
-    return new Procedure(this, fed, WAKE_MILLIS_DEFAULT, TIMEOUT_MILLIS_DEFAULT,
+    return new Procedure(this, fed, wakeTimeMillis, timeoutMillis,
         procName, procArgs, expectedMembers);
   }
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java Wed Jun 12 19:50:09 2013
@@ -51,6 +51,8 @@ import com.google.common.collect.MapMake
 public class ProcedureMember implements Closeable {
   private static final Log LOG = LogFactory.getLog(ProcedureMember.class);
 
+  final static long KEEP_ALIVE_MILLIS_DEFAULT = 5000;
+
   private final SubprocedureFactory builder;
   private final ProcedureMemberRpcs rpcs;
 
@@ -72,9 +74,26 @@ public class ProcedureMember implements 
     this.builder = factory;
   }
 
-  public static ThreadPoolExecutor defaultPool(long wakeFrequency, long keepAlive,
-      int procThreads, String memberName) {
-    return new ThreadPoolExecutor(1, procThreads, keepAlive, TimeUnit.SECONDS,
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param memberName
+   * @param procThreads the maximum number of threads to allow in the pool
+   */
+  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads) {
+    return defaultPool(memberName, procThreads, KEEP_ALIVE_MILLIS_DEFAULT);
+  }
+
+  /**
+   * Default thread pool for the procedure
+   *
+   * @param memberName
+   * @param procThreads the maximum number of threads to allow in the pool
+   * @param keepAliveMillis the maximum time (ms) that excess idle threads will wait for new tasks
+   */
+  public static ThreadPoolExecutor defaultPool(String memberName, int procThreads,
+      long keepAliveMillis) {
+    return new ThreadPoolExecutor(1, procThreads, keepAliveMillis, TimeUnit.MILLISECONDS,
         new SynchronousQueue<Runnable>(),
         new DaemonThreadFactory("member: '" + memberName + "' subprocedure-pool"));
   }
@@ -85,7 +104,7 @@ public class ProcedureMember implements 
    * @return reference to the Procedure member's rpcs object
    */
   ProcedureMemberRpcs getRpcs() {
-     return rpcs;
+    return rpcs;
   }
 
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java Wed Jun 12 19:50:09 2013
@@ -24,9 +24,9 @@ import java.util.concurrent.CountDownLat
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
-import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
 
 /**
@@ -198,7 +198,6 @@ abstract public class Subprocedure imple
       } else {
         msg = "Subprocedure '" + barrierName + "' failed!";
       }
-      LOG.error(msg , e);
       cancel(msg, e);
 
       LOG.debug("Subprocedure '" + barrierName + "' running cleanup.");

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java Wed Jun 12 19:50:09 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.DaemonThreadFactory;
 import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
 import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
@@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.exceptions.SnapshotCreationException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -125,12 +125,11 @@ public class RegionServerSnapshotManager
 
     // read in the snapshot request configuration properties
     Configuration conf = rss.getConfiguration();
-    long wakeMillis = conf.getLong(SNAPSHOT_REQUEST_WAKE_MILLIS_KEY, SNAPSHOT_REQUEST_WAKE_MILLIS_DEFAULT);
     long keepAlive = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY, SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
     int opThreads = conf.getInt(SNAPSHOT_REQUEST_THREADS_KEY, SNAPSHOT_REQUEST_THREADS_DEFAULT);
 
     // create the actual snapshot procedure member
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(wakeMillis, keepAlive, opThreads, nodeName);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(nodeName, opThreads, keepAlive);
     this.member = new ProcedureMember(memberRpcs, pool, new SnapshotSubprocedureBuilder());
   }
 
@@ -191,7 +190,7 @@ public class RegionServerSnapshotManager
 
     LOG.debug("Launching subprocedure for snapshot " + snapshot.getName() + " from table "
         + snapshot.getTable());
-    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher();
+    ForeignExceptionDispatcher exnDispatcher = new ForeignExceptionDispatcher(snapshot.getName());
     Configuration conf = rss.getConfiguration();
     long timeoutMillis = conf.getLong(SNAPSHOT_TIMEOUT_MILLIS_KEY,
         SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
@@ -355,7 +354,6 @@ public class RegionServerSnapshotManager
       }
 
       // evict remaining tasks and futures from taskPool.
-      LOG.debug(taskPool);
       while (!futures.isEmpty()) {
         // block to remove cancelled futures;
         LOG.warn("Removing cancelled elements from taskPool");

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureCoordinator.java Wed Jun 12 19:50:09 2013
@@ -86,7 +86,7 @@ public class TestProcedureCoordinator {
   }
 
   private ProcedureCoordinator buildNewCoordinator() {
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, POOL_KEEP_ALIVE, 1, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(nodeName, 1, POOL_KEEP_ALIVE);
     return spy(new ProcedureCoordinator(controller, pool));
   }
 

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestProcedureMember.java Wed Jun 12 19:50:09 2013
@@ -88,7 +88,7 @@ public class TestProcedureMember {
    */
   private ProcedureMember buildCohortMember() {
     String name = "node";
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
     return new ProcedureMember(mockMemberComms, pool, mockBuilder);
   }
 
@@ -98,7 +98,7 @@ public class TestProcedureMember {
   private void buildCohortMemberPair() throws IOException {
     dispatcher = new ForeignExceptionDispatcher();
     String name = "node";
-    ThreadPoolExecutor pool = ProcedureMember.defaultPool(WAKE_FREQUENCY, POOL_KEEP_ALIVE, 1, name);
+    ThreadPoolExecutor pool = ProcedureMember.defaultPool(name, 1, POOL_KEEP_ALIVE);
     member = new ProcedureMember(mockMemberComms, pool, mockBuilder);
     when(mockMemberComms.getMemberName()).thenReturn("membername"); // needed for generating exception
     Subprocedure subproc = new EmptySubprocedure(member, dispatcher);

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java?rev=1492399&r1=1492398&r2=1492399&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestZKProcedure.java Wed Jun 12 19:50:09 2013
@@ -128,7 +128,7 @@ public class TestZKProcedure {
     // start running the controller
     ZKProcedureCoordinatorRpcs coordinatorComms = new ZKProcedureCoordinatorRpcs(
         coordZkw, opDescription, COORDINATOR_NODE_NAME);
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
     ProcedureCoordinator coordinator = new ProcedureCoordinator(coordinatorComms, pool) {
       @Override
       public Procedure createProcedure(ForeignExceptionDispatcher fed, String procName, byte[] procArgs,
@@ -146,7 +146,7 @@ public class TestZKProcedure {
     for (String member : members) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
       ZKProcedureMemberRpcs comms = new ZKProcedureMemberRpcs(watcher, opDescription, member);
-      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember procMember = new ProcedureMember(comms, pool2, subprocFactory);
       procMembers.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(procMember, comms));
       comms.start(procMember);
@@ -210,7 +210,7 @@ public class TestZKProcedure {
     ZooKeeperWatcher coordinatorWatcher = newZooKeeperWatcher();
     ZKProcedureCoordinatorRpcs coordinatorController = new ZKProcedureCoordinatorRpcs(
         coordinatorWatcher, opDescription, COORDINATOR_NODE_NAME);
-    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, KEEP_ALIVE, POOL_SIZE, WAKE_FREQUENCY);
+    ThreadPoolExecutor pool = ProcedureCoordinator.defaultPool(COORDINATOR_NODE_NAME, POOL_SIZE, KEEP_ALIVE);
     ProcedureCoordinator coordinator = spy(new ProcedureCoordinator(coordinatorController, pool));
 
     // start a member for each node
@@ -220,7 +220,7 @@ public class TestZKProcedure {
     for (String member : expected) {
       ZooKeeperWatcher watcher = newZooKeeperWatcher();
       ZKProcedureMemberRpcs controller = new ZKProcedureMemberRpcs(watcher, opDescription, member);
-      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(WAKE_FREQUENCY, KEEP_ALIVE, 1, member);
+      ThreadPoolExecutor pool2 = ProcedureMember.defaultPool(member, 1, KEEP_ALIVE);
       ProcedureMember mem = new ProcedureMember(controller, pool2, subprocFactory);
       members.add(new Pair<ProcedureMember, ZKProcedureMemberRpcs>(mem, controller));
       controller.start(mem);



Mime
View raw message