flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject svn commit: r1354787 - in /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume: channel/ChannelProcessor.java sink/DefaultSinkFactory.java source/AvroSource.java
Date Thu, 28 Jun 2012 00:42:46 GMT
Author: hshreedharan
Date: Thu Jun 28 00:42:45 2012
New Revision: 1354787

URL: http://svn.apache.org/viewvc?rev=1354787&view=rev
Log:
FLUME-1330. AvroSource should not use Fixed thread pool for boss threads when pool size is
specified. Also made some logging updates.

(Mike Percy via Hari Shreedharan)


Modified:
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
    incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java?rev=1354787&r1=1354786&r2=1354787&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java Thu Jun 28 00:42:45 2012
@@ -186,14 +186,14 @@ public class ChannelProcessor implements
 
         tx.commit();
       } catch (Throwable t) {
-        LOG.error("Caught exception during Transaction", t);
         tx.rollback();
-        if (t instanceof ChannelException) {
-          throw (ChannelException) t;
-        } else if (t instanceof Error) {
+        if (t instanceof Error) {
+          LOG.error("Error while writing to required channel: " +
+              reqChannel, t);
           throw (Error) t;
         } else {
-          throw new ChannelException("Uncaught throwable", t);
+          throw new ChannelException("Unable to put batch on required " +
+              "channel: " + reqChannel, t);
         }
       } finally {
         if (tx != null) {
@@ -218,7 +218,7 @@ public class ChannelProcessor implements
         tx.commit();
       } catch (Throwable t) {
         tx.rollback();
-        LOG.warn("Unable to put event on optional channel", t);
+        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
         if (t instanceof Error) {
           throw (Error) t;
         }
@@ -251,21 +251,25 @@ public class ChannelProcessor implements
 
     // Process required channels
     List<Channel> requiredChannels = selector.getRequiredChannels(event);
-    for (Channel requiredChannel : requiredChannels) {
-      Transaction tx = requiredChannel.getTransaction();
+    for (Channel reqChannel : requiredChannels) {
+      Transaction tx = reqChannel.getTransaction();
       Preconditions.checkNotNull(tx, "Transaction object must not be null");
       try {
         tx.begin();
 
-        requiredChannel.put(event);
+        reqChannel.put(event);
 
         tx.commit();
-      } catch (ChannelException ex) {
-        tx.rollback();
-        throw ex;
-      } catch (Exception e) {
+      } catch (Throwable t) {
         tx.rollback();
-        throw new ChannelException("Unexpected error", e);
+        if (t instanceof Error) {
+          LOG.error("Error while writing to required channel: " +
+              reqChannel, t);
+          throw (Error) t;
+        } else {
+          throw new ChannelException("Unable to put event on required " +
+              "channel: " + reqChannel, t);
+        }
       } finally {
         if (tx != null) {
           tx.close();
@@ -275,19 +279,21 @@ public class ChannelProcessor implements
 
     // Process optional channels
     List<Channel> optionalChannels = selector.getOptionalChannels(event);
-    for (Channel optionalChannel : optionalChannels) {
+    for (Channel optChannel : optionalChannels) {
       Transaction tx = null;
       try {
-        tx = optionalChannel.getTransaction();
+        tx = optChannel.getTransaction();
         tx.begin();
 
-        optionalChannel.put(event);
+        optChannel.put(event);
 
         tx.commit();
-      } catch (ChannelException ex) {
+      } catch (Throwable t) {
         tx.rollback();
-        LOG.warn("Unable to put event on optional channel "
-            + optionalChannel.getName(), ex);
+        LOG.error("Unable to put event on optional channel: " + optChannel, t);
+        if (t instanceof Error) {
+          throw (Error) t;
+        }
       } finally {
         if (tx != null) {
           tx.close();

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java?rev=1354787&r1=1354786&r2=1354787&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java Thu Jun 28 00:42:45 2012
@@ -67,7 +67,7 @@ public class DefaultSinkFactory implemen
       throws FlumeException {
     Preconditions.checkNotNull(name);
     Preconditions.checkNotNull(type);
-    logger.info("Creating instance of sink {} type{}", name, type);
+    logger.info("Creating instance of sink: {}, type: {}", name, type);
 
     String sinkClassName = type;
 

Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1354787&r1=1354786&r2=1354787&view=diff
==============================================================================
--- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Thu Jun 28 00:42:45 2012
@@ -135,7 +135,7 @@ public class AvroSource extends Abstract
 
   @Override
   public void start() {
-    logger.info("Avro source starting:{}", this);
+    logger.info("Starting {}...", this);
 
     Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
     if(maxThreads <= 0) {
@@ -143,7 +143,8 @@ public class AvroSource extends Abstract
               new InetSocketAddress(bindAddress, port));
     } else {
       server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
-              new NioServerSocketChannelFactory(Executors.newFixedThreadPool(maxThreads),
+              new NioServerSocketChannelFactory(
+                      Executors.newCachedThreadPool(),
                       Executors.newFixedThreadPool(maxThreads)));
     }
 
@@ -151,30 +152,31 @@ public class AvroSource extends Abstract
 
     super.start();
 
-    logger.debug("Avro source started");
+    logger.info("Avro source {} started.", getName());
   }
 
   @Override
   public void stop() {
-    logger.info("Avro source stopping:{}", this);
+    logger.info("Avro source {} stopping: {}", getName(), this);
 
     server.close();
 
     try {
       server.join();
     } catch (InterruptedException e) {
-      logger
-          .info("Interrupted while waiting for Avro server to stop. Exiting.");
+      logger.info("Avro source " + getName() + ": Interrupted while waiting " +
+          "for Avro server to stop. Exiting. Exception follows.", e);
     }
 
     super.stop();
 
-    logger.debug("Avro source stopped. Metrics:{}", counterGroup);
+    logger.info("Avro source {} stopped. Metrics: {}", getName(), counterGroup);
   }
 
   @Override
   public String toString() {
-    return "AvroSource: { bindAddress:" + bindAddress + " port:" + port + " }";
+    return "Avro source " + getName() + ": { bindAddress: " + bindAddress +
+        ", port: " + port + " }";
   }
 
   /**
@@ -192,8 +194,7 @@ public class AvroSource extends Abstract
 
   @Override
   public Status append(AvroFlumeEvent avroEvent) {
-    logger.debug("Received avro event:{}", avroEvent);
-
+    logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
     counterGroup.incrementAndGet("rpc.received");
 
     Event event = EventBuilder.withBody(avroEvent.getBody().array(),
@@ -202,7 +203,8 @@ public class AvroSource extends Abstract
     try {
       getChannelProcessor().processEvent(event);
     } catch (ChannelException ex) {
-      logger.warn("Unable to process event", ex);
+      logger.warn("Avro source " + getName() + ": Unable to process event. " +
+          "Exception follows.", ex);
       return Status.FAILED;
     }
 
@@ -213,14 +215,16 @@ public class AvroSource extends Abstract
 
   @Override
   public Status appendBatch(List<AvroFlumeEvent> events) {
-    counterGroup.incrementAndGet("rpc.received.batch");
+    logger.debug("Avro source {}: Received avro event batch of {} events.",
+        getName(), events.size());
+    counterGroup.incrementAndGet("rpc.batch.received");
 
     List<Event> batch = new ArrayList<Event>();
 
     for (AvroFlumeEvent avroEvent : events) {
       Event event = EventBuilder.withBody(avroEvent.getBody().array(),
           toStringMap(avroEvent.getHeaders()));
-      counterGroup.incrementAndGet("rpc.events");
+      counterGroup.incrementAndGet("rpc.batch.events");
 
       batch.add(event);
     }
@@ -228,11 +232,12 @@ public class AvroSource extends Abstract
     try {
       getChannelProcessor().processEventBatch(batch);
     } catch (ChannelException ex) {
-      logger.error("Unable to process event batch", ex);
+      logger.error("Avro source " + getName() + ": Unable to process event " +
+          "batch. Exception follows.", ex);
       return Status.FAILED;
     }
 
-    counterGroup.incrementAndGet("rpc.successful");
+    counterGroup.incrementAndGet("rpc.batch.successful");
 
     return Status.OK;
   }



Mime
View raw message