flume-user mailing list archives

From Hiroyuki Kakine <kak...@iij.ad.jp>
Subject Channel's start() and stop()
Date Mon, 23 Apr 2012 10:55:18 GMT
Hi there,

I'm Kakine from IIJ.  I'm new here.  Nice to meet you.

I expected Channel's start() and stop() to be called, because Channel
is LifecycleAware.  But as far as I can tell from the source code,
nothing seems to call start() or stop() on it.

The patch below adds calls to Channel's start() and stop() when the node
configuration changes.  Do you think this change is OK to go in?
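
For illustration only, the intended behavior can be sketched with
self-contained stand-in types.  Everything named Demo* below is
hypothetical and is not Flume's actual API; the real patch hands each
Channel to nodeSupervisor rather than calling start() and stop()
directly, so this is a simplified sketch of the idea, not the
implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Small driver showing the lifecycle transitions across two configurations.
class ChannelLifecycleDemo {
    public static void main(String[] args) {
        DemoChannel first = new DemoChannel("ch1");
        DemoChannel second = new DemoChannel("ch2");
        DemoNodeManager manager = new DemoNodeManager();

        Map<String, DemoChannel> configA = new LinkedHashMap<>();
        configA.put("ch1", first);
        manager.onConfigurationChanged(configA);
        System.out.println(first + " after first config: " + first.getState());

        Map<String, DemoChannel> configB = new LinkedHashMap<>();
        configB.put("ch2", second);
        manager.onConfigurationChanged(configB);
        System.out.println(first + " after second config: " + first.getState());
        System.out.println(second + " after second config: " + second.getState());
    }
}

// Hypothetical stand-in for a lifecycle state enum (assumption, not Flume's).
enum DemoState { IDLE, START, STOP }

// Hypothetical stand-in for a LifecycleAware-style interface.
interface DemoLifecycleAware {
    void start();
    void stop();
    DemoState getState();
}

// A channel that only changes state when someone calls start() or stop().
class DemoChannel implements DemoLifecycleAware {
    private final String name;
    private DemoState state = DemoState.IDLE;

    DemoChannel(String name) { this.name = name; }

    @Override public void start() { state = DemoState.START; }
    @Override public void stop() { state = DemoState.STOP; }
    @Override public DemoState getState() { return state; }
    @Override public String toString() { return "DemoChannel(" + name + ")"; }
}

// Hypothetical manager: stops the old channels, then starts the new ones.
class DemoNodeManager {
    private Map<String, DemoChannel> channels = new LinkedHashMap<>();

    void onConfigurationChanged(Map<String, DemoChannel> newChannels) {
        // Stop channels from the previous configuration; log and continue on error.
        for (Map.Entry<String, DemoChannel> entry : channels.entrySet()) {
            try {
                entry.getValue().stop();
            } catch (Exception e) {
                System.err.println("Error while stopping " + entry.getValue() + ": " + e);
            }
        }

        channels = new LinkedHashMap<>(newChannels);

        // Start channels from the new configuration; log and continue on error.
        for (Map.Entry<String, DemoChannel> entry : channels.entrySet()) {
            try {
                entry.getValue().start();
            } catch (Exception e) {
                System.err.println("Error while starting " + entry.getValue() + ": " + e);
            }
        }
    }
}
```

Without the two loops in DemoNodeManager, a DemoChannel would stay in
IDLE forever, which mirrors the problem described above.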

Thanks,

Index: flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
===================================================================
--- flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java	(revision 1325150)
+++ flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java	(working copy)
@@ -21,6 +21,7 @@
 
 import java.util.Map.Entry;
 
+import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleAware;
@@ -73,6 +74,15 @@
           logger.error("Error while stopping {}", entry.getValue(), e);
         }
       }
+
+      for (Entry<String, Channel> entry : nodeConfiguration
+          .getChannels().entrySet()) {
+        try{
+          nodeSupervisor.unsupervise(entry.getValue());
+        } catch (Exception e) {
+          logger.error("Error while stopping {}", entry.getValue(), e);
+        }
+      }
     }
 
     this.nodeConfiguration = nodeConfiguration;
@@ -95,6 +105,16 @@
         logger.error("Error while starting {}", entry.getValue(), e);
       }
     }
+
+    for (Entry<String, Channel> entry : nodeConfiguration
+        .getChannels().entrySet()) {
+      try{
+        nodeSupervisor.supervise(entry.getValue(),
+          new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+      } catch (Exception e) {
+        logger.error("Error while starting {}", entry.getValue(), e);
+      }
+    }
   }
 
   @Override
---------------------------
Hiroyuki Kakine <kakine@iij.ad.jp>
Internet Initiative Japan Inc.

