metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject metron git commit: METRON-950: Migrate storm-kafka-client to 1.1 closes apache/incubator-metron#584
Date Tue, 16 May 2017 15:02:58 GMT
Repository: metron
Updated Branches:
  refs/heads/master 1277b6c32 -> 6c836d136


METRON-950: Migrate storm-kafka-client to 1.1 closes apache/incubator-metron#584


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/6c836d13
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/6c836d13
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/6c836d13

Branch: refs/heads/master
Commit: 6c836d13635fcf1639ea7b8adbaf1ba4051c4f1c
Parents: 1277b6c
Author: cstella <cestella@gmail.com>
Authored: Tue May 16 11:02:43 2017 -0400
Committer: cstella <cestella@gmail.com>
Committed: Tue May 16 11:02:43 2017 -0400

----------------------------------------------------------------------
 .../parsers/topology/ParserTopologyBuilder.java |   3 +-
 .../metron-storm-kafka-override/pom.xml         | 101 +++++++++++++++++++
 .../storm/kafka/spout/internal/Timer.java       |  58 +++++++++++
 .../storm/kafka/spout/internal/TimerTest.java   |  36 +++++++
 metron-platform/metron-storm-kafka/pom.xml      |   5 +
 .../kafka/flux/SimpleStormKafkaBuilder.java     |  90 +++++++++--------
 .../storm/kafka/flux/SpoutConfiguration.java    |  20 ++--
 .../kafka/flux/SpoutConfigurationTest.java      |  13 +--
 metron-platform/pom.xml                         |   1 +
 pom.xml                                         |  42 ++++----
 10 files changed, 289 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index e9acbaa..196c19d 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.parsers.topology;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
@@ -124,7 +125,7 @@ public class ParserTopologyBuilder {
     kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
             , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString()
     );
