flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1567: Avro source should expose the number of active connections through JMX
Date Wed, 26 Sep 2012 18:29:48 GMT
Updated Branches:
  refs/heads/trunk 3a4349503 -> bcf8e4324


FLUME-1567: Avro source should expose the number of active connections through JMX

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bcf8e432
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bcf8e432
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bcf8e432

Branch: refs/heads/trunk
Commit: bcf8e4324dc45141e63385e298f59854c62bc2e7
Parents: 3a43495
Author: Brock Noland <brock@apache.org>
Authored: Wed Sep 26 13:29:15 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Wed Sep 26 13:29:15 2012 -0500

----------------------------------------------------------------------
 .../flume/instrumentation/SourceCounter.java       |   14 +++++++-
 .../flume/instrumentation/SourceCounterMBean.java  |    2 +
 .../java/org/apache/flume/source/AvroSource.java   |   26 +++++++++++++-
 pom.xml                                            |    6 ++--
 4 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bcf8e432/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index 7d69182..972d2c6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -35,13 +35,17 @@ public class SourceCounter extends MonitoredCounterGroup implements
       "src.append-batch.received";
   private static final String COUNTER_APPEND_BATCH_ACCEPTED =
       "src.append-batch.accepted";
+  
+  private static final String COUNTER_OPEN_CONNECTION_COUNT =
+          "src.open-connection.count";
 
 
   private static final String[] ATTRIBUTES =
     {
       COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
       COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
-      COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED
+      COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED,
+      COUNTER_OPEN_CONNECTION_COUNT
     };
 
 
@@ -110,4 +114,12 @@ public class SourceCounter extends MonitoredCounterGroup implements
   public long incrementAppendBatchAcceptedCount() {
     return increment(COUNTER_APPEND_BATCH_ACCEPTED);
   }
+
+  public long getOpenConnectionCount() {
+    return get(COUNTER_OPEN_CONNECTION_COUNT);
+  }
+
+  public void setOpenConnectionCount(long openConnectionCount){
+    set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bcf8e432/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
index 792e689..5ccbed4 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
@@ -43,4 +43,6 @@ public interface SourceCounterMBean {
   long getStopTime();
 
   String getType();
+
+  long getOpenConnectionCount();
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/bcf8e432/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index e91af9e..47ccf9f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -19,12 +19,15 @@
 
 package org.apache.flume.source;
 
+import com.google.common.base.Throwables;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Responder;
@@ -116,6 +119,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
   private SourceCounter sourceCounter;
 
   private int maxThreads;
+  private ScheduledExecutorService connectionCountUpdater;
 
   @Override
   public void configure(Context context) {
@@ -147,10 +151,19 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
                       Executors.newCachedThreadPool(),
                       Executors.newFixedThreadPool(maxThreads)));
     }
-
+    connectionCountUpdater = Executors.newSingleThreadScheduledExecutor();
     server.start();
     sourceCounter.start();
     super.start();
+    final NettyServer srv = (NettyServer)server;
+    connectionCountUpdater.scheduleWithFixedDelay(new Runnable(){
+
+      @Override
+      public void run() {
+        sourceCounter.setOpenConnectionCount(
+                Long.valueOf(srv.getNumActiveConnections()));
+      }
+    }, 0, 60, TimeUnit.SECONDS);
 
     logger.info("Avro source {} started.", getName());
   }
@@ -168,8 +181,17 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
           "for Avro server to stop. Exiting. Exception follows.", e);
     }
     sourceCounter.stop();
+    connectionCountUpdater.shutdown();
+    while(!connectionCountUpdater.isTerminated()){
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ex) {
+        logger.error("Interrupted while waiting for connection count executor "
+                + "to terminate", ex);
+        Throwables.propagate(ex);
+      }
+    }
     super.stop();
-
     logger.info("Avro source {} stopped. Metrics: {}", getName(),
         sourceCounter);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/bcf8e432/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f34c808..e19d2d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -652,19 +652,19 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
-        <version>1.7.1</version>
+        <version>1.7.2</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-compiler</artifactId>
-        <version>1.7.1</version>
+        <version>1.7.2</version>
       </dependency>
 
       <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-ipc</artifactId>
-        <version>1.7.1</version>
+        <version>1.7.2</version>
         <exclusions>
           <exclusion>
             <groupId>org.mortbay.jetty</groupId>


Mime
View raw message