metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From o...@apache.org
Subject [33/50] [abbrv] metron git commit: METRON-1499 Enable Configuration of Unified Enrichment Topology via Ambari (nickwallen) closes apache/metron#984
Date Tue, 15 May 2018 12:59:01 GMT
http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
deleted file mode 100644
index fd7ceff..0000000
--- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml
+++ /dev/null
@@ -1,590 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-name: "enrichment"
-config:
-    topology.workers: ${enrichment.workers}
-    topology.acker.executors: ${enrichment.acker.executors}
-    topology.worker.childopts: ${topology.worker.childopts}
-    topology.auto-credentials: ${topology.auto-credentials}
-    topology.max.spout.pending: ${topology.max.spout.pending}
-
-components:
-
-# Enrichment
-    -   id: "stellarEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "ENRICHMENT"
-
-    # Any kafka props for the producer go here.
-    -   id: "kafkaWriterProps"
-        className: "java.util.HashMap"
-        configMethods:
-          -   name: "put"
-              args:
-                  - "security.protocol"
-                  - "${kafka.security.protocol}"
-
-    -   id: "stellarEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarEnrichmentAdapter"
-
-    -   id: "geoEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter"
-    -   id: "geoEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "geo"
-            -   ref: "geoEnrichmentAdapter"
-    -   id: "hostEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter"
-        constructorArgs:
-            - '${enrichment.host.known_hosts}'
-    -   id: "hostEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "host"
-            -   ref: "hostEnrichmentAdapter"
-
-    -   id: "simpleHBaseEnrichmentConfig"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${enrichment.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${enrichment.simple.hbase.cf}"
-    -   id: "simpleHBaseEnrichmentAdapter"
-        className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseEnrichmentConfig"
-    -   id: "simpleHBaseEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseEnrichment"
-          -   ref: "simpleHBaseEnrichmentAdapter"
-    -   id: "enrichments"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarEnrichment"
-
-    #enrichment error
-    -   id: "enrichmentErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-
-# Threat Intel
-    -   id: "stellarThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter"
-        configMethods:
-            -   name: "ofType"
-                args:
-                    - "THREAT_INTEL"
-    -   id: "stellarThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-            -   "stellar"
-            -   ref: "stellarThreatIntelAdapter"
-    -   id: "simpleHBaseThreatIntelConfig"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig"
-        configMethods:
-            -   name: "withProviderImpl"
-                args:
-                    - "${hbase.provider.impl}"
-            -   name: "withTrackerHBaseTable"
-                args:
-                    - "${threat.intel.tracker.table}"
-            -   name: "withTrackerHBaseCF"
-                args:
-                    - "${threat.intel.tracker.cf}"
-            -   name: "withHBaseTable"
-                args:
-                    - "${threat.intel.simple.hbase.table}"
-            -   name: "withHBaseCF"
-                args:
-                    - "${threat.intel.simple.hbase.cf}"
-    -   id: "simpleHBaseThreatIntelAdapter"
-        className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter"
-        configMethods:
-           -    name: "withConfig"
-                args:
-                    - ref: "simpleHBaseThreatIntelConfig"
-    -   id: "simpleHBaseThreatIntelEnrichment"
-        className: "org.apache.metron.enrichment.configuration.Enrichment"
-        constructorArgs:
-          -   "hbaseThreatIntel"
-          -   ref: "simpleHBaseThreatIntelAdapter"
-
-    -   id: "threatIntels"
-        className: "java.util.ArrayList"
-        configMethods:
-            -   name: "add"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "add"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-
-    #threatintel error
-    -   id: "threatIntelErrorKafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${threat.intel.error.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-#indexing
-    -   id: "kafkaWriter"
-        className: "org.apache.metron.writer.kafka.KafkaWriter"
-        configMethods:
-            -   name: "withTopic"
-                args:
-                    - "${enrichment.output.topic}"
-            -   name: "withZkQuorum"
-                args:
-                    - "${kafka.zk}"
-            -   name: "withProducerConfigs"
-                args: 
-                    - ref: "kafkaWriterProps"
-
-#kafka/zookeeper
-    # Any kafka props for the consumer go here.
-    -   id: "kafkaProps"
-        className: "java.util.HashMap"
-        configMethods:
-          -   name: "put"
-              args:
-                  - "value.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "key.deserializer"
-                  - "org.apache.kafka.common.serialization.ByteArrayDeserializer"
-          -   name: "put"
-              args:
-                  - "group.id"
-                  - "enrichments"
-          -   name: "put"
-              args:
-                  - "security.protocol"
-                  - "${kafka.security.protocol}"
-
-
-  # The fields to pull out of the kafka messages
-    -   id: "fields"
-        className: "java.util.ArrayList"
-        configMethods:
-          -   name: "add"
-              args:
-                  - "value"
-
-    -   id: "kafkaConfig"
-        className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder"
-        constructorArgs:
-          - ref: "kafkaProps"
-          # topic name
-          - "${enrichment.input.topic}"
-          - "${kafka.zk}"
-          - ref: "fields"
-        configMethods:
-            -   name: "setFirstPollOffsetStrategy"
-                args:
-                    - "${kafka.start}"
-
-
-spouts:
-    -   id: "kafkaSpout"
-        className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout"
-        constructorArgs:
-            - ref: "kafkaConfig"
-        parallelism: ${kafka.spout.parallelism}
-
-bolts:
-# Enrichment Bolts
-    -   id: "enrichmentSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "enrichments"
-        parallelism: ${enrichment.split.parallelism}
-
-    -   id: "geoEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "geoEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "stellarEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.stellar.parallelism}
-
-    -   id: "hostEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "hostEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "simpleHBaseEnrichmentBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-
-    -   id: "enrichmentJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${enrichment.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${enrichment.join.parallelism}
-
-    -   id: "enrichmentErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "enrichmentErrorKafkaWriter"
-
-
-# Threat Intel Bolts
-    -   id: "threatIntelSplitBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichments"
-                args:
-                    - ref: "threatIntels"
-            -   name: "withMessageFieldName"
-                args: ["message"]
-        parallelism: ${threat.intel.split.parallelism}
-
-    -   id: "simpleHBaseThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "simpleHBaseThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-    -   id: "stellarThreatIntelBolt"
-        className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withEnrichment"
-                args:
-                    - ref: "stellarThreatIntelEnrichment"
-            -   name: "withMaxCacheSize"
-                args: [10000]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.stellar.parallelism}
-
-    -   id: "threatIntelJoinBolt"
-        className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMaxCacheSize"
-                args: [${threat.intel.join.cache.size}]
-            -   name: "withMaxTimeRetain"
-                args: [10]
-        parallelism: ${threat.intel.join.parallelism}
-
-    -   id: "threatIntelErrorOutputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "threatIntelErrorKafkaWriter"
-
-# Indexing Bolts
-    -   id: "outputBolt"
-        className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt"
-        constructorArgs:
-            - "${kafka.zk}"
-        configMethods:
-            -   name: "withMessageWriter"
-                args:
-                    - ref: "kafkaWriter"
-        parallelism: ${kafka.writer.parallelism}
-
-
-streams:
-#parser
-    -   name: "spout -> enrichmentSplit"
-        from: "kafkaSpout"
-        to: "enrichmentSplitBolt"
-        grouping:
-            type: LOCAL_OR_SHUFFLE
-
-#enrichment
-    -   name: "enrichmentSplit -> host"
-        from: "enrichmentSplitBolt"
-        to: "hostEnrichmentBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "enrichmentSplit -> geo"
-        from: "enrichmentSplitBolt"
-        to: "geoEnrichmentBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "enrichmentSplit -> stellar"
-        from: "enrichmentSplitBolt"
-        to: "stellarEnrichmentBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["message"]
-
-
-    -   name: "enrichmentSplit -> simpleHBaseEnrichmentBolt"
-        from: "enrichmentSplitBolt"
-        to: "simpleHBaseEnrichmentBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "splitter -> join"
-        from: "enrichmentSplitBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "geo -> join"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "geo"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "stellar -> join"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "simpleHBaseEnrichmentBolt -> join"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "hbaseEnrichment"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "host -> join"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentJoinBolt"
-        grouping:
-            streamId: "host"
-            type: FIELDS
-            args: ["key"]
-
-    # Error output
-    -   name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "geoEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "stellarEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "hostEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt"
-        from: "simpleHBaseEnrichmentBolt"
-        to: "enrichmentErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-#threat intel
-    -   name: "enrichmentJoin -> threatSplit"
-        from: "enrichmentJoinBolt"
-        to: "threatIntelSplitBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatSplit -> simpleHBaseThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "simpleHBaseThreatIntelBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["message"]
-
-    -   name: "threatSplit -> stellarThreatIntel"
-        from: "threatIntelSplitBolt"
-        to: "stellarThreatIntelBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["message"]
-
-
-    -   name: "simpleHBaseThreatIntel -> join"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "hbaseThreatIntel"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "stellarThreatIntel -> join"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "stellar"
-            type: FIELDS
-            args: ["key"]
-
-    -   name: "threatIntelSplit -> threatIntelJoin"
-        from: "threatIntelSplitBolt"
-        to: "threatIntelJoinBolt"
-        grouping:
-            streamId: "message"
-            type: FIELDS
-            args: ["key"]
-#output
-    -   name: "threatIntelJoin -> output"
-        from: "threatIntelJoinBolt"
-        to: "outputBolt"
-        grouping:
-            streamId: "message"
-            type: LOCAL_OR_SHUFFLE
-
-    # Error output
-    -   name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "simpleHBaseThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-
-    -   name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt"
-        from: "stellarThreatIntelBolt"
-        to: "threatIntelErrorOutputBolt"
-        grouping:
-            streamId: "error"
-            type: LOCAL_OR_SHUFFLE
-

