From: hshreedharan@apache.org
To: commits@flume.apache.org
Subject: [10/11] git commit: FLUME-1762: File Channel should recover automatically if the checkpoint is incomplete or bad by deleting the contents of the checkpoint directory
Date: Thu, 20 Dec 2012 08:15:20 +0000 (UTC)

FLUME-1762: File Channel should recover automatically if the checkpoint is
incomplete or bad by deleting the contents of the checkpoint directory
(Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/38e67d5a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/38e67d5a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/38e67d5a

Branch: refs/heads/flume-1.3.1
Commit: 38e67d5acfec3f534f7f33b275967b0adb0266ca
Parents: 019358d
Author: Brock Noland
Authored: Mon Dec 10 13:38:23 2012 -0600
Committer: Hari Shreedharan
Committed: Thu Dec 20 00:12:26 2012 -0800

----------------------------------------------------------------------
 .../flume/channel/file/BadCheckpointException.java |  36 +++
 .../file/EventQueueBackingStoreFactory.java        |  25 +-
 .../channel/file/EventQueueBackingStoreFile.java   |  21 +-
 .../channel/file/EventQueueBackingStoreFileV2.java |   2 +-
 .../channel/file/EventQueueBackingStoreFileV3.java |  29 ++-
 .../apache/flume/channel/file/FlumeEventQueue.java |  11 +-
 .../java/org/apache/flume/channel/file/Log.java    |  76 ++++--
 .../apache/flume/channel/file/Serialization.java   |  32 ++
 .../file/TestEventQueueBackingStoreFactory.java    | 195 +++++++++++--
 .../flume/channel/file/TestFileChannelRestart.java | 228 ++++++++++++++-
 .../flume/channel/file/TestFlumeEventQueue.java    |  74 +++++-
 11 files changed, 633 insertions(+), 96 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
new file mode 100644
index 0000000..588506a
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/BadCheckpointException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.channel.file;
+
+import org.apache.flume.FlumeException;
+
+/**
+ * Exception thrown when the checkpoint directory contains invalid data,
+ * probably due to the channel stopping while the checkpoint was written.
+ */
+public class BadCheckpointException extends FlumeException {
+  private static final long serialVersionUID = -5038652693746472779L;
+
+  public BadCheckpointException(String msg) {
+    super(msg);
+  }
+
+  public BadCheckpointException(String msg, Throwable t) {
+    super(msg, t);
+  }
+}
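Because FlumeException extends RuntimeException, BadCheckpointException is unchecked; recovery only happens where a caller explicitly opts in by catching it. A minimal sketch of the intended contract (openCheckpoint() and wipeAndRebuild() are hypothetical stand-ins, not Flume APIs):

    // Sketch only: method names are illustrative placeholders.
    try {
      openCheckpoint();               // may throw BadCheckpointException
    } catch (BadCheckpointException ex) {
      // the checkpoint is unusable; discard it and rebuild from the data logs
      wipeAndRebuild(ex);
    }
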
" + + "Please delete all files in the checkpoint directory: " + + checkpointFile.getParentFile()); } } // brand new, use v3 @@ -76,11 +77,8 @@ class EventQueueBackingStoreFactory { } LOG.error("Found version " + Integer.toHexString(version) + " in " + checkpointFile); - throw new IllegalStateException( - "The last checkpoint was not completed correctly. Please delete " - + "the checkpoint files: " + checkpointFile + " and " - + Serialization.getMetaDataFile(checkpointFile) - + " to rebuild the checkpoint and start again. " + name); + throw new BadCheckpointException("Checkpoint file exists with " + + Serialization.VERSION_3 + " but no metadata file found."); } finally { if(checkpointFileHandle != null) { try { @@ -107,4 +105,5 @@ class EventQueueBackingStoreFactory { metaDataFile); return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name); } + } http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java index 5eaf8c2..186b15a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -58,7 +58,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { protected final File checkpointFile; protected EventQueueBackingStoreFile(int capacity, String name, - File checkpointFile) throws IOException { + File checkpointFile) throws IOException, BadCheckpointException { super(capacity, name); this.checkpointFile = checkpointFile; checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw"); @@ -77,23 +77,24 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { ((checkpointFile.length() / Serialization.SIZE_OF_LONG) - HEADER_SIZE) + ". See FileChannel documentation on how to change a channels" + " capacity."; - throw new IllegalStateException(msg); + throw new BadCheckpointException(msg); } mappedBuffer = checkpointFileHandle.getChannel().map(MapMode.READ_WRITE, 0, checkpointFile.length()); elementsBuffer = mappedBuffer.asLongBuffer(); int version = (int) elementsBuffer.get(INDEX_VERSION); - Preconditions.checkState(version == getVersion(), - "Invalid version: " + version + " " + name + ", expected " + getVersion()); - + if(version != getVersion()) { + throw new BadCheckpointException("Invalid version: " + version + " " + + name + ", expected " + getVersion()); + } long checkpointComplete = (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER); - Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE, - "The last checkpoint was not completed correctly. Please delete " - + "the checkpoint files: " + checkpointFile + " and " - + Serialization.getMetaDataFile(checkpointFile) - + " to rebuild the checkpoint and start again. 
" + name); + if(checkpointComplete != CHECKPOINT_COMPLETE) { + throw new BadCheckpointException("Checkpoint was not completed correctly," + + " probably because the agent stopped while the channel was" + + " checkpointing."); + } } protected long getCheckpointLogWriteOrderID() { http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java index 8bbc081..abd2ea3 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java @@ -35,7 +35,7 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { private static final int MAX_ACTIVE_LOGS = 1024; EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name) - throws IOException { + throws IOException, BadCheckpointException { super(capacity, name, checkpointFile); Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0 " + capacity); http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java index c24f89f..451a9d4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.protobuf.InvalidProtocolBufferException; final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { private static final Logger LOG = LoggerFactory @@ -38,7 +39,7 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { private final File metaDataFile; EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name) - throws IOException { + throws IOException, BadCheckpointException { super(capacity, name, checkpointFile); Preconditions.checkArgument(capacity > 0, "capacity must be greater than 0 " + capacity); @@ -50,20 +51,22 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { LOG.info("Reading checkpoint metadata from " + metaDataFile); ProtosFactory.Checkpoint checkpoint = ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream); + if(checkpoint == null) { + throw new BadCheckpointException("The checkpoint metadata file does " + + "not exist or has zero length"); + } int version = checkpoint.getVersion(); - Preconditions.checkState(version == getVersion(), - "Invalid version: " + version + " " + name + ", expected " - + getVersion()); + if(version != getVersion()) { + throw new 
BadCheckpointException("Invalid version: " + version + + " " + name + ", expected " + getVersion()); + } long logWriteOrderID = checkpoint.getWriteOrderID(); if(logWriteOrderID != getCheckpointLogWriteOrderID()) { - LOG.error("Checkpoint and Meta files have differing " + + String msg = "Checkpoint and Meta files have differing " + "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and " - + logWriteOrderID); - throw new IllegalStateException( - "The last checkpoint was not completed correctly. Please delete " - + "the checkpoint files: " + checkpointFile + " and " - + Serialization.getMetaDataFile(checkpointFile) - + " to rebuild the checkpoint and start again. " + name); + + logWriteOrderID; + LOG.warn(msg); + throw new BadCheckpointException(msg); } WriteOrderOracle.setSeed(logWriteOrderID); setLogWriteOrderID(logWriteOrderID); @@ -74,6 +77,10 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { Integer count = activeLog.getCount(); logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count)); } + } catch (InvalidProtocolBufferException ex) { + throw new BadCheckpointException("Checkpoint metadata file is invalid. " + + "The agent might have been stopped while it was being " + + "written", ex); } finally { try { inputStream.close(); http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 36553c5..74a2bc8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -79,11 +79,13 @@ final class FlumeEventQueue { } } - SetMultimap deserializeInflightPuts() throws IOException{ + SetMultimap deserializeInflightPuts() + throws IOException, BadCheckpointException{ return inflightPuts.deserialize(); } - SetMultimap deserializeInflightTakes() throws IOException{ + SetMultimap deserializeInflightTakes() + throws IOException, BadCheckpointException{ return inflightTakes.deserialize(); } @@ -467,7 +469,8 @@ final class FlumeEventQueue { * @return - map of inflight events per txnID. 
http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 36553c5..74a2bc8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -79,11 +79,13 @@ final class FlumeEventQueue {
     }
   }
 
-  SetMultimap<Long, Long> deserializeInflightPuts() throws IOException{
+  SetMultimap<Long, Long> deserializeInflightPuts()
+      throws IOException, BadCheckpointException{
     return inflightPuts.deserialize();
   }
 
-  SetMultimap<Long, Long> deserializeInflightTakes() throws IOException{
+  SetMultimap<Long, Long> deserializeInflightTakes()
+      throws IOException, BadCheckpointException{
     return inflightTakes.deserialize();
   }
 
@@ -467,7 +469,8 @@ final class FlumeEventQueue {
      * @return - map of inflight events per txnID.
      */
-    public SetMultimap<Long, Long> deserialize() throws IOException {
+    public SetMultimap<Long, Long> deserialize()
+        throws IOException, BadCheckpointException {
       SetMultimap<Long, Long> inflights = HashMultimap.create();
       if (!fileChannel.isOpen()) {
         file = new RandomAccessFile(inflightEventsFile, "rw");
@@ -484,7 +487,7 @@ final class FlumeEventQueue {
       fileChannel.read(buffer);
       byte[] fileChecksum = digest.digest(buffer.array());
       if (!Arrays.equals(checksum, fileChecksum)) {
-        throw new IllegalStateException("Checksum of inflights file differs"
+        throw new BadCheckpointException("Checksum of inflights file differs"
             + " from the checksum expected.");
       }
       buffer.position(0);
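The inflight files embed a digest of their payload; deserialize() recomputes it and now surfaces a mismatch as BadCheckpointException instead of IllegalStateException, so the Log can recover instead of aborting. The comparison itself is the standard recompute-and-equals pattern; the digest algorithm below is an assumption, since the hunk does not show which one Flume uses:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    class ChecksumCheck {
      // true if the stored digest matches one recomputed over the payload
      static boolean payloadIntact(byte[] storedChecksum, byte[] payload)
          throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("MD5"); // assumed algorithm
        return Arrays.equals(storedChecksum, digest.digest(payload));
      }
    }
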
" + + "Forcing full replay, this may take a while.", ex); + if(!Serialization.deleteAllFiles(checkpointDir)) { + throw new IOException("Could not delete files in checkpoint " + + "directory to recover from a corrupt or incomplete checkpoint"); } + backingStore = EventQueueBackingStoreFactory.get(checkpointFile, + queueCapacity, channelNameDescriptor); + queue = new FlumeEventQueue(backingStore, inflightTakesFile, + inflightPutsFile); + doReplay(queue, dataFiles, encryptionKeyProvider); } @@ -388,6 +395,25 @@ class Log { } } + private void doReplay(FlumeEventQueue queue, List dataFiles, + KeyProvider encryptionKeyProvider) throws Exception { + CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles, + queue); + if (useFastReplay && rebuilder.rebuild()) { + LOGGER.info("Fast replay successful."); + } else { + ReplayHandler replayHandler = new ReplayHandler(queue, + encryptionKeyProvider); + if (useLogReplayV1) { + LOGGER.info("Replaying logs with v1 replay logic"); + replayHandler.replayLogv1(dataFiles); + } else { + LOGGER.info("Replaying logs with v2 replay logic"); + replayHandler.replayLog(dataFiles); + } + } + } + int getNextFileID() { Preconditions.checkState(open, "Log is closed"); return nextFileID.get(); http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index ef8cf72..7094d3c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -18,6 +18,10 @@ */ package org.apache.flume.channel.file; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; class Serialization { @@ -34,6 +38,8 @@ class Serialization { static final String METADATA_TMP_FILENAME = ".tmp"; static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old"; + public static final Logger LOG = LoggerFactory.getLogger(Serialization.class); + static File getMetaDataTempFile(File metaDataFile) { String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME; return new File(metaDataFile.getParentFile(), metaDataFileName); @@ -50,4 +56,30 @@ class Serialization { String oldMetaDataFileName = file.getName() + OLD_METADATA_FILENAME; return new File(file.getParentFile(), oldMetaDataFileName); } + + /** + * Deletes all files in given directory. + * @param checkpointDir - The directory whose files are to be deleted + * @return - true if all files were successfully deleted, false otherwise. 
http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index ef8cf72..7094d3c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -18,6 +18,10 @@
  */
 package org.apache.flume.channel.file;
 
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.File;
 
 class Serialization {
@@ -34,6 +38,8 @@ class Serialization {
   static final String METADATA_TMP_FILENAME = ".tmp";
   static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
 
+  public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
+
   static File getMetaDataTempFile(File metaDataFile) {
     String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
     return new File(metaDataFile.getParentFile(), metaDataFileName);
@@ -50,4 +56,30 @@ class Serialization {
     String oldMetaDataFileName = file.getName() + OLD_METADATA_FILENAME;
     return new File(file.getParentFile(), oldMetaDataFileName);
   }
+
+  /**
+   * Deletes all files in given directory.
+   * @param checkpointDir - The directory whose files are to be deleted
+   * @return - true if all files were successfully deleted, false otherwise.
+   */
+  static boolean deleteAllFiles(File checkpointDir) {
+    if (!checkpointDir.isDirectory()) {
+      return false;
+    }
+    StringBuilder builder = new StringBuilder("Deleted the following files from"
+        + " the checkpoint directory: ");
+    File[] files = checkpointDir.listFiles();
+    for (File file : files) {
+      if (!FileUtils.deleteQuietly(file)) {
+        LOG.info(builder.toString());
+        LOG.error("Error while attempting to delete: "
+            + file.getName());
+        return false;
+      }
+      builder.append(", ").append(file.getName());
+    }
+    builder.append(".");
+    LOG.info(builder.toString());
+    return true;
+  }
 }
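Commons IO's FileUtils.deleteQuietly() deletes a file (or a directory, recursively) without ever throwing, returning false on failure, which is what lets deleteAllFiles() keep its simple all-or-nothing contract. A usage sketch; the path is illustrative:

    File checkpointDir = new File("/var/lib/flume/checkpoint");  // example path
    if (Serialization.deleteAllFiles(checkpointDir)) {
      // the directory still exists but is now empty; safe to rebuild a checkpoint
    } else {
      // not a directory, or at least one entry could not be removed
    }
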
http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index b1a55be..dfb3bf9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -35,6 +35,11 @@ import org.junit.Test;
 
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.FileOutputStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 public class TestEventQueueBackingStoreFactory {
   static final List<Long> pointersInTestCheckpoint = Arrays.asList(new Long[] {
@@ -81,38 +86,26 @@ public class TestEventQueueBackingStoreFactory {
     verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
         Serialization.VERSION_2, pointersInTestCheckpoint);
   }
-  @Test
+  @Test (expected = BadCheckpointException.class)
   public void testDecreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-      get(checkpoint, 10, "test");
+        get(checkpoint, 10, "test");
     backingStore.close();
-    try {
-      EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      String expected = "Configured capacity is 9 but the checkpoint file " +
-          "capacity is 10. See FileChannel documentation on how to change " +
-          "a channels capacity.";
-      Assert.assertEquals(expected, e.getMessage());
-    }
+    EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
+    Assert.fail();
   }
-  @Test
+
+  @Test (expected = BadCheckpointException.class)
   public void testIncreaseCapacity() throws Exception {
     Assert.assertTrue(checkpoint.delete());
     EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
-      get(checkpoint, 10, "test");
+        get(checkpoint, 10, "test");
     backingStore.close();
-    try {
-      EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
-      Assert.fail();
-    } catch (IllegalStateException e) {
-      String expected = "Configured capacity is 11 but the checkpoint file " +
-          "capacity is 10. See FileChannel documentation on how to change " +
-          "a channels capacity.";
-      Assert.assertEquals(expected, e.getMessage());
-    }
+    EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
+    Assert.fail();
   }
+
   @Test
   public void testNewCheckpoint() throws Exception {
     Assert.assertTrue(checkpoint.delete());
@@ -120,6 +113,164 @@ public class TestEventQueueBackingStoreFactory {
         Serialization.VERSION_3, Collections.emptyList());
   }
 
+  @Test (expected = BadCheckpointException.class)
+  public void testCheckpointBadVersion() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+          EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
+      writer.writeLong(94L);
+      writer.getFD().sync();
+
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testIncompleteCheckpoint() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+          EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER *
+          Serialization.SIZE_OF_LONG);
+      writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointVersionNotEqualToMeta() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+          EventQueueBackingStoreFile.INDEX_VERSION
+          * Serialization.SIZE_OF_LONG);
+      writer.writeLong(2L);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointVersionNotEqualToMeta2() throws Exception {
+    FileOutputStream os = null;
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      Assert.assertTrue(checkpoint.exists());
+      Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
+      FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+      ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+      Assert.assertNotNull(meta);
+      is.close();
+      os = new FileOutputStream(
+          Serialization.getMetaDataFile(checkpoint));
+      meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
+      os.flush();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      os.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointOrderIdNotEqualToMeta() throws Exception {
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      writer.seek(
+          EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID
+          * Serialization.SIZE_OF_LONG);
+      writer.writeLong(2L);
+      writer.getFD().sync();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCheckpointOrderIdNotEqualToMeta2() throws Exception {
+    FileOutputStream os = null;
+    try {
+      EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+          get(checkpoint, 10, "test");
+      backingStore.close();
+      Assert.assertTrue(checkpoint.exists());
+      Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
+      FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+      ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+      Assert.assertNotNull(meta);
+      is.close();
+      os = new FileOutputStream(
+          Serialization.getMetaDataFile(checkpoint));
+      meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
+      os.flush();
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } finally {
+      os.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testTruncateMeta() throws Exception {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+        get(checkpoint, 10, "test");
+    backingStore.close();
+    Assert.assertTrue(checkpoint.exists());
+    File metaFile = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(metaFile.length() != 0);
+    RandomAccessFile writer = new RandomAccessFile(metaFile, "rw");
+    writer.setLength(0);
+    writer.getFD().sync();
+    writer.close();
+    backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+  }
+
+  @Test (expected = InvalidProtocolBufferException.class)
+  public void testCorruptMeta() throws Throwable {
+    EventQueueBackingStore backingStore = EventQueueBackingStoreFactory.
+        get(checkpoint, 10, "test");
+    backingStore.close();
+    Assert.assertTrue(checkpoint.exists());
+    File metaFile = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(metaFile.length() != 0);
+    RandomAccessFile writer = new RandomAccessFile(metaFile, "rw");
+    writer.seek(10);
+    writer.writeLong(new Random().nextLong());
+    writer.getFD().sync();
+    writer.close();
+    try {
+      backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+    } catch (BadCheckpointException ex) {
+      throw ex.getCause();
+    }
+  }
+
   private void verify(EventQueueBackingStore backingStore, long expectedVersion,
       List<Long> expectedPointers) throws Exception {
http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 3f90805..f548f31 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -32,6 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.RandomAccessFile;
+import java.util.Random;
+import org.apache.flume.channel.file.proto.ProtosFactory;
 
 public class TestFileChannelRestart extends TestFileChannelBase {
   protected static final Logger LOG = LoggerFactory
@@ -115,13 +120,14 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     compareInputAndOut(in, out);
   }
 
   @Test
-  public void testRestartFailsWhenMetaDataExistsButCheckpointDoesNot()
+  public void testRestartWhenMetaDataExistsButCheckpointDoesNot()
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
@@ -130,16 +136,21 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertTrue(checkpointMetaData.exists());
     channel = createFileChannel(overrides);
     channel.start();
-    Assert.assertFalse(channel.isOpen());
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(checkpoint.exists());
+    Assert.assertTrue(checkpointMetaData.exists());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
   }
 
   @Test
-  public void testRestartFailsWhenCheckpointExistsButMetaDoesNot()
+  public void testRestartWhenCheckpointExistsButMetaDoesNot()
       throws Exception {
     Map<String, String> overrides = Maps.newHashMap();
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());
-    Assert.assertEquals(1, putEvents(channel, "restart", 1, 1).size());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
     forceCheckpoint(channel);
     channel.stop();
     File checkpoint = new File(checkpointDir, "checkpoint");
@@ -148,8 +159,213 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     Assert.assertTrue(checkpoint.exists());
     channel = createFileChannel(overrides);
     channel.start();
-    Assert.assertFalse(channel.isOpen());
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(checkpoint.exists());
+    Assert.assertTrue(checkpointMetaData.exists());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testRestartWhenNoCheckpointExists() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
+    Assert.assertTrue(checkpointMetaData.delete());
+    Assert.assertTrue(checkpoint.delete());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Assert.assertTrue(checkpoint.exists());
+    Assert.assertTrue(checkpointMetaData.exists());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testBadCheckpointVersion() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_VERSION *
+        Serialization.SIZE_OF_LONG);
+    writer.writeLong(2L);
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testBadCheckpointMetaVersion() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+    ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+    Assert.assertNotNull(meta);
+    is.close();
+    FileOutputStream os = new FileOutputStream(
+        Serialization.getMetaDataFile(checkpoint));
+    meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
+    os.flush();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
+    ProtosFactory.Checkpoint meta = ProtosFactory.Checkpoint.parseDelimitedFrom(is);
+    Assert.assertNotNull(meta);
+    is.close();
+    FileOutputStream os = new FileOutputStream(
+        Serialization.getMetaDataFile(checkpoint));
+    meta.toBuilder().setWriteOrderID(12).build().writeDelimitedTo(os);
+    os.flush();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testIncompleteCheckpoint() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
+    writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER
+        * Serialization.SIZE_OF_LONG);
+    writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testCorruptInflightPuts() throws Exception {
+    testCorruptInflights("inflightPuts");
+  }
+
+  @Test
+  public void testCorruptInflightTakes() throws Exception {
+    testCorruptInflights("inflightTakes");
+  }
+
+  private void testCorruptInflights(String name) throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File inflight = new File(checkpointDir, name);
+    RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
+    writer.write(new Random().nextInt());
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testTruncatedCheckpointMeta() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(
+        Serialization.getMetaDataFile(checkpoint), "rw");
+    writer.setLength(0);
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
+  @Test
+  public void testCorruptCheckpointMeta() throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = putEvents(channel, "restart", 10, 100);
+    Assert.assertEquals(100, in.size());
+    forceCheckpoint(channel);
+    channel.stop();
+    File checkpoint = new File(checkpointDir, "checkpoint");
+    RandomAccessFile writer = new RandomAccessFile(
+        Serialization.getMetaDataFile(checkpoint), "rw");
+    writer.seek(10);
+    writer.writeLong(new Random().nextLong());
+    writer.getFD().sync();
+    writer.close();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
+
 
   @Test
   public void testWithExtraLogs() throws Exception {
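All of the new restart tests follow one cycle — start, put 100 events, force a checkpoint, stop, damage a checkpoint file, restart, and drain — asserting that the channel reopens on its own and no events are lost. Only the damage step differs, so it could be injected as a callback; a hypothetical refactoring, not part of the patch:

    // Hypothetical helper; Corrupter is an illustrative interface.
    interface Corrupter {
      void corrupt(File checkpointDir) throws Exception;
    }

    private void restartAfterCorruption(Corrupter corrupter) throws Exception {
      Map<String, String> overrides = Maps.newHashMap();
      channel = createFileChannel(overrides);
      channel.start();
      Set<String> in = putEvents(channel, "restart", 10, 100);
      forceCheckpoint(channel);
      channel.stop();
      corrupter.corrupt(checkpointDir);        // test-specific damage
      channel = createFileChannel(overrides);
      channel.start();
      Assert.assertTrue(channel.isOpen());     // channel recovered on its own
      compareInputAndOut(in, consumeChannel(channel));
    }
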
http://git-wip-us.apache.org/repos/asf/flume/blob/38e67d5a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 0173390..203cbf2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -38,6 +38,7 @@ import org.junit.runners.Parameterized.Parameters;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import java.io.RandomAccessFile;
 
 @RunWith(value = Parameterized.class)
 public class TestFlumeEventQueue {
@@ -74,11 +75,11 @@ public class TestFlumeEventQueue {
   }
 
   @Parameters
-  public static Collection<Object[]> data() throws IOException {
+  public static Collection<Object[]> data() throws Exception {
     Object[][] data = new Object[][] { {
       new EventQueueBackingStoreSupplier() {
         @Override
-        public EventQueueBackingStore get() throws IOException {
+        public EventQueueBackingStore get() throws Exception {
          Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
          return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
              "test");
@@ -87,7 +88,7 @@ public class TestFlumeEventQueue {
     }, {
       new EventQueueBackingStoreSupplier() {
         @Override
-        public EventQueueBackingStore get() throws IOException {
+        public EventQueueBackingStore get() throws Exception {
           Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
           return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000,
               "test");
@@ -345,5 +346,70 @@ public class TestFlumeEventQueue {
         txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
   }
 
-}
+  @Test(expected = BadCheckpointException.class)
+  public void testCorruptInflightPuts() throws Exception {
+    RandomAccessFile inflight = null;
+    try {
+      queue = new FlumeEventQueue(backingStore,
+          backingStoreSupplier.getInflightTakes(),
+          backingStoreSupplier.getInflightPuts());
+      long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+      long txnID2 = txnID1 + 1;
+      queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+      queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+      queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+      queue.checkpoint(true);
+      TimeUnit.SECONDS.sleep(3L);
+      inflight = new RandomAccessFile(
+          backingStoreSupplier.getInflightPuts(), "rw");
+      inflight.seek(0);
+      inflight.writeInt(new Random().nextInt());
+      queue = new FlumeEventQueue(backingStore,
+          backingStoreSupplier.getInflightTakes(),
+          backingStoreSupplier.getInflightPuts());
+      SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
+      Assert.assertTrue(deserializedMap.get(
+          txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+      Assert.assertTrue(deserializedMap.get(
+          txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+      Assert.assertTrue(deserializedMap.get(
+          txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+    } finally {
+      inflight.close();
+    }
+  }
+
+  @Test(expected = BadCheckpointException.class)
+  public void testCorruptInflightTakes() throws Exception {
+    RandomAccessFile inflight = null;
+    try {
+      queue = new FlumeEventQueue(backingStore,
+          backingStoreSupplier.getInflightTakes(),
+          backingStoreSupplier.getInflightPuts());
+      long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+      long txnID2 = txnID1 + 1;
+      queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+      queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+      queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+      queue.checkpoint(true);
+      TimeUnit.SECONDS.sleep(3L);
+      inflight = new RandomAccessFile(
+          backingStoreSupplier.getInflightTakes(), "rw");
+      inflight.seek(0);
+      inflight.writeInt(new Random().nextInt());
+      queue = new FlumeEventQueue(backingStore,
+          backingStoreSupplier.getInflightTakes(),
+          backingStoreSupplier.getInflightPuts());
+      SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
+      Assert.assertTrue(deserializedMap.get(
+          txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+      Assert.assertTrue(deserializedMap.get(
+          txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+      Assert.assertTrue(deserializedMap.get(
+          txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+    } finally {
+      inflight.close();
+    }
+  }
+}
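For operators, the practical effect of this change is that a crash during a checkpoint no longer requires hand-deleting checkpoint files before the agent will start; the channel wipes the checkpoint directory itself and replays the data logs. A typical file channel configuration whose checkpoint directory is now self-healing (agent and channel names and paths below are illustrative, not taken from the patch):

    agent.channels = c1
    agent.channels.c1.type = file
    agent.channels.c1.checkpointDir = /var/lib/flume/checkpoint
    agent.channels.c1.dataDirs = /var/lib/flume/data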