Return-Path: X-Original-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-accumulo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 25EE07BBD for ; Tue, 15 Nov 2011 21:49:23 +0000 (UTC) Received: (qmail 71101 invoked by uid 500); 15 Nov 2011 21:49:23 -0000 Delivered-To: apmail-incubator-accumulo-commits-archive@incubator.apache.org Received: (qmail 71083 invoked by uid 500); 15 Nov 2011 21:49:23 -0000 Mailing-List: contact accumulo-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: accumulo-dev@incubator.apache.org Delivered-To: mailing list accumulo-commits@incubator.apache.org Received: (qmail 71076 invoked by uid 99); 15 Nov 2011 21:49:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Nov 2011 21:49:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Nov 2011 21:49:19 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 9F1CE23888EA; Tue, 15 Nov 2011 21:48:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1202435 - in /incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log: RemoteLogger.java TabletServerLogger.java Date: Tue, 15 Nov 2011 21:48:57 -0000 To: accumulo-commits@incubator.apache.org From: kturner@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111115214857.9F1CE23888EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: kturner Date: Tue Nov 15 21:48:57 2011 New Revision: 1202435 URL: 
http://svn.apache.org/viewvc?rev=1202435&view=rev Log: ACCUMULO-119 initial checkin of group commit Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/RemoteLogger.java Tue Nov 15 21:48:57 2011 @@ -17,8 +17,12 @@ package org.apache.accumulo.server.tabletserver.log; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.KeyExtent; @@ -29,6 +33,7 @@ import org.apache.accumulo.core.tabletse import org.apache.accumulo.core.tabletserver.thrift.MutationLogger; import org.apache.accumulo.core.tabletserver.thrift.NoSuchLogIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletMutations; +import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.security.SecurityConstants; @@ -44,6 +49,99 @@ import org.apache.thrift.transport.TTran public class RemoteLogger { private static Logger log = 
Logger.getLogger(RemoteLogger.class); + private LinkedBlockingQueue workQueue = new LinkedBlockingQueue(); + + private String closeLock = new String("foo"); + + private static final LogWork CLOSED_MARKER = new LogWork(null, null); + + private boolean closed = false; + + public static class LoggerOperation { + private LogWork work; + + public LoggerOperation(LogWork work) { + this.work = work; + } + + public void await() throws NoSuchLogIDException, LoggerClosedException, TException { + try { + work.latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (work.exception != null) { + if (work.exception instanceof NoSuchLogIDException) + throw (NoSuchLogIDException) work.exception; + else if (work.exception instanceof LoggerClosedException) + throw (LoggerClosedException) work.exception; + else if (work.exception instanceof TException) + throw (TException) work.exception; + else if (work.exception instanceof RuntimeException) + throw (RuntimeException) work.exception; + else + throw new RuntimeException(work.exception); + } + } + } + + private static class LogWork { + List mutations; + CountDownLatch latch; + volatile Exception exception; + + public LogWork(List mutations, CountDownLatch latch) { + this.mutations = mutations; + this.latch = latch; + } + } + + private class LogWriterTask implements Runnable { + + @Override + public void run() { + try { + ArrayList work = new ArrayList(); + ArrayList mutations = new ArrayList(); + while (true) { + + work.clear(); + mutations.clear(); + + work.add(workQueue.take()); + workQueue.drainTo(work); + + for (LogWork logWork : work) + if (logWork != CLOSED_MARKER) + mutations.addAll(logWork.mutations); + + synchronized (RemoteLogger.this) { + try { + client.logManyTablets(null, logFile.id, mutations); + } catch (Exception e) { + for (LogWork logWork : work) + if (logWork != CLOSED_MARKER) + logWork.exception = e; + } + } + + boolean sawClosedMarker = false; + for (LogWork logWork : work) + 
if (logWork == CLOSED_MARKER) + sawClosedMarker = true; + else + logWork.latch.countDown(); + + if (sawClosedMarker) + break; + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + @Override public boolean equals(Object obj) { // filename is unique @@ -87,6 +185,10 @@ public class RemoteLogger { client = null; throw te; } + + Thread t = new Daemon(new LogWriterTask()); + t.setName("Accumulo WALog thread " + toString()); + t.start(); } public RemoteLogger(String address) throws IOException { @@ -123,6 +225,19 @@ public class RemoteLogger { } public synchronized void close() throws NoSuchLogIDException, LoggerClosedException, TException { + + synchronized (closeLock) { + if (closed) + return; + // after closed is set to true, nothing else should be added to the queue + // CLOSED_MARKER should be the last thing on the queue, therefore when the + // background thread sees the marker and exits there should be nothing else + // to process... so nothing should be left waiting for the background + // thread to do work + closed = true; + workQueue.add(CLOSED_MARKER); + } + try { client.close(null, logFile.id); } finally { @@ -135,12 +250,23 @@ public class RemoteLogger { client.defineTablet(null, logFile.id, seq, tid, tablet.toThrift()); } - public synchronized void log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException { - client.log(null, logFile.id, seq, tid, mutation.toThrift()); + public LoggerOperation log(int seq, int tid, Mutation mutation) throws NoSuchLogIDException, LoggerClosedException, TException { + return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation.toThrift())))); } - public synchronized void logManyTablets(List mutations) throws NoSuchLogIDException, LoggerClosedException, TException { - client.logManyTablets(null, logFile.id, mutations); + public LoggerOperation logManyTablets(List mutations) throws NoSuchLogIDException, 
LoggerClosedException, TException { + LogWork work = new LogWork(mutations, new CountDownLatch(1)); + + synchronized (closeLock) { + // use a different lock for close check so that adding to work queue does not need + // to wait on walog I/O operations + + if (closed) + throw new NoSuchLogIDException(); + workQueue.add(work); + } + + return new LoggerOperation(work); } public synchronized void minorCompactionFinished(int seq, int tid, String fqfn) throws NoSuchLogIDException, LoggerClosedException, TException { Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java?rev=1202435&r1=1202434&r2=1202435&view=diff ============================================================================== --- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java (original) +++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/log/TabletServerLogger.java Tue Nov 15 21:48:57 2011 @@ -40,8 +40,9 @@ import org.apache.accumulo.core.tabletse import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.tabletserver.Tablet; -import org.apache.accumulo.server.tabletserver.TabletServer; import org.apache.accumulo.server.tabletserver.Tablet.CommitSession; +import org.apache.accumulo.server.tabletserver.TabletServer; +import org.apache.accumulo.server.tabletserver.log.RemoteLogger.LoggerOperation; import org.apache.log4j.Logger; /** @@ -247,7 +248,7 @@ public class TabletServerLogger { } interface Writer { - void write(RemoteLogger logger, int seq) throws Exception; + LoggerOperation write(RemoteLogger logger, int seq) throws Exception; } private int write(CommitSession commitSession, 
boolean mincFinish, Writer writer) throws IOException { @@ -294,8 +295,15 @@ public class TabletServerLogger { seq = seqGen.incrementAndGet(); if (seq < 0) throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven"); + ArrayList queuedOperations = new ArrayList(copy.size()); for (RemoteLogger wal : copy) { - writer.write(wal, seq); + LoggerOperation lop = writer.write(wal, seq); + if (lop != null) + queuedOperations.add(lop); + } + + for (LoggerOperation lop : queuedOperations) { + lop.await(); } // double-check: did the log set change? @@ -350,8 +358,9 @@ public class TabletServerLogger { return -1; return write(commitSession, false, new Writer() { @Override - public void write(RemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception { logger.defineTablet(commitSession.getWALogSeq(), commitSession.getLogId(), commitSession.getExtent()); + return null; } }); } @@ -361,8 +370,8 @@ public class TabletServerLogger { return -1; int seq = write(commitSession, false, new Writer() { @Override - public void write(RemoteLogger logger, int ignored) throws Exception { - logger.log(tabletSeq, commitSession.getLogId(), m); + public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception { + return logger.log(tabletSeq, commitSession.getLogId(), m); } }); logSizeEstimate.addAndGet(m.numBytes()); @@ -381,7 +390,7 @@ public class TabletServerLogger { int seq = write(loggables.keySet(), false, new Writer() { @Override - public void write(RemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception { List copy = new ArrayList(loggables.size()); for (Entry> entry : loggables.entrySet()) { CommitSession cs = entry.getKey(); @@ -390,7 +399,7 @@ public class TabletServerLogger { tmutations.add(m.toThrift()); copy.add(new TabletMutations(cs.getLogId(), cs.getWALogSeq(), tmutations)); } - 
logger.logManyTablets(copy); + return logger.logManyTablets(copy); } }); for (List entry : loggables.values()) { @@ -412,8 +421,9 @@ public class TabletServerLogger { int seq = write(commitSession, true, new Writer() { @Override - public void write(RemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception { logger.minorCompactionFinished(walogSeq, commitSession.getLogId(), fullyQualifiedFileName); + return null; } }); @@ -427,8 +437,9 @@ public class TabletServerLogger { return -1; write(commitSession, false, new Writer() { @Override - public void write(RemoteLogger logger, int ignored) throws Exception { + public LoggerOperation write(RemoteLogger logger, int ignored) throws Exception { logger.minorCompactionStarted(seq, commitSession.getLogId(), fullyQualifiedFileName); + return null; } }); return seq;