Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E32DA17487 for ; Mon, 2 Feb 2015 03:28:38 +0000 (UTC) Received: (qmail 14486 invoked by uid 500); 2 Feb 2015 03:28:39 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 14412 invoked by uid 500); 2 Feb 2015 03:28:39 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 13998 invoked by uid 99); 2 Feb 2015 03:28:38 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Feb 2015 03:28:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 02 Feb 2015 03:28:20 +0000 Received: (qmail 11977 invoked by uid 99); 2 Feb 2015 03:27:54 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 02 Feb 2015 03:27:54 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 53889DFDD6; Mon, 2 Feb 2015 03:27:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vkulichenko@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 02 Feb 2015 03:28:42 -0000 Message-Id: <6c83a3ca5ff2435bb92b875111f5d9f9@git.apache.org> In-Reply-To: <4657e1da43a64cfd9c9d6e68214cfabb@git.apache.org> References: <4657e1da43a64cfd9c9d6e68214cfabb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61 X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java index 0000000,8c07038..c09be9a mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/client/impl/connection/GridClientConnectionManagerAdapter.java @@@ -1,0 -1,763 +1,699 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.client.impl.connection; + + import org.apache.ignite.*; + import org.apache.ignite.logger.java.*; ++import org.apache.ignite.plugin.security.*; + import org.apache.ignite.client.*; + import org.apache.ignite.client.impl.*; + import org.apache.ignite.client.util.*; + import org.apache.ignite.internal.processors.rest.client.message.*; -import org.apache.ignite.plugin.security.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.nio.*; + import org.apache.ignite.internal.util.nio.ssl.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import javax.net.ssl.*; + import java.io.*; + import java.net.*; + import java.nio.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.logging.*; + + import static java.util.logging.Level.*; + import static org.apache.ignite.client.impl.connection.GridClientConnectionCloseReason.*; + import static org.apache.ignite.internal.GridNodeAttributes.*; + + /** + * Cached connections manager. + */ + abstract class GridClientConnectionManagerAdapter implements GridClientConnectionManager { + /** Count of reconnect retries before init considered failed. */ + private static final int INIT_RETRY_CNT = 3; + + /** Initialization retry interval. */ + private static final int INIT_RETRY_INTERVAL = 1000; + + /** Class logger. */ + private final Logger log; + + /** NIO server. */ + private GridNioServer srv; + + /** Active connections. */ + private final ConcurrentMap conns = new ConcurrentHashMap<>(); + + /** Active connections of nodes. */ + private final ConcurrentMap nodeConns = new ConcurrentHashMap<>(); + + /** SSL context. */ + private final SSLContext sslCtx; + + /** Client configuration. */ + protected final GridClientConfiguration cfg; + + /** Topology. */ + private final GridClientTopology top; + + /** Client id. */ + private final UUID clientId; + + /** Router endpoints to use instead of topology info. */ + private final Collection routers; + + /** Closed flag. */ + private volatile boolean closed; + + /** Shared executor service. */ + private final ExecutorService executor; + + /** Endpoint striped lock. */ + private final GridClientStripedLock endpointStripedLock = new GridClientStripedLock(16); + + /** Service for ping requests, {@code null} if HTTP protocol is used. */ + private final ScheduledExecutorService pingExecutor; + + /** Marshaller ID. */ + private final Byte marshId; + - /** Message writer. */ - @SuppressWarnings("FieldCanBeLocal") - private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { - @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageWriter(this, nodeId); - - return msg.writeTo(buf); - } - - @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, - ByteBuffer buf) throws IOException { - assert msg != null; - assert out != null; - assert buf != null; - assert buf.hasArray(); - - msg.messageWriter(this, nodeId); - - boolean finished = false; - int cnt = 0; - - while (!finished) { - finished = msg.writeTo(buf); - - out.write(buf.array(), 0, buf.position()); - - cnt += buf.position(); - - buf.clear(); - } - - return cnt; - } - }; - + /** + * @param clientId Client ID. + * @param sslCtx SSL context to enable secured connection or {@code null} to use unsecured one. + * @param cfg Client configuration. + * @param routers Routers or empty collection to use endpoints from topology info. + * @param top Topology. + * @param marshId Marshaller ID. + * @throws GridClientException In case of error. + */ + @SuppressWarnings("unchecked") + protected GridClientConnectionManagerAdapter(UUID clientId, + SSLContext sslCtx, + GridClientConfiguration cfg, + Collection routers, + GridClientTopology top, + @Nullable Byte marshId) + throws GridClientException { + assert clientId != null : "clientId != null"; + assert cfg != null : "cfg != null"; + assert routers != null : "routers != null"; + assert top != null : "top != null"; + + this.clientId = clientId; + this.sslCtx = sslCtx; + this.cfg = cfg; + this.routers = new ArrayList<>(routers); + this.top = top; + + log = Logger.getLogger(getClass().getName()); + + executor = cfg.getExecutorService() != null ? cfg.getExecutorService() : + Executors.newCachedThreadPool(new GridClientThreadFactory("exec", true)); + + pingExecutor = cfg.getProtocol() == GridClientProtocol.TCP ? Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors(), new GridClientThreadFactory("exec", true)) : null; + + this.marshId = marshId; + + if (marshId == null && cfg.getMarshaller() == null) + throw new GridClientException("Failed to start client (marshaller is not configured)."); + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + try { + IgniteLogger gridLog = new IgniteJavaLogger(false); + + GridNioFilter[] filters; + - GridNioMessageReader msgReader = new GridNioMessageReader() { - @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, - ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageReader(this, nodeId); - - return msg.readFrom(buf); - } - - @Nullable @Override public GridTcpMessageFactory messageFactory() { - return null; - } - }; - - GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(msgReader), gridLog, true); ++ GridNioFilter codecFilter = new GridNioCodecFilter(new NioParser(), gridLog, true); + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, gridLog); + + sslFilter.directMode(true); + sslFilter.clientMode(true); + + filters = new GridNioFilter[]{codecFilter, sslFilter}; + } + else + filters = new GridNioFilter[]{codecFilter}; + + srv = GridNioServer.builder().address(U.getLocalHost()) + .port(-1) + .listener(new NioListener(log)) + .filters(filters) + .logger(gridLog) + .selectorCount(Runtime.getRuntime().availableProcessors()) + .sendQueueLimit(1024) + .byteOrder(ByteOrder.nativeOrder()) + .tcpNoDelay(cfg.isTcpNoDelay()) + .directBuffer(true) + .directMode(true) + .socketReceiveBufferSize(0) + .socketSendBufferSize(0) + .idleTimeout(Long.MAX_VALUE) + .gridName("gridClient") - .messageWriter(msgWriter) + .daemon(cfg.isDaemon()) + .build(); + + srv.start(); + } + catch (IOException | IgniteCheckedException e) { + throw new GridClientException("Failed to start connection server.", e); + } + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("BusyWait") + @Override public void init(Collection srvs) throws GridClientException, InterruptedException { + init0(); + + GridClientException firstEx = null; + + for (int i = 0; i < INIT_RETRY_CNT; i++) { + Collection srvsCp = new ArrayList<>(srvs); + + while (!srvsCp.isEmpty()) { + GridClientConnection conn = null; + + try { + conn = connect(null, srvsCp); + + conn.topology(cfg.isAutoFetchAttributes(), cfg.isAutoFetchMetrics(), null).get(); + + return; + } + catch (GridServerUnreachableException e) { + // No connection could be opened to any of initial addresses - exit to retry loop. + assert conn == null : + "GridClientConnectionResetException was thrown from GridClientConnection#topology"; + + if (firstEx == null) + firstEx = e; + + break; + } + catch (GridClientConnectionResetException e) { + // Connection was established but topology update failed - + // trying other initial addresses if any. + assert conn != null : "GridClientConnectionResetException was thrown from connect()"; + + if (firstEx == null) + firstEx = e; + + if (!srvsCp.remove(conn.serverAddress())) + // We have misbehaving collection or equals - just exit to avoid infinite loop. + break; + } + } + + Thread.sleep(INIT_RETRY_INTERVAL); + } + + for (GridClientConnection c : conns.values()) { + conns.remove(c.serverAddress(), c); + + c.close(FAILED, false); + } + + throw firstEx; + } + + /** + * Additional initialization. + * + * @throws GridClientException In case of error. + */ + protected abstract void init0() throws GridClientException; + + /** + * Gets active communication facade. + * + * @param node Remote node to which connection should be established. + * @throws GridServerUnreachableException If none of the servers can be reached after the exception. + * @throws GridClientClosedException If client was closed manually. + * @throws InterruptedException If connection was interrupted. + */ + @Override public GridClientConnection connection(GridClientNode node) + throws GridClientClosedException, GridServerUnreachableException, InterruptedException { + assert node != null; + + // Use router's connections if defined. + if (!routers.isEmpty()) + return connection(null, routers); + + GridClientConnection conn = nodeConns.get(node.nodeId()); + + if (conn != null) { + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) + closeIdle(); + else + return conn; + } + + // Use node's connection, if node is available over rest. + Collection endpoints = node.availableAddresses(cfg.getProtocol(), true); + + List resolvedEndpoints = new ArrayList<>(endpoints.size()); + + for (InetSocketAddress endpoint : endpoints) + if (!endpoint.isUnresolved()) + resolvedEndpoints.add(endpoint); + + if (resolvedEndpoints.isEmpty()) { + throw new GridServerUnreachableException("No available endpoints to connect " + + "(is rest enabled for this node?): " + node); + } + + boolean sameHost = node.attributes().isEmpty() || + F.containsAny(U.allLocalMACs(), node.attribute(ATTR_MACS).toString().split(", ")); + + Collection srvs = new LinkedHashSet<>(); + + if (sameHost) { + Collections.sort(resolvedEndpoints, U.inetAddressesComparator(true)); + + srvs.addAll(resolvedEndpoints); + } + else { + for (InetSocketAddress endpoint : resolvedEndpoints) + if (!endpoint.getAddress().isLoopbackAddress()) + srvs.add(endpoint); + } + + return connection(node.nodeId(), srvs); + } + + /** + * Returns connection to one of the given addresses. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs Collection of addresses to connect to. + * @return Connection to use for operations, targeted for the given node. + * @throws GridServerUnreachableException If connection can't be established. + * @throws GridClientClosedException If connections manager has been closed already. + * @throws InterruptedException If connection was interrupted. + */ + public GridClientConnection connection(@Nullable UUID nodeId, Collection srvs) + throws GridServerUnreachableException, GridClientClosedException, InterruptedException { + if (srvs == null || srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid" + + " (address list is empty)."); + + checkClosed(); + + // Search for existent connection. + for (InetSocketAddress endPoint : srvs) { + assert endPoint != null; + + GridClientConnection conn = conns.get(endPoint); + + if (conn == null) + continue; + + // Ignore closed connections. + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { + closeIdle(); + + continue; + } + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + + return connect(nodeId, srvs); + } + + /** + * Creates a connected facade and returns it. Called either from constructor or inside + * a write lock. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param srvs List of server addresses that this method will try to connect to. + * @return Established connection. + * @throws GridServerUnreachableException If none of the servers can be reached. + * @throws InterruptedException If connection was interrupted. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, Collection srvs) + throws GridServerUnreachableException, InterruptedException { + if (srvs.isEmpty()) + throw new GridServerUnreachableException("Failed to establish connection to the grid node (address " + + "list is empty)."); + + Exception cause = null; + + for (InetSocketAddress srv : srvs) { + try { + return connect(nodeId, srv); + } + catch (InterruptedException e) { + throw e; + } + catch (Exception e) { + if (cause == null) + cause = e; + else if (log.isLoggable(INFO)) + log.info("Unable to connect to grid node [srvAddr=" + srv + ", msg=" + e.getMessage() + ']'); + } + } + + assert cause != null; + + throw new GridServerUnreachableException("Failed to connect to any of the servers in list: " + srvs, cause); + } + + /** + * Create new connection to specified server. + * + * @param nodeId {@code UUID} of node for mapping with connection. + * {@code null} if no need of mapping. + * @param addr Remote socket to connect. + * @return Established connection. + * @throws IOException If connection failed. + * @throws GridClientException If protocol error happened. + * @throws InterruptedException If thread was interrupted before connection was established. + */ + protected GridClientConnection connect(@Nullable UUID nodeId, InetSocketAddress addr) + throws IOException, GridClientException, InterruptedException { + endpointStripedLock.lock(addr); + + try { + GridClientConnection old = conns.get(addr); + + if (old != null) { + if (old.isClosed()) { + conns.remove(addr, old); + + if (nodeId != null) + nodeConns.remove(nodeId, old); + } + else { + if (nodeId != null) + nodeConns.put(nodeId, old); + + return old; + } + } + + GridSecurityCredentials cred = null; + + try { + if (cfg.getSecurityCredentialsProvider() != null) + cred = cfg.getSecurityCredentialsProvider().credentials(); + } + catch (IgniteCheckedException e) { + throw new GridClientException("Failed to obtain client credentials.", e); + } + + GridClientConnection conn; + + if (cfg.getProtocol() == GridClientProtocol.TCP) { + conn = new GridClientNioTcpConnection(srv, clientId, addr, sslCtx, pingExecutor, + cfg.getConnectTimeout(), cfg.getPingInterval(), cfg.getPingTimeout(), + cfg.isTcpNoDelay(), cfg.getMarshaller(), marshId, top, cred, keepPortablesThreadLocal()); + } + else + throw new GridServerUnreachableException("Failed to create client (protocol is not supported): " + + cfg.getProtocol()); + + old = conns.putIfAbsent(addr, conn); + + assert old == null; + + if (nodeId != null) + nodeConns.put(nodeId, conn); + + return conn; + } + finally { + endpointStripedLock.unlock(addr); + } + } + + /** + * @return Get thread local used to enable keep portables mode. + */ + protected ThreadLocal keepPortablesThreadLocal() { + return null; + } + + /** {@inheritDoc} */ + @Override public void terminateConnection(GridClientConnection conn, GridClientNode node, Throwable e) { + if (log.isLoggable(Level.FINE)) + log.fine("Connection with remote node was terminated [node=" + node + ", srvAddr=" + + conn.serverAddress() + ", errMsg=" + e.getMessage() + ']'); + + closeIdle(); + + conn.close(FAILED, false); + } + + /** + * Closes all opened connections. + * + * @param waitCompletion If {@code true} waits for all pending requests to be proceeded. + */ + @SuppressWarnings("TooBroadScope") + @Override public void stop(boolean waitCompletion) { + Collection closeConns; + + if (closed) + return; + + // Mark manager as closed. + closed = true; + + // Remove all connections from cache. + closeConns = new ArrayList<>(conns.values()); + + conns.clear(); + + nodeConns.clear(); + + // Close old connection outside the writer lock. + for (GridClientConnection conn : closeConns) + conn.close(CLIENT_CLOSED, waitCompletion); + + if (pingExecutor != null) + GridClientUtils.shutdownNow(GridClientConnectionManager.class, pingExecutor, log); + + GridClientUtils.shutdownNow(GridClientConnectionManager.class, executor, log); + + if (srv != null) + srv.stop(); + } + + /** + * Close all connections idling for more then + * {@link GridClientConfiguration#getMaxConnectionIdleTime()} milliseconds. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private void closeIdle() { + for (Iterator> it = nodeConns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = it.next(); + + GridClientConnection conn = entry.getValue(); + + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) { + conns.remove(conn.serverAddress(), conn); + + nodeConns.remove(entry.getKey(), conn); + } + } + + for (GridClientConnection conn : conns.values()) + if (conn.closeIfIdle(cfg.getMaxConnectionIdleTime())) + conns.remove(conn.serverAddress(), conn); + } + + /** + * Checks and throws an exception if this client was closed. + * + * @throws GridClientClosedException If client was closed. + */ + private void checkClosed() throws GridClientClosedException { + if (closed) + throw new GridClientClosedException("Client was closed (no public methods of client can be used anymore)."); + } + + /** + */ + private static class NioListener implements GridNioServerListener { + /** */ + private final Logger log; + + /** + * @param log Logger. + */ + private NioListener(Logger log) { + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Session connected: " + ses); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (log.isLoggable(Level.FINE)) + log.fine("Session disconnected: " + ses); + + GridClientFutureAdapter handshakeFut = + ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); + + if (handshakeFut != null) + handshakeFut.onDone( + new GridClientConnectionResetException("Failed to perform handshake (connection failed).")); + else { + GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); + + if (conn != null) + conn.close(FAILED, false); + } + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, Object msg) { + GridClientFutureAdapter handshakeFut = + ses.removeMeta(GridClientNioTcpConnection.SES_META_HANDSHAKE); + + if (handshakeFut != null) { + assert msg instanceof GridClientHandshakeResponse; + + handleHandshakeResponse(handshakeFut, (GridClientHandshakeResponse)msg); + } + else { + GridClientNioTcpConnection conn = ses.meta(GridClientNioTcpConnection.SES_META_CONN); + + assert conn != null; + + if (msg instanceof GridClientMessageWrapper) { + GridClientMessageWrapper req = (GridClientMessageWrapper)msg; + + if (req.messageSize() != 0) { + assert req.message() != null; + + conn.handleResponse(req); + } + else + conn.handlePingResponse(); + } + else { + assert msg instanceof GridClientPingPacket : msg; + + conn.handlePingResponse(); + } + } + } + + /** + * Handles client handshake response. + * + * @param handshakeFut Future. + * @param msg A handshake response. + */ + private void handleHandshakeResponse(GridClientFutureAdapter handshakeFut, + GridClientHandshakeResponse msg) { + byte rc = msg.resultCode(); + + if (rc != GridClientHandshakeResponse.OK.resultCode()) { + handshakeFut.onDone(new GridClientHandshakeException(rc, + "Handshake failed due to internal error (see server log for more details).")); + } + else + handshakeFut.onDone(true); + } + + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Closing NIO session because of write timeout."); + + ses.close(); + } + + /** {@inheritDoc} */ + @Override public void onSessionIdleTimeout(GridNioSession ses) { + if (log.isLoggable(Level.FINE)) + log.fine("Closing NIO session because of idle timeout."); + + ses.close(); + } + } + + /** + * + */ + private static class NioParser implements GridNioParser { + /** Message metadata key. */ + private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + - /** Message reader. */ - private final GridNioMessageReader msgReader; - - /** - * @param msgReader Message reader. - */ - NioParser(GridNioMessageReader msgReader) { - this.msgReader = msgReader; - } - + /** {@inheritDoc} */ - @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { ++ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) ++ throws IOException, IgniteCheckedException { + GridClientFutureAdapter handshakeFut = ses.meta(GridClientNioTcpConnection.SES_META_HANDSHAKE); + + if (handshakeFut != null) { + byte code = buf.get(); + + return new GridClientHandshakeResponse(code); + } + + GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY); + + if (msg == null && buf.hasRemaining()) { + byte type = buf.get(); + + if (type == GridClientMessageWrapper.REQ_HEADER) + msg = new GridClientMessageWrapper(); + else + throw new IOException("Invalid message type: " + type); + } + + boolean finished = false; + + if (buf.hasRemaining()) - finished = msgReader.read(null, msg, buf); ++ finished = msg.readFrom(buf); + + if (finished) + return msg; + else { + ses.addMeta(MSG_META_KEY, msg); + + return null; + } + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + // No encoding needed for direct messages. + throw new UnsupportedEncodingException(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java index 0000000,ba2cc86..924f349 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobCancelRequest.java @@@ -1,0 -1,215 +1,211 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + + /** + * Job cancellation request. + */ + public class GridJobCancelRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteUuid sesId; + + /** */ + private IgniteUuid jobId; + + /** */ + private boolean sys; + + /** + * No-op constructor to support {@link Externalizable} interface. + * This constructor is not meant to be used for other purposes. + */ + public GridJobCancelRequest() { + // No-op. + } + + /** + * @param sesId Task session ID. + */ + public GridJobCancelRequest(IgniteUuid sesId) { + assert sesId != null; + + this.sesId = sesId; + } + + /** + * @param sesId Task session ID. + * @param jobId Job ID. + */ + public GridJobCancelRequest(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId) { + assert sesId != null || jobId != null; + + this.sesId = sesId; + this.jobId = jobId; + } + + /** + * @param sesId Session ID. + * @param jobId Job ID. + * @param sys System flag. + */ + public GridJobCancelRequest(@Nullable IgniteUuid sesId, @Nullable IgniteUuid jobId, boolean sys) { + assert sesId != null || jobId != null; + + this.sesId = sesId; + this.jobId = jobId; + this.sys = sys; + } + + /** + * Gets execution ID of task to be cancelled. + * + * @return Execution ID of task to be cancelled. + */ + @Nullable public IgniteUuid sessionId() { + return sesId; + } + + /** + * Gets session ID of job to be cancelled. If {@code null}, then + * all jobs for the specified task execution ID will be cancelled. + * + * @return Execution ID of job to be cancelled. + */ + @Nullable public IgniteUuid jobId() { + return jobId; + } + + /** + * @return {@code True} if request to cancel is sent out of system when task + * has already been reduced and further results are no longer interesting. + */ + public boolean system() { + return sys; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridJobCancelRequest _clone = new GridJobCancelRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridJobCancelRequest _clone = (GridJobCancelRequest)_msg; + + _clone.sesId = sesId; + _clone.jobId = jobId; + _clone.sys = sys; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(jobId)) ++ if (!commState.putGridUuid("jobId", jobId)) + return false; + + commState.idx++; + + case 1: - if (!commState.putGridUuid(sesId)) ++ if (!commState.putGridUuid("sesId", sesId)) + return false; + + commState.idx++; + + case 2: - if (!commState.putBoolean(sys)) ++ if (!commState.putBoolean("sys", sys)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid jobId0 = commState.getGridUuid(); ++ jobId = commState.getGridUuid("jobId"); + - if (jobId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobId = jobId0; - + commState.idx++; + + case 1: - IgniteUuid sesId0 = commState.getGridUuid(); ++ sesId = commState.getGridUuid("sesId"); + - if (sesId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sesId = sesId0; - + commState.idx++; + + case 2: - if (buf.remaining() < 1) - return false; ++ sys = commState.getBoolean("sys"); + - sys = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridJobCancelRequest.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java index 0000000,ae897e6..929f718 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java @@@ -1,0 -1,941 +1,919 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.compute.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Job execution request. + */ + public class GridJobExecuteRequest extends GridTcpCommunicationMessageAdapter implements GridTaskMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Subject ID. */ + private UUID subjId; + + /** */ + private IgniteUuid sesId; + + /** */ + private IgniteUuid jobId; + + /** */ + @GridToStringExclude + private byte[] jobBytes; + + /** */ + @GridToStringExclude + @GridDirectTransient + private ComputeJob job; + + /** */ + private long startTaskTime; + + /** */ + private long timeout; + + /** */ + private String taskName; + + /** */ + private String userVer; + + /** */ + private String taskClsName; + + /** Node class loader participants. */ + @GridToStringInclude + @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class) + private Map ldrParticipants; + + /** */ + @GridToStringExclude + private byte[] sesAttrsBytes; + + /** */ + @GridToStringExclude + @GridDirectTransient + private Map sesAttrs; + + /** */ + @GridToStringExclude + private byte[] jobAttrsBytes; + + /** */ + @GridToStringExclude + @GridDirectTransient + private Map jobAttrs; + + /** Checkpoint SPI name. */ + private String cpSpi; + + /** */ + @GridDirectTransient + private Collection siblings; + + /** */ + private byte[] siblingsBytes; + + /** Transient since needs to hold local creation time. */ + @GridDirectTransient + private long createTime0 = U.currentTimeMillis(); + + /** @deprecated need to remove and use only {@link #createTime0}. */ + @Deprecated + private long createTime = createTime0; + + /** */ + private IgniteUuid clsLdrId; + + /** */ + private IgniteDeploymentMode depMode; + + /** */ + private boolean dynamicSiblings; + + /** */ + private boolean forceLocDep; + + /** */ + private boolean sesFullSup; + + /** */ + private boolean internal; + + /** */ + @GridDirectCollection(UUID.class) + private Collection top; + + /** + * No-op constructor to support {@link Externalizable} interface. + */ + public GridJobExecuteRequest() { + // No-op. + } + + /** + * @param sesId Task session ID. + * @param jobId Job ID. + * @param taskName Task name. + * @param userVer Code version. + * @param taskClsName Fully qualified task name. + * @param jobBytes Job serialized body. + * @param job Job. + * @param startTaskTime Task execution start time. + * @param timeout Task execution timeout. + * @param top Topology. + * @param siblingsBytes Serialized collection of split siblings. + * @param siblings Collection of split siblings. + * @param sesAttrsBytes Map of session attributes. + * @param sesAttrs Session attributes. + * @param jobAttrsBytes Job context attributes. + * @param jobAttrs Job attributes. + * @param cpSpi Collision SPI. + * @param clsLdrId Task local class loader id. + * @param depMode Task deployment mode. + * @param dynamicSiblings {@code True} if siblings are dynamic. + * @param ldrParticipants Other node class loader IDs that can also load classes. + * @param forceLocDep {@code True} If remote node should ignore deployment settings. + * @param sesFullSup {@code True} if session attributes are disabled. + * @param internal {@code True} if internal job. + * @param subjId Subject ID. + */ + public GridJobExecuteRequest( + IgniteUuid sesId, + IgniteUuid jobId, + String taskName, + String userVer, + String taskClsName, + byte[] jobBytes, + ComputeJob job, + long startTaskTime, + long timeout, + @Nullable Collection top, + byte[] siblingsBytes, + Collection siblings, + byte[] sesAttrsBytes, + Map sesAttrs, + byte[] jobAttrsBytes, + Map jobAttrs, + String cpSpi, + IgniteUuid clsLdrId, + IgniteDeploymentMode depMode, + boolean dynamicSiblings, + Map ldrParticipants, + boolean forceLocDep, + boolean sesFullSup, + boolean internal, + UUID subjId) { + this.top = top; + assert sesId != null; + assert jobId != null; + assert taskName != null; + assert taskClsName != null; + assert job != null || jobBytes != null; + assert sesAttrs != null || sesAttrsBytes != null || !sesFullSup; + assert jobAttrs != null || jobAttrsBytes != null; + assert clsLdrId != null; + assert userVer != null; + assert depMode != null; + + this.sesId = sesId; + this.jobId = jobId; + this.taskName = taskName; + this.userVer = userVer; + this.taskClsName = taskClsName; + this.jobBytes = jobBytes; + this.job = job; + this.startTaskTime = startTaskTime; + this.timeout = timeout; + this.top = top; + this.siblingsBytes = siblingsBytes; + this.siblings = siblings; + this.sesAttrsBytes = sesAttrsBytes; + this.sesAttrs = sesAttrs; + this.jobAttrsBytes = jobAttrsBytes; + this.jobAttrs = jobAttrs; + this.clsLdrId = clsLdrId; + this.depMode = depMode; + this.dynamicSiblings = dynamicSiblings; + this.ldrParticipants = ldrParticipants; + this.forceLocDep = forceLocDep; + this.sesFullSup = sesFullSup; + this.internal = internal; + this.subjId = subjId; + + this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid getSessionId() { + return sesId; + } + + /** + * @return Job session ID. + */ + public IgniteUuid getJobId() { + return jobId; + } + + /** + * @return Task version. + */ + public String getTaskClassName() { + return taskClsName; + } + + /** + * @return Task name. + */ + public String getTaskName() { + return taskName; + } + + /** + * @return Task version. + */ + public String getUserVersion() { + return userVer; + } + + /** + * @return Serialized job bytes. + */ + public byte[] getJobBytes() { + return jobBytes; + } + + /** + * @return Grid job. + */ + public ComputeJob getJob() { + return job; + } + + /** + * @return Task start time. + */ + public long getStartTaskTime() { + return startTaskTime; + } + + /** + * @return Timeout. + */ + public long getTimeout() { + return timeout; + } + + /** + * Gets this instance creation time. + * + * @return This instance creation time. + */ + public long getCreateTime() { + return createTime0; + } + + /** + * @return Serialized collection of split siblings. + */ + public byte[] getSiblingsBytes() { + return siblingsBytes; + } + + /** + * @return Job siblings. + */ + public Collection getSiblings() { + return siblings; + } + + /** + * @return Session attributes. + */ + public byte[] getSessionAttributesBytes() { + return sesAttrsBytes; + } + + /** + * @return Session attributes. + */ + public Map getSessionAttributes() { + return sesAttrs; + } + + /** + * @return Job attributes. + */ + public byte[] getJobAttributesBytes() { + return jobAttrsBytes; + } + + /** + * @return Job attributes. + */ + public Map getJobAttributes() { + return jobAttrs; + } + + /** + * @return Checkpoint SPI name. + */ + public String getCheckpointSpi() { + return cpSpi; + } + + /** + * @return Task local class loader id. + */ + public IgniteUuid getClassLoaderId() { + return clsLdrId; + } + + /** + * @return Deployment mode. + */ + public IgniteDeploymentMode getDeploymentMode() { + return depMode; + } + + /** + * Returns true if siblings list is dynamic, i.e. task is continuous. + * + * @return True if siblings list is dynamic. + */ + public boolean isDynamicSiblings() { + return dynamicSiblings; + } + + /** + * @return Node class loader participant map. + */ + public Map getLoaderParticipants() { + return ldrParticipants; + } + + /** + * @return Returns {@code true} if deployment should always be used. + */ + public boolean isForceLocalDeployment() { + return forceLocDep; + } + + /** + * @return Topology. + */ + @Nullable public Collection topology() { + return top; + } + /** + * @return {@code True} if session attributes are enabled. + */ + public boolean isSessionFullSupport() { + return sesFullSup; + } + + /** + * @return {@code True} if internal job. + */ + public boolean isInternal() { + return internal; + } + + /** + * @return Subject ID. + */ + public UUID getSubjectId() { + return subjId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridJobExecuteRequest _clone = new GridJobExecuteRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridJobExecuteRequest _clone = (GridJobExecuteRequest)_msg; + + _clone.subjId = subjId; + _clone.sesId = sesId; + _clone.jobId = jobId; + _clone.jobBytes = jobBytes; + _clone.job = job; + _clone.startTaskTime = startTaskTime; + _clone.timeout = timeout; + _clone.taskName = taskName; + _clone.userVer = userVer; + _clone.taskClsName = taskClsName; + _clone.ldrParticipants = ldrParticipants; + _clone.sesAttrsBytes = sesAttrsBytes; + _clone.sesAttrs = sesAttrs; + _clone.jobAttrsBytes = jobAttrsBytes; + _clone.jobAttrs = jobAttrs; + _clone.cpSpi = cpSpi; + _clone.siblings = siblings; + _clone.siblingsBytes = siblingsBytes; + _clone.createTime0 = createTime0; + _clone.createTime = createTime; + _clone.clsLdrId = clsLdrId; + _clone.depMode = depMode; + _clone.dynamicSiblings = dynamicSiblings; + _clone.forceLocDep = forceLocDep; + _clone.sesFullSup = sesFullSup; + _clone.internal = internal; + _clone.top = top; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(clsLdrId)) ++ if (!commState.putGridUuid("clsLdrId", clsLdrId)) + return false; + + commState.idx++; + + case 1: - if (!commState.putString(cpSpi)) ++ if (!commState.putString("cpSpi", cpSpi)) + return false; + + commState.idx++; + + case 2: - if (!commState.putLong(createTime)) ++ if (!commState.putLong("createTime", createTime)) + return false; + + commState.idx++; + + case 3: - if (!commState.putEnum(depMode)) ++ if (!commState.putEnum("depMode", depMode)) + return false; + + commState.idx++; + + case 4: - if (!commState.putBoolean(dynamicSiblings)) ++ if (!commState.putBoolean("dynamicSiblings", dynamicSiblings)) + return false; + + commState.idx++; + + case 5: - if (!commState.putBoolean(forceLocDep)) ++ if (!commState.putBoolean("forceLocDep", forceLocDep)) + return false; + + commState.idx++; + + case 6: - if (!commState.putBoolean(internal)) ++ if (!commState.putBoolean("internal", internal)) + return false; + + commState.idx++; + + case 7: - if (!commState.putByteArray(jobAttrsBytes)) ++ if (!commState.putByteArray("jobAttrsBytes", jobAttrsBytes)) + return false; + + commState.idx++; + + case 8: - if (!commState.putByteArray(jobBytes)) ++ if (!commState.putByteArray("jobBytes", jobBytes)) + return false; + + commState.idx++; + + case 9: - if (!commState.putGridUuid(jobId)) ++ if (!commState.putGridUuid("jobId", jobId)) + return false; + + commState.idx++; + + case 10: + if (ldrParticipants != null) { + if (commState.it == null) { - if (!commState.putInt(ldrParticipants.size())) ++ if (!commState.putInt(null, ldrParticipants.size())) + return false; + + commState.it = ldrParticipants.entrySet().iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + + Map.Entry e = (Map.Entry)commState.cur; + + if (!commState.keyDone) { - if (!commState.putUuid(e.getKey())) ++ if (!commState.putUuid(null, e.getKey())) + return false; + + commState.keyDone = true; + } + - if (!commState.putGridUuid(e.getValue())) ++ if (!commState.putGridUuid(null, e.getValue())) + return false; + + commState.keyDone = false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 11: - if (!commState.putByteArray(sesAttrsBytes)) ++ if (!commState.putByteArray("sesAttrsBytes", sesAttrsBytes)) + return false; + + commState.idx++; + + case 12: - if (!commState.putBoolean(sesFullSup)) ++ if (!commState.putBoolean("sesFullSup", sesFullSup)) + return false; + + commState.idx++; + + case 13: - if (!commState.putGridUuid(sesId)) ++ if (!commState.putGridUuid("sesId", sesId)) + return false; + + commState.idx++; + + case 14: - if (!commState.putByteArray(siblingsBytes)) ++ if (!commState.putByteArray("siblingsBytes", siblingsBytes)) + return false; + + commState.idx++; + + case 15: - if (!commState.putLong(startTaskTime)) ++ if (!commState.putLong("startTaskTime", startTaskTime)) + return false; + + commState.idx++; + + case 16: - if (!commState.putUuid(subjId)) ++ if (!commState.putUuid("subjId", subjId)) + return false; + + commState.idx++; + + case 17: - if (!commState.putString(taskClsName)) ++ if (!commState.putString("taskClsName", taskClsName)) + return false; + + commState.idx++; + + case 18: - if (!commState.putString(taskName)) ++ if (!commState.putString("taskName", taskName)) + return false; + + commState.idx++; + + case 19: - if (!commState.putLong(timeout)) ++ if (!commState.putLong("timeout", timeout)) + return false; + + commState.idx++; + + case 20: + if (top != null) { + if (commState.it == null) { - if (!commState.putInt(top.size())) ++ if (!commState.putInt(null, top.size())) + return false; + + commState.it = top.iterator(); + } + + while (commState.it.hasNext() || commState.cur != NULL) { + if (commState.cur == NULL) + commState.cur = commState.it.next(); + - if (!commState.putUuid((UUID)commState.cur)) ++ if (!commState.putUuid(null, (UUID)commState.cur)) + return false; + + commState.cur = NULL; + } + + commState.it = null; + } else { - if (!commState.putInt(-1)) ++ if (!commState.putInt(null, -1)) + return false; + } + + commState.idx++; + + case 21: - if (!commState.putString(userVer)) ++ if (!commState.putString("userVer", userVer)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid clsLdrId0 = commState.getGridUuid(); ++ clsLdrId = commState.getGridUuid("clsLdrId"); + - if (clsLdrId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - clsLdrId = clsLdrId0; - + commState.idx++; + + case 1: - String cpSpi0 = commState.getString(); ++ cpSpi = commState.getString("cpSpi"); + - if (cpSpi0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - cpSpi = cpSpi0; - + commState.idx++; + + case 2: - if (buf.remaining() < 8) - return false; ++ createTime = commState.getLong("createTime"); + - createTime = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 3: - if (buf.remaining() < 1) - return false; ++ byte depMode0 = commState.getByte("depMode"); + - byte depMode0 = commState.getByte(); ++ if (!commState.lastRead()) ++ return false; + + depMode = IgniteDeploymentMode.fromOrdinal(depMode0); + + commState.idx++; + + case 4: - if (buf.remaining() < 1) - return false; ++ dynamicSiblings = commState.getBoolean("dynamicSiblings"); + - dynamicSiblings = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 5: - if (buf.remaining() < 1) - return false; ++ forceLocDep = commState.getBoolean("forceLocDep"); + - forceLocDep = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 6: - if (buf.remaining() < 1) - return false; ++ internal = commState.getBoolean("internal"); + - internal = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 7: - byte[] jobAttrsBytes0 = commState.getByteArray(); ++ jobAttrsBytes = commState.getByteArray("jobAttrsBytes"); + - if (jobAttrsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobAttrsBytes = jobAttrsBytes0; - + commState.idx++; + + case 8: - byte[] jobBytes0 = commState.getByteArray(); ++ jobBytes = commState.getByteArray("jobBytes"); + - if (jobBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobBytes = jobBytes0; - + commState.idx++; + + case 9: - IgniteUuid jobId0 = commState.getGridUuid(); ++ jobId = commState.getGridUuid("jobId"); + - if (jobId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobId = jobId0; - + commState.idx++; + + case 10: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (ldrParticipants == null) + ldrParticipants = new HashMap<>(commState.readSize, 1.0f); + + for (int i = commState.readItems; i < commState.readSize; i++) { + if (!commState.keyDone) { - UUID _val = commState.getUuid(); ++ UUID _val = commState.getUuid(null); + - if (_val == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + commState.cur = _val; + commState.keyDone = true; + } + - IgniteUuid _val = commState.getGridUuid(); ++ IgniteUuid _val = commState.getGridUuid(null); + - if (_val == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + ldrParticipants.put((UUID)commState.cur, _val); + + commState.keyDone = false; + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + commState.cur = null; + + commState.idx++; + + case 11: - byte[] sesAttrsBytes0 = commState.getByteArray(); ++ sesAttrsBytes = commState.getByteArray("sesAttrsBytes"); + - if (sesAttrsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sesAttrsBytes = sesAttrsBytes0; - + commState.idx++; + + case 12: - if (buf.remaining() < 1) - return false; ++ sesFullSup = commState.getBoolean("sesFullSup"); + - sesFullSup = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 13: - IgniteUuid sesId0 = commState.getGridUuid(); ++ sesId = commState.getGridUuid("sesId"); + - if (sesId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sesId = sesId0; - + commState.idx++; + + case 14: - byte[] siblingsBytes0 = commState.getByteArray(); ++ siblingsBytes = commState.getByteArray("siblingsBytes"); + - if (siblingsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - siblingsBytes = siblingsBytes0; - + commState.idx++; + + case 15: - if (buf.remaining() < 8) - return false; ++ startTaskTime = commState.getLong("startTaskTime"); + - startTaskTime = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 16: + UUID subjId0 = commState.getUuid(); + + if (subjId0 == UUID_NOT_READ) + return false; + + subjId = subjId0; + + commState.idx++; + + case 17: - String taskClsName0 = commState.getString(); ++ String taskClsName0 = commState.getString("taskClsName"); + - if (taskClsName0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - taskClsName = taskClsName0; - + commState.idx++; + + case 18: - String taskName0 = commState.getString(); ++ String taskName0 = commState.getString("taskName"); + - if (taskName0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - taskName = taskName0; - + commState.idx++; + + case 19: - if (buf.remaining() < 8) - return false; ++ timeout = commState.getLong("timeout"); + - timeout = commState.getLong(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 20: + if (commState.readSize == -1) { - if (buf.remaining() < 4) - return false; ++ commState.readSize = commState.getInt(null); + - commState.readSize = commState.getInt(); ++ if (!commState.lastRead()) ++ return false; + } + + if (commState.readSize >= 0) { + if (top == null) + top = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { - UUID _val = commState.getUuid(); ++ UUID _val = commState.getUuid(null); + - if (_val == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + + top.add((UUID)_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 21: - String userVer0 = commState.getString(); ++ String userVer0 = commState.getString("userVer"); + - if (userVer0 == STR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - userVer = userVer0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 81; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridJobExecuteRequest.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java index 0000000,d3e6e56..d2711a7 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java @@@ -1,0 -1,374 +1,362 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.tostring.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + + /** + * Job execution response. + */ + public class GridJobExecuteResponse extends GridTcpCommunicationMessageAdapter implements GridTaskMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private UUID nodeId; + + /** */ + private IgniteUuid sesId; + + /** */ + private IgniteUuid jobId; + + /** */ + private byte[] gridExBytes; + + /** */ + @GridDirectTransient + private IgniteCheckedException gridEx; + + /** */ + private byte[] resBytes; + + /** */ + @GridDirectTransient + private Object res; + + /** */ + private byte[] jobAttrsBytes; + + /** */ + @GridDirectTransient + private Map jobAttrs; + + /** */ + private boolean isCancelled; + + /** */ + @GridToStringExclude + @GridDirectTransient + private IgniteCheckedException fakeEx; + + /** + * No-op constructor to support {@link Externalizable} interface. This + * constructor is not meant to be used for other purposes. + */ + public GridJobExecuteResponse() { + // No-op. + } + + /** + * @param nodeId Sender node ID. + * @param sesId Task session ID + * @param jobId Job ID. + * @param gridExBytes Serialized grid exception. + * @param gridEx Grid exception. + * @param resBytes Serialized result. + * @param res Result. + * @param jobAttrsBytes Serialized job attributes. + * @param jobAttrs Job attributes. + * @param isCancelled Whether job was cancelled or not. + */ + public GridJobExecuteResponse(UUID nodeId, IgniteUuid sesId, IgniteUuid jobId, byte[] gridExBytes, + IgniteCheckedException gridEx, byte[] resBytes, Object res, byte[] jobAttrsBytes, + Map jobAttrs, boolean isCancelled) { + assert nodeId != null; + assert sesId != null; + assert jobId != null; + + this.nodeId = nodeId; + this.sesId = sesId; + this.jobId = jobId; + this.gridExBytes = gridExBytes; + this.gridEx = gridEx; + this.resBytes = resBytes; + this.res = res; + this.jobAttrsBytes = jobAttrsBytes; + this.jobAttrs = jobAttrs; + this.isCancelled = isCancelled; + } + + /** + * @return Task session ID. + */ + @Override public IgniteUuid getSessionId() { + return sesId; + } + + /** + * @return Job ID. + */ + public IgniteUuid getJobId() { + return jobId; + } + + /** + * @return Serialized job result. + */ + @Nullable public byte[] getJobResultBytes() { + return resBytes; + } + + /** + * @return Job result. + */ + @Nullable public Object getJobResult() { + return res; + } + + /** + * @return Serialized job exception. + */ + @Nullable public byte[] getExceptionBytes() { + return gridExBytes; + } + + /** + * @return Job exception. + */ + @Nullable public IgniteCheckedException getException() { + return gridEx; + } + + /** + * @return Serialized job attributes. + */ + @Nullable public byte[] getJobAttributesBytes() { + return jobAttrsBytes; + } + + /** + * @return Job attributes. + */ + @Nullable public Map getJobAttributes() { + return jobAttrs; + } + + /** + * @return Job cancellation status. + */ + public boolean isCancelled() { + return isCancelled; + } + + /** + * @return Sender node ID. + */ + public UUID getNodeId() { + return nodeId; + } + + /** + * @return Fake exception. + */ + public IgniteCheckedException getFakeException() { + return fakeEx; + } + + /** + * @param fakeEx Fake exception. + */ + public void setFakeException(IgniteCheckedException fakeEx) { + this.fakeEx = fakeEx; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridJobExecuteResponse _clone = new GridJobExecuteResponse(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridJobExecuteResponse _clone = (GridJobExecuteResponse)_msg; + + _clone.nodeId = nodeId; + _clone.sesId = sesId; + _clone.jobId = jobId; + _clone.gridExBytes = gridExBytes; + _clone.gridEx = gridEx; + _clone.resBytes = resBytes; + _clone.res = res; + _clone.jobAttrsBytes = jobAttrsBytes; + _clone.jobAttrs = jobAttrs; + _clone.isCancelled = isCancelled; + _clone.fakeEx = fakeEx; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putByteArray(gridExBytes)) ++ if (!commState.putByteArray("gridExBytes", gridExBytes)) + return false; + + commState.idx++; + + case 1: - if (!commState.putBoolean(isCancelled)) ++ if (!commState.putBoolean("isCancelled", isCancelled)) + return false; + + commState.idx++; + + case 2: - if (!commState.putByteArray(jobAttrsBytes)) ++ if (!commState.putByteArray("jobAttrsBytes", jobAttrsBytes)) + return false; + + commState.idx++; + + case 3: - if (!commState.putGridUuid(jobId)) ++ if (!commState.putGridUuid("jobId", jobId)) + return false; + + commState.idx++; + + case 4: - if (!commState.putUuid(nodeId)) ++ if (!commState.putUuid("nodeId", nodeId)) + return false; + + commState.idx++; + + case 5: - if (!commState.putByteArray(resBytes)) ++ if (!commState.putByteArray("resBytes", resBytes)) + return false; + + commState.idx++; + + case 6: - if (!commState.putGridUuid(sesId)) ++ if (!commState.putGridUuid("sesId", sesId)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - byte[] gridExBytes0 = commState.getByteArray(); ++ gridExBytes = commState.getByteArray("gridExBytes"); + - if (gridExBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - gridExBytes = gridExBytes0; - + commState.idx++; + + case 1: - if (buf.remaining() < 1) - return false; ++ isCancelled = commState.getBoolean("isCancelled"); + - isCancelled = commState.getBoolean(); ++ if (!commState.lastRead()) ++ return false; + + commState.idx++; + + case 2: - byte[] jobAttrsBytes0 = commState.getByteArray(); ++ jobAttrsBytes = commState.getByteArray("jobAttrsBytes"); + - if (jobAttrsBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobAttrsBytes = jobAttrsBytes0; - + commState.idx++; + + case 3: - IgniteUuid jobId0 = commState.getGridUuid(); ++ jobId = commState.getGridUuid("jobId"); + - if (jobId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - jobId = jobId0; - + commState.idx++; + + case 4: - UUID nodeId0 = commState.getUuid(); ++ nodeId = commState.getUuid("nodeId"); + - if (nodeId0 == UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - nodeId = nodeId0; - + commState.idx++; + + case 5: - byte[] resBytes0 = commState.getByteArray(); ++ resBytes = commState.getByteArray("resBytes"); + - if (resBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - resBytes = resBytes0; - + commState.idx++; + + case 6: - IgniteUuid sesId0 = commState.getGridUuid(); ++ sesId = commState.getGridUuid("sesId"); + - if (sesId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sesId = sesId0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 2; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridJobExecuteResponse.class, this); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java index 0000000,6c2d756..abe558f mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsRequest.java @@@ -1,0 -1,175 +1,171 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal; + + import org.apache.ignite.lang.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.nio.*; + + /** + * Job siblings request. + */ + public class GridJobSiblingsRequest extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private IgniteUuid sesId; + + /** */ + @GridDirectTransient + private Object topic; + + /** */ + private byte[] topicBytes; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridJobSiblingsRequest() { + // No-op. + } + + /** + * @param sesId Session ID. + * @param topic Topic. + * @param topicBytes Serialized topic. + */ + public GridJobSiblingsRequest(IgniteUuid sesId, Object topic, byte[] topicBytes) { + assert sesId != null; + assert topic != null || topicBytes != null; + + this.sesId = sesId; + this.topic = topic; + this.topicBytes = topicBytes; + } + + /** + * @return Session ID. + */ + public IgniteUuid sessionId() { + return sesId; + } + + /** + * @return Topic. + */ + public Object topic() { + return topic; + } + + /** + * @return Serialized topic. + */ + public byte[] topicBytes() { + return topicBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridJobSiblingsRequest _clone = new GridJobSiblingsRequest(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridJobSiblingsRequest _clone = (GridJobSiblingsRequest)_msg; + + _clone.sesId = sesId; + _clone.topic = topic; + _clone.topicBytes = topicBytes; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { - if (!commState.putByte(directType())) ++ if (!commState.putByte(null, directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: - if (!commState.putGridUuid(sesId)) ++ if (!commState.putGridUuid("sesId", sesId)) + return false; + + commState.idx++; + + case 1: - if (!commState.putByteArray(topicBytes)) ++ if (!commState.putByteArray("topicBytes", topicBytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @SuppressWarnings("all") + @Override public boolean readFrom(ByteBuffer buf) { + commState.setBuffer(buf); + + switch (commState.idx) { + case 0: - IgniteUuid sesId0 = commState.getGridUuid(); ++ sesId = commState.getGridUuid("sesId"); + - if (sesId0 == GRID_UUID_NOT_READ) ++ if (!commState.lastRead()) + return false; + - sesId = sesId0; - + commState.idx++; + + case 1: - byte[] topicBytes0 = commState.getByteArray(); ++ topicBytes = commState.getByteArray("topicBytes"); + - if (topicBytes0 == BYTE_ARR_NOT_READ) ++ if (!commState.lastRead()) + return false; + - topicBytes = topicBytes0; - + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 3; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridJobSiblingsRequest.class, this); + } + }