Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5A74E200C4B for ; Mon, 20 Mar 2017 13:49:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5900C160B8F; Mon, 20 Mar 2017 12:49:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 85333160B76 for ; Mon, 20 Mar 2017 13:49:07 +0100 (CET) Received: (qmail 91338 invoked by uid 500); 20 Mar 2017 12:49:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 91325 invoked by uid 99); 20 Mar 2017 12:49:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Mar 2017 12:49:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9E5FCDFF58; Mon, 20 Mar 2017 12:49:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 20 Mar 2017 12:49:06 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] ignite git commit: WIP on operation handler. archived-at: Mon, 20 Mar 2017 12:49:08 -0000 Repository: ignite Updated Branches: refs/heads/ignite-4565-ddl 2b7c1a2c9 -> 1b2a3dedb WIP on operation handler. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee37450a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee37450a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee37450a Branch: refs/heads/ignite-4565-ddl Commit: ee37450a69634f29586b5f7e897c6a41322a602f Parents: 2b7c1a2 Author: devozerov Authored: Mon Mar 20 15:34:35 2017 +0300 Committer: devozerov Committed: Mon Mar 20 15:34:35 2017 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryProcessor.java | 76 +++++---- .../query/ddl/IndexOperationHandler.java | 161 +++++++++++++++++++ 2 files changed, 197 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 1bb3c1c..7fe83ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -222,8 +222,17 @@ public class GridQueryProcessor extends GridProcessorAdapter { if (initIdxStates != null) { Map readyIdxStates = initIdxStates.readyOperations(); - for (QueryTypeCandidate cand : cands) - applyReadyDynamicOperations(cand.descriptor(), readyIdxStates); + for (QueryTypeCandidate cand : cands) { + QueryTypeDescriptorImpl desc = cand.descriptor(); + + for (Map.Entry entry : readyIdxStates.entrySet()) { + String idxName = entry.getKey(); + QueryIndexState idxState = entry.getValue(); + + if (F.eq(desc.tableName(), idxState.tableName())) + QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc); + } + } } // Ready to register at this point. @@ -245,19 +254,19 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Apply ready dynamic index states to not-yet-registered descriptor. + * Find current coordinator. * - * @param desc Descriptor. - * @param idxStates Index states. + * @return {@code True} if node is coordinator. */ - private void applyReadyDynamicOperations(QueryTypeDescriptorImpl desc, Map idxStates) - throws IgniteCheckedException { - for (Map.Entry entry : idxStates.entrySet()) { - String idxName = entry.getKey(); - QueryIndexState idxState = entry.getValue(); + private ClusterNode findCoordinator() { + ClusterNode res = null; - QueryUtils.processDynamicIndexChange(idxName, idxState.index(), desc); + for (ClusterNode node : ctx.discovery().aliveServerNodes()) { + if (res == null || res.order() > node.order()) + res = node; } + + return res; } /** {@inheritDoc} */ @@ -308,6 +317,10 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * Handle cache start. Invoked either from GridCacheProcessor.onKernalStart() method or from exchange worker. + * When called for the first time, we initialize topology thus understanding whether current node is coordinator + * or not. + * * @param cctx Cache context. * @param idxStates Index states. * @throws IgniteCheckedException If failed. @@ -353,33 +366,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { */ public void onIndexAcceptMessage(IndexAcceptDiscoveryMessage msg) { idxWorker.onAccept(msg); - } - - /** - * Handle index accept message. - * - * @param msg Message. - */ - public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { - idxWorker.onFinish(msg); - } - - /** - * Handle node leave. - * - * @param node Node. - */ - public void onNodeLeave(ClusterNode node) { - // TODO. - } - /** - * Handle index init discovery message. - * - * @param space Space. - * @param op Operation. - */ - public void onIndexAccept(String space, AbstractIndexOperation op) { idxLock.writeLock().lock(); // TODO @@ -456,12 +443,21 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** - * Handle index ack discovery message. + * Handle index accept message. * * @param msg Message. */ - private void onIndexAckDiscoveryMessage(String space, IndexAcceptDiscoveryMessage msg) { - // TODO + public void onIndexFinishMessage(IndexFinishDiscoveryMessage msg) { + idxWorker.onFinish(msg); + } + + /** + * Handle node leave. + * + * @param node Node. + */ + public void onNodeLeave(ClusterNode node) { + // TODO. } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ee37450a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java new file mode 100644 index 0000000..6932724 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/ddl/IndexOperationHandler.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.ddl; + +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.worker.GridWorker; +import org.apache.ignite.thread.IgniteThread; +import org.jetbrains.annotations.Nullable; + +import java.util.concurrent.CountDownLatch; + +/** + * Index change handler. + */ +public class IndexOperationHandler { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Query processor */ + private final GridQueryProcessor qryProc; + + /** Logger. */ + private final IgniteLogger log; + + /** Target operation. */ + private final AbstractIndexOperation op; + + /** Operation future. */ + private final GridFutureAdapter opFut; + + /** Mutex for concurrent access. */ + private final Object mux = new Object(); + + /** Init flag. */ + private boolean init; + + /** Cancel flag. */ + private boolean cancel; + + /** Worker. */ + private IndexWorker worker; + + /** + * Constructor. + * + * @param ctx Context. + * @param qryProc Query processor. + * @param op Target operation. + */ + public IndexOperationHandler(GridKernalContext ctx, GridQueryProcessor qryProc, AbstractIndexOperation op) { + this.ctx = ctx; + this.qryProc = qryProc; + this.op = op; + + log = ctx.log(IndexOperationHandler.class); + opFut = new GridFutureAdapter(); + } + + /** + * Perform initialization routine. + */ + public void init() { + synchronized (mux) { + if (!init) { + init = true; + + if (!cancel) { + worker = new IndexWorker(ctx.igniteInstanceName(), workerName(), log); + + new IgniteThread(worker).start(); + + worker.awaitStart(); + } + } + } + } + + /** + * @return Worker name. + */ + private String workerName() { + return "index-op-worker" + op.space() + "-" + op.tableName() + "-" + op.indexName(); + } + + /** + * Cancel operation. + */ + public void cancel() { + synchronized (mux) { + if (!cancel) { + cancel = true; + + if (worker != null) + worker.cancel(); + } + + // TODO + } + } + + /** + * Single-shot index worker responsible for operation execution. + */ + private class IndexWorker extends GridWorker { + /** Worker start latch. */ + private final CountDownLatch startLatch = new CountDownLatch(1); + + /** + * Constructor. + * + * @param igniteInstanceName Ignite instance name. + * @param name Worker name. + * @param log Logger. + */ + public IndexWorker(@Nullable String igniteInstanceName, String name, IgniteLogger log) { + super(igniteInstanceName, name, log); + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + startLatch.countDown(); + + // TODO: Do actual create/drop. + } + + /** + * Await start. + */ + private void awaitStart() { + try { + startLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException("Interrupted while waiting index operation worker start: " + + name(), e); + } + } + } +}