Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D8AFB17253 for ; Thu, 7 May 2015 17:58:30 +0000 (UTC) Received: (qmail 39571 invoked by uid 500); 7 May 2015 17:58:30 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 39473 invoked by uid 500); 7 May 2015 17:58:30 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 39452 invoked by uid 99); 7 May 2015 17:58:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2015 17:58:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A32FFE4430; Thu, 7 May 2015 17:58:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: elserj@apache.org To: commits@accumulo.apache.org Date: Thu, 07 May 2015 17:58:31 -0000 Message-Id: In-Reply-To: <5d5c96a90fa8472a89fe56412aacaf8f@git.apache.org> References: <5d5c96a90fa8472a89fe56412aacaf8f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] accumulo git commit: ACCUMULO-3775 always sync the OPEN record ACCUMULO-3775 always sync the OPEN record Signed-off-by: Josh Elser Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/931bf897 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/931bf897 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/931bf897 Branch: refs/heads/master Commit: 931bf897e337756a26064039ca86b848fec556bc Parents: f55751b Author: Eric C. 
Newton Authored: Wed May 6 10:39:57 2015 -0400 Committer: Josh Elser Committed: Thu May 7 13:18:51 2015 -0400 ---------------------------------------------------------------------- .../apache/accumulo/tserver/log/DfsLogger.java | 14 +++++++-- .../tserver/log/TabletServerLogger.java | 32 +++++++++++++++++--- 2 files changed, 40 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index cd7ce08..bf60d45 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -369,6 +369,14 @@ public class DfsLogger implements Comparable { return new DFSLoggerInputStreams(input, decryptingInput); } + /** + * Opens a Write-Ahead Log file and writes the necessary header information and OPEN entry to the file. + * The file is ready to be used for ingest if this method returns successfully. If an exception is thrown + * from this method, it is the caller's responsibility to ensure that {@link #close()} is called to prevent + * leaking the file handle and/or syncing thread. 
+ * + * @param address The address of the host using this WAL + */ public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); log.debug("Address is " + address); @@ -381,6 +389,7 @@ public class DfsLogger implements Comparable { + Path.SEPARATOR + filename; metaReference = toString(); + LoggerOperation op = null; try { short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); if (replication == 0) @@ -430,8 +439,7 @@ public class DfsLogger implements Comparable { key.event = OPEN; key.tserverSession = filename; key.filename = filename; - write(key, EMPTY); - log.debug("Got new write-ahead log: " + this); + op = logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), Durability.SYNC); } catch (Exception ex) { if (logFile != null) logFile.close(); @@ -443,6 +451,8 @@ public class DfsLogger implements Comparable { syncThread = new Daemon(new LoggingRunnable(log, new LogSyncingTask())); syncThread.setName("Accumulo WALog thread " + toString()); syncThread.start(); + op.await(); + log.debug("Got new write-ahead log: " + this); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/931bf897/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index a2ab551..d468695 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -52,6 +52,7 @@ import org.apache.accumulo.tserver.Mutations; import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; 
+import org.apache.accumulo.tserver.log.DfsLogger.ServerResources; import org.apache.accumulo.tserver.tablet.CommitSession; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -237,17 +238,40 @@ public class TabletServerLogger { nextLogMaker.submit(new Runnable() { @Override public void run() { + final ServerResources conf = tserver.getServerConfig(); + final VolumeManager fs = conf.getFileSystem(); while (!nextLogMaker.isShutdown()) { + DfsLogger alog = null; try { log.debug("Creating next WAL"); - DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); + alog = new DfsLogger(conf, syncCounter, flushCounter); alog.open(tserver.getClientAddressString()); - log.debug("Created next WAL " + alog.getFileName()); + String fileName = alog.getFileName(); + log.debug("Created next WAL " + fileName); while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { - log.info("Our WAL was not used for 12 hours: " + alog.getFileName()); + log.info("Our WAL was not used for 12 hours: " + fileName); } } catch (Exception t) { - log.error("{}", t.getMessage(), t); + log.error("Failed to open WAL", t); + if (null != alog) { + // It's possible that the sync of the header and OPEN record to the WAL failed + // We want to make sure that we clean up the resources/thread inside the DfsLogger + // object before trying to create a new one. + try { + alog.close(); + } catch (IOException e) { + log.error("Failed to close WAL after it failed to open", e); + } + // Try to avoid leaving a bunch of empty WALs lying around + try { + Path path = alog.getPath(); + if (fs.exists(path)) { + fs.delete(path); + } + } catch (IOException e) { + log.warn("Failed to delete a WAL that failed to open", e); + } + } try { nextLog.offer(t, 12, TimeUnit.HOURS); } catch (InterruptedException ex) {