metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ceste...@apache.org
Subject [1/3] incubator-metron git commit: METRON-797: Pass security.protocol and enable auto-renew for the storm topologies
Date Thu, 30 Mar 2017 22:17:25 GMT
Repository: incubator-metron
Updated Branches:
  refs/heads/master 98dc7659a -> aef84636a


METRON-797: Pass security.protocol and enable auto-renew for the storm topologies


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/dae102b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/dae102b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/dae102b0

Branch: refs/heads/master
Commit: dae102b0228b969d4e685a81dd6df25e59f63cb5
Parents: 98dc765
Author: cstella <cestella@gmail.com>
Authored: Wed Mar 29 08:17:52 2017 -0400
Committer: cstella <cestella@gmail.com>
Committed: Wed Mar 29 08:17:52 2017 -0400

----------------------------------------------------------------------
 dependencies_with_url.csv                       |  5 ++
 .../src/main/config/profiler.properties         |  6 +-
 .../src/main/flux/profiler/remote.yaml          | 17 ++++-
 .../integration/ProfilerIntegrationTest.java    |  2 +
 .../METRON/CURRENT/configuration/metron-env.xml |  2 +
 .../package/templates/enrichment.properties.j2  |  2 +
 .../roles/metron-builder/tasks/main.yml         |  2 +-
 .../flatfile/importer/MapReduceImporter.java    |  2 +
 .../src/main/config/elasticsearch.properties    |  2 +
 .../src/main/config/enrichment.properties       |  3 +
 .../src/main/flux/enrichment/remote.yaml        | 29 +++++++-
 .../integration/EnrichmentIntegrationTest.java  |  2 +
 metron-platform/metron-hbase/pom.xml            | 15 ++++
 .../src/main/flux/indexing/remote.yaml          | 17 +++++
 .../integration/IndexingIntegrationTest.java    |  2 +
 .../parsers/topology/ParserTopologyBuilder.java | 75 +++++++++++++++++---
 .../parsers/topology/ParserTopologyCLI.java     | 53 +++++++++++---
 .../components/ParserTopologyComponent.java     |  8 +--
 .../parsers/topology/ParserTopologyCLITest.java | 18 ++++-
 .../src/main/config/pcap.properties             |  2 +
 .../src/main/flux/pcap/remote.yaml              |  6 ++
 .../PcapTopologyIntegrationTest.java            |  2 +
 .../metron-solr/src/main/config/solr.properties |  2 +
 .../metron/writer/hdfs/SourceHandler.java       |  2 -
 .../apache/metron/writer/kafka/KafkaWriter.java |  7 +-
 25 files changed, 247 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 21f0cb5..25650a3 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -65,6 +65,7 @@ com.sun.jersey:jersey-json:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
 com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer
 javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com
+javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com
 javax.servlet:servlet-api:jar:2.5:compile,CDDL,http://oracle.com
 net.jcip:jcip-annotations:jar:1.0:compile,Public,http://jcip.net/
 org.codehaus.jettison:jettison:jar:1.1:compile,ASLv2,https://github.com/codehaus/jettison
@@ -157,6 +158,7 @@ commons-digester:commons-digester:jar:1.8:compile,The Apache Software
License, V
 commons-digester:commons-digester:jar:2.1:compile,ASLv2,http://commons.apache.org/digester/
 commons-el:commons-el:jar:1.0:provided,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
 commons-el:commons-el:jar:1.0:runtime,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
+commons-el:commons-el:jar:1.0:compile,The Apache Software License, Version 2.0,http://jakarta.apache.org/commons/el/
 commons-httpclient:commons-httpclient:jar:3.1:compile,Apache License,http://jakarta.apache.org/httpcomponents/httpclient-3.x/
 commons-io:commons-io:jar:2.4:compile,ASLv2,http://commons.apache.org/io/
 commons-io:commons-io:jar:2.5:compile,ASLv2,http://commons.apache.org/io/