http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
index 6824b87..77c3a77 100755
--- a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
+++ b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh
@@ -1,5 +1,5 @@
 #!/bin/bash
-# 
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,4 +19,12 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml
--filter $METRON_HOME/config/enrichment.properties
+
+# there are two enrichment topologies.  by default, the split-join enrichment topology is
executed
+SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties"
+UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties"
+
+# by passing in different args, the user can execute an alternative enrichment topology
+ARGS=${@:-$SPLIT_JOIN_ARGS}
+
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS

http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index 3c55c95..2e22eab 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -65,16 +65,19 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+/**
+ * Integration test for the 'Split-Join' enrichment topology.
+ */
 public class EnrichmentIntegrationTest extends BaseIntegrationTest {
-  private static final String ERROR_TOPIC = "enrichment_error";
-  private static final String SRC_IP = "ip_src_addr";
-  private static final String DST_IP = "ip_dst_addr";
-  private static final String MALICIOUS_IP_TYPE = "malicious_ip";
-  private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
-  private static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String,
Object>() {{
+
+  public static final String ERROR_TOPIC = "enrichment_error";
+  public static final String SRC_IP = "ip_src_addr";
+  public static final String DST_IP = "ip_dst_addr";
+  public static final String MALICIOUS_IP_TYPE = "malicious_ip";
+  public static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification";
+  public static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String,
Object>() {{
     put("orientation", "north");
   }};
-
   public static final String DEFAULT_COUNTRY = "test country";
   public static final String DEFAULT_CITY = "test city";
   public static final String DEFAULT_POSTAL_CODE = "test postalCode";
@@ -82,15 +85,18 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
   public static final String DEFAULT_LONGITUDE = "test longitude";
   public static final String DEFAULT_DMACODE= "test dmaCode";
   public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE);
+  public static final String cf = "cf";
+  public static final String trackerHBaseTableName = "tracker";
+  public static final String threatIntelTableName = "threat_intel";
+  public static final String enrichmentsTableName = "enrichments";
 
-  protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2";
   protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed";
   private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath);
 
   private static File geoHdfsFile;
 
   protected String fluxPath() {
-    return "../metron-enrichment/src/main/flux/enrichment/remote.yaml";
+    return "../metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml";
   }
 
   private static List<byte[]> getInputMessages(String path){
@@ -115,13 +121,22 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz");
   }
 
-  @Test
-  public void test() throws Exception {
-    final String cf = "cf";
-    final String trackerHBaseTableName = "tracker";
-    final String threatIntelTableName = "threat_intel";
-    final String enrichmentsTableName = "enrichments";
-    final Properties topologyProperties = new Properties() {{
+  /**
+   * Returns the path to the topology properties template.
+   *
+   * @return The path to the topology properties template.
+   */
+  public String getTemplatePath() {
+    return "../metron-enrichment/src/main/config/enrichment-splitjoin.properties.j2";
+  }
+
+  /**
+   * Properties for the 'Split-Join' topology.
+   *
+   * @return The topology properties.
+   */
+  public Properties getTopologyProperties() {
+    return new Properties() {{
       setProperty("enrichment_workers", "1");
       setProperty("enrichment_acker_executors", "0");
       setProperty("enrichment_topology_worker_childopts", "");
@@ -142,11 +157,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
               "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\"
: \"important\"}," +
               "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\"
: \"important\"}," +
               "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\"
: \"important\"}]");
-
       setProperty("threatintel_hbase_table", threatIntelTableName);
       setProperty("threatintel_hbase_cf", cf);
-
-
       setProperty("enrichment_kafka_spout_parallelism", "1");
       setProperty("enrichment_split_parallelism", "1");
       setProperty("enrichment_stellar_parallelism", "1");
@@ -155,8 +167,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
       setProperty("threat_intel_stellar_parallelism", "1");
       setProperty("threat_intel_join_parallelism", "1");
       setProperty("kafka_writer_parallelism", "1");
-
     }};
+  }
+
+  @Test
+  public void test() throws Exception {
+
+    final Properties topologyProperties = getTopologyProperties();
     final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
     final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>()
{{
       add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
@@ -196,7 +213,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
     FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
             .withTopologyLocation(new File(fluxPath()))
             .withTopologyName("test")
-            .withTemplateLocation(new File(templatePath))
+            .withTemplateLocation(new File(getTemplatePath()))
             .withTopologyProperties(topologyProperties)
             .build();
 
@@ -531,7 +548,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest {
                     , message -> {
                       try {
                         return new HashMap<>(JSONUtils.INSTANCE.load(new String(message)
-                                , JSONUtils.MAP_SUPPLIER 
+                                , JSONUtils.MAP_SUPPLIER
                         )
                         );
                       } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
index 1f06733..5c19b39 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
+++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java
@@ -17,7 +17,78 @@
  */
 package org.apache.metron.enrichment.integration;
 
+import org.apache.metron.common.Constants;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+
+import java.util.Properties;
+
+/**
+ * Integration test for the 'Unified' enrichment topology.
+ */
 public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
+
+  /**
+   * Returns the path to the topology properties template.
+   *
+   * @return The path to the topology properties template.
+   */
+  public String getTemplatePath() {
+    return "../metron-enrichment/src/main/config/enrichment-unified.properties.j2";
+  }
+
+  /**
+   * Properties for the 'Unified' topology.
+   *
+   * @return The topology properties.
+   */
+  @Override
+  public Properties getTopologyProperties() {
+    return new Properties() {{
+
+      // storm
+      setProperty("enrichment_workers", "1");
+      setProperty("enrichment_acker_executors", "0");
+      setProperty("enrichment_topology_worker_childopts", "");
+      setProperty("topology_auto_credentials", "[]");
+      setProperty("enrichment_topology_max_spout_pending", "500");
+
+      // kafka - zookeeper_quorum, kafka_brokers set elsewhere
+      setProperty("kafka_security_protocol", "PLAINTEXT");
+      setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST");
+      setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC);
+      setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC);
+      setProperty("enrichment_error_topic", ERROR_TOPIC);
+      setProperty("threatintel_error_topic", ERROR_TOPIC);
+
+      // enrichment
+      setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName());
+      setProperty("enrichment_hbase_table", enrichmentsTableName);
+      setProperty("enrichment_hbase_cf", cf);
+      setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\",
\"type\":\"webserver\", \"asset_value\" : \"important\"}," +
+              "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\"
: \"important\"}," +
+              "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\"
: \"important\"}," +
+              "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\"
: \"important\"}]");
+
+      // threat intel
+      setProperty("threatintel_hbase_table", threatIntelTableName);
+      setProperty("threatintel_hbase_cf", cf);
+
+      // parallelism
+      setProperty("unified_kafka_spout_parallelism", "1");
+      setProperty("unified_enrichment_parallelism", "1");
+      setProperty("unified_threat_intel_parallelism", "1");
+      setProperty("unified_kafka_writer_parallelism", "1");
+
+      // caches
+      setProperty("unified_enrichment_cache_size", "1000");
+      setProperty("unified_threat_intel_cache_size", "1000");
+
+      // threads
+      setProperty("unified_enrichment_threadpool_size", "1");
+      setProperty("unified_enrichment_threadpool_type", "FIXED");
+    }};
+  }
+
   @Override
   public String fluxPath() {
     return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml";


Mime
View raw message