ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [4/4] incubator-ignite git commit: #[GG-10298]: corrected after review.
Date Tue, 30 Jun 2015 11:13:36 GMT
#[GG-10298]: corrected after review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/39f6928d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/39f6928d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/39f6928d

Branch: refs/heads/ignite-1058
Commit: 39f6928defd2050aca05effaa590eedf628fddc8
Parents: 671fb68
Author: iveselovskiy <iveselovskiy@gridgain.com>
Authored: Tue Jun 30 14:04:42 2015 +0300
Committer: iveselovskiy <iveselovskiy@gridgain.com>
Committed: Tue Jun 30 14:04:42 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 70 ++++++++++----------
 .../plugin/extensions/communication/IoPool.java | 42 ++++++++++++
 .../communication/IoPoolExtension.java          | 50 --------------
 3 files changed, 78 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index a58cf73..b79c8b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -68,9 +68,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Disconnect listeners. */
     private final Collection<GridDisconnectListener> disconnectLsnrs = new ConcurrentLinkedQueue<>();
 
-    /** Map of {@link IoPoolExtension}-s injected by Ignite plugins. */
-    // TODO: This should not be concurrent map. Either HashMap or array.
-    private final ConcurrentMap<Byte, IoPoolExtension> ioPoolExtensionMap = new ConcurrentHashMap8<>(128);
+    /** Map of {@link IoPool}-s injected by Ignite plugins. */
+    private final IoPool[] ioPools = new IoPool[128];
 
     /** Public pool. */
     private ExecutorService pubPool;
@@ -270,30 +269,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     private void registerIoPoolExtensions() throws IgniteCheckedException {
         // Process custom IO messaging pool extensions:
-        final IoPoolExtension[] executorExtensions
-            = ctx.plugins().extensions(IoPoolExtension.class);
+        final IoPool[] executorExtensions
+            = ctx.plugins().extensions(IoPool.class);
 
         if (executorExtensions != null) {
             // Store it into the map and check for duplicates:
-            for (IoPoolExtension ex : executorExtensions) {
+            for (IoPool ex : executorExtensions) {
                 final byte id = ex.id();
 
                 // 1. Check the pool id is non-negative:
                 if (id < 0)
                     throw new IgniteCheckedException("Failed to register IO executor pool because its Id is negative " +
-                        "[id=" + id + ", pluginId=" + ex.pligunId() + ']');
+                        "[id=" + id + ']');
 
                 // 2. Check the pool id is in allowed range:
                 if (isReservedGridIoPolicy(id))
                     throw new IgniteCheckedException("Failed to register IO executor pool because its Id in in the " +
-                        "reserved range (0-31) [id=" + id + ", pluginId=" + ex.pligunId() + ']');
+                        "reserved range (0-31) [id=" + id + ']');
 
                 // 3. Check the pool for duplicates:
-                IoPoolExtension pushedOut = ioPoolExtensionMap.putIfAbsent(id, ex);
-                if (pushedOut != null)
+                if (ioPools[id] != null)
                     throw new IgniteCheckedException("Failed to register IO executor pool because its " +
-                        "Id as already used [id=" + ex.id() + ", pluginId="
-                        + ex.pligunId() + ", existingPoolPluginId=" + pushedOut.pligunId() + ']');
+                        "Id as already used [id=" + id + ']');
+
+                ioPools[id] = ex;
             }
         }
     }
@@ -493,7 +492,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (log.isDebugEnabled())
             log.debug(stopInfo());
 
-        ioPoolExtensionMap.clear();
+        Arrays.fill(ioPools, null);
     }
 
     /**
@@ -586,17 +585,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 }
 
                 default:
-                    // TODO: Assert.
-                    if (plc < 0)
-                        throw new IgniteException("Failed to process message with negative policy. [policy="
-                            + plc + ']');
-
-                    // TODO: If from reserved range - ignore.
-                    // TODO: If from not-reserved range and IoExtension exists - process.
-                    // TODO: If from not-reserved range and IoExtension doesnt exist - ignore.
+                    assert plc >= 0 : "Negative policy: " + plc;
+
                     if (isReservedGridIoPolicy(plc))
-                        throw new IgniteException("Failed to process message with policy of reserved range. [policy="
-                            + plc + ']');
+                        throw new IgniteCheckedException("Failed to process message with policy of reserved range. " +
+                            "[policy=" + plc + ']');
 
                     if (msg.isOrdered())
                         processOrderedMessage(nodeId, msg, plc, msgC);
@@ -618,7 +611,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Policy.
      * @return Execution pool.
      */
