Date: Fri, 3 Nov 2017 20:08:00 +0000 (UTC)
From: "ASF GitHub Bot (JIRA)"
To: dev@apex.incubator.apache.org
Reply-To: dev@apex.apache.org
Subject: [jira] [Commented] (APEXCORE-791) Gateway security settings need to be available in the DAG

    [ https://issues.apache.org/jira/browse/APEXCORE-791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238283#comment-16238283 ]

ASF GitHub Bot commented on APEXCORE-791:
-----------------------------------------

vrozov closed pull request #586: APEXCORE-791 Making gateway security related settings available during construction of the DAG
URL: https://github.com/apache/apex-core/pull/586

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 07641d23e2..65be99a6d0 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -71,6 +71,7 @@
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -197,7 +198,6 @@
 public class StreamingContainerManager implements PlanContext
 {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingContainerManager.class);
-  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
   public static final String BUILTIN_APPDATA_URL = "builtin";
   public static final String CONTAINERS_INFO_FILENAME_FORMAT = "containers_%d.json";
   public static final String OPERATORS_INFO_FILENAME_FORMAT = "operators_%d.json";
@@ -556,23 +556,12 @@ private void setupStringCodecs()

   private void setupWsClient()
   {
-    String gatewayAddress = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    boolean gatewayUseSsl = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USE_SSL);
-    String gatewayUserName = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_USER_NAME);
-    String gatewayPassword = plan.getLogicalPlan().getValue(LogicalPlan.GATEWAY_PASSWORD);
-    int timeout = plan.getLogicalPlan().getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
-
-    if (gatewayAddress != null) {
+    wsClient = new PubSubWebSocketClientBuilder().setContext(plan.getLogicalPlan()).build();
+    if (wsClient != null) {
       try {
-        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
-        if (gatewayUserName != null && gatewayPassword != null) {
-          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
-          wsClient.setUserName(gatewayUserName);
-          wsClient.setPassword(gatewayPassword);
-        }
-        wsClient.setup();
-      } catch (Exception ex) {
-        LOG.warn("Cannot establish websocket connection to {}", gatewayAddress, ex);
+        wsClient.openConnection();
+      } catch (Exception e) {
+        LOG.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
       }
     }
   }
diff --git a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
index e41b348a0c..49998cfa00 100644
--- a/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
+++ b/engine/src/main/java/com/datatorrent/stram/debug/TupleRecorderCollection.java
@@ -26,6 +26,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.apex.engine.util.PubSubWebSocketClientBuilder;
+
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
@@ -39,7 +41,6 @@
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StringCodec;
-import com.datatorrent.stram.StreamingContainerManager;
 import com.datatorrent.stram.api.ContainerContext;
 import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
 import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
@@ -66,10 +67,6 @@
 public class TupleRecorderCollection extends HashMap implements Component
 {
   private int tupleRecordingPartFileSize;
-  private String gatewayAddress;
-  private boolean gatewayUseSsl = false;
-  private String gatewayUserName;
-  private String gatewayPassword;
   private long tupleRecordingPartFileTimeMillis;
   private String appPath;
   private String appId;
@@ -89,13 +86,11 @@ public void setup(Context ctx)
     tupleRecordingPartFileSize = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE);
     tupleRecordingPartFileTimeMillis = ctx.getValue(LogicalPlan.TUPLE_RECORDING_PART_FILE_TIME_MILLIS);
     appId = ctx.getValue(LogicalPlan.APPLICATION_ID);
-    gatewayAddress = ctx.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
-    gatewayUseSsl = ctx.getValue(LogicalPlan.GATEWAY_USE_SSL);
-    gatewayUserName = ctx.getValue(LogicalPlan.GATEWAY_USER_NAME);
-    gatewayPassword = ctx.getValue(LogicalPlan.GATEWAY_PASSWORD);
     appPath = ctx.getValue(LogicalPlan.APPLICATION_PATH);
     codecs = ctx.getAttributes().get(Context.DAGContext.STRING_CODECS);

+    wsClient = new PubSubWebSocketClientBuilder().setContext(ctx).build();
+
     RequestDelegateImpl impl = new RequestDelegateImpl();
     RequestFactory rf = ctx.getValue(ContainerContext.REQUEST_FACTORY);
     if (rf == null) {
@@ -161,21 +156,11 @@ private void startRecording(String id, final Node node, int operatorId, final

     if (!conflict) {
       logger.debug("Executing start recording request for {}", operatorIdPortNamePair);
-      if (gatewayAddress != null && wsClient == null) {
-        synchronized (this) {
-          if (wsClient == null) {
-            try {
-              wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", 500);
-              if (gatewayUserName != null && gatewayPassword != null) {
-                wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + StreamingContainerManager.GATEWAY_LOGIN_URL_PATH);
-                wsClient.setUserName(gatewayUserName);
-                wsClient.setPassword(gatewayPassword);
-              }
-              wsClient.setup();
-            } catch (Exception ex) {
-              logger.warn("Error initializing websocket", ex);
-            }
-          }
+      if (wsClient != null) {
+        try {
+          wsClient.openConnection();
+        } catch (Exception e) {
+          logger.warn("Cannot establish websocket connection to uri {}", wsClient.getUri(), e);
         }
       }

diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
index 01a4c7bb75..5fad04f40a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlanConfiguration.java
@@ -143,6 +143,9 @@

   public static final String KEY_APPLICATION_NAME = keyAndDeprecation(Context.DAGContext.APPLICATION_NAME);
   public static final String KEY_GATEWAY_CONNECT_ADDRESS = keyAndDeprecation(Context.DAGContext.GATEWAY_CONNECT_ADDRESS);
+  public static final String KEY_GATEWAY_USE_SSL = keyAndDeprecation(Context.DAGContext.GATEWAY_USE_SSL);
+  public static final String KEY_GATEWAY_USER_NAME = keyAndDeprecation(Context.DAGContext.GATEWAY_USER_NAME);
+  public static final String KEY_GATEWAY_PASSWORD = keyAndDeprecation(Context.DAGContext.GATEWAY_PASSWORD);

   private static String keyAndDeprecation(Attribute attr)
   {
@@ -2248,9 +2251,8 @@ private GenericOperator addOperator(LogicalPlan dag, String name, Class clazz
    */
   public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
   {
-    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
-    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
-    dag.setAttribute(Context.DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    prepareDAGAttributes(dag);
+
     pluginManager.setup(dag);
     if (app != null) {
       pluginManager.dispatch(PRE_POPULATE_DAG.event);
@@ -2277,6 +2279,25 @@ public void prepareDAG(LogicalPlan dag, StreamingApplication app, String name)
     pluginManager.teardown();
   }

+  private void prepareDAGAttributes(LogicalPlan dag)
+  {
+    // Consider making all attributes available for DAG construction
+    // EVENTUALLY to be replaced by variable enabled configuration in the demo where the attribute below is used
+    String connectAddress = conf.get(KEY_GATEWAY_CONNECT_ADDRESS);
+    dag.setAttribute(DAGContext.GATEWAY_CONNECT_ADDRESS, connectAddress == null ? conf.get(GATEWAY_LISTEN_ADDRESS) : connectAddress);
+    if (conf.getBoolean(KEY_GATEWAY_USE_SSL, DAGContext.GATEWAY_USE_SSL.defaultValue)) {
+      dag.setAttribute(DAGContext.GATEWAY_USE_SSL, true);
+    }
+    String username = conf.get(KEY_GATEWAY_USER_NAME);
+    if (username != null) {
+      dag.setAttribute(DAGContext.GATEWAY_USER_NAME, username);
+    }
+    String password = conf.get(KEY_GATEWAY_PASSWORD);
+    if (password != null) {
+      dag.setAttribute(DAGContext.GATEWAY_PASSWORD, password);
+    }
+  }
+
   private void flattenDAG(LogicalPlan dag, Configuration conf)
   {
     for (ModuleMeta moduleMeta : dag.getAllModules()) {
diff --git a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
index 47986cb892..b8f5bfea94 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/PubSubWebSocketClient.java
@@ -138,6 +138,11 @@ public void setUri(URI uri)
     this.uri = uri;
   }

+  public URI getUri()
+  {
+    return uri;
+  }
+
   public void setIoThreadMultiplier(int ioThreadMultiplier)
   {
     this.ioThreadMultiplier = ioThreadMultiplier;
@@ -239,17 +244,18 @@ public void onOpen(WebSocket ws)
     }
   }

+  protected boolean isConnectionSetup()
+  {
+    return (connection != null);
+  }
+
   /**
    *
    * @return true if the connection is open; false otherwise.
    */
   public boolean isConnectionOpen()
   {
-    if (connection == null) {
-      return false;
-    }
-
-    return connection.isOpen();
+    return isConnectionSetup() ? connection.isOpen() : false;
   }

   /**
diff --git a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
index 73572f2e8f..d981127a74 100644
--- a/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/util/SharedPubSubWebSocketClient.java
@@ -57,6 +57,11 @@

   }

+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
   {
     this.setUri(uri);
@@ -64,14 +69,21 @@ public SharedPubSubWebSocketClient(URI uri, long timeoutMillis)
     this.timeoutMillis = timeoutMillis;
   }

+  /**
+   * Construct a SharedPubSubWebSocketClient with the given parameters
+   * @param uri The web socket server uri as string
+   * @param timeoutMillis The connection timeout
+   */
   public SharedPubSubWebSocketClient(String uri, long timeoutMillis) throws URISyntaxException
   {
     this(new URI(uri), timeoutMillis);
   }

-  public void setup() throws IOException, ExecutionException, InterruptedException, TimeoutException
+  public synchronized void openConnection() throws IOException, ExecutionException, InterruptedException, TimeoutException
   {
-    openConnection(timeoutMillis);
+    if (!isConnectionSetup()) {
+      super.openConnection(timeoutMillis);
+    }
   }

   public synchronized void addHandler(String topic, boolean numSubscribers, Handler handler)
diff --git a/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
new file mode 100644
index 0000000000..dc0bd3a914
--- /dev/null
+++ b/engine/src/main/java/org/apache/apex/engine/util/PubSubWebSocketClientBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.engine.util;
+
+import java.net.URISyntaxException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.stram.plan.logical.LogicalPlan;
+import com.datatorrent.stram.util.PubSubWebSocketClient;
+import com.datatorrent.stram.util.SharedPubSubWebSocketClient;
+
+public class PubSubWebSocketClientBuilder
+{
+  public static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";
+
+  private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketClientBuilder.class);
+
+  private Context context;
+
+  public PubSubWebSocketClientBuilder setContext(Context context)
+  {
+    this.context = context;
+    return this;
+  }
+
+  private T build(Class clazz)
+  {
+    Preconditions.checkState(context != null, "Context not specified");
+    String gatewayAddress = context.getValue(LogicalPlan.GATEWAY_CONNECT_ADDRESS);
+    if (gatewayAddress != null) {
+      int timeout = context.getValue(LogicalPlan.PUBSUB_CONNECT_TIMEOUT_MILLIS);
+      boolean gatewayUseSsl = context.getValue(LogicalPlan.GATEWAY_USE_SSL);
+
+      // The builder can be used to build different types of PubSub clients in future but for now only one is supported
+      SharedPubSubWebSocketClient wsClient = null;
+
+      try {
+        wsClient = new SharedPubSubWebSocketClient((gatewayUseSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub", timeout);
+
+        String gatewayUserName = context.getValue(LogicalPlan.GATEWAY_USER_NAME);
+        String gatewayPassword = context.getValue(LogicalPlan.GATEWAY_PASSWORD);
+        if (gatewayUserName != null && gatewayPassword != null) {
+          wsClient.setLoginUrl((gatewayUseSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
+          wsClient.setUserName(gatewayUserName);
+          wsClient.setPassword(gatewayPassword);
+        }
+
+        return (T)wsClient;
+      } catch (URISyntaxException e) {
+        logger.warn("Unable to initialize websocket for gateway address {}", gatewayAddress, e);
+      }
+
+      return null;
+    }
+
+    return null;
+  }
+
+  public SharedPubSubWebSocketClient build()
+  {
+    return build(SharedPubSubWebSocketClient.class);
+  }
+
+}

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

> Gateway security settings need to be available in the DAG
> ---------------------------------------------------------
>
>                 Key: APEXCORE-791
>                 URL: https://issues.apache.org/jira/browse/APEXCORE-791
>             Project: Apache Apex Core
>          Issue Type: Bug
>            Reporter: Pramod Immaneni
>            Assignee: Pramod Immaneni
>            Priority: Major
>
> Gateway connect address attribute is available while constructing the DAG but other gateway security-related attributes such as GATEWAY_USE_SSL are not. These need to be made available as well.
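
Editor's sketch (not part of the original message): the builder in the patch derives a pubsub endpoint, and optionally a login endpoint, from the gateway attributes named in the issue. The standalone Java below illustrates only that derivation under stated assumptions. `GatewayEndpoints` and its `from` method are hypothetical names that do not exist in Apex, the plain parameters stand in for the DAG attributes, and `URI.create` is used in place of the `new URI(...)` call and `URISyntaxException` handling that the patch uses.

```java
import java.net.URI;

/**
 * Hypothetical helper, not part of Apache Apex. It mirrors the URL
 * derivation shown in the patch so the effect of the SSL flag and the
 * credentials can be seen in isolation.
 */
final class GatewayEndpoints
{
  // Same literal as GATEWAY_LOGIN_URL_PATH in the patch.
  static final String GATEWAY_LOGIN_URL_PATH = "/ws/v2/login";

  final URI pubSubUri;
  final URI loginUri; // null when no user name / password pair is supplied

  private GatewayEndpoints(URI pubSubUri, URI loginUri)
  {
    this.pubSubUri = pubSubUri;
    this.loginUri = loginUri;
  }

  /**
   * Returns null when no gateway address is configured, matching the
   * patch, where the builder returns null in that case.
   */
  static GatewayEndpoints from(String gatewayAddress, boolean useSsl, String userName, String password)
  {
    if (gatewayAddress == null) {
      return null;
    }
    // The SSL flag selects the scheme of both endpoints.
    URI pubSub = URI.create((useSsl ? "wss://" : "ws://") + gatewayAddress + "/pubsub");
    URI login = null;
    // A login URL is only derived when both credentials are present.
    if (userName != null && password != null) {
      login = URI.create((useSsl ? "https://" : "http://") + gatewayAddress + GATEWAY_LOGIN_URL_PATH);
    }
    return new GatewayEndpoints(pubSub, login);
  }

  public static void main(String[] args)
  {
    // "gw.example.com:9090" is a made-up address used only for illustration.
    GatewayEndpoints endpoints = GatewayEndpoints.from("gw.example.com:9090", true, "user", "secret");
    System.out.println(endpoints.pubSubUri);
    System.out.println(endpoints.loginUri);
  }
}
```

The sketch makes the motivation of the issue visible: without the SSL flag and the credentials, code running at DAG construction time could only ever build the plain `ws://` endpoint with no login URL.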
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
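
Editor's sketch (not part of the original message): the patch replaces `setup()` on the shared client with a synchronized `openConnection()` that only opens a connection when none has been set up, which is what lets callers such as `startRecording` invoke it on every request without their own double-checked locking. The Java below is a minimal illustration of that guard. `BaseClient`, `SharedClient`, `OpenOnceDemo`, and the `openCount` field are hypothetical stand-ins; they are not the real `PubSubWebSocketClient` or `SharedPubSubWebSocketClient`, and no network I/O takes place.

```java
/** Hypothetical stand-in for the base client; it only tracks whether a connection exists. */
class BaseClient
{
  private Object connection; // placeholder for a real websocket handle
  int openCount;             // number of times a connection was actually opened

  protected boolean isConnectionSetup()
  {
    return connection != null;
  }

  public void openConnection(long timeoutMillis)
  {
    // A real client would connect here and honor the timeout.
    openCount++;
    connection = new Object();
  }
}

/** Hypothetical stand-in for the shared client; mirrors the guard added in the patch. */
class SharedClient extends BaseClient
{
  private final long timeoutMillis;

  SharedClient(long timeoutMillis)
  {
    this.timeoutMillis = timeoutMillis;
  }

  // synchronized so that concurrent callers cannot both pass the check.
  public synchronized void openConnection()
  {
    if (!isConnectionSetup()) {
      super.openConnection(timeoutMillis);
    }
  }
}

class OpenOnceDemo
{
  public static void main(String[] args)
  {
    SharedClient client = new SharedClient(500);
    client.openConnection();
    client.openConnection(); // second call is a no-op
    System.out.println("connections opened: " + client.openCount); // prints "connections opened: 1"
  }
}
```

One design consequence worth noting: because the guard checks whether a connection was ever set up, not whether it is currently open, this sketch (like the guard in the patch as shown) does not reconnect after a connection has been closed.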