hudi-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From le...@apache.org
Subject [incubator-hudi] branch master updated: [HUDI-209] Implement JMX metrics reporter (#1106)
Date Thu, 19 Mar 2020 12:10:45 GMT
This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e321c2  [HUDI-209] Implement JMX metrics reporter (#1106)
1e321c2 is described below

commit 1e321c2fc0f9fedcbb8a0339c90a28a9be5afd64
Author: ForwardXu <forwardxu315@gmail.com>
AuthorDate: Thu Mar 19 20:10:35 2020 +0800

    [HUDI-209] Implement JMX metrics reporter (#1106)
---
 hudi-client/pom.xml                                |   4 +
 .../apache/hudi/config/HoodieMetricsConfig.java    |  16 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +-
 .../hudi/metrics/InMemoryMetricsReporter.java      |   5 +
 .../apache/hudi/metrics/JmxMetricsReporter.java    |  95 ++++++++----
 .../org/apache/hudi/metrics/JmxReporterServer.java | 160 +++++++++++++++++++++
 .../main/java/org/apache/hudi/metrics/Metrics.java |   4 +-
 .../hudi/metrics/MetricsGraphiteReporter.java      |   7 +
 .../org/apache/hudi/metrics/MetricsReporter.java   |   5 +
 .../hudi/metrics/MetricsReporterFactory.java       |   2 +-
 .../apache/hudi/metrics/TestHoodieJmxMetrics.java  |  27 ++--
 pom.xml                                            |   5 +
 12 files changed, 292 insertions(+), 42 deletions(-)

diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 06d6017..3cab43a 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -117,6 +117,10 @@
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-jmx</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>com.beust</groupId>
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index a21e4cc..b17e935 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -101,6 +101,16 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder toJmxHost(String host) {
+      props.setProperty(JMX_HOST, host);
+      return this;
+    }
+
+    public Builder onJmxPort(String port) {
+      props.setProperty(JMX_PORT, port);
+      return this;
+    }
+
     public Builder usePrefix(String prefix) {
       props.setProperty(GRAPHITE_METRIC_PREFIX, prefix);
       return this;
@@ -115,8 +125,10 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
           DEFAULT_GRAPHITE_SERVER_HOST);
       setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
           String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
-      setDefaultOnCondition(props, !props.containsKey(GRAPHITE_SERVER_PORT), GRAPHITE_SERVER_PORT,
-          String.valueOf(DEFAULT_GRAPHITE_SERVER_PORT));
+      setDefaultOnCondition(props, !props.containsKey(JMX_HOST), JMX_HOST,
+          DEFAULT_JMX_HOST);
+      setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT,
+          String.valueOf(DEFAULT_JMX_PORT));
       return config;
     }
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 04c1dfd..2d323a3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -500,8 +500,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return props.getProperty(HoodieMetricsConfig.JMX_HOST);
   }
 
