flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-2082. JMX support for Seq Generator Source.
Date Fri, 21 Jun 2013 19:22:13 GMT
Updated Branches:
  refs/heads/trunk 862c83187 -> 7d131b6a0


FLUME-2082. JMX support for Seq Generator Source.

(Sravya Tirukkovalur via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7d131b6a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7d131b6a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7d131b6a

Branch: refs/heads/trunk
Commit: 7d131b6a0d123b7d7f515cc6a9da001626263a17
Parents: 862c831
Author: Mike Percy <mpercy@apache.org>
Authored: Fri Jun 21 12:21:27 2013 -0700
Committer: Mike Percy <mpercy@apache.org>
Committed: Fri Jun 21 12:21:27 2013 -0700

----------------------------------------------------------------------
 .../flume/source/SequenceGeneratorSource.java    | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7d131b6a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 0f85e87..51e021a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -23,12 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +40,13 @@ public class SequenceGeneratorSource extends AbstractSource implements
 
   private long sequence;
   private int batchSize;
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
   private List<Event> batchArrayList;
   private long totalEvents;
   private long eventsSent = 0;
 
   public SequenceGeneratorSource() {
     sequence = 0;
-    counterGroup = new CounterGroup();
   }
 
   /**
@@ -61,6 +60,9 @@ public class SequenceGeneratorSource extends AbstractSource implements
       batchArrayList = new ArrayList<Event>(batchSize);
     }
     totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @Override
@@ -73,6 +75,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
         if(eventsSent < totalEvents) {
           getChannelProcessor().processEvent(
             EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+          sourceCounter.incrementEventAcceptedCount();
           eventsSent++;
         } else {
           status = Status.BACKOFF;
@@ -90,11 +93,12 @@ public class SequenceGeneratorSource extends AbstractSource implements
         }
         if(!batchArrayList.isEmpty()) {
           getChannelProcessor().processEventBatch(batchArrayList);
+          sourceCounter.incrementAppendBatchAcceptedCount();
+          sourceCounter.addToEventAcceptedCount(batchArrayList.size());
         }
       }
-      counterGroup.incrementAndGet("events.successful");
+
     } catch (ChannelException ex) {
-      counterGroup.incrementAndGet("events.failed");
       eventsSent -= i;
       logger.error( getName() + " source could not write to channel.", ex);
     }
@@ -107,7 +111,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
     logger.info("Sequence generator source starting");
 
     super.start();
-
+    sourceCounter.start();
     logger.debug("Sequence generator source started");
   }
 
@@ -116,8 +120,9 @@ public class SequenceGeneratorSource extends AbstractSource implements
     logger.info("Sequence generator source stopping");
 
     super.stop();
+    sourceCounter.stop();
 
-    logger.info("Sequence generator source stopped. Metrics:{}", counterGroup);
+    logger.info("Sequence generator source stopped. Metrics:{}",getName(), sourceCounter);
   }
 
 }


Mime
View raw message