From: mpercy@apache.org
To: commits@flume.apache.org
Date: Thu, 30 Jun 2016 02:21:36 -0000
Message-Id: <697f4455e3684f19bd033ada9ec1ca77@git.apache.org>
Subject: [10/11] flume git commit: FLUME-2937. Integrate checkstyle for non-test classes

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index 9dfa0d1..f1a892a 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -18,6 +18,12 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -26,50 +32,42 @@ import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.InvalidProtocolBufferException;
-
 final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(EventQueueBackingStoreFileV3.class);
+  private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class);
   private final File metaDataFile;
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-      String name) throws IOException, BadCheckpointException {
+                               String name) throws IOException, BadCheckpointException {
     this(checkpointFile, capacity, name, null, false, false);
   }
 
   EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
-      String name, File checkpointBackupDir,
-      boolean backupCheckpoint, boolean compressBackup)
+                               String name, File checkpointBackupDir,
+                               boolean backupCheckpoint, boolean compressBackup)
       throws IOException, BadCheckpointException {
     super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
-        compressBackup);
+          compressBackup);
     Preconditions.checkArgument(capacity > 0,
         "capacity must be greater than 0 " + capacity);
     metaDataFile = Serialization.getMetaDataFile(checkpointFile);
     LOG.info("Starting up with " + checkpointFile + " and " + metaDataFile);
-    if(metaDataFile.exists()) {
+    if (metaDataFile.exists()) {
       FileInputStream inputStream = new FileInputStream(metaDataFile);
       try {
         LOG.info("Reading checkpoint metadata from " + metaDataFile);
         ProtosFactory.Checkpoint checkpoint =
             ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream);
-        if(checkpoint == null) {
+        if (checkpoint == null) {
           throw new BadCheckpointException("The checkpoint metadata file does "
-              + "not exist or has zero length");
+                                           + "not exist or has zero length");
         }
         int version = checkpoint.getVersion();
-        if(version != getVersion()) {
+        if (version != getVersion()) {
           throw new BadCheckpointException("Invalid version: " + version +
-              " " + name + ", expected " + getVersion());
+                                           " " + name + ", expected " + getVersion());
         }
         long logWriteOrderID = checkpoint.getWriteOrderID();
-        if(logWriteOrderID != getCheckpointLogWriteOrderID()) {
+        if (logWriteOrderID != getCheckpointLogWriteOrderID()) {
           String msg = "Checkpoint and Meta files have differing "
               + "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and "
               + logWriteOrderID;
@@ -80,15 +78,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         setLogWriteOrderID(logWriteOrderID);
         setSize(checkpoint.getQueueSize());
         setHead(checkpoint.getQueueHead());
-        for(ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
+        for (ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) {
           Integer logFileID = activeLog.getLogFileID();
           Integer count = activeLog.getCount();
           logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count));
         }
       } catch (InvalidProtocolBufferException ex) {
         throw new BadCheckpointException("Checkpoint metadata file is invalid. "
-            + "The agent might have been stopped while it was being "
-            + "written", ex);
+                                         + "The agent might have been stopped while it was being "
+                                         + "written", ex);
       } finally {
         try {
           inputStream.close();
@@ -97,7 +95,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
         }
       }
     } else {
-      if(backupExists(checkpointBackupDir) && shouldBackup) {
+      if (backupExists(checkpointBackupDir) && shouldBackup) {
         // If a backup exists, then throw an exception to recover checkpoint
         throw new BadCheckpointException("The checkpoint metadata file does "
             + "not exist, but a backup exists");
@@ -121,6 +119,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
       }
     }
   }
+
   File getMetaDataFile() {
     return metaDataFile;
   }
@@ -129,6 +128,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   protected int getVersion() {
     return Serialization.VERSION_3;
   }
+
   @Override
   protected void writeCheckpointMetaData() throws IOException {
     ProtosFactory.Checkpoint.Builder checkpointBuilder =
@@ -137,14 +137,14 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
     checkpointBuilder.setQueueHead(getHead());
     checkpointBuilder.setQueueSize(getSize());
     checkpointBuilder.setWriteOrderID(getLogWriteOrderID());
-    for(Integer logFileID : logFileIDReferenceCounts.keySet()) {
+    for (Integer logFileID : logFileIDReferenceCounts.keySet()) {
       int count = logFileIDReferenceCounts.get(logFileID).get();
-      if(count != 0) {
-        ProtosFactory.ActiveLog.Builder activeLogBuilder =
-            ProtosFactory.ActiveLog.newBuilder();
-        activeLogBuilder.setLogFileID(logFileID);
-        activeLogBuilder.setCount(count);
-        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      if (count != 0) {
+        ProtosFactory.ActiveLog.Builder activeLogBuilder =
+                ProtosFactory.ActiveLog.newBuilder();
+        activeLogBuilder.setLogFileID(logFileID);
+        activeLogBuilder.setCount(count);
+        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
       }
     }
     FileOutputStream outputStream = new FileOutputStream(metaDataFile);
@@ -161,8 +161,8 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
   }
 
   static void upgrade(EventQueueBackingStoreFileV2 backingStoreV2,
-      File checkpointFile, File metaDataFile)
-      throws IOException {
+                      File checkpointFile, File metaDataFile)
+      throws IOException {
 
     int head = backingStoreV2.getHead();
     int size = backingStoreV2.getSize();
@@ -176,14 +176,14 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
     checkpointBuilder.setQueueHead(head);
     checkpointBuilder.setQueueSize(size);
     checkpointBuilder.setWriteOrderID(writeOrderID);
-    for(Integer logFileID : referenceCounts.keySet()) {
+    for (Integer logFileID : referenceCounts.keySet()) {
       int count = referenceCounts.get(logFileID).get();
-      if(count > 0) {
-        ProtosFactory.ActiveLog.Builder activeLogBuilder =
-            ProtosFactory.ActiveLog.newBuilder();
-        activeLogBuilder.setLogFileID(logFileID);
-        activeLogBuilder.setCount(count);
-        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
+      if (count > 0) {
+        ProtosFactory.ActiveLog.Builder activeLogBuilder =
+                ProtosFactory.ActiveLog.newBuilder();
+        activeLogBuilder.setLogFileID(logFileID);
+        activeLogBuilder.setCount(count);
+        checkpointBuilder.addActiveLogs(activeLogBuilder.build());
       }
     }
     FileOutputStream outputStream = new FileOutputStream(metaDataFile);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
index ff5242a..4c0c96c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventUtils.java
@@ -33,7 +33,7 @@ public class EventUtils {
    * @return Event if Put instance is present, null otherwise
    */
   public static Event getEventFromTransactionEvent(TransactionEventRecord transactionEventRecord) {
-    if(transactionEventRecord instanceof Put) {
+    if (transactionEventRecord instanceof Put) {
       return ((Put)transactionEventRecord).getEvent();
     }
     return null;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index ed2b996..9d82e43 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -25,7 +25,11 @@ import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
-import org.apache.flume.*;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
 import org.apache.flume.annotations.Disposable;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
@@ -71,8 +75,7 @@ import java.util.concurrent.TimeUnit;
 @Disposable
 public class FileChannel extends BasicChannelSemantics {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(FileChannel.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileChannel.class);
 
   private Integer capacity = 0;
   private int keepAlive;
@@ -125,8 +128,8 @@ public class FileChannel extends BasicChannelSemantics {
         context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
             homePath + "/.flume/file-channel/checkpoint").trim();
 
-    String strBackupCheckpointDir = context.getString
-        (FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
+    String strBackupCheckpointDir =
+        context.getString(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
 
     String[] strDataDirs = Iterables.toArray(
         Splitter.on(",").trimResults().omitEmptyStrings().split(
@@ -137,9 +140,9 @@ public class FileChannel extends BasicChannelSemantics {
 
     if (useDualCheckpoints) {
       Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
-          "Dual checkpointing is enabled, but the backup directory is not set. " +
-          "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
-          "to enable dual checkpointing");
+          "Dual checkpointing is enabled, but the backup directory is not set. " +
+              "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
+              "to enable dual checkpointing");
       backupCheckpointDir = new File(strBackupCheckpointDir);
       /*
        * If the backup directory is the same as the checkpoint directory,
       * channel.
       */
      Preconditions.checkState(!backupCheckpointDir.equals(checkpointDir),
-          "Could not configure " + getName() + ". The checkpoint backup " +
-          "directory and the checkpoint directory are " +
-          "configured to be the same.");
+          "Could not configure " + getName() + ". The checkpoint backup " +
+              "directory and the checkpoint directory are " +
+              "configured to be the same.");
     }
 
     dataDirs = new File[strDataDirs.length];
@@ -159,10 +162,10 @@ public class FileChannel extends BasicChannelSemantics {
 
     capacity = context.getInteger(FileChannelConfiguration.CAPACITY,
         FileChannelConfiguration.DEFAULT_CAPACITY);
-    if(capacity <= 0) {
+    if (capacity <= 0) {
       capacity = FileChannelConfiguration.DEFAULT_CAPACITY;
       LOG.warn("Invalid capacity specified, initializing channel to "
-          + "default capacity of {}", capacity);
+               + "default capacity of {}", capacity);
     }
 
     keepAlive =
@@ -172,48 +175,48 @@ public class FileChannel extends BasicChannelSemantics {
         context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY,
             FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY);
 
-    if(transactionCapacity <= 0) {
+    if (transactionCapacity <= 0) {
       transactionCapacity =
-        FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;
+          FileChannelConfiguration.DEFAULT_TRANSACTION_CAPACITY;
       LOG.warn("Invalid transaction capacity specified, "
           + "initializing channel to default "
          + "capacity of {}", transactionCapacity);
     }
 
     Preconditions.checkState(transactionCapacity <= capacity,
-        "File Channel transaction capacity cannot be greater than the " +
-        "capacity of the channel.");
+        "File Channel transaction capacity cannot be greater than the " +
+            "capacity of the channel.");
 
     checkpointInterval =
-            context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
+        context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
             FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
     if (checkpointInterval <= 0) {
       LOG.warn("Checkpoint interval is invalid: " + checkpointInterval
-              + ", using default: "
-              + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
+               + ", using default: "
+               + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
       checkpointInterval =
-              FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;
+          FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL;
     }
 
     // cannot be over FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE
     maxFileSize = Math.min(
-        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
-            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
+        context.getLong(FileChannelConfiguration.MAX_FILE_SIZE,
+            FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE),
+        FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
 
     minimumRequiredSpace = Math.max(
-        context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
-            FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
-            FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
+        context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE,
+            FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE),
+        FileChannelConfiguration.FLOOR_MINIMUM_REQUIRED_SPACE);
 
     useLogReplayV1 = context.getBoolean(
         FileChannelConfiguration.USE_LOG_REPLAY_V1,
-        FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1);
+            FileChannelConfiguration.DEFAULT_USE_LOG_REPLAY_V1);
 
     useFastReplay = context.getBoolean(
-            FileChannelConfiguration.USE_FAST_REPLAY,
-            FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY);
+        FileChannelConfiguration.USE_FAST_REPLAY,
+        FileChannelConfiguration.DEFAULT_USE_FAST_REPLAY);
 
     Context encryptionContext = new Context(
         context.getSubProperties(EncryptionConfiguration.ENCRYPTION_PREFIX +
@@ -224,41 +227,41 @@ public class FileChannel extends BasicChannelSemantics {
         EncryptionConfiguration.ACTIVE_KEY);
     encryptionCipherProvider = encryptionContext.getString(
         EncryptionConfiguration.CIPHER_PROVIDER);
-    if(encryptionKeyProviderName != null) {
+    if (encryptionKeyProviderName != null) {
       Preconditions.checkState(!Strings.isNullOrEmpty(encryptionActiveKey),
           "Encryption configuration problem: " +
               EncryptionConfiguration.ACTIVE_KEY + " is missing");
       Preconditions.checkState(!Strings.isNullOrEmpty(encryptionCipherProvider),
           "Encryption configuration problem: " +
              EncryptionConfiguration.CIPHER_PROVIDER + " is missing");
-      Context keyProviderContext = new Context(encryptionContext.
-          getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
-      encryptionKeyProvider = KeyProviderFactory.
-          getInstance(encryptionKeyProviderName, keyProviderContext);
+      Context keyProviderContext = new Context(
+          encryptionContext.getSubProperties(EncryptionConfiguration.KEY_PROVIDER + "."));
+      encryptionKeyProvider = KeyProviderFactory.getInstance(
+          encryptionKeyProviderName, keyProviderContext);
     } else {
       Preconditions.checkState(encryptionActiveKey == null,
           "Encryption configuration problem: " +
               EncryptionConfiguration.ACTIVE_KEY + " is present while key " +
-          "provider name is not.");
+              "provider name is not.");
       Preconditions.checkState(encryptionCipherProvider == null,
           "Encryption configuration problem: " +
               EncryptionConfiguration.CIPHER_PROVIDER + " is present while " +
-          "key provider name is not.");
+              "key provider name is not.");
     }
 
     fsyncPerTransaction = context.getBoolean(FileChannelConfiguration
-        .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);
+            .FSYNC_PER_TXN, FileChannelConfiguration.DEFAULT_FSYNC_PRE_TXN);
 
     fsyncInterval = context.getInteger(FileChannelConfiguration
-        .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
+            .FSYNC_INTERVAL, FileChannelConfiguration.DEFAULT_FSYNC_INTERVAL);
 
     checkpointOnClose = context.getBoolean(FileChannelConfiguration
-        .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
+            .CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE);
 
-    if(queueRemaining == null) {
+    if (queueRemaining == null) {
       queueRemaining = new Semaphore(capacity, true);
     }
-    if(log != null) {
+    if (log != null) {
       log.setCheckpointInterval(checkpointInterval);
       log.setMaxFileSize(maxFileSize);
     }
@@ -299,7 +302,7 @@ public class FileChannel extends BasicChannelSemantics {
       Preconditions.checkState(queueRemaining.tryAcquire(depth),
          "Unable to acquire " + depth + " permits " + channelNameDescriptor);
       LOG.info("Queue Size after replay: " + depth + " "
-          + channelNameDescriptor);
+               + channelNameDescriptor);
     } catch (Throwable t) {
       open = false;
       startupError = t;
@@ -337,9 +340,9 @@ public class FileChannel extends BasicChannelSemantics {
 
   @Override
   protected BasicTransactionSemantics createTransaction() {
-    if(!open) {
+    if (!open) {
       String msg = "Channel closed " + channelNameDescriptor;
-      if(startupError != null) {
+      if (startupError != null) {
        msg += ". Due to " + startupError.getClass().getName() + ": " +
            startupError.getMessage();
        throw new IllegalStateException(msg, startupError);
@@ -348,27 +351,28 @@
     }
 
     FileBackedTransaction trans = transactions.get();
-    if(trans != null && !trans.isClosed()) {
+    if (trans != null && !trans.isClosed()) {
       Preconditions.checkState(false,
           "Thread has transaction which is still open: " +
-          trans.getStateAsString() + channelNameDescriptor);
+              trans.getStateAsString() + channelNameDescriptor);
     }
     trans = new FileBackedTransaction(log, TransactionIDOracle.next(),
-        transactionCapacity, keepAlive, queueRemaining, getName(),
-        fsyncPerTransaction, channelCounter);
+            transactionCapacity, keepAlive, queueRemaining, getName(),
+            fsyncPerTransaction, channelCounter);
     transactions.set(trans);
     return trans;
   }
 
   protected int getDepth() {
-    Preconditions.checkState(open, "Channel closed"  + channelNameDescriptor);
+    Preconditions.checkState(open, "Channel closed" + channelNameDescriptor);
     Preconditions.checkNotNull(log, "log");
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Preconditions.checkNotNull(queue, "queue");
     return queue.getSize();
   }
+
   void close() {
-    if(open) {
+    if (open) {
       open = false;
       try {
         log.close();
@@ -398,11 +402,12 @@ public class FileChannel extends BasicChannelSemantics {
 
   /**
    * Did this channel recover a backup of the checkpoint to restart?
+   *
    * @return true if the channel recovered using a backup.
    */
   @VisibleForTesting
   boolean checkpointBackupRestored() {
-    if(log != null) {
+    if (log != null) {
       return log.backupRestored();
     }
     return false;
@@ -428,10 +433,11 @@ public class FileChannel extends BasicChannelSemantics {
     private final String channelNameDescriptor;
     private final ChannelCounter channelCounter;
     private final boolean fsyncPerTransaction;
+
     public FileBackedTransaction(Log log, long transactionID,
-        int transCapacity, int keepAlive, Semaphore queueRemaining,
-        String name, boolean fsyncPerTransaction, ChannelCounter
-        counter) {
+                                 int transCapacity, int keepAlive, Semaphore queueRemaining,
+                                 String name, boolean fsyncPerTransaction, ChannelCounter
+                                     counter) {
       this.log = log;
       queue = log.getFlumeEventQueue();
       this.transactionID = transactionID;
@@ -443,9 +449,11 @@ public class FileChannel extends BasicChannelSemantics {
       channelNameDescriptor = "[channel=" + name + "]";
       this.channelCounter = counter;
     }
+
     private boolean isClosed() {
       return State.CLOSED.equals(getState());
     }
+
     private String getStateAsString() {
       return String.valueOf(getState());
     }
@@ -453,7 +461,7 @@ public class FileChannel extends BasicChannelSemantics {
     @Override
     protected void doPut(Event event) throws InterruptedException {
       channelCounter.incrementEventPutAttemptCount();
-      if(putList.remainingCapacity() == 0) {
+      if (putList.remainingCapacity() == 0) {
         throw new ChannelException("Put queue for FileBackedTransaction "
             + "of capacity " + putList.size() + " full, consider "
            + "committing more frequently, increasing capacity or "
@@ -461,7 +469,7 @@ public class FileChannel extends BasicChannelSemantics {
       }
       // this does not need to be in the critical section as it does not
       // modify the structure of the log or queue.
-      if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
+      if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelFullException("The channel has reached it's capacity. "
            + "This might be the result of a sink on the channel having too "
            + "low of batch size, a downstream system running slower than "
@@ -473,15 +481,15 @@ public class FileChannel extends BasicChannelSemantics {
       try {
         FlumeEventPointer ptr = log.put(transactionID, event);
         Preconditions.checkState(putList.offer(ptr), "putList offer failed "
-            + channelNameDescriptor);
+                                                     + channelNameDescriptor);
         queue.addWithoutCommit(ptr, transactionID);
         success = true;
       } catch (IOException e) {
         throw new ChannelException("Put failed due to IO error "
-            + channelNameDescriptor, e);
+                                   + channelNameDescriptor, e);
       } finally {
         log.unlockShared();
-        if(!success) {
+        if (!success) {
           // release slot obtained in the case
           // the put fails for any reason
           queueRemaining.release();
@@ -492,11 +500,11 @@ public class FileChannel extends BasicChannelSemantics {
     @Override
     protected Event doTake() throws InterruptedException {
       channelCounter.incrementEventTakeAttemptCount();
-      if(takeList.remainingCapacity() == 0) {
+      if (takeList.remainingCapacity() == 0) {
         throw new ChannelException("Take list for FileBackedTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count. "
-           + channelNameDescriptor);
+               + channelNameDescriptor);
       }
       log.lockShared();
       /*
@@ -517,24 +525,24 @@ public class FileChannel extends BasicChannelSemantics {
           // first add to takeList so that if write to disk
           // fails rollback actually does it's work
           Preconditions.checkState(takeList.offer(ptr),
-              "takeList offer failed "
-              + channelNameDescriptor);
+              "takeList offer failed "
+                  + channelNameDescriptor);
          log.take(transactionID, ptr); // write take to disk
          Event event = log.get(ptr);
          return event;
        } catch (IOException e) {
          throw new ChannelException("Take failed due to IO error "
-              + channelNameDescriptor, e);
+                                     + channelNameDescriptor, e);
        } catch (NoopRecordException e) {
          LOG.warn("Corrupt record replaced by File Channel Integrity " +
-              "tool found. Will retrieve next event", e);
+                   "tool found. Will retrieve next event", e);
          takeList.remove(ptr);
        } catch (CorruptEventException ex) {
          if (fsyncPerTransaction) {
            throw new ChannelException(ex);
          }
          LOG.warn("Corrupt record found. Event will be " +
-              "skipped, and next event will be read.", ex);
+                   "skipped, and next event will be read.", ex);
          takeList.remove(ptr);
        }
      }
@@ -543,20 +551,21 @@ public class FileChannel extends BasicChannelSemantics {
         log.unlockShared();
       }
     }
+
     @Override
     protected void doCommit() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
-      if(puts > 0) {
+      if (puts > 0) {
         Preconditions.checkState(takes == 0, "nonzero puts and takes "
-            + channelNameDescriptor);
+                                             + channelNameDescriptor);
         log.lockShared();
         try {
           log.commitPut(transactionID);
           channelCounter.addToEventPutSuccessCount(puts);
           synchronized (queue) {
-            while(!putList.isEmpty()) {
-              if(!queue.addTail(putList.removeFirst())) {
+            while (!putList.isEmpty()) {
+              if (!queue.addTail(putList.removeFirst())) {
                 StringBuilder msg = new StringBuilder();
                 msg.append("Queue add failed, this shouldn't be able to ");
                 msg.append("happen. A portion of the transaction has been ");
@@ -572,7 +581,7 @@ public class FileChannel extends BasicChannelSemantics {
           }
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
-              + channelNameDescriptor, e);
+                                     + channelNameDescriptor, e);
         } finally {
           log.unlockShared();
         }
@@ -595,13 +604,14 @@ public class FileChannel extends BasicChannelSemantics {
       takeList.clear();
       channelCounter.setChannelSize(queue.getSize());
     }
+
     @Override
     protected void doRollback() throws InterruptedException {
       int puts = putList.size();
       int takes = takeList.size();
       log.lockShared();
       try {
-        if(takes > 0) {
+        if (takes > 0) {
          Preconditions.checkState(puts == 0, "nonzero puts and takes "
              + channelNameDescriptor);
          synchronized (queue) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 5c3c48f..c5678d4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -87,10 +87,8 @@ public class FileChannelConfiguration {
   public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
   public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
 
-  public static final String COMPRESS_BACKUP_CHECKPOINT =
-      "compressBackupCheckpoint";
-  public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT
-      = false;
+  public static final String COMPRESS_BACKUP_CHECKPOINT = "compressBackupCheckpoint";
+  public static final boolean DEFAULT_COMPRESS_BACKUP_CHECKPOINT = false;
 
   public static final String FSYNC_PER_TXN = "fsyncPerTransaction";
   public static final boolean DEFAULT_FSYNC_PRE_TXN = true;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
index 53c1251..cd1b6d8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEvent.java
@@ -37,15 +37,15 @@ import org.apache.flume.Event;
  */
 class FlumeEvent implements Event, Writable {
 
-  private static final byte EVENT_MAP_TEXT_WRITABLE_ID = Byte.valueOf(Integer.valueOf(-116).byteValue());
+  private static final byte EVENT_MAP_TEXT_WRITABLE_ID =
+      Byte.valueOf(Integer.valueOf(-116).byteValue());
 
-  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY =
-      new ThreadLocal<CharsetEncoder>() {
+  private static ThreadLocal<CharsetEncoder> ENCODER_FACTORY = new ThreadLocal<CharsetEncoder>() {
     @Override
     protected CharsetEncoder initialValue() {
-      return Charset.forName("UTF-8").newEncoder().
-          onMalformedInput(CodingErrorAction.REPLACE).
-          onUnmappableCharacter(CodingErrorAction.REPLACE);
+      return Charset.forName("UTF-8").newEncoder()
+          .onMalformedInput(CodingErrorAction.REPLACE)
+          .onUnmappableCharacter(CodingErrorAction.REPLACE);
     }
   };
 
@@ -53,9 +53,9 @@ class FlumeEvent implements Event, Writable {
       new ThreadLocal<CharsetDecoder>() {
         @Override
         protected CharsetDecoder initialValue() {
-          return Charset.forName("UTF-8").newDecoder().
-              onMalformedInput(CodingErrorAction.REPLACE).
-              onUnmappableCharacter(CodingErrorAction.REPLACE);
+          return Charset.forName("UTF-8").newDecoder()
+              .onMalformedInput(CodingErrorAction.REPLACE)
+              .onUnmappableCharacter(CodingErrorAction.REPLACE);
         }
       };
 
@@ -65,6 +65,7 @@ class FlumeEvent implements Event, Writable {
   private FlumeEvent() {
     this(null, null);
   }
+
   FlumeEvent(Map<String, String> headers, byte[] body) {
     this.headers = headers;
     this.body = body;
@@ -116,13 +117,12 @@ class FlumeEvent implements Event, Writable {
         WritableUtils.writeVInt(out, valueLength );
         out.write(valueBytes.array(), 0, valueLength);
       }
-    }
-    else {
+    } else {
       out.writeInt( 0 );
     }
 
     byte[] body = getBody();
-    if(body == null) {
+    if (body == null) {
       out.writeInt(-1);
     } else {
       out.writeInt(body.length);
@@ -174,7 +174,7 @@ class FlumeEvent implements Event, Writable {
 
     byte[] body = null;
     int bodyLength = in.readInt();
-    if(bodyLength != -1) {
+    if (bodyLength != -1) {
       body = new byte[bodyLength];
       in.readFully(body);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
index 5f06ab7..ebf7843 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java
@@ -27,6 +27,7 @@ package org.apache.flume.channel.file;
 class FlumeEventPointer {
   private final int fileID;
   private final int offset;
+
   FlumeEventPointer(int fileID, int offset) {
     this.fileID = fileID;
     this.offset = offset;
@@ -34,24 +35,28 @@ class FlumeEventPointer {
     * Log files used to have a header, now metadata is in
     * a separate file so data starts at offset 0.
     */
-    if(offset < 0) {
+    if (offset < 0) {
      throw new IllegalArgumentException("offset = " + offset + "("
          + Integer.toHexString(offset) + ")" + ", fileID = " + fileID + "("
          + Integer.toHexString(fileID) + ")");
    }
  }
+
  int getFileID() {
    return fileID;
  }
+
  int getOffset() {
    return offset;
  }
+
  public long toLong() {
    long result = fileID;
    result = (long)fileID << 32;
    result += (long)offset;
    return result;
  }
+
  @Override
  public int hashCode() {
    final int prime = 31;
@@ -60,6 +65,7 @@ class FlumeEventPointer {
     result = prime * result + offset;
     return result;
   }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -80,10 +86,12 @@ class FlumeEventPointer {
     }
     return true;
   }
+
   @Override
   public String toString() {
     return "FlumeEventPointer [fileID=" + fileID + ", offset=" + offset + "]";
   }
+
   public static FlumeEventPointer fromLong(long value) {
     int fileID = (int)(value >>> 32);
     int offset = (int)value;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index d305f4d..4311c7f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -18,6 +18,17 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.ArrayUtils;
+import org.mapdb.DB;
+import org.mapdb.DBMaker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -31,18 +42,6 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.ArrayUtils;
-import org.mapdb.DB;
-import org.mapdb.DBMaker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.SetMultimap;
-
 /**
  * Queue of events in the channel. This queue stores only
  * {@link FlumeEventPointer} objects which are represented
 *
@@ -54,7 +53,7 @@ import com.google.common.collect.SetMultimap;
  */
 final class FlumeEventQueue {
   private static final Logger LOG = LoggerFactory
-    .getLogger(FlumeEventQueue.class);
+      .getLogger(FlumeEventQueue.class);
   private static final int EMPTY = 0;
   private final EventQueueBackingStore backingStore;
   private final String channelNameDescriptor;
@@ -72,7 +71,7 @@ final class FlumeEventQueue {
    * @throws IOException
    */
   FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile,
-      File inflightPutsFile, File queueSetDBDir) throws Exception {
+                  File inflightPutsFile, File queueSetDBDir) throws Exception {
     Preconditions.checkArgument(backingStore.getCapacity() > 0,
         "Capacity must be greater than zero");
     Preconditions.checkNotNull(backingStore, "backingStore");
@@ -88,13 +87,13 @@ final class FlumeEventQueue {
       LOG.error("Could not read checkpoint.", e);
       throw e;
     }
-    if(queueSetDBDir.isDirectory()) {
+    if (queueSetDBDir.isDirectory()) {
       FileUtils.deleteDirectory(queueSetDBDir);
-    } else if(queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
+    } else if (queueSetDBDir.isFile() && !queueSetDBDir.delete()) {
       throw new IOException("QueueSetDir " + queueSetDBDir + " is a file and"
          + " could not be deleted");
     }
-    if(!queueSetDBDir.mkdirs()) {
+    if (!queueSetDBDir.mkdirs()) {
       throw new IllegalStateException("Could not create QueueSet Dir "
          + queueSetDBDir);
     }
@@ -108,7 +107,7 @@ final class FlumeEventQueue {
         .mmapFileEnableIfSupported()
         .make();
     queueSet =
-        db.createHashSet("QueueSet " + " - " + backingStore.getName()).make();
+            db.createHashSet("QueueSet " + " - " + backingStore.getName()).make();
     long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
       queueSet.add(get(i));
@@ -118,12 +117,12 @@ final class FlumeEventQueue {
   }
 
   SetMultimap<Long, Long> deserializeInflightPuts()
-      throws IOException, BadCheckpointException{
+      throws IOException, BadCheckpointException {
     return inflightPuts.deserialize();
   }
 
   SetMultimap<Long, Long> deserializeInflightTakes()
-      throws IOException, BadCheckpointException{
+      throws IOException, BadCheckpointException {
     return inflightTakes.deserialize();
   }
@@ -133,9 +132,9 @@ final class FlumeEventQueue {
   synchronized boolean checkpoint(boolean force) throws Exception {
     if (!backingStore.syncRequired()
-        && !inflightTakes.syncRequired()
-        && !force) { //No need to check inflight puts, since that would
-                     //cause elements.syncRequired() to return true.
+            && !inflightTakes.syncRequired()
+            && !force) { //No need to check inflight puts, since that would
+                         //cause elements.syncRequired() to return true.
       LOG.debug("Checkpoint not required");
       return false;
     }
@@ -152,13 +151,13 @@ final class FlumeEventQueue {
    * @return FlumeEventPointer or null if queue is empty
    */
   synchronized FlumeEventPointer removeHead(long transactionID) {
-    if(backingStore.getSize() == 0) {
+    if (backingStore.getSize() == 0) {
       return null;
     }
 
     long value = remove(0, transactionID);
     Preconditions.checkState(value != EMPTY, "Empty value "
-          + channelNameDescriptor);
+        + channelNameDescriptor);
 
     FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
     backingStore.decrementFileID(ptr.getFileID());
@@ -168,6 +167,7 @@ final class FlumeEventQueue {
   /**
    * Add a FlumeEventPointer to the head of the queue.
    * Called during rollbacks.
+   *
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer was
    * added to the queue
@@ -180,7 +180,7 @@ final class FlumeEventQueue {
       //there is a buuuuuuuug!
       if (backingStore.getSize() == backingStore.getCapacity()) {
         LOG.error("Could not reinsert to queue, events which were taken but "
-            + "not committed. Please report this issue.");
+                  + "not committed. Please report this issue.");
         return false;
       }
@@ -195,6 +195,7 @@ final class FlumeEventQueue {
   /**
    * Add a FlumeEventPointer to the tail of the queue.
+   *
    * @param FlumeEventPointer to be added
    * @return true if space was available and pointer
    * was added to the queue
@@ -215,6 +216,7 @@ final class FlumeEventQueue {
   /**
    * Must be called when a put happens to the log. This ensures that put commits
    * after checkpoints will retrieve all events committed in that txn.
+   *
    * @param e
    * @param transactionID
    */
@@ -227,16 +229,19 @@ final class FlumeEventQueue {
    * only be used when recovering from a crash. It is not
    * legal to call this method after replayComplete has been
    * called.
+   *
    * @param FlumeEventPointer to be removed
    * @return true if the FlumeEventPointer was found
    * and removed
    */
+  // remove() overloads should not be split, according to checkstyle.
+  // CHECKSTYLE:OFF
   synchronized boolean remove(FlumeEventPointer e) {
     long value = e.toLong();
     Preconditions.checkArgument(value != EMPTY);
     if (queueSet == null) {
-      throw new IllegalStateException("QueueSet is null, thus replayComplete"
-          + " has been called which is illegal");
+      throw new IllegalStateException("QueueSet is null, thus replayComplete"
+                                      + " has been called which is illegal");
     }
     if (!queueSet.contains(value)) {
       return false;
@@ -244,7 +249,7 @@ final class FlumeEventQueue {
     searchCount++;
     long start = System.currentTimeMillis();
     for (int i = 0; i < backingStore.getSize(); i++) {
-      if(get(i) == value) {
+      if (get(i) == value) {
         remove(i, 0);
         FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
         backingStore.decrementFileID(ptr.getFileID());
@@ -255,6 +260,8 @@ final class FlumeEventQueue {
     searchTime += System.currentTimeMillis() - start;
     return false;
   }
+  // CHECKSTYLE:ON
+
   /**
    * @return a copy of the set of fileIDs which are currently on the queue
    *         will be normally be used when deciding which data files can
@@ -299,19 +306,19 @@ final class FlumeEventQueue {
 
     backingStore.setSize(backingStore.getSize() + 1);
 
-    if (index <= backingStore.getSize()/2) {
+    if (index <= backingStore.getSize() / 2) {
       // Shift left
       backingStore.setHead(backingStore.getHead() - 1);
       if (backingStore.getHead() < 0) {
         backingStore.setHead(backingStore.getCapacity() - 1);
       }
       for (int i = 0; i < index; i++) {
-        set(i, get(i+1));
+        set(i, get(i + 1));
       }
     } else {
       // Sift right
       for (int i = backingStore.getSize() - 1; i > index; i--) {
-        set(i, get(i-1));
+        set(i, get(i - 1));
       }
     }
     set(index, value);
@@ -323,6 +330,7 @@ final class FlumeEventQueue {
   /**
    * Must be called when a transaction is being committed or rolled back.
+   *
    * @param transactionID
    */
   synchronized void completeTransaction(long transactionID) {
@@ -334,7 +342,7 @@ final class FlumeEventQueue {
   protected synchronized long remove(int index, long transactionID) {
     if (index < 0 || index > backingStore.getSize() - 1) {
       throw new IndexOutOfBoundsException("index = " + index
-          + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor);
+          + ", queueSize " + backingStore.getSize() + " " + channelNameDescriptor);
     }
     copyCount++;
     long start = System.currentTimeMillis();
@@ -343,13 +351,13 @@
       queueSet.remove(value);
     }
     //if txn id = 0, we are recovering from a crash.
-    if(transactionID != 0) {
+    if (transactionID != 0) {
       inflightTakes.addEvent(transactionID, value);
     }
-    if (index > backingStore.getSize()/2) {
+    if (index > backingStore.getSize() / 2) {
       // Move tail part to left
       for (int i = index; i < backingStore.getSize() - 1; i++) {
-        long rightValue = get(i+1);
+        long rightValue = get(i + 1);
         set(i, rightValue);
       }
       set(backingStore.getSize() - 1, EMPTY);
@@ -357,7 +365,7 @@
       // Move head part to right
       for (int i = index - 1; i >= 0; i--) {
         long leftValue = get(i);
-        set(i+1, leftValue);
+        set(i + 1, leftValue);
       }
       set(0, EMPTY);
       backingStore.setHead(backingStore.getHead() + 1);
@@ -386,7 +394,7 @@
       if (db != null) {
         db.close();
       }
-    } catch(Exception ex) {
+    } catch (Exception ex) {
       LOG.warn("Error closing db", ex);
     }
     try {
@@ -407,7 +415,7 @@
         searchTime + ", Copy Count = " + copyCount + ", Copy Time = " +
         copyTime;
     LOG.info(msg);
-    if(db != null) {
+    if (db != null) {
       db.close();
     }
     queueSet = null;
@@ -440,11 +448,11 @@
     private volatile boolean syncRequired = false;
     private SetMultimap<Long, Integer> inflightFileIDs = HashMultimap.create();
 
-    public InflightEventWrapper(File inflightEventsFile) throws Exception{
-      if(!inflightEventsFile.exists()){
-        Preconditions.checkState(inflightEventsFile.createNewFile(),"Could not"
-            + "create inflight events file: "
-            + inflightEventsFile.getCanonicalPath());
+    public InflightEventWrapper(File inflightEventsFile) throws Exception {
+      if (!inflightEventsFile.exists()) {
+        Preconditions.checkState(inflightEventsFile.createNewFile(), "Could not"
+            + "create inflight events file: "
+            + inflightEventsFile.getCanonicalPath());
       }
       this.inflightEventsFile = inflightEventsFile;
       file = new RandomAccessFile(inflightEventsFile, "rw");
@@ -454,10 +462,11 @@
 
     /**
      * Complete the transaction, and remove all events from inflight list.
+     *
      * @param transactionID
      */
     public boolean completeTransaction(Long transactionID) {
-      if(!inflightEvents.containsKey(transactionID)) {
+      if (!inflightEvents.containsKey(transactionID)) {
         return false;
       }
       inflightEvents.removeAll(transactionID);
@@ -468,28 +477,30 @@
 
     /**
      * Add an event pointer to the inflights list.
+     *
      * @param transactionID
     * @param pointer
     */
-    public void addEvent(Long transactionID, Long pointer){
+    public void addEvent(Long transactionID, Long pointer) {
       inflightEvents.put(transactionID, pointer);
       inflightFileIDs.put(transactionID,
-          FlumeEventPointer.fromLong(pointer).getFileID());
+                          FlumeEventPointer.fromLong(pointer).getFileID());
       syncRequired = true;
     }
 
     /**
      * Serialize the set of in flights into a byte longBuffer.
+     *
     * @return Returns the checksum of the buffer that is being
     * asynchronously written to disk.
     */
     public void serializeAndWrite() throws Exception {
       Collection<Long> values = inflightEvents.values();
-      if(!fileChannel.isOpen()){
+      if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
         fileChannel = file.getChannel();
       }
-      if(values.isEmpty()){
+      if (values.isEmpty()) {
         file.setLength(0L);
       }
       //What is written out?
@@ -498,14 +509,15 @@
      //transactionid numberofeventsforthistxn listofeventpointers
       try {
-        int expectedFileSize = (((inflightEvents.keySet().size() * 2) //For transactionIDs and events per txn ID
-            + values.size()) * 8) //Event pointers
-            + 16; //Checksum
+        int expectedFileSize = (((inflightEvents.keySet().size() * 2) //for transactionIDs and
+                                                                      //events per txn ID
+            + values.size()) * 8) //Event pointers
+            + 16; //Checksum
         //There is no real need of filling the channel with 0s, since we
         //will write the exact nummber of bytes as expected file size.
         file.setLength(expectedFileSize);
         Preconditions.checkState(file.length() == expectedFileSize,
-            "Expected File size of inflight events file does not match the "
+            "Expected File size of inflight events file does not match the "
             + "current file size. Checkpoint is incomplete.");
         file.seek(0);
         final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize);
@@ -515,10 +527,10 @@
           longBuffer.put(txnID);
           longBuffer.put((long) pointers.size());
           LOG.debug("Number of events inserted into "
-              + "inflights file: " + String.valueOf(pointers.size())
-              + " file: " + inflightEventsFile.getCanonicalPath());
+                    + "inflights file: " + String.valueOf(pointers.size())
+                    + " file: " + inflightEventsFile.getCanonicalPath());
           long[] written = ArrayUtils.toPrimitive(
-              pointers.toArray(new Long[0]));
+                  pointers.toArray(new Long[0]));
           longBuffer.put(written);
         }
         byte[] checksum = digest.digest(buffer.array());
@@ -539,56 +551,55 @@
      * of transactionIDs to events that were inflight.
      *
      * @return - map of inflight events per txnID.
-     *
      */
     public SetMultimap<Long, Long> deserialize()
-        throws IOException, BadCheckpointException {
+            throws IOException, BadCheckpointException {
       SetMultimap<Long, Long> inflights = HashMultimap.create();
       if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
         fileChannel = file.getChannel();
       }
-      if(file.length() == 0) {
+      if (file.length() == 0) {
         return inflights;
       }
       file.seek(0);
       byte[] checksum = new byte[16];
       file.read(checksum);
       ByteBuffer buffer = ByteBuffer.allocate(
-          (int)(file.length() - file.getFilePointer()));
+              (int) (file.length() - file.getFilePointer()));
       fileChannel.read(buffer);
       byte[] fileChecksum = digest.digest(buffer.array());
       if (!Arrays.equals(checksum, fileChecksum)) {
         throw new BadCheckpointException("Checksum of inflights file differs"
-            + " from the checksum expected.");
+                + " from the checksum expected.");
       }
       buffer.position(0);
       LongBuffer longBuffer = buffer.asLongBuffer();
       try {
         while (true) {
           long txnID = longBuffer.get();
-          int numEvents = (int)(longBuffer.get());
-          for(int i = 0; i < numEvents; i++) {
+          int numEvents = (int) (longBuffer.get());
+          for (int i = 0; i < numEvents; i++) {
            long val = longBuffer.get();
            inflights.put(txnID, val);
          }
        }
      } catch (BufferUnderflowException ex) {
        LOG.debug("Reached end of inflights buffer. Long buffer position ="
-            + String.valueOf(longBuffer.position()));
+                  + String.valueOf(longBuffer.position()));
      }
-      return  inflights;
+      return inflights;
    }
 
     public int getSize() {
       return inflightEvents.size();
     }
 
-    public boolean syncRequired(){
+    public boolean syncRequired() {
       return syncRequired;
     }
 
-    public Collection<Integer> getFileIDs(){
+    public Collection<Integer> getFileIDs() {
       return inflightFileIDs.values();
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 247c287..02d8e7f 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -63,7 +63,7 @@
 * Stores FlumeEvents on disk and pointers to the events in a in memory queue.
 * Once a log object is created the replay method should be called to reconcile
 * the on disk write ahead log with the last checkpoint of the queue.
- *
+ * <p>
 * Before calling any of commitPut/commitTake/get/put/rollback/take
 * {@linkplain org.apache.flume.channel.file.Log#lockShared()}
 * should be called. After
@@ -217,12 +217,12 @@
       return this;
     }
 
-    Builder setUseLogReplayV1(boolean useLogReplayV1){
+    Builder setUseLogReplayV1(boolean useLogReplayV1) {
       this.useLogReplayV1 = useLogReplayV1;
       return this;
     }
 
-    Builder setUseFastReplay(boolean useFastReplay){
+    Builder setUseFastReplay(boolean useFastReplay) {
       this.useFastReplay = useFastReplay;
       return this;
     }
@@ -264,46 +264,46 @@
 
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-          bUseDualCheckpoints, bCompressBackupCheckpoint,bCheckpointDir,
-          bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
-          bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
-          bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
-          fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
+              bUseDualCheckpoints, bCompressBackupCheckpoint, bCheckpointDir,
+              bBackupCheckpointDir, bName, useLogReplayV1, useFastReplay,
+              bMinimumRequiredSpace, bEncryptionKeyProvider, bEncryptionKeyAlias,
+              bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
+              fsyncPerTransaction, fsyncInterval, checkpointOnClose, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-      boolean useDualCheckpoints, boolean compressBackupCheckpoint,
-      File checkpointDir, File backupCheckpointDir,
-      String name, boolean useLogReplayV1, boolean useFastReplay,
-      long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
-      @Nullable String encryptionKeyAlias,
-      @Nullable String encryptionCipherProvider,
-      long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
-      int fsyncInterval, boolean checkpointOnClose, File... logDirs)
-      throws IOException {
+              boolean useDualCheckpoints, boolean compressBackupCheckpoint,
+              File checkpointDir, File backupCheckpointDir,
+              String name, boolean useLogReplayV1, boolean useFastReplay,
+              long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
+              @Nullable String encryptionKeyAlias,
+              @Nullable String encryptionCipherProvider,
+              long usableSpaceRefreshInterval, boolean fsyncPerTransaction,
+              int fsyncInterval, boolean checkpointOnClose, File... logDirs)
+      throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
-        "checkpointInterval <= 0");
+                                "checkpointInterval <= 0");
     Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
     Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
     Preconditions.checkNotNull(checkpointDir, "checkpointDir");
     Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
        "usableSpaceRefreshInterval <= 0");
     Preconditions.checkArgument(
-        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
-        + checkpointDir + " could not be created");
+        checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+            + checkpointDir + " could not be created");
     if (useDualCheckpoints) {
       Preconditions.checkNotNull(backupCheckpointDir, "backupCheckpointDir is" +
-        " null while dual checkpointing is enabled.");
+          " null while dual checkpointing is enabled.");
       Preconditions.checkArgument(
-        backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
-        "Backup CheckpointDir " + backupCheckpointDir +
-        " could not be created");
+          backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
+          "Backup CheckpointDir " + backupCheckpointDir +
+              " could not be created");
     }
     Preconditions.checkNotNull(logDirs, "logDirs");
     Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
     Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
-        "channel name should be specified");
+                                "channel name should be specified");
 
     this.channelNameDescriptor = "[channel=" + name + "]";
     this.useLogReplayV1 = useLogReplayV1;
@@ -317,20 +317,20 @@
     locks = Maps.newHashMap();
     try {
       lock(checkpointDir);
-      if(useDualCheckpoints) {
+      if (useDualCheckpoints) {
         lock(backupCheckpointDir);
       }
       for (File logDir : logDirs) {
         lock(logDir);
       }
-    } catch(IOException e) {
+    } catch (IOException e) {
       unlock(checkpointDir);
       for (File logDir : logDirs) {
         unlock(logDir);
       }
       throw e;
     }
-    if(encryptionKeyProvider != null && encryptionKeyAlias != null &&
+    if (encryptionKeyProvider != null && encryptionKeyAlias != null &&
         encryptionCipherProvider != null) {
       LOGGER.info("Encryption is enabled with encryptionKeyProvider = "
          + encryptionKeyProvider + ", encryptionKeyAlias = " + encryptionKeyAlias
@@ -346,7 +346,7 @@
       throw new IllegalArgumentException("Encryption configuration must all "
          + "null or all not null: encryptionKeyProvider = " +
          encryptionKeyProvider + ", encryptionKeyAlias = " +
-          encryptionKeyAlias + ", encryptionCipherProvider = " +
+          encryptionKeyAlias + ", encryptionCipherProvider = " +
          encryptionCipherProvider);
     }
     open = false;
@@ -364,7 +364,7 @@
     logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
     workerExecutor = Executors.newSingleThreadScheduledExecutor(new
-      ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+        ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
         .build());
     workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
         this.checkpointInterval, this.checkpointInterval,
@@ -374,6 +374,7 @@
   /**
    * Read checkpoint and data files from disk replaying them to the state
    * directly before the shutdown or crash.
+   *
    * @throws IOException
    */
   void replay() throws IOException {
@@ -416,8 +417,8 @@
    * locations. We will read the last one written to disk.
*/ File checkpointFile = new File(checkpointDir, "checkpoint"); - if(shouldFastReplay) { - if(checkpointFile.exists()) { + if (shouldFastReplay) { + if (checkpointFile.exists()) { LOGGER.debug("Disabling fast full replay because checkpoint " + "exists: " + checkpointFile); shouldFastReplay = false; @@ -434,14 +435,14 @@ public class Log { try { backingStore = - EventQueueBackingStoreFactory.get(checkpointFile, - backupCheckpointDir, queueCapacity, channelNameDescriptor, - true, this.useDualCheckpoints, - this.compressBackupCheckpoint); + EventQueueBackingStoreFactory.get(checkpointFile, + backupCheckpointDir, queueCapacity, channelNameDescriptor, + true, this.useDualCheckpoints, + this.compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, - inflightPutsFile, queueSetDir); + inflightPutsFile, queueSetDir); LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified()) - + ", queue depth = " + queue.getSize()); + + ", queue depth = " + queue.getSize()); /* * We now have everything we need to actually replay the log files @@ -460,7 +461,7 @@ public class Log { + "Restoring checkpoint and starting up.", ex); if (EventQueueBackingStoreFile.backupExists(backupCheckpointDir)) { backupRestored = EventQueueBackingStoreFile.restoreBackup( - checkpointDir, backupCheckpointDir); + checkpointDir, backupCheckpointDir); } } if (!backupRestored) { @@ -472,16 +473,16 @@ public class Log { } } backingStore = EventQueueBackingStoreFactory.get( - checkpointFile, backupCheckpointDir, queueCapacity, - channelNameDescriptor, true, useDualCheckpoints, - compressBackupCheckpoint); + checkpointFile, backupCheckpointDir, queueCapacity, + channelNameDescriptor, true, useDualCheckpoints, + compressBackupCheckpoint); queue = new FlumeEventQueue(backingStore, inflightTakesFile, - inflightPutsFile, queueSetDir); + inflightPutsFile, queueSetDir); // If the checkpoint was deleted due to BadCheckpointException, then // trigger fast replay if the channel is configured to. shouldFastReplay = this.useFastReplay; doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay); - if(!shouldFastReplay) { + if (!shouldFastReplay) { didFullReplayDueToBadCheckpointException = true; } } @@ -514,13 +515,13 @@ public class Log { KeyProvider encryptionKeyProvider, boolean useFastReplay) throws Exception { CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, - queue, fsyncPerTransaction); + queue, fsyncPerTransaction); if (useFastReplay && rebuilder.rebuild()) { didFastReplay = true; LOGGER.info("Fast replay successful."); } else { ReplayHandler replayHandler = new ReplayHandler(queue, - encryptionKeyProvider, fsyncPerTransaction); + encryptionKeyProvider, fsyncPerTransaction); if (useLogReplayV1) { LOGGER.info("Replaying logs with v1 replay logic"); replayHandler.replayLogv1(dataFiles); @@ -540,10 +541,12 @@ public class Log { boolean didFastReplay() { return didFastReplay; } + @VisibleForTesting public int getReadCount() { return readCount; } + @VisibleForTesting public int getPutCount() { return putCount; @@ -553,10 +556,12 @@ public class Log { public int getTakeCount() { return takeCount; } + @VisibleForTesting public int getCommittedCount() { return committedCount; } + @VisibleForTesting public int getRollbackCount() { return rollbackCount; @@ -564,6 +569,7 @@ public class Log { /** * Was a checkpoint backup used to replay? + * * @return true if a checkpoint backup was used to replay. 
@@ -564,6 +569,7 @@ public class Log {
   /**
    * Was a checkpoint backup used to replay?
+   *
    * @return true if a checkpoint backup was used to replay.
    */
   @VisibleForTesting
@@ -597,7 +603,7 @@ public class Log {
    * @throws InterruptedException
    */
   FlumeEvent get(FlumeEventPointer pointer) throws IOException,
-      InterruptedException, NoopRecordException, CorruptEventException {
+      InterruptedException, NoopRecordException, CorruptEventException {
     Preconditions.checkState(open, "Log is closed");
     int id = pointer.getFileID();
     LogFile.RandomReader logFile = idLogFileMap.get(id);
@@ -608,7 +614,7 @@ public class Log {
       if (fsyncPerTransaction) {
         open = false;
         throw new IOException("Corrupt event found. Please run File Channel " +
-          "Integrity tool.", ex);
+            "Integrity tool.", ex);
       }
       throw ex;
     }
@@ -616,8 +622,9 @@ public class Log {
   /**
    * Log a put of an event
-   *
+   * <p/>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @param event
    * @return
    * @throws IOException
    */
@@ -633,7 +640,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -644,7 +651,7 @@ public class Log {
       error = false;
       return ptr;
     } catch (LogFileRetryableIOException e) {
-      if(!open) {
+      if (!open) {
        throw e;
      }
      roll(logFileIndex, buffer);
@@ -653,7 +660,7 @@ public class Log {
       return ptr;
     }
   } finally {
-      if(error && open) {
+      if (error && open) {
        roll(logFileIndex);
      }
    }
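The put hunks above follow one write discipline: fail fast when usable space
dips below a floor, and on a retryable write failure roll to a fresh log file
and retry once, with the finally block rolling away from any writer that still
failed. An illustrative sketch under invented types (LogWriter and
RetryableIOException are not Flume's):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class WriteWithRollSketch {
      static class RetryableIOException extends IOException {}

      interface LogWriter {
        long usableSpace();
        void append(ByteBuffer buf) throws IOException;
      }

      static void write(LogWriter writer, ByteBuffer buf,
                        long minimumRequiredSpace, Runnable roll) throws IOException {
        long required = minimumRequiredSpace + buf.limit();
        if (writer.usableSpace() <= required) {
          throw new IOException("Usable space exhausted"); // fail fast, no retry
        }
        boolean error = true;
        try {
          try {
            writer.append(buf);
            error = false;
          } catch (RetryableIOException e) {
            roll.run();          // switch to a fresh log file
            writer.append(buf);  // single retry against the new file
            error = false;
          }
        } finally {
          if (error) {
            roll.run();          // quarantine a writer that keeps failing
          }
        }
      }
    }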

@@ -661,8 +668,9 @@ public class Log {
   /**
    * Log a take of an event, pointer points at the corresponding put
-   *
+   * <p/>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @param pointer
    * @throws IOException
    */
@@ -676,7 +684,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -686,7 +694,7 @@ public class Log {
       logFiles.get(logFileIndex).take(buffer);
       error = false;
     } catch (LogFileRetryableIOException e) {
-      if(!open) {
+      if (!open) {
        throw e;
      }
      roll(logFileIndex, buffer);
@@ -694,7 +702,7 @@ public class Log {
       error = false;
     }
   } finally {
-      if(error && open) {
+      if (error && open) {
        roll(logFileIndex);
      }
    }

@@ -702,15 +710,16 @@ public class Log {
   /**
    * Log a rollback of a transaction
-   *
+   * <p/>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    */
   void rollback(long transactionID) throws IOException {
     Preconditions.checkState(open, "Log is closed");
-    if(LOGGER.isDebugEnabled()) {
+    if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Rolling back " + transactionID);
     }
     Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next());
@@ -718,7 +727,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -728,7 +737,7 @@ public class Log {
       logFiles.get(logFileIndex).rollback(buffer);
       error = false;
     } catch (LogFileRetryableIOException e) {
-      if(!open) {
+      if (!open) {
        throw e;
      }
      roll(logFileIndex, buffer);
@@ -736,7 +745,7 @@ public class Log {
       error = false;
     }
   } finally {
-      if(error && open) {
+      if (error && open) {
        roll(logFileIndex);
      }
    }

@@ -747,14 +756,15 @@ public class Log {
    * so we know if the pointers corresponding to the events
    * should be added or removed from the flume queue. We
    * could infer but it's best to be explicit.
-   *
+   * <p/>
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    * @throws InterruptedException
    */
   void commitPut(long transactionID) throws IOException,
-      InterruptedException {
+                                            InterruptedException {
     Preconditions.checkState(open, "Log is closed");
     commit(transactionID, TransactionEventRecord.Type.PUT.get());
   }
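commitPut differs from commitTake (next hunk) only in the record-type constant
handed to commit(); as the javadoc says, replay dispatches on that type to
decide whether an event pointer is added to or removed from the queue. A toy
version of that dispatch, not Flume's FlumeEventQueue API:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class CommitTypeSketch {
      enum Type { PUT, TAKE }

      // A committed PUT makes the pointer visible; a committed TAKE retires it.
      static void applyCommitted(Type type, Deque<Long> queue, long pointer) {
        if (type == Type.PUT) {
          queue.addLast(pointer);
        } else {
          queue.remove(pointer);  // removes by equality (auto-boxed)
        }
      }

      public static void main(String[] args) {
        Deque<Long> queue = new ArrayDeque<>();
        applyCommitted(Type.PUT, queue, 42L);
        applyCommitted(Type.TAKE, queue, 42L);
        System.out.println(queue.isEmpty());  // true
      }
    }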

@@ -764,20 +774,21 @@ public class Log {
    * Synchronization not required as this method is atomic
+   *
    * @param transactionID
    * @throws IOException
    * @throws InterruptedException
    */
   void commitTake(long transactionID) throws IOException,
-      InterruptedException {
+                                             InterruptedException {
     Preconditions.checkState(open, "Log is closed");
     commit(transactionID, TransactionEventRecord.Type.TAKE.get());
   }
 
-  private void unlockExclusive() {
+  private void unlockExclusive() {
     checkpointWriterLock.unlock();
   }
@@ -785,11 +796,11 @@ public class Log {
     checkpointReadLock.lock();
   }
 
-  void unlockShared() {
+  void unlockShared() {
     checkpointReadLock.unlock();
   }
 
-  private void lockExclusive(){
+  private void lockExclusive() {
     checkpointWriterLock.lock();
   }
@@ -797,23 +808,23 @@ public class Log {
    * Synchronization not required since this method gets the write lock,
    * so checkpoint and this method cannot run at the same time.
    */
-  void close() throws IOException{
+  void close() throws IOException {
     lockExclusive();
     try {
       open = false;
       try {
-        if(checkpointOnClose) {
+        if (checkpointOnClose) {
          writeCheckpoint(true); // do this before acquiring exclusive lock
        }
      } catch (Exception err) {
        LOGGER.warn("Failed creating checkpoint on close of channel " + channelNameDescriptor +
-          "Replay will take longer next time channel is started.", err);
+            "Replay will take longer next time channel is started.", err);
      }
      shutdownWorker();
      if (logFiles != null) {
        for (int index = 0; index < logFiles.length(); index++) {
          LogFile.Writer writer = logFiles.get(index);
-          if(writer != null) {
+          if (writer != null) {
            writer.close();
          }
        }
@@ -862,9 +873,11 @@ public class Log {
       LOGGER.error("Interrupted while waiting for worker to die.");
     }
   }
+
   void setCheckpointInterval(long checkpointInterval) {
     this.checkpointInterval = checkpointInterval;
   }
+
   void setMaxFileSize(long maxFileSize) {
     this.maxFileSize = maxFileSize;
   }
@@ -883,7 +896,7 @@ public class Log {
     int logFileIndex = nextLogWriter(transactionID);
     long usableSpace = logFiles.get(logFileIndex).getUsableSpace();
     long requiredSpace = minimumRequiredSpace + buffer.limit();
-    if(usableSpace <= requiredSpace) {
+    if (usableSpace <= requiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + requiredSpace + " bytes");
     }
@@ -898,7 +911,7 @@ public class Log {
       logFileWriter.sync();
       error = false;
     } catch (LogFileRetryableIOException e) {
-      if(!open) {
+      if (!open) {
        throw e;
      }
      roll(logFileIndex, buffer);
@@ -908,7 +921,7 @@ public class Log {
       error = false;
     }
   } finally {
-      if(error && open) {
+      if (error && open) {
        roll(logFileIndex);
      }
    }
@@ -917,11 +930,13 @@ public class Log {
   /**
    * Atomic so not synchronization required.
+   *
    * @return
    */
   private int nextLogWriter(long transactionID) {
-    return (int)Math.abs(transactionID % (long)logFiles.length());
+    return (int) Math.abs(transactionID % (long) logFiles.length());
   }
+
   /**
    * Unconditionally roll
    * Synchronization done internally
    * @param index
    * @throws IOException
    */
   private void roll(int index) throws IOException {
     roll(index, null);
   }
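nextLogWriter above pins each transaction to one log file with plain modulo
arithmetic, so every record of a transaction lands in the same data file. A
runnable illustration (numWriters stands in for logFiles.length()):

    public class WriterPartitionSketch {
      static int nextLogWriter(long transactionID, int numWriters) {
        // The remainder of a negative ID is negative in Java, hence Math.abs.
        return (int) Math.abs(transactionID % (long) numWriters);
      }

      public static void main(String[] args) {
        for (long txId : new long[] {1L, 2L, 3L, 17L, -5L}) {
          System.out.println(txId + " -> writer " + nextLogWriter(txId, 3));
        }
      }
    }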

+
   /**
    * Roll a log if needed. Roll always occurs if the log at the index
    * does not exist (typically on startup), or buffer is null. Otherwise
    * LogFile.Writer.isRollRequired is checked again to ensure we don't
    * have threads pile up on this log resulting in multiple successive
    * rolls
-   *
+   * <p/>
    * Synchronization required since both synchronized and unsynchronized
    * methods call this method, and this method acquires only a
    * read lock. The synchronization guarantees that multiple threads don't
    * roll at the same time.
    *
    * @param index
    * @throws IOException
    */
   private synchronized void roll(int index, ByteBuffer buffer)
-      throws IOException {
+      throws IOException {
     lockShared();
 
     try {
@@ -956,17 +972,17 @@ public class Log {
       // check to make sure a roll is actually required due to
       // the possibility of multiple writes waiting on lock
       if (oldLogFile == null || buffer == null ||
-          oldLogFile.isRollRequired(buffer)) {
+          oldLogFile.isRollRequired(buffer)) {
         try {
           LOGGER.info("Roll start " + logDirs[index]);
           int fileID = nextFileID.incrementAndGet();
           File file = new File(logDirs[index], PREFIX + fileID);
           LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
-            maxFileSize, encryptionKey, encryptionKeyAlias,
-            encryptionCipherProvider, usableSpaceRefreshInterval,
-            fsyncPerTransaction, fsyncInterval);
+              maxFileSize, encryptionKey, encryptionKeyAlias,
+              encryptionCipherProvider, usableSpaceRefreshInterval,
+              fsyncPerTransaction, fsyncInterval);
           idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
-            encryptionKeyProvider, fsyncPerTransaction));
+              encryptionKeyProvider, fsyncPerTransaction));
           // writer from this point on will get new reference
           logFiles.set(index, writer);
           // close out old log
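roll(int, ByteBuffer) re-tests isRollRequired after entering the synchronized
method, so writers that piled up behind one full file trigger a single roll
rather than one each. A stripped-down sketch of the double-check (Writer is a
hypothetical stand-in):

    import java.util.function.Supplier;

    public class DoubleCheckedRollSketch {
      interface Writer {
        boolean isRollRequired();
      }

      private Writer current;

      // Threads queued behind a full log re-test the condition under the
      // monitor, so only the first one actually rolls.
      synchronized void rollIfNeeded(Supplier<Writer> freshWriter) {
        if (current == null || current.isRollRequired()) {
          current = freshWriter.get(); // swap in a new file for later writes
        }
      }
    }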

@@ -989,22 +1005,24 @@ public class Log {
   /**
    * Write the current checkpoint object and then swap objects so that
    * the next checkpoint occurs on the other checkpoint directory.
-   *
+   * <p/>
    * Synchronization is not required because this method acquires a
    * write lock. So this method gets exclusive access to all the
    * data structures this method accesses.
-   * @param force a flag to force the writing of checkpoint
+   *
+   * @param force a flag to force the writing of checkpoint
    * @throws IOException if we are unable to write the checkpoint out to disk
    */
   private Boolean writeCheckpoint(Boolean force) throws Exception {
     boolean checkpointCompleted = false;
     long usableSpace = checkpointDir.getUsableSpace();
-    if(usableSpace <= minimumRequiredSpace) {
+    if (usableSpace <= minimumRequiredSpace) {
       throw new IOException("Usable space exhausted, only " + usableSpace +
           " bytes remaining, required " + minimumRequiredSpace + " bytes");
     }
     lockExclusive();
-    SortedSet logFileRefCountsAll = null, logFileRefCountsActive = null;
+    SortedSet logFileRefCountsAll = null;
+    SortedSet logFileRefCountsActive = null;
     try {
       if (queue.checkpoint(force)) {
         long logWriteOrderID = queue.getLogWriteOrderID();
@@ -1048,7 +1066,7 @@ public class Log {
           writer.markCheckpoint(logWriteOrderID);
         } finally {
           reader = LogFileFactory.getRandomReader(file,
-            encryptionKeyProvider, fsyncPerTransaction);
+              encryptionKeyProvider, fsyncPerTransaction);
           idLogFileMap.put(id, reader);
           writer.close();
         }
@@ -1058,7 +1076,7 @@ public class Log {
         idIterator.remove();
       }
       Preconditions.checkState(logFileRefCountsAll.size() == 0,
-        "Could not update all data file timestamps: " + logFileRefCountsAll);
+          "Could not update all data file timestamps: " + logFileRefCountsAll);
       //Add files from all log directories
       for (int index = 0; index < logDirs.length; index++) {
         logFileRefCountsActive.add(logFiles.get(index).getLogFileID());
@@ -1086,7 +1104,7 @@ public class Log {
       // these files) and delete them only after the next (since the current
       // checkpoint will become the backup at that time,
       // and thus these files are no longer needed).
-      for(File fileToDelete : pendingDeletes) {
+      for (File fileToDelete : pendingDeletes) {
         LOGGER.info("Removing old file: " + fileToDelete);
         FileUtils.deleteQuietly(fileToDelete);
       }
@@ -1095,7 +1113,7 @@ public class Log {
     // won't delete any files with an id larger than the min
     int minFileID = fileIDs.first();
     LOGGER.debug("Files currently in use: " + fileIDs);
-    for(File logDir : logDirs) {
+    for (File logDir : logDirs) {
       List logs = LogUtils.getLogs(logDir);
       // sort oldset to newest
       LogUtils.sort(logs);
@@ -1104,9 +1122,9 @@ public class Log {
       for (int index = 0; index < size; index++) {
         File logFile = logs.get(index);
         int logFileID = LogUtils.getIDForFile(logFile);
-        if(logFileID < minFileID) {
+        if (logFileID < minFileID) {
           LogFile.RandomReader reader = idLogFileMap.remove(logFileID);
-          if(reader != null) {
+          if (reader != null) {
            reader.close();
          }
          File metaDataFile = Serialization.getMetaDataFile(logFile);
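writeCheckpoint relies on the lockShared()/lockExclusive() helpers seen
earlier, which appear to wrap a read-write lock: puts, takes and rollbacks
proceed concurrently under the read lock, while the checkpoint takes the write
lock and so sees quiescent data structures, as its javadoc states. A sketch of
that arrangement with java.util.concurrent's ReentrantReadWriteLock (field and
method names are illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class CheckpointLockSketch {
      private final ReentrantReadWriteLock checkpointLock =
          new ReentrantReadWriteLock(true); // fair, so checkpoints aren't starved

      void appendToDataFile() {
        checkpointLock.readLock().lock();   // many appenders may hold this at once
        try {
          // ... write a put/take/rollback/commit record ...
        } finally {
          checkpointLock.readLock().unlock();
        }
      }

      void writeCheckpoint() {
        checkpointLock.writeLock().lock();  // excludes every appender
        try {
          // ... persist queue state while data structures are quiescent ...
        } finally {
          checkpointLock.writeLock().unlock();
        }
      }
    }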

@@ -1116,12 +1134,13 @@ public class Log {
       }
     }
   }
+
   /**
    * Lock storage to provide exclusive access.
-   *
+   * <p/>
    * <p> Locking is not supported by all file systems.
    * E.g., NFS does not consistently support exclusive locks.
-   *
+   * <p/>
    * <p> If locking is supported we guarantee exculsive access to the
    * storage directory. Otherwise, no guarantee is given.
    *
    * @throws IOException
    */
@@ -1137,8 +1156,8 @@ public class Log {
       throw new IOException(msg);
     }
     FileLock secondLock = tryLock(dir);
-    if(secondLock != null) {
-      LOGGER.warn("Directory "+dir+" does not support locking");
+    if (secondLock != null) {
+      LOGGER.warn("Directory " + dir + " does not support locking");
       secondLock.release();
       secondLock.channel().close();
     }
@@ -1160,10 +1179,10 @@ public class Log {
     FileLock res = null;
     try {
       res = file.getChannel().tryLock();
-    } catch(OverlappingFileLockException oe) {
+    } catch (OverlappingFileLockException oe) {
       file.close();
       return null;
-    } catch(IOException e) {
+    } catch (IOException e) {
       LOGGER.error("Cannot create lock on " + lockF, e);
       file.close();
       throw e;
@@ -1178,13 +1197,14 @@ public class Log {
    */
   private void unlock(File dir) throws IOException {
     FileLock lock = locks.remove(dir.getAbsolutePath());
-    if(lock == null) {
+    if (lock == null) {
       return;
     }
     lock.release();
     lock.channel().close();
     lock = null;
   }
+
   static class BackgroundWorker implements Runnable {
     private static final Logger LOG = LoggerFactory
         .getLogger(BackgroundWorker.class);