-    private Executor pool(byte plc) {
+    private Executor pool(byte plc) throws IgniteCheckedException {
         switch (plc) {
             case P2P_POOL:
                 return p2pPool;
@@ -642,21 +635,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return marshCachePool;
 
             default: {
+                assert plc >= 0 : "Negative policy: " + plc;
+
                 if (isReservedGridIoPolicy(plc))
-                    throw new IgniteException("Failed to process message with policy of reserved range, [policy="
-                        + plc + ']');
+                    throw new IgniteCheckedException("Failed to process message with policy of reserved" +
+                        " range (0-31), [policy=" + plc + ']');
 
-                IoPoolExtension pool = ioPoolExtensionMap.get(plc);
+                IoPool pool = ioPools[plc];
 
-                // TODO: Assert
                 if (pool == null)
-                    throw new IgniteException("Failed to process message because corresponding executor pool is not "
-                        + " found. [id=" + plc + ']');
+                    throw new IgniteCheckedException("Failed to process message because no pool is registered " +
+                        "for policy. [policy=" + plc + ']');
 
                 assert plc == pool.id();
 
-                // TODO: Check for null and throw IgniteCheckedException.
-                return pool.executor();
+                Executor ex = pool.executor();
+
+                if (ex == null)
+                    throw new IgniteCheckedException("Failed to process message because corresponding executor " +
+                        "is null. [id=" + plc + ']');
+
+                return ex;
             }
         }
     }
@@ -718,7 +717,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         final GridIoMessage msg,
         byte plc,
         final IgniteRunnable msgC
-    ) {
+    ) throws IgniteCheckedException {
         Runnable c = new Runnable() {
             @Override public void run() {
                 try {
@@ -776,7 +775,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         final GridIoMessage msg,
         final byte plc,
         @Nullable final IgniteRunnable msgC
-    ) {
+    ) throws IgniteCheckedException {
         assert msg != null;
 
         long timeout = msg.timeout();
@@ -1451,6 +1450,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 for (GridCommunicationMessageSet msgSet : msgSets)
                     unwindMessageSet(msgSet, lsnr);
             }
+            catch (IgniteCheckedException ice) {
+                throw new IgniteException(ice);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java
new file mode 100644
index 0000000..e87b82c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPool.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.plugin.extensions.communication;
+
+import org.apache.ignite.plugin.*;
+
+import java.util.concurrent.*;
+
+/**
+ * The interface of IO Messaging Pool Extension.
+ */
+public interface IoPool extends Extension {
+    /**
+     * Gets the numeric identifier of the pool. This identifier is to be taken from serialized
+     * message and used to find the appropriate executor pool to process it.
+     *
+     * @return The id.
+     */
+    public byte id();
+
+    /**
+     * Gets the Executor for this Pool. Cannot be null.
+     *
+     * @return The executor.
+     */
+    public Executor executor();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/39f6928d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java
deleted file mode 100644
index f452bfd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/IoPoolExtension.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.plugin.extensions.communication;
-
-import org.apache.ignite.plugin.*;
-
-import java.util.concurrent.*;
-
-/**
- * The interface of IO Messaging Pool Extension.
- */
-// TODO: IoPoolExtension -> IoPool
-public interface IoPoolExtension extends Extension {
-    /**
-     * Gets the numeric identifier of the pool. This identifier is to be taken from serialized
-     * message and used to find the appropriate executor pool to process it.
-     *
-     * @return The id.
-     */
-    public byte id();
-
-    /**
-     * Gets the Executor for this Pool. Cannot be null.
-     *
-     * @return The executor.
-     */
-    public Executor executor();
-
-    // TODO: Remove.
-    /**
-     * Gets the Id of the plugin that injected this executor pool.
-     * @return The plugin Id.
-     */
-    public String pligunId();
-}


Mime
View raw message