Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A1B918957 for ; Fri, 24 Apr 2015 23:20:48 +0000 (UTC) Received: (qmail 31184 invoked by uid 500); 24 Apr 2015 23:20:48 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 31036 invoked by uid 500); 24 Apr 2015 23:20:48 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 30853 invoked by uid 99); 24 Apr 2015 23:20:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 24 Apr 2015 23:20:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5CC5E1090; Fri, 24 Apr 2015 23:20:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Fri, 24 Apr 2015 23:20:49 -0000 Message-Id: <7bcb608593dd40518e4b427ccafd693c@git.apache.org> In-Reply-To: <11fdb148eaf74d94adaf4c8d004a2058@git.apache.org> References: <11fdb148eaf74d94adaf4c8d004a2058@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/34] accumulo git commit: ACCUMULO-3625 use log markers against tservers, not tablets http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index ed7626e..a95cffa 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -60,6 +58,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; @@ -86,6 +85,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; @@ -120,7 +120,7 @@ public class MetadataTableUtil { return metadataTable; } - private synchronized static Writer getRootTable(ClientContext context) { + public synchronized static Writer getRootTable(ClientContext context) { Credentials credentials = context.getCredentials(); Writer rootTable = root_tables.get(credentials); if (rootTable == null) { @@ -227,7 +227,7 @@ public class MetadataTableUtil { // add before removing in case of process death for (LogEntry logEntry : logsToAdd) - addLogEntry(context, logEntry, zooLock); + addRootLogEntry(context, zooLock, logEntry); removeUnusedWALEntries(context, extent, logsToRemove, zooLock); } else { @@ -252,6 +252,39 @@ public class MetadataTableUtil { } } + private static interface ZooOperation { + void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException; + } + + private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) { + while (true) { + try { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + if (zoo.isLockHeld(zooLock.getLockID())) { + op.run(zoo); + } + break; + } catch (KeeperException e) { + log.error(e, e); + } catch (InterruptedException e) { + log.error(e, e); + } catch (IOException e) { + log.error(e, e); + } + UtilWaitThread.sleep(1000); + } + } + + private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = getZookeeperLogLocation(); + rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE); + } + }); + } + public static SortedMap getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException { TreeMap sizes = new TreeMap(); @@ -451,34 +484,6 @@ public class MetadataTableUtil { return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS; } - public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) { - if (entry.extent.isRootTablet()) { - String root = getZookeeperLogLocation(); - while (true) { - try { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - if (zoo.isLockHeld(zooLock.getLockID())) { - String[] parts = entry.filename.split("/"); - String uniqueId = parts[parts.length - 1]; - zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE); - } - break; - } catch (KeeperException e) { - log.error(e, e); - } catch (InterruptedException e) { - log.error(e, e); - } catch (IOException e) { - log.error(e, e); - } - UtilWaitThread.sleep(1000); - } - } else { - Mutation m = new Mutation(entry.getRow()); - m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); - update(context, zooLock, m, entry.extent); - } - } - public static void setRootTabletDir(String dir) throws IOException { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH; @@ -569,22 +574,11 @@ public class MetadataTableUtil { } } - Collections.sort(result, new Comparator() { - @Override - public int compare(LogEntry o1, LogEntry o2) { - long diff = o1.timestamp - o2.timestamp; - if (diff < 0) - return -1; - if (diff > 0) - return 1; - return 0; - } - }); log.info("Returning logs " + result + " for extent " + extent); return result; } - static void getRootLogEntries(ArrayList result) throws KeeperException, InterruptedException, IOException { + static void getRootLogEntries(final ArrayList result) throws KeeperException, InterruptedException, IOException { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); String root = getZookeeperLogLocation(); // there's a little race between getting the children and fetching @@ -592,11 +586,10 @@ public class MetadataTableUtil { while (true) { result.clear(); for (String child : zoo.getChildren(root)) { - LogEntry e = new LogEntry(); try { - e.fromBytes(zoo.getData(root + "/" + child, null)); + LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null)); // upgrade from !0;!0<< -> +r<< - e.extent = RootTable.EXTENT; + e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename); result.add(e); } catch (KeeperException.NoNodeException ex) { continue; @@ -666,28 +659,23 @@ public class MetadataTableUtil { return new LogEntryIterator(context); } - public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List logEntries, ZooLock zooLock) { + public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List entries, ZooLock zooLock) { if (extent.isRootTablet()) { - for (LogEntry entry : logEntries) { - String root = getZookeeperLogLocation(); - while (true) { - try { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - if (zoo.isLockHeld(zooLock.getLockID())) { - String parts[] = entry.filename.split("/"); - zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP); - } - break; - } catch (Exception e) { - log.error(e, e); + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = getZookeeperLogLocation(); + for (LogEntry entry : entries) { + String path = root + "/" + entry.getUniqueID(); + log.debug("Removing " + path + " from zookeeper"); + rw.recursiveDelete(path, NodeMissingPolicy.SKIP); } - UtilWaitThread.sleep(1000); } - } + }); } else { Mutation m = new Mutation(extent.getMetadataEntry()); - for (LogEntry entry : logEntries) { - m.putDelete(LogColumnFamily.NAME, new Text(entry.getName())); + for (LogEntry entry : entries) { + m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier()); } update(context, zooLock, m, extent); } @@ -1072,4 +1060,106 @@ public class MetadataTableUtil { return tabletEntries; } + public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename, KeyExtent extent) { + log.debug("Adding log entry " + filename); + if (extent.isRootTablet()) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + String[] parts = filename.split("/"); + String uniqueId = parts[parts.length - 1]; + String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; + rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + } + }); + } else { + Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.put("log", filename, new Value(EMPTY_BYTES)); + String tableName = MetadataTable.NAME; + if (extent.isMeta()) { + tableName = RootTable.NAME; + } + try { + BatchWriter bw = context.getConnector().createBatchWriter(tableName, null); + bw.addMutation(m); + bw.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final String filename) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + String[] parts = filename.split("/"); + String uniqueId = parts[parts.length - 1]; + String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; + log.debug("Removing entry " + path + " from zookeeper"); + rw.recursiveDelete(path, NodeMissingPolicy.SKIP); + } + }); + } + + public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set all) throws AccumuloException { + try { + BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null); + BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); + for (String fname : all) { + Text tname = new Text(fname.getBytes(UTF_8)); + Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname); + root.addMutation(m); + log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname); + m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED); + meta.addMutation(m); + removeCurrentRootLogMarker(context, lock, tabletSession, fname); + } + root.close(); + meta.close(); + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map> logsForDeadServers) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + // already cached + if (logsForDeadServers.containsKey(server)) { + return; + } + if (extent.isRootTablet()) { + final List logs = new ArrayList<>(); + retryZooKeeperUpdate(context, lock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + logs.clear(); + for (String child : rw.getChildren(root)) { + logs.add(new String(rw.getData(root + "/" + child, null), UTF_8)); + } + } + }); + logsForDeadServers.put(server, logs); + } else { + // use the correct meta table + String table = MetadataTable.NAME; + if (extent.isMeta()) { + table = RootTable.NAME; + } + // fetch the current logs in use, and put them in the cache + Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString())); + List logs = new ArrayList<>(); + for (Entry entry : scanner) { + logs.add(entry.getKey().getColumnQualifier().toString()); + } + logsForDeadServers.put(server, logs); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java index 344e245..0de0b0e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.server.util; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -176,20 +175,14 @@ public class ReplicationTableUtil { /** * Write replication ingest entries for each provided file with the given {@link Status}. */ - public static void updateFiles(ClientContext context, KeyExtent extent, Collection files, Status stat) { + public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) { if (log.isDebugEnabled()) { - log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat)); + log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat)); } // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294 - if (files.isEmpty()) { - return; - } Value v = ProtobufUtil.toValue(stat); - for (String file : files) { - // TODO Can preclude this addition if the extent is for a table we don't need to replicate - update(context, createUpdateMutation(new Path(file), v, extent), extent); - } + update(context, createUpdateMutation(new Path(file), v, extent), extent); } static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java index 355fa42..375e263 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java @@ -94,7 +94,7 @@ public class ReplicationTableUtilTest { String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid; long createdTime = System.currentTimeMillis(); - ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime)); + ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime)); verify(writer); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 35c60d6..2561eec 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -18,7 +18,7 @@ package org.apache.accumulo.gc; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -26,21 +26,22 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.UUID; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; @@ -48,29 +49,29 @@ import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.ReplicationTableOfflineException; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client; -import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; -import org.apache.accumulo.core.trace.Tracer; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.AccumuloServerContext; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.accumulo.server.master.LiveTServerSet; +import org.apache.accumulo.server.master.LiveTServerSet.Listener; +import org.apache.accumulo.server.master.state.MetaDataStateStore; +import org.apache.accumulo.server.master.state.RootTabletStateStore; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.server.master.state.TabletLocationState; +import org.apache.accumulo.server.master.state.TabletState; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; -import org.apache.thrift.TException; +import org.apache.hadoop.io.Text; +import org.apache.htrace.Span; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.net.HostAndPort; import com.google.protobuf.InvalidProtocolBufferException; @@ -79,8 +80,7 @@ public class GarbageCollectWriteAheadLogs { private final AccumuloServerContext context; private final VolumeManager fs; - - private boolean useTrash; + private final boolean useTrash; /** * Creates a new GC WAL object. @@ -98,54 +98,33 @@ public class GarbageCollectWriteAheadLogs { this.useTrash = useTrash; } - /** - * Gets the instance used by this object. - * - * @return instance - */ - Instance getInstance() { - return context.getInstance(); - } - - /** - * Gets the volume manager used by this object. - * - * @return volume manager - */ - VolumeManager getVolumeManager() { - return fs; - } - - /** - * Checks if the volume manager should move files to the trash rather than delete them. - * - * @return true if trash is used - */ - boolean isUsingTrash() { - return useTrash; - } - public void collect(GCStatus status) { - Span span = Trace.start("scanServers"); + Span span = Trace.start("getCandidates"); try { - - Map sortedWALogs = getSortedWALogs(); + LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() { + @Override + public void update(LiveTServerSet current, Set deleted, Set added) { + log.debug("New tablet server noticed: " + added); + log.debug("Tablet server removed: " + deleted); + } + }); + Set currentServers = liveServers.getCurrentServers(); status.currentLog.started = System.currentTimeMillis(); - Map fileToServerMap = new HashMap(); - Map nameToFileMap = new HashMap(); - int count = scanServers(fileToServerMap, nameToFileMap); + Map > candidates = new HashMap<>(); + long count = getCurrent(candidates, currentServers); long fileScanStop = System.currentTimeMillis(); - log.info(String.format("Fetched %d files from %d servers in %.2f seconds", fileToServerMap.size(), count, + + log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.)); - status.currentLog.candidates = fileToServerMap.size(); + status.currentLog.candidates = count; span.stop(); span = Trace.start("removeMetadataEntries"); try { - count = removeMetadataEntries(nameToFileMap, sortedWALogs, status); + count = removeMetadataEntries(candidates, status, currentServers); } catch (Exception ex) { log.error("Unable to scan metadata table", ex); return; @@ -158,7 +137,7 @@ public class GarbageCollectWriteAheadLogs { span = Trace.start("removeReplicationEntries"); try { - count = removeReplicationEntries(nameToFileMap, sortedWALogs, status); + count = removeReplicationEntries(candidates, status); } catch (Exception ex) { log.error("Unable to scan replication table", ex); return; @@ -170,16 +149,23 @@ public class GarbageCollectWriteAheadLogs { log.info(String.format("%d replication entries scanned in %.2f seconds", count, (replicationEntryScanStop - logEntryScanStop) / 1000.)); span = Trace.start("removeFiles"); - Map> serverToFileMap = mapServersToFiles(fileToServerMap, nameToFileMap); - count = removeFiles(nameToFileMap, serverToFileMap, sortedWALogs, status); + count = removeFiles(candidates, status); long removeStop = System.currentTimeMillis(); - log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, serverToFileMap.size(), (removeStop - logEntryScanStop) / 1000.)); + log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.)); + span.stop(); + + span = Trace.start("removeMarkers"); + count = removeMarkers(candidates); + long removeMarkersStop = System.currentTimeMillis(); + log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.)); + span.stop(); + + status.currentLog.finished = removeStop; status.lastLog = status.currentLog; status.currentLog = new GcCycleStats(); - span.stop(); } catch (Exception e) { log.error("exception occured while garbage collecting write ahead logs", e); @@ -188,161 +174,82 @@ public class GarbageCollectWriteAheadLogs { } } - boolean holdsLock(HostAndPort addr) { + private long removeMarkers(Map> candidates) { + long result = 0; try { - String zpath = ZooUtil.getRoot(context.getInstance()) + Constants.ZTSERVERS + "/" + addr.toString(); - List children = ZooReaderWriter.getInstance().getChildren(zpath); - return !(children == null || children.isEmpty()); - } catch (KeeperException.NoNodeException ex) { - return false; - } catch (Exception ex) { - log.debug(ex.toString(), ex); - return true; - } - } - - private int removeFiles(Map nameToFileMap, Map> serverToFileMap, Map sortedWALogs, final GCStatus status) { - for (Entry> entry : serverToFileMap.entrySet()) { - if (entry.getKey().isEmpty()) { - // old-style log entry, just remove it - for (Path path : entry.getValue()) { - log.debug("Removing old-style WAL " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } - } else { - HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false); - if (!holdsLock(address)) { - for (Path path : entry.getValue()) { - log.debug("Removing WAL for offline server " + path); - try { - if (!useTrash || !fs.moveToTrash(path)) - fs.deleteRecursively(path); - status.currentLog.deleted++; - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ex) { - log.error("Unable to delete wal " + path + ": " + ex); - } - } - continue; - } else { - Client tserver = null; - try { - tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context); - tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue())); - log.debug("deleted " + entry.getValue() + " from " + entry.getKey()); - status.currentLog.deleted += entry.getValue().size(); - } catch (TException e) { - log.warn("Error talking to " + address + ": " + e); - } finally { - if (tserver != null) - ThriftUtil.returnClient(tserver); - } + BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null); + BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); + for (Entry> entry : candidates.entrySet()) { + Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.toString()); + for (String wal : entry.getValue()) { + m.putDelete(CurrentLogsSection.COLF, new Text(wal)); + result++; } + root.addMutation(m); + meta.addMutation(m); } + meta.close(); + root.close(); + } catch (Exception ex) { + throw new RuntimeException(ex); } + return result; + } - for (Path swalog : sortedWALogs.values()) { - log.debug("Removing sorted WAL " + swalog); - try { - if (!useTrash || !fs.moveToTrash(swalog)) { - fs.deleteRecursively(swalog); - } - } catch (FileNotFoundException ex) { - // ignored - } catch (IOException ioe) { + private long removeFiles(Map > candidates, final GCStatus status) { + for (Entry> entry : candidates.entrySet()) { + for (String walog : entry.getValue()) { + log.debug("Removing WAL for offline server " + entry.getKey() + " log " + walog); + Path path = new Path(walog); try { - if (fs.exists(swalog)) { - log.error("Unable to delete sorted walog " + swalog + ": " + ioe); - } + if (!useTrash || !fs.moveToTrash(path)) + fs.deleteRecursively(path); + status.currentLog.deleted++; + } catch (FileNotFoundException ex) { + // ignored } catch (IOException ex) { - log.error("Unable to check for the existence of " + swalog, ex); + log.error("Unable to delete wal " + path + ": " + ex); } } } - - return 0; + return status.currentLog.deleted; } - /** - * Converts a list of paths to their corresponding strings. - * - * @param paths - * list of paths - * @return string forms of paths - */ - static List paths2strings(List paths) { - List result = new ArrayList(paths.size()); - for (Path path : paths) - result.add(path.toString()); - return result; - } + private long removeMetadataEntries(Map > candidates, GCStatus status, Set liveServers) throws IOException, KeeperException, + InterruptedException { - /** - * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the - * mapping of file names to paths is skipped. - * - * @param fileToServerMap - * map of file paths to servers - * @param nameToFileMap - * map of file names to paths - * @return map of servers to lists of file paths - */ - static Map> mapServersToFiles(Map fileToServerMap, Map nameToFileMap) { - Map> result = new HashMap>(); - for (Entry fileServer : fileToServerMap.entrySet()) { - if (!nameToFileMap.containsKey(fileServer.getKey().getName())) - continue; - ArrayList files = result.get(fileServer.getValue()); - if (files == null) { - files = new ArrayList(); - result.put(fileServer.getValue(), files); + // remove any entries if there's a log reference, or a tablet is still assigned to the dead server + + Map walToDeadServer = new HashMap<>(); + for (Entry> entry : candidates.entrySet()) { + for (String file : entry.getValue()) { + walToDeadServer.put(file, entry.getKey()); } - files.add(fileServer.getKey()); } - return result; - } - - protected int removeMetadataEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, - InterruptedException { - int count = 0; - Iterator iterator = MetadataTableUtil.getLogEntries(context); - - // For each WAL reference in the metadata table - while (iterator.hasNext()) { - // Each metadata reference has at least one WAL file - for (String entry : iterator.next().logSet) { - // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases - // the last "/" will mark a UUID file name. - String uuid = entry.substring(entry.lastIndexOf("/") + 1); - if (!isUUID(uuid)) { - // fully expect this to be a uuid, if its not then something is wrong and walog GC should not proceed! - throw new IllegalArgumentException("Expected uuid, but got " + uuid + " from " + entry); - } - - Path pathFromNN = nameToFileMap.remove(uuid); - if (pathFromNN != null) { - status.currentLog.inUse++; - sortedWALogs.remove(uuid); + long count = 0; + RootTabletStateStore root = new RootTabletStateStore(context); + MetaDataStateStore meta = new MetaDataStateStore(context); + Iterator states = Iterators.concat(root.iterator(), meta.iterator()); + while (states.hasNext()) { + count++; + TabletLocationState state = states.next(); + if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { + candidates.remove(state.current); + } + for (Collection wals : state.walogs) { + for (String wal : wals) { + TServerInstance dead = walToDeadServer.get(wal); + if (dead != null) { + candidates.get(dead).remove(wal); + } } - - count++; } } - return count; } - protected int removeReplicationEntries(Map nameToFileMap, Map sortedWALogs, GCStatus status) throws IOException, KeeperException, - InterruptedException { + protected int removeReplicationEntries(Map > candidates, GCStatus status) throws IOException, KeeperException, + InterruptedException { Connector conn; try { conn = context.getConnector(); @@ -353,21 +260,25 @@ public class GarbageCollectWriteAheadLogs { int count = 0; - Iterator> walIter = nameToFileMap.entrySet().iterator(); + Iterator>> walIter = candidates.entrySet().iterator(); while (walIter.hasNext()) { - Entry wal = walIter.next(); - String fullPath = wal.getValue().toString(); - if (neededByReplication(conn, fullPath)) { - log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); - // If we haven't already removed it, check to see if this WAL is - // "in use" by replication (needed for replication purposes) - status.currentLog.inUse++; - + Entry> wal = walIter.next(); + Iterator paths = wal.getValue().iterator(); + while (paths.hasNext()) { + String fullPath = paths.next(); + if (neededByReplication(conn, fullPath)) { + log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath); + // If we haven't already removed it, check to see if this WAL is + // "in use" by replication (needed for replication purposes) + status.currentLog.inUse++; + paths.remove(); + } else { + log.debug("WAL not needed for replication {}", fullPath); + } + } + if (wal.getValue().isEmpty()) { walIter.remove(); - sortedWALogs.remove(wal.getKey()); - } else { - log.debug("WAL not needed for replication {}", fullPath); } count++; } @@ -375,6 +286,7 @@ public class GarbageCollectWriteAheadLogs { return count; } + /** * Determine if the given WAL is needed for replication * @@ -435,107 +347,54 @@ public class GarbageCollectWriteAheadLogs { return metaScanner; } - private int scanServers(Map fileToServerMap, Map nameToFileMap) throws Exception { - return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap); - } - - /** - * Scans write-ahead log directories for logs. The maps passed in are populated with scan information. - * - * @param walDirs - * write-ahead log directories - * @param fileToServerMap - * map of file paths to servers - * @param nameToFileMap - * map of file names to paths - * @return number of servers located (including those with no logs present) - */ - int scanServers(String[] walDirs, Map fileToServerMap, Map nameToFileMap) throws Exception { - Set servers = new HashSet(); - for (String walDir : walDirs) { - Path walRoot = new Path(walDir); - FileStatus[] listing = null; - try { - listing = fs.listStatus(walRoot); - } catch (FileNotFoundException e) { - // ignore dir - } - if (listing == null) - continue; - for (FileStatus status : listing) { - String server = status.getPath().getName(); - if (status.isDirectory()) { - servers.add(server); - for (FileStatus file : fs.listStatus(new Path(walRoot, server))) { - if (isUUID(file.getPath().getName())) { - fileToServerMap.put(file.getPath(), server); - nameToFileMap.put(file.getPath().getName(), file.getPath()); - } else { - log.info("Ignoring file " + file.getPath() + " because it doesn't look like a uuid"); - } - } - } else if (isUUID(server)) { - // old-style WAL are not under a directory - servers.add(""); - fileToServerMap.put(status.getPath(), ""); - nameToFileMap.put(server, status.getPath()); - } else { - log.info("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); - } - } - } - return servers.size(); - } - private Map getSortedWALogs() throws IOException { - return getSortedWALogs(ServerConstants.getRecoveryDirs()); - } /** - * Looks for write-ahead logs in recovery directories. + * Scans log markers. The map passed in is populated with the logs for dead servers. * - * @param recoveryDirs - * recovery directories - * @return map of log file names to paths + * @param logsForDeadServers + * map of dead server to log file entries + * @return total number of log files */ - Map getSortedWALogs(String[] recoveryDirs) throws IOException { - Map result = new HashMap(); - - for (String dir : recoveryDirs) { - Path recoveryDir = new Path(dir); - - if (fs.exists(recoveryDir)) { - for (FileStatus status : fs.listStatus(recoveryDir)) { - String name = status.getPath().getName(); - if (isUUID(name)) { - result.put(name, status.getPath()); - } else { - log.debug("Ignoring file " + status.getPath() + " because it doesn't look like a uuid"); - } + private long getCurrent(Map > logsForDeadServers, Set currentServers) throws Exception { + Set rootWALs = new HashSet(); + // Get entries in zookeeper: + String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS; + ZooReaderWriter zoo = ZooReaderWriter.getInstance(); + List children = zoo.getChildren(zpath); + for (String child : children) { + LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null)); + rootWALs.add(entry.filename); + } + long count = 0; + + // get all the WAL markers that are not in zookeeper for dead servers + Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY); + rootScanner.setRange(CurrentLogsSection.getRange()); + Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY); + metaScanner.setRange(CurrentLogsSection.getRange()); + Iterator> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator()); + Text hostAndPort = new Text(); + Text sessionId = new Text(); + Text filename = new Text(); + while (entries.hasNext()) { + Entry entry = entries.next(); + CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId); + CurrentLogsSection.getPath(entry.getKey(), filename); + TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString()); + if ((!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(filename)) { + Set logs = logsForDeadServers.get(tsi); + if (logs == null) { + logsForDeadServers.put(tsi, logs = new HashSet()); + } + if (logs.add(new Path(filename.toString()).toString())) { + count++; } } } - return result; - } - /** - * Checks if a string is a valid UUID. - * - * @param name - * string to check - * @return true if string is a UUID - */ - static boolean isUUID(String name) { - if (name == null || name.length() != 36) { - return false; - } - try { - UUID.fromString(name); - return true; - } catch (IllegalArgumentException ex) { - return false; - } + return count; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 35005d8..9328225 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -568,7 +568,6 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa replSpan.stop(); } - // Clean up any unused write-ahead logs Span waLogs = Trace.start("walogs"); try { GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java index 9b60c88..8185f23 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java @@ -37,15 +37,13 @@ import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.Span; import org.apache.accumulo.core.trace.Trace; @@ -186,20 +184,21 @@ public class CloseWriteAheadLogReferences implements Runnable { try { // TODO Configurable number of threads bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4); - bs.setRanges(Collections.singleton(TabletsSection.getRange())); - bs.fetchColumnFamily(LogColumnFamily.NAME); + bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); + bs.fetchColumnFamily(CurrentLogsSection.COLF); // For each log key/value in the metadata table for (Entry entry : bs) { - // The value may contain multiple WALs - LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); - - log.debug("Found WALs for table(" + logEntry.extent.getTableId() + "): " + logEntry.logSet); + if (entry.getValue().equals(CurrentLogsSection.UNUSED)) { + continue; + } + Text tpath = new Text(); + CurrentLogsSection.getPath(entry.getKey(), tpath); + String path = new Path(tpath.toString()).toString(); + log.debug("Found WAL " + path.toString()); // Normalize each log file (using Path) and add it to the set - for (String logFile : logEntry.logSet) { - referencedWals.add(normalizedWalPaths.get(logFile)); - } + referencedWals.add(normalizedWalPaths.get(path)); } } catch (TableNotFoundException e) { // uhhhh @@ -248,6 +247,8 @@ public class CloseWriteAheadLogReferences implements Runnable { MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText); String replFile = replFileText.toString(); boolean isReferenced = referencedWals.contains(replFile); + log.debug("replFile " + replFile); + log.debug("referencedWals " + referencedWals); // We only want to clean up WALs (which is everything but rfiles) and only when // metadata doesn't have a reference to the given WAL http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java deleted file mode 100644 index 5224f28..0000000 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.gc; - -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.UUID; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.conf.SiteConfiguration; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.gc.thrift.GCStatus; -import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.protobuf.ProtobufUtil; -import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; -import org.apache.accumulo.core.replication.ReplicationTable; -import org.apache.accumulo.core.replication.StatusUtil; -import org.apache.accumulo.core.replication.proto.Replication.Status; -import org.apache.accumulo.server.AccumuloServerContext; -import org.apache.accumulo.server.conf.ServerConfigurationFactory; -import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.easymock.EasyMock; -import org.easymock.IAnswer; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; - -public class GarbageCollectWriteAheadLogsTest { - private static final long BLOCK_SIZE = 64000000L; - - private static final Path DIR_1_PATH = new Path("/dir1"); - private static final Path DIR_2_PATH = new Path("/dir2"); - private static final Path DIR_3_PATH = new Path("/dir3"); - private static final String UUID1 = UUID.randomUUID().toString(); - private static final String UUID2 = UUID.randomUUID().toString(); - private static final String UUID3 = UUID.randomUUID().toString(); - - private Instance instance; - private AccumuloConfiguration systemConfig; - private VolumeManager volMgr; - private GarbageCollectWriteAheadLogs gcwal; - private AccumuloServerContext context; - private long modTime; - - @Rule - public TestName testName = new TestName(); - - @Before - public void setUp() throws Exception { - SiteConfiguration siteConfig = EasyMock.createMock(SiteConfiguration.class); - instance = createMock(Instance.class); - expect(instance.getInstanceID()).andReturn("mock").anyTimes(); - expect(instance.getZooKeepers()).andReturn("localhost").anyTimes(); - expect(instance.getZooKeepersSessionTimeOut()).andReturn(30000).anyTimes(); - systemConfig = new ConfigurationCopy(new HashMap()); - volMgr = createMock(VolumeManager.class); - ServerConfigurationFactory factory = createMock(ServerConfigurationFactory.class); - expect(factory.getConfiguration()).andReturn(systemConfig).anyTimes(); - expect(factory.getInstance()).andReturn(instance).anyTimes(); - expect(factory.getSiteConfiguration()).andReturn(siteConfig).anyTimes(); - - // Just make the SiteConfiguration delegate to our AccumuloConfiguration - // Presently, we only need get(Property) and iterator(). - EasyMock.expect(siteConfig.get(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer() { - @Override - public String answer() { - Object[] args = EasyMock.getCurrentArguments(); - return systemConfig.get((Property) args[0]); - } - }).anyTimes(); - EasyMock.expect(siteConfig.getBoolean(EasyMock.anyObject(Property.class))).andAnswer(new IAnswer() { - @Override - public Boolean answer() { - Object[] args = EasyMock.getCurrentArguments(); - return systemConfig.getBoolean((Property) args[0]); - } - }).anyTimes(); - - EasyMock.expect(siteConfig.iterator()).andAnswer(new IAnswer>>() { - @Override - public Iterator> answer() { - return systemConfig.iterator(); - } - }).anyTimes(); - - replay(instance, factory, siteConfig); - AccumuloServerContext context = new AccumuloServerContext(factory); - gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false); - modTime = System.currentTimeMillis(); - } - - @Test - public void testGetters() { - assertSame(instance, gcwal.getInstance()); - assertSame(volMgr, gcwal.getVolumeManager()); - assertFalse(gcwal.isUsingTrash()); - } - - @Test - public void testPathsToStrings() { - ArrayList paths = new ArrayList(); - paths.add(new Path(DIR_1_PATH, "file1")); - paths.add(DIR_2_PATH); - paths.add(new Path(DIR_3_PATH, "file3")); - List strings = GarbageCollectWriteAheadLogs.paths2strings(paths); - int len = 3; - assertEquals(len, strings.size()); - for (int i = 0; i < len; i++) { - assertEquals(paths.get(i).toString(), strings.get(i)); - } - } - - @Test - public void testMapServersToFiles() { - // @formatter:off - /* - * Test fileToServerMap: - * /dir1/server1/uuid1 -> server1 (new-style) - * /dir1/uuid2 -> "" (old-style) - * /dir3/server3/uuid3 -> server3 (new-style) - */ - // @formatter:on - Map fileToServerMap = new java.util.HashMap(); - Path path1 = new Path(new Path(DIR_1_PATH, "server1"), UUID1); - fileToServerMap.put(path1, "server1"); // new-style - Path path2 = new Path(DIR_1_PATH, UUID2); - fileToServerMap.put(path2, ""); // old-style - Path path3 = new Path(new Path(DIR_3_PATH, "server3"), UUID3); - fileToServerMap.put(path3, "server3"); // old-style - // @formatter:off - /* - * Test nameToFileMap: - * uuid1 -> /dir1/server1/uuid1 - * uuid3 -> /dir3/server3/uuid3 - */ - // @formatter:on - Map nameToFileMap = new java.util.HashMap(); - nameToFileMap.put(UUID1, path1); - nameToFileMap.put(UUID3, path3); - - // @formatter:off - /* - * Expected map: - * server1 -> [ /dir1/server1/uuid1 ] - * server3 -> [ /dir3/server3/uuid3 ] - */ - // @formatter:on - Map> result = GarbageCollectWriteAheadLogs.mapServersToFiles(fileToServerMap, nameToFileMap); - assertEquals(2, result.size()); - ArrayList list1 = result.get("server1"); - assertEquals(1, list1.size()); - assertTrue(list1.contains(path1)); - ArrayList list3 = result.get("server3"); - assertEquals(1, list3.size()); - assertTrue(list3.contains(path3)); - } - - private FileStatus makeFileStatus(int size, Path path) { - boolean isDir = (size == 0); - return new FileStatus(size, isDir, 3, BLOCK_SIZE, modTime, path); - } - - private void mockListStatus(Path dir, FileStatus... fileStatuses) throws Exception { - expect(volMgr.listStatus(dir)).andReturn(fileStatuses); - } - - @Test - public void testScanServers_NewStyle() throws Exception { - String[] walDirs = new String[] {"/dir1", "/dir2", "/dir3"}; - // @formatter:off - /* - * Test directory layout: - * /dir1/ - * server1/ - * uuid1 - * file2 - * subdir2/ - * /dir2/ missing - * /dir3/ - * server3/ - * uuid3 - */ - // @formatter:on - Path serverDir1Path = new Path(DIR_1_PATH, "server1"); - FileStatus serverDir1 = makeFileStatus(0, serverDir1Path); - Path subDir2Path = new Path(DIR_1_PATH, "subdir2"); - FileStatus serverDir2 = makeFileStatus(0, subDir2Path); - mockListStatus(DIR_1_PATH, serverDir1, serverDir2); - Path path1 = new Path(serverDir1Path, UUID1); - FileStatus file1 = makeFileStatus(100, path1); - FileStatus file2 = makeFileStatus(200, new Path(serverDir1Path, "file2")); - mockListStatus(serverDir1Path, file1, file2); - mockListStatus(subDir2Path); - expect(volMgr.listStatus(DIR_2_PATH)).andThrow(new FileNotFoundException()); - Path serverDir3Path = new Path(DIR_3_PATH, "server3"); - FileStatus serverDir3 = makeFileStatus(0, serverDir3Path); - mockListStatus(DIR_3_PATH, serverDir3); - Path path3 = new Path(serverDir3Path, UUID3); - FileStatus file3 = makeFileStatus(300, path3); - mockListStatus(serverDir3Path, file3); - replay(volMgr); - - Map fileToServerMap = new java.util.HashMap(); - Map nameToFileMap = new java.util.HashMap(); - int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap); - assertEquals(3, count); - // @formatter:off - /* - * Expected fileToServerMap: - * /dir1/server1/uuid1 -> server1 - * /dir3/server3/uuid3 -> server3 - */ - // @formatter:on - assertEquals(2, fileToServerMap.size()); - assertEquals("server1", fileToServerMap.get(path1)); - assertEquals("server3", fileToServerMap.get(path3)); - // @formatter:off - /* - * Expected nameToFileMap: - * uuid1 -> /dir1/server1/uuid1 - * uuid3 -> /dir3/server3/uuid3 - */ - // @formatter:on - assertEquals(2, nameToFileMap.size()); - assertEquals(path1, nameToFileMap.get(UUID1)); - assertEquals(path3, nameToFileMap.get(UUID3)); - } - - @Test - public void testScanServers_OldStyle() throws Exception { - // @formatter:off - /* - * Test directory layout: - * /dir1/ - * uuid1 - * /dir3/ - * uuid3 - */ - // @formatter:on - String[] walDirs = new String[] {"/dir1", "/dir3"}; - Path serverFile1Path = new Path(DIR_1_PATH, UUID1); - FileStatus serverFile1 = makeFileStatus(100, serverFile1Path); - mockListStatus(DIR_1_PATH, serverFile1); - Path serverFile3Path = new Path(DIR_3_PATH, UUID3); - FileStatus serverFile3 = makeFileStatus(300, serverFile3Path); - mockListStatus(DIR_3_PATH, serverFile3); - replay(volMgr); - - Map fileToServerMap = new java.util.HashMap(); - Map nameToFileMap = new java.util.HashMap(); - int count = gcwal.scanServers(walDirs, fileToServerMap, nameToFileMap); - /* - * Expect only a single server, the non-server entry for upgrade WALs - */ - assertEquals(1, count); - // @formatter:off - /* - * Expected fileToServerMap: - * /dir1/uuid1 -> "" - * /dir3/uuid3 -> "" - */ - // @formatter:on - assertEquals(2, fileToServerMap.size()); - assertEquals("", fileToServerMap.get(serverFile1Path)); - assertEquals("", fileToServerMap.get(serverFile3Path)); - // @formatter:off - /* - * Expected nameToFileMap: - * uuid1 -> /dir1/uuid1 - * uuid3 -> /dir3/uuid3 - */ - // @formatter:on - assertEquals(2, nameToFileMap.size()); - assertEquals(serverFile1Path, nameToFileMap.get(UUID1)); - assertEquals(serverFile3Path, nameToFileMap.get(UUID3)); - } - - @Test - public void testGetSortedWALogs() throws Exception { - String[] recoveryDirs = new String[] {"/dir1", "/dir2", "/dir3"}; - // @formatter:off - /* - * Test directory layout: - * /dir1/ - * uuid1 - * file2 - * /dir2/ missing - * /dir3/ - * uuid3 - */ - // @formatter:on - expect(volMgr.exists(DIR_1_PATH)).andReturn(true); - expect(volMgr.exists(DIR_2_PATH)).andReturn(false); - expect(volMgr.exists(DIR_3_PATH)).andReturn(true); - Path path1 = new Path(DIR_1_PATH, UUID1); - FileStatus file1 = makeFileStatus(100, path1); - FileStatus file2 = makeFileStatus(200, new Path(DIR_1_PATH, "file2")); - mockListStatus(DIR_1_PATH, file1, file2); - Path path3 = new Path(DIR_3_PATH, UUID3); - FileStatus file3 = makeFileStatus(300, path3); - mockListStatus(DIR_3_PATH, file3); - replay(volMgr); - - Map sortedWalogs = gcwal.getSortedWALogs(recoveryDirs); - // @formatter:off - /* - * Expected map: - * uuid1 -> /dir1/uuid1 - * uuid3 -> /dir3/uuid3 - */ - // @formatter:on - assertEquals(2, sortedWalogs.size()); - assertEquals(path1, sortedWalogs.get(UUID1)); - assertEquals(path3, sortedWalogs.get(UUID3)); - } - - @Test - public void testIsUUID() { - assertTrue(GarbageCollectWriteAheadLogs.isUUID(UUID.randomUUID().toString())); - assertFalse(GarbageCollectWriteAheadLogs.isUUID("foo")); - assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString())); - assertFalse(GarbageCollectWriteAheadLogs.isUUID(null)); - } - - // It was easier to do this than get the mocking working for me - private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs { - - private List> replData; - - ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List> replData) throws IOException { - super(context, fs, useTrash); - this.replData = replData; - } - - @Override - protected Iterable> getReplicationStatusForFile(Connector conn, String wal) { - return this.replData; - } - } - - @Test - public void replicationEntriesAffectGC() throws Exception { - String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); - Connector conn = createMock(Connector.class); - - // Write a Status record which should prevent file1 from being deleted - LinkedList> replData = new LinkedList<>(); - replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()))); - - ReplicationGCWAL replGC = new ReplicationGCWAL(context, volMgr, false, replData); - - replay(conn); - - // Open (not-closed) file must be retained - assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); - - // No replication data, not needed - replData.clear(); - assertFalse(replGC.neededByReplication(conn, "/wals/" + file2)); - - // The file is closed but not replicated, must be retained - replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue())); - assertTrue(replGC.neededByReplication(conn, "/wals/" + file1)); - - // File is closed and fully replicated, can be deleted - replData.clear(); - replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), - ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build()))); - assertFalse(replGC.neededByReplication(conn, "/wals/" + file1)); - } - - @Test - public void removeReplicationEntries() throws Exception { - String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); - - Instance inst = new MockInstance(testName.getMethodName()); - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); - - GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); - - long file1CreateTime = System.currentTimeMillis(); - long file2CreateTime = file1CreateTime + 50; - BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector()); - Mutation m = new Mutation("/wals/" + file1); - StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); - bw.addMutation(m); - m = new Mutation("/wals/" + file2); - StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); - bw.addMutation(m); - - // These WALs are potential candidates for deletion from fs - Map nameToFileMap = new HashMap<>(); - nameToFileMap.put(file1, new Path("/wals/" + file1)); - nameToFileMap.put(file2, new Path("/wals/" + file2)); - - Map sortedWALogs = Collections.emptyMap(); - - // Make the GCStatus and GcCycleStats - GCStatus status = new GCStatus(); - GcCycleStats cycleStats = new GcCycleStats(); - status.currentLog = cycleStats; - - // We should iterate over two entries - Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); - - // We should have noted that two files were still in use - Assert.assertEquals(2l, cycleStats.inUse); - - // Both should have been deleted - Assert.assertEquals(0, nameToFileMap.size()); - } - - @Test - public void replicationEntriesOnlyInMetaPreventGC() throws Exception { - String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString(); - - Instance inst = new MockInstance(testName.getMethodName()); - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); - - Connector conn = context.getConnector(); - - GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); - - long file1CreateTime = System.currentTimeMillis(); - long file2CreateTime = file1CreateTime + 50; - // Write some records to the metadata table, we haven't yet written status records to the replication table - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1); - m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime)); - bw.addMutation(m); - - m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2); - m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime)); - bw.addMutation(m); - - // These WALs are potential candidates for deletion from fs - Map nameToFileMap = new HashMap<>(); - nameToFileMap.put(file1, new Path("/wals/" + file1)); - nameToFileMap.put(file2, new Path("/wals/" + file2)); - - Map sortedWALogs = Collections.emptyMap(); - - // Make the GCStatus and GcCycleStats objects - GCStatus status = new GCStatus(); - GcCycleStats cycleStats = new GcCycleStats(); - status.currentLog = cycleStats; - - // We should iterate over two entries - Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status)); - - // We should have noted that two files were still in use - Assert.assertEquals(2l, cycleStats.inUse); - - // Both should have been deleted - Assert.assertEquals(0, nameToFileMap.size()); - } - - @Test - public void noReplicationTableDoesntLimitMetatdataResults() throws Exception { - Instance inst = new MockInstance(testName.getMethodName()); - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); - Connector conn = context.getConnector(); - - String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); - m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())); - bw.addMutation(m); - bw.close(); - - GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); - - Iterable> data = gcWALs.getReplicationStatusForFile(conn, wal); - Entry entry = Iterables.getOnlyElement(data); - - Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString()); - } - - @Test - public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception { - Instance inst = new MockInstance(testName.getMethodName()); - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst)); - Connector conn = context.getConnector(); - - long walCreateTime = System.currentTimeMillis(); - String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678"; - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal); - m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); - bw.addMutation(m); - bw.close(); - - bw = ReplicationTable.getBatchWriter(conn); - m = new Mutation(wal); - StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime)); - bw.addMutation(m); - bw.close(); - - GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false); - - Iterable> iter = gcWALs.getReplicationStatusForFile(conn, wal); - Map data = new HashMap<>(); - for (Entry e : iter) { - data.put(e.getKey(), e.getValue()); - } - - Assert.assertEquals(2, data.size()); - - // Should get one element from each table (metadata and replication) - for (Key k : data.keySet()) { - String row = k.getRow().toString(); - if (row.startsWith(ReplicationSection.getRowPrefix())) { - Assert.assertTrue(row.endsWith(wal)); - } else { - Assert.assertEquals(wal, row); - } - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java ---------------------------------------------------------------------- diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java index ba68890..f47f14b 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java @@ -46,20 +46,17 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.conf.ServerConfigurationFactory; @@ -130,22 +127,16 @@ public class CloseWriteAheadLogReferencesTest { public void findOneWalFromMetadata() throws Exception { Connector conn = createMock(Connector.class); BatchScanner bs = createMock(BatchScanner.class); - // Fake out some data final ArrayList> data = new ArrayList<>(); - LogEntry logEntry = new LogEntry(); - logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); - logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID(); - logEntry.server = "tserver1"; - logEntry.tabletId = 1; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID(); + data.add(entry("tserver1:9997[1234567890]", file)); // Get a batchscanner, scan the tablets section, fetch only the logs expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(TabletsSection.getRange())); + bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); expectLastCall().once(); - bs.fetchColumnFamily(LogColumnFamily.NAME); + bs.fetchColumnFamily(CurrentLogsSection.COLF); expectLastCall().once(); expect(bs.iterator()).andAnswer(new IAnswer>>() { @@ -163,54 +154,12 @@ public class CloseWriteAheadLogReferencesTest { // Validate Set wals = refs.getReferencedWals(conn); - Assert.assertEquals(Collections.singleton(logEntry.filename), wals); - - verify(conn, bs); - } - - @Test - public void findManyWalFromSingleMetadata() throws Exception { - Connector conn = createMock(Connector.class); - BatchScanner bs = createMock(BatchScanner.class); - - // Fake out some data - final ArrayList> data = new ArrayList<>(); - LogEntry logEntry = new LogEntry(); - logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); - logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID(); - logEntry.server = "tserver1"; - logEntry.tabletId = 1; - // Multiple DFSLoggers - logEntry.logSet = Sets.newHashSet(logEntry.filename, "hdfs://localhost:8020/accumulo/wal/tserver+port/" + UUID.randomUUID()); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - // Get a batchscanner, scan the tablets section, fetch only the logs - expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(TabletsSection.getRange())); - expectLastCall().once(); - bs.fetchColumnFamily(LogColumnFamily.NAME); - expectLastCall().once(); - expect(bs.iterator()).andAnswer(new IAnswer>>() { - - @Override - public Iterator> answer() throws Throwable { - return data.iterator(); - } - - }); - // Close the bs - bs.close(); - expectLastCall().once(); - - replay(conn, bs); - - // Validate - Set wals = refs.getReferencedWals(conn); - Assert.assertEquals(logEntry.logSet, wals); + Assert.assertEquals(Collections.singleton(file), wals); verify(conn, bs); } + // This is a silly test now @Test public void findManyRefsToSingleWalFromMetadata() throws Exception { Connector conn = createMock(Connector.class); @@ -220,31 +169,14 @@ public class CloseWriteAheadLogReferencesTest { // Fake out some data final ArrayList> data = new ArrayList<>(); - LogEntry logEntry = new LogEntry(); - logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); - logEntry.filename = "hdfs://localhost:8020/accumulo/wal/tserver+port/" + uuid; - logEntry.server = "tserver1"; - logEntry.tabletId = 1; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("1"), new Text("c"), new Text("b")); - logEntry.server = "tserver1"; - logEntry.tabletId = 2; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("1"), null, new Text("c")); - logEntry.server = "tserver1"; - logEntry.tabletId = 3; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid; + data.add(entry("tserver1:9997[0123456789]", filename)); // Get a batchscanner, scan the tablets section, fetch only the logs expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(TabletsSection.getRange())); + bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); expectLastCall().once(); - bs.fetchColumnFamily(LogColumnFamily.NAME); + bs.fetchColumnFamily(CurrentLogsSection.COLF); expectLastCall().once(); expect(bs.iterator()).andAnswer(new IAnswer>>() { @@ -262,7 +194,7 @@ public class CloseWriteAheadLogReferencesTest { // Validate Set wals = refs.getReferencedWals(conn); - Assert.assertEquals(Collections.singleton(logEntry.filename), wals); + Assert.assertEquals(Collections.singleton(filename), wals); verify(conn, bs); } @@ -272,59 +204,22 @@ public class CloseWriteAheadLogReferencesTest { Connector conn = createMock(Connector.class); BatchScanner bs = createMock(BatchScanner.class); - String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+port/" + UUID.randomUUID(), file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+port/" - + UUID.randomUUID(), file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+port/" + UUID.randomUUID(); + String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID(); + String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID(); + String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID(); // Fake out some data final ArrayList> data = new ArrayList<>(); - LogEntry logEntry = new LogEntry(); - logEntry.extent = new KeyExtent(new Text("1"), new Text("b"), new Text("a")); - logEntry.filename = file1; - logEntry.server = "tserver1"; - logEntry.tabletId = 1; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("5"), null, null); - logEntry.tabletId = 2; - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("3"), new Text("b"), new Text("a")); - logEntry.filename = file2; - logEntry.server = "tserver2"; - logEntry.tabletId = 3; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("3"), new Text("c"), new Text("b")); - logEntry.tabletId = 4; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("4"), new Text("5"), new Text("0")); - logEntry.filename = file3; - logEntry.server = "tserver3"; - logEntry.tabletId = 5; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("4"), new Text("8"), new Text("5")); - logEntry.server = "tserver3"; - logEntry.tabletId = 7; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); - - logEntry.extent = new KeyExtent(new Text("4"), null, new Text("8")); - logEntry.server = "tserver3"; - logEntry.tabletId = 15; - logEntry.logSet = Collections.singleton(logEntry.filename); - data.add(Maps.immutableEntry(new Key(logEntry.getRow(), logEntry.getColumnFamily(), logEntry.getColumnQualifier()), new Value(logEntry.getValue()))); + + data.add(entry("tserver1:9997[1234567890]", file1)); + data.add(entry("tserver2:9997[1234567891]", file2)); + data.add(entry("tserver3:9997[1234567891]", file3)); // Get a batchscanner, scan the tablets section, fetch only the logs expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs); - bs.setRanges(Collections.singleton(TabletsSection.getRange())); + bs.setRanges(Collections.singleton(CurrentLogsSection.getRange())); expectLastCall().once(); - bs.fetchColumnFamily(LogColumnFamily.NAME); + bs.fetchColumnFamily(CurrentLogsSection.COLF); expectLastCall().once(); expect(bs.iterator()).andAnswer(new IAnswer>>() { @@ -347,6 +242,11 @@ public class CloseWriteAheadLogReferencesTest { verify(conn, bs); } + private static Entry entry(String session, String file) { + Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file)); + return Maps.immutableEntry(key, new Value()); + } + @Test public void unusedWalsAreClosed() throws Exception { Set wals = Collections.emptySet(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 5e6dcfb..2434487 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -421,6 +421,9 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List perm.grantNamespacePermission(user, Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.READ); } perm.grantNamespacePermission("root", Namespaces.ACCUMULO_NAMESPACE_ID, NamespacePermission.ALTER_TABLE); + + // add the currlog location for root tablet current logs + zoo.putPersistentData(ZooUtil.getRoot(getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS, new byte[0], NodeExistsPolicy.SKIP); haveUpgradedZooKeeper = true; } catch (Exception ex) { log.fatal("Error performing upgrade", ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java index 3809a29..43939d2 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/MasterClientServiceHandler.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.master.thrift.MasterClientService; @@ -162,7 +163,8 @@ class MasterClientServiceHandler extends FateServiceHandler implements MasterCli scanner.setRange(MetadataSchema.TabletsSection.getRange()); } else { scanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - scanner.setRange(new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange()); + Range range = new KeyExtent(new Text(tableId), null, ByteBufferUtil.toText(startRow)).toMetadataRange(); + scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange())); } TabletsSection.ServerColumnFamily.FLUSH_COLUMN.fetch(scanner); TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner);