flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-1777. AbstractSource does not provide enough implementation for sub-classes
Date Thu, 13 Dec 2012 20:06:41 GMT
Updated Branches:
  refs/heads/flume-1.4 a96dc82c4 -> a536fe986


FLUME-1777. AbstractSource does not provide enough implementation for sub-classes

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a536fe98
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a536fe98
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a536fe98

Branch: refs/heads/flume-1.4
Commit: a536fe9863d0dd0d5354df7a97aacf95d5711ec6
Parents: a96dc82
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Dec 13 12:05:24 2012 -0800
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Thu Dec 13 12:06:23 2012 -0800

----------------------------------------------------------------------
 .../flume/source/AbstractEventDrivenSource.java    |   38 ++++
 .../flume/source/AbstractPollableSource.java       |   58 ++++++
 .../apache/flume/source/BasicSourceSemantics.java  |  148 +++++++++++++++
 .../org/apache/flume/source/http/HTTPSource.java   |    2 +-
 .../flume/source/TestAbstractPollableSource.java   |   64 +++++++
 .../flume/source/TestBasicSourceSemantics.java     |  121 ++++++++++++
 6 files changed, 430 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
new file mode 100644
index 0000000..89bd357
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractEventDrivenSource.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+
+/**
+ * Base class which ensures sub-classes will inherit all the properties
+ * of BasicSourceSemantics. Adds no additional functionality and is provided
+ * for completeness sake.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AbstractEventDrivenSource extends BasicSourceSemantics
+  implements EventDrivenSource {
+
+  public AbstractEventDrivenSource() {
+    super();
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
new file mode 100644
index 0000000..356f4d4
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AbstractPollableSource.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.PollableSource;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+
+/**
+ * Base class which ensures sub-classes will inherit all the properties
+ * of BasicSourceSemantics in addition to:
+ * <ol>
+ * <li>Ensuring when configure/start throw an exception process will not
+ * be called</li>
+ * <li>Ensure that process will not be called unless configure and start
+ * have successfully been called</li>
+ * </ol>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class AbstractPollableSource extends BasicSourceSemantics
+  implements PollableSource {
+
+  public AbstractPollableSource() {
+    super();
+  }
+  public Status process() throws EventDeliveryException {
+    Exception exception = getStartException();
+    if (exception != null) {
+      throw new FlumeException("Source had error configuring or starting",
+          exception);
+    }
+    if(!isStarted()) {
+      throw new EventDeliveryException("Source is not started");
+    }
+    return doProcess();
+  }
+
+  protected abstract Status doProcess() throws EventDeliveryException;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
new file mode 100644
index 0000000..d2672b5
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/BasicSourceSemantics.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.source;
+
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Source;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * Alternative to AbstractSource, which:
+ * <ol>
+ *  <li>Ensure configure cannot be called while started</li>
+ *  <li>Exceptions thrown during configure, start, stop put source in ERROR state</li>
+ *  <li>Exceptions thrown during start, stop will be logged but not re-thrown.</li>
+ *  <li>Exception in configure disables starting</li>
+ * </ol>
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class BasicSourceSemantics implements Source, Configurable {
+  private static final Logger logger = LoggerFactory
+      .getLogger(BasicSourceSemantics.class);
+  private Exception exception;
+  private ChannelProcessor channelProcessor;
+  private String name;
+  private LifecycleState lifecycleState;
+
+  public BasicSourceSemantics() {
+    lifecycleState = LifecycleState.IDLE;
+  }
+  @Override
+  public synchronized void configure(Context context) {
+    if(isStarted()) {
+      throw new IllegalStateException("Configure called when started");
+    } else {
+      try {
+        exception = null;
+        setLifecycleState(LifecycleState.IDLE);
+        doConfigure(context);
+      } catch (Exception e) {
+        exception = e;
+        setLifecycleState(LifecycleState.ERROR);
+        // causes source to be removed by configuration code
+        Throwables.propagate(e);
+      }
+    }
+  }
+  @Override
+  public synchronized void start() {
+    if (exception != null) {
+      logger.error(String.format("Cannot start due to error: name = %s",
+          getName()), exception);
+    } else {
+      try {
+        Preconditions.checkState(channelProcessor != null,
+            "No channel processor configured");
+        doStart();
+        setLifecycleState(LifecycleState.START);
+      } catch (Exception e) {
+        logger.error(String.format(
+            "Unexpected error performing start: name = %s", getName()), e);
+        exception = e;
+        setLifecycleState(LifecycleState.ERROR);
+      }
+    }
+  }
+  @Override
+  public synchronized void stop() {
+    try {
+      doStop();
+      setLifecycleState(LifecycleState.STOP);
+    } catch (Exception e) {
+      logger.error(String.format(
+          "Unexpected error performing stop: name = %s", getName()), e);
+      setLifecycleState(LifecycleState.ERROR);
+    }
+  }
+  @Override
+  public synchronized void setChannelProcessor(ChannelProcessor cp) {
+    channelProcessor = cp;
+  }
+
+  @Override
+  public synchronized ChannelProcessor getChannelProcessor() {
+    return channelProcessor;
+  }
+  @Override
+  public synchronized void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public synchronized String getName() {
+    return name;
+  }
+
+  @Override
+  public synchronized LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+
+  public String toString() {
+    return this.getClass().getName() + "{name:" + name + ",state:"
+        + lifecycleState +"}";
+  }
+
+  protected boolean isStarted() {
+    return getLifecycleState() == LifecycleState.START;
+  }
+  /**
+   * @return Exception thrown during configure() or start()
+   */
+  protected Exception getStartException() {
+    return exception;
+  }
+  protected synchronized void setLifecycleState(LifecycleState lifecycleState) {
+    this.lifecycleState = lifecycleState;
+  }
+  protected abstract void doConfigure(Context context) throws FlumeException;
+  protected abstract void doStart() throws FlumeException;
+  protected abstract void doStop() throws FlumeException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index d4d818a..b46dc0e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -179,7 +179,7 @@ public class HTTPSource extends AbstractSource implements
         getChannelProcessor().processEventBatch(events);
       } catch (ChannelException ex) {
         LOG.warn("Error appending event to channel. "
-                + "Channel might be full. Consider increasing the channel"
+                + "Channel might be full. Consider increasing the channel "
                 + "capacity or make sure the sinks perform faster.", ex);
         response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                 "Error appending event to channel. Channel might be full."

http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
new file mode 100644
index 0000000..02a2f0c
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAbstractPollableSource.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source;
+import static org.mockito.Mockito.*;
+
+import org.apache.flume.Context;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAbstractPollableSource {
+
+  private AbstractPollableSource source;
+
+  @Before
+  public void setUp() {
+    source = spy(new AbstractPollableSource() {
+      @Override
+      protected Status doProcess() throws EventDeliveryException {
+        return Status.BACKOFF;
+      }
+      @Override
+      protected void doConfigure(Context context) throws FlumeException {
+        throw new FlumeException("dummy");
+      }
+      @Override
+      protected void doStart() throws FlumeException {
+
+      }
+      @Override
+      protected void doStop() throws FlumeException {
+
+      }
+    });
+  }
+
+  @Test(expected = FlumeException.class)
+  public void testExceptionStartup() throws Exception {
+    source.configure(new Context());
+  }
+  @Test(expected = EventDeliveryException.class)
+  public void testNotStarted() throws Exception {
+    source.process();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/a536fe98/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java
b/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java
new file mode 100644
index 0000000..9227ef8
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestBasicSourceSemantics.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.source;
+import static org.mockito.Mockito.*;
+
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestBasicSourceSemantics {
+
+  private BasicSourceSemantics source;
+  private ChannelProcessor channelProcessor;
+  private Context context;
+
+  @Before
+  public void setUp() {
+    context = new Context();
+    channelProcessor = mock(ChannelProcessor.class);
+  }
+  public DoNothingSource spyAndConfigure(DoNothingSource source) {
+    source = spy(source);
+    source.setChannelProcessor(channelProcessor);
+    source.configure(context);
+    return source;
+  }
+  @Test
+  public void testDoConfigureThrowsException() throws Exception {
+    source = spy(new DoNothingSource() {
+      @Override
+      protected void doConfigure(Context context) throws FlumeException {
+        throw new FlumeException("dummy");
+      }
+    });
+    source.setChannelProcessor(channelProcessor);
+    try {
+      source.configure(context);
+      Assert.fail();
+    } catch (FlumeException expected) {
+
+    }
+    Assert.assertFalse(source.isStarted());
+    Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState());
+    Assert.assertNotNull(source.getStartException());
+  }
+  @Test
+  public void testDoStartThrowsException() throws Exception {
+    source = spyAndConfigure(new DoNothingSource() {
+      @Override
+      protected void doStart() throws FlumeException {
+        throw new FlumeException("dummy");
+      }
+    });
+    source.start();
+    Assert.assertFalse(source.isStarted());
+    Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState());
+    Assert.assertNotNull(source.getStartException());
+  }
+  @Test
+  public void testDoStopThrowsException() throws Exception {
+    source = spyAndConfigure(new DoNothingSource() {
+      @Override
+      protected void doStop() throws FlumeException {
+        throw new FlumeException("dummy");
+      }
+    });
+    source.start();
+    source.stop();
+    Assert.assertFalse(source.isStarted());
+    Assert.assertEquals(LifecycleState.ERROR, source.getLifecycleState());
+    Assert.assertNull(source.getStartException());
+  }
+  @Test
+  public void testConfigureCalledWhenStarted() throws Exception {
+    source = spyAndConfigure(new DoNothingSource());
+    source.start();
+    try {
+      source.configure(context);
+      Assert.fail();
+    } catch (IllegalStateException expected) {
+
+    }
+    Assert.assertTrue(source.isStarted());
+    Assert.assertNull(source.getStartException());
+  }
+  private static class DoNothingSource extends BasicSourceSemantics {
+    @Override
+    protected void doConfigure(Context context) throws FlumeException {
+
+    }
+    @Override
+    protected void doStart() throws FlumeException {
+
+    }
+    @Override
+    protected void doStop() throws FlumeException {
+
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message