ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/2] ignite git commit: IGNITE-2314: Implemented.
Date Tue, 29 Dec 2015 10:07:22 GMT
IGNITE-2314: Implemented.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4f55c84f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4f55c84f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4f55c84f

Branch: refs/heads/ignite-2314
Commit: 4f55c84f0cca6946165d004e75157a847628d123
Parents: f8011bb
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Dec 29 13:08:09 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Dec 29 13:08:09 2015 +0300

----------------------------------------------------------------------
 .../util/MPSCConcurrentLinkedQueue.java         | 127 +++++++++++++++++++
 .../util/ManyToOneConcurrentLinkedQueue.java    |  90 -------------
 .../ignite/internal/util/nio/GridNioServer.java |   5 +-
 3 files changed, 130 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
new file mode 100644
index 0000000..1990732
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCConcurrentLinkedQueue.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's
+ * <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue">
+ *     Non-intrusive MPSC node-based queue</a>.
+ */
+public class MPSCConcurrentLinkedQueue<E>
+{
+    /** Tail field updater. */
+    private static final AtomicReferenceFieldUpdater<MPSCConcurrentLinkedQueue, Node>
TAIL_UPD =
+        AtomicReferenceFieldUpdater.newUpdater(MPSCConcurrentLinkedQueue.class, Node.class,
"tail");
+
+    /** Head. */
+    private Node head;
+
+    /** Tail. */
+    @SuppressWarnings({"UnusedDeclaration", "FieldCanBeLocal"})
+    private volatile Node tail;
+
+    /**
+     * Constructor.
+     */
+    public MPSCConcurrentLinkedQueue()
+    {
+        head = new Node(null);
+
+        tail = head;
+    }
+
+    /**
+     * Offer element.
+     *
+     * @param e Element.
+     */
+    public void offer(final E e)
+    {
+        if (e == null)
+            throw new IllegalArgumentException("Null are not allowed.");
+
+        Node newTail = new Node(e);
+
+        Node prevTail = TAIL_UPD.getAndSet(this, newTail);
+
+        prevTail.setNext(newTail);
+    }
+
+    /**
+     * Poll element.
+     *
+     * @return Element.
+     */
+    @SuppressWarnings("unchecked")
+    public E poll()
+    {
+        final Node node = head.next;
+
+        if (node != null)
+        {
+            Object val = node.val;
+
+            node.val = null;
+
+            head = node;
+
+            return (E)val;
+        }
+        else
+            return null;
+    }
+
+    /**
+     * Node with data.
+     */
+    private static class Node
+    {
+        /** Next field updater. */
+        public static final AtomicReferenceFieldUpdater<Node, Node> NODE_UPD =
+            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
+
+        /** Value. */
+        private Object val;
+
+        /** Next node. */
+        @SuppressWarnings("UnusedDeclaration")
+        private volatile Node next;
+
+        /**
+         * Constructor.
+         *
+         * @param val Value.
+         */
+        private Node(Object val)
+        {
+            this.val = val;
+        }
+
+        /**
+         * Set next node.
+         *
+         * @param next Next node.
+         */
+        void setNext(Node next)
+        {
+            NODE_UPD.lazySet(this, next);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
deleted file mode 100644
index d30a5da..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ManyToOneConcurrentLinkedQueue.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.ignite.internal.util;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- *
- */
-public class ManyToOneConcurrentLinkedQueue<E>
-{
-    protected static final AtomicReferenceFieldUpdater<ManyToOneConcurrentLinkedQueue,
Node> TAIL_UPD =
-        AtomicReferenceFieldUpdater.newUpdater(ManyToOneConcurrentLinkedQueue.class, Node.class,
"tail");
-
-    protected volatile ManyToOneConcurrentLinkedQueue.Node tail;
-
-    private Node head;
-
-    public ManyToOneConcurrentLinkedQueue()
-    {
-        head = new Node(null);
-        TAIL_UPD.lazySet(this, head);
-    }
-
-    public boolean offer(final E e)
-    {
-        if (null == e)
-        {
-            throw new NullPointerException("element cannot be null");
-        }
-
-        final Node newTail = new Node(e);
-        final Node prevTail = swapTail(newTail);
-        prevTail.setNextOrdered(newTail);
-
-        return true;
-    }
-
-    public E poll()
-    {
-        Object value = null;
-
-        final Node node = head.next;
-
-        if (null != node)
-        {
-            value = node.value;
-            node.value = null;
-            head = node;
-        }
-
-        return (E)value;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Node swapTail(final Node newTail)
-    {
-        return TAIL_UPD.getAndSet(this, newTail);
-    }
-
-    /**
-     * Node with data.
-     */
-    private static class Node
-    {
-        public static final AtomicReferenceFieldUpdater<Node, Node> NODE_UPD =
-            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
-
-        Object value;
-        volatile Node next;
-
-        /**
-         * Constructor.
-         *
-         * @param value Value.
-         */
-        private Node(Object value)
-        {
-            this.value = value;
-        }
-
-        /**
-         * Set next node.
-         *
-         * @param next Next node.
-         */
-        void setNextOrdered(Node next)
-        {
-            NODE_UPD.lazySet(this, next);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4f55c84f/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 17a0b8f..1375d03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -41,7 +41,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Queue;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -50,6 +49,7 @@ import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.MPSCConcurrentLinkedQueue;
 import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1244,7 +1244,8 @@ public class GridNioServer<T> {
      */
     private abstract class AbstractNioClientWorker extends GridWorker {
         /** Queue of change requests on this selector. */
-        private final Queue<NioOperationFuture> changeReqs = new ConcurrentLinkedDeque8<>();
+        private final MPSCConcurrentLinkedQueue<NioOperationFuture> changeReqs =
+            new MPSCConcurrentLinkedQueue<>();
 
         /** Selector to select read events. */
         private Selector selector;


Mime
View raw message