accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From els...@apache.org
Subject [2/3] accumulo git commit: ACCUMULO-3775 always sync the OPEN record
Date Thu, 07 May 2015 17:58:31 GMT
ACCUMULO-3775 always sync the OPEN record

Signed-off-by: Josh Elser <elserj@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/931bf897
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/931bf897
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/931bf897

Branch: refs/heads/master
Commit: 931bf897e337756a26064039ca86b848fec556bc
Parents: f55751b
Author: Eric C. Newton <eric.newton@gmail.com>
Authored: Wed May 6 10:39:57 2015 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Thu May 7 13:18:51 2015 -0400

----------------------------------------------------------------------
 .../apache/accumulo/tserver/log/DfsLogger.java  | 14 +++++++--
 .../tserver/log/TabletServerLogger.java         | 32 +++++++++++++++++---
 2 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index cd7ce08..bf60d45 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -369,6 +369,14 @@ public class DfsLogger implements Comparable<DfsLogger> {
     return new DFSLoggerInputStreams(input, decryptingInput);
   }
 
+  /**
+   * Opens a Write-Ahead Log file and writes the necessary header information and OPEN entry to the file.
+   * The file is ready to be used for ingest if this method returns successfully. If an exception is thrown
+   * from this method, it is the callers responsibility to ensure that {@link #close()} is called to prevent
+   * leaking the file handle and/or syncing thread.
+   *
+   * @param address The address of the host using this WAL
+   */
   public synchronized void open(String address) throws IOException {
     String filename = UUID.randomUUID().toString();
     log.debug("Address is " + address);
@@ -381,6 +389,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
         + Path.SEPARATOR + filename;
 
     metaReference = toString();
+    LoggerOperation op = null;
     try {
       short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION);
       if (replication == 0)
@@ -430,8 +439,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
       key.event = OPEN;
       key.tserverSession = filename;
       key.filename = filename;
-      write(key, EMPTY);
-      log.debug("Got new write-ahead log: " + this);
+      op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC);
     } catch (Exception ex) {
       if (logFile != null)
         logFile.close();
@@ -443,6 +451,8 @@ public class DfsLogger implements Comparable<DfsLogger> {
     syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask()));
     syncThread.setName("Accumulo WALog thread " + toString());
     syncThread.start();
+    op.await();
+    log.debug("Got new write-ahead log: " + this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index a2ab551..d468695 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.tserver.Mutations;
 import org.apache.accumulo.tserver.TabletMutations;
 import org.apache.accumulo.tserver.TabletServer;
 import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation;
+import org.apache.accumulo.tserver.log.DfsLogger.ServerResources;
 import org.apache.accumulo.tserver.tablet.CommitSession;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -237,17 +238,40 @@ public class TabletServerLogger {
     nextLogMaker.submit(new Runnable() {
       @Override
       public void run() {
+        final ServerResources conf = tserver.getServerConfig();
+        final VolumeManager fs = conf.getFileSystem();
         while (!nextLogMaker.isShutdown()) {
+          DfsLogger alog = null;
           try {
             log.debug("Creating next WAL");
-            DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
+            alog = new DfsLogger(conf, syncCounter, flushCounter);
             alog.open(tserver.getClientAddressString());
-            log.debug("Created next WAL " + alog.getFileName());
+            String fileName = alog.getFileName();
+            log.debug("Created next WAL " + fileName);
             while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
-              log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
+              log.info("Our WAL was not used for 12 hours: " + fileName);
             }
           } catch (Exception t) {
-            log.error("{}", t.getMessage(), t);
+            log.error("Failed to open WAL", t);
+            if (null != alog) {
+              // It's possible that the sync of the header and OPEN record to the WAL failed
+              // We want to make sure that clean up the resources/thread inside the DfsLogger
+              // object before trying to create a new one.
+              try {
+                alog.close();
+              } catch (IOException e) {
+                log.error("Failed to close WAL after it failed to open", e);
+              }
+              // Try to avoid leaving a bunch of empty WALs lying around
+              try {
+                Path path = alog.getPath();
+                if (fs.exists(path)) {
+                  fs.delete(path);
+                }
+              } catch (IOException e) {
+                log.warn("Failed to delete a WAL that failed to open", e);
+              }
+            }
             try {
               nextLog.offer(t, 12, TimeUnit.HOURS);
             } catch (InterruptedException ex) {


Mime
View raw message