flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject svn commit: r1354795 - in /incubator/flume/trunk: flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
Date Thu, 28 Jun 2012 02:02:02 GMT
Author: hshreedharan
Date: Thu Jun 28 02:02:01 2012
New Revision: 1354795

URL: http://svn.apache.org/viewvc?rev=1354795&view=rev
Log:
FLUME-1329. AvroSink can hang during Avro RPC handshake.

(Mike Percy via Hari Shreedharan)


Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
    incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1354795&r1=1354794&r2=1354795&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
(original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
Thu Jun 28 02:02:01 2012
@@ -141,9 +141,9 @@ public class AvroSink extends AbstractSi
   private void createConnection() throws FlumeException {
 
     if (client == null) {
-      logger.debug(
-          "Building RpcClient with hostname:{}, port:{}, batchSize:{}",
-          new Object[] { hostname, port, batchSize });
+      logger.debug("Avro sink {}: Building RpcClient with hostname: {}, " +
+          "port: {}, batchSize: {}",
+          new Object[] { getName(), hostname, port, batchSize });
 
        client = RpcClientFactory.getDefaultInstance(hostname, port, batchSize);
     }
@@ -152,12 +152,12 @@ public class AvroSink extends AbstractSi
 
   private void destroyConnection() {
     if (client != null) {
-      logger.debug("Closing avro client:{}", client);
+      logger.debug("Avro sink {} closing avro client: {}", getName(), client);
       try {
         client.close();
       } catch (FlumeException e) {
-        logger.error("Attempt to close avro client failed. Exception follows.",
-            e);
+        logger.error("Avro sink " + getName() + ": Attempt to close avro " +
+            "client failed. Exception follows.", e);
       }
     }
 
@@ -190,7 +190,7 @@ public class AvroSink extends AbstractSi
    */
   @Override
   public void start() {
-    logger.info("Avro sink starting");
+    logger.info("Starting {}...", this);
 
     try {
       createConnection();
@@ -205,18 +205,24 @@ public class AvroSink extends AbstractSi
 
     super.start();
 
-    logger.debug("Avro sink started");
+    logger.info("Avro sink {} started.", getName());
   }
 
   @Override
   public void stop() {
-    logger.info("Avro sink stopping");
+    logger.info("Avro sink {} stopping...", getName());
 
     destroyConnection();
 
     super.stop();
 
-    logger.debug("Avro sink stopped. Metrics:{}", counterGroup);
+    logger.info("Avro sink {} stopped. Metrics: {}", getName(), counterGroup);
+  }
+
+  @Override
+  public String toString() {
+    return "AvroSink " + getName() + " { host: " + hostname + ", port: " +
+        port + " }";
   }
 
   @Override
@@ -255,7 +261,8 @@ public class AvroSink extends AbstractSi
 
     } catch (ChannelException e) {
       transaction.rollback();
-      logger.error("Unable to get event from channel. Exception follows.", e);
+      logger.error("Avro Sink " + getName() + ": Unable to get event from" +
+          " channel. Exception follows.", e);
       status = Status.BACKOFF;
 
     } catch (Exception ex) {

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java?rev=1354795&r1=1354794&r2=1354795&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
(original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
Thu Jun 28 02:02:01 2012
@@ -27,9 +27,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -59,6 +63,7 @@ import org.slf4j.LoggerFactory;
 public class NettyAvroRpcClient extends AbstractRpcClient
 implements RpcClient {
 
+  private ExecutorService callTimeoutPool;
   private final ReentrantLock stateLock = new ReentrantLock();
 
   /**
@@ -114,6 +119,8 @@ implements RpcClient {
    * @throws FlumeException
    */
   private void connect(long timeout, TimeUnit tu) throws FlumeException {
+    callTimeoutPool = Executors.newCachedThreadPool(
+        new TransceiverThreadFactory("Flume Avro RPC Client Call Invoker"));
     try {
       transceiver = new NettyTransceiver(this.address,
           new NioClientSocketChannelFactory(
@@ -126,8 +133,7 @@ implements RpcClient {
           SpecificRequestor.getClient(AvroSourceProtocol.Callback.class,
           transceiver);
     } catch (IOException ex) {
-      logger.error("RPC connection error :" , ex);
-      throw new FlumeException("RPC connection error. Exception follows.", ex);
+      throw new FlumeException(this + ": RPC connection error", ex);
     }
 
     setState(ConnState.READY);
@@ -135,12 +141,25 @@ implements RpcClient {
 
   @Override
   public void close() throws FlumeException {
+    if (callTimeoutPool != null) {
+      callTimeoutPool.shutdown();
+      while (!callTimeoutPool.isTerminated()) {
+        try {
+          callTimeoutPool.awaitTermination(requestTimeout,
+              TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          logger.warn(this + ": Interrupted during close", ex);
+          callTimeoutPool.shutdownNow();
+          Thread.currentThread().interrupt();
+          break;
+        }
+      }
+      callTimeoutPool = null;
+    }
     try {
       transceiver.close();
     } catch (IOException ex) {
-      logger.error("Error closing transceiver. " , ex);
-      throw new FlumeException("Error closing transceiver. Exception follows.",
-          ex);
+      throw new FlumeException(this + ": Error closing transceiver.", ex);
     } finally {
       setState(ConnState.DEAD);
     }
@@ -148,14 +167,23 @@ implements RpcClient {
   }
 
   @Override
+  public String toString() {
+    return "NettyAvroRpcClient { host: " + address.getHostName() + ", port: " +
+        address.getPort() + " }";
+  }
+
+  @Override
   public void append(Event event) throws EventDeliveryException {
     try {
       append(event, requestTimeout, TimeUnit.MILLISECONDS);
-    } catch (Exception e) {
+    } catch (Throwable t) {
       // we mark as no longer active without trying to clean up resources
       // client is required to call close() to clean up resources
       setState(ConnState.DEAD);
-      throw new EventDeliveryException("failed to send event", e);
+      if (t instanceof Error) {
+        throw (Error) t;
+      }
+      throw new EventDeliveryException(this + ": Failed to send event", t);
     }
   }
 
@@ -164,17 +192,41 @@ implements RpcClient {
 
     assertReady();
 
-    CallFuture<Status> callFuture = new CallFuture<Status>();
+    final CallFuture<Status> callFuture = new CallFuture<Status>();
+
+    final AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+    avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
+    avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
 
+    Future<Void> handshake;
     try {
-      AvroFlumeEvent avroEvent = new AvroFlumeEvent();
-      avroEvent.setBody(ByteBuffer.wrap(event.getBody()));
-      avroEvent.setHeaders(toCharSeqMap(event.getHeaders()));
-      avroClient.append(avroEvent, callFuture);
-    } catch (IOException ex) {
-      logger.error("RPC request IO exception. " , ex);
-      throw new EventDeliveryException("RPC request IO exception. " +
-          "Exception follows.", ex);
+      // due to AVRO-1122, avroClient.append() may block
+      handshake = callTimeoutPool.submit(new Callable<Void>() {
+
+        @Override
+        public Void call() throws Exception {
+          avroClient.append(avroEvent, callFuture);
+          return null;
+        }
+      });
+    } catch (RejectedExecutionException ex) {
+      throw new EventDeliveryException(this + ": Executor error", ex);
+    }
+
+    try {
+      handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException ex) {
+      throw new EventDeliveryException(this + ": Handshake timed out", ex);
+    } catch (InterruptedException ex) {
+      throw new EventDeliveryException(this + ": Interrupted in handshake", ex);
+    } catch (ExecutionException ex) {
+      throw new EventDeliveryException(this + ": RPC request exception", ex);
+    } catch (CancellationException ex) {
+      throw new EventDeliveryException(this + ": RPC request cancelled", ex);
+    } finally {
+      if (!handshake.isDone()) {
+        handshake.cancel(true);
+      }
     }
 
     waitForStatusOK(callFuture, timeout, tu);
@@ -185,11 +237,15 @@ implements RpcClient {
     try {
       appendBatch(events, requestTimeout,
           TimeUnit.MILLISECONDS);
-    } catch (Exception e) {
+
+    } catch (Throwable t) {
       // we mark as no longer active without trying to clean up resources
       // client is required to call close() to clean up resources
       setState(ConnState.DEAD);
-      throw new EventDeliveryException(e);
+      if (t instanceof Error) {
+        throw (Error) t;
+      }
+      throw new EventDeliveryException(this + ": Failed to send batch", t);
     }
   }
 
@@ -199,7 +255,7 @@ implements RpcClient {
     assertReady();
 
     Iterator<Event> iter = events.iterator();
-    List<AvroFlumeEvent> avroEvents = new LinkedList<AvroFlumeEvent>();
+    final List<AvroFlumeEvent> avroEvents = new LinkedList<AvroFlumeEvent>();
 
     // send multiple batches... bail if there is a problem at any time
     while (iter.hasNext()) {
@@ -213,13 +269,38 @@ implements RpcClient {
         avroEvents.add(avroEvent);
       }
 
-      CallFuture<Status> callFuture = new CallFuture<Status>();
+      final CallFuture<Status> callFuture = new CallFuture<Status>();
+
+      Future<Void> handshake;
       try {
-        avroClient.appendBatch(avroEvents, callFuture);
-      } catch (IOException ex) {
-        logger.error("RPC request IO exception. " , ex);
-        throw new EventDeliveryException("RPC request IO exception. " +
-            "Exception follows.", ex);
+        // due to AVRO-1122, avroClient.appendBatch() may block
+        handshake = callTimeoutPool.submit(new Callable<Void>() {
+
+          @Override
+          public Void call() throws Exception {
+            avroClient.appendBatch(avroEvents, callFuture);
+            return null;
+          }
+        });
+      } catch (RejectedExecutionException ex) {
+        throw new EventDeliveryException(this + ": Executor error", ex);
+      }
+
+      try {
+        handshake.get(connectTimeout, TimeUnit.MILLISECONDS);
+      } catch (TimeoutException ex) {
+        throw new EventDeliveryException(this + ": Handshake timed out", ex);
+      } catch (InterruptedException ex) {
+        throw new EventDeliveryException(this + ": Interrupted in handshake",
+            ex);
+      } catch (ExecutionException ex) {
+        throw new EventDeliveryException(this + ": RPC request exception", ex);
+      } catch (CancellationException ex) {
+        throw new EventDeliveryException(this + ": RPC request cancelled", ex);
+      } finally {
+        if (!handshake.isDone()) {
+          handshake.cancel(true);
+        }
       }
 
       waitForStatusOK(callFuture, timeout, tu);
@@ -234,31 +315,24 @@ implements RpcClient {
    * @param tu Time Unit of {@code timeout}
    * @throws EventDeliveryException If there is a timeout or if Status != OK
    */
-  private static void waitForStatusOK(CallFuture<Status> callFuture,
+  private void waitForStatusOK(CallFuture<Status> callFuture,
       long timeout, TimeUnit tu) throws EventDeliveryException {
     try {
       Status status = callFuture.get(timeout, tu);
       if (status != Status.OK) {
-        logger.error("Status (" + status + ") is not OK");
-        throw new EventDeliveryException("Status (" + status + ") is not OK");
+        throw new EventDeliveryException(this + ": Avro RPC call returned " +
+            "Status: " + status);
       }
     } catch (CancellationException ex) {
-      logger.error("RPC future was cancelled." , ex);
-      throw new EventDeliveryException("RPC future was cancelled." +
-          " Exception follows.", ex);
+      throw new EventDeliveryException(this + ": RPC future was cancelled", ex);
     } catch (ExecutionException ex) {
-      logger.error("Exception thrown from remote handler." , ex);
-      throw new EventDeliveryException("Exception thrown from remote handler." +
-          " Exception follows.", ex);
+      throw new EventDeliveryException(this + ": Exception thrown from " +
+          "remote handler", ex);
     } catch (TimeoutException ex) {
-      logger.error("RPC request timed out." , ex);
-      throw new EventDeliveryException("RPC request timed out." +
-          " Exception follows.", ex);
+      throw new EventDeliveryException(this + ": RPC request timed out", ex);
     } catch (InterruptedException ex) {
-      logger.error("RPC request interrupted." , ex);
       Thread.currentThread().interrupt();
-      throw new EventDeliveryException("RPC request interrupted." +
-          " Exception follows.", ex);
+      throw new EventDeliveryException(this + ": RPC request interrupted", ex);
     }
   }
 
@@ -274,7 +348,6 @@ implements RpcClient {
     stateLock.lock();
     try {
       if (connState == ConnState.DEAD && connState != newState) {
-        logger.error("Cannot transition from CLOSED state.");
         throw new IllegalStateException("Cannot transition from CLOSED state.");
       }
       connState = newState;
@@ -291,7 +364,6 @@ implements RpcClient {
     try {
       ConnState curState = connState;
       if (curState != ConnState.READY) {
-        logger.error("RPC failed, client in an invalid state: " + curState);
         throw new EventDeliveryException("RPC failed, client in an invalid " +
             "state: " + curState);
       }
@@ -346,8 +418,6 @@ implements RpcClient {
     stateLock.lock();
     try{
       if(connState == ConnState.READY || connState == ConnState.DEAD){
-        logger.error("This client was already configured, " +
-            "cannot reconfigure.");
         throw new FlumeException("This client was already configured, " +
             "cannot reconfigure.");
       }
@@ -364,7 +434,7 @@ implements RpcClient {
         batchSize = Integer.parseInt(strbatchSize);
       } catch (NumberFormatException e) {
         logger.warn("Batchsize is not valid for RpcClient: " + strbatchSize +
-            ".Default value assigned.", e);
+            ". Default value assigned.", e);
       }
     }
 
@@ -375,8 +445,7 @@ implements RpcClient {
     if (hostNames != null && !hostNames.isEmpty()) {
       hosts = hostNames.split("\\s+");
     } else {
-      logger.error("Hosts list is invalid: "+ hostNames);
-      throw new FlumeException("Hosts list is invalid: "+ hostNames);
+      throw new FlumeException("Hosts list is invalid: " + hostNames);
     }
 
     if (hosts.length > 1) {
@@ -388,20 +457,17 @@ implements RpcClient {
     String host = properties.getProperty(
         RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX+hosts[0]);
     if (host == null || host.isEmpty()) {
-      logger.error("Host not found: " + hosts[0]);
       throw new FlumeException("Host not found: " + hosts[0]);
     }
     String[] hostAndPort = host.split(":");
     if (hostAndPort.length != 2){
-      logger.error("Invalid hostname, " + hosts[0]);
-      throw new FlumeException("Invalid hostname, " + hosts[0]);
+      throw new FlumeException("Invalid hostname: " + hosts[0]);
     }
     Integer port = null;
     try {
       port = Integer.parseInt(hostAndPort[1]);
     } catch (NumberFormatException e) {
-      logger.error("Invalid Port:" + hostAndPort[1], e);
-      throw new FlumeException("Invalid Port:" + hostAndPort[1], e);
+      throw new FlumeException("Invalid Port: " + hostAndPort[1], e);
     }
     this.address = new InetSocketAddress(hostAndPort[0], port);
 

Modified: incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java?rev=1354795&r1=1354794&r2=1354795&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
(original)
+++ incubator/flume/trunk/flume-ng-sdk/src/main/java/org/apache/flume/api/RpcClientFactory.java
Thu Jun 28 02:02:01 2012
@@ -145,7 +145,7 @@ public class RpcClientFactory {
 
     private ClientType(String className) {
       this.clientClassName = className;
-  }
+    }
 
     protected String getClientClassName() {
       return this.clientClassName;



Mime
View raw message