ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: Wired up discovery message handling and added duplicate checks.
Date Mon, 13 Mar 2017 12:16:35 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl dcb8b8511 -> f472918f4


Wired up discovery message handling and added duplicate checks.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f472918f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f472918f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f472918f

Branch: refs/heads/ignite-4565-ddl
Commit: f472918f4aac448db75ddac3cfde867114506084
Parents: dcb8b85
Author: devozerov <vozerov@gridgain.com>
Authored: Mon Mar 13 15:16:27 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Mon Mar 13 15:16:27 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |  8 ++
 .../processors/query/GridQueryProcessor.java    | 57 ++++++++++++
 .../internal/processors/query/QueryUtils.java   | 15 ++++
 .../query/h2/ddl/DdlSchemaVersion.java          | 91 --------------------
 4 files changed, 80 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f472918f/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 47a66fe..ce1ffbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -544,6 +544,14 @@ public final class IgniteSystemProperties {
      */
     public static final String IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI = "IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI";
 
+    /**
+     * Indexing discovery history size. Protects from duplicate messages maintaining the list of IDs of recently
+     * arrived discovery messages.
+     * <p>
+     * Defaults to {@code 1000}.
+     */
+    public static final String IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE = "IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE";
+
     /** Returns true for system properties only avoiding sending sensitive information. */
     private static final IgnitePredicate<Map.Entry<String, String>> PROPS_FILTER = new IgnitePredicate<Map.Entry<String, String>>() {
         @Override public boolean apply(final Map.Entry<String, String> entry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f472918f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 51b675b..8ec641a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -42,10 +42,12 @@ import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -57,8 +59,11 @@ import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.ddl.CreateIndexOperation;
+import org.apache.ignite.internal.processors.query.ddl.IndexAbstractDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.ddl.IndexAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.query.ddl.IndexInitDiscoveryMessage;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
@@ -68,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
@@ -105,6 +111,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Index create/drop client futures. */
     private final ConcurrentMap<UUID, QueryIndexClientFuture> idxCliFuts = new ConcurrentHashMap<>();
 
+    /** Bounded ID history for index create/drop discovery messages; used to detect duplicates. */
+    private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> idxDiscoMsgIdHist =
+        new GridBoundedConcurrentLinkedHashSet<>(QueryUtils.discoveryHistorySize());
+
     /** */
     private GridTimeoutProcessor.CancelableTask qryDetailMetricsEvictTask;
 
@@ -136,6 +146,23 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             idx.start(ctx, busyLock);
         }
 
+        ctx.discovery().setCustomEventListener(IndexAbstractDiscoveryMessage.class,
+            new CustomEventListener<IndexAbstractDiscoveryMessage>() {
+                /** {@inheritDoc} Skips duplicates, then dispatches the message by its concrete type. */
+                @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
+                @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+                    IndexAbstractDiscoveryMessage msg) {
+                    if (notDuplicate(msg)) {
+                        if (msg instanceof IndexInitDiscoveryMessage)
+                            onIndexInitDiscoveryMessage((IndexInitDiscoveryMessage)msg);
+                        else if (msg instanceof IndexAckDiscoveryMessage) // Chained: only unknown types warn.
+                            onIndexAckDiscoveryMessage((IndexAckDiscoveryMessage)msg);
+                        else
+                            U.warn(log, "Unexpected custom discovery message [msg=" + msg + ']');
+                    }
+                }
+            });
+
         // Schedule queries detail metrics eviction.
         qryDetailMetricsEvictTask = ctx.timeout().schedule(new Runnable() {
             @Override public void run() {
@@ -283,6 +310,24 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Handle index init discovery message.
+     *
+     * @param msg Message.
+     */
+    private void onIndexInitDiscoveryMessage(IndexInitDiscoveryMessage msg) {
+        // TODO: Not implemented yet; the message is currently ignored.
+    }
+
+    /**
+     * Handle index ack discovery message.
+     *
+     * @param msg Message.
+     */
+    private void onIndexAckDiscoveryMessage(IndexAckDiscoveryMessage msg) {
+        // TODO: Not implemented yet; the message is currently ignored.
+    }
+
+    /**
      * Register cache in indexing SPI.
      *
      * @param space Space.
@@ -1094,6 +1139,18 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Ensure that the arrived discovery message is not a duplicate.
+     *
+     * @param msg Message.
+     * @return {@code True} if message is not duplicated and should be processed further.
+     */
+    private boolean notDuplicate(IndexAbstractDiscoveryMessage msg) {
+        // A failed add presumably means the ID is already in the history,
+        // i.e. the message was seen before and must not be processed again.
+        return idxDiscoMsgIdHist.add(msg.id());
+    }
+
+    /**
      * @param ver Version.
      */
     public static void setRequestAffinityTopologyVersion(AffinityTopologyVersion ver) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f472918f/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
index 0c91b57..3e71e17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheTypeMetadata;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
@@ -55,6 +56,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.ignite.IgniteSystemProperties.*;
+
 /**
  * Utility methods for queries.
  */
@@ -62,6 +65,9 @@ public class QueryUtils {
     /** */
     public static final String _VAL = "_val";
 
+    /** Discovery history size; taken from a system property, {@code 1000} by default. */
+    private static final int DISCO_HIST_SIZE = getInteger(IGNITE_INDEXING_DISCOVERY_HISTORY_SIZE, 1000);
+
     /** */
     private static final Class<?> GEOMETRY_CLASS = U.classForName("com.vividsolutions.jts.geom.Geometry", null);
 
@@ -972,6 +978,15 @@ public class QueryUtils {
     }
 
     /**
+     * Discovery history size.
+     *
+     * @return Discovery history size.
+     */
+    public static int discoveryHistorySize() {
+        return DISCO_HIST_SIZE; // Fixed once the class is initialized (static final).
+    }
+
+    /**
      * Private constructor.
      */
     private QueryUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f472918f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlSchemaVersion.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlSchemaVersion.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlSchemaVersion.java
deleted file mode 100644
index 50c292c..0000000
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ddl/DdlSchemaVersion.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.ddl;
-
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * DDL schema version. Defines certain schema state and allows to filter unwanted stale events.
- */
-public class DdlSchemaVersion implements Comparable<DdlSchemaVersion> {
-    /** Topology version. */
-    private final long topVer;
-
-    /** Counter. */
-    private final long ctr;
-
-    /**
-     * Constructor.
-     *
-     * @param topVer Topology version.
-     * @param ctr Counter.
-     */
-    public DdlSchemaVersion(long topVer, long ctr) {
-        this.topVer = topVer;
-        this.ctr = ctr;
-    }
-
-    /**
-     * @return Topology version.
-     */
-    public long topologyVersion() {
-        return topVer;
-    }
-
-    /**
-     * @return Counter.
-     */
-    public long counter() {
-        return ctr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return 31 * (int)(topVer ^ (topVer >>> 32)) + (int)(ctr ^ (ctr >>> 32));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean equals(Object obj) {
-        if (obj == this)
-            return true;
-
-        if (obj != null && obj instanceof DdlSchemaVersion) {
-            DdlSchemaVersion other = (DdlSchemaVersion)obj;
-
-            return ctr == other.ctr && topVer == other.topVer;
-        }
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int compareTo(@NotNull DdlSchemaVersion other) {
-        long delta = topVer - other.topVer;
-
-        if (delta == 0)
-            delta = ctr - other.ctr;
-
-        return delta > 0 ? 1 : delta < 0 ? -1 : 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(DdlSchemaVersion.class, this);
-    }
-}


Mime
View raw message