From commits-return-2832-archive-asf-public=cust-asf.ponee.io@metron.apache.org Wed Apr 18 16:59:36 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 1F6D21807BC for ; Wed, 18 Apr 2018 16:59:32 +0200 (CEST) Received: (qmail 92940 invoked by uid 500); 18 Apr 2018 14:59:32 -0000 Mailing-List: contact commits-help@metron.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@metron.apache.org Delivered-To: mailing list commits@metron.apache.org Received: (qmail 92813 invoked by uid 99); 18 Apr 2018 14:59:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Apr 2018 14:59:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4DEFBF68E8; Wed, 18 Apr 2018 14:59:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: otto@apache.org To: commits@metron.apache.org Date: Wed, 18 Apr 2018 14:59:48 -0000 Message-Id: In-Reply-To: <01a9dec83c064e4abd2c287d4c4baab7@git.apache.org> References: <01a9dec83c064e4abd2c287d4c4baab7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/52] [abbrv] metron git commit: METRON-590 Enable Use of Event Time in Profiler (nickwallen) closes apache/metron#965 http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index ceb9e4e..ccce022 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -287,6 +287,8 @@ profiler_input_topic = config['configurations']['metron-enrichment-env']['enrich profiler_kafka_start = config['configurations']['metron-profiler-env']['profiler_kafka_start'] profiler_period_duration = config['configurations']['metron-profiler-env']['profiler_period_duration'] profiler_period_units = config['configurations']['metron-profiler-env']['profiler_period_units'] +profiler_window_duration = config['configurations']['metron-profiler-env']['profiler_window_duration'] +profiler_window_units = config['configurations']['metron-profiler-env']['profiler_window_units'] profiler_ttl = config['configurations']['metron-profiler-env']['profiler_ttl'] profiler_ttl_units = config['configurations']['metron-profiler-env']['profiler_ttl_units'] profiler_hbase_batch = config['configurations']['metron-profiler-env']['profiler_hbase_batch'] @@ -302,6 +304,11 @@ profiler_hbase_acl_configured_flag_file = status_params.profiler_hbase_acl_confi if not len(profiler_topology_worker_childopts) == 0: profiler_topology_worker_childopts += ' ' profiler_topology_worker_childopts += config['configurations']['metron-profiler-env']['profiler_topology_worker_childopts'] +profiler_max_routes_per_bolt=config['configurations']['metron-profiler-env']['profiler_max_routes_per_bolt'] +profiler_window_lag=config['configurations']['metron-profiler-env']['profiler_window_lag'] +profiler_window_lag_units=config['configurations']['metron-profiler-env']['profiler_window_lag_units'] 
+profiler_topology_message_timeout_secs=config['configurations']['metron-profiler-env']['profiler_topology_message_timeout_secs'] +profiler_topology_max_spout_pending=config['configurations']['metron-profiler-env']['profiler_topology_max_spout_pending'] # Indexing ra_indexing_kafka_start = config['configurations']['metron-indexing-env']['ra_indexing_kafka_start'] http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 index 06fd209..fabdaa7 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 @@ -22,6 +22,10 @@ topology.worker.childopts={{profiler_topology_worker_childopts}} topology.auto-credentials={{topology_auto_credentials}} +profiler.workers={{profiler_topology_workers}} +profiler.executors={{profiler_acker_executors}} +topology.message.timeout.secs={{profiler_topology_message_timeout_secs}} +topology.max.spout.pending={{profiler_topology_max_spout_pending}} ##### Profiler ##### @@ -29,10 +33,16 @@ profiler.input.topic={{enrichment_output_topic}} profiler.output.topic={{enrichment_input_topic}} profiler.period.duration={{profiler_period_duration}} profiler.period.duration.units={{profiler_period_units}} -profiler.workers={{profiler_topology_workers}} -profiler.executors={{profiler_acker_executors}} 
+profiler.window.duration={{profiler_window_duration}} +profiler.window.duration.units={{profiler_window_units}} profiler.ttl={{profiler_ttl}} profiler.ttl.units={{profiler_ttl_units}} +profiler.window.lag={{profiler_window_lag}} +profiler.window.lag.units={{profiler_window_lag_units}} +profiler.max.routes.per.bolt={{profiler_max_routes_per_bolt}} + +##### HBase ##### + profiler.hbase.salt.divisor=1000 profiler.hbase.table={{profiler_hbase_table}} profiler.hbase.column.family={{profiler_hbase_cf}} @@ -43,6 +53,5 @@ profiler.hbase.flush.interval.seconds={{profiler_hbase_flush_interval}} kafka.zk={{zookeeper_quorum}} kafka.broker={{kafka_brokers}} -# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST kafka.start={{profiler_kafka_start}} kafka.security.protocol={{kafka_security_protocol}} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json index cef9a3b..234b551 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json @@ -221,8 +221,27 @@ "tab-rows": "3", "sections": [ { + "name": "section-profiler-setup", + "row-index": "0", + "column-index": "0", + "row-span": "1", + "column-span": "1", + "section-columns": "1", + "section-rows": "1", + "subsections": [ + { + "name": "subsection-profiler-setup", + "display-name": "Profiler Setup", + "row-index": "0", + "column-index": "0", + "row-span": "1", + 
"column-span": "1" + } + ] + }, + { "name": "section-profiler-kafka", - "row-index": "0", + "row-index": "1", "column-index": "0", "row-span": "1", "column-span": "1", @@ -240,8 +259,8 @@ ] }, { - "name": "section-profiler-setup", - "row-index": "1", + "name": "section-profiler-storm", + "row-index": "2", "column-index": "0", "row-span": "1", "column-span": "1", @@ -249,8 +268,8 @@ "section-rows": "1", "subsections": [ { - "name": "subsection-profiler-setup", - "display-name": "Profiler Setup", + "name": "subsection-profiler-storm", + "display-name": "Storm", "row-index": "0", "column-index": "0", "row-span": "1", @@ -259,8 +278,8 @@ ] }, { - "name": "section-profiler-storm", - "row-index": "2", + "name": "section-profiler-hbase", + "row-index": "3", "column-index": "0", "row-span": "1", "column-span": "1", @@ -268,8 +287,8 @@ "section-rows": "1", "subsections": [ { - "name": "subsection-profiler-storm", - "display-name": "Storm", + "name": "subsection-profiler-hbase", + "display-name": "HBase", "row-index": "0", "column-index": "0", "row-span": "1", @@ -568,7 +587,6 @@ "config": "metron-indexing-env/bolt_hdfs_rotation_policy_count", "subsection-name": "subsection-indexing-hdfs" }, - { "config": "metron-profiler-env/profiler_kafka_start", "subsection-name": "subsection-profiler-kafka" @@ -582,6 +600,14 @@ "subsection-name": "subsection-profiler-setup" }, { + "config": "metron-profiler-env/profiler_window_duration", + "subsection-name": "subsection-profiler-setup" + }, + { + "config": "metron-profiler-env/profiler_window_units", + "subsection-name": "subsection-profiler-setup" + }, + { "config": "metron-profiler-env/profiler_ttl", "subsection-name": "subsection-profiler-setup" }, @@ -590,20 +616,32 @@ "subsection-name": "subsection-profiler-setup" }, { - "config": "metron-profiler-env/profiler_hbase_table", + "config": "metron-profiler-env/profiler_window_lag", "subsection-name": "subsection-profiler-setup" }, { - "config": "metron-profiler-env/profiler_hbase_cf", + 
"config": "metron-profiler-env/profiler_window_lag_units", "subsection-name": "subsection-profiler-setup" }, { - "config": "metron-profiler-env/profiler_hbase_batch", + "config": "metron-profiler-env/profiler_max_routes_per_bolt", "subsection-name": "subsection-profiler-setup" }, { + "config": "metron-profiler-env/profiler_hbase_table", + "subsection-name": "subsection-profiler-hbase" + }, + { + "config": "metron-profiler-env/profiler_hbase_cf", + "subsection-name": "subsection-profiler-hbase" + }, + { + "config": "metron-profiler-env/profiler_hbase_batch", + "subsection-name": "subsection-profiler-hbase" + }, + { "config": "metron-profiler-env/profiler_hbase_flush_interval", - "subsection-name": "subsection-profiler-setup" + "subsection-name": "subsection-profiler-hbase" }, { "config": "metron-profiler-env/profiler_topology_worker_childopts", @@ -618,6 +656,14 @@ "subsection-name": "subsection-profiler-storm" }, { + "config": "metron-profiler-env/profiler_topology_message_timeout_secs", + "subsection-name": "subsection-profiler-storm" + }, + { + "config": "metron-profiler-env/profiler_topology_max_spout_pending", + "subsection-name": "subsection-profiler-storm" + }, + { "config": "metron-rest-env/metron_rest_port", "subsection-name": "subsection-rest" }, @@ -905,7 +951,6 @@ "type": "text-field" } }, - { "config": "metron-indexing-env/batch_indexing_acker_executors", "widget": { @@ -1004,6 +1049,18 @@ } }, { + "config": "metron-profiler-env/profiler_window_duration", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-profiler-env/profiler_window_units", + "widget": { + "type": "combo" + } + }, + { "config": "metron-profiler-env/profiler_ttl", "widget": { "type": "text-field" @@ -1016,6 +1073,24 @@ } }, { + "config": "metron-profiler-env/profiler_max_routes_per_bolt", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-profiler-env/profiler_window_lag", + "widget": { + "type": "text-field" + } + }, + { + "config": 
"metron-profiler-env/profiler_window_lag_units", + "widget": { + "type": "combo" + } + }, + { "config": "metron-profiler-env/profiler_hbase_table", "widget": { "type": "text-field" @@ -1057,7 +1132,18 @@ "type": "text-field" } }, - + { + "config": "metron-profiler-env/profiler_topology_max_spout_pending", + "widget": { + "type": "text-field" + } + }, + { + "config": "metron-profiler-env/profiler_topology_message_timeout_secs", + "widget": { + "type": "text-field" + } + }, { "config": "metron-rest-env/metron_rest_port", "widget": { http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java index 06c82d2..6205fbf 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileConfig.java @@ -89,6 +89,9 @@ public class ProfileConfig implements Serializable { */ private Long expires; + public ProfileConfig() { + } + /** * A profile definition requires at the very least the profile name, the foreach, and result * expressions. 
@@ -114,6 +117,11 @@ public class ProfileConfig implements Serializable { this.profile = profile; } + public ProfileConfig withProfile(String profile) { + this.profile = profile; + return this; + } + public String getForeach() { return foreach; } @@ -122,6 +130,11 @@ public class ProfileConfig implements Serializable { this.foreach = foreach; } + public ProfileConfig withForeach(String foreach) { + this.foreach = foreach; + return this; + } + public String getOnlyif() { return onlyif; } @@ -130,6 +143,11 @@ public class ProfileConfig implements Serializable { this.onlyif = onlyif; } + public ProfileConfig withOnlyif(String onlyif) { + this.onlyif = onlyif; + return this; + } + public Map getInit() { return init; } @@ -138,6 +156,16 @@ public class ProfileConfig implements Serializable { this.init = init; } + public ProfileConfig withInit(Map init) { + this.init.putAll(init); + return this; + } + + public ProfileConfig withInit(String var, String expression) { + this.init.put(var, expression); + return this; + } + public Map getUpdate() { return update; } @@ -146,6 +174,16 @@ public class ProfileConfig implements Serializable { this.update = update; } + public ProfileConfig withUpdate(Map update) { + this.update.putAll(update); + return this; + } + + public ProfileConfig withUpdate(String var, String expression) { + this.update.put(var, expression); + return this; + } + public List getGroupBy() { return groupBy; } @@ -154,6 +192,11 @@ public class ProfileConfig implements Serializable { this.groupBy = groupBy; } + public ProfileConfig withGroupBy(List groupBy) { + this.groupBy = groupBy; + return this; + } + public ProfileResult getResult() { return result; } @@ -162,6 +205,11 @@ public class ProfileConfig implements Serializable { this.result = result; } + public ProfileConfig withResult(String profileExpression) { + this.result = new ProfileResult(profileExpression); + return this; + } + public Long getExpires() { return expires; } @@ -170,6 +218,11 @@ public 
class ProfileConfig implements Serializable { this.expires = expiresDays; } + public ProfileConfig withExpires(Long expiresDays) { + this.expires = TimeUnit.DAYS.toMillis(expiresDays); + return this; + } + @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java index e7c081a..0bdb7e2 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java @@ -20,9 +20,10 @@ package org.apache.metron.common.configuration.profiler; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** - * The definition for entire Profiler, which may contain many Profile definitions. + * The configuration object for the Profiler, which may contain many Profile definitions. */ public class ProfilerConfig implements Serializable { @@ -31,6 +32,20 @@ public class ProfilerConfig implements Serializable { */ private List profiles = new ArrayList<>(); + /** + * The name of a field containing the timestamp that is used to + * generate profiles. + * + *

By default, the processing time of the Profiler is used rather + * than event time, which is a value contained within the message itself. + * + *

The field must contain a timestamp in epoch milliseconds. + * + *

If a message does NOT contain this field, it will be dropped + * and not included in any profiles. + */ + private Optional timestampField = Optional.empty(); + public List getProfiles() { return profiles; } @@ -39,10 +54,33 @@ public class ProfilerConfig implements Serializable { this.profiles = profiles; } + public ProfilerConfig withProfile(ProfileConfig profileConfig) { + this.profiles.add(profileConfig); + return this; + } + + public Optional getTimestampField() { + return timestampField; + } + + public void setTimestampField(String timestampField) { + this.timestampField = Optional.of(timestampField); + } + + public void setTimestampField(Optional timestampField) { + this.timestampField = timestampField; + } + + public ProfilerConfig withTimestampField(Optional timestampField) { + this.timestampField = timestampField; + return this; + } + @Override public String toString() { return "ProfilerConfig{" + "profiles=" + profiles + + ", timestampField='" + timestampField + '\'' + '}'; } @@ -50,13 +88,15 @@ public class ProfilerConfig implements Serializable { public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ProfilerConfig that = (ProfilerConfig) o; - return profiles != null ? profiles.equals(that.profiles) : that.profiles == null; + if (profiles != null ? !profiles.equals(that.profiles) : that.profiles != null) return false; + return timestampField != null ? timestampField.equals(that.timestampField) : that.timestampField == null; } @Override public int hashCode() { - return profiles != null ? profiles.hashCode() : 0; + int result = profiles != null ? profiles.hashCode() : 0; + result = 31 * result + (timestampField != null ? 
timestampField.hashCode() : 0); + return result; } } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java index c02f19d..02e6015 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java @@ -24,6 +24,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.flipkart.zjsonpatch.JsonPatch; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; @@ -31,17 +35,10 @@ import java.io.IOException; import java.io.InputStream; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; -import com.google.common.reflect.TypeToken; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; - public enum JSONUtils { INSTANCE; http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java index 68c5203..4976d30 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.function.Supplier; public class ProfilerUpdater extends ConfigurationsUpdater { + public ProfilerUpdater(Reloadable reloadable, Supplier configSupplier) { super(reloadable, configSupplier); } http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java index a0e115d..e178ee0 100644 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfileConfigTest.java @@ -27,10 +27,11 @@ import org.junit.Test; import java.io.IOException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; /** - * Ensures that Profile definitions have the expected defaults + * Tests the {@link ProfileConfig} class. + * + * Ensures that profile definitions have the expected defaults * and can be (de)serialized to and from JSON. 
*/ public class ProfileConfigTest { http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java new file mode 100644 index 0000000..2e73cde --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/configuration/profiler/ProfilerConfigTest.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.metron.common.configuration.profiler; + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.common.utils.JSONUtils; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests the {@link ProfilerConfig} class. 
+ */ +public class ProfilerConfigTest { + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "ip_src_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * } + * ] + * } + */ + @Multiline + private String noTimestampField; + + /** + * If no 'timestampField' is defined, it should not be present by default. + */ + @Test + public void testNoTimestampField() throws IOException { + ProfilerConfig conf = JSONUtils.INSTANCE.load(noTimestampField, ProfilerConfig.class); + assertFalse(conf.getTimestampField().isPresent()); + } + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "ip_src_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * } + * ], + * "timestampField": "timestamp" + * } + */ + @Multiline + private String timestampField; + + /** + * If a 'timestampField' is defined, it should be present after loading. + */ + @Test + public void testTimestampField() throws IOException { + ProfilerConfig conf = JSONUtils.INSTANCE.load(timestampField, ProfilerConfig.class); + assertTrue(conf.getTimestampField().isPresent()); + } + + /** + * { + * "profiles": [ + * { + * "profile": "profile1", + * "foreach": "ip_src_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * }, + * { + * "profile": "profile2", + * "foreach": "ip_dst_addr", + * "init": { "count": "0" }, + * "update": { "count": "count + 1" }, + * "result": "count" + * } + * ] + * } + */ + @Multiline + private String twoProfiles; + + /** + * A configuration that defines two profiles should load both of them. 
+ */ + @Test + public void testTwoProfiles() throws IOException { + ProfilerConfig conf = JSONUtils.INSTANCE.load(twoProfiles, ProfilerConfig.class); + assertEquals(2, conf.getProfiles().size()); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/3083b471/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java index 9d8c57e..08910be 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/components/KafkaComponent.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.logging.Level; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.common.TopicExistsException; @@ -48,6 +51,7 @@ import kafka.utils.Time; import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.metron.integration.InMemoryComponent; @@ -314,11 +318,44 @@ public class KafkaComponent implements InMemoryComponent { } } + /** + * Write a collection of messages to a Kafka topic. + * + * @param topic The name of the Kafka topic. + * @param messages The collection of messages to write. 
+ */ public void writeMessages(String topic, Collection messages) { try(KafkaProducer kafkaProducer = createProducer()) { for (byte[] message : messages) { - kafkaProducer.send(new ProducerRecord(topic, message)); + kafkaProducer.send(new ProducerRecord<>(topic, message)); } } } + + /** + * Write messages to a Kafka topic. + * + * @param topic The name of the Kafka topic. + * @param messages The messages to write. + */ + public void writeMessages(String topic, String ...messages) { + + // convert each message to raw bytes + List messagesAsBytes = Stream.of(messages) + .map(Bytes::toBytes) + .collect(Collectors.toList()); + + writeMessages(topic, messagesAsBytes); + } + + /** + * Write messages to a Kafka topic. + * + * @param topic The name of the Kafka topic. + * @param messages The messages to write. + */ + public void writeMessages(String topic, List messages) { + + writeMessages(topic, messages.toArray(new String[] {})); + } }