Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ABC9DD6BF for ; Tue, 26 Jun 2012 09:26:54 +0000 (UTC) Received: (qmail 49655 invoked by uid 500); 26 Jun 2012 09:26:54 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 49587 invoked by uid 500); 26 Jun 2012 09:26:53 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 49561 invoked by uid 99); 26 Jun 2012 09:26:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jun 2012 09:26:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jun 2012 09:26:50 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E19E62388847; Tue, 26 Jun 2012 09:26:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1353888 - in /incubator/flume/trunk/flume-ng-channels/flume-file-channel/src: main/java/org/apache/flume/channel/file/ test/java/org/apache/flume/channel/file/ Date: Tue, 26 Jun 2012 09:26:29 -0000 To: flume-commits@incubator.apache.org From: mpercy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120626092629.E19E62388847@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: mpercy Date: Tue Jun 26 09:26:24 2012 New Revision: 1353888 URL: http://svn.apache.org/viewvc?rev=1353888&view=rev Log: FLUME-1320. Add safeguard for checkpoint corruption detection. (Arvind Prabhakar via Mike Percy) Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Tue Jun 26 09:26:24 2012 @@ -84,6 +84,13 @@ public class FileChannel extends BasicCh private final ThreadLocal transactions = new ThreadLocal(); private int logWriteTimeout; + private String channelNameDescriptor = "[channel=unknown]"; + + @Override + public synchronized void setName(String name) { + channelNameDescriptor = "[channel=" + name + "]"; + super.setName(name); + } /** * Transaction IDs should unique within a file channel @@ -204,8 +211,9 @@ public class FileChannel extends BasicCh int depth = getDepth(); Preconditions.checkState(queueRemaining.tryAcquire(depth), - "Unable to acquire " + depth + " permits"); - LOG.info("Queue Size after replay: " + depth); + "Unable to acquire " + depth + " permits " + channelNameDescriptor); + LOG.info("Queue Size after replay: " + depth + + channelNameDescriptor); // shutdown hook flushes all data to disk and closes // file descriptors along with setting all closed flags if(!shutdownHookAdded) { @@ -252,21 +260,21 @@ public class FileChannel extends BasicCh @Override protected BasicTransactionSemantics createTransaction() { - Preconditions.checkState(open, "Channel closed"); + Preconditions.checkState(open, "Channel closed" + channelNameDescriptor); FileBackedTransaction trans = transactions.get(); if(trans != null && !trans.isClosed()) { Preconditions.checkState(false, "Thread has transaction which is still open: " + - trans.getStateAsString()); + trans.getStateAsString() + channelNameDescriptor); } trans = new FileBackedTransaction(log, TRANSACTION_ID.incrementAndGet(), - transactionCapacity, keepAlive, queueRemaining); + transactionCapacity, keepAlive, queueRemaining, getName()); transactions.set(trans); return trans; } int getDepth() { - Preconditions.checkState(open, "Channel closed"); + Preconditions.checkState(open, "Channel closed" + channelNameDescriptor); Preconditions.checkNotNull(log, "log"); FlumeEventQueue queue = log.getFlumeEventQueue(); Preconditions.checkNotNull(queue, "queue"); @@ -297,8 +305,10 @@ public class FileChannel extends BasicCh private final Log log; private final FlumeEventQueue queue; private final Semaphore queueRemaining; + private final String channelNameDescriptor; public FileBackedTransaction(Log log, long transactionID, - int transCapacity, int keepAlive, Semaphore queueRemaining) { + int transCapacity, int keepAlive, Semaphore queueRemaining, + String name) { this.log = log; queue = log.getFlumeEventQueue(); this.transactionID = transactionID; @@ -306,6 +316,7 @@ public class FileChannel extends BasicCh this.queueRemaining = queueRemaining; putList = new LinkedBlockingDeque(transCapacity); takeList = new LinkedBlockingDeque(transCapacity); + channelNameDescriptor = "[channel=" + name + "]"; } private boolean isClosed() { return State.CLOSED.equals(getState()); @@ -319,16 +330,19 @@ public class FileChannel extends BasicCh throw new ChannelException("Put queue for FileBackedTransaction " + "of capacity " + putList.size() + " full, consider " + "committing more frequently, increasing capacity or " + - "increasing thread count"); + "increasing thread count. " + channelNameDescriptor); } if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) { - throw new ChannelException("Cannot acquire capacity"); + throw new ChannelException("Cannot acquire capacity. " + + channelNameDescriptor); } try { FlumeEventPointer ptr = log.put(transactionID, event); - Preconditions.checkState(putList.offer(ptr)); + Preconditions.checkState(putList.offer(ptr), "putList offer failed " + + channelNameDescriptor); } catch (IOException e) { - throw new ChannelException("Put failed due to IO error", e); + throw new ChannelException("Put failed due to IO error " + + channelNameDescriptor, e); } } @@ -337,19 +351,22 @@ public class FileChannel extends BasicCh if(takeList.remainingCapacity() == 0) { throw new ChannelException("Take list for FileBackedTransaction, capacity " + takeList.size() + " full, consider committing more frequently, " + - "increasing capacity, or increasing thread count"); + "increasing capacity, or increasing thread count. " + + channelNameDescriptor); } FlumeEventPointer ptr = queue.removeHead(); if(ptr != null) { try { // first add to takeList so that if write to disk // fails rollback actually does it's work - Preconditions.checkState(takeList.offer(ptr)); + Preconditions.checkState(takeList.offer(ptr), "takeList offer failed " + + channelNameDescriptor); log.take(transactionID, ptr); // write take to disk Event event = log.get(ptr); return event; } catch (IOException e) { - throw new ChannelException("Take failed due to IO error", e); + throw new ChannelException("Take failed due to IO error " + + channelNameDescriptor, e); } } return null; @@ -360,7 +377,8 @@ public class FileChannel extends BasicCh int puts = putList.size(); int takes = takeList.size(); if(puts > 0) { - Preconditions.checkState(takes == 0); + Preconditions.checkState(takes == 0, "nonzero puts and takes " + + channelNameDescriptor); synchronized (queue) { while(!putList.isEmpty()) { if(!queue.addTail(putList.removeFirst())) { @@ -370,6 +388,7 @@ public class FileChannel extends BasicCh msg.append("added to the queue but the remaining portion "); msg.append("cannot be added. Those messages will be consumed "); msg.append("despite this transaction failing. Please report."); + msg.append(channelNameDescriptor); LOG.error(msg.toString()); Preconditions.checkState(false, msg.toString()); } @@ -378,13 +397,15 @@ public class FileChannel extends BasicCh try { log.commitPut(transactionID); } catch (IOException e) { - throw new ChannelException("Commit failed due to IO error", e); + throw new ChannelException("Commit failed due to IO error " + + channelNameDescriptor, e); } } else if(takes > 0) { try { log.commitTake(transactionID); } catch (IOException e) { - throw new ChannelException("Commit failed due to IO error", e); + throw new ChannelException("Commit failed due to IO error " + + channelNameDescriptor, e); } queueRemaining.release(takes); } @@ -397,17 +418,20 @@ public class FileChannel extends BasicCh int puts = putList.size(); int takes = takeList.size(); if(takes > 0) { - Preconditions.checkState(puts == 0); + Preconditions.checkState(puts == 0, "nonzero puts and takes " + + channelNameDescriptor); while(!takeList.isEmpty()) { Preconditions.checkState(queue.addHead(takeList.removeLast()), - "Queue add failed, this shouldn't be able to happen"); + "Queue add failed, this shouldn't be able to happen " + + channelNameDescriptor); } } queueRemaining.release(puts); try { log.rollback(transactionID); } catch (IOException e) { - throw new ChannelException("Commit failed due to IO error", e); + throw new ChannelException("Commit failed due to IO error " + + channelNameDescriptor, e); } putList.clear(); takeList.clear(); Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java Tue Jun 26 09:26:24 2012 @@ -57,9 +57,12 @@ class FlumeEventQueue { private static final int INDEX_TIMESTAMP = 1; private static final int INDEX_SIZE = 2; private static final int INDEX_HEAD = 3; - private static final int INDEX_ACTIVE_LOG = 4; + private static final int INDEX_CHECKPOINT_MARKER = 4; + private static final int CHECKPOINT_COMPLETE = EMPTY; + private static final int CHECKPOINT_INCOMPLETE = 1; + private static final int INDEX_ACTIVE_LOG = 5; private static final int MAX_ACTIVE_LOGS = 1024; - private static final int HEADER_SIZE = 1028; + private static final int HEADER_SIZE = 1029; private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB private final Map fileIDCounts = Maps.newHashMap(); private final MappedByteBuffer mappedBuffer; @@ -68,6 +71,7 @@ class FlumeEventQueue { private final RandomAccessFile checkpointFile; private final java.nio.channels.FileChannel checkpointFileHandle; private final int queueCapacity; + private final String channelNameDescriptor; private int queueSize; private int queueHead; @@ -77,14 +81,15 @@ class FlumeEventQueue { * @param capacity max event capacity of queue * @throws IOException */ - FlumeEventQueue(int capacity, File file) throws IOException { + FlumeEventQueue(int capacity, File file, String name) throws IOException { Preconditions.checkArgument(capacity > 0, "Capacity must be greater than zero"); + this.channelNameDescriptor = "[channel=" + name + "]"; this.queueCapacity = capacity; if (!file.exists()) { Preconditions.checkState(file.createNewFile(), "Unable to create file: " - + file); + + file.getCanonicalPath() + " " + channelNameDescriptor); } boolean freshlyAllocated = false; @@ -115,7 +120,8 @@ class FlumeEventQueue { int expectedCapacity = capacity + HEADER_SIZE; Preconditions.checkState(fileCapacity == expectedCapacity, - "Capacity cannot be reduced once the channel is initialized"); + "Capacity cannot be reduced once the channel is initialized " + + channelNameDescriptor); } checkpointFileHandle = checkpointFile.getChannel(); @@ -129,11 +135,18 @@ class FlumeEventQueue { } else { int version = (int) elementsBuffer.get(INDEX_VERSION); Preconditions.checkState(version == VERSION, - "Invalid version: " + version); + "Invalid version: " + version + channelNameDescriptor); timestamp = elementsBuffer.get(INDEX_TIMESTAMP); queueSize = (int) elementsBuffer.get(INDEX_SIZE); queueHead = (int) elementsBuffer.get(INDEX_HEAD); + long checkpointComplete = + (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER); + Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE, + "The last checkpoint was not completed correctly. Please delete " + + "the checkpoint file: " + file.getCanonicalPath() + " to rebuild " + + "the checkpoint and start again. " + channelNameDescriptor); + int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS; for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) { long nextFileCode = elementsBuffer.get(i); @@ -146,7 +159,7 @@ class FlumeEventQueue { } } - elements = new LongBufferWrapper(elementsBuffer); + elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor); } private Pair deocodeActiveLogCounter(long value) { @@ -173,6 +186,9 @@ class FlumeEventQueue { return false; } + // Start checkpoint + elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE); + updateHeaders(); List fileIdAndCountEncoded = new ArrayList(); @@ -191,6 +207,9 @@ class FlumeEventQueue { } elements.sync(); + + // Finish checkpoint + elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE); mappedBuffer.force(); return true; @@ -207,7 +226,8 @@ class FlumeEventQueue { } long value = remove(0); - Preconditions.checkState(value != EMPTY); + Preconditions.checkState(value != EMPTY, "Empty value " + + channelNameDescriptor); FlumeEventPointer ptr = FlumeEventPointer.fromLong(value); decrementFileID(ptr.getFileID()); @@ -288,7 +308,7 @@ class FlumeEventQueue { AtomicInteger counter = fileIDCounts.get(fileID); if(counter == null) { Preconditions.checkState(fileIDCounts.size() < MAX_ACTIVE_LOGS, - "Too many active logs"); + "Too many active logs " + channelNameDescriptor); counter = new AtomicInteger(0); fileIDCounts.put(fileID, counter); } @@ -297,7 +317,8 @@ class FlumeEventQueue { protected void decrementFileID(int fileID) { AtomicInteger counter = fileIDCounts.get(fileID); - Preconditions.checkState(counter != null); + Preconditions.checkState(counter != null, "null counter " + + channelNameDescriptor); int count = counter.decrementAndGet(); if(count == 0) { fileIDCounts.remove(fileID); @@ -306,7 +327,8 @@ class FlumeEventQueue { protected long get(int index) { if (index < 0 || index > queueSize - 1) { - throw new IndexOutOfBoundsException(String.valueOf(index)); + throw new IndexOutOfBoundsException(String.valueOf(index) + + channelNameDescriptor); } return elements.get(getPhysicalIndex(index)); @@ -314,7 +336,8 @@ class FlumeEventQueue { private void set(int index, long value) { if (index < 0 || index > queueSize - 1) { - throw new IndexOutOfBoundsException(String.valueOf(index)); + throw new IndexOutOfBoundsException(String.valueOf(index) + + channelNameDescriptor); } elements.put(getPhysicalIndex(index), value); @@ -322,7 +345,8 @@ class FlumeEventQueue { protected boolean add(int index, long value) { if (index < 0 || index > queueSize) { - throw new IndexOutOfBoundsException(String.valueOf(index)); + throw new IndexOutOfBoundsException(String.valueOf(index) + + channelNameDescriptor); } if (queueSize == queueCapacity) { @@ -352,7 +376,8 @@ class FlumeEventQueue { protected synchronized long remove(int index) { if (index < 0 || index > queueSize - 1) { - throw new IndexOutOfBoundsException(String.valueOf(index)); + throw new IndexOutOfBoundsException(String.valueOf(index) + + channelNameDescriptor); } long value = get(index); @@ -387,7 +412,7 @@ class FlumeEventQueue { elementsBuffer.put(INDEX_HEAD, queueHead); if (LOG.isDebugEnabled()) { LOG.debug("Updating checkpoint headers: ts: " + timestamp + ", qs: " - + queueSize + ", qh: " + queueHead); + + queueSize + ", qh: " + queueHead + " " + channelNameDescriptor); } } @@ -409,11 +434,13 @@ class FlumeEventQueue { static class LongBufferWrapper { private final LongBuffer buffer; + private final String channelNameDescriptor; Map overwriteMap = new HashMap(); - LongBufferWrapper(LongBuffer lb) { + LongBufferWrapper(LongBuffer lb, String nameDescriptor) { buffer = lb; + channelNameDescriptor = nameDescriptor; } long get(int index) { @@ -446,7 +473,7 @@ class FlumeEventQueue { } Preconditions.checkState(overwriteMap.size() == 0, - "concurrent update detected"); + "concurrent update detected " + channelNameDescriptor); } } } Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java Tue Jun 26 09:26:24 2012 @@ -79,6 +79,8 @@ class Log { private final ReadLock checkpointReadLock = checkpointLock.readLock(); private final WriteLock checkpointWriterLock = checkpointLock.writeLock(); private int logWriteTimeout; + private final String channelName; + private final String channelNameDescriptor; static class Builder { private long bCheckpointInterval; @@ -88,6 +90,7 @@ class Log { private File[] bLogDirs; private int bLogWriteTimeout = FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT; + private String bName; Builder setCheckpointInterval(long interval) { bCheckpointInterval = interval; @@ -119,14 +122,19 @@ class Log { return this; } + Builder setChannelName(String name) { + bName = name; + return this; + } + Log build() throws IOException { return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity, - bLogWriteTimeout, bCheckpointDir, bLogDirs); + bLogWriteTimeout, bCheckpointDir, bName, bLogDirs); } } private Log(long checkpointInterval, long maxFileSize, int queueCapacity, - int logWriteTimeout, File checkpointDir, File... logDirs) + int logWriteTimeout, File checkpointDir, String name, File... logDirs) throws IOException { Preconditions.checkArgument(checkpointInterval > 0, "checkpointInterval <= 0"); @@ -138,6 +146,9 @@ class Log { + checkpointDir + " could not be created"); Preconditions.checkNotNull(logDirs, "logDirs"); Preconditions.checkArgument(logDirs.length > 0, "logDirs empty"); + this.channelName = name; + this.channelNameDescriptor = "[channel=" + name + "]"; + for (File logDir : logDirs) { Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(), "LogDir " + logDir + " could not be created"); @@ -214,7 +225,7 @@ class Log { * locations. We will read the last one written to disk. */ queue = new FlumeEventQueue(queueCapacity, - new File(checkpointDir, "checkpoint")); + new File(checkpointDir, "checkpoint"), channelName); long ts = queue.getTimestamp(); LOGGER.info("Last Checkpoint " + new Date(ts) + @@ -324,7 +335,7 @@ class Log { if (!lockAcquired) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); + + "setting it to 0. " + channelNameDescriptor); } try { @@ -377,7 +388,7 @@ class Log { if (!lockAcquired) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); + + "setting it to 0. " + channelNameDescriptor); } try { @@ -426,7 +437,7 @@ class Log { if (!lockAcquired) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); + + "setting it to 0. "+ channelNameDescriptor); } if(LOGGER.isDebugEnabled()) { @@ -561,7 +572,7 @@ class Log { if (!lockAcquired) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); + + "setting it to 0. " + channelNameDescriptor); } try { @@ -629,7 +640,7 @@ class Log { if (!lockAcquired) { throw new IOException("Failed to obtain lock for writing to the log. " + "Try increasing the log write timeout value or disabling it by " - + "setting it to 0."); + + "setting it to 0. "+ channelNameDescriptor); } try { @@ -762,7 +773,8 @@ class Log { FileLock lock = tryLock(dir); if (lock == null) { String msg = "Cannot lock " + dir - + ". The directory is already locked."; + + ". The directory is already locked. " + + channelNameDescriptor; LOGGER.info(msg); throw new IOException(msg); } Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java Tue Jun 26 09:26:24 2012 @@ -302,7 +302,8 @@ class LogFile { version = fileHandle.readInt(); if(version != VERSION) { throw new IOException("Version is " + Integer.toHexString(version) + - " expected " + Integer.toHexString(VERSION)); + " expected " + Integer.toHexString(VERSION) + + " file: " + file.getCanonicalPath()); } logFileID = fileHandle.readInt(); lastCheckpointPosition = fileHandle.readLong(); Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java Tue Jun 26 09:26:24 2012 @@ -43,12 +43,12 @@ public class TestCheckpoint { @Test public void testSerialization() throws IOException { FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20); - FlumeEventQueue queueIn = new FlumeEventQueue(1, file); + FlumeEventQueue queueIn = new FlumeEventQueue(1, file, "test"); queueIn.addHead(ptrIn); - FlumeEventQueue queueOut = new FlumeEventQueue(1, file); + FlumeEventQueue queueOut = new FlumeEventQueue(1, file, "test"); Assert.assertEquals(0, queueOut.getTimestamp()); queueIn.checkpoint(false); - FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file); + FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, "test"); FlumeEventPointer ptrOut = queueOut2.removeHead(); Assert.assertEquals(ptrIn, ptrOut); Assert.assertTrue(queueOut2.getTimestamp() > 0); Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java Tue Jun 26 09:26:24 2012 @@ -105,7 +105,8 @@ public class TestFileChannel { in.addAll(putEvents(channel, "restart", 1, 1)); } } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity", e.getMessage()); + Assert.assertEquals("Cannot acquire capacity. [channel=null]", + e.getMessage()); } channel.stop(); channel = createFileChannel(); @@ -122,7 +123,8 @@ public class TestFileChannel { in.addAll(putEvents(channel, "restart", 1, 1)); } } catch (ChannelException e) { - Assert.assertEquals("Cannot acquire capacity", e.getMessage()); + Assert.assertEquals("Cannot acquire capacity. [channel=null]", + e.getMessage()); } Configurables.configure(channel, context); List out = takeEvents(channel, 1, Integer.MAX_VALUE); Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java?rev=1353888&r1=1353887&r2=1353888&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java (original) +++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java Tue Jun 26 09:26:24 2012 @@ -40,33 +40,33 @@ public class TestFlumeEventQueue { } @Test public void testQueueIsEmptyAfterCreation() throws IOException { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertNull(queue.removeHead()); } @Test public void testCapacity() throws Exception { - queue = new FlumeEventQueue(1, file); + queue = new FlumeEventQueue(1, file, "test"); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertFalse(queue.addTail(pointer2)); } @Test(expected=IllegalArgumentException.class) public void testInvalidCapacityZero() throws Exception { - queue = new FlumeEventQueue(0, file); + queue = new FlumeEventQueue(0, file, "test"); } @Test(expected=IllegalArgumentException.class) public void testInvalidCapacityNegative() throws Exception { - queue = new FlumeEventQueue(-1, file); + queue = new FlumeEventQueue(-1, file, "test"); } @Test public void addTail1() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(pointer1, queue.removeHead()); Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs()); } @Test public void addTail2() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs()); @@ -75,7 +75,7 @@ public class TestFlumeEventQueue { } @Test public void addTailLarge() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); int size = 500; Set fileIDs = Sets.newHashSet(); for (int i = 1; i <= size; i++) { @@ -92,7 +92,7 @@ public class TestFlumeEventQueue { } @Test public void addHead1() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertEquals(pointer1, queue.removeHead()); @@ -100,7 +100,7 @@ public class TestFlumeEventQueue { } @Test public void addHead2() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs()); @@ -109,7 +109,7 @@ public class TestFlumeEventQueue { } @Test public void addHeadLarge() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); int size = 500; Set fileIDs = Sets.newHashSet(); for (int i = 1; i <= size; i++) { @@ -126,7 +126,7 @@ public class TestFlumeEventQueue { } @Test public void addTailRemove1() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs()); Assert.assertTrue(queue.remove(pointer1)); @@ -137,7 +137,7 @@ public class TestFlumeEventQueue { @Test public void addTailRemove2() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addTail(pointer1)); Assert.assertTrue(queue.addTail(pointer2)); Assert.assertTrue(queue.remove(pointer1)); @@ -146,14 +146,14 @@ public class TestFlumeEventQueue { @Test public void addHeadRemove1() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); queue.addHead(pointer1); Assert.assertTrue(queue.remove(pointer1)); Assert.assertNull(queue.removeHead()); } @Test public void addHeadRemove2() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); Assert.assertTrue(queue.addHead(pointer1)); Assert.assertTrue(queue.addHead(pointer2)); Assert.assertTrue(queue.remove(pointer1)); @@ -161,7 +161,7 @@ public class TestFlumeEventQueue { } @Test public void testWrappingCorrectly() throws Exception { - queue = new FlumeEventQueue(1000, file); + queue = new FlumeEventQueue(1000, file, "test"); int size = Integer.MAX_VALUE; for (int i = 1; i <= size; i++) { if(!queue.addHead(new FlumeEventPointer(i, i))) {