Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 94F16200BDC for ; Wed, 14 Dec 2016 10:26:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9378E160B19; Wed, 14 Dec 2016 09:26:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 64F1E160B13 for ; Wed, 14 Dec 2016 10:26:18 +0100 (CET) Received: (qmail 77643 invoked by uid 500); 14 Dec 2016 09:26:17 -0000 Mailing-List: contact commits-help@eagle.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@eagle.incubator.apache.org Delivered-To: mailing list commits@eagle.incubator.apache.org Received: (qmail 77630 invoked by uid 99); 14 Dec 2016 09:26:17 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 09:26:17 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 2B5A618127A for ; Wed, 14 Dec 2016 09:26:17 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 0jk8EZtT_Txw for ; Wed, 14 Dec 2016 09:26:14 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 975995FCB2 for ; Wed, 14 Dec 2016 09:26:13 +0000 (UTC) Received: (qmail 77608 invoked by uid 99); 14 Dec 2016 09:26:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Dec 2016 09:26:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EE4AAE35C8; Wed, 14 Dec 2016 09:26:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jinhuwu@apache.org To: commits@eagle.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: incubator-eagle git commit: [EAGLE-841] CorrelationSpout reads zk connection from datasource if exists Date: Wed, 14 Dec 2016 09:26:12 +0000 (UTC) archived-at: Wed, 14 Dec 2016 09:26:19 -0000 Repository: incubator-eagle Updated Branches: refs/heads/master 880ba738c -> 7639ff223 [EAGLE-841] CorrelationSpout reads zk connection from datasource if exists Author: wujinhu Closes #741 from wujinhu/EAGLE_841. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7639ff22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7639ff22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7639ff22 Branch: refs/heads/master Commit: 7639ff2237352884c76e862fd14826cb053bb0fc Parents: 880ba73 Author: wujinhu Authored: Wed Dec 14 17:26:05 2016 +0800 Committer: wujinhu Committed: Wed Dec 14 17:26:05 2016 +0800 ---------------------------------------------------------------------- .../eagle/alert/utils/AlertConstants.java | 3 ++ .../alert/engine/spout/CorrelationSpout.java | 38 +++++++++++--------- .../engine/topology/CorrelationSpoutTest.java | 8 +++-- .../eagle/app/service/ApplicationAction.java | 10 ++++++ .../src/main/bin/metadata-ddl.sql | 10 +++--- 5 files changed, 44 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java index ee2c28c..2740836 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java @@ -28,4 +28,7 @@ public class AlertConstants { public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService"; public static final String COORDINATOR = "coordinator"; + + public static final String KAFKA_BROKER_ZK_BASE_PATH = "spout.kafkaBrokerZkBasePath"; + public static final String KAFKA_BROKER_ZK_QUORUM = "spout.kafkaBrokerZkQuorum"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java index 63e94ca..60a9b98 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java @@ -27,6 +27,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.commons.collections.CollectionUtils; import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata; import org.apache.eagle.alert.coordination.model.SpoutSpec; @@ -235,8 +236,10 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener // build lookup table for scheme Map newSchemaName = new HashMap(); + Map> dataSourceProperties = new HashMap<>(); for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) { newSchemaName.put(ds.getTopic(), ds.getSchemeCls()); + dataSourceProperties.put(ds.getTopic(), ds.getProperties()); } // copy and swap @@ -248,7 +251,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic)); continue; } - KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); + KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config), + conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds); newKafkaSpoutList.put(topic, newWrapper); } // iterate remove topics and then close KafkaSpout @@ -297,47 +301,47 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener * @return */ @SuppressWarnings("rawtypes") - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, + protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, String schemeClsName, SpoutSpec spoutSpec, Map sds) throws Exception { - String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum"); + String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM); BrokerHosts hosts = null; - if (config.hasPath("spout.kafkaBrokerZkBasePath")) { - hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath")); + if (configure.hasPath("spout.kafkaBrokerZkBasePath")) { + hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH)); } else { hosts = new ZkHosts(kafkaBrokerZkQuorum); } String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT; - if (config.hasPath("spout.stormKafkaTransactionZkPath")) { - transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath"); + if (configure.hasPath("spout.stormKafkaTransactionZkPath")) { + transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath"); } boolean logEventEnabled = false; - if (config.hasPath("topology.logEventEnabled")) { - logEventEnabled = config.getBoolean("topology.logEventEnabled"); + if (configure.hasPath("topology.logEventEnabled")) { + logEventEnabled = configure.getBoolean("topology.logEventEnabled"); } // write partition offset etc. into zkRoot+id, see PartitionManager.committedPath String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH; - if (config.hasPath("spout.stormKafkaEagleConsumer")) { - zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer"); + if (configure.hasPath("spout.stormKafkaEagleConsumer")) { + zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer"); } SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId); // transaction zkServers - boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker"); + boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker"); if (stormKafkaUseSameZkQuorumWithKafkaBroker) { ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum); spoutConfig.zkServers = utils.getZkHosts(); spoutConfig.zkPort = utils.getZkPort(); } else { - ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum")); + ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum")); spoutConfig.zkServers = utils.getZkHosts(); spoutConfig.zkPort = utils.getZkPort(); } // transaction update interval - spoutConfig.stateUpdateIntervalMs = config.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? config.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000; + spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000; // Kafka fetch size - spoutConfig.fetchSizeBytes = config.hasPath("spout.stormKafkaFetchSizeBytes") ? config.getInt("spout.stormKafkaFetchSizeBytes") : 1048586; + spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586; // "startOffsetTime" is for test usage, prod should not use this - if (config.hasPath("spout.stormKafkaStartOffsetTime")) { - spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime"); + if (configure.hasPath("spout.stormKafkaStartOffsetTime")) { + spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime"); } spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java index 9deb4b2..5a86cd2 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java @@ -56,7 +56,7 @@ public class CorrelationSpoutTest { AtomicBoolean validated = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, + protected KafkaSpoutWrapper createKafkaSpout(Config config, Map conf, TopologyContext context, SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map sds) throws Exception { validated.set(true); @@ -94,7 +94,8 @@ public class CorrelationSpoutTest { final AtomicBoolean verified = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, + protected KafkaSpoutWrapper createKafkaSpout(Config config, + Map conf, TopologyContext context, SpoutOutputCollector collector, String topic, @@ -147,7 +148,8 @@ public class CorrelationSpoutTest { final AtomicBoolean verified = new AtomicBoolean(false); CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) { @Override - protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, + protected KafkaSpoutWrapper createKafkaSpout(Config config, + Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic, String schemeClsName, SpoutSpec streamMetadatas, Map sds) { return new KafkaSpoutWrapper(null, null); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java index 8c7c8d6..a502f81 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java @@ -31,6 +31,7 @@ import org.apache.eagle.app.Application; import org.apache.eagle.app.environment.ExecutionRuntime; import org.apache.eagle.app.environment.ExecutionRuntimeManager; import org.apache.eagle.app.messaging.KafkaStreamSinkConfig; +import org.apache.eagle.app.messaging.KafkaStreamSourceConfig; import org.apache.eagle.metadata.model.ApplicationEntity; import org.apache.eagle.metadata.model.StreamSourceConfig; import org.apache.eagle.metadata.utils.StreamIdConversions; @@ -144,6 +145,15 @@ public class ApplicationAction implements Serializable { datasource.setName(metadata.getAppId()); datasource.setTopic(kafkaCfg.getTopicId()); datasource.setSchemeCls(JsonScheme.class.getCanonicalName()); + datasource.setProperties(new HashMap<>()); + + KafkaStreamSourceConfig streamSourceConfig = (KafkaStreamSourceConfig) streamDesc.getSourceConfig(); + if (streamSourceConfig != null) { + Map properties = datasource.getProperties(); + properties.put(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH, streamSourceConfig.getBrokerZkPath()); + properties.put(AlertConstants.KAFKA_BROKER_ZK_QUORUM, streamSourceConfig.getBrokerZkQuorum()); + } + Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata(); Properties prop = new Properties(); prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId()); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-server-assembly/src/main/bin/metadata-ddl.sql ---------------------------------------------------------------------- diff --git a/eagle-server-assembly/src/main/bin/metadata-ddl.sql b/eagle-server-assembly/src/main/bin/metadata-ddl.sql index f80a7ea..4bed927 100644 --- a/eagle-server-assembly/src/main/bin/metadata-ddl.sql +++ b/eagle-server-assembly/src/main/bin/metadata-ddl.sql @@ -46,21 +46,21 @@ CREATE TABLE IF NOT EXISTS sites ( -- eagle security module metadata CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity ( - site varchar(20) DEFAULT NULL, - filedir varchar(100) DEFAULT NULL, + site varchar(20) NOT NULL, + filedir varchar(100) NOT NULL, sensitivity_type varchar(20) DEFAULT NULL, primary key (site, filedir) ); CREATE TABLE IF NOT EXISTS ip_securityzone ( - iphost varchar(100) DEFAULT NULL, + iphost varchar(100) NOT NULL, security_zone varchar(100) DEFAULT NULL, primary key (iphost) ); CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity ( - site varchar(20) DEFAULT NULL, - hbase_resource varchar(100) DEFAULT NULL, + site varchar(20) NOT NULL, + hbase_resource varchar(100) NOT NULL, sensitivity_type varchar(20) DEFAULT NULL, primary key (site, hbase_resource) );