flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject svn commit: r1353888 - in /incubator/flume/trunk/flume-ng-channels/flume-file-channel/src: main/java/org/apache/flume/channel/file/ test/java/org/apache/flume/channel/file/
Date Tue, 26 Jun 2012 09:26:29 GMT
Author: mpercy
Date: Tue Jun 26 09:26:24 2012
New Revision: 1353888

URL: http://svn.apache.org/viewvc?rev=1353888&view=rev
Log:
FLUME-1320. Add safeguard for checkpoint corruption detection.

(Arvind Prabhakar via Mike Percy)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
Tue Jun 26 09:26:24 2012
@@ -84,6 +84,13 @@ public class FileChannel extends BasicCh
   private final ThreadLocal<FileBackedTransaction> transactions =
       new ThreadLocal<FileBackedTransaction>();
   private int logWriteTimeout;
+  private String channelNameDescriptor = "[channel=unknown]";
+
+  @Override
+  public synchronized void setName(String name) {
+    channelNameDescriptor = "[channel=" + name + "]";
+    super.setName(name);
+  }
 
   /**
    * Transaction IDs should unique within a file channel
@@ -204,8 +211,9 @@ public class FileChannel extends BasicCh
 
       int depth = getDepth();
       Preconditions.checkState(queueRemaining.tryAcquire(depth),
-          "Unable to acquire " + depth + " permits");
-      LOG.info("Queue Size after replay: " + depth);
+          "Unable to acquire " + depth + " permits " + channelNameDescriptor);
+      LOG.info("Queue Size after replay: " + depth
+           + channelNameDescriptor);
       // shutdown hook flushes all data to disk and closes
       // file descriptors along with setting all closed flags
       if(!shutdownHookAdded) {
@@ -252,21 +260,21 @@ public class FileChannel extends BasicCh
 
   @Override
   protected BasicTransactionSemantics createTransaction() {
-    Preconditions.checkState(open, "Channel closed");
+    Preconditions.checkState(open, "Channel closed"  + channelNameDescriptor);
     FileBackedTransaction trans = transactions.get();
     if(trans != null && !trans.isClosed()) {
       Preconditions.checkState(false,
           "Thread has transaction which is still open: " +
-              trans.getStateAsString());
+              trans.getStateAsString()  + channelNameDescriptor);
     }
     trans = new FileBackedTransaction(log, TRANSACTION_ID.incrementAndGet(),
-        transactionCapacity, keepAlive, queueRemaining);
+        transactionCapacity, keepAlive, queueRemaining, getName());
     transactions.set(trans);
     return trans;
   }
 
   int getDepth() {
-    Preconditions.checkState(open, "Channel closed");
+    Preconditions.checkState(open, "Channel closed"  + channelNameDescriptor);
     Preconditions.checkNotNull(log, "log");
     FlumeEventQueue queue = log.getFlumeEventQueue();
     Preconditions.checkNotNull(queue, "queue");
@@ -297,8 +305,10 @@ public class FileChannel extends BasicCh
     private final Log log;
     private final FlumeEventQueue queue;
     private final Semaphore queueRemaining;
+    private final String channelNameDescriptor;
     public FileBackedTransaction(Log log, long transactionID,
-        int transCapacity, int keepAlive, Semaphore queueRemaining) {
+        int transCapacity, int keepAlive, Semaphore queueRemaining,
+        String name) {
       this.log = log;
       queue = log.getFlumeEventQueue();
       this.transactionID = transactionID;
@@ -306,6 +316,7 @@ public class FileChannel extends BasicCh
       this.queueRemaining = queueRemaining;
       putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
       takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
+      channelNameDescriptor = "[channel=" + name + "]";
     }
     private boolean isClosed() {
       return State.CLOSED.equals(getState());
@@ -319,16 +330,19 @@ public class FileChannel extends BasicCh
         throw new ChannelException("Put queue for FileBackedTransaction " +
             "of capacity " + putList.size() + " full, consider " +
             "committing more frequently, increasing capacity or " +
-            "increasing thread count");
+            "increasing thread count. " + channelNameDescriptor);
       }
       if(!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
-        throw new ChannelException("Cannot acquire capacity");
+        throw new ChannelException("Cannot acquire capacity. "
+            + channelNameDescriptor);
       }
       try {
         FlumeEventPointer ptr = log.put(transactionID, event);
-        Preconditions.checkState(putList.offer(ptr));
+        Preconditions.checkState(putList.offer(ptr), "putList offer failed "
+             + channelNameDescriptor);
       } catch (IOException e) {
-        throw new ChannelException("Put failed due to IO error", e);
+        throw new ChannelException("Put failed due to IO error "
+                + channelNameDescriptor, e);
       }
     }
 
@@ -337,19 +351,22 @@ public class FileChannel extends BasicCh
       if(takeList.remainingCapacity() == 0) {
         throw new ChannelException("Take list for FileBackedTransaction, capacity " +
             takeList.size() + " full, consider committing more frequently, " +
-            "increasing capacity, or increasing thread count");
+            "increasing capacity, or increasing thread count. "
+               + channelNameDescriptor);
       }
       FlumeEventPointer ptr = queue.removeHead();
       if(ptr != null) {
         try {
           // first add to takeList so that if write to disk
           // fails rollback actually does it's work
-          Preconditions.checkState(takeList.offer(ptr));
+          Preconditions.checkState(takeList.offer(ptr), "takeList offer failed "
+               + channelNameDescriptor);
           log.take(transactionID, ptr); // write take to disk
           Event event = log.get(ptr);
           return event;
         } catch (IOException e) {
-          throw new ChannelException("Take failed due to IO error", e);
+          throw new ChannelException("Take failed due to IO error "
+                  + channelNameDescriptor, e);
         }
       }
       return null;
@@ -360,7 +377,8 @@ public class FileChannel extends BasicCh
       int puts = putList.size();
       int takes = takeList.size();
       if(puts > 0) {
-        Preconditions.checkState(takes == 0);
+        Preconditions.checkState(takes == 0, "nonzero puts and takes "
+                + channelNameDescriptor);
         synchronized (queue) {
           while(!putList.isEmpty()) {
             if(!queue.addTail(putList.removeFirst())) {
@@ -370,6 +388,7 @@ public class FileChannel extends BasicCh
               msg.append("added to the queue but the remaining portion ");
               msg.append("cannot be added. Those messages will be consumed ");
               msg.append("despite this transaction failing. Please report.");
+              msg.append(channelNameDescriptor);
               LOG.error(msg.toString());
               Preconditions.checkState(false, msg.toString());
             }
@@ -378,13 +397,15 @@ public class FileChannel extends BasicCh
         try {
           log.commitPut(transactionID);
         } catch (IOException e) {
-          throw new ChannelException("Commit failed due to IO error", e);
+          throw new ChannelException("Commit failed due to IO error "
+              + channelNameDescriptor, e);
         }
       } else if(takes > 0) {
         try {
           log.commitTake(transactionID);
         } catch (IOException e) {
-          throw new ChannelException("Commit failed due to IO error", e);
+          throw new ChannelException("Commit failed due to IO error "
+               + channelNameDescriptor, e);
         }
         queueRemaining.release(takes);
       }
@@ -397,17 +418,20 @@ public class FileChannel extends BasicCh
       int puts = putList.size();
       int takes = takeList.size();
       if(takes > 0) {
-        Preconditions.checkState(puts == 0);
+        Preconditions.checkState(puts == 0, "nonzero puts and takes "
+            + channelNameDescriptor);
         while(!takeList.isEmpty()) {
           Preconditions.checkState(queue.addHead(takeList.removeLast()),
-              "Queue add failed, this shouldn't be able to happen");
+              "Queue add failed, this shouldn't be able to happen "
+                   + channelNameDescriptor);
         }
       }
       queueRemaining.release(puts);
       try {
         log.rollback(transactionID);
       } catch (IOException e) {
-        throw new ChannelException("Commit failed due to IO error", e);
+        throw new ChannelException("Commit failed due to IO error "
+             + channelNameDescriptor, e);
       }
       putList.clear();
       takeList.clear();

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
Tue Jun 26 09:26:24 2012
@@ -57,9 +57,12 @@ class FlumeEventQueue {
   private static final int INDEX_TIMESTAMP = 1;
   private static final int INDEX_SIZE = 2;
   private static final int INDEX_HEAD = 3;
-  private static final int INDEX_ACTIVE_LOG = 4;
+  private static final int INDEX_CHECKPOINT_MARKER = 4;
+  private static final int CHECKPOINT_COMPLETE = EMPTY;
+  private static final int CHECKPOINT_INCOMPLETE = 1;
+  private static final int INDEX_ACTIVE_LOG = 5;
   private static final int MAX_ACTIVE_LOGS = 1024;
-  private static final int HEADER_SIZE = 1028;
+  private static final int HEADER_SIZE = 1029;
   private static final int MAX_ALLOC_BUFFER_SIZE = 2*1024*1024; // 2MB
   private final Map<Integer, AtomicInteger> fileIDCounts = Maps.newHashMap();
   private final MappedByteBuffer mappedBuffer;
@@ -68,6 +71,7 @@ class FlumeEventQueue {
   private final RandomAccessFile checkpointFile;
   private final java.nio.channels.FileChannel checkpointFileHandle;
   private final int queueCapacity;
+  private final String channelNameDescriptor;
 
   private int queueSize;
   private int queueHead;
@@ -77,14 +81,15 @@ class FlumeEventQueue {
    * @param capacity max event capacity of queue
    * @throws IOException
    */