-    kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID
+    kafkaSpoutConfigOptions.putIfAbsent( ConsumerConfig.GROUP_ID_CONFIG
             , inputTopic + "_parser"
     );
     if(securityProtocol.isPresent()) {

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
new file mode 100644
index 0000000..8683176
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.4.0</version>
+    </parent>
+    <artifactId>metron-storm-kafka-override</artifactId>
+    <name>metron-storm-kafka-override</name>
+    <description>Components that extend the Storm/Kafka spout</description>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <commons.config.version>1.10</commons.config.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka-client</artifactId>
+            <version>${global_storm_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${global_kafka_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${global_storm_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${global_kafka_version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
new file mode 100644
index 0000000..f9782ab
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.utils.Time;
+
+public class Timer {
+  private final long delay;
+  private final long period;
+  private final TimeUnit timeUnit;
+  private final long periodNanos;
+  private long start;
+
+  public Timer(long delay, long period, TimeUnit timeUnit) {
+    this.delay = delay;
+    this.period = period;
+    this.timeUnit = timeUnit;
+    this.periodNanos = timeUnit.toNanos(period);
+    this.start = System.nanoTime() + timeUnit.toNanos(delay);
+  }
+
+  public long period() {
+    return this.period;
+  }
+
+  public long delay() {
+    return this.delay;
+  }
+
+  public TimeUnit getTimeUnit() {
+    return this.timeUnit;
+  }
+
+  public boolean isExpiredResetOnTrue() {
+    boolean expired = System.nanoTime() - this.start >= this.periodNanos;
+    if(expired) {
+      this.start = System.nanoTime();
+    }
+
+    return expired;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
new file mode 100644
index 0000000..0d49ae1
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/src/test/java/org/apache/storm/kafka/spout/internal/TimerTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+public class TimerTest {
+
+  @Test
+  public void testReset() throws InterruptedException {
+    Timer t = new Timer(0, 2, TimeUnit.SECONDS);
+    Thread.sleep(1000);
+    Assert.assertFalse(t.isExpiredResetOnTrue());
+    Thread.sleep(1000);
+    Assert.assertTrue(t.isExpiredResetOnTrue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index b8e3f8d..5c28b34 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -31,6 +31,11 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-storm-kafka-override</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-kafka-client</artifactId>
             <version>${global_storm_kafka_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index bf5250b..592859e 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -20,8 +20,10 @@ package org.apache.metron.storm.kafka.flux;
 
 import com.google.common.base.Joiner;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.storm.kafka.spout.*;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -30,10 +32,7 @@ import org.apache.storm.topology.OutputFieldsGetter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Function;
 
 /**
@@ -47,7 +46,6 @@ import java.util.function.Function;
  * @param <V> The kafka value type
  */
 public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
-  final static String STREAM = "default";
 
   /**
    * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
@@ -113,11 +111,12 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
    * @param <K> The key type in kafka
    * @param <V> The value type in kafka
    */
-  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+  public static class SpoutRecordTranslator<K, V> implements RecordTranslator<K,V> {
     private List<FieldsConfiguration> configurations;
-    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
-      super(topic);
+    private Fields fields;
+    private SpoutRecordTranslator(List<FieldsConfiguration> configurations) {
       this.configurations = configurations;
+      this.fields = FieldsConfiguration.getFields(configurations);
     }
 
     /**
@@ -127,15 +126,27 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
      * @return list of tuples
      */
     @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+    public List<Object> apply(ConsumerRecord<K, V> consumerRecord) {
       Values ret = new Values();
       for(FieldsConfiguration config : configurations) {
         ret.add(config.recordExtractor.apply(consumerRecord));
       }
       return ret;
     }
+
+    @Override
+    public Fields getFieldsFor(String s) {
+      return fields;
+    }
+
+    @Override
+    public List<String> streams() {
+      return DEFAULT_STREAM;
+    }
   }
 
+  public static String DEFAULT_DESERIALIZER = ByteArrayDeserializer.class.getName();
+
   private String topic;
 
   /**
@@ -165,13 +176,39 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
                                 , List<String> fieldsConfiguration
                                 )
   {
-    super( modifyKafkaProps(kafkaProps, zkQuorum)
-         , createStreams(fieldsConfiguration, topic)
-         , createTuplesBuilder(fieldsConfiguration, topic)
-         );
+    super( getBootstrapServers(zkQuorum, kafkaProps)
+         , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER)
+         , createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)), DEFAULT_DESERIALIZER)
+         , topic
+    );
+    setProp(kafkaProps);
+    setRecordTranslator(new SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration)));
     this.topic = topic;
   }
 
+  private static <T> Class<Deserializer<T>> createDeserializer( Optional<String> deserializerClass
+                                                , String defaultDeserializerClass
+                                                )
+  {
+    try {
+      return (Class<Deserializer<T>>) Class.forName(deserializerClass.orElse(defaultDeserializerClass));
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to create a deserializer: " + deserializerClass.orElse(defaultDeserializerClass) + ": " + e.getMessage(), e);
+    }
+  }
+
+  private static String getBootstrapServers(String zkQuorum, Map<String, Object> kafkaProps) {
+    String brokers = (String)kafkaProps.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+    if(brokers == null) {
+      try {
+        return Joiner.on(",").join(KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum));
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to find the bootstrap servers: " + e.getMessage(), e);
+      }
+    }
+    return brokers;
+  }
+
   /**
    * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
    * @return
@@ -202,31 +239,4 @@ public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V
     return new StormKafkaSpout<>(builder);
   }
 
-  private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) {
-    try {
-      if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) {
-        //this isn't a putIfAbsent because I only want to pull the brokers from zk if it's absent.
-        List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum);
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers));
-      }
-      props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
-      props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
-
-    } catch (Exception e) {
-      throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e);
-    }
-    return props;
-  }
-
-  private static <K,V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) {
-    TupleBuilder<K, V> tb =  new TupleBuilder<K, V>(topic, FieldsConfiguration.toList(config));
-    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build();
-  }
-
-
-  private static KafkaSpoutStreams createStreams(List<String> config, String topic) {
-    final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config));
-    return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic} ).build();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
index 6c0f148..2a4586d 100644
--- a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
+++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.storm.kafka.flux;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.metron.common.utils.ConversionUtils;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 
@@ -49,12 +51,6 @@ public enum SpoutConfiguration {
                  , container -> container.builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(container.value.toString()))
                  )
   /**
-   * The maximum number of retries
-   */
-  ,MAX_RETRIES("spout.maxRetries"
-                 , container -> container.builder.setMaxRetries(ConversionUtils.convert(container.value, Integer.class))
-                 )
-  /**
    * The maximum amount of uncommitted offsets
    */
   ,MAX_UNCOMMITTED_OFFSETS("spout.maxUncommittedOffsets"
@@ -66,6 +62,12 @@ public enum SpoutConfiguration {
   ,OFFSET_COMMIT_PERIOD_MS("spout.offsetCommitPeriodMs"
                  , container -> container.builder.setOffsetCommitPeriodMs(ConversionUtils.convert(container.value, Long.class))
                  )
+  /**
+   * The partition refresh period in milliseconds
+   */
+  ,PARTITION_REFRESH_PERIOD_MS("spout.partitionRefreshPeriodMs"
+                 , container -> container.builder.setPartitionRefreshPeriodMs(ConversionUtils.convert(container.value, Long.class))
+                 )
   ;
   private static class Container {
     Map<String, Object> config;
@@ -131,9 +133,9 @@ public enum SpoutConfiguration {
     for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
       ret.add(spoutConfig.key);
     }
-    ret.add(KafkaSpoutConfig.Consumer.GROUP_ID);
-    ret.add(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS);
-    ret.add(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT);
+    ret.add(ConsumerConfig.GROUP_ID_CONFIG);
+    ret.add(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+    ret.add(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
     return ret;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
index fdef69d..c6dbd8f 100644
--- a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
+++ b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.metron.storm.kafka.flux;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.junit.Assert;
@@ -33,14 +34,14 @@ public class SpoutConfigurationTest {
   public void testSeparation() {
     Map<String, Object>  config = new HashMap<String, Object>() {{
       put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key, "UNCOMMITTED_EARLIEST");
-      put(SpoutConfiguration.MAX_RETRIES.key, "1000");
+      put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000");
       put("group.id", "foobar");
     }};
     Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
     Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key));
     Assert.assertEquals(spoutConfig.get(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key), "UNCOMMITTED_EARLIEST");
-    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.MAX_RETRIES.key));
-    Assert.assertEquals(spoutConfig.get(SpoutConfiguration.MAX_RETRIES.key), "1000");
+    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key));
+    Assert.assertEquals(spoutConfig.get(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key), "1000");
     Assert.assertEquals(2, spoutConfig.size());
     Assert.assertEquals(1, config.size());
     Assert.assertEquals(config.get("group.id"), "foobar");
@@ -49,15 +50,15 @@ public class SpoutConfigurationTest {
   @Test
   public void testBuilderCreation() {
     Map<String, Object>  config = new HashMap<String, Object>() {{
-      put(SpoutConfiguration.MAX_RETRIES.key, "1000");
-      put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "foo:1234");
+      put(SpoutConfiguration.OFFSET_COMMIT_PERIOD_MS.key, "1000");
+      put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "foo:1234");
       put("group.id", "foobar");
     }};
     Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
     KafkaSpoutConfig.Builder<Object, Object> builder = new SimpleStormKafkaBuilder(config, "topic", null);
     SpoutConfiguration.configure(builder, spoutConfig);
     KafkaSpoutConfig c = builder.build();
-    Assert.assertEquals(1000, c.getMaxTupleRetries() );
+    Assert.assertEquals(1000, c.getOffsetsCommitPeriodMs() );
   }
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 1376e5c..cc48851 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -58,6 +58,7 @@
 		<module>elasticsearch-shaded</module>
 		<module>metron-elasticsearch</module>
 		<module>metron-storm-kafka</module>
+		<module>metron-storm-kafka-override</module>
 	</modules>
 	<dependencies>
 		<dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/6c836d13/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bb32c64..9f62249 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,22 @@
 
     <repositories>
         <repository>
+            <releases>
+                <enabled>true</enabled>
+                <updatePolicy>always</updatePolicy>
+                <checksumPolicy>warn</checksumPolicy>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>never</updatePolicy>
+                <checksumPolicy>warn</checksumPolicy>
+            </snapshots>
+            <id>HDPPrivateReleases</id>
+            <name>HDP Private Releases</name>
+            <url>http://nexus-private.hortonworks.com/nexus/content/groups/public</url>
+            <layout>default</layout>
+        </repository>
+        <repository>
             <id>clojars.org</id>
             <url>http://clojars.org/repo</url>
         </repository>
@@ -83,30 +99,7 @@
         <global_curator_version>2.7.1</global_curator_version>
         <global_classindex_version>3.3</global_classindex_version>
         <global_storm_version>1.0.3</global_storm_version>
-        <!--
-             This bears some explanation.  storm-kafka-client is our kafka spout.
-             If we ever hope to support kerberos, this provides the capability to do so
-             in apache.  Unfortunately, it also does not support, as of Storm 1.0.x
-             Kafka 0.10.x (see https://issues.apache.org/jira/browse/STORM-2091).  
-             The consumer libraries (not to be confused with the protocol) on the JVM
-             are binary incompatible.  Note the discussion on https://issues.apache.org/jira/browse/KAFKA-3006,
-             the main issue is the move to Collection over List.  While this would be polymorphically
-             a non-issue, it would require a recompile of storm-kafka-client against Kafka 0.10.x.
-
-             Since a targeted platform is HDP 2.5.x, which ships only kafka 0.10.x, we need
-             to support kafka 0.10.x.  Therefore, if we are to use this, then we would need
-             to support both Kafka 0.9.x and 0.10.x.  Unfortunately, this would require us
-             to fork some of the internal projects because the 0.9.x API has shifted 
-             (e.g. the Admin functions have different parameters) and behaves
-             differently than 0.10.x in subtle ways (e.g. KAFKA_GET doesn't work as implemented).
-
-             Rather than do this, we chose to depend on the HDP version of storm-kafka because
-             it is compiled against 0.10.x and therefore would allow us to not fork our support
-             for kafka.  I do not like this bleeding of the HDP profile dependency into the default,
-             but I justify it by noting that this should be able to be removed when we migrate to
-             Storm 1.1.x, which properly supports Kafka 0.10.x.
-          -->
-        <global_storm_kafka_version>1.0.1.2.5.0.0-1245</global_storm_kafka_version>
+        <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_kafka_version>0.10.0.1</global_kafka_version>
@@ -138,6 +131,7 @@
             <properties>
                 <hdp_version>2.5.0.0</hdp_version>
                 <build_number>1245</build_number>
+                <global_storm_kafka_version>1.1.0.2.6.1.0-SNAPSHOT</global_storm_kafka_version>
                 <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
                 <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
             </properties>


Mime
View raw message