Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9AD1C200C81 for ; Tue, 18 Apr 2017 07:39:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9964A160BB4; Tue, 18 Apr 2017 05:39:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 65892160BB8 for ; Tue, 18 Apr 2017 07:39:09 +0200 (CEST) Received: (qmail 56178 invoked by uid 500); 18 Apr 2017 05:39:08 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 54973 invoked by uid 99); 18 Apr 2017 05:39:04 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2017 05:39:04 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 79D15E93E1; Tue, 18 Apr 2017 05:39:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 18 Apr 2017 05:39:43 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [41/46] ignite git commit: IGNITE-4995 Multi-cluster support for Web Console. archived-at: Tue, 18 Apr 2017 05:39:11 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java index 8f70100..6f2b60e 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentConfiguration.java @@ -23,17 +23,18 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.apache.ignite.internal.util.typedef.F; + +import static java.nio.charset.StandardCharsets.UTF_8; /** * Agent configuration. */ public class AgentConfiguration { - /** Default Ignite node HTTP port. */ - public static final int DFLT_NODE_PORT = 8080; - /** Default path to agent property file. */ public static final String DFLT_CFG_PATH = "default.properties"; @@ -51,8 +52,8 @@ public class AgentConfiguration { /** */ @Parameter(names = {"-s", "--server-uri"}, description = "URI for connect to Ignite Console via web-socket protocol" + - " " + - " Default value: " + DFLT_SERVER_URI) + " " + + " Default value: " + DFLT_SERVER_URI) private String srvUri; /** */ @@ -77,7 +78,13 @@ public class AgentConfiguration { private String driversFolder; /** */ - @Parameter(names = { "-h", "--help" }, help = true, description = "Print this help message") + @Parameter(names = {"-dd", "--disable-demo"}, description = "Disable demo mode on this agent " + + " " + + " Default value: false") + private Boolean disableDemo; + + /** */ + @Parameter(names = {"-h", "--help"}, help = true, description = "Print this help message") private Boolean help; /** @@ -158,6 +165,20 @@ public class AgentConfiguration { } /** + * @return Disable demo mode. + */ + public Boolean disableDemo() { + return disableDemo != null ? disableDemo : Boolean.FALSE; + } + + /** + * @param disableDemo Disable demo mode. + */ + public void disableDemo(Boolean disableDemo) { + this.disableDemo = disableDemo; + } + + /** * @return {@code true} If agent options usage should be printed. */ public Boolean help() { @@ -170,14 +191,14 @@ public class AgentConfiguration { public void load(URL cfgUrl) throws IOException { Properties props = new Properties(); - try (Reader reader = new InputStreamReader(cfgUrl.openStream())) { + try (Reader reader = new InputStreamReader(cfgUrl.openStream(), UTF_8)) { props.load(reader); } String val = (String)props.remove("tokens"); if (val != null) - tokens(Arrays.asList(val.split(","))); + tokens(new ArrayList<>(Arrays.asList(val.split(",")))); val = (String)props.remove("server-uri"); @@ -216,13 +237,16 @@ public class AgentConfiguration { if (driversFolder == null) driversFolder(cmd.driversFolder()); + + if (disableDemo == null) + disableDemo(cmd.disableDemo()); } /** {@inheritDoc} */ @Override public String toString() { StringBuilder sb = new StringBuilder(); - if (tokens != null && tokens.size() > 0) { + if (!F.isEmpty(tokens)) { sb.append("User's security tokens : "); boolean first = true; @@ -231,7 +255,7 @@ public class AgentConfiguration { if (first) first = false; else - sb.append(","); + sb.append(','); if (tok.length() > 4) { sb.append(new String(new char[tok.length() - 4]).replace('\0', '*')); @@ -258,7 +282,8 @@ public class AgentConfiguration { drvFld = new File(agentHome, "jdbc-drivers").getPath(); } - sb.append("Path to JDBC drivers folder : ").append(drvFld); + sb.append("Path to JDBC drivers folder : ").append(drvFld).append('\n'); + sb.append("Demo mode : ").append(disableDemo() ? "disabled" : "enabled"); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java index a3d609f..65b8192 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentLauncher.java @@ -34,6 +34,8 @@ import java.net.URL; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Scanner; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.jar.Attributes; import java.util.jar.Manifest; @@ -41,29 +43,48 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import org.apache.ignite.console.agent.handlers.ClusterListener; +import org.apache.ignite.console.agent.handlers.DemoListener; +import org.apache.ignite.console.agent.rest.RestExecutor; import org.apache.ignite.console.agent.handlers.DatabaseListener; import org.apache.ignite.console.agent.handlers.RestListener; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.log4j.Logger; +import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; import static io.socket.client.Socket.EVENT_CONNECT; -import static io.socket.client.Socket.EVENT_CONNECTING; import static io.socket.client.Socket.EVENT_CONNECT_ERROR; import static io.socket.client.Socket.EVENT_DISCONNECT; import static io.socket.client.Socket.EVENT_ERROR; -import static io.socket.client.Socket.EVENT_RECONNECTING; +import static org.apache.ignite.console.agent.AgentUtils.fromJSON; +import static org.apache.ignite.console.agent.AgentUtils.toJSON; /** * Control Center Agent launcher. */ public class AgentLauncher { /** */ - private static final Logger log = Logger.getLogger(AgentLauncher.class.getName()); + private static final Logger log = LoggerFactory.getLogger(AgentLauncher.class); /** */ - private static final String EVENT_NODE_REST = "node:rest"; + private static final String EVENT_CLUSTER_BROADCAST_START = "cluster:broadcast:start"; + + /** */ + private static final String EVENT_CLUSTER_BROADCAST_STOP = "cluster:broadcast:stop"; + + /** */ + private static final String EVENT_CLUSTER_DISCONNECTED = "cluster:disconnected"; + + /** */ + private static final String EVENT_DEMO_BROADCAST_START = "demo:broadcast:start"; + + /** */ + private static final String EVENT_DEMO_BROADCAST_STOP = "demo:broadcast:stop"; /** */ private static final String EVENT_SCHEMA_IMPORT_DRIVERS = "schemaImport:drivers"; @@ -75,10 +96,24 @@ public class AgentLauncher { private static final String EVENT_SCHEMA_IMPORT_METADATA = "schemaImport:metadata"; /** */ - private static final String EVENT_AGENT_WARNING = "agent:warning"; + private static final String EVENT_NODE_VISOR_TASK = "node:visorTask"; + + /** */ + private static final String EVENT_NODE_REST = "node:rest"; + + /** */ + private static final String EVENT_RESET_TOKENS = "agent:reset:token"; /** */ - private static final String EVENT_AGENT_CLOSE = "agent:close"; + private static final String EVENT_LOG_WARNING = "log:warn"; + + static { + // Optionally remove existing handlers attached to j.u.l root logger. + SLF4JBridgeHandler.removeHandlersForRootLogger(); + + // Add SLF4JBridgeHandler to j.u.l's root logger. + SLF4JBridgeHandler.install(); + } /** * Create a trust manager that trusts all certificates It is not using a particular keyStore @@ -86,15 +121,18 @@ public class AgentLauncher { private static TrustManager[] getTrustManagers() { return new TrustManager[] { new X509TrustManager() { - public java.security.cert.X509Certificate[] getAcceptedIssuers() { + /** {@inheritDoc} */ + @Override public java.security.cert.X509Certificate[] getAcceptedIssuers() { return null; } - public void checkClientTrusted( + /** {@inheritDoc} */ + @Override public void checkClientTrusted( java.security.cert.X509Certificate[] certs, String authType) { } - public void checkServerTrusted( + /** {@inheritDoc} */ + @Override public void checkServerTrusted( java.security.cert.X509Certificate[] certs, String authType) { } }}; @@ -104,14 +142,13 @@ public class AgentLauncher { * On error listener. */ private static final Emitter.Listener onError = new Emitter.Listener() { - @SuppressWarnings("ThrowableResultOfMethodCallIgnored") @Override public void call(Object... args) { Throwable e = (Throwable)args[0]; ConnectException ce = X.cause(e, ConnectException.class); if (ce != null) - log.error("Failed to receive response from server (connection refused)."); + log.error("Failed to establish connection to server (connection refused)."); else { Exception ignore = X.cause(e, SSLHandshakeException.class); @@ -172,7 +209,25 @@ public class AgentLauncher { */ private static final Emitter.Listener onDisconnect = new Emitter.Listener() { @Override public void call(Object... args) { - log.error(String.format("Connection closed: %s.", args)); + log.error("Connection closed: {}", args); + } + }; + + /** + * On token reset listener. + */ + private static final Emitter.Listener onLogWarning = new Emitter.Listener() { + @Override public void call(Object... args) { + log.warn(String.valueOf(args[0])); + } + }; + + /** + * On demo start request. + */ + private static final Emitter.Listener onDemoStart = new Emitter.Listener() { + @Override public void call(Object... args) { + log.warn(String.valueOf(args[0])); } }; @@ -205,7 +260,6 @@ public class AgentLauncher { /** * @param args Args. */ - @SuppressWarnings("BusyWait") public static void main(String[] args) throws Exception { log.info("Starting Apache Ignite Web Console Agent..."); @@ -236,13 +290,13 @@ public class AgentLauncher { File f = AgentUtils.resolvePath(prop); if (f == null) - log.warn("Failed to find agent property file: " + prop); + log.warn("Failed to find agent property file: {}", prop); else propCfg.load(f.toURI().toURL()); } - catch (IOException ignore) { + catch (IOException e) { if (!AgentConfiguration.DFLT_CFG_PATH.equals(prop)) - log.warn("Failed to load agent property file: " + prop, ignore); + log.warn("Failed to load agent property file: " + prop, e); } cfg.merge(propCfg); @@ -314,17 +368,11 @@ public class AgentLauncher { } final Socket client = IO.socket(uri, opts); - - final RestListener restHnd = new RestListener(cfg); - - final DatabaseListener dbHnd = new DatabaseListener(cfg); + final RestExecutor restExecutor = new RestExecutor(cfg.nodeUri()); try { - Emitter.Listener onConnecting = new Emitter.Listener() { - @Override public void call(Object... args) { - log.info("Connecting to: " + cfg.serverUri()); - } - }; + final ClusterListener clusterLsnr = new ClusterListener(client, restExecutor); + final DemoListener demoHnd = new DemoListener(client, restExecutor); Emitter.Listener onConnect = new Emitter.Listener() { @Override public void call(Object... args) { @@ -333,7 +381,8 @@ public class AgentLauncher { JSONObject authMsg = new JSONObject(); try { - authMsg.put("tokens", cfg.tokens()); + authMsg.put("tokens", toJSON(cfg.tokens())); + authMsg.put("disableDemo", cfg.disableDemo()); String clsName = AgentLauncher.class.getSimpleName() + ".class"; @@ -353,14 +402,49 @@ public class AgentLauncher { client.emit("agent:auth", authMsg, new Ack() { @Override public void call(Object... args) { - // Authentication failed if response contains args. - if (args != null && args.length > 0) { - onDisconnect.call(args); + if (args != null) { + if (args[0] instanceof String) { + log.error((String)args[0]); + + System.exit(1); + } + + if (args[0] == null && args[1] instanceof JSONArray) { + try { + List activeTokens = fromJSON(args[1], List.class); + + if (!F.isEmpty(activeTokens)) { + Collection missedTokens = cfg.tokens(); + + cfg.tokens(activeTokens); + + missedTokens.removeAll(activeTokens); + + if (!F.isEmpty(missedTokens)) { + String tokens = F.concat(missedTokens, ", "); - System.exit(1); + log.warn("Failed to authenticate with token(s): {}. " + + "Please reload agent archive or check settings", tokens); + } + + log.info("Authentication success."); + + clusterLsnr.watch(); + + return; + } + } + catch (Exception e) { + log.error("Failed to authenticate agent. Please check agent\'s tokens", e); + + System.exit(1); + } + } } - log.info("Authentication success."); + log.error("Failed to authenticate agent. Please check agent\'s tokens"); + + System.exit(1); } }); } @@ -372,44 +456,52 @@ public class AgentLauncher { } }; + DatabaseListener dbHnd = new DatabaseListener(cfg); + RestListener restHnd = new RestListener(restExecutor); + final CountDownLatch latch = new CountDownLatch(1); + log.info("Connecting to: {}", cfg.serverUri()); + client - .on(EVENT_CONNECTING, onConnecting) .on(EVENT_CONNECT, onConnect) .on(EVENT_CONNECT_ERROR, onError) - .on(EVENT_RECONNECTING, onConnecting) - .on(EVENT_NODE_REST, restHnd) - .on(EVENT_SCHEMA_IMPORT_DRIVERS, dbHnd.availableDriversListener()) - .on(EVENT_SCHEMA_IMPORT_SCHEMAS, dbHnd.schemasListener()) - .on(EVENT_SCHEMA_IMPORT_METADATA, dbHnd.metadataListener()) .on(EVENT_ERROR, onError) .on(EVENT_DISCONNECT, onDisconnect) - .on(EVENT_AGENT_WARNING, new Emitter.Listener() { - @Override public void call(Object... args) { - log.warn(args[0]); - } - }) - .on(EVENT_AGENT_CLOSE, new Emitter.Listener() { + .on(EVENT_LOG_WARNING, onLogWarning) + .on(EVENT_CLUSTER_BROADCAST_START, clusterLsnr.start()) + .on(EVENT_CLUSTER_BROADCAST_STOP, clusterLsnr.stop()) + .on(EVENT_DEMO_BROADCAST_START, demoHnd.start()) + .on(EVENT_DEMO_BROADCAST_STOP, demoHnd.stop()) + .on(EVENT_RESET_TOKENS, new Emitter.Listener() { @Override public void call(Object... args) { - onDisconnect.call(args); + String tok = String.valueOf(args[0]); + + log.warn("Security token has been reset: {}", tok); - client.off(); + cfg.tokens().remove(tok); - latch.countDown(); + if (cfg.tokens().isEmpty()) { + client.off(); + + latch.countDown(); + } } - }); + }) + .on(EVENT_SCHEMA_IMPORT_DRIVERS, dbHnd.availableDriversListener()) + .on(EVENT_SCHEMA_IMPORT_SCHEMAS, dbHnd.schemasListener()) + .on(EVENT_SCHEMA_IMPORT_METADATA, dbHnd.metadataListener()) + .on(EVENT_NODE_VISOR_TASK, restHnd) + .on(EVENT_NODE_REST, restHnd); client.connect(); latch.await(); } finally { - client.close(); + restExecutor.stop(); - restHnd.stop(); - - dbHnd.stop(); + client.close(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java index cb22651..1999afc 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/AgentUtils.java @@ -59,7 +59,7 @@ public class AgentUtils { mapper.registerModule(module); } - + /** * Default constructor. */ @@ -153,7 +153,7 @@ public class AgentUtils { /** * Remove callback from handler arguments. - * + * * @param args Arguments. * @return Arguments without callback. */ @@ -165,7 +165,7 @@ public class AgentUtils { /** * Map java object to JSON object. - * + * * @param obj Java object. * @return {@link JSONObject} or {@link JSONArray}. * @throws IllegalArgumentException If conversion fails due to incompatible type. http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java new file mode 100644 index 0000000..23b8fc7 --- /dev/null +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/ClusterListener.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.console.agent.handlers; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.socket.client.Socket; +import io.socket.emitter.Emitter; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.console.agent.rest.RestExecutor; +import org.apache.ignite.console.agent.rest.RestResult; +import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeBean; +import org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyObjectMapper; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteClosure; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ignite.console.agent.AgentUtils.toJSON; +import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_SUCCESS; + +/** + * + */ +public class ClusterListener { + /** */ + private static final Logger log = LoggerFactory.getLogger(ClusterListener.class); + + /** */ + private static final String EVENT_CLUSTER_CONNECTED = "cluster:connected"; + + /** */ + private static final String EVENT_CLUSTER_TOPOLOGY = "cluster:topology"; + + /** */ + private static final String EVENT_CLUSTER_DISCONNECTED = "cluster:disconnected"; + + /** Default timeout. */ + private static final long DFLT_TIMEOUT = 3000L; + + /** JSON object mapper. */ + private static final ObjectMapper mapper = new GridJettyObjectMapper(); + + /** Nids. */ + private Collection latestNids = Collections.emptyList(); + + /** */ + private final WatchTask watchTask = new WatchTask(); + + /** */ + private final BroadcastTask broadcastTask = new BroadcastTask(); + + /** */ + private static final IgniteClosure NODE2ID = new IgniteClosure() { + @Override public UUID apply(GridClientNodeBean n) { + return n.getNodeId(); + } + + @Override public String toString() { + return "Node bean to node ID transformer closure."; + } + }; + + /** */ + private static final IgniteClosure ID2ID8 = new IgniteClosure() { + @Override public String apply(UUID nid) { + return U.id8(nid).toUpperCase(); + } + + @Override public String toString() { + return "Node ID to ID8 transformer closure."; + } + }; + + /** */ + private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); + + /** */ + private ScheduledFuture refreshTask; + + /** */ + private Socket client; + + /** */ + private RestExecutor restExecutor; + + /** + * @param client Client. + * @param restExecutor Client. + */ + public ClusterListener(Socket client, RestExecutor restExecutor) { + this.client = client; + this.restExecutor = restExecutor; + } + + /** + * Callback on cluster connect. + * + * @param nids Cluster nodes IDs. + */ + private void clusterConnect(Collection nids) { + log.info("Connection successfully established to cluster with nodes: {}", F.viewReadOnly(nids, ID2ID8)); + + client.emit(EVENT_CLUSTER_CONNECTED, toJSON(nids)); + } + + /** + * Callback on disconnect from cluster. + */ + private void clusterDisconnect() { + if (latestNids.isEmpty()) + return; + + latestNids = Collections.emptyList(); + + log.info("Connection to cluster was lost"); + + client.emit(EVENT_CLUSTER_DISCONNECTED, latestNids); + } + + /** + * Stop refresh task. + */ + private void safeStopRefresh() { + if (refreshTask != null) + refreshTask.cancel(true); + } + + /** + * Start watch cluster. + */ + public void watch() { + safeStopRefresh(); + + refreshTask = pool.scheduleWithFixedDelay(watchTask, 0L, DFLT_TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** + * Start broadcast topology to server-side. + */ + public Emitter.Listener start() { + return new Emitter.Listener() { + @Override public void call(Object... args) { + safeStopRefresh(); + + final long timeout = args.length > 1 && args[1] instanceof Long ? (long)args[1] : DFLT_TIMEOUT; + + refreshTask = pool.scheduleWithFixedDelay(broadcastTask, 0L, timeout, TimeUnit.MILLISECONDS); + } + }; + } + + /** + * Stop broadcast topology to server-side. + */ + public Emitter.Listener stop() { + return new Emitter.Listener() { + @Override public void call(Object... args) { + refreshTask.cancel(true); + + watch(); + } + }; + } + + /** */ + private class WatchTask implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + try { + RestResult top = restExecutor.topology(false, false); + + switch (top.getStatus()) { + case STATUS_SUCCESS: + List nodes = mapper.readValue(top.getData(), + new TypeReference>() {}); + + Collection nids = F.viewReadOnly(nodes, NODE2ID); + + if (Collections.disjoint(latestNids, nids)) + log.info("Connection successfully established to cluster with nodes: {}", F.viewReadOnly(nids, ID2ID8)); + + client.emit(EVENT_CLUSTER_TOPOLOGY, nids); + + latestNids = nids; + + break; + + default: + log.warn(top.getError()); + + clusterDisconnect(); + } + } + catch (IOException ignore) { + clusterDisconnect(); + } + } + } + + /** */ + private class BroadcastTask implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + try { + RestResult top = restExecutor.topology(false, true); + + switch (top.getStatus()) { + case STATUS_SUCCESS: + List nodes = mapper.readValue(top.getData(), + new TypeReference>() {}); + + Collection nids = F.viewReadOnly(nodes, NODE2ID); + + if (Collections.disjoint(latestNids, nids)) { + clusterConnect(nids); + + clusterDisconnect(); + + watch(); + } + + latestNids = nids; + + client.emit(EVENT_CLUSTER_TOPOLOGY, top.getData()); + + break; + + default: + log.warn(top.getError()); + + clusterDisconnect(); + } + } + catch (IOException ignore) { + clusterDisconnect(); + + watch(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java index a0e9f8f..745c1f2 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DatabaseListener.java @@ -38,6 +38,7 @@ import org.apache.ignite.console.agent.db.DbMetadataReader; import org.apache.ignite.console.agent.db.DbTable; import org.apache.log4j.Logger; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.ignite.console.agent.AgentUtils.resolvePath; /** @@ -84,7 +85,6 @@ public class DatabaseListener { /** */ private final AbstractListener metadataLsnr = new AbstractListener() { - @SuppressWarnings("unchecked") @Override public Object execute(Map args) throws Exception { String driverPath = null; @@ -155,7 +155,7 @@ public class DatabaseListener { URL url = new URL("jar", null, "file:" + (win ? "/" : "") + file.getPath() + "!/META-INF/services/java.sql.Driver"); - try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()))) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), UTF_8))) { String jdbcDriverCls = reader.readLine(); res.add(new JdbcDriver(file.getName(), jdbcDriverCls)); @@ -209,7 +209,7 @@ public class DatabaseListener { * @param jdbcUrl JDBC URL. * @param jdbcInfo Properties to connect to database. * @return Collection of schema names. - * @throws SQLException If failed to load schemas. + * @throws SQLException If failed to collect schemas. */ protected Collection schemas(String jdbcDriverJarPath, String jdbcDriverCls, String jdbcUrl, Properties jdbcInfo) throws SQLException { http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java new file mode 100644 index 0000000..c496817 --- /dev/null +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/DemoListener.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.console.agent.handlers; + +import io.socket.client.Ack; +import io.socket.client.Socket; +import io.socket.emitter.Emitter; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.console.agent.rest.RestExecutor; +import org.apache.ignite.console.agent.rest.RestResult; +import org.apache.ignite.console.demo.AgentClusterDemo; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.ignite.console.agent.AgentUtils.safeCallback; +import static org.apache.ignite.console.agent.AgentUtils.toJSON; + +/** + * + */ +public class DemoListener { + /** */ + private static final String EVENT_DEMO_TOPOLOGY = "demo:topology"; + + /** Default timeout. */ + private static final long DFLT_TIMEOUT = 3000L; + + /** */ + private static final Logger log = LoggerFactory.getLogger(DemoListener.class); + + /** */ + private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(2); + + /** */ + private ScheduledFuture refreshTask; + + /** */ + private Socket client; + + /** */ + private RestExecutor restExecutor; + + /** + * @param client Client. + * @param restExecutor Client. + */ + public DemoListener(Socket client, RestExecutor restExecutor) { + this.client = client; + this.restExecutor = restExecutor; + } + + /** + * Start broadcast topology to server-side. + */ + public Emitter.Listener start() { + return new Emitter.Listener() { + @Override public void call(final Object... args) { + final Ack demoStartCb = safeCallback(args); + + final long timeout = args.length > 1 && args[1] instanceof Long ? (long)args[1] : DFLT_TIMEOUT; + + if (refreshTask != null) + refreshTask.cancel(true); + + final CountDownLatch latch = AgentClusterDemo.tryStart(); + + pool.schedule(new Runnable() { + @Override public void run() { + try { + U.await(latch); + + demoStartCb.call(); + + refreshTask = pool.scheduleWithFixedDelay(new Runnable() { + @Override public void run() { + try { + RestResult top = restExecutor.topology(true, true); + + client.emit(EVENT_DEMO_TOPOLOGY, toJSON(top)); + } + catch (IOException e) { + log.info("Lost connection to the demo cluster", e); + + stop().call(); // TODO WTF???? + } + } + }, 0L, timeout, TimeUnit.MILLISECONDS); + } + catch (Exception e) { + demoStartCb.call(e); + } + } + }, 0, TimeUnit.MILLISECONDS); + } + }; + } + + /** + * Stop broadcast topology to server-side. + */ + public Emitter.Listener stop() { + return new Emitter.Listener() { + @Override public void call(Object... args) { + refreshTask.cancel(true); + + AgentClusterDemo.stop(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java index fcacc88..2588e8e 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/handlers/RestListener.java @@ -17,93 +17,32 @@ package org.apache.ignite.console.agent.handlers; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.ConnectException; -import java.net.URISyntaxException; -import java.nio.charset.Charset; -import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.apache.commons.codec.Charsets; -import org.apache.http.Header; -import org.apache.http.NameValuePair; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; -import org.apache.ignite.console.agent.AgentConfiguration; -import org.apache.ignite.console.demo.AgentClusterDemo; -import org.apache.log4j.Logger; - -import static org.apache.ignite.console.agent.AgentConfiguration.DFLT_NODE_PORT; +import org.apache.ignite.console.agent.rest.RestExecutor; /** * API to translate REST requests to Ignite cluster. */ public class RestListener extends AbstractListener { /** */ - private static final Logger log = Logger.getLogger(RestListener.class.getName()); - - /** */ - private final AgentConfiguration cfg; - - /** */ - private CloseableHttpClient httpClient; + private final RestExecutor restExecutor; /** - * @param cfg Config. + * @param restExecutor Config. */ - public RestListener(AgentConfiguration cfg) { - super(); - - this.cfg = cfg; - - // Create a connection manager with custom configuration. - PoolingHttpClientConnectionManager connMgr = new PoolingHttpClientConnectionManager(); - - connMgr.setDefaultMaxPerRoute(Integer.MAX_VALUE); - connMgr.setMaxTotal(Integer.MAX_VALUE); - - httpClient = HttpClientBuilder.create().setConnectionManager(connMgr).build(); - } - - /** {@inheritDoc} */ - @Override public void stop() { - super.stop(); - - if (httpClient != null) { - try { - httpClient.close(); - } - catch (IOException ignore) { - // No-op. - } - } - } - - /** {@inheritDoc} */ - @Override protected ExecutorService newThreadPool() { - return Executors.newCachedThreadPool(); + public RestListener(RestExecutor restExecutor) { + this.restExecutor = restExecutor; } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") @Override public Object execute(Map args) throws Exception { if (log.isDebugEnabled()) log.debug("Start parse REST command args: " + args); - String uri = null; + String path = null; if (args.containsKey("uri")) - uri = args.get("uri").toString(); + path = args.get("uri").toString(); Map params = null; @@ -130,158 +69,6 @@ public class RestListener extends AbstractListener { if (args.containsKey("body")) body = args.get("body").toString(); - return executeRest(uri, params, demo, mtd, headers, body); - } - - /** - * @param uri Url. - * @param params Params. - * @param demo Use demo node. - * @param mtd Method. - * @param headers Headers. - * @param body Body. - */ - protected RestResult executeRest(String uri, Map params, boolean demo, - String mtd, Map headers, String body) throws IOException, URISyntaxException { - if (log.isDebugEnabled()) - log.debug("Start execute REST command [method=" + mtd + ", uri=/" + (uri == null ? "" : uri) + - ", parameters=" + params + "]"); - - final URIBuilder builder; - - if (demo) { - // try start demo if needed. - AgentClusterDemo.testDrive(cfg); - - // null if demo node not started yet. - if (cfg.demoNodeUri() == null) - return RestResult.fail("Demo node is not started yet.", 404); - - builder = new URIBuilder(cfg.demoNodeUri()); - } - else - builder = new URIBuilder(cfg.nodeUri()); - - if (builder.getPort() == -1) - builder.setPort(DFLT_NODE_PORT); - - if (uri != null) { - if (!uri.startsWith("/") && !cfg.nodeUri().endsWith("/")) - uri = '/' + uri; - - builder.setPath(uri); - } - - if (params != null) { - for (Map.Entry entry : params.entrySet()) { - if (entry.getValue() != null) - builder.addParameter(entry.getKey(), entry.getValue().toString()); - } - } - - HttpRequestBase httpReq = null; - - try { - if ("GET".equalsIgnoreCase(mtd)) - httpReq = new HttpGet(builder.build()); - else if ("POST".equalsIgnoreCase(mtd)) { - HttpPost post; - - if (body == null) { - List nvps = builder.getQueryParams(); - - builder.clearParameters(); - - post = new HttpPost(builder.build()); - - if (!nvps.isEmpty()) - post.setEntity(new UrlEncodedFormEntity(nvps)); - } - else { - post = new HttpPost(builder.build()); - - post.setEntity(new StringEntity(body)); - } - - httpReq = post; - } - else - throw new IOException("Unknown HTTP-method: " + mtd); - - if (headers != null) { - for (Map.Entry entry : headers.entrySet()) - httpReq.addHeader(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString()); - } - - try (CloseableHttpResponse resp = httpClient.execute(httpReq)) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - resp.getEntity().writeTo(out); - - Charset charset = Charsets.UTF_8; - - Header encodingHdr = resp.getEntity().getContentEncoding(); - - if (encodingHdr != null) { - String encoding = encodingHdr.getValue(); - - charset = Charsets.toCharset(encoding); - } - - return RestResult.success(resp.getStatusLine().getStatusCode(), new String(out.toByteArray(), charset)); - } - catch (ConnectException e) { - log.info("Failed connect to node and execute REST command [uri=" + builder.build() + "]"); - - return RestResult.fail("Failed connect to node and execute REST command.", 404); - } - } - finally { - if (httpReq != null) - httpReq.reset(); - } - } - - /** - * Request result. - */ - public static class RestResult { - /** The field contains description of error if server could not handle the request. */ - public final String error; - - /** REST http code. */ - public final int code; - - /** The field contains result of command. */ - public final String data; - - /** - * @param error The field contains description of error if server could not handle the request. - * @param resCode REST http code. - * @param res The field contains result of command. - */ - private RestResult(String error, int resCode, String res) { - this.error = error; - this.code = resCode; - this.data = res; - } - - /** - * @param error The field contains description of error if server could not handle the request. - * @param restCode REST http code. - * @return Request result. - */ - public static RestResult fail(String error, int restCode) { - return new RestResult(error, restCode, null); - } - - /** - * @param code REST http code. - * @param data The field contains result of command. - * @return Request result. - */ - public static RestResult success(int code, String data) { - return new RestResult(null, code, data); - } + return restExecutor.execute(demo, path, params, mtd, headers, body); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java new file mode 100644 index 0000000..d651aca --- /dev/null +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestExecutor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.console.agent.rest; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.ConnectException; +import java.util.HashMap; +import java.util.Map; +import okhttp3.FormBody; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.ignite.console.demo.*; +import org.apache.ignite.internal.processors.rest.protocols.http.jetty.GridJettyObjectMapper; +import org.apache.log4j.Logger; + +import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_AUTH_FAILED; +import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_FAILED; +import static org.apache.ignite.internal.processors.rest.GridRestResponse.STATUS_SUCCESS; + +/** + * + */ +public class RestExecutor { + /** */ + private static final Logger log = Logger.getLogger(RestExecutor.class); + + /** JSON object mapper. */ + private static final ObjectMapper mapper = new GridJettyObjectMapper(); + + /** */ + private final OkHttpClient httpClient; + + /** Node URL. */ + private String nodeUrl; + + /** + * Default constructor. + */ + public RestExecutor(String nodeUrl) { + this.nodeUrl = nodeUrl; + + httpClient = new OkHttpClient.Builder().build(); + } + + /** + * Stop HTTP client. + */ + public void stop() { + if (httpClient != null) + httpClient.dispatcher().executorService().shutdown(); + } + + /** */ + private RestResult sendRequest(boolean demo, String path, Map params, + String mtd, Map headers, String body) throws IOException { + if (demo && AgentClusterDemo.getDemoUrl() == null) { + try { + AgentClusterDemo.tryStart().await(); + } + catch (InterruptedException ignore) { + throw new IllegalStateException("Failed to execute request because of embedded node for demo mode is not started yet."); + } + } + + String url = demo ? AgentClusterDemo.getDemoUrl() : nodeUrl; + + HttpUrl.Builder urlBuilder = HttpUrl.parse(url) + .newBuilder(); + + if (path != null) + urlBuilder.addPathSegment(path); + + final Request.Builder reqBuilder = new Request.Builder(); + + if (headers != null) { + for (Map.Entry entry : headers.entrySet()) + if (entry.getValue() != null) + reqBuilder.addHeader(entry.getKey(), entry.getValue().toString()); + } + + if ("GET".equalsIgnoreCase(mtd)) { + if (params != null) { + for (Map.Entry entry : params.entrySet()) { + if (entry.getValue() != null) + urlBuilder.addQueryParameter(entry.getKey(), entry.getValue().toString()); + } + } + } + else if ("POST".equalsIgnoreCase(mtd)) { + if (body != null) { + MediaType contentType = MediaType.parse("text/plain"); + + reqBuilder.post(RequestBody.create(contentType, body)); + } + else { + FormBody.Builder formBody = new FormBody.Builder(); + + if (params != null) { + for (Map.Entry entry : params.entrySet()) { + if (entry.getValue() != null) + formBody.add(entry.getKey(), entry.getValue().toString()); + } + } + + reqBuilder.post(formBody.build()); + } + } + else + throw new IllegalArgumentException("Unknown HTTP-method: " + mtd); + + reqBuilder.url(urlBuilder.build()); + + try (Response resp = httpClient.newCall(reqBuilder.build()).execute()) { + String content = resp.body().string(); + + if (resp.isSuccessful()) { + JsonNode node = mapper.readTree(content); + + int status = node.get("successStatus").asInt(); + + switch (status) { + case STATUS_SUCCESS: + return RestResult.success(node.get("response").toString()); + + default: + return RestResult.fail(status, node.get("error").asText()); + } + } + + if (resp.code() == 401) + return RestResult.fail(STATUS_AUTH_FAILED, "Failed to authenticate in grid. Please check agent\'s login and password or node port."); + + return RestResult.fail(STATUS_FAILED, "Failed connect to node and execute REST command."); + } + catch (ConnectException ignore) { + throw new ConnectException("Failed connect to node and execute REST command [url=" + urlBuilder + "]"); + } + } + + /** + * @param demo Is demo node request. + * @param path Path segment. + * @param params Params. + * @param mtd Method. + * @param headers Headers. + * @param body Body. + */ + public RestResult execute(boolean demo, String path, Map params, + String mtd, Map headers, String body) { + log.debug("Start execute REST command [method=" + mtd + ", uri=/" + (path == null ? "" : path) + + ", parameters=" + params + "]"); + + try { + return sendRequest(demo, path, params, mtd, headers, body); + } + catch (Exception e) { + log.info("Failed to execute REST command [method=" + mtd + ", uri=/" + (path == null ? "" : path) + + ", parameters=" + params + "]", e); + + return RestResult.fail(404, e.getMessage()); + } + } + + /** + * @param demo Is demo node request. + */ + public RestResult topology(boolean demo, boolean full) throws IOException { + Map params = new HashMap<>(3); + + params.put("cmd", "top"); + params.put("attr", full); + params.put("mtr", full); + + return sendRequest(demo, "ignite", params, "GET", null, null); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java new file mode 100644 index 0000000..5beeee7 --- /dev/null +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/agent/rest/RestResult.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.console.agent.rest; + +/** + * Request result. + */ +public class RestResult { + /** REST http code. */ + private final int status; + + /** The field contains description of error if server could not handle the request. */ + private final String error; + + /** The field contains result of command. */ + private final String data; + + /** + * @param status REST http code. + * @param error The field contains description of error if server could not handle the request. + * @param data The field contains result of command. + */ + private RestResult(int status, String error, String data) { + this.status = status; + this.error = error; + this.data = data; + } + + /** + * @param status REST http code. + * @param error The field contains description of error if server could not handle the request. + * @return Request result. + */ + public static RestResult fail(int status, String error) { + return new RestResult(status, error, null); + } + + /** + * @param data The field contains result of command. + * @return Request result. + */ + public static RestResult success(String data) { + return new RestResult(0, null, data); + } + + /** + * @return REST http code. + */ + public int getStatus() { + return status; + } + + /** + * @return The field contains description of error if server could not handle the request. + */ + public String getError() { + return error; + } + + /** + * @return The field contains result of command. + */ + public String getData() { + return data; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java index 3bd0b5a..776e407 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentClusterDemo.java @@ -19,36 +19,38 @@ package org.apache.ignite.console.demo; import java.util.Collection; import java.util.Collections; -import java.util.concurrent.Executors; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.console.agent.AgentConfiguration; import org.apache.ignite.console.demo.service.DemoCachesLoadService; import org.apache.ignite.console.demo.service.DemoRandomCacheLoadService; -import org.apache.ignite.console.demo.service.DemoServiceMultipleInstances; import org.apache.ignite.console.demo.service.DemoServiceClusterSingleton; import org.apache.ignite.console.demo.service.DemoServiceKeyAffinity; +import org.apache.ignite.console.demo.service.DemoServiceMultipleInstances; import org.apache.ignite.console.demo.service.DemoServiceNodeSingleton; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.logger.log4j.Log4JLogger; +import org.apache.ignite.logger.slf4j.Slf4jLogger; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; -import org.apache.log4j.Logger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER; import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_QUIET; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER; +import static org.apache.ignite.console.demo.AgentDemoUtils.newScheduledThreadPool; import static org.apache.ignite.events.EventType.EVTS_DISCOVERY; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_ADDRS; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_PORT; @@ -60,21 +62,28 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_REST_JETTY_PO */ public class AgentClusterDemo { /** */ - private static final Logger log = Logger.getLogger(AgentClusterDemo.class.getName()); + private static final Logger log = LoggerFactory.getLogger(AgentClusterDemo.class); + + /** */ + private static final AtomicBoolean initGuard = new AtomicBoolean(); + + /** */ + private static CountDownLatch initLatch = new CountDownLatch(1); /** */ - private static final AtomicBoolean initLatch = new AtomicBoolean(); + private static volatile String demoUrl; /** */ private static final int NODE_CNT = 3; /** * Configure node. + * @param basePort Base port. * @param gridIdx Ignite instance name index. * @param client If {@code true} then start client node. * @return IgniteConfiguration */ - private static IgniteConfiguration igniteConfiguration(int gridIdx, boolean client) { + private static IgniteConfiguration igniteConfiguration(int basePort, int gridIdx, boolean client) { IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setIgniteInstanceName((client ? "demo-client-" : "demo-server-" ) + gridIdx); @@ -82,14 +91,20 @@ public class AgentClusterDemo { cfg.setEventStorageSpi(new MemoryEventStorageSpi()); cfg.setIncludeEventTypes(EVTS_DISCOVERY); + cfg.getConnectorConfiguration().setPort(basePort); + + System.setProperty(IGNITE_JETTY_PORT, String.valueOf(basePort + 10)); + TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(); - ipFinder.setAddresses(Collections.singletonList("127.0.0.1:60900.." + (60900 + NODE_CNT - 1))); + int discoPort = basePort + 20; + + ipFinder.setAddresses(Collections.singletonList("127.0.0.1:" + discoPort + ".." + (discoPort + NODE_CNT - 1))); // Configure discovery SPI. TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setLocalPort(60900); + discoSpi.setLocalPort(discoPort); discoSpi.setIpFinder(ipFinder); cfg.setDiscoverySpi(discoSpi); @@ -97,12 +112,15 @@ public class AgentClusterDemo { TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); commSpi.setSharedMemoryPort(-1); - commSpi.setLocalPort(60800); + commSpi.setMessageQueueLimit(10); + + int commPort = basePort + 30; + + commSpi.setLocalPort(commPort); cfg.setCommunicationSpi(commSpi); - cfg.setGridLogger(new Log4JLogger(log)); + cfg.setGridLogger(new Slf4jLogger(log)); cfg.setMetricsLogFrequency(0); - cfg.getConnectorConfiguration().setPort(60700); if (client) cfg.setClientMode(true); @@ -113,88 +131,107 @@ public class AgentClusterDemo { /** * Starts read and write from cache in background. * - * @param ignite Ignite. - * @param cnt - maximum count read/write key + * @param services Distributed services on the grid. */ - private static void startLoad(final Ignite ignite, final int cnt) { - ignite.services().deployClusterSingleton("Demo caches load service", new DemoCachesLoadService(cnt)); - ignite.services().deployNodeSingleton("RandomCache load service", new DemoRandomCacheLoadService(cnt)); + private static void deployServices(IgniteServices services) { + services.deployMultiple("Demo service: Multiple instances", new DemoServiceMultipleInstances(), 7, 3); + services.deployNodeSingleton("Demo service: Node singleton", new DemoServiceNodeSingleton()); + services.deployClusterSingleton("Demo service: Cluster singleton", new DemoServiceClusterSingleton()); + services.deployKeyAffinitySingleton("Demo service: Key affinity singleton", + new DemoServiceKeyAffinity(), DemoCachesLoadService.CAR_CACHE_NAME, "id"); + + services.deployClusterSingleton("Demo caches load service", new DemoCachesLoadService(20)); + services.deployNodeSingleton("RandomCache load service", new DemoRandomCacheLoadService(20)); + } + + /** */ + public static String getDemoUrl() { + return demoUrl; } /** * Start ignite node with cacheEmployee and populate it with data. */ - public static boolean testDrive(AgentConfiguration acfg) { - if (initLatch.compareAndSet(false, true)) { + public static CountDownLatch tryStart() { + if (initGuard.compareAndSet(false, true)) { log.info("DEMO: Starting embedded nodes for demo..."); + System.setProperty(IGNITE_NO_ASCII, "true"); + System.setProperty(IGNITE_QUIET, "false"); + System.setProperty(IGNITE_UPDATE_NOTIFIER, "false"); + System.setProperty(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "1"); System.setProperty(IGNITE_PERFORMANCE_SUGGESTIONS_DISABLED, "true"); - System.setProperty(IGNITE_UPDATE_NOTIFIER, "false"); - System.setProperty(IGNITE_JETTY_PORT, "60800"); - System.setProperty(IGNITE_NO_ASCII, "true"); + final AtomicInteger basePort = new AtomicInteger(60700); + final AtomicInteger cnt = new AtomicInteger(-1); - try { - IgniteEx ignite = (IgniteEx)Ignition.start(igniteConfiguration(0, false)); + final ScheduledExecutorService execSrv = newScheduledThreadPool(1, "demo-nodes-start"); - final AtomicInteger cnt = new AtomicInteger(0); + execSrv.scheduleAtFixedRate(new Runnable() { + @Override public void run() { + int idx = cnt.incrementAndGet(); + int port = basePort.get(); - final ScheduledExecutorService execSrv = Executors.newSingleThreadScheduledExecutor(); + try { + IgniteEx ignite = (IgniteEx)Ignition.start(igniteConfiguration(port, idx, idx == NODE_CNT)); - execSrv.scheduleAtFixedRate(new Runnable() { - @Override public void run() { - int idx = cnt.incrementAndGet(); + if (idx == 0) { + Collection jettyAddrs = ignite.localNode().attribute(ATTR_REST_JETTY_ADDRS); - try { - Ignition.start(igniteConfiguration(idx, idx == NODE_CNT)); - } - catch (Throwable e) { - log.error("DEMO: Failed to start embedded node: " + e.getMessage()); - } - finally { - if (idx == NODE_CNT) - execSrv.shutdown(); - } - } - }, 10, 10, TimeUnit.SECONDS); + if (jettyAddrs == null) { + ignite.cluster().stopNodes(); + + throw new IgniteException("DEMO: Failed to start Jetty REST server on embedded node"); + } - IgniteServices services = ignite.services(); + String jettyHost = jettyAddrs.iterator().next(); - services.deployMultiple("Demo service: Multiple instances", new DemoServiceMultipleInstances(), 7, 3); - services.deployNodeSingleton("Demo service: Node singleton", new DemoServiceNodeSingleton()); - services.deployClusterSingleton("Demo service: Cluster singleton", new DemoServiceClusterSingleton()); - services.deployKeyAffinitySingleton("Demo service: Key affinity singleton", - new DemoServiceKeyAffinity(), DemoCachesLoadService.CAR_CACHE_NAME, "id"); + Integer jettyPort = ignite.localNode().attribute(ATTR_REST_JETTY_PORT); - if (log.isDebugEnabled()) - log.debug("DEMO: Started embedded nodes with indexed enabled caches..."); + if (F.isEmpty(jettyHost) || jettyPort == null) + throw new IgniteException("DEMO: Failed to start Jetty REST handler on embedded node"); - Collection jettyAddrs = ignite.localNode().attribute(ATTR_REST_JETTY_ADDRS); + log.info("DEMO: Started embedded node for demo purpose [TCP binary port={}, Jetty REST port={}]", port, jettyPort); - String host = jettyAddrs == null ? null : jettyAddrs.iterator().next(); + demoUrl = String.format("http://%s:%d", jettyHost, jettyPort); - Integer port = ignite.localNode().attribute(ATTR_REST_JETTY_PORT); + initLatch.countDown(); - if (F.isEmpty(host) || port == null) { - log.error("DEMO: Failed to start embedded node with rest!"); + deployServices(ignite.services()); + } + } + catch (Throwable e) { + if (idx == 0) { + basePort.getAndAdd(50); - return false; + log.warn("DEMO: Failed to start embedded node.", e); + } + else + log.error("DEMO: Failed to start embedded node.", e); + } + finally { + if (idx == NODE_CNT) { + log.info("DEMO: All embedded nodes for demo successfully started"); + + execSrv.shutdown(); + } + } } + }, 1, 10, TimeUnit.SECONDS); + } - acfg.demoNodeUri(String.format("http://%s:%d", host, port)); + return initLatch; + } - log.info("DEMO: Embedded nodes for sql and monitoring demo successfully started"); + /** */ + public static void stop() { + demoUrl = null; - startLoad(ignite, 20); - } - catch (Exception e) { - log.error("DEMO: Failed to start embedded node for sql and monitoring demo!", e); + Ignition.stopAll(true); - return false; - } - } + initLatch = new CountDownLatch(1); - return true; + initGuard.compareAndSet(true, false); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java index fb34de7..fb5edb2 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/AgentDemoUtils.java @@ -24,7 +24,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** - * Utilites for Agent demo mode. + * Utilities for Agent demo mode. */ public class AgentDemoUtils { /** Counter for threads in pool. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/323e3870/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java ---------------------------------------------------------------------- diff --git a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java index 5f7823b..b21716c 100644 --- a/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java +++ b/modules/web-console/web-agent/src/main/java/org/apache/ignite/console/demo/service/DemoCachesLoadService.java @@ -120,11 +120,9 @@ public class DemoCachesLoadService implements Service { /** {@inheritDoc} */ @Override public void init(ServiceContext ctx) throws Exception { - ignite.createCache(cacheCountry()); - ignite.createCache(cacheDepartment()); - ignite.createCache(cacheEmployee()); - ignite.createCache(cacheCar()); - ignite.createCache(cacheParking()); + ignite.getOrCreateCaches(Arrays.asList( + cacheCountry(), cacheDepartment(), cacheEmployee(), cacheCar(), cacheParking() + )); populateCacheEmployee(); populateCacheCar(); @@ -203,8 +201,8 @@ public class DemoCachesLoadService implements Service { * @param name cache name. * @return Cache configuration with basic properties set. */ - private static CacheConfiguration cacheConfiguration(String name) { - CacheConfiguration ccfg = new CacheConfiguration<>(name); + private static CacheConfiguration cacheConfiguration(String name) { + CacheConfiguration ccfg = new CacheConfiguration<>(name); ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); ccfg.setQueryDetailMetricsSize(10); @@ -218,8 +216,8 @@ public class DemoCachesLoadService implements Service { /** * Configure cacheCountry. */ - private static CacheConfiguration cacheCountry() { - CacheConfiguration ccfg = cacheConfiguration(COUNTRY_CACHE_NAME); + private static CacheConfiguration cacheCountry() { + CacheConfiguration ccfg = cacheConfiguration(COUNTRY_CACHE_NAME); // Configure cacheCountry types. Collection qryEntities = new ArrayList<>(); @@ -249,8 +247,8 @@ public class DemoCachesLoadService implements Service { /** * Configure cacheEmployee. */ - private static CacheConfiguration cacheDepartment() { - CacheConfiguration ccfg = cacheConfiguration(DEPARTMENT_CACHE_NAME); + private static CacheConfiguration cacheDepartment() { + CacheConfiguration ccfg = cacheConfiguration(DEPARTMENT_CACHE_NAME); // Configure cacheDepartment types. Collection qryEntities = new ArrayList<>(); @@ -280,8 +278,8 @@ public class DemoCachesLoadService implements Service { /** * Configure cacheEmployee. */ - private static CacheConfiguration cacheEmployee() { - CacheConfiguration ccfg = cacheConfiguration(EMPLOYEE_CACHE_NAME); + private static CacheConfiguration cacheEmployee() { + CacheConfiguration ccfg = cacheConfiguration(EMPLOYEE_CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg.setBackups(1); @@ -314,7 +312,6 @@ public class DemoCachesLoadService implements Service { type.setFields(qryFlds); // Indexes for EMPLOYEE. - Collection indexes = new ArrayList<>(); QueryIndex idx = new QueryIndex(); @@ -327,6 +324,8 @@ public class DemoCachesLoadService implements Service { idx.setFields(indFlds); + Collection indexes = new ArrayList<>(); + indexes.add(idx); indexes.add(new QueryIndex("salary", QueryIndexType.SORTED, false, "EMP_SALARY")); @@ -340,8 +339,8 @@ public class DemoCachesLoadService implements Service { /** * Configure cacheEmployee. */ - private static CacheConfiguration cacheParking() { - CacheConfiguration ccfg = cacheConfiguration(PARKING_CACHE_NAME); + private static CacheConfiguration cacheParking() { + CacheConfiguration ccfg = cacheConfiguration(PARKING_CACHE_NAME); // Configure cacheParking types. Collection qryEntities = new ArrayList<>(); @@ -371,8 +370,8 @@ public class DemoCachesLoadService implements Service { /** * Configure cacheEmployee. */ - private static CacheConfiguration cacheCar() { - CacheConfiguration ccfg = cacheConfiguration(CAR_CACHE_NAME); + private static CacheConfiguration cacheCar() { + CacheConfiguration ccfg = cacheConfiguration(CAR_CACHE_NAME); // Configure cacheCar types. Collection qryEntities = new ArrayList<>();