Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B3C14200B43 for ; Tue, 19 Jul 2016 22:06:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B263C160A76; Tue, 19 Jul 2016 20:06:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 168C4160A5C for ; Tue, 19 Jul 2016 22:06:07 +0200 (CEST) Received: (qmail 81727 invoked by uid 500); 19 Jul 2016 20:06:07 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 81708 invoked by uid 99); 19 Jul 2016 20:06:07 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Jul 2016 20:06:07 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 17A79E03A6; Tue, 19 Jul 2016 20:06:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Tue, 19 Jul 2016 20:06:07 -0000 Message-Id: <03c8413d7a1142ceac71ad389890de07@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] accumulo git commit: ACCUMULO-4353: Stabilize tablet assignment during transient failure archived-at: Tue, 19 Jul 2016 20:06:09 -0000 Repository: accumulo Updated Branches: refs/heads/master 930592f5e -> ee8e0705c http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java new file mode 100644 index 0000000..27c57f0 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterTime.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Timer; +import java.util.TimerTask; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.server.zookeeper.ZooReaderWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Keep a persistent roughly monotone view of how long a master has been overseeing this cluster. */ +public class MasterTime extends TimerTask { + private static final Logger log = LoggerFactory.getLogger(MasterTime.class); + + private final String zPath; + private final ZooReaderWriter zk; + private final Master master; + private final Timer timer; + + /** Difference between time stored in ZooKeeper and System.nanoTime() when we last read from ZooKeeper. */ + private long skewAmount; + + public MasterTime(Master master) throws IOException { + this.zPath = ZooUtil.getRoot(master.getInstance()) + Constants.ZMASTER_TICK; + this.zk = ZooReaderWriter.getInstance(); + this.master = master; + + try { + zk.putPersistentData(zPath, "0".getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.SKIP); + skewAmount = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8)) - System.nanoTime(); + } catch (Exception ex) { + throw new IOException("Error updating master time", ex); + } + + this.timer = new Timer(); + timer.schedule(this, 0, MILLISECONDS.convert(10, SECONDS)); + } + + /** + * How long has this cluster had a Master? + * + * @returns Approximate total duration this cluster has had a Master, in milliseconds. + */ + public synchronized long getTime() { + return MILLISECONDS.convert(System.nanoTime() + skewAmount, NANOSECONDS); + } + + /** Shut down the time keeping. */ + public void shutdown() { + timer.cancel(); + } + + @Override + public void run() { + switch (master.getMasterState()) { + // If we don't have the lock, periodically re-read the value in ZooKeeper, in case there's another master we're + // shadowing for. + case INITIAL: + case STOP: + try { + long zkTime = Long.parseLong(new String(zk.getData(zPath, null), StandardCharsets.UTF_8)); + synchronized (this) { + skewAmount = zkTime - System.nanoTime(); + } + } catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("Failed to retrieve master tick time", ex); + } + } + break; + // If we do have the lock, periodically write our clock to ZooKeeper. + case HAVE_LOCK: + case SAFE_MODE: + case NORMAL: + case UNLOAD_METADATA_TABLETS: + case UNLOAD_ROOT_TABLET: + try { + zk.putPersistentData(zPath, Long.toString(System.nanoTime() + skewAmount).getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE); + } catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("Failed to update master tick time", ex); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index e20335b..2cf7d9d 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -16,8 +16,8 @@ */ package org.apache.accumulo.master; +import com.google.common.collect.ImmutableSortedSet; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static java.lang.Math.min; import java.io.IOException; import java.util.ArrayList; @@ -92,8 +92,13 @@ import org.apache.thrift.TException; import com.google.common.base.Optional; import com.google.common.collect.Iterators; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.conf.TableConfiguration; +import static java.lang.Math.min; +import java.util.SortedSet; +import static java.lang.Math.min; -class TabletGroupWatcher extends Daemon { +abstract class TabletGroupWatcher extends Daemon { // Constants used to make sure assignment logging isn't excessive in quantity or size private static final String ASSIGNMENT_BUFFER_SEPARATOR = ", "; private static final int ASSINGMENT_BUFFER_MAX_LENGTH = 4096; @@ -101,9 +106,11 @@ class TabletGroupWatcher extends Daemon { private final Master master; final TabletStateStore store; final TabletGroupWatcher dependentWatcher; + private MasterState masterState; final TableStats stats = new TableStats(); + private SortedSet lastScanServers = ImmutableSortedSet.of(); TabletGroupWatcher(Master master, TabletStateStore store, TabletGroupWatcher dependentWatcher) { this.master = master; @@ -111,6 +118,9 @@ class TabletGroupWatcher extends Daemon { this.dependentWatcher = dependentWatcher; } + /** Should this {@code TabletGroupWatcher} suspend tablets? */ + abstract boolean canSuspendTablets(); + Map getStats() { return stats.getLast(); } @@ -124,9 +134,13 @@ class TabletGroupWatcher extends Daemon { return stats.getLast(tableId); } + /** True if the collection of live tservers specified in 'candidates' hasn't changed since the last time an assignment scan was started. */ + public synchronized boolean isSameTserversAsLastScan(Set candidates) { + return candidates.equals(lastScanServers); + } + @Override public void run() { - Thread.currentThread().setName("Watching " + store.name()); int[] oldCounts = new int[TabletState.values().length]; EventCoordinator.Listener eventListener = this.master.nextEvent.getListener(); @@ -158,6 +172,9 @@ class TabletGroupWatcher extends Daemon { if (currentTServers.size() == 0) { eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); + synchronized (this) { + lastScanServers = ImmutableSortedSet.of(); + } continue; } @@ -165,9 +182,10 @@ class TabletGroupWatcher extends Daemon { SortedMap destinations = new TreeMap<>(currentTServers); destinations.keySet().removeAll(this.master.serversToShutdown); - List assignments = new ArrayList<>(); - List assigned = new ArrayList<>(); + List assignments = new ArrayList(); + List assigned = new ArrayList(); List assignedToDeadServers = new ArrayList<>(); + List suspendedToGoneServers = new ArrayList<>(); Map unassigned = new HashMap<>(); Map> logsForDeadServers = new TreeMap<>(); @@ -192,15 +210,18 @@ class TabletGroupWatcher extends Daemon { // Don't overwhelm the tablet servers with work if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned); assignments.clear(); assigned.clear(); assignedToDeadServers.clear(); + suspendedToGoneServers.clear(); unassigned.clear(); unloaded = 0; eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); } String tableId = tls.extent.getTableId(); + TableConfiguration tableConf = this.master.getConfigurationFactory().getTableConfiguration(tableId); + MergeStats mergeStats = mergeStatsCache.get(tableId); if (mergeStats == null) { mergeStats = currentMerges.get(tableId); @@ -226,7 +247,7 @@ class TabletGroupWatcher extends Daemon { } // if we are shutting down all the tabletservers, we have to do it in order - if (goal == TabletGoalState.UNASSIGNED && state == TabletState.HOSTED) { + if (goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) { if (this.master.serversToShutdown.equals(currentTServers.keySet())) { if (dependentWatcher != null && dependentWatcher.assignedOrHosted() > 0) { goal = TabletGoalState.HOSTED; @@ -253,6 +274,29 @@ class TabletGroupWatcher extends Daemon { logsForDeadServers.put(tserver, wals.getWalsInUse(tserver)); } break; + case SUSPENDED: + if (master.getSteadyTime() - tls.suspend.suspensionTime < tableConf.getTimeInMillis(Property.TABLE_SUSPEND_DURATION)) { + // Tablet is suspended. See if its tablet server is back. + TServerInstance returnInstance = null; + Iterator find = destinations.tailMap(new TServerInstance(tls.suspend.server, " ")).keySet().iterator(); + if (find.hasNext()) { + TServerInstance found = find.next(); + if (found.getLocation().equals(tls.suspend.server)) { + returnInstance = found; + } + } + + // Old tablet server is back. Return this tablet to its previous owner. + if (returnInstance != null) { + assignments.add(new Assignment(tls.extent, returnInstance)); + } else { + // leave suspended, don't ask for a new assignment. + } + } else { + // Treat as unassigned, ask for a new assignment. + unassigned.put(tls.extent, server); + } + break; case UNASSIGNED: // maybe it's a finishing migration TServerInstance dest = this.master.migrations.get(tls.extent); @@ -276,6 +320,10 @@ class TabletGroupWatcher extends Daemon { } } else { switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. + suspendedToGoneServers.add(tls); + // Fall through to unassigned to cancel migrations. case UNASSIGNED: TServerInstance dest = this.master.migrations.get(tls.extent); TableState tableState = TableManager.getInstance().getTableState(tls.extent.getTableId()); @@ -292,7 +340,7 @@ class TabletGroupWatcher extends Daemon { case HOSTED: TServerConnection conn = this.master.tserverSet.getConnection(server); if (conn != null) { - conn.unloadTablet(this.master.masterLock, tls.extent, goal != TabletGoalState.DELETED); + conn.unloadTablet(this.master.masterLock, tls.extent, goal.howUnload(), master.getSteadyTime()); unloaded++; totalUnloaded++; } else { @@ -306,7 +354,7 @@ class TabletGroupWatcher extends Daemon { counts[state.ordinal()]++; } - flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, suspendedToGoneServers, unassigned); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(masterState); @@ -326,6 +374,9 @@ class TabletGroupWatcher extends Daemon { updateMergeState(mergeStatsCache); + synchronized (this) { + lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet()); + } if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) { Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.)); eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS); @@ -749,15 +800,25 @@ class TabletGroupWatcher extends Daemon { } private void flushChanges(SortedMap currentTServers, List assignments, List assigned, - List assignedToDeadServers, Map> logsForDeadServers, Map unassigned) - throws DistributedStoreException, TException, WalMarkerException { + List assignedToDeadServers, Map> logsForDeadServers, List suspendedToGoneServers, + Map unassigned) throws DistributedStoreException, TException, WalMarkerException { + boolean tabletsSuspendable = canSuspendTablets(); if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); Master.log.debug("logs for dead servers: " + logsForDeadServers); - store.unassign(assignedToDeadServers, logsForDeadServers); + if (tabletsSuspendable) { + store.suspend(assignedToDeadServers, logsForDeadServers, master.getSteadyTime()); + } else { + store.unassign(assignedToDeadServers, logsForDeadServers); + } this.master.markDeadServerLogsAsClosed(logsForDeadServers); - this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); + this.master.nextEvent.event("Marked %d tablets as suspended because they don't have current servers", assignedToDeadServers.size()); + } + if (!suspendedToGoneServers.isEmpty()) { + int maxServersToShow = min(assignedToDeadServers.size(), 100); + Master.log.debug(assignedToDeadServers.size() + " suspended to gone servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); + store.unsuspend(suspendedToGoneServers); } if (!currentTServers.isEmpty()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java index a89100e..4cb858c 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java +++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java @@ -99,7 +99,7 @@ public class MergeStats { this.total++; if (state.equals(TabletState.HOSTED)) this.hosted++; - if (state.equals(TabletState.UNASSIGNED)) + if (state.equals(TabletState.UNASSIGNED) || state.equals(TabletState.SUSPENDED)) this.unassigned++; } @@ -217,7 +217,7 @@ public class MergeStats { return false; } - if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED) { + if (tls.getState(master.onlineTabletServers()) != TabletState.UNASSIGNED && tls.getState(master.onlineTabletServers()) != TabletState.SUSPENDED) { log.debug("failing consistency: assigned or hosted " + tls); return false; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java index 73395ea..dd44bc6 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java +++ b/server/master/src/main/java/org/apache/accumulo/master/state/TableCounts.java @@ -36,4 +36,8 @@ public class TableCounts { public int hosted() { return counts[TabletState.HOSTED.ordinal()]; } + + public int suspended() { + return counts[TabletState.SUSPENDED.ordinal()]; + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java index 0bc989e..6497f96 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@ -176,7 +176,7 @@ public class RootTabletStateStoreTest { assertEquals(count, 1); TabletLocationState assigned = null; try { - assigned = new TabletLocationState(root, server, null, null, null, false); + assigned = new TabletLocationState(root, server, null, null, null, null, false); } catch (BadLocationStateException e) { fail("Unexpected error " + e); } @@ -203,7 +203,7 @@ public class RootTabletStateStoreTest { TabletLocationState broken = null; try { - broken = new TabletLocationState(notRoot, server, null, null, null, false); + broken = new TabletLocationState(notRoot, server, null, null, null, null, false); } catch (BadLocationStateException e) { fail("Unexpected error " + e); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 94e2ed9..c4df66d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -258,6 +258,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; public class TabletServer extends AccumuloServerContext implements Runnable { @@ -329,7 +332,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private final ZooAuthenticationKeyWatcher authKeyWatcher; private final WalStateManager walMarker; - public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) { + public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) throws IOException { super(confFactory); this.confFactory = confFactory; this.fs = fs; @@ -1549,7 +1552,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } @Override - public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, boolean save) { + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent textent, TUnloadTabletGoal goal, long requestTime) { try { checkPermission(credentials, lock, "unloadTablet"); } catch (ThriftSecurityException e) { @@ -1559,7 +1562,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { KeyExtent extent = new KeyExtent(textent); - resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, save))); + resourceManager.addMigration(extent, new LoggingRunnable(log, new UnloadTabletHandler(extent, goal, requestTime))); } @Override @@ -1939,11 +1942,13 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private class UnloadTabletHandler implements Runnable { private final KeyExtent extent; - private final boolean saveState; + private final TUnloadTabletGoal goalState; + private final long requestTimeSkew; - public UnloadTabletHandler(KeyExtent extent, boolean saveState) { + public UnloadTabletHandler(KeyExtent extent, TUnloadTabletGoal goalState, long requestTime) { this.extent = extent; - this.saveState = saveState; + this.goalState = goalState; + this.requestTimeSkew = requestTime - MILLISECONDS.convert(System.nanoTime(), NANOSECONDS); } @Override @@ -1982,7 +1987,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } try { - t.close(saveState); + t.close(!goalState.equals(TUnloadTabletGoal.DELETED)); } catch (Throwable e) { if ((t.isClosing() || t.isClosed()) && e instanceof IllegalStateException) { @@ -2003,12 +2008,18 @@ public class TabletServer extends AccumuloServerContext implements Runnable { TServerInstance instance = new TServerInstance(clientAddress, getLock().getSessionId()); TabletLocationState tls = null; try { - tls = new TabletLocationState(extent, null, instance, null, null, false); + tls = new TabletLocationState(extent, null, instance, null, null, null, false); } catch (BadLocationStateException e) { log.error("Unexpected error ", e); } - log.debug("Unassigning " + tls); - TabletStateStore.unassign(TabletServer.this, tls, null); + if (!goalState.equals(TUnloadTabletGoal.SUSPENDED) || extent.isRootTablet() + || (extent.isMeta() && !getConfiguration().getBoolean(Property.MASTER_METADATA_SUSPENDABLE))) { + log.debug("Unassigning " + tls); + TabletStateStore.unassign(TabletServer.this, tls, null); + } else { + log.debug("Suspending " + tls); + TabletStateStore.suspend(TabletServer.this, tls, null, requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS)); + } } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); } catch (KeeperException e) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java index 30584a6..2d233c4 100644 --- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java @@ -185,7 +185,7 @@ public class MergeStateIT extends ConfigurableMacBase { // take it offline m = tablet.getPrevRowUpdateMutation(); Collection> walogs = Collections.emptyList(); - metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null); + metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, null, walogs, false)), null); // now we can split stats = scan(state, metaDataStateStore); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java new file mode 100644 index 0000000..edd1aff --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/master/SuspendedTabletsIT.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.master; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ClientExec; +import org.apache.accumulo.core.client.impl.Credentials; +import org.apache.accumulo.core.client.impl.MasterClient; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.master.thrift.MasterClientService; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.server.master.state.MetaDataTableScanner; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.SetMultimap; +import com.google.common.net.HostAndPort; + +public class SuspendedTabletsIT extends ConfigurableMacBase { + private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class); + private static final Random RANDOM = new Random(); + private static ExecutorService THREAD_POOL; + + public static final int TSERVERS = 5; + public static final long SUSPEND_DURATION = MILLISECONDS.convert(30, SECONDS); + public static final int TABLETS = 100; + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) { + cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "ms"); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + cfg.setNumTservers(TSERVERS); + } + + @Test + public void crashAndResumeTserver() throws Exception { + // Run the test body. When we get to the point where we need a tserver to go away, get rid of it via crashing + suspensionTestBody(new TServerKiller() { + @Override + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception { + List procs = new ArrayList<>(getCluster().getProcesses().get(ServerType.TABLET_SERVER)); + Collections.shuffle(procs); + + for (int i = 0; i < count; ++i) { + ProcessReference pr = procs.get(i); + log.info("Crashing {}", pr.getProcess()); + getCluster().killProcess(ServerType.TABLET_SERVER, pr); + } + } + }); + } + + @Test + public void shutdownAndResumeTserver() throws Exception { + // Run the test body. When we get to the point where we need tservers to go away, stop them via a clean shutdown. + suspensionTestBody(new TServerKiller() { + @Override + public void eliminateTabletServers(final ClientContext ctx, TabletLocations locs, int count) throws Exception { + Set tserversSet = new HashSet<>(); + for (TabletLocationState tls : locs.locationStates.values()) { + if (tls.current != null) { + tserversSet.add(tls.current); + } + } + List tserversList = new ArrayList<>(tserversSet); + Collections.shuffle(tserversList, RANDOM); + + for (int i = 0; i < count; ++i) { + final String tserverName = tserversList.get(i).toString(); + MasterClient.execute(ctx, new ClientExec() { + @Override + public void execute(MasterClientService.Client client) throws Exception { + log.info("Sending shutdown command to {} via MasterClientService", tserverName); + client.shutdownTabletServer(null, ctx.rpcCreds(), tserverName, false); + } + }); + } + + log.info("Waiting for tserver process{} to die", count == 1 ? "" : "es"); + for (int i = 0; i < 10; ++i) { + List deadProcs = new ArrayList<>(); + for (ProcessReference pr : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { + Process p = pr.getProcess(); + if (!p.isAlive()) { + deadProcs.add(pr); + } + } + for (ProcessReference pr : deadProcs) { + log.info("Process {} is dead, informing cluster control about this", pr.getProcess()); + getCluster().getClusterControl().killProcess(ServerType.TABLET_SERVER, pr); + --count; + } + if (count == 0) { + return; + } else { + Thread.sleep(MILLISECONDS.convert(2, SECONDS)); + } + } + throw new IllegalStateException("Tablet servers didn't die!"); + } + }); + } + + /** + * Main test body for suspension tests. + * + * @param serverStopper + * callback which shuts down some tablet servers. + */ + private void suspensionTestBody(TServerKiller serverStopper) throws Exception { + Credentials creds = new Credentials("root", new PasswordToken(ROOT_PASSWORD)); + Instance instance = new ZooKeeperInstance(getCluster().getClientConfig()); + ClientContext ctx = new ClientContext(instance, creds, getCluster().getClientConfig()); + + String tableName = getUniqueNames(1)[0]; + + Connector conn = ctx.getConnector(); + + // Create a table with a bunch of splits + log.info("Creating table " + tableName); + conn.tableOperations().create(tableName); + SortedSet splitPoints = new TreeSet<>(); + for (int i = 1; i < TABLETS; ++i) { + splitPoints.add(new Text("" + i)); + } + conn.tableOperations().addSplits(tableName, splitPoints); + + // Wait for all of the tablets to hosted ... + log.info("Waiting on hosting and balance"); + TabletLocations ds; + for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) { + Thread.sleep(1000); + } + + // ... and balanced. + conn.instanceOperations().waitForBalance(); + do { + // Give at least another 5 seconds for migrations to finish up + Thread.sleep(5000); + ds = TabletLocations.retrieve(ctx, tableName); + } while (ds.hostedCount != TABLETS); + + // Pray all of our tservers have at least 1 tablet. + Assert.assertEquals(TSERVERS, ds.hosted.keySet().size()); + + // Kill two tablet servers hosting our tablets. This should put tablets into suspended state, and thus halt balancing. + + TabletLocations beforeDeathState = ds; + log.info("Eliminating tablet servers"); + serverStopper.eliminateTabletServers(ctx, beforeDeathState, 2); + + // Eventually some tablets will be suspended. + log.info("Waiting on suspended tablets"); + ds = TabletLocations.retrieve(ctx, tableName); + // Until we can scan the metadata table, the master probably can't either, so won't have been able to suspend the tablets. + // So we note the time that we were first able to successfully scan the metadata table. + long killTime = System.nanoTime(); + while (ds.suspended.keySet().size() != 2) { + Thread.sleep(1000); + ds = TabletLocations.retrieve(ctx, tableName); + } + + SetMultimap deadTabletsByServer = ds.suspended; + + // By this point, all tablets should be either hosted or suspended. All suspended tablets should + // "belong" to the dead tablet servers, and should be in exactly the same place as before any tserver death. + for (HostAndPort server : deadTabletsByServer.keySet()) { + Assert.assertEquals(deadTabletsByServer.get(server), beforeDeathState.hosted.get(server)); + } + Assert.assertEquals(TABLETS, ds.hostedCount + ds.suspendedCount); + + // Restart the first tablet server, making sure it ends up on the same port + HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next(); + log.info("Restarting " + restartedServer); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER, null, + ImmutableMap.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(), Property.TSERV_PORTSEARCH.getKey(), "false"), 1); + + // Eventually, the suspended tablets should be reassigned to the newly alive tserver. + log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer); + for (ds = TabletLocations.retrieve(ctx, tableName); ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0; ds = TabletLocations.retrieve(ctx, + tableName)) { + Thread.sleep(1000); + } + Assert.assertEquals(deadTabletsByServer.get(restartedServer), ds.hosted.get(restartedServer)); + + // Finally, after much longer, remaining suspended tablets should be reassigned. + log.info("Awaiting tablet reassignment for remaining tablets"); + for (ds = TabletLocations.retrieve(ctx, tableName); ds.hostedCount != TABLETS; ds = TabletLocations.retrieve(ctx, tableName)) { + Thread.sleep(1000); + } + + long recoverTime = System.nanoTime(); + Assert.assertTrue(recoverTime - killTime >= NANOSECONDS.convert(SUSPEND_DURATION, MILLISECONDS)); + } + + private static interface TServerKiller { + public void eliminateTabletServers(ClientContext ctx, TabletLocations locs, int count) throws Exception; + } + + private static final AtomicInteger threadCounter = new AtomicInteger(0); + + @BeforeClass + public static void init() { + THREAD_POOL = Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "Scanning deadline thread #" + threadCounter.incrementAndGet()); + } + }); + } + + @AfterClass + public static void cleanup() { + THREAD_POOL.shutdownNow(); + } + + private static class TabletLocations { + public final Map locationStates = new HashMap<>(); + public final SetMultimap hosted = HashMultimap.create(); + public final SetMultimap suspended = HashMultimap.create(); + public int hostedCount = 0; + public int assignedCount = 0; + public int suspendedCount = 0; + public int unassignedCount = 0; + + private TabletLocations() {} + + public static TabletLocations retrieve(final ClientContext ctx, final String tableName) throws Exception { + int sleepTime = 200; + int remainingAttempts = 30; + + while (true) { + try { + FutureTask tlsFuture = new FutureTask<>(new Callable() { + @Override + public TabletLocations call() throws Exception { + TabletLocations answer = new TabletLocations(); + answer.scan(ctx, tableName); + return answer; + } + }); + THREAD_POOL.submit(tlsFuture); + return tlsFuture.get(5, SECONDS); + } catch (TimeoutException ex) { + log.debug("Retrieval timed out", ex); + } catch (Exception ex) { + log.warn("Failed to scan metadata", ex); + } + sleepTime = Math.min(2 * sleepTime, 10000); + Thread.sleep(sleepTime); + --remainingAttempts; + if (remainingAttempts == 0) { + Assert.fail("Scanning of metadata failed, aborting"); + } + } + } + + private void scan(ClientContext ctx, String tableName) throws Exception { + Map idMap = ctx.getConnector().tableOperations().tableIdMap(); + String tableId = Objects.requireNonNull(idMap.get(tableName)); + try (MetaDataTableScanner scanner = new MetaDataTableScanner(ctx, new Range())) { + while (scanner.hasNext()) { + TabletLocationState tls = scanner.next(); + + if (!tls.extent.getTableId().equals(tableId)) { + continue; + } + locationStates.put(tls.extent, tls); + if (tls.suspend != null) { + suspended.put(tls.suspend.server, tls.extent); + ++suspendedCount; + } else if (tls.current != null) { + hosted.put(tls.current.getLocation(), tls.extent); + ++hostedCount; + } else if (tls.future != null) { + ++assignedCount; + } else { + unassignedCount += 1; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3e5524c3/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 63a7771..05a0c54 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -79,6 +79,7 @@ import com.beust.jcommander.Parameter; import com.google.common.net.HostAndPort; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; /** * The purpose of this class is to server as fake tserver that is a data sink like /dev/null. NullTserver modifies the metadata location entries for a table to @@ -179,7 +180,7 @@ public class NullTserver { public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent) throws TException {} @Override - public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, boolean save) throws TException {} + public void unloadTablet(TInfo tinfo, TCredentials credentials, String lock, TKeyExtent extent, TUnloadTabletGoal goal, long requestTime) throws TException {} @Override public List getActiveScans(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException {