Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 95CB4200C3D for ; Tue, 14 Mar 2017 15:34:58 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 945C9160B7E; Tue, 14 Mar 2017 14:34:58 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B9F35160B7C for ; Tue, 14 Mar 2017 15:34:57 +0100 (CET) Received: (qmail 52300 invoked by uid 500); 14 Mar 2017 14:34:57 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 52285 invoked by uid 99); 14 Mar 2017 14:34:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Mar 2017 14:34:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3149ADFDC5; Tue, 14 Mar 2017 14:34:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Message-Id: <22c3ad688d064e41a9eb1c5b2c107636@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: WIP on state handling. Date: Tue, 14 Mar 2017 14:34:56 +0000 (UTC) archived-at: Tue, 14 Mar 2017 14:34:58 -0000 Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl 4cc2b606b -> a45e24ed7 WIP on state handling. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a45e24ed Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a45e24ed Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a45e24ed Branch: refs/heads/ignite-4565-ddl Commit: a45e24ed715e530c5cf8c723ce0950a32e61d92c Parents: 4cc2b60 Author: devozerov Authored: Tue Mar 14 17:34:47 2017 +0300 Committer: devozerov Committed: Tue Mar 14 17:34:47 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheProcessor.java | 19 +++++--- .../processors/query/QueryIndexStates.java | 47 +++++++++++++------- 2 files changed, 43 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 2df2ab6..5a961fe 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1173,6 +1173,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel); } + // TODO: Make sure to notify query client futures. 
ctx.kernalContext().query().onCacheStop(ctx); ctx.kernalContext().continuous().onCacheStop(ctx); @@ -2702,7 +2703,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { else if (msg instanceof IndexAcceptDiscoveryMessage) onIndexAcceptMessage((IndexAcceptDiscoveryMessage)msg); else if (msg instanceof IndexFinishDiscoveryMessage) - onIndexfinishMessage((IndexFinishDiscoveryMessage)msg); + onIndexFinishMessage((IndexFinishDiscoveryMessage)msg); else U.warn(log, "Unsupported index discovery message type (will ignore): " + msg); } @@ -2750,7 +2751,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (idxStates == null) idxStates = new QueryIndexStates(); - if (idxStates.propose(ctx.localNodeId(), msg)) + if (idxStates.propose(locNodeId, msg)) desc.indexStates(idxStates); } @@ -2760,11 +2761,17 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param msg Message. */ private void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) { - // TODO: Remove init operation from descriptor! + AbstractIndexOperation op = msg.operation(); + + DynamicCacheDescriptor desc = cacheDescriptor(op.space()); + + if (desc == null) + return; - // TODO: Handle concurrent cache stop! + QueryIndexStates idxStates = desc.indexStates(); - // TODO: Enlist cache operation to descriptor! + if (idxStates == null || !idxStates.accept(msg)) + return; // TODO: Initiate exchange-like routine! } @@ -2774,7 +2781,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @param msg Message. */ - private void onIndexfinishMessage(IndexFinishDiscoveryMessage msg) { + private void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { // TODO: Clear dynamic descriptors! // TODO: Delegate to indexing to handle result and complete client futures! 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a45e24ed/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java index 13b1525..b775dc0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryIndexStates.java @@ -72,49 +72,62 @@ public class QueryIndexStates implements Serializable { * Process accept message propagating index from proposed to accepted state. * * @param msg Message. + * @return {@code True} if accept succeeded. It may fail in case of concurrent cache stop/start. */ - public void accept(IndexAcceptDiscoveryMessage msg) { + public boolean accept(IndexAcceptDiscoveryMessage msg) { AbstractIndexOperation op = msg.operation(); String idxName = op.indexName(); QueryIndexActiveOperation curOp = activeOps.get(idxName); - assert curOp != null && !curOp.accepted(); // Operation is found and is in proposed ("false") state. - assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches. + if (curOp != null) { + if (F.eq(curOp.operation().operationId(), op.operationId())) { + assert !curOp.accepted(); - curOp.accept(); + curOp.accept(); + + return true; + } + } + + return false; } /** * Process finish message. * * @param msg Message. + * @return {@code True} if finish succeeded. It may fail in case of concurrent cache stop/start. 
*/ - @SuppressWarnings("ConstantConditions") - public void finish(IndexFinishDiscoveryMessage msg) { + public boolean finish(IndexFinishDiscoveryMessage msg) { AbstractIndexOperation op = msg.operation(); String idxName = op.indexName(); QueryIndexActiveOperation curOp = activeOps.remove(idxName); - assert curOp != null; // Operation is found. - assert F.eq(curOp.operation().operationId(), op.operationId()); // Operation ID matches. + if (curOp != null) { + if (F.eq(curOp.operation().operationId(), op.operationId())) { + if (!msg.hasError()) { + QueryIndexState state; - if (!msg.hasError()) { - QueryIndexState state; + if (op instanceof CreateIndexOperation) + state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index()); + else { + assert op instanceof DropIndexOperation; - if (op instanceof CreateIndexOperation) - state = new QueryIndexState(idxName, ((CreateIndexOperation)op).index()); - else { - assert op instanceof DropIndexOperation; + state = new QueryIndexState(idxName, null); + } - state = new QueryIndexState(idxName, null); - } + readyOps.put(idxName, state); + } - readyOps.put(idxName, state); + return true; + } } + + return false; } /**