@@ -288,3 +290,6 @@ com.h2database:h2:jar:1.4.192:compile,EPL 1.0,http://www.h2database.com/html/lic
 de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/license.html
 org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html
 org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex
+com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp
+org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/config/profiler.properties
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/config/profiler.properties b/metron-analytics/metron-profiler/src/main/config/profiler.properties
index 860934f..b79ac73 100644
--- a/metron-analytics/metron-profiler/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler/src/main/config/profiler.properties
@@ -20,6 +20,10 @@
 
 ##### Storm #####
 
+storm.auto.credentials=[]
+
+##### Profiler #####
+
 profiler.workers=1
 profiler.executors=0
 profiler.input.topic=indexing
@@ -34,10 +38,10 @@ profiler.hbase.column.family=P
 profiler.hbase.batch=10
 profiler.hbase.flush.interval.seconds=30
 
-
 ##### Kafka #####
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
 kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
index 7ea77a5..0b14bce 100644
--- a/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler/src/main/flux/profiler/remote.yaml
@@ -17,7 +17,7 @@
 name: "profiler"
 
 config:
-
+    topology.auto-credentials: ${storm.auto.credentials}
     topology.workers: ${profiler.workers}
     topology.acker.executors: ${profiler.executors}
 
@@ -60,6 +60,10 @@ components:
                 args:
                     - "group.id"
                     - "profiler"
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
 
   # The fields to pull out of the kafka messages
     -   id: "fields"
@@ -83,6 +87,13 @@ components:
                 args:
                     - "${kafka.start}"
 
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
 
     -   id: "kafkaWriter"
         className: "org.apache.metron.writer.kafka.KafkaWriter"
@@ -91,6 +102,8 @@ components:
                 args: ["${profiler.output.topic}"]
             -   name: "withZkQuorum"
                 args: ["${kafka.zk}"]
+            -   name: "withProducerConfigs"
+                args: [ref: "kafkaWriterProps"]
 
     -   id: "kafkaDestinationHandler"
         className: "org.apache.metron.profiler.bolt.KafkaDestinationHandler"
@@ -174,4 +187,4 @@ streams:
         to: "kafkaBolt"
         grouping:
             streamId: "kafka"
-            type: SHUFFLE
\ No newline at end of file
+            type: SHUFFLE

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
index bca8ed5..7591300 100644
--- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/integration/ProfilerIntegrationTest.java
@@ -305,6 +305,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("profiler.hbase.flush.interval.seconds", "1");
       setProperty("profiler.profile.ttl", "20");
       setProperty("hbase.provider.impl", "" + MockTableProvider.class.getName());
+      setProperty("storm.auto.credentials", "[]");
+      setProperty("kafka.security.protocol", "PLAINTEXT");
     }};
 
     // create the mock table

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 199e708..108e0ba 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -178,6 +178,8 @@ indexing.executors=0
 kafka.zk={{ zookeeper_quorum }}
 kafka.broker={{ kafka_brokers }}
 kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
 ##### Indexing #####
 index.input.topic=indexing
 index.error.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
index dc108f7..8fc2335 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/enrichment.properties.j2
@@ -19,6 +19,8 @@
 
 kafka.zk={{zookeeper_quorum}}
 kafka.broker={{kafka_brokers}}
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
 enrichment.output.topic=indexing
 enrichment.error.topic=enrichments_error
 threat.intel.error.topic=threatintel_error

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-deployment/roles/metron-builder/tasks/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/roles/metron-builder/tasks/main.yml b/metron-deployment/roles/metron-builder/tasks/main.yml
index 889eafe..3f4906e 100644
--- a/metron-deployment/roles/metron-builder/tasks/main.yml
+++ b/metron-deployment/roles/metron-builder/tasks/main.yml
@@ -16,6 +16,6 @@
 #
 ---
 - name: Build Deployment Artifacts
