flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1541. Implement failover for LoadBalancingSinkProcessor.
Date Thu, 06 Sep 2012 08:50:35 GMT
Updated Branches:
  refs/heads/trunk 80699c462 -> d36861bce


FLUME-1541. Implement failover for LoadBalancingSinkProcessor.

(Juhani Connolly via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d36861bc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d36861bc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d36861bc

Branch: refs/heads/trunk
Commit: d36861bcea4e3afeb08bfddd46ce0f7533186f00
Parents: 80699c4
Author: Mike Percy <mpercy@cloudera.com>
Authored: Thu Sep 6 01:48:53 2012 -0700
Committer: Mike Percy <mpercy@cloudera.com>
Committed: Thu Sep 6 01:48:53 2012 -0700

----------------------------------------------------------------------
 .../apache/flume/sink/AbstractSinkSelector.java    |    5 +
 .../flume/sink/LoadBalancingSinkProcessor.java     |  140 +++++++++-
 .../flume/sink/TestLoadBalancingSinkProcessor.java |  219 ++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   40 ++-
 4 files changed, 378 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d36861bc/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
index 63397a5..3e806a7 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSinkSelector.java
@@ -63,4 +63,9 @@ public abstract class AbstractSinkSelector implements SinkSelector {
   protected List<Sink> getSinks() {
     return sinkList;
   }
+
+  @Override
+  public void informSinkFailed(Sink failedSink) {
+    // no-op
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d36861bc/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
index 18d4509..93a46a0 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/LoadBalancingSinkProcessor.java
@@ -19,8 +19,10 @@
 package org.apache.flume.sink;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.flume.Context;
@@ -78,13 +80,13 @@ import com.google.common.base.Preconditions;
  * @see LoadBalancingSinkProcessor.SinkSelector
  */
 public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
-
   public static final String CONFIG_SELECTOR = "selector";
   public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";
 
   public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
   public static final String SELECTOR_NAME_RANDOM = "RANDOM";
-
+  public static final String SELECTOR_NAME_ROUND_ROBIN_BACKOFF = "ROUND_ROBIN_BACKOFF";
+  public static final String SELECTOR_NAME_RANDOM_BACKOFF = "RANDOM_BACKOFF";
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(LoadBalancingSinkProcessor.class);
@@ -106,6 +108,10 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor
{
       selector = new RoundRobinSinkSelector();
     } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
       selector = new RandomOrderSinkSelector();
+    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN_BACKOFF)) {
+      selector = new BackoffRoundRobinSinkSelector();
+    } else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM_BACKOFF)) {
+      selector = new BackoffRandomOrderSinkSelector();
     } else {
       try {
         @SuppressWarnings("unchecked")
@@ -151,6 +157,7 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor
{
         status = sink.process();
         break;
       } catch (Exception ex) {
+        selector.informSinkFailed(sink);
         LOGGER.warn("Sink failed to consume event. "
             + "Attempting next sink if available.", ex);
       }
@@ -191,6 +198,8 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor
{
     void setSinks(List<Sink> sinks);
 
     Iterator<Sink> createSinkIterator();
+
+    void informSinkFailed(Sink failedSink);
   }
 
   /**
@@ -248,4 +257,131 @@ public class LoadBalancingSinkProcessor extends AbstractSinkProcessor
{
       return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
     }
   }
+
+  private static class FailureState {
+    long lastFail;
+    long restoreTime;
+    int sequentialFails;
+  }
+
+  public static abstract class AbstractBackoffSinkSelector extends AbstractSinkSelector {
+    // 2 ^ 16 seconds should be more than enough for an upper limit...
+    private static final int EXP_BACKOFF_COUNTER_LIMIT = 16;
+    private static final String CONF_MAX_TIMEOUT = "maxBackoffMillis";
+    private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l;
+    private static final long MAX_TIMEOUT = 30000l;
+
+    protected List<FailureState> sinkStates;
+    protected Map<Sink, FailureState> stateMap;
+    protected  long maxTimeout = MAX_TIMEOUT;
+
+    @Override
+    public void configure(Context context) {
+      super.configure(context);
+      maxTimeout = context.getLong(CONF_MAX_TIMEOUT, MAX_TIMEOUT);
+    }
+
+    @Override
+    public void setSinks(List<Sink> sinks) {
+      super.setSinks(sinks);
+      sinkStates = new ArrayList<FailureState>();
+      stateMap = new HashMap<Sink, FailureState>();
+      for(Sink sink : sinks) {
+        FailureState state = new FailureState();
+        sinkStates.add(state);
+        stateMap.put(sink, state);
+      }
+    }
+
+    @Override
+    public void informSinkFailed(Sink failedSink) {
+      super.informSinkFailed(failedSink);
+      FailureState state = stateMap.get(failedSink);
+      long now = System.currentTimeMillis();
+      long delta = now - state.lastFail;
+
+      long lastBackoffLength = Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails));
+      long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
+      if( allowableDiff > delta ) {
+        if(state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT)
+        state.sequentialFails++;
+      } else {
+        state.sequentialFails = 1;
+      }
+      state.lastFail = now;
+      state.restoreTime = now + Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails));
+    }
+
+  }
+
+
+  private static class BackoffRoundRobinSinkSelector extends AbstractBackoffSinkSelector
{
+    private int nextHead = 0;
+
+    @Override
+    public Iterator<Sink> createSinkIterator() {
+      long curTime = System.currentTimeMillis();
+      List<Integer> activeIndices = new ArrayList<Integer>();
+      int index = 0;
+      for(FailureState state : sinkStates) {
+        if (state.restoreTime < curTime) {
+          activeIndices.add(index);
+        }
+        index++;
+      }
+
+      int size = activeIndices.size();
+      // possible that the size has shrunk so gotta adjust nextHead for that
+      if(nextHead >= size) {
+        nextHead = 0;
+      }
+      int begin = nextHead++;
+      if (nextHead == activeIndices.size()) {
+        nextHead = 0;
+      }
+
+      int[] indexOrder = new int[size];
+
+      for (int i=0; i < size; i++) {
+        indexOrder[i] = activeIndices.get((begin + i) % size);
+      }
+
+      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+    }
+  }
+
+  /**
+   * A sink selector that implements a random sink selection policy. This
+   * implementation is not thread safe.
+   */
+  private static class BackoffRandomOrderSinkSelector extends AbstractBackoffSinkSelector
{
+    private Random random = new Random(System.currentTimeMillis());
+
+    @Override
+    public Iterator<Sink> createSinkIterator() {
+      long now = System.currentTimeMillis();
+
+      List<Integer> indexList = new ArrayList<Integer>();
+
+      int i = 0;
+      for (FailureState state : sinkStates) {
+        if(state.restoreTime < now)
+          indexList.add(i);
+        i++;
+      }
+
+      int size = indexList.size();
+      int[] indexOrder = new int[size];
+
+      while (indexList.size() != 1) {
+        int pick = random.nextInt(indexList.size());
+        indexOrder[indexList.size() - 1] = indexList.remove(pick);
+      }
+
+      indexOrder[0] = indexList.get(0);
+
+      return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/d36861bc/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
index 1e9c94e..981f88e 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
@@ -113,7 +113,7 @@ public class TestLoadBalancingSinkProcessor {
     s1.setChannel(ch);
 
     // s1 always fails
-    s1.setFail();
+    s1.setFail(true);
 
     MockSink s2 = new MockSink(2);
     s2.setChannel(ch);
@@ -123,7 +123,7 @@ public class TestLoadBalancingSinkProcessor {
     s3.setChannel(ch);
 
     // s3 always fails
-    s3.setFail();
+    s3.setFail(true);
 
     List<Sink> sinks = new ArrayList<Sink>();
     sinks.add(s1);
@@ -143,6 +143,65 @@ public class TestLoadBalancingSinkProcessor {
   }
 
   @Test
+  public void testRandomBackoff() throws Exception {
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    // s1 always fails
+    s1.setFail(true);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+    MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    // s3 always fails
+    s3.setFail(true);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("random_backoff", sinks);
+
+    // TODO: there is a remote possibility that s1 or s3
+    // never get hit by the random assignment
+    // and thus are never backed off, causing the test to fail
+    for(int i=0; i < 50; i++) {
+      // a well behaved runner would always check the return.
+      lbsp.process();
+    }
+    Assert.assertEquals(50, s2.getEvents().size());
+    s2.setFail(true);
+    s1.setFail(false); // s1 should still be backed off
+    try {
+      lbsp.process();
+      // nothing should be able to process right now
+      Assert.fail("Expected EventDeliveryException");
+    } catch (EventDeliveryException e) {
+      // this is expected
+    }
+    Thread.sleep(2100); // wait for s1 to no longer be backed off
+    Sink.Status s = Sink.Status.READY;
+    while (s != Sink.Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertEquals(50, s1.getEvents().size());
+    Assert.assertEquals(50, s2.getEvents().size());
+    Assert.assertEquals(0, s3.getEvents().size());
+  }
+
+  @Test
   public void testRandomPersistentFailure() throws Exception {
     Channel ch = new MockChannel();
     int n = 100;
@@ -158,7 +217,7 @@ public class TestLoadBalancingSinkProcessor {
     s2.setChannel(ch);
 
     // s2 always fails
-    s2.setFail();
+    s2.setFail(true);
 
     MockSink s3 = new MockSink(3);
     s3.setChannel(ch);
@@ -272,7 +331,7 @@ public class TestLoadBalancingSinkProcessor {
     s1.setChannel(ch);
 
     // s1 always fails
-    s1.setFail();
+    s1.setFail(true);
 
     MockSink s2 = new MockSink(2);
     s2.setChannel(ch);
@@ -282,7 +341,7 @@ public class TestLoadBalancingSinkProcessor {
     s3.setChannel(ch);
 
     // s3 always fails
-    s3.setFail();
+    s3.setFail(true);
 
     List<Sink> sinks = new ArrayList<Sink>();
     sinks.add(s1);
@@ -317,7 +376,7 @@ public class TestLoadBalancingSinkProcessor {
     s2.setChannel(ch);
 
     // s2 always fails
-    s2.setFail();
+    s2.setFail(true);
 
     MockSink s3 = new MockSink(3);
     s3.setChannel(ch);
@@ -339,6 +398,148 @@ public class TestLoadBalancingSinkProcessor {
     Assert.assertTrue(s3.getEvents().size() == 2*n);
   }
 
+  // test that even if the sink recovers immediately, it is kept out of commission briefly
+  // test also verifies that when a sink fails, events are balanced over remaining sinks
+  @Test
+  public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException {
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3*n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+
+      MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks);
+
+    Status s = Status.READY;
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    s2.setFail(true);
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    s2.setFail(false);
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertEquals((3 * n) / 2, s1.getEvents().size());
+    Assert.assertEquals(1, s2.getEvents().size());
+    Assert.assertEquals((3 * n) /2 - 1, s3.getEvents().size());
+  }
+
+  @Test
+  public void testRoundRobinBackoffIncreasingBackoffs() throws EventDeliveryException, InterruptedException
{
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3*n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+    s2.setFail(true);
+
+      MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks);
+
+    Status s = Status.READY;
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    Assert.assertEquals(0, s2.getEvents().size());
+    Thread.sleep(2100);
+    // this should let the sink come out of backoff and get backed off for a longer time
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    Assert.assertEquals(0, s2.getEvents().size());
+    s2.setFail(false);
+    Thread.sleep(2100);
+    // this time it shouldn't come out of backoff yet as the timeout isn't over
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    Assert.assertEquals(0, s2.getEvents().size());
+    // after this s2 should be receiving events again
+    Thread.sleep(2100);
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertEquals( n + 2, s1.getEvents().size());
+    Assert.assertEquals( n - 3, s2.getEvents().size());
+    Assert.assertEquals( n + 1, s3.getEvents().size());
+  }
+
+  @Test
+  public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException
{
+    Channel ch = new MockChannel();
+    int n = 100;
+    int numEvents = 3*n;
+    for (int i = 0; i < numEvents; i++) {
+      ch.put(new MockEvent("test" + i));
+    }
+
+    MockSink s1 = new MockSink(1);
+    s1.setChannel(ch);
+
+    MockSink s2 = new MockSink(2);
+    s2.setChannel(ch);
+    s2.setFail(true);
+
+      MockSink s3 = new MockSink(3);
+    s3.setChannel(ch);
+
+    List<Sink> sinks = new ArrayList<Sink>();
+    sinks.add(s1);
+    sinks.add(s2);
+    sinks.add(s3);
+
+    LoadBalancingSinkProcessor lbsp = getProcessor("round_robin_backoff",sinks);
+
+    Status s = Status.READY;
+    for (int i = 0; i < 3 && s != Status.BACKOFF; i++) {
+      s = lbsp.process();
+    }
+    s2.setFail(false);
+    Thread.sleep(2000);
+    while (s != Status.BACKOFF) {
+      s = lbsp.process();
+    }
+
+    Assert.assertEquals(n + 1, s1.getEvents().size());
+    Assert.assertEquals(n - 1,  s2.getEvents().size());
+    Assert.assertEquals(n, s3.getEvents().size());
+  }
+
+
   @Test
   public void testRoundRobinNoFailure() throws Exception {
 
@@ -388,7 +589,7 @@ public class TestLoadBalancingSinkProcessor {
     s1.setChannel(ch);
 
     // s1 always fails
-    s1.setFail();
+    s1.setFail(true);
 
     MockSink s2 = new MockSink(2);
     s2.setChannel(ch);
@@ -436,8 +637,8 @@ public class TestLoadBalancingSinkProcessor {
       return id;
     }
 
-    void setFail() {
-      fail = true;
+    void setFail(boolean bFail) {
+      fail = bFail;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/d36861bc/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index ffed72b..f9f2383 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1603,28 +1603,38 @@ Load balancing Sink Processor
 Load balancing sink processor provides the ability to load-balance flow over
 multiple sinks. It maintains an indexed list of active sinks on which the
 load must be distributed. Implementation supports distributing load using
-either via ``ROUND_ROBIN`` or via ``RANDOM`` selection mechanism. The choice
-of selection mechanism defaults to ``ROUND_ROBIN`` type, but can be overridden
+either via ``ROUND_ROBIN``, ``RANDOM``, ``ROUND_ROBIN_BACKOFF``, or
+``RANDOM_BACKOFF`` selection mechanisms. The choice of selection mechanism
+defaults to ``ROUND_ROBIN`` type, but can be overridden
 via configuration. Custom selection mechanisms are supported via custom
 classes that inherits from ``LoadBalancingSelector``.
 
 When invoked, this selector picks the next sink using its configured selection
-mechanism and invokes it. In case the selected sink fails to deliver the event,
-the processor picks the next available sink via its configured selection mechanism.
-This implementation does not blacklist the failing sink and instead continues
-to optimistically attempt every available sink. If all sinks invocations
-result in failure, the selector propagates the failure to the sink runner.
+mechanism and invokes it. For ROUND_ROBIN and RANDOM, in case the selected sink
+fails to deliver the event, the processor picks the next available sink via
+its configured selection mechanism. This implementation does not blacklist
+the failing sink and instead continues to optimistically attempt every
+available sink. If all sinks invocations result in failure, the selector
+propagates the failure to the sink runner. The BACKOFF variants will blacklist
+sinks that fail, removing them from selection for a given timeout. When the
+timeout ends, if the sink is still unresponsive, the timeout is increased
+exponentially to avoid potentially getting stuck in long waits on unresponsive
+sinks.
+
+
 
 Required properties are in **bold**.
 
-=============================  ===============  ===============================================================
-Property Name                  Default          Description
-=============================  ===============  ===============================================================
-**processor.sinks**            --               Space separated list of sinks that are participating
in the group
-**processor.type**             ``default``      The component type name, needs to be ``load_balance``
-processor.selector             ``ROUND_ROBIN``  Selection mechanism. Must be either ``ROUND_ROBIN``,
``RANDOM``
-                                                or custom FQDN to class that inherits from
``LoadBalancingSelector``
-=============================  ===============  ===============================================================
+====================================  ===============  ===============================================================
+Property Name                         Default          Description
+====================================  ===============  ===============================================================
+**processor.sinks**                   --               Space separated list of sinks that
are participating in the group
+**processor.type**                    ``default``      The component type name, needs to
be ``load_balance``
+processor.selector                    ``ROUND_ROBIN``  Selection mechanism. Must be either
``ROUND_ROBIN``, ``RANDOM``,
+                                                       ``ROUND_ROBIN_BACKOFF``, ``RANDOM_BACKOFF``,
or custom FQCN to
+                                                       class that inherits from ``LoadBalancingSelector``
+processor.selector.maxBackoffMillis   30000            Used by backoff selectors to limit
exponential backoff in milliseconds
+====================================  ===============  ===============================================================
 
 Example for agent named **agent_foo**:
 


Mime
View raw message