-  FlumeEventQueue(int capacity, File file) throws IOException {
+  FlumeEventQueue(int capacity, File file, String name) throws IOException {
     Preconditions.checkArgument(capacity > 0,
         "Capacity must be greater than zero");
+    this.channelNameDescriptor = "[channel=" + name + "]";
     this.queueCapacity = capacity;
 
     if (!file.exists()) {
       Preconditions.checkState(file.createNewFile(), "Unable to create file: "
-          + file);
+          + file.getCanonicalPath() + " " + channelNameDescriptor);
     }
 
     boolean freshlyAllocated = false;
@@ -115,7 +120,8 @@ class FlumeEventQueue {
       int expectedCapacity = capacity + HEADER_SIZE;
 
       Preconditions.checkState(fileCapacity == expectedCapacity,
-          "Capacity cannot be reduced once the channel is initialized");
+          "Capacity cannot be reduced once the channel is initialized "
+              + channelNameDescriptor);
     }
 
     checkpointFileHandle = checkpointFile.getChannel();
@@ -129,11 +135,18 @@ class FlumeEventQueue {
     } else {
       int version = (int) elementsBuffer.get(INDEX_VERSION);
       Preconditions.checkState(version == VERSION,
-          "Invalid version: " + version);
+          "Invalid version: " + version + channelNameDescriptor);
       timestamp = elementsBuffer.get(INDEX_TIMESTAMP);
       queueSize = (int) elementsBuffer.get(INDEX_SIZE);
       queueHead = (int) elementsBuffer.get(INDEX_HEAD);
 
