Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B2361109A6 for ; Wed, 26 Feb 2014 20:59:44 +0000 (UTC) Received: (qmail 41536 invoked by uid 500); 26 Feb 2014 20:59:43 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 41447 invoked by uid 500); 26 Feb 2014 20:59:40 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 41408 invoked by uid 99); 26 Feb 2014 20:59:38 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 26 Feb 2014 20:59:38 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9AA1889F82D; Wed, 26 Feb 2014 20:59:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ctubbsii@apache.org To: commits@accumulo.apache.org Date: Wed, 26 Feb 2014 20:59:40 -0000 Message-Id: <1f9e4892515a4f0f9d8d4cdca12d3b11@git.apache.org> In-Reply-To: <81ef772dd8df4bdf8d67feb43f3e3891@git.apache.org> References: <81ef772dd8df4bdf8d67feb43f3e3891@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] git commit: Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT Merge branch '1.5.1-SNAPSHOT' into 1.6.0-SNAPSHOT Conflicts: server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/59716f06 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/59716f06 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/59716f06 Branch: refs/heads/master Commit: 59716f06bad0b9d8a63c699ac75a645c7e2f4a8e Parents: 4e72516 5dba407 Author: Christopher Tubbs Authored: Wed Feb 26 15:41:42 2014 -0500 Committer: Christopher Tubbs Committed: Wed Feb 26 15:41:42 2014 -0500 ---------------------------------------------------------------------- .../apache/accumulo/tserver/log/DfsLogger.java | 137 +++++++++---------- 1 file changed, 65 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/59716f06/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 33d7722,0000000..55df118 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@@ -1,545 -1,0 +1,538 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.log; + +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_FINISH; +import static org.apache.accumulo.tserver.logger.LogEvents.COMPACTION_START; +import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET; +import static org.apache.accumulo.tserver.logger.LogEvents.MANY_MUTATIONS; +import static org.apache.accumulo.tserver.logger.LogEvents.OPEN; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.security.crypto.CryptoModule; +import org.apache.accumulo.core.security.crypto.CryptoModuleFactory; +import org.apache.accumulo.core.security.crypto.CryptoModuleParameters; +import org.apache.accumulo.core.security.crypto.DefaultCryptoModule; +import org.apache.accumulo.core.security.crypto.NoFlushOutputStream; +import org.apache.accumulo.core.util.Daemon; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.StringUtil; +import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.accumulo.tserver.TabletMutations; +import org.apache.accumulo.tserver.logger.LogFileKey; +import org.apache.accumulo.tserver.logger.LogFileValue; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.Logger; + +/** + * Wrap a connection to a logger. + * + */ +public class DfsLogger { + // Package private so that LogSorter can find this + static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; + static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; - ++ + private static Logger log = Logger.getLogger(DfsLogger.class); - ++ + public static class LogClosedException extends IOException { + private static final long serialVersionUID = 1L; - ++ + public LogClosedException() { + super("LogClosed"); + } + } - ++ + public static class DFSLoggerInputStreams { - ++ + private FSDataInputStream originalInput; + private DataInputStream decryptingInputStream; + + public DFSLoggerInputStreams(FSDataInputStream originalInput, DataInputStream decryptingInputStream) { + this.originalInput = originalInput; + this.decryptingInputStream = decryptingInputStream; + } + + public FSDataInputStream getOriginalInput() { + return originalInput; + } + + public void setOriginalInput(FSDataInputStream originalInput) { + this.originalInput = originalInput; + } + + public DataInputStream getDecryptingInputStream() { + return decryptingInputStream; + } + + public void setDecryptingInputStream(DataInputStream decryptingInputStream) { + this.decryptingInputStream = decryptingInputStream; + } + } - - ++ + public interface ServerResources { + AccumuloConfiguration getConfiguration(); - ++ + VolumeManager getFileSystem(); - ++ + Set getCurrentTServers(); + } - ++ + private final LinkedBlockingQueue workQueue = new LinkedBlockingQueue(); - ++ + private final Object closeLock = new Object(); - ++ + private static final DfsLogger.LogWork CLOSED_MARKER = new DfsLogger.LogWork(null); - ++ + private static final LogFileValue EMPTY = new LogFileValue(); - ++ + private boolean closed = false; - ++ + private class LogSyncingTask implements Runnable { - ++ + @Override + public void run() { + ArrayList work = new ArrayList(); + while (true) { + work.clear(); - ++ + try { + work.add(workQueue.take()); + } catch (InterruptedException ex) { + continue; + } + workQueue.drainTo(work); - ++ + synchronized (closeLock) { + if (!closed) { + try { + sync.invoke(logFile); + } catch (Exception ex) { + log.warn("Exception syncing " + ex); + for (DfsLogger.LogWork logWork : work) { + logWork.exception = ex; + } + } + } else { + for (DfsLogger.LogWork logWork : work) { + logWork.exception = new LogClosedException(); + } + } + } - ++ + boolean sawClosedMarker = false; + for (DfsLogger.LogWork logWork : work) + if (logWork == CLOSED_MARKER) + sawClosedMarker = true; + else + logWork.latch.countDown(); - ++ + if (sawClosedMarker) { + synchronized (closeLock) { + closeLock.notifyAll(); + } + break; + } + } + } + } - ++ + static class LogWork { + CountDownLatch latch; + volatile Exception exception; + + public LogWork(CountDownLatch latch) { + this.latch = latch; + } + } - ++ + public static class LoggerOperation { + private final LogWork work; - ++ + public LoggerOperation(LogWork work) { + this.work = work; + } - ++ + public void await() throws IOException { + try { + work.latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } - ++ + if (work.exception != null) { + if (work.exception instanceof IOException) + throw (IOException) work.exception; + else if (work.exception instanceof RuntimeException) + throw (RuntimeException) work.exception; + else + throw new RuntimeException(work.exception); + } + } + } - ++ + @Override + public boolean equals(Object obj) { + // filename is unique + if (obj == null) + return false; + if (obj instanceof DfsLogger) + return getFileName().equals(((DfsLogger) obj).getFileName()); + return false; + } - ++ + @Override + public int hashCode() { + // filename is unique + return getFileName().hashCode(); + } - ++ + private final ServerResources conf; + private FSDataOutputStream logFile; + private DataOutputStream encryptingLogFile = null; + private Method sync; + private String logPath; - ++ + public DfsLogger(ServerResources conf) throws IOException { + this.conf = conf; + } - ++ + public DfsLogger(ServerResources conf, String filename) throws IOException { + this.conf = conf; + this.logPath = filename; + } - ++ + public static DFSLoggerInputStreams readHeaderAndReturnStream(VolumeManager fs, Path path, AccumuloConfiguration conf) throws IOException { + FSDataInputStream input = fs.open(path); + DataInputStream decryptingInput = null; - ++ + byte[] magic = DfsLogger.LOG_FILE_HEADER_V3.getBytes(); + byte[] magicBuffer = new byte[magic.length]; + input.readFully(magicBuffer); + if (Arrays.equals(magicBuffer, magic)) { + // additional parameters it needs from the underlying stream. + String cryptoModuleClassname = input.readUTF(); + CryptoModule cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoModuleClassname); + + // Create the parameters and set the input stream into those parameters + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); + params.setEncryptedInputStream(input); + + // Create the plaintext input stream from the encrypted one + params = cryptoModule.getDecryptingInputStream(params); + + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } else { + input.seek(0); + byte[] magicV2 = DfsLogger.LOG_FILE_HEADER_V2.getBytes(); + byte[] magicBufferV2 = new byte[magic.length]; + input.readFully(magicBufferV2); + + if (Arrays.equals(magicBufferV2, magicV2)) { - // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class - // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be ++ // Log files from 1.5 dump their options in raw to the logger files. Since we don't know the class ++ // that needs to read those files, we can make a couple of basic assumptions. Either it's going to be + // the NullCryptoModule (no crypto) or the DefaultCryptoModule. - - // If it's null, we won't have any parameters whatsoever. First, let's attempt to read ++ ++ // If it's null, we won't have any parameters whatsoever. First, let's attempt to read + // parameters + Map opts = new HashMap(); + int count = input.readInt(); + for (int i = 0; i < count; i++) { + String key = input.readUTF(); + String value = input.readUTF(); + opts.put(key, value); + } - ++ + if (opts.size() == 0) { + // NullCryptoModule, we're done + decryptingInput = input; + } else { - ++ + // The DefaultCryptoModule will want to read the parameters from the underlying file, so we will put the file back to that spot. + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory + .getCryptoModule(DefaultCryptoModule.class.getName()); + + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf); - ++ + input.seek(0); + input.readFully(magicBuffer); + params.setEncryptedInputStream(input); + + params = cryptoModule.getDecryptingInputStream(params); + if (params.getPlaintextInputStream() instanceof DataInputStream) { + decryptingInput = (DataInputStream) params.getPlaintextInputStream(); + } else { + decryptingInput = new DataInputStream(params.getPlaintextInputStream()); + } + } - ++ + } else { + + input.seek(0); + decryptingInput = input; + } + + } + return new DFSLoggerInputStreams(input, decryptingInput); + } - ++ + public synchronized void open(String address) throws IOException { + String filename = UUID.randomUUID().toString(); + String logger = StringUtil.join(Arrays.asList(address.split(":")), "+"); + + log.debug("DfsLogger.open() begin"); + VolumeManager fs = conf.getFileSystem(); - ++ + logPath = fs.choose(ServerConstants.getWalDirs()) + "/" + logger + "/" + filename; + try { + short replication = (short) conf.getConfiguration().getCount(Property.TSERV_WAL_REPLICATION); + if (replication == 0) + replication = fs.getDefaultReplication(new Path(logPath)); + long blockSize = conf.getConfiguration().getMemoryInBytes(Property.TSERV_WAL_BLOCKSIZE); + if (blockSize == 0) + blockSize = (long) (conf.getConfiguration().getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE) * 1.1); + if (conf.getConfiguration().getBoolean(Property.TSERV_WAL_SYNC)) + logFile = fs.createSyncable(new Path(logPath), 0, replication, blockSize); + else + logFile = fs.create(new Path(logPath), true, 0, replication, blockSize); - ++ + try { + NoSuchMethodException e = null; + try { + // sync: send data to datanodes + sync = logFile.getClass().getMethod("sync"); + } catch (NoSuchMethodException ex) { + e = ex; + } + try { + // hsync: send data to datanodes and sync the data to disk + sync = logFile.getClass().getMethod("hsync"); + e = null; + } catch (NoSuchMethodException ex) {} + if (e != null) + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } - ++ + // Initialize the crypto operations. + org.apache.accumulo.core.security.crypto.CryptoModule cryptoModule = org.apache.accumulo.core.security.crypto.CryptoModuleFactory.getCryptoModule(conf + .getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); - ++ + // Initialize the log file with a header and the crypto params used to set up this log file. + logFile.write(LOG_FILE_HEADER_V3.getBytes(Constants.UTF8)); + + CryptoModuleParameters params = CryptoModuleFactory.createParamsObjectFromAccumuloConfiguration(conf.getConfiguration()); + + params.setPlaintextOutputStream(new NoFlushOutputStream(logFile)); + + // In order to bootstrap the reading of this file later, we have to record the CryptoModule that was used to encipher it here, + // so that that crypto module can re-read its own parameters. + + logFile.writeUTF(conf.getConfiguration().get(Property.CRYPTO_MODULE_CLASS)); + - + params = cryptoModule.getEncryptingOutputStream(params); + OutputStream encipheringOutputStream = params.getEncryptedOutputStream(); - ++ + // If the module just kicks back our original stream, then just use it, don't wrap it in + // another data OutputStream. + if (encipheringOutputStream == logFile) { + encryptingLogFile = logFile; + } else { + encryptingLogFile = new DataOutputStream(encipheringOutputStream); + } - ++ + LogFileKey key = new LogFileKey(); + key.event = OPEN; + key.tserverSession = filename; + key.filename = filename; + write(key, EMPTY); + sync.invoke(logFile); + log.debug("Got new write-ahead log: " + this); + } catch (Exception ex) { + if (logFile != null) + logFile.close(); + logFile = null; + encryptingLogFile = null; + throw new IOException(ex); + } - ++ + Thread t = new Daemon(new LogSyncingTask()); + t.setName("Accumulo WALog thread " + toString()); + t.start(); + } - ++ + @Override + public String toString() { + String fileName = getFileName(); + if (fileName.contains(":")) + return getLogger() + "/" + getFileName(); + return fileName; + } - ++ + public String getFileName() { + return logPath.toString(); + } - ++ + public void close() throws IOException { - ++ + synchronized (closeLock) { + if (closed) + return; + // after closed is set to true, nothing else should be added to the queue + // CLOSED_MARKER should be the last thing on the queue, therefore when the + // background thread sees the marker and exits there should be nothing else + // to process... so nothing should be left waiting for the background + // thread to do work + closed = true; + workQueue.add(CLOSED_MARKER); + while (!workQueue.isEmpty()) + try { + closeLock.wait(); + } catch (InterruptedException e) { + log.info("Interrupted"); + } + } + + if (encryptingLogFile != null) + try { + logFile.close(); + } catch (IOException ex) { + log.error(ex); + throw new LogClosedException(); + } + } - ++ + public synchronized void defineTablet(int seq, int tid, KeyExtent tablet) throws IOException { + // write this log to the METADATA table + final LogFileKey key = new LogFileKey(); + key.event = DEFINE_TABLET; + key.seq = seq; + key.tid = tid; + key.tablet = tablet; + try { + write(key, EMPTY); + sync.invoke(logFile); + } catch (Exception ex) { + log.error(ex); + throw new IOException(ex); + } + } - - /** - * @param key - * @param empty2 - * @throws IOException - */ ++ + private synchronized void write(LogFileKey key, LogFileValue value) throws IOException { + key.write(encryptingLogFile); + value.write(encryptingLogFile); + encryptingLogFile.flush(); + } - ++ + public LoggerOperation log(int seq, int tid, Mutation mutation) throws IOException { + return logManyTablets(Collections.singletonList(new TabletMutations(tid, seq, Collections.singletonList(mutation)))); + } - - private LoggerOperation logFileData(List> keys) throws IOException { ++ ++ private LoggerOperation logFileData(List> keys) throws IOException { + DfsLogger.LogWork work = new DfsLogger.LogWork(new CountDownLatch(1)); + synchronized (DfsLogger.this) { + try { + for (Pair pair : keys) { + write(pair.getFirst(), pair.getSecond()); + } + } catch (ClosedChannelException ex) { + throw new LogClosedException(); + } catch (Exception e) { + log.error(e, e); + work.exception = e; + } + } + + synchronized (closeLock) { + // use a different lock for close check so that adding to work queue does not need + // to wait on walog I/O operations - ++ + if (closed) + throw new LogClosedException(); + workQueue.add(work); + } - ++ + return new LoggerOperation(work); + } - ++ + public LoggerOperation logManyTablets(List mutations) throws IOException { - List> data = new ArrayList>(); ++ List> data = new ArrayList>(); + for (TabletMutations tabletMutations : mutations) { + LogFileKey key = new LogFileKey(); + key.event = MANY_MUTATIONS; + key.seq = tabletMutations.getSeq(); + key.tid = tabletMutations.getTid(); + LogFileValue value = new LogFileValue(); + value.mutations = tabletMutations.getMutations(); - data.add(new Pair(key, value)); ++ data.add(new Pair(key, value)); + } + return logFileData(data); + } + + public LoggerOperation minorCompactionFinished(int seq, int tid, String fqfn) throws IOException { + LogFileKey key = new LogFileKey(); + key.event = COMPACTION_FINISH; + key.seq = seq; + key.tid = tid; - return logFileData(Collections.singletonList(new Pair(key, EMPTY))); ++ return logFileData(Collections.singletonList(new Pair(key, EMPTY))); + } - ++ + public LoggerOperation minorCompactionStarted(int seq, int tid, String fqfn) throws IOException { + LogFileKey key = new LogFileKey(); + key.event = COMPACTION_START; + key.seq = seq; + key.tid = tid; + key.filename = fqfn; - return logFileData(Collections.singletonList(new Pair(key, EMPTY))); ++ return logFileData(Collections.singletonList(new Pair(key, EMPTY))); + } + + public String getLogger() { + String parts[] = logPath.split("/"); + return StringUtil.join(Arrays.asList(parts[parts.length - 2].split("[+]")), ":"); + } - ++ +}