ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [27/50] [abbrv] ignite git commit: io test
Date Mon, 10 Oct 2016 14:57:32 GMT
io test


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb7d7cff
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb7d7cff
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb7d7cff

Branch: refs/heads/ignite-gg-8-io2-park
Commit: cb7d7cffb8709d4e8ad2faaf2bc441b5c0473a5d
Parents: 4ba2deb
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Sep 30 10:57:37 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Sep 30 11:00:26 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  10 ++
 .../managers/communication/GridIoManager.java   | 117 ++++++++++++++++---
 .../cache/IgniteIoTestAbstractBenchmark.java    |  61 ++++++++++
 .../yardstick/cache/IgniteIoTestBenchmark.java  |  73 ------------
 .../cache/IgniteIoTestSendAllBenchmark.java     |  32 +++++
 .../cache/IgniteIoTestSendRandomBenchmark.java  |  35 ++++++
 6 files changed, 236 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 1e55463..29691e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3441,6 +3441,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         return ctx.io().sendIoTest(node, payload, procFromNioThread);
     }
 
+    /**
+     * @param nodes Nodes.
+     * @param payload Message payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+        return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 8c2fc05..bb7bf69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -215,7 +215,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     private boolean stopping;
 
     /** */
-    private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>> ioTestMap = new AtomicReference<>();
+    private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
 
     /** */
     private final AtomicLong ioTestId = new AtomicLong();
@@ -384,45 +384,72 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     }
                 }
                 else {
-                    GridFutureAdapter fut = ioTestMap().remove(msg0.id());
+                    IoTestFuture fut = ioTestMap().get(msg0.id());
 
-                    if (fut == null) {
+                    if (fut == null)
                         U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
-
-                        return;
-                    }
-
-                    fut.onDone();
+                    else
+                        fut.onResponse();
                 }
             }
         });
     }
 
     /**
+     * @param nodes Nodes.
+     * @param payload Payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+        long id = ioTestId.getAndIncrement();
+
+        IoTestFuture fut = new IoTestFuture(id, nodes.size());
+
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+        msg.processFromNioThread(procFromNioThread);
+
+        ioTestMap().put(id, fut);
+
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            try {
+                send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                ioTestMap().remove(msg.id());
+
+                return new GridFinishedFuture(e);
+            }
+        }
+
+        return fut;
+    }
+
+    /**
      * @param node Node.
      * @param payload Payload.
      * @param procFromNioThread If {@code true} message is processed from NIO thread.
      * @return Response future.
      */
     public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
-        if (ctx.localNodeId().equals(node.id()))
-            throw new IllegalArgumentException();
-
         long id = ioTestId.getAndIncrement();
 
-        GridFutureAdapter fut = new GridFutureAdapter();
+        IoTestFuture fut = new IoTestFuture(id, 1);
 
-        ioTestMap().put(id, fut);
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
 
-        try {
-            IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+        msg.processFromNioThread(procFromNioThread);
 
-            msg.processFromNioThread(procFromNioThread);
+        ioTestMap().put(id, fut);
 
+        try {
             send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
         }
         catch (IgniteCheckedException e) {
-            ioTestMap().remove(id);
+            ioTestMap().remove(msg.id());
 
             return new GridFinishedFuture(e);
         }
@@ -433,8 +460,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /**
      * @return IO test futures map.
      */
-    private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() {
-        ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get();
+    private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
+        ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get();
 
         if (map == null) {
             if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
@@ -2720,4 +2747,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
+
+    /**
+     *
+     */
+    private class IoTestFuture extends GridFutureAdapter<Object> {
+        /** */
+        private final long id;
+
+        /** */
+        private int cntr;
+
+        /**
+         * @param id ID.
+         * @param cntr Counter.
+         */
+        IoTestFuture(long id, int cntr) {
+            assert cntr > 0 : cntr;
+
+            this.id = id;
+            this.cntr = cntr;
+        }
+
+        /**
+         *
+         */
+        void onResponse() {
+            boolean complete;
+
+            synchronized (this) {
+                complete = --cntr == 0;
+            }
+
+            if (complete)
+                onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                ioTestMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IoTestFuture.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestAbstractBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestAbstractBenchmark.java
new file mode 100644
index 0000000..17d1742
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestAbstractBenchmark.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+/**
+ *
+ */
+public abstract class IgniteIoTestAbstractBenchmark extends IgniteAbstractBenchmark {
+    /** */
+    protected final List<ClusterNode> targetNodes = new ArrayList<>();
+
+    /** */
+    protected IgniteKernal ignite;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        ignite = (IgniteKernal)ignite();
+
+        ClusterNode loc = ignite().cluster().localNode();
+
+        Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
+
+        for (ClusterNode node : nodes) {
+            if (!loc.equals(node))
+                targetNodes.add(node);
+        }
+
+        if (targetNodes.isEmpty())
+            throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']');
+
+        BenchmarkUtils.println(cfg, "Initialized target nodes: " + F.nodeIds(targetNodes) + ']');
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
deleted file mode 100644
index bee45e0..0000000
--- a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.yardstick.cache;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
-import org.yardstickframework.BenchmarkConfiguration;
-import org.yardstickframework.BenchmarkUtils;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public class IgniteIoTestBenchmark extends IgniteAbstractBenchmark {
-    /** */
-    private List<ClusterNode> targetNodes;
-
-    /** */
-    private IgniteKernal ignite;
-
-    /** {@inheritDoc} */
-    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
-        super.setUp(cfg);
-
-        ignite = (IgniteKernal)ignite();
-
-        targetNodes = new ArrayList<>();
-
-        ClusterNode loc = ignite().cluster().localNode();
-
-        Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
-
-        for (ClusterNode node : nodes) {
-            if (!loc.equals(node))
-                targetNodes.add(node);
-        }
-
-        if (targetNodes.isEmpty())
-            throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']');
-
-        BenchmarkUtils.println(cfg, "Initialized target nodes: " + targetNodes + ']');
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
-        ClusterNode node = targetNodes.get(nextRandom(targetNodes.size()));
-
-        ignite.sendIoTest(node, null, false).get();
-
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendAllBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendAllBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendAllBenchmark.java
new file mode 100644
index 0000000..d77c4a6
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendAllBenchmark.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+
+/**
+ *
+ */
+public class IgniteIoTestSendAllBenchmark extends IgniteIoTestAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        ignite.sendIoTest(targetNodes, null, false).get();
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb7d7cff/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendRandomBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendRandomBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendRandomBenchmark.java
new file mode 100644
index 0000000..ac3a070
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestSendRandomBenchmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+
+/**
+ *
+ */
+public class IgniteIoTestSendRandomBenchmark extends IgniteIoTestAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        ClusterNode node = targetNodes.get(nextRandom(targetNodes.size()));
+
+        ignite.sendIoTest(node, null, false).get();
+
+        return true;
+    }
+}


Mime
View raw message