+      long checkpointComplete =
+          (int) elementsBuffer.get(INDEX_CHECKPOINT_MARKER);
+      Preconditions.checkState(checkpointComplete == CHECKPOINT_COMPLETE,
+          "The last checkpoint was not completed correctly. Please delete "
+          + "the checkpoint file: " + file.getCanonicalPath() + " to rebuild "
+          + "the checkpoint and start again. " + channelNameDescriptor);
+
       int indexMaxLog = INDEX_ACTIVE_LOG + MAX_ACTIVE_LOGS;
       for (int i = INDEX_ACTIVE_LOG; i < indexMaxLog; i++) {
         long nextFileCode = elementsBuffer.get(i);
@@ -146,7 +159,7 @@ class FlumeEventQueue {
       }
     }
 
-    elements = new LongBufferWrapper(elementsBuffer);
+    elements = new LongBufferWrapper(elementsBuffer, channelNameDescriptor);
   }
 
   private Pair<Integer, Integer> deocodeActiveLogCounter(long value) {
@@ -173,6 +186,9 @@ class FlumeEventQueue {
       return false;
     }
 
+    // Start checkpoint
+    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
+
     updateHeaders();
 
     List<Long> fileIdAndCountEncoded = new ArrayList<Long>();
@@ -191,6 +207,9 @@ class FlumeEventQueue {
     }
 
     elements.sync();
+
+    // Finish checkpoint
+    elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
     mappedBuffer.force();
 
     return true;
@@ -207,7 +226,8 @@ class FlumeEventQueue {
     }
 
     long value = remove(0);
-    Preconditions.checkState(value != EMPTY);
+    Preconditions.checkState(value != EMPTY, "Empty value "
+          + channelNameDescriptor);
 
     FlumeEventPointer ptr = FlumeEventPointer.fromLong(value);
     decrementFileID(ptr.getFileID());
@@ -288,7 +308,7 @@ class FlumeEventQueue {
     AtomicInteger counter = fileIDCounts.get(fileID);
     if(counter == null) {
       Preconditions.checkState(fileIDCounts.size() < MAX_ACTIVE_LOGS,
-          "Too many active logs");
+          "Too many active logs " + channelNameDescriptor);
       counter = new AtomicInteger(0);
       fileIDCounts.put(fileID, counter);
     }
@@ -297,7 +317,8 @@ class FlumeEventQueue {
 
   protected void decrementFileID(int fileID) {
     AtomicInteger counter = fileIDCounts.get(fileID);
-    Preconditions.checkState(counter != null);
+    Preconditions.checkState(counter != null, "null counter "
+        + channelNameDescriptor);
     int count = counter.decrementAndGet();
     if(count == 0) {
       fileIDCounts.remove(fileID);
@@ -306,7 +327,8 @@ class FlumeEventQueue {
 
   protected long get(int index) {
     if (index < 0 || index > queueSize - 1) {
-      throw new IndexOutOfBoundsException(String.valueOf(index));
+      throw new IndexOutOfBoundsException(String.valueOf(index)
+          + channelNameDescriptor);
     }
 
     return elements.get(getPhysicalIndex(index));
@@ -314,7 +336,8 @@ class FlumeEventQueue {
 
   private void set(int index, long value) {
     if (index < 0 || index > queueSize - 1) {
-      throw new IndexOutOfBoundsException(String.valueOf(index));
+      throw new IndexOutOfBoundsException(String.valueOf(index)
+          + channelNameDescriptor);
     }
 
     elements.put(getPhysicalIndex(index), value);
@@ -322,7 +345,8 @@ class FlumeEventQueue {
 
   protected boolean add(int index, long value) {
     if (index < 0 || index > queueSize) {
-      throw new IndexOutOfBoundsException(String.valueOf(index));
+      throw new IndexOutOfBoundsException(String.valueOf(index)
+          + channelNameDescriptor);
     }
 
     if (queueSize == queueCapacity) {
@@ -352,7 +376,8 @@ class FlumeEventQueue {
 
   protected synchronized long remove(int index) {
     if (index < 0 || index > queueSize - 1) {
-      throw new IndexOutOfBoundsException(String.valueOf(index));
+      throw new IndexOutOfBoundsException(String.valueOf(index)
+          + channelNameDescriptor);
     }
     long value = get(index);
 
@@ -387,7 +412,7 @@ class FlumeEventQueue {
     elementsBuffer.put(INDEX_HEAD, queueHead);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Updating checkpoint headers: ts: " + timestamp + ", qs: "
-          + queueSize + ", qh: " + queueHead);
+          + queueSize + ", qh: " + queueHead + " " + channelNameDescriptor);
     }
   }
 
@@ -409,11 +434,13 @@ class FlumeEventQueue {
 
   static class LongBufferWrapper {
     private final LongBuffer buffer;
+    private final String channelNameDescriptor;
 
     Map<Integer, Long> overwriteMap = new HashMap<Integer, Long>();
 
-    LongBufferWrapper(LongBuffer lb) {
+    LongBufferWrapper(LongBuffer lb, String nameDescriptor) {
       buffer = lb;
+      channelNameDescriptor = nameDescriptor;
     }
 
     long get(int index) {
@@ -446,7 +473,7 @@ class FlumeEventQueue {
       }
 
       Preconditions.checkState(overwriteMap.size() == 0,
-          "concurrent update detected");
+          "concurrent update detected " + channelNameDescriptor);
     }
   }
 }

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
Tue Jun 26 09:26:24 2012
@@ -79,6 +79,8 @@ class Log {
   private final ReadLock checkpointReadLock = checkpointLock.readLock();
   private final WriteLock checkpointWriterLock = checkpointLock.writeLock();
   private int logWriteTimeout;
+  private final String channelName;
+  private final String channelNameDescriptor;
 
   static class Builder {
     private long bCheckpointInterval;
@@ -88,6 +90,7 @@ class Log {
     private File[] bLogDirs;
     private int bLogWriteTimeout =
         FileChannelConfiguration.DEFAULT_WRITE_TIMEOUT;
+    private String bName;
 
     Builder setCheckpointInterval(long interval) {
       bCheckpointInterval = interval;
@@ -119,14 +122,19 @@ class Log {
       return this;
     }
 
+    Builder setChannelName(String name) {
+      bName = name;
+      return this;
+    }
+
     Log build() throws IOException {
       return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
-          bLogWriteTimeout, bCheckpointDir, bLogDirs);
+          bLogWriteTimeout, bCheckpointDir, bName, bLogDirs);
     }
   }
 
   private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
-      int logWriteTimeout, File checkpointDir, File... logDirs)
+      int logWriteTimeout, File checkpointDir, String name, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
@@ -138,6 +146,9 @@ class Log {
             + checkpointDir + " could not be created");
     Preconditions.checkNotNull(logDirs, "logDirs");
     Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
+    this.channelName = name;
+    this.channelNameDescriptor = "[channel=" + name + "]";
+
     for (File logDir : logDirs) {
       Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
           "LogDir " + logDir + " could not be created");
@@ -214,7 +225,7 @@ class Log {
        * locations. We will read the last one written to disk.
        */
       queue = new FlumeEventQueue(queueCapacity,
-                        new File(checkpointDir, "checkpoint"));
+                        new File(checkpointDir, "checkpoint"), channelName);
 
       long ts = queue.getTimestamp();
       LOGGER.info("Last Checkpoint " + new Date(ts) +
@@ -324,7 +335,7 @@ class Log {
     if (!lockAcquired) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
+          + "setting it to 0. " + channelNameDescriptor);
     }
 
     try {
@@ -377,7 +388,7 @@ class Log {
     if (!lockAcquired) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
+          + "setting it to 0. " + channelNameDescriptor);
     }
 
     try {
@@ -426,7 +437,7 @@ class Log {
     if (!lockAcquired) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
+          + "setting it to 0. "+ channelNameDescriptor);
     }
 
     if(LOGGER.isDebugEnabled()) {
@@ -561,7 +572,7 @@ class Log {
     if (!lockAcquired) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
+          + "setting it to 0. " + channelNameDescriptor);
     }
 
     try {
@@ -629,7 +640,7 @@ class Log {
     if (!lockAcquired) {
       throw new IOException("Failed to obtain lock for writing to the log. "
           + "Try increasing the log write timeout value or disabling it by "
-          + "setting it to 0.");
+          + "setting it to 0. "+ channelNameDescriptor);
     }
 
     try {
@@ -762,7 +773,8 @@ class Log {
     FileLock lock = tryLock(dir);
     if (lock == null) {
       String msg = "Cannot lock " + dir
-          + ". The directory is already locked.";
+          + ". The directory is already locked. "
+          + channelNameDescriptor;
       LOGGER.info(msg);
       throw new IOException(msg);
     }

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
Tue Jun 26 09:26:24 2012
@@ -302,7 +302,8 @@ class LogFile {
       version = fileHandle.readInt();
       if(version != VERSION) {
         throw new IOException("Version is " + Integer.toHexString(version) +
-            " expected " + Integer.toHexString(VERSION));
+            " expected " + Integer.toHexString(VERSION)
+            + " file: " + file.getCanonicalPath());
       }
       logFileID = fileHandle.readInt();
       lastCheckpointPosition = fileHandle.readLong();

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
Tue Jun 26 09:26:24 2012
@@ -43,12 +43,12 @@ public class TestCheckpoint {
   @Test
   public void testSerialization() throws IOException {
     FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
-    FlumeEventQueue queueIn = new FlumeEventQueue(1, file);
+    FlumeEventQueue queueIn = new FlumeEventQueue(1, file, "test");
     queueIn.addHead(ptrIn);
-    FlumeEventQueue queueOut = new FlumeEventQueue(1, file);
+    FlumeEventQueue queueOut = new FlumeEventQueue(1, file, "test");
     Assert.assertEquals(0, queueOut.getTimestamp());
     queueIn.checkpoint(false);
-    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file);
+    FlumeEventQueue queueOut2 = new FlumeEventQueue(1, file, "test");
     FlumeEventPointer ptrOut = queueOut2.removeHead();
     Assert.assertEquals(ptrIn, ptrOut);
     Assert.assertTrue(queueOut2.getTimestamp() > 0);

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
Tue Jun 26 09:26:24 2012
@@ -105,7 +105,8 @@ public class TestFileChannel {
         in.addAll(putEvents(channel, "restart", 1, 1));
       }
     } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+      Assert.assertEquals("Cannot acquire capacity. [channel=null]",
+          e.getMessage());
     }
     channel.stop();
     channel = createFileChannel();
@@ -122,7 +123,8 @@ public class TestFileChannel {
         in.addAll(putEvents(channel, "restart", 1, 1));
       }
     } catch (ChannelException e) {
-      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+      Assert.assertEquals("Cannot acquire capacity. [channel=null]",
+          e.getMessage());
     }
     Configurables.configure(channel, context);
     List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);

Modified: incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java?rev=1353888&r1=1353887&r2=1353888&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
(original)
+++ incubator/flume/trunk/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
Tue Jun 26 09:26:24 2012
@@ -40,33 +40,33 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void testQueueIsEmptyAfterCreation() throws IOException {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertNull(queue.removeHead());
   }
   @Test
   public void testCapacity() throws Exception {
-    queue = new FlumeEventQueue(1, file);
+    queue = new FlumeEventQueue(1, file, "test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityZero() throws Exception {
-    queue = new FlumeEventQueue(0, file);
+    queue = new FlumeEventQueue(0, file, "test");
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
-    queue = new FlumeEventQueue(-1, file);
+    queue = new FlumeEventQueue(-1, file, "test");
   }
   @Test
   public void addTail1() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead());
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addTail2() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -75,7 +75,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addTailLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -92,7 +92,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHead1() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead());
@@ -100,7 +100,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHead2() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -109,7 +109,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHeadLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -126,7 +126,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addTailRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
@@ -137,7 +137,7 @@ public class TestFlumeEventQueue {
 
   @Test
   public void addTailRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -146,14 +146,14 @@ public class TestFlumeEventQueue {
 
   @Test
   public void addHeadRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead());
   }
   @Test
   public void addHeadRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -161,7 +161,7 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void testWrappingCorrectly() throws Exception {
-    queue = new FlumeEventQueue(1000, file);
+    queue = new FlumeEventQueue(1000, file, "test");
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {



Mime
View raw message