Return-Path: X-Original-To: apmail-flume-commits-archive@www.apache.org Delivered-To: apmail-flume-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C116DD459 for ; Fri, 7 Sep 2012 21:52:59 +0000 (UTC) Received: (qmail 34872 invoked by uid 500); 7 Sep 2012 21:52:59 -0000 Delivered-To: apmail-flume-commits-archive@flume.apache.org Received: (qmail 34810 invoked by uid 500); 7 Sep 2012 21:52:59 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 34799 invoked by uid 99); 7 Sep 2012 21:52:59 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Sep 2012 21:52:59 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5F68028387; Fri, 7 Sep 2012 21:52:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hshreedharan@apache.org To: commits@flume.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: FLUME-1487. FileChannel format needs to be extensible. Message-Id: <20120907215259.5F68028387@tyr.zones.apache.org> Date: Fri, 7 Sep 2012 21:52:59 +0000 (UTC) Updated Branches: refs/heads/flume-1.3.0 752f70f4f -> 810dfe282 FLUME-1487. FileChannel format needs to be extensible. (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/810dfe28 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/810dfe28 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/810dfe28 Branch: refs/heads/flume-1.3.0 Commit: 810dfe28262eaa882cdc9a251d4978196ac333ea Parents: 752f70f Author: Hari Shreedharan Authored: Fri Sep 7 14:51:16 2012 -0700 Committer: Hari Shreedharan Committed: Fri Sep 7 14:52:24 2012 -0700 ---------------------------------------------------------------------- flume-ng-channels/flume-file-channel/pom.xml | 71 +++ .../flume/channel/file/CheckpointRebuilder.java | 34 +- .../java/org/apache/flume/channel/file/Commit.java | 24 +- .../flume/channel/file/EventQueueBackingStore.java | 76 +++ .../file/EventQueueBackingStoreFactory.java | 110 ++++ .../channel/file/EventQueueBackingStoreFile.java | 278 ++++++++++ .../channel/file/EventQueueBackingStoreFileV2.java | 101 ++++ .../channel/file/EventQueueBackingStoreFileV3.java | 194 +++++++ .../flume/channel/file/FlumeEventPointer.java | 11 +- .../apache/flume/channel/file/FlumeEventQueue.java | 389 +++------------ .../java/org/apache/flume/channel/file/Log.java | 96 ++-- .../org/apache/flume/channel/file/LogFile.java | 309 ++++++------ .../apache/flume/channel/file/LogFileFactory.java | 120 +++++ .../org/apache/flume/channel/file/LogFileV2.java | 156 ++++++ .../org/apache/flume/channel/file/LogFileV3.java | 234 +++++++++ .../org/apache/flume/channel/file/LogUtils.java | 3 +- .../java/org/apache/flume/channel/file/Put.java | 48 ++- .../apache/flume/channel/file/ReplayHandler.java | 14 +- .../org/apache/flume/channel/file/Rollback.java | 20 +- .../apache/flume/channel/file/Serialization.java | 41 ++ .../java/org/apache/flume/channel/file/Take.java | 25 +- .../flume/channel/file/TransactionEventRecord.java | 105 +++- .../src/main/proto/filechannel.proto | 78 +++ .../apache/flume/channel/file/TestCheckpoint.java | 16 +- .../file/TestEventQueueBackingStoreFactory.java | 108 ++++ .../apache/flume/channel/file/TestFileChannel.java | 97 +++-- .../flume/channel/file/TestFlumeEventQueue.java | 183 ++++++-- .../org/apache/flume/channel/file/TestLog.java | 4 +- .../org/apache/flume/channel/file/TestLogFile.java | 34 +- .../apache/flume/channel/file/TestLogRecord.java | 7 +- .../channel/file/TestTransactionEventRecord.java | 145 ------ .../channel/file/TestTransactionEventRecordV2.java | 153 ++++++ .../channel/file/TestTransactionEventRecordV3.java | 140 ++++++ .../org/apache/flume/channel/file/TestUtils.java | 27 +- pom.xml | 7 + 35 files changed, 2632 insertions(+), 826 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/pom.xml b/flume-ng-channels/flume-file-channel/pom.xml index 62d80e3..8e5f871 100644 --- a/flume-ng-channels/flume-file-channel/pom.xml +++ b/flume-ng-channels/flume-file-channel/pom.xml @@ -37,6 +37,71 @@ org.apache.rat apache-rat-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.build.directory}/generated-sources/java + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + false + + + + compile-proto + generate-sources + + run + + + + + PROTO_DIR=src/main/proto + JAVA_DIR=target/generated-sources/java + which cygpath 2> /dev/null + if [ $? = 1 ]; then + IS_WIN=false + else + IS_WIN=true + WIN_PROTO_DIR=`cygpath --windows $PROTO_DIR` + WIN_JAVA_DIR=`cygpath --windows $JAVA_DIR` + fi + mkdir -p $JAVA_DIR 2> /dev/null + for PROTO_FILE in `ls $PROTO_DIR/*.proto 2> /dev/null` + do + if [ "$IS_WIN" = "true" ]; then + protoc -I$WIN_PROTO_DIR --java_out=$WIN_JAVA_DIR $PROTO_FILE + else + protoc -I$PROTO_DIR --java_out=$JAVA_DIR $PROTO_FILE + fi + done + + + + + + + + + + @@ -99,6 +164,12 @@ true + + com.google.protobuf + protobuf-java + compile + + http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index 32b5324..1ba5d1c 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -40,7 +40,6 @@ public class CheckpointRebuilder { private final File checkpointDir; private final List logFiles; - private final long maxFileSize; private final FlumeEventQueue queue; private final Set committedPuts = Sets.newHashSet(); @@ -55,12 +54,10 @@ public class CheckpointRebuilder { LoggerFactory.getLogger(CheckpointRebuilder.class); public CheckpointRebuilder(File checkpointDir, List logFiles, - long maxFileSize, FlumeEventQueue queue) throws IOException { this.checkpointDir = checkpointDir; this.logFiles = logFiles; this.queue = queue; - this.maxFileSize = maxFileSize; } public boolean rebuild() throws IOException, Exception { @@ -72,7 +69,7 @@ public class CheckpointRebuilder { LOG.info("Attempting to fast replay the log files."); List logReaders = Lists.newArrayList(); for (File logFile : logFiles) { - logReaders.add(new LogFile.SequentialReader(logFile)); + logReaders.add(LogFileFactory.getSequentialReader(logFile)); } long transactionIDSeed = 0; long writeOrderIDSeed = 0; @@ -158,26 +155,25 @@ public class CheckpointRebuilder { private void writeCheckpoint() throws IOException { long checkpointLogOrderID = 0; - List logWriters = Lists.newArrayList(); + List metaDataWriters = Lists.newArrayList(); for (File logFile : logFiles) { String name = logFile.getName(); - logWriters.add(new LogFile.Writer(logFile, - Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)), - maxFileSize)); + metaDataWriters.add(LogFileFactory.getMetaDataWriter(logFile, + Integer.parseInt(name.substring(name.lastIndexOf('-') + 1)))); } try { if (queue.checkpoint(true)) { checkpointLogOrderID = queue.getLogWriteOrderID(); - for (LogFile.Writer logWriter : logWriters) { - logWriter.markCheckpoint(checkpointLogOrderID); + for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) { + metaDataWriter.markCheckpoint(checkpointLogOrderID); } } } catch (Exception e) { LOG.warn("Error while generating checkpoint " + "using fast generation logic", e); } finally { - for (LogFile.Writer logWriter : logWriters) { - logWriter.close(); + for (LogFile.MetaDataWriter metaDataWriter : metaDataWriters) { + metaDataWriter.close(); } } } @@ -233,8 +229,6 @@ public class CheckpointRebuilder { opt = new Option("l", true, "comma-separated list of log directories"); opt.setRequired(true); options.addOption(opt); - opt = new Option("s", true, "maximum size of log files"); - opt.setRequired(true); options.addOption(opt); opt = new Option("t", true, "capacity of the channel"); opt.setRequired(true); @@ -249,14 +243,14 @@ public class CheckpointRebuilder { logFiles.addAll(Arrays.asList(files)); } int capacity = Integer.parseInt(cli.getOptionValue("t")); - long maxFileSize = Long.parseLong(cli.getOptionValue("s")); - boolean isReplayV1 = cli.hasOption("v"); - FlumeEventQueue queue = new FlumeEventQueue(capacity, - new File(checkpointDir, "checkpoint"), + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(new File(checkpointDir, "checkpoint"), + capacity, "channel"); + FlumeEventQueue queue = new FlumeEventQueue(backingStore, new File(checkpointDir, "inflighttakes"), - new File(checkpointDir, "inflightputs"), "channel"); + new File(checkpointDir, "inflightputs")); CheckpointRebuilder rebuilder = new CheckpointRebuilder(checkpointDir, - logFiles, maxFileSize, queue); + logFiles, queue); if(rebuilder.rebuild()) { rebuilder.writeCheckpoint(); } else { http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java index 2c92d28..7d57bb8 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Commit.java @@ -21,6 +21,10 @@ package org.apache.flume.channel.file; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.flume.chanel.file.proto.ProtosFactory; /** * Represents a Commit on disk @@ -30,11 +34,11 @@ class Commit extends TransactionEventRecord { * Type of Commit Take|Put */ private short type; - Commit(Long transactionID) { - super(transactionID); + Commit(Long transactionID, Long logWriteOrderID) { + super(transactionID, logWriteOrderID); } - Commit(Long transactionID, short type) { - this(transactionID); + Commit(Long transactionID, Long logWriteOrderID, short type) { + this(transactionID, logWriteOrderID); this.type = type; } @Override @@ -42,6 +46,18 @@ class Commit extends TransactionEventRecord { super.readFields(in); type = in.readShort(); } + @Override + void writeProtos(OutputStream out) throws IOException { + ProtosFactory.Commit.Builder commitBuilder = + ProtosFactory.Commit.newBuilder(); + commitBuilder.setType(type); + commitBuilder.build().writeDelimitedTo(out); + } + @Override + void readProtos(InputStream in) throws IOException { + ProtosFactory.Commit commit = ProtosFactory.Commit.parseDelimitedFrom(in); + type = (short) commit.getType(); + } short getType() { return type; http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java new file mode 100644 index 0000000..13b50da --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.IOException; + +import com.google.common.collect.ImmutableSortedSet; + +abstract class EventQueueBackingStore { + protected static final int EMPTY = 0; + private int queueSize; + private int queueHead; + private long logWriteOrderID; + private final int capacity; + private final String name; + + protected EventQueueBackingStore(int capacity, String name) { + this.capacity = capacity; + this.name = name; + } + + + abstract void checkpoint() throws IOException; + abstract void incrementFileID(int fileID); + abstract void decrementFileID(int fileID); + abstract ImmutableSortedSet getReferenceCounts(); + abstract long get(int index); + abstract void put(int index, long value); + abstract boolean syncRequired(); + abstract void close() throws IOException; + + protected abstract int getVersion(); + + int getSize() { + return queueSize; + } + void setSize(int size) { + queueSize = size; + } + int getHead() { + return queueHead; + } + void setHead(int head) { + queueHead = head; + } + int getCapacity() { + return capacity; + } + + String getName() { + return name; + } + protected void setLogWriteOrderID(long logWriteOrderID) { + this.logWriteOrderID = logWriteOrderID; + } + long getLogWriteOrderID() { + return logWriteOrderID; + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java new file mode 100644 index 0000000..6c07152 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.io.Files; + +class EventQueueBackingStoreFactory { + private static final Logger LOG = LoggerFactory + .getLogger(EventQueueBackingStoreFactory.class); + private EventQueueBackingStoreFactory() {} + static EventQueueBackingStore get(File checkpointFile, int capacity, + String name) throws Exception { + return get(checkpointFile, capacity, name, true); + } + static EventQueueBackingStore get(File checkpointFile, int capacity, + String name, boolean upgrade) throws Exception { + File metaDataFile = Serialization.getMetaDataFile(checkpointFile); + RandomAccessFile checkpointFileHandle = null; + try { + boolean checkpointExists = checkpointFile.exists(); + boolean metaDataExists = metaDataFile.exists(); + if(metaDataExists) { + // if we have a metadata file but no checkpoint file, we have a problem + if(!checkpointExists || checkpointFile.length() == 0) { + LOG.error("MetaData file for checkpoint " + + " exists but checkpoint does not. Checkpoint = " + checkpointFile + + ", metaDataFile = " + metaDataFile); + throw new IllegalStateException( + "The last checkpoint was not completed correctly. Please delete " + + "the checkpoint files: " + checkpointFile + " and " + + Serialization.getMetaDataFile(checkpointFile) + + " to rebuild the checkpoint and start again. " + name); + } + } + // brand new, use v3 + if(!checkpointExists) { + if(!checkpointFile.createNewFile()) { + throw new IOException("Cannot create " + checkpointFile); + } + return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name); + } + // v3 due to meta file, version will be checked by backing store + if(metaDataExists) { + return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name); + } + checkpointFileHandle = new RandomAccessFile(checkpointFile, "r"); + int version = (int)checkpointFileHandle.readLong(); + if(Serialization.VERSION_2 == version) { + if(upgrade) { + return upgrade(checkpointFile, capacity, name); + } + return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); + } + LOG.error("Found version " + Integer.toHexString(version) + " in " + + checkpointFile); + throw new IllegalStateException( + "The last checkpoint was not completed correctly. Please delete " + + "the checkpoint files: " + checkpointFile + " and " + + Serialization.getMetaDataFile(checkpointFile) + + " to rebuild the checkpoint and start again. " + name); + } finally { + if(checkpointFileHandle != null) { + try { + checkpointFileHandle.close(); + } catch(IOException e) { + LOG.warn("Unable to close " + checkpointFile, e); + } + } + } + } + + private static EventQueueBackingStore upgrade(File checkpointFile, + int capacity, String name) + throws Exception { + LOG.info("Attempting upgrade of " + checkpointFile + " for " + name); + EventQueueBackingStoreFileV2 backingStoreV2 = + new EventQueueBackingStoreFileV2(checkpointFile, capacity, name); + String backupName = checkpointFile.getName() + "-backup-" + + System.currentTimeMillis(); + Files.copy(checkpointFile, + new File(checkpointFile.getParentFile(), backupName)); + File metaDataFile = Serialization.getMetaDataFile(checkpointFile); + EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile, + metaDataFile); + return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java new file mode 100644 index 0000000..4717055 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.LongBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSortedSet; +import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; + + +abstract class EventQueueBackingStoreFile extends EventQueueBackingStore { + private static final Logger LOG = LoggerFactory + .getLogger(EventQueueBackingStoreFile.class); + private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB + protected static final int HEADER_SIZE = 1029; + protected static final int INDEX_VERSION = 0; + protected static final int INDEX_WRITE_ORDER_ID = 1; + protected static final int INDEX_CHECKPOINT_MARKER = 4; + protected static final int CHECKPOINT_COMPLETE = 0; + protected static final int CHECKPOINT_INCOMPLETE = 1; + + protected LongBuffer elementsBuffer; + protected final Map overwriteMap = new HashMap(); + protected final Map logFileIDReferenceCounts = Maps.newHashMap(); + protected final MappedByteBuffer mappedBuffer; + protected final RandomAccessFile checkpointFileHandle; + protected final File checkpointFile; + + protected EventQueueBackingStoreFile(int capacity, String name, + File checkpointFile) throws IOException { + super(capacity, name); + this.checkpointFile = checkpointFile; + checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw"); + + if(checkpointFileHandle.length() == 0) { + int totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG; + allocate(checkpointFile, totalBytes); + checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG); + checkpointFileHandle.writeLong(getVersion()); + checkpointFileHandle.getChannel().force(true); + LOG.info("Preallocated " + checkpointFile + " to " + checkpointFileHandle.length() + + " for capacity " + capacity); + } + mappedBuffer = checkpointFileHandle.getChannel().map(MapMode.READ_WRITE, 0, + checkpointFile.length()); + elementsBuffer = mappedBuffer.asLongBuffer(); + + int version = (int) elementsBuffer.get(INDEX_VERSION); + Preconditions.checkState(version == getVersion(), + "Invalid version: " + version + " " + name + ", expected " + getVersion()); + + long checkpointComplete = + (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER); + Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE, + "The last checkpoint was not completed correctly. Please delete " + + "the checkpoint files: " + checkpointFile + " and " + + Serialization.getMetaDataFile(checkpointFile) + + " to rebuild the checkpoint and start again. " + name); + } + + protected long getCheckpointLogWriteOrderID() { + return elementsBuffer.get(INDEX_WRITE_ORDER_ID); + } + + protected abstract void writeCheckpointMetaData() throws IOException; + + @Override + void checkpoint() throws IOException { + + LOG.info("Start checkpoint for " + checkpointFile + + ", elements to sync = " + overwriteMap.size()); + + // Start checkpoint + elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE); + + setLogWriteOrderID(WriteOrderOracle.next()); + LOG.info("Updating checkpoint metadata: logWriteOrderID: " + + getLogWriteOrderID() + ", queueSize: " + getSize() + ", queueHead: " + + getHead()); + elementsBuffer.put(INDEX_WRITE_ORDER_ID, getLogWriteOrderID()); + try { + writeCheckpointMetaData(); + } catch (IOException e) { + throw new IOException("Error writing metadata", e); + } + + Iterator it = overwriteMap.keySet().iterator(); + while (it.hasNext()) { + int index = it.next(); + long value = overwriteMap.get(index); + elementsBuffer.put(index, value); + it.remove(); + } + + Preconditions.checkState(overwriteMap.isEmpty(), + "concurrent update detected "); + + // Finish checkpoint + elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE); + mappedBuffer.force(); + } + + + @Override + void close() { + mappedBuffer.force(); + try { + checkpointFileHandle.close(); + } catch (IOException e) { + LOG.info("Error closing " + checkpointFile, e); + } + } + + @Override + long get(int index) { + int realIndex = getPhysicalIndex(index); + long result = EMPTY; + if (overwriteMap.containsKey(realIndex)) { + result = overwriteMap.get(realIndex); + } else { + result = elementsBuffer.get(realIndex); + } + return result; + } + + @Override + ImmutableSortedSet getReferenceCounts() { + return ImmutableSortedSet.copyOf(logFileIDReferenceCounts.keySet()); + } + + @Override + void put(int index, long value) { + int realIndex = getPhysicalIndex(index); + overwriteMap.put(realIndex, value); + } + + @Override + boolean syncRequired() { + return overwriteMap.size() > 0; + } + + @Override + protected void incrementFileID(int fileID) { + AtomicInteger counter = logFileIDReferenceCounts.get(fileID); + if(counter == null) { + counter = new AtomicInteger(0); + logFileIDReferenceCounts.put(fileID, counter); + } + counter.incrementAndGet(); + } + @Override + protected void decrementFileID(int fileID) { + AtomicInteger counter = logFileIDReferenceCounts.get(fileID); + Preconditions.checkState(counter != null, "null counter "); + int count = counter.decrementAndGet(); + if(count == 0) { + logFileIDReferenceCounts.remove(fileID); + } + } + + protected int getPhysicalIndex(int index) { + return HEADER_SIZE + (getHead() + index) % getCapacity(); + } + + protected static void allocate(File file, long totalBytes) throws IOException { + RandomAccessFile checkpointFile = new RandomAccessFile(file, "rw"); + boolean success = false; + try { + if (totalBytes <= MAX_ALLOC_BUFFER_SIZE) { + checkpointFile.write(new byte[(int)totalBytes]); + } else { + byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE]; + long remainingBytes = totalBytes; + while (remainingBytes >= MAX_ALLOC_BUFFER_SIZE) { + checkpointFile.write(initBuffer); + remainingBytes -= MAX_ALLOC_BUFFER_SIZE; + } + if (remainingBytes > 0) { + checkpointFile.write(initBuffer, 0, (int)remainingBytes); + } + } + success = true; + } finally { + try { + checkpointFile.close(); + } catch (IOException e) { + if(success) { + throw e; + } + } + } + } + + public static void main(String[] args) throws Exception { + File file = new File(args[0]); + File inflightTakesFile = new File(args[1]); + File inflightPutsFile = new File(args[2]); + if (!file.exists()) { + throw new IOException("File " + file + " does not exist"); + } + if (file.length() == 0) { + throw new IOException("File " + file + " is empty"); + } + int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L); + EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile) + EventQueueBackingStoreFactory.get(file,capacity, "debug", false); + System.out.println("File Reference Counts" + + backingStore.logFileIDReferenceCounts); + System.out.println("Queue Capacity " + backingStore.getCapacity()); + System.out.println("Queue Size " + backingStore.getSize()); + System.out.println("Queue Head " + backingStore.getHead()); + for (int index = 0; index < backingStore.getCapacity(); index++) { + long value = backingStore.get(backingStore.getPhysicalIndex(index)); + int fileID = (int) (value >>> 32); + int offset = (int) value; + System.out.println(index + ":" + Long.toHexString(value) + " fileID = " + + fileID + ", offset = " + offset); + } + FlumeEventQueue queue = + new FlumeEventQueue(backingStore, inflightTakesFile, inflightPutsFile); + SetMultimap putMap = queue.deserializeInflightPuts(); + System.out.println("Inflight Puts:"); + + for (Long txnID : putMap.keySet()) { + Set puts = putMap.get(txnID); + System.out.println("Transaction ID: " + String.valueOf(txnID)); + for (long value : puts) { + int fileID = (int) (value >>> 32); + int offset = (int) value; + System.out.println(Long.toHexString(value) + " fileID = " + + fileID + ", offset = " + offset); + } + } + SetMultimap takeMap = queue.deserializeInflightTakes(); + System.out.println("Inflight takes:"); + for (Long txnID : takeMap.keySet()) { + Set takes = takeMap.get(txnID); + System.out.println("Transaction ID: " + String.valueOf(txnID)); + for (long value : takes) { + int fileID = (int) (value >>> 32); + int offset = (int) value; + System.out.println(Long.toHexString(value) + " fileID = " + + fileID + ", offset = " + offset); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java new file mode 100644 index 0000000..8bbc081 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Preconditions; + +final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile { + + + private static final int INDEX_SIZE = 2; + private static final int INDEX_HEAD = 3; + private static final int INDEX_ACTIVE_LOG = 5; + private static final int MAX_ACTIVE_LOGS = 1024; + + EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name) + throws IOException { + super(capacity, name, checkpointFile); + Preconditions.checkArgument(capacity > 0, + "capacity must be greater than 0 " + capacity); + + setLogWriteOrderID(elementsBuffer.get(INDEX_WRITE_ORDER_ID)); + setSize((int) elementsBuffer.get(INDEX_SIZE)); + setHead((int) elementsBuffer.get(INDEX_HEAD)); + + int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS; + for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) { + long nextFileCode = elementsBuffer.get(i); + if (nextFileCode != EMPTY) { + Pair idAndCount = + deocodeActiveLogCounter(nextFileCode); + logFileIDReferenceCounts.put(idAndCount.getLeft(), + new AtomicInteger(idAndCount.getRight())); + } + } + } + @Override + protected int getVersion() { + return Serialization.VERSION_2; + } + + @Override + protected void incrementFileID(int fileID) { + super.incrementFileID(fileID); + Preconditions.checkState(logFileIDReferenceCounts.size() < MAX_ACTIVE_LOGS, + "Too many active logs "); + } + + + private Pair deocodeActiveLogCounter(long value) { + int fileId = (int) (value >>> 32); + int count = (int) value; + return Pair.of(fileId, count); + } + private long encodeActiveLogCounter(int fileId, int count) { + long result = fileId; + result = (long)fileId << 32; + result += (long) count; + return result; + } + @Override + protected void writeCheckpointMetaData() { + elementsBuffer.put(INDEX_SIZE, getSize()); + elementsBuffer.put(INDEX_HEAD, getHead()); + List fileIdAndCountEncoded = new ArrayList(); + for (Integer fileId : logFileIDReferenceCounts.keySet()) { + Integer count = logFileIDReferenceCounts.get(fileId).get(); + long value = encodeActiveLogCounter(fileId, count); + fileIdAndCountEncoded.add(value); + } + + int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size(); + for (int i = 0; i < emptySlots; i++) { + fileIdAndCountEncoded.add(0L); + } + for (int i = 0; i < MAX_ACTIVE_LOGS; i++) { + elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i)); + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java new file mode 100644 index 0000000..c766d09 --- /dev/null +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.channel.file; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.flume.chanel.file.proto.ProtosFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile { + private static final Logger LOG = LoggerFactory + .getLogger(EventQueueBackingStoreFileV3.class); + private final File metaDataFile; + + EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name) + throws IOException { + super(capacity, name, checkpointFile); + Preconditions.checkArgument(capacity > 0, + "capacity must be greater than 0 " + capacity); + metaDataFile = Serialization.getMetaDataFile(checkpointFile); + LOG.info("Starting up with " + checkpointFile + " and " + metaDataFile); + if(metaDataFile.exists()) { + FileInputStream inputStream = new FileInputStream(metaDataFile); + try { + LOG.info("Reading checkpoint metadata from " + metaDataFile); + ProtosFactory.Checkpoint checkpoint = + ProtosFactory.Checkpoint.parseDelimitedFrom(inputStream); + int version = checkpoint.getVersion(); + Preconditions.checkState(version == getVersion(), + "Invalid version: " + version + " " + name + ", expected " + + getVersion()); + long logWriteOrderID = checkpoint.getWriteOrderID(); + if(logWriteOrderID != getCheckpointLogWriteOrderID()) { + LOG.error("Checkpoint and Meta files have differing " + + "logWriteOrderIDs " + getCheckpointLogWriteOrderID() + ", and " + + logWriteOrderID); + throw new IllegalStateException( + "The last checkpoint was not completed correctly. Please delete " + + "the checkpoint files: " + checkpointFile + " and " + + Serialization.getMetaDataFile(checkpointFile) + + " to rebuild the checkpoint and start again. " + name); + } + WriteOrderOracle.setSeed(logWriteOrderID); + setLogWriteOrderID(logWriteOrderID); + setSize(checkpoint.getQueueSize()); + setHead(checkpoint.getQueueHead()); + for(ProtosFactory.ActiveLog activeLog : checkpoint.getActiveLogsList()) { + Integer logFileID = activeLog.getLogFileID(); + Integer count = activeLog.getCount(); + logFileIDReferenceCounts.put(logFileID, new AtomicInteger(count)); + } + } finally { + try { + inputStream.close(); + } catch (IOException e) { + LOG.warn("Unable to close " + metaDataFile, e); + } + } + } else { + ProtosFactory.Checkpoint.Builder checkpointBuilder = + ProtosFactory.Checkpoint.newBuilder(); + checkpointBuilder.setVersion(getVersion()); + checkpointBuilder.setQueueHead(getHead()); + checkpointBuilder.setQueueSize(getSize()); + checkpointBuilder.setWriteOrderID(getLogWriteOrderID()); + FileOutputStream outputStream = new FileOutputStream(metaDataFile); + try { + checkpointBuilder.build().writeDelimitedTo(outputStream); + outputStream.getChannel().force(true); + } finally { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Unable to close " + metaDataFile, e); + } + } + } + } + File getMetaDataFile() { + return metaDataFile; + } + + @Override + protected int getVersion() { + return Serialization.VERSION_3; + } + @Override + protected void writeCheckpointMetaData() throws IOException { + ProtosFactory.Checkpoint.Builder checkpointBuilder = + ProtosFactory.Checkpoint.newBuilder(); + checkpointBuilder.setVersion(getVersion()); + checkpointBuilder.setQueueHead(getHead()); + checkpointBuilder.setQueueSize(getSize()); + checkpointBuilder.setWriteOrderID(getLogWriteOrderID()); + for(Integer logFileID : logFileIDReferenceCounts.keySet()) { + int count = logFileIDReferenceCounts.get(logFileID).get(); + if(count != 0) { + ProtosFactory.ActiveLog.Builder activeLogBuilder = + ProtosFactory.ActiveLog.newBuilder(); + activeLogBuilder.setLogFileID(logFileID); + activeLogBuilder.setCount(count); + checkpointBuilder.addActiveLogs(activeLogBuilder.build()); + } + } + FileOutputStream outputStream = new FileOutputStream(metaDataFile); + try { + checkpointBuilder.build().writeDelimitedTo(outputStream); + outputStream.getChannel().force(true); + } finally { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Unable to close " + metaDataFile, e); + } + } + } + + static void upgrade(EventQueueBackingStoreFileV2 backingStoreV2, + File checkpointFile, File metaDataFile) + throws IOException { + + int head = backingStoreV2.getHead(); + int size = backingStoreV2.getSize(); + long writeOrderID = backingStoreV2.getLogWriteOrderID(); + Map referenceCounts = + backingStoreV2.logFileIDReferenceCounts; + + ProtosFactory.Checkpoint.Builder checkpointBuilder = + ProtosFactory.Checkpoint.newBuilder(); + checkpointBuilder.setVersion(Serialization.VERSION_3); + checkpointBuilder.setQueueHead(head); + checkpointBuilder.setQueueSize(size); + checkpointBuilder.setWriteOrderID(writeOrderID); + for(Integer logFileID : referenceCounts.keySet()) { + int count = referenceCounts.get(logFileID).get(); + if(count > 0) { + ProtosFactory.ActiveLog.Builder activeLogBuilder = + ProtosFactory.ActiveLog.newBuilder(); + activeLogBuilder.setLogFileID(logFileID); + activeLogBuilder.setCount(count); + checkpointBuilder.addActiveLogs(activeLogBuilder.build()); + } + } + FileOutputStream outputStream = new FileOutputStream(metaDataFile); + try { + checkpointBuilder.build().writeDelimitedTo(outputStream); + outputStream.getChannel().force(true); + } finally { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Unable to close " + metaDataFile, e); + } + } + RandomAccessFile checkpointFileHandle = + new RandomAccessFile(checkpointFile, "rw"); + try { + checkpointFileHandle.seek(INDEX_VERSION * Serialization.SIZE_OF_LONG); + checkpointFileHandle.writeLong(Serialization.VERSION_3); + checkpointFileHandle.getChannel().force(true); + } finally { + try { + checkpointFileHandle.close(); + } catch (IOException e) { + LOG.warn("Unable to close " + checkpointFile, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java index e40cd8c..5f06ab7 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventPointer.java @@ -18,7 +18,6 @@ */ package org.apache.flume.channel.file; -import com.google.common.base.Preconditions; /** * Pointer to an Event on disk. This is represented in memory @@ -31,7 +30,15 @@ class FlumeEventPointer { FlumeEventPointer(int fileID, int offset) { this.fileID = fileID; this.offset = offset; - Preconditions.checkArgument(offset > 0); + /* + * Log files used to have a header, now metadata is in + * a separate file so data starts at offset 0. + */ + if(offset < 0) { + throw new IllegalArgumentException("offset = " + offset + "(" + + Integer.toHexString(offset) + ")" + ", fileID = " + fileID + + "(" + Integer.toHexString(fileID) + ")"); + } } int getFileID() { return fileID; http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 8085d22..a8df042 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -21,25 +21,9 @@ package org.apache.flume.channel.file; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.LongBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel.MapMode; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Maps; -import com.google.common.collect.SetMultimap; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; +import java.nio.LongBuffer; import java.security.MessageDigest; import java.util.Arrays; import java.util.Collection; @@ -49,7 +33,14 @@ import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; + import org.apache.commons.lang.ArrayUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; /** * Queue of events in the channel. This queue stores only @@ -63,120 +54,22 @@ import org.apache.commons.lang.ArrayUtils; final class FlumeEventQueue { private static final Logger LOG = LoggerFactory .getLogger(FlumeEventQueue.class); - private static final long VERSION = 2; private static final int EMPTY = 0; - private static final int INDEX_VERSION = 0; - private static final int INDEX_WRITE_ORDER_ID = 1; - private static final int INDEX_SIZE = 2; - private static final int INDEX_HEAD = 3; - private static final int INDEX_CHECKPOINT_MARKER = 4; - private static final int CHECKPOINT_COMPLETE = EMPTY; - private static final int CHECKPOINT_INCOMPLETE = 1; - private static final int INDEX_ACTIVE_LOG = 5; - private static final int MAX_ACTIVE_LOGS = 1024; - private static final int HEADER_SIZE = 1029; - private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB - private final Map fileIDCounts = Maps.newHashMap(); - private final MappedByteBuffer mappedBuffer; - private final LongBuffer elementsBuffer; - private LongBufferWrapper elements; - private final RandomAccessFile checkpointFile; - private final java.nio.channels.FileChannel checkpointFileHandle; - private final int queueCapacity; + private final EventQueueBackingStore backingStore; private final String channelNameDescriptor; private final InflightEventWrapper inflightTakes; private final InflightEventWrapper inflightPuts; - private int queueSize; - private int queueHead; - private long logWriteOrderID; - /** * @param capacity max event capacity of queue * @throws IOException */ - FlumeEventQueue(int capacity, File file, File inflightTakesFile, - File inflightPutsFile, String name) throws Exception { - Preconditions.checkArgument(capacity > 0, + FlumeEventQueue(EventQueueBackingStore backingStore, File inflightTakesFile, + File inflightPutsFile) throws Exception { + Preconditions.checkArgument(backingStore.getCapacity() > 0, "Capacity must be greater than zero"); - this.channelNameDescriptor = "[channel=" + name + "]"; - this.queueCapacity = capacity; - - if (!file.exists()) { - Preconditions.checkState(file.createNewFile(), "Unable to create file: " - + file.getCanonicalPath() + " " + channelNameDescriptor); - } - - boolean freshlyAllocated = false; - checkpointFile = new RandomAccessFile(file, "rw"); - if (checkpointFile.length() == 0) { - // Allocate - LOG.info("Event queue has zero allocation. Initializing to capacity. " - + "Please wait..."); - int totalBytes = (capacity + HEADER_SIZE)*8; - if (totalBytes <= MAX_ALLOC_BUFFER_SIZE) { - checkpointFile.write(new byte[totalBytes]); - } else { - byte[] initBuffer = new byte[MAX_ALLOC_BUFFER_SIZE]; - int remainingBytes = totalBytes; - while (remainingBytes >= MAX_ALLOC_BUFFER_SIZE) { - checkpointFile.write(initBuffer); - remainingBytes -= MAX_ALLOC_BUFFER_SIZE; - } - if (remainingBytes > 0) { - checkpointFile.write(initBuffer, 0, remainingBytes); - } - } - - LOG.info("Event queue allocation complete"); - freshlyAllocated = true; - } else { - int fileCapacity = (int) checkpointFile.length() / 8; - int expectedCapacity = capacity + HEADER_SIZE; - - Preconditions.checkState(fileCapacity == expectedCapacity, - "Capacity cannot be changed once the channel is initialized " - + channelNameDescriptor + ": fileCapacity = " + fileCapacity - + ", expectedCapacity = " + expectedCapacity); - } - - checkpointFileHandle = checkpointFile.getChannel(); - - mappedBuffer = checkpointFileHandle.map(MapMode.READ_WRITE, 0, - file.length()); - - elementsBuffer = mappedBuffer.asLongBuffer(); - if (freshlyAllocated) { - elementsBuffer.put(INDEX_VERSION, VERSION); - } else { - int version = (int) elementsBuffer.get(INDEX_VERSION); - Preconditions.checkState(version == VERSION, - "Invalid version: " + version + channelNameDescriptor); - logWriteOrderID = elementsBuffer.get(INDEX_WRITE_ORDER_ID); - queueSize = (int) elementsBuffer.get(INDEX_SIZE); - queueHead = (int) elementsBuffer.get(INDEX_HEAD); - - long checkpointComplete = - (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER); - Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE, - "The last checkpoint was not completed correctly. Please delete " - + "the checkpoint file: " + file.getCanonicalPath() + " to rebuild " - + "the checkpoint and start again. " + channelNameDescriptor); - - int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS; - for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) { - long nextFileCode = elementsBuffer.get(i); - if (nextFileCode != EMPTY) { - Pair idAndCount = - deocodeActiveLogCounter(nextFileCode); - fileIDCounts.put(idAndCount.getLeft(), - new AtomicInteger(idAndCount.getRight())); - } - } - } - - elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor); - //TODO: Support old code paths with no inflight files. + this.channelNameDescriptor = "[channel=" + backingStore.getName() + "]"; + this.backingStore = backingStore; try { inflightPuts = new InflightEventWrapper(inflightPutsFile); inflightTakes = new InflightEventWrapper(inflightTakesFile); @@ -186,20 +79,6 @@ final class FlumeEventQueue { } } - private Pair deocodeActiveLogCounter(long value) { - int fileId = (int) (value >>> 32); - int count = (int) value; - - return Pair.of(fileId, count); - } - - private long encodeActiveLogCounter(int fileId, int count) { - long result = fileId; - result = (long)fileId << 32; - result += (long) count; - return result; - } - SetMultimap deserializeInflightPuts() throws IOException{ return inflightPuts.deserialize(); } @@ -209,46 +88,20 @@ final class FlumeEventQueue { } synchronized long getLogWriteOrderID() { - return logWriteOrderID; + return backingStore.getLogWriteOrderID(); } synchronized boolean checkpoint(boolean force) throws Exception { - if (!elements.syncRequired() + if (!backingStore.syncRequired() && !inflightTakes.syncRequired() && !force) { //No need to check inflight puts, since that would //cause elements.syncRequired() to return true. LOG.debug("Checkpoint not required"); return false; } - - // Start checkpoint - elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE); - - updateHeaders(); - - List fileIdAndCountEncoded = new ArrayList(); - for (Integer fileId : fileIDCounts.keySet()) { - Integer count = fileIDCounts.get(fileId).get(); - long value = encodeActiveLogCounter(fileId, count); - fileIdAndCountEncoded.add(value); - } - - int emptySlots = MAX_ACTIVE_LOGS - fileIdAndCountEncoded.size(); - for (int i = 0; i < emptySlots; i++) { - fileIdAndCountEncoded.add(0L); - } - for (int i = 0; i < MAX_ACTIVE_LOGS; i++) { - elementsBuffer.put(i + INDEX_ACTIVE_LOG, fileIdAndCountEncoded.get(i)); - } - - elements.sync(); - inflightPuts.serializeAndWrite(); inflightTakes.serializeAndWrite(); - // Finish checkpoint - elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE); - mappedBuffer.force(); - + backingStore.checkpoint(); return true; } @@ -258,7 +111,7 @@ final class FlumeEventQueue { * @return FlumeEventPointer or null if queue is empty */ synchronized FlumeEventPointer removeHead(long transactionID) { - if(queueSize == 0) { + if(backingStore.getSize() == 0) { return null; } @@ -267,7 +120,7 @@ final class FlumeEventQueue { + channelNameDescriptor); FlumeEventPointer ptr = FlumeEventPointer.fromLong(value); - decrementFileID(ptr.getFileID()); + backingStore.decrementFileID(ptr.getFileID()); return ptr; } @@ -284,7 +137,7 @@ final class FlumeEventQueue { //events since they are in the inflight takes. So puts will not happen //in such a way that these takes cannot go back in. If this if returns true, //there is a buuuuuuuug! - if (queueSize == queueCapacity) { + if (backingStore.getSize() == backingStore.getCapacity()) { LOG.error("Could not reinsert to queue, events which were taken but " + "not committed. Please report this issue."); return false; @@ -292,7 +145,7 @@ final class FlumeEventQueue { long value = e.toLong(); Preconditions.checkArgument(value != EMPTY); - incrementFileID(e.getFileID()); + backingStore.incrementFileID(e.getFileID()); add(0, value); return true; @@ -306,15 +159,15 @@ final class FlumeEventQueue { * was added to the queue */ synchronized boolean addTail(FlumeEventPointer e) { - if ((queueSize + inflightTakes.getSize()) == queueCapacity) { + if (getSize() == backingStore.getCapacity()) { return false; } long value = e.toLong(); Preconditions.checkArgument(value != EMPTY); - incrementFileID(e.getFileID()); + backingStore.incrementFileID(e.getFileID()); - add(queueSize, value); + add(backingStore.getSize(), value); return true; } @@ -338,11 +191,11 @@ final class FlumeEventQueue { synchronized boolean remove(FlumeEventPointer e) { long value = e.toLong(); Preconditions.checkArgument(value != EMPTY); - for (int i = 0; i < queueSize; i++) { + for (int i = 0; i < backingStore.getSize(); i++) { if(get(i) == value) { remove(i, 0); FlumeEventPointer ptr = FlumeEventPointer.fromLong(value); - decrementFileID(ptr.getFileID()); + backingStore.decrementFileID(ptr.getFileID()); return true; } } @@ -357,75 +210,53 @@ final class FlumeEventQueue { //Java implements clone pretty well. The main place this is used //in checkpointing and deleting old files, so best //to use a sorted set implementation. - SortedSet fileIDs = new TreeSet(fileIDCounts.keySet()); + SortedSet fileIDs = + new TreeSet(backingStore.getReferenceCounts()); fileIDs.addAll(inflightPuts.getFileIDs()); fileIDs.addAll(inflightTakes.getFileIDs()); return fileIDs; } - protected void incrementFileID(int fileID) { - AtomicInteger counter = fileIDCounts.get(fileID); - if(counter == null) { - Preconditions.checkState(fileIDCounts.size() < MAX_ACTIVE_LOGS, - "Too many active logs " + channelNameDescriptor); - counter = new AtomicInteger(0); - fileIDCounts.put(fileID, counter); - } - counter.incrementAndGet(); - } - - protected void decrementFileID(int fileID) { - AtomicInteger counter = fileIDCounts.get(fileID); - Preconditions.checkState(counter != null, "null counter " - + channelNameDescriptor); - int count = counter.decrementAndGet(); - if(count == 0) { - fileIDCounts.remove(fileID); - } - } - protected long get(int index) { - if (index < 0 || index > queueSize - 1) { + if (index < 0 || index > backingStore.getSize() - 1) { throw new IndexOutOfBoundsException(String.valueOf(index) + channelNameDescriptor); } - - return elements.get(getPhysicalIndex(index)); + return backingStore.get(index); } private void set(int index, long value) { - if (index < 0 || index > queueSize - 1) { + if (index < 0 || index > backingStore.getSize() - 1) { throw new IndexOutOfBoundsException(String.valueOf(index) + channelNameDescriptor); } - - elements.put(getPhysicalIndex(index), value); + backingStore.put(index, value); } protected boolean add(int index, long value) { - if (index < 0 || index > queueSize) { + if (index < 0 || index > backingStore.getSize()) { throw new IndexOutOfBoundsException(String.valueOf(index) + channelNameDescriptor); } - if (queueSize == queueCapacity) { + if (backingStore.getSize() == backingStore.getCapacity()) { return false; } - queueSize++; + backingStore.setSize(backingStore.getSize() + 1); - if (index <= queueSize/2) { + if (index <= backingStore.getSize()/2) { // Shift left - queueHead--; - if (queueHead < 0) { - queueHead = queueCapacity - 1; + backingStore.setHead(backingStore.getHead() - 1); + if (backingStore.getHead() < 0) { + backingStore.setHead(backingStore.getCapacity() - 1); } for (int i = 0; i < index; i++) { set(i, get(i+1)); } } else { // Sift right - for (int i = queueSize - 1; i > index; i--) { + for (int i = backingStore.getSize() - 1; i > index; i--) { set(i, get(i-1)); } } @@ -444,22 +275,22 @@ final class FlumeEventQueue { } protected synchronized long remove(int index, long transactionID) { - if (index < 0 || index > queueSize - 1) { + if (index < 0 || index > backingStore.getSize() - 1) { throw new IndexOutOfBoundsException("index = " + index - + ", queueSize " + queueSize +" " + channelNameDescriptor); + + ", queueSize " + backingStore.getSize() +" " + channelNameDescriptor); } long value = get(index); //if txn id = 0, we are recovering from a crash. if(transactionID != 0) { inflightTakes.addEvent(transactionID, value); } - if (index > queueSize/2) { + if (index > backingStore.getSize()/2) { // Move tail part to left - for (int i = index; i < queueSize - 1; i++) { + for (int i = index; i < backingStore.getSize() - 1; i++) { long rightValue = get(i+1); set(i, rightValue); } - set(queueSize - 1, EMPTY); + set(backingStore.getSize() - 1, EMPTY); } else { // Move head part to right for (int i = index - 1; i >= 0; i--) { @@ -467,88 +298,34 @@ final class FlumeEventQueue { set(i+1, leftValue); } set(0, EMPTY); - queueHead++; - if (queueHead == queueCapacity) { - queueHead = 0; + backingStore.setHead(backingStore.getHead() + 1); + if (backingStore.getHead() == backingStore.getCapacity()) { + backingStore.setHead(0); } } - - queueSize--; + backingStore.setSize(backingStore.getSize() - 1); return value; } - private synchronized void updateHeaders() { - logWriteOrderID = WriteOrderOracle.next(); - elementsBuffer.put(INDEX_WRITE_ORDER_ID, logWriteOrderID); - elementsBuffer.put(INDEX_SIZE, queueSize); - elementsBuffer.put(INDEX_HEAD, queueHead); - if (LOG.isDebugEnabled()) { - LOG.debug("Updating checkpoint headers: ts: " + logWriteOrderID + ", queueSize: " - + queueSize + ", queueHead: " + queueHead + " " + channelNameDescriptor); - } - } - - - private int getPhysicalIndex(int index) { - return HEADER_SIZE + (queueHead + index) % queueCapacity; - } protected synchronized int getSize() { - return queueSize + inflightTakes.getSize(); + return backingStore.getSize() + inflightTakes.getSize(); } /** * @return max capacity of the queue */ public int getCapacity() { - return queueCapacity; + return backingStore.getCapacity(); } - static class LongBufferWrapper { - private final LongBuffer buffer; - private final String channelNameDescriptor; - - Map overwriteMap = new HashMap(); - - LongBufferWrapper(LongBuffer lb, String nameDescriptor) { - buffer = lb; - channelNameDescriptor = nameDescriptor; - } - - long get(int index) { - long result = EMPTY; - if (overwriteMap.containsKey(index)) { - result = overwriteMap.get(index); - } else { - result = buffer.get(index); - } - - return result; - } - - void put(int index, long value) { - overwriteMap.put(index, value); - } - - boolean syncRequired() { - return overwriteMap.size() > 0; - } - - void sync() { - Iterator it = overwriteMap.keySet().iterator(); - while (it.hasNext()) { - int index = it.next(); - long value = overwriteMap.get(index); - - buffer.put(index, value); - it.remove(); - } - - Preconditions.checkState(overwriteMap.size() == 0, - "concurrent update detected " + channelNameDescriptor); + synchronized void close() { + try { + backingStore.close(); + } catch (IOException e) { + LOG.warn("Error closing backing store", e); } } - /** * A representation of in flight events which have not yet been committed. * None of the methods are thread safe, and should be called from thread @@ -740,56 +517,4 @@ final class FlumeEventQueue { return inflightFileIDs.values(); } } - - public static void main(String[] args) throws Exception { - File file = new File(args[0]); - File inflightTakesFile = new File(args[1]); - File inflightPutsFile = new File(args[2]); - if (!file.exists()) { - throw new IOException("File " + file + " does not exist"); - } - if (file.length() == 0) { - throw new IOException("File " + file + " is empty"); - } - int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L); - FlumeEventQueue queue = new FlumeEventQueue( - capacity, file, inflightTakesFile, inflightPutsFile, "debug"); - System.out.println("File Reference Counts" + queue.fileIDCounts); - System.out.println("Queue Capacity " + queue.getCapacity()); - System.out.println("Queue Size " + queue.getSize()); - System.out.println("Queue Head " + queue.queueHead); - for (int index = 0; index < queue.getCapacity(); index++) { - long value = queue.elements.get(queue.getPhysicalIndex(index)); - int fileID = (int) (value >>> 32); - int offset = (int) value; - System.out.println(index + ":" + Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); - } - - SetMultimap putMap = queue.deserializeInflightPuts(); - System.out.println("Inflight Puts:"); - - for (Long txnID : putMap.keySet()) { - Set puts = putMap.get(txnID); - System.out.println("Transaction ID: " + String.valueOf(txnID)); - for (long value : puts) { - int fileID = (int) (value >>> 32); - int offset = (int) value; - System.out.println(Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); - } - } - SetMultimap takeMap = queue.deserializeInflightTakes(); - System.out.println("Inflight takes:"); - for (Long txnID : takeMap.keySet()) { - Set takes = takeMap.get(txnID); - System.out.println("Transaction ID: " + String.valueOf(txnID)); - for (long value : takes) { - int fileID = (int) (value >>> 32); - int offset = (int) value; - System.out.println(Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); - } - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index e13ecc4..6e8e3d0 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +47,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.SortedSet; /** * Stores FlumeEvents on disk and pointers to the events in a in memory queue. @@ -91,7 +91,6 @@ class Log { */ private final WriteLock checkpointWriterLock = checkpointLock.writeLock(); private int logWriteTimeout; - private final String channelName; private final String channelNameDescriptor; private int checkpointWriteTimeout; private boolean useLogReplayV1; @@ -184,7 +183,6 @@ class Log { Preconditions.checkArgument(name != null && !name.trim().isEmpty(), "channel name should be specified"); - this.channelName = name; this.channelNameDescriptor = "[channel=" + name + "]"; this.useLogReplayV1 = useLogReplayV1; this.useFastReplay = useFastReplay; @@ -225,6 +223,7 @@ class Log { * directly before the shutdown or crash. * @throws IOException */ + @SuppressWarnings("deprecation") void replay() throws IOException { Preconditions.checkState(!open, "Cannot replay after Log has been opened"); @@ -248,7 +247,7 @@ class Log { int id = LogUtils.getIDForFile(file); dataFiles.add(file); nextFileID.set(Math.max(nextFileID.get(), id)); - idLogFileMap.put(id, new LogFile.RandomReader(new File(logDir, PREFIX + idLogFileMap.put(id, LogFileFactory.getRandomReader(new File(logDir, PREFIX + id))); } } @@ -268,8 +267,11 @@ class Log { File checkpointFile = new File(checkpointDir, "checkpoint"); File inflightTakesFile = new File(checkpointDir, "inflighttakes"); File inflightPutsFile = new File(checkpointDir, "inflightputs"); - queue = new FlumeEventQueue(queueCapacity, checkpointFile, - inflightTakesFile, inflightPutsFile, channelName); + EventQueueBackingStore backingStore = + EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity, + channelNameDescriptor); + queue = new FlumeEventQueue(backingStore, inflightTakesFile, + inflightPutsFile); LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified()) + ", queue depth = " + queue.getSize()); @@ -279,7 +281,7 @@ class Log { * the list of data files. */ ReplayHandler replayHandler = new ReplayHandler(queue, useFastReplay, - checkpointFile, maxFileSize); + checkpointFile); if(useLogReplayV1) { LOGGER.info("Replaying logs with v1 replay logic"); replayHandler.replayLogv1(dataFiles); @@ -288,7 +290,6 @@ class Log { replayHandler.replayLog(dataFiles); } - for (int index = 0; index < logDirs.length; index++) { LOGGER.info("Rolling " + logDirs[index]); roll(index); @@ -354,8 +355,7 @@ class Log { Preconditions.checkState(open, "Log is closed"); FlumeEvent flumeEvent = new FlumeEvent( event.getHeaders(), event.getBody()); - Put put = new Put(transactionID, flumeEvent); - put.setLogWriteOrderID(WriteOrderOracle.next()); + Put put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put); int logFileIndex = nextLogWriter(transactionID); if (logFiles.get(logFileIndex).isRollRequired(buffer)) { @@ -384,9 +384,8 @@ class Log { void take(long transactionID, FlumeEventPointer pointer) throws IOException { Preconditions.checkState(open, "Log is closed"); - Take take = new Take(transactionID, pointer.getOffset(), - pointer.getFileID()); - take.setLogWriteOrderID(WriteOrderOracle.next()); + Take take = new Take(transactionID, WriteOrderOracle.next(), + pointer.getOffset(), pointer.getFileID()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(take); int logFileIndex = nextLogWriter(transactionID); if (logFiles.get(logFileIndex).isRollRequired(buffer)) { @@ -416,8 +415,7 @@ class Log { if(LOGGER.isDebugEnabled()) { LOGGER.debug("Rolling back " + transactionID); } - Rollback rollback = new Rollback(transactionID); - rollback.setLogWriteOrderID(WriteOrderOracle.next()); + Rollback rollback = new Rollback(transactionID, WriteOrderOracle.next()); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(rollback); int logFileIndex = nextLogWriter(transactionID); if (logFiles.get(logFileIndex).isRollRequired(buffer)) { @@ -526,6 +524,7 @@ class Log { } } } + queue.close(); try { unlock(checkpointDir); } catch (IOException ex) { @@ -564,8 +563,7 @@ class Log { private void commit(long transactionID, short type) throws IOException { Preconditions.checkState(open, "Log is closed"); - Commit commit = new Commit(transactionID, type); - commit.setLogWriteOrderID(WriteOrderOracle.next()); + Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type); ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit); int logFileIndex = nextLogWriter(transactionID); if (logFiles.get(logFileIndex).isRollRequired(buffer)) { @@ -634,9 +632,9 @@ class Log { "File already exists " + file); Preconditions.checkState(file.createNewFile(), "File could not be created " + file); - idLogFileMap.put(fileID, new LogFile.RandomReader(file)); + idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file)); // writer from this point on will get new reference - logFiles.set(index, new LogFile.Writer(file, fileID, maxFileSize)); + logFiles.set(index, LogFileFactory.getWriter(file, fileID, maxFileSize)); // close out old log if (oldLogFile != null) { oldLogFile.close(); @@ -670,9 +668,9 @@ class Log { if(!lockAcquired) { return false; } - SortedSet idSet = null; + SortedSet logFileRefCountsAll = null, logFileRefCountsActive = null; try { - if (queue.checkpoint(force) || force) { + if (queue.checkpoint(force)) { long logWriteOrderID = queue.getLogWriteOrderID(); //Since the active files might also be in the queue's fileIDs, @@ -681,42 +679,52 @@ class Log { //fileID set from the queue have been updated. //Since clone is smarter than insert, better to make //a copy of the set first so that we can use it later. - idSet = queue.getFileIDs(); - SortedSet idSetToCompare = new TreeSet(idSet); + logFileRefCountsAll = queue.getFileIDs(); + logFileRefCountsActive = new TreeSet(logFileRefCountsAll); int numFiles = logFiles.length(); for (int i = 0; i < numFiles; i++) { - LogFile.Writer writer = logFiles.get(i); - writer.markCheckpoint(logWriteOrderID); - int id = writer.getFileID(); - idSet.remove(id); - LOGGER.debug("Updated checkpoint for file: " + writer.getFile()); + LogFile.Writer logWriter = logFiles.get(i); + int logFileID = logWriter.getLogFileID(); + File logFile = logWriter.getFile(); + LogFile.MetaDataWriter writer = + LogFileFactory.getMetaDataWriter(logFile, logFileID); + try { + writer.markCheckpoint(logWriter.position(), logWriteOrderID); + } finally { + writer.close(); + } + logFileRefCountsAll.remove(logFileID); + LOGGER.info("Updated checkpoint for file: " + logFile + " position: " + + logWriter.position() + " logWriteOrderID: " + logWriteOrderID); } // Update any inactive data files as well - Iterator idIterator = idSet.iterator(); + Iterator idIterator = logFileRefCountsAll.iterator(); while (idIterator.hasNext()) { int id = idIterator.next(); LogFile.RandomReader reader = idLogFileMap.remove(id); File file = reader.getFile(); reader.close(); - // Open writer in inactive mode - LogFile.Writer writer = - new LogFile.Writer(file, id, maxFileSize, false); - writer.markCheckpoint(logWriteOrderID); - writer.close(); - reader = new LogFile.RandomReader(file); + LogFile.MetaDataWriter writer = + LogFileFactory.getMetaDataWriter(file, id); + try { + writer.markCheckpoint(logWriteOrderID); + } finally { + writer.close(); + } + reader = LogFileFactory.getRandomReader(file); idLogFileMap.put(id, reader); - LOGGER.debug("Updated checkpoint for file: " + file); + LOGGER.debug("Updated checkpoint for file: " + file + + "logWriteOrderID " + logWriteOrderID); idIterator.remove(); } - Preconditions.checkState(idSet.size() == 0, - "Could not update all data file timestamps: " + idSet); + Preconditions.checkState(logFileRefCountsAll.size() == 0, + "Could not update all data file timestamps: " + logFileRefCountsAll); //Add files from all log directories for (int index = 0; index < logDirs.length; index++) { - idSetToCompare.add(logFiles.get(index).getFileID()); + logFileRefCountsActive.add(logFiles.get(index).getLogFileID()); } - idSet = idSetToCompare; checkpointCompleted = true; } } finally { @@ -725,7 +733,7 @@ class Log { //Do the deletes outside the checkpointWriterLock //Delete logic is expensive. if (open && checkpointCompleted) { - removeOldLogs(idSet); + removeOldLogs(logFileRefCountsActive); } //Since the exception is not caught, this will not be returned if //an exception is thrown from the try. @@ -755,6 +763,11 @@ class Log { LOGGER.info("Removing old log " + logFile + ", result = " + logFile.delete() + ", minFileID " + minFileID); + File metaDataFile = Serialization.getMetaDataFile(logFile); + if(metaDataFile.exists() && !metaDataFile.delete()) { + LOGGER.warn("Could not remove metadata file " + + metaDataFile + " for " + logFile); + } } } } @@ -795,6 +808,7 @@ class Log { * null if directory is already locked. * @throws IOException if locking fails. */ + @SuppressWarnings("resource") private FileLock tryLock(File dir) throws IOException { File lockF = new File(dir, FILE_LOCK); lockF.deleteOnExit();