gobblin-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [incubator-gobblin] zxcware commented on a change in pull request #2622: Added random key generator for reporters. Added reporters fo…
Date Thu, 02 May 2019 04:20:53 GMT
zxcware commented on a change in pull request #2622: Added random key generator for reporters.
Added reporters fo…
URL: https://github.com/apache/incubator-gobblin/pull/2622#discussion_r280284823
 
 

 ##########
 File path: gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/KafkaReportingFormats.java
 ##########
 @@ -17,87 +17,170 @@
 
 package org.apache.gobblin.metrics;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Properties;
 
+import com.codahale.metrics.ScheduledReporter;
 import com.google.common.base.Splitter;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroEventReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroReporter;
 import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueEventObjectReporter;
+import org.apache.gobblin.metrics.kafka.KeyValueMetricObjectReporter;
 import org.apache.gobblin.metrics.kafka.KafkaReporter;
+import org.apache.gobblin.metrics.kafka.PusherUtils;
+import org.apache.gobblin.metrics.reporter.util.KafkaAvroReporterUtil;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
  * Kafka reporting formats enumeration.
  */
 public enum KafkaReportingFormats {
 
-  AVRO,
-  AVRO_KEY_VALUE,
-  JSON;
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder} for this reporting
format.
-   *
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaReporter.Builder}.
-   */
-  public KafkaReporter.Builder<?> metricReporterBuilder(Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return builder;
-      case JSON:
-        return KafkaReporter.BuilderFactory.newBuilder();
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+  AVRO() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties)
throws IOException {
+
+      KafkaAvroReporter.Builder<?> builder = KafkaAvroReporter.BuilderFactory.newBuilder();
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      builder.build(brokers, topic, properties);
+
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext
context, Properties properties) throws IOException {
+
+      KafkaAvroEventReporter.Builder<?> builder = KafkaAvroEventReporter.Factory.forContext(context);
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific keys with
a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+          PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+          ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+
     }
-  }
-
-  /**
-   * Get a {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder} for this reporting
format.
-   * @param context {@link MetricContext} that should be reported.
-   * @param properties {@link Properties} containing information to build reporters.
-   * @return {@link org.apache.gobblin.metrics.kafka.KafkaEventReporter.Builder}.
-   */
-  public KafkaEventReporter.Builder<?> eventReporterBuilder(MetricContext context,
Properties properties) {
-    switch (this) {
-      case AVRO:
-        KafkaAvroEventReporter.Builder<?> kafkaAvroEventReporterBuilder = KafkaAvroEventReporter.Factory.forContext(context);
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventReporterBuilder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventReporterBuilder;
-
-      case AVRO_KEY_VALUE:
-        KafkaAvroEventKeyValueReporter.Builder<?> kafkaAvroEventKeyValueReporterBuilder
= KafkaAvroEventKeyValueReporter.Factory.forContext(context);
-        if (properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
{
-          List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
-              .splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
-          kafkaAvroEventKeyValueReporterBuilder.withKeys(keys);
-        }
-        if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
-            ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
-          kafkaAvroEventKeyValueReporterBuilder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
-        }
-        return kafkaAvroEventKeyValueReporterBuilder;
-
-      case JSON:
-        return KafkaEventReporter.Factory.forContext(context);
-
-      default:
-        // This should never happen.
-        throw new IllegalArgumentException("KafkaReportingFormat not recognized.");
+  },
+  AVRO_KEY_VALUE() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties)
throws IOException {
+
+      throw new IOException("Unsupported format for Metric reporting " + this.name());
     }
-  }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext
context, Properties properties) throws IOException {
+
+      KafkaAvroEventKeyValueReporter.Builder<?> builder = KafkaAvroEventKeyValueReporter.Factory.forContext(context);
+      if (properties.containsKey(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS))
{
+        List<String> keys = Splitter.on(",").omitEmptyStrings().trimResults()
+            .splitToList(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_EVENTS_KAFKAPUSHERKEYS));
+        builder.withKeys(keys);
+      }
+      if (Boolean.valueOf(properties.getProperty(ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY,
+          ConfigurationKeys.DEFAULT_METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY))) {
+        builder.withSchemaRegistry(new KafkaAvroSchemaRegistry(properties));
+      }
+      String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+      builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific keys with
a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+          PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+          ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+      return builder.build(brokers, topic);
+
+    }
+  },
+  JSON() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties)
throws IOException {
+      KafkaReporter.Builder builder = KafkaReporter.BuilderFactory.newBuilder();
+      builder.build(brokers, topic, properties);
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext
context, Properties properties) throws IOException {
+       KafkaEventReporter.Builder builder = KafkaEventReporter.Factory.forContext(context);
+       //builder.withConfig(getEventsConfig(properties));
+       String pusherClassName = properties.containsKey(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          ? properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY_FOR_EVENTS)
+          : properties.getProperty(PusherUtils.KAFKA_PUSHER_CLASS_NAME_KEY,
+              PusherUtils.DEFAULT_KAFKA_PUSHER_CLASS_NAME);
+       builder.withPusherClassName(pusherClassName);
+
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      // the kafka configuration is composed of the metrics reporting specific keys with
a fallback to the shared
+      // kafka config
+      Config kafkaConfig = ConfigUtils.getConfigOrEmpty(allConfig,
+          PusherUtils.METRICS_REPORTING_KAFKA_CONFIG_PREFIX).withFallback(ConfigUtils.getConfigOrEmpty(allConfig,
+          ConfigurationKeys.SHARED_KAFKA_CONFIG_PREFIX));
+
+      builder.withConfig(kafkaConfig);
+
+       return builder.build(brokers, topic);
+    }
+  },
+  PLAIN_OBJECT() {
+
+    @Override
+    public void buildMetricsScheduledReporter(String brokers, String topic, Properties properties)
throws IOException {
+
+      KeyValueMetricObjectReporter.Builder<?> builder = KeyValueMetricObjectReporter.Factory.newBuilder();
+      builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      Config config = ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX).withFallback(allConfig);
+      builder.build(brokers, topic, config);
+
+    }
+
+    @Override
+    public ScheduledReporter buildEventsScheduledReporter(String brokers, String topic, MetricContext
context, Properties properties) throws IOException {
+
+      KeyValueEventObjectReporter.Builder<?> builder = KeyValueEventObjectReporter.Factory.forContext(context);
+      Config allConfig = ConfigUtils.propertiesToConfig(properties);
+      Config config = ConfigUtils.getConfigOrEmpty(allConfig, ConfigurationKeys.METRICS_REPORTING_EVENTS_CONFIGURATIONS_PREFIX).withFallback(allConfig);
+      builder.withConfig(config);
+      builder.namespaceOverride(KafkaAvroReporterUtil.extractOverrideNamespace(properties));
+      return builder.build(brokers, topic);
+    }
+  };
+
+  public abstract void buildMetricsScheduledReporter(String brokers, String topic, Properties
properties) throws IOException;
 
 Review comment:
   Consider a more concise name, `buildMetricsReporter`; the same applies to `buildEventsScheduledReporter`, which could become `buildEventsReporter`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message