-  public int getJmxPort() {
-    return Integer.parseInt(props.getProperty(HoodieMetricsConfig.JMX_PORT));
+  public String getJmxPort() {
+    return props.getProperty(HoodieMetricsConfig.JMX_PORT);
   }
 
   /**
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
b/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
index a0221c8..a145024 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/InMemoryMetricsReporter.java
@@ -35,4 +35,9 @@ public class InMemoryMetricsReporter extends MetricsReporter {
   public Closeable getReporter() {
     return null;
   }
+
+  @Override
+  public void stop() {
+
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
index 921dcea..c7c596c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxMetricsReporter.java
@@ -18,45 +18,53 @@
 
 package org.apache.hudi.metrics;
 
+import com.google.common.base.Preconditions;
+import java.lang.management.ManagementFactory;
+import javax.management.MBeanServer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
-import javax.management.remote.JMXConnectorServer;
-import javax.management.remote.JMXConnectorServerFactory;
-import javax.management.remote.JMXServiceURL;
+import com.codahale.metrics.MetricRegistry;
 
 import java.io.Closeable;
-import java.lang.management.ManagementFactory;
-import java.rmi.registry.LocateRegistry;
-import java.util.Objects;
+import java.util.stream.IntStream;
 
 /**
  * Implementation of Jmx reporter, which used to report jmx metric.
  */
 public class JmxMetricsReporter extends MetricsReporter {
 
-  private static final Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
-  private final JMXConnectorServer connector;
+  private static final org.apache.log4j.Logger LOG = LogManager.getLogger(JmxMetricsReporter.class);
+
+  private final MetricRegistry registry;
+  private JmxReporterServer jmxReporterServer;
 
-  public JmxMetricsReporter(HoodieWriteConfig config) {
+  public JmxMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) {
     try {
+      this.registry = registry;
       // Check the host and port here
       String host = config.getJmxHost();
-      int port = config.getJmxPort();
-      if (host == null || port == 0) {
-        throw new RuntimeException(
+      String portsConfig = config.getJmxPort();
+      if (host == null || portsConfig == null) {
+        throw new HoodieException(
             String.format("Jmx cannot be initialized with host[%s] and port[%s].",
-                    host, port));
+                host, portsConfig));
+      }
+      int[] ports = getPortRangeFromString(portsConfig);
+      boolean successfullyStartedServer = false;
+      for (int port : ports) {
+        jmxReporterServer = createJmxReport(host, port);
+        LOG.info("Started JMX server on port " + port + ".");
+        successfullyStartedServer = true;
+        break;
+      }
+      if (!successfullyStartedServer) {
+        throw new HoodieException(
+            "Could not start JMX server on any configured port. Ports: " + portsConfig);
       }
-      LocateRegistry.createRegistry(port);
-      String serviceUrl =
-          "service:jmx:rmi://" + host + ":" + port + "/jndi/rmi://" + host + ":" + port +
"/jmxrmi";
-      JMXServiceURL url = new JMXServiceURL(serviceUrl);
-      this.connector = JMXConnectorServerFactory
-          .newJMXConnectorServer(url, null, ManagementFactory.getPlatformMBeanServer());
+      LOG.info("Configured JMXReporter with {port:" + portsConfig + "}");
     } catch (Exception e) {
       String msg = "Jmx initialize failed: ";
       LOG.error(msg, e);
@@ -66,11 +74,10 @@ public class JmxMetricsReporter extends MetricsReporter {
 
   @Override
   public void start() {
-    try {
-      Objects.requireNonNull(connector, "Cannot start as the jmxReporter is null.");
-      connector.start();
-    } catch (Exception e) {
-      throw new HoodieException(e);
+    if (jmxReporterServer != null) {
+      jmxReporterServer.start();
+    } else {
+      LOG.error("Cannot start as the jmxReporter is null.");
     }
   }
 
@@ -80,6 +87,42 @@ public class JmxMetricsReporter extends MetricsReporter {
 
   @Override
   public Closeable getReporter() {
-    return null;
+    return jmxReporterServer.getReporter();
+  }
+
+  @Override
+  public void stop() {
+    Preconditions.checkNotNull(jmxReporterServer, "jmxReporterServer is not running.");
+    try {
+      jmxReporterServer.stop();
+    } catch (Exception e) {
+      throw new HoodieException("Stop jmxReporterServer fail", e);
+    }
+  }
+
+  private JmxReporterServer createJmxReport(String host, int port) {
+    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
+    return JmxReporterServer.forRegistry(registry)
+        .host(host)
+        .port(port)
+        .registerWith(mBeanServer)
+        .build();
+  }
+
+  private int[] getPortRangeFromString(String portsConfig) {
+    String range = portsConfig.trim();
+    int dashIdx = range.indexOf('-');
+    final int start;
+    final int end;
+    if (dashIdx == -1) {
+      start = Integer.parseInt(range);
+      end = Integer.parseInt(range);
+    } else {
+      start = Integer.parseInt(range.substring(0, dashIdx));
+      end = Integer.parseInt(range.substring(dashIdx + 1));
+    }
+    return IntStream.rangeClosed(start, end)
+        .filter(port -> (0 <= port && port <= 65535))
+        .toArray();
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
new file mode 100644
index 0000000..e055af6
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/JmxReporterServer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.jmx.JmxReporter;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.rmi.NoSuchObjectException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.registry.Registry;
+import java.rmi.server.UnicastRemoteObject;
+import javax.management.MBeanServer;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * A reporter which publishes metric values to a JMX server.
+ */
+public class JmxReporterServer {
+
+  /**
+   * Returns a new {@link JmxReporterServer.Builder} for {@link JmxReporterServer}.
+   *
+   * @param registry the registry to report
+   * @return a {@link JmxReporterServer.Builder} instance for a {@link JmxReporterServer}
+   */
+  public static JmxReporterServer.Builder forRegistry(MetricRegistry registry) {
+    return new JmxReporterServer.Builder(registry);
+  }
+
+  /**
+   * A builder for {@link JmxReporterServer} instances.
+   */
+  public static class Builder {
+
+    private final MetricRegistry registry;
+    private MBeanServer mBeanServer;
+    private String host;
+    private int port;
+
+    private Builder(MetricRegistry registry) {
+      this.registry = registry;
+    }
+
+    public JmxReporterServer.Builder host(String host) {
+      this.host = host;
+      return this;
+    }
+
+    public JmxReporterServer.Builder port(int port) {
+      this.port = port;
+      return this;
+    }
+
+    public JmxReporterServer.Builder registerWith(MBeanServer mBeanServer) {
+      this.mBeanServer = mBeanServer;
+      return this;
+    }
+
+    public JmxReporterServer build() {
+      Preconditions.checkNotNull(registry, "registry cannot be null!");
+      Preconditions.checkNotNull(mBeanServer, "mBeanServer cannot be null!");
+      Preconditions
+          .checkArgument(!StringUtils.isNullOrEmpty(host), "host cannot be null or empty!");
+      return new JmxReporterServer(registry, host, port, mBeanServer);
+    }
+  }
+
+  private JMXConnectorServer connector;
+  private Registry rmiRegistry;
+  private JmxReporter reporter;
+
+  protected JmxReporterServer(MetricRegistry registry, String host, int port,
+      MBeanServer mBeanServer) {
+    String serviceUrl =
+        "service:jmx:rmi://localhost:" + port + "/jndi/rmi://" + host + ":" + port + "/jmxrmi";
+    try {
+      JMXServiceURL url = new JMXServiceURL(serviceUrl);
+      connector = JMXConnectorServerFactory
+          .newJMXConnectorServer(url, null, mBeanServer);
+      rmiRegistry = LocateRegistry.createRegistry(port);
+      reporter = JmxReporter.forRegistry(registry).registerWith(mBeanServer).build();
+    } catch (Exception e) {
+      throw new HoodieException("Jmx service url created " + serviceUrl, e);
+    }
+  }
+
+  public JmxReporter getReporter() {
+    return reporter;
+  }
+
+  public void start() {
+    Preconditions.checkArgument(reporter != null && connector != null,
+        "reporter or connector cannot be null!");
+    try {
+      connector.start();
+      reporter.start();
+    } catch (Exception e) {
+      throw new HoodieException("connector or reporter start failed", e);
+    }
+  }
+
+  public void stop() throws IOException {
+    stopConnector();
+    stopReport();
+    stopRmiRegistry();
+  }
+
+  private void stopRmiRegistry() {
+    if (rmiRegistry != null) {
+      try {
+        UnicastRemoteObject.unexportObject(rmiRegistry, true);
+      } catch (NoSuchObjectException e) {
+        throw new HoodieException("Could not un-export our RMI registry", e);
+      } finally {
+        rmiRegistry = null;
+      }
+    }
+  }
+
+  private void stopConnector() throws IOException {
+    if (connector != null) {
+      try {
+        connector.stop();
+      } finally {
+        connector = null;
+      }
+    }
+  }
+
+  private void stopReport() {
+    if (reporter != null) {
+      try {
+        reporter.stop();
+      } finally {
+        reporter = null;
+      }
+    }
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 533208f..2538133 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -33,6 +33,7 @@ import java.io.Closeable;
  * This is the main class of the metrics system.
  */
 public class Metrics {
+
   private static final Logger LOG = LogManager.getLogger(Metrics.class);
 
   private static volatile boolean initialized = false;
@@ -47,11 +48,12 @@ public class Metrics {
     if (reporter == null) {
       throw new RuntimeException("Cannot initialize Reporter.");
     }
-    // reporter.start();
+    reporter.start();
 
     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
       try {
         reporter.report();
+        reporter.stop();
         Closeables.close(reporter.getReporter(), true);
       } catch (Exception e) {
         e.printStackTrace();
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
index aac6c70..9855ac0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsGraphiteReporter.java
@@ -87,4 +87,11 @@ public class MetricsGraphiteReporter extends MetricsReporter {
     return GraphiteReporter.forRegistry(registry).prefixedWith(reporterPrefix).convertRatesTo(TimeUnit.SECONDS)
         .convertDurationsTo(TimeUnit.MILLISECONDS).filter(MetricFilter.ALL).build(graphite);
   }
+
+  @Override
+  public void stop() {
+    if (graphiteReporter != null) {
+      graphiteReporter.stop();
+    }
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
index de52f35..773bb3b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporter.java
@@ -36,4 +36,9 @@ public abstract class MetricsReporter {
   public abstract void report();
 
   public abstract Closeable getReporter();
+
+  /**
+   * Stop this reporter. Should be used to stop channels, streams and release resources.
+   */
+  public abstract void stop();
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index b9d433d..8a3a592 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -42,7 +42,7 @@ public class MetricsReporterFactory {
         reporter = new InMemoryMetricsReporter();
         break;
       case JMX:
-        reporter = new JmxMetricsReporter(config);
+        reporter = new JmxMetricsReporter(config, registry);
         break;
       default:
         LOG.error("Reporter type[" + type + "] is not supported.");
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
index 72b218b..fa0f1da 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/TestHoodieJmxMetrics.java
@@ -18,10 +18,8 @@
 
 package org.apache.hudi.metrics;
 
-import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.hudi.metrics.Metrics.registerGauge;
@@ -34,20 +32,29 @@ import static org.mockito.Mockito.when;
  */
 public class TestHoodieJmxMetrics {
 
-  @Before
-  public void start() {
-    HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+  HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+
+  @Test
+  public void testRegisterGauge() {
     when(config.isMetricsOn()).thenReturn(true);
     when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
-    when(config.getJmxHost()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_HOST);
-    when(config.getJmxPort()).thenReturn(HoodieMetricsConfig.DEFAULT_JMX_PORT);
+    when(config.getJmxHost()).thenReturn("localhost");
+    when(config.getJmxPort()).thenReturn("9889");
     new HoodieMetrics(config, "raw_table");
+    registerGauge("jmx_metric1", 123L);
+    assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
+        .get("jmx_metric1").getValue().toString());
   }
 
   @Test
-  public void testRegisterGauge() {
-    registerGauge("jmx_metric", 123L);
+  public void testRegisterGaugeByRangerPort() {
+    when(config.isMetricsOn()).thenReturn(true);
+    when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.JMX);
+    when(config.getJmxHost()).thenReturn("localhost");
+    when(config.getJmxPort()).thenReturn("1000-5000");
+    new HoodieMetrics(config, "raw_table");
+    registerGauge("jmx_metric2", 123L);
     assertEquals("123", Metrics.getInstance().getRegistry().getGauges()
-        .get("jmx_metric").getValue().toString());
+        .get("jmx_metric2").getValue().toString());
   }
 }
diff --git a/pom.xml b/pom.xml
index 62427e5..758a049 100644
--- a/pom.xml
+++ b/pom.xml
@@ -516,6 +516,11 @@
         <artifactId>metrics-core</artifactId>
         <version>${metrics.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.dropwizard.metrics</groupId>
+        <artifactId>metrics-jmx</artifactId>
+        <version>${metrics.version}</version>
+      </dependency>
 
       <dependency>
         <groupId>com.beust</groupId>


Mime
View raw message