ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [14/50] [abbrv] ignite git commit: io test
Date Fri, 21 Oct 2016 15:54:41 GMT
io test


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1759cf3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1759cf3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1759cf3

Branch: refs/heads/ignite-comm-balance
Commit: d1759cf3a919a649f0b9ae4277fdfec9d29c6498
Parents: d3ecf93
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Sep 28 17:18:08 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Sep 28 17:40:34 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/GridTopic.java   |   5 +-
 .../apache/ignite/internal/IgniteKernal.java    |  10 +
 .../managers/communication/GridIoManager.java   | 104 +++++++++
 .../communication/GridIoMessageFactory.java     |   7 +-
 .../communication/IgniteIoTestMessage.java      | 216 +++++++++++++++++++
 .../yardstick/cache/IgniteIoTestBenchmark.java  |  73 +++++++
 6 files changed, 413 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 248f75b..dc20be0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -94,7 +94,10 @@ public enum GridTopic {
     TOPIC_QUERY,
 
     /** */
-    TOPIC_TX;
+    TOPIC_TX,
+
+    /** */
+    TOPIC_IO_TEST;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 6c5a628..54371cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -3419,6 +3419,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     * @param node Node.
+     * @param payload Message payload.
+     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) {
+        return ctx.io().sendIoTest(node, payload, processFromNioThread);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3fdda30..d6a2835 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -47,6 +50,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -57,6 +61,8 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -87,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
+import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
@@ -207,6 +214,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Stopping flag. */
     private boolean stopping;
 
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>> ioTestMap = new AtomicReference<>();
+
+    /** */
+    private final AtomicLong ioTestId = new AtomicLong();
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -348,6 +361,87 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             log.debug(startInfo());
 
         registerIoPoolExtensions();
+
+        addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    return;
+
+                IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
+
+                if (msg0.request()) {
+                    IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
+
+                    res.flags(msg0.flags());
+
+                    try {
+                        send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
+                    }
+                }
+                else {
+                    GridFutureAdapter fut = ioTestMap().remove(msg0.id());
+
+                    if (fut == null) {
+                        U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
+
+                        return;
+                    }
+
+                    fut.onDone();
+                }
+            }
+        });
+    }
+
+    /**
+     * @param node Node.
+     * @param payload Payload.
+     * @param processFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) {
+        if (ctx.localNodeId().equals(node.id()))
+            throw new IllegalArgumentException();
+
+        long id = ioTestId.getAndIncrement();
+
+        GridFutureAdapter fut = new GridFutureAdapter();
+
+        ioTestMap().put(id, fut);
+
+        try {
+            IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+            msg.processFromNioThread(processFromNioThread);
+
+            send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            ioTestMap().remove(id);
+
+            return new GridFinishedFuture(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @return IO test futures map.
+     */
+    private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() {
+        ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get();
+
+        if (map == null) {
+            if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+                map = ioTestMap.get();
+        }
+
+        return map;
     }
 
     /**
@@ -836,6 +930,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         };
 
+        if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
+            IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
+
+            if (msg0.processFromNioThread()) {
+                c.run();
+
+                return;
+            }
+        }
+
         try {
             pool(plc).execute(c);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index bd6ac5b..1b92465 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -761,7 +761,12 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124-125] - this
+            case 126:
+                msg = new IgniteIoTestMessage();
+
+                break;
+
+            // [-3..119] [124-126] - this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
new file mode 100644
index 0000000..08bd110
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class IgniteIoTestMessage implements Message {
+    /** */
+    private static byte FLAG_PROCESS_FROM_NIO = 1;
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long id;
+
+    /** */
+    private byte flags;
+
+    /** */
+    private boolean req;
+
+    /** */
+    private byte payload[];
+
+    /**
+     *
+     */
+    public IgniteIoTestMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param id
+     * @param req
+     * @param payload
+     */
+    public IgniteIoTestMessage(long id, boolean req, byte[] payload) {
+        this.id = id;
+        this.req = req;
+        this.payload = payload;
+    }
+
+    public boolean processFromNioThread() {
+        return isFlag(FLAG_PROCESS_FROM_NIO);
+    }
+
+    public void processFromNioThread(boolean processFromNioThread) {
+        setFlag(processFromNioThread, FLAG_PROCESS_FROM_NIO);
+    }
+
+    public void flags(byte flags) {
+        this.flags = flags;
+    }
+
+    public byte flags() {
+        return flags;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    public boolean request() {
+        return req;
+    }
+
+    public long id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("id", id))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("payload", payload))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeBoolean("req", req))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                id = reader.readLong("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                payload = reader.readByteArray("payload");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                req = reader.readBoolean("req");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteIoTestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 126;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteIoTestMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
new file mode 100644
index 0000000..bee45e0
--- /dev/null
+++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.yardstick.cache;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.yardstick.IgniteAbstractBenchmark;
+import org.yardstickframework.BenchmarkConfiguration;
+import org.yardstickframework.BenchmarkUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class IgniteIoTestBenchmark extends IgniteAbstractBenchmark {
+    /** */
+    private List<ClusterNode> targetNodes;
+
+    /** */
+    private IgniteKernal ignite;
+
+    /** {@inheritDoc} */
+    @Override public void setUp(BenchmarkConfiguration cfg) throws Exception {
+        super.setUp(cfg);
+
+        ignite = (IgniteKernal)ignite();
+
+        targetNodes = new ArrayList<>();
+
+        ClusterNode loc = ignite().cluster().localNode();
+
+        Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes();
+
+        for (ClusterNode node : nodes) {
+            if (!loc.equals(node))
+                targetNodes.add(node);
+        }
+
+        if (targetNodes.isEmpty())
+            throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']');
+
+        BenchmarkUtils.println(cfg, "Initialized target nodes: " + targetNodes + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean test(Map<Object, Object> ctx) throws Exception {
+        ClusterNode node = targetNodes.get(nextRandom(targetNodes.size()));
+
+        ignite.sendIoTest(node, null, false).get();
+
+        return true;
+    }
+}


Mime
View raw message