From: hshreedharan@apache.org
To: commits@flume.apache.org
Subject: [27/50] [abbrv] git commit: FLUME-1437: Checkpoint can miss pending takes
Message-Id: <20120907232852.E6FE52842C@tyr.zones.apache.org>
Date: Fri, 7 Sep 2012 23:28:52 +0000 (UTC)

FLUME-1437: Checkpoint can miss pending takes

(Hari Shreedharan via Brock Noland)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3b7612a3
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3b7612a3
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3b7612a3

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: 3b7612a3f9475d28b41fb2622e9ef04f3e683f6b
Parents: b66521a
Author: Brock Noland
Authored: Tue Aug 21 09:34:22 2012 -0500
Committer: Mike Percy
Committed: Fri Sep 7 14:03:05 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/FileChannel.java |   73 ++-
 .../apache/flume/channel/file/FlumeEventQueue.java |  326 +++++++++++-
 .../java/org/apache/flume/channel/file/Log.java    |   12 +-
 .../apache/flume/channel/file/ReplayHandler.java   |   82 +++-
 .../apache/flume/channel/file/TestCheckpoint.java  |   17 +-
 .../apache/flume/channel/file/TestFileChannel.java |  401 +++++++++++++--
 .../flume/channel/file/TestFlumeEventQueue.java    |  108 +++-
 .../org/apache/flume/channel/file/TestLog.java     |   14 +-
 8 files changed, 908 insertions(+), 125 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index e7735e8..b5a0b88 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -256,6 +256,7 @@ public class FileChannel extends BasicChannelSemantics { super.stop(); } + @Override public String toString() { return "FileChannel " + getName() + " { dataDirs: " +
Arrays.toString(dataDirs) + " }"; @@ -362,7 +363,7 @@ public class FileChannel extends BasicChannelSemantics { "increasing capacity, or increasing thread count. " + channelNameDescriptor); } - FlumeEventPointer ptr = queue.removeHead(); + FlumeEventPointer ptr = queue.removeHead(transactionID); if(ptr != null) { try { // first add to takeList so that if write to disk @@ -387,6 +388,23 @@ public class FileChannel extends BasicChannelSemantics { if(puts > 0) { Preconditions.checkState(takes == 0, "nonzero puts and takes " + channelNameDescriptor); + /* + * OK to not put this in synchronized(queue) block, because if a + * checkpoint occurs after the commit it is fine. + * The puts will be in the inflightputs file in the checkpoint. + * The commit did not return, so previous hop would not get success + * for the commit. + * The replay will not see the commit in the log file(since the + * commit is before the checkpoint in the logs) - and hence the events + * are not added back to the queue, so no duplicates or data loss. + */ + try { + log.commitPut(transactionID); + channelCounter.addToEventPutSuccessCount(puts); + } catch (IOException e) { + throw new ChannelException("Commit failed due to IO error " + + channelNameDescriptor, e); + } synchronized (queue) { while(!putList.isEmpty()) { if(!queue.addTail(putList.removeFirst())) { @@ -401,21 +419,24 @@ public class FileChannel extends BasicChannelSemantics { Preconditions.checkState(false, msg.toString()); } } + queue.completeTransaction(transactionID); } + } else if (takes > 0) { try { - log.commitPut(transactionID); - channelCounter.addToEventPutSuccessCount(puts); - } catch (IOException e) { - throw new ChannelException("Commit failed due to IO error " - + channelNameDescriptor, e); - } - } else if(takes > 0) { - try { + /* + * OK to not have the commit take in synchronized(queue) block. + * If a checkpoint happens in between the commitTake and + * the completeTxn call, the takes will be in the inflightTakes file. + * When the channel replays the events, these takes will be put + * back into the channel - and will cause duplicates, but the + * number of duplicates will be pretty limited. + */ log.commitTake(transactionID); + queue.completeTransaction(transactionID); channelCounter.addToEventTakeSuccessCount(takes); } catch (IOException e) { throw new ChannelException("Commit failed due to IO error " - + channelNameDescriptor, e); + + channelNameDescriptor, e); } queueRemaining.release(takes); } @@ -428,26 +449,36 @@ public class FileChannel extends BasicChannelSemantics { protected void doRollback() throws InterruptedException { int puts = putList.size(); int takes = takeList.size(); + /* + * OK to not have the rollback within the synchronized(queue) block. + * If a checkpoint occurs between the rollback and the synchronized(queue) + * block, the takes are kept in the inflighttakes file in the checkpoint. + * During a replay the commit or rollback for the takes are not seen, + * so the takes are re-inserted into the queue - which is a rollback + * anyway. 
+ */ + try { + log.rollback(transactionID); + } catch (IOException e) { + throw new ChannelException("Commit failed due to IO error " + + channelNameDescriptor, e); + } if(takes > 0) { Preconditions.checkState(puts == 0, "nonzero puts and takes " + channelNameDescriptor); - while(!takeList.isEmpty()) { - Preconditions.checkState(queue.addHead(takeList.removeLast()), - "Queue add failed, this shouldn't be able to happen " - + channelNameDescriptor); + synchronized (queue) { + while (!takeList.isEmpty()) { + Preconditions.checkState(queue.addHead(takeList.removeLast()), + "Queue add failed, this shouldn't be able to happen " + + channelNameDescriptor); + } + queue.completeTransaction(transactionID); } } queueRemaining.release(puts); - try { - log.rollback(transactionID); - } catch (IOException e) { - throw new ChannelException("Commit failed due to IO error " - + channelNameDescriptor, e); - } putList.clear(); takeList.clear(); channelCounter.setChannelSize(queue.getSize()); } - } } http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java index 9bfee2d..766c59a 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java @@ -35,9 +35,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.ArrayUtils; /** * Queue of events in the channel. This queue stores only @@ -48,7 +60,7 @@ import java.util.TreeSet; * contains the timestamp of last sync, the queue size and * the head position. 
*/ -class FlumeEventQueue { +final class FlumeEventQueue { private static final Logger LOG = LoggerFactory .getLogger(FlumeEventQueue.class); private static final long VERSION = 2; @@ -72,6 +84,8 @@ class FlumeEventQueue { private final java.nio.channels.FileChannel checkpointFileHandle; private final int queueCapacity; private final String channelNameDescriptor; + private final InflightEventWrapper inflightTakes; + private final InflightEventWrapper inflightPuts; private int queueSize; private int queueHead; @@ -81,7 +95,8 @@ class FlumeEventQueue { * @param capacity max event capacity of queue * @throws IOException */ - FlumeEventQueue(int capacity, File file, String name) throws IOException { + FlumeEventQueue(int capacity, File file, File inflightTakesFile, + File inflightPutsFile, String name) throws Exception { Preconditions.checkArgument(capacity > 0, "Capacity must be greater than zero"); this.channelNameDescriptor = "[channel=" + name + "]"; @@ -161,6 +176,14 @@ class FlumeEventQueue { } elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor); + //TODO: Support old code paths with no inflight files. + try { + inflightPuts = new InflightEventWrapper(inflightPutsFile); + inflightTakes = new InflightEventWrapper(inflightTakesFile); + } catch (Exception e) { + LOG.error("Could not read checkpoint.", e); + throw e; + } } private Pair deocodeActiveLogCounter(long value) { @@ -177,12 +200,23 @@ class FlumeEventQueue { return result; } + SetMultimap deserializeInflightPuts() throws IOException{ + return inflightPuts.deserialize(); + } + + SetMultimap deserializeInflightTakes() throws IOException{ + return inflightTakes.deserialize(); + } + synchronized long getLogWriteOrderID() { return logWriteOrderID; } - synchronized boolean checkpoint(boolean force) { - if (!elements.syncRequired() && !force) { + synchronized boolean checkpoint(boolean force) throws Exception { + if (!elements.syncRequired() + && !inflightTakes.syncRequired() + && !force) { //No need to check inflight puts, since that would + //cause elements.syncRequired() to return true. LOG.debug("Checkpoint not required"); return false; } @@ -209,6 +243,8 @@ class FlumeEventQueue { elements.sync(); + inflightPuts.serializeAndWrite(); + inflightTakes.serializeAndWrite(); // Finish checkpoint elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE); mappedBuffer.force(); @@ -221,12 +257,12 @@ class FlumeEventQueue { * * @return FlumeEventPointer or null if queue is empty */ - synchronized FlumeEventPointer removeHead() { - if(queueSize == 0) { + synchronized FlumeEventPointer removeHead(long transactionID) { + if(queueSize == 0) { return null; } - long value = remove(0); + long value = remove(0, transactionID); Preconditions.checkState(value != EMPTY, "Empty value " + channelNameDescriptor); @@ -236,13 +272,21 @@ class FlumeEventQueue { } /** - * Add a FlumeEventPointer to the head of the queue + * Add a FlumeEventPointer to the head of the queue. + * Called during rollbacks. * @param FlumeEventPointer to be added * @return true if space was available and pointer was * added to the queue */ synchronized boolean addHead(FlumeEventPointer e) { + //Called only during rollback, so should not consider inflight takes' size, + //because normal puts through addTail method already account for these + //events since they are in the inflight takes. So puts will not happen + //in such a way that these takes cannot go back in. If this if returns true, + //there is a buuuuuuuug! 
if (queueSize == queueCapacity) { + LOG.error("Could not reinsert to queue, events which were taken but " + + "not committed. Please report this issue."); return false; } @@ -256,15 +300,13 @@ class FlumeEventQueue { /** - * Add a FlumeEventPointer to the tail of the queue - * this will normally be used when recovering from a - * crash + * Add a FlumeEventPointer to the tail of the queue. * @param FlumeEventPointer to be added * @return true if space was available and pointer * was added to the queue */ synchronized boolean addTail(FlumeEventPointer e) { - if (queueSize == queueCapacity) { + if ((queueSize + inflightTakes.getSize()) == queueCapacity) { return false; } @@ -277,6 +319,16 @@ class FlumeEventQueue { } /** + * Must be called when a put happens to the log. This ensures that put commits + * after checkpoints will retrieve all events committed in that txn. + * @param e + * @param transactionID + */ + synchronized void addWithoutCommit(FlumeEventPointer e, long transactionID) { + inflightPuts.addEvent(transactionID, e.toLong()); + } + + /** * Remove FlumeEventPointer from queue, will normally * only be used when recovering from a crash * @param FlumeEventPointer to be removed @@ -288,7 +340,7 @@ class FlumeEventQueue { Preconditions.checkArgument(value != EMPTY); for (int i = 0; i < queueSize; i++) { if(get(i) == value) { - remove(i); + remove(i, 0); FlumeEventPointer ptr = FlumeEventPointer.fromLong(value); decrementFileID(ptr.getFileID()); return true; @@ -378,13 +430,26 @@ class FlumeEventQueue { return true; } - protected synchronized long remove(int index) { + /** + * Must be called when a transaction is being committed or rolled back. + * @param transactionID + */ + synchronized void completeTransaction(long transactionID) { + if (!inflightPuts.completeTransaction(transactionID)) { + inflightTakes.completeTransaction(transactionID); + } + } + + protected synchronized long remove(int index, long transactionID) { if (index < 0 || index > queueSize - 1) { throw new IndexOutOfBoundsException("index = " + index + ", queueSize " + queueSize +" " + channelNameDescriptor); } long value = get(index); - + //if txn id = 0, we are recovering from a crash. + if(transactionID != 0) { + inflightTakes.addEvent(transactionID, value); + } if (index > queueSize/2) { // Move tail part to left for (int i = index; i < queueSize - 1; i++) { @@ -426,7 +491,7 @@ class FlumeEventQueue { } protected synchronized int getSize() { - return queueSize; + return queueSize + inflightTakes.getSize(); } /** @@ -481,26 +546,239 @@ class FlumeEventQueue { } } - public static void main(String[] args) throws IOException { + /** + * A representation of in flight events which have not yet been committed. + * None of the methods are thread safe, and should be called from thread + * safe methods only. 
+ */ + private class InflightEventWrapper { + private SetMultimap inflightEvents = HashMultimap.create(); + private RandomAccessFile file; + private volatile java.nio.channels.FileChannel fileChannel; + private final MessageDigest digest; + private volatile Future future; + private final File inflightEventsFile; + private volatile boolean syncRequired = false; + + public InflightEventWrapper(File inflightEventsFile) throws Exception{ + if(!inflightEventsFile.exists()){ + Preconditions.checkState(inflightEventsFile.createNewFile(),"Could not " + + "create inflight events file: " + + inflightEventsFile.getCanonicalPath()); + } + this.inflightEventsFile = inflightEventsFile; + file = new RandomAccessFile(inflightEventsFile, "rw"); + fileChannel = file.getChannel(); + digest = MessageDigest.getInstance("MD5"); + } + + /** + * Complete the transaction, and remove all events from inflight list. + * @param transactionID + */ + public boolean completeTransaction(Long transactionID) { + if(!inflightEvents.containsKey(transactionID)) { + return false; + } + inflightEvents.removeAll(transactionID); + syncRequired = true; + return true; + } + + /** + * Add an event pointer to the inflights list. + * @param transactionID + * @param pointer + */ + public void addEvent(Long transactionID, Long pointer){ + inflightEvents.put(transactionID, pointer); + syncRequired = true; + } + + /** + * Serialize the set of in-flight events into a byte buffer, prefix it + * with its checksum, and write it to disk asynchronously. + */ + public void serializeAndWrite() throws Exception { + //If a previous write is still in progress, abort it. + if (future != null) { + try { + future.cancel(true); + } catch (Exception e) { + LOG.warn("Interrupted a write to inflights " + + "file: " + inflightEventsFile.getName() + + " to start a new write."); + } + while (!future.isDone()) { + TimeUnit.MILLISECONDS.sleep(100); + } + } + Collection values = inflightEvents.values(); + if(values.isEmpty()){ + file.setLength(0L); + } + if(!fileChannel.isOpen()){ + file = new RandomAccessFile(inflightEventsFile, "rw"); + fileChannel = file.getChannel(); + } + //What is written out? + //Checksum - 16 bytes + //and then each key-value pair from the map: + //transactionid numberofeventsforthistxn listofeventpointers + + try { + int expectedFileSize = (((inflightEvents.keySet().size() * 2) //For transactionIDs and events per txn ID + + values.size()) * 8) //Event pointers + + 16; //Checksum + //There is no real need to fill the channel with 0s, since we + //will write exactly the expected number of bytes. + file.setLength(expectedFileSize); + Preconditions.checkState(file.length() == expectedFileSize, + "Expected file size of inflight events file does not match the " + + "current file size. 
Checkpoint is incomplete."); + file.seek(0); + final ByteBuffer buffer = ByteBuffer.allocate(expectedFileSize); + LongBuffer longBuffer = buffer.asLongBuffer(); + for (Long txnID : inflightEvents.keySet()) { + Set pointers = inflightEvents.get(txnID); + longBuffer.put(txnID); + longBuffer.put((long) pointers.size()); + LOG.debug("Number of events inserted into " + + "inflights file: " + String.valueOf(pointers.size()) + + " file: " + inflightEventsFile.getCanonicalPath()); + long[] written = ArrayUtils.toPrimitive( + pointers.toArray(new Long[0])); + longBuffer.put(written); + } + byte[] checksum = digest.digest(buffer.array()); + file.write(checksum); + future = Executors.newSingleThreadExecutor().submit( + new Runnable() { + @Override + public void run() { + try { + buffer.position(0); + fileChannel.write(buffer); + fileChannel.force(true); + } catch (IOException ex) { + LOG.error("Error while writing inflight events to " + + "inflights file: " + + inflightEventsFile.getName()); + } + } + }); + syncRequired = false; + } catch (IOException ex) { + LOG.error("Error while writing checkpoint to disk.", ex); + throw ex; + } + } + + /** + * Read the inflights file and return a + * {@link com.google.common.collect.SetMultimap} + * of transactionIDs to events that were inflight. + * + * @return - map of inflight events per txnID. + * + */ + public SetMultimap deserialize() throws IOException { + SetMultimap inflights = HashMultimap.create(); + if (!fileChannel.isOpen()) { + file = new RandomAccessFile(inflightEventsFile, "rw"); + fileChannel = file.getChannel(); + } + if(file.length() == 0) { + return inflights; + } + file.seek(0); + byte[] checksum = new byte[16]; + file.read(checksum); + ByteBuffer buffer = ByteBuffer.allocate( + (int)(file.length() - file.getFilePointer())); + fileChannel.read(buffer); + byte[] fileChecksum = digest.digest(buffer.array()); + if (!Arrays.equals(checksum, fileChecksum)) { + throw new IllegalStateException("Checksum of inflights file differs" + + " from the checksum expected."); + } + buffer.position(0); + LongBuffer longBuffer = buffer.asLongBuffer(); + try { + while (true) { + long txnID = longBuffer.get(); + int numEvents = (int)(longBuffer.get()); + for(int i = 0; i < numEvents; i++) { + long val = longBuffer.get(); + inflights.put(txnID, val); + } + } + } catch (BufferUnderflowException ex) { + LOG.debug("Reached end of inflights buffer. 
Long buffer position =" + + String.valueOf(longBuffer.position())); + } + return inflights; + } + + public int getSize() { + return inflightEvents.size(); + } + + public boolean syncRequired(){ + return syncRequired; + } + } + + public static void main(String[] args) throws Exception { File file = new File(args[0]); - if(!file.exists()) { + File inflightTakesFile = new File(args[1]); + File inflightPutsFile = new File(args[2]); + if (!file.exists()) { throw new IOException("File " + file + " does not exist"); } - if(file.length() == 0) { + if (file.length() == 0) { throw new IOException("File " + file + " is empty"); } - int capacity = (int)((file.length() - (HEADER_SIZE * 8L)) / 8L); - FlumeEventQueue queue = new FlumeEventQueue(capacity, file, "debug"); + int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L); + FlumeEventQueue queue = new FlumeEventQueue( + capacity, file, inflightTakesFile, inflightPutsFile, "debug"); System.out.println("File Reference Counts" + queue.fileIDCounts); System.out.println("Queue Capacity " + queue.getCapacity()); System.out.println("Queue Size " + queue.getSize()); System.out.println("Queue Head " + queue.queueHead); for (int index = 0; index < queue.getCapacity(); index++) { long value = queue.elements.get(queue.getPhysicalIndex(index)); - int fileID = (int)(value >>> 32); - int offset = (int)value; + int fileID = (int) (value >>> 32); + int offset = (int) value; System.out.println(index + ":" + Long.toHexString(value) + " fileID = " - + fileID + ", offset = " + offset); + + fileID + ", offset = " + offset); + } + + SetMultimap putMap = queue.deserializeInflightPuts(); + System.out.println("Inflight Puts:"); + + for (Long txnID : putMap.keySet()) { + Set puts = putMap.get(txnID); + System.out.println("Transaction ID: " + String.valueOf(txnID)); + for (long value : puts) { + int fileID = (int) (value >>> 32); + int offset = (int) value; + System.out.println(Long.toHexString(value) + " fileID = " + + fileID + ", offset = " + offset); + } + } + SetMultimap takeMap = queue.deserializeInflightTakes(); + System.out.println("Inflight takes:"); + for (Long txnID : takeMap.keySet()) { + Set takes = takeMap.get(txnID); + System.out.println("Transaction ID: " + String.valueOf(txnID)); + for (long value : takes) { + int fileID = (int) (value >>> 32); + int offset = (int) value; + System.out.println(Long.toHexString(value) + " fileID = " + + fileID + ", offset = " + offset); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java index 11f1e1f..c356ca4 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java @@ -245,8 +245,10 @@ class Log { * locations. We will read the last one written to disk. 
*/ File checkpointFile = new File(checkpointDir, "checkpoint"); - queue = new FlumeEventQueue(queueCapacity, - checkpointFile, channelName); + File inflightTakesFile = new File(checkpointDir, "inflighttakes"); + File inflightPutsFile = new File(checkpointDir, "inflightputs"); + queue = new FlumeEventQueue(queueCapacity, checkpointFile, + inflightTakesFile, inflightPutsFile, channelName); LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified()) + ", queue depth = " + queue.getSize()); @@ -378,6 +380,7 @@ class Log { boolean error = true; try { FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer); + queue.addWithoutCommit(ptr, transactionID); error = false; return ptr; } finally { @@ -704,7 +707,7 @@ class Log { } } - private boolean writeCheckpoint() throws IOException { + private boolean writeCheckpoint() throws Exception { return writeCheckpoint(false); } @@ -718,8 +721,7 @@ class Log { * @param force a flag to force the writing of checkpoint * @throws IOException if we are unable to write the checkpoint out to disk */ - private boolean writeCheckpoint(boolean force) - throws IOException { + private boolean writeCheckpoint(boolean force) throws Exception { boolean lockAcquired = false; boolean checkpointCompleted = false; try { http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java index bbca62c..14e2ff7 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java @@ -26,14 +26,18 @@ import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import org.apache.commons.collections.MultiMap; -import org.apache.commons.collections.map.MultiValueMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import java.util.Iterator; +import java.util.Set; +import org.apache.commons.collections.MultiMap; +import org.apache.commons.collections.map.MultiValueMap; /** * Processes a set of data logs, replaying said logs into the queue. 
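The ReplayHandler hunks below thread the new inflight bookkeeping through both replay paths. Restated as a minimal standalone sketch (hypothetical names and simplified types; real event pointers are FlumeEventPointer values encoded as longs): inflight puts are merged into the ordinary transaction map so a later put-commit record picks them up; a TAKE commit found in a log also commits the takes recorded as inflight for that transaction; and whatever remains in the takes map after all logs are replayed was taken but never committed, so it is re-inserted at the head of the queue, which amounts to a rollback.

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;

// Minimal model of how replay consumes the inflight-takes checkpoint file.
final class ReplaySketch {
  // Deserialized from "inflighttakes": txnID -> pointers taken, not committed.
  private final SetMultimap<Long, Long> inflightTakes = HashMultimap.create();
  // The channel's queue, oldest event at the head.
  private final Deque<Long> queue = new ArrayDeque<Long>();

  // A TAKE commit found in a log also commits the takes that the
  // checkpoint recorded as inflight for that transaction.
  Set<Long> takesCommittedBy(long transactionID) {
    return inflightTakes.removeAll(transactionID);
  }

  // After all logs are replayed: anything still inflight was taken but
  // never committed, so re-insert it at the head, an implicit rollback.
  int reinsertUncommittedTakes() {
    int restored = 0;
    for (Long pointer : inflightTakes.values()) {
      queue.addFirst(pointer);
      restored++;
    }
    inflightTakes.clear();
    return restored;
  }
}

This also explains the limited duplicate window conceded in the doCommit comment above: if the checkpoint lands between commitTake and completeTransaction, the commit record precedes the checkpoint and is never replayed, so those takes are restored and delivered a second time.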
@@ -78,6 +82,16 @@ class ReplayHandler { int total = 0; int count = 0; MultiMap transactionMap = new MultiValueMap(); + //Read inflight puts to see if they were committed + SetMultimap inflightPuts = queue.deserializeInflightPuts(); + for (Long txnID : inflightPuts.keySet()) { + Set eventPointers = inflightPuts.get(txnID); + for (Long eventPointer : eventPointers) { + transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer)); + } + } + + SetMultimap inflightTakes = queue.deserializeInflightTakes(); LOG.info("Starting replay of " + logs); for (File log : logs) { LOG.info("Replaying " + log); @@ -120,6 +134,20 @@ class ReplayHandler { @SuppressWarnings("unchecked") Collection pointers = (Collection) transactionMap.remove(trans); + if (((Commit) record).getType() + == TransactionEventRecord.Type.TAKE.get()) { + if (inflightTakes.containsKey(trans)) { + if (pointers == null) { + pointers = Sets.newHashSet(); + } + Set takes = inflightTakes.removeAll(trans); + Iterator it = takes.iterator(); + while (it.hasNext()) { + Long take = it.next(); + pointers.add(FlumeEventPointer.fromLong(take)); + } + } + } if (pointers != null && pointers.size() > 0) { processCommit(((Commit) record).getType(), pointers); count += pointers.size(); @@ -149,6 +177,19 @@ class ReplayHandler { } } } + //re-insert the events in the take map, + //since the takes were not committed. + int uncommittedTakes = 0; + for (Long inflightTxnId : inflightTakes.keySet()) { + Set inflightUncommittedTakes = + inflightTakes.get(inflightTxnId); + for (Long inflightUncommittedTake : inflightUncommittedTakes) { + queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake)); + uncommittedTakes++; + } + } + inflightTakes.clear(); + count += uncommittedTakes; int pendingTakesSize = pendingTakes.size(); if (pendingTakesSize > 0) { String msg = "Pending takes " + pendingTakesSize @@ -173,6 +214,16 @@ class ReplayHandler { MultiMap transactionMap = new MultiValueMap(); long transactionIDSeed = 0, writeOrderIDSeed = 0; LOG.info("Starting replay of " + logs); + //Load the inflight puts into the transaction map to see if they were + //committed in one of the logs. + SetMultimap inflightPuts = queue.deserializeInflightPuts(); + for (Long txnID : inflightPuts.keySet()) { + Set eventPointers = inflightPuts.get(txnID); + for (Long eventPointer : eventPointers) { + transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer)); + } + } + SetMultimap inflightTakes = queue.deserializeInflightTakes(); try { for (File log : logs) { LOG.info("Replaying " + log); @@ -232,6 +283,20 @@ class ReplayHandler { @SuppressWarnings("unchecked") Collection pointers = (Collection) transactionMap.remove(trans); + if (((Commit) record).getType() + == TransactionEventRecord.Type.TAKE.get()) { + if (inflightTakes.containsKey(trans)) { + if(pointers == null){ + pointers = Sets.newHashSet(); + } + Set takes = inflightTakes.removeAll(trans); + Iterator it = takes.iterator(); + while (it.hasNext()) { + Long take = it.next(); + pointers.add(FlumeEventPointer.fromLong(take)); + } + } + } if (pointers != null && pointers.size() > 0) { processCommit(((Commit) record).getType(), pointers); count += pointers.size(); @@ -256,6 +321,19 @@ class ReplayHandler { } } } + //re-insert the events in the take map, + //since the takes were not committed. 
+ int uncommittedTakes = 0; + for (Long inflightTxnId : inflightTakes.keySet()) { + Set inflightUncommittedTakes = + inflightTakes.get(inflightTxnId); + for (Long inflightUncommittedTake : inflightUncommittedTakes) { + queue.addHead(FlumeEventPointer.fromLong(inflightUncommittedTake)); + uncommittedTakes++; + } + } + inflightTakes.clear(); + count += uncommittedTakes; int pendingTakesSize = pendingTakes.size(); if (pendingTakesSize > 0) { String msg = "Pending takes " + pendingTakesSize http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java index 7ec5916..2893538 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java @@ -30,9 +30,13 @@ import org.junit.Test; public class TestCheckpoint { File file; + File inflightPuts; + File inflightTakes; @Before public void setup() throws IOException { file = File.createTempFile("Checkpoint", ""); + inflightPuts = File.createTempFile("inflightPuts", ""); + inflightTakes = File.createTempFile("inflightTakes", ""); Assert.assertTrue(file.isFile()); Assert.assertTrue(file.canWrite()); } @@ -41,15 +45,18 @@ public class TestCheckpoint { file.delete(); } @Test - public void testSerialization() throws IOException { + public void testSerialization() throws Exception { FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20); - FlumeEventQueue queueIn = new FlumeEventQueue(1, file, "test"); + FlumeEventQueue queueIn = new FlumeEventQueue(1, file, inflightTakes, + inflightPuts, "test"); queueIn.addHead(ptrIn); - FlumeEventQueue queueOut = new FlumeEventQueue(1, file, "test"); + FlumeEventQueue queueOut = new FlumeEventQueue(1, file, inflightTakes, + inflightPuts, "test"); Assert.assertEquals(0, queueOut.getLogWriteOrderID()); queueIn.checkpoint(false); - FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, "test"); - FlumeEventPointer ptrOut = queueOut2.removeHead(); + FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, inflightTakes, + inflightPuts, "test"); + FlumeEventPointer ptrOut = queueOut2.removeHead(0); Assert.assertEquals(ptrIn, ptrOut); Assert.assertTrue(queueOut2.getLogWriteOrderID() > 0); } http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 1d5a0f9..720fa27 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. 
*/ - package org.apache.flume.channel.file; import java.io.File; @@ -56,17 +55,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Files; import com.google.common.io.Resources; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; public class TestFileChannel { private static final Logger LOG = LoggerFactory - .getLogger(TestFileChannel.class); - + .getLogger(TestFileChannel.class); private FileChannel channel; private File baseDir; private File checkpointDir; @@ -101,7 +104,7 @@ public class TestFileChannel { private Context createContext(Map overrides) { Context context = new Context(); context.put(FileChannelConfiguration.CHECKPOINT_DIR, - checkpointDir.getAbsolutePath()); + checkpointDir.getAbsolutePath()); context.put(FileChannelConfiguration.DATA_DIRS, dataDir); context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); // Set checkpoint for 5 seconds otherwise test will run out of memory @@ -120,6 +123,176 @@ public class TestFileChannel { return channel; } @Test + public void testFailAfterTakeBeforeCommit() throws Throwable { + final FileChannel channel = createFileChannel(); + channel.start(); + final Set eventSet = Sets.newHashSet(); + eventSet.addAll(putEvents(channel, "testTakeFailBeforeCommit", 5, 5)); + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.take(); + channel.take(); + //Simulate multiple sources, so separate thread - txns are thread local, + //so a new txn wont be created here unless it is in a different thread. + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.take(); + channel.take(); + channel.take(); + } + }).get(); + long lastTake = System.currentTimeMillis(); + File inflightsFile = new File(checkpointDir, "inflighttakes"); + + while (inflightsFile.lastModified() < lastTake) { + Thread.sleep(500); + } + channel.stop(); + //Simulate a sink, so separate thread. + try { + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + FileChannel channel = createFileChannel(); + channel.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e; + /* + * Explicitly not put in a loop, so it is easy to find out which + * event became null easily. 
+ */ + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(eventSet.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(eventSet.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(eventSet.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(eventSet.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(eventSet.remove(new String(e.getBody()))); + tx.commit(); + tx.close(); + channel.stop(); + } + }).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + + } + + @Test + public void testFailAfterPutCheckpointCommit() throws Throwable { + final Set set = Sets.newHashSet(); + final Map overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, "10000"); + final FileChannel channel = createFileChannel(overrides); + channel.start(); + Transaction tx = channel.getTransaction(); + //Initially commit a put to make sure checkpoint is required. + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'1', '2'})); + set.add(new String(new byte[]{'1', '2'})); + tx.commit(); + tx.close(); + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'a', 'b'})); + set.add(new String(new byte[]{'a', 'b'})); + channel.put(EventBuilder.withBody(new byte[]{'c', 'd'})); + set.add(new String(new byte[]{'c', 'd'})); + channel.put(EventBuilder.withBody(new byte[]{'e', 'f'})); + set.add(new String(new byte[]{'e', 'f'})); + //Simulate multiple sources, so separate thread - txns are thread local, + //so a new txn wont be created here unless it is in a different thread. + final CountDownLatch latch = new CountDownLatch(1); + Executors.newSingleThreadExecutor().submit( + new Runnable() { + @Override + public void run() { + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'3', '4'})); + channel.put(EventBuilder.withBody(new byte[]{'5', '6'})); + channel.put(EventBuilder.withBody(new byte[]{'7', '8'})); + set.add(new String(new byte[]{'3', '4'})); + set.add(new String(new byte[]{'5', '6'})); + set.add(new String(new byte[]{'7', '8'})); + + try { + latch.await(); + tx.commit(); + } catch (InterruptedException e) { + tx.rollback(); + Throwables.propagate(e); + } finally { + tx.close(); + } + } + }); + long lastPut = System.currentTimeMillis(); + File checkpoint = new File(checkpointDir, "checkpoint"); + while (checkpoint.lastModified() < lastPut) { + Thread.sleep(500); + } + tx.commit(); + tx.close(); + latch.countDown(); + Thread.sleep(2000); + channel.stop(); + + //Simulate a sink, so separate thread. 
+ try { + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + FileChannel channel = createFileChannel(); + channel.start(); + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.remove(new String(e.getBody()))); + tx.commit(); + tx.close(); + channel.stop(); + } + }).get(); + } catch (ExecutionException e) { + throw e.getCause(); + } + + } + + @Test public void testRestartLogReplayV1() throws Exception { doTestRestart(true); } @@ -130,7 +303,7 @@ public class TestFileChannel { public void doTestRestart(boolean useLogReplayV1) throws Exception { Map overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, - String.valueOf(useLogReplayV1)); + String.valueOf(useLogReplayV1)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -268,7 +441,7 @@ public class TestFileChannel { Assert.fail(); } catch (ChannelException e) { Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]", - e.getMessage()); + e.getMessage()); } // take an event, roll it back, and // then make sure a put fails @@ -285,7 +458,7 @@ public class TestFileChannel { Assert.fail(); } catch (ChannelException e) { Assert.assertEquals("Cannot acquire capacity. [channel="+channel.getName()+"]", - e.getMessage()); + e.getMessage()); } // ensure we the events back Assert.assertEquals(5, takeEvents(channel, 1, 5).size()); @@ -316,11 +489,11 @@ public class TestFileChannel { Assert.fail(); } catch (ChannelException e) { Assert.assertEquals("Cannot acquire capacity. 
[channel="+channel.getName()+"]", - e.getMessage()); + e.getMessage()); } // then do a put which will block but it will be assigned a tx id Future put = Executors.newSingleThreadExecutor() - .submit(new Callable() { + .submit(new Callable() { @Override public String call() throws Exception { List result = putEvents(channel, "blocked-put", 1, 1); @@ -420,11 +593,11 @@ public class TestFileChannel { final CountDownLatch producerStopLatch = new CountDownLatch(numThreads); final CountDownLatch consumerStopLatch = new CountDownLatch(numThreads); final List errors = Collections - .synchronizedList(new ArrayList()); + .synchronizedList(new ArrayList()); final List expected = Collections - .synchronizedList(new ArrayList()); + .synchronizedList(new ArrayList()); final List actual = Collections - .synchronizedList(new ArrayList()); + .synchronizedList(new ArrayList()); for (int i = 0; i < numThreads; i++) { final int id = i; Thread t = new Thread() { @@ -479,9 +652,9 @@ public class TestFileChannel { t.start(); } Assert.assertTrue("Timed out waiting for producers", - producerStopLatch.await(30, TimeUnit.SECONDS)); + producerStopLatch.await(30, TimeUnit.SECONDS)); Assert.assertTrue("Timed out waiting for consumer", - consumerStopLatch.await(30, TimeUnit.SECONDS)); + consumerStopLatch.await(30, TimeUnit.SECONDS)); Assert.assertEquals(Collections.EMPTY_LIST, errors); Collections.sort(expected); Collections.sort(actual); @@ -501,9 +674,9 @@ public class TestFileChannel { // checkpoints and rolls occur during the test Map overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, - String.valueOf(10L * 1000L)); + String.valueOf(10L * 1000L)); overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, - String.valueOf(1024 * 1024 * 5)); + String.valueOf(1024 * 1024 * 5)); // do reconfiguration channel = createFileChannel(overrides); channel.start(); @@ -558,13 +731,13 @@ public class TestFileChannel { */ @Test public void testFileFormatV2postFLUME1432() - throws Exception { + throws Exception { copyDecompressed("fileformat-v2-checkpoint.gz", - new File(checkpointDir, "checkpoint")); + new File(checkpointDir, "checkpoint")); for (int i = 0; i < dataDirs.length; i++) { int fileIndex = i + 1; copyDecompressed("fileformat-v2-log-"+fileIndex+".gz", - new File(dataDirs[i], "log-" + fileIndex)); + new File(dataDirs[i], "log-" + fileIndex)); } Map overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10)); @@ -573,7 +746,7 @@ public class TestFileChannel { Assert.assertTrue(channel.isOpen()); List events = takeEvents(channel, 1); List expected = Arrays.asList(new String[] { - "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691" + "2684", "2685", "2686", "2687", "2688", "2689", "2690", "2691" } ); Assert.assertEquals(expected, events); @@ -584,27 +757,27 @@ public class TestFileChannel { */ @Test public void testFileFormatV2PreFLUME1432LogReplayV1() - throws Exception { + throws Exception { doTestFileFormatV2PreFLUME1432(true); } @Test public void testFileFormatV2PreFLUME1432LogReplayV2() - throws Exception { + throws Exception { doTestFileFormatV2PreFLUME1432(false); } public void doTestFileFormatV2PreFLUME1432(boolean useLogReplayV1) - throws Exception { + throws Exception { copyDecompressed("fileformat-v2-pre-FLUME-1432-checkpoint.gz", - new File(checkpointDir, "checkpoint")); + new File(checkpointDir, "checkpoint")); for (int i = 0; i < dataDirs.length; i++) { int fileIndex = i + 1; - 
copyDecompressed("fileformat-v2-pre-FLUME-1432-log-"+fileIndex+".gz", - new File(dataDirs[i], "log-" + fileIndex)); + copyDecompressed("fileformat-v2-pre-FLUME-1432-log-" + fileIndex + ".gz", + new File(dataDirs[i], "log-" + fileIndex)); } Map overrides = Maps.newHashMap(); overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000)); overrides.put(FileChannelConfiguration.USE_LOG_REPLAY_V1, - String.valueOf(useLogReplayV1)); + String.valueOf(useLogReplayV1)); channel = createFileChannel(overrides); channel.start(); Assert.assertTrue(channel.isOpen()); @@ -612,20 +785,184 @@ public class TestFileChannel { Assert.assertEquals(50, events.size()); } + /** + * Test contributed by Brock Noland during code review. + * @throws Exception + */ + @Test + public void testTakeTransactionCrossingCheckpoint() throws Exception { + channel = createFileChannel(); + channel.start(); + Assert.assertTrue(channel.isOpen()); + List in = Lists.newArrayList(); + try { + while (true) { + in.addAll(putEvents(channel, "restart", 1, 1)); + } + } catch (ChannelException e) { + Assert.assertEquals("Cannot acquire capacity. [channel=" + + channel.getName() + "]", e.getMessage()); + } + List out = Lists.newArrayList(); + // now take one item off the channel + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + long takeTime = System.currentTimeMillis(); + Assert.assertNotNull(e); + String s = new String(e.getBody(), Charsets.UTF_8); + out.add(s); + LOG.info("Slow take got " + s); + // sleep so a checkpoint occurs. take is before + // and commit is after the checkpoint + File checkpoint = new File(checkpointDir, "checkpoint"); + while(checkpoint.lastModified() < takeTime){ + TimeUnit.MILLISECONDS.sleep(500); + } + tx.commit(); + tx.close(); + channel.stop(); + channel = createFileChannel(); + channel.start(); + Assert.assertTrue(channel.isOpen()); + // we should not geet the item we took of the queue above + out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE)); + channel.stop(); + Collections.sort(in); + Collections.sort(out); + if (!out.equals(in)) { + List difference = new ArrayList(); + if (in.size() > out.size()) { + LOG.info("The channel shorted us"); + difference.addAll(in); + difference.removeAll(out); + } else { + LOG.info("We got more events than expected, perhaps dups"); + difference.addAll(out); + difference.removeAll(in); + } + LOG.error("difference = " + difference + + ", in.size = " + in.size() + ", out.size = " + out.size()); + Assert.fail(); + } + } + + @Test + public void testPutForceCheckpointCommitReplay() throws Exception{ + Set set = Sets.newHashSet(); + Map overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); + FileChannel channel = createFileChannel(overrides); + channel.start(); + //Force a checkpoint by committing a transaction + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'a','b'})); + set.add(new String(new byte[]{'a','b'})); + tx.commit(); + tx.close(); + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'c','d'})); + set.add(new String(new byte[]{'c', 'd'})); + File checkpoint = new File(checkpointDir, "checkpoint"); + long t1 = System.currentTimeMillis(); + while(checkpoint.lastModified() < t1) { + TimeUnit.MILLISECONDS.sleep(500); + if (System.currentTimeMillis() - checkpoint.lastModified() > 15000) { + throw new TimeoutException("Checkpoint did not happen"); + } + } + tx.commit(); + tx.close(); + 
channel.stop(); + + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.contains(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.contains(new String(e.getBody()))); + tx.commit(); + tx.close(); + channel.stop(); + + } + + @Test + public void testPutCheckpointCommitCheckpointReplay() throws Exception { + Set set = Sets.newHashSet(); + Map overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, String.valueOf(2)); + FileChannel channel = createFileChannel(overrides); + channel.start(); + //Force a checkpoint by committing a transaction + Transaction tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'a','b'})); + set.add(new String(new byte[]{'a','b'})); + tx.commit(); + tx.close(); + tx = channel.getTransaction(); + tx.begin(); + channel.put(EventBuilder.withBody(new byte[]{'c', 'd'})); + set.add(new String(new byte[]{'c','d'})); + File checkpoint = new File(checkpointDir, "checkpoint"); + long t1 = System.currentTimeMillis(); + while (checkpoint.lastModified() < t1) { + TimeUnit.MILLISECONDS.sleep(500); + if(t1 - checkpoint.lastModified() > 15000){ + throw new TimeoutException("Checkpoint was expected," + + " but did not happen"); + } + } + tx.commit(); + tx.close(); + long t2 = System.currentTimeMillis(); + while(checkpoint.lastModified() < t2){ + TimeUnit.MILLISECONDS.sleep(500); + if (t2 - checkpoint.lastModified() > 15000) { + throw new TimeoutException("Checkpoint was expected, " + + "but did not happen"); + } + } + channel.stop(); + + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.contains(new String(e.getBody()))); + e = channel.take(); + Assert.assertNotNull(e); + Assert.assertTrue(set.contains(new String(e.getBody()))); + tx.commit(); + tx.close(); + channel.stop(); + } + private static void copyDecompressed(String resource, File output) - throws IOException { + throws IOException { URL input = Resources.getResource(resource); long copied = ByteStreams.copy(new GZIPInputStream(input.openStream()), - new FileOutputStream(output)); + new FileOutputStream(output)); LOG.info("Copied " + copied + " bytes from " + input + " to " + output); } private static List takeEvents(Channel channel, - int batchSize) throws Exception { + int batchSize) throws Exception { return takeEvents(channel, batchSize, Integer.MAX_VALUE); } private static List takeEvents(Channel channel, - int batchSize, int numEvents) throws Exception { + int batchSize, int numEvents) throws Exception { List result = Lists.newArrayList(); for (int i = 0; i < numEvents; i += batchSize) { for (int j = 0; j < batchSize; j++) { @@ -650,7 +987,7 @@ public class TestFileChannel { return result; } private static List putEvents(Channel channel, String prefix, - int batchSize, int numEvents) throws Exception { + int batchSize, int numEvents) throws Exception { List result = Lists.newArrayList(); for (int i = 0; i < numEvents; i += batchSize) { for (int j = 0; j < batchSize; j++) { http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java 
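Before the TestFlumeEventQueue changes below, it helps to restate the capacity rule that several of the channel tests above exercise: events taken inside an open transaction keep their queue slots reserved, so addTail (used by put commits) can fail while addHead (used only by rollback and replay) cannot. A toy restatement of that arithmetic, with hypothetical names rather than the patch code:

// Toy model of the FileChannel capacity rule after this patch:
// uncommitted takes keep their slots reserved.
final class CapacityRuleSketch {
  private final int capacity;
  private int queueSize;      // committed events currently in the queue
  private int inflightTakes;  // taken inside open transactions

  CapacityRuleSketch(int capacity) { this.capacity = capacity; }

  // Put commit: refuse if committed events plus reserved slots fill capacity.
  boolean addTail() {
    if (queueSize + inflightTakes == capacity) {
      return false;
    }
    queueSize++;
    return true;
  }

  // Take inside a transaction: the slot stays reserved.
  void removeHead() { queueSize--; inflightTakes++; }

  // Commit of the take releases the reservation.
  void commitTake() { inflightTakes--; }

  // Rollback: because addTail reserves the slot, this can never fail.
  boolean addHead() {
    if (queueSize == capacity) {
      return false; // unreachable while inflightTakes > 0
    }
    queueSize++;
    inflightTakes--;
    return true;
  }

  // Reported size counts uncommitted takes, like the patched getSize().
  int size() { return queueSize + inflightTakes; }
}

getSize() in the patch reports the same sum, so the channel counter includes uncommitted takes.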
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index 569b7c7..d09ddde 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -18,8 +18,8 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.collect.SetMultimap;
 import java.io.File;
-import java.io.IOException;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -27,55 +27,61 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 public class TestFlumeEventQueue {
   File file;
+  File inflightTakes;
+  File inflightPuts;
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
   FlumeEventQueue queue;
   @Before
   public void setup() throws Exception {
     file = File.createTempFile("Checkpoint", "");
+    inflightTakes = File.createTempFile("inflighttakes", "");
+    inflightPuts = File.createTempFile("inflightputs", "");
   }
   @Test
-  public void testQueueIsEmptyAfterCreation() throws IOException {
-    queue = new FlumeEventQueue(1000, file, "test");
-    Assert.assertNull(queue.removeHead());
+  public void testQueueIsEmptyAfterCreation() throws Exception {
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    Assert.assertNull(queue.removeHead(0));
   }
   @Test
   public void testCapacity() throws Exception {
-    queue = new FlumeEventQueue(1, file, "test");
+    queue = new FlumeEventQueue(1, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityZero() throws Exception {
-    queue = new FlumeEventQueue(0, file, "test");
+    queue = new FlumeEventQueue(0, file, inflightTakes, inflightPuts,"test");
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
-    queue = new FlumeEventQueue(-1, file, "test");
+    queue = new FlumeEventQueue(-1, file, inflightTakes, inflightPuts,"test");
   }
   @Test
   public void addTail1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addTail2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(2), queue.getFileIDs());
   }
   @Test
   public void addTailLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -84,7 +90,7 @@ public class TestFlumeEventQueue {
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
     for (int i = 1; i <= size; i++) {
-      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead(0));
       fileIDs.remove(i);
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
@@ -92,24 +98,24 @@
   }
   @Test
   public void addHead1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
-    Assert.assertEquals(pointer1, queue.removeHead());
+    Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addHead2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
   }
   @Test
   public void addHeadLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -118,7 +124,7 @@
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
     for (int i = size; i > 0; i--) {
-      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead());
+      Assert.assertEquals(new FlumeEventPointer(i, i), queue.removeHead(0));
       fileIDs.remove(i);
       Assert.assertEquals(fileIDs, queue.getFileIDs());
     }
@@ -126,42 +132,42 @@
   }
   @Test
   public void addTailRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addTailRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
   }
   @Test
   public void addHeadRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
   }
   @Test
   public void addHeadRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
-    Assert.assertEquals(pointer2, queue.removeHead());
+    Assert.assertEquals(pointer2, queue.removeHead(0));
   }
   @Test
   public void testWrappingCorrectly() throws Exception {
-    queue = new FlumeEventQueue(1000, file, "test");
+    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -169,7 +175,7 @@
       }
     }
     for (int i = queue.getSize()/2; i > 0; i--) {
-      Assert.assertNotNull(queue.removeHead());
+      Assert.assertNotNull(queue.removeHead(0));
     }
     // addHead below would throw an IndexOOBounds with
     // bad version of FlumeEventQueue.convert
@@ -179,4 +185,48 @@
       }
     }
   }
+  @Test
+  public void testInflightPuts() throws Exception{
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 1), txnID1);
+    queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
+    Assert.assertTrue(deserializedMap.get(
+        txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+        txnID1).contains(new FlumeEventPointer(2, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+        txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+  }
+
+  @Test
+  public void testInflightTakes() throws Exception {
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
+    long txnID2 = txnID1 + 1;
+    queue.addTail(new FlumeEventPointer(1, 1));
+    queue.addTail(new FlumeEventPointer(2, 1));
+    queue.addTail(new FlumeEventPointer(2, 2));
+    queue.removeHead(txnID1);
+    queue.removeHead(txnID2);
+    queue.removeHead(txnID2);
+    queue.checkpoint(true);
+    TimeUnit.SECONDS.sleep(3L);
+    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
+    Assert.assertTrue(deserializedMap.get(
+        txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+        txnID2).contains(new FlumeEventPointer(2, 1).toLong()));
+    Assert.assertTrue(deserializedMap.get(
+        txnID2).contains(new FlumeEventPointer(2, 2).toLong()));
+
+  }
 }
+
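For readers following the new FlumeEventQueue surface exercised above, here is a
minimal sketch of the put-side flow, written against the signatures visible in
this diff (the five-argument constructor, addWithoutCommit, checkpoint,
deserializeInflightPuts). It is illustrative only, not code from the commit;
the class name InflightPutsSketch and the literal IDs are made up.

    // Sketch only; would need to live in org.apache.flume.channel.file to
    // see the package-level classes used below.
    package org.apache.flume.channel.file;

    import java.io.File;
    import com.google.common.collect.SetMultimap;

    public class InflightPutsSketch {
      public static void main(String[] args) throws Exception {
        File checkpoint = File.createTempFile("Checkpoint", "");
        File inflightTakes = File.createTempFile("inflighttakes", "");
        File inflightPuts = File.createTempFile("inflightputs", "");

        // The two extra files back the per-transaction inflight state
        // introduced by FLUME-1437.
        FlumeEventQueue queue = new FlumeEventQueue(
            10, checkpoint, inflightTakes, inflightPuts, "sketch");

        long txnID = 42L;
        // A put that has not yet committed is tracked against its transaction...
        queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID);
        // ...so a checkpoint taken now records it in the inflight-puts file.
        queue.checkpoint(true);

        // A queue rebuilt from the same files (as a replay would do) still
        // sees the pending put, keyed by its transaction ID.
        queue = new FlumeEventQueue(
            10, checkpoint, inflightTakes, inflightPuts, "sketch");
        SetMultimap<Long, Long> pending = queue.deserializeInflightPuts();
        System.out.println(
            pending.get(txnID).contains(new FlumeEventPointer(1, 1).toLong()));
      }
    }

Note that the tests above sleep for three seconds after checkpoint(true) before
reopening, which suggests the checkpoint write completes asynchronously; the
sketch omits that wait.
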
http://git-wip-us.apache.org/repos/asf/flume/blob/3b7612a3/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index e0b5e3f..e923a30 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -140,7 +140,7 @@ public class TestLog {
         dataDirs).setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(transactionID));
   }
 
   /**
@@ -164,7 +164,7 @@
         .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
   }
 
   /**
@@ -212,7 +212,7 @@
         .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
@@ -228,7 +228,7 @@
         .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
@@ -244,16 +244,16 @@
         .setChannelName("testlog").build();
     log.replay();
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNull(eventPointerOut);
   }
 
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn) throws IOException, InterruptedException {
     FlumeEventQueue queue = log.getFlumeEventQueue();
-    FlumeEventPointer eventPointerOut = queue.removeHead();
+    FlumeEventPointer eventPointerOut = queue.removeHead(0);
     Assert.assertNotNull(eventPointerOut);
-    Assert.assertNull(queue.removeHead());
+    Assert.assertNull(queue.removeHead(0));
     Assert.assertEquals(eventPointerIn, eventPointerOut);
     Assert.assertEquals(eventPointerIn.hashCode(), eventPointerOut.hashCode());
     FlumeEvent eventOut = log.get(eventPointerOut);
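The TestLog hunks show the other half of the same API shift: removeHead now
threads the owning transaction ID through every take, and call sites with no
live transaction pass 0, which reads as a placeholder ID. The take-side
counterpart to the sketch above, mirroring testInflightTakes (same caveats:
signatures are taken from this diff, names and literal values are illustrative):

    // Sketch only; same package assumption as the put-side sketch.
    package org.apache.flume.channel.file;

    import java.io.File;
    import com.google.common.collect.SetMultimap;

    public class InflightTakesSketch {
      public static void main(String[] args) throws Exception {
        File checkpoint = File.createTempFile("Checkpoint", "");
        File inflightTakes = File.createTempFile("inflighttakes", "");
        File inflightPuts = File.createTempFile("inflightputs", "");

        FlumeEventQueue queue = new FlumeEventQueue(
            10, checkpoint, inflightTakes, inflightPuts, "sketch");
        queue.addTail(new FlumeEventPointer(3, 1)); // a committed event

        long txnID = 7L;
        // Taking under a transaction ID parks the pointer in the inflight
        // takes rather than dropping it outright...
        FlumeEventPointer taken = queue.removeHead(txnID);
        // ...so a checkpoint taken before the transaction completes keeps it.
        queue.checkpoint(true);
        // (The tests sleep ~3s here before reopening; omitted.)

        // After a rebuild, the uncompleted take is still visible, so replay
        // can re-insert the event -- an effective rollback, at the cost of
        // the bounded duplicates the FileChannel commit comments accept.
        queue = new FlumeEventQueue(
            10, checkpoint, inflightTakes, inflightPuts, "sketch");
        SetMultimap<Long, Long> pending = queue.deserializeInflightTakes();
        System.out.println(pending.get(txnID).contains(taken.toLong()));
      }
    }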