incubator-flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1209693 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/sink/AvroSink.java test/java/org/apache/flume/sink/TestAvroSink.java
Date Fri, 02 Dec 2011 21:25:40 GMT
Author: esammer
Date: Fri Dec  2 21:25:40 2011
New Revision: 1209693

URL: http://svn.apache.org/viewvc?rev=1209693&view=rev
Log:
FLUME-827: Avro client conn failure results in 60-second wait before terminating

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1209693&r1=1209692&r2=1209693&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Dec  2 21:25:40 2011
@@ -143,29 +143,50 @@ public class AvroSink extends AbstractSi
     Preconditions.checkState(port != null, "No port specified");
   }
 
+  private void createConnection() throws IOException {
+    if (transceiver == null) {
+      logger.debug("Creating new tranceiver connection to hostname:{} port:{}",
+          hostname, port);
+      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
+    }
+
+    if (client == null) {
+      logger.debug("Creating Avro client with tranceiver:{}", transceiver);
+      client = SpecificRequestor.getClient(AvroSourceProtocol.class,
+          transceiver);
+    }
+  }
+
+  private void destroyConnection() {
+    if (transceiver != null) {
+      logger.debug("Destroying tranceiver:{}", transceiver);
+      try {
+        transceiver.close();
+      } catch (IOException e) {
+        logger
+            .error(
+                "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
+                e);
+      }
+
+      transceiver = null;
+    }
+
+    client = null;
+  }
+
   @Override
   public void start() {
     logger.info("Avro sink starting");
 
     try {
-      transceiver = new NettyTransceiver(new InetSocketAddress(hostname, port));
-      client = SpecificRequestor.getClient(AvroSourceProtocol.class,
-          transceiver);
+      createConnection();
     } catch (Exception e) {
       logger.error("Unable to create avro client using hostname:" + hostname
           + " port:" + port + ". Exception follows.", e);
 
       /* Try to prevent leaking resources. */
-      if (transceiver != null) {
-        try {
-          transceiver.close();
-        } catch (IOException e1) {
-          logger
-              .error(
-                  "Attempt to clean up avro tranceiver after client error failed. Exception follows.",
-                  e1);
-        }
-      }
+      destroyConnection();
 
       /* FIXME: Mark ourselves as failed. */
       return;
@@ -180,12 +201,7 @@ public class AvroSink extends AbstractSi
   public void stop() {
     logger.info("Avro sink stopping");
 
-    try {
-      transceiver.close();
-    } catch (IOException e) {
-      logger.error(
-          "Unable to shut down avro tranceiver - Possible resource leak!", e);
-    }
+    destroyConnection();
 
     super.stop();
 
@@ -200,6 +216,7 @@ public class AvroSink extends AbstractSi
 
     try {
       transaction.begin();
+      createConnection();
 
       List<AvroFlumeEvent> batch = new LinkedList<AvroFlumeEvent>();
 
@@ -243,6 +260,12 @@ public class AvroSink extends AbstractSi
       transaction.rollback();
       logger.error("Unable to send event batch. Exception follows.", e);
       status = Status.BACKOFF;
+    } catch (Exception e) {
+      transaction.rollback();
+      logger.error(
+          "Unable to communicate with Avro server. Exception follows.", e);
+      status = Status.BACKOFF;
+      destroyConnection();
     } finally {
       transaction.close();
     }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1209693&r1=1209692&r2=1209693&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Fri Dec  2 21:25:40 2011
@@ -126,6 +126,51 @@ public class TestAvroSink {
     server.close();
   }
 
+  @Test
+  public void testFailedConnect() throws InterruptedException,
+      EventDeliveryException {
+
+    Event event = EventBuilder.withBody("test event 1".getBytes(),
+        new HashMap<String, String>());
+    Server server = createServer();
+
+    server.start();
+    sink.start();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.START_OR_ERROR, 5000));
+
+    server.close();
+
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    for (int i = 0; i < 10; i++) {
+      channel.put(event);
+    }
+    transaction.commit();
+    transaction.close();
+
+    for (int i = 0; i < 5; i++) {
+      PollableSink.Status status = sink.process();
+      Assert.assertEquals(PollableSink.Status.BACKOFF, status);
+    }
+
+    server = createServer();
+    server.start();
+
+    for (int i = 0; i < 5; i++) {
+      PollableSink.Status status = sink.process();
+      Assert.assertEquals(PollableSink.Status.READY, status);
+    }
+
+    Assert.assertEquals(PollableSink.Status.BACKOFF, sink.process());
+
+    sink.stop();
+    Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+        LifecycleState.STOP_OR_ERROR, 5000));
+    server.close();
+  }
+
   private Server createServer() {
     Server server = new NettyServer(new SpecificResponder(
         AvroSourceProtocol.class, new MockAvroServer()), new InetSocketAddress(



Mime
View raw message