Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7C607200CEB for ; Fri, 28 Jul 2017 14:07:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7816516CA44; Fri, 28 Jul 2017 12:07:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 679F816CA64 for ; Fri, 28 Jul 2017 14:07:16 +0200 (CEST) Received: (qmail 34038 invoked by uid 500); 28 Jul 2017 12:07:15 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 33839 invoked by uid 99); 28 Jul 2017 12:07:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Jul 2017 12:07:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 06966E9471; Fri, 28 Jul 2017 12:07:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.apache.org Date: Fri, 28 Jul 2017 12:08:02 -0000 Message-Id: <283892feb1064baf84cf684897135beb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [50/50] [abbrv] ignite git commit: ignite-5658 moved to stripes + cleanup archived-at: Fri, 28 Jul 2017 12:07:18 -0000 ignite-5658 moved to stripes + cleanup Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/55f51b54 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/55f51b54 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/55f51b54 Branch: refs/heads/ignite-5658 Commit: 55f51b54f2ff4a256dda2709d05b6f56a17e8a41 Parents: d849764 Author: Yakov Zhdanov Authored: Fri Jul 28 15:06:56 2017 +0300 Committer: Yakov Zhdanov Committed: Fri Jul 28 15:06:56 2017 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/IgniteDataStreamer.java | 14 +++- .../ignite/internal/jdbc2/JdbcConnection.java | 2 +- .../datastreamer/DataStreamerImpl.java | 82 ++++++++------------ .../DataStreamProcessorSelfTest.java | 17 ++-- 4 files changed, 49 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java index e2473dc..0e84e36 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteDataStreamer.java @@ -72,7 +72,7 @@ import org.jetbrains.annotations.Nullable; * this setting limits maximum allowed number of parallel buffered stream messages that * are being processed on remote nodes. If this number is exceeded, then * {@link #addData(Object, Object)} method will block to control memory utilization. - * Default is defined by {@link #DFLT_MAX_PARALLEL_OPS} value. + * Default is equal to CPU count on remote node multiply by {@link #DFLT_PARALLEL_OPS_MULTIPLIER}. * *
  • * {@link #autoFlushFrequency(long)} - automatic flush frequency in milliseconds. Essentially, @@ -100,8 +100,8 @@ import org.jetbrains.annotations.Nullable; * */ public interface IgniteDataStreamer extends AutoCloseable { - /** Default max concurrent put operations count. */ - public static final int DFLT_MAX_PARALLEL_OPS = 16; + /** Default concurrent put operations multiplier for CPU count. */ + public static final int DFLT_PARALLEL_OPS_MULTIPLIER = 20; /** Default per node buffer size. */ public static final int DFLT_PER_NODE_BUFFER_SIZE = 1024; @@ -193,6 +193,10 @@ public interface IgniteDataStreamer extends AutoCloseable { /** * Gets maximum number of parallel stream operations for a single node. + *

    + * If not provided (is equal to {@code 0}), then default value is equal to CPU count + * on remote node multiply by {@link #DFLT_PARALLEL_OPS_MULTIPLIER} + * or equal provided value if this property is set. * * @return Maximum number of parallel stream operations for a single node. */ @@ -203,7 +207,9 @@ public interface IgniteDataStreamer extends AutoCloseable { *

    * This method should be called prior to {@link #addData(Object, Object)} call. *

    - * If not provided, default value is {@link #DFLT_MAX_PARALLEL_OPS}. + * If not provided (is equal to {@code 0}), then default value is equal to CPU count + * on remote node multiply by {@link #DFLT_PARALLEL_OPS_MULTIPLIER} + * or equal provided value if this property is set. * * @param parallelOps Maximum number of parallel stream operations for a single node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java index 1bf51f2..eb443c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java @@ -186,7 +186,7 @@ public class JdbcConnection implements Connection { streamNodeBufSize = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_BUF_SIZE, String.valueOf(IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE))); streamNodeParOps = Integer.parseInt(props.getProperty(PROP_STREAMING_PER_NODE_PAR_OPS, - String.valueOf(IgniteDataStreamer.DFLT_MAX_PARALLEL_OPS))); + String.valueOf(IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER))); String nodeIdProp = props.getProperty(PROP_NODE_ID); http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 5d1b0a3..6e9fbb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.datastreamer; +import java.lang.reflect.Array; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -62,6 +63,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -153,7 +155,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed private int bufSize = DFLT_PER_NODE_BUFFER_SIZE; /** */ - private int parallelOps = DFLT_MAX_PARALLEL_OPS; + private int parallelOps; /** */ private long timeout = DFLT_UNLIMIT_TIMEOUT; @@ -828,6 +830,10 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed if (old != null) buf = old; + else if (log.isInfoEnabled()) + log.info("Created buffer for node [nodeId=" + nodeId + + ", parallelOps=" + buf.perNodeParallelOps + + ", stripes=" + buf.stripes.length + ']'); } final Collection entriesForNode = e.getValue(); @@ -1311,15 +1317,8 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** Active futures. */ private final Collection> locFuts; -// /** Buffered entries. */ -// private List entries; /** Buffered entries. */ - private final ConcurrentMap entriesMap = - new ConcurrentHashMap8<>(); - -// /** */ -// @GridToStringExclude -// private GridFutureAdapter curFut; + private final PerStripeBuffer[] stripes; /** Local node flag. */ private final boolean isLocNode; @@ -1333,16 +1332,17 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** */ private final Semaphore sem; -// /** Batch topology. */ -// private AffinityTopologyVersion batchTopVer; + /** */ + private final int perNodeParallelOps; /** Closure to signal on task finish. */ @GridToStringExclude - private final IgniteInClosure> signalC = new IgniteInClosure>() { - @Override public void apply(IgniteInternalFuture t) { - signalTaskFinished(t); - } - }; + private final IgniteInClosure> signalC = + new IgniteInClosure>() { + @Override public void apply(IgniteInternalFuture t) { + signalTaskFinished(t); + } + }; /** * @param node Node. @@ -1358,25 +1358,17 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed // Cache local node flag. isLocNode = node.equals(ctx.discovery().localNode()); -// entries = newEntries(); -// curFut = new GridFutureAdapter<>(); -// curFut.listen(signalC); + perNodeParallelOps = parallelOps != 0 ? + parallelOps : + node.metrics().getTotalCpus() * DFLT_PARALLEL_OPS_MULTIPLIER; - sem = new Semaphore(parallelOps); - } + sem = new Semaphore(perNodeParallelOps); -// /** -// * @param remap Remapping flag. -// */ -// private void renewBatch(boolean remap) { -// entries = newEntries(); -// curFut = new GridFutureAdapter<>(); -// -// batchTopVer = null; -// -// if (!remap) -// curFut.listen(signalC); -// } + stripes = (PerStripeBuffer[])Array.newInstance(PerStripeBuffer.class, node.metrics().getTotalCpus()); + + for (int i = 0; i < stripes.length; i++) + stripes[i] = new PerStripeBuffer(i, signalC); + } /** * @param newEntries Infos. @@ -1401,15 +1393,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed // Init buffer. int part = entry.getKey().partition(); - PerPartitionBuffer b = entriesMap.get(part); - - if (b == null) { - PerPartitionBuffer old = - entriesMap.putIfAbsent(part, b = new PerPartitionBuffer(part, signalC)); - - if (old != null) - b = old; - } + PerStripeBuffer b = stripes[part % stripes.length]; synchronized (b) { curFut0 = b.curFut; @@ -1468,7 +1452,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed @Nullable IgniteInternalFuture flush() throws IgniteInterruptedCheckedException { acquireRemapSemaphore(); - for (PerPartitionBuffer b : entriesMap.values()) { + for (PerStripeBuffer b : stripes) { AffinityTopologyVersion batchTopVer = null; List entries0 = null; GridFutureAdapter curFut0 = null; @@ -1729,11 +1713,11 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed dep != null ? dep.classLoaderId() : null, dep == null, topVer, - rcvr == ISOLATED_UPDATER ? partId : -1); + rcvr == ISOLATED_UPDATER ? partId : GridIoMessage.STRIPE_DISABLED_PART); try { ctx.io().sendToGridTopic(node, TOPIC_DATASTREAM, req, - partId == -1 ? plc : GridIoPolicy.SYSTEM_POOL); + req.partition() == GridIoMessage.STRIPE_DISABLED_PART ? plc : GridIoPolicy.SYSTEM_POOL); if (log.isDebugEnabled()) log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']'); @@ -1779,7 +1763,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed // Make sure to complete current future. GridFutureAdapter curFut0; - for (PerPartitionBuffer b : entriesMap.values()) { + for (PerStripeBuffer b : stripes) { synchronized (b) { curFut0 = b.curFut; } @@ -2118,7 +2102,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** * */ - private class PerPartitionBuffer { + private class PerStripeBuffer { /** */ private final int partId; @@ -2138,7 +2122,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed * @param partId Partition ID. * @param c Signal closure. */ - public PerPartitionBuffer( + public PerStripeBuffer( int partId, IgniteInClosure> c ) { @@ -2167,7 +2151,7 @@ public class DataStreamerImpl implements IgniteDataStreamer, Delayed /** {@inheritDoc} */ @Override public String toString() { - return S.toString(PerPartitionBuffer.class, this, super.toString()); + return S.toString(PerStripeBuffer.class, this, super.toString()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/55f51b54/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java index ec5e6d0..b8ca255 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessorSelfTest.java @@ -962,8 +962,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); - IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME); - try { + try (IgniteDataStreamer ldr = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { ldr.receiver(new StreamReceiver() { @Override public void receive(IgniteCache cache, Collection> entries) throws IgniteException { @@ -972,6 +971,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { cache.put("key", threadName); } }); + ldr.addData("key", "value"); ldr.tryFlush(); @@ -982,9 +982,6 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { } }, 3_000); } - finally { - ldr.close(true); - } assertNotNull(cache.get("key")); @@ -1011,9 +1008,7 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { final IgniteCache cache = ignite.cache(DEFAULT_CACHE_NAME); - IgniteDataStreamer ldr = client.dataStreamer(DEFAULT_CACHE_NAME); - - try { + try (IgniteDataStreamer ldr = client.dataStreamer(DEFAULT_CACHE_NAME)) { ldr.receiver(new StringStringStreamReceiver()); ldr.addData("key", "value"); @@ -1021,14 +1016,12 @@ public class DataStreamProcessorSelfTest extends GridCommonAbstractTest { ldr.tryFlush(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { + @Override + public boolean apply() { return cache.get("key") != null; } }, 3_000); } - finally { - ldr.close(true); - } assertNotNull(cache.get("key"));