ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: WIP on client future handling.
Date Fri, 10 Mar 2017 08:11:11 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl abf2f3389 -> e316c4b74


WIP on client future handling.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e316c4b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e316c4b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e316c4b7

Branch: refs/heads/ignite-4565-ddl
Commit: e316c4b7464c11ef4eeb7a1a2509e897f67961f5
Parents: abf2f33
Author: devozerov <vozerov@gridgain.com>
Authored: Fri Mar 10 11:11:04 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Fri Mar 10 11:11:04 2017 +0300

----------------------------------------------------------------------
 .../query/index/QueryIndexClientFuture.java     | 79 ++++++++++++++++++++
 .../query/index/QueryIndexHandler.java          | 69 +++++++++++++----
 2 files changed, 133 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e316c4b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexClientFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexClientFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexClientFuture.java
new file mode 100644
index 0000000..53a1770
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexClientFuture.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.index;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.UUID;
+
+/**
+ * Future for dynamic index create/drop returned to the client..
+ */
+public class QueryIndexClientFuture extends GridFutureAdapter<Object> {
+    /** Operation ID. */
+    private final UUID opId;
+
+    /** Key. */
+    private final QueryIndexKey key;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation ID.
+     * @param key Key.
+     */
+    public QueryIndexClientFuture(UUID opId, QueryIndexKey key) {
+        this.opId = opId;
+        this.key = key;
+    }
+
+    /**
+     * @return Operation ID.
+     */
+    public UUID operationId() {
+        return opId;
+    }
+
+    /**
+     * @return Index key.
+     */
+    public QueryIndexKey getKey() {
+        return key;
+    }
+
+    /**
+     * Handle cache stop.
+     */
+    public void onCacheStop() {
+        onDone(new IgniteException("Operation failed because cache was stopped."));
+    }
+
+    /**
+     * Handle type undeploy.
+     */
+    public void onTypeUnregistered() {
+        onDone(new IgniteException("Operation failed because type was undeployed."));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryIndexClientFuture.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e316c4b7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexHandler.java
index ce28a87..1623fc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/index/QueryIndexHandler.java
@@ -30,7 +30,7 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
 
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -52,7 +52,7 @@ public class QueryIndexHandler {
 
     /** Client futures. */
     // TODO: Special future which is aware of index key, handle it during cache, type undeploy
and disconnect.
-    private final Map<UUID, GridFutureAdapter> cliFuts = new ConcurrentHashMap<>();
+    private final Map<UUID, QueryIndexClientFuture> cliFuts = new ConcurrentHashMap<>();
 
     /** RW lock. */
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -135,14 +135,15 @@ public class QueryIndexHandler {
         lock.writeLock().lock();
 
         try {
-            Iterator<Map.Entry<QueryIndexKey, Descriptor>> iter = idxs.entrySet().iterator();
+            // Find matching indexes.
+            Collection<QueryIndexKey> rmvIdxKeys = new HashSet<>();
 
-            while (iter.hasNext()) {
-                Map.Entry<QueryIndexKey, Descriptor> entry = iter.next();
-
-                if (F.eq(space, entry.getValue().type().space()))
-                    iter.remove();
+            for (QueryIndexKey key : idxs.keySet()) {
+                if (F.eq(space, key.space()))
+                    rmvIdxKeys.add(key);
             }
+
+            removeIndexes(rmvIdxKeys, false);
         }
         finally {
             lock.writeLock().unlock();
@@ -158,14 +159,18 @@ public class QueryIndexHandler {
         lock.writeLock().lock();
 
         try {
-            Iterator<Map.Entry<QueryIndexKey, Descriptor>> iter = idxs.entrySet().iterator();
+            // Find matching indexes.
+            Collection<QueryIndexKey> rmvKeys = new HashSet<>();
 
-            while (iter.hasNext()) {
-                Map.Entry<QueryIndexKey, Descriptor> entry = iter.next();
+            for (Map.Entry<QueryIndexKey, Descriptor> idxEntry : idxs.entrySet()) {
+                QueryIndexKey idxKey = idxEntry.getKey();
+                Descriptor idxDesc = idxEntry.getValue();
 
-                if (F.eq(desc, entry.getValue().type()))
-                    iter.remove();
+                if (F.eq(desc, idxDesc.type()))
+                    rmvKeys.add(idxKey);
             }
+
+            removeIndexes(rmvKeys, true);
         }
         finally {
             lock.writeLock().unlock();
@@ -173,6 +178,40 @@ public class QueryIndexHandler {
     }
 
     /**
+     * Remove indexes locally. Invoked when cache is either destroyed or type is unregistered.
+     *
+     * @param rmvIdxKeys Index keys to be removed.
+     * @param typUnregister {@code True} if type was undeployed, {@code false} if cache was
undeployed.
+     */
+    private void removeIndexes(Collection<QueryIndexKey> rmvIdxKeys, boolean typUnregister)
{
+        // Remove matched indexes
+        for (QueryIndexKey rmvIdxKey : rmvIdxKeys) {
+            // TODO: Callback to indexing SPI should be done from here
+            idxs.remove(rmvIdxKey);
+        }
+
+        // Complete pending futures.
+        Collection<UUID> rmvCliFutIds = new HashSet<>();
+
+        for (Map.Entry<UUID, QueryIndexClientFuture> cliFutEntry : cliFuts.entrySet())
{
+            UUID cliFutId = cliFutEntry.getKey();
+            QueryIndexClientFuture cliFut = cliFutEntry.getValue();
+
+            if (rmvIdxKeys.contains(cliFut.getKey()))
+                rmvCliFutIds.add(cliFutId);
+        }
+
+        for (UUID rmvCliFutId : rmvCliFutIds) {
+            QueryIndexClientFuture rmvCliFut = cliFuts.remove(rmvCliFutId);
+
+            if (typUnregister)
+                rmvCliFut.onTypeUnregistered();
+            else
+                rmvCliFut.onCacheStop();
+        }
+    }
+
+    /**
      * Handle disconnect.
      */
     public void onDisconnected() {
@@ -217,9 +256,9 @@ public class QueryIndexHandler {
             }
 
             UUID opId = UUID.randomUUID();
-            GridFutureAdapter fut = new GridFutureAdapter();
+            QueryIndexClientFuture fut = new QueryIndexClientFuture(opId, idxKey);
 
-            GridFutureAdapter oldFut = cliFuts.put(opId, fut);
+            QueryIndexClientFuture oldFut = cliFuts.put(opId, fut);
 
             assert oldFut == null;
 


Mime
View raw message