ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/10] ignite git commit: ignite-4680-2
Date Fri, 17 Mar 2017 15:04:46 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4680-sb [created] e59edc930


http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 5a8904f..1a87ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -41,6 +41,9 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     /** Timestamp used as base time for cache topology version (January 1, 2014). */
     public static final long TOP_VER_BASE_TIME = 1388520000000L;
 
+    /** Maximum number of atomic ids for thread. Must be power of two ! */
+    protected static final int THREAD_RESERVE_SIZE = 0x4000;
+
     /**
      * Current order. Initialize to current time to make sure that
      * local version increments even after restarts.
@@ -63,6 +66,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     /** */
     private GridCacheVersion ISOLATED_STREAMER_VER;
 
+    /** Global atomic id counter. */
+    protected final AtomicLong globalAtomicCnt = new AtomicLong();
+
+    /** Per thread atomic id counter. */
+    private final ThreadLocal<LongWrapper> threadAtomicVersionCnt = new ThreadLocal<LongWrapper>()
{
+        @Override protected LongWrapper initialValue() {
+            return new LongWrapper(globalAtomicCnt);
+        }
+    };
+
     /** */
     private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -304,4 +317,38 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter
{
     public GridCacheVersion last() {
         return last;
     }
+
+    /**
+     * @return Next future Id for atomic futures.
+     */
+    public long nextAtomicFutureVersion() {
+        LongWrapper cnt = threadAtomicVersionCnt.get();
+        return cnt.getNext();
+    }
+
+    /** Long wrapper. */
+    private static class LongWrapper {
+        /** */
+        private long val;
+        private final AtomicLong globalCnt;
+
+        /** */
+        public LongWrapper(AtomicLong globalCnt) {
+            assert THREAD_RESERVE_SIZE > 1 && (THREAD_RESERVE_SIZE & (THREAD_RESERVE_SIZE
- 1)) == 0 :
+                "THREAD_RESERVE_SIZE must be power of two";
+
+            this.globalCnt = globalCnt;
+            val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+        }
+
+        /** */
+        public long getNext() {
+            long res = val++;
+
+            if ((val & (THREAD_RESERVE_SIZE - 1)) == 0)
+                val = globalCnt.getAndAdd(THREAD_RESERVE_SIZE);
+
+            return res;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
new file mode 100644
index 0000000..5505b3a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MPSCQueue.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.AbstractQueue;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.locks.LockSupport.park;
+import static java.util.concurrent.locks.LockSupport.unpark;
+
+/**
+ * @param <E>
+ */
+public final class MPSCQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>
{
+
+    /** */
+    static final int INITIAL_ARRAY_SIZE = 512;
+    /** */
+    static final Node BLOCKED = new Node();
+
+    /** */
+    final AtomicReference<Node> putStack = new AtomicReference<>();
+    /** */
+    private final AtomicInteger takeStackSize = new AtomicInteger();
+
+    /** */
+    private Thread consumerThread;
+    /** */
+    private Object[] takeStack = new Object[INITIAL_ARRAY_SIZE];
+    /** */
+    private int takeStackIndex = -1;
+
+    /**
+     */
+    public MPSCQueue(Thread consumerThread) {
+        assert consumerThread != null;
+        this.consumerThread = consumerThread;
+    }
+
+    /**
+     */
+    public MPSCQueue() {
+    }
+
+    /**
+     * Sets the consumer thread.
+     *
+     * The consumer thread is needed for blocking, so that an offering known which thread
+     * to wakeup. There can only be a single consumerThread and this method should be called
+     * before the queue is safely published. It will not provide a happens before relation
on
+     * its own.
+     *
+     * @param consumerThread the consumer thread.
+     * @throws NullPointerException when consumerThread null.
+     */
+    public void setConsumerThread(Thread consumerThread) {
+        assert consumerThread != null;
+        this.consumerThread = consumerThread;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * This call is threadsafe; but it will only remove the items that are on the put-stack.
+     */
+    @Override public void clear() {
+        putStack.set(BLOCKED);
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean offer(E item) {
+        assert item != null : "item can't be null";
+
+        AtomicReference<Node> putStack = this.putStack;
+        Node newHead = new Node();
+        newHead.item = item;
+
+        for (; ; ) {
+            Node oldHead = putStack.get();
+            if (oldHead == null || oldHead == BLOCKED) {
+                newHead.next = null;
+                newHead.size = 1;
+            }
+            else {
+                newHead.next = oldHead;
+                newHead.size = oldHead.size + 1;
+            }
+
+            if (!putStack.compareAndSet(oldHead, newHead))
+                continue;
+
+            if (oldHead == BLOCKED)
+                unpark(consumerThread);
+
+            return true;
+        }
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E peek() {
+        E item = peekNext();
+        if (item != null)
+            return item;
+
+        if (!drainPutStack())
+            return null;
+
+        return peekNext();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E take() throws InterruptedException {
+        E item = next();
+
+        if (item != null)
+            return item;
+
+        takeAll();
+        assert takeStackIndex == 0;
+        assert takeStack[takeStackIndex] != null;
+
+        return next();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E poll() {
+        E item = next();
+
+        if (item != null)
+            return item;
+
+        if (!drainPutStack())
+            return null;
+
+        return next();
+    }
+
+    /** */
+    private E next() {
+        E item = peekNext();
+
+        if (item != null)
+            dequeue();
+
+        return item;
+    }
+
+    /** */
+    private E peekNext() {
+        if (takeStackIndex == -1)
+            return null;
+
+        if (takeStackIndex == takeStack.length) {
+            takeStackIndex = -1;
+            return null;
+        }
+
+        E item = (E)takeStack[takeStackIndex];
+
+        if (item == null) {
+            takeStackIndex = -1;
+            return null;
+        }
+
+        return item;
+    }
+
+    /** */
+    private void dequeue() {
+        takeStack[takeStackIndex] = null;
+        takeStackIndex++;
+        takeStackSize.lazySet(takeStackSize.get() - 1);
+    }
+
+    /** */
+    private void takeAll() throws InterruptedException {
+        long iteration = 0;
+        AtomicReference<Node> putStack = this.putStack;
+
+        for (; ; ) {
+            if (consumerThread != null && consumerThread.isInterrupted()) {
+                putStack.compareAndSet(BLOCKED, null);
+                throw new InterruptedException();
+            }
+
+            Node currentPutStackHead = putStack.get();
+
+            if (currentPutStackHead == null) {
+                // there is nothing to be take, so lets block.
+                if (!putStack.compareAndSet(null, BLOCKED)) {
+                    // we are lucky, something is available
+                    continue;
+                }
+
+                // lets block for real.
+                park();
+            }
+            else if (currentPutStackHead == BLOCKED)
+                park();
+
+            else {
+                if (!putStack.compareAndSet(currentPutStackHead, null))
+                    continue;
+
+                copyIntoTakeStack(currentPutStackHead);
+                break;
+            }
+            iteration++;
+        }
+    }
+
+    /** */
+    private boolean drainPutStack() {
+        for (; ; ) {
+            Node head = putStack.get();
+
+            if (head == null)
+                return false;
+
+            if (putStack.compareAndSet(head, null)) {
+                copyIntoTakeStack(head);
+                return true;
+            }
+        }
+    }
+
+    /** */
+    private void copyIntoTakeStack(Node putStackHead) {
+        int putStackSize = putStackHead.size;
+
+        takeStackSize.lazySet(putStackSize);
+
+        if (putStackSize > takeStack.length)
+            takeStack = new Object[nextPowerOfTwo(putStackHead.size)];
+
+        for (int i = putStackSize - 1; i >= 0; i--) {
+            takeStack[i] = putStackHead.item;
+            putStackHead = putStackHead.next;
+        }
+
+        takeStackIndex = 0;
+        assert takeStack[0] != null;
+    }
+
+    /**
+     * {@inheritDoc}.
+     *
+     * Best effort implementation.
+     */
+    @Override public int size() {
+        Node h = putStack.get();
+        int putStackSize = h == null ? 0 : h.size;
+        return putStackSize + takeStackSize.get();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public void put(E e) throws InterruptedException {
+        offer(e);
+    }
+
+    /** {@inheritDoc}. */
+    @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
{
+        add(e);
+        return true;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int drainTo(Collection<? super E> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc}. */
+    @Override public Iterator<E> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** */
+    private static int nextPowerOfTwo(final int value) {
+        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+    }
+
+    /** */
+    private static final class Node<E> {
+        /** */
+        Node next;
+        /** */
+        E item;
+        /** */
+        int size;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 6c85b32..af474ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -38,8 +38,9 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.IgniteStripeThread;
 import org.jetbrains.annotations.NotNull;
+import org.jsr166.LongAdder8;
 
 /**
  * Striped executor.
@@ -75,7 +76,7 @@ public class StripedExecutor implements ExecutorService {
 
         try {
             for (int i = 0; i < cnt; i++) {
-                stripes[i] = new StripeConcurrentQueue(
+                stripes[i] = new StripeMPSCQueue(
                     igniteInstanceName,
                     poolName,
                     i,
@@ -146,6 +147,31 @@ public class StripedExecutor implements ExecutorService {
     }
 
     /**
+     * @return Metrics.
+     */
+    public String getMetrics() {
+        GridStringBuilder sb = new GridStringBuilder();
+        sb.a("completed");
+        GridStringBuilder sb2 = new GridStringBuilder();
+        sb2.a("park");
+        GridStringBuilder sb3 = new GridStringBuilder();
+        sb3.a("unpark");
+        GridStringBuilder sb4 = new GridStringBuilder();
+        sb4.a("queue");
+
+        for (int i = 0; i < stripes.length; i++) {
+            Stripe stripe = stripes[i];
+
+            sb.a(':').a(stripe.completedCnt);
+            sb2.a(':').a(stripe.parkCntr());
+            sb3.a(':').a(stripe.unparkCntr());
+            sb4.a(':').a(stripe.queueSize());
+        }
+
+        return sb.a(' ').a(sb2).a(' ').a(sb3).a(' ').a(sb4).toString();
+    }
+
+    /**
      * @return Stripes count.
      */
     public int stripes() {
@@ -435,10 +461,9 @@ public class StripedExecutor implements ExecutorService {
          * Starts the stripe.
          */
         void start() {
-            thread = new IgniteThread(igniteInstanceName,
+            thread = new IgniteStripeThread(igniteInstanceName,
                 poolName + "-stripe-" + idx,
                 this,
-                IgniteThread.GRP_IDX_UNASSIGNED,
                 idx);
 
             thread.start();
@@ -524,6 +549,20 @@ public class StripedExecutor implements ExecutorService {
          */
         abstract String queueToString();
 
+        /**
+         * @return Number of park ops.
+         */
+        public long parkCntr() {
+            return 0;
+        }
+
+        /**
+         * @return Number of unpark ops.
+         */
+        public long unparkCntr() {
+            return 0;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(Stripe.class, this);
@@ -540,6 +579,12 @@ public class StripedExecutor implements ExecutorService {
         /** */
         private volatile boolean parked;
 
+        /** */
+        private volatile long parkCntr;
+
+        /** */
+        private final LongAdder8 unparkCntr = new LongAdder8();
+
         /**
          * @param igniteInstanceName Ignite instance name.
          * @param poolName Pool name.
@@ -578,6 +623,7 @@ public class StripedExecutor implements ExecutorService {
                     if (r != null)
                         return r;
 
+                    parkCntr++;
                     LockSupport.park();
 
                     if (Thread.interrupted())
@@ -593,8 +639,10 @@ public class StripedExecutor implements ExecutorService {
         void execute(Runnable cmd) {
             queue.add(cmd);
 
-            if (parked)
+            if (parked) {
+                unparkCntr.increment();
                 LockSupport.unpark(thread);
+            }
         }
 
         /** {@inheritDoc} */
@@ -608,6 +656,16 @@ public class StripedExecutor implements ExecutorService {
         }
 
         /** {@inheritDoc} */
+        @Override public long parkCntr() {
+            return parkCntr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long unparkCntr() {
+            return unparkCntr.sum();
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(StripeConcurrentQueue.class, this, super.toString());
         }
@@ -719,4 +777,61 @@ public class StripedExecutor implements ExecutorService {
             return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
         }
     }
+
+    /**
+     * Stripe.
+     */
+    private static class StripeMPSCQueue extends Stripe {
+        /** Queue. */
+        private final MPSCQueue<Runnable> queue = new MPSCQueue<>();
+
+        /**
+         * @param gridName Grid name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeMPSCQueue(
+            String gridName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(gridName,
+                poolName,
+                idx,
+                log);
+        }
+
+        /** {@inheritDoc} */
+        @Override void start() {
+            super.start();
+            queue.setConsumerThread(thread);
+        }
+
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            return queue.take();
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
+
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
+
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeMPSCQueue.class, this, super.toString());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
new file mode 100644
index 0000000..75b655f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripeThread.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Class for use within {@link IgniteThreadPoolExecutor} class.
+ */
+public class IgniteStripeThread extends IgniteThread {
+
+    /** Group index. */
+    private final int stripeIdx;
+
+    /** {@inheritDoc} */
+    public IgniteStripeThread(String gridName, String threadName, Runnable r, int stripeIdx)
{
+        super(gridName, threadName, r);
+        this.stripeIdx = stripeIdx;
+    }
+
+    /**
+     * @return Group index.
+     */
+    public int stripeIndex() {
+        return stripeIdx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteStripeThread.class, this, "name", getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
new file mode 100644
index 0000000..d6e7537
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequestTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+
+/**
+ * Trivial GridNearAtomicFullUpdateRequest tests.
+ */
+public class GridNearAtomicFullUpdateRequestTest extends MarshallingAbstractTest {
+    /**
+     * Message marshalling test.
+     *
+     * @throws IgniteCheckedException If fails.
+     */
+    public void testMarshall() throws IgniteCheckedException {
+        GridCacheVersion updVer = new GridCacheVersion(1, 2, 3, 5);
+
+        int entryNum = 3;
+
+        GridNearAtomicFullUpdateRequest msg = new GridNearAtomicFullUpdateRequest(555,
+            UUID.randomUUID(),
+            555L,
+            new AffinityTopologyVersion(25, 5),
+            false,
+            CacheWriteSynchronizationMode.PRIMARY_SYNC,
+            GridCacheOperation.UPDATE,
+            false,
+            null,
+            null,
+            null,
+            null,
+            555,
+            false,
+            false,
+            true,
+            false,
+            3,
+            entryNum);
+
+        for (int i = 0; i < entryNum; i++)
+            msg.addUpdateEntry(
+                key(i, i),
+                val(i),
+                -1,
+                -1,
+                null
+            );
+
+        GridNearAtomicFullUpdateRequest received = marshalUnmarshal(msg);
+
+        assertEquals(555, received.futureId());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
new file mode 100644
index 0000000..f53a32b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/MarshallingAbstractTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import junit.framework.TestCase;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryContext;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.MarshallerContextTestImpl;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public abstract class MarshallingAbstractTest extends TestCase {
+    /** */
+    private static final byte proto = 2;
+
+    /** */
+    private GridCacheSharedContext ctx;
+
+    /** */
+    private GridCacheContext cctx;
+
+    /** */
+    @Override protected void setUp() throws Exception {
+        super.setUp();
+        ctx = mock(GridCacheSharedContext.class);
+        cctx = mock(GridCacheContext.class);
+
+        CacheObjectBinaryContext coctx = mock(CacheObjectBinaryContext.class);
+        GridKernalContext kctx = mock(GridKernalContext.class);
+        IgniteEx ignite = mock(IgniteEx.class);
+        GridCacheProcessor proc = mock(GridCacheProcessor.class);
+        GridCacheSharedContext shared = mock(GridCacheSharedContext.class);
+
+        Marshaller marsh = new BinaryMarshaller();
+
+        IgniteConfiguration cfg = new IgniteConfiguration();
+        cfg.setMarshaller(marsh);
+
+        when(ctx.cacheContext(anyInt())).thenReturn(cctx);
+        when(ctx.marshaller()).thenReturn(marsh);
+        when(ctx.gridConfig()).thenReturn(cfg);
+
+        when(cctx.cacheObjectContext()).thenReturn(coctx);
+        when(cctx.gridConfig()).thenReturn(cfg);
+        when(cctx.shared()).thenReturn(shared);
+
+        when(cctx.grid()).thenReturn(ignite);
+        when(kctx.grid()).thenReturn(ignite);
+
+        when(ignite.configuration()).thenReturn(cfg);
+
+        when(ctx.kernalContext()).thenReturn(kctx);
+        when(cctx.kernalContext()).thenReturn(kctx);
+        when(coctx.kernalContext()).thenReturn(kctx);
+
+        when(coctx.binaryEnabled()).thenReturn(true);
+
+        when(kctx.cache()).thenReturn(proc);
+        when(kctx.config()).thenReturn(cfg);
+
+        when(proc.context()).thenReturn(ctx);
+
+        IgniteCacheObjectProcessor binaryProcessor = new CacheObjectBinaryProcessorImpl(kctx);
+        when(kctx.cacheObjects()).thenReturn(binaryProcessor);
+
+        // init marshaller
+        marsh.setContext(new MarshallerContextTestImpl());
+
+        when(shared.marshaller()).thenReturn(marsh);
+
+        BinaryContext bctx =
+            new BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(),
new NullLogger());
+
+        IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", bctx, new IgniteConfiguration());
+    }
+
+    /**
+     * @param m Message.
+     * @return Unmarshalled message.
+     */
+    protected <T extends GridCacheMessage> T marshalUnmarshal(T m) throws IgniteCheckedException
{
+        ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
+
+        m.prepareMarshal(ctx);
+        m.writeTo(buf, new DirectMessageWriter(proto));
+
+        System.out.println("Binary size: " + buf.position() + " bytes");
+        buf.flip();
+
+        byte type = buf.get();
+        assertEquals(m.directType(), type);
+
+        MessageFactory msgFactory = new GridIoMessageFactory(null);
+
+        Message mx = msgFactory.create(type);
+        mx.readFrom(buf, new DirectMessageReader(msgFactory, proto));
+        ((GridCacheMessage)mx).finishUnmarshal(ctx, U.gridClassLoader());
+
+        return (T)mx;
+    }
+
+    /** */
+    protected KeyCacheObject key(Object val, int part) {
+        return new KeyCacheObjectImpl(val, null, part);
+    }
+
+    protected CacheObject val(Object val) {
+        return new CacheObjectImpl(val, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d1b4ebd4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
new file mode 100644
index 0000000..f555750
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManagerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.version;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.TestCase;
+
+/**
+ * Simple tests for {@link GridCacheVersionManager}.
+ */
+public class GridCacheVersionManagerTest extends TestCase {
+    /**
+     * Test for for {@link GridCacheVersionManager#nextAtomicFutureVersion()}.
+     */
+    public void testNextAtomicIdMonotonicalGrows() {
+        GridCacheVersionManager mgr = new GridCacheVersionManager();
+        int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+
+        for (int i = 0; i < n * 3 + 5; i++) {
+            long l = mgr.nextAtomicFutureVersion();
+            assertEquals(i, l);
+        }
+
+        assertEquals(n * 4, mgr.globalAtomicCnt.get());
+    }
+
+    /**
+     * Test for for {@link GridCacheVersionManager#nextAtomicFutureVersion()} with multiple
threads.
+     *
+     * @throws InterruptedException if fails.
+     */
+    public void testNextAtomicMultiThread() throws InterruptedException {
+        int threadsNum = 3;
+
+        final GridCacheVersionManager mgr = new GridCacheVersionManager();
+        final int n = GridCacheVersionManager.THREAD_RESERVE_SIZE;
+        final int perThreadNum = n * 2 + 10;
+
+        final int[] vals = new int[n * threadsNum * (perThreadNum / n + 1)];
+
+        ExecutorService executorService = Executors.newFixedThreadPool(threadsNum);
+
+        final CountDownLatch latch = new CountDownLatch(threadsNum);
+
+        for (int i = 0; i < threadsNum; i++) {
+            executorService.submit(new Runnable() {
+                @Override public void run() {
+                    for (int i = 0; i < perThreadNum; i++) {
+                        long l = mgr.nextAtomicFutureVersion();
+                        vals[(int)l]++;
+                    }
+                    latch.countDown();
+                }
+            });
+        }
+
+        latch.await();
+        executorService.shutdown();
+
+        int cnt = 0;
+
+        for (int i = 0; i < vals.length; i++) {
+            if (vals[i] > 0)
+                cnt++;
+        }
+
+        assertEquals(threadsNum * perThreadNum, cnt);
+    }
+}


Mime
View raw message