ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [06/16] ignite git commit: Ignite-642: fix.
Date Tue, 12 Apr 2016 17:48:50 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
new file mode 100644
index 0000000..03fe568
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCondition;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLock;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Cache reentrant lock implementation based on AbstractQueuedSynchronizer.
+ */
+public final class GridCacheLockImpl implements GridCacheLockEx, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Deserialization stash. */
+    private static final ThreadLocal<IgniteBiTuple<GridKernalContext, String>> stash =
+        new ThreadLocal<IgniteBiTuple<GridKernalContext, String>>() {
+            @Override protected IgniteBiTuple<GridKernalContext, String> initialValue() {
+                return F.t2();
+            }
+        };
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Reentrant lock name. */
+    private String name;
+
+    /** Removed flag. */
+    private volatile boolean rmvd;
+
+    /** Reentrant lock key. */
+    private GridCacheInternalKey key;
+
+    /** Reentrant lock projection. */
+    private IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView;
+
+    /** Cache context. */
+    private GridCacheContext ctx;
+
+    /** Initialization guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /** Initialization latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /** Lock that provides non-overlapping processing of updates. */
+    private ReentrantLock updateLock = new ReentrantLock();
+
+    /** Internal synchronization object. */
+    private Sync sync;
+
+    /** Flag indicating that every operation on this lock should be interrupted. */
+    private volatile boolean interruptAll = false;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridCacheLockImpl() {
+        // No-op.
+    }
+
+    /**
+     * Synchronization implementation for reentrant lock using AbstractQueuedSynchronizer.
+     */
+    class Sync extends AbstractQueuedSynchronizer {
+        private static final long serialVersionUID = 1192457210091910933L;
+
+        private static final long LOCK_FREE = 0;
+
+        /** Map containing condition objects. */
+        private Map<String, ConditionObject> conditionMap;
+
+        /** List of condition signal calls on this node. */
+        private Map<String, Integer> outgoingSignals;
+
+        /** Last condition waited on. */
+        @Nullable
+        private volatile String lastCondition;
+
+        /** True if any node owning the lock had failed. */
+        private volatile boolean isBroken = false;
+
+        /** UUID of the node that currently owns the lock. */
+        private volatile UUID currentOwnerNode;
+
+        /** ID of the thread that currently owns the lock. */
+        private volatile long currentOwnerThreadId;
+
+        /** UUID of this node. */
+        private final UUID thisNode;
+
+        /** FailoverSafe flag. */
+        private final boolean failoverSafe;
+
+        protected Sync(GridCacheLockState state) {
+            setState(state.get());
+
+            thisNode = ctx.localNodeId();
+
+            currentOwnerNode = state.getId();
+
+            currentOwnerThreadId = state.getThreadId();
+
+            conditionMap = new HashMap<>();
+
+            outgoingSignals = new HashMap<>();
+
+            failoverSafe = state.isFailoverSafe();
+        }
+
+        /** */
+        protected void addOutgoingSignal(String condition) {
+            int cnt = 0;
+            if (outgoingSignals.containsKey(condition)) {
+                cnt = outgoingSignals.get(condition);
+
+                // SignalAll has already been called.
+                if (cnt == 0)
+                    return;
+            }
+
+            outgoingSignals.put(condition, cnt + 1);
+        }
+
+        protected void addOutgoingSignalAll(String condition) {
+            outgoingSignals.put(condition, 0);
+        }
+
+        /** Process any condition await calls on this node. */
+        private String processAwait() {
+            if(lastCondition == null)
+                return null;
+
+            String ret = lastCondition;
+
+            lastCondition = null;
+
+            return ret;
+        }
+
+        /** */
+        private Map<String, Integer> processSignal(){
+            Map<String,Integer> ret = new HashMap<>(outgoingSignals);
+
+            outgoingSignals.clear();
+
+            return ret;
+        }
+
+        /** Interrupt every thread on this node waiting on this lock. */
+        private synchronized void interruptAll(){
+            // First release all threads waiting on associated condition queues.
+            if(!conditionMap.isEmpty()) {
+                // Temporarily obtain ownership of the lock,
+                // in order to signal all conditions.
+                UUID tempUUID = getOwnerNode();
+
+                long tempThreadID = currentOwnerThreadId;
+
+                this.setCurrentOwnerNode(thisNode);
+
+                this.currentOwnerThreadId = Thread.currentThread().getId();
+
+                for (Condition c : conditionMap.values())
+                    c.signalAll();
+
+                // Restore owner node and owner thread.
+                this.setCurrentOwnerNode(tempUUID);
+
+                this.currentOwnerThreadId = tempThreadID;
+            }
+
+            // Interrupt any ongoing transactions.
+            for(Thread t: getQueuedThreads()){
+                t.interrupt();
+            }
+
+            // Interrupt any future call to acquire/release on this sync object.
+            interruptAll = true;
+        }
+
+        /** Check if lock is in correct state (i.e. not broken in non-failoversafe mode),
+         * if not throw  {@linkplain IgniteInterruptedException} */
+        private void validate(){
+            if(Thread.interrupted() || interruptAll){
+                throw new IgniteInterruptedException("Lock broken in non-failoversafe mode.");
+            }
+        }
+
+        /**
+         * Sets the number of permits currently acquired on this lock. This method should only be used in {@linkplain
+         * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+         *
+         * @param permits Number of permits acquired at this reentrant lock.
+         */
+        final synchronized void setPermits(int permits) {
+            setState(permits);
+        }
+
+        /**
+         * Gets the number of permissions currently acquired at this lock.
+         *
+         * @return Number of permits acquired at this reentrant lock.
+         */
+        final int getPermits() {
+            return getState();
+        }
+
+        /**
+         * Sets the UUID of the node that currently owns this lock. This method should only be used in {@linkplain
+         * GridCacheLockImpl#onUpdate(GridCacheLockState)}.
+         *
+         * @param ownerNode UUID of the node owning this lock.
+         */
+        final synchronized void setCurrentOwnerNode(UUID ownerNode) {
+            currentOwnerNode = ownerNode;
+        }
+
+        /**
+         * Gets the UUID of the node that currently owns the lock.
+         *
+         * @return UUID of the node that currently owns the lock.
+         */
+        final UUID getOwnerNode() {
+            return currentOwnerNode;
+        }
+
+        /**
+         * Checks if latest call to acquire/release was called on this node.
+         * Should only be called from update method.
+         *
+         * @param newOwnerID ID of the node that is about to acquire this lock (or null).
+         * @return true if acquire/release that triggered last update came from this node.
+         */
+        protected boolean isLockedLocally(UUID newOwnerID){
+            if(thisNode.equals(getOwnerNode()) || thisNode.equals(newOwnerID)){
+                return true;
+            }
+
+            return false;
+        }
+
+        protected void setCurrentOwnerThread(long newOwnerThreadId){
+            this.currentOwnerThreadId = newOwnerThreadId;
+        }
+
+        /**
+         * Returns true if node that owned the locked failed before call to unlock.
+         *
+         * @return true if any node failed while owning the lock.
+         */
+        protected boolean isBroken() {
+            return isBroken;
+        }
+
+        /** */
+        protected void setBroken(boolean isBroken) {
+            this.isBroken = isBroken;
+        }
+
+        /**
+         * Performs non-fair tryLock.
+         */
+        final boolean nonfairTryAcquire(int acquires) {
+            // If broken in non-failoversafe mode, exit immediately.
+            if(interruptAll){
+                return true;
+            }
+
+            final Thread current = Thread.currentThread();
+
+            final UUID currentOwner = this.currentOwnerNode;
+
+            int c = getState();
+
+            // Check if lock is released or current owner failed.
+            if(c == 0 || ctx.discovery().node(currentOwner) == null){
+                if (compareAndSetGlobalState(0, acquires, current.getId())){
+
+                    // Not used for synchronization (we use ThreadID), but updated anyway.
+                    setExclusiveOwnerThread(current);
+
+                    while(getState() != acquires)
+                        Thread.yield();
+
+                    return true;
+                }
+            }
+            else if (isHeldExclusively()) {
+                int nextc = c + acquires;
+
+                if (nextc < 0) // overflow
+                    throw new Error("Maximum lock count exceeded");
+
+                setState(nextc);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Performs lock.
+         */
+        final void lock() {
+             acquire(1);
+        }
+
+        /** {@inheritDoc} */
+        protected final boolean tryAcquire(int acquires) {
+            return nonfairTryAcquire(acquires);
+        }
+
+        /** {@inheritDoc} */
+        protected final boolean tryRelease(int releases) {
+            // This method is called with release==0 only when trying to wake through update,
+            // to check if some other node released the lock.
+            if(releases == 0) {
+                return true;
+            }
+
+            // If broken in non-failoversafe mode, exit immediately.
+            if(interruptAll)
+                return true;
+
+            int c = getState() - releases;
+
+            if (!isHeldExclusively()) {
+                throw new IllegalMonitorStateException();
+            }
+
+            boolean free = false;
+
+            if (c == 0){
+                free = true;
+
+                setGlobalState(0, processAwait(), processSignal());
+
+                while(getState() != 0)
+                    Thread.yield();
+            }
+            else
+                setState(c);
+
+            return free;
+        }
+
+
+        /** {@inheritDoc} */
+        protected final boolean isHeldExclusively() {
+            // While we must in general read state before owner,
+            // we don't need to do so to check if current thread is owner
+
+            return this.currentOwnerThreadId == Thread.currentThread().getId() && thisNode.equals(currentOwnerNode);
+        }
+
+        /** {@inheritDoc} */
+        final synchronized IgniteCondition newCondition(String name) {
+            if(conditionMap.containsKey(name))
+                return new IgniteConditionObject(name, conditionMap.get(name));
+
+            ConditionObject cond = new ConditionObject();
+
+            conditionMap.put(name, cond);
+
+            return new IgniteConditionObject(name, cond);
+        }
+
+        // Methods relayed from outer class
+
+        final int getHoldCount() {
+            return isHeldExclusively() ? getState() : 0;
+        }
+
+        final boolean isLocked() throws IgniteCheckedException {
+            return getState() != 0 || lockView.get(key).get() != 0;
+        }
+
+        /**
+         * This method is used for synchronizing the reentrant lock state across all nodes.
+         */
+        protected boolean compareAndSetGlobalState(final int expVal, final int newVal, long newThreadID) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+                                if (val.get() == expVal || ctx.discovery().node(val.getId()) == null) {
+                                    val.set(newVal);
+
+                                    val.setId(thisNode);
+
+                                    val.setThreadId(newThreadID);
+
+                                    lockView.put(key, val);
+
+                                    tx.commit();
+
+                                    return true;
+                                }
+
+                                return false;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        /**
+         * Sets the global state across all nodes after releasing the reentrant lock.
+         *
+         * @param newVal New state.
+         * @param lastCondition Id of the condition await is called.
+         * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock.
+         */
+        protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) {
+            try {
+                return CU.outTx(
+                    retryTopologySafe(new Callable<Boolean>() {
+                        @Override public Boolean call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null)
+                                    throw new IgniteCheckedException("Failed to find reentrant lock with given name: " + name);
+
+                                val.set(newVal);
+
+                                val.setId(null);
+
+                                val.setThreadId(LOCK_FREE);
+
+                                // Get global condition queue.
+                                Map<String, LinkedList<UUID>> condMap = val.getConditionMap();
+
+                                // Create map containing signals from this node.
+                                Map<UUID, LinkedList<String>> signalMap = new HashMap<UUID, LinkedList<String>>();
+
+                                // Put any signal calls on this node to global state.
+                                if (!outgoingSignals.isEmpty()) {
+                                    for (String condition : outgoingSignals.keySet()) {
+                                        int cnt = outgoingSignals.get(condition);
+
+                                        // Get queue for this condition.
+                                        List<UUID> list = condMap.get(condition);
+
+                                        if (list != null && !list.isEmpty()) {
+                                            // Check if signalAll was called.
+                                            if (cnt == 0) {
+                                                cnt = list.size();
+                                            }
+
+                                            // Remove from global condition queue.
+                                            for (int i = 0; i < cnt; i++) {
+                                                if(list.isEmpty())
+                                                    break;
+
+                                                UUID uuid = list.remove(0);
+
+                                                // Skip if node to be released is not alive anymore.
+                                                if(ctx.discovery().node(uuid) == null){
+                                                    cnt++;
+
+                                                    continue;
+                                                }
+
+                                                LinkedList<String> queue = signalMap.get(uuid);
+
+                                                if (queue == null) {
+                                                    queue = new LinkedList<String>();
+
+                                                    signalMap.put(uuid, queue);
+                                                }
+
+                                                queue.add(condition);
+                                            }
+                                        }
+                                    }
+                                }
+
+                                val.setSignals(signalMap);
+
+                                // Check if this release is called after condition.await() call;
+                                // If true, add this node to the global waiting queue.
+                                if (lastCondition != null) {
+                                    LinkedList<UUID> queue;
+
+                                    if (!condMap.containsKey(lastCondition)) {
+                                        // New condition object.
+                                        queue = new LinkedList();
+                                    }
+                                    else {
+                                        // Existing condition object.
+                                        queue = condMap.get(lastCondition);
+                                    }
+
+                                    queue.add(thisNode);
+
+                                    condMap.put(lastCondition, queue);
+                                }
+
+                                val.setConditionMap(condMap);
+
+                                lockView.put(key, val);
+
+                                tx.commit();
+
+                                return true;
+                            }
+                            catch (Error | Exception e) {
+                                U.error(log, "Failed to compare and set: " + this, e);
+
+                                throw e;
+                            }
+                        }
+                    }),
+                    ctx
+                );
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+
+        protected synchronized boolean checkIncomingSignals(GridCacheLockState state){
+            if(state.getSignals() == null)
+                return false;
+
+            LinkedList<String> signals = state.getSignals().get(thisNode);
+
+            if(signals == null || signals.isEmpty())
+                return false;
+
+            UUID tempUUID = getOwnerNode();
+
+            Thread tempThread = getExclusiveOwnerThread();
+
+            long tempThreadID = currentOwnerThreadId;
+
+            // Temporarily allow current thread to signal condition object.
+            // This is safe to do because:
+            // 1. if release was called on this node,
+            // it was called from currently active thread;
+            // 2. if release came from a thread on any other node,
+            // all threads on this node are already blocked.
+            this.setCurrentOwnerNode(thisNode);
+
+            this.setExclusiveOwnerThread(Thread.currentThread());
+
+            this.currentOwnerThreadId = Thread.currentThread().getId();
+
+            for(String signal: signals){
+                conditionMap.get(signal).signal();
+            }
+
+            // Restore owner node and owner thread.
+            this.setCurrentOwnerNode(tempUUID);
+
+            this.setExclusiveOwnerThread(tempThread);
+
+            this.currentOwnerThreadId = tempThreadID;
+
+            return true;
+        }
+
+        /**
+         *  Condition implementation for {@linkplain IgniteLock}.
+         *
+         **/
+        public class IgniteConditionObject implements IgniteCondition {
+
+            private final String name;
+
+            private final AbstractQueuedSynchronizer.ConditionObject object;
+
+            protected IgniteConditionObject(String name, ConditionObject object){
+                this.name = name;
+
+                this.object = object;
+            }
+
+            /**
+             * Name of this condition.
+             *
+             * @return name Name of this condition object.
+             */
+            @Override public String name() {
+                return name;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void await() throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if(!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = this.name;
+
+                    object.await();
+
+                    sync.validate();
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void awaitUninterruptibly() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = this.name;
+
+                    object.awaitUninterruptibly();
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public long awaitNanos(long nanosTimeout) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if(!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = this.name;
+
+                    long result =  object.awaitNanos(nanosTimeout);
+
+                    sync.validate();
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean await(long time, TimeUnit unit) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if(!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = this.name;
+
+                    boolean result = object.await(time, unit);
+
+                    sync.validate();
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean awaitUntil(Date deadline) throws IgniteInterruptedException {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if(!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    lastCondition = this.name;
+
+                    boolean result = object.awaitUntil(deadline);
+
+                    sync.validate();
+
+                    return result;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedException(e);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void signal() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    validate();
+
+                    addOutgoingSignal(name);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public void signalAll() {
+                ctx.kernalContext().gateway().readLock();
+
+                try {
+                    if (!isHeldExclusively())
+                        throw new IllegalMonitorStateException();
+
+                    sync.validate();
+
+                    addOutgoingSignalAll(name);
+                }
+                finally {
+                    ctx.kernalContext().gateway().readUnlock();
+                }
+            }
+        }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Reentrant lock name.
+     * @param key Reentrant lock key.
+     * @param lockView Reentrant lock projection.
+     * @param ctx Cache context.
+     */
+    public GridCacheLockImpl(String name,
+        GridCacheInternalKey key,
+        IgniteInternalCache<GridCacheInternalKey, GridCacheLockState> lockView,
+        GridCacheContext ctx) {
+        assert name != null;
+        assert key != null;
+        assert ctx != null;
+        assert lockView != null;
+
+        this.name = name;
+        this.key = key;
+        this.lockView = lockView;
+        this.ctx = ctx;
+
+        log = ctx.logger(getClass());
+    }
+
+    /**
+     * @throws IgniteCheckedException If operation failed.
+     */
+    private void initializeReentrantLock() throws IgniteCheckedException {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                sync = CU.outTx(
+                    retryTopologySafe(new Callable<Sync>() {
+                        @Override public Sync call() throws Exception {
+                            try (IgniteInternalTx tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
+                                GridCacheLockState val = lockView.get(key);
+
+                                if (val == null) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to find reentrant lock with given name: " + name);
+
+                                    return null;
+                                }
+
+                                tx.rollback();
+
+                                return new Sync(val);
+                            }
+                        }
+                    }),
+                    ctx
+                );
+
+                if (log.isDebugEnabled())
+                    log.debug("Initialized internal sync structure: " + sync);
+            }
+            finally {
+                initLatch.countDown();
+            }
+        }
+        else {
+            U.await(initLatch);
+
+            if (sync == null)
+                throw new IgniteCheckedException("Internal reentrant lock has not been properly initialized.");
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onUpdate(GridCacheLockState val) {
+        // Called only on initialization, so it's safe to ignore update.
+        if (sync == null)
+            return;
+
+        try {
+            updateLock.lock();
+
+            // Check if update came from this node.
+            boolean local = sync.isLockedLocally(val.getId());
+
+            // Process any incoming signals.
+            boolean incomingSignals = sync.checkIncomingSignals(val);
+
+            // Update owner's node id.
+            sync.setCurrentOwnerNode(val.getId());
+
+            // Update owner's thread id.
+            sync.setCurrentOwnerThread(val.getThreadId());
+
+            // Update permission count.
+            sync.setPermits(val.get());
+
+            // Check if any threads waiting on this node need to be notified.
+            if ((incomingSignals || sync.getPermits() == 0) && !local) {
+                // Try to notify any waiting threads.
+                sync.release(0);
+            }
+
+        } finally{
+            updateLock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onNodeRemoved(UUID nodeId) {
+        try {
+            updateLock.lock();
+
+            if (nodeId.equals(sync.getOwnerNode())){
+                sync.setBroken(true);
+
+                if(!sync.failoverSafe){
+                    sync.interruptAll();
+                }
+
+                // Try to notify any waiting threads.
+                sync.release(0);
+            }
+        }
+        finally {
+            updateLock.unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        if (sync == null) {
+            interruptAll = true;
+
+            return;
+        }
+
+        sync.setBroken(true);
+
+        sync.interruptAll();
+
+        // Try to notify any waiting threads.
+        sync.release(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            sync.lock();
+
+            sync.validate();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void lockInterruptibly() throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
+        try {
+            initializeReentrantLock();
+
+            sync.acquireInterruptibly(1);
+
+            sync.validate();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryLock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            boolean result = sync.nonfairTryAcquire(1);
+
+            sync.validate();
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tryLock(long timeout, TimeUnit unit) throws IgniteInterruptedException {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            boolean result = sync.tryAcquireNanos(1, unit.toNanos(timeout));
+
+            sync.validate();
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        catch (InterruptedException e){
+            throw new IgniteInterruptedException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void unlock() {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            // Validate before release.
+            sync.validate();
+
+            sync.release(1);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    @NotNull @Override public Condition newCondition() {
+        throw new UnsupportedOperationException("IgniteLock does not allow creation of nameless conditions. ");
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCondition getOrCreateCondition(String name) {
+        ctx.kernalContext().gateway().readLock();
+
+        try{
+            initializeReentrantLock();
+
+            IgniteCondition result = sync.newCondition(name);
+
+            sync.validate();
+
+            return result;
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            ctx.kernalContext().gateway().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getHoldCount() {
+       try{
+            initializeReentrantLock();
+
+            return sync.getHoldCount();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isHeldByCurrentThread() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isHeldExclusively();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isLocked() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isLocked();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThreads() {
+        try{
+            initializeReentrantLock();
+
+            return sync.hasQueuedThreads();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasQueuedThread(Thread thread) {
+        try{
+            initializeReentrantLock();
+
+            return sync.isQueued(thread);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasWaiters(IgniteCondition condition) {
+        try{
+            initializeReentrantLock();
+
+            AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+            if(c == null)
+                throw new IllegalArgumentException();
+
+            return sync.hasWaiters(c);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getWaitQueueLength(IgniteCondition condition) {
+        try{
+            initializeReentrantLock();
+
+            AbstractQueuedSynchronizer.ConditionObject c = sync.conditionMap.get(condition.name());
+
+            if(c == null)
+                throw new IllegalArgumentException();
+
+            return sync.getWaitQueueLength(c);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    @Override public boolean isFailoverSafe() {
+        return sync.failoverSafe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isBroken() {
+        try{
+            initializeReentrantLock();
+
+            return sync.isBroken();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheInternalKey key() {
+        return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return rmvd;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onRemoved() {
+        return rmvd = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void needCheckNotRemoved() {
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx.kernalContext());
+        out.writeUTF(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+        t.set1((GridKernalContext)in.readObject());
+        t.set2(in.readUTF());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        if (!rmvd) {
+            try {
+                boolean force = sync != null ? sync.isBroken() && !sync.failoverSafe : false;
+
+                ctx.kernalContext().dataStructures().removeReentrantLock(name, force);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheLockImpl.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
new file mode 100644
index 0000000..f5b2d06
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockState.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastructures;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheInternal;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *  Grid cache reentrant lock state.
+ */
+public final class GridCacheLockState implements GridCacheInternal, Externalizable, Cloneable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Count. */
+    private int cnt;
+
+    /** Owner thread local id. */
+    private long threadId;
+
+    /** Owner node ID. */
+    private UUID id;
+
+    /** FailoverSafe flag. */
+    private boolean failoverSafe;
+
+    /** Map containing state for each condition object associated with this lock. */
+    @GridToStringInclude
+    private Map<String, LinkedList<UUID>> conditionMap;
+
+    /** Map containing unprocessed signals for condition objects that are associated with this lock. */
+    @GridToStringInclude
+    private Map<UUID, LinkedList<String>> signals;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Initial count.
+     * @param id UUID of owning node.
+     */
+    public GridCacheLockState(int cnt, UUID id, long threadID, boolean failoverSafe) {
+        assert cnt >= 0;
+
+        this.id = id;
+
+        this.threadId = threadID;
+
+        conditionMap = new HashMap();
+
+        signals = null;
+
+        this.failoverSafe = failoverSafe;
+    }
+
+    /**
+     * Empty constructor required for {@link Externalizable}.
+     */
+    public GridCacheLockState() {
+        // No-op.
+    }
+
+    /**
+     * @param cnt New count.
+     */
+    public void set(int cnt) {
+        this.cnt = cnt;
+    }
+
+    /**
+     * @return Current count.
+     */
+    public int get() {
+        return cnt;
+    }
+
+    /**
+     * @return Current owner thread id.
+     */
+    public long getThreadId() {
+        return threadId;
+    }
+
+    /**
+     * @param threadId New thread owner id.
+     */
+    public void setThreadId(long threadId) {
+        this.threadId = threadId;
+    }
+
+    /**
+     * @return Current owner node id.
+     */
+    public UUID getId() {
+        return id;
+    }
+
+    /**
+     * @return New owner node id.
+     */
+    public void setId(UUID id) {
+        this.id = id;
+    }
+
+    public boolean isFailoverSafe() {
+        return failoverSafe;
+    }
+
+    public int condtionCount(){
+        return conditionMap.size();
+    }
+
+    public Map<String, LinkedList<UUID>> getConditionMap() {
+        return conditionMap;
+    }
+
+    public void setConditionMap(Map<String, LinkedList<UUID>> conditionMap) {
+        this.conditionMap = conditionMap;
+    }
+
+    public Map<UUID, LinkedList<String>> getSignals() {
+        return signals;
+    }
+
+    public void setSignals(Map<UUID, LinkedList<String>> signals) {
+        this.signals = signals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(cnt);
+        out.writeLong(threadId);
+        U.writeUuid(out, id);
+
+        out.writeBoolean(failoverSafe);
+
+        out.writeBoolean(conditionMap != null);
+
+        if (conditionMap != null) {
+            out.writeInt(conditionMap.size());
+
+            for (Map.Entry<String, LinkedList<UUID>> e : conditionMap.entrySet()) {
+                U.writeString(out, e.getKey());
+
+                out.writeInt(e.getValue().size());
+
+                for(UUID uuid:e.getValue()){
+                    U.writeUuid(out,uuid);
+                }
+            }
+        }
+
+        out.writeBoolean(signals != null);
+
+        if (signals != null) {
+            out.writeInt(signals.size());
+
+            for (Map.Entry<UUID, LinkedList<String>> e : signals.entrySet()) {
+                U.writeUuid(out, e.getKey());
+
+                out.writeInt(e.getValue().size());
+
+                for(String condition:e.getValue()){
+                    U.writeString(out, condition);
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException {
+        cnt = in.readInt();
+        threadId = in.readLong();
+        id = U.readUuid(in);
+
+        failoverSafe = in.readBoolean();
+
+        if (in.readBoolean()) {
+            int size = in.readInt();
+
+            conditionMap = U.newLinkedHashMap(size);
+
+            for (int i = 0; i < size; i++) {
+                String key = U.readString(in);
+
+                int size1 = in.readInt();
+
+                LinkedList<UUID> list = new LinkedList();
+
+                for (int j = 0; j < size1; j++) {
+                    list.add(U.readUuid(in));
+                }
+
+                conditionMap.put(key, list);
+            }
+        }
+
+        if(in.readBoolean()) {
+            assert (conditionMap != null);
+
+            int size = in.readInt();
+
+            signals = U.newLinkedHashMap(size);
+
+            for (int i = 0; i < size; i++) {
+                UUID node = U.readUuid(in);
+
+                int size1 = in.readInt();
+
+                LinkedList<String> list = new LinkedList();
+
+                for (int j = 0; j < size1; j++) {
+                    list.add(U.readString(in));
+                }
+
+                signals.put(node, list);
+            }
+        }
+        else{
+            signals = null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheLockState.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e386558a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
deleted file mode 100644
index 4a5238b..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheReentrantLockEx.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.datastructures;
-
-import java.util.UUID;
-import org.apache.ignite.IgniteReentrantLock;
-
-/**
- * Grid cache reentrant lock ({@code 'Ex'} stands for external).
- */
-public interface GridCacheReentrantLockEx extends IgniteReentrantLock, GridCacheRemovable {
-    /**
-     * Get current reentrant lock latch key.
-     *
-     * @return Lock key.
-     */
-    public GridCacheInternalKey key();
-
-    /**
-     * Callback to notify reentrant lock on changes.
-     *
-     * @param state New reentrant lock state.
-     */
-    public void onUpdate(GridCacheReentrantLockState state);
-
-    /**
-     * Callback to notify semaphore on topology changes.
-     *
-     * @param nodeId Id of the node that left the grid.
-     */
-    public void onNodeRemoved(UUID nodeId);
-}


Mime
View raw message