flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1332885 - in /incubator/flume/trunk: flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/ flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/re...
Date Wed, 02 May 2012 00:06:20 GMT
Author: arvind
Date: Wed May  2 00:06:19 2012
New Revision: 1332885

URL: http://svn.apache.org/viewvc?rev=1332885&view=rev
Log:
FLUME-1121. Recoverable Memory Channel problem during recovery.

(Brock Noland via Arvind Prabhakar)

Modified:
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
    incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
    incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
    incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.java Wed May  2 00:06:19 2012
@@ -22,6 +22,8 @@ package org.apache.flume.channel.recover
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.flume.Channel;
@@ -59,61 +61,125 @@ public class RecoverableMemoryChannel ex
   public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
   public static final String WAL_MIN_RENTENTION_PERIOD = "wal.minRententionPeriod";
   public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
+  public static final String CAPACITY = "capacity";
+  public static final String KEEPALIVE = "keep-alive";
+
+  public static final int DEFAULT_CAPACITY = 100;
+  public static final int DEFAULT_KEEPALIVE = 3;
 
   private MemoryChannel memoryChannel = new MemoryChannel();
   private AtomicLong seqidGenerator = new AtomicLong(0);
   private WAL<RecoverableMemoryChannelEvent> wal;
+  /**
+   * MemoryChannel checks to ensure the capacity is available
+   * on commit. That is a problem because we need to write to
+   * disk before we commit the data to MemoryChannel. As such
+   * we keep track of capacity ourselves.
+   */
+  private Semaphore queueRemaining;
+  private int capacity;
+  private int keepAlive;
+  private volatile boolean open;
+
+  public RecoverableMemoryChannel() {
+    open = false;
+  }
 
   @Override
   public void configure(Context context) {
     memoryChannel.configure(context);
-
-    String homePath = System.getProperty("user.home").replace('\\', '/');
-    String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
-    if(wal != null) {
-      try {
-        wal.close();
-      } catch (IOException e) {
-        LOG.error("Error closing existing wal during reconfigure", e);
-      }
+    int capacity = context.getInteger(CAPACITY, DEFAULT_CAPACITY);
+    if(queueRemaining == null) {
+      queueRemaining = new Semaphore(capacity, true);
+    } else if(capacity > this.capacity) {
+      // capacity increase
+      queueRemaining.release(capacity - this.capacity);
+    } else if(capacity < this.capacity) {
+      queueRemaining.acquireUninterruptibly(this.capacity - capacity);
     }
+    this.capacity = capacity;
+    keepAlive = context.getInteger(KEEPALIVE, DEFAULT_KEEPALIVE);
     long rollSize = context.getLong(WAL_ROLL_SIZE, WAL.DEFAULT_ROLL_SIZE);
     long maxLogsSize = context.getLong(WAL_MAX_LOGS_SIZE, WAL.DEFAULT_MAX_LOGS_SIZE);
-    long minRententionPeriod = context.getLong(WAL_MIN_RENTENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RENTENTION_PERIOD);
+    long minLogRetentionPeriod = context.getLong(WAL_MIN_RENTENTION_PERIOD, WAL.DEFAULT_MIN_LOG_RENTENTION_PERIOD);
     long workerInterval = context.getLong(WAL_WORKER_INTERVAL, WAL.DEFAULT_WORKER_INTERVAL);
-    try {
-      wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
-          RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
-          minRententionPeriod, workerInterval);
-    } catch (IOException e) {
-      Throwables.propagate(e);
+    if(wal == null) {
+      String homePath = System.getProperty("user.home").replace('\\', '/');
+      String dataDir = context.getString(WAL_DATA_DIR, homePath + "/.flume/recoverable-memory-channel");
+      try {
+        wal = new WAL<RecoverableMemoryChannelEvent>(new File(dataDir),
+            RecoverableMemoryChannelEvent.class, rollSize, maxLogsSize,
+            minLogRetentionPeriod, workerInterval);
+      } catch (IOException e) {
+        Throwables.propagate(e);
+      }
+    } else {
+      wal.setRollSize(rollSize);
+      wal.setMaxLogsSize(maxLogsSize);
+      wal.setMinLogRetentionPeriod(minLogRetentionPeriod);
+      wal.setWorkerInterval(workerInterval);
+      LOG.warn(this.getClass().getSimpleName() + " only supports " +
+          "partial reconfiguration.");
     }
   }
 
   @Override
   public synchronized void start() {
+    LOG.info("Starting " + this);
     try {
       WALReplayResult<RecoverableMemoryChannelEvent> results = wal.replay();
       Preconditions.checkArgument(results.getSequenceID() >= 0);
       LOG.info("Replay SequenceID " + results.getSequenceID());
       seqidGenerator.set(results.getSequenceID());
-      Transaction transaction = memoryChannel.getTransaction();
-      transaction.begin();
-      LOG.info("Replay Events " + results.getResults().size());
+      int numResults = results.getResults().size();
+      Preconditions.checkState(numResults <= capacity, "Capacity " + capacity +
+          ", but we need to replay " + numResults);
+      LOG.info("Replay Events " + numResults);
       for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
-        memoryChannel.put(entry.getData());
         seqidGenerator.set(Math.max(entry.getSequenceID(),seqidGenerator.get()));
       }
-      transaction.commit();
-      transaction.close();
+      for(WALEntry<RecoverableMemoryChannelEvent> entry : results.getResults()) {
+        Transaction transaction = null;
+        try {
+          transaction = memoryChannel.getTransaction();
+          transaction.begin();
+          memoryChannel.put(entry.getData());
+          transaction.commit();
+        } catch(Exception e) {
+          if(transaction != null) {
+            try {
+              transaction.rollback();
+            } catch(Exception ex) {
+              LOG.info("Error during rollback", ex);
+            }
+          }
+          Throwables.propagate(e);
+        } catch(Error e) {
+          if(transaction != null) {
+            try {
+              transaction.rollback();
+            } catch(Exception ex) {
+              LOG.info("Error during rollback", ex);
+            }
+          }
+          throw e;
+        } finally {
+          if(transaction != null) {
+            transaction.close();
+          }
+        }
+      }
     } catch (IOException e) {
       Throwables.propagate(e);
     }
     super.start();
+    open = true;
   }
 
   @Override
   public synchronized void stop() {
+    open = false;
+    LOG.info("Stopping " + this);
     try {
       close();
     } catch (IOException e) {
@@ -124,7 +190,7 @@ public class RecoverableMemoryChannel ex
 
   @Override
   protected BasicTransactionSemantics createTransaction() {
-    return new FileBackedTransaction(this, memoryChannel);
+    return new RecoverableMemoryTransaction(this, memoryChannel);
   }
 
   private void commitEvents(List<RecoverableMemoryChannelEvent> events)
@@ -155,17 +221,21 @@ public class RecoverableMemoryChannel ex
    * An implementation of {@link Transaction} for {@link RecoverableMemoryChannel}s.
    * </p>
    */
-  private static class FileBackedTransaction extends BasicTransactionSemantics {
+  private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
 
     private Transaction transaction;
     private MemoryChannel memoryChannel;
-    private RecoverableMemoryChannel fileChannel;
+    private RecoverableMemoryChannel channel;
     private List<Long> sequenceIds = Lists.newArrayList();
     private List<RecoverableMemoryChannelEvent> events = Lists.newArrayList();
-    private FileBackedTransaction(RecoverableMemoryChannel fileChannel, MemoryChannel memoryChannel) {
-      this.fileChannel = fileChannel;
+    private int takes;
+
+    private RecoverableMemoryTransaction(RecoverableMemoryChannel channel,
+        MemoryChannel memoryChannel) {
+      this.channel = channel;
       this.memoryChannel = memoryChannel;
       this.transaction = this.memoryChannel.getTransaction();
+      this.takes = 0;
     }
     @Override
     protected void doBegin() throws InterruptedException {
@@ -173,16 +243,27 @@ public class RecoverableMemoryChannel ex
     }
     @Override
     protected void doPut(Event event) throws InterruptedException {
-      RecoverableMemoryChannelEvent sequencedEvent = new RecoverableMemoryChannelEvent(event, fileChannel.nextSequenceID());
+      if(!channel.open) {
+        throw new ChannelException("Channel not open");
+      }
+      if(!channel.queueRemaining.tryAcquire(channel.keepAlive, TimeUnit.SECONDS)) {
+        throw new ChannelException("Cannot acquire capacity");
+      }
+      RecoverableMemoryChannelEvent sequencedEvent =
+          new RecoverableMemoryChannelEvent(event, channel.nextSequenceID());
       memoryChannel.put(sequencedEvent);
       events.add(sequencedEvent);
     }
 
     @Override
     protected Event doTake() throws InterruptedException {
+      if(!channel.open) {
+        throw new ChannelException("Channel not open");
+      }
       RecoverableMemoryChannelEvent event = (RecoverableMemoryChannelEvent)memoryChannel.take();
       if(event != null) {
         sequenceIds.add(event.sequenceId);
+        takes++;
         return event.event;
       }
       return null;
@@ -190,27 +271,32 @@ public class RecoverableMemoryChannel ex
 
     @Override
     protected void doCommit() throws InterruptedException {
+      if(!channel.open) {
+        throw new ChannelException("Channel not open");
+      }
       if(sequenceIds.size() > 0) {
         try {
-          fileChannel.commitSequenceID(sequenceIds);
+          channel.commitSequenceID(sequenceIds);
         } catch (IOException e) {
           throw new ChannelException("Unable to commit", e);
         }
       }
       if(!events.isEmpty()) {
         try {
-          fileChannel.commitEvents(events);
+          channel.commitEvents(events);
         } catch (IOException e) {
           throw new ChannelException("Unable to commit", e);
         }
       }
       transaction.commit();
+      channel.queueRemaining.release(takes);
     }
 
     @Override
     protected void doRollback() throws InterruptedException {
       sequenceIds.clear();
       events.clear();
+      channel.queueRemaining.release(events.size());
       transaction.rollback();
     }
 

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/SequenceIDBuffer.java Wed May  2 00:06:19 2012
@@ -159,7 +159,8 @@ public class SequenceIDBuffer implements
       }
 
     }
-    return 0;
+    // default from VM source code
+    return Runtime.getRuntime().maxMemory();
   }
 
 

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WAL.java Wed May  2 00:06:19 2012
@@ -45,29 +45,29 @@ import com.google.common.io.Files;
  * Provides Write Ahead Log functionality for a generic Writable. All entries
  * stored in the WAL must be assigned a unique increasing sequence id. WAL
  * files will be removed when the following condition holds (defaults):
- * 
+ *
  * At least 512MB of WAL's exist, the file in question is greater than
  * five minutes old and the largest committed sequence id is greater
  * than the largest sequence id in the file.
- * 
+ *
  * <pre>
  *  WAL wal = new WAL(path, Writable.class);
  *  wal.writeEvent(event, 1);
  *  wal.writeEvent(event, 2);
  *  wal.writeSequenceID(1);
  *  wal.writeEvent(event, 3);
- * 
+ *
  *  System crashes or shuts down...
- * 
+ *
  *  WAL wal = new WAL(path, Writable.class);
  *  [Event 2, Event 3]  = wal.replay();
  * </pre>
- * 
+ *
  * WAL files will be created in the specified data directory. They will be
  * rolled at 64MB and deleted five minutes after they are no longer needed.
  * that is the current sequence id) is greater than the greatest sequence id
  *  in the file.
- * 
+ *
  * The only synchronization this class does is around rolling log files. When
  * a roll of the log file is required, the thread which discovers this
  * will execute the roll. Any threads calling a write*() method during
@@ -87,10 +87,10 @@ public class WAL<T extends Writable> imp
   private AtomicLong largestCommitedSequenceID = new AtomicLong(0);
   private volatile boolean rollRequired;
   private volatile boolean rollInProgress;
-  private long rollSize;
-  private long maxLogsSize;
-  private long minLogRentionPeriod;
-  private long workerInterval;
+  private volatile long rollSize;
+  private volatile long maxLogsSize;
+  private volatile long minLogRetentionPeriod;
+  private volatile long workerInterval;
   private int numReplaySequenceIDOverride;
   private Worker backgroundWorker;
 
@@ -120,7 +120,7 @@ public class WAL<T extends Writable> imp
   /**
    * Creates a wal object with no defaults, using the specified parameters in
    * the constructor for operation.
-   * 
+   *
    * @param path
    * @param clazz
    * @param rollSize
@@ -139,7 +139,7 @@ public class WAL<T extends Writable> imp
     this.path = path;
     this.rollSize = rollSize;
     this.maxLogsSize = maxLogsSize;
-    this.minLogRentionPeriod = minLogRentionPeriod;
+    this.minLogRetentionPeriod = minLogRentionPeriod;
     this.workerInterval = workerInterval;
 
     StringBuffer buffer = new StringBuffer();
@@ -168,9 +168,7 @@ public class WAL<T extends Writable> imp
     createOrDie(sequenceIDPath);
     this.clazz = clazz;
 
-    backgroundWorker = new Worker(this.workerInterval, this.maxLogsSize,
-        this.minLogRentionPeriod, largestCommitedSequenceID,
-        fileLargestSequenceIDMap);
+    backgroundWorker = new Worker(this);
     backgroundWorker.setName("WAL-Worker-" + path.getAbsolutePath());
     backgroundWorker.setDaemon(true);
     backgroundWorker.start();
@@ -225,7 +223,6 @@ public class WAL<T extends Writable> imp
         return null;
       }
     });
-
     // then estimate the size of the array
     // needed to hold all the sequence ids
     int baseSize = WALEntry.getBaseSize();
@@ -240,6 +237,7 @@ public class WAL<T extends Writable> imp
     readFiles(sequenceIDPath, new Function<File, Void>() {
       @Override
       public Void apply(File input) {
+        LOG.info("Replaying " + input);
         WALDataFile.Reader<NullWritable> reader = null;
         int localIndex = index.get();
         try {
@@ -282,6 +280,7 @@ public class WAL<T extends Writable> imp
     readFiles(dataPath, new Function<File, Void>() {
       @Override
       public Void apply(File input) {
+        LOG.info("Replaying " + input);
         WALDataFile.Reader<T> reader = null;
         try {
           reader = new WALDataFile.Reader<T>(input, dataClazz);
@@ -316,8 +315,10 @@ public class WAL<T extends Writable> imp
     synchronized (this.fileLargestSequenceIDMap) {
       this.fileLargestSequenceIDMap.clear();
       this.fileLargestSequenceIDMap.putAll(fileLargestSequenceIDMap);
+      LOG.info("SequenceIDMap " + fileLargestSequenceIDMap);
     }
     largestCommitedSequenceID.set(sequenceID.get());
+    LOG.info("Replay complete: LargestCommitedSequenceID = " + largestCommitedSequenceID.get());
     return new WALReplayResult<T>(entries, largestCommitedSequenceID.get());
   }
 
@@ -438,19 +439,10 @@ public class WAL<T extends Writable> imp
   }
 
   private static class Worker extends Thread {
-    private long workerInterval;
-    private long maxLogsSize;
-    private long minLogRentionPeriod;
-    private AtomicLong largestSequenceID;
-    private Map<String, Long> fileLargestSequenceIDMap;
+    private WAL<? extends Writable> wal;
     private volatile boolean run = true;
-
-    public Worker(long workerInterval, long maxLogsSize,
-        long minLogRentionPeriod, AtomicLong largestSequenceID,
-        Map<String, Long> fileLargestSequenceIDMap) {
-      this.workerInterval = workerInterval;
-      this.largestSequenceID = largestSequenceID;
-      this.fileLargestSequenceIDMap = fileLargestSequenceIDMap;
+    public Worker(WAL<? extends Writable> wal) {
+      this.wal = wal;
     }
 
     @Override
@@ -459,7 +451,7 @@ public class WAL<T extends Writable> imp
       while (run) {
         try {
           try {
-            Thread.sleep(workerInterval);
+            Thread.sleep(wal.workerInterval);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
           }
@@ -468,19 +460,19 @@ public class WAL<T extends Writable> imp
           }
           List<String> filesToRemove = Lists.newArrayList();
           long totalSize = 0;
-          synchronized (fileLargestSequenceIDMap) {
-            for (String key : fileLargestSequenceIDMap.keySet()) {
+          synchronized (wal.fileLargestSequenceIDMap) {
+            for (String key : wal.fileLargestSequenceIDMap.keySet()) {
               File file = new File(key);
               totalSize += file.length();
             }
-            if (totalSize >= maxLogsSize) {
-              for (String key : fileLargestSequenceIDMap.keySet()) {
+            if (totalSize >= wal.maxLogsSize) {
+              for (String key : wal.fileLargestSequenceIDMap.keySet()) {
                 File file = new File(key);
-                Long seqid = fileLargestSequenceIDMap.get(key);
-                long largestCommitedSeqID = largestSequenceID.get();
+                Long seqid = wal.fileLargestSequenceIDMap.get(key);
+                long largestCommitedSeqID = wal.largestCommitedSequenceID.get();
                 if (file.exists()
                     // has not been modified in 5 minutes
-                    && System.currentTimeMillis() - file.lastModified() > minLogRentionPeriod
+                    && System.currentTimeMillis() - file.lastModified() > wal.minLogRetentionPeriod
                     // current seqid is greater than the largest seqid in the file
                     && largestCommitedSeqID > seqid) {
                   filesToRemove.add(key);
@@ -489,7 +481,7 @@ public class WAL<T extends Writable> imp
                 }
               }
               for (String key : filesToRemove) {
-                fileLargestSequenceIDMap.remove(key);
+                wal.fileLargestSequenceIDMap.remove(key);
               }
             }
           }
@@ -507,6 +499,23 @@ public class WAL<T extends Writable> imp
     }
   }
 
+
+  public void setRollSize(long rollSize) {
+    this.rollSize = rollSize;
+  }
+
+  public void setMaxLogsSize(long maxLogsSize) {
+    this.maxLogsSize = maxLogsSize;
+  }
+
+  public void setMinLogRetentionPeriod(long minLogRetentionPeriod) {
+    this.minLogRetentionPeriod = minLogRetentionPeriod;
+  }
+
+  public void setWorkerInterval(long workerInterval) {
+    this.workerInterval = workerInterval;
+  }
+
   /**
    * Reads in a WAL and writes out a new WAL. Used if for some reason a replay
    * cannot occur due to the size of the WAL or assumptions about the number of

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/main/java/org/apache/flume/channel/recoverable/memory/wal/WALDataFile.java Wed May  2 00:06:19 2012
@@ -38,10 +38,10 @@ import com.google.common.collect.Lists;
 class WALDataFile<T extends Writable> {
 
   private static final int VERSION = 1;
-  
+
   private static final int RECORD_TYPE_EVENT = 1;
   private static final int RECORD_TYPE_COMMIT = 2;
-  
+
   static class Reader<T extends Writable> implements Closeable {
     Class<T> clazz;
     DataInputStream input;
@@ -78,7 +78,7 @@ class WALDataFile<T extends Writable> {
           // would have gotten an exception and retried or locally
           // stored the batch for resending later
           return null;
-        }        
+        }
       }
     }
 
@@ -118,7 +118,7 @@ class WALDataFile<T extends Writable> {
       // if this is successful, the events have been
       // successfully persisted and will be replayed
       // in the case of a crash
-      dataOutput.writeInt(RECORD_TYPE_COMMIT); 
+      dataOutput.writeInt(RECORD_TYPE_COMMIT);
       flush(false);
     }
 

Modified: incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java (original)
+++ incubator/flume/trunk/flume-ng-channels/flume-recoverable-memory-channel/src/test/java/org/apache/flume/channel/recoverable/memory/TestRecoverableMemoryChannel.java Wed May  2 00:06:19 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -56,6 +57,7 @@ public class TestRecoverableMemoryChanne
   .getLogger(TestRecoverableMemoryChannel.class);
 
   private RecoverableMemoryChannel channel;
+  Context context;
 
   private File dataDir;
 
@@ -69,7 +71,7 @@ public class TestRecoverableMemoryChanne
 
   private RecoverableMemoryChannel createFileChannel() {
     RecoverableMemoryChannel channel = new RecoverableMemoryChannel();
-    Context context = new Context();
+    context = new Context();
     context.put(RecoverableMemoryChannel.WAL_DATA_DIR, dataDir.getAbsolutePath());
     Configurables.configure(channel, context);
     channel.start();
@@ -80,7 +82,39 @@ public class TestRecoverableMemoryChanne
   public void teardown() {
     FileUtils.deleteQuietly(dataDir);
   }
-
+  @Test
+  public void testRestart() throws Exception {
+    List<String> in = Lists.newArrayList();
+    try {
+      while(true) {
+        in.addAll(putEvents(channel, "restart", 1, 1));
+      }
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+    }
+    channel.stop();
+    channel = createFileChannel();
+    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Collections.sort(in);
+    Collections.sort(out);
+    Assert.assertEquals(in, out);
+  }
+  @Test
+  public void testReconfigure() throws Exception {
+    List<String> in = Lists.newArrayList();
+    try {
+      while(true) {
+        in.addAll(putEvents(channel, "restart", 1, 1));
+      }
+    } catch (ChannelException e) {
+      Assert.assertEquals("Cannot acquire capacity", e.getMessage());
+    }
+    Configurables.configure(channel, context);
+    List<String> out = takeEvents(channel, 1, Integer.MAX_VALUE);
+    Collections.sort(in);
+    Collections.sort(out);
+    Assert.assertEquals(in, out);
+  }
   @Test
   public void testRollbackWithSink() throws Exception {
     final NullSink sink = new NullSink();
@@ -165,7 +199,7 @@ public class TestRecoverableMemoryChanne
   @Test
   public void testPut() throws Exception {
     // should find no items
-    int found = takeEvents(channel, "unbatched-gets", 1, 5).size();
+    int found = takeEvents(channel, 1, 5).size();
     Assert.assertEquals(0, found);
     putEvents(channel, "unbatched", 1, 5);
     putEvents(channel, "batched", 5, 5);
@@ -213,11 +247,10 @@ public class TestRecoverableMemoryChanne
           try {
             startLatch.countDown();
             startLatch.await();
-            String prefix = "take-thread-" + Integer.toString(id);
             if (id % 2 == 0) {
-              actual.addAll(takeEvents(channel, prefix, 1, Integer.MAX_VALUE));
+              actual.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
             } else {
-              actual.addAll(takeEvents(channel, prefix, 5, Integer.MAX_VALUE));
+              actual.addAll(takeEvents(channel, 5, Integer.MAX_VALUE));
             }
           } catch (Exception e) {
             errors.add(e);
@@ -235,7 +268,7 @@ public class TestRecoverableMemoryChanne
     Collections.sort(actual);
     Assert.assertEquals(expected, actual);
   }
-  private static List<String> takeEvents(Channel channel, String prefix, int batchSize,
+  private static List<String> takeEvents(Channel channel, int batchSize,
       int numEvents) throws Exception {
     List<String> result = Lists.newArrayList();
     for (int i = 0; i < numEvents; i += batchSize) {

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/conf/Configurables.java Wed May  2 00:06:19 2012
@@ -29,7 +29,7 @@ public class Configurables {
   /**
    * Check that {@code target} implements {@link Configurable} and, if so, ask
    * it to configure itself using the supplied {@code context}.
-   * 
+   *
    * @param target
    *          An object that potentially implements Configurable.
    * @param context

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java Wed May  2 00:06:19 2012
@@ -100,13 +100,13 @@ public class BucketPath {
 
   /**
    * Hardcoded lookups for %x style escape replacement. Add your own!
-   * 
+   *
    * All shorthands are Date format strings, currently.
-   * 
+   *
    * Returns the empty string if an escape is not recognized.
-   * 
+   *
    * Dates follow the same format as unix date, with a few exceptions.
-   * 
+   *
    */
   public static String replaceShorthand(char c, Map<String, String> headers) {
     // It's a date
@@ -189,9 +189,9 @@ public class BucketPath {
   /**
    * Replace all substrings of form %{tagname} with get(tagname).toString() and
    * all shorthand substrings of form %x with a special value.
-   * 
+   *
    * Any unrecognized / not found tags will be replaced with the empty string.
-   * 
+   *
    * TODO(henry): we may want to consider taking this out of Event and into a
    * more general class when we get more use cases for this pattern.
    */

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/lifecycle/LifecycleAware.java Wed May  2 00:06:19 2012
@@ -43,40 +43,40 @@ package org.apache.flume.lifecycle;
  * </p>
  * <code>
  *  public class MyService implements LifecycleAware {
- * 
+ *
  *    private LifecycleState lifecycleState;
- * 
+ *
  *    public MyService() {
  *      lifecycleState = LifecycleState.IDLE;
  *    }
- * 
+ *
  *    @Override
  *    public void start(Context context) throws LifecycleException,
  *      InterruptedException {
- * 
+ *
  *      ...your code does something.
- * 
+ *
  *      lifecycleState = LifecycleState.START;
  *    }
- * 
+ *
  *    @Override
  *    public void stop(Context context) throws LifecycleException,
  *      InterruptedException {
- * 
+ *
  *      try {
  *        ...you stop services here.
  *      } catch (SomethingException) {
  *        lifecycleState = LifecycleState.ERROR;
  *      }
- * 
+ *
  *      lifecycleState = LifecycleState.STOP;
  *    }
- * 
+ *
  *    @Override
  *    public LifecycleState getLifecycleState() {
  *      return lifecycleState;
  *    }
- * 
+ *
  *  }
  * </code>
  */
@@ -90,7 +90,7 @@ public interface LifecycleAware {
    * Implementations should determine the result of any start logic and effect
    * the return value of {@link #getLifecycleState()} accordingly.
    * </p>
-   * 
+   *
    * @throws LifecycleException
    * @throws InterruptedException
    */
@@ -104,7 +104,7 @@ public interface LifecycleAware {
    * Implementations should determine the result of any stop logic and effect
    * the return value of {@link #getLifecycleState()} accordingly.
    * </p>
-   * 
+   *
    * @throws LifecycleException
    * @throws InterruptedException
    */

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java Wed May  2 00:06:19 2012
@@ -49,6 +49,7 @@ abstract public class AbstractSink imple
     lifecycleState = LifecycleState.STOP;
   }
 
+  @Override
   public synchronized Channel getChannel() {
     return channel;
   }

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Wed May  2 00:06:19 2012
@@ -19,7 +19,6 @@
 
 package org.apache.flume.sink;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.flume.Channel;

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java Wed May  2 00:06:19 2012
@@ -98,6 +98,7 @@ implements EventDrivenSource, Configurab
 
     ServerBootstrap serverBootstrap = new ServerBootstrap(factory);
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
       public ChannelPipeline getPipeline() {
         syslogTcpHandler handler = new syslogTcpHandler();
         handler.setEventSize(eventSize);

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java Wed May  2 00:06:19 2012
@@ -91,6 +91,7 @@ public class SyslogUDPSource extends Abs
     final syslogHandler handler = new syslogHandler();
     handler.setFormater(formaterProp);
     serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      @Override
       public ChannelPipeline getPipeline() {
        return Channels.pipeline(handler);
       }

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java Wed May  2 00:06:19 2012
@@ -190,6 +190,7 @@ public abstract class AbstractBasicChann
 
   protected void testWrongThread(final Runnable test) throws Exception {
     executor.submit(new Runnable() {
+        @Override
         public void run() {
           testIllegalState(test);
         }
@@ -209,6 +210,7 @@ public abstract class AbstractBasicChann
   protected void testException(TestChannel.Mode mode,
       final Class<? extends Throwable> exceptionClass, final Runnable test) {
     testMode(mode, new Runnable() {
+        @Override
         public void run() {
           testException(exceptionClass, test);
         }
@@ -230,8 +232,10 @@ public abstract class AbstractBasicChann
 
   protected void testInterrupt(final Runnable test) {
     testMode(TestChannel.Mode.SLEEP, new Runnable() {
+        @Override
         public void run() {
           testException(InterruptedException.class, new Runnable() {
+              @Override
               public void run() {
                 interruptTest(test);
               }
@@ -243,6 +247,7 @@ public abstract class AbstractBasicChann
   protected void interruptTest(final Runnable test) {
     final Thread mainThread = Thread.currentThread();
     Future<?> future = executor.submit(new Runnable() {
+        @Override
         public void run() {
           try {
             Thread.sleep(500);

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java Wed May  2 00:06:19 2012
@@ -51,6 +51,7 @@ public class TestBasicChannelSemantics
   public void testMultiThreadedHappyPath() throws Exception {
     final int testLength = 1000;
     Future<?> producer = executor.submit(new Runnable() {
+        @Override
         public void run() {
           try {
             Thread.sleep(500);
@@ -89,6 +90,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     executor.submit(new Runnable() {
+        @Override
         public void run() {
           Assert.assertNotSame(transaction, channel.getTransaction());
         }
@@ -99,6 +101,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     executor.submit(new Runnable() {
+        @Override
         public void run() {
           Assert.assertNotSame(transaction, channel.getTransaction());
         }
@@ -108,6 +111,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     executor.submit(new Runnable() {
+        @Override
         public void run() {
           Assert.assertNotSame(transaction, channel.getTransaction());
         }
@@ -117,6 +121,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     executor.submit(new Runnable() {
+        @Override
         public void run() {
           Assert.assertNotSame(transaction, channel.getTransaction());
         }
@@ -129,6 +134,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testExceptions(new Runnable() {
+        @Override
         public void run() {
           transaction.begin();
         }
@@ -137,6 +143,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.begin();
         }
@@ -145,6 +152,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.begin();
         }
@@ -153,6 +161,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.begin();
         }
@@ -162,6 +171,7 @@ public class TestBasicChannelSemantics
   @Test
   public void testPut1() throws Exception {
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -170,6 +180,7 @@ public class TestBasicChannelSemantics
     Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -179,12 +190,14 @@ public class TestBasicChannelSemantics
     channel.put(events.get(0));
 
     testIllegalArgument(new Runnable() {
+        @Override
         public void run() {
           channel.put(null);
         }
       });
 
     testExceptions(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -193,6 +206,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -201,6 +215,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -215,6 +230,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -223,6 +239,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -237,6 +254,7 @@ public class TestBasicChannelSemantics
 
     final Transaction finalTransaction = transaction;
     testChannelException(new Runnable() {
+        @Override
         public void run() {
           finalTransaction.commit();
         }
@@ -245,6 +263,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -253,6 +272,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -262,6 +282,7 @@ public class TestBasicChannelSemantics
   @Test
   public void testTake1() throws Exception {
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -270,6 +291,7 @@ public class TestBasicChannelSemantics
     Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -284,20 +306,24 @@ public class TestBasicChannelSemantics
     Assert.assertNotNull(channel.take());
 
     testWrongThread(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
       });
 
     testBasicExceptions(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
       });
 
     testMode(TestChannel.Mode.SLEEP, new Runnable() {
+        @Override
         public void run() {
           interruptTest(new Runnable() {
+              @Override
               public void run() {
                 Assert.assertNull(channel.take());
                 Assert.assertTrue(Thread.interrupted());
@@ -311,6 +337,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -319,6 +346,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -333,6 +361,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -341,6 +370,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -355,6 +385,7 @@ public class TestBasicChannelSemantics
 
     final Transaction finalTransaction = transaction;
     testChannelException(new Runnable() {
+        @Override
         public void run() {
           finalTransaction.commit();
         }
@@ -363,6 +394,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -371,6 +403,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           channel.take();
         }
@@ -382,6 +415,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -390,6 +424,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testExceptions(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -398,6 +433,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -406,6 +442,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -420,6 +457,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -428,6 +466,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -439,6 +478,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -447,6 +487,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testWrongThread(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -455,6 +496,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -463,6 +505,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -474,6 +517,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -482,12 +526,14 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testError(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -496,6 +542,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -507,6 +554,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -515,12 +563,14 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testRuntimeException(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -529,6 +579,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -540,6 +591,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -548,12 +600,14 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testChannelException(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -562,6 +616,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -574,6 +629,7 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -582,12 +638,14 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testInterrupt(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -596,6 +654,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -610,6 +669,7 @@ public class TestBasicChannelSemantics
     transaction.commit();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -618,6 +678,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -631,6 +692,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testExceptions(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
@@ -639,6 +701,7 @@ public class TestBasicChannelSemantics
     transaction.rollback();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -647,6 +710,7 @@ public class TestBasicChannelSemantics
     transaction.close();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.rollback();
         }
@@ -658,12 +722,14 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testError(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
@@ -675,12 +741,14 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testRuntimeException(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
@@ -692,12 +760,14 @@ public class TestBasicChannelSemantics
     final Transaction transaction = channel.getTransaction();
 
     testChannelException(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
@@ -710,6 +780,7 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }
@@ -722,12 +793,14 @@ public class TestBasicChannelSemantics
     transaction.begin();
 
     testChannelException(new Runnable() {
+        @Override
         public void run() {
           transaction.commit();
         }
       });
 
     testIllegalState(new Runnable() {
+        @Override
         public void run() {
           transaction.close();
         }

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java Wed May  2 00:06:19 2012
@@ -79,8 +79,10 @@ public class TestChannelUtils
   private void testTransact(final TestChannel.Mode mode,
       Class<? extends Throwable> exceptionClass, final Runnable test) {
     testException(exceptionClass, new Runnable() {
+        @Override
         public void run() {
           ChannelUtils.transact(channel, new Runnable() {
+              @Override
               public void run() {
                 testMode(mode, test);
               }
@@ -95,6 +97,7 @@ public class TestChannelUtils
   private void testTransact(TestChannel.Mode mode,
       Class<? extends Throwable> exceptionClass) {
     testTransact(mode, exceptionClass, new Runnable() {
+        @Override
         public void run() {
           channel.put(events.get(0));
         }
@@ -120,8 +123,10 @@ public class TestChannelUtils
   public void testInterrupt() throws Exception {
     testTransact(TestChannel.Mode.SLEEP, InterruptedException.class,
         new Runnable() {
+          @Override
           public void run() {
             interruptTest(new Runnable() {
+                @Override
                 public void run() {
                   channel.put(events.get(0));
                 }

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java Wed May  2 00:06:19 2012
@@ -52,6 +52,7 @@ public class TestMemoryChannelConcurrenc
 
     Configurables.configure(channel, new Context());
     Thread t1 = new Thread(new Runnable() {
+      @Override
       public void run() {
         Transaction tx = channel.getTransaction();
         tx.begin();
@@ -74,6 +75,7 @@ public class TestMemoryChannelConcurrenc
     });
 
     Thread t2 = new Thread(new Runnable() {
+      @Override
       public void run() {
         Transaction tx = channel.getTransaction();
         try {
@@ -145,6 +147,7 @@ public class TestMemoryChannelConcurrenc
 
     for (int i = 0; i < threadCount; i++) {
       Thread t = new Thread() {
+        @Override
         public void run() {
           Long tid = Thread.currentThread().getId();
           String strtid = tid.toString();
@@ -227,6 +230,7 @@ public class TestMemoryChannelConcurrenc
     // start a sink and source for each
     for (int i = 0; i < threadCount/2; i++) {
       Thread t = new Thread() {
+        @Override
         public void run() {
           Long tid = Thread.currentThread().getId();
           String strtid = tid.toString();
@@ -269,6 +273,7 @@ public class TestMemoryChannelConcurrenc
       t.start();
       final Integer takeMapLock = 0;
       t = new Thread() {
+        @Override
         public void run() {
           Random rng = new Random(Thread.currentThread().getId());
 

Modified: incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Wed May  2 00:06:19 2012
@@ -24,8 +24,6 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
-import org.apache.flume.Transaction.TransactionState;
-import org.apache.flume.channel.MemoryChannel.MemoryTransaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;

Modified: incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ incubator/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Wed May  2 00:06:19 2012
@@ -21,6 +21,7 @@ package org.apache.flume.node.nodemanage
 
 import java.util.Map.Entry;
 
+import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleAware;
@@ -56,18 +57,30 @@ public class DefaultLogicalNodeManager e
     if (this.nodeConfiguration != null) {
       logger
           .info("Shutting down old configuration: {}", this.nodeConfiguration);
+      for (Entry<String, SourceRunner> entry : this.nodeConfiguration
+          .getSourceRunners().entrySet()) {
+        try{
+          logger.info("Stopping Source " + entry.getKey());
+          nodeSupervisor.unsupervise(entry.getValue());
+        } catch (Exception e){
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
+
       for (Entry<String, SinkRunner> entry :
         this.nodeConfiguration.getSinkRunners().entrySet()) {
         try{
+          logger.info("Stopping Sink " + entry.getKey());
           nodeSupervisor.unsupervise(entry.getValue());
         } catch (Exception e){
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
 
-      for (Entry<String, SourceRunner> entry : this.nodeConfiguration
-          .getSourceRunners().entrySet()) {
+      for (Entry<String, Channel> entry :
+        this.nodeConfiguration.getChannels().entrySet()) {
         try{
+          logger.info("Stopping Channel " + entry.getKey());
           nodeSupervisor.unsupervise(entry.getValue());
         } catch (Exception e){
           logger.error("Error while stopping {}", entry.getValue(), e);
@@ -76,9 +89,22 @@ public class DefaultLogicalNodeManager e
     }
 
     this.nodeConfiguration = nodeConfiguration;
+
+    for (Entry<String, Channel> entry :
+      nodeConfiguration.getChannels().entrySet()) {
+      try{
+        logger.info("Starting Channel " + entry.getKey());
+        nodeSupervisor.supervise(entry.getValue(),
+            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e){
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
+    }
+
     for (Entry<String, SinkRunner> entry : nodeConfiguration.getSinkRunners()
         .entrySet()) {
       try{
+        logger.info("Starting Sink " + entry.getKey());
         nodeSupervisor.supervise(entry.getValue(),
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
       } catch (Exception e) {
@@ -89,6 +115,7 @@ public class DefaultLogicalNodeManager e
     for (Entry<String, SourceRunner> entry : nodeConfiguration
         .getSourceRunners().entrySet()) {
       try{
+        logger.info("Starting Source " + entry.getKey());
         nodeSupervisor.supervise(entry.getValue(),
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
       } catch (Exception e) {

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java Wed May  2 00:06:19 2012
@@ -37,10 +37,12 @@ public class TestFailoverRpcClient {
    *
    * @throws FlumeException
    * @throws EventDeliveryException
+   * @throws InterruptedException
    */
 
   @Test
-  public void testFailover() throws FlumeException, EventDeliveryException {
+  public void testFailover()
+      throws FlumeException, EventDeliveryException,InterruptedException {
     FailoverRpcClient client = null;
     Server server1 = RpcTestUtils.startServer(new OKAvroHandler());
     Server server2 = RpcTestUtils.startServer(new OKAvroHandler());
@@ -63,6 +65,7 @@ public class TestFailoverRpcClient {
     Assert.assertEquals(client.getLastConnectedServerAddress(),
         new InetSocketAddress("localhost", server1.getPort()));
     server1.close();
+    Thread.sleep(1000L); // wait a second for the close to occur
     events = new ArrayList<Event>();
     for (int i = 0; i < 50; i++) {
       events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
@@ -71,6 +74,7 @@ public class TestFailoverRpcClient {
     Assert.assertEquals(new InetSocketAddress("localhost", server2.getPort()),
         client.getLastConnectedServerAddress());
     server2.close();
+    Thread.sleep(1000L); // wait a second for the close to occur
     client.append(EventBuilder.withBody("Had a sandwich?",
         Charset.forName("UTF8")));
     Assert.assertEquals(new InetSocketAddress("localhost", server3.getPort()),
@@ -78,6 +82,7 @@ public class TestFailoverRpcClient {
     // Bring server 2 back.
     Server server4 = RpcTestUtils.startServer(new OKAvroHandler(), s2Port);
     server3.close();
+    Thread.sleep(1000L); // wait a second for the close to occur
     events = new ArrayList<Event>();
     for (int i = 0; i < 50; i++) {
       events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
@@ -94,7 +99,7 @@ public class TestFailoverRpcClient {
     Assert.assertEquals(new InetSocketAddress("localhost", s2Port),
         client.getLastConnectedServerAddress());
     server4.close();
-
+    Thread.sleep(1000L); // wait a second for the close to occur
     events = new ArrayList<Event>();
     for (int i = 0; i < 50; i++) {
       events.add(EventBuilder.withBody("evt: " + i, Charset.forName("UTF8")));
@@ -103,6 +108,7 @@ public class TestFailoverRpcClient {
     Assert.assertEquals(new InetSocketAddress("localhost", s1Port),
         client.getLastConnectedServerAddress());
     server5.close();
+    Thread.sleep(1000L); // wait a second for the close to occur
     Server server6 = RpcTestUtils.startServer(new OKAvroHandler(), s1Port);
     client
     .append(EventBuilder.withBody("Had a whole watermelon?",
@@ -111,6 +117,7 @@ public class TestFailoverRpcClient {
         client.getLastConnectedServerAddress());
 
     server6.close();
+    Thread.sleep(1000L); // wait a second for the close to occur
     Server server7 = RpcTestUtils.startServer(new OKAvroHandler(), s3Port);
     events = new ArrayList<Event>();
     for (int i = 0; i < 50; i++) {

Modified: incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java?rev=1332885&r1=1332884&r2=1332885&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java (original)
+++ incubator/flume/trunk/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java Wed May  2 00:06:19 2012
@@ -75,6 +75,7 @@ public class TestNettyAvroRpcClient {
    */
   @Test(expected=FlumeException.class)
   public void testUnableToConnect() throws FlumeException {
+    @SuppressWarnings("unused")
     NettyAvroRpcClient client = new NettyAvroRpcClient(
         new InetSocketAddress(localhost, 1), 0);
   }
@@ -111,15 +112,17 @@ public class TestNettyAvroRpcClient {
    * First connect the client, then shut down the server, then send a request.
    * @throws FlumeException
    * @throws EventDeliveryException
+   * @throws InterruptedException
    */
   @Test(expected=EventDeliveryException.class)
   public void testServerDisconnect() throws FlumeException,
-      EventDeliveryException {
+      EventDeliveryException, InterruptedException {
     NettyAvroRpcClient client = null;
     Server server = RpcTestUtils.startServer(new OKAvroHandler());
     try {
       client = RpcTestUtils.getStockLocalClient(server.getPort());
       server.close();
+      Thread.sleep(1000L); // wait a second for the close to occur
       try {
         server.join();
       } catch (InterruptedException ex) {



Mime
View raw message