Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B585C17DEE for ; Fri, 17 Apr 2015 16:06:57 +0000 (UTC) Received: (qmail 39983 invoked by uid 500); 17 Apr 2015 16:06:48 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 39948 invoked by uid 500); 17 Apr 2015 16:06:48 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 39935 invoked by uid 99); 17 Apr 2015 16:06:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Apr 2015 16:06:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 17 Apr 2015 16:06:46 +0000 Received: (qmail 37658 invoked by uid 99); 17 Apr 2015 16:06:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Apr 2015 16:06:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C938CDFE2C; Fri, 17 Apr 2015 16:06:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.incubator.apache.org Date: Fri, 17 Apr 2015 16:06:40 -0000 Message-Id: <71b05bc000b94543b3d162f16de0bedd@git.apache.org> In-Reply-To: <6ea54f30f97c45da87358e2826440867@git.apache.org> References: <6ea54f30f97c45da87358e2826440867@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/47] incubator-ignite git commit: # ignite-743 X-Virus-Checked: Checked by ClamAV on apache.org # ignite-743 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23a41dfd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23a41dfd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23a41dfd Branch: refs/heads/ignite-485 Commit: 23a41dfd2b7904c72a9fdebe35ef00b02ad62b3a Parents: d6434bb Author: sboikov Authored: Thu Apr 16 10:51:10 2015 +0300 Committer: sboikov Committed: Thu Apr 16 11:03:10 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheGateway.java | 67 +++++----- .../processors/cache/IgniteCacheProxy.java | 129 +++++++------------ .../datastreamer/DataStreamerUpdateJob.java | 12 -- 3 files changed, 81 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 97fada9..aa73414 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -66,10 +66,10 @@ public class GridCacheGateway { /** * Enter a cache call. * - * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. + * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotClosed() { - enterIfNotClosedNoLock(); + onEnter(); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -87,17 +87,16 @@ public class GridCacheGateway { /** * Enter a cache call without lock. * - * @return {@code true} if enter successful, {@code false} if the cache or the node was stopped. + * @return {@code True} if enter successful, {@code false} if the cache or the node was stopped. */ public boolean enterIfNotClosedNoLock() { - if (ctx.deploymentEnabled()) - ctx.deploy().onEnter(); + onEnter(); return !stopped; } /** - * Leave a cache call entered by {@link #enter()} method. + * Leave a cache call entered by {@link #enterNoLock} method. */ public void leaveNoLock() { ctx.tm().resetContext(); @@ -125,6 +124,22 @@ public class GridCacheGateway { * @return Previous projection set on this thread. */ @Nullable public GridCacheProjectionImpl enter(@Nullable GridCacheProjectionImpl prj) { + try { + GridCacheAdapter cache = ctx.cache(); + + GridCachePreloader preldr = cache != null ? cache.preloader() : null; + + if (preldr == null) + throw new IllegalStateException("Grid is in invalid state to perform this operation. " + + "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']'); + + preldr.startFuture().get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for cache preloader start [cacheName=" + + ctx.name() + "]", e); + } + onEnter(); rwLock.readLock(); @@ -132,7 +147,7 @@ public class GridCacheGateway { if (stopped) { rwLock.readUnlock(); - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); } // Must unlock in case of unexpected errors to avoid @@ -155,38 +170,12 @@ public class GridCacheGateway { onEnter(); if (stopped) - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); return setProjectionPerCall(prj); } /** - * On enter. - */ - private void onEnter() { - try { - ctx.itHolder().checkWeakQueue(); - - GridCacheAdapter cache = ctx.cache(); - - GridCachePreloader preldr = cache != null ? cache.preloader() : null; - - if (preldr == null) - throw new IllegalStateException("Grid is in invalid state to perform this operation. " + - "It either not started yet or has already being or have stopped [gridName=" + ctx.gridName() + ']'); - - preldr.startFuture().get(); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to wait for cache preloader start [cacheName=" + - ctx.name() + "]", e); - } - - if (ctx.deploymentEnabled()) - ctx.deploy().onEnter(); - } - - /** * Set thread local projection per call. * * @param prj Projection to guard. @@ -230,6 +219,16 @@ public class GridCacheGateway { /** * */ + private void onEnter() { + ctx.itHolder().checkWeakQueue(); + + if (ctx.deploymentEnabled()) + ctx.deploy().onEnter(); + } + + /** + * + */ public void block() { stopped = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index d7ef8ba..c1a2d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -81,7 +81,7 @@ public class IgniteCacheProxy extends AsyncSupportAdapter extends AsyncSupportAdapter prj, boolean async ) { - this(ctx, delegate, prj, async, false); + this(ctx, delegate, prj, async, true); } /** @@ -112,12 +112,14 @@ public class IgniteCacheProxy extends AsyncSupportAdapter ctx, GridCacheProjectionEx delegate, @Nullable GridCacheProjectionImpl prj, - boolean async, boolean lock + boolean async, + boolean lock ) { super(async); @@ -136,10 +138,13 @@ public class IgniteCacheProxy extends AsyncSupportAdapter cacheNoGate() { - return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), true); + return new IgniteCacheProxy<>(ctx, delegate, prj, isAsync(), false); } /** @@ -234,7 +239,11 @@ public class IgniteCacheProxy extends AsyncSupportAdapter prj0 = prj != null ? prj.withExpiryPolicy(plc) : delegate.withExpiryPolicy(plc); - return new IgniteCacheProxy<>(ctx, prj0, (GridCacheProjectionImpl)prj0, isAsync()); + return new IgniteCacheProxy<>(ctx, + prj0, + (GridCacheProjectionImpl)prj0, + isAsync(), + lock); } finally { onLeave(prev); @@ -779,8 +788,7 @@ public class IgniteCacheProxy extends AsyncSupportAdapter>() { - @Override - public void apply(IgniteInternalFuture fut) { + @Override public void apply(IgniteInternalFuture fut) { try { fut.get(); @@ -1343,7 +1351,7 @@ public class IgniteCacheProxy extends AsyncSupportAdapter extends AsyncSupportAdapter createAsyncInstance() { - return new IgniteCacheProxy<>(ctx, delegate, prj, true); + return new IgniteCacheProxy<>(ctx, delegate, prj, true, lock); } /** @@ -1447,7 +1455,8 @@ public class IgniteCacheProxy extends AsyncSupportAdapter((GridCacheContext)ctx, prj0, prj0, - isAsync()); + isAsync(), + lock); } finally { onLeave(prev); @@ -1477,7 +1486,8 @@ public class IgniteCacheProxy extends AsyncSupportAdapter(ctx, prj0, prj0, - isAsync()); + isAsync(), + lock); } finally { onLeave(prev); @@ -1507,38 +1517,6 @@ public class IgniteCacheProxy extends AsyncSupportAdapter)in.readObject(); - - delegate = (GridCacheProjectionEx)in.readObject(); - - prj = (GridCacheProjectionImpl)in.readObject(); - - gate = ctx.gate(); - - lock = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public IgniteFuture rebalance() { - ctx.preloader().forcePreload(); - - return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); - } - /** * @param prj Projection to guard. * @return Previous projection set on this thread. @@ -1552,6 +1530,8 @@ public class IgniteCacheProxy extends AsyncSupportAdapter extends AsyncSupportAdapter extends GridCloseableIteratorAdapter { - /** */ - private X cur; + out.writeObject(delegate); - /** */ - private CacheQueryFuture fut; + out.writeObject(prj); - /** - * @param fut Future. - */ - protected ClIter(CacheQueryFuture fut) { - this.fut = fut; - } + out.writeBoolean(lock); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + ctx = (GridCacheContext)in.readObject(); - /** {@inheritDoc} */ - @Override protected Y onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); + delegate = (GridCacheProjectionEx)in.readObject(); - X e = cur; + prj = (GridCacheProjectionImpl)in.readObject(); - cur = null; + gate = ctx.gate(); - return convert(e); - } + lock = in.readBoolean(); + } - /** - * @param x X. - */ - protected abstract Y convert(X x); + /** {@inheritDoc} */ + @Override public IgniteFuture rebalance() { + ctx.preloader().forcePreload(); - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } + return new IgniteFutureImpl<>(ctx.preloader().syncFuture()); + } - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - } + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteCacheProxy.class, this); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23a41dfd/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java index 52471cd..21ba3ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerUpdateJob.java @@ -88,18 +88,6 @@ class DataStreamerUpdateJob implements GridPlainCallable { if (log.isDebugEnabled()) log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']'); -// TODO IGNITE-77: restore adapter usage. -// TODO use cacheContext.awaitStarted() instead of preloader().startFuture().get() -// GridCacheAdapter cache = ctx.cache().internalCache(cacheName); -// -// IgniteFuture f = cache.context().preloader().startFuture(); -// -// if (!f.isDone()) -// f.get(); -// -// if (ignoreDepOwnership) -// cache.context().deploy().ignoreOwnership(true); - IgniteCacheProxy cache = ctx.cache().jcache(cacheName).cacheNoGate(); cache.context().awaitStarted();