flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pras...@apache.org
Subject svn commit: r1215015 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/collector/CollectorSink.java main/java/com/cloudera/flume/conf/FlumeConfiguration.java test/java/com/cloudera/flume/collector/TestCollectorSink.java
Date Fri, 16 Dec 2011 01:35:12 GMT
Author: prasadm
Date: Fri Dec 16 01:35:12 2011
New Revision: 1215015

URL: http://svn.apache.org/viewvc?rev=1215015&view=rev
Log:
FLUME-883. clear acks for failed batches

Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java?rev=1215015&r1=1215014&r2=1215015&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
(original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/collector/CollectorSink.java
Fri Dec 16 01:35:12 2011
@@ -71,6 +71,7 @@ public class CollectorSink extends Event
   AckAccumulator accum = new AckAccumulator();
   final AckListener ackDest;
   final String snkSpec;
+  final boolean cleanupOnClose;
 
   // This is a container for acks that should be ready for delivery when the
   // hdfs sink is closed/flushed
@@ -88,6 +89,7 @@ public class CollectorSink extends Event
       final Tagger tagger, long checkmillis, AckListener ackDest) {
     this.ackDest = ackDest;
     this.snkSpec = snkSpec;
+    cleanupOnClose = FlumeConfiguration.get().getCollectorCloseErrorCleanup();
     roller = new RollSink(ctx, snkSpec, new TimeTrigger(tagger, millis),
         checkmillis) {
       // this is wraps the normal roll sink with an extra roll detection
@@ -167,13 +169,28 @@ public class CollectorSink extends Event
 
     @Override
     public void close() throws IOException, InterruptedException {
-      LOG.debug("closing roll detect deco {}", tag);
-      super.close();
-      flushRollAcks();
-      LOG.debug("closed  roll detect deco {}", tag);
+      try {
+        LOG.debug("closing roll detect deco {}", tag);
+        super.close();
+        clearRollAcks(true);
+        LOG.debug("closed  roll detect deco {}", tag);
+      } catch (IOException eI) {
+        /* For most sinks, if the close fails due to an IO error then the
+         * events are not saved and need to be resent. We need to clear these
+         * acks since these events are lost. Otherwise the next successful
+         * close would flush those and the agent won't resend the data.
+         * If the sink can ignore the error, then
+         * flume.collector.close.cleanup needs to be set to false.
+         */
+        LOG.debug("Error in close, clearing roll acks {}", tag);
+        if (cleanupOnClose == true) {
+          clearRollAcks(false);
+        }
+        throw eI;
+      }
     }
 
-    void flushRollAcks() throws IOException {
+    private void clearRollAcks(boolean flushAcks) throws IOException {
       AckListener master = ackDest;
       Collection<String> acktags;
       synchronized (rollAckSet) {
@@ -182,8 +199,10 @@ public class CollectorSink extends Event
         LOG.debug("Roll closed, pushing acks for " + tag + " :: " + acktags);
       }
 
-      for (String at : acktags) {
-        master.end(at);
+      if (flushAcks == true) {
+        for (String at : acktags) {
+          master.end(at);
+        }
       }
     }
   };

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java?rev=1215015&r1=1215014&r2=1215015&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
(original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfiguration.java
Fri Dec 16 01:35:12 2011
@@ -186,6 +186,7 @@ public class FlumeConfiguration extends 
   public static final String COLLECTOR_ROLL_TIMEOUT = "flume.collector.roll.timeout";
   public static final String COLLECTOR_OUTPUT_FORMAT = "flume.collector.output.format";
   public static final String COLLECTOR_DFS_COMPRESS_CODEC = "flume.collector.dfs.compress.codec";
+  public static final String COLLECTOR_CLOSE_ERROR_CLEANUP = "flume.collector.close.cleanup";
 
   // TODO(henry) move these to flume.master - they now tell the master which
   // interface / port to start up on
@@ -612,6 +613,10 @@ public class FlumeConfiguration extends 
     return getInt(COLLECTOR_EVENT_PORT, 35853);
   }
 
+  public boolean getCollectorCloseErrorCleanup() {
+    return getBoolean(COLLECTOR_CLOSE_ERROR_CLEANUP, true);
+  }
+
   /**
    * This returns the type of RPC mechanism (Thrift or Avro) chosen for the
    * FlumeEventServer.

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java?rev=1215015&r1=1215014&r2=1215015&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
(original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/collector/TestCollectorSink.java
Fri Dec 16 01:35:12 2011
@@ -51,6 +51,7 @@ import com.cloudera.flume.agent.durabili
 import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeArgException;
 import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.conf.LogicalNodeContext;
 import com.cloudera.flume.conf.ReportTestingContext;
@@ -1016,4 +1017,66 @@ public class TestCollectorSink {
    coll.close();
    assertEquals(result.get(0), "success");
  }
+
+ /*
+  * This test verifies that the acks are cleared when a close
+  * throws an IOException
+  */
+ @Test
+ public void testAcksOnCloseError() throws IOException,
+     InterruptedException, FlumeSpecException {
+   final EventSink snk = mock(EventSink.class);
+
+   doNothing().when(snk).append((Event) anyObject());
+
+   doThrow(new IOException("Force close error")).
+       doNothing().
+       doNothing().
+       when(snk).close();
+
+   doNothing().when(snk).open();
+   SinkBuilder sb = new SinkBuilder() {
+     @Override
+     public EventSink build(Context context, String... argv) {
+       return snk;
+     }
+   };
+   SinkFactoryImpl sf = new SinkFactoryImpl();
+   sf.setSink("mIOSink", sb);
+   FlumeBuilder.setSinkFactory(sf);
+
+   final EventSink coll = FlumeBuilder.buildSink(
+       LogicalNodeContext.testingContext(), "collector(1000) { mIOSink }");
+
+   // mem source with acks injected
+   EventSource ackedmem = setupAckRoll();
+
+   coll.open();
+
+   //send first batch
+   coll.append(ackedmem.next()); // ack beg
+   coll.append(ackedmem.next()); // data
+   coll.append(ackedmem.next()); // ack end
+
+   // wait for a roll that results in IOException
+   Clock.sleep(1100);
+   // failed roll should throw away the acks
+   assertEquals(((CollectorSink)coll).rollAckSet.size(), 0);
+
+   //send second batch
+   coll.append(ackedmem.next()); // ack beg
+   coll.append(ackedmem.next()); // data
+   coll.append(ackedmem.next()); // ack end
+
+   // now we should have one ack from the last batch
+   assertEquals(((CollectorSink)coll).rollAckSet.size(), 1);
+
+   // wait for a roll that should succeed
+   Clock.sleep(1100);
+   // successful roll should clear the acks
+   assertEquals(((CollectorSink)coll).rollAckSet.size(), 0);
+
+   coll.close();
+ }
+
 }



Mime
View raw message