flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1173415 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/test/java/org/apache/flume/ flume-ng-node/src/main/java/org/apache/flume/node/ flume-ng-node/src/main/java/org/apache/flume/n...
Date Tue, 20 Sep 2011 22:47:45 GMT
Author: esammer
Date: Tue Sep 20 22:47:44 2011
New Revision: 1173415

URL: http://svn.apache.org/viewvc?rev=1173415&view=rev
Log:
- LogicalNode is no more. Sources and Sinks are directly managed (independently). I'll write
up why this makes sense.

Removed:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/LogicalNode.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestLogicalNode.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/NodeManager.java
Tue Sep 20 22:47:44 2011
@@ -2,19 +2,16 @@ package org.apache.flume.node;
 
 import java.util.Set;
 
-import org.apache.flume.LogicalNode;
 import org.apache.flume.lifecycle.LifecycleAware;
 
 public interface NodeManager extends LifecycleAware {
 
-  public boolean add(LogicalNode node);
+  public boolean add(LifecycleAware node);
 
-  public boolean remove(LogicalNode node);
+  public boolean remove(LifecycleAware node);
 
-  public Set<LogicalNode> getNodes();
+  public Set<LifecycleAware> getNodes();
 
-  public void setNodes(Set<LogicalNode> nodes);
-
-  public LogicalNode getNode(String name);
+  public void setNodes(Set<LifecycleAware> nodes);
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/AbstractLogicalNodeManager.java
Tue Sep 20 22:47:44 2011
@@ -3,54 +3,43 @@ package org.apache.flume.node.nodemanage
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flume.LogicalNode;
+import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.node.NodeManager;
 
 import com.google.common.base.Preconditions;
 
 abstract public class AbstractLogicalNodeManager implements NodeManager {
 
-  private Set<LogicalNode> nodes;
+  private Set<LifecycleAware> nodes;
 
   public AbstractLogicalNodeManager() {
-    nodes = new HashSet<LogicalNode>();
+    nodes = new HashSet<LifecycleAware>();
   }
 
   @Override
-  public boolean add(LogicalNode node) {
+  public boolean add(LifecycleAware node) {
     Preconditions.checkNotNull(node);
 
     return nodes.add(node);
   }
 
   @Override
-  public boolean remove(LogicalNode node) {
+  public boolean remove(LifecycleAware node) {
     Preconditions.checkNotNull(node);
 
     return nodes.remove(node);
   }
 
   @Override
-  public LogicalNode getNode(String name) {
-    for (LogicalNode node : getNodes()) {
-      if (node.getName().equals(name)) {
-        return node;
-      }
-    }
-
-    return null;
-  }
-
-  @Override
-  public Set<LogicalNode> getNodes() {
+  public Set<LifecycleAware> getNodes() {
     return nodes;
   }
 
   @Override
-  public void setNodes(Set<LogicalNode> nodes) {
+  public void setNodes(Set<LifecycleAware> nodes) {
     Preconditions.checkNotNull(nodes);
 
-    this.nodes = new HashSet<LogicalNode>(nodes);
+    this.nodes = new HashSet<LifecycleAware>(nodes);
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Tue Sep 20 22:47:44 2011
@@ -2,9 +2,9 @@ package org.apache.flume.node.nodemanage
 
 import java.util.Map.Entry;
 
-import org.apache.flume.LogicalNode;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
+import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
 import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
@@ -48,7 +48,7 @@ public class DefaultLogicalNodeManager e
   }
 
   @Override
-  public boolean add(LogicalNode node) {
+  public boolean add(LifecycleAware node) {
     /*
      * FIXME: This type of overriding worries me. There should be a better
      * separation of addition of nodes and management. (i.e. state vs. function)
@@ -67,7 +67,7 @@ public class DefaultLogicalNodeManager e
   }
 
   @Override
-  public boolean remove(LogicalNode node) {
+  public boolean remove(LifecycleAware node) {
     /*
      * FIXME: This type of overriding worries me. There should be a better
      * separation of addition of nodes and management. (i.e. state vs. function)

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
Tue Sep 20 22:47:44 2011
@@ -1,9 +1,7 @@
 package org.apache.flume.node;
 
-import junit.framework.Assert;
-
-import org.apache.flume.LogicalNode;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -12,6 +10,7 @@ import org.apache.flume.sink.NullSink;
 import org.apache.flume.sink.PollableSinkRunner;
 import org.apache.flume.source.PollableSourceRunner;
 import org.apache.flume.source.SequenceGeneratorSource;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -33,7 +32,7 @@ public class TestAbstractLogicalNodeMana
       @Override
       public void stop() {
 
-        for (LogicalNode node : getNodes()) {
+        for (LifecycleAware node : getNodes()) {
           node.stop();
 
           boolean reached = false;
@@ -62,7 +61,7 @@ public class TestAbstractLogicalNodeMana
       @Override
       public void start() {
 
-        for (LogicalNode node : getNodes()) {
+        for (LifecycleAware node : getNodes()) {
           node.start();
 
           boolean reached = false;
@@ -117,19 +116,14 @@ public class TestAbstractLogicalNodeMana
   @Test
   public void testLifecycle() throws LifecycleException, InterruptedException {
 
-    LogicalNode node = new LogicalNode();
-    node.setName("test");
-
     PollableSourceRunner sourceRunner = new PollableSourceRunner();
     sourceRunner.setSource(new SequenceGeneratorSource());
 
     PollableSinkRunner sinkRunner = new PollableSinkRunner();
     sinkRunner.setSink(new NullSink());
 
-    node.setSourceRunner(sourceRunner);
-    node.setSinkRunner(sinkRunner);
-
-    nodeManager.add(node);
+    nodeManager.add(sourceRunner);
+    nodeManager.add(sinkRunner);
 
     nodeManager.start();
     boolean reached = LifecycleController.waitForOneOf(nodeManager,
@@ -150,8 +144,6 @@ public class TestAbstractLogicalNodeMana
   public void testRapidLifecycleFlapping() throws LifecycleException,
       InterruptedException {
 
-    LogicalNode node = new LogicalNode();
-
     SequenceGeneratorSource source = new SequenceGeneratorSource();
     source.setChannel(new MemoryChannel());
 
@@ -161,11 +153,8 @@ public class TestAbstractLogicalNodeMana
     PollableSinkRunner sinkRunner = new PollableSinkRunner();
     sinkRunner.setSink(new NullSink());
 
-    node.setName("test");
-    node.setSourceRunner(sourceRunner);
-    node.setSinkRunner(sinkRunner);
-
-    nodeManager.add(node);
+    nodeManager.add(sourceRunner);
+    nodeManager.add(sinkRunner);
 
     for (int i = 0; i < 10; i++) {
       nodeManager.start();

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
Tue Sep 20 22:47:44 2011
@@ -3,8 +3,8 @@ package org.apache.flume.node;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flume.LogicalNode;
 import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -47,18 +47,13 @@ public class TestDefaultLogicalNodeManag
             LifecycleState.START_OR_ERROR, 5000));
 
     for (int i = 0; i < 3; i++) {
-      LogicalNode node = new LogicalNode();
-
       SequenceGeneratorSource source = new SequenceGeneratorSource();
       source.setChannel(new MemoryChannel());
 
-      PollableSourceRunner sourceChannelAdapter = new PollableSourceRunner();
-      sourceChannelAdapter.setSource(source);
-
-      node.setName("test-node-" + i);
-      node.setSourceRunner(sourceChannelAdapter);
+      PollableSourceRunner sourceRunner = new PollableSourceRunner();
+      sourceRunner.setSource(source);
 
-      nodeManager.add(node);
+      nodeManager.add(sourceRunner);
     }
 
     Thread.sleep(5000);
@@ -73,21 +68,16 @@ public class TestDefaultLogicalNodeManag
   public void testNodeStartStops() throws LifecycleException,
       InterruptedException {
 
-    Set<LogicalNode> testNodes = new HashSet<LogicalNode>();
+    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
 
     for (int i = 0; i < 30; i++) {
-      LogicalNode node = new LogicalNode();
-
       SequenceGeneratorSource source = new SequenceGeneratorSource();
       source.setChannel(new MemoryChannel());
 
-      PollableSourceRunner sourceChannelAdapter = new PollableSourceRunner();
-      sourceChannelAdapter.setSource(source);
+      PollableSourceRunner sourceRunner = new PollableSourceRunner();
+      sourceRunner.setSource(source);
 
-      node.setName("test-node-" + i);
-      node.setSourceRunner(sourceChannelAdapter);
-
-      testNodes.add(node);
+      testNodes.add(sourceRunner);
     }
 
     nodeManager.start();
@@ -95,7 +85,7 @@ public class TestDefaultLogicalNodeManag
         LifecycleController.waitForOneOf(nodeManager,
             LifecycleState.START_OR_ERROR, 5000));
 
-    for (LogicalNode node : testNodes) {
+    for (LifecycleAware node : testNodes) {
       nodeManager.add(node);
     }
 
@@ -110,21 +100,16 @@ public class TestDefaultLogicalNodeManag
   @Test
   public void testErrorNode() throws LifecycleException, InterruptedException {
 
-    Set<LogicalNode> testNodes = new HashSet<LogicalNode>();
+    Set<LifecycleAware> testNodes = new HashSet<LifecycleAware>();
 
     for (int i = 0; i < 30; i++) {
-      LogicalNode node = new LogicalNode();
-
       SequenceGeneratorSource source = new SequenceGeneratorSource();
       source.setChannel(new MemoryChannel());
 
-      PollableSourceRunner sourceChannelAdapter = new PollableSourceRunner();
-      sourceChannelAdapter.setSource(source);
-
-      node.setName("test-node-" + i);
-      node.setSourceRunner(sourceChannelAdapter);
+      PollableSourceRunner sourceRunner = new PollableSourceRunner();
+      sourceRunner.setSource(source);
 
-      testNodes.add(node);
+      testNodes.add(sourceRunner);
     }
 
     nodeManager.start();
@@ -132,7 +117,7 @@ public class TestDefaultLogicalNodeManag
         LifecycleController.waitForOneOf(nodeManager,
             LifecycleState.START_OR_ERROR, 5000));
 
-    for (LogicalNode node : testNodes) {
+    for (LifecycleAware node : testNodes) {
       nodeManager.add(node);
     }
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java?rev=1173415&r1=1173414&r2=1173415&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestFlumeNode.java
Tue Sep 20 22:47:44 2011
@@ -1,10 +1,11 @@
 package org.apache.flume.node;
 
-import org.apache.flume.LogicalNode;
+import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.node.nodemanager.AbstractLogicalNodeManager;
+import org.apache.flume.source.SequenceGeneratorSource;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -50,7 +51,7 @@ public class TestFlumeNode {
     Assert.assertTrue("Matched a known state", reached);
     Assert.assertEquals(LifecycleState.START, node.getLifecycleState());
 
-    LogicalNode n1 = new LogicalNode();
+    SourceRunner n1 = SourceRunner.forSource(new SequenceGeneratorSource());
 
     node.getNodeManager().add(n1);
 



Mime
View raw message