Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 647EA19848 for ; Mon, 14 Mar 2016 23:00:12 +0000 (UTC) Received: (qmail 57446 invoked by uid 500); 14 Mar 2016 23:00:12 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 57405 invoked by uid 500); 14 Mar 2016 23:00:12 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 57395 invoked by uid 99); 14 Mar 2016 23:00:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 14 Mar 2016 23:00:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2CFECDFAEC; Mon, 14 Mar 2016 23:00:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kturner@apache.org To: commits@accumulo.apache.org Date: Mon, 14 Mar 2016 23:00:12 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] accumulo git commit: ACCUMULO-4112 use metadata table durability config when writing minc events to walog Repository: accumulo Updated Branches: refs/heads/master db2131598 -> 0f2f80d97 ACCUMULO-4112 use metadata table durability config when writing minc events to walog Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/37857c53 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/37857c53 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/37857c53 Branch: refs/heads/master Commit: 37857c5325ff189413a86a073a7a70f8a23d3104 Parents: e52d547 Author: Keith Turner Authored: Mon Mar 14 15:30:28 2016 -0400 Committer: Keith Turner Committed: Mon Mar 14 15:30:28 2016 -0400 ---------------------------------------------------------------------- .../apache/accumulo/tserver/TabletServer.java | 17 ++++++++++++-- .../apache/accumulo/tserver/log/DfsLogger.java | 24 ++++++++++++++++---- .../tserver/log/TabletServerLogger.java | 22 +++++++++--------- 3 files changed, 45 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 038d3e8..71ad6bc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2966,13 +2966,26 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } + private Durability getMincEventDurability(KeyExtent extent) { + TableConfiguration conf; + if (extent.isMeta()) { + conf = confFactory.getTableConfiguration(RootTable.ID); + } else { + conf = confFactory.getTableConfiguration(MetadataTable.ID); + } + Durability durability = DurabilityImpl.fromString(conf.get(Property.TABLE_DURABILITY)); + return durability; + } + public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException { + Durability durability = getMincEventDurability(tablet.getExtent()); totalMinorCompactions.incrementAndGet(); - logger.minorCompactionFinished(tablet, newDatafile, walogSeq); + logger.minorCompactionFinished(tablet, newDatafile, walogSeq, durability); } public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException { - logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation); + Durability durability = getMincEventDurability(tablet.getExtent()); + logger.minorCompactionStarted(tablet, lastUpdateSequence, newMapfileLocation, durability); } public void recover(VolumeManager fs, KeyExtent extent, TableConfiguration tconf, List logEntries, Set tabletFiles, http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 8512690..a36463d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -237,6 +237,20 @@ public class DfsLogger { } } + private static class NoWaitLoggerOperation extends LoggerOperation { + + public NoWaitLoggerOperation() { + super(null); + } + + @Override + public void await() throws IOException { + return; + } + } + + static final LoggerOperation NO_WAIT_LOGGER_OP = new NoWaitLoggerOperation(); + @Override public boolean equals(Object obj) { // filename is unique @@ -545,7 +559,7 @@ public class DfsLogger { } if (durability == Durability.LOG) - return null; + return NO_WAIT_LOGGER_OP; synchronized (closeLock) { // use a different lock for close check so that adding to work queue does not need @@ -587,21 +601,21 @@ public class DfsLogger { return result; } - public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { + public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn, Durability durability) throws IOException { LogFileKey key = new LogFileKey(); key.event = COMPACTION_FINISH; key.seq = seq; key.tid = tid; - return logFileData(Collections.singletonList(new Pair(key, EMPTY)), Durability.SYNC); + return logFileData(Collections.singletonList(new Pair(key, EMPTY)), durability); } - public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { + public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn, Durability durability) throws IOException { LogFileKey key = new LogFileKey(); key.event = COMPACTION_START; key.seq = seq; key.tid = tid; key.filename = fqfn; - return logFileData(Collections.singletonList(new Pair(key, EMPTY)), Durability.SYNC); + return logFileData(Collections.singletonList(new Pair(key, EMPTY)), durability); } public String getLogger() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/37857c53/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 13c742a..11585f2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -318,9 +318,7 @@ public class TabletServerLogger { throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven"); ArrayList queuedOperations = new ArrayList(copy.size()); for (DfsLogger wal : copy) { - LoggerOperation lop = writer.write(wal, seq); - if (lop != null) - queuedOperations.add(lop); + queuedOperations.add(writer.write(wal, seq)); } for (LoggerOperation lop : queuedOperations) { @@ -387,7 +385,7 @@ public class TabletServerLogger { @Override public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent()); - return null; + return DfsLogger.NO_WAIT_LOGGER_OP; } }); } @@ -443,29 +441,31 @@ public class TabletServerLogger { return seq; } - public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq) throws IOException { + public void minorCompactionFinished(final CommitSession commitSession, final String fullyQualifiedFileName, final int walogSeq, final Durability durability) + throws IOException { long t1 = System.currentTimeMillis(); int seq = write(commitSession, true, new Writer() { @Override public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { - logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName).await(); - return null; + logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName, durability).await(); + return DfsLogger.NO_WAIT_LOGGER_OP; } }); long t2 = System.currentTimeMillis(); - log.debug(" wrote MinC finish " + seq + ": writeTime:" + (t2 - t1) + "ms "); + log.debug(" wrote MinC finish {}: writeTime:{}ms durability:{}", seq, (t2 - t1), durability); } - public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName) throws IOException { + public int minorCompactionStarted(final CommitSession commitSession, final int seq, final String fullyQualifiedFileName, final Durability durability) + throws IOException { write(commitSession, false, new Writer() { @Override public LoggerOperation write(DfsLogger logger, int ignored) throws Exception { - logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName).await(); - return null; + logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName, durability).await(); + return DfsLogger.NO_WAIT_LOGGER_OP; } }); return seq;