Return-Path: X-Original-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-flume-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E5299D3F2 for ; Thu, 28 Jun 2012 00:43:11 +0000 (UTC) Received: (qmail 93368 invoked by uid 500); 28 Jun 2012 00:43:11 -0000 Delivered-To: apmail-incubator-flume-commits-archive@incubator.apache.org Received: (qmail 93325 invoked by uid 500); 28 Jun 2012 00:43:11 -0000 Mailing-List: contact flume-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: flume-dev@incubator.apache.org Delivered-To: mailing list flume-commits@incubator.apache.org Received: (qmail 93317 invoked by uid 99); 28 Jun 2012 00:43:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jun 2012 00:43:11 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 28 Jun 2012 00:43:08 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id EEF8623888CD; Thu, 28 Jun 2012 00:42:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1354787 - in /incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume: channel/ChannelProcessor.java sink/DefaultSinkFactory.java source/AvroSource.java Date: Thu, 28 Jun 2012 00:42:46 -0000 To: flume-commits@incubator.apache.org From: hshreedharan@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120628004246.EEF8623888CD@eris.apache.org> Author: hshreedharan Date: Thu Jun 28 00:42:45 2012 New Revision: 1354787 URL: http://svn.apache.org/viewvc?rev=1354787&view=rev Log: FLUME-1330. AvroSource should not use Fixed thread pool for boss threads when pool size is specified. Also made some logging updates. (Mike Percy via Hari Shreedharan) Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java?rev=1354787&r1=1354786&r2=1354787&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java Thu Jun 28 00:42:45 2012 @@ -186,14 +186,14 @@ public class ChannelProcessor implements tx.commit(); } catch (Throwable t) { - LOG.error("Caught exception during Transaction", t); tx.rollback(); - if (t instanceof ChannelException) { - throw (ChannelException) t; - } else if (t instanceof Error) { + if (t instanceof Error) { + LOG.error("Error while writing to required channel: " + + reqChannel, t); throw (Error) t; } else { - throw new ChannelException("Uncaught throwable", t); + throw new ChannelException("Unable to put batch on required " + + "channel: " + reqChannel, t); } } finally { if (tx != null) { @@ -218,7 +218,7 @@ public class ChannelProcessor implements tx.commit(); } catch (Throwable t) { tx.rollback(); - LOG.warn("Unable to put event on optional channel", t); + LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } @@ -251,21 +251,25 @@ public class ChannelProcessor implements // Process required channels List requiredChannels = selector.getRequiredChannels(event); - for (Channel requiredChannel : requiredChannels) { - Transaction tx = requiredChannel.getTransaction(); + for (Channel reqChannel : requiredChannels) { + Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); - requiredChannel.put(event); + reqChannel.put(event); tx.commit(); - } catch (ChannelException ex) { - tx.rollback(); - throw ex; - } catch (Exception e) { + } catch (Throwable t) { tx.rollback(); - throw new ChannelException("Unexpected error", e); + if (t instanceof Error) { + LOG.error("Error while writing to required channel: " + + reqChannel, t); + throw (Error) t; + } else { + throw new ChannelException("Unable to put event on required " + + "channel: " + reqChannel, t); + } } finally { if (tx != null) { tx.close(); @@ -275,19 +279,21 @@ public class ChannelProcessor implements // Process optional channels List optionalChannels = selector.getOptionalChannels(event); - for (Channel optionalChannel : optionalChannels) { + for (Channel optChannel : optionalChannels) { Transaction tx = null; try { - tx = optionalChannel.getTransaction(); + tx = optChannel.getTransaction(); tx.begin(); - optionalChannel.put(event); + optChannel.put(event); tx.commit(); - } catch (ChannelException ex) { + } catch (Throwable t) { tx.rollback(); - LOG.warn("Unable to put event on optional channel " - + optionalChannel.getName(), ex); + LOG.error("Unable to put event on optional channel: " + optChannel, t); + if (t instanceof Error) { + throw (Error) t; + } } finally { if (tx != null) { tx.close(); Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java?rev=1354787&r1=1354786&r2=1354787&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/sink/DefaultSinkFactory.java Thu Jun 28 00:42:45 2012 @@ -67,7 +67,7 @@ public class DefaultSinkFactory implemen throws FlumeException { Preconditions.checkNotNull(name); Preconditions.checkNotNull(type); - logger.info("Creating instance of sink {} type{}", name, type); + logger.info("Creating instance of sink: {}, type: {}", name, type); String sinkClassName = type; Modified: incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1354787&r1=1354786&r2=1354787&view=diff ============================================================================== --- incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original) +++ incubator/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Thu Jun 28 00:42:45 2012 @@ -135,7 +135,7 @@ public class AvroSource extends Abstract @Override public void start() { - logger.info("Avro source starting:{}", this); + logger.info("Starting {}...", this); Responder responder = new SpecificResponder(AvroSourceProtocol.class, this); if(maxThreads <= 0) { @@ -143,7 +143,8 @@ public class AvroSource extends Abstract new InetSocketAddress(bindAddress, port)); } else { server = new NettyServer(responder, new InetSocketAddress(bindAddress, port), - new NioServerSocketChannelFactory(Executors.newFixedThreadPool(maxThreads), + new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newFixedThreadPool(maxThreads))); } @@ -151,30 +152,31 @@ public class AvroSource extends Abstract super.start(); - logger.debug("Avro source started"); + logger.info("Avro source {} started.", getName()); } @Override public void stop() { - logger.info("Avro source stopping:{}", this); + logger.info("Avro source {} stopping: {}", getName(), this); server.close(); try { server.join(); } catch (InterruptedException e) { - logger - .info("Interrupted while waiting for Avro server to stop. Exiting."); + logger.info("Avro source " + getName() + ": Interrupted while waiting " + + "for Avro server to stop. Exiting. Exception follows.", e); } super.stop(); - logger.debug("Avro source stopped. Metrics:{}", counterGroup); + logger.info("Avro source {} stopped. Metrics: {}", getName(), counterGroup); } @Override public String toString() { - return "AvroSource: { bindAddress:" + bindAddress + " port:" + port + " }"; + return "Avro source " + getName() + ": { bindAddress: " + bindAddress + + ", port: " + port + " }"; } /** @@ -192,8 +194,7 @@ public class AvroSource extends Abstract @Override public Status append(AvroFlumeEvent avroEvent) { - logger.debug("Received avro event:{}", avroEvent); - + logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent); counterGroup.incrementAndGet("rpc.received"); Event event = EventBuilder.withBody(avroEvent.getBody().array(), @@ -202,7 +203,8 @@ public class AvroSource extends Abstract try { getChannelProcessor().processEvent(event); } catch (ChannelException ex) { - logger.warn("Unable to process event", ex); + logger.warn("Avro source " + getName() + ": Unable to process event. " + + "Exception follows.", ex); return Status.FAILED; } @@ -213,14 +215,16 @@ public class AvroSource extends Abstract @Override public Status appendBatch(List events) { - counterGroup.incrementAndGet("rpc.received.batch"); + logger.debug("Avro source {}: Received avro event batch of {} events.", + getName(), events.size()); + counterGroup.incrementAndGet("rpc.batch.received"); List batch = new ArrayList(); for (AvroFlumeEvent avroEvent : events) { Event event = EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); - counterGroup.incrementAndGet("rpc.events"); + counterGroup.incrementAndGet("rpc.batch.events"); batch.add(event); } @@ -228,11 +232,12 @@ public class AvroSource extends Abstract try { getChannelProcessor().processEventBatch(batch); } catch (ChannelException ex) { - logger.error("Unable to process event batch", ex); + logger.error("Avro source " + getName() + ": Unable to process event " + + "batch. Exception follows.", ex); return Status.FAILED; } - counterGroup.incrementAndGet("rpc.successful"); + counterGroup.incrementAndGet("rpc.batch.successful"); return Status.OK; }