-  local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests
-P HDP-2.5.0.0,mpack,build-rpms
+  local_action: shell cd {{ metron_build_dir }} && mvn clean package -DskipTests
-T 2C -P HDP-2.5.0.0,mpack,build-rpms
   become: false
   run_once: true

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
index e83bdd6..63a84cb 100644
--- a/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
+++ b/metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/flatfile/importer/MapReduceImporter.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.log4j.Logger;
@@ -66,6 +67,7 @@ public enum MapReduceImporter implements Importer{
     job.setNumReduceTasks(0);
     List<Path> paths = inputs.stream().map(p -> new Path(p)).collect(Collectors.toList());
     handler.getInputFormat().set(job, paths, handler.getConfig());
+    TableMapReduceUtil.initCredentials(job);
     try {
       job.waitForCompletion(true);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
index 317742b..d45d3d4 100644
--- a/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
+++ b/metron-platform/metron-elasticsearch/src/main/config/elasticsearch.properties
@@ -17,6 +17,7 @@
 ##### Storm #####
 indexing.workers=1
 indexing.executors=0
+storm.auto.credentials=[]
 
 ##### Kafka #####
 
@@ -24,6 +25,7 @@ kafka.zk=node1:2181
 kafka.broker=node1:6667
 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
 kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
 
 ##### Indexing #####
 index.input.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/config/enrichment.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/config/enrichment.properties b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
index c905d30..af5b27b 100644
--- a/metron-platform/metron-enrichment/src/main/config/enrichment.properties
+++ b/metron-platform/metron-enrichment/src/main/config/enrichment.properties
@@ -19,6 +19,9 @@
 
 kafka.zk=node1:2181
 kafka.broker=node1:6667
+kafka.security.protocol=PLAINTEXT
+storm.auto.credentials=[]
+
 enrichment.output.topic=indexing
 enrichment.error.topic=indexing
 threat.intel.error.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
index 439105f..51fc7ce 100644
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
@@ -18,8 +18,10 @@ name: "enrichment"
 config:
     topology.workers: 1
     topology.acker.executors: 0
+    topology.auto-credentials: ${storm.auto.credentials}
 
 components:
+
 # Enrichment
     -   id: "stellarEnrichmentAdapter"
         className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
@@ -28,6 +30,15 @@ components:
                 args:
                     - "ENRICHMENT"
 
+    # Any kafka props for the producer go here.
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+          -   name: "put"
+              args:
+                  - "security.protocol"
+                  - "${kafka.security.protocol}"
+
     -   id: "stellarEnrichment"
         className: "org.apache.metron.enrichment.configuration.Enrichment"
         constructorArgs:
@@ -100,6 +111,9 @@ components:
             -   name: "withZkQuorum"
                 args:
                     - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
 
 # Threat Intel
     -   id: "stellarThreatIntelAdapter"
@@ -163,7 +177,9 @@ components:
             -   name: "withZkQuorum"
                 args:
                     - "${kafka.zk}"
-
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
 #indexing
     -   id: "kafkaWriter"
         className: "org.apache.metron.writer.kafka.KafkaWriter"
@@ -174,9 +190,12 @@ components:
             -   name: "withZkQuorum"
                 args:
                     - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: 
+                    - ref: "kafkaWriterProps"
 
 #kafka/zookeeper
-    # Any kafka props for the producer go here.
+    # Any kafka props for the consumer go here.
     -   id: "kafkaProps"
         className: "java.util.HashMap"
         configMethods:
@@ -192,6 +211,11 @@ components:
               args:
                   - "group.id"
                   - "enrichments"
+          -   name: "put"
+              args:
+                  - "security.protocol"
+                  - "${kafka.security.protocol}"
+
 
   # The fields to pull out of the kafka messages
     -   id: "fields"
@@ -299,6 +323,7 @@ bolts:
                 args:
                     - ref: "enrichmentErrorKafkaWriter"
 
+
 # Threat Intel Bolts
     -   id: "threatIntelSplitBolt"
         className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index e012c55..77b64dc 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -141,6 +141,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       setProperty("enrichment.simple.hbase.cf", cf);
       setProperty("enrichment.output.topic", Constants.INDEXING_TOPIC);
       setProperty("enrichment.error.topic", ERROR_TOPIC);
+      setProperty("kafka.security.protocol", "PLAINTEXT");
+      setProperty("storm.auto.credentials", "[]");
     }};
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>()
{{

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml
index 36df5a3..4921859 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -118,6 +118,21 @@
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>
+            <artifactId>storm-hbase</artifactId>
+            <version>${global_storm_version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hbase</groupId>
+                    <artifactId>hbase-client</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${global_storm_version}</version>
             <scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
index 3905a7a..ec423c5 100644
--- a/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
+++ b/metron-platform/metron-indexing/src/main/flux/indexing/remote.yaml
@@ -15,9 +15,11 @@
 # limitations under the License.
 
 name: "indexing"
+
 config:
     topology.workers: ${indexing.workers}
     topology.acker.executors: ${indexing.executors}
+    topology.auto-credentials: ${storm.auto.credentials}
 
 components:
 
@@ -49,6 +51,15 @@ components:
             -   name: "withRotationPolicy"
                 args:
                     - ref: "hdfsRotationPolicy"
+
+    -   id: "kafkaWriterProps"
+        className: "java.util.HashMap"
+        configMethods:
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
+
     -   id: "kafkaWriter"
         className: "org.apache.metron.writer.kafka.KafkaWriter"
         configMethods:
@@ -58,6 +69,8 @@ components:
             -   name: "withZkQuorum"
                 args:
                     - "${kafka.zk}"
+            -   name: "withProducerConfigs"
+                args: [ref: "kafkaWriterProps"]
 
     -   id: "indexWriter"
         className: "${writer.class.name}"
@@ -79,6 +92,10 @@ components:
                 args:
                     - "group.id"
                     - "indexing"
+            -   name: "put"
+                args:
+                    - "security.protocol"
+                    - "${kafka.security.protocol}"
 
 # The fields to pull out of the kafka messages
     -   id: "fields"

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
index 394fbf0..cc7d7e3 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/IndexingIntegrationTest.java
@@ -121,6 +121,8 @@ public abstract class IndexingIntegrationTest extends BaseIntegrationTest
{
     final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final Properties topologyProperties = new Properties() {{
       setProperty("kafka.start", "UNCOMMITTED_EARLIEST");
+      setProperty("kafka.security.protocol", "PLAINTEXT");
+      setProperty("storm.auto.credentials", "[]");
       setProperty("indexing.workers", "1");
       setProperty("indexing.executors", "0");
       setProperty("index.input.topic", Constants.INDEXING_TOPIC);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index b347ca5..e9acbaa 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -64,7 +64,7 @@ public class ParserTopologyBuilder {
    * @throws Exception
    */
   public static TopologyBuilder build(String zookeeperUrl,
-                                      String brokerUrl,
+                                      Optional<String> brokerUrl,
                                       String sensorType,
                                       int spoutParallelism,
                                       int spoutNumTasks,
@@ -72,7 +72,8 @@ public class ParserTopologyBuilder {
                                       int parserNumTasks,
                                       int errorWriterParallelism,
                                       int errorWriterNumTasks,
-                                      Map<String, Object> kafkaSpoutConfig
+                                      Map<String, Object> kafkaSpoutConfig,
+                                      Optional<String> securityProtocol
   ) throws Exception {
 
     // fetch configuration from zookeeper
@@ -81,19 +82,19 @@ public class ParserTopologyBuilder {
 
     // create the spout
     TopologyBuilder builder = new TopologyBuilder();
-    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, Optional.ofNullable(kafkaSpoutConfig)
, parserConfig);
+    KafkaSpout kafkaSpout = createKafkaSpout(zookeeperUrl, sensorType, securityProtocol,
Optional.ofNullable(kafkaSpoutConfig) , parserConfig);
     builder.setSpout("kafkaSpout", kafkaSpout, spoutParallelism)
             .setNumTasks(spoutNumTasks);
 
     // create the parser bolt
-    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, configs,
parserConfig);
+    ParserBolt parserBolt = createParserBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol,
configs, parserConfig);
     builder.setBolt("parserBolt", parserBolt, parserParallelism)
             .setNumTasks(parserNumTasks)
             .shuffleGrouping("kafkaSpout");
 
     // create the error bolt, if needed
     if (errorWriterNumTasks > 0) {
-      WriterBolt errorBolt = createErrorBolt(brokerUrl, sensorType, configs, parserConfig);
+      WriterBolt errorBolt = createErrorBolt(zookeeperUrl, brokerUrl, sensorType, securityProtocol,
configs, parserConfig);
       builder.setBolt("errorMessageWriter", errorBolt, errorWriterParallelism)
               .setNumTasks(errorWriterNumTasks)
               .shuffleGrouping("parserBolt", Constants.ERROR_STREAM);
@@ -111,7 +112,13 @@ public class ParserTopologyBuilder {
    * @param parserConfig            Configuration for the parser
    * @return
    */
-  private static StormKafkaSpout<Object, Object> createKafkaSpout(String zkQuorum,
String sensorType, Optional<Map<String, Object>> kafkaConfigOptional, SensorParserConfig
parserConfig) {
+  private static StormKafkaSpout<Object, Object> createKafkaSpout( String zkQuorum
+                                                 , String sensorType
+                                                 , Optional<String> securityProtocol
+                                                 , Optional<Map<String, Object>>
kafkaConfigOptional
+                                                 , SensorParserConfig parserConfig
+                                                 )
+  {
     Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>());
     String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic()
: sensorType;
     kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
@@ -120,9 +127,32 @@ public class ParserTopologyBuilder {
     kafkaSpoutConfigOptions.putIfAbsent( KafkaSpoutConfig.Consumer.GROUP_ID
             , inputTopic + "_parser"
     );
+    if(securityProtocol.isPresent()) {
+      kafkaSpoutConfigOptions.putIfAbsent("security.protocol", securityProtocol.get());
+    }
     return SimpleStormKafkaBuilder.create(inputTopic, zkQuorum, Arrays.asList("value"), kafkaSpoutConfigOptions);
   }
 
+  private static KafkaWriter createKafkaWriter( Optional<String> broker
+                                              , String zkQuorum
+                                              , Optional<String> securityProtocol
+                                              )
+  {
+    KafkaWriter ret = null;
+    if(broker.isPresent()) {
+      ret = new KafkaWriter(broker.get());
+    }
+    else {
+      ret = new KafkaWriter().withZkQuorum(zkQuorum);
+    }
+    if(securityProtocol.isPresent()) {
+      HashMap<String, Object> config = new HashMap<>();
+      config.put("security.protocol", securityProtocol.get());
+      ret.withProducerConfigs(config);
+    }
+    return ret;
+  }
+
   /**
    * Create a bolt that parses input from a sensor.
    *
@@ -133,7 +163,14 @@ public class ParserTopologyBuilder {
    * @param parserConfig
    * @return A Storm bolt that parses input from a sensor
    */
-  private static ParserBolt createParserBolt(String zookeeperUrl, String brokerUrl, String
sensorType, ParserConfigurations configs, SensorParserConfig parserConfig) {
+  private static ParserBolt createParserBolt( String zookeeperUrl
+                                            , Optional<String> brokerUrl
+                                            , String sensorType
+                                            , Optional<String> securityProtocol
+                                            , ParserConfigurations configs
+                                            , SensorParserConfig parserConfig
+                                            )
+  {
 
     // create message parser
     MessageParser<JSONObject> parser = ReflectionUtils.createInstance(parserConfig.getParserClassName());
@@ -141,7 +178,10 @@ public class ParserTopologyBuilder {
 
     // create writer - if not configured uses a sensible default
     AbstractWriter writer = parserConfig.getWriterClassName() == null ?
-            new KafkaWriter(brokerUrl).withTopic(Constants.ENRICHMENT_TOPIC) :
+            createKafkaWriter( brokerUrl
+                             , zookeeperUrl
+                             , securityProtocol
+                             ).withTopic(Constants.ENRICHMENT_TOPIC) :
             ReflectionUtils.createInstance(parserConfig.getWriterClassName());
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 
@@ -154,17 +194,30 @@ public class ParserTopologyBuilder {
   /**
    * Create a bolt that handles error messages.
    *
+   * @param zookeeperUrl    Kafka zookeeper URL
    * @param brokerUrl    Kafka Broker URL
    * @param sensorType   Type of sensor that is being consumed.
+   * @param securityProtocol   Security protocol used (if any)
    * @param configs
    * @param parserConfig
    * @return A Storm bolt that handles error messages.
    */
-  private static WriterBolt createErrorBolt(String brokerUrl, String sensorType, ParserConfigurations
configs, SensorParserConfig parserConfig) {
+  private static WriterBolt createErrorBolt( String zookeeperUrl
+                                           , Optional<String> brokerUrl
+                                           , String sensorType
+                                           , Optional<String> securityProtocol
+                                           , ParserConfigurations configs
+                                           , SensorParserConfig parserConfig
+                                           )
+  {
 
     // create writer - if not configured uses a sensible default
-    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null
-            ? new KafkaWriter(brokerUrl).withTopic((String) configs.getGlobalConfig().get("parser.error.topic")).withConfigPrefix("error")
+    AbstractWriter writer = parserConfig.getErrorWriterClassName() == null ?
+              createKafkaWriter( brokerUrl
+                               , zookeeperUrl
+                               , securityProtocol
+                               ).withTopic((String) configs.getGlobalConfig().get("parser.error.topic"))
+                                .withConfigPrefix("error")
             : ReflectionUtils.createInstance(parserConfig.getWriterClassName());
     writer.configure(sensorType, new ParserWriterConfiguration(configs));
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
index 8cf921e..d83146f 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyCLI.java
@@ -17,10 +17,13 @@
  */
 package org.apache.metron.parsers.topology;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
 import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
+import org.apache.storm.hbase.security.AutoHBase;
+import org.apache.storm.hdfs.common.security.AutoHDFS;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.Utils;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -33,9 +36,7 @@ import org.apache.metron.parsers.topology.config.ConfigHandlers;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.function.Function;
 
 public class ParserTopologyCLI {
@@ -55,7 +56,7 @@ public class ParserTopologyCLI {
     BROKER_URL("k", code -> {
       Option o = new Option(code, "kafka", true, "Kafka Broker URL");
       o.setArgName("BROKER_URL");
-      o.setRequired(true);
+      o.setRequired(false);
       return o;
     }),
     SENSOR_TYPE("s", code -> {
@@ -176,6 +177,18 @@ public class ParserTopologyCLI {
       return o;
     }
     )
+    ,SECURITY_PROTOCOL("ksp", code -> {
+      Option o = new Option(code
+                           , "kafka_security_protocol"
+                           , true
+                           , "The kafka security protocol to use (if running with a kerberized
cluster).  E.g. PLAINTEXTSASL"
+                           );
+      o.setArgName("SECURITY_PROTOCOL");
+      o.setRequired(false);
+      o.setType(String.class);
+      return o;
+    }
+    )
     ,TEST("t", code ->
     {
       Option o = new Option("t", "test", true, "Run in Test Mode");
@@ -270,7 +283,7 @@ public class ParserTopologyCLI {
         System.exit(0);
       }
       String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd);;
-      String brokerUrl = ParserOptions.BROKER_URL.get(cmd);
+      Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty();
       String sensorType= ParserOptions.SENSOR_TYPE.get(cmd);
       int spoutParallelism = Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"));
       int spoutNumTasks = Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"));
@@ -284,7 +297,8 @@ public class ParserTopologyCLI {
       if(ParserOptions.SPOUT_CONFIG.has(cmd)) {
         spoutConfig = readSpoutConfig(new File(ParserOptions.SPOUT_CONFIG.get(cmd)));
       }
-
+      Optional<String> securityProtocol = ParserOptions.SECURITY_PROTOCOL.has(cmd)?Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)):Optional.empty();
+      securityProtocol = getSecurityProtocol(securityProtocol, spoutConfig);
       TopologyBuilder builder = ParserTopologyBuilder.build(zookeeperUrl,
               brokerUrl,
               sensorType,
@@ -294,10 +308,18 @@ public class ParserTopologyCLI {
               parserNumTasks,
               errorParallelism,
               errorNumTasks,
-              spoutConfig
+              spoutConfig,
+              securityProtocol
       );
       Config stormConf = ParserOptions.getConfig(cmd);
-
+      if(securityProtocol.isPresent() && !stormConf.containsKey(Config.TOPOLOGY_AUTO_CREDENTIALS))
{
+        //if I'm specifying it already, then I won't impose autohdfs and autohbase
+        List<String> autoCredentials = new ArrayList<>();
+        for (String credential : ImmutableList.of(AutoHDFS.class.getName(), AutoHBase.class.getName()))
{
+          autoCredentials.add(credential);
+        }
+        stormConf.put( Config.TOPOLOGY_AUTO_CREDENTIALS , autoCredentials );
+      }
       if (ParserOptions.TEST.has(cmd)) {
         stormConf.put(Config.TOPOLOGY_DEBUG, true);
         LocalCluster cluster = new LocalCluster();
@@ -312,6 +334,21 @@ public class ParserTopologyCLI {
       System.exit(-1);
     }
   }
+
+  private static Optional<String> getSecurityProtocol(Optional<String> protocol,
Map<String, Object> spoutConfig) {
+    Optional<String> ret = protocol;
+    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+      ret = Optional.empty();
+    }
+    if(!ret.isPresent()) {
+      ret = Optional.ofNullable((String) spoutConfig.get("security.protocol"));
+    }
+    if(ret.isPresent() && protocol.get().equalsIgnoreCase("PLAINTEXT")) {
+      ret = Optional.empty();
+    }
+    return ret;
+  }
+
   private static Map<String, Object> readSpoutConfig(File inputFile) {
     String json = null;
     if (inputFile.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
index 48bcbec..b6a76d0 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/integration/components/ParserTopologyComponent.java
@@ -33,10 +33,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 import static org.apache.metron.integration.components.FluxTopologyComponent.assassinateSlots;
 import static org.apache.metron.integration.components.FluxTopologyComponent.cleanupWorkerDir;
@@ -82,7 +79,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
   public void start() throws UnableToStartException {
     try {
       TopologyBuilder topologyBuilder = ParserTopologyBuilder.build(topologyProperties.getProperty("kafka.zk")
-                                                                   , brokerUrl
+                                                                   , Optional.ofNullable(brokerUrl)
                                                                    , sensorType
                                                                    , 1
                                                                    , 1
@@ -91,6 +88,7 @@ public class ParserTopologyComponent implements InMemoryComponent {
                                                                    , 1
                                                                    , 1
                                                                    , null
+                                                                   , Optional.empty()
                                                                    );
       Map<String, Object> stormConf = new HashMap<>();
       stormConf.put(Config.TOPOLOGY_DEBUG, true);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
index 5e70177..ac73a2b 100644
--- a/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
+++ b/metron-platform/metron-parsers/src/test/java/org/apache/metron/parsers/topology/ParserTopologyCLITest.java
@@ -34,9 +34,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.Map;
+import java.util.*;
 
 public class ParserTopologyCLITest {
 
@@ -75,6 +73,20 @@ public class ParserTopologyCLITest {
     }
   }
 
+  @Test
+  public void testNoOverlappingArgs() throws Exception {
+    Set<String> optionStrs = new HashSet<>();
+    for(ParserTopologyCLI.ParserOptions option : ParserTopologyCLI.ParserOptions.values())
{
+      if(optionStrs.contains(option.option.getLongOpt())) {
+        throw new IllegalStateException("Reused long option: " + option.option.getLongOpt());
+      }
+      if(optionStrs.contains(option.shortCode)) {
+        throw new IllegalStateException("Reused short option: " + option.shortCode);
+      }
+      optionStrs.add(option.option.getLongOpt());
+      optionStrs.add(option.shortCode);
+    }
+  }
 
   @Test
   public void testKafkaOffset_happyPath() throws ParseException {

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
index 48810c5..6e51dc5 100644
--- a/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
+++ b/metron-platform/metron-pcap-backend/src/main/config/pcap.properties
@@ -15,7 +15,9 @@
 #  limitations under the License.
 
 spout.kafka.topic.pcap=pcap
+storm.auto.credentials=[]
 kafka.zk=node1:2181
+kafka.security.protocol=PLAINTEXT
 kafka.pcap.start=END
 kafka.pcap.numPackets=1000
 kafka.pcap.maxTimeMS=300000

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
index 732991b..2b7e0fd 100644
--- a/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
+++ b/metron-platform/metron-pcap-backend/src/main/flux/pcap/remote.yaml
@@ -17,6 +17,7 @@
 name: "pcap"
 config:
     topology.workers: 1
+    topology.auto-credentials: ${storm.auto.credentials}
 
 components:
 
@@ -36,6 +37,11 @@ components:
           args:
             - "group.id"
             - "pcap"
+      -   name: "put"
+          args:
+            - "security.protocol"
+            - "${kafka.security.protocol}"
+
   - id: "kafkaConfig"
     className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
     constructorArgs:

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
index 8b292d7..84e7574 100644
--- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
+++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java
@@ -212,6 +212,8 @@ public class PcapTopologyIntegrationTest {
       setProperty("kafka.pcap.numPackets", "2");
       setProperty("kafka.pcap.maxTimeMS", "200000000");
       setProperty("kafka.pcap.ts_granularity", "NANOSECONDS");
+      setProperty("storm.auto.credentials", "[]");
+      setProperty("kafka.security.protocol", "PLAINTEXT");
     }};
     updatePropertiesCallback.apply(topologyProperties);
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-solr/src/main/config/solr.properties
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties
index 35f368c..914832d 100644
--- a/metron-platform/metron-solr/src/main/config/solr.properties
+++ b/metron-platform/metron-solr/src/main/config/solr.properties
@@ -17,6 +17,7 @@
 ##### Storm #####
 indexing.workers=1
 indexing.executors=0
+storm.auto.credentials=[]
 
 ##### Kafka #####
 
@@ -24,6 +25,7 @@ kafka.zk=node1:2181
 kafka.broker=node1:6667
 # One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST
 kafka.start=UNCOMMITTED_EARLIEST
+kafka.security.protocol=PLAINTEXT
 
 ##### Indexing #####
 index.input.topic=indexing

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index f03ac41..ba3f96c 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -92,9 +92,7 @@ public class SourceHandler {
   }
 
   private void initialize(Map config) throws IOException {
-    Configuration hdfsConfig = new Configuration();
     this.fs = FileSystem.get(new Configuration());
-    HdfsSecurityUtil.login(config, hdfsConfig);
     this.currentFile = createOutputFile();
     if(this.rotationPolicy instanceof TimedRotationPolicy){
       long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/dae102b0/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 5c00e52..1884f5d 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -105,7 +105,12 @@ public class KafkaWriter extends AbstractWriter implements MessageWriter<JSONObj
   }
 
   public KafkaWriter withProducerConfigs(Map<String, Object> extraConfigs) {
-    this.producerConfigs = extraConfigs;
+    if(producerConfigs == null) {
+      this.producerConfigs = extraConfigs;
+    }
+    else if(extraConfigs != null){
+      producerConfigs.putAll(extraConfigs);
+    }
     return this;
   }
 


Mime
View raw message