accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [15/34] accumulo git commit: ACCUMULO-3423 updates based on review by [~elserj] and [~kturner]
Date Fri, 24 Apr 2015 23:21:01 GMT
ACCUMULO-3423 updates based on review by [~elserj] and [~kturner]


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/afa887b6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/afa887b6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/afa887b6

Branch: refs/heads/master
Commit: afa887b6f5f131a06497eaf1d04ba8c55b0d2877
Parents: daa38ce
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Mon Mar 30 11:25:19 2015 -0400
Committer: Eric C. Newton <eric.newton@gmail.com>
Committed: Mon Mar 30 11:25:19 2015 -0400

----------------------------------------------------------------------
 .../core/metadata/schema/MetadataSchema.java    |   2 +-
 .../core/metadata/MetadataTableSchemaTest.java  |  10 +-
 .../server/master/state/MetaDataStateStore.java |  18 +--
 .../master/state/TabletLocationState.java       |   2 -
 .../accumulo/server/util/ListVolumesUsed.java   |  15 ++
 .../server/util/MasterMetadataUtil.java         |   2 +-
 .../accumulo/server/util/MetadataTableUtil.java |  63 +++++---
 .../gc/GarbageCollectWriteAheadLogs.java        |  33 +++--
 .../CloseWriteAheadLogReferences.java           |   2 -
 .../accumulo/master/TabletGroupWatcher.java     |   2 +
 .../server/GarbageCollectionLogger.java         |   2 +-
 .../apache/accumulo/tserver/TabletServer.java   |   3 +
 .../tserver/log/TabletServerLogger.java         |  22 +--
 .../apache/accumulo/tserver/tablet/Tablet.java  |   4 +-
 .../accumulo/tserver/log/LogEntryTest.java      |  56 ++++++++
 .../org/apache/accumulo/test/UnusedWALIT.java   | 144 +++++++++++++++++++
 .../java/org/apache/accumulo/test/VolumeIT.java |  17 +++
 17 files changed, 337 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 88e11f4..d2f7d07 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -322,7 +322,7 @@ public class MetadataSchema {
 
       Text row = new Text();
       k.getRow(row);
-      if (row.getLength() < section.getRowPrefix().length()) {
+      if (!row.toString().startsWith(section.getRowPrefix())) {
         throw new IllegalArgumentException("Bad key " + k.toString());
       }
       for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
index dfc74cf..cfe59f2 100644
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.accumulo.core.metadata;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
@@ -28,12 +29,19 @@ public class MetadataTableSchemaTest {
 
   @Test
   public void testGetTabletServer() throws Exception {
-    Key key = new Key("~wal:host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+    Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
     Text hostPort = new Text();
     Text session = new Text();
     CurrentLogsSection.getTabletServer(key, hostPort, session);
     assertEquals("host:43861", hostPort.toString());
     assertEquals("14a7df0e6420003", session.toString());
+    try {
+      Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
+      CurrentLogsSection.getTabletServer(bogus, hostPort, session);
+      fail("bad argument not thrown");
+    } catch (IllegalArgumentException ex) {
+
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index decc8c7..adcf04d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -132,19 +132,19 @@ public class MetaDataStateStore extends TabletStateStore {
         Mutation m = new Mutation(tls.extent.getMetadataEntry());
         if (tls.current != null) {
           tls.current.clearLocation(m);
+          if (logsForDeadServers != null) {
+            List<Path> logs = logsForDeadServers.get(tls.current);
+            if (logs != null) {
+              for (Path log : logs) {
+                LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString());
+                m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
+              }
+            }
+          }
         }
         if (tls.future != null) {
           tls.future.clearFutureLocation(m);
         }
-        if (logsForDeadServers != null) {
-          List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent());
-          if (logs != null) {
-            for (Path log : logs) {
-              LogEntry entry = new LogEntry(tls.extent, 0, tls.futureOrCurrent().hostPort(), log.toString());
-              m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue());
-            }
-          }
-        }
         writer.addMutation(m);
       }
     } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
index ebad2c8..a222532 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.io.Text;
  */
 public class TabletLocationState {
 
-  // private static final Logger log = Logger.getLogger(TabletLocationState.class);
-
   static public class BadLocationStateException extends Exception {
     private static final long serialVersionUID = 1L;
     private Text metadataTableEntry;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index bf812cd..9e3fc7d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 
 /**
  *
@@ -120,6 +121,20 @@ public class ListVolumesUsed {
 
     for (String volume : volumes)
       System.out.println("\tVolume : " + volume);
+
+    volumes.clear();
+    scanner.clearColumns();
+    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
+    Text path = new Text();
+    for (Entry<Key,Value> entry : scanner) {
+      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+      volumes.add(getLogURI(path.toString()));
+    }
+
+    System.out.println("Listing volumes referenced in " + name + " current logs section");
+
+    for (String volume : volumes)
+      System.out.println("\tVolume : " + volume);
   }
 
   public static void listVolumes(ClientContext context) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
index 24092f9..fb6c4ee 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java
@@ -267,7 +267,7 @@ public class MasterMetadataUtil {
       while (true) {
         try {
           if (zk.exists(zpath)) {
-            log.debug("Removing " + zpath);
+            log.debug("Removing WAL reference for root table " + zpath);
             zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP);
           }
           break;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index db00b9c..f5326bf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -1065,8 +1065,12 @@ public class MetadataTableUtil {
         public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
           String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
           String uniqueId = filename.getName();
-          String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
-          rw.putPersistentData(path, filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
+          StringBuilder path = new StringBuilder(root);
+          path.append("/");
+          path.append(CurrentLogsSection.getRowPrefix());
+          path.append(tabletSession.toString());
+          path.append(uniqueId);
+          rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
         }
       });
     } else {
@@ -1076,12 +1080,20 @@ public class MetadataTableUtil {
       if (extent.isMeta()) {
         tableName = RootTable.NAME;
       }
+      BatchWriter bw = null;
       try {
-        BatchWriter bw = context.getConnector().createBatchWriter(tableName, null);
+        bw = context.getConnector().createBatchWriter(tableName, null);
         bw.addMutation(m);
-        bw.close();
       } catch (Exception e) {
         throw new RuntimeException(e);
+      } finally {
+        if (bw != null) {
+          try {
+            bw.close();
+          } catch (Exception e2) {
+            throw new RuntimeException(e2);
+          }
+        }
       }
     }
   }
@@ -1101,21 +1113,30 @@ public class MetadataTableUtil {
 
   public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
     try {
-      BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-      BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-      for (Path fname : all) {
-        Text tname = new Text(fname.toString());
-        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-        m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
-        root.addMutation(m);
-        log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
-        m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-        m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
-        meta.addMutation(m);
-        removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+      BatchWriter root = null;
+      BatchWriter meta = null;
+      try {
+        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+        for (Path fname : all) {
+          Text tname = new Text(fname.toString());
+          Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+          m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
+          root.addMutation(m);
+          log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
+          m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
+          m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
+          meta.addMutation(m);
+          removeCurrentRootLogMarker(context, lock, tabletSession, fname);
+        }
+      } finally {
+        if (root != null) {
+          root.close();
+        }
+        if (meta != null) {
+          meta.close();
+        }
       }
-      root.close();
-      meta.close();
     } catch (Exception ex) {
       throw new AccumuloException(ex);
     }
@@ -1150,8 +1171,12 @@ public class MetadataTableUtil {
       Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
       scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
       List<Path> logs = new ArrayList<>();
+      Text path = new Text();
       for (Entry<Key,Value> entry : scanner) {
-        logs.add(new Path(entry.getKey().getColumnQualifier().toString()));
+        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+        if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
+          logs.add(new Path(path.toString()));
+        }
       }
       logsForDeadServers.put(server, logs);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index cf068ed..d523706 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -180,19 +180,28 @@ public class GarbageCollectWriteAheadLogs {
   private long removeMarkers(Map<TServerInstance,Set<Path>> candidates) {
     long result = 0;
     try {
-      BatchWriter root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-      BatchWriter meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-      for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-        Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
-        for (Path path : entry.getValue()) {
-          m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
-          result++;
+      BatchWriter root = null;
+      BatchWriter meta = null;
+      try {
+        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
+        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
+        for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
+          Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
+          for (Path path : entry.getValue()) {
+            m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
+            result++;
+          }
+          root.addMutation(m);
+          meta.addMutation(m);
+        }
+      } finally  {
+        if (meta != null) {
+          meta.close();
+        }
+        if (root != null) {
+          root.close();
         }
-        root.addMutation(m);
-        meta.addMutation(m);
       }
-      meta.close();
-      root.close();
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -386,7 +395,7 @@ public class GarbageCollectWriteAheadLogs {
       CurrentLogsSection.getPath(entry.getKey(), filename);
       TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
       Path path = new Path(filename.toString());
-      if ((!currentServers.contains(tsi) || (entry.getValue().equals(CurrentLogsSection.UNUSED)) && !rootWALs.contains(path))) {
+      if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
         Set<Path> logs = unusedLogs.get(tsi);
         if (logs == null) {
           unusedLogs.put(tsi, logs = new HashSet<Path>());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 6686cb8..455aaee 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -247,8 +247,6 @@ public class CloseWriteAheadLogReferences implements Runnable {
         MetadataSchema.ReplicationSection.getFile(entry.getKey(), replFileText);
         String replFile = replFileText.toString();
         boolean isReferenced = referencedWals.contains(replFile);
-        log.debug("replFile " + replFile);
-        log.debug("referencedWals " + referencedWals);
 
         // We only want to clean up WALs (which is everything but rfiles) and only when
         // metadata doesn't have a reference to the given WAL

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index a536e98..9a7c40e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -313,6 +313,8 @@ class TabletGroupWatcher extends Daemon {
         if (this.master.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
           Master.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), Master.TIME_TO_WAIT_BETWEEN_SCANS / 1000.));
           eventListener.waitForEvents(Master.TIME_TO_WAIT_BETWEEN_SCANS);
+        } else {
+          Master.log.info("Detected change in current tserver set, re-running state machine.");
         }
       } catch (Exception ex) {
         Master.log.error("Error processing table state for store " + store.name(), ex);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
index 5fe2548..57d8da1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java
@@ -98,7 +98,7 @@ public class GarbageCollectionLogger {
     final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) {
       final long diff = now - lastMemoryCheckTime;
-      if (diff > keepAliveTimeout + 1000) {
+      if (diff > keepAliveTimeout) {
         log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check",
             keepAliveTimeout / 1000., diff / 1000.));
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index ffc1c2a..9389776 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1719,6 +1719,8 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     @Override
     public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
       log.warn("Garbage collector is attempting to remove logs through the tablet server");
+      log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" +
+          "Restart your file Garbage Collector.");
     }
   }
 
@@ -3015,6 +3017,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
 
   public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) {
     TabletLevel level = TabletLevel.getLevel(extent);
+    // serialize the updates to the metadata per level: avoids updating the level more than once
     synchronized (level) {
       EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
       if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index accfc5e..0f3f642 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -262,17 +262,19 @@ public class TabletServerLogger {
       throw new IllegalStateException("close should be called with write lock held!");
     }
     try {
-      try {
-        currentLog.close();
-      } catch (DfsLogger.LogClosedException ex) {
-        // ignore
-      } catch (Throwable ex) {
-        log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
-      } finally {
-        this.tserver.walogClosed(currentLog);
+      if (null != currentLog) {
+        try {
+          currentLog.close();
+        } catch (DfsLogger.LogClosedException ex) {
+          // ignore
+        } catch (Throwable ex) {
+          log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex);
+        } finally {
+          this.tserver.walogClosed(currentLog);
+        }
+        currentLog = null;
+        logSizeEstimate.set(0);
       }
-      currentLog = null;
-      logSizeEstimate.set(0);
     } catch (Throwable t) {
       throw new IOException(t);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index b30578a..fb0adb8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -933,8 +933,8 @@ public class Tablet implements TabletCommitter {
 
     long count = 0;
 
+    String oldName = Thread.currentThread().getName();
     try {
-      String oldName = Thread.currentThread().getName();
       Thread.currentThread().setName("Minor compacting " + this.extent);
       Span span = Trace.start("write");
       CompactionStats stats;
@@ -956,7 +956,6 @@ public class Tablet implements TabletCommitter {
             commitSession, flushId);
       } finally {
         span.stop();
-        Thread.currentThread().setName(oldName);
       }
       return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten());
     } catch (Exception e) {
@@ -967,6 +966,7 @@ public class Tablet implements TabletCommitter {
       failed = true;
       throw new RuntimeException(e);
     } finally {
+      Thread.currentThread().setName(oldName);
       try {
         getTabletMemory().finalizeMinC();
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
new file mode 100644
index 0000000..44058d3
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+public class LogEntryTest {
+
+  @Test
+  public void test() throws Exception {
+    KeyExtent extent = new KeyExtent(new Text("1"), null, new Text(""));
+    long ts = 12345678L;
+    String server = "localhost:1234";
+    String filename = "default/foo";
+    LogEntry entry = new LogEntry(extent, ts, server, filename);
+    assertEquals(extent, entry.extent);
+    assertEquals(server, entry.server);
+    assertEquals(filename, entry.filename);
+    assertEquals(ts, entry.timestamp);
+    assertEquals("1<; default/foo", entry.toString());
+    assertEquals(new Text("log"), entry.getColumnFamily());
+    assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier());
+    LogEntry copy = LogEntry.fromBytes(entry.toBytes());
+    assertEquals(entry.toString(), copy.toString());
+    Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo"));
+    key.setTimestamp(ts);
+    LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue());
+    assertEquals(entry.toString(), copy2.toString());
+    assertEquals(entry.timestamp, copy2.timestamp);
+    assertEquals("foo", entry.getUniqueID());
+    assertEquals("localhost:1234/default/foo", entry.getName());
+    assertEquals(new Value("default/foo".getBytes()), entry.getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
new file mode 100644
index 0000000..3684ee1
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+// When reviewing the changes for ACCUMULO-3423, kturner suggested
+// "tablets will now have log references that contain no data,
+// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data.
+// It would be useful to have an IT that will test this situation.
+public class UnusedWALIT extends ConfigurableMacIT {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    final long logSize = 1024 * 1024 * 10;
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize));
+    cfg.setNumTservers(1);
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+    hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize));
+  }
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    // don't want this bad boy cleaning up walog entries
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+
+    // make two tables
+    String[] tableNames = getUniqueNames(2);
+    String bigTable = tableNames[0];
+    String lilTable = tableNames[1];
+    Connector c = getConnector();
+    c.tableOperations().create(bigTable);
+    c.tableOperations().create(lilTable);
+
+    // put some data in a log that should be replayed for both tables
+    writeSomeData(c, bigTable, 0, 10, 0, 10);
+    scanSomeData(c, bigTable, 0, 10, 0, 10);
+    writeSomeData(c, lilTable, 0, 1, 0, 1);
+    scanSomeData(c, lilTable, 0, 1, 0, 1);
+    assertEquals(1, getWALCount(c));
+
+    // roll the logs by pushing data into bigTable
+    writeSomeData(c, bigTable, 0, 3000, 0, 1000);
+    assertEquals(3, getWALCount(c));
+
+    // put some data in the latest log
+    writeSomeData(c, lilTable, 1, 10, 0, 10);
+    scanSomeData(c, lilTable, 1, 10, 0, 10);
+
+    // bounce the tserver
+    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+    // wait for the metadata table to be online
+    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+
+    // check our two sets of data in different logs
+    scanSomeData(c, lilTable, 0, 1, 0, 1);
+    scanSomeData(c, lilTable, 1, 10, 0, 10);
+  }
+
+  private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol,
int colCount) throws Exception {
+    Scanner s = c.createScanner(table, Authorizations.EMPTY);
+    s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount)));
+    int row = startRow;
+    int col = startCol;
+    for (Entry<Key,Value> entry : s) {
+      assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16));
+      assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(),
16));
+      if (col == startCol + colCount) {
+        col = startCol;
+        row++;
+        if (row == startRow + rowCount) {
+          break;
+        }
+      }
+    }
+    assertEquals(row, startRow + rowCount);
+  }
+
+  private int getWALCount(Connector c) throws Exception {
+    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(CurrentLogsSection.getRange());
+    try {
+      return Iterators.size(s.iterator());
+    } finally {
+      s.close();
+    }
+  }
+
+  private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int
startCol, int colCount) throws Exception {
+    BatchWriterConfig config = new BatchWriterConfig();
+    config.setMaxMemory(10 * 1024 * 1024);
+    BatchWriter bw = conn.createBatchWriter(table, config);
+    for (int r = startRow; r < startRow + rowCount; r++) {
+      Mutation m = new Mutation(Integer.toHexString(r));
+      for (int c = startCol; c < startCol + colCount; c++) {
+        m.put("", Integer.toHexString(c), "");
+      }
+      bw.addMutation(m);
+    }
+    bw.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/afa887b6/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 7f1f921..7aeb135 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -103,6 +103,7 @@ public class VolumeIT extends ConfigurableMacIT {
     cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath());
     cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost());
     cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString());
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
 
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -425,6 +426,21 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
+    Text path = new Text();
+    for (String table : new String[]{RootTable.NAME, MetadataTable.NAME}) {
+      Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
+      meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
+      outer: for (Entry<Key,Value> entry : meta) {
+        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+        for (int i = 0; i < paths.length; i++) {
+          if (path.toString().startsWith(paths[i].toString())) {
+            continue outer;
+          }
+        }
+        Assert.fail("Unexpected volume " + path);
+      }
+    }
+
     // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -
     // 1)/num_volumes)^num_tablets. For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18
 
@@ -435,6 +451,7 @@ public class VolumeIT extends ConfigurableMacIT {
     }
 
     Assert.assertEquals(200, sum);
+
   }
 
   @Test


Mime
View raw message