From commits-return-2810-archive-asf-public=cust-asf.ponee.io@metron.apache.org Tue Apr 17 15:44:58 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C2E7F180649 for ; Tue, 17 Apr 2018 15:44:56 +0200 (CEST) Received: (qmail 89291 invoked by uid 500); 17 Apr 2018 13:44:55 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 89281 invoked by uid 99); 17 Apr 2018 13:44:55 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Apr 2018 13:44:55 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ADDBCF32FD; Tue, 17 Apr 2018 13:44:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nickallen@apache.org To: commits@metron.apache.org Date: Tue, 17 Apr 2018 13:44:55 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] metron git commit: METRON-1499 Enable Configuration of Unified Enrichment Topology via Ambari (nickwallen) closes apache/metron#984 Repository: metron Updated Branches: refs/heads/master 3fcbf8b4e -> 82212ba81 http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml b/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml deleted file mode 100644 index fd7ceff..0000000 --- a/metron-platform/metron-enrichment/src/main/flux/enrichment/remote.yaml +++ /dev/null @@ -1,590 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -name: "enrichment" -config: - topology.workers: ${enrichment.workers} - topology.acker.executors: ${enrichment.acker.executors} - topology.worker.childopts: ${topology.worker.childopts} - topology.auto-credentials: ${topology.auto-credentials} - topology.max.spout.pending: ${topology.max.spout.pending} - -components: - -# Enrichment - - id: "stellarEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" - configMethods: - - name: "ofType" - args: - - "ENRICHMENT" - - # Any kafka props for the producer go here. - - id: "kafkaWriterProps" - className: "java.util.HashMap" - configMethods: - - name: "put" - args: - - "security.protocol" - - "${kafka.security.protocol}" - - - id: "stellarEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "stellar" - - ref: "stellarEnrichmentAdapter" - - - id: "geoEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.geo.GeoAdapter" - - id: "geoEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "geo" - - ref: "geoEnrichmentAdapter" - - id: "hostEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.host.HostFromJSONListAdapter" - constructorArgs: - - '${enrichment.host.known_hosts}' - - id: "hostEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "host" - - ref: "hostEnrichmentAdapter" - - - id: "simpleHBaseEnrichmentConfig" - className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseConfig" - configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - - name: "withHBaseTable" - args: - - "${enrichment.simple.hbase.table}" - - name: "withHBaseCF" - args: - - "${enrichment.simple.hbase.cf}" - - id: "simpleHBaseEnrichmentAdapter" - className: "org.apache.metron.enrichment.adapters.simplehbase.SimpleHBaseAdapter" - configMethods: - - name: "withConfig" - args: - - ref: "simpleHBaseEnrichmentConfig" - - id: "simpleHBaseEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "hbaseEnrichment" - - ref: "simpleHBaseEnrichmentAdapter" - - id: "enrichments" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - ref: "geoEnrichment" - - name: "add" - args: - - ref: "hostEnrichment" - - name: "add" - args: - - ref: "simpleHBaseEnrichment" - - name: "add" - args: - - ref: "stellarEnrichment" - - #enrichment error - - id: "enrichmentErrorKafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${enrichment.error.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - - name: "withProducerConfigs" - args: - - ref: "kafkaWriterProps" - -# Threat Intel - - id: "stellarThreatIntelAdapter" - className: "org.apache.metron.enrichment.adapters.stellar.StellarAdapter" - configMethods: - - name: "ofType" - args: - - "THREAT_INTEL" - - id: "stellarThreatIntelEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "stellar" - - ref: "stellarThreatIntelAdapter" - - id: "simpleHBaseThreatIntelConfig" - className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelConfig" - configMethods: - - name: "withProviderImpl" - args: - - "${hbase.provider.impl}" - - name: "withTrackerHBaseTable" - args: - - "${threat.intel.tracker.table}" - - name: "withTrackerHBaseCF" - args: - - "${threat.intel.tracker.cf}" - - name: "withHBaseTable" - args: - - "${threat.intel.simple.hbase.table}" - - name: "withHBaseCF" - args: - - "${threat.intel.simple.hbase.cf}" - - id: "simpleHBaseThreatIntelAdapter" - className: "org.apache.metron.enrichment.adapters.threatintel.ThreatIntelAdapter" - configMethods: - - name: "withConfig" - args: - - ref: "simpleHBaseThreatIntelConfig" - - id: "simpleHBaseThreatIntelEnrichment" - className: "org.apache.metron.enrichment.configuration.Enrichment" - constructorArgs: - - "hbaseThreatIntel" - - ref: "simpleHBaseThreatIntelAdapter" - - - id: "threatIntels" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - ref: "simpleHBaseThreatIntelEnrichment" - - name: "add" - args: - - ref: "stellarThreatIntelEnrichment" - - #threatintel error - - id: "threatIntelErrorKafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${threat.intel.error.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - - name: "withProducerConfigs" - args: - - ref: "kafkaWriterProps" -#indexing - - id: "kafkaWriter" - className: "org.apache.metron.writer.kafka.KafkaWriter" - configMethods: - - name: "withTopic" - args: - - "${enrichment.output.topic}" - - name: "withZkQuorum" - args: - - "${kafka.zk}" - - name: "withProducerConfigs" - args: - - ref: "kafkaWriterProps" - -#kafka/zookeeper - # Any kafka props for the consumer go here. - - id: "kafkaProps" - className: "java.util.HashMap" - configMethods: - - name: "put" - args: - - "value.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "key.deserializer" - - "org.apache.kafka.common.serialization.ByteArrayDeserializer" - - name: "put" - args: - - "group.id" - - "enrichments" - - name: "put" - args: - - "security.protocol" - - "${kafka.security.protocol}" - - - # The fields to pull out of the kafka messages - - id: "fields" - className: "java.util.ArrayList" - configMethods: - - name: "add" - args: - - "value" - - - id: "kafkaConfig" - className: "org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder" - constructorArgs: - - ref: "kafkaProps" - # topic name - - "${enrichment.input.topic}" - - "${kafka.zk}" - - ref: "fields" - configMethods: - - name: "setFirstPollOffsetStrategy" - args: - - "${kafka.start}" - - -spouts: - - id: "kafkaSpout" - className: "org.apache.metron.storm.kafka.flux.StormKafkaSpout" - constructorArgs: - - ref: "kafkaConfig" - parallelism: ${kafka.spout.parallelism} - -bolts: -# Enrichment Bolts - - id: "enrichmentSplitBolt" - className: "org.apache.metron.enrichment.bolt.EnrichmentSplitterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichments" - args: - - ref: "enrichments" - parallelism: ${enrichment.split.parallelism} - - - id: "geoEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "geoEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - - id: "stellarEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "stellarEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${enrichment.stellar.parallelism} - - - id: "hostEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "hostEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - - id: "simpleHBaseEnrichmentBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "simpleHBaseEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - - id: "enrichmentJoinBolt" - className: "org.apache.metron.enrichment.bolt.EnrichmentJoinBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMaxCacheSize" - args: [${enrichment.join.cache.size}] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${enrichment.join.parallelism} - - - id: "enrichmentErrorOutputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "enrichmentErrorKafkaWriter" - - -# Threat Intel Bolts - - id: "threatIntelSplitBolt" - className: "org.apache.metron.enrichment.bolt.ThreatIntelSplitterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichments" - args: - - ref: "threatIntels" - - name: "withMessageFieldName" - args: ["message"] - parallelism: ${threat.intel.split.parallelism} - - - id: "simpleHBaseThreatIntelBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "simpleHBaseThreatIntelEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - - id: "stellarThreatIntelBolt" - className: "org.apache.metron.enrichment.bolt.GenericEnrichmentBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withEnrichment" - args: - - ref: "stellarThreatIntelEnrichment" - - name: "withMaxCacheSize" - args: [10000] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${threat.intel.stellar.parallelism} - - - id: "threatIntelJoinBolt" - className: "org.apache.metron.enrichment.bolt.ThreatIntelJoinBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMaxCacheSize" - args: [${threat.intel.join.cache.size}] - - name: "withMaxTimeRetain" - args: [10] - parallelism: ${threat.intel.join.parallelism} - - - id: "threatIntelErrorOutputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "threatIntelErrorKafkaWriter" - -# Indexing Bolts - - id: "outputBolt" - className: "org.apache.metron.writer.bolt.BulkMessageWriterBolt" - constructorArgs: - - "${kafka.zk}" - configMethods: - - name: "withMessageWriter" - args: - - ref: "kafkaWriter" - parallelism: ${kafka.writer.parallelism} - - -streams: -#parser - - name: "spout -> enrichmentSplit" - from: "kafkaSpout" - to: "enrichmentSplitBolt" - grouping: - type: LOCAL_OR_SHUFFLE - -#enrichment - - name: "enrichmentSplit -> host" - from: "enrichmentSplitBolt" - to: "hostEnrichmentBolt" - grouping: - streamId: "host" - type: FIELDS - args: ["message"] - - - name: "enrichmentSplit -> geo" - from: "enrichmentSplitBolt" - to: "geoEnrichmentBolt" - grouping: - streamId: "geo" - type: FIELDS - args: ["message"] - - - name: "enrichmentSplit -> stellar" - from: "enrichmentSplitBolt" - to: "stellarEnrichmentBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["message"] - - - - name: "enrichmentSplit -> simpleHBaseEnrichmentBolt" - from: "enrichmentSplitBolt" - to: "simpleHBaseEnrichmentBolt" - grouping: - streamId: "hbaseEnrichment" - type: FIELDS - args: ["message"] - - - name: "splitter -> join" - from: "enrichmentSplitBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] - - - name: "geo -> join" - from: "geoEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "geo" - type: FIELDS - args: ["key"] - - - name: "stellar -> join" - from: "stellarEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - name: "simpleHBaseEnrichmentBolt -> join" - from: "simpleHBaseEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "hbaseEnrichment" - type: FIELDS - args: ["key"] - - - name: "host -> join" - from: "hostEnrichmentBolt" - to: "enrichmentJoinBolt" - grouping: - streamId: "host" - type: FIELDS - args: ["key"] - - # Error output - - name: "geoEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "geoEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - - - name: "stellarEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "stellarEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - - - name: "hostEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "hostEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - - - name: "simpleHBaseEnrichmentBolt -> enrichmentErrorOutputBolt" - from: "simpleHBaseEnrichmentBolt" - to: "enrichmentErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - -#threat intel - - name: "enrichmentJoin -> threatSplit" - from: "enrichmentJoinBolt" - to: "threatIntelSplitBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] - - - name: "threatSplit -> simpleHBaseThreatIntel" - from: "threatIntelSplitBolt" - to: "simpleHBaseThreatIntelBolt" - grouping: - streamId: "hbaseThreatIntel" - type: FIELDS - args: ["message"] - - - name: "threatSplit -> stellarThreatIntel" - from: "threatIntelSplitBolt" - to: "stellarThreatIntelBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["message"] - - - - name: "simpleHBaseThreatIntel -> join" - from: "simpleHBaseThreatIntelBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "hbaseThreatIntel" - type: FIELDS - args: ["key"] - - - name: "stellarThreatIntel -> join" - from: "stellarThreatIntelBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "stellar" - type: FIELDS - args: ["key"] - - - name: "threatIntelSplit -> threatIntelJoin" - from: "threatIntelSplitBolt" - to: "threatIntelJoinBolt" - grouping: - streamId: "message" - type: FIELDS - args: ["key"] -#output - - name: "threatIntelJoin -> output" - from: "threatIntelJoinBolt" - to: "outputBolt" - grouping: - streamId: "message" - type: LOCAL_OR_SHUFFLE - - # Error output - - name: "simpleHBaseThreatIntelBolt -> threatIntelErrorOutputBolt" - from: "simpleHBaseThreatIntelBolt" - to: "threatIntelErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - - - name: "stellarThreatIntelBolt -> threatIntelErrorOutputBolt" - from: "stellarThreatIntelBolt" - to: "threatIntelErrorOutputBolt" - grouping: - streamId: "error" - type: LOCAL_OR_SHUFFLE - http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh index 6824b87..77c3a77 100755 --- a/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh +++ b/metron-platform/metron-enrichment/src/main/scripts/start_enrichment_topology.sh @@ -1,5 +1,5 @@ #!/bin/bash -# +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,4 +19,12 @@ METRON_VERSION=${project.version} METRON_HOME=/usr/metron/$METRON_VERSION TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar -storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/enrichment/remote.yaml --filter $METRON_HOME/config/enrichment.properties + +# there are two enrichment topologies. by default, the split-join enrichment topology is executed +SPLIT_JOIN_ARGS="--remote $METRON_HOME/flux/enrichment/remote-splitjoin.yaml --filter $METRON_HOME/config/enrichment-splitjoin.properties" +UNIFIED_ARGS="--remote $METRON_HOME/flux/enrichment/remote-unified.yaml --filter $METRON_HOME/config/enrichment-unified.properties" + +# by passing in different args, the user can execute an alternative enrichment topology +ARGS=${@:-$SPLIT_JOIN_ARGS} + +storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux $ARGS http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index 3c55c95..2e22eab 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -65,16 +65,19 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +/** + * Integration test for the 'Split-Join' enrichment topology. + */ public class EnrichmentIntegrationTest extends BaseIntegrationTest { - private static final String ERROR_TOPIC = "enrichment_error"; - private static final String SRC_IP = "ip_src_addr"; - private static final String DST_IP = "ip_dst_addr"; - private static final String MALICIOUS_IP_TYPE = "malicious_ip"; - private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification"; - private static final Map PLAYFUL_ENRICHMENT = new HashMap() {{ + + public static final String ERROR_TOPIC = "enrichment_error"; + public static final String SRC_IP = "ip_src_addr"; + public static final String DST_IP = "ip_dst_addr"; + public static final String MALICIOUS_IP_TYPE = "malicious_ip"; + public static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification"; + public static final Map PLAYFUL_ENRICHMENT = new HashMap() {{ put("orientation", "north"); }}; - public static final String DEFAULT_COUNTRY = "test country"; public static final String DEFAULT_CITY = "test city"; public static final String DEFAULT_POSTAL_CODE = "test postalCode"; @@ -82,15 +85,18 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { public static final String DEFAULT_LONGITUDE = "test longitude"; public static final String DEFAULT_DMACODE= "test dmaCode"; public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); + public static final String cf = "cf"; + public static final String trackerHBaseTableName = "tracker"; + public static final String threatIntelTableName = "threat_intel"; + public static final String enrichmentsTableName = "enrichments"; - protected String templatePath = "../metron-enrichment/src/main/config/enrichment.properties.j2"; protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; private final List inputMessages = getInputMessages(sampleParsedPath); private static File geoHdfsFile; protected String fluxPath() { - return "../metron-enrichment/src/main/flux/enrichment/remote.yaml"; + return "../metron-enrichment/src/main/flux/enrichment/remote-splitjoin.yaml"; } private static List getInputMessages(String path){ @@ -115,13 +121,22 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { geoHdfsFile = new File(new File(baseDir), "GeoIP2-City-Test.mmdb.gz"); } - @Test - public void test() throws Exception { - final String cf = "cf"; - final String trackerHBaseTableName = "tracker"; - final String threatIntelTableName = "threat_intel"; - final String enrichmentsTableName = "enrichments"; - final Properties topologyProperties = new Properties() {{ + /** + * Returns the path to the topology properties template. + * + * @return The path to the topology properties template. + */ + public String getTemplatePath() { + return "../metron-enrichment/src/main/config/enrichment-splitjoin.properties.j2"; + } + + /** + * Properties for the 'Split-Join' topology. + * + * @return The topology properties. + */ + public Properties getTopologyProperties() { + return new Properties() {{ setProperty("enrichment_workers", "1"); setProperty("enrichment_acker_executors", "0"); setProperty("enrichment_topology_worker_childopts", ""); @@ -142,11 +157,8 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," + "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," + "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]"); - setProperty("threatintel_hbase_table", threatIntelTableName); setProperty("threatintel_hbase_cf", cf); - - setProperty("enrichment_kafka_spout_parallelism", "1"); setProperty("enrichment_split_parallelism", "1"); setProperty("enrichment_stellar_parallelism", "1"); @@ -155,8 +167,13 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { setProperty("threat_intel_stellar_parallelism", "1"); setProperty("threat_intel_join_parallelism", "1"); setProperty("kafka_writer_parallelism", "1"); - }}; + } + + @Test + public void test() throws Exception { + + final Properties topologyProperties = getTopologyProperties(); final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties); final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList() {{ add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); @@ -196,7 +213,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() .withTopologyLocation(new File(fluxPath())) .withTopologyName("test") - .withTemplateLocation(new File(templatePath)) + .withTemplateLocation(new File(getTemplatePath())) .withTopologyProperties(topologyProperties) .build(); @@ -531,7 +548,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { , message -> { try { return new HashMap<>(JSONUtils.INSTANCE.load(new String(message) - , JSONUtils.MAP_SUPPLIER + , JSONUtils.MAP_SUPPLIER ) ); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/metron/blob/82212ba8/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java index 1f06733..5c19b39 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/UnifiedEnrichmentIntegrationTest.java @@ -17,7 +17,78 @@ */ package org.apache.metron.enrichment.integration; +import org.apache.metron.common.Constants; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; + +import java.util.Properties; + +/** + * Integration test for the 'Unified' enrichment topology. + */ public class UnifiedEnrichmentIntegrationTest extends EnrichmentIntegrationTest { + + /** + * Returns the path to the topology properties template. + * + * @return The path to the topology properties template. + */ + public String getTemplatePath() { + return "../metron-enrichment/src/main/config/enrichment-unified.properties.j2"; + } + + /** + * Properties for the 'Unified' topology. + * + * @return The topology properties. + */ + @Override + public Properties getTopologyProperties() { + return new Properties() {{ + + // storm + setProperty("enrichment_workers", "1"); + setProperty("enrichment_acker_executors", "0"); + setProperty("enrichment_topology_worker_childopts", ""); + setProperty("topology_auto_credentials", "[]"); + setProperty("enrichment_topology_max_spout_pending", "500"); + + // kafka - zookeeper_quorum, kafka_brokers set elsewhere + setProperty("kafka_security_protocol", "PLAINTEXT"); + setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST"); + setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC); + setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC); + setProperty("enrichment_error_topic", ERROR_TOPIC); + setProperty("threatintel_error_topic", ERROR_TOPIC); + + // enrichment + setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName()); + setProperty("enrichment_hbase_table", enrichmentsTableName); + setProperty("enrichment_hbase_cf", cf); + setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," + + "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," + + "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," + + "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]"); + + // threat intel + setProperty("threatintel_hbase_table", threatIntelTableName); + setProperty("threatintel_hbase_cf", cf); + + // parallelism + setProperty("unified_kafka_spout_parallelism", "1"); + setProperty("unified_enrichment_parallelism", "1"); + setProperty("unified_threat_intel_parallelism", "1"); + setProperty("unified_kafka_writer_parallelism", "1"); + + // caches + setProperty("unified_enrichment_cache_size", "1000"); + setProperty("unified_threat_intel_cache_size", "1000"); + + // threads + setProperty("unified_enrichment_threadpool_size", "1"); + setProperty("unified_enrichment_threadpool_type", "FIXED"); + }}; + } + @Override public String fluxPath() { return "../metron-enrichment/src/main/flux/enrichment/remote-unified.yaml";