Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5412418E81 for ; Thu, 24 Mar 2016 13:14:03 +0000 (UTC) Received: (qmail 31624 invoked by uid 500); 24 Mar 2016 13:14:03 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 31554 invoked by uid 500); 24 Mar 2016 13:14:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 31317 invoked by uid 99); 24 Mar 2016 13:14:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 24 Mar 2016 13:14:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B8A37DFBAF; Thu, 24 Mar 2016 13:14:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 24 Mar 2016 13:14:10 -0000 Message-Id: <91fa7502a1e34e42af44e1f0cc45713e@git.apache.org> In-Reply-To: <3adbbc65c0f043949d6d968018dbcf1e@git.apache.org> References: <3adbbc65c0f043949d6d968018dbcf1e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/11] ignite git commit: IGNITE-2883: IGFS: Now IPC messages are handled in a dedicated thread pool. IGNITE-2883: IGFS: Now IPC messages are handled in a dedicated thread pool. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfc7d4ee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfc7d4ee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfc7d4ee Branch: refs/heads/ignite-2004 Commit: cfc7d4eec255d0fe720398882d0c058f6821611a Parents: bc14c80 Author: vozerov-gridgain Authored: Thu Mar 24 14:29:35 2016 +0300 Committer: vozerov-gridgain Committed: Thu Mar 24 14:29:35 2016 +0300 ---------------------------------------------------------------------- .../igfs/IgfsIpcEndpointConfiguration.java | 28 +++++++ .../processors/igfs/IgfsIpcHandler.java | 81 +++++++++++++++----- .../internal/processors/igfs/IgfsProcessor.java | 11 ++- .../internal/processors/igfs/IgfsServer.java | 2 +- .../igfs/IgfsProcessorValidationSelfTest.java | 16 ++++ 5 files changed, 117 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java index 23993a6..1c68d0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsIpcEndpointConfiguration.java @@ -49,6 +49,9 @@ public class IgfsIpcEndpointConfiguration { */ public static final String DFLT_TOKEN_DIR_PATH = "ipc/shmem"; + /** Default threads count. */ + public static final int DFLT_THREAD_CNT = IgniteConfiguration.AVAILABLE_PROC_CNT; + /** Endpoint type. */ private IgfsIpcEndpointType type = DFLT_TYPE; @@ -64,6 +67,9 @@ public class IgfsIpcEndpointConfiguration { /** Token directory path. 
*/ private String tokenDirPath = DFLT_TOKEN_DIR_PATH; + /** Thread count. */ + private int threadCnt = DFLT_THREAD_CNT; + /** * Default constructor. */ @@ -236,6 +242,28 @@ public class IgfsIpcEndpointConfiguration { this.tokenDirPath = tokenDirPath; } + /** + * Get number of threads used by this endpoint to process incoming requests. + * <p>
+ * Defaults to {@link #DFLT_THREAD_CNT}. + * + * @return Number of threads used by this endpoint to process incoming requests. + */ + public int getThreadCount() { + return threadCnt; + } + + /** + * Set number of threads used by this endpoint to process incoming requests. + * <p>
+ * See {@link #getThreadCount()} for more information. + * + * @param threadCnt Number of threads used by this endpoint to process incoming requests. + */ + public void setThreadCount(int threadCnt) { + this.threadCnt = threadCnt; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgfsIpcEndpointConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java index eadbdb2..bf87384 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import org.apache.ignite.igfs.IgfsOutOfSpaceException; import org.apache.ignite.igfs.IgfsOutputStream; import org.apache.ignite.igfs.IgfsUserContext; @@ -31,20 +32,22 @@ import org.apache.ignite.internal.igfs.common.IgfsIpcCommand; import org.apache.ignite.internal.igfs.common.IgfsMessage; import org.apache.ignite.internal.igfs.common.IgfsPathControlRequest; import org.apache.ignite.internal.igfs.common.IgfsStreamControlRequest; -import org.apache.ignite.internal.processors.closure.GridClosurePolicy; import org.apache.ignite.internal.util.future.GridFinishedFuture; -import org.apache.ignite.internal.util.lang.GridPlainCallable; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import 
org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.Nullable; import java.io.Closeable; import java.io.DataInput; import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; /** @@ -70,6 +73,9 @@ class IgfsIpcHandler implements IgfsServerHandler { /** Resource ID generator. */ private final AtomicLong rsrcIdGen = new AtomicLong(); + /** Thread pool. */ + private volatile IgniteThreadPoolExecutor pool; + /** Stopping flag. */ private volatile boolean stopping; @@ -77,8 +83,10 @@ class IgfsIpcHandler implements IgfsServerHandler { * Constructs IGFS IPC handler. * * @param igfsCtx Context. + * @param endpointCfg Endpoint configuration. + * @param mgmt Management flag. */ - IgfsIpcHandler(IgfsContext igfsCtx) { + IgfsIpcHandler(IgfsContext igfsCtx, IgfsIpcEndpointConfiguration endpointCfg, boolean mgmt) { assert igfsCtx != null; ctx = igfsCtx.kernalContext(); @@ -87,12 +95,24 @@ class IgfsIpcHandler implements IgfsServerHandler { // Keep buffer size multiple of block size so no extra byte array copies is performed. bufSize = igfsCtx.configuration().getBlockSize() * 2; + // Create thread pool for request handling. + int threadCnt = endpointCfg.getThreadCount(); + + String prefix = "igfs-" + igfsCtx.igfs().name() + (mgmt ? 
"mgmt-" : "") + "-ipc"; + + pool = new IgniteThreadPoolExecutor(prefix, igfsCtx.kernalContext().gridName(), threadCnt, threadCnt, + Long.MAX_VALUE, new LinkedBlockingQueue()); + log = ctx.log(IgfsIpcHandler.class); } /** {@inheritDoc} */ @Override public void stop() throws IgniteCheckedException { stopping = true; + + U.shutdownNow(getClass(), pool, log); + + pool = null; } /** {@inheritDoc} */ @@ -114,7 +134,7 @@ class IgfsIpcHandler implements IgfsServerHandler { /** {@inheritDoc} */ @Override public IgniteInternalFuture handleAsync(final IgfsClientSession ses, - final IgfsMessage msg, DataInput in) { + final IgfsMessage msg, final DataInput in) { try { // Even if will be closed right after this call, response write error will be ignored. if (stopping) @@ -130,21 +150,32 @@ class IgfsIpcHandler implements IgfsServerHandler { case MAKE_DIRECTORIES: case LIST_FILES: case LIST_PATHS: { - IgfsMessage res = execute(ses, cmd, msg, in); - - fut = res == null ? null : new GridFinishedFuture<>(res); + fut = executeSynchronously(ses, cmd, msg, in); break; } - // Execute command asynchronously in user's pool. + // Execute command asynchronously in pool. default: { - fut = ctx.closure().callLocalSafe(new GridPlainCallable() { - @Override public IgfsMessage call() throws Exception { - // No need to pass data input for non-write-block commands. - return execute(ses, cmd, msg, null); - } - }, GridClosurePolicy.IGFS_POOL); + try { + final GridFutureAdapter fut0 = new GridFutureAdapter<>(); + + pool.execute(new Runnable() { + @Override public void run() { + try { + fut0.onDone(execute(ses, cmd, msg, in)); + } + catch (Exception e) { + fut0.onDone(e); + } + } + }); + + fut = fut0; + } + catch (RejectedExecutionException ignored) { + fut = executeSynchronously(ses, cmd, msg, in); + } } } @@ -157,6 +188,23 @@ class IgfsIpcHandler implements IgfsServerHandler { } /** + * Execute operation synchronously. + * + * @param ses Session. + * @param cmd Command. + * @param msg Message. 
+ * @param in Input. + * @return Result. + * @throws Exception If failed. + */ + @Nullable private IgniteInternalFuture executeSynchronously(IgfsClientSession ses, + IgfsIpcCommand cmd, IgfsMessage msg, DataInput in) throws Exception { + IgfsMessage res = execute(ses, cmd, msg, in); + + return res == null ? null : new GridFinishedFuture<>(res); + } + + /** * Execute IGFS command. * * @param ses Client connection session. @@ -167,8 +215,7 @@ class IgfsIpcHandler implements IgfsServerHandler { * @throws Exception If failed. */ private IgfsMessage execute(IgfsClientSession ses, IgfsIpcCommand cmd, IgfsMessage msg, - @Nullable DataInput in) - throws Exception { + @Nullable DataInput in) throws Exception { switch (cmd) { case HANDSHAKE: return processHandshakeRequest((IgfsHandshakeRequest)msg); @@ -495,8 +542,6 @@ class IgfsIpcHandler implements IgfsServerHandler { } case WRITE_BLOCK: { - assert rsrcId != null : "Missing stream ID"; - IgfsOutputStream out = (IgfsOutputStream)resource(ses, rsrcId); if (out == null) http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index 44f6e44..778de99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@ -26,6 +26,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper; +import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration; import 
org.apache.ignite.igfs.IgfsMode; import org.apache.ignite.igfs.IgfsPath; import org.apache.ignite.igfs.mapreduce.IgfsJob; @@ -316,12 +317,18 @@ public class IgfsProcessor extends IgfsProcessorAdapter { throw new IgniteCheckedException("Invalid IGFS data cache configuration (key affinity mapper class should be " + IgfsGroupDataBlocksKeyMapper.class.getSimpleName() + "): " + cfg); - if (cfg.getIpcEndpointConfiguration() != null) { - final int tcpPort = cfg.getIpcEndpointConfiguration().getPort(); + IgfsIpcEndpointConfiguration ipcCfg = cfg.getIpcEndpointConfiguration(); + + if (ipcCfg != null) { + final int tcpPort = ipcCfg.getPort(); if (!(tcpPort >= MIN_TCP_PORT && tcpPort <= MAX_TCP_PORT)) throw new IgniteCheckedException("IGFS endpoint TCP port is out of range [" + MIN_TCP_PORT + ".." + MAX_TCP_PORT + "]: " + tcpPort); + + if (ipcCfg.getThreadCount() <= 0) + throw new IgniteCheckedException("IGFS endpoint thread count must be positive: " + + ipcCfg.getThreadCount()); } long maxSpaceSize = cfg.getMaxSpaceSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java index 314baac..aa4b115 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsServer.java @@ -139,7 +139,7 @@ public class IgfsServer { if (srvEndpoint.getPort() >= 0) igfsCtx.kernalContext().ports().registerPort(srvEndpoint.getPort(), TCP, srvEndpoint.getClass()); - hnd = new IgfsIpcHandler(igfsCtx); + hnd = new IgfsIpcHandler(igfsCtx, endpointCfg, mgmt); // Start client accept worker. 
acceptWorker = new AcceptWorker(); http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc7d4ee/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java index 27f47e8..0ce1036 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java @@ -469,6 +469,22 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testInvalidEndpointThreadCount() throws Exception { + final String failMsg = "IGFS endpoint thread count must be positive"; + g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class)); + + final String igfsCfgName = "igfs-cfg"; + final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration(); + igfsEndpointCfg.setThreadCount(0); + g1IgfsCfg1.setName(igfsCfgName); + g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg); + + checkGridStartFails(g1Cfg, failMsg, true); + } + + /** * Checks that the given grid configuration will lead to {@link IgniteCheckedException} upon grid startup. * * @param cfg Grid configuration to check.